Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 57 additions & 84 deletions trading_crew/src/trading_crew/crew.py
Original file line number Diff line number Diff line change
@@ -1,97 +1,70 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task, before_kickoff, after_kickoff
from tools.alpaca_market_tool import AlpacaMarketDataTool
from knowledge.trading_strategy import trading_strategy_source
# Uncomment the following line to use an example of a custom tool
# from trading_crew.tools.custom_tool import MyCustomTool
import logging
from datetime import datetime
from typing import Optional, Dict, Any
import ccxt
from trading_crew.state import market_state

# Check our tools documentations for more information on how to use them
# from crewai_tools import SerperDevTool
log = logging.getLogger(__name__)

class TradingCrew:
"""Trading crew that manages the trading operations."""

def __init__(self, test: bool = False, credentials_path: Optional[str] = None):
"""Initialize the trading crew.

@CrewBase
class TradingCrew():
"""TradingCrew crew"""
Args:
test: Whether to run in test mode
credentials_path: Path to credentials file
"""
self.test = test
self.credentials_path = credentials_path
self.exchange = self._initialize_exchange()

agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
def _initialize_exchange(self) -> ccxt.Exchange:
"""Initialize the exchange connection.

@before_kickoff # Optional hook to be executed before the crew starts
def pull_data_example(self, inputs):
# Example of pulling data from an external API, dynamically changing the inputs
inputs['extra_data'] = "This is extra data"
return inputs
Returns:
Configured exchange instance
"""
if self.test:
return ccxt.binance({'options': {'defaultType': 'future'}})

@after_kickoff # Optional hook to be executed after the crew has finished
def log_results(self, output):
# Example of logging results, dynamically changing the output
print(f"Results: {output}")
return output
# TODO: Implement credentials loading and real exchange initialization
return ccxt.binance({'options': {'defaultType': 'future'}})

@agent
def market_analyst(self) -> Agent:
return Agent(
config=self.agents_config['market_analyst'],
verbose=True,
tools=[AlpacaMarketDataTool()]
)
def _fetch_market_data(self) -> None:
"""Fetch and update market data in the global state."""
try:
# Fetch current price
ticker = self.exchange.fetch_ticker(market_state.symbol)
market_state.update_price(ticker['last'])

@agent
def technical_analyst(self) -> Agent:
return Agent(
config=self.agents_config['technical_analyst'],
verbose=True,
tools=[AlpacaMarketDataTool()]
)
# Fetch historical data
ohlcv = self.exchange.fetch_ohlcv(
market_state.symbol,
market_state.timeframe,
limit=100
)
df = self.exchange.convert_ohlcv_to_dataframe(ohlcv)
market_state.update_historical_data(df)

@agent
def strategy_evaluator(self) -> Agent:
return Agent(
config=self.agents_config['strategy_evaluator'],
verbose=True
# Update timestamp
market_state.last_updated = datetime.now().isoformat()

)
except Exception as e:
log.error(f'Error fetching market data: {str(e)}')
raise

@agent
def risk_manager(self) -> Agent:
return Agent(
config=self.agents_config['risk_manager'],
verbose=True
)

@task
def market_analysis_task(self) -> Task:
return Task(
config=self.tasks_config['market_analysis_task']
)

@task
def technical_analysis_task(self) -> Task:
return Task(
config=self.tasks_config['technical_analysis_task']
)

@task
def strategy_evaluation_task(self) -> Task:
return Task(
config=self.tasks_config['strategy_evaluation_task']
)

@task
def risk_assessment_task(self) -> Task:
return Task(
config=self.tasks_config['risk_assessment_task']
)
def run(self) -> None:
"""Run the trading crew operations."""
try:
log.info(f'Starting trading crew for {market_state.symbol}')
self._fetch_market_data()

# TODO: Implement trading logic using market_state
log.info(f'Current price: {market_state.current_price}')
log.info(f'Last updated: {market_state.last_updated}')

@crew
def crew(self) -> Crew:
"""Creates the TradingCrew crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
knowledge={"sources": [trading_strategy_source], "metadata": {"trading_strategy": "simple"}}
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)
except Exception as e:
log.error(f'Error in trading crew: {str(e)}')
raise
110 changes: 49 additions & 61 deletions trading_crew/src/trading_crew/main.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,49 @@
#!/usr/bin/env python
import sys
import warnings

from crew import TradingCrew

warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")

# This main file is intended to be a way for you to run your
# crew locally, so refrain from adding unnecessary logic into this file.
# Replace with inputs you want to test with, it will automatically
# interpolate any tasks and agents information

def run():
"""
Run the crew.
"""
inputs = {
'topic': 'AI LLMs'
}
TradingCrew().crew().kickoff(inputs=inputs)


def train():
"""
Train the crew for a given number of iterations.
"""
inputs = {
"topic": "AI LLMs"
}
try:
TradingCrew().crew().train(n_iterations=int(sys.argv[1]), filename=sys.argv[2], inputs=inputs)

except Exception as e:
raise Exception(f"An error occurred while training the crew: {e}")

def replay():
"""
Replay the crew execution from a specific task.
"""
try:
TradingCrew().crew().replay(task_id=sys.argv[1])

except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")

def test():
"""
Test the crew execution and returns the results.
"""
inputs = {
"topic": "AI LLMs"
}
try:
TradingCrew().crew().test(n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs)

except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")

if __name__ == "__main__":
run()
import logging
from typing import List, Optional
from trading_crew.crew import TradingCrew
from trading_crew.state import market_state

log = logging.getLogger(__name__)

def run_trading_crew(
symbol: str,
timeframe: str = '1h',
test: bool = False,
credentials_path: Optional[str] = None,
) -> None:
"""Run the trading crew for a specific symbol and timeframe.

Args:
symbol: Trading pair symbol (e.g., 'BTC/USDT')
timeframe: Candlestick timeframe (default: '1h')
test: Whether to run in test mode
credentials_path: Path to credentials file
"""
# Initialize market state
market_state.symbol = symbol
market_state.timeframe = timeframe

crew = TradingCrew(test=test, credentials_path=credentials_path)
crew.run()

def main():
"""Main entry point for the trading crew application."""
import argparse

parser = argparse.ArgumentParser(description='Run the trading crew')
parser.add_argument('--symbol', type=str, required=True, help='Trading pair symbol')
parser.add_argument('--timeframe', type=str, default='1h', help='Candlestick timeframe')
parser.add_argument('--test', action='store_true', help='Run in test mode')
parser.add_argument('--credentials', type=str, help='Path to credentials file')

args = parser.parse_args()

run_trading_crew(
symbol=args.symbol,
timeframe=args.timeframe,
test=args.test,
credentials_path=args.credentials,
)

if __name__ == '__main__':
main()
32 changes: 32 additions & 0 deletions trading_crew/src/trading_crew/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from dataclasses import dataclass, field
from typing import Dict, Optional
import pandas as pd

@dataclass
class MarketState:
"""Global state for market data."""
symbol: str = ''
timeframe: str = ''
current_price: float = 0.0
historical_data: Optional[pd.DataFrame] = None
indicators: Dict = field(default_factory=dict)
last_updated: Optional[str] = None

def update_price(self, price: float) -> None:
"""Update the current price."""
self.current_price = price

def update_historical_data(self, data: pd.DataFrame) -> None:
"""Update historical price data."""
self.historical_data = data

def add_indicator(self, name: str, value: any) -> None:
"""Add or update a technical indicator."""
self.indicators[name] = value

def get_indicator(self, name: str) -> any:
"""Get a technical indicator value."""
return self.indicators.get(name)

# Global market state instance
market_state = MarketState()