diff --git a/PLAN.md b/PLAN.md new file mode 100644 index 0000000..43e6db6 --- /dev/null +++ b/PLAN.md @@ -0,0 +1,71 @@ +# SQLiteView 書き込み対応計画 + +## Context + +SQLiteViewは現在、読み取り専用のSQLiteビューア(SELECT/WITH/PRAGMAのみ許可)。 +これをINSERT/UPDATE/DELETE/CREATE/DROP等の書き込みクエリも実行可能なSQLクライアントに拡張する。 + +## 変更対象ファイル + +- `src/sqliteviewer/database.py` — DB層: 読み取り制限の解除、クエリ分類、結果型の拡張 +- `src/sqliteviewer/mainwindow.py` — UI層: 書き込み結果の表示、確認ダイアログ、自動リフレッシュ +- `src/sqliteviewer/sql_highlighter.py` — ハイライト: 書き込み系キーワード追加 +- `tests/test_database.py` — テスト: 書き込み系テストの追加 + +## 実装ステップ + +### Step 1: database.py — DB層の変更(完了) + +- [x] `QueryResult` に `affected_rows: Optional[int]` と `is_write_operation: bool` を追加 +- [x] `_classify_query()` を追加 — `read`/`dml`/`ddl`/`tcl`/`unknown` を返す +- [x] `is_destructive_query()` を追加 — DROP・WHERE無しDELETEを検出し `(bool, reason)` を返す +- [x] `execute_query()` を書き換え — `_assert_read_only()` 削除、`cursor.description is None` で書き込み判定 +- [x] `_assert_read_only()` を削除 +- [x] docstring `"read-only SQLite interactions"` → `"SQLite database interactions"` に更新 + +> 接続設定(`isolation_level=None`)は変更なし。ユーザーが明示的に BEGIN/COMMIT/ROLLBACK でトランザクション制御可能。 + +### Step 2: tests/test_database.py — テスト更新(完了) + +- [x] `test_execute_query_restricts_writes` → `test_execute_query_allows_writes` に置換(UPDATE実行→affected_rows確認→SELECT で結果検証) +- [x] INSERT のテストを追加 +- [x] DELETE のテストを追加 +- [x] DDL(CREATE TABLE / DROP TABLE)のテストを追加 +- [x] トランザクション(BEGIN→INSERT→ROLLBACK)のテストを追加 +- [x] クエリ分類(`_classify_query`)のテストを追加 +- [x] 破壊的操作検出(`is_destructive_query`)のテストを追加 + +> 10テストすべて pass 確認済み。 + +### Step 3: sql_highlighter.py — キーワード追加(完了) + +- [x] DML キーワードを追加: INSERT, UPDATE, DELETE, SET, VALUES, INTO, REPLACE +- [x] DDL キーワードを追加: CREATE, ALTER, DROP, TABLE, VIEW, INDEX, TRIGGER, COLUMN, ADD, RENAME, IF, PRIMARY, KEY, UNIQUE, CHECK, DEFAULT, FOREIGN, REFERENCES, CONSTRAINT, AUTOINCREMENT +- [x] TCL キーワードを追加: BEGIN, COMMIT, ROLLBACK, TRANSACTION, SAVEPOINT, RELEASE +- [x] その他を追加: BETWEEN, EXCEPT, INTERSECT, ELSE, EXPLAIN, VACUUM, REINDEX, ATTACH, DETACH, CASCADE, RESTRICT, CONFLICT, ABORT, FAIL, IGNORE, TEMPORARY, TEMP +- [x] 型名を追加: INTEGER, TEXT, REAL, BLOB, NUMERIC + +### Step 4: mainwindow.py — UI層の変更(完了) + +- [x] プレースホルダー更新: `"Write a read-only SQL query…"` → `"Write a SQL statement…"` +- [x] `_run_query()` を書き換え — 破壊的クエリの確認ダイアログ(`QMessageBox.warning`)追加 +- [x] 書き込み結果の表示: `"{N} row(s) affected"` または `"Statement executed successfully"` +- [x] `_refresh_after_write()` を追加 — DDL後はテーブルリスト・プレビュー・スキーマをリフレッシュ、DML後は選択中テーブルのプレビューをリフレッシュ +- [x] `_refresh_tables()` を改善 — リフレッシュ後に以前の選択テーブルを復元 +- [x] About ダイアログのバージョンを `0.1.0` → `0.2.1` に修正 + +## 検証方法 + +```bash +# ユニットテスト実行 +uv run python -m pytest tests/ -v + +# 手動テスト(GUIが使える場合) +uv run sqliteview test.db +# → INSERT/UPDATE/DELETE/CREATE TABLE/DROP TABLE/BEGIN/ROLLBACK を実行し動作確認 +``` + +## 現状 + +**実装完了。** 全ステップの実装・テストが完了しており、10テストすべて pass。 +SQLiteView は INSERT/UPDATE/DELETE/CREATE/DROP/BEGIN/COMMIT/ROLLBACK 等の書き込みクエリを実行可能な SQL クライアントとなった。 diff --git a/scripts/build_deb.sh b/scripts/build_deb.sh index c9a4fbf..5bcf065 100755 --- a/scripts/build_deb.sh +++ b/scripts/build_deb.sh @@ -4,8 +4,8 @@ set -euo pipefail ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" DIST_DIR="$ROOT_DIR/dist" BUILD_DIR="$DIST_DIR/deb_build" -PACKAGE="sqliteviewer" -VERSION="0.1.0" +PACKAGE="sqliteview" +VERSION="0.2.1" ARCH="all" rm -rf "$BUILD_DIR" @@ -38,27 +38,27 @@ with zipfile.ZipFile(wheel, 'r') as archive: archive.extractall(destination) PY -cat <<'EOF_CONTROL' > "$BUILD_DIR/DEBIAN/control" -Package: sqliteviewer -Version: 0.1.0 +cat < "$BUILD_DIR/DEBIAN/control" +Package: $PACKAGE +Version: $VERSION Section: utils Priority: optional Architecture: all Maintainer: SQLite Viewer Team Depends: python3 (>= 3.10) -Description: PyQt6-based SQLite database viewer for Ubuntu. - SQLite viewer provides a desktop UI for browsing tables, running +Description: PyQt6-based SQLite database client for Ubuntu. + SQLiteView provides a desktop UI for browsing tables, running ad-hoc queries, and exporting results. Packaged with its Python dependencies. EOF_CONTROL -cat <<'EOF_EXEC' > "$BUILD_DIR/usr/bin/sqliteviewer" +cat < "$BUILD_DIR/usr/bin/$PACKAGE" #!/usr/bin/env bash set -euo pipefail -SCRIPT_DIR="/usr/lib/sqliteviewer" -exec python3 "$SCRIPT_DIR/sqliteviewer/__main__.py" "$@" +SCRIPT_DIR="/usr/lib/$PACKAGE" +exec python3 "\$SCRIPT_DIR/sqliteviewer/__main__.py" "\$@" EOF_EXEC -chmod +x "$BUILD_DIR/usr/bin/sqliteviewer" +chmod +x "$BUILD_DIR/usr/bin/$PACKAGE" install -m 644 "$ROOT_DIR/src/sqliteviewer/resources/sqliteviewer.desktop" "$BUILD_DIR/usr/share/applications/sqliteviewer.desktop" install -m 644 "$ROOT_DIR/src/sqliteviewer/resources/icon.png" "$BUILD_DIR/usr/share/icons/hicolor/256x256/apps/sqliteviewer.png" diff --git a/src/sqliteviewer/database.py b/src/sqliteviewer/database.py index 4177f26..92b0a1b 100644 --- a/src/sqliteviewer/database.py +++ b/src/sqliteviewer/database.py @@ -1,10 +1,10 @@ -"""Database access layer for the SQLite viewer.""" +"""Database access layer for SQLite database interactions.""" from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path -from typing import Iterable, List, Optional, Sequence +from typing import Iterable, List, Optional, Sequence, Tuple import sqlite3 @@ -12,6 +12,11 @@ DEFAULT_ROW_LIMIT = 200 QUERY_ROW_LIMIT = 1000 +_READ_KEYWORDS = {"SELECT", "WITH", "PRAGMA", "EXPLAIN"} +_DML_KEYWORDS = {"INSERT", "UPDATE", "DELETE", "REPLACE"} +_DDL_KEYWORDS = {"CREATE", "ALTER", "DROP"} +_TCL_KEYWORDS = {"BEGIN", "COMMIT", "ROLLBACK", "SAVEPOINT", "RELEASE"} + class DatabaseError(RuntimeError): """Raised when a database operation fails.""" @@ -25,10 +30,12 @@ class QueryResult: rows: List[Sequence[object]] truncated: bool = False row_count: Optional[int] = None + affected_rows: Optional[int] = None + is_write_operation: bool = False class DatabaseService: - """High-level helper for read-only SQLite interactions.""" + """High-level helper for SQLite database interactions.""" def __init__(self) -> None: self._connection: Optional[sqlite3.Connection] = None @@ -107,26 +114,98 @@ def get_table_schema(self, table_name: str) -> str: return rows[0][0] def execute_query(self, sql: str, limit: int = QUERY_ROW_LIMIT) -> QueryResult: - """Execute a read-only SQL query and return results.""" + """Execute a SQL statement and return results.""" self._ensure_connection() sql = sql.strip() if not sql: raise DatabaseError("Query is empty.") - self._assert_read_only(sql) - try: cursor = self._connection.execute(sql) - columns = [description[0] for description in cursor.description or []] - rows = cursor.fetchmany(limit + 1) except sqlite3.Error as exc: raise DatabaseError(f"Failed to execute query: {exc}") from exc + if cursor.description is None: + # Write operation (INSERT/UPDATE/DELETE/DDL/TCL) + affected = cursor.rowcount if cursor.rowcount >= 0 else None + return QueryResult( + columns=[], + rows=[], + affected_rows=affected, + is_write_operation=True, + ) + + columns = [description[0] for description in cursor.description] + rows = cursor.fetchmany(limit + 1) truncated = len(rows) > limit trimmed_rows = [tuple(row) for row in rows[:limit]] return QueryResult(columns=columns, rows=trimmed_rows, truncated=truncated) + def classify_query(self, sql: str) -> str: + """Classify a SQL statement as read/dml/ddl/tcl/unknown.""" + + keyword = self._extract_first_keyword(sql) + if keyword is None: + return "unknown" + if keyword in _READ_KEYWORDS: + return "read" + if keyword in _DML_KEYWORDS: + return "dml" + if keyword in _DDL_KEYWORDS: + return "ddl" + if keyword in _TCL_KEYWORDS: + return "tcl" + return "unknown" + + def is_destructive_query(self, sql: str) -> Tuple[bool, str]: + """Detect potentially destructive queries. + + Returns (is_destructive, reason). Checks for: + - DROP statements + - DELETE without a WHERE clause + """ + + keyword = self._extract_first_keyword(sql) + if keyword == "DROP": + return True, "This will permanently drop the object." + if keyword == "DELETE": + stripped = self._strip_sql_comments(sql).upper() + if "WHERE" not in stripped: + return True, "DELETE without WHERE will remove all rows." + return False, "" + + def _strip_sql_comments(self, sql: str) -> str: + """Remove SQL comments (-- and /* */) from a statement.""" + + result = [] + i = 0 + length = len(sql) + while i < length: + if sql[i] == '-' and i + 1 < length and sql[i + 1] == '-': + newline = sql.find('\n', i + 2) + i = length if newline == -1 else newline + 1 + elif sql[i] == '/' and i + 1 < length and sql[i + 1] == '*': + end = sql.find('*/', i + 2) + i = length if end == -1 else end + 2 + elif sql[i] == "'": + result.append(sql[i]) + i += 1 + while i < length: + result.append(sql[i]) + if sql[i] == "'" and (i + 1 >= length or sql[i + 1] != "'"): + i += 1 + break + if sql[i] == "'" and i + 1 < length and sql[i + 1] == "'": + result.append(sql[i + 1]) + i += 2 + else: + i += 1 + else: + result.append(sql[i]) + i += 1 + return ''.join(result) + def _get_table_row_count(self, table_name: str) -> Optional[int]: """Return row count for table; failure returns None.""" @@ -156,13 +235,6 @@ def _quote_identifier(self, identifier: str) -> str: raise DatabaseError("Identifier cannot be empty.") return '"' + identifier.replace('"', '""') + '"' - def _assert_read_only(self, sql: str) -> None: - keyword = self._extract_first_keyword(sql) - if keyword is None: - raise DatabaseError("Unable to determine query type.") - if keyword not in {"SELECT", "WITH", "PRAGMA"}: - raise DatabaseError("Only read-only queries (SELECT/WITH/PRAGMA) are allowed.") - def _extract_first_keyword(self, sql: str) -> Optional[str]: index = 0 length = len(sql) diff --git a/src/sqliteviewer/mainwindow.py b/src/sqliteviewer/mainwindow.py index 9487fb8..e9f8c89 100644 --- a/src/sqliteviewer/mainwindow.py +++ b/src/sqliteviewer/mainwindow.py @@ -63,7 +63,7 @@ def __init__(self) -> None: self.schema_view.setLineWrapMode(QTextEdit.LineWrapMode.NoWrap) self.query_editor = QPlainTextEdit() - self.query_editor.setPlaceholderText("Write a read-only SQL query…") + self.query_editor.setPlaceholderText("Write a SQL statement…") self.query_editor.setTabStopDistance(4 * self.query_editor.fontMetrics().horizontalAdvance(' ')) SqlHighlighter(self.query_editor.document()) @@ -182,6 +182,10 @@ def open_database(self, path: str) -> None: self._remember_recent_file(path) def _refresh_tables(self) -> None: + # Remember currently selected table before clearing + selected_items = self.table_list.selectedItems() + previously_selected = selected_items[0].text() if selected_items else None + self.table_list.clear() try: @@ -197,7 +201,12 @@ def _refresh_tables(self) -> None: for table in tables: QListWidgetItem(table, self.table_list) - self.table_list.setCurrentRow(0) + # Restore selection if the table still exists, otherwise select first row + if previously_selected and previously_selected in tables: + idx = tables.index(previously_selected) + self.table_list.setCurrentRow(idx) + else: + self.table_list.setCurrentRow(0) def _close_database(self) -> None: self.database_service.close() @@ -251,19 +260,64 @@ def _populate_table(self, view: QTableView, result: QueryResult) -> None: def _run_query(self) -> None: query = self.query_editor.toPlainText() + + is_destructive, reason = self.database_service.is_destructive_query(query) + if is_destructive: + reply = QMessageBox.warning( + self, + "Potentially destructive operation", + f"{reason}\n\nDo you want to proceed?", + QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, + QMessageBox.StandardButton.No, + ) + if reply != QMessageBox.StandardButton.Yes: + return + try: result = self.database_service.execute_query(query) except DatabaseError as exc: QMessageBox.critical(self, "Query failed", str(exc)) return - self.query_result = result - self._populate_table(self.query_result_view, result) - status = f"Returned {len(result.rows)} row(s)" - if result.truncated: - status += " (truncated)" - self.query_status_label.setText(status) - self.status_bar.showMessage("Query executed successfully.", 4000) + if result.is_write_operation: + self.query_result = None + self.query_result_view.setModel(None) + if result.affected_rows is not None: + status = f"{result.affected_rows} row(s) affected" + else: + status = "Statement executed successfully" + self.query_status_label.setText(status) + self.status_bar.showMessage("Statement executed successfully.", 4000) + self._refresh_after_write(query) + else: + self.query_result = result + self._populate_table(self.query_result_view, result) + status = f"Returned {len(result.rows)} row(s)" + if result.truncated: + status += " (truncated)" + self.query_status_label.setText(status) + self.status_bar.showMessage("Query executed successfully.", 4000) + + def _refresh_after_write(self, sql: str) -> None: + """Refresh UI panels after a write operation.""" + + query_type = self.database_service.classify_query(sql) + + if query_type == "ddl": + # Table list may have changed; also refresh preview and schema + self._refresh_tables() + selected_items = self.table_list.selectedItems() + if selected_items: + table_name = selected_items[0].text() + self._load_table_preview(table_name) + self._load_table_schema(table_name) + elif query_type == "dml": + # Refresh preview only if the affected table is currently selected + selected_items = self.table_list.selectedItems() + if selected_items: + table_name = selected_items[0].text() + if table_name.lower() in sql.lower(): + self._load_table_preview(table_name) def _export_results(self) -> None: if not self.query_result or not self.query_result.columns: @@ -289,8 +343,8 @@ def _show_about_dialog(self) -> None: QMessageBox.about( self, "About SQLite Viewer", - "SQLite Viewer
Version 0.1.0

" - "A lightweight desktop viewer for SQLite databases.", + "SQLite Viewer
Version 0.2.1

" + "A lightweight desktop client for SQLite databases.", ) def _load_recent_files(self) -> None: diff --git a/src/sqliteviewer/sql_highlighter.py b/src/sqliteviewer/sql_highlighter.py index c317111..5327d32 100644 --- a/src/sqliteviewer/sql_highlighter.py +++ b/src/sqliteviewer/sql_highlighter.py @@ -10,6 +10,7 @@ class SqlHighlighter(QSyntaxHighlighter): """Applies formatting rules to highlight SQL keywords and tokens.""" KEYWORDS = { + # Read / query "SELECT", "FROM", "WHERE", @@ -43,6 +44,66 @@ class SqlHighlighter(QSyntaxHighlighter): "LIKE", "EXISTS", "PRAGMA", + "BETWEEN", + "EXCEPT", + "INTERSECT", + "ELSE", + "EXPLAIN", + # DML + "INSERT", + "UPDATE", + "DELETE", + "SET", + "VALUES", + "INTO", + "REPLACE", + # DDL + "CREATE", + "ALTER", + "DROP", + "TABLE", + "VIEW", + "INDEX", + "TRIGGER", + "COLUMN", + "ADD", + "RENAME", + "IF", + "PRIMARY", + "KEY", + "UNIQUE", + "CHECK", + "DEFAULT", + "FOREIGN", + "REFERENCES", + "CONSTRAINT", + "AUTOINCREMENT", + # TCL + "BEGIN", + "COMMIT", + "ROLLBACK", + "TRANSACTION", + "SAVEPOINT", + "RELEASE", + # Maintenance + "VACUUM", + "REINDEX", + "ATTACH", + "DETACH", + "CASCADE", + "RESTRICT", + "CONFLICT", + "ABORT", + "FAIL", + "IGNORE", + "TEMPORARY", + "TEMP", + # Type names + "INTEGER", + "TEXT", + "REAL", + "BLOB", + "NUMERIC", } def __init__(self, document) -> None: diff --git a/tests/test_database.py b/tests/test_database.py index 6dac7c3..06e2cc8 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -47,12 +47,80 @@ def test_table_preview_truncates(self) -> None: self.assertEqual(result.row_count, 3) self.assertEqual(result.columns, ["id", "name", "age"]) - def test_execute_query_restricts_writes(self) -> None: - with self.assertRaises(DatabaseError): - self.service.execute_query("UPDATE users SET age = age + 1") + def test_execute_query_allows_writes(self) -> None: + result = self.service.execute_query("UPDATE users SET age = age + 1 WHERE name = 'Alice'") + self.assertTrue(result.is_write_operation) + self.assertEqual(result.affected_rows, 1) - result = self.service.execute_query("SELECT name FROM users ORDER BY id DESC") - self.assertEqual([row[0] for row in result.rows], ["Carol", "Bob", "Alice"]) + select = self.service.execute_query("SELECT age FROM users WHERE name = 'Alice'") + self.assertEqual(select.rows[0][0], 31) + + def test_execute_query_insert(self) -> None: + result = self.service.execute_query("INSERT INTO users (name, age) VALUES ('Dave', 20)") + self.assertTrue(result.is_write_operation) + self.assertEqual(result.affected_rows, 1) + + count = self.service.execute_query("SELECT COUNT(*) FROM users") + self.assertEqual(count.rows[0][0], 4) + + def test_execute_query_delete(self) -> None: + result = self.service.execute_query("DELETE FROM users WHERE name = 'Bob'") + self.assertTrue(result.is_write_operation) + self.assertEqual(result.affected_rows, 1) + + count = self.service.execute_query("SELECT COUNT(*) FROM users") + self.assertEqual(count.rows[0][0], 2) + + def test_execute_query_ddl_create_drop(self) -> None: + create = self.service.execute_query( + "CREATE TABLE products (id INTEGER PRIMARY KEY, label TEXT)" + ) + self.assertTrue(create.is_write_operation) + self.assertIn("products", self.service.list_tables()) + + drop = self.service.execute_query("DROP TABLE products") + self.assertTrue(drop.is_write_operation) + self.assertNotIn("products", self.service.list_tables()) + + def test_execute_query_transaction_rollback(self) -> None: + self.service.execute_query("BEGIN") + self.service.execute_query("INSERT INTO users (name, age) VALUES ('Eve', 22)") + self.service.execute_query("ROLLBACK") + + count = self.service.execute_query("SELECT COUNT(*) FROM users") + self.assertEqual(count.rows[0][0], 3) + + def test_classify_query(self) -> None: + self.assertEqual(self.service.classify_query("SELECT 1"), "read") + self.assertEqual(self.service.classify_query("WITH cte AS (SELECT 1) SELECT * FROM cte"), "read") + self.assertEqual(self.service.classify_query("PRAGMA table_info(users)"), "read") + self.assertEqual(self.service.classify_query("EXPLAIN SELECT 1"), "read") + self.assertEqual(self.service.classify_query("INSERT INTO users VALUES (1,'a',1)"), "dml") + self.assertEqual(self.service.classify_query("UPDATE users SET age=1"), "dml") + self.assertEqual(self.service.classify_query("DELETE FROM users"), "dml") + self.assertEqual(self.service.classify_query("REPLACE INTO users VALUES (1,'a',1)"), "dml") + self.assertEqual(self.service.classify_query("CREATE TABLE t (id INTEGER)"), "ddl") + self.assertEqual(self.service.classify_query("ALTER TABLE users ADD COLUMN email TEXT"), "ddl") + self.assertEqual(self.service.classify_query("DROP TABLE users"), "ddl") + self.assertEqual(self.service.classify_query("BEGIN"), "tcl") + self.assertEqual(self.service.classify_query("COMMIT"), "tcl") + self.assertEqual(self.service.classify_query("ROLLBACK"), "tcl") + self.assertEqual(self.service.classify_query("-- comment\nSELECT 1"), "read") + + def test_is_destructive_query(self) -> None: + is_d, reason = self.service.is_destructive_query("DROP TABLE users") + self.assertTrue(is_d) + self.assertIn("drop", reason.lower()) + + is_d, reason = self.service.is_destructive_query("DELETE FROM users") + self.assertTrue(is_d) + self.assertIn("WHERE", reason) + + is_d, _ = self.service.is_destructive_query("DELETE FROM users WHERE id = 1") + self.assertFalse(is_d) + + is_d, _ = self.service.is_destructive_query("SELECT * FROM users") + self.assertFalse(is_d) def test_schema_contains_create_statement(self) -> None: schema = self.service.get_table_schema("users")