Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions Engine/TransactionHandlers/BacktestingTransactionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ public override void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IRes
_enableConcurrency = _brokerage.ConcurrencyEnabled && _algorithm.LiveMode;

base.Initialize(algorithm, brokerage, resultHandler);

if (!_enableConcurrency)
{
// non blocking implementation
_orderRequestQueues = new() { new BusyCollection<OrderRequest>() };
}
}

/// <summary>
/// For backtesting order requests are processed synchronously by the algorithm thread, only live
/// deployments with a concurrency enabled brokerage use background transaction threads
/// </summary>
protected override bool SynchronousProcessing => !_enableConcurrency;

/// <summary>
/// Processes all synchronous events that must take place before the next time loop for the algorithm
/// </summary>
Expand All @@ -74,7 +74,7 @@ public override void ProcessSynchronousEvents()
if (!_enableConcurrency)
{
// we process pending order requests our selves
Run(0);
ProcessPendingRequests();
}

base.ProcessSynchronousEvents();
Expand Down Expand Up @@ -113,7 +113,7 @@ protected override void WaitForOrderSubmission(OrderTicket ticket)
}

// we submit the order request our selves
Run(0);
ProcessPendingRequests();

if (!ticket.OrderSet.WaitOne(0))
{
Expand All @@ -124,18 +124,5 @@ protected override void WaitForOrderSubmission(OrderTicket ticket)
"See the OrderRequest.Response for more information");
}
}

/// <summary>
/// For backtesting order requests will be processed by the algorithm thread
/// sequentially at <see cref="WaitForOrderSubmission"/> and <see cref="ProcessSynchronousEvents"/>
/// </summary>
protected override void InitializeTransactionThread()
{
if (_enableConcurrency)
{
// let the base class handle this
base.InitializeTransactionThread();
}
}
}
}
150 changes: 64 additions & 86 deletions Engine/TransactionHandlers/BrokerageTransactionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,10 @@ public class BrokerageTransactionHandler : ITransactionHandler
private int _failedCashSyncAttempts;

/// <summary>
/// OrderQueue holds the newly updated orders from the user algorithm waiting to be processed. Once
/// orders are processed they are moved into the Orders queue awaiting the brokerage response.
/// Holds the worker threads and their queues, dispatching each order request to the queue pinned to
/// its order and growing the pool on demand as the threads get saturated.
/// </summary>
protected List<IBusyCollection<OrderRequest>> _orderRequestQueues { get; set; }

private List<Thread> _processingThreads;
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
protected OrderRequestProcessingPool _threadPool;

private readonly ConcurrentQueue<OrderEvent> _orderEvents = new ConcurrentQueue<OrderEvent>();

Expand Down Expand Up @@ -210,8 +207,6 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu
HandleOrderUpdated(e);
};

IsActive = true;

if (_algorithm is QCAlgorithm qcAlgorithm)
{
_qcAlgorithmInstance = qcAlgorithm;
Expand All @@ -230,35 +225,58 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu
InitializeTransactionThread();
}

/// <summary>
/// Whether the transaction thread pool can grow on demand to process order requests concurrently.
/// When false a single worker thread is used.
/// </summary>
protected virtual bool ConcurrencyEnabled => _brokerage.ConcurrencyEnabled;

/// <summary>
/// Whether order requests are drained synchronously by the algorithm thread instead of by background
/// worker threads. Used by backtesting deployments.
/// </summary>
protected virtual bool SynchronousProcessing => false;

/// <summary>
/// The maximum number of transaction threads the pool can grow to
/// </summary>
protected virtual int MaximumTransactionThreads => Config.GetInt("maximum-transaction-threads", 10);

/// <summary>
/// The number of transaction threads the pool starts with
/// </summary>
protected virtual int MinimumTransactionThreads => Config.GetInt("minimum-transaction-threads", 2);

/// <summary>
/// The number of transaction threads currently running
/// </summary>
protected int ProcessingThreadsCount => _threadPool?.ThreadCount ?? 0;

/// <summary>
/// Boolean flag indicating the transaction threads are busy.
/// False indicates they are completely finished processing and ready to be terminated.
/// </summary>
public bool IsActive => _threadPool?.IsActive ?? false;

/// <summary>
/// Create and start the transaction thread, who will be in charge of processing
/// the order requests
/// </summary>
protected virtual void InitializeTransactionThread()
{
// multi threaded queue, used for live deployments
var processingThreadsCount = _brokerage.ConcurrencyEnabled
? Config.GetInt("maximum-transaction-threads", 4)
: 1;
_orderRequestQueues = new(processingThreadsCount);
_processingThreads = new(processingThreadsCount);
for (var i = 0; i < processingThreadsCount; i++)
{
_orderRequestQueues.Add(new BusyBlockingCollection<OrderRequest>());
var threadId = i; // avoid modified closure
_processingThreads.Add(new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {i}" });
}
foreach (var thread in _processingThreads)
Action<OrderRequest> processRequest = request =>
{
thread.Start();
}
}
HandleOrderRequest(request);
ProcessAsynchronousEvents();
};
Action<Exception> onError = error => _algorithm.SetRuntimeError(error, "HandleOrderRequest");

/// <summary>
/// Boolean flag indicating the Run thread method is busy.
/// False indicates it is completely finished processing and ready to be terminated.
/// </summary>
public bool IsActive { get; private set; }
// backtesting drains a single queue synchronously on the algorithm thread, live deployments use
// background worker threads: a single one, or growing on demand up to the maximum when concurrent.
_threadPool = SynchronousProcessing
? OrderRequestProcessingPool.Synchronous(processRequest, onError)
: new OrderRequestProcessingPool(ConcurrencyEnabled, MinimumTransactionThreads, MaximumTransactionThreads, processRequest, onError);
}

#region Order Request Processing

Expand Down Expand Up @@ -338,7 +356,7 @@ public OrderTicket AddOrder(SubmitOrderRequest request)
order.OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close);
_openOrders[order.Id] = new OpenOrderState(order, ticket, security);

EnqueueOrderRequest(request, order);
_threadPool.Dispatch(request, order);

WaitForOrderSubmission(ticket);
}
Expand Down Expand Up @@ -366,7 +384,7 @@ public OrderTicket AddOrder(SubmitOrderRequest request)
}

/// <summary>
/// Wait for the order to be handled by the <see cref="_processingThreads"/>
/// Wait for the order to be handled by the <see cref="_threadPool"/>
/// </summary>
/// <param name="ticket">The <see cref="OrderTicket"/> expecting to be submitted</param>
protected virtual void WaitForOrderSubmission(OrderTicket ticket)
Expand Down Expand Up @@ -454,7 +472,7 @@ public OrderTicket UpdateOrder(UpdateOrderRequest request)
else
{
request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
EnqueueOrderRequest(request, order);
_threadPool.Dispatch(request, order);
}
}
catch (Exception err)
Expand Down Expand Up @@ -526,7 +544,7 @@ public OrderTicket CancelOrder(CancelOrderRequest request)

// send the request to be processed
request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
EnqueueOrderRequest(request, order);
_threadPool.Dispatch(request, order);
}
}
catch (Exception err)
Expand Down Expand Up @@ -674,29 +692,12 @@ public List<Order> GetOpenOrders(Func<Order, bool> filter = null)
}

/// <summary>
/// Primary thread entry point to launch the transaction thread.
/// Drains the pending order requests on the calling thread. Used by synchronous (non concurrent)
/// deployments, where the algorithm thread pumps the request queue itself.
/// </summary>
protected void Run(int threadId)
protected void ProcessPendingRequests()
{
try
{
foreach (var request in _orderRequestQueues[threadId].GetConsumingEnumerable(_cancellationTokenSource.Token))
{
HandleOrderRequest(request);
ProcessAsynchronousEvents();
}
}
catch (Exception err)
{
// unexpected error, we need to close down shop
_algorithm.SetRuntimeError(err, "HandleOrderRequest");
}

if (_processingThreads != null)
{
Log.Trace($"BrokerageTransactionHandler.Run(): Ending Thread {threadId}...");
IsActive = false;
}
_threadPool.ProcessPending();
}

/// <summary>
Expand All @@ -717,7 +718,7 @@ public virtual void ProcessSynchronousEvents()
// in backtesting we need to wait for orders to be removed from the queue and finished processing
if (!_algorithm.LiveMode)
{
if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token)))
if (_threadPool.WaitForProcessing(Time.OneSecond))
{
Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing.");
}
Expand Down Expand Up @@ -799,27 +800,8 @@ public void AddOpenOrder(Order order, IAlgorithm algorithm)
/// </summary>
public void Exit()
{
var timeout = TimeSpan.FromSeconds(60);
if (_processingThreads != null)
{
// only wait if the processing thread is running
if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout)))
{
Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds.");
}

foreach (var queue in _orderRequestQueues)
{
queue.CompleteAdding();
}

foreach (var thread in _processingThreads)
{
thread?.StopSafely(timeout, _cancellationTokenSource);
}
}
IsActive = false;
_cancellationTokenSource.DisposeSafely();
// Shutdown drains the queued requests (CompleteAdding) and waits for the threads before stopping
_threadPool?.Shutdown(TimeSpan.FromSeconds(60));
}

/// <summary>
Expand Down Expand Up @@ -1227,6 +1209,12 @@ private void HandleOrderEvents(List<OrderEvent> orderEvents)
order.Status = orderEvent.Status;
}

// notify the pool once an order reaches a final state so it can release its processing queue
if (order.Status.IsClosed())
{
_threadPool.Release(order);
}

orderEvent.Id = order.GetNewId();

// set the modified time of the order to the fill's timestamp
Expand Down Expand Up @@ -1937,16 +1925,6 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity)
return $"Order exceeds shortable quantity {shortableQuantity} for Symbol {symbol} requested {quantity})";
}

private void EnqueueOrderRequest(OrderRequest request, Order order)
{
var queueKey = request.OrderId;
if (order.GroupOrderManager?.Id > 0)
{
queueKey = order.GroupOrderManager.Id;
}
_orderRequestQueues[queueKey % _orderRequestQueues.Count].Add(request);
}

/// <summary>
/// Holds an order and its state
/// </summary>
Expand Down
Loading
Loading