Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 60 additions & 4 deletions commonspace/object/tree/objecttree/objecttree.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type ReadableObjectTree interface {
Debug(parser DescriptionParser) (DebugInfo, error)
IterateRoot(convert ChangeConvertFunc, iterate ChangeIterateFunc) error
IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) error
IterateAfterAddSeq(ctx context.Context, addSeq uint64, convert ChangeConvertFunc, iterate ChangeIterateFunc) error
}

type ObjectTree interface {
Expand All @@ -106,6 +107,7 @@ type ObjectTree interface {
Delete() error
Close() error
SetFlusher(flusher Flusher)
SetDeferredUpdater(deferred bool)
TryClose(objectTTL time.Duration) (bool, error)
}

Expand All @@ -122,9 +124,10 @@ type objectTree struct {
root *Change
tree *Tree

keys map[string]crypto.SymKey
currentReadKey crypto.SymKey
isDeleted bool
keys map[string]crypto.SymKey
currentReadKey crypto.SymKey
isDeleted bool
deferredUpdater bool

// buffers
difSnapshotBuf []*treechangeproto.RawTreeChangeWithId
Expand Down Expand Up @@ -204,6 +207,10 @@ func (ot *objectTree) SetFlusher(flusher Flusher) {
ot.flusher = flusher
}

func (ot *objectTree) SetDeferredUpdater(deferred bool) {
ot.deferredUpdater = deferred
}

func (ot *objectTree) UnmarshalledHeader() *Change {
return ot.root
}
Expand Down Expand Up @@ -412,7 +419,7 @@ func (ot *objectTree) AddRawChangesWithUpdater(ctx context.Context, changes RawC
}
}

if updater != nil {
if updater != nil && !ot.deferredUpdater {
err = updater(ot, addResult.Mode)
if err != nil {
rollback()
Expand All @@ -425,6 +432,13 @@ func (ot *objectTree) AddRawChangesWithUpdater(ctx context.Context, changes RawC
rollback()
return
}

if updater != nil && ot.deferredUpdater {
err = updater(ot, addResult.Mode)
if err != nil {
return
}
}
ot.flusher.Flush(ot)
return
}
Expand Down Expand Up @@ -701,6 +715,48 @@ func (ot *objectTree) IterateFrom(id string, convert ChangeConvertFunc, iterate
return
}

func (ot *objectTree) IterateAfterAddSeq(ctx context.Context, addSeq uint64, convert ChangeConvertFunc, iterate ChangeIterateFunc) (err error) {
if ot.isDeleted {
return ErrDeleted
}
var buf []byte
return ot.storage.GetAfterAddSeq(ctx, addSeq, func(ctx context.Context, sc StorageChange) (bool, error) {
raw := sc.RawTreeChangeWithId()
ch, uErr := ot.changeBuilder.Unmarshall(raw, false)
if uErr != nil {
return false, uErr
}
ch.OrderId = sc.OrderId
ch.SnapshotCounter = sc.SnapshotCounter
ch.AddSeq = sc.AddSeq

if convert != nil && ch.Id != ot.id {
if ch.ReadKeyId != "" {
readKey, exists := ot.keys[ch.ReadKeyId]
if !exists {
return false, list.ErrNoReadKey
}
if ch.Data == nil {
return false, fmt.Errorf("no data in change %s", ch.Id)
}
buf, uErr = readKey.DecryptReuse(buf, ch.Data)
if uErr != nil {
return false, uErr
}
} else {
buf = ch.Data
}
model, cErr := convert(ch, buf)
if cErr != nil {
return false, cErr
}
ch.Model = model
ch.Data = nil
}
return iterate(ch), nil
})
}

func (ot *objectTree) HasChanges(chs ...string) bool {
if ot.isDeleted {
return false
Expand Down
17 changes: 15 additions & 2 deletions commonspace/object/tree/objecttree/objecttreefactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type HistoryTreeParams struct {
AclList list.AclList
Heads []string
IncludeBeforeId bool
BuildEmptyData bool
}

type objectTreeDeps struct {
Expand Down Expand Up @@ -213,7 +214,13 @@ func BuildNonVerifiableHistoryTree(params HistoryTreeParams) (HistoryTree, error
}
root := rootChange.RawTreeChangeWithId()
// Use real key storage to preserve actual identities, but skip verification
changeBuilder := &nonVerifiableChangeBuilder{NewChangeBuilder(crypto.NewKeyStorage(), root)}
var cb ChangeBuilder
if params.BuildEmptyData {
cb = NewEmptyDataChangeBuilder(crypto.NewKeyStorage(), root)
} else {
cb = NewChangeBuilder(crypto.NewKeyStorage(), root)
}
changeBuilder := &nonVerifiableChangeBuilder{cb}
treeBuilder := newTreeBuilder(params.Storage, changeBuilder)
deps := objectTreeDeps{
changeBuilder: changeBuilder,
Expand All @@ -231,7 +238,13 @@ func BuildHistoryTree(params HistoryTreeParams) (HistoryTree, error) {
if err != nil {
return nil, err
}
deps := defaultObjectTreeDeps(rootChange.RawTreeChangeWithId(), params.Storage, params.AclList)
root := rootChange.RawTreeChangeWithId()
var deps objectTreeDeps
if params.BuildEmptyData {
deps = emptyDataTreeDeps(root, params.Storage, params.AclList)
} else {
deps = defaultObjectTreeDeps(root, params.Storage, params.AclList)
}
return buildHistoryTree(deps, params)
}

Expand Down
5 changes: 5 additions & 0 deletions commonspace/object/tree/objecttree/treemigrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

anystore "github.com/anyproto/any-store"

Expand Down Expand Up @@ -85,6 +86,10 @@ func (tm *TreeMigrator) MigrateTreeStorage(ctx context.Context, storage treeStor
return fmt.Errorf("migration: failed to start old storage: %w", err)
}
}
// Set up AddSeq counter so storage.AddAll can assign sequence numbers
if setter, ok := newStorage.(interface{ SetAddSeq(seq *atomic.Uint64) }); ok {
setter.SetAddSeq(&atomic.Uint64{})
}
objTree, err := BuildMigratableObjectTree(newStorage, tm.aclList)
if err != nil {
return fmt.Errorf("migration: failed to build object tree: %w", err)
Expand Down
26 changes: 26 additions & 0 deletions commonspace/object/tree/synctree/mock_synctree/mock_synctree.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions commonspace/object/tree/synctree/synctree.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type HeadNotifiable interface {

type ListenerSetter interface {
SetListener(listener updatelistener.UpdateListener)
SetDeferredUpdater(deferred bool)
}

type peerSendableObjectTree interface {
Expand Down Expand Up @@ -177,6 +178,17 @@ func (s *syncTree) IterateRoot(convert objecttree.ChangeConvertFunc, iterate obj
return s.ObjectTree.IterateRoot(convert, iterate)
}

func (s *syncTree) IterateAfterAddSeq(ctx context.Context, addSeq uint64, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) {
if err = s.checkAlive(); err != nil {
return
}
return s.ObjectTree.IterateAfterAddSeq(ctx, addSeq, convert, iterate)
}

func (s *syncTree) SetDeferredUpdater(deferred bool) {
s.ObjectTree.SetDeferredUpdater(deferred)
}

func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableChangeContent) (res objecttree.AddResult, err error) {
return s.AddContentWithValidator(ctx, content, nil)
}
Expand Down
6 changes: 4 additions & 2 deletions commonspace/objecttreebuilder/treebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ var log = logger.NewNamed(CName)
var ErrSpaceClosed = errors.New("space is closed")

type HistoryTreeOpts struct {
Heads []string
Include bool
Heads []string
Include bool
BuildEmptyData bool
}

type TreeBuilder interface {
Expand Down Expand Up @@ -176,6 +177,7 @@ func (t *treeBuilder) BuildHistoryTree(ctx context.Context, id string, opts Hist
AclList: t.aclList,
Heads: opts.Heads,
IncludeBeforeId: opts.Include,
BuildEmptyData: opts.BuildEmptyData,
}
params.Storage, err = t.spaceStorage.TreeStorage(ctx, id)
if err != nil {
Expand Down
Loading