Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ $ printf 'name,age\nAlice,30\nBob,25' | sql-pipe --json 'SELECT * FROM t'
[{"name":"Alice","age":30},{"name":"Bob","age":25}]
```

`--json` is mutually exclusive with `-d`/`--delimiter`, `--tsv`, and `-H`/`--header`.
`--json` is mutually exclusive with `-H`/`--header`. It can be combined with `-d`/`--delimiter` and `--tsv` to read non-comma-separated input.

Chain queries by piping back in — useful for two-pass aggregations:

Expand All @@ -172,7 +172,7 @@ $ cat events.csv \
| `--tsv` | Alias for `--delimiter '\t'` |
| `--no-type-inference` | Treat all columns as TEXT (skip auto-detection) |
| `-H`, `--header` | Print column names as the first output row |
| `--json` | Output results as a JSON array of objects (mutually exclusive with `-d`, `--tsv`, `-H`) |
| `--json` | Output results as a JSON array of objects (mutually exclusive with `-H`) |
| `--max-rows <n>` | Stop if more than `n` data rows are read (exit 1) |
| `-h`, `--help` | Show usage help and exit |
| `-V`, `--version` | Print version and exit |
Expand All @@ -183,11 +183,20 @@ $ cat events.csv \
|------|----------|
| `0` | Success |
| `1` | Usage error (missing query, bad arguments) |
| `2` | CSV parse error (with row number) |
| `3` | SQL error (with sqlite3 error message) |
| `2` | CSV parse error (with 1-based row number) |
| `3` | SQL error (with sqlite3 error message, available columns, and a "did you mean?" hint when applicable) |

All error messages are prefixed with `error:` and written to stderr.

On SQL error, `sql-pipe` also prints the list of columns available in table `t` and,
when the unknown identifier closely matches a column name (edit distance ≤ 2), a hint:

```
error: no such column: amout
table "t" has columns: id, amount, region
hint: did you mean "amount"?
```

## Recipes

**Top N rows by a column:**
Expand Down
37 changes: 37 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ pub fn build(b: *std.Build) void {
test_json_incompatible.step.dependOn(b.getInstallStep());
test_step.dependOn(&test_json_incompatible.step);

// Integration test 17b: --json is compatible with --delimiter (delimiter affects input only)
const test_json_with_delimiter = b.addSystemCommand(&.{
"bash", "-c",
\\printf 'name;age\nAlice;30\nBob;25\n' | ./zig-out/bin/sql-pipe --json -d ';' 'SELECT name, age FROM t ORDER BY age' | diff - <(printf '[{"name":"Bob","age":25},{"name":"Alice","age":30}]\n')
});
test_json_with_delimiter.step.dependOn(b.getInstallStep());
test_step.dependOn(&test_json_with_delimiter.step);

// Integration test 18: duplicate column names emit warning to stderr
const test_dup_col_warning = b.addSystemCommand(&.{
"bash", "-c",
Expand Down Expand Up @@ -280,6 +288,35 @@ pub fn build(b: *std.Build) void {
test_max_rows_streaming.step.dependOn(b.getInstallStep());
test_step.dependOn(&test_max_rows_streaming.step);

// Integration test 26: SQL error on unknown column prints column list to stderr
const test_sql_error_col_list = b.addSystemCommand(&.{
"bash", "-c",
\\msg=$(printf 'id,amount,region\n1,100,east\n' | ./zig-out/bin/sql-pipe 'SELECT revenue FROM t' 2>&1 >/dev/null; echo "EXIT:$?")
\\echo "$msg" | grep -q 'no such column: revenue' \
\\ && echo "$msg" | grep -q 'table "t" has columns: id, amount, region' \
\\ && echo "$msg" | grep -q 'EXIT:3'
});
test_sql_error_col_list.step.dependOn(b.getInstallStep());
test_step.dependOn(&test_sql_error_col_list.step);

// Integration test 27: SQL error on near-miss column name prints "did you mean" hint
const test_sql_error_hint = b.addSystemCommand(&.{
"bash", "-c",
\\msg=$(printf 'id,amount,region\n1,100,east\n' | ./zig-out/bin/sql-pipe 'SELECT amout FROM t' 2>&1 >/dev/null; echo "EXIT:$?")
\\echo "$msg" | grep -q 'hint: did you mean "amount"' && echo "$msg" | grep -q 'EXIT:3'
});
test_sql_error_hint.step.dependOn(b.getInstallStep());
test_step.dependOn(&test_sql_error_hint.step);

// Integration test 28: CSV parse error includes 1-based row number in message
const test_csv_row_number = b.addSystemCommand(&.{
"bash", "-c",
\\msg=$(printf 'name,age\n"unterminated' | ./zig-out/bin/sql-pipe 'SELECT * FROM t' 2>&1 >/dev/null; echo "EXIT:$?")
\\echo "$msg" | grep -q 'row 2: unterminated quoted field' && echo "$msg" | grep -q 'EXIT:2'
});
test_csv_row_number.step.dependOn(b.getInstallStep());
test_step.dependOn(&test_csv_row_number.step);

// Unit tests for the RFC 4180 CSV parser (src/csv.zig)
const unit_tests = b.addTest(.{
.root_module = b.createModule(.{
Expand Down
4 changes: 1 addition & 3 deletions src/csv.zig
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ pub const CsvReader = struct {
return null;
}
if (state == .quoted) {
field.deinit(self.allocator);
for (fields.items) |f| self.allocator.free(f);
fields.deinit(self.allocator);
// errdefer blocks above handle cleanup of `field` and `fields`.
return error.UnterminatedQuotedField;
}
// Flush the last field and return the record.
Expand Down
142 changes: 124 additions & 18 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn parseDelimiter(value: []const u8) SqlPipeError!u8 {
/// result = .help when --help or -h is present
/// result = .version when --version or -V is present
/// error.MissingQuery when no non-flag argument is found
/// error.IncompatibleFlags when --json is combined with --delimiter/--tsv/--header
/// error.IncompatibleFlags when --json is combined with --header
fn parseArgs(args: []const [:0]const u8) SqlPipeError!ArgsResult {
var query: ?[]const u8 = null;
var type_inference = true;
Expand Down Expand Up @@ -192,8 +192,8 @@ fn parseArgs(args: []const [:0]const u8) SqlPipeError!ArgsResult {
}
}

// --json is mutually exclusive with --delimiter / --tsv / --header
if (json and (explicit_delimiter or explicit_tsv or header))
// --json is mutually exclusive with --header (both affect output format)
if (json and header)
return error.IncompatibleFlags;

return .{ .parsed = ParsedArgs{
Expand Down Expand Up @@ -752,6 +752,102 @@ fn execQuery(
}
}

// ─── SQL error context helpers ────────────────────────

/// Levenshtein (insert / delete / substitute) edit distance between `a` and `b`.
/// Only the first `max_len` (128) bytes of each input take part in the
/// comparison; anything beyond that is ignored. Works on raw bytes, not
/// Unicode code points. Needs no heap: two fixed-size rows live on the stack.
fn levenshteinDistance(a: []const u8, b: []const u8) usize {
    const max_len = 128;
    const lhs = a[0..@min(a.len, max_len)];
    const rhs = b[0..@min(b.len, max_len)];

    var row_storage_a: [max_len + 1]usize = undefined;
    var row_storage_b: [max_len + 1]usize = undefined;
    // `above` is the finished DP row, `current` is the row being filled.
    var above: []usize = row_storage_a[0 .. rhs.len + 1];
    var current: []usize = row_storage_b[0 .. rhs.len + 1];

    // Row 0: turning the empty prefix into rhs[0..col] costs `col` insertions.
    for (above, 0..) |*cell, col| cell.* = col;

    for (lhs, 1..) |lhs_byte, row| {
        // Column 0: turning lhs[0..row] into the empty prefix costs `row` deletions.
        current[0] = row;
        for (rhs, 1..) |rhs_byte, col| {
            const via_left = current[col - 1] + 1;
            const via_up = above[col] + 1;
            const via_diagonal = above[col - 1] + @intFromBool(lhs_byte != rhs_byte);
            current[col] = @min(via_left, @min(via_up, via_diagonal));
        }
        // The row just filled becomes the reference row for the next byte.
        const finished = current;
        current = above;
        above = finished;
    }
    return above[rhs.len];
}

/// Collect the column names of table `t` by running `PRAGMA table_info(t)`.
/// Ownership: the caller must free every element and then the outer slice
/// with the same `allocator`.
/// If the PRAGMA statement cannot be prepared, an empty slice is returned
/// instead of an error. Allocation failures are propagated; names copied so
/// far are released before the error is returned.
fn getTableColumns(allocator: std.mem.Allocator, db: *c.sqlite3) ![][]const u8 {
    var pragma_stmt: ?*c.sqlite3_stmt = null;
    const prepare_rc = c.sqlite3_prepare_v2(db, "PRAGMA table_info(t)", -1, &pragma_stmt, null);
    if (prepare_rc != c.SQLITE_OK) return &.{};
    defer _ = c.sqlite3_finalize(pragma_stmt);

    var names = std.ArrayList([]const u8).empty;
    errdefer {
        for (names.items) |name| allocator.free(name);
        names.deinit(allocator);
    }

    while (true) {
        if (c.sqlite3_step(pragma_stmt) != c.SQLITE_ROW) break;

        // Result columns of PRAGMA table_info:
        // cid(0), name(1), type(2), notnull(3), dflt_value(4), pk(5)
        const raw_name = c.sqlite3_column_text(pragma_stmt, 1);
        if (raw_name != null) {
            // Copy the text: the pointer belongs to the statement.
            const borrowed = std.mem.span(@as([*:0]const u8, @ptrCast(raw_name)));
            const copy = try allocator.dupe(u8, borrowed);
            errdefer allocator.free(copy);
            try names.append(allocator, copy);
        }
    }

    return names.toOwnedSlice(allocator);
}

/// Print column context to `writer` after a SQL error.
///
/// Always prints " table \"t\" has columns: ..." when table `t` has at least
/// one column. When `errmsg` contains "no such column: <name>", additionally
/// prints " hint: did you mean \"<col>\"?" for the closest column, provided
/// its edit distance to <name> is at most 2.
///
/// SQLite reports a qualified reference as "no such column: t.amout". To keep
/// the table prefix from inflating the distance, the part after the last '.'
/// is scored as well and the smaller of the two distances is used.
///
/// Best-effort: silently returns on any failure (PRAGMA unavailable, OOM,
/// writer error) so the primary error message is never replaced by a
/// secondary one.
fn printSqlErrorContext(
    allocator: std.mem.Allocator,
    db: *c.sqlite3,
    errmsg: []const u8,
    writer: *std.Io.Writer,
) void {
    const columns = getTableColumns(allocator, db) catch return;
    defer {
        for (columns) |col| allocator.free(col);
        allocator.free(columns);
    }
    if (columns.len == 0) return;

    writer.writeAll(" table \"t\" has columns: ") catch return;
    for (columns, 0..) |col, i| {
        if (i > 0) writer.writeAll(", ") catch return;
        writer.writeAll(col) catch return;
    }
    writer.writeByte('\n') catch return;

    // Suggest the closest column when the error is "no such column: <name>"
    const no_such_col = "no such column: ";
    const start = std.mem.find(u8, errmsg, no_such_col) orelse return;
    const missing = errmsg[start + no_such_col.len ..];

    // For "t.amout" this is "amout"; null when there is no qualifier or when
    // nothing follows the final '.'.
    const unqualified: ?[]const u8 = blk: {
        var idx = missing.len;
        while (idx > 0) : (idx -= 1) {
            if (missing[idx - 1] == '.') {
                const tail = missing[idx..];
                break :blk if (tail.len > 0) tail else null;
            }
        }
        break :blk null;
    };

    var best_col: ?[]const u8 = null;
    var best_dist: usize = std.math.maxInt(usize);
    for (columns) |col| {
        var dist = levenshteinDistance(missing, col);
        // Scoring the full text too keeps column names that contain '.' matching.
        if (unqualified) |bare| dist = @min(dist, levenshteinDistance(bare, col));
        if (dist < best_dist) {
            best_dist = dist;
            best_col = col;
        }
    }

    if (best_dist > 2) return;
    if (best_col) |col| {
        writer.print(" hint: did you mean \"{s}\"?\n", .{col}) catch return;
    }
}

// ─── Entry point ──────────────────────────────────────

/// fatal(comptime fmt, writer, code, args) → noreturn
Expand All @@ -765,6 +861,23 @@ fn fatal(comptime fmt: []const u8, writer: *std.Io.Writer, code: ExitCode, args:
std.process.exit(@intFromEnum(code));
}

/// Report a SQL failure together with column context, then terminate.
/// Pre:  `errmsg` is the SQLite error text; `db` is the connection that is
///       queried for table `t`'s columns (the context is skipped when that
///       lookup yields nothing).
/// Post: `writer` received "error: <msg>\n" followed by the optional column
///       list and optional hint, and was flushed; the process exits with the
///       `sql_error` exit code. Write and flush failures are logged and do
///       not prevent the exit.
fn fatalSqlWithContext(
    allocator: std.mem.Allocator,
    db: *c.sqlite3,
    errmsg: []const u8,
    writer: *std.Io.Writer,
) noreturn {
    const exit_status = @intFromEnum(ExitCode.sql_error);

    writer.print("error: {s}\n", .{errmsg}) catch |write_err|
        std.log.err("failed to write error message: {}", .{write_err});

    printSqlErrorContext(allocator, db, errmsg, writer);

    writer.flush() catch |flush_err| {
        std.log.err("failed to flush: {}", .{flush_err});
    };

    std.process.exit(exit_status);
}

pub fn main(init: std.process.Init.Minimal) void {
var gpa: std.heap.DebugAllocator(.{}) = .init;
defer _ = gpa.deinit();
Expand All @@ -788,7 +901,7 @@ pub fn main(init: std.process.Init.Minimal) void {
const args_result = parseArgs(args) catch |err| {
switch (err) {
error.IncompatibleFlags => {
stderr_writer.writeAll("error: --json cannot be combined with --delimiter, --tsv, or --header\n") catch |werr| {
stderr_writer.writeAll("error: --json cannot be combined with --header\n") catch |werr| {
std.log.err("failed to write error message: {}", .{werr});
};
stderr_writer.flush() catch |ferr| std.log.err("failed to flush: {}", .{ferr});
Expand Down Expand Up @@ -935,13 +1048,13 @@ fn run(
var errmsg: [*c]u8 = null;
if (c.sqlite3_exec(db, "BEGIN TRANSACTION", null, null, &errmsg) != c.SQLITE_OK) {
const msg = if (errmsg != null) std.mem.span(errmsg) else std.mem.span(c.sqlite3_errmsg(db));
fatal("{s}", stderr_writer, .sql_error, .{msg});
fatalSqlWithContext(allocator, db, msg, stderr_writer);
}
}
// {A6: an active transaction is open on db}

const stmt = prepareInsert(allocator, db, num_cols) catch
fatal("{s}", stderr_writer, .sql_error, .{std.mem.span(c.sqlite3_errmsg(db))});
fatalSqlWithContext(allocator, db, std.mem.span(c.sqlite3_errmsg(db)), stderr_writer);
defer _ = c.sqlite3_finalize(stmt);

// Insert buffered rows
Expand All @@ -954,7 +1067,7 @@ fn run(
}
}
insertRowTyped(stmt, db, row, types, @intCast(num_cols)) catch
fatal("{s}", stderr_writer, .sql_error, .{std.mem.span(c.sqlite3_errmsg(db))});
fatalSqlWithContext(allocator, db, std.mem.span(c.sqlite3_errmsg(db)), stderr_writer);
}
// {A7: all buffered rows are in t}

Expand Down Expand Up @@ -986,7 +1099,7 @@ fn run(
}
}
insertRowTyped(stmt, db, record, types, @intCast(num_cols)) catch
fatal("{s}", stderr_writer, .sql_error, .{std.mem.span(c.sqlite3_errmsg(db))});
fatalSqlWithContext(allocator, db, std.mem.span(c.sqlite3_errmsg(db)), stderr_writer);
}
// {A8: all stdin CSV rows are inserted into t; transaction is still active}

Expand All @@ -995,21 +1108,14 @@ fn run(
const rc = c.sqlite3_exec(db, "COMMIT", null, null, &errmsg);
if (rc != c.SQLITE_OK) {
const msg = if (errmsg != null) std.mem.span(errmsg) else std.mem.span(c.sqlite3_errmsg(db));
fatal("{s}", stderr_writer, .sql_error, .{msg});
fatalSqlWithContext(allocator, db, msg, stderr_writer);
}
if (errmsg != null) c.sqlite3_free(errmsg);
}
// {A9: transaction committed; t holds all input rows, no active transaction}

execQuery(allocator, db, query, stdout_writer, parsed.header, parsed.json) catch |err| {
switch (err) {
error.PrepareQueryFailed => {
fatal("{s}", stderr_writer, .sql_error, .{std.mem.span(c.sqlite3_errmsg(db))});
},
else => {
fatal("{s}", stderr_writer, .sql_error, .{std.mem.span(c.sqlite3_errmsg(db))});
},
}
execQuery(allocator, db, query, stdout_writer, parsed.header, parsed.json) catch {
fatalSqlWithContext(allocator, db, std.mem.span(c.sqlite3_errmsg(db)), stderr_writer);
};
// {A10: all result rows written to stdout as CSV lines}
}
Loading