Skip to content

Commit 51bd055

Browse files
committed
Cleanup
1 parent 954d4e1 commit 51bd055

14 files changed

Lines changed: 123 additions & 91 deletions

File tree

Core/Core/Conventions/Gateway.cs

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,19 @@ public interface ITradeObserver : IGrainObserver
1717
/// Order message
1818
/// </summary>
1919
/// <param name="order"></param>
20-
Task StreamOrder(Order order);
20+
void StreamOrder(Order order);
2121

2222
/// <summary>
2323
/// Price message
2424
/// </summary>
2525
/// <param name="instrument"></param>
26-
Task StreamTrade(Instrument instrument);
26+
void StreamPrice(Instrument instrument);
2727

2828
/// <summary>
2929
/// Price message
3030
/// </summary>
3131
/// <param name="instrument"></param>
32-
void StreamPrice(Instrument instrument);
32+
Task StreamTrade(Instrument instrument);
3333
}
3434

3535
public interface IGateway
@@ -39,15 +39,10 @@ public interface IGateway
3939
/// </summary>
4040
Account Account { get; set; }
4141

42-
/// <summary>
43-
/// Messenger
44-
/// </summary>
45-
IAsyncStream<Message> Messenger { get; }
46-
4742
/// <summary>
4843
/// Order message
4944
/// </summary>
50-
Func<Order, Task> OnOrder { get; set; }
45+
Action<Order> OnOrder { get; set; }
5146

5247
/// <summary>
5348
/// Price message
@@ -164,32 +159,25 @@ public abstract class Gateway : IGateway, ITradeObserver
164159
public virtual string Space { get; set; } = Guid.NewGuid().ToString();
165160

166161
/// <summary>
167-
/// Messenger
162+
/// Order message
168163
/// </summary>
169-
public virtual IAsyncStream<Message> Messenger => Connector
170-
.GetStreamProvider(nameof(Message))
171-
.GetStream<Message>(string.Empty, Guid.Empty);
164+
public virtual Action<Order> OnOrder { get; set; } = o => { };
172165

173166
/// <summary>
174-
/// Order message
167+
/// Price message
175168
/// </summary>
176-
public virtual Func<Order, Task> OnOrder { get; set; } = o => Task.CompletedTask;
169+
public virtual Action<Instrument> OnPrice { get; set; } = o => { };
177170

178171
/// <summary>
179172
/// Trade message
180173
/// </summary>
181174
public virtual Func<Instrument, Task> OnTrade { get; set; } = o => Task.CompletedTask;
182175

183-
/// <summary>
184-
/// Price message
185-
/// </summary>
186-
public virtual Action<Instrument> OnPrice { get; set; } = o => { };
187-
188176
/// <summary>
189177
/// Order message
190178
/// </summary>
191179
/// <param name="order"></param>
192-
public virtual Task StreamOrder(Order order) => OnOrder(order);
180+
public virtual void StreamOrder(Order order) => OnOrder(order);
193181

194182
/// <summary>
195183
/// Price message
@@ -341,8 +329,6 @@ protected virtual void SubscribeToUpdates()
341329
Performance = Account.Performance + position.Balance.Current
342330
};
343331
}
344-
345-
return Task.CompletedTask;
346332
};
347333
}
348334
}

Core/Core/Core.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<PackageReference Include="Microsoft.Orleans.Serialization.MessagePack" Version="9.2.1" />
1313
<PackageReference Include="Microsoft.Orleans.Streaming" Version="9.2.1" />
1414
<PackageReference Include="Serilog.AspNetCore" Version="9.0.0" />
15-
<PackageReference Include="System.Interactive" Version="6.0.3" />
15+
<PackageReference Include="System.Interactive" Version="7.0.0" />
1616
<PackageReference Include="System.ServiceModel.Primitives" Version="10.0.652802" />
1717
</ItemGroup>
1818

Core/Core/Grains/OrdersGrain.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Core.Models;
44
using Core.Validators;
55
using Orleans;
6+
using System;
67
using System.Collections.Generic;
78
using System.Linq;
89
using System.Threading.Tasks;
@@ -17,6 +18,12 @@ public interface IOrdersGrain : IGrainWithStringKey
1718
/// <param name="criteria"></param>
1819
Task<OrdersResponse> Orders(Criteria criteria);
1920

21+
/// <summary>
22+
/// Store order
23+
/// </summary>
24+
/// <param name="order"></param>
25+
Task<OrderResponse> Store(Order order);
26+
2027
/// <summary>
2128
/// Send order
2229
/// </summary>
@@ -66,7 +73,7 @@ public virtual async Task<OrdersResponse> Orders(Criteria criteria)
6673
}
6774

6875
/// <summary>
69-
/// Send order
76+
/// Store order
7077
/// </summary>
7178
/// <param name="order"></param>
7279
public virtual async Task<OrderResponse> Store(Order order)

Core/Core/Grains/PositionsGrain.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ public interface IPositionsGrain : IGrainWithStringKey
1515
/// <param name="criteria"></param>
1616
Task<OrdersResponse> Positions(Criteria criteria);
1717

18+
/// <summary>
19+
/// Store
20+
/// </summary>
21+
/// <param name="order"></param>
22+
Task<OrderResponse> Store(Order order);
23+
1824
/// <summary>
1925
/// Process order to position conversion
2026
/// </summary>
@@ -52,6 +58,21 @@ public virtual async Task<OrdersResponse> Positions(Criteria criteria)
5258
};
5359
}
5460

61+
/// <summary>
62+
/// Store
63+
/// </summary>
64+
/// <param name="order"></param>
65+
public virtual async Task<OrderResponse> Store(Order order)
66+
{
67+
var instrument = order.Operation.Instrument.Name;
68+
var positionGrain = GrainFactory.GetGrain<IPositionGrain>(this.GetDescriptor(order.Id));
69+
var orderResponse = await positionGrain.Send(order);
70+
71+
State.Grains[instrument] = positionGrain;
72+
73+
return orderResponse;
74+
}
75+
5576
/// <summary>
5677
/// Process order to position conversion
5778
/// </summary>
@@ -64,12 +85,7 @@ public virtual async Task<OrderResponse> Send(Order order)
6485

6586
if (grain is null)
6687
{
67-
var positionGrain = GrainFactory.GetGrain<IPositionGrain>(this.GetDescriptor(order.Id));
68-
var orderResponse = await positionGrain.Send(order);
69-
70-
State.Grains[instrument] = positionGrain;
71-
72-
return orderResponse;
88+
return await Store(order);
7389
}
7490

7591
var response = await grain.Combine(order);

Core/Core/Grains/TransactionsGrain.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,7 @@
22
using Core.Enums;
33
using Core.Extensions;
44
using Core.Models;
5-
using Core.Services;
65
using Orleans;
7-
using System;
8-
using System.Collections.Generic;
9-
using System.IO;
106
using System.Linq;
117
using System.Threading.Tasks;
128

@@ -85,8 +81,7 @@ public virtual async Task<OrderResponse> Store(Order order)
8581
var response = await grain.Store(order);
8682

8783
State.Grains.Add(grain);
88-
89-
await observer.StreamOrder(order);
84+
observer.StreamOrder(order);
9085

9186
return response;
9287
}

Gateways/InteractiveBrokers/Libs/Grains/ConnectionGrain.cs

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
using Core.Extensions;
44
using Core.Grains;
55
using Core.Models;
6+
using Google.Protobuf.Reflection;
67
using IBApi;
78
using IBApi.Messages;
9+
using IBApi.Queries;
810
using InteractiveBrokers.Mappers;
911
using InteractiveBrokers.Models;
1012
using Orleans;
@@ -186,7 +188,7 @@ public override async Task<StatusResponse> Subscribe(Instrument instrument)
186188

187189
var contract = Upstream.MapContract(instrument);
188190
var cleaner = new CancellationTokenSource(state.Timeout);
189-
var contracts = await connector.GetContracts(cleaner.Token, contract);
191+
var contracts = await connector.GetContracts(contract, cleaner.Token);
190192
var contractMessage = contracts.FirstOrDefault();
191193

192194
if (contractMessage is null)
@@ -256,14 +258,16 @@ public virtual async Task<PricesResponse> Prices(Criteria criteria)
256258
{
257259
var contract = Upstream.MapContract(criteria.Instrument);
258260
var cleaner = new CancellationTokenSource(state.Timeout);
259-
var sourceItems = await connector.GetTicks(
260-
cleaner.Token,
261-
contract,
262-
criteria.MinDate.Value,
263-
criteria.MaxDate.Value,
264-
"BID_ASK",
265-
criteria.Count ?? 1);
261+
var query = new HistoricalTicksQuery()
262+
{
263+
Contract = contract,
264+
MinDate = criteria.MinDate.Value,
265+
MaxDate = criteria.MaxDate.Value,
266+
DataType = "BID_ASK",
267+
Count = criteria.Count ?? 1
268+
};
266269

270+
var sourceItems = await connector.GetTicks(query, cleaner.Token);
267271
var items = sourceItems.Select(Downstream.MapPrice).ToArray();
268272

269273
await Task.Delay(state.Span);
@@ -283,15 +287,16 @@ public virtual async Task<PricesResponse> PriceGroups(Criteria criteria)
283287
var cleaner = new CancellationTokenSource(state.Timeout);
284288
var contract = Upstream.MapContract(criteria.Instrument);
285289
var maxDate = criteria.MaxDate ?? DateTime.Now;
286-
var sourceItems = await connector.GetBars(
287-
cleaner.Token,
288-
contract,
289-
maxDate,
290-
criteria.Data.Get("Duration"),
291-
criteria.Data.Get("BarType"),
292-
criteria.Data.Get("DataType"),
293-
0);
290+
var query = new HistoricalBarsQuery
291+
{
292+
Contract = contract,
293+
MaxDate = maxDate,
294+
Duration = criteria.Data.Get("Duration"),
295+
BarType = criteria.Data.Get("BarType"),
296+
DataType = criteria.Data.Get("DataType"),
297+
};
294298

299+
var sourceItems = await connector.GetBars(query, cleaner.Token);
295300
var items = sourceItems.Select(Downstream.MapPrice).ToArray();
296301

297302
await Task.Delay(state.Span);
@@ -313,7 +318,7 @@ public virtual async Task<InstrumentsResponse> Options(Criteria criteria)
313318
var maxDate = (criteria.MaxDate ?? DateTime.Now).ToString($"yyyyMMdd-HH:mm:ss");
314319
var contract = Upstream.MapContract(criteria.Instrument);
315320
var cleaner = new CancellationTokenSource(state.Timeout);
316-
var sourceItems = await connector.GetContracts(cleaner.Token, contract);
321+
var sourceItems = await connector.GetContracts(contract, cleaner.Token);
317322
var items = sourceItems.Select(o => Downstream.MapInstrumentType(o.Contract)).ToArray();
318323

319324
await Task.Delay(state.Span);
@@ -346,10 +351,14 @@ public virtual async Task<Account> AccountSummary()
346351
/// <param name="criteria"></param>
347352
public virtual async Task<OrdersResponse> Orders(Criteria criteria)
348353
{
354+
var descriptor = this.GetDescriptor();
355+
var ordersGrain = GrainFactory.GetGrain<IOrdersGrain>(descriptor);
349356
var cleaner = new CancellationTokenSource(state.Timeout);
350357
var sourceItems = await connector.GetOrders(cleaner.Token);
351358
var items = sourceItems.Select(Downstream.MapOrder).ToArray();
352359

360+
await ordersGrain.Clear();
361+
await Task.WhenAll(items.Select(ordersGrain.Store));
353362
await Task.Delay(state.Span);
354363

355364
return new()
@@ -364,10 +373,14 @@ public virtual async Task<OrdersResponse> Orders(Criteria criteria)
364373
/// <param name="criteria"></param>
365374
public virtual async Task<OrdersResponse> Positions(Criteria criteria)
366375
{
376+
var descriptor = this.GetDescriptor();
377+
var positionsGrain = GrainFactory.GetGrain<IPositionsGrain>(descriptor);
367378
var cleaner = new CancellationTokenSource(state.Timeout);
368-
var sourceItems = await connector.GetPositions(cleaner.Token, state.Account.Descriptor);
379+
var sourceItems = await connector.GetPositions(state.Account.Descriptor, cleaner.Token);
369380
var items = sourceItems.Where(o => o.Position is not 0).Select(Downstream.MapPosition).ToArray();
370381

382+
await positionsGrain.Clear();
383+
await Task.WhenAll(items.Select(positionsGrain.Store));
371384
await Task.Delay(state.Span);
372385

373386
return new()
@@ -382,11 +395,12 @@ public virtual async Task<OrdersResponse> Positions(Criteria criteria)
382395
/// <param name="order"></param>
383396
public virtual async Task<OrderResponse> SendOrder(Core.Models.Order order)
384397
{
385-
var cleaner = new CancellationTokenSource(state.Timeout);
386398
var contract = Upstream.MapContract(order.Operation.Instrument);
387-
var (orderMessage, SL, TP) = Upstream.MapOrder(connector.Id, order, state.Account);
399+
var (orderMessage, SL, TP) = Upstream.MapOrder(order, state.Account);
400+
var (group, braces) = connector.SendOrder(contract, orderMessage, SL, TP);
401+
402+
order = order with { Operation = order.Operation with { Id = $"{group.OrderId}" } };
388403

389-
await connector.SendOrder(cleaner.Token, contract, orderMessage, SL, TP);
390404
await Task.Delay(state.Span);
391405

392406
return new()

Gateways/InteractiveBrokers/Libs/InterGateway.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ public class InterGateway : Gateway
3535
/// </summary>
3636
public override async Task<StatusResponse> Connect()
3737
{
38-
var grain = Component<IInterConnectionGrain>();
38+
var actionsGrain = Component<ITransactionsGrain>();
39+
var connectionGrain = Component<IInterConnectionGrain>();
3940
var observer = Connector.CreateObjectReference<ITradeObserver>(this);
4041

41-
await grain.Setup(new Connection
42+
await actionsGrain.Setup(observer);
43+
await connectionGrain.Setup(new Connection
4244
{
4345
Host = Host,
4446
Port = Port,
@@ -50,7 +52,7 @@ await grain.Setup(new Connection
5052

5153
SubscribeToUpdates();
5254

53-
return await grain.Connect();
55+
return await connectionGrain.Connect();
5456
}
5557

5658
/// <summary>

Gateways/InteractiveBrokers/Libs/InteractiveBrokers.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10-
<PackageReference Include="InterBroker" Version="0.0.23" />
10+
<PackageReference Include="InterBroker" Version="0.0.28" />
1111
</ItemGroup>
1212

1313
<ItemGroup>

Gateways/InteractiveBrokers/Libs/Maps/Upstream.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,12 @@ public class Upstream
1010
/// <summary>
1111
/// Convert remote order from brokerage to local record
1212
/// </summary>
13-
/// <param name="orderId"></param>
1413
/// <param name="orderModel"></param>
1514
/// <param name="account"></param>
16-
public static (IBApi.Order, double?, double?) MapOrder(int orderId, Core.Models.Order orderModel, Account account)
15+
public static (IBApi.Order, double?, double?) MapOrder(Core.Models.Order orderModel, Account account)
1716
{
1817
var order = new IBApi.Order
1918
{
20-
OrderId = orderId,
2119
Action = MapSide(orderModel.Side),
2220
Tif = MapTimeSpan(orderModel.TimeSpan),
2321
OrderType = MapOrderType(orderModel.Type),

Gateways/Schwab/Libs/Grains/OrdersGrain.cs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Core.Enums;
2-
using Core.Extensions;
32
using Core.Grains;
43
using Core.Models;
54
using Schwab.Messages;
@@ -61,19 +60,11 @@ public virtual async Task<StatusResponse> Setup(Connection connection)
6160
/// <param name="criteria"></param>
6261
public override async Task<OrdersResponse> Orders(Criteria criteria)
6362
{
64-
if (criteria?.Source is true)
65-
{
66-
return await Orders(criteria);
67-
}
68-
6963
var cleaner = new CancellationTokenSource(state.Timeout);
7064
var query = new OrderQuery { AccountCode = criteria.Account.Descriptor};
7165
var messages = await connector.GetOrders(query, cleaner.Token);
7266
var items = messages.Select(MapOrder);
7367

74-
await Clear();
75-
await Task.WhenAll(items.Select(Send));
76-
7768
return new()
7869
{
7970
Data = [.. items]

0 commit comments

Comments
 (0)