1 Commits

Author SHA1 Message Date
7f45c49a79 WIP 2026-01-12 16:32:30 +09:00
18 changed files with 584 additions and 577 deletions

757
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +1,12 @@
[package]
name = "muscl"
version = "1.0.0"
version = "0.1.0"
edition = "2024"
resolver = "2"
license = "BSD-3-Clause"
authors = [
"Programvareverkstedet <projects@pvv.ntnu.no>",
"oysteikt@pvv.ntnu.no",
"felixalb@pvv.ntnu.no",
]
homepage = "https://git.pvv.ntnu.no/Projects/muscl"
repository = "https://git.pvv.ntnu.no/Projects/muscl"
@@ -18,50 +19,50 @@ autobins = false
autolib = false
[dependencies]
anyhow = "1.0.102"
anyhow = "1.0.100"
async-bincode = "0.8.0"
bincode = "2.0.1"
clap = { version = "4.6.0", features = ["cargo", "derive"] }
clap = { version = "4.5.54", features = ["cargo", "derive"] }
clap-verbosity-flag = { version = "3.0.4", features = [ "tracing" ] }
clap_complete = { version = "4.6.0", features = ["unstable-dynamic"] }
clap_complete = { version = "4.5.65", features = ["unstable-dynamic"] }
color-print = "0.3.7"
const_format = "0.2.35"
derive_more = { version = "2.1.1", features = ["display", "error"] }
dialoguer = "0.12.0"
futures-util = "0.3.32"
futures-util = "0.3.31"
humansize = "2.1.3"
indoc = "2.0.7"
itertools = "0.14.0"
nix = { version = "0.31.2", features = ["fs", "process", "socket", "user"] }
nix = { version = "0.30.1", features = ["fs", "process", "socket", "user"] }
num_cpus = "1.17.0"
prettytable = "0.10.0"
rand = "0.10.0"
rand = "0.9.2"
serde = "1.0.228"
serde_json = { version = "1.0.149", features = ["preserve_order"] }
sqlx = { version = "0.8.6", features = ["runtime-tokio", "mysql", "tls-rustls"] }
thiserror = "2.0.18"
tokio = { version = "1.50.0", features = ["rt-multi-thread", "macros", "signal"] }
thiserror = "2.0.17"
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "signal"] }
tokio-serde = { version = "0.9.0", features = ["bincode"] }
tokio-stream = "0.1.18"
tokio-util = { version = "0.7.18", features = ["codec", "rt"] }
toml = "1.1.2"
toml = "0.9.11"
tracing = { version = "0.1.44", features = ["log"] }
tracing-subscriber = "0.3.23"
uuid = { version = "1.23.0", features = ["v4"] }
tracing-subscriber = "0.3.22"
uuid = { version = "1.19.0", features = ["v4"] }
[target.'cfg(target_os = "linux")'.dependencies]
landlock = "0.4.4"
sd-notify = "0.5.0"
sd-notify = "0.4.5"
tracing-journald = "0.3.2"
[build-dependencies]
anyhow = "1.0.102"
build-info-build = "0.0.43"
git2 = { version = "0.20.4", default-features = false }
anyhow = "1.0.100"
build-info-build = "0.0.42"
git2 = { version = "0.20.3", default-features = false }
[dev-dependencies]
pretty_assertions = "1.4.1"
regex = "1.12.3"
regex = "1.12.2"
[features]
default = ["mysql-admutils-compatibility"]

View File

@@ -3,7 +3,6 @@ Description=Muscl MySQL admin tool
[Socket]
ListenStream=/run/muscl/muscl.sock
RemoveOnStop=true
Accept=no
PassCredentials=true

View File

View File

@@ -0,0 +1,53 @@
[Unit]
Description=Authorization daemon for Muscl
[Service]
Type=notify
ExecStart=/usr/local/bin/muscl_auth_daemon.py
# WatchdogSec=15
User=muscl
Group=muscl
DynamicUser=yes
; ConfigurationDirectory=muscl
; RuntimeDirectory=muscl
; # This is required to read unix user/group details.
; PrivateUsers=false
; # Needed to communicate with MySQL.
; PrivateNetwork=false
; PrivateIPC=false
; AmbientCapabilities=
; CapabilityBoundingSet=
; DeviceAllow=
; DevicePolicy=closed
; LockPersonality=true
; MemoryDenyWriteExecute=true
; NoNewPrivileges=true
; PrivateDevices=true
; PrivateMounts=true
; PrivateTmp=yes
; ProcSubset=pid
; ProtectClock=true
; ProtectControlGroups=strict
; ProtectHome=true
; ProtectHostname=true
; ProtectKernelLogs=true
; ProtectKernelModules=true
; ProtectKernelTunables=true
; ProtectProc=invisible
; ProtectSystem=strict
; RemoveIPC=true
; RestrictAddressFamilies=AF_UNIX AF_INET AF_INET6
; RestrictNamespaces=true
; RestrictRealtime=true
; RestrictSUIDSGID=true
; SocketBindDeny=any
; SystemCallArchitectures=native
; SystemCallFilter=@system-service
; SystemCallFilter=~@privileged @resources
; UMask=0777

View File

@@ -0,0 +1,8 @@
[Unit]
Description=Authorization daemon for Muscl
WantedBy=sockets.target
[Socket]
ListenStream=/run/muscl/muscl-auth-daemon.socket
Accept=no
SocketMode=0660

View File

@@ -0,0 +1,84 @@
#!/usr/bin/env python3
# TODO: create pool of workers to handle requests concurrently
# the socket should be a listener socket and each worker should accept connections from it
# the socket should accept requests as newline-separated JSON objects
# there should be a watchdog to monitor worker health and restart them if they die
# graceful shutdown should be implemented for the workers
# optional logging of requests and responses
# use systemd notify to signal readiness and amount of connections handled
import json
import os
from socket import AF_UNIX, SOCK_DGRAM, SOCK_STREAM, fromfd, socket
from multiprocessing import Pool
def get_listener_from_systemd() -> socket:
listen_fds = int(os.getenv("LISTEN_FDS", "0"))
listen_pid = int(os.getenv("LISTEN_PID", "0"))
if listen_fds != 1 or listen_pid != os.getpid():
raise RuntimeError("No socket passed from systemd")
assert listen_fds == 1
sock = fromfd(3, AF_UNIX, SOCK_STREAM)
sock.setblocking(False)
return sock
def get_notify_socket_from_systemd() -> socket:
notify_socket_path = os.getenv("NOTIFY_SOCKET")
if not notify_socket_path:
raise RuntimeError("No notify socket path found in environment")
sock = socket(AF_UNIX, SOCK_DGRAM)
sock.connect(notify_socket_path)
return sock
def run_auth_daemon(sock: socket):
sock.listen()
print("Auth daemon is running and listening for connections...")
with Pool() as worker_pool:
with get_notify_socket_from_systemd() as notify_socket:
notify_socket.sendall(b"READY=1\n")
while True:
conn, _ = sock.accept()
worker_pool.apply_async(session_handler, args=(conn,))
def session_handler(sock: socket):
buffer = ""
while True:
data = sock.recv(4096).decode("utf-8")
if not data:
print("Connection closed by client")
break
buffer += data
if buffer.endswith("\n"):
requests = buffer.strip().split("\n")
buffer = ""
for request in requests:
try:
req_json = json.loads(request)
username = req_json.get("username", "")
groups = req_json.get("groups", [])
resource_type = req_json.get("resource_type", "")
resource = req_json.get("resource", "")
allowed = process_request(username, groups, resource_type, resource)
response = {"allowed": allowed}
except json.JSONDecodeError:
response = {"error": "Invalid JSON"}
sock.sendall((json.dumps(response) + "\n").encode("utf-8"))
def process_request(
username: str,
groups: list[str],
resource_type: str,
resource: str,
) -> bool:
...
if __name__ == "__main__":
listener_socket = get_listener_from_systemd()
run_auth_daemon(listener_socket)

18
flake.lock generated
View File

@@ -2,11 +2,11 @@
"nodes": {
"crane": {
"locked": {
"lastModified": 1774313767,
"narHash": "sha256-hy0XTQND6avzGEUFrJtYBBpFa/POiiaGBr2vpU6Y9tY=",
"lastModified": 1767744144,
"narHash": "sha256-9/9ntI0D+HbN4G0TrK3KmHbTvwgswz7p8IEJsWyef8Q=",
"owner": "ipetkov",
"repo": "crane",
"rev": "3d9df76e29656c679c744968b17fbaf28f0e923d",
"rev": "2fb033290bf6b23f226d4c8b32f7f7a16b043d7e",
"type": "github"
},
"original": {
@@ -17,11 +17,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1775036866,
"narHash": "sha256-ZojAnPuCdy657PbTq5V0Y+AHKhZAIwSIT2cb8UgAz/U=",
"lastModified": 1768127708,
"narHash": "sha256-1Sm77VfZh3mU0F5OqKABNLWxOuDeHIlcFjsXeeiPazs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "6201e203d09599479a3b3450ed24fa81537ebc4e",
"rev": "ffbc9f8cbaacfb331b6017d5a5abb21a492c9a38",
"type": "github"
},
"original": {
@@ -45,11 +45,11 @@
]
},
"locked": {
"lastModified": 1775099554,
"narHash": "sha256-3xBsGnGDLOFtnPZ1D3j2LU19wpAlYefRKTlkv648rU0=",
"lastModified": 1768186348,
"narHash": "sha256-nkpIe3zkpeoFuOl8xBpexulECsHLQ9Ljg1gW3bPCjSI=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "8d6387ed6d8e6e6672fd3ed4b61b59d44b124d99",
"rev": "af69e497567a5945a64057717bc9b17c8478097e",
"type": "github"
},
"original": {

View File

@@ -105,6 +105,7 @@
fileset = lib.fileset.unions [
(craneLib.fileset.commonCargoSources ./.)
./assets
./examples
];
};
in {

View File

@@ -85,10 +85,13 @@ buildFunction ({
install -Dm644 assets/systemd/muscl.service -t "$out/lib/systemd/system"
substituteInPlace "$out/lib/systemd/system/muscl.service" \
--replace-fail '/usr/bin/muscl-server' "$out/bin/muscl-server"
mkdir -p "$out/share/muscl"
cp -r examples "$out/share/muscl"
'';
meta = with lib; {
license = licenses.bsd3;
license = licenses.mit;
platforms = platforms.linux ++ platforms.darwin;
inherit mainProgram;
};

View File

@@ -27,6 +27,31 @@ in
}.${level};
};
authHandler = lib.mkOption {
type = with lib.types; nullOr lines;
default = null;
description = "Custom authentication handler, written in python";
example = ''
def process_request(
username: str,
groups: list[str],
resource_type: str,
resource: str,
) -> bool:
if resource_type == "database":
if resource.startswith(username) or any(
resource.startswith(group) for group in groups
):
return True
elif resource_type == "user":
if resource.startswith(username) or any(
resource.startswith(group) for group in groups
):
return True
return False
'';
};
settings = lib.mkOption {
default = { };
type = lib.types.submodule {
@@ -191,5 +216,72 @@ in
++ (lib.optionals (cfg.settings.mysql.host != null) [ "AF_INET" "AF_INET6" ]);
};
};
systemd.sockets."muscl-auth-daemon" = lib.mkIf (cfg.authHandler != null) {
description = "Authorization daemon for Muscl";
wantedBy = [ "sockets.target" ];
socketConfig = {
ListenStream = "/run/muscl/muscl-auth-daemon.sock";
Accept = "no";
};
};
systemd.services."muscl-auth-daemon" = lib.mkIf (cfg.authHandler != null) {
description = "Authorization daemon for Muscl";
requires = [ "muscl-auth-daemon.socket" ];
serviceConfig = {
Type = "notify";
ExecStart = let
authScript = lib.pipe ../examples/auth_daemon_python/muscl_auth_daemon.py [
lib.fileContents
(lib.replaceString ''
def process_request(
username: str,
groups: list[str],
resource_type: str,
resource: str,
) -> bool:
...
'' cfg.authHandler)
(pkgs.writers.writePyPy3Bin "muscl-auth-handler.py" { })
];
in lib.getExe authScript;
User = "muscl-auth-daemon";
Group = "muscl-auth-daemon";
DynamicUser = true;
AmbientCapabilities = [ "" ];
CapabilityBoundingSet = [ "" ];
DeviceAllow = [ "" ];
LockPersonality = true;
NoNewPrivileges = true;
PrivateDevices = true;
PrivateMounts = true;
PrivateTmp = "yes";
ProcSubset = "pid";
ProtectClock = true;
ProtectControlGroups = "strict";
ProtectHome = true;
ProtectHostname = true;
ProtectKernelLogs = true;
ProtectKernelModules = true;
ProtectKernelTunables = true;
ProtectProc = "invisible";
ProtectSystem = "strict";
RemoveIPC = true;
UMask = "0777";
RestrictNamespaces = true;
RestrictRealtime = true;
RestrictSUIDSGID = true;
SystemCallArchitectures = "native";
SocketBindDeny = [ "any" ];
SystemCallFilter = [
"@system-service"
"~@privileged"
"~@resources"
];
};
};
};
}

View File

@@ -56,6 +56,25 @@ nixpkgs.lib.nixosSystem {
enable = true;
logLevel = "trace";
createLocalDatabaseUser = true;
authHandler = ''
def process_request(
username: str,
groups: list[str],
resource_type: str,
resource: str,
) -> bool:
if resource_type == "database":
if resource.startswith(username) or any(
resource.startswith(group) for group in groups
):
return True
elif resource_type == "user":
if resource.startswith(username) or any(
resource.startswith(group) for group in groups
):
return True
return False
'';
};
programs.vim = {

View File

@@ -54,13 +54,11 @@ for variant in debian-bookworm debian-trixie ubuntu-jammy ubuntu-noble; do
DEB_VERSION=$(find "$TMPDIR/muscl-deb-$variant-$GIT_SHA"/*.deb -print0 | xargs -0 -n1 basename | cut -d'_' -f2 | head -n1)
DEB_ARCH=$(find "$TMPDIR/muscl-deb-$variant-$GIT_SHA"/*.deb -print0 | xargs -0 -n1 basename | cut -d'_' -f3 | cut -d'.' -f1 | head -n1)
# echo "[DELETE] https://git.pvv.ntnu.no/api/packages/Projects/debian/pool/$DISTRO_VERSION_NAME/main/$DEB_NAME/$DEB_VERSION/$DEB_ARCH"
# curl \
# -X DELETE \
# --user "$GITEA_USER:$GITEA_TOKEN" \
# "https://git.pvv.ntnu.no/api/packages/Projects/debian/pool/$DISTRO_VERSION_NAME/main/$DEB_NAME/$DEB_VERSION/$DEB_ARCH"
curl \
-X DELETE \
--user "$GITEA_USER:$GITEA_TOKEN" \
"https://git.pvv.ntnu.no/api/packages/Projects/debian/pool/$DISTRO_VERSION_NAME/main/$DEB_NAME/$DEB_VERSION/$DEB_ARCH"
echo "[PUT] https://git.pvv.ntnu.no/api/packages/Projects/debian/pool/$DISTRO_VERSION_NAME/main/upload"
curl \
-X PUT \
--user "$GITEA_USER:$GITEA_TOKEN" \

View File

@@ -319,8 +319,7 @@ fn main() -> anyhow::Result<()> {
#[cfg(not(feature = "suid-sgid-mode"))]
None,
args.verbose,
)
.context("Failed to connect to the server")?;
)?;
tokio_run_command(args.command, connection)?;

View File

@@ -1,6 +1,5 @@
use std::{
fs,
os::unix::fs::FileTypeExt,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
@@ -8,10 +7,7 @@ use std::{
use anyhow::{Context, anyhow};
use clap_verbosity_flag::{InfoLevel, Verbosity};
use nix::{
libc::{EXIT_SUCCESS, exit},
unistd::{AccessFlags, access},
};
use nix::libc::{EXIT_SUCCESS, exit};
use sqlx::mysql::MySqlPoolOptions;
use std::os::unix::net::UnixStream as StdUnixStream;
use tokio::{net::UnixStream as TokioUnixStream, sync::RwLock};
@@ -134,28 +130,11 @@ pub fn bootstrap_server_connection_and_drop_privileges(
}
}
fn socket_path_is_ok(path: &Path) -> anyhow::Result<()> {
fs::metadata(path)
.context(format!("Failed to get metadata for {:?}", path))
.and_then(|meta| {
if !meta.file_type().is_socket() {
anyhow::bail!("{:?} is not a unix socket", path);
}
access(path, AccessFlags::R_OK | AccessFlags::W_OK)
.with_context(|| format!("Socket at {:?} is not readable/writable", path))?;
Ok(())
})
}
fn connect_to_external_server(
server_socket_path: Option<PathBuf>,
) -> anyhow::Result<StdUnixStream> {
// TODO: ensure this is both readable and writable
if let Some(socket_path) = server_socket_path {
tracing::trace!("Checking socket at {:?}", socket_path);
socket_path_is_ok(&socket_path)?;
tracing::debug!("Connecting to socket at {:?}", socket_path);
return match StdUnixStream::connect(socket_path) {
Ok(socket) => Ok(socket),
@@ -168,9 +147,6 @@ fn connect_to_external_server(
}
if fs::metadata(DEFAULT_SOCKET_PATH).is_ok() {
tracing::trace!("Checking socket at {:?}", DEFAULT_SOCKET_PATH);
socket_path_is_ok(Path::new(DEFAULT_SOCKET_PATH))?;
tracing::debug!("Connecting to default socket at {:?}", DEFAULT_SOCKET_PATH);
return match StdUnixStream::connect(DEFAULT_SOCKET_PATH) {
Ok(socket) => Ok(socket),
@@ -182,9 +158,7 @@ fn connect_to_external_server(
};
}
anyhow::bail!(
"No socket path provided, and no socket found found at default location {DEFAULT_SOCKET_PATH}"
);
anyhow::bail!("No socket path provided, and no default socket found");
}
// TODO: this function is security critical, it should be integration tested

View File

@@ -34,7 +34,7 @@ impl DerefMut for MySQLUser {
impl fmt::Display for MySQLUser {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
write!(f, "{:<width$}", self.0, width = f.width().unwrap_or(0))
}
}
@@ -83,7 +83,7 @@ impl DerefMut for MySQLDatabase {
impl fmt::Display for MySQLDatabase {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
write!(f, "{:<width$}", self.0, width = f.width().unwrap_or(0))
}
}

View File

@@ -59,10 +59,6 @@ fn parse_group_denylist(denylist_path: &Path, lines: Lines) -> GroupDenylist {
}
.trim();
if trimmed_line.is_empty() {
continue;
}
let parts: Vec<&str> = trimmed_line.splitn(2, ':').collect();
if parts.len() != 2 {
tracing::warn!(

View File

@@ -90,12 +90,14 @@ impl Supervisor {
};
let mut watchdog_duration = None;
let mut watchdog_micro_seconds = 0;
#[cfg(target_os = "linux")]
let watchdog_task =
if systemd_mode && let Some(watchdog_duration_) = sd_notify::watchdog_enabled() {
if systemd_mode && sd_notify::watchdog_enabled(true, &mut watchdog_micro_seconds) {
let watchdog_duration_ = Duration::from_micros(watchdog_micro_seconds);
tracing::debug!(
"Systemd watchdog enabled with {} millisecond interval",
watchdog_duration_.as_millis()
watchdog_micro_seconds.div_ceil(1000),
);
watchdog_duration = Some(watchdog_duration_);
Some(spawn_watchdog_task(watchdog_duration_))
@@ -293,12 +295,15 @@ impl Supervisor {
pub async fn reload(&self) -> anyhow::Result<()> {
#[cfg(target_os = "linux")]
sd_notify::notify(&[
sd_notify::NotifyState::Reloading,
sd_notify::NotifyState::monotonic_usec_now()
.expect("Failed to get monotonic time to send to systemd while reloading"),
sd_notify::NotifyState::Status("Reloading configuration"),
])?;
sd_notify::notify(
false,
&[
sd_notify::NotifyState::Reloading,
sd_notify::NotifyState::monotonic_usec_now()
.expect("Failed to get monotonic time to send to systemd while reloading"),
sd_notify::NotifyState::Status("Reloading configuration"),
],
)?;
let previous_config = self.config.lock().await.clone();
self.reload_config().await?;
@@ -335,14 +340,14 @@ impl Supervisor {
}
#[cfg(target_os = "linux")]
sd_notify::notify(&[sd_notify::NotifyState::Ready])?;
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
Ok(())
}
pub async fn shutdown(&self) -> anyhow::Result<()> {
#[cfg(target_os = "linux")]
sd_notify::notify(&[sd_notify::NotifyState::Stopping])?;
sd_notify::notify(false, &[sd_notify::NotifyState::Stopping])?;
tracing::debug!("Stop accepting new connections");
self.stop_receiving_new_connections()?;
@@ -412,7 +417,7 @@ fn spawn_watchdog_task(duration: Duration) -> JoinHandle<()> {
);
loop {
interval.tick().await;
if let Err(err) = sd_notify::notify(&[sd_notify::NotifyState::Watchdog]) {
if let Err(err) = sd_notify::notify(false, &[sd_notify::NotifyState::Watchdog]) {
tracing::warn!("Failed to notify systemd watchdog: {}", err);
}
}
@@ -435,7 +440,9 @@ fn spawn_status_notifier_task(task_tracker: TaskTracker) -> JoinHandle<()> {
"Waiting for connections".to_string()
};
if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Status(message.as_str())]) {
if let Err(e) =
sd_notify::notify(false, &[sd_notify::NotifyState::Status(message.as_str())])
{
tracing::warn!("Failed to send systemd status notification: {}", e);
}
}
@@ -550,7 +557,7 @@ async fn listener_task(
group_denylist: Arc<RwLock<GroupDenylist>>,
) -> anyhow::Result<()> {
#[cfg(target_os = "linux")]
sd_notify::notify(&[sd_notify::NotifyState::Ready])?;
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
let connection_counter = AtomicU64::new(0);