Skip to content

Commit 8073c0d

Browse files
committed
Refactor : GCS 폴더명 수정
1 parent e2117e8 commit 8073c0d

9 files changed

Lines changed: 69 additions & 65 deletions

File tree

collector/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from shared.worker.redis_to_gcs import RedisToGCSWorker
66
from worker.binance_ws import BinanceWebsocketWorker
77
from worker.binance_rest import BinanceRestWorker
8-
from shared.utils.constants import RAW_DATA_TYPES
8+
from shared.utils.constants import RAW_DATA_TYPES, GCS_UPLOAD_INTERVAL
99

1010

1111
async def main():
@@ -25,6 +25,7 @@ async def main():
2525
data_types=RAW_DATA_TYPES,
2626
redis_key_prefix="queue:spot",
2727
gcs_path_prefix="raw/spot",
28+
interval_seconds=GCS_UPLOAD_INTERVAL,
2829
)
2930
futures_gcs = RedisToGCSWorker(
3031
name="RawToGCS-Futures",
@@ -33,6 +34,7 @@ async def main():
3334
data_types=RAW_DATA_TYPES,
3435
redis_key_prefix="queue:futures",
3536
gcs_path_prefix="raw/futures",
37+
interval_seconds=GCS_UPLOAD_INTERVAL,
3638
)
3739

3840
await asyncio.gather(

collector/worker/binance_rest.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@ async def run(self):
2222
await asyncio.gather(
2323
self._fetch_open_interest(session),
2424
self._fetch_funding_rate(session),
25+
self._fetch_long_short_ratio(session),
2526
)
26-
if self._lsr_counter % 5 == 0:
27-
await self._fetch_long_short_ratio(session)
28-
self._lsr_counter += 1
2927
await asyncio.sleep(60)
3028

3129
async def _fetch(self, session: aiohttp.ClientSession, endpoint: str, extra_params: dict = None):
@@ -52,7 +50,7 @@ def _save_to_gcs(self, data_type: str, data: dict):
5250
f"year={now.strftime('%Y')}/"
5351
f"month={now.strftime('%m')}/"
5452
f"day={now.strftime('%d')}/"
55-
f"{int(time.time())}_{SYMBOL}.parquet"
53+
f"{now.strftime('%Y-%m-%d %H:%M')}_{SYMBOL}.parquet"
5654
)
5755
self.gcs.upload_parquet(blob_name, df)
5856

processor/load/gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ def save(self, data_type, data):
2020
f"year={now.strftime('%Y')}/"
2121
f"month={now.strftime('%m')}/"
2222
f"day={now.strftime('%d')}/"
23-
f"{int(time.time())}_{self.symbol}.parquet"
23+
f"{now.strftime('%Y-%m-%d %H:%M')}_{self.symbol}.parquet"
2424
)
2525
self.gcs.upload_parquet(blob_name, df)

processor/main.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from load.redis import RedisLoader
1111
from load.gcs import GCSLoader
1212
from utils import calc_change_percent
13-
from shared.utils.constants import CORE_DATA_TYPES
13+
from shared.utils.constants import CORE_DATA_TYPES, GCS_UPLOAD_INTERVAL
1414

1515

1616
class Processor:
@@ -35,6 +35,7 @@ def __init__(self, symbol="btcusdt"):
3535
data_types=CORE_DATA_TYPES,
3636
redis_key_prefix="queue:core",
3737
gcs_path_prefix="core",
38+
interval_seconds=GCS_UPLOAD_INTERVAL,
3839
)
3940

4041
self.prev_cvd = 0.0

processor/transform/batch.py

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,20 @@ def transform_oi_trend(self, prev_oi=0, price_change_pct=0):
1919
oi_chg = ((curr_oi - prev_oi) / prev_oi * 100) if prev_oi > 0 else 0
2020

2121
# 시그널 로직 고도화 (전문 트레이딩 관점)
22-
# 1. 상승 + OI 상승: 신규 롱 진입 (추세 강화 - 매우 긍정)
22+
# 1. 상승 + OI 상승: 신규 롱 진입 (+1.0)
2323
if price_change_pct > 0.5 and oi_chg > 1.0:
24-
signal = "ACCUMULATION_LONG"
25-
# 2. 하락 + OI 상승: 신규 숏 진입 (추세 하락 강화 - 매우 부정)
24+
signal = 1.0
25+
# 2. 하락 + OI 상승: 신규 숏 진입 (-1.0)
2626
elif price_change_pct < -0.5 and oi_chg > 1.0:
27-
signal = "AGGRESSIVE_SHORTING"
28-
# 3. 상승 + OI 하락: 숏 포지션의 강제 청산/손절로 인한 상승 (추세 반전 가능성)
27+
signal = -1.0
28+
# 3. 상승 + OI 하락: 숏 커버링 (+0.5)
2929
elif price_change_pct > 0.5 and oi_chg < -1.0:
30-
signal = "SHORT_COVERING_RALLY"
31-
# 4. 하락 + OI 하락: 롱 포지션의 항복/손절 (바닥 근접 가능성)
30+
signal = 0.5
31+
# 4. 하락 + OI 하락: 롱 손절 (-0.5)
3232
elif price_change_pct < -0.5 and oi_chg < -1.0:
33-
signal = "LONG_LIQUIDATION_DUMP"
33+
signal = -0.5
3434
else:
35-
signal = "NEUTRAL"
35+
signal = 0.0
3636

3737
return ProcessedOITrend(
3838
symbol=self.symbol,
@@ -57,18 +57,20 @@ def transform_fr_heatmap(self):
5757
deviation = rate - 0.0001
5858

5959
# 8시간마다 결제되는 펀딩비 특성상 0.05% 이상은 매우 극단적인 상태
60+
# heat_level: -1 (Short 과열) ~ 1 (Long 과열)
61+
# squeeze_risk: 0 (안전) ~ 1 (위험)
6062
if rate >= 0.05: # 0.05%
61-
heat, risk = "LONG_OVERHEATED", "CRITICAL"
63+
heat, risk = 1.0, 1.0
6264
elif rate >= 0.02:
63-
heat, risk = "LONG_CROWDED", "HIGH"
65+
heat, risk = 0.5, 0.7
6466
elif rate <= -0.05:
65-
heat, risk = "SHORT_OVERHEATED", "CRITICAL"
67+
heat, risk = -1.0, 1.0
6668
elif rate <= -0.02:
67-
heat, risk = "SHORT_CROWDED", "HIGH"
69+
heat, risk = -0.5, 0.7
6870
elif abs(rate) < 0.01:
69-
heat, risk = "STABLE", "LOW"
71+
heat, risk = 0.0, 0.1
7072
else:
71-
heat, risk = "NORMAL", "MEDIUM"
73+
heat, risk = (0.2 if rate > 0 else -0.2), 0.4
7274

7375
return ProcessedFRHeatmap(
7476
symbol=self.symbol,
@@ -92,22 +94,19 @@ def transform_ls_divergence(self, price_change_pct=0):
9294
short_r = float(ls_data.short_account)
9395
ls_ratio = float(ls_data.long_short_ratio)
9496

95-
is_div, div_type, signal = False, None, "NEUTRAL"
97+
is_div, div_type, signal = False, None, 0.0
9698

97-
# 1. 강세 다이버전스 (Bullish Reversal)
98-
# 가격은 급락하는데, 개미들은 겁먹고 숏을 늘림 (LS Ratio 하락) -> 세력의 매집 시점
99+
# 1. 강세 다이버전스 (Bullish Reversal) (+1.0)
99100
if price_change_pct < -2.0 and ls_ratio < 0.8:
100-
is_div, div_type, signal = True, "BULLISH_CONTRARIAN", "STRONG_BUY_SIGNAL"
101+
is_div, div_type, signal = True, "BULLISH_CONTRARIAN", 1.0
101102

102-
# 2. 약세 다이버전스 (Bearish Reversal)
103-
# 가격은 오르는데, 개미들이 환희에 차서 롱을 늘림 (LS Ratio 급상승) -> 세력의 매도 시점
103+
# 2. 약세 다이버전스 (Bearish Reversal) (-1.0)
104104
elif price_change_pct > 2.0 and ls_ratio > 1.5:
105-
is_div, div_type, signal = True, "BEARISH_CONTRARIAN", "STRONG_SELL_SIGNAL"
105+
is_div, div_type, signal = True, "BEARISH_CONTRARIAN", -1.0
106106

107-
# 3. 롱 트랩 (Long Trap)
108-
# 가격은 횡보하거나 소폭 하락하는데 롱 비율만 계속 높아짐 -> 개미들만 타 있는 배 (무거움)
107+
# 3. 롱 트랩 (Long Trap) (-0.5)
109108
elif abs(price_change_pct) < 0.5 and ls_ratio > 1.3:
110-
is_div, div_type, signal = True, "LONG_TRAP", "WARNING_EXIT_LONG"
109+
is_div, div_type, signal = True, "LONG_TRAP", -0.5
111110

112111
return ProcessedLSDivergence(
113112
symbol=self.symbol,

processor/transform/realtime.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ def transform_cvd(self, market, prev_cvd=0, price_change_pct=0, interval="1m"):
3434
curr_cvd = prev_cvd + delta
3535

3636
# 시그널 판단 로직 (CVD 다이버전스)
37-
signal = "NEUTRAL"
37+
signal = 0.0
3838
if price_change_pct > 0.05 and delta < 0:
39-
signal = "BEARISH_DIVERGENCE" # 가격은 오르는데 매도세가 강함 (가짜 상승)
39+
signal = -0.7 # BEARISH_DIVERGENCE
4040
elif price_change_pct < -0.05 and delta > 0:
41-
signal = "BULLISH_DIVERGENCE" # 가격은 내리는데 매수세가 강함 (바닥 다지기)
41+
signal = 0.7 # BULLISH_DIVERGENCE
4242
elif abs(price_change_pct) > 0.1 and (price_change_pct * delta > 0):
43-
signal = "TREND_CONFIRMED" # 가격과 매수/매도 방향이 일치 (강한 추세)
43+
signal = 0.5 if delta > 0 else -0.5 # TREND_CONFIRMED
4444

4545
return ProcessedCVD(
4646
symbol=self.symbol,
@@ -50,7 +50,7 @@ def transform_cvd(self, market, prev_cvd=0, price_change_pct=0, interval="1m"):
5050
buy_volume=0 if is_sell else qty,
5151
sell_volume=qty if is_sell else 0,
5252
delta=delta,
53-
signal=signal # 모델 필드에 signal이 있다고 가정하거나 추가 필요
53+
signal=signal
5454
)
5555

5656
def transform_book_imbalance(self, market):
@@ -69,17 +69,17 @@ def transform_book_imbalance(self, market):
6969

7070
ratio = (bid_total - ask_total) / total
7171

72-
# 불균형 강도에 따른 시그널 세분화
72+
# 불균형 강도에 따른 시그널 세분화 (-1 to 1)
7373
if ratio > 0.4:
74-
signal = "STRONG_BUY_IMMINE" # 매수벽이 압도적 (지지선 형성)
74+
signal = 1.0 # STRONG_BUY_IMMINE
7575
elif ratio > 0.15:
76-
signal = "BUY_PRESSURE"
76+
signal = 0.5 # BUY_PRESSURE
7777
elif ratio < -0.4:
78-
signal = "STRONG_SELL_IMMINE" # 매도벽이 압도적 (저항선 형성)
78+
signal = -1.0 # STRONG_SELL_IMMINE
7979
elif ratio < -0.15:
80-
signal = "SELL_PRESSURE"
80+
signal = -0.5 # SELL_PRESSURE
8181
else:
82-
signal = "NEUTRAL"
82+
signal = 0.0
8383

8484
return ProcessedBookImbalance(
8585
symbol=self.symbol,
@@ -104,13 +104,13 @@ def transform_spread_analysis(self, market):
104104
spread = best_ask - best_bid
105105
spread_pct = (spread / best_bid) * 100
106106

107-
# 비트코인 선물 기준 스프레드 상태 (0.01% 내외가 정상)
107+
# liquidity_status: 1.0 (Healthy) to 0.0 (Critical)
108108
if spread_pct <= 0.015:
109-
status = "HEALTHY"
109+
status = 1.0 # HEALTHY
110110
elif spread_pct <= 0.04:
111-
status = "THIN_LIQUIDITY" # 유동성 부족 (작은 물량에도 큰 변동 위험)
111+
status = 0.5 # THIN_LIQUIDITY
112112
else:
113-
status = "HIGH_VOLATILITY_ALERT" # 급변동 중 (매매 주의)
113+
status = 0.0 # HIGH_VOLATILITY_ALERT
114114

115115
return ProcessedSpreadAnalysis(
116116
symbol=self.symbol,
@@ -177,11 +177,11 @@ def transform_liq_spike(self, market, avg_liq_1h=10000):
177177
is_spike = ratio > 5.0 # 5배 이상 터졌을 때 유의미한 스파이크
178178
is_long_liq = liq.side == "SELL" # 롱 포지션이 강제 매도됨
179179

180-
# 시그널: 롱 대량 청산 시 '바닥권 매수 기회', 숏 대량 청산 시 '고점 매도 기회'
180+
# 시그널: 1.0 (Bullish reversal from long liq) to -1.0 (Bearish reversal from short liq)
181181
if is_spike:
182-
signal = "POTENTIAL_BOTTOM" if is_long_liq else "POTENTIAL_TOP"
182+
signal = 1.0 if is_long_liq else -1.0
183183
else:
184-
signal = "NEUTRAL"
184+
signal = 0.0
185185

186186
return ProcessedLiqSpike(
187187
symbol=self.symbol,
@@ -216,13 +216,13 @@ def transform_price_vol_spike(self, market, avg_volume=0, vol_std_dev=0):
216216
is_price_spike = abs(price_chg) > 0.4
217217

218218
if is_vol_spike and is_price_spike:
219-
signal = "VOL_DRIVEN_BREAKOUT" if price_chg > 0 else "VOL_DRIVEN_DUMP"
219+
signal = 1.0 if price_chg > 0 else -1.0 # VOL_DRIVEN_BREAKOUT / DUMP
220220
elif not is_vol_spike and is_price_spike:
221-
signal = "LOW_VOL_FAKE_MOVE" # 거래량 없는 가격 움직임 (트랩 가능성)
221+
signal = 0.3 if price_chg > 0 else -0.3 # LOW_VOL_FAKE_MOVE
222222
elif is_vol_spike and not is_price_spike:
223-
signal = "ABSORPTION" # 거래량은 터지는데 가격이 안 움직임 (물량 소화 중)
223+
signal = 0.1 # ABSORPTION (Indecision)
224224
else:
225-
signal = "NORMAL"
225+
signal = 0.0
226226

227227
return ProcessedPriceVolSpike(
228228
symbol=self.symbol,

shared/models/core.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ class ProcessedCVD(BaseProcessed):
2424
buy_volume: float
2525
sell_volume: float
2626
delta: float
27+
signal: float = 0.0 # -1 (Bearish) to 1 (Bullish)
2728

2829

2930
class ProcessedBookImbalance(BaseProcessed):
3031
"""Book Imbalance - 소스: Order Book"""
3132
bid_total: float
3233
ask_total: float
3334
imbalance_ratio: float
34-
signal: str # BUY_PRESSURE, SELL_PRESSURE, NEUTRAL
35+
signal: float # -1 to 1
3536

3637

3738
class ProcessedSpreadAnalysis(BaseProcessed):
@@ -40,7 +41,7 @@ class ProcessedSpreadAnalysis(BaseProcessed):
4041
best_ask: float
4142
spread: float
4243
spread_percent: float
43-
liquidity_status: str # NORMAL, LOW, CRITICAL
44+
liquidity_status: float # 0 to 1 (Safety/Health score)
4445

4546

4647
class ProcessedWallDetection(BaseProcessed):
@@ -59,7 +60,7 @@ class ProcessedLiqSpike(BaseProcessed):
5960
long_liq_value: float
6061
short_liq_value: float
6162
is_spike: bool
62-
signal: str # LONG_SQUEEZE, SHORT_SQUEEZE, NEUTRAL
63+
signal: float # -1 to 1
6364

6465

6566
class ProcessedPriceVolSpike(BaseProcessed):
@@ -70,23 +71,23 @@ class ProcessedPriceVolSpike(BaseProcessed):
7071
volume_ratio: float
7172
is_price_spike: bool
7273
is_volume_spike: bool
73-
signal: str # IMPULSE_UP, IMPULSE_DOWN, NORMAL
74+
signal: float # -1 to 1
7475

7576

7677
class ProcessedOITrend(BaseProcessed):
7778
"""OI Trend - 소스: Open Interest"""
7879
open_interest: float
7980
oi_change_percent: float
8081
price_change_percent: float
81-
trend_signal: str # HEALTHY_LONG, HEALTHY_SHORT, WEAK_LONG, WEAK_SHORT, NEUTRAL
82+
trend_signal: float # -1 to 1
8283

8384

8485
class ProcessedFRHeatmap(BaseProcessed):
8586
"""FR Heatmap - 소스: Funding Rate"""
8687
funding_rate: float
8788
deviation: float
88-
heat_level: str # EXTREME_LONG, HIGH_LONG, NORMAL, HIGH_SHORT, EXTREME_SHORT
89-
squeeze_risk: str # HIGH, MEDIUM, LOW
89+
heat_level: float # -1 to 1
90+
squeeze_risk: float # 0 to 1
9091

9192

9293
class ProcessedLSDivergence(BaseProcessed):
@@ -96,5 +97,5 @@ class ProcessedLSDivergence(BaseProcessed):
9697
ls_ratio: float
9798
price_change_percent: float
9899
is_divergence: bool
99-
divergence_type: Optional[str] # BEARISH_DIV, BULLISH_DIV
100-
signal: str # CONTRARIAN_SHORT, CONTRARIAN_LONG, NEUTRAL
100+
divergence_type: Optional[str]
101+
signal: float # -1 to 1

shared/utils/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,6 @@
2222
"cvd", "book_imbalance", "spread_analysis",
2323
"wall_detection", "price_vol_spike", "liq_spike",
2424
]
25+
26+
# intervals
27+
GCS_UPLOAD_INTERVAL = 60

shared/worker/redis_to_gcs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def __init__(
1919
data_types: List[str],
2020
redis_key_prefix: str,
2121
gcs_path_prefix: str,
22-
interval_seconds: int = 300,
22+
interval_seconds: int,
2323
):
2424
self.redis = redis
2525
self.gcs = gcs
@@ -65,7 +65,7 @@ async def _transfer(self, data_type: str):
6565
f"year={now.strftime('%Y')}/"
6666
f"month={now.strftime('%m')}/"
6767
f"day={now.strftime('%d')}/"
68-
f"{int(time.time())}_{SYMBOL}.parquet"
68+
f"{now.strftime('%Y-%m-%d %H:%M')}_{SYMBOL}.parquet"
6969
)
7070

7171
self.gcs.upload_parquet(blob_name, df)

0 commit comments

Comments
 (0)