Skip to content

Add bar_build_delay to data engine config #2676

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions crates/data/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,11 +860,10 @@ where
timer_name: String,
interval_ns: UnixNanos,
next_close_ns: UnixNanos,
composite_bar_build_delay: i64,
add_delay: bool,
bar_build_delay: u64,
batch_open_ns: UnixNanos,
batch_next_close_ns: UnixNanos,
time_bars_origin: Option<TimeDelta>,
time_bars_origin_offset: Option<TimeDelta>,
skip_first_non_full_bar: bool,
}

Expand All @@ -877,7 +876,7 @@ impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
.field("is_left_open", &self.is_left_open)
.field("timer_name", &self.timer_name)
.field("interval_ns", &self.interval_ns)
.field("composite_bar_build_delay", &self.composite_bar_build_delay)
.field("bar_build_delay", &self.bar_build_delay)
.field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
.finish()
}
Expand Down Expand Up @@ -926,18 +925,15 @@ where
build_with_no_updates: bool,
timestamp_on_close: bool,
interval_type: BarIntervalType,
time_bars_origin: Option<TimeDelta>,
composite_bar_build_delay: i64,
time_bars_origin_offset: Option<TimeDelta>,
bar_build_delay: u64,
skip_first_non_full_bar: bool,
) -> Self {
let is_left_open = match interval_type {
BarIntervalType::LeftOpen => true,
BarIntervalType::RightOpen => false,
};

let add_delay = bar_type.is_composite()
&& bar_type.composite().aggregation_source() == AggregationSource::Internal;

let core = BarAggregatorCore::new(
bar_type.standard(),
price_precision,
Expand All @@ -958,11 +954,10 @@ where
timer_name: bar_type.to_string(),
interval_ns: get_bar_interval_ns(&bar_type),
next_close_ns: UnixNanos::default(),
composite_bar_build_delay,
add_delay,
bar_build_delay,
batch_open_ns: UnixNanos::default(),
batch_next_close_ns: UnixNanos::default(),
time_bars_origin,
time_bars_origin_offset,
skip_first_non_full_bar,
}
}
Expand All @@ -978,15 +973,14 @@ where
/// Panics if the underlying clock timer registration fails.
pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
let now = self.clock.borrow().utc_now();
let mut start_time = get_time_bar_start(now, &self.bar_type(), self.time_bars_origin);
let mut start_time =
get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);

if start_time == now {
self.skip_first_non_full_bar = false;
}

if self.add_delay {
start_time += TimeDelta::microseconds(self.composite_bar_build_delay);
}
start_time += TimeDelta::microseconds(self.bar_build_delay as i64);

let spec = &self.bar_type().spec();
let start_time_ns = UnixNanos::from(start_time);
Expand Down Expand Up @@ -1032,7 +1026,7 @@ where
self.core.batch_mode = true;

let time = time_ns.to_datetime_utc();
let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin);
let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
self.batch_open_ns = UnixNanos::from(start_time);

if spec.aggregation == BarAggregation::Month {
Expand Down Expand Up @@ -1946,8 +1940,8 @@ mod tests {
true, // build_with_no_updates
false, // timestamp_on_close
BarIntervalType::LeftOpen,
None, // time_bars_origin
15, // composite_bar_build_delay
None, // time_bars_origin_offset
15, // bar_build_delay
false, // skip_first_non_full_bar
);

Expand Down
9 changes: 7 additions & 2 deletions examples/backtest/databento_test_request_bars.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.17.0
# jupytext_version: 1.16.6
# kernelspec:
# display_name: Python 3 (ipykernel)
# language: python
Expand Down Expand Up @@ -268,9 +268,12 @@ def on_stop(self):
]

data_engine = DataEngineConfig(
time_bars_origins={
time_bars_origin_offset={
BarAggregation.MINUTE: pd.Timedelta(seconds=0),
},
bar_build_delay=20,
# Delay in microseconds; when left at 0, composite bars aggregating internal bars default to 15.
# Also useful in a live context to account for network delay.
)

engine_config = BacktestEngineConfig(
Expand Down Expand Up @@ -332,3 +335,5 @@ def on_stop(self):

# %%
results = node.run()

# %%
5 changes: 2 additions & 3 deletions nautilus_trader/data/aggregation.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,16 @@ cdef class TimeBarAggregator(BarAggregator):
cdef bint _build_on_next_tick
cdef uint64_t _stored_open_ns
cdef uint64_t _stored_close_ns
cdef tuple _cached_update
cdef str _timer_name
cdef bint _is_left_open
cdef bint _timestamp_on_close
cdef bint _skip_first_non_full_bar
cdef bint _build_with_no_updates
cdef int _composite_bar_build_delay
cdef int _bar_build_delay
cdef bint _add_delay
cdef uint64_t _batch_open_ns
cdef uint64_t _batch_next_close_ns
cdef object _time_bars_origin
cdef object _time_bars_origin_offset

cdef readonly timedelta interval
"""The aggregators time interval.\n\n:returns: `timedelta`"""
Expand Down
81 changes: 42 additions & 39 deletions nautilus_trader/data/aggregation.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -693,10 +693,12 @@ cdef class TimeBarAggregator(BarAggregator):
If the aggregator will skip emitting a bar if the aggregation starts mid-interval.
build_with_no_updates : bool, default True
If the aggregator will build and emit bars with no new market updates.
time_bars_origin : pd.Timedelta or pd.DateOffset, optional
time_bars_origin_offset : pd.Timedelta or pd.DateOffset, optional
The origin time offset.
composite_bar_build_delay : int, default 15
bar_build_delay : int, default 0
The time delay (microseconds) added before building and emitting a bar.
A delay of 15 microseconds can be useful in a backtest context when internal bars are
aggregated from other internal bars several times, so that all messages are processed before a timer triggers.

Raises
------
Expand All @@ -714,34 +716,15 @@ cdef class TimeBarAggregator(BarAggregator):
bint timestamp_on_close = True,
bint skip_first_non_full_bar = False,
bint build_with_no_updates = True,
object time_bars_origin: pd.Timedelta | pd.DateOffset = None,
int composite_bar_build_delay = 15, # in microsecond
object time_bars_origin_offset: pd.Timedelta | pd.DateOffset = None,
int bar_build_delay = 0,
) -> None:
super().__init__(
instrument=instrument,
bar_type=bar_type.standard(),
handler=handler,
)

self._clock = clock
self.interval = self._get_interval()
self.interval_ns = self._get_interval_ns()
self._timer_name = None
self._set_build_timer()
self.next_close_ns = self._clock.next_time_ns(self._timer_name)
self._build_on_next_tick = False
cdef datetime now = self._clock.utc_now()
self._stored_open_ns = dt_to_unix_nanos(self.get_start_time(now))
self._stored_close_ns = 0
self._cached_update = None
self._build_with_no_updates = build_with_no_updates
self._timestamp_on_close = timestamp_on_close
self._composite_bar_build_delay = composite_bar_build_delay
self._add_delay = bar_type.is_composite() and bar_type.composite().is_internally_aggregated()
self._batch_open_ns = 0
self._batch_next_close_ns = 0
self._time_bars_origin = time_bars_origin
self._skip_first_non_full_bar = skip_first_non_full_bar

if interval_type == "left-open":
self._is_left_open = True
Expand All @@ -752,6 +735,26 @@ cdef class TimeBarAggregator(BarAggregator):
f"Invalid interval_type: {interval_type}. Must be 'left-open' or 'right-open'.",
)

self._timestamp_on_close = timestamp_on_close
self._skip_first_non_full_bar = skip_first_non_full_bar
self._build_with_no_updates = build_with_no_updates
self._time_bars_origin_offset = time_bars_origin_offset
self._bar_build_delay = bar_build_delay

self._timer_name = None
self._build_on_next_tick = False
self._batch_open_ns = 0
self._batch_next_close_ns = 0

self.interval = self._get_interval()
self.interval_ns = self._get_interval_ns()
self._set_build_timer()
self.next_close_ns = self._clock.next_time_ns(self._timer_name)

cdef datetime now = self._clock.utc_now()
self._stored_open_ns = dt_to_unix_nanos(self.get_start_time(now))
self._stored_close_ns = 0

# Human-readable summary: the concrete class name followed by the aggregator's
# `interval_ns` and `next_close_ns` values.
def __str__(self):
return f"{type(self).__name__}(interval_ns={self.interval_ns}, next_close_ns={self.next_close_ns})"

Expand All @@ -771,8 +774,8 @@ cdef class TimeBarAggregator(BarAggregator):
if aggregation == BarAggregation.MILLISECOND:
start_time = now.floor(freq="s")

if self._time_bars_origin is not None:
start_time += self._time_bars_origin
if self._time_bars_origin_offset is not None:
start_time += self._time_bars_origin_offset

if now < start_time:
start_time -= pd.Timedelta(seconds=1)
Expand All @@ -784,8 +787,8 @@ cdef class TimeBarAggregator(BarAggregator):
elif aggregation == BarAggregation.SECOND:
start_time = now.floor(freq="min")

if self._time_bars_origin is not None:
start_time += self._time_bars_origin
if self._time_bars_origin_offset is not None:
start_time += self._time_bars_origin_offset

if now < start_time:
start_time -= pd.Timedelta(minutes=1)
Expand All @@ -797,8 +800,8 @@ cdef class TimeBarAggregator(BarAggregator):
elif aggregation == BarAggregation.MINUTE:
start_time = now.floor(freq="h")

if self._time_bars_origin is not None:
start_time += self._time_bars_origin
if self._time_bars_origin_offset is not None:
start_time += self._time_bars_origin_offset

if now < start_time:
start_time -= pd.Timedelta(hours=1)
Expand All @@ -810,8 +813,8 @@ cdef class TimeBarAggregator(BarAggregator):
elif aggregation == BarAggregation.HOUR:
start_time = now.floor(freq="d")

if self._time_bars_origin is not None:
start_time += self._time_bars_origin
if self._time_bars_origin_offset is not None:
start_time += self._time_bars_origin_offset

if now < start_time:
start_time -= pd.Timedelta(days=1)
Expand All @@ -823,24 +826,24 @@ cdef class TimeBarAggregator(BarAggregator):
elif aggregation == BarAggregation.DAY:
start_time = now.floor(freq="d")

if self._time_bars_origin is not None:
start_time += self._time_bars_origin
if self._time_bars_origin_offset is not None:
start_time += self._time_bars_origin_offset

if now < start_time:
start_time -= pd.Timedelta(days=1)
elif aggregation == BarAggregation.WEEK:
start_time = (now - pd.Timedelta(days=now.dayofweek)).floor(freq="d")

if self._time_bars_origin is not None:
start_time += self._time_bars_origin
if self._time_bars_origin_offset is not None:
start_time += self._time_bars_origin_offset

if now < start_time:
start_time -= pd.Timedelta(weeks=1)
elif aggregation == BarAggregation.MONTH:
start_time = (now - pd.DateOffset(months=now.month - 1, days=now.day - 1)).floor(freq="d")

if self._time_bars_origin is not None:
start_time += self._time_bars_origin
if self._time_bars_origin_offset is not None:
start_time += self._time_bars_origin_offset

if now < start_time:
start_time -= pd.DateOffset(years=1)
Expand Down Expand Up @@ -916,8 +919,8 @@ cdef class TimeBarAggregator(BarAggregator):
if start_time == now:
self._skip_first_non_full_bar = False

if self._add_delay:
start_time += timedelta(microseconds=self._composite_bar_build_delay)
start_time += timedelta(microseconds=self._bar_build_delay)
self._log.debug(f"Timer {start_time=}")

if self.bar_type.spec.aggregation != BarAggregation.MONTH:
self._clock.set_timer(
Expand Down
9 changes: 7 additions & 2 deletions nautilus_trader/data/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ class DataEngineConfig(NautilusConfig, frozen=True):
If time bar aggregators will skip emitting a bar if the aggregation starts mid-interval.
time_bars_build_with_no_updates : bool, default True
If time bar aggregators will build and emit bars with no new market updates.
time_bars_origins : dict[BarAggregation, pd.Timedelta | pd.DateOffset], optional
time_bars_origin_offset : dict[BarAggregation, pd.Timedelta | pd.DateOffset], optional
A dictionary mapping time bar aggregations to their origin time offsets.
bar_build_delay : int, default 0
The time delay (microseconds) before building and emitting a time bar.
When left at 0, composite bar types aggregating internal bars fall back to a delay of 15 microseconds,
which is useful in a backtest context when internal bars are aggregated from other internal bars several times, so that all messages are processed before a timer triggers.
validate_data_sequence : bool, default False
If data objects timestamp sequencing will be validated and handled.
buffer_deltas : bool, default False
Expand All @@ -54,7 +58,8 @@ class DataEngineConfig(NautilusConfig, frozen=True):
time_bars_timestamp_on_close: bool = True
time_bars_skip_first_non_full_bar: bool = False
time_bars_build_with_no_updates: bool = True
time_bars_origins: dict | None = None
time_bars_origin_offset: dict | None = None
bar_build_delay: int = 0
validate_data_sequence: bool = False
buffer_deltas: bool = False
external_clients: list[ClientId] | None = None
Expand Down
3 changes: 2 additions & 1 deletion nautilus_trader/data/engine.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ cdef class DataEngine(Component):
cdef readonly bint _time_bars_timestamp_on_close
cdef readonly bint _time_bars_skip_first_non_full_bar
cdef readonly bint _time_bars_build_with_no_updates
cdef readonly dict[BarAggregation, object] _time_bars_origins # pd.Timedelta or pd.DateOffset
cdef readonly dict[BarAggregation, object] _time_bars_origin_offset # pd.Timedelta or pd.DateOffset
cdef readonly int _bar_build_delay
cdef readonly bint _validate_data_sequence
cdef readonly bint _buffer_deltas

Expand Down
13 changes: 11 additions & 2 deletions nautilus_trader/data/engine.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ cdef class DataEngine(Component):
self._time_bars_timestamp_on_close = config.time_bars_timestamp_on_close
self._time_bars_skip_first_non_full_bar = config.time_bars_skip_first_non_full_bar
self._time_bars_build_with_no_updates = config.time_bars_build_with_no_updates
self._time_bars_origins = config.time_bars_origins or {}
self._time_bars_origin_offset = config.time_bars_origin_offset or {}
self._bar_build_delay = config.bar_build_delay
self._validate_data_sequence = config.validate_data_sequence
self._buffer_deltas = config.buffer_deltas

Expand Down Expand Up @@ -2251,6 +2252,12 @@ cdef class DataEngine(Component):

cpdef object _create_bar_aggregator(self, Instrument instrument, BarType bar_type):
if bar_type.spec.is_time_aggregated():
# Use configured bar_build_delay, with special handling for composite bars
bar_build_delay = self._bar_build_delay

if bar_type.is_composite() and bar_type.composite().is_internally_aggregated() and bar_build_delay == 0:
bar_build_delay = 15 # Default for composite bars when config is 0

aggregator = TimeBarAggregator(
instrument=instrument,
bar_type=bar_type,
Expand All @@ -2260,7 +2267,8 @@ cdef class DataEngine(Component):
timestamp_on_close=self._time_bars_timestamp_on_close,
skip_first_non_full_bar=self._time_bars_skip_first_non_full_bar,
build_with_no_updates=self._time_bars_build_with_no_updates,
time_bars_origin=self._time_bars_origins.get(bar_type.spec.aggregation),
time_bars_origin_offset=self._time_bars_origin_offset.get(bar_type.spec.aggregation),
bar_build_delay=bar_build_delay,
)
elif bar_type.spec.aggregation == BarAggregation.TICK:
aggregator = TickBarAggregator(
Expand Down Expand Up @@ -2291,6 +2299,7 @@ cdef class DataEngine(Component):

cpdef void _start_bar_aggregator(self, MarketDataClient client, SubscribeBars command):
cdef Instrument instrument = self._cache.instrument(command.bar_type.instrument_id)

if instrument is None:
self._log.error(
f"Cannot start bar aggregation: "
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_tests/backtest/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def test_backtest_run_config_id(self) -> None:
TestConfigStubs.backtest_engine_config,
("catalog",),
{"persist": True},
("f39654b91400406374e3e4e3e7ff433e890adcae5f623d1ef6af7f823ce88449",),
("f7d56cc577f56aa583466d1d61dbcc8378f44109cf51ce6fa7235f490150560b",),
),
(
TestConfigStubs.risk_engine_config,
Expand Down
Loading
Loading