Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions zerofs/ninep-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ const LIVENESS_WINDOW: Duration = Duration::from_secs(3);
/// this op (reads fine, writes stuck): after the cap we tear down and re-probe
/// regardless. 7 extra windows plus the initial one is about a minute.
const MAX_LIVENESS_EXTENSIONS: u32 = 7;
/// Cap on replaying the whole recorded session (re-attach/rebind every fid,
/// re-open files, re-acquire locks) onto a fresh connection during reconnect.
/// `send_raw` has no per-reply timeout, so without this a server that accepts
/// the connection and negotiates but then stalls on a replayed request (e.g. its
/// store is not ready yet right after a restart) would wedge the supervisor
/// forever, leaving every waiting op parked in `wait_until_live`. On elapse the
/// half-built connection is torn down and the supervisor retries with backoff.
/// Generous so a large session over a slow link still replays in one pass.
const REPLAY_TIMEOUT: Duration = Duration::from_secs(30);

#[derive(Debug)]
pub enum ClientError {
Expand Down Expand Up @@ -576,10 +585,19 @@ impl NinePClient {

self.msize.store(msize, Ordering::Relaxed);
self.extensions.store(extensions, Ordering::Relaxed);
// Replay failure discards this connection; tear it down so its fd is not leaked.
if let Err(e) = self.replay(&conn).await {
conn.shutdown();
return Err(e);
// Replay failure, or a stall past REPLAY_TIMEOUT, discards this connection:
// tear it down so its fd is not leaked and the supervisor reconnects afresh.
match tokio::time::timeout(REPLAY_TIMEOUT, self.replay(&conn)).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
conn.shutdown();
return Err(e);
}
Err(_) => {
conn.shutdown();
warn!("9P session replay stalled past {REPLAY_TIMEOUT:?}; retrying reconnect");
return Err(ClientError::Disconnected);
}
}
let old = self.conn.swap(conn);
old.dead.store(true, Ordering::Release);
Expand Down
30 changes: 26 additions & 4 deletions zerofs/src/replication/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ use proto::{
ReplicateRequest, ReplicateResponse,
};

/// Bounds a gRPC dial so a peer that accepts the TCP connection but never
/// finishes the HTTP/2 handshake cannot hang the caller forever (leader startup
/// `hello_peer`, the heartbeat sender, or the shipping reconnect loop). Without
/// it a half-open standby silently wedges the leader's control plane.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Bounds a unary control RPC (Hello) so a peer that accepts the call but never
/// answers does not stall the leader's startup role election.
const RPC_TIMEOUT: Duration = Duration::from_secs(10);

fn to_repl_ops(ops: Vec<ProtoOp>) -> Vec<ReplOp> {
ops.into_iter()
.map(|o| {
Expand Down Expand Up @@ -187,7 +196,10 @@ pub struct ReplicationSender {

impl ReplicationSender {
pub async fn connect(endpoint: String) -> anyhow::Result<Self> {
let client = ReplicationServiceClient::connect(endpoint).await?;
let client =
tokio::time::timeout(CONNECT_TIMEOUT, ReplicationServiceClient::connect(endpoint))
.await
.map_err(|_| anyhow::anyhow!("replication dial timed out"))??;
Ok(Self {
client: Mutex::new(client),
})
Expand Down Expand Up @@ -216,8 +228,13 @@ impl ReplicationSender {
/// Ask a peer (Hello) whether the caller should defer instead of opening as
/// writer: true if the peer is leading or holds an un-replayed tail.
pub async fn hello_peer(endpoint: String) -> anyhow::Result<bool> {
let mut client = ReplicationServiceClient::connect(endpoint).await?;
let resp = client.hello(HelloRequest {}).await?;
let mut client =
tokio::time::timeout(CONNECT_TIMEOUT, ReplicationServiceClient::connect(endpoint))
.await
.map_err(|_| anyhow::anyhow!("Hello dial timed out"))??;
let resp = tokio::time::timeout(RPC_TIMEOUT, client.hello(HelloRequest {}))
.await
.map_err(|_| anyhow::anyhow!("Hello RPC timed out"))??;
Ok(resp.into_inner().peer_active)
}

Expand All @@ -228,7 +245,12 @@ pub async fn run_heartbeat_sender(
epoch: u64,
interval: Duration,
) -> anyhow::Result<()> {
let mut client = ReplicationServiceClient::connect(endpoint).await?;
// Only the dial is bounded; the heartbeat stream itself is long-lived by
// design (it runs until the connection breaks).
let mut client =
tokio::time::timeout(CONNECT_TIMEOUT, ReplicationServiceClient::connect(endpoint))
.await
.map_err(|_| anyhow::anyhow!("heartbeat dial timed out"))??;
let beats = futures::stream::unfold(true, move |first| async move {
// First beat immediately (so a watching standby observes the leader at
// once); pace the rest by `interval`.
Expand Down
Loading