Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 84 additions & 10 deletions cmd/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,26 @@ func init() {
runCmd.Flags().String("config", "config.json", "Configuration file path")
runCmd.Flags().String("geo-city-url", "https://lanterngeo.lantern.io/GeoLite2-City.mmdb.tar.gz", "URL for downloading GeoLite2-City database")
runCmd.Flags().String("city-database-name", "GeoLite2-City.mmdb", "Filename for storing GeoLite2-City database")
runCmd.Flags().String("datacap-url", "", "Datacap server URL")
runCmd.Flags().String("datacap-url", "", "Datacap sidecar URL (legacy mode)")
runCmd.Flags().String("datacap-grpc-api", "", "Datacap cloud gRPC API address (direct mode, mutually exclusive with --datacap-url)")
runCmd.Flags().Duration("datacap-batch-interval", 30*time.Second, "Datacap batch upload interval (direct mode)")
runCmd.Flags().Duration("datacap-cache-ttl", 5*time.Minute, "Datacap device state cache TTL (direct mode)")
runCmd.Flags().String("datacap-ca-cert", "", "Path to CA cert for datacap mTLS")
runCmd.Flags().String("datacap-client-cert", "", "Path to client cert for datacap mTLS")
runCmd.Flags().String("datacap-client-key", "", "Path to client key for datacap mTLS")
runCmd.Flags().String("proxy-info", "", "Path to proxy info INI file")
}

type datacapFlags struct {
URL string
GRPCAPI string
CACertPath string
ClientCertPath string
ClientKeyPath string
BatchInterval time.Duration
CacheTTL time.Duration
}

var runCmd = &cobra.Command{
Use: "run",
Short: "Run service",
Expand All @@ -42,11 +58,37 @@ var runCmd = &cobra.Command{
if err != nil {
return fmt.Errorf("get config flag: %w", err)
}
datacapURL, err := cmd.Flags().GetString("datacap-url")
if err != nil {
dcFlags := datacapFlags{}
if dcFlags.URL, err = cmd.Flags().GetString("datacap-url"); err != nil {
return fmt.Errorf("get datacap-url flag: %w", err)
}
return run(path, datacapURL)
if dcFlags.GRPCAPI, err = cmd.Flags().GetString("datacap-grpc-api"); err != nil {
return fmt.Errorf("get datacap-grpc-api flag: %w", err)
}
if dcFlags.GRPCAPI == "" {
dcFlags.GRPCAPI = os.Getenv("DATACAP_GRPC_API")
}
if dcFlags.CACertPath, err = cmd.Flags().GetString("datacap-ca-cert"); err != nil {
return fmt.Errorf("get datacap-ca-cert flag: %w", err)
}
if dcFlags.ClientCertPath, err = cmd.Flags().GetString("datacap-client-cert"); err != nil {
return fmt.Errorf("get datacap-client-cert flag: %w", err)
}
if dcFlags.ClientKeyPath, err = cmd.Flags().GetString("datacap-client-key"); err != nil {
return fmt.Errorf("get datacap-client-key flag: %w", err)
}
if dcFlags.BatchInterval, err = cmd.Flags().GetDuration("datacap-batch-interval"); err != nil {
return fmt.Errorf("get datacap-batch-interval flag: %w", err)
}
if dcFlags.CacheTTL, err = cmd.Flags().GetDuration("datacap-cache-ttl"); err != nil {
return fmt.Errorf("get datacap-cache-ttl flag: %w", err)
}

if dcFlags.URL != "" && dcFlags.GRPCAPI != "" {
return fmt.Errorf("--datacap-url and --datacap-grpc-api are mutually exclusive")
}

return run(path, dcFlags)
},
}

Expand All @@ -62,7 +104,7 @@ func readConfig(path string) (option.Options, error) {
return options, nil
}

func create(configPath string, datacapURL string) (*box.Box, context.CancelFunc, error) {
func create(configPath string, dcFlags datacapFlags) (*box.Box, context.CancelFunc, error) {
options, err := readConfig(configPath)
if err != nil {
return nil, nil, fmt.Errorf("read config: %w", err)
Expand Down Expand Up @@ -91,11 +133,41 @@ func create(configPath string, datacapURL string) (*box.Box, context.CancelFunc,
log.Info("Metric Tracking Enabled")
}

if datacapURL != "" {
log.Info("Datacap enabled. Creating tracker...")
if dcFlags.GRPCAPI != "" {
log.Info("Datacap direct mode enabled (gRPC API: ", dcFlags.GRPCAPI, ")")
var mtls *datacapMTLSConfig
if dcFlags.ClientCertPath != "" {
mtls = &datacapMTLSConfig{
CACertPath: dcFlags.CACertPath,
ClientCertPath: dcFlags.ClientCertPath,
ClientKeyPath: dcFlags.ClientKeyPath,
}
}
grpcConn, err := newDataCapGRPCConn(dcFlags.GRPCAPI, mtls)
if err != nil {
cancel()
return nil, nil, fmt.Errorf("create datacap gRPC connection: %w", err)
}
api := newGRPCDataCapAPI(grpcConn)
store := datacap.NewDeviceUsageStore(api, datacap.StoreOptions{
BatchInterval: dcFlags.BatchInterval,
CacheTTL: dcFlags.CacheTTL,
}, log.StdLogger())
tracker := datacap.NewDatacapTrackerWithStore(store, 10*time.Second, log.StdLogger())
tracker.Start(ctx)
clientCtxMgr.AppendTracker(tracker)
go func() {
<-ctx.Done()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
tracker.Stop(shutdownCtx)
grpcConn.Close()
}()
} else if dcFlags.URL != "" {
log.Info("Datacap sidecar mode enabled (URL: ", dcFlags.URL, ")")
datacapTracker, err := datacap.NewDatacapTracker(
datacap.Options{
URL: datacapURL,
URL: dcFlags.URL,
ReportInterval: "10s",
},
log.StdLogger(),
Expand All @@ -105,6 +177,8 @@ func create(configPath string, datacapURL string) (*box.Box, context.CancelFunc,
return nil, nil, fmt.Errorf("create datacap tracker: %w", err)
}
clientCtxMgr.AppendTracker(datacapTracker)
} else {
log.Warn("Datacap not configured, datacap tracking disabled")
}

osSignals := make(chan os.Signal, 1)
Expand Down Expand Up @@ -140,13 +214,13 @@ func closeMonitor(ctx context.Context) {
log.Fatal("sing-box did not close!")
}

func run(configPath string, datacapURL string) error {
func run(configPath string, dcFlags datacapFlags) error {
log.Info("build info: version ", version, ", commit ", commit)
osSignals := make(chan os.Signal, 1)
signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
defer signal.Stop(osSignals)
for {
instance, cancel, err := create(configPath, datacapURL)
instance, cancel, err := create(configPath, dcFlags)
if err != nil {
return err
}
Expand Down
123 changes: 123 additions & 0 deletions cmd/grpc_datacap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/getlantern/lantern-box/tracker/datacap"
"github.com/getlantern/lantern-box/tracker/datacap/dcpb"
)

// grpcDataCapAPI implements datacap.DataCapAPI using generated gRPC types.
// The proto types in dcpb/ mirror lantern-cloud's DataCapService definition,
// keeping lantern-cloud out of lantern-box's dependency graph.
type grpcDataCapAPI struct {
client dcpb.DataCapServiceClient
}

func newGRPCDataCapAPI(conn grpc.ClientConnInterface) *grpcDataCapAPI {
return &grpcDataCapAPI{
client: dcpb.NewDataCapServiceClient(conn),
}
}

func (g *grpcDataCapAPI) ReportUsage(ctx context.Context, batch *datacap.UsageBatch) (*datacap.ReportUsageResult, error) {
records := make([]*dcpb.DataCapUsage, 0, len(batch.Records))
for _, r := range batch.Records {
records = append(records, &dcpb.DataCapUsage{
DeviceId: r.DeviceID,
BytesUsed: r.BytesUsed,
CapLimit: r.CapLimit,
Platform: r.Platform,
CountryCode: r.CountryCode,
})
}

resp, err := g.client.ReportUsage(ctx, &dcpb.DataCapUsageBatch{Records: records})
if err != nil {
return nil, fmt.Errorf("gRPC ReportUsage: %w", err)
}

result := &datacap.ReportUsageResult{
Results: make([]datacap.UsageResultEntry, 0, len(resp.GetResults())),
}
if resp != nil {
for _, r := range resp.Results {
result.Results = append(result.Results, datacap.UsageResultEntry{
DeviceID: r.DeviceId,
Success: r.Success,
Error: r.Error,
})
}
}
return result, nil
}

func (g *grpcDataCapAPI) SyncDeviceState(ctx context.Context, deviceID string) (*datacap.DeviceState, error) {
resp, err := g.client.SyncDeviceState(ctx, &dcpb.DataCapSyncRequest{
DeviceId: deviceID,
})
if err != nil {
return nil, fmt.Errorf("gRPC SyncDeviceState: %w", err)
}

return &datacap.DeviceState{
DeviceID: resp.DeviceId,
BytesUsed: resp.BytesUsed,
CapLimit: resp.CapLimit,
ExpiryTime: resp.ExpiryTime,
CountryCode: resp.CountryCode,
Platform: resp.Platform,
}, nil
}

// datacapMTLSConfig holds paths to mTLS credential files for the datacap gRPC connection.
type datacapMTLSConfig struct {
CACertPath string
ClientCertPath string
ClientKeyPath string
}

// newDataCapGRPCConn creates a gRPC client connection for the datacap cloud API.
// If mtls is provided, the connection uses mTLS with the given client certificate.
// Otherwise, it uses standard TLS (for local dev).
func newDataCapGRPCConn(addr string, mtls *datacapMTLSConfig) (*grpc.ClientConn, error) {
var tlsCfg tls.Config

if mtls != nil {
if mtls.CACertPath == "" || mtls.ClientCertPath == "" || mtls.ClientKeyPath == "" {
return nil, fmt.Errorf("datacap mTLS requires all three paths: --datacap-ca-cert, --datacap-client-cert, --datacap-client-key")
}

// Load client certificate
clientCert, err := tls.LoadX509KeyPair(mtls.ClientCertPath, mtls.ClientKeyPath)
if err != nil {
return nil, fmt.Errorf("loading datacap client cert: %w", err)
}
tlsCfg.Certificates = []tls.Certificate{clientCert}

// Load CA cert to verify the server's self-signed certificate.
caCert, err := os.ReadFile(mtls.CACertPath)
if err != nil {
return nil, fmt.Errorf("reading datacap CA cert: %w", err)
}
Comment thread
myleshorton marked this conversation as resolved.
caPool := x509.NewCertPool()
if !caPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse datacap CA cert")
}
tlsCfg.RootCAs = caPool
}

creds := credentials.NewTLS(&tlsCfg)
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(creds))
if err != nil {
return nil, fmt.Errorf("dialing datacap gRPC server %s: %w", addr, err)
}
return conn, nil
}
52 changes: 52 additions & 0 deletions tracker/datacap/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package datacap

import "context"

// ReportSink is the interface for reporting datacap consumption.
// Both the HTTP Client (legacy sidecar mode) and DeviceUsageStore (direct mode) implement this.
type ReportSink interface {
Report(ctx context.Context, report *DataCapReport) (*DataCapStatus, error)
}

// DataCapAPI abstracts the cloud API for datacap operations.
// The concrete gRPC implementation lives in cmd/ to isolate the proto dependency.
type DataCapAPI interface {
ReportUsage(ctx context.Context, batch *UsageBatch) (*ReportUsageResult, error)
SyncDeviceState(ctx context.Context, deviceID string) (*DeviceState, error)
}

// UsageBatch is a batch of usage records to upload to the cloud API.
type UsageBatch struct {
Records []UsageRecord
}

// UsageRecord is a single device's usage to report.
type UsageRecord struct {
DeviceID string
BytesUsed int64
CapLimit int64
Platform string
CountryCode string
}

// ReportUsageResult contains per-device results from a batch upload.
type ReportUsageResult struct {
Results []UsageResultEntry
}

// UsageResultEntry is the result of reporting usage for a single device.
type UsageResultEntry struct {
DeviceID string
Success bool
Error string
}

// DeviceState is the cloud API's view of a device's datacap state.
type DeviceState struct {
DeviceID string
BytesUsed int64
CapLimit int64
ExpiryTime int64
CountryCode string
Platform string
}
5 changes: 5 additions & 0 deletions tracker/datacap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,8 @@ func (c *Client) ReportDataCapConsumption(ctx context.Context, report *DataCapRe

return &status, nil
}

// Report implements ReportSink by delegating to ReportDataCapConsumption.
func (c *Client) Report(ctx context.Context, report *DataCapReport) (*DataCapStatus, error) {
return c.ReportDataCapConsumption(ctx, report)
}
Loading
Loading