diff --git a/cts/messaging.py b/cts/messaging.py index e2c3538..553adc2 100644 --- a/cts/messaging.py +++ b/cts/messaging.py @@ -92,51 +92,61 @@ def _retry_with_backoff(func, max_retries=3, initial_delay=1.0, backoff_multipli raise last_exception +_kafka_producer = None + + +def _get_kafka_producer(): + """Get or create a long-lived Kafka producer. + + The producer is created lazily on first use and reused for subsequent + calls to avoid repeated TCP/TLS/SASL handshakes. + """ + global _kafka_producer + if _kafka_producer is None: + from kafka import KafkaProducer + + _kafka_producer = KafkaProducer( + bootstrap_servers=conf.messaging_broker_urls, + compression_type=conf.messaging_kafka_compression_type, + security_protocol=conf.messaging_kafka_security_protocol, + sasl_mechanism=conf.messaging_kafka_sasl_mechanism, + sasl_plain_username=conf.messaging_kafka_username, + sasl_plain_password=conf.messaging_kafka_password, + value_serializer=lambda v: json.dumps(v).encode("utf-8"), + ) + return _kafka_producer + + def _kafka_send_msg(msgs): """Send messages to Kafka with retry logic. + Uses a persistent producer that is reused across calls. On failure, + the producer is closed and recreated on the next retry attempt. + :param list[dict] msgs: List of messages to be sent. :raises Exception: If Kafka operations fail after retries """ - from kafka import KafkaProducer def _send(): - """Inner function to send messages (will be retried on failure)""" - config = { - "bootstrap_servers": conf.messaging_broker_urls, - "compression_type": conf.messaging_kafka_compression_type, - "security_protocol": conf.messaging_kafka_security_protocol, - "sasl_mechanism": conf.messaging_kafka_sasl_mechanism, - "sasl_plain_username": conf.messaging_kafka_username, - "sasl_plain_password": conf.messaging_kafka_password, - "value_serializer": lambda v: json.dumps(v).encode("utf-8"), - } - - producer = None + global _kafka_producer try: - producer = KafkaProducer(**config) - - # Send all messages first, then flush once for better performance + producer = _get_kafka_producer() for msg in msgs: event = msg.get("event", "event") topic = "%s%s" % (conf.messaging_topic_prefix, event) producer.send(topic, msg) - - # Single flush for all messages - more efficient than flushing each message - producer.flush() - except Exception as e: log.error("Failed to send messages to Kafka: %s", str(e)) - raise - finally: - # Ensure producer is always closed, even on exceptions - if producer is not None: + # Close and discard the broken producer so the next retry + # creates a fresh connection. + if _kafka_producer is not None: try: - producer.close() - except Exception as e: - log.warning("Error closing Kafka producer: %s", str(e)) + _kafka_producer.close() + except Exception: + pass + _kafka_producer = None + raise - # Retry the send operation with exponential backoff _retry_with_backoff(_send) diff --git a/tests/test_events.py b/tests/test_events.py index 0fe9037..6e5d406 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -30,6 +30,7 @@ from cts import conf from cts import app, db +import cts.messaging from cts.messaging import _retry_with_backoff, _kafka_send_msg, _umb_send_msg, publish from cts.models import Compose, User, Tag from utils import ModelsBaseTest @@ -95,6 +96,14 @@ class TestKafkaSendMessageWhenComposeIsCreated(ModelsBaseTest): disable_event_handlers = False + def setUp(self): + super(TestKafkaSendMessageWhenComposeIsCreated, self).setUp() + cts.messaging._kafka_producer = None + + def tearDown(self): + super(TestKafkaSendMessageWhenComposeIsCreated, self).tearDown() + cts.messaging._kafka_producer = None + def setup_composes(self): User.create_user(username="odcs") db.session.commit() @@ -130,28 +139,45 @@ def test_send_message(self, KafkaProducer): }, ) - # Verify flush and close were called - mock_producer.flush.assert_called_once() - mock_producer.close.assert_called_once() + # Producer should not be closed on success (long-lived) + mock_producer.close.assert_not_called() @patch.object(conf, "messaging_broker_urls", new=["localhost:9092"]) @patch.object(conf, "messaging_kafka_username", new="test_user") @patch.object(conf, "messaging_kafka_password", new="test_password") @patch("kafka.KafkaProducer") def test_kafka_producer_closed_on_exception(self, KafkaProducer): - """Test that Kafka producer is closed even when send() fails""" + """Test that producer is closed and recreated on failure""" mock_producer = KafkaProducer.return_value mock_producer.send.side_effect = Exception("Send failed") msgs = [{"event": "test", "data": "test_data"}] - with patch("time.sleep"): # Mock sleep for retry backoff + with patch("time.sleep"): with self.assertRaises(Exception): _kafka_send_msg(msgs) - # Producer close should be called on every retry attempt (finally block) + # Producer should be closed on each failed attempt and recreated # Default is 3 retries + 1 initial = 4 attempts self.assertEqual(mock_producer.close.call_count, 4) + # After all retries exhausted, producer should be reset to None + self.assertIsNone(cts.messaging._kafka_producer) + + @patch.object(conf, "messaging_broker_urls", new=["localhost:9092"]) + @patch.object(conf, "messaging_kafka_username", new="test_user") + @patch.object(conf, "messaging_kafka_password", new="test_password") + @patch("kafka.KafkaProducer") + def test_kafka_producer_reused_across_calls(self, KafkaProducer): + """Test that producer is created once and reused""" + mock_producer = KafkaProducer.return_value + + _kafka_send_msg([{"event": "compose-created", "compose": {}}]) + _kafka_send_msg([{"event": "compose-tagged", "compose": {}}]) + + # Producer should only be created once + KafkaProducer.assert_called_once() + # But send should be called twice + self.assertEqual(mock_producer.send.call_count, 2) @patch("cts.messaging.publish") @@ -510,25 +536,32 @@ def producer_side_effect(*args, **kwargs): class TestKafkaRetries(unittest.TestCase): """Test Kafka-specific retry behavior""" + def setUp(self): + cts.messaging._kafka_producer = None + + def tearDown(self): + cts.messaging._kafka_producer = None + @patch.object(conf, "messaging_broker_urls", new=["localhost:9092"]) @patch.object(conf, "messaging_kafka_username", new="test_user") @patch.object(conf, "messaging_kafka_password", new="test_password") @patch.object(conf, "messaging_topic_prefix", new="cts.") def test_kafka_send_msg_retries_on_transient_failure(self): """Test that _kafka_send_msg retries on transient failures""" - # Simulate transient failure then success - attempt_count = [0] + # Simulate send failure on first call, then success + send_count = [0] mock_producer = Mock() - def producer_side_effect(*args, **kwargs): - attempt_count[0] += 1 - if attempt_count[0] == 1: + def send_side_effect(*args, **kwargs): + send_count[0] += 1 + if send_count[0] == 1: raise ConnectionError("Transient network error") - return mock_producer - with patch("time.sleep"): # Mock sleep to speed up test - with patch("kafka.KafkaProducer", side_effect=producer_side_effect): - # Should succeed on second attempt + mock_producer.send.side_effect = send_side_effect + + with patch("time.sleep"): + with patch("kafka.KafkaProducer", return_value=mock_producer): _kafka_send_msg([{"event": "test", "data": "test"}]) - self.assertEqual(attempt_count[0], 2) + # First send fails, producer is closed and reset, second send succeeds + self.assertEqual(send_count[0], 2)