Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/filter_kubernetes/kube_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ struct flb_kube {
size_t podname_len;

/* Kubernetes Token from FLB_KUBE_TOKEN file */
char *namespace_file;
char *token_file;
char *token;
size_t token_len;
Expand Down
201 changes: 180 additions & 21 deletions plugins/filter_kubernetes/kube_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,20 @@ static int get_local_pod_info(struct flb_kube *ctx)
char *hostname;

/* Get the namespace name */
ret = file_to_buffer(FLB_KUBE_NAMESPACE, &ns, &ns_size);
ret = file_to_buffer(ctx->namespace_file, &ns, &ns_size);
if (ret == -1) {
/*
* If it fails, it's just informational, as likely the caller
* wanted to connect using the Proxy instead from inside a POD.
*/
flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_NAMESPACE);
flb_plg_warn(ctx->ins, "cannot open %s", ctx->namespace_file);
return FLB_FALSE;
}

/* Namespace */
while (ns_size > 0 && (ns[ns_size - 1] == '\n' || ns[ns_size - 1] == '\r')) {
ns[--ns_size] = '\0';
}
ctx->namespace = ns;
ctx->namespace_len = ns_size;

Expand Down Expand Up @@ -1951,6 +1954,85 @@ static inline int extract_pod_meta(struct flb_kube *ctx,
return 0;
}

static int set_local_namespace_meta(struct flb_kube *ctx,
struct flb_kube_meta *meta)
{
int n;

memset(meta, '\0', sizeof(struct flb_kube_meta));

if (ctx->namespace == NULL) {
return -1;
}

meta->namespace = flb_strndup(ctx->namespace, ctx->namespace_len);
if (meta->namespace == NULL) {
flb_errno();
return -1;
}
meta->namespace_len = ctx->namespace_len;

n = meta->namespace_len + 1;
meta->cache_key = flb_malloc(n);
if (meta->cache_key == NULL) {
flb_errno();
return -1;
}

memcpy(meta->cache_key, meta->namespace, meta->namespace_len);
meta->cache_key[meta->namespace_len] = '\0';
meta->cache_key_len = meta->namespace_len;

return 0;
}

static int set_local_pod_meta(struct flb_kube *ctx, struct flb_kube_meta *meta)
{
int n;
size_t off = 0;

memset(meta, '\0', sizeof(struct flb_kube_meta));

if (ctx->namespace == NULL || ctx->podname == NULL) {
return -1;
}

meta->namespace = flb_strndup(ctx->namespace, ctx->namespace_len);
if (meta->namespace == NULL) {
flb_errno();
return -1;
}
meta->namespace_len = ctx->namespace_len;
meta->fields++;

meta->podname = flb_strndup(ctx->podname, ctx->podname_len);
if (meta->podname == NULL) {
flb_errno();
return -1;
}
meta->podname_len = ctx->podname_len;
meta->fields++;

n = meta->namespace_len + 1 + meta->podname_len + 1;
meta->cache_key = flb_malloc(n);
if (meta->cache_key == NULL) {
flb_errno();
return -1;
}

memcpy(meta->cache_key, meta->namespace, meta->namespace_len);
off = meta->namespace_len;

meta->cache_key[off++] = ':';
memcpy(meta->cache_key + off, meta->podname, meta->podname_len);
off += meta->podname_len;

meta->cache_key[off] = '\0';
meta->cache_key_len = off;

return 0;
}

/*
* Given a fixed meta data (namespace), get API server information
* and merge buffers.
Expand Down Expand Up @@ -2314,12 +2396,10 @@ int flb_kube_dummy_meta_get(char **out_buf, size_t *out_size)
return 0;
}

static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
const char *tag, int tag_len,
const char *data, size_t data_size,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props)
static inline int lookup_pod_meta(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props)
{
int id;
int ret;
Expand All @@ -2329,12 +2409,6 @@ static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
size_t hash_meta_size;
msgpack_unpacked result;

/* Get metadata from tag or record (cache key is the important one) */
ret = extract_pod_meta(ctx, tag, tag_len, data, data_size, meta);
if (ret != 0) {
return -1;
}

/* Check if we have some data associated to the cache key */
ret = flb_hash_table_get(ctx->hash_table,
meta->cache_key, meta->cache_key_len,
Expand Down Expand Up @@ -2394,9 +2468,40 @@ static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
return 0;
}

static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
const char *tag, int tag_len,
const char *data, size_t data_size,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props)
{
int ret;

/* Get metadata from tag or record (cache key is the important one) */
ret = extract_pod_meta(ctx, tag, tag_len, data, data_size, meta);
if (ret != 0) {
return -1;
}

return lookup_pod_meta(ctx, out_buf, out_size, meta, props);
}

static inline int flb_kube_local_pod_meta_get(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props)
{
int ret;

ret = set_local_pod_meta(ctx, meta);
if (ret != 0) {
return -1;
}

return lookup_pod_meta(ctx, out_buf, out_size, meta, props);
}

static inline int lookup_namespace_meta(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta)
{
Expand All @@ -2408,12 +2513,6 @@ static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
size_t hash_meta_size;
msgpack_unpacked result;

/* Get metadata from tag or record (cache key is the important one) */
ret = extract_namespace_meta(ctx, tag, tag_len, data, data_size, meta);
if (ret != 0) {
return -1;
}

/* Check if we have some data associated to the cache key */
ret = flb_hash_table_get(ctx->namespace_hash_table,
meta->cache_key, meta->cache_key_len,
Expand Down Expand Up @@ -2463,6 +2562,37 @@ static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
return 0;
}

static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
const char *tag, int tag_len,
const char *data, size_t data_size,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta)
{
int ret;

/* Get metadata from tag or record (cache key is the important one) */
ret = extract_namespace_meta(ctx, tag, tag_len, data, data_size, meta);
if (ret != 0) {
return -1;
}

return lookup_namespace_meta(ctx, out_buf, out_size, meta);
}

static inline int flb_kube_local_namespace_meta_get(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta)
{
int ret;

ret = set_local_namespace_meta(ctx, meta);
if (ret != 0) {
return -1;
}

return lookup_namespace_meta(ctx, out_buf, out_size, meta);
}

int flb_kube_meta_get(struct flb_kube *ctx,
const char *tag, int tag_len,
const char *data, size_t data_size,
Expand Down Expand Up @@ -2495,6 +2625,35 @@ int flb_kube_meta_get(struct flb_kube *ctx,
return -1;
}

int flb_kube_meta_get_local(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
const char **namespace_out_buf,
size_t *namespace_out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props,
struct flb_kube_meta *namespace_meta)
{
int ret_namespace_meta = -1;
int ret_pod_meta = -1;

if (ctx->namespace_labels == FLB_TRUE || ctx->namespace_annotations == FLB_TRUE) {
ret_namespace_meta = flb_kube_local_namespace_meta_get(ctx, namespace_out_buf,
namespace_out_size,
namespace_meta);
}

if (ctx->namespace_metadata_only == FLB_FALSE) {
ret_pod_meta = flb_kube_local_pod_meta_get(ctx, out_buf, out_size,
meta, props);
Comment thread
cosmo0920 marked this conversation as resolved.
}

if (ret_pod_meta == 0 || ret_namespace_meta == 0) {
return 0;
}

return -1;
}

int flb_kube_meta_release(struct flb_kube_meta *meta)
{
int r = 0;
Expand Down
7 changes: 7 additions & 0 deletions plugins/filter_kubernetes/kube_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ int flb_kube_meta_get(struct flb_kube *ctx,
struct flb_kube_meta *meta,
struct flb_kube_props *props,
struct flb_kube_meta *namespace_meta);
int flb_kube_meta_get_local(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
const char **namespace_out_buf,
size_t *namespace_out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props,
struct flb_kube_meta *namespace_meta);
int flb_kube_meta_release(struct flb_kube_meta *meta);
int flb_kube_pod_association_init(struct flb_kube *ctx, struct flb_config *config);
int get_api_server_configmap(struct flb_kube *ctx, const char *namespace, const char *configmap, char **out_buf, size_t *out_size);
Expand Down
18 changes: 18 additions & 0 deletions plugins/filter_kubernetes/kubernetes.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_filter_plugin.h>
#include <fluent-bit/flb_filter.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_parser.h>
Expand All @@ -41,6 +42,7 @@
#define MERGE_NONE 0 /* merge unescaped string in temporary buffer */
#define MERGE_PARSED 1 /* merge parsed string (log_buf) */
#define MERGE_MAP 2 /* merge direct binary object (v) */
#define FLB_KUBE_LOCAL_LOGS_INPUT "fluentbit_logs"

struct task_args {
struct flb_kube *ctx;
Expand Down Expand Up @@ -624,6 +626,15 @@ static int cb_kube_filter(const void *data, size_t bytes,
ret = flb_kube_dummy_meta_get(&dummy_cache_buf, &cache_size);
cache_buf = dummy_cache_buf;
}
else if (i_ins != NULL && i_ins->p != NULL &&
strcmp(i_ins->p->name, FLB_KUBE_LOCAL_LOGS_INPUT) == 0) {
ret = flb_kube_meta_get_local(ctx,
&cache_buf, &cache_size,
&namespace_cache_buf,
&namespace_cache_size,
&meta, &props,
&namespace_meta);
}
else {
/* Check if we have some cached metadata for the incoming events */
ret = flb_kube_meta_get(ctx,
Expand Down Expand Up @@ -943,6 +954,13 @@ static struct flb_config_map config_map[] = {
"prefix used in tag by the input plugin"
},

/* Kubernetes Namespace file */
{
FLB_CONFIG_MAP_STR, "kube_namespace_file", FLB_KUBE_NAMESPACE,
0, FLB_TRUE, offsetof(struct flb_kube, namespace_file),
"Kubernetes namespace file"
},

/* Kubernetes Token file */
{
FLB_CONFIG_MAP_STR, "kube_token_file", FLB_KUBE_TOKEN,
Expand Down
6 changes: 6 additions & 0 deletions plugins/in_fluentbit_logs/fluentbit_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ static int in_fluentbit_logs_init(struct flb_input_instance *in,
}

ctx->ins = in;
ctx->coll_fd = -1;
ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
if (ctx->log_encoder == NULL) {
flb_plg_error(in, "could not initialize event encoder");
Expand Down Expand Up @@ -172,6 +173,11 @@ static int in_fluentbit_logs_exit(void *data, struct flb_config *config)
return 0;
}

if (ctx->coll_fd >= 0) {
flb_input_collector_delete(ctx->coll_fd, ctx->ins);
ctx->coll_fd = -1;
}

flb_log_pipeline_disable(config);

if (ctx->log_encoder != NULL) {
Expand Down
1 change: 1 addition & 0 deletions tests/runtime/data/kubernetes/local/namespace
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default
1 change: 1 addition & 0 deletions tests/runtime/data/kubernetes/local/token
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test-token
Loading
Loading