diff --git a/Demonstration.cs b/Demonstration.cs index f5a1fe1..f98b935 100644 --- a/Demonstration.cs +++ b/Demonstration.cs @@ -22,6 +22,7 @@ using QuantConnect.Securities.Future; using QuantConnect.Util; using System; +using System.Linq; namespace QuantConnect.Algorithm.CSharp { @@ -39,7 +40,19 @@ public override void Initialize() var exp = new DateTime(2025, 12, 19); var symbol = QuantConnect.Symbol.CreateFuture("ES", Market.CME, exp); + //_es = AddFutureContract(symbol, Resolution.Tick, true, 1, true); _es = AddFutureContract(symbol, Resolution.Second, true, 1, true); + Log($"_es: {_es}"); + + var history = History(_es.Symbol, 10, Resolution.Minute).ToList(); + + Log($"History returned {history.Count} bars"); + + foreach (var bar in history) + { + Log($"History Bar: {bar.Time} - O:{bar.Open} H:{bar.High} L:{bar.Low} C:{bar.Close} V:{bar.Volume}"); + } + } public override void OnData(Slice slice) @@ -51,13 +64,13 @@ public override void OnData(Slice slice) } Log($"OnData: Slice has {slice.Count} data points"); - + // For Tick resolution, check Ticks collection if (slice.Ticks.ContainsKey(_es.Symbol)) { var ticks = slice.Ticks[_es.Symbol]; Log($"Received {ticks.Count} ticks for {_es.Symbol}"); - + foreach (var tick in ticks) { if (tick.TickType == TickType.Trade) @@ -70,12 +83,11 @@ public override void OnData(Slice slice) } } } - - // These won't have data for Tick resolution - if (slice.Bars.ContainsKey(_es.Symbol)) + + // Access OHLCV bars + foreach (var bar in slice.Bars.Values) { - var bar = slice.Bars[_es.Symbol]; - Log($"Bar - O:{bar.Open} H:{bar.High} L:{bar.Low} C:{bar.Close} V:{bar.Volume}"); + Log($"OHLCV BAR: {bar.Symbol.Value} - O: {bar.Open}, H: {bar.High}, L: {bar.Low}, C: {bar.Close}, V: {bar.Volume}"); } } } diff --git a/DemonstrationUniverse.cs b/DemonstrationUniverse.cs deleted file mode 100644 index 86a0d7b..0000000 --- a/DemonstrationUniverse.cs +++ /dev/null @@ -1,28 +0,0 @@ -/* - * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. - * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * -*/ - -using System.Linq; -using QuantConnect.Data.UniverseSelection; -using QuantConnect.DataSource; - -namespace QuantConnect.Algorithm.CSharp -{ - /// - /// Example algorithm using the custom data type as a source of alpha - /// - public class CustomDataUniverse : QCAlgorithm - { } -} \ No newline at end of file diff --git a/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj b/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj index 8e212ee..d2d7ef1 100644 --- a/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj +++ b/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj @@ -5,7 +5,6 @@ - diff --git a/QuantConnect.DataBento/DataBentoDataDownloader.cs b/QuantConnect.DataBento/DataBentoDataDownloader.cs index c8a1783..2a44e55 100644 --- a/QuantConnect.DataBento/DataBentoDataDownloader.cs +++ b/QuantConnect.DataBento/DataBentoDataDownloader.cs @@ -33,15 +33,13 @@ namespace QuantConnect.Lean.DataSource.DataBento { /// - /// Data downloader class for pulling data from Data Provider + /// Data downloader for historical data from DataBento's Raw HTTP API + /// Converts DataBento data to Lean data types /// public class DataBentoDataDownloader : IDataDownloader, IDisposable { - /// private readonly HttpClient _httpClient; - private readonly string _apiKey; - private const decimal PriceScaleFactor = 1e-9m; /// @@ -72,8 +70,6 @@ public IEnumerable Get(DataDownloaderGetParameters parameters) { var symbol = parameters.Symbol; var resolution = parameters.Resolution; - var startUtc = parameters.StartUtc; - var endUtc = parameters.EndUtc; var tickType = parameters.TickType; var dataset = "GLBX.MDP3"; // hard coded for now. Later on can add equities and options with different mapping @@ -85,8 +81,8 @@ public IEnumerable Get(DataDownloaderGetParameters parameters) body.Append($"dataset={dataset}"); body.Append($"&symbols={dbSymbol}"); body.Append($"&schema={schema}"); - body.Append($"&start={startUtc:yyyy-MM-ddTHH:mm}"); - body.Append($"&end={endUtc:yyyy-MM-ddTHH:mm}"); + body.Append($"&start={parameters.StartUtc:yyyy-MM-ddTHH:mm}"); + body.Append($"&end={parameters.EndUtc:yyyy-MM-ddTHH:mm}"); body.Append("&stype_in=parent"); body.Append("&encoding=csv"); diff --git a/QuantConnect.DataBento/DataBentoDataProvider.cs b/QuantConnect.DataBento/DataBentoDataProvider.cs index f1ed3c0..8cf6015 100644 --- a/QuantConnect.DataBento/DataBentoDataProvider.cs +++ b/QuantConnect.DataBento/DataBentoDataProvider.cs @@ -15,8 +15,10 @@ */ using System; +using System.Linq; using NodaTime; using QuantConnect.Data; +using QuantConnect.Data.Market; using QuantConnect.Util; using QuantConnect.Interfaces; using System.Collections.Generic; @@ -24,8 +26,7 @@ using QuantConnect.Logging; using QuantConnect.Packets; using QuantConnect.Securities; -using System.Threading.Tasks; -using QuantConnect.Data.Market; +using System.Collections.Concurrent; namespace QuantConnect.Lean.DataSource.DataBento { @@ -34,46 +35,20 @@ namespace QuantConnect.Lean.DataSource.DataBento /// public class DataBentoProvider : IDataQueueHandler { - /// - /// - /// private readonly IDataAggregator _dataAggregator = Composer.Instance.GetExportedValueByTypeName( Config.Get("data-aggregator", "QuantConnect.Lean.Engine.DataFeeds.AggregationManager"), forceTypeNameOnExisting: false); - - /// - /// - /// private EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = null!; - private readonly List _activeSubscriptionConfigs = new(); - - private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptionConfigs = new(); - - /// - /// - /// + private readonly ConcurrentDictionary _subscriptionConfigs = new(); private DatabentoRawClient _client = null!; - - /// - /// DataBento API key - /// private readonly string _apiKey; - - /// - /// DataBento historical data downloader - /// private readonly DataBentoDataDownloader _dataDownloader; - - private bool _unsupportedSecurityTypeMessageLogged; - private bool _unsupportedDataTypeMessageLogged; private bool _potentialUnsupportedResolutionMessageLogged; - private bool _sessionStarted = false; private readonly object _sessionLock = new object(); - private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); - private readonly System.Collections.Concurrent.ConcurrentDictionary _symbolExchangeTimeZones = new(); - + private readonly ConcurrentDictionary _symbolExchangeTimeZones = new(); + /// /// Returns true if we're currently connected to the Data Provider /// @@ -84,7 +59,6 @@ public class DataBentoProvider : IDataQueueHandler /// public DataBentoProvider() { - Log.Trace("From Plugin DataBentoProvider.DataBentoProvider() being initialized 1"); _apiKey = Config.Get("databento-api-key"); if (string.IsNullOrEmpty(_apiKey)) { @@ -101,7 +75,6 @@ public DataBentoProvider() /// DataBento API key public DataBentoProvider(string apiKey) { - Log.Trace("From Plugin DataBentoProvider.DataBentoProvider() being initialized 2"); _apiKey = apiKey ?? throw new ArgumentNullException(nameof(apiKey)); _dataDownloader = new DataBentoDataDownloader(_apiKey); Initialize(); @@ -113,64 +86,35 @@ public DataBentoProvider(string apiKey) private void Initialize() { Log.Trace("DataBentoProvider.Initialize(): Starting initialization"); - _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager(); _subscriptionManager.SubscribeImpl = (symbols, tickType) => { Log.Trace($"DataBentoProvider.SubscribeImpl(): Received subscription request for {symbols.Count()} symbols, TickType={tickType}"); - foreach (var symbol in symbols) { Log.Trace($"DataBentoProvider.SubscribeImpl(): Processing symbol {symbol}"); - - if (_subscriptionConfigs.TryGetValue(symbol, out var config)) + if (!_subscriptionConfigs.TryGetValue(symbol, out var config)) { - Log.Trace($"DataBentoProvider.SubscribeImpl(): Found config for {symbol}, Resolution={config.Resolution}, TickType={config.TickType}"); + Log.Error($"DataBentoProvider.SubscribeImpl(): No subscription config found for {symbol}"); + return false; + } + if (_client?.IsConnected != true) + { + Log.Error($"DataBentoProvider.SubscribeImpl(): Client is not connected. Cannot subscribe to {symbol}"); + return false; + } - if (_client?.IsConnected == true) - { - Log.Trace($"DataBentoProvider.SubscribeImpl(): Client is connected, attempting async subscribe for {symbol}"); - Task.Run(async () => - { - // If the requested resolution is higher than tick, we subscribe to ticks and let the aggregator handle it. - var resolutionToSubscribe = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution; - var success = _client.Subscribe(config.Symbol, resolutionToSubscribe, config.TickType); - if (success) - { - Log.Trace($"DataBentoProvider.SubscribeImpl(): Successfully subscribed to {config.Symbol} at {resolutionToSubscribe} resolution"); - - // Start session after first successful subscription - lock (_sessionLock) - { - if (!_sessionStarted) - { - Log.Trace("DataBentoProvider.SubscribeImpl(): Starting DataBento session to receive data"); - _sessionStarted = _client.StartSession(); - if (_sessionStarted) - { - Log.Trace("DataBentoProvider.SubscribeImpl(): Session started successfully - data should begin flowing"); - } - else - { - Log.Error("DataBentoProvider.SubscribeImpl(): Failed to start session"); - } - } - } - } - else - { - Log.Error($"DataBentoProvider.SubscribeImpl(): Failed to subscribe to live data for {config.Symbol}"); - } - }); - } - else - { - Log.Trace($"DataBentoProvider.SubscribeImpl(): Client not connected, skipping subscription for {symbol}"); - } + var resolution = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution; + if (!_client.Subscribe(config.Symbol, resolution, config.TickType)) + { + Log.Error($"Failed to subscribe to {config.Symbol}"); + return false; } - else + + lock (_sessionLock) { - Log.Trace($"DataBentoProvider.SubscribeImpl(): No config found for {symbol}, skipping"); + if (!_sessionStarted) + _sessionStarted = _client.StartSession(); } } @@ -179,25 +123,15 @@ private void Initialize() _subscriptionManager.UnsubscribeImpl = (symbols, tickType) => { - Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Received unsubscribe request for {symbols.Count()} symbols, TickType={tickType}"); - foreach (var symbol in symbols) { Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Processing symbol {symbol}"); - - if (_client?.IsConnected == true) - { - Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Client is connected, unsubscribing from {symbol}"); - Task.Run(() => - { - _client.Unsubscribe(symbol); - Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Unsubscribed from {symbol}"); - }); - } - else + if (_client?.IsConnected != true) { - Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Client not connected, skipping unsubscribe for {symbol}"); + throw new InvalidOperationException($"DataBentoProvider.UnsubscribeImpl(): Client is not connected. Cannot unsubscribe from {symbol}"); } + + _client.Unsubscribe(symbol); } return true; @@ -210,11 +144,11 @@ private void Initialize() _client.ConnectionStatusChanged += OnConnectionStatusChanged; // Connect to live gateway - Log.Trace("DataBentoProvider.Initialize(): Attempting async connection to DataBento live gateway"); - Task.Run(async () => + Log.Trace("DataBentoProvider.Initialize(): Attempting connection to DataBento live gateway"); + Task.Run(() => { - var connected = await _client.ConnectAsync(); - Log.Trace($"DataBentoProvider.Initialize(): ConnectAsync() returned {connected}"); + var connected = _client.Connect(); + Log.Trace($"DataBentoProvider.Initialize(): Connect() returned {connected}"); if (connected) { @@ -226,6 +160,7 @@ private void Initialize() } }); + Log.Trace("DataBentoProvider.Initialize(): Initialization complete"); } @@ -237,8 +172,10 @@ private void Initialize() /// The new enumerator for this subscription request public IEnumerator? Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler) { - Log.Trace("From Plugin Subscribed ENTER"); - if (!CanSubscribe(dataConfig)){ + Log.Trace($"DataBentoProvider.Subscribe(): Received subscription request for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}"); + if (!CanSubscribe(dataConfig)) + { + Log.Error($"DataBentoProvider.Subscribe(): Cannot subscribe to {dataConfig.Symbol} with Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}"); return null; } @@ -247,7 +184,6 @@ private void Initialize() _subscriptionManager.Subscribe(dataConfig); _activeSubscriptionConfigs.Add(dataConfig); - Log.Trace("From Plugin Subscribed DONE"); return enumerator; } @@ -257,11 +193,13 @@ private void Initialize() /// Subscription config to be removed public void Unsubscribe(SubscriptionDataConfig dataConfig) { + Log.Trace($"DataBentoProvider.Unsubscribe(): Received unsubscription request for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}"); _subscriptionConfigs.TryRemove(dataConfig.Symbol, out _); _subscriptionManager.Unsubscribe(dataConfig); var toRemove = _activeSubscriptionConfigs.FirstOrDefault(c => c.Symbol == dataConfig.Symbol && c.TickType == dataConfig.TickType); if (toRemove != null) { + Log.Trace($"DataBentoProvider.Unsubscribe(): Removing active subscription for {dataConfig.Symbol}, Resolution={dataConfig.Resolution}, TickType={dataConfig.TickType}"); _activeSubscriptionConfigs.Remove(toRemove); } _dataAggregator.Remove(dataConfig); @@ -294,8 +232,10 @@ public void Dispose() /// An enumerable of BaseData points public IEnumerable? GetHistory(Data.HistoryRequest request) { + Log.Trace($"DataBentoProvider.GetHistory(): Received history request for {request.Symbol}, Resolution={request.Resolution}, TickType={request.TickType}"); if (!CanSubscribe(request.Symbol)) { + Log.Error($"DataBentoProvider.GetHistory(): Cannot provide history for {request.Symbol} with Resolution={request.Resolution}, TickType={request.TickType}"); return null; } @@ -358,12 +298,7 @@ private bool IsSupported(SecurityType securityType, Type dataType, TickType tick // Check supported security types if (!IsSecurityTypeSupported(securityType)) { - if (!_unsupportedSecurityTypeMessageLogged) - { - _unsupportedSecurityTypeMessageLogged = true; - Log.Trace($"DataBentoDataProvider.IsSupported(): Unsupported security type: {securityType}"); - } - return false; + throw new NotSupportedException($"Unsupported security type: {securityType}"); } // Check supported data types @@ -372,12 +307,7 @@ private bool IsSupported(SecurityType securityType, Type dataType, TickType tick dataType != typeof(Tick) && dataType != typeof(OpenInterest)) { - if (!_unsupportedDataTypeMessageLogged) - { - _unsupportedDataTypeMessageLogged = true; - Log.Trace($"DataBentoDataProvider.IsSupported(): Unsupported data type: {dataType}"); - } - return false; + throw new NotSupportedException($"Unsupported data type: {dataType}"); } // Warn about potential limitations for tick data @@ -422,7 +352,7 @@ private void OnDataReceived(object? sender, BaseData data) { tick.Time = GetTickTime(tick.Symbol, tick.Time); _dataAggregator.Update(tick); - + Log.Trace($"DataBentoProvider.OnDataReceived(): Updated tick - Symbol: {tick.Symbol}, " + $"TickType: {tick.TickType}, Price: {tick.Value}, Quantity: {tick.Quantity}"); } @@ -431,7 +361,7 @@ private void OnDataReceived(object? sender, BaseData data) tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time); tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime); _dataAggregator.Update(tradeBar); - + Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " + $"O:{tradeBar.Open} H:{tradeBar.High} L:{tradeBar.Low} C:{tradeBar.Close} V:{tradeBar.Volume}"); } @@ -461,28 +391,25 @@ private void OnConnectionStatusChanged(object? sender, bool isConnected) { _sessionStarted = false; } - - // Resubscribe to all active subscriptions - Task.Run(() => + + // Resubscribe to all active subscriptions + foreach (var config in _activeSubscriptionConfigs) { - foreach (var config in _activeSubscriptionConfigs) - { - _client.Subscribe(config.Symbol, config.Resolution, config.TickType); - } - - // Start session after resubscribing - if (_activeSubscriptionConfigs.Any()) + _client.Subscribe(config.Symbol, config.Resolution, config.TickType); + } + + // Start session after resubscribing + if (_activeSubscriptionConfigs.Any()) + { + lock (_sessionLock) { - lock (_sessionLock) + if (!_sessionStarted) { - if (!_sessionStarted) - { - Log.Trace("DataBentoProvider.OnConnectionStatusChanged(): Starting session after reconnection"); - _sessionStarted = _client.StartSession(); - } + Log.Trace("DataBentoProvider.OnConnectionStatusChanged(): Starting session after reconnection"); + _sessionStarted = _client.StartSession(); } } - }); + } } } } diff --git a/QuantConnect.DataBento/DataBentoHistoryProivder.cs b/QuantConnect.DataBento/DataBentoHistoryProivder.cs index 64b1da8..a93b50e 100644 --- a/QuantConnect.DataBento/DataBentoHistoryProivder.cs +++ b/QuantConnect.DataBento/DataBentoHistoryProivder.cs @@ -27,7 +27,6 @@ using System.Collections.Generic; using QuantConnect.Configuration; using QuantConnect.Securities; -using System.Threading.Tasks; using QuantConnect.Data.Consolidators; namespace QuantConnect.Lean.DataSource.DataBento @@ -39,36 +38,19 @@ public partial class DataBentoHistoryProvider : SynchronizingHistoryProvider { private int _dataPointCount; private DataBentoDataDownloader _dataDownloader; - - /// - /// Indicates whether a error for an invalid start time has been fired, where the start time is greater than or equal to the end time in UTC. - /// private volatile bool _invalidStartTimeErrorFired; - - /// - /// Indicates whether an error has been fired due to invalid conditions if the TickType is and the is greater than one second. - /// private volatile bool _invalidTickTypeAndResolutionErrorFired; - - /// - /// Indicates whether unsupported tick type message has been logged - /// private volatile bool _unsupportedTickTypeMessagedLogged; - - /// - /// Market hours database instance - /// private MarketHoursDatabase _marketHoursDatabase; - + private bool _unsupportedSecurityTypeMessageLogged; + private bool _unsupportedDataTypeMessageLogged; + private bool _potentialUnsupportedResolutionMessageLogged; + /// /// Gets the total number of data points emitted by this history provider /// public override int DataPointCount => _dataPointCount; - private bool _unsupportedSecurityTypeMessageLogged; - private bool _unsupportedDataTypeMessageLogged; - private bool _potentialUnsupportedResolutionMessageLogged; - /// /// Initializes this history provider to work for the specified job /// diff --git a/QuantConnect.DataBento/DataBentoRawLiveClient.cs b/QuantConnect.DataBento/DataBentoRawLiveClient.cs index 9a5c8b8..2e7bb97 100644 --- a/QuantConnect.DataBento/DataBentoRawLiveClient.cs +++ b/QuantConnect.DataBento/DataBentoRawLiveClient.cs @@ -19,10 +19,9 @@ using System.Text; using System.Net.Sockets; using System.Security.Cryptography; -using System.Threading; -using System.Threading.Tasks; using System.Collections.Concurrent; using System.Text.Json; +using System.Linq; using QuantConnect.Data; using QuantConnect.Data.Market; using QuantConnect.Logging; @@ -80,9 +79,9 @@ public DatabentoRawClient(string apiKey, string gateway = "glbx-mdp3.lsg.databen /// /// Connects to the DataBento live gateway /// - public async Task ConnectAsync() + public bool Connect() { - Log.Trace("DatabentoRawClient.ConnectAsync(): Connecting to DataBento live gateway"); + Log.Trace("DatabentoRawClient.Connect(): Connecting to DataBento live gateway"); if (_isConnected || _disposed) { return _isConnected; @@ -95,27 +94,27 @@ public async Task ConnectAsync() var port = parts.Length > 1 ? int.Parse(parts[1]) : 13000; _tcpClient = new TcpClient(); - await _tcpClient.ConnectAsync(host, port).ConfigureAwait(false); + _tcpClient.Connect(host, port); _stream = _tcpClient.GetStream(); _reader = new StreamReader(_stream, Encoding.ASCII); _writer = new StreamWriter(_stream, Encoding.ASCII) { AutoFlush = true }; // Perform authentication handshake - if (await AuthenticateAsync().ConfigureAwait(false)) + if (Authenticate()) { _isConnected = true; ConnectionStatusChanged?.Invoke(this, true); - // Start message processing task - _ = Task.Run(() => ProcessMessagesAsync(_cancellationTokenSource.Token)); + // Start message processing + ProcessMessages(); - Log.Trace("DatabentoRawClient.ConnectAsync(): Connected and authenticated to DataBento live gateway"); + Log.Trace("DatabentoRawClient.Connect(): Connected and authenticated to DataBento live gateway"); return true; } } catch (Exception ex) { - Log.Error($"DatabentoRawClient.ConnectAsync(): Failed to connect: {ex.Message}"); + Log.Error($"DatabentoRawClient.Connect(): Failed to connect: {ex.Message}"); Disconnect(); } @@ -125,7 +124,7 @@ public async Task ConnectAsync() /// /// Authenticates with the DataBento gateway using CRAM-SHA256 /// - private async Task AuthenticateAsync() + private bool Authenticate() { if (_reader == null || _writer == null) return false; @@ -133,23 +132,23 @@ private async Task AuthenticateAsync() try { // Read greeting and challenge - string? versionLine = await _reader.ReadLineAsync(); - string? cramLine = await _reader.ReadLineAsync(); + string? versionLine = _reader.ReadLine(); + string? cramLine = _reader.ReadLine(); if (string.IsNullOrEmpty(versionLine) || string.IsNullOrEmpty(cramLine)) { - Log.Error("DatabentoRawClient.AuthenticateAsync(): Failed to receive greeting or challenge"); + Log.Error("DatabentoRawClient.Authenticate(): Failed to receive greeting or challenge"); return false; } - Log.Trace($"DatabentoRawClient.AuthenticateAsync(): Version: {versionLine}"); - Log.Trace($"DatabentoRawClient.AuthenticateAsync(): Challenge: {cramLine}"); + Log.Trace($"DatabentoRawClient.Authenticate(): Version: {versionLine}"); + Log.Trace($"DatabentoRawClient.Authenticate(): Challenge: {cramLine}"); // Parse challenge string[] cramParts = cramLine.Split('='); if (cramParts.Length != 2 || cramParts[0] != "cram") { - Log.Error("DatabentoRawClient.AuthenticateAsync(): Invalid challenge format"); + Log.Error("DatabentoRawClient.Authenticate(): Invalid challenge format"); return false; } string cram = cramParts[1].Trim(); @@ -162,31 +161,31 @@ private async Task AuthenticateAsync() // Send auth message string authMsg = $"auth={authString}|dataset={_dataset}|encoding=json|ts_out=0"; - Log.Trace($"DatabentoRawClient.AuthenticateAsync(): Sending auth"); - await _writer.WriteLineAsync(authMsg); + Log.Trace($"DatabentoRawClient.Authenticate(): Sending auth"); + _writer.WriteLine(authMsg); // Read auth response - string? authResp = await _reader.ReadLineAsync(); + string? authResp = _reader.ReadLine(); if (string.IsNullOrEmpty(authResp)) { - Log.Error("DatabentoRawClient.AuthenticateAsync(): No authentication response received"); + Log.Error("DatabentoRawClient.Authenticate(): No authentication response received"); return false; } - Log.Trace($"DatabentoRawClient.AuthenticateAsync(): Auth response: {authResp}"); + Log.Trace($"DatabentoRawClient.Authenticate(): Auth response: {authResp}"); if (!authResp.Contains("success=1")) { - Log.Error($"DatabentoRawClient.AuthenticateAsync(): Authentication failed: {authResp}"); + Log.Error($"DatabentoRawClient.Authenticate(): Authentication failed: {authResp}"); return false; } - Log.Trace("DatabentoRawClient.AuthenticateAsync(): Authentication successful"); + Log.Trace("DatabentoRawClient.Authenticate(): Authentication successful"); return true; } catch (Exception ex) { - Log.Error($"DatabentoRawClient.AuthenticateAsync(): Authentication failed: {ex.Message}"); + Log.Error($"DatabentoRawClient.Authenticate(): Authentication failed: {ex.Message}"); return false; } } @@ -296,12 +295,12 @@ public bool Unsubscribe(Symbol symbol) /// /// Processes incoming messages from the DataBento gateway /// - private async Task ProcessMessagesAsync(CancellationToken cancellationToken) + private void ProcessMessages() { - Log.Trace("DatabentoRawClient.ProcessMessagesAsync(): Starting message processing"); + Log.Trace("DatabentoRawClient.ProcessMessages(): Starting message processing"); if (_reader == null) { - Log.Error("DatabentoRawClient.ProcessMessagesAsync(): No reader available"); + Log.Error("DatabentoRawClient.ProcessMessages(): No reader available"); return; } @@ -309,12 +308,12 @@ private async Task ProcessMessagesAsync(CancellationToken cancellationToken) try { - while (!cancellationToken.IsCancellationRequested && IsConnected) + while (!_cancellationTokenSource.IsCancellationRequested && IsConnected) { - var line = await _reader.ReadLineAsync(); + var line = _reader.ReadLine(); if (line == null) { - Log.Trace("DatabentoRawClient.ProcessMessagesAsync(): Connection closed by server"); + Log.Trace("DatabentoRawClient.ProcessMessages(): Connection closed by server"); break; } @@ -324,27 +323,27 @@ private async Task ProcessMessagesAsync(CancellationToken cancellationToken) messageCount++; if (messageCount <= 50 || messageCount % 100 == 0) { - Log.Trace($"DatabentoRawClient.ProcessMessagesAsync(): Message #{messageCount}: {line.Substring(0, Math.Min(150, line.Length))}..."); + Log.Trace($"DatabentoRawClient.ProcessMessages(): Message #{messageCount}: {line.Substring(0, Math.Min(150, line.Length))}..."); } - await ProcessSingleMessage(line); + ProcessSingleMessage(line); } } catch (OperationCanceledException) { - Log.Trace("DatabentoRawClient.ProcessMessagesAsync(): Message processing cancelled"); + Log.Trace("DatabentoRawClient.ProcessMessages(): Message processing cancelled"); } catch (IOException ex) when (ex.InnerException is SocketException) { - Log.Trace($"DatabentoRawClient.ProcessMessagesAsync(): Socket exception: {ex.Message}"); + Log.Trace($"DatabentoRawClient.ProcessMessages(): Socket exception: {ex.Message}"); } catch (Exception ex) { - Log.Error($"DatabentoRawClient.ProcessMessagesAsync(): Error processing messages: {ex.Message}\n{ex.StackTrace}"); + Log.Error($"DatabentoRawClient.ProcessMessages(): Error processing messages: {ex.Message}\n{ex.StackTrace}"); } finally { - Log.Trace($"DatabentoRawClient.ProcessMessagesAsync(): Exiting. Total messages processed: {messageCount}"); + Log.Trace($"DatabentoRawClient.ProcessMessages(): Exiting. Total messages processed: {messageCount}"); Disconnect(); } } @@ -352,10 +351,8 @@ private async Task ProcessMessagesAsync(CancellationToken cancellationToken) /// /// Processes a single message from DataBento /// - private async Task ProcessSingleMessage(string message) + private void ProcessSingleMessage(string message) { - await Task.CompletedTask; - try { using var document = JsonDocument.Parse(message); @@ -405,19 +402,19 @@ private async Task ProcessSingleMessage(string message) else if (rtype == 1) { // MBP-1 (Market By Price) - Quote ticks - await HandleMBPMessage(root, headerElement); + HandleMBPMessage(root, headerElement); return; } else if (rtype == 0) { // Trade messages - Trade ticks - await HandleTradeTickMessage(root, headerElement); + HandleTradeTickMessage(root, headerElement); return; } else if (rtype == 32 || rtype == 33 || rtype == 34 || rtype == 35) { // OHLCV bar messages - await HandleOHLCVMessage(root, headerElement); + HandleOHLCVMessage(root, headerElement); return; } } @@ -442,10 +439,8 @@ private async Task ProcessSingleMessage(string message) /// /// Handles OHLCV messages and converts to LEAN TradeBar data /// - private async Task HandleOHLCVMessage(JsonElement root, JsonElement header) + private void HandleOHLCVMessage(JsonElement root, JsonElement header) { - await Task.CompletedTask; - try { if (!header.TryGetProperty("ts_event", out var tsElement) || @@ -529,10 +524,8 @@ private async Task HandleOHLCVMessage(JsonElement root, JsonElement header) /// /// Handles MBP messages for quote ticks /// - private async Task HandleMBPMessage(JsonElement root, JsonElement header) + private void HandleMBPMessage(JsonElement root, JsonElement header) { - await Task.CompletedTask; - try { if (!header.TryGetProperty("ts_event", out var tsElement) || @@ -602,10 +595,8 @@ private async Task HandleMBPMessage(JsonElement root, JsonElement header) /// /// Handles trade tick messages. Aggressor fills /// - private async Task HandleTradeTickMessage(JsonElement root, JsonElement header) + private void HandleTradeTickMessage(JsonElement root, JsonElement header) { - await Task.CompletedTask; - try { if (!header.TryGetProperty("ts_event", out var tsElement) || diff --git a/README.md b/README.md index 642fc49..17afc80 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ [![Build Status](https://github.com/QuantConnect/LeanDataSdk/workflows/Build%20%26%20Test/badge.svg)](https://github.com/QuantConnect/LeanDataSdk/actions?query=workflow%3A%22Build%20%26%20Test%22) +Welcome to the DataBento Library repository! This library, built on .NET 6, provides seamless integration with the QuantConnect LEAN Algorithmic Trading Engine. It empowers users to interact with DaatBento's services to create powerful trading algorithms + ### Introduction The Lean Data SDK is a cross-platform template repository for developing custom data types for Lean. @@ -11,9 +13,11 @@ These data types will be consumed by [QuantConnect](https://www.quantconnect.com It is composed by example .Net solution for the data type and converter scripts. +DataBento Library is an open-source project written in C#, designed to simplify the process of accessing real-time and historical financial market data. With support for Futures Data across the CME exchange and low latency, it offers a comprehensive solution for algorithmic trading. + ### Prerequisites -The solution targets dotnet 5, for installation instructions please follow [dotnet download](https://dotnet.microsoft.com/download). +The solution targets dotnet 6, for installation instructions please follow [dotnet download](https://dotnet.microsoft.com/download). The data downloader and converter script can be developed in different ways: C# executable, Python script, Python Jupyter notebook or even a bash script. - The python script should be compatible with python 3.6.8 @@ -21,27 +25,53 @@ The data downloader and converter script can be developed in different ways: C# Specifically, the enviroment where these scripts will be run is [quantconnect/research](https://hub.docker.com/repository/docker/quantconnect/research) based on [quantconnect/lean:foundation](https://hub.docker.com/repository/docker/quantconnect/lean). +### DatBento Overview +DataBento provides real time and historical market data through many powerful and developer-friendly APIs. Currently this implementation uses the Globex dataset to access CME +exchanges and provide data on CME products. DataBento provides a wide array of datasets, and exchanges for stocks, futures, and options. + +### Tutorial + +You can use the following command line arguments to launch the [LEAN CLI] (https://github.com/quantConnect/Lean-cli) pip project with DataBento. + +#### Downloading data +``` +lean data download --data-provider-historical DataBento --data-type Trade --resolution Daily --security-type Future --ticker ES --start 20240303 --end 20240404 --databento-api-key +``` + +#### Backtesting +``` +lean backtest "My Project" --data-provider-historical DataBento --databento-api-key +``` + +#### Jupyter Research Notebooks +``` +lean research "My Project" --data-provider-historical DataBento --databento-api-key +``` + +#### Live Trading +``` +lean live deploy "My Project" --data-provider-live DataBento --brokerage "Paper Trading" --databento-api-key +``` + ### Installation -The "Use this template" feature should be used for each unique data source which requires its own data processing. Once it is cloned locally, you should be able to successfully build the solution, run all tests and execute the downloader and/or conveter scripts. The final version should pass all CI tests of GitHub Actions. +To contribute to the DataBento API Connector Library for .NET 6 within QuantConnect LEAN, follow these steps: + - Obtain API Key: Visit [DataBento] (https://databento.com/) and sign up for an API key. + - Fork the Project: Fork the repository by clicking the "Fork" button at the top right of the GitHub page. + - Clone Your Forked Repository: -Once ready, please contact support@quantconnect.com and we will create a listing in the QuantConnect Data Market for your company and link to your public repository and commit hash. +Configure your project by + - Set the databento-api-key in your QuantConnect configuration (config.json or environment variables). -### Datasets Vendor Requirements +### Documentation +Refer to the [documentation] (https://databento.com/docs/) for detailed information on the library's functions, parameters, and usage examples. -Key requirements for new vendors include: +### Price Plan - - A well-defined dataset with a clear and static vision for the data to minimize churn or changes as people will be building systems from it. This is easiest with "raw" data (e.g. sunshine hours vs a sentiment algorithm) - - Robust ticker and security links to ensure the tickers are tracked well through time, or accurately point in time. ISIN, FIGI, or point in time ticker supported - - Robust funding to ensure viable for at least 1 year - - Robust API to ensure reliable up-time. No dead links on site or and 502 servers while using API - - Consistent delivery schedule, on time and in time for market trading - - Consistent data format with notifications and lead time on data format updates - - At least 1 year of historical point in time data - - Survivorship bias free data - - Good documentation for the dataset +For detailed information on DataBento's pricing plans, please refer to the [DataBento Pricing] (https://databento.com/pricing) page. +### License -### Tutorials +This project is licensed under the Apache License 2.0 — see the [LICENSE](https://github.com/QuantConnect/Lean.DataSource.DataBento/blob/master/LICENSE) file for details. - - See [Tutorials](https://www.quantconnect.com/docs/v2/our-platform/datasets/contributing-datasets) for a step by step guide for creating a new LEAN Data Source. \ No newline at end of file +Happy coding and algorithmic trading! \ No newline at end of file diff --git a/databento.json b/databento.json index c8efbf9..7580511 100644 --- a/databento.json +++ b/databento.json @@ -29,8 +29,7 @@ "Future" ], "markets": [ - "CME", - "Globex" + "CME" ] } }