diff --git a/Cargo.toml b/Cargo.toml
old mode 100644
new mode 100755
index e31a2e00..acc3a6e7
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,20 +47,20 @@
 stream-cancel = '0.8.0'
 string-builder = '^0.2.0'
 tokio = { features = [ 'rt-multi-thread' ], version = '1.5' }
 tokio-util = '0.7'
-adnl = { features = [ 'client', 'node', 'server' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.15' }
+adnl = { features = [ 'client', 'node', 'server' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.18' }
 catchain = { path = 'catchain' }
-dht = { git = 'https://github.com/tonlabs/ever-dht.git', tag = '0.6.79' }
+dht = { git = 'https://github.com/tonlabs/ever-dht.git', tag = '0.6.82' }
 lockfree = { git = 'https://github.com/tonlabs/lockfree.git' }
-overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.11' }
-rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.8' }
+overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.14' }
+rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.11' }
 storage = { path = 'storage' }
-ton_abi = { git = 'https://github.com/tonlabs/ever-abi.git', optional = true, tag = '2.4.8' }
-ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' }
-ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.117' }
-ton_block_json = { git = 'https://github.com/tonlabs/ever-block-json.git', tag = '0.7.205' }
-ton_executor = { git = 'https://github.com/tonlabs/ever-executor.git', tag = '1.16.97' }
+ton_abi = { git = 'https://github.com/tonlabs/ever-abi.git', optional = true, tag = '2.4.11' }
+ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' }
+ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.119' }
+ton_block_json = { git = 'https://github.com/tonlabs/ever-block-json.git', tag = '0.7.207' }
+ton_executor = { git = 'https://github.com/tonlabs/ever-executor.git', tag = '1.16.99' }
 ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' }
-ton_vm = { git = 'https://github.com/tonlabs/ever-vm.git', tag = '1.8.225' }
+ton_vm = { git = 'https://github.com/tonlabs/ever-vm.git', tag = '1.8.227' }
 validator_session = { path = 'validator-session' }
 
 [features]
diff --git a/catchain/Cargo.toml b/catchain/Cargo.toml
index 947aacd2..9d7bb901
--- a/catchain/Cargo.toml
+++ b/catchain/Cargo.toml
@@ -19,11 +19,11 @@
 quanta = '0.11.1'
 rand = '0.8'
 regex = '1.3.1'
 tokio = { features = [ 'rt-multi-thread' ], version = '1.5' }
-adnl = { features = [ 'node' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.15' }
-overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.11' }
-rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.8' }
+adnl = { features = [ 'node' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.18' }
+overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.14' }
+rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.11' }
 storage = { path = '../storage' }
-ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' }
+ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' }
 ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' }
 
 [features]
diff --git a/src/config.rs b/src/config.rs
index 059bd60a..89200080
--- a/src/config.rs
+++ b/src/config.rs
@@ -80,23 +80,32 @@ impl Default for CellsGcConfig {
 pub struct CollatorConfig {
     pub cutoff_timeout_ms: u32,
     pub stop_timeout_ms: u32,
+    pub finalize_parallel_percentage_points: u32,
     pub clean_timeout_percentage_points: u32,
     pub optimistic_clean_percentage_points: u32,
     pub max_secondary_clean_timeout_percentage_points: u32,
     pub max_collate_threads: u32,
+    pub max_collate_msgs_queue_on_account: u32,
     pub retry_if_empty: bool,
     pub finalize_empty_after_ms: u32,
     pub empty_collation_sleep_ms: u32
 }
+impl CollatorConfig {
+    pub fn get_finalize_parallel_timeout_ms(&self) -> u32 {
+        self.stop_timeout_ms * self.finalize_parallel_percentage_points / 1000
+    }
+}
 impl Default for CollatorConfig {
     fn default() -> Self {
         Self {
             cutoff_timeout_ms: 1000,
             stop_timeout_ms: 1500,
+            finalize_parallel_percentage_points: 800, // 0.800 = 80% of stop_timeout_ms = 1200ms
             clean_timeout_percentage_points: 150, // 0.150 = 15% = 150ms
             optimistic_clean_percentage_points: 1000, // 1.000 = 100% = 150ms
             max_secondary_clean_timeout_percentage_points: 350, // 0.350 = 35% = 350ms
             max_collate_threads: 10,
+            max_collate_msgs_queue_on_account: 3,
             retry_if_empty: false,
             finalize_empty_after_ms: 800,
             empty_collation_sleep_ms: 100
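Since the `*_percentage_points` values are fixed-point fractions with denominator 1000, the finalize-parallel deadline is derived from `stop_timeout_ms`. A standalone sketch of the arithmetic with the default values (illustrative only, duplicating the method above):

// Fixed-point "percentage points": value / 1000 is the fraction applied.
fn finalize_parallel_timeout_ms(stop_timeout_ms: u32, points: u32) -> u32 {
    stop_timeout_ms * points / 1000
}

fn main() {
    // Defaults from CollatorConfig: stop_timeout_ms = 1500, points = 800.
    assert_eq!(finalize_parallel_timeout_ms(1500, 800), 1200); // 80% of 1500ms
}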
diff --git a/src/types/accounts.rs b/src/types/accounts.rs
index 8df24471..652dc768
--- a/src/types/accounts.rs
+++ b/src/types/accounts.rs
@@ -11,31 +11,47 @@
  * limitations under the License.
  */
 
-use std::sync::{atomic::AtomicU64, Arc};
 use ton_block::{
     Account, AccountBlock, Augmentation, CopyleftRewards, Deserializable, HashUpdate,
     HashmapAugType, LibDescr, Libraries, Serializable, ShardAccount, ShardAccounts,
     StateInitLib, Transaction, Transactions,
 };
-use ton_types::{fail, AccountId, Cell, HashmapRemover, Result, UInt256};
+use ton_types::{error, fail, AccountId, Cell, HashmapRemover, Result, UInt256, SliceData};
 
 pub struct ShardAccountStuff {
     account_addr: AccountId,
     account_root: Cell,
     last_trans_hash: UInt256,
     last_trans_lt: u64,
-    lt: Arc<AtomicU64>,
-    transactions: Transactions,
+    lt: u64,
+    transactions: Option<Transactions>,
     state_update: HashUpdate,
-    orig_libs: StateInitLib,
-    copyleft_rewards: CopyleftRewards,
+    orig_libs: Option<StateInitLib>,
+    copyleft_rewards: Option<CopyleftRewards>,
+
+    /// * Sync key of the message which updated the account state
+    /// * It is an incremental counter set by the executor
+    update_msg_sync_key: Option<usize>,
+
+    // /// * Executor sets the transaction that updated the account to the current state
+    // /// * The initial account state contains None
+    // last_transaction: Option<(Cell, CurrencyCollection)>,
+
+    /// LT of the transaction which updated the account state
+    update_trans_lt: Option<u64>,
+
+    /// The copyleft reward address of the transaction which updated the account state (if it exists)
+    update_copyleft_reward_address: Option<AccountId>,
+
+    /// Executor stores the previous account state here
+    prev_account_stuff: Option<Box<ShardAccountStuff>>,
 }
 
 impl ShardAccountStuff {
     pub fn new(
         account_addr: AccountId,
         shard_acc: ShardAccount,
-        lt: Arc<AtomicU64>,
+        lt: u64,
     ) -> Result<Self> {
         let account_hash = shard_acc.account_cell().repr_hash();
         let account_root = shard_acc.account_cell();
@@ -43,16 +59,64 @@ impl ShardAccountStuff {
         let last_trans_lt = shard_acc.last_trans_lt();
         Ok(Self{
             account_addr,
-            orig_libs: shard_acc.read_account()?.libraries(),
+            orig_libs: Some(shard_acc.read_account()?.libraries()),
             account_root,
             last_trans_hash,
             last_trans_lt,
             lt,
-            transactions: Transactions::default(),
+            transactions: Some(Transactions::default()),
             state_update: HashUpdate::with_hashes(account_hash.clone(), account_hash),
-            copyleft_rewards: CopyleftRewards::default(),
+            copyleft_rewards: Some(CopyleftRewards::default()),
+
+            update_msg_sync_key: None,
+            //last_transaction: None,
+            update_trans_lt: None,
+            update_copyleft_reward_address: None,
+            prev_account_stuff: None,
         })
     }
+    /// Returns:
+    /// * None - if there are no updates or no matching records in the history
+    /// * Some(particular) - the record from the history whose update_msg_sync_key == on_msg_sync_key
+    pub fn commit(mut self, on_msg_sync_key: usize) -> Result<Option<Self>> {
+        while let Some(current_update_msg_sync_key) = self.update_msg_sync_key {
+            if current_update_msg_sync_key == on_msg_sync_key {
+                log::debug!("account {:x} state committed by processed message {} in the queue", self.account_addr(), on_msg_sync_key);
+                return Ok(Some(self));
+            } else {
+                if !self.revert()? {
+                    log::debug!("unable to revert account {:x} state, current state is the first in the history", self.account_addr());
+                    return Ok(None);
+                } else {
+                    log::debug!("account {:x} state reverted one step back to message {:?} in the queue", self.account_addr(), self.update_msg_sync_key);
+                }
+            }
+        }
+        Ok(None)
+    }
+    fn revert(&mut self) -> Result<bool> {
+        let mut taken_prev = match self.prev_account_stuff.take() {
+            Some(prev) => prev,
+            None => return Ok(false),
+        };
+        let prev = taken_prev.as_mut();
+
+        prev.orig_libs = self.orig_libs.take();
+
+        prev.transactions = self.transactions.take();
+        if let Some(update_trans_lt) = self.update_trans_lt {
+            prev.remove_trans(update_trans_lt)?;
+        }
+
+        prev.copyleft_rewards = self.copyleft_rewards.take();
+        if let Some(update_copyleft_reward_address) = self.update_copyleft_reward_address.as_ref() {
+            prev.remove_copyleft_reward(update_copyleft_reward_address)?;
+        }
+
+        std::mem::swap(self, prev);
+
+        Ok(true)
+    }
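The history above is a singly linked undo list: each applied message stashes the previous state in `prev_account_stuff`, and `commit()` reverts until it reaches the state produced by the requested sync key. A minimal standalone sketch of that pattern with hypothetical simplified types (`State` and `apply` are inventions for illustration, not the node's API):

// Minimal undo list: each apply() stores the previous state; commit(k)
// reverts step by step until it reaches the state produced by sync key k.
struct State {
    update_msg_sync_key: Option<usize>,
    value: u64,
    prev: Option<Box<State>>,
}

impl State {
    fn apply(&mut self, sync_key: usize, new_value: u64) {
        let prev = std::mem::replace(self, State {
            update_msg_sync_key: Some(sync_key),
            value: new_value,
            prev: None,
        });
        self.prev = Some(Box::new(prev));
    }
    fn commit(mut self, on_sync_key: usize) -> Option<State> {
        while let Some(k) = self.update_msg_sync_key {
            if k == on_sync_key {
                return Some(self);
            }
            self = *self.prev.take()?; // revert one step; None once at the root
        }
        None
    }
}

fn main() {
    let mut s = State { update_msg_sync_key: None, value: 0, prev: None };
    s.apply(0, 10);
    s.apply(1, 20); // suppose message 1 later turns out unprocessed
    let committed = s.commit(0).unwrap();
    assert_eq!(committed.value, 10); // state rolled back to message 0
}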
 
     pub fn update_shard_state(&mut self, new_accounts: &mut ShardAccounts) -> Result<AccountBlock> {
         let account = self.read_account()?;
         if account.is_none() {
@@ -62,10 +126,10 @@ impl ShardAccountStuff {
             let value = shard_acc.write_to_new_cell()?;
             new_accounts.set_builder_serialized(self.account_addr().clone(), &value, &account.aug()?)?;
         }
-        AccountBlock::with_params(&self.account_addr, &self.transactions, &self.state_update)
+        AccountBlock::with_params(&self.account_addr, self.transactions()?, &self.state_update)
     }
-    pub fn lt(&self) -> Arc<AtomicU64> {
-        self.lt.clone()
+    pub fn lt(&self) -> u64 {
+        self.lt
     }
     pub fn read_account(&self) -> Result<Account> {
         Account::construct_from_cell(self.account_root())
@@ -79,30 +143,106 @@ impl ShardAccountStuff {
     pub fn account_addr(&self) -> &AccountId {
         &self.account_addr
     }
-    pub fn copyleft_rewards(&self) -> &CopyleftRewards {
-        &self.copyleft_rewards
+    pub fn copyleft_rewards(&self) -> Result<&CopyleftRewards> {
+        self.copyleft_rewards.as_ref()
+            .ok_or_else(|| error!(
+                "`copyleft_rewards` field is None, possibly you are trying to access a non-root record in the history, run commit() first"
+            ))
+    }
+    fn copyleft_rewards_mut(&mut self) -> Result<&mut CopyleftRewards> {
+        self.copyleft_rewards.as_mut()
+            .ok_or_else(|| error!(
+                "`copyleft_rewards` field is None, possibly you are trying to access a non-root record in the history, run commit() first"
+            ))
+    }
+    fn remove_copyleft_reward(&mut self, address: &AccountId) -> Result<bool> {
+        self.copyleft_rewards_mut()?.remove(address)
+    }
+
+    fn transactions(&self) -> Result<&Transactions> {
+        self.transactions.as_ref()
+            .ok_or_else(|| error!(
+                "`transactions` field is None, possibly you are trying to access a non-root record in the history, run commit() first"
+            ))
+    }
+    fn transactions_mut(&mut self) -> Result<&mut Transactions> {
+        self.transactions.as_mut()
+            .ok_or_else(|| error!(
+                "`transactions` field is None, possibly you are trying to access a non-root record in the history, run commit() first"
+            ))
+    }
+    fn remove_trans(&mut self, trans_lt: u64) -> Result<()> {
+        let key = SliceData::load_builder(trans_lt.write_to_new_cell()?)?;
+        self.transactions_mut()?.remove(key)?;
+        Ok(())
+    }
+
+    fn orig_libs(&self) -> Result<&StateInitLib> {
+        self.orig_libs.as_ref()
+            .ok_or_else(|| error!(
+                "`orig_libs` field is None, possibly you are trying to access a non-root record in the history, run commit() first"
+            ))
+    }
+
+    pub fn apply_transaction_res(
+        &mut self,
+        update_msg_sync_key: usize,
+        tx_last_lt: u64,
+        transaction_res: &mut Result<Transaction>,
+        account_root: Cell,
+    ) -> Result<()> {
+        let mut res = ShardAccountStuff {
+            account_addr: self.account_addr.clone(),
+            account_root: self.account_root.clone(),
+            last_trans_hash: self.last_trans_hash.clone(),
+            last_trans_lt: self.last_trans_lt,
+            lt: tx_last_lt, // e.g. 1014 or 1104 or 1024
+            transactions: self.transactions.take(),
+            state_update: self.state_update.clone(),
+            orig_libs: self.orig_libs.take(),
+            copyleft_rewards: Some(CopyleftRewards::default()),
+            update_msg_sync_key: Some(update_msg_sync_key),
+            update_trans_lt: None,
+            update_copyleft_reward_address: None,
+            prev_account_stuff: None,
+        };
+
+        if let Ok(transaction) = transaction_res {
+            res.add_transaction(transaction, account_root)?;
+        }
+
+        std::mem::swap(self, &mut res);
+
+        self.prev_account_stuff = Some(Box::new(res));
+
+        Ok(())
     }
 
     pub fn add_transaction(&mut self, transaction: &mut Transaction, account_root: Cell) -> Result<()> {
         transaction.set_prev_trans_hash(self.last_trans_hash.clone());
-        transaction.set_prev_trans_lt(self.last_trans_lt);
+        transaction.set_prev_trans_lt(self.last_trans_lt); // e.g. 1010
         // log::trace!("{} {}", self.collated_block_descr, debug_transaction(transaction.clone())?);
 
         self.account_root = account_root;
         self.state_update.new_hash = self.account_root.repr_hash();
         let tr_root = transaction.serialize()?;
+        let tr_lt = transaction.logical_time(); // e.g. 1011
+
         self.last_trans_hash = tr_root.repr_hash();
-        self.last_trans_lt = transaction.logical_time();
+        self.last_trans_lt = tr_lt;
+
+        self.update_trans_lt = Some(tr_lt);
 
-        self.transactions.setref(
-            &transaction.logical_time(),
+        self.transactions_mut()?.setref(
+            &tr_lt,
             &tr_root,
             transaction.total_fees()
         )?;
 
         if let Some(copyleft_reward) = transaction.copyleft_reward() {
             log::trace!("Copyleft reward {} {} from transaction {}", copyleft_reward.address, copyleft_reward.reward, self.last_trans_hash);
-            self.copyleft_rewards.add_copyleft_reward(&copyleft_reward.address, &copyleft_reward.reward)?;
+            self.copyleft_rewards_mut()?.add_copyleft_reward(&copyleft_reward.address, &copyleft_reward.reward)?;
+            self.update_copyleft_reward_address = Some(copyleft_reward.address.clone());
         }
 
         Ok(())
@@ -110,8 +250,9 @@ impl ShardAccountStuff {
     pub fn update_public_libraries(&self, libraries: &mut Libraries) -> Result<()> {
         let account = self.read_account()?;
         let new_libs = account.libraries();
-        if new_libs.root() != self.orig_libs.root() {
-            new_libs.scan_diff(&self.orig_libs, |key: UInt256, old, new| {
+        let orig_libs = self.orig_libs()?;
+        if new_libs.root() != orig_libs.root() {
+            new_libs.scan_diff(orig_libs, |key: UInt256, old, new| {
                 let old = old.unwrap_or_default();
                 let new = new.unwrap_or_default();
                 if old.is_public_library() && !new.is_public_library() {
diff --git a/src/types/limits.rs b/src/types/limits.rs
index cb3e00d5..1ecfa171
--- a/src/types/limits.rs
+++ b/src/types/limits.rs
@@ -107,8 +107,8 @@ impl BlockLimitStatus {
     }
 
     /// Update logical time
-    pub fn update_lt(&mut self, lt: u64) {
-        self.lt_current = max(self.lt_current, lt);
+    pub fn update_lt(&mut self, lt: u64, force: bool) {
+        self.lt_current = if force { lt } else { max(self.lt_current, lt) };
         if self.lt_start > self.lt_current {
             self.lt_start = lt;
         }
diff --git a/src/types/messages.rs b/src/types/messages.rs
index d9f90ab8..14770038
--- a/src/types/messages.rs
+++ b/src/types/messages.rs
@@ -77,6 +77,11 @@ impl MsgEnvelopeStuff {
     pub fn message(&self) -> &Message { &self.msg }
     pub fn message_hash(&self) -> UInt256 { self.env.message_hash() }
     pub fn message_cell(&self) -> Cell { self.env.message_cell() }
+    pub fn out_msg_key(&self) -> OutMsgQueueKey {
+        OutMsgQueueKey::with_account_prefix(&self.next_prefix(), self.message_hash())
+    }
+    #[cfg(test)]
+    pub fn src_prefix(&self) -> &AccountIdPrefixFull { &self.src_prefix }
     pub fn dst_prefix(&self) -> &AccountIdPrefixFull { &self.dst_prefix }
     pub fn cur_prefix(&self) -> &AccountIdPrefixFull { &self.cur_prefix }
     pub fn next_prefix(&self) -> &AccountIdPrefixFull { &self.next_prefix }
diff --git a/src/validator/collator.rs b/src/validator/collator.rs
index 33edec43..30de09be
--- a/src/validator/collator.rs
+++ b/src/validator/collator.rs
@@ -39,6 +39,7 @@ use crate::{
 use adnl::common::Wait;
 use futures::try_join;
 use rand::Rng;
+use tokio::sync::Mutex;
 use std::{
     cmp::{max, min},
     collections::{BinaryHeap, HashMap, HashSet},
@@ -49,6 +50,7 @@ use std::{
     },
     time::{Duration, Instant},
 };
+use std::collections::BTreeMap;
 use ton_block::{
     AddSub, BlkPrevInfo, Block, BlockCreateStats, BlockExtra, BlockIdExt, BlockInfo, CommonMsgInfo,
     ConfigParams, CopyleftRewards, CreatorStats, CurrencyCollection, Deserializable, ExtBlkRef,
@@ -58,13 +60,15 @@ use ton_block::{
     ParamLimitIndex, Serializable, ShardAccount, ShardAccountBlocks, ShardAccounts, ShardDescr,
     ShardFees, ShardHashes, ShardIdent, ShardStateSplit, ShardStateUnsplit, TopBlockDescrSet,
     Transaction, TransactionTickTock, UnixTime32, ValidatorSet, ValueFlow, WorkchainDescr,
-    Workchains, Account, AccountIdPrefixFull, OutQueueUpdates, OutMsgQueueInfo, MASTERCHAIN_ID
+    Workchains, Account, AccountIdPrefixFull, OutQueueUpdates, OutMsgQueueInfo, MASTERCHAIN_ID,
+    EnqueuedMsg, GetRepresentationHash
 };
 use ton_executor::{
     BlockchainConfig, ExecuteParams, OrdinaryTransactionExecutor, TickTockTransactionExecutor,
     TransactionExecutor,
 };
 use ton_types::{error, fail, AccountId, Cell, HashmapType, Result, UInt256, UsageTree, SliceData};
+use ton_types::HashmapRemover;
 
 use crate::validator::validator_utils::is_remp_enabled;
@@ -168,14 +172,26 @@
 enum AsyncMessage {
     Copyleft(Message),
     Ext(Message),
     Int(MsgEnqueueStuff, bool),
-    New(MsgEnvelopeStuff, Cell), // prev_trans_cell
+    New(MsgEnvelopeStuff, Cell, u64), // prev_trans_cell, created_lt
     TickTock(TransactionTickTock),
 }
 impl AsyncMessage {
     fn is_external(&self) -> bool {
         matches!(self, Self::Ext(_))
     }
+    fn compute_message_hash(&self) -> Result<Option<UInt256>> {
+        let hash_opt = match self {
+            Self::Recover(msg) | Self::Mint(msg) | Self::Copyleft(msg) | Self::Ext(msg) => Some(msg.hash()?),
+            Self::Int(enq, _) => Some(enq.message_hash()),
+            Self::New(env, _, _) => Some(env.message_hash()),
+            Self::TickTock(_) => None,
+        };
+        Ok(hash_opt)
+    }
 }
 
+#[derive(Debug)]
+struct AsyncMessageSync(usize, AsyncMessage);
+
 #[derive(Clone, Eq, PartialEq)]
 struct NewMessage {
     lt_hash: (u64, UInt256),
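`AsyncMessageSync` pairs each queued message with its sync key, which (per `get_next_msg_sync_key` further below) is simply the message's index in the collation queue; every buffered side effect is later committed or reverted by that key. A toy sketch of the tagging scheme (`Msg` and `Queue` are hypothetical stand-ins):

// Each enqueued message is tagged with the next queue index as its sync key.
struct Msg(&'static str);
struct MsgSync(usize, Msg);

struct Queue {
    msgs: Vec<MsgSync>,
}

impl Queue {
    // Actually the length of the queue, as in get_next_msg_sync_key().
    fn next_sync_key(&self) -> usize {
        self.msgs.len()
    }
    fn enqueue(&mut self, msg: Msg) -> usize {
        let key = self.next_sync_key();
        println!("enqueued '{}' with sync key {}", msg.0, key);
        self.msgs.push(MsgSync(key, msg));
        key
    }
}

fn main() {
    let mut q = Queue { msgs: Vec::new() };
    assert_eq!(q.enqueue(Msg("transfer")), 0);
    assert_eq!(q.enqueue(Msg("mint")), 1);
}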
@@ -210,20 +226,44 @@ impl PartialOrd for NewMessage {
 struct CollatorData {
     // lists, empty by default
     in_msgs: InMsgDescr,
+    /// * key - msg_sync_key
+    /// * value - in msg descr hash
+    in_msgs_descr_history: HashMap<usize, UInt256>,
     out_msgs: OutMsgDescr,
+    /// * key - msg_sync_key
+    /// * value - list of out msgs descrs hashes
+    out_msgs_descr_history: HashMap<usize, Vec<(UInt256, Option<SliceData>)>>,
     accounts: ShardAccountBlocks,
     out_msg_queue_info: OutMsgQueueInfoStuff,
+    /// * key - msg_sync_key
+    /// * value - removed out msg key, EnqueuedMsg and is_new flag
+    del_out_queue_msg_history: HashMap<usize, (OutMsgQueueKey, EnqueuedMsg, bool)>,
+    /// * key - msg_sync_key
+    /// * value - msg key in out queue
+    add_out_queue_msg_history: HashMap<usize, Vec<OutMsgQueueKey>>,
     shard_fees: ShardFees,
     shard_top_block_descriptors: Vec<Arc<TopBlockDescrStuff>>,
     block_create_count: HashMap<UInt256, u64>,
     new_messages: BinaryHeap<NewMessage>, // used as a priority queue
+    /// * key - msg_sync_key
+    /// * value - list of new msgs
+    new_messages_buffer: BTreeMap<usize, Vec<NewMessage>>,
     accepted_ext_messages: Vec<UInt256>,
+    /// * key - msg_sync_key
+    /// * value - ext msg id
+    accepted_ext_messages_buffer: HashMap<usize, UInt256>,
     rejected_ext_messages: Vec<(UInt256, String)>,
+    /// * key - msg_sync_key
+    /// * value - ext msg id and error info
+    rejected_ext_messages_buffer: HashMap<usize, (UInt256, String)>,
     accepted_remp_messages: Vec<UInt256>,
     rejected_remp_messages: Vec<(UInt256, String)>,
     ignored_remp_messages: Vec<UInt256>,
     usage_tree: UsageTree,
     imported_visited: HashSet<UInt256>,
+    /// * key - msg_sync_key
+    /// * value - last account lt after msg processing
+    tx_last_lt_buffer: HashMap<usize, u64>,
 
     // determined fields
     gen_utime: u32,
@@ -236,17 +276,31 @@
     prev_stuff: Option<BlkPrevInfo>,
     shards: Option<ShardHashes>,
     mint_msg: Option<InMsg>,
+    /// * key - msg_sync_key
+    /// * value - mint msg descr
+    mint_msg_buffer: BTreeMap<usize, Option<InMsg>>,
     recover_create_msg: Option<InMsg>,
+    /// * key - msg_sync_key
+    /// * value - recover create msg descr
+    recover_create_msg_buffer: BTreeMap<usize, Option<InMsg>>,
     copyleft_msgs: Vec<InMsg>,
+    /// * key - msg_sync_key
+    /// * value - list of copyleft msgs
+    copyleft_msgs_buffer: BTreeMap<usize, InMsg>,
 
     // fields with default values
     skip_topmsgdescr: bool,
     skip_extmsg: bool,
     shard_conf_adjusted: bool,
+    // Does not support history: when parallel collation is cancelled,
+    // no new msgs can be processed, so we do not need to check limits anymore
     block_limit_status: BlockLimitStatus,
     block_create_total: u64,
     inbound_queues_empty: bool,
     last_proc_int_msg: (u64, UInt256),
+    /// * key - msg_sync_key
+    /// * value - incoming internal msg LT HASH
+    last_proc_int_msg_buffer: BTreeMap<usize, (u64, UInt256)>,
     shards_max_end_lt: u64,
     before_split: bool,
     now_upper_limit: u32,
@@ -265,6 +319,9 @@
     execute_count: usize,
     out_msg_count: usize,
     in_msg_count: usize,
+
+    // string with format like `-1:8000000000000000, 100500`; used for logging
+    collated_block_descr: Arc<String>,
 }
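Each `*_buffer` field above follows one discipline: stage a side effect under the sync key of the message that produced it, drop it on revert, and drain in key order on commit. A generic standalone sketch of that staged-commit discipline (simplified types, not the collator's):

use std::collections::BTreeMap;

// Side effects are staged per message sync key, then either reverted
// (the message did not make it into the block) or committed in key order.
struct Staged<T> {
    buffer: BTreeMap<usize, T>,
    committed: Vec<T>,
}

impl<T> Staged<T> {
    fn new() -> Self {
        Self { buffer: BTreeMap::new(), committed: Vec::new() }
    }
    fn add(&mut self, sync_key: usize, effect: T) {
        self.buffer.insert(sync_key, effect);
    }
    fn revert(&mut self, sync_key: &usize) {
        self.buffer.remove(sync_key);
    }
    fn commit(&mut self) {
        // pop_first() drains in ascending sync-key order, mirroring
        // commit_new_messages() / commit_last_proc_int_msg() below.
        while let Some((_, effect)) = self.buffer.pop_first() {
            self.committed.push(effect);
        }
    }
}

fn main() {
    let mut s = Staged::new();
    s.add(0, "msg-a");
    s.add(1, "msg-b");
    s.revert(&1); // message 1 was rolled back
    s.commit();
    assert_eq!(s.committed, vec!["msg-a"]);
}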
 
@@ -275,24 +332,33 @@
 impl CollatorData {
 
     fn new(
         gen_utime: u32,
         config: BlockchainConfig,
         usage_tree: UsageTree,
         prev_data: &PrevData,
         is_masterchain: bool,
+        collated_block_descr: Arc<String>,
     ) -> Result<Self> {
         let limits = Arc::new(config.raw_config().block_limits(is_masterchain)?);
         let ret = Self {
             in_msgs: InMsgDescr::default(),
+            in_msgs_descr_history: Default::default(),
             out_msgs: OutMsgDescr::default(),
+            out_msgs_descr_history: Default::default(),
             accounts: ShardAccountBlocks::default(),
             out_msg_queue_info: OutMsgQueueInfoStuff::default(),
+            del_out_queue_msg_history: Default::default(),
+            add_out_queue_msg_history: Default::default(),
             shard_fees: ShardFees::default(),
             shard_top_block_descriptors: Vec::new(),
             block_create_count: HashMap::new(),
             new_messages: Default::default(),
+            new_messages_buffer: Default::default(),
             accepted_ext_messages: Default::default(),
+            accepted_ext_messages_buffer: Default::default(),
             rejected_ext_messages: Default::default(),
+            rejected_ext_messages_buffer: Default::default(),
             accepted_remp_messages: Default::default(),
             rejected_remp_messages: Default::default(),
             ignored_remp_messages: Default::default(),
             usage_tree,
             imported_visited: HashSet::new(),
+            tx_last_lt_buffer: Default::default(),
             gen_utime,
             config,
             start_lt: None,
@@ -303,8 +369,11 @@
             prev_stuff: None,
             shards: None,
             mint_msg: None,
+            mint_msg_buffer: BTreeMap::new(),
             recover_create_msg: None,
+            recover_create_msg_buffer: BTreeMap::new(),
             copyleft_msgs: Default::default(),
+            copyleft_msgs_buffer: BTreeMap::new(),
             skip_topmsgdescr: false,
             skip_extmsg: false,
             shard_conf_adjusted: false,
@@ -312,6 +381,7 @@
             block_create_total: 0,
             inbound_queues_empty: false,
             last_proc_int_msg: (0, UInt256::default()),
+            last_proc_int_msg_buffer: Default::default(),
             want_merge: false,
             underload_history: prev_data.underload_history() << 1,
             want_split: false,
@@ -324,6 +394,7 @@
             out_msg_count: 0,
             in_msg_count: 0,
             before_split: false,
+            collated_block_descr,
         };
         Ok(ret)
     }
@@ -342,13 +413,41 @@
         self.out_msgs.data().cloned().ok_or_else(|| error!("out msg descr is empty"))
     }
 
+    /// Stores processed internal message LT HASH to the buffer
+    fn add_last_proc_int_msg_to_buffer(&mut self, src_msg_sync_key: usize, lt_hash: (u64, UInt256)) {
+        log::trace!(
+            "{}: added last_proc_int_msg {}: ({}, {:x}) to buffer",
+            self.collated_block_descr,
+            src_msg_sync_key, lt_hash.0, lt_hash.1,
+        );
+        self.last_proc_int_msg_buffer.insert(src_msg_sync_key, lt_hash);
+    }
+    /// Cleans processed internal message LT HASH out of the buffer by src msg
+    fn revert_last_proc_int_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) {
+        let lt_hash = self.last_proc_int_msg_buffer.remove(src_msg_sync_key);
+        log::trace!(
+            "{}: removed last_proc_int_msg {}: ({:?}) from buffer",
+            self.collated_block_descr,
+            src_msg_sync_key, lt_hash,
+        );
+    }
+    /// Updates the last processed internal message LT HASH from the not reverted entries in the buffer
+    fn commit_last_proc_int_msg(&mut self) -> Result<()> {
+        log::trace!("{}: last_proc_int_msg_buffer: {:?}", self.collated_block_descr, self.last_proc_int_msg_buffer);
+        while let Some((_, lt_hash)) = self.last_proc_int_msg_buffer.pop_first() {
+            self.update_last_proc_int_msg(lt_hash)?;
+        }
+        Ok(())
+    }
+
     fn update_last_proc_int_msg(&mut self, new_lt_hash: (u64, UInt256)) -> Result<()> {
         if self.last_proc_int_msg < new_lt_hash {
             CHECK!(new_lt_hash.0 > 0);
-            log::trace!("last_proc_int_msg updated to ({},{:x})", new_lt_hash.0, new_lt_hash.1);
+            log::trace!("{}: last_proc_int_msg updated to ({},{:x})", self.collated_block_descr, new_lt_hash.0, new_lt_hash.1);
             self.last_proc_int_msg = new_lt_hash;
         } else {
-            log::error!("processed message ({},{:x}) AFTER message ({},{:x})", new_lt_hash.0, new_lt_hash.1,
+            log::error!("{}: processed message ({},{:x}) AFTER message ({},{:x})",
                self.collated_block_descr, new_lt_hash.0, new_lt_hash.1,
                 self.last_proc_int_msg.0, self.last_proc_int_msg.1);
             self.last_proc_int_msg.0 = std::u64::MAX;
             fail!("internal message processing order violated!")
         }
         Ok(())
     }
 
     fn update_lt(&mut self, lt: u64) {
-        self.block_limit_status.update_lt(lt);
+        self.block_limit_status.update_lt(lt, false);
+    }
+
+    /// Stores the transaction last LT to the buffer by src msg, updates block_limit_status
+    fn add_tx_last_lt_to_buffer(&mut self, src_msg_sync_key: usize, tx_last_lt: u64) {
+        self.tx_last_lt_buffer.insert(src_msg_sync_key, tx_last_lt);
+        self.block_limit_status.update_lt(tx_last_lt, false);
+    }
+    /// Cleans the transaction last LT out of the buffer by src msg
+    fn revert_tx_last_lt_by_src_msg(&mut self, src_msg_sync_key: &usize) {
+        self.tx_last_lt_buffer.remove(src_msg_sync_key);
+    }
+    /// Saves the max transaction last LT to block_limit_status and returns the value
+    fn commit_tx_last_lt(&mut self) -> Option<u64> {
+        if let Some(max_lt) = self.tx_last_lt_buffer.values().reduce(|curr, next| curr.max(next)) {
+            self.block_limit_status.update_lt(*max_lt, true);
+            Some(*max_lt)
+        } else {
+            None
+        }
     }
 
     /// add in and out messages to the block, and to the new message queue
-    fn new_transaction(&mut self, transaction: &Transaction, tr_cell: Cell, in_msg_opt: Option<&InMsg>) -> Result<()> {
+    fn new_transaction(
+        &mut self,
+        transaction: &Transaction,
+        tr_cell: Cell,
+        in_msg_opt: Option<&InMsg>,
+        src_msg_sync_key: usize,
+    ) -> Result<()> {
         // log::trace!(
         //     "new transaction, message {:x}\n{}",
         //     in_msg_opt.map(|m| m.message_cell().unwrap().repr_hash()).unwrap_or_default(),
@@ -374,6 +498,7 @@
         self.block_limit_status.add_transaction(transaction.logical_time() == self.start_lt()? + 1);
         if let Some(in_msg) = in_msg_opt {
             self.add_in_msg_to_block(in_msg)?;
+            self.add_in_msg_descr_to_history(src_msg_sync_key, in_msg)?;
         }
         let shard = self.out_msg_queue_info.shard().clone();
         transaction.out_msgs.iterate_slices(|slice| {
@@ -386,17 +511,24 @@
                     let use_hypercube = !self.config.has_capability(GlobalCapabilities::CapOffHypercube);
                     let fwd_fee = *info.fwd_fee();
                     let enq = MsgEnqueueStuff::new(msg.clone(), &shard, fwd_fee, use_hypercube)?;
-                    self.enqueue_count += 1;
-                    self.out_msg_queue_info.add_message(&enq)?;
-                    // Add to message block here for counting time later it may be replaced
                     let out_msg = OutMsg::new(enq.envelope_cell(), tr_cell.clone());
-                    self.add_out_msg_to_block(msg_hash.clone(), &out_msg)?;
-                    self.new_messages.push(NewMessage::new((info.created_lt, msg_hash), msg, tr_cell.clone(), enq.next_prefix().clone()));
+                    let new_msg = NewMessage::new((info.created_lt, msg_hash.clone()), msg, tr_cell.clone(), enq.next_prefix().clone());
+
+                    self.add_out_queue_msg_with_history(src_msg_sync_key, enq)?;
+
+                    // Add to the message block here for counting; later it may be replaced
+                    let prev_out_msg_slice_opt = self.add_out_msg_to_block(msg_hash.clone(), &out_msg)?;
+                    self.add_out_msg_descr_to_history(src_msg_sync_key, msg_hash, prev_out_msg_slice_opt);
+
+                    self.add_new_message_to_buffer(src_msg_sync_key, new_msg);
                 }
                 CommonMsgInfo::ExtOutMsgInfo(_) => {
                     let out_msg = OutMsg::external(msg_cell, tr_cell.clone());
-                    self.add_out_msg_to_block(out_msg.read_message_hash()?, &out_msg)?;
+                    let msg_hash = out_msg.read_message_hash()?;
+                    let prev_out_msg_slice_opt = self.add_out_msg_to_block(msg_hash.clone(), &out_msg)?;
+                    self.add_out_msg_descr_to_history(src_msg_sync_key, msg_hash, prev_out_msg_slice_opt);
                 }
                 CommonMsgInfo::ExtInMsgInfo(_) => fail!("External inbound message cannot be output")
             };
@@ -408,30 +540,159 @@
 
     /// put InMsg to block
     fn add_in_msg_to_block(&mut self, in_msg: &InMsg) -> Result<()> {
         self.in_msg_count += 1;
-        let msg_cell = in_msg.serialize()?;
         self.in_msgs.insert(in_msg)?;
+
+        let msg_cell = in_msg.serialize()?;
         self.block_limit_status.register_in_msg_op(&msg_cell, &self.in_msgs_root()?)
     }
+    /// Stores the in_msg descr hash by src msg
+    fn add_in_msg_descr_to_history(&mut self, src_msg_sync_key: usize, in_msg: &InMsg) -> Result<()> {
+        let msg_hash = in_msg.message_cell()?.repr_hash();
+        self.in_msgs_descr_history.insert(src_msg_sync_key, msg_hash);
+        Ok(())
+    }
+    /// Removes the in_msg descr created by src msg. Does not update block_limit_status
+    fn revert_in_msgs_descr_by_src_msg(&mut self, src_msg_sync_key: &usize) -> Result<()> {
+        if let Some(msg_hash) = self.in_msgs_descr_history.remove(src_msg_sync_key) {
+            self.in_msg_count -= 1;
+            let key = SliceData::load_builder(msg_hash.write_to_new_cell()?)?;
+            self.in_msgs.remove(key)?;
+        }
+        Ok(())
+    }
+    /// Cleans out the in_msg descr history
+    fn commit_in_msgs_descr_by_src_msg(&mut self) {
+        self.in_msgs_descr_history.clear();
+        self.in_msgs_descr_history.shrink_to_fit();
+    }
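`add_out_msg_to_block` below depends on `insert_with_key_return_prev`, an insert that hands back any descriptor it displaced, so a revert can restore exactly what was overwritten. A minimal sketch of the restore-on-revert idea over a plain HashMap (hypothetical stand-in, not the real `OutMsgDescr` API):

use std::collections::HashMap;

// Insert that returns the displaced value, so the caller can stash it
// and put it back if the change has to be reverted.
fn insert_return_prev(map: &mut HashMap<u32, String>, key: u32, value: String) -> Option<String> {
    map.insert(key, value) // HashMap::insert already returns the previous value
}

fn main() {
    let mut descrs = HashMap::new();
    descrs.insert(7, "original".to_string());

    // Message processing replaces the descriptor, keeping the old one.
    let prev = insert_return_prev(&mut descrs, 7, "replacement".to_string());

    // Revert: restore the previous descriptor if there was one, else remove.
    match prev {
        Some(old) => { descrs.insert(7, old); }
        None => { descrs.remove(&7); }
    }
    assert_eq!(descrs[&7], "original");
}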
 
     /// put OutMsg to block
-    fn add_out_msg_to_block(&mut self, key: UInt256, out_msg: &OutMsg) -> Result<()> {
+    fn add_out_msg_to_block(&mut self, key: UInt256, out_msg: &OutMsg) -> Result<Option<SliceData>> {
         self.out_msg_count += 1;
-        self.out_msgs.insert_with_key(key, out_msg)?;
+
+        let prev_value = self.out_msgs.insert_with_key_return_prev(key, out_msg)?;
 
         let msg_cell = out_msg.serialize()?;
-        self.block_limit_status.register_out_msg_op(&msg_cell, &self.out_msgs_root()?)
+        self.block_limit_status.register_out_msg_op(&msg_cell, &self.out_msgs_root()?)?;
+
+        Ok(prev_value)
+    }
+    /// put OutMsg to block, does not update block_limit_status
+    fn add_out_msg_to_block_without_limits_update(&mut self, key: UInt256, out_msg: &OutMsg) -> Result<Option<SliceData>> {
+        self.out_msg_count += 1;
+
+        self.out_msgs.insert_with_key_return_prev(key, out_msg)
+    }
+
+    /// Stores the out_msg descr hash by src msg
+    fn add_out_msg_descr_to_history(
+        &mut self,
+        src_msg_sync_key: usize,
+        out_msg_hash: UInt256,
+        prev_out_msg_slice_opt: Option<SliceData>,
+    ) {
+        if let Some(v) = self.out_msgs_descr_history.get_mut(&src_msg_sync_key) {
+            v.push((out_msg_hash, prev_out_msg_slice_opt));
+        } else {
+            self.out_msgs_descr_history.insert(src_msg_sync_key, vec![(out_msg_hash, prev_out_msg_slice_opt)]);
+        }
+    }
+    /// Removes all out_msg descrs created by src msg. Does not update block_limit_status
+    fn revert_out_msgs_descr_by_src_msg(&mut self, src_msg_sync_key: &usize) -> Result<()> {
+        if let Some(msgs_history) = self.out_msgs_descr_history.remove(src_msg_sync_key) {
+            for (msg_hash, prev_out_msg_slice_opt) in msgs_history {
+                self.out_msg_count -= 1;
+
+                // return the prev out msg descr to the map if it exists
+                if let Some(mut prev_out_msg_slice) = prev_out_msg_slice_opt {
+                    log::debug!("{}: previous out msg descr {:x} reverted to block", self.collated_block_descr, msg_hash);
+                    let prev_out_msg = OutMsg::construct_from(&mut prev_out_msg_slice)?;
+                    self.add_out_msg_to_block_without_limits_update(msg_hash, &prev_out_msg)?;
+                } else {
+                    let key = SliceData::load_builder(msg_hash.write_to_new_cell()?)?;
+                    self.out_msgs.remove(key)?;
+                }
+            }
+        }
+        Ok(())
+    }
+    /// Cleans out the out_msg descrs history
+    fn commit_out_msgs_descr_by_src_msg(&mut self) {
+        self.out_msgs_descr_history.clear();
+        self.out_msgs_descr_history.shrink_to_fit();
+    }
+
+    /// Stores an accepted ext message id in the buffer by src msg sync key
+    fn add_accepted_ext_message_to_buffer(&mut self, src_msg_sync_key: usize, msg_id: UInt256) {
+        self.accepted_ext_messages_buffer.insert(src_msg_sync_key, msg_id);
+    }
+    /// Cleans an accepted ext message id out of the buffer
+    fn revert_accepted_ext_message_by_src_msg(&mut self, src_msg_sync_key: &usize) {
+        self.accepted_ext_messages_buffer.remove(src_msg_sync_key);
+    }
+    /// Adds accepted ext messages from the buffer to collator data
+    fn commit_accepted_ext_messages(&mut self) {
+        for (_, msg_id) in self.accepted_ext_messages_buffer.drain() {
+            self.accepted_ext_messages.push(msg_id);
+        }
+    }
+
+    /// Stores rejected ext message info in the buffer by src msg sync key
+    fn add_rejected_ext_message_to_buffer(&mut self, src_msg_sync_key: usize, rejected_msg: (UInt256, String)) {
+        self.rejected_ext_messages_buffer.insert(src_msg_sync_key, rejected_msg);
+    }
+    /// Cleans rejected ext message info out of the buffer
+    fn revert_rejected_ext_message_by_src_msg(&mut self, src_msg_sync_key: &usize) {
+        self.rejected_ext_messages_buffer.remove(src_msg_sync_key);
+    }
+    /// Adds rejected ext messages info from the buffer to collator data
+    fn commit_rejected_ext_messages(&mut self) {
+        for (_, msg_info) in self.rejected_ext_messages_buffer.drain() {
+            self.rejected_ext_messages.push(msg_info);
+        }
     }
 
     /// delete message from state queue
-    fn del_out_msg_from_state(&mut self, key: &OutMsgQueueKey) -> Result<()> {
-        log::debug!("del_out_msg_from_state {:x}", key);
+    fn del_out_msg_from_state(&mut self, key: &OutMsgQueueKey) -> Result<EnqueuedMsg> {
+        log::debug!("{}: del_out_msg_from_state {:x}", self.collated_block_descr, key);
         self.dequeue_count += 1;
-        self.out_msg_queue_info.del_message(key)?;
+        let enq = self.out_msg_queue_info.del_message(key)?;
         self.block_limit_status.register_out_msg_queue_op(
             self.out_msg_queue_info.out_queue()?.data(),
             &self.usage_tree,
             false
         )?;
+        Ok(enq)
+    }
+
+    /// Removes a msg from the out queue, stores the msg in the history to be able to revert it later
+    fn del_out_queue_msg_with_history(&mut self, src_msg_sync_key: usize, key: OutMsgQueueKey, is_new: bool) -> Result<()> {
+        log::debug!("{}: del_out_queue_msg_with_history {:x}", self.collated_block_descr, key);
+        if is_new { self.enqueue_count -= 1; } else { self.dequeue_count += 1; }
+        let enq = self.out_msg_queue_info.del_message(&key)?;
+        self.block_limit_status.register_out_msg_queue_op(
+            self.out_msg_queue_info.out_queue()?.data(),
+            &self.usage_tree,
+            false
+        )?;
+        self.del_out_queue_msg_history.insert(src_msg_sync_key, (key, enq, is_new));
+        Ok(())
+    }
+    /// Reverts a previously removed msg back into the out queue
+    fn revert_del_out_queue_msg(&mut self, src_msg_sync_key: &usize) -> Result<()> {
+        if let Some((key, enq, is_new)) = self.del_out_queue_msg_history.remove(src_msg_sync_key) {
+            if is_new { self.enqueue_count += 1; } else { self.dequeue_count -= 1; }
+            let enq_stuff = MsgEnqueueStuff::from_enqueue(enq)?;
+            self.out_msg_queue_info.add_message(&enq_stuff)?;
+            log::debug!("{}: reverted del_out_queue_msg {:x}", self.collated_block_descr, key);
+        }
+        Ok(())
+    }
+    /// Clears the out queue msgs removal history
+    fn commit_del_out_queue_msgs(&mut self) -> Result<()> {
+        self.del_out_queue_msg_history.clear();
+        self.del_out_queue_msg_history.shrink_to_fit();
         Ok(())
     }
@@ -447,6 +708,112 @@
         Ok(())
     }
 
+    /// Adds a new msg to the out queue, stores history to be able to revert it later
+    fn add_out_queue_msg_with_history(&mut self, src_msg_sync_key: usize, enq_stuff: MsgEnqueueStuff) -> Result<()> {
+        self.enqueue_count += 1;
+        self.out_msg_queue_info.add_message(&enq_stuff)?;
+        let key = enq_stuff.out_msg_key();
+        if let Some(v) = self.add_out_queue_msg_history.get_mut(&src_msg_sync_key) {
+            v.push(key);
+        } else {
+            self.add_out_queue_msg_history.insert(src_msg_sync_key, vec![key]);
+        }
+        Ok(())
+    }
+    /// Removes previously added new msgs from the out queue
+    fn revert_add_out_queue_msgs(&mut self, src_msg_sync_key: &usize) -> Result<()> {
+        if let Some(keys) = self.add_out_queue_msg_history.remove(src_msg_sync_key) {
+            let remove_count = keys.len();
+            for key in keys {
+                self.enqueue_count -= 1;
+                self.out_msg_queue_info.del_message(&key)?;
+            }
+            log::debug!("{}: {} new created messages removed from out queue", self.collated_block_descr, remove_count);
+        }
+        Ok(())
+    }
+    /// Clears the out queue msgs addition history
+    fn commit_add_out_queue_msgs(&mut self) -> Result<()> {
+        self.add_out_queue_msg_history.clear();
+        self.add_out_queue_msg_history.shrink_to_fit();
+        Ok(())
+    }
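Deleting a queue entry stashes `(key, enqueued msg, is_new)` so the revert can re-add exactly what was removed. A toy round trip over plain maps showing delete-with-history and revert (hypothetical types standing in for the out queue):

use std::collections::HashMap;

// Out queue keyed by message key; deletions are journaled per sync key
// so they can be undone, as in del_out_queue_msg_with_history() above.
fn main() {
    let mut queue: HashMap<u32, String> = HashMap::new();
    let mut del_history: HashMap<usize, (u32, String)> = HashMap::new();

    queue.insert(42, "enqueued msg".to_string());

    // delete with history (sync key 0)
    let removed = queue.remove(&42).unwrap();
    del_history.insert(0, (42, removed));

    // revert: put the exact removed entry back
    if let Some((key, enq)) = del_history.remove(&0) {
        queue.insert(key, enq);
    }
    assert_eq!(queue[&42], "enqueued msg");
}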
 
+    /// Stores a new internal msg, created by src msg, to the buffer
+    fn add_new_message_to_buffer(&mut self, src_msg_sync_key: usize, new_msg: NewMessage) {
+        if let Some(v) = self.new_messages_buffer.get_mut(&src_msg_sync_key) {
+            v.push(new_msg);
+        } else {
+            self.new_messages_buffer.insert(src_msg_sync_key, vec![new_msg]);
+        }
+    }
+    /// Cleans new internal msgs, created by src msg, out of the buffer
+    fn revert_new_messages_by_src_msg(&mut self, src_msg_sync_key: &usize) {
+        self.new_messages_buffer.remove(src_msg_sync_key);
+    }
+    /// Adds new internal msgs to the new_messages queue for processing
+    fn commit_new_messages(&mut self) {
+        let new_msgs_count = self.new_messages_buffer.len();
+        while let Some((_, msgs)) = self.new_messages_buffer.pop_first() {
+            for new_msg in msgs {
+                log::trace!(
+                    "{}: committed new created msg {:x} (bounced: {:?}) from {:x} to account {:x} from buffer to new_messages",
+                    self.collated_block_descr, new_msg.lt_hash.1,
+                    new_msg.msg.int_header().map(|h| h.bounced),
+                    new_msg.msg.src().unwrap_or_default().address(),
+                    new_msg.msg.dst().unwrap_or_default().address(),
+                );
+                self.new_messages.push(new_msg);
+            }
+        }
+        log::debug!("{}: {} new created messages committed from buffer to new_messages", self.collated_block_descr, new_msgs_count);
+    }
+
+    /// Stores a mint message in the buffer by src msg
+    fn add_mint_msg_to_buffer(&mut self, src_msg_sync_key: usize, msg: Option<InMsg>) {
+        self.mint_msg_buffer.insert(src_msg_sync_key, msg);
+    }
+    /// Cleans the mint message out of the buffer by src msg
+    fn revert_mint_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) {
+        self.mint_msg_buffer.remove(src_msg_sync_key);
+    }
+    /// Saves the last processed and not reverted mint message to collator data
+    fn commit_mint_msg(&mut self) {
+        if let Some((_, v)) = self.mint_msg_buffer.pop_last() {
+            self.mint_msg = v;
+        }
+    }
+
+    /// Stores a recover create message in the buffer by src msg
+    fn add_recover_create_msg_to_buffer(&mut self, src_msg_sync_key: usize, msg: Option<InMsg>) {
+        self.recover_create_msg_buffer.insert(src_msg_sync_key, msg);
+    }
+    /// Cleans the recover create message out of the buffer by src msg
+    fn revert_recover_create_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) {
+        self.recover_create_msg_buffer.remove(src_msg_sync_key);
+    }
+    /// Saves the last processed and not reverted recover create message to collator data
+    fn commit_recover_create_msg(&mut self) {
+        if let Some((_, v)) = self.recover_create_msg_buffer.pop_last() {
+            self.recover_create_msg = v;
+        }
+    }
+
+    /// Stores a copyleft message in the buffer by src msg
+    fn add_copyleft_msg_to_buffer(&mut self, src_msg_sync_key: usize, msg: InMsg) {
+        self.copyleft_msgs_buffer.insert(src_msg_sync_key, msg);
+    }
+    /// Cleans the copyleft message out of the buffer by src msg
+    fn revert_copyleft_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) {
+        self.copyleft_msgs_buffer.remove(src_msg_sync_key);
+    }
+    /// Saves all not reverted copyleft messages to collator data
+    fn commit_copyleft_msgs(&mut self) {
+        while let Some((_, msg)) = self.copyleft_msgs_buffer.pop_first() {
+            self.copyleft_msgs.push(msg);
+        }
+    }
+
     fn enqueue_transit_message(
         &mut self,
         shard: &ShardIdent,
@@ -535,7 +902,7 @@
         if self.start_lt.is_some() {
             fail!("`start_lt` is already initialized")
         }
-        self.block_limit_status.update_lt(lt);
+        self.block_limit_status.update_lt(lt, false);
         self.start_lt = Some(lt);
         Ok(())
     }
@@ -603,7 +970,7 @@
     fn shard_conf_adjusted(&self) -> bool { self.shard_conf_adjusted }
     fn set_shard_conf_adjusted(&mut self) { self.shard_conf_adjusted = true; }
 
-    fn dequeue_message(&mut self, enq: MsgEnqueueStuff, deliver_lt: u64, short: bool) -> Result<()> {
+    fn dequeue_message(&mut self, enq: MsgEnqueueStuff, deliver_lt: u64, short: bool) -> Result<Option<SliceData>> {
         self.dequeue_count += 1;
         let out_msg = match short {
             true => OutMsg::dequeue_short(enq.envelope_hash(), enq.next_prefix(), deliver_lt),
@@ -641,25 +1008,106 @@
     }
 }
 
+struct ParallelMsgsCounter {
+    max_parallel_threads: usize,
+    max_msgs_queue_on_account: usize,
+
+    limits_reached: Arc<AtomicBool>,
+    msgs_by_accounts: Arc<Mutex<(usize, HashMap<AccountId, usize>)>>,
+}
+
+impl ParallelMsgsCounter {
+    pub fn new(max_parallel_threads: usize, max_msgs_queue_on_account: usize) -> Self {
+        Self {
+            max_parallel_threads: max_parallel_threads.max(1),
+            max_msgs_queue_on_account: max_msgs_queue_on_account.max(1),
+
+            limits_reached: Arc::new(AtomicBool::new(false)),
+
+            msgs_by_accounts: Arc::new(Mutex::new((0, HashMap::new()))),
+        }
+    }
+
+    pub fn limits_reached(&self) -> bool {
+        self.limits_reached.load(Ordering::Relaxed)
+    }
+    fn set_limits_reached(&self, val: bool) {
+        self.limits_reached.store(val, Ordering::Relaxed);
+    }
+
+    pub async fn add_account_msgs_counter(&self, account_id: AccountId) {
+        let account_id_str = format!("{:x}", account_id);
+        let mut guard = self.msgs_by_accounts.clone().lock_owned().await;
+        let (active_threads, msgs_by_account) = &mut *guard;
+        let msgs_count = msgs_by_account
+            .entry(account_id)
+            .and_modify(|val| {
+                if *val == 0 {
+                    *active_threads += 1;
+                }
+                *val += 1;
+            })
+            .or_insert_with(|| {
+                *active_threads += 1;
+                1
+            });
+        if *msgs_count >= self.max_msgs_queue_on_account || *active_threads >= self.max_parallel_threads {
+            self.set_limits_reached(true);
+        }
+
+        log::trace!("ParallelMsgsCounter: msgs count increased for {}, counter state is: ({}, {:?})", account_id_str, active_threads, msgs_by_account);
+    }
+
+    pub async fn sub_account_msgs_counter(&self, account_id: AccountId) {
+        let account_id_str = format!("{:x}", account_id);
+        let mut guard = self.msgs_by_accounts.clone().lock_owned().await;
+        let (active_threads, msgs_by_account) = &mut *guard;
+        let msgs_count = msgs_by_account
+            .entry(account_id)
+            .and_modify(|val| {
+                *val -= 1;
+                if *val == 0 {
+                    *active_threads -= 1;
+                }
+            })
+            .or_insert(0);
+        if *msgs_count < self.max_msgs_queue_on_account {
+            if *active_threads < self.max_parallel_threads && msgs_by_account.values().all(|c| *c < self.max_msgs_queue_on_account) {
+                self.set_limits_reached(false);
+            }
+        }
+
+        log::trace!("ParallelMsgsCounter: msgs count decreased for {}, counter state is: ({}, {:?})", account_id_str, active_threads, msgs_by_account);
+    }
+}
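The counter tracks how many messages are queued per account and how many accounts currently have a non-empty queue (the "active threads"); `limits_reached` throttles the dispatcher when either bound is hit. A simplified synchronous sketch of the same bookkeeping (no async locking, hypothetical types):

use std::collections::HashMap;

// Tracks (accounts with pending msgs, msgs queued per account) and a
// throttle flag, mirroring ParallelMsgsCounter's bookkeeping.
struct Counter {
    max_threads: usize,
    max_per_account: usize,
    active: usize,
    per_account: HashMap<&'static str, usize>,
    limits_reached: bool,
}

impl Counter {
    fn add(&mut self, account: &'static str) {
        let n = self.per_account.entry(account).or_insert(0);
        if *n == 0 { self.active += 1; }
        *n += 1;
        if *n >= self.max_per_account || self.active >= self.max_threads {
            self.limits_reached = true;
        }
    }
    fn sub(&mut self, account: &'static str) {
        if let Some(n) = self.per_account.get_mut(account) {
            *n -= 1;
            if *n == 0 { self.active -= 1; }
        }
        // Re-enable dispatch only when every bound is satisfied again.
        if self.active < self.max_threads
            && self.per_account.values().all(|n| *n < self.max_per_account)
        {
            self.limits_reached = false;
        }
    }
}

fn main() {
    let mut c = Counter { max_threads: 2, max_per_account: 3, active: 0, per_account: HashMap::new(), limits_reached: false };
    c.add("a"); c.add("b"); // two active account queues -> thread limit hit
    assert!(c.limits_reached);
    c.sub("b");
    assert!(!c.limits_reached);
}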
 
 struct ExecutionManager {
     changed_accounts: HashMap<
         AccountId,
         (
-            tokio::sync::mpsc::UnboundedSender<Arc<AsyncMessage>>,
+            tokio::sync::mpsc::UnboundedSender<Arc<AsyncMessageSync>>,
             tokio::task::JoinHandle<Result<ShardAccountStuff>>
         )
     >,
-
-    receive_tr: tokio::sync::mpsc::UnboundedReceiver<Option<(Arc<AsyncMessage>, Result<Transaction>)>>,
-    wait_tr: Arc<Wait<Option<(Arc<AsyncMessage>, Result<Transaction>)>>>,
+
+    msgs_queue: Vec<(AccountId, bool, Option<UInt256>)>,
+    accounts_processed_msgs: HashMap<AccountId, Vec<usize>>,
+
+    cancellation_token: tokio_util::sync::CancellationToken,
+    f_check_finalize_parallel_timeout: Box<dyn Fn() -> (bool, u32) + Send>,
+
+    receive_tr: tokio::sync::mpsc::UnboundedReceiver<Option<(Arc<AsyncMessageSync>, Result<Transaction>, u64)>>,
+    wait_tr: Arc<Wait<Option<(Arc<AsyncMessageSync>, Result<Transaction>, u64)>>>,
     max_collate_threads: usize,
     libraries: Libraries,
     gen_utime: u32,
 
+    parallel_msgs_counter: ParallelMsgsCounter,
+
+    // block's start logical time
     start_lt: u64,
     // actual maximum logical time
-    max_lt: Arc<AtomicU64>,
+    max_lt: u64,
     // this time is used if account's lt is smaller
     min_lt: Arc<AtomicU64>,
     // block random seed
@@ -673,6 +1121,9 @@
     collated_block_descr: Arc<String>,
     debug: bool,
     config: BlockchainConfig,
+
+    #[cfg(test)]
+    test_msg_process_sleep: u64,
 }
 
 impl ExecutionManager {
     pub fn new(
@@ -685,16 +1136,23 @@
         libraries: Libraries,
         config: BlockchainConfig,
         max_collate_threads: usize,
+        max_collate_msgs_queue_on_account: usize,
         collated_block_descr: Arc<String>,
         debug: bool,
+        f_check_finalize_parallel_timeout: Box<dyn Fn() -> (bool, u32) + Send>,
     ) -> Result<Self> {
         log::trace!("{}: ExecutionManager::new", collated_block_descr);
         let (wait_tr, receive_tr) = Wait::new();
         Ok(Self {
             changed_accounts: HashMap::new(),
+            msgs_queue: Vec::new(),
+            accounts_processed_msgs: HashMap::new(),
+            cancellation_token: tokio_util::sync::CancellationToken::new(),
+            f_check_finalize_parallel_timeout,
             receive_tr,
             wait_tr,
             max_collate_threads,
+            parallel_msgs_counter: ParallelMsgsCounter::new(max_collate_threads, max_collate_msgs_queue_on_account),
             libraries,
             config,
             start_lt,
@@ -702,28 +1160,54 @@
             seed_block,
             #[cfg(feature = "signature_with_id")]
             signature_id,
-            max_lt: Arc::new(AtomicU64::new(start_lt + 1)),
+            max_lt: start_lt + 1,
             min_lt: Arc::new(AtomicU64::new(start_lt + 1)),
             total_trans_duration: Arc::new(AtomicU64::new(0)),
             collated_block_descr,
             debug,
+            #[cfg(test)]
+            test_msg_process_sleep: 0,
         })
     }
 
+    #[cfg(test)]
+    pub fn set_test_msg_process_sleep(&mut self, sleep_timeout: u64) {
+        self.test_msg_process_sleep = sleep_timeout;
+    }
+
     // waits and finalizes all parallel tasks
-    pub async fn wait_transactions(&mut self, collator_data: &mut CollatorData) -> Result<()> {
+    pub async fn wait_transactions(
+        &mut self,
+        collator_data: &mut CollatorData,
+    ) -> Result<()> {
         log::trace!("{}: wait_transactions", self.collated_block_descr);
+        if self.is_parallel_processing_cancelled() {
+            log::debug!("{}: parallel collation was already stopped, do not wait for transactions anymore", self.collated_block_descr);
+            return Ok(());
+        }
         while self.wait_tr.count() > 0 {
+            log::trace!("{}: wait_tr count = {}", self.collated_block_descr, self.wait_tr.count());
             self.wait_transaction(collator_data).await?;
+
+            // stop parallel collation if the finalize timeout is reached
+            let check_finalize_parallel = (self.f_check_finalize_parallel_timeout)();
+            if check_finalize_parallel.0 {
+                log::warn!("{}: FINALIZE PARALLEL TIMEOUT ({}ms) is elapsed, stop parallel collation",
                    self.collated_block_descr, check_finalize_parallel.1,
                );
+                self.cancel_parallel_processing();
+                break;
+            }
         }
-        self.min_lt.fetch_max(self.max_lt.load(Ordering::Relaxed), Ordering::Relaxed);
+        self.commit_processed_msgs_changes(collator_data)?;
+        self.min_lt.fetch_max(self.max_lt, Ordering::Relaxed);
         Ok(())
     }
 
-    // checks if the number of parallel transactions is not too big, waits and finalizes some if needed
+    // checks whether the limits of parallel transactions are reached; waits for and finalizes some if needed
     pub async fn check_parallel_transactions(&mut self, collator_data: &mut CollatorData) -> Result<()> {
         log::trace!("{}: check_parallel_transactions", self.collated_block_descr);
-        if self.wait_tr.count() >= self.max_collate_threads {
+        if self.parallel_msgs_counter.limits_reached() {
            self.wait_transaction(collator_data).await?;
        }
        Ok(())
    }
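`f_check_finalize_parallel_timeout` is only a callback reporting whether the finalize-parallel deadline has elapsed, plus the configured timeout for logging. One plausible way to build it (a sketch under assumed wiring; the collator's actual construction may differ):

use std::time::Instant;

// Hypothetical stand-in for CollatorConfig::get_finalize_parallel_timeout_ms().
fn finalize_parallel_timeout_ms() -> u32 { 1200 }

fn make_check(started: Instant) -> Box<dyn Fn() -> (bool, u32) + Send> {
    let timeout_ms = finalize_parallel_timeout_ms();
    Box::new(move || {
        let elapsed = started.elapsed().as_millis() as u32;
        (elapsed > timeout_ms, timeout_ms) // (deadline passed?, timeout for logging)
    })
}

fn main() {
    let check = make_check(Instant::now());
    let (elapsed, timeout_ms) = check();
    assert!(!elapsed); // right after start the deadline cannot have passed
    assert_eq!(timeout_ms, 1200);
}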
 
     pub async fn execute(
         &mut self,
         account_id: AccountId,
         msg: AsyncMessage,
         prev_data: &PrevData,
         collator_data: &mut CollatorData,
-    ) -> Result<()> {
+    ) -> Result<Option<usize>> {
         log::trace!("{}: execute (adding into queue): {:x}", self.collated_block_descr, account_id);
-        let msg = Arc::new(msg);
+        let msg_sync_key = self.get_next_msg_sync_key();
+
+        // store the last processed internal (incl. New) message LT HASH in the buffer
+        if let Some(lt_hash) = match &msg {
+            AsyncMessage::Int(enq, _) => Some((enq.created_lt(), enq.message_hash())),
+            AsyncMessage::New(env, _, created_lt) => Some((*created_lt, env.message_hash())),
+            _ => None,
+        } {
+            collator_data.add_last_proc_int_msg_to_buffer(msg_sync_key, lt_hash);
+        }
+
+        let msg_hash = msg.compute_message_hash()?;
+
+        let msg = Arc::new(AsyncMessageSync(msg_sync_key, msg));
         if let Some((sender, _handle)) = self.changed_accounts.get(&account_id) {
             self.wait_tr.request();
             sender.send(msg)?;
         } else {
             let shard_acc = if let Some(shard_acc) = prev_data.accounts().account(&account_id)? {
                 shard_acc
-            } else if msg.is_external() {
-                return Ok(()); // skip external messages for nonexistent accounts
+            } else if msg.1.is_external() {
+                return Ok(None); // skip external messages for nonexistent accounts
             } else {
                 ShardAccount::default()
             };
@@ -755,25 +1252,28 @@
             )?;
             self.wait_tr.request();
             sender.send(msg)?;
-            self.changed_accounts.insert(account_id, (sender, handle));
-        }
+            self.changed_accounts.insert(account_id.clone(), (sender, handle));
+        };
+
+        self.append_msgs_queue(msg_sync_key, &account_id, msg_hash);
+        self.parallel_msgs_counter.add_account_msgs_counter(account_id).await;
 
         self.check_parallel_transactions(collator_data).await?;
 
-        Ok(())
+        Ok(Some(msg_sync_key))
     }
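`execute()` above gives each changed account its own unbounded mpsc channel plus a spawned task, so messages to one account are applied strictly in order while different accounts run in parallel. A compact sketch of this actor-per-account dispatch (tokio with the rt-multi-thread feature, simplified payloads):

use std::collections::HashMap;
use tokio::sync::mpsc;

// One worker per account: in-order processing per account,
// parallelism across accounts.
async fn run() {
    let mut workers: HashMap<&'static str, mpsc::UnboundedSender<u32>> = HashMap::new();

    for (account, msg) in [("a", 1), ("b", 2), ("a", 3)] {
        let sender = workers.entry(account).or_insert_with(|| {
            let (tx, mut rx) = mpsc::unbounded_channel();
            tokio::spawn(async move {
                while let Some(m) = rx.recv().await {
                    println!("{account}: processing msg {m}");
                }
            });
            tx
        });
        sender.send(msg).unwrap();
    }

    // Dropping the senders closes the channels; give the workers a chance to drain.
    drop(workers);
    tokio::task::yield_now().await;
}

fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(run());
}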
 
     fn start_account_job(
         &self,
         account_addr: AccountId,
         shard_acc: ShardAccount,
-    ) -> Result<(tokio::sync::mpsc::UnboundedSender<Arc<AsyncMessage>>, tokio::task::JoinHandle<Result<ShardAccountStuff>>)> {
+    ) -> Result<(tokio::sync::mpsc::UnboundedSender<Arc<AsyncMessageSync>>, tokio::task::JoinHandle<Result<ShardAccountStuff>>)> {
         log::trace!("{}: start_account_job: {:x}", self.collated_block_descr, account_addr);
 
         let mut shard_acc = ShardAccountStuff::new(
             account_addr,
             shard_acc,
-            Arc::new(AtomicU64::new(self.min_lt.load(Ordering::Relaxed))),
+            self.min_lt.load(Ordering::Relaxed),
         )?;
 
         let debug = self.debug;
@@ -783,34 +1283,49 @@
         #[cfg(feature = "signature_with_id")]
         let signature_id = self.signature_id;
         let collated_block_descr = self.collated_block_descr.clone();
-        let total_trans_duration = self.total_trans_duration.clone();
-        let wait_tr = self.wait_tr.clone();
+        let exec_mgr_total_trans_duration_rw = self.total_trans_duration.clone();
+        let exec_mgr_wait_tr = self.wait_tr.clone();
         let config = self.config.clone();
-        let min_lt = self.min_lt.clone();
-        let max_lt = self.max_lt.clone();
+        let exec_mgr_min_lt_ro = self.min_lt.clone();
         let libraries = self.libraries.clone().inner();
-        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<Arc<AsyncMessage>>();
+        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<Arc<AsyncMessageSync>>();
+        let cancellation_token = self.cancellation_token.clone();
+        #[cfg(test)]
+        let test_msg_process_sleep = self.test_msg_process_sleep;
 
         let handle = tokio::spawn(async move {
             while let Some(new_msg) = receiver.recv().await {
+                if cancellation_token.is_cancelled() {
+                    log::debug!(
+                        "{}: parallel collation was cancelled before message {} processing on {:x}",
+                        collated_block_descr,
+                        new_msg.0,
+                        shard_acc.account_addr(),
+                    );
+                    exec_mgr_wait_tr.respond(None);
+                    break;
+                }
+
+                #[cfg(test)]
+                {
+                    let sleep_ms = test_msg_process_sleep;
+                    log::trace!("{}: (msg_sync_key: {}) sleep {}ms to emulate hard load and a slow smart contract...", collated_block_descr, new_msg.0, sleep_ms);
+                    tokio::time::sleep(tokio::time::Duration::from_millis(sleep_ms)).await;
+                }
+
                 log::trace!("{}: new message for {:x}", collated_block_descr, shard_acc.account_addr());
                 let config = config.clone(); // TODO: use Arc
-                shard_acc.lt().fetch_max(min_lt.load(Ordering::Relaxed), Ordering::Relaxed);
-                shard_acc.lt().fetch_max(
-                    shard_acc.last_trans_lt() + 1,
-                    Ordering::Relaxed
-                );
-                shard_acc.lt().fetch_max(
-                    shard_acc.last_trans_lt() + 1,
-                    Ordering::Relaxed
-                );
+                let mut lt = shard_acc.lt().max(exec_mgr_min_lt_ro.load(Ordering::Relaxed)); // e.g. 1000
+                lt = lt.max(shard_acc.last_trans_lt() + 1); // e.g. 1010+1=1011
+
+                let tx_last_lt = Arc::new(AtomicU64::new(lt));
 
                 let mut account_root = shard_acc.account_root();
                 let params = ExecuteParams {
                     state_libs: libraries.clone(),
                     block_unixtime,
                     block_lt,
-                    last_tr_lt: shard_acc.lt(),
+                    last_tr_lt: tx_last_lt.clone(), // e.g. 1011, passed by reference
                     seed_block: seed_block.clone(),
                     debug,
                     block_version: supported_version(),
@@ -822,21 +1337,43 @@
                 let (mut transaction_res, account_root, duration) = tokio::task::spawn_blocking(move || {
                     let now = std::time::Instant::now();
                     (
-                        Self::execute_new_message(&new_msg1, &mut account_root, config, params),
+                        Self::execute_new_message(&new_msg1.1, &mut account_root, config, params),
                         account_root,
                         now.elapsed().as_micros() as u64
                     )
                 }).await?;
 
-                if let Ok(transaction) = transaction_res.as_mut() {
-                    shard_acc.add_transaction(transaction, account_root)?;
+                if cancellation_token.is_cancelled() {
+                    log::debug!(
+                        "{}: parallel collation was cancelled after message {} processing on {:x}",
+                        collated_block_descr,
+                        new_msg.0,
+                        shard_acc.account_addr(),
+                    );
+                    exec_mgr_wait_tr.respond(None);
+                    break;
                 }
-                total_trans_duration.fetch_add(duration, Ordering::Relaxed);
+
+                // LT transformations during execution:
+                // * params.last_tr_lt = max(account.last_tr_time(), params.last_tr_lt, in_msg.lt() + 1)
+                // * transaction.logical_time() = params.last_tr_lt (copy)
+                // * params.last_tr_lt += 1 + out_msgs.len()
+                // * tx_last_lt = params.last_tr_lt (by ref)
+                // So for 2 out_msgs it may be:
+                // * transaction.logical_time() == 1011
+                // * params.last_tr_lt == 1014 (1011+1+2) or 1104 (account.last_tr_time()+1+2) or 1024 (in_msg.lt()+1+1+2)
+                // * account.last_tr_time() == params.last_tr_lt == 1014 or 1104 or 1024
+                // * tx_last_lt == params.last_tr_lt == 1014 or 1104 or 1024
+
+                let tx_last_lt = tx_last_lt.load(Ordering::Relaxed);
+
+                shard_acc.apply_transaction_res(new_msg.0, tx_last_lt, &mut transaction_res, account_root)?;
+
+                exec_mgr_total_trans_duration_rw.fetch_add(duration, Ordering::Relaxed);
                 log::trace!("{}: account {:x} TIME execute {}μ;", collated_block_descr,
                     shard_acc.account_addr(), duration);
 
-                max_lt.fetch_max(shard_acc.lt().load(Ordering::Relaxed), Ordering::Relaxed);
-                wait_tr.respond(Some((new_msg, transaction_res)));
+                exec_mgr_wait_tr.respond(Some((new_msg, transaction_res, tx_last_lt)));
             }
             Ok(shard_acc)
         });
@@ -853,7 +1390,7 @@
             AsyncMessage::Int(enq, _our) => {
                 (Box::new(OrdinaryTransactionExecutor::new(config)), Some(enq.message()))
             }
-            AsyncMessage::New(env, _prev_tr_cell) => {
+            AsyncMessage::New(env, _prev_tr_cell, _created_lt) => {
                 (Box::new(OrdinaryTransactionExecutor::new(config)), Some(env.message()))
             }
             AsyncMessage::Recover(msg) | AsyncMessage::Mint(msg) | AsyncMessage::Ext(msg) => {
@@ -872,19 +1409,28 @@
     async fn wait_transaction(&mut self, collator_data: &mut CollatorData) -> Result<()> {
         log::trace!("{}: wait_transaction", self.collated_block_descr);
         let wait_op = self.wait_tr.wait(&mut self.receive_tr, false).await;
-        if let Some(Some((new_msg, transaction_res))) = wait_op {
-            self.finalize_transaction(new_msg, transaction_res, collator_data)?;
+        if let Some(Some((new_msg, transaction_res, tx_last_lt))) = wait_op {
+            // we can safely decrease parallel_msgs_counter because
+            // the sender sends only until parallel processing is cancelled
+            let account_id = self.finalize_transaction(&new_msg, transaction_res, tx_last_lt, collator_data)?;
+            // decrease the account msgs counter to control parallel processing limits
+            self.parallel_msgs_counter.sub_account_msgs_counter(account_id).await;
+
+            // mark the message as processed
+            self.set_msg_processed(new_msg.0);
         }
         Ok(())
     }
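The LT comment in start_account_job can be made concrete. A worked example of the arithmetic (the numbers 1010/1011/1014 follow the inline comments in this diff; the rules are simplified):

// Worked example of the LT bookkeeping described above (simplified rules,
// illustrative numbers taken from the inline comments in this diff).
fn main() {
    let account_last_tr_time: u64 = 1010;
    let min_lt: u64 = 1000; // collator's lower bound for this account
    let out_msgs: u64 = 2;

    // params.last_tr_lt = max(account.last_tr_time() + 1, min_lt)
    let tr_lt = (account_last_tr_time + 1).max(min_lt);
    assert_eq!(tr_lt, 1011); // becomes transaction.logical_time()

    // after execution: last_tr_lt advances past the transaction and its out msgs
    let tx_last_lt = tr_lt + 1 + out_msgs;
    assert_eq!(tx_last_lt, 1014); // becomes account.last_tr_time()
}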
 
     fn finalize_transaction(
         &mut self,
-        new_msg: Arc<AsyncMessage>,
+        new_msg_sync: &AsyncMessageSync,
         transaction_res: Result<Transaction>,
+        tx_last_lt: u64,
         collator_data: &mut CollatorData
-    ) -> Result<()> {
-        if let AsyncMessage::Ext(ref msg) = new_msg.deref() {
+    ) -> Result<AccountId> {
+        let AsyncMessageSync(msg_sync_key, new_msg) = new_msg_sync;
+        if let AsyncMessage::Ext(ref msg) = new_msg {
             let msg_id = msg.serialize()?.repr_hash();
             let account_id = msg.int_dst_account_id().unwrap_or_default();
             if let Err(err) = transaction_res {
@@ -893,8 +1439,8 @@
                     "{}: account {:x} rejected inbound external message {:x}, by reason: {}",
                     self.collated_block_descr, account_id, msg_id, err
                 );
-                collator_data.rejected_ext_messages.push((msg_id, err.to_string()));
-                return Ok(())
+                collator_data.add_rejected_ext_message_to_buffer(*msg_sync_key, (msg_id, err.to_string()));
+                return Ok(account_id)
             } else {
                 log::debug!(
                     target: EXT_MESSAGES_TRACE_TARGET,
@@ -905,24 +1451,30 @@
             }
         }
         let tr = transaction_res?;
+        let account_id = tr.account_id().clone();
         let tr_cell = tr.serialize()?;
         log::trace!("{}: finalize_transaction {} with hash {:x}, {:x}", self.collated_block_descr,
             tr.logical_time(), tr_cell.repr_hash(), tr.account_id());
-        let in_msg_opt = match new_msg.deref() {
+        let in_msg_opt = match new_msg {
             AsyncMessage::Int(enq, our) => {
                 let in_msg = InMsg::final_msg(enq.envelope_cell(), tr_cell.clone(), enq.fwd_fee_remaining().clone());
                 if *our {
                     let out_msg = OutMsg::dequeue_immediate(enq.envelope_cell(), in_msg.serialize()?);
-                    collator_data.add_out_msg_to_block(enq.message_hash(), &out_msg)?;
-                    collator_data.del_out_msg_from_state(&enq.out_msg_key())?;
+                    let msg_hash = enq.message_hash();
+                    let prev_out_msg_slice_opt = collator_data.add_out_msg_to_block(msg_hash.clone(), &out_msg)?;
+                    collator_data.add_out_msg_descr_to_history(*msg_sync_key, msg_hash, prev_out_msg_slice_opt);
+                    collator_data.del_out_queue_msg_with_history(*msg_sync_key, enq.out_msg_key(), false)?;
                 }
                 Some(in_msg)
             }
-            AsyncMessage::New(env, prev_tr_cell) => {
+            AsyncMessage::New(env, prev_tr_cell, _created_lt) => {
                 let env_cell = env.inner().serialize()?;
                 let in_msg = InMsg::immediate(env_cell.clone(), tr_cell.clone(), env.fwd_fee_remaining().clone());
                 let out_msg = OutMsg::immediate(env_cell, prev_tr_cell.clone(), in_msg.serialize()?);
-                collator_data.add_out_msg_to_block(env.message_hash(), &out_msg)?;
+                let msg_hash = env.message_hash();
+                let prev_out_msg_slice_opt = collator_data.add_out_msg_to_block(msg_hash.clone(), &out_msg)?;
+                collator_data.add_out_msg_descr_to_history(*msg_sync_key, msg_hash, prev_out_msg_slice_opt);
+                collator_data.del_out_queue_msg_with_history(*msg_sync_key, env.out_msg_key(), true)?;
                 Some(in_msg)
             }
             AsyncMessage::Mint(msg) |
@@ -947,18 +1499,157 @@
                 tr.in_msg_cell().unwrap_or_default().repr_hash()
             );
         }
-        collator_data.new_transaction(&tr, tr_cell, in_msg_opt.as_ref())?;
-
-        collator_data.update_lt(self.max_lt.load(Ordering::Relaxed));
+        collator_data.new_transaction(&tr, tr_cell, in_msg_opt.as_ref(), *msg_sync_key)?;
+
+        collator_data.add_tx_last_lt_to_buffer(*msg_sync_key, tx_last_lt);
 
-        match new_msg.deref() {
-            AsyncMessage::Mint(_) => collator_data.mint_msg = in_msg_opt,
-            AsyncMessage::Recover(_) => collator_data.recover_create_msg = in_msg_opt,
-            AsyncMessage::Copyleft(_) => collator_data.copyleft_msgs.push(in_msg_opt.ok_or_else(|| error!("Can't unwrap `in_msg_opt`"))?),
+        match new_msg {
+            AsyncMessage::Mint(_) => collator_data.add_mint_msg_to_buffer(*msg_sync_key, in_msg_opt),
+            AsyncMessage::Recover(_) => collator_data.add_recover_create_msg_to_buffer(*msg_sync_key, in_msg_opt),
+            AsyncMessage::Copyleft(_) => collator_data.add_copyleft_msg_to_buffer(*msg_sync_key, in_msg_opt.ok_or_else(|| error!("Can't unwrap `in_msg_opt`"))?),
             _ => ()
        }
+
+        // No history support here: when parallel collation is cancelled,
+        // no new msgs can be processed, so we do not need to check limits anymore
         collator_data.block_full |= !collator_data.block_limit_status.fits(ParamLimitIndex::Normal);
+
+        Ok(account_id)
+    }
AsyncMessage::Copyleft(_) => collator_data.add_copyleft_msg_to_buffer(*msg_sync_key, in_msg_opt.ok_or_else(|| error!("Can't unwrap `in_msg_opt`"))?), _ => () } + + // Will not support history. When parallel collation is cancelled, + // no new msgs can be processed, so we do not need to check limits anymore collator_data.block_full |= !collator_data.block_limit_status.fits(ParamLimitIndex::Normal); + Ok(account_id) + } + + /// Actually the length of the messages queue + fn get_next_msg_sync_key(&self) -> usize { + self.msgs_queue.len() + } + fn append_msgs_queue(&mut self, msg_sync_key: usize, account_addr: &AccountId, msg_hash_opt: Option<UInt256>) { + self.msgs_queue.insert(msg_sync_key, (account_addr.clone(), false, msg_hash_opt)); + } + fn set_msg_processed(&mut self, msg_sync_key: usize) { + if let Some(entry) = self.msgs_queue.get_mut(msg_sync_key) { + entry.1 = true; + self.accounts_processed_msgs + .entry(entry.0.clone()) + .and_modify(|list| list.push(msg_sync_key)) + .or_insert([msg_sync_key].into()); + } + } + fn revert_last_account_processed_msg(&mut self, account_addr: &AccountId) { + if let Some(list) = self.accounts_processed_msgs.get_mut(account_addr) { + list.pop(); + } + } + + fn accounts_processed_msgs(&self) -> &HashMap<AccountId, Vec<usize>> { + &self.accounts_processed_msgs + } + fn get_last_processed_msg_sync_key<'a>( + accounts_processed_msgs: &'a HashMap<AccountId, Vec<usize>>, + for_account_addr: &'a AccountId, + ) -> Option<&'a usize> { + if let Some(entry) = accounts_processed_msgs.get(for_account_addr) { + entry.last() + } else { + None + } + } + + /// Signal the cancellation_token due to a finalizing timeout + pub fn cancel_parallel_processing(&mut self) { + self.cancellation_token.cancel(); + } + /// When cancellation_token was cancelled due to a finalizing timeout + pub fn is_parallel_processing_cancelled(&self) -> bool { + self.cancellation_token.is_cancelled() + } + + fn commit_processed_msgs_changes(&mut self, collator_data: &mut CollatorData) -> Result<()> { + // revert processed messages which go after the first unprocessed one + let mut msgs_to_revert = vec![]; + let mut msgs_to_revert_last_proc_int = vec![]; + let mut found_first_unprocessed = false; + for msg_sync_key in 0..self.msgs_queue.len() { + if let Some((account_addr, processed, msg_hash)) = self.msgs_queue.get(msg_sync_key) { + if *processed { + // collect all processed messages which go after the first unprocessed one + if found_first_unprocessed { + msgs_to_revert.push((account_addr.clone(), msg_sync_key, msg_hash.clone())); + msgs_to_revert_last_proc_int.push(msg_sync_key); + } + } else { + if !found_first_unprocessed { + found_first_unprocessed = true; + } + msgs_to_revert_last_proc_int.push(msg_sync_key); + } + } + } + metrics::gauge!("reverted_transactions", msgs_to_revert.len() as f64); + for (account_addr, msg_sync_key, msg_hash) in msgs_to_revert.into_iter().rev() { + if let Some(msg_info) = self.msgs_queue.get_mut(msg_sync_key) { + msg_info.1 = false; + } + self.revert_msg_changes(collator_data, &msg_sync_key, &account_addr)?; + log::debug!( + "{}: reverted changes from message {:x} (sync_key: {}) on account {:x}", + self.collated_block_descr, msg_hash.unwrap_or_default(), msg_sync_key, account_addr, + ); + } + for msg_sync_key in msgs_to_revert_last_proc_int { + collator_data.revert_last_proc_int_msg_by_src_msg(&msg_sync_key); + } + + // commit all changes that were not reverted + self.commit_not_reverted_changes(collator_data)?; + + log::debug!("{}: all non-reverted account changes committed", self.collated_block_descr); + + Ok(()) + } + fn revert_msg_changes(
&mut self, + collator_data: &mut CollatorData, + msg_sync_key: &usize, + account_addr: &AccountId, + ) -> Result<()> { + collator_data.execute_count -= 1; + + collator_data.revert_in_msgs_descr_by_src_msg(msg_sync_key)?; + collator_data.revert_out_msgs_descr_by_src_msg(msg_sync_key)?; + collator_data.revert_accepted_ext_message_by_src_msg(msg_sync_key); + collator_data.revert_rejected_ext_message_by_src_msg(msg_sync_key); + collator_data.revert_del_out_queue_msg(msg_sync_key)?; + collator_data.revert_add_out_queue_msgs(msg_sync_key)?; + collator_data.revert_new_messages_by_src_msg(msg_sync_key); + collator_data.revert_mint_msg_by_src_msg(msg_sync_key); + collator_data.revert_recover_create_msg_by_src_msg(msg_sync_key); + collator_data.revert_copyleft_msg_by_src_msg(msg_sync_key); + collator_data.revert_tx_last_lt_by_src_msg(msg_sync_key); + + self.revert_last_account_processed_msg(account_addr); + + Ok(()) + } + fn commit_not_reverted_changes(&mut self, collator_data: &mut CollatorData) -> Result<()> { + collator_data.commit_in_msgs_descr_by_src_msg(); + collator_data.commit_out_msgs_descr_by_src_msg(); + collator_data.commit_accepted_ext_messages(); + collator_data.commit_rejected_ext_messages(); + collator_data.commit_del_out_queue_msgs()?; + collator_data.commit_add_out_queue_msgs()?; + collator_data.commit_new_messages(); + + collator_data.commit_mint_msg(); + collator_data.commit_recover_create_msg(); + collator_data.commit_copyleft_msgs(); + + collator_data.commit_last_proc_int_msg()?; + + // save max lt + if let Some(max_lt) = collator_data.commit_tx_last_lt() { + self.max_lt = max_lt; + } + Ok(()) } } @@ -983,6 +1674,11 @@ pub struct Collator { started: Instant, stop_flag: Arc<AtomicBool>, + + finalize_parallel_timeout_ms: u32, + + #[cfg(test)] + test_msg_process_sleep: u64, } impl Collator { @@ -994,7 +1690,7 @@ created_by: UInt256, engine: Arc<dyn EngineOperations>, rand_seed: Option<UInt256>, - collator_settings: CollatorSettings + collator_settings: CollatorSettings, ) -> Result<Self> { log::debug!( @@ -1065,6 +1761,7 @@ root_hash: UInt256::default(), file_hash: UInt256::default(), }, + finalize_parallel_timeout_ms: engine.collator_config().get_finalize_parallel_timeout_ms(), engine, shard, min_mc_seqno, @@ -1079,9 +1776,16 @@ collator_settings, started: Instant::now(), stop_flag: Arc::new(AtomicBool::new(false)), + #[cfg(test)] + test_msg_process_sleep: 0, }) } + #[cfg(test)] + pub fn set_test_msg_process_sleep(&mut self, sleep_timeout: u64) { + self.test_msg_process_sleep = sleep_timeout; + } + pub async fn collate(mut self) -> Result<(BlockCandidate, ShardStateUnsplit)> { log::info!( "{}: COLLATE min_mc_seqno = {}, prev_blocks_ids: {} {}", @@ -1250,6 +1954,7 @@ usage_tree, &prev_data, is_masterchain, + self.collated_block_descr.clone(), )?; if !self.shard.is_masterchain() { let (now_upper_limit, before_split, _accept_msgs) = check_this_shard_mc_info( @@ -1317,27 +2022,23 @@ // loads out queues from neighbors and out queue of current shard let mut output_queue_manager = self.request_neighbor_msg_queues(mc_data, prev_data, collator_data).await?; - let mut out_queue_cleaned_partial = false; - let mut out_queue_clean_deleted = 0; + // indicates if initial out queue clean was partial + let mut initial_out_queue_clean_partial = false; + // stores the deleted messages count during the initial clean + let mut initial_out_queue_clean_deleted_count = 0; + // delete delivered messages from output queue for a limited time if !self.after_split { - // delete
delivered messages from output queue for a limited time - let now = std::time::Instant::now(); - let cc = self.engine.collator_config(); - let clean_timeout_nanos = (cc.cutoff_timeout_ms as i128) * 1_000_000 * (cc.clean_timeout_percentage_points as i128) / 1000; - let processed; - (out_queue_cleaned_partial, processed, out_queue_clean_deleted) = - self.clean_out_msg_queue(mc_data, collator_data, &mut output_queue_manager, clean_timeout_nanos, cc.optimistic_clean_percentage_points).await?; - let elapsed = now.elapsed().as_millis(); + let clean_timeout_nanos = self.get_initial_clean_timeout(); + let elapsed; + (initial_out_queue_clean_partial, initial_out_queue_clean_deleted_count, elapsed) = + self.clean_out_msg_queue(mc_data, collator_data, &mut output_queue_manager, + clean_timeout_nanos, self.engine.collator_config().optimistic_clean_percentage_points, "initial", + ).await?; log::debug!("{}: TIME: clean_out_msg_queue initial {}ms;", self.collated_block_descr, elapsed); - let labels = [("shard", self.shard.to_string()), ("step", "initial".to_owned())]; - metrics::gauge!("clean_out_msg_queue_partial", if out_queue_cleaned_partial { 1.0 } else { 0.0 }, &labels); - metrics::gauge!("clean_out_msg_queue_elapsed", elapsed as f64, &labels); - metrics::gauge!("clean_out_msg_queue_processed", processed as f64, &labels); - metrics::gauge!("clean_out_msg_queue_deleted", out_queue_clean_deleted as f64, &labels); } else { - log::debug!("{}: TIME: clean_out_msg_queue initial SKIPPED because of after_split block", - self.collated_block_descr); + log::debug!("{}: TIME: clean_out_msg_queue initial SKIPPED because of after_split block", self.collated_block_descr); + self.push_out_queue_clean_empty_metrics("initial"); } // copy out msg queue from next state which is cleared compared to previous @@ -1347,6 +2048,14 @@ // compute created / minted / recovered / from_prev_blk self.update_value_flow(mc_data, &prev_data, collator_data)?; + // closure to check the finalize timeout for parallel transactions + let collation_started = self.started.clone(); + let finalize_parallel_timeout_ms = self.finalize_parallel_timeout_ms; + let check_finalize_parallel_timeout_closure = move || ( + collation_started.elapsed().as_millis() as u32 > finalize_parallel_timeout_ms, + finalize_parallel_timeout_ms, + ); + let mut exec_manager = ExecutionManager::new( collator_data.gen_utime(), collator_data.start_lt()?, @@ -1356,10 +2065,15 @@ mc_data.libraries()?.clone(), collator_data.config.clone(), self.engine.collator_config().max_collate_threads as usize, + self.engine.collator_config().max_collate_msgs_queue_on_account as usize, self.collated_block_descr.clone(), self.debug, + Box::new(check_finalize_parallel_timeout_closure), )?; + #[cfg(test)] + exec_manager.set_test_msg_process_sleep(self.test_msg_process_sleep); + // tick & special transactions if self.shard.is_masterchain() { self.create_ticktock_transactions( @@ -1399,10 +2113,11 @@ metrics::histogram!("collator_process_inbound_external_messages_time", now.elapsed()); // process newly-generated messages (if space&gas left) - // (if we were unable to process all inbound messages, all new messages must be queued) + // (all new messages were queued already; we remove them as we process them) let now = std::time::Instant::now(); - self.process_new_messages(!collator_data.inbound_queues_empty, prev_data, - collator_data, &mut exec_manager).await?; + if collator_data.inbound_queues_empty { + self.process_new_messages(prev_data,
collator_data, &mut exec_manager).await?; + } log::debug!("{}: TIME: process_new_messages {}ms;", self.collated_block_descr, now.elapsed().as_millis()); metrics::histogram!("collator_process_new_messages_time", now.elapsed()); @@ -1411,41 +2126,32 @@ self.collated_block_descr); } + // perform secondary out queue clean + // if block limits are not reached, the initial clean was partial, and no messages were deleted let clean_remaining_timeout_nanos = self.get_remaining_clean_time_limit_nanos(); - - if !collator_data.block_full && out_queue_cleaned_partial && out_queue_clean_deleted == 0 && clean_remaining_timeout_nanos > 10_000_000 { + if self.check_should_perform_secondary_clean( + collator_data.block_full, + initial_out_queue_clean_partial, + initial_out_queue_clean_deleted_count, + clean_remaining_timeout_nanos, + ) { if !self.after_split { - // we have collation time left and out msg queue was not fully processed - // so will try to clean more for a remaining time only by random algorithm - let now = std::time::Instant::now(); - // set current out msg queue to manager to process new clean *output_queue_manager.next_mut() = std::mem::take(&mut collator_data.out_msg_queue_info); - let processed; - (out_queue_cleaned_partial, processed, out_queue_clean_deleted) = - self.clean_out_msg_queue(mc_data, collator_data, &mut output_queue_manager, clean_remaining_timeout_nanos, 0).await?; - let elapsed = now.elapsed().as_millis(); + let (_, _, elapsed) = + self.clean_out_msg_queue(mc_data, collator_data, &mut output_queue_manager, clean_remaining_timeout_nanos, 0, "remaining").await?; log::debug!("{}: TIME: clean_out_msg_queue remaining {}ms;", self.collated_block_descr, elapsed); - let labels = [("shard", self.shard.to_string()), ("step", "remaining".to_owned())]; - metrics::gauge!("clean_out_msg_queue_partial", if out_queue_cleaned_partial { 1.0 } else { 0.0 }, &labels); - metrics::gauge!("clean_out_msg_queue_elapsed", elapsed as f64, &labels); - metrics::gauge!("clean_out_msg_queue_processed", processed as f64, &labels); - metrics::gauge!("clean_out_msg_queue_deleted", out_queue_clean_deleted as f64, &labels); - + // copy out msg queue from manager after clean collator_data.out_msg_queue_info = output_queue_manager.take_next(); collator_data.out_msg_queue_info.forced_fix_out_queue()?; } else { - log::debug!("{}: TIME: clean_out_msg_queue remaining SKIPPED because of after_split block", - self.collated_block_descr); + log::debug!("{}: TIME: clean_out_msg_queue remaining SKIPPED because of after_split block", self.collated_block_descr); + self.push_out_queue_clean_empty_metrics("remaining"); } } else { - let labels = [("shard", self.shard.to_string()), ("step", "remaining".to_owned())]; - metrics::gauge!("clean_out_msg_queue_partial", 0.0, &labels); - metrics::gauge!("clean_out_msg_queue_elapsed", 0.0, &labels); - metrics::gauge!("clean_out_msg_queue_processed", 0.0, &labels); - metrics::gauge!("clean_out_msg_queue_deleted", 0.0, &labels); + self.push_out_queue_clean_empty_metrics("remaining"); } // split prepare / split install @@ -1457,10 +2163,6 @@ true, mc_data, prev_data, collator_data, &mut exec_manager).await?; } - // process newly-generated messages (only by including them into output queue) - self.process_new_messages( - true, prev_data, collator_data, &mut exec_manager).await?; - // If block is empty - stop collation to try one more time (may be there are some new messages) let cc = self.engine.collator_config(); if !self.after_split && @@ -1498,11 +2200,18 @@
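
All the revert_*/commit_* helpers above follow one buffered-journal pattern: each change is recorded under the msg_sync_key of the message that produced it, so it can be rolled back per message or committed in bulk. A minimal sketch of that pattern with hypothetical names (the real CollatorData keeps a separate buffer per kind of change):

use std::collections::HashMap;

// Hypothetical per-message change journal; illustrates the pattern only.
struct ChangeJournal<T> {
    pending: HashMap<usize, T>, // keyed by the msg_sync_key that produced the change
    committed: Vec<T>,
}

impl<T> ChangeJournal<T> {
    fn new() -> Self {
        Self { pending: HashMap::new(), committed: Vec::new() }
    }
    // buffer a change produced while executing the message with this sync key
    fn add(&mut self, msg_sync_key: usize, change: T) {
        self.pending.insert(msg_sync_key, change);
    }
    // drop the change of one reverted message
    fn revert(&mut self, msg_sync_key: &usize) -> Option<T> {
        self.pending.remove(msg_sync_key)
    }
    // commit everything that was not reverted
    fn commit(&mut self) {
        self.committed.extend(self.pending.drain().map(|(_, change)| change));
    }
}
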
impl Collator { output_queue_manager: &mut MsgQueueManager, clean_timeout_nanos: i128, optimistic_clean_percentage_points: u32, - ) -> Result<(bool, i32, i32)> { + clean_step: &str, + ) -> Result<(bool, i32, u128)> { log::debug!("{}: clean_out_msg_queue", self.collated_block_descr); let short = mc_data.config().has_capability(GlobalCapabilities::CapShortDequeue); - output_queue_manager.clean_out_msg_queue(clean_timeout_nanos, optimistic_clean_percentage_points, |message, root| { + let now = std::time::Instant::now(); + + let ( + out_queue_cleaned_partial, + messages_processed, + out_queue_clean_deleted_count, + ) = output_queue_manager.clean_out_msg_queue(clean_timeout_nanos, optimistic_clean_percentage_points, |message, root| { self.check_stop_flag()?; if let Some((enq, deliver_lt)) = message { log::trace!("{}: dequeue message: {:x}", self.collated_block_descr, enq.message_hash()); @@ -1515,7 +2224,25 @@ impl Collator { collator_data.block_limit_status.register_out_msg_queue_op(root, &collator_data.usage_tree, true)?; Ok(true) } - }).await + }).await?; + + let elapsed = now.elapsed().as_millis(); + + let labels = [("shard", self.shard.to_string()), ("step", clean_step.to_owned())]; + metrics::gauge!("clean_out_msg_queue_partial", if out_queue_cleaned_partial { 1.0 } else { 0.0 }, &labels); + metrics::gauge!("clean_out_msg_queue_elapsed", elapsed as f64, &labels); + metrics::gauge!("clean_out_msg_queue_processed", messages_processed as f64, &labels); + metrics::gauge!("clean_out_msg_queue_deleted", out_queue_clean_deleted_count as f64, &labels); + + Ok((out_queue_cleaned_partial, out_queue_clean_deleted_count, elapsed)) + } + + fn push_out_queue_clean_empty_metrics(&self, clean_step: &str) { + let labels = [("shard", self.shard.to_string()), ("step", clean_step.to_owned())]; + metrics::gauge!("clean_out_msg_queue_partial", 0.0, &labels); + metrics::gauge!("clean_out_msg_queue_elapsed", 0.0, &labels); + metrics::gauge!("clean_out_msg_queue_processed", 0.0, &labels); + metrics::gauge!("clean_out_msg_queue_deleted", 0.0, &labels); } // @@ -2273,7 +3000,9 @@ impl Collator { "{}: message {:x}, lt: {}, enq lt: {}", self.collated_block_descr, key, created_lt, enq.enqueued_lt() ); - collator_data.update_last_proc_int_msg((created_lt, enq.message_hash()))?; + + // No need to update the last processed int message LT_HASH here + // if it was already processed or was not sent to us if collator_data.out_msg_queue_info.already_processed(&enq)?
{ log::trace!( "{}: message {:x} has been already processed by us before, skipping", @@ -2283,13 +3012,17 @@ impl Collator { self.check_inbound_internal_message(&key, &enq, created_lt, block_id.shard()) .map_err(|err| error!("problem processing internal inbound message \ with hash {:x} : {}", key.hash, err))?; + let src_addr = enq.message().src().unwrap_or_default().address(); let our = self.shard.contains_full_prefix(&enq.cur_prefix()); let to_us = self.shard.contains_full_prefix(&enq.dst_prefix()); if to_us { let account_id = enq.dst_account_id()?; - log::debug!("{}: message {:x} sent to execution to account {:x}", self.collated_block_descr, key.hash, account_id); let msg = AsyncMessage::Int(enq, our); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + let msg_sync_key = exec_manager.execute(account_id.clone(), msg, prev_data, collator_data).await?; + log::debug!( + "{}: int message {:x} (sync_key: {:?}) from {:x} sent to execution to account {:x}", + self.collated_block_descr, key.hash, msg_sync_key, src_addr, account_id, + ); } else { // println!("{:x} {:#}", key, enq); // println!("cur: {}, dst: {}", enq.cur_prefix(), enq.dst_prefix()); @@ -2301,7 +3034,7 @@ impl Collator { } } if collator_data.block_full { - log::debug!("{}: BLOCK FULL, stop processing internal messages", self.collated_block_descr); + log::debug!("{}: BLOCK FULL (>= Soft), stop processing internal messages", self.collated_block_descr); break } if self.check_cutoff_timeout() { @@ -2357,24 +3090,33 @@ impl Collator { return Ok(()) } + if exec_manager.is_parallel_processing_cancelled() { + log::debug!("{}: parallel processing cancelled, skipping processing of inbound external messages", self.collated_block_descr); + return Ok(()) + } + log::debug!("{}: process_inbound_external_messages", self.collated_block_descr); for (msg, id) in self.engine.get_external_messages_iterator(self.shard.clone()) { + if !collator_data.block_limit_status.fits(ParamLimitIndex::Soft) { + log::debug!("{}: BLOCK FULL (>= Medium), stop processing external messages", self.collated_block_descr); + break; + } + if self.check_cutoff_timeout() { + log::warn!("{}: TIMEOUT ({}ms) has elapsed, stop processing external messages", + self.collated_block_descr, self.engine.collator_config().cutoff_timeout_ms, + ); + break; + } let header = msg.ext_in_header().ok_or_else(|| error!("message {:x} \ is not external inbound message", id))?; if self.shard.contains_address(&header.dst)?
{ - if !collator_data.block_limit_status.fits(ParamLimitIndex::Soft) { - log::debug!("{}: BLOCK FULL, stop processing external messages", self.collated_block_descr); - break - } - if self.check_cutoff_timeout() { - log::warn!("{}: TIMEOUT is elapsed, stop processing external messages", - self.collated_block_descr); - break - } let (_, account_id) = header.dst.extract_std_address(true)?; let msg = AsyncMessage::Ext(msg.deref().clone()); - log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, id); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + let msg_sync_key = exec_manager.execute(account_id.clone(), msg, prev_data, collator_data).await?; + log::debug!( + "{}: ext message {:x} (sync_key: {:?}) sent to execution to account {:x}", + self.collated_block_descr, id, msg_sync_key, account_id, + ); } else { // usually a node collates more than one shard, the message can belong to another one, // so we can't postpone it @@ -2428,8 +3170,11 @@ impl Collator { } else { let (_, account_id) = header.dst.extract_std_address(true)?; let msg = AsyncMessage::Ext(msg.deref().clone()); - log::trace!("{}: remp message {:x} sent to execution", self.collated_block_descr, id); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + let msg_sync_key = exec_manager.execute(account_id.clone(), msg, prev_data, collator_data).await?; + log::trace!( + "{}: remp message {:x} (sync_key: {:?}) sent to execution to account {:x}", + self.collated_block_descr, id, msg_sync_key, account_id, + ); } } else { log::warn!( @@ -2449,38 +3194,56 @@ impl Collator { async fn process_new_messages( &self, - mut enqueue_only: bool, prev_data: &PrevData, collator_data: &mut CollatorData, exec_manager: &mut ExecutionManager, ) -> Result<()> { + if exec_manager.is_parallel_processing_cancelled() { + log::debug!("{}: parallel processing cancelled, skipping processing of new messages", self.collated_block_descr); + return Ok(()) + } + log::debug!("{}: process_new_messages", self.collated_block_descr); let use_hypercube = !collator_data.config.has_capability(GlobalCapabilities::CapOffHypercube); - while !collator_data.new_messages.is_empty() { + let mut stop_processing = false; + while !stop_processing && !collator_data.new_messages.is_empty() { // In the iteration we execute only existing messages. // Newly generated messages will be executed next iteration (only after waiting). let mut new_messages = std::mem::take(&mut collator_data.new_messages); + log::debug!("{}: new_messages count: {}", self.collated_block_descr, new_messages.len()); // we can get sorted items somehow later while let Some(NewMessage{ lt_hash: (created_lt, hash), msg, tr_cell, prefix }) = new_messages.pop() { let info = msg.int_header().ok_or_else(|| error!("message is not internal"))?; - let fwd_fee = *info.fwd_fee(); - enqueue_only |= collator_data.block_full | self.check_cutoff_timeout(); - if enqueue_only || !self.shard.contains_address(&info.dst)? { - // everything was made in new_transaction + + if !collator_data.block_limit_status.fits(ParamLimitIndex::Soft) { + log::debug!("{}: BLOCK FULL (>= Medium), stop processing new messages", self.collated_block_descr); + stop_processing = true; + break; + } + if self.check_cutoff_timeout() { + log::warn!("{}: TIMEOUT ({}ms) has elapsed, stop processing new messages", + self.collated_block_descr, self.engine.collator_config().cutoff_timeout_ms, + ); + stop_processing = true; + break; + } + + if !self.shard.contains_address(&info.dst)?
{ + // skip msg if it is not addressed to our shard } else { CHECK!(info.created_at.as_u32(), collator_data.gen_utime); - let key = OutMsgQueueKey::with_account_prefix(&prefix, hash.clone()); - collator_data.out_msg_queue_info.del_message(&key)?; - collator_data.enqueue_count -= 1; + let fwd_fee = *info.fwd_fee(); let env = MsgEnvelopeStuff::new(msg, &self.shard, fwd_fee, use_hypercube)?; let account_id = env.message().int_dst_account_id().unwrap_or_default(); - collator_data.update_last_proc_int_msg((created_lt, hash))?; - let msg = AsyncMessage::New(env, tr_cell); - log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, key.hash); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + let msg = AsyncMessage::New(env, tr_cell, created_lt); + let msg_sync_key = exec_manager.execute(account_id.clone(), msg, prev_data, collator_data).await?; + log::debug!( + "{}: new int message {:x} (sync_key: {:?}) sent to execution to account {:x}", + self.collated_block_descr, hash, msg_sync_key, account_id, + ); }; self.check_stop_flag()?; } @@ -2635,10 +3398,24 @@ impl Collator { let mut changed_accounts = HashMap::new(); let mut new_config_opt = None; let mut current_workchain_copyleft_rewards = CopyleftRewards::default(); + let accounts_processed_msgs = exec_manager.accounts_processed_msgs().clone(); for (account_id, (sender, handle)) in exec_manager.changed_accounts.drain() { std::mem::drop(sender); let mut shard_acc = handle.await .map_err(|err| error!("account {:x} thread didn't finish: {}", account_id, err))??; + + // commit account state at the last processed msg before the cancelling of parallel collation + shard_acc = match ExecutionManager::get_last_processed_msg_sync_key( + &accounts_processed_msgs, + &account_id, + ) { + None => continue, + Some(msg_sync_key) => match shard_acc.commit(*msg_sync_key)?
{ + None => continue, + Some(committed) => committed, + } + }; + let account = shard_acc.read_account()?; if let Some(addr) = &config_addr { if addr == &account_id { @@ -2653,7 +3430,7 @@ impl Collator { if !acc_block.transactions().is_empty() { accounts.insert(&acc_block)?; } - current_workchain_copyleft_rewards.merge_rewards(shard_acc.copyleft_rewards())?; + current_workchain_copyleft_rewards.merge_rewards(shard_acc.copyleft_rewards()?)?; changed_accounts.insert(account_id, shard_acc); } @@ -2883,23 +3660,23 @@ impl Collator { if workchain_id != -1 && (collator_data.dequeue_count > 0 || collator_data.enqueue_count > 0 || collator_data.in_msg_count > 0 || collator_data.out_msg_count > 0 || collator_data.execute_count > 0 - || collator_data.transit_count > 0 + || collator_data.transit_count > 0 || changed_accounts.len() > 0 ) { log::debug!( "{}: finalize_block finished: dequeue_count: {}, enqueue_count: {}, in_msg_count: {}, out_msg_count: {}, \ - execute_count: {}, transit_count: {}", + execute_count: {}, transit_count: {}, changed_accounts: {}", self.collated_block_descr, collator_data.dequeue_count, collator_data.enqueue_count, collator_data.in_msg_count, collator_data.out_msg_count, collator_data.execute_count, - collator_data.transit_count, + collator_data.transit_count, changed_accounts.len(), ); } log::trace!( "{}: finalize_block finished: dequeue_count: {}, enqueue_count: {}, in_msg_count: {}, out_msg_count: {}, \ - execute_count: {}, transit_count: {}, data len: {}", + execute_count: {}, transit_count: {}, changed_accounts: {}, data len: {}", self.collated_block_descr, collator_data.dequeue_count, collator_data.enqueue_count, collator_data.in_msg_count, collator_data.out_msg_count, collator_data.execute_count, - collator_data.transit_count, candidate.data.len(), + collator_data.transit_count, changed_accounts.len(), candidate.data.len(), ); Ok((candidate, new_state, exec_manager)) } @@ -3532,12 +4309,24 @@ impl Collator { self.started.elapsed().as_millis() as u32 > cutoff_timeout } + fn check_finilize_parallel_timeout(&self) -> (bool, u32) { + ( + self.started.elapsed().as_millis() as u32 > self.finalize_parallel_timeout_ms, + self.finalize_parallel_timeout_ms, + ) + } + fn get_remaining_cutoff_time_limit_nanos(&self) -> i128 { let cutoff_timeout_nanos = self.engine.collator_config().cutoff_timeout_ms as i128 * 1_000_000; let elapsed_nanos = self.started.elapsed().as_nanos() as i128; cutoff_timeout_nanos - elapsed_nanos } + fn get_initial_clean_timeout(&self) -> i128 { + let cc = self.engine.collator_config(); + (cc.cutoff_timeout_ms as i128) * 1_000_000 * (cc.clean_timeout_percentage_points as i128) / 1000 + } + fn get_remaining_clean_time_limit_nanos(&self) -> i128 { let remaining_cutoff_timeout_nanos = self.get_remaining_cutoff_time_limit_nanos(); let cc = self.engine.collator_config(); @@ -3545,6 +4334,16 @@ impl Collator { remaining_cutoff_timeout_nanos.min(max_secondary_clean_timeout_nanos) } + fn check_should_perform_secondary_clean( + &self, + block_full: bool, + prev_out_queue_cleaned_partial: bool, + prev_out_queue_clean_deleted_count: i32, + clean_timeout_nanos: i128, + ) -> bool { + !block_full && prev_out_queue_cleaned_partial && prev_out_queue_clean_deleted_count == 0 && clean_timeout_nanos >= 10_000_000 + } + fn check_stop_flag(&self) -> Result<()> { if self.stop_flag.load(Ordering::Relaxed) { fail!("Stop flag was set") @@ -3608,4 +4407,3 @@ pub fn report_collation_metrics( metrics::histogram!("gas_rate_collator", gas_rate as f64, &labels); 
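
The timeout arithmetic used by get_initial_clean_timeout and by the finalize-parallel check above is easy to verify by hand. A small worked example, assuming the default config values introduced by this patch (cutoff_timeout_ms = 1000, clean_timeout_percentage_points = 150, stop_timeout_ms = 1500, finalize_parallel_percentage_points = 800):

fn main() {
    // get_initial_clean_timeout(): 1000 ms * 150 / 1000 points = 150 ms, in nanoseconds
    let initial_clean_timeout_nanos: i128 = 1000 * 1_000_000 * 150 / 1000;
    assert_eq!(initial_clean_timeout_nanos, 150_000_000);

    // get_finalize_parallel_timeout_ms(): 1500 ms * 800 / 1000 points = 1200 ms
    let finalize_parallel_timeout_ms: u32 = 1500 * 800 / 1000;
    assert_eq!(finalize_parallel_timeout_ms, 1200);

    // check_should_perform_secondary_clean: only when the block is not full,
    // the initial clean was partial, it deleted nothing, and >= 10 ms of budget remains
    let should_clean = |block_full: bool, partial: bool, deleted: i32, budget_nanos: i128| {
        !block_full && partial && deleted == 0 && budget_nanos >= 10_000_000
    };
    assert!(should_clean(false, true, 0, 150_000_000));
    assert!(!should_clean(false, true, 5, 150_000_000));
}
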
metrics::histogram!("block_size", block_size as f64, &labels); } - diff --git a/src/validator/out_msg_queue.rs b/src/validator/out_msg_queue.rs index f71c9cda..7a9c4261 100644 --- a/src/validator/out_msg_queue.rs +++ b/src/validator/out_msg_queue.rs @@ -27,7 +27,7 @@ use ton_block::{ OutMsgQueueInfo, OutMsgQueue, OutMsgQueueKey, IhrPendingInfo, ProcessedInfo, ProcessedUpto, ProcessedInfoKey, ShardHashes, AccountIdPrefixFull, - HashmapAugType, ShardStateUnsplit, + HashmapAugType, ShardStateUnsplit, EnqueuedMsg, }; use ton_types::{ error, fail, BuilderData, Cell, LabelReader, SliceData, IBitstring, Result, UInt256, @@ -590,13 +590,14 @@ impl OutMsgQueueInfoStuff { self.out_queue_mut()?.set(&key, enq.enqueued(), &enq.created_lt()) } - pub fn del_message(&mut self, key: &OutMsgQueueKey) -> Result<()> { + pub fn del_message(&mut self, key: &OutMsgQueueKey) -> Result { let labels = [("shard", self.shard().to_string())]; metrics::counter!("out_msg_queue_del", 1, &labels); - if self.out_queue_mut()?.remove(SliceData::load_bitstring(key.write_to_new_cell()?)?)?.is_none() { + if let Some(mut msg_data) = self.out_queue_mut()?.remove(SliceData::load_bitstring(key.write_to_new_cell()?)?)? { + EnqueuedMsg::construct_from(&mut msg_data) + } else { fail!("error deleting from out_msg_queue dictionary: {:x}", key) } - Ok(()) } // remove all messages which are not from new_shard @@ -1125,7 +1126,7 @@ impl MsgQueueManager { pub async fn clean_out_msg_queue( &mut self, clean_timeout_nanos: i128, - optimistic_clean_percentage_points: u32, + ordered_clean_percentage_points: u32, mut on_message: impl FnMut(Option<(MsgEnqueueStuff, u64)>, Option<&Cell>) -> Result ) -> Result<(bool, i32, i32)> { let timer = std::time::Instant::now(); @@ -1149,7 +1150,7 @@ impl MsgQueueManager { let mut deleted = 0; let mut skipped = 0; - let ordered_cleaning_timeout_nanos = clean_timeout_nanos * (optimistic_clean_percentage_points as i128) / 1000; + let ordered_cleaning_timeout_nanos = clean_timeout_nanos * (ordered_clean_percentage_points as i128) / 1000; let random_cleaning_timeout_nanos = clean_timeout_nanos - ordered_cleaning_timeout_nanos; log::debug!( @@ -1163,29 +1164,17 @@ impl MsgQueueManager { if ordered_cleaning_timeout_nanos > 0 { let max_processed_lt = self.get_max_processed_lt_from_queue_info(); - let mut clean_timeout_check = 50_000_000; - let max_clean_timeout_check = 550_000_000; - partial = out_msg_queue_cleaner::hashmap_filter_ordered_by_lt_hash( &mut queue, max_processed_lt, ordered_cleaning_timeout_nanos, |node_obj| { if block_full { - log::debug!("{}: BLOCK FULL when ordered cleaning output queue, cleanup is partial", self.block_descr); + log::debug!("{}: BLOCK FULL (>= Soft) when ordered cleaning output queue, cleanup is partial", self.block_descr); partial = true; return Ok(HashmapFilterResult::Stop); } - let elapsed_nanos = timer.elapsed().as_nanos() as i128; - if clean_timeout_check <= max_clean_timeout_check && elapsed_nanos >= clean_timeout_check { - log::debug!( - "{}: clean_out_msg_queue: ordered cleaning time elapsed {} nanos: processed = {}, deleted = {}, skipped = {}", - self.block_descr, elapsed_nanos, deleted + skipped, deleted, skipped, - ); - clean_timeout_check += 50_000_000; - } - let lt = node_obj.lt(); let mut data_and_refs = node_obj.data_and_refs()?; let enq = MsgEnqueueStuff::construct_from(&mut data_and_refs, lt)?; @@ -1236,28 +1225,15 @@ impl MsgQueueManager { let random_clean_timer = std::time::Instant::now(); - let mut clean_timeout_check = 50_000_000; - let max_clean_timeout_check = 
550_000_000; - queue.hashmap_filter(|_key, mut slice| { if block_full { - log::debug!("{}: BLOCK FULL when random cleaning output queue, cleanup is partial", self.block_descr); + log::debug!("{}: BLOCK FULL (>= Soft) when random cleaning output queue, cleanup is partial", self.block_descr); partial = true; return Ok(HashmapFilterResult::Stop) } - let elapsed_nanos = random_clean_timer.elapsed().as_nanos() as i128; - - if clean_timeout_check <= max_clean_timeout_check && elapsed_nanos >= clean_timeout_check { - log::debug!( - "{}: clean_out_msg_queue: random cleaning time elapsed {} nanos: processed = {}, deleted = {}, skipped = {}", - self.block_descr, elapsed_nanos, - random_deleted + random_skipped, random_deleted, random_skipped, - ); - clean_timeout_check += 50_000_000; - } - // stop when reached the time limit + let elapsed_nanos = random_clean_timer.elapsed().as_nanos() as i128; if elapsed_nanos >= random_cleaning_timeout_nanos { log::debug!( "{}: clean_out_msg_queue: stopped random cleaning output queue because of time elapsed {} nanos >= {} nanos limit", diff --git a/src/validator/out_msg_queue_cleaner.rs b/src/validator/out_msg_queue_cleaner.rs index 5192b0eb..efe67cf8 100644 --- a/src/validator/out_msg_queue_cleaner.rs +++ b/src/validator/out_msg_queue_cleaner.rs @@ -762,23 +762,6 @@ impl HashmapOrderedFilterCursor { } } - // stop pocessing when max_lt reached - #[cfg(not(feature = "only_sorted_clean"))] - if current.node_obj_ref().lt() == self.max_lt { - log::debug!( - "clean_out_msg_queue: hop {}: stop processing when current node (bottom_bit_len = {}, key = {}) lt {} == max_lt {}, elapsed = {} nanos", - self.hops_counter, - current.bottom_bit_len(), - current.node_obj_ref().key_hex(), - current.node_obj_ref().lt(), - self.max_lt, - self.timer.elapsed().as_nanos(), - ); - - self.stop_processing = true; - self.stopped_by_max_lt = true; - } - current.is_processed = true; self.processed_count += 1; @@ -1263,7 +1246,8 @@ where cursor_creation_elapsed, ); - let partial = filter_cursor.stop_processing | filter_cursor.cancel_processing; + let partial = (filter_cursor.stop_processing | filter_cursor.cancel_processing) + && !filter_cursor.stopped_by_max_lt; Ok(partial) } diff --git a/storage/Cargo.toml b/storage/Cargo.toml index ebe24f26..28702564 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -22,10 +22,10 @@ serde_derive = '1.0.114' strum = '0.18.0' strum_macros = '0.18.0' tokio = { features = [ 'fs', 'rt-multi-thread' ], version = '1.5' } -adnl = { git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.15' } +adnl = { git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.18' } lockfree = { git = 'https://github.com/tonlabs/lockfree.git' } -ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' } -ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.117' } +ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' } +ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.119' } ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' } [build-dependencies] diff --git a/validator-session/Cargo.toml b/validator-session/Cargo.toml index ac00a422..fccbf941 100644 --- a/validator-session/Cargo.toml +++ b/validator-session/Cargo.toml @@ -16,10 +16,10 @@ metrics = '0.21.0' metrics-core = '0.5' rand = '0.8' catchain = { path = '../catchain' } -overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.11' } 
+overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.14' } storage = { path = '../storage' } -ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' } -ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.117' } +ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' } +ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.119' } ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' } [features]
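
For reference, MsgQueueManager::clean_out_msg_queue above splits its time budget between the ordered and the random cleaning phases by ordered_clean_percentage_points. A minimal sketch of that split (values illustrative, assuming the defaults in this patch's config):

// Illustrative sketch of the ordered/random clean budget split.
fn split_clean_budget(clean_timeout_nanos: i128, ordered_clean_percentage_points: i128) -> (i128, i128) {
    let ordered = clean_timeout_nanos * ordered_clean_percentage_points / 1000;
    (ordered, clean_timeout_nanos - ordered)
}

fn main() {
    // 1000 points (the initial clean default) gives the whole budget to the
    // ordered phase; 0 points (the secondary clean) gives it all to the random phase
    assert_eq!(split_clean_budget(150_000_000, 1000), (150_000_000, 0));
    assert_eq!(split_clean_budget(150_000_000, 0), (0, 150_000_000));
}
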