Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve metrics hooks setup (fixes #12672) #12684

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 15 additions & 4 deletions crates/cli/commands/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
use reth_cli_util::get_secret_key;
use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
Expand Down Expand Up @@ -132,10 +133,20 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
},
ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
ctx.task_executor,
Hooks::new(
provider_factory.db_ref().clone(),
provider_factory.static_file_provider(),
),
Hooks::builder()
.with_hook({
let db = provider_factory.db_ref().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = provider_factory.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics from static file provider");
}
}
})
.build(),
);

MetricServer::new(config).serve().await?;
Expand Down
17 changes: 15 additions & 2 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use reth_blockchain_tree::{
use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::Consensus;
use reth_db_api::database::Database;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_common::init::{init_genesis, InitDatabaseError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_engine_local::MiningMode;
Expand Down Expand Up @@ -535,7 +535,20 @@ where
},
ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
self.task_executor().clone(),
Hooks::new(self.database().clone(), self.static_file_provider()),
Hooks::builder()
.with_hook({
let db = self.database().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = self.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
error!(%error, "Failed to report metrics for the static file provider");
}
}
})
.build(),
);

MetricServer::new(config).serve().await?;
Expand Down
3 changes: 0 additions & 3 deletions crates/node/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ homepage.workspace = true
repository.workspace = true

[dependencies]
reth-db-api.workspace = true
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-metrics.workspace = true
reth-tasks.workspace = true

Expand Down
89 changes: 57 additions & 32 deletions crates/node/metrics/src/hooks.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,50 @@
use metrics_process::Collector;
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::providers::StaticFileProvider;
use std::{
fmt::{self},
sync::Arc,
};
use std::{fmt, sync::Arc};

pub(crate) trait Hook: Fn() + Send + Sync {}
impl<T: Fn() + Send + Sync> Hook for T {}
/// The simple alias for function types that are `'static`, `Send`, and `Sync`.
pub trait Hook: 'static + Fn() + Send + Sync {}
impl<T: 'static + Fn() + Send + Sync> Hook for T {}

impl fmt::Debug for Hooks {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let hooks_len = self.inner.len();
f.debug_struct("Hooks")
.field("inner", &format!("Arc<Vec<Box<dyn Hook>>>, len: {}", hooks_len))
/// A builder-like type to create a new [`Hooks`] instance.
pub struct HooksBuilder {
hooks: Vec<Box<dyn Hook<Output = ()>>>,
}

impl HooksBuilder {
/// Registers a [`Hook`].
pub fn with_hook(self, hook: impl Hook<Output = ()>) -> Self {
self.with_boxed_hook(Box::new(hook))
}

/// Registers a [`Hook`].
#[inline]
pub fn with_boxed_hook(mut self, hook: Box<dyn Hook<Output = ()>>) -> Self {
self.hooks.push(hook);
self
}

/// Builds the [`Hooks`] collection from the registered hooks.
pub fn build(self) -> Hooks {
Hooks { inner: Arc::new(self.hooks) }
}
}

impl Default for HooksBuilder {
fn default() -> Self {
Self {
hooks: vec![
Box::new(|| Collector::default().collect()),
Box::new(collect_memory_stats),
Box::new(collect_io_stats),
],
}
}
}

impl std::fmt::Debug for HooksBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HooksBuilder")
.field("hooks", &format_args!("Vec<Box<dyn Hook>>, len: {}", self.hooks.len()))
.finish()
}
}
Expand All @@ -26,31 +56,26 @@ pub struct Hooks {
}

impl Hooks {
/// Create a new set of hooks
pub fn new<Metrics, N>(db: Metrics, static_file_provider: StaticFileProvider<N>) -> Self
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
N: NodePrimitives,
{
let hooks: Vec<Box<dyn Hook<Output = ()>>> = vec![
Box::new(move || db.report_metrics()),
Box::new(move || {
let _ = static_file_provider.report_metrics().map_err(
|error| tracing::error!(%error, "Failed to report static file provider metrics"),
);
}),
Box::new(move || Collector::default().collect()),
Box::new(collect_memory_stats),
Box::new(collect_io_stats),
];
Self { inner: Arc::new(hooks) }
/// Creates a new [`HooksBuilder`] instance.
#[inline]
pub fn builder() -> HooksBuilder {
HooksBuilder::default()
}

pub(crate) fn iter(&self) -> impl Iterator<Item = &Box<dyn Hook<Output = ()>>> {
self.inner.iter()
}
}

impl fmt::Debug for Hooks {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let hooks_len = self.inner.len();
f.debug_struct("Hooks")
.field("inner", &format_args!("Arc<Vec<Box<dyn Hook>>>, len: {}", hooks_len))
.finish()
}
}

#[cfg(all(feature = "jemalloc", unix))]
fn collect_memory_stats() {
use metrics::gauge;
Expand Down
16 changes: 15 additions & 1 deletion crates/node/metrics/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ const fn describe_io_stats() {}
mod tests {
use super::*;
use reqwest::Client;
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_provider::{test_utils::create_test_provider_factory, StaticFileProviderFactory};
use reth_tasks::TaskManager;
use socket2::{Domain, Socket, Type};
Expand Down Expand Up @@ -237,7 +238,20 @@ mod tests {
let executor = tasks.executor();

let factory = create_test_provider_factory();
let hooks = Hooks::new(factory.db_ref().clone(), factory.static_file_provider());
let hooks = Hooks::builder()
.with_hook({
let db = factory.db_ref().clone();
move || db.report_metrics()
})
.with_hook({
let sfp = factory.static_file_provider();
move || {
if let Err(error) = sfp.report_metrics() {
tracing::error!(%error, "Failed to report metrics for the static file provider");
}
}
})
.build();

let listen_addr = get_random_available_addr();
let config =
Expand Down