Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,9 @@ async def lifespan(app: FastAPI):
"""
app.state.main_logger = main_logger
app.state.main_logger.info("Application starting up.")
if str(os.environ.get("ICEFABRIC_DEPLOY_ENV")).lower() in ["t", "test", "p", "prod", "production"]:
# Override the deploy env. Allows for specifying the env when running a docker container
deploy_env = os.environ["ICEFABRIC_DEPLOY_ENV"].lower()
load_creds(deploy_env)
else:
deploy_env = args.deploy_env
load_creds(deploy_env)
deploy_env = os.environ.get("ICEFABRIC_DEPLOY_ENV") or os.environ.get("ENVIRONMENT") or args.deploy_env
deploy_env = deploy_env.lower()
load_creds(deploy_env)
if args.cache_catalog == "sql":
app.state.main_logger.info("Building local SQL cache...")
build_cache(set(args.cached_namespaces), deploy_env)
Expand Down
6 changes: 4 additions & 2 deletions app/routers/streamflow_observations/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
from werkzeug.utils import secure_filename

from icefabric.cli.streamflow import (
BUCKET,
PREFIX,
TIME_FORMATS,
NoResultsFoundError,
get_bucket,
streamflow_observations,
)
from icefabric.schemas.hydrofabric import StreamflowOutputFormats
Expand Down Expand Up @@ -95,7 +95,9 @@ def integrate_time_range(sd: str, ed: str, filename_parts: list, command_args: l
def get_data_and_repo_hist():
"""Get repo/data from icechunk for a given data source"""
try:
storage_config = icechunk.s3_storage(bucket=BUCKET, prefix=PREFIX, region="us-east-1", from_env=True)
storage_config = icechunk.s3_storage(
bucket=get_bucket(), prefix=PREFIX, region="us-east-1", from_env=True
)
repo = icechunk.Repository.open(storage_config)
session = repo.writable_session("main")
ds = xr.open_zarr(session.store, consolidated=False)
Expand Down
11 changes: 1 addition & 10 deletions app/streamlit/streamlit.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,7 @@
pass

# Check environment variable override
if str(os.environ.get("ICEFABRIC_DEPLOY_ENV")).lower() in [
"t",
"test",
"p",
"prod",
"production",
"l",
"local",
]:
deploy_env = os.environ["ICEFABRIC_DEPLOY_ENV"].lower()
deploy_env = os.environ.get("ICEFABRIC_DEPLOY_ENV") or os.environ.get("ENVIRONMENT") or deploy_env

# Set catalog type based on deploy environment
catalog_type = "sql" if deploy_env in ["l", "local"] else "glue"
Expand Down
7 changes: 2 additions & 5 deletions src/icefabric/cache/cache_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ def build_cache(namespaces: set, deploy_env: str):
"""
namespaces = parse_namespaces(namespaces)

if str(os.environ.get("ICEFABRIC_DEPLOY_ENV")).lower() in ["t", "test", "p", "prod", "production"]:
# Override the deploy env. Allows for specifying the env when running a docker container
load_creds(os.environ["ICEFABRIC_DEPLOY_ENV"].lower())
else:
load_creds(deploy_env)
deploy_env = os.environ.get("ICEFABRIC_DEPLOY_ENV") or os.environ.get("ENVIRONMENT") or deploy_env
load_creds(deploy_env.lower())

# Creates the local dir for the warehouse if it does not exist
with open(os.environ["PYICEBERG_HOME"]) as f:
Expand Down
13 changes: 11 additions & 2 deletions src/icefabric/cli/streamflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Contains all click CLI code for accessing hourly streamflow data"""

import os
from pathlib import Path

import click
Expand All @@ -12,8 +13,14 @@

load_dotenv()

BUCKET = "edfs-data"
PREFIX = "streamflow_observations/hourly_streamflow_observations"


def get_bucket():
    """Look up the target S3 bucket name.

    Reads ``CATALOG_S3_BUCKET`` from the environment so the bucket can be
    switched per deployment (e.g. test vs prod); falls back to the historical
    default ``edfs-data`` when the variable is unset.
    """
    bucket = os.environ.get("CATALOG_S3_BUCKET")
    if bucket is None:
        bucket = "edfs-data"
    return bucket


TIME_FORMATS = [
"%Y",
"%Y-%m",
Expand Down Expand Up @@ -45,7 +52,9 @@ def validate_file_extension(ctx, param, value):
def get_dataset():
"""Get repo/data from icechunk for a given data source"""
try:
storage_config = icechunk.s3_storage(bucket=BUCKET, prefix=PREFIX, region="us-east-1", from_env=True)
storage_config = icechunk.s3_storage(
bucket=get_bucket(), prefix=PREFIX, region="us-east-1", from_env=True
)
repo = icechunk.Repository.open(storage_config)
session = repo.writable_session("main")
ds = xr.open_zarr(session.store, consolidated=False)
Expand Down
2 changes: 1 addition & 1 deletion src/icefabric/helpers/creds.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def load_creds(deploy: str | None = "test"):
if deploy.lower() in ["t", "test"]:
load_dotenv(dotenv_path=here() / ".env", override=True)
os.environ["CATALOG_S3_BUCKET"] = "edfs-data"
elif deploy.lower() in ["p", "prod", "production"]:
elif deploy.lower() in ["p", "prod", "production", "oe"]:
load_dotenv(dotenv_path=here() / ".prod.env", override=True)
os.environ["CATALOG_S3_BUCKET"] = "iceberg-data-oe"
pyiceberg_file = here() / ".pyiceberg.yaml"
Expand Down
74 changes: 74 additions & 0 deletions tests/smoke/test_streamflow_observations_smoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Smoke tests for the streamflow_observations endpoint against a deployed API."""

import os

import pytest
import requests

# Base URL of the deployed API under test; when unset, the whole module is
# skipped (see pytestmark below) so these smoke tests never run accidentally.
API_BASE_URL = os.environ.get("API_BASE_URL")

# Module-level mark: skip every test in this file unless a target API is configured.
pytestmark = pytest.mark.skipif(not API_BASE_URL, reason="API_BASE_URL environment variable not set")

# Station expected to exist in the deployed dataset — looks like a USGS gauge
# id; NOTE(review): confirm this identifier stays present in every environment.
VALID_STATION_ID = "01031500"


def test_available():
    """Verify the available endpoint returns a list of station IDs."""
    resp = requests.get(
        f"{API_BASE_URL}/v1/streamflow_observations/available?limit=10",
        timeout=120,
    )
    assert resp.status_code == 200, f"Request failed: {resp.status_code} - {resp.text}"

    payload = resp.json()
    # The endpoint reports both the full count and the page actually returned.
    assert "identifiers" in payload
    assert "total_identifiers" in payload
    assert payload["identifiers"], "No identifiers returned"
    assert payload["showing"] <= 10


def test_station_info():
    """Verify the info endpoint returns metadata for a known station."""
    resp = requests.get(
        f"{API_BASE_URL}/v1/streamflow_observations/{VALID_STATION_ID}/info",
        timeout=120,
    )
    assert resp.status_code == 200, f"Request failed: {resp.status_code} - {resp.text}"

    info = resp.json()
    # Echoed identifier plus a non-empty record count and a bounded date range.
    assert info["identifier"] == VALID_STATION_ID
    assert info["total_records"] > 0
    assert "date_range" in info
    assert info["date_range"]["start"] is not None
    assert info["date_range"]["end"] is not None


def test_history():
    """Verify the history endpoint returns repo snapshots."""
    resp = requests.get(
        f"{API_BASE_URL}/v1/streamflow_observations/history",
        timeout=120,
    )
    assert resp.status_code == 200, f"Request failed: {resp.status_code} - {resp.text}"

    history = resp.json()
    # At least one snapshot must exist alongside the latest-snapshot pointer.
    assert "latest_snapshot" in history
    assert "snapshots" in history
    assert history["snapshots"]


def test_csv_download():
    """Verify CSV download works for a small time range."""
    # Keep the window to a single day so the smoke test stays fast.
    endpoint = (
        f"{API_BASE_URL}/v1/streamflow_observations/{VALID_STATION_ID}/csv"
        f"?start_date=2023-01-01&end_date=2023-01-02"
    )

    resp = requests.get(endpoint, timeout=120)
    assert resp.status_code == 200, f"Request failed: {resp.status_code} - {resp.text}"
    assert resp.content, "CSV response body is empty"


def test_invalid_station_returns_404():
    """Verify that a nonexistent station ID returns 404."""
    resp = requests.get(
        f"{API_BASE_URL}/v1/streamflow_observations/ZZZZZZZZZZ/info",
        timeout=120,
    )
    # The API should signal a missing station with a plain 404, not a 500.
    assert resp.status_code == 404
Loading