From f2e9ecfc3ec32b7e7765916afd2dccd06dd41b25 Mon Sep 17 00:00:00 2001 From: "Victor M. Varela" Date: Wed, 29 Apr 2026 14:01:20 +0200 Subject: [PATCH 1/5] feat: add --max-rows flag to limit input rows (#82) - Add --max-rows CLI option; exit with error if input exceeds the limit - Migrate to Zig 0.16.0 std.Io API (File.reader/writer, takeByte, flush) - Fix CsvReader: non-generic struct using *std.Io.Reader to avoid fieldParentPtr breakage when copying interface value - Flush stdout/stderr before all process.exit() calls to drain buffered writers --- README.md | 1 + build.zig | 30 +++++++++++- build.zig.zon | 2 +- docs/sql-pipe.1.scd | 5 ++ src/csv.zig | 88 +++++++++++++++++------------------- src/main.zig | 108 ++++++++++++++++++++++++++++++-------------- 6 files changed, 151 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index 6f8f686..33b509f 100644 --- a/README.md +++ b/README.md @@ -173,6 +173,7 @@ $ cat events.csv \ | `--no-type-inference` | Treat all columns as TEXT (skip auto-detection) | | `-H`, `--header` | Print column names as the first output row | | `--json` | Output results as a JSON array of objects (mutually exclusive with `-d`, `--tsv`, `-H`) | +| `--max-rows ` | Stop if more than `n` data rows are read (exit 1) | | `-h`, `--help` | Show usage help and exit | | `-V`, `--version` | Print version and exit | diff --git a/build.zig b/build.zig index 8c95e9a..abfe678 100644 --- a/build.zig +++ b/build.zig @@ -35,8 +35,8 @@ pub fn build(b: *std.Build) void { exe.root_module.addOptions("build_options", build_options); if (bundle_sqlite) { - exe.addIncludePath(b.path("lib")); - exe.addCSourceFile(.{ + exe.root_module.addIncludePath(b.path("lib")); + exe.root_module.addCSourceFile(.{ .file = b.path("lib/sqlite3.c"), .flags = &.{"-DSQLITE_OMIT_LOAD_EXTENSION=1"}, }); @@ -230,6 +230,32 @@ pub fn build(b: *std.Build) void { test_dup_col_stdout.step.dependOn(b.getInstallStep()); test_step.dependOn(&test_dup_col_stdout.step); + // Integration 
test 20: --max-rows under limit succeeds + const test_max_rows_under = b.addSystemCommand(&.{ + "bash", "-c", + \\printf 'name,age\nAlice,30\nBob,25\n' | ./zig-out/bin/sql-pipe --max-rows 5 'SELECT name FROM t ORDER BY name' | diff - <(printf 'Alice\nBob\n') + }); + test_max_rows_under.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_max_rows_under.step); + + // Integration test 21: --max-rows limit hit exits 1 with error message + const test_max_rows_hit = b.addSystemCommand(&.{ + "bash", "-c", + \\msg=$(printf 'name,age\nAlice,30\nBob,25\nCarol,35\n' | ./zig-out/bin/sql-pipe --max-rows 1 'SELECT * FROM t' 2>&1 >/dev/null; echo "EXIT:$?") + \\echo "$msg" | grep -q 'error: input exceeds --max-rows limit (1 rows)' && echo "$msg" | grep -q 'EXIT:1' + }); + test_max_rows_hit.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_max_rows_hit.step); + + // Integration test 22: --max-rows 0 exits 1 with error message + const test_max_rows_zero = b.addSystemCommand(&.{ + "bash", "-c", + \\msg=$(printf 'name,age\nAlice,30\n' | ./zig-out/bin/sql-pipe --max-rows 0 'SELECT * FROM t' 2>&1 >/dev/null; echo "EXIT:$?") + \\echo "$msg" | grep -q 'error: --max-rows must be a positive integer' && echo "$msg" | grep -q 'EXIT:1' + }); + test_max_rows_zero.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_max_rows_zero.step); + // Unit tests for the RFC 4180 CSV parser (src/csv.zig) const unit_tests = b.addTest(.{ .root_module = b.createModule(.{ diff --git a/build.zig.zon b/build.zig.zon index 929b23c..b4a7487 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -2,7 +2,7 @@ .name = .sql_pipe, .version = "0.2.0", .fingerprint = 0xf649b9ac95d768ab, - .minimum_zig_version = "0.15.0", + .minimum_zig_version = "0.16.0", .paths = .{"."}, .dependencies = .{}, } diff --git a/docs/sql-pipe.1.scd b/docs/sql-pipe.1.scd index a3143b4..14288ae 100644 --- a/docs/sql-pipe.1.scd +++ b/docs/sql-pipe.1.scd @@ -52,6 +52,11 @@ OPTIONS number, or null). 
Mutually exclusive with *--delimiter*, *--tsv*, and *--header*. + *--max-rows* <n> + Stop reading input after inserting <n> data rows and exit with + code 1 and an error message. Use this to guard against + accidentally piping extremely large files into memory. + *-h, --help* Print the help message and exit with code 0. diff --git a/src/csv.zig b/src/csv.zig index 8640ccb..6cbfe73 100644 --- a/src/csv.zig +++ b/src/csv.zig @@ -52,18 +52,15 @@ const State = enum { /// // record[0], record[1], … /// } /// ``` -pub fn CsvReader(comptime ReaderType: type) type { - return struct { - reader: ReaderType, - allocator: std.mem.Allocator, - delimiter: u8, - done: bool = false, +pub const CsvReader = struct { + reader: *std.Io.Reader, + allocator: std.mem.Allocator, + delimiter: u8, + done: bool = false, - const Self = @This(); - - pub fn init(reader: ReaderType, allocator: std.mem.Allocator, delimiter: u8) Self { - return .{ .reader = reader, .allocator = allocator, .delimiter = delimiter }; - } + pub fn init(allocator: std.mem.Allocator, reader: *std.Io.Reader, delimiter: u8) CsvReader { + return .{ .reader = reader, .allocator = allocator, .delimiter = delimiter }; + } /// Read the next CSV record. /// @@ -76,16 +73,16 @@ pub fn CsvReader(comptime ReaderType: type) type { /// decoded according to RFC 4180 ("" → ", embedded newlines preserved) /// On error.UnterminatedQuotedField: input ended inside a quoted field /// All returned memory must be freed with freeRecord. 
- pub fn nextRecord(self: *Self) !?[][]u8 { + pub fn nextRecord(self: *CsvReader) !?[][]u8 { if (self.done) return null; - var fields = std.ArrayList([]u8){}; + var fields = std.ArrayList([]u8).empty; errdefer { for (fields.items) |f| self.allocator.free(f); fields.deinit(self.allocator); } - var field = std.ArrayList(u8){}; + var field = std.ArrayList(u8).empty; errdefer field.deinit(self.allocator); var state: State = .field_start; @@ -100,7 +97,7 @@ pub fn CsvReader(comptime ReaderType: type) type { // Number of bytes remaining in `reader` (finite input; decreases by 1 // each iteration except on the EOF branch which exits immediately). while (true) { - const byte = self.reader.readByte() catch |err| switch (err) { + const byte = self.reader.takeByte() catch |err| switch (err) { error.EndOfStream => { // EOF: flush whatever pending data we have. if (!has_data and fields.items.len == 0) { @@ -216,29 +213,28 @@ pub fn CsvReader(comptime ReaderType: type) type { /// (same allocator); not yet freed. /// Post: every field string in record and the record slice itself are freed; /// no further access to record or its elements is valid. - pub fn freeRecord(self: *Self, record: [][]u8) void { + pub fn freeRecord(self: *CsvReader, record: [][]u8) void { for (record) |f| self.allocator.free(f); self.allocator.free(record); } - }; -} +}; -/// Convenience constructor — infers `ReaderType` from the argument. -pub fn csvReader(allocator: std.mem.Allocator, reader: anytype) CsvReader(@TypeOf(reader)) { +/// Convenience constructor — comma delimiter. +pub fn csvReader(allocator: std.mem.Allocator, reader: *std.Io.Reader) CsvReader { return csvReaderWithDelimiter(allocator, reader, ','); } /// Convenience constructor with custom input delimiter. 
-pub fn csvReaderWithDelimiter(allocator: std.mem.Allocator, reader: anytype, delimiter: u8) CsvReader(@TypeOf(reader)) { - return CsvReader(@TypeOf(reader)).init(reader, allocator, delimiter); +pub fn csvReaderWithDelimiter(allocator: std.mem.Allocator, reader: *std.Io.Reader, delimiter: u8) CsvReader { + return CsvReader.init(allocator, reader, delimiter); } // ─── Unit Tests ─────────────────────────────────────── test "simple unquoted fields, two records" { const input = "a,b,c\n1,2,3\n"; - var stream = std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); const r1 = (try csv.nextRecord()).?; defer csv.freeRecord(r1); @@ -259,8 +255,8 @@ test "simple unquoted fields, two records" { test "quoted field with embedded comma" { const input = "\"hello, world\",42\n"; - var stream = std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); const r = (try csv.nextRecord()).?; defer csv.freeRecord(r); @@ -271,8 +267,8 @@ test "quoted field with embedded comma" { test "escaped double-quote inside quoted field" { const input = "\"say \"\"hello\"\"\",done\n"; - var stream = std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); const r = (try csv.nextRecord()).?; defer csv.freeRecord(r); @@ -283,8 +279,8 @@ test "escaped double-quote inside quoted field" { test "quoted field with embedded newline (multi-line record)" { const input = "id,text\n1,\"line one\nline two\"\n"; - var stream = std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = 
.fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); const r1 = (try csv.nextRecord()).?; defer csv.freeRecord(r1); @@ -302,8 +298,8 @@ test "quoted field with embedded newline (multi-line record)" { test "crlf line endings outside quoted fields" { const input = "a,b\r\n1,2\r\n"; - var stream = std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); const r1 = (try csv.nextRecord()).?; defer csv.freeRecord(r1); @@ -318,8 +314,8 @@ test "crlf line endings outside quoted fields" { test "empty fields are preserved" { const input = ",middle,\n"; - var stream = std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); const r = (try csv.nextRecord()).?; defer csv.freeRecord(r); @@ -331,8 +327,8 @@ test "empty fields are preserved" { test "no trailing newline at EOF" { const input = "x,y"; - var stream = std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); const r = (try csv.nextRecord()).?; defer csv.freeRecord(r); @@ -345,8 +341,8 @@ test "no trailing newline at EOF" { test "quoted field ending at EOF without trailing newline" { const input = "\"value\""; - var stream = std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); const r = (try csv.nextRecord()).?; defer csv.freeRecord(r); @@ -356,8 +352,8 @@ test "quoted field ending at EOF without trailing newline" { test "empty quoted field" { const input = "\"\",b\n"; - var stream = 
std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); const r = (try csv.nextRecord()).?; defer csv.freeRecord(r); @@ -368,16 +364,16 @@ test "empty quoted field" { test "entirely empty input returns null" { const input = ""; - var stream = std.io.fixedBufferStream(input); - var csv = csvReader(std.testing.allocator, stream.reader()); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReader(std.testing.allocator, &input_reader); try std.testing.expectEqual(@as(?[][]u8, null), try csv.nextRecord()); } test "custom pipe delimiter" { const input = "a|b|c\n1|2|3\n"; - var stream = std.io.fixedBufferStream(input); - var csv = csvReaderWithDelimiter(std.testing.allocator, stream.reader(), '|'); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReaderWithDelimiter(std.testing.allocator, &input_reader, '|'); const r1 = (try csv.nextRecord()).?; defer csv.freeRecord(r1); @@ -396,8 +392,8 @@ test "custom pipe delimiter" { test "custom tab delimiter" { const input = "name\tage\nAlice\t30\n"; - var stream = std.io.fixedBufferStream(input); - var csv = csvReaderWithDelimiter(std.testing.allocator, stream.reader(), '\t'); + var input_reader: std.Io.Reader = .fixed(input); + var csv = csvReaderWithDelimiter(std.testing.allocator, &input_reader, '\t'); const r1 = (try csv.nextRecord()).?; defer csv.freeRecord(r1); diff --git a/src/main.zig b/src/main.zig index 517cbb6..84b98e8 100644 --- a/src/main.zig +++ b/src/main.zig @@ -5,15 +5,8 @@ const c = @cImport({ const csv = @import("csv.zig"); const build_options = @import("build_options"); -/// Version string injected at build time from build.zig.zon via build.zig. const VERSION: []const u8 = build_options.version; -// sqlite_static (null): SQLite assumes the memory is constant and won't free it. 
-// Safety: sqlite3_step is called inside insertRowTyped immediately after all -// bindings, returning SQLITE_DONE before the function returns. The caller's -// row buffer is only freed after insertRowTyped returns, so the bound pointers -// remain valid throughout the statement's execution. sqlite3_reset at the top -// of the next call releases any prior references. const sqlite_static: c.sqlite3_destructor_type = null; // ─── Error types ───────────────────────────────────── @@ -22,6 +15,7 @@ const SqlPipeError = error{ MissingQuery, InvalidDelimiter, IncompatibleFlags, + InvalidMaxRows, OpenDbFailed, EmptyInput, EmptyColumnName, @@ -57,16 +51,12 @@ const ExitCode = enum(u8) { /// Parsed command-line arguments. const ParsedArgs = struct { - /// The SQL query to execute after loading stdin. query: []const u8, - /// When false, skip type inference and use TEXT for every column (pure TEXT mode). type_inference: bool, - /// Input field delimiter for CSV parsing. delimiter: u8, - /// When true, print a header row with column names before data rows. header: bool, - /// When true, emit results as a JSON array of objects instead of CSV. json: bool, + max_rows: ?usize, }; /// Result of argument parsing — either parsed arguments or a special action. 
@@ -84,7 +74,7 @@ const ArgsResult = union(enum) { /// printUsage(writer) → void /// Pre: writer is a valid stderr writer /// Post: usage text has been written to writer -fn printUsage(writer: anytype) !void { +fn printUsage(writer: *std.Io.Writer) !void { try writer.writeAll( \\Usage: sql-pipe [OPTIONS] \\ @@ -97,6 +87,7 @@ fn printUsage(writer: anytype) !void { \\ --no-type-inference Treat all columns as TEXT (skip auto-detection) \\ -H, --header Print column names as the first output row \\ --json Output results as a JSON array of objects + \\ --max-rows <n> Stop after <n> data rows and exit with error \\ -h, --help Show this help message and exit \\ -V, --version Show version and exit \\ @@ -135,7 +126,7 @@ fn parseDelimiter(value: []const u8) SqlPipeError!u8 { /// result = .version when --version or -V is present /// error.MissingQuery when no non-flag argument is found /// error.IncompatibleFlags when --json is combined with --delimiter/--tsv/--header -fn parseArgs(args: []const [:0]u8) SqlPipeError!ArgsResult { +fn parseArgs(args: []const [:0]const u8) SqlPipeError!ArgsResult { var query: ?[]const u8 = null; var type_inference = true; var delimiter: u8 = ','; @@ -143,13 +134,15 @@ fn parseArgs(args: []const [:0]u8) SqlPipeError!ArgsResult { var json = false; var explicit_delimiter = false; var explicit_tsv = false; + var max_rows: ?usize = null; // Loop invariant I: all args[1..i] have been processed; // query holds the first non-flag argument seen, or null; // type_inference reflects the presence of --no-type-inference; // delimiter reflects -d/--delimiter/--tsv if present; - // header reflects the presence of --header/-H; - // json reflects the presence of --json + // header reflects the presence of --header/-H; + // json reflects the presence of --json; + // max_rows reflects the presence of --max-rows // Bounding function: args.len - i var i: usize = 1; while (i < args.len) : (i += 1) { @@ -178,6 +171,14 @@ fn parseArgs(args: []const [:0]u8) SqlPipeError!ArgsResult { header = true; } else if (std.mem.eql(u8, arg, 
"--json")) { json = true; + } else if (std.mem.eql(u8, arg, "--max-rows")) { + i += 1; + if (i >= args.len) return error.InvalidMaxRows; + max_rows = std.fmt.parseUnsigned(usize, args[i], 10) catch return error.InvalidMaxRows; + if (max_rows.? == 0) return error.InvalidMaxRows; + } else if (std.mem.startsWith(u8, arg, "--max-rows=")) { + max_rows = std.fmt.parseUnsigned(usize, arg["--max-rows=".len..], 10) catch return error.InvalidMaxRows; + if (max_rows.? == 0) return error.InvalidMaxRows; } else { if (query == null) query = arg; } @@ -193,6 +194,7 @@ fn parseArgs(args: []const [:0]u8) SqlPipeError!ArgsResult { .delimiter = delimiter, .header = header, .json = json, + .max_rows = max_rows, } }; } @@ -337,7 +339,7 @@ fn parseHeader( record[0] = without_bom; } - var cols: std.ArrayList([]const u8) = .{}; + var cols: std.ArrayList([]const u8) = .empty; errdefer { for (cols.items) |col| allocator.free(col); cols.deinit(allocator); @@ -393,7 +395,7 @@ fn createTable( cols: []const []const u8, types: []const ColumnType, ) (SqlPipeError || std.mem.Allocator.Error)!void { - var sql: std.ArrayList(u8) = .{}; + var sql: std.ArrayList(u8) = .empty; defer sql.deinit(allocator); try sql.appendSlice(allocator, "CREATE TABLE t ("); @@ -434,7 +436,7 @@ fn prepareInsert( db: *c.sqlite3, n: usize, ) (SqlPipeError || std.mem.Allocator.Error)!*c.sqlite3_stmt { - var sql: std.ArrayList(u8) = .{}; + var sql: std.ArrayList(u8) = .empty; defer sql.deinit(allocator); try sql.appendSlice(allocator, "INSERT INTO t VALUES ("); @@ -696,10 +698,10 @@ fn execQuery( allocator: std.mem.Allocator, db: *c.sqlite3, query: []const u8, - writer: anytype, + writer: *std.Io.Writer, header: bool, json: bool, -) (SqlPipeError || std.mem.Allocator.Error || @TypeOf(writer).Error)!void { +) (SqlPipeError || std.mem.Allocator.Error || std.Io.Writer.Error)!void { const query_z = try allocator.dupeZ(u8, query); defer allocator.free(query_z); @@ -747,26 +749,32 @@ fn execQuery( /// fatal(writer, code, comptime 
fmt, args) → noreturn /// Pre: writer is stderr, code is non-zero ExitCode /// Post: "error: \n" written to stderr, process exits with code -fn fatal(comptime fmt: []const u8, writer: anytype, code: ExitCode, args: anytype) noreturn { +fn fatal(comptime fmt: []const u8, writer: *std.Io.Writer, code: ExitCode, args: anytype) noreturn { writer.print("error: " ++ fmt ++ "\n", args) catch |err| { std.log.err("failed to write error message: {}", .{err}); }; + writer.flush() catch |err| std.log.err("failed to flush: {}", .{err}); std.process.exit(@intFromEnum(code)); } -pub fn main() void { +pub fn main(init: std.process.Init.Minimal) void { var gpa: std.heap.DebugAllocator(.{}) = .init; defer _ = gpa.deinit(); const allocator = gpa.allocator(); - const stderr = std.fs.File.stderr(); - const stderr_writer = stderr.deprecatedWriter(); - const stdout_writer = std.fs.File.stdout().deprecatedWriter(); + var io = std.Io.Threaded.init_single_threaded; + + var stderr_buf: [1024]u8 = undefined; + var stderr_file_writer = std.Io.File.writer(std.Io.File.stderr(), io.io(), &stderr_buf); + const stderr_writer: *std.Io.Writer = &stderr_file_writer.interface; + + var stdout_buf: [4096]u8 = undefined; + var stdout_file_writer = std.Io.File.writer(std.Io.File.stdout(), io.io(), &stdout_buf); + const stdout_writer: *std.Io.Writer = &stdout_file_writer.interface; - // {A0: process argv is accessible, allocator is valid} - const args = std.process.argsAlloc(allocator) catch + const args = init.args.toSlice(allocator) catch fatal("failed to read process arguments", stderr_writer, .usage, .{}); - defer std.process.argsFree(allocator, args); + defer allocator.free(args); const args_result = parseArgs(args) catch |err| { switch (err) { @@ -774,6 +782,14 @@ pub fn main() void { stderr_writer.writeAll("error: --json cannot be combined with --delimiter, --tsv, or --header\n") catch |werr| { std.log.err("failed to write error message: {}", .{werr}); }; + stderr_writer.flush() catch |ferr| 
std.log.err("failed to flush: {}", .{ferr}); + std.process.exit(@intFromEnum(ExitCode.usage)); + }, + error.InvalidMaxRows => { + stderr_writer.writeAll("error: --max-rows must be a positive integer\n") catch |werr| { + std.log.err("failed to write error message: {}", .{werr}); + }; + stderr_writer.flush() catch |ferr| std.log.err("failed to flush: {}", .{ferr}); std.process.exit(@intFromEnum(ExitCode.usage)); }, else => {}, @@ -781,6 +797,7 @@ pub fn main() void { printUsage(stderr_writer) catch |werr| { std.log.err("failed to write usage: {}", .{werr}); }; + stderr_writer.flush() catch |ferr| std.log.err("failed to flush: {}", .{ferr}); std.process.exit(@intFromEnum(ExitCode.usage)); }; @@ -789,16 +806,24 @@ pub fn main() void { printUsage(stderr_writer) catch |err| { std.log.err("failed to write usage: {}", .{err}); }; + stderr_writer.flush() catch |err| std.log.err("failed to flush: {}", .{err}); std.process.exit(@intFromEnum(ExitCode.success)); }, .version => { stderr_writer.print("sql-pipe {s}\n", .{VERSION}) catch |err| { std.log.err("failed to write version: {}", .{err}); }; + stderr_writer.flush() catch |err| std.log.err("failed to flush: {}", .{err}); std.process.exit(@intFromEnum(ExitCode.success)); }, .parsed => |parsed| { - run(parsed, allocator, stderr_writer, stdout_writer); + run(parsed, allocator, io.io(), stderr_writer, stdout_writer); + stdout_file_writer.flush() catch |err| { + std.log.err("failed to flush stdout: {}", .{err}); + }; + stderr_file_writer.flush() catch |err| { + std.log.err("failed to flush stderr: {}", .{err}); + }; }, } } @@ -811,8 +836,9 @@ pub fn main() void { fn run( parsed: ParsedArgs, allocator: std.mem.Allocator, - stderr_writer: anytype, - stdout_writer: anytype, + io: std.Io, + stderr_writer: *std.Io.Writer, + stdout_writer: *std.Io.Writer, ) void { const query = parsed.query; // {A1: query is the SQL string; parsed.type_inference indicates buffer-first mode} @@ -822,8 +848,9 @@ fn run( defer _ = c.sqlite3_close(db); // 
{A2: db is an open, empty in-memory SQLite database} - const stdin = std.fs.File.stdin().deprecatedReader(); - var csv_reader = csv.csvReaderWithDelimiter(allocator, stdin, parsed.delimiter); + var stdin_buf: [4096]u8 = undefined; + var stdin_file_reader = std.Io.File.reader(std.Io.File.stdin(), io, &stdin_buf); + var csv_reader = csv.csvReaderWithDelimiter(allocator, &stdin_file_reader.interface, parsed.delimiter); const header_record = csv_reader.nextRecord() catch |err| switch (err) { error.UnterminatedQuotedField => fatal("row 1: unterminated quoted field", stderr_writer, .csv_error, .{}), @@ -847,7 +874,7 @@ fn run( const num_cols = cols.len; // ─── Phase 1: determine column types ───────────────────────────────────── - var row_buffer: std.ArrayList([][]u8) = .{}; + var row_buffer: std.ArrayList([][]u8) = .empty; defer { for (row_buffer.items) |row| csv_reader.freeRecord(row); row_buffer.deinit(allocator); @@ -909,7 +936,14 @@ fn run( defer _ = c.sqlite3_finalize(stmt); // Insert buffered rows + var rows_inserted: usize = 0; for (row_buffer.items) |row| { + rows_inserted += 1; + if (parsed.max_rows) |limit| { + if (rows_inserted > limit) { + fatal("input exceeds --max-rows limit ({d} rows)", stderr_writer, .usage, .{limit}); + } + } insertRowTyped(stmt, db, row, types, @intCast(num_cols)) catch fatal("{s}", stderr_writer, .sql_error, .{std.mem.span(c.sqlite3_errmsg(db))}); } @@ -936,6 +970,12 @@ fn run( if (record.len == 0) continue; + rows_inserted += 1; + if (parsed.max_rows) |limit| { + if (rows_inserted > limit) { + fatal("input exceeds --max-rows limit ({d} rows)", stderr_writer, .usage, .{limit}); + } + } insertRowTyped(stmt, db, record, types, @intCast(num_cols)) catch fatal("{s}", stderr_writer, .sql_error, .{std.mem.span(c.sqlite3_errmsg(db))}); } From eb76ec2fc540f867b08d722c1297c9f39c259cc9 Mon Sep 17 00:00:00 2001 From: "Victor M. 
Varela" Date: Wed, 29 Apr 2026 14:01:24 +0200 Subject: [PATCH 2/5] chore: upgrade CI and release workflows to Zig 0.16.0 --- .github/workflows/ci.yml | 2 +- .github/workflows/release.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index edae829..cb856ba 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ on: env: SQLITE_VERSION: "3490100" SQLITE_YEAR: "2025" - ZIG_VERSION: "0.15.0" + ZIG_VERSION: "0.16.0" jobs: build-and-test: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c5f542b..7390758 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -8,7 +8,7 @@ on: env: SQLITE_VERSION: "3490100" SQLITE_YEAR: "2025" - ZIG_VERSION: "0.15.0" + ZIG_VERSION: "0.16.0" jobs: build: From 6a59bc0f63fa9cb4a48de8e3e8cded28a5faeaca Mon Sep 17 00:00:00 2001 From: "Victor M. Varela" Date: Wed, 29 Apr 2026 14:08:19 +0200 Subject: [PATCH 3/5] fix default version dev --- build.zig | 2 +- build.zig.zon | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.zig b/build.zig index abfe678..4594e09 100644 --- a/build.zig +++ b/build.zig @@ -17,7 +17,7 @@ pub fn build(b: *std.Build) void { []const u8, "version", "Override version string (default: from build.zig.zon)", - ) orelse "0.2.0"; + ) orelse "0.0.0-dev"; const exe = b.addExecutable(.{ .name = "sql-pipe", diff --git a/build.zig.zon b/build.zig.zon index b4a7487..25963ed 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -1,6 +1,6 @@ .{ .name = .sql_pipe, - .version = "0.2.0", + .version = "0.0.0-dev", .fingerprint = 0xf649b9ac95d768ab, .minimum_zig_version = "0.16.0", .paths = .{"."}, From b045ae0569a44ee6f8ca9d8e58241c1826b65e91 Mon Sep 17 00:00:00 2001 From: "Victor M. 
Varela" Date: Wed, 29 Apr 2026 14:26:30 +0200 Subject: [PATCH 4/5] fix: address post-review findings from PR #109 - Restore sqlite_static doc comment and ParsedArgs field doc comments - Add max_rows field doc comment to ParsedArgs - Fix --max-rows help text wording - Migrate remaining 6 anytype writer params to *std.Io.Writer - Fix csv.zig method indentation via zig fmt - Add integration tests 23-25 (= form, non-numeric value, --no-type-inference path) --- build.zig | 24 +++++ src/csv.zig | 280 +++++++++++++++++++++++++-------------------------- src/main.zig | 22 ++-- 3 files changed, 179 insertions(+), 147 deletions(-) diff --git a/build.zig b/build.zig index 4594e09..8ee1349 100644 --- a/build.zig +++ b/build.zig @@ -256,6 +256,30 @@ pub fn build(b: *std.Build) void { test_max_rows_zero.step.dependOn(b.getInstallStep()); test_step.dependOn(&test_max_rows_zero.step); + // Integration test 23: --max-rows=5 (equals form) succeeds + const test_max_rows_equals = b.addSystemCommand(&.{ + "bash", "-c", + \\printf 'name,age\nAlice,30\nBob,25\n' | ./zig-out/bin/sql-pipe --max-rows=5 'SELECT name FROM t ORDER BY name' | diff - <(printf 'Alice\nBob\n') + }); + test_max_rows_equals.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_max_rows_equals.step); + + // Integration test 24: --max-rows with non-numeric value exits 1 + const test_max_rows_nan = b.addSystemCommand(&.{ + "bash", "-c", + \\printf 'name,age\nAlice,30\n' | ./zig-out/bin/sql-pipe --max-rows abc 'SELECT * FROM t' 2>&1 >/dev/null; test $? 
-eq 1 + }); + test_max_rows_nan.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_max_rows_nan.step); + + // Integration test 25: --no-type-inference --max-rows 1 hits limit and exits 1 + const test_max_rows_streaming = b.addSystemCommand(&.{ + "bash", "-c", + \\msg=$(printf 'name,age\nAlice,30\nBob,25\n' | ./zig-out/bin/sql-pipe --no-type-inference --max-rows 1 'SELECT * FROM t' 2>&1 >/dev/null; echo "EXIT:$?") && echo "$msg" | grep -q 'EXIT:1' + }); + test_max_rows_streaming.step.dependOn(b.getInstallStep()); + test_step.dependOn(&test_max_rows_streaming.step); + // Unit tests for the RFC 4180 CSV parser (src/csv.zig) const unit_tests = b.addTest(.{ .root_module = b.createModule(.{ diff --git a/src/csv.zig b/src/csv.zig index 6cbfe73..5e04512 100644 --- a/src/csv.zig +++ b/src/csv.zig @@ -62,161 +62,161 @@ pub const CsvReader = struct { return .{ .reader = reader, .allocator = allocator, .delimiter = delimiter }; } - /// Read the next CSV record. - /// - /// Pre: self.done = false (otherwise returns null immediately) - /// self.reader is positioned at the start of the next record - /// self.allocator is valid - /// Post: result = null ⟺ no more records exist in the input - /// result = fields ⟺ fields is a heap-allocated [][]u8 where - /// fields[i] is the UTF-8 content of the i-th field of the record, - /// decoded according to RFC 4180 ("" → ", embedded newlines preserved) - /// On error.UnterminatedQuotedField: input ended inside a quoted field - /// All returned memory must be freed with freeRecord. - pub fn nextRecord(self: *CsvReader) !?[][]u8 { - if (self.done) return null; - - var fields = std.ArrayList([]u8).empty; - errdefer { - for (fields.items) |f| self.allocator.free(f); - fields.deinit(self.allocator); - } + /// Read the next CSV record. 
+ /// + /// Pre: self.done = false (otherwise returns null immediately) + /// self.reader is positioned at the start of the next record + /// self.allocator is valid + /// Post: result = null ⟺ no more records exist in the input + /// result = fields ⟺ fields is a heap-allocated [][]u8 where + /// fields[i] is the UTF-8 content of the i-th field of the record, + /// decoded according to RFC 4180 ("" → ", embedded newlines preserved) + /// On error.UnterminatedQuotedField: input ended inside a quoted field + /// All returned memory must be freed with freeRecord. + pub fn nextRecord(self: *CsvReader) !?[][]u8 { + if (self.done) return null; + + var fields = std.ArrayList([]u8).empty; + errdefer { + for (fields.items) |f| self.allocator.free(f); + fields.deinit(self.allocator); + } - var field = std.ArrayList(u8).empty; - errdefer field.deinit(self.allocator); - - var state: State = .field_start; - var has_data = false; - - // Loop invariant I: - // `state` satisfies the representation invariant of the automaton. - // `field` contains the decoded bytes of the field currently being parsed. - // `fields` contains the completed, heap-allocated fields of this record. - // All bytes read from `reader` so far have been processed exactly once. - // Bounding function: - // Number of bytes remaining in `reader` (finite input; decreases by 1 - // each iteration except on the EOF branch which exits immediately). - while (true) { - const byte = self.reader.takeByte() catch |err| switch (err) { - error.EndOfStream => { - // EOF: flush whatever pending data we have. - if (!has_data and fields.items.len == 0) { - field.deinit(self.allocator); - fields.deinit(self.allocator); - self.done = true; - return null; - } - if (state == .quoted) { - field.deinit(self.allocator); - for (fields.items) |f| self.allocator.free(f); - fields.deinit(self.allocator); - return error.UnterminatedQuotedField; - } - // Flush the last field and return the record. 
- try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); + var field = std.ArrayList(u8).empty; + errdefer field.deinit(self.allocator); + + var state: State = .field_start; + var has_data = false; + + // Loop invariant I: + // `state` satisfies the representation invariant of the automaton. + // `field` contains the decoded bytes of the field currently being parsed. + // `fields` contains the completed, heap-allocated fields of this record. + // All bytes read from `reader` so far have been processed exactly once. + // Bounding function: + // Number of bytes remaining in `reader` (finite input; decreases by 1 + // each iteration except on the EOF branch which exits immediately). + while (true) { + const byte = self.reader.takeByte() catch |err| switch (err) { + error.EndOfStream => { + // EOF: flush whatever pending data we have. + if (!has_data and fields.items.len == 0) { + field.deinit(self.allocator); + fields.deinit(self.allocator); self.done = true; - return try fields.toOwnedSlice(self.allocator); - }, - else => return err, - }; - - has_data = true; - - switch (state) { - .field_start => { - if (byte == self.delimiter) { - // Empty unquoted field before delimiter. + return null; + } + if (state == .quoted) { + field.deinit(self.allocator); + for (fields.items) |f| self.allocator.free(f); + fields.deinit(self.allocator); + return error.UnterminatedQuotedField; + } + // Flush the last field and return the record. + try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); + self.done = true; + return try fields.toOwnedSlice(self.allocator); + }, + else => return err, + }; + + has_data = true; + + switch (state) { + .field_start => { + if (byte == self.delimiter) { + // Empty unquoted field before delimiter. 
+ try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); + state = .field_start; + } else switch (byte) { + '"' => { + state = .quoted; + }, + '\r' => { + // Part of \r\n; ignore, \n will terminate the record. + }, + '\n' => { + // End of record — last field is empty. try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); - state = .field_start; - } else switch (byte) { - '"' => { - state = .quoted; - }, - '\r' => { - // Part of \r\n; ignore, \n will terminate the record. - }, - '\n' => { - // End of record — last field is empty. - try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); - return try fields.toOwnedSlice(self.allocator); - }, - else => { - try field.append(self.allocator, byte); - state = .unquoted; - }, - } - }, + return try fields.toOwnedSlice(self.allocator); + }, + else => { + try field.append(self.allocator, byte); + state = .unquoted; + }, + } + }, - .unquoted => { - if (byte == self.delimiter) { + .unquoted => { + if (byte == self.delimiter) { + try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); + state = .field_start; + } else switch (byte) { + '\r' => { + // Strip \r before the \n record terminator. + }, + '\n' => { try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); - state = .field_start; - } else switch (byte) { - '\r' => { - // Strip \r before the \n record terminator. - }, - '\n' => { - try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); - return try fields.toOwnedSlice(self.allocator); - }, - else => { - try field.append(self.allocator, byte); - }, - } - }, - - .quoted => switch (byte) { - '"' => { - state = .quote_saw; + return try fields.toOwnedSlice(self.allocator); }, - // All bytes including \n and \r are part of the field value - // when inside a quoted field (RFC 4180 §2.6). 
else => { try field.append(self.allocator, byte); }, + } + }, + + .quoted => switch (byte) { + '"' => { + state = .quote_saw; + }, + // All bytes including \n and \r are part of the field value + // when inside a quoted field (RFC 4180 §2.6). + else => { + try field.append(self.allocator, byte); }, + }, - .quote_saw => { - if (byte == self.delimiter) { - // Closing quote followed by field delimiter. + .quote_saw => { + if (byte == self.delimiter) { + // Closing quote followed by field delimiter. + try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); + state = .field_start; + } else switch (byte) { + '"' => { + // Escaped double-quote: "" → single " + try field.append(self.allocator, '"'); + state = .quoted; + }, + '\r' => { + // Skip \r before \n record terminator. + }, + '\n' => { + // Closing quote followed by record terminator. try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); - state = .field_start; - } else switch (byte) { - '"' => { - // Escaped double-quote: "" → single " - try field.append(self.allocator, '"'); - state = .quoted; - }, - '\r' => { - // Skip \r before \n record terminator. - }, - '\n' => { - // Closing quote followed by record terminator. - try fields.append(self.allocator, try field.toOwnedSlice(self.allocator)); - return try fields.toOwnedSlice(self.allocator); - }, - else => { - // Non-standard content after closing quote; treat as - // continuation of the field in unquoted mode. - try field.append(self.allocator, byte); - state = .unquoted; - }, - } - }, - } + return try fields.toOwnedSlice(self.allocator); + }, + else => { + // Non-standard content after closing quote; treat as + // continuation of the field in unquoted mode. + try field.append(self.allocator, byte); + state = .unquoted; + }, + } + }, } } + } - /// Release a record previously returned by `nextRecord`. - /// - /// Pre: record was returned by nextRecord on this CsvReader instance - /// (same allocator); not yet freed. 
- /// Post: every field string in record and the record slice itself are freed; - /// no further access to record or its elements is valid. - pub fn freeRecord(self: *CsvReader, record: [][]u8) void { - for (record) |f| self.allocator.free(f); - self.allocator.free(record); - } + /// Release a record previously returned by `nextRecord`. + /// + /// Pre: record was returned by nextRecord on this CsvReader instance + /// (same allocator); not yet freed. + /// Post: every field string in record and the record slice itself are freed; + /// no further access to record or its elements is valid. + pub fn freeRecord(self: *CsvReader, record: [][]u8) void { + for (record) |f| self.allocator.free(f); + self.allocator.free(record); + } }; /// Convenience constructor — comma delimiter. diff --git a/src/main.zig b/src/main.zig index 84b98e8..38b2c19 100644 --- a/src/main.zig +++ b/src/main.zig @@ -7,6 +7,8 @@ const build_options = @import("build_options"); const VERSION: []const u8 = build_options.version; +/// SQLITE_STATIC sentinel: tells sqlite3_bind_text that the string is +/// caller-managed and SQLite must not attempt to free it. const sqlite_static: c.sqlite3_destructor_type = null; // ─── Error types ───────────────────────────────────── @@ -51,11 +53,17 @@ const ExitCode = enum(u8) { /// Parsed command-line arguments. const ParsedArgs = struct { + /// SQL query to execute against table `t`. query: []const u8, + /// Infer column types from the first 100 buffered rows when true. type_inference: bool, + /// CSV field delimiter (default: ','). delimiter: u8, + /// Emit column names as first output row when true. header: bool, + /// Output results as a JSON array of objects when true. json: bool, + /// Abort with exit 1 when more than this many data rows are read; null = unlimited. 
max_rows: ?usize, }; @@ -87,7 +95,7 @@ fn printUsage(writer: *std.Io.Writer) !void { \\ --no-type-inference Treat all columns as TEXT (skip auto-detection) \\ -H, --header Print column names as the first output row \\ --json Output results as a JSON array of objects - \\ --max-rows Stop after data rows and exit with error + \\ --max-rows Stop if more than data rows are read (exit 1) \\ -h, --help Show this help message and exit \\ -V, --version Show version and exit \\ @@ -327,7 +335,7 @@ fn inferTypes( fn parseHeader( allocator: std.mem.Allocator, record: [][]u8, - stderr_writer: anytype, + stderr_writer: *std.Io.Writer, ) (SqlPipeError || std.mem.Allocator.Error)![][]const u8 { if (record.len == 0) return error.NoColumns; @@ -538,7 +546,7 @@ fn insertRowTyped( fn printRow( stmt: *c.sqlite3_stmt, col_count: c_int, - writer: anytype, + writer: *std.Io.Writer, ) !void { // Loop invariant I: columns 0..i-1 have been written, separated by commas // Bounding function: col_count - i @@ -565,7 +573,7 @@ fn printRow( /// if value contains comma, double-quote, or newline, it is enclosed /// in double-quotes with internal quotes escaped as "" (RFC 4180); /// otherwise it is written verbatim -fn writeField(writer: anytype, value: []const u8) !void { +fn writeField(writer: *std.Io.Writer, value: []const u8) !void { var needs_quoting = false; for (value) |ch| { if (ch == ',' or ch == '"' or ch == '\n' or ch == '\r') { @@ -593,7 +601,7 @@ fn writeField(writer: anytype, value: []const u8) !void { fn printHeaderRow( stmt: *c.sqlite3_stmt, col_count: c_int, - writer: anytype, + writer: *std.Io.Writer, ) !void { // Loop invariant I: columns 0..i-1 names have been written, separated by commas // Bounding function: col_count - i @@ -614,7 +622,7 @@ fn printHeaderRow( /// Post: s is written as a JSON string literal (double-quoted, with special /// characters escaped per RFC 8259: \", \\, \/, \b, \f, \n, \r, \t, /// and \uXXXX for control characters 0x00–0x1F) -fn 
writeJsonString(writer: anytype, s: []const u8) !void { +fn writeJsonString(writer: *std.Io.Writer, s: []const u8) !void { try writer.writeByte('"'); for (s) |ch| { switch (ch) { @@ -645,7 +653,7 @@ fn printJsonRow( stmt: *c.sqlite3_stmt, col_count: c_int, col_names: []const [*:0]const u8, - writer: anytype, + writer: *std.Io.Writer, is_first: bool, ) !void { if (!is_first) try writer.writeByte(','); From da045f8f8c56a6e7f07df1b2dd0b7f6d06475e43 Mon Sep 17 00:00:00 2001 From: "Victor M. Varela" Date: Wed, 29 Apr 2026 14:36:36 +0200 Subject: [PATCH 5/5] fix: toSlice Windows CI error --- src/main.zig | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main.zig b/src/main.zig index 38b2c19..e6f4662 100644 --- a/src/main.zig +++ b/src/main.zig @@ -780,9 +780,10 @@ pub fn main(init: std.process.Init.Minimal) void { var stdout_file_writer = std.Io.File.writer(std.Io.File.stdout(), io.io(), &stdout_buf); const stdout_writer: *std.Io.Writer = &stdout_file_writer.interface; - const args = init.args.toSlice(allocator) catch + var args_arena = std.heap.ArenaAllocator.init(allocator); + defer args_arena.deinit(); + const args = init.args.toSlice(args_arena.allocator()) catch fatal("failed to read process arguments", stderr_writer, .usage, .{}); - defer allocator.free(args); const args_result = parseArgs(args) catch |err| { switch (err) {