From 38781fef6c252d9dda741b964d69746815fa5c25 Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Thu, 27 Jun 2024 09:20:36 -0400 Subject: [PATCH 01/15] logging --- src/exchanges/binance/rest_api/endpoints/instruments.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/exchanges/binance/rest_api/endpoints/instruments.rs b/src/exchanges/binance/rest_api/endpoints/instruments.rs index b99add6..4b363de 100644 --- a/src/exchanges/binance/rest_api/endpoints/instruments.rs +++ b/src/exchanges/binance/rest_api/endpoints/instruments.rs @@ -64,7 +64,7 @@ impl<'de> Deserialize<'de> for BinanceAllInstruments { let instruments_value = val .get("symbols") - .ok_or(eyre::ErrReport::msg("could not find 'symbols' field in binance instruments response".to_string())) + .ok_or(eyre::ErrReport::msg(format!("could not find 'symbols' field in binance instruments response of {val:?}"))) .map_err(serde::de::Error::custom)?; let instruments = serde_json::from_value(instruments_value.clone()).map_err(serde::de::Error::custom)?; From 1feaf04e5b5743dd2383e79ca72dd2aaadf5fd6b Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Thu, 27 Jun 2024 10:04:23 -0400 Subject: [PATCH 02/15] remove unsupported 'a' field from trades stream --- src/exchanges/binance/ws/channels/trades.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/exchanges/binance/ws/channels/trades.rs b/src/exchanges/binance/ws/channels/trades.rs index ef499bb..ee7456e 100644 --- a/src/exchanges/binance/ws/channels/trades.rs +++ b/src/exchanges/binance/ws/channels/trades.rs @@ -21,8 +21,6 @@ pub struct BinanceTrade { pub quantity: f64, #[serde(rename = "t")] pub trade_id: u64, - #[serde(rename = "a")] - pub seller_order_id: u64, #[serde(rename = "m")] pub is_buyer_market_maker: bool, #[serde(rename = "T")] From 9ff9ec28edb39499c5fb0bb58f30eeb9092c153d Mon Sep 17 00:00:00 2001 From: jnoorchashm37 Date: Thu, 27 Jun 2024 12:47:50 -0400 Subject: [PATCH 03/15] test timeout --- tests/utils.rs | 12 ++++++++++++ tests/ws.rs | 4 +++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/utils.rs b/tests/utils.rs index 75fb6a9..98fcf21 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -145,3 +145,15 @@ pub fn init_test_tracing() { .with(vec![data_layer, general_layer]) .init(); } + +pub async fn timeout_function(test_duration_sec: u64, f: impl futures::Future) -> bool +where +{ + let sleep = tokio::time::sleep(std::time::Duration::from_secs(test_duration_sec)); + tokio::pin!(sleep); + + tokio::select! { + _ = f => true, + _ = &mut sleep => false, + } +} diff --git a/tests/ws.rs b/tests/ws.rs index 8f79411..c1fe97b 100644 --- a/tests/ws.rs +++ b/tests/ws.rs @@ -136,7 +136,9 @@ mod okex_tests { init_test_tracing(); let builder = OkexWsBuilder::new(None) .add_channel(OkexWsChannel::new_trade(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdt")]).unwrap()); - okex_util(builder, 5).await; + + let is_success = timeout_function(5, okex_util(builder, 5)).await; + //okex_util(builder, 5).await; } #[tokio::test] From f0b427961b7fb438a0f87dc3f757618cecccfcdb Mon Sep 17 00:00:00 2001 From: jnoorchashm37 Date: Thu, 27 Jun 2024 12:48:00 -0400 Subject: [PATCH 04/15] test timeout --- tests/ws.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/ws.rs b/tests/ws.rs index c1fe97b..0f3b800 100644 --- a/tests/ws.rs +++ b/tests/ws.rs @@ -138,6 +138,7 @@ mod okex_tests { .add_channel(OkexWsChannel::new_trade(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdt")]).unwrap()); let is_success = timeout_function(5, okex_util(builder, 5)).await; + assert!(is_success); //okex_util(builder, 5).await; } From 47b01cd98049667aed89c051d2d9d31f315fe532 Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Thu, 27 Jun 2024 12:48:24 -0400 Subject: [PATCH 05/15] restructure init to avoid double tracing --- Cargo.lock | 1 + Cargo.toml | 4 ++ .../binance/rest_api/endpoints/instruments.rs | 53 +++++++++++++++++++ tests/utils.rs | 52 +++++++++--------- 4 files changed, 86 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ae2c66..0f9a20c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -189,6 +189,7 @@ dependencies = [ "clap", "eyre", "futures", + "once_cell", "paste", "rand", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index cc4d065..186a353 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,3 +49,7 @@ default = ["us", "non-us"] all = ["non-us", "us"] non-us = [] us = [] + +[dev-dependencies] + +once_cell = "1.18.0" \ No newline at end of file diff --git a/src/exchanges/binance/rest_api/endpoints/instruments.rs b/src/exchanges/binance/rest_api/endpoints/instruments.rs index 4b363de..c6aa01f 100644 --- a/src/exchanges/binance/rest_api/endpoints/instruments.rs +++ b/src/exchanges/binance/rest_api/endpoints/instruments.rs @@ -165,3 +165,56 @@ impl PartialEq for BinanceInstrument { equals } } + +// #[cfg(test)] +// mod tests { +// use crate::normalized::types::NormalizedTradingPair; + +// use super::*; + +// #[test] +// fn test_binance_instrument_normalize() { +// let bi = BinanceInstrument { +// symbol: BinanceTradingPair("ETHBTC".to_string()), +// status: "TRADING".to_string(), +// base_asset: "ETH".to_string(), +// base_asset_precision: 8, +// quote_asset: "BTC".to_string(), +// quote_precision: 8, +// quote_asset_precision: 8, +// order_types: vec!["LIMIT".to_string(), "LIMIT_MAKER".to_string(), "MARKET".to_string(), "STOP_LOSS_LIMIT".to_string(), "TAKE_PROFIT_LIMIT".to_string()], +// iceberg_allowed: true, +// oco_allowed: true, +// quote_order_qty_market_allowed: true, +// allow_trailing_stop: true, +// cancel_replace_allowed: true, +// is_spot_trading_allowed: true, +// is_margin_trading_allowed: true, +// permission_sets: vec![vec![BinanceTradingType::Spot, BinanceTradingType::Margin, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other]], +// permissions: vec![], +// default_self_trade_prevention_mode: "EXPIRE_MAKER".to_string(), +// allowed_self_trade_prevention_modes: vec!["EXPIRE_TAKER".to_string(), "EXPIRE_MAKER".to_string(), "EXPIRE_BOTH".to_string()], +// }; + +// let expected = vec![ +// NormalizedInstrument { +// exchange: CexExchange::Binance, +// trading_pair: NormalizedTradingPair::new_base_quote(CexExchange::Binance, "ETH", "BTC", None, None), +// trading_type: NormalizedTradingType::Spot, +// base_asset_symbol: "ETH".to_string(), +// quote_asset_symbol: "BTC".to_string(), +// active: true, +// futures_expiry: None +// }, NormalizedInstrument { +// exchange: CexExchange::Binance, +// trading_pair: NormalizedTradingPair::new_base_quote(CexExchange::Binance, "ETH", "BTC", None, None), +// trading_type: NormalizedTradingType::Margin, +// base_asset_symbol: "ETH".to_string(), +// quote_asset_symbol: "BTC".to_string(), +// active: true, +// futures_expiry: None +// } +// ]; +// assert_eq!(bi.normalize(), expected); +// } +// } \ No newline at end of file diff --git a/tests/utils.rs b/tests/utils.rs index 98fcf21..01c9598 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -1,5 +1,5 @@ #![allow(unused)] -use std::fmt::Debug; +use std::{fmt::Debug, sync::Once}; use cex_exchanges::{ clients::ws::{MutliWsStreamBuilder, WsStream}, @@ -120,30 +120,34 @@ where writeln!(f0, "{}", serde_json::to_string(&a).unwrap()).unwrap(); } +static TRACING: Once = Once::new(); + pub fn init_test_tracing() { - let data_filter = EnvFilter::builder() - .with_default_directive(format!("cex-exchanges={}", Level::TRACE).parse().unwrap()) - .from_env_lossy(); - - let data_layer = tracing_subscriber::fmt::layer() - .with_ansi(true) - .with_target(true) - .with_filter(data_filter) - .boxed(); - - let general_filter = EnvFilter::builder() - .with_default_directive(Level::DEBUG.into()) - .from_env_lossy(); - - let general_layer = tracing_subscriber::fmt::layer() - .with_ansi(true) - .with_target(true) - .with_filter(general_filter) - .boxed(); - - tracing_subscriber::registry() - .with(vec![data_layer, general_layer]) - .init(); + TRACING.call_once(|| { + let data_filter = EnvFilter::builder() + .with_default_directive(format!("cex-exchanges={}", Level::TRACE).parse().unwrap()) + .from_env_lossy(); + + let data_layer = tracing_subscriber::fmt::layer() + .with_ansi(true) + .with_target(true) + .with_filter(data_filter) + .boxed(); + + let general_filter = EnvFilter::builder() + .with_default_directive(Level::DEBUG.into()) + .from_env_lossy(); + + let general_layer = tracing_subscriber::fmt::layer() + .with_ansi(true) + .with_target(true) + .with_filter(general_filter) + .boxed(); + + tracing_subscriber::registry() + .with(vec![data_layer, general_layer]) + .init(); + }); } pub async fn timeout_function(test_duration_sec: u64, f: impl futures::Future) -> bool From d44ad8543217576a0240ec3a5c1f897b0f78dc9a Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Thu, 27 Jun 2024 18:08:38 -0400 Subject: [PATCH 06/15] add timeouts --- src/exchanges/binance/mod.rs | 1 + src/exchanges/kucoin/ws/builder.rs | 4 +-- tests/ws.rs | 58 +++++++++++++++--------------- 3 files changed, 31 insertions(+), 32 deletions(-) diff --git a/src/exchanges/binance/mod.rs b/src/exchanges/binance/mod.rs index 0fa800c..25d36eb 100644 --- a/src/exchanges/binance/mod.rs +++ b/src/exchanges/binance/mod.rs @@ -170,6 +170,7 @@ impl Exchange for Binance { .await .map(|v| BinanceRestApiResponse::Instruments(v)) }; + info!(target: "cex-exchanges::binance", "response: {:?}", api_response); if let Err(e) = api_response.as_ref() { error!(target: "cex-exchanges::binance", "error calling rest-api endpoint {:?} -- {:?}", api_channel, e); diff --git a/src/exchanges/kucoin/ws/builder.rs b/src/exchanges/kucoin/ws/builder.rs index e7ec79f..e05c6f1 100644 --- a/src/exchanges/kucoin/ws/builder.rs +++ b/src/exchanges/kucoin/ws/builder.rs @@ -10,8 +10,8 @@ use crate::{ /// There is a limit of 300 connections per attempt every 5 minutes per IP. const MAX_KUCOIN_STREAMS: usize = 300; -/// A single connection can listen to a maximum of 1024 streams. -const MAX_KUCOIN_WS_CONNS_PER_STREAM: usize = 1024; +/// A single connection can listen to a maximum of 100 streams. +const MAX_KUCOIN_WS_CONNS_PER_STREAM: usize = 100; #[derive(Debug, Clone, Default)] pub struct KucoinWsBuilder { diff --git a/tests/ws.rs b/tests/ws.rs index 0f3b800..724a449 100644 --- a/tests/ws.rs +++ b/tests/ws.rs @@ -30,7 +30,7 @@ mod coinbase_tests { let builder = CoinbaseWsBuilder::default().add_channel( CoinbaseWsChannel::new_match(vec![RawTradingPair::new_raw("ETH_USD", '_'), RawTradingPair::new_no_delim("BTC-USD")]).unwrap() ); - coinbase_util(builder, 5).await; + assert!(timeout_function(5, coinbase_util(builder, 5)).await); } #[tokio::test] @@ -40,7 +40,7 @@ mod coinbase_tests { let builder = CoinbaseWsBuilder::default().add_channel( CoinbaseWsChannel::new_ticker(vec![RawTradingPair::new_raw("ETH_USD", '_'), RawTradingPair::new_no_delim("BTC-USD")]).unwrap() ); - coinbase_util(builder, 5).await; + assert!(timeout_function(5, coinbase_util(builder, 5)).await); } #[tokio::test] @@ -56,7 +56,7 @@ mod coinbase_tests { .build_many_distributed() .unwrap(); - mutlistream_util(builder, 50).await; + assert!(timeout_function(30, mutlistream_util(builder, 50)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -69,7 +69,7 @@ mod coinbase_tests { .await .unwrap(); - mutlistream_util(builder, 1000).await; + assert!(timeout_function(45, mutlistream_util(builder, 1000)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -81,7 +81,7 @@ mod coinbase_tests { let builder = CoinbaseWsBuilder::build_from_all_instruments(&channels, Some(10)) .await .unwrap(); - mutlithreaded_util(builder, 1000).await; + assert!(timeout_function(45, mutlithreaded_util(builder, 1000)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -104,7 +104,7 @@ mod coinbase_tests { .collect::>() ); - normalized_mutlithreaded_util(builder, 1000).await; + assert!(timeout_function(45, normalized_mutlithreaded_util(builder, 1000)).await); } } @@ -127,7 +127,7 @@ mod okex_tests { use super::*; async fn okex_util(builder: OkexWsBuilder, iterations: usize) { - stream_util(builder.build_single(), iterations).await; + assert!(timeout_function(5, stream_util(builder.build_single(), iterations)).await); } #[tokio::test] @@ -137,9 +137,7 @@ mod okex_tests { let builder = OkexWsBuilder::new(None) .add_channel(OkexWsChannel::new_trade(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdt")]).unwrap()); - let is_success = timeout_function(5, okex_util(builder, 5)).await; - assert!(is_success); - //okex_util(builder, 5).await; + assert!(timeout_function(15, okex_util(builder, 5)).await); } #[tokio::test] @@ -149,7 +147,7 @@ mod okex_tests { let builder = OkexWsBuilder::new(None).add_channel( OkexWsChannel::new_book_ticker(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdc")]).unwrap() ); - okex_util(builder, 5).await; + assert!(timeout_function(5, okex_util(builder, 5)).await); } #[tokio::test] @@ -168,7 +166,7 @@ mod okex_tests { .build_many_distributed() .unwrap(); - mutlistream_util(builder, 50).await; + assert!(timeout_function(60, mutlistream_util(builder, 50)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -181,7 +179,7 @@ mod okex_tests { .await .unwrap(); - mutlistream_util(builder, 1000).await; + assert!(timeout_function(15, mutlistream_util(builder, 1000)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -193,7 +191,7 @@ mod okex_tests { let builder = OkexWsBuilder::build_from_all_instruments(&channels, None, Some(10)) .await .unwrap(); - mutlithreaded_util(builder, 1000).await; + assert!(timeout_function(15, mutlithreaded_util(builder, 1000)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -216,7 +214,7 @@ mod okex_tests { .collect::>() ); - normalized_mutlithreaded_util(builder, 1000).await; + assert!(timeout_function(25, normalized_mutlithreaded_util(builder, 1000)).await); } } @@ -245,7 +243,7 @@ mod binance_tests { let builder = BinanceWsBuilder::default().add_channel( BinanceWsChannel::new_trade(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdc")]).unwrap() ); - binance_util(builder, 5).await; + assert!(timeout_function(60, binance_util(builder, 5)).await); } #[tokio::test] @@ -255,7 +253,7 @@ mod binance_tests { let builder = BinanceWsBuilder::default().add_channel( BinanceWsChannel::new_book_ticker(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdc")]).unwrap() ); - binance_util(builder, 5).await; + assert!(timeout_function(5, binance_util(builder, 5)).await); } #[tokio::test] @@ -274,7 +272,7 @@ mod binance_tests { .build_many_distributed() .unwrap(); - mutlistream_util(builder, 50).await; + assert!(timeout_function(120, mutlistream_util(builder, 50)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -287,7 +285,7 @@ mod binance_tests { .await .unwrap(); - mutlistream_util(builder, 1000).await; + assert!(timeout_function(120, mutlistream_util(builder, 1000)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -299,7 +297,7 @@ mod binance_tests { let builder = BinanceWsBuilder::build_from_all_instruments(&channels) .await .unwrap(); - mutlithreaded_util(builder, 1000).await; + assert!(timeout_function(120, mutlithreaded_util(builder, 1000)).await); } } @@ -328,7 +326,7 @@ mod kucoin_tests { let builder = KucoinWsBuilder::default().add_channel( KucoinWsChannel::new_match(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdc")]).unwrap() ); - kucoin_util(builder, 5).await; + assert!(timeout_function(45, kucoin_util(builder, 5)).await); } #[tokio::test] @@ -338,7 +336,7 @@ mod kucoin_tests { let builder = KucoinWsBuilder::default().add_channel( KucoinWsChannel::new_ticker(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdc")]).unwrap() ); - kucoin_util(builder, 5).await; + assert!(timeout_function(5, kucoin_util(builder, 5)).await); } #[tokio::test] @@ -355,7 +353,7 @@ mod kucoin_tests { .build_many_distributed() .unwrap(); - mutlistream_util(builder, 50).await; + assert!(timeout_function(15, mutlistream_util(builder, 50)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -368,7 +366,7 @@ mod kucoin_tests { .await .unwrap(); - mutlistream_util(builder, 1000).await; + assert!(timeout_function(145, mutlistream_util(builder, 1000)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -380,7 +378,7 @@ mod kucoin_tests { let builder = KucoinWsBuilder::build_from_all_instruments(&channels) .await .unwrap(); - mutlithreaded_util(builder, 1000).await; + assert!(timeout_function(145, mutlithreaded_util(builder, 1000)).await); } } @@ -409,7 +407,7 @@ mod bybit_tests { let builder = BybitWsBuilder::default().add_channel( BybitWsChannel::new_trade(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdc")]).unwrap() ); - bybit_util(builder, 5).await; + assert!(timeout_function(5, bybit_util(builder, 5)).await); } #[tokio::test] @@ -419,7 +417,7 @@ mod bybit_tests { let builder = BybitWsBuilder::default().add_channel( BybitWsChannel::new_ticker(vec![RawTradingPair::new_raw("ETH_USDt", '_'), RawTradingPair::new_no_delim("BTC-USdc")]).unwrap() ); - bybit_util(builder, 5).await; + assert!(timeout_function(5, bybit_util(builder, 5)).await); } #[tokio::test] @@ -434,7 +432,7 @@ mod bybit_tests { .build_many_distributed() .unwrap(); - mutlistream_util(builder, 50).await; + assert!(timeout_function(5, mutlistream_util(builder, 50)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -447,7 +445,7 @@ mod bybit_tests { .await .unwrap(); - mutlistream_util(builder, 1000).await; + assert!(timeout_function(15, mutlistream_util(builder, 1000)).await); } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] @@ -459,6 +457,6 @@ mod bybit_tests { let builder = BybitWsBuilder::build_from_all_instruments(&channels) .await .unwrap(); - mutlithreaded_util(builder, 1000).await; + assert!(timeout_function(15, mutlithreaded_util(builder, 1000)).await); } } From 9ddf99a5cb00ade82f04e21929452ef19cb88741 Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Tue, 2 Jul 2024 11:29:00 -0400 Subject: [PATCH 07/15] export max conns per stream --- src/exchanges/kucoin/ws/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/exchanges/kucoin/ws/builder.rs b/src/exchanges/kucoin/ws/builder.rs index e05c6f1..47edff2 100644 --- a/src/exchanges/kucoin/ws/builder.rs +++ b/src/exchanges/kucoin/ws/builder.rs @@ -11,7 +11,7 @@ use crate::{ /// There is a limit of 300 connections per attempt every 5 minutes per IP. const MAX_KUCOIN_STREAMS: usize = 300; /// A single connection can listen to a maximum of 100 streams. -const MAX_KUCOIN_WS_CONNS_PER_STREAM: usize = 100; +pub const MAX_KUCOIN_WS_CONNS_PER_STREAM: usize = 100; #[derive(Debug, Clone, Default)] pub struct KucoinWsBuilder { From 4b2d658579805d037c7527194d1149b8259e13b2 Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Tue, 2 Jul 2024 10:07:43 -0400 Subject: [PATCH 08/15] fix a log message --- src/exchanges/bybit/ws/message.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/exchanges/bybit/ws/message.rs b/src/exchanges/bybit/ws/message.rs index fb1982f..6f725b8 100644 --- a/src/exchanges/bybit/ws/message.rs +++ b/src/exchanges/bybit/ws/message.rs @@ -35,7 +35,7 @@ impl BybitWsMessage { } } - Err(eyre::ErrReport::msg(format!("Could not deserialize kucoin ws message: {:?}", value))) + Err(eyre::ErrReport::msg(format!("Could not deserialize bybit ws message: {:?}", value))) } } From ebed82ff7d04746ebf95c603882c675656d57023 Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Tue, 2 Jul 2024 10:35:21 -0400 Subject: [PATCH 09/15] update comment --- src/exchanges/bybit/ws/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/exchanges/bybit/ws/builder.rs b/src/exchanges/bybit/ws/builder.rs index c513fcf..15bef26 100644 --- a/src/exchanges/bybit/ws/builder.rs +++ b/src/exchanges/bybit/ws/builder.rs @@ -8,7 +8,7 @@ use crate::{ normalized::ws::NormalizedWsChannels }; -/// There is a limit of 500 connections per 5 minutes per IP. +/// There is a limit of 300 connections per 5 minutes per IP. const MAX_BYBIT_STREAMS: usize = 300; /// A single connection can listen to a maximum of 10 streams. const MAX_BYBIT_WS_CONNS_PER_STREAM: usize = 10; From 3ebab568ff773467dcb3f1221f6af5a2858f73f6 Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Tue, 2 Jul 2024 11:31:00 -0400 Subject: [PATCH 10/15] export conns per stream --- src/exchanges/bybit/ws/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/exchanges/bybit/ws/builder.rs b/src/exchanges/bybit/ws/builder.rs index 15bef26..f85ac42 100644 --- a/src/exchanges/bybit/ws/builder.rs +++ b/src/exchanges/bybit/ws/builder.rs @@ -11,7 +11,7 @@ use crate::{ /// There is a limit of 300 connections per 5 minutes per IP. const MAX_BYBIT_STREAMS: usize = 300; /// A single connection can listen to a maximum of 10 streams. -const MAX_BYBIT_WS_CONNS_PER_STREAM: usize = 10; +pub const MAX_BYBIT_WS_CONNS_PER_STREAM: usize = 10; #[derive(Debug, Clone, Default)] pub struct BybitWsBuilder { From 98a842c3a4f0f9a5cbfd4cc791f620aec57fa859 Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Tue, 2 Jul 2024 11:34:18 -0400 Subject: [PATCH 11/15] export conns per stream --- src/exchanges/binance/ws/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/exchanges/binance/ws/builder.rs b/src/exchanges/binance/ws/builder.rs index adb9d07..150c0b1 100644 --- a/src/exchanges/binance/ws/builder.rs +++ b/src/exchanges/binance/ws/builder.rs @@ -13,7 +13,7 @@ use crate::{ const MAX_BINANCE_STREAMS: usize = 300; /// A single connection can listen to a maximum of 1024 streams. /// (https://binance-docs.github.io/apidocs/spot/en/#limits) -const MAX_BINANCE_WS_CONNS_PER_STREAM: usize = 1024; +pub const MAX_BINANCE_WS_CONNS_PER_STREAM: usize = 1024; #[derive(Debug, Clone, Default)] pub struct BinanceWsBuilder { From 2a3e3e12fec78af827ee9d66cd213795a1b94066 Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Tue, 2 Jul 2024 11:35:08 -0400 Subject: [PATCH 12/15] export conns per stream --- src/exchanges/okex/ws/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/exchanges/okex/ws/builder.rs b/src/exchanges/okex/ws/builder.rs index 26793e3..889dadf 100644 --- a/src/exchanges/okex/ws/builder.rs +++ b/src/exchanges/okex/ws/builder.rs @@ -13,7 +13,7 @@ use crate::{ const MAX_OKEX_STREAMS: usize = 300; /// A single connection can listen to a maximum of 100 streams. /// Setting lower for io purposes -const MAX_OKEX_WS_CONNS_PER_STREAM: usize = 1024; +pub const MAX_OKEX_WS_CONNS_PER_STREAM: usize = 100; #[derive(Debug, Clone)] pub struct OkexWsBuilder { From 10a0b839b69f0206310b20a238124af681da2e2d Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Wed, 3 Jul 2024 15:34:34 -0400 Subject: [PATCH 13/15] handle empty askPx for okex --- src/exchanges/okex/ws/channels/tickers.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/exchanges/okex/ws/channels/tickers.rs b/src/exchanges/okex/ws/channels/tickers.rs index 2ed6713..02be8f6 100644 --- a/src/exchanges/okex/ws/channels/tickers.rs +++ b/src/exchanges/okex/ws/channels/tickers.rs @@ -22,9 +22,9 @@ pub struct OkexTicker { #[serde_as(as = "DisplayFromStr")] #[serde(rename = "lastSz")] pub last_size: f64, - #[serde_as(as = "DisplayFromStr")] + #[serde_as(as = "Option")] #[serde(rename = "askPx")] - pub ask_price: f64, + pub ask_price: Option, #[serde_as(as = "DisplayFromStr")] #[serde(rename = "askSz")] pub ask_amt: f64, @@ -74,7 +74,7 @@ impl OkexTicker { pair: self.pair.normalize(), time: DateTime::from_timestamp_millis(self.timestamp as i64).unwrap(), ask_amount: self.ask_amt, - ask_price: self.ask_price, + ask_price: self.ask_price.unwrap_or_default(), bid_amount: self.bid_amt, bid_price: self.bid_price, quote_id: None @@ -90,7 +90,7 @@ impl PartialEq for OkexTicker { && other.bid_amount == self.bid_amt && other.bid_price == self.bid_price && other.ask_amount == self.ask_amt - && other.ask_price == self.ask_price + && other.ask_price == self.ask_price.unwrap_or_default() && other.quote_id.is_none(); if !equals { From 4975e7ee5cff0ee995425c54a1b688a4775efeba Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Fri, 5 Jul 2024 09:22:51 -0400 Subject: [PATCH 14/15] all tests pass --- src/exchanges/mod.rs | 12 ++++++++++-- tests/ws.rs | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/exchanges/mod.rs b/src/exchanges/mod.rs index f7700ba..21172ba 100644 --- a/src/exchanges/mod.rs +++ b/src/exchanges/mod.rs @@ -159,7 +159,11 @@ impl CexExchange { /// /// if calling without a filter: /// ``` - /// CexExchange::Okex.get_all_currencies::(None).await; + /// use cex_exchanges::{CexExchange, EmptyFilter}; + /// async { + /// CexExchange::Okex.get_all_currencies::(None).await; + /// }; + /// () /// ``` pub async fn get_all_currencies(self, filter: Option) -> Result, RestApiError> where @@ -212,7 +216,11 @@ impl CexExchange { /// /// if calling without a filter: /// ``` - /// CexExchange::Okex.get_all_instruments::(None).await; + /// use cex_exchanges::{CexExchange, EmptyFilter}; + /// async { + /// CexExchange::Okex.get_all_instruments::(None).await; + /// }; + /// () /// ``` pub async fn get_all_instruments(self, filter: Option) -> Result, RestApiError> where diff --git a/tests/ws.rs b/tests/ws.rs index 724a449..fda14b3 100644 --- a/tests/ws.rs +++ b/tests/ws.rs @@ -127,7 +127,7 @@ mod okex_tests { use super::*; async fn okex_util(builder: OkexWsBuilder, iterations: usize) { - assert!(timeout_function(5, stream_util(builder.build_single(), iterations)).await); + assert!(timeout_function(15, stream_util(builder.build_single(), iterations)).await); } #[tokio::test] From 2928e9bf1b2abdfcee7cd9d359a1cf53be8e7247 Mon Sep 17 00:00:00 2001 From: Tim Snyder Date: Fri, 5 Jul 2024 09:28:06 -0400 Subject: [PATCH 15/15] ready for review --- Cargo.toml | 3 +- src/exchanges/binance/mod.rs | 1 - .../binance/rest_api/endpoints/instruments.rs | 102 +++++++++--------- 3 files changed, 52 insertions(+), 54 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 186a353..b701522 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,5 +51,4 @@ non-us = [] us = [] [dev-dependencies] - -once_cell = "1.18.0" \ No newline at end of file +once_cell = "1.18.0" diff --git a/src/exchanges/binance/mod.rs b/src/exchanges/binance/mod.rs index 25d36eb..0fa800c 100644 --- a/src/exchanges/binance/mod.rs +++ b/src/exchanges/binance/mod.rs @@ -170,7 +170,6 @@ impl Exchange for Binance { .await .map(|v| BinanceRestApiResponse::Instruments(v)) }; - info!(target: "cex-exchanges::binance", "response: {:?}", api_response); if let Err(e) = api_response.as_ref() { error!(target: "cex-exchanges::binance", "error calling rest-api endpoint {:?} -- {:?}", api_channel, e); diff --git a/src/exchanges/binance/rest_api/endpoints/instruments.rs b/src/exchanges/binance/rest_api/endpoints/instruments.rs index c6aa01f..9f48896 100644 --- a/src/exchanges/binance/rest_api/endpoints/instruments.rs +++ b/src/exchanges/binance/rest_api/endpoints/instruments.rs @@ -166,55 +166,55 @@ impl PartialEq for BinanceInstrument { } } -// #[cfg(test)] -// mod tests { -// use crate::normalized::types::NormalizedTradingPair; - -// use super::*; - -// #[test] -// fn test_binance_instrument_normalize() { -// let bi = BinanceInstrument { -// symbol: BinanceTradingPair("ETHBTC".to_string()), -// status: "TRADING".to_string(), -// base_asset: "ETH".to_string(), -// base_asset_precision: 8, -// quote_asset: "BTC".to_string(), -// quote_precision: 8, -// quote_asset_precision: 8, -// order_types: vec!["LIMIT".to_string(), "LIMIT_MAKER".to_string(), "MARKET".to_string(), "STOP_LOSS_LIMIT".to_string(), "TAKE_PROFIT_LIMIT".to_string()], -// iceberg_allowed: true, -// oco_allowed: true, -// quote_order_qty_market_allowed: true, -// allow_trailing_stop: true, -// cancel_replace_allowed: true, -// is_spot_trading_allowed: true, -// is_margin_trading_allowed: true, -// permission_sets: vec![vec![BinanceTradingType::Spot, BinanceTradingType::Margin, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other]], -// permissions: vec![], -// default_self_trade_prevention_mode: "EXPIRE_MAKER".to_string(), -// allowed_self_trade_prevention_modes: vec!["EXPIRE_TAKER".to_string(), "EXPIRE_MAKER".to_string(), "EXPIRE_BOTH".to_string()], -// }; +#[cfg(test)] +mod tests { + use crate::normalized::types::NormalizedTradingPair; + + use super::*; + + #[test] + fn test_binance_instrument_normalize() { + let bi = BinanceInstrument { + symbol: BinanceTradingPair("ETHBTC".to_string()), + status: "TRADING".to_string(), + base_asset: "ETH".to_string(), + base_asset_precision: 8, + quote_asset: "BTC".to_string(), + quote_precision: 8, + quote_asset_precision: 8, + order_types: vec!["LIMIT".to_string(), "LIMIT_MAKER".to_string(), "MARKET".to_string(), "STOP_LOSS_LIMIT".to_string(), "TAKE_PROFIT_LIMIT".to_string()], + iceberg_allowed: true, + oco_allowed: true, + quote_order_qty_market_allowed: true, + allow_trailing_stop: true, + cancel_replace_allowed: true, + is_spot_trading_allowed: true, + is_margin_trading_allowed: true, + permission_sets: vec![vec![BinanceTradingType::Spot, BinanceTradingType::Margin, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other, BinanceTradingType::Other]], + permissions: vec![], + default_self_trade_prevention_mode: "EXPIRE_MAKER".to_string(), + allowed_self_trade_prevention_modes: vec!["EXPIRE_TAKER".to_string(), "EXPIRE_MAKER".to_string(), "EXPIRE_BOTH".to_string()], + }; -// let expected = vec![ -// NormalizedInstrument { -// exchange: CexExchange::Binance, -// trading_pair: NormalizedTradingPair::new_base_quote(CexExchange::Binance, "ETH", "BTC", None, None), -// trading_type: NormalizedTradingType::Spot, -// base_asset_symbol: "ETH".to_string(), -// quote_asset_symbol: "BTC".to_string(), -// active: true, -// futures_expiry: None -// }, NormalizedInstrument { -// exchange: CexExchange::Binance, -// trading_pair: NormalizedTradingPair::new_base_quote(CexExchange::Binance, "ETH", "BTC", None, None), -// trading_type: NormalizedTradingType::Margin, -// base_asset_symbol: "ETH".to_string(), -// quote_asset_symbol: "BTC".to_string(), -// active: true, -// futures_expiry: None -// } -// ]; -// assert_eq!(bi.normalize(), expected); -// } -// } \ No newline at end of file + let expected = vec![ + NormalizedInstrument { + exchange: CexExchange::Binance, + trading_pair: NormalizedTradingPair::new_base_quote(CexExchange::Binance, "ETH", "BTC", None, None), + trading_type: NormalizedTradingType::Spot, + base_asset_symbol: "ETH".to_string(), + quote_asset_symbol: "BTC".to_string(), + active: true, + futures_expiry: None + }, NormalizedInstrument { + exchange: CexExchange::Binance, + trading_pair: NormalizedTradingPair::new_base_quote(CexExchange::Binance, "ETH", "BTC", None, None), + trading_type: NormalizedTradingType::Margin, + base_asset_symbol: "ETH".to_string(), + quote_asset_symbol: "BTC".to_string(), + active: true, + futures_expiry: None + } + ]; + assert_eq!(bi.normalize(), expected); + } +} \ No newline at end of file