Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,7 @@ type opInfo struct {
}

// liftedOps is the catalogue of scalar operations a continuous transform can
// apply to a tfloat stream. Every entry is a MEOS lifted temporal function that
// exists today, so the POC needs no kernel change. (sin/cos/tan join this set
// once MEOS adds them; arithmetic and the listed math functions are present.)
// apply to a tfloat stream. Every entry maps to a MEOS lifted temporal function.
var liftedOps = map[string]opInfo{
"ln": {false, "natural logarithm"},
"exp": {false, "exponential"},
Expand All @@ -220,6 +218,9 @@ var liftedOps = map[string]opInfo{
"abs": {false, "absolute value"},
"degrees": {false, "radians to degrees"},
"radians": {false, "degrees to radians"},
"sin": {false, "sine"},
"cos": {false, "cosine"},
"tan": {false, "tangent"},
"add": {true, "add a scalar"},
"sub": {true, "subtract a scalar"},
"mul": {true, "multiply by a scalar"},
Expand Down
6 changes: 6 additions & 0 deletions stream_engine_meos.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ func liftInstant(op string, arg float64, in Instant) (Instant, error) {
res = C.tfloat_degrees(temp, false)
case "radians":
res = C.tfloat_radians(temp)
case "sin":
res = C.tfloat_sin(temp)
case "cos":
res = C.tfloat_cos(temp)
case "tan":
res = C.tfloat_tan(temp)
case "add":
res = C.add_tfloat_float(temp, C.double(arg))
case "sub":
Expand Down
3 changes: 3 additions & 0 deletions stream_meos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func TestMeosTransform(t *testing.T) {
{"div", 4, 10, 2.5},
{"degrees", 0, math.Pi, 180},
{"radians", 0, 180, math.Pi},
{"sin", 0, math.Pi / 2, 1},
{"cos", 0, 0, 1},
{"tan", 0, math.Pi / 4, 1},
}
for _, c := range cases {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
6 changes: 4 additions & 2 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ func TestLiftedOpsCatalogue(t *testing.T) {
t.Errorf("%s should be a known scalar-arg op", op)
}
}
if _, ok := liftedOps["sin"]; ok {
t.Error("sin is not in MEOS yet and must not be advertised")
for _, op := range []string{"sin", "cos", "tan"} {
if info, ok := liftedOps[op]; !ok || info.needsArg {
t.Errorf("%s should be a known unary op", op)
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions tutorial-stream/flink/MfStreamBridgeJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ static String lift(String op, double arg, String floatLine) {
case "abs": r = GeneratedFunctions.tnumber_abs(t); break;
case "degrees": r = GeneratedFunctions.tfloat_degrees(t, false); break;
case "radians": r = GeneratedFunctions.tfloat_radians(t); break;
case "sin": r = GeneratedFunctions.tfloat_sin(t); break;
case "cos": r = GeneratedFunctions.tfloat_cos(t); break;
case "tan": r = GeneratedFunctions.tfloat_tan(t); break;
case "add": r = GeneratedFunctions.add_tfloat_float(t, arg); break;
case "sub": r = GeneratedFunctions.sub_tfloat_float(t, arg); break;
case "mul": r = GeneratedFunctions.mul_tfloat_float(t, arg); break;
Expand Down
3 changes: 3 additions & 0 deletions tutorial-stream/kafka/MfStreamKafkaBridgeJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ static String lift(String op, double arg, String floatLine) {
case "abs": r = GeneratedFunctions.tnumber_abs(t); break;
case "degrees": r = GeneratedFunctions.tfloat_degrees(t, false); break;
case "radians": r = GeneratedFunctions.tfloat_radians(t); break;
case "sin": r = GeneratedFunctions.tfloat_sin(t); break;
case "cos": r = GeneratedFunctions.tfloat_cos(t); break;
case "tan": r = GeneratedFunctions.tfloat_tan(t); break;
case "add": r = GeneratedFunctions.add_tfloat_float(t, arg); break;
case "sub": r = GeneratedFunctions.sub_tfloat_float(t, arg); break;
case "mul": r = GeneratedFunctions.mul_tfloat_float(t, arg); break;
Expand Down