diff --git a/.gitmodules b/.gitmodules index 55b8de33..20bc2831 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "clickhouse-cpp"] path = vendor/clickhouse-cpp - url = https://github.com/ClickHouse/clickhouse-cpp.git + url = git@github.com:iskakaushik/clickhouse-cpp.git diff --git a/doc/pg_clickhouse.md b/doc/pg_clickhouse.md index 406d1b75..48084613 100644 --- a/doc/pg_clickhouse.md +++ b/doc/pg_clickhouse.md @@ -139,6 +139,11 @@ The supported options are: * `dbname`: The ClickHouse database to use upon connecting. Defaults to "default". * `host`: The host name of the ClickHouse server. Defaults to "localhost"; +* `fetch_size`: The number of remote rows to buffer per streamed fetch while + scanning foreign tables with the `binary` driver. Defaults to `50000`. Set + it to `0` to disable streaming for tables that use this server and fall + back to buffering the full result set in memory. The `http` driver always + uses the buffered query path. * `port`: The port to connect to on the ClickHouse server. Defaults as follows: * 9440 if `driver` is "binary" and `host` is a ClickHouse Cloud host @@ -299,6 +304,10 @@ The supported table options are: defined for the foreign server. * `table_name`: The name of the remote table. Default to the name specified for the foreign table. +* `fetch_size`: Overrides the server-level `fetch_size` for this table only. + Use it to make scans buffer fewer or more remote rows at a time, or set it + to `0` to disable streaming for this table. Streaming currently applies + only to tables that use the `binary` driver. * `engine`: The [table engine] used by the ClickHouse table. For `CollapsingMergeTree()` and `AggregatingMergeTree()`, pg_clickhouse automatically applies the parameters to function expressions executed on diff --git a/src/binary.cpp b/src/binary.cpp index e6882243..3cf5c260 100644 --- a/src/binary.cpp +++ b/src/binary.cpp @@ -41,8 +41,7 @@ extern "C" #include "utils/uuid.h" #include "binary.hh" - - using namespace clickhouse; +#include "binary_internal.hh" #if defined(__APPLE__) /* Byte ordering on macOS */ #include @@ -55,6 +54,8 @@ extern "C" #define BIG_ENDIAN_64_TO_HOST(x) be64toh(x) #endif + using namespace clickhouse; + #define THROW_UNEXPECTED_COLUMN(exp_type, col) \ throw std::runtime_error("unexpected column type for " + std::string(exp_type) + ": " + col->Type()->GetName()) @@ -181,7 +182,7 @@ extern "C" /* * Converts query->settings to QuerySettings. */ - static QuerySettings + QuerySettings ch_binary_settings(const ch_query *query) { kv_iter iter; @@ -198,7 +199,7 @@ extern "C" /* * Converts query->param_values to QueryParams. */ - static QueryParams + QueryParams ch_binary_params(const ch_query *query) { int i; @@ -232,6 +233,12 @@ extern "C" client->Select(clickhouse::Query(query->sql) .SetQuerySettings(ch_binary_settings(query)) .SetParams(ch_binary_params(query)) + .OnProgress( + [&check_cancel](const Progress &) + { + if (check_cancel && check_cancel()) + throw std::runtime_error("query was canceled"); + }) .OnDataCancelable( [&resp, &values, &check_cancel](const Block &block) { @@ -836,8 +843,8 @@ extern "C" * There is not an adequate (without huge overheads) solution, we just consider * this state unfixable. */ - static Datum - make_datum(clickhouse::ColumnRef col, size_t row, Oid *valtype, bool *is_null) + Datum + ch_binary_make_datum(clickhouse::ColumnRef col, size_t row, Oid *valtype, bool *is_null) { Datum ret = (Datum)0; @@ -1088,7 +1095,7 @@ extern "C" slot->nulls = (bool *)exc_palloc0(sizeof(bool) * len); for (size_t i = 0; i < len; ++i) - slot->datums[i] = make_datum(arr, i, &slot->item_type, &slot->nulls[i]); + slot->datums[i] = ch_binary_make_datum(arr, i, &slot->item_type, &slot->nulls[i]); } /* this one will need additional work, since we just return raw slot */ @@ -1112,7 +1119,7 @@ extern "C" slot->len = len; for (size_t i = 0; i < len; ++i) - slot->datums[i] = make_datum((*tuple)[i], row, &slot->types[i], &slot->nulls[i]); + slot->datums[i] = ch_binary_make_datum((*tuple)[i], row, &slot->types[i], &slot->nulls[i]); /* this one will need additional work, since we just return raw slot */ ret = PointerGetDatum(slot); @@ -1173,7 +1180,7 @@ extern "C" for (size_t i = 0; i < state->resp->columns_count; i++) { /* fill value and null arrays */ - state->values[i] = make_datum(block[i], state->row, &state->coltypes[i], &state->nulls[i]); + state->values[i] = ch_binary_make_datum(block[i], state->row, &state->coltypes[i], &state->nulls[i]); } res = true; diff --git a/src/binary_streaming.cpp b/src/binary_streaming.cpp new file mode 100644 index 00000000..deb3530d --- /dev/null +++ b/src/binary_streaming.cpp @@ -0,0 +1,272 @@ +#include +#include +#include +#include + +#include +#include + +extern "C" +{ + +#include "postgres.h" +#include "internal.h" +#include "engine.h" + +} + +#include "binary_internal.hh" + +using namespace clickhouse; + +static const char kBinaryStreamingCanceled[] = "query was canceled"; +static const char kBinaryStreamingOom[] = "out of memory"; + +struct ch_binary_streaming_state +{ + /* Current block returned by clickhouse-cpp. */ + std::optional current_block; + size_t current_row = 0; + bool have_block = false; + bool done = false; + + std::unique_ptr coltypes; + std::unique_ptr values; + std::unique_ptr nulls; + size_t columns_count = 0; + + std::optional error; + bool (*check_cancel) (void) = nullptr; + + Client *client = nullptr; + std::string sql; + QuerySettings settings; + QueryParams params; + + void + SetError(const char *message) + { + if (error) + return; + error.emplace(message ? message : kBinaryStreamingOom); + } + + const char * + GetError() const + { + return error ? error->c_str() : nullptr; + } +}; + +static bool +binary_streaming_fill_block(ch_binary_streaming_state * st) +{ + std::optional block; + + for (;;) + { + try + { + if (st->check_cancel && st->check_cancel()) + { + st->SetError(kBinaryStreamingCanceled); + st->done = true; + return false; + } + + block = st->client->ReceiveSelectBlock(); + } + catch (const std::exception & e) + { + st->SetError(e.what()); + st->done = true; + return false; + } + + if (!block) + { + try + { + st->client->EndSelect(); + } + catch (const std::exception & e) + { + st->SetError(e.what()); + } + st->current_block.reset(); + st->have_block = false; + st->done = true; + return false; + } + + /* Match the old callback path, which ignored zero-column blocks. */ + if (block->GetColumnCount() == 0) + continue; + + if (st->columns_count != 0 && + block->GetColumnCount() != st->columns_count) + { + st->SetError("columns mismatch in blocks"); + st->done = true; + return false; + } + + st->current_block = std::move(block); + st->columns_count = st->current_block->GetColumnCount(); + st->have_block = true; + st->current_row = 0; + return true; + } +} + +extern "C" +{ + + ch_binary_streaming_state * + ch_binary_begin_streaming(ch_binary_connection_t * conn, + const ch_query * query, + bool (*check_cancel) (void)) + { + ch_binary_streaming_state *st; + + st = new (std::nothrow) ch_binary_streaming_state(); + if (!st) + return NULL; + + st->check_cancel = check_cancel; + st->client = (Client *) conn->client; + st->sql = query->sql; + st->settings = ch_binary_settings(query); + st->params = ch_binary_params(query); + + try + { + st->client->BeginSelect(clickhouse::Query(st->sql) + .SetQuerySettings(st->settings) + .SetParams(st->params)); + } + catch (const std::exception & e) + { + st->SetError(e.what()); + st->done = true; + return st; + } + + (void) binary_streaming_fill_block(st); + + return st; + } + + bool + ch_binary_fetch_block(ch_binary_streaming_state * st) + { + if (!st || st->done) + return false; + if (st->have_block) + return true; + + return binary_streaming_fill_block(st); + } + + bool + ch_binary_streaming_read_row(ch_binary_streaming_state * st) + { + Block *block; + size_t row_count; + + if (!st || !st->have_block || !st->current_block) + return false; + + block = &*st->current_block; + row_count = block->GetRowCount(); + if (st->current_row >= row_count) + { + st->have_block = false; + return false; + } + + if (!st->coltypes && st->columns_count > 0) + { + st->coltypes.reset(new (std::nothrow) Oid[st->columns_count]); + st->values.reset(new (std::nothrow) Datum[st->columns_count]); + st->nulls.reset(new (std::nothrow) bool[st->columns_count]); + if (!st->coltypes || !st->values || !st->nulls) + { + st->SetError(kBinaryStreamingOom); + return false; + } + } + + try + { + for (size_t i = 0; i < st->columns_count; i++) + { + st->values[i] = ch_binary_make_datum((*block)[i], st->current_row, + &st->coltypes[i], &st->nulls[i]); + } + } + catch (const std::exception & e) + { + st->SetError(e.what()); + return false; + } + + st->current_row++; + return true; + } + + size_t + ch_binary_streaming_columns(ch_binary_streaming_state * st) + { + return st ? st->columns_count : 0; + } + + Datum + ch_binary_streaming_value(ch_binary_streaming_state * st, size_t col, + Oid * valtype, bool * is_null) + { + if (!st || col >= st->columns_count) + { + *is_null = true; + *valtype = InvalidOid; + return (Datum) 0; + } + + *valtype = st->coltypes[col]; + *is_null = st->nulls[col]; + return st->values[col]; + } + + const char * + ch_binary_streaming_error(ch_binary_streaming_state * st) + { + return st ? st->GetError() : NULL; + } + + void + ch_binary_end_streaming(ch_binary_streaming_state * st) + { + if (!st) + return; + + try + { + if (st->client) + st->client->EndSelect(); + } + catch (const std::exception &) + { + try + { + if (st->client) + st->client->ResetConnection(); + } + catch (const std::exception &) + { + } + } + + delete st; + } + +} diff --git a/src/fdw.c.in b/src/fdw.c.in index 1ffafd5f..f06f7173 100644 --- a/src/fdw.c.in +++ b/src/fdw.c.in @@ -34,7 +34,9 @@ #endif /* extension includes. */ +#include "commands/defrem.h" #include "utils/builtins.h" +#include "utils/guc.h" #include "binary.hh" #include "internal.h" #include "fdw.h" @@ -55,6 +57,9 @@ PG_MODULE_MAGIC; /* If no remote estimates, assume a sort costs 20% extra */ #define DEFAULT_FDW_SORT_MULTIPLIER 1.2 +/* Default number of remote rows to fetch per streamed batch. */ +#define DEFAULT_FETCH_SIZE 50000 + /* * Indexes of FDW-private information stored in fdw_private lists. * @@ -205,6 +210,7 @@ static int double *totaldeadrows); static void clickhouseBeginForeignScan(ForeignScanState * node, int eflags); static TupleTableSlot * clickhouseIterateForeignScan(ForeignScanState * node); +static void clickhouseReScanForeignScan(ForeignScanState * node); static void clickhouseEndForeignScan(ForeignScanState * node); static List * clickhousePlanForeignModify(PlannerInfo * root, ModifyTable * plan, @@ -355,6 +361,25 @@ clickhouseGetForeignRelSize(PlannerInfo * root, fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; fpinfo->shippable_extensions = NIL; + fpinfo->fetch_size = DEFAULT_FETCH_SIZE; + + /* Server-level fetch_size overrides default. */ + foreach(lc, fpinfo->server->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "fetch_size") == 0) + (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); + } + + /* Table-level fetch_size overrides server-level. */ + foreach(lc, fpinfo->table->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "fetch_size") == 0) + (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); + } chfdw_apply_custom_table_options(fpinfo, foreigntableid); @@ -931,6 +956,9 @@ clickhouseIterateForeignScan(ForeignScanState * node) TupleDesc tupdesc; ch_query query = new_query(fsstate->query, fsstate->numParams, fsstate->param_values); + /* Allow query cancel (e.g. Ctrl+C) between tuple fetches. */ + CHECK_FOR_INTERRUPTS(); + /* make query if needed */ if (fsstate->ch_cursor == NULL) { @@ -949,8 +977,13 @@ clickhouseIterateForeignScan(ForeignScanState * node) values); } - fsstate->ch_cursor = fsstate->conn.methods->simple_query(fsstate->conn.conn, - &query); + if (fsstate->fetch_size > 0 && + fsstate->conn.methods->streaming_query != NULL) + fsstate->ch_cursor = fsstate->conn.methods->streaming_query( + fsstate->conn.conn, &query, fsstate->fetch_size); + else + fsstate->ch_cursor = fsstate->conn.methods->simple_query( + fsstate->conn.conn, &query); time_used += fsstate->ch_cursor->request_time; MemoryContextSwitchTo(old); @@ -979,6 +1012,23 @@ clickhouseIterateForeignScan(ForeignScanState * node) return slot; } +/* + * clickhouseReScanForeignScan + * Restart the scan from scratch by disposing the current cursor. + * The next IterateForeignScan call will re-issue the query. + */ +static void +clickhouseReScanForeignScan(ForeignScanState * node) +{ + ChFdwScanState *fsstate = (ChFdwScanState *) node->fdw_state; + + if (fsstate && fsstate->ch_cursor) + { + MemoryContextDelete(fsstate->ch_cursor->memcxt); + fsstate->ch_cursor = NULL; + } +} + /* * clickhouseEndForeignScan * Finish scanning foreign table and dispose objects used for this scan @@ -2927,7 +2977,7 @@ clickhouse_fdw_handler(PG_FUNCTION_ARGS) routine->GetForeignPlan = clickhouseGetForeignPlan; routine->BeginForeignScan = clickhouseBeginForeignScan; routine->IterateForeignScan = clickhouseIterateForeignScan; - routine->ReScanForeignScan = clickhouseEndForeignScan; + routine->ReScanForeignScan = clickhouseReScanForeignScan; routine->EndForeignScan = clickhouseEndForeignScan; /* Functions for updating foreign tables */ diff --git a/src/include/binary.hh b/src/include/binary.hh index b856a7bb..2c01d48a 100644 --- a/src/include/binary.hh +++ b/src/include/binary.hh @@ -74,6 +74,20 @@ extern "C" const ch_query * query, bool (*check_cancel) (void)); extern void ch_binary_response_free(ch_binary_response_t * resp); +/* Streaming binary API: block-at-a-time iteration */ + typedef struct ch_binary_streaming_state ch_binary_streaming_state; + + extern ch_binary_streaming_state * ch_binary_begin_streaming( + ch_binary_connection_t * conn, const ch_query * query, + bool (*check_cancel) (void)); + extern bool ch_binary_fetch_block(ch_binary_streaming_state * state); + extern bool ch_binary_streaming_read_row(ch_binary_streaming_state * state); + extern size_t ch_binary_streaming_columns(ch_binary_streaming_state * state); + extern Datum ch_binary_streaming_value(ch_binary_streaming_state * state, size_t col, + Oid * valtype, bool *is_null); + extern const char *ch_binary_streaming_error(ch_binary_streaming_state * state); + extern void ch_binary_end_streaming(ch_binary_streaming_state * state); + /* reading */ void ch_binary_read_state_init(ch_binary_read_state_t * state, ch_binary_response_t * resp); void ch_binary_read_state_free(ch_binary_read_state_t * state); diff --git a/src/include/binary_internal.hh b/src/include/binary_internal.hh new file mode 100644 index 00000000..b7f73fdf --- /dev/null +++ b/src/include/binary_internal.hh @@ -0,0 +1,18 @@ +#ifndef CLICKHOUSE_BINARY_INTERNAL_HH +#define CLICKHOUSE_BINARY_INTERNAL_HH + +#ifdef __cplusplus +extern "C" +{ +#endif + +clickhouse::QuerySettings ch_binary_settings(const ch_query * query); +clickhouse::QueryParams ch_binary_params(const ch_query * query); +Datum ch_binary_make_datum(clickhouse::ColumnRef col, size_t row, + Oid * valtype, bool * is_null); + +#ifdef __cplusplus +} +#endif + +#endif /* CLICKHOUSE_BINARY_INTERNAL_HH */ diff --git a/src/include/fdw.h b/src/include/fdw.h index 6adb9b8c..0e2cfe89 100644 --- a/src/include/fdw.h +++ b/src/include/fdw.h @@ -54,6 +54,10 @@ typedef struct ch_cursor double total_time; size_t columns_count; uintptr_t *conversion_states; /* for binary */ + + /* Streaming support for binary scans. */ + void *streaming_state; + bool is_streaming; } ch_cursor; typedef struct ChFdwScanRowContext @@ -69,6 +73,7 @@ typedef struct ChFdwScanRowContext typedef void (*disconnect_method) (void *conn); typedef void (*check_conn_method) (const char *password, UserMapping * user); typedef ch_cursor * (*simple_query_method) (void *conn, const ch_query * query); +typedef ch_cursor * (*streaming_query_method) (void *conn, const ch_query * query, int fetch_size); typedef void (*simple_insert_method) (void *conn, const ch_query * query); typedef Datum * (*cursor_fetch_row_method) (ChFdwScanRowContext * ctx); typedef void *(*prepare_insert_method) (void *conn, ResultRelInfo *, List *, @@ -79,6 +84,7 @@ typedef struct { disconnect_method disconnect; simple_query_method simple_query; + streaming_query_method streaming_query; cursor_fetch_row_method fetch_row; prepare_insert_method prepare_insert; insert_tuple_method insert_tuple; diff --git a/src/option.c b/src/option.c index a1e3c2cc..35113bc8 100644 --- a/src/option.c +++ b/src/option.c @@ -149,6 +149,8 @@ InitChFdwOptions(void) {"table_name", ForeignTableRelationId, false}, {"engine", ForeignTableRelationId, false}, {"driver", ForeignServerRelationId, false}, + {"fetch_size", ForeignServerRelationId, false}, + {"fetch_size", ForeignTableRelationId, false}, {"aggregatefunction", AttributeRelationId, false}, {"simpleaggregatefunction", AttributeRelationId, false}, {"column_name", AttributeRelationId, false}, diff --git a/src/pglink.c b/src/pglink.c index 9693453e..32e20922 100644 --- a/src/pglink.c +++ b/src/pglink.c @@ -39,6 +39,8 @@ static libclickhouse_methods http_methods = { .disconnect = http_disconnect, .simple_query = http_simple_query, + /* HTTP currently uses only the buffered query path. */ + .streaming_query = NULL, .fetch_row = http_fetch_row, .prepare_insert = http_prepare_insert, .insert_tuple = http_insert_tuple @@ -46,7 +48,9 @@ static libclickhouse_methods http_methods = static void binary_disconnect(void *conn); static ch_cursor * binary_simple_query(void *conn, const ch_query * query); +static ch_cursor * binary_streaming_query(void *conn, const ch_query * query, int fetch_size); static void binary_cursor_free(void *cursor); +static void binary_streaming_cursor_free(void *cursor); /* static void binary_simple_insert(void *conn, const char *query); */ static Datum * binary_fetch_row(ChFdwScanRowContext * ctx); @@ -62,6 +66,7 @@ static libclickhouse_methods binary_methods = { .disconnect = binary_disconnect, .simple_query = binary_simple_query, + .streaming_query = binary_streaming_query, .fetch_row = binary_fetch_row, .prepare_insert = binary_prepare_insert, .insert_tuple = binary_insert_tuple @@ -182,6 +187,85 @@ kill_query(void *conn, const char *query_id) ch_http_response_free(resp); } +static void +report_oom(void) +{ + ereport(ERROR, + (errcode(ERRCODE_FDW_OUT_OF_MEMORY), + errmsg("out of memory"))); +} + +static void +report_binary_query_error(const char *query_sql, const char *error) +{ + ereport(ERROR, + (errcode(ERRCODE_SQL_ROUTINE_EXCEPTION), + errmsg("pg_clickhouse: %s", error), + errdetail_internal("Remote Query: %.64000s", query_sql))); +} + +static void +report_binary_streaming_state(ch_binary_streaming_state * sstate, + const char *query_sql, + bool cleanup) +{ + const char *err = ch_binary_streaming_error(sstate); + char *errcopy; + + if (err == NULL) + return; + + errcopy = pstrdup(err); + if (cleanup) + ch_binary_end_streaming(sstate); + report_binary_query_error(query_sql, errcopy); +} + +static void +report_binary_streaming_read_error(ch_binary_streaming_state * sstate) +{ + const char *err = ch_binary_streaming_error(sstate); + + if (err == NULL) + return; + + ereport(ERROR, + (errcode(ERRCODE_SQL_ROUTINE_EXCEPTION), + errmsg("pg_clickhouse: error while reading row: %s", err))); +} + +/* + * Allocate a cursor in its own memory context with a cleanup callback. + * The memory context owns the cursor itself; the callback only releases the + * external resources hanging off it. + */ +static ch_cursor * +cursor_create(const char *sql, size_t columns_count, + MemoryContextCallbackFunction cleanup_func) +{ + MemoryContext tempcxt, + oldcxt; + ch_cursor *cursor; + + tempcxt = AllocSetContextCreate(PortalContext, "pg_clickhouse cursor", + ALLOCSET_DEFAULT_SIZES); + + oldcxt = MemoryContextSwitchTo(tempcxt); + cursor = palloc0(sizeof(ch_cursor)); + cursor->query = pstrdup(sql); + cursor->columns_count = columns_count; + if (columns_count > 0) + cursor->conversion_states = palloc0(sizeof(uintptr_t) * columns_count); + + cursor->memcxt = tempcxt; + cursor->callback.func = cleanup_func; + cursor->callback.arg = cursor; + MemoryContextRegisterResetCallback(tempcxt, &cursor->callback); + MemoryContextSwitchTo(oldcxt); + + return cursor; +} + static ch_cursor * http_simple_query(void *conn, const ch_query * query) { @@ -640,8 +724,6 @@ binary_disconnect(void *conn) static ch_cursor * binary_simple_query(void *conn, const ch_query * query) { - MemoryContext tempcxt, - oldcxt; ch_cursor *cursor; ch_binary_read_state_t *state; @@ -652,31 +734,21 @@ binary_simple_query(void *conn, const ch_query * query) char *error = pstrdup(resp->error); ch_binary_response_free(resp); - ereport(ERROR, ( - errcode(ERRCODE_SQL_ROUTINE_EXCEPTION), - errmsg("pg_clickhouse: %s", error), - errdetail_internal("Remote Query: %.64000s", query->sql) - )); + report_binary_query_error(query->sql, error); } - tempcxt = AllocSetContextCreate(PortalContext, "pg_clickhouse cursor", - ALLOCSET_DEFAULT_SIZES); + cursor = cursor_create(query->sql, resp->columns_count, + binary_cursor_free); - oldcxt = MemoryContextSwitchTo(tempcxt); - cursor = palloc0(sizeof(ch_cursor)); - cursor->query_response = resp; - state = (ch_binary_read_state_t *) palloc0(sizeof(ch_binary_read_state_t)); - cursor->query = pstrdup(query->sql); - cursor->read_state = state; - cursor->columns_count = resp->columns_count; - ch_binary_read_state_init(cursor->read_state, resp); - cursor->conversion_states = palloc0(sizeof(uintptr_t) * cursor->columns_count); + { + MemoryContext oldcxt = MemoryContextSwitchTo(cursor->memcxt); - cursor->memcxt = tempcxt; - cursor->callback.func = binary_cursor_free; - cursor->callback.arg = cursor; - MemoryContextRegisterResetCallback(tempcxt, &cursor->callback); - MemoryContextSwitchTo(oldcxt); + cursor->query_response = resp; + state = (ch_binary_read_state_t *) palloc0(sizeof(ch_binary_read_state_t)); + cursor->read_state = state; + ch_binary_read_state_init(state, resp); + MemoryContextSwitchTo(oldcxt); + } if (state->error) { @@ -689,6 +761,139 @@ binary_simple_query(void *conn, const ch_query * query) return cursor; } +static void +binary_streaming_cursor_free(void *c) +{ + ch_cursor *cursor = c; + + if (cursor->streaming_state) + ch_binary_end_streaming(cursor->streaming_state); +} + +/* + * Start a streaming query over the binary protocol. Returns a cursor + * that yields rows block-by-block via clickhouse-cpp's streaming select API. + */ +static ch_cursor * +binary_streaming_query(void *conn, const ch_query * query, int fetch_size) +{ + ch_binary_streaming_state *sstate; + + (void) fetch_size; + sstate = ch_binary_begin_streaming(conn, query, &is_canceled); + if (sstate == NULL) + report_oom(); + + report_binary_streaming_state(sstate, query->sql, true); + + { + ch_cursor *cursor; + + cursor = cursor_create(query->sql, + ch_binary_streaming_columns(sstate), + binary_streaming_cursor_free); + cursor->streaming_state = sstate; + cursor->is_streaming = true; + return cursor; + } +} + +/* + * Convert a single binary column value, initializing the conversion state + * on first use. Shared by both the buffered and streaming fetch_row paths. + */ +static Datum +binary_convert_column(ch_cursor * cursor, TupleDesc tupdesc, + int attindex, size_t colindex, + Datum val, Oid valtype, bool isnull) +{ + intptr_t convstate; + + if (isnull) + return (Datum) 0; + +retry: + convstate = cursor->conversion_states[colindex]; + switch (convstate) + { + case 0: + { + MemoryContext old_mcxt; + Oid outtype = TupleDescAttr(tupdesc, attindex)->atttypid; + void *s; + + /* + * Conversion states must outlive the per-tuple memory + * context, so allocate them in the cursor's memory context. + */ + old_mcxt = MemoryContextSwitchTo(cursor->memcxt); + s = ch_binary_init_convert_state(val, valtype, outtype); + MemoryContextSwitchTo(old_mcxt); + + if (s == NULL) + cursor->conversion_states[colindex] = 1; + else + cursor->conversion_states[colindex] = (uintptr_t) s; + goto retry; + } + case 1: + return val; + default: + return ch_binary_convert_datum((void *) convstate, val); + } +} + +/* + * Streaming variant of binary_fetch_row. Reads rows from the block-backed + * streaming state, fetching the next block when the current one is exhausted. + */ +static Datum * +binary_streaming_fetch_row(ChFdwScanRowContext * ctx) +{ + ListCell *lc; + ch_cursor *cursor = ctx->cursor; + ch_binary_streaming_state *sstate = cursor->streaming_state; + List *attrs = ctx->retrieved_attrs; + TupleDesc tupdesc = ctx->tupdesc; + Datum *values = ctx->values; + bool *nulls = ctx->nulls; + + /* Try to read a row; if block exhausted, fetch next block. */ + while (!ch_binary_streaming_read_row(sstate)) + { + report_binary_streaming_read_error(sstate); + + if (!ch_binary_fetch_block(sstate)) + { + report_binary_streaming_state(sstate, cursor->query, false); + return NULL; + } + } + + if (tupdesc) + { + size_t j = 0; + + Assert(values && nulls); + foreach(lc, attrs) + { + int i = lfirst_int(lc); + Oid valtype; + bool isnull; + Datum val; + + val = ch_binary_streaming_value(sstate, j, &valtype, &isnull); + values[i - 1] = binary_convert_column(cursor, tupdesc, + i - 1, j, + val, valtype, isnull); + nulls[i - 1] = isnull; + j++; + } + } + + return values; +} + /* * Fetch a row from the binary cursor and return its values. * @@ -713,10 +918,17 @@ binary_fetch_row(ChFdwScanRowContext * ctx) TupleDesc tupdesc = ctx->tupdesc; Datum *values = ctx->values; bool *nulls = ctx->nulls; - ch_binary_read_state_t *state = cursor->read_state; - bool have_data = ch_binary_read_row(state); + ch_binary_read_state_t *state; + bool have_data; size_t attcount = list_length(attrs); + /* Streaming cursors use a separate code path. */ + if (cursor->is_streaming) + return binary_streaming_fetch_row(ctx); + + state = cursor->read_state; + have_data = ch_binary_read_row(state); + if (state->error) ereport(ERROR, (errcode(ERRCODE_SQL_ROUTINE_EXCEPTION), @@ -757,51 +969,12 @@ binary_fetch_row(ChFdwScanRowContext * ctx) { int i = lfirst_int(lc); bool isnull = state->nulls[j]; - intptr_t convstate; - - - if (isnull) - values[i - 1] = (Datum) 0; - else - { - again: - convstate = cursor->conversion_states[j]; - switch (convstate) - { - case 0: - { - MemoryContext old_mcxt; - - Oid outtype = TupleDescAttr(tupdesc, i - 1)->atttypid; - void *s; - - /* - * now we're should be in temporary memory - * context, so make sure conversion states outlive - * it. - */ - old_mcxt = MemoryContextSwitchTo(cursor->memcxt); - s = ch_binary_init_convert_state(state->values[j], - state->coltypes[j], outtype); - MemoryContextSwitchTo(old_mcxt); - - if (s == NULL) - /* no conversion but state is initialized */ - cursor->conversion_states[j] = 1; - else - cursor->conversion_states[j] = (uintptr_t) s; - goto again; - } - case 1: - /* no conversion */ - values[i - 1] = state->values[j]; - break; - default: - values[i - 1] = ch_binary_convert_datum((void *) convstate, - state->values[j]); - } - } + values[i - 1] = binary_convert_column(cursor, tupdesc, + i - 1, j, + state->values[j], + state->coltypes[j], + isnull); nulls[i - 1] = isnull; j++; } diff --git a/vendor/clickhouse-cpp b/vendor/clickhouse-cpp index 3c94b448..d3fdc3b7 160000 --- a/vendor/clickhouse-cpp +++ b/vendor/clickhouse-cpp @@ -1 +1 @@ -Subproject commit 3c94b44894197860cabbba97c015b967055a55c2 +Subproject commit d3fdc3b712a8c5f1e30f14fdc3885198fd48a94e