Skip to content

Commit

Permalink
chore(node-metrics): create a HooksBuilder type to help with Hooks in…
Browse files Browse the repository at this point in the history
…itialization
  • Loading branch information
nils-mathieu committed Nov 19, 2024
1 parent 7c7baca commit 8ca3dcc
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 39 deletions.
19 changes: 15 additions & 4 deletions crates/cli/commands/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
use reth_cli_util::get_secret_key;
use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
Expand Down Expand Up @@ -132,10 +133,20 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
},
ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
ctx.task_executor,
Hooks::new(
provider_factory.db_ref().clone(),
provider_factory.static_file_provider(),
),
Hooks::builder()
.with_hook({
let db = provider_factory.db_ref().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = provider_factory.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics from static file provider");
}
}
})
.build(),
);

MetricServer::new(config).serve().await?;
Expand Down
17 changes: 15 additions & 2 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use reth_blockchain_tree::{
use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::Consensus;
use reth_db_api::database::Database;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_common::init::{init_genesis, InitDatabaseError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_engine_local::MiningMode;
Expand Down Expand Up @@ -535,7 +535,20 @@ where
},
ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
self.task_executor().clone(),
Hooks::new(self.database().clone(), self.static_file_provider()),
Hooks::builder()
.with_hook({
let db = self.database().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = self.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics for the static file provider");
}
}
})
.build(),
);

MetricServer::new(config).serve().await?;
Expand Down
89 changes: 57 additions & 32 deletions crates/node/metrics/src/hooks.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,50 @@
use metrics_process::Collector;
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::providers::StaticFileProvider;
use std::{
fmt::{self},
sync::Arc,
};
use std::{fmt, sync::Arc};

pub(crate) trait Hook: Fn() + Send + Sync {}
impl<T: Fn() + Send + Sync> Hook for T {}
/// The simple alias for function types that are `'static`, `Send`, and `Sync`.
pub trait Hook: 'static + Fn() + Send + Sync {}
impl<T: 'static + Fn() + Send + Sync> Hook for T {}

impl fmt::Debug for Hooks {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let hooks_len = self.inner.len();
f.debug_struct("Hooks")
.field("inner", &format!("Arc<Vec<Box<dyn Hook>>>, len: {}", hooks_len))
/// A builder-like type to create a new [`Hooks`] instance.
pub struct HooksBuilder {
hooks: Vec<Box<dyn Hook<Output = ()>>>,
}

impl HooksBuilder {
/// Registers a [`Hook`].
pub fn with_hook(self, hook: impl Hook<Output = ()>) -> Self {
self.with_boxed_hook(Box::new(hook))
}

/// Registers a [`Hook`].
#[inline]
pub fn with_boxed_hook(mut self, hook: Box<dyn Hook<Output = ()>>) -> Self {
self.hooks.push(hook);
self
}

/// Builds the [`Hooks`] collection from the registered hooks.
pub fn build(self) -> Hooks {
Hooks { inner: Arc::new(self.hooks) }
}
}

impl Default for HooksBuilder {
fn default() -> Self {
Self {
hooks: vec![
Box::new(|| Collector::default().collect()),
Box::new(collect_memory_stats),
Box::new(collect_io_stats),
],
}
}
}

impl std::fmt::Debug for HooksBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HooksBuilder")
.field("hooks", &format_args!("Vec<Box<dyn Hook>>, len: {}", self.hooks.len()))
.finish()
}
}
Expand All @@ -26,31 +56,26 @@ pub struct Hooks {
}

impl Hooks {
/// Create a new set of hooks
pub fn new<Metrics, N>(db: Metrics, static_file_provider: StaticFileProvider<N>) -> Self
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
N: NodePrimitives,
{
let hooks: Vec<Box<dyn Hook<Output = ()>>> = vec![
Box::new(move || db.report_metrics()),
Box::new(move || {
let _ = static_file_provider.report_metrics().map_err(
|error| tracing::error!(%error, "Failed to report static file provider metrics"),
);
}),
Box::new(move || Collector::default().collect()),
Box::new(collect_memory_stats),
Box::new(collect_io_stats),
];
Self { inner: Arc::new(hooks) }
/// Creates a new [`HooksBuilder`] instance.
#[inline]
pub fn builder() -> HooksBuilder {
HooksBuilder::default()
}

pub(crate) fn iter(&self) -> impl Iterator<Item = &Box<dyn Hook<Output = ()>>> {
self.inner.iter()
}
}

impl fmt::Debug for Hooks {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let hooks_len = self.inner.len();
f.debug_struct("Hooks")
.field("inner", &format_args!("Arc<Vec<Box<dyn Hook>>>, len: {}", hooks_len))
.finish()
}
}

#[cfg(all(feature = "jemalloc", unix))]
fn collect_memory_stats() {
use metrics::gauge;
Expand Down
16 changes: 15 additions & 1 deletion crates/node/metrics/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ const fn describe_io_stats() {}
mod tests {
use super::*;
use reqwest::Client;
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_provider::{test_utils::create_test_provider_factory, StaticFileProviderFactory};
use reth_tasks::TaskManager;
use socket2::{Domain, Socket, Type};
Expand Down Expand Up @@ -237,7 +238,20 @@ mod tests {
let executor = tasks.executor();

let factory = create_test_provider_factory();
let hooks = Hooks::new(factory.db_ref().clone(), factory.static_file_provider());
let hooks = Hooks::builder()
.with_hook({
let db = factory.db_ref().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = factory.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
tracing::error!(%error, "Failed to report metrics for the static file provider");
}
}
})
.build();

let listen_addr = get_random_available_addr();
let config =
Expand Down

0 comments on commit 8ca3dcc

Please sign in to comment.