From 500f2d83adc6bb26b767a4a13179defc1484bf72 Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Fri, 10 Apr 2026 11:23:44 +0200 Subject: [PATCH 01/11] Add first version of monitor-queries command --- src/qlever/commands/benchmark_queries.py | 31 +----- src/qlever/commands/monitor_queries.py | 121 +++++++++++++++++++++++ src/qlever/util.py | 30 ++++++ 3 files changed, 152 insertions(+), 30 deletions(-) create mode 100644 src/qlever/commands/monitor_queries.py diff --git a/src/qlever/commands/benchmark_queries.py b/src/qlever/commands/benchmark_queries.py index 05a3647d..f21b306c 100644 --- a/src/qlever/commands/benchmark_queries.py +++ b/src/qlever/commands/benchmark_queries.py @@ -20,37 +20,8 @@ from qlever.command import QleverCommand from qlever.commands.clear_cache import ClearCacheCommand from qlever.commands.ui import dict_to_yaml -from qlever.containerize import Containerize from qlever.log import log, mute_log -from qlever.util import run_command, run_curl_command - - -def pretty_printed_query( - query: str, show_prefixes: bool, system: str = "docker" -) -> str: - """ - Pretty-print a SPARQL query using the sparql-formatter Docker image. - Optionally strips PREFIX declarations from the output. - Argument `system` can either be docker or podman. - """ - if system not in Containerize.supported_systems(): - system = "docker" - remove_prefixes_cmd = " | sed '/^PREFIX /Id'" if not show_prefixes else "" - pretty_print_query_cmd = ( - f"echo {shlex.quote(query)}" - f" | {system} run -i --rm docker.io/sparqling/sparql-formatter" - f"{remove_prefixes_cmd} | grep -v '^$'" - ) - try: - query_pretty_printed = run_command( - pretty_print_query_cmd, return_output=True - ) - return query_pretty_printed.rstrip() - except Exception as e: - log.debug( - f"Failed to pretty-print query, returning original query: {e}" - ) - return query.rstrip() +from qlever.util import pretty_printed_query, run_command, run_curl_command def sparql_query_type(query: str) -> str: diff --git a/src/qlever/commands/monitor_queries.py b/src/qlever/commands/monitor_queries.py new file mode 100644 index 00000000..354a78c3 --- /dev/null +++ b/src/qlever/commands/monitor_queries.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +import json +import subprocess +import textwrap + +from qlever.command import QleverCommand +from qlever.log import log +from qlever.util import pretty_printed_query + + +class MonitorQueriesCommand(QleverCommand): + """ + Class for executing the `monitor-queries` command. + """ + + def __init__(self): + pass + + def description(self) -> str: + return "Show the currently active queries on the server" + + def should_have_qleverfile(self) -> bool: + return False + + def relevant_qleverfile_arguments(self) -> dict[str, list[str]]: + return { + "server": ["access_token", "host_name", "port"], + "runtime": ["system"], + } + + def additional_arguments(self, subparser) -> None: + subparser.add_argument( + "--sparql-endpoint", + help="URL of the SPARQL endpoint, default is {host_name}:{port}", + ) + subparser.add_argument( + "--detailed", + action="store_true", + default=False, + help="Show the full SPARQL text for each active query", + ) + subparser.add_argument( + "--query-id", + help="Show the full SPARQL text for a specific query," + " either by its index (#) or server query ID", + ) + + def execute(self, args) -> bool: + sparql_endpoint = ( + args.sparql_endpoint + if args.sparql_endpoint + else f"{args.host_name}:{args.port}" + ) + monitor_queries_cmd = ( + f'curl -s {sparql_endpoint} --data-urlencode "cmd=dump-active-queries" ' + f'--data-urlencode access-token="{args.access_token}"' + ) + + # Show them. + self.show(monitor_queries_cmd, only_show=args.show) + if args.show: + return True + + # Execute them. + try: + monitored_queries = subprocess.check_output( + monitor_queries_cmd, shell=True + ) + monitored_queries_dict = json.loads(monitored_queries) + except Exception as e: + log.error(f"Failed to get active queries: {e}") + return False + + if not monitored_queries_dict: + log.info("No active queries on the server") + return True + + queries = list(monitored_queries_dict.items()) + + # Show the full SPARQL for a specific query. + if args.query_id: + # Try as a table index first, then as a server query ID. + try: + idx = int(args.query_id) + if 1 <= idx <= len(queries): + sparql_query = queries[idx - 1][1] + else: + sparql_query = None + except ValueError: + sparql_query = monitored_queries_dict.get(args.query_id) + if not sparql_query: + log.error("No active query found for the given ID") + return False + log.info(pretty_printed_query(sparql_query, False, args.system)) + return True + + # Table header. + col_index = 3 + col_qid = max(len(qid) for qid, _ in queries) + indent = " " * (2 + col_index + 2 + col_qid + 2) + log.info(f" {'#':<{col_index}} {'Query ID':<{col_qid}} SPARQL") + + for i, (qid, sparql) in enumerate(queries, 1): + if args.detailed: + wrapped = textwrap.fill( + sparql, + width=100, + initial_indent="", + subsequent_indent=indent, + ) + log.info(f" {i:<{col_index}} {qid:<{col_qid}} {wrapped}") + else: + short_sparql = ( + sparql[:80] + "..." if len(sparql) > 80 else sparql + ) + log.info( + f" {i:<{col_index}} {qid:<{col_qid}} {short_sparql}" + ) + + return True diff --git a/src/qlever/util.py b/src/qlever/util.py index 848549bd..e1cecf31 100644 --- a/src/qlever/util.py +++ b/src/qlever/util.py @@ -270,6 +270,36 @@ def format_size(bytes, suffix="B"): bytes /= factor +def pretty_printed_query( + query: str, show_prefixes: bool, system: str = "docker" +) -> str: + """ + Pretty-print a SPARQL query using the sparql-formatter Docker image. + Optionally strips PREFIX declarations from the output. + Argument `system` can either be docker or podman. + """ + from qlever.containerize import Containerize + + if system not in Containerize.supported_systems(): + system = "docker" + remove_prefixes_cmd = " | sed '/^PREFIX /Id'" if not show_prefixes else "" + pretty_print_query_cmd = ( + f"echo {shlex.quote(query)}" + f" | {system} run -i --rm docker.io/sparqling/sparql-formatter" + f"{remove_prefixes_cmd} | grep -v '^$'" + ) + try: + query_pretty_printed = run_command( + pretty_print_query_cmd, return_output=True + ) + return query_pretty_printed.rstrip() + except Exception as e: + log.debug( + f"Failed to pretty-print query, returning original query: {e}" + ) + return query.rstrip() + + def stop_process(proc: psutil.Process, pinfo: dict[str, Any]) -> bool: """ Try to kill the given process, return True iff it was killed From 666835f204567141677b6be8ad0c16086dfd1e0b Mon Sep 17 00:00:00 2001 From: Hannah Bast Date: Thu, 16 Apr 2026 10:50:08 +0200 Subject: [PATCH 02/11] Replace all whitespace by single space --- src/qlever/commands/monitor_queries.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/qlever/commands/monitor_queries.py b/src/qlever/commands/monitor_queries.py index 354a78c3..29e45391 100644 --- a/src/qlever/commands/monitor_queries.py +++ b/src/qlever/commands/monitor_queries.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import re import subprocess import textwrap @@ -102,9 +103,11 @@ def execute(self, args) -> bool: log.info(f" {'#':<{col_index}} {'Query ID':<{col_qid}} SPARQL") for i, (qid, sparql) in enumerate(queries, 1): + # Collapse whitespace for compact display. + sparql_oneline = re.sub(r"\s+", " ", sparql).strip() if args.detailed: wrapped = textwrap.fill( - sparql, + sparql_oneline, width=100, initial_indent="", subsequent_indent=indent, @@ -112,7 +115,9 @@ def execute(self, args) -> bool: log.info(f" {i:<{col_index}} {qid:<{col_qid}} {wrapped}") else: short_sparql = ( - sparql[:80] + "..." if len(sparql) > 80 else sparql + sparql_oneline[:80] + "..." + if len(sparql_oneline) > 80 + else sparql_oneline ) log.info( f" {i:<{col_index}} {qid:<{col_qid}} {short_sparql}" From 28997ade6c0395516141bce110a86224b66c43f6 Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Sat, 18 Apr 2026 17:25:47 +0200 Subject: [PATCH 03/11] Add --watch parameter for continuous updates --- src/qlever/commands/monitor_queries.py | 150 +++++++++++++++---------- 1 file changed, 92 insertions(+), 58 deletions(-) diff --git a/src/qlever/commands/monitor_queries.py b/src/qlever/commands/monitor_queries.py index 29e45391..aec64944 100644 --- a/src/qlever/commands/monitor_queries.py +++ b/src/qlever/commands/monitor_queries.py @@ -4,12 +4,76 @@ import re import subprocess import textwrap +import time from qlever.command import QleverCommand from qlever.log import log from qlever.util import pretty_printed_query +def render_queries(monitor_queries_cmd, args) -> bool: + try: + monitored_queries = subprocess.check_output( + monitor_queries_cmd, shell=True + ) + monitored_queries_dict = json.loads(monitored_queries) + except Exception as e: + log.error(f"Failed to get active queries: {e}") + return False + + if not monitored_queries_dict: + log.info("No active queries on the server") + return True + + queries = list(monitored_queries_dict.items()) + + # Show the full SPARQL for a specific query. + if args.query_id: + # Try as a table index first, then as a server query ID. + try: + idx = int(args.query_id) + if 1 <= idx <= len(queries): + sparql_query = queries[idx - 1][1] + else: + sparql_query = None + except ValueError: + sparql_query = monitored_queries_dict.get(args.query_id) + if not sparql_query: + log.error("No active query found for the given ID") + return False + log.info(pretty_printed_query(sparql_query, False, args.system)) + return True + + # Table header. + col_index = 3 + col_qid = max(len(qid) for qid, _ in queries) + indent = " " * (2 + col_index + 2 + col_qid + 2) + log.info(f" {'#':<{col_index}} {'Query ID':<{col_qid}} SPARQL") + + for i, (qid, sparql) in enumerate(queries, 1): + # Collapse whitespace for compact display. + sparql_oneline = re.sub(r"\s+", " ", sparql).strip() + if args.detailed: + wrapped = textwrap.fill( + sparql_oneline, + width=100, + initial_indent="", + subsequent_indent=indent, + ) + log.info(f" {i:<{col_index}} {qid:<{col_qid}} {wrapped}") + else: + short_sparql = ( + sparql_oneline[:80] + "..." + if len(sparql_oneline) > 80 + else sparql_oneline + ) + log.info( + f" {i:<{col_index}} {qid:<{col_qid}} {short_sparql}" + ) + + return True + + class MonitorQueriesCommand(QleverCommand): """ Class for executing the `monitor-queries` command. @@ -46,6 +110,20 @@ def additional_arguments(self, subparser) -> None: help="Show the full SPARQL text for a specific query," " either by its index (#) or server query ID", ) + subparser.add_argument( + "--watch", + action="store_true", + default=False, + help="Continuously refresh the list of active queries" + " until interrupted with Ctrl-C", + ) + subparser.add_argument( + "--interval", + type=float, + default=2.0, + help="Refresh interval in seconds when using --watch" + " (default: 2.0)", + ) def execute(self, args) -> bool: sparql_endpoint = ( @@ -63,64 +141,20 @@ def execute(self, args) -> bool: if args.show: return True - # Execute them. - try: - monitored_queries = subprocess.check_output( - monitor_queries_cmd, shell=True - ) - monitored_queries_dict = json.loads(monitored_queries) - except Exception as e: - log.error(f"Failed to get active queries: {e}") + if args.watch and args.interval < 0.5: + log.error("--interval must be at least 0.5 seconds") + return False + if args.watch and args.query_id: + log.error("--watch cannot be combined with --query-id") return False - if not monitored_queries_dict: - log.info("No active queries on the server") - return True - - queries = list(monitored_queries_dict.items()) - - # Show the full SPARQL for a specific query. - if args.query_id: - # Try as a table index first, then as a server query ID. + if args.watch: try: - idx = int(args.query_id) - if 1 <= idx <= len(queries): - sparql_query = queries[idx - 1][1] - else: - sparql_query = None - except ValueError: - sparql_query = monitored_queries_dict.get(args.query_id) - if not sparql_query: - log.error("No active query found for the given ID") - return False - log.info(pretty_printed_query(sparql_query, False, args.system)) - return True - - # Table header. - col_index = 3 - col_qid = max(len(qid) for qid, _ in queries) - indent = " " * (2 + col_index + 2 + col_qid + 2) - log.info(f" {'#':<{col_index}} {'Query ID':<{col_qid}} SPARQL") - - for i, (qid, sparql) in enumerate(queries, 1): - # Collapse whitespace for compact display. - sparql_oneline = re.sub(r"\s+", " ", sparql).strip() - if args.detailed: - wrapped = textwrap.fill( - sparql_oneline, - width=100, - initial_indent="", - subsequent_indent=indent, - ) - log.info(f" {i:<{col_index}} {qid:<{col_qid}} {wrapped}") - else: - short_sparql = ( - sparql_oneline[:80] + "..." - if len(sparql_oneline) > 80 - else sparql_oneline - ) - log.info( - f" {i:<{col_index}} {qid:<{col_qid}} {short_sparql}" - ) - - return True + while True: + print("\033[H\033[2J", end="", flush=True) + render_queries(monitor_queries_cmd, args) + time.sleep(args.interval) + except KeyboardInterrupt: + return True + + return render_queries(monitor_queries_cmd, args) From da0fde5f866806357ca704d5346c9ff5de042cec Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Mon, 20 Apr 2026 22:09:09 +0200 Subject: [PATCH 04/11] Add `rich` library to render a neatly-formatted live table in the terminal --- pyproject.toml | 2 +- src/qlever/commands/monitor_queries.py | 179 +++++++++++++++---------- 2 files changed, 110 insertions(+), 71 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 0ddb1066..fe8c6d41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ classifiers = [ "Topic :: Database :: Front-Ends" ] -dependencies = [ "psutil", "termcolor", "argcomplete", "pyyaml", "rdflib", "requests-sse", "tqdm>=4.60.0" ] +dependencies = [ "psutil", "termcolor", "argcomplete", "pyyaml", "rdflib", "requests-sse", "rich", "tqdm>=4.60.0" ] [project.urls] homepage = "https://github.com/ad-freiburg/qlever" diff --git a/src/qlever/commands/monitor_queries.py b/src/qlever/commands/monitor_queries.py index aec64944..741982ae 100644 --- a/src/qlever/commands/monitor_queries.py +++ b/src/qlever/commands/monitor_queries.py @@ -3,75 +3,76 @@ import json import re import subprocess -import textwrap import time +from rich.console import Console +from rich.live import Live +from rich.table import Table + from qlever.command import QleverCommand from qlever.log import log from qlever.util import pretty_printed_query -def render_queries(monitor_queries_cmd, args) -> bool: +def fetch_queries(monitor_queries_cmd: str) -> dict | None: try: - monitored_queries = subprocess.check_output( - monitor_queries_cmd, shell=True - ) - monitored_queries_dict = json.loads(monitored_queries) + output = subprocess.check_output(monitor_queries_cmd, shell=True) except Exception as e: - log.error(f"Failed to get active queries: {e}") - return False - - if not monitored_queries_dict: - log.info("No active queries on the server") - return True - - queries = list(monitored_queries_dict.items()) - - # Show the full SPARQL for a specific query. - if args.query_id: - # Try as a table index first, then as a server query ID. - try: - idx = int(args.query_id) - if 1 <= idx <= len(queries): - sparql_query = queries[idx - 1][1] - else: - sparql_query = None - except ValueError: - sparql_query = monitored_queries_dict.get(args.query_id) - if not sparql_query: - log.error("No active query found for the given ID") - return False - log.info(pretty_printed_query(sparql_query, False, args.system)) - return True - - # Table header. - col_index = 3 - col_qid = max(len(qid) for qid, _ in queries) - indent = " " * (2 + col_index + 2 + col_qid + 2) - log.info(f" {'#':<{col_index}} {'Query ID':<{col_qid}} SPARQL") - - for i, (qid, sparql) in enumerate(queries, 1): - # Collapse whitespace for compact display. - sparql_oneline = re.sub(r"\s+", " ", sparql).strip() - if args.detailed: - wrapped = textwrap.fill( - sparql_oneline, - width=100, - initial_indent="", - subsequent_indent=indent, - ) - log.info(f" {i:<{col_index}} {qid:<{col_qid}} {wrapped}") - else: - short_sparql = ( - sparql_oneline[:80] + "..." - if len(sparql_oneline) > 80 - else sparql_oneline + log.error(f"Failed to fetch active queries: {e}") + return None + output = output.strip() + if not output: + return {} + try: + parsed = json.loads(output) + except json.JSONDecodeError as e: + log.error(f"Server returned unexpected response: {e}") + return None + return parsed if isinstance(parsed, dict) else {} + + +def server_supports_duration(queries_dict: dict) -> bool: + return any(isinstance(v, dict) for v in queries_dict.values()) + + +def build_table(queries_dict: dict, has_duration: bool) -> Table: + table = Table( + show_header=True, + header_style="bold", + expand=True, + padding=(0, 1), + ) + + table.add_column("#", width=3, justify="right", no_wrap=True) + table.add_column( + "Query ID", min_width=12, max_width=18, overflow="ellipsis" + ) + if has_duration: + table.add_column("Duration", width=8, justify="right", no_wrap=True) + table.add_column( + "SPARQL", + ratio=1, + overflow="ellipsis", + no_wrap=True, + ) + + now_ms = int(time.time() * 1000) + for i, (qid, info) in enumerate(queries_dict.items(), 1): + query_text = info["query"] if isinstance(info, dict) else info + sparql = re.sub(r"\s+", " ", query_text).strip() + if has_duration: + started_at = ( + info.get("started_at") if isinstance(info, dict) else None ) - log.info( - f" {i:<{col_index}} {qid:<{col_qid}} {short_sparql}" + duration = ( + f"{(now_ms - started_at) // 1000}s" + if started_at is not None + else "N/A" ) - - return True + table.add_row(str(i), qid, duration, sparql) + else: + table.add_row(str(i), qid, sparql) + return table class MonitorQueriesCommand(QleverCommand): @@ -99,12 +100,6 @@ def additional_arguments(self, subparser) -> None: "--sparql-endpoint", help="URL of the SPARQL endpoint, default is {host_name}:{port}", ) - subparser.add_argument( - "--detailed", - action="store_true", - default=False, - help="Show the full SPARQL text for each active query", - ) subparser.add_argument( "--query-id", help="Show the full SPARQL text for a specific query," @@ -136,7 +131,6 @@ def execute(self, args) -> bool: f'--data-urlencode access-token="{args.access_token}"' ) - # Show them. self.show(monitor_queries_cmd, only_show=args.show) if args.show: return True @@ -148,13 +142,58 @@ def execute(self, args) -> bool: log.error("--watch cannot be combined with --query-id") return False + console = Console() + + # One-shot: show full SPARQL for a specific query. + if args.query_id: + queries_dict = fetch_queries(monitor_queries_cmd) + if queries_dict is None: + return False + queries = list(queries_dict.items()) + try: + # When user passes row index as query id + idx = int(args.query_id) + info = ( + queries[idx - 1][1] if 1 <= idx <= len(queries) else None + ) + except ValueError: + # When user passes server query id directly + info = queries_dict.get(args.query_id) + if not info: + log.error("No active query found for the given ID") + return False + query_text = info["query"] if isinstance(info, dict) else info + log.info(pretty_printed_query(query_text, False, args.system)) + return True + + # Watch mode: refresh the table in place. if args.watch: + has_duration = None try: - while True: - print("\033[H\033[2J", end="", flush=True) - render_queries(monitor_queries_cmd, args) - time.sleep(args.interval) + with Live(console=console, refresh_per_second=4) as live: + while True: + queries_dict = fetch_queries(monitor_queries_cmd) + if queries_dict is None: + live.update( + "(failed to fetch active queries, retrying...)" + ) + else: + # Lock in the format on the first non-empty fetch. + if has_duration is None and queries_dict: + has_duration = server_supports_duration( + queries_dict + ) + live.update( + build_table(queries_dict, bool(has_duration)) + ) + time.sleep(args.interval) except KeyboardInterrupt: return True - return render_queries(monitor_queries_cmd, args) + # One-shot: print the table once. + queries_dict = fetch_queries(monitor_queries_cmd) + if queries_dict is None: + return False + has_duration = server_supports_duration(queries_dict) + console.print(build_table(queries_dict, has_duration)) + return True From d9800c6ccce2bb3c4f7d8ffa5cf3cf5218b778e1 Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Wed, 22 Apr 2026 16:38:25 +0200 Subject: [PATCH 05/11] Add slow-query logging and show hidden queries if table doesn't fit on terminal screen --- src/qlever/commands/monitor_queries.py | 237 +++++++++++++++++++------ 1 file changed, 186 insertions(+), 51 deletions(-) diff --git a/src/qlever/commands/monitor_queries.py b/src/qlever/commands/monitor_queries.py index 741982ae..0977d6de 100644 --- a/src/qlever/commands/monitor_queries.py +++ b/src/qlever/commands/monitor_queries.py @@ -4,15 +4,19 @@ import re import subprocess import time +from datetime import datetime -from rich.console import Console +from rich.console import Console, Group from rich.live import Live from rich.table import Table +from rich.text import Text from qlever.command import QleverCommand from qlever.log import log from qlever.util import pretty_printed_query +MAX_CONSECUTIVE_FAILURES = 5 + def fetch_queries(monitor_queries_cmd: str) -> dict | None: try: @@ -35,7 +39,92 @@ def server_supports_duration(queries_dict: dict) -> bool: return any(isinstance(v, dict) for v in queries_dict.values()) -def build_table(queries_dict: dict, has_duration: bool) -> Table: +def append_slow_log( + path: str, event: str, qid: str, duration_s: int, sparql: str = "" +) -> None: + """Append a single TSV-formatted slow-query event to the warning log.""" + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with open(path, "a") as f: + f.write(f"{ts}\t{event}\t{qid}\t{duration_s}\t{sparql}\n") + + +def detect_slow_queries( + queries_dict: dict, + slow_seen: dict, + warn_after: float, + log_path: str, +) -> None: + """Log start/finish events for queries that cross the slow threshold. + + Mutates slow_seen in place (qid -> started_at). Appends a 'start' event + when a query first exceeds warn_after, and a 'finish' event when a + previously logged slow query is no longer active. + """ + now_ms = int(time.time() * 1000) + + # Queries that finished since last poll: log with final duration. + for qid in list(slow_seen): + if qid not in queries_dict: + final_s = (now_ms - slow_seen[qid]) // 1000 + append_slow_log(log_path, "finish", qid, final_s) + del slow_seen[qid] + + # Queries that just crossed the threshold: log start event. + for qid, info in queries_dict.items(): + if not isinstance(info, dict) or qid in slow_seen: + continue + started_at = info.get("started_at") + if started_at is None: + continue + duration_s = (now_ms - started_at) // 1000 + if duration_s >= warn_after: + slow_seen[qid] = started_at + sparql = re.sub(r"\s+", " ", info["query"]).strip() + append_slow_log(log_path, "start", qid, duration_s, sparql) + + +def compact_slow_log(path: str) -> None: + """Collapse start/finish event pairs in the log into one row per qid. + + Reads the append-only log, pairs each 'start' with its matching 'finish' + by qid, and rewrites the file with a single row per query showing the + final duration (or the last-known duration if the query was still + running at compaction time). No-op if the log does not exist or has + no start events. + """ + starts = {} + finals = {} + try: + with open(path) as f: + for line in f: + parts = line.rstrip("\n").split("\t", maxsplit=4) + if len(parts) < 5: + continue + ts, event, qid, duration_str, sparql = parts + try: + duration_s = int(duration_str) + except ValueError: + continue + if event == "start": + starts[qid] = (ts, duration_s, sparql) + elif event == "finish": + finals[qid] = duration_s + except FileNotFoundError: + return + if not starts: + return + with open(path, "w") as f: + for qid, (logged_at, start_duration, sparql) in starts.items(): + duration = finals.get(qid, start_duration) + status = "finished" if qid in finals else "unfinished" + f.write( + f"{logged_at}\t{status}\t{qid}\t{duration}\t{sparql}\n" + ) + + +def build_table( + queries_dict: dict, has_duration: bool, warn_after: float +) -> Table: table = Table( show_header=True, header_style="bold", @@ -64,11 +153,13 @@ def build_table(queries_dict: dict, has_duration: bool) -> Table: started_at = ( info.get("started_at") if isinstance(info, dict) else None ) - duration = ( - f"{(now_ms - started_at) // 1000}s" - if started_at is not None - else "N/A" - ) + if started_at is not None: + duration_s = (now_ms - started_at) // 1000 + duration = f"{duration_s}s" + if duration_s >= warn_after: + duration = f"[red]{duration}[/red]" + else: + duration = "N/A" table.add_row(str(i), qid, duration, sparql) else: table.add_row(str(i), qid, sparql) @@ -91,7 +182,8 @@ def should_have_qleverfile(self) -> bool: def relevant_qleverfile_arguments(self) -> dict[str, list[str]]: return { - "server": ["access_token", "host_name", "port"], + "data": ["name"], + "server": ["access_token", "host_name", "port", "timeout"], "runtime": ["system"], } @@ -105,19 +197,25 @@ def additional_arguments(self, subparser) -> None: help="Show the full SPARQL text for a specific query," " either by its index (#) or server query ID", ) - subparser.add_argument( - "--watch", - action="store_true", - default=False, - help="Continuously refresh the list of active queries" - " until interrupted with Ctrl-C", - ) subparser.add_argument( "--interval", type=float, default=2.0, - help="Refresh interval in seconds when using --watch" - " (default: 2.0)", + help="Refresh interval in seconds (default = 2.0)", + ) + subparser.add_argument( + "--warn-after", + type=float, + default=None, + help="Duration in seconds after which an active query is logged" + " as slow (default = server timeout - 10s)", + ) + subparser.add_argument( + "--warning-log", + type=str, + default=None, + help="File to append slow-query warnings to" + " (default = {name}.slow-queries.log)", ) def execute(self, args) -> bool: @@ -135,16 +233,26 @@ def execute(self, args) -> bool: if args.show: return True - if args.watch and args.interval < 0.5: + if args.interval < 0.5: log.error("--interval must be at least 0.5 seconds") return False - if args.watch and args.query_id: - log.error("--watch cannot be combined with --query-id") - return False + + if args.warn_after is None: + try: + timeout_s = float(args.timeout.rstrip("s")) + except ValueError: + log.error( + f"Could not parse server timeout {args.timeout!r};" + " pass --warn-after explicitly" + ) + return False + args.warn_after = max(1.0, timeout_s - 10) + if args.warning_log is None: + args.warning_log = f"{args.name}.slow-queries.log" console = Console() - # One-shot: show full SPARQL for a specific query. + # Show full SPARQL for a specific query. if args.query_id: queries_dict = fetch_queries(monitor_queries_cmd) if queries_dict is None: @@ -166,34 +274,61 @@ def execute(self, args) -> bool: log.info(pretty_printed_query(query_text, False, args.system)) return True - # Watch mode: refresh the table in place. - if args.watch: - has_duration = None - try: - with Live(console=console, refresh_per_second=4) as live: - while True: - queries_dict = fetch_queries(monitor_queries_cmd) - if queries_dict is None: - live.update( - "(failed to fetch active queries, retrying...)" + has_duration = None + failures = 0 + slow_seen = {} + try: + with Live(console=console, refresh_per_second=4) as live: + while True: + queries_dict = fetch_queries(monitor_queries_cmd) + if queries_dict is None: + failures += 1 + if failures > MAX_CONSECUTIVE_FAILURES: + log.error( + f"Failed to fetch active queries more than " + f"{MAX_CONSECUTIVE_FAILURES} times. Exiting..." ) - else: - # Lock in the format on the first non-empty fetch. - if has_duration is None and queries_dict: - has_duration = server_supports_duration( - queries_dict - ) - live.update( - build_table(queries_dict, bool(has_duration)) + compact_slow_log(args.warning_log) + return False + live.update( + "Failed to fetch active queries, retrying..." + ) + else: + # Reset the failures on successful fetch + failures = 0 + # Lock in the format on the first non-empty fetch. + if has_duration is None and queries_dict: + has_duration = server_supports_duration( + queries_dict ) - time.sleep(args.interval) - except KeyboardInterrupt: - return True - - # One-shot: print the table once. - queries_dict = fetch_queries(monitor_queries_cmd) - if queries_dict is None: - return False - has_duration = server_supports_duration(queries_dict) - console.print(build_table(queries_dict, has_duration)) - return True + if has_duration: + detect_slow_queries( + queries_dict, + slow_seen, + args.warn_after, + args.warning_log, + ) + table = build_table( + queries_dict, + bool(has_duration), + args.warn_after, + ) + # Approx. rows rich can show: terminal height minus + # top/bottom border, header row, header separator, + # and one line for the caption above. + max_rows = max(1, console.size.height - 5) + hidden = max(0, len(queries_dict) - max_rows) + if hidden > 0: + caption = Text( + f"{len(queries_dict)} active queries, " + f"{hidden} not shown", + justify="center", + style="bold", + ) + live.update(Group(caption, table)) + else: + live.update(table) + time.sleep(args.interval) + except KeyboardInterrupt: + compact_slow_log(args.warning_log) + return True From dde2873c4e633b8c6bf3f1eae55377c60c2a6598 Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Thu, 23 Apr 2026 01:55:34 +0200 Subject: [PATCH 06/11] Add headers to tsv log file --- src/qlever/commands/monitor_queries.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/qlever/commands/monitor_queries.py b/src/qlever/commands/monitor_queries.py index 0977d6de..5cedcf81 100644 --- a/src/qlever/commands/monitor_queries.py +++ b/src/qlever/commands/monitor_queries.py @@ -16,6 +16,7 @@ from qlever.util import pretty_printed_query MAX_CONSECUTIVE_FAILURES = 5 +SLOW_LOG_HEADER = "logged_at\tevent\tqid\tduration_s\tsparql\n" def fetch_queries(monitor_queries_cmd: str) -> dict | None: @@ -42,9 +43,14 @@ def server_supports_duration(queries_dict: dict) -> bool: def append_slow_log( path: str, event: str, qid: str, duration_s: int, sparql: str = "" ) -> None: - """Append a single TSV-formatted slow-query event to the warning log.""" + """Append a single TSV-formatted slow-query event to the warning log. + + Writes a column header if the file is empty or newly created. + """ ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") with open(path, "a") as f: + if f.tell() == 0: + f.write(SLOW_LOG_HEADER) f.write(f"{ts}\t{event}\t{qid}\t{duration_s}\t{sparql}\n") @@ -97,6 +103,8 @@ def compact_slow_log(path: str) -> None: try: with open(path) as f: for line in f: + if line == SLOW_LOG_HEADER: + continue parts = line.rstrip("\n").split("\t", maxsplit=4) if len(parts) < 5: continue @@ -114,6 +122,7 @@ def compact_slow_log(path: str) -> None: if not starts: return with open(path, "w") as f: + f.write("logged_at\tstatus\tqid\tduration_s\tsparql\n") for qid, (logged_at, start_duration, sparql) in starts.items(): duration = finals.get(qid, start_duration) status = "finished" if qid in finals else "unfinished" @@ -215,7 +224,7 @@ def additional_arguments(self, subparser) -> None: type=str, default=None, help="File to append slow-query warnings to" - " (default = {name}.slow-queries.log)", + " (default = {name}.slow-queries.tsv)", ) def execute(self, args) -> bool: @@ -248,7 +257,7 @@ def execute(self, args) -> bool: return False args.warn_after = max(1.0, timeout_s - 10) if args.warning_log is None: - args.warning_log = f"{args.name}.slow-queries.log" + args.warning_log = f"{args.name}.slow-queries.tsv" console = Console() From 47ccc6cb163c1b2c44344dcfaa4d408f37234251 Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Tue, 28 Apr 2026 16:35:32 +0200 Subject: [PATCH 07/11] Add first version of monitor quieries TUI --- pyproject.toml | 2 +- src/qlever/commands/monitor_queries_tui.py | 636 +++++++++++++++++++++ 2 files changed, 637 insertions(+), 1 deletion(-) create mode 100644 src/qlever/commands/monitor_queries_tui.py diff --git a/pyproject.toml b/pyproject.toml index b86b32b2..f1b545b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ classifiers = [ "Topic :: Database :: Front-Ends" ] -dependencies = [ "psutil", "termcolor", "argcomplete", "pyyaml", "rdflib", "requests-sse", "rich", "tqdm>=4.60.0" ] +dependencies = [ "psutil", "termcolor", "argcomplete", "pyyaml", "rdflib", "requests-sse", "rich", "textual", "tqdm>=4.60.0" ] [project.urls] homepage = "https://github.com/ad-freiburg/qlever" diff --git a/src/qlever/commands/monitor_queries_tui.py b/src/qlever/commands/monitor_queries_tui.py new file mode 100644 index 00000000..adc9369e --- /dev/null +++ b/src/qlever/commands/monitor_queries_tui.py @@ -0,0 +1,636 @@ +from __future__ import annotations + +import json +import os +import platform +import re +import shutil +import subprocess +import time +from datetime import datetime + +from rich.console import Group +from rich.syntax import Syntax +from rich.text import Text +from textual import work +from textual.app import App, ComposeResult +from textual.widgets import DataTable, Footer, Static + +from qlever.command import QleverCommand +from qlever.log import log +from qlever.util import pretty_printed_query + +MAX_CONSECUTIVE_FAILURES = 5 +SLOW_LOG_HEADER = "logged_at\tevent\tqid\tduration_s\tsparql\n" +HINT_TEXT = ( + "Click a row (or press Enter on a cursored row) to view its full" + " SPARQL. Arrow keys move the cursor without triggering pretty-print." +) + + +def copy_text(text: str) -> bool: + """ + Cross-platform clipboard copy for Textual / TUI apps. + + Supports: + - macOS : pbcopy + - Linux : wl-copy (Wayland) OR xclip/xsel (X11), never both — + falling through to xclip on a Wayland session writes + to the XWayland selection that Wayland apps don't read. + """ + try: + system = platform.system() + + candidates = [] + if system == "Darwin": + candidates.append(["pbcopy"]) + elif system == "Linux": + on_wayland = bool(os.environ.get("WAYLAND_DISPLAY")) + if on_wayland and shutil.which("wl-copy"): + # Force text/plain so wl-copy doesn't auto-detect a + # different MIME type from the content. SPARQL queries + # starting with `PREFIX foo: ` otherwise + # get tagged as a URI-ish type and browsers requesting + # text/plain on paste get nothing. + candidates.append(["wl-copy", "--type", "text/plain"]) + else: + if shutil.which("xclip"): + candidates.append( + [ + "xclip", + "-selection", + "clipboard", + "-t", + "UTF8_STRING", + ] + ) + if shutil.which("xsel"): + candidates.append(["xsel", "--clipboard", "--input"]) + + payload = text.encode("utf-8") + for cmd in candidates: + proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + try: + proc.communicate(input=payload, timeout=2) + except subprocess.TimeoutExpired: + proc.kill() + try: + proc.communicate(timeout=1) + except Exception: + pass + continue + except Exception: + continue + if proc.returncode == 0: + return True + return False + except Exception: + return False + + +def fetch_queries(monitor_queries_cmd: str) -> dict | None: + """Fetch and parse active queries from the SPARQL endpoint. + + Returns None on failure (network error, non-JSON response), {} when + the server reports no active queries. Stays silent on failure on + purpose — the Textual app owns the screen, so any log/print here + would corrupt the rendered display. Callers surface failures via + the status caption instead. + """ + try: + output = subprocess.check_output( + monitor_queries_cmd, shell=True, stderr=subprocess.DEVNULL + ) + except Exception: + return None + output = output.strip() + if not output: + return {} + try: + parsed = json.loads(output) + except json.JSONDecodeError: + return None + return parsed if isinstance(parsed, dict) else {} + + +def server_supports_duration(queries_dict: dict) -> bool: + """Return True iff the server reports per-query duration metadata.""" + return any(isinstance(v, dict) for v in queries_dict.values()) + + +def append_slow_log( + path: str, event: str, qid: str, duration_s: int, sparql: str = "" +) -> None: + """Append a single TSV-formatted slow-query event to the warning log. + + Writes a column header if the file is empty or newly created. + """ + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with open(path, "a") as f: + if f.tell() == 0: + f.write(SLOW_LOG_HEADER) + f.write(f"{ts}\t{event}\t{qid}\t{duration_s}\t{sparql}\n") + + +def detect_slow_queries( + queries_dict: dict, + slow_seen: dict, + warn_after: float, + log_path: str, +) -> None: + """Log start/finish events for queries that cross the slow threshold. + + Mutates slow_seen in place (qid -> started_at). Appends a 'start' event + when a query first exceeds warn_after, and a 'finish' event when a + previously logged slow query is no longer active. + """ + now_ms = int(time.time() * 1000) + + # Queries that finished since last poll: log with final duration. + for qid in list(slow_seen): + if qid not in queries_dict: + final_s = (now_ms - slow_seen[qid]) // 1000 + append_slow_log(log_path, "finish", qid, final_s) + del slow_seen[qid] + + # Queries that just crossed the threshold: log start event. + for qid, info in queries_dict.items(): + if not isinstance(info, dict) or qid in slow_seen: + continue + started_at = info.get("started_at") + if started_at is None: + continue + duration_s = (now_ms - started_at) // 1000 + if duration_s >= warn_after: + slow_seen[qid] = started_at + sparql = re.sub(r"\s+", " ", info["query"]).strip() + append_slow_log(log_path, "start", qid, duration_s, sparql) + + +def compact_slow_log(path: str) -> None: + """Collapse start/finish event pairs in the log into one row per qid. + + Reads the append-only log, pairs each 'start' with its matching 'finish' + by qid, and rewrites the file with a single row per query showing the + final duration (or the last-known duration if the query was still + running at compaction time). No-op if the log does not exist or has + no start events. + """ + starts = {} + finals = {} + try: + with open(path) as f: + for line in f: + if line == SLOW_LOG_HEADER: + continue + parts = line.rstrip("\n").split("\t", maxsplit=4) + if len(parts) < 5: + continue + ts, event, qid, duration_str, sparql = parts + try: + duration_s = int(duration_str) + except ValueError: + continue + if event == "start": + starts[qid] = (ts, duration_s, sparql) + elif event == "finish": + finals[qid] = duration_s + except FileNotFoundError: + return + if not starts: + return + with open(path, "w") as f: + f.write("logged_at\tstatus\tqid\tduration_s\tsparql\n") + for qid, (logged_at, start_duration, sparql) in starts.items(): + duration = finals.get(qid, start_duration) + status = "finished" if qid in finals else "unfinished" + f.write(f"{logged_at}\t{status}\t{qid}\t{duration}\t{sparql}\n") + + +class MonitorApp(App): + """Textual app for the interactive monitor-queries TUI. + + Polls the server every `interval` seconds and renders active queries + in a DataTable. Selecting a row (click on the cursored row, or + Enter while the cursor is on it) shows the full pretty-printed + SPARQL of that query in a detail pane below the table. Arrow keys + move the cursor for browsing — they do not trigger the docker-based + pretty-printer, so navigation stays snappy. + + Until the user selects something the detail pane shows a short + hint. After a selection the cursor is re-anchored to the selected + qid on every refresh so the highlight tracks the selected query + even when the row order shifts. If the selected query disappears + from the server, the cursor stays where it was and the detail pane + keeps showing the last seen SPARQL until the user presses `c` or + selects a different row. + + On fetch failure the last good rows stay visible (frozen) while a + status caption shows the retry count; after MAX_CONSECUTIVE_FAILURES + consecutive failures the app exits. + """ + + BINDINGS = [ + ("q", "quit", "Quit"), + ("c", "clear_detail", "Clear SPARQL"), + ("y", "copy_sparql", "Copy SPARQL"), + ("f", "freeze", "Freeze/Unfreeze table"), + ("d", "toggle_dark", "Toggle dark mode"), + ] + # Use Textual's default theme rather than inheriting terminal + # colors — partial inheritance left the Footer's text invisible + # (its theme-driven background collided with ANSI defaults). + CSS = "#detail { padding: 2; padding-top: 0; }" + + def __init__( + self, + monitor_queries_cmd: str, + interval: float, + warn_after: float, + warning_log: str, + system: str, + ) -> None: + """Stash the fetch command, refresh interval, slow threshold, + slow-query log path, and container system. + + State that must survive across ticks (`has_duration`, + `failures`, the latest `queries_dict`, the sticky selection, + `slow_seen`) lives on the instance — in the original Live loop + these were locals, but Textual calls `refresh_table` from a + timer so each tick is a fresh frame. + """ + super().__init__() + self.monitor_queries_cmd = monitor_queries_cmd + self.interval = interval + self.warn_after = warn_after + self.warning_log = warning_log + self.system = system + # Locked on the first non-empty fetch: True for newer QLever + # servers (dict-shaped query info with started_at), False for + # older servers (plain SPARQL string). None = not yet known, + # which also means columns haven't been added yet. + self.has_duration = None + self.failures = 0 + # Most recent successful fetch; the row-click handler reads from + # this rather than re-hitting the server. + self.queries_dict = {} + # Sticky selection survives table refreshes (we identify by qid + # rather than row index — indices renumber every tick). + self.selected_qid = None + # Raw SPARQL of the selected query, cached at selection time so + # `y` keeps working even after the query has finished + # server-side and is no longer in queries_dict. + self.selected_query_text = None + self.freeze = False + # qid -> started_at for queries that have crossed the slow + # threshold and had a 'start' row appended to the warning log. + # Mutated in place by detect_slow_queries. + self.slow_seen = {} + + def compose(self) -> ComposeResult: + """Build the widget tree: DataTable, status, detail, footer.""" + # cursor_type="row" so a click selects the whole row and emits + # RowSelected (default cursor_type is "cell"). + yield DataTable(cursor_type="row") + yield Static("", id="status") + yield Static("", id="detail") + # Footer renders the BINDINGS list as labelled key hints, + # docked to the bottom by Textual's default CSS. + yield Footer() + + def on_mount(self) -> None: + """Kick off the first refresh and start the recurring timer. + + Columns are added lazily in `setup_columns` once we've seen + real data — that's how the Duration column gets hidden on + servers that don't support it (DataTable can't insert a column + mid-stream, so we defer the whole column setup until we know). + """ + self.theme = "textual-dark" + self.query_one("#detail", Static).update(HINT_TEXT) + self.refresh_table() + self.set_interval(self.interval, self.refresh_table) + + def setup_columns(self) -> None: + """Add columns now that `has_duration` is known. + + Called exactly once, on the first non-empty fetch. The Duration + column is omitted entirely on older servers. + """ + table = self.query_one(DataTable) + table.add_column("#", width=3, key="idx") + table.add_column("Query ID", width=18, key="qid") + if self.has_duration: + table.add_column("Duration", width=8, key="duration") + table.add_column("SPARQL", key="sparql") + + def refresh_table(self) -> None: + """Fetch active queries and rerender the table. + + Skipped entirely while `self.freeze` is True — the timer keeps + ticking but does nothing, so the last-rendered rows stay put. + On fetch failure the table is left untouched (durations freeze) + and the status caption shows the retry count; the app exits if + failures pass the threshold. + """ + if self.freeze: + return + queries_dict = fetch_queries(self.monitor_queries_cmd) + status = self.query_one("#status", Static) + if queries_dict is None: + self.failures += 1 + if self.failures > MAX_CONSECUTIVE_FAILURES: + # Triggers App.run() to return, so the try/finally in + # execute() still runs compact_slow_log on the way out. + self.exit( + message=( + f"Failed to fetch active queries more than " + f"{MAX_CONSECUTIVE_FAILURES} times." + ) + ) + return + status.update( + f"fetch failed, retrying... " + f"({self.failures}/{MAX_CONSECUTIVE_FAILURES})" + ) + return + self.failures = 0 + status.update("") + # Capture the cursor's current qid before we mutate state — used + # below to preserve cursor position across the clear+repopulate + # so arrow-key browsing isn't undone on every tick. + table = self.query_one(DataTable) + old_qids = list(self.queries_dict.keys()) + cursor_row = ( + table.cursor_coordinate.row + if table.cursor_coordinate is not None + else None + ) + cursor_qid = ( + old_qids[cursor_row] + if cursor_row is not None and 0 <= cursor_row < len(old_qids) + else None + ) + self.queries_dict = queries_dict + # Lock the server's format and configure columns the first time + # we actually see data. + if self.has_duration is None and queries_dict: + self.has_duration = server_supports_duration(queries_dict) + self.setup_columns() + # Still waiting for the first non-empty fetch — no columns yet, + # nothing to render. + if self.has_duration is None: + return + # Append slow-query start/finish events to the warning log. + # No-op on servers that don't report started_at. + if self.has_duration: + detect_slow_queries( + queries_dict, + self.slow_seen, + self.warn_after, + self.warning_log, + ) + table.clear() + now_ms = int(time.time() * 1000) + for i, (qid, info) in enumerate(queries_dict.items(), 1): + query_text = info["query"] if isinstance(info, dict) else info + sparql = re.sub(r"\s+", " ", query_text).strip() + if self.has_duration: + started_at = ( + info.get("started_at") if isinstance(info, dict) else None + ) + if started_at is not None: + duration_s = (now_ms - started_at) // 1000 + if duration_s >= self.warn_after: + # Text.from_markup keeps the [red]...[/red] markup + # working inside DataTable cells. + duration_cell = Text.from_markup( + f"[red]{duration_s}s[/red]" + ) + else: + duration_cell = f"{duration_s}s" + else: + duration_cell = "N/A" + # Row key = qid so a click can resolve back to a query. + table.add_row(str(i), qid, duration_cell, sparql, key=qid) + else: + table.add_row(str(i), qid, sparql, key=qid) + # Restore cursor: prefer the user's explicit selection so the + # highlight tracks it across refreshes; otherwise restore to the + # qid the cursor was on before the refresh, so plain arrow-key + # browsing isn't reset every tick. If neither qid is still in + # the table, leave the cursor at its post-clear default (row 0). + target_qid = self.selected_qid or cursor_qid + if target_qid is not None: + new_qids = list(queries_dict.keys()) + if target_qid in new_qids: + table.move_cursor(row=new_qids.index(target_qid)) + + def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: + """Show the full pretty-printed SPARQL for the selected row. + + Fires on Enter while the cursor is on a row, or on a click on + the row that already holds the cursor. Arrow-key navigation + alone does NOT fire this — that's by design, so the docker-based + pretty-printer doesn't run on every keystroke. + + `row_key.value` is the qid we passed to `add_row(..., key=qid)`. + Reads from `self.queries_dict` (last successful fetch) so we + don't hit the server twice. `pretty_printed_query` shells out + to docker/podman, so this blocks the UI for a few hundred ms — + acceptable for an explicit selection. + """ + qid = event.row_key.value if event.row_key else None + if qid is None or qid == self.selected_qid: + return + self.selected_qid = qid + info = self.queries_dict.get(qid) + if info is None: + return + query_text = info["query"] if isinstance(info, dict) else info + # Cache the raw SPARQL for the copy action — keeps copy working + # if the query finishes server-side after this selection. + pretty = pretty_printed_query(query_text, True, self.system) + self.selected_query_text = pretty + self.query_one("#detail", Static).update(self.render_detail(qid, pretty)) + + def render_detail(self, qid: str, pretty: str) -> Group: + """Build the detail-pane renderable: bold qid + highlighted SPARQL. + + Uses Pygments via rich.syntax. Theme tracks self.theme so the + highlight stays readable after a light/dark mode toggle. The + Syntax background is set to match the pane so the highlighter + doesn't paint its own block of color over the Static. + """ + is_dark = "light" not in self.theme + syntax_theme = "monokai" if is_dark else "default" + syntax = Syntax( + pretty, + "sparql", + theme=syntax_theme, + word_wrap=True, + ) + return Group(Text(f"Server Query ID: {qid}", style="bold"), Text(""), syntax) + + def action_freeze(self) -> None: + """Pause or resume the periodic table refresh.""" + self.freeze = not self.freeze + status = self.query_one("#status", Static) + status.update( + "paused — press 'f' to resume\n" if self.freeze else "" + ) + + def action_toggle_dark(self) -> None: + """Quick switch between textual-dark and textual-light. + + Independent of the command palette's full theme picker: this + is just a one-key flip between the two textual theme variants. + Also re-renders the detail pane so the SPARQL syntax theme + follows the new mode. + """ + self.theme = ( + "textual-light" if self.theme == "textual-dark" else "textual-dark" + ) + if self.selected_qid is not None and self.selected_query_text is not None: + self.query_one("#detail", Static).update( + self.render_detail(self.selected_qid, self.selected_query_text) + ) + + def action_clear_detail(self) -> None: + """Clear the selected-query detail pane (bound to `c`). + + Restores the initial hint text so the user knows what to do + next; the cursor stays visible because Textual ties cursor + visibility to interactivity (hiding it disables click/Enter). + """ + self.selected_qid = None + self.selected_query_text = None + self.query_one("#detail", Static).update(HINT_TEXT) + + def action_copy_sparql(self) -> None: + """Copy the selected query's raw SPARQL to the clipboard (`y`). + + Uses Textual's `copy_to_clipboard` which writes via OSC 52 + escape sequences — the terminal emulator picks it up and routes + to the system clipboard. Works over SSH if the local terminal + supports OSC 52 (modern alacritty, kitty, gnome-terminal, + iterm). Copies the raw SPARQL rather than the pretty-printed + form so it round-trips cleanly through any SPARQL endpoint. + """ + if self.selected_qid is None or self.selected_query_text is None: + self.notify("No query selected", severity="warning") + return + self.do_copy(self.selected_query_text) + + @work(thread=True, exclusive=True) + def do_copy(self, text: str) -> None: + """Run the clipboard copy on a worker thread. + + Off-thread so a slow clipboard tool can't stall the 2s refresh + tick. exclusive=True drops the result of any older copy still + in flight when the user presses 'y' again, so only the latest + selection produces a toast. + """ + nbytes = len(text.encode("utf-8")) + ok = copy_text(text) + msg = f"Copied ({nbytes} B)" if ok else f"Copy failed ({nbytes} B)" + self.call_from_thread(self.notify, msg) + + +class MonitorQueriesTuiCommand(QleverCommand): + """ + Class for executing the `monitor-queries-tui` command. + """ + + def __init__(self): + pass + + def description(self) -> str: + return ( + "Show the currently active queries on the server (interactive TUI)" + ) + + def should_have_qleverfile(self) -> bool: + return False + + def relevant_qleverfile_arguments(self) -> dict[str, list[str]]: + return { + "data": ["name"], + "server": ["access_token", "host_name", "port", "timeout"], + "runtime": ["system"], + } + + def additional_arguments(self, subparser) -> None: + subparser.add_argument( + "--sparql-endpoint", + help="URL of the SPARQL endpoint, default is {host_name}:{port}", + ) + subparser.add_argument( + "--interval", + type=float, + default=2.0, + help="Refresh interval in seconds (default = 2.0)", + ) + subparser.add_argument( + "--warn-after", + type=float, + default=None, + help="Duration in seconds after which an active query is logged" + " as slow (default = server timeout - 10s)", + ) + subparser.add_argument( + "--warning-log", + type=str, + default=None, + help="File to append slow-query warnings to" + " (default = {name}.slow-queries.tsv)", + ) + + def execute(self, args) -> bool: + sparql_endpoint = ( + args.sparql_endpoint + if args.sparql_endpoint + else f"{args.host_name}:{args.port}" + ) + monitor_queries_cmd = ( + f'curl -s {sparql_endpoint} --data-urlencode "cmd=dump-active-queries" ' + f'--data-urlencode access-token="{args.access_token}"' + ) + + self.show(monitor_queries_cmd, only_show=args.show) + if args.show: + return True + + if args.interval < 1: + log.error("--interval must be at least 1 second") + return False + + if args.warn_after is None: + try: + timeout_s = float(args.timeout.rstrip("s")) + except ValueError: + log.error( + f"Could not parse server timeout {args.timeout!r};" + " pass --warn-after explicitly" + ) + return False + args.warn_after = max(1.0, timeout_s - 10) + if args.warning_log is None: + args.warning_log = f"{args.name}.slow-queries.tsv" + + try: + MonitorApp( + monitor_queries_cmd, + args.interval, + args.warn_after, + args.warning_log, + args.system, + ).run() + finally: + compact_slow_log(args.warning_log) + return True From c32f8d96ab75a68626ebf0c0c9124cff7afc4bba Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Sun, 3 May 2026 23:43:35 +0200 Subject: [PATCH 08/11] Fix server json keys and add descending duration sort order to the displayed table --- src/qlever/commands/monitor_queries_tui.py | 201 +++++++++++++-------- 1 file changed, 121 insertions(+), 80 deletions(-) diff --git a/src/qlever/commands/monitor_queries_tui.py b/src/qlever/commands/monitor_queries_tui.py index adc9369e..5a95da5d 100644 --- a/src/qlever/commands/monitor_queries_tui.py +++ b/src/qlever/commands/monitor_queries_tui.py @@ -12,7 +12,6 @@ from rich.console import Group from rich.syntax import Syntax from rich.text import Text -from textual import work from textual.app import App, ComposeResult from textual.widgets import DataTable, Footer, Static @@ -23,8 +22,9 @@ MAX_CONSECUTIVE_FAILURES = 5 SLOW_LOG_HEADER = "logged_at\tevent\tqid\tduration_s\tsparql\n" HINT_TEXT = ( - "Click a row (or press Enter on a cursored row) to view its full" - " SPARQL. Arrow keys move the cursor without triggering pretty-print." + "Double-click a row (or press Enter on a highlighted row) to view its full " + "pretty-printed SPARQL. Arrow keys move the cursor without triggering " + "pretty-print." ) @@ -162,7 +162,7 @@ def detect_slow_queries( for qid, info in queries_dict.items(): if not isinstance(info, dict) or qid in slow_seen: continue - started_at = info.get("started_at") + started_at = info.get("started-at") if started_at is None: continue duration_s = (now_ms - started_at) // 1000 @@ -212,12 +212,28 @@ def compact_slow_log(path: str) -> None: f.write(f"{logged_at}\t{status}\t{qid}\t{duration}\t{sparql}\n") +def duration_sort_key(cell) -> int: + """Parse a Duration cell back into an integer for sorting. + + Cells are written as `f"{N}s"`, `Text.from_markup("[red]Ns[/red]")`, + or the literal `"N/A"` for queries the server didn't report a + started_at for. `Text` objects expose the unstyled string via + `.plain`. `"N/A"` is mapped to -1 so it sorts to the bottom under + reverse=True (longest-first ordering). + """ + text = cell.plain if hasattr(cell, "plain") else str(cell) + try: + return int(text.rstrip("s")) + except ValueError: + return -1 + + class MonitorApp(App): """Textual app for the interactive monitor-queries TUI. Polls the server every `interval` seconds and renders active queries - in a DataTable. Selecting a row (click on the cursored row, or - Enter while the cursor is on it) shows the full pretty-printed + in a DataTable. Selecting a row (double-click a row, or + Enter on a highlighted row) shows the full pretty-printed SPARQL of that query in a detail pane below the table. Arrow keys move the cursor for browsing — they do not trigger the docker-based pretty-printer, so navigation stays snappy. @@ -242,9 +258,6 @@ class MonitorApp(App): ("f", "freeze", "Freeze/Unfreeze table"), ("d", "toggle_dark", "Toggle dark mode"), ] - # Use Textual's default theme rather than inheriting terminal - # colors — partial inheritance left the Footer's text invisible - # (its theme-driven background collided with ANSI defaults). CSS = "#detail { padding: 2; padding-top: 0; }" def __init__( @@ -282,7 +295,7 @@ def __init__( # Sticky selection survives table refreshes (we identify by qid # rather than row index — indices renumber every tick). self.selected_qid = None - # Raw SPARQL of the selected query, cached at selection time so + # Pretty-printed SPARQL of the selected query, cached at selection time so # `y` keeps working even after the query has finished # server-side and is no longer in queries_dict. self.selected_query_text = None @@ -330,13 +343,21 @@ def setup_columns(self) -> None: table.add_column("SPARQL", key="sparql") def refresh_table(self) -> None: - """Fetch active queries and rerender the table. - - Skipped entirely while `self.freeze` is True — the timer keeps - ticking but does nothing, so the last-rendered rows stay put. - On fetch failure the table is left untouched (durations freeze) - and the status caption shows the retry count; the app exits if - failures pass the threshold. + """Fetch active queries and incrementally update the table. + + Skipped entirely while `self.freeze` is True. On fetch failure + the table is left untouched (durations freeze) and the status + caption shows the retry count; the app exits if failures pass + the threshold. + + Mutation is incremental rather than clear+rebuild: rows that + persist between ticks stay in place (so the cursor anchor + survives naturally), qids that disappeared are removed, and + new qids are appended at the end. Duration and the `#` index + are rewritten on every current row every tick — duration + because the server only reports `started-at` and we derive + seconds client-side, `#` because rows above may have been + removed. """ if self.freeze: return @@ -361,21 +382,6 @@ def refresh_table(self) -> None: return self.failures = 0 status.update("") - # Capture the cursor's current qid before we mutate state — used - # below to preserve cursor position across the clear+repopulate - # so arrow-key browsing isn't undone on every tick. - table = self.query_one(DataTable) - old_qids = list(self.queries_dict.keys()) - cursor_row = ( - table.cursor_coordinate.row - if table.cursor_coordinate is not None - else None - ) - cursor_qid = ( - old_qids[cursor_row] - if cursor_row is not None and 0 <= cursor_row < len(old_qids) - else None - ) self.queries_dict = queries_dict # Lock the server's format and configure columns the first time # we actually see data. @@ -395,39 +401,81 @@ def refresh_table(self) -> None: self.warn_after, self.warning_log, ) - table.clear() - now_ms = int(time.time() * 1000) - for i, (qid, info) in enumerate(queries_dict.items(), 1): + + table = self.query_one(DataTable) + # Capture cursor's qid before mutation: removing a row above + # the cursor shifts indices up, but the cursor is tracked by + # index, so without this it'd jump to a different qid. + existing_qids = [rk.value for rk in table.rows] + cursor_row = ( + table.cursor_coordinate.row + if table.cursor_coordinate is not None + else None + ) + cursor_qid = ( + existing_qids[cursor_row] + if cursor_row is not None and 0 <= cursor_row < len(existing_qids) + else None + ) + + # Drop rows whose qids the server no longer reports. + new_qid_set = set(queries_dict) + for qid in existing_qids: + if qid not in new_qid_set: + table.remove_row(qid) + + # Append new qids at the end (server order). Index and + # duration cells are filled by the rewrite pass below. + existing_qid_set = set(existing_qids) + for qid, info in queries_dict.items(): + if qid in existing_qid_set: + continue query_text = info["query"] if isinstance(info, dict) else info sparql = re.sub(r"\s+", " ", query_text).strip() if self.has_duration: - started_at = ( - info.get("started_at") if isinstance(info, dict) else None - ) - if started_at is not None: - duration_s = (now_ms - started_at) // 1000 - if duration_s >= self.warn_after: - # Text.from_markup keeps the [red]...[/red] markup - # working inside DataTable cells. - duration_cell = Text.from_markup( - f"[red]{duration_s}s[/red]" - ) - else: - duration_cell = f"{duration_s}s" + table.add_row("", qid, "", sparql, key=qid) + else: + table.add_row("", qid, sparql, key=qid) + + # Rewrite # and duration on every current row. + now_ms = int(time.time() * 1000) + for i, row_key in enumerate(table.rows, 1): + qid = row_key.value + table.update_cell(row_key, "idx", str(i)) + if not self.has_duration: + continue + info = queries_dict.get(qid) + started_at = ( + info.get("started-at") if isinstance(info, dict) else None + ) + if started_at is not None: + duration_s = (now_ms - started_at) // 1000 + if duration_s >= self.warn_after: + # Text.from_markup keeps the [red]...[/red] markup + # working inside DataTable cells. + duration_cell = Text.from_markup( + f"[red]{duration_s}s[/red]" + ) else: - duration_cell = "N/A" - # Row key = qid so a click can resolve back to a query. - table.add_row(str(i), qid, duration_cell, sparql, key=qid) + duration_cell = f"{duration_s}s" else: - table.add_row(str(i), qid, sparql, key=qid) - # Restore cursor: prefer the user's explicit selection so the - # highlight tracks it across refreshes; otherwise restore to the - # qid the cursor was on before the refresh, so plain arrow-key - # browsing isn't reset every tick. If neither qid is still in - # the table, leave the cursor at its post-clear default (row 0). + duration_cell = "N/A" + table.update_cell(row_key, "duration", duration_cell) + + # Sort longest-running first. Stable across ticks because + # duration order is fixed by started_at — two existing rows + # never swap, only new rows slide into place. No-op on old + # servers, which have no duration column to sort by. + if self.has_duration: + table.sort("duration", key=duration_sort_key, reverse=True) + + # Re-anchor the cursor: prefer the user's explicit selection so + # the highlight tracks it; otherwise restore to whatever qid + # the cursor was on before the mutation. If neither qid is + # still in the table, leave the cursor where Textual placed it. target_qid = self.selected_qid or cursor_qid if target_qid is not None: - new_qids = list(queries_dict.keys()) + new_qids = [rk.value for rk in table.rows] if target_qid in new_qids: table.move_cursor(row=new_qids.index(target_qid)) @@ -453,11 +501,13 @@ def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: if info is None: return query_text = info["query"] if isinstance(info, dict) else info - # Cache the raw SPARQL for the copy action — keeps copy working + # Cache the pretty-printed SPARQL for the copy action — keeps copy working # if the query finishes server-side after this selection. pretty = pretty_printed_query(query_text, True, self.system) self.selected_query_text = pretty - self.query_one("#detail", Static).update(self.render_detail(qid, pretty)) + self.query_one("#detail", Static).update( + self.render_detail(qid, pretty) + ) def render_detail(self, qid: str, pretty: str) -> Group: """Build the detail-pane renderable: bold qid + highlighted SPARQL. @@ -475,15 +525,15 @@ def render_detail(self, qid: str, pretty: str) -> Group: theme=syntax_theme, word_wrap=True, ) - return Group(Text(f"Server Query ID: {qid}", style="bold"), Text(""), syntax) + return Group( + Text(f"Server Query ID: {qid}", style="bold"), Text(""), syntax + ) def action_freeze(self) -> None: """Pause or resume the periodic table refresh.""" self.freeze = not self.freeze status = self.query_one("#status", Static) - status.update( - "paused — press 'f' to resume\n" if self.freeze else "" - ) + status.update("paused — press 'f' to resume\n" if self.freeze else "") def action_toggle_dark(self) -> None: """Quick switch between textual-dark and textual-light. @@ -496,7 +546,10 @@ def action_toggle_dark(self) -> None: self.theme = ( "textual-light" if self.theme == "textual-dark" else "textual-dark" ) - if self.selected_qid is not None and self.selected_query_text is not None: + if ( + self.selected_qid is not None + and self.selected_query_text is not None + ): self.query_one("#detail", Static).update( self.render_detail(self.selected_qid, self.selected_query_text) ) @@ -525,21 +578,9 @@ def action_copy_sparql(self) -> None: if self.selected_qid is None or self.selected_query_text is None: self.notify("No query selected", severity="warning") return - self.do_copy(self.selected_query_text) - - @work(thread=True, exclusive=True) - def do_copy(self, text: str) -> None: - """Run the clipboard copy on a worker thread. - - Off-thread so a slow clipboard tool can't stall the 2s refresh - tick. exclusive=True drops the result of any older copy still - in flight when the user presses 'y' again, so only the latest - selection produces a toast. - """ - nbytes = len(text.encode("utf-8")) - ok = copy_text(text) - msg = f"Copied ({nbytes} B)" if ok else f"Copy failed ({nbytes} B)" - self.call_from_thread(self.notify, msg) + ok = copy_text(self.selected_query_text) + msg = "Copied" if ok else "Copy failed" + self.notify(msg) class MonitorQueriesTuiCommand(QleverCommand): From f2114ca39d216e6c9fbfe59299a229bd2a319aa4 Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Tue, 5 May 2026 16:49:53 +0200 Subject: [PATCH 09/11] Separate monitor-queries-app code, implement metrics header, fix slow-queries.tsv file overwrite, and make table and sparql sections fixed and internally scrollable --- src/qlever/commands/monitor_queries_tui.py | 594 +----------------- src/qlever/monitor_queries_app.py | 675 +++++++++++++++++++++ 2 files changed, 685 insertions(+), 584 deletions(-) create mode 100644 src/qlever/monitor_queries_app.py diff --git a/src/qlever/commands/monitor_queries_tui.py b/src/qlever/commands/monitor_queries_tui.py index 5a95da5d..482bfdca 100644 --- a/src/qlever/commands/monitor_queries_tui.py +++ b/src/qlever/commands/monitor_queries_tui.py @@ -1,586 +1,8 @@ from __future__ import annotations -import json -import os -import platform -import re -import shutil -import subprocess -import time -from datetime import datetime - -from rich.console import Group -from rich.syntax import Syntax -from rich.text import Text -from textual.app import App, ComposeResult -from textual.widgets import DataTable, Footer, Static - from qlever.command import QleverCommand from qlever.log import log -from qlever.util import pretty_printed_query - -MAX_CONSECUTIVE_FAILURES = 5 -SLOW_LOG_HEADER = "logged_at\tevent\tqid\tduration_s\tsparql\n" -HINT_TEXT = ( - "Double-click a row (or press Enter on a highlighted row) to view its full " - "pretty-printed SPARQL. Arrow keys move the cursor without triggering " - "pretty-print." -) - - -def copy_text(text: str) -> bool: - """ - Cross-platform clipboard copy for Textual / TUI apps. - - Supports: - - macOS : pbcopy - - Linux : wl-copy (Wayland) OR xclip/xsel (X11), never both — - falling through to xclip on a Wayland session writes - to the XWayland selection that Wayland apps don't read. - """ - try: - system = platform.system() - - candidates = [] - if system == "Darwin": - candidates.append(["pbcopy"]) - elif system == "Linux": - on_wayland = bool(os.environ.get("WAYLAND_DISPLAY")) - if on_wayland and shutil.which("wl-copy"): - # Force text/plain so wl-copy doesn't auto-detect a - # different MIME type from the content. SPARQL queries - # starting with `PREFIX foo: ` otherwise - # get tagged as a URI-ish type and browsers requesting - # text/plain on paste get nothing. - candidates.append(["wl-copy", "--type", "text/plain"]) - else: - if shutil.which("xclip"): - candidates.append( - [ - "xclip", - "-selection", - "clipboard", - "-t", - "UTF8_STRING", - ] - ) - if shutil.which("xsel"): - candidates.append(["xsel", "--clipboard", "--input"]) - - payload = text.encode("utf-8") - for cmd in candidates: - proc = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - try: - proc.communicate(input=payload, timeout=2) - except subprocess.TimeoutExpired: - proc.kill() - try: - proc.communicate(timeout=1) - except Exception: - pass - continue - except Exception: - continue - if proc.returncode == 0: - return True - return False - except Exception: - return False - - -def fetch_queries(monitor_queries_cmd: str) -> dict | None: - """Fetch and parse active queries from the SPARQL endpoint. - - Returns None on failure (network error, non-JSON response), {} when - the server reports no active queries. Stays silent on failure on - purpose — the Textual app owns the screen, so any log/print here - would corrupt the rendered display. Callers surface failures via - the status caption instead. - """ - try: - output = subprocess.check_output( - monitor_queries_cmd, shell=True, stderr=subprocess.DEVNULL - ) - except Exception: - return None - output = output.strip() - if not output: - return {} - try: - parsed = json.loads(output) - except json.JSONDecodeError: - return None - return parsed if isinstance(parsed, dict) else {} - - -def server_supports_duration(queries_dict: dict) -> bool: - """Return True iff the server reports per-query duration metadata.""" - return any(isinstance(v, dict) for v in queries_dict.values()) - - -def append_slow_log( - path: str, event: str, qid: str, duration_s: int, sparql: str = "" -) -> None: - """Append a single TSV-formatted slow-query event to the warning log. - - Writes a column header if the file is empty or newly created. - """ - ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - with open(path, "a") as f: - if f.tell() == 0: - f.write(SLOW_LOG_HEADER) - f.write(f"{ts}\t{event}\t{qid}\t{duration_s}\t{sparql}\n") - - -def detect_slow_queries( - queries_dict: dict, - slow_seen: dict, - warn_after: float, - log_path: str, -) -> None: - """Log start/finish events for queries that cross the slow threshold. - - Mutates slow_seen in place (qid -> started_at). Appends a 'start' event - when a query first exceeds warn_after, and a 'finish' event when a - previously logged slow query is no longer active. - """ - now_ms = int(time.time() * 1000) - - # Queries that finished since last poll: log with final duration. - for qid in list(slow_seen): - if qid not in queries_dict: - final_s = (now_ms - slow_seen[qid]) // 1000 - append_slow_log(log_path, "finish", qid, final_s) - del slow_seen[qid] - - # Queries that just crossed the threshold: log start event. - for qid, info in queries_dict.items(): - if not isinstance(info, dict) or qid in slow_seen: - continue - started_at = info.get("started-at") - if started_at is None: - continue - duration_s = (now_ms - started_at) // 1000 - if duration_s >= warn_after: - slow_seen[qid] = started_at - sparql = re.sub(r"\s+", " ", info["query"]).strip() - append_slow_log(log_path, "start", qid, duration_s, sparql) - - -def compact_slow_log(path: str) -> None: - """Collapse start/finish event pairs in the log into one row per qid. - - Reads the append-only log, pairs each 'start' with its matching 'finish' - by qid, and rewrites the file with a single row per query showing the - final duration (or the last-known duration if the query was still - running at compaction time). No-op if the log does not exist or has - no start events. - """ - starts = {} - finals = {} - try: - with open(path) as f: - for line in f: - if line == SLOW_LOG_HEADER: - continue - parts = line.rstrip("\n").split("\t", maxsplit=4) - if len(parts) < 5: - continue - ts, event, qid, duration_str, sparql = parts - try: - duration_s = int(duration_str) - except ValueError: - continue - if event == "start": - starts[qid] = (ts, duration_s, sparql) - elif event == "finish": - finals[qid] = duration_s - except FileNotFoundError: - return - if not starts: - return - with open(path, "w") as f: - f.write("logged_at\tstatus\tqid\tduration_s\tsparql\n") - for qid, (logged_at, start_duration, sparql) in starts.items(): - duration = finals.get(qid, start_duration) - status = "finished" if qid in finals else "unfinished" - f.write(f"{logged_at}\t{status}\t{qid}\t{duration}\t{sparql}\n") - - -def duration_sort_key(cell) -> int: - """Parse a Duration cell back into an integer for sorting. - - Cells are written as `f"{N}s"`, `Text.from_markup("[red]Ns[/red]")`, - or the literal `"N/A"` for queries the server didn't report a - started_at for. `Text` objects expose the unstyled string via - `.plain`. `"N/A"` is mapped to -1 so it sorts to the bottom under - reverse=True (longest-first ordering). - """ - text = cell.plain if hasattr(cell, "plain") else str(cell) - try: - return int(text.rstrip("s")) - except ValueError: - return -1 - - -class MonitorApp(App): - """Textual app for the interactive monitor-queries TUI. - - Polls the server every `interval` seconds and renders active queries - in a DataTable. Selecting a row (double-click a row, or - Enter on a highlighted row) shows the full pretty-printed - SPARQL of that query in a detail pane below the table. Arrow keys - move the cursor for browsing — they do not trigger the docker-based - pretty-printer, so navigation stays snappy. - - Until the user selects something the detail pane shows a short - hint. After a selection the cursor is re-anchored to the selected - qid on every refresh so the highlight tracks the selected query - even when the row order shifts. If the selected query disappears - from the server, the cursor stays where it was and the detail pane - keeps showing the last seen SPARQL until the user presses `c` or - selects a different row. - - On fetch failure the last good rows stay visible (frozen) while a - status caption shows the retry count; after MAX_CONSECUTIVE_FAILURES - consecutive failures the app exits. - """ - - BINDINGS = [ - ("q", "quit", "Quit"), - ("c", "clear_detail", "Clear SPARQL"), - ("y", "copy_sparql", "Copy SPARQL"), - ("f", "freeze", "Freeze/Unfreeze table"), - ("d", "toggle_dark", "Toggle dark mode"), - ] - CSS = "#detail { padding: 2; padding-top: 0; }" - - def __init__( - self, - monitor_queries_cmd: str, - interval: float, - warn_after: float, - warning_log: str, - system: str, - ) -> None: - """Stash the fetch command, refresh interval, slow threshold, - slow-query log path, and container system. - - State that must survive across ticks (`has_duration`, - `failures`, the latest `queries_dict`, the sticky selection, - `slow_seen`) lives on the instance — in the original Live loop - these were locals, but Textual calls `refresh_table` from a - timer so each tick is a fresh frame. - """ - super().__init__() - self.monitor_queries_cmd = monitor_queries_cmd - self.interval = interval - self.warn_after = warn_after - self.warning_log = warning_log - self.system = system - # Locked on the first non-empty fetch: True for newer QLever - # servers (dict-shaped query info with started_at), False for - # older servers (plain SPARQL string). None = not yet known, - # which also means columns haven't been added yet. - self.has_duration = None - self.failures = 0 - # Most recent successful fetch; the row-click handler reads from - # this rather than re-hitting the server. - self.queries_dict = {} - # Sticky selection survives table refreshes (we identify by qid - # rather than row index — indices renumber every tick). - self.selected_qid = None - # Pretty-printed SPARQL of the selected query, cached at selection time so - # `y` keeps working even after the query has finished - # server-side and is no longer in queries_dict. - self.selected_query_text = None - self.freeze = False - # qid -> started_at for queries that have crossed the slow - # threshold and had a 'start' row appended to the warning log. - # Mutated in place by detect_slow_queries. - self.slow_seen = {} - - def compose(self) -> ComposeResult: - """Build the widget tree: DataTable, status, detail, footer.""" - # cursor_type="row" so a click selects the whole row and emits - # RowSelected (default cursor_type is "cell"). - yield DataTable(cursor_type="row") - yield Static("", id="status") - yield Static("", id="detail") - # Footer renders the BINDINGS list as labelled key hints, - # docked to the bottom by Textual's default CSS. - yield Footer() - - def on_mount(self) -> None: - """Kick off the first refresh and start the recurring timer. - - Columns are added lazily in `setup_columns` once we've seen - real data — that's how the Duration column gets hidden on - servers that don't support it (DataTable can't insert a column - mid-stream, so we defer the whole column setup until we know). - """ - self.theme = "textual-dark" - self.query_one("#detail", Static).update(HINT_TEXT) - self.refresh_table() - self.set_interval(self.interval, self.refresh_table) - - def setup_columns(self) -> None: - """Add columns now that `has_duration` is known. - - Called exactly once, on the first non-empty fetch. The Duration - column is omitted entirely on older servers. - """ - table = self.query_one(DataTable) - table.add_column("#", width=3, key="idx") - table.add_column("Query ID", width=18, key="qid") - if self.has_duration: - table.add_column("Duration", width=8, key="duration") - table.add_column("SPARQL", key="sparql") - - def refresh_table(self) -> None: - """Fetch active queries and incrementally update the table. - - Skipped entirely while `self.freeze` is True. On fetch failure - the table is left untouched (durations freeze) and the status - caption shows the retry count; the app exits if failures pass - the threshold. - - Mutation is incremental rather than clear+rebuild: rows that - persist between ticks stay in place (so the cursor anchor - survives naturally), qids that disappeared are removed, and - new qids are appended at the end. Duration and the `#` index - are rewritten on every current row every tick — duration - because the server only reports `started-at` and we derive - seconds client-side, `#` because rows above may have been - removed. - """ - if self.freeze: - return - queries_dict = fetch_queries(self.monitor_queries_cmd) - status = self.query_one("#status", Static) - if queries_dict is None: - self.failures += 1 - if self.failures > MAX_CONSECUTIVE_FAILURES: - # Triggers App.run() to return, so the try/finally in - # execute() still runs compact_slow_log on the way out. - self.exit( - message=( - f"Failed to fetch active queries more than " - f"{MAX_CONSECUTIVE_FAILURES} times." - ) - ) - return - status.update( - f"fetch failed, retrying... " - f"({self.failures}/{MAX_CONSECUTIVE_FAILURES})" - ) - return - self.failures = 0 - status.update("") - self.queries_dict = queries_dict - # Lock the server's format and configure columns the first time - # we actually see data. - if self.has_duration is None and queries_dict: - self.has_duration = server_supports_duration(queries_dict) - self.setup_columns() - # Still waiting for the first non-empty fetch — no columns yet, - # nothing to render. - if self.has_duration is None: - return - # Append slow-query start/finish events to the warning log. - # No-op on servers that don't report started_at. - if self.has_duration: - detect_slow_queries( - queries_dict, - self.slow_seen, - self.warn_after, - self.warning_log, - ) - - table = self.query_one(DataTable) - # Capture cursor's qid before mutation: removing a row above - # the cursor shifts indices up, but the cursor is tracked by - # index, so without this it'd jump to a different qid. - existing_qids = [rk.value for rk in table.rows] - cursor_row = ( - table.cursor_coordinate.row - if table.cursor_coordinate is not None - else None - ) - cursor_qid = ( - existing_qids[cursor_row] - if cursor_row is not None and 0 <= cursor_row < len(existing_qids) - else None - ) - - # Drop rows whose qids the server no longer reports. - new_qid_set = set(queries_dict) - for qid in existing_qids: - if qid not in new_qid_set: - table.remove_row(qid) - - # Append new qids at the end (server order). Index and - # duration cells are filled by the rewrite pass below. - existing_qid_set = set(existing_qids) - for qid, info in queries_dict.items(): - if qid in existing_qid_set: - continue - query_text = info["query"] if isinstance(info, dict) else info - sparql = re.sub(r"\s+", " ", query_text).strip() - if self.has_duration: - table.add_row("", qid, "", sparql, key=qid) - else: - table.add_row("", qid, sparql, key=qid) - - # Rewrite # and duration on every current row. - now_ms = int(time.time() * 1000) - for i, row_key in enumerate(table.rows, 1): - qid = row_key.value - table.update_cell(row_key, "idx", str(i)) - if not self.has_duration: - continue - info = queries_dict.get(qid) - started_at = ( - info.get("started-at") if isinstance(info, dict) else None - ) - if started_at is not None: - duration_s = (now_ms - started_at) // 1000 - if duration_s >= self.warn_after: - # Text.from_markup keeps the [red]...[/red] markup - # working inside DataTable cells. - duration_cell = Text.from_markup( - f"[red]{duration_s}s[/red]" - ) - else: - duration_cell = f"{duration_s}s" - else: - duration_cell = "N/A" - table.update_cell(row_key, "duration", duration_cell) - - # Sort longest-running first. Stable across ticks because - # duration order is fixed by started_at — two existing rows - # never swap, only new rows slide into place. No-op on old - # servers, which have no duration column to sort by. - if self.has_duration: - table.sort("duration", key=duration_sort_key, reverse=True) - - # Re-anchor the cursor: prefer the user's explicit selection so - # the highlight tracks it; otherwise restore to whatever qid - # the cursor was on before the mutation. If neither qid is - # still in the table, leave the cursor where Textual placed it. - target_qid = self.selected_qid or cursor_qid - if target_qid is not None: - new_qids = [rk.value for rk in table.rows] - if target_qid in new_qids: - table.move_cursor(row=new_qids.index(target_qid)) - - def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: - """Show the full pretty-printed SPARQL for the selected row. - - Fires on Enter while the cursor is on a row, or on a click on - the row that already holds the cursor. Arrow-key navigation - alone does NOT fire this — that's by design, so the docker-based - pretty-printer doesn't run on every keystroke. - - `row_key.value` is the qid we passed to `add_row(..., key=qid)`. - Reads from `self.queries_dict` (last successful fetch) so we - don't hit the server twice. `pretty_printed_query` shells out - to docker/podman, so this blocks the UI for a few hundred ms — - acceptable for an explicit selection. - """ - qid = event.row_key.value if event.row_key else None - if qid is None or qid == self.selected_qid: - return - self.selected_qid = qid - info = self.queries_dict.get(qid) - if info is None: - return - query_text = info["query"] if isinstance(info, dict) else info - # Cache the pretty-printed SPARQL for the copy action — keeps copy working - # if the query finishes server-side after this selection. - pretty = pretty_printed_query(query_text, True, self.system) - self.selected_query_text = pretty - self.query_one("#detail", Static).update( - self.render_detail(qid, pretty) - ) - - def render_detail(self, qid: str, pretty: str) -> Group: - """Build the detail-pane renderable: bold qid + highlighted SPARQL. - - Uses Pygments via rich.syntax. Theme tracks self.theme so the - highlight stays readable after a light/dark mode toggle. The - Syntax background is set to match the pane so the highlighter - doesn't paint its own block of color over the Static. - """ - is_dark = "light" not in self.theme - syntax_theme = "monokai" if is_dark else "default" - syntax = Syntax( - pretty, - "sparql", - theme=syntax_theme, - word_wrap=True, - ) - return Group( - Text(f"Server Query ID: {qid}", style="bold"), Text(""), syntax - ) - - def action_freeze(self) -> None: - """Pause or resume the periodic table refresh.""" - self.freeze = not self.freeze - status = self.query_one("#status", Static) - status.update("paused — press 'f' to resume\n" if self.freeze else "") - - def action_toggle_dark(self) -> None: - """Quick switch between textual-dark and textual-light. - - Independent of the command palette's full theme picker: this - is just a one-key flip between the two textual theme variants. - Also re-renders the detail pane so the SPARQL syntax theme - follows the new mode. - """ - self.theme = ( - "textual-light" if self.theme == "textual-dark" else "textual-dark" - ) - if ( - self.selected_qid is not None - and self.selected_query_text is not None - ): - self.query_one("#detail", Static).update( - self.render_detail(self.selected_qid, self.selected_query_text) - ) - - def action_clear_detail(self) -> None: - """Clear the selected-query detail pane (bound to `c`). - - Restores the initial hint text so the user knows what to do - next; the cursor stays visible because Textual ties cursor - visibility to interactivity (hiding it disables click/Enter). - """ - self.selected_qid = None - self.selected_query_text = None - self.query_one("#detail", Static).update(HINT_TEXT) - - def action_copy_sparql(self) -> None: - """Copy the selected query's raw SPARQL to the clipboard (`y`). - - Uses Textual's `copy_to_clipboard` which writes via OSC 52 - escape sequences — the terminal emulator picks it up and routes - to the system clipboard. Works over SSH if the local terminal - supports OSC 52 (modern alacritty, kitty, gnome-terminal, - iterm). Copies the raw SPARQL rather than the pretty-printed - form so it round-trips cleanly through any SPARQL endpoint. - """ - if self.selected_qid is None or self.selected_query_text is None: - self.notify("No query selected", severity="warning") - return - ok = copy_text(self.selected_query_text) - msg = "Copied" if ok else "Copy failed" - self.notify(msg) +from qlever.monitor_queries_app import MonitorApp, compact_slow_log class MonitorQueriesTuiCommand(QleverCommand): @@ -638,8 +60,16 @@ def execute(self, args) -> bool: if args.sparql_endpoint else f"{args.host_name}:{args.port}" ) + if args.interval < 1: + log.error("--interval must be at least 1 second") + return False + # --max-time bounds end-to-end fetch time. Lower bound 5s so a + # busy server doesn't get classified as failed; upper cap 30s + # so failure detection stays responsive at long intervals. + max_time = max(5, min(int(args.interval), 30)) monitor_queries_cmd = ( - f'curl -s {sparql_endpoint} --data-urlencode "cmd=dump-active-queries" ' + f'curl -s --max-time {max_time} {sparql_endpoint} ' + f'--data-urlencode "cmd=dump-active-queries" ' f'--data-urlencode access-token="{args.access_token}"' ) @@ -647,10 +77,6 @@ def execute(self, args) -> bool: if args.show: return True - if args.interval < 1: - log.error("--interval must be at least 1 second") - return False - if args.warn_after is None: try: timeout_s = float(args.timeout.rstrip("s")) diff --git a/src/qlever/monitor_queries_app.py b/src/qlever/monitor_queries_app.py new file mode 100644 index 00000000..5cd1a772 --- /dev/null +++ b/src/qlever/monitor_queries_app.py @@ -0,0 +1,675 @@ +from __future__ import annotations + +import json +import os +import platform +import re +import shutil +import subprocess +import time +from collections import deque +from datetime import datetime + +from rich.console import Group +from rich.syntax import Syntax +from rich.text import Text +from textual.app import App, ComposeResult +from textual.containers import VerticalScroll +from textual.widgets import DataTable, Footer, Static + +from qlever.util import pretty_printed_query + +MAX_CONSECUTIVE_FAILURES = 5 +SLOW_LOG_HEADER = "logged_at\tevent\tqid\tduration_s\tsparql\n" +COMPACTED_HEADER = "logged_at\tstatus\tqid\tduration_s\tsparql\n" +WINDOWS_S = [ + ("5m", "Last 5m", 300), + ("15m", "Last 15m", 900), + ("1h", "Last 1h", 3600), +] +MAX_WINDOW_S = 3600 +LABEL_WIDTH = 8 +HINT_TEXT = ( + "Double-click a row (or press Enter on a highlighted row) to view its full " + "pretty-printed SPARQL. Arrow keys move the cursor without triggering " + "pretty-print." +) + + +def copy_text(text: str) -> bool: + """Copy text to the system clipboard. Returns True on success.""" + try: + system = platform.system() + + candidates = [] + if system == "Darwin": + candidates.append(["pbcopy"]) + elif system == "Linux": + # On Wayland, never fall through to xclip/xsel: they write to + # the XWayland selection, which Wayland apps don't read. + on_wayland = bool(os.environ.get("WAYLAND_DISPLAY")) + if on_wayland and shutil.which("wl-copy"): + # Force text/plain so wl-copy doesn't tag SPARQL starting + # with `PREFIX foo: ` as a URI-ish MIME type. + candidates.append(["wl-copy", "--type", "text/plain"]) + else: + if shutil.which("xclip"): + candidates.append( + [ + "xclip", + "-selection", + "clipboard", + "-t", + "UTF8_STRING", + ] + ) + if shutil.which("xsel"): + candidates.append(["xsel", "--clipboard", "--input"]) + + payload = text.encode("utf-8") + for cmd in candidates: + proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + try: + proc.communicate(input=payload, timeout=2) + except subprocess.TimeoutExpired: + proc.kill() + try: + proc.communicate(timeout=1) + except Exception: + pass + continue + except Exception: + continue + if proc.returncode == 0: + return True + return False + except Exception: + return False + + +def fetch_queries(monitor_queries_cmd: str) -> dict | None: + """Fetch active queries. Returns None on failure, {} if none active.""" + try: + output = subprocess.check_output( + monitor_queries_cmd, shell=True, stderr=subprocess.DEVNULL + ) + except Exception: + return None + output = output.strip() + if not output: + return {} + try: + parsed = json.loads(output) + except json.JSONDecodeError: + return None + return parsed if isinstance(parsed, dict) else {} + + +def server_supports_duration(queries_dict: dict) -> bool: + """Return True iff the server reports per-query duration metadata.""" + return any(isinstance(v, dict) for v in queries_dict.values()) + + +def append_slow_log( + path: str, event: str, qid: str, duration_s: int, sparql: str = "" +) -> None: + """Append one TSV slow-query event, writing the header on first write.""" + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with open(path, "a") as f: + if f.tell() == 0: + f.write(SLOW_LOG_HEADER) + f.write(f"{ts}\t{event}\t{qid}\t{duration_s}\t{sparql}\n") + + +def detect_slow_queries( + queries_dict: dict, + slow_seen: dict, + warn_after: float, + log_path: str, +) -> None: + """Log start/finish events for queries crossing warn_after. + + Mutates slow_seen (qid -> started_at) in place. + """ + now_ms = int(time.time() * 1000) + + for qid in list(slow_seen): + if qid not in queries_dict: + final_s = (now_ms - slow_seen[qid]) // 1000 + append_slow_log(log_path, "finish", qid, final_s) + del slow_seen[qid] + + for qid, info in queries_dict.items(): + if not isinstance(info, dict) or qid in slow_seen: + continue + started_at = info.get("started-at") + if started_at is None: + continue + duration_s = (now_ms - started_at) // 1000 + if duration_s >= warn_after: + slow_seen[qid] = started_at + sparql = re.sub(r"\s+", " ", info["query"]).strip() + append_slow_log(log_path, "start", qid, duration_s, sparql) + + +def compact_slow_log(path: str) -> None: + """Rewrite the slow-query log with one row per qid, preserving prior runs. + + Pairs each raw 'start' with its matching 'finish' from the current run; + queries still running are written with status 'unfinished'. Pre-existing + compacted rows from previous runs are passed through unchanged. + No-op if the log is missing or has nothing to write. + """ + prior_finished = {} + prior_unfinished = {} + starts = {} + finals = {} + try: + with open(path) as f: + for line in f: + if line == SLOW_LOG_HEADER or line == COMPACTED_HEADER: + continue + parts = line.rstrip("\n").split("\t", maxsplit=4) + if len(parts) < 5: + continue + ts, kind, qid, duration_str, sparql = parts + try: + duration_s = int(duration_str) + except ValueError: + continue + if kind == "start": + starts[qid] = (ts, duration_s, sparql) + elif kind == "finish": + finals[qid] = duration_s + elif kind == "finished" and qid not in prior_finished: + prior_finished[qid] = (ts, duration_s, sparql) + elif kind == "unfinished" and qid not in prior_unfinished: + prior_unfinished[qid] = (ts, duration_s, sparql) + except FileNotFoundError: + return + if not prior_finished and not prior_unfinished and not starts: + return + with open(path, "w") as f: + f.write(COMPACTED_HEADER) + for qid, (ts, duration, sparql) in prior_finished.items(): + f.write(f"{ts}\tfinished\t{qid}\t{duration}\t{sparql}\n") + for qid, (ts, duration, sparql) in prior_unfinished.items(): + if qid in starts or qid in prior_finished: + continue + f.write(f"{ts}\tunfinished\t{qid}\t{duration}\t{sparql}\n") + for qid, (logged_at, start_duration, sparql) in starts.items(): + if qid in prior_finished: + continue + duration = finals.get(qid, start_duration) + status = "finished" if qid in finals else "unfinished" + f.write(f"{logged_at}\t{status}\t{qid}\t{duration}\t{sparql}\n") + + +def format_elapsed(seconds: float) -> str: + """Format a duration as `Xs`, `Xm Ys`, or `Xh Ym`.""" + total = int(seconds) + if total < 60: + return f"{total}s" + if total < 3600: + return f"{total // 60}m {total % 60}s" + return f"{total // 3600}h {(total % 3600) // 60}m" + + +def format_metrics_line( + active: int, + peak_concurrent: int, + slow_logged_total: int, + warn_after: float, + session_elapsed_s: float, +) -> Text: + """Render the session metrics row shown above the windowed rows.""" + threshold = int(warn_after) + elapsed = format_elapsed(session_elapsed_s) + return Text.from_markup( + f"[bold green]{'Session':<{LABEL_WIDTH}}[/] [dim]│[/dim] " + f"Active queries: [cyan]{active}[/cyan] " + f"Peak concurrent: [cyan]{peak_concurrent}[/cyan] " + f"Slow queries logged (>{threshold}s): [red]{slow_logged_total}[/red]" + f" [dim](running {elapsed})[/dim]" + ) + + +def nearest_rank_percentile(sorted_values: list[int], pct: float) -> int: + """Nearest-rank percentile of an already-sorted list of ints.""" + idx = int(pct * (len(sorted_values) - 1)) + return sorted_values[idx] + + +def format_window_line( + label: str, + window_s: int, + elapsed_s: float, + finish_events: deque, + slow_start_events: deque, +) -> Text: + """Render one windowed-metrics row (5m / 15m / 1h).""" + if elapsed_s < window_s: + return Text.from_markup( + f"[bold yellow]{label:<{LABEL_WIDTH}}[/] [dim]│[/dim] " + f"[dim]warming up...[/dim]" + ) + cutoff = time.monotonic() - window_s + durations = sorted(d for ts, d in finish_events if ts >= cutoff) + slow_logged = sum(1 for ts in slow_start_events if ts >= cutoff) + if durations: + p50 = f"{nearest_rank_percentile(durations, 0.50)}s" + p95 = f"{nearest_rank_percentile(durations, 0.95)}s" + else: + p50 = p95 = "—" + return Text.from_markup( + f"[bold yellow]{label:<{LABEL_WIDTH}}[/] [dim]│[/dim] " + f"p50 duration: [cyan]{p50}[/cyan] " + f"p95 duration: [cyan]{p95}[/cyan] " + f"Slow queries logged: [red]{slow_logged}[/red]" + ) + + +def duration_sort_key(cell) -> int: + """Parse a Duration cell into an int for sorting; "N/A" -> -1.""" + text = cell.plain if hasattr(cell, "plain") else str(cell) + try: + return int(text.rstrip("s")) + except ValueError: + return -1 + + +class MonitorApp(App): + """Textual app for the interactive monitor-queries TUI. + + Polls every `interval` seconds and renders active queries in a + DataTable. Selecting a row (Enter or double-click) shows the + pretty-printed SPARQL in a detail pane; arrow keys browse without + triggering the (slow) pretty-printer. + + The cursor re-anchors to the selected qid each refresh so the + highlight follows the row even when the duration sort reorders. + On fetch failure the last good rows stay visible while a status + caption shows the retry count; the app exits after + MAX_CONSECUTIVE_FAILURES. + """ + + BINDINGS = [ + ("q", "quit", "Quit"), + ("c", "clear_detail", "Clear SPARQL"), + ("y", "copy_sparql", "Copy SPARQL"), + ("f", "freeze", "Freeze/Unfreeze table"), + ("d", "toggle_dark", "Toggle dark mode"), + ] + # Table and detail-scroll both 1fr so they share leftover space + # evenly and the detail pane doesn't jump as the table resizes. + CSS = """ + #metrics { padding-left: 1; } + Static.window-row { padding-left: 1; } + #window-1h { margin-bottom: 1; } + DataTable { height: 1fr; } + #detail-scroll { height: 1fr; } + #detail { padding-left: 2; } + """ + + def __init__( + self, + monitor_queries_cmd: str, + interval: float, + warn_after: float, + warning_log: str, + system: str, + ) -> None: + """Initialize the app and the per-session state.""" + super().__init__() + self.monitor_queries_cmd = monitor_queries_cmd + self.interval = interval + self.warn_after = warn_after + self.warning_log = warning_log + self.system = system + # None until the first non-empty fetch: True for newer servers + # (dict info with started_at), False for older ones (plain + # SPARQL string). None also means columns aren't set up yet. + self.has_duration = None + self.failures = 0 + self.queries_dict = {} + # Identify selection by qid, not row index — indices renumber + # every tick. + self.selected_qid = None + # Cached pretty-printed SPARQL so copy keeps working after the + # query finishes server-side. + self.selected_query_text = None + self.freeze = False + # qid -> started_at for queries already logged as slow. + self.slow_seen = {} + self.peak_concurrent = 0 + # Cumulative threshold-crossings this session. Distinct from + # len(slow_seen), which is currently-active slow queries. + self.slow_logged_total = 0 + # Caps in-flight fetches at one so a slow server can't pile up + # worker threads. Reset in apply_fetch on the main thread. + self.is_fetching = False + # Monotonic so it's immune to wall-clock jumps. + self.started_at = time.monotonic() + # Per-window event log. (ts_monotonic, duration_s) for each + # observed finish; ts_monotonic for each new slow-threshold + # crossing. Pruned to MAX_WINDOW_S each fetch. + self.finish_events = deque() + self.slow_start_events = deque() + + def compose(self) -> ComposeResult: + """Build the widget tree: metrics, table, status, detail, footer.""" + yield Static("", id="metrics") + for key, _, _ in WINDOWS_S: + yield Static("", id=f"window-{key}", classes="window-row") + # cursor_type="row" so a click selects the whole row and emits + # RowSelected (default is "cell"). + yield DataTable(cursor_type="row") + yield Static("", id="status") + # Wrap detail in VerticalScroll so long SPARQL scrolls inside + # the pane instead of squeezing the table. + with VerticalScroll(id="detail-scroll"): + yield Static("", id="detail") + yield Footer() + + def on_mount(self) -> None: + """Show the hint, dispatch the first fetch, start the timers.""" + # Columns are added lazily once we've seen real data, since + # DataTable can't insert a column mid-stream and we don't know + # whether to include Duration until the first fetch. + # Two timers: a 1s repaint so durations advance smoothly, and + # a slower fetch timer at the user's interval. Decoupling them + # avoids the "two repaints clustered, then long pause" jank + # that came from doing both on a single timer. + self.theme = "textual-dark" + self.query_one("#detail", Static).update(HINT_TEXT) + self.dispatch_fetch() + self.set_interval(1.0, self.repaint_timer) + self.set_interval(self.interval, self.fetch_timer) + + def setup_columns(self) -> None: + """Add columns once `has_duration` is known. Called once.""" + table = self.query_one(DataTable) + table.add_column("Query ID", width=18, key="qid") + if self.has_duration: + # Header as Text to match the right-justified cell values. + table.add_column( + Text("Duration", justify="right"), width=8, key="duration" + ) + table.add_column("SPARQL", key="sparql") + if not self.has_duration: + self.query_one("#metrics", Static).display = False + for key, _, _ in WINDOWS_S: + self.query_one(f"#window-{key}", Static).display = False + + def repaint_timer(self) -> None: + """1s timer callback: repaint cached durations and metrics.""" + if self.freeze: + return + self.rerender_from_cache() + + def fetch_timer(self) -> None: + """Interval timer callback: dispatch a fetch worker.""" + if self.freeze: + return + self.dispatch_fetch() + + def dispatch_fetch(self) -> None: + """Start a curl worker if none is in flight. + + Only one fetch in flight at a time so a slow server can't pile + up worker threads. thread=True keeps the blocking subprocess + off the event loop; the worker calls back via call_from_thread + so widget mutation stays single-threaded. + """ + if self.is_fetching: + return + self.is_fetching = True + self.run_worker(self.fetch_in_thread, thread=True) + + def fetch_in_thread(self) -> None: + """Worker body: fetch off the event loop, hand result back.""" + queries_dict = fetch_queries(self.monitor_queries_cmd) + self.call_from_thread(self.apply_fetch, queries_dict) + + def apply_fetch(self, queries_dict: dict | None) -> None: + """Process a fetch result on the main thread.""" + self.is_fetching = False + status = self.query_one("#status", Static) + if queries_dict is None: + self.failures += 1 + if self.failures > MAX_CONSECUTIVE_FAILURES: + # exit() returns from App.run() so the try/finally in + # execute() still runs compact_slow_log. + self.exit( + message=( + f"Failed to fetch active queries more than " + f"{MAX_CONSECUTIVE_FAILURES} times." + ) + ) + return + status.update( + f"fetch failed, retrying... " + f"({self.failures}/{MAX_CONSECUTIVE_FAILURES})" + ) + return + self.failures = 0 + status.update("") + prev_queries = self.queries_dict + self.queries_dict = queries_dict + # Lock the server format and configure columns on the first + # non-empty fetch. + if self.has_duration is None and queries_dict: + self.has_duration = server_supports_duration(queries_dict) + self.setup_columns() + if self.has_duration is None: + return + if self.has_duration and len(queries_dict) > self.peak_concurrent: + self.peak_concurrent = len(queries_dict) + if self.has_duration: + # Snapshot slow_seen before detect_slow_queries so the post-call + # set diff is exactly this tick's new threshold-crossings. + slow_seen_before = set(self.slow_seen) + detect_slow_queries( + queries_dict, + self.slow_seen, + self.warn_after, + self.warning_log, + ) + new_slow = set(self.slow_seen) - slow_seen_before + self.slow_logged_total += len(new_slow) + self.record_window_events(prev_queries, queries_dict, new_slow) + + table = self.query_one(DataTable) + # Capture cursor's qid before mutation: the cursor is tracked + # by index, so removing a row above it would shift it to a + # different qid. + existing_qids = [rk.value for rk in table.rows] + cursor_row = ( + table.cursor_coordinate.row + if table.cursor_coordinate is not None + else None + ) + cursor_qid = ( + existing_qids[cursor_row] + if cursor_row is not None and 0 <= cursor_row < len(existing_qids) + else None + ) + + new_qid_set = set(queries_dict) + for qid in existing_qids: + if qid not in new_qid_set: + table.remove_row(qid) + + # Duration cells are filled by rerender_from_cache below. + existing_qid_set = set(existing_qids) + for qid, info in queries_dict.items(): + if qid in existing_qid_set: + continue + query_text = info["query"] if isinstance(info, dict) else info + sparql = re.sub(r"\s+", " ", query_text).strip() + if self.has_duration: + table.add_row(qid, "", sparql, key=qid) + else: + table.add_row(qid, sparql, key=qid) + + # Prefer the explicit selection so the highlight tracks it; + # else restore to wherever the cursor was before the diff. + target_qid = self.selected_qid or cursor_qid + if target_qid is not None: + new_qids = [rk.value for rk in table.rows] + if target_qid in new_qids: + table.move_cursor(row=new_qids.index(target_qid)) + + self.rerender_from_cache() + + def record_window_events( + self, prev_queries: dict, queries_dict: dict, new_slow: set + ) -> None: + """Append finish/slow-start events to the window deques and prune. + + A "finish" is a qid present in the previous snapshot but absent + now; duration is derived from its server-reported started-at. + """ + now_mono = time.monotonic() + now_ms = int(time.time() * 1000) + for qid in set(prev_queries) - set(queries_dict): + info = prev_queries.get(qid) + started_at = ( + info.get("started-at") if isinstance(info, dict) else None + ) + if started_at is not None: + duration_s = (now_ms - started_at) // 1000 + self.finish_events.append((now_mono, duration_s)) + for _ in new_slow: + self.slow_start_events.append(now_mono) + cutoff = now_mono - MAX_WINDOW_S + while self.finish_events and self.finish_events[0][0] < cutoff: + self.finish_events.popleft() + while self.slow_start_events and self.slow_start_events[0] < cutoff: + self.slow_start_events.popleft() + + def rerender_from_cache(self) -> None: + """Repaint durations and metrics from the cached snapshot. + + No I/O, no row add/remove. Runs every tick (so durations tick + up while a fetch is in flight) and at the tail of apply_fetch + (so a fresh snapshot is visible immediately). + """ + if not self.has_duration: + return + table = self.query_one(DataTable) + now_ms = int(time.time() * 1000) + for row_key in table.rows: + qid = row_key.value + info = self.queries_dict.get(qid) + started_at = ( + info.get("started-at") if isinstance(info, dict) else None + ) + if started_at is not None: + duration_s = (now_ms - started_at) // 1000 + if duration_s >= self.warn_after: + duration_cell = Text.from_markup( + f"[red]{duration_s}s[/red]", justify="right" + ) + else: + duration_cell = Text( + f"{duration_s}s", justify="right" + ) + else: + duration_cell = Text("N/A", justify="right") + table.update_cell(row_key, "duration", duration_cell) + table.sort("duration", key=duration_sort_key, reverse=True) + elapsed_s = time.monotonic() - self.started_at + self.query_one("#metrics", Static).update( + format_metrics_line( + active=len(self.queries_dict), + peak_concurrent=self.peak_concurrent, + slow_logged_total=self.slow_logged_total, + warn_after=self.warn_after, + session_elapsed_s=elapsed_s, + ) + ) + for key, label, window_s in WINDOWS_S: + self.query_one(f"#window-{key}", Static).update( + format_window_line( + label=label, + window_s=window_s, + elapsed_s=elapsed_s, + finish_events=self.finish_events, + slow_start_events=self.slow_start_events, + ) + ) + + def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: + """Pretty-print and show the SPARQL of the selected row. + + Fires on Enter or click; arrow-key navigation does not fire + this, by design — pretty_printed_query shells out to + docker/podman and blocks for hundreds of ms. + """ + qid = event.row_key.value if event.row_key else None + if qid is None or qid == self.selected_qid: + return + self.selected_qid = qid + info = self.queries_dict.get(qid) + if info is None: + return + query_text = info["query"] if isinstance(info, dict) else info + pretty = pretty_printed_query(query_text, True, self.system) + self.selected_query_text = pretty + self.query_one("#detail", Static).update( + self.render_detail(qid, pretty) + ) + + def render_detail(self, qid: str, pretty: str) -> Group: + """Render the detail pane: bold qid plus highlighted SPARQL.""" + is_dark = "light" not in self.theme + syntax_theme = "monokai" if is_dark else "default" + syntax = Syntax( + pretty, + "sparql", + theme=syntax_theme, + word_wrap=True, + ) + return Group( + Text(f"Server Query ID: {qid}", style="bold"), Text(""), syntax + ) + + def action_freeze(self) -> None: + """Pause or resume the periodic table refresh.""" + self.freeze = not self.freeze + status = self.query_one("#status", Static) + status.update("paused — press 'f' to resume\n" if self.freeze else "") + + def action_toggle_dark(self) -> None: + """Flip between textual-dark and textual-light themes.""" + self.theme = ( + "textual-light" if self.theme == "textual-dark" else "textual-dark" + ) + if ( + self.selected_qid is not None + and self.selected_query_text is not None + ): + self.query_one("#detail", Static).update( + self.render_detail(self.selected_qid, self.selected_query_text) + ) + + def action_clear_detail(self) -> None: + """Clear the detail pane and restore the hint.""" + self.selected_qid = None + self.selected_query_text = None + self.query_one("#detail", Static).update(HINT_TEXT) + + def action_copy_sparql(self) -> None: + """Copy the selected query's pretty-printed SPARQL to the clipboard.""" + if self.selected_qid is None or self.selected_query_text is None: + self.notify("No query selected", severity="warning") + return + ok = copy_text(self.selected_query_text) + msg = "Copied" if ok else "Copy failed" + self.notify(msg) From c52bcaf646623868a65eac5d2023cb8bbff8f330 Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Tue, 5 May 2026 17:24:45 +0200 Subject: [PATCH 10/11] Fix the first render of the table and metrics header, and truncate the qid with ellipsis --- src/qlever/monitor_queries_app.py | 78 ++++++++++++++++++++----------- 1 file changed, 51 insertions(+), 27 deletions(-) diff --git a/src/qlever/monitor_queries_app.py b/src/qlever/monitor_queries_app.py index 5cd1a772..40fa82ec 100644 --- a/src/qlever/monitor_queries_app.py +++ b/src/qlever/monitor_queries_app.py @@ -274,6 +274,16 @@ def format_window_line( ) +QID_COL_WIDTH = 14 + + +def truncate_qid(qid: str) -> str: + """Truncate a qid to fit the Query ID column, with a trailing ellipsis.""" + if len(qid) <= QID_COL_WIDTH: + return qid + return qid[: QID_COL_WIDTH - 1] + "…" + + def duration_sort_key(cell) -> int: """Parse a Duration cell into an int for sorting; "N/A" -> -1.""" text = cell.plain if hasattr(cell, "plain") else str(cell) @@ -387,6 +397,14 @@ def on_mount(self) -> None: # that came from doing both on a single timer. self.theme = "textual-dark" self.query_one("#detail", Static).update(HINT_TEXT) + # Paint a placeholder so the user sees something while the + # first fetch (which can take up to --max-time) is in flight. + self.query_one("#metrics", Static).update( + Text.from_markup( + f"[bold green]{'Session':<{LABEL_WIDTH}}[/] [dim]│[/dim] " + f"[dim]waiting for first fetch...[/dim]" + ) + ) self.dispatch_fetch() self.set_interval(1.0, self.repaint_timer) self.set_interval(self.interval, self.fetch_timer) @@ -394,7 +412,7 @@ def on_mount(self) -> None: def setup_columns(self) -> None: """Add columns once `has_duration` is known. Called once.""" table = self.query_one(DataTable) - table.add_column("Query ID", width=18, key="qid") + table.add_column("Query ID", width=QID_COL_WIDTH, key="qid") if self.has_duration: # Header as Text to match the right-justified cell values. table.add_column( @@ -467,6 +485,9 @@ def apply_fetch(self, queries_dict: dict | None) -> None: self.has_duration = server_supports_duration(queries_dict) self.setup_columns() if self.has_duration is None: + # Empty server, format not yet locked in. Paint metrics so + # the placeholder is replaced with real zeros. + self.rerender_from_cache() return if self.has_duration and len(queries_dict) > self.peak_concurrent: self.peak_concurrent = len(queries_dict) @@ -512,10 +533,11 @@ def apply_fetch(self, queries_dict: dict | None) -> None: continue query_text = info["query"] if isinstance(info, dict) else info sparql = re.sub(r"\s+", " ", query_text).strip() + display_qid = truncate_qid(qid) if self.has_duration: - table.add_row(qid, "", sparql, key=qid) + table.add_row(display_qid, "", sparql, key=qid) else: - table.add_row(qid, sparql, key=qid) + table.add_row(display_qid, sparql, key=qid) # Prefer the explicit selection so the highlight tracks it; # else restore to wherever the cursor was before the diff. @@ -558,32 +580,34 @@ def rerender_from_cache(self) -> None: No I/O, no row add/remove. Runs every tick (so durations tick up while a fetch is in flight) and at the tail of apply_fetch - (so a fresh snapshot is visible immediately). + (so a fresh snapshot is visible immediately). Metrics paint + unconditionally so an empty server still shows real zeros + instead of the "waiting for first fetch" placeholder; the + old-server path hides those rows via setup_columns. """ - if not self.has_duration: - return - table = self.query_one(DataTable) - now_ms = int(time.time() * 1000) - for row_key in table.rows: - qid = row_key.value - info = self.queries_dict.get(qid) - started_at = ( - info.get("started-at") if isinstance(info, dict) else None - ) - if started_at is not None: - duration_s = (now_ms - started_at) // 1000 - if duration_s >= self.warn_after: - duration_cell = Text.from_markup( - f"[red]{duration_s}s[/red]", justify="right" - ) + if self.has_duration: + table = self.query_one(DataTable) + now_ms = int(time.time() * 1000) + for row_key in table.rows: + qid = row_key.value + info = self.queries_dict.get(qid) + started_at = ( + info.get("started-at") if isinstance(info, dict) else None + ) + if started_at is not None: + duration_s = (now_ms - started_at) // 1000 + if duration_s >= self.warn_after: + duration_cell = Text.from_markup( + f"[red]{duration_s}s[/red]", justify="right" + ) + else: + duration_cell = Text( + f"{duration_s}s", justify="right" + ) else: - duration_cell = Text( - f"{duration_s}s", justify="right" - ) - else: - duration_cell = Text("N/A", justify="right") - table.update_cell(row_key, "duration", duration_cell) - table.sort("duration", key=duration_sort_key, reverse=True) + duration_cell = Text("N/A", justify="right") + table.update_cell(row_key, "duration", duration_cell) + table.sort("duration", key=duration_sort_key, reverse=True) elapsed_s = time.monotonic() - self.started_at self.query_one("#metrics", Static).update( format_metrics_line( From 2c91b7c26db3fee753b70b2953b7f2bdd01cdf00 Mon Sep 17 00:00:00 2001 From: tanmay-9 Date: Wed, 6 May 2026 12:18:46 +0200 Subject: [PATCH 11/11] Move pretty printing to its own thread --- src/qlever/monitor_queries_app.py | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/src/qlever/monitor_queries_app.py b/src/qlever/monitor_queries_app.py index 40fa82ec..477a96c2 100644 --- a/src/qlever/monitor_queries_app.py +++ b/src/qlever/monitor_queries_app.py @@ -630,21 +630,46 @@ def rerender_from_cache(self) -> None: ) def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: - """Pretty-print and show the SPARQL of the selected row. + """Pretty-print (off-thread) and show the SPARQL of the selected row. Fires on Enter or click; arrow-key navigation does not fire this, by design — pretty_printed_query shells out to - docker/podman and blocks for hundreds of ms. + docker/podman and blocks for hundreds of ms, so it runs in a + worker thread and the detail pane shows a placeholder while + it's in flight. """ qid = event.row_key.value if event.row_key else None if qid is None or qid == self.selected_qid: return self.selected_qid = qid + self.selected_query_text = None info = self.queries_dict.get(qid) if info is None: return query_text = info["query"] if isinstance(info, dict) else info + self.query_one("#detail", Static).update( + Group( + Text(f"Server Query ID: {qid}", style="bold"), + Text(""), + Text("Pretty-printing...", style="dim italic"), + ) + ) + self.run_worker( + lambda: self.pretty_print_in_thread(qid, query_text), thread=True + ) + + def pretty_print_in_thread(self, qid: str, query_text: str) -> None: + """Worker body: pretty-print off the event loop, hand result back.""" pretty = pretty_printed_query(query_text, True, self.system) + self.call_from_thread(self.apply_pretty, qid, pretty) + + def apply_pretty(self, qid: str, pretty: str) -> None: + """Install pretty-printed SPARQL if the selection hasn't moved on.""" + # Discard stale results: if the user has already selected a + # different row, the in-flight worker's output would overwrite + # the newer selection. + if qid != self.selected_qid: + return self.selected_query_text = pretty self.query_one("#detail", Static).update( self.render_detail(qid, pretty)