diff --git a/build.zig b/build.zig index e3f8464..b816a31 100644 --- a/build.zig +++ b/build.zig @@ -34,6 +34,16 @@ pub fn build(b: *std.Build) void { build_options.addOption([]const u8, "version", version); exe.root_module.addOptions("build_options", build_options); + // Translate sqlite3.h to Zig declarations, exposed as @import("c"). + // We always use lib/sqlite3.h for stable type declarations regardless of + // whether we bundle or link the system SQLite library. + const translate_c = b.addTranslateC(.{ + .root_source_file = b.path("lib/sqlite3.h"), + .target = target, + .optimize = optimize, + }); + exe.root_module.addImport("c", translate_c.createModule()); + if (bundle_sqlite) { exe.root_module.addIncludePath(b.path("lib")); exe.root_module.addCSourceFile(.{ @@ -481,6 +491,154 @@ pub fn build(b: *std.Build) void { test_output_bad_path.step.dependOn(b.getInstallStep()); test_step.dependOn(&test_output_bad_path.step); + // ─── JSON / NDJSON input/output integration tests ──────────────────────── + + // Integration test 48: JSON array input → CSV output + const test_json_input_csv_out = b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf '[{"name":"Alice","age":30},{"name":"Bob","age":25}]' \ + \\ | ./zig-out/bin/sql-pipe --input-format json 'SELECT name,age FROM t ORDER BY age') + \\expected=$(printf 'Bob,25\nAlice,30') + \\[ "$result" = "$expected" ] + }); + test_json_input_csv_out.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_json_input_csv_out.step); + + // Integration test 49: CSV input → JSON output (--json alias) + const test_csv_to_json = b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf 'name,age\nAlice,30\nBob,25\n' \ + \\ | ./zig-out/bin/sql-pipe --json 'SELECT name,age FROM t ORDER BY age') + \\expected=$(printf '[{"name":"Bob","age":25},{"name":"Alice","age":30}]\n') + \\[ "$result" = "$expected" ] + }); + test_csv_to_json.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_csv_to_json.step); + + // Integration test 50: CSV input → JSON output (--output-format json) + const test_output_format_json = b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf 'name,age\nAlice,30\n' \ + \\ | ./zig-out/bin/sql-pipe --output-format json 'SELECT * FROM t') + \\expected=$(printf '[{"name":"Alice","age":30}]\n') + \\[ "$result" = "$expected" ] + }); + test_output_format_json.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_output_format_json.step); + + // Integration test 51: CSV input → NDJSON output + const test_csv_to_ndjson = b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf 'name,age\nAlice,30\nBob,25\n' \ + \\ | ./zig-out/bin/sql-pipe --output-format ndjson 'SELECT name,age FROM t ORDER BY age') + \\expected=$(printf '{"name":"Bob","age":25}\n{"name":"Alice","age":30}\n') + \\[ "$result" = "$expected" ] + }); + test_csv_to_ndjson.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_csv_to_ndjson.step); + + // Integration test 52: NDJSON input → CSV output + const test_ndjson_input_csv_out = b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf '{"name":"Alice","age":30}\n{"name":"Bob","age":25}\n' \ + \\ | ./zig-out/bin/sql-pipe --input-format ndjson 'SELECT name,age FROM t ORDER BY age') + \\expected=$(printf 'Bob,25\nAlice,30') + \\[ "$result" = "$expected" ] + }); + test_ndjson_input_csv_out.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_ndjson_input_csv_out.step); + + // Integration test 53: NDJSON input → NDJSON output (-I / -O short flags) + const test_ndjson_roundtrip = 
b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf '{"name":"Alice","age":30}\n{"name":"Bob","age":25}\n' \ + \\ | ./zig-out/bin/sql-pipe -I ndjson -O ndjson 'SELECT name FROM t WHERE age > 26') + \\expected=$(printf '{"name":"Alice"}\n') + \\[ "$result" = "$expected" ] + }); + test_ndjson_roundtrip.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_ndjson_roundtrip.step); + + // Integration test 54: JSON input with null value + const test_json_null_value = b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf '[{"name":"Alice","age":null}]' \ + \\ | ./zig-out/bin/sql-pipe -I json -O json 'SELECT name, age FROM t') + \\expected=$(printf '[{"name":"Alice","age":null}]\n') + \\[ "$result" = "$expected" ] + }); + test_json_null_value.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_json_null_value.step); + + // Integration test 55: JSON input with boolean value (stored as 1/0) + const test_json_bool_value = b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf '[{"active":true},{"active":false}]' \ + \\ | ./zig-out/bin/sql-pipe -I json 'SELECT active FROM t ORDER BY active') + \\expected=$(printf '0\n1') + \\[ "$result" = "$expected" ] + }); + test_json_bool_value.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_json_bool_value.step); + + // Integration test 56: empty JSON array → error exit 2 + const test_json_empty_array = b.addSystemCommand(&.{ + "bash", "-c", + \\msg=$(printf '[]' | ./zig-out/bin/sql-pipe -I json 'SELECT * FROM t' 2>&1 >/dev/null; echo "EXIT:$?") + \\echo "$msg" | grep -q 'empty JSON array' && echo "$msg" | grep -q 'EXIT:2' + }); + test_json_empty_array.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_json_empty_array.step); + + // Integration test 57: unknown input format → error exit 1 + const test_bad_input_format = b.addSystemCommand(&.{ + "bash", "-c", + \\msg=$(printf '' | ./zig-out/bin/sql-pipe --input-format xml 'SELECT 1' 2>&1 >/dev/null; echo "EXIT:$?") + \\echo "$msg" | grep -q 'unknown input format' && echo "$msg" | grep -q 'EXIT:1' + }); + test_bad_input_format.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_bad_input_format.step); + + // Integration test 58: unknown output format → error exit 1 + const test_bad_output_format = b.addSystemCommand(&.{ + "bash", "-c", + \\msg=$(printf 'a\n1\n' | ./zig-out/bin/sql-pipe --output-format xml 'SELECT * FROM t' 2>&1 >/dev/null; echo "EXIT:$?") + \\echo "$msg" | grep -q 'unknown output format' && echo "$msg" | grep -q 'EXIT:1' + }); + test_bad_output_format.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_bad_output_format.step); + + // Integration test 59: --header with --output-format json → IncompatibleFlags error exit 1 + const test_header_with_json_format = b.addSystemCommand(&.{ + "bash", "-c", + \\msg=$(printf 'a\n1\n' | ./zig-out/bin/sql-pipe --header --output-format json 'SELECT * FROM t' 2>&1; echo "EXIT:$?") + \\echo "$msg" | grep -q 'error:' && echo "$msg" | grep -q 'EXIT:1' + }); + test_header_with_json_format.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_header_with_json_format.step); + + // Integration test 60: --columns with JSON input + const test_columns_json_input = b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf '[{"name":"Alice","age":30}]' \ + \\ | ./zig-out/bin/sql-pipe --input-format json --columns) + \\expected=$(printf 'name\nage') + \\[ "$result" = "$expected" ] + }); + test_columns_json_input.step.dependOn(b.getInstallStep()); + 
test_step.dependOn(&test_columns_json_input.step); + + // Integration test 61: --columns with NDJSON input + const test_columns_ndjson_input = b.addSystemCommand(&.{ + "bash", "-c", + \\result=$(printf '{"name":"Alice","age":30}\n' \ + \\ | ./zig-out/bin/sql-pipe -I ndjson --columns) + \\expected=$(printf 'name\nage') + \\[ "$result" = "$expected" ] + }); + test_columns_ndjson_input.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_columns_ndjson_input.step); + // Unit tests for the RFC 4180 CSV parser (src/csv.zig) const unit_tests = b.addTest(.{ .root_module = b.createModule(.{ diff --git a/src/json.zig b/src/json.zig new file mode 100644 index 0000000..869c086 --- /dev/null +++ b/src/json.zig @@ -0,0 +1,546 @@ +//! JSON and NDJSON I/O — input loading and output formatting. +//! +//! Input +//! ───── +//! loadJsonArray — read all of `reader` as a JSON array of objects, +//! create table `t` in `db`, and insert every row. +//! loadNdjsonInput — stream `reader` line-by-line as NDJSON objects, +//! create table `t` in `db`, and insert every row. +//! +//! Output +//! ────── +//! writeJsonString — emit a JSON string literal with RFC 8259 escaping. +//! printJsonRow — emit one SQLite result row as a JSON object. +//! printNdjsonRow — emit one SQLite result row as an NDJSON line. +//! +//! Shared helpers +//! ────────────── +//! bindJsonValue — bind a std.json.Value to a prepared-statement parameter. +//! insertRowFromJson — bind all fields of a JSON object and step the statement. +//! readLine — read one line from a reader (also used by runColumns in main). + +const std = @import("std"); +const c = @import("c"); + +/// SQLITE_STATIC: caller manages string lifetime; SQLite must not free it. +const sqlite_static: c.sqlite3_destructor_type = null; + +const exit_usage: u8 = 1; +const exit_parse: u8 = 2; +const exit_sql: u8 = 3; + +// ─── Internal utilities ─────────────────────────────── + +fn fatal(comptime fmt: []const u8, writer: *std.Io.Writer, code: u8, args: anytype) noreturn { + writer.print("error: " ++ fmt ++ "\n", args) catch |err| std.log.err("failed to write error: {}", .{err}); + writer.flush() catch |err| std.log.err("failed to flush: {}", .{err}); + std.process.exit(code); +} + +/// Create table `t` with all-TEXT columns. Column names are double-quote–escaped +/// per SQL identifier rules. +fn createAllTextTable( + allocator: std.mem.Allocator, + db: *c.sqlite3, + cols: []const []const u8, + writer: *std.Io.Writer, +) void { + var sql: std.ArrayList(u8) = .empty; + defer sql.deinit(allocator); + + sql.appendSlice(allocator, "CREATE TABLE t (") catch fatal("out of memory", writer, exit_parse, .{}); + for (cols, 0..) 
|col, i| { + if (i > 0) sql.appendSlice(allocator, ", ") catch fatal("out of memory", writer, exit_parse, .{}); + sql.append(allocator, '"') catch fatal("out of memory", writer, exit_parse, .{}); + for (col) |ch| { + if (ch == '"') sql.append(allocator, '"') catch fatal("out of memory", writer, exit_parse, .{}); + sql.append(allocator, ch) catch fatal("out of memory", writer, exit_parse, .{}); + } + sql.appendSlice(allocator, "\" TEXT") catch fatal("out of memory", writer, exit_parse, .{}); + } + sql.appendSlice(allocator, ")") catch fatal("out of memory", writer, exit_parse, .{}); + sql.append(allocator, 0) catch fatal("out of memory", writer, exit_parse, .{}); + + var errmsg: [*c]u8 = null; + if (c.sqlite3_exec(db, sql.items.ptr, null, null, &errmsg) != c.SQLITE_OK) { + const msg = if (errmsg != null) std.mem.span(errmsg) else std.mem.span(c.sqlite3_errmsg(db)); + if (errmsg != null) c.sqlite3_free(errmsg); + fatal("{s}", writer, exit_sql, .{msg}); + } +} + +/// Prepare `INSERT INTO t VALUES (?, …, ?)` with n parameters. +fn prepareInsertStmt( + allocator: std.mem.Allocator, + db: *c.sqlite3, + n: usize, + writer: *std.Io.Writer, +) *c.sqlite3_stmt { + var sql: std.ArrayList(u8) = .empty; + defer sql.deinit(allocator); + + sql.appendSlice(allocator, "INSERT INTO t VALUES (") catch fatal("out of memory", writer, exit_parse, .{}); + for (0..n) |i| { + if (i > 0) sql.append(allocator, ',') catch fatal("out of memory", writer, exit_parse, .{}); + sql.append(allocator, '?') catch fatal("out of memory", writer, exit_parse, .{}); + } + sql.appendSlice(allocator, ")") catch fatal("out of memory", writer, exit_parse, .{}); + sql.append(allocator, 0) catch fatal("out of memory", writer, exit_parse, .{}); + + var stmt: ?*c.sqlite3_stmt = null; + if (c.sqlite3_prepare_v2(db, sql.items.ptr, -1, &stmt, null) != c.SQLITE_OK) + fatal("{s}", writer, exit_sql, .{std.mem.span(c.sqlite3_errmsg(db))}); + return stmt.?; +} + +fn beginTransaction(db: *c.sqlite3, writer: *std.Io.Writer) void { + var errmsg: [*c]u8 = null; + if (c.sqlite3_exec(db, "BEGIN TRANSACTION", null, null, &errmsg) != c.SQLITE_OK) { + const msg = if (errmsg != null) std.mem.span(errmsg) else std.mem.span(c.sqlite3_errmsg(db)); + if (errmsg != null) c.sqlite3_free(errmsg); + fatal("{s}", writer, exit_sql, .{msg}); + } +} + +fn commitTransaction(db: *c.sqlite3, writer: *std.Io.Writer) void { + var errmsg: [*c]u8 = null; + if (c.sqlite3_exec(db, "COMMIT", null, null, &errmsg) != c.SQLITE_OK) { + const msg = if (errmsg != null) std.mem.span(errmsg) else std.mem.span(c.sqlite3_errmsg(db)); + if (errmsg != null) c.sqlite3_free(errmsg); + fatal("{s}", writer, exit_sql, .{msg}); + } + if (errmsg != null) c.sqlite3_free(errmsg); +} + +// ─── Shared helpers ─────────────────────────────────── + +/// Read one line from reader, stripping the line terminator (LF and optional preceding CR). +/// +/// Pre: reader is positioned at the start of the next line (or EOF) +/// Post: result = null ⟺ reader was at EOF before any bytes were read +/// result = line ⟺ bytes up to (but not including) the next '\n' are returned; +/// trailing '\r' before the '\n' is stripped; heap-allocated +pub fn readLine( + allocator: std.mem.Allocator, + reader: *std.Io.Reader, +) (std.mem.Allocator.Error || error{ReadFailed})!?[]u8 { + var line: std.ArrayList(u8) = .empty; + errdefer line.deinit(allocator); + var got_any = false; + // Loop invariant I: line contains bytes of the current line read so far (excl. 
terminator) + // Bounding function: bytes remaining in stream (stream is finite for well-formed input) + while (true) { + const byte = reader.takeByte() catch |err| switch (err) { + error.EndOfStream => { + if (!got_any) { + line.deinit(allocator); + return null; + } + break; + }, + error.ReadFailed => return error.ReadFailed, + }; + got_any = true; + if (byte == '\n') break; + try line.append(allocator, byte); + } + // Strip trailing \r for \r\n line endings + if (line.items.len > 0 and line.items[line.items.len - 1] == '\r') { + line.items.len -= 1; + } + const result = try line.toOwnedSlice(allocator); + return result; +} + +/// bindJsonValue(allocator, stmt, col_idx, value, deferred_allocs) → void +/// +/// Pre: stmt is a prepared statement; col_idx ≥ 1 +/// deferred_allocs collects allocations that must outlive sqlite3_step +/// Post: value is bound to parameter col_idx: +/// .null → sqlite3_bind_null +/// .bool → sqlite3_bind_int64 (1 / 0) +/// .integer → sqlite3_bind_int64 +/// .float → sqlite3_bind_double +/// .number_string → sqlite3_bind_text (STATIC; arena owns the string) +/// .string → sqlite3_bind_text (STATIC; arena owns the string) +/// .array/.object → sqlite3_bind_text (JSON-serialised; owned by deferred_allocs) +/// error.BindFailed on any sqlite3_bind_* failure +pub fn bindJsonValue( + allocator: std.mem.Allocator, + stmt: *c.sqlite3_stmt, + col_idx: c_int, + value: std.json.Value, + deferred_allocs: *std.ArrayList([]u8), +) (error{BindFailed} || std.mem.Allocator.Error)!void { + switch (value) { + .null => { + if (c.sqlite3_bind_null(stmt, col_idx) != c.SQLITE_OK) return error.BindFailed; + }, + .bool => |b| { + if (c.sqlite3_bind_int64(stmt, col_idx, if (b) 1 else 0) != c.SQLITE_OK) return error.BindFailed; + }, + .integer => |n| { + if (c.sqlite3_bind_int64(stmt, col_idx, n) != c.SQLITE_OK) return error.BindFailed; + }, + .float => |f| { + if (c.sqlite3_bind_double(stmt, col_idx, f) != c.SQLITE_OK) return error.BindFailed; + }, + .number_string => |s| { + if (c.sqlite3_bind_text(stmt, col_idx, s.ptr, @intCast(s.len), sqlite_static) != c.SQLITE_OK) + return error.BindFailed; + }, + .string => |s| { + if (c.sqlite3_bind_text(stmt, col_idx, s.ptr, @intCast(s.len), sqlite_static) != c.SQLITE_OK) + return error.BindFailed; + }, + .array, .object => { + // Serialise to JSON text; defer free must happen AFTER sqlite3_step, so we + // hand the allocation to the caller via deferred_allocs. + const serialized = try std.fmt.allocPrint(allocator, "{f}", .{std.json.fmt(value, .{})}); + try deferred_allocs.append(allocator, serialized); + if (c.sqlite3_bind_text(stmt, col_idx, serialized.ptr, @intCast(serialized.len), sqlite_static) != c.SQLITE_OK) + return error.BindFailed; + }, + } +} + +/// insertRowFromJson(allocator, stmt, cols, obj) → void +/// +/// Pre: stmt is a prepared INSERT with cols.len parameters, ready for reset +/// cols is the ordered list of column names used to look up values in obj +/// obj is a JSON object +/// Post: each column in cols is bound to the corresponding value in obj +/// (SQL NULL when the key is absent); sqlite3_step returned SQLITE_DONE +/// error.BindFailed / error.StepFailed on SQLite errors +pub fn insertRowFromJson( + allocator: std.mem.Allocator, + stmt: *c.sqlite3_stmt, + cols: []const []const u8, + obj: std.json.ObjectMap, +) (error{ BindFailed, StepFailed } || std.mem.Allocator.Error)!void { + _ = c.sqlite3_reset(stmt); + _ = c.sqlite3_clear_bindings(stmt); + + // Collect serialized array/object strings so they outlive sqlite3_step. 
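// Note on lifetimes: sqlite_static (SQLITE_STATIC) tells SQLite that bound text
// buffers are caller-owned and must stay valid until sqlite3_step has consumed
// the bindings. Plain string / number_string slices are owned by the caller's
// std.json parse result, which outlives the step, but the serialized form of a
// nested array/object is allocated right here, so it is parked in deferred_allocs
// and freed only after the step at the bottom of this function. For example, a
// row {"name":"Alice","tags":["a","b"]} binds "Alice" straight from the parsed
// value and binds a freshly allocated text value ["a","b"] for the tags column.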
+ var deferred_allocs: std.ArrayList([]u8) = .empty; + defer { + for (deferred_allocs.items) |s| allocator.free(s); + deferred_allocs.deinit(allocator); + } + + // Loop invariant I: params 1..col_idx-1 are bound for cols[0..col_idx-2] + // Bounding function: cols.len - j + for (cols, 0..) |col, j| { + const col_idx: c_int = @intCast(j + 1); + if (obj.get(col)) |val| { + try bindJsonValue(allocator, stmt, col_idx, val, &deferred_allocs); + } else { + if (c.sqlite3_bind_null(stmt, col_idx) != c.SQLITE_OK) return error.BindFailed; + } + } + + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) return error.StepFailed; +} + +// ─── Input loading ──────────────────────────────────── + +/// loadJsonArray(allocator, reader, db, max_rows, stderr_writer) → usize +/// +/// Pre: reader is positioned at the start of a JSON document +/// db is an open, empty SQLite database +/// Post: table `t` is created with TEXT columns derived from the first object's keys; +/// all elements of the JSON array are inserted as rows +/// result = number of rows inserted +/// aborts the process on any parse, I/O, or SQL error +pub fn loadJsonArray( + allocator: std.mem.Allocator, + reader: *std.Io.Reader, + db: *c.sqlite3, + max_rows: ?usize, + stderr_writer: *std.Io.Writer, +) usize { + // Read all input into a buffer + var buf: std.ArrayList(u8) = .empty; + defer buf.deinit(allocator); + // Loop invariant I: buf contains all bytes read from reader so far + // Bounding function: bytes remaining in reader (finite input) + while (true) { + const byte = reader.takeByte() catch |err| switch (err) { + error.EndOfStream => break, + error.ReadFailed => fatal("failed to read JSON input", stderr_writer, exit_parse, .{}), + }; + buf.append(allocator, byte) catch fatal("out of memory reading JSON input", stderr_writer, exit_parse, .{}); + } + + if (buf.items.len == 0) fatal("empty input", stderr_writer, exit_parse, .{}); + + var parsed = std.json.parseFromSlice(std.json.Value, allocator, buf.items, .{}) catch + fatal("failed to parse JSON input", stderr_writer, exit_parse, .{}); + defer parsed.deinit(); + + const array = switch (parsed.value) { + .array => |a| a, + else => fatal("JSON input must be an array of objects", stderr_writer, exit_parse, .{}), + }; + + if (array.items.len == 0) fatal("empty JSON array: cannot determine column names", stderr_writer, exit_parse, .{}); + + // Extract column names from the first object's keys (insertion order) + const first_obj = switch (array.items[0]) { + .object => |o| o, + else => fatal("JSON array elements must be objects", stderr_writer, exit_parse, .{}), + }; + + var cols: std.ArrayList([]const u8) = .empty; + defer cols.deinit(allocator); + var key_iter = first_obj.iterator(); + while (key_iter.next()) |entry| { + cols.append(allocator, entry.key_ptr.*) catch + fatal("out of memory building column list", stderr_writer, exit_parse, .{}); + } + if (cols.items.len == 0) fatal("first JSON object has no keys", stderr_writer, exit_parse, .{}); + + // Create all-TEXT table (column names are owned by parsed arena — valid until parsed.deinit()) + createAllTextTable(allocator, db, cols.items, stderr_writer); + beginTransaction(db, stderr_writer); + + const stmt = prepareInsertStmt(allocator, db, cols.items.len, stderr_writer); + defer _ = c.sqlite3_finalize(stmt); + + var rows_inserted: usize = 0; + // Loop invariant I: array.items[0..rows_inserted] have been inserted into t + // Bounding function: array.items.len - rows_inserted + for (array.items) |item| { + const obj = switch (item) { + .object => 
|o| o, + else => fatal("JSON array element is not an object", stderr_writer, exit_parse, .{}), + }; + rows_inserted += 1; + if (max_rows) |limit| { + if (rows_inserted > limit) + fatal("input exceeds --max-rows limit ({d} rows)", stderr_writer, exit_usage, .{limit}); + } + insertRowFromJson(allocator, stmt, cols.items, obj) catch + fatal("{s}", stderr_writer, exit_sql, .{std.mem.span(c.sqlite3_errmsg(db))}); + } + + commitTransaction(db, stderr_writer); + return rows_inserted; +} + +/// loadNdjsonInput(allocator, reader, db, max_rows, stderr_writer) → usize +/// +/// Pre: reader is positioned at the start of a newline-delimited JSON stream +/// db is an open, empty SQLite database +/// Post: table `t` is created with TEXT columns derived from the first non-blank line; +/// every non-blank line is parsed as a JSON object and inserted as a row +/// result = number of rows inserted +/// aborts the process on any parse, I/O, or SQL error +pub fn loadNdjsonInput( + allocator: std.mem.Allocator, + reader: *std.Io.Reader, + db: *c.sqlite3, + max_rows: ?usize, + stderr_writer: *std.Io.Writer, +) usize { + var line_num: usize = 0; + // cols_owned: column names duplicated into allocator; populated on first object + var cols_owned: ?[][]u8 = null; + defer if (cols_owned) |cs| { + for (cs) |col| allocator.free(col); + allocator.free(cs); + }; + var insert_stmt: ?*c.sqlite3_stmt = null; + defer if (insert_stmt) |s| { _ = c.sqlite3_finalize(s); }; + var rows_inserted: usize = 0; + var in_transaction = false; + + // Loop invariant I: all non-blank lines 1..line_num have been processed; + // rows_inserted = number of objects inserted; in_transaction is true after first object + // Bounding function: lines remaining in reader (finite input) + while (true) { + line_num += 1; + const line = readLine(allocator, reader) catch |err| switch (err) { + error.OutOfMemory => fatal("out of memory reading NDJSON", stderr_writer, exit_parse, .{}), + error.ReadFailed => fatal("line {d}: failed to read NDJSON input", stderr_writer, exit_parse, .{line_num}), + } orelse break; + defer allocator.free(line); + + const trimmed = std.mem.trim(u8, line, " \t\r"); + if (trimmed.len == 0) { + line_num -= 1; + continue; // skip blank lines + } + + var parsed_line = std.json.parseFromSlice(std.json.Value, allocator, trimmed, .{}) catch + fatal("line {d}: failed to parse NDJSON", stderr_writer, exit_parse, .{line_num}); + defer parsed_line.deinit(); + + const obj = switch (parsed_line.value) { + .object => |o| o, + else => fatal("line {d}: NDJSON element must be a JSON object", stderr_writer, exit_parse, .{line_num}), + }; + + if (cols_owned == null) { + // First object: extract column names and create table + var col_list: std.ArrayList([]u8) = .empty; + errdefer { + for (col_list.items) |col| allocator.free(col); + col_list.deinit(allocator); + } + var ki = obj.iterator(); + while (ki.next()) |entry| { + const owned_key = allocator.dupe(u8, entry.key_ptr.*) catch + fatal("out of memory building column list", stderr_writer, exit_parse, .{}); + col_list.append(allocator, owned_key) catch + fatal("out of memory building column list", stderr_writer, exit_parse, .{}); + } + if (col_list.items.len == 0) + fatal("line 1: first NDJSON object has no keys", stderr_writer, exit_parse, .{}); + + cols_owned = col_list.toOwnedSlice(allocator) catch + fatal("out of memory", stderr_writer, exit_parse, .{}); + + const cols_const: []const []const u8 = @ptrCast(cols_owned.?); + createAllTextTable(allocator, db, cols_const, stderr_writer); + 
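// Note: the schema is fixed by this first object; each of its keys becomes a
// TEXT column, in iteration order. For every later line, insertRowFromJson binds
// SQL NULL for any of these columns missing from that object, and silently
// ignores keys that did not appear on line one, since only cols_owned is consulted.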
beginTransaction(db, stderr_writer); + in_transaction = true; + + insert_stmt = prepareInsertStmt(allocator, db, cols_owned.?.len, stderr_writer); + } + + rows_inserted += 1; + if (max_rows) |limit| { + if (rows_inserted > limit) + fatal("input exceeds --max-rows limit ({d} rows)", stderr_writer, exit_usage, .{limit}); + } + + const cols_const: []const []const u8 = @ptrCast(cols_owned.?); + insertRowFromJson(allocator, insert_stmt.?, cols_const, obj) catch + fatal("line {d}: {s}", stderr_writer, exit_sql, .{ line_num, std.mem.span(c.sqlite3_errmsg(db)) }); + } + + if (cols_owned == null) + fatal("empty NDJSON input", stderr_writer, exit_parse, .{}); + + if (in_transaction) commitTransaction(db, stderr_writer); + return rows_inserted; +} + +// ─── Output formatting ──────────────────────────────── + +/// writeJsonString(writer, s) → !void +/// +/// Pre: s is a valid UTF-8 slice +/// Post: s is emitted to writer as a JSON string literal enclosed in double-quotes, +/// with all RFC 8259–required characters escaped +pub fn writeJsonString(writer: *std.Io.Writer, s: []const u8) !void { + try writer.writeByte('"'); + for (s) |ch| { + switch (ch) { + '"' => try writer.writeAll("\\\""), + '\\' => try writer.writeAll("\\\\"), + '/' => try writer.writeAll("\\/"), + '\x08' => try writer.writeAll("\\b"), + '\x0C' => try writer.writeAll("\\f"), + '\n' => try writer.writeAll("\\n"), + '\r' => try writer.writeAll("\\r"), + '\t' => try writer.writeAll("\\t"), + 0x00...0x07, 0x0B, 0x0E...0x1F => try writer.print("\\u{x:0>4}", .{ch}), + else => try writer.writeByte(ch), + } + } + try writer.writeByte('"'); +} + +/// printJsonRow(stmt, col_count, col_names, writer, is_first) → !void +/// +/// Pre: sqlite3_step returned SQLITE_ROW for stmt +/// col_count = sqlite3_column_count(stmt) > 0 +/// col_names.len ≥ col_count +/// Post: one JSON object written to writer representing the current row; +/// preceded by a ',' separator when is_first = false +pub fn printJsonRow( + stmt: *c.sqlite3_stmt, + col_count: c_int, + col_names: []const [*:0]const u8, + writer: *std.Io.Writer, + is_first: bool, +) !void { + if (!is_first) try writer.writeByte(','); + try writer.writeByte('{'); + // Loop invariant I: columns 0..i-1 have been written, separated by commas + // Bounding function: col_count - i + var i: c_int = 0; + while (i < col_count) : (i += 1) { + if (i > 0) try writer.writeByte(','); + const name = std.mem.span(col_names[@intCast(i)]); + try writeJsonString(writer, name); + try writer.writeByte(':'); + switch (c.sqlite3_column_type(stmt, i)) { + c.SQLITE_NULL => try writer.writeAll("null"), + c.SQLITE_INTEGER => try writer.print("{d}", .{c.sqlite3_column_int64(stmt, i)}), + c.SQLITE_FLOAT => { + const f = c.sqlite3_column_double(stmt, i); + if (f == @trunc(f) and !std.math.isInf(f) and !std.math.isNan(f)) { + try writer.print("{d}", .{@as(i64, @intFromFloat(f))}); + } else { + try writer.print("{d}", .{f}); + } + }, + else => { + const ptr = c.sqlite3_column_text(stmt, i); + if (ptr != null) { + try writeJsonString(writer, std.mem.span(@as([*:0]const u8, @ptrCast(ptr)))); + } else { + try writer.writeAll("null"); + } + }, + } + } + try writer.writeByte('}'); +} + +/// printNdjsonRow(stmt, col_count, col_names, writer) → !void +/// +/// Pre: sqlite3_step returned SQLITE_ROW for stmt +/// col_count > 0; col_names.len ≥ col_count +/// Post: one JSON object followed by '\n' written to writer +pub fn printNdjsonRow( + stmt: *c.sqlite3_stmt, + col_count: c_int, + col_names: []const [*:0]const u8, + writer: 
*std.Io.Writer, +) !void { + try writer.writeByte('{'); + // Loop invariant I: columns 0..i-1 have been written, separated by commas + // Bounding function: col_count - i + var i: c_int = 0; + while (i < col_count) : (i += 1) { + if (i > 0) try writer.writeByte(','); + const name = std.mem.span(col_names[@intCast(i)]); + try writeJsonString(writer, name); + try writer.writeByte(':'); + switch (c.sqlite3_column_type(stmt, i)) { + c.SQLITE_NULL => try writer.writeAll("null"), + c.SQLITE_INTEGER => try writer.print("{d}", .{c.sqlite3_column_int64(stmt, i)}), + c.SQLITE_FLOAT => { + const f = c.sqlite3_column_double(stmt, i); + if (f == @trunc(f) and !std.math.isInf(f) and !std.math.isNan(f)) { + try writer.print("{d}", .{@as(i64, @intFromFloat(f))}); + } else { + try writer.print("{d}", .{f}); + } + }, + else => { + const ptr = c.sqlite3_column_text(stmt, i); + if (ptr != null) { + try writeJsonString(writer, std.mem.span(@as([*:0]const u8, @ptrCast(ptr)))); + } else { + try writer.writeAll("null"); + } + }, + } + } + try writer.writeAll("}\n"); +} diff --git a/src/main.zig b/src/main.zig index 94f4edd..aac0fae 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,8 +1,7 @@ const std = @import("std"); -const c = @cImport({ - @cInclude("sqlite3.h"); -}); +const c = @import("c"); const csv = @import("csv.zig"); +const json = @import("json.zig"); const build_options = @import("build_options"); const VERSION: []const u8 = build_options.version; @@ -11,6 +10,8 @@ const VERSION: []const u8 = build_options.version; /// caller-managed and SQLite must not attempt to free it. const sqlite_static: c.sqlite3_destructor_type = null; +/// SQLITE_TRANSIENT sentinel: tells sqlite3_bind_text to copy the string +/// immediately (safe for short-lived source buffers, e.g. JSON arena data). // ─── Error types ───────────────────────────────────── const SqlPipeError = error{ @@ -19,6 +20,8 @@ const SqlPipeError = error{ IncompatibleFlags, ColumnsWithQuery, InvalidMaxRows, + InvalidInputFormat, + InvalidOutputFormat, OpenDbFailed, EmptyInput, EmptyColumnName, @@ -57,6 +60,12 @@ const ExitCode = enum(u8) { sql_error = 3, }; +/// Supported input formats. +const InputFormat = enum { csv, json, ndjson }; + +/// Supported output formats. +const OutputFormat = enum { csv, json, ndjson }; + /// Parsed command-line arguments. const ParsedArgs = struct { /// SQL query to execute against table `t`. @@ -65,13 +74,15 @@ const ParsedArgs = struct { type_inference: bool, /// CSV field delimiter (default: ','). delimiter: u8, - /// Emit column names as first output row when true. + /// Emit column names as first output row when true (CSV output only). header: bool, - /// Output results as a JSON array of objects when true. - json: bool, + /// Input format (default: csv). + input_format: InputFormat, + /// Output format (default: csv). + output_format: OutputFormat, /// Abort with exit 1 when more than this many data rows are read; null = unlimited. max_rows: ?usize, - /// Print "Loaded rows" to stderr after all CSV rows are inserted when true. + /// Print "Loaded rows" to stderr after all rows are inserted when true. /// When false, the message is still shown automatically when stderr is a TTY. verbose: bool, /// Write results to this file path instead of stdout; null = write to stdout. @@ -84,6 +95,8 @@ const ColumnsArgs = struct { delimiter: u8, /// Show inferred type alongside name when true. verbose: bool, + /// Input format (default: csv). 
+ input_format: InputFormat, }; /// Result of argument parsing — either parsed arguments or a special action. @@ -107,29 +120,31 @@ fn printUsage(writer: *std.Io.Writer) !void { try writer.writeAll( \\Usage: sql-pipe [OPTIONS] \\ - \\Reads CSV from stdin, loads it into an in-memory SQLite table `t`, - \\runs , and prints results as CSV to stdout. + \\Reads input from stdin, loads it into an in-memory SQLite table `t`, + \\runs , and prints results to stdout. \\ \\Options: - \\ -d, --delimiter Input field delimiter (default: ,) - \\ --tsv Alias for --delimiter '\t' - \\ --no-type-inference Treat all columns as TEXT (skip auto-detection) - \\ -H, --header Print column names as the first output row - \\ --json Output results as a JSON array of objects - \\ --max-rows Stop if more than data rows are read (exit 1) - \\ -v, --verbose Force row count to stderr (shown automatically on TTY) - \\ With --columns: show inferred type per column - \\ --columns List column names from header (one per line) and exit - \\ Combine with -v/--verbose to include inferred types - \\ Cannot be combined with --output or a query argument - \\ --output Write results to file instead of stdout - \\ -h, --help Show this help message and exit - \\ -V, --version Show version and exit + \\ -d, --delimiter Input field delimiter for CSV (default: ,) + \\ --tsv Alias for --delimiter '\t' + \\ -I, --input-format Input format: csv (default), json, ndjson + \\ -O, --output-format Output format: csv (default), json, ndjson + \\ --json Alias for --output-format json + \\ --no-type-inference Treat all columns as TEXT (CSV input only) + \\ -H, --header Print column names as the first output row (CSV output only) + \\ --max-rows Stop if more than data rows are read (exit 1) + \\ -v, --verbose Force row count to stderr (shown automatically on TTY) + \\ With --columns: show inferred type per column + \\ --columns List column names from input header (one per line) and exit + \\ Combine with -v/--verbose to include inferred types + \\ Cannot be combined with --output or a query argument + \\ --output Write results to file instead of stdout + \\ -h, --help Show this help message and exit + \\ -V, --version Show version and exit \\ \\Exit codes: \\ 0 Success \\ 1 Usage error (missing query, bad arguments) - \\ 2 CSV parse error + \\ 2 Input parse error \\ 3 SQL error \\ \\Examples: @@ -137,7 +152,9 @@ fn printUsage(writer: *std.Io.Writer) !void { \\ cat data.tsv | sql-pipe --tsv 'SELECT * FROM t' \\ cat data.psv | sql-pipe -d '|' 'SELECT * FROM t' \\ cat data.csv | sql-pipe 'SELECT region, SUM(revenue) FROM t GROUP BY region' - \\ cat data.csv | sql-pipe --json 'SELECT * FROM t' + \\ cat data.csv | sql-pipe --output-format json 'SELECT * FROM t' + \\ cat data.json | sql-pipe --input-format json 'SELECT * FROM t' + \\ cat data.ndjson | sql-pipe -I ndjson -O ndjson 'SELECT name FROM t WHERE age > 18' \\ ); } @@ -152,23 +169,45 @@ fn parseDelimiter(value: []const u8) SqlPipeError!u8 { return value[0]; } +/// parseInputFormat(s) → InputFormat +/// Pre: s is the format string provided by the user +/// Post: result is the matching InputFormat +/// error.InvalidInputFormat when s is not "csv", "json", or "ndjson" +fn parseInputFormat(s: []const u8) SqlPipeError!InputFormat { + if (std.mem.eql(u8, s, "csv")) return .csv; + if (std.mem.eql(u8, s, "json")) return .json; + if (std.mem.eql(u8, s, "ndjson")) return .ndjson; + return error.InvalidInputFormat; +} + +/// parseOutputFormat(s) → OutputFormat +/// Pre: s is the format string provided by the 
user +/// Post: result is the matching OutputFormat +/// error.InvalidOutputFormat when s is not "csv", "json", or "ndjson" +fn parseOutputFormat(s: []const u8) SqlPipeError!OutputFormat { + if (std.mem.eql(u8, s, "csv")) return .csv; + if (std.mem.eql(u8, s, "json")) return .json; + if (std.mem.eql(u8, s, "ndjson")) return .ndjson; + return error.InvalidOutputFormat; +} + /// parseArgs(args) → ArgsResult /// Pre: args is the full process argument slice; args[0] is the program name /// Post: result.parsed.query is the first non-flag argument /// result.parsed.type_inference = false when "--no-type-inference" is present -/// result.parsed.json = true when "--json" is present +/// result.parsed.output_format = .json when "--json" or "--output-format json" is present /// result = .help when --help or -h is present /// result = .version when --version or -V is present /// error.MissingQuery when no non-flag argument is found -/// error.IncompatibleFlags when --json is combined with --header +/// error.IncompatibleFlags when a non-CSV output format is combined with --header fn parseArgs(args: []const [:0]const u8) SqlPipeError!ArgsResult { var query: ?[]const u8 = null; var type_inference = true; var delimiter: u8 = ','; var header = false; - var json = false; - var explicit_delimiter = false; - var explicit_tsv = false; + var input_format: InputFormat = .csv; + var output_format: OutputFormat = .csv; + var max_rows: ?usize = null; var verbose = false; var list_columns = false; @@ -179,7 +218,8 @@ fn parseArgs(args: []const [:0]const u8) SqlPipeError!ArgsResult { // type_inference reflects the presence of --no-type-inference; // delimiter reflects -d/--delimiter/--tsv if present; // header reflects the presence of --header/-H; - // json reflects the presence of --json; + // output_format reflects the last --output-format/--json flag seen; + // input_format reflects the last --input-format flag seen; // max_rows reflects the presence of --max-rows // Bounding function: args.len - i var i: usize = 1; @@ -191,24 +231,36 @@ fn parseArgs(args: []const [:0]const u8) SqlPipeError!ArgsResult { return .version; } else if (std.mem.eql(u8, arg, "--tsv")) { delimiter = '\t'; - explicit_tsv = true; } else if (std.mem.eql(u8, arg, "-d") or std.mem.eql(u8, arg, "--delimiter")) { i += 1; if (i >= args.len) return error.InvalidDelimiter; delimiter = try parseDelimiter(args[i]); - explicit_delimiter = true; } else if (std.mem.startsWith(u8, arg, "--delimiter=")) { delimiter = try parseDelimiter(arg["--delimiter=".len..]); - explicit_delimiter = true; } else if (std.mem.startsWith(u8, arg, "-d=")) { delimiter = try parseDelimiter(arg["-d=".len..]); - explicit_delimiter = true; } else if (std.mem.eql(u8, arg, "--no-type-inference")) { type_inference = false; } else if (std.mem.eql(u8, arg, "--header") or std.mem.eql(u8, arg, "-H")) { header = true; } else if (std.mem.eql(u8, arg, "--json")) { - json = true; + output_format = .json; + } else if (std.mem.eql(u8, arg, "-I") or std.mem.eql(u8, arg, "--input-format")) { + i += 1; + if (i >= args.len) return error.InvalidInputFormat; + input_format = try parseInputFormat(args[i]); + } else if (std.mem.startsWith(u8, arg, "--input-format=")) { + input_format = try parseInputFormat(arg["--input-format=".len..]); + } else if (std.mem.startsWith(u8, arg, "-I=")) { + input_format = try parseInputFormat(arg["-I=".len..]); + } else if (std.mem.eql(u8, arg, "-O") or std.mem.eql(u8, arg, "--output-format")) { + i += 1; + if (i >= args.len) return error.InvalidOutputFormat; + 
output_format = try parseOutputFormat(args[i]); + } else if (std.mem.startsWith(u8, arg, "--output-format=")) { + output_format = try parseOutputFormat(arg["--output-format=".len..]); + } else if (std.mem.startsWith(u8, arg, "-O=")) { + output_format = try parseOutputFormat(arg["-O=".len..]); } else if (std.mem.eql(u8, arg, "--max-rows")) { i += 1; if (i >= args.len) return error.InvalidMaxRows; @@ -236,8 +288,8 @@ fn parseArgs(args: []const [:0]const u8) SqlPipeError!ArgsResult { } } - // --json is mutually exclusive with --header (both affect output format) - if (json and header) + // Non-CSV output format is mutually exclusive with --header + if (output_format != .csv and header) return error.IncompatibleFlags; // --output is mutually exclusive with --columns (--columns always writes to stdout) @@ -250,14 +302,19 @@ fn parseArgs(args: []const [:0]const u8) SqlPipeError!ArgsResult { // --columns mode: list headers and exit if (list_columns) - return .{ .columns = ColumnsArgs{ .delimiter = delimiter, .verbose = verbose } }; + return .{ .columns = ColumnsArgs{ + .delimiter = delimiter, + .verbose = verbose, + .input_format = input_format, + } }; return .{ .parsed = ParsedArgs{ .query = query orelse return error.MissingQuery, .type_inference = type_inference, .delimiter = delimiter, .header = header, - .json = json, + .input_format = input_format, + .output_format = output_format, .max_rows = max_rows, .verbose = verbose, .output = output, @@ -675,89 +732,15 @@ fn printHeaderRow( try writer.writeByte('\n'); } -/// writeJsonString(writer, s) → !void -/// Pre: writer is valid, s is a UTF-8 slice -/// Post: s is written as a JSON string literal (double-quoted, with special -/// characters escaped per RFC 8259: \", \\, \/, \b, \f, \n, \r, \t, -/// and \uXXXX for control characters 0x00–0x1F) -fn writeJsonString(writer: *std.Io.Writer, s: []const u8) !void { - try writer.writeByte('"'); - for (s) |ch| { - switch (ch) { - '"' => try writer.writeAll("\\\""), - '\\' => try writer.writeAll("\\\\"), - '/' => try writer.writeAll("\\/"), - '\x08' => try writer.writeAll("\\b"), - '\x0C' => try writer.writeAll("\\f"), - '\n' => try writer.writeAll("\\n"), - '\r' => try writer.writeAll("\\r"), - '\t' => try writer.writeAll("\\t"), - 0x00...0x07, 0x0B, 0x0E...0x1F => try writer.print("\\u{x:0>4}", .{ch}), - else => try writer.writeByte(ch), - } - } - try writer.writeByte('"'); -} - -/// printJsonRow(stmt, col_count, col_names, writer, is_first) → !void -/// Pre: sqlite3_step returned SQLITE_ROW for stmt -/// col_count > 0; col_names.len = col_count -/// is_first indicates whether this is the first row (no leading comma) -/// Post: one JSON object written to writer as { "col": value, … } -/// NULL cells are written as JSON null -/// INTEGER / REAL columns written as JSON numbers -/// TEXT columns written as JSON strings -fn printJsonRow( - stmt: *c.sqlite3_stmt, - col_count: c_int, - col_names: []const [*:0]const u8, - writer: *std.Io.Writer, - is_first: bool, -) !void { - if (!is_first) try writer.writeByte(','); - try writer.writeByte('{'); - // Loop invariant I: columns 0..i-1 have been written as "name":value pairs - // Bounding function: col_count - i - var i: c_int = 0; - while (i < col_count) : (i += 1) { - if (i > 0) try writer.writeByte(','); - const name = std.mem.span(col_names[@intCast(i)]); - try writeJsonString(writer, name); - try writer.writeByte(':'); - switch (c.sqlite3_column_type(stmt, i)) { - c.SQLITE_NULL => try writer.writeAll("null"), - c.SQLITE_INTEGER => try writer.print("{d}", 
.{c.sqlite3_column_int64(stmt, i)}), - c.SQLITE_FLOAT => { - const f = c.sqlite3_column_double(stmt, i); - // Emit as integer notation when value has no fractional part, - // otherwise use full precision float. - if (f == @trunc(f) and !std.math.isInf(f) and !std.math.isNan(f)) { - try writer.print("{d}", .{@as(i64, @intFromFloat(f))}); - } else { - try writer.print("{d}", .{f}); - } - }, - else => { // SQLITE_TEXT and SQLITE_BLOB → emit as string - const ptr = c.sqlite3_column_text(stmt, i); - if (ptr != null) { - try writeJsonString(writer, std.mem.span(@as([*:0]const u8, @ptrCast(ptr)))); - } else { - try writer.writeAll("null"); - } - }, - } - } - try writer.writeByte('}'); -} - -/// execQuery(db, query, allocator, writer, header, json) → !void +/// execQuery(db, query, allocator, writer, header, output_format) → !void /// Pre: db is open with table `t` populated /// query is a valid SQL string (not null-terminated) /// allocator is valid -/// when json = true, header and delimiter flags must not be set (caller's responsibility) -/// Post: if json = true, results are written as a JSON array of objects -/// if header = true (and json = false), column names written as the first CSV row -/// all result rows written to writer as CSV lines via printRow (when json = false) +/// when output_format = .json or .ndjson, header must not be set (caller's responsibility) +/// Post: if output_format = .json, results are written as a JSON array of objects +/// if output_format = .ndjson, results are written as one JSON object per line +/// if header = true (and output_format = .csv), column names written as the first CSV row +/// all result rows written to writer as CSV lines via printRow (when output_format = .csv) /// error.PrepareQueryFailed when sqlite3_prepare_v2 returns non-SQLITE_OK /// propagates any writer I/O error fn execQuery( @@ -766,7 +749,7 @@ fn execQuery( query: []const u8, writer: *std.Io.Writer, header: bool, - json: bool, + output_format: OutputFormat, ) (SqlPipeError || std.mem.Allocator.Error || std.Io.Writer.Error)!void { const query_z = try allocator.dupeZ(u8, query); defer allocator.free(query_z); @@ -778,35 +761,52 @@ fn execQuery( const col_count = c.sqlite3_column_count(stmt); - if (json) { - // Collect column names before stepping (sqlite3_column_name is valid before step) - var col_names = try allocator.alloc([*:0]const u8, @intCast(col_count)); - defer allocator.free(col_names); - var ci: c_int = 0; - while (ci < col_count) : (ci += 1) { - col_names[@intCast(ci)] = c.sqlite3_column_name(stmt, ci); - } + switch (output_format) { + .json => { + // Collect column names before stepping (sqlite3_column_name is valid before step) + var col_names = try allocator.alloc([*:0]const u8, @intCast(col_count)); + defer allocator.free(col_names); + var ci: c_int = 0; + while (ci < col_count) : (ci += 1) { + col_names[@intCast(ci)] = c.sqlite3_column_name(stmt, ci); + } - try writer.writeByte('['); - var first = true; - // Loop invariant I: all SQLITE_ROW results returned so far have been printed as JSON objects - // Bounding function: number of remaining rows in the result set (finite) - while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { - try printJsonRow(stmt.?, col_count, col_names, writer, first); - first = false; - } - try writer.writeAll("]\n"); - } else { - // When header is requested, print column names before data rows - if (header and col_count > 0) { - try printHeaderRow(stmt.?, col_count, writer); - } + try writer.writeByte('['); + var first = true; + // Loop invariant I: all 
SQLITE_ROW results returned so far have been printed as JSON objects + // Bounding function: number of remaining rows in the result set (finite) + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { + try json.printJsonRow(stmt.?, col_count, col_names, writer, first); + first = false; + } + try writer.writeAll("]\n"); + }, + .ndjson => { + // Collect column names before stepping + var col_names = try allocator.alloc([*:0]const u8, @intCast(col_count)); + defer allocator.free(col_names); + var ci: c_int = 0; + while (ci < col_count) : (ci += 1) { + col_names[@intCast(ci)] = c.sqlite3_column_name(stmt, ci); + } + // Loop invariant I: all SQLITE_ROW results returned so far have been printed as NDJSON lines + // Bounding function: number of remaining rows in the result set (finite) + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { + try json.printNdjsonRow(stmt.?, col_count, col_names, writer); + } + }, + .csv => { + // When header is requested, print column names before data rows + if (header and col_count > 0) { + try printHeaderRow(stmt.?, col_count, writer); + } - // Loop invariant I: all SQLITE_ROW results returned so far have been printed - // Bounding function: number of remaining rows in the result set (finite) - while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { - try printRow(stmt.?, col_count, writer); - } + // Loop invariant I: all SQLITE_ROW results returned so far have been printed + // Bounding function: number of remaining rows in the result set (finite) + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { + try printRow(stmt.?, col_count, writer); + } + }, } } @@ -983,122 +983,20 @@ fn fatalSqlWithContext( std.process.exit(@intFromEnum(ExitCode.sql_error)); } -/// runColumns(args, allocator, io, stderr_writer, stdout_writer) → void -/// Pre: args.delimiter is valid; allocator and writers are valid -/// Post: column names from stdin CSV header row are written to stdout, one per line; -/// when args.verbose is true, each line has format " " where TYPE -/// is inferred from the first 100 data rows (INTEGER, REAL, or TEXT) -/// error messages go to stderr; process exits 0 on success, 2 on CSV error -fn runColumns( - args: ColumnsArgs, +/// loadCsvInput loads all CSV rows from stdin into db table `t`. 
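/// (CSV input only; JSON and NDJSON rows are loaded separately by
/// json.loadJsonArray and json.loadNdjsonInput.)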
+/// Pre: db is an open in-memory SQLite handle with no tables yet +/// parsed.delimiter is valid; allocator and writers are valid +/// Post: table `t` exists in db with columns inferred from the CSV header; +/// all CSV rows have been inserted; transaction has been committed +/// returns rows_inserted (data rows only, header not counted) +/// on error: writes message to stderr_writer and exits with appropriate code +fn loadCsvInput( allocator: std.mem.Allocator, io: std.Io, - stderr_writer: *std.Io.Writer, - stdout_writer: *std.Io.Writer, -) void { - var stdin_buf: [4096]u8 = undefined; - var stdin_file_reader = std.Io.File.reader(std.Io.File.stdin(), io, &stdin_buf); - var csv_reader = csv.csvReaderWithDelimiter(allocator, &stdin_file_reader.interface, args.delimiter); - - const header_record = csv_reader.nextRecord() catch |err| switch (err) { - error.UnterminatedQuotedField => fatal("row 1: unterminated quoted field", stderr_writer, .csv_error, .{}), - else => fatal("row 1: failed to parse CSV header", stderr_writer, .csv_error, .{}), - } orelse fatal("empty input (no header row)", stderr_writer, .csv_error, .{}); - defer csv_reader.freeRecord(header_record); - - const cols = parseHeader(allocator, header_record, stderr_writer) catch |err| { - switch (err) { - error.EmptyColumnName => fatal("row 1: empty column name in header", stderr_writer, .csv_error, .{}), - error.NoColumns => fatal("row 1: no columns found in header", stderr_writer, .csv_error, .{}), - else => fatal("row 1: failed to parse header", stderr_writer, .csv_error, .{}), - } - }; - defer { - for (cols) |col| allocator.free(col); - allocator.free(cols); - } - // {A1: cols is a non-empty slice of trimmed, BOM-free column names} - - if (args.verbose) { - // Read up to inference_buffer_size rows for type inference - var row_buffer: std.ArrayList([][]u8) = .empty; - defer { - for (row_buffer.items) |row| csv_reader.freeRecord(row); - row_buffer.deinit(allocator); - } - var data_row: usize = 1; // row 1 = header already read; data rows start at 2 - // Loop invariant I: row_buffer.items.len ≤ inference_buffer_size - // all items are valid parsed CSV records - // data_row = 1 + number of data rows attempted so far - // Bounding function: inference_buffer_size - row_buffer.items.len - // (decreases for each non-empty row appended; empty rows are counted by - // data_row but do not move the buffer toward the bound — stream must - // be finite for termination) - while (row_buffer.items.len < inference_buffer_size) { - data_row += 1; - const rec = csv_reader.nextRecord() catch |err| switch (err) { - error.UnterminatedQuotedField => fatal( - "row {d}: unterminated quoted field", - stderr_writer, - .csv_error, - .{data_row}, - ), - else => fatal( - "row {d}: failed to parse CSV", - stderr_writer, - .csv_error, - .{data_row}, - ), - } orelse break; - if (rec.len == 0) { - csv_reader.freeRecord(rec); - continue; - } - row_buffer.append(allocator, rec) catch - fatal("out of memory while buffering rows", stderr_writer, .csv_error, .{}); - } - const types = inferTypes(allocator, row_buffer.items, cols.len) catch - fatal("out of memory during type inference", stderr_writer, .csv_error, .{}); - defer allocator.free(types); - - // Loop invariant I: cols[0..i] have been written with type annotation to stdout - // Bounding function: cols.len - i - for (cols, types) |col, t| { - stdout_writer.print("{s} {s}\n", .{ col, @tagName(t) }) catch |err| { - std.log.err("failed to write output: {}", .{err}); - }; - } - } else { - // Loop invariant I: 
cols[0..i] have been written to stdout - // Bounding function: cols.len - i - for (cols) |col| { - stdout_writer.print("{s}\n", .{col}) catch |err| { - std.log.err("failed to write output: {}", .{err}); - }; - } - } -} - -/// run(parsed, allocator, stderr_writer, stdout_writer) → void -/// Pre: parsed contains a valid query; allocator and writers are valid -/// Post: CSV from stdin has been loaded, query executed, results written to stdout -/// On error, an "error: ..." message is written to stderr and process -/// exits with the appropriate ExitCode (1, 2, or 3) -fn run( + db: *c.sqlite3, parsed: ParsedArgs, - allocator: std.mem.Allocator, - io: std.Io, stderr_writer: *std.Io.Writer, - stdout_writer: *std.Io.Writer, -) void { - const query = parsed.query; - // {A1: query is the SQL string; parsed.type_inference indicates buffer-first mode} - - const db = openDb() catch - fatal("failed to open in-memory database", stderr_writer, .sql_error, .{}); - defer _ = c.sqlite3_close(db); - // {A2: db is an open, empty in-memory SQLite database} - +) usize { var stdin_buf: [4096]u8 = undefined; var stdin_file_reader = std.Io.File.reader(std.Io.File.stdin(), io, &stdin_buf); var csv_reader = csv.csvReaderWithDelimiter(allocator, &stdin_file_reader.interface, parsed.delimiter); @@ -1109,23 +1007,18 @@ fn run( } orelse fatal("empty input (no header row)", stderr_writer, .csv_error, .{}); defer csv_reader.freeRecord(header_record); - const cols = parseHeader(allocator, header_record, stderr_writer) catch |err| { - switch (err) { - error.EmptyColumnName => fatal("row 1: empty column name in header", stderr_writer, .csv_error, .{}), - error.NoColumns => fatal("row 1: no columns found in header", stderr_writer, .csv_error, .{}), - else => fatal("row 1: failed to parse header", stderr_writer, .csv_error, .{}), - } + const cols = parseHeader(allocator, header_record, stderr_writer) catch |err| switch (err) { + error.EmptyColumnName => fatal("row 1: empty column name in header", stderr_writer, .csv_error, .{}), + error.NoColumns => fatal("row 1: no columns found in header", stderr_writer, .csv_error, .{}), + else => fatal("row 1: failed to parse header", stderr_writer, .csv_error, .{}), }; defer { for (cols) |col| allocator.free(col); allocator.free(cols); } - // {A3: cols is a non-empty list of trimmed, BOM-free column names} const num_cols = cols.len; - - const is_tty = std.Io.File.isTty(std.Io.File.stderr(), io) catch false; - const start_ts = std.Io.Timestamp.now(io, .awake); + var csv_row_count: usize = 1; // 1 = header already read // ─── Phase 1: determine column types ───────────────────────────────────── var row_buffer: std.ArrayList([][]u8) = .empty; @@ -1134,8 +1027,6 @@ fn run( row_buffer.deinit(allocator); } - var csv_row_count: usize = 1; // 1 = header already read - const types: []ColumnType = if (parsed.type_inference) blk: { while (row_buffer.items.len < inference_buffer_size) { const rec = csv_reader.nextRecord() catch |err| switch (err) { @@ -1174,7 +1065,6 @@ fn run( createTable(allocator, db, cols, types) catch fatal("{s}", stderr_writer, .sql_error, .{std.mem.span(c.sqlite3_errmsg(db))}); - // {A5: table `t` exists in db with num_cols columns typed per `types`} { var errmsg: [*c]u8 = null; @@ -1183,28 +1073,26 @@ fn run( fatalSqlWithContext(allocator, db, msg, stderr_writer); } } - // {A6: an active transaction is open on db} const stmt = prepareInsert(allocator, db, num_cols) catch fatalSqlWithContext(allocator, db, std.mem.span(c.sqlite3_errmsg(db)), stderr_writer); defer _ = 
c.sqlite3_finalize(stmt); - // Insert buffered rows + const is_tty = std.Io.File.isTty(std.Io.File.stderr(), io) catch false; var rows_inserted: usize = 0; + + // Insert buffered rows for (row_buffer.items) |row| { rows_inserted += 1; if (parsed.max_rows) |limit| { - if (rows_inserted > limit) { + if (rows_inserted > limit) fatal("input exceeds --max-rows limit ({d} rows)", stderr_writer, .usage, .{limit}); - } } insertRowTyped(stmt, db, row, types, @intCast(num_cols)) catch fatalSqlWithContext(allocator, db, std.mem.span(c.sqlite3_errmsg(db)), stderr_writer); - if (is_tty and rows_inserted % progress_interval == 0) { + if (is_tty and rows_inserted % progress_interval == 0) printProgress(stderr_writer, rows_inserted, parsed.max_rows); - } } - // {A7: all buffered rows are in t} // Stream remaining rows from stdin while (true) { @@ -1229,17 +1117,14 @@ fn run( rows_inserted += 1; if (parsed.max_rows) |limit| { - if (rows_inserted > limit) { + if (rows_inserted > limit) fatal("input exceeds --max-rows limit ({d} rows)", stderr_writer, .usage, .{limit}); - } } insertRowTyped(stmt, db, record, types, @intCast(num_cols)) catch fatalSqlWithContext(allocator, db, std.mem.span(c.sqlite3_errmsg(db)), stderr_writer); - if (is_tty and rows_inserted % progress_interval == 0) { + if (is_tty and rows_inserted % progress_interval == 0) printProgress(stderr_writer, rows_inserted, parsed.max_rows); - } } - // {A8: all stdin CSV rows are inserted into t; transaction is still active} { var errmsg: [*c]u8 = null; @@ -1250,10 +1135,210 @@ fn run( } if (errmsg != null) c.sqlite3_free(errmsg); } - // {A9: transaction committed; t holds all input rows, no active transaction} + + return rows_inserted; +} + +/// runColumns(args, allocator, io, stderr_writer, stdout_writer) → void +/// Pre: args is valid; allocator and writers are valid +/// Post: column names from the input header (CSV/JSON/NDJSON) are written to stdout, +/// one per line; when args.verbose is true each line has format " " +/// (CSV only — JSON/NDJSON always show TEXT); exits 0 on success, 2 on parse error +fn runColumns( + args: ColumnsArgs, + allocator: std.mem.Allocator, + io: std.Io, + stderr_writer: *std.Io.Writer, + stdout_writer: *std.Io.Writer, +) void { + switch (args.input_format) { + .csv => { + var stdin_buf: [4096]u8 = undefined; + var stdin_file_reader = std.Io.File.reader(std.Io.File.stdin(), io, &stdin_buf); + var csv_reader = csv.csvReaderWithDelimiter(allocator, &stdin_file_reader.interface, args.delimiter); + + const header_record = csv_reader.nextRecord() catch |err| switch (err) { + error.UnterminatedQuotedField => fatal("row 1: unterminated quoted field", stderr_writer, .csv_error, .{}), + else => fatal("row 1: failed to parse CSV header", stderr_writer, .csv_error, .{}), + } orelse fatal("empty input (no header row)", stderr_writer, .csv_error, .{}); + defer csv_reader.freeRecord(header_record); + + const cols = parseHeader(allocator, header_record, stderr_writer) catch |err| switch (err) { + error.EmptyColumnName => fatal("row 1: empty column name in header", stderr_writer, .csv_error, .{}), + error.NoColumns => fatal("row 1: no columns found in header", stderr_writer, .csv_error, .{}), + else => fatal("row 1: failed to parse header", stderr_writer, .csv_error, .{}), + }; + defer { + for (cols) |col| allocator.free(col); + allocator.free(cols); + } + + if (args.verbose) { + var row_buffer: std.ArrayList([][]u8) = .empty; + defer { + for (row_buffer.items) |row| csv_reader.freeRecord(row); + row_buffer.deinit(allocator); + } 
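// This mirrors the type-inference pass in loadCsvInput: buffer up to
// inference_buffer_size non-empty data rows, then let inferTypes choose
// INTEGER, REAL, or TEXT per column for the "name TYPE" lines printed below.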
+                var data_row: usize = 1;
+                while (row_buffer.items.len < inference_buffer_size) {
+                    data_row += 1;
+                    const rec = csv_reader.nextRecord() catch |err| switch (err) {
+                        error.UnterminatedQuotedField => fatal(
+                            "row {d}: unterminated quoted field",
+                            stderr_writer,
+                            .csv_error,
+                            .{data_row},
+                        ),
+                        else => fatal("row {d}: failed to parse CSV", stderr_writer, .csv_error, .{data_row}),
+                    } orelse break;
+                    if (rec.len == 0) {
+                        csv_reader.freeRecord(rec);
+                        continue;
+                    }
+                    row_buffer.append(allocator, rec) catch
+                        fatal("out of memory while buffering rows", stderr_writer, .csv_error, .{});
+                }
+                const types = inferTypes(allocator, row_buffer.items, cols.len) catch
+                    fatal("out of memory during type inference", stderr_writer, .csv_error, .{});
+                defer allocator.free(types);
+                for (cols, types) |col, t| {
+                    stdout_writer.print("{s} {s}\n", .{ col, @tagName(t) }) catch |err| {
+                        std.log.err("failed to write output: {}", .{err});
+                    };
+                }
+            } else {
+                for (cols) |col| {
+                    stdout_writer.print("{s}\n", .{col}) catch |err| {
+                        std.log.err("failed to write output: {}", .{err});
+                    };
+                }
+            }
+        },
+        .json => {
+            var stdin_buf: [4096]u8 = undefined;
+            var stdin_file_reader = std.Io.File.reader(std.Io.File.stdin(), io, &stdin_buf);
+
+            var buf: std.ArrayList(u8) = .empty;
+            defer buf.deinit(allocator);
+            while (true) {
+                const byte = stdin_file_reader.interface.takeByte() catch |err| switch (err) {
+                    error.EndOfStream => break,
+                    error.ReadFailed => fatal("failed to read JSON input", stderr_writer, .csv_error, .{}),
+                };
+                buf.append(allocator, byte) catch fatal("out of memory reading JSON", stderr_writer, .csv_error, .{});
+            }
+            if (buf.items.len == 0) fatal("empty input", stderr_writer, .csv_error, .{});
+
+            var parsed = std.json.parseFromSlice(std.json.Value, allocator, buf.items, .{}) catch
+                fatal("failed to parse JSON input", stderr_writer, .csv_error, .{});
+            defer parsed.deinit();
+
+            const array = switch (parsed.value) {
+                .array => |a| a,
+                else => fatal("JSON input must be an array of objects", stderr_writer, .csv_error, .{}),
+            };
+            if (array.items.len == 0) fatal("empty JSON array: cannot determine column names", stderr_writer, .csv_error, .{});
+
+            const first_obj = switch (array.items[0]) {
+                .object => |o| o,
+                else => fatal("JSON array elements must be objects", stderr_writer, .csv_error, .{}),
+            };
+
+            var ki = first_obj.iterator();
+            while (ki.next()) |entry| {
+                if (args.verbose) {
+                    stdout_writer.print("{s} TEXT\n", .{entry.key_ptr.*}) catch |err| {
+                        std.log.err("failed to write output: {}", .{err});
+                    };
+                } else {
+                    stdout_writer.print("{s}\n", .{entry.key_ptr.*}) catch |err| {
+                        std.log.err("failed to write output: {}", .{err});
+                    };
+                }
+            }
+        },
+        .ndjson => {
+            var stdin_buf: [4096]u8 = undefined;
+            var stdin_file_reader = std.Io.File.reader(std.Io.File.stdin(), io, &stdin_buf);
+
+            // Read until we find a non-empty line
+            var line_num: usize = 0;
+            while (true) {
+                line_num += 1;
+                const line = json.readLine(allocator, &stdin_file_reader.interface) catch |err| switch (err) {
+                    error.OutOfMemory => fatal("out of memory reading NDJSON", stderr_writer, .csv_error, .{}),
+                    error.ReadFailed => fatal("line {d}: failed to read NDJSON", stderr_writer, .csv_error, .{line_num}),
+                } orelse fatal("empty NDJSON input", stderr_writer, .csv_error, .{});
+                defer allocator.free(line);
+
+                const trimmed = std.mem.trim(u8, line, " \t\r");
+                if (trimmed.len == 0) { line_num -= 1; continue; }
+
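+                // Only the first non-empty line is parsed; its object keys become the
+                // reported column names, so keys that appear only in later records are
+                // not listed.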
+                var parsed = std.json.parseFromSlice(std.json.Value, allocator, trimmed, .{}) catch
+                    fatal("line 1: failed to parse NDJSON", stderr_writer, .csv_error, .{});
+                defer parsed.deinit();
+
+                const obj = switch (parsed.value) {
+                    .object => |o| o,
+                    else => fatal("line 1: NDJSON element must be a JSON object", stderr_writer, .csv_error, .{}),
+                };
+
+                var ki = obj.iterator();
+                while (ki.next()) |entry| {
+                    if (args.verbose) {
+                        stdout_writer.print("{s} TEXT\n", .{entry.key_ptr.*}) catch |err| {
+                            std.log.err("failed to write output: {}", .{err});
+                        };
+                    } else {
+                        stdout_writer.print("{s}\n", .{entry.key_ptr.*}) catch |err| {
+                            std.log.err("failed to write output: {}", .{err});
+                        };
+                    }
+                }
+                break;
+            }
+        },
+    }
+}
+
+/// run(parsed, allocator, io, stderr_writer, stdout_writer) → void
+/// Pre: parsed contains a valid query; allocator and writers are valid
+/// Post: input from stdin has been loaded (dispatched on parsed.input_format),
+///       query executed, results written to stdout in parsed.output_format.
+///       On error, an "error: ..." message is written to stderr and the process
+///       exits with the appropriate ExitCode (1, 2, or 3)
+fn run(
+    parsed: ParsedArgs,
+    allocator: std.mem.Allocator,
+    io: std.Io,
+    stderr_writer: *std.Io.Writer,
+    stdout_writer: *std.Io.Writer,
+) void {
+    const query = parsed.query;
+
+    const db = openDb() catch
+        fatal("failed to open in-memory database", stderr_writer, .sql_error, .{});
+    defer _ = c.sqlite3_close(db);
+
+    const start_ts = std.Io.Timestamp.now(io, .awake);
+
+    // Load input into `t` — dispatch on input format
+    const rows_inserted: usize = switch (parsed.input_format) {
+        .csv => loadCsvInput(allocator, io, db, parsed, stderr_writer),
+        .json => blk: {
+            var stdin_buf: [4096]u8 = undefined;
+            var stdin_reader = std.Io.File.reader(std.Io.File.stdin(), io, &stdin_buf);
+            break :blk json.loadJsonArray(allocator, &stdin_reader.interface, db, parsed.max_rows, stderr_writer);
+        },
+        .ndjson => blk: {
+            var stdin_buf: [4096]u8 = undefined;
+            var stdin_reader = std.Io.File.reader(std.Io.File.stdin(), io, &stdin_buf);
+            break :blk json.loadNdjsonInput(allocator, &stdin_reader.interface, db, parsed.max_rows, stderr_writer);
+        },
+    };
 
     // Print row count and elapsed time to stderr when stderr is a TTY or --verbose is set.
-    // When progress was shown (TTY + ≥ progress_interval rows), clear the progress line first.
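+    // The summary and progress-clearing logic below needs to know whether stderr is a TTY;
+    // loadCsvInput() keeps its own local copy of this check while it streams rows.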
+    const is_tty = std.Io.File.isTty(std.Io.File.stderr(), io) catch false;
     if (parsed.verbose or is_tty) {
         const end_ts = std.Io.Timestamp.now(io, .awake);
         const elapsed_ns: i96 = end_ts.nanoseconds - start_ts.nanoseconds;
@@ -1261,9 +1346,8 @@ fn run(
         var count_buf: [32]u8 = undefined;
         const count_str = fmtThousands(&count_buf, rows_inserted);
         const secs = elapsed_ms / 1000;
-        const frac = (elapsed_ms % 1000) / 100; // tenths of a second
+        const frac = (elapsed_ms % 1000) / 100;
         if (is_tty and rows_inserted >= progress_interval) {
-            // Overwrite the in-progress line: move to column 0, erase to end of line
             stderr_writer.writeAll("\r\x1b[K") catch |err| std.log.err("failed to clear progress line: {}", .{err});
         }
         stderr_writer.print("Loaded {s} rows in {d}.{d}s\n", .{ count_str, secs, frac }) catch |err| {
@@ -1272,11 +1356,10 @@ fn run(
         stderr_writer.flush() catch |err| std.log.err("failed to flush stderr: {}", .{err});
     }
 
-    execQuery(allocator, db, query, stdout_writer, parsed.header, parsed.json) catch {
+    execQuery(allocator, db, query, stdout_writer, parsed.header, parsed.output_format) catch {
         stdout_writer.flush() catch |err| std.log.err("failed to flush output before fatal: {}", .{err});
         fatalSqlWithContext(allocator, db, std.mem.span(c.sqlite3_errmsg(db)), stderr_writer);
     };
-    // {A10: all result rows written to stdout as CSV lines}
 }
 
 pub fn main(init: std.process.Init.Minimal) void {
@@ -1302,9 +1385,9 @@ pub fn main(init: std.process.Init.Minimal) void {
     const args_result = parseArgs(args) catch |err| {
         switch (err) {
             error.IncompatibleFlags => {
-                stderr_writer.writeAll("error: --json cannot be combined with --header\n") catch |werr| {
-                    std.log.err("failed to write error message: {}", .{werr});
-                };
+                stderr_writer.writeAll(
+                    "error: --header cannot be combined with non-CSV output format\n",
+                ) catch |werr| std.log.err("failed to write error message: {}", .{werr});
                 stderr_writer.flush() catch |ferr| std.log.err("failed to flush: {}", .{ferr});
                 std.process.exit(@intFromEnum(ExitCode.usage));
             },
@@ -1315,6 +1398,20 @@ pub fn main(init: std.process.Init.Minimal) void {
                 stderr_writer.flush() catch |ferr| std.log.err("failed to flush: {}", .{ferr});
                 std.process.exit(@intFromEnum(ExitCode.usage));
             },
+            error.InvalidInputFormat => {
+                stderr_writer.writeAll(
+                    "error: unknown input format; supported: csv, json, ndjson\n",
+                ) catch |werr| std.log.err("failed to write error message: {}", .{werr});
+                stderr_writer.flush() catch |ferr| std.log.err("failed to flush: {}", .{ferr});
+                std.process.exit(@intFromEnum(ExitCode.usage));
+            },
+            error.InvalidOutputFormat => {
+                stderr_writer.writeAll(
+                    "error: unknown output format; supported: csv, json, ndjson\n",
+                ) catch |werr| std.log.err("failed to write error message: {}", .{werr});
+                stderr_writer.flush() catch |ferr| std.log.err("failed to flush: {}", .{ferr});
+                std.process.exit(@intFromEnum(ExitCode.usage));
+            },
             error.ColumnsWithQuery => {
                 stderr_writer.writeAll("error: --columns cannot be combined with a query argument\n") catch |werr| {
                     std.log.err("failed to write error message: {}", .{werr});