"""Dataset Card Drafter - MVP Space.
Watches davanstrien/* datasets and opens PRs with auto-generated descriptions.
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from pathlib import Path
import gradio as gr
from huggingface_hub import (
DatasetCard,
WebhookPayload,
WebhooksServer,
get_repo_discussions,
)
from description_generator import ViewerNotReadyError, generate_description
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Space URL for attribution
SPACE_URL = "https://huggingface.co/spaces/librarian-bots/dataset-card-drafter"
BOT_NAME = "librarian-bot"
PR_TITLE = "Add dataset description"
# Configuration
WATCHED_PREFIXES = ["davanstrien/"] # Repos to watch
MIN_DESCRIPTION_LENGTH = 100 # Chars below which we generate
# Persistence directory
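# "/data" is the Spaces persistent-storage mount; fall back to a local folder
# when running outside a Space (or without persistent storage enabled).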
DATA_DIR = Path("/data") if Path("/data").exists() else Path("./local_data")
DATA_DIR.mkdir(exist_ok=True)
PROCESSED_FILE = DATA_DIR / "processed.json"
PENDING_FILE = DATA_DIR / "pending.json"
# Retry configuration
RETRY_DELAYS = [60, 120, 300] # Seconds to wait between retries (1min, 2min, 5min)
def load_processed() -> dict:
"""Load processed datasets from persistence."""
if PROCESSED_FILE.exists():
return json.loads(PROCESSED_FILE.read_text())
return {}
def save_processed(data: dict) -> None:
"""Save processed datasets to persistence."""
PROCESSED_FILE.write_text(json.dumps(data, indent=2))
def load_pending() -> dict:
"""Load pending datasets from persistence."""
if PENDING_FILE.exists():
return json.loads(PENDING_FILE.read_text())
return {}
def save_pending(data: dict) -> None:
"""Save pending datasets to persistence."""
PENDING_FILE.write_text(json.dumps(data, indent=2))
def add_to_pending(dataset_id: str, reason: str) -> None:
"""Add a dataset to the pending queue."""
pending = load_pending()
pending[dataset_id] = {
"added": datetime.now().isoformat(),
"reason": reason,
"retries": 0,
}
save_pending(pending)
logger.info(f"Added {dataset_id} to pending queue: {reason}")
def remove_from_pending(dataset_id: str) -> None:
"""Remove a dataset from the pending queue."""
pending = load_pending()
if dataset_id in pending:
del pending[dataset_id]
save_pending(pending)
logger.info(f"Removed {dataset_id} from pending queue")
def is_watched_repo(repo_name: str) -> bool:
"""Check if a repo is in our watched list."""
return any(repo_name.startswith(prefix) for prefix in WATCHED_PREFIXES)
def should_generate(card: DatasetCard) -> bool:
"""Check if a dataset card needs a description."""
if not card.text:
return True
return len(card.text.strip()) < MIN_DESCRIPTION_LENGTH
def has_existing_pr(dataset_id: str) -> bool:
"""Check if there's already an open PR from librarian-bot for this dataset."""
try:
discussions = list(get_repo_discussions(dataset_id, repo_type="dataset"))
for discussion in discussions:
if not discussion.is_pull_request:
continue
# Check if it's from librarian-bot and matches our PR title
if discussion.author == BOT_NAME and discussion.title == PR_TITLE:
# Check if still open
if discussion.status == "open":
return True
return False
    except Exception as e:
        # If the check itself fails, log it and assume there is no open PR
        # (processing continues, so a duplicate PR is possible).
        logger.warning(f"Could not check existing PRs for {dataset_id}: {e}")
        return False
def build_pr_description(description: str) -> str:
"""Build the PR description with attribution and the generated content."""
return f"""## Auto-generated Dataset Description
This PR was automatically generated by the [Dataset Card Drafter]({SPACE_URL}) Space.
### Suggested Description
{description}
---
*If you find this description helpful, please merge the PR. If you'd like to edit it first, you can modify the README.md in this branch before merging.*
*Generated by [{BOT_NAME}]({SPACE_URL})*"""
async def process_dataset(dataset_id: str, inference_token: str, pr_token: str) -> dict:
"""Process a single dataset: check, generate, and open PR.
    Args:
        dataset_id: The dataset to process.
        inference_token: Token for inference API calls (e.g., davanstrien's token).
        pr_token: Token for opening PRs (librarian-bot's token).

    Returns:
        A status dict describing the outcome.
    """
# Check for existing open PR first
if has_existing_pr(dataset_id):
remove_from_pending(dataset_id) # Clean up if it was pending
return {"status": "skipped", "reason": "open PR already exists"}
# Load current card (or create new if none exists)
try:
card = DatasetCard.load(dataset_id)
except Exception:
# No README exists - create a new card
card = DatasetCard("")
logger.info(f"No existing card for {dataset_id}, creating new one")
# Check if description needed
if not should_generate(card):
remove_from_pending(dataset_id) # Clean up if it was pending
return {"status": "skipped", "reason": "description exists"}
# Generate description using inference token
try:
description = generate_description(dataset_id, inference_token)
except ViewerNotReadyError as e:
return {"status": "pending", "reason": str(e)}
except Exception as e:
return {"status": "error", "reason": f"generation failed: {e}"}
if not description:
return {"status": "error", "reason": "empty description generated"}
# Update card and push as PR using librarian-bot token
card.text = description
try:
commit_info = card.push_to_hub(
repo_id=dataset_id,
repo_type="dataset",
commit_message=PR_TITLE,
commit_description=build_pr_description(description),
create_pr=True,
token=pr_token,
)
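        # With create_pr=True the returned CommitInfo carries a pr_url; fall
        # back to its string form if the attribute is ever missing.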
pr_url = getattr(commit_info, "pr_url", str(commit_info))
except Exception as e:
return {"status": "error", "reason": f"PR creation failed: {e}"}
# Success - remove from pending if it was there
remove_from_pending(dataset_id)
return {"status": "pr_created", "pr_url": pr_url, "description": description}
async def retry_pending_dataset(dataset_id: str) -> None:
"""Background task to retry a pending dataset after delays."""
inference_token = os.getenv("HF_TOKEN")
pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
if not inference_token or not pr_token:
logger.error("Missing tokens for retry")
return
for i, delay in enumerate(RETRY_DELAYS):
logger.info(f"Waiting {delay}s before retry {i + 1} for {dataset_id}")
await asyncio.sleep(delay)
# Update retry count
pending = load_pending()
if dataset_id not in pending:
logger.info(f"{dataset_id} no longer pending, stopping retries")
return
pending[dataset_id]["retries"] = i + 1
save_pending(pending)
# Try processing
result = await process_dataset(dataset_id, inference_token, pr_token)
if result["status"] == "pr_created":
logger.info(f"Successfully processed {dataset_id} on retry {i + 1}")
# Log to processed
processed = load_processed()
processed[dataset_id] = {
"pr_url": result.get("pr_url"),
"timestamp": datetime.now().isoformat(),
"status": "pr_created",
"trigger": "retry",
"retry_attempt": i + 1,
}
save_processed(processed)
return
elif result["status"] != "pending":
# Got a definitive answer (error or skipped), stop retrying
logger.info(f"Stopping retries for {dataset_id}: {result}")
remove_from_pending(dataset_id)
return
# Exhausted retries
logger.warning(f"Exhausted retries for {dataset_id}")
pending = load_pending()
if dataset_id in pending:
pending[dataset_id]["exhausted"] = True
save_pending(pending)
# Gradio UI
with gr.Blocks(title="Dataset Card Drafter") as demo:
gr.Markdown("# Dataset Card Drafter MVP")
gr.Markdown(
f"Watching datasets matching: `{'`, `'.join(WATCHED_PREFIXES)}`\n\n"
f"Triggers when description < {MIN_DESCRIPTION_LENGTH} characters."
)
with gr.Tab("Status"):
status_display = gr.JSON(label="Processed Datasets", value=load_processed)
refresh_btn = gr.Button("Refresh")
refresh_btn.click(fn=load_processed, outputs=status_display)
with gr.Tab("Pending"):
gr.Markdown(
"Datasets waiting for the viewer to be ready.\n\n"
"Background retries happen at 1min, 2min, 5min intervals."
)
pending_display = gr.JSON(label="Pending Datasets", value=load_pending)
pending_refresh_btn = gr.Button("Refresh")
pending_refresh_btn.click(fn=load_pending, outputs=pending_display)
# Manual retry button
retry_input = gr.Textbox(
label="Dataset ID to retry",
placeholder="davanstrien/dataset-name",
)
retry_btn = gr.Button("Retry Now")
retry_output = gr.JSON(label="Result")
async def manual_retry(dataset_id: str):
if not dataset_id:
return {"status": "error", "reason": "no dataset ID provided"}
inference_token = os.getenv("HF_TOKEN")
pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
if not inference_token or not pr_token:
return {"status": "error", "reason": "tokens not configured"}
result = await process_dataset(dataset_id, inference_token, pr_token)
if result.get("status") == "pr_created":
processed = load_processed()
processed[dataset_id] = {
"pr_url": result.get("pr_url"),
"timestamp": datetime.now().isoformat(),
"status": "pr_created",
"trigger": "manual_retry",
}
save_processed(processed)
return result
retry_btn.click(
fn=manual_retry,
inputs=retry_input,
outputs=retry_output,
)
with gr.Tab("Manual Test"):
gr.Markdown(
"Test description generation without opening a PR.\n\n"
"**Note:** This requires `HF_TOKEN` to be set."
)
test_input = gr.Textbox(
label="Dataset ID",
placeholder="davanstrien/test-dataset",
)
test_btn = gr.Button("Generate Description (Preview)")
test_output = gr.Textbox(label="Generated Description", lines=5)
test_status = gr.JSON(label="Status")
def test_generate(dataset_id: str):
if not dataset_id:
return "", {"status": "error", "reason": "no dataset ID provided"}
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
return "", {"status": "error", "reason": "HF_TOKEN not set"}
try:
description = generate_description(dataset_id, hf_token)
return description, {"status": "success", "length": len(description)}
except Exception as e:
return "", {"status": "error", "reason": str(e)}
test_btn.click(
fn=test_generate,
inputs=test_input,
outputs=[test_output, test_status],
)
with gr.Tab("Trigger PR"):
gr.Markdown(
"Manually trigger description generation and PR creation.\n\n"
"**Warning:** This will open a real PR!\n\n"
"Requires `HF_TOKEN` (for inference) and `LIBRARIAN_BOT_TOKEN` (for PRs)."
)
trigger_input = gr.Textbox(
label="Dataset ID",
placeholder="davanstrien/test-dataset",
)
trigger_btn = gr.Button("Generate & Open PR", variant="primary")
trigger_output = gr.JSON(label="Result")
async def trigger_pr(dataset_id: str):
if not dataset_id:
return {"status": "error", "reason": "no dataset ID provided"}
inference_token = os.getenv("HF_TOKEN")
pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
if not inference_token:
return {"status": "error", "reason": "HF_TOKEN not set"}
if not pr_token:
return {"status": "error", "reason": "LIBRARIAN_BOT_TOKEN not set"}
result = await process_dataset(dataset_id, inference_token, pr_token)
# Save to processed log
if result.get("status") == "pr_created":
processed = load_processed()
processed[dataset_id] = {
"pr_url": result.get("pr_url"),
"timestamp": datetime.now().isoformat(),
"status": "pr_created",
"trigger": "manual",
}
save_processed(processed)
return result
trigger_btn.click(
fn=trigger_pr,
inputs=trigger_input,
outputs=trigger_output,
)
# WebhooksServer with automatic secret verification
app = WebhooksServer(ui=demo, webhook_secret=os.getenv("WEBHOOK_SECRET"))
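# The Hub-side webhook should POST to this Space's /webhooks/dataset_update
# endpoint (WebhooksServer exposes registered handlers under the /webhooks prefix).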
@app.add_webhook("/dataset_update")
async def handle_dataset_webhook(payload: WebhookPayload) -> dict:
"""Handle dataset creation/update webhooks."""
# Filter for datasets only
if payload.repo.type != "dataset":
return {"status": "skipped", "reason": "not a dataset"}
# Filter for watched repos
if not is_watched_repo(payload.repo.name):
return {"status": "skipped", "reason": "not in watched list"}
dataset_id = payload.repo.name
# Get tokens
inference_token = os.getenv("HF_TOKEN")
pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
if not inference_token:
return {"status": "error", "reason": "HF_TOKEN not configured"}
if not pr_token:
return {"status": "error", "reason": "LIBRARIAN_BOT_TOKEN not configured"}
# Process the dataset
result = await process_dataset(dataset_id, inference_token, pr_token)
# Handle pending status - queue for retry
if result.get("status") == "pending":
add_to_pending(dataset_id, result.get("reason", "viewer not ready"))
# Spawn background retry task (non-blocking)
asyncio.create_task(retry_pending_dataset(dataset_id))
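        # Note: no reference to the task is kept; asyncio may garbage-collect a
        # fire-and-forget task, so consider storing a reference if retries matter.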
logger.info(f"Queued {dataset_id} for background retry")
return result
# Save to processed log
processed = load_processed()
processed[dataset_id] = {
"pr_url": result.get("pr_url"),
"timestamp": datetime.now().isoformat(),
"status": result.get("status"),
"reason": result.get("reason"),
"trigger": "webhook",
"event": payload.event.action if payload.event else None,
}
save_processed(processed)
return result
if __name__ == "__main__":
app.launch(ssr_mode=False)