From 18ac11064a71ef826463a91bbaf8b77bb5b4ea90 Mon Sep 17 00:00:00 2001 From: arashi Date: Wed, 21 Jan 2026 01:05:09 +0100 Subject: [PATCH 1/2] initial code --- mod/all/mods.go | 3 +- mod/indexing/errors.go | 5 + mod/indexing/module.go | 7 + mod/indexing/src/config.go | 6 + mod/indexing/src/db.go | 127 ++++++++++++++++++ mod/indexing/src/db_repo_entry.go | 19 +++ mod/indexing/src/deps.go | 23 ++++ mod/indexing/src/loader.go | 43 ++++++ mod/indexing/src/module.go | 206 +++++++++++++++++++++++++++++ mod/indexing/src/op_enable_repo.go | 36 +++++ 10 files changed, 474 insertions(+), 1 deletion(-) create mode 100644 mod/indexing/errors.go create mode 100644 mod/indexing/module.go create mode 100644 mod/indexing/src/config.go create mode 100644 mod/indexing/src/db.go create mode 100644 mod/indexing/src/db_repo_entry.go create mode 100644 mod/indexing/src/deps.go create mode 100644 mod/indexing/src/loader.go create mode 100644 mod/indexing/src/module.go create mode 100644 mod/indexing/src/op_enable_repo.go diff --git a/mod/all/mods.go b/mod/all/mods.go index c11aba9fe..e185badac 100644 --- a/mod/all/mods.go +++ b/mod/all/mods.go @@ -16,8 +16,9 @@ import ( _ "github.com/cryptopunkscc/astrald/mod/fs/src" _ "github.com/cryptopunkscc/astrald/mod/fwd/src" _ "github.com/cryptopunkscc/astrald/mod/gateway/src" + _ "github.com/cryptopunkscc/astrald/mod/indexing/src" _ "github.com/cryptopunkscc/astrald/mod/ip/src" - _ "github.com/cryptopunkscc/astrald/mod/kcp/src" + //_ "github.com/cryptopunkscc/astrald/mod/kcp/src" _ "github.com/cryptopunkscc/astrald/mod/keys/src" _ "github.com/cryptopunkscc/astrald/mod/kos/src" _ "github.com/cryptopunkscc/astrald/mod/log/src" diff --git a/mod/indexing/errors.go b/mod/indexing/errors.go new file mode 100644 index 000000000..b277b0ec3 --- /dev/null +++ b/mod/indexing/errors.go @@ -0,0 +1,5 @@ +package indexing + +import "errors" + +var ErrObjectAlreadyAdded = errors.New("object already added") diff --git a/mod/indexing/module.go b/mod/indexing/module.go new file mode 100644 index 000000000..53fc97585 --- /dev/null +++ b/mod/indexing/module.go @@ -0,0 +1,7 @@ +package indexing + +const ModuleName = "indexing" +const DBPrefix = "indexing__" + +type Module interface { +} diff --git a/mod/indexing/src/config.go b/mod/indexing/src/config.go new file mode 100644 index 000000000..acb5c35af --- /dev/null +++ b/mod/indexing/src/config.go @@ -0,0 +1,6 @@ +package indexing + +type Config struct { +} + +var defaultConfig = Config{} diff --git a/mod/indexing/src/db.go b/mod/indexing/src/db.go new file mode 100644 index 000000000..d706a1c43 --- /dev/null +++ b/mod/indexing/src/db.go @@ -0,0 +1,127 @@ +package indexing + +import ( + "errors" + + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/mod/indexing" + "gorm.io/gorm" +) + +type DB struct { + *gorm.DB +} + +func (db *DB) autoMigrate() error { + return db.AutoMigrate(&dbRepoEntry{}) +} + +func newDB(gormDB *gorm.DB) (*DB, error) { + db := &DB{DB: gormDB} + + err := db.DB.AutoMigrate() + if err != nil { + return nil, err + } + + return db, nil +} + +func (db *DB) addToRepo(repoName string, objectID *astral.ObjectID) (err error) { + err = db.Transaction(func(tx *gorm.DB) error { + // get next version + var ver int + err = tx.Model(&dbRepoEntry{}). + Select("MAX(version)+1"). + Where("repo = ?", repoName, objectID). + First(&ver). + Error + + var row dbRepoEntry + err = tx.Where("repo = ? and object_id = ?", repoName, objectID).First(&row).Error + if err == nil { + if row.Exist { + return indexing.ErrObjectAlreadyAdded + } + row.Version = ver + row.Exist = true + return tx.Save(&row).Error + } + + return tx.Create(&dbRepoEntry{ + Repo: repoName, + ObjectID: objectID, + Version: ver, + Exist: true, + }).Error + }) + + return +} + +func (db *DB) removeFromRepo(repoName string, objectID *astral.ObjectID) error { + return db.Transaction(func(tx *gorm.DB) (err error) { + var row dbRepoEntry + err = tx.Where("repo = ? and object_id = ?", repoName, objectID).First(&row).Error + switch { + case err != nil: + return err + case !row.Exist: + return errors.New("row already marked as removed") + } + + err = tx.Model(&dbRepoEntry{}). + Select("MAX(version)+1"). + Where("repo = ?", repoName, objectID). + First(&row.Version). + Error + if err != nil { + return err + } + + row.Exist = false + + err = tx.Save(&row).Error + + return err + }) +} + +func (db *DB) findExcessObjectIDs(repoName string, set []*astral.ObjectID) (excess []*astral.ObjectID, err error) { + err = db.DB. + Model(&dbRepoEntry{}). + Select("object_id"). + Where("repo = ? and object_id not in (?) and exist = true", repoName, set). + Find(&excess). + Error + + return +} + +func (db *DB) findMissingObjectIDs(repoName string, set []*astral.ObjectID) (missing []*astral.ObjectID, err error) { + // fetch existing + var existing []*astral.ObjectID + err = db.DB. + Model(&dbRepoEntry{}). + Where("repo = ? and object_id in (?) and exist = true", repoName, set). + Pluck("object_id", &existing). + Error + if err != nil { + return nil, err + } + + // prepare a lookup set + present := make(map[string]struct{}, len(existing)) + for _, id := range existing { + present[id.String()] = struct{}{} + } + + // find missing + for _, id := range set { + if _, ok := present[id.String()]; !ok { + missing = append(missing, id) + } + } + + return +} diff --git a/mod/indexing/src/db_repo_entry.go b/mod/indexing/src/db_repo_entry.go new file mode 100644 index 000000000..020e34ff3 --- /dev/null +++ b/mod/indexing/src/db_repo_entry.go @@ -0,0 +1,19 @@ +package indexing + +import ( + "time" + + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/mod/indexing" +) + +type dbRepoEntry struct { + Repo string `gorm:"index;primaryKey;uniqueIndex:idx_repo_version"` + ObjectID *astral.ObjectID `gorm:"index;primaryKey"` + Version int `gorm:"index;uniqueIndex:idx_repo_version"` + Exist bool `gorm:"index"` + CreatedAt time.Time + UpdatedAt time.Time +} + +func (dbRepoEntry) TableName() string { return indexing.DBPrefix + "repo_entries" } diff --git a/mod/indexing/src/deps.go b/mod/indexing/src/deps.go new file mode 100644 index 000000000..829139e6b --- /dev/null +++ b/mod/indexing/src/deps.go @@ -0,0 +1,23 @@ +package indexing + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/core" + "github.com/cryptopunkscc/astrald/mod/tree" +) + +func (mod *Module) LoadDependencies() (err error) { + ctx := astral.NewContext(nil) + + err = core.Inject(mod.node, &mod.Deps) + if err != nil { + return + } + + mod.repos, err = tree.Query(ctx, mod.Tree.Root(), "/mod/indexing/repos", true) + if err != nil { + return err + } + + return err +} diff --git a/mod/indexing/src/loader.go b/mod/indexing/src/loader.go new file mode 100644 index 000000000..6fd42307d --- /dev/null +++ b/mod/indexing/src/loader.go @@ -0,0 +1,43 @@ +package indexing + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/astral/log" + "github.com/cryptopunkscc/astrald/core" + "github.com/cryptopunkscc/astrald/core/assets" + "github.com/cryptopunkscc/astrald/mod/indexing" +) + +type Loader struct{} + +func (Loader) Load(node astral.Node, assets assets.Assets, log *log.Logger) (core.Module, error) { + var err error + var mod = &Module{ + node: node, + config: defaultConfig, + log: log, + assets: assets, + } + + _ = assets.LoadYAML(indexing.ModuleName, &mod.config) + + mod.ops.AddStruct(mod, "Op") + + mod.db, err = newDB(assets.Database()) + if err != nil { + return nil, err + } + + err = mod.db.autoMigrate() + if err != nil { + return nil, err + } + + return mod, err +} + +func init() { + if err := core.RegisterModule(indexing.ModuleName, Loader{}); err != nil { + panic(err) + } +} diff --git a/mod/indexing/src/module.go b/mod/indexing/src/module.go new file mode 100644 index 000000000..c6e48c862 --- /dev/null +++ b/mod/indexing/src/module.go @@ -0,0 +1,206 @@ +package indexing + +import ( + "context" + "errors" + "time" + + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/astral/log" + "github.com/cryptopunkscc/astrald/mod/indexing" + "github.com/cryptopunkscc/astrald/mod/objects" + "github.com/cryptopunkscc/astrald/mod/shell" + "github.com/cryptopunkscc/astrald/mod/tree" + "github.com/cryptopunkscc/astrald/resources" + "github.com/cryptopunkscc/astrald/sig" +) + +type Deps struct { + Objects objects.Module + Tree tree.Module +} + +type Module struct { + Deps + config Config + node astral.Node + log *log.Logger + assets resources.Resources + ops shell.Scope + db *DB + ctx *astral.Context + repos tree.Node + + mod sig.Map[string, context.CancelFunc] +} + +func (mod *Module) Run(ctx *astral.Context) error { + mod.ctx = ctx + + sub, err := mod.repos.Sub(ctx) + if err != nil { + return err + } + + for repoName := range sub { + err = mod.startRepoSync(repoName) + if err != nil { + mod.log.Logv(1, "error starting repo sync: %v", err) + } + } + + <-ctx.Done() + return nil +} + +func (mod *Module) Scope() *shell.Scope { + return &mod.ops +} + +func (mod *Module) String() string { + return indexing.ModuleName +} + +func (mod *Module) EnableRepo(ctx *astral.Context, repoName string) error { + _, err := mod.repos.Create(ctx, repoName) + if err != nil { + return err + } + + return mod.startRepoSync(repoName) +} + +func (mod *Module) DisableRepo(ctx *astral.Context, repoName string) error { + mod.stopRepoSync(repoName) + + sub, err := mod.repos.Sub(ctx) + if err != nil { + return err + } + + del, ok := sub[repoName] + if ok { + del.Delete(ctx) + } + + return nil +} + +func (mod *Module) startRepoSync(repoName string) error { + ctx, cancel := mod.ctx.WithCancel() + + _, ok := mod.mod.Set(repoName, cancel) + if !ok { + cancel() + return errors.New("repo already syncing") + } + + go func() { + mod.log.Logv(1, "following repo %v", repoName) + err := mod.syncRepo(ctx, repoName) + if err != nil { + mod.log.Logv(1, "error syncing repo %v: %v", repoName, err) + } else { + mod.log.Logv(1, "stopped following %v", repoName) + } + }() + + return nil +} + +func (mod *Module) stopRepoSync(repoName string) error { + cancel, ok := mod.mod.Delete(repoName) + if !ok { + return errors.New("repo not syncing") + } + cancel() + return nil +} + +func (mod *Module) syncRepo(ctx *astral.Context, repoName string) error { + repo := mod.Objects.GetRepository(repoName) + if repo == nil { + return errors.New("repository not found: " + repoName) + } + + scan, err := repo.Scan(ctx, true) + if err != nil { + return err + } + + // take a snapshot + var snapshot []*astral.ObjectID + + timeout := time.NewTimer(time.Second * 1) + defer timeout.Stop() + + for { + select { + case objectJD := <-scan: + if objectJD == nil { + goto snapshot + } + snapshot = append(snapshot, objectJD) + if !timeout.Stop() { + <-timeout.C + } + timeout.Reset(time.Second * 1) + case <-timeout.C: + goto snapshot + } + } + +snapshot: + + var removed, added int + + // remove objects from the index that are not in the snapshot + excess, err := mod.db.findExcessObjectIDs(repoName, snapshot) + if err != nil { + return err + } + + for _, objectID := range excess { + removed++ + err = mod.db.removeFromRepo(repoName, objectID) + if err != nil { + mod.log.Logv(1, "db error removing from repo: %v", err) + } + + // check context + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } + + // add objects from the snapshot that are not in the index + missing, err := mod.db.findMissingObjectIDs(repoName, snapshot) + for _, objectID := range missing { + added++ + err = mod.db.addToRepo(repoName, objectID) + if err != nil { + mod.log.Logv(1, "error adding %v to repo %v: %v", objectID, repoName, err) + } + + // check context + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } + + mod.log.Logv(1, "index synced with repo %v: %v removed, %v added.", repoName, removed, added) + + // follow updates from the repo until ctx is canceled + for objectID := range scan { + err = mod.db.addToRepo(repoName, objectID) + if err != nil { + mod.log.Logv(1, "db error adding to repo: %v", err) + } + } + + return nil +} diff --git a/mod/indexing/src/op_enable_repo.go b/mod/indexing/src/op_enable_repo.go new file mode 100644 index 000000000..e509ce964 --- /dev/null +++ b/mod/indexing/src/op_enable_repo.go @@ -0,0 +1,36 @@ +package indexing + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/astral/channel" + "github.com/cryptopunkscc/astrald/mod/shell" +) + +type opEnableRepoArgs struct { + Repo string + Disable bool `query:"optional"` + In string `query:"optional"` + Out string `query:"optional"` +} + +func (mod *Module) OpEnableRepo(ctx *astral.Context, q shell.Query, args opEnableRepoArgs) (err error) { + ch := channel.New(q.Accept(), channel.WithFormats(args.In, args.Out)) + defer ch.Close() + + repo := mod.Objects.GetRepository(args.Repo) + if repo == nil { + return ch.Send(astral.NewError("repository not found")) + } + + if args.Disable { + err = mod.DisableRepo(ctx, args.Repo) + } else { + err = mod.EnableRepo(ctx, args.Repo) + } + + if err != nil { + return ch.Send(astral.Err(err)) + } + + return ch.Send(&astral.Ack{}) +} From e903347f7caeaa0d4e0fb541201991f0a763b0eb Mon Sep 17 00:00:00 2001 From: arashi Date: Wed, 21 Jan 2026 14:09:28 +0100 Subject: [PATCH 2/2] wip --- mod/indexing/indexer.go | 11 +++++++++++ mod/indexing/src/deps.go | 5 +++++ mod/indexing/src/module.go | 30 ++++++++++++++++++++++-------- 3 files changed, 38 insertions(+), 8 deletions(-) create mode 100644 mod/indexing/indexer.go diff --git a/mod/indexing/indexer.go b/mod/indexing/indexer.go new file mode 100644 index 000000000..646e553dd --- /dev/null +++ b/mod/indexing/indexer.go @@ -0,0 +1,11 @@ +package indexing + +import ( + "github.com/cryptopunkscc/astrald/astral" + "github.com/cryptopunkscc/astrald/mod/objects" +) + +type Indexer interface { + Add(id *astral.ObjectID, repo objects.Repository) error + Remove(id *astral.ObjectID) error +} diff --git a/mod/indexing/src/deps.go b/mod/indexing/src/deps.go index 829139e6b..2c6ef45af 100644 --- a/mod/indexing/src/deps.go +++ b/mod/indexing/src/deps.go @@ -19,5 +19,10 @@ func (mod *Module) LoadDependencies() (err error) { return err } + mod.indexes, err = tree.Query(ctx, mod.Tree.Root(), "/mod/indexing/indexes", true) + if err != nil { + return err + } + return err } diff --git a/mod/indexing/src/module.go b/mod/indexing/src/module.go index c6e48c862..89b31c37b 100644 --- a/mod/indexing/src/module.go +++ b/mod/indexing/src/module.go @@ -22,14 +22,15 @@ type Deps struct { type Module struct { Deps - config Config - node astral.Node - log *log.Logger - assets resources.Resources - ops shell.Scope - db *DB - ctx *astral.Context - repos tree.Node + config Config + node astral.Node + log *log.Logger + assets resources.Resources + ops shell.Scope + db *DB + ctx *astral.Context + repos tree.Node + indexes tree.Node mod sig.Map[string, context.CancelFunc] } @@ -86,6 +87,19 @@ func (mod *Module) DisableRepo(ctx *astral.Context, repoName string) error { return nil } +func (mod *Module) CreateIndex(ctx *astral.Context, indexName string) error { + indexNode, err := mod.indexes.Create(ctx, indexName) + if err != nil { + return err + } + + var height = astral.Uint64(0) + + indexNode.Set(ctx, &height) + + return nil +} + func (mod *Module) startRepoSync(repoName string) error { ctx, cancel := mod.ctx.WithCancel()