From 2a29c6344b390739aaabd4223004da7e15853ec0 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Thu, 9 Apr 2026 20:28:54 -0700 Subject: [PATCH 1/5] feat: implement async context manager support for connection, writers, and scanners in Python bindings --- bindings/python/example/example.py | 182 +++++++++---------- bindings/python/fluss/__init__.pyi | 78 ++++++++ bindings/python/src/connection.rs | 22 +++ bindings/python/src/table.rs | 57 ++++++ bindings/python/src/upsert.rs | 32 ++++ bindings/python/test/test_context_manager.py | 135 ++++++++++++++ 6 files changed, 415 insertions(+), 91 deletions(-) create mode 100644 bindings/python/test/test_context_manager.py diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 52cefe1e..57a3313d 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -40,9 +40,8 @@ async def main(): config = fluss.Config(config_spec) # Create connection using the static create method - conn = await fluss.FlussConnection.create(config) - - # Define fields for PyArrow + async with await fluss.FlussConnection.create(config) as conn: + # Define fields for PyArrow fields = [ pa.field("id", pa.int32()), pa.field("name", pa.string()), @@ -105,8 +104,8 @@ async def main(): print(f"Got table: {table}") # Create a writer for the table - append_writer = table.new_append().create_writer() - print(f"Created append writer: {append_writer}") + async with table.new_append().create_writer() as append_writer: + print(f"Created append writer: {append_writer}") try: # Demo: Write PyArrow Table @@ -240,10 +239,10 @@ async def main(): append_writer.write_pandas(df) print("Successfully wrote Pandas DataFrame") - # Flush all pending data - print("\n--- Flushing data ---") - await append_writer.flush() - print("Successfully flushed data") + # Note: flush() and close() are automatically called by the 'async with' block on successful exit. 
+ # To manually flush before the context ends, you can still call: + # await append_writer.flush() + print("\n--- Ending append writer context (auto-flushing) ---") # Demo: Check offsets after writes print("\n--- Checking offsets after writes ---") @@ -264,8 +263,8 @@ async def main(): print("\n--- Scanning table (batch scanner) ---") try: # Use new_scan().create_record_batch_log_scanner() for batch-based operations - batch_scanner = await table.new_scan().create_record_batch_log_scanner() - print(f"Created batch scanner: {batch_scanner}") + async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner: + print(f"Created batch scanner: {batch_scanner}") # Subscribe to buckets (required before to_arrow/to_pandas) # Use subscribe_buckets to subscribe all buckets from EARLIEST_OFFSET @@ -284,15 +283,15 @@ async def main(): print(f"Could not convert to PyArrow: {e}") # Create a new batch scanner for to_pandas() test - batch_scanner2 = await table.new_scan().create_record_batch_log_scanner() - batch_scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner2: + batch_scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - # Try to get as Pandas DataFrame - try: - df_result = batch_scanner2.to_pandas() - print(f"\nAs Pandas DataFrame:\n{df_result}") - except Exception as e: - print(f"Could not convert to Pandas: {e}") + # Try to get as Pandas DataFrame + try: + df_result = batch_scanner2.to_pandas() + print(f"\nAs Pandas DataFrame:\n{df_result}") + except Exception as e: + print(f"Could not convert to Pandas: {e}") # TODO: support to_arrow_batch_reader() # which is reserved for streaming use cases @@ -301,43 +300,43 @@ async def main(): # Test poll_arrow() method for incremental reading as Arrow Table print("\n--- Testing poll_arrow() method ---") - batch_scanner3 = await table.new_scan().create_record_batch_log_scanner() - batch_scanner3.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) - print(f"Subscribed to bucket 0 at EARLIEST_OFFSET ({fluss.EARLIEST_OFFSET})") - - # Poll with a timeout of 5000ms (5 seconds) - # Note: poll_arrow() returns an empty table (not an error) on timeout - try: - poll_result = batch_scanner3.poll_arrow(5000) - print(f"Number of rows: {poll_result.num_rows}") - - if poll_result.num_rows > 0: - poll_df = poll_result.to_pandas() - print(f"Polled data:\n{poll_df}") - else: - print("Empty result (no records available)") - # Empty table still has schema - this is useful! - print(f"Schema: {poll_result.schema}") - - except Exception as e: - print(f"Error during poll_arrow: {e}") + async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner3: + batch_scanner3.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + print(f"Subscribed to bucket 0 at EARLIEST_OFFSET ({fluss.EARLIEST_OFFSET})") + + # Poll with a timeout of 5000ms (5 seconds) + # Note: poll_arrow() returns an empty table (not an error) on timeout + try: + poll_result = batch_scanner3.poll_arrow(5000) + print(f"Number of rows: {poll_result.num_rows}") + + if poll_result.num_rows > 0: + poll_df = poll_result.to_pandas() + print(f"Polled data:\n{poll_df}") + else: + print("Empty result (no records available)") + # Empty table still has schema - this is useful! 
+ print(f"Schema: {poll_result.schema}") + + except Exception as e: + print(f"Error during poll_arrow: {e}") # Test poll_record_batch() method for batches with metadata print("\n--- Testing poll_record_batch() method ---") - batch_scanner4 = await table.new_scan().create_record_batch_log_scanner() - batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner4: + batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - try: - batches = batch_scanner4.poll_record_batch(5000) - print(f"Number of batches: {len(batches)}") + try: + batches = batch_scanner4.poll_record_batch(5000) + print(f"Number of batches: {len(batches)}") - for i, batch in enumerate(batches): - print(f" Batch {i}: bucket={batch.bucket}, " - f"offsets={batch.base_offset}-{batch.last_offset}, " - f"rows={batch.batch.num_rows}") + for i, batch in enumerate(batches): + print(f" Batch {i}: bucket={batch.bucket}, " + f"offsets={batch.base_offset}-{batch.last_offset}, " + f"rows={batch.batch.num_rows}") - except Exception as e: - print(f"Error during poll_record_batch: {e}") + except Exception as e: + print(f"Error during poll_record_batch: {e}") except Exception as e: print(f"Error during batch scanning: {e}") @@ -346,34 +345,34 @@ async def main(): print("\n--- Scanning table (record scanner) ---") try: # Use new_scan().create_log_scanner() for record-based operations - record_scanner = await table.new_scan().create_log_scanner() - print(f"Created record scanner: {record_scanner}") - - record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - - # Poll returns ScanRecords — records grouped by bucket - print("\n--- Testing poll() method (record-by-record) ---") - try: - scan_records = record_scanner.poll(5000) - print(f"Total records: {scan_records.count()}, buckets: {len(scan_records.buckets())}") - - # Flat iteration over all records (regardless of bucket) - print(f" Flat iteration: {scan_records.count()} records") - for record in scan_records: - print(f" offset={record.offset}, timestamp={record.timestamp}") - - # Per-bucket access - for bucket in scan_records.buckets(): - bucket_recs = scan_records.records(bucket) - print(f" Bucket {bucket}: {len(bucket_recs)} records") - for record in bucket_recs[:3]: - print(f" offset={record.offset}, " - f"timestamp={record.timestamp}, " - f"change_type={record.change_type}, " - f"row={record.row}") - - except Exception as e: - print(f"Error during poll: {e}") + async with await table.new_scan().create_log_scanner() as record_scanner: + print(f"Created record scanner: {record_scanner}") + + record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + + # Poll returns ScanRecords — records grouped by bucket + print("\n--- Testing poll() method (record-by-record) ---") + try: + scan_records = record_scanner.poll(5000) + print(f"Total records: {scan_records.count()}, buckets: {len(scan_records.buckets())}") + + # Flat iteration over all records (regardless of bucket) + print(f" Flat iteration: {scan_records.count()} records") + for record in scan_records: + print(f" offset={record.offset}, timestamp={record.timestamp}") + + # Per-bucket access + for bucket in scan_records.buckets(): + bucket_recs = scan_records.records(bucket) + print(f" Bucket {bucket}: {len(bucket_recs)} records") + for record in bucket_recs[:3]: + print(f" offset={record.offset}, " + f"timestamp={record.timestamp}, 
" + f"change_type={record.change_type}, " + f"row={record.row}") + + except Exception as e: + print(f"Error during poll: {e}") except Exception as e: print(f"Error during record scanning: {e}") @@ -381,14 +380,14 @@ async def main(): # Demo: unsubscribe — unsubscribe from a bucket (non-partitioned tables) print("\n--- Testing unsubscribe ---") try: - unsub_scanner = await table.new_scan().create_record_batch_log_scanner() - unsub_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - print(f"Subscribed to {num_buckets} buckets") - # Unsubscribe from bucket 0 — future polls will skip this bucket - unsub_scanner.unsubscribe(bucket_id=0) - print("Unsubscribed from bucket 0") - remaining = unsub_scanner.poll_arrow(5000) - print(f"After unsubscribe, got {remaining.num_rows} records (from remaining buckets)") + async with await table.new_scan().create_record_batch_log_scanner() as unsub_scanner: + unsub_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(f"Subscribed to {num_buckets} buckets") + # Unsubscribe from bucket 0 — future polls will skip this bucket + unsub_scanner.unsubscribe(bucket_id=0) + print("Unsubscribed from bucket 0") + remaining = unsub_scanner.poll_arrow(5000) + print(f"After unsubscribe, got {remaining.num_rows} records (from remaining buckets)") except Exception as e: print(f"Error during unsubscribe test: {e}") @@ -437,8 +436,8 @@ async def main(): # --- Test Upsert --- print("\n--- Testing Upsert (fire-and-forget) ---") try: - upsert_writer = pk_table.new_upsert().create_writer() - print(f"Created upsert writer: {upsert_writer}") + async with pk_table.new_upsert().create_writer() as upsert_writer: + print(f"Created upsert writer: {upsert_writer}") # Fire-and-forget: queue writes synchronously, flush at end. # Records are batched internally for efficiency. @@ -489,9 +488,10 @@ async def main(): ) print("Queued user_id=3 (Charlie)") - # flush() waits for all queued writes to be acknowledged by the server - await upsert_writer.flush() - print("Flushed — all 3 rows acknowledged by server") + # flush() and close() are automatically called by the 'async with' block on successful exit. + # Bypass manual flush: + # await upsert_writer.flush() + print("Ending upsert writer context (auto-flushing)") # Per-record acknowledgment: await the returned handle to block until # the server confirms this specific write, useful when you need to diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 02edcdb3..0ad12c69 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -253,6 +253,13 @@ class FlussConnection: exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> bool: ... + async def __aenter__(self) -> FlussConnection: ... + async def __aexit__( + self, + exc_type: Optional[type], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> bool: ... def __repr__(self) -> str: ... class ServerNode: @@ -611,6 +618,37 @@ class AppendWriter: def write_arrow_batch(self, batch: pa.RecordBatch) -> WriteResultHandle: ... def write_pandas(self, df: pd.DataFrame) -> None: ... async def flush(self) -> None: ... + async def __aenter__(self) -> AppendWriter: + """ + Enter the async context manager. + + Returns: + The AppendWriter instance. + """ + ... 
+ async def __aexit__( + self, + exc_type: Optional[type], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> bool: + """ + Exit the async context manager. + + On successful exit, the writer is automatically flushed to ensure + all pending records are sent and acknowledged. + + Note on Exceptions: + If an exception occurs inside the `async with` block, `flush()` is + bypassed to return control to the event loop immediately. However, + any records already passed to `append()` prior to the exception + reside in a shared background buffer and will still be transmitted + to the server. + + To achieve true atomicity, buffer your records in a Python list and + write them in a single batch at the end of your logic. + """ + ... def __repr__(self) -> str: ... class UpsertWriter: @@ -644,6 +682,37 @@ class UpsertWriter: async def flush(self) -> None: """Flush all pending upsert/delete operations to the server.""" ... + async def __aenter__(self) -> UpsertWriter: + """ + Enter the async context manager. + + Returns: + The UpsertWriter instance. + """ + ... + async def __aexit__( + self, + exc_type: Optional[type], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> bool: + """ + Exit the async context manager. + + On successful exit, the writer is automatically flushed to ensure + all pending records are sent and acknowledged. + + Note on Exceptions: + If an exception occurs inside the `async with` block, `flush()` is + bypassed to return control to the event loop immediately. However, + any records already passed to `upsert()` or `delete()` prior to the + exception reside in a shared background buffer and will still be + transmitted to the server. + + To achieve true atomicity, buffer your records in a Python list and + write them in a single batch at the end of your logic. + """ + ... def __repr__(self) -> str: ... @@ -807,6 +876,15 @@ class LogScanner: You must call subscribe(), subscribe_buckets(), or subscribe_partition() first. """ + ... + def close(self) -> None: ... + async def __aenter__(self) -> LogScanner: ... + async def __aexit__( + self, + exc_type: Optional[type], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> bool: ... def __repr__(self) -> str: ... def __aiter__(self) -> AsyncIterator[Union[ScanRecord, RecordBatch]]: ... 
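The stubs above compose into a single usage pattern; the following is a minimal sketch, not part of the patch itself. The bootstrap address, table name, and row fields are placeholders, and it assumes the target log table already exists with a matching schema.

import asyncio
import fluss

async def demo():
    # Placeholder address; point this at a real Fluss cluster.
    config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})

    # Connection, writers, and scanners are now all async context managers.
    async with await fluss.FlussConnection.create(config) as conn:
        table = await conn.get_table(fluss.TablePath("fluss", "my_table"))

        # Writer: flushed automatically when the block exits without an exception.
        async with table.new_append().create_writer() as writer:
            writer.append({"id": 1, "name": "Alice"})  # columns must match the table schema

        # Scanner: closed automatically when the block exits.
        async with await table.new_scan().create_record_batch_log_scanner() as scanner:
            scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET)
            print(scanner.poll_arrow(5000).num_rows)

asyncio.run(demo())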
diff --git a/bindings/python/src/connection.rs b/bindings/python/src/connection.rs index a8d2d9e3..8112e2d5 100644 --- a/bindings/python/src/connection.rs +++ b/bindings/python/src/connection.rs @@ -104,6 +104,28 @@ impl FlussConnection { Ok(false) } + // Enter the async runtime context (for 'async with' statement) + fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult> { + let py_slf = slf.into_pyobject(py)?.unbind(); + future_into_py(py, async move { Ok(py_slf) }) + } + + // Exit the async runtime context (for 'async with' statement) + #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))] + fn __aexit__<'py>( + &mut self, + py: Python<'py>, + _exc_type: Option>, + _exc_value: Option>, + _traceback: Option>, + ) -> PyResult> { + future_into_py(py, async move { + // In the future, we could call an async close on the core connection here + // e.g., client.close().await; + Ok(false) + }) + } + fn __repr__(&self) -> String { "FlussConnection()".to_string() } diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 8d92aba1..338a9339 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -989,6 +989,37 @@ impl AppendWriter { }) } + // Enter the async runtime context (for 'async with' statement) + fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult> { + let py_slf = slf.into_pyobject(py)?.unbind(); + future_into_py(py, async move { Ok(py_slf) }) + } + + // Exit the async runtime context (for 'async with' statement) + /// On successful exit, the writer is automatically flushed. + /// If an exception occurs, the flush is skipped to allow immediate error + /// propagation, though pending records may still be sent in the background. + #[pyo3(signature = (exc_type=None, _exc_value=None, _traceback=None))] + fn __aexit__<'py>( + &self, + py: Python<'py>, + exc_type: Option>, + _exc_value: Option>, + _traceback: Option>, + ) -> PyResult> { + let has_error = exc_type.is_some(); + let inner = self.inner.clone(); + future_into_py(py, async move { + if !has_error { + inner + .flush() + .await + .map_err(|e| FlussError::from_core_error(&e))?; + } + Ok(false) + }) + } + fn __repr__(&self) -> String { "AppendWriter()".to_string() } @@ -2331,6 +2362,32 @@ async def _async_scan_generic(scanner, method_name): fn __repr__(&self) -> String { format!("LogScanner(table={})", self.table_info.table_path) } + + /// Close the scanner + pub fn close(&self) -> PyResult<()> { + Ok(()) + } + + // Enter the async runtime context (for 'async with' statement) + fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult> { + let py_slf = slf.into_pyobject(py)?.unbind(); + future_into_py(py, async move { Ok(py_slf) }) + } + + // Exit the async runtime context (for 'async with' statement) + #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))] + fn __aexit__<'py>( + &self, + py: Python<'py>, + _exc_type: Option>, + _exc_value: Option>, + _traceback: Option>, + ) -> PyResult> { + future_into_py(py, async move { + // In the future, we can call an async close on the core scanner here + Ok(false) + }) + } } impl LogScanner { diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs index 02ad7fa4..f597737d 100644 --- a/bindings/python/src/upsert.rs +++ b/bindings/python/src/upsert.rs @@ -97,6 +97,7 @@ impl UpsertWriter { /// /// Returns: /// None on success + /// Flush any pending data pub fn flush<'py>(&self, py: Python<'py>) -> PyResult> { let writer = self.writer.clone(); @@ -108,6 
+109,37 @@ impl UpsertWriter { }) } + // Enter the async runtime context (for 'async with' statement) + fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult> { + let py_slf = slf.into_pyobject(py)?.unbind(); + future_into_py(py, async move { Ok(py_slf) }) + } + + // Exit the async runtime context (for 'async with' statement) + /// On successful exit, the writer is automatically flushed. + /// If an exception occurs, the flush is skipped to allow immediate error + /// propagation, though pending records may still be sent in the background. + #[pyo3(signature = (exc_type=None, _exc_value=None, _traceback=None))] + fn __aexit__<'py>( + &self, + py: Python<'py>, + exc_type: Option>, + _exc_value: Option>, + _traceback: Option>, + ) -> PyResult> { + let has_error = exc_type.is_some(); + let writer = self.writer.clone(); + future_into_py(py, async move { + if !has_error { + writer + .flush() + .await + .map_err(|e| FlussError::from_core_error(&e))?; + } + Ok(false) + }) + } + fn __repr__(&self) -> String { "UpsertWriter()".to_string() } diff --git a/bindings/python/test/test_context_manager.py b/bindings/python/test/test_context_manager.py new file mode 100644 index 00000000..f94daab1 --- /dev/null +++ b/bindings/python/test/test_context_manager.py @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import asyncio +import pytest +import pyarrow as pa +import time +import fluss + +def _poll_records(scanner, expected_count, timeout_s=10): + """Poll a record-based scanner until expected_count records are collected.""" + collected = [] + deadline = time.monotonic() + timeout_s + while len(collected) < expected_count and time.monotonic() < deadline: + records = scanner.poll(5000) + collected.extend(records) + return collected + +@pytest.mark.asyncio +async def test_connection_context_manager(plaintext_bootstrap_servers): + config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers}) + async with await fluss.FlussConnection.create(config) as conn: + admin = conn.get_admin() + nodes = await admin.get_server_nodes() + assert len(nodes) > 0 + # conn should be closed (though currently close is a no-op in python side, but verifies syntax) + +@pytest.mark.asyncio +async def test_append_writer_success_flush(connection, admin): + table_path = fluss.TablePath("fluss", "test_append_ctx_success") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())])) + await admin.create_table(table_path, fluss.TableDescriptor(schema)) + + table = await connection.get_table(table_path) + + async with table.new_append().create_writer() as writer: + writer.append({"a": 1}) + writer.append({"a": 2}) + # No explicit flush here + + # After context exit, data should be flushed + scanner = await table.new_scan().create_log_scanner() + scanner.subscribe(0, fluss.EARLIEST_OFFSET) + records = _poll_records(scanner, expected_count=2) + assert len(records) == 2 + assert sorted([r.row["a"] for r in records]) == [1, 2] + +@pytest.mark.asyncio +async def test_append_writer_exception_no_flush(connection, admin): + table_path = fluss.TablePath("fluss", "test_append_ctx_fail") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())])) + await admin.create_table(table_path, fluss.TableDescriptor(schema)) + table = await connection.get_table(table_path) + + class TestException(Exception): pass + + start_time = time.perf_counter() + try: + async with table.new_append().create_writer() as writer: + writer.append({"a": 100}) + raise TestException("abort") + except TestException: + pass + duration = time.perf_counter() - start_time + + # Verification: + # 1. The exception was propagated immediately. + # 2. The block exited nearly instantly because it bypassed the network flush. + assert duration < 0.1, f"Context exit took too long ({duration:.3f}s), likely performed a flush" + + # NOTE: Records may still eventually arrive because of the background sender threads. + # We don't assert 0 records here because Fluss does not support true transactional rollback. 
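# --- Illustrative sketch (not a test in this patch) ---------------------------
# The writer docstrings recommend buffering rows in a plain Python list and only
# appending them once every row has been built, so an exception raised while
# preparing data never reaches the writer's shared background buffer.
# `build_row` is a hypothetical, possibly-failing callable; `table` is an open log table.
async def _write_all_or_nothing(table, raw_items, build_row):
    pending = [build_row(item) for item in raw_items]  # may raise; nothing queued yet
    async with table.new_append().create_writer() as writer:
        for row in pending:
            writer.append(row)  # append() only queues records
    # __aexit__ flushes here; if building a row failed above, nothing was ever queued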
+ +@pytest.mark.asyncio +async def test_upsert_writer_context_manager(connection, admin): + table_path = fluss.TablePath("fluss", "test_upsert_ctx") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema(pa.schema([pa.field("id", pa.int32()), pa.field("v", pa.string())]), primary_keys=["id"]) + await admin.create_table(table_path, fluss.TableDescriptor(schema)) + + table = await connection.get_table(table_path) + + # Success path: verify it flushes + async with table.new_upsert().create_writer() as writer: + writer.upsert({"id": 1, "v": "a"}) + + lookuper = table.new_lookup().create_lookuper() + res = await lookuper.lookup({"id": 1}) + assert res is not None + assert res["v"] == "a" + + # Failure path: verify it bypasses flush + class TestException(Exception): pass + start_time = time.perf_counter() + try: + async with table.new_upsert().create_writer() as writer: + writer.upsert({"id": 2, "v": "b"}) + raise TestException("abort") + except TestException: + pass + duration = time.perf_counter() - start_time + assert duration < 0.1, f"Context exit took too long ({duration:.3f}s), likely performed a flush" + +@pytest.mark.asyncio +async def test_log_scanner_context_manager(connection, admin): + table_path = fluss.TablePath("fluss", "test_scanner_ctx") + await admin.drop_table(table_path, ignore_if_not_exists=True) + schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())])) + await admin.create_table(table_path, fluss.TableDescriptor(schema)) + table = await connection.get_table(table_path) + + async with await table.new_scan().create_log_scanner() as scanner: + scanner.subscribe(0, fluss.EARLIEST_OFFSET) + # Verifies it works and closes properly + res = scanner.poll(100) + assert len(res) == 0 From dec09214ba4e505410987a8e0203f2f8525d56d4 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Thu, 9 Apr 2026 21:07:58 -0700 Subject: [PATCH 2/5] feat: expand python context manager test coverage for scanners and connections --- bindings/python/test/test_context_manager.py | 51 ++++++++++++++++++++ bindings/python/test/test_log_table.py | 1 + 2 files changed, 52 insertions(+) diff --git a/bindings/python/test/test_context_manager.py b/bindings/python/test/test_context_manager.py index f94daab1..0e4f6cc1 100644 --- a/bindings/python/test/test_context_manager.py +++ b/bindings/python/test/test_context_manager.py @@ -133,3 +133,54 @@ async def test_log_scanner_context_manager(connection, admin): # Verifies it works and closes properly res = scanner.poll(100) assert len(res) == 0 + +@pytest.mark.asyncio +async def test_connection_context_manager_exception(plaintext_bootstrap_servers): + config = fluss.Config({"bootstrap.servers": plaintext_bootstrap_servers}) + class TestException(Exception): pass + + try: + async with await fluss.FlussConnection.create(config) as conn: + raise TestException("connection error") + except TestException: + pass + # If we reach here without hanging, the connection __aexit__ gracefully handled the error + +@pytest.mark.asyncio +async def test_record_batch_scanner_context_manager(connection, admin): + table_path = fluss.TablePath("fluss", "test_batch_scanner_ctx") + await admin.drop_table(table_path, ignore_if_not_exists=True) + schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())])) + await admin.create_table(table_path, fluss.TableDescriptor(schema)) + table = await connection.get_table(table_path) + + async with await table.new_scan().create_record_batch_log_scanner() as scanner: + scanner.subscribe(0, fluss.EARLIEST_OFFSET) + res = 
scanner.poll_arrow(100) + assert res.num_rows == 0 + +@pytest.mark.asyncio +async def test_scanner_exception_propagation(connection, admin): + table_path = fluss.TablePath("fluss", "test_scanner_ctx_fail") + await admin.drop_table(table_path, ignore_if_not_exists=True) + schema = fluss.Schema(pa.schema([pa.field("a", pa.int32())])) + await admin.create_table(table_path, fluss.TableDescriptor(schema)) + table = await connection.get_table(table_path) + + class TestException(Exception): pass + + # Test record scanner exception + try: + async with await table.new_scan().create_log_scanner() as scanner: + scanner.subscribe(0, fluss.EARLIEST_OFFSET) + raise TestException("scanner error") + except TestException: + pass + + # Test batch scanner exception + try: + async with await table.new_scan().create_record_batch_log_scanner() as scanner: + scanner.subscribe(0, fluss.EARLIEST_OFFSET) + raise TestException("batch scanner error") + except TestException: + pass \ No newline at end of file diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py index eb118748..48567d41 100644 --- a/bindings/python/test/test_log_table.py +++ b/bindings/python/test/test_log_table.py @@ -143,6 +143,7 @@ async def test_list_offsets(connection, admin): assert latest[0] == 0 before_append_ms = int(time.time() * 1000) + await asyncio.sleep(0.1) # Append some records table = await connection.get_table(table_path) From 180ad11b975ab7adf75aa74128f06cd8ebe055e8 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Sat, 11 Apr 2026 09:10:29 -0700 Subject: [PATCH 3/5] refactor: adjust indentation and clean up Python example code structure --- bindings/python/example/example.py | 1692 ++++++++++++++-------------- 1 file changed, 846 insertions(+), 846 deletions(-) diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 57a3313d..f438a6c1 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -42,900 +42,900 @@ async def main(): # Create connection using the static create method async with await fluss.FlussConnection.create(config) as conn: # Define fields for PyArrow - fields = [ - pa.field("id", pa.int32()), - pa.field("name", pa.string()), - pa.field("score", pa.float32()), - pa.field("age", pa.int32()), - pa.field("birth_date", pa.date32()), - pa.field("check_in_time", pa.time32("ms")), - pa.field("created_at", pa.timestamp("us")), # TIMESTAMP (NTZ) - pa.field("updated_at", pa.timestamp("us", tz="UTC")), # TIMESTAMP_LTZ - pa.field("salary", pa.decimal128(10, 2)), - ] - - # Create a PyArrow schema - schema = pa.schema(fields) - - # Create a Fluss Schema first (this is what TableDescriptor expects) - fluss_schema = fluss.Schema(schema) - - # Create a Fluss TableDescriptor - table_descriptor = fluss.TableDescriptor(fluss_schema) - - # Get the admin for Fluss - admin = conn.get_admin() - - # Create a Fluss table - table_path = fluss.TablePath("fluss", "sample_table_types") - - try: - await admin.create_table(table_path, table_descriptor, True) - print(f"Created table: {table_path}") - except Exception as e: - print(f"Table creation failed: {e}") - - # Get table information via admin - try: - table_info = await admin.get_table_info(table_path) - print(f"Table info: {table_info}") - print(f"Table ID: {table_info.table_id}") - print(f"Schema ID: {table_info.schema_id}") - print(f"Created time: {table_info.created_time}") - print(f"Primary keys: {table_info.get_primary_keys()}") - except Exception as e: - print(f"Failed to get table 
info: {e}") - - # Demo: List offsets - print("\n--- Testing list_offsets() ---") - try: - # Query latest offsets using OffsetSpec factory method - offsets = await admin.list_offsets( - table_path, - bucket_ids=[0], - offset_spec=fluss.OffsetSpec.latest() - ) - print(f"Latest offsets for table (before writes): {offsets}") - except Exception as e: - print(f"Failed to list offsets: {e}") - - # Get the table instance - table = await conn.get_table(table_path) - print(f"Got table: {table}") - - # Create a writer for the table - async with table.new_append().create_writer() as append_writer: - print(f"Created append writer: {append_writer}") - - try: - # Demo: Write PyArrow Table - print("\n--- Testing PyArrow Table write ---") - pa_table = pa.Table.from_arrays( - [ - pa.array([1, 2, 3], type=pa.int32()), - pa.array(["Alice", "Bob", "Charlie"], type=pa.string()), - pa.array([95.2, 87.2, 92.1], type=pa.float32()), - pa.array([25, 30, 35], type=pa.int32()), - pa.array( - [date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 8)], - type=pa.date32(), - ), - pa.array( - [dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 0)], - type=pa.time32("ms"), - ), - pa.array( - [ - datetime(2024, 1, 15, 10, 30), - datetime(2024, 1, 15, 11, 0), - datetime(2024, 1, 15, 11, 30), - ], - type=pa.timestamp("us"), - ), - pa.array( - [ - datetime(2024, 1, 15, 10, 30), - datetime(2024, 1, 15, 11, 0), - datetime(2024, 1, 15, 11, 30), - ], - type=pa.timestamp("us", tz="UTC"), - ), - pa.array( - [Decimal("75000.00"), Decimal("82000.50"), Decimal("95000.75")], - type=pa.decimal128(10, 2), - ), - ], - schema=schema, - ) + fields = [ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("score", pa.float32()), + pa.field("age", pa.int32()), + pa.field("birth_date", pa.date32()), + pa.field("check_in_time", pa.time32("ms")), + pa.field("created_at", pa.timestamp("us")), # TIMESTAMP (NTZ) + pa.field("updated_at", pa.timestamp("us", tz="UTC")), # TIMESTAMP_LTZ + pa.field("salary", pa.decimal128(10, 2)), + ] - append_writer.write_arrow(pa_table) - print("Successfully wrote PyArrow Table") - - # Demo: Write PyArrow RecordBatch - print("\n--- Testing PyArrow RecordBatch write ---") - pa_record_batch = pa.RecordBatch.from_arrays( - [ - pa.array([4, 5], type=pa.int32()), - pa.array(["David", "Eve"], type=pa.string()), - pa.array([88.5, 91.0], type=pa.float32()), - pa.array([28, 32], type=pa.int32()), - pa.array([date(1996, 7, 22), date(1992, 12, 1)], type=pa.date32()), - pa.array([dt_time(14, 15, 0), dt_time(8, 45, 0)], type=pa.time32("ms")), - pa.array( - [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)], - type=pa.timestamp("us"), - ), - pa.array( - [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)], - type=pa.timestamp("us", tz="UTC"), - ), - pa.array( - [Decimal("68000.00"), Decimal("72500.25")], - type=pa.decimal128(10, 2), - ), - ], - schema=schema, - ) + # Create a PyArrow schema + schema = pa.schema(fields) - append_writer.write_arrow_batch(pa_record_batch) - print("Successfully wrote PyArrow RecordBatch") - - # Test 3: Append single rows with Date, Time, Timestamp, Decimal - print("\n--- Testing single row append with temporal/decimal types ---") - # Dict input with all types including Date, Time, Timestamp, Decimal - append_writer.append( - { - "id": 8, - "name": "Helen", - "score": 93.5, - "age": 26, - "birth_date": date(1998, 4, 10), - "check_in_time": dt_time(11, 30, 45), - "created_at": datetime(2024, 1, 17, 14, 0, 0), - "updated_at": datetime(2024, 1, 17, 14, 0, 0), - "salary": 
Decimal("88000.00"), - } - ) - print("Successfully appended row (dict with Date, Time, Timestamp, Decimal)") - - # List input with all types - append_writer.append( - [ - 9, - "Ivan", - 90.0, - 31, - date(1993, 8, 25), - dt_time(16, 45, 0), - datetime(2024, 1, 17, 15, 30, 0), - datetime(2024, 1, 17, 15, 30, 0), - Decimal("91500.50"), - ] - ) - print("Successfully appended row (list with Date, Time, Timestamp, Decimal)") - - # Demo: Write Pandas DataFrame - print("\n--- Testing Pandas DataFrame write ---") - df = pd.DataFrame( - { - "id": [10, 11], - "name": ["Frank", "Grace"], - "score": [89.3, 94.7], - "age": [29, 27], - "birth_date": [date(1995, 2, 14), date(1997, 9, 30)], - "check_in_time": [dt_time(10, 0, 0), dt_time(10, 30, 0)], - "created_at": [ - datetime(2024, 1, 18, 8, 0), - datetime(2024, 1, 18, 8, 30), - ], - "updated_at": [ - datetime(2024, 1, 18, 8, 0), - datetime(2024, 1, 18, 8, 30), - ], - "salary": [Decimal("79000.00"), Decimal("85500.75")], - } - ) + # Create a Fluss Schema first (this is what TableDescriptor expects) + fluss_schema = fluss.Schema(schema) - append_writer.write_pandas(df) - print("Successfully wrote Pandas DataFrame") + # Create a Fluss TableDescriptor + table_descriptor = fluss.TableDescriptor(fluss_schema) - # Note: flush() and close() are automatically called by the 'async with' block on successful exit. - # To manually flush before the context ends, you can still call: - # await append_writer.flush() - print("\n--- Ending append writer context (auto-flushing) ---") + # Get the admin for Fluss + admin = conn.get_admin() + + # Create a Fluss table + table_path = fluss.TablePath("fluss", "sample_table_types") - # Demo: Check offsets after writes - print("\n--- Checking offsets after writes ---") try: + await admin.create_table(table_path, table_descriptor, True) + print(f"Created table: {table_path}") + except Exception as e: + print(f"Table creation failed: {e}") + + # Get table information via admin + try: + table_info = await admin.get_table_info(table_path) + print(f"Table info: {table_info}") + print(f"Table ID: {table_info.table_id}") + print(f"Schema ID: {table_info.schema_id}") + print(f"Created time: {table_info.created_time}") + print(f"Primary keys: {table_info.get_primary_keys()}") + except Exception as e: + print(f"Failed to get table info: {e}") + + # Demo: List offsets + print("\n--- Testing list_offsets() ---") + try: + # Query latest offsets using OffsetSpec factory method offsets = await admin.list_offsets( table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest() ) - print(f"Latest offsets after writing 7 records: {offsets}") + print(f"Latest offsets for table (before writes): {offsets}") except Exception as e: print(f"Failed to list offsets: {e}") - except Exception as e: - print(f"Error during writing: {e}") + # Get the table instance + table = await conn.get_table(table_path) + print(f"Got table: {table}") - # Now scan the table to verify data was written - print("\n--- Scanning table (batch scanner) ---") - try: - # Use new_scan().create_record_batch_log_scanner() for batch-based operations - async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner: - print(f"Created batch scanner: {batch_scanner}") + # Create a writer for the table + async with table.new_append().create_writer() as append_writer: + print(f"Created append writer: {append_writer}") - # Subscribe to buckets (required before to_arrow/to_pandas) - # Use subscribe_buckets to subscribe all buckets from EARLIEST_OFFSET - num_buckets = 
(await admin.get_table_info(table_path)).num_buckets - batch_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - print(f"Subscribed to {num_buckets} buckets from EARLIEST_OFFSET") + try: + # Demo: Write PyArrow Table + print("\n--- Testing PyArrow Table write ---") + pa_table = pa.Table.from_arrays( + [ + pa.array([1, 2, 3], type=pa.int32()), + pa.array(["Alice", "Bob", "Charlie"], type=pa.string()), + pa.array([95.2, 87.2, 92.1], type=pa.float32()), + pa.array([25, 30, 35], type=pa.int32()), + pa.array( + [date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 8)], + type=pa.date32(), + ), + pa.array( + [dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 0)], + type=pa.time32("ms"), + ), + pa.array( + [ + datetime(2024, 1, 15, 10, 30), + datetime(2024, 1, 15, 11, 0), + datetime(2024, 1, 15, 11, 30), + ], + type=pa.timestamp("us"), + ), + pa.array( + [ + datetime(2024, 1, 15, 10, 30), + datetime(2024, 1, 15, 11, 0), + datetime(2024, 1, 15, 11, 30), + ], + type=pa.timestamp("us", tz="UTC"), + ), + pa.array( + [Decimal("75000.00"), Decimal("82000.50"), Decimal("95000.75")], + type=pa.decimal128(10, 2), + ), + ], + schema=schema, + ) - # Read all data using to_arrow() - print("Scanning results using to_arrow():") + append_writer.write_arrow(pa_table) + print("Successfully wrote PyArrow Table") + + # Demo: Write PyArrow RecordBatch + print("\n--- Testing PyArrow RecordBatch write ---") + pa_record_batch = pa.RecordBatch.from_arrays( + [ + pa.array([4, 5], type=pa.int32()), + pa.array(["David", "Eve"], type=pa.string()), + pa.array([88.5, 91.0], type=pa.float32()), + pa.array([28, 32], type=pa.int32()), + pa.array([date(1996, 7, 22), date(1992, 12, 1)], type=pa.date32()), + pa.array([dt_time(14, 15, 0), dt_time(8, 45, 0)], type=pa.time32("ms")), + pa.array( + [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)], + type=pa.timestamp("us"), + ), + pa.array( + [datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)], + type=pa.timestamp("us", tz="UTC"), + ), + pa.array( + [Decimal("68000.00"), Decimal("72500.25")], + type=pa.decimal128(10, 2), + ), + ], + schema=schema, + ) - # Try to get as PyArrow Table - try: - pa_table_result = batch_scanner.to_arrow() - print(f"\nAs PyArrow Table: {pa_table_result}") - except Exception as e: - print(f"Could not convert to PyArrow: {e}") + append_writer.write_arrow_batch(pa_record_batch) + print("Successfully wrote PyArrow RecordBatch") + + # Test 3: Append single rows with Date, Time, Timestamp, Decimal + print("\n--- Testing single row append with temporal/decimal types ---") + # Dict input with all types including Date, Time, Timestamp, Decimal + append_writer.append( + { + "id": 8, + "name": "Helen", + "score": 93.5, + "age": 26, + "birth_date": date(1998, 4, 10), + "check_in_time": dt_time(11, 30, 45), + "created_at": datetime(2024, 1, 17, 14, 0, 0), + "updated_at": datetime(2024, 1, 17, 14, 0, 0), + "salary": Decimal("88000.00"), + } + ) + print("Successfully appended row (dict with Date, Time, Timestamp, Decimal)") + + # List input with all types + append_writer.append( + [ + 9, + "Ivan", + 90.0, + 31, + date(1993, 8, 25), + dt_time(16, 45, 0), + datetime(2024, 1, 17, 15, 30, 0), + datetime(2024, 1, 17, 15, 30, 0), + Decimal("91500.50"), + ] + ) + print("Successfully appended row (list with Date, Time, Timestamp, Decimal)") + + # Demo: Write Pandas DataFrame + print("\n--- Testing Pandas DataFrame write ---") + df = pd.DataFrame( + { + "id": [10, 11], + "name": ["Frank", "Grace"], + "score": [89.3, 94.7], + 
"age": [29, 27], + "birth_date": [date(1995, 2, 14), date(1997, 9, 30)], + "check_in_time": [dt_time(10, 0, 0), dt_time(10, 30, 0)], + "created_at": [ + datetime(2024, 1, 18, 8, 0), + datetime(2024, 1, 18, 8, 30), + ], + "updated_at": [ + datetime(2024, 1, 18, 8, 0), + datetime(2024, 1, 18, 8, 30), + ], + "salary": [Decimal("79000.00"), Decimal("85500.75")], + } + ) - # Create a new batch scanner for to_pandas() test - async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner2: - batch_scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + append_writer.write_pandas(df) + print("Successfully wrote Pandas DataFrame") - # Try to get as Pandas DataFrame + # Note: flush() and close() are automatically called by the 'async with' block on successful exit. + # To manually flush before the context ends, you can still call: + # await append_writer.flush() + print("\n--- Ending append writer context (auto-flushing) ---") + + # Demo: Check offsets after writes + print("\n--- Checking offsets after writes ---") try: - df_result = batch_scanner2.to_pandas() - print(f"\nAs Pandas DataFrame:\n{df_result}") + offsets = await admin.list_offsets( + table_path, + bucket_ids=[0], + offset_spec=fluss.OffsetSpec.latest() + ) + print(f"Latest offsets after writing 7 records: {offsets}") except Exception as e: - print(f"Could not convert to Pandas: {e}") - - # TODO: support to_arrow_batch_reader() - # which is reserved for streaming use cases + print(f"Failed to list offsets: {e}") - # TODO: support to_duckdb() + except Exception as e: + print(f"Error during writing: {e}") - # Test poll_arrow() method for incremental reading as Arrow Table - print("\n--- Testing poll_arrow() method ---") - async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner3: - batch_scanner3.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) - print(f"Subscribed to bucket 0 at EARLIEST_OFFSET ({fluss.EARLIEST_OFFSET})") + # Now scan the table to verify data was written + print("\n--- Scanning table (batch scanner) ---") + try: + # Use new_scan().create_record_batch_log_scanner() for batch-based operations + async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner: + print(f"Created batch scanner: {batch_scanner}") - # Poll with a timeout of 5000ms (5 seconds) - # Note: poll_arrow() returns an empty table (not an error) on timeout - try: - poll_result = batch_scanner3.poll_arrow(5000) - print(f"Number of rows: {poll_result.num_rows}") + # Subscribe to buckets (required before to_arrow/to_pandas) + # Use subscribe_buckets to subscribe all buckets from EARLIEST_OFFSET + num_buckets = (await admin.get_table_info(table_path)).num_buckets + batch_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(f"Subscribed to {num_buckets} buckets from EARLIEST_OFFSET") - if poll_result.num_rows > 0: - poll_df = poll_result.to_pandas() - print(f"Polled data:\n{poll_df}") - else: - print("Empty result (no records available)") - # Empty table still has schema - this is useful! 
- print(f"Schema: {poll_result.schema}") + # Read all data using to_arrow() + print("Scanning results using to_arrow():") + # Try to get as PyArrow Table + try: + pa_table_result = batch_scanner.to_arrow() + print(f"\nAs PyArrow Table: {pa_table_result}") except Exception as e: - print(f"Error during poll_arrow: {e}") + print(f"Could not convert to PyArrow: {e}") + + # Create a new batch scanner for to_pandas() test + async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner2: + batch_scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + + # Try to get as Pandas DataFrame + try: + df_result = batch_scanner2.to_pandas() + print(f"\nAs Pandas DataFrame:\n{df_result}") + except Exception as e: + print(f"Could not convert to Pandas: {e}") + + # TODO: support to_arrow_batch_reader() + # which is reserved for streaming use cases + + # TODO: support to_duckdb() + + # Test poll_arrow() method for incremental reading as Arrow Table + print("\n--- Testing poll_arrow() method ---") + async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner3: + batch_scanner3.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + print(f"Subscribed to bucket 0 at EARLIEST_OFFSET ({fluss.EARLIEST_OFFSET})") + + # Poll with a timeout of 5000ms (5 seconds) + # Note: poll_arrow() returns an empty table (not an error) on timeout + try: + poll_result = batch_scanner3.poll_arrow(5000) + print(f"Number of rows: {poll_result.num_rows}") + + if poll_result.num_rows > 0: + poll_df = poll_result.to_pandas() + print(f"Polled data:\n{poll_df}") + else: + print("Empty result (no records available)") + # Empty table still has schema - this is useful! + print(f"Schema: {poll_result.schema}") + + except Exception as e: + print(f"Error during poll_arrow: {e}") + + # Test poll_record_batch() method for batches with metadata + print("\n--- Testing poll_record_batch() method ---") + async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner4: + batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + + try: + batches = batch_scanner4.poll_record_batch(5000) + print(f"Number of batches: {len(batches)}") + + for i, batch in enumerate(batches): + print(f" Batch {i}: bucket={batch.bucket}, " + f"offsets={batch.base_offset}-{batch.last_offset}, " + f"rows={batch.batch.num_rows}") + + except Exception as e: + print(f"Error during poll_record_batch: {e}") - # Test poll_record_batch() method for batches with metadata - print("\n--- Testing poll_record_batch() method ---") - async with await table.new_scan().create_record_batch_log_scanner() as batch_scanner4: - batch_scanner4.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + except Exception as e: + print(f"Error during batch scanning: {e}") - try: - batches = batch_scanner4.poll_record_batch(5000) - print(f"Number of batches: {len(batches)}") + # Test record-based scanning with poll() + print("\n--- Scanning table (record scanner) ---") + try: + # Use new_scan().create_log_scanner() for record-based operations + async with await table.new_scan().create_log_scanner() as record_scanner: + print(f"Created record scanner: {record_scanner}") + + record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + + # Poll returns ScanRecords — records grouped by bucket + print("\n--- Testing poll() method (record-by-record) ---") + try: + scan_records = record_scanner.poll(5000) + print(f"Total records: 
{scan_records.count()}, buckets: {len(scan_records.buckets())}") + + # Flat iteration over all records (regardless of bucket) + print(f" Flat iteration: {scan_records.count()} records") + for record in scan_records: + print(f" offset={record.offset}, timestamp={record.timestamp}") + + # Per-bucket access + for bucket in scan_records.buckets(): + bucket_recs = scan_records.records(bucket) + print(f" Bucket {bucket}: {len(bucket_recs)} records") + for record in bucket_recs[:3]: + print(f" offset={record.offset}, " + f"timestamp={record.timestamp}, " + f"change_type={record.change_type}, " + f"row={record.row}") + + except Exception as e: + print(f"Error during poll: {e}") - for i, batch in enumerate(batches): - print(f" Batch {i}: bucket={batch.bucket}, " - f"offsets={batch.base_offset}-{batch.last_offset}, " - f"rows={batch.batch.num_rows}") + except Exception as e: + print(f"Error during record scanning: {e}") - except Exception as e: - print(f"Error during poll_record_batch: {e}") + # Demo: unsubscribe — unsubscribe from a bucket (non-partitioned tables) + print("\n--- Testing unsubscribe ---") + try: + async with await table.new_scan().create_record_batch_log_scanner() as unsub_scanner: + unsub_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + print(f"Subscribed to {num_buckets} buckets") + # Unsubscribe from bucket 0 — future polls will skip this bucket + unsub_scanner.unsubscribe(bucket_id=0) + print("Unsubscribed from bucket 0") + remaining = unsub_scanner.poll_arrow(5000) + print(f"After unsubscribe, got {remaining.num_rows} records (from remaining buckets)") + except Exception as e: + print(f"Error during unsubscribe test: {e}") + + # ===================================================== + # Demo: Primary Key Table with Lookup and Upsert + # ===================================================== + print("\n" + "=" * 60) + print("--- Testing Primary Key Table (Lookup & Upsert) ---") + print("=" * 60) + + # Create a primary key table for lookup/upsert tests + # Include temporal and decimal types to test full conversion + pk_table_fields = [ + pa.field("user_id", pa.int32()), + pa.field("name", pa.string()), + pa.field("email", pa.string()), + pa.field("age", pa.int32()), + pa.field("birth_date", pa.date32()), + pa.field("login_time", pa.time32("ms")), + pa.field("created_at", pa.timestamp("us")), # TIMESTAMP (NTZ) + pa.field("updated_at", pa.timestamp("us", tz="UTC")), # TIMESTAMP_LTZ + pa.field("balance", pa.decimal128(10, 2)), + ] + pk_schema = pa.schema(pk_table_fields) + fluss_pk_schema = fluss.Schema(pk_schema, primary_keys=["user_id"]) - except Exception as e: - print(f"Error during batch scanning: {e}") + # Create table descriptor + pk_table_descriptor = fluss.TableDescriptor( + fluss_pk_schema, + bucket_count=3, + ) - # Test record-based scanning with poll() - print("\n--- Scanning table (record scanner) ---") - try: - # Use new_scan().create_log_scanner() for record-based operations - async with await table.new_scan().create_log_scanner() as record_scanner: - print(f"Created record scanner: {record_scanner}") + pk_table_path = fluss.TablePath("fluss", "users_pk_table_v3") - record_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + try: + await admin.create_table(pk_table_path, pk_table_descriptor, True) + print(f"Created PK table: {pk_table_path}") + except Exception as e: + print(f"PK Table creation failed (may already exist): {e}") - # Poll returns ScanRecords — records grouped by bucket - print("\n--- 
Testing poll() method (record-by-record) ---") - try: - scan_records = record_scanner.poll(5000) - print(f"Total records: {scan_records.count()}, buckets: {len(scan_records.buckets())}") - - # Flat iteration over all records (regardless of bucket) - print(f" Flat iteration: {scan_records.count()} records") - for record in scan_records: - print(f" offset={record.offset}, timestamp={record.timestamp}") - - # Per-bucket access - for bucket in scan_records.buckets(): - bucket_recs = scan_records.records(bucket) - print(f" Bucket {bucket}: {len(bucket_recs)} records") - for record in bucket_recs[:3]: - print(f" offset={record.offset}, " - f"timestamp={record.timestamp}, " - f"change_type={record.change_type}, " - f"row={record.row}") + # Get the PK table + pk_table = await conn.get_table(pk_table_path) + print(f"Got PK table: {pk_table}") + print(f"Has primary key: {pk_table.has_primary_key()}") - except Exception as e: - print(f"Error during poll: {e}") - - except Exception as e: - print(f"Error during record scanning: {e}") - - # Demo: unsubscribe — unsubscribe from a bucket (non-partitioned tables) - print("\n--- Testing unsubscribe ---") - try: - async with await table.new_scan().create_record_batch_log_scanner() as unsub_scanner: - unsub_scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - print(f"Subscribed to {num_buckets} buckets") - # Unsubscribe from bucket 0 — future polls will skip this bucket - unsub_scanner.unsubscribe(bucket_id=0) - print("Unsubscribed from bucket 0") - remaining = unsub_scanner.poll_arrow(5000) - print(f"After unsubscribe, got {remaining.num_rows} records (from remaining buckets)") - except Exception as e: - print(f"Error during unsubscribe test: {e}") - - # ===================================================== - # Demo: Primary Key Table with Lookup and Upsert - # ===================================================== - print("\n" + "=" * 60) - print("--- Testing Primary Key Table (Lookup & Upsert) ---") - print("=" * 60) - - # Create a primary key table for lookup/upsert tests - # Include temporal and decimal types to test full conversion - pk_table_fields = [ - pa.field("user_id", pa.int32()), - pa.field("name", pa.string()), - pa.field("email", pa.string()), - pa.field("age", pa.int32()), - pa.field("birth_date", pa.date32()), - pa.field("login_time", pa.time32("ms")), - pa.field("created_at", pa.timestamp("us")), # TIMESTAMP (NTZ) - pa.field("updated_at", pa.timestamp("us", tz="UTC")), # TIMESTAMP_LTZ - pa.field("balance", pa.decimal128(10, 2)), - ] - pk_schema = pa.schema(pk_table_fields) - fluss_pk_schema = fluss.Schema(pk_schema, primary_keys=["user_id"]) - - # Create table descriptor - pk_table_descriptor = fluss.TableDescriptor( - fluss_pk_schema, - bucket_count=3, - ) - - pk_table_path = fluss.TablePath("fluss", "users_pk_table_v3") - - try: - await admin.create_table(pk_table_path, pk_table_descriptor, True) - print(f"Created PK table: {pk_table_path}") - except Exception as e: - print(f"PK Table creation failed (may already exist): {e}") - - # Get the PK table - pk_table = await conn.get_table(pk_table_path) - print(f"Got PK table: {pk_table}") - print(f"Has primary key: {pk_table.has_primary_key()}") - - # --- Test Upsert --- - print("\n--- Testing Upsert (fire-and-forget) ---") - try: - async with pk_table.new_upsert().create_writer() as upsert_writer: - print(f"Created upsert writer: {upsert_writer}") - - # Fire-and-forget: queue writes synchronously, flush at end. - # Records are batched internally for efficiency. 
- upsert_writer.upsert( - { - "user_id": 1, - "name": "Alice", - "email": "alice@example.com", - "age": 25, - "birth_date": date(1999, 5, 15), - "login_time": dt_time(9, 30, 45, 123000), # 09:30:45.123 - "created_at": datetime( - 2024, 1, 15, 10, 30, 45, 123456 - ), # with microseconds - "updated_at": datetime(2024, 1, 15, 10, 30, 45, 123456), - "balance": Decimal("1234.56"), - } - ) - print("Queued user_id=1 (Alice)") - - upsert_writer.upsert( - { - "user_id": 2, - "name": "Bob", - "email": "bob@example.com", - "age": 30, - "birth_date": date(1994, 3, 20), - "login_time": dt_time(14, 15, 30, 500000), # 14:15:30.500 - "created_at": datetime(2024, 1, 16, 11, 22, 33, 444555), - "updated_at": datetime(2024, 1, 16, 11, 22, 33, 444555), - "balance": Decimal("5678.91"), - } - ) - print("Queued user_id=2 (Bob)") - - upsert_writer.upsert( - { - "user_id": 3, - "name": "Charlie", - "email": "charlie@example.com", - "age": 35, - "birth_date": date(1989, 11, 8), - "login_time": dt_time(16, 45, 59, 999000), # 16:45:59.999 - "created_at": datetime(2024, 1, 17, 23, 59, 59, 999999), - "updated_at": datetime(2024, 1, 17, 23, 59, 59, 999999), - "balance": Decimal("9876.54"), - } - ) - print("Queued user_id=3 (Charlie)") - - # flush() and close() are automatically called by the 'async with' block on successful exit. - # Bypass manual flush: - # await upsert_writer.flush() - print("Ending upsert writer context (auto-flushing)") - - # Per-record acknowledgment: await the returned handle to block until - # the server confirms this specific write, useful when you need to - # read-after-write or verify critical updates. - print("\n--- Testing Upsert (per-record acknowledgment) ---") - handle = upsert_writer.upsert( - { - "user_id": 1, - "name": "Alice Updated", - "email": "alice.new@example.com", - "age": 26, - "birth_date": date(1999, 5, 15), - "login_time": dt_time(10, 11, 12, 345000), # 10:11:12.345 - "created_at": datetime(2024, 1, 15, 10, 30, 45, 123456), # unchanged - "updated_at": datetime( - 2024, 1, 20, 15, 45, 30, 678901 - ), # new update time - "balance": Decimal("2345.67"), - } - ) - await handle.wait() # wait for server acknowledgment - print("Updated user_id=1 (Alice -> Alice Updated) — server acknowledged") - - except Exception as e: - print(f"Error during upsert: {e}") - traceback.print_exc() - - # --- Test Lookup --- - print("\n--- Testing Lookup ---") - try: - lookuper = pk_table.new_lookup().create_lookuper() - print(f"Created lookuper: {lookuper}") - - result = await lookuper.lookup({"user_id": 1}) - if result: - print("Lookup user_id=1: Found!") - print(f" name: {result['name']}") - print(f" email: {result['email']}") - print(f" age: {result['age']}") - print( - f" birth_date: {result['birth_date']} (type: {type(result['birth_date']).__name__})" + # --- Test Upsert --- + print("\n--- Testing Upsert (fire-and-forget) ---") + try: + async with pk_table.new_upsert().create_writer() as upsert_writer: + print(f"Created upsert writer: {upsert_writer}") + + # Fire-and-forget: queue writes synchronously, flush at end. + # Records are batched internally for efficiency. 
+ upsert_writer.upsert( + { + "user_id": 1, + "name": "Alice", + "email": "alice@example.com", + "age": 25, + "birth_date": date(1999, 5, 15), + "login_time": dt_time(9, 30, 45, 123000), # 09:30:45.123 + "created_at": datetime( + 2024, 1, 15, 10, 30, 45, 123456 + ), # with microseconds + "updated_at": datetime(2024, 1, 15, 10, 30, 45, 123456), + "balance": Decimal("1234.56"), + } ) - print( - f" login_time: {result['login_time']} (type: {type(result['login_time']).__name__})" + print("Queued user_id=1 (Alice)") + + upsert_writer.upsert( + { + "user_id": 2, + "name": "Bob", + "email": "bob@example.com", + "age": 30, + "birth_date": date(1994, 3, 20), + "login_time": dt_time(14, 15, 30, 500000), # 14:15:30.500 + "created_at": datetime(2024, 1, 16, 11, 22, 33, 444555), + "updated_at": datetime(2024, 1, 16, 11, 22, 33, 444555), + "balance": Decimal("5678.91"), + } ) - print( - f" created_at: {result['created_at']} (type: {type(result['created_at']).__name__})" + print("Queued user_id=2 (Bob)") + + upsert_writer.upsert( + { + "user_id": 3, + "name": "Charlie", + "email": "charlie@example.com", + "age": 35, + "birth_date": date(1989, 11, 8), + "login_time": dt_time(16, 45, 59, 999000), # 16:45:59.999 + "created_at": datetime(2024, 1, 17, 23, 59, 59, 999999), + "updated_at": datetime(2024, 1, 17, 23, 59, 59, 999999), + "balance": Decimal("9876.54"), + } ) - print( - f" updated_at: {result['updated_at']} (type: {type(result['updated_at']).__name__})" + print("Queued user_id=3 (Charlie)") + + # flush() and close() are automatically called by the 'async with' block on successful exit. + # Bypass manual flush: + # await upsert_writer.flush() + print("Ending upsert writer context (auto-flushing)") + + # Per-record acknowledgment: await the returned handle to block until + # the server confirms this specific write, useful when you need to + # read-after-write or verify critical updates. 
+ print("\n--- Testing Upsert (per-record acknowledgment) ---") + handle = upsert_writer.upsert( + { + "user_id": 1, + "name": "Alice Updated", + "email": "alice.new@example.com", + "age": 26, + "birth_date": date(1999, 5, 15), + "login_time": dt_time(10, 11, 12, 345000), # 10:11:12.345 + "created_at": datetime(2024, 1, 15, 10, 30, 45, 123456), # unchanged + "updated_at": datetime( + 2024, 1, 20, 15, 45, 30, 678901 + ), # new update time + "balance": Decimal("2345.67"), + } ) + await handle.wait() # wait for server acknowledgment + print("Updated user_id=1 (Alice -> Alice Updated) — server acknowledged") + + except Exception as e: + print(f"Error during upsert: {e}") + traceback.print_exc() + + # --- Test Lookup --- + print("\n--- Testing Lookup ---") + try: + lookuper = pk_table.new_lookup().create_lookuper() + print(f"Created lookuper: {lookuper}") + + result = await lookuper.lookup({"user_id": 1}) + if result: + print("Lookup user_id=1: Found!") + print(f" name: {result['name']}") + print(f" email: {result['email']}") + print(f" age: {result['age']}") + print( + f" birth_date: {result['birth_date']} (type: {type(result['birth_date']).__name__})" + ) + print( + f" login_time: {result['login_time']} (type: {type(result['login_time']).__name__})" + ) + print( + f" created_at: {result['created_at']} (type: {type(result['created_at']).__name__})" + ) + print( + f" updated_at: {result['updated_at']} (type: {type(result['updated_at']).__name__})" + ) + print( + f" balance: {result['balance']} (type: {type(result['balance']).__name__})" + ) + else: + print("Lookup user_id=1: Not found") + + # Lookup another row + result = await lookuper.lookup({"user_id": 2}) + if result: + print(f"Lookup user_id=2: Found! -> {result}") + else: + print("Lookup user_id=2: Not found") + + # Lookup non-existent row + result = await lookuper.lookup({"user_id": 999}) + if result: + print(f"Lookup user_id=999: Found! -> {result}") + else: + print("Lookup user_id=999: Not found (as expected)") + + except Exception as e: + print(f"Error during lookup: {e}") + traceback.print_exc() + + # --- Test Delete --- + print("\n--- Testing Delete ---") + try: + upsert_writer = pk_table.new_upsert().create_writer() + + handle = upsert_writer.delete({"user_id": 3}) + await handle.wait() + print("Deleted user_id=3 — server acknowledged") + + lookuper = pk_table.new_lookup().create_lookuper() + result = await lookuper.lookup({"user_id": 3}) + if result: + print(f"Lookup user_id=3 after delete: Still found! 
-> {result}") + else: + print("Lookup user_id=3 after delete: Not found (deletion confirmed)") + + except Exception as e: + print(f"Error during delete: {e}") + traceback.print_exc() + + # --- Test Partial Update by column names --- + print("\n--- Testing Partial Update (by column names) ---") + try: + partial_writer = pk_table.new_upsert().partial_update_by_name(["user_id", "balance"]).create_writer() + handle = partial_writer.upsert({"user_id": 1, "balance": Decimal("9999.99")}) + await handle.wait() + print("Partial update: set balance=9999.99 for user_id=1") + + lookuper = pk_table.new_lookup().create_lookuper() + result = await lookuper.lookup({"user_id": 1}) + if result: + print(f"Partial update verified:" + f"\n name={result['name']} (unchanged)" + f"\n balance={result['balance']} (updated)") + else: + print("ERROR: Expected to find user_id=1") + + except Exception as e: + print(f"Error during partial update by names: {e}") + traceback.print_exc() + + # --- Test Partial Update by column indices --- + print("\n--- Testing Partial Update (by column indices) ---") + try: + # Columns: 0=user_id (PK), 1=name — update name only + partial_writer_idx = pk_table.new_upsert().partial_update_by_index([0, 1]).create_writer() + handle = partial_writer_idx.upsert([1, "Alice Renamed"]) + await handle.wait() + print("Partial update by indices: set name='Alice Renamed' for user_id=1") + + lookuper = pk_table.new_lookup().create_lookuper() + result = await lookuper.lookup({"user_id": 1}) + if result: + print(f"Partial update by indices verified:" + f"\n name={result['name']} (updated)" + f"\n balance={result['balance']} (unchanged)") + else: + print("ERROR: Expected to find user_id=1") + + except Exception as e: + print(f"Error during partial update by indices: {e}") + traceback.print_exc() + + # Demo: Column projection using builder pattern + print("\n--- Testing Column Projection ---") + try: + # Get bucket count for subscriptions + num_buckets = (await admin.get_table_info(table_path)).num_buckets + + # Project specific columns by index (using batch scanner for to_pandas) + print("\n1. Projection by index [0, 1] (id, name):") + scanner_index = await table.new_scan().project([0, 1]).create_record_batch_log_scanner() + scanner_index.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + df_projected = scanner_index.to_pandas() + print(df_projected.head()) print( - f" balance: {result['balance']} (type: {type(result['balance']).__name__})" + f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}" ) - else: - print("Lookup user_id=1: Not found") - - # Lookup another row - result = await lookuper.lookup({"user_id": 2}) - if result: - print(f"Lookup user_id=2: Found! -> {result}") - else: - print("Lookup user_id=2: Not found") - - # Lookup non-existent row - result = await lookuper.lookup({"user_id": 999}) - if result: - print(f"Lookup user_id=999: Found! -> {result}") - else: - print("Lookup user_id=999: Not found (as expected)") - - except Exception as e: - print(f"Error during lookup: {e}") - traceback.print_exc() - - # --- Test Delete --- - print("\n--- Testing Delete ---") - try: - upsert_writer = pk_table.new_upsert().create_writer() - - handle = upsert_writer.delete({"user_id": 3}) - await handle.wait() - print("Deleted user_id=3 — server acknowledged") - - lookuper = pk_table.new_lookup().create_lookuper() - result = await lookuper.lookup({"user_id": 3}) - if result: - print(f"Lookup user_id=3 after delete: Still found! 
-> {result}") - else: - print("Lookup user_id=3 after delete: Not found (deletion confirmed)") - - except Exception as e: - print(f"Error during delete: {e}") - traceback.print_exc() - - # --- Test Partial Update by column names --- - print("\n--- Testing Partial Update (by column names) ---") - try: - partial_writer = pk_table.new_upsert().partial_update_by_name(["user_id", "balance"]).create_writer() - handle = partial_writer.upsert({"user_id": 1, "balance": Decimal("9999.99")}) - await handle.wait() - print("Partial update: set balance=9999.99 for user_id=1") - - lookuper = pk_table.new_lookup().create_lookuper() - result = await lookuper.lookup({"user_id": 1}) - if result: - print(f"Partial update verified:" - f"\n name={result['name']} (unchanged)" - f"\n balance={result['balance']} (updated)") - else: - print("ERROR: Expected to find user_id=1") - - except Exception as e: - print(f"Error during partial update by names: {e}") - traceback.print_exc() - - # --- Test Partial Update by column indices --- - print("\n--- Testing Partial Update (by column indices) ---") - try: - # Columns: 0=user_id (PK), 1=name — update name only - partial_writer_idx = pk_table.new_upsert().partial_update_by_index([0, 1]).create_writer() - handle = partial_writer_idx.upsert([1, "Alice Renamed"]) - await handle.wait() - print("Partial update by indices: set name='Alice Renamed' for user_id=1") - - lookuper = pk_table.new_lookup().create_lookuper() - result = await lookuper.lookup({"user_id": 1}) - if result: - print(f"Partial update by indices verified:" - f"\n name={result['name']} (updated)" - f"\n balance={result['balance']} (unchanged)") - else: - print("ERROR: Expected to find user_id=1") - - except Exception as e: - print(f"Error during partial update by indices: {e}") - traceback.print_exc() - - # Demo: Column projection using builder pattern - print("\n--- Testing Column Projection ---") - try: - # Get bucket count for subscriptions - num_buckets = (await admin.get_table_info(table_path)).num_buckets - - # Project specific columns by index (using batch scanner for to_pandas) - print("\n1. Projection by index [0, 1] (id, name):") - scanner_index = await table.new_scan().project([0, 1]).create_record_batch_log_scanner() - scanner_index.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - df_projected = scanner_index.to_pandas() - print(df_projected.head()) - print( - f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}" - ) - # Project specific columns by name (Pythonic!) - print("\n2. Projection by name ['name', 'score'] (Pythonic):") - scanner_names = await table.new_scan() \ - .project_by_name(["name", "score"]) \ - .create_record_batch_log_scanner() - scanner_names.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - df_named = scanner_names.to_pandas() - print(df_named.head()) - print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}") - - # Test empty result schema with projection - print("\n3. 
Testing empty result schema with projection:") - scanner_proj = await table.new_scan().project([0, 2]).create_record_batch_log_scanner() - scanner_proj.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) - # Quick poll that may return empty - result = scanner_proj.poll_arrow(100) - print(f" Schema columns: {result.schema.names}") - - except Exception as e: - print(f"Error during projection: {e}") - - # Demo: Drop tables - print("\n--- Testing drop_table() ---") - try: - # Drop the log table - await admin.drop_table(table_path, ignore_if_not_exists=True) - print(f"Successfully dropped table: {table_path}") - # Drop the PK table - await admin.drop_table(pk_table_path, ignore_if_not_exists=True) - print(f"Successfully dropped table: {pk_table_path}") - except Exception as e: - print(f"Failed to drop table: {e}") - - # ===================================================== - # Demo: Partitioned Table with list_partition_offsets - # ===================================================== - print("\n" + "=" * 60) - print("--- Testing Partitioned Table ---") - print("=" * 60) - - # Create a partitioned log table - partitioned_fields = [ - pa.field("id", pa.int32()), - pa.field("region", pa.string()), # partition key - pa.field("value", pa.int64()), - ] - partitioned_schema = pa.schema(partitioned_fields) - fluss_partitioned_schema = fluss.Schema(partitioned_schema) - - partitioned_table_descriptor = fluss.TableDescriptor( - fluss_partitioned_schema, - partition_keys=["region"], # Partition by region - bucket_count=1, - ) - - partitioned_table_path = fluss.TablePath("fluss", "partitioned_log_table_py") - - try: - # Drop if exists first - await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True) - print(f"Dropped existing table: {partitioned_table_path}") - - # Create the partitioned table - await admin.create_table(partitioned_table_path, partitioned_table_descriptor, False) - print(f"Created partitioned table: {partitioned_table_path}") - - # Create partitions for US and EU regions - print("\n--- Creating partitions ---") - await admin.create_partition(partitioned_table_path, {"region": "US"}, ignore_if_exists=True) - print("Created partition: region=US") - await admin.create_partition(partitioned_table_path, {"region": "EU"}, ignore_if_exists=True) - print("Created partition: region=EU") - - # List partitions - print("\n--- Listing partitions ---") - partition_infos = await admin.list_partition_infos(partitioned_table_path) - for p in partition_infos: - print(f" {p}") # PartitionInfo(partition_id=..., partition_name='region=...') - - # Get the table and write some data - partitioned_table = await conn.get_table(partitioned_table_path) - partitioned_writer = partitioned_table.new_append().create_writer() - - # Append data to US partition - partitioned_writer.append({"id": 1, "region": "US", "value": 100}) - partitioned_writer.append({"id": 2, "region": "US", "value": 200}) - # Append data to EU partition - partitioned_writer.append({"id": 3, "region": "EU", "value": 300}) - partitioned_writer.append({"id": 4, "region": "EU", "value": 400}) - await partitioned_writer.flush() - print("\nWrote 4 records (2 to US, 2 to EU)") - - # Demo: list_partition_infos with partial spec filter - print("\n--- Testing list_partition_infos with spec ---") - us_partitions = await admin.list_partition_infos( - partitioned_table_path, partition_spec={"region": "US"} - ) - print(f"Filtered partitions (region=US): {us_partitions}") - - # Demo: list_partition_offsets - print("\n--- 
Testing list_partition_offsets ---") - - # Query offsets for US partition - # Note: partition_name is just the value (e.g., "US"), not "region=US" - us_offsets = await admin.list_partition_offsets( - partitioned_table_path, - partition_name="US", - bucket_ids=[0], - offset_spec=fluss.OffsetSpec.latest() - ) - print(f"US partition latest offsets: {us_offsets}") - - # Query offsets for EU partition - eu_offsets = await admin.list_partition_offsets( - partitioned_table_path, - partition_name="EU", - bucket_ids=[0], - offset_spec=fluss.OffsetSpec.latest() + # Project specific columns by name (Pythonic!) + print("\n2. Projection by name ['name', 'score'] (Pythonic):") + scanner_names = await table.new_scan() \ + .project_by_name(["name", "score"]) \ + .create_record_batch_log_scanner() + scanner_names.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + df_named = scanner_names.to_pandas() + print(df_named.head()) + print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}") + + # Test empty result schema with projection + print("\n3. Testing empty result schema with projection:") + scanner_proj = await table.new_scan().project([0, 2]).create_record_batch_log_scanner() + scanner_proj.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + # Quick poll that may return empty + result = scanner_proj.poll_arrow(100) + print(f" Schema columns: {result.schema.names}") + + except Exception as e: + print(f"Error during projection: {e}") + + # Demo: Drop tables + print("\n--- Testing drop_table() ---") + try: + # Drop the log table + await admin.drop_table(table_path, ignore_if_not_exists=True) + print(f"Successfully dropped table: {table_path}") + # Drop the PK table + await admin.drop_table(pk_table_path, ignore_if_not_exists=True) + print(f"Successfully dropped table: {pk_table_path}") + except Exception as e: + print(f"Failed to drop table: {e}") + + # ===================================================== + # Demo: Partitioned Table with list_partition_offsets + # ===================================================== + print("\n" + "=" * 60) + print("--- Testing Partitioned Table ---") + print("=" * 60) + + # Create a partitioned log table + partitioned_fields = [ + pa.field("id", pa.int32()), + pa.field("region", pa.string()), # partition key + pa.field("value", pa.int64()), + ] + partitioned_schema = pa.schema(partitioned_fields) + fluss_partitioned_schema = fluss.Schema(partitioned_schema) + + partitioned_table_descriptor = fluss.TableDescriptor( + fluss_partitioned_schema, + partition_keys=["region"], # Partition by region + bucket_count=1, ) - print(f"EU partition latest offsets: {eu_offsets}") - - # Demo: subscribe_partition for reading partitioned data - print("\n--- Testing subscribe_partition + to_arrow() ---") - partitioned_scanner = await partitioned_table.new_scan().create_record_batch_log_scanner() - - # Subscribe to each partition using partition_id - for p in partition_infos: - partitioned_scanner.subscribe_partition( - partition_id=p.partition_id, - bucket_id=0, - start_offset=fluss.EARLIEST_OFFSET + + partitioned_table_path = fluss.TablePath("fluss", "partitioned_log_table_py") + + try: + # Drop if exists first + await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True) + print(f"Dropped existing table: {partitioned_table_path}") + + # Create the partitioned table + await admin.create_table(partitioned_table_path, partitioned_table_descriptor, False) + print(f"Created partitioned table: 
{partitioned_table_path}") + + # Create partitions for US and EU regions + print("\n--- Creating partitions ---") + await admin.create_partition(partitioned_table_path, {"region": "US"}, ignore_if_exists=True) + print("Created partition: region=US") + await admin.create_partition(partitioned_table_path, {"region": "EU"}, ignore_if_exists=True) + print("Created partition: region=EU") + + # List partitions + print("\n--- Listing partitions ---") + partition_infos = await admin.list_partition_infos(partitioned_table_path) + for p in partition_infos: + print(f" {p}") # PartitionInfo(partition_id=..., partition_name='region=...') + + # Get the table and write some data + partitioned_table = await conn.get_table(partitioned_table_path) + partitioned_writer = partitioned_table.new_append().create_writer() + + # Append data to US partition + partitioned_writer.append({"id": 1, "region": "US", "value": 100}) + partitioned_writer.append({"id": 2, "region": "US", "value": 200}) + # Append data to EU partition + partitioned_writer.append({"id": 3, "region": "EU", "value": 300}) + partitioned_writer.append({"id": 4, "region": "EU", "value": 400}) + await partitioned_writer.flush() + print("\nWrote 4 records (2 to US, 2 to EU)") + + # Demo: list_partition_infos with partial spec filter + print("\n--- Testing list_partition_infos with spec ---") + us_partitions = await admin.list_partition_infos( + partitioned_table_path, partition_spec={"region": "US"} + ) + print(f"Filtered partitions (region=US): {us_partitions}") + + # Demo: list_partition_offsets + print("\n--- Testing list_partition_offsets ---") + + # Query offsets for US partition + # Note: partition_name is just the value (e.g., "US"), not "region=US" + us_offsets = await admin.list_partition_offsets( + partitioned_table_path, + partition_name="US", + bucket_ids=[0], + offset_spec=fluss.OffsetSpec.latest() + ) + print(f"US partition latest offsets: {us_offsets}") + + # Query offsets for EU partition + eu_offsets = await admin.list_partition_offsets( + partitioned_table_path, + partition_name="EU", + bucket_ids=[0], + offset_spec=fluss.OffsetSpec.latest() ) - print(f"Subscribed to partition {p.partition_name} (id={p.partition_id})") - - # Use to_arrow() - now works for partitioned tables! 
- partitioned_arrow = partitioned_scanner.to_arrow() - print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:") - print(partitioned_arrow.to_pandas()) - - # Demo: subscribe_partition_buckets for batch subscribing to multiple partitions at once - print("\n--- Testing subscribe_partition_buckets + to_arrow() ---") - partitioned_scanner_batch = await partitioned_table.new_scan().create_record_batch_log_scanner() - partition_bucket_offsets = { - (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos - } - partitioned_scanner_batch.subscribe_partition_buckets(partition_bucket_offsets) - print(f"Batch subscribed to {len(partition_bucket_offsets)} partition+bucket combinations") - partitioned_batch_arrow = partitioned_scanner_batch.to_arrow() - print(f"to_arrow() returned {partitioned_batch_arrow.num_rows} records:") - print(partitioned_batch_arrow.to_pandas()) - - # Demo: unsubscribe_partition - unsubscribe from one partition, read remaining - print("\n--- Testing unsubscribe_partition ---") - partitioned_scanner3 = await partitioned_table.new_scan().create_record_batch_log_scanner() - for p in partition_infos: - partitioned_scanner3.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET) - # Unsubscribe from the first partition - first_partition = partition_infos[0] - partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0) - print(f"Unsubscribed from partition {first_partition.partition_name} (id={first_partition.partition_id})") - remaining_arrow = partitioned_scanner3.to_arrow() - print(f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records (from remaining partitions):") - print(remaining_arrow.to_pandas()) - - # Demo: to_pandas() also works for partitioned tables - print("\n--- Testing to_pandas() on partitioned table ---") - partitioned_scanner2 = await partitioned_table.new_scan().create_record_batch_log_scanner() - for p in partition_infos: - partitioned_scanner2.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET) - partitioned_df = partitioned_scanner2.to_pandas() - print(f"to_pandas() returned {len(partitioned_df)} records:") - print(partitioned_df) - - # Cleanup - await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True) - print(f"\nDropped partitioned table: {partitioned_table_path}") - - except Exception as e: - print(f"Error with partitioned table: {e}") - traceback.print_exc() - - # ===================================================== - # Demo: Partitioned KV Table (Upsert, Lookup, Delete) - # ===================================================== - print("\n" + "=" * 60) - print("--- Testing Partitioned KV Table ---") - print("=" * 60) - - partitioned_kv_fields = [ - pa.field("region", pa.string()), # partition key + part of PK - pa.field("user_id", pa.int32()), # part of PK - pa.field("name", pa.string()), - pa.field("score", pa.int64()), - ] - partitioned_kv_schema = pa.schema(partitioned_kv_fields) - fluss_partitioned_kv_schema = fluss.Schema( - partitioned_kv_schema, primary_keys=["region", "user_id"] - ) - - partitioned_kv_descriptor = fluss.TableDescriptor( - fluss_partitioned_kv_schema, - partition_keys=["region"], - ) - - partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py") - - try: - await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True) - await admin.create_table(partitioned_kv_path, partitioned_kv_descriptor, False) - print(f"Created partitioned KV table: {partitioned_kv_path}") - - # Create partitions - await 
admin.create_partition(partitioned_kv_path, {"region": "US"}) - await admin.create_partition(partitioned_kv_path, {"region": "EU"}) - await admin.create_partition(partitioned_kv_path, {"region": "APAC"}) - print("Created partitions: US, EU, APAC") - - partitioned_kv_table = await conn.get_table(partitioned_kv_path) - upsert_writer = partitioned_kv_table.new_upsert().create_writer() - - # Upsert rows across partitions - test_data = [ - ("US", 1, "Gustave", 100), - ("US", 2, "Lune", 200), - ("EU", 1, "Sciel", 150), - ("EU", 2, "Maelle", 250), - ("APAC", 1, "Noco", 300), + print(f"EU partition latest offsets: {eu_offsets}") + + # Demo: subscribe_partition for reading partitioned data + print("\n--- Testing subscribe_partition + to_arrow() ---") + partitioned_scanner = await partitioned_table.new_scan().create_record_batch_log_scanner() + + # Subscribe to each partition using partition_id + for p in partition_infos: + partitioned_scanner.subscribe_partition( + partition_id=p.partition_id, + bucket_id=0, + start_offset=fluss.EARLIEST_OFFSET + ) + print(f"Subscribed to partition {p.partition_name} (id={p.partition_id})") + + # Use to_arrow() - now works for partitioned tables! + partitioned_arrow = partitioned_scanner.to_arrow() + print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:") + print(partitioned_arrow.to_pandas()) + + # Demo: subscribe_partition_buckets for batch subscribing to multiple partitions at once + print("\n--- Testing subscribe_partition_buckets + to_arrow() ---") + partitioned_scanner_batch = await partitioned_table.new_scan().create_record_batch_log_scanner() + partition_bucket_offsets = { + (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos + } + partitioned_scanner_batch.subscribe_partition_buckets(partition_bucket_offsets) + print(f"Batch subscribed to {len(partition_bucket_offsets)} partition+bucket combinations") + partitioned_batch_arrow = partitioned_scanner_batch.to_arrow() + print(f"to_arrow() returned {partitioned_batch_arrow.num_rows} records:") + print(partitioned_batch_arrow.to_pandas()) + + # Demo: unsubscribe_partition - unsubscribe from one partition, read remaining + print("\n--- Testing unsubscribe_partition ---") + partitioned_scanner3 = await partitioned_table.new_scan().create_record_batch_log_scanner() + for p in partition_infos: + partitioned_scanner3.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET) + # Unsubscribe from the first partition + first_partition = partition_infos[0] + partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0) + print(f"Unsubscribed from partition {first_partition.partition_name} (id={first_partition.partition_id})") + remaining_arrow = partitioned_scanner3.to_arrow() + print(f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records (from remaining partitions):") + print(remaining_arrow.to_pandas()) + + # Demo: to_pandas() also works for partitioned tables + print("\n--- Testing to_pandas() on partitioned table ---") + partitioned_scanner2 = await partitioned_table.new_scan().create_record_batch_log_scanner() + for p in partition_infos: + partitioned_scanner2.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET) + partitioned_df = partitioned_scanner2.to_pandas() + print(f"to_pandas() returned {len(partitioned_df)} records:") + print(partitioned_df) + + # Cleanup + await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True) + print(f"\nDropped partitioned table: {partitioned_table_path}") + + except 
Exception as e: + print(f"Error with partitioned table: {e}") + traceback.print_exc() + + # ===================================================== + # Demo: Partitioned KV Table (Upsert, Lookup, Delete) + # ===================================================== + print("\n" + "=" * 60) + print("--- Testing Partitioned KV Table ---") + print("=" * 60) + + partitioned_kv_fields = [ + pa.field("region", pa.string()), # partition key + part of PK + pa.field("user_id", pa.int32()), # part of PK + pa.field("name", pa.string()), + pa.field("score", pa.int64()), ] - for region, user_id, name, score in test_data: - upsert_writer.upsert({ - "region": region, "user_id": user_id, - "name": name, "score": score, + partitioned_kv_schema = pa.schema(partitioned_kv_fields) + fluss_partitioned_kv_schema = fluss.Schema( + partitioned_kv_schema, primary_keys=["region", "user_id"] + ) + + partitioned_kv_descriptor = fluss.TableDescriptor( + fluss_partitioned_kv_schema, + partition_keys=["region"], + ) + + partitioned_kv_path = fluss.TablePath("fluss", "partitioned_kv_table_py") + + try: + await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True) + await admin.create_table(partitioned_kv_path, partitioned_kv_descriptor, False) + print(f"Created partitioned KV table: {partitioned_kv_path}") + + # Create partitions + await admin.create_partition(partitioned_kv_path, {"region": "US"}) + await admin.create_partition(partitioned_kv_path, {"region": "EU"}) + await admin.create_partition(partitioned_kv_path, {"region": "APAC"}) + print("Created partitions: US, EU, APAC") + + partitioned_kv_table = await conn.get_table(partitioned_kv_path) + upsert_writer = partitioned_kv_table.new_upsert().create_writer() + + # Upsert rows across partitions + test_data = [ + ("US", 1, "Gustave", 100), + ("US", 2, "Lune", 200), + ("EU", 1, "Sciel", 150), + ("EU", 2, "Maelle", 250), + ("APAC", 1, "Noco", 300), + ] + for region, user_id, name, score in test_data: + upsert_writer.upsert({ + "region": region, "user_id": user_id, + "name": name, "score": score, + }) + await upsert_writer.flush() + print(f"Upserted {len(test_data)} rows across 3 partitions") + + # Lookup all rows across partitions + print("\n--- Lookup across partitions ---") + lookuper = partitioned_kv_table.new_lookup().create_lookuper() + for region, user_id, name, score in test_data: + result = await lookuper.lookup({"region": region, "user_id": user_id}) + assert result is not None, f"Expected to find region={region} user_id={user_id}" + assert result["name"] == name, f"Name mismatch: {result['name']} != {name}" + assert result["score"] == score, f"Score mismatch: {result['score']} != {score}" + print(f"All {len(test_data)} rows verified across partitions") + + # Update within a partition + print("\n--- Update within partition ---") + handle = upsert_writer.upsert({ + "region": "US", "user_id": 1, + "name": "Gustave Updated", "score": 999, }) - await upsert_writer.flush() - print(f"Upserted {len(test_data)} rows across 3 partitions") - - # Lookup all rows across partitions - print("\n--- Lookup across partitions ---") - lookuper = partitioned_kv_table.new_lookup().create_lookuper() - for region, user_id, name, score in test_data: - result = await lookuper.lookup({"region": region, "user_id": user_id}) - assert result is not None, f"Expected to find region={region} user_id={user_id}" - assert result["name"] == name, f"Name mismatch: {result['name']} != {name}" - assert result["score"] == score, f"Score mismatch: {result['score']} != {score}" - print(f"All 
{len(test_data)} rows verified across partitions") - - # Update within a partition - print("\n--- Update within partition ---") - handle = upsert_writer.upsert({ - "region": "US", "user_id": 1, - "name": "Gustave Updated", "score": 999, - }) - await handle.wait() - result = await lookuper.lookup({"region": "US", "user_id": 1}) - assert result is not None, "Expected to find region=US user_id=1 after update" - assert result["name"] == "Gustave Updated" - assert result["score"] == 999 - print(f"Update verified: US/1 name={result['name']} score={result['score']}") - - # Lookup in non-existent partition - print("\n--- Lookup in non-existent partition ---") - result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1}) - assert result is None, "Expected UNKNOWN partition lookup to return None" - print("UNKNOWN partition lookup: not found (expected)") - - # Delete within a partition - print("\n--- Delete within partition ---") - handle = upsert_writer.delete({"region": "EU", "user_id": 1}) - await handle.wait() - result = await lookuper.lookup({"region": "EU", "user_id": 1}) - assert result is None, "Expected EU/1 to be deleted" - print("Delete verified: EU/1 not found") - - # Verify sibling record still exists - result = await lookuper.lookup({"region": "EU", "user_id": 2}) - assert result is not None, "Expected EU/2 to still exist" - assert result["name"] == "Maelle" - print(f"EU/2 still exists: name={result['name']}") - - # Cleanup - await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True) - print(f"\nDropped partitioned KV table: {partitioned_kv_path}") - - except Exception as e: - print(f"Error with partitioned KV table: {e}") - traceback.print_exc() - - # Close connection - conn.close() - print("\nConnection closed") + await handle.wait() + result = await lookuper.lookup({"region": "US", "user_id": 1}) + assert result is not None, "Expected to find region=US user_id=1 after update" + assert result["name"] == "Gustave Updated" + assert result["score"] == 999 + print(f"Update verified: US/1 name={result['name']} score={result['score']}") + + # Lookup in non-existent partition + print("\n--- Lookup in non-existent partition ---") + result = await lookuper.lookup({"region": "UNKNOWN", "user_id": 1}) + assert result is None, "Expected UNKNOWN partition lookup to return None" + print("UNKNOWN partition lookup: not found (expected)") + + # Delete within a partition + print("\n--- Delete within partition ---") + handle = upsert_writer.delete({"region": "EU", "user_id": 1}) + await handle.wait() + result = await lookuper.lookup({"region": "EU", "user_id": 1}) + assert result is None, "Expected EU/1 to be deleted" + print("Delete verified: EU/1 not found") + + # Verify sibling record still exists + result = await lookuper.lookup({"region": "EU", "user_id": 2}) + assert result is not None, "Expected EU/2 to still exist" + assert result["name"] == "Maelle" + print(f"EU/2 still exists: name={result['name']}") + + # Cleanup + await admin.drop_table(partitioned_kv_path, ignore_if_not_exists=True) + print(f"\nDropped partitioned KV table: {partitioned_kv_path}") + + except Exception as e: + print(f"Error with partitioned KV table: {e}") + traceback.print_exc() + + # Close connection + conn.close() + print("\nConnection closed") if __name__ == "__main__": From a1cadf2fe7ed28e4844a09895c7e11c11bb7a70a Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Sat, 11 Apr 2026 09:11:19 -0700 Subject: [PATCH 4/5] fix: Add self.close()?; to __aexit__ in connection.rs --- 
 bindings/python/src/connection.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/bindings/python/src/connection.rs b/bindings/python/src/connection.rs
index 8112e2d5..dd2af3ee 100644
--- a/bindings/python/src/connection.rs
+++ b/bindings/python/src/connection.rs
@@ -119,6 +119,7 @@ impl FlussConnection {
     _exc_value: Option>,
     _traceback: Option>,
 ) -> PyResult> {
+    self.close()?;
     future_into_py(py, async move {
         // In the future, we could call an async close on the core connection here
         // e.g., client.close().await;

From ff103714cea41cd561385fac240bc119459544d1 Mon Sep 17 00:00:00 2001
From: Jared Yu
Date: Sat, 11 Apr 2026 09:11:37 -0700
Subject: [PATCH 5/5] fix: Clean up docstring

---
 bindings/python/src/upsert.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/bindings/python/src/upsert.rs b/bindings/python/src/upsert.rs
index f597737d..1c278123 100644
--- a/bindings/python/src/upsert.rs
+++ b/bindings/python/src/upsert.rs
@@ -97,7 +97,6 @@ impl UpsertWriter {
 ///
 /// Returns:
 ///     None on success
-///     Flush any pending data
 pub fn flush<'py>(&self, py: Python<'py>) -> PyResult> {
     let writer = self.writer.clone();