Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions p2p/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ fn decode_message(
Type::RangeProofSegment => Message::RangeProofSegment(msg.body()?),
Type::GetKernelSegment => Message::GetKernelSegment(msg.body()?),
Type::KernelSegment => Message::KernelSegment(msg.body()?),
Type::GetHeaderSegment => Message::GetHeaderSegment(msg.body()?),
Type::HeaderSegment => Message::HeaderSegment(msg.body()?),
Type::Error | Type::Hand | Type::Shake | Type::Headers => {
return Err(Error::UnexpectedMessage)
}
Expand Down
1 change: 1 addition & 0 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,5 @@ pub use crate::store::{PeerData, State};
pub use crate::types::{
Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
Seeding, TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
PIHD_HEADER_SEGMENT_HEIGHT,
};
47 changes: 47 additions & 0 deletions p2p/src/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ enum_from_primitive! {
RangeProofSegment = 26,
GetKernelSegment = 27,
KernelSegment = 28,
GetHeaderSegment = 29,
HeaderSegment = 30,
}
}

Expand Down Expand Up @@ -139,6 +141,8 @@ fn max_msg_size(msg_type: Type) -> u64 {
Type::RangeProofSegment => 2 * max_block_size(),
Type::GetKernelSegment => 41,
Type::KernelSegment => 2 * max_block_size(),
Type::GetHeaderSegment => 9,
Type::HeaderSegment => 11 + 365 * MAX_BLOCK_HEADERS as u64,
}
}

Expand Down Expand Up @@ -649,6 +653,45 @@ impl Writeable for Headers {
}
}

/// Serializable wrapper for a deterministic header segment.
pub struct HeaderSegment {
pub identifier: SegmentIdentifier,
pub headers: Vec<BlockHeader>,
}

impl Readable for HeaderSegment {
fn read<R: Reader>(reader: &mut R) -> Result<HeaderSegment, ser::Error> {
let identifier = SegmentIdentifier::read(reader)?;
let len = reader.read_u16()?;
if len > (MAX_BLOCK_HEADERS as u16) {
return Err(ser::Error::TooLargeReadErr);
}
let mut headers = Vec::with_capacity(len as usize);
for _ in 0..len {
let header: UntrustedBlockHeader = Readable::read(reader)?;
headers.push(header.into());
}
Ok(HeaderSegment {
identifier,
headers,
})
}
}

impl Writeable for HeaderSegment {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
if self.headers.len() > MAX_BLOCK_HEADERS as usize {
return Err(ser::Error::TooLargeReadErr);
}
self.identifier.write(writer)?;
writer.write_u16(self.headers.len() as u16)?;
for h in &self.headers {
h.write(writer)?;
}
Ok(())
}
}

pub struct Ping {
/// total difficulty accumulated by the sender, used to check whether sync
/// may be needed
Expand Down Expand Up @@ -926,6 +969,8 @@ pub enum Message {
RangeProofSegment(SegmentResponse<RangeProof>),
GetKernelSegment(SegmentRequest),
KernelSegment(SegmentResponse<TxKernel>),
GetHeaderSegment(SegmentIdentifier),
HeaderSegment(HeaderSegment),
}

/// We receive 512 headers from a peer.
Expand Down Expand Up @@ -970,6 +1015,8 @@ impl fmt::Display for Message {
Message::RangeProofSegment(_) => write!(f, "range proof segment"),
Message::GetKernelSegment(_) => write!(f, "get kernel segment"),
Message::KernelSegment(_) => write!(f, "kernel segment"),
Message::GetHeaderSegment(_) => write!(f, "get header segment"),
Message::HeaderSegment(_) => write!(f, "header segment"),
}
}
}
Expand Down
21 changes: 19 additions & 2 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::msg::{
};
use crate::protocol::Protocol;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead,
Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr,
PeerInfo, ReasonForBan, TxHashSetRead,
};
use crate::util::secp::pedersen::RangeProof;
use chrono::prelude::{DateTime, Utc};
Expand Down Expand Up @@ -569,6 +569,14 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.locate_headers(locator)
}

fn locate_header_segment(
&self,
id: SegmentIdentifier,
peer_info: &PeerInfo,
) -> Result<Option<Vec<core::BlockHeader>>, chain::Error> {
self.adapter.locate_header_segment(id, peer_info)
}

fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option<core::Block> {
self.adapter.get_block(h, peer_info)
}
Expand Down Expand Up @@ -685,6 +693,15 @@ impl ChainAdapter for TrackingAdapter {
self.adapter
.receive_kernel_segment(block_hash, segment, peer_info)
}

fn receive_header_segment(
&self,
id: SegmentIdentifier,
headers: &[core::BlockHeader],
peer_info: &PeerInfo,
) -> Result<HeaderSegmentAcceptance, chain::Error> {
self.adapter.receive_header_segment(id, headers, peer_info)
}
}

impl NetAdapter for TrackingAdapter {
Expand Down
31 changes: 29 additions & 2 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use crate::msg::PeerAddrs;
use crate::peer::Peer;
use crate::store::{PeerData, PeerStore, State};
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead, MAX_PEER_ADDRS,
Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr,
PeerInfo, ReasonForBan, TxHashSetRead, MAX_PEER_ADDRS,
};
use crate::util::secp::pedersen::RangeProof;
use chrono::prelude::*;
Expand Down Expand Up @@ -662,6 +662,14 @@ impl ChainAdapter for Peers {
self.adapter.locate_headers(hs)
}

fn locate_header_segment(
&self,
id: SegmentIdentifier,
peer_info: &PeerInfo,
) -> Result<Option<Vec<core::BlockHeader>>, chain::Error> {
self.adapter.locate_header_segment(id, peer_info)
}

fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option<core::Block> {
self.adapter.get_block(h, peer_info)
}
Expand Down Expand Up @@ -820,6 +828,25 @@ impl ChainAdapter for Peers {
Ok(true)
}
}

fn receive_header_segment(
&self,
id: SegmentIdentifier,
headers: &[core::BlockHeader],
peer_info: &PeerInfo,
) -> Result<HeaderSegmentAcceptance, chain::Error> {
match self
.adapter
.receive_header_segment(id, headers, peer_info)?
{
HeaderSegmentAcceptance::Accepted => Ok(HeaderSegmentAcceptance::Accepted),
HeaderSegmentAcceptance::Ban => {
self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader)
.map_err(|e| chain::Error::Other(format!("ban peer error: {:?}", e)))?;
Ok(HeaderSegmentAcceptance::Ban)
}
}
}
}

impl NetAdapter for Peers {
Expand Down
118 changes: 116 additions & 2 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use crate::conn::MessageHandler;
use crate::core::core::{hash::Hashed, CompactBlock};

use crate::msg::{
Consumed, Headers, Message, Msg, OutputBitmapSegmentResponse, OutputSegmentResponse, PeerAddrs,
Pong, SegmentRequest, SegmentResponse, TxHashSetArchive, Type,
Consumed, HeaderSegment, Headers, Message, Msg, OutputBitmapSegmentResponse,
OutputSegmentResponse, PeerAddrs, Pong, SegmentRequest, SegmentResponse, TxHashSetArchive,
Type,
};
use crate::types::{AttachmentMeta, Error, NetAdapter, PeerInfo};
use chrono::prelude::Utc;
Expand Down Expand Up @@ -192,6 +193,28 @@ impl MessageHandler for Protocol {
)?)
}

Message::GetHeaderSegment(identifier) => {
if !self
.peer_info
.capabilities
.contains(crate::types::Capabilities::PIHD_HIST)
{
return Ok(Consumed::None);
}
if let Some(headers) = adapter.locate_header_segment(identifier, &self.peer_info)? {
Consumed::Response(Msg::new(
Type::HeaderSegment,
HeaderSegment {
identifier,
headers,
},
self.peer_info.version,
)?)
} else {
Consumed::None
}
}

// "header first" block propagation - if we have not yet seen this block
// we can go request it from some of our peers
Message::Header(header) => {
Expand All @@ -204,6 +227,15 @@ impl MessageHandler for Protocol {
Consumed::None
}

Message::HeaderSegment(segment) => {
adapter.receive_header_segment(
segment.identifier,
&segment.headers,
&self.peer_info,
)?;
Consumed::None
}

Message::GetPeerAddrs(get_peers) => {
let peers = adapter.find_peer_addrs(get_peers.capabilities);
Consumed::Response(Msg::new(
Expand Down Expand Up @@ -431,3 +463,85 @@ impl MessageHandler for Protocol {
Ok(consumed)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::conn::Tracker;
use crate::core::core::SegmentIdentifier;
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::core::ser::ProtocolVersion;
use crate::msg::{read_message, write_message};
use crate::serv::DummyAdapter;
use crate::types::{
Capabilities, Direction, PeerAddr, PeerLiveInfo, PIHD_HEADER_SEGMENT_HEIGHT,
};
use crate::util::RwLock;
use std::io::Cursor;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::AtomicBool;

fn test_peer_info(capabilities: Capabilities) -> PeerInfo {
PeerInfo {
capabilities,
user_agent: "test".to_owned(),
version: ProtocolVersion::local(),
addr: PeerAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 10000)),
direction: Direction::Outbound,
live_info: Arc::new(RwLock::new(PeerLiveInfo::new(Difficulty::zero()))),
}
}

#[test]
fn get_header_segment_returns_header_segment_response() {
global::set_local_chain_type(global::ChainTypes::AutomatedTesting);
let protocol = Protocol::new(
Arc::new(DummyAdapter {}),
test_peer_info(Capabilities::default()),
Arc::new(AtomicBool::new(false)),
);
let identifier = SegmentIdentifier {
height: PIHD_HEADER_SEGMENT_HEIGHT,
idx: 2,
};

let consumed = protocol
.consume(Message::GetHeaderSegment(identifier))
.expect("get header segment response");
let response = match consumed {
Consumed::Response(response) => response,
other => panic!("expected response, got {:?}", other),
};

let mut bytes = vec![];
write_message(&mut bytes, &response, Arc::new(Tracker::new())).expect("write response");
let segment: HeaderSegment = read_message(
&mut Cursor::new(bytes),
ProtocolVersion::local(),
Type::HeaderSegment,
)
.expect("read header segment response");

assert_eq!(segment.identifier, identifier);
assert!(segment.headers.is_empty());
}

#[test]
fn get_header_segment_requires_pihd_capability() {
let protocol = Protocol::new(
Arc::new(DummyAdapter {}),
test_peer_info(Capabilities::HEADER_HIST),
Arc::new(AtomicBool::new(false)),
);

let consumed = protocol
.consume(Message::GetHeaderSegment(SegmentIdentifier {
height: PIHD_HEADER_SEGMENT_HEIGHT,
idx: 0,
}))
.expect("get header segment handling");

assert!(matches!(consumed, Consumed::None));
}
}
20 changes: 18 additions & 2 deletions p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use crate::peer::Peer;
use crate::peers::Peers;
use crate::store::PeerStore;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead,
Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr,
PeerInfo, ReasonForBan, TxHashSetRead,
};
use crate::util::secp::pedersen::RangeProof;
use crate::util::StopState;
Expand Down Expand Up @@ -362,6 +362,13 @@ impl ChainAdapter for DummyAdapter {
fn locate_headers(&self, _: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
Ok(vec![])
}
fn locate_header_segment(
&self,
_: SegmentIdentifier,
_: &PeerInfo,
) -> Result<Option<Vec<core::BlockHeader>>, chain::Error> {
Ok(Some(vec![]))
}
fn get_block(&self, _: Hash, _: &PeerInfo) -> Option<core::Block> {
None
}
Expand Down Expand Up @@ -472,6 +479,15 @@ impl ChainAdapter for DummyAdapter {
) -> Result<bool, chain::Error> {
unimplemented!()
}

fn receive_header_segment(
&self,
_id: SegmentIdentifier,
_headers: &[core::BlockHeader],
_peer_info: &PeerInfo,
) -> Result<HeaderSegmentAcceptance, chain::Error> {
Ok(HeaderSegmentAcceptance::Accepted)
}
}

impl NetAdapter for DummyAdapter {
Expand Down
Loading
Loading