From 31d18f37629b26971d69b1d48f8a635baa786b7a Mon Sep 17 00:00:00 2001 From: Joseph Scorsone Date: Mon, 29 Dec 2025 14:34:13 -0500 Subject: [PATCH] Improvements from previous PR. Adhere to QC code conventions better. More robust code. Refactored and properly passing tests. --- .../DataBentoDataDownloaderTests.cs | 221 +++++----- .../DataBentoDataProviderHistoryTests.cs | 170 +++----- .../DataBentoRawLiveClientTests.cs | 227 +++------- ...tConnect.DataSource.DataBento.Tests.csproj | 4 +- QuantConnect.DataBento.Tests/TestSetup.cs | 17 - QuantConnect.DataBento.Tests/config.json | 2 +- .../DataBentoDataDownloader.cs | 332 ++++++--------- .../DataBentoDataProvider.cs | 383 +++++++---------- .../DataBentoHistoryProivder.cs | 85 +--- .../DataBentoRawLiveClient.cs | 402 ++++++++---------- .../DataBentoSymbolMapper.cs | 174 ++++++++ .../QuantConnect.DataSource.DataBento.csproj | 4 + models/DataBentoTypes.cs | 112 +++++ 13 files changed, 1012 insertions(+), 1121 deletions(-) create mode 100644 QuantConnect.DataBento/DataBentoSymbolMapper.cs create mode 100644 models/DataBentoTypes.cs diff --git a/QuantConnect.DataBento.Tests/DataBentoDataDownloaderTests.cs b/QuantConnect.DataBento.Tests/DataBentoDataDownloaderTests.cs index d9d8071..a5063df 100644 --- a/QuantConnect.DataBento.Tests/DataBentoDataDownloaderTests.cs +++ b/QuantConnect.DataBento.Tests/DataBentoDataDownloaderTests.cs @@ -17,11 +17,13 @@ using System; using System.Linq; using NUnit.Framework; +using QuantConnect.Configuration; using QuantConnect.Data; using QuantConnect.Data.Market; using QuantConnect.Lean.DataSource.DataBento; using QuantConnect.Logging; -using QuantConnect.Configuration; +using QuantConnect.Securities; +using QuantConnect.Util; namespace QuantConnect.Lean.DataSource.DataBento.Tests { @@ -29,12 +31,20 @@ namespace QuantConnect.Lean.DataSource.DataBento.Tests public class DataBentoDataDownloaderTests { private DataBentoDataDownloader _downloader; - private readonly string _apiKey = Config.Get("databento-api-key"); + private MarketHoursDatabase _marketHoursDatabase; + protected readonly string ApiKey = Config.Get("databento-api-key"); + + private static Symbol CreateEsFuture() + { + var expiration = new DateTime(2026, 3, 20); + return Symbol.CreateFuture("ES", Market.CME, expiration); + } [SetUp] public void SetUp() { - _downloader = new DataBentoDataDownloader(_apiKey); + _marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); + _downloader = new DataBentoDataDownloader(ApiKey, _marketHoursDatabase); } [TearDown] @@ -43,141 +53,154 @@ public void TearDown() _downloader?.Dispose(); } - [Test] - [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Daily, TickType.Trade)] - [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Hour, TickType.Trade)] - [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Minute, TickType.Trade)] - [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Second, TickType.Trade)] - [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Tick, TickType.Trade)] - [Explicit("This test requires a configured DataBento API key")] - public void DownloadsHistoricalData(string ticker, SecurityType securityType, string market, Resolution resolution, TickType tickType) + [TestCase(Resolution.Daily)] + [TestCase(Resolution.Hour)] + [TestCase(Resolution.Minute)] + [TestCase(Resolution.Second)] + [TestCase(Resolution.Tick)] + public void DownloadsTradeDataForLeanFuture(Resolution resolution) { - var symbol = Symbol.Create(ticker, securityType, market); - var startTime = new DateTime(2024, 1, 15); - var endTime = new DateTime(2024, 1, 16); - var param = new DataDownloaderGetParameters(symbol, resolution, startTime, endTime, tickType); + var symbol = CreateEsFuture(); + var exchangeTimeZone = _marketHoursDatabase.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType).TimeZone; - var downloadResponse = _downloader.Get(param).ToList(); + var startUtc = new DateTime(2024, 5, 1, 0, 0, 0, DateTimeKind.Utc); + var endUtc = new DateTime(2024, 5, 2, 0, 0, 0, DateTimeKind.Utc); - Log.Trace($"Downloaded {downloadResponse.Count} data points for {symbol} at {resolution} resolution"); + if (resolution == Resolution.Tick) + { + startUtc = new DateTime(2024, 5, 1, 9, 30, 0, DateTimeKind.Utc); + endUtc = startUtc.AddMinutes(15); + } - Assert.IsTrue(downloadResponse.Any(), "Expected to download at least one data point"); + var parameters = new DataDownloaderGetParameters( + symbol, + resolution, + startUtc, + endUtc, + TickType.Trade + ); - foreach (var data in downloadResponse) + var data = _downloader.Get(parameters).ToList(); + + Log.Trace($"Downloaded {data.Count} trade points for {symbol} @ {resolution}"); + + Assert.IsNotEmpty(data); + + var startExchange = startUtc.ConvertFromUtc(exchangeTimeZone); + var endExchange = endUtc.ConvertFromUtc(exchangeTimeZone); + + foreach (var point in data) { - Assert.IsNotNull(data, "Data point should not be null"); - Assert.AreEqual(symbol, data.Symbol, "Symbol should match requested symbol"); - Assert.IsTrue(data.Time >= startTime && data.Time <= endTime, "Data time should be within requested range"); + Assert.AreEqual(symbol, point.Symbol); + Assert.That(point.Time, Is.InRange(startExchange, endExchange)); - if (data is TradeBar tradeBar) + switch (point) { - Assert.Greater(tradeBar.Close, 0, "Close price should be positive"); - Assert.GreaterOrEqual(tradeBar.Volume, 0, "Volume should be non-negative"); - Assert.Greater(tradeBar.High, 0, "High price should be positive"); - Assert.Greater(tradeBar.Low, 0, "Low price should be positive"); - Assert.Greater(tradeBar.Open, 0, "Open price should be positive"); - Assert.GreaterOrEqual(tradeBar.High, tradeBar.Low, "High should be >= Low"); - Assert.GreaterOrEqual(tradeBar.High, tradeBar.Open, "High should be >= Open"); - Assert.GreaterOrEqual(tradeBar.High, tradeBar.Close, "High should be >= Close"); - Assert.LessOrEqual(tradeBar.Low, tradeBar.Open, "Low should be <= Open"); - Assert.LessOrEqual(tradeBar.Low, tradeBar.Close, "Low should be <= Close"); - } - else if (data is QuoteBar quoteBar) - { - Assert.Greater(quoteBar.Close, 0, "Quote close price should be positive"); - if (quoteBar.Bid != null) - { - Assert.Greater(quoteBar.Bid.Close, 0, "Bid price should be positive"); - } - if (quoteBar.Ask != null) - { - Assert.Greater(quoteBar.Ask.Close, 0, "Ask price should be positive"); - } - } - else if (data is Tick tick) - { - Assert.Greater(tick.Value, 0, "Tick value should be positive"); - Assert.GreaterOrEqual(tick.Quantity, 0, "Tick quantity should be non-negative"); + case TradeBar bar: + Assert.Greater(bar.Open, 0); + Assert.Greater(bar.High, 0); + Assert.Greater(bar.Low, 0); + Assert.Greater(bar.Close, 0); + Assert.GreaterOrEqual(bar.Volume, 0); + Assert.GreaterOrEqual(bar.High, bar.Low); + break; + + case Tick tick: + Assert.Greater(tick.Value, 0); + Assert.GreaterOrEqual(tick.Quantity, 0); + break; + + default: + Assert.Fail($"Unexpected data type {point.GetType()}"); + break; } } } [Test] - [TestCase("ZNM3", SecurityType.Future, Market.CME, Resolution.Daily, TickType.Trade)] - [TestCase("ZNM3", SecurityType.Future, Market.CME, Resolution.Hour, TickType.Trade)] - [Explicit("This test requires a configured DataBento API key")] - public void DownloadsFuturesHistoricalData(string ticker, SecurityType securityType, string market, Resolution resolution, TickType tickType) + public void DownloadsQuoteTicksForLeanFuture() { - var symbol = Symbol.Create(ticker, securityType, market); - var startTime = new DateTime(2024, 1, 15); - var endTime = new DateTime(2024, 1, 16); - var param = new DataDownloaderGetParameters(symbol, resolution, startTime, endTime, tickType); - - var downloadResponse = _downloader.Get(param).ToList(); + var symbol = CreateEsFuture(); + var exchangeTimeZone = _marketHoursDatabase.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType).TimeZone; - Log.Trace($"Downloaded {downloadResponse.Count} data points for futures {symbol}"); + var startUtc = new DateTime(2024, 5, 1, 9, 30, 0, DateTimeKind.Utc); + var endUtc = startUtc.AddMinutes(15); - Assert.IsTrue(downloadResponse.Any(), "Expected to download futures data"); + var parameters = new DataDownloaderGetParameters( + symbol, + Resolution.Tick, + startUtc, + endUtc, + TickType.Quote + ); - foreach (var data in downloadResponse) - { - Assert.AreEqual(symbol, data.Symbol, "Symbol should match requested futures symbol"); - Assert.Greater(data.Value, 0, "Data value should be positive"); - } - } - - [Test] - [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Tick, TickType.Quote)] - [Explicit("This test requires a configured DataBento API key and advanced subscription")] - public void DownloadsQuoteData(string ticker, SecurityType securityType, string market, Resolution resolution, TickType tickType) - { - var symbol = Symbol.Create(ticker, securityType, market); - var startTime = new DateTime(2024, 1, 15, 9, 30, 0); - var endTime = new DateTime(2024, 1, 15, 9, 45, 0); - var param = new DataDownloaderGetParameters(symbol, resolution, startTime, endTime, tickType); + var data = _downloader.Get(parameters).ToList(); - var downloadResponse = _downloader.Get(param).ToList(); + Log.Trace($"Downloaded {data.Count} quote ticks for {symbol}"); - Log.Trace($"Downloaded {downloadResponse.Count} quote data points for {symbol}"); + Assert.IsNotEmpty(data); - Assert.IsTrue(downloadResponse.Any(), "Expected to download quote data"); + var startExchange = startUtc.ConvertFromUtc(exchangeTimeZone); + var endExchange = endUtc.ConvertFromUtc(exchangeTimeZone); - foreach (var data in downloadResponse) + foreach (var point in data) { - Assert.AreEqual(symbol, data.Symbol, "Symbol should match requested symbol"); - if (data is QuoteBar quoteBar) + Assert.AreEqual(symbol, point.Symbol); + Assert.That(point.Time, Is.InRange(startExchange, endExchange)); + + if (point is Tick tick) + { + Assert.AreEqual(TickType.Quote, tick.TickType); + Assert.IsTrue( + tick.BidPrice > 0 || tick.AskPrice > 0, + "Quote tick must have bid or ask" + ); + } + else if (point is QuoteBar bar) { - Assert.IsTrue(quoteBar.Bid != null || quoteBar.Ask != null, "Quote should have bid or ask data"); + Assert.IsTrue(bar.Bid != null || bar.Ask != null); } } } [Test] - [Explicit("This test requires a configured DataBento API key")] public void DataIsSortedByTime() { - var symbol = Symbol.Create("ESM3", SecurityType.Future, Market.CME); - var startTime = new DateTime(2024, 1, 15); - var endTime = new DateTime(2024, 1, 16); - var param = new DataDownloaderGetParameters(symbol, Resolution.Minute, startTime, endTime, TickType.Trade); + var symbol = CreateEsFuture(); - var downloadResponse = _downloader.Get(param).ToList(); + var startUtc = new DateTime(2024, 5, 1, 0, 0, 0, DateTimeKind.Utc); + var endUtc = new DateTime(2024, 5, 2, 0, 0, 0, DateTimeKind.Utc); - Assert.IsTrue(downloadResponse.Any(), "Expected to download data for time sorting test"); + var parameters = new DataDownloaderGetParameters( + symbol, + Resolution.Minute, + startUtc, + endUtc, + TickType.Trade + ); - for (int i = 1; i < downloadResponse.Count; i++) + var data = _downloader.Get(parameters).ToList(); + + Assert.IsNotEmpty(data); + + for (int i = 1; i < data.Count; i++) { - Assert.GreaterOrEqual(downloadResponse[i].Time, downloadResponse[i - 1].Time, - $"Data should be sorted by time. Item {i} time {downloadResponse[i].Time} should be >= item {i - 1} time {downloadResponse[i - 1].Time}"); + Assert.GreaterOrEqual( + data[i].Time, + data[i - 1].Time, + $"Data not sorted at index {i}" + ); } } [Test] - public void DisposesCorrectly() + public void DisposeIsIdempotent() { - var downloader = new DataBentoDataDownloader(); - Assert.DoesNotThrow(() => downloader.Dispose(), "Dispose should not throw"); - Assert.DoesNotThrow(() => downloader.Dispose(), "Multiple dispose calls should not throw"); + var downloader = new DataBentoDataDownloader(ApiKey, + MarketHoursDatabase.FromDataFolder()); + + Assert.DoesNotThrow(downloader.Dispose); + Assert.DoesNotThrow(downloader.Dispose); } } } diff --git a/QuantConnect.DataBento.Tests/DataBentoDataProviderHistoryTests.cs b/QuantConnect.DataBento.Tests/DataBentoDataProviderHistoryTests.cs index 2da5b8f..94c78f8 100644 --- a/QuantConnect.DataBento.Tests/DataBentoDataProviderHistoryTests.cs +++ b/QuantConnect.DataBento.Tests/DataBentoDataProviderHistoryTests.cs @@ -32,12 +32,19 @@ namespace QuantConnect.Lean.DataSource.DataBento.Tests public class DataBentoDataProviderHistoryTests { private DataBentoProvider _historyDataProvider; - private readonly string _apiKey = Config.Get("databento-api-key"); + private MarketHoursDatabase _marketHoursDatabase; + protected readonly string ApiKey = Config.Get("databento-api-key"); + + private static Symbol CreateEsFuture() + { + var expiration = new DateTime(2026, 3, 20); + return Symbol.CreateFuture("ES", Market.CME, expiration); + } [SetUp] public void SetUp() { - _historyDataProvider = new DataBentoProvider(_apiKey); + _historyDataProvider = new DataBentoProvider(); } [TearDown] @@ -50,150 +57,99 @@ internal static IEnumerable TestParameters { get { + var es = CreateEsFuture(); - // DataBento futures - var esMini = Symbol.Create("ESM3", SecurityType.Future, Market.CME); - var znNote = Symbol.Create("ZNM3", SecurityType.Future, Market.CME); - var gcGold = Symbol.Create("GCM3", SecurityType.Future, Market.CME); - - // test cases for supported futures - yield return new TestCaseData(esMini, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(5), false) - .SetDescription("Valid ES futures - Daily resolution, 5 days period") - .SetCategory("Valid"); - - yield return new TestCaseData(esMini, Resolution.Hour, TickType.Trade, TimeSpan.FromDays(2), false) - .SetDescription("Valid ES futures - Hour resolution, 2 days period") - .SetCategory("Valid"); - - yield return new TestCaseData(esMini, Resolution.Minute, TickType.Trade, TimeSpan.FromHours(4), false) - .SetDescription("Valid ES futures - Minute resolution, 4 hours period") + yield return new TestCaseData(es, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(5), false) + .SetDescription("ES futures daily trade history") .SetCategory("Valid"); - yield return new TestCaseData(znNote, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(3), false) - .SetDescription("Valid ZN futures - Daily resolution, 3 days period") + yield return new TestCaseData(es, Resolution.Hour, TickType.Trade, TimeSpan.FromDays(2), false) + .SetDescription("ES futures hourly trade history") .SetCategory("Valid"); - yield return new TestCaseData(gcGold, Resolution.Hour, TickType.Trade, TimeSpan.FromDays(1), false) - .SetDescription("Valid GC futures - Hour resolution, 1 day period") + yield return new TestCaseData(es, Resolution.Minute, TickType.Trade, TimeSpan.FromHours(4), false) + .SetDescription("ES futures minute trade history") .SetCategory("Valid"); - // Test cases for quote data (may require advanced subscription) - yield return new TestCaseData(esMini, Resolution.Tick, TickType.Quote, TimeSpan.FromMinutes(15), false) - .SetDescription("ES futures quote data - Tick resolution") + yield return new TestCaseData(es, Resolution.Tick, TickType.Quote, TimeSpan.FromMinutes(15), false) + .SetDescription("ES futures quote ticks") .SetCategory("Quote"); - - // Unsupported security types - var equity = Symbol.Create("SPY", SecurityType.Equity, Market.USA); - var option = Symbol.Create("SPY", SecurityType.Option, Market.USA); - - yield return new TestCaseData(equity, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(5), true) - .SetDescription("Invalid - Equity not supported by DataBento") - .SetCategory("Invalid"); - - yield return new TestCaseData(option, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(5), true) - .SetDescription("Invalid - Option not supported by DataBento") - .SetCategory("Invalid"); } } [Test, TestCaseSource(nameof(TestParameters))] - [Explicit("This test requires a configured DataBento API key")] public void GetsHistory(Symbol symbol, Resolution resolution, TickType tickType, TimeSpan period, bool expectsNoData) { var request = GetHistoryRequest(resolution, tickType, symbol, period); - try + var history = _historyDataProvider.GetHistory(request); + + if (expectsNoData) { - var slices = _historyDataProvider.GetHistory(request)?.Select(data => new Slice(data.Time, new[] { data }, data.Time.ConvertToUtc(request.DataTimeZone))).ToList(); + Assert.IsTrue(history == null || !history.Any(), + $"Expected no data for unsupported symbol: {symbol}"); + return; + } - if (expectsNoData) - { - Assert.IsTrue(slices == null || !slices.Any(), - $"Expected no data for unsupported symbol/security type: {symbol}"); - } - else + Assert.IsNotNull(history); + var data = history.ToList(); + Assert.IsNotEmpty(data); + + Log.Trace($"Received {data.Count} data points for {symbol} @ {resolution}"); + + foreach (var point in data.Take(5)) + { + Assert.AreEqual(symbol, point.Symbol); + + if (point is TradeBar bar) { - Assert.IsNotNull(slices, "Expected to receive history data"); - - if (slices.Any()) - { - Log.Trace($"Received {slices.Count} slices for {symbol} at {resolution} resolution"); - - foreach (var slice in slices.Take(5)) // Check first 5 slices - { - Assert.IsNotNull(slice, "Slice should not be null"); - Assert.IsTrue(slice.Time >= request.StartTimeUtc && slice.Time <= request.EndTimeUtc, - "Slice time should be within requested range"); - - if (slice.Bars.ContainsKey(symbol)) - { - var bar = slice.Bars[symbol]; - Assert.Greater(bar.Close, 0, "Bar close price should be positive"); - Assert.GreaterOrEqual(bar.Volume, 0, "Bar volume should be non-negative"); - } - } - } + Assert.Greater(bar.Close, 0); + Assert.GreaterOrEqual(bar.Volume, 0); } - } - catch (Exception ex) - { - Log.Error($"Error getting history for {symbol}: {ex.Message}"); - if (!expectsNoData) + if (point is Tick tick && tickType == TickType.Quote) { - throw; + Assert.IsTrue(tick.BidPrice > 0 || tick.AskPrice > 0); } } } [Test] - [Explicit("This test requires a configured DataBento API key")] public void GetHistoryWithMultipleSymbols() { - var symbol1 = Symbol.Create("ESM3", SecurityType.Future, Market.CME); - var symbol2 = Symbol.Create("ZNM3", SecurityType.Future, Market.CME); - - var request1 = GetHistoryRequest(Resolution.Daily, TickType.Trade, symbol1, TimeSpan.FromDays(3)); - var request2 = GetHistoryRequest(Resolution.Daily, TickType.Trade, symbol2, TimeSpan.FromDays(3)); - - var history1 = _historyDataProvider.GetHistory(request1); - var history2 = _historyDataProvider.GetHistory(request2); + var es = CreateEsFuture(); - var allData = new List(); - if (history1 != null) allData.AddRange(history1); - if (history2 != null) allData.AddRange(history2); + var request = GetHistoryRequest(Resolution.Daily, TickType.Trade, es, TimeSpan.FromDays(3)); - // timezone from the first request - var slices = allData.GroupBy(d => d.Time) - .Select(g => new Slice(g.Key, g.ToList(), g.Key.ConvertToUtc(request1.DataTimeZone))) - .ToList(); + var history = _historyDataProvider.GetHistory(request)?.ToList(); - Assert.IsNotNull(slices, "Expected to receive history data for multiple symbols"); - - if (slices.Any()) - { - Log.Trace($"Received {slices.Count} slices for multiple symbols"); - - var hasSymbol1Data = slices.Any(s => s.Bars.ContainsKey(symbol1)); - var hasSymbol2Data = slices.Any(s => s.Bars.ContainsKey(symbol2)); - - Assert.IsTrue(hasSymbol1Data || hasSymbol2Data, - "Expected data for at least one of the requested symbols"); - } + Assert.IsTrue( + history != null && history.Any(), + "Expected history for ES" + ); } - internal static HistoryRequest GetHistoryRequest(Resolution resolution, TickType tickType, Symbol symbol, TimeSpan period) + internal static HistoryRequest GetHistoryRequest( + Resolution resolution, + TickType tickType, + Symbol symbol, + TimeSpan period) { - var utcNow = DateTime.UtcNow; + var endUtc = new DateTime(2024, 5, 10, 0, 0, 0, DateTimeKind.Utc); + var startUtc = endUtc - period; + var dataType = LeanData.GetDataType(resolution, tickType); var marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); - var exchangeHours = marketHoursDatabase.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType); - var dataTimeZone = marketHoursDatabase.GetDataTimeZone(symbol.ID.Market, symbol, symbol.SecurityType); + var exchangeHours = marketHoursDatabase.GetExchangeHours( + symbol.ID.Market, symbol, symbol.SecurityType); + + var dataTimeZone = marketHoursDatabase.GetDataTimeZone( + symbol.ID.Market, symbol, symbol.SecurityType); return new HistoryRequest( - startTimeUtc: utcNow.Add(-period), - endTimeUtc: utcNow, + startTimeUtc: startUtc, + endTimeUtc: endUtc, dataType: dataType, symbol: symbol, resolution: resolution, @@ -204,7 +160,7 @@ internal static HistoryRequest GetHistoryRequest(Resolution resolution, TickType isCustomData: false, DataNormalizationMode.Raw, tickType: tickType - ); + ); } } } diff --git a/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs b/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs index 7df71a4..22e7f04 100644 --- a/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs +++ b/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs @@ -16,26 +16,32 @@ using System; using System.Threading; -using System.Threading.Tasks; using NUnit.Framework; +using QuantConnect.Configuration; using QuantConnect.Data; using QuantConnect.Data.Market; using QuantConnect.Lean.DataSource.DataBento; using QuantConnect.Logging; -using QuantConnect.Configuration; namespace QuantConnect.Lean.DataSource.DataBento.Tests { [TestFixture] - public class DataBentoRawLiveClientTests + public class DataBentoRawLiveClientSyncTests { - private DatabentoRawClient _client; - private readonly string _apiKey = Config.Get("databento-api-key"); + private DataBentoRawLiveClient _client; + protected readonly string ApiKey = Config.Get("databento-api-key"); + + private static Symbol CreateEsFuture() + { + var expiration = new DateTime(2026, 3, 20); + return Symbol.CreateFuture("ES", Market.CME, expiration); + } [SetUp] public void SetUp() { - _client = new DatabentoRawClient(_apiKey); + Log.Trace("DataBentoLiveClientTests: Using API Key: " + ApiKey); + _client = new DataBentoRawLiveClient(ApiKey); } [TearDown] @@ -45,208 +51,97 @@ public void TearDown() } [Test] - [Explicit("This test requires a configured DataBento API key and live connection")] - public async Task ConnectsToGateway() + public void Connects() { - if (string.IsNullOrEmpty(_apiKey)) - { - Assert.Ignore("DataBento API key not configured"); - return; - } - - var connected = await _client.ConnectAsync(); + var connected = _client.Connect(); - Assert.IsTrue(connected, "Should successfully connect to DataBento gateway"); - Assert.IsTrue(_client.IsConnected, "IsConnected should return true after successful connection"); + Assert.IsTrue(connected); + Assert.IsTrue(_client.IsConnected); - Log.Trace("Successfully connected to DataBento gateway"); + Log.Trace("Connected successfully"); } [Test] - [Explicit("This test requires a configured DataBento API key and live connection")] - public async Task SubscribesToSymbol() + public void SubscribesToLeanFutureSymbol() { - if (string.IsNullOrEmpty(_apiKey)) - { - Assert.Ignore("DataBento API key not configured"); - return; - } - - var connected = await _client.ConnectAsync(); - Assert.IsTrue(connected, "Must be connected to test subscription"); + Assert.IsTrue(_client.Connect()); - var symbol = Symbol.Create("ESM3", SecurityType.Future, Market.CME); - var subscribed = _client.Subscribe(symbol, Resolution.Minute, TickType.Trade); + var symbol = CreateEsFuture(); - Assert.IsTrue(subscribed, "Should successfully subscribe to symbol"); + Assert.IsTrue(_client.Subscribe(symbol, TickType.Trade)); + Assert.IsTrue(_client.StartSession()); - Log.Trace($"Successfully subscribed to {symbol}"); + Thread.Sleep(1000); - // Wait a moment to ensure subscription is active - await Task.Delay(2000); - - var unsubscribed = _client.Unsubscribe(symbol); - Assert.IsTrue(unsubscribed, "Should successfully unsubscribe from symbol"); - - Log.Trace($"Successfully unsubscribed from {symbol}"); + Assert.IsTrue(_client.Unsubscribe(symbol)); } [Test] - [Explicit("This test requires a configured DataBento API key and live connection")] - public async Task ReceivesLiveData() + public void ReceivesTradeOrQuoteTicks() { - if (string.IsNullOrEmpty(_apiKey)) - { - Assert.Ignore("DataBento API key not configured"); - return; - } - - var dataReceived = false; - var dataReceivedEvent = new ManualResetEventSlim(false); - BaseData receivedData = null; + var receivedEvent = new ManualResetEventSlim(false); + BaseData received = null; - _client.DataReceived += (sender, data) => + _client.DataReceived += (_, data) => { - receivedData = data; - dataReceived = true; - dataReceivedEvent.Set(); - Log.Trace($"Received data: {data}"); + received = data; + receivedEvent.Set(); }; - var connected = await _client.ConnectAsync(); - Assert.IsTrue(connected, "Must be connected to test data reception"); + Assert.IsTrue(_client.Connect()); - var symbol = Symbol.Create("ESM3", SecurityType.Future, Market.CME); - var subscribed = _client.Subscribe(symbol, Resolution.Tick, TickType.Trade); - Assert.IsTrue(subscribed, "Must be subscribed to receive data"); + var symbol = CreateEsFuture(); - // Wait for data with timeout - var dataReceiptTimeout = TimeSpan.FromMinutes(2); - var receivedWithinTimeout = dataReceivedEvent.Wait(dataReceiptTimeout); + Assert.IsTrue(_client.Subscribe(symbol, TickType.Trade)); + Assert.IsTrue(_client.StartSession()); - if (receivedWithinTimeout) - { - Assert.IsTrue(dataReceived, "Should have received data"); - Assert.IsNotNull(receivedData, "Received data should not be null"); - Assert.AreEqual(symbol, receivedData.Symbol, "Received data symbol should match subscription"); - Assert.Greater(receivedData.Value, 0, "Received data value should be positive"); + var gotData = receivedEvent.Wait(TimeSpan.FromMinutes(2)); - Log.Trace($"Successfully received live data: {receivedData}"); - } - else + if (!gotData) { - Log.Trace("No data received within timeout period - this may be expected during non-market hours"); + Assert.Inconclusive("No data received (likely outside market hours)"); + return; } - _client.Unsubscribe(symbol); - } + Assert.NotNull(received); + Assert.AreEqual(symbol, received.Symbol); - [Test] - [Explicit("This test requires a configured DataBento API key and live connection")] - public async Task HandlesConnectionEvents() - { - if (string.IsNullOrEmpty(_apiKey)) + if (received is Tick tick) { - Assert.Ignore("DataBento API key not configured"); - return; + Assert.Greater(tick.Time, DateTime.MinValue); + Assert.Greater(tick.Value, 0); } - - var connectionStatusChanged = false; - var connectionStatusEvent = new ManualResetEventSlim(false); - - _client.ConnectionStatusChanged += (sender, isConnected) => + else if (received is TradeBar bar) { - connectionStatusChanged = true; - connectionStatusEvent.Set(); - Log.Trace($"Connection status changed: {isConnected}"); - }; - - var connected = await _client.ConnectAsync(); - Assert.IsTrue(connected, "Should connect successfully"); - - // Connection status event should fire on connect - var eventFiredWithinTimeout = connectionStatusEvent.Wait(TimeSpan.FromSeconds(10)); - Assert.IsTrue(eventFiredWithinTimeout || connectionStatusChanged, - "Connection status changed event should fire"); - - _client.Disconnect(); - Assert.IsFalse(_client.IsConnected, "Should be disconnected after calling Disconnect()"); - } - - [Test] - public void HandlesInvalidApiKey() - { - var invalidClient = new DatabentoRawClient("invalid-api-key"); - - // Connection with invalid API key should fail gracefully - Assert.DoesNotThrowAsync(async () => + Assert.Greater(bar.Close, 0); + } + else { - var connected = await invalidClient.ConnectAsync(); - Assert.IsFalse(connected, "Connection should fail with invalid API key"); - }); - - invalidClient.Dispose(); + Assert.Fail($"Unexpected data type: {received.GetType()}"); + } } [Test] - public void DisposesCorrectly() + public void DisposeIsIdempotent() { - var client = new DatabentoRawClient(_apiKey); - Assert.DoesNotThrow(() => client.Dispose(), "Dispose should not throw"); - Assert.DoesNotThrow(() => client.Dispose(), "Multiple dispose calls should not throw"); + var client = new DataBentoRawLiveClient(ApiKey); + Assert.DoesNotThrow(client.Dispose); + Assert.DoesNotThrow(client.Dispose); } [Test] - public void SymbolMappingWorksCorrectly() + public void SymbolMappingDoesNotThrow() { - // Test that futures are mapped correctly to DataBento format - var esFuture = Symbol.Create("ESM3", SecurityType.Future, Market.CME); + Assert.IsTrue(_client.Connect()); - // Since the mapping method is private, we test indirectly through subscription - Assert.DoesNotThrowAsync(async () => - { - if (!string.IsNullOrEmpty(_apiKey)) - { - var connected = await _client.ConnectAsync(); - if (connected) - { - _client.Subscribe(esFuture, Resolution.Minute, TickType.Trade); - _client.Unsubscribe(esFuture); - } - } - }); - } - - [Test] - public void SchemaResolutionMappingWorksCorrectly() - { - // Test that resolution mappings work correctly - var symbol = Symbol.Create("ESM3", SecurityType.Future, Market.CME); + var symbol = CreateEsFuture(); - Assert.DoesNotThrowAsync(async () => + Assert.DoesNotThrow(() => { - if (!string.IsNullOrEmpty(_apiKey)) - { - var connected = await _client.ConnectAsync(); - if (connected) - { - // Test different resolutions - _client.Subscribe(symbol, Resolution.Tick, TickType.Trade); - _client.Unsubscribe(symbol); - - _client.Subscribe(symbol, Resolution.Second, TickType.Trade); - _client.Unsubscribe(symbol); - - _client.Subscribe(symbol, Resolution.Minute, TickType.Trade); - _client.Unsubscribe(symbol); - - _client.Subscribe(symbol, Resolution.Hour, TickType.Trade); - _client.Unsubscribe(symbol); - - _client.Subscribe(symbol, Resolution.Daily, TickType.Trade); - _client.Unsubscribe(symbol); - } - } + _client.Subscribe(symbol, TickType.Trade); + _client.StartSession(); + Thread.Sleep(500); + _client.Unsubscribe(symbol); }); } } diff --git a/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj b/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj index d2d7ef1..f5fce66 100644 --- a/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj +++ b/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj @@ -15,7 +15,6 @@ - @@ -24,10 +23,9 @@ - - + PreserveNewest diff --git a/QuantConnect.DataBento.Tests/TestSetup.cs b/QuantConnect.DataBento.Tests/TestSetup.cs index b0e07c1..39cb9de 100644 --- a/QuantConnect.DataBento.Tests/TestSetup.cs +++ b/QuantConnect.DataBento.Tests/TestSetup.cs @@ -62,22 +62,5 @@ private static void ReloadConfiguration() // resets the version among other things Globals.Reset(); } - - private static void SetUp() - { - Log.LogHandler = new CompositeLogHandler(); - Log.Trace("TestSetup(): starting..."); - ReloadConfiguration(); - Log.DebuggingEnabled = Config.GetBool("debug-mode"); - } - - private static TestCaseData[] TestParameters - { - get - { - SetUp(); - return new [] { new TestCaseData() }; - } - } } } diff --git a/QuantConnect.DataBento.Tests/config.json b/QuantConnect.DataBento.Tests/config.json index ff861ca..7256aba 100644 --- a/QuantConnect.DataBento.Tests/config.json +++ b/QuantConnect.DataBento.Tests/config.json @@ -1,5 +1,5 @@ { - "data-folder":"../../../../../Lean/Data/", + "data-folder":"../../../../../Data/", "job-user-id": "0", "api-access-token": "", diff --git a/QuantConnect.DataBento/DataBentoDataDownloader.cs b/QuantConnect.DataBento/DataBentoDataDownloader.cs index 2a44e55..841d070 100644 --- a/QuantConnect.DataBento/DataBentoDataDownloader.cs +++ b/QuantConnect.DataBento/DataBentoDataDownloader.cs @@ -15,19 +15,19 @@ */ using System; +using System.Collections.Generic; +using System.Globalization; +using NodaTime; using System.IO; -using System.Text; +using System.Linq; using System.Net.Http; -using System.Net.Http.Headers; -using System.Globalization; -using System.Collections.Generic; +using System.Text; using CsvHelper; -using CsvHelper.Configuration.Attributes; +using QuantConnect.Configuration; using QuantConnect.Data; using QuantConnect.Data.Market; +using QuantConnect.Lean.DataSource.DataBento.Models; using QuantConnect.Util; -using QuantConnect.Configuration; -using QuantConnect.Interfaces; using QuantConnect.Securities; namespace QuantConnect.Lean.DataSource.DataBento @@ -38,32 +38,28 @@ namespace QuantConnect.Lean.DataSource.DataBento /// public class DataBentoDataDownloader : IDataDownloader, IDisposable { - private readonly HttpClient _httpClient; + private readonly HttpClient _httpClient = new(); private readonly string _apiKey; - private const decimal PriceScaleFactor = 1e-9m; + private readonly DataBentoSymbolMapper _symbolMapper; + private readonly MarketHoursDatabase _marketHoursDatabase; + private readonly Dictionary _symbolExchangeTimeZones = new(); /// /// Initializes a new instance of the /// - public DataBentoDataDownloader(string apiKey) + /// The DataBento API key. + public DataBentoDataDownloader(string apiKey, MarketHoursDatabase marketHoursDatabase) { + _marketHoursDatabase = marketHoursDatabase; _apiKey = apiKey; - _httpClient = new HttpClient(); - - // Set up HTTP Basic Authentication - var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{_apiKey}:")); - _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", credentials); - } - - public DataBentoDataDownloader() - : this(Config.Get("databento-api-key")) - { + _httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{_apiKey}:"))); + _symbolMapper = new DataBentoSymbolMapper(); } /// /// Get historical data enumerable for a single symbol, type and resolution given this start and end time (in UTC). /// - /// Parameters for the historical data request + /// Parameters for the historical data request /// Enumerable of base data for this symbol /// public IEnumerable Get(DataDownloaderGetParameters parameters) @@ -72,25 +68,29 @@ public IEnumerable Get(DataDownloaderGetParameters parameters) var resolution = parameters.Resolution; var tickType = parameters.TickType; - var dataset = "GLBX.MDP3"; // hard coded for now. Later on can add equities and options with different mapping + /// + /// Dataset for CME Globex futures + /// https://databento.com/docs/venues-and-datasets has more information on datasets through DataBento + /// + const string dataset = "GLBX.MDP3"; // hard coded for now. Later on can add equities and options with different mapping var schema = GetSchema(resolution, tickType); - var dbSymbol = MapSymbolToDataBento(symbol); + var databentoSymbol = _symbolMapper.GetBrokerageSymbol(symbol); // prepare body for Raw HTTP request - var body = new StringBuilder(); - body.Append($"dataset={dataset}"); - body.Append($"&symbols={dbSymbol}"); - body.Append($"&schema={schema}"); - body.Append($"&start={parameters.StartUtc:yyyy-MM-ddTHH:mm}"); - body.Append($"&end={parameters.EndUtc:yyyy-MM-ddTHH:mm}"); - body.Append("&stype_in=parent"); - body.Append("&encoding=csv"); - - var request = new HttpRequestMessage( - HttpMethod.Post, + var body = new StringBuilder() + .Append($"dataset={dataset}") + .Append($"&symbols={databentoSymbol}") + .Append($"&schema={schema}") + .Append($"&start={parameters.StartUtc:yyyy-MM-ddTHH:mm}") + .Append($"&end={parameters.EndUtc:yyyy-MM-ddTHH:mm}") + .Append("&stype_in=parent") + .Append("&encoding=csv") + .ToString(); + + using var request = new HttpRequestMessage(HttpMethod.Post, "https://hist.databento.com/v0/timeseries.get_range") { - Content = new StringContent(body.ToString(), Encoding.UTF8, "application/x-www-form-urlencoded") + Content = new StringContent(body, Encoding.UTF8, "application/x-www-form-urlencoded") }; // send the request with the get range url @@ -99,85 +99,70 @@ public IEnumerable Get(DataDownloaderGetParameters parameters) // Add error handling to see the actual error message if (!response.IsSuccessStatusCode) { - var errorContent = response.Content.ReadAsStringAsync().Result; + var errorContent = response.Content.ReadAsStringAsync().SynchronouslyAwaitTaskResult(); throw new HttpRequestException($"DataBento API error ({response.StatusCode}): {errorContent}"); } - response.EnsureSuccessStatusCode(); - - using var stream = response.Content.ReadAsStream(); - using var reader = new StreamReader(stream); - using var csv = new CsvReader(reader, CultureInfo.InvariantCulture); + using var csv = new CsvReader( + new StreamReader(response.Content.ReadAsStream()), + CultureInfo.InvariantCulture + ); - if (tickType == TickType.Trade) + return (tickType, resolution) switch { - if (resolution == Resolution.Tick) - { - // For tick data, use the trades schema which returns individual trades - foreach (var record in csv.GetRecords()) - { - yield return new Tick - { - Time = record.Timestamp, - Symbol = symbol, - Value = record.Price, - Quantity = record.Size - }; - } - } - else - { - // For aggregated data, use the ohlcv schema which returns bars - foreach (var record in csv.GetRecords()) - { - yield return new TradeBar - { - Symbol = symbol, - Time = record.Timestamp, - Open = record.Open, - High = record.High, - Low = record.Low, - Close = record.Close, - Volume = record.Volume - }; - } - } - } - else if (tickType == TickType.Quote) - { - foreach (var record in csv.GetRecords()) - { - var bidPrice = record.BidPrice * PriceScaleFactor; - var askPrice = record.AskPrice * PriceScaleFactor; - - if (resolution == Resolution.Tick) - { - yield return new Tick + (TickType.Trade, Resolution.Tick) => + csv.ForEach(dt => + new Tick( + GetTickTime(symbol, dt.Timestamp), + symbol, + string.Empty, + string.Empty, + dt.Size, + dt.Price + ) + ), + + (TickType.Trade, _) => + csv.ForEach(bar => + new TradeBar( + GetTickTime(symbol, bar.Timestamp), + symbol, + bar.Open, + bar.High, + bar.Low, + bar.Close, + bar.Volume + ) + ), + + (TickType.Quote, Resolution.Tick) => + csv.ForEach(q => + new Tick( + GetTickTime(symbol, q.Timestamp), + symbol, + bidPrice: q.BidPrice, + askPrice: q.AskPrice, + bidSize: q.BidSize, + askSize: q.AskSize + ) { - Time = record.Timestamp, - Symbol = symbol, - AskPrice = askPrice, - BidPrice = bidPrice, - AskSize = record.AskSize, - BidSize = record.BidSize, TickType = TickType.Quote - }; - } - else - { - var bidBar = new Bar(bidPrice, bidPrice, bidPrice, bidPrice); - var askBar = new Bar(askPrice, askPrice, askPrice, askPrice); - yield return new QuoteBar( - record.Timestamp, + } + ), + + (TickType.Quote, _) => + csv.ForEach(q => + new QuoteBar( + GetTickTime(symbol, q.Timestamp), symbol, - bidBar, - record.BidSize, - askBar, - record.AskSize - ); - } - } - } + new Bar(q.BidPrice, q.BidPrice, q.BidPrice, q.BidPrice), q.BidSize, + new Bar(q.AskPrice, q.AskPrice, q.AskPrice, q.AskPrice), q.AskSize + ) + ), + + _ => throw new NotSupportedException( + $"Unsupported tickType={tickType} resolution={resolution}") + }; } /// @@ -191,113 +176,60 @@ public void Dispose() /// /// Pick Databento schema from Lean resolution/ticktype /// - private string GetSchema(Resolution resolution, TickType tickType) + private static string GetSchema(Resolution resolution, TickType tickType) { - if (tickType == TickType.Trade) + return (tickType, resolution) switch { - if (resolution == Resolution.Tick) - return "trades"; - if (resolution == Resolution.Second) - return "ohlcv-1s"; - if (resolution == Resolution.Minute) - return "ohlcv-1m"; - if (resolution == Resolution.Hour) - return "ohlcv-1h"; - if (resolution == Resolution.Daily) - return "ohlcv-1d"; - } - else if (tickType == TickType.Quote) - { - // top of book - if (resolution == Resolution.Tick || resolution == Resolution.Second || resolution == Resolution.Minute || resolution == Resolution.Hour || resolution == Resolution.Daily) - return "mbp-1"; - } + (TickType.Trade, Resolution.Tick) => "mbp-1", + (TickType.Trade, Resolution.Second) => "ohlcv-1s", + (TickType.Trade, Resolution.Minute) => "ohlcv-1m", + (TickType.Trade, Resolution.Hour) => "ohlcv-1h", + (TickType.Trade, Resolution.Daily) => "ohlcv-1d", - throw new NotSupportedException($"Unsupported resolution {resolution} / {tickType}"); + (TickType.Quote, _) => "mbp-1", + + _ => throw new NotSupportedException( + $"Unsupported resolution {resolution} / {tickType}" + ) + }; } /// - /// Maps a LEAN symbol to DataBento symbol format + /// Converts the given UTC time into the symbol security exchange time zone /// - private string MapSymbolToDataBento(Symbol symbol) + private DateTime GetTickTime(Symbol symbol, DateTime utcTime) { - if (symbol.SecurityType == SecurityType.Future) + DateTimeZone exchangeTimeZone; + lock (_symbolExchangeTimeZones) { - // For DataBento, use the root symbol with .FUT suffix for parent subscription - // ES19Z25 -> ES.FUT - var value = symbol.Value; - - // Extract root by removing digits and month codes - var root = new string(value.TakeWhile(c => !char.IsDigit(c)).ToArray()); + if (!_symbolExchangeTimeZones.TryGetValue(symbol, out exchangeTimeZone)) + { + // read the exchange time zone from market-hours-database + if (_marketHoursDatabase.TryGetEntry(symbol.ID.Market, symbol, symbol.SecurityType, out var entry)) + { + exchangeTimeZone = entry.ExchangeHours.TimeZone; + } + // If there is no entry for the given Symbol, default to New York + else + { + exchangeTimeZone = TimeZones.NewYork; + } - return $"{root}.FUT"; + _symbolExchangeTimeZones.Add(symbol, exchangeTimeZone); + } } - return symbol.Value; - } - - /// Class for parsing trade data from Databento - /// Really used as a map from the http request to then get it in QC data structures - private class DatabentoBar - { - [Name("ts_event")] - public long TimestampNanos { get; set; } - - public DateTime Timestamp => DateTimeOffset.FromUnixTimeSeconds(TimestampNanos / 1_000_000_000) - .AddTicks((TimestampNanos % 1_000_000_000) / 100).UtcDateTime; - - [Name("open")] - public decimal Open { get; set; } - - [Name("high")] - public decimal High { get; set; } - - [Name("low")] - public decimal Low { get; set; } - - [Name("close")] - public decimal Close { get; set; } - - [Name("volume")] - public decimal Volume { get; set; } - } - - private class DatabentoTrade - { - [Name("ts_event")] - public long TimestampNanos { get; set; } - - public DateTime Timestamp => DateTimeOffset.FromUnixTimeSeconds(TimestampNanos / 1_000_000_000) - .AddTicks((TimestampNanos % 1_000_000_000) / 100).UtcDateTime; - - [Name("price")] - public long PriceRaw { get; set; } - - public decimal Price => PriceRaw * PriceScaleFactor; - - [Name("size")] - public int Size { get; set; } + return utcTime.ConvertFromUtc(exchangeTimeZone); } + } +} - private class DatabentoQuote - { - [Name("ts_event")] - public long TimestampNanos { get; set; } - - public DateTime Timestamp => DateTimeOffset.FromUnixTimeSeconds(TimestampNanos / 1_000_000_000) - .AddTicks((TimestampNanos % 1_000_000_000) / 100).UtcDateTime; - - [Name("bid_px_00")] - public long BidPrice { get; set; } - - [Name("bid_sz_00")] - public int BidSize { get; set; } - - [Name("ask_px_00")] - public long AskPrice { get; set; } - - [Name("ask_sz_00")] - public int AskSize { get; set; } - } +public static class CsvReaderExtensions +{ + public static IEnumerable ForEach( + this CsvReader csv, + Func map) + { + return csv.GetRecords().Select(map).ToList(); } } diff --git a/QuantConnect.DataBento/DataBentoDataProvider.cs b/QuantConnect.DataBento/DataBentoDataProvider.cs index 8cf6015..aa9dd32 100644 --- a/QuantConnect.DataBento/DataBentoDataProvider.cs +++ b/QuantConnect.DataBento/DataBentoDataProvider.cs @@ -14,14 +14,11 @@ * */ -using System; -using System.Linq; using NodaTime; using QuantConnect.Data; using QuantConnect.Data.Market; using QuantConnect.Util; using QuantConnect.Interfaces; -using System.Collections.Generic; using QuantConnect.Configuration; using QuantConnect.Logging; using QuantConnect.Packets; @@ -31,23 +28,24 @@ namespace QuantConnect.Lean.DataSource.DataBento { /// - /// Implementation of Custom Data Provider + /// A data Provider for DataBento that provides live market data and historical data. + /// Handles Subscribing, Unsubscribing, and fetching historical data from DataBento. + /// It will handle if a symbol is subscribable and will log errors if it is not. /// - public class DataBentoProvider : IDataQueueHandler + public partial class DataBentoProvider : IDataQueueHandler { private readonly IDataAggregator _dataAggregator = Composer.Instance.GetExportedValueByTypeName( Config.Get("data-aggregator", "QuantConnect.Lean.Engine.DataFeeds.AggregationManager"), forceTypeNameOnExisting: false); - private EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = null!; - private readonly List _activeSubscriptionConfigs = new(); - private readonly ConcurrentDictionary _subscriptionConfigs = new(); - private DatabentoRawClient _client = null!; - private readonly string _apiKey; + private EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager; + private DataBentoRawLiveClient _client; private readonly DataBentoDataDownloader _dataDownloader; private bool _potentialUnsupportedResolutionMessageLogged; private bool _sessionStarted = false; - private readonly object _sessionLock = new object(); + private readonly object _sessionLock = new(); private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); private readonly ConcurrentDictionary _symbolExchangeTimeZones = new(); + private bool _initialized; + private bool _unsupportedTickTypeMessagedLogged; /// /// Returns true if we're currently connected to the Data Provider @@ -59,109 +57,120 @@ public class DataBentoProvider : IDataQueueHandler /// public DataBentoProvider() { - _apiKey = Config.Get("databento-api-key"); - if (string.IsNullOrEmpty(_apiKey)) - { - throw new ArgumentException("DataBento API key is required. Set 'databento-api-key' in configuration."); - } - - _dataDownloader = new DataBentoDataDownloader(_apiKey); - Initialize(); - } - - /// - /// Initializes a new instance of the DataBentoProvider with custom API key - /// - /// DataBento API key - public DataBentoProvider(string apiKey) - { - _apiKey = apiKey ?? throw new ArgumentNullException(nameof(apiKey)); - _dataDownloader = new DataBentoDataDownloader(_apiKey); - Initialize(); + var apiKey = Config.Get("databento-api-key"); + _dataDownloader = new DataBentoDataDownloader(apiKey, _marketHoursDatabase); + Initialize(apiKey); } /// /// Common initialization logic + /// DataBento API key from config file retrieved on constructor /// - private void Initialize() + private void Initialize(string apiKey) { - Log.Trace("DataBentoProvider.Initialize(): Starting initialization"); - _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager(); - _subscriptionManager.SubscribeImpl = (symbols, tickType) => + Log.Debug("DataBentoProvider.Initialize(): Starting initialization"); + _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager() { - Log.Trace($"DataBentoProvider.SubscribeImpl(): Received subscription request for {symbols.Count()} symbols, TickType={tickType}"); - foreach (var symbol in symbols) + SubscribeImpl = (symbols, tickType) => { - Log.Trace($"DataBentoProvider.SubscribeImpl(): Processing symbol {symbol}"); - if (!_subscriptionConfigs.TryGetValue(symbol, out var config)) - { - Log.Error($"DataBentoProvider.SubscribeImpl(): No subscription config found for {symbol}"); - return false; - } - if (_client?.IsConnected != true) - { - Log.Error($"DataBentoProvider.SubscribeImpl(): Client is not connected. Cannot subscribe to {symbol}"); - return false; - } + return SubscriptionLogic(symbols, tickType); + }, + UnsubscribeImpl = (symbols, tickType) => + { + return UnsubscribeLogic(symbols, tickType); + } + }; + + // Initialize the live client + _client = new DataBentoRawLiveClient(apiKey); + _client.DataReceived += OnDataReceived; - var resolution = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution; - if (!_client.Subscribe(config.Symbol, resolution, config.TickType)) + // Connect to live gateway + Log.Debug("DataBentoProvider.Initialize(): Attempting connection to DataBento live gateway"); + var cancellationTokenSource = new CancellationTokenSource(); + Task.Factory.StartNew(() => + { + try + { + var connected = _client.Connect(); + Log.Debug($"DataBentoProvider.Initialize(): Connect() returned {connected}"); + + if (connected) { - Log.Error($"Failed to subscribe to {config.Symbol}"); - return false; + Log.Debug("DataBentoProvider.Initialize(): Successfully connected to DataBento live gateway"); } - - lock (_sessionLock) + else { - if (!_sessionStarted) - _sessionStarted = _client.StartSession(); + Log.Error("DataBentoProvider.Initialize(): Failed to connect to DataBento live gateway"); } } + catch (Exception ex) + { + Log.Error($"DataBentoProvider.Initialize(): Exception during Connect(): {ex.Message}\n{ex.StackTrace}"); + } + }, + cancellationTokenSource.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default); + _initialized = true; - return true; - }; + Log.Debug("DataBentoProvider.Initialize(): Initialization complete"); + } - _subscriptionManager.UnsubscribeImpl = (symbols, tickType) => + /// + /// Logic to unsubscribe from the specified symbols + /// + public bool UnsubscribeLogic(IEnumerable symbols, TickType tickType) + { + foreach (var symbol in symbols) { - foreach (var symbol in symbols) + Log.Debug($"DataBentoProvider.UnsubscribeImpl(): Processing symbol {symbol}"); + if (_client?.IsConnected != true) { - Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Processing symbol {symbol}"); - if (_client?.IsConnected != true) - { - throw new InvalidOperationException($"DataBentoProvider.UnsubscribeImpl(): Client is not connected. Cannot unsubscribe from {symbol}"); - } - - _client.Unsubscribe(symbol); + throw new InvalidOperationException($"DataBentoProvider.UnsubscribeImpl(): Client is not connected. Cannot unsubscribe from {symbol}"); } - return true; - }; + _client.Unsubscribe(symbol); + } - // Initialize the live client - Log.Trace("DataBentoProvider.Initialize(): Creating DatabentoRawClient"); - _client = new DatabentoRawClient(_apiKey); - _client.DataReceived += OnDataReceived; - _client.ConnectionStatusChanged += OnConnectionStatusChanged; + return true; + } - // Connect to live gateway - Log.Trace("DataBentoProvider.Initialize(): Attempting connection to DataBento live gateway"); - Task.Run(() => + /// + /// Logic to subscribe to the specified symbols + /// + public bool SubscriptionLogic(IEnumerable symbols, TickType tickType) + { + if (_client?.IsConnected != true) { - var connected = _client.Connect(); - Log.Trace($"DataBentoProvider.Initialize(): Connect() returned {connected}"); + Log.Error("DataBentoProvider.SubscriptionLogic(): Client is not connected. Cannot subscribe to symbols"); + return false; + } - if (connected) - { - Log.Trace("DataBentoProvider.Initialize(): Successfully connected to DataBento live gateway"); - } - else + foreach (var symbol in symbols) + { + if (!CanSubscribe(symbol)) { - Log.Error("DataBentoProvider.Initialize(): Failed to connect to DataBento live gateway"); + Log.Error($"DataBentoProvider.SubscriptionLogic(): Unsupported subscription: {symbol}"); + return false; } - }); + _client.Subscribe(symbol, tickType); + } - Log.Trace("DataBentoProvider.Initialize(): Initialization complete"); + return true; + } + + /// + /// Checks if this Data provider supports the specified symbol + /// + /// The symbol + /// returns true if Data Provider supports the specified symbol; otherwise false + private bool CanSubscribe(Symbol symbol) + { + return !symbol.Value.Contains("universe", StringComparison.InvariantCultureIgnoreCase) && + !symbol.IsCanonical() && + IsSecurityTypeSupported(symbol.SecurityType); } /// @@ -172,17 +181,22 @@ private void Initialize() /// The new enumerator for this subscription request public IEnumerator? Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler) { - Log.Trace($"DataBentoProvider.Subscribe(): Received subscription request for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}"); - if (!CanSubscribe(dataConfig)) + if (!IsSupported(dataConfig.SecurityType, dataConfig.Type, dataConfig.TickType, dataConfig.Resolution)) { - Log.Error($"DataBentoProvider.Subscribe(): Cannot subscribe to {dataConfig.Symbol} with Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}"); return null; } - _subscriptionConfigs[dataConfig.Symbol] = dataConfig; + lock (_sessionLock) + { + if (!_sessionStarted) + { + Log.Debug("DataBentoProvider.SubscriptionLogic(): Starting session"); + _sessionStarted = _client.StartSession(); + } + } + var enumerator = _dataAggregator.Add(dataConfig, newDataAvailableHandler); _subscriptionManager.Subscribe(dataConfig); - _activeSubscriptionConfigs.Add(dataConfig); return enumerator; } @@ -193,15 +207,8 @@ private void Initialize() /// Subscription config to be removed public void Unsubscribe(SubscriptionDataConfig dataConfig) { - Log.Trace($"DataBentoProvider.Unsubscribe(): Received unsubscription request for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}"); - _subscriptionConfigs.TryRemove(dataConfig.Symbol, out _); + Log.Debug($"DataBentoProvider.Unsubscribe(): Received unsubscription request for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}"); _subscriptionManager.Unsubscribe(dataConfig); - var toRemove = _activeSubscriptionConfigs.FirstOrDefault(c => c.Symbol == dataConfig.Symbol && c.TickType == dataConfig.TickType); - if (toRemove != null) - { - Log.Trace($"DataBentoProvider.Unsubscribe(): Removing active subscription for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}"); - _activeSubscriptionConfigs.Remove(toRemove); - } _dataAggregator.Remove(dataConfig); } @@ -211,7 +218,10 @@ public void Unsubscribe(SubscriptionDataConfig dataConfig) /// Job we're subscribing for public void SetJob(LiveNodePacket job) { - // No action required for DataBento since the job details are not used in the subscription process. + if (_initialized) + { + return; + } } /// @@ -221,62 +231,8 @@ public void Dispose() { _dataAggregator?.DisposeSafely(); _subscriptionManager?.DisposeSafely(); - _client?.Dispose(); - _dataDownloader?.Dispose(); - } - - /// - /// Gets the history for the requested security - /// - /// The historical data request - /// An enumerable of BaseData points - public IEnumerable? GetHistory(Data.HistoryRequest request) - { - Log.Trace($"DataBentoProvider.GetHistory(): Received history request for {request.Symbol}, Resolution={request.Resolution}, TickType={request.TickType}"); - if (!CanSubscribe(request.Symbol)) - { - Log.Error($"DataBentoProvider.GetHistory(): Cannot provide history for {request.Symbol} with Resolution={request.Resolution}, TickType={request.TickType}"); - return null; - } - - try - { - // Use the data downloader to get historical data - var parameters = new DataDownloaderGetParameters( - request.Symbol, - request.Resolution, - request.StartTimeUtc, - request.EndTimeUtc, - request.TickType); - - return _dataDownloader.Get(parameters); - } - catch (Exception ex) - { - Log.Error($"DataBentoProvider.GetHistory(): Failed to get history for {request.Symbol}: {ex.Message}"); - return null; - } - } - - /// - /// Checks if this Data provider supports the specified symbol - /// - /// The symbol - /// returns true if Data Provider supports the specified symbol; otherwise false - private bool CanSubscribe(Symbol symbol) - { - return !symbol.Value.Contains("universe", StringComparison.InvariantCultureIgnoreCase) && - !symbol.IsCanonical() && - IsSecurityTypeSupported(symbol.SecurityType); - } - - /// - /// Determines whether or not the specified config can be subscribed to - /// - private bool CanSubscribe(SubscriptionDataConfig config) - { - return CanSubscribe(config.Symbol) && - IsSupported(config.SecurityType, config.Type, config.TickType, config.Resolution); + _client?.DisposeSafely(); + _dataDownloader?.DisposeSafely(); } /// @@ -295,12 +251,6 @@ private bool IsSecurityTypeSupported(SecurityType securityType) /// private bool IsSupported(SecurityType securityType, Type dataType, TickType tickType, Resolution resolution) { - // Check supported security types - if (!IsSecurityTypeSupported(securityType)) - { - throw new NotSupportedException($"Unsupported security type: {securityType}"); - } - // Check supported data types if (dataType != typeof(TradeBar) && dataType != typeof(QuoteBar) && @@ -328,47 +278,66 @@ private bool IsSupported(SecurityType securityType, Type dataType, TickType tick /// private DateTime GetTickTime(Symbol symbol, DateTime utcTime) { - var exchangeTimeZone = _symbolExchangeTimeZones.GetOrAdd(symbol, sym => + DateTimeZone exchangeTimeZone; + lock (_symbolExchangeTimeZones) { - if (_marketHoursDatabase.TryGetEntry(sym.ID.Market, sym, sym.SecurityType, out var entry)) + if (!_symbolExchangeTimeZones.TryGetValue(symbol, out exchangeTimeZone)) { - return entry.ExchangeHours.TimeZone; + // read the exchange time zone from market-hours-database + if (_marketHoursDatabase.TryGetEntry(symbol.ID.Market, symbol, symbol.SecurityType, out var entry)) + { + exchangeTimeZone = entry.ExchangeHours.TimeZone; + } + // If there is no entry for the given Symbol, default to New York + else + { + exchangeTimeZone = TimeZones.NewYork; + } + + _symbolExchangeTimeZones[symbol] = exchangeTimeZone; } - // Futures default to Chicago - return TimeZones.Chicago; - }); + } return utcTime.ConvertFromUtc(exchangeTimeZone); } - // + /// /// Handles data received from the live client /// - private void OnDataReceived(object? sender, BaseData data) + private void OnDataReceived(object _, BaseData data) { try { - if (data is Tick tick) + switch (data) { - tick.Time = GetTickTime(tick.Symbol, tick.Time); - _dataAggregator.Update(tick); - - Log.Trace($"DataBentoProvider.OnDataReceived(): Updated tick - Symbol: {tick.Symbol}, " + - $"TickType: {tick.TickType}, Price: {tick.Value}, Quantity: {tick.Quantity}"); - } - else if (data is TradeBar tradeBar) - { - tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time); - tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime); - _dataAggregator.Update(tradeBar); + case Tick tick: + tick.Time = GetTickTime(tick.Symbol, tick.Time); + lock (_dataAggregator) + { + _dataAggregator.Update(tick); + } + // Log.Trace($"DataBentoProvider.OnDataReceived(): Updated tick - Symbol: {tick.Symbol}, " + + // $"TickType: {tick.TickType}, Price: {tick.Value}, Quantity: {tick.Quantity}"); + break; + + case TradeBar tradeBar: + tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time); + tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime); + lock (_dataAggregator) + { + _dataAggregator.Update(tradeBar); + } + // Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " + + // $"O:{tradeBar.Open} H:{tradeBar.High} L:{tradeBar.Low} C:{tradeBar.Close} V:{tradeBar.Volume}"); + break; - Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " + - $"O:{tradeBar.Open} H:{tradeBar.High} L:{tradeBar.Low} C:{tradeBar.Close} V:{tradeBar.Volume}"); - } - else - { - data.Time = GetTickTime(data.Symbol, data.Time); - _dataAggregator.Update(data); + default: + data.Time = GetTickTime(data.Symbol, data.Time); + lock (_dataAggregator) + { + _dataAggregator.Update(data); + } + break; } } catch (Exception ex) @@ -376,41 +345,5 @@ private void OnDataReceived(object? sender, BaseData data) Log.Error($"DataBentoProvider.OnDataReceived(): Error updating data aggregator: {ex.Message}\n{ex.StackTrace}"); } } - - /// - /// Handles connection status changes from the live client - /// - private void OnConnectionStatusChanged(object? sender, bool isConnected) - { - Log.Trace($"DataBentoProvider.OnConnectionStatusChanged(): Connection status changed to: {isConnected}"); - - if (isConnected) - { - // Reset session flag on reconnection - lock (_sessionLock) - { - _sessionStarted = false; - } - - // Resubscribe to all active subscriptions - foreach (var config in _activeSubscriptionConfigs) - { - _client.Subscribe(config.Symbol, config.Resolution, config.TickType); - } - - // Start session after resubscribing - if (_activeSubscriptionConfigs.Any()) - { - lock (_sessionLock) - { - if (!_sessionStarted) - { - Log.Trace("DataBentoProvider.OnConnectionStatusChanged(): Starting session after reconnection"); - _sessionStarted = _client.StartSession(); - } - } - } - } - } } } diff --git a/QuantConnect.DataBento/DataBentoHistoryProivder.cs b/QuantConnect.DataBento/DataBentoHistoryProivder.cs index a93b50e..eb23154 100644 --- a/QuantConnect.DataBento/DataBentoHistoryProivder.cs +++ b/QuantConnect.DataBento/DataBentoHistoryProivder.cs @@ -14,7 +14,6 @@ * */ -using System; using NodaTime; using QuantConnect.Data; using QuantConnect.Data.Market; @@ -22,30 +21,25 @@ using QuantConnect.Lean.Engine.HistoricalData; using QuantConnect.Logging; using QuantConnect.Util; -using QuantConnect.Lean.DataSource.DataBento; using QuantConnect.Interfaces; -using System.Collections.Generic; -using QuantConnect.Configuration; using QuantConnect.Securities; using QuantConnect.Data.Consolidators; namespace QuantConnect.Lean.DataSource.DataBento { /// - /// DataBento implementation of + /// Impleements a history provider for DataBento historical data. + /// Uses consolidators to produce the requested resolution when necessary. /// - public partial class DataBentoHistoryProvider : SynchronizingHistoryProvider + public partial class DataBentoProvider : SynchronizingHistoryProvider { private int _dataPointCount; - private DataBentoDataDownloader _dataDownloader; + + /// + /// Indicates whether a error for an invalid start time has been fired, where the start time is greater than or equal to the end time in UTC. + /// private volatile bool _invalidStartTimeErrorFired; - private volatile bool _invalidTickTypeAndResolutionErrorFired; - private volatile bool _unsupportedTickTypeMessagedLogged; - private MarketHoursDatabase _marketHoursDatabase; - private bool _unsupportedSecurityTypeMessageLogged; - private bool _unsupportedDataTypeMessageLogged; - private bool _potentialUnsupportedResolutionMessageLogged; - + /// /// Gets the total number of data points emitted by this history provider /// @@ -57,8 +51,6 @@ public partial class DataBentoHistoryProvider : SynchronizingHistoryProvider /// The initialization parameters public override void Initialize(HistoryProviderInitializeParameters parameters) { - _dataDownloader = new DataBentoDataDownloader(); - _marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); } /// @@ -101,8 +93,7 @@ public override void Initialize(HistoryProviderInitializeParameters parameters) /// An enumerable of BaseData points public IEnumerable? GetHistory(HistoryRequest request) { - if (request.Symbol.IsCanonical() || - !IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution)) + if (!CanSubscribe(request.Symbol)) { // It is Logged in IsSupported(...) return null; @@ -113,7 +104,7 @@ public override void Initialize(HistoryProviderInitializeParameters parameters) if (!_unsupportedTickTypeMessagedLogged) { _unsupportedTickTypeMessagedLogged = true; - Log.Trace($"DataBentoHistoryProvider.GetHistory(): Unsupported tick type: {TickType.OpenInterest}"); + Log.Trace($"DataBentoProvider.GetHistory(): Unsupported tick type: {TickType.OpenInterest}"); } return null; } @@ -123,7 +114,7 @@ public override void Initialize(HistoryProviderInitializeParameters parameters) if (!_invalidStartTimeErrorFired) { _invalidStartTimeErrorFired = true; - Log.Error($"{nameof(DataBentoHistoryProvider)}.{nameof(GetHistory)}:InvalidDateRange. The history request start date must precede the end date, no history returned"); + Log.Error($"{nameof(DataBentoProvider)}.{nameof(GetHistory)}:InvalidDateRange. The history request start date must precede the end date, no history returned"); } return null; } @@ -230,59 +221,5 @@ private IEnumerable GetQuotes(HistoryRequest request) var parameters = new DataDownloaderGetParameters(request.Symbol, Resolution.Tick, request.StartTimeUtc, request.EndTimeUtc, request.TickType); return _dataDownloader.Get(parameters); } - - /// - /// Checks if the security type is supported - /// - /// Security type to check - /// True if supported - private bool IsSecurityTypeSupported(SecurityType securityType) - { - // DataBento primarily supports futures, but also has equity and option coverage - return securityType == SecurityType.Future; - } - - /// - /// Determines if the specified subscription is supported - /// - private bool IsSupported(SecurityType securityType, Type dataType, TickType tickType, Resolution resolution) - { - // Check supported security types - if (!IsSecurityTypeSupported(securityType)) - { - if (!_unsupportedSecurityTypeMessageLogged) - { - _unsupportedSecurityTypeMessageLogged = true; - Log.Trace($"DataBentoDataProvider.IsSupported(): Unsupported security type: {securityType}"); - } - return false; - } - - // Check supported data types - if (dataType != typeof(TradeBar) && - dataType != typeof(QuoteBar) && - dataType != typeof(Tick) && - dataType != typeof(OpenInterest)) - { - if (!_unsupportedDataTypeMessageLogged) - { - _unsupportedDataTypeMessageLogged = true; - Log.Trace($"DataBentoDataProvider.IsSupported(): Unsupported data type: {dataType}"); - } - return false; - } - - // Warn about potential limitations for tick data - // I'm mimicing polygon implementation with this - if (!_potentialUnsupportedResolutionMessageLogged) - { - _potentialUnsupportedResolutionMessageLogged = true; - Log.Trace("DataBentoDataProvider.IsSupported(): " + - $"Subscription for {securityType}-{dataType}-{tickType}-{resolution} will be attempted. " + - $"An Advanced DataBento subscription plan is required to stream tick data."); - } - - return true; - } } } diff --git a/QuantConnect.DataBento/DataBentoRawLiveClient.cs b/QuantConnect.DataBento/DataBentoRawLiveClient.cs index 2e7bb97..2dab1d2 100644 --- a/QuantConnect.DataBento/DataBentoRawLiveClient.cs +++ b/QuantConnect.DataBento/DataBentoRawLiveClient.cs @@ -14,14 +14,12 @@ * */ -using System; -using System.IO; using System.Text; using System.Net.Sockets; using System.Security.Cryptography; using System.Collections.Concurrent; using System.Text.Json; -using System.Linq; +using System.Threading.Tasks; using QuantConnect.Data; using QuantConnect.Data.Market; using QuantConnect.Logging; @@ -31,33 +29,44 @@ namespace QuantConnect.Lean.DataSource.DataBento /// /// DataBento Raw TCP client for live streaming data /// - public class DatabentoRawClient : IDisposable + public class DataBentoRawLiveClient : IDisposable { + /// + /// The DataBento API key for authentication + /// private readonly string _apiKey; - private readonly string _gateway; + /// + /// The DataBento live gateway address to receive data from + /// + private const string _gateway = "glbx-mdp3.lsg.databento.com:13000"; + /// + /// The dataset to subscribe to + /// private readonly string _dataset; - private TcpClient? _tcpClient; + private readonly TcpClient? _tcpClient; + private readonly string _host; + private readonly int _port; private NetworkStream? _stream; - private StreamReader? _reader; - private StreamWriter? _writer; - private CancellationTokenSource _cancellationTokenSource; + private StreamReader _reader; + private StreamWriter _writer; + private readonly CancellationTokenSource _cancellationTokenSource; private readonly ConcurrentDictionary _subscriptions; private readonly object _connectionLock = new object(); private bool _isConnected; private bool _disposed; private const decimal PriceScaleFactor = 1e-9m; private readonly ConcurrentDictionary _instrumentIdToSymbol = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _lastTicks = new ConcurrentDictionary(); + private readonly DataBentoSymbolMapper _symbolMapper; /// /// Event fired when new data is received /// - public event EventHandler? DataReceived; + public event EventHandler DataReceived; /// /// Event fired when connection status changes /// - public event EventHandler? ConnectionStatusChanged; + public event EventHandler ConnectionStatusChanged; /// /// Gets whether the client is currently connected @@ -65,15 +74,21 @@ public class DatabentoRawClient : IDisposable public bool IsConnected => _isConnected && _tcpClient?.Connected == true; /// - /// Initializes a new instance of the DatabentoRawClient + /// Initializes a new instance of the DataBentoRawLiveClient + /// The DataBento API key. /// - public DatabentoRawClient(string apiKey, string gateway = "glbx-mdp3.lsg.databento.com:13000", string dataset = "GLBX.MDP3") + public DataBentoRawLiveClient(string apiKey, string dataset = "GLBX.MDP3") { _apiKey = apiKey ?? throw new ArgumentNullException(nameof(apiKey)); - _gateway = gateway ?? throw new ArgumentNullException(nameof(gateway)); _dataset = dataset; + _tcpClient = new TcpClient(); _subscriptions = new ConcurrentDictionary(); _cancellationTokenSource = new CancellationTokenSource(); + _symbolMapper = new DataBentoSymbolMapper(); + + var parts = _gateway.Split(':'); + _host = parts[0]; + _port = parts.Length > 1 ? int.Parse(parts[1]) : 13000; } /// @@ -81,20 +96,15 @@ public DatabentoRawClient(string apiKey, string gateway = "glbx-mdp3.lsg.databen /// public bool Connect() { - Log.Trace("DatabentoRawClient.Connect(): Connecting to DataBento live gateway"); - if (_isConnected || _disposed) + Log.Trace("DataBentoRawLiveClient.Connect(): Connecting to DataBento live gateway"); + if (_isConnected) { return _isConnected; } try { - var parts = _gateway.Split(':'); - var host = parts[0]; - var port = parts.Length > 1 ? int.Parse(parts[1]) : 13000; - - _tcpClient = new TcpClient(); - _tcpClient.Connect(host, port); + _tcpClient.Connect(_host, _port); _stream = _tcpClient.GetStream(); _reader = new StreamReader(_stream, Encoding.ASCII); _writer = new StreamWriter(_stream, Encoding.ASCII) { AutoFlush = true }; @@ -106,15 +116,15 @@ public bool Connect() ConnectionStatusChanged?.Invoke(this, true); // Start message processing - ProcessMessages(); + Task.Run(ProcessMessages, _cancellationTokenSource.Token); - Log.Trace("DatabentoRawClient.Connect(): Connected and authenticated to DataBento live gateway"); + Log.Trace("DataBentoRawLiveClient.Connect(): Connected and authenticated to DataBento live gateway"); return true; } } catch (Exception ex) { - Log.Error($"DatabentoRawClient.Connect(): Failed to connect: {ex.Message}"); + Log.Error($"DataBentoRawLiveClient.Connect(): Failed to connect: {ex.Message}"); Disconnect(); } @@ -126,124 +136,97 @@ public bool Connect() /// private bool Authenticate() { - if (_reader == null || _writer == null) - return false; - try { // Read greeting and challenge - string? versionLine = _reader.ReadLine(); - string? cramLine = _reader.ReadLine(); + var versionLine = _reader.ReadLine(); + var cramLine = _reader.ReadLine(); if (string.IsNullOrEmpty(versionLine) || string.IsNullOrEmpty(cramLine)) { - Log.Error("DatabentoRawClient.Authenticate(): Failed to receive greeting or challenge"); + Log.Error("DataBentoRawLiveClient.Authenticate(): Failed to receive greeting or challenge"); return false; } - Log.Trace($"DatabentoRawClient.Authenticate(): Version: {versionLine}"); - Log.Trace($"DatabentoRawClient.Authenticate(): Challenge: {cramLine}"); - // Parse challenge - string[] cramParts = cramLine.Split('='); + var cramParts = cramLine.Split('='); if (cramParts.Length != 2 || cramParts[0] != "cram") { - Log.Error("DatabentoRawClient.Authenticate(): Invalid challenge format"); - return false; - } - string cram = cramParts[1].Trim(); - - // Compute auth hash - string concat = $"{cram}|{_apiKey}"; - string hashHex = ComputeSHA256(concat); - string bucketId = _apiKey.Length >= 5 ? _apiKey.Substring(_apiKey.Length - 5) : _apiKey; - string authString = $"{hashHex}-{bucketId}"; - - // Send auth message - string authMsg = $"auth={authString}|dataset={_dataset}|encoding=json|ts_out=0"; - Log.Trace($"DatabentoRawClient.Authenticate(): Sending auth"); - _writer.WriteLine(authMsg); - - // Read auth response - string? authResp = _reader.ReadLine(); - if (string.IsNullOrEmpty(authResp)) - { - Log.Error("DatabentoRawClient.Authenticate(): No authentication response received"); + Log.Error("DataBentoRawLiveClient.Authenticate(): Invalid challenge format"); return false; } + var cram = cramParts[1].Trim(); - Log.Trace($"DatabentoRawClient.Authenticate(): Auth response: {authResp}"); - + // Auth + _writer.WriteLine($"auth={GetAuthStringFromCram(cram)}|dataset={_dataset}|encoding=json|ts_out=0"); + var authResp = _reader.ReadLine(); if (!authResp.Contains("success=1")) { - Log.Error($"DatabentoRawClient.Authenticate(): Authentication failed: {authResp}"); + Log.Error($"DataBentoRawLiveClient.Authenticate(): Authentication failed: {authResp}"); return false; } - Log.Trace("DatabentoRawClient.Authenticate(): Authentication successful"); + Log.Trace("DataBentoRawLiveClient.Authenticate(): Authentication successful"); return true; } catch (Exception ex) { - Log.Error($"DatabentoRawClient.Authenticate(): Authentication failed: {ex.Message}"); + Log.Error($"DataBentoRawLiveClient.Authenticate(): Authentication failed: {ex.Message}"); return false; } } - private static string ComputeSHA256(string input) + + /// + /// Handles the DataBento authentication string from a CRAM challenge + /// + /// The CRAM challenge string + /// The auth string to send to the server + private string GetAuthStringFromCram(string cram) { - using var sha = SHA256.Create(); - byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(input)); - var sb = new StringBuilder(); - foreach (byte b in hash) - { - sb.Append(b.ToString("x2")); - } - return sb.ToString(); + if (string.IsNullOrWhiteSpace(cram)) + throw new ArgumentException("CRAM challenge cannot be null or empty", nameof(cram)); + + string concat = $"{cram}|{_apiKey}"; + string hashHex = ComputeSHA256(concat); + string bucketId = _apiKey.Substring(_apiKey.Length - 5); + + return $"{hashHex}-{bucketId}"; } /// /// Subscribes to live data for a symbol /// - public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType) + public bool Subscribe(Symbol symbol, TickType tickType) { - if (!IsConnected || _writer == null) + if (!IsConnected) { - Log.Error("DatabentoRawClient.Subscribe(): Not connected to gateway"); + Log.Error("DataBentoRawLiveClient.Subscribe(): Not connected to gateway"); return false; } try { // Get the databento symbol form LEAN symbol - // Get schema from the resolution - var databentoSymbol = MapSymbolToDataBento(symbol); - var schema = GetSchema(resolution, tickType); + var databentoSymbol = _symbolMapper.GetBrokerageSymbol(symbol); + var schema = "mbp-1"; + var resolution = Resolution.Tick; // subscribe var subscribeMessage = $"schema={schema}|stype_in=parent|symbols={databentoSymbol}"; - Log.Trace($"DatabentoRawClient.Subscribe(): Subscribing with message: {subscribeMessage}"); + Log.Debug($"DataBentoRawLiveClient.Subscribe(): Subscribing with message: {subscribeMessage}"); // Send subscribe message _writer.WriteLine(subscribeMessage); // Store subscription _subscriptions.TryAdd(symbol, (resolution, tickType)); - Log.Trace($"DatabentoRawClient.Subscribe(): Subscribed to {symbol} ({databentoSymbol}) at {resolution} resolution for {tickType}"); - - // If subscribing to quote ticks, also subscribe to trade ticks - if (tickType == TickType.Quote && resolution == Resolution.Tick) - { - var tradeSchema = GetSchema(resolution, TickType.Trade); - var tradeSubscribeMessage = $"schema={tradeSchema}|stype_in=parent|symbols={databentoSymbol}"; - Log.Trace($"DatabentoRawClient.Subscribe(): Also subscribing to trades with message: {tradeSubscribeMessage}"); - _writer.WriteLine(tradeSubscribeMessage); - } + Log.Debug($"DataBentoRawLiveClient.Subscribe(): Subscribed to {symbol} ({databentoSymbol}) at {resolution} resolution for {tickType}"); return true; } catch (Exception ex) { - Log.Error($"DatabentoRawClient.Subscribe(): Failed to subscribe to {symbol}: {ex.Message}"); + Log.Error($"DataBentoRawLiveClient.Subscribe(): Failed to subscribe to {symbol}: {ex.Message}"); return false; } } @@ -253,21 +236,21 @@ public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType) /// public bool StartSession() { - if (!IsConnected || _writer == null) + if (!IsConnected) { - Log.Error("DatabentoRawClient.StartSession(): Not connected"); + Log.Error("DataBentoRawLiveClient.StartSession(): Not connected"); return false; } try { - Log.Trace("DatabentoRawClient.StartSession(): Starting session"); + Log.Trace("DataBentoRawLiveClient.StartSession(): Starting session"); _writer.WriteLine("start_session=1"); return true; } catch (Exception ex) { - Log.Error($"DatabentoRawClient.StartSession(): Failed to start session: {ex.Message}"); + Log.Error($"DataBentoRawLiveClient.StartSession(): Failed to start session: {ex.Message}"); return false; } } @@ -281,13 +264,13 @@ public bool Unsubscribe(Symbol symbol) { if (_subscriptions.TryRemove(symbol, out _)) { - Log.Trace($"DatabentoRawClient.Unsubscribe(): Unsubscribed from {symbol}"); + Log.Debug($"DataBentoRawLiveClient.Unsubscribe(): Unsubscribed from {symbol}"); } return true; } catch (Exception ex) { - Log.Error($"DatabentoRawClient.Unsubscribe(): Failed to unsubscribe from {symbol}: {ex.Message}"); + Log.Error($"DataBentoRawLiveClient.Unsubscribe(): Failed to unsubscribe from {symbol}: {ex.Message}"); return false; } } @@ -297,33 +280,17 @@ public bool Unsubscribe(Symbol symbol) /// private void ProcessMessages() { - Log.Trace("DatabentoRawClient.ProcessMessages(): Starting message processing"); - if (_reader == null) - { - Log.Error("DatabentoRawClient.ProcessMessages(): No reader available"); - return; - } - - var messageCount = 0; + Log.Debug("DataBentoRawLiveClient.ProcessMessages(): Starting message processing"); try { while (!_cancellationTokenSource.IsCancellationRequested && IsConnected) { var line = _reader.ReadLine(); - if (line == null) - { - Log.Trace("DatabentoRawClient.ProcessMessages(): Connection closed by server"); - break; - } - if (string.IsNullOrWhiteSpace(line)) - continue; - - messageCount++; - if (messageCount <= 50 || messageCount % 100 == 0) { - Log.Trace($"DatabentoRawClient.ProcessMessages(): Message #{messageCount}: {line.Substring(0, Math.Min(150, line.Length))}..."); + Log.Trace("DataBentoRawLiveClient.ProcessMessages(): Line is null or empty. Issue receiving data."); + break; } ProcessSingleMessage(line); @@ -331,19 +298,18 @@ private void ProcessMessages() } catch (OperationCanceledException) { - Log.Trace("DatabentoRawClient.ProcessMessages(): Message processing cancelled"); + Log.Trace("DataBentoRawLiveClient.ProcessMessages(): Message processing cancelled"); } catch (IOException ex) when (ex.InnerException is SocketException) { - Log.Trace($"DatabentoRawClient.ProcessMessages(): Socket exception: {ex.Message}"); + Log.Trace($"DataBentoRawLiveClient.ProcessMessages(): Socket exception: {ex.Message}"); } catch (Exception ex) { - Log.Error($"DatabentoRawClient.ProcessMessages(): Error processing messages: {ex.Message}\n{ex.StackTrace}"); + Log.Error($"DataBentoRawLiveClient.ProcessMessages(): Error processing messages: {ex.Message}\n{ex.StackTrace}"); } finally { - Log.Trace($"DatabentoRawClient.ProcessMessages(): Exiting. Total messages processed: {messageCount}"); Disconnect(); } } @@ -365,57 +331,71 @@ private void ProcessSingleMessage(string message) { var rtype = rtypeElement.GetInt32(); - if (rtype == 23) - { - if (root.TryGetProperty("msg", out var msgElement)) - { - Log.Trace($"DatabentoRawClient: System message: {msgElement.GetString()}"); - } - return; - } - else if (rtype == 22) + switch (rtype) { - // Symbol mapping message - if (root.TryGetProperty("stype_in_symbol", out var inSymbol) && - root.TryGetProperty("stype_out_symbol", out var outSymbol) && - headerElement.TryGetProperty("instrument_id", out var instId)) - { - var instrumentId = instId.GetInt64(); - var outSymbolStr = outSymbol.GetString(); - - Log.Trace($"DatabentoRawClient: Symbol mapping: {inSymbol.GetString()} -> {outSymbolStr} (instrument_id: {instrumentId})"); - - // Find the subscription that matches this symbol - foreach (var kvp in _subscriptions) + case 23: + // System message + if (root.TryGetProperty("msg", out var msgElement)) { - var leanSymbol = kvp.Key; + Log.Debug($"DataBentoRawLiveClient: System message: {msgElement.GetString()}"); + } + return; + + case 22: + // Symbol mapping message + if (root.TryGetProperty("stype_in_symbol", out var inSymbol) && + root.TryGetProperty("stype_out_symbol", out var outSymbol) && + headerElement.TryGetProperty("instrument_id", out var instId)) + { + var instrumentId = instId.GetInt64(); + var outSymbolStr = outSymbol.GetString(); + + Log.Debug($"DataBentoRawLiveClient: Symbol mapping: {inSymbol.GetString()} -> {outSymbolStr} (instrument_id: {instrumentId})"); + if (outSymbolStr != null) { - _instrumentIdToSymbol[instrumentId] = leanSymbol; - Log.Trace($"DatabentoRawClient: Mapped instrument_id {instrumentId} to {leanSymbol}"); - break; + // Let's find the subscribed symbol to get the market and security type + var inSymbolStr = inSymbol.GetString(); + var subscription = _subscriptions.Keys.FirstOrDefault(s => _symbolMapper.GetBrokerageSymbol(s) == inSymbolStr); + if (subscription != null) + { + if (subscription.SecurityType == SecurityType.Future) + { + var leanSymbol = _symbolMapper.GetLeanSymbolForFuture(outSymbolStr); + if (leanSymbol == null) + { + Log.Trace($"DataBentoRawLiveClient: Future spreads are not supported: {outSymbolStr}. Skipping mapping."); + return; + } + _instrumentIdToSymbol[instrumentId] = leanSymbol; + Log.Debug($"DataBentoRawLiveClient: Mapped instrument_id {instrumentId} to {leanSymbol}"); + } + } } } - } - return; - } - else if (rtype == 1) - { - // MBP-1 (Market By Price) - Quote ticks - HandleMBPMessage(root, headerElement); - return; - } - else if (rtype == 0) - { - // Trade messages - Trade ticks - HandleTradeTickMessage(root, headerElement); - return; - } - else if (rtype == 32 || rtype == 33 || rtype == 34 || rtype == 35) - { - // OHLCV bar messages - HandleOHLCVMessage(root, headerElement); - return; + return; + + case 1: + // MBP-1 (Market By Price) + HandleMBPMessage(root, headerElement); + return; + + case 0: + // Trade messages + HandleTradeTickMessage(root, headerElement); + return; + + case 32: + case 33: + case 34: + case 35: + // OHLCV bar messages + HandleOHLCVMessage(root, headerElement); + return; + + default: + Log.Error($"DataBentoRawLiveClient: Unknown rtype {rtype} in message"); + return; } } } @@ -423,16 +403,16 @@ private void ProcessSingleMessage(string message) // Handle other message types if needed if (root.TryGetProperty("error", out var errorElement)) { - Log.Error($"DatabentoRawClient: Server error: {errorElement.GetString()}"); + Log.Error($"DataBentoRawLiveClient: Server error: {errorElement.GetString()}"); } } catch (JsonException ex) { - Log.Error($"DatabentoRawClient.ProcessSingleMessage(): JSON parse error: {ex.Message}"); + Log.Error($"DataBentoRawLiveClient.ProcessSingleMessage(): JSON parse error: {ex.Message}"); } catch (Exception ex) { - Log.Error($"DatabentoRawClient.ProcessSingleMessage(): Error: {ex.Message}"); + Log.Error($"DataBentoRawLiveClient.ProcessSingleMessage(): Error: {ex.Message}"); } } @@ -458,7 +438,7 @@ private void HandleOHLCVMessage(JsonElement root, JsonElement header) if (!_instrumentIdToSymbol.TryGetValue(instrumentId, out var matchedSymbol)) { - Log.Trace($"DatabentoRawClient: No mapping for instrument_id {instrumentId} in OHLCV message."); + Log.Debug($"DataBentoRawLiveClient: No mapping for instrument_id {instrumentId} in OHLCV message."); return; } @@ -511,13 +491,13 @@ private void HandleOHLCVMessage(JsonElement root, JsonElement header) period ); - Log.Trace($"DatabentoRawClient: OHLCV bar: {matchedSymbol} O={open} H={high} L={low} C={close} V={volume} at {timestamp}"); + // Log.Trace($"DataBentoRawLiveClient: OHLCV bar: {matchedSymbol} O={open} H={high} L={low} C={close} V={volume} at {timestamp}"); DataReceived?.Invoke(this, tradeBar); } } catch (Exception ex) { - Log.Error($"DatabentoRawClient.HandleOHLCVMessage(): Error: {ex.Message}"); + Log.Error($"DataBentoRawLiveClient.HandleOHLCVMessage(): Error: {ex.Message}"); } } @@ -543,7 +523,7 @@ private void HandleMBPMessage(JsonElement root, JsonElement header) if (!_instrumentIdToSymbol.TryGetValue(instrumentId, out var matchedSymbol)) { - Log.Trace($"DatabentoRawClient: No mapping for instrument_id {instrumentId} in MBP message."); + Log.Trace($"DataBentoRawLiveClient: No mapping for instrument_id {instrumentId} in MBP message."); return; } @@ -582,13 +562,13 @@ private void HandleMBPMessage(JsonElement root, JsonElement header) // QuantConnect convention: Quote ticks should have zero Price and Quantity quoteTick.Quantity = 0; - Log.Trace($"DatabentoRawClient: Quote tick: {matchedSymbol} Bid={quoteTick.BidPrice}x{quoteTick.BidSize} Ask={quoteTick.AskPrice}x{quoteTick.AskSize}"); + // Log.Trace($"DataBentoRawLiveClient: Quote tick: {matchedSymbol} Bid={quoteTick.BidPrice}x{quoteTick.BidSize} Ask={quoteTick.AskPrice}x{quoteTick.AskSize}"); DataReceived?.Invoke(this, quoteTick); } } catch (Exception ex) { - Log.Error($"DatabentoRawClient.HandleMBPMessage(): Error: {ex.Message}"); + Log.Error($"DataBentoRawLiveClient.HandleMBPMessage(): Error: {ex.Message}"); } } @@ -614,7 +594,7 @@ private void HandleTradeTickMessage(JsonElement root, JsonElement header) if (!_instrumentIdToSymbol.TryGetValue(instrumentId, out var matchedSymbol)) { - Log.Trace($"DatabentoRawClient: No mapping for instrument_id {instrumentId} in trade message."); + Log.Trace($"DataBentoRawLiveClient: No mapping for instrument_id {instrumentId} in trade message."); return; } @@ -639,66 +619,14 @@ private void HandleTradeTickMessage(JsonElement root, JsonElement header) AskSize = 0 }; - Log.Trace($"DatabentoRawClient: Trade tick: {matchedSymbol} Price={price} Quantity={size}"); + // Log.Trace($"DataBentoRawLiveClient: Trade tick: {matchedSymbol} Price={price} Quantity={size}"); DataReceived?.Invoke(this, tradeTick); } } catch (Exception ex) { - Log.Error($"DatabentoRawClient.HandleTradeTickMessage(): Error: {ex.Message}"); - } - } - - /// - /// Maps a LEAN symbol to DataBento symbol format - /// - private string MapSymbolToDataBento(Symbol symbol) - { - if (symbol.SecurityType == SecurityType.Future) - { - // For DataBento, use the root symbol with .FUT suffix for parent subscription - // ES19Z25 -> ES.FUT - var value = symbol.Value; - - // Extract root by removing digits and month codes - var root = new string(value.TakeWhile(c => !char.IsDigit(c)).ToArray()); - - return $"{root}.FUT"; + Log.Error($"DataBentoRawLiveClient.HandleTradeTickMessage(): Error: {ex.Message}"); } - - return symbol.Value; - } - - /// - /// Pick Databento schema from Lean resolution/ticktype - /// - private string GetSchema(Resolution resolution, TickType tickType) - { - if (tickType == TickType.Trade) - { - if (resolution == Resolution.Tick) - return "trades"; - if (resolution == Resolution.Second) - return "ohlcv-1s"; - if (resolution == Resolution.Minute) - return "ohlcv-1m"; - if (resolution == Resolution.Hour) - return "ohlcv-1h"; - if (resolution == Resolution.Daily) - return "ohlcv-1d"; - } - else if (tickType == TickType.Quote) - { - // top of book - if (resolution == Resolution.Tick || resolution == Resolution.Second || resolution == Resolution.Minute || resolution == Resolution.Hour || resolution == Resolution.Daily) - return "mbp-1"; - } - else if (tickType == TickType.OpenInterest) - { - return "statistics"; - } - - throw new NotSupportedException($"Unsupported resolution {resolution} / {tickType}"); } /// @@ -723,11 +651,11 @@ public void Disconnect() } catch (Exception ex) { - Log.Trace($"DatabentoRawClient.Disconnect(): Error during disconnect: {ex.Message}"); + Log.Trace($"DataBentoRawLiveClient.Disconnect(): Error during disconnect: {ex.Message}"); } ConnectionStatusChanged?.Invoke(this, false); - Log.Trace("DatabentoRawClient.Disconnect(): Disconnected from DataBento gateway"); + Log.Trace("DataBentoRawLiveClient.Disconnect(): Disconnected from DataBento gateway"); } } @@ -748,5 +676,21 @@ public void Dispose() _stream?.Dispose(); _tcpClient?.Dispose(); } + + /// + /// Computes the SHA-256 hash of the input string + /// + private static string ComputeSHA256(string input) + { + using var sha = SHA256.Create(); + var hash = sha.ComputeHash(Encoding.UTF8.GetBytes(input)); + var sb = new StringBuilder(); + foreach (byte b in hash) + { + sb.Append(b.ToString("x2")); + } + return sb.ToString(); + } + } } diff --git a/QuantConnect.DataBento/DataBentoSymbolMapper.cs b/QuantConnect.DataBento/DataBentoSymbolMapper.cs new file mode 100644 index 0000000..3c6b8b3 --- /dev/null +++ b/QuantConnect.DataBento/DataBentoSymbolMapper.cs @@ -0,0 +1,174 @@ +/* + * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. + * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +using QuantConnect; +using QuantConnect.Brokerages; +using System.Globalization; + +namespace QuantConnect.Lean.DataSource.DataBento +{ + /// + /// Provides the mapping between Lean symbols and DataBento symbols. + /// + public class DataBentoSymbolMapper : ISymbolMapper + { + private readonly Dictionary _leanSymbolsCache = new(); + private readonly Dictionary _brokerageSymbolsCache = new(); + private readonly object _locker = new(); + + /// + /// Converts a Lean symbol instance to a brokerage symbol + /// + /// A Lean symbol instance + /// The brokerage symbol + public string GetBrokerageSymbol(Symbol symbol) + { + if (symbol == null || string.IsNullOrWhiteSpace(symbol.Value)) + { + throw new ArgumentException($"Invalid symbol: {(symbol == null ? "null" : symbol.ToString())}"); + } + + return GetBrokerageSymbol(symbol, false); + } + + /// + /// Converts a Lean symbol instance to a brokerage symbol with updating of cached symbol collection + /// + /// + /// + /// + /// + public string GetBrokerageSymbol(Symbol symbol, bool isUpdateCachedSymbol) + { + lock (_locker) + { + if (!_brokerageSymbolsCache.TryGetValue(symbol, out var brokerageSymbol) || isUpdateCachedSymbol) + { + switch (symbol.SecurityType) + { + case SecurityType.Future: + brokerageSymbol = $"{symbol.ID.Symbol}.FUT"; + break; + + case SecurityType.Equity: + brokerageSymbol = symbol.Value; + break; + + default: + throw new Exception($"DataBentoSymbolMapper.GetBrokerageSymbol(): unsupported security type: {symbol.SecurityType}"); + } + + // Lean-to-DataBento symbol conversion is accurate, so we can cache it both ways + _brokerageSymbolsCache[symbol] = brokerageSymbol; + _leanSymbolsCache[brokerageSymbol] = symbol; + } + + return brokerageSymbol; + } + } + + /// + /// Converts a brokerage symbol to a Lean symbol instance + /// + /// The brokerage symbol + /// The security type + /// The market + /// Expiration date of the security(if applicable) + /// A new Lean Symbol instance + public Symbol GetLeanSymbol(string brokerageSymbol, SecurityType securityType, string market, + DateTime expirationDate = new DateTime(), decimal strike = 0, OptionRight optionRight = 0) + { + if (string.IsNullOrWhiteSpace(brokerageSymbol)) + { + throw new ArgumentException("Invalid symbol: " + brokerageSymbol); + } + + lock (_locker) + { + if (!_leanSymbolsCache.TryGetValue(brokerageSymbol, out var leanSymbol)) + { + switch (securityType) + { + case SecurityType.Future: + leanSymbol = Symbol.CreateFuture(brokerageSymbol, market, expirationDate); + break; + + default: + throw new Exception($"DataBentoSymbolMapper.GetLeanSymbol(): unsupported security type: {securityType}"); + } + + _leanSymbolsCache[brokerageSymbol] = leanSymbol; + _brokerageSymbolsCache[leanSymbol] = brokerageSymbol; + } + + return leanSymbol; + } + } + + /// + /// Gets the Lean symbol for the specified DataBento symbol + /// + /// The databento symbol + /// The corresponding Lean symbol + public Symbol GetLeanSymbol(string databentoSymbol) + { + lock (_locker) + { + if (!_leanSymbolsCache.TryGetValue(databentoSymbol, out var symbol)) + { + symbol = GetLeanSymbol(databentoSymbol, SecurityType.Equity, Market.USA); + } + + return symbol; + } + } + + /// + /// Converts a brokerage future symbol to a Lean symbol instance + /// + /// The brokerage symbol + /// A new Lean Symbol instance + public Symbol GetLeanSymbolForFuture(string brokerageSymbol) + { + if (string.IsNullOrWhiteSpace(brokerageSymbol)) + { + throw new ArgumentException("Invalid symbol: " + brokerageSymbol); + } + + // ignore futures spreads + if (brokerageSymbol.Contains("-")) + { + return null; + } + + lock (_locker) + { + if (!_leanSymbolsCache.TryGetValue(brokerageSymbol, out var leanSymbol)) + { + leanSymbol = SymbolRepresentation.ParseFutureSymbol(brokerageSymbol); + + if (leanSymbol == null) + { + throw new ArgumentException("Invalid future symbol: " + brokerageSymbol); + } + + _leanSymbolsCache[brokerageSymbol] = leanSymbol; + } + + return leanSymbol; + } + } + } +} diff --git a/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj b/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj index 718002c..249ad0a 100644 --- a/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj +++ b/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj @@ -39,4 +39,8 @@ + + + + diff --git a/models/DataBentoTypes.cs b/models/DataBentoTypes.cs new file mode 100644 index 0000000..b7fa58c --- /dev/null +++ b/models/DataBentoTypes.cs @@ -0,0 +1,112 @@ +using System; +using CsvHelper.Configuration.Attributes; +using QuantConnect; + +namespace QuantConnect.Lean.DataSource.DataBento.Models +{ + /// + /// Provides a constant for scaling price values from DataBento. + /// + public static class PriceScaling + { + /// + /// price scale factor is needed to find the true price from the message + /// Due to compression each "1 unit corresponds to 1e-9, i.e. 1/1,000,000,000 or 0.000000001" + /// https://databento.com/docs/api-reference-live/basics/schemas-and-conventions?historical=raw&live=raw&reference=raw + /// + public const decimal PriceScaleFactor = 1e-9m; + } + + /// + /// Represents a single bar of historical data from DataBento. + /// This class is used to map CSV data from HTTP requests into a structured format. + /// + public class DatabentoBar + { + [Name("ts_event")] + public long TimestampNanos { get; set; } + + public DateTime Timestamp => Time.UnixNanosecondTimeStampToDateTime(TimestampNanos); + + [Name("open")] + public long RawOpen { get; set; } + + [Name("high")] + public long RawHigh { get; set; } + + [Name("low")] + public long RawLow { get; set; } + + + + [Name("close")] + public long RawClose { get; set; } + + [Ignore] + public decimal Open => RawOpen == long.MaxValue ? 0m : RawOpen * PriceScaling.PriceScaleFactor; + + [Ignore] + public decimal High => RawHigh == long.MaxValue ? 0m : RawHigh * PriceScaling.PriceScaleFactor; + + [Ignore] + public decimal Low => RawLow == long.MaxValue ? 0m : RawLow * PriceScaling.PriceScaleFactor; + + [Ignore] + public decimal Close => RawClose == long.MaxValue ? 0m : RawClose * PriceScaling.PriceScaleFactor; + + [Name("volume")] + public long RawVolume { get; set; } + + [Ignore] + public decimal Volume => RawVolume == long.MaxValue ? 0m : RawVolume; + } + + /// + /// Represents a single trade event from DataBento. + /// + public class DatabentoTrade + { + [Name("ts_event")] + public long TimestampNanos { get; set; } + + public DateTime Timestamp => Time.UnixNanosecondTimeStampToDateTime(TimestampNanos); + + [Name("price")] + public long RawPrice { get; set; } + + [Ignore] + public decimal Price => RawPrice == long.MaxValue ? 0m : RawPrice * PriceScaling.PriceScaleFactor; + + [Name("size")] + public int Size { get; set; } + } + + /// + /// Represents a single quote from DataBento. + /// + public class DatabentoQuote + { + [Name("ts_event")] + public long TimestampNanos { get; set; } + + public DateTime Timestamp => Time.UnixNanosecondTimeStampToDateTime(TimestampNanos); + + [Name("bid_px_00")] + public long RawBidPrice { get; set; } + + [Ignore] + public decimal BidPrice => RawBidPrice == long.MaxValue ? 0m : RawBidPrice * PriceScaling.PriceScaleFactor; + + [Name("bid_sz_00")] + public int BidSize { get; set; } + + [Name("ask_px_00")] + public long RawAskPrice { get; set; } + + [Ignore] + public decimal AskPrice => RawAskPrice == long.MaxValue ? 0m : RawAskPrice * PriceScaling.PriceScaleFactor; + + [Name("ask_sz_00")] + public int AskSize { get; set; } + } +}