AGENTS

This file provides repository-specific guidance for coding agents working on lib-commons.

Project snapshot

  • Module: github.com/LerianStudio/lib-commons/v5
  • Language: Go
  • Go version: 1.25.9 (see go.mod)
  • Current API generation: v5

Primary objective for changes

  • Preserve v5 public API contracts unless a task explicitly asks for breaking changes.
  • Prefer explicit error returns over panic paths in production code.
  • Keep behavior nil-safe and concurrency-safe by default.

Repository shape

Root:

  • commons/: root shared helpers (app, context, errors, utilities, time, string, os)

Observability and logging:

  • commons/opentelemetry: telemetry bootstrap, propagation, redaction, span helpers
  • commons/opentelemetry/metrics: metric factory + fluent builders (Counter, Gauge, Histogram)
  • commons/log: logging abstraction (Logger interface), typed Field constructors, log-injection prevention, sanitizer
  • commons/zap: zap adapter for commons/log with OTEL bridge support

Data and messaging:

  • commons/postgres: Postgres connector with dbresolver, migrations, OTEL spans, backoff-based lazy-connect
  • commons/mongo: MongoDB connector with functional options, URI builder, index helpers, OTEL spans
  • commons/redis: Redis connector with topology-based config (standalone/sentinel/cluster), GCP IAM auth, distributed locking (Redsync), backoff-based reconnect
  • commons/rabbitmq: AMQP connection/channel/health helpers with context-aware methods
  • commons/streaming: CloudEvents-framed domain event publisher to Redpanda/Kafka with circuit-breaker + outbox fallback, per-topic DLQ, and franz-go backing (producer only; orthogonal to commons/rabbitmq)
  • commons/dlq: Redis-backed dead letter queue with tenant-scoped keys, exponential backoff, and a background consumer with retry/exhaust lifecycle
  • commons/systemplane: dual-backend (Postgres/MongoDB) hot-reload runtime configuration store with LISTEN/NOTIFY and change-stream subscriptions, admin HTTP routes, and test contract suite

HTTP and server:

  • commons/net/http: Fiber HTTP helpers (response, error rendering, cursor/offset/sort pagination, validation, SSRF-protected reverse proxy, CORS, basic auth, telemetry middleware, health checks, access logging)
  • commons/net/http/ratelimit: Redis-backed rate limit storage for Fiber
  • commons/net/http/idempotency: Redis-backed at-most-once request middleware for Fiber (SetNX, fail-open, 409 for in-flight, response replay)
  • commons/server: ServerManager-based graceful shutdown and lifecycle helpers
  • commons/webhook: outbound webhook delivery with SSRF protection, HMAC-SHA256 signing, DNS pinning, concurrency control, and exponential backoff retries

Resilience and safety:

  • commons/circuitbreaker: circuit breaker manager with preset configs and health checker
  • commons/backoff: exponential backoff with jitter and context-aware sleep
  • commons/runtime: panic recovery, panic metrics, safe goroutine wrappers, error reporter, production mode
  • commons/assert: production-safe assertions with telemetry integration and domain predicates
  • commons/safe: panic-free math/regex/slice operations with error returns
  • commons/security: sensitive field detection and handling
  • commons/security/ssrf: canonical SSRF validation — IP blocking (CIDR blocklist + stdlib predicates), hostname blocking (metadata endpoints, dangerous suffixes), URL validation, DNS-pinned resolution with TOCTOU elimination
  • commons/errgroup: goroutine coordination with panic recovery
  • commons/certificate: thread-safe TLS certificate manager with hot reload, PEM file loading, PKCS#8/PKCS#1/EC key support, and tls.Config integration

Domain and support:

  • commons/transaction: intent-based transaction planning, balance eligibility validation, posting flow
  • commons/crypto: hashing and symmetric encryption with credential-safe fmt output
  • commons/jwt: HMAC-based JWT signing, verification, and time-claim validation
  • commons/license: license validation and enforcement with functional options
  • commons/pointers: pointer conversion helpers
  • commons/cron: cron expression parsing and scheduling
  • commons/constants: shared constants (headers, errors, pagination, transactions, metadata, datasource status, OTEL attributes, obfuscation)

Build and shell:

  • commons/shell/: Makefile include helpers (makefile_colors.mk, makefile_utils.mk), shell scripts, ASCII art

API invariants to respect

Telemetry (commons/opentelemetry)

  • Initialization is explicit with opentelemetry.NewTelemetry(cfg TelemetryConfig) (*Telemetry, error).
  • Global OTEL providers are opt-in via (*Telemetry).ApplyGlobals().
  • (*Telemetry).Tracer(name) (trace.Tracer, error) and (*Telemetry).Meter(name) (metric.Meter, error) for named providers.
  • Shutdown via ShutdownTelemetry() or ShutdownTelemetryWithContext(ctx) error.
  • TelemetryConfig includes InsecureExporter, Propagator, and Redactor fields.
  • Redaction uses Redactor with RedactionRule patterns; NewDefaultRedactor() and NewRedactor(rules, mask). Old FieldObfuscator interface is removed.
  • RedactingAttrBagSpanProcessor redacts sensitive span attributes using a Redactor.

Metrics (commons/opentelemetry/metrics)

  • Metric factory/builder operations return errors and should not be silently ignored.
  • Supports Counter, Histogram, and Gauge instrument types.
  • NewMetricsFactory(meter, logger) (*MetricsFactory, error).
  • NewNopFactory() *MetricsFactory for tests / disabled metrics.
  • Builder pattern: .WithLabels(map) or .WithAttributes(attrs...) then .Add() / .Set() / .Record().
  • Convenience recorders: RecordAccountCreated, RecordTransactionProcessed, etc. (no more org/ledger positional args).

Logging (commons/log)

  • Logger interface: 5 methods -- Log(ctx, level, msg, fields...), With(fields...), WithGroup(name), Enabled(level), Sync(ctx).
  • Level constants: LevelError (0), LevelWarn (1), LevelInfo (2), LevelDebug (3).
  • Field constructors: String(), Int(), Bool(), Err(), Any().
  • NewNop() Logger for test/disabled logging.
  • GoLogger provides a stdlib-based implementation with CWE-117 log-injection prevention.
  • Sanitizer: SafeError() and SanitizeExternalResponse().
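
To illustrate the log-injection concern (CWE-117) that GoLogger guards against, here is a minimal sketch of the general technique: neutralize line breaks and control characters before an untrusted value reaches a log line. sanitizeLogValue is a hypothetical name introduced for this example, not part of commons/log, and the real implementation may escape values differently.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitizeLogValue is a hypothetical helper, not the commons/log API.
// It escapes CR and LF and drops other ASCII control characters so that
// attacker-controlled input cannot start a forged log line.
func sanitizeLogValue(s string) string {
	var b strings.Builder
	for _, r := range s {
		switch {
		case r == '\n':
			b.WriteString(`\n`)
		case r == '\r':
			b.WriteString(`\r`)
		case r < 0x20 || r == 0x7f:
			// Drop the remaining control characters (tab included).
		default:
			b.WriteRune(r)
		}
	}
	return b.String()
}

func main() {
	userInput := "alice\nlevel=error msg=\"forged entry\""
	fmt.Println("login attempt user=" + sanitizeLogValue(userInput))
}
```

The point of the sketch is that the sanitized value always stays on a single line, so one request can never produce more than one log entry.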

Zap adapter (commons/zap)

  • New(cfg Config) (*Logger, error) for construction.
  • Logger implements log.Logger and also exposes Raw() *zap.Logger, Level() zap.AtomicLevel.
  • Direct zap convenience: Debug(), Info(), Warn(), Error(), WithZapFields().
  • Config has Environment (typed string), Level, OTelLibraryName fields.
  • Field constructors: Any(), String(), Int(), Bool(), Duration(), ErrorField().

HTTP helpers (commons/net/http)

  • Response: Respond, RespondStatus, RespondError, RenderError, FiberErrorHandler. Individual status helpers (BadRequestError, etc.) are removed.
  • Health: Ping (returns "pong"), HealthWithDependencies(deps...) with AND semantics (both circuit breaker and health check must pass).
  • Reverse proxy: ServeReverseProxy(target, policy, res, req) error with ReverseProxyPolicy for SSRF protection.
  • Pagination: offset-based (ParsePagination), opaque cursor (ParseOpaqueCursorPagination), timestamp cursor, and sort cursor APIs. All encode functions return errors.
  • Validation: ParseBodyAndValidate, ValidateStruct, GetValidator, ValidateSortDirection, ValidateLimit, ValidateQueryParamLength.
  • Context/ownership: ParseAndVerifyTenantScopedID, ParseAndVerifyResourceScopedID with TenantOwnershipVerifier and ResourceOwnershipVerifier func types.
  • Middleware: WithHTTPLogging, WithGrpcLogging, WithCORS, AllowFullOptionsWithCORS, WithBasicAuth, NewTelemetryMiddleware.
  • ErrorResponse has Code int (not string), Title, Message; implements error.

Server lifecycle (commons/server)

  • ServerManager exclusively; GracefulShutdown is removed.
  • NewServerManager(licenseClient, telemetry, logger) *ServerManager.
  • Chainable config: WithHTTPServer, WithGRPCServer, WithShutdownChannel, WithShutdownTimeout, WithShutdownHook.
  • StartWithGracefulShutdown() (exits on error) or StartWithGracefulShutdownWithError() error (returns error).
  • ServersStarted() <-chan struct{} for test coordination.

Circuit breaker (commons/circuitbreaker)

  • Manager interface with NewManager(logger, opts...) (Manager, error) constructor.
  • GetOrCreate returns (CircuitBreaker, error) and validates config.
  • Preset configs: DefaultConfig(), AggressiveConfig(), ConservativeConfig(), HTTPServiceConfig(), DatabaseConfig().
  • Metrics via WithMetricsFactory option.
  • NewHealthCheckerWithValidation(manager, interval, timeout, logger) (HealthChecker, error).

Certificate manager (commons/certificate)

  • NewManager(certPath, keyPath string) (*Manager, error) — loads PEM files at construction; if both paths are empty an unconfigured manager is returned (TLS optional). Returns ErrIncompleteConfig when exactly one path is provided.
  • Key parsing order: PKCS#8 → PKCS#1 (RSA) → EC (SEC 1). Key file must have mode 0600 or stricter; looser permissions return an error before reading.
  • Atomic hot-reload via (*Manager).Rotate(cert *x509.Certificate, key crypto.Signer, intermediates ...[]byte) error — validates expiry and public-key match before swapping under a write lock.
  • Sentinel errors: ErrNilManager, ErrCertRequired, ErrKeyRequired, ErrExpired, ErrNoPEMBlock, ErrKeyParseFailure, ErrNotSigner, ErrKeyMismatch, ErrIncompleteConfig.
  • Read accessors (all nil-safe, read-locked): GetCertificate() *x509.Certificate, GetSigner() crypto.Signer, PublicKey() crypto.PublicKey, ExpiresAt() time.Time, DaysUntilExpiry() int.
  • TLS integration: TLSCertificate() tls.Certificate (returns populated tls.Certificate struct); GetCertificateFunc() func(*tls.ClientHelloInfo) (*tls.Certificate, error) — suitable for assignment to tls.Config.GetCertificate for transparent hot-reload.
  • Package-level helper: LoadFromFiles(certPath, keyPath string) (*x509.Certificate, crypto.Signer, error) — validates without touching any manager state, useful for pre-flight checking before calling Rotate.
  • Package-level helper: LoadFromFilesWithChain(certPath, keyPath string) (*x509.Certificate, crypto.Signer, [][]byte, error) — returns the full DER chain for chain-preserving hot-reload.
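
The hot-reload pattern described above (swap under a write lock, serve through a callback assigned to tls.Config.GetCertificate) can be sketched with the standard library alone. certHolder, rotate, getCertificateFunc, and placeholderCert are hypothetical names for this illustration; the sketch skips the expiry and key-match validation that the real Rotate performs.

```go
package main

import (
	"crypto/tls"
	"errors"
	"fmt"
	"sync"
)

var errNoCertificate = errors.New("no certificate configured")

// certHolder is a hypothetical stand-in for a certificate manager.
type certHolder struct {
	mu   sync.RWMutex
	cert *tls.Certificate
}

// rotate swaps the active certificate under a write lock.
// A real manager would validate expiry and key match before swapping.
func (h *certHolder) rotate(c *tls.Certificate) error {
	if h == nil || c == nil {
		return errors.New("holder and certificate are required")
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	h.cert = c
	return nil
}

// getCertificateFunc returns a callback with the signature that
// tls.Config.GetCertificate expects. Each handshake reads the current
// certificate under a read lock, so a rotation is picked up transparently.
func (h *certHolder) getCertificateFunc() func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
	return func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
		if h == nil {
			return nil, errNoCertificate
		}
		h.mu.RLock()
		defer h.mu.RUnlock()
		if h.cert == nil {
			return nil, errNoCertificate
		}
		return h.cert, nil
	}
}

// placeholderCert builds a certificate value with dummy DER bytes.
// It is only good for demonstrating the swap, not for a real handshake.
func placeholderCert(marker byte) *tls.Certificate {
	return &tls.Certificate{Certificate: [][]byte{{marker}}}
}

func main() {
	holder := &certHolder{}
	cfg := &tls.Config{
		MinVersion:     tls.VersionTLS12,
		GetCertificate: holder.getCertificateFunc(),
	}

	if err := holder.rotate(placeholderCert(1)); err != nil {
		panic(err)
	}
	cert, err := cfg.GetCertificate(nil)
	fmt.Println(cert != nil, err)
}
```

Because the tls.Config holds the callback rather than a certificate value, the server never needs to be restarted or reconfigured after a rotation.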

SSRF validation (commons/security/ssrf)

  • IsBlockedIP(net.IP) and IsBlockedAddr(netip.Addr) for IP-level SSRF blocking. IsBlockedAddr is the core; IsBlockedIP delegates after conversion.
  • IsBlockedHostname(hostname) for hostname-level blocking: localhost, cloud metadata endpoints, .local/.internal/.cluster.local suffixes.
  • BlockedPrefixes() []netip.Prefix returns a copy of the canonical CIDR blocklist (8 ranges: this-network, CGNAT, IETF assignments, TEST-NET-1/2/3, benchmarking, reserved).
  • ValidateURL(ctx, rawURL, opts...) validates scheme, hostname, and parsed IP without DNS.
  • ResolveAndValidate(ctx, rawURL, opts...) (*ResolveResult, error) performs DNS-pinned validation. ResolveResult has PinnedURL, Authority (host:port for HTTP Host), SNIHostname (for TLS ServerName).
  • Functional options: WithHTTPSOnly(), WithAllowPrivateNetwork(), WithLookupFunc(fn), WithAllowHostname(hostname).
  • Sentinel errors: ErrBlocked, ErrInvalidURL, ErrDNSFailed.
  • Both commons/webhook and commons/net/http delegate to this package — it is the single source of truth for SSRF blocking across all Lerian services.
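
As a rough sketch of the IP-level check (stdlib predicates plus a CIDR blocklist), the example below uses net/netip only. isBlockedAddr here is a local re-implementation for illustration, not the package's code, and the concrete CIDRs are this example's reading of the eight named categories; consult BlockedPrefixes() for the canonical list.

```go
package main

import (
	"fmt"
	"net/netip"
)

// extraBlocked covers ranges the stdlib predicates below do not.
// The CIDRs are an assumption mapped from the category names.
var extraBlocked = []netip.Prefix{
	netip.MustParsePrefix("0.0.0.0/8"),       // this-network
	netip.MustParsePrefix("100.64.0.0/10"),   // CGNAT
	netip.MustParsePrefix("192.0.0.0/24"),    // IETF protocol assignments
	netip.MustParsePrefix("192.0.2.0/24"),    // TEST-NET-1
	netip.MustParsePrefix("198.51.100.0/24"), // TEST-NET-2
	netip.MustParsePrefix("203.0.113.0/24"),  // TEST-NET-3
	netip.MustParsePrefix("198.18.0.0/15"),   // benchmarking
	netip.MustParsePrefix("240.0.0.0/4"),     // reserved
}

// isBlockedAddr reports whether addr must not be dialed.
func isBlockedAddr(addr netip.Addr) bool {
	if !addr.IsValid() {
		return true // fail closed on the zero Addr
	}
	addr = addr.Unmap() // treat ::ffff:a.b.c.d as plain IPv4
	if addr.IsLoopback() || addr.IsPrivate() || addr.IsUnspecified() ||
		addr.IsLinkLocalUnicast() || addr.IsMulticast() {
		return true
	}
	for _, prefix := range extraBlocked {
		if prefix.Contains(addr) {
			return true
		}
	}
	return false
}

// isBlockedString parses s and fails closed when it is not an IP literal.
func isBlockedString(s string) bool {
	addr, err := netip.ParseAddr(s)
	if err != nil {
		return true
	}
	return isBlockedAddr(addr)
}

func main() {
	for _, s := range []string{"127.0.0.1", "169.254.169.254", "100.64.1.1", "8.8.8.8"} {
		fmt.Println(s, isBlockedString(s))
	}
}
```

Unmapping before the checks matters: an IPv4-mapped IPv6 literal would otherwise slip past IPv4 prefixes.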

Assertions (commons/assert)

  • New(ctx, logger, component, operation) *Asserter; assertion methods return errors instead of panicking.
  • Methods: That(), NotNil(), NotEmpty(), NoError(), Never(), Halt().
  • Metrics: InitAssertionMetrics(factory), GetAssertionMetrics(), ResetAssertionMetrics().
  • Predicates library (predicates.go): Positive, NonNegative, InRange, ValidUUID, ValidAmount, PositiveDecimal, NonNegativeDecimal, ValidPort, ValidSSLMode, DebitsEqualCredits, TransactionCanTransitionTo, BalanceSufficientForRelease, and more.

Runtime (commons/runtime)

  • Recovery: RecoverAndLog, RecoverAndCrash, RecoverWithPolicy (and *WithContext variants).
  • Safe goroutines: SafeGo, SafeGoWithContext, SafeGoWithContextAndComponent with PanicPolicy (KeepRunning/CrashProcess).
  • Panic metrics: InitPanicMetrics(factory[, logger]), GetPanicMetrics(), ResetPanicMetrics().
  • Span recording: RecordPanicToSpan, RecordPanicToSpanWithComponent.
  • Error reporter: SetErrorReporter(reporter), GetErrorReporter().
  • Production mode: SetProductionMode(bool), IsProductionMode() bool.
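
The idea behind the safe goroutine wrappers can be sketched as a goroutine whose deferred recover turns a panic into a callback instead of a process crash. safeGo and runSafely below are hypothetical helpers with made-up signatures, roughly corresponding to a KeepRunning policy; they are not the commons/runtime API.

```go
package main

import (
	"fmt"
	"sync"
)

// safeGo is a hypothetical helper, not the commons/runtime API.
// It runs fn in a goroutine and routes any panic to onPanic.
func safeGo(wg *sync.WaitGroup, component string, fn func(), onPanic func(component string, recovered any)) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() {
			// Deferred calls run last-in first-out, so recovery happens
			// before wg.Done releases any waiter.
			if r := recover(); r != nil {
				onPanic(component, r)
			}
		}()
		fn()
	}()
}

// runSafely runs fn through safeGo, waits for it to finish, and reports
// whether it panicked and with which value.
func runSafely(component string, fn func()) (panicked bool, recovered any) {
	var wg sync.WaitGroup
	safeGo(&wg, component, fn, func(_ string, r any) {
		panicked, recovered = true, r
	})
	wg.Wait()
	return panicked, recovered
}

func main() {
	panicked, value := runSafely("worker", func() { panic("boom") })
	fmt.Println(panicked, value)
}
```

In a real service the onPanic callback is where logging, panic metrics, and span recording would hook in.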

Safe operations (commons/safe)

  • Math: Divide, DivideRound, DivideOrZero, DivideOrDefault, Percentage, PercentageOrZero, DivideFloat64, DivideFloat64OrZero.
  • Regex: Compile, CompilePOSIX, MatchString, FindString, ClearCache (all with caching).
  • Slices: First[T], Last[T], At[T] with error returns and *OrDefault variants.
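
A small sketch of the "error return instead of panic" style these helpers follow. The names divide, divideOrDefault, first, and firstOrDefault, the int64 operand type, and the error values are assumptions chosen for the example; the package's real signatures may differ.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errDivideByZero = errors.New("division by zero")
	errEmptySlice   = errors.New("empty slice")
)

// divide returns an error instead of panicking on a zero divisor.
func divide(a, b int64) (int64, error) {
	if b == 0 {
		return 0, errDivideByZero
	}
	return a / b, nil
}

// divideOrDefault hides the error behind a caller-supplied fallback.
func divideOrDefault(a, b, fallback int64) int64 {
	result, err := divide(a, b)
	if err != nil {
		return fallback
	}
	return result
}

// first returns the first element, or an error for an empty slice,
// rather than panicking on s[0].
func first[T any](s []T) (T, error) {
	var zero T
	if len(s) == 0 {
		return zero, errEmptySlice
	}
	return s[0], nil
}

// firstOrDefault is the *OrDefault counterpart of first.
func firstOrDefault[T any](s []T, fallback T) T {
	value, err := first(s)
	if err != nil {
		return fallback
	}
	return value
}

func main() {
	fmt.Println(divide(10, 0))
	fmt.Println(divideOrDefault(10, 0, -1))
	fmt.Println(firstOrDefault([]string{}, "none"))
}
```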

JWT (commons/jwt)

  • Parse(token, secret, allowedAlgs) (*Token, error) -- signature verification only.
  • ParseAndValidate(token, secret, allowedAlgs) (*Token, error) -- signature + time claims.
  • Sign(claims, secret, alg) (string, error).
  • ValidateTimeClaims(claims) and ValidateTimeClaimsAt(claims, now).
  • Token.SignatureValid (bool) -- replaces v1 Token.Valid; clarifies signature-only scope.
  • Algorithms: AlgHS256, AlgHS384, AlgHS512.
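
The split between signature-only parsing and time-claim validation can be illustrated with a stdlib-only HS256 sketch. signHS256, verifyHS256, and expiredAt are hypothetical helpers written for this example; they do not reproduce the commons/jwt API, its allowedAlgs handling, or its full claim checks.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

var errInvalidToken = errors.New("invalid token")

// mac256 computes HMAC-SHA256 over input with secret.
func mac256(input string, secret []byte) []byte {
	m := hmac.New(sha256.New, secret)
	m.Write([]byte(input))
	return m.Sum(nil)
}

// signHS256 builds header.payload.signature with base64url, no padding.
func signHS256(claims map[string]any, secret []byte) (string, error) {
	header, err := json.Marshal(map[string]string{"alg": "HS256", "typ": "JWT"})
	if err != nil {
		return "", err
	}
	payload, err := json.Marshal(claims)
	if err != nil {
		return "", err
	}
	enc := base64.RawURLEncoding
	signingInput := enc.EncodeToString(header) + "." + enc.EncodeToString(payload)
	return signingInput + "." + enc.EncodeToString(mac256(signingInput, secret)), nil
}

// verifyHS256 checks the algorithm header and the signature only.
// It deliberately does not inspect time claims.
func verifyHS256(token string, secret []byte) (map[string]any, error) {
	parts := strings.Split(token, ".")
	if len(parts) != 3 {
		return nil, errInvalidToken
	}
	enc := base64.RawURLEncoding
	headerJSON, err := enc.DecodeString(parts[0])
	if err != nil {
		return nil, errInvalidToken
	}
	var header struct {
		Alg string `json:"alg"`
	}
	if err := json.Unmarshal(headerJSON, &header); err != nil || header.Alg != "HS256" {
		return nil, errInvalidToken
	}
	sig, err := enc.DecodeString(parts[2])
	if err != nil || !hmac.Equal(sig, mac256(parts[0]+"."+parts[1], secret)) {
		return nil, errInvalidToken
	}
	payload, err := enc.DecodeString(parts[1])
	if err != nil {
		return nil, errInvalidToken
	}
	var claims map[string]any
	if err := json.Unmarshal(payload, &claims); err != nil {
		return nil, errInvalidToken
	}
	return claims, nil
}

// expiredAt is the separate time-claim step: it reports whether the "exp"
// claim (Unix seconds) is at or before nowUnix.
func expiredAt(claims map[string]any, nowUnix int64) bool {
	exp, ok := claims["exp"].(float64) // encoding/json decodes numbers as float64
	return ok && int64(exp) <= nowUnix
}

func main() {
	secret := []byte("demo-secret")
	token, err := signHS256(map[string]any{"sub": "user-1", "exp": 2000}, secret)
	if err != nil {
		panic(err)
	}
	claims, err := verifyHS256(token, secret)
	fmt.Println(claims["sub"], err, expiredAt(claims, 1999))
}
```

A token can therefore pass verifyHS256 and still be expired, which is the scope distinction that Token.SignatureValid is meant to make explicit.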

Data connectors

  • Postgres: New(cfg Config) (*Client, error) with explicit Config; Resolver(ctx) replaces GetDB(). Primary() (*sql.DB, error) for raw access. Migrations via NewMigrator(cfg).
  • Mongo: NewClient(ctx, cfg, opts...) (*Client, error); methods Client(ctx), ResolveClient(ctx), Database(ctx), Ping(ctx), Close(ctx), EnsureIndexes(ctx, collection, indexes...).
  • Redis: New(ctx, cfg) (*Client, error) with topology-based Config (standalone/sentinel/cluster). GetClient(ctx), Close(), Status(), IsConnected(), LastRefreshError(). SetPackageLogger(logger) for nil-receiver diagnostics.
  • Redis locking: NewRedisLockManager(conn) (*RedisLockManager, error) and LockManager interface. LockHandle for acquired locks. DefaultLockOptions(), RateLimiterLockOptions().
  • RabbitMQ: *Context() variants of all lifecycle methods; HealthCheck() (bool, error). Secret redaction marker switched from "xxxxx" to "****" (unified with commons/security/sanitize.SecretRedactionMarker) — operator tooling (SIEM rules, dashboards, log-grep alerts) keyed on the literal "xxxxx" in rabbitmq sanitized error messages must update queries to match "****". Affects sanitizeAMQPErr, redactURLCredentialToken, and redactURLCredentialsFallback output only.

Dead letter queue (commons/dlq)

  • New(conn *libRedis.Client, keyPrefix string, maxRetries int, opts ...Option) *Handler — returns nil when conn is nil; all Handler methods guard against a nil receiver with ErrNilHandler.
  • Functional options for Handler: WithLogger, WithTracer, WithMetrics, WithModule.
  • DLQMetrics interface: RecordRetried(ctx, source), RecordExhausted(ctx, source), RecordLost(ctx, source) — a nop implementation is used when not set.
  • Key operations: Enqueue(ctx, *FailedMessage) error (RPush), Dequeue(ctx, source string) (*FailedMessage, error) (LPop, destructive), QueueLength(ctx, source string) (int64, error).
  • Tenant-scoped Redis keys: "<prefix><tenantID>:<source>" (e.g. "dlq:tenant-abc:outbound"); falls back to "<prefix><source>" when no tenant is in context.
  • ScanQueues(ctx, source string) ([]string, error) — uses SCAN (non-blocking) to discover all tenant-scoped keys for a source; suitable for background consumers without tenant context.
  • ExtractTenantFromKey(key, source string) string — recovers the tenant ID from a scoped Redis key.
  • PruneExhaustedMessages(ctx, source string, limit int) (int, error) — dequeues up to limit messages, discards exhausted ones, re-enqueues the rest; at-most-once semantics.
  • Backoff: exponential with AWS Full Jitter, base 30s, floor 5s, computed by backoffDuration(retryCount).
  • Sentinel errors: ErrNilHandler, ErrNilRetryFunc, ErrMessageExhausted.
  • NewConsumer(handler *Handler, retryFn RetryFunc, opts ...ConsumerOption) (*Consumer, error) — errors if handler or retryFn is nil.
  • Consumer functional options: WithConsumerLogger, WithConsumerTracer, WithConsumerMetrics, WithConsumerModule, WithPollInterval, WithBatchSize, WithSources.
  • Consumer lifecycle: Run(ctx) — blocks, stops on ctx cancel or Stop(); Stop() — safe to call multiple times; ProcessOnce(ctx) — exported for testing.
  • FailedMessage fields: Source, OriginalData, ErrorMessage, RetryCount, MaxRetries, CreatedAt, NextRetryAt, TenantID.
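
The key layout and the retry delay described above can be sketched as follows. queueKey, tenantFromKey, and retryDelay are hypothetical helpers (the real ExtractTenantFromKey takes only key and source), and the one-hour cap is an assumption added to keep the arithmetic bounded, since no upper bound is stated here.

```go
package main

import (
	"fmt"
	"math/rand"
	"strings"
	"time"
)

const (
	backoffBase  = 30 * time.Second
	backoffFloor = 5 * time.Second
	backoffCap   = time.Hour // assumption: not stated in this guide
)

// queueKey builds "<prefix><tenantID>:<source>", falling back to
// "<prefix><source>" when there is no tenant.
func queueKey(prefix, tenantID, source string) string {
	if tenantID == "" {
		return prefix + source
	}
	return prefix + tenantID + ":" + source
}

// tenantFromKey recovers the tenant ID from a scoped key. It returns ""
// for unscoped keys and for keys that do not match prefix and source.
func tenantFromKey(prefix, key, source string) string {
	rest, ok := strings.CutPrefix(key, prefix)
	if !ok {
		return ""
	}
	tenant, ok := strings.CutSuffix(rest, ":"+source)
	if !ok {
		return ""
	}
	return tenant
}

// retryDelay applies full jitter: a uniform random delay in
// [0, base * 2^retryCount], capped, then raised to the floor.
func retryDelay(retryCount int) time.Duration {
	if retryCount < 0 {
		retryCount = 0
	}
	ceiling := backoffCap
	if retryCount < 16 { // keep the shift far away from int64 overflow
		if c := backoffBase << uint(retryCount); c < backoffCap {
			ceiling = c
		}
	}
	delay := time.Duration(rand.Int63n(int64(ceiling) + 1))
	if delay < backoffFloor {
		delay = backoffFloor
	}
	return delay
}

func main() {
	key := queueKey("dlq:", "tenant-abc", "outbound")
	fmt.Println(key, tenantFromKey("dlq:", key, "outbound"))
	for retry := 0; retry < 4; retry++ {
		fmt.Println(retry, retryDelay(retry))
	}
}
```

Full jitter spreads retries across the whole window, which avoids synchronized retry bursts when many messages fail at the same moment; the floor keeps a very small random draw from turning into an immediate retry.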

Idempotency middleware (commons/net/http/idempotency)

  • New(conn *libRedis.Client, opts ...Option) *Middleware — returns nil when conn is nil; Check() on a nil *Middleware returns a pass-through Fiber handler (fail-open by design).
  • Functional options: WithLogger, WithKeyPrefix (default "idempotency:"), WithKeyTTL (default 7 days), WithMaxKeyLength (default 256), WithRedisTimeout (default 500ms), WithRejectedHandler, WithMaxBodyCache (default 1 MB).
  • (*Middleware).Check() fiber.Handler — registers the middleware on a Fiber route.
  • Only applies to mutating methods (POST, PUT, PATCH, DELETE); GET/HEAD/OPTIONS pass through unconditionally.
  • Idempotency key is read from the X-Idempotency request header (constants.IdempotencyKey); missing key passes through.
  • Key too long → 400 JSON VALIDATION_ERROR (or custom WithRejectedHandler).
  • Redis SetNX atomically claims the key as "processing" for the TTL duration.
  • Duplicate request behavior:
    • Cached response available → replays status + body verbatim, sets Idempotency-Replayed: true header.
    • Key in "processing" state (in-flight) → 409 JSON IDEMPOTENCY_CONFLICT.
    • Key in "complete" state but no cached body → 200 JSON IDEMPOTENT.
  • On handler success: stores response body under <key>:response (if ≤ maxBodyCache), marks key as "complete".
  • On handler error: deletes both keys so the client may retry with the same idempotency key.
  • Redis unavailable → fail-open (request proceeds without idempotency enforcement, logged at WARN).
  • Keys are tenant-scoped: "<prefix><tenantID>:<idempotencyKey>".
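
The request lifecycle above amounts to a small state machine. The sketch below models it with an in-memory map standing in for Redis; memoryStore, begin, finish, and the decision values are hypothetical names, TTLs and tenant scoping are left out, and the real middleware stores the body under a separate <key>:response entry.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// decision is what the middleware would do with an incoming request.
type decision string

const (
	proceed    decision = "proceed"    // first sighting: run the handler
	conflict   decision = "conflict"   // in flight: 409 IDEMPOTENCY_CONFLICT
	replay     decision = "replay"     // complete with cached body: replay it
	idempotent decision = "idempotent" // complete without body: 200 IDEMPOTENT
)

var errHandlerFailed = errors.New("handler failed")

type record struct {
	state string // "processing" or "complete"
	body  []byte // nil when the response was too large to cache
}

// memoryStore is a hypothetical in-memory stand-in for Redis.
type memoryStore struct {
	mu      sync.Mutex
	records map[string]*record
}

func newMemoryStore() *memoryStore {
	return &memoryStore{records: make(map[string]*record)}
}

// begin claims the key when it is absent (the SetNX step) and otherwise
// classifies the duplicate request.
func (s *memoryStore) begin(key string) (decision, []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()

	rec, seen := s.records[key]
	if !seen {
		s.records[key] = &record{state: "processing"}
		return proceed, nil
	}
	switch {
	case rec.state == "processing":
		return conflict, nil
	case rec.body != nil:
		return replay, rec.body
	default:
		return idempotent, nil
	}
}

// finish records the handler outcome: an error releases the key so the
// client may retry; success marks it complete and caches small bodies.
func (s *memoryStore) finish(key string, body []byte, handlerErr error, maxBody int) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if handlerErr != nil {
		delete(s.records, key)
		return
	}
	rec := &record{state: "complete"}
	if len(body) <= maxBody {
		rec.body = append([]byte{}, body...)
	}
	s.records[key] = rec
}

func main() {
	store := newMemoryStore()
	first, _ := store.begin("tenant-a:order-1")
	second, _ := store.begin("tenant-a:order-1")
	store.finish("tenant-a:order-1", []byte(`{"id":1}`), nil, 1<<20)
	third, body := store.begin("tenant-a:order-1")
	fmt.Println(first, second, third, string(body))

	store.begin("tenant-a:order-2")
	store.finish("tenant-a:order-2", nil, errHandlerFailed, 1<<20)
	retry, _ := store.begin("tenant-a:order-2")
	fmt.Println(retry)
}
```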

Webhook delivery (commons/webhook)

  • NewDeliverer(lister EndpointLister, opts ...Option) *Deliverer — returns nil when lister is nil; both Deliver and DeliverWithResults guard against a nil receiver.
  • EndpointLister interface: ListActiveEndpoints(ctx context.Context) ([]Endpoint, error).
  • Functional options: WithLogger, WithTracer, WithMetrics, WithMaxConcurrency (default 20), WithMaxRetries (default 3), WithHTTPClient, WithSecretDecryptor.
  • Deliver(ctx, *Event) error — fans out to all active endpoints concurrently; only returns an error for pre-flight failures (nil receiver, nil event, endpoint listing failure). Per-endpoint failures are logged + metricked but do not propagate.
  • DeliverWithResults(ctx, *Event) []DeliveryResult — same fan-out, returns one DeliveryResult per active endpoint for callers that need individual outcomes.
  • Endpoint fields: ID, URL, Secret (plaintext or enc: prefix for encrypted), Active.
  • Event fields: Type, Payload []byte, Timestamp int64 (Unix epoch seconds).
  • DeliveryResult fields: EndpointID, StatusCode, Success, Error, Attempts.
  • DeliveryMetrics interface: RecordDelivery(ctx, endpointID string, success bool, statusCode, attempts int).
  • SecretDecryptor type: func(encrypted string) (string, error) — receives ciphertext with enc: prefix stripped. No decryptor + encrypted secret = fail-closed (delivery skipped with error).
  • SSRF protection: delegates to commons/security/ssrf.ResolveAndValidate for DNS-pinned validation, IP range blocking, and hostname blocking. The webhook package maps ssrf sentinel errors to its own (ErrSSRFBlocked, ErrInvalidURL). Only http and https schemes are allowed.
  • HMAC signing: X-Webhook-Signature: sha256=<hex(HMAC-SHA256(payload, secret))>. Timestamp is NOT included in the signature — replay protection is the receiver's responsibility.
  • HTTP client blocks all redirects to prevent SSRF bypass via 302 to internal addresses.
  • Retry strategy: exponential backoff with jitter (commons/backoff), base 1s. Non-retryable on 4xx except 429.
  • Sentinel errors: ErrNilDeliverer, ErrSSRFBlocked, ErrDeliveryFailed, ErrInvalidURL.
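
For receivers, the signature format above can be reproduced and checked with the standard library. signatureHeader, verifySignature, and shouldRetry are hypothetical helpers for illustration, not exports of commons/webhook; treating every non-2xx, non-4xx status as retryable is an assumption of this sketch.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// signatureHeader returns the X-Webhook-Signature value:
// "sha256=" followed by hex(HMAC-SHA256(payload, secret)).
func signatureHeader(payload, secret []byte) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}

// verifySignature is the receiver-side check. It compares in constant
// time and covers the payload only, so replay protection (for example a
// timestamp window) has to be added separately by the receiver.
func verifySignature(header string, payload, secret []byte) bool {
	hexSig, ok := strings.CutPrefix(header, "sha256=")
	if !ok {
		return false
	}
	got, err := hex.DecodeString(hexSig)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return hmac.Equal(got, mac.Sum(nil))
}

// shouldRetry encodes "non-retryable on 4xx except 429".
func shouldRetry(status int) bool {
	switch {
	case status >= 200 && status < 300:
		return false // delivered
	case status == 429:
		return true // rate limited: back off and retry
	case status >= 400 && status < 500:
		return false // the receiver rejected the request
	default:
		return true // assumption: everything else is treated as transient
	}
}

func main() {
	payload := []byte(`{"type":"order.created"}`)
	secret := []byte("endpoint-secret")
	header := signatureHeader(payload, secret)
	fmt.Println(header)
	fmt.Println(verifySignature(header, payload, secret))
	fmt.Println(shouldRetry(429), shouldRetry(404), shouldRetry(503))
}
```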

Streaming (commons/streaming)

  • commons/streaming is producer-only; commons/rabbitmq handles internal command queues. The two are orthogonal and neither deprecates the other.
  • Three-method Emitter interface (Emit(ctx, Event) error, Close() error, Healthy(ctx) error). Three implementations: *Producer (franz-go-backed), NoopEmitter (fail-safe when disabled), and MockEmitter (concurrency-safe test double with deep-copy Events, Assert* helpers, and WaitForEvent).
  • Constructors: New(ctx, cfg Config, opts ...EmitterOption) (Emitter, error) picks the right implementation from Config — returns a NoopEmitter when Enabled=false or Brokers is empty; NewProducer(ctx, cfg, opts...) (*Producer, error) forces construction and never substitutes a Noop.
  • LoadConfig() (Config, error) reads every STREAMING_* env var, applies defaults, and validates the result. Validation is skipped when Enabled=false.
  • Functional options: WithLogger, WithMetricsFactory, WithTracer, WithCircuitBreakerManager, WithPartitionKey, WithCloseTimeout, WithOutboxRepository, WithTLSConfig, WithSASL. Passing nil for any factory or manager is safe — the Producer falls back to a no-op recorder / its own CB manager.
  • Event struct carries the CloudEvents 1.0 binary-mode envelope: TenantID, ResourceType, EventType, EventID, SchemaVersion, Timestamp, Source (required CloudEvents fields) plus Subject, DataContentType, DataSchema, SystemEvent, and Payload json.RawMessage. ApplyDefaults() fills missing EventID (uuid.NewV7), Timestamp (now UTC), SchemaVersion ("1.0.0"), and DataContentType ("application/json") on a local copy before publish.
  • Event.Topic() derives "lerian.streaming.<resource>.<event>" (with ".v<major>" suffix when SchemaVersion major is ≥2). Event.PartitionKey() returns TenantID by default, or "system:" + EventType when SystemEvent=true.
  • Caller-side sentinels (synchronous, no I/O): ErrMissingTenantID, ErrMissingSource, ErrPayloadTooLarge (1 MiB cap), ErrNotJSON, ErrEventDisabled, ErrEmitterClosed.
  • Config-validation sentinels: ErrMissingBrokers, ErrInvalidCompression, ErrInvalidAcks (ErrMissingSource is shared with Emit).
  • Lifecycle/wiring sentinels (NOT caller errors — IsCallerError returns false): ErrNilProducer, ErrCircuitOpen, ErrOutboxNotConfigured, ErrNilOutboxRegistry.
  • *EmitError carries ResourceType, EventType, TenantID, Topic, Class ErrorClass, and Cause error. Error() runs through sanitizeBrokerURL so SASL credentials never surface in logs. IsCallerError(err) returns true for the caller-correctable sentinels and for *EmitError with class ClassSerialization, ClassValidation, or ClassAuth.
  • Eight ErrorClass values: ClassSerialization, ClassValidation, ClassAuth, ClassTopicNotFound, ClassBrokerUnavailable, ClassNetworkTimeout, ClassContextCanceled, ClassBrokerOverloaded. DLQ routing applies to every class except ClassValidation and ClassContextCanceled (TRD §C9).
  • Lifecycle invariants: *Producer implements commons.App. Run(launcher) / RunContext(ctx, launcher) block until ctx is canceled or Close is called, then invoke CloseContext with a fresh background ctx so a canceled caller ctx does not abort Flush. Close/CloseContext are idempotent via atomic.Bool CAS. Post-close Emit returns ErrEmitterClosed synchronously before any I/O. Service methods MUST NOT call Close; the Launcher owns lifecycle.
  • Outbox fallback: when WithOutboxRepository is wired and the circuit breaker is OPEN, Emit writes the event to the outbox and returns nil. The caller registers the replay handler via (*Producer).RegisterOutboxHandler(registry, eventTypes...), which routes back through publishDirect (NOT Emit) so replays bypass the breaker and cannot re-enqueue themselves on a sustained outage (TRD §C7). Without an outbox wired, circuit-open Emits return ErrCircuitOpen. The Producer NEVER constructs an OutboxRepository itself — ownership stays with the consuming service.
  • DLQ: per-topic, named "<source>.dlq". Each DLQ message carries six headers (x-lerian-dlq-source-topic, x-lerian-dlq-error-class, x-lerian-dlq-error-message, x-lerian-dlq-retry-count, x-lerian-dlq-first-failure-at, x-lerian-dlq-producer-id) alongside the CloudEvents ce-* context attributes. DLQ publish failures surface on the streaming_dlq_publish_failed_total counter and are logged, not returned to the caller.
  • Healthy(ctx) returns nil when ready; otherwise returns a *HealthError whose State() is one of Healthy, Degraded (broker unreachable but outbox viable), or Down (both unreachable). Health check bounds the broker Ping to 500ms.
  • Concurrency: *Producer, MockEmitter, and NoopEmitter are all safe for concurrent use from any number of goroutines.
  • Metrics: streaming_emitted_total, streaming_emit_duration_ms, streaming_dlq_total, streaming_dlq_publish_failed_total, streaming_outbox_routed_total, streaming_circuit_state. All registered via the MetricsFactory passed through WithMetricsFactory; nil factory degrades to a no-op recorder after a single WARN log at first Emit.
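
The topic and partition-key rules stated for Event.Topic() and Event.PartitionKey() can be sketched as plain functions. topicFor and partitionKeyFor are hypothetical names; how the real implementation handles a SchemaVersion that does not parse is not stated here, so omitting the suffix in that case is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// topicFor derives "lerian.streaming.<resource>.<event>" and appends
// ".v<major>" when the schema's major version is 2 or higher.
func topicFor(resourceType, eventType, schemaVersion string) string {
	topic := "lerian.streaming." + resourceType + "." + eventType
	majorPart, _, _ := strings.Cut(schemaVersion, ".")
	// Assumption: an unparsable version simply yields no suffix.
	if major, err := strconv.Atoi(majorPart); err == nil && major >= 2 {
		topic += ".v" + strconv.Itoa(major)
	}
	return topic
}

// partitionKeyFor keeps one tenant's events on one partition, and routes
// system events by event type instead.
func partitionKeyFor(tenantID, eventType string, systemEvent bool) string {
	if systemEvent {
		return "system:" + eventType
	}
	return tenantID
}

func main() {
	fmt.Println(topicFor("transaction", "created", "1.0.0"))
	fmt.Println(topicFor("transaction", "created", "2.1.0"))
	fmt.Println(partitionKeyFor("tenant-abc", "created", false))
	fmt.Println(partitionKeyFor("", "maintenance", true))
}
```

Keying by tenant preserves per-tenant ordering within a topic, while the version suffix lets a breaking schema change land on a new topic without disturbing existing consumers.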

Runtime configuration (commons/systemplane)

  • Dual-backend hot-reload config store. Consumers choose at construction: NewPostgres(db *sql.DB, listenDSN string, opts ...Option) (*Client, error) or NewMongoDB(client *mongo.Client, database string, opts ...Option) (*Client, error).
  • Client lifecycle: construct → Register(namespace, key, default, opts...) and/or RegisterTenantScoped(namespace, key, default, opts...) for each known key → Start(ctx) (hydrates from store, begins Subscribe) → runtime operations → Close(). Register after Start returns ErrRegisterAfterStart. Nil-receiver safe on all read paths.
  • Reads: Get(ns, key) (any, bool) plus typed accessors GetString, GetInt, GetBool, GetFloat64, GetDuration. All nil-safe; return zero values on miss.
  • Writes: Set(ctx, ns, key, value, actor) — last-write-wins, write-through cache for same-process read consistency, fires subscribers via the changefeed echo (not synchronously from Set).
  • Subscriptions: OnChange(ns, key, fn) returns an unsubscribe func. Callbacks invoked serially with panic recovery via commons/runtime.RecoverAndLog. Nil-receiver safe.
  • Listing/metadata: List(namespace) []ListEntry returns sorted entries in a namespace; KeyRedaction(ns, key) RedactPolicy for admin redaction.
  • Namespaces are free-text (convention: "global", "tenant:<id>", "feature-flags"). Authorization is enforced at the admin HTTP boundary, not in the Client.
  • Registered keys carry: default value, description, validator func, redaction policy (RedactNone | RedactMask | RedactFull). Options: WithDescription, WithValidator, WithRedaction.
  • Client options: WithLogger, WithTelemetry, WithDebounce (default 100ms), WithListenChannel (Postgres default "systemplane_changes"), WithTable (Postgres default "systemplane_entries"), WithCollection (MongoDB default "systemplane_entries"), WithPollInterval (MongoDB — non-zero switches from change-streams to polling; required for standalone MongoDB without a replica set), WithLazyTenantLoad(maxEntries) (bounded-LRU tenant cache in lazy mode; non-positive maxEntries falls back to eager hydration).
  • Tenant-scoped keys (additive, non-breaking): RegisterTenantScoped(ns, key, default, opts...) declares a key eligible for per-tenant overrides while the legacy Get/OnChange/List surface keeps observing only the shared _global row (PRD AC1/AC8).
    • Tenant-aware methods: GetForTenant(ctx, ns, key) (any, bool, error), SetForTenant(ctx, ns, key, value, actor) error, DeleteForTenant(ctx, ns, key, actor) error, ListTenantsForKey(ns, key) []string, OnTenantChange(ns, key, fn func(ctx context.Context, ns, key, tenantID string, newValue any)) (unsubscribe func()), plus typed accessor mirrors GetStringForTenant, GetIntForTenant, GetBoolForTenant, GetFloat64ForTenant, GetDurationForTenant.
    • The ctx passed to OnTenantChange callbacks is pre-scoped to tenantID via core.ContextWithTenantID, so subscribers can directly call tenant-aware lib-commons facilities like DLQ, idempotency, and webhook.
    • Tenant ID is extracted from ctx via core.GetTenantIDContext and validated against core.IsValidTenantID. This is fail-closed: there is no silent fallback to the shared global.
    • _global is the reserved sentinel for shared rows and is rejected as a tenant ID.
    • Delete is idempotent; when a row exists its removal fires OnTenantChange with newValue = registered default, but a no-op delete (no row to remove) emits no changefeed event and does NOT fire subscribers.
    • GetForTenant resolution order: per-tenant cache → legacy global cache → registered default.
  • Admin HTTP surface (commons/systemplane/admin): Mount(router, client, opts...) registers six routes at a configurable prefix (default /system).
    • Legacy globals: GET :prefix/:namespace (list), GET :prefix/:namespace/:key (read), PUT :prefix/:namespace/:key (write).
    • Tenant-scoped: GET :prefix/:namespace/:key/tenants (list tenants with an override), PUT :prefix/:namespace/:key/tenants/:tenantID (write tenant override), DELETE :prefix/:namespace/:key/tenants/:tenantID (remove tenant override).
    • Options: WithPathPrefix, WithAuthorizer (legacy routes only, hook with "read" / "write" actions), WithTenantAuthorizer(fn func(c *fiber.Ctx, action, tenantID string) error) (tenant routes only; default-deny when absent), WithActorExtractor.
    • The library does NOT silently fall back to WithAuthorizer for tenant routes, to avoid silent privilege escalation.
    • Values are redacted per the registered RedactPolicy before responding.
  • Storage evolution: Postgres adds tenant_id TEXT NOT NULL DEFAULT '_global' with a composite unique index on (namespace, key, tenant_id); existing rows are backfilled with _global by the column default. MongoDB switches to a compound BSON document _id {namespace, key, tenant_id}; first boot against a legacy ObjectId _id collection runs an idempotent backfill migration during store construction (inside NewMongoDB via ensureSchema, not deferred to Start) and is safe to resume on restart if a crash interrupts it mid-flight.
  • Internal Store interface (internal/store) has two implementations: internal/postgres (LISTEN/NOTIFY, pgx/v5) and internal/mongodb (change streams with polling fallback, mongo-driver/v2). External service repositories verify backend behavior through the public Client contract suite: systemplanetest.Run(t, func(t *testing.T) *systemplane.Client { ... }).
  • Sentinel errors: ErrClosed, ErrNotStarted, ErrRegisterAfterStart, ErrUnknownKey, ErrValidation, ErrDuplicateKey, ErrMissingTenantContext (ctx has no tenant ID), ErrInvalidTenantID (fails core.IsValidTenantID or equals _global), ErrTenantScopeNotRegistered (key was registered via Register, not RegisterTenantScoped), ErrTenantSchemaNotEnabled (emitted by phase-1 tenant write paths — SetForTenant / DeleteForTenant — when the backend store was constructed without TenantSchemaEnabled; surfaces as 503 tenant_schema_not_enabled through the admin HTTP layer).
  • NewForTesting(s TestStore, opts ...Option) (*Client, error) is an explicit out-of-package test helper for consumers that need a Client bound to a caller-controlled store (e.g., the admin subpackage's tests). Not a promised production API.
  • Scope: runtime-mutable knobs only. Bootstrap-only config (DB DSNs, secrets, TLS paths, telemetry init, server identity) should live in env-vars or the secret manager, not in systemplane.
  • Consumer adoption guide for tenant-scoped keys: commons/systemplane/MIGRATION_TENANT_SCOPED.md.
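
The GetForTenant resolution order (per-tenant cache, then legacy global cache, then registered default) can be sketched with plain maps. The resolver type, its fields, and the extra "tier" return value are hypothetical and exist only to make the lookup order visible; the real method returns (any, bool, error) and reads the tenant ID from ctx.

```go
package main

import (
	"errors"
	"fmt"
)

const globalTenant = "_global" // reserved sentinel, never a valid tenant ID

var (
	errInvalidTenant = errors.New("invalid tenant ID")
	errUnknownKey    = errors.New("key is not registered")
)

type entryKey struct{ namespace, key string }

// resolver is a hypothetical in-memory model of the three lookup tiers.
type resolver struct {
	perTenant map[string]map[entryKey]any // tenantID -> overrides
	global    map[entryKey]any            // shared _global rows
	defaults  map[entryKey]any            // registered defaults
}

// getForTenant returns the value and the tier it came from.
func (r *resolver) getForTenant(tenantID, namespace, key string) (any, string, error) {
	if tenantID == "" || tenantID == globalTenant {
		return nil, "", errInvalidTenant // fail closed, no fallback
	}
	k := entryKey{namespace, key}
	def, registered := r.defaults[k]
	if !registered {
		return nil, "", errUnknownKey
	}
	if v, ok := r.perTenant[tenantID][k]; ok { // reading a nil inner map is safe
		return v, "tenant", nil
	}
	if v, ok := r.global[k]; ok {
		return v, "global", nil
	}
	return def, "default", nil
}

// demoResolver builds a small fixture used by main.
func demoResolver() *resolver {
	rate := entryKey{"limits", "rate"}
	timeout := entryKey{"limits", "timeout"}
	return &resolver{
		perTenant: map[string]map[entryKey]any{"tenant-a": {rate: 50}},
		global:    map[entryKey]any{rate: 80},
		defaults:  map[entryKey]any{rate: 100, timeout: 30},
	}
}

func main() {
	r := demoResolver()
	fmt.Println(r.getForTenant("tenant-a", "limits", "rate"))
	fmt.Println(r.getForTenant("tenant-b", "limits", "rate"))
	fmt.Println(r.getForTenant("tenant-b", "limits", "timeout"))
	fmt.Println(r.getForTenant("_global", "limits", "rate"))
}
```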

Other packages

  • Backoff: ExponentialWithJitter() and WaitContext(). Used by redis and postgres for retry rate-limiting.
  • Errgroup: WithContext(ctx) (*Group, context.Context); Go(fn) with panic recovery; SetLogger(logger).
  • Crypto: Crypto struct with GenerateHash, InitializeCipher, Encrypt, Decrypt. String() / GoString() redact credentials.
  • License: New(opts...) *ManagerShutdown with WithLogger() option. SetHandler(), Terminate(), TerminateWithError(), TerminateSafe().
  • Pointers: String(), Bool(), Time(), Int(), Int64(), Float64().
  • Cron: Parse(expr) (Schedule, error); Schedule.Next(t) (time.Time, error).
  • Security: IsSensitiveField(name), DefaultSensitiveFields(), DefaultSensitiveFieldsMap().
  • SSRF: IsBlockedIP(), IsBlockedAddr(), IsBlockedHostname(), BlockedPrefixes(), ValidateURL(), ResolveAndValidate(). Single source of truth for all SSRF protection. Both webhook and net/http delegate here.
  • Transaction: BuildIntentPlan() + ValidateBalanceEligibility() + ApplyPosting() with typed IntentPlan, Posting, LedgerTarget. ResolveOperation(pending, isSource, status) (Operation, error).
  • Constants: SanitizeMetricLabel(value) string for OTEL label safety.
  • Certificate: NewManager(certPath, keyPath) (*Manager, error); Rotate(cert, key), TLSCertificate(), GetCertificateFunc(); package-level LoadFromFiles(certPath, keyPath) for pre-flight validation.
  • DLQ: New(conn, keyPrefix, maxRetries, opts...) *Handler; NewConsumer(handler, retryFn, opts...) (*Consumer, error); Run(ctx) / Stop() / ProcessOnce(ctx) for consumer lifecycle.
  • Idempotency: New(conn, opts...) *Middleware; (*Middleware).Check() fiber.Handler; fail-open when Redis is unavailable.
  • Webhook: NewDeliverer(lister, opts...) *Deliverer; Deliver(ctx, event) error; DeliverWithResults(ctx, event) []DeliveryResult; SSRF-protected with DNS pinning and HMAC-SHA256 signing.
  • Streaming: LoadConfig() (Config, error); New(ctx, cfg, opts...) (Emitter, error) picks Producer or NoopEmitter from Config; NewProducer(ctx, cfg, opts...) (*Producer, error) forces construction; three-method Emitter interface (Emit/Close/Healthy); MockEmitter test double with Assert* helpers; *Producer implements commons.App — Close is idempotent, post-close Emit returns ErrEmitterClosed. Producer-only; commons/rabbitmq handles internal command queues.

Coding rules

  • Do not add panic(...) in production paths.
  • Do not swallow errors; return or handle with context.
  • Keep exported docs aligned with behavior.
  • Reuse existing package patterns before introducing new abstractions.
  • Avoid introducing high-cardinality telemetry labels by default.
  • Use the structured log interface (Log(ctx, level, msg, fields...)) -- do not add printf-style methods.

Testing and validation

Core commands

  • make test -- run unit tests (uses gotestsum if available)
  • make test-unit -- run unit tests excluding integration
  • make test-integration -- run integration tests with testcontainers (requires Docker)
  • make test-all -- run all tests (unit + integration)
  • make ci -- run the local fix + verify pipeline (lint-fix, format, tidy, check-tests, sec, vet, test-unit, test-integration)
  • make lint -- run lint checks (read-only)
  • make lint-fix -- auto-fix lint issues
  • make build -- build all packages
  • make format -- format code with gofmt
  • make tidy -- clean dependencies
  • make vet -- run go vet on all packages
  • make sec -- run security checks using gosec (SARIF=1 for SARIF output)
  • make clean -- clean build artifacts

Coverage

  • make coverage-unit -- unit tests with coverage report (respects .ignorecoverunit)
  • make coverage-integration -- integration tests with coverage
  • make coverage -- run all coverage targets

Test flags

  • LOW_RESOURCE=1 -- sets -p=1 -parallel=1, disables -race for constrained machines
  • RETRY_ON_FAIL=1 -- retries failed tests once
  • RUN=<pattern> -- filter integration tests by name pattern
  • PKG=<path> -- filter to specific package(s)
  • DISABLE_OSX_LINKER_WORKAROUND=1 -- disable macOS ld_classic workaround

Integration test conventions

  • Test files: *_integration_test.go
  • Test functions: TestIntegration_<Name>
  • Build tag: integration

Other

  • make tools -- install gotestsum
  • make check-tests -- verify test coverage for packages
  • make setup-git-hooks -- install git hooks
  • make check-hooks -- verify git hooks installation
  • make check-envs -- check hooks + environment file security
  • make goreleaser -- create release snapshot

Migration awareness

  • If a task touches renamed/removed v1 symbols, update MIGRATION_MAP.md.
  • If a task changes package-level behavior or API expectations, update README.md.

Project rules

Documentation policy

  • Keep docs factual and code-backed.
  • Avoid speculative roadmap text.
  • Prefer concise package-level examples that compile with current API names.
  • When adding, removing, or changing any environment variable consumed by lib-commons, update .env.reference in the same change.