Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/redis/go-redis/v9 v9.7.0
github.com/stretchr/testify v1.10.0
github.com/tom-draper/api-analytics/analytics/go/gin v0.0.0-20250124150115-22f84a4e90ef
go.uber.org/zap v1.27.0
)

require (
Expand All @@ -32,7 +33,6 @@ require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -43,6 +43,7 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.13.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.34.0 // indirect
Expand Down
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4=
github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -100,6 +99,12 @@ github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65E
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/arch v0.13.0 h1:KCkqVVV1kGg0X87TFysjCJ8MxtZEIU4Ja/yXGeoECdA=
golang.org/x/arch v0.13.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
Expand Down
51 changes: 31 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"log"
"net/http"
"net/http/pprof"
"os"
Expand All @@ -28,6 +27,8 @@ import (
"github.com/jasonlovesdoggo/abacus/utils"

"github.com/gin-gonic/gin"

"go.uber.org/zap"
)

const (
Expand All @@ -46,6 +47,12 @@ var (
const is32Bit = uint64(^uintptr(0)) != ^uint64(0)

func init() {
var logErr error
logger, logErr = zap.NewProduction()
if logErr != nil {
panic(fmt.Sprintf("Failed to initialize logger: %v", logErr))
}

utils.LoadEnv()

if strings.ToLower(os.Getenv("DEBUG")) == "true" {
Expand All @@ -64,13 +71,14 @@ func init() {
Shard = namegen.New().Get()

ADDR := os.Getenv("REDIS_HOST") + ":" + os.Getenv("REDIS_PORT")
log.Println("Listening to redis on: " + ADDR)
logger.Info("Listening to Redis", zap.String("address", ADDR))
var err error
DbNum, err = strconv.Atoi(os.Getenv("REDIS_DB"))
if err != nil {
DbNum = 0 // Default to 0 if not set
logger.Warn("Invalid REDIS_DB value, defaulting to 0", zap.Error(err))
} else if DbNum < 0 || DbNum > 16 {
log.Fatalf("Redis DB must be between 0-16: %v", DbNum)
logger.Fatal("Redis DB must be between 0-16", zap.Int("DbNum", DbNum))
}

Client = redis.NewClient(&redis.Options{
Expand All @@ -91,10 +99,10 @@ func setupMockRedis() {
// Used for testing, "miniredis" is a mock Redis server that runs in-memory for testing purposes only (no persistence)
mr, err := miniredis.Run()
if err != nil {
log.Fatalf("Failed to start miniredis: %v", err)
logger.Fatal("Failed to start miniredis", zap.Error(err))
}

log.Println("Using miniredis for testing")
logger.Info("Using miniredis for testing")

// Connect clients to miniredis
Client = redis.NewClient(&redis.Options{
Expand Down Expand Up @@ -140,13 +148,13 @@ func CreateRouter() *gin.Engine {
r.Use(gin.Recovery()) // recover from panics and returns a 500 error
if os.Getenv("API_ANALYTICS_ENABLED") == "true" {
r.Use(analytics.Analytics(os.Getenv("API_ANALYTICS_KEY"))) // Add middleware
log.Println("Analytics enabled")
logger.Info("Analytics enabled")
}
route := r.Group("")
route.Use(middleware.Stats())
if os.Getenv("RATE_LIMIT_ENABLED") == "true" {
route.Use(middleware.RateLimit(RateLimitClient))
log.Println("Rate limiting enabled")
logger.Info("Rate limiting enabled")
}
// Define routes
r.NoRoute(func(c *gin.Context) {
Expand Down Expand Up @@ -195,8 +203,10 @@ func CreateRouter() *gin.Engine {
}

func main() {
defer logger.Sync()

if is32Bit {
log.Fatal("This program is not supported on 32-bit systems, " +
logger.Fatal("This program is not supported on 32-bit systems, " +
"please run on a 64-bit system. If you wish for 32-bit support, " +
"please open an issue on the GitHub repository.\nexiting...")
}
Expand All @@ -212,37 +222,38 @@ func main() {
Addr: ":" + os.Getenv("PORT"),
Handler: r,
}
fmt.Println("Listening on port " + os.Getenv("PORT"))
port := os.Getenv("PORT")
logger.Info("Listening on port", zap.String("port", port))

go func() {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("listen: %s\n", err)
logger.Fatal("Failed to start server", zap.Error(err))
}
}()

// Wait for interrupt signal
<-ctx.Done()
log.Println("Shutdown signal received")
logger.Info("Shutdown signal received")

// Signal StatsManager to save and wait for completion
log.Println("Signaling stats manager to save data...")
logger.Info("Signaling stats manager to save data...")
utils.ServerClose <- true

// Wait for the response on the same channel
log.Println("Waiting for stats manager to complete...")
logger.Info("Waiting for stats manager to complete...")
<-utils.ServerClose
log.Println("Stats saving confirmed complete")
logger.Info("Stats saving confirmed complete")

// Now close Redis connections
log.Println("Closing Redis connections...")
logger.Info("Closing Redis connections...")
if Client != nil {
if err := Client.Close(); err != nil {
log.Printf("Error closing Redis client: %v", err)
logger.Error("Error closing Redis client", zap.Error(err))
}
}
if RateLimitClient != nil {
if err := RateLimitClient.Close(); err != nil {
log.Printf("Error closing Redis rate limit client: %v", err)
logger.Error("Error closing Redis rate limit client", zap.Error(err))
}
}

Expand All @@ -251,12 +262,12 @@ func main() {
defer cancel()

if err := srv.Shutdown(shutdownCtx); err != nil {
log.Fatal("Server Shutdown:", err)
logger.Fatal("Server shutdown failed", zap.Error(err))
}

<-shutdownCtx.Done()
if errors.Is(shutdownCtx.Err(), context.DeadlineExceeded) {
log.Println("Shutdown timeout of 5 seconds exceeded")
logger.Warn("Shutdown timeout of 5 seconds exceeded")
}
log.Println("Server exiting")
logger.Info("Server exiting")
}
6 changes: 4 additions & 2 deletions middleware/stats.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package middleware

import (
"log"
"strings"
"sync/atomic"

"github.com/gin-gonic/gin"
"github.com/jasonlovesdoggo/abacus/utils"
"go.uber.org/zap"
)

var logger, _ = zap.NewProduction()

func formatPath(path string) string {
if len(path) == 0 {
return ""
Expand All @@ -23,7 +25,7 @@ func formatPath(path string) string {
func Stats() gin.HandlerFunc {
return func(c *gin.Context) {
if utils.StatsManager == nil {
log.Fatal("StatsManager not initialized. Call InitializeStatsManager first")
logger.Fatal("StatsManager not initialized. Call InitializeStatsManager first")
}

path := formatPath(c.Request.URL.Path)
Expand Down
25 changes: 18 additions & 7 deletions routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"log"
"math"
"net/http"
"strconv"
Expand All @@ -20,8 +19,21 @@ import (
"github.com/jasonlovesdoggo/abacus/utils"

"github.com/gin-gonic/gin"
"go.uber.org/zap"
)

var logger *zap.Logger

func init() {
// Initialize Zap logger
var err error
logger, err = zap.NewProduction()
if err != nil {
panic(fmt.Sprintf("Failed to initialize logger: %v", err))
}
defer logger.Sync() // Ensures logs are flushed properly
}

func StreamValueView(c *gin.Context) {
namespace, key := utils.GetNamespaceKey(c)
if namespace == "" || key == "" {
Expand Down Expand Up @@ -68,8 +80,7 @@ func StreamValueView(c *gin.Context) {
case utils.ValueEventServer.ClosedClients <- utils.KeyClientPair{Key: dbKey, Client: clientChan}:
// Successfully sent cleanup signal
case <-time.After(500 * time.Millisecond):
// Timed out waiting to send cleanup signal
log.Printf("Warning: Timed out sending cleanup signal for %s", dbKey)
logger.Warn("Timed out sending cleanup signal", zap.String("key", dbKey))
}
} else {
cleanupMutex.Unlock()
Expand All @@ -85,15 +96,15 @@ func StreamValueView(c *gin.Context) {
cleanupDone = true
cleanupMutex.Unlock()

log.Printf("Client disconnected for key %s, cleaning up", dbKey)
logger.Info("Client disconnected, cleaning up", zap.String("key", dbKey))

// Signal the event server to remove this client
select {
case utils.ValueEventServer.ClosedClients <- utils.KeyClientPair{Key: dbKey, Client: clientChan}:
// Successfully sent cleanup signal
case <-time.After(500 * time.Millisecond):
// Timed out waiting to send cleanup signal
log.Printf("Warning: Timed out sending cleanup signal for %s after disconnect", dbKey)
logger.Warn("Timed out sending cleanup signal after disconnect", zap.String("key", dbKey))
}
} else {
cleanupMutex.Unlock()
Expand All @@ -106,7 +117,7 @@ func StreamValueView(c *gin.Context) {
// Keep your exact format
_, err := c.Writer.WriteString(fmt.Sprintf("data: {\"value\":%d}\n\n", count))
if err != nil {
log.Printf("Error writing to client: %v", err)
logger.Error("Error writing to client", zap.Error(err))
return
}
c.Writer.Flush()
Expand All @@ -124,7 +135,7 @@ func StreamValueView(c *gin.Context) {
// Keep your exact format
_, err := c.Writer.WriteString(fmt.Sprintf("data: {\"value\":%d}\n\n", count))
if err != nil {
log.Printf("Error writing to client: %v", err)
logger.Error("Error writing to client", zap.Error(err))
return false
}
c.Writer.Flush()
Expand Down
28 changes: 21 additions & 7 deletions utils/sse.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package utils

import (
"log"
"sync"
"time"

"go.uber.org/zap"
)

type ValueEvent struct {
Expand All @@ -24,6 +25,7 @@ type KeyClientPair struct {
Client chan int
}


func NewValueEventServer() *ValueEvent {
event := &ValueEvent{
// Use buffered channels to prevent blocking
Expand All @@ -46,7 +48,10 @@ func (v *ValueEvent) listen() {
}
v.TotalClients[newClient.Key][newClient.Client] = true
v.Mu.Unlock()
log.Printf("Client added for key %s. Total clients: %d", newClient.Key, len(v.TotalClients[newClient.Key]))
logger.Info("Client added",
zap.String("key", newClient.Key),
zap.Int("total_clients", len(v.TotalClients[newClient.Key])),
)

case closedClient := <-v.ClosedClients:
v.Mu.Lock()
Expand All @@ -57,12 +62,12 @@ func (v *ValueEvent) listen() {
// Close channel safely
close(closedClient.Client)

log.Printf("Removed client for key %s", closedClient.Key)
logger.Info("Removed client", zap.String("key", closedClient.Key))

// Clean up key map if no more clients
if len(clients) == 0 {
delete(v.TotalClients, closedClient.Key)
log.Printf("No more clients for key %s, removed key entry", closedClient.Key)
logger.Info("No more clients, removed key entry", zap.String("key", closedClient.Key))
}
}
}
Expand Down Expand Up @@ -110,7 +115,7 @@ func (v *ValueEvent) listen() {
case v.ClosedClients <- KeyClientPair{Key: key, Client: client}:
// Success on retry
default:
log.Printf("Failed to remove client for key %s even after retry", key)
logger.Warn("Failed to remove client even after retry", zap.String("key", key))
}
}(keyValue.Key, failedClient)
}
Expand All @@ -133,6 +138,12 @@ func (v *ValueEvent) CountClientsForKey(key string) int {
var ValueEventServer *ValueEvent

func init() {
var err error
logger, err = zap.NewProduction()
if err != nil {
panic(err)
}

ValueEventServer = NewValueEventServer()
}

Expand All @@ -143,7 +154,7 @@ func SetStream(dbKey string, newValue int) {
case ValueEventServer.Message <- KeyValue{Key: dbKey, Value: newValue}:
// Message sent successfully
default:
log.Printf("Warning: Message channel full, update for key %s dropped", dbKey)
logger.Warn("Message channel full, update dropped", zap.String("key", dbKey))
}
}

Expand All @@ -168,6 +179,9 @@ func CloseStream(dbKey string) {
}

if len(channelsToClose) > 0 {
log.Printf("Closed all streams for key %s (%d clients)", dbKey, len(channelsToClose))
logger.Info("Closed all streams for",
zap.String("key", dbKey),
zap.Int("clients", len(channelsToClose)),
)
}
}
Loading