"""Dataset Card Drafter - MVP Space. Watches davanstrien/* datasets and opens PRs with auto-generated descriptions. """ import asyncio import json import logging import os from datetime import datetime from pathlib import Path import gradio as gr from huggingface_hub import ( DatasetCard, WebhookPayload, WebhooksServer, get_repo_discussions, ) from description_generator import ViewerNotReadyError, generate_description # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Space URL for attribution SPACE_URL = "https://huggingface.co/spaces/librarian-bots/dataset-card-drafter" BOT_NAME = "librarian-bot" PR_TITLE = "Add dataset description" # Configuration WATCHED_PREFIXES = ["davanstrien/"] # Repos to watch MIN_DESCRIPTION_LENGTH = 100 # Chars below which we generate # Persistence directory DATA_DIR = Path("/data") if Path("/data").exists() else Path("./local_data") DATA_DIR.mkdir(exist_ok=True) PROCESSED_FILE = DATA_DIR / "processed.json" PENDING_FILE = DATA_DIR / "pending.json" # Retry configuration RETRY_DELAYS = [60, 120, 300] # Seconds to wait between retries (1min, 2min, 5min) def load_processed() -> dict: """Load processed datasets from persistence.""" if PROCESSED_FILE.exists(): return json.loads(PROCESSED_FILE.read_text()) return {} def save_processed(data: dict) -> None: """Save processed datasets to persistence.""" PROCESSED_FILE.write_text(json.dumps(data, indent=2)) def load_pending() -> dict: """Load pending datasets from persistence.""" if PENDING_FILE.exists(): return json.loads(PENDING_FILE.read_text()) return {} def save_pending(data: dict) -> None: """Save pending datasets to persistence.""" PENDING_FILE.write_text(json.dumps(data, indent=2)) def add_to_pending(dataset_id: str, reason: str) -> None: """Add a dataset to the pending queue.""" pending = load_pending() pending[dataset_id] = { "added": datetime.now().isoformat(), "reason": reason, "retries": 0, } save_pending(pending) logger.info(f"Added {dataset_id} to pending queue: {reason}") def remove_from_pending(dataset_id: str) -> None: """Remove a dataset from the pending queue.""" pending = load_pending() if dataset_id in pending: del pending[dataset_id] save_pending(pending) logger.info(f"Removed {dataset_id} from pending queue") def is_watched_repo(repo_name: str) -> bool: """Check if a repo is in our watched list.""" return any(repo_name.startswith(prefix) for prefix in WATCHED_PREFIXES) def should_generate(card: DatasetCard) -> bool: """Check if a dataset card needs a description.""" if not card.text: return True return len(card.text.strip()) < MIN_DESCRIPTION_LENGTH def has_existing_pr(dataset_id: str) -> bool: """Check if there's already an open PR from librarian-bot for this dataset.""" try: discussions = list(get_repo_discussions(dataset_id, repo_type="dataset")) for discussion in discussions: if not discussion.is_pull_request: continue # Check if it's from librarian-bot and matches our PR title if discussion.author == BOT_NAME and discussion.title == PR_TITLE: # Check if still open if discussion.status == "open": return True return False except Exception: # If we can't check, err on the side of caution return False def build_pr_description(description: str) -> str: """Build the PR description with attribution and the generated content.""" return f"""## Auto-generated Dataset Description This PR was automatically generated by the [Dataset Card Drafter]({SPACE_URL}) Space. 

def is_watched_repo(repo_name: str) -> bool:
    """Check if a repo is in our watched list."""
    return any(repo_name.startswith(prefix) for prefix in WATCHED_PREFIXES)


def should_generate(card: DatasetCard) -> bool:
    """Check if a dataset card needs a description."""
    if not card.text:
        return True
    return len(card.text.strip()) < MIN_DESCRIPTION_LENGTH


def has_existing_pr(dataset_id: str) -> bool:
    """Check if there's already an open PR from librarian-bot for this dataset."""
    try:
        discussions = list(get_repo_discussions(dataset_id, repo_type="dataset"))
        for discussion in discussions:
            if not discussion.is_pull_request:
                continue
            # Check if it's from librarian-bot and matches our PR title
            if discussion.author == BOT_NAME and discussion.title == PR_TITLE:
                # Check if still open
                if discussion.status == "open":
                    return True
        return False
    except Exception:
        # If we can't check, err on the side of caution
        return False


def build_pr_description(description: str) -> str:
    """Build the PR description with attribution and the generated content."""
    return f"""## Auto-generated Dataset Description

This PR was automatically generated by the [Dataset Card Drafter]({SPACE_URL}) Space.

### Suggested Description

{description}

---

*If you find this description helpful, please merge the PR. If you'd like to edit it first, you can modify the README.md in this branch before merging.*

*Generated by [{BOT_NAME}]({SPACE_URL})*"""


async def process_dataset(dataset_id: str, inference_token: str, pr_token: str) -> dict:
    """Process a single dataset: check, generate, and open PR.

    Args:
        dataset_id: The dataset to process
        inference_token: Token for inference API calls (e.g., davanstrien's token)
        pr_token: Token for opening PRs (librarian-bot's token)

    Returns a status dict with results.
    """
    # Check for existing open PR first
    if has_existing_pr(dataset_id):
        remove_from_pending(dataset_id)  # Clean up if it was pending
        return {"status": "skipped", "reason": "open PR already exists"}

    # Load current card (or create new if none exists)
    try:
        card = DatasetCard.load(dataset_id)
    except Exception:
        # No README exists - create a new card
        card = DatasetCard("")
        logger.info(f"No existing card for {dataset_id}, creating new one")

    # Check if description needed
    if not should_generate(card):
        remove_from_pending(dataset_id)  # Clean up if it was pending
        return {"status": "skipped", "reason": "description exists"}

    # Generate description using inference token
    try:
        description = generate_description(dataset_id, inference_token)
    except ViewerNotReadyError as e:
        return {"status": "pending", "reason": str(e)}
    except Exception as e:
        return {"status": "error", "reason": f"generation failed: {e}"}

    if not description:
        return {"status": "error", "reason": "empty description generated"}

    # Update card and push as PR using librarian-bot token
    card.text = description
    try:
        commit_info = card.push_to_hub(
            repo_id=dataset_id,
            repo_type="dataset",
            commit_message=PR_TITLE,
            commit_description=build_pr_description(description),
            create_pr=True,
            token=pr_token,
        )
        pr_url = getattr(commit_info, "pr_url", str(commit_info))
    except Exception as e:
        return {"status": "error", "reason": f"PR creation failed: {e}"}

    # Success - remove from pending if it was there
    remove_from_pending(dataset_id)
    return {"status": "pr_created", "pr_url": pr_url, "description": description}
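
# process_dataset() reports one of four statuses; callers react as follows
# (a summary of the surrounding code, not an external contract):
#   "skipped"    - an open PR already exists or the card has a description; no-op.
#   "pending"    - the dataset viewer isn't ready; the webhook handler queues the
#                  dataset for the background retries implemented below.
#   "error"      - generation or PR creation failed; logged but not retried.
#   "pr_created" - a PR was opened and the result is logged to processed.json.
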

async def retry_pending_dataset(dataset_id: str) -> None:
    """Background task to retry a pending dataset after delays."""
    inference_token = os.getenv("HF_TOKEN")
    pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
    if not inference_token or not pr_token:
        logger.error("Missing tokens for retry")
        return

    for i, delay in enumerate(RETRY_DELAYS):
        logger.info(f"Waiting {delay}s before retry {i + 1} for {dataset_id}")
        await asyncio.sleep(delay)

        # Update retry count
        pending = load_pending()
        if dataset_id not in pending:
            logger.info(f"{dataset_id} no longer pending, stopping retries")
            return
        pending[dataset_id]["retries"] = i + 1
        save_pending(pending)

        # Try processing
        result = await process_dataset(dataset_id, inference_token, pr_token)

        if result["status"] == "pr_created":
            logger.info(f"Successfully processed {dataset_id} on retry {i + 1}")
            # Log to processed
            processed = load_processed()
            processed[dataset_id] = {
                "pr_url": result.get("pr_url"),
                "timestamp": datetime.now().isoformat(),
                "status": "pr_created",
                "trigger": "retry",
                "retry_attempt": i + 1,
            }
            save_processed(processed)
            return
        elif result["status"] != "pending":
            # Got a definitive answer (error or skipped), stop retrying
            logger.info(f"Stopping retries for {dataset_id}: {result}")
            remove_from_pending(dataset_id)
            return

    # Exhausted retries
    logger.warning(f"Exhausted retries for {dataset_id}")
    pending = load_pending()
    if dataset_id in pending:
        pending[dataset_id]["exhausted"] = True
        save_pending(pending)


# Gradio UI
with gr.Blocks(title="Dataset Card Drafter") as demo:
    gr.Markdown("# Dataset Card Drafter MVP")
    gr.Markdown(
        f"Watching datasets matching: `{'`, `'.join(WATCHED_PREFIXES)}`\n\n"
        f"Triggers when description < {MIN_DESCRIPTION_LENGTH} characters."
    )

    with gr.Tab("Status"):
        status_display = gr.JSON(label="Processed Datasets", value=load_processed)
        refresh_btn = gr.Button("Refresh")
        refresh_btn.click(fn=load_processed, outputs=status_display)

    with gr.Tab("Pending"):
        gr.Markdown(
            "Datasets waiting for the viewer to be ready.\n\n"
            "Background retries happen at 1min, 2min, 5min intervals."
        )
        pending_display = gr.JSON(label="Pending Datasets", value=load_pending)
        pending_refresh_btn = gr.Button("Refresh")
        pending_refresh_btn.click(fn=load_pending, outputs=pending_display)

        # Manual retry button
        retry_input = gr.Textbox(
            label="Dataset ID to retry",
            placeholder="davanstrien/dataset-name",
        )
        retry_btn = gr.Button("Retry Now")
        retry_output = gr.JSON(label="Result")

        async def manual_retry(dataset_id: str):
            if not dataset_id:
                return {"status": "error", "reason": "no dataset ID provided"}

            inference_token = os.getenv("HF_TOKEN")
            pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
            if not inference_token or not pr_token:
                return {"status": "error", "reason": "tokens not configured"}

            result = await process_dataset(dataset_id, inference_token, pr_token)

            if result.get("status") == "pr_created":
                processed = load_processed()
                processed[dataset_id] = {
                    "pr_url": result.get("pr_url"),
                    "timestamp": datetime.now().isoformat(),
                    "status": "pr_created",
                    "trigger": "manual_retry",
                }
                save_processed(processed)

            return result

        retry_btn.click(
            fn=manual_retry,
            inputs=retry_input,
            outputs=retry_output,
        )

    with gr.Tab("Manual Test"):
        gr.Markdown(
            "Test description generation without opening a PR.\n\n"
            "**Note:** This requires `HF_TOKEN` to be set."
        )
        test_input = gr.Textbox(
            label="Dataset ID",
            placeholder="davanstrien/test-dataset",
        )
        test_btn = gr.Button("Generate Description (Preview)")
        test_output = gr.Textbox(label="Generated Description", lines=5)
        test_status = gr.JSON(label="Status")

        def test_generate(dataset_id: str):
            if not dataset_id:
                return "", {"status": "error", "reason": "no dataset ID provided"}

            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                return "", {"status": "error", "reason": "HF_TOKEN not set"}

            try:
                description = generate_description(dataset_id, hf_token)
                return description, {"status": "success", "length": len(description)}
            except Exception as e:
                return "", {"status": "error", "reason": str(e)}

        test_btn.click(
            fn=test_generate,
            inputs=test_input,
            outputs=[test_output, test_status],
        )

    with gr.Tab("Trigger PR"):
        gr.Markdown(
            "Manually trigger description generation and PR creation.\n\n"
            "**Warning:** This will open a real PR!\n\n"
            "Requires `HF_TOKEN` (for inference) and `LIBRARIAN_BOT_TOKEN` (for PRs)."
        )
        trigger_input = gr.Textbox(
            label="Dataset ID",
            placeholder="davanstrien/test-dataset",
        )
        trigger_btn = gr.Button("Generate & Open PR", variant="primary")
        trigger_output = gr.JSON(label="Result")

        async def trigger_pr(dataset_id: str):
            if not dataset_id:
                return {"status": "error", "reason": "no dataset ID provided"}

            inference_token = os.getenv("HF_TOKEN")
            pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
            if not inference_token:
                return {"status": "error", "reason": "HF_TOKEN not set"}
            if not pr_token:
                return {"status": "error", "reason": "LIBRARIAN_BOT_TOKEN not set"}

            result = await process_dataset(dataset_id, inference_token, pr_token)

            # Save to processed log
            if result.get("status") == "pr_created":
                processed = load_processed()
                processed[dataset_id] = {
                    "pr_url": result.get("pr_url"),
                    "timestamp": datetime.now().isoformat(),
                    "status": "pr_created",
                    "trigger": "manual",
                }
                save_processed(processed)

            return result

        trigger_btn.click(
            fn=trigger_pr,
            inputs=trigger_input,
            outputs=trigger_output,
        )

# WebhooksServer with automatic secret verification
app = WebhooksServer(ui=demo, webhook_secret=os.getenv("WEBHOOK_SECRET"))


@app.add_webhook("/dataset_update")
async def handle_dataset_webhook(payload: WebhookPayload) -> dict:
    """Handle dataset creation/update webhooks."""
    # Filter for datasets only
    if payload.repo.type != "dataset":
        return {"status": "skipped", "reason": "not a dataset"}

    # Filter for watched repos
    if not is_watched_repo(payload.repo.name):
        return {"status": "skipped", "reason": "not in watched list"}

    dataset_id = payload.repo.name

    # Get tokens
    inference_token = os.getenv("HF_TOKEN")
    pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
    if not inference_token:
        return {"status": "error", "reason": "HF_TOKEN not configured"}
    if not pr_token:
        return {"status": "error", "reason": "LIBRARIAN_BOT_TOKEN not configured"}

    # Process the dataset
    result = await process_dataset(dataset_id, inference_token, pr_token)

    # Handle pending status - queue for retry
    if result.get("status") == "pending":
        add_to_pending(dataset_id, result.get("reason", "viewer not ready"))
        # Spawn background retry task (non-blocking)
        asyncio.create_task(retry_pending_dataset(dataset_id))
        logger.info(f"Queued {dataset_id} for background retry")
        return result

    # Save to processed log
    processed = load_processed()
    processed[dataset_id] = {
        "pr_url": result.get("pr_url"),
        "timestamp": datetime.now().isoformat(),
        "status": result.get("status"),
        "reason": result.get("reason"),
        "trigger": "webhook",
        "event": payload.event.action if payload.event else None,
    }
    save_processed(processed)

    return result


if __name__ == "__main__":
    app.launch(ssr_mode=False)
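
# Local smoke test (a sketch, not exercised by the app itself). Per
# huggingface_hub's WebhooksServer docs, registered webhooks are served under
# /webhooks/ and the secret, when configured, is expected in the
# X-Webhook-Secret header. The payload is trimmed for readability; the
# server-side WebhookPayload model may require additional fields:
#
#   curl -X POST http://localhost:7860/webhooks/dataset_update \
#     -H "Content-Type: application/json" \
#     -H "X-Webhook-Secret: $WEBHOOK_SECRET" \
#     -d '{"event": {"action": "update", "scope": "repo"},
#          "repo": {"type": "dataset", "name": "davanstrien/test-dataset"}}'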