-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream_engine_bridge.go
More file actions
173 lines (157 loc) · 4.87 KB
/
Copy pathstream_engine_bridge.go
File metadata and controls
173 lines (157 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// bridgeEngine is the cluster-engine realization of the StreamEngine seam shared
// by every external streaming runtime (Flink, Kafka Streams, …): it runs each
// continuous query as a job in that runtime and bridges the job's output back
// into the control plane's SSE delivery. The control plane is unchanged —
// register, lifecycle and SSE are identical to the in-process meos-local engine;
// only the per-record execution moves to the runtime, where MEOS runs as a JMEOS
// UDF (no SQL).
//
// The engine is pure Go (it spawns the bridge job and pipes through it), so it
// builds without cgo. The job command is configured by <RUNTIME>_CMD (the command
// prefix; the operation and its scalar argument are appended) and <RUNTIME>_LIBPATH
// (the libmeos path passed as LD_LIBRARY_PATH): MFAPI_FLINK_CMD/MFAPI_FLINK_LIBPATH
// for Flink (MFAPI_STREAM_ENGINE=flink), MFAPI_KAFKA_CMD/MFAPI_KAFKA_LIBPATH for
// Kafka Streams (MFAPI_STREAM_ENGINE=kafka).
//
// One float per line goes to the job's stdin; one transformed float per line
// comes back on stdout, in order (the job runs at parallelism 1), so each output
// is paired with its source instant's timestamp through a FIFO.
package main
import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"sync"
)
// newBridgeEngine builds a bridge-job engine called name. The job command
// prefix is read from the environment variable named cmdEnv and split on
// whitespace; the library path is read from the variable named libEnv and may be
// empty. It returns an error when cmdEnv is unset or contains only whitespace.
func newBridgeEngine(name, cmdEnv, libEnv string) (StreamEngine, error) {
	argv := strings.Fields(os.Getenv(cmdEnv))
	if len(argv) == 0 {
		return nil, fmt.Errorf("%s engine requires %s (the bridge-job command prefix)", name, cmdEnv)
	}
	engine := &bridgeEngine{
		name:    name,
		argv:    argv,
		libPath: os.Getenv(libEnv),
	}
	return engine, nil
}
// newFlinkEngine returns the bridge engine named "flink", configured from
// MFAPI_FLINK_CMD and MFAPI_FLINK_LIBPATH.
func newFlinkEngine() (StreamEngine, error) {
	const (
		engineName = "flink"
		cmdVar     = "MFAPI_FLINK_CMD"
		libVar     = "MFAPI_FLINK_LIBPATH"
	)
	return newBridgeEngine(engineName, cmdVar, libVar)
}
// newKafkaEngine returns the bridge engine named "kafka", configured from
// MFAPI_KAFKA_CMD and MFAPI_KAFKA_LIBPATH.
func newKafkaEngine() (StreamEngine, error) {
	const (
		engineName = "kafka"
		cmdVar     = "MFAPI_KAFKA_CMD"
		libVar     = "MFAPI_KAFKA_LIBPATH"
	)
	return newBridgeEngine(engineName, cmdVar, libVar)
}
// bridgeEngine is a StreamEngine that executes each submitted query as a
// subprocess (the "bridge job") and exchanges values with it over stdin/stdout.
type bridgeEngine struct {
	// name is the engine name returned by Name and used in error messages.
	name string
	// argv is the bridge-job command prefix: argv[0] is the executable and the
	// remaining elements are its leading arguments.
	argv []string
	// libPath, when non-empty, is passed to the job as LD_LIBRARY_PATH.
	libPath string
}
// Name reports the engine name this bridge engine was created with.
func (e *bridgeEngine) Name() string {
	return e.name
}
// Submit starts one bridge job for the query described by spec and connects it
// to source. Each instant's value is written to the job's stdin as one float per
// line; each float line read back from the job's stdout becomes an Event carrying
// the timestamp of the oldest instant not yet matched to an output.
//
// It returns an error when spec requests an aggregation, when spec.Op is not in
// liftedOps, when the engine has no command configured, or when the job cannot
// be started. The returned handle's result channel is closed, and its status set
// to "stopped", once the job's output ends or ctx is cancelled. Cancelling ctx
// kills the job (exec.CommandContext).
func (e *bridgeEngine) Submit(ctx context.Context, spec QuerySpec, source <-chan Instant) (QueryHandle, error) {
	if spec.Agg != "" {
		return nil, fmt.Errorf("the %s engine runs transforms; windowed aggregation is served by the in-process engine", e.name)
	}
	info, ok := liftedOps[spec.Op]
	if !ok {
		return nil, fmt.Errorf("unknown operation %q", spec.Op)
	}
	if len(e.argv) == 0 {
		// Guard the argv[0] / argv[1:] accesses below against an engine that
		// was built without a command.
		return nil, fmt.Errorf("%s engine has no bridge-job command configured", e.name)
	}

	// Job command line: the configured prefix, then the operation, then its
	// scalar argument when the operation takes one.
	args := make([]string, 0, len(e.argv)+1)
	args = append(args, e.argv[1:]...)
	args = append(args, spec.Op)
	if info.needsArg {
		args = append(args, strconv.FormatFloat(spec.Arg, 'g', -1, 64))
	}

	cmd := exec.CommandContext(ctx, e.argv[0], args...)
	cmd.Env = os.Environ()
	if e.libPath != "" {
		cmd.Env = append(cmd.Env, "LD_LIBRARY_PATH="+e.libPath)
	}
	cmd.Stderr = io.Discard
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, fmt.Errorf("open %s bridge job stdin: %w", e.name, err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, fmt.Errorf("open %s bridge job stdout: %w", e.name, err)
	}
	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("start %s bridge job: %w", e.name, err)
	}

	h := &bridgeHandle{results: make(chan Event, 64), status: "running"}
	ts := make(chan string, 1024) // source timestamps, paired with outputs by order
	done := make(chan struct{})   // closed when the reader ends, so the feeder can stop too

	// feeder: source instant values → the job's stdin (one float per line).
	go func() {
		defer stdin.Close()
		w := bufio.NewWriter(stdin)
		for {
			select {
			case <-ctx.Done():
				return
			case <-done:
				return
			case in, ok := <-source:
				if !ok {
					return
				}
				// Queue the timestamp before the value is written so it is
				// already available when the matching output arrives.
				select {
				case ts <- in.T:
				case <-ctx.Done():
					return
				case <-done:
					// Nobody will drain ts any more; do not block on it.
					return
				}
				if _, err := w.WriteString(strconv.FormatFloat(in.V, 'g', -1, 64) + "\n"); err != nil {
					return
				}
				// Flush per record so the job sees each value immediately.
				if err := w.Flush(); err != nil {
					return
				}
			}
		}
	}()

	// reader: the job's stdout (transformed floats) → result instants, each
	// carrying its source instant's timestamp.
	go func() {
		defer func() {
			h.setStatus("stopped")
			close(h.results)
			close(done)
			// Reap the job and release the resources Start set up. This runs
			// only after every read from stdout has finished, as Wait requires,
			// and after the result channel is closed so a slow exit never
			// delays consumers. The exit status is deliberately ignored: a job
			// killed by cancellation always reports an error here.
			_ = cmd.Wait()
		}()
		sc := bufio.NewScanner(stdout)
		for sc.Scan() {
			v, err := strconv.ParseFloat(strings.TrimSpace(sc.Text()), 64)
			if err != nil {
				// A line that is not a float is skipped without consuming a
				// timestamp.
				continue
			}
			var t string
			select {
			case t = <-ts:
			case <-ctx.Done():
				return
			}
			ev := Event{"datetime": t, "value": v, "property": spec.Pname, "operation": spec.Op}
			select {
			case h.results <- ev:
			case <-ctx.Done():
				return
			}
		}
		if sc.Err() != nil {
			// stdout can no longer be read (I/O error or an over-long line), so
			// nothing will drain the job's output; terminate it rather than let
			// it block on a full pipe. A failure to kill (for example because
			// the job already exited) is harmless.
			_ = cmd.Process.Kill()
		}
	}()
	return h, nil
}
// bridgeHandle exposes the result channel and the live status of a query running
// on a bridge-job runtime (Flink, Kafka Streams, …).
type bridgeHandle struct {
	// results carries the events produced for the query.
	results chan Event
	// mu guards status.
	mu sync.Mutex
	// status is the query's current state; read through Status and written
	// through setStatus.
	status string
}
// Results returns the receive-only view of the query's event channel.
func (h *bridgeHandle) Results() <-chan Event {
	return h.results
}
// Status returns the query's current status, read under the handle's mutex.
func (h *bridgeHandle) Status() string {
	h.mu.Lock()
	current := h.status
	h.mu.Unlock()
	return current
}
// setStatus replaces the query's status with s while holding the handle's mutex.
func (h *bridgeHandle) setStatus(s string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.status = s
}
// Stop does nothing and always returns nil. Shutdown is driven by the context
// given at submission: when the control plane cancels it, exec.CommandContext
// kills the bridge job subprocess and the feeder/reader goroutines end.
func (h *bridgeHandle) Stop() error {
	return nil
}