Merged
Changes from 4 commits
9 changes: 9 additions & 0 deletions backend/src/osspckgs/migrations/V1781800000__pypi_worker.sql
@@ -0,0 +1,9 @@

CREATE TABLE pypi_package_state (
  purl text PRIMARY KEY,
  metadata_first_scanned_at timestamptz NOT NULL DEFAULT now(),
  metadata_last_run_at timestamptz,
  metadata_run_result jsonb -- { status, attempts, httpStatus?, errorKind?, message? }
);

CREATE INDEX ON pypi_package_state (metadata_last_run_at);
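
For readers writing code against this table, the comment on `metadata_run_result` can be read as a TypeScript shape. This is a minimal sketch, not the worker's actual type: the field names come from the migration comment, but the field types, the `PypiMetadataRunResult` name and the `isRunResult` guard are assumptions made for illustration.

```typescript
// Hypothetical shape for pypi_package_state.metadata_run_result.
// Field names come from the migration comment; the types are assumptions.
interface PypiMetadataRunResult {
  status: string
  attempts: number
  httpStatus?: number
  errorKind?: string
  message?: string
}

// Narrow an unknown jsonb value to the assumed shape before using it.
function isRunResult(value: unknown): value is PypiMetadataRunResult {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  const optional = (key: string, type: string): boolean =>
    v[key] === undefined || typeof v[key] === type
  return (
    typeof v.status === 'string' &&
    typeof v.attempts === 'number' &&
    optional('httpStatus', 'number') &&
    optional('errorKind', 'string') &&
    optional('message', 'string')
  )
}
```

A guard like this keeps malformed jsonb from leaking into retry or reporting logic that reads the column.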
63 changes: 63 additions & 0 deletions docs/adr/0005-pypi-downloads-bigquery-merge-scoping.md
@@ -0,0 +1,63 @@
# ADR-0005: PyPI downloads via BigQuery bulk export, scoped in the Postgres merge

**Date**: 2026-07-01
**Status**: accepted
**Deciders**: Anil B

_Consolidated ADR for the PyPI downloads worker — record further PyPI-worker download decisions here rather than opening new ADRs._

## Context

We need PyPI download counts to match the npm shape: daily counts for the **Critical slice**
(`downloads_daily`) and rolling 30-day **Window** counts for all tracked pypi packages
(`downloads_last_30d`, mirrored to `packages.downloads_last_30d`). Unlike npm, **PyPI exposes no
per-package downloads HTTP API** — the only source is the public BigQuery dataset
`bigquery-public-data.pypi.file_downloads` (raw per-download events, timestamp-partitioned). The
worker already has proven deps.dev BigQuery→GCS→staging→merge plumbing and a job monitor keyed on
`osspckgs_ingest_jobs`. Cost is driven by bytes scanned, and a single day of the three columns we
read (`file.project`, `timestamp`, `details.installer.name`) measures ~107 GB (weekend) / ~147 GB
(monthly average), so a 30-day window is ~4.56 TB.

## Decision

Ingest PyPI downloads as two new `bq-dataset-ingest` job kinds (`pypi_downloads_30d`,
`pypi_downloads_daily`) that run one BigQuery aggregate over a date range, export **all** projects to
GCS, load to staging, and **scope to the Critical slice in the Postgres merge** (`JOIN packages …
AND is_critical` for daily) — we never push our package list into BigQuery. The 30d workflow does a
**Latest-window refresh** for all pypi (mirroring the latest **Window**); the daily workflow does a
2-day **Trailing re-scan** for the critical subset. Both are idempotent (`ON CONFLICT DO UPDATE`),
fixed-window, and gap-recovered by manual **Backfill** — they are deliberately **not** self-healing.
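
The merge-time scoping and idempotent upsert described above can be sketched in memory. This is an illustration under stated assumptions, not the worker's SQL: `StagedRow`, `PackageRow`, `DailyTable` and `mergeDaily` are hypothetical names, and the `Map` key stands in for whatever unique constraint the real `ON CONFLICT DO UPDATE` targets.

```typescript
// In-memory sketch of merge-time scoping; every name here is illustrative.
interface StagedRow {
  project: string
  date: string
  downloads: number
}

interface PackageRow {
  name: string
  isCritical: boolean
}

// Keyed by `${project}|${date}`, standing in for the unique constraint
// that an ON CONFLICT DO UPDATE clause would target.
type DailyTable = Map<string, number>

function mergeDaily(
  staged: StagedRow[],
  packages: PackageRow[],
  target: DailyTable,
): DailyTable {
  // Equivalent of JOIN packages ... AND is_critical: only critical packages survive.
  const critical = new Set(packages.filter((p) => p.isCritical).map((p) => p.name))
  for (const row of staged) {
    if (!critical.has(row.project)) continue
    // Upsert: a re-run overwrites the same key instead of adding a second row.
    target.set(`${row.project}|${row.date}`, row.downloads)
  }
  return target
}
```

Running `mergeDaily` twice over the same staged rows leaves the table the same size, which is the property that makes re-runs and overlapping backfills safe. The staged input can contain every project, because filtering happens only at the merge.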

## Alternatives Considered

### Alternative 1: npm-style per-package HTTP fetch with watermark due-selection
- **Pros**: reuses the npm downloads model exactly; source is scoped to what's due; naturally self-healing.
- **Cons**: requires a per-package downloads API.
- **Why not**: PyPI has no such API. The BigQuery public dataset is the only source, which forces a bulk-aggregate model.

### Alternative 2: Push the critical package list into BigQuery (inline `IN UNNEST([...])`) to shrink the export
- **Pros**: smaller GCS export and staging load, especially for daily backfills.
- **Cons**: inlines our data into the query text.
- **Why not**: the critical set can grow to tens of thousands of packages or more, at which point the inline list would exceed BigQuery's ~1 MB query-text limit (and Temporal's ~2 MB payload limit for the name list). Merge-scoping has no such size limit and matches how every deps.dev job scopes to our data in Postgres, not at the source. A cheap `getCriticalPypiCount` guard skips the scan when there are zero critical packages.
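
To make the query-text argument concrete, the sketch below estimates how long an inline `IN UNNEST([...])` filter becomes as the list grows. It is a rough illustration only: the 1,000,000-character limit is an approximation of the "~1 MB" figure quoted above, the names are synthetic 20-character ASCII strings, and `inlineFilter`, `exceedsQueryLimit` and `syntheticNames` are hypothetical helpers.

```typescript
// Approximation of the "~1 MB" query-text limit quoted in the ADR.
const QUERY_TEXT_LIMIT = 1_000_000

// Build the inline filter that the rejected alternative would have needed.
function inlineFilter(names: string[]): string {
  const quoted = names.map((n) => `'${n}'`).join(',')
  return `file.project IN UNNEST([${quoted}])`
}

// True when the filter alone already exceeds the query-text budget.
// Uses string length, which equals byte length for ASCII names.
function exceedsQueryLimit(names: string[], limit: number = QUERY_TEXT_LIMIT): boolean {
  return inlineFilter(names).length > limit
}

// Synthetic 20-character names standing in for real project names.
function syntheticNames(count: number): string[] {
  return Array.from({ length: count }, (_, i) => {
    const digits = String(i)
    return 'p'.repeat(20 - digits.length) + digits
  })
}
```

With these synthetic names, each entry costs 23 characters (20 for the name, two quotes and a comma), so 10,000 names stay well under the limit while 50,000 names come to roughly 1.15 million characters and exceed it.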

### Alternative 3: Gap-filling self-healing (npm's `computeMissingLast30dWindows` model)
- **Pros**: auto-recovers missed days/months without manual intervention.
- **Cons**: needs per-package due-selection / existing-window diffing, extra state and complexity, and re-scans BigQuery anyway.
- **Why not**: for a bulk-BQ source the simpler fixed-window + idempotent-upsert + manual **Backfill** model is sufficient; deps.dev jobs re-scan on re-run too. The daily 2-day **Trailing re-scan** already corrects a partial most-recent partition.

## Consequences

### Positive
- Reuses the deps.dev BQ→GCS→staging→merge plumbing and the `monitor:osspckgs` cost/row dashboard for free.
- Scoping in the merge scales to any critical-set size; our package identifiers never leave Postgres.
- Idempotent upserts make re-runs and overlapping backfills safe (no duplicate rows).

### Negative
- Re-running a date range re-scans BigQuery and re-bills — there is no "already imported" skip.
- The daily 2-day window re-scans each calendar day ~2×; steady-state cost is ≈ $610/yr (daily) + $311/yr (30d) ≈ **$920/yr** at $6.25/TiB (from measured scan sizes).
- Not self-healing: an outage or missed schedule is recovered only by a manual **Backfill**.
- Daily export carries all ~800k projects even though the merge keeps only the critical subset (larger data movement than a source-filtered approach).
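
The steady-state cost figures in the list above can be reproduced with a few lines of arithmetic. This sketch rests on assumptions the ADR does not spell out: GB and TB are decimal units, the daily job scans two days at the ~147 GB/day average on each of 365 runs, and the 30d job scans one ~4.56 TB window per month. The function and constant names are hypothetical.

```typescript
const BYTES_PER_GB = 1e9 // assumption: the ADR's GB/TB figures are decimal
const BYTES_PER_TIB = Math.pow(2, 40)
const PRICE_PER_TIB_USD = 6.25 // price quoted in the ADR

// Yearly cost of a job that scans `gbPerRun` on each of `runsPerYear` runs.
function annualCostUsd(gbPerRun: number, runsPerYear: number): number {
  const tibPerYear = (gbPerRun * runsPerYear * BYTES_PER_GB) / BYTES_PER_TIB
  return tibPerYear * PRICE_PER_TIB_USD
}

// Daily: a 2-day window at the ~147 GB/day average, run 365 times a year.
const dailyUsd = annualCostUsd(2 * 147, 365)
// 30d: one ~4.56 TB window scan per month (assumed cadence).
const window30dUsd = annualCostUsd(4560, 12)
const totalUsd = dailyUsd + window30dUsd
```

Under these assumptions `dailyUsd` and `window30dUsd` round to 610 and 311, matching the ADR, and the total lands at about $921 a year. Any change in traffic or in the trailing-window length scales the result linearly.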

### Risks
- **BigQuery cost / runaway scans** — mitigated by per-kind `BQ_DATASET_INGEST_PYPI_DOWNLOADS_*_MAX_BQ_GB` ceilings enforced via a pre-run dry-run (aborts before billing); defaults set from measured sizes (30d = 6000 GB, daily = 2000 GB).
- **Traffic growth** — the ~4.56 TB/30d figure grows with PyPI traffic; ceilings may need raising over time.
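
The ceiling check in the first risk can be sketched as a pure function that takes the dry-run byte estimate as input. This is not the logic in `bqExportToGcs.ts`: `assertUnderCeiling` is a hypothetical name, the decimal GB conversion is an assumption, and obtaining the estimate from BigQuery is left out so the sketch runs standalone.

```typescript
// Hypothetical guard: compare a dry-run byte estimate with a per-kind ceiling.
function assertUnderCeiling(estimatedBytes: number, maxGb: number): void {
  const ceilingBytes = maxGb * 1e9 // assumption: ceilings are decimal GB
  if (estimatedBytes > ceilingBytes) {
    throw new Error(
      `dry-run estimate of ${estimatedBytes} bytes exceeds the ${maxGb} GB ceiling; aborting before billing`,
    )
  }
}

// Defaults quoted in the ADR.
const MAX_GB_30D = 6000
const MAX_GB_DAILY = 2000
```

Calling `assertUnderCeiling(4.56e12, MAX_GB_30D)` passes, while an estimate of 7e12 bytes against the same ceiling throws before any billed query would start.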
1 change: 1 addition & 0 deletions docs/adr/README.md
@@ -11,6 +11,7 @@ Use the `/adr` skill in Claude Code to record new ADRs or query past decisions.
| [ADR-0001](./0001-oss-packages-design-decisions.md) | OSS packages — design decisions (living) | living | 2026-05-27 |
| [ADR-0003](./0003-deps-bq-table-selection.md) | Use DependencyGraphEdgesLatest for deps ingestion; defer DependenciesLatest until NUGET or GO needed | accepted | 2026-05-29 |
| [ADR-0004](./0004-go-nuget-transitive-dependent-counts.md) | Compute GO/NUGET transitive dependent counts via exact reverse closure (over HLL approximation) | accepted | 2026-06-23 |
| [ADR-0005](./0005-pypi-downloads-bigquery-merge-scoping.md) | PyPI downloads via BigQuery bulk export, scoped in the Postgres merge | accepted | 2026-07-01 |

## Why ADRs?

67 changes: 67 additions & 0 deletions scripts/services/pypi-worker.yaml
@@ -0,0 +1,67 @@
version: '3.1'

x-env-args: &env-args
  DOCKER_BUILDKIT: 1
  NODE_ENV: docker
  SERVICE: pypi-worker
  CROWD_TEMPORAL_TASKQUEUE: pypi-worker
  CROWD_TEMPORAL_NAMESPACE: ${CROWD_PACKAGES_TEMPORAL_NAMESPACE}
  SHELL: /bin/sh
  SUPPRESS_NO_CONFIG_WARNING: 'true'

services:
  pypi-worker:
    build:
      context: ../../
      dockerfile: ./scripts/services/docker/Dockerfile.packages
    command: 'pnpm run start:pypi-worker'
    working_dir: /usr/crowd/app/services/apps/packages_worker
    env_file:
      - ../../backend/.env.dist.local
      - ../../backend/.env.dist.composed
      - ../../backend/.env.override.local
      - ../../backend/.env.override.composed
    environment:
      <<: *env-args
    restart: always
    networks:
      - crowd-bridge

  pypi-worker-dev:
    build:
      context: ../../
      dockerfile: ./scripts/services/docker/Dockerfile.packages
    command: 'pnpm run dev:pypi-worker'
    working_dir: /usr/crowd/app/services/apps/packages_worker
    # user: '${USER_ID}:${GROUP_ID}'
    env_file:
      - ../../backend/.env.dist.local
      - ../../backend/.env.dist.composed
      - ../../backend/.env.override.local
      - ../../backend/.env.override.composed
    environment:
      <<: *env-args
    hostname: pypi-worker
    networks:
      - crowd-bridge
    volumes:
      - ../../services/libs/audit-logs/src:/usr/crowd/app/services/libs/audit-logs/src
      - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src
      - ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src
      - ../../services/libs/data-access-layer/src:/usr/crowd/app/services/libs/data-access-layer/src
      - ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src
      - ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src
      - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src
      - ../../services/libs/nango/src:/usr/crowd/app/services/libs/nango/src
      - ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src
      - ../../services/libs/queue/src:/usr/crowd/app/services/libs/queue/src
      - ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src
      - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src
      - ../../services/libs/telemetry/src:/usr/crowd/app/services/libs/telemetry/src
      - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src
      - ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src
      - ../../services/apps/packages_worker/src:/usr/crowd/app/services/apps/packages_worker/src

networks:
  crowd-bridge:
    external: true
16 changes: 13 additions & 3 deletions services/apps/packages_worker/CONTEXT.md
@@ -1,6 +1,6 @@
# OSS Packages

-Tracks open-source packages across ecosystems (npm, Maven). All packages live in `packages`; criticality scoring ranks them in place and marks the top-N per ecosystem as `is_critical = true`.
+Tracks open-source packages across ecosystems (npm, pypi, maven, go, nuget, cargo — the set grows as ecosystems are onboarded). All packages live in `packages`; criticality scoring ranks them in place and marks the top-N per ecosystem as `is_critical = true`.

## Language

@@ -25,7 +25,7 @@ Package URL (`pkg:npm/react`, `pkg:maven/org.apache/commons`). The canonical cro
_Avoid_: package id (that's the `packages.id` bigserial)

**Ecosystem**:
-A package registry namespace — `npm`, `maven`. Lowercase.
+A package registry namespace — `npm`, `pypi`, `maven`, `go`, `nuget`, `cargo`. Lowercase. Open set — new ecosystems are onboarded over time.
_Avoid_: system (deps.dev's term), registry

**Packument**:
@@ -38,9 +38,19 @@ One rolling 30-day span in `downloads_last_30d`, identified by its `end_date` (a
_Avoid_: month, period, snapshot

**Self-healing**:
-A workflow that recomputes the full set of expected rows on every run, diffs against what's in the DB, and fills only the gaps. No assumption of continuity between runs.
+A workflow that recomputes the full set of expected rows on every run, diffs against what's in the DB, and fills only the gaps. No assumption of continuity between runs. **npm downloads only** — pypi downloads deliberately do NOT self-heal (see **Trailing re-scan** / **Latest-window refresh**).
_Avoid_: backfill (that's the one-time historical fill; self-healing is the ongoing property)
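
The gap-filling step at the heart of **Self-healing** reduces to a set difference between expected and existing keys. The sketch below shows only that step, under the assumption that rows can be identified by a string key such as a date; `missingKeys` is a hypothetical helper, not the npm worker's implementation.

```typescript
// Return the expected keys that are absent from the database, in input order.
function missingKeys(expected: string[], existing: string[]): string[] {
  const have = new Set(existing)
  return expected.filter((key) => !have.has(key))
}
```

For example, with three expected dates and only the middle one present, the function returns the first and last dates, and only those would be fetched. The pypi workflows skip this diff entirely and always upsert their fixed window.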

**Trailing re-scan** (pypi daily):
The pypi daily downloads workflow re-scans a fixed 2-day trailing window (`[today−2, today−1]`) every run and upserts. It corrects a partial most-recent partition but does **not** diff against the DB or fill older gaps. Missed days are recovered only by **Backfill**.
_Avoid_: self-healing (that's the npm gap-filling property)
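
The trailing window `[today−2, today−1]` can be computed as below. This is a minimal sketch that assumes calendar days are taken in UTC, which the glossary does not state; `toIsoDate` and `trailingWindow` are hypothetical names, while `startDate` and `endDate` follow the field names used for the daily **Backfill** input.

```typescript
// Format a Date as YYYY-MM-DD in UTC.
function toIsoDate(d: Date): string {
  return d.toISOString().slice(0, 10)
}

// Compute [today-2, today-1] relative to `now`, using UTC calendar days.
function trailingWindow(now: Date): { startDate: string; endDate: string } {
  const dayMs = 24 * 60 * 60 * 1000
  const todayUtc = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate())
  return {
    startDate: toIsoDate(new Date(todayUtc - 2 * dayMs)),
    endDate: toIsoDate(new Date(todayUtc - dayMs)),
  }
}
```

A run at any time on 2026-07-01 UTC yields `2026-06-29` to `2026-06-30`. Because consecutive runs overlap by one day, each calendar day is scanned twice, which is what corrects a partial most-recent partition.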

**Latest-window refresh** (pypi 30d):
The pypi 30d workflow, given no `fromDate`, computes and ingests only the latest **Window** (idempotent upsert, mirrored to `packages.downloads_last_30d`). It does not gap-fill missed months; those are recovered by **Backfill**.

**Backfill**:
A manual, one-time run over an explicit date range to fill history or recover gaps — pypi daily takes `{startDate, endDate}`, pypi 30d takes `{fromDate}` (enumerates every monthly **Window** from then to the latest). For pypi downloads this is the *only* gap-recovery mechanism; scheduled runs are fixed-window, not self-healing.

## Relationships

- All packages live in `packages`; `rank_packages()` sets `is_critical = true` on the top-N per ecosystem to define the **Critical slice**.
3 changes: 3 additions & 0 deletions services/apps/packages_worker/package.json
@@ -28,6 +28,9 @@
"start:npm-worker": "CROWD_TEMPORAL_TASKQUEUE=npm-worker SERVICE=npm-worker tsx src/bin/npm-worker.ts",
"dev:npm-worker": "CROWD_TEMPORAL_TASKQUEUE=npm-worker SERVICE=npm-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/npm-worker.ts",
"dev:npm-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=npm-worker SERVICE=npm-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/npm-worker.ts",
"start:pypi-worker": "CROWD_TEMPORAL_TASKQUEUE=pypi-worker SERVICE=pypi-worker tsx src/bin/pypi-worker.ts",
"dev:pypi-worker": "CROWD_TEMPORAL_TASKQUEUE=pypi-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=pypi-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9242 src/bin/pypi-worker.ts",
"dev:pypi-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=pypi-worker SERVICE=pypi-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9242 src/bin/pypi-worker.ts",
"start:osv-worker": "CROWD_TEMPORAL_TASKQUEUE=osv-worker SERVICE=osv-worker tsx src/bin/osv-worker.ts",
"dev:osv-worker": "CROWD_TEMPORAL_TASKQUEUE=osv-worker SERVICE=osv-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9238 src/bin/osv-worker.ts",
"dev:osv-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=osv-worker SERVICE=osv-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9238 src/bin/osv-worker.ts",
5 changes: 5 additions & 0 deletions services/apps/packages_worker/src/activities.ts
@@ -25,4 +25,9 @@ export {
  cargoCleanup,
} from './cargo/activities'
export { enrichGoVersionsBatch, enrichGoStatusBatch } from './go/activities'
export {
  getUnscannedPypiBatch,
  ingestPypiPackageBatch,
  pypiStopAfterFirstPage,
} from './pypi/activities'
export { processNuGetBatch } from './nuget/activities'
6 changes: 6 additions & 0 deletions services/apps/packages_worker/src/bin/bq-dataset-ingest.ts
@@ -1,8 +1,14 @@
import { scheduleOsspckgsBootstrap } from '../deps-dev/schedules/bootstrap'
import {
  schedulePypiDownloads30d,
  schedulePypiDownloadsDaily,
} from '../deps-dev/schedules/pypiDownloads'
import { svc } from '../service'

setImmediate(async () => {
  await svc.init()
  await scheduleOsspckgsBootstrap()
  await schedulePypiDownloads30d()
  await schedulePypiDownloadsDaily()
  await svc.start()
})
8 changes: 8 additions & 0 deletions services/apps/packages_worker/src/bin/pypi-worker.ts
@@ -0,0 +1,8 @@
import { schedulePypiIngest } from '../pypi/schedule'
import { svc } from '../service'

setImmediate(async () => {
  await svc.init()
  await schedulePypiIngest()
  await svc.start()
})
2 changes: 2 additions & 0 deletions services/apps/packages_worker/src/deps-dev/README.md
@@ -63,6 +63,8 @@ The mode-specific key takes precedence over the generic key. Value must be a pos
| `BQ_DATASET_INGEST_DEPENDENT_COUNTS_NUGET_MAX_BQ_GB` | 200 | `dependent_counts_nuget` | NUGET exact reverse transitive closure over `NuGetRequirementsLatest` (script mode). All 3 count columns. `maximumBytesBilled` runaway cap above the measured ~32 GB. |
| `BQ_DATASET_INGEST_SCORECARD_REPOS_MAX_BQ_GB` | 50 | `scorecard_repos` | |
| `BQ_DATASET_INGEST_SCORECARD_CHECKS_MAX_BQ_GB` | 500 | `scorecard_checks` | |
| `BQ_DATASET_INGEST_PYPI_DOWNLOADS_30D_MAX_BQ_GB` | 6000 | `pypi_downloads_30d` | Per 30-day window scan (~4.56 TB measured; set in `ingestPypiDownloads.ts`). |
| `BQ_DATASET_INGEST_PYPI_DOWNLOADS_DAILY_MAX_BQ_GB` | 2000 | `pypi_downloads_daily` | Daily 2-day trailing window (~300 GB). Scan size scales with the backfill range, so raise the ceiling for long backfills. |

The override logic lives in `src/deps-dev/activities/bqExportToGcs.ts`.

@@ -0,0 +1,12 @@
import { getCriticalPypiPackageCount } from '@crowd/data-access-layer'

import { getPackagesDb } from '../../db'

// Count of critical PyPI packages, so the daily downloads workflow can skip its BigQuery scan
// when there are none (the merge is scoped to is_critical, mirroring how deps.dev scopes to our
// packages in the Postgres merge rather than pushing our package list into BigQuery).
export async function getCriticalPypiCount(): Promise<{ count: number }> {
  const qx = await getPackagesDb()
  const count = await getCriticalPypiPackageCount(qx)
  return { count }
}
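
The comment above says the daily workflow uses this count to skip its BigQuery scan. A sketch of how such a guard might look follows. It is not the PR's workflow code: `runDailyIfCritical`, `CountFn` and `ScanFn` are hypothetical, and both dependencies are injected so the example runs without a database or BigQuery.

```typescript
// Hypothetical workflow step; both callbacks are injected so the sketch runs standalone.
type CountFn = () => Promise<{ count: number }>
type ScanFn = () => Promise<void>

async function runDailyIfCritical(
  getCount: CountFn,
  runScan: ScanFn,
): Promise<'skipped' | 'ran'> {
  const { count } = await getCount()
  // With no critical packages the merge would keep nothing, so skip the billed scan.
  if (count === 0) return 'skipped'
  await runScan()
  return 'ran'
}
```

Passing `getCriticalPypiCount` as `getCount` would wire the guard to the activity defined above. The check costs one cheap Postgres query and avoids a scan whose output the merge would discard.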
@@ -1,4 +1,5 @@
export * from './bqExportToGcs'
export * from './getCriticalPypiCount'
export * from './setJobStep'
export * from './createVersionsLookup'
export * from './managePackageDepsConstraints'