diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d431c1d..4d67d8a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to this project will be documented in this file. +## Version 0.59.12 + +- Use modern collator with aborting procedure + ## Version 0.59.11 - Fix for broken shard merge diff --git a/Cargo.toml b/Cargo.toml index f8c0b2cd..14fd765a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ build = 'common/build/build.rs' edition = '2021' name = 'ever-node' -version = '0.59.11' +version = '0.59.12' [workspace] members = [ 'storage' ] diff --git a/src/config.rs b/src/config.rs index 65e98fd2..48b21272 100644 --- a/src/config.rs +++ b/src/config.rs @@ -83,6 +83,7 @@ pub struct CollatorConfig { pub optimistic_clean_percentage_points: u32, pub max_secondary_clean_timeout_percentage_points: u32, pub max_collate_threads: u32, + pub max_collate_msgs_queue_on_account: u32, pub retry_if_empty: bool, pub finalize_empty_after_ms: u32, pub empty_collation_sleep_ms: u32, @@ -99,6 +100,7 @@ impl Default for CollatorConfig { optimistic_clean_percentage_points: 1000, // 1.000 = 100% = 150ms max_secondary_clean_timeout_percentage_points: 350, // 0.350 = 35% = 350ms max_collate_threads: 10, + max_collate_msgs_queue_on_account: 3, retry_if_empty: false, finalize_empty_after_ms: 800, empty_collation_sleep_ms: 100, diff --git a/src/lib.rs b/src/lib.rs index 36973801..be8d446b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,10 +39,6 @@ mod shard_blocks; include!("../common/src/info.rs"); -#[cfg(feature = "tracing")] -pub mod jaeger; - -#[cfg(not(feature = "tracing"))] pub mod jaeger { pub fn init_jaeger(){} #[cfg(feature = "external_db")] diff --git a/src/main.rs b/src/main.rs index 436807cf..2d00c137 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,10 +39,6 @@ mod ext_messages; mod shard_blocks; -#[cfg(feature = "tracing")] -mod jaeger; - -#[cfg(not(feature = "tracing"))] mod jaeger { pub fn init_jaeger(){} #[cfg(feature = "external_db")] @@ -52,7 +48,7 @@ mod jaeger { use crate::{ config::TonNodeConfig, engine::{Engine, Stopper, EngineFlags}, - jaeger::init_jaeger, internal_db::restore::set_graceful_termination, + internal_db::restore::set_graceful_termination, validating_utils::supported_version }; #[cfg(feature = "external_db")] @@ -456,7 +452,7 @@ fn main() { .build() .expect("Can't create Validator tokio runtime"); - init_jaeger(); + jaeger::init_jaeger(); #[cfg(feature = "trace_alloc_detail")] thread::spawn( diff --git a/src/tests/test_shard_blocks.rs b/src/tests/test_shard_blocks.rs index 6460c990..6d96fbce 100644 --- a/src/tests/test_shard_blocks.rs +++ b/src/tests/test_shard_blocks.rs @@ -14,7 +14,7 @@ use super::*; use crate::test_helper::gen_master_state; use crate::collator_test_bundle::{create_block_handle_storage, create_engine_allocated}; -#[cfg(all(feature = "telemetry", not(feature = "fast_finality")))] +#[cfg(all(feature = "telemetry"))] use crate::collator_test_bundle::create_engine_telemetry; use std::{sync::{atomic::{AtomicU32, Ordering}, Arc}, collections::HashSet}; use storage::{block_handle_db::{BlockHandle, BlockHandleStorage}, types::BlockMeta}; diff --git a/src/types/accounts.rs b/src/types/accounts.rs index 95f16e42..9109107c 100644 --- a/src/types/accounts.rs +++ b/src/types/accounts.rs @@ -11,24 +11,40 @@ * limitations under the License. 
*/ -use std::sync::{atomic::AtomicU64, Arc}; use ever_block::{ + error, fail, AccountId, Cell, HashmapRemover, Result, UInt256, Account, AccountBlock, Augmentation, CopyleftRewards, Deserializable, HashUpdate, HashmapAugType, LibDescr, Libraries, Serializable, ShardAccount, ShardAccounts, StateInitLib, Transaction, Transactions, }; -use ever_block::{fail, AccountId, Cell, HashmapRemover, Result, UInt256}; pub struct ShardAccountStuff { account_addr: AccountId, account_root: Cell, last_trans_hash: UInt256, last_trans_lt: u64, - lt: Arc<AtomicU64>, - transactions: Transactions, + lt: u64, + transactions: Option<Transactions>, state_update: HashUpdate, - orig_libs: StateInitLib, - copyleft_rewards: CopyleftRewards, + orig_libs: Option<StateInitLib>, + copyleft_rewards: Option<CopyleftRewards>, + + /// * Sync key of the message which updated the account state + /// * It is an incremental counter set by the executor + update_msg_sync_key: Option<usize>, + + // /// * Executor sets transaction that updated account to current state + // /// * Initial account state contains None + // last_transaction: Option<(Cell, CurrencyCollection)>, + + /// LT of the transaction which updated the account state + update_trans_lt: Option<u64>, + + /// The copyleft reward address of the transaction which updated the account state (if any) + update_copyleft_reward_address: Option<AccountId>, + + /// Executor stores the previous account state + prev_account_stuff: Option<Box<ShardAccountStuff>>, serde_opts: u8, } @@ -36,7 +52,7 @@ impl ShardAccountStuff { pub fn new( account_addr: AccountId, shard_acc: ShardAccount, - lt: Arc<AtomicU64>, + lt: u64, serde_opts: u8, ) -> Result<Self> { let account_hash = shard_acc.account_cell().repr_hash(); @@ -45,17 +61,65 @@ impl ShardAccountStuff { let last_trans_lt = shard_acc.last_trans_lt(); Ok(Self{ account_addr, - orig_libs: shard_acc.read_account()?.libraries(), + orig_libs: Some(shard_acc.read_account()?.libraries()), account_root, last_trans_hash, last_trans_lt, lt, - transactions: Transactions::with_serde_opts(serde_opts), + transactions: Some(Transactions::with_serde_opts(serde_opts)), state_update: HashUpdate::with_hashes(account_hash.clone(), account_hash), - copyleft_rewards: CopyleftRewards::default(), + copyleft_rewards: Some(CopyleftRewards::default()), + + update_msg_sync_key: None, + //last_transaction: None, + update_trans_lt: None, + update_copyleft_reward_address: None, + prev_account_stuff: None, serde_opts, }) } + /// Returns: + /// * None - if there are no updates or no matching record in the history + /// * Some(record) - the record from the history whose update_msg_sync_key == on_msg_sync_key + pub fn commit(mut self, on_msg_sync_key: usize) -> Result<Option<ShardAccountStuff>> { + while let Some(current_update_msg_sync_key) = self.update_msg_sync_key { + if current_update_msg_sync_key == on_msg_sync_key { + log::debug!("account {:x} state committed by processed message {} in the queue", self.account_addr(), on_msg_sync_key); + return Ok(Some(self)); + } else { + if !self.revert()? { + log::debug!("unable to revert account {:x} state, current state is the first in history", self.account_addr()); + return Ok(None); + } else { + log::debug!("account {:x} state reverted one step back to message {:?} in the queue", self.account_addr(), self.update_msg_sync_key); + } + } + } + Ok(None) + }
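Editor's note: `commit`/`revert` above implement an undo chain over account states linked through `prev_account_stuff`. A minimal, self-contained sketch of the same mechanics, using a hypothetical `VersionedState` rather than the node's types:

```rust
// Sketch only: illustrates the undo-chain idea behind ShardAccountStuff.
// Every applied message links the previous state via `prev`; commit(on_key)
// walks back until it reaches the state produced by the given message,
// discarding all newer states.
#[derive(Debug)]
struct VersionedState {
    balance: u64,
    update_msg_sync_key: Option<usize>, // which message produced this state
    prev: Option<Box<VersionedState>>,  // previous state, like `prev_account_stuff`
}

impl VersionedState {
    fn apply(&mut self, msg_sync_key: usize, new_balance: u64) {
        let prev = std::mem::replace(self, VersionedState {
            balance: new_balance,
            update_msg_sync_key: Some(msg_sync_key),
            prev: None,
        });
        self.prev = Some(Box::new(prev));
    }
    fn revert(&mut self) -> bool {
        match self.prev.take() {
            Some(prev) => { *self = *prev; true }
            None => false, // already at the root state
        }
    }
    fn commit(mut self, on_msg_sync_key: usize) -> Option<Self> {
        while let Some(key) = self.update_msg_sync_key {
            if key == on_msg_sync_key { return Some(self); }
            if !self.revert() { return None; }
        }
        None // initial state: nothing was committed
    }
}

fn main() {
    let mut acc = VersionedState { balance: 100, update_msg_sync_key: None, prev: None };
    acc.apply(0, 90);
    acc.apply(1, 70);
    acc.apply(2, 40);
    // keep the state as of message 1; the state from message 2 is rolled back
    let committed = acc.commit(1).expect("message 1 was applied");
    assert_eq!(committed.balance, 70);
}
```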
{ + log::debug!("unable to revert account {:x} state, current state is a first in history", self.account_addr()); + return Ok(None); + } else { + log::debug!("account {:x} state reverted one step back to message {:?} in the queue", self.account_addr(), self.update_msg_sync_key); + } + } + } + Ok(None) + } + fn revert(&mut self) -> Result { + let mut taked_prev = match self.prev_account_stuff.take() { + Some(prev) => prev, + None => return Ok(false), + }; + let prev = taked_prev.as_mut(); + + prev.orig_libs = self.orig_libs.take(); + + prev.transactions = self.transactions.take(); + if let Some(update_trans_lt) = self.update_trans_lt { + prev.remove_trans(update_trans_lt)?; + } + + prev.copyleft_rewards = self.copyleft_rewards.take(); + if let Some(update_copyleft_reward_address) = self.update_copyleft_reward_address.as_ref() { + prev.remove_copyleft_reward(update_copyleft_reward_address)?; + } + + std::mem::swap(self, prev); + + Ok(true) + } pub fn update_shard_state(&mut self, new_accounts: &mut ShardAccounts) -> Result { let account = self.read_account()?; if account.is_none() { @@ -65,10 +129,10 @@ impl ShardAccountStuff { let value = shard_acc.write_to_new_cell()?; new_accounts.set_builder_serialized(self.account_addr().clone(), &value, &account.aug()?)?; } - AccountBlock::with_params(&self.account_addr, &self.transactions, &self.state_update) + AccountBlock::with_params(&self.account_addr, self.transactions()?, &self.state_update) } - pub fn lt(&self) -> Arc { - self.lt.clone() + pub fn lt(&self) -> u64 { + self.lt } pub fn read_account(&self) -> Result { Account::construct_from_cell(self.account_root()) @@ -82,30 +146,107 @@ impl ShardAccountStuff { pub fn account_addr(&self) -> &AccountId { &self.account_addr } - pub fn copyleft_rewards(&self) -> &CopyleftRewards { - &self.copyleft_rewards + pub fn copyleft_rewards(&self) -> Result<&CopyleftRewards> { + self.copyleft_rewards.as_ref() + .ok_or_else(|| error!( + "`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + fn copyleft_rewards_mut(&mut self) -> Result<&mut CopyleftRewards> { + self.copyleft_rewards.as_mut() + .ok_or_else(|| error!( + "`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + fn remove_copyleft_reward(&mut self, address: &AccountId) -> Result { + self.copyleft_rewards_mut()?.remove(address) + } + + fn transactions(&self) -> Result<&Transactions> { + self.transactions.as_ref() + .ok_or_else(|| error!( + "`transactions` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + fn transactions_mut(&mut self) -> Result<&mut Transactions> { + self.transactions.as_mut() + .ok_or_else(|| error!( + "`transactions` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + fn remove_trans(&mut self, trans_lt: u64) -> Result<()> { + let key = trans_lt.write_to_bitstring()?; + self.transactions_mut()?.remove(key)?; + Ok(()) + } + + fn orig_libs(&self) -> Result<&StateInitLib> { + self.orig_libs.as_ref() + .ok_or_else(|| error!( + "`orig_libs` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + + pub fn apply_transaction_res( + &mut self, + update_msg_sync_key: usize, + tx_last_lt: u64, + transaction_res: &mut Result, + account_root: Cell, + ) -> Result<()> { + let mut res = ShardAccountStuff { + account_addr: self.account_addr.clone(), + 
account_root: self.account_root.clone(), + last_trans_hash: self.last_trans_hash.clone(), + last_trans_lt: self.last_trans_lt, + lt: tx_last_lt, // 1014 or 1104 or 1024 + transactions: self.transactions.take(), + state_update: self.state_update.clone(), + orig_libs: self.orig_libs.take(), + copyleft_rewards: Some(CopyleftRewards::default()), + update_msg_sync_key: Some(update_msg_sync_key), + update_trans_lt: None, + update_copyleft_reward_address: None, + prev_account_stuff: None, + serde_opts: self.serde_opts, + }; + + if let Ok(transaction) = transaction_res { + res.add_transaction(transaction, account_root)?; + } + + std::mem::swap(self, &mut res); + + self.prev_account_stuff = Some(Box::new(res)); + + Ok(()) } pub fn add_transaction(&mut self, transaction: &mut Transaction, account_root: Cell) -> Result<()> { transaction.set_prev_trans_hash(self.last_trans_hash.clone()); - transaction.set_prev_trans_lt(self.last_trans_lt); + transaction.set_prev_trans_lt(self.last_trans_lt); // 1010 // log::trace!("{} {}", self.collated_block_descr, debug_transaction(transaction.clone())?); self.account_root = account_root; self.state_update.new_hash = self.account_root.repr_hash(); let tr_root = transaction.serialize_with_opts(self.serde_opts)?; + let tr_lt = transaction.logical_time(); // 1011 + self.last_trans_hash = tr_root.repr_hash(); - self.last_trans_lt = transaction.logical_time(); + self.last_trans_lt = tr_lt; + + self.update_trans_lt = Some(tr_lt); - self.transactions.setref( - &transaction.logical_time(), + self.transactions_mut()?.setref( + &tr_lt, &tr_root, transaction.total_fees() )?; if let Some(copyleft_reward) = transaction.copyleft_reward() { log::trace!("Copyleft reward {} {} from transaction {}", copyleft_reward.address, copyleft_reward.reward, self.last_trans_hash); - self.copyleft_rewards.add_copyleft_reward(©left_reward.address, ©left_reward.reward)?; + self.copyleft_rewards_mut()?.add_copyleft_reward(©left_reward.address, ©left_reward.reward)?; + self.update_copyleft_reward_address = Some(copyleft_reward.address.clone()); } Ok(()) @@ -113,8 +254,9 @@ impl ShardAccountStuff { pub fn update_public_libraries(&self, libraries: &mut Libraries) -> Result<()> { let account = self.read_account()?; let new_libs = account.libraries(); - if new_libs.root() != self.orig_libs.root() { - new_libs.scan_diff(&self.orig_libs, |key: UInt256, old, new| { + let orig_libs = self.orig_libs()?; + if new_libs.root() != orig_libs.root() { + new_libs.scan_diff(orig_libs, |key: UInt256, old, new| { let old = old.unwrap_or_default(); let new = new.unwrap_or_default(); if old.is_public_library() && !new.is_public_library() { diff --git a/src/types/limits.rs b/src/types/limits.rs index fe296f71..a51ef114 100644 --- a/src/types/limits.rs +++ b/src/types/limits.rs @@ -109,8 +109,8 @@ impl BlockLimitStatus { } /// Update logical time - pub fn update_lt(&mut self, lt: u64) { - self.lt_current = max(self.lt_current, lt); + pub fn update_lt(&mut self, lt: u64, force: bool) { + self.lt_current = if force { lt } else { max(self.lt_current, lt) }; if self.lt_start > self.lt_current { self.lt_start = lt; } diff --git a/src/types/messages.rs b/src/types/messages.rs index 85d3d340..4fc6523b 100644 --- a/src/types/messages.rs +++ b/src/types/messages.rs @@ -13,12 +13,12 @@ use std::fmt::{self, Display, Formatter}; use ever_block::{ - GlobalCapabilities, + error, fail, Result, AccountId, SliceData, UInt256, + GlobalCapabilities, InternalMessageHeader, EnqueuedMsg, MsgEnvelope, AccountIdPrefixFull, IntermediateAddress, 
OutMsgQueueKey, Serializable, Deserializable, Grams, ShardIdent, AddSub, CommonMessage, ChildCell }; use ever_executor::{BlockchainConfig, CalcMsgFwdFees}; -use ever_block::{error, fail, Result, AccountId, SliceData, UInt256}; #[cfg(test)] #[path = "tests/test_messages.rs"] @@ -83,6 +83,9 @@ impl MsgEnvelopeStuff { pub fn message(&self) -> &CommonMessage { &self.msg } pub fn message_hash(&self) -> UInt256 { self.env.message_hash() } pub fn message_cell(&self) -> ChildCell { self.env.msg_cell() } + pub fn out_msg_key(&self) -> OutMsgQueueKey { + OutMsgQueueKey::with_account_prefix(&self.next_prefix(), self.message_hash()) + } #[cfg(test)] pub fn src_prefix(&self) -> &AccountIdPrefixFull { &self.src_prefix } pub fn dst_prefix(&self) -> &AccountIdPrefixFull { &self.dst_prefix } @@ -216,6 +219,18 @@ impl MsgEnqueueStuff { pub fn out_msg_key(&self) -> OutMsgQueueKey { OutMsgQueueKey::with_account_prefix(&self.next_prefix(), self.message_hash()) } + pub fn int_header(&self) -> Result<&InternalMessageHeader> { + self.message().get_std()?.int_header() + .ok_or_else(|| error!("message with hash {:x} is not internal", self.message_hash())) + } + pub fn src_account_id(&self) -> Result { + self.message() + .get_std()? + .src_ref() + .map(|address| address.address()) + .ok_or_else(|| error!("internal message with hash {:x} \ + has wrong source address", self.message_hash())) + } pub fn dst_account_id(&self) -> Result { self.message() .get_std()? diff --git a/src/validator/collator.rs b/src/validator/collator.rs index 070a9974..6b920cc2 100644 --- a/src/validator/collator.rs +++ b/src/validator/collator.rs @@ -43,6 +43,8 @@ use ton_api::ton::ton_node::{ rempmessagestatus::{RempAccepted, RempIgnored, RempRejected}, RempMessageLevel, RempMessageStatus }; +use tokio::sync::Mutex; +use std::collections::BTreeMap; use std::{ cmp::{max, min}, collections::{BinaryHeap, HashMap, HashSet}, @@ -65,13 +67,13 @@ use ever_block::{ Transaction, TransactionTickTock, UnixTime32, ValidatorSet, ValueFlow, VarUInteger32, WorkchainDescr, Workchains, MASTERCHAIN_ID, SERDE_OPTS_COMMON_MESSAGE, SERDE_OPTS_EMPTY, CommonMessage, AccountIdPrefixFull, ChildCell, ConnectedNwOutDescr, HashUpdate, InRefValue, - ConnectedNwDescrExt, ConnectedNwDescr, Account, GetRepresentationHash, + ConnectedNwDescrExt, ConnectedNwDescr, Account, GetRepresentationHash, HashmapRemover, + EnqueuedMsg, SliceData, error, fail, AccountId, Cell, HashmapType, Result, UInt256, UsageTree }; use ever_executor::{ BlockchainConfig, ExecuteParams, OrdinaryTransactionExecutor, TickTockTransactionExecutor, TransactionExecutor, }; -use ever_block::{error, fail, AccountId, Cell, HashmapType, Result, UInt256, UsageTree, SliceData}; use crate::engine_traits::RempQueueCollatorInterface; use crate::validator::validator_utils::{is_remp_enabled, PrevBlockHistory}; @@ -184,10 +186,13 @@ enum AsyncMessage { Copyleft(CommonMessage), Ext(CommonMessage, UInt256), Int(MsgEnqueueStuff, bool), - New(MsgEnvelopeStuff, Cell), // prev_trans_cell + New(MsgEnvelopeStuff, Cell, u64), // prev_trans_cell TickTock(TransactionTickTock), } +#[derive(Debug)] +struct AsyncMessageSync(usize, AsyncMessage); + #[derive(Clone, Eq, PartialEq)] struct NewMessage { lt_hash: (u64, UInt256), @@ -196,17 +201,6 @@ struct NewMessage { prefix: AccountIdPrefixFull, } -impl NewMessage { - fn new(lt_hash: (u64, UInt256), msg: CommonMessage, tr_cell: Cell, prefix: AccountIdPrefixFull) -> Self { - Self { - lt_hash, - msg, - tr_cell, - prefix, - } - } -} - impl Ord for NewMessage { fn cmp(&self, other: &Self) 
-> std::cmp::Ordering { other.lt_hash.cmp(&self.lt_hash) @@ -222,17 +216,41 @@ impl PartialOrd for NewMessage { struct CollatorData { // lists, empty by default in_msgs: InMsgDescr, + /// * key - msg_sync_key + /// * value - in msg descr hash + in_msgs_descr_history: HashMap<usize, UInt256>, out_msgs: OutMsgDescr, + /// * key - msg_sync_key + /// * value - list of out msg descr hashes + out_msgs_descr_history: HashMap<usize, Vec<(UInt256, Option<SliceData>)>>, accounts: ShardAccountBlocks, out_msg_queue_info: OutMsgQueueInfoStuff, + /// * key - msg_sync_key + /// * value - removed out msg key, EnqueuedMsg and is_new flag + del_out_queue_msg_history: HashMap<usize, (OutMsgQueueKey, EnqueuedMsg, bool)>, + /// * key - msg_sync_key + /// * value - msg keys in out queue + add_out_queue_msg_history: HashMap<usize, Vec<OutMsgQueueKey>>, shard_fees: ShardFees, shard_top_block_descriptors: Vec<Arc<TopBlockDescrStuff>>, block_create_count: HashMap<UInt256, u64>, new_messages: BinaryHeap<NewMessage>, // used as a priority queue + /// * key - msg_sync_key + /// * value - list of new msgs + new_messages_buffer: BTreeMap<usize, Vec<NewMessage>>, accepted_ext_messages: Vec<(UInt256, i32)>, // message id and workchain id - rejected_ext_messages: Vec<(UInt256, String)>, // message id and reject reason + /// * key - msg_sync_key + /// * value - ext msg id + accepted_ext_messages_buffer: HashMap<usize, (UInt256, i32)>, + rejected_ext_messages: Vec<(UInt256, String)>, + /// * key - msg_sync_key + /// * value - ext msg id and error info + rejected_ext_messages_buffer: HashMap<usize, (UInt256, String)>, usage_tree: UsageTree, imported_visited: HashSet<UInt256>, + /// * key - msg_sync_key + /// * value - last account lt after msg processing + tx_last_lt_buffer: HashMap<usize, u64>, // determined fields gen_utime: u32, @@ -246,17 +264,31 @@ struct CollatorData { shards: Option, mesh: Option, mint_msg: Option<InMsg>, + /// * key - msg_sync_key + /// * value - mint msg descr + mint_msg_buffer: BTreeMap<usize, Option<InMsg>>, recover_create_msg: Option<InMsg>, + /// * key - msg_sync_key + /// * value - recover create msg descr + recover_create_msg_buffer: BTreeMap<usize, Option<InMsg>>, copyleft_msgs: Vec<InMsg>, + /// * key - msg_sync_key + /// * value - list of copyleft msgs + copyleft_msgs_buffer: BTreeMap<usize, InMsg>, // fields with default values skip_topmsgdescr: bool, skip_extmsg: bool, shard_conf_adjusted: bool, + // No revert history is kept here: once parallel collation is cancelled, + // no new msgs can be processed, so the limits need not be checked anymore block_limit_status: BlockLimitStatus, block_create_total: u64, inbound_queues_empty: bool, last_proc_int_msg: (u64, UInt256), + /// * key - msg_sync_key + /// * value - incoming internal msg LT HASH + last_proc_int_msg_buffer: BTreeMap<usize, (u64, UInt256)>, shards_max_end_lt: u64, before_split: bool, now_upper_limit: u32, @@ -281,6 +313,9 @@ struct CollatorData { // timings and global capabilities split_queues: bool, + + // a string like `-1:8000000000000000, 100500`, used for logging + collated_block_descr: Arc<String>, }
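Editor's note: the `*_history` and `*_buffer` fields above all follow one staging pattern: each message's side effects are recorded under its `msg_sync_key`, can be dropped individually when the message is reverted, and are applied in message order on commit. A minimal sketch of that pattern with an illustrative `Staged` type (not the node's code):

```rust
use std::collections::BTreeMap;

// Sketch only: per-message side effects are staged under their msg_sync_key,
// reverted individually, and committed in ascending message order.
struct Staged<T> {
    buffer: BTreeMap<usize, Vec<T>>, // msg_sync_key -> staged side effects
    committed: Vec<T>,
}

impl<T> Staged<T> {
    fn new() -> Self {
        Self { buffer: BTreeMap::new(), committed: Vec::new() }
    }
    fn add(&mut self, msg_sync_key: usize, item: T) {
        self.buffer.entry(msg_sync_key).or_default().push(item);
    }
    fn revert(&mut self, msg_sync_key: &usize) {
        self.buffer.remove(msg_sync_key); // drop everything this message produced
    }
    fn commit(&mut self) {
        // pop_first yields keys in ascending order, preserving message order
        while let Some((_, items)) = self.buffer.pop_first() {
            self.committed.extend(items);
        }
    }
}

fn main() {
    let mut new_msgs: Staged<&str> = Staged::new();
    new_msgs.add(0, "msg created by message 0");
    new_msgs.add(1, "msg created by message 1");
    new_msgs.revert(&1); // message 1 was rolled back
    new_msgs.commit();
    assert_eq!(new_msgs.committed, vec!["msg created by message 0"]);
}
```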
impl CollatorData { fn new( @@ -291,23 +326,32 @@ impl CollatorData { usage_tree: UsageTree, prev_data: &PrevData, is_masterchain: bool, + collated_block_descr: Arc<String>, ) -> Result<Self> { let limits = Arc::new(config.raw_config().block_limits(is_masterchain)?); - let opts = serde_opts_from_caps(&config); + let serde_opts = serde_opts_from_caps(&config); let split_queues = !config.has_capability(GlobalCapabilities::CapNoSplitOutQueue); let ret = Self { - in_msgs: InMsgDescr::with_serde_opts(opts), - out_msgs: OutMsgDescr::with_serde_opts(opts), + in_msgs: InMsgDescr::with_serde_opts(serde_opts), + in_msgs_descr_history: Default::default(), + out_msgs: OutMsgDescr::with_serde_opts(serde_opts), + out_msgs_descr_history: Default::default(), accounts: ShardAccountBlocks::default(), out_msg_queue_info: OutMsgQueueInfoStuff::default(), + del_out_queue_msg_history: Default::default(), + add_out_queue_msg_history: Default::default(), shard_fees: ShardFees::default(), shard_top_block_descriptors: Vec::new(), block_create_count: HashMap::new(), new_messages: Default::default(), + new_messages_buffer: Default::default(), accepted_ext_messages: Default::default(), + accepted_ext_messages_buffer: Default::default(), rejected_ext_messages: Default::default(), + rejected_ext_messages_buffer: Default::default(), usage_tree, imported_visited: HashSet::new(), + tx_last_lt_buffer: Default::default(), gen_utime, config, start_lt: None, @@ -319,8 +363,11 @@ impl CollatorData { shards: None, mesh: None, mint_msg: None, + mint_msg_buffer: BTreeMap::new(), recover_create_msg: None, + recover_create_msg_buffer: BTreeMap::new(), copyleft_msgs: Default::default(), + copyleft_msgs_buffer: BTreeMap::new(), skip_topmsgdescr: false, skip_extmsg: false, shard_conf_adjusted: false, @@ -328,6 +375,7 @@ impl CollatorData { block_create_total: 0, inbound_queues_empty: false, last_proc_int_msg: (0, UInt256::default()), + last_proc_int_msg_buffer: Default::default(), want_merge: false, underload_history: prev_data.underload_history() << 1, want_split: false, @@ -342,8 +390,9 @@ impl CollatorData { remove_count: 0, msg_queue_depth_sum: 0, before_split: false, - serde_opts: opts, + serde_opts, split_queues, + collated_block_descr, }; Ok(ret) } @@ -362,13 +411,41 @@ impl CollatorData { self.out_msgs.data().cloned().ok_or_else(|| error!("out msg descr is empty")) } + /// Stores processed internal message LT HASH to buffer + fn add_last_proc_int_msg_to_buffer(&mut self, src_msg_sync_key: usize, lt_hash: (u64, UInt256)) { + log::trace!( + "{}: added last_proc_int_msg {}: ({}, {:x}) to buffer", + self.collated_block_descr, + src_msg_sync_key, lt_hash.0, lt_hash.1, + ); + self.last_proc_int_msg_buffer.insert(src_msg_sync_key, lt_hash); + } + /// Clean out processed internal message LT HASH from buffer by src msg + fn revert_last_proc_int_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) { + let lt_hash = self.last_proc_int_msg_buffer.remove(src_msg_sync_key); + log::trace!( + "{}: removed last_proc_int_msg {}: ({:?}) from buffer", + self.collated_block_descr, + src_msg_sync_key, lt_hash, + ); + } + /// Updates last processed internal message LT HASH from the entries left (not reverted) in the buffer + fn commit_last_proc_int_msg(&mut self) -> Result<()> { + log::trace!("{}: last_proc_int_msg_buffer: {:?}", self.collated_block_descr, self.last_proc_int_msg_buffer); + while let Some((_, lt_hash)) = self.last_proc_int_msg_buffer.pop_first() { + self.update_last_proc_int_msg(lt_hash)?; + } + Ok(()) + } + fn update_last_proc_int_msg(&mut self, 
new_lt_hash: (u64, UInt256)) -> Result<()> { if self.last_proc_int_msg < new_lt_hash { CHECK!(new_lt_hash.0 > 0); - log::trace!("last_proc_int_msg updated to ({},{:x})", new_lt_hash.0, new_lt_hash.1); + log::trace!("{}: last_proc_int_msg updated to ({},{:x})", self.collated_block_descr, new_lt_hash.0, new_lt_hash.1); self.last_proc_int_msg = new_lt_hash; } else { - log::error!("processed message ({},{:x}) AFTER message ({},{:x})", new_lt_hash.0, new_lt_hash.1, + log::error!("{}: processed message ({},{:x}) AFTER message ({},{:x})", + self.collated_block_descr, new_lt_hash.0, new_lt_hash.1, self.last_proc_int_msg.0, self.last_proc_int_msg.1); self.last_proc_int_msg.0 = std::u64::MAX; fail!("internal message processing order violated!") @@ -377,12 +454,37 @@ impl CollatorData { } fn update_lt(&mut self, lt: u64) { - self.block_limit_status.update_lt(lt); + self.block_limit_status.update_lt(lt, false); + } + + /// Stores transaction last LT to buffer by src msg, updates block_limit_status + fn add_tx_last_lt_to_buffer(&mut self, src_msg_sync_key: usize, tx_last_lt: u64) { + self.tx_last_lt_buffer.insert(src_msg_sync_key, tx_last_lt); + self.block_limit_status.update_lt(tx_last_lt, false); + } + /// Clean transaction last LT from buffer by src msg + fn revert_tx_last_lt_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.tx_last_lt_buffer.remove(src_msg_sync_key); + } + /// Saves max transaction last LT to block_limit_status and returns value + fn commit_tx_last_lt(&mut self) -> Option { + if let Some(max_lt) = self.tx_last_lt_buffer.values().reduce(|curr, next| curr.max(next)) { + self.block_limit_status.update_lt(*max_lt, true); + Some(*max_lt) + } else { + None + } } /// add in and out messages from to block, and to new message queue - fn new_transaction(&mut self, transaction: &Transaction, tr_cell: ChildCell, in_msg_opt: Option<&InMsg>) -> Result<()> { + fn new_transaction( + &mut self, + transaction: &Transaction, + tr_cell: ChildCell, + in_msg_opt: Option<&InMsg>, + src_msg_sync_key: usize, + ) -> Result<()> { // log::trace!( // "new transaction, message {:x}\n{}", // in_msg_opt.map(|m| m.message_cell().unwrap().repr_hash()).unwrap_or_default(), @@ -394,11 +496,11 @@ impl CollatorData { self.block_limit_status.add_transaction(transaction.logical_time() == self.start_lt()? 
+ 1); if let Some(in_msg) = in_msg_opt { self.add_in_msg_to_block(in_msg)?; + self.add_in_msg_descr_to_history(src_msg_sync_key, in_msg)?; } let shard = self.out_msg_queue_info.shard().clone(); - let opts = self.serde_opts; transaction.out_msgs.iterate_slices(|slice| { - let msg_cell: ChildCell = ChildCell::with_cell_and_opts(slice.reference(0)?, opts); + let msg_cell: ChildCell = ChildCell::with_cell_and_opts(slice.reference(0)?, self.serde_opts); let msg_hash = msg_cell.hash(); let common_msg = msg_cell.read_struct()?; let msg = common_msg.get_std()?; @@ -407,18 +509,30 @@ impl CollatorData { // Add out message to state for counting time and it may be removed if used let use_hypercube = !self.config.has_capability(GlobalCapabilities::CapOffHypercube); let fwd_fee = *info.fwd_fee(); - let enq = MsgEnqueueStuff::new(common_msg.clone(), &shard, fwd_fee, use_hypercube, opts)?; + let enq = MsgEnqueueStuff::new(common_msg.clone(), &shard, fwd_fee, use_hypercube, self.serde_opts)?; self.enqueue_count += 1; self.msg_queue_depth_sum += self.out_msg_queue_info.add_message(&enq)?; // Add to message block here for counting time later it may be replaced let out_msg = OutMsg::new(enq.envelope_cell(), tr_cell.clone()); - self.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; - self.new_messages.push(NewMessage::new((info.created_lt, msg_hash), common_msg, tr_cell.cell(), enq.next_prefix().clone())); + let new_msg = NewMessage { + lt_hash: (info.created_lt, msg_hash.clone()), + msg: common_msg, + tr_cell: tr_cell.cell(), + prefix: enq.next_prefix().clone(), + }; + self.add_out_queue_msg_with_history(src_msg_sync_key, enq)?; + + // Add to message block here for counting time later it may be replaced + let prev_out_msg_slice_opt = self.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; + self.add_out_msg_descr_to_history(src_msg_sync_key, msg_hash, prev_out_msg_slice_opt); + self.add_new_message_to_buffer(src_msg_sync_key, new_msg); } CommonMsgInfo::ExtOutMsgInfo(_) => { let out_msg = OutMsg::external(msg_cell, tr_cell.clone()); - self.add_out_msg_to_block(out_msg.read_message_hash()?, &out_msg)?; + let msg_hash = out_msg.read_message_hash()?; + let prev_out_msg_slice_opt = self.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; + self.add_out_msg_descr_to_history(src_msg_sync_key, msg_hash, prev_out_msg_slice_opt); } CommonMsgInfo::ExtInMsgInfo(_) => fail!("External inbound message cannot be output") }; @@ -430,35 +544,165 @@ impl CollatorData { /// put InMsg to block fn add_in_msg_to_block(&mut self, in_msg: &InMsg) -> Result<()> { self.in_msg_count += 1; - let msg_cell = in_msg.serialize_with_opts(self.serde_opts)?; self.in_msgs.insert(in_msg)?; + + let msg_cell = in_msg.serialize_with_opts(self.serde_opts)?; self.block_limit_status.register_in_msg_op(&msg_cell, &self.in_msgs_root()?) } + /// Stores in_msg descr hash by src msg + fn add_in_msg_descr_to_history(&mut self, src_msg_sync_key: usize, in_msg: &InMsg) -> Result<()> { + let msg_hash = in_msg.message_cell()?.repr_hash(); + self.in_msgs_descr_history.insert(src_msg_sync_key, msg_hash); + Ok(()) + } + /// Removes in_msg descr created by src msg. 
Does not update block_limit_status + fn revert_in_msgs_descr_by_src_msg(&mut self, src_msg_sync_key: &usize) -> Result<()> { + if let Some(msg_hash) = self.in_msgs_descr_history.remove(src_msg_sync_key) { + self.in_msg_count -= 1; + let key = SliceData::load_builder(msg_hash.write_to_new_cell()?)?; + self.in_msgs.remove(key)?; + } + Ok(()) + } + /// Clean out in_msg descr history + fn commit_in_msgs_descr_by_src_msg(&mut self) { + self.in_msgs_descr_history.clear(); + self.in_msgs_descr_history.shrink_to_fit(); + } + /// put OutMsg to block - fn add_out_msg_to_block(&mut self, key: UInt256, out_msg: &OutMsg) -> Result<()> { + fn add_out_msg_to_block(&mut self, key: UInt256, out_msg: &OutMsg) -> Result> { self.out_msg_count += 1; - self.out_msgs.insert_with_key(key, out_msg)?; + + let prev_value = self.out_msgs.insert_with_key_return_prev(key, out_msg)?; let msg_cell = out_msg.serialize_with_opts(self.serde_opts)?; - self.block_limit_status.register_out_msg_op(&msg_cell, &self.out_msgs_root()?) + self.block_limit_status.register_out_msg_op(&msg_cell, &self.out_msgs_root()?)?; + + Ok(prev_value) + } + + /// put OutMsg to block, does not update block_limit_status + fn add_out_msg_to_block_without_limits_update(&mut self, key: UInt256, out_msg: &OutMsg) -> Result> { + self.out_msg_count += 1; + + self.out_msgs.insert_with_key_return_prev(key, out_msg) + } + + /// Stores out_msg descr hash by src msg + fn add_out_msg_descr_to_history( + &mut self, + src_msg_sync_key: usize, + out_msg_hash: UInt256, + prev_out_msg_slice_opt: Option, + ) { + if let Some(v) = self.out_msgs_descr_history.get_mut(&src_msg_sync_key) { + v.push((out_msg_hash, prev_out_msg_slice_opt)); + } else { + self.out_msgs_descr_history.insert(src_msg_sync_key, vec![(out_msg_hash, prev_out_msg_slice_opt)]); + } + } + /// Removes all out_msg descrs created by src msg. 
Does not update block_limit_status + fn revert_out_msgs_descr_by_src_msg(&mut self, src_msg_sync_key: &usize) -> Result<()> { + if let Some(msgs_history) = self.out_msgs_descr_history.remove(src_msg_sync_key) { + for (msg_hash, prev_out_msg_slice_opt) in msgs_history { + self.out_msg_count -= 1; + + // return prev out msg descr to map if it exists + if let Some(mut prev_out_msg_slice) = prev_out_msg_slice_opt { + log::debug!("{}: previous out msg descr {:x} reverted to block", self.collated_block_descr, msg_hash); + let prev_out_msg = OutMsg::construct_from(&mut prev_out_msg_slice)?; + self.add_out_msg_to_block_without_limits_update(msg_hash, &prev_out_msg)?; + } else { + let key = SliceData::load_builder(msg_hash.write_to_new_cell()?)?; + self.out_msgs.remove(key)?; + } + } + } + Ok(()) + } + /// Clears the out_msg descr history + fn commit_out_msgs_descr_by_src_msg(&mut self) { + self.out_msgs_descr_history.clear(); + self.out_msgs_descr_history.shrink_to_fit(); + } + + /// Stores an accepted ext message id in the buffer, keyed by src msg sync key + fn add_accepted_ext_message_to_buffer(&mut self, src_msg_sync_key: usize, msg_id: UInt256, workchain_id: i32) { + self.accepted_ext_messages_buffer.insert(src_msg_sync_key, (msg_id, workchain_id)); + } + /// Clean accepted ext message id from buffer + fn revert_accepted_ext_message_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.accepted_ext_messages_buffer.remove(src_msg_sync_key); + } + /// Add accepted ext messages from buffer to collator data + fn commit_accepted_ext_messages(&mut self) { + for (_, (msg_id, workchain_id)) in self.accepted_ext_messages_buffer.drain() { + self.accepted_ext_messages.push((msg_id, workchain_id)); + } + } + + /// Stores rejected ext message info in the buffer, keyed by src msg sync key + fn add_rejected_ext_message_to_buffer(&mut self, src_msg_sync_key: usize, rejected_msg: (UInt256, String)) { + self.rejected_ext_messages_buffer.insert(src_msg_sync_key, rejected_msg); + } + /// Clean rejected ext message info from buffer + fn revert_rejected_ext_message_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.rejected_ext_messages_buffer.remove(src_msg_sync_key); + } + /// Add rejected ext messages info from buffer to collator data + fn commit_rejected_ext_messages(&mut self) { + for (_, msg_info) in self.rejected_ext_messages_buffer.drain() { + self.rejected_ext_messages.push(msg_info); + } }
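Editor's note: the reason `add_out_msg_to_block` now returns the previous descriptor (via `insert_with_key_return_prev`) is that undoing an insert must either restore the value it overwrote or remove the key, as `revert_out_msgs_descr_by_src_msg` does above. A sketch with illustrative names:

```rust
use std::collections::HashMap;

// Sketch only: HashMap::insert already returns the previous value,
// which is exactly what a revertible insert needs to remember.
fn insert_return_prev(map: &mut HashMap<String, u32>, key: &str, value: u32) -> Option<u32> {
    map.insert(key.to_string(), value)
}

fn revert_insert(map: &mut HashMap<String, u32>, key: &str, prev: Option<u32>) {
    match prev {
        Some(old) => { map.insert(key.to_string(), old); } // restore the overwritten entry
        None => { map.remove(key); }                       // the insert created the entry
    }
}

fn main() {
    let mut out_msgs = HashMap::new();
    out_msgs.insert("m1".to_string(), 1);
    let prev = insert_return_prev(&mut out_msgs, "m1", 2); // overwrites 1
    revert_insert(&mut out_msgs, "m1", prev);
    assert_eq!(out_msgs["m1"], 1); // original descriptor restored
}
```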
log::debug!("{}: del_out_queue_msg_with_history {:x}", self.collated_block_descr, key); + if is_new { self.enqueue_count -= 1; } else { self.dequeue_count += 1; } + let enq = self.out_msg_queue_info.del_message(&key)?; self.block_limit_status.register_out_msg_queue_op( self.out_msg_queue_info.out_queue()?.data(), &self.usage_tree, false )?; + self.del_out_queue_msg_history.insert(src_msg_sync_key, (key, enq, is_new)); + Ok(()) + } + /// Reverts previously removed msg from out queue + fn revert_del_out_queue_msg(&mut self, src_msg_sync_key: &usize) -> Result<()> { + if let Some((key, enq, is_new)) = self.del_out_queue_msg_history.remove(src_msg_sync_key) { + if is_new { self.enqueue_count += 1; } else { self.dequeue_count -= 1; } + let enq_stuff = MsgEnqueueStuff::from_enqueue(enq)?; + self.out_msg_queue_info.add_message(&enq_stuff)?; + log::debug!("{}: reverted del_out_queue_msg {:x}", self.collated_block_descr, key); + } + Ok(()) + } + /// Cleans out queue msgs removing history + fn commit_del_out_queue_msgs(&mut self) -> Result<()> { + self.del_out_queue_msg_history.clear(); + self.del_out_queue_msg_history.shrink_to_fit(); Ok(()) } @@ -474,6 +718,114 @@ impl CollatorData { Ok(()) } + /// Adds new msg to out queue, stores history to be able to revert futher + fn add_out_queue_msg_with_history(&mut self, src_msg_sync_key: usize, enq_stuff: MsgEnqueueStuff) -> Result<()> { + self.enqueue_count += 1; + self.out_msg_queue_info.add_message(&enq_stuff)?; + let key = enq_stuff.out_msg_key(); + if let Some(v) = self.add_out_queue_msg_history.get_mut(&src_msg_sync_key) { + v.push(key); + } else { + self.add_out_queue_msg_history.insert(src_msg_sync_key, vec![key]); + } + Ok(()) + } + /// Removes previously added new msgs from out queue + fn revert_add_out_queue_msgs(&mut self, src_msg_sync_key: &usize) -> Result<()> { + if let Some(keys) = self.add_out_queue_msg_history.remove(src_msg_sync_key) { + let remove_count = keys.len(); + for key in keys { + self.enqueue_count -= 1; + self.out_msg_queue_info.del_message(&key)?; + } + log::debug!("{}: {} new created messages removed from out queue", self.collated_block_descr, remove_count); + } + Ok(()) + } + /// Cleans out queue msgs adding history + fn commit_add_out_queue_msgs(&mut self) -> Result<()> { + self.add_out_queue_msg_history.clear(); + self.add_out_queue_msg_history.shrink_to_fit(); + Ok(()) + } + + /// Stores new internal msg, created by src msg, to buffer + fn add_new_message_to_buffer(&mut self, src_msg_sync_key: usize, new_msg: NewMessage) { + if let Some(v) = self.new_messages_buffer.get_mut(&src_msg_sync_key) { + v.push(new_msg); + } else { + self.new_messages_buffer.insert(src_msg_sync_key, vec![new_msg]); + } + } + /// Clean out new internal msgs, created by src msg, from buffer + fn revert_new_messages_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.new_messages_buffer.remove(src_msg_sync_key); + } + /// Adds new internal msgs to new_messages queue for processing + fn commit_new_messages(&mut self) { + let new_msgs_count = self.new_messages_buffer.len(); + while let Some((_, msgs)) = self.new_messages_buffer.pop_first() { + for new_msg in msgs { + if let Ok(msg) = new_msg.msg.get_std() { + log::trace!( + "{}: committed new created msg {:x} (bounced: {:?}) from {:x} to account {:x} from buffer to new_messages", + self.collated_block_descr, new_msg.lt_hash.1, + msg.int_header().map(|h| h.bounced), + msg.src().unwrap_or_default().address(), + msg.dst().unwrap_or_default().address(), + ); + self.new_messages.push(new_msg); + } 
+ + /// Stores mint message in buffer by src msg + fn add_mint_msg_to_buffer(&mut self, src_msg_sync_key: usize, msg: Option<InMsg>) { + self.mint_msg_buffer.insert(src_msg_sync_key, msg); + } + /// Clean mint message from buffer by src msg + fn revert_mint_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.mint_msg_buffer.remove(src_msg_sync_key); + } + /// Save the last processed and not reverted mint message to collator data + fn commit_mint_msg(&mut self) { + if let Some((_, v)) = self.mint_msg_buffer.pop_last() { + self.mint_msg = v; + } + } + + /// Stores recover create message in buffer by src msg + fn add_recover_create_msg_to_buffer(&mut self, src_msg_sync_key: usize, msg: Option<InMsg>) { + self.recover_create_msg_buffer.insert(src_msg_sync_key, msg); + } + /// Clean recover create message from buffer by src msg + fn revert_recover_create_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.recover_create_msg_buffer.remove(src_msg_sync_key); + } + /// Save the last processed and not reverted recover create message to collator data + fn commit_recover_create_msg(&mut self) { + if let Some((_, v)) = self.recover_create_msg_buffer.pop_last() { + self.recover_create_msg = v; + } + } + + /// Stores copyleft message in buffer by src msg + fn add_copyleft_msg_to_buffer(&mut self, src_msg_sync_key: usize, msg: InMsg) { + self.copyleft_msgs_buffer.insert(src_msg_sync_key, msg); + } + /// Clean copyleft message from buffer by src msg + fn revert_copyleft_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.copyleft_msgs_buffer.remove(src_msg_sync_key); + } + /// Save all not reverted copyleft messages to collator data + fn commit_copyleft_msgs(&mut self) { + while let Some((_, msg)) = self.copyleft_msgs_buffer.pop_first() { + self.copyleft_msgs.push(msg); + } + } + fn enqueue_transit_message( &mut self, shard: &ShardIdent, @@ -562,7 +914,7 @@ impl CollatorData { if self.start_lt.is_some() { fail!("`start_lt` is already initialized") } - self.block_limit_status.update_lt(lt); + self.block_limit_status.update_lt(lt, false); self.start_lt = Some(lt); Ok(()) } @@ -646,7 +998,7 @@ impl CollatorData { fn shard_conf_adjusted(&self) -> bool { self.shard_conf_adjusted } fn set_shard_conf_adjusted(&mut self) { self.shard_conf_adjusted = true; } - fn dequeue_message(&mut self, enq: MsgEnqueueStuff, deliver_lt: u64, short: bool) -> Result<()> { + fn dequeue_message(&mut self, enq: MsgEnqueueStuff, deliver_lt: u64, short: bool) -> Result<Option<SliceData>> { self.dequeue_count += 1; let out_msg = match short { true => OutMsg::dequeue_short(enq.envelope_hash(), enq.next_prefix(), deliver_lt), @@ -686,29 +1038,111 @@ impl CollatorData { } } +struct ParallelMsgsCounter { + max_parallel_threads: usize, + max_msgs_queue_on_account: usize, + + limits_reached: Arc<AtomicBool>, + msgs_by_accounts: Arc<Mutex<(usize, HashMap<AccountId, usize>)>>, +}
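Editor's note: a simplified, synchronous sketch of the throttling rule that `ParallelMsgsCounter` implements (the real type wraps its state in a tokio `Mutex` and publishes the flag through an `AtomicBool`): feeding new messages pauses once any account's queue or the active-thread count reaches its limit.

```rust
use std::collections::HashMap;

// Sketch only: same counting rule as ParallelMsgsCounter, without the async parts.
struct Counter {
    max_threads: usize,
    max_msgs_per_account: usize,
    active_threads: usize,
    msgs_by_account: HashMap<String, usize>,
}

impl Counter {
    /// Returns true when the limits are reached after adding the message.
    fn add(&mut self, account: &str) -> bool {
        let n = self.msgs_by_account.entry(account.to_string()).or_insert(0);
        if *n == 0 { self.active_threads += 1; } // first queued msg activates a thread
        *n += 1;
        *n >= self.max_msgs_per_account || self.active_threads >= self.max_threads
    }
    fn sub(&mut self, account: &str) {
        if let Some(n) = self.msgs_by_account.get_mut(account) {
            *n -= 1;
            if *n == 0 { self.active_threads -= 1; }
        }
    }
}

fn main() {
    let mut c = Counter {
        max_threads: 2,
        max_msgs_per_account: 3,
        active_threads: 0,
        msgs_by_account: HashMap::new(),
    };
    assert!(!c.add("a")); // 1 msg on "a", 1 active thread
    assert!(c.add("b"));  // 2 active threads == max_threads -> limits reached
    c.sub("b");
    assert!(!c.add("a")); // 2 msgs on "a", below both limits again
}
```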
+impl ParallelMsgsCounter { + pub fn new(max_parallel_threads: usize, max_msgs_queue_on_account: usize) -> Self { + Self { + max_parallel_threads: max_parallel_threads.max(1), + max_msgs_queue_on_account: max_msgs_queue_on_account.max(1), + + limits_reached: Arc::new(AtomicBool::new(false)), + + msgs_by_accounts: Arc::new(Mutex::new((0, HashMap::new()))), + } + } + + pub fn limits_reached(&self) -> bool { + self.limits_reached.load(Ordering::Relaxed) + } + fn set_limits_reached(&self, val: bool) { + self.limits_reached.store(val, Ordering::Relaxed); + } + + pub async fn add_account_msgs_counter(&self, account_id: AccountId) { + let account_id_str = format!("{:x}", account_id); + let mut guard = self.msgs_by_accounts.clone().lock_owned().await; + let (active_threads, msgs_by_account) = &mut *guard; + let msgs_count = msgs_by_account + .entry(account_id) + .and_modify(|val| { + if *val == 0 { + *active_threads += 1; + } + *val += 1; + }) + .or_insert_with(|| { + *active_threads += 1; + 1 + }); + if *msgs_count >= self.max_msgs_queue_on_account || *active_threads >= self.max_parallel_threads { + self.set_limits_reached(true); + } + + log::trace!("ParallelMsgsCounter: msgs count increased for {}, counter state is: ({}, {:?})", account_id_str, active_threads, msgs_by_account); + } + + pub async fn sub_account_msgs_counter(&self, account_id: AccountId) { + let account_id_str = format!("{:x}", account_id); + let mut guard = self.msgs_by_accounts.clone().lock_owned().await; + let (active_threads, msgs_by_account) = &mut *guard; + let msgs_count = msgs_by_account + .entry(account_id) + .and_modify(|val| { + *val -= 1; + if *val == 0 { + *active_threads -= 1; + } + }) + .or_insert(0); + if *msgs_count < self.max_msgs_queue_on_account { + if *active_threads < self.max_parallel_threads && msgs_by_account.values().all(|c| *c < self.max_msgs_queue_on_account) { + self.set_limits_reached(false); + } + } + + log::trace!("ParallelMsgsCounter: msgs count decreased for {}, counter state is: ({}, {:?})", account_id_str, active_threads, msgs_by_account); + } +}
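Editor's note: `ExecutionManager` (below) keeps one worker task per account, each fed through an unbounded channel, so messages of a single account execute strictly in order while different accounts run in parallel. A stripped-down sketch using plain tokio primitives (assumes tokio with the `rt-multi-thread` and `macros` features; none of the node's types are used):

```rust
use std::collections::HashMap;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let mut workers: HashMap<&'static str, mpsc::UnboundedSender<u32>> = HashMap::new();
    let mut handles = Vec::new();

    // one task per account, like `changed_accounts` below
    for account in ["a", "b"] {
        let (tx, mut rx) = mpsc::unbounded_channel::<u32>();
        workers.insert(account, tx);
        handles.push(tokio::spawn(async move {
            while let Some(msg) = rx.recv().await {
                // messages of one account are processed strictly in send order
                println!("account {account}: executing msg {msg}");
            }
        }));
    }

    workers["a"].send(1).unwrap();
    workers["b"].send(2).unwrap();
    workers["a"].send(3).unwrap(); // runs after msg 1, concurrently with msg 2

    drop(workers); // close the channels so the workers terminate
    for h in handles {
        h.await.unwrap();
    }
}
```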
struct ExecutionManager { changed_accounts: HashMap< AccountId, ( - tokio::sync::mpsc::UnboundedSender<Arc<AsyncMessage>>, + tokio::sync::mpsc::UnboundedSender<Arc<AsyncMessageSync>>, tokio::task::JoinHandle<Result<ShardAccountStuff>> ) >, - - receive_tr: tokio::sync::mpsc::UnboundedReceiver<Option<(Arc<AsyncMessage>, Result<Transaction>)>>, - wait_tr: Arc<Wait<(Arc<AsyncMessage>, Result<Transaction>)>>, + + msgs_queue: Vec<(AccountId, bool, Option<UInt256>)>, + accounts_processed_msgs: HashMap<AccountId, Vec<usize>>, + + cancellation_token: tokio_util::sync::CancellationToken, + check_cutoff_timeout: Box<dyn Fn() -> (bool, u32) + Send>, + + receive_tr: tokio::sync::mpsc::UnboundedReceiver<Option<(Arc<AsyncMessageSync>, Result<Transaction>, u64)>>, + wait_tr: Arc<Wait<(Arc<AsyncMessageSync>, Result<Transaction>, u64)>>, max_collate_threads: usize, libraries: Libraries, gen_utime: u32, + parallel_msgs_counter: ParallelMsgsCounter, + // block's start logical time start_lt: u64, // actual maximum logical time - max_lt: Arc<AtomicU64>, + max_lt: u64, // this time is used if account's lt is smaller min_lt: Arc<AtomicU64>, // block random seed seed_block: UInt256, + serde_opts: u8, #[cfg(feature = "signature_with_id")] // signature ID used in VM @@ -718,57 +1152,97 @@ struct ExecutionManager { collated_block_descr: Arc<String>, debug: bool, config: BlockchainConfig, + + #[cfg(test)] + test_msg_process_sleep: u64, } impl ExecutionManager { pub fn new( - gen_utime: u32, - start_lt: u64, - seed_block: UInt256, - #[cfg(feature = "signature_with_id")] - signature_id: i32, - libraries: Libraries, - config: BlockchainConfig, - max_collate_threads: usize, - collated_block_descr: Arc<String>, - debug: bool, + collator: &Collator, + collator_data: &CollatorData, + mc_data: &McData, ) -> Result<Self> { + let collated_block_descr = collator_data.collated_block_descr.clone(); log::trace!("{}: ExecutionManager::new", collated_block_descr); let (wait_tr, receive_tr) = Wait::new(); + // closure to check the finalize timeout for parallel transactions + let check_cutoff_timeout = { + let cutoff_timeout = collator.engine.collator_config().cutoff_timeout_ms; + let started = collator.started.clone(); + Box::new(move || (started.elapsed().as_millis() as u32 > cutoff_timeout, cutoff_timeout)) + }; + let start_lt = collator_data.start_lt()?; + let max_collate_threads = collator.engine.collator_config().max_collate_threads as usize; + let max_collate_msgs_queue_on_account = collator.engine.collator_config().max_collate_msgs_queue_on_account as usize; Ok(Self { changed_accounts: HashMap::new(), + msgs_queue: Vec::new(), + accounts_processed_msgs: HashMap::new(), + cancellation_token: tokio_util::sync::CancellationToken::new(), + check_cutoff_timeout, receive_tr, wait_tr, max_collate_threads, - libraries, - config, + parallel_msgs_counter: ParallelMsgsCounter::new(max_collate_threads, max_collate_msgs_queue_on_account), + libraries: mc_data.libraries()?.clone(), + config: collator_data.config.clone(), start_lt, - gen_utime, - seed_block, + gen_utime: collator_data.gen_utime(), + seed_block: collator.rand_seed.clone(), + serde_opts: collator_data.serde_opts, #[cfg(feature = "signature_with_id")] - signature_id, - max_lt: Arc::new(AtomicU64::new(start_lt + 1)), + signature_id: mc_data.state().state()?.global_id(), // Use the network global ID as the signature ID + max_lt: start_lt + 1, min_lt: Arc::new(AtomicU64::new(start_lt + 1)), total_trans_duration: Arc::new(AtomicU64::new(0)), collated_block_descr, - debug, + debug: collator.debug, + #[cfg(test)] + test_msg_process_sleep: 0, }) } + #[cfg(test)] + pub fn set_test_msg_process_sleep(&mut self, sleep_timeout: u64) { + self.test_msg_process_sleep = sleep_timeout; + } + // waits and finalizes all parallel tasks - pub async fn wait_transactions(&mut self, collator_data: &mut CollatorData) -> Result<()> { + pub async fn wait_transactions( + &mut self, + collator_data: &mut CollatorData, + ) -> Result<()> { log::trace!("{}: wait_transactions", self.collated_block_descr); + if self.is_parallel_processing_cancelled() { + log::debug!("{}: parallel collation was already stopped, not waiting for transactions anymore", self.collated_block_descr); + return Ok(()); + } while self.wait_tr.count() > 0 { + log::trace!("{}: wait_tr count = {}", self.collated_block_descr, self.wait_tr.count()); self.wait_transaction(collator_data).await?; + + // stop parallel collation if the finalize timeout is reached + let (is_timeout, timeout_ms) = (self.check_cutoff_timeout)(); + if is_timeout { + log::warn!("{}: FINALIZE PARALLEL TIMEOUT ({}ms) elapsed, stopping parallel collation", + self.collated_block_descr, timeout_ms, + ); + self.cancel_parallel_processing(); + break; + } } - self.min_lt.fetch_max(self.max_lt.load(Ordering::Relaxed), Ordering::Relaxed); + self.commit_processed_msgs_changes(collator_data)?; + self.min_lt.fetch_max(self.max_lt, Ordering::Relaxed); Ok(()) } - // checks if a number of parallel transactilns is not too big, waits and finalizes some if needed. + // checks whether the parallel transaction limits are reached; waits for and finalizes some if needed. 
pub async fn check_parallel_transactions(&mut self, collator_data: &mut CollatorData) -> Result<()> { log::trace!("{}: check_parallel_transactions", self.collated_block_descr); - if self.wait_tr.count() >= self.max_collate_threads { + if self.parallel_msgs_counter.limits_reached() { self.wait_transaction(collator_data).await?; } Ok(()) @@ -778,19 +1252,32 @@ impl ExecutionManager { &mut self, account_id: AccountId, msg: AsyncMessage, + msg_hash_opt: Option, prev_data: &PrevData, collator_data: &mut CollatorData, - ) -> Result<()> { + ) -> Result> { log::trace!("{}: execute (adding into queue): {:x}", self.collated_block_descr, account_id); + let msg_sync_key = self.get_next_msg_sync_key(); + + // store last processed internal (incl. New) message LT HASH in buffer + if let Some(lt_hash) = match &msg { + AsyncMessage::Int(enq, _) => Some((enq.created_lt(), enq.message_hash())), + AsyncMessage::New(env, _, created_lt) => Some((*created_lt, env.message_hash())), + _ => None, + } { + collator_data.add_last_proc_int_msg_to_buffer(msg_sync_key, lt_hash); + } + if let Some((sender, _handle)) = self.changed_accounts.get(&account_id) { self.wait_tr.request(); - sender.send(Arc::new(msg))?; + let msg = Arc::new(AsyncMessageSync(msg_sync_key, msg)); + sender.send(msg)?; } else { let shard_acc = if let Some(shard_acc) = prev_data.accounts().account(&account_id)? { shard_acc } else if let AsyncMessage::Ext(_, msg_id) = msg { collator_data.rejected_ext_messages.push((msg_id, format!("account {:x} not found", account_id))); - return Ok(()); // skip external messages for unexisting accounts + return Ok(None); // skip external messages for unexisting accounts } else { ShardAccount::default() }; @@ -799,27 +1286,31 @@ impl ExecutionManager { shard_acc, )?; self.wait_tr.request(); - sender.send(Arc::new(msg))?; - self.changed_accounts.insert(account_id, (sender, handle)); + let msg = Arc::new(AsyncMessageSync(msg_sync_key, msg)); + sender.send(msg)?; + self.changed_accounts.insert(account_id.clone(), (sender, handle)); } + self.append_msgs_queue(msg_sync_key, &account_id, msg_hash_opt); + self.parallel_msgs_counter.add_account_msgs_counter(account_id).await; + self.check_parallel_transactions(collator_data).await?; - Ok(()) + Ok(Some(msg_sync_key)) } fn start_account_job( &self, account_addr: AccountId, shard_acc: ShardAccount, - ) -> Result<(tokio::sync::mpsc::UnboundedSender>, tokio::task::JoinHandle>)> { + ) -> Result<(tokio::sync::mpsc::UnboundedSender>, tokio::task::JoinHandle>)> { log::trace!("{}: start_account_job: {:x}", self.collated_block_descr, account_addr); let mut shard_acc = ShardAccountStuff::new( account_addr, shard_acc, - Arc::new(AtomicU64::new(self.min_lt.load(Ordering::Relaxed))), - serde_opts_from_caps(&self.config) + self.min_lt.load(Ordering::Relaxed), + self.serde_opts )?; let debug = self.debug; @@ -829,34 +1320,49 @@ impl ExecutionManager { #[cfg(feature = "signature_with_id")] let signature_id = self.signature_id; let collated_block_descr = self.collated_block_descr.clone(); - let total_trans_duration = self.total_trans_duration.clone(); - let wait_tr = self.wait_tr.clone(); + let exec_mgr_total_trans_duration_rw = self.total_trans_duration.clone(); + let exec_mgr_wait_tr = self.wait_tr.clone(); let config = self.config.clone(); - let min_lt = self.min_lt.clone(); - let max_lt = self.max_lt.clone(); + let exec_mgr_min_lt_ro = self.min_lt.clone(); let libraries = self.libraries.clone().inner(); - let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::>(); + let (sender, 
mut receiver) = tokio::sync::mpsc::unbounded_channel::>(); + let cancellation_token = self.cancellation_token.clone(); + #[cfg(test)] + let test_msg_process_sleep = self.test_msg_process_sleep; let handle = tokio::spawn(async move { while let Some(new_msg) = receiver.recv().await { + if cancellation_token.is_cancelled() { + log::debug!( + "{}: parallel collation was cancelled before message {} processing on {:x}", + collated_block_descr, + new_msg.0, + shard_acc.account_addr(), + ); + exec_mgr_wait_tr.respond(None); + break; + } + + #[cfg(test)] + { + let sleep_ms = test_msg_process_sleep; + log::trace!("{}: (msg_sync_key: {}) sleep {}ms to emulate hard load and slow smart contract...", collated_block_descr, new_msg.0, sleep_ms); + tokio::time::sleep(tokio::time::Duration::from_millis(sleep_ms)).await; + } + log::trace!("{}: new message for {:x}", collated_block_descr, shard_acc.account_addr()); let config = config.clone(); // TODO: use Arc - shard_acc.lt().fetch_max(min_lt.load(Ordering::Relaxed), Ordering::Relaxed); - shard_acc.lt().fetch_max( - shard_acc.last_trans_lt() + 1, - Ordering::Relaxed - ); - shard_acc.lt().fetch_max( - shard_acc.last_trans_lt() + 1, - Ordering::Relaxed - ); + let mut lt = shard_acc.lt().max(exec_mgr_min_lt_ro.load(Ordering::Relaxed)); // 1000 + lt = lt.max(shard_acc.last_trans_lt() + 1); // 1010+1=1011 + + let tx_last_lt = Arc::new(AtomicU64::new(lt)); let mut account_root = shard_acc.account_root(); let params = ExecuteParams { state_libs: libraries.clone(), block_unixtime, block_lt, - last_tr_lt: shard_acc.lt(), + last_tr_lt: tx_last_lt.clone(), // 1011, passed by reference seed_block: seed_block.clone(), debug, block_version: supported_version(), @@ -868,25 +1374,48 @@ impl ExecutionManager { let (mut transaction_res, account_root, duration) = tokio::task::spawn_blocking(move || { let now = std::time::Instant::now(); ( - Self::execute_new_message(&new_msg1, &mut account_root, config, params), + Self::execute_new_message(&new_msg1.1, &mut account_root, config, params), account_root, now.elapsed().as_micros() as u64 ) }).await?; - if let Ok(transaction) = transaction_res.as_mut() { - let res = shard_acc.add_transaction(transaction, account_root); - if let Err(err) = res { - log::error!("FAILED to add transaction to shard account staff: {}", &err); - fail!(err); - } + if cancellation_token.is_cancelled() { + log::debug!( + "{}: parallel collation was cancelled after message {} processing on {:x}", + collated_block_descr, + new_msg.0, + shard_acc.account_addr(), + ); + exec_mgr_wait_tr.respond(None); + break; } - total_trans_duration.fetch_add(duration, Ordering::Relaxed); + + // LT transformations during execution: + // * params.last_tr_lt = max(account.last_tr_time(), params.last_tr_lt, in_msg.lt() + 1) + // * transaction.logical_time() = params.last_tr_lt (copy) + // * params.last_tr_lt = 1 + out_msgs.len() + // * tx_last_lt = params.last_tr_lt (by ref) + // So for 2 out_msgs may be: + // * transaction.logical_time() == 1011 + // * params.last_tr_lt == 1014 (1011+1+2) or 1104 (account.last_tr_time()+1+2) or 1024 (in_msg.lt()+1+1+2) + // * account.last_tr_time() == params.last_tr_lt == 1014 or 1104 or 1024 + // * tx_last_lt == params.last_tr_lt == 1014 or 1104 or 1024 + + let tx_last_lt = tx_last_lt.load(Ordering::Relaxed); + + shard_acc.apply_transaction_res( + new_msg.0, + tx_last_lt, + &mut transaction_res, + account_root.clone() + )?; + + exec_mgr_total_trans_duration_rw.fetch_add(duration, Ordering::Relaxed); log::trace!("{}: account {:x} TIME execute 
{}μ;", collated_block_descr, shard_acc.account_addr(), duration); - max_lt.fetch_max(shard_acc.lt().load(Ordering::Relaxed), Ordering::Relaxed); - wait_tr.respond(Some((new_msg, transaction_res))); + exec_mgr_wait_tr.respond(Some((new_msg, transaction_res, tx_last_lt))); } Ok(shard_acc) }); @@ -903,7 +1432,7 @@ impl ExecutionManager { AsyncMessage::Int(enq, _our) => { (Box::new(OrdinaryTransactionExecutor::new(config)), Some(enq.message())) } - AsyncMessage::New(env, _prev_tr_cell) => { + AsyncMessage::New(env, _prev_tr_cell, _created_lt) => { (Box::new(OrdinaryTransactionExecutor::new(config)), Some(env.message())) } AsyncMessage::Recover(msg) | AsyncMessage::Mint(msg) | AsyncMessage::Ext(msg, _) => { @@ -916,35 +1445,44 @@ impl ExecutionManager { (Box::new(TickTockTransactionExecutor::new(config, tt.clone())), None) } }; - let res = executor.execute_with_libs_and_params(msg_opt, account_root, params); - res + executor.execute_with_libs_and_params(msg_opt, account_root, params) } async fn wait_transaction(&mut self, collator_data: &mut CollatorData) -> Result<()> { log::trace!("{}: wait_transaction", self.collated_block_descr); let wait_op = self.wait_tr.wait(&mut self.receive_tr, false).await; - if let Some(Some((new_msg, transaction_res))) = wait_op { - self.finalize_transaction(new_msg, transaction_res, collator_data)?; + if let Some(Some((new_msg, transaction_res, tx_last_lt))) = wait_op { + // we can safely decrease parallel_msgs_counter because + // sender sends some until parallel processing not cancelled + let account_id = self.finalize_transaction(&new_msg, transaction_res, tx_last_lt, collator_data)?; + // decrease account msgs counter to control parallel processing limits + self.parallel_msgs_counter.sub_account_msgs_counter(account_id).await; + + // mark message as processed + self.set_msg_processed(new_msg.0); } Ok(()) } fn finalize_transaction( &mut self, - new_msg: Arc, + new_msg_sync: &AsyncMessageSync, transaction_res: Result, + tx_last_lt: u64, collator_data: &mut CollatorData - ) -> Result<()> { - if let AsyncMessage::Ext(msg, msg_id) = new_msg.deref() { - let account_id = msg.get_std()?.int_dst_account_id().unwrap_or_default(); + ) -> Result { + let AsyncMessageSync(msg_sync_key, new_msg) = new_msg_sync; + if let AsyncMessage::Ext(msg, msg_id) = new_msg { + let msg = msg.get_std()?; + let account_id = msg.int_dst_account_id().unwrap_or_default(); if let Err(err) = transaction_res { log::warn!( target: EXT_MESSAGES_TRACE_TARGET, "{}: account {:x} rejected inbound external message {:x}, by reason: {}", self.collated_block_descr, account_id, msg_id, err ); - collator_data.rejected_ext_messages.push((msg_id.clone(), err.to_string())); - return Ok(()) + collator_data.add_rejected_ext_message_to_buffer(*msg_sync_key, (msg_id.clone(), err.to_string())); + return Ok(account_id); } else { log::debug!( target: EXT_MESSAGES_TRACE_TARGET, @@ -952,16 +1490,17 @@ impl ExecutionManager { self.collated_block_descr, account_id, msg_id, ); collator_data.accepted_ext_messages.push( - (msg_id.clone(), msg.get_std()?.dst_workchain_id().unwrap_or_default()) + (msg_id.clone(), msg.dst_workchain_id().unwrap_or_default()) ); } } let opts = collator_data.serde_opts; let tr = transaction_res?; + let account_id = tr.account_id().clone(); let tr_cell = ChildCell::with_struct_and_opts(&tr, opts)?; log::trace!("{}: finalize_transaction {} with hash {:x}, {:x}", self.collated_block_descr, tr.logical_time(), tr_cell.cell().repr_hash(), tr.account_id()); - let in_msg_opt = match new_msg.deref() { + let 
in_msg_opt = match new_msg { AsyncMessage::Int(enq, our) => { let in_msg = InMsg::final_msg( enq.envelope_cell(), @@ -973,12 +1512,14 @@ impl ExecutionManager { enq.envelope_cell(), ChildCell::with_struct_and_opts(&in_msg, opts)?, ); - collator_data.add_out_msg_to_block(enq.message_hash(), &out_msg)?; - collator_data.del_out_msg_from_state(&enq.out_msg_key())?; + let msg_hash = enq.message_hash(); + let prev_out_msg_slice_opt = collator_data.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; + collator_data.add_out_msg_descr_to_history(*msg_sync_key, msg_hash, prev_out_msg_slice_opt); + collator_data.del_out_queue_msg_with_history(*msg_sync_key, enq.out_msg_key(), false)?; } Some(in_msg) } - AsyncMessage::New(env, prev_tr_cell) => { + AsyncMessage::New(env, prev_tr_cell, _created_lt) => { let env_cell = ChildCell::with_struct_and_opts(env.inner(), opts)?; let in_msg = InMsg::immediate( env_cell.clone(), @@ -990,7 +1531,10 @@ impl ExecutionManager { ChildCell::with_cell_and_opts(prev_tr_cell.clone(), opts), ChildCell::with_struct_and_opts(&in_msg, opts)?, ); - collator_data.add_out_msg_to_block(env.message_hash(), &out_msg)?; + let msg_hash = env.message_hash(); + let prev_out_msg_slice_opt = collator_data.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; + collator_data.add_out_msg_descr_to_history(*msg_sync_key, msg_hash, prev_out_msg_slice_opt); + collator_data.del_out_queue_msg_with_history(*msg_sync_key, env.out_msg_key(), true)?; Some(in_msg) } AsyncMessage::Mint(msg) | @@ -1030,17 +1574,155 @@ impl ExecutionManager { tr.in_msg_cell().unwrap_or_default().repr_hash() ); } - collator_data.new_transaction(&tr, tr_cell, in_msg_opt.as_ref())?; - collator_data.update_lt(self.max_lt.load(Ordering::Relaxed)); + collator_data.new_transaction(&tr, tr_cell, in_msg_opt.as_ref(), *msg_sync_key)?; + + collator_data.add_tx_last_lt_to_buffer(*msg_sync_key, tx_last_lt); - match new_msg.deref() { - AsyncMessage::Mint(_) => collator_data.mint_msg = in_msg_opt, - AsyncMessage::Recover(_) => collator_data.recover_create_msg = in_msg_opt, - AsyncMessage::Copyleft(_) => collator_data.copyleft_msgs.push(in_msg_opt.ok_or_else(|| error!("Can't unwrap `in_msg_opt`"))?), + match new_msg { + AsyncMessage::Mint(_) => collator_data.add_mint_msg_to_buffer(*msg_sync_key, in_msg_opt), + AsyncMessage::Recover(_) => collator_data.add_recover_create_msg_to_buffer(*msg_sync_key, in_msg_opt), + AsyncMessage::Copyleft(_) => collator_data.add_copyleft_msg_to_buffer(*msg_sync_key, in_msg_opt.ok_or_else(|| error!("Can't unwrap `in_msg_opt`"))?), _ => () } + // Will not support history. 
When parallel collation is cancelled,
+        // no new msgs can be processed, so we do not need to check limits anymore
         collator_data.block_full |= !collator_data.limit_fits(ParamLimitIndex::Normal);
+        Ok(account_id)
+    }
+
+    /// Actually the length of the messages queue
+    fn get_next_msg_sync_key(&self) -> usize {
+        self.msgs_queue.len()
+    }
+    fn append_msgs_queue(&mut self, msg_sync_key: usize, account_addr: &AccountId, msg_hash_opt: Option<UInt256>) {
+        self.msgs_queue.insert(msg_sync_key, (account_addr.clone(), false, msg_hash_opt));
+    }
+    fn set_msg_processed(&mut self, msg_sync_key: usize) {
+        if let Some(entry) = self.msgs_queue.get_mut(msg_sync_key) {
+            entry.1 = true;
+            self.accounts_processed_msgs
+                .entry(entry.0.clone())
+                .and_modify(|list| list.push(msg_sync_key))
+                .or_insert([msg_sync_key].into());
+        }
+    }
+    fn revert_last_account_processed_msg(&mut self, account_addr: &AccountId) {
+        if let Some(list) = self.accounts_processed_msgs.get_mut(account_addr) {
+            list.pop();
+        }
+    }
+
+    fn accounts_processed_msgs(&self) -> &HashMap<AccountId, Vec<usize>> {
+        &self.accounts_processed_msgs
+    }
+    fn get_last_processed_msg_sync_key<'a>(
+        accounts_processed_msgs: &'a HashMap<AccountId, Vec<usize>>,
+        for_account_addr: &'a AccountId,
+    ) -> Option<&'a usize> {
+        if let Some(entry) = accounts_processed_msgs.get(for_account_addr) {
+            entry.last()
+        } else {
+            None
+        }
+    }
+
+    /// Signal to cancellation_token due to a finalizing timeout
+    pub fn cancel_parallel_processing(&mut self) {
+        self.cancellation_token.cancel();
+    }
+    /// When cancellation_token was cancelled due to a finalizing timeout
+    pub fn is_parallel_processing_cancelled(&self) -> bool {
+        self.cancellation_token.is_cancelled()
+    }
+
+    fn commit_processed_msgs_changes(&mut self, collator_data: &mut CollatorData) -> Result<()> {
+        // revert processed messages that go after the first unprocessed one
+        let mut msgs_to_revert = vec![];
+        let mut msgs_to_revert_last_proc_int = vec![];
+        let mut found_first_unprocessed = false;
+        for msg_sync_key in 0..self.msgs_queue.len() {
+            if let Some((account_addr, processed, msg_hash)) = self.msgs_queue.get(msg_sync_key) {
+                if *processed {
+                    // collect all processed messages that go after the first unprocessed one
+                    if found_first_unprocessed {
+                        msgs_to_revert.push((account_addr.clone(), msg_sync_key, msg_hash.clone()));
+                        msgs_to_revert_last_proc_int.push(msg_sync_key);
+                    }
+                } else {
+                    if !found_first_unprocessed {
+                        found_first_unprocessed = true;
+                    }
+                    msgs_to_revert_last_proc_int.push(msg_sync_key);
+                }
+            }
+        }
+        metrics::gauge!("reverted_transactions", msgs_to_revert.len() as f64);
+        for (account_addr, msg_sync_key, msg_hash) in msgs_to_revert.into_iter().rev() {
+            if let Some(msg_info) = self.msgs_queue.get_mut(msg_sync_key) {
+                msg_info.1 = false;
+            }
+            self.revert_msg_changes(collator_data, &msg_sync_key, &account_addr)?;
+            log::debug!(
+                "{}: reverted changes from message {:x} (sync_key: {}) on account {:x}",
+                self.collated_block_descr, msg_hash.unwrap_or_default(), msg_sync_key, account_addr,
+            );
+        }
+        for msg_sync_key in msgs_to_revert_last_proc_int {
+            collator_data.revert_last_proc_int_msg_by_src_msg(&msg_sync_key);
+        }
+
+        // commit all not reverted changes
+        self.commit_not_reverted_changes(collator_data)?;
+
+        log::debug!("{}: all not reverted account changes committed", self.collated_block_descr);
+
+        Ok(())
+    }
+    fn revert_msg_changes(
+        &mut self,
+        collator_data: &mut CollatorData,
+        msg_sync_key: &usize,
+        account_addr: &AccountId,
+    ) -> Result<()> {
+        collator_data.execute_count -= 1;
+
+        
collator_data.revert_in_msgs_descr_by_src_msg(msg_sync_key)?;
+        collator_data.revert_out_msgs_descr_by_src_msg(msg_sync_key)?;
+        collator_data.revert_accepted_ext_message_by_src_msg(msg_sync_key);
+        collator_data.revert_rejected_ext_message_by_src_msg(msg_sync_key);
+        collator_data.revert_del_out_queue_msg(msg_sync_key)?;
+        collator_data.revert_add_out_queue_msgs(msg_sync_key)?;
+        collator_data.revert_new_messages_by_src_msg(msg_sync_key);
+        collator_data.revert_mint_msg_by_src_msg(msg_sync_key);
+        collator_data.revert_recover_create_msg_by_src_msg(msg_sync_key);
+        collator_data.revert_copyleft_msg_by_src_msg(msg_sync_key);
+        collator_data.revert_tx_last_lt_by_src_msg(msg_sync_key);
+
+        self.revert_last_account_processed_msg(account_addr);
+
+        Ok(())
+    }
+    fn commit_not_reverted_changes(&mut self, collator_data: &mut CollatorData) -> Result<()> {
+        collator_data.commit_in_msgs_descr_by_src_msg();
+        collator_data.commit_out_msgs_descr_by_src_msg();
+        collator_data.commit_accepted_ext_messages();
+        collator_data.commit_rejected_ext_messages();
+        collator_data.commit_del_out_queue_msgs()?;
+        collator_data.commit_add_out_queue_msgs()?;
+        collator_data.commit_new_messages();
+
+        collator_data.commit_mint_msg();
+        collator_data.commit_recover_create_msg();
+        collator_data.commit_copyleft_msgs();
+
+        collator_data.commit_last_proc_int_msg()?;
+
+        // save max lt
+        if let Some(max_lt) = collator_data.commit_tx_last_lt() {
+            self.max_lt = max_lt;
+        }
+        Ok(())
+    }
 }
@@ -1066,6 +1748,9 @@ pub struct Collator {
     started: Instant,
     stop_flag: Arc<AtomicBool>,
+
+    #[cfg(test)]
+    test_msg_process_sleep: u64,
 }

 impl Collator {
@@ -1168,9 +1853,16 @@ impl Collator {
             collator_settings,
             started: Instant::now(),
             stop_flag: Arc::new(AtomicBool::new(false)),
+            #[cfg(test)]
+            test_msg_process_sleep: 0,
         })
     }

+    #[cfg(test)]
+    pub fn set_test_msg_process_sleep(&mut self, sleep_timeout: u64) {
+        self.test_msg_process_sleep = sleep_timeout;
+    }
+
     pub async fn collate(mut self) -> Result<(BlockCandidate, ShardStateUnsplit)> {
         log::info!(
             "{}: COLLATE min_mc_seqno = {}, prev_blocks_ids: {} {}",
@@ -1347,6 +2039,7 @@ impl Collator {
             usage_tree,
             &prev_data,
             is_masterchain,
+            self.collated_block_descr.clone(),
         )?;
         if !self.shard.is_masterchain() {
             let (now_upper_limit, before_split, _accept_msgs) = check_this_shard_mc_info(
@@ -1445,18 +2138,10 @@ impl Collator {
         // compute created / minted / recovered / from_prev_blk
         self.update_value_flow(mc_data, &prev_data, collator_data)?;

-        let mut exec_manager = ExecutionManager::new(
-            collator_data.gen_utime(),
-            collator_data.start_lt()?,
-            self.rand_seed.clone(),
-            #[cfg(feature = "signature_with_id")]
-            mc_data.state().state()?.global_id(), // Use network global ID as signature ID
-            mc_data.libraries()?.clone(),
-            collator_data.config.clone(),
-            self.engine.collator_config().max_collate_threads as usize,
-            self.collated_block_descr.clone(),
-            self.debug,
-        )?;
+        let mut exec_manager = ExecutionManager::new(self, collator_data, mc_data)?;
+
+        #[cfg(test)]
+        exec_manager.set_test_msg_process_sleep(self.test_msg_process_sleep);

         // tick & special transactions
         if self.shard.is_masterchain() {
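[Editor's note: the buffer/revert/commit flow introduced above is easier to see outside the diff. The sketch below is illustrative only and not part of the patch; `Journal`, `Change`, and the method names are invented stand-ins for the per-`msg_sync_key` buffers kept in `CollatorData`, and the real code additionally skips over unprocessed gaps rather than cutting at a single key. The idea: every change is buffered under the incremental sync key of the message that produced it; at finalization everything from the first unprocessed message onward is undone newest-first, and the surviving prefix is committed into the block.]

use std::collections::BTreeMap;

struct Journal<Change> {
    buffered: BTreeMap<usize, Change>, // keyed by msg_sync_key
    committed: Vec<Change>,
}

impl<Change> Journal<Change> {
    fn new() -> Self {
        Self { buffered: BTreeMap::new(), committed: Vec::new() }
    }

    fn add_to_buffer(&mut self, msg_sync_key: usize, change: Change) {
        self.buffered.insert(msg_sync_key, change);
    }

    fn finalize(&mut self, first_unprocessed: usize) {
        // split_off keeps keys < first_unprocessed in `buffered`
        // and returns the suffix that must be undone
        let to_revert = self.buffered.split_off(&first_unprocessed);
        for (_msg_sync_key, _change) in to_revert.into_iter().rev() {
            // a real revert_* call would undo the buffered change here, newest first
        }
        // commit_* moves the surviving prefix into the final block state
        for (_msg_sync_key, change) in std::mem::take(&mut self.buffered) {
            self.committed.push(change);
        }
    }
}

Reverting newest-first matters because a later message may depend on state produced by an earlier one on the same account; undoing in reverse order keeps every intermediate state consistent.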
@@ -1500,10 +2185,11 @@
         metrics::histogram!("collator_process_inbound_external_messages_time", now.elapsed());

         // process newly-generated messages (if space&gas left)
-        // (if we were unable to process all inbound messages, all new messages must be queued)
+        // (all new messages were queued already; we remove the ones we manage to process)
         let now = std::time::Instant::now();
-        self.process_new_messages(!collator_data.inbound_queues_empty, prev_data,
-            collator_data, &mut exec_manager).await?;
+        if collator_data.inbound_queues_empty {
+            self.process_new_messages(prev_data, collator_data, &mut exec_manager).await?;
+        }
         log::debug!("{}: TIME: process_new_messages {}ms;", self.collated_block_descr, now.elapsed().as_millis());
         metrics::histogram!("collator_process_new_messages_time", now.elapsed());
@@ -1558,10 +2244,6 @@ impl Collator {
                 true, mc_data, prev_data, collator_data, &mut exec_manager).await?;
         }

-        // process newly-generated messages (only by including them into output queue)
-        self.process_new_messages(
-            true, prev_data, collator_data, &mut exec_manager).await?;
-
         // If block is empty - stop collation to try one more time (maybe there are some new messages)
         let cc = self.engine.collator_config();
         if !self.after_split &&
@@ -2366,7 +3048,13 @@ impl Collator {
         if (tick_tock.tock && tock) || (tick_tock.tick && !tock) {
             let tt = if tock {TransactionTickTock::Tock} else {TransactionTickTock::Tick};
             // different accounts can produce messages with same LT which cause order violation
-            exec_manager.execute(account_id, AsyncMessage::TickTock(tt), prev_data, collator_data).await?;
+            exec_manager.execute(
+                account_id,
+                AsyncMessage::TickTock(tt),
+                None,
+                prev_data,
+                collator_data
+            ).await?;
         }

         Ok(())
@@ -2436,8 +3124,16 @@ impl Collator {
         );
         hdr.created_lt = collator_data.start_lt()?;
         hdr.created_at = collator_data.gen_utime.into();
-        let msg = CommonMessage::Std(Message::with_int_header(hdr));
-        exec_manager.execute(account_id, f(msg), prev_data, collator_data).await?;
+        let msg = Message::with_int_header(hdr);
+        let msg_hash_opt = Some(msg.hash()?);
+        let msg = CommonMessage::Std(msg);
+        exec_manager.execute(
+            account_id,
+            f(msg),
+            msg_hash_opt,
+            prev_data,
+            collator_data
+        ).await?;

         Ok(())
     }
@@ -2460,7 +3156,9 @@ impl Collator {
             "{}: message {:x}, lt: {}, enq lt: {}",
             self.collated_block_descr, key, created_lt, enq.enqueued_lt()
         );
-        collator_data.update_last_proc_int_msg((created_lt, enq.message_hash()))?;
+
+        // No need to update the last processed int message LT_HASH here
+        // if the message was already processed or was not sent to us
         if collator_data.out_msg_queue_info.already_processed(&enq)? 
{ log::trace!( "{}: message {:x} has been already processed by us before, skipping", @@ -2470,13 +3168,23 @@ impl Collator { self.check_inbound_internal_message(&key, &enq, created_lt, block_id.shard()) .map_err(|err| error!("problem processing internal inbound message \ with hash {:x} : {}", key.hash, err))?; + let src_addr = enq.src_account_id()?; let our = self.shard.contains_full_prefix(&enq.cur_prefix()); let to_us = self.shard.contains_full_prefix(&enq.dst_prefix()); if to_us { let account_id = enq.dst_account_id()?; - log::debug!("{}: message {:x} sent to execution to account {:x}", self.collated_block_descr, key.hash, account_id); let msg = AsyncMessage::Int(enq, our); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + let msg_sync_key = exec_manager.execute( + account_id.clone(), + msg, + Some(key.hash.clone()), + prev_data, + collator_data + ).await?; + log::debug!( + "{}: int message {:x} (sync_key: {:?}) from {:x} sent to execution to account {:x}", + self.collated_block_descr, key.hash, msg_sync_key, src_addr, account_id, + ); } else { // println!("{:x} {:#}", key, enq); // println!("cur: {}, dst: {}", enq.cur_prefix(), enq.dst_prefix()); @@ -2488,7 +3196,7 @@ impl Collator { } } if collator_data.block_full { - log::debug!("{}: BLOCK FULL, stop processing internal messages", self.collated_block_descr); + log::debug!("{}: BLOCK FULL (>= Soft), stop processing internal messages", self.collated_block_descr); break } if self.check_cutoff_timeout() { @@ -2510,8 +3218,7 @@ impl Collator { created_lt: u64, nb_shard: &ShardIdent, ) -> Result<()> { - let header = enq.message().get_std().ok().and_then(|msg| msg.int_header()) - .ok_or_else(|| error!("message is not internal"))?; + let header = enq.int_header()?; if created_lt != header.created_lt { fail!("inbound internal message has an augmentation value in source OutMsgQueue \ distinct from the one in its contents") @@ -2546,42 +3253,54 @@ impl Collator { ); return Ok(()) } + + if exec_manager.is_parallel_processing_cancelled() { + log::debug!("{}: parallel processing cancelled, skipping processing of inbound external messages", self.collated_block_descr); + return Ok(()) + } + log::debug!("{}: process_inbound_external_messages", self.collated_block_descr); let finish_time_ms = self.get_external_messages_finish_time_micros(); let mut iter = self.engine.get_external_messages_iterator(self.shard.clone(), finish_time_ms); - loop { - let Some((msg, msg_id)) = iter.next() else { - break; - }; + while let Some((msg, msg_id)) = iter.next() { + self.check_stop_flag()?; let header = msg.ext_in_header() .ok_or_else(|| error!("message {:x} is not external inbound message", msg_id))?; - if self.shard.contains_address(&header.dst)? { - if !collator_data.limit_fits(ParamLimitIndex::Soft) { - log::debug!( - "{}: BLOCK FULL, stop processing external messages", - self.collated_block_descr - ); - break - } - if self.check_cutoff_timeout() { - log::warn!( - "{}: TIMEOUT is elapsed, stop processing external messages", - self.collated_block_descr - ); - break - } - log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, msg_id); - let (_, account_id) = header.dst.extract_std_address(true)?; - log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, msg_id); - let msg = AsyncMessage::Ext(CommonMessage::Std(msg.deref().clone()), msg_id); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; - } else { + if !self.shard.contains_address(&header.dst)? 
{
                // usually a node collates more than one shard, so the message can belong to another one
                // and we can't postpone it
                // (difference with t-node)
                // collator_data.to_delay.push(id);
+                continue;
            }
-            self.check_stop_flag()?;
+
+            if !collator_data.limit_fits(ParamLimitIndex::Soft) {
+                log::debug!(
+                    "{}: BLOCK FULL, stop processing external messages",
+                    self.collated_block_descr
+                );
+                break
+            }
+            if self.check_cutoff_timeout() {
+                log::warn!(
+                    "{}: TIMEOUT is elapsed, stop processing external messages",
+                    self.collated_block_descr
+                );
+                break
+            }
+            let (_, account_id) = header.dst.extract_std_address(true)?;
+            let msg = AsyncMessage::Ext(CommonMessage::Std(msg.deref().clone()), msg_id.clone());
+            let msg_sync_key = exec_manager.execute(
+                account_id.clone(),
+                msg,
+                Some(msg_id.clone()),
+                prev_data,
+                collator_data
+            ).await?;
+            log::debug!(
+                "{}: ext message {:x} (sync_key: {:?}) sent to execution to account {:x}",
+                self.collated_block_descr, msg_id, msg_sync_key, account_id,
+            );
         }
         exec_manager.wait_transactions(collator_data).await?;
         let (accepted, rejected) = collator_data.withdraw_ext_msg_statuses();
@@ -2635,11 +3354,20 @@
                             self.collated_block_descr);
                         break;
                     } else {
-                        log::trace!("{}: remp message {:x} sent to execution", self.collated_block_descr, id);
                         let (_, account_id) = header.dst.extract_std_address(true)?;
                         let msg_std = CommonMessage::Std(msg.deref().clone());
-                        let msg = AsyncMessage::Ext(msg_std, id);
-                        exec_manager.execute(account_id, msg, prev_data, collator_data).await?;
+                        let msg = AsyncMessage::Ext(msg_std, id.clone());
+                        let msg_sync_key = exec_manager.execute(
+                            account_id.clone(),
+                            msg,
+                            Some(id.clone()),
+                            prev_data,
+                            collator_data
+                        ).await?;
+                        log::trace!(
+                            "{}: remp message {:x} (sync_key: {:?}) sent to execution to account {:x}",
+                            self.collated_block_descr, id, msg_sync_key, account_id,
+                        );
                     }
                 } else {
                     remp_collator_interface.update_message_collation_result(
@@ -2677,41 +3405,64 @@ impl Collator {

     async fn process_new_messages(
         &self,
-        mut enqueue_only: bool,
         prev_data: &PrevData,
         collator_data: &mut CollatorData,
         exec_manager: &mut ExecutionManager,
     ) -> Result<()> {
+        if exec_manager.is_parallel_processing_cancelled() {
+            log::debug!("{}: parallel processing cancelled, skipping processing of new messages", self.collated_block_descr);
+            return Ok(())
+        }
+
         log::debug!("{}: process_new_messages", self.collated_block_descr);
         let use_hypercube = !collator_data.config.has_capability(GlobalCapabilities::CapOffHypercube);
         let opts = collator_data.serde_opts;
-        while !collator_data.new_messages.is_empty() {
+        let mut stop_processing = false;
+        while !stop_processing && !collator_data.new_messages.is_empty() {

            // In this iteration we execute only the existing messages.
            // Newly generated messages will be executed in the next iteration (only after waiting).
let mut new_messages = std::mem::take(&mut collator_data.new_messages); + log::debug!("{}: new_messages count: {}", self.collated_block_descr, new_messages.len()); // we can get sorted items somehow later while let Some(NewMessage{ lt_hash: (created_lt, hash), msg, tr_cell, prefix }) = new_messages.pop() { + if !collator_data.limit_fits(ParamLimitIndex::Soft) { + log::debug!("{}: BLOCK FULL (>= Medium), stop processing new messages", self.collated_block_descr); + stop_processing = true; + break; + } + if self.check_cutoff_timeout() { + log::warn!("{}: TIMEOUT ({}ms) is elapsed, stop processing new messages", + self.collated_block_descr, self.engine.collator_config().cutoff_timeout_ms, + ); + stop_processing = true; + break; + } let std_msg = msg.get_std()?; let info = std_msg.int_header().ok_or_else(|| error!("message is not internal"))?; + if !self.shard.contains_address(&info.dst)? { + // skip msg if it is not to our shard + continue; + } + CHECK!(info.created_at.as_u32(), collator_data.gen_utime); + let fwd_fee = *info.fwd_fee(); - enqueue_only |= collator_data.block_full | self.check_cutoff_timeout(); - if enqueue_only || !self.shard.contains_address(&info.dst)? { - // everything was made in new_transaction - } else { - CHECK!(info.created_at.as_u32(), collator_data.gen_utime); - let key = OutMsgQueueKey::with_account_prefix(&prefix, hash.clone()); - collator_data.out_msg_queue_info.del_message(&key)?; - collator_data.enqueue_count -= 1; - - let env = MsgEnvelopeStuff::new(msg, &self.shard, fwd_fee, use_hypercube, opts)?; - let account_id = env.message().get_std()?.int_dst_account_id().unwrap_or_default(); - collator_data.update_last_proc_int_msg((created_lt, hash))?; - let msg = AsyncMessage::New(env, tr_cell); - log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, key.hash); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; - }; + let env = MsgEnvelopeStuff::new(msg, &self.shard, fwd_fee, use_hypercube, opts)?; + let account_id = env.message().get_std()?.int_dst_account_id().unwrap_or_default(); + let msg_hash_opt = Some(env.message_hash()); + let msg = AsyncMessage::New(env, tr_cell, created_lt); + let msg_sync_key = exec_manager.execute( + account_id.clone(), + msg, + msg_hash_opt, + prev_data, + collator_data + ).await?; + log::debug!( + "{}: new int message {:x} (sync_key: {:?}) sent to execution to account {:x}", + self.collated_block_descr, hash, msg_sync_key, account_id, + ); self.check_stop_flag()?; } exec_manager.wait_transactions(collator_data).await?; @@ -2827,8 +3578,16 @@ impl Collator { hdr.bounce = false; hdr.created_lt = collator_data.start_lt()?; hdr.created_at = UnixTime32::new(collator_data.gen_utime); - let msg = CommonMessage::Std(Message::with_int_header(hdr)); - exec_manager.execute(account_id, AsyncMessage::Copyleft(msg), prev_data, collator_data).await?; + let msg = Message::with_int_header(hdr); + let msg_hash_opt = Some(msg.hash()?); + let msg = CommonMessage::Std(msg); + exec_manager.execute( + account_id, + AsyncMessage::Copyleft(msg), + msg_hash_opt, + prev_data, + collator_data + ).await?; self.check_stop_flag()?; } @@ -2866,10 +3625,24 @@ impl Collator { let mut changed_accounts = HashMap::new(); let mut new_config_opt = None; let mut current_workchain_copyleft_rewards = CopyleftRewards::default(); + let accounts_processed_msgs = exec_manager.accounts_processed_msgs().clone(); for (account_id, (sender, handle)) in exec_manager.changed_accounts.drain() { std::mem::drop(sender); let mut shard_acc = 
handle.await
                .map_err(|err| error!("account {:x} thread didn't finish: {}", account_id, err))??;
+
+            // commit the account state at the last msg processed before parallel collation was cancelled
+            shard_acc = match ExecutionManager::get_last_processed_msg_sync_key(
+                &accounts_processed_msgs,
+                &account_id,
+            ) {
+                None => continue,
+                Some(msg_sync_key) => match shard_acc.commit(*msg_sync_key)? {
+                    None => continue,
+                    Some(committed) => committed,
+                }
+            };
+
             let account = shard_acc.read_account()?;
             if let Some(addr) = &config_addr {
                 if addr == &account_id {
@@ -2884,7 +3657,7 @@ impl Collator {
             if !acc_block.transactions().is_empty() {
                 accounts.insert(&acc_block)?;
             }
-            current_workchain_copyleft_rewards.merge_rewards(shard_acc.copyleft_rewards())?;
+            current_workchain_copyleft_rewards.merge_rewards(shard_acc.copyleft_rewards()?)?;
             changed_accounts.insert(account_id, shard_acc);
         }

@@ -3144,23 +3917,24 @@ impl Collator {
         if workchain_id != -1 && (collator_data.dequeue_count != 0 || collator_data.enqueue_count != 0
             || collator_data.in_msg_count != 0 || collator_data.out_msg_count != 0 || collator_data.execute_count != 0
-            || collator_data.transit_count != 0 || collator_data.remove_count != 0
+            || collator_data.transit_count != 0 || changed_accounts.len() != 0 || collator_data.remove_count != 0
         ) {
             log::debug!(
                 "{}: finalize_block finished: dequeue_count: {}, enqueue_count: {}, in_msg_count: {}, out_msg_count: {}, \
-                execute_count: {}, transit_count: {}, remove_count: {} msg_queue_depth_sum: {}",
+                execute_count: {}, transit_count: {}, changed_accounts: {}, remove_count: {}, msg_queue_depth_sum: {}",
                 self.collated_block_descr, collator_data.dequeue_count, collator_data.enqueue_count,
                 collator_data.in_msg_count, collator_data.out_msg_count, collator_data.execute_count,
-                collator_data.transit_count, collator_data.remove_count, collator_data.msg_queue_depth_sum
+                collator_data.transit_count, changed_accounts.len(),
+                collator_data.remove_count, collator_data.msg_queue_depth_sum
             );
         }
         log::trace!(
             "{}: finalize_block finished: dequeue_count: {}, enqueue_count: {}, in_msg_count: {}, out_msg_count: {}, \
-            execute_count: {}, transit_count: {}, remove_count: {}, data len: {}",
+            execute_count: {}, transit_count: {}, changed_accounts: {}, remove_count: {}, data len: {}",
             self.collated_block_descr, collator_data.dequeue_count, collator_data.enqueue_count,
             collator_data.in_msg_count, collator_data.out_msg_count, collator_data.execute_count,
-            collator_data.transit_count, collator_data.remove_count, candidate.data.len()
+            collator_data.transit_count, changed_accounts.len(), collator_data.remove_count, candidate.data.len()
         );
         Ok((candidate, new_state, exec_manager))
     }
@@ -3646,7 +4420,7 @@ impl Collator {
         if !res {
             fail!(
                 "cannot apply the after-split update for {} without a corresponding sibling update",
-                new_info.blk_id()
+                new_info.block_id()
             );
         }
         if let Some(ancestor) = ancestor {
@@ -3847,6 +4621,13 @@ impl Collator {
         self.started.elapsed().as_millis() as u32 > cutoff_timeout
     }

+    // fn check_finalize_parallel_timeout(&self) -> (bool, u32) {
+    //     (
+    //         self.started.elapsed().as_millis() as u32 > self.finalize_parallel_timeout_ms,
+    //         self.finalize_parallel_timeout_ms,
+    //     )
+    // }
+
     fn get_remaining_cutoff_time_limit_nanos(&self) -> i128 {
         let cutoff_timeout_nanos = self.engine.collator_config().cutoff_timeout_ms as i128 * 1_000_000;
         let elapsed_nanos = self.started.elapsed().as_nanos() as i128;
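[Editor's note: an illustrative model, not part of the patch, of how the finalize step above decides which account state to keep. All names here are invented stand-ins; the real code uses `accounts_processed_msgs` from the `ExecutionManager` and `ShardAccountStuff::commit`. Each account accumulates the sync keys of the messages it has fully processed; at finalization the account is committed at its last processed message, or skipped entirely (`continue` above) when it processed nothing and therefore contributes no transactions to the block.]

use std::collections::HashMap;

type MsgSyncKey = usize;

#[derive(Default)]
struct ProcessedMsgs<AccountId: std::hash::Hash + Eq> {
    // per-account list of processed message sync keys, in processing order
    by_account: HashMap<AccountId, Vec<MsgSyncKey>>,
}

impl<AccountId: std::hash::Hash + Eq> ProcessedMsgs<AccountId> {
    // called when a transaction for this message has been finalized
    fn set_processed(&mut self, account: AccountId, key: MsgSyncKey) {
        self.by_account.entry(account).or_default().push(key);
    }

    // called when a message's changes are reverted during finalization
    fn revert_last(&mut self, account: &AccountId) {
        if let Some(list) = self.by_account.get_mut(account) {
            list.pop();
        }
    }

    // the finalizer commits each account at its last processed message,
    // or drops the account from the block if it processed nothing
    fn last_processed(&self, account: &AccountId) -> Option<MsgSyncKey> {
        self.by_account.get(account).and_then(|keys| keys.last().copied())
    }
}

Because reverts pop the same list that processing pushes, the value returned by `last_processed` always points at a state whose changes survived the revert pass, so the committed account and the committed message descriptors stay consistent.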
diff --git a/src/validator/out_msg_queue.rs b/src/validator/out_msg_queue.rs
index 5992902a..3ff89008 100644
--- a/src/validator/out_msg_queue.rs
+++ b/src/validator/out_msg_queue.rs
@@ -27,7 +27,7 @@ use ever_block::{
     HashmapAugType, HashmapFilterResult, HashmapRemover, HashmapSubtree, HashmapType,
     IBitstring, IhrPendingInfo, LabelReader, MASTERCHAIN_ID, OutMsgQueue, OutMsgQueueInfo,
     OutMsgQueueKey, ProcessedInfo, ProcessedInfoKey, ProcessedUpto, Result, Serializable,
-    ShardIdent, ShardHashes, ShardStateUnsplit, SliceData, UInt256, UsageTree
+    ShardIdent, ShardHashes, ShardStateUnsplit, SliceData, UInt256, UsageTree, EnqueuedMsg,
 };

 #[cfg(test)]
@@ -622,11 +622,14 @@ impl OutMsgQueueInfoStuff {
         Ok(depth)
     }

-    pub fn del_message(&mut self, key: &OutMsgQueueKey) -> Result<SliceData> {
+    pub fn del_message(&mut self, key: &OutMsgQueueKey) -> Result<EnqueuedMsg> {
         let labels = [("shard", self.shard().to_string())];
         metrics::counter!("out_msg_queue_del", 1, &labels);
-        self.out_queue_mut()?.remove(key.write_to_bitstring()?)?
-            .ok_or_else(|| error!("error deleting from out_msg_queue dictionary: {:x}", key))
+        let mut slice = self.out_queue_mut()?.remove(key.write_to_bitstring()?)?
+            .ok_or_else(|| error!("error deleting from out_msg_queue dictionary: {:x}", key))?;
+        let serde_opts = self.out_queue_mut()?.serde_opts();
+        let (enq, _) = OutMsgQueue::value_aug(serde_opts, &mut slice)?;
+        Ok(enq)
     }

     // remove all messages which are not from new_shard
@@ -1182,7 +1185,7 @@ impl MsgQueueManager {
             ordered_cleaning_timeout_nanos,
             |node_obj| {
                 if block_full {
-                    log::debug!("{}: BLOCK FULL when ordered cleaning output queue, cleanup is partial", self.block_descr);
+                    log::debug!("{}: BLOCK FULL (>= Soft) when ordered cleaning output queue, cleanup is partial", self.block_descr);
                     partial = true;
                     return Ok(HashmapFilterResult::Stop);
                 }
@@ -1254,7 +1257,7 @@ impl MsgQueueManager {

         queue.hashmap_filter(|_key, mut slice| {
             if block_full {
-                log::debug!("{}: BLOCK FULL (>= Soft) when random cleaning output queue, cleanup is partial", self.block_descr);
+                log::debug!("{}: BLOCK FULL (>= Soft) when random cleaning output queue, cleanup is partial", self.block_descr);
                 partial = true;
                 return Ok(HashmapFilterResult::Stop)
             }
diff --git a/src/validator/out_msg_queue_cleaner.rs b/src/validator/out_msg_queue_cleaner.rs
index c607b938..74f82522 100644
--- a/src/validator/out_msg_queue_cleaner.rs
+++ b/src/validator/out_msg_queue_cleaner.rs
@@ -775,6 +775,7 @@ impl HashmapOrderedFilterCursor {
             }
         }

+        // TODO: review and maybe fix this because there can be many msgs with the same LT
         // stop processing when max_lt reached
         #[cfg(not(feature = "only_sorted_clean"))]
         if current.node_obj_ref().lt() == self.max_lt {
diff --git a/src/validator/verification/mod.rs b/src/validator/verification/mod.rs
index 1c2d62b0..a0a45736 100644
--- a/src/validator/verification/mod.rs
+++ b/src/validator/verification/mod.rs
@@ -11,8 +11,6 @@
  * limitations under the License.
  */

-#![cfg(not(feature = "fast_finality"))]
-
 extern crate catchain;

 use crate::engine_traits::EngineOperations;
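[Editor's note: a minimal, self-contained sketch, not part of the patch, of why `del_message` now decodes and returns the removed `EnqueuedMsg` instead of a raw slice. `Queue`, `Key`, and `Enqueued` are invented stand-ins. The caller (`del_out_queue_msg_with_history` in the collator) buffers the full removed value under the deleting message's sync key, so a revert triggered by the parallel-collation cutoff can put the message back into the out-queue unchanged.]

use std::collections::HashMap;

type Key = u64;
type SyncKey = usize;

#[derive(Clone)]
struct Enqueued(String); // stands in for the real enqueued message payload

#[derive(Default)]
struct Queue {
    map: HashMap<Key, Enqueued>,
    // deletions buffered per sync key so they can be undone
    deleted: Vec<(SyncKey, Key, Enqueued)>,
}

impl Queue {
    // analogue of del_message + del_out_queue_msg_with_history:
    // return the removed value instead of discarding it
    fn del_with_history(&mut self, sync_key: SyncKey, key: Key) -> Option<Enqueued> {
        let enq = self.map.remove(&key)?;
        self.deleted.push((sync_key, key, enq.clone()));
        Some(enq)
    }

    // analogue of revert_del_out_queue_msg: restore everything
    // the given message had deleted
    fn revert_del(&mut self, sync_key: SyncKey) {
        while matches!(self.deleted.last(), Some((k, _, _)) if *k == sync_key) {
            let (_, key, enq) = self.deleted.pop().expect("checked by matches!");
            self.map.insert(key, enq);
        }
    }
}

The extra `value_aug` decode costs a little on every delete, but it is what makes the deletion reversible: a raw removed slice would not be enough to reconstruct the dictionary entry with its augmentation when a cancelled parallel run has to restore the queue.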