diff --git a/sdks/python/apache_beam/io/requestresponse.py b/sdks/python/apache_beam/io/requestresponse.py index 9fdf33e2299d..ef570f4d8556 100644 --- a/sdks/python/apache_beam/io/requestresponse.py +++ b/sdks/python/apache_beam/io/requestresponse.py @@ -358,7 +358,7 @@ def process(self, request: RequestT, *args, **kwargs): self._metrics_collector.requests.inc(1) is_throttled_request = False - if self._throttler: + if self._throttler is not None: while self._throttler.throttler.throttle_request(time.time() * MSEC_TO_SEC): _LOGGER.info( @@ -375,7 +375,8 @@ def process(self, request: RequestT, *args, **kwargs): response = self._repeater.repeat( self._caller, request, self._timeout, self._metrics_collector) self._metrics_collector.responses.inc(1) - self._throttler.throttler.successful_request(req_time * MSEC_TO_SEC) + if self._throttler is not None: + self._throttler.throttler.successful_request(req_time * MSEC_TO_SEC) yield response except Exception as e: raise e diff --git a/sdks/python/apache_beam/io/requestresponse_test.py b/sdks/python/apache_beam/io/requestresponse_test.py index f88df9657dae..7d4d6b4caf38 100644 --- a/sdks/python/apache_beam/io/requestresponse_test.py +++ b/sdks/python/apache_beam/io/requestresponse_test.py @@ -95,6 +95,16 @@ def test_valid_call(self): self.assertIsNotNone(output) + def test_valid_call_without_throttler(self): + caller = AckCaller() + with TestPipeline() as test_pipeline: + output = ( + test_pipeline + | beam.Create(["sample_request"]) + | RequestResponseIO(caller=caller, throttler=None)) + + self.assertIsNotNone(output) + def test_call_timeout(self): caller = CallerWithTimeout() with self.assertRaisesRegex(Exception, "Timeout"):