Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@ All notable changes to ml4t-engineer are documented in this file.
Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
Versioning follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.0b9] - 2026-06-21

### Added
- `FixedTickRunBarSampler` for stable, fixed-threshold tick run bars

### Fixed
- Public package metadata links now point to the `ml4t/engineer` repository and
published documentation
- Added the missing MIT license file to the source distribution
- Synced lockfile metadata with relaxed dependency bounds
- Reduced pytest warning noise for performance markers and cyclical feature
metadata
Comment on lines +13 to +19

## [0.1.0b8] - 2026-05-05

### Fixed
Expand Down Expand Up @@ -184,7 +197,11 @@ Initial public alpha release.
- `MLDatasetBuilder` for dataset construction
- `PreprocessingPipeline` for feature transformation

[Unreleased]: https://github.com/ml4t/engineer/compare/v0.1.0b5...HEAD
[Unreleased]: https://github.com/ml4t/engineer/compare/v0.1.0b9...HEAD
[0.1.0b9]: https://github.com/ml4t/engineer/compare/v0.1.0b8...v0.1.0b9
[0.1.0b8]: https://github.com/ml4t/engineer/compare/v0.1.0b7...v0.1.0b8
[0.1.0b7]: https://github.com/ml4t/engineer/compare/v0.1.0b6...v0.1.0b7
[0.1.0b6]: https://github.com/ml4t/engineer/compare/v0.1.0b5...v0.1.0b6
[0.1.0b5]: https://github.com/ml4t/engineer/compare/v0.1.0b4...v0.1.0b5
[0.1.0b4]: https://github.com/stefan-jansen/ml4t-engineer/compare/v0.1.0b3...v0.1.0b4
[0.1.0b3]: https://github.com/stefan-jansen/ml4t-engineer/compare/v0.1.0b2...v0.1.0b3
Expand Down
8 changes: 7 additions & 1 deletion src/ml4t/engineer/bars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
from ml4t.engineer.bars.imbalance import ImbalanceBarSampler as ImbalanceBarSamplerOriginal

# Import run bars (new implementation)
from ml4t.engineer.bars.run import DollarRunBarSampler, TickRunBarSampler, VolumeRunBarSampler
from ml4t.engineer.bars.run import (
DollarRunBarSampler,
FixedTickRunBarSampler,
TickRunBarSampler,
VolumeRunBarSampler,
)
from ml4t.engineer.bars.tick import TickBarSampler as TickBarSamplerOriginal

# Import vectorized implementations as default
Expand Down Expand Up @@ -72,6 +77,7 @@
"WindowVolumeImbalanceBarSampler",
# Run bars
"TickRunBarSampler",
"FixedTickRunBarSampler",
"VolumeRunBarSampler",
"DollarRunBarSampler",
# Original implementations (if needed)
Expand Down
242 changes: 242 additions & 0 deletions src/ml4t/engineer/bars/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,250 @@ def _empty_run_bars_df(self) -> pl.DataFrame:
)


@jit(nopython=True, cache=True)  # type: ignore[misc]
def _calculate_fixed_tick_run_bars_nb(
    sides: npt.NDArray[np.float64],
    threshold: float,
) -> tuple[
    npt.NDArray[np.int64],
    npt.NDArray[np.float64],
    npt.NDArray[np.float64],
    npt.NDArray[np.float64],
]:
    """Calculate fixed-threshold tick run bar indices.

    Simple, stable algorithm with no adaptation. A bar forms when the larger
    of the cumulative buy-tick and sell-tick counts within the bar reaches a
    fixed threshold:

        θ = max(Σ buys in bar, Σ sells in bar) >= threshold

    Counts are cumulative within the bar and reset only at bar boundaries
    (direction changes do NOT reset them), matching the AFML run definition.

    Only strictly positive sides count as buys and only strictly negative
    sides count as sells. Ticks whose side is 0 or NaN are unclassified: they
    stay inside the current bar but contribute to neither count, so the
    counts reported here use the same buy/sell classification as the
    ``side > 0`` / ``side < 0`` volume split performed by the sampler.

    Parameters
    ----------
    sides : ndarray
        Array of trade signs (+1 for buy, -1 for sell)
    threshold : float
        Fixed run threshold (bar forms when max(cum_buys, cum_sells) >= threshold)

    Returns
    -------
    tuple of arrays
        (bar_indices, thetas, cumulative_buys, cumulative_sells), one entry
        per completed bar. ``bar_indices`` holds the index of the last tick
        of each bar; the other arrays hold the values at that closing tick.
    """
    n = len(sides)

    bar_indices = []
    thetas = []
    cumulative_buys_out = []
    cumulative_sells_out = []

    cumulative_buys = 0.0
    cumulative_sells = 0.0

    for i in range(n):
        side = sides[i]
        if side > 0:
            cumulative_buys += 1.0
        elif side < 0:
            cumulative_sells += 1.0
        # else: side is 0 or NaN -> unclassified tick, counted on neither side

        theta = max(cumulative_buys, cumulative_sells)

        if theta >= threshold:
            bar_indices.append(i)
            thetas.append(theta)
            cumulative_buys_out.append(cumulative_buys)
            cumulative_sells_out.append(cumulative_sells)

            # Counts restart only here, at the bar boundary.
            cumulative_buys = 0.0
            cumulative_sells = 0.0

    return (
        np.array(bar_indices, dtype=np.int64),
        np.array(thetas, dtype=np.float64),
        np.array(cumulative_buys_out, dtype=np.float64),
        np.array(cumulative_sells_out, dtype=np.float64),
    )


class FixedTickRunBarSampler(BarSampler):
    """Sample tick run bars using a fixed run threshold.

    Unlike the adaptive AFML algorithm (:class:`TickRunBarSampler`), this uses
    a fixed threshold that does not change during sampling. This avoids the
    threshold-spiral instability that occurs with the adaptive algorithm when
    order flow is persistently one-sided.

    **Recommended for production use** — more stable and predictable than the
    adaptive version (mirrors :class:`FixedTickImbalanceBarSampler`).

    A bar forms when the larger of the cumulative buy-tick and sell-tick counts
    within the bar reaches the threshold:

        θ = max(Σ buys in bar, Σ sells in bar) >= threshold

    Parameters
    ----------
    threshold : int
        Fixed run threshold. Bar forms when max(cum_buys, cum_sells) >= threshold.
        Typical values: 20-500 depending on desired bar frequency.

    Raises
    ------
    ValueError
        If ``threshold`` is not strictly positive (NaN is rejected as well).

    Calibration
    -----------
    To calibrate threshold for N bars per day, test a range and pick the
    threshold giving the desired bar count; larger thresholds yield fewer bars.

    Examples
    --------
    >>> sampler = FixedTickRunBarSampler(threshold=50)
    >>> bars = sampler.sample(tick_data)

    Notes
    -----
    Advantages over the adaptive (AFML) algorithm:

    - No threshold spiral with imbalanced order flow
    - Predictable bar count based on run statistics
    - No feedback loops — stable by construction
    - Works consistently across all market conditions

    References
    ----------
    .. [1] López de Prado, M. (2018). Advances in Financial Machine Learning.
           John Wiley & Sons. Chapter 2.3: Information-Driven Bars.
    """

    def __init__(self, threshold: int):
        """Initialize fixed tick run bar sampler.

        Parameters
        ----------
        threshold : int
            Fixed run threshold (positive integer)

        Raises
        ------
        ValueError
            If ``threshold`` is zero, negative or NaN.
        """
        # Written as ``not (x > 0)`` instead of ``x <= 0`` so that NaN, for
        # which every comparison is False, is rejected too; a NaN threshold
        # would otherwise be accepted and never produce a single bar.
        if not (threshold > 0):
            raise ValueError("threshold must be positive")

        self.threshold = threshold

    def sample(
        self,
        data: pl.DataFrame,
        include_incomplete: bool = False,
    ) -> pl.DataFrame:
        """Sample fixed tick run bars from data.

        Parameters
        ----------
        data : pl.DataFrame
            Tick data with columns: timestamp, price, volume, side
        include_incomplete : bool, default False
            Whether to include incomplete final bar

        Returns
        -------
        pl.DataFrame
            Sampled tick run bars. Besides the columns produced by
            ``_create_ohlcv_bar``, every bar carries ``buy_volume``,
            ``sell_volume``, ``run_length``, ``theta``, ``cumulative_buys``,
            ``cumulative_sells`` and ``threshold``.

        Raises
        ------
        DataValidationError
            If the ``side`` column is missing (``_validate_data`` may raise
            for other problems; its checks are defined on the base class).
        """
        self._validate_data(data)

        if "side" not in data.columns:
            raise DataValidationError("Run bars require 'side' column")

        if len(data) == 0:
            return self._empty_bars_df()

        volumes = data["volume"].to_numpy().astype(np.float64)
        sides = data["side"].to_numpy().astype(np.float64)

        bar_indices, thetas, cumulative_buys, cumulative_sells = _calculate_fixed_tick_run_bars_nb(
            sides, float(self.threshold)
        )

        # Describe every bar as (start, stop, theta, cum_buys, cum_sells) with
        # ``stop`` exclusive, so completed bars and the optional trailing bar
        # are turned into rows by one shared piece of code below.
        segments: list[tuple[int, int, float, float, float]] = []
        start_idx = 0

        for i, end_idx in enumerate(bar_indices):
            stop_idx = int(end_idx) + 1  # kernel reports the last tick, inclusive
            segments.append(
                (
                    start_idx,
                    stop_idx,
                    float(thetas[i]),
                    float(cumulative_buys[i]),
                    float(cumulative_sells[i]),
                )
            )
            start_idx = stop_idx

        # Handle incomplete final bar: ticks left over after the last bar
        # that never reached the threshold.
        if include_incomplete and start_idx < len(data):
            tail_sides = sides[start_idx:]
            cum_buys = float(np.sum(tail_sides > 0))
            cum_sells = float(np.sum(tail_sides < 0))
            segments.append(
                (start_idx, len(data), max(cum_buys, cum_sells), cum_buys, cum_sells)
            )

        bars = []
        for seg_start, seg_stop, theta, cum_buys, cum_sells in segments:
            bar_volumes = volumes[seg_start:seg_stop]
            bar_sides = sides[seg_start:seg_stop]

            bar = self._create_ohlcv_bar(
                data.slice(seg_start, seg_stop - seg_start),
                additional_cols={
                    "buy_volume": float(np.sum(bar_volumes[bar_sides > 0])),
                    "sell_volume": float(np.sum(bar_volumes[bar_sides < 0])),
                    "run_length": int(theta),
                    "theta": theta,
                    "cumulative_buys": cum_buys,
                    "cumulative_sells": cum_sells,
                    "threshold": float(self.threshold),
                },
            )
            bars.append(bar)

        if not bars:
            return self._empty_bars_df()

        return pl.DataFrame(bars)

    def _empty_bars_df(self) -> pl.DataFrame:
        """Return an empty DataFrame carrying the bar column names.

        Only the column names are fixed here; no dtypes are declared, so the
        column types are whatever polars infers for empty lists.
        """
        return pl.DataFrame(
            {
                "timestamp": [],
                "open": [],
                "high": [],
                "low": [],
                "close": [],
                "volume": [],
                "tick_count": [],
                "buy_volume": [],
                "sell_volume": [],
                "run_length": [],
                "theta": [],
                "cumulative_buys": [],
                "cumulative_sells": [],
                "threshold": [],
            },
        )


# Public names of the run-bar module; these are the four samplers that
# ml4t.engineer.bars imports from it.
__all__ = [
    "DollarRunBarSampler",
    "FixedTickRunBarSampler",
    "TickRunBarSampler",
    "VolumeRunBarSampler",
]
81 changes: 81 additions & 0 deletions tests/bars/test_run_bars.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from ml4t.engineer.bars.run import (
DollarRunBarSampler,
FixedTickRunBarSampler,
TickRunBarSampler,
VolumeRunBarSampler,
_calculate_run_bars_nb,
Expand Down Expand Up @@ -679,3 +680,83 @@ def test_cumulative_not_reset_on_direction_change(self):
# The theta should be 5 or more (cumulative buys)
if len(thetas) > 0:
assert thetas[0] >= 5


class TestFixedTickRunBarSampler:
    """Behavioural checks for the fixed-threshold tick run bar sampler."""

    def test_init_valid(self):
        """A positive threshold is stored on the instance as given."""
        assert FixedTickRunBarSampler(threshold=50).threshold == 50

    def test_init_invalid_threshold_zero(self):
        """Zero is rejected at construction time."""
        with pytest.raises(ValueError, match="threshold must be positive"):
            FixedTickRunBarSampler(threshold=0)

    def test_init_invalid_threshold_negative(self):
        """Negative thresholds are rejected at construction time."""
        with pytest.raises(ValueError, match="threshold must be positive"):
            FixedTickRunBarSampler(threshold=-10)

    def test_sample_missing_side(self, sample_tick_data):
        """Sampling without a ``side`` column raises a validation error."""
        without_side = sample_tick_data.drop("side")
        sampler = FixedTickRunBarSampler(threshold=10)
        with pytest.raises(DataValidationError, match="side"):
            sampler.sample(without_side)

    def test_sample_empty_data(self):
        """An empty input frame produces an empty result."""
        no_ticks = pl.DataFrame({"timestamp": [], "price": [], "volume": [], "side": []})
        result = FixedTickRunBarSampler(threshold=10).sample(no_ticks)
        assert len(result) == 0

    def test_sample_basic(self, sample_tick_data):
        """Bars expose the expected columns and each one reached the threshold."""
        result = FixedTickRunBarSampler(threshold=10).sample(sample_tick_data)
        assert len(result) >= 1

        expected_columns = ("open", "high", "low", "close", "volume", "theta", "threshold")
        absent = [name for name in expected_columns if name not in result.columns]
        assert not absent

        # Every completed bar must have reached the threshold.
        assert (result["theta"] >= 10).all()
        assert (result["threshold"] == 10).all()

    def test_threshold_controls_count_monotonically(self, large_tick_data):
        """Larger threshold => fewer (or equal) bars. Predictable, no spiral."""
        count_at_100 = len(FixedTickRunBarSampler(threshold=100).sample(large_tick_data))
        count_at_20 = len(FixedTickRunBarSampler(threshold=20).sample(large_tick_data))
        assert count_at_20 >= count_at_100

    def test_stable_on_one_sided_flow(self):
        """A persistently one-sided stream must NOT threshold-spiral.

        With an all-buy stream and threshold T, a bar forms every T ticks, so
        the count is deterministic: floor(n / T).
        """
        n = 1000
        threshold = 50

        timestamps = []
        for tick in range(n):
            second, slot = divmod(tick, 60)
            timestamps.append(datetime(2024, 1, 1, 9, 30, second, slot * 1000))

        all_buys = pl.DataFrame(
            {
                "timestamp": timestamps,
                "price": [100.0 + tick * 0.01 for tick in range(n)],
                "volume": [10.0] * n,
                "side": [1] * n,
            }
        )

        result = FixedTickRunBarSampler(threshold=threshold).sample(all_buys)
        assert len(result) == n // threshold  # exactly 20 — bounded and predictable

    def test_cumulative_not_consecutive(self, alternating_sides_data):
        """Counts are cumulative within a bar (do not reset on direction change)."""
        # The fixture is expected to hold 50 alternating ticks (25 buys and
        # 25 sells). A consecutive-run counter would never exceed 1 on such
        # data, so a bar at threshold=25 can only come from cumulative counts.
        result = FixedTickRunBarSampler(threshold=25).sample(alternating_sides_data)
        assert len(result) == 1
        assert result["theta"][0] >= 25

    def test_include_incomplete(self, sample_tick_data):
        """Requesting the trailing partial bar never yields fewer bars."""
        sampler_kwargs = {"threshold": 10}
        only_complete = FixedTickRunBarSampler(**sampler_kwargs).sample(
            sample_tick_data, include_incomplete=False
        )
        plus_partial = FixedTickRunBarSampler(**sampler_kwargs).sample(
            sample_tick_data, include_incomplete=True
        )
        assert len(plus_partial) >= len(only_complete)