Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/out queue clean refactoring #198

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
20 changes: 10 additions & 10 deletions Cargo.toml
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ stream-cancel = '0.8.0'
string-builder = '^0.2.0'
tokio = { features = [ 'rt-multi-thread' ], version = '1.5' }
tokio-util = '0.7'
adnl = { features = [ 'client', 'node', 'server' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.15' }
adnl = { features = [ 'client', 'node', 'server' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.18' }
catchain = { path = 'catchain' }
dht = { git = 'https://github.com/tonlabs/ever-dht.git', tag = '0.6.79' }
dht = { git = 'https://github.com/tonlabs/ever-dht.git', tag = '0.6.82' }
lockfree = { git = 'https://github.com/tonlabs/lockfree.git' }
overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.11' }
rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.8' }
overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.14' }
rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.11' }
storage = { path = 'storage' }
ton_abi = { git = 'https://github.com/tonlabs/ever-abi.git', optional = true, tag = '2.4.8' }
ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' }
ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.117' }
ton_block_json = { git = 'https://github.com/tonlabs/ever-block-json.git', tag = '0.7.205' }
ton_executor = { git = 'https://github.com/tonlabs/ever-executor.git', tag = '1.16.97' }
ton_abi = { git = 'https://github.com/tonlabs/ever-abi.git', optional = true, tag = '2.4.11' }
ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' }
ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.119' }
ton_block_json = { git = 'https://github.com/tonlabs/ever-block-json.git', tag = '0.7.207' }
ton_executor = { git = 'https://github.com/tonlabs/ever-executor.git', tag = '1.16.99' }
ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' }
ton_vm = { git = 'https://github.com/tonlabs/ever-vm.git', tag = '1.8.225' }
ton_vm = { git = 'https://github.com/tonlabs/ever-vm.git', tag = '1.8.227' }
validator_session = { path = 'validator-session' }

[features]
Expand Down
8 changes: 4 additions & 4 deletions catchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ quanta = '0.11.1'
rand = '0.8'
regex = '1.3.1'
tokio = { features = [ 'rt-multi-thread' ], version = '1.5' }
adnl = { features = [ 'node' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.15' }
overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.11' }
rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.8' }
adnl = { features = [ 'node' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.18' }
overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.14' }
rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.11' }
storage = { path = '../storage' }
ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' }
ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' }
ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' }

[features]
Expand Down
9 changes: 9 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,32 @@ impl Default for CellsGcConfig {
pub struct CollatorConfig {
pub cutoff_timeout_ms: u32,
pub stop_timeout_ms: u32,
pub finalize_parallel_percentage_points: u32,
pub clean_timeout_percentage_points: u32,
pub optimistic_clean_percentage_points: u32,
pub max_secondary_clean_timeout_percentage_points: u32,
pub max_collate_threads: u32,
pub max_collate_msgs_queue_on_account: u32,
pub retry_if_empty: bool,
pub finalize_empty_after_ms: u32,
pub empty_collation_sleep_ms: u32
}
impl CollatorConfig {
pub fn get_finalize_parallel_timeout_ms(&self) -> u32 {
self.stop_timeout_ms * self.finalize_parallel_percentage_points / 1000
}
}
impl Default for CollatorConfig {
fn default() -> Self {
Self {
cutoff_timeout_ms: 1000,
stop_timeout_ms: 1500,
finalize_parallel_percentage_points: 800, // 0.8 = 80% * stop_timeout_ms = 1200
clean_timeout_percentage_points: 150, // 0.150 = 15% = 150ms
optimistic_clean_percentage_points: 1000, // 1.000 = 100% = 150ms
max_secondary_clean_timeout_percentage_points: 350, // 0.350 = 35% = 350ms
max_collate_threads: 10,
max_collate_msgs_queue_on_account: 3,
retry_if_empty: false,
finalize_empty_after_ms: 800,
empty_collation_sleep_ms: 100
Expand Down
185 changes: 163 additions & 22 deletions src/types/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,112 @@
* limitations under the License.
*/

use std::sync::{atomic::AtomicU64, Arc};
use ton_block::{
Account, AccountBlock, Augmentation, CopyleftRewards, Deserializable, HashUpdate,
HashmapAugType, LibDescr, Libraries, Serializable, ShardAccount, ShardAccounts, StateInitLib,
Transaction, Transactions,
};
use ton_types::{fail, AccountId, Cell, HashmapRemover, Result, UInt256};
use ton_types::{error, fail, AccountId, Cell, HashmapRemover, Result, UInt256, SliceData};

pub struct ShardAccountStuff {
account_addr: AccountId,
account_root: Cell,
last_trans_hash: UInt256,
last_trans_lt: u64,
lt: Arc<AtomicU64>,
transactions: Transactions,
lt: u64,
transactions: Option<Transactions>,
state_update: HashUpdate,
orig_libs: StateInitLib,
copyleft_rewards: CopyleftRewards,
orig_libs: Option<StateInitLib>,
copyleft_rewards: Option<CopyleftRewards>,

/// * Sync key of message, which updated account state
/// * It is an incremental counter set by executor
update_msg_sync_key: Option<usize>,

// /// * Executor sets transaction that updated account to current state
// /// * Initial account state contains None
// last_transaction: Option<(Cell, CurrencyCollection)>,

/// LT of transaction, which updated account state
update_trans_lt: Option<u64>,

/// The copyleft_reward of transaction, which updated account state (if exists)
update_copyleft_reward_address: Option<AccountId>,

/// Executor stores prevoius account state
prev_account_stuff: Option<Box<ShardAccountStuff>>,
}

impl ShardAccountStuff {
pub fn new(
account_addr: AccountId,
shard_acc: ShardAccount,
lt: Arc<AtomicU64>,
lt: u64,
) -> Result<Self> {
let account_hash = shard_acc.account_cell().repr_hash();
let account_root = shard_acc.account_cell();
let last_trans_hash = shard_acc.last_trans_hash().clone();
let last_trans_lt = shard_acc.last_trans_lt();
Ok(Self{
account_addr,
orig_libs: shard_acc.read_account()?.libraries(),
orig_libs: Some(shard_acc.read_account()?.libraries()),
account_root,
last_trans_hash,
last_trans_lt,
lt,
transactions: Transactions::default(),
transactions: Some(Transactions::default()),
state_update: HashUpdate::with_hashes(account_hash.clone(), account_hash),
copyleft_rewards: CopyleftRewards::default(),
copyleft_rewards: Some(CopyleftRewards::default()),

update_msg_sync_key: None,
//last_transaction: None,
update_trans_lt: None,
update_copyleft_reward_address: None,
prev_account_stuff: None,
})
}
/// Returns:
/// * None - if no updates or no matching records in history
/// * Some(particular) - record from history that matches update_msg_sync_key == on_msg_sync_key
pub fn commit(mut self, on_msg_sync_key: usize) -> Result<Option<Self>> {
while let Some(current_update_msg_sync_key) = self.update_msg_sync_key {
if current_update_msg_sync_key == on_msg_sync_key {
log::debug!("account {:x} state committed by processed message {} in the queue", self.account_addr(), on_msg_sync_key);
return Ok(Some(self));
} else {
if !self.revert()? {
log::debug!("unable to revert account {:x} state, current state is a first in history", self.account_addr());
return Ok(None);
} else {
log::debug!("account {:x} state reverted one step back to message {:?} in the queue", self.account_addr(), self.update_msg_sync_key);
}
}
}
Ok(None)
}
fn revert(&mut self) -> Result<bool> {
let mut taked_prev = match self.prev_account_stuff.take() {
Some(prev) => prev,
None => return Ok(false),
};
let prev = taked_prev.as_mut();

prev.orig_libs = self.orig_libs.take();

prev.transactions = self.transactions.take();
if let Some(update_trans_lt) = self.update_trans_lt {
prev.remove_trans(update_trans_lt)?;
}

prev.copyleft_rewards = self.copyleft_rewards.take();
if let Some(update_copyleft_reward_address) = self.update_copyleft_reward_address.as_ref() {
prev.remove_copyleft_reward(update_copyleft_reward_address)?;
}

std::mem::swap(self, prev);

Ok(true)
}
pub fn update_shard_state(&mut self, new_accounts: &mut ShardAccounts) -> Result<AccountBlock> {
let account = self.read_account()?;
if account.is_none() {
Expand All @@ -62,10 +126,10 @@ impl ShardAccountStuff {
let value = shard_acc.write_to_new_cell()?;
new_accounts.set_builder_serialized(self.account_addr().clone(), &value, &account.aug()?)?;
}
AccountBlock::with_params(&self.account_addr, &self.transactions, &self.state_update)
AccountBlock::with_params(&self.account_addr, self.transactions()?, &self.state_update)
}
pub fn lt(&self) -> Arc<AtomicU64> {
self.lt.clone()
pub fn lt(&self) -> u64 {
self.lt
}
pub fn read_account(&self) -> Result<Account> {
Account::construct_from_cell(self.account_root())
Expand All @@ -79,39 +143,116 @@ impl ShardAccountStuff {
pub fn account_addr(&self) -> &AccountId {
&self.account_addr
}
pub fn copyleft_rewards(&self) -> &CopyleftRewards {
&self.copyleft_rewards
pub fn copyleft_rewards(&self) -> Result<&CopyleftRewards> {
self.copyleft_rewards.as_ref()
.ok_or_else(|| error!(
"`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before"
))
}
fn copyleft_rewards_mut(&mut self) -> Result<&mut CopyleftRewards> {
self.copyleft_rewards.as_mut()
.ok_or_else(|| error!(
"`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before"
))
}
fn remove_copyleft_reward(&mut self, address: &AccountId) -> Result<bool> {
self.copyleft_rewards_mut()?.remove(address)
}

fn transactions(&self) -> Result<&Transactions> {
self.transactions.as_ref()
.ok_or_else(|| error!(
"`transactions` field is None, possibly you try access a not root record in the history, run commit() before"
))
}
fn transactions_mut(&mut self) -> Result<&mut Transactions> {
self.transactions.as_mut()
.ok_or_else(|| error!(
"`transactions` field is None, possibly you try access a not root record in the history, run commit() before"
))
}
fn remove_trans(&mut self, trans_lt: u64) -> Result<()> {
let key = SliceData::load_builder(trans_lt.write_to_new_cell()?)?;
self.transactions_mut()?.remove(key)?;
Ok(())
}

fn orig_libs(&self) -> Result<&StateInitLib> {
self.orig_libs.as_ref()
.ok_or_else(|| error!(
"`orig_libs` field is None, possibly you try access a not root record in the history, run commit() before"
))
}

pub fn apply_transaction_res(
&mut self,
update_msg_sync_key: usize,
tx_last_lt: u64,
transaction_res: &mut Result<Transaction>,
account_root: Cell,
) -> Result<()> {
let mut res = ShardAccountStuff {
account_addr: self.account_addr.clone(),
account_root: self.account_root.clone(),
last_trans_hash: self.last_trans_hash.clone(),
last_trans_lt: self.last_trans_lt,
lt: tx_last_lt, // 1014 or 1104 or 1024
transactions: self.transactions.take(),
state_update: self.state_update.clone(),
orig_libs: self.orig_libs.take(),
copyleft_rewards: Some(CopyleftRewards::default()),
update_msg_sync_key: Some(update_msg_sync_key),
update_trans_lt: None,
update_copyleft_reward_address: None,
prev_account_stuff: None,
};

if let Ok(transaction) = transaction_res {
res.add_transaction(transaction, account_root)?;
}

std::mem::swap(self, &mut res);

self.prev_account_stuff = Some(Box::new(res));

Ok(())
}
pub fn add_transaction(&mut self, transaction: &mut Transaction, account_root: Cell) -> Result<()> {
transaction.set_prev_trans_hash(self.last_trans_hash.clone());
transaction.set_prev_trans_lt(self.last_trans_lt);
transaction.set_prev_trans_lt(self.last_trans_lt); // 1010
// log::trace!("{} {}", self.collated_block_descr, debug_transaction(transaction.clone())?);

self.account_root = account_root;
self.state_update.new_hash = self.account_root.repr_hash();

let tr_root = transaction.serialize()?;
let tr_lt = transaction.logical_time(); // 1011

self.last_trans_hash = tr_root.repr_hash();
self.last_trans_lt = transaction.logical_time();
self.last_trans_lt = tr_lt;

self.update_trans_lt = Some(tr_lt);

self.transactions.setref(
&transaction.logical_time(),
self.transactions_mut()?.setref(
&tr_lt,
&tr_root,
transaction.total_fees()
)?;

if let Some(copyleft_reward) = transaction.copyleft_reward() {
log::trace!("Copyleft reward {} {} from transaction {}", copyleft_reward.address, copyleft_reward.reward, self.last_trans_hash);
self.copyleft_rewards.add_copyleft_reward(&copyleft_reward.address, &copyleft_reward.reward)?;
self.copyleft_rewards_mut()?.add_copyleft_reward(&copyleft_reward.address, &copyleft_reward.reward)?;
self.update_copyleft_reward_address = Some(copyleft_reward.address.clone());
}

Ok(())
}
pub fn update_public_libraries(&self, libraries: &mut Libraries) -> Result<()> {
let account = self.read_account()?;
let new_libs = account.libraries();
if new_libs.root() != self.orig_libs.root() {
new_libs.scan_diff(&self.orig_libs, |key: UInt256, old, new| {
let orig_libs = self.orig_libs()?;
if new_libs.root() != orig_libs.root() {
new_libs.scan_diff(orig_libs, |key: UInt256, old, new| {
let old = old.unwrap_or_default();
let new = new.unwrap_or_default();
if old.is_public_library() && !new.is_public_library() {
Expand Down
4 changes: 2 additions & 2 deletions src/types/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ impl BlockLimitStatus {
}

/// Update logical time
pub fn update_lt(&mut self, lt: u64) {
self.lt_current = max(self.lt_current, lt);
pub fn update_lt(&mut self, lt: u64, force: bool) {
self.lt_current = if force { lt } else { max(self.lt_current, lt) };
if self.lt_start > self.lt_current {
self.lt_start = lt;
}
Expand Down
5 changes: 5 additions & 0 deletions src/types/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ impl MsgEnvelopeStuff {
pub fn message(&self) -> &Message { &self.msg }
pub fn message_hash(&self) -> UInt256 { self.env.message_hash() }
pub fn message_cell(&self) -> Cell { self.env.message_cell() }
pub fn out_msg_key(&self) -> OutMsgQueueKey {
OutMsgQueueKey::with_account_prefix(&self.next_prefix(), self.message_hash())
}
#[cfg(test)]
pub fn src_prefix(&self) -> &AccountIdPrefixFull { &self.src_prefix }
pub fn dst_prefix(&self) -> &AccountIdPrefixFull { &self.dst_prefix }
pub fn cur_prefix(&self) -> &AccountIdPrefixFull { &self.cur_prefix }
pub fn next_prefix(&self) -> &AccountIdPrefixFull { &self.next_prefix }
Expand Down
Loading