Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
/* Owner-side ingress queue status */
#define FLB_INPUT_INGRESS_BUSY -2

/* 1 second in nsec unit */
#define FLB_NSEC_IN_SEC 1000000000ULL

struct flb_input_instance;
struct flb_http_server_config;

Expand Down Expand Up @@ -437,6 +440,33 @@ struct flb_input_instance {
struct cmt_counter *cmt_ring_buffer_retries;
struct cmt_counter *cmt_ring_buffer_retry_failures;

/* flow rate metrics */
struct cmt_gauge *cmt_rate_bytes; /* metric: input_rate_bytes/window */
struct cmt_gauge *cmt_rate_records;/* metric: input_rate_records/window */
struct cmt_gauge *cmt_rate_gate_limited; /* metric: input rate gate */
struct cmt_gauge *cmt_rate_gate_busy_chunks;
struct cmt_gauge *cmt_rate_gate_retry_attempts;

/*
* Input rate accounting state
* ---------------------------
*/
uint64_t rate_window_start;
/* Window length used by rate accounting, in nanoseconds */
uint64_t rate_window_size;
size_t rate_window_bytes;
size_t rate_window_records;
double rate_bytes;
double rate_records;
int rate_gate_enabled;
int rate_gate_status;
int rate_gate_use_backpressure;
double rate_gate_resume_ratio;
size_t rate_gate_max_bytes;
size_t rate_gate_max_records;
size_t rate_gate_busy_chunks;
size_t rate_gate_retry_attempts;

/*
* Indexes for generated chunks: simple hash tables that keeps the latest
* available chunks for writing data operations. This optimizes the
Expand Down Expand Up @@ -740,6 +770,19 @@ static inline int flb_input_buf_paused(struct flb_input_instance *i)
return FLB_FALSE;
}

static inline int flb_input_paused(struct flb_input_instance *i)
{
if (flb_input_buf_paused(i) == FLB_TRUE) {
return FLB_TRUE;
}

if (i->rate_gate_status == FLB_INPUT_PAUSED) {
return FLB_TRUE;
}

return FLB_FALSE;
}

static inline int flb_input_config_map_set(struct flb_input_instance *ins,
void *context)
{
Expand Down Expand Up @@ -852,6 +895,12 @@ int flb_input_test_pause_resume(struct flb_input_instance *ins, int sleep_second
int flb_input_pause(struct flb_input_instance *ins);
int flb_input_pause_all(struct flb_config *config);
int flb_input_resume(struct flb_input_instance *ins);
void flb_input_rate_update(struct flb_input_instance *ins,
uint64_t timestamp,
size_t records,
size_t bytes);
int flb_input_rate_gate_protect(struct flb_input_instance *ins);
void flb_input_rate_gate_maybe_resume(struct flb_input_instance *ins);

const char *flb_input_name(struct flb_input_instance *ins);
int flb_input_name_exists(const char *name, struct flb_config *config);
Expand Down
Loading
Loading