AdeshDeshmukh/dataops-env

title DataOps-Env
emoji 🗄️
colorFrom blue
colorTo green
sdk docker
pinned false

🗄️ DataOps-Env

An OpenEnv Reinforcement Learning Environment for Autonomous Database Repair

License: MIT · Python 3.10 · FastAPI · Docker · OpenEnv · HF Space

Built for MetaPyTorch Hackathon · Powered by Meta's OpenEnv Framework

🚀 Live Demo · 📖 API Docs · 💻 GitHub


🌟 What is DataOps-Env?

DataOps-Env is a real-world reinforcement learning environment built on Meta's OpenEnv framework. It places AI agents in the role of a junior data engineer tasked with repairing corrupted SQLite databases through sequential SQL interactions.

Unlike traditional SQL benchmarks that test one-shot read-only queries, DataOps-Env simulates the full interactive loop of real data engineering:

Corrupted Database → Agent Explores → Agent Fixes → Grader Scores

Agents must discover, diagnose, and repair data quality issues step by step, earning rewards for meaningful progress and penalties for destructive or repeated actions.


🎯 Why Does This Matter?

| Traditional SQL Benchmarks | DataOps-Env |
| --- | --- |
| One-shot queries | Multi-step interactions |
| Read-only | Read + Write |
| Static evaluation | Dynamic reward signals |
| No agent feedback loop | Full RL environment |
| Toy problems | Real data engineering tasks |

DataOps-Env fills a genuine gap: there is no existing OpenEnv environment that benchmarks agents on interactive database repair, a task real data engineers perform daily.


🏗️ Architecture Overview

┌─────────────────────────────────────────────────────────┐
│                   AI Agent                              │
│          (LLM / RL Policy / Custom Script)              │
└──────────────────────┬──────────────────────────────────┘
                       │ HTTP Actions (REST API)
                       ▼
┌─────────────────────────────────────────────────────────┐
│                  DataOps-Env                            │
│                                                         │
│       ┌─────────┐ ┌──────────┐ ┌────────┐               │
│       │ reset() │ │ step()   │ │state() │               │
│       └─────────┘ └──────────┘ └────────┘               │
│                                                         │
│ ┌──────────────────────────────────────────────────┐    │
│ │               DataOpsEnv Core                    │    │
│ │ Observation · Action · Reward · StepResult       │    │
│ └──────────────────────────────────────────────────┘    │
│                                                         │
│ ┌────────────┐ ┌────────────┐ ┌──────────────────┐      │
│ │ task_easy  │ │task_medium │ │  task_hard       │      │
│ │ seed.db    │ │ seed.db    │ │  seed.db         │      │
│ │ target.db  │ │ target.db  │ │  target.db       │      │
│ └────────────┘ └────────────┘ └──────────────────┘      │
└─────────────────────────────────────────────────────────┘

📋 Tasks

🟢 Task 1 - Easy: Data Standardization

Goal: Repair inconsistent phone numbers and email addresses in the users table.

| Issue | Example Corrupted | Expected Fixed |
| --- | --- | --- |
| Phone formats | (987)6543210, 987-654-3210, +91-9876543210 | 9876543210 |
| Email casing | PRIYA@OUTLOOK.COM, Adesh@Gmail.Com | priya@outlook.com |
| Email spaces | sneha@ gmail.com, vikram @gmail.com | sneha@gmail.com |

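-- Fix phones: strip the +91 prefix, then dashes and parentheses
UPDATE users SET phone = REPLACE(REPLACE(REPLACE(REPLACE(phone, '+91', ''), '-', ''), '(', ''), ')', '');

-- Fix emails: lowercase and remove all spaces, including interior ones
UPDATE users SET email = LOWER(REPLACE(email, ' ', ''));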

Step Limit: 10 · Baseline Score: 0.77
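
The standardization can be tried outside the environment against an in-memory SQLite database. This is a minimal sketch, assuming a `users` table with `phone` and `email` columns; the table layout and the three rows are illustrative, built from the sample values in the table above. It strips the `+91` prefix and interior spaces explicitly so that every sample value reaches its expected form.

```python
import sqlite3

# Hypothetical miniature of the task_easy `users` table, seeded with the
# corrupted sample values listed in the table above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, phone TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO users (phone, email) VALUES (?, ?)",
    [
        ("(987)6543210", "PRIYA@OUTLOOK.COM"),
        ("987-654-3210", "sneha@ gmail.com"),
        ("+91-9876543210", "vikram @gmail.com"),
    ],
)

# Strip the +91 prefix first, then dashes and parentheses.
conn.execute(
    "UPDATE users SET phone = "
    "REPLACE(REPLACE(REPLACE(REPLACE(phone, '+91', ''), '-', ''), '(', ''), ')', '')"
)
# REPLACE removes interior spaces as well as leading and trailing ones.
conn.execute("UPDATE users SET email = LOWER(REPLACE(email, ' ', ''))")

rows = conn.execute("SELECT phone, email FROM users ORDER BY id").fetchall()
for phone, email in rows:
    print(phone, email)
```

Note that `TRIM` alone only removes leading and trailing spaces, so a value such as `sneha@ gmail.com` needs `REPLACE` to lose its interior space.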


🟡 Task 2 - Medium: Conditional Merge & Anomaly Handling

Goal: Migrate valid orders from legacy_orders into current_orders, excluding corrupted rows with negative amounts.

| Source Table | Issue | Action Required |
| --- | --- | --- |
| legacy_orders | Negative amt values | Exclude rows where amt < 0 |
| legacy_orders | Different column names | Map client_name → customer_name, amt → amount |
| current_orders | Empty (needs population) | Insert valid rows only |

-- Copy valid rows only; rows with a negative amt are treated as corrupted
INSERT OR IGNORE INTO current_orders (order_id, customer_name, amount, order_date)
SELECT order_id, client_name, amt, order_date
FROM legacy_orders WHERE amt >= 0;

Step Limit: 15 · Baseline Score: 0.89
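
The merge can be sketched end to end with Python's `sqlite3` module. The column types and the three legacy rows below are assumptions made for illustration; only the table and column names come from the task description. The sketch uses `amt >= 0` so that zero-amount orders count as valid, which is one reading of the "negative amounts" rule.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed schemas and made-up rows; only the names mirror the task.
conn.executescript("""
CREATE TABLE legacy_orders (
    order_id INTEGER PRIMARY KEY, client_name TEXT, amt REAL, order_date TEXT);
CREATE TABLE current_orders (
    order_id INTEGER PRIMARY KEY, customer_name TEXT, amount REAL, order_date TEXT);
INSERT INTO legacy_orders VALUES
    (1, 'Asha', 120.0, '2024-01-05'),
    (2, 'Ravi', -45.0, '2024-01-06'),
    (3, 'Meera', 80.5, '2024-01-07');
""")

MIGRATE_SQL = """
INSERT OR IGNORE INTO current_orders (order_id, customer_name, amount, order_date)
SELECT order_id, client_name, amt, order_date
FROM legacy_orders
WHERE amt >= 0
"""

conn.execute(MIGRATE_SQL)
# Running the migration again adds nothing: the primary-key conflict is ignored.
conn.execute(MIGRATE_SQL)

migrated = conn.execute(
    "SELECT order_id, customer_name, amount FROM current_orders ORDER BY order_id"
).fetchall()
print(migrated)
```

`INSERT OR IGNORE` matters for an agent that may retry a step: a repeated migration leaves `current_orders` unchanged instead of raising a constraint error.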


🔴 Task 3 - Hard: Referential Integrity & Aggregation

Goal: Remove orphaned invoices and create a monthly revenue aggregation table.

| Issue | Description | Fix |
| --- | --- | --- |
| Orphaned invoices | customer_id references non-existent customers | DELETE orphans |
| Missing aggregation | No monthly_revenue table exists | CREATE with JOIN + GROUP BY |

-- Remove orphans
DELETE FROM invoices
WHERE customer_id NOT IN (SELECT customer_id FROM customers);

-- Create revenue table
CREATE TABLE monthly_revenue AS
SELECT invoice_month, ROUND(SUM(amount), 2) AS total_revenue
FROM invoices JOIN customers USING (customer_id)
GROUP BY invoice_month ORDER BY invoice_month;

Step Limit: 20 · Baseline Score: 0.80
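
A small worked example makes the two steps concrete. The schemas and rows below are assumptions for illustration (the real seed database may differ); the SQL statements are the ones shown above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed schemas and made-up rows; invoice 13 points at a missing customer.
conn.executescript("""
CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE invoices (
    invoice_id INTEGER PRIMARY KEY, customer_id INTEGER,
    invoice_month TEXT, amount REAL);
INSERT INTO customers VALUES (1, 'A'), (2, 'B');
INSERT INTO invoices VALUES
    (10, 1, '2024-01', 100.0),
    (11, 2, '2024-01', 50.25),
    (12, 1, '2024-02', 75.0),
    (13, 99, '2024-02', 500.0);
""")

ORPHAN_FILTER = "customer_id NOT IN (SELECT customer_id FROM customers)"

# Count orphans first so the effect of the DELETE can be checked.
orphans_before = conn.execute(
    f"SELECT COUNT(*) FROM invoices WHERE {ORPHAN_FILTER}"
).fetchone()[0]
conn.execute(f"DELETE FROM invoices WHERE {ORPHAN_FILTER}")

conn.execute("""
CREATE TABLE monthly_revenue AS
SELECT invoice_month, ROUND(SUM(amount), 2) AS total_revenue
FROM invoices JOIN customers USING (customer_id)
GROUP BY invoice_month ORDER BY invoice_month
""")

revenue = conn.execute(
    "SELECT invoice_month, total_revenue FROM monthly_revenue ORDER BY invoice_month"
).fetchall()
print(orphans_before, revenue)
```

Without the orphan cleanup the 500.0 invoice would still be excluded from `monthly_revenue` by the inner join, but it would remain in `invoices`, so both steps are needed to satisfy the goal.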


🟡 Task 4 - Medium: NULL Value Repair

Goal: Fill all NULL values in the employees table using statistical imputation and a sensible default.

| Column | Issue | Repair Strategy |
| --- | --- | --- |
| age | NULL values | Replace with the rounded average age of the other employees. |
| salary | NULL values | Replace with the average salary of the other employees. |
| department | NULL values | Replace with the default text 'General'. |

-- Fix NULL ages with rounded average
UPDATE employees SET age = (SELECT CAST(ROUND(AVG(age)) AS INTEGER) FROM employees WHERE age IS NOT NULL) WHERE age IS NULL;

-- Fix NULL salaries with average
UPDATE employees SET salary = (SELECT ROUND(AVG(salary), 2) FROM employees WHERE salary IS NOT NULL) WHERE salary IS NULL;

-- Fix NULL departments with default
UPDATE employees SET department = 'General' WHERE department IS NULL;

Step Limit: 15 · Baseline Score: 0.91
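
The imputation can be checked on a three-row table. The schema and values here are assumptions chosen so the averages are easy to verify by hand: the known ages 30 and 40 average to 35, and the known salaries 50000 and 70000 average to 60000.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed schema and made-up rows for illustration.
conn.executescript("""
CREATE TABLE employees (
    id INTEGER PRIMARY KEY, age INTEGER, salary REAL, department TEXT);
INSERT INTO employees VALUES
    (1, 30, 50000.0, 'Sales'),
    (2, 40, 70000.0, NULL),
    (3, NULL, NULL, 'Ops');
""")

# Each subquery averages only the rows where the value is known.
conn.execute("""
UPDATE employees
SET age = (SELECT CAST(ROUND(AVG(age)) AS INTEGER)
           FROM employees WHERE age IS NOT NULL)
WHERE age IS NULL
""")
conn.execute("""
UPDATE employees
SET salary = (SELECT ROUND(AVG(salary), 2)
              FROM employees WHERE salary IS NOT NULL)
WHERE salary IS NULL
""")
conn.execute("UPDATE employees SET department = 'General' WHERE department IS NULL")

rows = conn.execute(
    "SELECT id, age, salary, department FROM employees ORDER BY id"
).fetchall()
print(rows)
```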


🔴 Task 5 - Hard: Duplicate Record Detection & Removal

Goal: Remove all duplicate customer records from the customers table, keeping only the record with the lowest id for each email.

| Issue | Description | Fix |
| --- | --- | --- |
| Duplicate emails | Multiple rows share the same email. | DELETE all but the row with MIN(id). |
| Data Integrity | Retained records must not be modified. | Only DELETE actions should be used. |

-- Remove duplicates, keeping only the lowest id per email
DELETE FROM customers
WHERE id NOT IN (
    SELECT MIN(id) FROM customers GROUP BY email
);

Step Limit: 15 · Baseline Score: 0.85
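
The deduplication rule can be verified on a small table. The schema and rows below are assumptions for illustration; the `DELETE` statement is the one from the task.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed schema and made-up rows: ids 3 and 4 repeat earlier emails.
conn.executescript("""
CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, email TEXT);
INSERT INTO customers VALUES
    (1, 'Asha', 'asha@example.com'),
    (2, 'Ravi', 'ravi@example.com'),
    (3, 'Asha K', 'asha@example.com'),
    (4, 'Ravi', 'ravi@example.com');
""")

cur = conn.execute("""
DELETE FROM customers
WHERE id NOT IN (SELECT MIN(id) FROM customers GROUP BY email)
""")
deleted = cur.rowcount  # number of rows removed by the DELETE

remaining = conn.execute(
    "SELECT id, name, email FROM customers ORDER BY id"
).fetchall()
print(deleted, remaining)
```

The surviving rows keep their original `name` values, which is consistent with the requirement that retained records are not modified.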


👁️ Observation Space

Every call to step() or reset() returns a structured Observation:

from typing import Any, Dict, List, Optional

from pydantic import BaseModel


class Observation(BaseModel):
    task_id:           str               # "task_easy" | "task_medium" | "task_hard"
    goal:              str               # Natural language task description
    available_tables:  List[str]         # Tables in current working database
    schema_summary:    Dict[str, List]   # Column names per table
    last_query_result: Optional[Any]     # Result rows from last SQL query
    last_error:        Optional[str]     # Error message if last action failed
    steps_remaining:   int               # Steps left before episode ends
    current_step:      int               # Current step number
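
To illustrate what `available_tables` and `schema_summary` might contain, the sketch below derives both from a SQLite connection. The helper `summarize_schema` is hypothetical and is not taken from the project; the environment may build these fields differently.

```python
import sqlite3
from typing import Dict, List


def summarize_schema(conn: sqlite3.Connection) -> Dict[str, List[str]]:
    """Map each user table to its column names (hypothetical helper)."""
    tables = [
        row[0]
        for row in conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
        )
    ]
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    return {
        table: [col[1] for col in conn.execute(f'PRAGMA table_info("{table}")')]
        for table in tables
    }


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, phone TEXT, email TEXT)")

schema_summary = summarize_schema(conn)
available_tables = list(schema_summary)
print(available_tables, schema_summary)
```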

⚡ Action Space

Agents can take 3 types of actions:

class Action(BaseModel):
    action_type: str                     # Required
    query:       Optional[str] = None    # Required for execute_sql; may be omitted otherwise
    table_name:  Optional[str] = None    # Optional reference

| action_type | Description | When to Use |
| --- | --- | --- |
| inspect_schema | View all table schemas and columns | First step, orientation |
| execute_sql | Run any SQL (SELECT, UPDATE, INSERT, DELETE, CREATE) | Data exploration and repair |
| finish_task | Submit solution for grading | When confident fix is complete |
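
The rule that `query` is required only for `execute_sql` can be expressed as a small validation step. This is a sketch using a standard-library dataclass rather than the project's Pydantic model; `ActionSketch` and `validate_action` are hypothetical names introduced here.

```python
from dataclasses import dataclass
from typing import Optional

VALID_ACTION_TYPES = {"inspect_schema", "execute_sql", "finish_task"}


@dataclass
class ActionSketch:
    """Stand-in for the Action model, with the same three fields."""
    action_type: str
    query: Optional[str] = None
    table_name: Optional[str] = None


def validate_action(action: ActionSketch) -> Optional[str]:
    """Return an error message, or None when the action is well formed."""
    if action.action_type not in VALID_ACTION_TYPES:
        return f"unknown action_type: {action.action_type}"
    if action.action_type == "execute_sql" and not (action.query or "").strip():
        return "execute_sql requires a non-empty query"
    return None


print(validate_action(ActionSketch("inspect_schema")))
print(validate_action(ActionSketch("execute_sql", query="   ")))
```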

🏆 Reward Design

Final Score = Step Rewards + Grader Bonus
            = Σ(step_rewards) + (grader_score × 0.50)

| Event | Reward | Reason |
| --- | --- | --- |
| Schema inspection | +0.03 | Encourages exploration |
| Valid SQL executed | +0.03 | Rewards useful actions |
| Invalid SQL | -0.05 | Penalizes bad queries |
| Repeated query | -0.02 | Prevents loops |
| Destructive action (DROP) | -0.20 | Blocks data loss |
| Task completion | grader_score × 0.50 | Main objective signal |

Reward shaping gives agents a signal throughout the full trajectory, not just at the end, which supports more efficient RL training.
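
As a worked example of the formula, one schema inspection and two valid queries followed by a perfect grader score give 3 × 0.03 + 1.0 × 0.50 = 0.59. The sketch below encodes the reward table and assumes each step produces exactly one event; the event names and the `final_score` helper are hypothetical.

```python
# Per-step rewards from the reward table; event names are illustrative.
STEP_REWARDS = {
    "schema_inspection": 0.03,
    "valid_sql": 0.03,
    "invalid_sql": -0.05,
    "repeated_query": -0.02,
    "destructive_action": -0.20,
}
GRADER_WEIGHT = 0.50


def final_score(events, grader_score):
    """Sum the step rewards and add the weighted grader score."""
    step_total = sum(STEP_REWARDS[event] for event in events)
    return round(step_total + grader_score * GRADER_WEIGHT, 2)


example = final_score(["schema_inspection", "valid_sql", "valid_sql"], grader_score=1.0)
print(example)
```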


📊 Baseline Scores

Achieved by running demo.py with deterministic SQL operations:

| Task | Grader Score | Final Score | Steps Used |
| --- | --- | --- | --- |
| task_easy | 0.84 / 1.0 ✅ | 0.84 | 10 / 10 |
| task_medium | 0.94 / 1.0 ✅ | 0.94 | 15 / 15 |
| task_hard | 1.0 / 1.0 ✅ | 1.00 | 20 / 20 |
| task_null_rpair | 0.81 / 1.0 ✅ | 0.81 | 15 / 15 |
| task_dedup | 0.84 / 1.0 ✅ | 0.84 | 15 / 15 |
| Average | 0.89 / 1.0 | 0.89 | n/a |

Of the five tasks, only task_hard reaches a perfect grader score (1.0); the others score between 0.81 and 0.94. Final scores below 1.0 reflect the reward formula, in which step rewards contribute alongside the grader bonus.


🚀 Quick Start

Prerequisites

git clone https://github.com/AdeshDeshmukh/dataops-env
cd dataops-env
pip install -r requirements.txt

Initialize Databases

python3 create_databases.py
Creating databases...
task1_seed.db   ✓
task1_target.db ✓
task2_seed.db   ✓
task2_target.db ✓
task3_seed.db   ✓
task3_target.db ✓

All 6 databases created successfully.

Run Demo

python3 demo.py

🖥️ Run Locally

Option 1: Direct Python

# Terminal 1 - Start server
python3 app.py

# Terminal 2 - Test endpoints
curl http://localhost:7860/

curl -X POST http://localhost:7860/reset \
  -H "Content-Type: application/json" \
  -d '{"task_id": "task_easy"}'

curl -X POST http://localhost:7860/step \
  -H "Content-Type: application/json" \
  -d '{"action_type": "inspect_schema"}'

curl -X POST http://localhost:7860/step \
  -H "Content-Type: application/json" \
  -d '{"action_type": "execute_sql", "query": "SELECT * FROM users"}'

curl -X POST http://localhost:7860/step \
  -H "Content-Type: application/json" \
  -d '{"action_type": "finish_task"}'

Option 2: Docker

docker build -t dataops-env .
docker run -p 7860:7860 dataops-env

🤖 Run Inference (LLM Agent)

export HF_TOKEN=your_huggingface_token
export API_BASE_URL=https://router.huggingface.co/v1
export MODEL_NAME=Qwen/Qwen2.5-72B-Instruct
python3 inference.py

Expected Log Output

[START] task=task_easy env=dataops-env model=Qwen/Qwen2.5-72B-Instruct
[STEP] step=1 action=inspect_schema reward=0.03 done=false error=null
[STEP] step=2 action=execute_sql('SELECT * FROM users') reward=0.03 done=false error=null
[STEP] step=3 action=execute_sql('UPDATE users SET phone...') reward=0.03 done=false error=null
[STEP] step=4 action=finish_task reward=0.50 done=true error=null
[END] success=true steps=4 score=0.77 rewards=0.03,0.03,0.03,0.50

📡 API Endpoints

| Method | Endpoint | Description | Body |
| --- | --- | --- | --- |
| GET | / | Health check | None |
| POST | /reset | Initialize task | {"task_id": "task_easy"} |
| POST | /step | Execute action | {"action_type": "...", "query": "..."} |
| GET | /state | Current state | None |
| GET | /tasks | List all tasks | None |

Example Full Interaction

# 1. Start task
POST /reset  {"task_id": "task_easy"}

# 2. Look at schema
POST /step   {"action_type": "inspect_schema"}

# 3. View data
POST /step   {"action_type": "execute_sql", "query": "SELECT * FROM users"}

# 4. Fix phones
POST /step   {"action_type": "execute_sql", "query": "UPDATE users SET phone = REPLACE(phone, '-', '')"}

# 5. Fix emails
POST /step   {"action_type": "execute_sql", "query": "UPDATE users SET email = LOWER(TRIM(email))"}

# 6. Submit
POST /step   {"action_type": "finish_task"}
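
The same interaction can be driven from Python with only the standard library. This is a sketch, assuming the server from the "Run Locally" section is listening on `http://localhost:7860` and that responses are JSON; `build_request`, `post`, and `run_episode` are hypothetical helpers, not part of the project.

```python
import json
import urllib.request

BASE_URL = "http://localhost:7860"  # local address used in the curl examples


def build_request(path: str, payload: dict) -> urllib.request.Request:
    """Build a JSON POST request for the given endpoint path."""
    return urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post(path: str, payload: dict) -> dict:
    """Send the request and decode the JSON reply (needs a running server)."""
    with urllib.request.urlopen(build_request(path, payload)) as resp:
        return json.loads(resp.read().decode("utf-8"))


# The six calls from the example interaction, in order.
EPISODE = [
    ("/reset", {"task_id": "task_easy"}),
    ("/step", {"action_type": "inspect_schema"}),
    ("/step", {"action_type": "execute_sql", "query": "SELECT * FROM users"}),
    ("/step", {"action_type": "execute_sql",
               "query": "UPDATE users SET phone = REPLACE(phone, '-', '')"}),
    ("/step", {"action_type": "execute_sql",
               "query": "UPDATE users SET email = LOWER(TRIM(email))"}),
    ("/step", {"action_type": "finish_task"}),
]


def run_episode() -> list:
    """Replay the episode against the server and collect every response."""
    return [post(path, payload) for path, payload in EPISODE]
```

Calling `run_episode()` with the server running returns one decoded response per call; nothing is sent over the network until that function is invoked.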

📁 Project Structure

dataops-env/
│
├── 🔧 Core Environment
│   ├── envs/
│   │   ├── __init__.py
│   │   └── dataops_env.py      # RL environment (Observation, Action, Reward)
│   └── graders/
│       ├── __init__.py
│       └── grader.py           # Task evaluation & scoring
│
├── 📋 Task Management
│   ├── tasks/
│   │   ├── __init__.py
│   │   ├── task_definition.py  # Task loading & config
│   │   └── assets/             # 6 SQLite databases (seed + target)
│   └── configs/
│       └── env_config.yaml     # Task definitions & reward config
│
├── 📡 API Layer
│   ├── app.py                  # FastAPI server (5 endpoints)
│   └── server/
│       └── app.py              # OpenEnv server entry point
│
├── 🤖 Agent Scripts
│   ├── inference.py            # LLM autonomous agent
│   └── demo.py                 # Deterministic baseline demo
│
├── 🚀 Deployment
│   ├── Dockerfile              # Container configuration
│   ├── requirements.txt        # Python dependencies
│   ├── pyproject.toml          # Project metadata & OpenEnv config
│   ├── uv.lock                 # Dependency lock file
│   └── openenv.yaml            # OpenEnv specification
│
└── 📖 Documentation
    └── README.md               # This file

🔒 Safety Features

DANGEROUS_PATTERNS = ['DROP TABLE', 'DROP DATABASE', 'TRUNCATE']

# Blocks destructive actions
if _is_dangerous(action.query):
    step_reward = -0.20          # Heavy penalty
    self.done   = True           # Episode ends immediately

  • DROP TABLE → blocked, -0.20 penalty, episode ends
  • Repeated queries → -0.02 penalty
  • Invalid SQL → -0.05 penalty
  • Empty query → -0.05 penalty
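
The snippet above calls `_is_dangerous` without showing it. One plausible shape for such a check is sketched below; `is_dangerous` is a hypothetical helper written for illustration, and the project's actual implementation may differ.

```python
DANGEROUS_PATTERNS = ["DROP TABLE", "DROP DATABASE", "TRUNCATE"]


def is_dangerous(query):
    """Case-insensitive check that also tolerates repeated whitespace."""
    # Upper-case the query and collapse runs of whitespace to single spaces
    # so that "drop   table" still matches "DROP TABLE".
    normalized = " ".join((query or "").upper().split())
    return any(pattern in normalized for pattern in DANGEROUS_PATTERNS)


print(is_dangerous("drop   table users"))
print(is_dangerous("SELECT * FROM users"))
```

A substring check like this is deliberately conservative: it would also flag a harmless query whose string literal happens to contain one of the patterns.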

🛠️ Tech Stack

| Component | Technology |
| --- | --- |
| Language | Python 3.10 |
| API Framework | FastAPI + Uvicorn |
| Database | SQLite 3 |
| Models | Pydantic v2 |
| LLM Client | OpenAI SDK |
| Config | YAML |
| Container | Docker |
| Deployment | Hugging Face Spaces |

🔗 Links

| Resource | URL |
| --- | --- |
| 🤗 Live Demo | https://huggingface.co/spaces/AdeshDeshmukh/dataops-env |
| 📖 API Docs | https://adeshdeshmukh-dataops-env.hf.space/docs |
| 💻 GitHub | https://github.com/AdeshDeshmukh/dataops-env |
| 🏆 Hackathon | MetaPyTorch India Hackathon 2026 |

👥 Authors

| Author | GitHub |
| --- | --- |
| Adesh Deshmukh | @AdeshDeshmukh |
| Pushkar Gandhi | @pushkargandhi1020 |

📄 License

This project is licensed under the MIT License.


Built with ❤️ for MetaPyTorch Hackathon 2026

Helping AI agents become better data engineers, one SQL query at a time.
