Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use modern collator with aborting procedure #265

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file.

## Version 0.59.12

- Use modern collator with aborting procedure

## Version 0.59.11

- Fix for broken shard merge
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
build = 'common/build/build.rs'
edition = '2021'
name = 'ever-node'
version = '0.59.11'
version = '0.59.12'

[workspace]
members = [ 'storage' ]
Expand Down
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct CollatorConfig {
pub optimistic_clean_percentage_points: u32,
pub max_secondary_clean_timeout_percentage_points: u32,
pub max_collate_threads: u32,
pub max_collate_msgs_queue_on_account: u32,
pub retry_if_empty: bool,
pub finalize_empty_after_ms: u32,
pub empty_collation_sleep_ms: u32,
Expand All @@ -99,6 +100,7 @@ impl Default for CollatorConfig {
optimistic_clean_percentage_points: 1000, // 1.000 = 100% = 150ms
max_secondary_clean_timeout_percentage_points: 350, // 0.350 = 35% = 350ms
max_collate_threads: 10,
max_collate_msgs_queue_on_account: 3,
retry_if_empty: false,
finalize_empty_after_ms: 800,
empty_collation_sleep_ms: 100,
Expand Down
4 changes: 0 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ mod shard_blocks;

include!("../common/src/info.rs");

#[cfg(feature = "tracing")]
pub mod jaeger;

#[cfg(not(feature = "tracing"))]
pub mod jaeger {
pub fn init_jaeger(){}
#[cfg(feature = "external_db")]
Expand Down
8 changes: 2 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ mod ext_messages;

mod shard_blocks;

#[cfg(feature = "tracing")]
mod jaeger;

#[cfg(not(feature = "tracing"))]
mod jaeger {
pub fn init_jaeger(){}
#[cfg(feature = "external_db")]
Expand All @@ -52,7 +48,7 @@ mod jaeger {

use crate::{
config::TonNodeConfig, engine::{Engine, Stopper, EngineFlags},
jaeger::init_jaeger, internal_db::restore::set_graceful_termination,
internal_db::restore::set_graceful_termination,
validating_utils::supported_version
};
#[cfg(feature = "external_db")]
Expand Down Expand Up @@ -456,7 +452,7 @@ fn main() {
.build()
.expect("Can't create Validator tokio runtime");

init_jaeger();
jaeger::init_jaeger();

#[cfg(feature = "trace_alloc_detail")]
thread::spawn(
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_shard_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use super::*;
use crate::test_helper::gen_master_state;
use crate::collator_test_bundle::{create_block_handle_storage, create_engine_allocated};
#[cfg(all(feature = "telemetry", not(feature = "fast_finality")))]
#[cfg(all(feature = "telemetry"))]
use crate::collator_test_bundle::create_engine_telemetry;
use std::{sync::{atomic::{AtomicU32, Ordering}, Arc}, collections::HashSet};
use storage::{block_handle_db::{BlockHandle, BlockHandleStorage}, types::BlockMeta};
Expand Down
186 changes: 164 additions & 22 deletions src/types/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,48 @@
* limitations under the License.
*/

use std::sync::{atomic::AtomicU64, Arc};
use ever_block::{
error, fail, AccountId, Cell, HashmapRemover, Result, UInt256,
Account, AccountBlock, Augmentation, CopyleftRewards, Deserializable, HashUpdate,
HashmapAugType, LibDescr, Libraries, Serializable, ShardAccount, ShardAccounts, StateInitLib,
Transaction, Transactions,
};
use ever_block::{fail, AccountId, Cell, HashmapRemover, Result, UInt256};

pub struct ShardAccountStuff {
account_addr: AccountId,
account_root: Cell,
last_trans_hash: UInt256,
last_trans_lt: u64,
lt: Arc<AtomicU64>,
transactions: Transactions,
lt: u64,
transactions: Option<Transactions>,
state_update: HashUpdate,
orig_libs: StateInitLib,
copyleft_rewards: CopyleftRewards,
orig_libs: Option<StateInitLib>,
copyleft_rewards: Option<CopyleftRewards>,

/// * Sync key of message, which updated account state
/// * It is an incremental counter set by executor
update_msg_sync_key: Option<usize>,

// /// * Executor sets transaction that updated account to current state
// /// * Initial account state contains None
// last_transaction: Option<(Cell, CurrencyCollection)>,

/// LT of transaction, which updated account state
update_trans_lt: Option<u64>,

/// The copyleft_reward of transaction, which updated account state (if exists)
update_copyleft_reward_address: Option<AccountId>,

/// Executor stores prevoius account state
prev_account_stuff: Option<Box<ShardAccountStuff>>,
serde_opts: u8,
}

impl ShardAccountStuff {
pub fn new(
account_addr: AccountId,
shard_acc: ShardAccount,
lt: Arc<AtomicU64>,
lt: u64,
serde_opts: u8,
) -> Result<Self> {
let account_hash = shard_acc.account_cell().repr_hash();
Expand All @@ -45,17 +61,65 @@ impl ShardAccountStuff {
let last_trans_lt = shard_acc.last_trans_lt();
Ok(Self{
account_addr,
orig_libs: shard_acc.read_account()?.libraries(),
orig_libs: Some(shard_acc.read_account()?.libraries()),
account_root,
last_trans_hash,
last_trans_lt,
lt,
transactions: Transactions::with_serde_opts(serde_opts),
transactions: Some(Transactions::with_serde_opts(serde_opts)),
state_update: HashUpdate::with_hashes(account_hash.clone(), account_hash),
copyleft_rewards: CopyleftRewards::default(),
copyleft_rewards: Some(CopyleftRewards::default()),

update_msg_sync_key: None,
//last_transaction: None,
update_trans_lt: None,
update_copyleft_reward_address: None,
prev_account_stuff: None,
serde_opts,
})
}
/// Returns:
/// * None - if no updates or no matching records in history
/// * Some(particular) - record from history that matches update_msg_sync_key == on_msg_sync_key
pub fn commit(mut self, on_msg_sync_key: usize) -> Result<Option<Self>> {
while let Some(current_update_msg_sync_key) = self.update_msg_sync_key {
if current_update_msg_sync_key == on_msg_sync_key {
log::debug!("account {:x} state committed by processed message {} in the queue", self.account_addr(), on_msg_sync_key);
return Ok(Some(self));
} else {
if !self.revert()? {
log::debug!("unable to revert account {:x} state, current state is a first in history", self.account_addr());
return Ok(None);
} else {
log::debug!("account {:x} state reverted one step back to message {:?} in the queue", self.account_addr(), self.update_msg_sync_key);
}
}
}
Ok(None)
}
fn revert(&mut self) -> Result<bool> {
let mut taked_prev = match self.prev_account_stuff.take() {
Some(prev) => prev,
None => return Ok(false),
};
let prev = taked_prev.as_mut();

prev.orig_libs = self.orig_libs.take();

prev.transactions = self.transactions.take();
if let Some(update_trans_lt) = self.update_trans_lt {
prev.remove_trans(update_trans_lt)?;
}

prev.copyleft_rewards = self.copyleft_rewards.take();
if let Some(update_copyleft_reward_address) = self.update_copyleft_reward_address.as_ref() {
prev.remove_copyleft_reward(update_copyleft_reward_address)?;
}

std::mem::swap(self, prev);

Ok(true)
}
pub fn update_shard_state(&mut self, new_accounts: &mut ShardAccounts) -> Result<AccountBlock> {
let account = self.read_account()?;
if account.is_none() {
Expand All @@ -65,10 +129,10 @@ impl ShardAccountStuff {
let value = shard_acc.write_to_new_cell()?;
new_accounts.set_builder_serialized(self.account_addr().clone(), &value, &account.aug()?)?;
}
AccountBlock::with_params(&self.account_addr, &self.transactions, &self.state_update)
AccountBlock::with_params(&self.account_addr, self.transactions()?, &self.state_update)
}
pub fn lt(&self) -> Arc<AtomicU64> {
self.lt.clone()
pub fn lt(&self) -> u64 {
self.lt
}
pub fn read_account(&self) -> Result<Account> {
Account::construct_from_cell(self.account_root())
Expand All @@ -82,39 +146,117 @@ impl ShardAccountStuff {
pub fn account_addr(&self) -> &AccountId {
&self.account_addr
}
pub fn copyleft_rewards(&self) -> &CopyleftRewards {
&self.copyleft_rewards
pub fn copyleft_rewards(&self) -> Result<&CopyleftRewards> {
self.copyleft_rewards.as_ref()
.ok_or_else(|| error!(
"`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before"
))
}
fn copyleft_rewards_mut(&mut self) -> Result<&mut CopyleftRewards> {
self.copyleft_rewards.as_mut()
.ok_or_else(|| error!(
"`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before"
))
}
fn remove_copyleft_reward(&mut self, address: &AccountId) -> Result<bool> {
self.copyleft_rewards_mut()?.remove(address)
}

fn transactions(&self) -> Result<&Transactions> {
self.transactions.as_ref()
.ok_or_else(|| error!(
"`transactions` field is None, possibly you try access a not root record in the history, run commit() before"
))
}
fn transactions_mut(&mut self) -> Result<&mut Transactions> {
self.transactions.as_mut()
.ok_or_else(|| error!(
"`transactions` field is None, possibly you try access a not root record in the history, run commit() before"
))
}
fn remove_trans(&mut self, trans_lt: u64) -> Result<()> {
let key = trans_lt.write_to_bitstring()?;
self.transactions_mut()?.remove(key)?;
Ok(())
}

fn orig_libs(&self) -> Result<&StateInitLib> {
self.orig_libs.as_ref()
.ok_or_else(|| error!(
"`orig_libs` field is None, possibly you try access a not root record in the history, run commit() before"
))
}

pub fn apply_transaction_res(
&mut self,
update_msg_sync_key: usize,
tx_last_lt: u64,
transaction_res: &mut Result<Transaction>,
account_root: Cell,
) -> Result<()> {
let mut res = ShardAccountStuff {
account_addr: self.account_addr.clone(),
account_root: self.account_root.clone(),
last_trans_hash: self.last_trans_hash.clone(),
last_trans_lt: self.last_trans_lt,
lt: tx_last_lt, // 1014 or 1104 or 1024
transactions: self.transactions.take(),
state_update: self.state_update.clone(),
orig_libs: self.orig_libs.take(),
copyleft_rewards: Some(CopyleftRewards::default()),
update_msg_sync_key: Some(update_msg_sync_key),
update_trans_lt: None,
update_copyleft_reward_address: None,
prev_account_stuff: None,
serde_opts: self.serde_opts,
};

if let Ok(transaction) = transaction_res {
res.add_transaction(transaction, account_root)?;
}

std::mem::swap(self, &mut res);

self.prev_account_stuff = Some(Box::new(res));

Ok(())
}
pub fn add_transaction(&mut self, transaction: &mut Transaction, account_root: Cell) -> Result<()> {
transaction.set_prev_trans_hash(self.last_trans_hash.clone());
transaction.set_prev_trans_lt(self.last_trans_lt);
transaction.set_prev_trans_lt(self.last_trans_lt); // 1010
// log::trace!("{} {}", self.collated_block_descr, debug_transaction(transaction.clone())?);

self.account_root = account_root;
self.state_update.new_hash = self.account_root.repr_hash();

let tr_root = transaction.serialize_with_opts(self.serde_opts)?;
let tr_lt = transaction.logical_time(); // 1011

self.last_trans_hash = tr_root.repr_hash();
self.last_trans_lt = transaction.logical_time();
self.last_trans_lt = tr_lt;

self.update_trans_lt = Some(tr_lt);

self.transactions.setref(
&transaction.logical_time(),
self.transactions_mut()?.setref(
&tr_lt,
&tr_root,
transaction.total_fees()
)?;

if let Some(copyleft_reward) = transaction.copyleft_reward() {
log::trace!("Copyleft reward {} {} from transaction {}", copyleft_reward.address, copyleft_reward.reward, self.last_trans_hash);
self.copyleft_rewards.add_copyleft_reward(&copyleft_reward.address, &copyleft_reward.reward)?;
self.copyleft_rewards_mut()?.add_copyleft_reward(&copyleft_reward.address, &copyleft_reward.reward)?;
self.update_copyleft_reward_address = Some(copyleft_reward.address.clone());
}

Ok(())
}
pub fn update_public_libraries(&self, libraries: &mut Libraries) -> Result<()> {
let account = self.read_account()?;
let new_libs = account.libraries();
if new_libs.root() != self.orig_libs.root() {
new_libs.scan_diff(&self.orig_libs, |key: UInt256, old, new| {
let orig_libs = self.orig_libs()?;
if new_libs.root() != orig_libs.root() {
new_libs.scan_diff(orig_libs, |key: UInt256, old, new| {
let old = old.unwrap_or_default();
let new = new.unwrap_or_default();
if old.is_public_library() && !new.is_public_library() {
Expand Down
4 changes: 2 additions & 2 deletions src/types/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ impl BlockLimitStatus {
}

/// Update logical time
pub fn update_lt(&mut self, lt: u64) {
self.lt_current = max(self.lt_current, lt);
pub fn update_lt(&mut self, lt: u64, force: bool) {
self.lt_current = if force { lt } else { max(self.lt_current, lt) };
if self.lt_start > self.lt_current {
self.lt_start = lt;
}
Expand Down
Loading