diff --git a/changes/48497-cron-interrupted-run-status b/changes/48497-cron-interrupted-run-status
new file mode 100644
index 00000000000..ec573cbc06d
--- /dev/null
+++ b/changes/48497-cron-interrupted-run-status
@@ -0,0 +1 @@
+- Fixed cron jobs getting stuck in "expired" when a run is interrupted mid-flight (e.g. during server shutdown); the run now records a terminal "canceled" status, preserving any job errors, instead of being left "pending" until reaped to "expired".
diff --git a/server/service/schedule/schedule.go b/server/service/schedule/schedule.go
index d7e30483ff6..de2a8e99766 100644
--- a/server/service/schedule/schedule.go
+++ b/server/service/schedule/schedule.go
@@ -20,6 +20,10 @@ import (
 	"go.opentelemetry.io/otel/trace"
 )
 
+// terminalStatusWriteTimeout bounds the write that records a run's terminal
+// status; that write runs on a context detached from the run's cancellation.
+const terminalStatusWriteTimeout = 30 * time.Second
+
 // ReloadInterval reloads and returns a new interval.
 type ReloadInterval func(ctx context.Context) (time.Duration, error)
 
@@ -522,11 +526,24 @@ func (s *Schedule) runWithStats(ctx context.Context, statsType fleet.CronStatsTy
 
 	s.runAllJobs(ctx)
 
-	if err := s.updateStats(ctx, statsID, fleet.CronStatsStatusCompleted); err != nil {
-		s.logger.ErrorContext(ctx, fmt.Sprintf("update cron stats %s", s.name), "err", err)
-		ctxerr.Handle(ctx, err)
+	status := fleet.CronStatsStatusCompleted
+	if ctx.Err() != nil && len(s.errors) > 0 {
+		status = fleet.CronStatsStatusCanceled
+	}
+
+	// Record the terminal status on a context detached from cancellation and
+	// bounded by terminalStatusWriteTimeout. Otherwise an interrupted run cannot
+	// persist its outcome (the write would fail on the canceled context), leaving
+	// the row "pending" until CleanupCronStats reaps it to "expired" — which
+	// hides the fact that the run was actually canceled and discards the
+	// captured job errors.
+	updateCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), terminalStatusWriteTimeout)
+	defer cancel()
+	if err := s.updateStats(updateCtx, statsID, status); err != nil {
+		s.logger.ErrorContext(updateCtx, fmt.Sprintf("update cron stats %s", s.name), "err", err)
+		ctxerr.Handle(updateCtx, err)
 	}
-	s.logger.InfoContext(ctx, "completed")
+	s.logger.InfoContext(updateCtx, "run finished", "status", string(status))
 }
 
 // runAllJobs runs all jobs in the schedule with tracing context.
diff --git a/server/service/schedule/schedule_test.go b/server/service/schedule/schedule_test.go
index 14920ac2f69..1981b1b7670 100644
--- a/server/service/schedule/schedule_test.go
+++ b/server/service/schedule/schedule_test.go
@@ -991,3 +991,162 @@ func TestRemoteTriggerSchedule(t *testing.T) {
 		rts.Start() // should not panic
 	})
 }
+
+// ctxAwareStatsStore mimics the real datastore: its writes run on the provided
+// context and fail when that context is canceled (as ds.writer(ctx).ExecContext
+// does). It records the last persisted status and errors so tests can assert on
+// the terminal state of a run.
+type ctxAwareStatsStore struct {
+	mu     sync.Mutex // guards status and errors
+	status fleet.CronStatsStatus
+	errors fleet.CronScheduleErrors
+}
+
+func (s *ctxAwareStatsStore) GetLatestCronStats(_ context.Context, _ string) ([]fleet.CronStats, error) {
+	return []fleet.CronStats{}, nil // always reports no prior runs
+}
+
+func (s *ctxAwareStatsStore) InsertCronStats(ctx context.Context, _ fleet.CronStatsType, _ string, _ string, status fleet.CronStatsStatus) (int, error) {
+	if err := ctx.Err(); err != nil { // refuse the write once ctx is done
+		return 0, err
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.status = status
+	return 1, nil
+}
+
+func (s *ctxAwareStatsStore) UpdateCronStats(ctx context.Context, _ int, status fleet.CronStatsStatus, cronErrors *fleet.CronScheduleErrors) error {
+	if err := ctx.Err(); err != nil { // refuse the write once ctx is done
+		return err
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.status = status
+	if cronErrors != nil {
+		s.errors = *cronErrors // stores the caller's value as-is; no snapshot is taken
+	}
+	return nil
+}
+
+func (s *ctxAwareStatsStore) ClaimCronStats(_ context.Context, _ int, _ string, _ fleet.CronStatsStatus) error {
+	return nil // no-op: claiming always succeeds
+}
+
+func (s *ctxAwareStatsStore) getStatus() fleet.CronStatsStatus {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.status
+}
+
+func (s *ctxAwareStatsStore) getErrors() fleet.CronScheduleErrors {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.errors // the stored value itself, not a copy
+}
+
+// TestRunWithStats verifies the terminal status runWithStats records for a run.
+// A run is "canceled" only when the context was canceled (e.g. the instance
+// received SIGTERM mid-run) AND a job reported an error; otherwise it is
+// "completed". In every case the status and any captured job errors must be
+// persisted, rather than the run being left "pending" and reaped to "expired".
+func TestRunWithStats(t *testing.T) {
+	testCases := []struct {
+		name string
+		// jobs builds the schedule's jobs. cancel cancels the run's context, so a
+		// job can simulate a shutdown signal arriving mid-run.
+		jobs            func(cancel context.CancelFunc) []Option
+		statsType       fleet.CronStatsType
+		existingStatsID int // > 0 mirrors a claimed triggered run (skips the insert)
+		wantStatus      fleet.CronStatsStatus
+		wantErrorKeys   []string // exact set of job IDs expected in the persisted errors
+	}{
+		{
+			name: "interrupted scheduled run records canceled with errors",
+			jobs: func(cancel context.CancelFunc) []Option {
+				return []Option{
+					WithJob("interrupted_job", func(jobCtx context.Context) error {
+						cancel() // shutdown signal arrives while this job is running
+						return jobCtx.Err()
+					}),
+					WithJob("never_reached_cleanly", func(jobCtx context.Context) error {
+						return jobCtx.Err()
+					}),
+				}
+			},
+			statsType:  fleet.CronStatsTypeScheduled,
+			wantStatus: fleet.CronStatsStatusCanceled,
+			// The second job also returns its context error on the already-canceled
+			// context, so both jobs are persisted.
+			wantErrorKeys: []string{"interrupted_job", "never_reached_cleanly"},
+		},
+		{
+			name: "clean run records completed",
+			jobs: func(context.CancelFunc) []Option {
+				return []Option{WithJob("ok_job", func(context.Context) error { return nil })}
+			},
+			statsType:  fleet.CronStatsTypeScheduled,
+			wantStatus: fleet.CronStatsStatusCompleted,
+		},
+		{
+			name: "cancellation racing a clean run records completed",
+			jobs: func(cancel context.CancelFunc) []Option {
+				return []Option{
+					WithJob("clean_job", func(context.Context) error {
+						// The job finishes its work, then cancellation lands in the
+						// window before it returns cleanly. No job error results.
+						cancel()
+						return nil
+					}),
+				}
+			},
+			statsType:  fleet.CronStatsTypeScheduled,
+			wantStatus: fleet.CronStatsStatusCompleted,
+		},
+		{
+			name: "interrupted triggered run records canceled with errors",
+			jobs: func(cancel context.CancelFunc) []Option {
+				return []Option{
+					WithJob("interrupted_job", func(jobCtx context.Context) error {
+						cancel() // cancel mid-job so the job returns the context error
+						return jobCtx.Err()
+					}),
+				}
+			},
+			statsType:       fleet.CronStatsTypeTriggered,
+			existingStatsID: 42,
+			wantStatus:      fleet.CronStatsStatusCanceled,
+			wantErrorKeys:   []string{"interrupted_job"},
+		},
+		{
+			name: "job error without cancellation records completed",
+			jobs: func(context.CancelFunc) []Option {
+				return []Option{WithJob("failing_job", func(context.Context) error { return errors.New("boom") })}
+			},
+			statsType:     fleet.CronStatsTypeScheduled,
+			wantStatus:    fleet.CronStatsStatusCompleted,
+			wantErrorKeys: []string{"failing_job"},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(t.Context())
+			defer cancel()
+			store := &ctxAwareStatsStore{}
+
+			s := New(ctx, "test_schedule", "test_instance", time.Hour, scheduletest.NopLocker{}, store,
+				tc.jobs(cancel)...) // jobs receive cancel so they can interrupt their own run
+
+			s.runWithStats(ctx, tc.statsType, tc.existingStatsID)
+
+			require.Equal(t, tc.wantStatus, store.getStatus())
+			gotErrors := store.getErrors()
+			gotKeys := make([]string, 0, len(gotErrors))
+			for key := range gotErrors {
+				gotKeys = append(gotKeys, key)
+			}
+			require.ElementsMatch(t, tc.wantErrorKeys, gotKeys) // order-insensitive comparison
+		})
+	}
+}