davanstrien (HF Staff) and Claude Opus 4.5 committed
Commit 763c57d · 1 parent: afeeac0

Add pending queue for datasets where viewer isn't ready

- ViewerNotReadyError raised when dataset preview unavailable
- Pending queue (pending.json) tracks datasets waiting for viewer
- Background retry task with delays: 1min, 2min, 5min
- New 'Pending' tab in UI to view queue and manual retry
- Webhook spawns async retry task when status is 'pending'

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Files changed (3)
  1. .beads/issues.jsonl +1 -0
  2. app.py +157 -1
  3. description_generator.py +20 -1
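
For context on the data shape: based on `add_to_pending()` and the exhausted-retries handling in the diff below, an entry in `pending.json` looks roughly like this (dataset ID and timestamp illustrative):

```json
{
  "davanstrien/example-dataset": {
    "added": "2025-12-15T18:05:00.000000",
    "reason": "Dataset viewer not ready for 'davanstrien/example-dataset'. The dataset may be new or still processing.",
    "retries": 2,
    "exhausted": true
  }
}
```

The `exhausted` flag is only set once all three retries fail; until then an entry carries just `added`, `reason`, and the current `retries` count.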
.beads/issues.jsonl CHANGED
```diff
@@ -1,2 +1,3 @@
+{"id":"dataset-card-drafter-a69","title":"Add pending queue for datasets with viewer not ready","description":"","status":"in_progress","priority":1,"issue_type":"feature","created_at":"2025-12-15T18:00:44.18695Z","updated_at":"2025-12-15T18:00:50.025261Z"}
 {"id":"dataset-card-drafter-ebu","title":"Add PR deduplication logic","description":"Multiple PRs being opened for same dataset. Need to check for existing open PRs before creating new ones.","status":"closed","priority":1,"issue_type":"bug","created_at":"2025-12-15T17:43:02.474669Z","updated_at":"2025-12-15T17:48:03.770007Z","closed_at":"2025-12-15T17:48:03.770007Z","close_reason":"Added has_existing_pr() check using get_repo_discussions + improved PR description"}
 {"id":"dataset-card-drafter-wbd","title":"MVP implementation: WebhooksServer + DatasetCard + InferenceClient","description":"","status":"closed","priority":1,"issue_type":"feature","created_at":"2025-12-15T17:24:36.365733Z","updated_at":"2025-12-15T17:28:21.127763Z","closed_at":"2025-12-15T17:28:21.127763Z","close_reason":"MVP implemented with WebhooksServer, DatasetCard, and InferenceClient"}
```
app.py CHANGED
```diff
@@ -3,7 +3,9 @@
 Watches davanstrien/* datasets and opens PRs with auto-generated descriptions.
 """
 
+import asyncio
 import json
+import logging
 import os
 from datetime import datetime
 from pathlib import Path
@@ -16,7 +18,11 @@ from huggingface_hub import (
     get_repo_discussions,
 )
 
-from description_generator import generate_description
+from description_generator import ViewerNotReadyError, generate_description
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
 
 # Space URL for attribution
 SPACE_URL = "https://huggingface.co/spaces/librarian-bots/dataset-card-drafter"
@@ -31,6 +37,10 @@ MIN_DESCRIPTION_LENGTH = 100 # Chars below which we generate
 DATA_DIR = Path("/data") if Path("/data").exists() else Path("./local_data")
 DATA_DIR.mkdir(exist_ok=True)
 PROCESSED_FILE = DATA_DIR / "processed.json"
+PENDING_FILE = DATA_DIR / "pending.json"
+
+# Retry configuration
+RETRY_DELAYS = [60, 120, 300]  # Seconds to wait between retries (1min, 2min, 5min)
 
 
 def load_processed() -> dict:
@@ -45,6 +55,39 @@ def save_processed(data: dict) -> None:
     PROCESSED_FILE.write_text(json.dumps(data, indent=2))
 
 
+def load_pending() -> dict:
+    """Load pending datasets from persistence."""
+    if PENDING_FILE.exists():
+        return json.loads(PENDING_FILE.read_text())
+    return {}
+
+
+def save_pending(data: dict) -> None:
+    """Save pending datasets to persistence."""
+    PENDING_FILE.write_text(json.dumps(data, indent=2))
+
+
+def add_to_pending(dataset_id: str, reason: str) -> None:
+    """Add a dataset to the pending queue."""
+    pending = load_pending()
+    pending[dataset_id] = {
+        "added": datetime.now().isoformat(),
+        "reason": reason,
+        "retries": 0,
+    }
+    save_pending(pending)
+    logger.info(f"Added {dataset_id} to pending queue: {reason}")
+
+
+def remove_from_pending(dataset_id: str) -> None:
+    """Remove a dataset from the pending queue."""
+    pending = load_pending()
+    if dataset_id in pending:
+        del pending[dataset_id]
+        save_pending(pending)
+        logger.info(f"Removed {dataset_id} from pending queue")
+
+
 def is_watched_repo(repo_name: str) -> bool:
     """Check if a repo is in our watched list."""
     return any(repo_name.startswith(prefix) for prefix in WATCHED_PREFIXES)
@@ -104,6 +147,7 @@ async def process_dataset(dataset_id: str, inference_token: str, pr_token: str)
     """
    # Check for existing open PR first
     if has_existing_pr(dataset_id):
+        remove_from_pending(dataset_id)  # Clean up if it was pending
         return {"status": "skipped", "reason": "open PR already exists"}
 
     # Load current card
@@ -114,11 +158,14 @@ async def process_dataset(dataset_id: str, inference_token: str, pr_token: str)
 
     # Check if description needed
     if not should_generate(card):
+        remove_from_pending(dataset_id)  # Clean up if it was pending
         return {"status": "skipped", "reason": "description exists"}
 
     # Generate description using inference token
     try:
         description = generate_description(dataset_id, inference_token)
+    except ViewerNotReadyError as e:
+        return {"status": "pending", "reason": str(e)}
     except Exception as e:
         return {"status": "error", "reason": f"generation failed: {e}"}
 
@@ -141,9 +188,63 @@ async def process_dataset(dataset_id: str, inference_token: str, pr_token: str)
     except Exception as e:
         return {"status": "error", "reason": f"PR creation failed: {e}"}
 
+    # Success - remove from pending if it was there
+    remove_from_pending(dataset_id)
+
     return {"status": "pr_created", "pr_url": pr_url, "description": description}
 
 
+async def retry_pending_dataset(dataset_id: str) -> None:
+    """Background task to retry a pending dataset after delays."""
+    inference_token = os.getenv("HF_TOKEN")
+    pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
+
+    if not inference_token or not pr_token:
+        logger.error("Missing tokens for retry")
+        return
+
+    for i, delay in enumerate(RETRY_DELAYS):
+        logger.info(f"Waiting {delay}s before retry {i + 1} for {dataset_id}")
+        await asyncio.sleep(delay)
+
+        # Update retry count
+        pending = load_pending()
+        if dataset_id not in pending:
+            logger.info(f"{dataset_id} no longer pending, stopping retries")
+            return
+        pending[dataset_id]["retries"] = i + 1
+        save_pending(pending)
+
+        # Try processing
+        result = await process_dataset(dataset_id, inference_token, pr_token)
+
+        if result["status"] == "pr_created":
+            logger.info(f"Successfully processed {dataset_id} on retry {i + 1}")
+            # Log to processed
+            processed = load_processed()
+            processed[dataset_id] = {
+                "pr_url": result.get("pr_url"),
+                "timestamp": datetime.now().isoformat(),
+                "status": "pr_created",
+                "trigger": "retry",
+                "retry_attempt": i + 1,
+            }
+            save_processed(processed)
+            return
+        elif result["status"] != "pending":
+            # Got a definitive answer (error or skipped), stop retrying
+            logger.info(f"Stopping retries for {dataset_id}: {result}")
+            remove_from_pending(dataset_id)
+            return
+
+    # Exhausted retries
+    logger.warning(f"Exhausted retries for {dataset_id}")
+    pending = load_pending()
+    if dataset_id in pending:
+        pending[dataset_id]["exhausted"] = True
+        save_pending(pending)
+
+
 # Gradio UI
 with gr.Blocks(title="Dataset Card Drafter") as demo:
     gr.Markdown("# Dataset Card Drafter MVP")
@@ -157,6 +258,53 @@ with gr.Blocks(title="Dataset Card Drafter") as demo:
         refresh_btn = gr.Button("Refresh")
         refresh_btn.click(fn=load_processed, outputs=status_display)
 
+    with gr.Tab("Pending"):
+        gr.Markdown(
+            "Datasets waiting for the viewer to be ready.\n\n"
+            "Background retries happen at 1min, 2min, 5min intervals."
+        )
+        pending_display = gr.JSON(label="Pending Datasets", value=load_pending)
+        pending_refresh_btn = gr.Button("Refresh")
+        pending_refresh_btn.click(fn=load_pending, outputs=pending_display)
+
+        # Manual retry button
+        retry_input = gr.Textbox(
+            label="Dataset ID to retry",
+            placeholder="davanstrien/dataset-name",
+        )
+        retry_btn = gr.Button("Retry Now")
+        retry_output = gr.JSON(label="Result")
+
+        async def manual_retry(dataset_id: str):
+            if not dataset_id:
+                return {"status": "error", "reason": "no dataset ID provided"}
+
+            inference_token = os.getenv("HF_TOKEN")
+            pr_token = os.getenv("LIBRARIAN_BOT_TOKEN")
+
+            if not inference_token or not pr_token:
+                return {"status": "error", "reason": "tokens not configured"}
+
+            result = await process_dataset(dataset_id, inference_token, pr_token)
+
+            if result.get("status") == "pr_created":
+                processed = load_processed()
+                processed[dataset_id] = {
+                    "pr_url": result.get("pr_url"),
+                    "timestamp": datetime.now().isoformat(),
+                    "status": "pr_created",
+                    "trigger": "manual_retry",
+                }
+                save_processed(processed)
+
+            return result
+
+        retry_btn.click(
+            fn=manual_retry,
+            inputs=retry_input,
+            outputs=retry_output,
+        )
+
     with gr.Tab("Manual Test"):
         gr.Markdown(
             "Test description generation without opening a PR.\n\n"
@@ -266,6 +414,14 @@ async def handle_dataset_webhook(payload: WebhookPayload) -> dict:
     # Process the dataset
     result = await process_dataset(dataset_id, inference_token, pr_token)
 
+    # Handle pending status - queue for retry
+    if result.get("status") == "pending":
+        add_to_pending(dataset_id, result.get("reason", "viewer not ready"))
+        # Spawn background retry task (non-blocking)
+        asyncio.create_task(retry_pending_dataset(dataset_id))
+        logger.info(f"Queued {dataset_id} for background retry")
+        return result
+
     # Save to processed log
     processed = load_processed()
     processed[dataset_id] = {
```
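
One caveat on the fire-and-forget `asyncio.create_task(...)` call in the webhook handler: the event loop holds only a weak reference to tasks, so a task with no other live reference can be garbage-collected before it finishes. A minimal hardening sketch, using the reference-holding idiom from the asyncio docs (the `spawn_retry` helper is hypothetical, not part of this commit):

```python
import asyncio

# Strong references keep the background retry tasks alive until they finish.
_background_tasks: set[asyncio.Task] = set()


def spawn_retry(dataset_id: str) -> None:
    """Spawn retry_pending_dataset() and hold a reference until it completes."""
    task = asyncio.create_task(retry_pending_dataset(dataset_id))
    _background_tasks.add(task)
    # Drop the reference once the task is done.
    task.add_done_callback(_background_tasks.discard)
```

The webhook handler would then call `spawn_retry(dataset_id)` instead of creating the task inline.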
description_generator.py CHANGED
```diff
@@ -9,8 +9,18 @@ from huggingface_hub import InferenceClient
 DEFAULT_MODEL = "zai-org/GLM-4.6V:zai-org"
 
 
+class ViewerNotReadyError(Exception):
+    """Raised when the Datasets Viewer hasn't processed a dataset yet."""
+
+    pass
+
+
 def gather_dataset_info(dataset: str, hf_token: str | None = None) -> dict:
-    """Gather all dataset information upfront from Datasets Viewer API."""
+    """Gather all dataset information upfront from Datasets Viewer API.
+
+    Raises:
+        ViewerNotReadyError: If the dataset preview is not available yet.
+    """
     client = DatasetsServerClient(token=hf_token)
 
     info = {"dataset": dataset}
@@ -25,6 +35,15 @@ def gather_dataset_info(dataset: str, hf_token: str | None = None) -> dict:
             "filter": validity.filter,
             "statistics": validity.statistics,
         }
+
+        # Check if preview is ready - we need it to get sample rows
+        if not validity.preview:
+            raise ViewerNotReadyError(
+                f"Dataset viewer not ready for '{dataset}'. "
+                "The dataset may be new or still processing."
+            )
+    except ViewerNotReadyError:
+        raise  # Re-raise our custom exception
     except Exception as e:
         info["validity_error"] = str(e)
         return info  # Can't continue without validity
```
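
For reference, a minimal caller sketch showing how the new exception is meant to be consumed (mirroring the `process_dataset` change in app.py; the dataset ID and `hf_token` variable are placeholders):

```python
from description_generator import ViewerNotReadyError, generate_description

try:
    description = generate_description("davanstrien/example-dataset", hf_token)
except ViewerNotReadyError as e:
    # Viewer hasn't processed the dataset yet - retryable, not fatal
    result = {"status": "pending", "reason": str(e)}
except Exception as e:
    result = {"status": "error", "reason": f"generation failed: {e}"}
```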