From 011494dff4664b9d59475378dcf5215e87f73f6f Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 8 Nov 2023 21:33:18 +0100 Subject: [PATCH] feat: support multiple shared caps (#5363) Co-authored-by: Emilia Hane --- crates/net/eth-wire/src/capability.rs | 15 +++- crates/net/eth-wire/src/errors/p2p.rs | 4 +- crates/net/eth-wire/src/p2pstream.rs | 109 +++++++++++++++++------ crates/net/eth-wire/src/types/message.rs | 4 + crates/net/eth-wire/src/types/version.rs | 3 +- crates/net/network/src/error.rs | 3 +- crates/net/network/src/session/mod.rs | 16 +++- examples/manual-p2p/src/main.rs | 2 +- 8 files changed, 121 insertions(+), 35 deletions(-) diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index 1ba92f2b904c..49a5379b86db 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -1,6 +1,8 @@ //! All capability related types -use crate::{version::ParseVersionError, EthMessage, EthVersion}; +use crate::{ + p2pstream::MAX_RESERVED_MESSAGE_ID, version::ParseVersionError, EthMessage, EthVersion, +}; use alloy_rlp::{Decodable, Encodable, RlpDecodable, RlpEncodable}; use reth_codecs::add_arbitrary_tests; use reth_primitives::bytes::{BufMut, Bytes}; @@ -227,6 +229,11 @@ impl SharedCapability { } } + /// Returns true if the capability is eth. + pub fn is_eth(&self) -> bool { + matches!(self, SharedCapability::Eth { .. }) + } + /// Returns the version of the capability. pub fn version(&self) -> u8 { match self { @@ -243,6 +250,12 @@ impl SharedCapability { } } + /// Returns the message ID offset of the current capability relative to the start of the + /// capability message ID suffix. + pub fn offset_rel_caps_suffix(&self) -> u8 { + self.offset() - MAX_RESERVED_MESSAGE_ID - 1 + } + /// Returns the number of protocol messages supported by this capability. pub fn num_messages(&self) -> Result { match self { diff --git a/crates/net/eth-wire/src/errors/p2p.rs b/crates/net/eth-wire/src/errors/p2p.rs index be51badf24ca..1d4ae2aa2d03 100644 --- a/crates/net/eth-wire/src/errors/p2p.rs +++ b/crates/net/eth-wire/src/errors/p2p.rs @@ -30,7 +30,9 @@ pub enum P2PStreamError { #[error("ping timed out with")] PingTimeout, #[error(transparent)] - ParseVersionError(#[from] SharedCapabilityError), + ParseSharedCapability(#[from] SharedCapabilityError), + #[error("capability not supported on stream to this peer")] + CapabilityNotShared, #[error("mismatched protocol version in Hello message: {0}")] MismatchedProtocolVersion(GotExpected), #[error("started ping task before the handshake completed")] diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index 7db22470c99d..5f58378de91d 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -34,7 +34,7 @@ const MAX_PAYLOAD_SIZE: usize = 16 * 1024 * 1024; /// [`MAX_RESERVED_MESSAGE_ID`] is the maximum message ID reserved for the `p2p` subprotocol. If /// there are any incoming messages with an ID greater than this, they are subprotocol messages. -const MAX_RESERVED_MESSAGE_ID: u8 = 0x0f; +pub const MAX_RESERVED_MESSAGE_ID: u8 = 0x0f; /// [`MAX_P2P_MESSAGE_ID`] is the maximum message ID in use for the `p2p` subprotocol. const MAX_P2P_MESSAGE_ID: u8 = P2PMessageID::Pong as u8; @@ -159,7 +159,7 @@ where // determine shared capabilities (currently returns only one capability) let capability_res = - set_capability_offsets(hello.capabilities, their_hello.capabilities.clone()); + SharedCapabilities::try_new(hello.capabilities, their_hello.capabilities.clone()); let shared_capability = match capability_res { Err(err) => { @@ -207,6 +207,27 @@ where /// A P2PStream wraps over any `Stream` that yields bytes and makes it compatible with `p2p` /// protocol messages. +/// +/// This stream supports multiple shared capabilities, that were negotiated during the handshake. +/// +/// ### Message-ID based multiplexing +/// +/// > Each capability is given as much of the message-ID space as it needs. All such capabilities +/// > must statically specify how many message IDs they require. On connection and reception of the +/// > Hello message, both peers have equivalent information about what capabilities they share +/// > (including versions) and are able to form consensus over the composition of message ID space. +/// +/// > Message IDs are assumed to be compact from ID 0x10 onwards (0x00-0x0f is reserved for the +/// > "p2p" capability) and given to each shared (equal-version, equal-name) capability in +/// > alphabetic order. Capability names are case-sensitive. Capabilities which are not shared are +/// > ignored. If multiple versions are shared of the same (equal name) capability, the numerically +/// > highest wins, others are ignored. +/// +/// See also +/// +/// This stream emits Bytes that start with the normalized message id, so that the first byte of +/// each message starts from 0. If this stream only supports a single capability, for example `eth` +/// then the first byte of each message will match [EthMessageID](crate::types::EthMessageID). #[pin_project] #[derive(Debug)] pub struct P2PStream { @@ -223,7 +244,7 @@ pub struct P2PStream { pinger: Pinger, /// The supported capability for this stream. - shared_capability: SharedCapability, + shared_capabilities: SharedCapabilities, /// Outgoing messages buffered for sending to the underlying stream. outgoing_messages: VecDeque, @@ -241,13 +262,13 @@ impl P2PStream { /// Create a new [`P2PStream`] from the provided stream. /// New [`P2PStream`]s are assumed to have completed the `p2p` handshake successfully and are /// ready to send and receive subprotocol messages. - pub fn new(inner: S, capability: SharedCapability) -> Self { + pub fn new(inner: S, shared_capabilities: SharedCapabilities) -> Self { Self { inner, encoder: snap::raw::Encoder::new(), decoder: snap::raw::Decoder::new(), pinger: Pinger::new(PING_INTERVAL, PING_TIMEOUT), - shared_capability: capability, + shared_capabilities, outgoing_messages: VecDeque::new(), outgoing_message_buffer_capacity: MAX_P2P_CAPACITY, disconnecting: false, @@ -268,9 +289,12 @@ impl P2PStream { self.outgoing_message_buffer_capacity = capacity; } - /// Returns the shared capability for this stream. - pub fn shared_capability(&self) -> &SharedCapability { - &self.shared_capability + /// Returns the shared capabilities for this stream. + /// + /// This includes all the shared capabilities that were negotiated during the handshake and + /// their offsets based on the number of messages of each capability. + pub fn shared_capabilities(&self) -> &SharedCapabilities { + &self.shared_capabilities } /// Returns `true` if the connection is about to disconnect. @@ -460,7 +484,7 @@ where // * `eth/67` is reserved message IDs 0x10 - 0x19. // * `qrs/65` is reserved message IDs 0x1a - 0x21. // - decompress_buf[0] = bytes[0] - this.shared_capability.offset(); + decompress_buf[0] = bytes[0] - MAX_RESERVED_MESSAGE_ID - 1; return Poll::Ready(Some(Ok(decompress_buf))) } @@ -539,7 +563,7 @@ where // all messages sent in this stream are subprotocol messages, so we need to switch the // message id based on the offset - compressed[0] = item[0] + this.shared_capability.offset(); + compressed[0] = item[0] + MAX_RESERVED_MESSAGE_ID + 1; this.outgoing_messages.push_back(compressed.freeze()); Ok(()) @@ -571,16 +595,50 @@ where } } +/// Non-empty ordered list of recognized shared capabilities. +#[derive(Debug)] +pub struct SharedCapabilities(Vec); + +impl SharedCapabilities { + /// Merges the local and peer capabilities and returns a new [`SharedCapabilities`] instance. + pub fn try_new( + local_capabilities: Vec, + peer_capabilities: Vec, + ) -> Result { + Ok(Self(set_capability_offsets(local_capabilities, peer_capabilities)?)) + } + + /// Iterates over the shared capabilities. + pub fn iter_caps(&self) -> impl Iterator { + self.0.iter() + } + + /// Returns the eth capability if it is shared. + pub fn eth(&self) -> Result<&SharedCapability, P2PStreamError> { + for cap in self.iter_caps() { + if cap.is_eth() { + return Ok(cap) + } + } + Err(P2PStreamError::CapabilityNotShared) + } + + /// Returns the negotiated eth version if it is shared. + pub fn eth_version(&self) -> Result { + self.eth().map(|cap| cap.version()) + } +} + /// Determines the offsets for each shared capability between the input list of peer /// capabilities and the input list of locally supported capabilities. /// -/// Currently only `eth` versions 66 and 67 are supported. +/// Currently only `eth` versions 66, 67, 68 are supported. /// Additionally, the `p2p` capability version 5 is supported, but is /// expected _not_ to be in neither `local_capabilities` or `peer_capabilities`. pub fn set_capability_offsets( local_capabilities: Vec, peer_capabilities: Vec, -) -> Result { +) -> Result, P2PStreamError> { // find intersection of capabilities let our_capabilities = local_capabilities.into_iter().collect::>(); @@ -597,8 +655,7 @@ pub fn set_capability_offsets( // This would cause the peers to send messages with the wrong message id, which is usually a // protocol violation. // - // The `Ord` implementation for `SmolStr` (used here) currently delegates to rust's `Ord` - // implementation for `str`, which also orders strings lexicographically. + // The `Ord` implementation for `str` orders strings lexicographically. let mut shared_capability_names = BTreeSet::new(); // find highest shared version of each shared capability @@ -640,6 +697,8 @@ pub fn set_capability_offsets( SharedCapability::UnknownCapability { .. } => { // Capabilities which are not shared are ignored debug!("unknown capability: name={:?}, version={}", name, version,); + + // TODO(mattsse): track shared caps } SharedCapability::Eth { .. } => { // increment the offset if the capability is known @@ -650,17 +709,11 @@ pub fn set_capability_offsets( } } - // TODO: support multiple capabilities - we would need a new Stream type to go on top of - // `P2PStream` containing its capability. `P2PStream` would still send pings and handle - // pongs, but instead contain a map of capabilities to their respective stream / channel. - // Each channel would be responsible for containing the offset for that stream and would - // only increment / decrement message IDs. - // NOTE: since the `P2PStream` currently only supports one capability, we set the - // capability with the lowest offset. - Ok(shared_with_offsets - .first() - .ok_or(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities))? - .clone()) + if shared_with_offsets.is_empty() { + return Err(P2PStreamError::HandshakeError(P2PHandshakeError::NoSharedCapabilities)) + } + + Ok(shared_with_offsets) } /// This represents only the reserved `p2p` subprotocol messages. @@ -928,7 +981,7 @@ mod tests { // ensure that the two share a single capability, eth67 assert_eq!( - p2p_stream.shared_capability, + *p2p_stream.shared_capabilities.iter_caps().next().unwrap(), SharedCapability::Eth { version: EthVersion::Eth67, offset: MAX_RESERVED_MESSAGE_ID + 1 @@ -946,7 +999,7 @@ mod tests { // ensure that the two share a single capability, eth67 assert_eq!( - p2p_stream.shared_capability, + *p2p_stream.shared_capabilities.iter_caps().next().unwrap(), SharedCapability::Eth { version: EthVersion::Eth67, offset: MAX_RESERVED_MESSAGE_ID + 1 @@ -1019,7 +1072,7 @@ mod tests { let peer_capabilities: Vec = vec![EthVersion::Eth66.into()]; let shared_capability = - set_capability_offsets(local_capabilities, peer_capabilities).unwrap(); + set_capability_offsets(local_capabilities, peer_capabilities).unwrap()[0].clone(); assert_eq!( shared_capability, diff --git a/crates/net/eth-wire/src/types/message.rs b/crates/net/eth-wire/src/types/message.rs index 17efbeb2dfbe..e405f727a295 100644 --- a/crates/net/eth-wire/src/types/message.rs +++ b/crates/net/eth-wire/src/types/message.rs @@ -12,6 +12,10 @@ use std::{fmt::Debug, sync::Arc}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message. +// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50 +pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024; + /// An `eth` protocol message, containing a message ID and payload. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/crates/net/eth-wire/src/types/version.rs b/crates/net/eth-wire/src/types/version.rs index cb81a1c3bff2..c7201cf7e0bb 100644 --- a/crates/net/eth-wire/src/types/version.rs +++ b/crates/net/eth-wire/src/types/version.rs @@ -2,10 +2,9 @@ use crate::capability::Capability; use std::str::FromStr; -use thiserror::Error; /// Error thrown when failed to parse a valid [`EthVersion`]. -#[derive(Debug, Clone, PartialEq, Eq, Error)] +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[error("Unknown eth protocol version: {0}")] pub struct ParseVersionError(String); diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index 73a85376292c..ae8b485cad2a 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -154,7 +154,8 @@ impl SessionError for EthStreamError { )) | P2PStreamError::UnknownReservedMessageId(_) | P2PStreamError::EmptyProtocolMessage | - P2PStreamError::ParseVersionError(_) | + P2PStreamError::ParseSharedCapability(_) | + P2PStreamError::CapabilityNotShared | P2PStreamError::Disconnected(DisconnectReason::UselessPeer) | P2PStreamError::Disconnected( DisconnectReason::IncompatibleP2PProtocolVersion diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 008227079ab7..dcf8d1c748bd 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -46,6 +46,7 @@ pub use handle::{ ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, SessionCommand, }; + pub use reth_network_api::{Direction, PeerInfo}; /// Internal identifier for active sessions. @@ -912,10 +913,23 @@ async fn authenticate_stream( } }; + // Ensure we negotiated eth protocol + let version = match p2p_stream.shared_capabilities().eth_version() { + Ok(version) => version, + Err(err) => { + return PendingSessionEvent::Disconnected { + remote_addr, + session_id, + direction, + error: Some(err.into()), + } + } + }; + // if the hello handshake was successful we can try status handshake // // Before trying status handshake, set up the version to shared_capability - let status = Status { version: p2p_stream.shared_capability().version(), ..status }; + let status = Status { version, ..status }; let eth_unauthed = UnauthedEthStream::new(p2p_stream); let (eth_stream, their_status) = match eth_unauthed.handshake(status, fork_filter).await { Ok(stream_res) => stream_res, diff --git a/examples/manual-p2p/src/main.rs b/examples/manual-p2p/src/main.rs index 23bf478540db..d8b98875d204 100644 --- a/examples/manual-p2p/src/main.rs +++ b/examples/manual-p2p/src/main.rs @@ -105,7 +105,7 @@ async fn handshake_eth(p2p_stream: AuthedP2PStream) -> eyre::Result<(AuthedEthSt .forkid(Hardfork::Shanghai.fork_id(&MAINNET).unwrap()) .build(); - let status = Status { version: p2p_stream.shared_capability().version(), ..status }; + let status = Status { version: p2p_stream.shared_capabilities().eth()?.version(), ..status }; let eth_unauthed = UnauthedEthStream::new(p2p_stream); Ok(eth_unauthed.handshake(status, fork_filter).await?) }