From 41c3657ba04fac3810d7b7498e758b1744324f33 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 26 Jun 2026 18:52:06 +0000 Subject: [PATCH 1/2] fix race condition with lock for reset --- .../apache_beam/runners/worker/statesampler_fast.pyx | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx index 7075ef47017d..c7836db53581 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx @@ -153,9 +153,13 @@ cdef class StateSampler(object): self.sampling_thread.join() def reset(self): - for state in self.scoped_states_by_index: - (state)._nsecs = 0 - self.started = self.finished = False + pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) + try: + for state in self.scoped_states_by_index: + (state)._nsecs = 0 + self.started = self.finished = False + finally: + pythread.PyThread_release_lock(self.lock) cpdef ScopedState current_state(self): return self.current_state_c() From 1583e9b30c17f7887014efb2cc84ba2434dccb45 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 26 Jun 2026 23:46:26 +0000 Subject: [PATCH 2/2] add nogil --- sdks/python/apache_beam/runners/worker/statesampler_fast.pyx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx index c7836db53581..5a67d6764ea0 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx +++ b/sdks/python/apache_beam/runners/worker/statesampler_fast.pyx @@ -153,7 +153,8 @@ cdef class StateSampler(object): self.sampling_thread.join() def reset(self): - pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) + with nogil: + pythread.PyThread_acquire_lock(self.lock, pythread.WAIT_LOCK) try: for state in self.scoped_states_by_index: (state)._nsecs = 0