From 7e4e8663cba593646d4b193bfe433c451355b5e3 Mon Sep 17 00:00:00 2001 From: wiesche Date: Fri, 19 Jun 2026 08:20:29 +0200 Subject: [PATCH 1/4] Add PIHD header segment protocol --- p2p/src/codec.rs | 2 + p2p/src/lib.rs | 1 + p2p/src/msg.rs | 47 +++++++ p2p/src/peer.rs | 26 +++- p2p/src/peers.rs | 31 ++++- p2p/src/protocol.rs | 118 ++++++++++++++++- p2p/src/serv.rs | 20 ++- p2p/src/types.rs | 29 +++++ p2p/tests/capabilities.rs | 2 + p2p/tests/ser_deser.rs | 8 +- servers/src/common/adapters.rs | 225 +++++++++++++++++++++++++++++---- 11 files changed, 475 insertions(+), 34 deletions(-) diff --git a/p2p/src/codec.rs b/p2p/src/codec.rs index 53278d777e..7891e6f852 100644 --- a/p2p/src/codec.rs +++ b/p2p/src/codec.rs @@ -273,6 +273,8 @@ fn decode_message( Type::RangeProofSegment => Message::RangeProofSegment(msg.body()?), Type::GetKernelSegment => Message::GetKernelSegment(msg.body()?), Type::KernelSegment => Message::KernelSegment(msg.body()?), + Type::GetHeaderSegment => Message::GetHeaderSegment(msg.body()?), + Type::HeaderSegment => Message::HeaderSegment(msg.body()?), Type::Error | Type::Hand | Type::Shake | Type::Headers => { return Err(Error::UnexpectedMessage) } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index a71e861be6..e0d8bc10cf 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -55,4 +55,5 @@ pub use crate::store::{PeerData, State}; pub use crate::types::{ Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, Seeding, TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, + PIHD_HEADER_SEGMENT_HEIGHT, }; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 69c448e927..051ec5f993 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -94,6 +94,8 @@ enum_from_primitive! { RangeProofSegment = 26, GetKernelSegment = 27, KernelSegment = 28, + GetHeaderSegment = 29, + HeaderSegment = 30, } } @@ -139,6 +141,8 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::RangeProofSegment => 2 * max_block_size(), Type::GetKernelSegment => 41, Type::KernelSegment => 2 * max_block_size(), + Type::GetHeaderSegment => 9, + Type::HeaderSegment => 11 + 365 * MAX_BLOCK_HEADERS as u64, } } @@ -649,6 +653,45 @@ impl Writeable for Headers { } } +/// Serializable wrapper for a deterministic header segment. +pub struct HeaderSegment { + pub identifier: SegmentIdentifier, + pub headers: Vec, +} + +impl Readable for HeaderSegment { + fn read(reader: &mut R) -> Result { + let identifier = SegmentIdentifier::read(reader)?; + let len = reader.read_u16()?; + if len > (MAX_BLOCK_HEADERS as u16) { + return Err(ser::Error::TooLargeReadErr); + } + let mut headers = Vec::with_capacity(len as usize); + for _ in 0..len { + let header: UntrustedBlockHeader = Readable::read(reader)?; + headers.push(header.into()); + } + Ok(HeaderSegment { + identifier, + headers, + }) + } +} + +impl Writeable for HeaderSegment { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + if self.headers.len() > MAX_BLOCK_HEADERS as usize { + return Err(ser::Error::TooLargeReadErr); + } + self.identifier.write(writer)?; + writer.write_u16(self.headers.len() as u16)?; + for h in &self.headers { + h.write(writer)?; + } + Ok(()) + } +} + pub struct Ping { /// total difficulty accumulated by the sender, used to check whether sync /// may be needed @@ -926,6 +969,8 @@ pub enum Message { RangeProofSegment(SegmentResponse), GetKernelSegment(SegmentRequest), KernelSegment(SegmentResponse), + GetHeaderSegment(SegmentIdentifier), + HeaderSegment(HeaderSegment), } /// We receive 512 headers from a peer. @@ -970,6 +1015,8 @@ impl fmt::Display for Message { Message::RangeProofSegment(_) => write!(f, "range proof segment"), Message::GetKernelSegment(_) => write!(f, "get kernel segment"), Message::KernelSegment(_) => write!(f, "kernel segment"), + Message::GetHeaderSegment(_) => write!(f, "get header segment"), + Message::HeaderSegment(_) => write!(f, "header segment"), } } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index c35047e3de..91f8af4318 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -35,8 +35,8 @@ use crate::msg::{ }; use crate::protocol::Protocol; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, - TxHashSetRead, + Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr, + PeerInfo, ReasonForBan, TxHashSetRead, }; use crate::util::secp::pedersen::RangeProof; use chrono::prelude::{DateTime, Utc}; @@ -328,6 +328,11 @@ impl Peer { self.send(&Locator { hashes: locator }, msg::Type::GetHeaders) } + /// Sends a request for a deterministic header segment. + pub fn send_header_segment_request(&self, identifier: SegmentIdentifier) -> Result<(), Error> { + self.send(&identifier, msg::Type::GetHeaderSegment) + } + pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> { debug!( "Requesting tx (kernel hash) {} from peer {}.", @@ -569,6 +574,14 @@ impl ChainAdapter for TrackingAdapter { self.adapter.locate_headers(locator) } + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error> { + self.adapter.locate_header_segment(id, peer_info) + } + fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option { self.adapter.get_block(h, peer_info) } @@ -685,6 +698,15 @@ impl ChainAdapter for TrackingAdapter { self.adapter .receive_kernel_segment(block_hash, segment, peer_info) } + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result { + self.adapter.receive_header_segment(id, headers, peer_info) + } } impl NetAdapter for TrackingAdapter { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 581988e3d0..cf5759b3fe 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -32,8 +32,8 @@ use crate::msg::PeerAddrs; use crate::peer::Peer; use crate::store::{PeerData, PeerStore, State}; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, - TxHashSetRead, MAX_PEER_ADDRS, + Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr, + PeerInfo, ReasonForBan, TxHashSetRead, MAX_PEER_ADDRS, }; use crate::util::secp::pedersen::RangeProof; use chrono::prelude::*; @@ -662,6 +662,14 @@ impl ChainAdapter for Peers { self.adapter.locate_headers(hs) } + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error> { + self.adapter.locate_header_segment(id, peer_info) + } + fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option { self.adapter.get_block(h, peer_info) } @@ -820,6 +828,25 @@ impl ChainAdapter for Peers { Ok(true) } } + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result { + match self + .adapter + .receive_header_segment(id, headers, peer_info)? + { + HeaderSegmentAcceptance::Accepted => Ok(HeaderSegmentAcceptance::Accepted), + HeaderSegmentAcceptance::Ban => { + self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader) + .map_err(|e| chain::Error::Other(format!("ban peer error: {:?}", e)))?; + Ok(HeaderSegmentAcceptance::Ban) + } + } + } } impl NetAdapter for Peers { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 408453e528..0bb7ef8316 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -17,8 +17,9 @@ use crate::conn::MessageHandler; use crate::core::core::{hash::Hashed, CompactBlock}; use crate::msg::{ - Consumed, Headers, Message, Msg, OutputBitmapSegmentResponse, OutputSegmentResponse, PeerAddrs, - Pong, SegmentRequest, SegmentResponse, TxHashSetArchive, Type, + Consumed, HeaderSegment, Headers, Message, Msg, OutputBitmapSegmentResponse, + OutputSegmentResponse, PeerAddrs, Pong, SegmentRequest, SegmentResponse, TxHashSetArchive, + Type, }; use crate::types::{AttachmentMeta, Error, NetAdapter, PeerInfo}; use chrono::prelude::Utc; @@ -192,6 +193,28 @@ impl MessageHandler for Protocol { )?) } + Message::GetHeaderSegment(identifier) => { + if !self + .peer_info + .capabilities + .contains(crate::types::Capabilities::PIHD_HIST) + { + return Ok(Consumed::None); + } + if let Some(headers) = adapter.locate_header_segment(identifier, &self.peer_info)? { + Consumed::Response(Msg::new( + Type::HeaderSegment, + HeaderSegment { + identifier, + headers, + }, + self.peer_info.version, + )?) + } else { + Consumed::None + } + } + // "header first" block propagation - if we have not yet seen this block // we can go request it from some of our peers Message::Header(header) => { @@ -204,6 +227,15 @@ impl MessageHandler for Protocol { Consumed::None } + Message::HeaderSegment(segment) => { + adapter.receive_header_segment( + segment.identifier, + &segment.headers, + &self.peer_info, + )?; + Consumed::None + } + Message::GetPeerAddrs(get_peers) => { let peers = adapter.find_peer_addrs(get_peers.capabilities); Consumed::Response(Msg::new( @@ -431,3 +463,85 @@ impl MessageHandler for Protocol { Ok(consumed) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::conn::Tracker; + use crate::core::core::SegmentIdentifier; + use crate::core::global; + use crate::core::pow::Difficulty; + use crate::core::ser::ProtocolVersion; + use crate::msg::{read_message, write_message}; + use crate::serv::DummyAdapter; + use crate::types::{ + Capabilities, Direction, PeerAddr, PeerLiveInfo, PIHD_HEADER_SEGMENT_HEIGHT, + }; + use crate::util::RwLock; + use std::io::Cursor; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::atomic::AtomicBool; + + fn test_peer_info(capabilities: Capabilities) -> PeerInfo { + PeerInfo { + capabilities, + user_agent: "test".to_owned(), + version: ProtocolVersion::local(), + addr: PeerAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 10000)), + direction: Direction::Outbound, + live_info: Arc::new(RwLock::new(PeerLiveInfo::new(Difficulty::zero()))), + } + } + + #[test] + fn get_header_segment_returns_header_segment_response() { + global::set_local_chain_type(global::ChainTypes::AutomatedTesting); + let protocol = Protocol::new( + Arc::new(DummyAdapter {}), + test_peer_info(Capabilities::default()), + Arc::new(AtomicBool::new(false)), + ); + let identifier = SegmentIdentifier { + height: PIHD_HEADER_SEGMENT_HEIGHT, + idx: 2, + }; + + let consumed = protocol + .consume(Message::GetHeaderSegment(identifier)) + .expect("get header segment response"); + let response = match consumed { + Consumed::Response(response) => response, + other => panic!("expected response, got {:?}", other), + }; + + let mut bytes = vec![]; + write_message(&mut bytes, &response, Arc::new(Tracker::new())).expect("write response"); + let segment: HeaderSegment = read_message( + &mut Cursor::new(bytes), + ProtocolVersion::local(), + Type::HeaderSegment, + ) + .expect("read header segment response"); + + assert_eq!(segment.identifier, identifier); + assert!(segment.headers.is_empty()); + } + + #[test] + fn get_header_segment_requires_pihd_capability() { + let protocol = Protocol::new( + Arc::new(DummyAdapter {}), + test_peer_info(Capabilities::HEADER_HIST), + Arc::new(AtomicBool::new(false)), + ); + + let consumed = protocol + .consume(Message::GetHeaderSegment(SegmentIdentifier { + height: PIHD_HEADER_SEGMENT_HEIGHT, + idx: 0, + })) + .expect("get header segment handling"); + + assert!(matches!(consumed, Consumed::None)); + } +} diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 2955187bac..57ac9631c5 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -32,8 +32,8 @@ use crate::peer::Peer; use crate::peers::Peers; use crate::store::PeerStore; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, - TxHashSetRead, + Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr, + PeerInfo, ReasonForBan, TxHashSetRead, }; use crate::util::secp::pedersen::RangeProof; use crate::util::StopState; @@ -362,6 +362,13 @@ impl ChainAdapter for DummyAdapter { fn locate_headers(&self, _: &[Hash]) -> Result, chain::Error> { Ok(vec![]) } + fn locate_header_segment( + &self, + _: SegmentIdentifier, + _: &PeerInfo, + ) -> Result>, chain::Error> { + Ok(Some(vec![])) + } fn get_block(&self, _: Hash, _: &PeerInfo) -> Option { None } @@ -472,6 +479,15 @@ impl ChainAdapter for DummyAdapter { ) -> Result { unimplemented!() } + + fn receive_header_segment( + &self, + _id: SegmentIdentifier, + _headers: &[core::BlockHeader], + _peer_info: &PeerInfo, + ) -> Result { + Ok(HeaderSegmentAcceptance::Accepted) + } } impl NetAdapter for DummyAdapter { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 07f8df74d2..2c1a3f2590 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -40,6 +40,9 @@ use crate::util::RwLock; /// Maximum number of block headers a peer should ever send pub const MAX_BLOCK_HEADERS: u32 = 512; +/// Header segment height used for PIHD header segment requests. +pub const PIHD_HEADER_SEGMENT_HEIGHT: u8 = 9; + /// Maximum number of block bodies a peer should ever ask for and send #[allow(dead_code)] pub const MAX_BLOCK_BODIES: u32 = 16; @@ -396,6 +399,8 @@ bitflags! { const BLOCK_HIST = 0b0010_0000; /// As above, with crucial serialization fix #3705 applied const PIBD_HIST_1 = 0b0100_0000; + /// Can provide deterministic historical header segments. + const PIHD_HIST = 0b1000_0000; } } @@ -408,6 +413,7 @@ impl Default for Capabilities { | Capabilities::TX_KERNEL_HASH | Capabilities::PIBD_HIST | Capabilities::PIBD_HIST_1 + | Capabilities::PIHD_HIST } } @@ -547,6 +553,15 @@ pub struct TxHashSetRead { pub reader: File, } +/// Result of processing a PIHD header segment response. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HeaderSegmentAcceptance { + /// Segment was accepted or intentionally ignored. + Accepted, + /// Segment is malformed or invalid enough to justify banning the sender. + Ban, +} + /// Bridge between the networking layer and the rest of the system. Handles the /// forwarding or querying of blocks and transactions from the network among /// other things. @@ -606,6 +621,13 @@ pub trait ChainAdapter: Sync + Send { /// immediately. fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error>; + /// Finds a deterministic header segment based on the provided segment identifier. + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error>; + /// Gets a full block by its hash. /// Converts block to v2 compatibility if necessary (based on peer protocol version). fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option; @@ -703,6 +725,13 @@ pub trait ChainAdapter: Sync + Send { segment: Segment, peer_info: &PeerInfo, ) -> Result; + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result; } /// Additional methods required by the protocol that don't need to be diff --git a/p2p/tests/capabilities.rs b/p2p/tests/capabilities.rs index 5b20e099b4..c6a8bf74c1 100644 --- a/p2p/tests/capabilities.rs +++ b/p2p/tests/capabilities.rs @@ -43,6 +43,7 @@ fn default_capabilities() { assert!(x.contains(Capabilities::TX_KERNEL_HASH)); assert!(x.contains(Capabilities::PIBD_HIST)); assert!(x.contains(Capabilities::PIBD_HIST_1)); + assert!(x.contains(Capabilities::PIHD_HIST)); assert_eq!( x, @@ -52,5 +53,6 @@ fn default_capabilities() { | Capabilities::TX_KERNEL_HASH | Capabilities::PIBD_HIST | Capabilities::PIBD_HIST_1 + | Capabilities::PIHD_HIST ); } diff --git a/p2p/tests/ser_deser.rs b/p2p/tests/ser_deser.rs index 1d50aff6a4..613b183071 100644 --- a/p2p/tests/ser_deser.rs +++ b/p2p/tests/ser_deser.rs @@ -50,20 +50,20 @@ fn test_capabilities() { ); assert_eq!( p2p::types::Capabilities::from_bits_truncate(0b10000000 as u32), - p2p::types::Capabilities::UNKNOWN + p2p::types::Capabilities::PIHD_HIST ); assert_eq!( expected, - p2p::types::Capabilities::from_bits_truncate(0b1011111 as u32), + p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32), ); assert_eq!( expected, - p2p::types::Capabilities::from_bits_truncate(0b01011111 as u32), + p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32), ); - assert!(p2p::types::Capabilities::from_bits_truncate(0b01011111 as u32).contains(expected)); + assert!(p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32).contains(expected)); assert!( p2p::types::Capabilities::from_bits_truncate(0b00101111 as u32) diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 131b76b489..9a28620a48 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -16,7 +16,9 @@ //! events to consumers of those events. use crate::util::RwLock; +use std::collections::HashMap; use std::fs::File; +use std::net::SocketAddr; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::path::PathBuf; use std::sync::{mpsc, Arc, Weak}; @@ -41,7 +43,7 @@ use crate::core::pow::Difficulty; use crate::core::ser::ProtocolVersion; use crate::core::{core, global}; use crate::p2p; -use crate::p2p::types::PeerInfo; +use crate::p2p::types::{HeaderSegmentAcceptance, PeerInfo}; use crate::pool::{self, BlockChain, PoolAdapter}; use crate::util::secp::pedersen::RangeProof; use crate::util::OneTime; @@ -56,6 +58,17 @@ const BITMAP_SEGMENT_HEIGHT_RANGE: Range = 9..14; const OUTPUT_SEGMENT_HEIGHT_RANGE: Range = 11..16; const RANGEPROOF_SEGMENT_HEIGHT_RANGE: Range = 7..12; const WORKER_CHANNEL_BUFFER_SIZE: usize = 64; +const MAX_CACHED_HEADER_BATCHES: usize = 16; +const HEADER_SEGMENT_REQUEST_WINDOW_SECS: i64 = 60; +const MAX_HEADER_SEGMENT_REQUESTS_PER_WINDOW: usize = 1000; +const HEADER_BATCH_CACHE_LOOKAHEAD: u64 = + MAX_CACHED_HEADER_BATCHES as u64 * p2p::MAX_BLOCK_HEADERS as u64; + +#[derive(Clone)] +struct HeaderBatch { + headers: Vec, + peer_info: PeerInfo, +} /// Implementation of the NetAdapter for the . Gets notified when new /// blocks and transactions are received and forwards to the chain and pool @@ -71,6 +84,8 @@ where peers: OneTime>, config: ServerConfig, hooks: Vec>, + header_batch_cache: RwLock>, + header_segment_requests: RwLock, usize)>>, tx: mpsc::SyncSender, } @@ -335,27 +350,7 @@ where } }; - match self - .chain() - .sync_block_headers(bhs, sync_head, chain::Options::SYNC) - { - Ok(sync_head) => { - // If we have an updated sync_head after processing this batch of headers - // then update our sync_state so we can request relevant headers in the next batch. - if let Some(sync_head) = sync_head { - self.sync_state.update_header_sync(sync_head); - } - Ok(true) - } - Err(e) => { - debug!("Block headers refused by chain: {:?}", e); - if e.is_bad_data() { - Ok(false) - } else { - Err(e) - } - } - } + self.cache_and_process_header_batch(bhs, peer_info, sync_head) } fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error> { @@ -393,6 +388,60 @@ where Ok(headers) } + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error> { + if !peer_info + .capabilities + .contains(p2p::Capabilities::PIHD_HIST) + || id.height != p2p::PIHD_HEADER_SEGMENT_HEIGHT + { + return Ok(None); + } + if !self.header_segment_request_allowed(peer_info.addr.0) { + warn!( + "throttling PIHD header segment request {:?} from {}", + id, peer_info.addr + ); + return Ok(None); + } + + let segment_capacity = id.segment_capacity(); + let start_height = match id + .idx + .checked_mul(segment_capacity) + .and_then(|height| height.checked_add(1)) + { + Some(height) => height, + None => return Ok(None), + }; + let max_height = self.chain().header_head()?.height; + let end_height = match start_height + .checked_add(segment_capacity) + .and_then(|height| height.checked_sub(1)) + { + Some(height) => std::cmp::min(height, max_height), + None => max_height, + }; + if start_height > end_height { + return Ok(Some(vec![])); + } + + let header_pmmr = self.chain().header_pmmr(); + let header_pmmr = header_pmmr.read(); + let mut headers = vec![]; + for h in start_height..=end_height { + if let Ok(hash) = header_pmmr.get_header_hash_by_height(h) { + headers.push(self.chain().get_block_header(&hash)?); + } else { + break; + } + } + Ok(Some(headers)) + } + /// Gets a full block by its hash. /// We only support v3 blocks since HF4. /// If a peer is requesting a block and only appears to support v2 @@ -610,6 +659,21 @@ where ) -> Result { self.queue_pibd_segment(PIBDSegment::Kernel(block_hash, segment), peer_info) } + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result { + debug!( + "ignoring PIHD header segment {:?} with {} headers from {}", + id, + headers.len(), + peer_info.addr + ); + Ok(HeaderSegmentAcceptance::Accepted) + } } impl NetToChainAdapter @@ -633,6 +697,8 @@ where peers: OneTime::new(), config, hooks, + header_batch_cache: RwLock::new(vec![]), + header_segment_requests: RwLock::new(HashMap::new()), tx, }; adapter.spawn_net_adapter_worker(Arc::downgrade(&chain), rx); @@ -772,6 +838,121 @@ where } } + fn header_segment_request_allowed(&self, peer_addr: SocketAddr) -> bool { + let now = Utc::now(); + let cutoff = now - Duration::seconds(HEADER_SEGMENT_REQUEST_WINDOW_SECS); + let mut requests = self.header_segment_requests.write(); + requests.retain(|_, (window_start, _)| *window_start > cutoff); + let entry = requests.entry(peer_addr).or_insert((now, 0)); + if now > entry.0 + Duration::seconds(HEADER_SEGMENT_REQUEST_WINDOW_SECS) { + *entry = (now, 0); + } + if entry.1 >= MAX_HEADER_SEGMENT_REQUESTS_PER_WINDOW { + return false; + } + entry.1 += 1; + true + } + + fn cache_and_process_header_batch( + &self, + headers: &[BlockHeader], + peer_info: &PeerInfo, + sync_head: chain::Tip, + ) -> Result { + let headers = headers + .iter() + .skip_while(|h| h.height <= sync_head.height) + .cloned() + .collect::>(); + if headers.is_empty() { + return Ok(true); + } + if headers + .first() + .map(|h| { + h.height + > sync_head + .height + .saturating_add(HEADER_BATCH_CACHE_LOOKAHEAD) + }) + .unwrap_or(false) + { + debug!( + "ignoring far-future header batch starting at height {} while sync head is {}", + headers[0].height, sync_head.height + ); + return Ok(true); + } + + { + let mut cache = self.header_batch_cache.write(); + let first = headers.first().map(|h| h.hash()); + let last = headers.last().map(|h| h.hash()); + if !cache.iter().any(|b| { + b.headers.first().map(|h| h.hash()) == first + || b.headers.last().map(|h| h.hash()) == last + }) { + if cache.len() >= MAX_CACHED_HEADER_BATCHES { + cache.remove(0); + } + cache.push(HeaderBatch { + headers, + peer_info: peer_info.clone(), + }); + } + } + + self.process_ready_header_batches(peer_info) + } + + fn process_ready_header_batches(&self, current_peer: &PeerInfo) -> Result { + loop { + let sync_head = match self.sync_state.status() { + SyncStatus::HeaderSync { sync_head, .. } => sync_head, + _ => return Ok(true), + }; + let batch = { + let mut cache = self.header_batch_cache.write(); + cache.sort_by_key(|b| b.headers.first().map(|h| h.height).unwrap_or(u64::MAX)); + let pos = cache.iter().position(|b| { + b.headers.first().map(|h| h.height) == Some(sync_head.height + 1) + }); + match pos { + Some(pos) => cache.remove(pos), + None => return Ok(true), + } + }; + + match self + .chain() + .sync_block_headers(&batch.headers, sync_head, chain::Options::SYNC) + { + Ok(sync_head) => { + if let Some(sync_head) = sync_head { + self.sync_state.update_header_sync(sync_head); + } + } + Err(e) => { + debug!("Block headers refused by chain: {:?}", e); + if e.is_bad_data() { + if batch.peer_info.addr == current_peer.addr { + return Ok(false); + } + if let Err(e) = self.peers().ban_peer( + batch.peer_info.addr, + p2p::types::ReasonForBan::BadBlockHeader, + ) { + error!("failed to ban peer {}: {:?}", batch.peer_info.addr, e); + } + } else { + return Err(e); + } + } + } + } + } + // Find the first locator hash that refers to a known header on our main chain. fn find_common_header(&self, locator: &[Hash]) -> Option { let header_pmmr = self.chain().header_pmmr(); From 206b21379fc3f07596e3682b5fa270ffc5044e5d Mon Sep 17 00:00:00 2001 From: wiesche Date: Fri, 26 Jun 2026 18:47:59 +0200 Subject: [PATCH 2/4] remove header sync cache from PIHD protocol --- p2p/src/peer.rs | 5 -- p2p/tests/ser_deser.rs | 5 -- servers/src/common/adapters.rs | 134 ++++++--------------------------- 3 files changed, 22 insertions(+), 122 deletions(-) diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 91f8af4318..e26024ffa7 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -328,11 +328,6 @@ impl Peer { self.send(&Locator { hashes: locator }, msg::Type::GetHeaders) } - /// Sends a request for a deterministic header segment. - pub fn send_header_segment_request(&self, identifier: SegmentIdentifier) -> Result<(), Error> { - self.send(&identifier, msg::Type::GetHeaderSegment) - } - pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> { debug!( "Requesting tx (kernel hash) {} from peer {}.", diff --git a/p2p/tests/ser_deser.rs b/p2p/tests/ser_deser.rs index 613b183071..f2c0a548c4 100644 --- a/p2p/tests/ser_deser.rs +++ b/p2p/tests/ser_deser.rs @@ -58,11 +58,6 @@ fn test_capabilities() { p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32), ); - assert_eq!( - expected, - p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32), - ); - assert!(p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32).contains(expected)); assert!( diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 9a28620a48..5bdcb5301b 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -58,17 +58,8 @@ const BITMAP_SEGMENT_HEIGHT_RANGE: Range = 9..14; const OUTPUT_SEGMENT_HEIGHT_RANGE: Range = 11..16; const RANGEPROOF_SEGMENT_HEIGHT_RANGE: Range = 7..12; const WORKER_CHANNEL_BUFFER_SIZE: usize = 64; -const MAX_CACHED_HEADER_BATCHES: usize = 16; const HEADER_SEGMENT_REQUEST_WINDOW_SECS: i64 = 60; -const MAX_HEADER_SEGMENT_REQUESTS_PER_WINDOW: usize = 1000; -const HEADER_BATCH_CACHE_LOOKAHEAD: u64 = - MAX_CACHED_HEADER_BATCHES as u64 * p2p::MAX_BLOCK_HEADERS as u64; - -#[derive(Clone)] -struct HeaderBatch { - headers: Vec, - peer_info: PeerInfo, -} +const MAX_HEADER_SEGMENT_REQUESTS_PER_WINDOW: usize = 120; /// Implementation of the NetAdapter for the . Gets notified when new /// blocks and transactions are received and forwards to the chain and pool @@ -84,7 +75,6 @@ where peers: OneTime>, config: ServerConfig, hooks: Vec>, - header_batch_cache: RwLock>, header_segment_requests: RwLock, usize)>>, tx: mpsc::SyncSender, } @@ -350,7 +340,27 @@ where } }; - self.cache_and_process_header_batch(bhs, peer_info, sync_head) + match self + .chain() + .sync_block_headers(bhs, sync_head, chain::Options::SYNC) + { + Ok(sync_head) => { + // If we have an updated sync_head after processing this batch of headers + // then update our sync_state so we can request relevant headers in the next batch. + if let Some(sync_head) = sync_head { + self.sync_state.update_header_sync(sync_head); + } + Ok(true) + } + Err(e) => { + debug!("Block headers refused by chain: {:?}", e); + if e.is_bad_data() { + Ok(false) + } else { + Err(e) + } + } + } } fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error> { @@ -697,7 +707,6 @@ where peers: OneTime::new(), config, hooks, - header_batch_cache: RwLock::new(vec![]), header_segment_requests: RwLock::new(HashMap::new()), tx, }; @@ -854,105 +863,6 @@ where true } - fn cache_and_process_header_batch( - &self, - headers: &[BlockHeader], - peer_info: &PeerInfo, - sync_head: chain::Tip, - ) -> Result { - let headers = headers - .iter() - .skip_while(|h| h.height <= sync_head.height) - .cloned() - .collect::>(); - if headers.is_empty() { - return Ok(true); - } - if headers - .first() - .map(|h| { - h.height - > sync_head - .height - .saturating_add(HEADER_BATCH_CACHE_LOOKAHEAD) - }) - .unwrap_or(false) - { - debug!( - "ignoring far-future header batch starting at height {} while sync head is {}", - headers[0].height, sync_head.height - ); - return Ok(true); - } - - { - let mut cache = self.header_batch_cache.write(); - let first = headers.first().map(|h| h.hash()); - let last = headers.last().map(|h| h.hash()); - if !cache.iter().any(|b| { - b.headers.first().map(|h| h.hash()) == first - || b.headers.last().map(|h| h.hash()) == last - }) { - if cache.len() >= MAX_CACHED_HEADER_BATCHES { - cache.remove(0); - } - cache.push(HeaderBatch { - headers, - peer_info: peer_info.clone(), - }); - } - } - - self.process_ready_header_batches(peer_info) - } - - fn process_ready_header_batches(&self, current_peer: &PeerInfo) -> Result { - loop { - let sync_head = match self.sync_state.status() { - SyncStatus::HeaderSync { sync_head, .. } => sync_head, - _ => return Ok(true), - }; - let batch = { - let mut cache = self.header_batch_cache.write(); - cache.sort_by_key(|b| b.headers.first().map(|h| h.height).unwrap_or(u64::MAX)); - let pos = cache.iter().position(|b| { - b.headers.first().map(|h| h.height) == Some(sync_head.height + 1) - }); - match pos { - Some(pos) => cache.remove(pos), - None => return Ok(true), - } - }; - - match self - .chain() - .sync_block_headers(&batch.headers, sync_head, chain::Options::SYNC) - { - Ok(sync_head) => { - if let Some(sync_head) = sync_head { - self.sync_state.update_header_sync(sync_head); - } - } - Err(e) => { - debug!("Block headers refused by chain: {:?}", e); - if e.is_bad_data() { - if batch.peer_info.addr == current_peer.addr { - return Ok(false); - } - if let Err(e) = self.peers().ban_peer( - batch.peer_info.addr, - p2p::types::ReasonForBan::BadBlockHeader, - ) { - error!("failed to ban peer {}: {:?}", batch.peer_info.addr, e); - } - } else { - return Err(e); - } - } - } - } - } - // Find the first locator hash that refers to a known header on our main chain. fn find_common_header(&self, locator: &[Hash]) -> Option { let header_pmmr = self.chain().header_pmmr(); From 6f9aabb4f58b5e1d03174d4faf32cbfed15c2179 Mon Sep 17 00:00:00 2001 From: wiesche Date: Fri, 26 Jun 2026 19:21:56 +0200 Subject: [PATCH 3/4] add header segment length test --- p2p/tests/ser_deser.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/p2p/tests/ser_deser.rs b/p2p/tests/ser_deser.rs index f2c0a548c4..e07c642419 100644 --- a/p2p/tests/ser_deser.rs +++ b/p2p/tests/ser_deser.rs @@ -14,6 +14,8 @@ use grin_p2p as p2p; +use grin_core::core::SegmentIdentifier; +use grin_core::ser::{self, ProtocolVersion}; use num::FromPrimitive; // Test that Healthy == 0. @@ -65,3 +67,31 @@ fn test_capabilities() { .contains(p2p::types::Capabilities::TX_KERNEL_HASH) ); } + +#[test] +fn test_header_segment_limit() { + let mut bytes = vec![]; + ser::serialize( + &mut bytes, + ProtocolVersion::local(), + &SegmentIdentifier { + height: p2p::PIHD_HEADER_SEGMENT_HEIGHT, + idx: 0, + }, + ) + .unwrap(); + ser::serialize( + &mut bytes, + ProtocolVersion::local(), + &((p2p::MAX_BLOCK_HEADERS + 1) as u16), + ) + .unwrap(); + + let res: Result = ser::deserialize( + &mut &bytes[..], + ProtocolVersion::local(), + ser::DeserializationMode::default(), + ); + + assert_eq!(res.err(), Some(ser::Error::TooLargeReadErr)); +} From 0b723fc1bb2d7ebb8269da7ab8f37ec205540259 Mon Sep 17 00:00:00 2001 From: wiesche Date: Fri, 26 Jun 2026 23:23:04 +0200 Subject: [PATCH 4/4] fix max header message size --- p2p/src/msg.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 051ec5f993..6d7743c248 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -109,6 +109,14 @@ fn default_max_msg_size() -> u64 { max_block_size() } +// Max serialized header size for the largest proof accepted by Proof::read(). +fn max_header_size() -> u64 { + let header_bytes = 2 + 2 * 8 + 5 * 32 + 32 + 2 * 8; + let pow_bytes = 8 + 4 + 8 + 1; + let proof_bytes = (63 * consensus::PROOFSIZE + 7) / 8; + (header_bytes + pow_bytes + proof_bytes) as u64 +} + // Max msg size for each msg type. fn max_msg_size(msg_type: Type) -> u64 { match msg_type { @@ -120,8 +128,8 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::GetPeerAddrs => 4, Type::PeerAddrs => 4 + (1 + 16 + 2) * MAX_PEER_ADDRS as u64, Type::GetHeaders => 1 + 32 * MAX_LOCATORS as u64, - Type::Header => 365, - Type::Headers => 2 + 365 * MAX_BLOCK_HEADERS as u64, + Type::Header => max_header_size(), + Type::Headers => 2 + max_header_size() * MAX_BLOCK_HEADERS as u64, Type::GetBlock => 32, Type::Block => max_block_size(), Type::GetCompactBlock => 32, @@ -142,7 +150,7 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::GetKernelSegment => 41, Type::KernelSegment => 2 * max_block_size(), Type::GetHeaderSegment => 9, - Type::HeaderSegment => 11 + 365 * MAX_BLOCK_HEADERS as u64, + Type::HeaderSegment => 11 + max_header_size() * MAX_BLOCK_HEADERS as u64, } } @@ -1044,3 +1052,11 @@ impl fmt::Debug for Consumed { } } } + +#[cfg(test)] +mod tests { + #[test] + fn test_max_header_size_limit() { + assert_eq!(super::max_header_size(), 578); + } +}