"""Dataset Card Drafter - MVP Space.

Watches davanstrien/* datasets and opens PRs with auto-generated descriptions.
"""

import asyncio
import json
import logging
import os
from datetime import datetime
from pathlib import Path

import gradio as gr
from huggingface_hub import (
    DatasetCard,
    WebhookPayload,
    WebhooksServer,
    get_repo_discussions,
)

from description_generator import ViewerNotReadyError, generate_description

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

SPACE_URL = "https://huggingface.co/spaces/librarian-bots/dataset-card-drafter"
BOT_NAME = "librarian-bot"
PR_TITLE = "Add dataset description"

WATCHED_PREFIXES = ["davanstrien/"]
MIN_DESCRIPTION_LENGTH = 100

# Persist state on the Space's /data volume when available; fall back to a
# local directory for development.
DATA_DIR = Path("/data") if Path("/data").exists() else Path("./local_data")
DATA_DIR.mkdir(exist_ok=True)
PROCESSED_FILE = DATA_DIR / "processed.json"
PENDING_FILE = DATA_DIR / "pending.json"

# Backoff schedule (seconds) between retries while a dataset's viewer warms up.
RETRY_DELAYS = [60, 120, 300]

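# Shapes of the persisted state files written by the save_* helpers below --
# a sketch inferred from the fields this module sets, not a formal schema:
#
#   processed.json: {"<dataset_id>": {"pr_url": ..., "timestamp": ..., "status": ..., "trigger": ...}}
#   pending.json:   {"<dataset_id>": {"added": ..., "reason": ..., "retries": 0}}
#
# NOTE: the helpers do whole-file read-modify-write with no locking, which is
# fine for a single-process MVP but could race under concurrent webhook calls.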
def load_processed() -> dict:
    """Load processed datasets from persistence."""
    if PROCESSED_FILE.exists():
        return json.loads(PROCESSED_FILE.read_text())
    return {}


def save_processed(data: dict) -> None:
    """Save processed datasets to persistence."""
    PROCESSED_FILE.write_text(json.dumps(data, indent=2))


def load_pending() -> dict:
    """Load pending datasets from persistence."""
    if PENDING_FILE.exists():
        return json.loads(PENDING_FILE.read_text())
    return {}


def save_pending(data: dict) -> None:
    """Save pending datasets to persistence."""
    PENDING_FILE.write_text(json.dumps(data, indent=2))
|
|
def add_to_pending(dataset_id: str, reason: str) -> None:
    """Add a dataset to the pending queue."""
    pending = load_pending()
    pending[dataset_id] = {
        "added": datetime.now().isoformat(),
        "reason": reason,
        "retries": 0,
    }
    save_pending(pending)
    logger.info(f"Added {dataset_id} to pending queue: {reason}")


def remove_from_pending(dataset_id: str) -> None:
    """Remove a dataset from the pending queue."""
    pending = load_pending()
    if dataset_id in pending:
        del pending[dataset_id]
        save_pending(pending)
        logger.info(f"Removed {dataset_id} from pending queue")
|
|
def is_watched_repo(repo_name: str) -> bool:
    """Check if a repo is in our watched list."""
    return any(repo_name.startswith(prefix) for prefix in WATCHED_PREFIXES)


def should_generate(card: DatasetCard) -> bool:
    """Check if a dataset card needs a description."""
    if not card.text:
        return True
    return len(card.text.strip()) < MIN_DESCRIPTION_LENGTH
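# For example, a README whose body is just "# My Dataset" is 12 characters
# after stripping, so should_generate() returns True and a draft is produced.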
|
|
def has_existing_pr(dataset_id: str) -> bool:
    """Check if there's already an open PR from librarian-bot for this dataset."""
    try:
        discussions = list(get_repo_discussions(dataset_id, repo_type="dataset"))
        for discussion in discussions:
            if not discussion.is_pull_request:
                continue
            if (
                discussion.author == BOT_NAME
                and discussion.title == PR_TITLE
                and discussion.status == "open"
            ):
                return True
        return False
    except Exception:
        # If the discussions check fails (e.g. a network error), err on the
        # side of processing rather than silently skipping the dataset.
        return False
|
|
def build_pr_description(description: str) -> str:
    """Build the PR description with attribution and the generated content."""
    return f"""## Auto-generated Dataset Description

This PR was automatically generated by the [Dataset Card Drafter]({SPACE_URL}) Space.

### Suggested Description

{description}

---

*If you find this description helpful, please merge the PR. If you'd like to edit it first, you can modify the README.md in this branch before merging.*

*Generated by [{BOT_NAME}]({SPACE_URL})*"""
|
|
async def process_dataset(dataset_id: str, inference_token: str, pr_token: str) -> dict:
    """Process a single dataset: check, generate, and open PR.

    Args:
        dataset_id: The dataset to process
        inference_token: Token for inference API calls (e.g., davanstrien's token)
        pr_token: Token for opening PRs (librarian-bot's token)

    Returns a status dict whose "status" is one of "skipped", "pending",
    "error", or "pr_created".
    """
    # Never open a duplicate PR.
    if has_existing_pr(dataset_id):
        remove_from_pending(dataset_id)
        return {"status": "skipped", "reason": "open PR already exists"}

    try:
        card = DatasetCard.load(dataset_id)
    except Exception:
        # The repo has no README yet; start from an empty card.
        card = DatasetCard("")
        logger.info(f"No existing card for {dataset_id}, creating new one")

    if not should_generate(card):
        remove_from_pending(dataset_id)
        return {"status": "skipped", "reason": "description exists"}

    try:
        description = generate_description(dataset_id, inference_token)
    except ViewerNotReadyError as e:
        # The dataset viewer hasn't finished processing; callers should retry later.
        return {"status": "pending", "reason": str(e)}
    except Exception as e:
        return {"status": "error", "reason": f"generation failed: {e}"}

    if not description:
        return {"status": "error", "reason": "empty description generated"}

    card.text = description

    try:
        commit_info = card.push_to_hub(
            repo_id=dataset_id,
            repo_type="dataset",
            commit_message=PR_TITLE,
            commit_description=build_pr_description(description),
            create_pr=True,
            token=pr_token,
        )
        pr_url = getattr(commit_info, "pr_url", str(commit_info))
    except Exception as e:
        return {"status": "error", "reason": f"PR creation failed: {e}"}

    remove_from_pending(dataset_id)
    return {"status": "pr_created", "pr_url": pr_url, "description": description}
|
|
async def retry_pending_dataset(dataset_id: str) -> None:
    """Background task to retry a pending dataset after delays."""
    inference_token = os.getenv("HF_TOKEN")
    pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")

    if not inference_token or not pr_token:
        logger.error("Missing tokens for retry")
        return

    for i, delay in enumerate(RETRY_DELAYS):
        logger.info(f"Waiting {delay}s before retry {i + 1} for {dataset_id}")
        await asyncio.sleep(delay)

        # Stop if the dataset was resolved (or removed) while we slept.
        pending = load_pending()
        if dataset_id not in pending:
            logger.info(f"{dataset_id} no longer pending, stopping retries")
            return
        pending[dataset_id]["retries"] = i + 1
        save_pending(pending)

        result = await process_dataset(dataset_id, inference_token, pr_token)

        if result["status"] == "pr_created":
            logger.info(f"Successfully processed {dataset_id} on retry {i + 1}")
            processed = load_processed()
            processed[dataset_id] = {
                "pr_url": result.get("pr_url"),
                "timestamp": datetime.now().isoformat(),
                "status": "pr_created",
                "trigger": "retry",
                "retry_attempt": i + 1,
            }
            save_processed(processed)
            return
        elif result["status"] != "pending":
            # A hard skip or error; no point retrying further.
            logger.info(f"Stopping retries for {dataset_id}: {result}")
            remove_from_pending(dataset_id)
            return

    # All retries returned "pending": leave the entry flagged for manual retry.
    logger.warning(f"Exhausted retries for {dataset_id}")
    pending = load_pending()
    if dataset_id in pending:
        pending[dataset_id]["exhausted"] = True
        save_pending(pending)
|
|
with gr.Blocks(title="Dataset Card Drafter") as demo:
    gr.Markdown("# Dataset Card Drafter MVP")
    gr.Markdown(
        f"Watching datasets matching: `{'`, `'.join(WATCHED_PREFIXES)}`\n\n"
        f"Triggers when description < {MIN_DESCRIPTION_LENGTH} characters."
    )

    with gr.Tab("Status"):
        status_display = gr.JSON(label="Processed Datasets", value=load_processed)
        refresh_btn = gr.Button("Refresh")
        refresh_btn.click(fn=load_processed, outputs=status_display)

    with gr.Tab("Pending"):
        gr.Markdown(
            "Datasets waiting for the viewer to be ready.\n\n"
            "Background retries happen at 1min, 2min, 5min intervals."
        )
        pending_display = gr.JSON(label="Pending Datasets", value=load_pending)
        pending_refresh_btn = gr.Button("Refresh")
        pending_refresh_btn.click(fn=load_pending, outputs=pending_display)

        retry_input = gr.Textbox(
            label="Dataset ID to retry",
            placeholder="davanstrien/dataset-name",
        )
        retry_btn = gr.Button("Retry Now")
        retry_output = gr.JSON(label="Result")

        async def manual_retry(dataset_id: str):
            if not dataset_id:
                return {"status": "error", "reason": "no dataset ID provided"}

            inference_token = os.getenv("HF_TOKEN")
            pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")

            if not inference_token or not pr_token:
                return {"status": "error", "reason": "tokens not configured"}

            result = await process_dataset(dataset_id, inference_token, pr_token)

            if result.get("status") == "pr_created":
                processed = load_processed()
                processed[dataset_id] = {
                    "pr_url": result.get("pr_url"),
                    "timestamp": datetime.now().isoformat(),
                    "status": "pr_created",
                    "trigger": "manual_retry",
                }
                save_processed(processed)

            return result

        retry_btn.click(
            fn=manual_retry,
            inputs=retry_input,
            outputs=retry_output,
        )

    with gr.Tab("Manual Test"):
        gr.Markdown(
            "Test description generation without opening a PR.\n\n"
            "**Note:** This requires `HF_TOKEN` to be set."
        )
        test_input = gr.Textbox(
            label="Dataset ID",
            placeholder="davanstrien/test-dataset",
        )
        test_btn = gr.Button("Generate Description (Preview)")
        test_output = gr.Textbox(label="Generated Description", lines=5)
        test_status = gr.JSON(label="Status")

        def test_generate(dataset_id: str):
            if not dataset_id:
                return "", {"status": "error", "reason": "no dataset ID provided"}

            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                return "", {"status": "error", "reason": "HF_TOKEN not set"}

            try:
                description = generate_description(dataset_id, hf_token)
                return description, {"status": "success", "length": len(description)}
            except Exception as e:
                return "", {"status": "error", "reason": str(e)}

        test_btn.click(
            fn=test_generate,
            inputs=test_input,
            outputs=[test_output, test_status],
        )

    with gr.Tab("Trigger PR"):
        gr.Markdown(
            "Manually trigger description generation and PR creation.\n\n"
            "**Warning:** This will open a real PR!\n\n"
            "Requires `HF_TOKEN` (for inference) and `LIBRARIAN_BOT_TOKEN` (for PRs)."
        )
        trigger_input = gr.Textbox(
            label="Dataset ID",
            placeholder="davanstrien/test-dataset",
        )
        trigger_btn = gr.Button("Generate & Open PR", variant="primary")
        trigger_output = gr.JSON(label="Result")

        async def trigger_pr(dataset_id: str):
            if not dataset_id:
                return {"status": "error", "reason": "no dataset ID provided"}

            inference_token = os.getenv("HF_TOKEN")
            pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")

            if not inference_token:
                return {"status": "error", "reason": "HF_TOKEN not set"}
            if not pr_token:
                return {"status": "error", "reason": "LIBRARIAN_BOT_TOKEN not set"}

            result = await process_dataset(dataset_id, inference_token, pr_token)

            if result.get("status") == "pr_created":
                processed = load_processed()
                processed[dataset_id] = {
                    "pr_url": result.get("pr_url"),
                    "timestamp": datetime.now().isoformat(),
                    "status": "pr_created",
                    "trigger": "manual",
                }
                save_processed(processed)

            return result

        trigger_btn.click(
            fn=trigger_pr,
            inputs=trigger_input,
            outputs=trigger_output,
        )
|
|
app = WebhooksServer(ui=demo, webhook_secret=os.getenv("WEBHOOK_SECRET"))

# Hold strong references to background retry tasks: the event loop keeps only
# weak references, so fire-and-forget tasks can otherwise be garbage-collected.
_background_tasks: set = set()

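# Payloads hitting this endpoint look roughly like the sketch below; only the
# fields the handler actually reads are shown (see huggingface_hub's
# WebhookPayload for the full schema):
#
#   {"event": {"action": "create", "scope": "repo"},
#    "repo": {"type": "dataset", "name": "davanstrien/my-dataset"}}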
@app.add_webhook("/dataset_update")
async def handle_dataset_webhook(payload: WebhookPayload) -> dict:
    """Handle dataset creation/update webhooks."""
    if payload.repo.type != "dataset":
        return {"status": "skipped", "reason": "not a dataset"}

    if not is_watched_repo(payload.repo.name):
        return {"status": "skipped", "reason": "not in watched list"}

    dataset_id = payload.repo.name

    inference_token = os.getenv("HF_TOKEN")
    pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")

    if not inference_token:
        return {"status": "error", "reason": "HF_TOKEN not configured"}
    if not pr_token:
        return {"status": "error", "reason": "LIBRARIAN_BOT_TOKEN not configured"}

    result = await process_dataset(dataset_id, inference_token, pr_token)

    if result.get("status") == "pending":
        # Viewer not ready yet: queue the dataset and retry in the background.
        add_to_pending(dataset_id, result.get("reason", "viewer not ready"))
        task = asyncio.create_task(retry_pending_dataset(dataset_id))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        logger.info(f"Queued {dataset_id} for background retry")
        return result

    processed = load_processed()
    processed[dataset_id] = {
        "pr_url": result.get("pr_url"),
        "timestamp": datetime.now().isoformat(),
        "status": result.get("status"),
        "reason": result.get("reason"),
        "trigger": "webhook",
        "event": payload.event.action if payload.event else None,
    }
    save_processed(processed)

    return result

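# Local smoke test for the endpoint (a sketch: assumes the default Gradio port
# 7860; if WEBHOOK_SECRET is set, add a matching "X-Webhook-Secret" header):
#
#   curl -X POST http://localhost:7860/webhooks/dataset_update \
#     -H "Content-Type: application/json" \
#     -d '{"event": {"action": "create", "scope": "repo"},
#          "repo": {"type": "dataset", "name": "davanstrien/test-dataset"}}'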
if __name__ == "__main__":
    app.launch(ssr_mode=False)