@@ -87,31 +87,17 @@ public override Task<StatusResponse> Disconnect()
8787 /// <param name="instrument"></param>
8888 public override async Task < StatusResponse > Subscribe ( Instrument instrument )
8989 {
90- var descriptor = this . GetDescriptor ( ) ;
9190 var currency = instrument . Currency . Name ;
92- var instrumentDescriptor = this . GetDescriptor ( instrument . Name + currency ) ;
93- var domGrain = GrainFactory . GetGrain < IDomGrain > ( instrumentDescriptor ) ;
94- var instrumentGrain = GrainFactory . GetGrain < IInstrumentGrain > ( instrumentDescriptor ) ;
95- var positionsGrain = GrainFactory . GetGrain < IPositionsGrain > ( descriptor ) ;
96- var ordersGrain = GrainFactory . GetGrain < IOrdersGrain > ( descriptor ) ;
97- var symbol = new SharedSymbol ( TradingMode . Spot , instrument . Name , currency ) ;
98- var subResponse = await streamer . SubscribeToBookTickerUpdatesAsync ( state . Exchange , new SubscribeBookTickerRequest ( symbol ) , async o =>
99- {
100- var group = await instrumentGrain . Send ( instrument with { Price = MapBook ( o ) } ) ;
101-
102- observer . StreamPrice ( group ) ;
103- await observer . StreamInstrument ( group ) ;
104- } ) ;
91+ var security = new SharedSymbol ( TradingMode . Spot , instrument . Name , currency , instrument ? . Derivative ? . ExpirationDate ) ;
92+ var query = new SubscribeBookTickerRequest ( security ) ;
93+ var subResponse = await streamer . SubscribeToBookTickerUpdatesAsync ( state . Exchange , query , o => SendStream ( instrument , MapPrice ( o ) ) ) ;
10594
10695 if ( subResponse . Success is false )
10796 {
108- await streamer . SubscribeToTickerUpdatesAsync ( state . Exchange , new SubscribeTickerRequest ( symbol ) , async o =>
97+ switch ( true )
10998 {
110- var group = await instrumentGrain . Send ( instrument with { Price = MapPrice ( o ) } ) ;
111-
112- observer . StreamPrice ( group ) ;
113- await observer . StreamInstrument ( group ) ;
114- } ) ;
99+ case true when Equals ( state . Exchange , streamer . Coinbase . Exchange ) : await Coinbase ( instrument ) ; break ;
100+ }
115101 }
116102
117103 return new ( )
@@ -120,30 +106,67 @@ public override async Task<StatusResponse> Subscribe(Instrument instrument)
120106 } ;
121107 }
122108
109+ /// <summary>
110+ /// Stream price
111+ /// </summary>
112+ /// <param name="instrument"></param>
113+ /// <param name="o"></param>
114+ protected virtual async void SendStream ( Instrument instrument , Price o )
115+ {
116+ var currency = instrument . Currency . Name ;
117+ var instrumentDescriptor = this . GetDescriptor ( instrument . Name + currency ) ;
118+ var instrumentGrain = GrainFactory . GetGrain < IInstrumentGrain > ( instrumentDescriptor ) ;
119+ var group = await instrumentGrain . Send ( instrument with { Price = o } ) ;
120+
121+ observer . StreamPrice ( group ) ;
122+
123+ await observer . StreamInstrument ( group ) ;
124+ }
125+
123126 /// <summary>
124127 /// Map book
125128 /// </summary>
126129 /// <param name="o"></param>
127- protected virtual Price MapBook ( ExchangeEvent < SharedBookTicker > o ) => new ( )
130+ protected virtual Price MapPrice ( ExchangeEvent < SharedBookTicker > o ) => new ( )
128131 {
129132 Bid = ( double ) o . Data . BestBidPrice ,
130133 BidSize = ( double ) o . Data . BestBidQuantity ,
131134 Ask = ( double ) o . Data . BestAskPrice ,
132135 AskSize = ( double ) o . Data . BestAskQuantity ,
133- Time = DateTime . Now . Ticks
136+ Last = ( double ) ( ( o . Data . BestBidPrice + o . Data . BestAskPrice ) / 2 ) ,
137+ Time = o . DataTime ? . Ticks
134138 } ;
135139
136140 /// <summary>
137- /// Map price
141+ /// Subscribe to Coinbase
138142 /// </summary>
139- /// <param name="o "></param>
140- protected virtual Price MapPrice ( ExchangeEvent < SharedSpotTicker > o ) => new ( )
143+ /// <param name="instrument "></param>
144+ protected virtual Task Coinbase ( Instrument instrument )
141145 {
142- Bid = ( double ) o . Data . LastPrice ,
143- BidSize = ( double ) o . Data . LastPrice ,
144- Ask = ( double ) o . Data . QuoteVolume ,
145- AskSize = ( double ) o . Data . QuoteVolume ,
146- Time = DateTime . Now . Ticks
147- } ;
146+ var security = new SharedSymbol (
147+ TradingMode . Spot ,
148+ instrument . Name ,
149+ instrument . Currency . Name ,
150+ instrument ? . Derivative ? . ExpirationDate ) ;
151+
152+ var name = streamer . Coinbase . AdvancedTradeApi . FormatSymbol (
153+ security . BaseAsset ,
154+ security . QuoteAsset ,
155+ security . TradingMode ,
156+ security . DeliverTime ) ;
157+
158+ return streamer . Coinbase . AdvancedTradeApi . SubscribeToTickerUpdatesAsync ( name , o =>
159+ {
160+ SendStream ( instrument , new ( )
161+ {
162+ Bid = ( double ) o . Data . BestBidPrice ,
163+ BidSize = ( double ) o . Data . BestBidQuantity ,
164+ Ask = ( double ) o . Data . BestAskPrice ,
165+ AskSize = ( double ) o . Data . BestAskQuantity ,
166+ Last = ( double ) ( ( o . Data . BestBidPrice + o . Data . BestAskPrice ) / 2 ) ,
167+ Time = o . DataTime ? . Ticks
168+ } ) ;
169+ } ) ;
170+ }
148171 }
149172}
0 commit comments