Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 115 additions & 55 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ pub(crate) mod external {
},
agave_scheduler_bindings::{
pack_message_flags,
worker_message_types::{not_included_reasons, ExecutionResponse, Resolved},
worker_message_types::{
not_included_reasons, resolve_flags, CheckResponse, ExecutionResponse,
},
PackToWorkerMessage, SharablePubkeys, TransactionResponseRegion, WorkerToPackMessage,
MAX_TRANSACTIONS_PER_MESSAGE,
},
Expand Down Expand Up @@ -303,18 +305,23 @@ pub(crate) mod external {
should_drain_executes: bool,
) -> Result<bool, ExternalConsumeWorkerError> {
if !Self::validate_message(message) {
return self.return_invalid_message(message).map(|()| false);
return self
.return_unprocessed_message(
message,
agave_scheduler_bindings::processed_codes::INVALID_MESSAGE,
)
.map(|()| false);
}

self.metrics
.count_metrics
.num_messages_processed
.fetch_add(1, Ordering::Relaxed);

match message.flags {
pack_message_flags::NONE => self.execute_batch(message, should_drain_executes),
pack_message_flags::RESOLVE => self.resolve_batch(message).map(|()| false),
_ => unreachable!("flags verified earlier"),
if message.flags & agave_scheduler_bindings::pack_message_flags::EXECUTE == 1 {
self.execute_batch(message, should_drain_executes)
} else {
self.resolve_batch(message).map(|()| false)
}
}

Expand Down Expand Up @@ -353,11 +360,11 @@ pub(crate) mod external {
let bank = leader_state
.working_bank()
.expect("active_leader_state_with_timeout should only return an active bank");
if bank.slot() > message.max_execution_slot {
if bank.slot() > message.max_working_slot {
return self
.return_not_included_with_reason(
.return_unprocessed_message(
message,
not_included_reasons::SLOT_MISMATCH,
agave_scheduler_bindings::processed_codes::MAX_WORKING_SLOT_EXCEEDED,
)
.map(|()| false);
}
Expand Down Expand Up @@ -388,7 +395,7 @@ pub(crate) mod external {
else {
// If already ON the last possible execution slot,
// immediately give up instead of trying on next slot.
if bank.slot() == message.max_execution_slot {
if bank.slot() == message.max_working_slot {
break;
}
continue; // recording failed, try again on next slot if possible.
Expand All @@ -406,7 +413,7 @@ pub(crate) mod external {
.ok_or(ExternalConsumeWorkerError::AllocationFailure)?;
let response = WorkerToPackMessage {
batch: message.batch,
processed: agave_scheduler_bindings::PROCESSED,
processed_code: agave_scheduler_bindings::processed_codes::MESSAGE_PROCESSED,
responses,
};

Expand Down Expand Up @@ -449,7 +456,7 @@ pub(crate) mod external {

let response = WorkerToPackMessage {
batch: message.batch,
processed: agave_scheduler_bindings::PROCESSED,
processed_code: agave_scheduler_bindings::processed_codes::MESSAGE_PROCESSED,
responses,
};

Expand Down Expand Up @@ -499,7 +506,8 @@ pub(crate) mod external {
&self,
batch: TransactionPtrBatch,
bank: &Bank,
) -> Result<impl ExactSizeIterator<Item = Resolved>, ExternalConsumeWorkerError> {
) -> Result<impl ExactSizeIterator<Item = CheckResponse>, ExternalConsumeWorkerError>
{
let enable_static_instruction_limit = bank
.feature_set
.is_active(&agave_feature_set::static_instruction_limit::ID);
Expand Down Expand Up @@ -594,7 +602,7 @@ pub(crate) mod external {

let response_message = WorkerToPackMessage {
batch: message.batch,
processed: agave_scheduler_bindings::PROCESSED,
processed_code: agave_scheduler_bindings::processed_codes::MESSAGE_PROCESSED,
responses: response_region,
};

Expand All @@ -611,13 +619,18 @@ pub(crate) mod external {
Ok(())
}

fn return_invalid_message(
fn return_unprocessed_message(
&mut self,
message: &PackToWorkerMessage,
processed_code: u8,
) -> Result<(), ExternalConsumeWorkerError> {
let invalid_message = WorkerToPackMessage {
assert_ne!(
processed_code,
agave_scheduler_bindings::processed_codes::MESSAGE_PROCESSED
);
let response = WorkerToPackMessage {
batch: message.batch,
processed: agave_scheduler_bindings::NOT_PROCESSED,
processed_code,
responses: TransactionResponseRegion {
tag: 0,
num_transaction_responses: 0,
Expand All @@ -632,7 +645,7 @@ pub(crate) mod external {

// SAFETY: `reserve` guarantees a properly aligned space
// for a `WorkerToPackMessage`
unsafe { send_ptr.write(invalid_message) };
unsafe { send_ptr.write(response) };

Ok(())
}
Expand Down Expand Up @@ -696,27 +709,45 @@ pub(crate) mod external {
fn resolved_pubkeys_to_response(
resolving_result: Result<Option<(SharablePubkeys, u64)>, ()>,
slot: Slot,
) -> Resolved {
) -> CheckResponse {
let resolve_flags = resolve_flags::REQUESTED | resolve_flags::PERFORMED;

match resolving_result {
Ok(Some((resolved_pubkeys, min_alt_deactivation_slot))) => Resolved {
success: agave_scheduler_bindings::worker_message_types::RESOLVE_SUCCESS,
slot,
Ok(Some((resolved_pubkeys, min_alt_deactivation_slot))) => CheckResponse {
parsing_and_sanitization_flags: 0,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parsing_and_sanitization_flags: needs to be set to success now right?

status_check_flags: 0,
fee_payer_balance_flags: 0,
resolve_flags,
included_slot: 0,
balance_slot: 0,
fee_payer_balance: 0,
resolution_slot: slot,
min_alt_deactivation_slot,
resolved_pubkeys,
},
_ => Resolved {
success: if resolving_result.is_ok() {
agave_scheduler_bindings::worker_message_types::RESOLVE_SUCCESS

_ => {
let resolve_flags = if resolving_result.is_err() {
resolve_flags | resolve_flags::FAILED
} else {
agave_scheduler_bindings::worker_message_types::RESOLVE_FAILURE
},
slot,
min_alt_deactivation_slot: u64::MAX,
resolved_pubkeys: SharablePubkeys {
offset: 0,
num_pubkeys: 0,
},
},
resolve_flags
};
CheckResponse {
parsing_and_sanitization_flags: 0,
status_check_flags: 0,
fee_payer_balance_flags: 0,
resolve_flags,
included_slot: 0,
balance_slot: 0,
fee_payer_balance: 0,
resolution_slot: slot,
min_alt_deactivation_slot: 0,
resolved_pubkeys: SharablePubkeys {
offset: 0,
num_pubkeys: 0,
},
}
}
}
}

Expand Down Expand Up @@ -744,7 +775,10 @@ pub(crate) mod external {
}

fn validate_message_flags(flags: u16) -> bool {
flags == pack_message_flags::NONE || flags == pack_message_flags::RESOLVE
flags == pack_message_flags::EXECUTE
|| (flags
== (pack_message_flags::CHECK
| pack_message_flags::check_flags::RESOLVE_PUBKEYS))
}

fn response_from_commit_details(
Expand Down Expand Up @@ -791,13 +825,16 @@ pub(crate) mod external {

#[cfg(test)]
mod tests {
use {super::*, solana_system_transaction::transfer, solana_transaction::TransactionError};
use {
super::*, agave_scheduler_bindings::worker_message_types::resolve_flags,
solana_system_transaction::transfer, solana_transaction::TransactionError,
};

#[test]
fn test_validate_message() {
let mut message = PackToWorkerMessage {
flags: agave_scheduler_bindings::pack_message_flags::NONE,
max_execution_slot: u64::MAX,
flags: agave_scheduler_bindings::pack_message_flags::EXECUTE,
max_working_slot: u64::MAX,
batch: agave_scheduler_bindings::SharableTransactionBatchRegion {
num_transactions: 0,
transactions_offset: 0,
Expand All @@ -816,20 +853,21 @@ pub(crate) mod external {
message.flags = u16::MAX;
assert!(!ExternalWorker::validate_message(&message));

message.flags = pack_message_flags::NONE;
message.flags = pack_message_flags::EXECUTE;
assert!(ExternalWorker::validate_message(&message));
}

#[test]
fn test_validate_message_flags() {
assert!(ExternalWorker::validate_message_flags(
pack_message_flags::NONE
pack_message_flags::EXECUTE
));
assert!(ExternalWorker::validate_message_flags(
pack_message_flags::RESOLVE
pack_message_flags::CHECK
| agave_scheduler_bindings::pack_message_flags::check_flags::RESOLVE_PUBKEYS
));
assert!(!ExternalWorker::validate_message_flags(
pack_message_flags::RESOLVE + 1
pack_message_flags::CHECK
))
}

Expand Down Expand Up @@ -921,28 +959,44 @@ pub(crate) mod external {
const TEST_SLOT: Slot = 7;
assert_eq!(
ExternalWorker::resolved_pubkeys_to_response(Err(()), TEST_SLOT),
Resolved {
success: agave_scheduler_bindings::worker_message_types::RESOLVE_FAILURE,
slot: TEST_SLOT,
min_alt_deactivation_slot: u64::MAX,
CheckResponse {
parsing_and_sanitization_flags: 0,
status_check_flags: 0,
fee_payer_balance_flags: 0,
resolve_flags: resolve_flags::REQUESTED
| resolve_flags::PERFORMED
| resolve_flags::FAILED,
included_slot: 0,
balance_slot: 0,
fee_payer_balance: 0,
resolution_slot: TEST_SLOT,
min_alt_deactivation_slot: 0,
resolved_pubkeys: SharablePubkeys {
offset: 0,
num_pubkeys: 0
}
}
);

assert_eq!(
ExternalWorker::resolved_pubkeys_to_response(Ok(None), TEST_SLOT),
Resolved {
success: agave_scheduler_bindings::worker_message_types::RESOLVE_SUCCESS,
slot: TEST_SLOT,
min_alt_deactivation_slot: u64::MAX,
CheckResponse {
parsing_and_sanitization_flags: 0,
status_check_flags: 0,
fee_payer_balance_flags: 0,
resolve_flags: resolve_flags::REQUESTED | resolve_flags::PERFORMED,
included_slot: 0,
balance_slot: 0,
fee_payer_balance: 0,
resolution_slot: TEST_SLOT,
min_alt_deactivation_slot: 0,
resolved_pubkeys: SharablePubkeys {
offset: 0,
num_pubkeys: 0
}
}
},
);

let resolved_pubkeys = SharablePubkeys {
offset: 256,
num_pubkeys: 21,
Expand All @@ -952,12 +1006,18 @@ pub(crate) mod external {
Ok(Some((resolved_pubkeys, 120))),
TEST_SLOT
),
Resolved {
success: agave_scheduler_bindings::worker_message_types::RESOLVE_SUCCESS,
slot: TEST_SLOT,
CheckResponse {
parsing_and_sanitization_flags: 0,
status_check_flags: 0,
fee_payer_balance_flags: 0,
resolve_flags: resolve_flags::REQUESTED | resolve_flags::PERFORMED,
included_slot: 0,
balance_slot: 0,
fee_payer_balance: 0,
resolution_slot: TEST_SLOT,
min_alt_deactivation_slot: 120,
resolved_pubkeys
}
},
);
}

Expand Down
Loading
Loading