From dd9e73818a51a90a062409eda041854c0fa7b2b4 Mon Sep 17 00:00:00 2001 From: Esteban Zimanyi Date: Thu, 11 Jun 2026 00:40:46 +0200 Subject: [PATCH] feat(stream): add sin/cos/tan to the lifted-transform table Wire `tfloat_sin`, `tfloat_cos`, `tfloat_tan` into the StreamEngine lifted-operation surface: the `liftedOps` catalogue, the meos-engine `liftInstant` switch, the Flink bridge job, and the Kafka bridge job all handle `"sin"`, `"cos"`, `"tan"`. Tests cover the MEOS path (exact pointwise results on instants) and the no-MEOS catalogue (sin/cos/tan present as unary ops). --- stream.go | 7 ++++--- stream_engine_meos.go | 6 ++++++ stream_meos_test.go | 3 +++ stream_test.go | 6 ++++-- tutorial-stream/flink/MfStreamBridgeJob.java | 3 +++ tutorial-stream/kafka/MfStreamKafkaBridgeJob.java | 3 +++ 6 files changed, 23 insertions(+), 5 deletions(-) diff --git a/stream.go b/stream.go index e303162..e8a5d3f 100644 --- a/stream.go +++ b/stream.go @@ -208,9 +208,7 @@ type opInfo struct { } // liftedOps is the catalogue of scalar operations a continuous transform can -// apply to a tfloat stream. Every entry is a MEOS lifted temporal function that -// exists today, so the POC needs no kernel change. (sin/cos/tan join this set -// once MEOS adds them; arithmetic and the listed math functions are present.) +// apply to a tfloat stream. Every entry maps to a MEOS lifted temporal function. var liftedOps = map[string]opInfo{ "ln": {false, "natural logarithm"}, "exp": {false, "exponential"}, @@ -220,6 +218,9 @@ var liftedOps = map[string]opInfo{ "abs": {false, "absolute value"}, "degrees": {false, "radians to degrees"}, "radians": {false, "degrees to radians"}, + "sin": {false, "sine"}, + "cos": {false, "cosine"}, + "tan": {false, "tangent"}, "add": {true, "add a scalar"}, "sub": {true, "subtract a scalar"}, "mul": {true, "multiply by a scalar"}, diff --git a/stream_engine_meos.go b/stream_engine_meos.go index 53e734d..997f287 100644 --- a/stream_engine_meos.go +++ b/stream_engine_meos.go @@ -278,6 +278,12 @@ func liftInstant(op string, arg float64, in Instant) (Instant, error) { res = C.tfloat_degrees(temp, false) case "radians": res = C.tfloat_radians(temp) + case "sin": + res = C.tfloat_sin(temp) + case "cos": + res = C.tfloat_cos(temp) + case "tan": + res = C.tfloat_tan(temp) case "add": res = C.add_tfloat_float(temp, C.double(arg)) case "sub": diff --git a/stream_meos_test.go b/stream_meos_test.go index fd109d4..786ba58 100644 --- a/stream_meos_test.go +++ b/stream_meos_test.go @@ -36,6 +36,9 @@ func TestMeosTransform(t *testing.T) { {"div", 4, 10, 2.5}, {"degrees", 0, math.Pi, 180}, {"radians", 0, 180, math.Pi}, + {"sin", 0, math.Pi / 2, 1}, + {"cos", 0, 0, 1}, + {"tan", 0, math.Pi / 4, 1}, } for _, c := range cases { ctx, cancel := context.WithCancel(context.Background()) diff --git a/stream_test.go b/stream_test.go index dff2543..b82a7e3 100644 --- a/stream_test.go +++ b/stream_test.go @@ -92,8 +92,10 @@ func TestLiftedOpsCatalogue(t *testing.T) { t.Errorf("%s should be a known scalar-arg op", op) } } - if _, ok := liftedOps["sin"]; ok { - t.Error("sin is not in MEOS yet and must not be advertised") + for _, op := range []string{"sin", "cos", "tan"} { + if info, ok := liftedOps[op]; !ok || info.needsArg { + t.Errorf("%s should be a known unary op", op) + } } } diff --git a/tutorial-stream/flink/MfStreamBridgeJob.java b/tutorial-stream/flink/MfStreamBridgeJob.java index 75bb803..e4c1a7e 100644 --- a/tutorial-stream/flink/MfStreamBridgeJob.java +++ b/tutorial-stream/flink/MfStreamBridgeJob.java @@ -49,6 +49,9 @@ static String lift(String op, double arg, String floatLine) { case "abs": r = GeneratedFunctions.tnumber_abs(t); break; case "degrees": r = GeneratedFunctions.tfloat_degrees(t, false); break; case "radians": r = GeneratedFunctions.tfloat_radians(t); break; + case "sin": r = GeneratedFunctions.tfloat_sin(t); break; + case "cos": r = GeneratedFunctions.tfloat_cos(t); break; + case "tan": r = GeneratedFunctions.tfloat_tan(t); break; case "add": r = GeneratedFunctions.add_tfloat_float(t, arg); break; case "sub": r = GeneratedFunctions.sub_tfloat_float(t, arg); break; case "mul": r = GeneratedFunctions.mul_tfloat_float(t, arg); break; diff --git a/tutorial-stream/kafka/MfStreamKafkaBridgeJob.java b/tutorial-stream/kafka/MfStreamKafkaBridgeJob.java index 2595445..a2d1046 100644 --- a/tutorial-stream/kafka/MfStreamKafkaBridgeJob.java +++ b/tutorial-stream/kafka/MfStreamKafkaBridgeJob.java @@ -74,6 +74,9 @@ static String lift(String op, double arg, String floatLine) { case "abs": r = GeneratedFunctions.tnumber_abs(t); break; case "degrees": r = GeneratedFunctions.tfloat_degrees(t, false); break; case "radians": r = GeneratedFunctions.tfloat_radians(t); break; + case "sin": r = GeneratedFunctions.tfloat_sin(t); break; + case "cos": r = GeneratedFunctions.tfloat_cos(t); break; + case "tan": r = GeneratedFunctions.tfloat_tan(t); break; case "add": r = GeneratedFunctions.add_tfloat_float(t, arg); break; case "sub": r = GeneratedFunctions.sub_tfloat_float(t, arg); break; case "mul": r = GeneratedFunctions.mul_tfloat_float(t, arg); break;