From e84c79b5c2c760c4b22551e4af0c3f8c177a72d9 Mon Sep 17 00:00:00 2001 From: Frando Date: Mon, 22 Jun 2026 10:42:19 +0200 Subject: [PATCH 1/7] feat: add daemon for many named tunnels over one endpoint Add a `dumbpipe daemon` subcommand that runs any number of incoming and outgoing tunnels over a single iroh endpoint, driven by a TOML config. - `[[connect]]` entries expose a local TCP port that forwards to a remote endpoint under a name; `[[accept]]` entries forward incoming named streams to a local TCP backend selected by name. - A "named" handshake (prefix, u32 length, name) routes streams, so one endpoint can multiplex several tunnels. - `remote` accepts a bare endpoint id or a full ticket; ticket address hints are seeded via a static address lookup alongside discovery. - The connect side reuses one pooled iroh connection per remote across TCP streams and retries a failed connect once. - The secret key is read from IROH_SECRET, else a hex key file under the data dir, else generated and persisted. Config and secret default to /dumbpipe/daemon/. Documented in docs/daemon.md and linked from the README. --- Cargo.lock | 138 +++++++++++-- Cargo.toml | 4 + README.md | 25 +++ docs/daemon.md | 143 ++++++++++++++ src/daemon.rs | 525 +++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 14 ++ src/main.rs | 39 +++- 7 files changed, 866 insertions(+), 22 deletions(-) create mode 100644 docs/daemon.md create mode 100644 src/daemon.rs diff --git a/Cargo.lock b/Cargo.lock index 4b589cc..469bfab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,7 +97,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -108,7 +108,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -720,6 +720,27 @@ dependencies = [ "crypto-common 0.2.2", ] +[[package]] +name = "dirs" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.61.2", +] + [[package]] name = "dispatch2" version = "0.3.1" @@ -772,17 +793,21 @@ version = "0.39.0" dependencies = [ "clap", "data-encoding", + "dirs", "duct", "hex", "iroh", "iroh-tickets", + "iroh-util", "n0-error", "nix", "noq", "rand", + "serde", "tempfile", "tokio", "tokio-util", + "toml", "tracing", "tracing-subscriber", ] @@ -856,7 +881,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1738,6 +1763,20 @@ dependencies = [ "serde", ] +[[package]] +name = "iroh-util" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20e41eb982f15230c55f0a70a74a514360e1f565b07861924fd0e8db172b3d00" +dependencies = [ + "derive_more", + "iroh", + "n0-error", + "n0-future", + "tokio", + "tracing", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -1860,6 +1899,15 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" +[[package]] +name = "libredox" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f02ab6bace2054fb888a3c16f990117b579d14a3088e472d63c6011fa185c9d3" +dependencies = [ + "libc", +] + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -2229,7 +2277,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2396,6 +2444,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "os_pipe" version = "1.2.3" @@ -2403,7 +2457,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d8fae84b431384b68627d0f9b3b1245fcf9f46f6c0e3dc902e9dce64edd1967" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2684,6 +2738,17 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_users" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" +dependencies = [ + "getrandom 0.2.17", + "libredox", + "thiserror 2.0.18", +] + [[package]] name = "regex-automata" version = "0.4.14" @@ -2783,7 +2848,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2841,7 +2906,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2933,7 +2998,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -3001,6 +3066,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_spanned" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6662b5879511e06e8999a8a235d848113e942c9124f211511b16466ee2995f26" +dependencies = [ + "serde_core", +] + [[package]] name = "serdect" version = "0.4.3" @@ -3147,7 +3221,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -3299,7 +3373,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -3497,6 +3571,30 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "toml" +version = "0.9.12+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf92845e79fc2e2def6a5d828f0801e29a2f8acc037becc5ab08595c7d5e9863" +dependencies = [ + "indexmap", + "serde_core", + "serde_spanned", + "toml_datetime 0.7.5+spec-1.1.0", + "toml_parser", + "toml_writer", + "winnow 0.7.15", +] + +[[package]] +name = "toml_datetime" +version = "0.7.5+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" +dependencies = [ + "serde_core", +] + [[package]] name = "toml_datetime" version = "1.1.1+spec-1.1.0" @@ -3513,9 +3611,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b59c4d22ed448339746c59b905d24568fcbb3ab65a500494f7b8c3e97739f2b" dependencies = [ "indexmap", - "toml_datetime", + "toml_datetime 1.1.1+spec-1.1.0", "toml_parser", - "winnow", + "winnow 1.0.3", ] [[package]] @@ -3524,9 +3622,15 @@ version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ - "winnow", + "winnow 1.0.3", ] +[[package]] +name = "toml_writer" +version = "1.1.1+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db" + [[package]] name = "tower" version = "0.5.3" @@ -3976,7 +4080,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -4328,6 +4432,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winnow" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" + [[package]] name = "winnow" version = "1.0.3" diff --git a/Cargo.toml b/Cargo.toml index b53e129..2041d73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ clap = { version = "4.4.10", features = ["derive"] } hex = "0.4.3" iroh = { version = "1.0.0", default-features = false, features = ["tls-ring"] } iroh-tickets = "1.0.0" +iroh-util = "0.6.0" noq = "1.0.0" tokio = { version = "1.34.0", features = ["full"] } tokio-util = "0.7.10" @@ -25,6 +26,9 @@ tracing = "0.1.40" tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } data-encoding = "2.9.0" n0-error = "1.0.0" +dirs = "6.0.0" +toml = "0.9.5" +serde = { version = "1.0.228", features = ["derive"] } [dev-dependencies] duct = "1.1.1" diff --git a/README.md b/README.md index 56b7d35..446396a 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,31 @@ ZELLIJ_SOCKET_DIR=/tmp/zj-remote zellij attach remote-task-1234 # Advanced features +## Daemon: many named tunnels from one config + +The `dumbpipe daemon` subcommand runs any number of incoming and outgoing +tunnels over a single endpoint, driven by a TOML config file. Each `connect` +entry exposes a local TCP port that forwards to a remote endpoint under a name, +and each `accept` entry forwards incoming named streams to a local TCP backend. + +```toml +[[connect]] +remote = "" +name = "web" +addr = "127.0.0.1:8080" + +[[accept]] +name = "web" +addr = "localhost:3000" +``` + +``` +dumbpipe daemon -c config.toml +``` + +See [docs/daemon.md](docs/daemon.md) for the config format, key handling, and +details. + ## Combining Listeners You can mix and match listeners. For example, forward from a remote Unix socket to a local TCP port: diff --git a/docs/daemon.md b/docs/daemon.md new file mode 100644 index 0000000..9f13431 --- /dev/null +++ b/docs/daemon.md @@ -0,0 +1,143 @@ +# Dumbpipe daemon + +The daemon runs many named tunnels over a single iroh endpoint, driven by a +config file. It is the multi-tunnel counterpart to the single-purpose +`listen-tcp` and `connect-tcp` subcommands: instead of one process per tunnel, +one daemon serves any number of incoming and outgoing tunnels from one endpoint +(one identity). + +``` +dumbpipe daemon [-c ] +``` + +## Config file + +The config path is taken from `-c/--config` if given, and otherwise defaults to +`/dumbpipe/daemon/daemon.toml`, where `` is the platform +data directory (`~/.local/share` on Linux, `~/Library/Application Support` on +macOS; see the [`dirs`](https://crates.io/crates/dirs) crate). + +The config has two kinds of entries, each repeatable: + +```toml +# Expose a local TCP port that forwards to a remote endpoint under a name. +[[connect]] +remote = "" +name = "boo" +addr = "localhost:13414" + +# Forward incoming streams with a given name to a local TCP backend. +[[accept]] +name = "foo" +addr = "localhost:31231" + +[[accept]] +name = "bar" +addr = "10.0.0.3:80" +``` + +A `connect` entry binds `addr` as a local TCP listener. Every accepted socket is +forwarded to the endpoint `remote`, tagging the stream with `name`. An `accept` +entry registers a backend: an incoming stream tagged with `name` is forwarded to +that entry's `addr`. The two sides are wired together by matching names, so the +`connect` side and the `accept` side of one tunnel must agree on the name. + +`addr` values are resolved at bind and connect time, so host names such as +`localhost:13414` work as well as literal socket addresses. + +### The `remote` field + +`remote` accepts either form: + +- A bare endpoint id (hex or base32). The remote is then resolved through + discovery, exactly like a dumbpipe short ticket. +- A full endpoint ticket that also carries relay and direct-address hints. The + hints are seeded into the endpoint's address book (see + [Connection handling](#connection-handling)), so the remote can be reached + without waiting on discovery. + +## Secret key + +The daemon uses a stable identity so that its endpoint id does not change across +restarts. The secret key is resolved in this order: + +1. The `IROH_SECRET` environment variable (hex or base32), if set. +2. `/dumbpipe/daemon/secret.key`, a 32-byte lowercase hex key. +3. Otherwise a fresh key is generated and written to `secret.key` for next time. + +## Named handshake + +A single endpoint multiplexes several tunnels, so each stream must say which +backend it is for. On a fresh bidi stream the connecting side writes a named +handshake before any data: + +``` +"named" (5 bytes) || name length (u32, big-endian) || name (UTF-8) +``` + +The accepting side reads the prefix, the length (capped to bound allocation), +and the name, then routes the stream to the matching `accept` backend. A stream +whose name has no matching backend is dropped with a warning. This handshake +replaces the plain `"hello"` handshake that the single-tunnel subcommands use. + +## Connection handling + +The connect side reuses one iroh connection per remote across TCP streams, +rather than dialing afresh for every connection. Connections are managed by a +shared pool keyed by endpoint id; a connection is kept warm for a short idle +period and reused by later streams, then closed once unused. If a connect +attempt fails, it is retried once, which covers a transient discovery or relay +hiccup. + +Because the pool connects by endpoint id, relay and direct-address hints from a +ticket-form `remote` are registered with the endpoint as a static address +lookup at startup. This runs alongside the default discovery, so a ticket's +hints let the daemon connect immediately while bare ids fall back to discovery. + +## Shutdown + +The daemon runs until interrupted with ctrl-c, at which point it stops accepting +new connections, cancels its listeners, and closes the endpoint. + +## Example + +Forward a web server running on one machine to a local port on another, plus a +second tunnel for SSH, all from one daemon on each side. + +On the server machine, `server.toml`: + +```toml +[[accept]] +name = "web" +addr = "localhost:3000" + +[[accept]] +name = "ssh" +addr = "localhost:22" +``` + +``` +dumbpipe daemon -c server.toml +# logs: daemon endpoint bound endpoint_id= +``` + +On the client machine, `client.toml` (using the server's endpoint id): + +```toml +[[connect]] +remote = "" +name = "web" +addr = "127.0.0.1:8080" + +[[connect]] +remote = "" +name = "ssh" +addr = "127.0.0.1:2222" +``` + +``` +dumbpipe daemon -c client.toml +``` + +The client can now reach the server's web server at `127.0.0.1:8080` and its +SSH server at `127.0.0.1:2222`, both over a single reused iroh connection. diff --git a/src/daemon.rs b/src/daemon.rs new file mode 100644 index 0000000..f3eda37 --- /dev/null +++ b/src/daemon.rs @@ -0,0 +1,525 @@ +//! The dumbpipe daemon. +//! +//! The daemon runs many named tunnels over a single iroh endpoint, driven by a +//! TOML config file. It is the multi-tunnel counterpart to the single-purpose +//! `listen-tcp` / `connect-tcp` subcommands. +//! +//! A `[[connect]]` entry binds a local TCP port and forwards every accepted +//! socket to a remote endpoint under a name. A `[[accept]]` entry forwards +//! incoming named streams to a local TCP backend selected by that name. Because +//! one endpoint serves several tunnels, streams carry a name: see +//! [`dumbpipe::HANDSHAKE_NAMED`]. + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +use clap::Parser; +use dumbpipe::{EndpointTicket, HANDSHAKE_NAMED}; +use iroh::{address_lookup::MemoryLookup, EndpointAddr, EndpointId, SecretKey}; +use iroh_util::connection_pool::{ConnectionPool, ConnectionRef, Options}; +use n0_error::{bail_any, ensure_any, Result, StdResultExt}; +use serde::Deserialize; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + select, + time::timeout, +}; +use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; +use tracing::{debug, info, warn}; + +use crate::CommonArgs; + +/// The maximum length of a tunnel name, in bytes. +/// +/// The accepting side reads the name length from the wire before allocating the +/// name buffer, so this bound keeps a malicious or buggy peer from requesting a +/// huge allocation. +const MAX_NAME_LEN: usize = 1024; + +/// How long the connection pool keeps an idle connection before closing it. +/// +/// Connect tunnels reuse one iroh connection per remote across many TCP +/// streams; this keeps the connection warm between bursts of streams. +const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); + +/// How long the connection pool waits for a connect to complete. +/// +/// Connecting by endpoint id alone goes through discovery and a relay, so this +/// is generous compared to the pool's one-second default. +const POOL_CONNECT_TIMEOUT: Duration = Duration::from_secs(15); + +/// Arguments for the `daemon` subcommand. +#[derive(Parser, Debug)] +pub struct DaemonArgs { + /// Path to the daemon config file. + /// + /// Defaults to `/dumbpipe/daemon/daemon.toml`, where `data_dir` + /// is the platform data directory (see the `dirs` crate). + #[clap(short = 'c', long)] + pub config: Option, + + #[clap(flatten)] + pub common: CommonArgs, +} + +/// A remote endpoint reference in the config. +/// +/// Accepts either a bare [`EndpointId`] (hex or base32) or a full +/// [`EndpointTicket`] string that also carries relay and address hints. +#[derive(Debug, Clone)] +struct Remote(EndpointAddr); + +impl<'de> Deserialize<'de> for Remote { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + // A ticket is the more specific form, so try it first and fall back to + // a bare endpoint id. + if let Ok(ticket) = EndpointTicket::from_str(&s) { + return Ok(Self(ticket.endpoint_addr().clone())); + } + let id = EndpointId::from_str(&s).map_err(serde::de::Error::custom)?; + Ok(Self(EndpointAddr::from(id))) + } +} + +/// The daemon config, parsed from TOML. +#[derive(Debug, Clone, Deserialize)] +struct Config { + /// Outgoing tunnels: local TCP port -> remote endpoint, under a name. + #[serde(default)] + connect: Vec, + /// Incoming tunnels: named stream -> local TCP backend. + #[serde(default)] + accept: Vec, +} + +/// A single `[[connect]]` entry. +#[derive(Debug, Clone, Deserialize)] +struct ConnectConfig { + /// The remote endpoint to forward to. + remote: Remote, + /// The name sent in the handshake, used by the remote to route the stream. + name: String, + /// The local TCP address to listen on. + addr: String, +} + +/// A single `[[accept]]` entry. +#[derive(Debug, Clone, Deserialize)] +struct AcceptConfig { + /// The name that selects this backend. + name: String, + /// The local TCP backend to forward matching streams to. + addr: String, +} + +/// Runs the daemon until interrupted with ctrl-c. +pub(crate) async fn run(args: DaemonArgs) -> Result<()> { + let dir = daemon_dir()?; + let config_path = args + .config + .clone() + .unwrap_or_else(|| dir.join("daemon.toml")); + let config = load_config(&config_path)?; + let secret_key = load_or_create_secret(&dir)?; + + // Seed a static address lookup with the relay and direct-address hints from + // ticket-form remotes. The connection pool connects by endpoint id alone, so + // without this those hints would be lost and only discovery could resolve the + // remote. Bare-id remotes carry no hints and are left to discovery. + let address_lookup = static_address_lookup(&config.connect); + + // The daemon always speaks the dumbpipe ALPN with the named handshake, so + // it ignores `--custom-alpn`. It still honors the bind-address options. + let endpoint = crate::create_endpoint( + secret_key, + &args.common, + vec![dumbpipe::ALPN.to_vec()], + address_lookup, + ) + .await?; + info!(endpoint_id = %endpoint.id(), "daemon endpoint bound"); + + if timeout(crate::ONLINE_TIMEOUT, endpoint.online()) + .await + .is_err() + { + warn!("failed to connect to the home relay"); + } + + if config.connect.is_empty() && config.accept.is_empty() { + warn!("config has no [[connect]] or [[accept]] entries"); + } + + // Route incoming streams by name to a backend address. Reject duplicate + // names up front: a duplicate would silently shadow an earlier backend. + let mut routes: HashMap = HashMap::new(); + for accept in &config.accept { + if routes + .insert(accept.name.clone(), accept.addr.clone()) + .is_some() + { + bail_any!("duplicate accept name {:?}", accept.name); + } + info!(name = %accept.name, addr = %accept.addr, "accept route"); + } + + let token = CancellationToken::new(); + // Hold the tasks as abort-on-drop handles so they stop when `run` returns. + let mut tasks: Vec> = Vec::new(); + + // One pool shared by all connect tunnels. It reuses a single connection per + // remote endpoint across TCP streams, keyed by endpoint id, instead of + // dialing afresh for every stream. + let pool = ConnectionPool::new( + endpoint.clone(), + dumbpipe::ALPN, + Options { + idle_timeout: POOL_IDLE_TIMEOUT, + connect_timeout: POOL_CONNECT_TIMEOUT, + ..Default::default() + }, + ); + + for connect in config.connect { + // Bind before spawning so a bad address fails startup loudly instead of + // disappearing into a background task's log. + let listener = TcpListener::bind(&connect.addr) + .await + .with_std_context(|_| format!("failed to bind {}", connect.addr))?; + info!(addr = %connect.addr, remote = %connect.remote.0.id, name = %connect.name, "connect listening"); + let pool = pool.clone(); + let token = token.child_token(); + tasks.push(AbortOnDropHandle::new(tokio::spawn(async move { + if let Err(cause) = run_connect(pool, listener, connect, token).await { + warn!("connect listener stopped: {cause}"); + } + }))); + } + + if !routes.is_empty() { + let endpoint = endpoint.clone(); + let routes = Arc::new(routes); + let token = token.child_token(); + tasks.push(AbortOnDropHandle::new(tokio::spawn(async move { + run_accept(endpoint, routes, token).await; + }))); + } + + tokio::signal::ctrl_c().await.anyerr()?; + info!("got ctrl-c, shutting down"); + token.cancel(); + endpoint.close().await; + Ok(()) +} + +/// Builds a static address lookup from the connect remotes that carry hints. +/// +/// Returns `None` if no remote has relay or direct-address hints, in which case +/// the endpoint relies on discovery alone. +fn static_address_lookup(connect: &[ConnectConfig]) -> Option { + let hints: Vec = connect + .iter() + .map(|c| c.remote.0.clone()) + .filter(|addr| !addr.is_empty()) + .collect(); + (!hints.is_empty()).then(|| MemoryLookup::from_endpoint_info(hints)) +} + +/// Returns the daemon data directory, `/dumbpipe/daemon`. +fn daemon_dir() -> Result { + let Some(data_dir) = dirs::data_dir() else { + bail_any!("could not determine the platform data directory"); + }; + Ok(data_dir.join("dumbpipe").join("daemon")) +} + +/// Loads and parses the config file at `path`. +fn load_config(path: &Path) -> Result { + let contents = std::fs::read_to_string(path) + .with_std_context(|_| format!("failed to read config {}", path.display()))?; + toml::from_str(&contents) + .with_std_context(|_| format!("failed to parse config {}", path.display())) +} + +/// Loads the endpoint secret key, generating and persisting one if needed. +/// +/// `IROH_SECRET` takes preference. Otherwise the key is read from +/// `/secret.key` (32-byte lowercase hex), and if that file does not exist +/// a fresh key is generated and written there. +fn load_or_create_secret(dir: &Path) -> Result { + if let Ok(secret) = std::env::var("IROH_SECRET") { + return SecretKey::from_str(&secret).std_context("invalid IROH_SECRET"); + } + let path = dir.join("secret.key"); + match std::fs::read_to_string(&path) { + Ok(contents) => SecretKey::from_str(contents.trim()) + .with_std_context(|_| format!("invalid secret key in {}", path.display())), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + let key = SecretKey::generate(); + std::fs::create_dir_all(dir) + .with_std_context(|_| format!("failed to create {}", dir.display()))?; + let hex = data_encoding::HEXLOWER.encode(&key.to_bytes()); + std::fs::write(&path, &hex) + .with_std_context(|_| format!("failed to write {}", path.display()))?; + info!(path = %path.display(), "generated new secret key"); + Ok(key) + } + Err(e) => Err(e).with_std_context(|_| format!("failed to read {}", path.display())), + } +} + +/// Listens on a local TCP port and forwards each socket to the remote endpoint. +async fn run_connect( + pool: ConnectionPool, + listener: TcpListener, + config: ConnectConfig, + token: CancellationToken, +) -> Result<()> { + let remote_id = config.remote.0.id; + let name = Arc::new(config.name); + loop { + let (tcp_stream, peer) = select! { + res = listener.accept() => res.std_context("error accepting tcp connection")?, + _ = token.cancelled() => break, + }; + let pool = pool.clone(); + let name = name.clone(); + tokio::spawn(async move { + if let Err(cause) = handle_connect(&pool, remote_id, &name, tcp_stream).await { + warn!("error forwarding tcp connection from {peer}: {cause}"); + } + }); + } + Ok(()) +} + +/// Forwards a single accepted TCP socket to the remote endpoint. +/// +/// The iroh connection comes from the shared pool, so concurrent and successive +/// streams to the same remote reuse one connection. The [`ConnectionRef`] is +/// held until forwarding finishes, which keeps the pool from closing the +/// connection while it is in use. +async fn handle_connect( + pool: &ConnectionPool, + remote_id: EndpointId, + name: &str, + tcp_stream: TcpStream, +) -> Result<()> { + let (tcp_recv, tcp_send) = tcp_stream.into_split(); + let connection = get_connection(pool, remote_id).await?; + let (mut endpoint_send, endpoint_recv) = connection + .open_bi() + .await + .std_context("error opening bidi stream")?; + write_named_handshake(&mut endpoint_send, name).await?; + crate::forward_bidi(tcp_recv, tcp_send, endpoint_recv, endpoint_send).await?; + Ok(()) +} + +/// Gets a pooled connection to `remote_id`, retrying the connect once on error. +/// +/// The pool reuses a live connection if it has one. On a connect failure it +/// retries a single time, which covers a transient discovery or relay hiccup +/// and a connection that was evicted just as it was requested. +async fn get_connection(pool: &ConnectionPool, remote_id: EndpointId) -> Result { + match pool.get_or_connect(remote_id).await { + Ok(connection) => Ok(connection), + Err(cause) => { + warn!(remote = %remote_id, "connect failed, retrying once: {cause}"); + pool.get_or_connect(remote_id) + .await + .with_std_context(|_| format!("error connecting to {remote_id}")) + } + } +} + +/// Accepts incoming endpoint connections and routes their streams by name. +async fn run_accept( + endpoint: iroh::Endpoint, + routes: Arc>, + token: CancellationToken, +) { + loop { + let incoming = select! { + incoming = endpoint.accept() => incoming, + _ = token.cancelled() => break, + }; + let Some(incoming) = incoming else { + break; + }; + let Ok(accepting) = incoming.accept() else { + continue; + }; + let routes = routes.clone(); + tokio::spawn(async move { + if let Err(cause) = handle_connection(accepting, routes).await { + warn!("error handling incoming connection: {cause}"); + } + }); + } +} + +/// Accepts every bidi stream on one incoming connection and routes each by name. +/// +/// A connecting daemon reuses one connection for many TCP streams, so each +/// stream arrives as a separate bidi stream that must be accepted in turn. +async fn handle_connection( + accepting: iroh::endpoint::Accepting, + routes: Arc>, +) -> Result<()> { + let connection = accepting.await.std_context("error accepting connection")?; + let remote_id = connection.remote_id(); + loop { + let (send, recv) = match connection.accept_bi().await { + Ok(stream) => stream, + // The remote closing the connection ends the stream loop normally. + Err(cause) => { + debug!(%remote_id, "connection closed: {cause}"); + break; + } + }; + let routes = routes.clone(); + tokio::spawn(async move { + if let Err(cause) = handle_stream(send, recv, &routes, remote_id).await { + warn!(%remote_id, "error handling stream: {cause}"); + } + }); + } + Ok(()) +} + +/// Reads the named handshake from one incoming stream and forwards it. +async fn handle_stream( + send: noq::SendStream, + mut recv: noq::RecvStream, + routes: &HashMap, + remote_id: EndpointId, +) -> Result<()> { + let name = read_named_handshake(&mut recv).await?; + let Some(addr) = routes.get(&name) else { + warn!(%remote_id, %name, "no route for name, dropping stream"); + return Ok(()); + }; + info!(%remote_id, %name, %addr, "forwarding named stream"); + let backend = TcpStream::connect(addr) + .await + .with_std_context(|_| format!("error connecting to backend {addr}"))?; + let (backend_recv, backend_send) = backend.into_split(); + crate::forward_bidi(backend_recv, backend_send, recv, send).await?; + Ok(()) +} + +/// Writes the named handshake: prefix, name length (`u32` big-endian), name. +async fn write_named_handshake(send: &mut W, name: &str) -> Result<()> { + ensure_any!( + name.len() <= MAX_NAME_LEN, + "name too long: {} bytes", + name.len() + ); + // The bound above keeps this well within u32 range. + let len = name.len() as u32; + send.write_all(&HANDSHAKE_NAMED).await.anyerr()?; + send.write_all(&len.to_be_bytes()).await.anyerr()?; + send.write_all(name.as_bytes()).await.anyerr()?; + Ok(()) +} + +/// Reads a named handshake written by [`write_named_handshake`]. +async fn read_named_handshake(recv: &mut R) -> Result { + let mut prefix = [0u8; HANDSHAKE_NAMED.len()]; + recv.read_exact(&mut prefix).await.anyerr()?; + ensure_any!(prefix == HANDSHAKE_NAMED, "invalid named handshake"); + let mut len_buf = [0u8; 4]; + recv.read_exact(&mut len_buf).await.anyerr()?; + let len = u32::from_be_bytes(len_buf) as usize; + ensure_any!(len <= MAX_NAME_LEN, "name too long: {len} bytes"); + let mut name = vec![0u8; len]; + recv.read_exact(&mut name).await.anyerr()?; + String::from_utf8(name).std_context("name is not valid utf8") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_named_handshake_round_trip() { + let (mut a, mut b) = tokio::io::duplex(64); + write_named_handshake(&mut a, "boo").await.unwrap(); + let name = read_named_handshake(&mut b).await.unwrap(); + assert_eq!(name, "boo"); + } + + #[tokio::test] + async fn test_read_named_handshake_rejects_bad_prefix() { + let (mut a, mut b) = tokio::io::duplex(64); + a.write_all(b"hello").await.unwrap(); + a.write_all(&0u32.to_be_bytes()).await.unwrap(); + assert!(read_named_handshake(&mut b).await.is_err()); + } + + #[test] + fn test_parse_config() { + // A real endpoint id (32-byte ed25519 public key) in hex. + let id = "0".repeat(64); + let toml = format!( + r#" + [[connect]] + remote = "{id}" + name = "boo" + addr = "localhost:13414" + + [[accept]] + name = "foo" + addr = "localhost:31231" + + [[accept]] + name = "bar" + addr = "10.0.0.3:80" + "# + ); + let config: Config = toml::from_str(&toml).unwrap(); + assert_eq!(config.connect.len(), 1); + assert_eq!(config.connect[0].name, "boo"); + assert_eq!(config.connect[0].addr, "localhost:13414"); + assert_eq!(config.accept.len(), 2); + assert_eq!(config.accept[1].name, "bar"); + } + + #[test] + fn test_parse_config_empty() { + let config: Config = toml::from_str("").unwrap(); + assert!(config.connect.is_empty()); + assert!(config.accept.is_empty()); + } + + #[test] + fn test_static_address_lookup() { + let id = EndpointId::from_str(&"0".repeat(64)).unwrap(); + let connect = |addr: EndpointAddr| ConnectConfig { + remote: Remote(addr), + name: "n".into(), + addr: "127.0.0.1:1".into(), + }; + + // A bare id carries no hints, so no static lookup is built. + let bare = connect(EndpointAddr::from(id)); + assert!(static_address_lookup(std::slice::from_ref(&bare)).is_none()); + + // A remote with a relay hint produces a static lookup that knows the id. + let relay = "https://relay.example".parse().unwrap(); + let hinted = connect(EndpointAddr::new(id).with_relay_url(relay)); + let lookup = static_address_lookup(std::slice::from_ref(&hinted)).expect("lookup built"); + assert!(lookup.get_endpoint_info(id).is_some()); + } +} diff --git a/src/lib.rs b/src/lib.rs index e0c519d..129ad69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,4 +10,18 @@ pub const ALPN: &[u8] = b"DUMBPIPEV0"; /// calls accept_bi() must consume it. pub const HANDSHAKE: [u8; 5] = *b"hello"; +/// The handshake prefix for named connections used by the daemon. +/// +/// A single iroh endpoint can multiplex several named tunnels, so the +/// accepting side must learn which backend an incoming stream belongs to. +/// The connecting side opens a bidi stream and writes this prefix, followed +/// by the name length as a big-endian [`u32`], followed by the UTF-8 name: +/// +/// ```text +/// HANDSHAKE_NAMED ("named") || name_len: u32 big-endian || name (UTF-8) +/// ``` +/// +/// It replaces [`HANDSHAKE`] for daemon streams. +pub const HANDSHAKE_NAMED: [u8; 5] = *b"named"; + pub use iroh_tickets::endpoint::EndpointTicket; diff --git a/src/main.rs b/src/main.rs index 097b12d..718dcdf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use std::{ use clap::{Parser, Subcommand}; use dumbpipe::EndpointTicket; use iroh::{ + address_lookup::MemoryLookup, endpoint::{presets, Accepting}, Endpoint, EndpointAddr, SecretKey, }; @@ -25,6 +26,8 @@ use { tokio::net::{UnixListener, UnixStream}, }; +mod daemon; + const ONLINE_TIMEOUT: Duration = Duration::from_secs(5); /// Create a dumb pipe between two machines, using an iroh endpoint. @@ -104,6 +107,14 @@ pub enum Commands { /// As far as the endpoint is concerned, this is connecting. But it is /// listening on a Unix socket for which you have to specify the path. ConnectUnix(ConnectUnixArgs), + + /// Run many named tunnels over a single endpoint, driven by a config file. + /// + /// Reads a TOML config with `[[connect]]` and `[[accept]]` entries. Each + /// `connect` exposes a local TCP port that forwards to a remote endpoint + /// under a name; each `accept` forwards incoming named streams to a local + /// TCP backend selected by name. + Daemon(daemon::DaemonArgs), } #[derive(Parser, Debug)] @@ -300,14 +311,21 @@ fn get_or_create_secret() -> Result { } /// Create a new iroh endpoint. -async fn create_endpoint( +/// +/// When `address_lookup` is set, it is added alongside the default discovery so +/// connecting by endpoint id can use statically configured address hints. +pub(crate) async fn create_endpoint( secret_key: SecretKey, common: &CommonArgs, alpns: Vec>, + address_lookup: Option, ) -> Result { let mut builder = Endpoint::builder(presets::N0) .secret_key(secret_key) .alpns(alpns); + if let Some(address_lookup) = address_lookup { + builder = builder.address_lookup(address_lookup); + } if let Some(addr) = common.ipv4_addr { builder = builder.bind_addr(addr)?; } @@ -328,7 +346,7 @@ fn cancel_token(token: CancellationToken) -> impl Fn(T) -> T { /// Bidirectionally forward data from a noq stream and an arbitrary tokio /// reader/writer pair, aborting both sides when either one forwarder is done, /// or when control-c is pressed. -async fn forward_bidi( +pub(crate) async fn forward_bidi( from1: impl AsyncRead + Send + Sync + Unpin + 'static, to1: impl AsyncWrite + Send + Sync + Unpin + 'static, from2: noq::RecvStream, @@ -359,7 +377,8 @@ async fn forward_bidi( async fn listen_stdio(args: ListenArgs) -> Result<()> { let secret_key = get_or_create_secret()?; - let endpoint = create_endpoint(secret_key, &args.common, vec![args.common.alpn()?]).await?; + let endpoint = + create_endpoint(secret_key, &args.common, vec![args.common.alpn()?], None).await?; // wait for the endpoint to figure out its home relay and addresses before making a ticket if (timeout(ONLINE_TIMEOUT, endpoint.online()).await).is_err() { eprintln!("Warning: Failed to connect to the home relay"); @@ -423,7 +442,7 @@ async fn listen_stdio(args: ListenArgs) -> Result<()> { async fn connect_stdio(args: ConnectArgs) -> Result<()> { let secret_key = get_or_create_secret()?; - let endpoint = create_endpoint(secret_key, &args.common, vec![]).await?; + let endpoint = create_endpoint(secret_key, &args.common, vec![], None).await?; let addr = args.ticket.endpoint_addr(); let remote_endpoint_id = addr.id; // connect to the remote, try only once @@ -463,7 +482,7 @@ async fn connect_tcp(args: ConnectTcpArgs) -> Result<()> { .to_socket_addrs() .std_context(format!("invalid host string {}", args.addr))?; let secret_key = get_or_create_secret()?; - let endpoint = create_endpoint(secret_key, &args.common, vec![]) + let endpoint = create_endpoint(secret_key, &args.common, vec![], None) .await .std_context("unable to bind endpoint")?; tracing::info!("tcp listening on {:?}", addrs); @@ -545,7 +564,8 @@ async fn listen_tcp(args: ListenTcpArgs) -> Result<()> { Err(e) => bail_any!("invalid host string {}: {}", args.host, e), }; let secret_key = get_or_create_secret()?; - let endpoint = create_endpoint(secret_key, &args.common, vec![args.common.alpn()?]).await?; + let endpoint = + create_endpoint(secret_key, &args.common, vec![args.common.alpn()?], None).await?; // wait for the endpoint to figure out its address before making a ticket if (timeout(ONLINE_TIMEOUT, endpoint.online()).await).is_err() { eprintln!("Warning: Failed to connect to the home relay"); @@ -643,7 +663,8 @@ fn create_short_ticket(addr: &EndpointAddr) -> EndpointTicket { async fn listen_unix(args: ListenUnixArgs) -> Result<()> { let socket_path = args.socket_path.clone(); let secret_key = get_or_create_secret()?; - let endpoint = create_endpoint(secret_key, &args.common, vec![args.common.alpn()?]).await?; + let endpoint = + create_endpoint(secret_key, &args.common, vec![args.common.alpn()?], None).await?; // wait for the endpoint to figure out its address before making a ticket if (timeout(ONLINE_TIMEOUT, endpoint.online()).await).is_err() { eprintln!("Warning: Failed to connect to the home relay"); @@ -761,7 +782,7 @@ impl Drop for UnixSocketGuard { async fn connect_unix(args: ConnectUnixArgs) -> Result<()> { let socket_path = args.socket_path.clone(); let secret_key = get_or_create_secret()?; - let endpoint = create_endpoint(secret_key, &args.common, vec![]) + let endpoint = create_endpoint(secret_key, &args.common, vec![], None) .await .std_context("unable to bind endpoint")?; tracing::info!("unix listening on {:?}", socket_path); @@ -883,6 +904,8 @@ async fn main() -> Result<()> { #[cfg(unix)] Commands::ConnectUnix(args) => connect_unix(args).await, + + Commands::Daemon(args) => daemon::run(args).await, }; match res { Ok(()) => std::process::exit(0), From cec7e78cc9b2d3dfb45b9048425768f24612b683 Mon Sep 17 00:00:00 2001 From: Frando Date: Mon, 22 Jun 2026 13:05:59 +0200 Subject: [PATCH 2/7] print --- docs/daemon.md | 21 ++++++++++++++++++++- src/daemon.rs | 10 +++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/docs/daemon.md b/docs/daemon.md index 9f13431..d3782a5 100644 --- a/docs/daemon.md +++ b/docs/daemon.md @@ -65,6 +65,25 @@ restarts. The secret key is resolved in this order: 2. `/dumbpipe/daemon/secret.key`, a 32-byte lowercase hex key. 3. Otherwise a fresh key is generated and written to `secret.key` for next time. +## Output + +On startup, after the endpoint comes online, the daemon prints to stdout: + +- its endpoint id, as hex, on the first line; +- its ticket on the second line, in the same format the `remote` field accepts, + so it can be pasted into a connecting daemon's config; +- one line per configured `accept` backend, as `accept -> `. + +``` +f2e16a92c17a40ceb7bbb6e6f216ad98f59f7708f32ff24bf8ed6335c908bb1d +endpointadzoc2usyf5ebtvxxo3on4qwvwmplh3xbdzs74sl7dwwgnojbc5r2ay... +accept web -> localhost:3000 +accept ssh -> localhost:22 +``` + +The id and the ticket are both valid `remote` values; the ticket additionally +carries the relay and address hints. + ## Named handshake A single endpoint multiplexes several tunnels, so each stream must say which @@ -118,7 +137,7 @@ addr = "localhost:22" ``` dumbpipe daemon -c server.toml -# logs: daemon endpoint bound endpoint_id= +# prints the endpoint id, a ticket, and the accept backends ``` On the client machine, `client.toml` (using the server's endpoint id): diff --git a/src/daemon.rs b/src/daemon.rs index f3eda37..f687845 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -160,8 +160,16 @@ pub(crate) async fn run(args: DaemonArgs) -> Result<()> { warn!("config has no [[connect]] or [[accept]] entries"); } + // Advertise this daemon's identity on stdout. The id (hex) and the ticket + // are both valid `remote` values for a connecting daemon's config; the + // ticket also carries the relay and address hints. Printed after `online` + // so the ticket includes the home relay. + println!("short addr: {}", endpoint.id()); + println!(" long addr: {}", EndpointTicket::new(endpoint.addr())); + // Route incoming streams by name to a backend address. Reject duplicate // names up front: a duplicate would silently shadow an earlier backend. + // List each configured accept backend on stdout. let mut routes: HashMap = HashMap::new(); for accept in &config.accept { if routes @@ -170,7 +178,7 @@ pub(crate) async fn run(args: DaemonArgs) -> Result<()> { { bail_any!("duplicate accept name {:?}", accept.name); } - info!(name = %accept.name, addr = %accept.addr, "accept route"); + println!("accept {} -> {}", accept.name, accept.addr); } let token = CancellationToken::new(); From b2d1d771b3146eaf1b9b28957e03166c8956550a Mon Sep 17 00:00:00 2001 From: Frando Date: Mon, 22 Jun 2026 15:45:44 +0200 Subject: [PATCH 3/7] feat: daemon subcommands, tokens, reload, new alpn, and span logging Build out the daemon into a configurable, long-running service. - New daemon ALPN, distinct from the single-tunnel dumbpipe protocol. - Per-stream header is now postcard-encoded { name, token }. - Optional per-tunnel token: an accept entry with a token only forwards streams whose connect side presents the same token. - Subcommands (start is the default): `accept` and `connect` append entries to the config file, `show` prints the configured tunnels, and `start` runs the daemon. `accept --secure` generates a random token. - `start` creates an empty config file when none exists. - `reload` (default on) watches the config with notify and applies changes live: accept routes swap atomically, connect tunnels are reconciled by local addr, new remotes' hints are registered. - Connection reuse via iroh-util ConnectionPool with one retry on connect failure, plus a static address lookup so ticket-form remotes' relay and address hints are honored. - Logging: connect/disconnect events inside incoming{remote}:tcp{name, target} and outgoing{remote}:tcp{name, target} spans; the daemon defaults to dumbpipe=info,iroh=info when RUST_LOG is unset. Documented in docs/daemon.md and linked from the README. --- Cargo.lock | 80 +++++ Cargo.toml | 5 +- README.md | 24 +- docs/daemon.md | 186 ++++++---- src/daemon.rs | 944 ++++++++++++++++++++++++++++++++++++------------- src/lib.rs | 14 - src/main.rs | 10 +- 7 files changed, 923 insertions(+), 340 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 469bfab..d36d101 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -791,6 +791,7 @@ dependencies = [ name = "dumbpipe" version = "0.39.0" dependencies = [ + "arc-swap", "clap", "data-encoding", "dirs", @@ -802,6 +803,8 @@ dependencies = [ "n0-error", "nix", "noq", + "notify", + "postcard", "rand", "serde", "tempfile", @@ -941,6 +944,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.32" @@ -1552,6 +1564,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inotify" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "533e68a5842e734946fe159fb03fc9bbbb254f590dd0d8ad321ae5ff7beca2c1" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.4" @@ -1875,6 +1907,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kqueue" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "273c0752728918e0ac4976f2b275b6fefb9ecd400585dec929419f3844cd87b5" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07293a4e297ac234359b510362495713f75ea345d5307140414f20c69ffeb087" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1991,6 +2043,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.61.2", ] @@ -2271,6 +2324,33 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" +dependencies = [ + "bitflags", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" diff --git a/Cargo.toml b/Cargo.toml index 2041d73..771710a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,11 +29,14 @@ n0-error = "1.0.0" dirs = "6.0.0" toml = "0.9.5" serde = { version = "1.0.228", features = ["derive"] } +postcard = { version = "1.1.1", features = ["use-std"] } +notify = "8.0.0" +arc-swap = "1.7.1" +rand = "0.10" [dev-dependencies] duct = "1.1.1" nix = { version = "0.31", features = ["signal", "process"] } -rand = "0.10" tempfile = "3.8" [profile.release] diff --git a/README.md b/README.md index 446396a..d52729d 100644 --- a/README.md +++ b/README.md @@ -151,24 +151,24 @@ The `dumbpipe daemon` subcommand runs any number of incoming and outgoing tunnels over a single endpoint, driven by a TOML config file. Each `connect` entry exposes a local TCP port that forwards to a remote endpoint under a name, and each `accept` entry forwards incoming named streams to a local TCP backend. +Tunnels can be protected with a shared token. -```toml -[[connect]] -remote = "" -name = "web" -addr = "127.0.0.1:8080" +Build the config with the `accept` and `connect` subcommands (or edit the TOML +by hand): -[[accept]] -name = "web" -addr = "localhost:3000" ``` +# On the server: expose a backend, protected by a generated token. +dumbpipe daemon accept web localhost:3000 --secure +dumbpipe daemon # run it; prints the endpoint id and a ticket -``` -dumbpipe daemon -c config.toml +# On the client: forward a local port to the server's "web" tunnel. +dumbpipe daemon connect :web 127.0.0.1:8080 --token +dumbpipe daemon ``` -See [docs/daemon.md](docs/daemon.md) for the config format, key handling, and -details. +By default the daemon watches its config file and applies changes (added or +removed tunnels) while running. See [docs/daemon.md](docs/daemon.md) for the +config format, tokens, reloading, key handling, and more. ## Combining Listeners diff --git a/docs/daemon.md b/docs/daemon.md index d3782a5..f928691 100644 --- a/docs/daemon.md +++ b/docs/daemon.md @@ -7,33 +7,41 @@ one daemon serves any number of incoming and outgoing tunnels from one endpoint (one identity). ``` -dumbpipe daemon [-c ] +dumbpipe daemon [-c ] [start] +dumbpipe daemon [-c ] accept [--token ] [--secure] +dumbpipe daemon [-c ] connect : [--token ] +dumbpipe daemon [-c ] show ``` +`start` runs the daemon and is the default when no subcommand is given, so +`dumbpipe daemon` and `dumbpipe daemon start` are the same. The `accept` and +`connect` subcommands do not run anything; they edit the config file. `show` +prints the configured tunnels without running anything. + ## Config file The config path is taken from `-c/--config` if given, and otherwise defaults to `/dumbpipe/daemon/daemon.toml`, where `` is the platform data directory (`~/.local/share` on Linux, `~/Library/Application Support` on -macOS; see the [`dirs`](https://crates.io/crates/dirs) crate). - -The config has two kinds of entries, each repeatable: +macOS; see the [`dirs`](https://crates.io/crates/dirs) crate). `start` creates +an empty config there if none exists rather than failing. ```toml +# Watch this file and apply changes while running (see Reloading). +reload = true + # Expose a local TCP port that forwards to a remote endpoint under a name. [[connect]] remote = "" name = "boo" addr = "localhost:13414" +token = "optional-token" # presented to the remote accept tunnel # Forward incoming streams with a given name to a local TCP backend. [[accept]] -name = "foo" -addr = "localhost:31231" - -[[accept]] -name = "bar" -addr = "10.0.0.3:80" +name = "foo" +addr = "localhost:31231" +token = "optional-token" # required from the connecting side ``` A `connect` entry binds `addr` as a local TCP listener. Every accepted socket is @@ -56,6 +64,53 @@ that entry's `addr`. The two sides are wired together by matching names, so the [Connection handling](#connection-handling)), so the remote can be reached without waiting on discovery. +### Tokens + +A tunnel can be protected with a shared token. When an `accept` entry has a +`token`, an incoming stream is forwarded only if the connecting side presents +the same token; otherwise the stream is dropped. An `accept` entry without a +token accepts any matching stream. The token travels inside the per-stream +[header](#stream-header), which is carried over the encrypted iroh connection. + +## Editing the config from the command line + +The `accept` and `connect` subcommands append an entry to the config file +(creating it if needed), so the config can be built up without editing TOML by +hand. With [reloading](#reloading) on, a running daemon picks the change up +immediately. + +``` +# Add an accept backend named "web", protected by a generated token. +dumbpipe daemon accept web localhost:3000 --secure +# added accept web -> localhost:3000 +# token: MZUW4Z3FOJSWG5DBNVSXG43F + +# Or set the token explicitly. +dumbpipe daemon accept ssh localhost:22 --token hunter2 + +# Add a connect tunnel. The first argument is remote:name. +dumbpipe daemon connect :web 127.0.0.1:8080 --token MZUW4Z3FOJSWG5DBNVSXG43F +``` + +`--secure` generates a random token of 16 base32-encoded bytes and prints it. +`--secure` and `--token` are mutually exclusive. + +## Reloading + +When `reload` is `true` (the default, and what the subcommands write), the +daemon watches the config file with [`notify`](https://crates.io/crates/notify) +and applies changes while running: + +- accept routes are swapped in atomically; +- connect tunnels are reconciled by local address: tunnels that disappeared or + changed are stopped, and newly added ones are bound and started; +- relay and address hints for new remotes are registered. + +A bad edit (unparseable file, a remote that fails to parse, an address that +fails to bind) is logged and skipped; the daemon keeps running with the entries +that are still valid. Set `reload = false` to load the config once at startup +and ignore later changes. + ## Secret key The daemon uses a stable identity so that its endpoint id does not change across @@ -69,35 +124,62 @@ restarts. The secret key is resolved in this order: On startup, after the endpoint comes online, the daemon prints to stdout: -- its endpoint id, as hex, on the first line; -- its ticket on the second line, in the same format the `remote` field accepts, - so it can be pasted into a connecting daemon's config; -- one line per configured `accept` backend, as `accept -> `. +- `short addr:` followed by its endpoint id, as hex; +- `long addr:` followed by its ticket, in the same format the `remote` field + accepts, so it can be pasted into a connecting daemon's config; +- one line per configured tunnel, the same listing `daemon show` prints. ``` -f2e16a92c17a40ceb7bbb6e6f216ad98f59f7708f32ff24bf8ed6335c908bb1d -endpointadzoc2usyf5ebtvxxo3on4qwvwmplh3xbdzs74sl7dwwgnojbc5r2ay... -accept web -> localhost:3000 +short addr: f2e16a92c17a40ceb7bbb6e6f216ad98f59f7708f32ff24bf8ed6335c908bb1d + long addr: endpointadzoc2usyf5ebtvxxo3on4qwvwmplh3xbdzs74sl7dwwgnojbc5r2ay... +connect 127.0.0.1:8080 -> :web [token] +accept web -> localhost:3000 [token] accept ssh -> localhost:22 ``` -The id and the ticket are both valid `remote` values; the ticket additionally -carries the relay and address hints. +A `connect` line reads `connect -> :`, an `accept` +line reads `accept -> `, and `[token]` marks a +token-protected tunnel. The id and the ticket are both valid `remote` values; +the ticket additionally carries the relay and address hints. + +`daemon show` prints the same tunnel listing without starting the daemon. -## Named handshake +## Logging -A single endpoint multiplexes several tunnels, so each stream must say which -backend it is for. On a fresh bidi stream the connecting side writes a named -handshake before any data: +The daemon logs lifecycle events (endpoint binding, tunnel listeners, reloads) +at `info`, and per-forward connect/disconnect events inside tracing spans: ``` -"named" (5 bytes) || name length (u32, big-endian) || name (UTF-8) +incoming{remote=f198fe07e6}:tcp{name=foo target=localhost:31231}: connected +incoming{remote=f198fe07e6}:tcp{name=foo target=localhost:31231}: disconnected +outgoing{remote=69696f31c6}:tcp{name=foo target=127.0.0.1:58084}: connected +outgoing{remote=69696f31c6}:tcp{name=foo target=127.0.0.1:58084}: disconnected ``` -The accepting side reads the prefix, the length (capped to bound allocation), -and the name, then routes the stream to the matching `accept` backend. A stream -whose name has no matching backend is dropped with a warning. This handshake -replaces the plain `"hello"` handshake that the single-tunnel subcommands use. +An accepted iroh connection runs in an `incoming{remote}` span and each +forwarded stream in a child `tcp{name, target}` span, where `target` is the +backend address; the connect side mirrors this with `outgoing{remote}` and +`tcp{name, target}`, where `target` is the local client. `remote` is the short +endpoint id. + +When `RUST_LOG` is unset the daemon defaults to `dumbpipe=info,iroh=info`; set +`RUST_LOG` to override (for example `RUST_LOG=dumbpipe=debug`). + +## Stream header + +The daemon speaks its own ALPN, distinct from the single-tunnel dumbpipe +protocol. A single endpoint multiplexes several tunnels, so each stream begins +with a header that says which backend it is for and carries the optional token. +On a fresh bidi stream the connecting side writes, before any data: + +``` +header length (u32, big-endian) || postcard-encoded { name, token } +``` + +The accepting side reads the length (capped to bound allocation) and decodes the +header, routes the stream to the matching `accept` backend, and checks the token. +A stream whose name has no matching backend, or whose token does not match, is +dropped with a warning. ## Connection handling @@ -110,8 +192,8 @@ hiccup. Because the pool connects by endpoint id, relay and direct-address hints from a ticket-form `remote` are registered with the endpoint as a static address -lookup at startup. This runs alongside the default discovery, so a ticket's -hints let the daemon connect immediately while bare ids fall back to discovery. +lookup. This runs alongside the default discovery, so a ticket's hints let the +daemon connect immediately while bare ids fall back to discovery. ## Shutdown @@ -120,43 +202,29 @@ new connections, cancels its listeners, and closes the endpoint. ## Example -Forward a web server running on one machine to a local port on another, plus a -second tunnel for SSH, all from one daemon on each side. - -On the server machine, `server.toml`: +Forward a web server and an SSH server from one machine to local ports on +another, all from one daemon on each side. -```toml -[[accept]] -name = "web" -addr = "localhost:3000" - -[[accept]] -name = "ssh" -addr = "localhost:22" -``` +On the server machine: ``` +dumbpipe daemon -c server.toml accept web localhost:3000 +dumbpipe daemon -c server.toml accept ssh localhost:22 --secure +# token: MZUW4Z3FOJSWG5DBNVSXG43F dumbpipe daemon -c server.toml -# prints the endpoint id, a ticket, and the accept backends +# short addr: +# long addr: ``` -On the client machine, `client.toml` (using the server's endpoint id): - -```toml -[[connect]] -remote = "" -name = "web" -addr = "127.0.0.1:8080" - -[[connect]] -remote = "" -name = "ssh" -addr = "127.0.0.1:2222" -``` +On the client machine, using the server's id or ticket: ``` +dumbpipe daemon -c client.toml connect :web 127.0.0.1:8080 +dumbpipe daemon -c client.toml connect :ssh 127.0.0.1:2222 --token MZUW4Z3FOJSWG5DBNVSXG43F dumbpipe daemon -c client.toml ``` -The client can now reach the server's web server at `127.0.0.1:8080` and its -SSH server at `127.0.0.1:2222`, both over a single reused iroh connection. +The client can now reach the server's web server at `127.0.0.1:8080` and its SSH +server at `127.0.0.1:2222`, both over a single reused iroh connection. Because +the configs were built with the subcommands, `reload` is on, so the tunnels +added on each side take effect without restarting the daemons. diff --git a/src/daemon.rs b/src/daemon.rs index f687845..6d05987 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -7,8 +7,11 @@ //! A `[[connect]]` entry binds a local TCP port and forwards every accepted //! socket to a remote endpoint under a name. A `[[accept]]` entry forwards //! incoming named streams to a local TCP backend selected by that name. Because -//! one endpoint serves several tunnels, streams carry a name: see -//! [`dumbpipe::HANDSHAKE_NAMED`]. +//! one endpoint serves several tunnels, each stream is prefixed with a [`Header`] +//! carrying the name and an optional token. +//! +//! The daemon speaks its own ALPN ([`DAEMON_ALPN`]), distinct from the +//! single-tunnel dumbpipe protocol. use std::{ collections::HashMap, @@ -18,29 +21,41 @@ use std::{ time::Duration, }; -use clap::Parser; -use dumbpipe::{EndpointTicket, HANDSHAKE_NAMED}; +use arc_swap::ArcSwap; +use clap::{Parser, Subcommand}; +use dumbpipe::EndpointTicket; use iroh::{address_lookup::MemoryLookup, EndpointAddr, EndpointId, SecretKey}; use iroh_util::connection_pool::{ConnectionPool, ConnectionRef, Options}; use n0_error::{bail_any, ensure_any, Result, StdResultExt}; -use serde::Deserialize; +use notify::Watcher; +use serde::{Deserialize, Serialize}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, net::{TcpListener, TcpStream}, select, + sync::mpsc, time::timeout, }; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; -use tracing::{debug, info, warn}; +use tracing::{debug, info, info_span, warn, Instrument}; use crate::CommonArgs; -/// The maximum length of a tunnel name, in bytes. +/// The ALPN spoken by the daemon protocol. +/// +/// Distinct from [`dumbpipe::ALPN`] so a daemon and a plain `dumbpipe connect` +/// never accidentally talk to each other; the daemon also frames every stream +/// with a [`Header`] rather than the fixed handshake. +const DAEMON_ALPN: &[u8] = b"DUMBPIPEDAEMON0"; + +/// The maximum size of an encoded [`Header`], in bytes. /// -/// The accepting side reads the name length from the wire before allocating the -/// name buffer, so this bound keeps a malicious or buggy peer from requesting a -/// huge allocation. -const MAX_NAME_LEN: usize = 1024; +/// The accepting side reads the header length from the wire before allocating, +/// so this bounds the allocation a peer can request. +const MAX_HEADER_LEN: usize = 4096; + +/// The number of random bytes in a `--secure` token, before base32 encoding. +const SECURE_TOKEN_BYTES: usize = 16; /// How long the connection pool keeps an idle connection before closing it. /// @@ -54,46 +69,84 @@ const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30); /// is generous compared to the pool's one-second default. const POOL_CONNECT_TIMEOUT: Duration = Duration::from_secs(15); +/// How long to wait after a config change before reloading, to coalesce the +/// burst of filesystem events a single save produces. +const RELOAD_DEBOUNCE: Duration = Duration::from_millis(200); + /// Arguments for the `daemon` subcommand. #[derive(Parser, Debug)] pub struct DaemonArgs { + #[clap(subcommand)] + pub command: Option, + /// Path to the daemon config file. /// /// Defaults to `/dumbpipe/daemon/daemon.toml`, where `data_dir` /// is the platform data directory (see the `dirs` crate). - #[clap(short = 'c', long)] + #[clap(short = 'c', long, global = true)] pub config: Option, #[clap(flatten)] pub common: CommonArgs, } -/// A remote endpoint reference in the config. -/// -/// Accepts either a bare [`EndpointId`] (hex or base32) or a full -/// [`EndpointTicket`] string that also carries relay and address hints. -#[derive(Debug, Clone)] -struct Remote(EndpointAddr); +/// The daemon subcommands. `start` is the default when none is given. +#[derive(Subcommand, Debug)] +pub enum DaemonCommand { + /// Run the daemon. This is the default when no subcommand is given. + Start, -impl<'de> Deserialize<'de> for Remote { - fn deserialize(deserializer: D) -> std::result::Result - where - D: serde::Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - // A ticket is the more specific form, so try it first and fall back to - // a bare endpoint id. - if let Ok(ticket) = EndpointTicket::from_str(&s) { - return Ok(Self(ticket.endpoint_addr().clone())); - } - let id = EndpointId::from_str(&s).map_err(serde::de::Error::custom)?; - Ok(Self(EndpointAddr::from(id))) - } + /// Add an accept tunnel to the config file. + Accept(AcceptCmd), + + /// Add a connect tunnel to the config file. + Connect(ConnectCmd), + + /// Print the configured connect and accept tunnels. + Show, } -/// The daemon config, parsed from TOML. -#[derive(Debug, Clone, Deserialize)] +/// Arguments for `daemon accept`. +#[derive(Parser, Debug)] +pub struct AcceptCmd { + /// The name that selects this backend. + pub name: String, + + /// The local TCP backend address to forward matching streams to. + pub addr: String, + + /// Require this token from the connecting side. + #[clap(long)] + pub token: Option, + + /// Generate a random token (16 base32 bytes) instead of passing one. + #[clap(long)] + pub secure: bool, +} + +/// Arguments for `daemon connect`. +#[derive(Parser, Debug)] +pub struct ConnectCmd { + /// The remote and name, written as `remote:name`. + /// + /// `remote` is an endpoint id or ticket; `name` selects the accept tunnel + /// on the remote daemon. + pub remote_name: String, + + /// The local TCP address to listen on. + pub addr: String, + + /// Token required by the remote accept tunnel, if any. + #[clap(long)] + pub token: Option, +} + +/// The daemon config, parsed from and written to TOML. +#[derive(Debug, Clone, Serialize, Deserialize)] struct Config { + /// Whether to watch the config file and apply changes while running. + #[serde(default = "default_true")] + reload: bool, /// Outgoing tunnels: local TCP port -> remote endpoint, under a name. #[serde(default)] connect: Vec, @@ -102,49 +155,191 @@ struct Config { accept: Vec, } +impl Default for Config { + fn default() -> Self { + Self { + reload: true, + connect: Vec::new(), + accept: Vec::new(), + } + } +} + +fn default_true() -> bool { + true +} + /// A single `[[connect]]` entry. -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] struct ConnectConfig { - /// The remote endpoint to forward to. - remote: Remote, - /// The name sent in the handshake, used by the remote to route the stream. + /// The remote endpoint id or ticket to forward to. + remote: String, + /// The name sent in the header, used by the remote to route the stream. name: String, /// The local TCP address to listen on. addr: String, + /// Token to present to the remote accept tunnel, if it requires one. + #[serde(default, skip_serializing_if = "Option::is_none")] + token: Option, } /// A single `[[accept]]` entry. -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] struct AcceptConfig { /// The name that selects this backend. name: String, /// The local TCP backend to forward matching streams to. addr: String, + /// Token the connecting side must present, if set. + #[serde(default, skip_serializing_if = "Option::is_none")] + token: Option, } -/// Runs the daemon until interrupted with ctrl-c. +/// The per-stream header, postcard-encoded ahead of any forwarded data. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct Header { + /// The tunnel name, used by the accepting daemon to route the stream. + name: String, + /// The token to authorize the stream, if the connect side has one. + #[serde(default)] + token: Option, +} + +/// A resolved accept route: where to forward and the token it requires. +#[derive(Debug, Clone)] +struct AcceptRoute { + addr: String, + token: Option, +} + +/// Accept routes, keyed by name. Swapped atomically on reload. +type Routes = HashMap; + +/// A running connect tunnel. Dropping it aborts the listener task. +struct ConnectTunnel { + config: ConnectConfig, + _handle: AbortOnDropHandle<()>, +} + +/// Dispatches the `daemon` subcommand. pub(crate) async fn run(args: DaemonArgs) -> Result<()> { + let config_path = match args.config { + Some(path) => path, + None => daemon_dir()?.join("daemon.toml"), + }; + match args.command.unwrap_or(DaemonCommand::Start) { + DaemonCommand::Start => start(config_path, args.common).await, + DaemonCommand::Accept(cmd) => cmd_accept(&config_path, cmd), + DaemonCommand::Connect(cmd) => cmd_connect(&config_path, cmd), + DaemonCommand::Show => cmd_show(&config_path), + } +} + +/// Prints the configured connect and accept tunnels. +fn cmd_show(config_path: &Path) -> Result<()> { + let config = load_or_default_config(config_path)?; + print_tunnels(&config); + Ok(()) +} + +/// Prints one line per configured connect and accept tunnel. +/// +/// Used both by `daemon show` and at startup, so the running set matches what +/// `show` reports. A `[token]` marker flags a token-protected tunnel. +fn print_tunnels(config: &Config) { + for connect in &config.connect { + let token = if connect.token.is_some() { + " [token]" + } else { + "" + }; + println!( + "connect {} -> {}:{}{}", + connect.addr, connect.remote, connect.name, token + ); + } + for accept in &config.accept { + let token = if accept.token.is_some() { + " [token]" + } else { + "" + }; + println!("accept {} -> {}{}", accept.name, accept.addr, token); + } +} + +/// Adds an `[[accept]]` entry to the config file. +fn cmd_accept(config_path: &Path, cmd: AcceptCmd) -> Result<()> { + let token = match (cmd.secure, cmd.token) { + (true, Some(_)) => bail_any!("--secure and --token are mutually exclusive"), + (true, None) => Some(generate_token()), + (false, token) => token, + }; + let mut config = load_or_default_config(config_path)?; + ensure_any!( + !config.accept.iter().any(|a| a.name == cmd.name), + "an accept entry named {:?} already exists", + cmd.name + ); + config.accept.push(AcceptConfig { + name: cmd.name.clone(), + addr: cmd.addr.clone(), + token: token.clone(), + }); + write_config(config_path, &config)?; + println!("added accept {} -> {}", cmd.name, cmd.addr); + if let Some(token) = &token { + println!("token: {token}"); + } + Ok(()) +} + +/// Adds a `[[connect]]` entry to the config file. +fn cmd_connect(config_path: &Path, cmd: ConnectCmd) -> Result<()> { + let Some((remote, name)) = cmd.remote_name.split_once(':') else { + bail_any!("expected remote:name, got {:?}", cmd.remote_name); + }; + ensure_any!( + !remote.is_empty() && !name.is_empty(), + "expected remote:name with both parts present, got {:?}", + cmd.remote_name + ); + // Validate the remote up front so a typo is caught now, not at next start. + parse_remote(remote)?; + let mut config = load_or_default_config(config_path)?; + config.connect.push(ConnectConfig { + remote: remote.to_string(), + name: name.to_string(), + addr: cmd.addr.clone(), + token: cmd.token, + }); + write_config(config_path, &config)?; + println!("added connect {remote}:{name} -> {}", cmd.addr); + Ok(()) +} + +/// Runs the daemon until interrupted with ctrl-c. +async fn start(config_path: PathBuf, common: CommonArgs) -> Result<()> { let dir = daemon_dir()?; - let config_path = args - .config - .clone() - .unwrap_or_else(|| dir.join("daemon.toml")); + // Create an empty config rather than failing when none exists yet. + if !config_path.exists() { + write_config(&config_path, &Config::default())?; + info!(path = %config_path.display(), "created config file"); + } let config = load_config(&config_path)?; let secret_key = load_or_create_secret(&dir)?; - // Seed a static address lookup with the relay and direct-address hints from - // ticket-form remotes. The connection pool connects by endpoint id alone, so - // without this those hints would be lost and only discovery could resolve the - // remote. Bare-id remotes carry no hints and are left to discovery. - let address_lookup = static_address_lookup(&config.connect); + // Keep the memory lookup so reloads can register hints for new remotes. + let memory_lookup = MemoryLookup::new(); + seed_lookup(&memory_lookup, &config.connect); - // The daemon always speaks the dumbpipe ALPN with the named handshake, so - // it ignores `--custom-alpn`. It still honors the bind-address options. + // The daemon always speaks DAEMON_ALPN with a postcard header, so it ignores + // `--custom-alpn`. It still honors the bind-address options. let endpoint = crate::create_endpoint( secret_key, - &args.common, - vec![dumbpipe::ALPN.to_vec()], - address_lookup, + &common, + vec![DAEMON_ALPN.to_vec()], + Some(memory_lookup.clone()), ) .await?; info!(endpoint_id = %endpoint.id(), "daemon endpoint bound"); @@ -156,41 +351,13 @@ pub(crate) async fn run(args: DaemonArgs) -> Result<()> { warn!("failed to connect to the home relay"); } - if config.connect.is_empty() && config.accept.is_empty() { - warn!("config has no [[connect]] or [[accept]] entries"); - } - - // Advertise this daemon's identity on stdout. The id (hex) and the ticket - // are both valid `remote` values for a connecting daemon's config; the - // ticket also carries the relay and address hints. Printed after `online` - // so the ticket includes the home relay. println!("short addr: {}", endpoint.id()); println!(" long addr: {}", EndpointTicket::new(endpoint.addr())); + print_tunnels(&config); - // Route incoming streams by name to a backend address. Reject duplicate - // names up front: a duplicate would silently shadow an earlier backend. - // List each configured accept backend on stdout. - let mut routes: HashMap = HashMap::new(); - for accept in &config.accept { - if routes - .insert(accept.name.clone(), accept.addr.clone()) - .is_some() - { - bail_any!("duplicate accept name {:?}", accept.name); - } - println!("accept {} -> {}", accept.name, accept.addr); - } - - let token = CancellationToken::new(); - // Hold the tasks as abort-on-drop handles so they stop when `run` returns. - let mut tasks: Vec> = Vec::new(); - - // One pool shared by all connect tunnels. It reuses a single connection per - // remote endpoint across TCP streams, keyed by endpoint id, instead of - // dialing afresh for every stream. let pool = ConnectionPool::new( endpoint.clone(), - dumbpipe::ALPN, + DAEMON_ALPN, Options { idle_timeout: POOL_IDLE_TIMEOUT, connect_timeout: POOL_CONNECT_TIMEOUT, @@ -198,117 +365,193 @@ pub(crate) async fn run(args: DaemonArgs) -> Result<()> { }, ); - for connect in config.connect { - // Bind before spawning so a bad address fails startup loudly instead of - // disappearing into a background task's log. - let listener = TcpListener::bind(&connect.addr) - .await - .with_std_context(|_| format!("failed to bind {}", connect.addr))?; - info!(addr = %connect.addr, remote = %connect.remote.0.id, name = %connect.name, "connect listening"); - let pool = pool.clone(); - let token = token.child_token(); - tasks.push(AbortOnDropHandle::new(tokio::spawn(async move { - if let Err(cause) = run_connect(pool, listener, connect, token).await { - warn!("connect listener stopped: {cause}"); - } - }))); - } + // Accept routes live behind an ArcSwap so the accept loop and its per-stream + // handlers always read the current routes; reload swaps in a new map. + let routes = Arc::new(ArcSwap::from_pointee(build_routes(&config.accept))); + let cancel = CancellationToken::new(); + let _accept = AbortOnDropHandle::new(tokio::spawn(run_accept( + endpoint.clone(), + routes.clone(), + cancel.child_token(), + ))); + + // Connect tunnels are keyed by local addr and reconciled on reload. + let mut connects: HashMap = HashMap::new(); + reconcile_connects(&mut connects, &config.connect, &pool).await; + + // Watch the config file when reload is enabled. + let (mut reloads, _watcher) = if config.reload { + let (tx, rx) = mpsc::unbounded_channel(); + let watcher = watch_config(&config_path, tx)?; + info!(path = %config_path.display(), "watching config for changes"); + (Some(rx), Some(watcher)) + } else { + (None, None) + }; - if !routes.is_empty() { - let endpoint = endpoint.clone(); - let routes = Arc::new(routes); - let token = token.child_token(); - tasks.push(AbortOnDropHandle::new(tokio::spawn(async move { - run_accept(endpoint, routes, token).await; - }))); + loop { + select! { + _ = tokio::signal::ctrl_c() => { + info!("got ctrl-c, shutting down"); + break; + } + Some(()) = maybe_recv(&mut reloads) => { + // Coalesce the burst of events a single save produces. + tokio::time::sleep(RELOAD_DEBOUNCE).await; + if let Some(rx) = reloads.as_mut() { + while rx.try_recv().is_ok() {} + } + match load_config(&config_path) { + Ok(config) => { + routes.store(Arc::new(build_routes(&config.accept))); + seed_lookup(&memory_lookup, &config.connect); + reconcile_connects(&mut connects, &config.connect, &pool).await; + info!("reloaded config"); + } + Err(cause) => warn!("failed to reload config: {cause}"), + } + } + } } - tokio::signal::ctrl_c().await.anyerr()?; - info!("got ctrl-c, shutting down"); - token.cancel(); + cancel.cancel(); endpoint.close().await; Ok(()) } -/// Builds a static address lookup from the connect remotes that carry hints. -/// -/// Returns `None` if no remote has relay or direct-address hints, in which case -/// the endpoint relies on discovery alone. -fn static_address_lookup(connect: &[ConnectConfig]) -> Option { - let hints: Vec = connect - .iter() - .map(|c| c.remote.0.clone()) - .filter(|addr| !addr.is_empty()) - .collect(); - (!hints.is_empty()).then(|| MemoryLookup::from_endpoint_info(hints)) +/// Generates a random token: [`SECURE_TOKEN_BYTES`] random bytes, base32 encoded. +fn generate_token() -> String { + let bytes: [u8; SECURE_TOKEN_BYTES] = rand::random(); + data_encoding::BASE32_NOPAD.encode(&bytes) } -/// Returns the daemon data directory, `/dumbpipe/daemon`. -fn daemon_dir() -> Result { - let Some(data_dir) = dirs::data_dir() else { - bail_any!("could not determine the platform data directory"); - }; - Ok(data_dir.join("dumbpipe").join("daemon")) +/// Builds the accept routes from the config, keyed by name. +fn build_routes(accept: &[AcceptConfig]) -> Routes { + let mut routes = Routes::new(); + for entry in accept { + let route = AcceptRoute { + addr: entry.addr.clone(), + token: entry.token.clone(), + }; + if routes.insert(entry.name.clone(), route).is_some() { + warn!(name = %entry.name, "duplicate accept name, later entry overrides"); + } + } + routes } -/// Loads and parses the config file at `path`. -fn load_config(path: &Path) -> Result { - let contents = std::fs::read_to_string(path) - .with_std_context(|_| format!("failed to read config {}", path.display()))?; - toml::from_str(&contents) - .with_std_context(|_| format!("failed to parse config {}", path.display())) +/// Registers relay and address hints from the connect remotes that carry them. +/// +/// Bare-id remotes carry no hints and are left to discovery. Unparseable +/// remotes are logged and skipped so one bad entry does not stop the rest. +fn seed_lookup(lookup: &MemoryLookup, connect: &[ConnectConfig]) { + for entry in connect { + match parse_remote(&entry.remote) { + Ok(addr) if !addr.is_empty() => lookup.add_endpoint_info(addr), + Ok(_) => {} + Err(cause) => warn!(remote = %entry.remote, "invalid remote, skipping: {cause}"), + } + } } -/// Loads the endpoint secret key, generating and persisting one if needed. +/// Reconciles the running connect tunnels with the desired config. /// -/// `IROH_SECRET` takes preference. Otherwise the key is read from -/// `/secret.key` (32-byte lowercase hex), and if that file does not exist -/// a fresh key is generated and written there. -fn load_or_create_secret(dir: &Path) -> Result { - if let Ok(secret) = std::env::var("IROH_SECRET") { - return SecretKey::from_str(&secret).std_context("invalid IROH_SECRET"); +/// Tunnels are keyed by local addr. Tunnels whose addr is gone or whose config +/// changed are stopped; newly desired tunnels are bound and started. A bind +/// failure logs and skips that tunnel rather than stopping the daemon. +async fn reconcile_connects( + connects: &mut HashMap, + desired: &[ConnectConfig], + pool: &ConnectionPool, +) { + let mut want: HashMap<&str, &ConnectConfig> = HashMap::new(); + for entry in desired { + if want.insert(entry.addr.as_str(), entry).is_some() { + warn!(addr = %entry.addr, "duplicate connect addr, ignoring later entry"); + } } - let path = dir.join("secret.key"); - match std::fs::read_to_string(&path) { - Ok(contents) => SecretKey::from_str(contents.trim()) - .with_std_context(|_| format!("invalid secret key in {}", path.display())), - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - let key = SecretKey::generate(); - std::fs::create_dir_all(dir) - .with_std_context(|_| format!("failed to create {}", dir.display()))?; - let hex = data_encoding::HEXLOWER.encode(&key.to_bytes()); - std::fs::write(&path, &hex) - .with_std_context(|_| format!("failed to write {}", path.display()))?; - info!(path = %path.display(), "generated new secret key"); - Ok(key) + + connects.retain(|addr, tunnel| match want.get(addr.as_str()) { + Some(config) if **config == tunnel.config => true, + _ => { + info!(%addr, "stopping connect tunnel"); + false + } + }); + + for (addr, config) in want { + if connects.contains_key(addr) { + continue; + } + match start_connect_tunnel(config.clone(), pool.clone()).await { + Ok(tunnel) => { + connects.insert(addr.to_string(), tunnel); + } + Err(cause) => warn!(%addr, "failed to start connect tunnel: {cause}"), } - Err(e) => Err(e).with_std_context(|_| format!("failed to read {}", path.display())), } } +/// Binds a connect tunnel's local listener and spawns its accept loop. +async fn start_connect_tunnel( + config: ConnectConfig, + pool: ConnectionPool, +) -> Result { + let remote = parse_remote(&config.remote)?; + let listener = TcpListener::bind(&config.addr) + .await + .with_std_context(|_| format!("failed to bind {}", config.addr))?; + info!(addr = %config.addr, remote = %remote.id, name = %config.name, "connect listening"); + let task_config = config.clone(); + let handle = tokio::spawn(async move { + if let Err(cause) = run_connect(pool, listener, remote.id, task_config).await { + warn!("connect tunnel stopped: {cause}"); + } + }); + Ok(ConnectTunnel { + config, + _handle: AbortOnDropHandle::new(handle), + }) +} + /// Listens on a local TCP port and forwards each socket to the remote endpoint. +/// +/// Runs in an `outgoing{remote}` span; each forwarded socket gets a child +/// `tcp{name, target}` span. async fn run_connect( pool: ConnectionPool, listener: TcpListener, + remote_id: EndpointId, config: ConnectConfig, - token: CancellationToken, ) -> Result<()> { - let remote_id = config.remote.0.id; let name = Arc::new(config.name); - loop { - let (tcp_stream, peer) = select! { - res = listener.accept() => res.std_context("error accepting tcp connection")?, - _ = token.cancelled() => break, - }; - let pool = pool.clone(); - let name = name.clone(); - tokio::spawn(async move { - if let Err(cause) = handle_connect(&pool, remote_id, &name, tcp_stream).await { - warn!("error forwarding tcp connection from {peer}: {cause}"); - } - }); + let token = Arc::new(config.token); + async { + loop { + let (tcp_stream, peer) = listener + .accept() + .await + .std_context("error accepting tcp connection")?; + let pool = pool.clone(); + let name = name.clone(); + let token = token.clone(); + let span = info_span!("tcp", name = %name, target = %peer); + tokio::spawn( + async move { + let header = Header { + name: name.as_str().to_string(), + token: (*token).clone(), + }; + if let Err(cause) = handle_connect(&pool, remote_id, header, tcp_stream).await { + warn!("error forwarding tcp connection: {cause}"); + } + } + .instrument(span), + ); + } } - Ok(()) + .instrument(info_span!("outgoing", remote = %remote_id.fmt_short())) + .await } /// Forwards a single accepted TCP socket to the remote endpoint. @@ -320,7 +563,7 @@ async fn run_connect( async fn handle_connect( pool: &ConnectionPool, remote_id: EndpointId, - name: &str, + header: Header, tcp_stream: TcpStream, ) -> Result<()> { let (tcp_recv, tcp_send) = tcp_stream.into_split(); @@ -329,9 +572,11 @@ async fn handle_connect( .open_bi() .await .std_context("error opening bidi stream")?; - write_named_handshake(&mut endpoint_send, name).await?; - crate::forward_bidi(tcp_recv, tcp_send, endpoint_recv, endpoint_send).await?; - Ok(()) + write_header(&mut endpoint_send, &header).await?; + info!("connected"); + let result = crate::forward_bidi(tcp_recv, tcp_send, endpoint_recv, endpoint_send).await; + info!("disconnected"); + result } /// Gets a pooled connection to `remote_id`, retrying the connect once on error. @@ -354,7 +599,7 @@ async fn get_connection(pool: &ConnectionPool, remote_id: EndpointId) -> Result< /// Accepts incoming endpoint connections and routes their streams by name. async fn run_accept( endpoint: iroh::Endpoint, - routes: Arc>, + routes: Arc>, token: CancellationToken, ) { loop { @@ -380,80 +625,228 @@ async fn run_accept( /// Accepts every bidi stream on one incoming connection and routes each by name. /// /// A connecting daemon reuses one connection for many TCP streams, so each -/// stream arrives as a separate bidi stream that must be accepted in turn. +/// stream arrives as a separate bidi stream that must be accepted in turn. Runs +/// in an `incoming{remote}` span; each forwarded stream gets a child +/// `tcp{name, target}` span. async fn handle_connection( accepting: iroh::endpoint::Accepting, - routes: Arc>, + routes: Arc>, ) -> Result<()> { let connection = accepting.await.std_context("error accepting connection")?; let remote_id = connection.remote_id(); - loop { - let (send, recv) = match connection.accept_bi().await { - Ok(stream) => stream, - // The remote closing the connection ends the stream loop normally. - Err(cause) => { - debug!(%remote_id, "connection closed: {cause}"); - break; - } - }; - let routes = routes.clone(); - tokio::spawn(async move { - if let Err(cause) = handle_stream(send, recv, &routes, remote_id).await { - warn!(%remote_id, "error handling stream: {cause}"); - } - }); + async { + loop { + let (send, recv) = match connection.accept_bi().await { + Ok(stream) => stream, + // The remote closing the connection ends the stream loop normally. + Err(cause) => { + debug!("accept_bi ended: {cause}"); + break; + } + }; + let routes = routes.clone(); + tokio::spawn( + async move { + if let Err(cause) = handle_stream(send, recv, &routes).await { + warn!("error handling stream: {cause}"); + } + } + .in_current_span(), + ); + } + Ok(()) } - Ok(()) + .instrument(info_span!("incoming", remote = %remote_id.fmt_short())) + .await } -/// Reads the named handshake from one incoming stream and forwards it. +/// Reads the header from one incoming stream, checks its token, and forwards it. +/// +/// Runs in the incoming connection's span; the forwarded stream gets a child +/// `tcp{name, target}` span. async fn handle_stream( send: noq::SendStream, mut recv: noq::RecvStream, - routes: &HashMap, - remote_id: EndpointId, + routes: &ArcSwap, ) -> Result<()> { - let name = read_named_handshake(&mut recv).await?; - let Some(addr) = routes.get(&name) else { - warn!(%remote_id, %name, "no route for name, dropping stream"); + let header = read_header(&mut recv).await?; + let route = routes.load().get(&header.name).cloned(); + let Some(route) = route else { + warn!(name = %header.name, "no route for name, dropping stream"); return Ok(()); }; - info!(%remote_id, %name, %addr, "forwarding named stream"); - let backend = TcpStream::connect(addr) - .await - .with_std_context(|_| format!("error connecting to backend {addr}"))?; - let (backend_recv, backend_send) = backend.into_split(); - crate::forward_bidi(backend_recv, backend_send, recv, send).await?; - Ok(()) + if let Some(expected) = &route.token { + if header.token.as_deref() != Some(expected.as_str()) { + warn!(name = %header.name, "token mismatch, dropping stream"); + return Ok(()); + } + } + let span = info_span!("tcp", name = %header.name, target = %route.addr); + async move { + let backend = TcpStream::connect(&route.addr) + .await + .with_std_context(|_| format!("error connecting to backend {}", route.addr))?; + info!("connected"); + let (backend_recv, backend_send) = backend.into_split(); + let result = crate::forward_bidi(backend_recv, backend_send, recv, send).await; + info!("disconnected"); + result + } + .instrument(span) + .await } -/// Writes the named handshake: prefix, name length (`u32` big-endian), name. -async fn write_named_handshake(send: &mut W, name: &str) -> Result<()> { +/// Writes the postcard-encoded header, length-prefixed with a big-endian `u32`. +async fn write_header(send: &mut W, header: &Header) -> Result<()> { + let bytes = postcard::to_stdvec(header).std_context("failed to encode header")?; ensure_any!( - name.len() <= MAX_NAME_LEN, - "name too long: {} bytes", - name.len() + bytes.len() <= MAX_HEADER_LEN, + "header too large: {} bytes", + bytes.len() ); - // The bound above keeps this well within u32 range. - let len = name.len() as u32; - send.write_all(&HANDSHAKE_NAMED).await.anyerr()?; + let len = bytes.len() as u32; send.write_all(&len.to_be_bytes()).await.anyerr()?; - send.write_all(name.as_bytes()).await.anyerr()?; + send.write_all(&bytes).await.anyerr()?; Ok(()) } -/// Reads a named handshake written by [`write_named_handshake`]. -async fn read_named_handshake(recv: &mut R) -> Result { - let mut prefix = [0u8; HANDSHAKE_NAMED.len()]; - recv.read_exact(&mut prefix).await.anyerr()?; - ensure_any!(prefix == HANDSHAKE_NAMED, "invalid named handshake"); +/// Reads a header written by [`write_header`]. +async fn read_header(recv: &mut R) -> Result
{ let mut len_buf = [0u8; 4]; recv.read_exact(&mut len_buf).await.anyerr()?; let len = u32::from_be_bytes(len_buf) as usize; - ensure_any!(len <= MAX_NAME_LEN, "name too long: {len} bytes"); - let mut name = vec![0u8; len]; - recv.read_exact(&mut name).await.anyerr()?; - String::from_utf8(name).std_context("name is not valid utf8") + ensure_any!(len <= MAX_HEADER_LEN, "header too large: {len} bytes"); + let mut bytes = vec![0u8; len]; + recv.read_exact(&mut bytes).await.anyerr()?; + postcard::from_bytes(&bytes).std_context("failed to decode header") +} + +/// Parses a remote from an endpoint id (hex or base32) or a full ticket. +fn parse_remote(s: &str) -> Result { + // A ticket is the more specific form, so try it first and fall back to a + // bare endpoint id. + if let Ok(ticket) = EndpointTicket::from_str(s) { + return Ok(ticket.endpoint_addr().clone()); + } + let id = EndpointId::from_str(s).std_context("invalid remote endpoint id or ticket")?; + Ok(EndpointAddr::from(id)) +} + +/// Returns the daemon data directory, `/dumbpipe/daemon`. +fn daemon_dir() -> Result { + let Some(data_dir) = dirs::data_dir() else { + bail_any!("could not determine the platform data directory"); + }; + Ok(data_dir.join("dumbpipe").join("daemon")) +} + +/// Loads and parses the config file at `path`, failing if it does not exist. +fn load_config(path: &Path) -> Result { + let contents = std::fs::read_to_string(path) + .with_std_context(|_| format!("failed to read config {}", path.display()))?; + toml::from_str(&contents) + .with_std_context(|_| format!("failed to parse config {}", path.display())) +} + +/// Loads the config at `path`, returning the default config if it is missing. +fn load_or_default_config(path: &Path) -> Result { + match std::fs::read_to_string(path) { + Ok(contents) => toml::from_str(&contents) + .with_std_context(|_| format!("failed to parse config {}", path.display())), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Config::default()), + Err(e) => Err(e).with_std_context(|_| format!("failed to read config {}", path.display())), + } +} + +/// Writes the config to `path` as TOML, creating the parent directory if needed. +fn write_config(path: &Path, config: &Config) -> Result<()> { + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + std::fs::create_dir_all(parent) + .with_std_context(|_| format!("failed to create {}", parent.display()))?; + } + } + let contents = toml::to_string(config).std_context("failed to serialize config")?; + std::fs::write(path, contents) + .with_std_context(|_| format!("failed to write {}", path.display()))?; + Ok(()) +} + +/// Watches the config file and signals on `tx` when it changes. +/// +/// Watches the parent directory rather than the file, so the watch survives +/// editors that replace the file on save. Events are filtered to the config +/// file's name. +fn watch_config( + config_path: &Path, + tx: mpsc::UnboundedSender<()>, +) -> Result { + let dir = config_path + .parent() + .filter(|p| !p.as_os_str().is_empty()) + .map_or_else(|| PathBuf::from("."), Path::to_path_buf); + let file_name = config_path.file_name().map(ToOwned::to_owned); + let mut watcher = notify::recommended_watcher(move |res: notify::Result| { + let Ok(event) = res else { + return; + }; + // Ignore access events: the reload itself reads the file, and reacting + // to that read would spin a feedback loop. + if matches!(event.kind, notify::EventKind::Access(_)) { + return; + } + let relevant = match &file_name { + Some(name) => event + .paths + .iter() + .any(|p| p.file_name() == Some(name.as_os_str())), + None => true, + }; + if relevant { + // The receiver going away just means we are shutting down. + let _ = tx.send(()); + } + }) + .std_context("failed to create config watcher")?; + watcher + .watch(&dir, notify::RecursiveMode::NonRecursive) + .with_std_context(|_| format!("failed to watch {}", dir.display()))?; + Ok(watcher) +} + +/// Awaits the next reload signal, or never resolves when reload is disabled. +async fn maybe_recv(rx: &mut Option>) -> Option<()> { + match rx { + Some(rx) => rx.recv().await, + None => std::future::pending().await, + } +} + +/// Loads the endpoint secret key, generating and persisting one if needed. +/// +/// `IROH_SECRET` takes preference. Otherwise the key is read from +/// `/secret.key` (32-byte lowercase hex), and if that file does not exist +/// a fresh key is generated and written there. +fn load_or_create_secret(dir: &Path) -> Result { + if let Ok(secret) = std::env::var("IROH_SECRET") { + return SecretKey::from_str(&secret).std_context("invalid IROH_SECRET"); + } + let path = dir.join("secret.key"); + match std::fs::read_to_string(&path) { + Ok(contents) => SecretKey::from_str(contents.trim()) + .with_std_context(|_| format!("invalid secret key in {}", path.display())), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + let key = SecretKey::generate(); + std::fs::create_dir_all(dir) + .with_std_context(|_| format!("failed to create {}", dir.display()))?; + let hex = data_encoding::HEXLOWER.encode(&key.to_bytes()); + std::fs::write(&path, &hex) + .with_std_context(|_| format!("failed to write {}", path.display()))?; + info!(path = %path.display(), "generated new secret key"); + Ok(key) + } + Err(e) => Err(e).with_std_context(|_| format!("failed to read {}", path.display())), + } } #[cfg(test)] @@ -461,24 +854,38 @@ mod tests { use super::*; #[tokio::test] - async fn test_named_handshake_round_trip() { + async fn test_header_round_trip() { + let (mut a, mut b) = tokio::io::duplex(64); + let header = Header { + name: "boo".into(), + token: Some("s3cret".into()), + }; + write_header(&mut a, &header).await.unwrap(); + let back = read_header(&mut b).await.unwrap(); + assert_eq!(back, header); + } + + #[tokio::test] + async fn test_header_round_trip_no_token() { let (mut a, mut b) = tokio::io::duplex(64); - write_named_handshake(&mut a, "boo").await.unwrap(); - let name = read_named_handshake(&mut b).await.unwrap(); - assert_eq!(name, "boo"); + let header = Header { + name: "boo".into(), + token: None, + }; + write_header(&mut a, &header).await.unwrap(); + assert_eq!(read_header(&mut b).await.unwrap(), header); } #[tokio::test] - async fn test_read_named_handshake_rejects_bad_prefix() { + async fn test_read_header_rejects_oversize_length() { let (mut a, mut b) = tokio::io::duplex(64); - a.write_all(b"hello").await.unwrap(); - a.write_all(&0u32.to_be_bytes()).await.unwrap(); - assert!(read_named_handshake(&mut b).await.is_err()); + let len = (MAX_HEADER_LEN as u32) + 1; + a.write_all(&len.to_be_bytes()).await.unwrap(); + assert!(read_header(&mut b).await.is_err()); } #[test] fn test_parse_config() { - // A real endpoint id (32-byte ed25519 public key) in hex. let id = "0".repeat(64); let toml = format!( r#" @@ -486,48 +893,79 @@ mod tests { remote = "{id}" name = "boo" addr = "localhost:13414" + token = "abc" [[accept]] name = "foo" addr = "localhost:31231" - - [[accept]] - name = "bar" - addr = "10.0.0.3:80" "# ); let config: Config = toml::from_str(&toml).unwrap(); + // reload defaults to true when absent. + assert!(config.reload); assert_eq!(config.connect.len(), 1); - assert_eq!(config.connect[0].name, "boo"); - assert_eq!(config.connect[0].addr, "localhost:13414"); - assert_eq!(config.accept.len(), 2); - assert_eq!(config.accept[1].name, "bar"); + assert_eq!(config.connect[0].remote, id); + assert_eq!(config.connect[0].token.as_deref(), Some("abc")); + assert_eq!(config.accept.len(), 1); + assert_eq!(config.accept[0].token, None); } #[test] fn test_parse_config_empty() { let config: Config = toml::from_str("").unwrap(); + assert!(config.reload); assert!(config.connect.is_empty()); assert!(config.accept.is_empty()); } #[test] - fn test_static_address_lookup() { - let id = EndpointId::from_str(&"0".repeat(64)).unwrap(); - let connect = |addr: EndpointAddr| ConnectConfig { - remote: Remote(addr), - name: "n".into(), - addr: "127.0.0.1:1".into(), + fn test_config_round_trip() { + let config = Config { + reload: false, + connect: vec![ConnectConfig { + remote: "0".repeat(64), + name: "boo".into(), + addr: "127.0.0.1:1".into(), + token: Some("tok".into()), + }], + accept: vec![AcceptConfig { + name: "foo".into(), + addr: "127.0.0.1:2".into(), + token: None, + }], }; + let toml = toml::to_string(&config).unwrap(); + let back: Config = toml::from_str(&toml).unwrap(); + assert_eq!(back.reload, config.reload); + assert_eq!(back.connect, config.connect); + assert_eq!(back.accept, config.accept); + } - // A bare id carries no hints, so no static lookup is built. - let bare = connect(EndpointAddr::from(id)); - assert!(static_address_lookup(std::slice::from_ref(&bare)).is_none()); + #[test] + fn test_generate_token_is_16_bytes() { + let token = generate_token(); + let decoded = data_encoding::BASE32_NOPAD + .decode(token.as_bytes()) + .unwrap(); + assert_eq!(decoded.len(), SECURE_TOKEN_BYTES); + } + + #[test] + fn test_build_routes_keeps_token() { + let accept = vec![AcceptConfig { + name: "foo".into(), + addr: "127.0.0.1:1".into(), + token: Some("tok".into()), + }]; + let routes = build_routes(&accept); + assert_eq!(routes.get("foo").unwrap().token.as_deref(), Some("tok")); + } - // A remote with a relay hint produces a static lookup that knows the id. - let relay = "https://relay.example".parse().unwrap(); - let hinted = connect(EndpointAddr::new(id).with_relay_url(relay)); - let lookup = static_address_lookup(std::slice::from_ref(&hinted)).expect("lookup built"); - assert!(lookup.get_endpoint_info(id).is_some()); + #[test] + fn test_parse_remote_accepts_bare_id() { + let id = "0".repeat(64); + let addr = parse_remote(&id).unwrap(); + assert!(addr.is_empty()); + assert!(parse_remote("not-a-real-remote").is_err()); } } diff --git a/src/lib.rs b/src/lib.rs index 129ad69..e0c519d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,18 +10,4 @@ pub const ALPN: &[u8] = b"DUMBPIPEV0"; /// calls accept_bi() must consume it. pub const HANDSHAKE: [u8; 5] = *b"hello"; -/// The handshake prefix for named connections used by the daemon. -/// -/// A single iroh endpoint can multiplex several named tunnels, so the -/// accepting side must learn which backend an incoming stream belongs to. -/// The connecting side opens a bidi stream and writes this prefix, followed -/// by the name length as a big-endian [`u32`], followed by the UTF-8 name: -/// -/// ```text -/// HANDSHAKE_NAMED ("named") || name_len: u32 big-endian || name (UTF-8) -/// ``` -/// -/// It replaces [`HANDSHAKE`] for daemon streams. -pub const HANDSHAKE_NAMED: [u8; 5] = *b"named"; - pub use iroh_tickets::endpoint::EndpointTicket; diff --git a/src/main.rs b/src/main.rs index 718dcdf..e52e52b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -890,8 +890,16 @@ async fn generate_ticket() -> Result<()> { #[tokio::main] async fn main() -> Result<()> { - tracing_subscriber::fmt::init(); let args = Args::parse(); + // The daemon is a long-running process, so give it a useful default log + // filter when RUST_LOG is unset. Other subcommands keep the quiet default. + let filter = match (std::env::var_os("RUST_LOG"), &args.command) { + (None, Commands::Daemon(_)) => { + tracing_subscriber::EnvFilter::new("dumbpipe=info,iroh=info") + } + _ => tracing_subscriber::EnvFilter::from_default_env(), + }; + tracing_subscriber::fmt().with_env_filter(filter).init(); let res = match args.command { Commands::GenerateTicket => generate_ticket().await, Commands::Listen(args) => listen_stdio(args).await, From 52145e5ea1efeb4edfda7ad515b635c44621418a Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 23 Jun 2026 13:22:38 +0200 Subject: [PATCH 4/7] feat: manage the daemon as a user-level service Add service subcommands backed by the service-manager crate, and make a subcommand required so bare `dumbpipe daemon` prints help. - `install` / `uninstall` / `start` / `stop` manage a user-level service (systemd user units, launchd, etc.). `install` runs `daemon run` with an absolute config path and enables start at login. - `run` runs the daemon in the foreground (formerly the default `start`). - The systemd unit is named `dumbpipe.service`. - Default log filter `dumbpipe=info,iroh=info` now applies to `daemon run` specifically. Documented in docs/daemon.md and the README. --- Cargo.lock | 133 +++++++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 1 + README.md | 12 +++-- docs/daemon.md | 42 ++++++++++++---- src/daemon.rs | 130 ++++++++++++++++++++++++++++++++++++++++++++--- src/main.rs | 12 +++-- 6 files changed, 299 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d36d101..13ebecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,15 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "codepage" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48f68d061bc2828ae826206326e61251aca94c1e4a5305cf52d9138639c918b4" +dependencies = [ + "encoding_rs", +] + [[package]] name = "colorchoice" version = "1.0.5" @@ -720,13 +729,33 @@ dependencies = [ "crypto-common 0.2.2", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys 0.3.7", +] + [[package]] name = "dirs" version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" dependencies = [ - "dirs-sys", + "dirs-sys 0.5.0", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users 0.4.6", + "winapi", ] [[package]] @@ -737,7 +766,7 @@ checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" dependencies = [ "libc", "option-ext", - "redox_users", + "redox_users 0.5.2", "windows-sys 0.61.2", ] @@ -794,7 +823,7 @@ dependencies = [ "arc-swap", "clap", "data-encoding", - "dirs", + "dirs 6.0.0", "duct", "hex", "iroh", @@ -807,6 +836,7 @@ dependencies = [ "postcard", "rand", "serde", + "service-manager", "tempfile", "tokio", "tokio-util", @@ -860,6 +890,26 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" +[[package]] +name = "encoding-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87b881ab2524b96a5ce932056c7482ba6152e2226fed3936b3e592adeb95ca6d" +dependencies = [ + "codepage", + "encoding_rs", + "windows-sys 0.52.0", +] + +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "enum-assoc" version = "1.3.0" @@ -1294,6 +1344,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "http" version = "1.4.1" @@ -1960,6 +2019,12 @@ dependencies = [ "libc", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -2818,6 +2883,17 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom 0.2.17", + "libredox", + "thiserror 1.0.69", +] + [[package]] name = "redox_users" version = "0.5.2" @@ -2918,6 +2994,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.52.0", +] + [[package]] name = "rustix" version = "1.1.4" @@ -2927,7 +3016,7 @@ dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.12.1", "windows-sys 0.61.2", ] @@ -3165,6 +3254,22 @@ dependencies = [ "serde", ] +[[package]] +name = "service-manager" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ff6975a4ea07dda326cf122fcc1b5cf6266daad42d1442a666a99fe50f0de9" +dependencies = [ + "cfg-if", + "dirs 4.0.0", + "encoding-utils", + "encoding_rs", + "log", + "plist", + "which", + "xml-rs", +] + [[package]] name = "sha1_smol" version = "1.0.1" @@ -3452,7 +3557,7 @@ dependencies = [ "fastrand", "getrandom 0.4.2", "once_cell", - "rustix", + "rustix 1.1.4", "windows-sys 0.61.2", ] @@ -4132,6 +4237,18 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix 0.38.44", +] + [[package]] name = "widestring" version = "1.2.1" @@ -4661,6 +4778,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "xml-rs" +version = "0.8.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ae8337f8a065cfc972643663ea4279e04e7256de865aa66fe25cec5fb912d3f" + [[package]] name = "yoke" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 771710a..103bf41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ postcard = { version = "1.1.1", features = ["use-std"] } notify = "8.0.0" arc-swap = "1.7.1" rand = "0.10" +service-manager = "0.11.0" [dev-dependencies] duct = "1.1.1" diff --git a/README.md b/README.md index d52729d..8f2f52e 100644 --- a/README.md +++ b/README.md @@ -159,16 +159,18 @@ by hand): ``` # On the server: expose a backend, protected by a generated token. dumbpipe daemon accept web localhost:3000 --secure -dumbpipe daemon # run it; prints the endpoint id and a ticket +dumbpipe daemon run # run it; prints the endpoint id and a ticket # On the client: forward a local port to the server's "web" tunnel. dumbpipe daemon connect :web 127.0.0.1:8080 --token -dumbpipe daemon +dumbpipe daemon run ``` -By default the daemon watches its config file and applies changes (added or -removed tunnels) while running. See [docs/daemon.md](docs/daemon.md) for the -config format, tokens, reloading, key handling, and more. +`daemon run` runs in the foreground; `daemon install` / `start` / `stop` / +`uninstall` manage it as a user-level service instead. By default the daemon +watches its config file and applies changes (added or removed tunnels) while +running. See [docs/daemon.md](docs/daemon.md) for the config format, tokens, +reloading, the service commands, key handling, and more. ## Combining Listeners diff --git a/docs/daemon.md b/docs/daemon.md index f928691..19fe378 100644 --- a/docs/daemon.md +++ b/docs/daemon.md @@ -7,24 +7,28 @@ one daemon serves any number of incoming and outgoing tunnels from one endpoint (one identity). ``` -dumbpipe daemon [-c ] [start] +dumbpipe daemon [-c ] run +dumbpipe daemon [-c ] install | uninstall | start | stop dumbpipe daemon [-c ] accept [--token ] [--secure] dumbpipe daemon [-c ] connect : [--token ] dumbpipe daemon [-c ] show ``` -`start` runs the daemon and is the default when no subcommand is given, so -`dumbpipe daemon` and `dumbpipe daemon start` are the same. The `accept` and -`connect` subcommands do not run anything; they edit the config file. `show` -prints the configured tunnels without running anything. +A subcommand is required; `dumbpipe daemon` with none prints help. + +- `run` runs the daemon in the foreground. +- `install` / `uninstall` / `start` / `stop` manage the daemon as a user-level + service (see [Running as a service](#running-as-a-service)). +- `accept` / `connect` edit the config file and do not run anything. +- `show` prints the configured tunnels. ## Config file The config path is taken from `-c/--config` if given, and otherwise defaults to `/dumbpipe/daemon/daemon.toml`, where `` is the platform data directory (`~/.local/share` on Linux, `~/Library/Application Support` on -macOS; see the [`dirs`](https://crates.io/crates/dirs) crate). `start` creates -an empty config there if none exists rather than failing. +macOS; see the [`dirs`](https://crates.io/crates/dirs) crate). `run` creates an +empty config there if none exists rather than failing. ```toml # Watch this file and apply changes while running (see Reloading). @@ -95,6 +99,26 @@ dumbpipe daemon connect :web 127.0.0.1:8080 --token MZUW4Z3 `--secure` generates a random token of 16 base32-encoded bytes and prints it. `--secure` and `--token` are mutually exclusive. +## Running as a service + +The daemon can install itself as a user-level service via +[`service-manager`](https://crates.io/crates/service-manager), which uses the +platform's native service manager (systemd user units on Linux, launchd on +macOS, and so on): + +``` +dumbpipe daemon -c config.toml install # install a service that runs `daemon run -c ` +dumbpipe daemon start # start it +dumbpipe daemon stop # stop it +dumbpipe daemon uninstall # remove it +``` + +`install` records the absolute config path so the service finds it regardless of +its working directory, and enables start at login. The service runs `daemon run` +in the foreground under the service manager. User-level services are not +supported on every platform; `install` fails with an explanation where they are +not. + ## Reloading When `reload` is `true` (the default, and what the subcommands write), the @@ -211,7 +235,7 @@ On the server machine: dumbpipe daemon -c server.toml accept web localhost:3000 dumbpipe daemon -c server.toml accept ssh localhost:22 --secure # token: MZUW4Z3FOJSWG5DBNVSXG43F -dumbpipe daemon -c server.toml +dumbpipe daemon -c server.toml run # short addr: # long addr: ``` @@ -221,7 +245,7 @@ On the client machine, using the server's id or ticket: ``` dumbpipe daemon -c client.toml connect :web 127.0.0.1:8080 dumbpipe daemon -c client.toml connect :ssh 127.0.0.1:2222 --token MZUW4Z3FOJSWG5DBNVSXG43F -dumbpipe daemon -c client.toml +dumbpipe daemon -c client.toml run ``` The client can now reach the server's web server at `127.0.0.1:8080` and its SSH diff --git a/src/daemon.rs b/src/daemon.rs index 6d05987..d1b422d 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -15,6 +15,7 @@ use std::{ collections::HashMap, + ffi::OsString, path::{Path, PathBuf}, str::FromStr, sync::Arc, @@ -29,6 +30,10 @@ use iroh_util::connection_pool::{ConnectionPool, ConnectionRef, Options}; use n0_error::{bail_any, ensure_any, Result, StdResultExt}; use notify::Watcher; use serde::{Deserialize, Serialize}; +use service_manager::{ + RestartPolicy, ServiceInstallCtx, ServiceLabel, ServiceLevel, ServiceManager, ServiceStartCtx, + ServiceStopCtx, ServiceUninstallCtx, +}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, net::{TcpListener, TcpStream}, @@ -73,11 +78,18 @@ const POOL_CONNECT_TIMEOUT: Duration = Duration::from_secs(15); /// burst of filesystem events a single save produces. const RELOAD_DEBOUNCE: Duration = Duration::from_millis(200); +/// The label identifying the daemon service to the platform service manager. +/// +/// A single-component label keeps the systemd unit named `dumbpipe.service` +/// rather than the `{organization}-{application}` form a qualified label yields. +const SERVICE_LABEL: &str = "dumbpipe"; + /// Arguments for the `daemon` subcommand. #[derive(Parser, Debug)] +#[command(arg_required_else_help = true)] pub struct DaemonArgs { #[clap(subcommand)] - pub command: Option, + pub command: DaemonCommand, /// Path to the daemon config file. /// @@ -90,12 +102,24 @@ pub struct DaemonArgs { pub common: CommonArgs, } -/// The daemon subcommands. `start` is the default when none is given. +/// The daemon subcommands. #[derive(Subcommand, Debug)] pub enum DaemonCommand { - /// Run the daemon. This is the default when no subcommand is given. + /// Install the daemon as a user-level service. + Install, + + /// Stop and remove the daemon service. + Uninstall, + + /// Start the installed daemon service. Start, + /// Stop the running daemon service. + Stop, + + /// Run the daemon in the foreground. + Run, + /// Add an accept tunnel to the config file. Accept(AcceptCmd), @@ -227,14 +251,106 @@ pub(crate) async fn run(args: DaemonArgs) -> Result<()> { Some(path) => path, None => daemon_dir()?.join("daemon.toml"), }; - match args.command.unwrap_or(DaemonCommand::Start) { - DaemonCommand::Start => start(config_path, args.common).await, + match args.command { + DaemonCommand::Install => cmd_install(&config_path), + DaemonCommand::Uninstall => cmd_uninstall(), + DaemonCommand::Start => cmd_service_start(), + DaemonCommand::Stop => cmd_service_stop(), + DaemonCommand::Run => run_foreground(config_path, args.common).await, DaemonCommand::Accept(cmd) => cmd_accept(&config_path, cmd), DaemonCommand::Connect(cmd) => cmd_connect(&config_path, cmd), DaemonCommand::Show => cmd_show(&config_path), } } +/// Builds the platform service manager, set to manage user-level services. +fn service_manager() -> Result> { + let mut manager = + ::native().std_context("no supported service manager found")?; + manager + .set_level(ServiceLevel::User) + .std_context("user-level services are not supported on this platform")?; + Ok(manager) +} + +/// The service label for the daemon. +fn service_label() -> ServiceLabel { + SERVICE_LABEL.parse().expect("service label is valid") +} + +/// Installs the daemon as a user-level service that runs `daemon run`. +fn cmd_install(config_path: &Path) -> Result<()> { + let manager = service_manager()?; + let label = service_label(); + let program = + std::env::current_exe().std_context("could not determine the dumbpipe binary path")?; + // Pass an absolute config path so the service finds it regardless of the + // working directory it is launched in. + let config = std::path::absolute(config_path) + .with_std_context(|_| format!("could not resolve config path {}", config_path.display()))?; + manager + .install(ServiceInstallCtx { + label: label.clone(), + program, + args: vec![ + OsString::from("daemon"), + OsString::from("run"), + OsString::from("-c"), + config.clone().into_os_string(), + ], + contents: None, + username: None, + working_directory: None, + environment: None, + autostart: true, + restart_policy: RestartPolicy::default(), + }) + .std_context("failed to install service")?; + println!("installed dumbpipe daemon service {label}"); + println!("config: {}", config.display()); + println!("start it with: dumbpipe daemon start"); + Ok(()) +} + +/// Stops and removes the daemon service. +fn cmd_uninstall() -> Result<()> { + let manager = service_manager()?; + let label = service_label(); + manager + .uninstall(ServiceUninstallCtx { + label: label.clone(), + }) + .std_context("failed to uninstall service")?; + println!("uninstalled dumbpipe daemon service {label}"); + Ok(()) +} + +/// Starts the installed daemon service. +fn cmd_service_start() -> Result<()> { + let manager = service_manager()?; + let label = service_label(); + manager + .start(ServiceStartCtx { + label: label.clone(), + }) + .std_context("failed to start service")?; + println!("started dumbpipe daemon service {label}"); + Ok(()) +} + +/// Stops the running daemon service. +fn cmd_service_stop() -> Result<()> { + let manager = service_manager()?; + let label = service_label(); + manager + .stop(ServiceStopCtx { + label: label.clone(), + }) + .std_context("failed to stop service")?; + println!("stopped dumbpipe daemon service {label}"); + Ok(()) +} + /// Prints the configured connect and accept tunnels. fn cmd_show(config_path: &Path) -> Result<()> { let config = load_or_default_config(config_path)?; @@ -318,8 +434,8 @@ fn cmd_connect(config_path: &Path, cmd: ConnectCmd) -> Result<()> { Ok(()) } -/// Runs the daemon until interrupted with ctrl-c. -async fn start(config_path: PathBuf, common: CommonArgs) -> Result<()> { +/// Runs the daemon in the foreground until interrupted with ctrl-c. +async fn run_foreground(config_path: PathBuf, common: CommonArgs) -> Result<()> { let dir = daemon_dir()?; // Create an empty config rather than failing when none exists yet. if !config_path.exists() { diff --git a/src/main.rs b/src/main.rs index e52e52b..09b2257 100644 --- a/src/main.rs +++ b/src/main.rs @@ -891,12 +891,14 @@ async fn generate_ticket() -> Result<()> { #[tokio::main] async fn main() -> Result<()> { let args = Args::parse(); - // The daemon is a long-running process, so give it a useful default log + // `daemon run` is a long-running process, so give it a useful default log // filter when RUST_LOG is unset. Other subcommands keep the quiet default. - let filter = match (std::env::var_os("RUST_LOG"), &args.command) { - (None, Commands::Daemon(_)) => { - tracing_subscriber::EnvFilter::new("dumbpipe=info,iroh=info") - } + let daemon_run = matches!( + &args.command, + Commands::Daemon(args) if matches!(args.command, daemon::DaemonCommand::Run) + ); + let filter = match (std::env::var_os("RUST_LOG"), daemon_run) { + (None, true) => tracing_subscriber::EnvFilter::new("dumbpipe=info,iroh=info"), _ => tracing_subscriber::EnvFilter::from_default_env(), }; tracing_subscriber::fmt().with_env_filter(filter).init(); From 02b7bdfe43895f9468c5de10f8a8171a5cf8c1ee Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 23 Jun 2026 13:26:22 +0200 Subject: [PATCH 5/7] feat: add daemon status subcommand Add `dumbpipe daemon status`, which reports the service status via service-manager: not installed, running, or stopped. --- docs/daemon.md | 7 ++++--- src/daemon.rs | 22 +++++++++++++++++++++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/docs/daemon.md b/docs/daemon.md index 19fe378..ca92d71 100644 --- a/docs/daemon.md +++ b/docs/daemon.md @@ -8,7 +8,7 @@ one daemon serves any number of incoming and outgoing tunnels from one endpoint ``` dumbpipe daemon [-c ] run -dumbpipe daemon [-c ] install | uninstall | start | stop +dumbpipe daemon [-c ] install | uninstall | start | stop | status dumbpipe daemon [-c ] accept [--token ] [--secure] dumbpipe daemon [-c ] connect : [--token ] dumbpipe daemon [-c ] show @@ -17,8 +17,8 @@ dumbpipe daemon [-c ] show A subcommand is required; `dumbpipe daemon` with none prints help. - `run` runs the daemon in the foreground. -- `install` / `uninstall` / `start` / `stop` manage the daemon as a user-level - service (see [Running as a service](#running-as-a-service)). +- `install` / `uninstall` / `start` / `stop` / `status` manage the daemon as a + user-level service (see [Running as a service](#running-as-a-service)). - `accept` / `connect` edit the config file and do not run anything. - `show` prints the configured tunnels. @@ -109,6 +109,7 @@ macOS, and so on): ``` dumbpipe daemon -c config.toml install # install a service that runs `daemon run -c ` dumbpipe daemon start # start it +dumbpipe daemon status # not installed | stopped | running dumbpipe daemon stop # stop it dumbpipe daemon uninstall # remove it ``` diff --git a/src/daemon.rs b/src/daemon.rs index d1b422d..bafe246 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -32,7 +32,7 @@ use notify::Watcher; use serde::{Deserialize, Serialize}; use service_manager::{ RestartPolicy, ServiceInstallCtx, ServiceLabel, ServiceLevel, ServiceManager, ServiceStartCtx, - ServiceStopCtx, ServiceUninstallCtx, + ServiceStatus, ServiceStatusCtx, ServiceStopCtx, ServiceUninstallCtx, }; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, @@ -117,6 +117,9 @@ pub enum DaemonCommand { /// Stop the running daemon service. Stop, + /// Show the daemon service status. + Status, + /// Run the daemon in the foreground. Run, @@ -256,6 +259,7 @@ pub(crate) async fn run(args: DaemonArgs) -> Result<()> { DaemonCommand::Uninstall => cmd_uninstall(), DaemonCommand::Start => cmd_service_start(), DaemonCommand::Stop => cmd_service_stop(), + DaemonCommand::Status => cmd_service_status(), DaemonCommand::Run => run_foreground(config_path, args.common).await, DaemonCommand::Accept(cmd) => cmd_accept(&config_path, cmd), DaemonCommand::Connect(cmd) => cmd_connect(&config_path, cmd), @@ -351,6 +355,22 @@ fn cmd_service_stop() -> Result<()> { Ok(()) } +/// Prints the daemon service status. +fn cmd_service_status() -> Result<()> { + let manager = service_manager()?; + let label = service_label(); + let status = manager + .status(ServiceStatusCtx { label }) + .std_context("failed to query service status")?; + match status { + ServiceStatus::NotInstalled => println!("not installed"), + ServiceStatus::Running => println!("running"), + ServiceStatus::Stopped(Some(reason)) => println!("stopped: {reason}"), + ServiceStatus::Stopped(None) => println!("stopped"), + } + Ok(()) +} + /// Prints the configured connect and accept tunnels. fn cmd_show(config_path: &Path) -> Result<()> { let config = load_or_default_config(config_path)?; From e8101d0ac4a00a9007addb309ef7b385962a3a9b Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 23 Jun 2026 14:13:37 +0200 Subject: [PATCH 6/7] improve errors --- docs/daemon.md | 11 ++++++++--- src/daemon.rs | 44 ++++++++++++++++++++++++++------------------ src/main.rs | 4 +++- 3 files changed, 37 insertions(+), 22 deletions(-) diff --git a/docs/daemon.md b/docs/daemon.md index ca92d71..2f4a375 100644 --- a/docs/daemon.md +++ b/docs/daemon.md @@ -116,9 +116,14 @@ dumbpipe daemon uninstall # remove it `install` records the absolute config path so the service finds it regardless of its working directory, and enables start at login. The service runs `daemon run` -in the foreground under the service manager. User-level services are not -supported on every platform; `install` fails with an explanation where they are -not. +in the foreground under the service manager. No elevated permissions are +required: the service is installed at the user level (a systemd user unit, a +launchd user agent, and so on). + +User-level services are not supported on every platform; failures print the +underlying service-manager error. On macOS in particular, launchd only loads a +user agent inside a GUI login session, so `install` over SSH fails: run it from a +Terminal in the desktop session. ## Reloading diff --git a/src/daemon.rs b/src/daemon.rs index bafe246..75685ee 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -292,24 +292,32 @@ fn cmd_install(config_path: &Path) -> Result<()> { // working directory it is launched in. let config = std::path::absolute(config_path) .with_std_context(|_| format!("could not resolve config path {}", config_path.display()))?; - manager - .install(ServiceInstallCtx { - label: label.clone(), - program, - args: vec![ - OsString::from("daemon"), - OsString::from("run"), - OsString::from("-c"), - config.clone().into_os_string(), - ], - contents: None, - username: None, - working_directory: None, - environment: None, - autostart: true, - restart_policy: RestartPolicy::default(), - }) - .std_context("failed to install service")?; + let result = manager.install(ServiceInstallCtx { + label: label.clone(), + program, + args: vec![ + OsString::from("daemon"), + OsString::from("run"), + OsString::from("-c"), + config.clone().into_os_string(), + ], + contents: None, + username: None, + working_directory: None, + environment: None, + autostart: true, + restart_policy: RestartPolicy::default(), + }); + if result.is_err() && cfg!(target_os = "macos") { + // A user-level launchd agent only loads in a GUI login session, which an + // SSH session lacks. No elevated permissions are needed. + eprintln!( + "note: a user service uses launchd, which only loads agents in a GUI \ + login session that an SSH session does not have. Run `dumbpipe daemon \ + install` from a Terminal in the desktop session; sudo is not required." + ); + } + result.std_context("failed to install service")?; println!("installed dumbpipe daemon service {label}"); println!("config: {}", config.display()); println!("start it with: dumbpipe daemon start"); diff --git a/src/main.rs b/src/main.rs index 09b2257..d1314ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -920,7 +920,9 @@ async fn main() -> Result<()> { match res { Ok(()) => std::process::exit(0), Err(e) => { - eprintln!("error: {e}"); + // The alternate form prints the full source chain, so wrapped errors + // (e.g. the underlying launchctl/systemctl failure) stay visible. + eprintln!("error: {e:#}"); std::process::exit(1) } } From e3aa7492e750aa72905780c5a01084a4de37c882 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 23 Jun 2026 15:42:05 +0200 Subject: [PATCH 7/7] feat: install the daemon service over SSH on macOS service-manager loads the launchd agent with `launchctl load`, which targets the GUI login session and aborts over SSH. Fall back to managing the agent in the per-user (Background) launchd domain that an SSH session has, so install/start/stop/status/uninstall work over SSH without elevated permissions. status also checks the per-user domain, since service-manager queries the GUI domain and would report not installed. --- docs/daemon.md | 13 +++- src/daemon.rs | 179 +++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 160 insertions(+), 32 deletions(-) diff --git a/docs/daemon.md b/docs/daemon.md index 2f4a375..daa8551 100644 --- a/docs/daemon.md +++ b/docs/daemon.md @@ -120,10 +120,17 @@ in the foreground under the service manager. No elevated permissions are required: the service is installed at the user level (a systemd user unit, a launchd user agent, and so on). +On macOS the commands also work over SSH. The default `launchctl load` path +targets the GUI login session, which an SSH connection does not have, so the +commands fall back to managing the agent in the per-user (Background) launchd +domain (`user/`) that an SSH session does have. This still needs no elevated +permissions. One caveat: a per-user agent runs while that user has a session and +is not guaranteed to restart after a reboot with nobody logged in; for +unconditional restart on a headless machine, a system-level daemon is required, +which does need elevated permissions to install. + User-level services are not supported on every platform; failures print the -underlying service-manager error. On macOS in particular, launchd only loads a -user agent inside a GUI login session, so `install` over SSH fails: run it from a -Terminal in the desktop session. +underlying service-manager error. ## Reloading diff --git a/src/daemon.rs b/src/daemon.rs index 75685ee..2628c41 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -292,7 +292,7 @@ fn cmd_install(config_path: &Path) -> Result<()> { // working directory it is launched in. let config = std::path::absolute(config_path) .with_std_context(|_| format!("could not resolve config path {}", config_path.display()))?; - let result = manager.install(ServiceInstallCtx { + let installed = manager.install(ServiceInstallCtx { label: label.clone(), program, args: vec![ @@ -308,16 +308,12 @@ fn cmd_install(config_path: &Path) -> Result<()> { autostart: true, restart_policy: RestartPolicy::default(), }); - if result.is_err() && cfg!(target_os = "macos") { - // A user-level launchd agent only loads in a GUI login session, which an - // SSH session lacks. No elevated permissions are needed. - eprintln!( - "note: a user service uses launchd, which only loads agents in a GUI \ - login session that an SSH session does not have. Run `dumbpipe daemon \ - install` from a Terminal in the desktop session; sudo is not required." - ); - } - result.std_context("failed to install service")?; + // service-manager loads the agent via `launchctl load`, which aborts over + // SSH because that targets the GUI domain. The plist is written before that + // step, so fall back to bootstrapping it into the per-user domain. + #[cfg(target_os = "macos")] + let installed = installed.or_else(|_| macos::install(&label)); + installed.std_context("failed to install service")?; println!("installed dumbpipe daemon service {label}"); println!("config: {}", config.display()); println!("start it with: dumbpipe daemon start"); @@ -328,11 +324,12 @@ fn cmd_install(config_path: &Path) -> Result<()> { fn cmd_uninstall() -> Result<()> { let manager = service_manager()?; let label = service_label(); - manager - .uninstall(ServiceUninstallCtx { - label: label.clone(), - }) - .std_context("failed to uninstall service")?; + let removed = manager.uninstall(ServiceUninstallCtx { + label: label.clone(), + }); + #[cfg(target_os = "macos")] + let removed = removed.or_else(|_| macos::uninstall(&label)); + removed.std_context("failed to uninstall service")?; println!("uninstalled dumbpipe daemon service {label}"); Ok(()) } @@ -341,11 +338,12 @@ fn cmd_uninstall() -> Result<()> { fn cmd_service_start() -> Result<()> { let manager = service_manager()?; let label = service_label(); - manager - .start(ServiceStartCtx { - label: label.clone(), - }) - .std_context("failed to start service")?; + let started = manager.start(ServiceStartCtx { + label: label.clone(), + }); + #[cfg(target_os = "macos")] + let started = started.or_else(|_| macos::start(&label)); + started.std_context("failed to start service")?; println!("started dumbpipe daemon service {label}"); Ok(()) } @@ -354,11 +352,12 @@ fn cmd_service_start() -> Result<()> { fn cmd_service_stop() -> Result<()> { let manager = service_manager()?; let label = service_label(); - manager - .stop(ServiceStopCtx { - label: label.clone(), - }) - .std_context("failed to stop service")?; + let stopped = manager.stop(ServiceStopCtx { + label: label.clone(), + }); + #[cfg(target_os = "macos")] + let stopped = stopped.or_else(|_| macos::stop(&label)); + stopped.std_context("failed to stop service")?; println!("stopped dumbpipe daemon service {label}"); Ok(()) } @@ -367,9 +366,17 @@ fn cmd_service_stop() -> Result<()> { fn cmd_service_status() -> Result<()> { let manager = service_manager()?; let label = service_label(); - let status = manager - .status(ServiceStatusCtx { label }) - .std_context("failed to query service status")?; + let status = manager.status(ServiceStatusCtx { + label: label.clone(), + }); + // service-manager queries the GUI domain and reports a user-domain service as + // not installed, so on macOS also check the per-user domain. + #[cfg(target_os = "macos")] + let status = match status { + Ok(ServiceStatus::NotInstalled) | Err(_) => macos::status(&label), + installed => installed, + }; + let status = status.std_context("failed to query service status")?; match status { ServiceStatus::NotInstalled => println!("not installed"), ServiceStatus::Running => println!("running"), @@ -379,6 +386,120 @@ fn cmd_service_status() -> Result<()> { Ok(()) } +/// macOS fallbacks that manage the launchd agent in the per-user (Background) +/// domain. +/// +/// An SSH session has the `user/` domain but not the `gui/` domain +/// that `launchctl load` (used by service-manager) targets, so the standard +/// path aborts over SSH. These run the equivalent modern, domain-targeted +/// `launchctl` commands against `user/`. No elevated permissions are +/// required. +#[cfg(target_os = "macos")] +mod macos { + use std::{io, path::PathBuf, process::Command}; + + use service_manager::{ServiceLabel, ServiceStatus}; + + /// Runs `launchctl` with `args`, turning a non-zero exit into an error that + /// carries the command's stderr. + fn launchctl(args: &[&str]) -> io::Result<()> { + let output = Command::new("launchctl").args(args).output()?; + if output.status.success() { + Ok(()) + } else { + Err(io::Error::other(format!( + "launchctl {} failed: {}", + args.join(" "), + String::from_utf8_lossy(&output.stderr).trim() + ))) + } + } + + /// Returns the current user id via `id -u`. + fn uid() -> io::Result { + let output = Command::new("id").arg("-u").output()?; + if !output.status.success() { + return Err(io::Error::other("failed to run `id -u`")); + } + String::from_utf8_lossy(&output.stdout) + .trim() + .parse() + .map_err(|_| io::Error::other("could not parse the user id")) + } + + /// The path of the launchd agent plist that service-manager writes. + fn plist_path(label: &ServiceLabel) -> io::Result { + let home = dirs::home_dir() + .ok_or_else(|| io::Error::other("could not find the home directory"))?; + Ok(home + .join("Library") + .join("LaunchAgents") + .join(format!("{}.plist", label.to_qualified_name()))) + } + + /// The `user//