-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscanner.py
More file actions
2784 lines (2582 loc) · 120 KB
/
scanner.py
File metadata and controls
2784 lines (2582 loc) · 120 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import asyncio
import ccxt.pro as ccxt_async # Use ccxt.pro for async support; install with 'pip install ccxtpro'
import pandas as pd
import numpy as np
import time
import aiohttp
from datetime import datetime, timezone
import json
import os
import multiprocessing
import logging
from tqdm import tqdm
import async_timeout
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import warnings
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from collections import Counter
from pathlib import Path
# Suppress pandas future warnings
warnings.filterwarnings('ignore', category=FutureWarning)
# Technical indicators with better error handling
BollingerBands = None
average_true_range = None
StochasticOscillator = None
rsi = None
ema_indicator = None
sma_indicator = None
adx = None
on_balance_volume = None
try:
from ta.volatility import BollingerBands, average_true_range
from ta.momentum import StochasticOscillator, rsi
from ta.trend import ema_indicator, sma_indicator, adx
from ta.volume import on_balance_volume
TA_AVAILABLE = True
except ImportError:
TA_AVAILABLE = False
logging.warning("'ta' library not found. Install with 'pip install ta' for full indicator support.")
# Set up enhanced logging
def setup_logging():
    """Configure and return this module's logger.

    Two handlers sharing one formatter are attached:

    * a file handler writing to ``momentum_scanner.log`` at DEBUG and above
    * a console (stream) handler at INFO and above

    Handlers left over from an earlier call are removed *and closed* first,
    so calling this function again neither duplicates output nor leaks the
    previously opened log file.

    Returns:
        logging.Logger: the configured logger for ``__name__``.
    """
    logger = logging.getLogger(__name__)
    # A logger filters on its own level before any handler sees the record,
    # so it must be at least as verbose as the most verbose handler.
    # With the logger at INFO the DEBUG file handler never got DEBUG records.
    logger.setLevel(logging.DEBUG)

    # Iterate over a copy: removeHandler mutates logger.handlers.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    file_handler = logging.FileHandler('momentum_scanner.log')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
logger = setup_logging()
@dataclass
class TradingConfig:
    """Settings bundle for the scanner.

    The first five fields are required and have no default. The remaining
    fields are optional tuning knobs. ``macd_params`` and
    ``volume_trend_thresholds`` may be passed as ``None`` (the default), in
    which case ``__post_init__`` fills in the built-in values.
    """

    # --- required lookup tables -------------------------------------------
    timeframes: Dict[str, str]
    backtest_periods: Dict[str, int]
    momentum_periods: Dict[str, Dict[str, Dict[str, int]]]
    signal_thresholds: Dict[str, Dict[str, Dict[str, float]]]
    trade_durations: Dict[str, int]

    # --- indicator parameters ---------------------------------------------
    rsi_period: int = 14
    macd_params: Optional[Dict[str, int]] = None  # filled in __post_init__
    volume_trend_thresholds: Optional[Dict[str, float]] = None  # filled in __post_init__

    # --- request / resilience settings ------------------------------------
    # NOTE(review): units are not stated anywhere in this class; the delay and
    # pause values look like seconds -- confirm against the code that uses them.
    retry_attempts: int = 3
    retry_delay: int = 2
    rate_limit_delay: float = 0.01
    max_concurrent_requests: int = 50
    circuit_breaker_threshold: int = 10
    circuit_breaker_pause: int = 60
    websocket_update_interval: int = 1

    # --- volume profile settings ------------------------------------------
    volume_profile_bins: int = 50
    fixed_range_bars: int = 20

    def __post_init__(self):
        """Replace ``None`` placeholders with freshly built default dicts."""
        # Built on every call so instances never share a mutable default.
        fallbacks = (
            ("macd_params", {"fast": 12, "slow": 26, "signal": 9}),
            ("volume_trend_thresholds", {"up": 0.05, "down": -0.05}),
        )
        for attr_name, fallback in fallbacks:
            if getattr(self, attr_name) is None:
                setattr(self, attr_name, fallback)
def get_dynamic_config() -> TradingConfig:
    """Build the default TradingConfig, sizing concurrency from the CPU count.

    ``max_concurrent_requests`` is five per CPU reported by
    ``multiprocessing.cpu_count()``, clamped to the range 20..100. All other
    values passed here are fixed tables; fields not passed keep the defaults
    declared on ``TradingConfig``.
    """

    def periods(short, long):
        # One momentum look-back pair (values are bar counts as far as the
        # key names suggest -- confirm against the consumer).
        return {"short": short, "long": long}

    def thresholds(momentum_short, rsi_min, rsi_max):
        # One signal-threshold row; macd_min is 0 for every row.
        return {
            "momentum_short": momentum_short,
            "rsi_min": rsi_min,
            "rsi_max": rsi_max,
            "macd_min": 0,
        }

    cores = multiprocessing.cpu_count()
    concurrency_cap = min(max(20, cores * 5), 100)

    timeframes = {
        "scalping": "1m",
        "short": "5m",
        "medium": "1h",
        "daily": "1d",
        "weekly": "1w",
    }
    backtest_periods = {
        "scalping": 100,
        "short": 50,
        "medium": 24,
        "daily": 7,
        "weekly": 4,
    }
    momentum_periods = {
        "crypto": {
            "scalping": periods(10, 60),
            "short": periods(5, 20),
            "medium": periods(4, 12),
            "daily": periods(7, 30),
            "weekly": periods(4, 12),
        },
        "forex": {
            "scalping": periods(20, 120),
            "short": periods(10, 50),
            "medium": periods(6, 24),
            "daily": periods(5, 20),
            "weekly": periods(3, 10),
        },
    }
    signal_thresholds = {
        "crypto": {
            "scalping": thresholds(0.01, 55, 70),
            "short": thresholds(0.03, 52, 68),
            "medium": thresholds(0.05, 50, 65),
            "daily": thresholds(0.06, 50, 65),
            "weekly": thresholds(0.15, 45, 70),
        },
        "forex": {
            "scalping": thresholds(0.002, 50, 70),
            "short": thresholds(0.005, 48, 68),
            "medium": thresholds(0.008, 47, 67),
            "daily": thresholds(0.01, 45, 65),
            "weekly": thresholds(0.03, 40, 70),
        },
    }
    # NOTE(review): presumably seconds (1800 = 30 min ... 2592000 = 30 days);
    # confirm against the code that reads trade_durations.
    trade_durations = {
        "scalping": 1800,
        "short": 14400,
        "medium": 86400,
        "daily": 604800,
        "weekly": 2592000,
    }

    return TradingConfig(
        timeframes=timeframes,
        backtest_periods=backtest_periods,
        momentum_periods=momentum_periods,
        signal_thresholds=signal_thresholds,
        trade_durations=trade_durations,
        max_concurrent_requests=concurrency_cap,
    )
class PortfolioSimulator:
    """
    Simulates a portfolio: running balance, open positions, an equity curve,
    a trade log that can be exported to CSV, and an equity/drawdown plot.

    Only one position per symbol is tracked; opening a second position on the
    same symbol replaces the stored one.
    """

    # Keys of every trade_log row, and the column order of the exported CSV.
    _LOG_FIELDS = ['date', 'equity', 'trade_id', 'symbol', 'action', 'size', 'price', 'pnl']

    def __init__(self, initial_capital=10000):
        """Start with ``initial_capital`` as balance and no positions."""
        self.equity_curve = []
        self.daily_balance = []
        self.open_positions = {}
        self.trade_log = []
        self.initial_capital = initial_capital
        self.current_balance = initial_capital
        self.max_drawdown = 0.0
        self.max_balance = initial_capital
        self.trade_id = 0

    def open_position(self, symbol, action, size, price, stop, tp, date):
        """Record a new position for ``symbol`` and log the entry.

        ``action`` equal to ``'buy'`` is later treated as a long position by
        ``close_position``; any other value is treated as a short. The balance
        is not changed when a position is opened.
        """
        self.trade_id += 1
        self.open_positions[symbol] = {
            'entry': price,
            'size': size,
            'stop': stop,
            'tp': tp,
            'action': action,
            'trade_id': self.trade_id,
            'date': date,
            'pnl': 0.0
        }
        self.trade_log.append({
            'date': date,
            'equity': self.current_balance,
            'trade_id': self.trade_id,
            'symbol': symbol,
            'action': action,
            'size': size,
            'price': price,
            'pnl': 0.0
        })

    def close_position(self, symbol, exit_price, date, reason):
        """Close the position on ``symbol`` at ``exit_price`` and book its PnL.

        Updates the balance, the running peak balance and the maximum
        drawdown, appends to the equity curve, and logs the exit using
        ``reason`` as the row's ``action``. Does nothing when there is no open
        position for ``symbol``.
        """
        if symbol not in self.open_positions:
            return
        pos = self.open_positions[symbol]
        if pos['action'] == 'buy':
            pnl = (exit_price - pos['entry']) * pos['size']
        else:
            pnl = (pos['entry'] - exit_price) * pos['size']
        self.current_balance += pnl
        self.max_balance = max(self.max_balance, self.current_balance)
        # A non-positive peak (e.g. zero starting capital) has no meaningful
        # relative drawdown; avoid dividing by it.
        if self.max_balance > 0:
            drawdown = (self.max_balance - self.current_balance) / self.max_balance
        else:
            drawdown = 0.0
        self.max_drawdown = max(self.max_drawdown, drawdown)
        self.equity_curve.append(self.current_balance)
        self.daily_balance.append(self.current_balance)
        self.trade_log.append({
            'date': date,
            'equity': self.current_balance,
            'trade_id': pos['trade_id'],
            'symbol': symbol,
            'action': reason,
            'size': pos['size'],
            'price': exit_price,
            'pnl': pnl
        })
        del self.open_positions[symbol]

    def update_daily(self, date):
        """Snapshot the current balance into the curves and the trade log."""
        self.equity_curve.append(self.current_balance)
        self.daily_balance.append(self.current_balance)
        self.trade_log.append({
            'date': date,
            'equity': self.current_balance,
            'trade_id': None,
            'symbol': None,
            'action': 'daily_update',
            'size': None,
            'price': None,
            'pnl': None
        })

    def export_csv(self, filename='portfolio_equity.csv'):
        """Write every trade_log row to ``filename`` as CSV with a header."""
        import csv
        with open(filename, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=self._LOG_FIELDS)
            writer.writeheader()
            writer.writerows(self.trade_log)

    def _drawdown_series(self):
        """Return the fractional drawdown from the running peak per equity point.

        The running peak starts at ``initial_capital``, matching how
        ``max_balance`` is initialised.
        """
        peak = self.initial_capital
        series = []
        for value in self.equity_curve:
            peak = max(peak, value)
            series.append((peak - value) / peak if peak > 0 else 0.0)
        return series

    def plot_equity(self):
        """Plot the equity curve with the per-point drawdown on a second axis.

        The drawdown is the percentage decline from the running peak at each
        point of the equity curve. Previously the red series was
        ``max_drawdown * equity``, which is not a drawdown.
        """
        import matplotlib.pyplot as plt  # lazy: only needed for plotting
        equity = self.equity_curve
        drawdown_pct = [d * 100 for d in self._drawdown_series()]

        fig, ax_equity = plt.subplots(figsize=(12, 6))
        ax_equity.plot(equity, label='Portfolio Equity', color='blue')
        ax_equity.set_title('Portfolio Equity Curve')
        ax_equity.set_xlabel('Time')
        ax_equity.set_ylabel('Equity')

        # Drawdown is a percentage, so it gets its own y-axis.
        ax_drawdown = ax_equity.twinx()
        ax_drawdown.scatter(range(len(equity)), drawdown_pct, color='red', label='Drawdown', s=10)
        ax_drawdown.set_ylabel('Drawdown (%)')

        # Merge both axes' entries into a single legend.
        eq_handles, eq_labels = ax_equity.get_legend_handles_labels()
        dd_handles, dd_labels = ax_drawdown.get_legend_handles_labels()
        ax_equity.legend(eq_handles + dd_handles, eq_labels + dd_labels)

        fig.tight_layout()
        plt.show()
from abc import ABC, abstractmethod
# OptimizableAgent interface
class OptimizableAgent(ABC):
    """Abstract interface for an agent whose hyperparameters can be tuned.

    Concrete subclasses must implement all four methods; the class cannot be
    instantiated otherwise. The base implementations do nothing and return
    ``None``.
    """

    @abstractmethod
    def get_hyperparameters(self) -> dict:
        """Return the agent's hyperparameters as a dict."""

    @abstractmethod
    def set_hyperparameters(self, params: dict) -> None:
        """Apply the hyperparameters given in ``params``."""

    @abstractmethod
    def validate_params(self, params: dict) -> bool:
        """Return whether ``params`` is acceptable to this agent."""

    @abstractmethod
    def evaluate(self) -> float:
        """Return a numeric score for the agent (meaning defined by subclasses)."""
class TechnicalIndicators:
@staticmethod
def detect_market_regime(df: pd.DataFrame) -> Dict[str, any]:
"""
Detect current market regime: Bull, Bear, Sideways/Ranging
Returns dict with:
- regime: 'bull' | 'bear' | 'ranging'
- confidence: 0-100 (how confident in the classification)
- trend_strength: 0-100 (strength of trend if trending)
- volatility: 'low' | 'medium' | 'high'
- suggested_opportunity_threshold: adjusted threshold based on regime
"""
# Calculate multiple timeframe EMAs
ema_20 = df['close'].ewm(span=20, adjust=False).mean()
ema_50 = df['close'].ewm(span=50, adjust=False).mean()
ema_200 = df['close'].ewm(span=200, adjust=False).mean() if len(df) >= 200 else ema_50
current_price = df['close'].iloc[-1]
# Trend direction indicators
above_ema20 = current_price > ema_20.iloc[-1]
above_ema50 = current_price > ema_50.iloc[-1]
above_ema200 = current_price > ema_200.iloc[-1]
ema20_above_50 = ema_20.iloc[-1] > ema_50.iloc[-1]
ema50_above_200 = ema_50.iloc[-1] > ema_200.iloc[-1]
# ADX for trend strength (if available)
if 'adx' in df and pd.notna(df['adx'].iloc[-1]):
adx = df['adx'].iloc[-1]
else:
# Fallback: calculate simple ADX-like metric
high_low = df['high'] - df['low']
adx = high_low.tail(14).mean() / df['close'].iloc[-1] * 100
# Volatility (ATR-based)
if 'atr' in df and pd.notna(df['atr'].iloc[-1]):
atr = df['atr'].iloc[-1]
atr_pct = (atr / current_price) * 100
else:
high_low = df['high'] - df['low']
atr = high_low.tail(14).mean()
atr_pct = (atr / current_price) * 100
# Price trend over different periods
returns_20 = (df['close'].iloc[-1] / df['close'].iloc[-20] - 1) * 100 if len(df) >= 20 else 0
returns_50 = (df['close'].iloc[-1] / df['close'].iloc[-50] - 1) * 100 if len(df) >= 50 else 0
# Determine regime
bull_signals = sum([above_ema20, above_ema50, above_ema200, ema20_above_50, ema50_above_200])
bear_signals = sum([not above_ema20, not above_ema50, not above_ema200, not ema20_above_50, not ema50_above_200])
# Ranging detection: price oscillating around EMAs with low ADX
price_volatility = df['close'].tail(20).std() / df['close'].tail(20).mean() * 100
# Classification logic
if adx < 20 and price_volatility < 3:
# Low trend strength + low volatility = ranging
regime = 'ranging'
confidence = min(100, (20 - adx) * 5 + (3 - price_volatility) * 10)
trend_strength = adx
elif bull_signals >= 4:
regime = 'bull'
confidence = min(100, bull_signals * 20 + (returns_20 if returns_20 > 0 else 0))
trend_strength = adx
elif bear_signals >= 4:
regime = 'bear'
confidence = min(100, bear_signals * 20 + (abs(returns_20) if returns_20 < 0 else 0))
trend_strength = adx
elif bull_signals > bear_signals:
regime = 'bull'
confidence = min(80, bull_signals * 15)
trend_strength = adx
elif bear_signals > bull_signals:
regime = 'bear'
confidence = min(80, bear_signals * 15)
trend_strength = adx
else:
regime = 'ranging'
confidence = 50
trend_strength = adx
# Volatility classification
if atr_pct < 1.5:
volatility = 'low'
elif atr_pct < 3.5:
volatility = 'medium'
else:
volatility = 'high'
# Adjust opportunity thresholds based on regime
# In bull markets: slightly lower threshold (easier to find good longs)
# In bear markets: higher threshold (harder to find good longs)
# In ranging: much higher threshold (wait for breakouts)
if regime == 'bull':
opportunity_threshold = 60 # Lower bar in bull market
elif regime == 'bear':
opportunity_threshold = 75 # Higher bar in bear market
else: # ranging
opportunity_threshold = 80 # Very high bar in ranging market
return {
'regime': regime,
'confidence': round(confidence, 1),
'trend_strength': round(trend_strength, 1),
'volatility': volatility,
'atr_pct': round(atr_pct, 2),
'suggested_opportunity_threshold': opportunity_threshold,
'ema_alignment': {
'price_above_20': above_ema20,
'price_above_50': above_ema50,
'price_above_200': above_ema200,
'ema20_above_50': ema20_above_50,
'ema50_above_200': ema50_above_200
},
'returns': {
'20d': round(returns_20, 2),
'50d': round(returns_50, 2)
}
}
@staticmethod
def fib_levels(df: pd.DataFrame,
lookback: int = 55,
mode: str = "swing") -> dict:
"""
Returns dict with:
retracements: 0.0, 0.236, 0.382, 0.5, 0.618, 0.786, 1.0
extensions: 1.272, 1.618, 2.0
direction: 'bull' | 'bear'
swing_high, swing_low
"""
if len(df) < lookback:
return {}
highs = df['high'].iloc[-lookback:]
lows = df['low'].iloc[-lookback:]
# Swing detection
swing_high_idx = highs.idxmax()
swing_low_idx = lows.idxmin()
swing_high = highs.loc[swing_high_idx]
swing_low = lows.loc[swing_low_idx]
# Determine if last swing is up or down
if swing_high_idx > swing_low_idx:
direction = "bull" # retracement from high
base, top = swing_low, swing_high
else:
direction = "bear" # retracement from low
base, top = swing_high, swing_low
diff = abs(top - base)
retracements = {
0.0: top,
0.236: top - 0.236 * diff,
0.382: top - 0.382 * diff,
0.5: top - 0.5 * diff,
0.618: top - 0.618 * diff,
0.786: top - 0.786 * diff,
1.0: base,
}
extensions = {
1.272: top + 0.272 * diff if direction == "bull" else base - 0.272 * diff,
1.618: top + 0.618 * diff if direction == "bull" else base - 0.618 * diff,
2.0: top + 1.0 * diff if direction == "bull" else base - 1.0 * diff,
}
# current price vs nearest fib
current = df['close'].iloc[-1]
nearest_r = min(retracements.values(), key=lambda x: abs(x - current))
nearest_e = min(extensions.values(), key=lambda x: abs(x - current))
return {
"direction": direction,
"swing_high": swing_high,
"swing_low": swing_low,
"retracements": retracements,
"extensions": extensions,
"nearest_retracement": nearest_r,
"nearest_extension": nearest_e,
"distance_to_nearest_r": (current - nearest_r) / current,
"distance_to_nearest_e": (current - nearest_e) / current,
}
@staticmethod
def fib_confluence_score(fib_dict: dict,
poc: float,
vwap: float,
tolerance: float = 0.005) -> float:
"""
0–100 score: 100 = price sits on fib + poc + vwap within tolerance
"""
if not fib_dict:
return 0.0
current = fib_dict["retracements"][0.0] - fib_dict["distance_to_nearest_r"] * fib_dict["retracements"][0.0]
confluence = 0
for level in list(fib_dict["retracements"].values()) + list(fib_dict["extensions"].values()):
if abs(current - level) / current < tolerance:
confluence += 20
if abs(current - poc) / current < tolerance:
confluence += 20
if abs(current - vwap) / current < tolerance:
confluence += 20
return min(100, confluence)
"""Class to handle all technical indicator calculations"""
@staticmethod
def calculate_rsi(prices: pd.Series, period: int = 14) -> float:
    """Return the most recent RSI value for ``prices``.

    Uses ``ta.momentum.rsi`` when the ``ta`` library was imported. If that
    call fails the error is logged as a warning and a manual calculation is
    used instead: simple rolling means of gains and losses over ``period``
    rows. The manual path returns 100 when the mean loss is exactly zero.
    """
    if TA_AVAILABLE and callable(rsi):
        try:
            return pd.Series(rsi(prices, window=period)).iloc[-1]
        except Exception as e:
            logger.warning(f"Error using ta.momentum.rsi: {e}")

    # Manual fallback.
    changes = prices.diff().astype(float)
    ups = pd.Series(np.where(changes > 0, changes, 0))
    downs = pd.Series(np.where(changes < 0, -changes, 0))
    mean_up = ups.rolling(window=period).mean().iloc[-1]
    mean_down = downs.rolling(window=period).mean().iloc[-1]
    if mean_down == 0:
        return 100
    relative_strength = mean_up / mean_down
    return 100 - (100 / (1 + relative_strength))
@staticmethod
def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> float:
ema_fast = prices.ewm(span=fast, adjust=False).mean()
ema_slow = prices.ewm(span=slow, adjust=False).mean()
macd_line = ema_fast - ema_slow
signal_line = macd_line.ewm(span=signal, adjust=False).mean()
return macd_line.iloc[-1] - signal_line.iloc[-1]
@staticmethod
def calculate_momentum(prices: pd.Series, period: int) -> float:
return prices.pct_change(periods=period).iloc[-1]
@staticmethod
def calculate_ichimoku(df: pd.DataFrame) -> pd.DataFrame:
if len(df) < 26:
return df.assign(tenkan_sen=None, kijun_sen=None, senkou_a=None, senkou_b=None, cloud_green=None)
high = df['high']
low = df['low']
close = df['close']
tenkan_sen = (high.rolling(9).max() + low.rolling(9).min()) / 2
kijun_sen = (high.rolling(26).max() + low.rolling(26).min()) / 2
senkou_a = ((tenkan_sen + kijun_sen) / 2).shift(26)
senkou_b = (high.rolling(52).max() + low.rolling(52).min()) / 2
senkou_b = senkou_b.shift(26)
cloud_green = (senkou_a > senkou_b).astype(int)
return df.assign(
tenkan_sen=tenkan_sen,
kijun_sen=kijun_sen,
senkou_a=senkou_a,
senkou_b=senkou_b,
cloud_green=cloud_green
)
@staticmethod
def calculate_vwap(df: pd.DataFrame) -> pd.Series:
if len(df) < 2:
return pd.Series([None] * len(df), index=df.index)
typical_price = (df['high'] + df['low'] + df['close']) / 3
vwap = (typical_price * df['volume']).cumsum() / df['volume'].cumsum()
return vwap
@staticmethod
def rsi_bearish_divergence(df: pd.DataFrame, lookback: int = 10) -> bool:
if len(df) < lookback + 2 or 'rsi' not in df:
return False
price = df['close']
rsi = df['rsi']
return (price.iloc[-1] > price.iloc[-lookback]) and (rsi.iloc[-1] < rsi.iloc[-lookback])
@staticmethod
def calculate_volume_profile(df: pd.DataFrame, bins: int = 50) -> tuple:
"""Calculate volume profile and Point of Control (POC)"""
if len(df) < 2 or 'volume' not in df or 'close' not in df:
return None, None
price = df['close']
volume = df['volume']
price_bins = np.linspace(price.min(), price.max(), bins)
volume_hist, bin_edges = np.histogram(price, bins=price_bins, weights=volume)
poc_index = np.argmax(volume_hist)
poc_price = (bin_edges[poc_index] + bin_edges[poc_index + 1]) / 2
return volume_hist, poc_price
@staticmethod
def calculate_anchored_volume_profile(df: pd.DataFrame, anchor_index: int, bins: int = 50) -> tuple:
"""Calculate anchored volume profile from a specific index"""
if len(df) < anchor_index + 2 or 'volume' not in df or 'close' not in df:
return None, None
df_anchored = df.iloc[anchor_index:]
# Ensure df_anchored is a DataFrame, not a Series
if isinstance(df_anchored, pd.Series):
df_anchored = df_anchored.to_frame().T
return TechnicalIndicators.calculate_volume_profile(df_anchored, bins)
@staticmethod
def calculate_fixed_range_volume_profile(df: pd.DataFrame, price_range: float, bins: int = 50) -> tuple:
"""Calculate volume profile within a fixed price range"""
if len(df) < 2 or 'volume' not in df or 'close' not in df:
return None, None
price = df['close']
price_min = price.iloc[-1] - price_range / 2
price_max = price.iloc[-1] + price_range / 2
df_range = df[(price >= price_min) & (price <= price_max)]
return TechnicalIndicators.calculate_volume_profile(df_range, bins)
@staticmethod
def calculate_opportunity_score(
momentum_short: float,
momentum_long: float,
rsi: float,
macd: float,
bb_position: Optional[float],
trend_score: float,
volume_ratio: float,
stoch_k: Optional[float] = None,
rsi_bearish_div: bool = False
) -> float:
"""
Calculate opportunity score that identifies the BEST entry points, not just momentum.
Penalizes overbought/oversold conditions and rewards pullbacks in trends.
Returns a score from 0-100 where higher means better opportunity.
"""
# 1. RSI Opportunity Score (favor 30-50 for longs, 50-70 for shorts)
# Penalize extreme overbought (>70) and oversold (<30)
if rsi < 30:
rsi_opp = 0.3 # Oversold - risky catching falling knife
elif 30 <= rsi < 45:
rsi_opp = 1.0 # Sweet spot - pullback in uptrend
elif 45 <= rsi < 55:
rsi_opp = 0.8 # Neutral - okay
elif 55 <= rsi < 70:
rsi_opp = 0.5 # Getting extended
else: # rsi >= 70
rsi_opp = 0.2 # Overbought - poor entry point
# 2. Bollinger Band Position Score (favor lower BB positions for longs)
if bb_position is not None:
if bb_position < 0.3:
bb_opp = 1.0 # Near lower band - good entry for reversal
elif 0.3 <= bb_position < 0.5:
bb_opp = 0.9 # Below midline - good
elif 0.5 <= bb_position < 0.7:
bb_opp = 0.6 # Above midline - okay
else: # bb_position >= 0.7
bb_opp = 0.2 # Near upper band - overbought
else:
bb_opp = 0.5 # No BB data
# 3. Stochastic Opportunity (favor oversold stoch with bullish trend)
if stoch_k is not None:
if stoch_k < 20:
stoch_opp = 1.0 if momentum_long > 0 else 0.3 # Oversold in uptrend = buy
elif 20 <= stoch_k < 40:
stoch_opp = 0.9
elif 40 <= stoch_k < 60:
stoch_opp = 0.7
elif 60 <= stoch_k < 80:
stoch_opp = 0.4
else: # stoch_k >= 80
stoch_opp = 0.1 # Overbought
else:
stoch_opp = 0.5
# 4. Momentum Context (positive long-term, moderate short-term = pullback)
if momentum_long > 0.001: # Uptrend
if -0.005 < momentum_short < 0.002: # Slight pullback or consolidation
momentum_opp = 1.0 # Perfect - pullback in uptrend
elif momentum_short > 0.005: # Strong short-term momentum
momentum_opp = 0.4 # Already running hot
else:
momentum_opp = 0.6
elif momentum_long < -0.001: # Downtrend
if -0.002 < momentum_short < 0.005: # Slight bounce
momentum_opp = 1.0 # Good short opportunity
else:
momentum_opp = 0.5
else: # Flat
momentum_opp = 0.5
# 5. Divergence Penalty (bearish divergence = bad for longs)
divergence_penalty = 0.5 if rsi_bearish_div else 1.0
# 6. Volume Context (high volume on pullbacks = accumulation)
if volume_ratio > 1.5:
vol_opp = 1.0 if rsi < 55 else 0.3 # High vol + not overbought = good
elif volume_ratio > 1.2:
vol_opp = 0.8
elif volume_ratio > 0.8:
vol_opp = 0.6
else:
vol_opp = 0.4 # Low volume = lack of conviction
# 7. Trend Quality (strong trend = better context for pullbacks)
trend_opp = min(max(trend_score / 10, 0), 1)
# 8. MACD Opportunity (slight negative MACD in uptrend = pullback)
if momentum_long > 0 and -0.5 < macd < 0:
macd_opp = 1.0 # Pullback in uptrend
elif macd > 0:
macd_opp = 0.7 if macd < 2 else 0.3 # Moderate positive vs. overextended
else:
macd_opp = 0.5
# Weighted combination emphasizing key opportunity factors
opportunity = (
rsi_opp * 0.25 + # RSI is critical for overbought/oversold
bb_opp * 0.20 + # BB position shows value
stoch_opp * 0.15 + # Stochastic confirms
momentum_opp * 0.15 + # Pullback detection
vol_opp * 0.10 + # Volume confirmation
trend_opp * 0.10 + # Trend quality
macd_opp * 0.05 # MACD context
) * divergence_penalty
return round(opportunity * 100, 2)
@staticmethod
def calculate_composite_score(
momentum_short: float,
momentum_long: float,
rsi: float,
macd: float,
trend_score: float,
volume_ratio: float,
ichimoku_bullish: bool,
fib_confluence: float = 0.0,
weights: Optional[Dict[str, float]] = None
) -> float:
"""Calculate a composite score combining multiple indicators, including optional fib_confluence."""
weights = weights or {
'momentum_short': 0.2,
'momentum_long': 0.15,
'rsi': 0.2,
'macd': 0.15,
'trend_score': 0.2,
'volume_ratio': 0.1,
'ichimoku': 0.1,
'fib_confluence': 0.15
}
mom_short_score = min(max(abs(momentum_short) * 1000, 0), 1)
mom_long_score = min(max(abs(momentum_long) * 500, 0), 1)
rsi_score = min(max((rsi - 50) / 30, 0), 1) if rsi >= 50 else min(max((50 - rsi) / 30, 0), 1)
macd_score = min(max(abs(macd) * 50, 0), 1)
trend_score_norm = min(max(trend_score / 10, 0), 1)
vol_score = min(max((volume_ratio - 1) / 1.5, 0), 1)
ichimoku_score = 1.0 if ichimoku_bullish else 0.0
fib_score = min(max(fib_confluence / 100, 0), 1)
score = (
mom_short_score * weights['momentum_short'] +
mom_long_score * weights['momentum_long'] +
rsi_score * weights['rsi'] +
macd_score * weights['macd'] +
trend_score_norm * weights['trend_score'] +
vol_score * weights['volume_ratio'] +
ichimoku_score * weights['ichimoku'] +
fib_score * weights.get('fib_confluence', 0.15)
)
return round(score * 100, 2)
@staticmethod
def calculate_volume_composite_score(
volume_ratio: float,
volume_hist: np.ndarray,
poc_distance: float,
weights: Optional[Dict[str, float]] = None
) -> float:
"""Calculate a volume-weighted composite score"""
weights = weights or {
'volume_ratio': 0.5,
'volume_hist_max': 0.3,
'poc_distance': 0.2
}
vol_ratio_score = min(max((volume_ratio - 1) / 1.5, 0), 1)
vol_hist_score = min(max(np.max(volume_hist) / np.sum(volume_hist) if volume_hist is not None else 0, 0), 1)
poc_dist_score = min(max(1 - abs(poc_distance) / 0.05, 0), 1)
score = (
vol_ratio_score * weights['volume_ratio'] +
vol_hist_score * weights['volume_hist_max'] +
poc_dist_score * weights['poc_distance']
)
return round(score * 100, 2)
@staticmethod
def calculate_stop_loss_take_profit(
current_price: float,
df: pd.DataFrame,
signal: str,
atr: Optional[float] = None,
bb_lower: Optional[float] = None,
bb_upper: Optional[float] = None,
support_level: Optional[float] = None,
resistance_level: Optional[float] = None,
risk_reward_ratio: float = 2.5
) -> Dict[str, float]:
"""
Calculate optimal stop-loss and take-profit levels based on multiple methods.
Returns a dict with entry, stop_loss, take_profit, risk_amount, reward_amount, and risk_reward_ratio
"""
# Calculate ATR if not provided
if atr is None and 'atr' in df:
atr = df['atr'].iloc[-1]
elif atr is None:
# Fallback: calculate simple ATR from last 14 periods
high_low = df['high'] - df['low']
atr = high_low.tail(14).mean() if len(df) >= 14 else high_low.mean()
# Calculate recent swing high/low (last 20 periods)
recent_data = df.tail(20) if len(df) >= 20 else df
swing_low = recent_data['low'].min()
swing_high = recent_data['high'].max()
# Determine support and resistance if not provided
if support_level is None:
# Use swing low or Bollinger lower band as support
support_level = bb_lower if bb_lower is not None else swing_low
if resistance_level is None:
# Use swing high or Bollinger upper band as resistance
resistance_level = bb_upper if bb_upper is not None else swing_high
# Calculate stop-loss and take-profit based on signal direction
if signal in ['Strong Buy', 'Buy', 'Weak Buy']:
# LONG POSITION
# Stop Loss Methods:
# 1. ATR-based: 1.5-2x ATR below entry
# 2. Support-based: Just below recent support
# 3. Percentage-based: 2-3% below entry
atr_stop = current_price - (atr * 1.5)
support_stop = support_level * 0.995 # 0.5% below support
percentage_stop = current_price * 0.97 # 3% stop
# Use the tightest reasonable stop (but not too tight)
stop_loss_candidates = [atr_stop, support_stop, percentage_stop]
# Filter out stops that are too close (< 0.5%) or too far (> 8%)
valid_stops = [
s for s in stop_loss_candidates
if 0.005 < (current_price - s) / current_price < 0.08
]
stop_loss = max(valid_stops) if valid_stops else atr_stop
# Take Profit: Based on risk/reward ratio and resistance
risk_amount = current_price - stop_loss
reward_by_rr = current_price + (risk_amount * risk_reward_ratio)
# Consider resistance as a take-profit zone
resistance_tp = resistance_level * 0.995 # Slightly before resistance
# Use closer of RR-based or resistance-based TP
if resistance_tp > current_price and resistance_tp < reward_by_rr:
take_profit = resistance_tp
actual_rr = (take_profit - current_price) / risk_amount
else:
take_profit = reward_by_rr
actual_rr = risk_reward_ratio
elif signal in ['Strong Sell', 'Sell', 'Weak Sell']:
# SHORT POSITION
atr_stop = current_price + (atr * 1.5)
resistance_stop = resistance_level * 1.005 # 0.5% above resistance
percentage_stop = current_price * 1.03 # 3% stop
stop_loss_candidates = [atr_stop, resistance_stop, percentage_stop]
valid_stops = [
s for s in stop_loss_candidates
if 0.005 < (s - current_price) / current_price < 0.08
]
stop_loss = min(valid_stops) if valid_stops else atr_stop
risk_amount = stop_loss - current_price
reward_by_rr = current_price - (risk_amount * risk_reward_ratio)
support_tp = support_level * 1.005 # Slightly above support
if support_tp < current_price and support_tp > reward_by_rr:
take_profit = support_tp
actual_rr = (current_price - take_profit) / risk_amount
else:
take_profit = reward_by_rr
actual_rr = risk_reward_ratio
else:
# NEUTRAL - provide conservative levels
stop_loss = current_price * 0.97
take_profit = current_price * 1.03
risk_amount = current_price - stop_loss
actual_rr = (take_profit - current_price) / risk_amount if risk_amount > 0 else 0
return {
'entry_price': round(current_price, 8),
'stop_loss': round(stop_loss, 8),
'take_profit': round(take_profit, 8),
'risk_amount': round(abs(current_price - stop_loss), 8),
'reward_amount': round(abs(take_profit - current_price), 8),
'risk_reward_ratio': round(actual_rr, 2),
'stop_loss_pct': round(((stop_loss - current_price) / current_price) * 100, 2),
'take_profit_pct': round(((take_profit - current_price) / current_price) * 100, 2),
'support_level': round(support_level, 8) if support_level else None,
'resistance_level': round(resistance_level, 8) if resistance_level else None
}
@staticmethod
def calculate_position_size(
    account_balance: float,
    risk_per_trade_pct: float,
    entry_price: float,
    stop_loss: float,
    leverage: float = 1.0,
    fee_rate: float = 0.001
) -> Dict[str, float]:
    """
    Calculate a position size from account balance, risk budget and stop distance.

    The un-leveraged size is ``risk_amount / stop_distance_pct``; that size is
    then multiplied by ``leverage`` to obtain ``position_value``.

    NOTE: because the size is scaled by leverage, the loss at the stop
    (``stop_distance_usd``) equals ``risk_amount_usd * leverage``, not
    ``risk_amount_usd``.

    Args:
        account_balance: Total account balance in USD (must be > 0)
        risk_per_trade_pct: Percentage of account to risk (e.g., 2 for 2%; must be >= 0)
        entry_price: Planned entry price (must be > 0)
        stop_loss: Stop-loss price (must differ from entry_price)
        leverage: Leverage to use (default 1 = no leverage; must be > 0)
        fee_rate: Trading fee rate per side (default 0.1%)
    Returns:
        Dict with position_value, units, margin_required, risk_amount_usd,
        adjusted_risk_usd, total_fees, stop_distance_pct, stop_distance_usd,
        leverage, liquidation_price (None when not applicable),
        account_balance, risk_per_trade_pct, margin_usage_pct, warnings (list
        of strings) and safe_to_trade (bool).

        Invalid inputs do not raise: the same keys are returned with a
        zero-sized position, the problems listed in ``warnings`` and
        ``safe_to_trade`` set to False.
    """
    # Reject inputs that would otherwise divide by zero or give a meaningless size.
    input_errors = []
    if account_balance <= 0:
        input_errors.append("Invalid input: account balance must be positive")
    if risk_per_trade_pct < 0:
        input_errors.append("Invalid input: risk per trade cannot be negative")
    if entry_price <= 0:
        input_errors.append("Invalid input: entry price must be positive")
    if leverage <= 0:
        input_errors.append("Invalid input: leverage must be positive")
    if stop_loss == entry_price:
        input_errors.append("Invalid input: stop-loss must differ from entry price")
    if input_errors:
        return {
            'position_value': 0.0,
            'units': 0.0,
            'margin_required': 0.0,
            'risk_amount_usd': 0.0,
            'adjusted_risk_usd': 0.0,
            'total_fees': 0.0,
            'stop_distance_pct': 0.0,
            'stop_distance_usd': 0.0,
            'leverage': leverage,
            'liquidation_price': None,
            'account_balance': account_balance,
            'risk_per_trade_pct': risk_per_trade_pct,
            'margin_usage_pct': 0.0,
            'warnings': input_errors,
            'safe_to_trade': False
        }

    # Risk budget in USD
    risk_amount_usd = account_balance * (risk_per_trade_pct / 100)
    # Stop distance as a fraction of the entry price
    stop_distance_pct = abs((entry_price - stop_loss) / entry_price)
    # Risk Amount = Position Size x Stop Distance %
    # => Position Size = Risk Amount / Stop Distance %
    base_position_size = risk_amount_usd / stop_distance_pct
    # Apply leverage
    position_value = base_position_size * leverage
    # Number of units/coins
    units = position_value / entry_price
    # Fees are charged on both entry and exit
    entry_fee = position_value * fee_rate
    exit_fee = position_value * fee_rate
    total_fees = entry_fee + exit_fee
    # Risk budget left after paying fees
    adjusted_risk = risk_amount_usd - total_fees
    # Margin required (for leveraged positions)
    margin_required = position_value / leverage

    # Approximate liquidation price; only for a leveraged, non-empty position
    # (units == 0 would divide by zero below).
    if leverage > 1 and units > 0:
        # 90% of margin may be lost before liquidation
        liquidation_buffer = margin_required * 0.9
        buffer_per_unit = liquidation_buffer / units
        if stop_loss < entry_price:
            # Long position: liquidation sits below entry
            liquidation_price = entry_price - buffer_per_unit
        else:
            # Short position: liquidation sits above entry
            liquidation_price = entry_price + buffer_per_unit
    else:
        liquidation_price = 0  # No liquidation without leverage

    # Dollar loss if the stop is hit
    stop_distance_usd = abs(entry_price - stop_loss) * units

    # Warnings
    warnings = []
    if margin_required > account_balance:
        warnings.append("Insufficient balance for this position")
    if margin_required > account_balance * 0.5:
        warnings.append("Position uses >50% of account (high risk)")
    if leverage > 3:
        warnings.append(f"High leverage ({leverage}x) - increased liquidation risk")
    if risk_per_trade_pct > 3:
        warnings.append(f"Risking {risk_per_trade_pct}% per trade (recommended: 1-2%)")
    if liquidation_price != 0:
        # Price would reach the liquidation level before reaching the stop
        long_liquidated_first = stop_loss < entry_price and liquidation_price > stop_loss
        short_liquidated_first = stop_loss > entry_price and liquidation_price < stop_loss
        if long_liquidated_first or short_liquidated_first:
            warnings.append("Liquidation price is beyond stop-loss - very risky!")

    return {
        'position_value': round(position_value, 2),
        'units': round(units, 8),
        'margin_required': round(margin_required, 2),
        'risk_amount_usd': round(risk_amount_usd, 2),
        'adjusted_risk_usd': round(adjusted_risk, 2),
        'total_fees': round(total_fees, 2),
        'stop_distance_pct': round(stop_distance_pct * 100, 2),
        'stop_distance_usd': round(stop_distance_usd, 2),
        'leverage': leverage,
        'liquidation_price': round(liquidation_price, 8) if liquidation_price > 0 else None,
        'account_balance': account_balance,
        'risk_per_trade_pct': risk_per_trade_pct,
        'margin_usage_pct': round((margin_required / account_balance) * 100, 2),
        'warnings': warnings,
        # Only hard blockers make the trade unsafe
        'safe_to_trade': not any('Insufficient' in w or 'Liquidation' in w for w in warnings)
    }
@staticmethod
def add_all_indicators(df: pd.DataFrame) -> pd.DataFrame:
df = TechnicalIndicators.calculate_ichimoku(df)
df['vwap'] = TechnicalIndicators.calculate_vwap(df)
volume_hist, poc_price = TechnicalIndicators.calculate_volume_profile(df)
anchor_index = df['high'].idxmax() if not df['high'].empty else 0
# Ensure anchor_index is an int
if not isinstance(anchor_index, int):
try:
anchor_index = int(anchor_index[0])
except Exception:
anchor_index = 0
anchored_volume_hist, anchored_poc = TechnicalIndicators.calculate_anchored_volume_profile(df, anchor_index)
price_range = (df['high'].max() - df['low'].min()) * 0.2 if not df.empty else 0.01
fixed_volume_hist, fixed_poc = TechnicalIndicators.calculate_fixed_range_volume_profile(df, price_range)
df['volume_hist'] = [volume_hist] * len(df)
df['poc_price'] = poc_price
df['anchored_volume_hist'] = [anchored_volume_hist] * len(df)
df['anchored_poc'] = anchored_poc
df['fixed_volume_hist'] = [fixed_volume_hist] * len(df)
df['fixed_poc'] = fixed_poc
if not TA_AVAILABLE:
logger.warning("TA library not available, using basic indicators only")
df['rsi'] = TechnicalIndicators.calculate_rsi(df['close'])
df['bb_upper'] = None
df['bb_middle'] = None
df['bb_lower'] = None
df['bb_width'] = None
df['stoch_k'] = None
df['stoch_d'] = None