Skip to content
Merged

Jdev #27

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 182 additions & 15 deletions Backend/services/recommendation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import yfinance as yf
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import numpy as np
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from .stock_service import StockService, sanitize_value
Expand Down Expand Up @@ -71,26 +73,40 @@ def get_news_sentiment(ticker: str) -> Dict:
Returns average sentiment and individual news sentiments
"""
try:
news = StockService.format_news(yf.Ticker(ticker).get_news(count=20, tab='all'))
formatted_news = StockService.format_news(news)

stock = yf.Ticker(ticker)
news = stock.get_news(count=20)

if not news:
logger.warning(f"No news found for {ticker}")
return {'average_sentiment': 'neutral', 'sentiment_score': 0.0, 'news_count': 0}

# Format news using StockService
formatted_news = StockService.format_news(news)

if not formatted_news:
logger.warning(f"No formatted news for {ticker}")
return {'average_sentiment': 'neutral', 'sentiment_score': 0.0, 'news_count': 0}

sentiments = []
for article in formatted_news:
content = article.get('content', {})
title = content.get('title', '')
summary = content.get('summary', '')
# Handle both flattened format and nested content format
title = article.get('title', '')
summary = article.get('summary', '')

# Combine title and summary for analysis
text = f"{title}. {summary}" if summary else title

if text:
if text and len(text.strip()) > 10: # Ensure meaningful text
sentiment_result = RecommendationService.analyze_sentiment(text)
sentiments.append(sentiment_result)
logger.info(f"Analyzed article for {ticker}: '{title[:50]}...' -> {sentiment_result}")
else:
logger.debug(f"Skipping article with insufficient text for {ticker}")

logger.info(f"Total sentiments analyzed for {ticker}: {len(sentiments)} out of {len(formatted_news)} articles")

if not sentiments:
logger.warning(f"No sentiments extracted from {len(formatted_news)} news articles for {ticker}")
return {'average_sentiment': 'neutral', 'sentiment_score': 0.0, 'news_count': 0}

# Calculate weighted average sentiment score
Expand Down Expand Up @@ -182,6 +198,119 @@ def get_analyst_recommendation(ticker: str) -> Dict:
logger.error(f"Error getting analyst recommendation for {ticker}: {e}")
return {'recommendation': 'hold', 'confidence': 0.0}

@staticmethod
def calculate_volatility(ticker: str, period: str = '3mo') -> Dict:
"""
Calculate historical volatility (annualized standard deviation of returns)
Returns volatility percentage and risk level
"""
try:
stock = yf.Ticker(ticker)
history = stock.history(period=period)

if history.empty or len(history) < 2:
logger.warning(f"Insufficient data for volatility calculation: {ticker}")
return {'volatility': 0.0, 'risk_level': 'unknown'}

# Calculate daily returns
history['returns'] = history['Close'].pct_change()

# Calculate standard deviation of returns
std_dev = history['returns'].std()

# Annualize volatility (assuming 252 trading days)
annualized_volatility = std_dev * np.sqrt(252) * 100 # Convert to percentage

# Determine risk level
if annualized_volatility < 15:
risk_level = 'low'
risk_score = 0.8 # Low volatility is good
elif annualized_volatility < 30:
risk_level = 'moderate'
risk_score = 0.5
elif annualized_volatility < 50:
risk_level = 'high'
risk_score = 0.2
else:
risk_level = 'very_high'
risk_score = 0.0

logger.info(f"Volatility for {ticker}: {annualized_volatility:.2f}% ({risk_level})")

return {
'volatility': round(annualized_volatility, 2),
'risk_level': risk_level,
'risk_score': risk_score # 0-1, higher is better (lower volatility)
}

except Exception as e:
logger.error(f"Error calculating volatility for {ticker}: {e}")
return {'volatility': 0.0, 'risk_level': 'unknown', 'risk_score': 0.5}

@staticmethod
def calculate_rsi(ticker: str, period: str = '3mo', rsi_period: int = 14) -> Dict:
"""
Calculate Relative Strength Index (RSI)
RSI ranges from 0-100:
- Above 70: Overbought (potential sell signal)
- Below 30: Oversold (potential buy signal)
- 40-60: Neutral
"""
try:
stock = yf.Ticker(ticker)
history = stock.history(period=period)

if history.empty or len(history) < rsi_period + 1:
logger.warning(f"Insufficient data for RSI calculation: {ticker}")
return {'rsi': 50.0, 'signal': 'neutral', 'rsi_score': 0.0}

# Calculate price changes
delta = history['Close'].diff()

# Separate gains and losses
gains = delta.where(delta > 0, 0)
losses = -delta.where(delta < 0, 0)

# Calculate average gains and losses using EMA (Exponential Moving Average)
avg_gain = gains.ewm(span=rsi_period, adjust=False).mean()
avg_loss = losses.ewm(span=rsi_period, adjust=False).mean()

# Calculate RS (Relative Strength)
rs = avg_gain / avg_loss

# Calculate RSI
rsi = 100 - (100 / (1 + rs))
current_rsi = float(rsi.iloc[-1])

# Determine signal and score
if current_rsi > 70:
signal = 'overbought'
rsi_score = -0.5 # Negative score for overbought (sell signal)
elif current_rsi > 60:
signal = 'slightly_overbought'
rsi_score = -0.2
elif current_rsi < 30:
signal = 'oversold'
rsi_score = 0.8 # Positive score for oversold (buy signal)
elif current_rsi < 40:
signal = 'slightly_oversold'
rsi_score = 0.4
else:
signal = 'neutral'
rsi_score = 0.0

logger.info(f"RSI for {ticker}: {current_rsi:.2f} ({signal})")

return {
'rsi': round(current_rsi, 2),
'signal': signal,
'rsi_score': rsi_score # -1 to 1, positive favors buy
}

except Exception as e:
logger.error(f"Error calculating RSI for {ticker}: {e}")
return {'rsi': 50.0, 'signal': 'neutral', 'rsi_score': 0.0}

@staticmethod
def analyze_holding(holding: Dict, in_portfolio: bool = True) -> Optional[Dict]:
"""
Expand Down Expand Up @@ -217,6 +346,10 @@ def analyze_holding(holding: Dict, in_portfolio: bool = True) -> Optional[Dict]:
# Get analyst recommendations
analyst_rec = RecommendationService.get_analyst_recommendation(ticker)

# Get technical indicators
volatility_data = RecommendationService.calculate_volatility(ticker)
rsi_data = RecommendationService.calculate_rsi(ticker)

# Convert analyst recommendation to score
rec_map = {
'strong_buy': 1.0,
Expand All @@ -236,6 +369,10 @@ def analyze_holding(holding: Dict, in_portfolio: bool = True) -> Optional[Dict]:
'sentimentScore': news_sentiment['sentiment_score'],
'analystRecommendation': analyst_rec['recommendation'],
'analystConfidence': analyst_rec['confidence'],
'volatility': volatility_data['volatility'],
'riskLevel': volatility_data['risk_level'],
'rsi': rsi_data['rsi'],
'rsiSignal': rsi_data['signal'],
'inPortfolio': in_portfolio
}

Expand All @@ -245,11 +382,14 @@ def analyze_holding(holding: Dict, in_portfolio: bool = True) -> Optional[Dict]:
gain_loss_percent = (gain_loss / buy_price) * 100 if buy_price > 0 else 0
performance_score = max(min(gain_loss_percent / 50, 1), -1) # Normalize to -1 to 1

# Weighted composite score: performance (30%), sentiment (35%), analyst (35%)
# Weighted composite score for portfolio holdings:
# Performance (20%), Sentiment (25%), Analyst (25%), RSI (15%), Risk (15%)
composite_score = (
performance_score * 0.30 +
sentiment_score * 0.35 +
analyst_score * 0.35
performance_score * 0.20 +
sentiment_score * 0.25 +
analyst_score * 0.25 +
rsi_data['rsi_score'] * 0.15 +
volatility_data['risk_score'] * 0.15
)

# Determine action for existing holdings
Expand All @@ -274,14 +414,18 @@ def analyze_holding(holding: Dict, in_portfolio: bool = True) -> Optional[Dict]:
gain_loss_percent,
news_sentiment['average_sentiment'],
analyst_rec['recommendation'],
rsi_data['signal'],
volatility_data['risk_level'],
in_portfolio
)
})
else:
# For stocks not in portfolio: only sentiment (50%) + analyst (50%)
# For stocks not in portfolio: Sentiment (30%) + Analyst (30%) + RSI (20%) + Risk (20%)
composite_score = (
sentiment_score * 0.50 +
analyst_score * 0.50
sentiment_score * 0.30 +
analyst_score * 0.30 +
rsi_data['rsi_score'] * 0.20 +
volatility_data['risk_score'] * 0.20
)

# Determine action for new investments
Expand All @@ -301,6 +445,8 @@ def analyze_holding(holding: Dict, in_portfolio: bool = True) -> Optional[Dict]:
None,
news_sentiment['average_sentiment'],
analyst_rec['recommendation'],
rsi_data['signal'],
volatility_data['risk_level'],
in_portfolio
)
})
Expand All @@ -311,7 +457,8 @@ def analyze_holding(holding: Dict, in_portfolio: bool = True) -> Optional[Dict]:
return None

@staticmethod
def _generate_reasoning(performance_pct: Optional[float], sentiment: str, analyst_rec: str, in_portfolio: bool) -> str:
def _generate_reasoning(performance_pct: Optional[float], sentiment: str, analyst_rec: str,
rsi_signal: str, risk_level: str, in_portfolio: bool) -> str:
"""Generate human-readable reasoning for recommendation"""
parts = []

Expand All @@ -325,9 +472,29 @@ def _generate_reasoning(performance_pct: Optional[float], sentiment: str, analys
else:
parts.append(f"Negative returns ({performance_pct:.1f}%)")

# Add RSI signal
if rsi_signal == 'oversold':
parts.append("RSI oversold (buy opportunity)")
elif rsi_signal == 'overbought':
parts.append("RSI overbought (sell signal)")
elif rsi_signal == 'slightly_oversold':
parts.append("RSI slightly oversold")
elif rsi_signal == 'slightly_overbought':
parts.append("RSI slightly overbought")

parts.append(f"{sentiment} news sentiment")
parts.append(f"analysts {analyst_rec.replace('_', ' ')}")

# Add risk level
if risk_level == 'low':
parts.append("low volatility (stable)")
elif risk_level == 'very_high':
parts.append("very high volatility (risky)")
elif risk_level == 'high':
parts.append("high volatility")
elif risk_level == 'moderate':
parts.append("moderate volatility")

if not in_portfolio:
parts.append("not currently in portfolio")

Expand Down
90 changes: 50 additions & 40 deletions Backend/services/stock_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,17 +829,22 @@ def handle_connect():
emit('connected', {'message': 'Successfully connected to stock price stream'})

@socketio.on('disconnect')
def handle_disconnect():
logger.info(f"Client disconnected: {request.sid}")
def handle_disconnect(reason=None):
logger.info(f"Client disconnected: {request.sid}, reason: {reason}")
# Stop any active price streaming for this client
if request.sid in active_connections:
active_connections[request.sid]['active'] = False
# Wait for thread to finish (with timeout)
thread = active_connections[request.sid].get('thread')
if thread and thread.is_alive():
thread.join(timeout=1.0) # Wait max 1 second
del active_connections[request.sid]
logger.info(f"Cleaned up thread for client {request.sid}")
try:
active_connections[request.sid]['active'] = False
# Wait for thread to finish (with timeout)
thread = active_connections[request.sid].get('thread')
if thread and thread.is_alive():
thread.join(timeout=1.0) # Wait max 1 second
del active_connections[request.sid]
logger.info(f"Cleaned up thread for client {request.sid}")
except KeyError:
logger.warning(f"Client {request.sid} already removed from active_connections")
except Exception as e:
logger.error(f"Error cleaning up connection {request.sid}: {str(e)}")

@socketio.on('subscribe_ticker')
def handle_subscribe_ticker(data):
Expand Down Expand Up @@ -882,41 +887,46 @@ def stream_prices(sid):
stock_info = stock.info
# Get latest price data
history = stock.history(period="1d", interval="1m")

if history.empty:
logger.warning(f"No history data available for {ticker}")
socketio.emit('error', {'message': f'No data available for {ticker}'}, room=sid)
time.sleep(120)
continue

latest_timestamp = history.index[-1] # Get the datetime index
print(f"Latest timestamp for {ticker}: {latest_timestamp}")
print(f"Streaming price for {stock_info.get('shortName', ticker)}, history length: {len(history)}")
if not history.empty:
latest = history.iloc[-1]
current_price = float(latest['Close'])
open_price = float(history.iloc[0]['Open'])
high_price = float(max(history['High']))
low_price = float(min(history['Low']))
volume = int(latest['Volume'] if latest['Volume'] != 0 else stock.info.get('volume', 0))

# Calculate change
day_open = history.iloc[0]['Open']
change = current_price - day_open
change_percent = (change / day_open * 100) if day_open > 0 else 0

price_data = {
'ticker': ticker,
'currency': stock_info.get('currency', 'USD'),
'name': stock_info.get('shortName', ticker),

'price': round(current_price, 2),
'open': round(open_price, 2),
'high': round(high_price, 2),
'low': round(low_price, 2),
'volume': volume,
'change': round(change, 2),
'changePercent': round(change_percent, 2),
'timestamp': latest_timestamp.strftime('%Y-%m-%d %H:%M:%S')
}

latest = history.iloc[-1]
current_price = float(latest['Close'])
open_price = float(history.iloc[0]['Open'])
high_price = float(max(history['High']))
low_price = float(min(history['Low']))
volume = int(latest['Volume'] if latest['Volume'] != 0 else stock.info.get('volume', 0))

# Calculate change
day_open = history.iloc[0]['Open']
change = current_price - day_open
change_percent = (change / day_open * 100) if day_open > 0 else 0

price_data = {
'ticker': ticker,
'currency': stock_info.get('currency', 'USD'),
'name': stock_info.get('shortName', ticker),

socketio.emit('price_update', price_data, room=sid)
logger.debug(f"Sent price update for {ticker} to {sid}: ${current_price}")
else:
logger.warning(f"No price data available for {ticker}")
'price': round(current_price, 2),
'open': round(open_price, 2),
'high': round(high_price, 2),
'low': round(low_price, 2),
'volume': volume,
'change': round(change, 2),
'changePercent': round(change_percent, 2),
'timestamp': latest_timestamp.strftime('%Y-%m-%d %H:%M:%S')
}

socketio.emit('price_update', price_data, room=sid)
logger.debug(f"Sent price update for {ticker} to {sid}: ${current_price}")

# Wait before next update (2 minutes to prevent rate limiting)
time.sleep(120)
Expand Down
Loading
Loading