From 3f3b8ac5f22d7c03693c3f27c2e587fb2c4b6655 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Wed, 29 Apr 2026 11:46:33 +0200 Subject: [PATCH 01/16] add raft store stubs --- cmds/core-service/main.go | 28 ++++++++------ pkg/aux_/store/store.go | 3 ++ pkg/rid/store/store.go | 3 ++ pkg/scd/store/raftstore/availability.go | 16 ++++++++ pkg/scd/store/raftstore/constraints.go | 24 ++++++++++++ pkg/scd/store/raftstore/doc.go | 3 ++ .../store/raftstore/operational_intents.go | 33 ++++++++++++++++ pkg/scd/store/raftstore/store.go | 38 +++++++++++++++++++ pkg/scd/store/raftstore/subscriptions.go | 38 +++++++++++++++++++ pkg/scd/store/store.go | 3 ++ 10 files changed, 177 insertions(+), 12 deletions(-) create mode 100644 pkg/scd/store/raftstore/availability.go create mode 100644 pkg/scd/store/raftstore/constraints.go create mode 100644 pkg/scd/store/raftstore/doc.go create mode 100644 pkg/scd/store/raftstore/operational_intents.go create mode 100644 pkg/scd/store/raftstore/store.go create mode 100644 pkg/scd/store/raftstore/subscriptions.go diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 9eeed0746..9f8fe8bac 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -89,15 +89,16 @@ func createAuxServer(ctx context.Context, locality string, publicEndpoint string return nil, err } - repo, err := auxStore.Interact(ctx) - if err != nil { - return nil, stacktrace.Propagate(err, "Unable to interact with store") - } - - err = repo.SaveOwnMetadata(ctx, locality, publicEndpoint) - - if err != nil { - return nil, stacktrace.Propagate(err, "Unable to store current metadata") + if auxStore != nil { + repo, err := auxStore.Interact(ctx) + if err != nil { + return nil, stacktrace.Propagate(err, "Unable to interact with store") + } + if err = repo.SaveOwnMetadata(ctx, locality, publicEndpoint); err != nil { + return nil, stacktrace.Propagate(err, "Unable to store current metadata") + } + } else { + logger.Warn("aux store not available for this store type, skipping metadata save") } return &aux.Server{Store: auxStore, Locality: locality, ScdGlobalLock: scdGlobalLock}, nil @@ -110,9 +111,12 @@ func createRIDServers(ctx context.Context, locality string, logger *zap.Logger) return nil, nil, err } - _, err = ridStore.Interact(ctx) - if err != nil { - return nil, nil, stacktrace.Propagate(err, "Unable to interact with store") + if ridStore != nil { + if _, err = ridStore.Interact(ctx); err != nil { + return nil, nil, stacktrace.Propagate(err, "Unable to interact with store") + } + } else { + logger.Warn("RID store not available for this store type, RID functionality will not work") } app := application.NewFromTransactor(ridStore, logger) diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 1a8e1be3e..565a07c4e 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -21,6 +21,9 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, e switch storeType := params.GetStoreParameters().StoreType; storeType { case "sql": return auxsqlstore.Init(ctx, logger, withCheckCron) + case "raft": + logger.Warn("Raft store is not implemented for aux yet") + return nil, nil default: return nil, stacktrace.NewError("Unsupported store type %q for aux", storeType) } diff --git a/pkg/rid/store/store.go b/pkg/rid/store/store.go index 026846306..dd1396c54 100644 --- a/pkg/rid/store/store.go +++ b/pkg/rid/store/store.go @@ -21,6 +21,9 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, e switch storeType { case "sql": return ridsqlstore.Init(ctx, logger, withCheckCron) + case "raft": + logger.Warn("Raft store is not implemented for RID yet") + return nil, nil default: return nil, stacktrace.NewError("Unsupported store type %q for rid", storeType) } diff --git a/pkg/scd/store/raftstore/availability.go b/pkg/scd/store/raftstore/availability.go new file mode 100644 index 000000000..6d5145e67 --- /dev/null +++ b/pkg/scd/store/raftstore/availability.go @@ -0,0 +1,16 @@ +package raftstore + +import ( + "context" + + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" +) + +func (r *repo) GetUssAvailability(_ context.Context, id dssmodels.Manager) (*scdmodels.UssAvailabilityStatus, error) { + panic("GetUssAvailability not yet implemented in raft store") +} + +func (r *repo) UpsertUssAvailability(_ context.Context, ussa *scdmodels.UssAvailabilityStatus) (*scdmodels.UssAvailabilityStatus, error) { + panic("UpsertUssAvailability not yet implemented in raft store") +} diff --git a/pkg/scd/store/raftstore/constraints.go b/pkg/scd/store/raftstore/constraints.go new file mode 100644 index 000000000..40cbf8bbe --- /dev/null +++ b/pkg/scd/store/raftstore/constraints.go @@ -0,0 +1,24 @@ +package raftstore + +import ( + "context" + + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" +) + +func (r *repo) SearchConstraints(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.Constraint, error) { + panic("SearchConstraints not yet implemented in raft store") +} + +func (r *repo) GetConstraint(_ context.Context, id dssmodels.ID) (*scdmodels.Constraint, error) { + panic("GetConstraint not yet implemented in raft store") +} + +func (r *repo) UpsertConstraint(_ context.Context, constraint *scdmodels.Constraint) (*scdmodels.Constraint, error) { + panic("UpsertConstraint not yet implemented in raft store") +} + +func (r *repo) DeleteConstraint(_ context.Context, id dssmodels.ID) error { + panic("DeleteConstraint not yet implemented in raft store") +} diff --git a/pkg/scd/store/raftstore/doc.go b/pkg/scd/store/raftstore/doc.go new file mode 100644 index 000000000..c618174c4 --- /dev/null +++ b/pkg/scd/store/raftstore/doc.go @@ -0,0 +1,3 @@ +// Package scd.store.raftstore provides a full implementation of store.Store[scd.repos.Repository] +// for Raft-based in-memory data storage. +package raftstore diff --git a/pkg/scd/store/raftstore/operational_intents.go b/pkg/scd/store/raftstore/operational_intents.go new file mode 100644 index 000000000..6293d30f4 --- /dev/null +++ b/pkg/scd/store/raftstore/operational_intents.go @@ -0,0 +1,33 @@ +package raftstore + +import ( + "context" + "time" + + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" +) + +func (r *repo) GetOperationalIntent(_ context.Context, id dssmodels.ID) (*scdmodels.OperationalIntent, error) { + panic("GetOperationalIntent not yet implemented in raft store") +} + +func (r *repo) DeleteOperationalIntent(_ context.Context, id dssmodels.ID) error { + panic("DeleteOperationalIntent not yet implemented in raft store") +} + +func (r *repo) UpsertOperationalIntent(_ context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) { + panic("UpsertOperationalIntent not yet implemented in raft store") +} + +func (r *repo) SearchOperationalIntents(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.OperationalIntent, error) { + panic("SearchOperationalIntents not yet implemented in raft store") +} + +func (r *repo) GetDependentOperationalIntents(_ context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error) { + panic("GetDependentOperationalIntents not yet implemented in raft store") +} + +func (r *repo) ListExpiredOperationalIntents(_ context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) { + panic("ListExpiredOperationalIntents not yet implemented in raft store") +} diff --git a/pkg/scd/store/raftstore/store.go b/pkg/scd/store/raftstore/store.go new file mode 100644 index 000000000..50f330d88 --- /dev/null +++ b/pkg/scd/store/raftstore/store.go @@ -0,0 +1,38 @@ +package raftstore + +import ( + "context" + + "github.com/interuss/dss/pkg/scd/repos" + dssstore "github.com/interuss/dss/pkg/store" + "go.uber.org/zap" +) + +// Store implements store.Store[repos.Repository] for Raft-based in-memory storage. +type Store struct { + logger *zap.Logger +} + +// repo is a full implementation of scd.repos.Repository for Raft-based in-memory storage. +type repo struct { + store *Store +} + +func Init(logger *zap.Logger) (dssstore.Store[repos.Repository], error) { + return &Store{logger: logger}, nil +} + +func (s *Store) Close() error { + s.logger.Warn("raft store Close not yet implemented") + return nil +} + +func (s *Store) Interact(_ context.Context) (repos.Repository, error) { + s.logger.Warn("raft store Interact not yet implemented") + return &repo{store: s}, nil +} + +func (s *Store) Transact(ctx context.Context, f func(context.Context, repos.Repository) error) error { + s.logger.Warn("raft store Transact not yet implemented") + return f(ctx, &repo{store: s}) +} diff --git a/pkg/scd/store/raftstore/subscriptions.go b/pkg/scd/store/raftstore/subscriptions.go new file mode 100644 index 000000000..ef16dd910 --- /dev/null +++ b/pkg/scd/store/raftstore/subscriptions.go @@ -0,0 +1,38 @@ +package raftstore + +import ( + "context" + "time" + + "github.com/golang/geo/s2" + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" +) + +func (r *repo) SearchSubscriptions(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.Subscription, error) { + panic("SearchSubscriptions not yet implemented in raft store") +} + +func (r *repo) GetSubscription(_ context.Context, id dssmodels.ID) (*scdmodels.Subscription, error) { + panic("GetSubscription not yet implemented in raft store") +} + +func (r *repo) UpsertSubscription(_ context.Context, sub *scdmodels.Subscription) (*scdmodels.Subscription, error) { + panic("UpsertSubscription not yet implemented in raft store") +} + +func (r *repo) DeleteSubscription(_ context.Context, id dssmodels.ID) error { + panic("DeleteSubscription not yet implemented in raft store") +} + +func (r *repo) IncrementNotificationIndices(_ context.Context, subscriptionIds []dssmodels.ID) ([]int, error) { + panic("IncrementNotificationIndices not yet implemented in raft store") +} + +func (r *repo) LockSubscriptionsOnCells(_ context.Context, cells s2.CellUnion, subscriptionIds []dssmodels.ID, startTime *time.Time, endTime *time.Time) error { + panic("LockSubscriptionsOnCells not yet implemented in raft store") +} + +func (r *repo) ListExpiredSubscriptions(_ context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) { + panic("ListExpiredSubscriptions not yet implemented in raft store") +} diff --git a/pkg/scd/store/store.go b/pkg/scd/store/store.go index 9e977655d..8cf7388a0 100644 --- a/pkg/scd/store/store.go +++ b/pkg/scd/store/store.go @@ -4,6 +4,7 @@ import ( "context" "github.com/interuss/dss/pkg/scd/repos" + "github.com/interuss/dss/pkg/scd/store/raftstore" scdsqlstore "github.com/interuss/dss/pkg/scd/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" "github.com/interuss/dss/pkg/store/params" @@ -21,6 +22,8 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLoc switch storeType { case "sql": return scdsqlstore.Init(ctx, logger, withCheckCron, globalLock) + case "raft": + return raftstore.Init(logger) default: return nil, stacktrace.NewError("Unsupported store type %q for scd", storeType) } From d5b7fcc939422cadc65e8f5726919ca04ab9de10 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Wed, 29 Apr 2026 11:57:25 +0200 Subject: [PATCH 02/16] Store methods should panic --- pkg/scd/store/raftstore/store.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/scd/store/raftstore/store.go b/pkg/scd/store/raftstore/store.go index 50f330d88..2ffa05340 100644 --- a/pkg/scd/store/raftstore/store.go +++ b/pkg/scd/store/raftstore/store.go @@ -23,16 +23,13 @@ func Init(logger *zap.Logger) (dssstore.Store[repos.Repository], error) { } func (s *Store) Close() error { - s.logger.Warn("raft store Close not yet implemented") - return nil + panic("Close not yet implemented in raft store") } func (s *Store) Interact(_ context.Context) (repos.Repository, error) { - s.logger.Warn("raft store Interact not yet implemented") - return &repo{store: s}, nil + panic("Interact not yet implemented in raft store") } -func (s *Store) Transact(ctx context.Context, f func(context.Context, repos.Repository) error) error { - s.logger.Warn("raft store Transact not yet implemented") - return f(ctx, &repo{store: s}) +func (s *Store) Transact(_ context.Context, _ func(context.Context, repos.Repository) error) error { + panic("Transact not yet implemented in raft store") } From 93ccf26714aa79b8d48e469cdbca59c50a104b84 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Wed, 29 Apr 2026 13:24:27 +0200 Subject: [PATCH 03/16] fix lint --- pkg/scd/store/raftstore/doc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scd/store/raftstore/doc.go b/pkg/scd/store/raftstore/doc.go index c618174c4..2192c849a 100644 --- a/pkg/scd/store/raftstore/doc.go +++ b/pkg/scd/store/raftstore/doc.go @@ -1,3 +1,3 @@ // Package scd.store.raftstore provides a full implementation of store.Store[scd.repos.Repository] -// for Raft-based in-memory data storage. +// for Raft-based in-memory data storage. package raftstore From 506ecf403e91aebdaf20938948332b6e278955b7 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Wed, 29 Apr 2026 13:53:14 +0200 Subject: [PATCH 04/16] improve skipping --- cmds/core-service/main.go | 68 ++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 9f8fe8bac..02821f88b 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -31,6 +31,7 @@ import ( "github.com/interuss/dss/pkg/scd" scds "github.com/interuss/dss/pkg/scd/store" "github.com/interuss/dss/pkg/store" + "github.com/interuss/dss/pkg/store/params" "github.com/interuss/dss/pkg/version" "github.com/interuss/dss/pkg/versioning" "github.com/interuss/stacktrace" @@ -89,34 +90,27 @@ func createAuxServer(ctx context.Context, locality string, publicEndpoint string return nil, err } - if auxStore != nil { - repo, err := auxStore.Interact(ctx) - if err != nil { - return nil, stacktrace.Propagate(err, "Unable to interact with store") - } - if err = repo.SaveOwnMetadata(ctx, locality, publicEndpoint); err != nil { - return nil, stacktrace.Propagate(err, "Unable to store current metadata") - } - } else { - logger.Warn("aux store not available for this store type, skipping metadata save") + repo, err := auxStore.Interact(ctx) + if err != nil { + return nil, stacktrace.Propagate(err, "Unable to interact with store") + } + + err = repo.SaveOwnMetadata(ctx, locality, publicEndpoint) + if err != nil { + return nil, stacktrace.Propagate(err, "Unable to store current metadata") } return &aux.Server{Store: auxStore, Locality: locality, ScdGlobalLock: scdGlobalLock}, nil } func createRIDServers(ctx context.Context, locality string, logger *zap.Logger) (*rid_v1.Server, *rid_v2.Server, error) { - ridStore, err := rids.Init(ctx, logger, true) if err != nil { return nil, nil, err } - if ridStore != nil { - if _, err = ridStore.Interact(ctx); err != nil { - return nil, nil, stacktrace.Propagate(err, "Unable to interact with store") - } - } else { - logger.Warn("RID store not available for this store type, RID functionality will not work") + if _, err = ridStore.Interact(ctx); err != nil { + return nil, nil, stacktrace.Propagate(err, "Unable to interact with store") } app := application.NewFromTransactor(ridStore, logger) @@ -171,16 +165,18 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st ctx, ctxCancel := context.WithCancel(ctx) defer ctxCancel() - // Initialize aux - auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) - if err != nil { - return stacktrace.Propagate(err, "Failed to create aux server") - } - - // Initialize remote ID - ridV1Server, ridV2Server, err = createRIDServers(ctx, locality, logger) - if err != nil { - return stacktrace.Propagate(err, "Failed to create remote ID server") + if params.GetStoreParameters().StoreType != "raft" { + auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) + if err != nil { + return stacktrace.Propagate(err, "Failed to create aux server") + } + ridV1Server, ridV2Server, err = createRIDServers(ctx, locality, logger) + if err != nil { + return stacktrace.Propagate(err, "Failed to create remote ID server") + } + } else { + logger.Warn("aux and rid not supported by current store type, those endpoints will not be registered", + zap.String("store_type", params.GetStoreParameters().StoreType)) } // Initialize access token validation @@ -203,17 +199,17 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st return stacktrace.Propagate(err, "Error creating RSA authorizer") } - auxV1Router := apiauxv1.MakeAPIRouter(auxV1Server, authorizer) versioningV1Router := apiversioningv1.MakeAPIRouter(versioningV1Server, authorizer) - ridV1Router := apiridv1.MakeAPIRouter(ridV1Server, authorizer) - ridV2Router := apiridv2.MakeAPIRouter(ridV2Server, authorizer) multiRouter := api.MultiRouter{ - Routers: []api.PartialRouter{ - &auxV1Router, - &versioningV1Router, - &ridV1Router, - &ridV2Router, - }} + Routers: []api.PartialRouter{&versioningV1Router}, + } + + if params.GetStoreParameters().StoreType != "raft" { + auxV1Router := apiauxv1.MakeAPIRouter(auxV1Server, authorizer) + ridV1Router := apiridv1.MakeAPIRouter(ridV1Server, authorizer) + ridV2Router := apiridv2.MakeAPIRouter(ridV2Server, authorizer) + multiRouter.Routers = append(multiRouter.Routers, &auxV1Router, &ridV1Router, &ridV2Router) + } // Initialize strategic conflict detection if *enableSCD { From 8de30aed824e9907a4c176a42d8844867fe15412 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Wed, 29 Apr 2026 13:55:46 +0200 Subject: [PATCH 05/16] cleanup --- cmds/core-service/main.go | 12 ++++++++---- pkg/aux_/store/store.go | 3 --- pkg/rid/store/store.go | 3 --- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 02821f88b..5698f331c 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -96,6 +96,7 @@ func createAuxServer(ctx context.Context, locality string, publicEndpoint string } err = repo.SaveOwnMetadata(ctx, locality, publicEndpoint) + if err != nil { return nil, stacktrace.Propagate(err, "Unable to store current metadata") } @@ -104,12 +105,14 @@ func createAuxServer(ctx context.Context, locality string, publicEndpoint string } func createRIDServers(ctx context.Context, locality string, logger *zap.Logger) (*rid_v1.Server, *rid_v2.Server, error) { + ridStore, err := rids.Init(ctx, logger, true) if err != nil { return nil, nil, err } - if _, err = ridStore.Interact(ctx); err != nil { + _, err = ridStore.Interact(ctx) + if err != nil { return nil, nil, stacktrace.Propagate(err, "Unable to interact with store") } @@ -165,6 +168,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st ctx, ctxCancel := context.WithCancel(ctx) defer ctxCancel() + // Initialize aux and remote ID if implemented by the store if params.GetStoreParameters().StoreType != "raft" { auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) if err != nil { @@ -174,9 +178,6 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st if err != nil { return stacktrace.Propagate(err, "Failed to create remote ID server") } - } else { - logger.Warn("aux and rid not supported by current store type, those endpoints will not be registered", - zap.String("store_type", params.GetStoreParameters().StoreType)) } // Initialize access token validation @@ -209,6 +210,9 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st ridV1Router := apiridv1.MakeAPIRouter(ridV1Server, authorizer) ridV2Router := apiridv2.MakeAPIRouter(ridV2Server, authorizer) multiRouter.Routers = append(multiRouter.Routers, &auxV1Router, &ridV1Router, &ridV2Router) + } else { + logger.Warn("aux and remote ID not supported by current store type, those endpoints will not be registered", + zap.String("store_type", params.GetStoreParameters().StoreType)) } // Initialize strategic conflict detection diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 565a07c4e..1a8e1be3e 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -21,9 +21,6 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, e switch storeType := params.GetStoreParameters().StoreType; storeType { case "sql": return auxsqlstore.Init(ctx, logger, withCheckCron) - case "raft": - logger.Warn("Raft store is not implemented for aux yet") - return nil, nil default: return nil, stacktrace.NewError("Unsupported store type %q for aux", storeType) } diff --git a/pkg/rid/store/store.go b/pkg/rid/store/store.go index dd1396c54..026846306 100644 --- a/pkg/rid/store/store.go +++ b/pkg/rid/store/store.go @@ -21,9 +21,6 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, e switch storeType { case "sql": return ridsqlstore.Init(ctx, logger, withCheckCron) - case "raft": - logger.Warn("Raft store is not implemented for RID yet") - return nil, nil default: return nil, stacktrace.NewError("Unsupported store type %q for rid", storeType) } From 07a8e8a4d6cb03c62951d2ba9e9cff51c0f42a2e Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Wed, 29 Apr 2026 13:57:41 +0200 Subject: [PATCH 06/16] add RaftStoreType --- cmds/core-service/main.go | 4 ++-- pkg/scd/store/store.go | 2 +- pkg/store/params/params.go | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 5698f331c..7eaab705a 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -169,7 +169,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st defer ctxCancel() // Initialize aux and remote ID if implemented by the store - if params.GetStoreParameters().StoreType != "raft" { + if params.GetStoreParameters().StoreType != params.RaftStoreType { auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) if err != nil { return stacktrace.Propagate(err, "Failed to create aux server") @@ -205,7 +205,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st Routers: []api.PartialRouter{&versioningV1Router}, } - if params.GetStoreParameters().StoreType != "raft" { + if params.GetStoreParameters().StoreType != params.RaftStoreType { auxV1Router := apiauxv1.MakeAPIRouter(auxV1Server, authorizer) ridV1Router := apiridv1.MakeAPIRouter(ridV1Server, authorizer) ridV2Router := apiridv2.MakeAPIRouter(ridV2Server, authorizer) diff --git a/pkg/scd/store/store.go b/pkg/scd/store/store.go index 8cf7388a0..fafbbcd20 100644 --- a/pkg/scd/store/store.go +++ b/pkg/scd/store/store.go @@ -22,7 +22,7 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLoc switch storeType { case "sql": return scdsqlstore.Init(ctx, logger, withCheckCron, globalLock) - case "raft": + case params.RaftStoreType: return raftstore.Init(logger) default: return nil, stacktrace.NewError("Unsupported store type %q for scd", storeType) diff --git a/pkg/store/params/params.go b/pkg/store/params/params.go index 48af00d19..efa3f1385 100644 --- a/pkg/store/params/params.go +++ b/pkg/store/params/params.go @@ -4,6 +4,8 @@ import ( "flag" ) +const RaftStoreType = "raft" + type ( // StoreParameters bundles up parameters used to configure store at a generic/top level. StoreParameters struct { From 46b6cf81b16a1bfa25d273becc59968731d5d758 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Wed, 29 Apr 2026 14:18:16 +0200 Subject: [PATCH 07/16] move struct definition --- pkg/scd/store/raftstore/store.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/scd/store/raftstore/store.go b/pkg/scd/store/raftstore/store.go index 2ffa05340..622684600 100644 --- a/pkg/scd/store/raftstore/store.go +++ b/pkg/scd/store/raftstore/store.go @@ -8,11 +8,6 @@ import ( "go.uber.org/zap" ) -// Store implements store.Store[repos.Repository] for Raft-based in-memory storage. -type Store struct { - logger *zap.Logger -} - // repo is a full implementation of scd.repos.Repository for Raft-based in-memory storage. type repo struct { store *Store @@ -22,6 +17,11 @@ func Init(logger *zap.Logger) (dssstore.Store[repos.Repository], error) { return &Store{logger: logger}, nil } +// Store implements store.Store[repos.Repository] for Raft-based in-memory storage. +type Store struct { + logger *zap.Logger +} + func (s *Store) Close() error { panic("Close not yet implemented in raft store") } From c37535fee245a5633ad4d5be26f014288fe3cb05 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Thu, 30 Apr 2026 11:36:59 +0200 Subject: [PATCH 08/16] use err type and const for sql --- cmds/core-service/main.go | 34 +++++++++++++++++--------------- pkg/aux_/store/store.go | 4 +++- pkg/rid/store/store.go | 4 +++- pkg/scd/store/raftstore/store.go | 20 +++---------------- pkg/scd/store/store.go | 2 +- pkg/store/params/params.go | 10 ++++++++-- 6 files changed, 36 insertions(+), 38 deletions(-) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 7eaab705a..806424a7b 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "flag" "fmt" "net/http" @@ -168,16 +169,17 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st ctx, ctxCancel := context.WithCancel(ctx) defer ctxCancel() - // Initialize aux and remote ID if implemented by the store - if params.GetStoreParameters().StoreType != params.RaftStoreType { - auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) - if err != nil { - return stacktrace.Propagate(err, "Failed to create aux server") - } - ridV1Server, ridV2Server, err = createRIDServers(ctx, locality, logger) - if err != nil { - return stacktrace.Propagate(err, "Failed to create remote ID server") - } + auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) + if errors.Is(err, params.ErrUnsupportedStoreType) { + logger.Warn("aux not supported by current store, those endpoints will not be registered") + } else if err != nil { + return stacktrace.Propagate(err, "Failed to create aux server") + } + ridV1Server, ridV2Server, err = createRIDServers(ctx, locality, logger) + if errors.Is(err, params.ErrUnsupportedStoreType) { + logger.Warn("remote ID not supported by current store, those endpoints will not be registered") + } else if err != nil { + return stacktrace.Propagate(err, "Failed to create remote ID server") } // Initialize access token validation @@ -205,14 +207,14 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st Routers: []api.PartialRouter{&versioningV1Router}, } - if params.GetStoreParameters().StoreType != params.RaftStoreType { - auxV1Router := apiauxv1.MakeAPIRouter(auxV1Server, authorizer) + if ridV1Server != nil { ridV1Router := apiridv1.MakeAPIRouter(ridV1Server, authorizer) ridV2Router := apiridv2.MakeAPIRouter(ridV2Server, authorizer) - multiRouter.Routers = append(multiRouter.Routers, &auxV1Router, &ridV1Router, &ridV2Router) - } else { - logger.Warn("aux and remote ID not supported by current store type, those endpoints will not be registered", - zap.String("store_type", params.GetStoreParameters().StoreType)) + multiRouter.Routers = append(multiRouter.Routers, &ridV1Router, &ridV2Router) + } + if auxV1Server != nil { + auxV1Router := apiauxv1.MakeAPIRouter(auxV1Server, authorizer) + multiRouter.Routers = append(multiRouter.Routers, &auxV1Router) } // Initialize strategic conflict detection diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 1a8e1be3e..7b31b7a84 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -19,8 +19,10 @@ type Store = dssstore.Store[repos.Repository] // Init selects and initializes the aux store backend. func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { switch storeType := params.GetStoreParameters().StoreType; storeType { - case "sql": + case params.SQLStoreType: return auxsqlstore.Init(ctx, logger, withCheckCron) + case params.RaftStoreType: + return nil, params.ErrUnsupportedStoreType default: return nil, stacktrace.NewError("Unsupported store type %q for aux", storeType) } diff --git a/pkg/rid/store/store.go b/pkg/rid/store/store.go index 026846306..afb7bad22 100644 --- a/pkg/rid/store/store.go +++ b/pkg/rid/store/store.go @@ -19,8 +19,10 @@ type Store = dssstore.Store[repos.Repository] func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { storeType := params.GetStoreParameters().StoreType switch storeType { - case "sql": + case params.SQLStoreType: return ridsqlstore.Init(ctx, logger, withCheckCron) + case params.RaftStoreType: + return nil, params.ErrUnsupportedStoreType default: return nil, stacktrace.NewError("Unsupported store type %q for rid", storeType) } diff --git a/pkg/scd/store/raftstore/store.go b/pkg/scd/store/raftstore/store.go index 622684600..f6e6bc53f 100644 --- a/pkg/scd/store/raftstore/store.go +++ b/pkg/scd/store/raftstore/store.go @@ -1,10 +1,8 @@ package raftstore import ( - "context" - + "github.com/interuss/dss/pkg/raftstore" "github.com/interuss/dss/pkg/scd/repos" - dssstore "github.com/interuss/dss/pkg/store" "go.uber.org/zap" ) @@ -13,23 +11,11 @@ type repo struct { store *Store } -func Init(logger *zap.Logger) (dssstore.Store[repos.Repository], error) { - return &Store{logger: logger}, nil +func Init(logger *zap.Logger) (*raftstore.Store[repos.Repository], error) { + return raftstore.Init } // Store implements store.Store[repos.Repository] for Raft-based in-memory storage. type Store struct { logger *zap.Logger } - -func (s *Store) Close() error { - panic("Close not yet implemented in raft store") -} - -func (s *Store) Interact(_ context.Context) (repos.Repository, error) { - panic("Interact not yet implemented in raft store") -} - -func (s *Store) Transact(_ context.Context, _ func(context.Context, repos.Repository) error) error { - panic("Transact not yet implemented in raft store") -} diff --git a/pkg/scd/store/store.go b/pkg/scd/store/store.go index fafbbcd20..717e1b331 100644 --- a/pkg/scd/store/store.go +++ b/pkg/scd/store/store.go @@ -20,7 +20,7 @@ type Store = dssstore.Store[repos.Repository] func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (Store, error) { storeType := params.GetStoreParameters().StoreType switch storeType { - case "sql": + case params.SQLStoreType: return scdsqlstore.Init(ctx, logger, withCheckCron, globalLock) case params.RaftStoreType: return raftstore.Init(logger) diff --git a/pkg/store/params/params.go b/pkg/store/params/params.go index efa3f1385..24553d090 100644 --- a/pkg/store/params/params.go +++ b/pkg/store/params/params.go @@ -1,10 +1,16 @@ package params import ( + "errors" "flag" ) -const RaftStoreType = "raft" +const ( + RaftStoreType = "raft" + SQLStoreType = "sql" +) + +var ErrUnsupportedStoreType = errors.New("unsupported store type") type ( // StoreParameters bundles up parameters used to configure store at a generic/top level. @@ -18,7 +24,7 @@ var ( ) func init() { - flag.StringVar(&storeParameters.StoreType, "store_type", "sql", "Store type. Use 'sql' for CockroachDB/YugabyteDB") + flag.StringVar(&storeParameters.StoreType, "store_type", SQLStoreType, "Store type. Use 'sql' for CockroachDB/YugabyteDB and 'raft' for Raft-based store.") } // ConnectParameters returns a ConnectParameters instance that gets populated from well-known CLI flags. From 3326aa8534e5e85d20504acb115e07cd5af6950f Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Mon, 4 May 2026 13:58:24 +0200 Subject: [PATCH 09/16] use errcode --- cmds/core-service/main.go | 6 ++---- pkg/aux_/store/store.go | 4 +--- pkg/rid/store/store.go | 4 +--- pkg/store/params/params.go | 3 --- pkg/store/store.go | 3 ++- 5 files changed, 6 insertions(+), 14 deletions(-) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 806424a7b..8ce06a9c6 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "flag" "fmt" "net/http" @@ -32,7 +31,6 @@ import ( "github.com/interuss/dss/pkg/scd" scds "github.com/interuss/dss/pkg/scd/store" "github.com/interuss/dss/pkg/store" - "github.com/interuss/dss/pkg/store/params" "github.com/interuss/dss/pkg/version" "github.com/interuss/dss/pkg/versioning" "github.com/interuss/stacktrace" @@ -170,13 +168,13 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st defer ctxCancel() auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) - if errors.Is(err, params.ErrUnsupportedStoreType) { + if stacktrace.GetCode(err) == store.CodeUnsupported { logger.Warn("aux not supported by current store, those endpoints will not be registered") } else if err != nil { return stacktrace.Propagate(err, "Failed to create aux server") } ridV1Server, ridV2Server, err = createRIDServers(ctx, locality, logger) - if errors.Is(err, params.ErrUnsupportedStoreType) { + if stacktrace.GetCode(err) == store.CodeUnsupported { logger.Warn("remote ID not supported by current store, those endpoints will not be registered") } else if err != nil { return stacktrace.Propagate(err, "Failed to create remote ID server") diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 7b31b7a84..43d5e4d0e 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -21,9 +21,7 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, e switch storeType := params.GetStoreParameters().StoreType; storeType { case params.SQLStoreType: return auxsqlstore.Init(ctx, logger, withCheckCron) - case params.RaftStoreType: - return nil, params.ErrUnsupportedStoreType default: - return nil, stacktrace.NewError("Unsupported store type %q for aux", storeType) + return nil, stacktrace.NewErrorWithCode(dssstore.CodeUnsupported, "Unsupported store type %q for aux", storeType) } } diff --git a/pkg/rid/store/store.go b/pkg/rid/store/store.go index afb7bad22..088801eda 100644 --- a/pkg/rid/store/store.go +++ b/pkg/rid/store/store.go @@ -21,9 +21,7 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, e switch storeType { case params.SQLStoreType: return ridsqlstore.Init(ctx, logger, withCheckCron) - case params.RaftStoreType: - return nil, params.ErrUnsupportedStoreType default: - return nil, stacktrace.NewError("Unsupported store type %q for rid", storeType) + return nil, stacktrace.NewErrorWithCode(dssstore.CodeUnsupported, "Unsupported store type %q for rid", storeType) } } diff --git a/pkg/store/params/params.go b/pkg/store/params/params.go index 24553d090..a5242f528 100644 --- a/pkg/store/params/params.go +++ b/pkg/store/params/params.go @@ -1,7 +1,6 @@ package params import ( - "errors" "flag" ) @@ -10,8 +9,6 @@ const ( SQLStoreType = "sql" ) -var ErrUnsupportedStoreType = errors.New("unsupported store type") - type ( // StoreParameters bundles up parameters used to configure store at a generic/top level. StoreParameters struct { diff --git a/pkg/store/store.go b/pkg/store/store.go index e5e95c4b0..2495e9979 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -20,5 +20,6 @@ type Store[R any] interface { } const ( - CodeRetryable = stacktrace.ErrorCode(1) + CodeRetryable = stacktrace.ErrorCode(1) + CodeUnsupported = stacktrace.ErrorCode(2) ) From 95c46f5c92e1a9c8b3ce0d40a426568b6e8f9786 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Mon, 4 May 2026 17:03:07 +0200 Subject: [PATCH 10/16] move to pkg/raftstore --- pkg/scd/store/raftstore/store.go | 14 +++----------- pkg/scd/store/store.go | 2 +- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/pkg/scd/store/raftstore/store.go b/pkg/scd/store/raftstore/store.go index f6e6bc53f..d1cb97804 100644 --- a/pkg/scd/store/raftstore/store.go +++ b/pkg/scd/store/raftstore/store.go @@ -3,19 +3,11 @@ package raftstore import ( "github.com/interuss/dss/pkg/raftstore" "github.com/interuss/dss/pkg/scd/repos" - "go.uber.org/zap" ) // repo is a full implementation of scd.repos.Repository for Raft-based in-memory storage. -type repo struct { - store *Store -} - -func Init(logger *zap.Logger) (*raftstore.Store[repos.Repository], error) { - return raftstore.Init -} +type repo struct{} -// Store implements store.Store[repos.Repository] for Raft-based in-memory storage. -type Store struct { - logger *zap.Logger +func Init() (*raftstore.Store[repos.Repository], error) { + return raftstore.Init[repos.Repository](), nil } diff --git a/pkg/scd/store/store.go b/pkg/scd/store/store.go index 717e1b331..1b6d12b00 100644 --- a/pkg/scd/store/store.go +++ b/pkg/scd/store/store.go @@ -23,7 +23,7 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLoc case params.SQLStoreType: return scdsqlstore.Init(ctx, logger, withCheckCron, globalLock) case params.RaftStoreType: - return raftstore.Init(logger) + return raftstore.Init() default: return nil, stacktrace.NewError("Unsupported store type %q for scd", storeType) } From 51750f0ecac81505c4043cf484dff8da8f85080c Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Mon, 4 May 2026 18:53:54 +0200 Subject: [PATCH 11/16] formatted string for flag def --- pkg/store/params/params.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/store/params/params.go b/pkg/store/params/params.go index a5242f528..2741c94e3 100644 --- a/pkg/store/params/params.go +++ b/pkg/store/params/params.go @@ -2,6 +2,7 @@ package params import ( "flag" + "fmt" ) const ( @@ -21,7 +22,7 @@ var ( ) func init() { - flag.StringVar(&storeParameters.StoreType, "store_type", SQLStoreType, "Store type. Use 'sql' for CockroachDB/YugabyteDB and 'raft' for Raft-based store.") + flag.StringVar(&storeParameters.StoreType, "store_type", SQLStoreType, fmt.Sprintf("Store type. Use '%s' for CockroachDB/YugabyteDB and '%s' for Raft-based store.", SQLStoreType, RaftStoreType)) } // ConnectParameters returns a ConnectParameters instance that gets populated from well-known CLI flags. From 6e86c0cadbe4f5ebb8be773d8b750c885cf48187 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Mon, 4 May 2026 18:58:56 +0200 Subject: [PATCH 12/16] add raftstore dir --- pkg/raftstore/store.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 pkg/raftstore/store.go diff --git a/pkg/raftstore/store.go b/pkg/raftstore/store.go new file mode 100644 index 000000000..900ae5482 --- /dev/null +++ b/pkg/raftstore/store.go @@ -0,0 +1,26 @@ +package raftstore + +import ( + "context" +) + +type Store[R any] struct{} + +func Init[R any]() *Store[R] { + return &Store[R]{} +} + +// Transact proposes the entry to Raft and blocks until it is committed and applied. +func (s *Store[R]) Transact(ctx context.Context, f func(context.Context, R) error) error { + panic("not implemented") +} + +// Interact returns a read-only repository that can be used to query the store without proposing a Raft entry. +func (s *Store[R]) Interact(_ context.Context) (R, error) { + panic("not implemented") +} + +// Close shuts down the consensus instance. +func (s *Store[R]) Close() error { + panic("not implemented") +} From f8252016206cadc4e08a1c184dc22dde84fef987 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Mon, 4 May 2026 19:06:30 +0200 Subject: [PATCH 13/16] add back comments --- cmds/core-service/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 8ce06a9c6..8cff4d0c0 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -167,12 +167,15 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st ctx, ctxCancel := context.WithCancel(ctx) defer ctxCancel() + // Initialize aux auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) if stacktrace.GetCode(err) == store.CodeUnsupported { logger.Warn("aux not supported by current store, those endpoints will not be registered") } else if err != nil { return stacktrace.Propagate(err, "Failed to create aux server") } + + // Initialize remote ID ridV1Server, ridV2Server, err = createRIDServers(ctx, locality, logger) if stacktrace.GetCode(err) == store.CodeUnsupported { logger.Warn("remote ID not supported by current store, those endpoints will not be registered") From 5d6b9ed94942be8e1b717dc949c9b7bbd2f4cb12 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Tue, 5 May 2026 09:01:30 +0200 Subject: [PATCH 14/16] address max's comments --- cmds/core-service/main.go | 18 +++++++++++------- pkg/raftstore/store.go | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 8cff4d0c0..53678a080 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -183,6 +183,16 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st return stacktrace.Propagate(err, "Failed to create remote ID server") } + // Initialize strategic conflict detection + if *enableSCD { + scdV1Server, err = createSCDServer(ctx, logger) + if stacktrace.GetCode(err) == store.CodeUnsupported { + logger.Warn("strategic conflict detection not supported by current store, those endpoints will not be registered") + } else if err != nil { + return stacktrace.Propagate(err, "Failed to create strategic conflict detection server") + } + } + // Initialize access token validation keyResolver, err := createKeyResolver() switch { @@ -218,13 +228,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st multiRouter.Routers = append(multiRouter.Routers, &auxV1Router) } - // Initialize strategic conflict detection - if *enableSCD { - scdV1Server, err = createSCDServer(ctx, logger) - if err != nil { - return stacktrace.Propagate(err, "Failed to create strategic conflict detection server") - } - + if scdV1Server != nil { scdV1Router := apiscdv1.MakeAPIRouter(scdV1Server, authorizer) multiRouter.Routers = append(multiRouter.Routers, &scdV1Router) } diff --git a/pkg/raftstore/store.go b/pkg/raftstore/store.go index 900ae5482..5a46caa81 100644 --- a/pkg/raftstore/store.go +++ b/pkg/raftstore/store.go @@ -15,7 +15,7 @@ func (s *Store[R]) Transact(ctx context.Context, f func(context.Context, R) erro panic("not implemented") } -// Interact returns a read-only repository that can be used to query the store without proposing a Raft entry. +// Interact returns a repository that can be used to query the store without proposing a Raft entry. func (s *Store[R]) Interact(_ context.Context) (R, error) { panic("not implemented") } From 9fe813cb676973d658fc86b7bfdba19bf7a2b831 Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Tue, 5 May 2026 13:42:43 +0200 Subject: [PATCH 15/16] address Mike's comments --- cmds/core-service/main.go | 7 ++++--- pkg/aux_/store/store.go | 5 ++++- pkg/rid/store/store.go | 5 ++++- pkg/scd/store/raftstore/availability.go | 6 ++++-- pkg/scd/store/raftstore/constraints.go | 10 ++++++---- pkg/scd/store/raftstore/operational_intents.go | 14 ++++++++------ pkg/scd/store/raftstore/subscriptions.go | 16 +++++++++------- pkg/store/store.go | 3 +-- 8 files changed, 40 insertions(+), 26 deletions(-) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 53678a080..9de7eaebe 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -23,6 +23,7 @@ import ( aux "github.com/interuss/dss/pkg/aux_" auxs "github.com/interuss/dss/pkg/aux_/store" "github.com/interuss/dss/pkg/build" + dsserr "github.com/interuss/dss/pkg/errors" "github.com/interuss/dss/pkg/logging" "github.com/interuss/dss/pkg/rid/application" rid_v1 "github.com/interuss/dss/pkg/rid/server/v1" @@ -169,7 +170,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st // Initialize aux auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) - if stacktrace.GetCode(err) == store.CodeUnsupported { + if stacktrace.GetCode(err) == dsserr.NotImplemented { logger.Warn("aux not supported by current store, those endpoints will not be registered") } else if err != nil { return stacktrace.Propagate(err, "Failed to create aux server") @@ -177,7 +178,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st // Initialize remote ID ridV1Server, ridV2Server, err = createRIDServers(ctx, locality, logger) - if stacktrace.GetCode(err) == store.CodeUnsupported { + if stacktrace.GetCode(err) == dsserr.NotImplemented { logger.Warn("remote ID not supported by current store, those endpoints will not be registered") } else if err != nil { return stacktrace.Propagate(err, "Failed to create remote ID server") @@ -186,7 +187,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st // Initialize strategic conflict detection if *enableSCD { scdV1Server, err = createSCDServer(ctx, logger) - if stacktrace.GetCode(err) == store.CodeUnsupported { + if stacktrace.GetCode(err) == dsserr.NotImplemented { logger.Warn("strategic conflict detection not supported by current store, those endpoints will not be registered") } else if err != nil { return stacktrace.Propagate(err, "Failed to create strategic conflict detection server") diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 43d5e4d0e..9d3d30e7a 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -5,6 +5,7 @@ import ( "github.com/interuss/dss/pkg/aux_/repos" auxsqlstore "github.com/interuss/dss/pkg/aux_/store/sqlstore" + dsserr "github.com/interuss/dss/pkg/errors" dssstore "github.com/interuss/dss/pkg/store" "github.com/interuss/dss/pkg/store/params" "github.com/interuss/stacktrace" @@ -21,7 +22,9 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, e switch storeType := params.GetStoreParameters().StoreType; storeType { case params.SQLStoreType: return auxsqlstore.Init(ctx, logger, withCheckCron) + case params.RaftStoreType: + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Raft store not yet implemented for aux") default: - return nil, stacktrace.NewErrorWithCode(dssstore.CodeUnsupported, "Unsupported store type %q for aux", storeType) + return nil, stacktrace.NewError("Unsupported store type %q for aux", storeType) } } diff --git a/pkg/rid/store/store.go b/pkg/rid/store/store.go index 088801eda..835164cf7 100644 --- a/pkg/rid/store/store.go +++ b/pkg/rid/store/store.go @@ -3,6 +3,7 @@ package store import ( "context" + dsserr "github.com/interuss/dss/pkg/errors" "github.com/interuss/dss/pkg/rid/repos" ridsqlstore "github.com/interuss/dss/pkg/rid/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" @@ -21,7 +22,9 @@ func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, e switch storeType { case params.SQLStoreType: return ridsqlstore.Init(ctx, logger, withCheckCron) + case params.RaftStoreType: + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Raft store not yet implemented for remote ID") default: - return nil, stacktrace.NewErrorWithCode(dssstore.CodeUnsupported, "Unsupported store type %q for rid", storeType) + return nil, stacktrace.NewError("Unsupported store type %q for rid", storeType) } } diff --git a/pkg/scd/store/raftstore/availability.go b/pkg/scd/store/raftstore/availability.go index 6d5145e67..3750bf34b 100644 --- a/pkg/scd/store/raftstore/availability.go +++ b/pkg/scd/store/raftstore/availability.go @@ -3,14 +3,16 @@ package raftstore import ( "context" + dsserr "github.com/interuss/dss/pkg/errors" dssmodels "github.com/interuss/dss/pkg/models" scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/stacktrace" ) func (r *repo) GetUssAvailability(_ context.Context, id dssmodels.Manager) (*scdmodels.UssAvailabilityStatus, error) { - panic("GetUssAvailability not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetUssAvailability not yet implemented in raft store") } func (r *repo) UpsertUssAvailability(_ context.Context, ussa *scdmodels.UssAvailabilityStatus) (*scdmodels.UssAvailabilityStatus, error) { - panic("UpsertUssAvailability not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertUssAvailability not yet implemented in raft store") } diff --git a/pkg/scd/store/raftstore/constraints.go b/pkg/scd/store/raftstore/constraints.go index 40cbf8bbe..72ac25709 100644 --- a/pkg/scd/store/raftstore/constraints.go +++ b/pkg/scd/store/raftstore/constraints.go @@ -3,22 +3,24 @@ package raftstore import ( "context" + dsserr "github.com/interuss/dss/pkg/errors" dssmodels "github.com/interuss/dss/pkg/models" scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/stacktrace" ) func (r *repo) SearchConstraints(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.Constraint, error) { - panic("SearchConstraints not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SearchConstraints not yet implemented in raft store") } func (r *repo) GetConstraint(_ context.Context, id dssmodels.ID) (*scdmodels.Constraint, error) { - panic("GetConstraint not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetConstraint not yet implemented in raft store") } func (r *repo) UpsertConstraint(_ context.Context, constraint *scdmodels.Constraint) (*scdmodels.Constraint, error) { - panic("UpsertConstraint not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertConstraint not yet implemented in raft store") } func (r *repo) DeleteConstraint(_ context.Context, id dssmodels.ID) error { - panic("DeleteConstraint not yet implemented in raft store") + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteConstraint not yet implemented in raft store") } diff --git a/pkg/scd/store/raftstore/operational_intents.go b/pkg/scd/store/raftstore/operational_intents.go index 6293d30f4..93620927c 100644 --- a/pkg/scd/store/raftstore/operational_intents.go +++ b/pkg/scd/store/raftstore/operational_intents.go @@ -4,30 +4,32 @@ import ( "context" "time" + dsserr "github.com/interuss/dss/pkg/errors" dssmodels "github.com/interuss/dss/pkg/models" scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/stacktrace" ) func (r *repo) GetOperationalIntent(_ context.Context, id dssmodels.ID) (*scdmodels.OperationalIntent, error) { - panic("GetOperationalIntent not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetOperationalIntent not yet implemented in raft store") } func (r *repo) DeleteOperationalIntent(_ context.Context, id dssmodels.ID) error { - panic("DeleteOperationalIntent not yet implemented in raft store") + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteOperationalIntent not yet implemented in raft store") } func (r *repo) UpsertOperationalIntent(_ context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) { - panic("UpsertOperationalIntent not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertOperationalIntent not yet implemented in raft store") } func (r *repo) SearchOperationalIntents(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.OperationalIntent, error) { - panic("SearchOperationalIntents not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SearchOperationalIntents not yet implemented in raft store") } func (r *repo) GetDependentOperationalIntents(_ context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error) { - panic("GetDependentOperationalIntents not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetDependentOperationalIntents not yet implemented in raft store") } func (r *repo) ListExpiredOperationalIntents(_ context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) { - panic("ListExpiredOperationalIntents not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredOperationalIntents not yet implemented in raft store") } diff --git a/pkg/scd/store/raftstore/subscriptions.go b/pkg/scd/store/raftstore/subscriptions.go index ef16dd910..0b7477b39 100644 --- a/pkg/scd/store/raftstore/subscriptions.go +++ b/pkg/scd/store/raftstore/subscriptions.go @@ -5,34 +5,36 @@ import ( "time" "github.com/golang/geo/s2" + dsserr "github.com/interuss/dss/pkg/errors" dssmodels "github.com/interuss/dss/pkg/models" scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/stacktrace" ) func (r *repo) SearchSubscriptions(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.Subscription, error) { - panic("SearchSubscriptions not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SearchSubscriptions not yet implemented in raft store") } func (r *repo) GetSubscription(_ context.Context, id dssmodels.ID) (*scdmodels.Subscription, error) { - panic("GetSubscription not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetSubscription not yet implemented in raft store") } func (r *repo) UpsertSubscription(_ context.Context, sub *scdmodels.Subscription) (*scdmodels.Subscription, error) { - panic("UpsertSubscription not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertSubscription not yet implemented in raft store") } func (r *repo) DeleteSubscription(_ context.Context, id dssmodels.ID) error { - panic("DeleteSubscription not yet implemented in raft store") + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteSubscription not yet implemented in raft store") } func (r *repo) IncrementNotificationIndices(_ context.Context, subscriptionIds []dssmodels.ID) ([]int, error) { - panic("IncrementNotificationIndices not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "IncrementNotificationIndices not yet implemented in raft store") } func (r *repo) LockSubscriptionsOnCells(_ context.Context, cells s2.CellUnion, subscriptionIds []dssmodels.ID, startTime *time.Time, endTime *time.Time) error { - panic("LockSubscriptionsOnCells not yet implemented in raft store") + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "LockSubscriptionsOnCells not yet implemented in raft store") } func (r *repo) ListExpiredSubscriptions(_ context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) { - panic("ListExpiredSubscriptions not yet implemented in raft store") + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredSubscriptions not yet implemented in raft store") } diff --git a/pkg/store/store.go b/pkg/store/store.go index 2495e9979..e5e95c4b0 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -20,6 +20,5 @@ type Store[R any] interface { } const ( - CodeRetryable = stacktrace.ErrorCode(1) - CodeUnsupported = stacktrace.ErrorCode(2) + CodeRetryable = stacktrace.ErrorCode(1) ) From 063810c278e68157348ca29847b616f88c0a208f Mon Sep 17 00:00:00 2001 From: Mariem Baccari Date: Tue, 5 May 2026 13:49:12 +0200 Subject: [PATCH 16/16] remove panics from store.go --- pkg/raftstore/store.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/raftstore/store.go b/pkg/raftstore/store.go index 5a46caa81..ad69f11cc 100644 --- a/pkg/raftstore/store.go +++ b/pkg/raftstore/store.go @@ -2,6 +2,9 @@ package raftstore import ( "context" + + dsserr "github.com/interuss/dss/pkg/errors" + "github.com/interuss/stacktrace" ) type Store[R any] struct{} @@ -12,15 +15,16 @@ func Init[R any]() *Store[R] { // Transact proposes the entry to Raft and blocks until it is committed and applied. func (s *Store[R]) Transact(ctx context.Context, f func(context.Context, R) error) error { - panic("not implemented") + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Transact not yet implemented for Raft store") } // Interact returns a repository that can be used to query the store without proposing a Raft entry. func (s *Store[R]) Interact(_ context.Context) (R, error) { - panic("not implemented") + var empty R + return empty, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Interact not yet implemented for Raft store") } // Close shuts down the consensus instance. func (s *Store[R]) Close() error { - panic("not implemented") + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Close not yet implemented for Raft store") }