Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
fix(concurrency): stopping the program if one of the threads panics
Browse files Browse the repository at this point in the history
  • Loading branch information
meship-starkware committed Jul 16, 2024
1 parent f6fa5af commit 694cf90
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
20 changes: 17 additions & 3 deletions crates/blockifier/src/blockifier/transaction_executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#[cfg(feature = "concurrency")]
use std::collections::{HashMap, HashSet};
#[cfg(feature = "concurrency")]
use std::panic::{self, catch_unwind, AssertUnwindSafe};
#[cfg(feature = "concurrency")]
use std::sync::Arc;
#[cfg(feature = "concurrency")]
use std::sync::Mutex;
Expand All @@ -12,6 +14,7 @@ use thiserror::Error;

use crate::blockifier::config::TransactionExecutorConfig;
use crate::bouncer::{Bouncer, BouncerWeights};
use crate::concurrency::scheduler::TransactionCommitter;
#[cfg(feature = "concurrency")]
use crate::concurrency::worker_logic::WorkerExecutor;
use crate::context::BlockContext;
Expand Down Expand Up @@ -233,10 +236,22 @@ impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
// TODO(barak, 01/07/2024): Consider using tokio and spawn tasks that will be served by some
// upper level tokio thread pool (Runtime in tokio terminology).
std::thread::scope(|s| {
for _ in 0..self.config.concurrency_config.n_workers {
for i in 0..self.config.concurrency_config.n_workers {
let worker_executor = Arc::clone(&worker_executor);
s.spawn(move || {
worker_executor.run();
let result = catch_unwind(AssertUnwindSafe(|| {
worker_executor.run();
}));
if let Err(err) = result {
println!("Thread {} caught a panic, propagating it.", i);
let panic_idx = Mutex::new(1);
let mut tx_commiter = TransactionCommitter::new(
&worker_executor.scheduler,
panic_idx.lock().unwrap(),
);
tx_commiter.halt_scheduler();
panic::resume_unwind(err);
}
});
}
});
Expand Down Expand Up @@ -270,7 +285,6 @@ impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
})
.commit_chunk_and_recover_block_state(n_committed_txs, visited_pcs);
self.block_state.replace(block_state_after_commit);

tx_execution_results
}
}
2 changes: 1 addition & 1 deletion crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl Scheduler {
}

/// Returns the done marker.
fn done(&self) -> bool {
pub(crate) fn done(&self) -> bool {
self.done_marker.load(Ordering::Acquire)
}

Expand Down
9 changes: 8 additions & 1 deletion crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
}

pub fn run(&self) {
if self.scheduler.done() {
return;
};
let mut task = Task::AskForTask;
loop {
self.commit_while_possible();
Expand Down Expand Up @@ -118,7 +121,10 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {

/// Executes the transaction at `tx_index` and then notifies the scheduler
/// that its execution has finished.
///
/// No artificial failure is injected here: a panic originating in
/// `execute_tx` propagates to the caller unchanged, so panic-handling
/// behaviour should be exercised from tests rather than by a hard-coded
/// `panic!` for a specific transaction index in the production path.
fn execute(&self, tx_index: TxIndex) {
    self.execute_tx(tx_index);
    // Report completion exactly once, only after the execution returned.
    self.scheduler.finish_execution(tx_index);
}

fn execute_tx(&self, tx_index: TxIndex) {
Expand Down Expand Up @@ -189,6 +195,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
/// * Else (execution failed), commit the transaction without fixing the call info or
/// updating the sequencer balance.
fn commit_tx(&self, tx_index: TxIndex) -> bool {
println!("commit_tx {}", tx_index);
let execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index);
let execution_output_ref = execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR);
let reads = &execution_output_ref.reads;
Expand Down

0 comments on commit 694cf90

Please sign in to comment.