#!/usr/bin/env python
"""
Data Loading and Preprocessing Module
This module handles loading and preprocessing of market data for the statistical
arbitrage trading system. It provides functions to load various data sources including:
- Stock universe definitions with filters for price, volume, and market cap
- Daily OHLCV price data from CSV files
- Intraday tick bar data aggregated into 30-minute intervals
- Barra risk factors (beta, volatility, momentum, size, etc.)
- Fundamental data (earnings dates, analyst estimates, ratings)
- Short locate availability data
Key Functions:
get_uni(): Load and filter stock universe based on criteria
load_prices(): Load daily price data and calculate returns
load_bars(): Load and aggregate intraday bar data with VWAP
load_barra(): Load Barra risk factors and industry classifications
load_earnings_dates(): Load earnings announcement dates
load_locates(): Load short borrow availability data
load_ratings_hist(): Load analyst rating change history
load_cache(): Cache preprocessed data in HDF5 format for faster access
Universe Filters:
- Price range: $2.00 - $500.00 (tradable) / $2.25 - $500.00 (expandable)
- Minimum ADV: $1M (tradable) / $5M (expandable)
- Country: USA only
- Currency: USD only
- Top stocks by market cap (default: 1400)
The module caches processed data in HDF5 format to improve performance on
subsequent runs.
"""
from __future__ import division, print_function
import sys
import os
import glob
import re
import math
import logging
from dateutil import parser as dateparser
import time
from datetime import datetime
from datetime import timedelta
import numpy as np
import pandas as pd
import pandas.io.sql as psql
import sqlite3 as lite
from util import *
from calc import *
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
UNIV_BASE_DIR = ""
SECDATA_BASE_DIR = ""
PRICE_BASE_DIR = ""
BARRA_BASE_DIR = ""
BAR_BASE_DIR = ""
EARNINGS_BASE_DIR = ""
LOCATES_BASE_DIR = ""
#ESTIMATES_BASE_DIR = ""
ESTIMATES_BASE_DIR = ""
t_low_price = 2.0
t_high_price = 500.0
t_min_advp = 1000000.0
e_low_price = 2.25
e_high_price = 500.0
e_min_advp = 5000000.0
# Bar timestamps are stored as epoch milliseconds; convert to datetime.
fromtimestamp = lambda x: datetime.fromtimestamp(int(x) / 1000.)
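
# Typical pipeline (a sketch, assuming the *_BASE_DIR constants above point at
# populated data directories; the dates below are illustrative only):
#
#   start, end = datetime(2013, 1, 2), datetime(2013, 6, 28)
#   uni_df = get_uni(start, end, lookback=30, uni_size=1400)
#   price_df = load_prices(uni_df, start, end, cols=None)
#   barra_df = load_barra(uni_df, start, end)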
def get_uni(start, end, lookback, uni_size=1400):
"""
Load and filter stock universe based on multiple criteria.
Applies a series of filters to create a tradable universe of stocks:
1. Country/Currency: USA only, USD only
2. Barra estimation universe membership (estu_barra4s) or REITs (ind=404020)
3. Excludes biotechnology stocks (group=3520)
4. Price range: $2.00-$500.00 (t_low_price to t_high_price)
5. Minimum ADV: $1M tradable (t_min_advp)
6. Market cap ranking: top N stocks (default 1400)
Args:
start: datetime, simulation start date
end: datetime, simulation end date
lookback: int, days before start to determine universe date
uni_size: int, maximum number of stocks to include (default 1400)
Returns:
DataFrame with columns ['symbol', 'sector_name'], indexed by sid.
Contains filtered universe of tradable stocks.
File Dependencies:
- UNIV_BASE_DIR/{year}/{date}.csv - Universe definitions
- SECDATA_BASE_DIR/{year}/{date}.csv.gz - Security data with sectors
- PRICE_BASE_DIR/{year}/{date}.csv - Price/volume data
"""
# Validate input parameters
if not isinstance(start, datetime) or not isinstance(end, datetime):
raise ValueError("start and end must be datetime objects")
if start >= end:
raise ValueError("start date must be before end date: start={}, end={}".format(start, end))
if not isinstance(lookback, int) or lookback < 0:
raise ValueError("lookback must be a non-negative integer, got: {}".format(lookback))
if not isinstance(uni_size, int) or uni_size <= 0:
raise ValueError("uni_size must be a positive integer, got: {}".format(uni_size))
unidate = start - timedelta(days=lookback)
year = unidate.strftime("%Y")
unidate = unidate.strftime("%Y%m%d")
univ_dir = UNIV_BASE_DIR + year
univ_file = univ_dir + "/" + unidate + ".csv"
try:
univ_df = pd.read_csv(univ_file, header=0, usecols=['sid', 'ticker_root', 'status', 'country', 'currency'], index_col=['sid'])
except IOError as e:
raise IOError("Failed to load universe file {}: {}".format(univ_file, str(e)))
except Exception as e:
raise ValueError("Error reading universe file {}: {}".format(univ_file, str(e)))
if univ_df.empty:
raise ValueError("Universe file {} is empty".format(univ_file))
if univ_df.isnull().all().all():
raise ValueError("Universe file {} contains all NaN values".format(univ_file))
print("Universe size (raw): {}".format(len(univ_df.index)))
univ_df = univ_df[ (univ_df['country'] == "USA") & (univ_df['currency'] == "USD") ]
print("Universe size (US/USD): {}".format(len(univ_df.index)))
if univ_df.empty:
logging.warning("No US/USD stocks found in universe for date {}".format(unidate))
secdata_dir = SECDATA_BASE_DIR + year
secdata_file = secdata_dir + "/" + unidate + ".csv.gz"
try:
secdata_df = pd.read_csv(secdata_file, header=0, compression='gzip', usecols=['sid', 'symbol', 'sector', 'ind', 'group', 'sector_name', 'ind_name', 'estu_inter', 'estu_barra4s'], index_col=['sid'])
except IOError as e:
raise IOError("Failed to load secdata file {}: {}".format(secdata_file, str(e)))
except Exception as e:
raise ValueError("Error reading secdata file {}: {}".format(secdata_file, str(e)))
if secdata_df.empty:
raise ValueError("Secdata file {} is empty".format(secdata_file))
univ_df = pd.merge(univ_df, secdata_df, how='inner', left_index=True, right_index=True, sort=True)
print("Universe size (secdata): {}".format(len(univ_df.index)))
univ_df = univ_df[ (univ_df['estu_barra4s'] == 1) | (univ_df['ind'] == 404020) ]
del univ_df['estu_inter']
del univ_df['estu_barra4s']
print("Universe size (estu_inter): {}".format(len(univ_df.index)))
univ_df = univ_df[ univ_df['group'] != 3520 ]
print("Universe size (bio): {}".format(len(univ_df.index)))
price_dir = PRICE_BASE_DIR + year
price_file = price_dir + "/" + unidate + ".csv"
try:
price_df = pd.read_csv(price_file, header=0, usecols=['sid', 'ticker', 'close', 'tradable_med_volume_21', 'mkt_cap'], index_col=['sid'])
except IOError as e:
raise IOError("Failed to load price file {}: {}".format(price_file, str(e)))
except Exception as e:
raise ValueError("Error reading price file {}: {}".format(price_file, str(e)))
if price_df.empty:
raise ValueError("Price file {} is empty".format(price_file))
univ_df = pd.merge(univ_df, price_df, how='inner', left_index=True, right_index=True, sort=True)
print("Universe size (prices): {}".format(len(univ_df.index)) )
# Check for invalid price data
if np.isinf(univ_df['close']).any():
logging.warning("Found infinite values in close prices, filtering out")
univ_df = univ_df[~np.isinf(univ_df['close'])]
univ_df = univ_df[ (univ_df['close'] > t_low_price) & (univ_df['close'] < t_high_price) ]
print("Universe size (price range): {}".format(len(univ_df.index)))
univ_df['mdvp'] = univ_df['tradable_med_volume_21'] * univ_df['close']
# Check for invalid volume data
if np.isinf(univ_df['mdvp']).any():
logging.warning("Found infinite values in mdvp, filtering out")
univ_df = univ_df[~np.isinf(univ_df['mdvp'])]
univ_df = univ_df[ univ_df['mdvp'] > t_min_advp ]
print("Universe size (adv): {}".format(len(univ_df.index)))
univ_df['rank'] = univ_df['mkt_cap'].fillna(0).rank(ascending=False)
univ_df = univ_df[ univ_df['rank'] <= uni_size ]
print("Universe size (mktcap): {}".format(len(univ_df.index)))
if univ_df.empty:
raise ValueError("Final universe is empty after all filters. Check universe parameters.")
result_df = univ_df[ ['symbol', 'sector_name'] ]
# Final validation
if result_df.isnull().any().any():
logging.warning("Final universe contains NaN values in symbol or sector_name columns")
return result_df
# univ_df = univ_df[['ticker_root']]
# date = start
# result_df = pd.DataFrame()
# while( date < end ):
# dateStr = date.strftime("%Y%m%d")
# year = date.strftime("%Y")
# secdata_dir = SECDATA_BASE_DIR + year
# secdata_file = secdata_dir + "/" + dateStr + ".estu.csv.gz"
# try:
# secdata_df = pd.read_csv(secdata_file, header=0, compression='gzip', usecols=['sid', 'symbol', 'sector', 'ind', 'sector_name', 'ind_name'])
# except IOError:
# print "File not found: {}".format(secdata_file)
# date += timedelta(days=1)
# continue
# secdata_df.set_index('sid', inplace=True)
# secdata_df = pd.merge(univ_df, secdata_df, how='inner', left_index=True, right_index=True, sort=True, suffixes=['', '_dead'])
# secdata_df['ticker'] = secdata_df['symbol']
# del secdata_df['symbol']
# secdata_df['date'] = date
# secdata_df.reset_index(inplace=True)
# result_df = result_df.append(secdata_df)
# date += timedelta(days=1)
# result_df.set_index(keys=['date', 'sid'], inplace=True)
# result_df = result_df.unstack().asfreq('D', method='pad').stack()
# result_df.index.names = ['date', 'sid']
# return result_df
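
# Usage sketch for get_uni() (dates illustrative; requires UNIV_BASE_DIR,
# SECDATA_BASE_DIR and PRICE_BASE_DIR to be populated):
#
#   uni_df = get_uni(datetime(2013, 1, 2), datetime(2013, 6, 28), lookback=30)
#   uni_df.groupby('sector_name').size()   # sector breakdown of the tradable universe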
def load_barra(uni_df, start, end, cols=None):
"""
Load Barra risk factors for the given universe and date range.
Loads daily Barra USE4S risk model data including style factors (beta,
momentum, size, volatility, etc.) and industry classifications.
Args:
uni_df: DataFrame, universe of stocks (from get_uni)
start: datetime, start date
end: datetime, end date
cols: list of str or None, specific columns to load (sid auto-added)
Returns:
DataFrame indexed by (date, sid) containing Barra risk factors.
Common columns: hbeta, pbeta, srisk_pct, trisk_pct, momentum,
size, sizenl, beta, resvol, leverage, liquidty, indname1, etc.
File Dependencies:
- BARRA_BASE_DIR/{year}/{date}.use4s_rsk.rev.csv
"""
    if cols is not None:
        cols = cols + ['sid']  # copy so the caller's list is not mutated
# if cols is None:
# cols = ['hbeta', 'pbeta', 'srisk_pct', 'trisk_pct', 'country', 'growth', 'size', 'sizenl', 'divyild', 'btop', 'earnyild', 'beta', 'resvol', 'betanl', 'momentum', 'leverage', 'liquidty', 'yld_pct', 'indname1', 'ind1', 'sid']
date = start
result_dfs = list()
while ( date < end ):
dateStr = date.strftime('%Y%m%d')
year = dateStr[0:4]
barra_dir = BARRA_BASE_DIR + year
barra_file = barra_dir + "/" + dateStr + ".use4s_rsk.rev.csv"
print("Loading {}".format(barra_file))
try:
if cols is not None:
barra_df = pd.read_csv(barra_file, usecols=cols)
else:
barra_df = pd.read_csv(barra_file)
except IOError:
print("File not found: {}".format(barra_file))
date += timedelta(days=1)
continue
barra_df['date'] = date
#should be left join, but the nans create an issue downstream with unstacking...
barra_df = pd.merge(uni_df.reset_index(), barra_df, how='inner', left_on=['sid'], right_on=['sid'], sort=False)
barra_df.set_index(keys=['date', 'sid'], inplace=True)
result_dfs.append(barra_df)
date += timedelta(days=1)
    if not result_dfs:
        raise ValueError("No Barra data loaded for date range {} to {}".format(start, end))
    result_df = pd.concat(result_dfs, verify_integrity=True)
result_df = remove_dup_cols(result_df)
return result_df
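
# Example (sketch): load only the style factors needed downstream. 'sid' is
# appended to cols automatically, so callers pass data columns only.
#
#   barra_df = load_barra(uni_df, start, end, cols=['beta', 'momentum', 'resvol'])
#   barra_df[['beta', 'momentum']].groupby(level='date').mean()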
def load_daybars(uni_df, start, end, cols=None, freq='30Min'):
"""
Load daily bar summaries with intraday high/low and VWAP.
Loads pre-aggregated daily bars that include both daily OHLC and
intraday extremes. Data is resampled to specified frequency.
Args:
uni_df: DataFrame, universe of stocks (must have 'ticker' column)
start: datetime, start date
end: datetime, end date
cols: list of str or None, columns to load (default includes OHLC, VWAP)
freq: str, resampling frequency (default '30Min')
Returns:
DataFrame indexed by (iclose_ts, sid) with daily bar data.
Columns include: dopen, dhigh, dlow, qhigh, qlow, dvwap, dvolume, dtrades
File Dependencies:
- BAR_BASE_DIR/{date}/daily.txt.gz
"""
if cols is None:
cols = ['dopen', 'dhigh', 'dlow', 'qhigh', 'qlow', 'dvwap', 'dvolume', 'dtrades']
    cols = cols + ['ticker', 'iclose_ts', 'iclose', 'dvolume', 'dopen']  # copy so the caller's list is not mutated
date = start
result_dfs = list()
uni_df = uni_df[ ['ticker'] ]
uni_df = uni_df.reset_index()
while ( date < end ):
dateStr = date.strftime("%Y%m%d")
bar_file = BAR_BASE_DIR + dateStr + "/daily.txt.gz"
print("Loading {}".format(bar_file))
try:
bars_df = pd.read_csv(bar_file, compression='gzip', sep="|", header=None, names=['ticker', 'iclose_ts', 'dopen', 'dhigh', 'dlow', 'iclose', 'qhigh', 'qlow', 'dvwap', 'dvolume', 'dtrades', 'open_c', 'open_c_volume', 'close_c', 'close_c_volume'], index_col=['iclose_ts'], converters={'iclose_ts': fromtimestamp}, usecols=cols)
except IOError:
print("File not found: {}".format(bar_file))
date += timedelta(days=1)
continue
bars_df = bars_df.between_time('09:30:00', '16:00:01')
bars_df.set_index('ticker', append=True, inplace=True)
#change closed to 'left' maybe?
bars_df = bars_df.unstack(level=-1).resample(freq, how='last', closed='right', label='right').stack()
bars_df.reset_index(inplace=True)
bars_df['date'] = date
bars_df['giclose_ts'] = bars_df['iclose_ts']
bars_df = pd.merge(uni_df[ uni_df['date'] == date ], bars_df, how='inner', left_on=['date', 'ticker'], right_on=['date', 'ticker'], sort=True, suffixes=['', '_dead'])
bars_df['gsid'] = bars_df['sid']
result_dfs.append(bars_df)
date += timedelta(days=1)
result_df = pd.concat(result_dfs)
result_df.set_index(keys=['iclose_ts', 'sid'], inplace=True)
result_df = remove_dup_cols(result_df)
return result_df
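
# Example (sketch): unlike the get_uni() output, the uni_df passed here must
# carry a 'ticker' column and per-date rows; 'dated_uni_df' below is a
# hypothetical frame built that way.
#
#   day_df = load_daybars(dated_uni_df, start, end, freq='30Min')
#   day_df[['dvwap', 'dvolume']].head()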
def load_prices(uni_df, start, end, cols):
"""
Load daily price data with returns and volume metrics.
Primary data loader for equity prices including OHLC, returns,
volatility, and median volume calculations. Also computes lagged
values and expandable universe flags.
Args:
uni_df: DataFrame, universe of stocks
start: datetime, start date
end: datetime, end date
cols: list of str or None, columns to load
Returns:
DataFrame indexed by (date, sid) containing:
- Price data: open, high, low, close
- Returns: ret, log_ret, overnight_log_ret, today_log_ret
- Volume: prim_volume, comp_volume, tradable_med_volume_*
- Volatility: volat_21, volat_60, overnight_volat_21
- Corporate actions: split, div
- Derived: mdvp (median dollar volume), expandable flag
- Lagged values (*_y suffix) from lag_data()
File Dependencies:
- PRICE_BASE_DIR/{year}/{date}.equ_prices.rev.csv
"""
# Validate inputs
if uni_df is None or uni_df.empty:
raise ValueError("uni_df cannot be None or empty")
if not isinstance(start, datetime) or not isinstance(end, datetime):
raise ValueError("start and end must be datetime objects")
if start >= end:
raise ValueError("start date must be before end date: start={}, end={}".format(start, end))
if cols is None:
cols = ['open', 'high', 'low', 'prim_volume', 'comp_volume', 'shares_out', 'ret', 'log_ret', 'prim_med_volume_21', 'prim_med_volume_60', 'comp_med_volume_21', 'comp_med_volume_60', 'tradable_med_volume_60', 'volat_21', 'volat_60', 'cum_log_ret', 'overnight_log_ret', 'today_log_ret', 'overnight_volat_21', 'today_volat_21', 'split', 'div']
    cols = cols + ['sid', 'ticker', 'close', 'tradable_med_volume_21', 'log_ret', 'tradable_volume', 'mkt_cap']  # copy so the caller's list is not mutated
date = start
result_dfs = list()
while ( date < end ):
dateStr = date.strftime('%Y%m%d')
year = dateStr[0:4]
price_dir = PRICE_BASE_DIR + year
price_file = price_dir + "/" + dateStr + ".equ_prices.rev.csv"
print("Loading {}".format(price_file))
try:
prices_df = pd.read_csv(price_file, header=0, usecols=cols)
except IOError:
print("File not found: {}".format(price_file))
date += timedelta(days=1)
continue
prices_df['date'] = prices_df['gdate'] = date
# print uni_df.head()
prices_df = pd.merge(uni_df.reset_index(), prices_df.reset_index(), how='inner', left_on=['sid'], right_on=['sid'], sort=True, suffixes=['', '_dead'])
prices_df.set_index(keys=['date', 'sid'], inplace=True)
result_dfs.append(prices_df)
date += timedelta(days=1)
if not result_dfs:
raise ValueError("No price data loaded for date range {} to {}".format(start, end))
result_df = pd.concat(result_dfs, verify_integrity=True)
if result_df.empty:
raise ValueError("Price data is empty after loading and merging")
# Check for NaN/inf in critical columns
if 'close' in result_df.columns:
if result_df['close'].isnull().all():
raise ValueError("All close prices are NaN")
if np.isinf(result_df['close']).any():
logging.warning("Found infinite values in close prices, filtering out")
result_df = result_df[~np.isinf(result_df['close'])]
result_df['tradable_med_volume_21'] = result_df['tradable_med_volume_21'].astype(float)
if 'shares_out' in result_df.columns:
result_df['shares_out'] = result_df['shares_out'].astype(float)
result_df['tradable_volume'] = result_df['tradable_volume'].astype(float)
    if 'comp_volume' in result_df.columns:
        result_df['comp_volume'] = result_df['comp_volume'].astype(float)
result_df['mdvp'] = result_df['close'] * result_df['tradable_med_volume_21']
# Check for invalid mdvp values
if np.isinf(result_df['mdvp']).any():
logging.warning("Found infinite values in mdvp")
result_df.loc[np.isinf(result_df['mdvp']), 'mdvp'] = np.nan
result_df = lag_data(result_df)
result_df['expandable'] = (result_df['close_y'] > e_low_price) & (result_df['close_y'] < e_high_price) & (result_df['mdvp_y'] > e_min_advp)
for col in ['ind_name_y', 'name_y', 'sector_name_y', 'sector_y', 'ticker_y', 'gdate_y', 'ticker_root_y']:
if col in result_df.columns:
del result_df[col]
return result_df
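
# Example (sketch; dates illustrative): the expandable flag computed above uses
# the lagged close/mdvp, so it reflects yesterday's $2.25-$500 / $5M ADV screen.
#
#   px_df = load_prices(uni_df, start, end, cols=None)
#   px_df['expandable'].groupby(level='date').mean()   # fraction passing the screen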
def load_volume_profile(uni_df, start, end, freq='30Min'):
"""
Load intraday volume profiles showing median cumulative volume by time.
Loads 20-day median volume profiles for each stock, which are used
for VWAP calculations and participation rate estimates.
Args:
uni_df: DataFrame, universe of stocks
start: datetime, start date
end: datetime, end date
freq: str, resampling frequency (default '30Min')
Returns:
DataFrame indexed by (iclose_ts, sid) with med_cum_volume column
showing the median cumulative volume at each time slice.
File Dependencies:
- PRICE_BASE_DIR/{year}/{date}.equ_volume_profiles_20d.rev.csv
"""
date = start
result_dfs = list()
while ( date < end ):
dateStr = date.strftime('%Y%m%d')
year = dateStr[0:4]
price_dir = PRICE_BASE_DIR + year
volume_file = price_dir + "/" + dateStr + ".equ_volume_profiles_20d.rev.csv"
print("Loading {}".format(volume_file))
try:
volume_df = pd.read_csv(volume_file, header=0, index_col=['sid'])
except IOError:
print("File not found: {}".format(volume_file))
date += timedelta(days=1)
continue
print("stacking...")
volume_df = volume_df.stack()
volume_df = volume_df.reset_index()
        volume_df = volume_df[ ~volume_df['level_1'].isin(['med_open_volume', 'med_close_volume', 'med_cum_pre_mkt_volume', 'med_cum_post_mkt_volume']) ]
timemap = dict()
print("parsing dates..." )
for rawtime in volume_df['level_1'].unique():
val = None
try:
val = dateparser.parse(dateStr + " " + rawtime[:-2] + ":" + rawtime[-2:])
            except Exception:
                pass  # leave unparseable time labels as None
timemap[rawtime] = val
print("mapping dates...")
volume_df['iclose_ts'] = volume_df['level_1'].apply(lambda x: timemap[x])
volume_df['date'] = date
volume_df.set_index(keys=['date', 'sid'], inplace=True)
print("merging...")
volume_df = pd.merge(uni_df, volume_df, how='inner', left_index=True, right_index=True, sort=True, suffixes=['', '_dead'])
volume_df.reset_index(inplace=True)
grouped = volume_df.groupby('sid')
print("accumulating volumes...")
for name, group in grouped:
group['med_cum_volume'] = pd.expanding_sum(group[0])
del group[0]
group['sid'] = name
group = group.reset_index()
# print group.head()
group.set_index('iclose_ts', inplace=True)
group_df = group.resample(freq, how='last', closed='right', label='right')
# print group_df.head()
result_dfs.append( group_df )
date += timedelta(days=1)
result_df = pd.concat(result_dfs)
result_df = result_df.reset_index()
print(result_df.head())
result_df['iclose_ts'] = result_df['level_0']
del result_df['level_0']
result_df.set_index(keys=['iclose_ts', 'sid'], inplace=True)
result_df = remove_dup_cols(result_df)
return result_df
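
# Example (sketch): med_cum_volume is cumulative from the open, so a per-bucket
# median volume is its first difference within each sid; 'dated_uni_df' is a
# hypothetical universe frame indexed by (date, sid).
#
#   prof_df = load_volume_profile(dated_uni_df, start, end, freq='30Min')
#   bucket_vol = prof_df['med_cum_volume'].groupby(level='sid').diff()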
def aggregate_bars(bars_df, freq=30):
"""
Aggregate tick-level bars into specified time intervals.
Takes raw bars (typically 1-minute or tick data) and aggregates into
larger time buckets. Computes both cumulative (from market open) and
interval-specific metrics (suffix _b).
Args:
bars_df: DataFrame with tick bar data indexed by timestamp
freq: int, aggregation frequency in minutes (default 30)
Returns:
DataFrame with aggregated bars including:
- OHLC: bopen, bhigh, blow, blast, iclose
- Volume: bvwap, bvolume, btrades
- Microstructure: meanSpread, meanEffectiveSpread, bidHitTrades,
askHitTrades, outsideTrades, etc.
- Derived metrics: insideness (log effective/quoted spread),
adj_trade_size, spread_bps
Notes:
- Cumulative columns aggregate from market open
- Interval columns (suffix _b) aggregate within each bucket
"""
print("Aggregating bars...")
    # ASSUMES bars_df is sorted ascending by its timestamp index
start = bars_df['bopen_ts'].min()
t0 = start
t1 = t0 + timedelta(minutes=freq)
end = bars_df.index.max()
agg_bars_dfs = list()
while ( t1 <= end ):
print("Grouping to {}".format(t1))
sub_df = bars_df.truncate(after=t1)
grouped = sub_df.groupby('ticker')
df_d = dict()
df_d['bopen_ts'] = start
df_d['iclose_ts'] = t1
df_d['bopen'] = grouped['bopen'].first()
df_d['bfirst'] = grouped['bfirst'].first()
        df_d['bhigh'] = grouped['bhigh'].max()
df_d['blow'] = grouped['blow'].min()
df_d['blast'] = grouped['blast'].last()
df_d['iclose'] = grouped['iclose'].last()
df_d['bvwap'] = grouped.apply(lambda x: (x['bvwap'] * x['bvolume'] / x['bvolume'].sum()).sum())
df_d['bvolume'] = grouped['bvolume'].sum()
df_d['btrades'] = grouped['btrades'].sum()
df_d['meanSpread'] = grouped['meanSpread'].mean()
df_d['meanEffectiveSpread'] = grouped['meanEffectiveSpread'].mean()
df_d['meanBidSize'] = grouped['meanBidSize'].mean()
df_d['meanAskSize'] = grouped['meanAskSize'].mean()
df_d['bidHitTrades'] = grouped['bidHitTrades'].sum()
df_d['midHitTrades'] = grouped['midHitTrades'].sum()
df_d['askHitTrades'] = grouped['askHitTrades'].sum()
df_d['bidHitDollars'] = grouped['bidHitDollars'].sum()
df_d['midHitDollars'] = grouped['midHitDollars'].sum()
df_d['askHitDollars'] = grouped['askHitDollars'].sum()
df_d['outsideTrades'] = grouped['outsideTrades'].sum()
df_d['outsideDollars'] = grouped['outsideDollars'].sum()
sub_df = bars_df.truncate(before=t0, after=t1)
grouped = sub_df.groupby('ticker')
df_d['bopen_b'] = grouped['bopen'].first()
df_d['bfirst_b'] = grouped['bfirst'].first()
        df_d['bhigh_b'] = grouped['bhigh'].max()
df_d['blow_b'] = grouped['blow'].min()
df_d['blast_b'] = grouped['blast'].last()
df_d['iclose_b'] = grouped['iclose'].last()
df_d['bvwap_b'] = grouped.apply(lambda x: (x['bvwap'] * x['bvolume'] / x['bvolume'].sum()).sum())
df_d['bvolume_b'] = grouped['bvolume'].sum()
df_d['btrades_b'] = grouped['btrades'].sum()
df_d['meanSpread_b'] = grouped['meanSpread'].mean()
df_d['meanEffectiveSpread_b'] = grouped['meanEffectiveSpread'].mean()
df_d['meanBidSize_b'] = grouped['meanBidSize'].mean()
df_d['meanAskSize_b'] = grouped['meanAskSize'].mean()
df_d['bidHitTrades_b'] = grouped['bidHitTrades'].sum()
df_d['midHitTrades_b'] = grouped['midHitTrades'].sum()
df_d['askHitTrades_b'] = grouped['askHitTrades'].sum()
df_d['bidHitDollars_b'] = grouped['bidHitDollars'].sum()
df_d['midHitDollars_b'] = grouped['midHitDollars'].sum()
df_d['askHitDollars_b'] = grouped['askHitDollars'].sum()
df_d['outsideTrades_b'] = grouped['outsideTrades'].sum()
df_d['outsideDollars_b'] = grouped['outsideDollars'].sum()
df = pd.DataFrame(df_d)
df = df.reset_index()
agg_bars_dfs.append(df)
t0 = t1
t1 += timedelta(minutes=freq)
agg_bars_df = pd.concat(agg_bars_dfs)
agg_bars_df['insideness'] = np.log(agg_bars_df['meanEffectiveSpread']/agg_bars_df['meanSpread'])
agg_bars_df['adj_trade_size'] = agg_bars_df['bvolume']/agg_bars_df['btrades'] / agg_bars_df['bvwap']
agg_bars_df['spread_bps'] = agg_bars_df['meanSpread'] / agg_bars_df['bvwap']
agg_bars_df['insideness_b'] = np.log(agg_bars_df['meanEffectiveSpread_b']/agg_bars_df['meanSpread_b'])
agg_bars_df['adj_trade_size_b'] = agg_bars_df['bvolume_b']/agg_bars_df['btrades_b'] / agg_bars_df['bvwap_b']
agg_bars_df['spread_bps_b'] = agg_bars_df['meanSpread_b'] / agg_bars_df['bvwap_b']
return agg_bars_df
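
# Derived-metric conventions from aggregate_bars() (values illustrative):
#
#   insideness     = log(meanEffectiveSpread / meanSpread)   # < 0 when prints occur inside the quote
#   adj_trade_size = (bvolume / btrades) / bvwap             # average trade size, price-normalized
#   spread_bps     = meanSpread / bvwap                      # quoted spread as a fraction of price
#
# e.g. meanEffectiveSpread=0.01 and meanSpread=0.02 gives insideness = log(0.5) ~= -0.69.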
def load_bars(uni_df, start, end, cols=None, freq=30):
"""
Load and aggregate intraday tick bars for the given universe.
Primary intraday data loader that reads raw bar files, filters to
market hours (9:30-16:00), aggregates to specified frequency, and
joins with universe data.
Args:
uni_df: DataFrame, universe of stocks
start: datetime, start date
end: datetime, end date
cols: list of str or None, specific columns to include
freq: int, aggregation frequency in minutes (default 30)
Returns:
DataFrame indexed by (iclose_ts, sid) with aggregated bar data.
See aggregate_bars() for column details.
File Dependencies:
- BAR_BASE_DIR/{date}/bars.txt.gz
Notes:
Uses aggregate_bars() internally for time aggregation.
Bar files are pipe-delimited with 24 columns of tick data.
"""
    if cols is not None:
        cols = cols + ['ticker', 'iclose_ts', 'iclose', 'date', 'gdate', 'giclose_ts']  # copy so the caller's list is not mutated
date = start
result_dfs = list()
uni_df = uni_df.reset_index()
while ( date < end ):
dateStr = date.strftime("%Y%m%d")
bar_file = BAR_BASE_DIR + dateStr + "/bars.txt.gz"
print("Loading {}".format(bar_file))
try:
bars_df = pd.read_csv(bar_file, compression='gzip', sep="|", header=None, names=['ticker', 'bopen_ts', 'iclose_ts', 'bopen', 'bfirst', 'bhigh', 'blow', 'blast', 'iclose', 'bvwap', 'bvolume', 'btrades', 'meanSpread', 'meanEffectiveSpread', 'meanBidSize', 'meanAskSize', 'bidHitTrades', 'midHitTrades', 'askHitTrades', 'bidHitDollars', 'midHitDollars', 'askHitDollars', 'outsideTrades', 'outsideDollars'], converters={'iclose_ts': fromtimestamp, 'bopen_ts': fromtimestamp}, na_values=['-1'])
except IOError:
print("File not found: {}".format(bar_file))
date += timedelta(days=1)
continue
bars_df.set_index('iclose_ts', inplace=True)
bars_df = bars_df.between_time('09:30:00', '16:00:00')
bars_df = aggregate_bars(bars_df, freq)
bars_df['giclose_ts'] = bars_df['iclose_ts']
bars_df['date'] = bars_df['gdate'] = date
if cols is not None:
for col in bars_df.columns:
if col not in cols: del bars_df[col]
bars_df = pd.merge(uni_df[ uni_df['date'] == date ], bars_df, how='inner', left_on=['ticker'], right_on=['ticker'], sort=True, suffixes=['', '_dead'])
result_dfs.append(bars_df)
date += timedelta(days=1)
result_df = pd.concat(result_dfs)
result_df.set_index(keys=['iclose_ts', 'sid'], inplace=True)
result_df = remove_dup_cols(result_df)
return result_df
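
# Example (sketch): hourly aggregation instead of the 30-minute default.
# Unsuffixed columns are cumulative from the open; *_b columns are per-bucket.
# 'dated_uni_df' is a hypothetical per-date universe with a 'ticker' column.
#
#   bars_df = load_bars(dated_uni_df, start, end, cols=None, freq=60)
#   bars_df[['bvwap', 'bvwap_b']].head()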
def load_earnings_dates(uni_df, start, end):
"""
Load upcoming earnings announcement dates for the universe.
Calculates business days until each stock's next earnings report,
adjusting for after-market announcements.
Args:
uni_df: DataFrame, universe of stocks
start: datetime, start date
end: datetime, end date
Returns:
DataFrame indexed by (date, sid) containing:
- earndate: earnings announcement date
- daysToEarn: business days until announcement
- earn_rpt_time_norm: announcement timing (AFTMKT, etc.)
File Dependencies:
- EARNINGS_BASE_DIR/{date}.csv
Notes:
After-market (AFTMKT) announcements add +1 to daysToEarn
since the price reaction occurs the next trading day.
"""
date = start
result_dfs = list()
uni_df = uni_df.reset_index()
while ( date < end ):
# year = date.strftime("%Y")
dateStr = date.strftime("%Y%m%d")
earnings_dir = EARNINGS_BASE_DIR
earnings_file = earnings_dir + "/" + dateStr + ".csv"
try:
df = pd.read_csv(earnings_file)
except IOError:
print("File not found: {}".format(earnings_file))
date += timedelta(days=1)
continue
df['date'] = date
df = pd.merge(uni_df[ uni_df['date'] == date ], df, how='inner', left_on=['sid'], right_on=['sid'], sort=True, suffixes=['', '_dead'])
result_dfs.append(df)
date += timedelta(days=1)
result_df = pd.concat(result_dfs)
result_df['earndate'] = pd.to_datetime(result_df['earn_rpt_date'].apply(str))
f = lambda x: len(pd.bdate_range(x['date'], x['earndate']))
result_df['daysToEarn'] = result_df.apply(f, axis=1)
result_df.loc[ result_df['earn_rpt_time_norm'] == "AFTMKT", 'daysToEarn' ] = result_df.loc[ result_df['earn_rpt_time_norm'] == "AFTMKT", 'daysToEarn' ] + 1
result_df.set_index(keys=['date', 'sid'], inplace=True)
result_df = remove_dup_cols(result_df)
return result_df
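
# Worked example of the daysToEarn convention (dates illustrative):
#
#   len(pd.bdate_range('2013-01-07', '2013-01-09')) == 3   # Mon..Wed, both endpoints counted
#
# so a Wednesday report seen on Monday gives daysToEarn == 3, and an AFTMKT
# report that Wednesday gives daysToEarn == 4 (price reaction is Thursday).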
def load_past_earnings_dates(uni_df, start, end):
"""
Load most recent past earnings dates for the universe.
Calculates business days since each stock's last earnings report,
useful for post-earnings drift signals.
Args:
uni_df: DataFrame, universe of stocks
start: datetime, start date
end: datetime, end date
Returns:
DataFrame indexed by (date, sid) containing:
- latest_earnings_date: most recent earnings date
- daysFromEarn: business days since last announcement
File Dependencies:
- EARNINGS_BASE_DIR/{date}.latest_earnings_date.rev.csv
"""
date = start
result_dfs = list()
uni_df = uni_df.reset_index()
while ( date < end ):
# year = date.strftime("%Y")
dateStr = date.strftime("%Y%m%d")
earnings_dir = EARNINGS_BASE_DIR
earnings_file = earnings_dir + "/" + dateStr + ".latest_earnings_date.rev.csv"
print(earnings_file)
try:
df = pd.read_csv(earnings_file)
except IOError:
print("File not found: {}".format(earnings_file))
date += timedelta(days=1)
continue
df['date'] = date
df = pd.merge(uni_df[ uni_df['date'] == date ], df, how='inner', left_on=['sid'], right_on=['sid'], sort=True, suffixes=['', '_dead'])
result_dfs.append(df)
date += timedelta(days=1)
result_df = pd.concat(result_dfs)
result_df = result_df.dropna(subset=['latest_earnings_date'])
#ugh...
result_df['latest_earnings_date'] = result_df['latest_earnings_date'].apply(str).str.replace(r"\.0", "")
result_df['latest_earnings_date'] = pd.to_datetime(result_df['latest_earnings_date'])
f = lambda x: len(pd.bdate_range(x['date'], x['latest_earnings_date']))
result_df['daysFromEarn'] = result_df.apply(f, axis=1)
result_df.set_index(keys=['date', 'sid'], inplace=True)
result_df = remove_dup_cols(result_df)
return result_df
def load_locates(uni_df, start, end):
"""
Load short sale locate availability and borrow costs.
Loads data on shares available to borrow for short selling and
associated fee rates. Essential for realistic short-side simulation.
Args:
uni_df: DataFrame, universe of stocks
start: datetime, start date
end: datetime, end date
Returns:
DataFrame indexed by (date, sid) containing:
- borrow_qty: shares available to borrow (negative sign convention)
- fee_rate: annualized borrow fee rate
File Dependencies:
- LOCATES_BASE_DIR/{date}.csv
"""
date = start
result_dfs = list()
uni_df = uni_df.reset_index()
while ( date < end ):
# year = date.strftime("%Y")
dateStr = date.strftime("%Y%m%d")
locates_dir = LOCATES_BASE_DIR
locates_file = locates_dir + dateStr + ".csv"
try:
df = pd.read_csv(locates_file, usecols=['sid', 'qty', 'fee_rate'])
except IOError:
print("File not found: {}".format(locates_file))
date += timedelta(days=1)
continue
df['date'] = date
df = pd.merge(uni_df[ uni_df['date'] == date ], df, how='inner', left_on=['sid'], right_on=['sid'], sort=True, suffixes=['', '_dead'])
result_dfs.append(df)
date += timedelta(days=1)
result_df = pd.concat(result_dfs)
result_df.set_index(['date', 'sid'], inplace=True)
result_df['borrow_qty'] = -1 * result_df['qty']
del result_df['qty']
result_df = remove_dup_cols(result_df)
return result_df
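
# Sign-convention sketch: a locate row with qty=50000 becomes borrow_qty == -50000,
# so a short is feasible while position_qty >= borrow_qty.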
def load_merged_results(fdirs, start, end, cols=None):
"""
Load and merge alpha results from multiple forecast directories.
Combines alpha signals from different strategy directories into
a single DataFrame, handling column conflicts with suffixes.
Args:
fdirs: list of str, directories containing alpha results
start: datetime, start date
end: datetime, end date
cols: list of str or None, specific columns to load
Returns:
DataFrame with merged alpha signals from all directories.
Duplicate columns (excluding _dead suffix) are removed.
"""
merged_df = None
for fdir in fdirs:
df = load_all_results(fdir, start, end, cols)
if merged_df is None:
merged_df = df
else:
merged_df = pd.merge(merged_df, df, left_index=True, right_index=True, suffixes=['', '_dead'])
merged_df = remove_dup_cols(merged_df)
return merged_df
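
# Example (sketch; directory names hypothetical): combine alpha results from
# two strategy directories into one frame.
#
#   alphas_df = load_merged_results(['/research/fcasts/rev', '/research/fcasts/mom'],
#                                   start, end)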
def load_mus(mdir, fcast, start, end):
"""
Load alpha/mu forecasts from a specific forecast directory.
Loads intraday alpha signals generated by a specific strategy.
Only loads data for market hours (9:30-16:00).
Args:
mdir: str, base directory containing forecast subdirectories
fcast: str, forecast name to load
start: str or None, start date (YYYYMMDD format)
end: str or None, end date (YYYYMMDD format)
Returns:
DataFrame indexed by (iclose_ts, sid) with alpha forecasts.
File Pattern:
{mdir}/{fcast}/alpha.{fcast}.{date}_{time}.*
"""
print("Looking in {}".format(mdir))
fcast_dfs = list()
for ff in sorted(glob.glob(mdir + "/"+ fcast + "/alpha.*")):
m = re.match(r".*alpha\." + fcast + r"\.(\d{8})_(\d{4}).*", str(ff))
fdate = m.group(1)
ftime = m.group(2)
if int(ftime) <= 930 or int(ftime) >= 1600: continue
if start is not None:
if int(fdate) < int(start) or int(fdate) > int(end): continue
print("Loading {} for {}".format(ff, fdate) )
ts = dateparser.parse(fdate + " " + ftime)
df = pd.read_csv(ff, header=0, parse_dates=True, sep=",")
df['iclose_ts'] = ts
df.set_index(['iclose_ts', 'sid'], inplace=True)
fcast_dfs.append(df)
fcast_df = pd.concat(fcast_dfs, verify_integrity=True)
return fcast_df
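
# File-name sketch: for fcast='rev', a file such as
#   {mdir}/rev/alpha.rev.20130415_1030.csv
# parses to fdate='20130415', ftime='1030' and is kept (0930 < 1030 < 1600).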
def load_qb_implied_orders(mdir, start, end):
"""
Load implied order flow from quantbot portfolio files.
Extracts implied order sizes from open long/short amounts at
specific intraday time slices (every 30 min from 10:02 to 15:32).
Args:
mdir: str, directory containing portfolio files
start: str or None, start date (YYYYMMDD format)
end: str or None, end date (YYYYMMDD format)
Returns:
DataFrame indexed by (iclose_ts, sid) with:
- net_qty: current position
- open_order: implied order size from open amounts
- ref_price: reference price for calculations
File Pattern:
{mdir}/{date}.PORTFOLIO.csv
"""
print("Looking in {}".format(mdir))
fcast_dfs = list()
files = sorted(glob.glob(mdir + "/*.PORTFOLIO.csv"))
for ff in files:
m = re.match(r".*(\d{8}).PORTFOLIO.csv", str(ff))
fdate = m.group(1)
if start is not None:
if int(fdate) < int(start) or int(fdate) > int(end): continue
print("Loading {} for {}".format(ff, fdate) )
        # `index` is not a read_csv argument; build the timestamp index explicitly
        # so between_time() can select the intraday slices.
        df = pd.read_csv(ff, header=0, sep=",", usecols=['time', 'sid', 'ref_price', 'net_qty', 'open_long_amt', 'open_short_amt'])
        df['open_order'] = (df['open_long_amt'] - df['open_short_amt']) / df['ref_price']
        df['iclose_ts'] = pd.to_datetime(fdate + " " + df['time'])
        df = df.set_index(['iclose_ts', 'sid'])
        result_l = list()
        for timeslice in ['10:02', '10:32', '11:02', '11:32', '12:02', '12:32', '13:02', '13:32', '14:02', '14:32', '15:02', '15:32' ]:
            timeslice_df = df.unstack().between_time(timeslice, timeslice).stack()
            result_l.append(timeslice_df)
        df = pd.concat(result_l)
        df = df.reset_index()
        # Shift back 2 minutes to align with the bar close timestamps.
        df['iclose_ts'] = df['iclose_ts'] - timedelta(minutes=2)
        df = df.set_index(['iclose_ts', 'sid'])
fcast_dfs.append(df)
fcast_df = pd.concat(fcast_dfs, verify_integrity=True)
return fcast_df
def load_qb_positions(mdir, start, end):
"""
Load full intraday position history from quantbot portfolio files.
Unlike load_qb_implied_orders, this loads all timestamps without
filtering to specific time slices.
Args:
mdir: str, directory containing portfolio files
start: str or None, start date (YYYYMMDD format)
end: str or None, end date (YYYYMMDD format)
Returns:
DataFrame indexed by (iclose_ts, sid) with position data.
File Pattern:
{mdir}/{date}.PORTFOLIO.csv
"""
print("Looking in {}".format(mdir))
fcast_dfs = list()
    files = sorted(glob.glob(mdir + "/*.PORTFOLIO.csv"))  # match the pattern so the regex below always succeeds
for ff in files:
m = re.match(r".*(\d{8}).PORTFOLIO.csv", str(ff))
fdate = m.group(1)
if start is not None:
if int(fdate) < int(start) or int(fdate) > int(end): continue
print("Loading {} for {}".format(ff, fdate) )
df = pd.read_csv(ff, header=0, parse_dates=True, sep=",", usecols=['time', 'sid', 'ref_price', 'net_qty', 'open_long_amt', 'open_short_amt'])
df['iclose_ts'] = pd.to_datetime(fdate + " " + df['time'])
df.set_index(['iclose_ts', 'sid'], inplace=True)
fcast_dfs.append(df)
fcast_df = pd.concat(fcast_dfs, verify_integrity=True)
return fcast_df
def load_qb_eods(mdir, start, end):
"""
Load end-of-day position and trading summary from quantbot.