Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 38 additions & 28 deletions cts/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,51 +92,61 @@ def _retry_with_backoff(func, max_retries=3, initial_delay=1.0, backoff_multipli
raise last_exception


_kafka_producer = None


def _get_kafka_producer():
"""Get or create a long-lived Kafka producer.

The producer is created lazily on first use and reused for subsequent
calls to avoid repeated TCP/TLS/SASL handshakes.
"""
global _kafka_producer
if _kafka_producer is None:
from kafka import KafkaProducer

_kafka_producer = KafkaProducer(
bootstrap_servers=conf.messaging_broker_urls,
compression_type=conf.messaging_kafka_compression_type,
security_protocol=conf.messaging_kafka_security_protocol,
sasl_mechanism=conf.messaging_kafka_sasl_mechanism,
sasl_plain_username=conf.messaging_kafka_username,
sasl_plain_password=conf.messaging_kafka_password,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
return _kafka_producer


def _kafka_send_msg(msgs):
"""Send messages to Kafka with retry logic.

Uses a persistent producer that is reused across calls. On failure,
the producer is closed and recreated on the next retry attempt.

:param list[dict] msgs: List of messages to be sent.
:raises Exception: If Kafka operations fail after retries
"""
from kafka import KafkaProducer

def _send():
"""Inner function to send messages (will be retried on failure)"""
config = {
"bootstrap_servers": conf.messaging_broker_urls,
"compression_type": conf.messaging_kafka_compression_type,
"security_protocol": conf.messaging_kafka_security_protocol,
"sasl_mechanism": conf.messaging_kafka_sasl_mechanism,
"sasl_plain_username": conf.messaging_kafka_username,
"sasl_plain_password": conf.messaging_kafka_password,
"value_serializer": lambda v: json.dumps(v).encode("utf-8"),
}

producer = None
global _kafka_producer
try:
producer = KafkaProducer(**config)

# Send all messages first, then flush once for better performance
producer = _get_kafka_producer()
for msg in msgs:
event = msg.get("event", "event")
topic = "%s%s" % (conf.messaging_topic_prefix, event)
producer.send(topic, msg)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually ever raise any exceptions? It returns a Future immediately, so I would not expecte any network issues to appear as exceptions. Adding back the flush() might help.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flush() was dropped to let Kafka handle batching. CTS sends few messages, and linger_ms=0 means they're sent almost immediately anyway.

But you're right, without flush(), send() just returns a Future and delivery errors are never raised. The error recovery code would never trigger. I'll add it back.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The built-in batching is a good point though. I didn't think about that. Maybe it's the error handling that should be removed?


# Single flush for all messages - more efficient than flushing each message
producer.flush()

except Exception as e:
log.error("Failed to send messages to Kafka: %s", str(e))
raise
finally:
# Ensure producer is always closed, even on exceptions
if producer is not None:
# Close and discard the broken producer so the next retry
# creates a fresh connection.
if _kafka_producer is not None:
try:
producer.close()
except Exception as e:
log.warning("Error closing Kafka producer: %s", str(e))
_kafka_producer.close()
except Exception:
pass
_kafka_producer = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code seems rather fragile. There's a helper to create the producer, but here we still need to touch the global variable directly. Does KafkaProducer have some reconnection logic we could use instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Roger, relying on Kafka's built-in reconnection and retry API instead.

raise

# Retry the send operation with exponential backoff
_retry_with_backoff(_send)


Expand Down
65 changes: 49 additions & 16 deletions tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from cts import conf
from cts import app, db
import cts.messaging
from cts.messaging import _retry_with_backoff, _kafka_send_msg, _umb_send_msg, publish
from cts.models import Compose, User, Tag
from utils import ModelsBaseTest
Expand Down Expand Up @@ -95,6 +96,14 @@ class TestKafkaSendMessageWhenComposeIsCreated(ModelsBaseTest):

disable_event_handlers = False

def setUp(self):
super(TestKafkaSendMessageWhenComposeIsCreated, self).setUp()
cts.messaging._kafka_producer = None

def tearDown(self):
super(TestKafkaSendMessageWhenComposeIsCreated, self).tearDown()
cts.messaging._kafka_producer = None

def setup_composes(self):
User.create_user(username="odcs")
db.session.commit()
Expand Down Expand Up @@ -130,28 +139,45 @@ def test_send_message(self, KafkaProducer):
},
)

# Verify flush and close were called
mock_producer.flush.assert_called_once()
mock_producer.close.assert_called_once()
# Producer should not be closed on success (long-lived)
mock_producer.close.assert_not_called()

@patch.object(conf, "messaging_broker_urls", new=["localhost:9092"])
@patch.object(conf, "messaging_kafka_username", new="test_user")
@patch.object(conf, "messaging_kafka_password", new="test_password")
@patch("kafka.KafkaProducer")
def test_kafka_producer_closed_on_exception(self, KafkaProducer):
"""Test that Kafka producer is closed even when send() fails"""
"""Test that producer is closed and recreated on failure"""
mock_producer = KafkaProducer.return_value
mock_producer.send.side_effect = Exception("Send failed")

msgs = [{"event": "test", "data": "test_data"}]

with patch("time.sleep"): # Mock sleep for retry backoff
with patch("time.sleep"):
with self.assertRaises(Exception):
_kafka_send_msg(msgs)

# Producer close should be called on every retry attempt (finally block)
# Producer should be closed on each failed attempt and recreated
# Default is 3 retries + 1 initial = 4 attempts
self.assertEqual(mock_producer.close.call_count, 4)
# After all retries exhausted, producer should be reset to None
self.assertIsNone(cts.messaging._kafka_producer)

@patch.object(conf, "messaging_broker_urls", new=["localhost:9092"])
@patch.object(conf, "messaging_kafka_username", new="test_user")
@patch.object(conf, "messaging_kafka_password", new="test_password")
@patch("kafka.KafkaProducer")
def test_kafka_producer_reused_across_calls(self, KafkaProducer):
"""Test that producer is created once and reused"""
mock_producer = KafkaProducer.return_value

_kafka_send_msg([{"event": "compose-created", "compose": {}}])
_kafka_send_msg([{"event": "compose-tagged", "compose": {}}])

# Producer should only be created once
KafkaProducer.assert_called_once()
# But send should be called twice
self.assertEqual(mock_producer.send.call_count, 2)


@patch("cts.messaging.publish")
Expand Down Expand Up @@ -510,25 +536,32 @@ def producer_side_effect(*args, **kwargs):
class TestKafkaRetries(unittest.TestCase):
"""Test Kafka-specific retry behavior"""

def setUp(self):
cts.messaging._kafka_producer = None

def tearDown(self):
cts.messaging._kafka_producer = None

@patch.object(conf, "messaging_broker_urls", new=["localhost:9092"])
@patch.object(conf, "messaging_kafka_username", new="test_user")
@patch.object(conf, "messaging_kafka_password", new="test_password")
@patch.object(conf, "messaging_topic_prefix", new="cts.")
def test_kafka_send_msg_retries_on_transient_failure(self):
"""Test that _kafka_send_msg retries on transient failures"""
# Simulate transient failure then success
attempt_count = [0]
# Simulate send failure on first call, then success
send_count = [0]
mock_producer = Mock()

def producer_side_effect(*args, **kwargs):
attempt_count[0] += 1
if attempt_count[0] == 1:
def send_side_effect(*args, **kwargs):
send_count[0] += 1
if send_count[0] == 1:
raise ConnectionError("Transient network error")
return mock_producer

with patch("time.sleep"): # Mock sleep to speed up test
with patch("kafka.KafkaProducer", side_effect=producer_side_effect):
# Should succeed on second attempt
mock_producer.send.side_effect = send_side_effect

with patch("time.sleep"):
with patch("kafka.KafkaProducer", return_value=mock_producer):
_kafka_send_msg([{"event": "test", "data": "test"}])

self.assertEqual(attempt_count[0], 2)
# First send fails, producer is closed and reset, second send succeeds
self.assertEqual(send_count[0], 2)