-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjson2msgpackStreamer.go
More file actions
78 lines (68 loc) · 1.48 KB
/
json2msgpackStreamer.go
File metadata and controls
78 lines (68 loc) · 1.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/*
* Copyright (c) 2022, arivum.
* All rights reserved.
* SPDX-License-Identifier: MIT
* For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/MIT
*/
package json2msgpackStreamer
import (
"bufio"
"io"
)
func NewJSON2MsgPackStreamer(r io.Reader) *JSON2MsgPackStreamer {
var (
pipeR, pipeW = io.Pipe()
t = &JSON2MsgPackStreamer{
r: bufio.NewReader(r),
underlayingReader: r,
pipeW: pipeW,
pipeR: pipeR,
buf: newBlockBuf(),
}
)
go t.convert()
return t
}
func (t *JSON2MsgPackStreamer) Read(p []byte) (int, error) {
return t.pipeR.Read(p)
}
func (t *JSON2MsgPackStreamer) GetLastError() error {
return t.lastError
}
func (t *JSON2MsgPackStreamer) convert() {
defer t.pipeW.Close()
for {
if t.nextByte, t.lastError = t.r.ReadByte(); t.lastError != nil {
break
}
switch t.nextByte {
case ' ', '\t', '\r':
// do nothing
case '{':
if t.lastError = t.handleStruct(); t.lastError != nil {
return
}
case '[':
if t.lastError = t.handleArray(); t.lastError != nil {
return
}
case '"':
if t.lastError = t.handleString(); t.lastError != nil {
return
}
case '\n':
t.buf.flushToWriter(t.pipeW)
t.buf = newBlockBuf()
t.buf.reset()
default:
t.r.UnreadByte()
if t.lastError = t.handleAtomic(); t.lastError != nil {
return
}
}
}
if t.nextByte != '\n' {
t.buf.flushToWriter(t.pipeW)
}
t.buf.reset()
}