This package provides high-level, type-safe table abstractions built on top of the database layer. It offers two main implementations: a basic Table for direct database access and a CachedTable with in-memory caching for performance-critical applications.
The table package sits on top of the db package and integrates with the reconciler package:
Table Layer (table/)
↓
Database Layer (db/)
↓
MongoDB
Both implementations integrate with the reconciler infrastructure for key enumeration and change notifications.
Table is a generic table abstraction that provides type-safe operations without caching.
type Table[K any, E any] struct {
	reconciler.ManagerImpl
	col db.StoreCollection
}
Type Parameters:
- K: Key type (must not be a pointer)
- E: Entry/document type (must not be a pointer)
Features:
- Direct database access
- Type safety through Go generics
- Reconciler integration for change notifications
- Automatic key type registration
- Watch callback support
CachedTable is an enhanced table with an in-memory cache for fast reads.
type CachedTable[K comparable, E any] struct {
	reconciler.ManagerImpl
	cacheMu sync.RWMutex
	cache   map[K]*E
	col     db.StoreCollection
}
Type Parameters:
- K: Key type (must be comparable for use as map key)
- E: Entry/document type (must not be a pointer)
Features:
- In-memory cache with map[K]*E
- Thread-safe cache access with sync.RWMutex
- Automatic cache synchronization via change streams
- Eager loading on initialization
- Separate methods for cached vs. direct database access
Implements the basic Table[K, E] type.
func (mgr *Table[K, E]) Initialize(
	ctx context.Context,
	col db.StoreCollection,
	callback reconciler.CallbackFunc,
) error
Steps performed:
- Validates that K and E are not pointer types
- Registers the key type with the collection
- Sets up watch callback for change notifications
- Initializes the reconciler manager
- Starts watching for database changes
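The pointer-type validation in the first step could be done with reflection. The following is a minimal sketch, not the package's actual code: `isPointerType` and `validateTypes` are hypothetical helper names, and the error messages are illustrative.

```go
package main

import (
	"fmt"
	"reflect"
)

// isPointerType reports whether the type parameter T is a pointer type.
// It inspects the static type through a nil *T, so no value of T is needed.
func isPointerType[T any]() bool {
	return reflect.TypeOf((*T)(nil)).Elem().Kind() == reflect.Pointer
}

// validateTypes is a hypothetical helper mirroring the first Initialize step:
// both the key type and the entry type must be non-pointer types.
func validateTypes[K any, E any]() error {
	if isPointerType[K]() {
		return fmt.Errorf("key type must not be a pointer")
	}
	if isPointerType[E]() {
		return fmt.Errorf("entry type must not be a pointer")
	}
	return nil
}

type user struct{ Name string }

func main() {
	fmt.Println(validateTypes[string, user]())  // <nil>
	fmt.Println(validateTypes[string, *user]()) // entry type must not be a pointer
}
```

Checking once at initialization keeps the per-operation paths free of reflection.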
Insert:
func (mgr *Table[K, E]) Insert(ctx context.Context, key K, entry E) error
Inserts a new entry. Returns errors.AlreadyExists if the key already exists.
Update:
func (mgr *Table[K, E]) Update(ctx context.Context, key K, entry E, upsert bool) error
Updates an existing entry. If upsert is true, creates the entry when it does not exist.
Locate (Upsert):
func (mgr *Table[K, E]) Locate(ctx context.Context, key K, entry E) error
Convenience method for upsert operations.
Find:
func (mgr *Table[K, E]) Find(ctx context.Context, key K) (*E, error)
Retrieves a single entry by key. Returns pointer to entry or errors.NotFound.
FindMany:
func (mgr *Table[K, E]) FindMany(
	ctx context.Context,
	filter any,
	entries *[]*E,
	opts ...any,
) error
Retrieves multiple entries matching a filter. Supports pagination via options.
DeleteKey:
func (mgr *Table[K, E]) DeleteKey(ctx context.Context, key K) error
Deletes a single entry by key.
DeleteByFilter:
func (mgr *Table[K, E]) DeleteByFilter(ctx context.Context, filter any) (int64, error)
Deletes multiple entries matching a filter. Returns count of deleted entries.
func (mgr *Table[K, E]) ReconcilerGetAllKeys(ctx context.Context) ([]any, error)
Returns all keys in the table for reconciler enumeration.
Implements the CachedTable[K, E] type with in-memory caching.
func (mgr *CachedTable[K, E]) Initialize(
	ctx context.Context,
	col db.StoreCollection,
	callback reconciler.CallbackFunc,
) error
Steps performed:
- Same validation and registration as Table
- Initializes empty cache map[K]*E
- Eagerly loads all entries from database into cache
- Sets up watch callback for cache synchronization
- Starts watching for database changes
Important: The cache is fully populated during initialization, which may take time for large tables.
Insert:
func (mgr *CachedTable[K, E]) Insert(ctx context.Context, key K, entry E) error
Writes to database. Cache updated via watch callback.
Update:
func (mgr *CachedTable[K, E]) Update(ctx context.Context, key K, entry E, upsert bool) error
Writes to database. Cache updated via watch callback.
Locate:
func (mgr *CachedTable[K, E]) Locate(ctx context.Context, key K, entry E) error
Upsert operation. Cache updated via watch callback.
Delete Operations:
func (mgr *CachedTable[K, E]) DeleteKey(ctx context.Context, key K) error
func (mgr *CachedTable[K, E]) DeleteByFilter(ctx context.Context, filter any) (int64, error)
Writes to database. Cache updated via watch callback.
Find (Cached):
func (mgr *CachedTable[K, E]) Find(ctx context.Context, key K) (*E, error)
Fast path: Returns directly from cache without database access.
- Uses RLock for concurrent read safety
- Returns cached pointer or errors.NotFound
- No database I/O
DBFind (Direct):
func (mgr *CachedTable[K, E]) DBFind(ctx context.Context, key K) (*E, error)
Bypass cache: Queries database directly.
- Use when you need guaranteed consistency
- Use when cache might be stale
DBFindMany:
func (mgr *CachedTable[K, E]) DBFindMany(
	ctx context.Context,
	filter any,
	entries *[]*E,
	opts ...any,
) error
Queries database with filter. Supports pagination options.
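Pagination options are usually expressed as a skip count and a limit. As a small illustration, the hypothetical helper `pageWindow` below converts a 1-based page number and a page size into those two values; they are `int64` because that is the type the MongoDB Go driver's `SetSkip` and `SetLimit` accept.

```go
package main

import "fmt"

// pageWindow converts a 1-based page number and a page size into the
// skip and limit values used for a paginated query.
func pageWindow(page, pageSize int64) (skip, limit int64, err error) {
	if page < 1 || pageSize < 1 {
		return 0, 0, fmt.Errorf("page and pageSize must be >= 1, got %d and %d", page, pageSize)
	}
	return (page - 1) * pageSize, pageSize, nil
}

func main() {
	skip, limit, _ := pageWindow(3, 10)
	fmt.Println(skip, limit) // 20 10
}
```

Skip-based paging makes the server walk past the skipped documents, so deep pages get slower; pairing it with a stable sort keeps pages from overlapping.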
watchCallback (Internal):
func (mgr *CachedTable[K, E]) watchCallback(op string, key any) error
Handles change stream events to synchronize cache:
- Add/Update operations: Fetches entry from DB and updates cache
- Delete operations: Removes entry from cache
- Thread-safe with write lock during updates
Cache Synchronization Flow:
Database Change
↓
Watch Callback Triggered
↓
DBFind() to get latest data
↓
Lock cache with write lock
↓
Update/Delete cache entry
↓
Unlock cache
↓
Notify Reconciler
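The flow above can be sketched with a fake database and a notification hook. Everything here is illustrative: `syncedCache`, the `fetch` and `notify` fields, and the `"delete"` operation string are assumptions, and treating an entry that vanished before the fetch as a delete is a design choice of this sketch rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// syncedCache mirrors the flow: fetch, lock, mutate, unlock, notify.
type syncedCache[K comparable, E any] struct {
	mu     sync.RWMutex
	data   map[K]*E
	fetch  func(K) (*E, bool) // stand-in for DBFind
	notify func(K)            // stand-in for the reconciler notification
}

// apply handles one change event. The op value "delete" is hypothetical.
func (c *syncedCache[K, E]) apply(op string, key K) {
	var fresh *E
	if op != "delete" {
		e, ok := c.fetch(key) // database read happens outside the lock
		if ok {
			fresh = e
		} else {
			op = "delete" // entry vanished between the event and the fetch
		}
	}

	c.mu.Lock()
	if op == "delete" {
		delete(c.data, key)
	} else {
		c.data[key] = fresh
	}
	c.mu.Unlock()

	c.notify(key) // notify only after the cache reflects the change
}

func main() {
	db := map[string]int{"a": 1}
	var notified []string
	c := &syncedCache[string, int]{
		data: make(map[string]*int),
		fetch: func(k string) (*int, bool) {
			v, ok := db[k]
			if !ok {
				return nil, false
			}
			return &v, true
		},
		notify: func(k string) { notified = append(notified, k) },
	}

	c.apply("update", "a")
	fmt.Println(*c.data["a"]) // 1

	delete(db, "a")
	c.apply("delete", "a")
	fmt.Println(len(c.data), notified) // 0 [a a]
}
```

Keeping the fetch outside the critical section means readers are blocked only for the map update itself.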
func (mgr *CachedTable[K, E]) ReconcilerGetAllKeys(ctx context.Context) ([]any, error)
Returns all keys from cache (not database).
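Enumerating keys from the cache amounts to a snapshot of the map keys under a read lock. A minimal sketch, with `keyedCache` and `allKeys` as hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

// keyedCache is a minimal cache used to show key enumeration.
type keyedCache[K comparable, E any] struct {
	mu   sync.RWMutex
	data map[K]*E
}

// allKeys snapshots the cached keys as []any, matching the return shape of
// ReconcilerGetAllKeys. Map iteration order is not defined, so neither is
// the order of the result.
func (c *keyedCache[K, E]) allKeys() []any {
	c.mu.RLock()
	defer c.mu.RUnlock()
	keys := make([]any, 0, len(c.data))
	for k := range c.data {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	v := 1
	c := &keyedCache[string, int]{data: map[string]*int{"a": &v, "b": &v}}
	fmt.Println(len(c.allKeys())) // 2
}
```

Because the keys come from the cache, the result can trail the database by whatever the change-stream lag currently is.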
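import (
	"context"
	"log"
	"go.mongodb.org/mongo-driver/bson"          // assumed import path (driver v1)
	"go.mongodb.org/mongo-driver/mongo/options" // assumed; used by the pagination example
	"your-project/db"
	"your-project/table"
)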
// Define your entry type
type User struct {
	Name   string
	Email  string
	Active bool
}
// Initialize database collection
client, err := db.NewMongoClient(config)
if err != nil {
	log.Fatal(err)
}
col := client.GetCollection("mydb", "users")
// Create table
var userTable table.Table[string, User]
// Initialize with reconciler callback
callback := func(ctx context.Context, key any) error {
	log.Printf("User changed: %v", key)
	return nil
}
if err := userTable.Initialize(ctx, col, callback); err != nil {
	log.Fatal(err)
}
// Insert user
user := User{Name: "Alice", Email: "alice@example.com", Active: true}
err = userTable.Insert(ctx, "alice", user)
// Find user
foundUser, err := userTable.Find(ctx, "alice")
if err != nil {
	log.Fatal(err)
}
// Update user
foundUser.Email = "newemail@example.com"
err = userTable.Update(ctx, "alice", *foundUser, false)
// Delete user
err = userTable.DeleteKey(ctx, "alice")
// Find many with filter
var users []*User
filter := bson.M{"active": true}
err = userTable.FindMany(ctx, filter, &users)

// Create cached table
var userCache table.CachedTable[string, User]
// Initialize (loads all entries into cache)
err := userCache.Initialize(ctx, col, callback)
// Fast cached reads (no database I/O)
user, err := userCache.Find(ctx, "alice") // Returns from cache
// Direct database access when needed
user, err = userCache.DBFind(ctx, "alice") // Bypasses cache
// Writes update database and cache automatically
newUser := User{Name: "Bob", Email: "bob@example.com", Active: true}
err = userCache.Insert(ctx, "bob", newUser)
// Cache automatically updated via watch callback
// Query database directly
var activeUsers []*User
filter := bson.M{"active": true}
err = userCache.DBFindMany(ctx, filter, &activeUsers)

// Complex filter (the dot is escaped so it matches a literal ".")
filter := bson.M{
	"active": true,
	"email":  bson.M{"$regex": "@example\\.com$"},
}
// Pagination options
opts := options.Find().
	SetLimit(10).
	SetSkip(20).
	SetSort(bson.M{"name": 1})
var users []*User
err := userTable.FindMany(ctx, filter, &users, opts)

// Delete all inactive users
filter := bson.M{"active": false}
count, err := userTable.DeleteByFilter(ctx, filter)
log.Printf("Deleted %d inactive users", count)

Type safety through Go generics:
Table[K any, E any]              // Flexible types
CachedTable[K comparable, E any] // K must support equality
CachedTable extends Table behavior with caching.
Change streams propagate updates:
DB Change → Watch Callback → Cache Update → Reconciler Notify
- Writes go directly to database
- Cache updated asynchronously via watch callbacks
- Eventual consistency model
Optimizes concurrent cache access:
- Multiple concurrent readers with RLock()
- Exclusive writer with Lock()
- Safe for concurrent use (backed by thread-safe database operations)
- Cache reads: Multiple concurrent readers via RLock()
- Cache writes: Exclusive access via Lock()
- Database operations: Thread-safe through db layer
- Low deadlock risk: The write lock is held only for the in-memory map update, not across database I/O
- Reads: Direct database access (network I/O)
- Writes: Direct database access
- Best for: Write-heavy workloads, guaranteed consistency
- Initialization: O(n) - loads all entries
- Cached reads: O(1) - in-memory map lookup, no I/O
- Writes: Database I/O + eventual cache update
- Memory: O(n) - stores all entries in RAM
- Best for: Read-heavy workloads on frequently accessed tables that fit in memory
- Strong consistency: Always reads from database
- Immediate visibility: Writes immediately visible to all readers
- Eventual consistency: Cache updated asynchronously
- Typical lag: Milliseconds (depends on change stream latency)
- Consistency guarantees:
- Writes always go to database first
- Cache never has data that wasn't written
- Cache may be slightly stale (bounded staleness)
- Strong consistency is required
- Write-heavy workload
- Memory constraints (large tables)
- Fresh data more important than read speed
- Read-heavy workload (>90% reads)
- Acceptable eventual consistency
- Low read latency critical
- Table size fits comfortably in memory
- Frequent access to same keys
- Type choices:
  - Don't use pointer types for K or E
  - K must be comparable for CachedTable
  - Use simple types for keys (string, int, UUID)
- Initialization:
  - Initialize during startup, not per-request
  - Handle initialization errors (database connectivity)
  - Be aware CachedTable loads all data upfront
- Error handling:
  - Check for errors.AlreadyExists on Insert
  - Check for errors.NotFound on Find
  - Handle context cancellation gracefully
- Cache usage:
  - Use Find() for cached reads (fast path)
  - Use DBFind() when consistency matters
  - Consider cache size vs. memory available
- Filters:
  - Use MongoDB BSON filters
  - Leverage indexes for better performance
  - Use pagination for large result sets
- Reconciler integration:
  - Implement meaningful callback logic
  - Keep callbacks fast (avoid blocking)
  - Handle callback errors appropriately
See cached_generic_test.go for comprehensive unit tests covering:
- Initialization and eager loading
- Cache synchronization via watch callbacks
- Concurrent read/write operations
- Thread safety verification
Possible future enhancements:
- Cache eviction policies for memory-constrained environments
- Partial caching with LRU eviction
- Read-through cache fallback to database on cache miss
- Cache statistics for monitoring hit rates
- Batch operations for improved performance