diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml index 102a286..bd576a4 100644 --- a/.github/workflows/docker-build.yml +++ b/.github/workflows/docker-build.yml @@ -17,6 +17,13 @@ jobs: - name: Checkout code uses: actions/checkout@v4 + - name: Prepare environment file for CI + run: | + if [ ! -f installer/.env ]; then + cp installer/.env.example installer/.env + echo "Created .env file from .env.example" + fi + - name: Validate docker-compose files run: | cd installer @@ -32,7 +39,7 @@ jobs: strategy: matrix: - service: [slackbot, lappy, startup-data-loader, file-uploader] + service: [slackbot, lap-detector, startup-data-loader, file-uploader] steps: - name: Checkout code @@ -52,7 +59,10 @@ jobs: - name: Extract metadata id: meta run: | - echo -e "tags=${{ env.REGISTRY }}/${IMAGE_NAME_LOWER}/${{ matrix.service }}:latest\n${{ env.REGISTRY }}/${IMAGE_NAME_LOWER}/${{ matrix.service }}:${{ github.sha }}" >> $GITHUB_OUTPUT + echo "tags<> $GITHUB_OUTPUT + echo "${{ env.REGISTRY }}/${IMAGE_NAME_LOWER}/${{ matrix.service }}:latest" >> $GITHUB_OUTPUT + echo "${{ env.REGISTRY }}/${IMAGE_NAME_LOWER}/${{ matrix.service }}:${{ github.sha }}" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT - name: Build and push Docker image uses: docker/build-push-action@v5 diff --git a/.github/workflows/stack-smoke-test.yml b/.github/workflows/stack-smoke-test.yml index e9e18c3..2dee02d 100644 --- a/.github/workflows/stack-smoke-test.yml +++ b/.github/workflows/stack-smoke-test.yml @@ -29,8 +29,17 @@ jobs: - name: Ensure CI script is executable run: chmod +x ./dev-utils/ci/stack-smoke-test.sh + - name: Prepare environment file for CI + run: | + if [ ! -f installer/.env ]; then + cp installer/.env.example installer/.env + echo "Created .env file from .env.example" + fi + ls -la installer/.env + - name: Run installer smoke test env: + CI: "true" DOCKER_BUILDKIT: "1" COMPOSE_DOCKER_CLI_BUILD: "1" run: ./dev-utils/ci/stack-smoke-test.sh \ No newline at end of file diff --git a/.gitignore b/.gitignore index b751da6..4fa4e92 100644 --- a/.gitignore +++ b/.gitignore @@ -203,7 +203,13 @@ installer/*.dbc !installer/example.dbc installer/slackbot/logs/* + +installer/sandbox/prompt-guide.txt + +wfr-telemetry +/installer/data-downloader/data installer/slackbot/*.png +!installer/slackbot/lappy_test_image.png installer/slackbot/*.jpg installer/slackbot/*.jpeg diff --git a/installer/data-downloader/README.md b/installer/data-downloader/README.md index 1446e09..ca336b3 100644 --- a/installer/data-downloader/README.md +++ b/installer/data-downloader/README.md @@ -21,6 +21,51 @@ Both JSON files are shared through the `./data` directory so every service (fron 3. Open http://localhost:3000 to access the web UI, and keep the API running on http://localhost:8000 if you want to call it directly. ## Runtime behaviour +```mermaid +sequenceDiagram + participant Worker as periodic_worker.py + participant Service as DataDownloaderService + participant Scanner as server_scanner.py + participant Slicks as slicks library + participant InfluxDB as InfluxDB3 + participant Storage as JSON Storage + + Worker->>Service: run_full_scan(source="periodic") + Service->>Service: Sort seasons by year (newest first) + + loop For each season (WFR25, WFR26) + Service->>Scanner: scan_runs(ScannerConfig{
database: season.database,
year: season.year}) + Scanner->>Slicks: connect_influxdb3(url, token, db) + Scanner->>Slicks: scan_data_availability(start, end, table, bin_size) + + loop Adaptive scanning (inside slicks) + Slicks->>InfluxDB: Try query_grouped_bins()
(DATE_BIN + COUNT(*)) + alt Success + InfluxDB-->>Slicks: Return bins with counts + else Failure (timeout/size) + Slicks->>Slicks: Binary subdivision + Slicks->>InfluxDB: query_exists_per_bin()
(SELECT 1 LIMIT 1 per bin) + InfluxDB-->>Slicks: Return existence flags + end + end + + Slicks-->>Scanner: ScanResult (windows) + Scanner-->>Service: List[dict] (formatted runs) + + Service->>Service: fetch_unique_sensors(season.database) + Service->>Storage: runs_repos[season.name].merge_scanned_runs(runs) + Storage-->>Storage: Atomic write to runs_WFR25.json + Service->>Storage: sensors_repos[season.name].write_sensors(sensors) + Storage-->>Storage: Atomic write to sensors_WFR25.json + + alt Season scan failed + Service->>Service: Log error, continue to next season + end + end + + Service->>Storage: status_repo.mark_finish(success) + Storage-->>Storage: Update scanner_status.json +``` - `frontend` serves the compiled React bundle via nginx and now proxies `/api` requests (including `/api/scan` and `/api/scanner-status`) directly to the FastAPI container. When the UI is loaded from anything other than `localhost`, the client automatically falls back to relative `/api/...` calls so a single origin on a VPS still reaches the backend. Override `VITE_API_BASE_URL` if you want the UI to talk to a different host (for example when running `npm run dev` locally) and keep that host in `ALLOWED_ORIGINS`. - `api` runs `uvicorn backend.app:app`, exposing diff --git a/installer/data-downloader/backend/app.py b/installer/data-downloader/backend/app.py index 09e47c7..b4b4dea 100644 --- a/installer/data-downloader/backend/app.py +++ b/installer/data-downloader/backend/app.py @@ -4,6 +4,7 @@ from fastapi import BackgroundTasks, FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import HTMLResponse from pydantic import BaseModel from backend.config import get_settings @@ -40,14 +41,19 @@ def healthcheck() -> dict: return {"status": "ok"} +@app.get("/api/seasons") +def list_seasons() -> List[dict]: + return service.get_seasons() + + @app.get("/api/runs") -def list_runs() -> dict: - return service.get_runs() +def list_runs(season: str | None = None) -> dict: + return service.get_runs(season=season) @app.get("/api/sensors") -def list_sensors() -> dict: - return service.get_sensors() +def list_sensors(season: str | None = None) -> dict: + return service.get_sensors(season=season) @app.get("/api/scanner-status") @@ -56,10 +62,10 @@ def scanner_status() -> dict: @app.post("/api/runs/{key}/note") -def save_note(key: str, payload: NotePayload) -> dict: - run = service.update_note(key, payload.note.strip()) +def save_note(key: str, payload: NotePayload, season: str | None = None) -> dict: + run = service.update_note(key, payload.note.strip(), season=season) if not run: - raise HTTPException(status_code=404, detail=f"Run {key} not found") + raise HTTPException(status_code=404, detail=f"Run {key} not found (season={season})") return run @@ -69,7 +75,77 @@ def trigger_scan(background_tasks: BackgroundTasks) -> dict: return {"status": "scheduled"} -@app.post("/api/data/query") -def query_data(payload: DataQueryPayload) -> dict: +@app.post("/api/query") +def query_signal(payload: DataQueryPayload, season: str | None = None) -> dict: limit = None if payload.no_limit else (payload.limit or 2000) - return service.query_signal_series(payload.signal, payload.start, payload.end, limit) + return service.query_signal_series( + payload.signal, + payload.start, + payload.end, + limit, + season=season + ) + + +@app.get("/", response_class=HTMLResponse) +def index(): + """Simple status page for debugging.""" + influx_status = "Unknown" + influx_color = "gray" + try: + service._log_influx_connectivity() + influx_status = "Connected" + influx_color = "green" + except Exception as e: + influx_status = f"Error: {e}" + influx_color = "red" + + # Default to first season for overview + runs = service.get_runs() + sensors = service.get_sensors() + scanner_status = service.get_scanner_status() + seasons_list = service.get_seasons() + seasons_html = ", ".join([f"{s['name']} ({s['year']})" for s in seasons_list]) + + html = f""" + + + + DAQ Data Downloader Status + + + +

DAQ Data Downloader Status

+ +
+

System Status

+

InfluxDB Connection: {influx_status}

+

Scanner Status: {scanner_status.get('status', 'Unknown')} (Last run: {scanner_status.get('last_run', 'Never')})

+

API Version: 1.1.0 (Multi-Season Support)

+
+ +
+

Active Config

+

Seasons Configured: {seasons_html}

+
+ +
+

Default Season Stats ({seasons_list[0]['name'] if seasons_list else 'None'})

+ +
+ +

API Docs | JSON Seasons List | Frontend

+ + + """ + return HTMLResponse(content=html) diff --git a/installer/data-downloader/backend/config.py b/installer/data-downloader/backend/config.py index 3a39205..653f70e 100644 --- a/installer/data-downloader/backend/config.py +++ b/installer/data-downloader/backend/config.py @@ -12,6 +12,52 @@ def _parse_origins(raw: str | None) -> List[str]: return [origin.strip() for origin in raw.split(",") if origin.strip()] +class SeasonConfig(BaseModel): + name: str # e.g. "WFR25" + year: int # e.g. 2025 + database: str # e.g. "WFR25" + color: str | None = None # e.g. "222 76 153" + + +def _parse_seasons(raw: str | None) -> List[SeasonConfig]: + """Parse SEASONS env var: "WFR25:2025:222 76 153,WFR26:2026:...".""" + if not raw: + # Default fallback if not set + return [SeasonConfig(name="WFR25", year=2025, database="WFR25", color="#DE4C99")] + + seasons = [] + for part in raw.split(","): + part = part.strip() + if not part: + continue + try: + # Split into at most 3 parts: Name, Year, Color + parts = part.split(":", 2) + name = parts[0] + + if len(parts) >= 2: + year = int(parts[1]) + else: + # Malformed or simple format not supported purely by regex? + # Actually if just "WFR25", split gives ['WFR25'] + # require at least year + continue + + color = parts[2] if len(parts) > 2 else None + + # Assume DB name matches Season Name + seasons.append(SeasonConfig(name=name, year=year, database=name, color=color)) + except ValueError: + continue + + if not seasons: + return [SeasonConfig(name="WFR25", year=2025, database="WFR25")] + + # Sort by year descending (newest first) + seasons.sort(key=lambda s: s.year, reverse=True) + return seasons + + class Settings(BaseModel): """Centralised configuration pulled from environment variables.""" @@ -19,12 +65,15 @@ class Settings(BaseModel): influx_host: str = Field(default_factory=lambda: os.getenv("INFLUX_HOST", "http://localhost:9000")) influx_token: str = Field(default_factory=lambda: os.getenv("INFLUX_TOKEN", "")) - influx_database: str = Field(default_factory=lambda: os.getenv("INFLUX_DATABASE", "WFR25")) + + # Global/Default Influx settings (used for connectivity check or default fallback) influx_schema: str = Field(default_factory=lambda: os.getenv("INFLUX_SCHEMA", "iox")) influx_table: str = Field(default_factory=lambda: os.getenv("INFLUX_TABLE", "WFR25")) - scanner_year: int = Field(default_factory=lambda: int(os.getenv("SCANNER_YEAR", "2025"))) - scanner_bin: str = Field(default_factory=lambda: os.getenv("SCANNER_BIN", "hour")) # hour or day + seasons: List[SeasonConfig] = Field(default_factory=lambda: _parse_seasons(os.getenv("SEASONS"))) + + # Scanner settings common to all seasons (unless we want per-season granularity later) + scanner_bin: str = Field(default_factory=lambda: os.getenv("SCANNER_BIN", "hour")) scanner_include_counts: bool = Field(default_factory=lambda: os.getenv("SCANNER_INCLUDE_COUNTS", "true").lower() == "true") scanner_initial_chunk_days: int = Field(default_factory=lambda: int(os.getenv("SCANNER_INITIAL_CHUNK_DAYS", "31"))) @@ -32,6 +81,7 @@ class Settings(BaseModel): sensor_lookback_days: int = Field(default_factory=lambda: int(os.getenv("SENSOR_LOOKBACK_DAYS", "30"))) periodic_interval_seconds: int = Field(default_factory=lambda: int(os.getenv("SCAN_INTERVAL_SECONDS", "3600"))) + scan_daily_time: str | None = Field(default_factory=lambda: os.getenv("SCAN_DAILY_TIME")) allowed_origins: List[str] = Field(default_factory=lambda: _parse_origins(os.getenv("ALLOWED_ORIGINS", "*"))) diff --git a/installer/data-downloader/backend/influx_queries.py b/installer/data-downloader/backend/influx_queries.py index 18991f9..95a4369 100644 --- a/installer/data-downloader/backend/influx_queries.py +++ b/installer/data-downloader/backend/influx_queries.py @@ -14,7 +14,14 @@ def _normalize(dt: datetime) -> datetime: return dt.astimezone(timezone.utc) -def fetch_signal_series(settings: Settings, signal: str, start: datetime, end: datetime, limit: int | None) -> dict: +def fetch_signal_series( + settings: Settings, + signal: str, + start: datetime, + end: datetime, + limit: int | None, + database: str | None = None +) -> dict: start_dt = _normalize(start) end_dt = _normalize(end) if start_dt >= end_dt: @@ -35,8 +42,11 @@ def fetch_signal_series(settings: Settings, signal: str, start: datetime, end: d AND time <= TIMESTAMP '{end_dt.isoformat()}' ORDER BY time{limit_clause} """ + + # Use provided database or fallback to default setting + target_db = database if database else settings.influx_database - with InfluxDBClient3(host=settings.influx_host, token=settings.influx_token, database=settings.influx_database) as client: + with InfluxDBClient3(host=settings.influx_host, token=settings.influx_token, database=target_db) as client: tbl = client.query(sql) points = [] for idx in range(tbl.num_rows): @@ -56,6 +66,7 @@ def fetch_signal_series(settings: Settings, signal: str, start: datetime, end: d "start": start_dt.isoformat(), "end": end_dt.isoformat(), "limit": limit, + "database": target_db, "row_count": len(points), "points": points, "sql": " ".join(line.strip() for line in sql.strip().splitlines()), diff --git a/installer/data-downloader/backend/periodic_worker.py b/installer/data-downloader/backend/periodic_worker.py index 78b839b..b2e18a5 100644 --- a/installer/data-downloader/backend/periodic_worker.py +++ b/installer/data-downloader/backend/periodic_worker.py @@ -2,6 +2,7 @@ import asyncio import logging +from datetime import datetime, timedelta from backend.config import get_settings from backend.services import DataDownloaderService @@ -12,16 +13,40 @@ async def run_worker(): settings = get_settings() service = DataDownloaderService(settings) + interval = max(30, settings.periodic_interval_seconds) - logging.info("Starting periodic scanner loop (interval=%ss)", interval) + daily_time = settings.scan_daily_time + + if daily_time: + logging.info(f"Starting periodic scanner loop (daily at {daily_time})") + else: + logging.info(f"Starting periodic scanner loop (interval={interval}s)") + while True: try: logging.info("Running scheduled scan...") service.run_full_scan(source="periodic") logging.info("Finished scheduled scan.") + + if daily_time: + # Calculate seconds until next occurrence of daily_time + now = datetime.now() + target_hour, target_minute = map(int, daily_time.split(":")) + target = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0) + + if target <= now: + # If target time has passed today, schedule for tomorrow + target += timedelta(days=1) + + sleep_seconds = (target - now).total_seconds() + logging.info(f"Next scan scheduled for {target} (in {sleep_seconds:.0f}s)") + await asyncio.sleep(sleep_seconds) + else: + await asyncio.sleep(interval) + except Exception: - logging.exception("Scheduled scan failed") - await asyncio.sleep(interval) + logging.exception("Scheduled scan failed. Retrying in 60s...") + await asyncio.sleep(60) if __name__ == "__main__": diff --git a/installer/data-downloader/backend/requirements.txt b/installer/data-downloader/backend/requirements.txt index 3ab2d76..6f2deab 100644 --- a/installer/data-downloader/backend/requirements.txt +++ b/installer/data-downloader/backend/requirements.txt @@ -2,3 +2,4 @@ fastapi==0.115.4 uvicorn[standard]==0.23.2 influxdb3-python==0.16.0 pydantic==2.9.2 +slicks>=0.1.5 diff --git a/installer/data-downloader/backend/server_scanner.py b/installer/data-downloader/backend/server_scanner.py index 9e8b59d..4326027 100644 --- a/installer/data-downloader/backend/server_scanner.py +++ b/installer/data-downloader/backend/server_scanner.py @@ -1,14 +1,20 @@ +"""Thin wrapper that delegates run scanning to the *slicks* package. + +The public API (``ScannerConfig`` + ``scan_runs``) is unchanged so the +rest of the backend continues to work without modification. +""" + from __future__ import annotations from dataclasses import dataclass -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone from hashlib import md5 -from typing import Iterable, List, Sequence, Tuple +from typing import List -from influxdb_client_3 import InfluxDBClient3 from zoneinfo import ZoneInfo -from backend.table_utils import quote_table +import slicks +from slicks.scanner import scan_data_availability UTC = timezone.utc @@ -31,167 +37,73 @@ def tz(self) -> ZoneInfo: @property def start(self) -> datetime: - return datetime(self.year, 1, 1, tzinfo=UTC) + # Season starts in August of the previous year + return datetime(self.year - 1, 8, 1, tzinfo=UTC) @property def end(self) -> datetime: + # Season ends at the end of the configured year (Jan 1 of year + 1) return datetime(self.year + 1, 1, 1, tzinfo=UTC) - @property - def interval(self) -> str: - return "1 day" if self.bin_size == "day" else "1 hour" - - @property - def step(self) -> timedelta: - return timedelta(days=1) if self.bin_size == "day" else timedelta(hours=1) - @property - def table_ref(self) -> str: - return quote_table(self.table) +def _build_key(start_dt_utc: datetime, end_dt_utc: datetime) -> str: + raw = f"{start_dt_utc.isoformat()}_{end_dt_utc.isoformat()}" + return md5(raw.encode()).hexdigest()[:10] def scan_runs(config: ScannerConfig) -> List[dict]: - """Run the adaptive scan and return formatted windows.""" - bins = list(fetch_bins_adaptive(config)) - windows = compress_bins(bins, config.step) - return format_windows(windows, config) - - -def fetch_bins_adaptive(config: ScannerConfig) -> Iterable[Tuple[datetime, int]]: - """Iterate over bucket start times with counts.""" - - # Fixed origin so DATE_BIN alignment is consistent across all chunks/splits - bin_origin = config.start.isoformat() - - def query_grouped_bins(client: InfluxDBClient3, t0: datetime, t1: datetime) -> Sequence[Tuple[datetime, int]]: - sql = f""" - SELECT - DATE_BIN(INTERVAL '{config.interval}', time, TIMESTAMP '{bin_origin}') AS bucket, - COUNT(*) AS n - FROM {config.table_ref} - WHERE time >= TIMESTAMP '{t0.isoformat()}' - AND time < TIMESTAMP '{t1.isoformat()}' - GROUP BY bucket - HAVING COUNT(*) > 0 - ORDER BY bucket - """ - tbl = client.query(sql) - rows: List[Tuple[datetime, int]] = [] - for i in range(tbl.num_rows): - bucket = tbl.column("bucket")[i].as_py() - n = tbl.column("n")[i].as_py() - if bucket.tzinfo is None: - bucket = bucket.replace(tzinfo=UTC) - else: - bucket = bucket.astimezone(UTC) - rows.append((bucket, int(n))) - return rows - - def query_exists_per_bin(client: InfluxDBClient3, t0: datetime, t1: datetime) -> List[Tuple[datetime, int]]: - cur = t0 - rows: List[Tuple[datetime, int]] = [] - while cur < t1: - nxt = min(cur + config.step, t1) - sql = f""" - SELECT 1 - FROM {config.table_ref} - WHERE time >= TIMESTAMP '{cur.isoformat()}' - AND time < TIMESTAMP '{nxt.isoformat()}' - LIMIT 1 - """ - try: - tbl = client.query(sql) - if tbl.num_rows > 0: - rows.append((cur, 1)) - except Exception: - pass - cur = nxt - return rows - - def process_range(client: InfluxDBClient3, t0: datetime, t1: datetime, chunk_days: float): - min_exists_span = config.step * 4 - if (t1 - t0) <= min_exists_span: - for pair in query_exists_per_bin(client, t0, t1): - yield pair - return - try: - for pair in query_grouped_bins(client, t0, t1): - yield pair - return - except Exception: - mid = t0 + (t1 - t0) / 2 - if mid <= t0 or mid >= t1: - for pair in query_exists_per_bin(client, t0, t1): - yield pair - return - for pair in process_range(client, t0, mid, chunk_days / 2): - yield pair - for pair in process_range(client, mid, t1, chunk_days / 2): - yield pair - - with InfluxDBClient3(host=config.host, token=config.token, database=config.database) as client: - cur = config.start - while cur < config.end: - nxt = min(cur + timedelta(days=config.initial_chunk_days), config.end) - for pair in process_range(client, cur, nxt, config.initial_chunk_days): - yield pair - cur = nxt - - -def compress_bins(pairs: Sequence[Tuple[datetime, int]], step: timedelta) -> List[Tuple[datetime, datetime, int, int]]: - """Merge consecutive buckets into contiguous windows.""" - sorted_pairs = sorted(pairs, key=lambda row: row[0]) - windows: List[Tuple[datetime, datetime, int, int]] = [] - cur_start = cur_end = None - bins_in = rows_in = 0 - - for bucket_start, n in sorted_pairs: - if cur_start is None: - cur_start = bucket_start - cur_end = bucket_start + step - bins_in = 1 - rows_in = n - continue - if bucket_start == cur_end: - cur_end += step - bins_in += 1 - rows_in += n - else: - windows.append((cur_start, cur_end, bins_in, rows_in)) - cur_start = bucket_start - cur_end = bucket_start + step - bins_in = 1 - rows_in = n - - if cur_start is not None: - windows.append((cur_start, cur_end, bins_in, rows_in)) - return windows - - -def format_windows(windows: Sequence[Tuple[datetime, datetime, int, int]], config: ScannerConfig) -> List[dict]: - tz = config.tz - formatted = [] - for start_utc, end_utc, bins_cnt, rows_cnt in windows: - start_local = start_utc.astimezone(tz) - end_local = end_utc.astimezone(tz) - entry = { - "key": build_key(start_utc, end_utc), - "start_utc": start_utc.isoformat(), - "end_utc": end_utc.isoformat(), - "start_local": start_local.isoformat(), - "end_local": end_local.isoformat(), - "timezone": config.timezone_name, - "bins": bins_cnt, - } - if config.include_counts: - entry["row_count"] = rows_cnt - formatted.append(entry) - return formatted + """Run the adaptive scan via *slicks* and return formatted windows.""" + + # Configure slicks to point at the same InfluxDB instance + # config.table comes in as "schema.table" from services.py + schema, table_name = "iox", config.table + if "." in config.table: + parts = config.table.split(".", 1) + schema, table_name = parts[0], parts[1] + + slicks.connect_influxdb3( + url=config.host, + token=config.token, + db=config.database, + schema=schema, + table=table_name, + ) + # Determine the table string slicks expects ("schema.table") + # We pass None to use the global configured table we just set above + # Or we can just pass table_name if scan_data_availability expects a name? + # scan_data_availability expects "schema.table" or defaults to config. + # Let's rely on the global config we just set. + table = None + + result = scan_data_availability( + start=config.start, + end=config.end, + timezone=config.timezone_name, + table=table, + bin_size=config.bin_size, + include_counts=config.include_counts, + show_progress=False, + ) -def build_key(start_dt_utc: datetime, end_dt_utc: datetime) -> str: - raw = f"{start_dt_utc.isoformat()}_{end_dt_utc.isoformat()}" - return md5(raw.encode()).hexdigest()[:10] + # Convert ScanResult → List[dict] matching the old format + formatted: List[dict] = [] + for _day, windows in result: + for w in windows: + entry = { + "key": _build_key(w.start_utc, w.end_utc), + "start_utc": w.start_utc.isoformat(), + "end_utc": w.end_utc.isoformat(), + "start_local": w.start_local.isoformat(), + "end_local": w.end_local.isoformat(), + "timezone": config.timezone_name, + "bins": w.bins, + } + if config.include_counts: + entry["row_count"] = w.row_count + formatted.append(entry) + + return formatted if __name__ == "__main__": # pragma: no cover diff --git a/installer/data-downloader/backend/services.py b/installer/data-downloader/backend/services.py index a7d252e..ab485cc 100644 --- a/installer/data-downloader/backend/services.py +++ b/installer/data-downloader/backend/services.py @@ -36,86 +36,154 @@ def __init__(self, settings: Settings): self.settings = settings data_dir = Path(settings.data_dir).resolve() data_dir.mkdir(parents=True, exist_ok=True) - self.runs_repo = RunsRepository(data_dir) - self.sensors_repo = SensorsRepository(data_dir) + + # Repositories keyed by season name (e.g. "WFR25") + self.runs_repos: Dict[str, RunsRepository] = {} + self.sensors_repos: Dict[str, SensorsRepository] = {} + + for season in settings.seasons: + # Suffix file with season name: runs_WFR25.json + self.runs_repos[season.name] = RunsRepository(data_dir, suffix=season.name) + self.sensors_repos[season.name] = SensorsRepository(data_dir, suffix=season.name) + self.status_repo = ScannerStatusRepository(data_dir) self._log_influx_connectivity() - def get_runs(self) -> dict: - return self.runs_repo.list_runs() - - def get_sensors(self) -> dict: - return self.sensors_repo.list_sensors() - - def update_note(self, key: str, note: str) -> dict | None: - return self.runs_repo.update_note(key, note) + def get_runs(self, season: str | None = None) -> dict: + target_season = season or self._default_season() + repo = self.runs_repos.get(target_season) + if not repo: + return {"runs": [], "error": f"Season {target_season} not found"} + return repo.list_runs() + + def get_sensors(self, season: str | None = None) -> dict: + target_season = season or self._default_season() + repo = self.sensors_repos.get(target_season) + if not repo: + return {"sensors": [], "error": f"Season {target_season} not found"} + return repo.list_sensors() + + def update_note(self, key: str, note: str, season: str | None = None) -> dict | None: + target_season = season or self._default_season() + repo = self.runs_repos.get(target_season) + if not repo: + return None + return repo.update_note(key, note) def get_scanner_status(self) -> dict: return self.status_repo.get_status() + + def get_seasons(self) -> List[dict]: + """Return list of available seasons.""" + return [ + {"name": s.name, "year": s.year, "database": s.database, "color": s.color} + for s in self.settings.seasons + ] def run_full_scan(self, source: str = "manual") -> Dict[str, dict]: self.status_repo.mark_start(source) + results = {} + errors = [] + try: - runs = scan_runs( - ScannerConfig( - host=self.settings.influx_host, - token=self.settings.influx_token, - database=self.settings.influx_database, - table=f"{self.settings.influx_schema}.{self.settings.influx_table}", - year=self.settings.scanner_year, - bin_size=self.settings.scanner_bin, - include_counts=self.settings.scanner_include_counts, - initial_chunk_days=self.settings.scanner_initial_chunk_days, - ) - ) - fallback_start, fallback_end = self._build_sensor_fallback_range(runs) - runs_payload = self.runs_repo.merge_scanned_runs(runs) - - sensors = fetch_unique_sensors( - SensorQueryConfig( - host=self.settings.influx_host, - token=self.settings.influx_token, - database=self.settings.influx_database, - schema=self.settings.influx_schema, - table=self.settings.influx_table, - window_days=self.settings.sensor_window_days, - lookback_days=self.settings.sensor_lookback_days, - fallback_start=fallback_start, - fallback_end=fallback_end, - ) - ) - sensors_payload = self.sensors_repo.write_sensors(sensors) - - self.status_repo.mark_finish(success=True) - - return { - "runs": runs_payload, - "sensors": sensors_payload, - } + # Sort seasons by year descending to ensure most recent is scanned first + sorted_seasons = sorted(self.settings.seasons, key=lambda s: s.year, reverse=True) + for season in sorted_seasons: + try: + logger.info(f"Scanning season {season.name} (DB: {season.database})...") + + runs = scan_runs( + ScannerConfig( + host=self.settings.influx_host, + token=self.settings.influx_token, + database=season.database, + table=f"{self.settings.influx_schema}.{self.settings.influx_table}", + year=season.year, + bin_size=self.settings.scanner_bin, + include_counts=self.settings.scanner_include_counts, + initial_chunk_days=self.settings.scanner_initial_chunk_days, + ) + ) + + repo_runs = self.runs_repos[season.name] + runs_payload = repo_runs.merge_scanned_runs(runs) + + fallback_start, fallback_end = self._build_sensor_fallback_range(runs) + + sensors = fetch_unique_sensors( + SensorQueryConfig( + host=self.settings.influx_host, + token=self.settings.influx_token, + database=season.database, + schema=self.settings.influx_schema, + table=self.settings.influx_table, + window_days=self.settings.sensor_window_days, + lookback_days=self.settings.sensor_lookback_days, + fallback_start=fallback_start, + fallback_end=fallback_end, + ) + ) + repo_sensors = self.sensors_repos[season.name] + sensors_payload = repo_sensors.write_sensors(sensors) + + results[season.name] = { + "runs": len(runs_payload.get("runs", [])), + "sensors": len(sensors_payload.get("sensors", [])) + } + + except Exception as e: + logger.exception(f"Failed to scan season {season.name}") + errors.append(f"{season.name}: {str(e)}") + # Continue scanning other seasons even if one fails + + if errors: + self.status_repo.mark_finish(success=False, error="; ".join(errors)) + else: + self.status_repo.mark_finish(success=True) + + return results + except Exception as exc: self.status_repo.mark_finish(success=False, error=str(exc)) raise - def query_signal_series(self, signal: str, start: datetime, end: datetime, limit: Optional[int]) -> dict: - return fetch_signal_series(self.settings, signal, start, end, limit) + def query_signal_series(self, signal: str, start: datetime, end: datetime, limit: Optional[int], season: str | None = None) -> dict: + target_season_name = season or self._default_season() + season_cfg = next((s for s in self.settings.seasons if s.name == target_season_name), None) + + if not season_cfg: + raise ValueError(f"Season {target_season_name} not configured") + + # Temporarily override settings with season database for the query + # This is a bit hacky but avoids refactoring fetch_signal_series signature deeper + # Ideally fetch_signal_series should take db name argument + + # Actually fetch_signal_series takes 'settings' object. + # We can construct a proxy or just rely on the existing signature if we modify it. + # But modify backend/influx_queries.py is safer. + # For now, let's assume fetch_signal_series uses settings.influx_database. + # We need to pass the correct DB. + + return fetch_signal_series(self.settings, signal, start, end, limit, database=season_cfg.database) + + def _default_season(self) -> str: + # Default to the first (newest) season if available + if self.settings.seasons: + return self.settings.seasons[0].name + return "WFR25" def _log_influx_connectivity(self) -> None: + # Check connectivity for the default season + season = self.settings.seasons[0] if self.settings.seasons else None + if not season: + return + host = self.settings.influx_host - database = self.settings.influx_database - table = f"{self.settings.influx_schema}.{self.settings.influx_table}" + database = season.database try: - logger.info( - "Checking InfluxDB connectivity (host=%s, database=%s, table=%s)", - host, - database, - table, - ) + logger.info("Checking InfluxDB connectivity (%s -> %s)", host, database) with InfluxDBClient3(host=host, token=self.settings.influx_token, database=database) as client: - ping_fn = getattr(client, "ping", None) - if callable(ping_fn): - ping_fn() - else: - client.query("SELECT 1") + getattr(client, "ping", lambda: client.query("SELECT 1"))() logger.info("InfluxDB connectivity OK") except Exception: logger.exception("InfluxDB connectivity check failed") diff --git a/installer/data-downloader/backend/sql.py b/installer/data-downloader/backend/sql.py index 66cd6e9..d529e5f 100644 --- a/installer/data-downloader/backend/sql.py +++ b/installer/data-downloader/backend/sql.py @@ -1,12 +1,18 @@ +"""Thin wrapper that delegates sensor discovery to the *slicks* package. + +The public API (``SensorQueryConfig`` + ``fetch_unique_sensors``) is +unchanged so ``services.py`` works without modification. +""" + from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from typing import List, Set +from typing import List -from influxdb_client_3 import InfluxDBClient3 +import slicks +from slicks.discovery import discover_sensors -from backend.table_utils import quote_table UTC = timezone.utc @@ -23,52 +29,41 @@ class SensorQueryConfig: fallback_start: datetime | None = None fallback_end: datetime | None = None - @property - def table_ref(self) -> str: - identifier = f"{self.schema}.{self.table}" if self.schema else self.table - return quote_table(identifier) - def fetch_unique_sensors(config: SensorQueryConfig) -> List[str]: - """Collect distinct signal names by scanning the recent history.""" + """Collect distinct signal names by scanning recent history via *slicks*.""" + + # Configure slicks to point at the same InfluxDB instance + slicks.connect_influxdb3( + url=config.host, + token=config.token, + db=config.database, + schema=config.schema, + table=config.table, + ) + end = datetime.now(UTC) start = end - timedelta(days=config.lookback_days) - unique: Set[str] = set() - with InfluxDBClient3(host=config.host, token=config.token, database=config.database) as client: - unique.update( - _collect_range(client, config, start, end) + # slicks 0.1.3 generates invalid SQL for InfluxDB 3 if passed timezone-aware datetimes + # because it appends 'Z' to an ISO string that already has an offset. + # Passing naive UTC datetimes works around this. + sensors = discover_sensors( + start_time=start.replace(tzinfo=None), + end_time=end.replace(tzinfo=None), + chunk_size_days=config.window_days, + show_progress=False, + ) + + if not sensors and config.fallback_start and config.fallback_end: + sensors = discover_sensors( + start_time=config.fallback_start.replace(tzinfo=None), + end_time=config.fallback_end.replace(tzinfo=None), + chunk_size_days=config.window_days, + show_progress=False, ) - if not unique and config.fallback_start and config.fallback_end: - unique.update( - _collect_range(client, config, config.fallback_start, config.fallback_end) - ) - return sorted(unique) - - -def _collect_range(client: InfluxDBClient3, config: SensorQueryConfig, start: datetime, end: datetime) -> Set[str]: - cur = start - found: Set[str] = set() - while cur < end: - nxt = min(cur + timedelta(days=config.window_days), end) - sql = f""" - SELECT DISTINCT "signalName" - FROM {config.table_ref} - WHERE time >= TIMESTAMP '{cur.isoformat()}' - AND time < TIMESTAMP '{nxt.isoformat()}' - LIMIT 5000 - """ - try: - tbl = client.query(sql) - if tbl.num_rows == 0: - cur = nxt - continue - for row in tbl.column("signalName"): - found.add(row.as_py()) - except Exception: - pass - cur = nxt - return found + + return sensors if __name__ == "__main__": # pragma: no cover diff --git a/installer/data-downloader/backend/storage.py b/installer/data-downloader/backend/storage.py index 018a80d..1a103c9 100644 --- a/installer/data-downloader/backend/storage.py +++ b/installer/data-downloader/backend/storage.py @@ -43,9 +43,10 @@ def _write_file(self, payload: dict) -> None: class RunsRepository: - def __init__(self, data_dir: Path): + def __init__(self, data_dir: Path, suffix: str = ""): + filename = f"runs_{suffix}.json" if suffix else "runs.json" default = {"updated_at": None, "runs": []} - self.store = JSONStore(data_dir / "runs.json", default) + self.store = JSONStore(data_dir / filename, default) def list_runs(self) -> dict: return self.store.read() @@ -130,9 +131,10 @@ def _parse_timestamp(value: Optional[str]) -> datetime: class SensorsRepository: - def __init__(self, data_dir: Path): + def __init__(self, data_dir: Path, suffix: str = ""): + filename = f"sensors_{suffix}.json" if suffix else "sensors.json" default = {"updated_at": None, "sensors": []} - self.store = JSONStore(data_dir / "sensors.json", default) + self.store = JSONStore(data_dir / filename, default) def list_sensors(self) -> dict: return self.store.read() diff --git a/installer/data-downloader/frontend/src/App.tsx b/installer/data-downloader/frontend/src/App.tsx index 9980cf6..8a54cb9 100644 --- a/installer/data-downloader/frontend/src/App.tsx +++ b/installer/data-downloader/frontend/src/App.tsx @@ -1,6 +1,6 @@ import { useCallback, useEffect, useMemo, useRef, useState } from "react"; -import { fetchRuns, fetchSensors, fetchScannerStatus, triggerScan, updateNote } from "./api"; -import { RunRecord, RunsResponse, ScannerStatus, SensorsResponse } from "./types"; +import { fetchRuns, fetchSensors, fetchScannerStatus, triggerScan, updateNote, fetchSeasons } from "./api"; +import { RunRecord, RunsResponse, ScannerStatus, SensorsResponse, Season } from "./types"; import { RunTable } from "./components/RunTable"; import { DataDownload } from "./components/data-download"; @@ -15,6 +15,8 @@ interface DownloaderSelection { } export default function App() { + const [seasons, setSeasons] = useState([]); + const [selectedSeason, setSelectedSeason] = useState(""); // season name const [runs, setRuns] = useState(null); const [sensors, setSensors] = useState(null); const [loading, setLoading] = useState(true); @@ -31,16 +33,36 @@ export default function App() { const loadData = useCallback(async () => { try { setLoading(true); - const [runsData, sensorsData] = await Promise.all([fetchRuns(), fetchSensors()]); + + let currentSeason = selectedSeason; + + // Initial load: fetch seasons if we don't have them + if (seasons.length === 0) { + const seasonsList = await fetchSeasons(); + setSeasons(seasonsList); + if (seasonsList.length > 0 && !currentSeason) { + currentSeason = seasonsList[0].name; + setSelectedSeason(currentSeason); + } + } + + // If we still don't have a season (e.g. no seasons configured), fetch with default (undefined) + const seasonArg = currentSeason || undefined; + + const [runsData, sensorsData] = await Promise.all([ + fetchRuns(seasonArg), + fetchSensors(seasonArg) + ]); setRuns(runsData); setSensors(sensorsData); setError(null); } catch (err) { + console.error(err); setError(err instanceof Error ? err.message : "Failed to fetch data"); } finally { setLoading(false); } - }, []); + }, [selectedSeason, seasons.length]); const loadStatus = useCallback( async (syncOnFinishChange: boolean) => { @@ -110,14 +132,14 @@ export default function App() { prev ? { ...prev, scanning: false, last_result: "error", error: message } : { - scanning: false, - started_at: null, - finished_at: null, - source: null, - last_result: "error", - error: message, - updated_at: new Date().toISOString() - } + scanning: false, + started_at: null, + finished_at: null, + source: null, + last_result: "error", + error: message, + updated_at: new Date().toISOString() + } ); } finally { setTimeout(() => setScanState("idle"), 5000); @@ -137,7 +159,7 @@ export default function App() { const nextNote = noteDrafts[key] ?? runs?.runs.find((r) => r.key === key)?.note ?? ""; setSavingKey(key); try { - const updated = await updateNote(key, nextNote); + const updated = await updateNote(key, nextNote, selectedSeason); setRuns((prev) => { if (!prev) return prev; const updatedRuns = prev.runs.map((run) => (run.key === key ? updated : run)); @@ -188,13 +210,46 @@ export default function App() { ? new Date(sensors.updated_at).toLocaleString() : "never"; + const selectedSeasonColor = useMemo(() => { + return seasons.find(s => s.name === selectedSeason)?.color || "#0bf"; // Default blue + }, [seasons, selectedSeason]); + return (
-
-

DAQ Data Downloader

-

- Inspect historical scans, refresh availability, and capture run notes. -

+
+
+
+

DAQ Data Downloader

+

+ Inspect historical scans, refresh availability, and capture run notes. +

+
+ + {seasons.length > 0 && ( +
+ + +
+ )} +
{scanningActive && ( @@ -278,6 +333,7 @@ export default function App() { diff --git a/installer/data-downloader/frontend/src/api.ts b/installer/data-downloader/frontend/src/api.ts index c3b3b1b..bb585cd 100644 --- a/installer/data-downloader/frontend/src/api.ts +++ b/installer/data-downloader/frontend/src/api.ts @@ -2,6 +2,7 @@ import { RunRecord, RunsResponse, ScannerStatus, + Season, SensorDataResponse, SensorsResponse } from "./types"; @@ -33,12 +34,18 @@ async function request(path: string, init?: RequestInit): Promise { return (await response.json()) as T; } -export function fetchRuns(): Promise { - return request("/api/runs"); +export function fetchSeasons(): Promise { + return request("/api/seasons"); } -export function fetchSensors(): Promise { - return request("/api/sensors"); +export function fetchRuns(season?: string): Promise { + const query = season ? `?season=${encodeURIComponent(season)}` : ""; + return request(`/api/runs${query}`); +} + +export function fetchSensors(season?: string): Promise { + const query = season ? `?season=${encodeURIComponent(season)}` : ""; + return request(`/api/sensors${query}`); } export function fetchScannerStatus(): Promise { @@ -49,8 +56,9 @@ export function triggerScan(): Promise<{ status: string }> { return request("/api/scan", { method: "POST" }); } -export function updateNote(key: string, note: string): Promise { - return request(`/api/runs/${encodeURIComponent(key)}/note`, { +export function updateNote(key: string, note: string, season?: string): Promise { + const query = season ? `?season=${encodeURIComponent(season)}` : ""; + return request(`/api/runs/${encodeURIComponent(key)}/note${query}`, { method: "POST", body: JSON.stringify({ note }) }); @@ -64,8 +72,9 @@ export interface DataQueryPayload { no_limit?: boolean; } -export function querySensorData(payload: DataQueryPayload): Promise { - return request("/api/data/query", { +export function querySensorData(payload: DataQueryPayload, season?: string): Promise { + const query = season ? `?season=${encodeURIComponent(season)}` : ""; + return request(`/api/query${query}`, { method: "POST", body: JSON.stringify(payload) }); diff --git a/installer/data-downloader/frontend/src/components/data-download.tsx b/installer/data-downloader/frontend/src/components/data-download.tsx index 53873df..3dfed2b 100644 --- a/installer/data-downloader/frontend/src/components/data-download.tsx +++ b/installer/data-downloader/frontend/src/components/data-download.tsx @@ -18,6 +18,7 @@ interface ExternalSelection { interface Props { runs: RunRecord[]; sensors: string[]; + season?: string; externalSelection?: ExternalSelection; } @@ -60,7 +61,7 @@ const toUtcTooltip = (value: string) => { return dt.isValid ? `${dt.toFormat("yyyy-LL-dd HH:mm:ss")} UTC` : value; }; -export function DataDownload({ runs, sensors, externalSelection }: Props) { +export function DataDownload({ runs, sensors, season, externalSelection }: Props) { const [selectedRunKey, setSelectedRunKey] = useState(""); const [selectedRunTimezone, setSelectedRunTimezone] = useState(null); const [selectedSensor, setSelectedSensor] = useState(""); @@ -205,7 +206,7 @@ export function DataDownload({ runs, sensors, externalSelection }: Props) { limit: parsedLimit, no_limit: noLimit || undefined }; - const response = await querySensorData(payload); + const response = await querySensorData(payload, season); setSeries(response.points); const { points, ...meta } = response; setQueryMeta(meta); @@ -223,17 +224,17 @@ export function DataDownload({ runs, sensors, externalSelection }: Props) { series.length === 0 ? [] : [ - { - x: series.map((point) => point.time), - y: series.map((point) => point.value), - customdata: series.map((point) => toUtcTooltip(point.time)), - type: "scatter", - mode: "lines", - line: { color: "#2563eb", width: 2 }, - hovertemplate: "%{y}
%{customdata}", - name: selectedSensor || "Sensor" - } - ], + { + x: series.map((point) => point.time), + y: series.map((point) => point.value), + customdata: series.map((point) => toUtcTooltip(point.time)), + type: "scatter", + mode: "lines", + line: { color: "#2563eb", width: 2 }, + hovertemplate: "%{y}
%{customdata}", + name: selectedSensor || "Sensor" + } + ], [series, selectedSensor] ); diff --git a/installer/data-downloader/frontend/src/types.ts b/installer/data-downloader/frontend/src/types.ts index d2ce7a1..f379b2d 100644 --- a/installer/data-downloader/frontend/src/types.ts +++ b/installer/data-downloader/frontend/src/types.ts @@ -45,3 +45,10 @@ export interface ScannerStatus { error?: string | null; updated_at: string | null; } + +export interface Season { + name: string; + year: number; + database: string; + color?: string; +} diff --git a/installer/data-downloader/testing/check_slicks_env.py b/installer/data-downloader/testing/check_slicks_env.py new file mode 100644 index 0000000..a8f262e --- /dev/null +++ b/installer/data-downloader/testing/check_slicks_env.py @@ -0,0 +1,7 @@ + +import os +import slicks.config + +print(f"Env INFLUX_URL: {os.environ.get('INFLUX_URL')}") +print(f"slicks.config.INFLUX_URL: {getattr(slicks.config, 'INFLUX_URL', 'Not Set')}") +print(f"slicks.config.INFLUX_DB: {getattr(slicks.config, 'INFLUX_DB', 'Not Set')}") diff --git a/installer/data-downloader/testing/debug_influx.py b/installer/data-downloader/testing/debug_influx.py new file mode 100644 index 0000000..36ed03b --- /dev/null +++ b/installer/data-downloader/testing/debug_influx.py @@ -0,0 +1,59 @@ + +import os +import pandas as pd +from influxdb_client_3 import InfluxDBClient3 + +host = os.getenv("INFLUX_HOST", "http://influxdb3:8181") +token = os.getenv("INFLUX_TOKEN", "") +db = os.getenv("INFLUX_DATABASE", "WFR25") +schema = os.getenv("INFLUX_SCHEMA", "iox") +table_name = os.getenv("INFLUX_TABLE", "WFR25") + +full_table = f'"{schema}"."{table_name}"' + +print(f"Connecting to {host}, db={db}") +print(f"Querying table: {full_table}") + +client = InfluxDBClient3(host=host, token=token, database=db) + +# 1. Check time range +try: + sql = f'SELECT MIN(time) as min_t, MAX(time) as max_t FROM {full_table}' + print(f"Executing: {sql}") + table = client.query(sql) + print("Full Time Range:") + print(table.to_pandas()) + + # 1.5 Check June 2025 Specifically + sql_june = f""" + SELECT COUNT("time") as count + FROM {full_table} + WHERE time >= '2025-06-01T00:00:00Z' + AND time < '2025-07-01T00:00:00Z' + """ + print(f"\nChecking June 2025: {sql_june}") + table_june = client.query(sql_june) + print(table_june.to_pandas()) +except Exception as e: + print(f"Error querying time range: {e}") + +# 2. Check columns and sample data +try: + sql = f'SELECT * FROM {full_table} LIMIT 1' + print(f"Executing: {sql}") + table = client.query(sql) + print("Columns:", table.column_names) + print("Sample Data:") + print(table.to_pandas()) +except Exception as e: + print(f"Error querying sample data: {e}") + +# 3. Check distinct signalName +try: + sql = f'SELECT DISTINCT "signalName" FROM {full_table} LIMIT 10' + print(f"Executing: {sql}") + table = client.query(sql) + print("Distinct Signals:") + print(table.to_pandas()) +except Exception as e: + print(f"Error querying distinct signals: {e}") diff --git a/installer/data-downloader/testing/inspect_slicks.py b/installer/data-downloader/testing/inspect_slicks.py new file mode 100644 index 0000000..28ecf27 --- /dev/null +++ b/installer/data-downloader/testing/inspect_slicks.py @@ -0,0 +1,6 @@ + +import inspect +from slicks.scanner import scan_data_availability + +print("Source of scan_data_availability:") +print(inspect.getsource(scan_data_availability)) diff --git a/installer/data-downloader/testing/inspect_slicks_config.py b/installer/data-downloader/testing/inspect_slicks_config.py new file mode 100644 index 0000000..26e0945 --- /dev/null +++ b/installer/data-downloader/testing/inspect_slicks_config.py @@ -0,0 +1,25 @@ + +import slicks +import inspect + +# 1. Print connect_influxdb3 source +if hasattr(slicks, 'connect_influxdb3'): + print("Source of connect_influxdb3:") + print(inspect.getsource(slicks.connect_influxdb3)) + +# 2. Check config values before and after +import slicks.config +print("\nConfig BEFORE:") +print(f"INFLUX_URL: {getattr(slicks.config, 'INFLUX_URL', 'Not Set')}") +print(f"INFLUX_DB: {getattr(slicks.config, 'INFLUX_DB', 'Not Set')}") + +print("\nCalling connect_influxdb3...") +slicks.connect_influxdb3( + url="http://test-host:9999", + token="test-token", + db="test-db" +) + +print("\nConfig AFTER:") +print(f"INFLUX_URL: {getattr(slicks.config, 'INFLUX_URL', 'Not Set')}") +print(f"INFLUX_DB: {getattr(slicks.config, 'INFLUX_DB', 'Not Set')}") diff --git a/installer/data-downloader/testing/reproduce_slicks.py b/installer/data-downloader/testing/reproduce_slicks.py new file mode 100644 index 0000000..99ef5fc --- /dev/null +++ b/installer/data-downloader/testing/reproduce_slicks.py @@ -0,0 +1,30 @@ + +import os +import slicks +from slicks.discovery import discover_sensors +from datetime import datetime, timedelta, timezone + +host = os.getenv("INFLUX_HOST", "http://influxdb3:8181") +token = os.getenv("INFLUX_TOKEN", "") +db = os.getenv("INFLUX_DATABASE", "WFR25") + +print(f"Connecting to {host}, db={db}") +slicks.connect_influxdb3(url=host, token=token, db=db) + +# Range: [now-365d, now] +end = datetime.now(timezone.utc) +start = end - timedelta(days=365) + +print(f"Scanning range: {start} to {end}") + +try: + sensors = discover_sensors( + start_time=start, + end_time=end, + chunk_size_days=7, + show_progress=True, + ) + print(f"Found {len(sensors)} sensors.") + print(sensors) +except Exception as e: + print(f"Error: {e}") diff --git a/installer/data-downloader/testing/reproduce_slicks_naive.py b/installer/data-downloader/testing/reproduce_slicks_naive.py new file mode 100644 index 0000000..da4d99b --- /dev/null +++ b/installer/data-downloader/testing/reproduce_slicks_naive.py @@ -0,0 +1,30 @@ + +import os +import slicks +from slicks.discovery import discover_sensors +from datetime import datetime, timedelta + +host = os.getenv("INFLUX_HOST", "http://influxdb3:8181") +token = os.getenv("INFLUX_TOKEN", "") +db = os.getenv("INFLUX_DATABASE", "WFR25") + +print(f"Connecting to {host}, db={db}") +slicks.connect_influxdb3(url=host, token=token, db=db) + +# Naive UTC datetime (no tzinfo) +end = datetime.utcnow() +start = end - timedelta(days=365) + +print(f"Scanning range (naive): {start} to {end}") + +try: + sensors = discover_sensors( + start_time=start, + end_time=end, + chunk_size_days=7, + show_progress=False, + ) + print(f"Found {len(sensors)} sensors.") + # print(sensors) +except Exception as e: + print(f"Error: {e}") diff --git a/installer/data-downloader/testing/test_timestamp.py b/installer/data-downloader/testing/test_timestamp.py new file mode 100644 index 0000000..7634618 --- /dev/null +++ b/installer/data-downloader/testing/test_timestamp.py @@ -0,0 +1,46 @@ + +import os +from datetime import datetime, timezone +from influxdb_client_3 import InfluxDBClient3 + +host = os.getenv("INFLUX_HOST", "http://influxdb3:8181") +token = os.getenv("INFLUX_TOKEN", "") +db = os.getenv("INFLUX_DATABASE", "WFR25") + +client = InfluxDBClient3(host=host, token=token, database=db) + +# Create a UTC datetime +t0 = datetime(2025, 10, 4, tzinfo=timezone.utc) +formatted = f"{t0.isoformat()}Z" +print(f"Testing timestamp format: {formatted}") + +# "2025-10-04T00:00:00+00:00Z" <- Potential double timezone issue + +sql = f""" +SELECT DISTINCT "signalName" +FROM "iox"."WFR25" +WHERE time >= '{formatted}' +""" +print(f"Query: {sql}") + +try: + table = client.query(sql) + print("Rows found:", table.num_rows) + print(table.to_pandas()) +except Exception as e: + print(f"Error: {e}") + +# Try without the extra Z if it has offset +formatted_clean = t0.isoformat() +print(f"\nTesting cleaner format: {formatted_clean}") +sql = f""" +SELECT DISTINCT "signalName" +FROM "iox"."WFR25" +WHERE time >= '{formatted_clean}' +""" +try: + table = client.query(sql) + print("Rows found:", table.num_rows) + print(table.to_pandas()) +except Exception as e: + print(f"Error: {e}") diff --git a/installer/docker-compose.yml b/installer/docker-compose.yml index 0d65a85..3297b71 100644 --- a/installer/docker-compose.yml +++ b/installer/docker-compose.yml @@ -7,18 +7,13 @@ volumes: influxdb3-data: influxdb3-explorer-db: + services: influxdb3: image: influxdb:3.5.0-core container_name: influxdb3 command: > - influxdb3 serve - --node-id influxdb3-node - --object-store file - --data-dir /var/lib/influxdb3 - --admin-token-file /influxdb3-admin-token.json - --wal-snapshot-size 100 - --snapshotted-wal-files-to-keep 10 + influxdb3 serve --node-id influxdb3-node --object-store file --data-dir /var/lib/influxdb3 --admin-token-file /influxdb3-admin-token.json --wal-snapshot-size 100 --snapshotted-wal-files-to-keep 10 restart: unless-stopped ports: - "9000:8181" @@ -32,7 +27,7 @@ services: networks: - datalink healthcheck: - test: ["CMD-SHELL", "timeout 1 bash -c 'cat < /dev/null > /dev/tcp/localhost/8181' || exit 1"] + test: [ "CMD-SHELL", "timeout 1 bash -c 'cat < /dev/null > /dev/tcp/localhost/8181' || exit 1" ] interval: 10s timeout: 5s retries: 5 @@ -50,7 +45,7 @@ services: volumes: - influxdb3-explorer-db:/db:rw - ./influxdb3-explorer-config:/app-root/config:ro - command: ["--mode=admin"] + command: [ "--mode=admin" ] networks: - datalink depends_on: @@ -78,7 +73,6 @@ services: networks: - datalink - slackbot: build: ./slackbot container_name: slackbot @@ -126,7 +120,6 @@ services: networks: - datalink - startup-data-loader: build: ./startup-data-loader container_name: startup-data-loader @@ -152,7 +145,6 @@ services: sleep 5 && python load_data.py" - file-uploader: build: ./file-uploader container_name: file-uploader @@ -177,7 +169,6 @@ services: cpus: "1" memory: 1024M - data-downloader-api: build: context: ./data-downloader @@ -185,6 +176,12 @@ services: container_name: data-downloader-api env_file: - .env + environment: + # Ensure slicks child processes can self-configure + INFLUX_URL: ${INFLUXDB_URL:-http://influxdb3:8181} + INFLUX_TOKEN: ${INFLUXDB_ADMIN_TOKEN} + INFLUX_DB: ${INFLUX_DATABASE:-WFR25} + SEASONS: "${SEASONS:-WFR25:2025}" ports: - "8000:8000" volumes: @@ -196,7 +193,6 @@ services: influxdb3: condition: service_healthy - data-downloader-scanner: build: context: ./data-downloader @@ -204,7 +200,14 @@ services: container_name: data-downloader-scanner env_file: - .env - command: ["python", "-m", "backend.periodic_worker"] + environment: + # Ensure slicks child processes can self-configure + INFLUX_URL: ${INFLUXDB_URL:-http://influxdb3:8181} + INFLUX_TOKEN: ${INFLUXDB_ADMIN_TOKEN} + INFLUX_DB: ${INFLUX_DATABASE:-WFR25} + SEASONS: "${SEASONS:-WFR25:2025}" + SCAN_DAILY_TIME: "${SCAN_DAILY_TIME}" + command: [ "python", "-m", "backend.periodic_worker" ] volumes: - ./data-downloader/data:/app/data restart: unless-stopped diff --git a/installer/sandbox/Dockerfile b/installer/sandbox/Dockerfile index 0215e02..f77de87 100644 --- a/installer/sandbox/Dockerfile +++ b/installer/sandbox/Dockerfile @@ -8,7 +8,11 @@ RUN pip install --no-cache-dir -r requirements.txt # Copy application code COPY code_generator.py . -COPY prompt-guide.txt . +COPY prompt-guide.txt* prompt-guide.txt.example ./ +RUN if [ ! -f prompt-guide.txt ]; then \ + echo "WARNING: prompt-guide.txt not found, using prompt-guide.txt.example"; \ + cp prompt-guide.txt.example prompt-guide.txt; \ + fi # Create directory for generated code RUN mkdir -p /app/generated diff --git a/installer/slackbot/lappy_test_image.png b/installer/slackbot/lappy_test_image.png new file mode 100644 index 0000000..e863b13 Binary files /dev/null and b/installer/slackbot/lappy_test_image.png differ