-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathincremental.go
More file actions
122 lines (99 loc) · 3 KB
/
incremental.go
File metadata and controls
122 lines (99 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package feedx
import (
"context"
"errors"
"github.com/bsm/bfs"
)
// IncrementalProduceFunc returns a ProduceFunc closure around an incremental version.
// Produce calls it with the version read from the remote manifest and then
// runs the returned ProduceFunc against the writer of the new data file.
type IncrementalProduceFunc func(remoteVersion int64) ProduceFunc
// IncrementalProducer pushes incremental feeds to a remote bucket location.
type IncrementalProducer struct {
	// bucket is the destination in which new data file objects are created.
	bucket bfs.Bucket
	// object is the handle for the "manifest.json" object in bucket.
	object *bfs.Object
	// ownBucket is set by NewIncrementalProducer, which connects the bucket
	// itself; when true, Close also closes the bucket.
	ownBucket bool
}
// NewIncrementalProducer connects to the bucket at bucketURL and inits a new
// incremental feed producer on top of it. The producer is marked as owner of
// the bucket, so Close also closes the bucket. A connection failure is
// returned unchanged.
func NewIncrementalProducer(ctx context.Context, bucketURL string) (*IncrementalProducer, error) {
	bkt, err := bfs.Connect(ctx, bucketURL)
	if err != nil {
		return nil, err
	}

	producer := NewIncrementalProducerForBucket(bkt)
	// the bucket was opened here, so the producer is responsible for closing it
	producer.ownBucket = true
	return producer, nil
}
// NewIncrementalProducerForBucket inits a new incremental feed producer for an
// existing bucket. The manifest is kept in the bucket's "manifest.json"
// object. The producer is not marked as owner of the bucket, so its Close
// method does not call bucket.Close.
func NewIncrementalProducerForBucket(bucket bfs.Bucket) *IncrementalProducer {
	mftObj := bfs.NewObjectFromBucket(bucket, "manifest.json")
	return &IncrementalProducer{bucket: bucket, object: mftObj}
}
// Close stops the producer. It closes the manifest object and, if the
// producer owns its bucket, the bucket too. Failures from either step are
// combined with errors.Join and returned.
func (p *IncrementalProducer) Close() (err error) {
	if closeErr := p.object.Close(); closeErr != nil {
		err = errors.Join(err, closeErr)
	}

	// nothing more to do unless we own a bucket that is still open
	if !p.ownBucket || p.bucket == nil {
		return err
	}

	if closeErr := p.bucket.Close(); closeErr != nil {
		err = errors.Join(err, closeErr)
	}
	// clear the handle so the bucket is closed at most once
	p.bucket = nil
	return err
}
// Produce pushes an incremental data file for version to the remote bucket.
//
// It loads the remote manifest and passes version and the manifest's version
// to skipSync; if that reports true, the returned status has Skipped set and
// nothing is written. Otherwise pfn is called with the remote version, the
// ProduceFunc it returns writes a new data file, and the updated manifest is
// committed afterwards. NumItems of the returned status is the number of
// items reported by the data file writer.
//
// opt may be nil. It is copied before the version is applied, so the options
// passed by the caller are never modified. On any failure a nil status is
// returned together with the error.
func (p *IncrementalProducer) Produce(ctx context.Context, version int64, opt *WriterOptions, pfn IncrementalProduceFunc) (*Status, error) {
	// fetch manifest from remote object
	mft, err := loadManifest(ctx, p.object)
	if err != nil {
		return nil, err
	}

	remoteVersion := mft.Version
	status := &Status{LocalVersion: version, RemoteVersion: remoteVersion}

	// skip if skipSync decides there is nothing to push
	if skipSync(version, remoteVersion) {
		status.Skipped = true
		return status, nil
	}

	// set version for writer on a private copy, leaving the caller's options untouched
	var wopt WriterOptions
	if opt != nil {
		wopt = *opt
	}
	wopt.Version = version

	// write data modified since last version
	numWritten, err := p.writeDataFile(ctx, mft, version, remoteVersion, &wopt, pfn)
	if err != nil {
		return nil, err
	}

	// write new manifest to remote
	if err := p.commitManifest(ctx, mft, &WriterOptions{Version: version}); err != nil {
		return nil, err
	}

	status.NumItems = numWritten
	return status, nil
}
// writeDataFile writes a new data file to the bucket and, once the file is
// committed, records it in mft.
//
// The file name is obtained from mft.newDataFileName(opt). pfn is called with
// remoteVersion and the ProduceFunc it returns is run against the writer of
// the new file. mft is only modified after a successful commit: the file name
// is appended to mft.Files and mft.Version is set to version. The result is
// the writer's NumWritten count.
func (p *IncrementalProducer) writeDataFile(ctx context.Context, mft *manifest, version, remoteVersion int64, opt *WriterOptions, pfn IncrementalProduceFunc) (int64, error) {
	name := mft.newDataFileName(opt)

	target := bfs.NewObjectFromBucket(p.bucket, name)
	defer target.Close()

	w := NewWriter(ctx, target, opt)
	// runs on every exit path, including after a successful Commit
	defer w.Discard()

	produce := pfn(remoteVersion)
	if err := produce(w); err != nil {
		return 0, err
	}
	if err := w.Commit(); err != nil {
		return 0, err
	}

	// register the committed file with the manifest
	mft.Files = append(mft.Files, name)
	mft.Version = version
	return w.NumWritten(), nil
}
// commitManifest encodes mft through a writer on the producer's manifest
// object and commits it. The first error from Encode or Commit is returned;
// Commit is not attempted when Encode fails.
func (p *IncrementalProducer) commitManifest(ctx context.Context, mft *manifest, opt *WriterOptions) error {
	w := NewWriter(ctx, p.object, opt)
	// runs on every exit path, including after a successful Commit
	defer w.Discard()

	err := w.Encode(mft)
	if err == nil {
		err = w.Commit()
	}
	return err
}