Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/out_kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Fluent Bit Kafka Output plugin
set(src
kafka_config.c
kafka_schema_registry.c
kafka_topic.c
kafka.c)

Expand Down
78 changes: 77 additions & 1 deletion plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
#ifdef FLB_HAVE_AVRO_ENCODER
else if (ctx->format == FLB_KAFKA_FMT_AVRO) {

ret = flb_kafka_schema_registry_resolve(ctx);
if (ret != FLB_OK) {
msgpack_sbuffer_destroy(&mp_sbuf);
return ret;
}

flb_plg_debug(ctx->ins, "avro schema ID:%d:\n", ctx->avro_fields.schema_id);
flb_plg_debug(ctx->ins, "avro schema string:%s:\n", ctx->avro_fields.schema_str);

Expand Down Expand Up @@ -1497,6 +1503,76 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_out_kafka, avro_fields) + offsetof(struct flb_avro_fields, schema_id),
"Set AVRO schema ID."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_url", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_url),
"Set the Confluent Schema Registry base URL for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.url", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_url),
"Set the Confluent Schema Registry base URL for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_subject", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_subject),
"Set the Confluent Schema Registry subject for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.subject", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_subject),
"Set the Confluent Schema Registry subject for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_version", "latest",
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_version),
"Set the Confluent Schema Registry subject version for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.version", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_version),
"Set the Confluent Schema Registry subject version for AVRO schemas."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_http_user", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_http_user),
"Set the Confluent Schema Registry HTTP basic authentication user."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.http.user", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_http_user),
"Set the Confluent Schema Registry HTTP basic authentication user."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_http_passwd", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_http_passwd),
"Set the Confluent Schema Registry HTTP basic authentication password."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.http.password", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_http_passwd),
"Set the Confluent Schema Registry HTTP basic authentication password."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_bearer_token", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_bearer_token),
"Set the Confluent Schema Registry bearer token."
},
{
FLB_CONFIG_MAP_STR, "schema.registry.bearer.token", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_bearer_token),
"Set the Confluent Schema Registry bearer token."
},
{
FLB_CONFIG_MAP_STR, "schema_registry_framing", "cp1",
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_framing),
"Set the Schema Registry serializer framing. Only cp1 is supported."
},
{
FLB_CONFIG_MAP_STR, "serializer.framing", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, schema_registry_framing),
"Set the Schema Registry serializer framing. Only cp1 is supported."
},
#endif
{
FLB_CONFIG_MAP_STR, "topics", (char *)NULL,
Expand Down Expand Up @@ -1556,6 +1632,6 @@ struct flb_output_plugin out_kafka_plugin = {
.cb_flush = cb_kafka_flush,
.cb_exit = cb_kafka_exit,
.config_map = config_map,
.flags = 0,
.flags = FLB_IO_OPT_TLS,
.event_type = FLB_OUTPUT_LOGS
};
10 changes: 10 additions & 0 deletions plugins/out_kafka/kafka_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
ctx->ins = ins;
ctx->blocked = FLB_FALSE;
mk_list_init(&ctx->topics);
#ifdef FLB_HAVE_AVRO_ENCODER
mk_list_init(&ctx->schema_registry_endpoints);
#endif

ret = flb_output_config_map_set(ins, (void*) ctx);
if (ret == -1) {
Expand Down Expand Up @@ -266,6 +269,12 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
if (tmp) {
ctx->avro_fields.schema_str = flb_sds_create(tmp);
}

ret = flb_kafka_schema_registry_configure(ctx, config);
if (ret == -1) {
flb_out_kafka_destroy(ctx);
return NULL;
}
#endif

/* Config: Topic */
Expand Down Expand Up @@ -341,6 +350,7 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
#ifdef FLB_HAVE_AVRO_ENCODER
// avro
flb_sds_destroy(ctx->avro_fields.schema_str);
flb_kafka_schema_registry_destroy(ctx);
#endif

flb_free(ctx);
Expand Down
36 changes: 36 additions & 0 deletions plugins/out_kafka/kafka_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#endif

#include <fluent-bit/flb_kafka.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/aws/flb_aws_msk_iam.h>

#define FLB_KAFKA_FMT_JSON 0
Expand Down Expand Up @@ -56,6 +57,19 @@
#define FLB_JSON_DATE_ISO8601_NS 2
#define FLB_JSON_DATE_ISO8601_FMT "%Y-%m-%dT%H:%M:%S"

#ifdef FLB_HAVE_AVRO_ENCODER
struct flb_kafka_schema_registry_endpoint {
flb_sds_t host;
flb_sds_t uri;
int port;
#ifdef FLB_HAVE_TLS
struct flb_tls *tls;
#endif
struct flb_upstream *upstream;
struct mk_list _head;
};
#endif

struct flb_kafka_topic {
int name_len;
char *name;
Expand Down Expand Up @@ -127,6 +141,18 @@ struct flb_out_kafka {
// flb_sds_t avro_schema_str;
// flb_sds_t avro_schema_id;
struct flb_avro_fields avro_fields;

/* Optional Confluent Schema Registry resolver for Avro schemas */
flb_sds_t schema_registry_url;
flb_sds_t schema_registry_subject;
flb_sds_t schema_registry_version;
flb_sds_t schema_registry_http_user;
flb_sds_t schema_registry_http_passwd;
flb_sds_t schema_registry_bearer_token;
flb_sds_t schema_registry_framing;
int schema_registry_endpoint_count;
int schema_registry_endpoint_index;
struct mk_list schema_registry_endpoints;
#endif

#ifdef FLB_HAVE_AWS_MSK_IAM
Expand All @@ -147,4 +173,14 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
struct flb_config *config);
int flb_out_kafka_destroy(struct flb_out_kafka *ctx);

#ifdef FLB_HAVE_AVRO_ENCODER
int flb_kafka_schema_registry_configure(struct flb_out_kafka *ctx,
struct flb_config *config);
int flb_kafka_schema_registry_resolve(struct flb_out_kafka *ctx);
int flb_kafka_schema_registry_parse_response(struct flb_out_kafka *ctx,
const char *payload,
size_t payload_size);
void flb_kafka_schema_registry_destroy(struct flb_out_kafka *ctx);
#endif

#endif
Loading
Loading