Skip to content

feat: Bring Your Own Spark - SparkApplication#6550

Draft
aniketpalu wants to merge 1 commit into
feast-dev:masterfrom
aniketpalu:feat/spark-application-compute-engine
Draft

feat: Bring Your Own Spark - SparkApplication#6550
aniketpalu wants to merge 1 commit into
feast-dev:masterfrom
aniketpalu:feat/spark-application-compute-engine

Conversation

@aniketpalu

Copy link
Copy Markdown
Contributor

What this PR does / why we need it:

Core changes

Upstream (feature_store.py):

  • Refactored `materialize()` and `materialize_incremental()` to collect all `MaterializationTask`s and pass them to the engine in a single `batch_engine.materialize(registry, tasks)` call. This enables batching engines to process all feature views in one job. Existing engines are unaffected — `ComputeEngine.materialize()` base class loops tasks internally via `_materialize_one()`.
  • Added public `get_provider()` method on `FeatureStore` (wraps existing private `_get_provider()`).
    New engine (spark_application/):
  • `config.py`: Pydantic config with Spark resource specs, K8s options, validation (`env` entries must have `name`, `staging_location` warning).
  • `compute.py`: Creates a K8s Secret with `feature_store.yaml` + task list, builds SparkApplication CR, polls for completion, retrieves driver logs on failure.
  • `job.py`: Maps all 14 SparkApplication states to `MaterializationJobStatus`, with exponential backoff retry on transient API errors.
  • `main.py`: Driver script that runs inside the pod — loads config from Secret, materializes each feature view via `provider.materialize_single_feature_view()`. Supports concurrent FV processing via ThreadPoolExecutor + FAIR scheduler.
  • `Dockerfile`: Based on `apache/spark:4.0.1`, installs `feast[redis]==0.64.0`, includes Hadoop AWS JARs for S3A support.

Design decisions

  • Two config rewrites in driver pod: `batch_engine` → `spark.engine` (prevents recursive CRD creation, uses local SparkSession), `registry` → `remote` when `registry_address` is set (pod can't access server filesystem).
  • Driver calls `provider.materialize_single_feature_view()`, not `store.materialize()` — avoids FeatureViewState transition conflicts (MATERIALIZING→MATERIALIZING is invalid) and duplicate OpenLineage/MLflow emissions.
  • SQLite online store rejected at init — data written inside a pod is lost on termination.
  • Kueue integration via `queue_name` config field → `kueue.x-k8s.io/queue-name` label.

Validated on

  • OpenShift AI (ROSA) with Spark Operator v2.5.0, MinIO (S3A), Redis
  • 5 feature views, 4800 rows each, 2 executors: 2.4x speedup with concurrent mode

Test plan

  • 12 unit tests: config defaults, SQLite rejection, registry validation, config rewrites, CR structure, state mapping (14 states), cleanup 404 handling, timeout behavior, job naming
  • Existing tests unaffected: 29 feature_view_state + 22 local_feature_store tests pass
  • Integration test on cluster with real Spark Operator"

Which issue(s) this PR fixes:

Checks

  • I've made sure the tests are passing.
  • My commits are signed off (git commit -s)
  • My PR title follows conventional commits format

Testing Strategy

  • Unit tests
  • Integration tests
  • Manual tests
  • Testing is not required for this change

Misc

…aterialization

Adds a new batch compute engine that submits materialization jobs as
SparkApplication CRDs via the Kubeflow Spark Operator. One 'feast materialize'
call creates one SparkApplication pod that processes all feature views using
distributed Spark, rather than running in-process on the Feast server.
Key changes:
- Refactor materialize()/materialize_incremental() to pass all tasks to the
  engine in a single batch call instead of looping per feature view. Existing
  engines are unaffected (base class loops tasks internally via _materialize_one).
- Add public get_provider() method on FeatureStore.
- New spark_application engine: config, compute, job, driver script, Dockerfile.
- 12 unit tests covering config, validation, CR structure, state mapping,
  timeout, cleanup, and job naming.
@aniketpalu aniketpalu requested a review from a team as a code owner June 24, 2026 08:57
from tqdm import tqdm

fv_name = task_info["feature_view"]
logger.info(f"Thread started: {fv_name}")
Comment on lines +112 to +113
f"Starting materialization: {total} feature views, "
f"concurrency={concurrency}"
succeeded, failed = 0, 0
for i, task in enumerate(tasks, 1):
fv_name = task["feature_view"]
logger.info(f"[{i}/{total}] Materializing: {fv_name}")
try:
name, elapsed = _materialize_one_fv(spark, feast_config, task)
succeeded += 1
logger.info(f"[{i}/{total}] Completed: {name} ({elapsed:.1f}s)")
logger.info(f"[{i}/{total}] Completed: {name} ({elapsed:.1f}s)")
except Exception:
failed += 1
logger.exception(f"[{i}/{total}] Failed: {fv_name}")
logger.info(f"Completed: {name} ({elapsed:.1f}s)")
except Exception:
failed += 1
logger.exception(f"Failed: {fv_name}")
@aniketpalu aniketpalu marked this pull request as draft June 24, 2026 11:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants