29 changes: 22 additions & 7 deletions README.md
Expand Up @@ -112,16 +112,31 @@ Used when `VECTOR_DB_TYPE=qdrant`.
* `QDRANT_PORT`: Port of the Qdrant service (default: `6333`).
* `QDRANT_COLLECTION`: Name of the Qdrant collection.

## Vectorstore Visualization
Running `poetry run python visualize.py` will generate a 2d scatterplot
of the vectors.
See `visualize.ChartType` for supported charts.
## RAG Pipeline Invocation
The ZenML RAG pipeline can be invoked using the `run-pipeline` script. This script allows for running different parts of the pipeline with various configurations.

![image](scatterplot2d.png)
### Configuration
Local runs should include a `.env` file in the project root to load necessary environment variables.

### Local Execution
To run the feature engineering pipeline locally:
```shell
poetry run run-pipeline --run-feature-engineering
```

![image](scatterplot3d.png)
To run without caching:
```shell
poetry run run-pipeline --run-feature-engineering --no-cache
```

# Gemini Agent Configuration
Available options:
- `--no-cache`: Disable ZenML caching.
- `--run-feature-engineering`: Run the feature engineering pipeline.
- `--run-end-to-end-data`: Run the full data pipeline.
- ... (see `poetry run run-pipeline --help` for all options)

### Unit Tests
...

The `gemeni` directory contains the configuration files for the Gemini agent.

File renamed without changes.
16 changes: 12 additions & 4 deletions docs/Agent.md → docs/agent/Agent.md
@@ -1,5 +1,5 @@
 # System Prompt
-You are an expert Software Engineer, who specializes in `Python`, `Docker`, `zenml`, and RAG Data Pipelines.
+You are an expert Software Engineer, who specializes in `Python`, `Docker`, `mongo`, `zenml`, and RAG Data Pipelines.

You always enter plan mode.

Expand All @@ -15,9 +15,6 @@ For this project, you are working on a RAG feature pipeline, which will
- Clean data from the data lake.
- Chunk and embed in the vector store.

-# User Prompt
-Please wait for user input.

## Development Tools
### Tox
This project includes `tox`. Its purpose is
Expand Down Expand Up @@ -50,3 +47,14 @@ The service runs locally on the `babylon` docker-compose network, in which the f
- Vector DB: Name: `chroma`
- Secrets Manager: Name: `openbao`
- ZenML Orchestrator: Name: `zenml`


+# User Prompt
+Please wait for user input.
+
+## Planning
+When planning, write your plan to `<GIT_BRANCH_NAME>.md`, where `<GIT_BRANCH_NAME>` is the name of the current local git branch.
+Ask the user to validate the plan before executing.
+
+## Readme
+Always keep the `README.md` up-to-date with relevant information.
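
The planning rule in this file (write the plan to `<GIT_BRANCH_NAME>.md`) could be resolved along these lines. This is a minimal sketch, not part of the PR: `current_branch` and `plan_path` are hypothetical helper names, and the sketch assumes `git` is on the `PATH` and the working directory is inside the repository.

```python
import subprocess
from pathlib import Path


def current_branch() -> str:
    """Return the checked-out branch name (hypothetical helper)."""
    result = subprocess.run(
        ["git", "rev-parse", "--abbrev-ref", "HEAD"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


def plan_path(branch: str, root: Path = Path(".")) -> Path:
    """Map a branch name to its plan file, `<GIT_BRANCH_NAME>.md`."""
    return root / f"{branch}.md"
```

A caller would combine the two as `plan_path(current_branch())`. Branch names that contain `/` would map to a nested path, so the parent directory may need to be created first.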
File renamed without changes.
20 changes: 20 additions & 0 deletions docs/agent/run-features.md
@@ -0,0 +1,20 @@
We are updating `tools/run.py` so that `features` generation can be invoked.

## Python Application Configuration Settings
- Python Application Configuration settings should exist in `.env` for local runs.

## ZenML Pipeline Infrastructure Configuration
- ZenML infrastructure configuration exists in `./zenml/pipeline/config`.

### ZenML Docker Image
- The pipeline configuration should point to the image created by `Dockerfile`. For now it's only a locally built image.

## Task
Your task is to ensure that `poetry run run.py` correctly invokes `generate_features` via ZenML.
Note that some implementation details are still in progress. Create mock data where appropriate.

## Validation
To validate your changes, build a `tests/test_run.py` pytest suite.

## Open Questions
Write any open questions to a file `QUESTIONS.md`. Wait for the user to respond before executing.
File renamed without changes.
8 changes: 6 additions & 2 deletions features/pipeline/generate_features.py
Expand Up @@ -7,9 +7,13 @@


 @pipeline
-def generate_features(wait_for: str | list[str] | None = None) -> list[str]:
+def generate_features(
+    wait_for: str | list[str] | None = None,
+    mock: bool = False,
+    **kwargs,
+) -> list[str]:
     """Entry point to the zenml pipeline for generating Babylon features."""
-    raw_documents = fg_steps.query_data_lake(after=wait_for)
+    raw_documents = fg_steps.query_data_lake(after=wait_for, mock=mock)
     clean_documents = fg_steps.clean_documents(raw_documents)

     # Load cleaned docs to vector db.
9 changes: 8 additions & 1 deletion features/steps/feature_generation/query_data_lake.py
Expand Up @@ -10,11 +10,18 @@
 @step
 def query_data_lake(
     transaction_descriptions: list[str] | None = None,
+    mock: bool = False,
 ) -> Annotated[list, "raw_documents"]:
     """Entry point to query the datalake for new documents.

-    :param transaction_descriptions: Optional list of
+    :param transaction_descriptions: Optional list of transaction descriptions.
+    :param mock: Whether to return mock data.
     """
+    if mock:
+        return [
+            {"id": "1", "content": "Sample transaction 1", "metadata": {"source": "mock"}},
+            {"id": "2", "content": "Sample transaction 2", "metadata": {"source": "mock"}},
+        ]
     documents: list = []
     # todo
     return documents
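
The effect of the new `mock` flag can be checked without ZenML. The sketch below is a plain-Python stand-in for the step: `query_data_lake_sketch` is a hypothetical name, the `@step` decorator is deliberately omitted, and the non-mock branch returns an empty list because the real query is still a TODO in the source.

```python
def query_data_lake_sketch(mock: bool = False) -> list[dict]:
    """Plain-Python stand-in for the `query_data_lake` step (no ZenML)."""
    if mock:
        # Same two sample records as the mock branch in the diff.
        return [
            {"id": "1", "content": "Sample transaction 1", "metadata": {"source": "mock"}},
            {"id": "2", "content": "Sample transaction 2", "metadata": {"source": "mock"}},
        ]
    # The real data-lake query is not implemented yet in the source.
    return []


mock_docs = query_data_lake_sketch(mock=True)
real_docs = query_data_lake_sketch()
```

With `mock=True` the downstream cleaning and embedding steps receive two small, well-formed documents, which is enough to exercise the rest of the pipeline locally.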
Empty file added features/tools/__init__.py
Empty file.
175 changes: 175 additions & 0 deletions features/tools/run.py
@@ -0,0 +1,175 @@
"""
Driver script for invoking the Zenml RAG pipeline.
This script will be updated accordingly as more steps are built.
"""
from datetime import datetime as dt
from pathlib import Path

import click
import dotenv
from loguru import logger

from features.config import config

from features.pipeline import (
    generate_features,
)

_HELP_PROMPT = """
Babylon RAG Pipeline.

Main entry point for pipeline execution.

Run the ZenML project pipelines with various options.

Run a pipeline with the required parameters. This executes
all steps in the pipeline in the correct order using the orchestrator
stack component that is configured in your active ZenML stack.

Examples:
\b
# Run the pipeline with default options
python run.py

\b
# Run the pipeline without cache
python run.py --no-cache

\b
# Run only the ETL pipeline
python run.py --run-etl
"""


@click.command(
    help=_HELP_PROMPT
)
@click.option(
    "--no-cache",
    is_flag=True,
    default=False,
    help="Disable caching for the pipeline run.",
)
@click.option(
    "--run-end-to-end-data",
    is_flag=True,
    default=False,
    help="Whether to run all the data pipelines in one go.",
)
@click.option(
    "--run-etl",
    is_flag=True,
    default=False,
    help="Whether to run the ETL pipeline.",
)
@click.option(
    "--run-export-artifact-to-json",
    is_flag=True,
    default=False,
    help="Whether to run the Artifact -> JSON pipeline",
)
@click.option(
    "--etl-config-filename",
    default="digital_data_etl_paul_iusztin.yaml",
    help="Filename of the ETL config file.",
)
@click.option(
    "--run-feature-engineering",
    is_flag=True,
    default=False,
    help="Whether to run the FE pipeline.",
)
@click.option(
    "--run-generate-instruct-datasets",
    is_flag=True,
    default=False,
    help="Whether to run the instruct dataset generation pipeline.",
)
@click.option(
    "--run-generate-preference-datasets",
    is_flag=True,
    default=False,
    help="Whether to run the preference dataset generation pipeline.",
)
@click.option(
    "--run-training",
    is_flag=True,
    default=False,
    help="Whether to run the training pipeline.",
)
@click.option(
    "--run-evaluation",
    is_flag=True,
    default=False,
    help="Whether to run the evaluation pipeline.",
)
@click.option(
    "--export-settings",
    is_flag=True,
    default=False,
    help="Whether to export your settings to ZenML or not.",
)
def main(
    no_cache: bool = False,
    run_end_to_end_data: bool = False,
    run_etl: bool = False,
    run_export_artifact_to_json: bool = False,
    etl_config_filename: str = "digital_data_etl_paul_iusztin.yaml",
    run_feature_engineering: bool = False,
    run_generate_instruct_datasets: bool = False,
    run_generate_preference_datasets: bool = False,
    run_training: bool = False,
    run_evaluation: bool = False,
    export_settings: bool = False,
):
    if not any(
        [
            run_feature_engineering,
            run_end_to_end_data,
            run_etl,
            run_generate_instruct_datasets,
            run_generate_preference_datasets,
            run_training,
            run_evaluation,
        ]
    ):
        click.echo("Please specify an action to run.")
        return

    pipeline_args = {
        "enable_cache": not no_cache,
    }
    # root_dir is the repository root: run.py -> tools/ -> features/ -> root.
    root_dir = Path(__file__).resolve().parent.parent.parent
    logger.debug(f"Project root directory: {root_dir}")

    if run_feature_engineering:
        # Features pipeline config.
        run_args_fe = {}
        # Update environment with any `.env` (for local runs only)
        env_path = root_dir / ".env"
        if env_path.exists():
            logger.info(f"Update environment with `{env_path}`")
            dotenv.load_dotenv(env_path)
        else:
            logger.warning(f"No .env found at {env_path}")

        # Update config with existing environment variables.
        logger.debug("Loading features config with environment variables.")
        config.update_config_from_environment(run_args_fe)
        logger.info("Environment loaded.")

        pipeline_args["config_path"] = (
            root_dir / "zenml" / "pipeline" / "config" / "feature_engineering.yaml"
        )
        pipeline_args["run_name"] = (
            f"feature_engineering_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        )

        logger.info(f"Invoking generate_features with config {pipeline_args['config_path']}")
        generate_features.with_options(**pipeline_args)(**run_args_fe)

    else:
        logger.warning("Selected action is not yet implemented in run.py.")


if __name__ == '__main__':
    main()
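
Three small pieces of logic in `run.py` can be exercised on their own: locating the project root, building the timestamped run name, and checking that at least one action flag was passed. The following is a minimal stdlib-only sketch; `project_root`, `feature_run_name`, and `has_action` are hypothetical helper names that do not exist in the PR, and the `/repo` path is purely illustrative.

```python
from datetime import datetime
from pathlib import Path


def project_root(script_path: Path) -> Path:
    """Walk up from features/tools/run.py to the repository root."""
    # parents[0] = features/tools, parents[1] = features, parents[2] = root
    return script_path.resolve().parents[2]


def feature_run_name(now: datetime) -> str:
    """Build a timestamped run name in the same format as run.py."""
    return f"feature_engineering_run_{now.strftime('%Y_%m_%d_%H_%M_%S')}"


def has_action(*flags: bool) -> bool:
    """True when at least one action flag was passed."""
    return any(flags)


root = project_root(Path("/repo/features/tools/run.py"))
name = feature_run_name(datetime(2025, 1, 2, 3, 4, 5))
```

Passing the clock value in as an argument, rather than calling `datetime.now()` inside the helper, keeps the run name deterministic under test.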
3 changes: 3 additions & 0 deletions pyproject.toml
Expand Up @@ -37,6 +37,9 @@ matplotlib = "^3.10.7"
plotly = "^6.3.1"
scikit-learn = "1.7.2"

[tool.poetry.scripts]
run-pipeline = "features.tools.run:main"


[build-system]
requires = ["poetry-core>=1.0.0"]
61 changes: 61 additions & 0 deletions tests/test_run.py
@@ -0,0 +1,61 @@
import sys
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from click.testing import CliRunner

# Add the project root to sys.path if needed
sys.path.append(str(Path(__file__).resolve().parent.parent))

from features.tools.run import main


def test_main_help():
    """Test that the help message is displayed."""
    runner = CliRunner()
    result = runner.invoke(main, ["--help"])
    assert result.exit_code == 0
    assert "Babylon RAG Pipeline." in result.output


@patch("features.tools.run.dotenv.load_dotenv")
@patch("features.tools.run.config.update_config_from_environment")
@patch("features.tools.run.generate_features")
def test_main_run_feature_engineering(mock_generate_features, mock_update_config, mock_load_dotenv):
    """Test that run_feature_engineering calls the pipeline with expected config."""
    # Setup mocks
    mock_pipeline_with_options = MagicMock()
    mock_generate_features.with_options.return_value = mock_pipeline_with_options

    # We need to simulate that some env vars are found and populated into run_args_fe
    def side_effect(config_dict):
        config_dict["MOCK_CONFIG"] = "MOCK_VALUE"
    mock_update_config.side_effect = side_effect

    runner = CliRunner()
    # We need to pass --run-feature-engineering
    result = runner.invoke(main, ["--run-feature-engineering"])

    # Assertions
    assert result.exit_code == 0

    # run.py only calls load_dotenv when a `.env` file exists at the project root,
    # so this assertion assumes one is present in the test environment.
    assert mock_load_dotenv.called

    # Check if generate_features.with_options was called
    assert mock_generate_features.with_options.called
    options_call_args = mock_generate_features.with_options.call_args[1]
    assert "config_path" in options_call_args
    assert "feature_engineering.yaml" in str(options_call_args["config_path"])

    # Check if the pipeline was invoked with run_args_fe
    mock_pipeline_with_options.assert_called_once()
    invocation_args = mock_pipeline_with_options.call_args[1]
    assert invocation_args["MOCK_CONFIG"] == "MOCK_VALUE"


def test_main_no_action():
    """Test that main prints a message if no action is specified."""
    runner = CliRunner()
    result = runner.invoke(main, [])
    # run.py prints the message with click.echo and returns early,
    # so it shows up in the captured CLI output.
    assert "Please specify an action to run." in result.output
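
The feature-engineering test above relies on how `MagicMock` records a chained call such as `generate_features.with_options(...)(...)`. The sketch below isolates that pattern using only `unittest.mock`; the keyword values are illustrative and are not the project's real configuration.

```python
from unittest.mock import MagicMock

# Stand-in for the patched pipeline object.
generate_features = MagicMock(name="generate_features")
# The object returned by with_options(...) is itself a mock we can inspect.
configured = generate_features.with_options.return_value

# Same call shape as in run.py: options first, then pipeline parameters.
generate_features.with_options(enable_cache=False, run_name="demo")(mock=True)

# Keyword arguments of the most recent call to each mock.
options_kwargs = generate_features.with_options.call_args.kwargs
invocation_kwargs = configured.call_args.kwargs
```

`call_args.kwargs` (Python 3.8+) is equivalent to the `call_args[1]` indexing used in the test.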
16 changes: 16 additions & 0 deletions zenml/pipeline/config/end_to_end_data.yml
@@ -0,0 +1,16 @@
settings:
  docker:
    # todo: this should be the image built by babylon.
    # parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-rlwlcs:latest
    skip_build: True
  orchestrator.sagemaker:
    synchronous: false

parameters:
  # Data ETL & Feature engineering pipelines parameters
  # Add config as needed
  # Generate instruct dataset pipeline parameters
  test_split_size: 0.1
  push_to_huggingface: false
  dataset_id: 'todo'
  mock: false