Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 70 additions & 6 deletions transport/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,14 +389,78 @@ type markedReader struct {
}

func (r *markedReader) Read(p []byte) (int, error) {
for i := 0; i < len(p); i++ {
b, err := r.ReadByte()
if err != nil {
return i, err
if r.r == nil {
return 0, ErrInvalidIO
}
if r.eof {
return 0, io.EOF
}

var written int

// get the first byte of the delimiter to match initially
delimStart := endOfMsg[0]

for written < len(p) {
// remaining space in p
remain := len(p) - written

peeked, err := r.r.Peek(remain)
if errors.Is(err, bufio.ErrBufferFull) {
err = nil // wipe the error. We will work with the full buffer.
}

if len(peeked) == 0 && err != nil {
if errors.Is(err, io.EOF) {
return written, io.ErrUnexpectedEOF
}
return written, err
}

// look for that start of the delimiter
idx := bytes.IndexByte(peeked, delimStart)
switch {
// no delimiter found, copy all we peeked
case idx == -1:
n := copy(p[written:], peeked)
if _, err := r.r.Discard(n); err != nil {
return written, err
}
written += n
// Found ']' somewhere in the peeked data
case idx > 0:
n, err := r.r.Read(p[written : written+idx])
written += n
if err != nil {
return written, err
}
// We are currently at ']' check for full delimiter
default:
marker, err := r.r.Peek(len(endOfMsg))
if err != nil {
if errors.Is(err, io.EOF) {
return written, io.ErrUnexpectedEOF
}
return written, err
}

// If we found the end-of-message marker, return EOF
if bytes.Equal(marker, endOfMsg) {
// Discard the end-of-message marker
if _, err := r.r.Discard(len(endOfMsg)); err != nil {
return written, err
}
r.eof = true
return written, io.EOF
}

// Add the written byte and continue
b, _ := r.r.ReadByte()
p[written] = b
written++
}
p[i] = b
}
return len(p), nil
return written, nil
}

func (r *markedReader) ReadByte() (byte, error) {
Expand Down
Loading