Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions backend/src/osspckgs/migrations/V1781800000__pypi_worker.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

CREATE TABLE pypi_package_state (
purl text PRIMARY KEY,
metadata_first_scanned_at timestamptz NOT NULL DEFAULT now(),
metadata_last_run_at timestamptz,
metadata_run_result jsonb -- { status, attempts, httpStatus?, errorKind?, message? }
);

CREATE INDEX ON pypi_package_state (metadata_last_run_at);
63 changes: 63 additions & 0 deletions docs/adr/0005-pypi-downloads-bigquery-merge-scoping.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# ADR-0005: PyPI downloads via BigQuery bulk export, scoped in the Postgres merge

**Date**: 2026-07-01
**Status**: accepted
**Deciders**: Anil B

_Consolidated ADR for the PyPI downloads worker — record further PyPI-worker download decisions here rather than opening new ADRs._

## Context

We need PyPI download counts to match the npm shape: daily counts for the **Critical slice**
(`downloads_daily`) and rolling 30-day **Window** counts for all tracked pypi packages
(`downloads_last_30d`, mirrored to `packages.downloads_last_30d`). Unlike npm, **PyPI exposes no
per-package downloads HTTP API** — the only source is the public BigQuery dataset
`bigquery-public-data.pypi.file_downloads` (raw per-download events, timestamp-partitioned). The
worker already has proven deps.dev BigQuery→GCS→staging→merge plumbing and a job monitor keyed on
`osspckgs_ingest_jobs`. Cost is driven by bytes scanned, and a single day of the three columns we
read (`file.project`, `timestamp`, `details.installer.name`) measures ~107 GB (weekend) / ~147 GB
(monthly average), so a 30-day window is ~4.56 TB.

## Decision

Ingest PyPI downloads as two new `bq-dataset-ingest` job kinds (`pypi_downloads_30d`,
`pypi_downloads_daily`) that run one BigQuery aggregate over a date range, export **all** projects to
GCS, load to staging, and **scope to the Critical slice in the Postgres merge** (`JOIN packages …
AND is_critical` for daily) — we never push our package list into BigQuery. The 30d workflow does a
**Latest-window refresh** for all pypi (mirroring the latest **Window**); the daily workflow does a
2-day **Trailing re-scan** for the critical subset. Both are idempotent (`ON CONFLICT DO UPDATE`),
fixed-window, and gap-recovered by manual **Backfill** — they are deliberately **not** self-healing.

## Alternatives Considered

### Alternative 1: npm-style per-package HTTP fetch with watermark due-selection
- **Pros**: reuses the npm downloads model exactly; source is scoped to what's due; naturally self-healing.
- **Cons**: requires a per-package downloads API.
- **Why not**: PyPI has no such API. The BigQuery public dataset is the only source, which forces a bulk-aggregate model.

### Alternative 2: Push the critical package list into BigQuery (inline `IN UNNEST([...])`) to shrink the export
- **Pros**: smaller GCS export and staging load, especially for daily backfills.
- **Cons**: inlines our data into the query text.
- **Why not**: the critical set can grow to tens of thousands+; the inline list blows BigQuery's ~1 MB query-text limit (and Temporal's ~2 MB payload limit for the name list). Merge-scoping is unbounded and matches how every deps.dev job scopes to our data in Postgres, not at the source. A cheap `getCriticalPypiCount` guard skips the scan when there are zero critical packages.

### Alternative 3: Gap-filling self-healing (npm's `computeMissingLast30dWindows` model)
- **Pros**: auto-recovers missed days/months without manual intervention.
- **Cons**: needs per-package due-selection / existing-window diffing, extra state and complexity, and re-scans BigQuery anyway.
- **Why not**: for a bulk-BQ source the simpler fixed-window + idempotent-upsert + manual **Backfill** model is sufficient; deps.dev jobs re-scan on re-run too. The daily 2-day **Trailing re-scan** already corrects a partial most-recent partition.

## Consequences

### Positive
- Reuses the deps.dev BQ→GCS→staging→merge plumbing and the `monitor:osspckgs` cost/row dashboard for free.
- Scoping in the merge scales to any critical-set size; our package identifiers never leave Postgres.
- Idempotent upserts make re-runs and overlapping backfills safe (no duplicate rows).

### Negative
- Re-running a date range re-scans BigQuery and re-bills — there is no "already imported" skip.
- The daily 2-day window re-scans each calendar day ~2×; steady-state cost ≈ $610/yr daily + $311/yr 30d ≈ **~$920/yr** at $6.25/TiB (measured).
- Not self-healing: an outage or missed schedule is recovered only by a manual **Backfill**.
- Daily export carries all ~800k projects even though the merge keeps only the critical subset (larger data movement than a source-filtered approach).

### Risks
- **BigQuery cost / runaway scans** — mitigated by per-kind `BQ_DATASET_INGEST_PYPI_DOWNLOADS_*_MAX_BQ_GB` ceilings enforced via a pre-run dry-run (aborts before billing); defaults set from measured sizes (30d = 6000 GB, daily = 2000 GB).
- **Traffic growth** — the ~4.56 TB/30d figure grows with PyPI traffic; ceilings may need raising over time.
1 change: 1 addition & 0 deletions docs/adr/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Use the `/adr` skill in Claude Code to record new ADRs or query past decisions.
| [ADR-0001](./0001-oss-packages-design-decisions.md) | OSS packages — design decisions (living) | living | 2026-05-27 |
| [ADR-0003](./0003-deps-bq-table-selection.md) | Use DependencyGraphEdgesLatest for deps ingestion; defer DependenciesLatest until NUGET or GO needed | accepted | 2026-05-29 |
| [ADR-0004](./0004-go-nuget-transitive-dependent-counts.md) | Compute GO/NUGET transitive dependent counts via exact reverse closure (over HLL approximation) | accepted | 2026-06-23 |
| [ADR-0005](./0005-pypi-downloads-bigquery-merge-scoping.md) | PyPI downloads via BigQuery bulk export, scoped in the Postgres merge | accepted | 2026-07-01 |

## Why ADRs?

Expand Down
67 changes: 67 additions & 0 deletions scripts/services/pypi-worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
version: '3.1'

x-env-args: &env-args
DOCKER_BUILDKIT: 1
NODE_ENV: docker
SERVICE: pypi-worker
CROWD_TEMPORAL_TASKQUEUE: pypi-worker
CROWD_TEMPORAL_NAMESPACE: ${CROWD_PACKAGES_TEMPORAL_NAMESPACE}
SHELL: /bin/sh
SUPPRESS_NO_CONFIG_WARNING: 'true'

services:
pypi-worker:
build:
context: ../../
dockerfile: ./scripts/services/docker/Dockerfile.packages
command: 'pnpm run start:pypi-worker'
working_dir: /usr/crowd/app/services/apps/packages_worker
env_file:
- ../../backend/.env.dist.local
- ../../backend/.env.dist.composed
- ../../backend/.env.override.local
- ../../backend/.env.override.composed
environment:
<<: *env-args
restart: always
networks:
- crowd-bridge

pypi-worker-dev:
build:
context: ../../
dockerfile: ./scripts/services/docker/Dockerfile.packages
command: 'pnpm run dev:pypi-worker'
working_dir: /usr/crowd/app/services/apps/packages_worker
# user: '${USER_ID}:${GROUP_ID}'
env_file:
- ../../backend/.env.dist.local
- ../../backend/.env.dist.composed
- ../../backend/.env.override.local
- ../../backend/.env.override.composed
environment:
<<: *env-args
hostname: pypi-worker
networks:
- crowd-bridge
volumes:
- ../../services/libs/audit-logs/src:/usr/crowd/app/services/libs/audit-logs/src
- ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src
- ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src
- ../../services/libs/data-access-layer/src:/usr/crowd/app/services/libs/data-access-layer/src
- ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src
- ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src
- ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src
- ../../services/libs/nango/src:/usr/crowd/app/services/libs/nango/src
- ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src
- ../../services/libs/queue/src:/usr/crowd/app/services/libs/queue/src
- ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src
- ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src
- ../../services/libs/telemetry/src:/usr/crowd/app/services/libs/telemetry/src
- ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src
- ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src
- ../../services/apps/packages_worker/src:/usr/crowd/app/services/apps/packages_worker/src

networks:
crowd-bridge:
external: true
16 changes: 13 additions & 3 deletions services/apps/packages_worker/CONTEXT.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# OSS Packages

Tracks open-source packages across ecosystems (npm, Maven). All packages live in `packages`; criticality scoring ranks them in place and marks the top-N per ecosystem as `is_critical = true`.
Tracks open-source packages across ecosystems (npm, pypi, maven, go, nuget, cargo — the set grows as ecosystems are onboarded). All packages live in `packages`; criticality scoring ranks them in place and marks the top-N per ecosystem as `is_critical = true`.

## Language

Expand All @@ -25,7 +25,7 @@ Package URL (`pkg:npm/react`, `pkg:maven/org.apache/commons`). The canonical cro
_Avoid_: package id (that's the `packages.id` bigserial)

**Ecosystem**:
A package registry namespace — `npm`, `maven`. Lowercase.
A package registry namespace — `npm`, `pypi`, `maven`, `go`, `nuget`, `cargo`. Lowercase. Open set — new ecosystems are onboarded over time.
_Avoid_: system (deps.dev's term), registry

**Packument**:
Expand All @@ -38,9 +38,19 @@ One rolling 30-day span in `downloads_last_30d`, identified by its `end_date` (a
_Avoid_: month, period, snapshot

**Self-healing**:
A workflow that recomputes the full set of expected rows on every run, diffs against what's in the DB, and fills only the gaps. No assumption of continuity between runs.
A workflow that recomputes the full set of expected rows on every run, diffs against what's in the DB, and fills only the gaps. No assumption of continuity between runs. **npm downloads only** — pypi downloads deliberately do NOT self-heal (see **Trailing re-scan** / **Latest-window refresh**).
_Avoid_: backfill (that's the one-time historical fill; self-healing is the ongoing property)

**Trailing re-scan** (pypi daily):
The pypi daily downloads workflow re-scans a fixed 2-day trailing window (`[today−2, today−1]`) every run and upserts. It corrects a partial most-recent partition but does **not** diff against the DB or fill older gaps. Missed days are recovered only by **Backfill**.
_Avoid_: self-healing (that's the npm gap-filling property)

**Latest-window refresh** (pypi 30d):
The pypi 30d workflow, given no `fromDate`, computes and ingests only the latest **Window** (idempotent upsert, mirrored to `packages.downloads_last_30d`). It does not gap-fill missed months; those are recovered by **Backfill**.

**Backfill**:
A manual, one-time run over an explicit date range to fill history or recover gaps — pypi daily takes `{startDate, endDate}`, pypi 30d takes `{fromDate}` (enumerates every monthly **Window** from then to the latest). For pypi downloads this is the _only_ gap-recovery mechanism; scheduled runs are fixed-window, not self-healing.

## Relationships

- All packages live in `packages`; `rank_packages()` sets `is_critical = true` on the top-N per ecosystem to define the **Critical slice**.
Expand Down
3 changes: 3 additions & 0 deletions services/apps/packages_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
"start:npm-worker": "CROWD_TEMPORAL_TASKQUEUE=npm-worker SERVICE=npm-worker tsx src/bin/npm-worker.ts",
"dev:npm-worker": "CROWD_TEMPORAL_TASKQUEUE=npm-worker SERVICE=npm-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/npm-worker.ts",
"dev:npm-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=npm-worker SERVICE=npm-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/npm-worker.ts",
"start:pypi-worker": "CROWD_TEMPORAL_TASKQUEUE=pypi-worker SERVICE=pypi-worker tsx src/bin/pypi-worker.ts",
"dev:pypi-worker": "CROWD_TEMPORAL_TASKQUEUE=pypi-worker SERVICE=pypi-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9243 src/bin/pypi-worker.ts",
"dev:pypi-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=pypi-worker SERVICE=pypi-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9243 src/bin/pypi-worker.ts",
Comment on lines +32 to +33
"start:osv-worker": "CROWD_TEMPORAL_TASKQUEUE=osv-worker SERVICE=osv-worker tsx src/bin/osv-worker.ts",
"dev:osv-worker": "CROWD_TEMPORAL_TASKQUEUE=osv-worker SERVICE=osv-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9238 src/bin/osv-worker.ts",
"dev:osv-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=osv-worker SERVICE=osv-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9238 src/bin/osv-worker.ts",
Expand Down
6 changes: 6 additions & 0 deletions services/apps/packages_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ export {
cargoCleanup,
} from './cargo/activities'
export { enrichGoVersionsBatch, enrichGoStatusBatch } from './go/activities'
export {
getUnscannedPypiBatch,
ingestPypiPackageBatch,
pypiStopAfterFirstPage,
} from './pypi/activities'
export { getCriticalPypiCount } from './pypi/downloads/getCriticalPypiCount'
export { processNuGetBatch } from './nuget/activities'
6 changes: 6 additions & 0 deletions services/apps/packages_worker/src/bin/bq-dataset-ingest.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { scheduleOsspckgsBootstrap } from '../deps-dev/schedules/bootstrap'
import {
schedulePypiDownloads30d,
schedulePypiDownloadsDaily,
} from '../pypi/downloads/pypiDownloads'
import { svc } from '../service'

setImmediate(async () => {
await svc.init()
await scheduleOsspckgsBootstrap()
await schedulePypiDownloads30d()
await schedulePypiDownloadsDaily()
await svc.start()
})
8 changes: 8 additions & 0 deletions services/apps/packages_worker/src/bin/pypi-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { schedulePypiIngest } from '../pypi/schedule'
import { svc } from '../service'

setImmediate(async () => {
await svc.init()
await schedulePypiIngest()
await svc.start()
})
2 changes: 2 additions & 0 deletions services/apps/packages_worker/src/deps-dev/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ The mode-specific key takes precedence over the generic key. Value must be a pos
| `BQ_DATASET_INGEST_DEPENDENT_COUNTS_NUGET_MAX_BQ_GB` | 200 | `dependent_counts_nuget` | NUGET exact reverse transitive closure over `NuGetRequirementsLatest` (script mode). All 3 count columns. `maximumBytesBilled` runaway cap above the measured ~32 GB. |
| `BQ_DATASET_INGEST_SCORECARD_REPOS_MAX_BQ_GB` | 50 | `scorecard_repos` | |
| `BQ_DATASET_INGEST_SCORECARD_CHECKS_MAX_BQ_GB` | 500 | `scorecard_checks` | |
| `BQ_DATASET_INGEST_PYPI_DOWNLOADS_30D_MAX_BQ_GB` | 6000 | `pypi_downloads_30d` | Per 30-day window scan (~4.56 TB measured; set in `ingestPypiDownloads.ts`). |
| `BQ_DATASET_INGEST_PYPI_DOWNLOADS_DAILY_MAX_BQ_GB` | 2000 | `pypi_downloads_daily` | Daily 2-day trailing window (~300 GB); scales with backfill range, raise for long backfills. |

The override logic lives in `src/deps-dev/activities/bqExportToGcs.ts`.

Expand Down
11 changes: 3 additions & 8 deletions services/apps/packages_worker/src/npm/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import type { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor'
import { getServiceChildLogger } from '@crowd/logging'

import { getPackagesDb } from '../db'
import { proxyUrl } from '../proxies'
import { isClientError } from '../utils/isClientError'

import { NPM_EARLIEST, computeChunks } from './downloadGaps'
import { fetchChangesSince, fetchCurrentSeq } from './fetchChanges'
Expand All @@ -35,7 +37,7 @@ import {
} from './fetchDownloads'
import { fetchPackument } from './fetchPackument'
import { Last30dWindow, computeMissingLast30dWindows } from './last30dGaps'
import { laneCount, proxyForLane, proxyUrl } from './proxies'
import { laneCount, proxyForLane } from './proxies'
import { isFetchError } from './types'
import { upsertPackage } from './upsertPackage'

Expand Down Expand Up @@ -94,13 +96,6 @@ export async function commitNpmChangesSeq(lastSeq: string): Promise<void> {
await setNpmChangesLastSeq(qx, lastSeq)
}

// 4xx (404 or any other client error like 405 from a malformed/illegal npm name —
// e.g. deps.dev dependency-chain strings that leaked into `packages`). 429 is
// excluded — it's transient and handled by the slow exponential path.
function isClientError(code: number | undefined, kind: string): boolean {
return kind === 'NOT_FOUND' || (code !== undefined && code >= 400 && code < 500 && code !== 429)
}

// 4xx errors get a few quick in-lane retries with a small linear backoff (1s, 2s),
// then the package is given up on and marked scanned. 429/5xx/network errors are NOT
// handled here — they throw and ride Temporal's exponential activity-retry instead.
Expand Down
21 changes: 0 additions & 21 deletions services/apps/packages_worker/src/npm/normalize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,6 @@ export function parseNpmName(raw: string): { namespace: string | null; name: str
return { namespace: null, name: raw }
}

// Postgres text columns cannot store NUL (U+0000); npm packuments occasionally
// carry them (e.g. mojibake descriptions). Strip them in place from every string
// in the packument before persisting — otherwise the inlined value breaks the
// PostgreSQL wire protocol ("invalid message format").
export function stripNullBytesDeep<T>(value: T): T {
if (typeof value === 'string') {
// eslint-disable-next-line no-control-regex
return value.replace(/\u0000/g, '') as T
}
if (Array.isArray(value)) {
for (let i = 0; i < value.length; i++) value[i] = stripNullBytesDeep(value[i])
return value
}
if (value !== null && typeof value === 'object') {
const obj = value as Record<string, unknown>
for (const k of Object.keys(obj)) obj[k] = stripNullBytesDeep(obj[k])
return value
}
return value
}

export function normalizeLicenses(packument: Packument): string[] {
const rawArr = packument.licenses
if (rawArr && Array.isArray(rawArr)) {
Expand Down
Loading
Loading