Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
44ae815
Add agent info endpoint and implement GetAgentInfo method
zestyoreo Dec 19, 2025
1d91351
Enhance agent type support, validation and corresponding test
zestyoreo Dec 19, 2025
9a2d90c
Enhance MockOperator with GetAgentInfo method and add corresponding t…
zestyoreo Dec 19, 2025
0a44bfa
Implement Fluent Bit integration for CtrlB agent
zestyoreo Dec 22, 2025
3b0158d
Enhance Fluent Bit integration with Docker support and configuration …
zestyoreo Dec 22, 2025
856ed9d
Update Fluent Bit configuration with additional HTTP server settings
zestyoreo Dec 22, 2025
c39e41c
Add GetMetrics method to Adapter interface and implement in OTELAdapter
zestyoreo Dec 22, 2025
eda05b6
Enhance agent registration and metrics handling for multiple agent types
zestyoreo Dec 23, 2025
c92cacb
Implement graceful shutdown for agent queue and enhance HTTP client t…
zestyoreo Dec 23, 2025
97c3016
Update Fluent Bit configuration file references in Dockerfile and README
zestyoreo Dec 23, 2025
776b7f9
Enhance OTEL configuration with gRPC and HTTP endpoints
zestyoreo Dec 23, 2025
2e153e9
Refactor metrics fetching and enhance error handling in agent status …
zestyoreo Dec 24, 2025
d4961c9
Refactor configuration compilation for Fluent Bit and OTEL agents
zestyoreo Dec 26, 2025
2cd210e
Refactor configuration compilation for agents
zestyoreo Dec 26, 2025
7332943
Add configuration compilation for Fluent Bit and OTEL agents
zestyoreo Dec 29, 2025
cbcc3b0
Implement heartbeat management for agent metrics reporting
zestyoreo Dec 30, 2025
be623bf
Implement jitter in heartbeat interval for improved metric reporting
zestyoreo Dec 30, 2025
1f1ddd3
Update go.mod to remove indirect comments for Prometheus dependencies
zestyoreo Dec 30, 2025
7dbeef9
Implement heartbeat handling and staleness checking for agents
zestyoreo Dec 30, 2025
690dbd1
Refactor agent service tests to use MockMetricsRepository
zestyoreo Dec 30, 2025
dd782a2
Refactor Fluent Bit configuration and metrics handling
zestyoreo Dec 30, 2025
cbab4b0
Remove unused metric extraction functions from heartbeat.go for impro…
zestyoreo Dec 31, 2025
390f68c
Add Fluent Bit configuration schemas and UI definitions
zestyoreo Dec 31, 2025
4affdff
Add comprehensive tests for configuration compilation and alias gener…
zestyoreo Jan 1, 2026
95ce889
Add test runner scripts for agent and backend
zestyoreo Jan 2, 2026
cc8d085
Enhance agent and pipeline compatibility handling
zestyoreo Jan 2, 2026
05c4312
Update schema manifest to include new Fluent Bit input types and enha…
zestyoreo Jan 3, 2026
cda3119
Introduced a channel to signal fatal errors from goroutines, enhancin…
zestyoreo Jan 3, 2026
2947e0f
Enhance agent registration and pipeline synchronization
zestyoreo Jan 5, 2026
00c7256
Refactor Fluent Bit configuration pipeline structure
zestyoreo Jan 5, 2026
8974db1
Merge branch 'staging' into feat/fluent-bit
zestyoreo Jan 6, 2026
d092f1e
ENHANCE | Update Fluent Bit Dockerfile Configuration
zestyoreo Jan 6, 2026
a347a78
ENHANCE | Improve Environment Variable Handling and Database Queries
zestyoreo Jan 7, 2026
a5d8a89
REMOVE | Delete Fluent Bit Integration Documentation and Implementati…
zestyoreo Jan 7, 2026
c4f5c29
REFINE | Simplify Metric Extraction Logic in Heartbeat Module
zestyoreo Jan 7, 2026
9825890
ENHANCE | Add Type Field to PipelineInfoWithAgent and Update SQL Query
zestyoreo Jan 7, 2026
87fa58a
FEAT | Add HTTP ctrlb Output Configuration and UI Schema
zestyoreo Jan 8, 2026
debf950
FEAT | Add Fluent Bit Installation Commands and Agent Type Selection
zestyoreo Jan 7, 2026
3820734
REFC | Refactored Overall View Pipe Line Flow (#115)
Ranitpal-003-07 Jan 7, 2026
3fafc57
fixed page blank issue
Ranitpal-003-07 Jan 7, 2026
8604fdd
refactored add pipline form
Ranitpal-003-07 Jan 7, 2026
0a7c7c4
refact add pipeline
Ranitpal-003-07 Jan 7, 2026
a9ed60d
show configure button only after getting success heartbeat
Ranitpal-003-07 Jan 7, 2026
4980d85
fixed deployment issue for fluentbit
Ranitpal-003-07 Jan 7, 2026
50e4164
fixed healthchart and pipeline overview
Ranitpal-003-07 Jan 7, 2026
2d5fbcf
modified review panel to see different type of changes in different s…
Ranitpal-003-07 Jan 7, 2026
914251f
added redirection to pipeline edit page
Ranitpal-003-07 Jan 8, 2026
187b573
fix dorpdown options based on agent type
Ranitpal-003-07 Jan 8, 2026
af0538a
refactor JSON schema properties to use dot notation for database and …
zestyoreo Jan 9, 2026
8368abe
remove unused properties from tail_input JSON schemas
zestyoreo Jan 9, 2026
b0dcb9c
add Fluent Bit component name mapping and update GetPluginName function
zestyoreo Jan 9, 2026
729c7ab
Merge pull request #116 from ctrlb-hq/feat/fluent-bit-frontend
zestyoreo Jan 9, 2026
8b40cd3
Reduce shutdown timeout from 20 seconds to 10 seconds and implement c…
zestyoreo Jan 13, 2026
15d03a7
Remove deprecated properties from OpenTelemetry and Prometheus scrape…
zestyoreo Jan 13, 2026
244f344
Add default parsers for Fluent Bit configuration and refactor agent c…
zestyoreo Jan 13, 2026
99afea3
Update Fluent Bit JSON schemas for CtrlB and Syslog configurations
zestyoreo Jan 13, 2026
30393d1
Remove OpenTelemetry output JSON schemas and associated component map…
zestyoreo Jan 13, 2026
c7c5726
Add installation script for CtrlB Collector with Fluent Bit backend
zestyoreo Jan 14, 2026
5dea498
Update Fluent Bit installation script to correct download URL structure
zestyoreo Jan 14, 2026
f84d558
Update Fluent Bit installation script to use version variable for dow…
zestyoreo Jan 14, 2026
b4ce474
Update Fluent Bit installation script to download specific collector …
zestyoreo Jan 14, 2026
9fc5b6d
Update agent installation script to download specific collector versi…
zestyoreo Jan 14, 2026
7c1e307
Enhance agent installation script with clearer download messages
zestyoreo Jan 14, 2026
f4fd6cb
Refactor agent installation scripts to use version variable for consi…
zestyoreo Jan 14, 2026
385c8b7
Update HTTP CtrlB output JSON schema to refine log format and timesta…
zestyoreo Jan 15, 2026
f1fda8f
Enhance Frontend Pipeline Service with Agent Management Features
zestyoreo Jan 15, 2026
889ffba
Add UpdateAgentIP functionality to Frontend Agent Management
zestyoreo Jan 15, 2026
af2fa89
Refactor DeletePipelineDialog to include onPipelineDeleted callback
zestyoreo Jan 15, 2026
308fbf8
Add Fluent Bit configuration validation and operator integration
zestyoreo Jan 15, 2026
5e684fd
minor fix
zestyoreo Jan 15, 2026
c597dc1
Refactor GetHealthMetricsForGraph and GetRateMetricsForGraph queries …
zestyoreo Jan 15, 2026
fb85a77
Update JSON schemas for stdout output
zestyoreo Jan 16, 2026
baf6df1
Add Fluent Bit configuration validation for input and filter components
zestyoreo Jan 20, 2026
1936fed
Enhance Fluent Bit configuration handling with nested object flatteni…
zestyoreo Jan 21, 2026
75181a9
Update collector version to v2.0.1 in installation scripts
zestyoreo Jan 21, 2026
b286105
Refactor PluginDropdownOptions and HealthChart for improved data stru…
zestyoreo Jan 21, 2026
6f3dd80
Update collector and backend version to v0.2.1 in installation scripts
zestyoreo Jan 21, 2026
d1eef88
Remove deprecated fields from tail_input JSON schemas and update Flue…
zestyoreo Jan 21, 2026
25f653c
Add SKIP_CONFIG_VALIDATION feature and enhance Fluent Bit config logging
zestyoreo Jan 21, 2026
203dde6
Remove unused properties from tail_input JSON schemas to streamline c…
zestyoreo Jan 21, 2026
9864073
Update Fluent Bit configuration for auth log input
zestyoreo Jan 21, 2026
22db36f
Refactor Fluent Bit installation script to check for existing install…
zestyoreo Jan 21, 2026
c853719
Add SKIP_CONFIG_VALIDATION to Fluent Bit installation script
zestyoreo Jan 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions agent/Dockerfile.fluentbit
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Use the official Golang image to build the binary.
FROM golang:1.23-alpine AS builder

# Set the Current Working Directory inside the container
WORKDIR /app

# Copy go mod and sum files
COPY go.mod go.sum ./
RUN go mod download

# Copy the source code.
COPY . .

# Build the Go app. Replace "myapp" with your binary name.
RUN go build -o ctrlb-agent ./cmd/ctrlb_collector/main.go

# Get Fluent Bit binary
FROM fluent/fluent-bit:3.2 AS fluentbit

# Use Debian slim as base for compatibility with Fluent Bit
FROM debian:bookworm-slim

# Install required runtime dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
libyaml-0-2 \
libsystemd0 \
libcurl4 \
libsasl2-2 \
libpq5 \
libgcc-s1 \
&& rm -rf /var/lib/apt/lists/*

# Create directory structure
RUN mkdir -p /opt/fluent-bit/bin

# Copy Fluent Bit binary and libraries from official image
COPY --from=fluentbit /fluent-bit /opt/fluent-bit

# Add Fluent Bit to PATH
ENV PATH="/opt/fluent-bit/bin:${PATH}"

WORKDIR /app

# Copy the binary from the builder stage.
COPY --from=builder /app/ctrlb-agent .
COPY --from=builder /app/internal/config/fluentbit-config.yaml ./config.yaml

# Command to run the executable
CMD ["./ctrlb-agent"]

105 changes: 97 additions & 8 deletions agent/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# 🛰️ CTRLB Agent

The **CTRLB Agent** is a lightweight wrapper built on top of the OpenTelemetry Collector. Once installed, it connects to the CTRLB backend, shares its runtime status, and receives its initial configuration. The agent can be managed remotely through a set of defined HTTP endpoints.
The **CTRLB Agent** is a flexible telemetry collection agent that supports multiple backends including **OpenTelemetry Collector** and **Fluent Bit**. Once installed, it connects to the CTRLB backend, shares its runtime status, and receives its initial configuration. The agent can be managed remotely through a set of defined HTTP endpoints.

---

Expand All @@ -17,10 +17,92 @@ The **CTRLB Agent** is a lightweight wrapper built on top of the OpenTelemetry C

The backend provides a platform-specific installation script. Once executed, the agent starts and reaches out to the backend for initial configuration.

You can customize agent behavior using the following command-line flags:
You can customize agent behavior using environment variables:

- `--config`: Path to the agent configuration file. Default is `./config.yaml`
- `--backend`: URL of the backend server. Default is `http://pipeline.ctrlb.ai:8096`
- `AGENT_TYPE`: Type of telemetry backend to use (`otel`, `fluent-bit`, `fluentbit`, `fb`). Default is `otel`
- `AGENT_CONFIG_PATH`: Path to the agent configuration file. Default is `./config.yaml`
- `BACKEND_URL`: URL of the backend server. Default is `http://pipeline.ctrlb.ai:8096`
- `PIPELINE_NAME`: Name of the pipeline (optional)
- `STARTED_BY`: User who started the agent (optional)

### Supported Backends

#### OpenTelemetry Collector (Default)
The OpenTelemetry Collector runs as an embedded library within the agent process.

```bash
export AGENT_TYPE=otel
export AGENT_CONFIG_PATH=./otel-config.yaml
./ctrlb_collector
```

#### Fluent Bit
Fluent Bit runs as a child process controlled by the agent via HTTP APIs.

```bash
export AGENT_TYPE=fluent-bit
export AGENT_CONFIG_PATH=./fluentbit-config.yaml
./ctrlb_collector
```

See [Fluent Bit Integration Guide](../docs/collector/fluentbit-integration.md) for detailed setup instructions.

---

## 🐳 Docker Deployment

### Building the Fluent Bit Agent Image

Build the Docker image with Fluent Bit support:

```bash
docker build -f Dockerfile.fluentbit -t ctrlb-agent-fluentbit:latest .
```

### Running the Fluent Bit Agent Container

Run the agent container with Fluent Bit:

```bash
docker run -d \
--name ctrlb-agent-fluentbit \
-p 3421:3421 \
-v $(pwd)/internal/config:/app/internal/config \
-v /var/log:/var/log \
-e BACKEND_URL=http://host.docker.internal:8096 \
-e AGENT_TYPE=fluent-bit \
-e AGENT_CONFIG_PATH=./internal/config/config.yaml \
-e PORT=3421 \
ctrlb-agent-fluentbit:latest
```

**Configuration Options:**

- `-p 3421:3421` – Exposes the agent API port
- `-v $(pwd)/internal/config:/app/internal/config` – Mounts config directory for dynamic updates
- `-v /var/log:/var/log` – Mounts host logs for collection (optional)
- `-e BACKEND_URL` – Backend server URL (use `host.docker.internal` for local development)
- `-e AGENT_TYPE=fluent-bit` – Specifies Fluent Bit as the telemetry backend
- `-e AGENT_CONFIG_PATH` – Path to Fluent Bit configuration file
- `-e PORT=3421` – Agent API port

**Useful Commands:**

```bash
# View agent logs
docker logs -f ctrlb-agent-fluentbit

# Stop the agent
docker stop ctrlb-agent-fluentbit

# Remove the container
docker rm ctrlb-agent-fluentbit

# Interactive mode (for testing)
docker run -it --rm \
-e BACKEND_URL=http://host.docker.internal:8096 \
ctrlb-agent-fluentbit:latest
```

---

Expand All @@ -32,9 +114,9 @@ All endpoints are served under the base path: `/agent/v1`

These endpoints control the agent's operational state:

- `POST /agent/v1/start` – Start the OpenTelemetry Collector instance without restarting the agent process.
- `POST /agent/v1/stop` – Stop the OpenTelemetry Collector instance while keeping the agent process alive.
- `POST /agent/v1/shutdown` – Gracefully shut down the agent.
- `POST /agent/v1/start` – Start the telemetry collector (OTEL/Fluent Bit) without restarting the agent process.
- `POST /agent/v1/stop` – Stop the telemetry collector while keeping the agent process alive.
- `POST /agent/v1/shutdown` – Gracefully shut down the agent and collector.

### Configuration

Expand All @@ -49,9 +131,16 @@ These endpoints manage the agent’s configuration:
## 🛠️ Tech Stack

- **Go** – Core implementation language.
- **OpenTelemetry Collector** – Base collector engine.
- **OpenTelemetry Collector** – Embedded telemetry collector (default).
- **Fluent Bit** – Lightweight log processor (alternative backend).
- **HTTP + Gorilla Mux** – Communication protocol and routing.

## 📚 Documentation

- [Fluent Bit Integration Guide](../docs/collector/fluentbit-integration.md)
- [Fluent Bit Quick Start](../docs/collector/fluentbit-quickstart.md)
- [Architecture Overview](../docs/architecture.md)

---

## 📄 License
Expand Down
87 changes: 69 additions & 18 deletions agent/cmd/ctrlb_collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"net/http"
"os"
"os/signal"
"slices"
"strconv"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -48,6 +50,38 @@ func main() {
if constants.STARTED_BY == "" {
logger.Logger.Info("STARTED_BY environment variable is not set. Using default value: empty string.")
}

constants.PORT = os.Getenv("PORT")
if constants.PORT == "" {
logger.Logger.Info("PORT environment variable is not set. Using default value: 3421.")
constants.PORT = "3421"
}

constants.AGENT_TYPE = os.Getenv("AGENT_TYPE")
if constants.AGENT_TYPE == "" {
logger.Logger.Info("AGENT_TYPE environment variable is not set. Using default value: otel.")
constants.AGENT_TYPE = "otel"
}

isValid := slices.Contains(constants.SUPPORTED_AGENT_TYPES, constants.AGENT_TYPE)
if !isValid {
logger.Logger.Sugar().Infof("Invalid AGENT_TYPE: %s. Using default value: otel.", constants.AGENT_TYPE)
constants.AGENT_TYPE = "otel"
}

// Configure heartbeat interval
if heartbeatIntervalEnv := os.Getenv("HEARTBEAT_INTERVAL_SEC"); heartbeatIntervalEnv != "" {
if interval, err := strconv.Atoi(heartbeatIntervalEnv); err == nil && interval > 0 {
constants.HEARTBEAT_INTERVAL_SEC = interval
}
}

// SKIP_CONFIG_VALIDATION: Optional, skip fluent-bit binary config validation
if skipValidation := os.Getenv("SKIP_CONFIG_VALIDATION"); skipValidation == "true" || skipValidation == "1" {
constants.SKIP_CONFIG_VALIDATION = true
logger.Logger.Info("SKIP_CONFIG_VALIDATION is enabled. Skipping fluent-bit binary config validation.")
}

// Check if config file exists
if _, err := os.Stat(constants.AGENT_CONFIG_PATH); err != nil {
logger.Logger.Sugar().Errorf("Config file doesn't exist at location: %v", constants.AGENT_CONFIG_PATH)
Expand All @@ -70,10 +104,14 @@ func main() {

version, err := adapter.GetVersion()
if err != nil {
logger.Logger.Sugar().Fatalf("Error while fetching agent version: %v", err)
} else {
constants.AGENT_VERSION = version
logger.Logger.Sugar().Errorf("Error while fetching agent version: %v", err)
adapter.GracefulShutdown()
os.Exit(1)
}
constants.AGENT_VERSION = version

// Channel to signal fatal errors from goroutines
fatalErrChan := make(chan error, 2)

// Call Backend server which will be informed about agent being started
wg.Add(1)
Expand All @@ -85,19 +123,26 @@ func main() {
}
serverStartConfig, err := client.InformBackendServerStart(sys, httpClient)
if err != nil {
logger.Logger.Sugar().Fatalf("Failed to register with backend server: %v", err)
} else {
err = config.SaveToYAML(serverStartConfig, constants.AGENT_CONFIG_PATH)
if err != nil {
logger.Logger.Sugar().Fatalf("Error writing config to file: %v", err)
}
logger.Logger.Info("Successfully registered with the backend server")
logger.Logger.Sugar().Errorf("Failed to register with backend server: %v", err)
fatalErrChan <- err
return
}
err = config.SaveToYAML(serverStartConfig, constants.AGENT_CONFIG_PATH)
if err != nil {
logger.Logger.Sugar().Errorf("Error writing config to file: %v", err)
fatalErrChan <- err
return
}
logger.Logger.Info("Successfully registered with the backend server")
}()

operator_service := *operators.NewOperatorService(adapter)
// Start heartbeat manager to periodically send status/metrics to backend
heartbeatManager := client.NewHeartbeatManager(constants.HEARTBEAT_INTERVAL_SEC)
heartbeatManager.Start()

handler := api.NewRouter(&operator_service)
operatorService := operators.NewOperatorService(adapter, constants.AGENT_TYPE)

handler := api.NewRouter(operatorService)

server := &http.Server{
Addr: ":" + constants.PORT,
Expand All @@ -113,19 +158,25 @@ func main() {
logger.Logger.Sugar().Infof("Client started at port: %s", constants.PORT)
err := server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logger.Logger.Sugar().Fatalf("Failed to start Server: %v", err)
logger.Logger.Sugar().Errorf("Failed to start Server: %v", err)
fatalErrChan <- err
}
}()

// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Wait for termination signal
<-sigChan

logger.Logger.Info("Received termination signal. Initiating graceful shutdown...")
// Wait for termination signal or fatal error
select {
case <-sigChan:
logger.Logger.Info("Received termination signal. Initiating graceful shutdown...")
case err := <-fatalErrChan:
logger.Logger.Sugar().Errorf("Fatal error occurred: %v. Initiating graceful shutdown...", err)
}

heartbeatManager.Stop()
shutdown.ShutdownServer()
adapter.GracefulShutdown()

wg.Wait()
}
4 changes: 2 additions & 2 deletions agent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ require (
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus-community/windows_exporter v0.27.2 // indirect
github.com/prometheus/client_golang v1.21.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.62.0
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rs/cors v1.11.1 // indirect
Expand Down
5 changes: 5 additions & 0 deletions agent/internal/adapters/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ type Adapter interface {
GracefulShutdown() error
GetVersion() (string, error)
ValidateConfigInMemory(data *map[string]any) error
ValidateConfigOnDisk(data *map[string]any) error
GetMetrics() (map[string]any, error)
}

func NewAdapter(wg *sync.WaitGroup, agentType string) (Adapter, error) {
if agentType == "otel" || agentType == "" {
return NewOTELAdapter(wg), nil
}
if agentType == "fluent-bit" || agentType == "fluentbit" || agentType == "fb" {
return NewFluentBitAdapter(wg), nil
}
return nil, fmt.Errorf("unsupported agent type: %s", agentType)
}
12 changes: 11 additions & 1 deletion agent/internal/adapters/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ func TestNewAdapter_OTEL(t *testing.T) {
assert.True(t, ok, "adapter should be of type OTELAdapter")
}

func TestNewAdapter_FLUENTBIT(t *testing.T) {
wg := &sync.WaitGroup{}

adapter, err := adapters.NewAdapter(wg, "fluent-bit")
assert.NoError(t, err)
assert.NotNil(t, adapter)
_, ok := adapter.(*adapters.FluentBitAdapter)
assert.True(t, ok, "adapter should be of type FluentBitAdapter")
}

func TestNewAdapter_DefaultEmptyType(t *testing.T) {
wg := &sync.WaitGroup{}

Expand All @@ -31,7 +41,7 @@ func TestNewAdapter_DefaultEmptyType(t *testing.T) {
func TestNewAdapter_UnsupportedType(t *testing.T) {
wg := &sync.WaitGroup{}

adapter, err := adapters.NewAdapter(wg, "fluentbit")
adapter, err := adapters.NewAdapter(wg, "unsupported-type")
assert.Error(t, err)
assert.Nil(t, adapter)
assert.Contains(t, err.Error(), "unsupported agent type")
Expand Down
Loading