| title | DataOps-Env |
|---|---|
| emoji | ποΈ |
| colorFrom | blue |
| colorTo | green |
| sdk | docker |
| pinned | false |
Built for MetaPyTorch Hackathon · Powered by Meta's OpenEnv Framework
DataOps-Env is a real-world reinforcement learning environment built on Meta's OpenEnv framework. It places AI agents in the role of a junior data engineer tasked with repairing corrupted SQLite databases through sequential SQL interactions.
Unlike traditional SQL benchmarks that test one-shot read-only queries, DataOps-Env simulates the full interactive loop of real data engineering:
Corrupted Database → Agent Explores → Agent Fixes → Grader Scores
Agents must discover, diagnose, and repair data quality issues step by step, earning rewards for meaningful progress and penalties for destructive or repeated actions.
| Traditional SQL Benchmarks | DataOps-Env |
|---|---|
| One-shot queries | Multi-step interactions |
| Read-only | Read + Write |
| Static evaluation | Dynamic reward signals |
| No agent feedback loop | Full RL environment |
| Toy problems | Real data engineering tasks |
DataOps-Env fills a genuine gap: there is no existing OpenEnv environment that benchmarks agents on interactive database repair, a task real data engineers perform daily.
```
┌─────────────────────────────────────────────────────────┐
│                        AI Agent                         │
│            (LLM / RL Policy / Custom Script)            │
└────────────────────────────┬────────────────────────────┘
                             │  HTTP Actions (REST API)
                             ▼
┌─────────────────────────────────────────────────────────┐
│                       DataOps-Env                       │
│                                                         │
│      ┌─────────┐      ┌─────────┐      ┌─────────┐      │
│      │ reset() │      │ step()  │      │ state() │      │
│      └─────────┘      └─────────┘      └─────────┘      │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │                  DataOpsEnv Core                  │  │
│  │    Observation · Action · Reward · StepResult     │  │
│  └───────────────────────────────────────────────────┘  │
│                                                         │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐  │
│  │  task_easy  │    │ task_medium │    │  task_hard  │  │
│  │   seed.db   │    │   seed.db   │    │   seed.db   │  │
│  │  target.db  │    │  target.db  │    │  target.db  │  │
│  └─────────────┘    └─────────────┘    └─────────────┘  │
└─────────────────────────────────────────────────────────┘
```
Goal: Repair inconsistent phone numbers and email addresses in the users table.
| Issue | Example Corrupted | Expected Fixed |
|---|---|---|
| Phone formats | `(987)6543210`, `987-654-3210`, `+91-9876543210` | `9876543210` |
| Email casing | `PRIYA@OUTLOOK.COM`, `Adesh@Gmail.Com` | `priya@outlook.com` |
| Email spaces | `sneha@ gmail.com`, `vikram @gmail.com` | `sneha@gmail.com` |
```sql
-- Fix phones
UPDATE users SET phone = REPLACE(REPLACE(REPLACE(phone, '-', ''), '(', ''), ')', '');
-- Fix emails
UPDATE users SET email = LOWER(TRIM(email));
```

Step Limit: 10 · Baseline Score: 0.77
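The sample rows for this task include a `+91-` country prefix and spaces inside email addresses, which stripping hyphens and parentheses or calling `TRIM` alone would leave in place. The sketch below shows one way to cover those cases with Python's built-in `sqlite3` module. The table layout (an `id` column next to `phone` and `email`) and the "keep the last 10 digits" rule are assumptions inferred from the examples in the table, not the environment's actual schema or grader logic.

```python
import sqlite3

# Toy stand-in for the users table; the real seed schema may differ.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, phone TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO users (phone, email) VALUES (?, ?)",
    [
        ("(987)6543210", "PRIYA@OUTLOOK.COM"),
        ("987-654-3210", "sneha@ gmail.com"),
        ("+91-9876543210", "vikram @gmail.com"),
    ],
)

# Strip punctuation and spaces, then keep the last 10 characters,
# which drops a leading country code such as "91" (assumed rule).
conn.execute(
    """
    UPDATE users
    SET phone = SUBSTR(
        REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(
            phone, '-', ''), '(', ''), ')', ''), '+', ''), ' ', ''),
        -10
    )
    """
)

# REPLACE removes spaces anywhere in the address; TRIM only handles the ends.
conn.execute("UPDATE users SET email = LOWER(REPLACE(email, ' ', ''))")

rows = conn.execute("SELECT phone, email FROM users ORDER BY id").fetchall()
for phone, email in rows:
    print(phone, email)
```

Whether the grader expects exactly this normalization is not stated, so treat it as a starting point rather than the reference solution.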
Goal: Migrate valid orders from legacy_orders into current_orders, excluding corrupted rows with negative amounts.
| Source Table | Issue | Action Required |
|---|---|---|
| `legacy_orders` | Negative `amt` values | Exclude rows where `amt < 0` |
| `legacy_orders` | Different column names | Map `client_name` → `customer_name`, `amt` → `amount` |
| `current_orders` | Empty (needs population) | Insert valid rows only |
```sql
INSERT OR IGNORE INTO current_orders (order_id, customer_name, amount, order_date)
SELECT order_id, client_name, amt, order_date
FROM legacy_orders WHERE amt > 0;
```

Step Limit: 15 · Baseline Score: 0.89
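As a rough illustration, the migration can be tried on toy data with Python's `sqlite3` module. The column types, the `PRIMARY KEY` on `order_id`, and the sample rows are assumptions made for this sketch; only the table and column names come from the task description.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE legacy_orders (
        order_id INTEGER, client_name TEXT, amt REAL, order_date TEXT
    );
    CREATE TABLE current_orders (
        order_id INTEGER PRIMARY KEY,  -- assumed key; lets OR IGNORE skip duplicates
        customer_name TEXT, amount REAL, order_date TEXT
    );
    INSERT INTO legacy_orders VALUES
        (1, 'Asha', 120.0, '2024-01-05'),
        (2, 'Ravi', -40.0, '2024-01-06'),
        (3, 'Meera', 75.5, '2024-01-07');
    """
)

migrate = """
    INSERT OR IGNORE INTO current_orders (order_id, customer_name, amount, order_date)
    SELECT order_id, client_name, amt, order_date
    FROM legacy_orders WHERE amt > 0
"""
conn.execute(migrate)
conn.execute(migrate)  # a second run inserts nothing because of OR IGNORE

migrated = conn.execute(
    "SELECT order_id, customer_name, amount FROM current_orders ORDER BY order_id"
).fetchall()
print(migrated)
```

Note that `amt > 0` also drops zero-amount rows, while the task table only calls for excluding `amt < 0`. If zero-value orders should be kept, `amt >= 0` would be the matching filter; which one the grader expects is not stated here.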
Goal: Remove orphaned invoices and create a monthly revenue aggregation table.
| Issue | Description | Fix |
|---|---|---|
| Orphaned invoices | `customer_id` references non-existent customers | `DELETE` orphans |
| Missing aggregation | No `monthly_revenue` table exists | `CREATE` with `JOIN` + `GROUP BY` |
```sql
-- Remove orphans
DELETE FROM invoices
WHERE customer_id NOT IN (SELECT customer_id FROM customers);
-- Create revenue table
CREATE TABLE monthly_revenue AS
SELECT invoice_month, ROUND(SUM(amount), 2) AS total_revenue
FROM invoices JOIN customers USING (customer_id)
GROUP BY invoice_month ORDER BY invoice_month;
```

Step Limit: 20 · Baseline Score: 0.80
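The two repair steps can be checked on a small in-memory database. This is a sketch under assumptions: the `invoice_id` and `name` columns and all sample rows are invented for illustration, and the delete uses `NOT EXISTS` as an alternative to `NOT IN`, since `NOT IN` matches nothing when the subquery returns a `NULL`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE invoices (
        invoice_id INTEGER PRIMARY KEY,
        customer_id INTEGER, amount REAL, invoice_month TEXT
    );
    INSERT INTO customers VALUES (1, 'Asha'), (2, 'Ravi');
    INSERT INTO invoices VALUES
        (1, 1, 100.0, '2024-01'),
        (2, 2, 50.25, '2024-01'),
        (3, 99, 999.0, '2024-01'),
        (4, 1, 30.0, '2024-02');
    """
)

# Invoice 3 points at customer 99, which does not exist, so it is an orphan.
conn.execute(
    """
    DELETE FROM invoices
    WHERE NOT EXISTS (
        SELECT 1 FROM customers c WHERE c.customer_id = invoices.customer_id
    )
    """
)

conn.execute(
    """
    CREATE TABLE monthly_revenue AS
    SELECT invoice_month, ROUND(SUM(amount), 2) AS total_revenue
    FROM invoices JOIN customers USING (customer_id)
    GROUP BY invoice_month ORDER BY invoice_month
    """
)

revenue = conn.execute(
    "SELECT invoice_month, total_revenue FROM monthly_revenue ORDER BY invoice_month"
).fetchall()
print(revenue)
```

One design difference to keep in mind: `NOT EXISTS` also removes invoices whose `customer_id` is `NULL`, whereas `NOT IN` leaves them in place. Which behavior the grader treats as correct is not documented here.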
Goal: Fill all `NULL` values in the `employees` table using statistical imputation and a sensible default.
| Column | Issue | Repair Strategy |
|---|---|---|
| `age` | `NULL` values | Replace with the rounded average of other employees. |
| `salary` | `NULL` values | Replace with the average salary of other employees. |
| `department` | `NULL` values | Replace with the default text 'General'. |
```sql
-- Fix NULL ages with rounded average
UPDATE employees
SET age = (SELECT CAST(ROUND(AVG(age)) AS INTEGER) FROM employees WHERE age IS NOT NULL)
WHERE age IS NULL;

-- Fix NULL salaries with average
UPDATE employees
SET salary = (SELECT ROUND(AVG(salary), 2) FROM employees WHERE salary IS NOT NULL)
WHERE salary IS NULL;

-- Fix NULL departments with default
UPDATE employees SET department = 'General' WHERE department IS NULL;
```

Step Limit: 15 · Baseline Score: 0.91
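A quick way to sanity-check the imputation is to run it on a few rows and count the `NULL`s that remain. The sketch below uses Python's `sqlite3` module; the `id` and `name` columns and the sample values are assumptions chosen so the averages are easy to verify by hand.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE employees (
        id INTEGER PRIMARY KEY, name TEXT,
        age INTEGER, salary REAL, department TEXT
    );
    INSERT INTO employees VALUES
        (1, 'Asha', 30, 50000.0, 'Sales'),
        (2, 'Ravi', NULL, 60000.5, NULL),
        (3, 'Meera', 41, NULL, 'Ops');
    """
)

# Average age of known rows is 35.5, which rounds to 36.
conn.execute(
    "UPDATE employees SET age = "
    "(SELECT CAST(ROUND(AVG(age)) AS INTEGER) FROM employees WHERE age IS NOT NULL) "
    "WHERE age IS NULL"
)
# Average salary of known rows is 55000.25.
conn.execute(
    "UPDATE employees SET salary = "
    "(SELECT ROUND(AVG(salary), 2) FROM employees WHERE salary IS NOT NULL) "
    "WHERE salary IS NULL"
)
conn.execute("UPDATE employees SET department = 'General' WHERE department IS NULL")

remaining_nulls = conn.execute(
    "SELECT COUNT(*) FROM employees "
    "WHERE age IS NULL OR salary IS NULL OR department IS NULL"
).fetchone()[0]
filled = conn.execute(
    "SELECT age, salary, department FROM employees ORDER BY id"
).fetchall()
print(remaining_nulls, filled)
```

A `SELECT COUNT(*) ... IS NULL` query like the one above is a cheap check an agent could issue before calling `finish_task`.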
Goal: Remove all duplicate customer records from the `customers` table, keeping only the record with the lowest `id` for each email.
| Issue | Description | Fix |
|---|---|---|
| Duplicate `email`s | Multiple rows share the same `email`. | `DELETE` all but the row with `MIN(id)`. |
| Data Integrity | Retained records must not be modified. | Only `DELETE` actions should be used. |
```sql
-- Remove duplicates, keeping only the lowest id per email
DELETE FROM customers
WHERE id NOT IN (
    SELECT MIN(id) FROM customers GROUP BY email
);
```

Step Limit: 15 · Baseline Score: 0.85
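The deduplication rule can be exercised on a handful of rows, as in this sketch with Python's `sqlite3` module. The `name` column and the sample data are assumptions; only `id` and `email` are named in the task.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, email TEXT);
    INSERT INTO customers VALUES
        (1, 'Asha', 'a@x.com'),
        (2, 'Asha K', 'a@x.com'),
        (3, 'Ravi', 'r@x.com'),
        (4, 'Ravi', 'r@x.com'),
        (5, 'Meera', 'm@x.com');
    """
)

# Keep the lowest id in each email group and delete everything else.
conn.execute(
    "DELETE FROM customers "
    "WHERE id NOT IN (SELECT MIN(id) FROM customers GROUP BY email)"
)

kept_ids = [row[0] for row in conn.execute("SELECT id FROM customers ORDER BY id")]
duplicate_groups = conn.execute(
    "SELECT COUNT(*) FROM "
    "(SELECT email FROM customers GROUP BY email HAVING COUNT(*) > 1)"
).fetchone()[0]
print(kept_ids, duplicate_groups)
```

Because `GROUP BY` places all `NULL` emails in one group, rows with a missing email would also collapse to a single row under this statement. Whether the seed data contains such rows is not stated.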
Every call to step() or reset() returns a structured Observation:
```python
from typing import Any, Dict, List, Optional

from pydantic import BaseModel


class Observation(BaseModel):
    task_id: str                      # e.g. "task_easy", "task_medium", "task_hard"
    goal: str                         # Natural language task description
    available_tables: List[str]       # Tables in current working database
    schema_summary: Dict[str, List]   # Column names per table
    last_query_result: Optional[Any]  # Result rows from last SQL query
    last_error: Optional[str]         # Error message if last action failed
    steps_remaining: int              # Steps left before episode ends
    current_step: int                 # Current step number
```

Agents can take 3 types of actions:
```python
from typing import Optional

from pydantic import BaseModel


class Action(BaseModel):
    action_type: str           # Required
    query: Optional[str]       # Required for execute_sql
    table_name: Optional[str]  # Optional reference
```

| action_type | Description | When to Use |
|---|---|---|
| `inspect_schema` | View all table schemas and columns | First step, orientation |
| `execute_sql` | Run any SQL (SELECT, UPDATE, INSERT, DELETE, CREATE) | Data exploration and repair |
| `finish_task` | Submit solution for grading | When confident fix is complete |
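To show how the observation fields and action types fit together, here is a minimal scripted policy. `choose_action` is a hypothetical helper, not part of the project. It assumes observations arrive as plain dictionaries, that `current_step` is 0 right after `reset()`, and that the agent already has a list of SQL statements it plans to run.

```python
from typing import Any, Dict, List


def choose_action(obs: Dict[str, Any], planned_queries: List[str]) -> Dict[str, Any]:
    """Pick the next action payload for a scripted agent (hypothetical helper)."""
    step = obs["current_step"]
    # Start every episode by looking at the schema.
    if step == 0:
        return {"action_type": "inspect_schema"}
    # Reserve the final step for finish_task, or finish once the plan is done.
    if obs["steps_remaining"] <= 1 or step > len(planned_queries):
        return {"action_type": "finish_task"}
    return {"action_type": "execute_sql", "query": planned_queries[step - 1]}


plan = ["UPDATE users SET email = LOWER(TRIM(email))"]
first = choose_action({"current_step": 0, "steps_remaining": 10}, plan)
second = choose_action({"current_step": 1, "steps_remaining": 9}, plan)
third = choose_action({"current_step": 2, "steps_remaining": 8}, plan)
print(first, second, third)
```

An LLM-driven agent would replace the fixed plan with queries generated from `goal`, `schema_summary`, and `last_query_result`, but the three action shapes stay the same.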
```
Final Score = Step Rewards + Grader Bonus
            = Σ(step_rewards) + (grader_score × 0.50)
```
| Event | Reward | Reason |
|---|---|---|
| Schema inspection | +0.03 | Encourages exploration |
| Valid SQL executed | +0.03 | Rewards useful actions |
| Invalid SQL | -0.05 | Penalizes bad queries |
| Repeated query | -0.02 | Prevents loops |
| Destructive action (DROP) | -0.20 | Blocks data loss |
| Task completion | grader_score × 0.50 | Main objective signal |
Reward shaping ensures agents receive signal throughout the full trajectory, not just at the end. This enables efficient RL training.
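The reward formula is simple enough to reproduce in a few lines. The sketch below encodes the per-event values from the table; the event names and the rounding to two decimals are assumptions made for illustration, and the environment may apply additional rules (such as clamping) that are not described here.

```python
from typing import Iterable

# Per-step rewards from the table above; the key names are illustrative.
STEP_REWARDS = {
    "schema_inspection": 0.03,
    "valid_sql": 0.03,
    "invalid_sql": -0.05,
    "repeated_query": -0.02,
    "destructive_action": -0.20,
}
GRADER_WEIGHT = 0.50


def final_score(events: Iterable[str], grader_score: float) -> float:
    """Sum the step rewards and add the weighted grader bonus."""
    step_total = sum(STEP_REWARDS[event] for event in events)
    return round(step_total + grader_score * GRADER_WEIGHT, 2)


# One inspection, one valid query, one invalid query, one repeat, then grading.
example = final_score(
    ["schema_inspection", "valid_sql", "invalid_sql", "repeated_query"],
    grader_score=0.8,
)
print(example)
```

In this example the step rewards net out to -0.01 and the grader bonus adds 0.40, so a sloppy trajectory can still score reasonably when the final database state is mostly correct.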
Achieved by running demo.py with deterministic SQL operations:
| Task | Grader Score | Final Score | Steps Used |
|---|---|---|---|
| task_easy | 0.84 / 1.0 ✓ | 0.84 | 10 / 10 |
| task_medium | 0.94 / 1.0 ✓ | 0.94 | 15 / 15 |
| task_hard | 1.0 / 1.0 ✓ | 1.00 | 20 / 20 |
| task_null_rpair | 0.81 / 1.0 ✓ | 0.81 | 15 / 15 |
| task_dedup | 0.84 / 1.0 ✓ | 0.84 | 15 / 15 |
| Average | 0.89 / 1.0 | 0.89 | - |
In this run only task_hard reaches a perfect grader score (1.0); the other four tasks score between 0.81 and 0.94. Final scores below 1.0 reflect the reward formula design, in which step rewards contribute alongside the grader bonus.
```bash
# Clone the repository and install dependencies
git clone https://github.com/AdeshDeshmukh/dataops-env
cd dataops-env
pip install -r requirements.txt

# Create the seed and target databases
python3 create_databases.py
```

```
Creating databases...
task1_seed.db ✓
task1_target.db ✓
task2_seed.db ✓
task2_target.db ✓
task3_seed.db ✓
task3_target.db ✓
All 6 databases created successfully.
```
```bash
# Run the deterministic baseline
python3 demo.py
```

```bash
# Terminal 1 - Start server
python3 app.py

# Terminal 2 - Test endpoints
curl http://localhost:7860/

curl -X POST http://localhost:7860/reset \
  -H "Content-Type: application/json" \
  -d '{"task_id": "task_easy"}'

curl -X POST http://localhost:7860/step \
  -H "Content-Type: application/json" \
  -d '{"action_type": "inspect_schema"}'

curl -X POST http://localhost:7860/step \
  -H "Content-Type: application/json" \
  -d '{"action_type": "execute_sql", "query": "SELECT * FROM users"}'

curl -X POST http://localhost:7860/step \
  -H "Content-Type: application/json" \
  -d '{"action_type": "finish_task"}'
```

```bash
docker build -t dataops-env .
docker run -p 7860:7860 dataops-env
```

```bash
export HF_TOKEN=your_huggingface_token
export API_BASE_URL=https://router.huggingface.co/v1
export MODEL_NAME=Qwen/Qwen2.5-72B-Instruct
python3 inference.py
```

```
[START] task=task_easy env=dataops-env model=Qwen/Qwen2.5-72B-Instruct
[STEP] step=1 action=inspect_schema reward=0.03 done=false error=null
[STEP] step=2 action=execute_sql('SELECT * FROM users') reward=0.03 done=false error=null
[STEP] step=3 action=execute_sql('UPDATE users SET phone...') reward=0.03 done=false error=null
[STEP] step=4 action=finish_task reward=0.50 done=true error=null
[END] success=true steps=4 score=0.77 rewards=0.03,0.03,0.03,0.50
```
| Method | Endpoint | Description | Body |
|---|---|---|---|
| GET | `/` | Health check | None |
| POST | `/reset` | Initialize task | `{"task_id": "task_easy"}` |
| POST | `/step` | Execute action | `{"action_type": "...", "query": "..."}` |
| GET | `/state` | Current state | None |
| GET | `/tasks` | List all tasks | None |
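The endpoints in the table can be called from Python with the standard library alone. This is a sketch that assumes the server is reachable at `http://localhost:7860` and that responses are JSON; `make_request` and `call` are hypothetical helper names, and the response structure is not documented here.

```python
import json
import urllib.request

BASE_URL = "http://localhost:7860"  # assumed address of a locally running server


def make_request(path, payload=None):
    """Build a GET request (no payload) or a JSON POST request."""
    url = BASE_URL + path
    if payload is None:
        return urllib.request.Request(url, method="GET")
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call(path, payload=None):
    """Send the request and decode the JSON body (needs a running server)."""
    with urllib.request.urlopen(make_request(path, payload), timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Requests can be built and inspected without contacting the server.
reset_req = make_request("/reset", {"task_id": "task_easy"})
step_req = make_request("/step", {"action_type": "inspect_schema"})
state_req = make_request("/state")
print(reset_req.get_method(), reset_req.full_url)
```

With the server running, `call("/reset", {"task_id": "task_easy"})` followed by repeated `call("/step", ...)` invocations would drive one episode.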
```
# 1. Start task
POST /reset {"task_id": "task_easy"}
# 2. Look at schema
POST /step {"action_type": "inspect_schema"}
# 3. View data
POST /step {"action_type": "execute_sql", "query": "SELECT * FROM users"}
# 4. Fix phones
POST /step {"action_type": "execute_sql", "query": "UPDATE users SET phone = REPLACE(phone, '-', '')"}
# 5. Fix emails
POST /step {"action_type": "execute_sql", "query": "UPDATE users SET email = LOWER(TRIM(email))"}
# 6. Submit
POST /step {"action_type": "finish_task"}
```

```
dataops-env/
│
├── Core Environment
│   ├── envs/
│   │   ├── __init__.py
│   │   └── dataops_env.py       # RL environment (Observation, Action, Reward)
│   └── graders/
│       ├── __init__.py
│       └── grader.py            # Task evaluation & scoring
│
├── Task Management
│   ├── tasks/
│   │   ├── __init__.py
│   │   ├── task_definition.py   # Task loading & config
│   │   └── assets/              # 6 SQLite databases (seed + target)
│   └── configs/
│       └── env_config.yaml      # Task definitions & reward config
│
├── API Layer
│   ├── app.py                   # FastAPI server (5 endpoints)
│   └── server/
│       └── app.py               # OpenEnv server entry point
│
├── Agent Scripts
│   ├── inference.py             # LLM autonomous agent
│   └── demo.py                  # Deterministic baseline demo
│
├── Deployment
│   ├── Dockerfile               # Container configuration
│   ├── requirements.txt         # Python dependencies
│   ├── pyproject.toml           # Project metadata & OpenEnv config
│   ├── uv.lock                  # Dependency lock file
│   └── openenv.yaml             # OpenEnv specification
│
└── Documentation
    └── README.md                # This file
```
```python
DANGEROUS_PATTERNS = ['DROP TABLE', 'DROP DATABASE', 'TRUNCATE']

# Blocks destructive actions
if _is_dangerous(action.query):
    step_reward = -0.20   # Heavy penalty
    self.done = True      # Episode ends immediately
```

- DROP TABLE → Blocked + -0.20 penalty + episode ends
- Repeated queries → -0.02 penalty
- Invalid SQL → -0.05 penalty
- Empty query → -0.05 penalty
| Component | Technology |
|---|---|
| Language | Python 3.10 |
| API Framework | FastAPI + Uvicorn |
| Database | SQLite 3 |
| Models | Pydantic v2 |
| LLM Client | OpenAI SDK |
| Config | YAML |
| Container | Docker |
| Deployment | Hugging Face Spaces |
| Resource | URL |
|---|---|
| Live Demo | https://huggingface.co/spaces/AdeshDeshmukh/dataops-env |
| API Docs | https://adeshdeshmukh-dataops-env.hf.space/docs |
| GitHub | https://github.com/AdeshDeshmukh/dataops-env |
| Hackathon | MetaPyTorch India Hackathon 2026 |
| Author | GitHub |
|---|---|
| Adesh Deshmukh | @AdeshDeshmukh |
| Pushkar Gandhi | @pushkargandhi1020 |
This project is licensed under the MIT License.
Built with ❤️ for MetaPyTorch Hackathon 2026
Helping AI agents become better data engineers, one SQL query at a time.