Skip to content

Feature implementation from commits 76b328b..8f3ca5e #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: feature-base-branch-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7a320de
Refine DefiDataCommand equality
cjdsellers Jun 21, 2025
4913977
Implement live pool swaps for blockchain adapter
cjdsellers Jun 21, 2025
06583e0
Fix value type display and debug for wei precision
cjdsellers Jun 21, 2025
e4f52ee
Implement pool swap streams for blockchain adapter
cjdsellers Jun 21, 2025
8cace05
Improve pool swap transform and tests
cjdsellers Jun 21, 2025
d473f5a
Fix max float precision import for display and debug
cjdsellers Jun 21, 2025
9457026
Fix circular import in DataEngine
cjdsellers Jun 21, 2025
da6cae8
Update IB adapter documentation (#2729)
faysou Jun 21, 2025
acc46d7
Improve implementation, validations and testing for Rust instruments …
nicolad Jun 21, 2025
eadb901
Gracefully handle negative balances (#2730)
ms32035 Jun 22, 2025
6b642e0
Fix catalog identifier matching to exact match (#2732)
faysou Jun 22, 2025
bf53b70
Update dependencies and release notes
cjdsellers Jun 22, 2025
b04b9a5
Fix pre-commit cargo doc lint
cjdsellers Jun 22, 2025
e561cb9
Refine new balance calculation
cjdsellers Jun 22, 2025
598cdd8
Improve implementation, validations and testing for Rust instruments …
nicolad Jun 23, 2025
1674b54
Refine type casting to avoid lint error
cjdsellers Jun 23, 2025
37c68fc
Update dependencies and release notes
cjdsellers Jun 23, 2025
d44d39c
Remove redundant comment
cjdsellers Jun 23, 2025
196092f
Refine instrument validations
cjdsellers Jun 23, 2025
2caaa26
Fix instrument notional validations
cjdsellers Jun 24, 2025
6c560f8
Update Tardis exchange mappings
cjdsellers Jun 25, 2025
24c2efe
Fix clippy lints
cjdsellers Jun 25, 2025
7e2f267
Update Tardis exchange mappings
cjdsellers Jun 25, 2025
9847e75
Add check_positive_decimal correctness function (#2736)
nicolad Jun 25, 2025
24eaca0
Update dependencies and release notes
cjdsellers Jun 25, 2025
339f9e0
Add check_positive_money correctness function (#2738)
nicolad Jun 26, 2025
e39765f
Refine value type correctness functions
cjdsellers Jun 26, 2025
0a467d3
Update pre-commit
cjdsellers Jun 26, 2025
17c89d6
Update Tardis docs
cjdsellers Jun 26, 2025
8f3ca5e
Update crate docs
cjdsellers Jun 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ repos:
- id: cargo-doc
name: cargo doc
description: Check documentation builds without errors or warnings.
entry: bash -c 'RUSTDOCFLAGS="--cfg docsrs -D warnings" cargo doc --all-features --no-deps --workspace --quiet'
entry: bash -c 'RUSTDOCFLAGS="--cfg docsrs -D warnings" cargo doc --features "high-precision,ffi,python,extension-module" --no-deps --workspace --quiet'
language: system
files: '\.(rs|toml)$'
types: [file]
Expand Down Expand Up @@ -163,7 +163,7 @@ repos:
]

- repo: https://github.com/astral-sh/uv-pre-commit
rev: 0.7.13 # uv version
rev: 0.7.15 # uv version
hooks:
- id: uv-lock

Expand Down
23 changes: 13 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ clap = { version = "4.5.39", features = ["derive", "env"] }
compare = "0.1.0"
csv = "1.3.1"
dashmap = "6.1.0"
databento = { version = "0.27.0", default-features = false, features = ["historical", "live"] }
databento = { version = "0.27.1", default-features = false, features = ["historical", "live"] }
datafusion = { version = "48.0.0", default-features = false, features = [
"parquet",
"regex_expressions",
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ cargo-test-crate-%: RUST_BACKTRACE=1
cargo-test-crate-%: HIGH_PRECISION=true
cargo-test-crate-%: check-nextest
cargo-test-crate-%:
cargo nextest run --lib --no-default-features --features "ffi,python,high-precision,defi,stubs" --no-fail-fast --cargo-profile nextest -p $*
cargo nextest run --lib --no-default-features --all-features --no-fail-fast --cargo-profile nextest -p $*

.PHONY: cargo-bench
cargo-bench:
Expand Down
10 changes: 8 additions & 2 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ Released on TBD (UTC).
- Added property-based testing for `TestTimer` in Rust
- Added property-based testing for `network` crate in Rust
- Added chaos testing with `turmoil` for socket clients in Rust
- Added `check_positive_decimal` correctness function and use for instrument validations (#2736), thanks @nicolad
- Added `check_positive_money` correctness function and use for instrument validations (#2738), thanks @nicolad
- Ported data catalog refactor to Rust (#2681, #2720), thanks @faysou
- Consolidated the clocks and timers v2 feature from @twitu
- Consolidated on pure Rust cryptography crates with no dependencies on native certs or openssl
- Consolidated on `aws-lc-rs` cryptography for FIPS compliance
- Confirmed parity between Cython and Rust indicators (#2700, #2710, #2713), thanks @nicolad
- Implemented `From<Pool>` -> `CurrencyPair` & `InstrumentAny` (#2693), thanks @nicolad
- Updated Tardis exchange mappings
- Improved handling of negative balances in backtests (#2730), thanks @ms32035
- Improved implementation, validations and testing for Rust instruments (#2723, #2733), thanks @nicolad
- Improved `Currency` equality to use `strcmp` to avoid C pointer comparison issues with `ustr` string interning
- Improved unsubscribe cleanup(s) for Bybit adapter
- Refactored IB adapter (#2647), thanks @faysou
Expand All @@ -44,7 +49,7 @@ Released on TBD (UTC).
- Refined signal serialization and tests (#2705), thanks @faysou
- Refined CI/CD and build system (#2707), thanks @stastnypremysl
- Upgraded Cython to v3.1.2
- Upgraded `databento` crate to v0.27.0
- Upgraded `databento` crate to v0.27.1
- Upgraded `datafusion` crate to v48.0.0
- Upgraded `pyo3` and `pyo3-async-runtimes` crates to v0.25.1
- Upgraded `redis` crate to v0.32.2
Expand All @@ -69,6 +74,7 @@ Released on TBD (UTC).
- Fixed registration of encoder and decoder for `BinanceBar`, thanks for reporting @miller-moore
- Fixed spot and futures sandbox for Binance (#2687), thanks @petioptrv
- Fixed `clean` and `distclean` make targets entering `.venv` and corrupting the Python virtual env, thanks @faysou
- Fixed catalog identifier matching to exact match (#2732), thanks @faysou
- Fixed last value updating for RSI indicator (#2703), thanks @bartlaw
- Fixed gateway/TWS reconnect process for IBKR (#2710), thanks @bartlaw
- Fixed Interactive Brokers options chain issue (#2711), thanks @FGU1
Expand All @@ -79,7 +85,7 @@ Released on TBD (UTC).
- Restore task error logs for IBKR (#2716), thanks @bartlaw

### Documentation Updates
None
- Updated IB adapter documentation (#2729), thanks @faysou

### Deprecations
- Deprecated `Portfolio.set_specific_venue(...)`, to be removed in a future release; use `Cache.set_specific_venue(...)` instead
Expand Down
1 change: 1 addition & 0 deletions crates/adapters/blockchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ nautilus-model = { workspace = true, features = ["defi"] }
nautilus-network = { workspace = true }
nautilus-system = { workspace = true }

ahash = { workspace = true }
alloy = { workspace = true }
anyhow = { workspace = true }
async-stream = { workspace = true }
Expand Down
13 changes: 9 additions & 4 deletions crates/adapters/blockchain/bin/node_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

use std::{
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
time::Duration,
};
Expand All @@ -25,7 +26,8 @@ use nautilus_blockchain::{
};
use nautilus_common::{
actor::{DataActor, DataActorCore, data_actor::DataActorConfig},
enums::Environment,
enums::{Environment, LogColor},
logging::log_info,
};
use nautilus_core::env::get_env_var;
use nautilus_live::node::LiveNode;
Expand Down Expand Up @@ -77,8 +79,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create and register a blockchain subscriber actor
let client_id = ClientId::new(format!("BLOCKCHAIN-{}", chain.name));
let pools = vec![
// Address::from("0xC31E54c7A869B9fCbECC14363CF510d1C41Fa443"), // WETH/USDC Arbitrum One
// Address::from(0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640), // USDC/ETH 0.05% on Uniswap V3
Address::from_str("0xC31E54c7A869B9fCbECC14363CF510d1C41Fa443")?, // WETH/USDC Arbitrum One
];

let actor_config = BlockchainSubscriberActorConfig::new(client_id, chain.name, pools);
Expand Down Expand Up @@ -120,7 +121,7 @@ impl BlockchainSubscriberActorConfig {
/// A basic blockchain subscriber actor that monitors DeFi activities.
///
/// This actor demonstrates how to use the `DataActor` trait to monitor blockchain data
/// from DEXs, pools, and other DeFi protocols. It logs received swaps and liquidity updates
/// from DEXs, pools, and other DeFi protocols. It logs received blocks and swaps
/// to demonstrate the data flow.
#[derive(Debug)]
pub struct BlockchainSubscriberActor {
Expand Down Expand Up @@ -198,11 +199,15 @@ impl DataActor for BlockchainSubscriberActor {
}

fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
log_info!("Received {block}", color = LogColor::Cyan);

self.received_blocks.push(block.clone());
Ok(())
}

fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
log_info!("Received {swap}", color = LogColor::Cyan);

self.received_pool_swaps.push(swap.clone());
Ok(())
}
Expand Down
109 changes: 56 additions & 53 deletions crates/adapters/blockchain/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use nautilus_common::{
},
runner::get_data_event_sender,
};
use nautilus_core::UnixNanos;
use nautilus_data::client::DataClient;
use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
use nautilus_model::{
Expand Down Expand Up @@ -173,20 +174,18 @@ impl BlockchainDataClient {
/// Spawns a unified task that handles both commands and data from the same client instances.
/// This replaces both the command processor and hypersync forwarder with a single unified handler.
fn spawn_process_task(&mut self) {
let command_rx = match self.command_rx.take() {
Some(r) => r,
None => {
tracing::error!("Command receiver already taken, not spawning handler");
return;
}
let command_rx = if let Some(r) = self.command_rx.take() {
r
} else {
tracing::error!("Command receiver already taken, not spawning handler");
return;
};

let hypersync_rx = match self.hypersync_rx.take() {
Some(r) => r,
None => {
tracing::error!("HyperSync receiver already taken, not spawning handler");
return;
}
let hypersync_rx = if let Some(r) = self.hypersync_rx.take() {
r
} else {
tracing::error!("HyperSync receiver already taken, not spawning handler");
return;
};

let mut hypersync_client = std::mem::replace(
Expand All @@ -205,43 +204,37 @@ impl BlockchainDataClient {
loop {
tokio::select! {
command = command_rx.recv() => {
match command {
Some(cmd) => {
if let Err(e) = Self::process_command(
cmd,
&mut hypersync_client,
rpc_client.as_mut()
).await {
tracing::error!("Error processing command: {e}");
}
}
None => {
tracing::debug!("Command channel closed");
break;
if let Some(cmd) = command {
if let Err(e) = Self::process_command(
cmd,
&mut hypersync_client,
rpc_client.as_mut()
).await {
tracing::error!("Error processing command: {e}");
}
} else {
tracing::debug!("Command channel closed");
break;
}
}
data = hypersync_rx.recv() => {
match data {
Some(msg) => {
let data_event = match msg {
BlockchainMessage::Block(block) => {
DataEvent::DeFi(DefiData::Block(block))
}
BlockchainMessage::Swap(swap) => {
DataEvent::DeFi(DefiData::PoolSwap(swap))
}
};

if let Err(e) = data_sender.send(data_event) {
tracing::error!("Failed to send data event: {e}");
break;
if let Some(msg) = data {
let data_event = match msg {
BlockchainMessage::Block(block) => {
DataEvent::DeFi(DefiData::Block(block))
}
}
None => {
tracing::debug!("HyperSync data channel closed");
BlockchainMessage::Swap(swap) => {
DataEvent::DeFi(DefiData::PoolSwap(swap))
}
};

if let Err(e) = data_sender.send(data_event) {
tracing::error!("Failed to send data event: {e}");
break;
}
} else {
tracing::debug!("HyperSync data channel closed");
break;
}
}
}
Expand Down Expand Up @@ -302,10 +295,21 @@ impl BlockchainDataClient {
tracing::warn!("Pool subscriptions are handled at application level");
Ok(())
}
DefiSubscribeCommand::PoolSwaps(_cmd) => {
tracing::info!("Processing subscribe pool swaps command");
tracing::warn!("Pool swaps subscription not yet implemented");
// TODO: Implement actual pool swaps subscription logic
DefiSubscribeCommand::PoolSwaps(cmd) => {
tracing::info!(
"Processing subscribe pool swaps command for address: {}",
cmd.address
);

if let Some(ref mut _rpc) = rpc_client {
tracing::warn!(
"RPC pool swaps subscription not yet implemented, using HyperSync"
);
}

hypersync_client.subscribe_pool_swaps(cmd.address);
tracing::info!("Subscribed to pool swaps for address: {}", cmd.address);

Ok(())
}
DefiSubscribeCommand::PoolLiquidityUpdates(_cmd) => {
Expand Down Expand Up @@ -686,7 +690,7 @@ impl BlockchainDataClient {
self.cache.get_token(&event.token1).cloned().unwrap(),
event.fee,
event.tick_spacing,
nautilus_core::UnixNanos::default(), // Use default timestamp for now
UnixNanos::default(), // TODO: Use default timestamp for now
);
self.cache.add_pool(pool.clone()).await?;

Expand All @@ -705,12 +709,11 @@ impl BlockchainDataClient {
pub async fn process_hypersync_messages(&mut self) {
tracing::info!("Starting task 'process_hypersync_messages'");

let mut rx = match self.hypersync_rx.take() {
Some(r) => r,
None => {
tracing::warn!("HyperSync receiver already taken, not spawning forwarder");
return;
}
let mut rx = if let Some(r) = self.hypersync_rx.take() {
r
} else {
tracing::warn!("HyperSync receiver already taken, not spawning forwarder");
return;
};

while let Some(msg) = rx.recv().await {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,9 @@ mod tests {
fn mint_event_log() -> Log {
serde_json::from_str(r#"{
"removed": null,
"log_index": null,
"transaction_index": null,
"transaction_hash": null,
"log_index": "0xa",
"transaction_index": "0x5",
"transaction_hash": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
"block_hash": null,
"block_number": "0x1581756",
"address": null,
Expand Down
Loading