-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcoding_agent.py
More file actions
146 lines (106 loc) · 5.18 KB
/
coding_agent.py
File metadata and controls
146 lines (106 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#LLMs The Brain:
from base_agent import BaseAgent
from google.genai import types
#The structured output:
from pydantic import BaseModel, field_validator, ConfigDict
from typing import List, Literal, Optional
#The tools:
import os
import json
import subprocess #For executing commands on the terminal
from pathlib import Path
#Debugging flags
writing_code = True
pydantic_test = False
#Source code begins
class CodeSchema(BaseModel):
#Enable field validation from pydantic
model_config = ConfigDict(validate_default=True)
kind: Literal["code", "command"]
filename: Optional[str] = None
code: Optional[List[str]] = None
command: Optional[str] = None
#cls is the class, v is the value, info is the resulting schema
@field_validator("code", mode="after")
def code_required(cls, v, info):
if info.data.get("kind") == "code" and not v:
raise ValueError("[DEBUG] code[] required when kind='code'")
return v
@field_validator("command", mode="after")
def command_required(cls, v, info):
if info.data.get("kind") == "command" and not v:
raise ValueError("[DEBUG] command required when kind='command'")
return v
class ArrayOfCode(BaseModel): #This will result in the LLM returning a dictionary
commands: List[CodeSchema]
class CodingAgent(BaseAgent):
def __init__(self, model_name:str, system_prompt:str=""):
super().__init__(model_name, system_prompt)
self.config = types.GenerateContentConfig(system_instruction=system_prompt,
response_mime_type="application/json",
response_schema=ArrayOfCode)
with open("cached_code/counter.txt", "r+") as f:
counter = int(f.read())
if(writing_code):
counter += 1
f.seek(0)
f.write(str(counter))
self.example_filename = f"code_example{counter}.txt"
def write_code(self, input:str=""):
self.gemini_output = self.call_LLM(input, self.config)
#print(self.gemini_output.text)
#print(self.gemini_output.parsed)
#For validation or you could just use self.gemini_output.parsed directly for a naive trusting approach
if (pydantic_test):
commands = [CodeSchema.model_validate(item) for item in self.gemini_output.parsed]
else:
commands = self.gemini_output.parsed
payload = [commands.model_dump() for c in commands]
#Dumps the pydantic model data into JSON
with open(f"cached_code/{self.example_filename}", "w") as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
def load_code(self):
#Testing
#Read saved code from the cache to reduce API useage during testing
with open(f"cached_code/{self.example_filename}", "r") as f:
commands_cached = f.read()
cached_dict = json.loads(commands_cached) #Convert string to dictionary
if (pydantic_test):
commands = [CodeSchema.model_validate(item) for item in cached_dict] #Now validate it
else:
commands = cached_dict
output = self.execute(commands)
print(output)
def execute(self, commands:List):
data = commands[0]["commands"]
output = []
result = None
working_directory = Path("agent_generated_code")
for command in data:
match command['kind']:
case "code":
with open(f"agent_generated_code/{command["filename"]}", "w") as f:
text = "\n".join(command["code"])
f.write(text)
case "command":
try:
result = subprocess.run(["cmd", "/c", command["command"]], cwd=working_directory, stdin=subprocess.DEVNULL, capture_output=True, text=True, timeout=30, check=True)
output.append(result.stdout)
except subprocess.TimeoutExpired as e:
print(f"[DEBUG] Process took too long (> {e.timeout} seconds), killing it.")
except subprocess.CalledProcessError as e:
print(f"[DEBUG] Process failed with exit code {e.returncode}")
return output
def coding_agent(self, input:str):
self.write_code(input)
commands = self.load_code()
output = self.execute(commands)
print(output)
if __name__ == "__main__":
with open("cached_code/system_prompt.txt", "r") as f:
prompt = f.read()
input = "Build me an AI agent that uses OpenAI (You can just leave the API key blank) and works through Azure AD. I want it to also have tool calling abilities so that it can call PowerBI API to build PowerBI dashboards given only an excel file and input text. Make this a chatbot with a GUI and create your own folder to save your files."
agent = CodingAgent("gemini-2.5-flash", system_prompt=prompt)
if (writing_code):
agent.write_code(input)
agent.load_code()