-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscanner_api.py
More file actions
1221 lines (1023 loc) · 45.4 KB
/
scanner_api.py
File metadata and controls
1221 lines (1023 loc) · 45.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Flask API wrapper for the Momentum Scanner
Provides REST endpoints to trigger scans and retrieve results
"""
from flask import Flask, request, jsonify
from flask_cors import CORS
import asyncio
import logging
from datetime import datetime
from scanner import MomentumScanner, get_dynamic_config, TechnicalIndicators
from continuous_scanner import ContinuousMultiTimeframeScanner, StreamConfig
import pandas as pd
from typing import Optional, List, Dict
import json
import threading
import ccxt.async_support as ccxt_async
# Set up logging
# Configure the root logger once at import time: INFO level, with timestamp,
# logger name and level name prefixed to every message.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# Flask application object that the route decorators below attach to.
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# Global scanner instances
# These module-level names are rebound by request handlers and by the
# background threads they spawn (see the `global` statements in this file).
# NOTE(review): no lock guarding them is visible in this file - confirm that
# unsynchronized access from several threads is acceptable.
# `scanner` being non-None is what /api/scanner/status reports as "initialized".
scanner: Optional[MomentumScanner] = None
# Continuous scanner object plus slots for its asyncio task and event loop.
continuous_scanner: Optional[ContinuousMultiTimeframeScanner] = None
continuous_scanner_task: Optional[asyncio.Task] = None
continuous_scanner_loop: Optional[asyncio.AbstractEventLoop] = None
# Most recent scan output and the time it was stored; read by
# /api/scanner/signals and /api/scanner/status.
last_scan_results: Optional[pd.DataFrame] = None
last_scan_timestamp: Optional[datetime] = None
async def initialize_exchange(exchange_id: str = 'kucoinfutures'):
    """Create a ccxt async exchange client and load its markets.

    Args:
        exchange_id: Attribute name of the exchange class on
            ``ccxt.async_support`` (for example ``'kucoinfutures'``).

    Returns:
        The exchange instance after ``load_markets()`` has completed.

    Raises:
        Exception: Whatever the lookup, construction or ``load_markets()``
            raised. The error is logged and re-raised unchanged.
    """
    exchange = None
    try:
        exchange_class = getattr(ccxt_async, exchange_id)
        exchange = exchange_class({
            'enableRateLimit': True,
            'timeout': 30000,
            'options': {
                'defaultType': 'future',  # or 'spot'
                'recvWindow': 10000
            }
        })
        await exchange.load_markets()
        logger.info(f"Exchange {exchange_id} initialized successfully")
        return exchange
    except Exception as e:
        logger.error(f"Failed to initialize exchange {exchange_id}: {e}")
        # The caller never receives the instance on failure, so it has to be
        # closed here; otherwise nothing can ever release it.
        if exchange is not None:
            try:
                await exchange.close()
            except Exception as close_error:
                logger.warning(
                    f"Error closing exchange {exchange_id} after failed init: {close_error}"
                )
        raise
def get_scanner(loop, exchange_id='kucoinfutures'):
    """Build a new MomentumScanner whose exchange is initialized on ``loop``.

    Args:
        loop: Event loop used to run ``initialize_exchange`` to completion.
        exchange_id: Exchange identifier forwarded to ``initialize_exchange``.

    Returns:
        A MomentumScanner configured for USDT-quoted crypto markets
        (top 50 symbols, minimum volume of 100000 USD).

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        # Drive the async initializer synchronously on the caller's loop.
        connected_exchange = loop.run_until_complete(initialize_exchange(exchange_id))
        built_scanner = MomentumScanner(
            exchange=connected_exchange,
            config=get_dynamic_config(),
            market_type='crypto',
            quote_currency='USDT',
            top_n=50,
            min_volume_usd=100000,
        )
        logger.info(f"Scanner instance created successfully for exchange: {exchange_id}")
        return built_scanner
    except Exception as e:
        logger.error(f"Failed to create scanner for {exchange_id}: {e}")
        raise
async def scan_single_exchange_async(exchange_id: str, timeframe: str, full_analysis: bool = True) -> tuple:
    """Scan one exchange and report the outcome without raising.

    Args:
        exchange_id: Exchange identifier passed to ``initialize_exchange``.
        timeframe: Timeframe forwarded to ``MomentumScanner.scan_market``.
        full_analysis: Forwarded to ``scan_market``.

    Returns:
        ``(exchange_id, results_df, duration_seconds, error)``. On success
        ``error`` is ``None``; on failure ``results_df`` is an empty
        DataFrame and ``error`` is the exception text.
    """
    start_time = datetime.now()
    try:
        logger.info(f"⚡ Starting async scan for {exchange_id}")
        # Initialize exchange
        init_start = datetime.now()
        exchange = await initialize_exchange(exchange_id)
        init_duration = (datetime.now() - init_start).total_seconds()
        try:
            # Create scanner
            config = get_dynamic_config()
            scanner_instance = MomentumScanner(
                exchange=exchange,
                config=config,
                market_type='crypto',
                quote_currency='USDT',
                top_n=50,
                min_volume_usd=100000
            )
            # Run scan
            scan_start = datetime.now()
            results = await scanner_instance.scan_market(
                timeframe=timeframe,
                full_analysis=full_analysis,
                save_results=False
            )
            scan_duration = (datetime.now() - scan_start).total_seconds()
        finally:
            # Always release the exchange, including when the scan raised.
            # A failure to close is logged rather than discarding a good result.
            try:
                await exchange.close()
            except Exception as close_error:
                logger.warning(f"Error closing exchange {exchange_id}: {close_error}")
        total_duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"✅ {exchange_id} scan completed in {total_duration:.2f}s (init: {init_duration:.2f}s, scan: {scan_duration:.2f}s)")
        return (exchange_id, results, total_duration, None)
    except Exception as e:
        total_duration = (datetime.now() - start_time).total_seconds()
        logger.error(f"❌ {exchange_id} scan failed after {total_duration:.2f}s: {str(e)}")
        return (exchange_id, pd.DataFrame(), total_duration, str(e))
async def scan_multiple_exchanges_parallel(exchanges: List[str], timeframe: str, full_analysis: bool = True) -> Dict:
    """Scan several exchanges concurrently with ``asyncio.gather``.

    Args:
        exchanges: List of exchange IDs to scan.
        timeframe: Timeframe for scanning.
        full_analysis: Whether to run full analysis.

    Returns:
        Dictionary with:
        - ``results``: list of ``(exchange_id, DataFrame)`` for successful scans
        - ``exchange_metadata``: per-exchange duration, success flag, error
          text and row count
        - ``parallel_duration`` / ``sequential_duration_estimated`` /
          ``speedup`` / ``time_saved``: timing summary in seconds
        - ``successful_scans`` / ``failed_scans``: counters
    """
    parallel_start = datetime.now()
    logger.info("="*80)
    logger.info(f"🚀 PARALLEL SCAN STARTED")
    logger.info(f" Exchanges: {', '.join(exchanges)}")
    logger.info(f" Timeframe: {timeframe}")
    logger.info(f" Full Analysis: {full_analysis}")
    logger.info(f" Start Time: {parallel_start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
    logger.info("="*80)
    # Run all exchanges in parallel. Each coroutine catches its own errors and
    # reports them in its result tuple, so gather itself is not expected to raise.
    tasks = [scan_single_exchange_async(ex, timeframe, full_analysis) for ex in exchanges]
    results = await asyncio.gather(*tasks, return_exceptions=False)
    parallel_end = datetime.now()
    total_duration = (parallel_end - parallel_start).total_seconds()
    # Aggregate results
    all_results = []
    exchange_metadata = {}
    successful_scans = 0
    failed_scans = 0
    for exchange_id, df, duration, error in results:
        exchange_metadata[exchange_id] = {
            'duration_seconds': round(duration, 2),
            'success': error is None,
            'error': error,
            'signals_found': len(df)
        }
        if error is None:
            successful_scans += 1
            all_results.append((exchange_id, df))
        else:
            failed_scans += 1
    # Calculate efficiency gain. The sequential estimate is the sum of the
    # individual (rounded) durations.
    sequential_time = sum(meta['duration_seconds'] for meta in exchange_metadata.values())
    speedup = sequential_time / total_duration if total_duration > 0 else 1
    time_saved = sequential_time - total_duration
    # Guard the percentage: sequential_time is 0 for an empty exchange list
    # (or when every duration rounds to 0), which used to raise ZeroDivisionError.
    saved_pct = (time_saved / sequential_time * 100) if sequential_time > 0 else 0.0
    logger.info("="*80)
    logger.info(f"✅ PARALLEL SCAN COMPLETED")
    logger.info(f" Successful: {successful_scans}/{len(exchanges)} exchanges")
    logger.info(f" Failed: {failed_scans}/{len(exchanges)} exchanges")
    logger.info(f" Parallel Time: {total_duration:.2f} seconds")
    logger.info(f" Sequential Time (estimated): {sequential_time:.2f} seconds")
    logger.info(f" Speedup: {speedup:.2f}x faster")
    logger.info(f" Time Saved: {time_saved:.2f} seconds ({saved_pct:.1f}%)")
    logger.info(f" End Time: {parallel_end.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
    logger.info("="*80)
    return {
        'results': all_results,
        'exchange_metadata': exchange_metadata,
        'parallel_duration': round(total_duration, 2),
        'sequential_duration_estimated': round(sequential_time, 2),
        'speedup': round(speedup, 2),
        'time_saved': round(time_saved, 2),
        'successful_scans': successful_scans,
        'failed_scans': failed_scans
    }
def format_signal_for_api(row: pd.Series, exchange: str = 'kucoinfutures') -> Optional[dict]:
    """Convert one scanner result row into the API's signal dictionary.

    Args:
        row: One row of the scanner results; fields are read with ``row.get``
            so missing columns fall back to defaults.
        exchange: Exchange name to report in the output.

    Returns:
        A dict with the signal summary plus ``indicators``, ``advanced``,
        ``risk_reward`` and ``market_regime`` sub-dicts, or ``None`` when any
        field cannot be converted (the error is logged).
    """
    try:
        # Map signal types to BUY/SELL/HOLD
        signal_mapping = {
            'Strong Buy': 'BUY',
            'Buy': 'BUY',
            'Weak Buy': 'BUY',
            'Strong Sell': 'SELL',
            'Sell': 'SELL',
            'Weak Sell': 'SELL',
            'Neutral': 'HOLD'
        }
        signal_type = signal_mapping.get(row.get('signal', 'Neutral'), 'HOLD')
        # Calculate price change percentage
        momentum_short = row.get('momentum_short', 0)
        change = momentum_short * 100 if momentum_short else 0
        # One clock reading shared by the id and the fallback timestamp.
        now = datetime.now()
        symbol = row.get('symbol', 'UNKNOWN')
        raw_timestamp = row.get('timestamp')
        if hasattr(raw_timestamp, 'isoformat'):
            timestamp = raw_timestamp.isoformat()
        else:
            timestamp = now.isoformat()
        volume_ratio = row.get('volume_ratio', 1)
        if volume_ratio > 2:
            volume_label = 'very_high'
        elif volume_ratio > 1.5:
            volume_label = 'high'
        else:
            volume_label = 'medium'
        # signal_strength is a 0-1 fraction scaled to 0-100 here, so the
        # "unknown" default must be 0.5. The previous default of 50 was scaled
        # to 5000 and clamped to 100.
        strength = min(100, max(0, int(row.get('signal_strength', 0.5) * 100)))
        return {
            'id': f"{symbol}_{int(now.timestamp())}",
            'symbol': symbol,
            'exchange': exchange,
            'timeframe': row.get('timeframe', '1h'),
            'signal': signal_type,
            'strength': strength,
            'price': float(row.get('price', 0)),
            'change': float(change),
            'volume': float(row.get('volume_usd', 0)),
            'timestamp': timestamp,
            'indicators': {
                'rsi': float(row.get('rsi', 50)),
                'macd': 'bullish' if row.get('macd', 0) > 0 else 'bearish',
                'ema': 'above' if row.get('ema_5_13_bullish', False) else 'below',
                'volume': volume_label
            },
            'advanced': {
                'opportunity_score': float(row.get('opportunity_score', 0)),  # NEW: Best entry point score
                'composite_score': float(row.get('composite_score', 0)),
                'trend_score': float(row.get('trend_score', 0)),
                'confidence_score': float(row.get('confidence_score', 0)),
                'combined_score': float(row.get('combined_score', 0)),  # Overall ranking score
                'ichimoku_bullish': bool(row.get('ichimoku_bullish', False)),
                'vwap_bullish': bool(row.get('vwap_bullish', False)),
                'bb_position': float(row.get('bb_position', 0.5)) if pd.notna(row.get('bb_position')) else 0.5
            },
            'risk_reward': {
                'entry_price': float(row.get('entry_price', row.get('price', 0))),
                'stop_loss': float(row.get('stop_loss', 0)),
                'take_profit': float(row.get('take_profit', 0)),
                'risk_amount': float(row.get('risk_amount', 0)),
                'reward_amount': float(row.get('reward_amount', 0)),
                'risk_reward_ratio': float(row.get('risk_reward_ratio', 0)),
                'stop_loss_pct': float(row.get('stop_loss_pct', 0)),
                'take_profit_pct': float(row.get('take_profit_pct', 0)),
                'support_level': float(row.get('support_level', 0)) if pd.notna(row.get('support_level')) else None,
                'resistance_level': float(row.get('resistance_level', 0)) if pd.notna(row.get('resistance_level')) else None
            },
            'market_regime': {
                'regime': row.get('market_regime', 'unknown'),
                'confidence': float(row.get('regime_confidence', 0)),
                'trend_strength': float(row.get('regime_trend_strength', 0)),
                'volatility': row.get('regime_volatility', 'medium'),
                'suggested_threshold': int(row.get('regime_suggested_threshold', 65))
            }
        }
    except Exception as e:
        # Best effort: a malformed row is dropped by the caller, not fatal.
        logger.error(f"Error formatting signal: {e}")
        return None
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe: report a fixed healthy status with the current time."""
    payload = {
        'status': 'healthy',
        'service': 'scanner-api',
        'timestamp': datetime.now().isoformat(),
    }
    return jsonify(payload)
def _filter_scan_results(results_df, signal_filter, min_strength):
    """Keep rows at or above ``min_strength`` that match the signal category.

    ``signal_filter`` is 'all' (no category filtering) or one of
    BUY/SELL/HOLD; any other value leaves the strength-filtered rows unchanged.
    """
    filtered = results_df[results_df['signal_strength'] >= min_strength]
    if signal_filter != 'all':
        signal_mapping = {
            'BUY': ['Strong Buy', 'Buy', 'Weak Buy'],
            'SELL': ['Strong Sell', 'Sell', 'Weak Sell'],
            'HOLD': ['Neutral']
        }
        if signal_filter in signal_mapping:
            filtered = filtered[filtered['signal'].isin(signal_mapping[signal_filter])]
    return filtered


def _run_scan_background(data):
    """Run a scan in a background thread and publish the result globally.

    This avoids HTTP timeouts and keeps the API responsive. The request
    payload is passed in as a plain dict (not the Flask request object).

    Args:
        data: Request payload. Keys read: ``timeframe``, ``exchange`` (a
            string, or a list for parallel mode), ``signal``, ``minStrength``
            (0-100), ``fullAnalysis`` and ``parallel``.

    Side effects:
        Updates ``last_scan_results`` and ``last_scan_timestamp``; may set
        ``scanner`` when it is still ``None`` in single-exchange mode. Errors
        are logged, never raised.
    """
    global scanner, last_scan_results, last_scan_timestamp
    # Track scan timing
    scan_start_time = datetime.now()
    try:
        # Extract parameters
        timeframe = data.get('timeframe', 'medium')
        exchange_param = data.get('exchange', 'kucoinfutures')
        signal_filter = data.get('signal', 'all')
        min_strength = data.get('minStrength', 50) / 100  # Convert to 0-1 range
        full_analysis = data.get('fullAnalysis', True)
        parallel_mode = data.get('parallel', False)
        # Determine if parallel scanning (exchange is list or parallel=true)
        if isinstance(exchange_param, list):
            exchanges = exchange_param
            parallel_mode = True
        else:
            exchanges = [exchange_param]
        if not exchanges:
            # An empty list used to surface later as an opaque IndexError.
            raise ValueError("No exchange specified for scan")
        # Auto-enable parallel if multiple exchanges
        if len(exchanges) > 1:
            parallel_mode = True
        # Auto-initialize scanner if not already initialized (for single exchange scans)
        # NOTE(review): this global instance is not the one used for the scan
        # below (a fresh scanner is built), and its loop is closed right
        # after creation - confirm whether it is only meant as a status flag.
        if not parallel_mode and scanner is None:
            logger.info(f"Auto-initializing scanner for {exchanges[0]}")
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                scanner = get_scanner(loop, exchanges[0])
                logger.info(f"Auto-initialized scanner for {exchanges[0]}")
            except Exception as e:
                logger.error(f"Failed to auto-initialize scanner: {e}")
                raise
            finally:
                loop.close()
        # === PARALLEL SCANNING MODE ===
        if parallel_mode and len(exchanges) > 1:
            logger.info("="*80)
            logger.info(f"🚀 PARALLEL SCAN REQUESTED")
            logger.info(f" Exchanges: {', '.join(exchanges)}")
            logger.info(f" Timeframe: {timeframe}")
            logger.info(f" Signal Filter: {signal_filter}")
            logger.info(f" Min Strength: {min_strength * 100}%")
            logger.info(f" Full Analysis: {full_analysis}")
            logger.info(f" Start Time: {scan_start_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
            logger.info("="*80)
            # Create event loop for parallel scanning
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                # Run parallel scan
                parallel_result = loop.run_until_complete(
                    scan_multiple_exchanges_parallel(exchanges, timeframe, full_analysis)
                )
            finally:
                loop.close()
            # Process and aggregate results from all exchanges
            all_results = []
            tagged_frames = []
            filter_start = datetime.now()
            for exchange_id, df in parallel_result['results']:
                if df.empty:
                    continue
                # Keep the unfiltered rows, tagged with their source exchange,
                # so they can be published like single-mode results are.
                tagged = df.copy()
                tagged['exchange'] = exchange_id
                tagged_frames.append(tagged)
                # Format signals that pass the strength and type filters
                for _, row in _filter_scan_results(df, signal_filter, min_strength).iterrows():
                    formatted = format_signal_for_api(row, exchange_id)
                    if formatted:
                        all_results.append(formatted)
            filter_duration = (datetime.now() - filter_start).total_seconds()
            scan_end_time = datetime.now()
            # Store aggregated results (previously only the timestamp was
            # stored, so parallel results were never retrievable).
            if tagged_frames:
                last_scan_results = pd.concat(tagged_frames, ignore_index=True)
            else:
                last_scan_results = pd.DataFrame()
            last_scan_timestamp = scan_end_time
            logger.info(f"⏱️ Filtering and formatting took: {filter_duration:.2f} seconds")
            logger.info(f"Parallel scan background task completed: {len(all_results)} signals found")
            # Done: without this return the code fell through and rescanned
            # the first exchange in single mode.
            return
        # === SINGLE EXCHANGE SCANNING MODE ===
        exchange = exchanges[0]
        # LOG: Scan start
        logger.info("="*80)
        logger.info(f"🚀 SCAN STARTED")
        logger.info(f" Exchange: {exchange}")
        logger.info(f" Timeframe: {timeframe}")
        logger.info(f" Signal Filter: {signal_filter}")
        logger.info(f" Min Strength: {min_strength * 100}%")
        logger.info(f" Full Analysis: {full_analysis}")
        logger.info(f" Start Time: {scan_start_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
        logger.info("="*80)
        # Create a new event loop for this scan
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Track initialization time
        init_start = datetime.now()
        scanner_instance = None
        try:
            # Create fresh scanner instance with this loop and selected exchange
            scanner_instance = get_scanner(loop, exchange_id=exchange)
            init_duration = (datetime.now() - init_start).total_seconds()
            logger.info(f"⏱️ Exchange initialization took: {init_duration:.2f} seconds")
            # Track scan execution time
            scan_exec_start = datetime.now()
            # Run async scan
            results = loop.run_until_complete(
                scanner_instance.scan_market(
                    timeframe=timeframe,
                    full_analysis=full_analysis,
                    save_results=False
                )
            )
            scan_exec_duration = (datetime.now() - scan_exec_start).total_seconds()
            logger.info(f"⏱️ Market scan execution took: {scan_exec_duration:.2f} seconds")
        finally:
            try:
                # Clean up exchange connection, also when the scan itself failed
                scan_exchange = getattr(scanner_instance, 'exchange', None)
                if scan_exchange:
                    try:
                        loop.run_until_complete(scan_exchange.close())
                    except Exception as e:
                        logger.warning(f"Error closing exchange: {e}")
            finally:
                loop.close()
        if results.empty:
            scan_end_time = datetime.now()
            total_duration = (scan_end_time - scan_start_time).total_seconds()
            logger.warning("="*80)
            logger.warning(f"⚠️ SCAN COMPLETED - NO RESULTS")
            logger.warning(f" End Time: {scan_end_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
            logger.warning(f" Total Duration: {total_duration:.2f} seconds")
            logger.warning("="*80)
            # Publish the empty result and stop: the filtering below indexes
            # columns that an empty frame may not have.
            last_scan_results = results
            last_scan_timestamp = scan_end_time
            return
        # Store results
        last_scan_results = results
        last_scan_timestamp = datetime.now()
        # Track filtering time
        filter_start = datetime.now()
        filtered_results = _filter_scan_results(results, signal_filter, min_strength)
        # Format results for API
        signals = []
        for _, row in filtered_results.iterrows():
            formatted = format_signal_for_api(row, exchange)
            if formatted:
                signals.append(formatted)
        filter_duration = (datetime.now() - filter_start).total_seconds()
        logger.info(f"⏱️ Filtering and formatting took: {filter_duration:.2f} seconds")
        # Calculate total duration
        scan_end_time = datetime.now()
        total_duration = (scan_end_time - scan_start_time).total_seconds()

        def _share(part):
            # Percentage of the total run; guarded against a zero total.
            return part / total_duration * 100 if total_duration > 0 else 0.0

        # LOG: Scan completion summary
        logger.info("="*80)
        logger.info(f"✅ SCAN COMPLETED SUCCESSFULLY")
        logger.info(f" Exchange: {exchange}")
        logger.info(f" Timeframe: {timeframe}")
        logger.info(f" Total Symbols Scanned: {len(results)}")
        logger.info(f" Signals After Filtering: {len(signals)}")
        logger.info(f" End Time: {scan_end_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
        logger.info(f" Total Duration: {total_duration:.2f} seconds")
        logger.info(f" Performance Breakdown:")
        logger.info(f" - Initialization: {init_duration:.2f}s ({_share(init_duration):.1f}%)")
        logger.info(f" - Scan Execution: {scan_exec_duration:.2f}s ({_share(scan_exec_duration):.1f}%)")
        logger.info(f" - Filtering: {filter_duration:.2f}s ({_share(filter_duration):.1f}%)")
        logger.info("="*80)
    except Exception as e:
        scan_end_time = datetime.now()
        total_duration = (scan_end_time - scan_start_time).total_seconds()
        logger.error("="*80)
        logger.error(f"❌ SCAN FAILED")
        logger.error(f" Exchange: {data.get('exchange', 'kucoinfutures')}")
        logger.error(f" Timeframe: {data.get('timeframe', 'medium')}")
        logger.error(f" Error: {str(e)}")
        logger.error(f" End Time: {scan_end_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}")
        logger.error(f" Duration Before Failure: {total_duration:.2f} seconds")
        logger.error("="*80)
        logger.error(f"Stack trace:", exc_info=True)
@app.route('/api/scanner/scan', methods=['POST'])
def trigger_scan():
    """
    Trigger a market scan (single or parallel)
    Returns 202 Accepted immediately - scan runs in background
    Returns 400 when the body is not a JSON object or names no usable exchange
    Request body:
    {
    "timeframe": "1h" | "4h" | "1d" | "scalping" | "short" | "medium" | "daily",
    "exchange": "binance" | "kucoinfutures" | "coinbase" | "kraken" | ["binance", "okx"],
    "parallel": true | false (default: false, auto-enabled if exchange is array),
    "signal": "all" | "BUY" | "SELL" | "HOLD",
    "minStrength": 0-100,
    "fullAnalysis": true | false
    }
    """
    try:
        data = request.get_json() or {}
        if not isinstance(data, dict):
            # A JSON array or scalar has no .get(); reject it up front.
            return jsonify({
                'error': 'Request body must be a JSON object',
                'message': 'Failed to queue scan'
            }), 400
        # Determine if parallel mode
        exchange_param = data.get('exchange', 'kucoinfutures')
        parallel_mode = data.get('parallel', False)
        if isinstance(exchange_param, list):
            exchanges = exchange_param
            parallel_mode = True
        else:
            exchanges = [exchange_param]
        # Validate here: otherwise the request is accepted with 202 and the
        # background thread fails later where the client cannot see it.
        if not exchanges or not all(isinstance(ex, str) and ex for ex in exchanges):
            return jsonify({
                'error': 'exchange must be a non-empty string or a non-empty list of strings',
                'message': 'Failed to queue scan'
            }), 400
        if len(exchanges) > 1:
            parallel_mode = True
        mode_str = "parallel" if parallel_mode else "single"
        logger.info(f"Scan request queued: {mode_str} mode, exchanges: {exchanges}")
        # Spawn background thread to run scan
        scan_thread = threading.Thread(target=_run_scan_background, args=(data,), daemon=True)
        scan_thread.start()
        # Return immediately with 202 Accepted
        return jsonify({
            'status': 'accepted',
            'message': f'Scan queued in background ({mode_str} mode)',
            'mode': mode_str,
            'exchanges': exchanges,
            'timeframe': data.get('timeframe', 'medium'),
            'timestamp': datetime.now().isoformat(),
            'note': 'Poll /api/scanner/status to check progress. Results will be available in /api/scanner/signals'
        }), 202
    except Exception as e:
        logger.error(f"Error queueing scan: {str(e)}", exc_info=True)
        return jsonify({
            'error': str(e),
            'message': 'Failed to queue scan'
        }), 500
@app.route('/api/scanner/signals', methods=['GET'])
def get_signals():
    """
    Get latest scan results with optional filtering
    Query parameters:
    - exchange: filter by exchange (applied when the stored results carry an
      'exchange' column; otherwise it only labels the returned signals)
    - timeframe: filter by timeframe
    - signal: filter by signal type (BUY/SELL/HOLD)
    - minStrength: minimum signal strength (0-100); non-numeric values give 400
    """
    global last_scan_results
    try:
        # Filter options advertised to the client; identical in both responses.
        available_filters = {
            'exchanges': ['binance', 'kucoinfutures', 'coinbase', 'kraken'],
            'timeframes': ['1m', '5m', '15m', '1h', '4h', '1d'],
            'signals': ['BUY', 'SELL', 'HOLD'],
            'minStrength': 0,
            'maxStrength': 100
        }
        # Read the global once so a concurrent reset cannot change it between
        # the emptiness check and the copy.
        snapshot = last_scan_results
        if snapshot is None or snapshot.empty:
            # Return empty results
            return jsonify({
                'signals': [],
                'filters': available_filters,
                'metadata': {
                    'count': 0,
                    'message': 'No scan results available. Please trigger a scan first.'
                }
            })
        # Get query parameters
        exchange_filter = request.args.get('exchange', 'all')
        timeframe_filter = request.args.get('timeframe', 'all')
        signal_filter = request.args.get('signal', 'all')
        try:
            min_strength = float(request.args.get('minStrength', 0)) / 100
        except ValueError:
            # Bad client input is a 400, not an internal error.
            return jsonify({
                'error': 'minStrength must be a number between 0 and 100',
                'message': 'Failed to retrieve signals'
            }), 400
        results = snapshot.copy()
        # Apply filters
        if min_strength > 0:
            results = results[results['signal_strength'] >= min_strength]
        if signal_filter != 'all':
            signal_mapping = {
                'BUY': ['Strong Buy', 'Buy', 'Weak Buy'],
                'SELL': ['Strong Sell', 'Sell', 'Weak Sell'],
                'HOLD': ['Neutral']
            }
            if signal_filter in signal_mapping:
                results = results[results['signal'].isin(signal_mapping[signal_filter])]
        if timeframe_filter != 'all':
            results = results[results['timeframe'] == timeframe_filter]
        # When rows record their source exchange, filter on it instead of
        # relabelling every row with whatever the client asked for.
        has_exchange_column = 'exchange' in results.columns
        if exchange_filter != 'all' and has_exchange_column:
            results = results[results['exchange'] == exchange_filter]
        fallback_exchange = exchange_filter if exchange_filter != 'all' else 'kucoinfutures'
        # Format results
        signals = []
        for _, row in results.iterrows():
            row_exchange = row.get('exchange') if has_exchange_column else None
            if not (isinstance(row_exchange, str) and row_exchange):
                row_exchange = fallback_exchange
            formatted = format_signal_for_api(row, row_exchange)
            if formatted:
                signals.append(formatted)
        return jsonify({
            'signals': signals,
            'filters': available_filters,
            'metadata': {
                'count': len(signals),
                'last_scan': last_scan_timestamp.isoformat() if last_scan_timestamp else None
            }
        })
    except Exception as e:
        logger.error(f"Error retrieving signals: {str(e)}", exc_info=True)
        return jsonify({
            'error': str(e),
            'message': 'Failed to retrieve signals'
        }), 500
@app.route('/api/scanner/status', methods=['GET'])
def get_status():
    """Report whether the global scanner exists and summarize the last scan."""
    global scanner
    # The scanner counts as active exactly when it has been initialized.
    initialized = scanner is not None
    if initialized:
        state = 'active'
    else:
        state = 'inactive'
    last_scan = last_scan_timestamp.isoformat() if last_scan_timestamp else None
    stored_rows = 0 if last_scan_results is None else len(last_scan_results)
    return jsonify({
        'status': state,
        'scanner_initialized': initialized,
        'last_scan': last_scan,
        'results_count': stored_rows,
        'timestamp': datetime.now().isoformat(),
    })
def _initialize_scanner_background(exchange_id):
    """Thread target: build the global scanner for ``exchange_id``.

    Creates a dedicated event loop, assigns the scanner returned by
    ``get_scanner`` to the module-level ``scanner`` and closes the loop again.
    Failures are logged with a traceback and not propagated.
    """
    global scanner
    try:
        logger.info(f"Background init starting for exchange: {exchange_id}")
        # This thread needs its own event loop to run the async initializer.
        init_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(init_loop)
        try:
            scanner = get_scanner(init_loop, exchange_id)
            logger.info(f"✅ Scanner initialized successfully for {exchange_id} (background)")
        finally:
            init_loop.close()
    except Exception as e:
        logger.error(f"❌ Error initializing scanner in background: {str(e)}", exc_info=True)
@app.route('/api/scanner/initialize', methods=['POST'])
def initialize_scanner():
    """Queue scanner initialization for an exchange without blocking.

    Responds 200 when a scanner already exists, 202 after starting the
    background initialization thread, and 500 if queueing fails.
    """
    global scanner
    try:
        payload = request.get_json() or {}
        exchange_id = payload.get('exchange', 'kucoinfutures')
        if scanner is None:
            logger.info(f"Queueing scanner initialization for exchange: {exchange_id}")
            # Initialization runs on a daemon thread so the request returns at once.
            worker = threading.Thread(
                target=_initialize_scanner_background,
                args=(exchange_id,),
                daemon=True,
            )
            worker.start()
            accepted = {
                'status': 'accepted',
                'message': f'Scanner initialization queued for {exchange_id}',
                'exchange': exchange_id,
                'timestamp': datetime.now().isoformat(),
                'note': 'Poll /api/scanner/status to check when scanner_initialized is true',
            }
            return jsonify(accepted), 202
        # A scanner already exists: nothing to do.
        logger.info(f"Scanner already initialized for exchange: {exchange_id}")
        already = {
            'status': 'already_initialized',
            'message': f'Scanner is already initialized',
            'exchange': exchange_id,
            'timestamp': datetime.now().isoformat(),
        }
        return jsonify(already), 200
    except Exception as e:
        logger.error(f"Error queueing scanner initialization: {str(e)}", exc_info=True)
        failure = {
            'error': str(e),
            'message': 'Failed to queue scanner initialization',
        }
        return jsonify(failure), 500
@app.route('/api/scanner/reset', methods=['POST'])
def reset_scanner():
    """Reset scanner state (clears current instance).

    Closes the current scanner's exchange on a best-effort basis, then clears
    the global scanner, the stored results and the last-scan timestamp.
    Responds 500 only if the reset itself fails.
    """
    global scanner, last_scan_results, last_scan_timestamp
    try:
        if scanner:
            # Close exchange connection if needed
            if hasattr(scanner, 'exchange') and hasattr(scanner.exchange, 'close'):
                try:
                    asyncio.run(scanner.exchange.close())
                except Exception as close_error:
                    # Still best effort, but no longer a bare `except:` that
                    # also swallowed KeyboardInterrupt/SystemExit silently.
                    logger.warning(f"Error closing exchange during reset: {close_error}")
        scanner = None
        last_scan_results = None
        last_scan_timestamp = None
        logger.info("Scanner reset successfully")
        return jsonify({
            'status': 'reset',
            'message': 'Scanner has been reset',
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Error resetting scanner: {str(e)}", exc_info=True)
        return jsonify({
            'error': str(e),
            'message': 'Failed to reset scanner'
        }), 500
@app.route('/api/scanner/multi-timeframe', methods=['POST'])
def multi_timeframe_confluence():
    """
    Scan multiple timeframes and find confluence opportunities
    Confluence means at least two timeframes agree on direction (bullish or
    bearish) and every analyzed timeframe meets the minimum opportunity score.
    Request body:
    {
    "symbol": "BTC/USDT",
    "timeframes": ["1h", "4h", "1d"],
    "minOpportunity": 65
    }
    """
    try:
        data = request.get_json() or {}
        symbol = data.get('symbol', 'BTC/USDT')
        timeframes = data.get('timeframes', ['short', 'medium', 'daily'])
        min_opportunity = data.get('minOpportunity', 65)
        logger.info(f"Multi-timeframe analysis for {symbol}: {timeframes}")
        # get_scanner() requires the event loop it should initialize on, so
        # the loop is created first (calling it without one raised TypeError).
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        scanner_instance = None
        try:
            scanner_instance = get_scanner(loop)
            # Run scans for each timeframe
            all_results = []
            for tf in timeframes:
                results = loop.run_until_complete(
                    scanner_instance.scan_market(
                        timeframe=tf,
                        full_analysis=True,
                        save_results=False
                    )
                )
                if not results.empty:
                    symbol_result = results[results['symbol'] == symbol]
                    if not symbol_result.empty:
                        all_results.append({
                            'timeframe': tf,
                            'data': symbol_result.iloc[0].to_dict()
                        })
        finally:
            try:
                # Release the exchange created for this request, if any.
                scan_exchange = getattr(scanner_instance, 'exchange', None)
                if scan_exchange is not None:
                    try:
                        loop.run_until_complete(scan_exchange.close())
                    except Exception as close_error:
                        logger.warning(f"Error closing exchange: {close_error}")
            finally:
                loop.close()
        if not all_results:
            return jsonify({
                'symbol': symbol,
                'confluence': False,
                'message': f'No data found for {symbol} across timeframes',
                'timeframes_analyzed': timeframes
            })
        # Analyze confluence
        opportunity_scores = [r['data'].get('opportunity_score', 0) for r in all_results]
        signals = [r['data'].get('signal', 'Neutral') for r in all_results]
        regimes = [r['data'].get('market_regime', 'unknown') for r in all_results]
        # Check for bullish confluence
        bullish_count = sum(1 for s in signals if s in ['Strong Buy', 'Buy', 'Weak Buy'])
        bearish_count = sum(1 for s in signals if s in ['Strong Sell', 'Sell', 'Weak Sell'])
        # bool() keeps the flag a plain Python bool even if the score
        # comparison yields a non-builtin boolean type.
        has_confluence = bool(
            (bullish_count >= 2 or bearish_count >= 2)
            and min(opportunity_scores) >= min_opportunity
        )
        avg_opportunity = sum(opportunity_scores) / len(opportunity_scores) if opportunity_scores else 0
        return jsonify({
            'symbol': symbol,
            'confluence': has_confluence,
            'timeframes_analyzed': len(all_results),
            'average_opportunity': round(avg_opportunity, 2),
            'bullish_timeframes': bullish_count,
            'bearish_timeframes': bearish_count,
            'dominant_regime': max(set(regimes), key=regimes.count) if regimes else 'unknown',
            'timeframe_results': [
                {
                    'timeframe': r['timeframe'],
                    'signal': r['data'].get('signal', 'Neutral'),
                    'opportunity_score': r['data'].get('opportunity_score', 0),
                    'market_regime': r['data'].get('market_regime', 'unknown'),
                    'price': r['data'].get('price', 0),
                    'rsi': r['data'].get('rsi', 50)
                }
                for r in all_results
            ],
            'recommendation': 'STRONG' if has_confluence and avg_opportunity > 75 else 'MODERATE' if has_confluence else 'WEAK'
        })
    except Exception as e:
        logger.error(f"Error in multi-timeframe analysis: {str(e)}", exc_info=True)
        return jsonify({
            'error': str(e),
            'message': 'Failed to complete multi-timeframe analysis'
        }), 500
@app.route('/api/position/calculate', methods=['POST'])
def calculate_position():
    """
    Compute a position size from account and risk parameters.
    Responds 400 when entry price or stop loss is not positive, 500 on any
    other failure; otherwise returns the result of
    TechnicalIndicators.calculate_position_size as JSON.
    Request body:
    {
    "accountBalance": 10000,
    "riskPerTrade": 2,
    "entryPrice": 45000,
    "stopLoss": 43000,
    "leverage": 1,
    "feeRate": 0.001
    }
    """
    try:
        payload = request.get_json() or {}
        # Coerce every input to float, keyed by the calculator's argument names.
        sizing_inputs = {
            'account_balance': float(payload.get('accountBalance', 10000)),
            'risk_per_trade_pct': float(payload.get('riskPerTrade', 2)),
            'entry_price': float(payload.get('entryPrice', 0)),
            'stop_loss': float(payload.get('stopLoss', 0)),
            'leverage': float(payload.get('leverage', 1)),
            'fee_rate': float(payload.get('feeRate', 0.001)),
        }
        prices_valid = sizing_inputs['entry_price'] > 0 and sizing_inputs['stop_loss'] > 0
        if not prices_valid:
            return jsonify({'error': 'Invalid entry price or stop loss'}), 400
        sizing = TechnicalIndicators.calculate_position_size(**sizing_inputs)
        return jsonify(sizing)
    except Exception as e:
        logger.error(f"Error calculating position: {str(e)}", exc_info=True)
        failure = {
            'error': str(e),
            'message': 'Failed to calculate position size',
        }
        return jsonify(failure), 500
@app.route('/api/scanner/continuous/start', methods=['POST'])
def start_continuous_scanner():
    """
    Launch the continuous multi-timeframe scanner on a daemon thread.
    Responds with 'already_running' when a scanner is currently running,
    otherwise creates a new scanner and starts it in the background.
    Request body:
    {
    "symbols": ["BTC/USDT", "ETH/USDT", ...],
    "exchanges": ["binance", "kucoinfutures"],
    "config": {optional config overrides}
    }
    """
    global continuous_scanner, continuous_scanner_task, continuous_scanner_loop
    try:
        if continuous_scanner and continuous_scanner.running:
            return jsonify({
                'status': 'already_running',
                'message': 'Continuous scanner is already running',
            })
        payload = request.get_json() or {}
        default_symbols = [
            'BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT',
            'ADA/USDT', 'DOGE/USDT', 'MATIC/USDT', 'DOT/USDT', 'LINK/USDT',
        ]
        symbols = payload.get('symbols', default_symbols)
        exchanges = payload.get('exchanges', ['binance', 'kucoinfutures'])
        overrides = payload.get('config', {})
        # Overrides, when given, are passed to StreamConfig as keyword arguments.
        if overrides:
            config = StreamConfig(**overrides)
        else:
            config = StreamConfig()
        continuous_scanner = ContinuousMultiTimeframeScanner(config)

        def _drive_scanner():
            # Thread body: own an event loop for as long as the scanner runs.
            global continuous_scanner_loop
            continuous_scanner_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(continuous_scanner_loop)
            try:
                continuous_scanner_loop.run_until_complete(
                    continuous_scanner.start(symbols, exchanges)
                )
            except Exception as e:
                logger.error(f"Continuous scanner error: {e}", exc_info=True)
            finally:
                continuous_scanner_loop.close()

        worker = threading.Thread(target=_drive_scanner, daemon=True)
        worker.start()
        logger.info(f"Started continuous scanner for {len(symbols)} symbols")
        started = {
            'status': 'started',
            'symbols': symbols,
            'exchanges': exchanges,
            'timeframes': config.timeframes,
            'message': 'Continuous scanner started successfully',
        }
        return jsonify(started)
    except Exception as e:
        logger.error(f"Error starting continuous scanner: {str(e)}", exc_info=True)
        failure = {
            'error': str(e),
            'message': 'Failed to start continuous scanner',
        }
        return jsonify(failure), 500
@app.route('/api/scanner/continuous/stop', methods=['POST'])
def stop_continuous_scanner():
"""Stop continuous scanner"""
global continuous_scanner, continuous_scanner_task, continuous_scanner_loop
try:
if not continuous_scanner or not continuous_scanner.running:
return jsonify({
'status': 'not_running',
'message': 'Continuous scanner is not running'
})