Skip to content

Commit e31cfea

Browse files
committed
Refine blockchain adapter
1 parent f720a4c commit e31cfea

File tree

4 files changed

+36
-43
lines changed

4 files changed

+36
-43
lines changed

crates/adapters/blockchain/src/data/client.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@ use nautilus_model::{
3030
defi::{DefiData, SharedChain, validation::validate_address},
3131
identifiers::{ClientId, Venue},
3232
};
33-
use tokio::sync::{
34-
mpsc::{UnboundedReceiver, UnboundedSender},
35-
oneshot,
36-
};
3733

3834
use crate::{
3935
config::BlockchainDataClientConfig,
@@ -63,13 +59,13 @@ pub struct BlockchainDataClient {
6359
/// Channel receiver for messages from the HyperSync client.
6460
hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
6561
/// Channel sender for commands to be processed asynchronously.
66-
command_tx: UnboundedSender<DefiDataCommand>,
62+
command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
6763
/// Channel receiver for commands to be processed asynchronously.
68-
command_rx: Option<UnboundedReceiver<DefiDataCommand>>,
64+
command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
6965
/// Background task for processing messages.
7066
process_task: Option<tokio::task::JoinHandle<()>>,
7167
/// Oneshot channel sender for graceful shutdown signal.
72-
shutdown_tx: Option<oneshot::Sender<()>>,
68+
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
7369
}
7470

7571
impl BlockchainDataClient {
@@ -122,7 +118,7 @@ impl BlockchainDataClient {
122118
return;
123119
};
124120

125-
let (shutdown_tx, shutdown_rx) = oneshot::channel();
121+
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
126122
self.shutdown_tx = Some(shutdown_tx);
127123

128124
let mut core_client = self.core_client.take().unwrap();
@@ -205,7 +201,6 @@ impl BlockchainDataClient {
205201
Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
206202
Err(e) => {
207203
tracing::error!("Error processing pool swap event: {e}");
208-
println!("FAIL");
209204
None
210205
}
211206
}
@@ -217,7 +212,6 @@ impl BlockchainDataClient {
217212
}
218213
}
219214
BlockchainMessage::BurnEvent(burn_event) => {
220-
println!("BURN EVENT-->>> {:?}", burn_event);
221215
match core_client.get_pool(&burn_event.pool_address) {
222216
Ok(pool) => {
223217
let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
@@ -240,7 +234,6 @@ impl BlockchainDataClient {
240234
}
241235
}
242236
BlockchainMessage::MintEvent(mint_event) => {
243-
println!("MINT EVENT-->>> {:?}", mint_event);
244237
match core_client.get_pool(&mint_event.pool_address) {
245238
Ok(pool) => {
246239
let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
@@ -266,8 +259,6 @@ impl BlockchainDataClient {
266259

267260
if let Some(event) = data_event {
268261
core_client.send_data(event);
269-
}else{
270-
println!("NO EVENT");
271262
}
272263
} else {
273264
tracing::debug!("HyperSync data channel closed");
@@ -496,7 +487,7 @@ impl BlockchainDataClient {
496487
pub async fn await_process_task_close(&mut self) {
497488
if let Some(handle) = self.process_task.take() {
498489
if let Err(e) = handle.await {
499-
tracing::error!("Process task join error: {}", e);
490+
tracing::error!("Process task join error: {e}");
500491
}
501492
}
502493
}

crates/adapters/blockchain/src/data/core.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use nautilus_model::defi::{
2323
Block, Blockchain, DefiData, DexType, Pool, PoolLiquidityUpdate, PoolSwap, SharedChain,
2424
SharedDex, SharedPool, Token, validation::validate_address,
2525
};
26-
use tokio::sync::mpsc::UnboundedSender;
2726

2827
use crate::{
2928
cache::BlockchainCache,
@@ -63,7 +62,7 @@ pub struct BlockchainDataClientCore {
6362
/// Interface for interacting with ERC20 token contracts.
6463
tokens: Erc20Contract,
6564
/// Channel sender for publishing data events to the `AsyncRunner`.
66-
data_sender: UnboundedSender<DataEvent>,
65+
data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
6766
/// Client for the HyperSync data indexing service.
6867
pub hypersync_client: HyperSyncClient,
6968
/// Optional WebSocket RPC client for direct blockchain node communication.
@@ -77,7 +76,7 @@ impl BlockchainDataClientCore {
7776
pub fn new(
7877
chain: SharedChain,
7978
config: BlockchainDataClientConfig,
80-
hypersync_tx: UnboundedSender<BlockchainMessage>,
79+
hypersync_tx: tokio::sync::mpsc::UnboundedSender<BlockchainMessage>,
8180
) -> Self {
8281
let cache = BlockchainCache::new(chain.clone());
8382
let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {

crates/adapters/blockchain/src/data/subscription.rs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
// limitations under the License.
1414
// -------------------------------------------------------------------------------------------------
1515

16-
use std::collections::HashMap;
17-
18-
use ahash::{HashSet, HashSetExt};
16+
use ahash::{AHashMap, AHashSet};
1917
use alloy::primitives::{Address, keccak256};
2018
use nautilus_model::defi::DexType;
2119

@@ -25,30 +23,30 @@ use nautilus_model::defi::DexType;
2523
/// and maintains the event signature encodings for efficient filtering.
2624
#[derive(Debug)]
2725
pub struct DefiDataSubscriptionManager {
28-
subscribed_pool_swaps: HashMap<DexType, HashSet<Address>>,
29-
pool_swap_event_encoded: HashMap<DexType, String>,
30-
subscribed_pool_mints: HashMap<DexType, HashSet<Address>>,
31-
pool_mint_event_encoded: HashMap<DexType, String>,
32-
subscribed_pool_burns: HashMap<DexType, HashSet<Address>>,
33-
pool_burn_event_encoded: HashMap<DexType, String>,
26+
subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
27+
pool_swap_event_encoded: AHashMap<DexType, String>,
28+
subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
29+
pool_mint_event_encoded: AHashMap<DexType, String>,
30+
subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
31+
pool_burn_event_encoded: AHashMap<DexType, String>,
3432
}
3533

3634
impl DefiDataSubscriptionManager {
3735
/// Creates a new [`DefiDataSubscriptionManager`] instance.
3836
pub fn new() -> Self {
3937
Self {
40-
subscribed_pool_burns: HashMap::new(),
41-
subscribed_pool_mints: HashMap::new(),
42-
subscribed_pool_swaps: HashMap::new(),
43-
pool_swap_event_encoded: HashMap::new(),
44-
pool_burn_event_encoded: HashMap::new(),
45-
pool_mint_event_encoded: HashMap::new(),
38+
subscribed_pool_burns: AHashMap::new(),
39+
subscribed_pool_mints: AHashMap::new(),
40+
subscribed_pool_swaps: AHashMap::new(),
41+
pool_swap_event_encoded: AHashMap::new(),
42+
pool_burn_event_encoded: AHashMap::new(),
43+
pool_mint_event_encoded: AHashMap::new(),
4644
}
4745
}
4846

4947
/// Gets all unique contract addresses subscribed for any event type for a given DEX.
5048
pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
51-
let mut unique_addresses = HashSet::new();
49+
let mut unique_addresses = AHashSet::new();
5250

5351
if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
5452
unique_addresses.extend(addresses.iter().cloned());
@@ -105,39 +103,39 @@ impl DefiDataSubscriptionManager {
105103
mint_event_signature: &str,
106104
burn_event_signature: &str,
107105
) {
108-
self.subscribed_pool_swaps.insert(dex, HashSet::new());
106+
self.subscribed_pool_swaps.insert(dex, AHashSet::new());
109107
let swap_event_hash = keccak256(swap_event_signature.as_bytes());
110108
let encoded_swap_event = format!(
111109
"0x{encoded_hash}",
112110
encoded_hash = hex::encode(swap_event_hash)
113111
);
114112
self.pool_swap_event_encoded.insert(dex, encoded_swap_event);
115113

116-
self.subscribed_pool_mints.insert(dex, HashSet::new());
114+
self.subscribed_pool_mints.insert(dex, AHashSet::new());
117115
let mint_event_hash = keccak256(mint_event_signature.as_bytes());
118116
let encoded_mint_event = format!(
119117
"0x{encoded_hash}",
120118
encoded_hash = hex::encode(mint_event_hash)
121119
);
122120
self.pool_mint_event_encoded.insert(dex, encoded_mint_event);
123121

124-
self.subscribed_pool_burns.insert(dex, HashSet::new());
122+
self.subscribed_pool_burns.insert(dex, AHashSet::new());
125123
let burn_event_hash = keccak256(burn_event_signature.as_bytes());
126124
let encoded_burn_event = format!(
127125
"0x{encoded_hash}",
128126
encoded_hash = hex::encode(burn_event_hash)
129127
);
130128
self.pool_burn_event_encoded.insert(dex, encoded_burn_event);
131129

132-
tracing::info!("Registered dex for subscriptions: {:?}", dex);
130+
tracing::info!("Registered DEX for subscriptions: {dex:?}");
133131
}
134132

135133
/// Subscribes to swap events for a specific pool address on a DEX.
136134
pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
137135
if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
138136
pool_set.insert(address);
139137
} else {
140-
tracing::error!("Dex not registered for swap subscriptions: {:?}", dex);
138+
tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
141139
}
142140
}
143141

@@ -146,7 +144,7 @@ impl DefiDataSubscriptionManager {
146144
if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
147145
pool_set.insert(address);
148146
} else {
149-
tracing::error!("Dex not registered for mint subscriptions: {:?}", dex);
147+
tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
150148
}
151149
}
152150

@@ -155,7 +153,7 @@ impl DefiDataSubscriptionManager {
155153
if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
156154
pool_set.insert(address);
157155
} else {
158-
tracing::warn!("Dex not registered for burn subscriptions: {:?}", dex);
156+
tracing::warn!("DEX not registered for burn subscriptions: {dex:?}");
159157
}
160158
}
161159

@@ -164,7 +162,7 @@ impl DefiDataSubscriptionManager {
164162
if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
165163
pool_set.remove(&address);
166164
} else {
167-
tracing::error!("Dex not registered for swap subscriptions: {:?}", dex);
165+
tracing::error!("DEX not registered for swap subscriptions: {dex:?}");
168166
}
169167
}
170168

@@ -173,7 +171,7 @@ impl DefiDataSubscriptionManager {
173171
if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
174172
pool_set.remove(&address);
175173
} else {
176-
tracing::error!("Dex not registered for mint subscriptions: {:?}", dex);
174+
tracing::error!("DEX not registered for mint subscriptions: {dex:?}");
177175
}
178176
}
179177

@@ -182,11 +180,15 @@ impl DefiDataSubscriptionManager {
182180
if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
183181
pool_set.remove(&address);
184182
} else {
185-
tracing::error!("Dex not registered for burn subscriptions: {:?}", dex);
183+
tracing::error!("DEX not registered for burn subscriptions: {dex:?}");
186184
}
187185
}
188186
}
189187

188+
////////////////////////////////////////////////////////////////////////////////
189+
// Tests
190+
////////////////////////////////////////////////////////////////////////////////
191+
190192
#[cfg(test)]
191193
mod tests {
192194
use alloy::primitives::address;

docs/developer_guide/rust.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,7 @@ Use consistent test module structure with section separators:
387387
////////////////////////////////////////////////////////////////////////////////
388388
// Tests
389389
////////////////////////////////////////////////////////////////////////////////
390+
390391
#[cfg(test)]
391392
mod tests {
392393
use rstest::rstest;

0 commit comments

Comments
 (0)