46 changes: 46 additions & 0 deletions kafka-streams-app/docs/parity-status.md
@@ -0,0 +1,46 @@
# MobilityKafka parity status — MEOS surface audit

Generated 2026-05-31 by `tools/parity/parity_audit.py`.

The MobilityKafka MEOS facade (`org.mobilitydb.kafka.meos.MeosOps*`) exposes MEOS C functions to Flink through JMEOS. This audit measures, per type family, the share of the **MEOS public C API** that the facade exposes and that JMEOS binds.

**Headline.** The facade exposes **2296 of the 2296 public MEOS functions that JMEOS binds (100.0%)**. The MEOS public surface (`meos/include/meos*.h`, excluding internal headers) is 2297 functions, of which JMEOS binds 2296, so one public function has no JMEOS binding and falls outside the coverage figure. No bindable function is missing from the facade (see §3).

Coverage is **static**: a function counts as covered when the facade declares a method of the same name and arity that delegates to a JMEOS export.
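
The name-and-arity rule can be sketched as follows. This is an illustration only, not the audit tool itself: `_SIG` is the pattern used by `emit_gap_methods.py`, while `_FACADE`, `arity`, `is_covered` and the sample `javap` and Java lines are hypothetical.

```python
import re

# Pattern from tools/parity/emit_gap_methods.py: matches one javap signature line.
_SIG = re.compile(r'public static (\S+) (\w+)\(([^)]*)\);')
# Hypothetical pattern for a facade method declaration in Java source.
_FACADE = re.compile(r'public static \S+ (\w+)\(([^)]*)\)\s*\{')


def arity(args):
    """Count comma-separated parameters; an empty parameter list has arity 0."""
    return len([a for a in args.split(",") if a.strip()])


def is_covered(javap_line, facade_source):
    """Return True when the facade declares a method with the same name and arity."""
    m = _SIG.search(javap_line)
    if m is None:
        return False
    want = (m.group(2), arity(m.group(3)))
    have = {(name, arity(params)) for name, params in _FACADE.findall(facade_source)}
    return want in have


# Hypothetical sample data, not taken from the real JMEOS jar.
javap_line = "  public static double demo_distance(jnr.ffi.Pointer, jnr.ffi.Pointer);"
facade_source = "public static double demo_distance(jnr.ffi.Pointer arg0, jnr.ffi.Pointer arg1) {"

print(is_covered(javap_line, facade_source))
```

Counting commas is enough for flat parameter lists; a parameter type containing a comma (for example a two-argument generic) would need a real parser.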

Per-family runtime behaviour is asserted by `src/test/java/org/mobilitydb/kafka/meos/MeosFacadeSmokeTest.java`, which constructs and reads back a value in the core, geo, cbuffer, npoint and pose families through the facade against libmeos. The cbuffer, npoint and pose families require a libmeos built with the extended modules (`-DCBUFFER=ON -DNPOINT=ON -DPOSE=ON -DRGEO=ON`); the stock library carries the core and geo surfaces only.

## 1. Reference surface and method

- **Denominator**: distinct function names declared `extern` in the MEOS public headers `meos.h`, `meos_geo.h`, `meos_cbuffer.h`, `meos_npoint.h`, `meos_pose.h`, `meos_rgeo.h`, `meos_h3.h`. Internal headers (`meos_internal*.h`) are excluded.

- **Numerator**: `public static` methods on the generated `MeosOps*` facade whose name is also a `functions.GeneratedFunctions` export in the bundled JMEOS jar.

- **JMEOS jar**: `jar/JMEOS.jar` exports 2916 static methods.
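
A minimal sketch of how the denominator and numerator relate, assuming the sets are built by plain name matching. The two regular expressions are copied from `emit_gap_methods.py`; the header text, facade text and export names are hypothetical.

```python
import re

# Patterns copied from tools/parity/emit_gap_methods.py.
_DECL = re.compile(r'^\s*extern\s+.+?\b([a-z][A-Za-z0-9_]*)\s*\(', re.M)
_PUBSTATIC = re.compile(r'public static [A-Za-z0-9_.<>\[\]]+ ([a-z0-9_]+)\(')

# Hypothetical inputs: two header declarations, one facade method, and a
# made-up set of JMEOS export names.
header_text = """
extern double demo_distance(const Demo *d1, const Demo *d2);
extern Demo *demo_parse(const char **str, bool end);
"""
facade_text = "public static double demo_distance(Pointer a, Pointer b) {"
jmeos_exports = {"demo_distance", "demo_parse", "unrelated_export"}

public = set(_DECL.findall(header_text))       # names declared extern
facade = set(_PUBSTATIC.findall(facade_text))  # facade method names
bindable = public & jmeos_exports              # public and bound by JMEOS
exposed = bindable & facade                    # the numerator
missing = sorted(bindable - facade)            # what section 3 would list

print(f"{len(exposed)} of {len(bindable)} exposed; missing: {missing}")
```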

## 2. Per-family coverage of the public MEOS surface

| Family (header) | Public ∩ JMEOS | Exposed | Missing | Coverage |
|---|---:|---:|---:|---:|
| core temporal / set / span / spanset / tbox (`meos.h`) | 1343 | 1343 | 0 | 100.0% |
| geo (tgeo / tpoint / stbox) (`meos_geo.h`) | 421 | 421 | 0 | 100.0% |
| cbuffer (`meos_cbuffer.h`) | 175 | 175 | 0 | 100.0% |
| npoint (`meos_npoint.h`) | 119 | 119 | 0 | 100.0% |
| pose (`meos_pose.h`) | 101 | 101 | 0 | 100.0% |
| rgeo (`meos_rgeo.h`) | 68 | 68 | 0 | 100.0% |
| h3 / th3index (`meos_h3.h`) | 69 | 69 | 0 | 100.0% |
| **total** | **2296** | **2296** | **0** | **100.0%** |
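
The total row can be cross-checked against the per-family rows with a few lines of arithmetic. The counts below are copied from the table; the variable names are illustrative.

```python
# (public and bound by JMEOS, exposed) per header, copied from the table.
rows = {
    "meos.h": (1343, 1343),
    "meos_geo.h": (421, 421),
    "meos_cbuffer.h": (175, 175),
    "meos_npoint.h": (119, 119),
    "meos_pose.h": (101, 101),
    "meos_rgeo.h": (68, 68),
    "meos_h3.h": (69, 69),
}

bindable = sum(b for b, _ in rows.values())
exposed = sum(e for _, e in rows.values())
missing = bindable - exposed
coverage = 100.0 * exposed / bindable

print(bindable, exposed, missing, f"{coverage:.1f}%")
```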

## 3. Bindable MEOS functions not exposed by the facade

No function that is present in the public MEOS headers and bound by JMEOS is missing from the facade.


## 4. MobilityDB SQL-surface cross-check

The facade is also matched against the underlying MEOS C symbol of each addressable `CREATE FUNCTION` in `mobilitydb/sql/**/*.in.sql`. PostgreSQL-only sections and helper symbols are excluded: 876 are bucketed as out of scope, and 113 are SQL/plpgsql-composed functions with no single C symbol. Functions the SQL layer implements through the internal MEOS headers (`meos_internal*.h`) are exposed via `MeosOpsSqlSurface`.

- Addressable distinct C symbols: **1336**; bound by JMEOS: **1044**; exposed by the facade: **1044** (100.0% of the JMEOS-bindable SQL surface).

- The remaining **292** addressable C symbols are not exported by JMEOS under the name the SQL layer's extension wrapper uses; the wrapper names differ from the MEOS function names they call.
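
The sketch below shows how a C symbol is read from a `CREATE FUNCTION` statement and why a wrapper name may not match a JMEOS export. The two patterns and the 1200-character lookahead are taken from `emit_sql_surface.py`; the SQL statements, function names and export set are hypothetical.

```python
import re

# Patterns copied from tools/parity/emit_sql_surface.py.
_CREATE = re.compile(r'CREATE\s+(?:OR\s+REPLACE\s+)?FUNCTION\s+([A-Za-z0-9_]+)\s*\(', re.I)
_CSYM = re.compile(r"AS\s+'[^']*'\s*,\s*'([A-Za-z0-9_]+)'", re.I)

# Hypothetical SQL: one C-backed function and one SQL-composed function.
sql_text = """
CREATE FUNCTION demo_distance(cbuffer, cbuffer)
  RETURNS float
  AS 'MODULE_PATHNAME', 'Demo_distance_wrapper'
  LANGUAGE C IMMUTABLE STRICT;

CREATE FUNCTION demo_twice(integer)
  RETURNS integer
  AS 'SELECT $1 * 2'
  LANGUAGE SQL IMMUTABLE;
"""


def c_symbols(text):
    """Return (sql_name, lower-cased C symbol or None) per CREATE FUNCTION."""
    out = []
    for m in _CREATE.finditer(text):
        # Look only at the rest of this statement, up to the first semicolon.
        tail = text[m.end():m.end() + 1200].split(";")[0]
        cm = _CSYM.search(tail)
        out.append((m.group(1), cm.group(1).lower() if cm else None))
    return out


pairs = c_symbols(sql_text)
jmeos_exports = {"demo_distance"}  # hypothetical export set
addressable = {sym for _, sym in pairs if sym is not None}
bound = addressable & jmeos_exports
unbound = addressable - jmeos_exports  # wrapper name differs from the export name

print(pairs, sorted(unbound))
print(1336 - 1044)  # the remaining addressable symbols reported above
```

In this toy input the wrapper symbol `demo_distance_wrapper` is addressable but unbound, which is the situation the 292 remaining symbols are in.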

94 changes: 94 additions & 0 deletions kafka-streams-app/tools/parity/emit_gap_methods.py
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""Emit forwarding facade methods for MEOS public-surface functions that the
generated MeosOps* facade does not yet expose.

Each method forwards to its `functions.GeneratedFunctions` export using the
exact JMEOS signature (captured via `javap`), so the output compiles by
construction. Family and nature (scalar / sequence-constructor / tiling) are
recorded in the Javadoc; the wiring tier is governed by the wirings layer, not
by the presence of the forwarder.

Run from flink-processor/:
    javap -classpath jar/JMEOS.jar -p functions.GeneratedFunctions > /tmp/gen_sigs.txt
    python3 tools/parity/emit_gap_methods.py /tmp/gen_sigs.txt
"""
import re, os, sys, glob

HERE = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
INC = os.environ.get("MEOS_INCLUDE", "/home/esteban/src/MobilityDB/meos/include")
PUBLIC_HEADERS = ["meos.h", "meos_geo.h", "meos_cbuffer.h", "meos_npoint.h", "meos_pose.h", "meos_rgeo.h", "meos_h3.h"]
FACADE = os.path.join(HERE, "src/main/java/org/mobilitydb/kafka/meos")
OUT = os.path.join(FACADE, "MeosOpsParityGaps.java")
# Forwarder facades are themselves derived; the "already covered" baseline is the
# tier-aware generated OO classes only, so the generator reproduces idempotently.
DERIVED = {"MeosOpsParityGaps.java", "MeosOpsSqlSurface.java"}

_DECL = re.compile(r'^\s*extern\s+.+?\b([a-z][A-Za-z0-9_]*)\s*\(', re.M)
_PUBSTATIC = re.compile(r'public static [A-Za-z0-9_.<>\[\]]+ ([a-z0-9_]+)\(')
_SIG = re.compile(r'public static (\S+) (\w+)\(([^)]*)\);')


def family(name, fam):
    return fam.get(name, "meos.h")


def nature(name):
    if name.endswith("_make") or "_from_base_" in name or name.endswith("make_coords"):
        return "whole-sequence constructor — not a per-event op"
    if name.endswith("_tiles"):
        return "multidimensional tiling (windowed)"
    return "scalar / stateless"


def main():
    sigfile = sys.argv[1] if len(sys.argv) > 1 else "/tmp/parity/gen_sigs.txt"
    fam, pub = {}, set()
    for h in PUBLIC_HEADERS:
        for n in set(_DECL.findall(open(os.path.join(INC, h)).read())):
            pub.add(n); fam.setdefault(n, h)
    facade = set()
    for f in glob.glob(os.path.join(FACADE, "MeosOps*.java")):
        if os.path.basename(f) in DERIVED:
            continue
        facade |= set(_PUBSTATIC.findall(open(f).read()))
    # all JMEOS signatures, grouped by name
    sigs = {}
    for line in open(sigfile):
        m = _SIG.search(line)
        if m:
            ret, name, args = m.group(1), m.group(2), m.group(3).strip()
            sigs.setdefault(name, []).append((ret, args))
    missing = sorted(n for n in (pub & set(sigs)) - facade)

    L = ["package org.mobilitydb.kafka.meos;", "",
         "/**", " * Forwarding facade methods for MEOS public-surface functions not emitted",
         " * by the tier-aware code generator. Each method delegates to its JMEOS",
         " * {@code functions.GeneratedFunctions} export under the shared",
         " * {@link MeosOpsRuntime#MEOS_AVAILABLE} guard.",
         " */",
         "public final class MeosOpsParityGaps {", "",
         " private MeosOpsParityGaps() { /* utility */ }", ""]
    count = 0
    for name in missing:
        for ret, args in sigs[name]:
            params = [a.strip() for a in args.split(",")] if args else []
            decl = ", ".join(f"{t} arg{i}" for i, t in enumerate(params))
            call = ", ".join(f"arg{i}" for i in range(len(params)))
            ret_kw = "" if ret == "void" else "return "
            L += [f" /** MEOS {{@code {name}}} — {family(name, fam)} · {nature(name)}. */",
                  f" public static {ret} {name}({decl}) {{",
                  f" if (!MeosOpsRuntime.MEOS_AVAILABLE)",
                  f' throw new UnsupportedOperationException("{name} requires libmeos'
                  f' — set -Dmobilitykafka.meos.enabled=true");',
                  f" {ret_kw}functions.GeneratedFunctions.{name}({call});",
                  " }", ""]
            count += 1
    L.append("}")
    open(OUT, "w").write("\n".join(L) + "\n")
    print(f"missing public-surface functions: {len(missing)}")
    print(f"forwarding methods emitted: {count}")
    print(f"wrote {OUT}")


if __name__ == "__main__":
    main()
115 changes: 115 additions & 0 deletions kafka-streams-app/tools/parity/emit_sql_surface.py
@@ -0,0 +1,115 @@
#!/usr/bin/env python3
"""Emit forwarding facade methods for MEOS functions that the MobilityDB SQL
layer exposes as user functions but whose implementation lives in the internal
headers (`meos_internal*.h`). JMEOS binds them; exposing them here makes the
Flink facade match the SQL user surface as well as the public MEOS API.

Each method forwards to its `functions.GeneratedFunctions` export using the
exact JMEOS signature, so the output compiles by construction.

Run from flink-processor/:
    javap -classpath jar/JMEOS.jar -p functions.GeneratedFunctions > /tmp/gen_sigs.txt
    python3 tools/parity/emit_sql_surface.py /tmp/gen_sigs.txt
"""
import re, os, sys, glob

HERE = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
INC = os.environ.get("MEOS_INCLUDE", "/home/esteban/src/MobilityDB/meos/include")
MDB_SQL = os.environ.get("MDB_SQL", "/home/esteban/src/MobilityDB/mobilitydb/sql")
FACADE = os.path.join(HERE, "src/main/java/org/mobilitydb/kafka/meos")
OUT = os.path.join(FACADE, "MeosOpsSqlSurface.java")

_PUBSTATIC = re.compile(r'public static [A-Za-z0-9_.<>\[\]]+ ([a-z0-9_]+)\(')
_SIG = re.compile(r'public static (\S+) (\w+)\(([^)]*)\);')
_CREATE = re.compile(r'CREATE\s+(?:OR\s+REPLACE\s+)?FUNCTION\s+([A-Za-z0-9_]+)\s*\(', re.I)
_CSYM = re.compile(r"AS\s+'[^']*'\s*,\s*'([A-Za-z0-9_]+)'", re.I)

OOS_SECTIONS = {
"temporal/011_span_indexes.in.sql", "temporal/012_spanset_indexes.in.sql",
"temporal/013_set_indexes.in.sql", "temporal/019_geo_constructors.in.sql",
"temporal/043_temporal_gist.in.sql", "temporal/044_temporal_spgist.in.sql",
"temporal/999_oid_cache.in.sql", "geo/073_tgeo_gist.in.sql",
"geo/073_tpoint_gist.in.sql", "geo/074_tgeo_spgist.in.sql",
"geo/074_tpoint_spgist.in.sql", "cbuffer/166_tcbuffer_indexes.in.sql",
"npoint/092_tnpoint_gin.in.sql", "npoint/098_tnpoint_indexes.in.sql",
"pose/114_tpose_indexes.in.sql", "rgeo/134_trgeo_indexes.in.sql",
}
OOS_SUFFIXES = ("_in", "_out", "_recv", "_send", "_typmod_in", "_typmod_out",
"_transfn", "_combinefn", "_finalfn", "_serialize", "_deserialize",
"_sel", "_joinsel", "_supportfn", "_analyze",
"_cmp", "_eq", "_ne", "_lt", "_le", "_gt", "_ge", "_hash", "_hash_extended")
OOS_NAMES = {"range", "multirange", "create_trip", "transform_gk"}


def header_names(*headers):
    names = set()
    for h in headers:
        names |= set(re.findall(r'^\s*extern\s+.+?\b([a-z][A-Za-z0-9_]*)\s*\(', open(os.path.join(INC, h)).read(), re.M))
    return names


def sql_addressable():
    addr = set()
    for path in glob.glob(os.path.join(MDB_SQL, "**", "*.in.sql"), recursive=True):
        section = os.path.relpath(path, MDB_SQL).replace(os.sep, "/")
        txt = open(path, encoding="utf-8", errors="replace").read()
        for m in _CREATE.finditer(txt):
            name = m.group(1); tail = txt[m.end():m.end() + 1200].split(';')[0]
            cm = _CSYM.search(tail); sym = cm.group(1) if cm else None
            key = (sym or name).lower()
            if section in OOS_SECTIONS or key in OOS_NAMES or key.endswith(OOS_SUFFIXES):
                continue
            if sym is not None:
                addr.add(key)
    return addr


def main():
    sigfile = sys.argv[1] if len(sys.argv) > 1 else "/tmp/parity/gen_sigs.txt"
    sigs = {}
    for line in open(sigfile):
        m = _SIG.search(line)
        if m:
            sigs.setdefault(m.group(2), []).append((m.group(1), m.group(3).strip()))
    facade = set()
    for f in glob.glob(os.path.join(FACADE, "MeosOps*.java")):
        facade |= set(_PUBSTATIC.findall(open(f).read()))
    pub = header_names("meos.h", "meos_geo.h", "meos_cbuffer.h", "meos_npoint.h", "meos_pose.h", "meos_rgeo.h")
    addr = sql_addressable()
    # SQL-addressable, bound by JMEOS, not in the public-surface facade, not already exposed
    targets = sorted((addr & set(sigs)) - facade - pub)

    L = ["package org.mobilitydb.kafka.meos;", "",
         "/**", " * Forwarding facade methods for MEOS functions that the MobilityDB SQL layer",
         " * exposes as user functions but whose implementation lives in the internal headers",
         " * ({@code meos_internal*.h}). JMEOS binds them; they are exposed here so the facade",
         " * matches the SQL user surface as well as the public MEOS API. Each method delegates",
         " * to its {@code functions.GeneratedFunctions} export under the",
         " * {@link MeosOpsRuntime#MEOS_AVAILABLE} guard.",
         " */",
         "public final class MeosOpsSqlSurface {", "",
         " private MeosOpsSqlSurface() { /* utility */ }", ""]
    count = 0
    for name in targets:
        for ret, args in sigs[name]:
            params = [a.strip() for a in args.split(",")] if args else []
            decl = ", ".join(f"{t} arg{i}" for i, t in enumerate(params))
            call = ", ".join(f"arg{i}" for i in range(len(params)))
            ret_kw = "" if ret == "void" else "return "
            L += [f" /** MEOS {{@code {name}}} — SQL-surface function (meos_internal). */",
                  f" public static {ret} {name}({decl}) {{",
                  f" if (!MeosOpsRuntime.MEOS_AVAILABLE)",
                  f' throw new UnsupportedOperationException("{name} requires libmeos'
                  f' — set -Dmobilitykafka.meos.enabled=true");',
                  f" {ret_kw}functions.GeneratedFunctions.{name}({call});",
                  " }", ""]
            count += 1
    L.append("}")
    open(OUT, "w").write("\n".join(L) + "\n")
    print(f"SQL-surface (internal-backed) functions to expose: {len(targets)}")
    print(f"forwarding methods emitted: {count}")
    print(f"wrote {OUT}")


if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions kafka-streams-app/tools/parity/meos-ref/PIN.txt
@@ -0,0 +1 @@
PIN=2c4243a265
136 changes: 136 additions & 0 deletions kafka-streams-app/tools/parity/meos-ref/meos/include/cbuffer/cbuffer.h
@@ -0,0 +1,136 @@
/*****************************************************************************
*
* This MobilityDB code is provided under The PostgreSQL License.
* Copyright (c) 2016-2025, Université libre de Bruxelles and MobilityDB
* contributors
*
* MobilityDB includes portions of PostGIS version 3 source code released
* under the GNU General Public License (GPLv2 or later).
* Copyright (c) 2001-2025, PostGIS contributors
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written
* agreement is hereby granted, provided that the above copyright notice and
* this paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL UNIVERSITE LIBRE DE BRUXELLES BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
* EVEN IF UNIVERSITE LIBRE DE BRUXELLES HAS BEEN ADVISED OF THE POSSIBILITY
* OF SUCH DAMAGE.
*
* UNIVERSITE LIBRE DE BRUXELLES SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON
* AN "AS IS" BASIS, AND UNIVERSITE LIBRE DE BRUXELLES HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*****************************************************************************/

/**
* @brief Functions for temporal buffers.
*/

#ifndef __CBUFFER_H__
#define __CBUFFER_H__

/* PostgreSQL */
#include <postgres.h>
/* MEOS */
#include <meos.h>
#include <meos_cbuffer.h>

/*****************************************************************************
* Type definitions
*****************************************************************************/

/* Structure to represent circular buffers */

struct Cbuffer
{
  int32 vl_len_;        /**< Varlena header (do not touch directly!) */
  double radius;        /**< radius */
  Datum point;          /**< First 8 bytes of the point which is passed by
                             reference. The extra bytes needed are added upon
                             creation. */
  /* variable-length data follows */
};

/*****************************************************************************
* fmgr macros
*****************************************************************************/

#define DatumGetCbufferP(X) ((Cbuffer *) DatumGetPointer(X))
#define CbufferPGetDatum(X) PointerGetDatum(X)
#define PG_GETARG_CBUFFER_P(X) DatumGetCbufferP(PG_GETARG_DATUM(X))
#define PG_RETURN_CBUFFER_P(X) PG_RETURN_POINTER(X)

/*****************************************************************************/

/* Validity functions */

extern bool ensure_valid_cbuffer_cbuffer(const Cbuffer *cb1,
const Cbuffer *cb2);
extern bool ensure_valid_cbuffer_geo(const Cbuffer *cb,
const GSERIALIZED *gs);
extern bool ensure_valid_cbuffer_stbox(const Cbuffer *cb, const STBox *box);
extern bool ensure_valid_cbufferset_cbuffer(const Set *s, const Cbuffer *cb);

/* Collinear and interpolation functions */

extern bool cbuffer_collinear(const Cbuffer *cb1, const Cbuffer *cb2,
const Cbuffer *cbuf3, double ratio);
extern Cbuffer *cbuffersegm_interpolate(const Cbuffer *start,
const Cbuffer *end, long double ratio);
extern long double cbuffersegm_locate(const Cbuffer *start, const Cbuffer *end,
const Cbuffer *value);

/* Input/output functions */

extern Cbuffer *cbuffer_parse(const char **str, bool end);
extern char *cbuffer_wkt_out(Datum value, int maxdd, bool extended);

/* Accessor functions */

extern const GSERIALIZED *cbuffer_point_p(const Cbuffer *cb);

extern Datum datum_cbuffer_round(Datum buffer, Datum size);

/* Transformation functions */

extern Cbuffer *cbuffer_transf_pj(const Cbuffer *cb, int32_t srid_to, const LWPROJ *pj);

/* Distance function */

extern double cbuffer_distance(const Cbuffer *cb1, const Cbuffer *cb2);
extern Datum datum_cbuffer_distance(Datum cb1, Datum cb2);
extern int cbuffersegm_distance_turnpt(const Cbuffer *start1,
const Cbuffer *end1, const Cbuffer *start2, const Cbuffer *end2,
TimestampTz lower, TimestampTz upper, TimestampTz *t1, TimestampTz *t2);

/* Spatial relationship functions */

extern int cbuffer_contains(const Cbuffer *cb1, const Cbuffer *cb2);
extern int cbuffer_covers(const Cbuffer *cb1, const Cbuffer *cb2);
extern int cbuffer_disjoint(const Cbuffer *cb1, const Cbuffer *cb2);
extern int cbuffer_intersects(const Cbuffer *cb1, const Cbuffer *cb2);
extern int cbuffer_dwithin(const Cbuffer *cb1, const Cbuffer *cb2, double dist);
extern int cbuffer_touches(const Cbuffer *cb1, const Cbuffer *cb2);

extern int contains_cbuffer_cbuffer(const Cbuffer *cb1, const Cbuffer *cb2);
extern int covers_cbuffer_cbuffer(const Cbuffer *cb1, const Cbuffer *cb2);
extern int disjoint_cbuffer_cbuffer(const Cbuffer *cb1, const Cbuffer *cb2);
extern int intersects_cbuffer_cbuffer(const Cbuffer *cb1, const Cbuffer *cb2);
extern int dwithin_cbuffer_cbuffer(const Cbuffer *cb1, const Cbuffer *cb2, double dist);
extern int touches_cbuffer_cbuffer(const Cbuffer *cb1, const Cbuffer *cb2);

extern Datum datum_cbuffer_contains(Datum cb1, Datum cb2);
extern Datum datum_cbuffer_covers(Datum cb1, Datum cb2);
extern Datum datum_cbuffer_disjoint(Datum cb1, Datum cb2);
extern Datum datum_cbuffer_intersects(Datum cb1, Datum cb2);
extern Datum datum_cbuffer_dwithin(Datum cb1, Datum cb2, Datum dist);
extern Datum datum_cbuffer_touches(Datum cb1, Datum cb2);

/*****************************************************************************/

#endif /* __CBUFFER_H__ */