Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/arrow_scan_ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ unique_ptr <FunctionData> ArrowIPCTableFunction::ArrowScanBind(ClientContext &co
uint64_t ptr = unpacked[0].GetValue<uint64_t>();
uint64_t size = unpacked[1].GetValue<uint64_t>();

if (stream_decoder->buffer()->is_eos()) {
throw IOException("More IPC buffers passed while the stream has already ended");
}

// Feed stream into decoder
auto res = stream_decoder->Consume((const uint8_t *) ptr, size);

Expand Down
18 changes: 18 additions & 0 deletions test/nodejs/arrow_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,24 @@ for (const [name, fun] of Object.entries(to_ipc_functions)) {
});
});
});

// Verifies that a result serialized into (possibly several) Arrow IPC buffers
// can be registered as one DuckDB table. `fun` (from to_ipc_functions) turns
// the query result into an IPC buffer array — presumably one stream ending in
// a single EOS marker; TODO confirm against to_ipc_functions.
it(`Registering multiple IPC buffers for the same table in DuckDB`, async () => {
const table = await fun(db, 'SELECT * FROM range(1, 10000) tbl(i)');

// NOTE(review): the assertion runs inside register_buffer's callback; if the
// binding invokes it asynchronously, this `it` may resolve before the callback
// fires and a failure would go unreported — confirm, or switch to a
// done-callback / promisified form.
db.register_buffer("table1", table, true, (err) => {
assert(!err);
});
});

// Verifies that concatenating the IPC buffers of two independent streams is
// rejected: the first stream presumably ends with an EOS marker, so feeding
// the second stream's buffers after it should trigger the decoder's
// "stream has already ended" error — TODO confirm this matches the
// ArrowScanBind is_eos() check in src/arrow_scan_ipc.cpp.
it(`Registering IPC buffers for two different tables fails`, async () => {
const ipc_buffers1 = await fun(db, 'SELECT * FROM range(1, 3) tbl(i)');
const ipc_buffers2 = await fun(db, 'SELECT * FROM range(2, 4) tbl(i)');

// This will fail when reading buffers for the second table
// NOTE(review): as in the test above, the assertion lives in the callback;
// verify the callback is invoked before the `it` resolves, otherwise an
// unexpected success would not fail the test.
db.register_buffer("table1", [...ipc_buffers1, ...ipc_buffers2], true, (err) => {
assert(err);
});
});
})
}

Expand Down