diff --git a/zerofs/ninep-client/src/lib.rs b/zerofs/ninep-client/src/lib.rs index a7e97a4f..27231885 100644 --- a/zerofs/ninep-client/src/lib.rs +++ b/zerofs/ninep-client/src/lib.rs @@ -72,6 +72,15 @@ const LIVENESS_WINDOW: Duration = Duration::from_secs(3); /// this op (reads fine, writes stuck): after the cap we tear down and re-probe /// regardless. 7 extra windows plus the initial one is about a minute. const MAX_LIVENESS_EXTENSIONS: u32 = 7; +/// Cap on replaying the whole recorded session (re-attach/rebind every fid, +/// re-open files, re-acquire locks) onto a fresh connection during reconnect. +/// `send_raw` has no per-reply timeout, so without this a server that accepts +/// the connection and negotiates but then stalls on a replayed request (e.g. its +/// store is not ready yet right after a restart) would wedge the supervisor +/// forever, leaving every waiting op parked in `wait_until_live`. On elapse the +/// half-built connection is torn down and the supervisor retries with backoff. +/// Generous so a large session over a slow link still replays in one pass. +const REPLAY_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Debug)] pub enum ClientError { @@ -576,10 +585,19 @@ impl NinePClient { self.msize.store(msize, Ordering::Relaxed); self.extensions.store(extensions, Ordering::Relaxed); - // Replay failure discards this connection; tear it down so its fd is not leaked. - if let Err(e) = self.replay(&conn).await { - conn.shutdown(); - return Err(e); + // Replay failure, or a stall past REPLAY_TIMEOUT, discards this connection: + // tear it down so its fd is not leaked and the supervisor reconnects afresh. + match tokio::time::timeout(REPLAY_TIMEOUT, self.replay(&conn)).await { + Ok(Ok(())) => {} + Ok(Err(e)) => { + conn.shutdown(); + return Err(e); + } + Err(_) => { + conn.shutdown(); + warn!("9P session replay stalled past {REPLAY_TIMEOUT:?}; retrying reconnect"); + return Err(ClientError::Disconnected); + } } let old = self.conn.swap(conn); old.dead.store(true, Ordering::Release); diff --git a/zerofs/src/replication/transport.rs b/zerofs/src/replication/transport.rs index 8076dfdd..2c3c5789 100644 --- a/zerofs/src/replication/transport.rs +++ b/zerofs/src/replication/transport.rs @@ -22,6 +22,15 @@ use proto::{ ReplicateRequest, ReplicateResponse, }; +/// Bounds a gRPC dial so a peer that accepts the TCP connection but never +/// finishes the HTTP/2 handshake cannot hang the caller forever (leader startup +/// `hello_peer`, the heartbeat sender, or the shipping reconnect loop). Without +/// it a half-open standby silently wedges the leader's control plane. +const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); +/// Bounds a unary control RPC (Hello) so a peer that accepts the call but never +/// answers does not stall the leader's startup role election. +const RPC_TIMEOUT: Duration = Duration::from_secs(10); + fn to_repl_ops(ops: Vec) -> Vec { ops.into_iter() .map(|o| { @@ -187,7 +196,10 @@ pub struct ReplicationSender { impl ReplicationSender { pub async fn connect(endpoint: String) -> anyhow::Result { - let client = ReplicationServiceClient::connect(endpoint).await?; + let client = + tokio::time::timeout(CONNECT_TIMEOUT, ReplicationServiceClient::connect(endpoint)) + .await + .map_err(|_| anyhow::anyhow!("replication dial timed out"))??; Ok(Self { client: Mutex::new(client), }) @@ -216,8 +228,13 @@ impl ReplicationSender { /// Ask a peer (Hello) whether the caller should defer instead of opening as /// writer: true if the peer is leading or holds an un-replayed tail. pub async fn hello_peer(endpoint: String) -> anyhow::Result { - let mut client = ReplicationServiceClient::connect(endpoint).await?; - let resp = client.hello(HelloRequest {}).await?; + let mut client = + tokio::time::timeout(CONNECT_TIMEOUT, ReplicationServiceClient::connect(endpoint)) + .await + .map_err(|_| anyhow::anyhow!("Hello dial timed out"))??; + let resp = tokio::time::timeout(RPC_TIMEOUT, client.hello(HelloRequest {})) + .await + .map_err(|_| anyhow::anyhow!("Hello RPC timed out"))??; Ok(resp.into_inner().peer_active) } @@ -228,7 +245,12 @@ pub async fn run_heartbeat_sender( epoch: u64, interval: Duration, ) -> anyhow::Result<()> { - let mut client = ReplicationServiceClient::connect(endpoint).await?; + // Only the dial is bounded; the heartbeat stream itself is long-lived by + // design (it runs until the connection breaks). + let mut client = + tokio::time::timeout(CONNECT_TIMEOUT, ReplicationServiceClient::connect(endpoint)) + .await + .map_err(|_| anyhow::anyhow!("heartbeat dial timed out"))??; let beats = futures::stream::unfold(true, move |first| async move { // First beat immediately (so a watching standby observes the leader at // once); pace the rest by `interval`.