Skip to content

Kafka/long lived producer#83

Draft
guillermodotn wants to merge 1 commit into
release-engineering:mainfrom
guillermodotn:kafka/long-lived-producer
Draft

Kafka/long lived producer#83
guillermodotn wants to merge 1 commit into
release-engineering:mainfrom
guillermodotn:kafka/long-lived-producer

Conversation

@guillermodotn

Copy link
Copy Markdown
Contributor

Addresses sections 3 and 4 commented on PR #82

3.- There is batching of messages. The producer does this automatically (partially).
4.- The producer is created for each batch of messages. Kafka seems to prefer a long lived producer that is reused.

Comment ref: #82 (review)
Stacked on: #82

@guillermodotn guillermodotn requested a review from lubomir June 26, 2026 11:11
@guillermodotn guillermodotn force-pushed the kafka/long-lived-producer branch from 08ef23e to da3334f Compare June 26, 2026 11:15
@codecov-commenter

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 84.61538% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.73%. Comparing base (a24ff29) to head (da3334f).

Files with missing lines Patch % Lines
cts/messaging.py 84.61% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #83      +/-   ##
==========================================
+ Coverage   83.69%   83.73%   +0.03%     
==========================================
  Files          13       13              
  Lines        1325     1328       +3     
==========================================
+ Hits         1109     1112       +3     
  Misses        216      216              
Flag Coverage Δ
unit-tests 83.73% <84.61%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@lubomir lubomir left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the general direction. However, I think we should wait with further changes until we confirm in stage that the code is indeed working so that we don't have to chase a moving target.

Comment thread cts/messaging.py
for msg in msgs:
event = msg.get("event", "event")
topic = "%s%s" % (conf.messaging_topic_prefix, event)
producer.send(topic, msg)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually ever raise any exceptions? It returns a Future immediately, so I would not expecte any network issues to appear as exceptions. Adding back the flush() might help.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flush() was dropped to let Kafka handle batching. CTS sends few messages, and linger_ms=0 means they're sent almost immediately anyway.

But you're right, without flush(), send() just returns a Future and delivery errors are never raised. The error recovery code would never trigger. I'll add it back.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The built-in batching is a good point though. I didn't think about that. Maybe it's the error handling that should be removed?

Comment thread cts/messaging.py
_kafka_producer.close()
except Exception:
pass
_kafka_producer = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code seems rather fragile. There's a helper to create the producer, but here we still need to touch the global variable directly. Does KafkaProducer have some reconnection logic we could use instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Roger, relying on Kafka's built-in reconnection and retry API instead.

@guillermodotn guillermodotn marked this pull request as draft June 26, 2026 12:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants