federationReceivers first pass, needs a lot of cleanup

This commit is contained in:
Daniel Lovbrotte Olsen 2022-10-10 10:55:16 +02:00
parent 59f4e969c4
commit ccbd092679
1 changed files with 99 additions and 17 deletions

View File

@ -7,6 +7,11 @@ let
pluginsEnv = cfg.package.python.buildEnv.override { pluginsEnv = cfg.package.python.buildEnv.override {
extraLibs = cfg.plugins; extraLibs = cfg.plugins;
}; };
typeToResources = t: {
"fed-receiver" = [ "federation" ];
"fed-sender" = [ ];
}.t;
in in
{ {
options.services.matrix-synapse-next = { options.services.matrix-synapse-next = {
@ -66,11 +71,9 @@ in
dMRL.bind_addresses; dMRL.bind_addresses;
dMRP = dMRL.port; dMRP = dMRL.port;
in { in {
enable = lib.mkEnableOption "synapse worker support";
mainReplicationHost = lib.mkOption { mainReplicationHost = lib.mkOption {
type = lib.types.str; type = lib.types.str;
default = if (lib.lists.all (x: dMRH != x) ["0.0.0.0" "::"]) then dMRH else "127.0.0.1"; default = if builtins.elem dMRH [ "0.0.0.0" "::" ] then "127.0.0.1" else dMRH;
description = "Host of the main synapse instance's replication listener"; description = "Host of the main synapse instance's replication listener";
}; };
@ -80,16 +83,55 @@ in
description = "Port for the main synapse instance's replication listener"; description = "Port for the main synapse instance's replication listener";
}; };
defaultListenerAddress = lib.mkOption {
type = lib.types.str;
default = "127.0.0.1";
description = "The default listener address for the worker";
};
workerStartingPort = lib.mkOption {
type = lib.types.port;
description = "What port should the automatically configured workers start enumerating from";
default = 8083;
};
federationSenders = lib.mkOption { federationSenders = lib.mkOption {
type = lib.types.ints.positive; type = lib.types.ints.unsigned;
description = "How many automatically configured federation senders to set up"; description = "How many automatically configured federation senders to set up";
default = 0; default = 0;
}; };
federationReceivers = lib.mkOption {
type = lib.types.ints.unsigned;
description = "How many automatically configured federation recievers to set up";
default = 0;
};
instances = lib.mkOption { instances = lib.mkOption {
type = lib.types.attrsOf (lib.types.submodule ({config, ...}: { type = lib.types.attrsOf (lib.types.submodule ({config, ...}: {
options.settings = lib.mkOption {
type = lib.types.submodule { options.autoType = lib.mkOption {
type = lib.types.enum [ null "fed-sender" "fed-receiver" ];
internal = true;
default = null;
};
options.settings = let
instanceCfg = config;
inherit (instanceCfg) autoType autoPort;
isAuto = autoType != null;
mapTypeApp = t: {
"fed-sender" = "synapse.app.federation_sender";
"fed-receiver" = "synapse.app.generic_worker";
}.${t};
defaultApp = if (instanceCfg.autoType == null)
then "synapse.app.generic_worker"
else mapTypeApp instanceCfg.autoType;
in lib.mkOption {
type = lib.types.submodule ({config, ...}: {
freeformType = format.type; freeformType = format.type;
options.worker_app = lib.mkOption { options.worker_app = lib.mkOption {
@ -102,6 +144,7 @@ in
"synapse.app.user_dir" "synapse.app.user_dir"
]; ];
description = "The type of worker application"; description = "The type of worker application";
default = defaultApp;
}; };
options.worker_replication_host = lib.mkOption { options.worker_replication_host = lib.mkOption {
type = lib.types.str; type = lib.types.str;
@ -127,11 +170,12 @@ in
options.bind_addresses = lib.mkOption { options.bind_addresses = lib.mkOption {
type = lib.types.listOf lib.types.str; type = lib.types.listOf lib.types.str;
description = "A list of local addresses to listen on"; description = "A list of local addresses to listen on";
default = [ cfg.workers.defaultListenerAddress ];
}; };
options.tls = lib.mkOption { options.tls = lib.mkOption {
type = lib.types.bool; type = lib.types.bool;
description = "set to true to enable TLS for this listener. Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path."; description = "set to true to enable TLS for this listener. Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.";
default = true; default = false;
}; };
options.x_forwarded = lib.mkOption { options.x_forwarded = lib.mkOption {
type = lib.types.bool; type = lib.types.bool;
@ -139,7 +183,7 @@ in
Only valid for an 'http' listener. Set to true to use the X-Forwarded-For header as the client IP. Only valid for an 'http' listener. Set to true to use the X-Forwarded-For header as the client IP.
Useful when Synapse is behind a reverse-proxy. Useful when Synapse is behind a reverse-proxy.
''; '';
default = false; default = true;
}; };
options.resources = lib.mkOption { options.resources = lib.mkOption {
type = lib.types.listOf (lib.types.submodule { type = lib.types.listOf (lib.types.submodule {
@ -158,7 +202,7 @@ in
description = "Listener configuration for the worker, similar to the main synapse listener"; description = "Listener configuration for the worker, similar to the main synapse listener";
default = [ ]; default = [ ];
}; };
}; });
}; };
})); }));
default = { }; default = { };
@ -259,17 +303,29 @@ in
}; };
}); });
description = "List of ports that Synapse should listen on, their purpose and their configuration"; description = "List of ports that Synapse should listen on, their purpose and their configuration";
default = [ default = let
enableReplication = lib.lists.any
(w: !(builtins.elem w.settings.worker_app [ "federationSender" ]))
cfg.workers.instances;
in [
{ {
port = 8448; port = 8008;
bind_addresses = [ "0.0.0.0" "::" ]; bind_addresses = [ "127.0.0.1" ];
resources = [ resources = [
{ names = [ "client" ]; compress = true; } { names = [ "client" ]; compress = true; }
{ names = [ "federation" ]; compress = false; } { names = [ "federation" ]; compress = false; }
]; ];
} }
(lib.mkIf enableReplication {
port = 9093;
bind_addresses = [ "127.0.0.1" ];
resources = [
{ names = [ "replication" ]; }
];
})
]; ];
}; };
options.federation_ip_range_blacklist = lib.mkOption { options.federation_ip_range_blacklist = lib.mkOption {
type = lib.types.listOf lib.types.str; type = lib.types.listOf lib.types.str;
description = '' description = ''
@ -449,13 +505,32 @@ in
Restart = "on-failure"; Restart = "on-failure";
}; };
}; };
})
(lib.mkMerge [
({
services.matrix-synapse-next.settings.federation_sender_instances = lib.genList (x: "auto-fed-sender${toString x}") cfg.workers.federationSenders; services.matrix-synapse-next.settings.federation_sender_instances = lib.genList (x: "auto-fed-sender${toString x}") cfg.workers.federationSenders;
services.matrix-synapse-next.workers.instances = lib.attrsets.genAttrs (lib.genList services.matrix-synapse-next.workers.instances = lib.attrsets.genAttrs (lib.genList
(x: "auto-fed-sender${toString x}") cfg.workers.federationSenders) (x: "auto-fed-sender${toString x}") cfg.workers.federationSenders)
(_: { settings.worker_app = "synapse.app.federation_sender";}); (_: { autoType = "fed-sender"; });
}) })
({
services.matrix-synapse-next.workers.instances = lib.mapAttrs'
(name: value: lib.attrsets.nameValuePair "auto-fed-receiver${name}" value)
(lib.genAttrs (lib.genList (x: builtins.toString x) cfg.workers.federationReceivers)
(x: {
autoType = "fed-receiver";
settings.worker_listeners = [
{ port = cfg.workers.workerStartingPort + (lib.strings.toInt x);
resources = [ {names = ["federation"];} ];
}
];
}));
})
])
({ ({
systemd.services = let systemd.services = let
workerList = lib.mapAttrsToList (name: value: lib.nameValuePair name value ) cfg.workers.instances; workerList = lib.mapAttrsToList (name: value: lib.nameValuePair name value ) cfg.workers.instances;
@ -470,6 +545,7 @@ in
partOf = [ "matrix-synapse.target" ]; partOf = [ "matrix-synapse.target" ];
wantedBy = [ "matrix-synapse.target" ]; wantedBy = [ "matrix-synapse.target" ];
after = [ "matrix-synapse.service" ]; after = [ "matrix-synapse.service" ];
requires = [ "matrix-synapse.service" ];
environment.PYTHONPATH = lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [ environment.PYTHONPATH = lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [
pluginsEnv pluginsEnv
]; ];
@ -478,6 +554,12 @@ in
User = "matrix-synapse"; User = "matrix-synapse";
Group = "matrix-synapse"; Group = "matrix-synapse";
WorkingDirectory = cfg.dataDir; WorkingDirectory = cfg.dataDir;
ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" ''
# From https://md.darmstadt.ccc.de/synapse-at-work
while ! systemctl is-active -q matrix-synapse.service; do
sleep 1
done
'';
ExecStart = '' ExecStart = ''
${cfg.package}/bin/synapse_worker \ ${cfg.package}/bin/synapse_worker \
${ lib.concatMapStringsSep "\n " (x: "--config-path ${x} \\") ([ matrix-synapse-common-config (workerConfig worker) ] ++ cfg.extraConfigFiles) } ${ lib.concatMapStringsSep "\n " (x: "--config-path ${x} \\") ([ matrix-synapse-common-config (workerConfig worker) ] ++ cfg.extraConfigFiles) }