diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 9eeed0746..9de7eaebe 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -23,6 +23,7 @@ import ( aux "github.com/interuss/dss/pkg/aux_" auxs "github.com/interuss/dss/pkg/aux_/store" "github.com/interuss/dss/pkg/build" + dsserr "github.com/interuss/dss/pkg/errors" "github.com/interuss/dss/pkg/logging" "github.com/interuss/dss/pkg/rid/application" rid_v1 "github.com/interuss/dss/pkg/rid/server/v1" @@ -169,16 +170,30 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st // Initialize aux auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger) - if err != nil { + if stacktrace.GetCode(err) == dsserr.NotImplemented { + logger.Warn("aux not supported by current store, those endpoints will not be registered") + } else if err != nil { return stacktrace.Propagate(err, "Failed to create aux server") } // Initialize remote ID ridV1Server, ridV2Server, err = createRIDServers(ctx, locality, logger) - if err != nil { + if stacktrace.GetCode(err) == dsserr.NotImplemented { + logger.Warn("remote ID not supported by current store, those endpoints will not be registered") + } else if err != nil { return stacktrace.Propagate(err, "Failed to create remote ID server") } + // Initialize strategic conflict detection + if *enableSCD { + scdV1Server, err = createSCDServer(ctx, logger) + if stacktrace.GetCode(err) == dsserr.NotImplemented { + logger.Warn("strategic conflict detection not supported by current store, those endpoints will not be registered") + } else if err != nil { + return stacktrace.Propagate(err, "Failed to create strategic conflict detection server") + } + } + // Initialize access token validation keyResolver, err := createKeyResolver() switch { @@ -199,25 +214,22 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st return stacktrace.Propagate(err, "Error creating RSA authorizer") } - auxV1Router := apiauxv1.MakeAPIRouter(auxV1Server, authorizer) versioningV1Router := apiversioningv1.MakeAPIRouter(versioningV1Server, authorizer) - ridV1Router := apiridv1.MakeAPIRouter(ridV1Server, authorizer) - ridV2Router := apiridv2.MakeAPIRouter(ridV2Server, authorizer) multiRouter := api.MultiRouter{ - Routers: []api.PartialRouter{ - &auxV1Router, - &versioningV1Router, - &ridV1Router, - &ridV2Router, - }} + Routers: []api.PartialRouter{&versioningV1Router}, + } - // Initialize strategic conflict detection - if *enableSCD { - scdV1Server, err = createSCDServer(ctx, logger) - if err != nil { - return stacktrace.Propagate(err, "Failed to create strategic conflict detection server") - } + if ridV1Server != nil { + ridV1Router := apiridv1.MakeAPIRouter(ridV1Server, authorizer) + ridV2Router := apiridv2.MakeAPIRouter(ridV2Server, authorizer) + multiRouter.Routers = append(multiRouter.Routers, &ridV1Router, &ridV2Router) + } + if auxV1Server != nil { + auxV1Router := apiauxv1.MakeAPIRouter(auxV1Server, authorizer) + multiRouter.Routers = append(multiRouter.Routers, &auxV1Router) + } + if scdV1Server != nil { scdV1Router := apiscdv1.MakeAPIRouter(scdV1Server, authorizer) multiRouter.Routers = append(multiRouter.Routers, &scdV1Router) } diff --git a/pkg/aux_/store/store.go b/pkg/aux_/store/store.go index 1a8e1be3e..9d3d30e7a 100644 --- a/pkg/aux_/store/store.go +++ b/pkg/aux_/store/store.go @@ -5,6 +5,7 @@ import ( "github.com/interuss/dss/pkg/aux_/repos" auxsqlstore "github.com/interuss/dss/pkg/aux_/store/sqlstore" + dsserr "github.com/interuss/dss/pkg/errors" dssstore "github.com/interuss/dss/pkg/store" "github.com/interuss/dss/pkg/store/params" "github.com/interuss/stacktrace" @@ -19,8 +20,10 @@ type Store = dssstore.Store[repos.Repository] // Init selects and initializes the aux store backend. func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { switch storeType := params.GetStoreParameters().StoreType; storeType { - case "sql": + case params.SQLStoreType: return auxsqlstore.Init(ctx, logger, withCheckCron) + case params.RaftStoreType: + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Raft store not yet implemented for aux") default: return nil, stacktrace.NewError("Unsupported store type %q for aux", storeType) } diff --git a/pkg/raftstore/store.go b/pkg/raftstore/store.go new file mode 100644 index 000000000..ad69f11cc --- /dev/null +++ b/pkg/raftstore/store.go @@ -0,0 +1,30 @@ +package raftstore + +import ( + "context" + + dsserr "github.com/interuss/dss/pkg/errors" + "github.com/interuss/stacktrace" +) + +type Store[R any] struct{} + +func Init[R any]() *Store[R] { + return &Store[R]{} +} + +// Transact proposes the entry to Raft and blocks until it is committed and applied. +func (s *Store[R]) Transact(ctx context.Context, f func(context.Context, R) error) error { + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Transact not yet implemented for Raft store") +} + +// Interact returns a repository that can be used to query the store without proposing a Raft entry. +func (s *Store[R]) Interact(_ context.Context) (R, error) { + var empty R + return empty, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Interact not yet implemented for Raft store") +} + +// Close shuts down the consensus instance. +func (s *Store[R]) Close() error { + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Close not yet implemented for Raft store") +} diff --git a/pkg/rid/store/store.go b/pkg/rid/store/store.go index 026846306..835164cf7 100644 --- a/pkg/rid/store/store.go +++ b/pkg/rid/store/store.go @@ -3,6 +3,7 @@ package store import ( "context" + dsserr "github.com/interuss/dss/pkg/errors" "github.com/interuss/dss/pkg/rid/repos" ridsqlstore "github.com/interuss/dss/pkg/rid/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" @@ -19,8 +20,10 @@ type Store = dssstore.Store[repos.Repository] func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) { storeType := params.GetStoreParameters().StoreType switch storeType { - case "sql": + case params.SQLStoreType: return ridsqlstore.Init(ctx, logger, withCheckCron) + case params.RaftStoreType: + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Raft store not yet implemented for remote ID") default: return nil, stacktrace.NewError("Unsupported store type %q for rid", storeType) } diff --git a/pkg/scd/store/raftstore/availability.go b/pkg/scd/store/raftstore/availability.go new file mode 100644 index 000000000..3750bf34b --- /dev/null +++ b/pkg/scd/store/raftstore/availability.go @@ -0,0 +1,18 @@ +package raftstore + +import ( + "context" + + dsserr "github.com/interuss/dss/pkg/errors" + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/stacktrace" +) + +func (r *repo) GetUssAvailability(_ context.Context, id dssmodels.Manager) (*scdmodels.UssAvailabilityStatus, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetUssAvailability not yet implemented in raft store") +} + +func (r *repo) UpsertUssAvailability(_ context.Context, ussa *scdmodels.UssAvailabilityStatus) (*scdmodels.UssAvailabilityStatus, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertUssAvailability not yet implemented in raft store") +} diff --git a/pkg/scd/store/raftstore/constraints.go b/pkg/scd/store/raftstore/constraints.go new file mode 100644 index 000000000..72ac25709 --- /dev/null +++ b/pkg/scd/store/raftstore/constraints.go @@ -0,0 +1,26 @@ +package raftstore + +import ( + "context" + + dsserr "github.com/interuss/dss/pkg/errors" + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/stacktrace" +) + +func (r *repo) SearchConstraints(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.Constraint, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SearchConstraints not yet implemented in raft store") +} + +func (r *repo) GetConstraint(_ context.Context, id dssmodels.ID) (*scdmodels.Constraint, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetConstraint not yet implemented in raft store") +} + +func (r *repo) UpsertConstraint(_ context.Context, constraint *scdmodels.Constraint) (*scdmodels.Constraint, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertConstraint not yet implemented in raft store") +} + +func (r *repo) DeleteConstraint(_ context.Context, id dssmodels.ID) error { + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteConstraint not yet implemented in raft store") +} diff --git a/pkg/scd/store/raftstore/doc.go b/pkg/scd/store/raftstore/doc.go new file mode 100644 index 000000000..2192c849a --- /dev/null +++ b/pkg/scd/store/raftstore/doc.go @@ -0,0 +1,3 @@ +// Package scd.store.raftstore provides a full implementation of store.Store[scd.repos.Repository] +// for Raft-based in-memory data storage. +package raftstore diff --git a/pkg/scd/store/raftstore/operational_intents.go b/pkg/scd/store/raftstore/operational_intents.go new file mode 100644 index 000000000..93620927c --- /dev/null +++ b/pkg/scd/store/raftstore/operational_intents.go @@ -0,0 +1,35 @@ +package raftstore + +import ( + "context" + "time" + + dsserr "github.com/interuss/dss/pkg/errors" + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/stacktrace" +) + +func (r *repo) GetOperationalIntent(_ context.Context, id dssmodels.ID) (*scdmodels.OperationalIntent, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetOperationalIntent not yet implemented in raft store") +} + +func (r *repo) DeleteOperationalIntent(_ context.Context, id dssmodels.ID) error { + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteOperationalIntent not yet implemented in raft store") +} + +func (r *repo) UpsertOperationalIntent(_ context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertOperationalIntent not yet implemented in raft store") +} + +func (r *repo) SearchOperationalIntents(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.OperationalIntent, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SearchOperationalIntents not yet implemented in raft store") +} + +func (r *repo) GetDependentOperationalIntents(_ context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetDependentOperationalIntents not yet implemented in raft store") +} + +func (r *repo) ListExpiredOperationalIntents(_ context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredOperationalIntents not yet implemented in raft store") +} diff --git a/pkg/scd/store/raftstore/store.go b/pkg/scd/store/raftstore/store.go new file mode 100644 index 000000000..d1cb97804 --- /dev/null +++ b/pkg/scd/store/raftstore/store.go @@ -0,0 +1,13 @@ +package raftstore + +import ( + "github.com/interuss/dss/pkg/raftstore" + "github.com/interuss/dss/pkg/scd/repos" +) + +// repo is a full implementation of scd.repos.Repository for Raft-based in-memory storage. +type repo struct{} + +func Init() (*raftstore.Store[repos.Repository], error) { + return raftstore.Init[repos.Repository](), nil +} diff --git a/pkg/scd/store/raftstore/subscriptions.go b/pkg/scd/store/raftstore/subscriptions.go new file mode 100644 index 000000000..0b7477b39 --- /dev/null +++ b/pkg/scd/store/raftstore/subscriptions.go @@ -0,0 +1,40 @@ +package raftstore + +import ( + "context" + "time" + + "github.com/golang/geo/s2" + dsserr "github.com/interuss/dss/pkg/errors" + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/stacktrace" +) + +func (r *repo) SearchSubscriptions(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.Subscription, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SearchSubscriptions not yet implemented in raft store") +} + +func (r *repo) GetSubscription(_ context.Context, id dssmodels.ID) (*scdmodels.Subscription, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetSubscription not yet implemented in raft store") +} + +func (r *repo) UpsertSubscription(_ context.Context, sub *scdmodels.Subscription) (*scdmodels.Subscription, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertSubscription not yet implemented in raft store") +} + +func (r *repo) DeleteSubscription(_ context.Context, id dssmodels.ID) error { + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteSubscription not yet implemented in raft store") +} + +func (r *repo) IncrementNotificationIndices(_ context.Context, subscriptionIds []dssmodels.ID) ([]int, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "IncrementNotificationIndices not yet implemented in raft store") +} + +func (r *repo) LockSubscriptionsOnCells(_ context.Context, cells s2.CellUnion, subscriptionIds []dssmodels.ID, startTime *time.Time, endTime *time.Time) error { + return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "LockSubscriptionsOnCells not yet implemented in raft store") +} + +func (r *repo) ListExpiredSubscriptions(_ context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) { + return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredSubscriptions not yet implemented in raft store") +} diff --git a/pkg/scd/store/store.go b/pkg/scd/store/store.go index 9e977655d..1b6d12b00 100644 --- a/pkg/scd/store/store.go +++ b/pkg/scd/store/store.go @@ -4,6 +4,7 @@ import ( "context" "github.com/interuss/dss/pkg/scd/repos" + "github.com/interuss/dss/pkg/scd/store/raftstore" scdsqlstore "github.com/interuss/dss/pkg/scd/store/sqlstore" dssstore "github.com/interuss/dss/pkg/store" "github.com/interuss/dss/pkg/store/params" @@ -19,8 +20,10 @@ type Store = dssstore.Store[repos.Repository] func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (Store, error) { storeType := params.GetStoreParameters().StoreType switch storeType { - case "sql": + case params.SQLStoreType: return scdsqlstore.Init(ctx, logger, withCheckCron, globalLock) + case params.RaftStoreType: + return raftstore.Init() default: return nil, stacktrace.NewError("Unsupported store type %q for scd", storeType) } diff --git a/pkg/store/params/params.go b/pkg/store/params/params.go index 48af00d19..2741c94e3 100644 --- a/pkg/store/params/params.go +++ b/pkg/store/params/params.go @@ -2,6 +2,12 @@ package params import ( "flag" + "fmt" +) + +const ( + RaftStoreType = "raft" + SQLStoreType = "sql" ) type ( @@ -16,7 +22,7 @@ var ( ) func init() { - flag.StringVar(&storeParameters.StoreType, "store_type", "sql", "Store type. Use 'sql' for CockroachDB/YugabyteDB") + flag.StringVar(&storeParameters.StoreType, "store_type", SQLStoreType, fmt.Sprintf("Store type. Use '%s' for CockroachDB/YugabyteDB and '%s' for Raft-based store.", SQLStoreType, RaftStoreType)) } // ConnectParameters returns a ConnectParameters instance that gets populated from well-known CLI flags.