diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b2be92b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +result diff --git a/flake.lock b/flake.lock index e8d4a00..ffc815b 100644 --- a/flake.lock +++ b/flake.lock @@ -1,23 +1,23 @@ { "nodes": { - "nixpkgs-lib": { + "nixpkgs": { "locked": { - "lastModified": 1673743903, - "narHash": "sha256-sloY6KYyVOozJ1CkbgJPpZ99TKIjIvM+04V48C04sMQ=", - "owner": "nix-community", - "repo": "nixpkgs.lib", - "rev": "7555e2dfcbac1533f047021f1744ac8871150f9f", + "lastModified": 1706098335, + "narHash": "sha256-r3dWjT8P9/Ah5m5ul4WqIWD8muj5F+/gbCdjiNVBKmU=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "a77ab169a83a4175169d78684ddd2e54486ac651", "type": "github" }, "original": { - "owner": "nix-community", - "repo": "nixpkgs.lib", - "type": "github" + "id": "nixpkgs", + "ref": "nixos-23.11", + "type": "indirect" } }, "root": { "inputs": { - "nixpkgs-lib": "nixpkgs-lib" + "nixpkgs": "nixpkgs" } } }, diff --git a/flake.nix b/flake.nix index a6125b5..b284666 100644 --- a/flake.nix +++ b/flake.nix @@ -2,13 +2,29 @@ description = "NixOS modules for matrix related services"; inputs = { - nixpkgs-lib.url = github:nix-community/nixpkgs.lib; + nixpkgs.url = "nixpkgs/nixos-23.11"; }; - outputs = { self, nixpkgs-lib }: { + outputs = { self, nixpkgs }: { nixosModules = { default = import ./module.nix; }; - lib = import ./lib.nix { lib = nixpkgs-lib.lib; }; + + lib = import ./lib.nix { lib = nixpkgs.lib; }; + + packages = let + forAllSystems = f: + nixpkgs.lib.genAttrs [ + "x86_64-linux" + "aarch64-linux" + "x86_64-darwin" + "aarch64-darwin" + ] (system: f nixpkgs.legacyPackages.${system}); + in forAllSystems (pkgs: { + tests = import ./tests { + inherit nixpkgs pkgs; + matrix-lib = self.lib; + }; + }); }; } diff --git a/lib.nix b/lib.nix index 217accd..24a769d 100644 --- a/lib.nix +++ b/lib.nix @@ -4,7 +4,7 @@ rec { isListenerType = type: l: lib.any (r: lib.any (n: n == type) r.names) l.resources; # Get the first listener that includes the given resource from worker firstListenerOfType = type: ls: lib.lists.findFirst (isListenerType type) - (lib.throw "No listener with resource: ${type} configured") + (throw "No listener with resource: ${type} configured") ls; # Get an attrset of the host and port from a listener connectionInfo = l: { @@ -17,4 +17,65 @@ rec { l = firstListenerOfType r w.settings.worker_listeners; in connectionInfo l; + mapWorkersToUpstreamsByType = workerInstances: + lib.pipe workerInstances [ + lib.attrValues + + # Index by worker type + (lib.foldl (acc: worker: acc // { + ${worker.type} = (acc.${worker.type} or [ ]) ++ [ worker ]; + }) { }) + + # Subindex by resource names, listener types, and convert to upstreams + (lib.mapAttrs (_: workers: lib.pipe workers [ + (lib.concatMap (worker: worker.settings.worker_listeners)) + lib.lists.head # only select one listener for the worker to avoid cache thrashing + lib.flatten + mapListenersToUpstreamsByType + ])) + ]; + + mapListenersToUpstreamsByType = listenerInstances: + lib.pipe listenerInstances [ + # Index by resource names + (lib.concatMap (listener: lib.pipe listener [ + (listener: let + allResourceNames = lib.pipe listener.resources [ + (map (resource: resource.names)) + lib.flatten + lib.unique + ]; + in if allResourceNames == [ ] + then { "empty" = listener; } + else lib.genAttrs allResourceNames (_: listener)) + lib.attrsToList + ])) + + (lib.foldl (acc: listener: acc // { + ${listener.name} = (acc.${listener.name} or [ ]) ++ [ listener.value ]; + }) { }) + + # Index by listener type + (lib.mapAttrs (_: + (lib.foldl (acc: listener: acc // { + ${listener.type} = (acc.${listener.type} or [ ]) ++ [ listener ]; + }) { }) + )) + + # Convert listeners to upstream URIs + (lib.mapAttrs (_: + (lib.mapAttrs (_: listeners: + lib.pipe listeners [ + (lib.concatMap (listener: + if listener.path != null + then [ "unix:${listener.path}" ] + else (map (addr: "${addr}:${toString listener.port}") listener.bind_addresses) + )) + # NOTE: Adding ` = { }` to every upstream might seem unnecessary in isolation, + # but it makes it easier to set upstreams in the nginx module. + (uris: lib.genAttrs uris (_: { })) + ] + )) + )) + ]; } diff --git a/synapse-module/default.nix b/synapse-module/default.nix index 45eb879..a523bb0 100644 --- a/synapse-module/default.nix +++ b/synapse-module/default.nix @@ -10,7 +10,9 @@ let wcfgText = "config.services.matrix-synapse-next.workers"; format = pkgs.formats.yaml {}; - matrix-synapse-common-config = format.generate "matrix-synapse-common-config.yaml" cfg.settings; + matrix-synapse-common-config = format.generate "matrix-synapse-common-config.yaml" (cfg.settings // { + listeners = map (lib.filterAttrsRecursive (_: v: v != null)) cfg.settings.listeners; + }); # TODO: Align better with the upstream module wrapped = cfg.package.override { @@ -79,6 +81,15 @@ in ''; }; + socketDir = mkOption { + type = types.path; + default = "/run/matrix-synapse"; + description = '' + The directory where matrix-synapse by default stores the sockets of + all listeners that bind to UNIX sockets. + ''; + }; + enableNginx = mkEnableOption "The synapse module managing nginx"; public_baseurl = mkOption { @@ -135,14 +146,42 @@ in type = types.listOf (types.submodule { options = { port = mkOption { - type = types.port; - description = "The TCP port to bind to"; + type = with types; nullOr types.port; + default = null; + description = '' + The TCP port to bind to. + + ::: {.note} + This option will be ignored if {option}`path` is set to a non-null value. + ::: + ''; example = 8448; }; + path = mkOption { + type = with types; nullOr path; + default = null; + description = '' + The UNIX socket to bind to. + + ::: {.note} + This option will override {option}`bind_addresses` and {option}`port` + if set to a non-null value. + ::: + ''; + example = literalExpression ''''${${cfgText}.socketDir}/matrix-synapse.sock''; + }; + bind_addresses = mkOption { type = types.listOf types.str; - description = "A list of local addresses to listen on"; + default = [ ]; + description = '' + A list of local addresses to listen on. + + ::: {.note} + This option will be ignored if {option}`path` is set to a non-null value. + ::: + ''; }; type = mkOption { @@ -201,16 +240,14 @@ in # TODO: add defaultText default = [ { - port = 8008; - bind_addresses = [ "127.0.0.1" ]; + path = "${cfg.socketDir}/matrix-synapse.sock"; resources = [ { names = [ "client" ]; compress = true; } { names = [ "federation" ]; compress = false; } ]; } (mkIf (wcfg.instances != { }) { - port = 9093; - bind_addresses = [ "127.0.0.1" ]; + path = "${cfg.socketDir}/matrix-synapse-replication.sock"; resources = [ { names = [ "replication" ]; } ]; @@ -352,6 +389,12 @@ in }; config = mkIf cfg.enable { + assertions = [ ] + ++ (map (l: { + assertion = l.path == null -> (l.bind_addresses != [ ] && l.port != null); + message = "Some listeners are missing either a socket path or a bind_address + port to listen on"; + }) cfg.settings.listeners); + users.users.matrix-synapse = { group = "matrix-synapse"; home = cfg.dataDir; @@ -396,6 +439,8 @@ in Group = "matrix-synapse"; Slice = "system-matrix-synapse.slice"; WorkingDirectory = cfg.dataDir; + StateDirectory = "matrix-synapse"; + RuntimeDirectory = "matrix-synapse"; ExecStart = let flags = lib.cli.toGNUCommandLineShell {} { config-path = [ matrix-synapse-common-config ] ++ cfg.extraConfigFiles; diff --git a/synapse-module/nginx.nix b/synapse-module/nginx.nix index 6259ba5..a350d7f 100644 --- a/synapse-module/nginx.nix +++ b/synapse-module/nginx.nix @@ -2,13 +2,10 @@ let cfg = config.services.matrix-synapse-next; - getWorkersOfType = type: lib.filterAttrs (_: w: w.type == type) cfg.workers.instances; - isListenerType = type: listener: lib.lists.any (r: lib.lists.any (n: n == type) r.names) listener.resources; - firstListenerOfType = type: worker: lib.lists.findFirst (isListenerType type) (throw "No federation endpoint on receiver") worker.settings.worker_listeners; - wAddressOfType = type: w: lib.lists.findFirst (_: true) (throw "No address in receiver") (firstListenerOfType type w).bind_addresses; - wPortOfType = type: w: (firstListenerOfType type w).port; - wSocketAddressOfType = type: w: "${wAddressOfType type w}:${builtins.toString (wPortOfType type w)}"; - generateSocketAddresses = type: workers: lib.mapAttrsToList (_: w: "${wSocketAddressOfType type w}") workers; + matrix-lib = (import ../lib.nix { inherit lib; }); + + workerUpstreams = matrix-lib.mapWorkersToUpstreamsByType cfg.workers.instances; + listenerUpstreams = matrix-lib.mapListenersToUpstreamsByType cfg.settings.listeners; in { config = lib.mkIf cfg.enableNginx { @@ -138,24 +135,17 @@ in ''; services.nginx.upstreams.synapse_master.servers = let - isMainListener = l: isListenerType "client" l && isListenerType "federation" l; - firstMainListener = lib.findFirst isMainListener - (throw "No catch-all listener configured") cfg.settings.listeners; - address = lib.findFirst (_: true) (throw "No address in main listener") firstMainListener.bind_addresses; - port = firstMainListener.port; - socketAddress = "${address}:${builtins.toString port}"; - in { - "${socketAddress}" = { }; - }; + mainListeners = builtins.intersectAttrs + (listenerUpstreams.client.http or { }) + (listenerUpstreams.federation.http or { }); + in + assert lib.assertMsg (mainListeners != { }) + "No catch-all listener configured, or listener is not bound to an address"; + mainListeners; services.nginx.upstreams.synapse_worker_federation = { - servers = let - fedReceivers = getWorkersOfType "fed-receiver"; - socketAddresses = generateSocketAddresses "federation" fedReceivers; - in if fedReceivers != { } then - lib.genAttrs socketAddresses (_: { }) - else config.services.nginx.upstreams.synapse_master.servers; + servers = workerUpstreams.fed-receiver.federation.http or config.services.nginx.upstreams.synapse_master.servers; extraConfig = '' ip_hash; ''; @@ -163,12 +153,7 @@ in services.nginx.upstreams.synapse_worker_initial_sync = { - servers = let - initialSyncers = getWorkersOfType "initial-sync"; - socketAddresses = generateSocketAddresses "client" initialSyncers; - in if initialSyncers != { } then - lib.genAttrs socketAddresses (_: { }) - else config.services.nginx.upstreams.synapse_master.servers; + servers = workerUpstreams.initial-sync.client.http or config.services.nginx.upstreams.synapse_master.servers; extraConfig = '' hash $mxid_localpart consistent; ''; @@ -176,12 +161,7 @@ in services.nginx.upstreams.synapse_worker_normal_sync = { - servers = let - normalSyncers = getWorkersOfType "normal-sync"; - socketAddresses = generateSocketAddresses "client" normalSyncers; - in if normalSyncers != { } then - lib.genAttrs socketAddresses (_: { }) - else config.services.nginx.upstreams.synapse_master.servers; + servers = workerUpstreams.normal-sync.client.http or config.services.nginx.upstreams.synapse_master.servers; extraConfig = '' hash $mxid_localpart consistent; ''; @@ -189,12 +169,7 @@ in services.nginx.upstreams.synapse_worker_user-dir = { - servers = let - workers = getWorkersOfType "user-dir"; - socketAddresses = generateSocketAddresses "client" workers; - in if workers != { } then - lib.genAttrs socketAddresses (_: { }) - else config.services.nginx.upstreams.synapse_master.servers; + servers = workerUpstreams.user-dir.client.http or config.services.nginx.upstreams.synapse_master.servers; }; services.nginx.virtualHosts."${cfg.public_baseurl}" = { diff --git a/synapse-module/workers.nix b/synapse-module/workers.nix index 4c5fd2c..ed60628 100644 --- a/synapse-module/workers.nix +++ b/synapse-module/workers.nix @@ -86,10 +86,17 @@ in { }; port = mkOption { - type = types.port; + type = with types; nullOr port; + default = null; description = "The TCP port to bind to"; }; + path = mkOption { + type = with types; nullOr path; + default = null; + description = "The UNIX socket to bind to"; + }; + bind_addresses = mkOption { type = with types; listOf str; description = "A list of local addresses to listen on"; @@ -161,7 +168,7 @@ in { }; in { mainReplicationHost = mkOption { - type = types.str; + type = with types; nullOr str; default = let host = (matrix-lib.connectionInfo mainReplicationListener).host; in @@ -174,18 +181,32 @@ in { }; mainReplicationPort = mkOption { - type = types.port; + type = with types; nullOr port; default = mainReplicationListener.port; # TODO: add defaultText description = "Port for the main synapse instance's replication listener"; }; + mainReplicationPath = mkOption { + type = with types; nullOr path; + default = mainReplicationListener.path; + # TODO: add defaultText + description = "Path to the UNIX socket of the main synapse instance's replication listener"; + }; + defaultListenerAddress = mkOption { type = types.str; default = "127.0.0.1"; description = "The default listener address for the worker"; }; + workersUsePath = mkOption { + type = types.bool; + description = "Whether to enable UNIX sockets for all automatically generated workers"; + default = true; + example = false; + }; + workerStartingPort = mkOption { type = types.port; description = "What port should the automatically configured workers start enumerating from"; @@ -233,22 +254,32 @@ in { }; config = { + assertions = [ ] + ++ (lib.concatMap (worker: + (map (l: { + assertion = l.path == null -> (l.bind_addresses != [ ] && l.port != null); + message = "At least one worker listener is missing either a socket path or a bind_address + port to listen on"; + }) worker.settings.worker_listeners) + ) (lib.attrValues wcfg.instances)); + services.matrix-synapse-next.settings = { federation_sender_instances = lib.genList (i: "auto-fed-sender${toString (i + 1)}") wcfg.federationSenders; instance_map = (lib.mkIf (cfg.workers.instances != { }) ({ - main = let - host = lib.head mainReplicationListener.bind_addresses; - in { - host = if builtins.elem host [ "0.0.0.0" "::"] then "127.0.0.1" else host; - port = mainReplicationListener.port; + main = if wcfg.mainReplicationPath != null then { + path = wcfg.mainReplicationPath; + } else { + host = wcfg.mainReplicationHost; + port = wcfg.mainReplicationPort; }; } // genAttrs' (lib.lists.range 1 wcfg.eventPersisters) (i: "auto-event-persist${toString i}") (i: let wRL = matrix-lib.firstListenerOfType "replication" wcfg.instances."auto-event-persist${toString i}".settings.worker_listeners; - in matrix-lib.connectionInfo wRL))); + in if wRL.path != null then { + inherit (wRL) path; + } else matrix-lib.connectionInfo wRL))); stream_writers.events = mkIf (wcfg.eventPersisters > 0) @@ -260,10 +291,15 @@ in { services.matrix-synapse-next.workers.instances = let sum = lib.foldl lib.add 0; - workerListenersWithMetrics = portOffset: - lib.singleton ({ - port = wcfg.workerStartingPort + portOffset - 1; - }) + workerListenersWithMetrics = portOffset: name: + [(if wcfg.workersUsePath + then { + path = "${cfg.socketDir}/matrix-synapse-worker-${name}.sock"; + } + else { + port = wcfg.workerStartingPort + portOffset - 1; + } + )] ++ lib.optional wcfg.enableMetrics { port = wcfg.metricsStartingPort + portOffset; resources = [ { names = [ "metrics" ]; } ]; @@ -274,7 +310,7 @@ in { numberOfWorkers, portOffset ? 0, nameFn ? i: "auto-${type}${toString i}", - workerListenerFn ? i: workerListenersWithMetrics (portOffset + i) + workerListenerFn ? i: name: workerListenersWithMetrics (portOffset + i) name }: genAttrs' (lib.lists.range 1 numberOfWorkers) nameFn @@ -282,7 +318,7 @@ in { isAuto = true; inherit type; index = i; - settings.worker_listeners = workerListenerFn i; + settings.worker_listeners = workerListenerFn i (nameFn i); }); workerInstances = { @@ -323,8 +359,13 @@ in { systemd.services = let workerList = lib.mapAttrsToList lib.nameValuePair wcfg.instances; - workerConfig = worker: format.generate "matrix-synapse-worker-${worker.name}-config.yaml" - (worker.value.settings // { worker_name = worker.name; }); + workerConfig = worker: + format.generate "matrix-synapse-worker-${worker.name}-config.yaml" + (worker.value.settings // { + worker_name = worker.name; + worker_listeners = + map (lib.filterAttrsRecursive (_: v: v != null)) worker.value.settings.worker_listeners; + }); in builtins.listToAttrs (lib.flip map workerList (worker: { name = "matrix-synapse-worker-${worker.name}"; value = { @@ -339,6 +380,8 @@ in { Group = "matrix-synapse"; Slice = "system-matrix-synapse.slice"; WorkingDirectory = cfg.dataDir; + RuntimeDirectory = "matrix-synapse"; + StateDirectory = "matrix-synapse"; ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" '' # From https://md.darmstadt.ccc.de/synapse-at-work while ! systemctl is-active -q matrix-synapse.service; do diff --git a/tests/default.nix b/tests/default.nix new file mode 100644 index 0000000..5ad1572 --- /dev/null +++ b/tests/default.nix @@ -0,0 +1,4 @@ +{ nixpkgs, pkgs, matrix-lib, ... }: +{ + nginx-pipeline = pkgs.callPackage ./nginx-pipeline { inherit nixpkgs matrix-lib; }; +} diff --git a/tests/nginx-pipeline/default.nix b/tests/nginx-pipeline/default.nix new file mode 100644 index 0000000..03958cc --- /dev/null +++ b/tests/nginx-pipeline/default.nix @@ -0,0 +1,53 @@ +{ nixpkgs, lib, matrix-lib, writeText, ... }: +let + nixosConfig = nixpkgs.lib.nixosSystem { + system = "x86_64-linux"; + modules = [ + ../../module.nix + { + system.stateVersion = "23.11"; + boot.isContainer = true; + services.matrix-synapse-next = { + enable = true; + enableNginx = true; + + workers = { + enableMetrics = true; + + federationSenders = 3; + federationReceivers = 3; + initialSyncers = 1; + normalSyncers = 1; + eventPersisters = 1; + useUserDirectoryWorker = true; + + instances.auto-fed-receiver1.settings.worker_listeners = [ + { + bind_addresses = [ + "127.0.0.2" + ]; + port = 1337; + resources = [ + { compress = false; + names = [ "federation" ]; + } + ]; + } + ]; + }; + + settings.server_name = "example.com"; + }; + } + ]; + }; + + inherit (nixosConfig.config.services.matrix-synapse-next.workers) instances; +in + writeText "matrix-synapse-next-nginx-pipeline-test.txt" '' + ${(lib.generators.toPretty {}) instances} + + ==================================================== + + ${(lib.generators.toPretty {}) (matrix-lib.mapWorkersToUpstreamsByType instances)} + ''