-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdataexchange.go
More file actions
153 lines (135 loc) · 4.18 KB
/
dataexchange.go
File metadata and controls
153 lines (135 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// SPDX-License-Identifier: AGPL-3.0-or-later
package dataexchange
import (
"encoding/binary"
"fmt"
"io"
"path/filepath"
"strings"
)
// Frame types for data exchange on port 1001.
//
// The numeric value is what WriteFrame places in the 4-byte type field of the
// frame header; TypeName maps each value to a printable label.
const (
// TypeText tags a text payload. This package does not inspect or validate
// the payload bytes.
TypeText uint32 = 1
// TypeBinary tags an opaque binary payload.
TypeBinary uint32 = 2
// TypeJSON tags a JSON payload. This package does not parse or validate
// the JSON.
TypeJSON uint32 = 3
// TypeFile tags a file transfer. On the wire the payload is prefixed with
// a 2-byte name length and the file name (see Frame).
TypeFile uint32 = 4
// TypeTrace wraps another frame type with nanosecond-precision timing.
// Wire layout: [4-byte TypeTrace][4-byte len][4-byte inner_type][8-byte sent_at_ns][inner_payload]
TypeTrace uint32 = 5
)
// TraceFrame carries timing metadata around an inner message frame.
//
// It is the decoded form of a TypeTrace frame's payload, which is laid out as
// [4-byte inner type][8-byte sent-at nanoseconds][inner payload] with
// big-endian integers.
type TraceFrame struct {
// SentAtNs is the send timestamp in nanoseconds. This package only
// transports the value; the clock and epoch are defined by the caller.
SentAtNs int64
// InnerType is the frame type of the wrapped message.
InnerType uint32
// Payload holds the wrapped message bytes that follow the 12-byte trace
// header.
Payload []byte
}
// WriteTraceFrame encodes tf and writes it to w as a single TypeTrace frame.
//
// The outer frame's payload is the 12-byte trace header (4-byte big-endian
// inner type followed by the 8-byte big-endian send timestamp) and then the
// inner payload. The write itself is delegated to WriteFrame, whose error is
// returned unchanged.
func WriteTraceFrame(w io.Writer, tf *TraceFrame) error {
	const traceHeaderLen = 12

	// Allocate the full capacity up front so appending the inner payload
	// does not trigger a second allocation.
	body := make([]byte, traceHeaderLen, traceHeaderLen+len(tf.Payload))
	binary.BigEndian.PutUint32(body[:4], tf.InnerType)
	binary.BigEndian.PutUint64(body[4:traceHeaderLen], uint64(tf.SentAtNs))
	body = append(body, tf.Payload...)

	outer := Frame{Type: TypeTrace, Payload: body}
	return WriteFrame(w, &outer)
}
// ReadTracePayload interprets the payload of a TypeTrace frame as a
// TraceFrame.
//
// It returns an error when f is not a TypeTrace frame or when its payload is
// shorter than the 12-byte trace header. The returned TraceFrame's Payload is
// a sub-slice of f.Payload (no copy is made), so the two share memory.
func ReadTracePayload(f *Frame) (*TraceFrame, error) {
	const traceHeaderLen = 12

	raw := f.Payload
	switch {
	case f.Type != TypeTrace:
		return nil, fmt.Errorf("expected TypeTrace, got %d", f.Type)
	case len(raw) < traceHeaderLen:
		return nil, fmt.Errorf("trace payload too short: %d bytes", len(raw))
	}

	tf := new(TraceFrame)
	tf.InnerType = binary.BigEndian.Uint32(raw[:4])
	tf.SentAtNs = int64(binary.BigEndian.Uint64(raw[4:traceHeaderLen]))
	tf.Payload = raw[traceHeaderLen:]
	return tf, nil
}
// maxFilenameLen limits filename length to prevent abuse. ReadFrame rejects a
// TypeFile frame whose declared name length (in bytes) exceeds this value.
const maxFilenameLen = 255
// MaxFrameSize caps a single data-exchange frame at 256 MiB. Sized to fit
// the test fleet's 100 MiB file payloads with margin while still rejecting
// pathological 500 MiB+ frames that would dominate memory.
//
// ReadFrame compares the length field of the frame header against this value
// before allocating the payload buffer and returns an error when the declared
// length is larger.
const MaxFrameSize = 1 << 28
// Frame is a typed data unit exchanged between agents.
// Wire format: [4-byte type][4-byte length][payload]
// For TypeFile, payload is: [2-byte name length][name bytes][file data]
//
// All header integers are big-endian. In memory, a TypeFile frame keeps the
// name in Filename and only the file data in Payload; WriteFrame adds the
// name prefix when encoding and ReadFrame strips it when decoding.
type Frame struct {
// Type is one of the Type* constants; it is written verbatim into the
// 4-byte type field.
Type uint32
// Payload is the frame body (for TypeFile, the file data without the
// name prefix).
Payload []byte
Filename string // only for TypeFile
}
// WriteFrame serialises f to w.
//
// Wire format: [4-byte type][4-byte length][payload], integers big-endian.
// For TypeFile frames the payload written to the wire is
// [2-byte name length][name bytes][file data], assembled from f.Filename and
// f.Payload. For every other type f.Payload is written unchanged and
// f.Filename is ignored.
//
// WriteFrame returns an error, before writing anything, when the file name
// does not fit the 2-byte name-length field or the encoded payload does not
// fit the 4-byte length field; previously such values were silently
// truncated, producing a frame whose length prefixes disagreed with the bytes
// that followed. Otherwise it returns the first error reported by w.Write.
//
// NOTE(review): ReadFrame enforces tighter limits (maxFilenameLen and
// MaxFrameSize), so a frame accepted here may still be rejected by the peer.
func WriteFrame(w io.Writer, f *Frame) error {
	const (
		maxNameField = 1<<16 - 1 // largest value the 2-byte name length can hold
		maxLenField  = 1<<32 - 1 // largest value the 4-byte payload length can hold
	)

	payload := f.Payload
	if f.Type == TypeFile {
		name := f.Filename
		if len(name) > maxNameField {
			return fmt.Errorf("filename too long: %d bytes (max %d)", len(name), maxNameField)
		}
		// Prepend the length-prefixed filename to the file data.
		payload = make([]byte, 2+len(name)+len(f.Payload))
		binary.BigEndian.PutUint16(payload[0:2], uint16(len(name)))
		copy(payload[2:], name)
		copy(payload[2+len(name):], f.Payload)
	}

	// Compare as uint64 so the check also compiles where int is 32 bits.
	if uint64(len(payload)) > maxLenField {
		return fmt.Errorf("frame too large: %d bytes (max %d)", len(payload), uint64(maxLenField))
	}

	var hdr [8]byte
	binary.BigEndian.PutUint32(hdr[0:4], f.Type)
	binary.BigEndian.PutUint32(hdr[4:8], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}
// ReadFrame reads one frame from r.
//
// It reads the 8-byte header ([4-byte type][4-byte length], big-endian),
// rejects frames whose declared length exceeds MaxFrameSize, and then reads
// exactly that many payload bytes.
//
// Errors:
//   - An error from reading the header is returned as-is, so io.EOF means the
//     stream ended cleanly on a frame boundary.
//   - If the stream ends after the header but before the full payload has
//     arrived, io.ErrUnexpectedEOF is returned. io.ReadFull reports bare
//     io.EOF when zero payload bytes were read; that is translated here so a
//     truncated frame cannot be mistaken for a clean end of stream.
//   - For TypeFile frames with a well-formed name prefix, an error is returned
//     when the name is longer than maxFilenameLen, contains '/' or '\\', or is
//     "." or "..".
//
// For TypeFile frames the name prefix is stripped: Filename receives the name
// and Payload the remaining file data (a sub-slice of the read buffer).
//
// NOTE(review): a TypeFile frame whose payload is shorter than 2 bytes, or
// whose declared name length runs past the end of the payload, is returned
// unmodified (empty Filename, raw Payload) rather than rejected. This
// pre-existing leniency is preserved; confirm whether callers rely on it.
func ReadFrame(r io.Reader) (*Frame, error) {
	var hdr [8]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	ftype := binary.BigEndian.Uint32(hdr[0:4])
	length := binary.BigEndian.Uint32(hdr[4:8])
	if length > MaxFrameSize {
		return nil, fmt.Errorf("frame too large: %d (max %d)", length, MaxFrameSize)
	}

	payload := make([]byte, length)
	if _, err := io.ReadFull(r, payload); err != nil {
		if err == io.EOF {
			// The header promised payload bytes that never arrived.
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}

	f := &Frame{Type: ftype, Payload: payload}
	if ftype != TypeFile || len(payload) < 2 {
		return f, nil
	}

	nameLen := int(binary.BigEndian.Uint16(payload[0:2]))
	if 2+nameLen > len(payload) {
		// Declared name does not fit in the payload; hand the frame back
		// untouched (see NOTE above).
		return f, nil
	}
	if nameLen > maxFilenameLen {
		return nil, fmt.Errorf("filename too long: %d bytes (max %d)", nameLen, maxFilenameLen)
	}

	name := string(payload[2 : 2+nameLen])
	if strings.ContainsAny(name, "/\\") {
		return nil, fmt.Errorf("invalid filename: path traversal characters not allowed")
	}
	if name != "" {
		f.Filename = filepath.Base(name)
		if f.Filename == "." || f.Filename == ".." {
			return nil, fmt.Errorf("invalid filename: path traversal name %q not allowed", f.Filename)
		}
	}
	f.Payload = payload[2+nameLen:]
	return f, nil
}
// TypeName maps a frame type value to a printable label.
//
// Known types yield "TEXT", "BINARY", "JSON", "FILE" or "TRACE"; any other
// value yields "UNKNOWN(<n>)" where <n> is the decimal type value.
func TypeName(t uint32) string {
	var label string
	switch t {
	case TypeText:
		label = "TEXT"
	case TypeBinary:
		label = "BINARY"
	case TypeJSON:
		label = "JSON"
	case TypeFile:
		label = "FILE"
	case TypeTrace:
		label = "TRACE"
	}
	if label == "" {
		// No case matched: fall back to the numeric form.
		return fmt.Sprintf("UNKNOWN(%d)", t)
	}
	return label
}