-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsegment_reader.go
More file actions
130 lines (113 loc) · 3.34 KB
/
segment_reader.go
File metadata and controls
130 lines (113 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package wal
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
)
// SegmentReader reads WAL entries from their binary representation, typically
// from disk. It is used by the WAL to automatically resume the last open
// segment upon startup, but it can also be used to manually iterate through
// WAL segments.
//
// The complete usage pattern looks like this:
//
//	r, err := NewSegmentReader(…)
//	…
//
//	for r.ReadNext() {
//		offset := r.Offset()
//		…
//		entry, err := r.Decode()
//		…
//	}
//
//	if err := r.Err(); err != nil {
//		…
//	}
type SegmentReader struct {
	// r is the buffered source the encoded entries are read from.
	r *bufio.Reader

	// Header fields of the entry most recently read by ReadNext.
	offset   uint32
	typ      EntryType
	checksum uint32 // compared against crc32.ChecksumIEEE(payload) in Decode

	// entry is the Entry implementation created by registry for typ, and
	// payload holds the raw bytes returned by its ReadPayload method.
	entry   Entry
	payload []byte

	// err is the error recorded by ReadNext; it is returned by Decode and Err.
	err error

	// registry maps entry types to their Entry implementations.
	registry *EntryRegistry
}
// NewSegmentReader returns a SegmentReader that reads encoded WAL entries from
// r, which it wraps in a bufio.Reader. The registry is consulted to map every
// entry type read from the stream to the Entry implementation that knows how
// to decode it.
//
// The returned error is currently always nil.
func NewSegmentReader(r io.Reader, registry *EntryRegistry) (*SegmentReader, error) {
	sr := new(SegmentReader)
	sr.r = bufio.NewReader(r)
	sr.registry = registry
	return sr, nil
}
// SeekEnd consumes every remaining entry of the segment via ReadNext and
// returns the offset of the last entry that was read (0 if none was read),
// together with whatever SegmentReader.Err() reports afterwards.
func (r *SegmentReader) SeekEnd() (lastOffset uint32, err error) {
	for more := r.ReadNext(); more; more = r.ReadNext() {
		lastOffset = r.Offset()
	}
	if err = r.Err(); err != nil {
		return lastOffset, err
	}
	return lastOffset, nil
}
// ReadNext loads the data for the next Entry from the underlying reader and
// reports whether an entry header was read.
//
// For efficiency reasons, this function neither verifies the entry checksum
// nor decodes the entry bytes. This lets the caller quickly seek through a WAL
// up to a specific offset without having to decode every WAL entry.
//
// You can get the offset of the current entry using SegmentReader.Offset().
// In order to actually decode the read WAL entry, you need to use
// SegmentReader.Decode(…).
//
// ReadNext returns false when the segment ends cleanly on an entry boundary,
// or when reading the header or creating the entry fails; in the latter case
// the error is available via SegmentReader.Err(). If the header was read but
// reading the payload fails, ReadNext still returns true so that the error is
// reported by SegmentReader.Decode(…).
//
// Errors are sticky: once an error has been recorded, every later call returns
// false without touching the underlying reader. This avoids reading from a
// stream left in an unknown position and ensures that a later successful
// payload read cannot overwrite (and thereby hide) the first error.
func (r *SegmentReader) ReadNext() bool {
	if r.err != nil {
		return false
	}

	// Header layout: 4B offset + 1B type + 4B checksum, integers big-endian.
	var header [9]byte

	// io.ReadFull returns io.EOF only if no bytes were read (clean end of the
	// segment) and io.ErrUnexpectedEOF if the header is truncated. On a nil
	// error the whole header has been filled, so no length check is needed.
	_, err := io.ReadFull(r.r, header[:])
	if err == io.EOF {
		return false
	}
	if err != nil {
		r.err = err
		return false
	}

	r.offset = binary.BigEndian.Uint32(header[:4])
	r.typ = EntryType(header[4])
	r.checksum = binary.BigEndian.Uint32(header[5:9])

	r.entry, err = r.registry.New(r.typ)
	if err != nil {
		r.err = err
		return false
	}

	// A payload read error is kept in r.err and surfaced by Decode and Err.
	r.payload, r.err = r.entry.ReadPayload(r.r)
	return true
}
// Offset reports the WAL offset stored in the header of the entry most
// recently loaded by SegmentReader.ReadNext().
func (r *SegmentReader) Offset() uint32 { return r.offset }
// Decode verifies and decodes the entry most recently loaded by
// SegmentReader.ReadNext().
//
// It fails with the error recorded by ReadNext, if any; with an error if no
// entry has been loaded yet; or with a corruption error if the CRC-32 (IEEE)
// checksum of the payload does not match the checksum from the entry header.
// Otherwise it decodes the payload into the entry and returns that entry
// together with any error reported by the entry's DecodePayload method.
func (r *SegmentReader) Decode() (Entry, error) {
	switch {
	case r.err != nil:
		return nil, r.err
	case r.entry == nil:
		return nil, errors.New("must call SegmentReader.ReadNext() first")
	case crc32.ChecksumIEEE(r.payload) != r.checksum:
		return nil, fmt.Errorf("detected WAL Entry corruption at WAL offset %d", r.offset)
	}

	if decodeErr := r.entry.DecodePayload(r.payload); decodeErr != nil {
		return r.entry, decodeErr
	}
	return r.entry, nil
}
// Err reports the error, if any, that was recorded while calling ReadNext().
// It must always be checked once iteration stops, even if ReadNext() never
// returned true.
//
// See the documentation of the SegmentReader type for the full usage pattern.
func (r *SegmentReader) Err() error { return r.err }