Skip to content

feat: add periodized training program and tools#104

Merged
QueryPlanner merged 2 commits into
mainfrom
feat/periodized-training-program
Jun 7, 2026
Merged

feat: add periodized training program and tools#104
QueryPlanner merged 2 commits into
mainfrom
feat/periodized-training-program

Conversation

@QueryPlanner
Copy link
Copy Markdown
Owner

What

Add database models, storage schemas, helper logic, and 7 new Agent tools to support a highly customizable, rotating, periodized 14-day concurrent training program incorporating resistance targets, conditioning (Zone 2, VO2, cane), and macro test-metrics (1RMs).

Why

This provides a comprehensive training companion system capable of tracking progression, managing scheduled deloads, and recommending dynamic exercise swaps (such as swapping active lifts for conditioning or active recovery when the system detects muscle strain), while maintaining full backward-compatibility with simpler, legacy weekly splits.

How

  • Expanded schema with training_programs, training_program_days, training_program_state, and training_metrics tables
  • Handled schema migrations with an idempotent _ensure_columns routine on workout_sessions to add metadata tracking (program_id, cycle_day, session_type, etc.) and multi-modal metrics
  • Created 7 new tools in src/blacki/workouts/tools.py: set_training_program, get_todays_training, log_training, advance_training_cycle, get_training_history, get_training_metrics, and update_training_metrics
  • Registered and exported all 7 tools globally in src/blacki/registry.py and src/blacki/workouts/__init__.py
  • Upgraded agent prompts in src/blacki/prompt.py to prioritize the new periodized training workflow and support multi-modal metric details
  • Added a full integration and unit test suite in tests/workouts/test_training.py

Tests

  • Run uv run ruff check to confirm clean linting
  • Run uv run ruff format --check to confirm clean formatting
  • Run uv run mypy . to ensure 100% static type checking success
  • Run uv run pytest --cov=src to verify perfect coverage (100.00% codebase-wide statement and branch coverage)

- Add training program, day, state, and metric database tables
- Implement 7 new training tools for logging, tracking, and cycles
- Register training tools and integrate prompt workout guidance
- Add comprehensive unit and integration tests with 100% coverage
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a rotating, multi-modal training program feature to the workout tracking system, adding new database tables and columns to support structured cycle days, session types, completion statuses, and metrics. It registers seven new tools, updates the system prompt, and adds comprehensive unit tests. The review feedback focuses on enhancing robustness and performance, specifically recommending wrapping the entire read-modify-write sequence in advance_training_state inside the storage lock to prevent race conditions, adding defensive type-checking for LLM-provided inputs (such as validating that exercises, sets, and metrics are dictionaries), and optimizing database queries in get_training_history and get_latest_training_metrics to avoid N+1 query issues and excessive memory usage.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread src/blacki/workouts/storage.py Outdated
Comment on lines +662 to +685
program = await self.get_active_training_program(user_id)
if program is None or program.state is None or program.id is None:
return None

current_day = program.state.current_cycle_day
current_week = program.state.current_mesocycle_week
for _ in range(days):
if current_day % 7 == 0:
current_week += 1
if current_week > program.deload_week_interval:
current_week = 1
current_day = (
current_day + 1 if current_day < program.cycle_length_days else 1
)

async with self._lock:
await self._conn.execute(
"""
UPDATE training_program_state
SET current_cycle_day = ?, current_mesocycle_week = ?, updated_at = ?
WHERE user_id = ?
""",
(current_day, current_week, updated_at, user_id),
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The advance_training_state method performs a read-modify-write operation on the user's training program state. However, the active training program is fetched outside of the self._lock block. If multiple requests attempt to advance the training state concurrently, they could read the same initial state, perform the calculations, and overwrite each other's updates, leading to a race condition.

To prevent this, the entire block—including fetching the active training program—should be wrapped inside the async with self._lock: block.

        async with self._lock:
            program = await self.get_active_training_program(user_id)
            if program is None or program.state is None or program.id is None:
                return None

            current_day = program.state.current_cycle_day
            current_week = program.state.current_mesocycle_week
            for _ in range(days):
                if current_day % 7 == 0:
                    current_week += 1
                    if current_week > program.deload_week_interval:
                        current_week = 1
                current_day = (
                    current_day + 1 if current_day < program.cycle_length_days else 1
                )

            await self._conn.execute(
                """
                UPDATE training_program_state
                SET current_cycle_day = ?, current_mesocycle_week = ?, updated_at = ?
                WHERE user_id = ?
                """,
                (current_day, current_week, updated_at, user_id),
            )

Comment on lines 34 to +36
for i, ex_dict in enumerate(exercises):
if "name" not in ex_dict or "sets" not in ex_dict:
return {
"status": "error",
"message": "Each exercise must have 'name' and 'sets' keys",
}
return [], "Each exercise must have 'name' and 'sets' keys"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Since this tool is invoked by an LLM agent, the input structure of exercises cannot be fully trusted. If exercises contains elements that are not dictionaries (e.g., a list of strings like ["squat"]), checking "name" not in ex_dict will succeed but ex_dict["sets"] will raise a TypeError and crash the tool. We should defensively validate that each ex_dict is indeed a dictionary.

Suggested change
for i, ex_dict in enumerate(exercises):
if "name" not in ex_dict or "sets" not in ex_dict:
return {
"status": "error",
"message": "Each exercise must have 'name' and 'sets' keys",
}
return [], "Each exercise must have 'name' and 'sets' keys"
for i, ex_dict in enumerate(exercises):
if not isinstance(ex_dict, dict):
return [], "Each exercise must be a dictionary"
if "name" not in ex_dict or "sets" not in ex_dict:

Comment on lines 53 to +57
for set_dict in sets_list:
if "weight_kg" not in set_dict and "weight" not in set_dict:
return {
"status": "error",
"message": "Each set must have 'weight_kg' (or 'weight')",
}
if "reps" not in set_dict: # pragma: no cover
return {
"status": "error",
"message": "Each set must have 'reps'",
}
return [], "Each set must have 'weight_kg' (or 'weight')"
if "reps" not in set_dict:
return [], "Each set must have 'reps'"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similarly, if sets_list contains elements that are not dictionaries (e.g., if the LLM passes a list of integers like [10, 10, 10]), iterating over them and calling "weight_kg" not in set_dict or set_dict.get(...) will raise a TypeError or AttributeError and crash the tool. We should defensively validate that each set_dict is a dictionary.

Suggested change
for set_dict in sets_list:
if "weight_kg" not in set_dict and "weight" not in set_dict:
return {
"status": "error",
"message": "Each set must have 'weight_kg' (or 'weight')",
}
if "reps" not in set_dict: # pragma: no cover
return {
"status": "error",
"message": "Each set must have 'reps'",
}
return [], "Each set must have 'weight_kg' (or 'weight')"
if "reps" not in set_dict:
return [], "Each set must have 'reps'"
sets: list[SetDetail] = []
for set_dict in sets_list:
if not isinstance(set_dict, dict):
return [], "Each set must be a dictionary"
if "weight_kg" not in set_dict and "weight" not in set_dict:
return [], "Each set must have 'weight_kg' (or 'weight')"
if "reps" not in set_dict:

Comment on lines +728 to +734
rows = await self._fetch_all(query, tuple(values))
sessions = []
for row in rows:
session = await self.get_session(int(row["id"]), user_id)
if session is not None:
sessions.append(session)
return sessions
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The get_training_history method currently fetches the matching session rows, and then for each row, calls self.get_session(int(row["id"]), user_id). This introduces two inefficiencies:

  1. Redundant Session Fetching: It queries SELECT * FROM workout_sessions again for each session, even though the initial query already fetched all session columns (s.*).
  2. N+1 Query Problem: It executes a separate query to fetch exercises for each individual session sequentially.

We can optimize this by directly converting the fetched rows to WorkoutSession objects using self._row_to_session(row), and then fetching all exercises for all matched sessions in a single batch query using session_id IN (...). This reduces the database roundtrips from 2N + 1 to exactly 2 queries.

        rows = await self._fetch_all(query, tuple(values))
        sessions = []
        if not rows:
            return sessions

        for row in rows:
            sessions.append(self._row_to_session(row))

        session_ids = [s.id for s in sessions if s.id is not None]
        if session_ids:
            placeholders = ", ".join("?" for _ in session_ids)
            ex_rows = await self._fetch_all(
                f"""
                SELECT * FROM workout_exercises
                WHERE session_id IN ({placeholders})
                ORDER BY session_id, exercise_order ASC, id ASC
                """,
                tuple(session_ids),
            )
            exercises_by_session = {}
            for r in ex_rows:
                sid = int(r["session_id"])
                exercises_by_session.setdefault(sid, []).append(self._row_to_exercise(r))
            for s in sessions:
                if s.id is not None:
                    s.exercises = exercises_by_session.get(s.id, [])
        return sessions

Comment thread src/blacki/workouts/storage.py Outdated
Comment on lines +772 to +785
rows = await self._fetch_all(
f"""
SELECT * FROM training_metrics
{where}
ORDER BY metric_name ASC, recorded_at DESC, id DESC
""", # noqa: S608
tuple(values),
)
latest_by_name: dict[str, TrainingMetric] = {}
for row in rows:
name = row["metric_name"]
if name not in latest_by_name:
latest_by_name[name] = self._row_to_training_metric(row)
return list(latest_by_name.values())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The get_latest_training_metrics method currently fetches all historical metric rows for the user from the database and filters them in Python memory to keep only the latest row per metric name. If a user has a large history of recorded metrics, this will load a significant amount of unnecessary data into memory.

We can optimize this by using a window function (ROW_NUMBER() OVER (PARTITION BY metric_name ORDER BY recorded_at DESC, id DESC)) directly in the SQL query. This allows SQLite to perform the filtering and only return the single latest row per metric name, drastically reducing memory usage and database transfer overhead.

Suggested change
rows = await self._fetch_all(
f"""
SELECT * FROM training_metrics
{where}
ORDER BY metric_name ASC, recorded_at DESC, id DESC
""", # noqa: S608
tuple(values),
)
latest_by_name: dict[str, TrainingMetric] = {}
for row in rows:
name = row["metric_name"]
if name not in latest_by_name:
latest_by_name[name] = self._row_to_training_metric(row)
return list(latest_by_name.values())
where_clause = "WHERE user_id = ?"
if metric_names:
placeholders = ", ".join("?" for _ in metric_names)
where_clause = f"{where_clause} AND metric_name IN ({placeholders})"
query = f"""
SELECT id, user_id, metric_name, value, unit, recorded_at, notes
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY metric_name
ORDER BY recorded_at DESC, id DESC
) as rn
FROM training_metrics
{where_clause}
)
WHERE rn = 1
ORDER BY metric_name ASC
"""
rows = await self._fetch_all(query, tuple(values))
return [self._row_to_training_metric(row) for row in rows]

Comment on lines +847 to +850
metrics_data = row.get("metrics") or "{}"
metrics = (
json.loads(metrics_data) if isinstance(metrics_data, str) else metrics_data
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If the metrics column in the database contains a JSON null or is otherwise malformed, json.loads might return None or a non-dictionary type. Since WorkoutSession.metrics is expected to be a dictionary, we should defensively ensure that metrics is always a dictionary to prevent AttributeError or type mismatches when accessing it later in the application (e.g., in get_todays_training).

        metrics_data = row.get("metrics") or "{}"
        metrics = (
            json.loads(metrics_data) if isinstance(metrics_data, str) else metrics_data
        )
        if not isinstance(metrics, dict):
            metrics = {}

Comment on lines +96 to +97
if not metrics:
return [], "metrics cannot be empty"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If the LLM passes a non-dictionary object (such as a list) for the metrics parameter, calling metrics.items() will raise an AttributeError and crash the tool. We should defensively validate that metrics is a dictionary before processing it.

Suggested change
if not metrics:
return [], "metrics cannot be empty"
if not isinstance(metrics, dict):
return [], "metrics must be a dictionary"
if not metrics:

- Acquire storage lock in advance_training_state to prevent races
- Add defensive type checking for LLM-provided inputs in tools
- Batch load exercises in get_training_history to prevent N+1 queries
- Optimize get_latest_training_metrics using SQLite window functions
- Update unit tests to achieve 100% statement and branch coverage
@QueryPlanner QueryPlanner merged commit 905e0e2 into main Jun 7, 2026
1 check passed
@QueryPlanner QueryPlanner deleted the feat/periodized-training-program branch June 7, 2026 08:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant