Merge pull request #7 from dali99/refactor-nginx-upstream-generation

refactor nginx upstream generation, add support for unix sockets
This commit is contained in:
Daniel Lovbrotte Olsen 2024-03-13 06:23:07 +01:00 committed by GitHub
commit 19b85a2562
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 277 additions and 79 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
result

View File

@ -1,23 +1,23 @@
{ {
"nodes": { "nodes": {
"nixpkgs-lib": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1673743903, "lastModified": 1706098335,
"narHash": "sha256-sloY6KYyVOozJ1CkbgJPpZ99TKIjIvM+04V48C04sMQ=", "narHash": "sha256-r3dWjT8P9/Ah5m5ul4WqIWD8muj5F+/gbCdjiNVBKmU=",
"owner": "nix-community", "owner": "NixOS",
"repo": "nixpkgs.lib", "repo": "nixpkgs",
"rev": "7555e2dfcbac1533f047021f1744ac8871150f9f", "rev": "a77ab169a83a4175169d78684ddd2e54486ac651",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "nix-community", "id": "nixpkgs",
"repo": "nixpkgs.lib", "ref": "nixos-23.11",
"type": "github" "type": "indirect"
} }
}, },
"root": { "root": {
"inputs": { "inputs": {
"nixpkgs-lib": "nixpkgs-lib" "nixpkgs": "nixpkgs"
} }
} }
}, },

View File

@ -2,13 +2,29 @@
description = "NixOS modules for matrix related services"; description = "NixOS modules for matrix related services";
inputs = { inputs = {
nixpkgs-lib.url = github:nix-community/nixpkgs.lib; nixpkgs.url = "nixpkgs/nixos-23.11";
}; };
outputs = { self, nixpkgs-lib }: { outputs = { self, nixpkgs }: {
nixosModules = { nixosModules = {
default = import ./module.nix; default = import ./module.nix;
}; };
lib = import ./lib.nix { lib = nixpkgs-lib.lib; };
lib = import ./lib.nix { lib = nixpkgs.lib; };
packages = let
forAllSystems = f:
nixpkgs.lib.genAttrs [
"x86_64-linux"
"aarch64-linux"
"x86_64-darwin"
"aarch64-darwin"
] (system: f nixpkgs.legacyPackages.${system});
in forAllSystems (pkgs: {
tests = import ./tests {
inherit nixpkgs pkgs;
matrix-lib = self.lib;
};
});
}; };
} }

63
lib.nix
View File

@ -4,7 +4,7 @@ rec {
isListenerType = type: l: lib.any (r: lib.any (n: n == type) r.names) l.resources; isListenerType = type: l: lib.any (r: lib.any (n: n == type) r.names) l.resources;
# Get the first listener that includes the given resource from worker # Get the first listener that includes the given resource from worker
firstListenerOfType = type: ls: lib.lists.findFirst (isListenerType type) firstListenerOfType = type: ls: lib.lists.findFirst (isListenerType type)
(lib.throw "No listener with resource: ${type} configured") (throw "No listener with resource: ${type} configured")
ls; ls;
# Get an attrset of the host and port from a listener # Get an attrset of the host and port from a listener
connectionInfo = l: { connectionInfo = l: {
@ -17,4 +17,65 @@ rec {
l = firstListenerOfType r w.settings.worker_listeners; l = firstListenerOfType r w.settings.worker_listeners;
in connectionInfo l; in connectionInfo l;
mapWorkersToUpstreamsByType = workerInstances:
lib.pipe workerInstances [
lib.attrValues
# Index by worker type
(lib.foldl (acc: worker: acc // {
${worker.type} = (acc.${worker.type} or [ ]) ++ [ worker ];
}) { })
# Subindex by resource names, listener types, and convert to upstreams
(lib.mapAttrs (_: workers: lib.pipe workers [
(lib.concatMap (worker: worker.settings.worker_listeners))
lib.lists.head # only select one listener for the worker to avoid cache thrashing
lib.flatten
mapListenersToUpstreamsByType
]))
];
mapListenersToUpstreamsByType = listenerInstances:
lib.pipe listenerInstances [
# Index by resource names
(lib.concatMap (listener: lib.pipe listener [
(listener: let
allResourceNames = lib.pipe listener.resources [
(map (resource: resource.names))
lib.flatten
lib.unique
];
in if allResourceNames == [ ]
then { "empty" = listener; }
else lib.genAttrs allResourceNames (_: listener))
lib.attrsToList
]))
(lib.foldl (acc: listener: acc // {
${listener.name} = (acc.${listener.name} or [ ]) ++ [ listener.value ];
}) { })
# Index by listener type
(lib.mapAttrs (_:
(lib.foldl (acc: listener: acc // {
${listener.type} = (acc.${listener.type} or [ ]) ++ [ listener ];
}) { })
))
# Convert listeners to upstream URIs
(lib.mapAttrs (_:
(lib.mapAttrs (_: listeners:
lib.pipe listeners [
(lib.concatMap (listener:
if listener.path != null
then [ "unix:${listener.path}" ]
else (map (addr: "${addr}:${toString listener.port}") listener.bind_addresses)
))
# NOTE: Adding ` = { }` to every upstream might seem unnecessary in isolation,
# but it makes it easier to set upstreams in the nginx module.
(uris: lib.genAttrs uris (_: { }))
]
))
))
];
} }

View File

@ -10,7 +10,9 @@ let
wcfgText = "config.services.matrix-synapse-next.workers"; wcfgText = "config.services.matrix-synapse-next.workers";
format = pkgs.formats.yaml {}; format = pkgs.formats.yaml {};
matrix-synapse-common-config = format.generate "matrix-synapse-common-config.yaml" cfg.settings; matrix-synapse-common-config = format.generate "matrix-synapse-common-config.yaml" (cfg.settings // {
listeners = map (lib.filterAttrsRecursive (_: v: v != null)) cfg.settings.listeners;
});
# TODO: Align better with the upstream module # TODO: Align better with the upstream module
wrapped = cfg.package.override { wrapped = cfg.package.override {
@ -79,6 +81,15 @@ in
''; '';
}; };
socketDir = mkOption {
type = types.path;
default = "/run/matrix-synapse";
description = ''
The directory where matrix-synapse by default stores the sockets of
all listeners that bind to UNIX sockets.
'';
};
enableNginx = mkEnableOption "The synapse module managing nginx"; enableNginx = mkEnableOption "The synapse module managing nginx";
public_baseurl = mkOption { public_baseurl = mkOption {
@ -135,14 +146,42 @@ in
type = types.listOf (types.submodule { type = types.listOf (types.submodule {
options = { options = {
port = mkOption { port = mkOption {
type = types.port; type = with types; nullOr types.port;
description = "The TCP port to bind to"; default = null;
description = ''
The TCP port to bind to.
::: {.note}
This option will be ignored if {option}`path` is set to a non-null value.
:::
'';
example = 8448; example = 8448;
}; };
path = mkOption {
type = with types; nullOr path;
default = null;
description = ''
The UNIX socket to bind to.
::: {.note}
This option will override {option}`bind_addresses` and {option}`port`
if set to a non-null value.
:::
'';
example = literalExpression ''''${${cfgText}.socketDir}/matrix-synapse.sock'';
};
bind_addresses = mkOption { bind_addresses = mkOption {
type = types.listOf types.str; type = types.listOf types.str;
description = "A list of local addresses to listen on"; default = [ ];
description = ''
A list of local addresses to listen on.
::: {.note}
This option will be ignored if {option}`path` is set to a non-null value.
:::
'';
}; };
type = mkOption { type = mkOption {
@ -201,16 +240,14 @@ in
# TODO: add defaultText # TODO: add defaultText
default = [ default = [
{ {
port = 8008; path = "${cfg.socketDir}/matrix-synapse.sock";
bind_addresses = [ "127.0.0.1" ];
resources = [ resources = [
{ names = [ "client" ]; compress = true; } { names = [ "client" ]; compress = true; }
{ names = [ "federation" ]; compress = false; } { names = [ "federation" ]; compress = false; }
]; ];
} }
(mkIf (wcfg.instances != { }) { (mkIf (wcfg.instances != { }) {
port = 9093; path = "${cfg.socketDir}/matrix-synapse-replication.sock";
bind_addresses = [ "127.0.0.1" ];
resources = [ resources = [
{ names = [ "replication" ]; } { names = [ "replication" ]; }
]; ];
@ -352,6 +389,12 @@ in
}; };
config = mkIf cfg.enable { config = mkIf cfg.enable {
assertions = [ ]
++ (map (l: {
assertion = l.path == null -> (l.bind_addresses != [ ] && l.port != null);
message = "Some listeners are missing either a socket path or a bind_address + port to listen on";
}) cfg.settings.listeners);
users.users.matrix-synapse = { users.users.matrix-synapse = {
group = "matrix-synapse"; group = "matrix-synapse";
home = cfg.dataDir; home = cfg.dataDir;
@ -396,6 +439,8 @@ in
Group = "matrix-synapse"; Group = "matrix-synapse";
Slice = "system-matrix-synapse.slice"; Slice = "system-matrix-synapse.slice";
WorkingDirectory = cfg.dataDir; WorkingDirectory = cfg.dataDir;
StateDirectory = "matrix-synapse";
RuntimeDirectory = "matrix-synapse";
ExecStart = let ExecStart = let
flags = lib.cli.toGNUCommandLineShell {} { flags = lib.cli.toGNUCommandLineShell {} {
config-path = [ matrix-synapse-common-config ] ++ cfg.extraConfigFiles; config-path = [ matrix-synapse-common-config ] ++ cfg.extraConfigFiles;

View File

@ -2,13 +2,10 @@
let let
cfg = config.services.matrix-synapse-next; cfg = config.services.matrix-synapse-next;
getWorkersOfType = type: lib.filterAttrs (_: w: w.type == type) cfg.workers.instances; matrix-lib = (import ../lib.nix { inherit lib; });
isListenerType = type: listener: lib.lists.any (r: lib.lists.any (n: n == type) r.names) listener.resources;
firstListenerOfType = type: worker: lib.lists.findFirst (isListenerType type) (throw "No federation endpoint on receiver") worker.settings.worker_listeners; workerUpstreams = matrix-lib.mapWorkersToUpstreamsByType cfg.workers.instances;
wAddressOfType = type: w: lib.lists.findFirst (_: true) (throw "No address in receiver") (firstListenerOfType type w).bind_addresses; listenerUpstreams = matrix-lib.mapListenersToUpstreamsByType cfg.settings.listeners;
wPortOfType = type: w: (firstListenerOfType type w).port;
wSocketAddressOfType = type: w: "${wAddressOfType type w}:${builtins.toString (wPortOfType type w)}";
generateSocketAddresses = type: workers: lib.mapAttrsToList (_: w: "${wSocketAddressOfType type w}") workers;
in in
{ {
config = lib.mkIf cfg.enableNginx { config = lib.mkIf cfg.enableNginx {
@ -138,24 +135,17 @@ in
''; '';
services.nginx.upstreams.synapse_master.servers = let services.nginx.upstreams.synapse_master.servers = let
isMainListener = l: isListenerType "client" l && isListenerType "federation" l; mainListeners = builtins.intersectAttrs
firstMainListener = lib.findFirst isMainListener (listenerUpstreams.client.http or { })
(throw "No catch-all listener configured") cfg.settings.listeners; (listenerUpstreams.federation.http or { });
address = lib.findFirst (_: true) (throw "No address in main listener") firstMainListener.bind_addresses; in
port = firstMainListener.port; assert lib.assertMsg (mainListeners != { })
socketAddress = "${address}:${builtins.toString port}"; "No catch-all listener configured, or listener is not bound to an address";
in { mainListeners;
"${socketAddress}" = { };
};
services.nginx.upstreams.synapse_worker_federation = { services.nginx.upstreams.synapse_worker_federation = {
servers = let servers = workerUpstreams.fed-receiver.federation.http or config.services.nginx.upstreams.synapse_master.servers;
fedReceivers = getWorkersOfType "fed-receiver";
socketAddresses = generateSocketAddresses "federation" fedReceivers;
in if fedReceivers != { } then
lib.genAttrs socketAddresses (_: { })
else config.services.nginx.upstreams.synapse_master.servers;
extraConfig = '' extraConfig = ''
ip_hash; ip_hash;
''; '';
@ -163,12 +153,7 @@ in
services.nginx.upstreams.synapse_worker_initial_sync = { services.nginx.upstreams.synapse_worker_initial_sync = {
servers = let servers = workerUpstreams.initial-sync.client.http or config.services.nginx.upstreams.synapse_master.servers;
initialSyncers = getWorkersOfType "initial-sync";
socketAddresses = generateSocketAddresses "client" initialSyncers;
in if initialSyncers != { } then
lib.genAttrs socketAddresses (_: { })
else config.services.nginx.upstreams.synapse_master.servers;
extraConfig = '' extraConfig = ''
hash $mxid_localpart consistent; hash $mxid_localpart consistent;
''; '';
@ -176,12 +161,7 @@ in
services.nginx.upstreams.synapse_worker_normal_sync = { services.nginx.upstreams.synapse_worker_normal_sync = {
servers = let servers = workerUpstreams.normal-sync.client.http or config.services.nginx.upstreams.synapse_master.servers;
normalSyncers = getWorkersOfType "normal-sync";
socketAddresses = generateSocketAddresses "client" normalSyncers;
in if normalSyncers != { } then
lib.genAttrs socketAddresses (_: { })
else config.services.nginx.upstreams.synapse_master.servers;
extraConfig = '' extraConfig = ''
hash $mxid_localpart consistent; hash $mxid_localpart consistent;
''; '';
@ -189,12 +169,7 @@ in
services.nginx.upstreams.synapse_worker_user-dir = { services.nginx.upstreams.synapse_worker_user-dir = {
servers = let servers = workerUpstreams.user-dir.client.http or config.services.nginx.upstreams.synapse_master.servers;
workers = getWorkersOfType "user-dir";
socketAddresses = generateSocketAddresses "client" workers;
in if workers != { } then
lib.genAttrs socketAddresses (_: { })
else config.services.nginx.upstreams.synapse_master.servers;
}; };
services.nginx.virtualHosts."${cfg.public_baseurl}" = { services.nginx.virtualHosts."${cfg.public_baseurl}" = {

View File

@ -86,10 +86,17 @@ in {
}; };
port = mkOption { port = mkOption {
type = types.port; type = with types; nullOr port;
default = null;
description = "The TCP port to bind to"; description = "The TCP port to bind to";
}; };
path = mkOption {
type = with types; nullOr path;
default = null;
description = "The UNIX socket to bind to";
};
bind_addresses = mkOption { bind_addresses = mkOption {
type = with types; listOf str; type = with types; listOf str;
description = "A list of local addresses to listen on"; description = "A list of local addresses to listen on";
@ -161,7 +168,7 @@ in {
}; };
in { in {
mainReplicationHost = mkOption { mainReplicationHost = mkOption {
type = types.str; type = with types; nullOr str;
default = let default = let
host = (matrix-lib.connectionInfo mainReplicationListener).host; host = (matrix-lib.connectionInfo mainReplicationListener).host;
in in
@ -174,18 +181,32 @@ in {
}; };
mainReplicationPort = mkOption { mainReplicationPort = mkOption {
type = types.port; type = with types; nullOr port;
default = mainReplicationListener.port; default = mainReplicationListener.port;
# TODO: add defaultText # TODO: add defaultText
description = "Port for the main synapse instance's replication listener"; description = "Port for the main synapse instance's replication listener";
}; };
mainReplicationPath = mkOption {
type = with types; nullOr path;
default = mainReplicationListener.path;
# TODO: add defaultText
description = "Path to the UNIX socket of the main synapse instance's replication listener";
};
defaultListenerAddress = mkOption { defaultListenerAddress = mkOption {
type = types.str; type = types.str;
default = "127.0.0.1"; default = "127.0.0.1";
description = "The default listener address for the worker"; description = "The default listener address for the worker";
}; };
workersUsePath = mkOption {
type = types.bool;
description = "Whether to enable UNIX sockets for all automatically generated workers";
default = true;
example = false;
};
workerStartingPort = mkOption { workerStartingPort = mkOption {
type = types.port; type = types.port;
description = "What port should the automatically configured workers start enumerating from"; description = "What port should the automatically configured workers start enumerating from";
@ -233,22 +254,32 @@ in {
}; };
config = { config = {
assertions = [ ]
++ (lib.concatMap (worker:
(map (l: {
assertion = l.path == null -> (l.bind_addresses != [ ] && l.port != null);
message = "At least one worker listener is missing either a socket path or a bind_address + port to listen on";
}) worker.settings.worker_listeners)
) (lib.attrValues wcfg.instances));
services.matrix-synapse-next.settings = { services.matrix-synapse-next.settings = {
federation_sender_instances = federation_sender_instances =
lib.genList (i: "auto-fed-sender${toString (i + 1)}") wcfg.federationSenders; lib.genList (i: "auto-fed-sender${toString (i + 1)}") wcfg.federationSenders;
instance_map = (lib.mkIf (cfg.workers.instances != { }) ({ instance_map = (lib.mkIf (cfg.workers.instances != { }) ({
main = let main = if wcfg.mainReplicationPath != null then {
host = lib.head mainReplicationListener.bind_addresses; path = wcfg.mainReplicationPath;
in { } else {
host = if builtins.elem host [ "0.0.0.0" "::"] then "127.0.0.1" else host; host = wcfg.mainReplicationHost;
port = mainReplicationListener.port; port = wcfg.mainReplicationPort;
}; };
} // genAttrs' (lib.lists.range 1 wcfg.eventPersisters) } // genAttrs' (lib.lists.range 1 wcfg.eventPersisters)
(i: "auto-event-persist${toString i}") (i: "auto-event-persist${toString i}")
(i: let (i: let
wRL = matrix-lib.firstListenerOfType "replication" wcfg.instances."auto-event-persist${toString i}".settings.worker_listeners; wRL = matrix-lib.firstListenerOfType "replication" wcfg.instances."auto-event-persist${toString i}".settings.worker_listeners;
in matrix-lib.connectionInfo wRL))); in if wRL.path != null then {
inherit (wRL) path;
} else matrix-lib.connectionInfo wRL)));
stream_writers.events = stream_writers.events =
mkIf (wcfg.eventPersisters > 0) mkIf (wcfg.eventPersisters > 0)
@ -260,10 +291,15 @@ in {
services.matrix-synapse-next.workers.instances = let services.matrix-synapse-next.workers.instances = let
sum = lib.foldl lib.add 0; sum = lib.foldl lib.add 0;
workerListenersWithMetrics = portOffset: workerListenersWithMetrics = portOffset: name:
lib.singleton ({ [(if wcfg.workersUsePath
then {
path = "${cfg.socketDir}/matrix-synapse-worker-${name}.sock";
}
else {
port = wcfg.workerStartingPort + portOffset - 1; port = wcfg.workerStartingPort + portOffset - 1;
}) }
)]
++ lib.optional wcfg.enableMetrics { ++ lib.optional wcfg.enableMetrics {
port = wcfg.metricsStartingPort + portOffset; port = wcfg.metricsStartingPort + portOffset;
resources = [ { names = [ "metrics" ]; } ]; resources = [ { names = [ "metrics" ]; } ];
@ -274,7 +310,7 @@ in {
numberOfWorkers, numberOfWorkers,
portOffset ? 0, portOffset ? 0,
nameFn ? i: "auto-${type}${toString i}", nameFn ? i: "auto-${type}${toString i}",
workerListenerFn ? i: workerListenersWithMetrics (portOffset + i) workerListenerFn ? i: name: workerListenersWithMetrics (portOffset + i) name
}: genAttrs' }: genAttrs'
(lib.lists.range 1 numberOfWorkers) (lib.lists.range 1 numberOfWorkers)
nameFn nameFn
@ -282,7 +318,7 @@ in {
isAuto = true; isAuto = true;
inherit type; inherit type;
index = i; index = i;
settings.worker_listeners = workerListenerFn i; settings.worker_listeners = workerListenerFn i (nameFn i);
}); });
workerInstances = { workerInstances = {
@ -323,8 +359,13 @@ in {
systemd.services = let systemd.services = let
workerList = lib.mapAttrsToList lib.nameValuePair wcfg.instances; workerList = lib.mapAttrsToList lib.nameValuePair wcfg.instances;
workerConfig = worker: format.generate "matrix-synapse-worker-${worker.name}-config.yaml" workerConfig = worker:
(worker.value.settings // { worker_name = worker.name; }); format.generate "matrix-synapse-worker-${worker.name}-config.yaml"
(worker.value.settings // {
worker_name = worker.name;
worker_listeners =
map (lib.filterAttrsRecursive (_: v: v != null)) worker.value.settings.worker_listeners;
});
in builtins.listToAttrs (lib.flip map workerList (worker: { in builtins.listToAttrs (lib.flip map workerList (worker: {
name = "matrix-synapse-worker-${worker.name}"; name = "matrix-synapse-worker-${worker.name}";
value = { value = {
@ -339,6 +380,8 @@ in {
Group = "matrix-synapse"; Group = "matrix-synapse";
Slice = "system-matrix-synapse.slice"; Slice = "system-matrix-synapse.slice";
WorkingDirectory = cfg.dataDir; WorkingDirectory = cfg.dataDir;
RuntimeDirectory = "matrix-synapse";
StateDirectory = "matrix-synapse";
ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" '' ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" ''
# From https://md.darmstadt.ccc.de/synapse-at-work # From https://md.darmstadt.ccc.de/synapse-at-work
while ! systemctl is-active -q matrix-synapse.service; do while ! systemctl is-active -q matrix-synapse.service; do

4
tests/default.nix Normal file
View File

@ -0,0 +1,4 @@
{ nixpkgs, pkgs, matrix-lib, ... }:
{
nginx-pipeline = pkgs.callPackage ./nginx-pipeline { inherit nixpkgs matrix-lib; };
}

View File

@ -0,0 +1,53 @@
{ nixpkgs, lib, matrix-lib, writeText, ... }:
let
nixosConfig = nixpkgs.lib.nixosSystem {
system = "x86_64-linux";
modules = [
../../module.nix
{
system.stateVersion = "23.11";
boot.isContainer = true;
services.matrix-synapse-next = {
enable = true;
enableNginx = true;
workers = {
enableMetrics = true;
federationSenders = 3;
federationReceivers = 3;
initialSyncers = 1;
normalSyncers = 1;
eventPersisters = 1;
useUserDirectoryWorker = true;
instances.auto-fed-receiver1.settings.worker_listeners = [
{
bind_addresses = [
"127.0.0.2"
];
port = 1337;
resources = [
{ compress = false;
names = [ "federation" ];
}
];
}
];
};
settings.server_name = "example.com";
};
}
];
};
inherit (nixosConfig.config.services.matrix-synapse-next.workers) instances;
in
writeText "matrix-synapse-next-nginx-pipeline-test.txt" ''
${(lib.generators.toPretty {}) instances}
====================================================
${(lib.generators.toPretty {}) (matrix-lib.mapWorkersToUpstreamsByType instances)}
''