From 5219faa20fb5c94bcfde1f29bc76847cf9584b48 Mon Sep 17 00:00:00 2001 From: h7x4 Date: Sat, 23 May 2026 03:12:03 +0900 Subject: [PATCH] Initial commit --- .envrc | 1 + .gitignore | 3 + Cargo.lock | 783 ++++++++++++++++++++++++++++ Cargo.toml | 47 ++ LICENSE | 21 + README.md | 67 +++ flake.lock | 48 ++ flake.nix | 81 +++ nix/default.nix | 34 ++ src/bin/bro-client.rs | 574 +++++++++++++++++++++ src/bin/bro-server.rs | 1147 +++++++++++++++++++++++++++++++++++++++++ src/common.rs | 71 +++ src/lib.rs | 2 + src/protocol.rs | 828 +++++++++++++++++++++++++++++ 14 files changed, 3707 insertions(+) create mode 100644 .envrc create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 flake.lock create mode 100644 flake.nix create mode 100644 nix/default.nix create mode 100644 src/bin/bro-client.rs create mode 100644 src/bin/bro-server.rs create mode 100644 src/common.rs create mode 100644 src/lib.rs create mode 100644 src/protocol.rs diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..3550a30 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use flake diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..27012f2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +result +result-* diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..8d7e332 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,783 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys", +] + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "bitflags" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" + +[[package]] +name = "bro" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "log", + "nix", + "sd-notify", + "sendfd", + "serde", + "serde_json", + "signal-hook", + "tempfile", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + +[[package]] +name = "clap" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + +[[package]] +name = "hashbrown" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + +[[package]] +name = "indexmap" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" +dependencies = [ + "equivalent", + "hashbrown 0.17.1", + "serde", + "serde_core", +] + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + +[[package]] +name = "libc" +version = "0.2.186" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" + +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "nix" +version = "0.31.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf20d2fde8ff38632c426f1165ed7436270b44f199fc55284c38276f9db47c3d" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + +[[package]] +name = "sd-notify" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e4ef7359e694bfaf1dd27a30f9d760b54c00dfae9f19bd0c05a39bc9128fe76" +dependencies = [ + "libc", +] + +[[package]] +name = "semver" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" + +[[package]] +name = "sendfd" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b183bfd5b1bc64ab0c1ef3ee06b008a9ef1b68a7d3a99ba566fbfe7a7c6d745b" +dependencies = [ + "libc", +] + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.150" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "signal-hook" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a0c28ca5908dbdbcd52e6fdaa00358ab88637f8ab33e1f188dd510eb44b53d" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom", + "once_cell", + "rustix", + "windows-sys", +] + +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + +[[package]] +name = "wasip2" +version = "1.0.3+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" +dependencies = [ + "wit-bindgen 0.57.1", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", +] + +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..c9bd4e1 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "bro" +version = "0.1.0" +edition = "2024" +license = "MIT" +authors = [ + "projects@pvv.ntnu.no", +] +homepage = "https://git.pvv.ntnu.no/Projects/bro" +repository = "https://git.pvv.ntnu.no/Projects/bro" +description = "Remote execution tool" +readme = "README.md" +autobins = false +autolib = false + +[lib] +path = "src/lib.rs" +bench = false + +[dependencies] +anyhow = "1.0.102" +clap = { version = "4.6.1", features = ["derive", "env"] } +sd-notify = "0.5.0" +serde = { version = "1.0.228", features = ["derive"] } +log = "0.4.28" +nix = { version = "0.31.3", features = ["poll", "event"] } +sendfd = "0.4.4" +serde_json = "1.0.150" +signal-hook = "0.4.4" +tempfile = "3.27.0" +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.23", features = ["env-filter", "fmt"] } + +[[bin]] +name = "bro-client" +bench = false +path = "src/bin/bro-client.rs" + +[[bin]] +name = "bro-server" +bench = false +path = "src/bin/bro-server.rs" + +[profile.release] +strip = true +lto = true +codegen-units = 1 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..cfbaa20 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Programvareverkstedet + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..f5d59bf --- /dev/null +++ b/README.md @@ -0,0 +1,67 @@ +# bro + +bro is a remote execution tool for Linux. It is built to execute commands outside of a sandbox. It does this by transporting all details of the command (argv, stdin, etc.) to a remote server, which executes and returns the results (stdout, stderr, and exit code). + +## Usage + +### Client + +The client is for the most part configured only via environment variables. This is to make sure that any arguments and stdin contents can be passed to the remote server without any issues. The configuration variables are: + +| Variable | Description | +| --- | --- | +| `BRO_SOCKET_PATH` (required) | The path to the socket file of the bro server. | +| `BRO_FORWARD_ENV` (optional) | A comma-separated list of environment variables to forward to the remote server. | +| `BRO_FILE_FLAGS` (optional) | A comma-separated list of flags that might have a value that is a file path, and which should be forwarded to the remote server. Note that the values include dashes, e.g. `-f,--file`. | +| `BRO_FILE_ARGS` (optional) | If set to a non-empty value, all non-flag arguments that look like file paths, smell like file paths, or can somehow be opened as a file will be forwarded to the remote server. | +| `BRO_CAPTURE_TTY_STDIN` (optional) | By default, bro will only capture stdin if it detects that it is passed through a pipe or redirection of some sort. Setting this variable to a non-empty value will make bro capture stdin also when it is a TTY. Useful for interactive use. | + +The intended way to set these variables is by creating a wrapper script around the `bro-client` executable. That also lets you name this wrapper script the same as the executable you want to call remotely, so that you can transparently replace it. + +Here is an example wrapper for the `ls` command: + +```bash +#!/bin/bash + +export BRO_SOCKET_PATH="/run/bro.sock" +export BRO_FORWARD_ENV="LS_COLORS,TIME_STYLE,QUOTING_STYLE" +export BRO_FILE_ARGS=1 + +exec bro-client "$@" +``` + +#### A note on debugging + +You can set the `RUST_LOG` environment variable to `debug` to get debug logs from the client. The logs will be written to stderr. + +### Server + +The server can be configured via command line flags (or corresponding environment variables, see `--help`). + +Here are some of the more important flags: + +| Flag | Description | +| --- | --- | +| `--executable` (required) | The path to the executable that the server will run. | +| `--socket-path` (required*) | The path to the socket file that the server will listen on. This should be omitted if you are using socket activation. | +| `--systemd-socket` (required*) | If set, the server will look for a socket fd provided by systemd. This should be omitted if you are setting the socket path. | +| `--fd-passing` (optional) | If set, the server will use file descriptor passing mode instead of the default file content passing mode. | +| `--allowed-env` (optional) | A comma-separated whitelist of client-forwarded environment variables that the server may pass through to the spawned process. | +| `--inherit-env` (optional) | A comma-separated whitelist of serverside environment variables to inherit into the spawned process. By default, the spawned process inherits none of the server's environment variables. | + +#### Systemd socket activation + +The server can be run as a [systemd socket-activated service](http://0pointer.de/blog/projects/socket-activation.html). +This is the recommended way to run the server. + +#### File descriptor passing mode + +In the default mode, the client will take its stdin and any files that are passed via arguments and read them fully before sending them to the server. +The server will then write these contents (apart from stdin) to temporary files and rewrite the arguments to point to these files before executing the command. +This is relatively OS-agnostic, but for some applications it will introduce unnecessary overhead, especially if the executable being called is only going +to read a small portion of the files or stdin. + +In file descriptor passing mode, the client uses the `SCM_RIGHTS` mechanism to transfer the file descriptors of its stdin and any files passed via arguments to the server. The server can then use these file descriptors directly when executing the command, without needing to read and write the contents to temporary files. +This mode might be more efficient for certain applications, but it is also heavily Linux-specific. + +If you use this mode with systemd socket activation, remember to set the `AcceptFileDescriptors=` option in the socket unit file. diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..1fcdfee --- /dev/null +++ b/flake.lock @@ -0,0 +1,48 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1778869304, + "narHash": "sha256-30sZNZoA1cqF5JNO9fVX+wgiQYjB7HJqqJ4ztCDeBZE=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "d233902339c02a9c334e7e593de68855ad26c4cb", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs", + "rust-overlay": "rust-overlay" + } + }, + "rust-overlay": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1779419951, + "narHash": "sha256-dMX0PUslUHPajP6o8FEoRdFv9afq/dec4POR0vVfjK4=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "5b5c521d6cae9ef4aa32f888eb2c0ce595c9be52", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..1742d11 --- /dev/null +++ b/flake.nix @@ -0,0 +1,81 @@ +{ + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; + + rust-overlay.url = "github:oxalica/rust-overlay"; + rust-overlay.inputs.nixpkgs.follows = "nixpkgs"; + }; + + outputs = { self, nixpkgs, rust-overlay }: + let + inherit (nixpkgs) lib; + + systems = [ + "x86_64-linux" + "aarch64-linux" + "x86_64-darwin" + "aarch64-darwin" + "armv7l-linux" + ]; + + forAllSystems = f: lib.genAttrs systems (system: let + pkgs = import nixpkgs { + inherit system; + overlays = [ + (import rust-overlay) + ]; + }; + + rust-bin = rust-overlay.lib.mkRustBin { } pkgs.buildPackages; + toolchain = rust-bin.stable.latest.default.override { + extensions = [ "rust-src" "rust-analyzer" "rust-std" ]; + }; + in f system pkgs toolchain); + in { + devShells = forAllSystems (system: pkgs: toolchain: { + default = pkgs.mkShell { + nativeBuildInputs = with pkgs; [ + toolchain + + cargo-nextest + cargo-edit + cargo-deny + ]; + + env.RUST_SRC_PATH = "${toolchain}/lib/rustlib/src/rust/library"; + }; + }); + + overlays = { + default = self.overlays.bro; + bro = final: prev: { + inherit (self.packages.${prev.stdenv.hostPlatform.system}) bro; + }; + }; + + packages = forAllSystems (system: pkgs: _: + let + cargoToml = fromTOML (builtins.readFile ./Cargo.toml); + cargoLock = ./Cargo.lock; + src = lib.fileset.toSource { + root = ./.; + fileset = lib.fileset.unions [ + ./Cargo.toml + ./Cargo.lock + ./src + ]; + }; + in { + default = self.packages.${system}.bro; + + bro = pkgs.callPackage ./nix/default.nix { inherit cargoToml cargoLock src; }; + filteredSource = pkgs.runCommandLocal "filtered-source" { } '' + ln -s ${src} $out + ''; + }); + + checks = forAllSystems (system: pkgs: _: { + inherit (self.packages.${system}) bro; + }); + }; +} diff --git a/nix/default.nix b/nix/default.nix new file mode 100644 index 0000000..14a6009 --- /dev/null +++ b/nix/default.nix @@ -0,0 +1,34 @@ +{ + lib +, rustPlatform +, stdenv +, installShellFiles +, versionCheckHook + +, cargoToml +, cargoLock +, src +}: +let + mainProgram = (lib.head cargoToml.bin).name; + pname = cargoToml.package.name; +in +rustPlatform.buildRustPackage { + inherit pname; + inherit (cargoToml.package) version; + inherit src; + + cargoLock.lockFile = cargoLock; + + # doCheck = true; + # useNextest = true; + # nativeCheckInputs = [ + # versionCheckHook + # ]; + + meta = with lib; { + license = licenses.mit; + platforms = platforms.linux; + inherit mainProgram; + }; +} diff --git a/src/bin/bro-client.rs b/src/bin/bro-client.rs new file mode 100644 index 0000000..e796898 --- /dev/null +++ b/src/bin/bro-client.rs @@ -0,0 +1,574 @@ +use anyhow::{Context, Result, bail}; +use bro::{ + common::{copy_exact, env_var_is_set, env_var_is_truthy, init_tracing, parse_csv_env}, + protocol::*, +}; +use sendfd::SendWithFd; +use std::{ + collections::{BTreeMap, HashMap}, + env, + fs::File, + io::{self, IsTerminal, Seek, Write}, + os::{ + fd::{AsRawFd, RawFd}, + unix::net::UnixStream, + }, + path::{Path, PathBuf}, +}; +use tempfile::NamedTempFile; + +fn main() { + if let Err(error) = init_tracing("bro-client") { + eprintln!("bro-client: failed to initialize tracing: {error:#}"); + std::process::exit(WRAPPER_ERROR_EXIT_CODE); + } + + let exit_code = match run() { + Ok(code) => code, + Err(error) => { + log::error!("bro-client failed: {error:#}"); + WRAPPER_ERROR_EXIT_CODE + } + }; + + log::info!("bro-client exiting with code {exit_code}"); + std::process::exit(exit_code); +} + +fn run() -> Result { + let options = ClientOptions::from_env()?; + log::info!( + "starting bro-client request: socket={} args={} forward_env={} file_flags={} bro_file_args_enabled={}", + options.socket_path.display(), + options.target_args.len(), + options.forward_env.len(), + options.file_flags.len(), + options.file_args_enabled + ); + if options.file_args_enabled { + log::info!( + "BRO_FILE_ARGS is enabled; bro-client will probe non-flag arguments and auto-forward readable regular files" + ); + } else { + log::debug!( + "BRO_FILE_ARGS is disabled; bro-client will only forward paths matched via BRO_FILE_FLAGS" + ); + } + if !options.file_flags.is_empty() { + log::debug!("BRO_FILE_FLAGS configured as: {:?}", options.file_flags); + } + + let transports = choose_request_transports(&options.socket_path); + log::info!( + "selected request transports: uploads={:?} stdin={:?} responses={:?}", + transports.upload_transport, + transports.stdin_transport, + transports.response_transport + ); + + let mut stdin = prepare_stdin(transports.stdin_transport)?; + let request = build_request_header( + &options, + stdin.stdin_size(), + transports.upload_transport, + transports.stdin_transport, + transports.response_transport, + )?; + log::info!( + "prepared request: uploads={} rewrites={} stdin_size={:?}", + request.uploads.len(), + request.header.rewrites.len(), + request.header.stdin_size, + ); + + log::info!( + "connecting to bro-server socket at {}", + options.socket_path.display() + ); + let mut stream = UnixStream::connect(&options.socket_path).with_context(|| { + format!( + "failed to connect to broker socket `{}`", + options.socket_path.display() + ) + })?; + log::info!( + "connected to bro-server socket at {}", + options.socket_path.display() + ); + + write_execute_magic(&mut stream)?; + write_execute_request_header(&mut stream, &request.header)?; + + if request.header.upload_transport == UploadTransport::StreamedBytes { + send_uploaded_file_bytes(&mut stream, &request.uploads)?; + } + + let fd_uploads: &[PreparedUpload] = + if request.header.upload_transport == UploadTransport::PassedFileDescriptors { + &request.uploads[..] + } else { + &request.uploads[..0] + }; + send_passed_descriptors(&stream, fd_uploads, stdin.passed_fd())?; + stdin.send_streamed_bytes_if_needed(&mut stream)?; + stream + .shutdown(std::net::Shutdown::Write) + .context("failed to close the client write side")?; + + log::info!("waiting for response frames from bro-server"); + receive_response(stream, request.header.response_transport) +} + +struct ClientOptions { + socket_path: PathBuf, + forward_env: Vec, + file_flags: Vec, + file_args_enabled: bool, + target_args: Vec, +} + +impl ClientOptions { + fn from_env() -> Result { + let socket_path = env::var_os("BRO_SOCKET_PATH") + .map(PathBuf::from) + .context("BRO_SOCKET_PATH must be set")?; + + Ok(Self { + socket_path, + forward_env: parse_csv_env("BRO_FORWARD_ENV"), + file_flags: parse_csv_env("BRO_FILE_FLAGS"), + file_args_enabled: env_var_is_set("BRO_FILE_ARGS"), + target_args: env::args().skip(1).collect(), + }) + } +} + +struct PreparedRequest { + header: RequestHeader, + uploads: Vec, +} + +struct PreparedUpload { + path: PathBuf, + size: u64, + file: File, +} + +struct RequestTransports { + upload_transport: UploadTransport, + stdin_transport: UploadTransport, + response_transport: ResponseTransport, +} + +enum PreparedStdin { + Streamed { spool: NamedTempFile, size: u64 }, + Passed { fd: RawFd, _owner: Option }, +} + +impl PreparedStdin { + fn stdin_size(&self) -> Option { + match self { + Self::Streamed { size, .. } => Some(*size), + Self::Passed { .. } => None, + } + } + + fn passed_fd(&self) -> Option { + match self { + Self::Streamed { .. } => None, + Self::Passed { fd, .. } => Some(*fd), + } + } + + fn send_streamed_bytes_if_needed(&mut self, stream: &mut UnixStream) -> Result<()> { + match self { + Self::Streamed { spool, size } => { + spool + .rewind() + .context("failed to rewind stdin spool before transmission")?; + copy_exact(spool, stream, *size).context("failed to stream stdin to bro-server") + } + Self::Passed { .. } => Ok(()), + } + } +} + +#[derive(Default)] +struct UploadRegistry { + uploads_by_path: HashMap, + uploads: Vec, + prepared_uploads: Vec, + next_upload_id: u32, +} + +impl UploadRegistry { + fn register(&mut self, path_text: &str, file: File, size: u64) -> Result { + let upload_id = self.next_upload_id; + self.next_upload_id = self + .next_upload_id + .checked_add(1) + .context("too many transport files were requested")?; + + self.uploads_by_path.insert(path_text.to_owned(), upload_id); + self.uploads.push(UploadSpec { + id: upload_id, + original_path: path_text.to_owned(), + size, + }); + self.prepared_uploads.push(PreparedUpload { + path: PathBuf::from(path_text), + size, + file, + }); + Ok(upload_id) + } + + fn into_parts(self) -> (Vec, Vec) { + (self.uploads, self.prepared_uploads) + } +} + +impl UploadRegistrar for UploadRegistry { + fn ensure_upload(&mut self, path_text: &str) -> Result { + if let Some(&existing_id) = self.uploads_by_path.get(path_text) { + return Ok(existing_id); + } + + let (file, size) = probe_regular_upload(path_text)? + .with_context(|| format!("failed to open transport file `{path_text}`"))?; + self.register(path_text, file, size) + } + + fn maybe_ensure_upload(&mut self, path_text: &str) -> Result> { + if let Some(&existing_id) = self.uploads_by_path.get(path_text) { + return Ok(Some(existing_id)); + } + + probe_regular_upload(path_text)? + .map(|(file, size)| self.register(path_text, file, size)) + .transpose() + } +} + +fn probe_regular_upload(path_text: &str) -> Result> { + let file = match File::open(path_text) { + Ok(file) => file, + Err(_) => return Ok(None), + }; + let metadata = match file.metadata() { + Ok(metadata) => metadata, + Err(_) => return Ok(None), + }; + + Ok(metadata.is_file().then_some((file, metadata.len()))) +} + +fn choose_request_transports(socket_path: &Path) -> RequestTransports { + match query_server_features(socket_path) { + Ok(server_features) => RequestTransports { + upload_transport: if server_features + .supports_upload_transport(UploadTransport::PassedFileDescriptors) + { + UploadTransport::PassedFileDescriptors + } else { + UploadTransport::StreamedBytes + }, + stdin_transport: if server_features + .supports_stdin_transport(UploadTransport::PassedFileDescriptors) + { + UploadTransport::PassedFileDescriptors + } else { + UploadTransport::StreamedBytes + }, + response_transport: if server_features + .supports_response_transport(ResponseTransport::BinaryFrames) + { + ResponseTransport::BinaryFrames + } else { + ResponseTransport::JsonMessages + }, + }, + Err(error) => { + log::warn!( + "failed to query server features, falling back to streamed byte uploads and stdin: {error:#}" + ); + RequestTransports { + upload_transport: UploadTransport::StreamedBytes, + stdin_transport: UploadTransport::StreamedBytes, + response_transport: ResponseTransport::JsonMessages, + } + } + } +} + +fn query_server_features(socket_path: &Path) -> Result { + let mut stream = UnixStream::connect(socket_path).with_context(|| { + format!( + "failed to connect to broker socket `{}` for server-features query", + socket_path.display() + ) + })?; + write_server_features_magic(&mut stream)?; + let socket_supports_fd_passing = probe_socket_for_fd_passing(&stream); + let mut server_features = read_server_features_response(&mut stream)?; + + if !socket_supports_fd_passing { + let server_advertised_fd_passing = server_features + .supports_upload_transport(UploadTransport::PassedFileDescriptors) + || server_features.supports_stdin_transport(UploadTransport::PassedFileDescriptors); + if server_advertised_fd_passing { + log::warn!( + "bro-server advertised fd passing, but the socket rejected an SCM_RIGHTS probe; falling back to streamed uploads and stdin" + ); + } + server_features + .upload_transports + .retain(|transport| *transport != UploadTransport::PassedFileDescriptors); + server_features + .stdin_transports + .retain(|transport| *transport != UploadTransport::PassedFileDescriptors); + } + + Ok(server_features) +} + +fn probe_socket_for_fd_passing(stream: &UnixStream) -> bool { + let probe_file = match File::open("/dev/null") { + Ok(file) => file, + Err(error) => { + log::warn!( + "failed to open `/dev/null` for SCM_RIGHTS probe; assuming fd passing is unavailable: {error:#}" + ); + return false; + } + }; + + match stream.send_with_fd(&[0_u8], &[probe_file.as_raw_fd()]) { + Ok(1) => { + log::debug!("socket SCM_RIGHTS probe succeeded"); + true + } + Ok(sent) => { + log::warn!( + "socket SCM_RIGHTS probe wrote {sent} probe byte(s) instead of 1; assuming fd passing is unavailable" + ); + false + } + Err(error) => { + log::debug!("socket SCM_RIGHTS probe failed: {error:#}"); + false + } + } +} + +fn build_request_header( + options: &ClientOptions, + stdin_size: Option, + upload_transport: UploadTransport, + stdin_transport: UploadTransport, + response_transport: ResponseTransport, +) -> Result { + let mut upload_registry = UploadRegistry::default(); + let planned_rewrites = plan_request_rewrites( + &options.target_args, + &RequestPlanningOptions { + file_flags: &options.file_flags, + file_args_enabled: options.file_args_enabled, + }, + &mut upload_registry, + )?; + + if !options.file_flags.is_empty() { + log::info!( + "BRO_FILE_FLAGS rewrote {} argument value(s) for transport", + planned_rewrites.stats.file_flag_rewrite_count + ); + } + if options.file_args_enabled { + log::info!( + "BRO_FILE_ARGS examined {} non-flag argument(s) and auto-forwarded {} readable regular file(s)", + planned_rewrites.stats.auto_file_arg_probe_count, + planned_rewrites.stats.auto_file_arg_rewrite_count, + ); + } + + let (uploads, prepared_uploads) = upload_registry.into_parts(); + let forwarded_env = options + .forward_env + .iter() + .filter_map(|key| env::var(key).ok().map(|value| (key.clone(), value))) + .collect::>(); + + Ok(PreparedRequest { + header: RequestHeader { + args: options.target_args.clone(), + env: forwarded_env, + uploads, + rewrites: planned_rewrites.rewrites, + stdin_size, + upload_transport, + stdin_transport, + response_transport, + }, + uploads: prepared_uploads, + }) +} + +fn send_uploaded_file_bytes(stream: &mut UnixStream, uploads: &[PreparedUpload]) -> Result<()> { + for upload in uploads { + let mut source = &upload.file; + copy_exact(&mut source, stream, upload.size).with_context(|| { + format!( + "failed to stream transport file `{}` to bro-server", + upload.path.display() + ) + })?; + } + + Ok(()) +} + +fn send_passed_descriptors( + stream: &UnixStream, + uploads: &[PreparedUpload], + stdin_fd: Option, +) -> Result<()> { + if uploads.is_empty() && stdin_fd.is_none() { + return Ok(()); + } + + let marker_bytes = passed_fd_marker_bytes(uploads.len(), stdin_fd.is_some()); + + let mut fds = uploads + .iter() + .map(|upload| upload.file.as_raw_fd()) + .collect::>(); + if let Some(stdin_fd) = stdin_fd { + fds.push(stdin_fd); + } + + let sent = stream + .send_with_fd(&marker_bytes, &fds) + .context("failed to send passed file descriptors to bro-server")?; + if sent != marker_bytes.len() { + bail!( + "sent {sent} fd marker bytes but expected to send {}", + marker_bytes.len() + ) + } + + Ok(()) +} + +fn prepare_stdin(stdin_transport: UploadTransport) -> Result { + match stdin_transport { + UploadTransport::StreamedBytes => { + let (spool, size) = spool_stdin()?; + Ok(PreparedStdin::Streamed { spool, size }) + } + UploadTransport::PassedFileDescriptors => prepare_passed_stdin(), + } +} + +fn prepare_passed_stdin() -> Result { + let stdin = io::stdin(); + + if stdin.is_terminal() && !env_var_is_truthy("BRO_CAPTURE_TTY_STDIN") { + log::info!( + "stdin is a terminal; using /dev/null as fd-passed stdin to avoid blocking. Set BRO_CAPTURE_TTY_STDIN=1 to pass terminal stdin through to bro-server" + ); + let devnull = File::open("/dev/null").context("failed to open `/dev/null` for stdin")?; + return Ok(PreparedStdin::Passed { + fd: devnull.as_raw_fd(), + _owner: Some(devnull), + }); + } + + if stdin.is_terminal() { + log::warn!( + "stdin is a terminal and BRO_CAPTURE_TTY_STDIN is enabled; passing terminal stdin directly to bro-server" + ); + } else { + log::info!("using fd passing for stdin; not spooling local stdin before execution"); + } + + Ok(PreparedStdin::Passed { + fd: stdin.as_raw_fd(), + _owner: None, + }) +} + +fn spool_stdin() -> Result<(NamedTempFile, u64)> { + let mut spool = NamedTempFile::new().context("failed to create stdin spool file")?; + let stdin = io::stdin(); + + if stdin.is_terminal() && !env_var_is_truthy("BRO_CAPTURE_TTY_STDIN") { + log::info!( + "stdin is a terminal; using empty stdin to avoid blocking. Set BRO_CAPTURE_TTY_STDIN=1 to read terminal stdin until EOF" + ); + spool + .rewind() + .context("failed to rewind empty stdin spool")?; + return Ok((spool, 0)); + } + + if stdin.is_terminal() { + log::warn!( + "stdin is a terminal and BRO_CAPTURE_TTY_STDIN is enabled; reading stdin until EOF" + ); + } + + let stdin_size = io::copy(&mut stdin.lock(), &mut spool) + .context("failed to spool stdin before transmission")?; + spool + .rewind() + .context("failed to rewind stdin spool after capture")?; + Ok((spool, stdin_size)) +} + +fn receive_response(mut stream: UnixStream, response_transport: ResponseTransport) -> Result { + let stdout_handle = io::stdout(); + let stdout_is_terminal = stdout_handle.is_terminal(); + let mut stdout = stdout_handle.lock(); + let stderr_handle = io::stderr(); + let stderr_is_terminal = stderr_handle.is_terminal(); + let mut stderr = stderr_handle.lock(); + + loop { + let frame = read_response_frame(&mut stream, response_transport)?; + match frame { + ResponseFrame::Stdout(bytes) => { + stdout + .write_all(&bytes) + .context("failed to write remote stdout locally")?; + if stdout_is_terminal { + stdout.flush().context("failed to flush local stdout")?; + } + } + ResponseFrame::Stderr(bytes) => { + stderr + .write_all(&bytes) + .context("failed to write remote stderr locally")?; + if stderr_is_terminal { + stderr.flush().context("failed to flush local stderr")?; + } + } + ResponseFrame::Exit(status) => { + let exit_code = status.to_exit_code(); + log::info!( + "received remote exit status {:?} -> code {}", + status, + exit_code + ); + return Ok(exit_code); + } + ResponseFrame::Error(message) => { + log::warn!("received error response from bro-server: {message}"); + writeln!(stderr, "bro-server: {message}") + .context("failed to print bro-server error")?; + stderr.flush().context("failed to flush local stderr")?; + return Ok(WRAPPER_ERROR_EXIT_CODE); + } + } + } +} diff --git a/src/bin/bro-server.rs b/src/bin/bro-server.rs new file mode 100644 index 0000000..6dac3e6 --- /dev/null +++ b/src/bin/bro-server.rs @@ -0,0 +1,1147 @@ +use anyhow::{Context, Result, bail}; +use bro::{ + common::{copy_exact, init_tracing, parse_csv_list}, + protocol::*, +}; +use clap::{ArgGroup, Parser}; +use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout}; +use sd_notify::NotifyState; +use sendfd::RecvWithFd; +use signal_hook::{ + SigId, + consts::signal::SIGHUP, + low_level::{pipe as signal_pipe, unregister as unregister_signal}, +}; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + env, + ffi::OsString, + fs::{self, File, OpenOptions}, + io::{self, Read, Seek}, + os::{ + fd::{AsRawFd, FromRawFd}, + unix::{ + fs::{FileTypeExt, OpenOptionsExt}, + net::{UnixListener, UnixStream}, + }, + }, + path::{Component, Path, PathBuf}, + process::{Command, Stdio}, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + mpsc, + }, + thread, +}; +use tempfile::{Builder, TempDir}; + +const DEFAULT_MAX_FILES: usize = 64; +const DEFAULT_MAX_FILE_BYTES: u64 = 64 * 1024 * 1024; +const DEFAULT_MAX_STDIN_BYTES: u64 = 64 * 1024 * 1024; +const DEFAULT_MAX_TOTAL_BYTES: u64 = 128 * 1024 * 1024; +const EPOLL_EVENT_CAPACITY: usize = 8; +const EPOLL_LISTENER_DATA: u64 = 1; +const EPOLL_RELOAD_SIGNAL_DATA: u64 = 2; +const IO_BUFFER_SIZE: usize = 64 * 1024; +const RELOAD_FAILED_STATUS: &str = "bro-server reload failed; previous configuration kept"; + +#[derive(Parser, Clone, Debug)] +#[command( + name = "bro-server", + version, + about = "Remote execution server", + long_about = None +)] +#[command( + group( + ArgGroup::new("listener_mode") + .args(["systemd_socket", "socket_path"]) + .required(true) + .multiple(false) + ) +)] +struct ServerCli { + #[arg(long, group = "listener_mode")] + systemd_socket: bool, + + #[arg(long, group = "listener_mode", value_name = "PATH")] + socket_path: Option, + + #[arg(long = "fd-passing", alias = "enable-fd-passing")] + fd_passing: bool, + + #[arg(long, env = "BRO_EXECUTABLE", value_name = "PATH")] + executable: PathBuf, + + #[arg(long, env = "BRO_TMP_ROOT", value_name = "PATH")] + tmp_root: Option, + + #[arg(long = "allowed-env", env = "BRO_ALLOWED_ENV", value_name = "CSV")] + allowed_forward_env: Option, + + #[arg(long = "inherit-env", env = "BRO_INHERIT_ENV", value_name = "CSV")] + inherit_env: Option, + + #[arg(long, env = "BRO_MAX_FILES", default_value_t = DEFAULT_MAX_FILES)] + max_files: usize, + + #[arg(long, env = "BRO_MAX_FILE_BYTES", default_value_t = DEFAULT_MAX_FILE_BYTES)] + max_file_bytes: u64, + + #[arg(long, env = "BRO_MAX_STDIN_BYTES", default_value_t = DEFAULT_MAX_STDIN_BYTES)] + max_stdin_bytes: u64, + + #[arg(long, env = "BRO_MAX_TOTAL_BYTES", default_value_t = DEFAULT_MAX_TOTAL_BYTES)] + max_total_bytes: u64, +} + +#[derive(Clone, Debug)] +enum ListenerMode { + Systemd, + Path(PathBuf), +} + +impl ListenerMode { + fn uses_systemd(&self) -> bool { + match self { + Self::Systemd => true, + Self::Path(_) => false, + } + } + + fn label(&self) -> &'static str { + match self { + Self::Systemd => "systemd", + Self::Path(_) => "socket-path", + } + } +} + +fn main() { + if let Err(error) = init_tracing("bro-server") { + eprintln!("bro-server: failed to initialize tracing: {error:#}"); + std::process::exit(1); + } + + if let Err(error) = run() { + log::error!("bro-server failed: {error:#}"); + std::process::exit(1); + } +} + +struct SignalRegistration { + id: SigId, +} + +impl Drop for SignalRegistration { + fn drop(&mut self) { + unregister_signal(self.id); + } +} + +fn run() -> Result<()> { + let argv: Vec = env::args_os().collect(); + let mut config = ServerConfig::from(ServerCli::parse_from(argv.clone())); + log_server_config(&config); + ensure_tmp_root_exists(&config)?; + + let (mut reload_signal_reader, _reload_signal_registration) = install_reload_signal_waker()?; + let reload_requested = Arc::new(AtomicBool::new(false)); + signal_hook::flag::register(SIGHUP, Arc::clone(&reload_requested)) + .context("failed to install SIGHUP reload flag")?; + + let listener = open_listener(&config)?; + listener + .set_nonblocking(true) + .context("failed to make the listener non-blocking")?; + + let epoll = Epoll::new(EpollCreateFlags::empty()).context("failed to create epoll instance")?; + epoll + .add( + &listener, + EpollEvent::new(EpollFlags::EPOLLIN, EPOLL_LISTENER_DATA), + ) + .context("failed to register listener with epoll")?; + epoll + .add( + &reload_signal_reader, + EpollEvent::new(EpollFlags::EPOLLIN, EPOLL_RELOAD_SIGNAL_DATA), + ) + .context("failed to register reload signal pipe with epoll")?; + + let startup_status = format!("bro-server ready for `{}`", config.executable.display()); + maybe_notify(&config.listener_mode, || { + send_ready_notification(&startup_status) + })?; + log::info!( + "bro-server is ready to accept connections in {} mode", + config.listener_mode.label() + ); + + let mut events = [EpollEvent::empty(); EPOLL_EVENT_CAPACITY]; + loop { + let event_count = epoll + .wait(&mut events, EpollTimeout::NONE) + .context("failed to wait for epoll events")?; + let mut listener_ready = false; + let mut reload_signal_ready = false; + + for event in events.iter().take(event_count) { + match event.data() { + EPOLL_LISTENER_DATA => listener_ready = true, + EPOLL_RELOAD_SIGNAL_DATA => reload_signal_ready = true, + data => log::warn!("epoll returned an unknown event token: {data}"), + } + } + + let drained_reload_signal_bytes = if reload_signal_ready { + drain_reload_signal_stream(&mut reload_signal_reader)? + } else { + 0 + }; + if drained_reload_signal_bytes > 0 || reload_requested.swap(false, Ordering::SeqCst) { + handle_reload_request(&mut config, &argv); + } + + if listener_ready { + accept_ready_connections(&listener, &config)?; + } + } +} + +fn handle_reload_request(config: &mut ServerConfig, argv: &[OsString]) { + log::info!("received SIGHUP; reloading configuration"); + notify_or_warn( + &config.listener_mode, + "failed to notify systemd about reload start", + || send_reload_notification("bro-server is reloading configuration"), + ); + + match reload_server_config(argv) { + Ok(new_config) => match ensure_tmp_root_exists(&new_config) { + Ok(()) => { + *config = new_config; + log_server_config(config); + let ready_status = + format!("bro-server reloaded for `{}`", config.executable.display()); + notify_or_warn( + &config.listener_mode, + "failed to notify systemd after reload", + || send_ready_notification(&ready_status), + ); + log::info!("successfully reloaded configuration"); + } + Err(error) => { + log::error!("failed to prepare reloaded configuration: {error:#}"); + notify_reload_failure(&config.listener_mode); + } + }, + Err(error) => { + log::error!("failed to reload configuration: {error:#}"); + notify_reload_failure(&config.listener_mode); + } + } +} + +fn install_reload_signal_waker() -> Result<(UnixStream, SignalRegistration)> { + let (reader, writer) = + UnixStream::pair().context("failed to create reload signal stream pair")?; + reader + .set_nonblocking(true) + .context("failed to make reload signal reader non-blocking")?; + let id = signal_pipe::register(SIGHUP, writer) + .context("failed to register SIGHUP reload signal pipe")?; + Ok((reader, SignalRegistration { id })) +} + +fn drain_reload_signal_stream(stream: &mut UnixStream) -> Result { + let mut buffer = [0_u8; 64]; + let mut total_bytes: usize = 0; + loop { + match stream.read(&mut buffer) { + Ok(0) => bail!("reload signal stream closed unexpectedly"), + Ok(read_count) => { + total_bytes = total_bytes + .checked_add(read_count) + .context("reload signal byte count overflowed")?; + } + Err(error) if error.kind() == io::ErrorKind::WouldBlock => return Ok(total_bytes), + Err(error) => return Err(error).context("failed to drain reload signal stream"), + } + } +} + +fn accept_ready_connections(listener: &UnixListener, config: &ServerConfig) -> Result<()> { + loop { + match listener.accept() { + Ok((stream, address)) => { + log::info!("accepted client connection from {address:?}"); + if let Err(error) = handle_connection(stream, config) { + log::warn!("connection handling failed: {error:#}"); + } + } + Err(error) if error.kind() == io::ErrorKind::WouldBlock => return Ok(()), + Err(error) => return Err(error).context("failed to accept a connection"), + } + } +} + +#[derive(Clone, Debug)] +struct ServerConfig { + listener_mode: ListenerMode, + fd_passing: bool, + executable: PathBuf, + tmp_root: Option, + allowed_forward_env: Option>, + inherited_env: Option>, + max_files: usize, + max_file_bytes: u64, + max_stdin_bytes: u64, + max_total_bytes: u64, +} + +impl From for ServerConfig { + fn from(value: ServerCli) -> Self { + let listener_mode = if value.systemd_socket { + ListenerMode::Systemd + } else { + ListenerMode::Path( + value + .socket_path + .expect("clap validates the mutually exclusive listener mode flags"), + ) + }; + let allowed_forward_env = parse_optional_csv_set(value.allowed_forward_env); + let inherited_env = parse_optional_csv_set(value.inherit_env); + + Self { + listener_mode, + fd_passing: value.fd_passing, + executable: value.executable, + tmp_root: value.tmp_root, + allowed_forward_env, + inherited_env, + max_files: value.max_files, + max_file_bytes: value.max_file_bytes, + max_stdin_bytes: value.max_stdin_bytes, + max_total_bytes: value.max_total_bytes, + } + } +} + +impl ServerConfig { + fn forwarded_env_is_allowed(&self, key: &str) -> bool { + self.allowed_forward_env + .as_ref() + .is_none_or(|allowed| allowed.contains(key)) + } + + fn inherited_environment(&self) -> BTreeMap { + self.inherited_env + .as_ref() + .map_or_else(BTreeMap::new, |allowed| { + env::vars() + .filter(|(key, _value)| { + allowed.contains(key.as_str()) && !is_control_environment_key(key) + }) + .collect() + }) + } + + fn server_features(&self) -> ServerFeatures { + let mut upload_transports = vec![UploadTransport::StreamedBytes]; + let mut stdin_transports = vec![UploadTransport::StreamedBytes]; + let response_transports = vec![ + ResponseTransport::JsonMessages, + ResponseTransport::BinaryFrames, + ]; + if self.fd_passing { + upload_transports.push(UploadTransport::PassedFileDescriptors); + stdin_transports.push(UploadTransport::PassedFileDescriptors); + } + + ServerFeatures { + upload_transports, + stdin_transports, + response_transports, + } + } +} + +fn log_server_config(config: &ServerConfig) { + match &config.listener_mode { + ListenerMode::Systemd => { + log::info!( + "loaded server configuration: listener_mode={} executable={} fd_passing={}", + config.listener_mode.label(), + config.executable.display(), + config.fd_passing + ); + } + ListenerMode::Path(path) => { + log::info!( + "loaded server configuration: listener_mode={} socket_path={} executable={} fd_passing={}", + config.listener_mode.label(), + path.display(), + config.executable.display(), + config.fd_passing + ); + } + } + + log::debug!( + "server limits and environment configuration: tmp_root={:?} allowed_forward_env_count={} inherited_env_count={} max_files={} max_file_bytes={} max_stdin_bytes={} max_total_bytes={}", + config + .tmp_root + .as_ref() + .map(|path| path.display().to_string()), + config.allowed_forward_env.as_ref().map_or(0, HashSet::len), + config.inherited_env.as_ref().map_or(0, HashSet::len), + config.max_files, + config.max_file_bytes, + config.max_stdin_bytes, + config.max_total_bytes + ); +} + +fn reload_server_config(argv: &[OsString]) -> Result { + let cli = ServerCli::try_parse_from(argv.iter().cloned()) + .context("failed to parse bro-server configuration")?; + Ok(ServerConfig::from(cli)) +} + +fn ensure_tmp_root_exists(config: &ServerConfig) -> Result<()> { + if let Some(tmp_root) = &config.tmp_root { + fs::create_dir_all(tmp_root).with_context(|| { + format!( + "failed to create BRO_TMP_ROOT directory `{}`", + tmp_root.display() + ) + })?; + } + + Ok(()) +} + +fn open_listener(config: &ServerConfig) -> Result { + match &config.listener_mode { + ListenerMode::Systemd => { + log::info!("opening listener from systemd socket activation"); + activated_listener() + } + ListenerMode::Path(path) => { + log::info!("binding listener at unix socket path {}", path.display()); + bound_listener(path) + } + } +} + +fn activated_listener() -> Result { + let mut fds = sd_notify::listen_fds() + .context("failed to inspect systemd socket activation file descriptors")?; + if fds.len() != 1 { + bail!( + "bro-server requires exactly one systemd socket activation file descriptor, got {}", + fds.len() + ) + } + + let fd = fds + .next() + .context("systemd did not pass the activated listener on fd 3")?; + // SAFETY: systemd owns the listening socket and `listen_fds()` returns the raw fd it passed + // to this process. We take ownership of that exact fd once and turn it into a `UnixListener`. + let listener = unsafe { UnixListener::from_raw_fd(fd) }; + Ok(listener) +} + +fn bound_listener(path: &Path) -> Result { + if let Some(parent) = path + .parent() + .filter(|parent| !parent.as_os_str().is_empty()) + { + fs::create_dir_all(parent).with_context(|| { + format!( + "failed to create parent directory for socket `{}`", + path.display() + ) + })?; + } + + match fs::symlink_metadata(path) { + Ok(metadata) if !metadata.file_type().is_socket() => { + bail!( + "socket path `{}` already exists and is not a unix socket", + path.display() + ) + } + Ok(_) => match UnixStream::connect(path) { + Ok(_) => { + bail!("socket path `{}` is already in use", path.display()) + } + Err(error) if error.kind() == io::ErrorKind::ConnectionRefused => { + log::warn!("removing stale unix socket before bind: {}", path.display()); + fs::remove_file(path).with_context(|| { + format!("failed to remove stale socket path `{}`", path.display()) + })?; + } + Err(error) if error.kind() == io::ErrorKind::NotFound => {} + Err(error) => { + return Err(error).with_context(|| { + format!("failed to probe existing socket path `{}`", path.display()) + }); + } + }, + Err(error) if error.kind() == io::ErrorKind::NotFound => {} + Err(error) => { + return Err(error) + .with_context(|| format!("failed to inspect socket path `{}`", path.display())); + } + } + + let listener = UnixListener::bind(path).with_context(|| { + format!( + "failed to bind unix socket listener at `{}`", + path.display() + ) + })?; + log::info!("bound unix socket listener at {}", path.display()); + Ok(listener) +} + +fn maybe_notify(listener_mode: &ListenerMode, notify: F) -> Result<()> +where + F: FnOnce() -> Result<()>, +{ + if listener_mode.uses_systemd() { + notify() + } else { + Ok(()) + } +} + +fn notify_or_warn(listener_mode: &ListenerMode, warning: &str, notify: F) +where + F: FnOnce() -> Result<()>, +{ + if let Err(error) = maybe_notify(listener_mode, notify) { + log::warn!("{warning}: {error:#}"); + } +} + +fn notify_reload_failure(listener_mode: &ListenerMode) { + notify_or_warn( + listener_mode, + "failed to notify systemd about reload failure", + || send_ready_notification(RELOAD_FAILED_STATUS), + ); +} + +fn parse_optional_csv_set(value: Option) -> Option> { + value + .map(|entries| parse_csv_list(&entries).into_iter().collect::>()) + .filter(|entries| !entries.is_empty()) +} + +fn send_ready_notification(status: &str) -> Result<()> { + let states = [NotifyState::Status(status), NotifyState::Ready]; + sd_notify::notify(&states[..]).context("failed to notify systemd that bro-server is ready") +} + +fn send_reload_notification(status: &str) -> Result<()> { + let monotonic = NotifyState::monotonic_usec_now() + .context("failed to read monotonic time for systemd reload notification")?; + let states = [ + NotifyState::Status(status), + NotifyState::Reloading, + monotonic, + ]; + sd_notify::notify(&states[..]).context("failed to notify systemd that bro-server is reloading") +} + +fn handle_connection(mut stream: UnixStream, config: &ServerConfig) -> Result<()> { + match read_connection_kind(&mut stream)? { + ConnectionKind::ServerFeaturesQuery => handle_server_features_query(&mut stream, config), + ConnectionKind::Execute => handle_execute_connection(&mut stream, config), + } +} + +fn handle_server_features_query(stream: &mut UnixStream, config: &ServerConfig) -> Result<()> { + write_server_features_response(stream, &config.server_features()) + .context("failed to send server features") +} + +fn handle_execute_connection(stream: &mut UnixStream, config: &ServerConfig) -> Result<()> { + let header: RequestHeader = match read_execute_request_header(stream) { + Ok(header) => header, + Err(error) => { + log::warn!("failed to read execute request header: {error:#}"); + let _ = write_message(stream, &ResponseFrame::Error(format!("{error:#}"))); + return Err(error); + } + }; + let response_transport = header.response_transport; + + let result = handle_execute_connection_inner(stream, config, header); + if let Err(error) = &result { + log::warn!("request failed; sending error response frame: {error:#}"); + let _ = write_response_frame( + stream, + response_transport, + &ResponseFrame::Error(format!("{error:#}")), + ); + } + result +} + +enum ReceivedUpload { + StagedPath(PathBuf), + PassedFile(File), +} + +impl ReceivedUpload { + fn rewritten_path(&self) -> String { + match self { + Self::StagedPath(path) => path.to_string_lossy().into_owned(), + Self::PassedFile(file) => format!("/proc/self/fd/{}", file.as_raw_fd()), + } + } +} + +struct ReceivedInputs { + uploads: HashMap, + stdin: File, +} + +fn handle_execute_connection_inner( + stream: &mut UnixStream, + config: &ServerConfig, + header: RequestHeader, +) -> Result<()> { + log::info!( + "received request header: args={} uploads={} stdin_size={:?} forwarded_env={} upload_transport={:?} stdin_transport={:?} response_transport={:?}", + header.args.len(), + header.uploads.len(), + header.stdin_size, + header.env.len(), + header.upload_transport, + header.stdin_transport, + header.response_transport, + ); + + validate_request(&header, config)?; + + let tempdir = create_request_tempdir(config)?; + let ReceivedInputs { + uploads: received_uploads, + stdin, + } = receive_request_inputs(stream, &header, tempdir.path())?; + let rewritten_args = header.rewrite_args(|upload_id| { + Ok(received_upload(&received_uploads, upload_id)?.rewritten_path()) + })?; + + execute_request( + stream, + config, + rewritten_args, + stdin, + header.response_transport, + &header.env, + received_uploads, + ) +} + +fn validate_request(header: &RequestHeader, config: &ServerConfig) -> Result<()> { + header.validate_invariants()?; + + if header.upload_transport == UploadTransport::PassedFileDescriptors && !config.fd_passing { + bail!("request asked for fd-passing uploads but the server does not support them") + } + if header.stdin_transport == UploadTransport::PassedFileDescriptors && !config.fd_passing { + bail!("request asked for fd-passing stdin but the server does not support it") + } + + if header.uploads.len() > config.max_files { + bail!( + "request attempted to upload {} files but the server allows only {}", + header.uploads.len(), + config.max_files + ) + } + + if let Some(stdin_size) = header.stdin_size + && stdin_size > config.max_stdin_bytes + { + bail!( + "request stdin size {} exceeds BRO_MAX_STDIN_BYTES ({})", + stdin_size, + config.max_stdin_bytes + ) + } + + let total_file_bytes = header.uploads.iter().try_fold(0_u64, |total, upload| { + if upload.size > config.max_file_bytes { + bail!( + "upload `{}` is {} bytes, which exceeds BRO_MAX_FILE_BYTES ({})", + upload.original_path, + upload.size, + config.max_file_bytes + ) + } + + total + .checked_add(upload.size) + .context("total uploaded file size overflowed") + })?; + + let total_bytes = total_file_bytes + .checked_add(header.stdin_size.unwrap_or(0)) + .context("total request size overflowed")?; + if total_bytes > config.max_total_bytes { + bail!( + "request transfers {} bytes, which exceeds BRO_MAX_TOTAL_BYTES ({})", + total_bytes, + config.max_total_bytes + ) + } + + Ok(()) +} + +fn create_request_tempdir(config: &ServerConfig) -> Result { + let mut builder = Builder::new(); + builder.prefix("bro-"); + + let tempdir = if let Some(root) = &config.tmp_root { + builder + .tempdir_in(root) + .with_context(|| format!("failed to create tempdir inside `{}`", root.display()))? + } else { + builder + .tempdir() + .context("failed to create request tempdir")? + }; + + Ok(tempdir) +} + +fn receive_request_inputs( + stream: &mut UnixStream, + header: &RequestHeader, + temp_root: &Path, +) -> Result { + let mut received_uploads = match header.upload_transport { + UploadTransport::StreamedBytes => { + receive_streamed_uploads(stream, &header.uploads, temp_root)? + } + UploadTransport::PassedFileDescriptors => HashMap::new(), + }; + + let (fd_uploads, fd_stdin) = receive_passed_inputs( + stream, + &header.uploads, + header.upload_transport, + header.stdin_transport, + )?; + received_uploads.extend(fd_uploads); + + let stdin = match header.stdin_transport { + UploadTransport::StreamedBytes => receive_streamed_stdin( + stream, + header + .stdin_size + .context("request used streamed stdin but did not provide stdin_size")?, + temp_root, + )?, + UploadTransport::PassedFileDescriptors => fd_stdin.context( + "request declared fd-passed stdin but no stdin file descriptor was received", + )?, + }; + + Ok(ReceivedInputs { + uploads: received_uploads, + stdin, + }) +} + +fn receive_streamed_uploads( + stream: &mut UnixStream, + uploads: &[UploadSpec], + temp_root: &Path, +) -> Result> { + let mut uploaded_paths = HashMap::with_capacity(uploads.len()); + + for upload in uploads { + let staged_path = staged_upload_path(temp_root, upload); + if let Some(parent) = staged_path.parent() { + fs::create_dir_all(parent).with_context(|| { + format!( + "failed to create directories for uploaded file `{}`", + staged_path.display() + ) + })?; + } + + let mut output = create_private_staged_file(&staged_path).with_context(|| { + format!( + "failed to create staged upload destination `{}`", + staged_path.display() + ) + })?; + copy_exact(stream, &mut output, upload.size).with_context(|| { + format!( + "failed while receiving uploaded file `{}`", + upload.original_path + ) + })?; + + uploaded_paths.insert(upload.id, ReceivedUpload::StagedPath(staged_path)); + } + + Ok(uploaded_paths) +} + +fn receive_passed_inputs( + stream: &mut UnixStream, + uploads: &[UploadSpec], + upload_transport: UploadTransport, + stdin_transport: UploadTransport, +) -> Result<(HashMap, Option)> { + let upload_count = match upload_transport { + UploadTransport::StreamedBytes => 0, + UploadTransport::PassedFileDescriptors => uploads.len(), + }; + let expects_stdin = stdin_transport == UploadTransport::PassedFileDescriptors; + let expected_count = expected_passed_fd_count(upload_transport, uploads.len(), stdin_transport); + + if expected_count == 0 { + return Ok((HashMap::new(), None)); + } + + let PassedDescriptorBatch { + marker_bytes, + files, + } = receive_passed_descriptor_batch(stream, expected_count)?; + validate_passed_fd_marker_bytes(&marker_bytes, upload_count, expects_stdin)?; + + let mut files = files.into_iter(); + let mut received_uploads = HashMap::with_capacity(upload_count); + for upload in uploads.iter().take(upload_count) { + let mut file = files.next().with_context(|| { + format!( + "missing passed file descriptor for uploaded file `{}`", + upload.original_path + ) + })?; + let description = format!("passed file descriptor for `{}`", upload.original_path); + validate_and_rewind_passed_file(&mut file, &description, upload.size)?; + received_uploads.insert(upload.id, ReceivedUpload::PassedFile(file)); + } + + let stdin = if expects_stdin { + Some( + files + .next() + .context("missing passed stdin file descriptor from bro-client")?, + ) + } else { + None + }; + + Ok((received_uploads, stdin)) +} + +struct PassedDescriptorBatch { + marker_bytes: Vec, + files: Vec, +} + +fn receive_passed_descriptor_batch( + stream: &mut UnixStream, + expected_count: usize, +) -> Result { + let mut marker_bytes = vec![0_u8; expected_count]; + let mut raw_fds = vec![0; expected_count]; + let mut received_bytes = 0; + let mut received_fds = 0; + + while received_bytes < expected_count { + let (byte_count, fd_count) = stream + .recv_with_fd( + &mut marker_bytes[received_bytes..], + &mut raw_fds[received_fds..], + ) + .context("failed to receive passed file descriptors from bro-client")?; + + if byte_count == 0 && fd_count == 0 { + bail!("connection closed before all passed file descriptors were received") + } + + received_bytes += byte_count; + received_fds += fd_count; + } + + if received_fds != expected_count { + bail!( + "expected {} passed file descriptors but received {}", + expected_count, + received_fds + ) + } + + let files = raw_fds + .into_iter() + .take(received_fds) + .map(|fd| { + // SAFETY: each raw fd comes from `recv_with_fd`, so ownership was transferred to this process. + unsafe { File::from_raw_fd(fd) } + }) + .collect(); + + Ok(PassedDescriptorBatch { + marker_bytes, + files, + }) +} + +fn validate_and_rewind_passed_file( + file: &mut File, + description: &str, + expected_size: u64, +) -> Result<()> { + let metadata = file + .metadata() + .with_context(|| format!("failed to stat {description}"))?; + if !metadata.is_file() { + bail!("{description} is not a regular file") + } + if metadata.len() != expected_size { + bail!( + "{description} reported size {} but expected {}", + metadata.len(), + expected_size + ) + } + file.rewind() + .with_context(|| format!("failed to rewind {description}"))?; + Ok(()) +} + +fn receive_streamed_stdin( + stream: &mut UnixStream, + stdin_size: u64, + temp_root: &Path, +) -> Result { + let stdin_path = temp_root.join("stdin"); + let mut output = create_private_staged_file(&stdin_path) + .with_context(|| format!("failed to create `{}`", stdin_path.display()))?; + copy_exact(stream, &mut output, stdin_size).context("failed while receiving stdin")?; + drop(output); + + File::open(&stdin_path) + .with_context(|| format!("failed to reopen staged stdin `{}`", stdin_path.display())) +} + +fn create_private_staged_file(path: &Path) -> Result { + Ok(OpenOptions::new() + .create_new(true) + .write(true) + .mode(0o600) + .open(path)?) +} + +fn staged_upload_path(temp_root: &Path, upload: &UploadSpec) -> PathBuf { + let mut staged_path = temp_root.join(format!("upload-{}", upload.id)); + let mut added_component = false; + + for component in Path::new(&upload.original_path).components() { + match component { + Component::Normal(part) => { + staged_path.push(part); + added_component = true; + } + Component::ParentDir => { + staged_path.push("__parent__"); + added_component = true; + } + Component::CurDir | Component::RootDir => {} + Component::Prefix(_) => {} + } + } + + if !added_component { + staged_path.push("file"); + } + + staged_path +} + +fn received_upload( + received_uploads: &HashMap, + upload_id: u32, +) -> Result<&ReceivedUpload> { + received_uploads + .get(&upload_id) + .with_context(|| format!("missing received upload for upload id {upload_id}")) +} + +fn execute_request( + stream: &mut UnixStream, + config: &ServerConfig, + args: Vec, + stdin: File, + response_transport: ResponseTransport, + forwarded_env: &BTreeMap, + received_uploads: HashMap, +) -> Result<()> { + let _keep_received_uploads_alive = received_uploads; + let inherited_env = config.inherited_environment(); + let disallowed_forwarded_env = forwarded_env + .keys() + .filter(|key| !config.forwarded_env_is_allowed(key)) + .cloned() + .collect::>(); + if !disallowed_forwarded_env.is_empty() { + log::warn!( + "ignoring {} forwarded environment variable(s) that are not allowed by server policy: {}", + disallowed_forwarded_env.len(), + disallowed_forwarded_env.join(", ") + ); + } + let allowed_forwarded_env = forwarded_env + .iter() + .filter(|(key, _value)| config.forwarded_env_is_allowed(key)) + .map(|(key, value)| (key.clone(), value.clone())) + .collect::>(); + log::info!( + "spawning configured executable {} with {} args, {} inherited env vars, and {} forwarded env vars", + config.executable.display(), + args.len(), + inherited_env.len(), + allowed_forwarded_env.len() + ); + + let mut command = Command::new(&config.executable); + command.args(&args); + command.stdin(Stdio::from(stdin)); + command.stdout(Stdio::piped()); + command.stderr(Stdio::piped()); + command.env_clear(); + command.envs(&inherited_env); + command.envs(&allowed_forwarded_env); + + let mut child = command.spawn().with_context(|| { + format!( + "failed to spawn configured executable `{}`", + config.executable.display() + ) + })?; + log::info!("spawned configured executable with pid {}", child.id()); + + let stdout = child + .stdout + .take() + .context("child stdout was not captured")?; + let stderr = child + .stderr + .take() + .context("child stderr was not captured")?; + + let (tx, rx) = mpsc::channel::(); + spawn_output_reader(stdout, OutputStream::Stdout, tx.clone()); + spawn_output_reader(stderr, OutputStream::Stderr, tx); + + for chunk in rx { + let frame = match chunk { + OutputChunk::Stdout(bytes) => ResponseFrame::Stdout(bytes), + OutputChunk::Stderr(bytes) => ResponseFrame::Stderr(bytes), + }; + + if let Err(error) = write_response_frame(stream, response_transport, &frame) { + log::warn!("failed to send child output frame to client; terminating child: {error:#}"); + let _ = child.kill(); + let _ = child.wait(); + return Err(error).context("failed to stream child output back to bro-client"); + } + } + + let status = child + .wait() + .context("failed to wait for remote executable")?; + log::info!("configured executable exited with status {status}"); + write_response_frame( + stream, + response_transport, + &ResponseFrame::Exit(status.into()), + ) + .context("failed to send remote exit status")?; + + Ok(()) +} + +fn is_control_environment_key(key: &str) -> bool { + key.starts_with("BRO_") + || matches!( + key, + "LISTEN_PID" | "LISTEN_FDS" | "LISTEN_FDNAMES" | "NOTIFY_SOCKET" + ) +} + +#[derive(Clone, Copy)] +enum OutputStream { + Stdout, + Stderr, +} + +impl OutputStream { + fn as_str(self) -> &'static str { + match self { + Self::Stdout => "stdout", + Self::Stderr => "stderr", + } + } +} + +enum OutputChunk { + Stdout(Vec), + Stderr(Vec), +} + +fn spawn_output_reader(mut reader: R, stream: OutputStream, sender: mpsc::Sender) +where + R: Read + Send + 'static, +{ + thread::spawn(move || { + let mut buffer = [0_u8; IO_BUFFER_SIZE]; + loop { + match reader.read(&mut buffer) { + Ok(0) => { + break; + } + Ok(read_count) => { + let chunk = buffer[..read_count].to_vec(); + let message = match stream { + OutputStream::Stdout => OutputChunk::Stdout(chunk), + OutputStream::Stderr => OutputChunk::Stderr(chunk), + }; + if sender.send(message).is_err() { + break; + } + } + Err(error) => { + log::warn!( + "failed to read child output stream {}: {error}", + stream.as_str() + ); + let diagnostic = format!( + "bro-server: failed to read child {}: {error}\n", + stream.as_str(), + ); + let _ = sender.send(OutputChunk::Stderr(diagnostic.into_bytes())); + break; + } + } + } + }); +} diff --git a/src/common.rs b/src/common.rs new file mode 100644 index 0000000..f8f018a --- /dev/null +++ b/src/common.rs @@ -0,0 +1,71 @@ +use anyhow::{Context, Result, anyhow, bail}; +use std::{ + env, + io::{self, Read, Write}, +}; +use tracing_subscriber::EnvFilter; + +pub fn init_tracing(binary_name: &str) -> Result<()> { + let binary_target = binary_name.replace('-', "_"); + let default_filter = format!("info,{binary_target}=debug"); + + let Ok(requested_filter) = env::var("RUST_LOG") else { + return Ok(()); + }; + + let requested_filter = match requested_filter.trim() { + "" | "1" | "true" | "yes" | "on" => default_filter, + value => value.to_owned(), + }; + + let env_filter = EnvFilter::try_new(&requested_filter) + .map_err(|error| anyhow!("invalid log filter `{requested_filter}`: {error}"))?; + + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_target(true) + .with_thread_ids(true) + .with_thread_names(true) + .compact() + .try_init() + .map_err(|error| anyhow!("failed to initialize tracing subscriber: {error}"))?; + + log::debug!("initialized tracing subscriber for {binary_name}"); + Ok(()) +} + +pub fn copy_exact(reader: &mut R, writer: &mut W, size: u64) -> Result<()> { + let mut limited_reader = reader.take(size); + let copied = io::copy(&mut limited_reader, writer).context("failed to copy payload bytes")?; + if copied != size { + bail!("expected to copy {size} bytes, but copied {copied}") + } + + Ok(()) +} + +pub fn env_var_is_set(key: &str) -> bool { + env::var_os(key).is_some_and(|value| !value.is_empty()) +} + +pub fn env_var_is_truthy(key: &str) -> bool { + env::var_os(key).is_some_and(|value| { + matches!( + value.to_string_lossy().trim().to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "on" + ) + }) +} + +pub fn parse_csv_env(key: &str) -> Vec { + env::var(key).map_or_else(|_| Vec::new(), |value| parse_csv_list(&value)) +} + +pub fn parse_csv_list(value: &str) -> Vec { + value + .split(',') + .map(str::trim) + .filter(|entry| !entry.is_empty()) + .map(ToOwned::to_owned) + .collect() +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..59c589a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,2 @@ +pub mod common; +pub mod protocol; diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..d432fb9 --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,828 @@ +use anyhow::{Context, Result, bail}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; +use std::{ + collections::{BTreeMap, HashSet}, + io::{Read, Write}, + os::unix::process::ExitStatusExt, + process::ExitStatus, +}; + +pub const EXECUTE_MAGIC: [u8; 4] = *b"BRO1"; +pub const SERVER_FEATURES_MAGIC: [u8; 4] = *b"BROC"; +pub const FD_PASS_MARKER: u8 = b'F'; +pub const STDIN_FD_PASS_MARKER: u8 = b'S'; +pub const MAX_CONTROL_MESSAGE_BYTES: u64 = 16 * 1024 * 1024; +pub const MAX_RESPONSE_FRAME_BYTES: u64 = 16 * 1024 * 1024; +pub const WRAPPER_ERROR_EXIT_CODE: i32 = 125; + +const RESPONSE_FRAME_STDOUT: u8 = b'O'; +const RESPONSE_FRAME_STDERR: u8 = b'E'; +const RESPONSE_FRAME_EXIT: u8 = b'X'; +const RESPONSE_FRAME_ERROR: u8 = b'!'; +const BINARY_EXIT_STATUS_PAYLOAD_LEN: usize = 10; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionKind { + Execute, + ServerFeaturesQuery, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum UploadTransport { + #[default] + StreamedBytes, + PassedFileDescriptors, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum ResponseTransport { + #[default] + JsonMessages, + BinaryFrames, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestHeader { + pub args: Vec, + pub env: BTreeMap, + pub uploads: Vec, + pub rewrites: Vec, + #[serde(default)] + pub stdin_size: Option, + #[serde(default)] + pub upload_transport: UploadTransport, + #[serde(default)] + pub stdin_transport: UploadTransport, + #[serde(default)] + pub response_transport: ResponseTransport, +} + +impl RequestHeader { + pub fn validate_invariants(&self) -> Result<()> { + if self.stdin_transport == UploadTransport::StreamedBytes && self.stdin_size.is_none() { + bail!("request used streamed stdin but did not provide stdin_size") + } + + let mut upload_ids = HashSet::with_capacity(self.uploads.len()); + for upload in &self.uploads { + if !upload_ids.insert(upload.id) { + bail!("request referenced upload id {} more than once", upload.id) + } + } + + for rewrite in &self.rewrites { + match rewrite { + ArgRewrite::Replace { + arg_index, + upload_id, + } + | ArgRewrite::Prefixed { + arg_index, + upload_id, + .. + } => { + if *arg_index >= self.args.len() { + bail!( + "request tried to rewrite argument index {} but only {} arguments were provided", + arg_index, + self.args.len() + ) + } + if !upload_ids.contains(upload_id) { + bail!( + "request tried to rewrite argument {} using unknown upload id {}", + arg_index, + upload_id + ) + } + } + } + } + + Ok(()) + } + + pub fn rewrite_args(&self, mut resolve_upload: F) -> Result> + where + F: FnMut(u32) -> Result, + { + let mut rewritten = self.args.clone(); + + for rewrite in &self.rewrites { + match rewrite { + ArgRewrite::Replace { + arg_index, + upload_id, + } => { + rewritten[*arg_index] = resolve_upload(*upload_id)?; + } + ArgRewrite::Prefixed { + arg_index, + prefix, + upload_id, + } => { + rewritten[*arg_index] = format!("{prefix}{}", resolve_upload(*upload_id)?); + } + } + } + + Ok(rewritten) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct UploadSpec { + pub id: u32, + pub original_path: String, + pub size: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum ArgRewrite { + Replace { + arg_index: usize, + upload_id: u32, + }, + Prefixed { + arg_index: usize, + prefix: String, + upload_id: u32, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum ResponseFrame { + Stdout(Vec), + Stderr(Vec), + Exit(RemoteExitStatus), + Error(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RemoteExitStatus { + pub code: Option, + pub signal: Option, +} + +#[derive(Debug, Clone, Copy)] +pub struct RequestPlanningOptions<'a> { + pub file_flags: &'a [String], + pub file_args_enabled: bool, +} + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct RequestPlanningStats { + pub file_flag_rewrite_count: usize, + pub auto_file_arg_probe_count: usize, + pub auto_file_arg_rewrite_count: usize, +} + +#[derive(Debug, Default)] +pub struct PlannedArgRewrites { + pub rewrites: Vec, + pub stats: RequestPlanningStats, +} + +pub trait UploadRegistrar { + fn ensure_upload(&mut self, path_text: &str) -> Result; + fn maybe_ensure_upload(&mut self, path_text: &str) -> Result>; +} + +pub fn plan_request_rewrites( + args: &[String], + options: &RequestPlanningOptions<'_>, + uploads: &mut U, +) -> Result +where + U: UploadRegistrar, +{ + let mut rewrites = Vec::new(); + let mut stats = RequestPlanningStats::default(); + let mut skip_index = None; + + for (index, argument) in args.iter().enumerate() { + if skip_index == Some(index) { + skip_index = None; + continue; + } + + if let Some(matched_flag) = match_file_flag(args, index, options.file_flags)? { + match matched_flag { + MatchedFileFlag::Separate { + flag, + value_index, + value, + } => { + let upload_id = uploads.ensure_upload(value)?; + rewrites.push(ArgRewrite::Replace { + arg_index: value_index, + upload_id, + }); + log::debug!( + "BRO_FILE_FLAGS matched separate flag `{flag}` at arg index {index}; forwarding argument index {value_index} -> `{value}`" + ); + stats.file_flag_rewrite_count += 1; + skip_index = Some(value_index); + } + MatchedFileFlag::Joined { + flag, + prefix, + value, + } => { + let upload_id = uploads.ensure_upload(value)?; + rewrites.push(ArgRewrite::Prefixed { + arg_index: index, + prefix, + upload_id, + }); + log::debug!( + "BRO_FILE_FLAGS matched joined flag `{flag}` at arg index {index}; forwarding value `{value}`" + ); + stats.file_flag_rewrite_count += 1; + } + } + continue; + } + + if options.file_args_enabled && is_nonflag_argument(argument) { + stats.auto_file_arg_probe_count += 1; + if let Some(upload_id) = uploads.maybe_ensure_upload(argument)? { + rewrites.push(ArgRewrite::Replace { + arg_index: index, + upload_id, + }); + stats.auto_file_arg_rewrite_count += 1; + log::debug!( + "BRO_FILE_ARGS auto-forwarded non-flag argument index {index}: `{argument}`" + ); + } else { + log::debug!( + "BRO_FILE_ARGS left non-flag argument index {index} unchanged because `{argument}` could not be opened as a readable regular file" + ); + } + } + } + + Ok(PlannedArgRewrites { rewrites, stats }) +} + +enum MatchedFileFlag<'a> { + Separate { + flag: &'a str, + value_index: usize, + value: &'a str, + }, + Joined { + flag: &'a str, + prefix: String, + value: &'a str, + }, +} + +fn match_file_flag<'a>( + args: &'a [String], + index: usize, + file_flags: &'a [String], +) -> Result>> { + let argument = &args[index]; + + for flag in file_flags { + if argument == flag { + let value_index = index + 1; + let value = args.get(value_index).with_context(|| { + format!("flag `{flag}` is configured as a file flag but has no value") + })?; + return Ok(Some(MatchedFileFlag::Separate { + flag: flag.as_str(), + value_index, + value, + })); + } + + let prefix = format!("{flag}="); + if let Some(value) = argument.strip_prefix(&prefix) { + return Ok(Some(MatchedFileFlag::Joined { + flag: flag.as_str(), + prefix, + value, + })); + } + } + + Ok(None) +} + +fn is_nonflag_argument(argument: &str) -> bool { + !argument.starts_with('-') || argument == "-" +} + +fn default_upload_transports() -> Vec { + vec![UploadTransport::StreamedBytes] +} + +fn default_response_transports() -> Vec { + vec![ResponseTransport::JsonMessages] +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServerFeatures { + #[serde(default = "default_upload_transports")] + pub upload_transports: Vec, + #[serde(default = "default_upload_transports")] + pub stdin_transports: Vec, + #[serde(default = "default_response_transports")] + pub response_transports: Vec, +} + +impl ServerFeatures { + pub fn supports_transport(&self, transport: UploadTransport) -> bool { + self.supports_upload_transport(transport) + } + + pub fn supports_upload_transport(&self, transport: UploadTransport) -> bool { + self.upload_transports.contains(&transport) + } + + pub fn supports_stdin_transport(&self, transport: UploadTransport) -> bool { + self.stdin_transports.contains(&transport) + } + + pub fn supports_response_transport(&self, transport: ResponseTransport) -> bool { + self.response_transports.contains(&transport) + } +} + +impl RemoteExitStatus { + pub fn to_exit_code(&self) -> i32 { + if let Some(code) = self.code { + code + } else if let Some(signal) = self.signal { + 128 + signal + } else { + WRAPPER_ERROR_EXIT_CODE + } + } +} + +impl From for RemoteExitStatus { + fn from(status: ExitStatus) -> Self { + Self { + code: status.code(), + signal: status.signal(), + } + } +} + +pub fn write_execute_magic(writer: &mut W) -> Result<()> { + write_magic(writer, &EXECUTE_MAGIC) +} + +pub fn write_server_features_magic(writer: &mut W) -> Result<()> { + write_magic(writer, &SERVER_FEATURES_MAGIC) +} + +fn write_magic(writer: &mut W, magic: &[u8; 4]) -> Result<()> { + writer + .write_all(magic) + .context("failed to write protocol magic") +} + +pub fn read_connection_kind(reader: &mut R) -> Result { + let mut magic = [0_u8; 4]; + reader + .read_exact(&mut magic) + .context("failed to read protocol magic")?; + + match magic { + EXECUTE_MAGIC => Ok(ConnectionKind::Execute), + SERVER_FEATURES_MAGIC => Ok(ConnectionKind::ServerFeaturesQuery), + _ => bail!( + "received an unsupported protocol header: expected `{EXECUTE_MAGIC:?}` or `{SERVER_FEATURES_MAGIC:?}`, got `{magic:?}`" + ), + } +} + +pub fn write_execute_request_header( + writer: &mut W, + header: &RequestHeader, +) -> Result<()> { + write_message(writer, header) +} + +pub fn read_execute_request_header(reader: &mut R) -> Result { + read_message(reader) +} + +pub fn write_server_features_response( + writer: &mut W, + server_features: &ServerFeatures, +) -> Result<()> { + write_message(writer, server_features) +} + +pub fn read_server_features_response(reader: &mut R) -> Result { + read_message(reader) +} + +pub fn passed_fd_marker_bytes(upload_count: usize, include_stdin: bool) -> Vec { + let mut marker_bytes = std::iter::repeat_n(FD_PASS_MARKER, upload_count).collect::>(); + if include_stdin { + marker_bytes.push(STDIN_FD_PASS_MARKER); + } + marker_bytes +} + +pub fn expected_passed_fd_count( + upload_transport: UploadTransport, + upload_count: usize, + stdin_transport: UploadTransport, +) -> usize { + let upload_fd_count = match upload_transport { + UploadTransport::StreamedBytes => 0, + UploadTransport::PassedFileDescriptors => upload_count, + }; + + upload_fd_count + usize::from(stdin_transport == UploadTransport::PassedFileDescriptors) +} + +pub fn validate_passed_fd_marker_bytes( + marker_bytes: &[u8], + upload_count: usize, + expects_stdin: bool, +) -> Result<()> { + let expected_count = upload_count + usize::from(expects_stdin); + if marker_bytes.len() != expected_count { + bail!( + "received {} fd marker bytes but expected {}", + marker_bytes.len(), + expected_count + ) + } + + if marker_bytes[..upload_count] + .iter() + .any(|byte| *byte != FD_PASS_MARKER) + { + bail!("received invalid upload file descriptor marker bytes from bro-client") + } + if expects_stdin && marker_bytes[upload_count] != STDIN_FD_PASS_MARKER { + bail!("received invalid stdin file descriptor marker byte from bro-client") + } + + Ok(()) +} + +pub fn write_message(writer: &mut W, value: &T) -> Result<()> { + let payload = + serde_json::to_vec(value).context("failed to serialize protocol message as JSON")?; + let payload_len = u64::try_from(payload.len()).context("protocol message is too large")?; + + writer + .write_all(&payload_len.to_le_bytes()) + .context("failed to write protocol message length")?; + writer + .write_all(&payload) + .context("failed to write protocol message payload")?; + writer.flush().context("failed to flush protocol message")?; + Ok(()) +} + +pub fn read_message(reader: &mut R) -> Result { + let payload = + read_length_prefixed_payload(reader, MAX_CONTROL_MESSAGE_BYTES, "protocol message")?; + + serde_json::from_slice(&payload).context("failed to deserialize protocol message from JSON") +} + +pub fn write_response_frame( + writer: &mut W, + transport: ResponseTransport, + frame: &ResponseFrame, +) -> Result<()> { + match transport { + ResponseTransport::JsonMessages => write_message(writer, frame), + ResponseTransport::BinaryFrames => write_binary_response_frame(writer, frame), + } +} + +pub fn read_response_frame( + reader: &mut R, + transport: ResponseTransport, +) -> Result { + match transport { + ResponseTransport::JsonMessages => read_message(reader), + ResponseTransport::BinaryFrames => read_binary_response_frame(reader), + } +} + +fn write_binary_response_frame(writer: &mut W, frame: &ResponseFrame) -> Result<()> { + match frame { + ResponseFrame::Stdout(bytes) => { + write_binary_response_payload(writer, RESPONSE_FRAME_STDOUT, bytes) + } + ResponseFrame::Stderr(bytes) => { + write_binary_response_payload(writer, RESPONSE_FRAME_STDERR, bytes) + } + ResponseFrame::Exit(status) => { + let mut payload = Vec::with_capacity(BINARY_EXIT_STATUS_PAYLOAD_LEN); + payload.push(u8::from(status.code.is_some())); + payload.extend_from_slice(&status.code.unwrap_or_default().to_le_bytes()); + payload.push(u8::from(status.signal.is_some())); + payload.extend_from_slice(&status.signal.unwrap_or_default().to_le_bytes()); + write_binary_response_payload(writer, RESPONSE_FRAME_EXIT, &payload) + } + ResponseFrame::Error(message) => { + write_binary_response_payload(writer, RESPONSE_FRAME_ERROR, message.as_bytes()) + } + } +} + +fn write_binary_response_payload(writer: &mut W, tag: u8, payload: &[u8]) -> Result<()> { + let payload_len = + u64::try_from(payload.len()).context("response frame payload is too large")?; + writer + .write_all(&[tag]) + .context("failed to write response frame type")?; + writer + .write_all(&payload_len.to_le_bytes()) + .context("failed to write response frame length")?; + writer + .write_all(payload) + .context("failed to write response frame payload")?; + Ok(()) +} + +fn read_binary_response_frame(reader: &mut R) -> Result { + let mut tag = [0_u8; 1]; + reader + .read_exact(&mut tag) + .context("failed to read response frame type")?; + let payload = read_length_prefixed_payload(reader, MAX_RESPONSE_FRAME_BYTES, "response frame")?; + + match tag[0] { + RESPONSE_FRAME_STDOUT => Ok(ResponseFrame::Stdout(payload)), + RESPONSE_FRAME_STDERR => Ok(ResponseFrame::Stderr(payload)), + RESPONSE_FRAME_ERROR => Ok(ResponseFrame::Error( + String::from_utf8(payload).context("response error frame was not valid UTF-8")?, + )), + RESPONSE_FRAME_EXIT => read_binary_exit_status(&payload), + _ => bail!("received unknown response frame type `{}`", tag[0]), + } +} + +fn read_binary_exit_status(payload: &[u8]) -> Result { + if payload.len() != BINARY_EXIT_STATUS_PAYLOAD_LEN { + bail!( + "binary exit status frame payload had length {} but expected {}", + payload.len(), + BINARY_EXIT_STATUS_PAYLOAD_LEN + ) + } + + let code = if payload[0] == 0 { + None + } else { + Some(i32::from_le_bytes([ + payload[1], payload[2], payload[3], payload[4], + ])) + }; + let signal = if payload[5] == 0 { + None + } else { + Some(i32::from_le_bytes([ + payload[6], payload[7], payload[8], payload[9], + ])) + }; + + Ok(ResponseFrame::Exit(RemoteExitStatus { code, signal })) +} + +fn read_length_prefixed_payload( + reader: &mut R, + max_payload_len: u64, + payload_kind: &str, +) -> Result> { + let mut length_bytes = [0_u8; 8]; + reader + .read_exact(&mut length_bytes) + .with_context(|| format!("failed to read {payload_kind} length"))?; + + let payload_len = u64::from_le_bytes(length_bytes); + if payload_len > max_payload_len { + bail!( + "{payload_kind} length {payload_len} exceeds the maximum supported size of {max_payload_len} bytes" + ) + } + + let payload_len = usize::try_from(payload_len) + .with_context(|| format!("{payload_kind} does not fit in memory"))?; + let mut payload = vec![0_u8; payload_len]; + reader + .read_exact(&mut payload) + .with_context(|| format!("failed to read {payload_kind} payload"))?; + Ok(payload) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use std::io::Cursor; + + #[derive(Default)] + struct MockUploadRegistrar { + available_paths: HashSet, + upload_ids_by_path: HashMap, + next_upload_id: u32, + } + + impl MockUploadRegistrar { + fn with_available_paths(paths: impl IntoIterator) -> Self { + Self { + available_paths: paths.into_iter().map(ToOwned::to_owned).collect(), + ..Self::default() + } + } + + fn register(&mut self, path_text: &str) -> u32 { + if let Some(&upload_id) = self.upload_ids_by_path.get(path_text) { + return upload_id; + } + + let upload_id = self.next_upload_id; + self.next_upload_id += 1; + self.upload_ids_by_path + .insert(path_text.to_owned(), upload_id); + upload_id + } + } + + impl UploadRegistrar for MockUploadRegistrar { + fn ensure_upload(&mut self, path_text: &str) -> Result { + if !self.available_paths.contains(path_text) { + bail!("mock upload `{path_text}` is unavailable") + } + Ok(self.register(path_text)) + } + + fn maybe_ensure_upload(&mut self, path_text: &str) -> Result> { + Ok(self + .available_paths + .contains(path_text) + .then(|| self.register(path_text))) + } + } + + #[test] + fn plan_request_rewrites_handles_file_flags_and_file_args() -> Result<()> { + let args = vec![ + "--config".to_owned(), + "config.toml".to_owned(), + "--input=data.txt".to_owned(), + "note.txt".to_owned(), + "-n".to_owned(), + ]; + let file_flags = vec!["--config".to_owned(), "--input".to_owned()]; + let options = RequestPlanningOptions { + file_flags: &file_flags, + file_args_enabled: true, + }; + let mut uploads = + MockUploadRegistrar::with_available_paths(["config.toml", "data.txt", "note.txt"]); + + let planned = plan_request_rewrites(&args, &options, &mut uploads)?; + + assert_eq!( + planned.rewrites, + vec![ + ArgRewrite::Replace { + arg_index: 1, + upload_id: 0, + }, + ArgRewrite::Prefixed { + arg_index: 2, + prefix: "--input=".to_owned(), + upload_id: 1, + }, + ArgRewrite::Replace { + arg_index: 3, + upload_id: 2, + }, + ] + ); + assert_eq!( + planned.stats, + RequestPlanningStats { + file_flag_rewrite_count: 2, + auto_file_arg_probe_count: 1, + auto_file_arg_rewrite_count: 1, + } + ); + + Ok(()) + } + + #[test] + fn plan_request_rewrites_treats_double_dash_like_any_other_argument() -> Result<()> { + let args = vec![ + "--".to_owned(), + "--config".to_owned(), + "plain.txt".to_owned(), + ]; + let file_flags = vec!["--config".to_owned()]; + let options = RequestPlanningOptions { + file_flags: &file_flags, + file_args_enabled: true, + }; + let mut uploads = MockUploadRegistrar::with_available_paths(["--config", "plain.txt"]); + + let planned = plan_request_rewrites(&args, &options, &mut uploads)?; + + assert_eq!( + planned.rewrites, + vec![ArgRewrite::Replace { + arg_index: 2, + upload_id: 0, + },] + ); + assert_eq!(planned.stats.file_flag_rewrite_count, 1); + assert_eq!(planned.stats.auto_file_arg_probe_count, 0); + assert_eq!(planned.stats.auto_file_arg_rewrite_count, 0); + + Ok(()) + } + + #[test] + fn request_header_validates_upload_ids_and_rewrite_references() { + let header = RequestHeader { + args: vec!["a".to_owned()], + env: BTreeMap::new(), + uploads: vec![ + UploadSpec { + id: 0, + original_path: "x".to_owned(), + size: 1, + }, + UploadSpec { + id: 0, + original_path: "y".to_owned(), + size: 1, + }, + ], + rewrites: vec![ArgRewrite::Replace { + arg_index: 0, + upload_id: 1, + }], + stdin_size: Some(0), + upload_transport: UploadTransport::StreamedBytes, + stdin_transport: UploadTransport::StreamedBytes, + response_transport: ResponseTransport::JsonMessages, + }; + + assert!(header.validate_invariants().is_err()); + } + + #[test] + fn request_header_requires_streamed_stdin_size() { + let header = RequestHeader { + args: Vec::new(), + env: BTreeMap::new(), + uploads: Vec::new(), + rewrites: Vec::new(), + stdin_size: None, + upload_transport: UploadTransport::StreamedBytes, + stdin_transport: UploadTransport::StreamedBytes, + response_transport: ResponseTransport::JsonMessages, + }; + + assert!(header.validate_invariants().is_err()); + } + + #[test] + fn binary_response_frames_round_trip() -> Result<()> { + let frames = [ + ResponseFrame::Stdout(vec![1, 2, 3]), + ResponseFrame::Stderr(vec![4, 5]), + ResponseFrame::Exit(RemoteExitStatus { + code: Some(42), + signal: None, + }), + ResponseFrame::Error("boom".to_owned()), + ]; + + let mut buffer = Cursor::new(Vec::new()); + for frame in &frames { + write_response_frame(&mut buffer, ResponseTransport::BinaryFrames, frame)?; + } + + buffer.set_position(0); + for expected in frames { + let actual = read_response_frame(&mut buffer, ResponseTransport::BinaryFrames)?; + assert_eq!(actual, expected); + } + + Ok(()) + } +}