feat: pypi worker with downloads#4291
Conversation
Signed-off-by: anilb <epipav@gmail.com>
Signed-off-by: anilb <epipav@gmail.com>
Signed-off-by: anilb <epipav@gmail.com>
|
|
|
Your PR title doesn't contain a Jira issue key. Consider adding it for better traceability. Example:
Projects:
Please add a Jira issue key to your PR title. |
There was a problem hiding this comment.
Pull request overview
Adds a new PyPI ingestion lane to packages_worker, including registry metadata enrichment and BigQuery-based download count ingestion, backed by new DAL helpers and a pypi_package_state table for scan watermarking.
Changes:
- Introduces
pypi-workerTemporal workflow/activity set to fetch PyPI JSON API metadata and upsert packages/versions/maintainers/funding links with scan state tracking. - Adds PyPI downloads ingestion workflows (monthly last-30d for all PyPI packages + daily trailing window for critical PyPI) using the existing BigQuery→GCS→staging→merge pipeline and new job kinds.
- Factors shared utilities (proxy parsing, client-error classification, NUL byte stripping) and updates monitoring/docs/ADR/migration to support the new flows.
Reviewed changes
Copilot reviewed 44 out of 46 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| services/libs/data-access-layer/src/packages/versions.ts | Adds upsertPypiVersions DAL upsert for PyPI version rows and is_latest cleanup. |
| services/libs/data-access-layer/src/packages/pypiPackageState.ts | Adds DAL helpers for marking PyPI packages scanned and selecting stale/unscanned PyPI purls. |
| services/libs/data-access-layer/src/packages/packages.ts | Adds upsertPypiPackage DAL upsert and a critical PyPI count helper. |
| services/libs/data-access-layer/src/packages/maintainers.ts | Generalizes maintainer upsert to support multiple ecosystems via a parameter. |
| services/libs/data-access-layer/src/packages/index.ts | Re-exports new PyPI package state DAL module. |
| services/libs/data-access-layer/src/osspckgs/ingestJobs.ts | Registers new osspckgs ingest job kinds for PyPI downloads. |
| services/apps/packages_worker/src/workflows/index.ts | Exposes new PyPI ingest and PyPI downloads workflows from the worker entrypoint exports. |
| services/apps/packages_worker/src/utils/stripNullBytesDeep.ts | Adds shared deep NUL-byte stripping utility for Postgres-safe string persistence. |
| services/apps/packages_worker/src/utils/isClientError.ts | Adds shared helper to classify skippable 4xx outcomes vs retryable failures. |
| services/apps/packages_worker/src/utils/tests/stripNullBytesDeep.test.ts | Adds unit tests for the new NUL-byte stripping utility. |
| services/apps/packages_worker/src/scripts/monitorOsspckgs.ts | Extends monitoring mappings to include PyPI download tables and abbreviations. |
| services/apps/packages_worker/src/pypi/workflows.ts | Adds PyPI metadata ingest workflow with batching + continue-as-new behavior. |
| services/apps/packages_worker/src/pypi/upsertProject.ts | Implements PyPI project normalization and DAL upsert wiring (package, versions, repo link, maintainers, funding). |
| services/apps/packages_worker/src/pypi/types.ts | Adds types and guards for PyPI JSON API and fetch error classification. |
| services/apps/packages_worker/src/pypi/schedule.ts | Registers Temporal schedule for the PyPI registry ingest workflow. |
| services/apps/packages_worker/src/pypi/retryPolicy.ts | Centralizes max-attempts constant shared by workflow retry + activity give-up logic. |
| services/apps/packages_worker/src/pypi/proxies.ts | Adds PyPI-specific enable flag + proxy pool selection based on shared proxy parsing. |
| services/apps/packages_worker/src/pypi/normalize.ts | Adds PyPI normalization helpers: purl decoding, license parsing, prerelease detection, maintainers parsing, URL classification, version row derivation. |
| services/apps/packages_worker/src/pypi/fetchProject.ts | Adds PyPI JSON API fetcher with timeout/abort and HTTP→error-kind mapping. |
| services/apps/packages_worker/src/pypi/activities.ts | Adds PyPI activities for selecting unscanned batches, ingesting packages with retry/give-up behavior, optional proxy rotation, and audit/state writes. |
| services/apps/packages_worker/src/pypi/tests/normalize.test.ts | Adds unit tests for PyPI normalization logic (licenses, prereleases, URL classification, maintainers, version rows). |
| services/apps/packages_worker/src/pypi/tests/ingest.test.ts | Adds unit tests for ingest retry/give-up behavior and client-error classification. |
| services/apps/packages_worker/src/pypi/tests/fetchProject.test.ts | Adds unit tests for fetcher status/error mapping and timeout behavior. |
| services/apps/packages_worker/src/proxies.ts | Introduces shared proxy parsing utilities for reuse across workers. |
| services/apps/packages_worker/src/npm/upsertPackage.ts | Switches npm upsert path to use shared stripNullBytesDeep utility. |
| services/apps/packages_worker/src/npm/proxies.ts | Refactors npm proxy logic to reuse shared proxy parsing primitives. |
| services/apps/packages_worker/src/npm/normalize.ts | Removes now-shared stripNullBytesDeep implementation from npm normalize module. |
| services/apps/packages_worker/src/npm/activities.ts | Refactors npm activities to use shared proxyUrl and shared isClientError. |
| services/apps/packages_worker/src/deps-dev/workflows/ingestPypiDownloads.ts | Adds PyPI downloads ingest workflows using BQ export + staging + merge, with per-kind scan ceilings and critical-count guard. |
| services/apps/packages_worker/src/deps-dev/workflows/index.ts | Exports the new PyPI downloads workflows. |
| services/apps/packages_worker/src/deps-dev/schedules/pypiDownloads.ts | Registers Temporal schedules for monthly 30d and daily PyPI downloads ingestion. |
| services/apps/packages_worker/src/deps-dev/README.md | Documents new PyPI downloads BigQuery ceiling environment variables. |
| services/apps/packages_worker/src/deps-dev/queries/pypiDownloadsSql.ts | Adds BigQuery aggregate SQL builders and Postgres merge SQL builders for PyPI downloads. |
| services/apps/packages_worker/src/deps-dev/queries/pypiDownloadsDates.ts | Adds deterministic date/window math helpers for PyPI downloads workflows. |
| services/apps/packages_worker/src/deps-dev/queries/tests/pypiDownloads.test.ts | Adds unit tests for PyPI downloads date math and SQL builder outputs. |
| services/apps/packages_worker/src/deps-dev/activities/index.ts | Exports new critical PyPI count activity. |
| services/apps/packages_worker/src/deps-dev/activities/getCriticalPypiCount.ts | Adds activity to count critical PyPI packages to skip BQ scans when none exist. |
| services/apps/packages_worker/src/bin/pypi-worker.ts | Adds a new worker entrypoint to register PyPI ingest schedule and start the service. |
| services/apps/packages_worker/src/bin/bq-dataset-ingest.ts | Registers PyPI downloads schedules in the BigQuery dataset ingest service. |
| services/apps/packages_worker/src/activities.ts | Exports PyPI activities from the global activities barrel. |
| services/apps/packages_worker/package.json | Adds start/dev scripts for the new pypi-worker lane. |
| services/apps/packages_worker/CONTEXT.md | Updates OSS packages context docs to include PyPI ecosystem and explains PyPI downloads windowing semantics. |
| scripts/services/pypi-worker.yaml | Adds docker-compose service definitions for pypi-worker (prod + dev). |
| docs/adr/README.md | Registers the new ADR-0005 in the ADR index table. |
| docs/adr/0005-pypi-downloads-bigquery-merge-scoping.md | Adds ADR documenting the chosen PyPI downloads ingestion approach and tradeoffs. |
| backend/src/osspckgs/migrations/V1781800000__pypi_worker.sql | Adds pypi_package_state table + index to persist scan outcomes and staleness tracking. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: anilb <epipav@gmail.com>
Signed-off-by: anilb <epipav@gmail.com>
| "dev:pypi-worker": "CROWD_TEMPORAL_TASKQUEUE=pypi-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=pypi-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9242 src/bin/pypi-worker.ts", | ||
| "dev:pypi-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=pypi-worker SERVICE=pypi-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9242 src/bin/pypi-worker.ts", |
Signed-off-by: anilb <epipav@gmail.com>
Signed-off-by: anilb <epipav@gmail.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 4142844. Configure here.
Signed-off-by: anilb <epipav@gmail.com>
Signed-off-by: anilb <epipav@gmail.com>
| return ` | ||
| SELECT | ||
| ${BQ_PROJECT_NORM} AS project, | ||
| DATE(timestamp) AS day, |
| DATE(timestamp) AS day, | ||
| COUNT(*) AS downloads | ||
| FROM ${BQ_TABLE} | ||
| WHERE DATE(timestamp) BETWEEN DATE('${startDate}') AND DATE('${endDate}') |
Signed-off-by: anilb <epipav@gmail.com>

Note
Medium Risk
Large new surface area (Temporal workflows, external PyPI API, and multi-TB BigQuery scans with ongoing cost); mitigations include BQ byte ceilings and idempotent merges, but missed schedules are not self-healing.
Overview
Adds a PyPI ecosystem to the packages worker: a new
pypi-workerTemporal service for registry metadata enrichment, plus PyPI download counts wired through the existingbq-dataset-ingestpipeline.Metadata (
pypi-worker) fetchespypi.orgJSON per package, normalizes licenses/versions/maintainers/repos, and upserts intopackages/versions/ maintainers. Scan progress lives in newpypi_package_state; due selection is staleness-based (no npm-style_changesfeed), defaulting tois_criticalpackages. Ingest mirrors npm’s retry/give-up behavior (sharedisClientError,stripNullBytesDeep, shared proxy helpers).Downloads (
bq-dataset-ingest) adds job kindspypi_downloads_30dandpypi_downloads_daily: BigQuery aggregates overbigquery-public-data.pypi.file_downloads, export-all-projects to GCS, then Postgres merge joins topackages(daily scoped tois_critical). Scheduled monthly 30d latest-window refresh (mirrors topackages.downloads_last_30d) and daily 2-day trailing re-scan; gaps need manual backfill (documented in ADR-0005).getCriticalPypiCountskips daily BQ when there are zero critical PyPI packages.Shared refactors:
upsertPackageMaintainersreplaces npm-only maintainer upsert (npm now passesecosystem); npmCOALESCEon release dates when upserting. Docker/build scripts,monitor:osspckgs, ingest job types, andCONTEXT.mdterminology updated for PyPI.Reviewed by Cursor Bugbot for commit fd4dbd7. Bugbot is set up for automated code reviews on this repo. Configure here.