diff --git a/p2p/src/codec.rs b/p2p/src/codec.rs index 53278d777..7891e6f85 100644 --- a/p2p/src/codec.rs +++ b/p2p/src/codec.rs @@ -273,6 +273,8 @@ fn decode_message( Type::RangeProofSegment => Message::RangeProofSegment(msg.body()?), Type::GetKernelSegment => Message::GetKernelSegment(msg.body()?), Type::KernelSegment => Message::KernelSegment(msg.body()?), + Type::GetHeaderSegment => Message::GetHeaderSegment(msg.body()?), + Type::HeaderSegment => Message::HeaderSegment(msg.body()?), Type::Error | Type::Hand | Type::Shake | Type::Headers => { return Err(Error::UnexpectedMessage) } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index a71e861be..e0d8bc10c 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -55,4 +55,5 @@ pub use crate::store::{PeerData, State}; pub use crate::types::{ Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, Seeding, TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, + PIHD_HEADER_SEGMENT_HEIGHT, }; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 69c448e92..6d7743c24 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -94,6 +94,8 @@ enum_from_primitive! { RangeProofSegment = 26, GetKernelSegment = 27, KernelSegment = 28, + GetHeaderSegment = 29, + HeaderSegment = 30, } } @@ -107,6 +109,14 @@ fn default_max_msg_size() -> u64 { max_block_size() } +// Max serialized header size for the largest proof accepted by Proof::read(). +fn max_header_size() -> u64 { + let header_bytes = 2 + 2 * 8 + 5 * 32 + 32 + 2 * 8; + let pow_bytes = 8 + 4 + 8 + 1; + let proof_bytes = (63 * consensus::PROOFSIZE + 7) / 8; + (header_bytes + pow_bytes + proof_bytes) as u64 +} + // Max msg size for each msg type. fn max_msg_size(msg_type: Type) -> u64 { match msg_type { @@ -118,8 +128,8 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::GetPeerAddrs => 4, Type::PeerAddrs => 4 + (1 + 16 + 2) * MAX_PEER_ADDRS as u64, Type::GetHeaders => 1 + 32 * MAX_LOCATORS as u64, - Type::Header => 365, - Type::Headers => 2 + 365 * MAX_BLOCK_HEADERS as u64, + Type::Header => max_header_size(), + Type::Headers => 2 + max_header_size() * MAX_BLOCK_HEADERS as u64, Type::GetBlock => 32, Type::Block => max_block_size(), Type::GetCompactBlock => 32, @@ -139,6 +149,8 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::RangeProofSegment => 2 * max_block_size(), Type::GetKernelSegment => 41, Type::KernelSegment => 2 * max_block_size(), + Type::GetHeaderSegment => 9, + Type::HeaderSegment => 11 + max_header_size() * MAX_BLOCK_HEADERS as u64, } } @@ -649,6 +661,45 @@ impl Writeable for Headers { } } +/// Serializable wrapper for a deterministic header segment. +pub struct HeaderSegment { + pub identifier: SegmentIdentifier, + pub headers: Vec, +} + +impl Readable for HeaderSegment { + fn read(reader: &mut R) -> Result { + let identifier = SegmentIdentifier::read(reader)?; + let len = reader.read_u16()?; + if len > (MAX_BLOCK_HEADERS as u16) { + return Err(ser::Error::TooLargeReadErr); + } + let mut headers = Vec::with_capacity(len as usize); + for _ in 0..len { + let header: UntrustedBlockHeader = Readable::read(reader)?; + headers.push(header.into()); + } + Ok(HeaderSegment { + identifier, + headers, + }) + } +} + +impl Writeable for HeaderSegment { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + if self.headers.len() > MAX_BLOCK_HEADERS as usize { + return Err(ser::Error::TooLargeReadErr); + } + self.identifier.write(writer)?; + writer.write_u16(self.headers.len() as u16)?; + for h in &self.headers { + h.write(writer)?; + } + Ok(()) + } +} + pub struct Ping { /// total difficulty accumulated by the sender, used to check whether sync /// may be needed @@ -926,6 +977,8 @@ pub enum Message { RangeProofSegment(SegmentResponse), GetKernelSegment(SegmentRequest), KernelSegment(SegmentResponse), + GetHeaderSegment(SegmentIdentifier), + HeaderSegment(HeaderSegment), } /// We receive 512 headers from a peer. @@ -970,6 +1023,8 @@ impl fmt::Display for Message { Message::RangeProofSegment(_) => write!(f, "range proof segment"), Message::GetKernelSegment(_) => write!(f, "get kernel segment"), Message::KernelSegment(_) => write!(f, "kernel segment"), + Message::GetHeaderSegment(_) => write!(f, "get header segment"), + Message::HeaderSegment(_) => write!(f, "header segment"), } } } @@ -997,3 +1052,11 @@ impl fmt::Debug for Consumed { } } } + +#[cfg(test)] +mod tests { + #[test] + fn test_max_header_size_limit() { + assert_eq!(super::max_header_size(), 578); + } +} diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index c35047e3d..e26024ffa 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -35,8 +35,8 @@ use crate::msg::{ }; use crate::protocol::Protocol; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, - TxHashSetRead, + Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr, + PeerInfo, ReasonForBan, TxHashSetRead, }; use crate::util::secp::pedersen::RangeProof; use chrono::prelude::{DateTime, Utc}; @@ -569,6 +569,14 @@ impl ChainAdapter for TrackingAdapter { self.adapter.locate_headers(locator) } + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error> { + self.adapter.locate_header_segment(id, peer_info) + } + fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option { self.adapter.get_block(h, peer_info) } @@ -685,6 +693,15 @@ impl ChainAdapter for TrackingAdapter { self.adapter .receive_kernel_segment(block_hash, segment, peer_info) } + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result { + self.adapter.receive_header_segment(id, headers, peer_info) + } } impl NetAdapter for TrackingAdapter { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 581988e3d..cf5759b3f 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -32,8 +32,8 @@ use crate::msg::PeerAddrs; use crate::peer::Peer; use crate::store::{PeerData, PeerStore, State}; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, - TxHashSetRead, MAX_PEER_ADDRS, + Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr, + PeerInfo, ReasonForBan, TxHashSetRead, MAX_PEER_ADDRS, }; use crate::util::secp::pedersen::RangeProof; use chrono::prelude::*; @@ -662,6 +662,14 @@ impl ChainAdapter for Peers { self.adapter.locate_headers(hs) } + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error> { + self.adapter.locate_header_segment(id, peer_info) + } + fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option { self.adapter.get_block(h, peer_info) } @@ -820,6 +828,25 @@ impl ChainAdapter for Peers { Ok(true) } } + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result { + match self + .adapter + .receive_header_segment(id, headers, peer_info)? + { + HeaderSegmentAcceptance::Accepted => Ok(HeaderSegmentAcceptance::Accepted), + HeaderSegmentAcceptance::Ban => { + self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader) + .map_err(|e| chain::Error::Other(format!("ban peer error: {:?}", e)))?; + Ok(HeaderSegmentAcceptance::Ban) + } + } + } } impl NetAdapter for Peers { diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 408453e52..0bb7ef831 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -17,8 +17,9 @@ use crate::conn::MessageHandler; use crate::core::core::{hash::Hashed, CompactBlock}; use crate::msg::{ - Consumed, Headers, Message, Msg, OutputBitmapSegmentResponse, OutputSegmentResponse, PeerAddrs, - Pong, SegmentRequest, SegmentResponse, TxHashSetArchive, Type, + Consumed, HeaderSegment, Headers, Message, Msg, OutputBitmapSegmentResponse, + OutputSegmentResponse, PeerAddrs, Pong, SegmentRequest, SegmentResponse, TxHashSetArchive, + Type, }; use crate::types::{AttachmentMeta, Error, NetAdapter, PeerInfo}; use chrono::prelude::Utc; @@ -192,6 +193,28 @@ impl MessageHandler for Protocol { )?) } + Message::GetHeaderSegment(identifier) => { + if !self + .peer_info + .capabilities + .contains(crate::types::Capabilities::PIHD_HIST) + { + return Ok(Consumed::None); + } + if let Some(headers) = adapter.locate_header_segment(identifier, &self.peer_info)? { + Consumed::Response(Msg::new( + Type::HeaderSegment, + HeaderSegment { + identifier, + headers, + }, + self.peer_info.version, + )?) + } else { + Consumed::None + } + } + // "header first" block propagation - if we have not yet seen this block // we can go request it from some of our peers Message::Header(header) => { @@ -204,6 +227,15 @@ impl MessageHandler for Protocol { Consumed::None } + Message::HeaderSegment(segment) => { + adapter.receive_header_segment( + segment.identifier, + &segment.headers, + &self.peer_info, + )?; + Consumed::None + } + Message::GetPeerAddrs(get_peers) => { let peers = adapter.find_peer_addrs(get_peers.capabilities); Consumed::Response(Msg::new( @@ -431,3 +463,85 @@ impl MessageHandler for Protocol { Ok(consumed) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::conn::Tracker; + use crate::core::core::SegmentIdentifier; + use crate::core::global; + use crate::core::pow::Difficulty; + use crate::core::ser::ProtocolVersion; + use crate::msg::{read_message, write_message}; + use crate::serv::DummyAdapter; + use crate::types::{ + Capabilities, Direction, PeerAddr, PeerLiveInfo, PIHD_HEADER_SEGMENT_HEIGHT, + }; + use crate::util::RwLock; + use std::io::Cursor; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::atomic::AtomicBool; + + fn test_peer_info(capabilities: Capabilities) -> PeerInfo { + PeerInfo { + capabilities, + user_agent: "test".to_owned(), + version: ProtocolVersion::local(), + addr: PeerAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 10000)), + direction: Direction::Outbound, + live_info: Arc::new(RwLock::new(PeerLiveInfo::new(Difficulty::zero()))), + } + } + + #[test] + fn get_header_segment_returns_header_segment_response() { + global::set_local_chain_type(global::ChainTypes::AutomatedTesting); + let protocol = Protocol::new( + Arc::new(DummyAdapter {}), + test_peer_info(Capabilities::default()), + Arc::new(AtomicBool::new(false)), + ); + let identifier = SegmentIdentifier { + height: PIHD_HEADER_SEGMENT_HEIGHT, + idx: 2, + }; + + let consumed = protocol + .consume(Message::GetHeaderSegment(identifier)) + .expect("get header segment response"); + let response = match consumed { + Consumed::Response(response) => response, + other => panic!("expected response, got {:?}", other), + }; + + let mut bytes = vec![]; + write_message(&mut bytes, &response, Arc::new(Tracker::new())).expect("write response"); + let segment: HeaderSegment = read_message( + &mut Cursor::new(bytes), + ProtocolVersion::local(), + Type::HeaderSegment, + ) + .expect("read header segment response"); + + assert_eq!(segment.identifier, identifier); + assert!(segment.headers.is_empty()); + } + + #[test] + fn get_header_segment_requires_pihd_capability() { + let protocol = Protocol::new( + Arc::new(DummyAdapter {}), + test_peer_info(Capabilities::HEADER_HIST), + Arc::new(AtomicBool::new(false)), + ); + + let consumed = protocol + .consume(Message::GetHeaderSegment(SegmentIdentifier { + height: PIHD_HEADER_SEGMENT_HEIGHT, + idx: 0, + })) + .expect("get header segment handling"); + + assert!(matches!(consumed, Consumed::None)); + } +} diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 2955187ba..57ac9631c 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -32,8 +32,8 @@ use crate::peer::Peer; use crate::peers::Peers; use crate::store::PeerStore; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, - TxHashSetRead, + Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr, + PeerInfo, ReasonForBan, TxHashSetRead, }; use crate::util::secp::pedersen::RangeProof; use crate::util::StopState; @@ -362,6 +362,13 @@ impl ChainAdapter for DummyAdapter { fn locate_headers(&self, _: &[Hash]) -> Result, chain::Error> { Ok(vec![]) } + fn locate_header_segment( + &self, + _: SegmentIdentifier, + _: &PeerInfo, + ) -> Result>, chain::Error> { + Ok(Some(vec![])) + } fn get_block(&self, _: Hash, _: &PeerInfo) -> Option { None } @@ -472,6 +479,15 @@ impl ChainAdapter for DummyAdapter { ) -> Result { unimplemented!() } + + fn receive_header_segment( + &self, + _id: SegmentIdentifier, + _headers: &[core::BlockHeader], + _peer_info: &PeerInfo, + ) -> Result { + Ok(HeaderSegmentAcceptance::Accepted) + } } impl NetAdapter for DummyAdapter { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 07f8df74d..2c1a3f259 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -40,6 +40,9 @@ use crate::util::RwLock; /// Maximum number of block headers a peer should ever send pub const MAX_BLOCK_HEADERS: u32 = 512; +/// Header segment height used for PIHD header segment requests. +pub const PIHD_HEADER_SEGMENT_HEIGHT: u8 = 9; + /// Maximum number of block bodies a peer should ever ask for and send #[allow(dead_code)] pub const MAX_BLOCK_BODIES: u32 = 16; @@ -396,6 +399,8 @@ bitflags! { const BLOCK_HIST = 0b0010_0000; /// As above, with crucial serialization fix #3705 applied const PIBD_HIST_1 = 0b0100_0000; + /// Can provide deterministic historical header segments. + const PIHD_HIST = 0b1000_0000; } } @@ -408,6 +413,7 @@ impl Default for Capabilities { | Capabilities::TX_KERNEL_HASH | Capabilities::PIBD_HIST | Capabilities::PIBD_HIST_1 + | Capabilities::PIHD_HIST } } @@ -547,6 +553,15 @@ pub struct TxHashSetRead { pub reader: File, } +/// Result of processing a PIHD header segment response. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HeaderSegmentAcceptance { + /// Segment was accepted or intentionally ignored. + Accepted, + /// Segment is malformed or invalid enough to justify banning the sender. + Ban, +} + /// Bridge between the networking layer and the rest of the system. Handles the /// forwarding or querying of blocks and transactions from the network among /// other things. @@ -606,6 +621,13 @@ pub trait ChainAdapter: Sync + Send { /// immediately. fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error>; + /// Finds a deterministic header segment based on the provided segment identifier. + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error>; + /// Gets a full block by its hash. /// Converts block to v2 compatibility if necessary (based on peer protocol version). fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option; @@ -703,6 +725,13 @@ pub trait ChainAdapter: Sync + Send { segment: Segment, peer_info: &PeerInfo, ) -> Result; + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result; } /// Additional methods required by the protocol that don't need to be diff --git a/p2p/tests/capabilities.rs b/p2p/tests/capabilities.rs index 5b20e099b..c6a8bf74c 100644 --- a/p2p/tests/capabilities.rs +++ b/p2p/tests/capabilities.rs @@ -43,6 +43,7 @@ fn default_capabilities() { assert!(x.contains(Capabilities::TX_KERNEL_HASH)); assert!(x.contains(Capabilities::PIBD_HIST)); assert!(x.contains(Capabilities::PIBD_HIST_1)); + assert!(x.contains(Capabilities::PIHD_HIST)); assert_eq!( x, @@ -52,5 +53,6 @@ fn default_capabilities() { | Capabilities::TX_KERNEL_HASH | Capabilities::PIBD_HIST | Capabilities::PIBD_HIST_1 + | Capabilities::PIHD_HIST ); } diff --git a/p2p/tests/ser_deser.rs b/p2p/tests/ser_deser.rs index 1d50aff6a..e07c64241 100644 --- a/p2p/tests/ser_deser.rs +++ b/p2p/tests/ser_deser.rs @@ -14,6 +14,8 @@ use grin_p2p as p2p; +use grin_core::core::SegmentIdentifier; +use grin_core::ser::{self, ProtocolVersion}; use num::FromPrimitive; // Test that Healthy == 0. @@ -50,23 +52,46 @@ fn test_capabilities() { ); assert_eq!( p2p::types::Capabilities::from_bits_truncate(0b10000000 as u32), - p2p::types::Capabilities::UNKNOWN - ); - - assert_eq!( - expected, - p2p::types::Capabilities::from_bits_truncate(0b1011111 as u32), + p2p::types::Capabilities::PIHD_HIST ); assert_eq!( expected, - p2p::types::Capabilities::from_bits_truncate(0b01011111 as u32), + p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32), ); - assert!(p2p::types::Capabilities::from_bits_truncate(0b01011111 as u32).contains(expected)); + assert!(p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32).contains(expected)); assert!( p2p::types::Capabilities::from_bits_truncate(0b00101111 as u32) .contains(p2p::types::Capabilities::TX_KERNEL_HASH) ); } + +#[test] +fn test_header_segment_limit() { + let mut bytes = vec![]; + ser::serialize( + &mut bytes, + ProtocolVersion::local(), + &SegmentIdentifier { + height: p2p::PIHD_HEADER_SEGMENT_HEIGHT, + idx: 0, + }, + ) + .unwrap(); + ser::serialize( + &mut bytes, + ProtocolVersion::local(), + &((p2p::MAX_BLOCK_HEADERS + 1) as u16), + ) + .unwrap(); + + let res: Result = ser::deserialize( + &mut &bytes[..], + ProtocolVersion::local(), + ser::DeserializationMode::default(), + ); + + assert_eq!(res.err(), Some(ser::Error::TooLargeReadErr)); +} diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 131b76b48..5bdcb5301 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -16,7 +16,9 @@ //! events to consumers of those events. use crate::util::RwLock; +use std::collections::HashMap; use std::fs::File; +use std::net::SocketAddr; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::path::PathBuf; use std::sync::{mpsc, Arc, Weak}; @@ -41,7 +43,7 @@ use crate::core::pow::Difficulty; use crate::core::ser::ProtocolVersion; use crate::core::{core, global}; use crate::p2p; -use crate::p2p::types::PeerInfo; +use crate::p2p::types::{HeaderSegmentAcceptance, PeerInfo}; use crate::pool::{self, BlockChain, PoolAdapter}; use crate::util::secp::pedersen::RangeProof; use crate::util::OneTime; @@ -56,6 +58,8 @@ const BITMAP_SEGMENT_HEIGHT_RANGE: Range = 9..14; const OUTPUT_SEGMENT_HEIGHT_RANGE: Range = 11..16; const RANGEPROOF_SEGMENT_HEIGHT_RANGE: Range = 7..12; const WORKER_CHANNEL_BUFFER_SIZE: usize = 64; +const HEADER_SEGMENT_REQUEST_WINDOW_SECS: i64 = 60; +const MAX_HEADER_SEGMENT_REQUESTS_PER_WINDOW: usize = 120; /// Implementation of the NetAdapter for the . Gets notified when new /// blocks and transactions are received and forwards to the chain and pool @@ -71,6 +75,7 @@ where peers: OneTime>, config: ServerConfig, hooks: Vec>, + header_segment_requests: RwLock, usize)>>, tx: mpsc::SyncSender, } @@ -393,6 +398,60 @@ where Ok(headers) } + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error> { + if !peer_info + .capabilities + .contains(p2p::Capabilities::PIHD_HIST) + || id.height != p2p::PIHD_HEADER_SEGMENT_HEIGHT + { + return Ok(None); + } + if !self.header_segment_request_allowed(peer_info.addr.0) { + warn!( + "throttling PIHD header segment request {:?} from {}", + id, peer_info.addr + ); + return Ok(None); + } + + let segment_capacity = id.segment_capacity(); + let start_height = match id + .idx + .checked_mul(segment_capacity) + .and_then(|height| height.checked_add(1)) + { + Some(height) => height, + None => return Ok(None), + }; + let max_height = self.chain().header_head()?.height; + let end_height = match start_height + .checked_add(segment_capacity) + .and_then(|height| height.checked_sub(1)) + { + Some(height) => std::cmp::min(height, max_height), + None => max_height, + }; + if start_height > end_height { + return Ok(Some(vec![])); + } + + let header_pmmr = self.chain().header_pmmr(); + let header_pmmr = header_pmmr.read(); + let mut headers = vec![]; + for h in start_height..=end_height { + if let Ok(hash) = header_pmmr.get_header_hash_by_height(h) { + headers.push(self.chain().get_block_header(&hash)?); + } else { + break; + } + } + Ok(Some(headers)) + } + /// Gets a full block by its hash. /// We only support v3 blocks since HF4. /// If a peer is requesting a block and only appears to support v2 @@ -610,6 +669,21 @@ where ) -> Result { self.queue_pibd_segment(PIBDSegment::Kernel(block_hash, segment), peer_info) } + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result { + debug!( + "ignoring PIHD header segment {:?} with {} headers from {}", + id, + headers.len(), + peer_info.addr + ); + Ok(HeaderSegmentAcceptance::Accepted) + } } impl NetToChainAdapter @@ -633,6 +707,7 @@ where peers: OneTime::new(), config, hooks, + header_segment_requests: RwLock::new(HashMap::new()), tx, }; adapter.spawn_net_adapter_worker(Arc::downgrade(&chain), rx); @@ -772,6 +847,22 @@ where } } + fn header_segment_request_allowed(&self, peer_addr: SocketAddr) -> bool { + let now = Utc::now(); + let cutoff = now - Duration::seconds(HEADER_SEGMENT_REQUEST_WINDOW_SECS); + let mut requests = self.header_segment_requests.write(); + requests.retain(|_, (window_start, _)| *window_start > cutoff); + let entry = requests.entry(peer_addr).or_insert((now, 0)); + if now > entry.0 + Duration::seconds(HEADER_SEGMENT_REQUEST_WINDOW_SECS) { + *entry = (now, 0); + } + if entry.1 >= MAX_HEADER_SEGMENT_REQUESTS_PER_WINDOW { + return false; + } + entry.1 += 1; + true + } + // Find the first locator hash that refers to a known header on our main chain. fn find_common_header(&self, locator: &[Hash]) -> Option { let header_pmmr = self.chain().header_pmmr();