parallel collation refactoring
SmaGMan committed Dec 31, 2023
1 parent cd2feba commit 0440c96
Showing 2 changed files with 35 additions and 32 deletions.
5 changes: 5 additions & 0 deletions src/config.rs
@@ -90,6 +90,11 @@ pub struct CollatorConfig {
pub finalize_empty_after_ms: u32,
pub empty_collation_sleep_ms: u32
}
impl CollatorConfig {
pub fn get_finalize_parallel_timeout_ms(&self) -> u32 {
self.stop_timeout_ms * self.finalize_parallel_percentage_points / 1000
}
}
impl Default for CollatorConfig {
fn default() -> Self {
Self {
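The new CollatorConfig helper derives the parallel-finalize timeout from the overall stop timeout: the division by 1000 treats finalize_parallel_percentage_points as tenths of a percent of stop_timeout_ms. A minimal sketch of the arithmetic with hypothetical config values (the actual defaults are not part of this diff):

    // Hypothetical values, for illustration only.
    let stop_timeout_ms: u32 = 10_000;                  // total collation time budget
    let finalize_parallel_percentage_points: u32 = 800; // 800 / 1000 = 80% of the stop timeout
    let finalize_parallel_timeout_ms = stop_timeout_ms * finalize_parallel_percentage_points / 1000;
    assert_eq!(finalize_parallel_timeout_ms, 8_000);    // parallel collation is cut off 2s before the hard stop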
62 changes: 30 additions & 32 deletions src/validator/collator.rs
@@ -223,8 +223,6 @@ impl PartialOrd for NewMessage {
}
}

type TxLastLt = u64;

struct CollatorData {
// lists, empty by default
in_msgs: InMsgDescr,
@@ -265,7 +263,7 @@ struct CollatorData {
imported_visited: HashSet<UInt256>,
/// * key - msg_sync_key
/// * value - last account lt after msg processing
tx_last_lt_buffer: HashMap<usize, TxLastLt>,
tx_last_lt_buffer: HashMap<usize, u64>,

// determined fields
gen_utime: u32,
@@ -462,7 +460,7 @@ impl CollatorData {
}

/// Stores transaction last LT to buffer by src msg, updates block_limit_status
fn add_tx_last_lt_to_buffer(&mut self, src_msg_sync_key: usize, tx_last_lt: TxLastLt) {
fn add_tx_last_lt_to_buffer(&mut self, src_msg_sync_key: usize, tx_last_lt: u64) {
self.tx_last_lt_buffer.insert(src_msg_sync_key, tx_last_lt);
self.block_limit_status.update_lt(tx_last_lt, false);
}
@@ -471,7 +469,7 @@
self.tx_last_lt_buffer.remove(src_msg_sync_key);
}
/// Saves max transaction last LT to block_limit_status and returns value
fn commit_tx_last_lt(&mut self) -> Option<TxLastLt> {
fn commit_tx_last_lt(&mut self) -> Option<u64> {
if let Some(max_lt) = self.tx_last_lt_buffer.values().reduce(|curr, next| curr.max(next)) {
self.block_limit_status.update_lt(*max_lt, true);
Some(*max_lt)
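commit_tx_last_lt folds the buffered per-message last LTs down to a single maximum before committing it to block_limit_status. A standalone sketch of the same reduction over a HashMap<usize, u64>, with illustrative values:

    use std::collections::HashMap;

    fn main() {
        // Illustrative buffer: msg_sync_key -> last account lt after that message was processed.
        let tx_last_lt_buffer: HashMap<usize, u64> = HashMap::from([(0, 105), (1, 98), (2, 112)]);

        // The same reduction the collator performs: the maximum buffered LT, if any.
        let max_lt = tx_last_lt_buffer
            .values()
            .reduce(|curr, next| curr.max(next))
            .copied();

        assert_eq!(max_lt, Some(112));
    }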
@@ -1096,9 +1094,10 @@ struct ExecutionManager {
accounts_processed_msgs: HashMap<AccountId, Vec<usize>>,

cancellation_token: tokio_util::sync::CancellationToken,
f_check_finalize_parallel_timeout: Box<dyn Fn() -> (bool, u32) + Send>,

receive_tr: tokio::sync::mpsc::UnboundedReceiver<Option<(Arc<AsyncMessageSync>, Result<Transaction>, TxLastLt)>>,
wait_tr: Arc<Wait<(Arc<AsyncMessageSync>, Result<Transaction>, TxLastLt)>>,
receive_tr: tokio::sync::mpsc::UnboundedReceiver<Option<(Arc<AsyncMessageSync>, Result<Transaction>, u64)>>,
wait_tr: Arc<Wait<(Arc<AsyncMessageSync>, Result<Transaction>, u64)>>,
max_collate_threads: usize,
libraries: Libraries,
gen_utime: u32,
@@ -1140,6 +1139,7 @@ impl ExecutionManager {
max_collate_msgs_queue_on_account: usize,
collated_block_descr: Arc<String>,
debug: bool,
f_check_finalize_parallel_timeout: Box<dyn Fn() -> (bool, u32) + Send>,
) -> Result<Self> {
log::trace!("{}: ExecutionManager::new", collated_block_descr);
let (wait_tr, receive_tr) = Wait::new();
@@ -1148,6 +1148,7 @@ impl ExecutionManager {
msgs_queue: Vec::new(),
accounts_processed_msgs: HashMap::new(),
cancellation_token: tokio_util::sync::CancellationToken::new(),
f_check_finalize_parallel_timeout,
receive_tr,
wait_tr,
max_collate_threads,
@@ -1178,7 +1179,6 @@ impl ExecutionManager {
pub async fn wait_transactions(
&mut self,
collator_data: &mut CollatorData,
check_finalize_parallel_timeout: impl Fn() -> (bool, u32),
) -> Result<()> {
log::trace!("{}: wait_transactions", self.collated_block_descr);
if self.is_parallel_processing_cancelled() {
@@ -1190,7 +1190,7 @@ impl ExecutionManager {
self.wait_transaction(collator_data).await?;

// stop parallel collation if finalize timeout reached
let check_finalize_parallel = check_finalize_parallel_timeout();
let check_finalize_parallel = (self.f_check_finalize_parallel_timeout)();
if check_finalize_parallel.0 {
log::warn!("{}: FINALIZE PARALLEL TIMEOUT ({}ms) is elapsed, stop parallel collation",
self.collated_block_descr, check_finalize_parallel.1,
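With the timeout check stored on the manager instead of passed into wait_transactions, the call site must parenthesize the field access, (self.f_check_finalize_parallel_timeout)(), so Rust parses it as a call to the boxed closure rather than a method lookup. A self-contained sketch of the pattern, using hypothetical names in place of the real collator types:

    // Hypothetical stand-in for ExecutionManager; it only illustrates the boxed-closure field pattern.
    struct Manager {
        f_check_timeout: Box<dyn Fn() -> (bool, u32) + Send>,
    }

    impl Manager {
        fn poll(&self) {
            // `(self.field)()` is required; `self.f_check_timeout()` would resolve as a method call.
            let (elapsed, timeout_ms) = (self.f_check_timeout)();
            if elapsed {
                println!("finalize timeout {}ms elapsed, stopping parallel work", timeout_ms);
            }
        }
    }

    fn main() {
        let started = std::time::Instant::now();
        let timeout_ms: u32 = 8_000; // hypothetical value
        let m = Manager {
            f_check_timeout: Box::new(move || (started.elapsed().as_millis() as u32 > timeout_ms, timeout_ms)),
        };
        m.poll();
    }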
@@ -1426,7 +1426,7 @@ impl ExecutionManager {
&mut self,
new_msg_sync: &AsyncMessageSync,
transaction_res: Result<Transaction>,
tx_last_lt: TxLastLt,
tx_last_lt: u64,
collator_data: &mut CollatorData
) -> Result<AccountId> {
let AsyncMessageSync(msg_sync_key, new_msg) = new_msg_sync;
@@ -1584,9 +1584,8 @@ impl ExecutionManager {
}
}
}
msgs_to_revert.reverse();
metrics::gauge!("reverted_transactions", msgs_to_revert.len() as f64);
for (account_addr, msg_sync_key, msg_hash) in msgs_to_revert {
for (account_addr, msg_sync_key, msg_hash) in msgs_to_revert.into_iter().rev() {
if let Some(msg_info) = self.msgs_queue.get_mut(msg_sync_key) {
msg_info.1 = false;
}
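Consuming msgs_to_revert with into_iter().rev() yields the same iteration order as the previous reverse-then-iterate form, without the extra in-place mutation. A tiny illustration of the equivalence with made-up keys:

    fn main() {
        // Illustrative msg_sync_keys queued for revert.
        let msgs = vec![10usize, 11, 12];

        // Old form: reverse in place, then iterate by value.
        let mut old = msgs.clone();
        old.reverse();
        let old_order: Vec<usize> = old.into_iter().collect();

        // New form from this commit: consume the vector in reverse directly.
        let new_order: Vec<usize> = msgs.into_iter().rev().collect();

        assert_eq!(old_order, new_order); // identical revert order
    }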
@@ -1755,16 +1754,14 @@ impl Collator {

let rand_seed = rand_seed.unwrap_or_else(|| secure_256_bits().into());

let finalize_parallel_timeout_ms =
engine.collator_config().stop_timeout_ms * engine.collator_config().finalize_parallel_percentage_points / 1000;

Ok(Self {
new_block_id_part: BlockIdExt {
shard_id: shard.clone(),
seq_no: new_block_seqno,
root_hash: UInt256::default(),
file_hash: UInt256::default(),
},
finalize_parallel_timeout_ms: engine.collator_config().get_finalize_parallel_timeout_ms(),
engine,
shard,
min_mc_seqno,
@@ -1779,7 +1776,6 @@
collator_settings,
started: Instant::now(),
stop_flag: Arc::new(AtomicBool::new(false)),
finalize_parallel_timeout_ms,
#[cfg(test)]
test_msg_process_sleep: 0,
})
@@ -2056,6 +2052,14 @@ impl Collator {
// compute created / minted / recovered / from_prev_blk
self.update_value_flow(mc_data, &prev_data, collator_data)?;

// closure to check the finalize timeout for parallel transactions
let collation_started = self.started.clone();
let finalize_parallel_timeout_ms = self.finalize_parallel_timeout_ms;
let check_finilize_parallel_timeout_closure = move || (
collation_started.elapsed().as_millis() as u32 > finalize_parallel_timeout_ms,
finalize_parallel_timeout_ms,
);

let mut exec_manager = ExecutionManager::new(
collator_data.gen_utime(),
collator_data.start_lt()?,
@@ -2068,6 +2072,7 @@
self.engine.collator_config().max_collate_msgs_queue_on_account as usize,
self.collated_block_descr.clone(),
self.debug,
Box::new(check_finilize_parallel_timeout_closure),
)?;

#[cfg(test)]
@@ -2115,8 +2120,7 @@
// (all new messages were queued already, we remove them if we process them)
let now = std::time::Instant::now();
if collator_data.inbound_queues_empty {
self.process_new_messages(!collator_data.inbound_queues_empty, prev_data,
collator_data, &mut exec_manager).await?;
self.process_new_messages(prev_data, collator_data, &mut exec_manager).await?;
}
log::debug!("{}: TIME: process_new_messages {}ms;",
self.collated_block_descr, now.elapsed().as_millis());
@@ -2172,10 +2176,6 @@
true, mc_data, prev_data, collator_data, &mut exec_manager).await?;
}

// DO NOT NEED THIS - all new messages were queued already
//// process newly-generated messages (only by including them into output queue)
//self.process_new_messages(true, prev_data, collator_data, &mut exec_manager).await?;

// If block is empty - stop collation to try one more time (may be there are some new messages)
let cc = self.engine.collator_config();
if !self.after_split &&
@@ -2868,7 +2868,7 @@ impl Collator {
}
self.create_ticktock_transaction(config_account_id, tock, prev_data, collator_data,
exec_manager).await?;
exec_manager.wait_transactions(collator_data, || self.check_finilize_parallel_timeout()).await?;
exec_manager.wait_transactions(collator_data).await?;
Ok(())
}

@@ -2937,7 +2937,7 @@ impl Collator {
exec_manager
).await?;

exec_manager.wait_transactions(collator_data, || self.check_finilize_parallel_timeout()).await?;
exec_manager.wait_transactions(collator_data).await?;

Ok(())
}
@@ -3113,7 +3113,7 @@ impl Collator {
}
self.check_stop_flag()?;
}
exec_manager.wait_transactions(collator_data, || self.check_finilize_parallel_timeout()).await?;
exec_manager.wait_transactions(collator_data).await?;
let (accepted, rejected) = collator_data.withdraw_ext_msg_statuses();
self.engine.complete_external_messages(
rejected.into_iter().map(|(id, _)| id).collect(),
@@ -3173,17 +3173,15 @@ impl Collator {
}
self.check_stop_flag()?;
}
exec_manager.wait_transactions(collator_data, || self.check_finilize_parallel_timeout()).await?;
exec_manager.wait_transactions(collator_data).await?;
let (accepted, rejected) = collator_data.withdraw_ext_msg_statuses();
let processed = accepted.len() + rejected.len();
collator_data.set_remp_msg_statuses(accepted, rejected, ignored);
Ok(processed)
}

//TODO: this method does no any useful work when enqueue_only == true
async fn process_new_messages(
&self,
enqueue_only: bool,
prev_data: &PrevData,
collator_data: &mut CollatorData,
exec_manager: &mut ExecutionManager,
@@ -3220,8 +3218,8 @@ impl Collator {
break;
}

if enqueue_only || !self.shard.contains_address(&info.dst)? {
// everything was made in new_transaction
if !self.shard.contains_address(&info.dst)? {
// skip msg if it is not to our shard
} else {
CHECK!(info.created_at.as_u32(), collator_data.gen_utime);

@@ -3237,7 +3235,7 @@
};
self.check_stop_flag()?;
}
exec_manager.wait_transactions(collator_data, || self.check_finilize_parallel_timeout()).await?;
exec_manager.wait_transactions(collator_data).await?;
self.check_stop_flag()?;
}

@@ -3354,7 +3352,7 @@ impl Collator {

self.check_stop_flag()?;
}
exec_manager.wait_transactions(collator_data, || self.check_finilize_parallel_timeout()).await?;
exec_manager.wait_transactions(collator_data).await?;

return Ok(new_state_copyleft_rewards)
}
