Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions cmds/core-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
aux "github.com/interuss/dss/pkg/aux_"
auxs "github.com/interuss/dss/pkg/aux_/store"
"github.com/interuss/dss/pkg/build"
dsserr "github.com/interuss/dss/pkg/errors"
"github.com/interuss/dss/pkg/logging"
"github.com/interuss/dss/pkg/rid/application"
rid_v1 "github.com/interuss/dss/pkg/rid/server/v1"
Expand Down Expand Up @@ -169,16 +170,30 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st

// Initialize aux
auxV1Server, err = createAuxServer(ctx, locality, *publicEndpoint, *scdGlobalLock, logger)
if err != nil {
if stacktrace.GetCode(err) == dsserr.NotImplemented {
logger.Warn("aux not supported by current store, those endpoints will not be registered")
} else if err != nil {
return stacktrace.Propagate(err, "Failed to create aux server")
}

// Initialize remote ID
ridV1Server, ridV2Server, err = createRIDServers(ctx, locality, logger)
if err != nil {
if stacktrace.GetCode(err) == dsserr.NotImplemented {
logger.Warn("remote ID not supported by current store, those endpoints will not be registered")
} else if err != nil {
return stacktrace.Propagate(err, "Failed to create remote ID server")
}

// Initialize strategic conflict detection
if *enableSCD {
scdV1Server, err = createSCDServer(ctx, logger)
if stacktrace.GetCode(err) == dsserr.NotImplemented {
logger.Warn("strategic conflict detection not supported by current store, those endpoints will not be registered")
} else if err != nil {
return stacktrace.Propagate(err, "Failed to create strategic conflict detection server")
}
}

// Initialize access token validation
keyResolver, err := createKeyResolver()
switch {
Expand All @@ -199,25 +214,22 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st
return stacktrace.Propagate(err, "Error creating RSA authorizer")
}

auxV1Router := apiauxv1.MakeAPIRouter(auxV1Server, authorizer)
versioningV1Router := apiversioningv1.MakeAPIRouter(versioningV1Server, authorizer)
ridV1Router := apiridv1.MakeAPIRouter(ridV1Server, authorizer)
ridV2Router := apiridv2.MakeAPIRouter(ridV2Server, authorizer)
multiRouter := api.MultiRouter{
Routers: []api.PartialRouter{
&auxV1Router,
&versioningV1Router,
&ridV1Router,
&ridV2Router,
}}
Routers: []api.PartialRouter{&versioningV1Router},
}

// Initialize strategic conflict detection
if *enableSCD {
scdV1Server, err = createSCDServer(ctx, logger)
if err != nil {
return stacktrace.Propagate(err, "Failed to create strategic conflict detection server")
}
if ridV1Server != nil {
ridV1Router := apiridv1.MakeAPIRouter(ridV1Server, authorizer)
ridV2Router := apiridv2.MakeAPIRouter(ridV2Server, authorizer)
multiRouter.Routers = append(multiRouter.Routers, &ridV1Router, &ridV2Router)
}
if auxV1Server != nil {
auxV1Router := apiauxv1.MakeAPIRouter(auxV1Server, authorizer)
multiRouter.Routers = append(multiRouter.Routers, &auxV1Router)
}
Comment thread
MariemBaccari marked this conversation as resolved.

if scdV1Server != nil {
scdV1Router := apiscdv1.MakeAPIRouter(scdV1Server, authorizer)
multiRouter.Routers = append(multiRouter.Routers, &scdV1Router)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/aux_/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/interuss/dss/pkg/aux_/repos"
auxsqlstore "github.com/interuss/dss/pkg/aux_/store/sqlstore"
dsserr "github.com/interuss/dss/pkg/errors"
dssstore "github.com/interuss/dss/pkg/store"
"github.com/interuss/dss/pkg/store/params"
"github.com/interuss/stacktrace"
Expand All @@ -19,8 +20,10 @@ type Store = dssstore.Store[repos.Repository]
// Init selects and initializes the aux store backend.
func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) {
switch storeType := params.GetStoreParameters().StoreType; storeType {
case "sql":
case params.SQLStoreType:
return auxsqlstore.Init(ctx, logger, withCheckCron)
case params.RaftStoreType:
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Raft store not yet implemented for aux")
default:
return nil, stacktrace.NewError("Unsupported store type %q for aux", storeType)
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/raftstore/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package raftstore

import (
"context"

dsserr "github.com/interuss/dss/pkg/errors"
"github.com/interuss/stacktrace"
)

type Store[R any] struct{}

func Init[R any]() *Store[R] {
return &Store[R]{}
}

// Transact proposes the entry to Raft and blocks until it is committed and applied.
func (s *Store[R]) Transact(ctx context.Context, f func(context.Context, R) error) error {
return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Transact not yet implemented for Raft store")
}

// Interact returns a repository that can be used to query the store without proposing a Raft entry.
func (s *Store[R]) Interact(_ context.Context) (R, error) {
var empty R
return empty, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Interact not yet implemented for Raft store")
}

// Close shuts down the consensus instance.
func (s *Store[R]) Close() error {
return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Close not yet implemented for Raft store")
}
5 changes: 4 additions & 1 deletion pkg/rid/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"context"

dsserr "github.com/interuss/dss/pkg/errors"
"github.com/interuss/dss/pkg/rid/repos"
ridsqlstore "github.com/interuss/dss/pkg/rid/store/sqlstore"
dssstore "github.com/interuss/dss/pkg/store"
Expand All @@ -19,8 +20,10 @@ type Store = dssstore.Store[repos.Repository]
func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool) (Store, error) {
storeType := params.GetStoreParameters().StoreType
switch storeType {
case "sql":
case params.SQLStoreType:
return ridsqlstore.Init(ctx, logger, withCheckCron)
case params.RaftStoreType:
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "Raft store not yet implemented for remote ID")
default:
return nil, stacktrace.NewError("Unsupported store type %q for rid", storeType)
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/scd/store/raftstore/availability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package raftstore

import (
"context"

dsserr "github.com/interuss/dss/pkg/errors"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
"github.com/interuss/stacktrace"
)

func (r *repo) GetUssAvailability(_ context.Context, id dssmodels.Manager) (*scdmodels.UssAvailabilityStatus, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetUssAvailability not yet implemented in raft store")
}

func (r *repo) UpsertUssAvailability(_ context.Context, ussa *scdmodels.UssAvailabilityStatus) (*scdmodels.UssAvailabilityStatus, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertUssAvailability not yet implemented in raft store")
}
26 changes: 26 additions & 0 deletions pkg/scd/store/raftstore/constraints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package raftstore

import (
"context"

dsserr "github.com/interuss/dss/pkg/errors"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
"github.com/interuss/stacktrace"
)

func (r *repo) SearchConstraints(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.Constraint, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SearchConstraints not yet implemented in raft store")
}

func (r *repo) GetConstraint(_ context.Context, id dssmodels.ID) (*scdmodels.Constraint, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetConstraint not yet implemented in raft store")
}

func (r *repo) UpsertConstraint(_ context.Context, constraint *scdmodels.Constraint) (*scdmodels.Constraint, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertConstraint not yet implemented in raft store")
}

func (r *repo) DeleteConstraint(_ context.Context, id dssmodels.ID) error {
return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteConstraint not yet implemented in raft store")
}
3 changes: 3 additions & 0 deletions pkg/scd/store/raftstore/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package scd.store.raftstore provides a full implementation of store.Store[scd.repos.Repository]
// for Raft-based in-memory data storage.
package raftstore
35 changes: 35 additions & 0 deletions pkg/scd/store/raftstore/operational_intents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package raftstore

import (
"context"
"time"

dsserr "github.com/interuss/dss/pkg/errors"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
"github.com/interuss/stacktrace"
)

func (r *repo) GetOperationalIntent(_ context.Context, id dssmodels.ID) (*scdmodels.OperationalIntent, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetOperationalIntent not yet implemented in raft store")
}

func (r *repo) DeleteOperationalIntent(_ context.Context, id dssmodels.ID) error {
return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteOperationalIntent not yet implemented in raft store")
}

func (r *repo) UpsertOperationalIntent(_ context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertOperationalIntent not yet implemented in raft store")
}

func (r *repo) SearchOperationalIntents(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.OperationalIntent, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SearchOperationalIntents not yet implemented in raft store")
}

func (r *repo) GetDependentOperationalIntents(_ context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetDependentOperationalIntents not yet implemented in raft store")
}

func (r *repo) ListExpiredOperationalIntents(_ context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredOperationalIntents not yet implemented in raft store")
}
13 changes: 13 additions & 0 deletions pkg/scd/store/raftstore/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package raftstore

import (
"github.com/interuss/dss/pkg/raftstore"
"github.com/interuss/dss/pkg/scd/repos"
)

// repo is a full implementation of scd.repos.Repository for Raft-based in-memory storage.
type repo struct{}

func Init() (*raftstore.Store[repos.Repository], error) {
return raftstore.Init[repos.Repository](), nil
}
40 changes: 40 additions & 0 deletions pkg/scd/store/raftstore/subscriptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package raftstore

import (
"context"
"time"

"github.com/golang/geo/s2"
dsserr "github.com/interuss/dss/pkg/errors"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
"github.com/interuss/stacktrace"
)

func (r *repo) SearchSubscriptions(_ context.Context, v4d *dssmodels.Volume4D) ([]*scdmodels.Subscription, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "SearchSubscriptions not yet implemented in raft store")
}

func (r *repo) GetSubscription(_ context.Context, id dssmodels.ID) (*scdmodels.Subscription, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetSubscription not yet implemented in raft store")
}

func (r *repo) UpsertSubscription(_ context.Context, sub *scdmodels.Subscription) (*scdmodels.Subscription, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "UpsertSubscription not yet implemented in raft store")
}

func (r *repo) DeleteSubscription(_ context.Context, id dssmodels.ID) error {
return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteSubscription not yet implemented in raft store")
}

func (r *repo) IncrementNotificationIndices(_ context.Context, subscriptionIds []dssmodels.ID) ([]int, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "IncrementNotificationIndices not yet implemented in raft store")
}

func (r *repo) LockSubscriptionsOnCells(_ context.Context, cells s2.CellUnion, subscriptionIds []dssmodels.ID, startTime *time.Time, endTime *time.Time) error {
return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "LockSubscriptionsOnCells not yet implemented in raft store")
}

func (r *repo) ListExpiredSubscriptions(_ context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredSubscriptions not yet implemented in raft store")
}
5 changes: 4 additions & 1 deletion pkg/scd/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/interuss/dss/pkg/scd/repos"
"github.com/interuss/dss/pkg/scd/store/raftstore"
scdsqlstore "github.com/interuss/dss/pkg/scd/store/sqlstore"
dssstore "github.com/interuss/dss/pkg/store"
"github.com/interuss/dss/pkg/store/params"
Expand All @@ -19,8 +20,10 @@ type Store = dssstore.Store[repos.Repository]
func Init(ctx context.Context, logger *zap.Logger, withCheckCron bool, globalLock bool) (Store, error) {
storeType := params.GetStoreParameters().StoreType
switch storeType {
case "sql":
case params.SQLStoreType:
return scdsqlstore.Init(ctx, logger, withCheckCron, globalLock)
case params.RaftStoreType:
return raftstore.Init()
default:
return nil, stacktrace.NewError("Unsupported store type %q for scd", storeType)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/store/params/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package params

import (
"flag"
"fmt"
)

const (
RaftStoreType = "raft"
SQLStoreType = "sql"
)

type (
Expand All @@ -16,7 +22,7 @@ var (
)

func init() {
flag.StringVar(&storeParameters.StoreType, "store_type", "sql", "Store type. Use 'sql' for CockroachDB/YugabyteDB")
flag.StringVar(&storeParameters.StoreType, "store_type", SQLStoreType, fmt.Sprintf("Store type. Use '%s' for CockroachDB/YugabyteDB and '%s' for Raft-based store.", SQLStoreType, RaftStoreType))
}

// ConnectParameters returns a ConnectParameters instance that gets populated from well-known CLI flags.
Expand Down
Loading