Cleaned up matrix synapse module

This commit is contained in:
Oystein Kristoffer Tveit 2023-01-18 01:23:58 +01:00
parent b6f0a026a7
commit 2fd07f83b5
Signed by untrusted user: oysteikt
GPG Key ID: 9F2F7D8250F35146
3 changed files with 890 additions and 797 deletions

View File

@ -1,35 +1,50 @@
{ lib, pkgs, config, ... }: { pkgs, lib, config, ... }:
let let
cfg = config.services.matrix-synapse-next; cfg = config.services.matrix-synapse-next;
wcfg = cfg.workers; wcfg = cfg.workers;
# Used to generate proper defaultTexts.
cfgText = "config.services.matrix-synapse-next";
wcfgText = "config.services.matrix-synapse-next.workers";
format = pkgs.formats.yaml {}; format = pkgs.formats.yaml {};
matrix-synapse-common-config = format.generate "matrix-synapse-common-config.yaml" cfg.settings; matrix-synapse-common-config = format.generate "matrix-synapse-common-config.yaml" cfg.settings;
pluginsEnv = cfg.package.python.buildEnv.override { pluginsEnv = cfg.package.python.buildEnv.override {
extraLibs = cfg.plugins; extraLibs = cfg.plugins;
}; };
genAttrs' = items: f: g: builtins.listToAttrs (builtins.map (i: lib.attrsets.nameValuePair (f i) (g i)) items); inherit (lib)
literalExpression
mkEnableOption
mkIf
mkMerge
mkOption
mkPackageOption
types;
isListenerType = type: listener: lib.lists.any (r: lib.lists.any (n: n == type) r.names) listener.resources; throw' = str: throw ''
matrix-synapse-next error:
${str}
'';
in in
{ {
imports = [ ./nginx.nix ]; imports = [
./nginx.nix
(import ./workers.nix {
inherit throw' format matrix-synapse-common-config pluginsEnv;
})
];
options.services.matrix-synapse-next = { options.services.matrix-synapse-next = {
enable = lib.mkEnableOption "matrix-synapse"; enable = mkEnableOption "matrix-synapse";
package = lib.mkOption { package = mkPackageOption pkgs "matrix-synapse" {};
type = lib.types.package;
default = pkgs.matrix-synapse;
};
plugins = lib.mkOption { plugins = mkOption {
type = lib.types.listOf lib.types.package; type = types.listOf types.package;
default = [ ]; default = [ ];
example = lib.literalExample '' example = literalExpression ''
with config.services.matrix-synapse-advanced.package.plugins; [ with ${cfgText}.package.plugins; [
matrix-synapse-ldap3 matrix-synapse-ldap3
matrix-synapse-pam matrix-synapse-pam
]; ];
@ -39,8 +54,8 @@ in
''; '';
}; };
dataDir = lib.mkOption { dataDir = mkOption {
type = lib.types.str; type = types.path;
default = "/var/lib/matrix-synapse"; default = "/var/lib/matrix-synapse";
description = '' description = ''
The directory where matrix-synapse stores its stateful data such as The directory where matrix-synapse stores its stateful data such as
@ -48,239 +63,31 @@ in
''; '';
}; };
enableNginx = lib.mkEnableOption "Enable the synapse module managing nginx"; enableNginx = mkEnableOption "The synapse module managing nginx";
public_baseurl = lib.mkOption { public_baseurl = mkOption {
type = lib.types.str; type = types.str;
default = "matrix.${cfg.settings.server_name}"; default = "matrix.${cfg.settings.server_name}";
description = "The domain where clients and such will connect (May be different from server_name if using delegation)"; defaultText =
}; literalExpression ''matrix.''${${cfgText}.settings.server_name}'';
mainLogConfig = lib.mkOption {
type = lib.types.lines;
description = "A yaml python logging config file";
default = lib.readFile ./matrix-synapse-log_config.yaml;
};
workers = let
isReplication = l: lib.lists.any (r: lib.lists.any (n: n == "replication") r.names) l.resources;
dMRL = lib.lists.findFirst isReplication
(throw "No replication listener configured!")
cfg.settings.listeners;
dMRH = lib.findFirst (x: true) (throw "Replication listener had no addresses")
dMRL.bind_addresses;
dMRP = dMRL.port;
in {
mainReplicationHost = lib.mkOption {
type = lib.types.str;
default = if builtins.elem dMRH [ "0.0.0.0" "::" ] then "127.0.0.1" else dMRH;
description = "Host of the main synapse instance's replication listener";
};
mainReplicationPort = lib.mkOption {
type = lib.types.port;
default = dMRP;
description = "Port for the main synapse instance's replication listener";
};
defaultListenerAddress = lib.mkOption {
type = lib.types.str;
default = "127.0.0.1";
description = "The default listener address for the worker";
};
workerStartingPort = lib.mkOption {
type = lib.types.port;
description = "What port should the automatically configured workers start enumerating from";
default = 8083;
};
enableMetrics = lib.mkOption {
type = lib.types.bool;
default = cfg.settings.enable_metrics;
};
metricsStartingPort = lib.mkOption {
type = lib.types.port;
default = 18083;
};
federationSenders = lib.mkOption {
type = lib.types.ints.unsigned;
description = "How many automatically configured federation senders to set up";
default = 0;
};
federationReceivers = lib.mkOption {
type = lib.types.ints.unsigned;
description = "How many automatically configured federation recievers to set up";
default = 0;
};
initialSyncers = lib.mkOption {
type = lib.types.ints.unsigned;
description = "How many automatically configured intial syncers to set up";
default = 0;
};
normalSyncers = lib.mkOption {
type = lib.types.ints.unsigned;
description = "How many automatically configured sync workers to set up";
default = 0;
};
eventPersisters = lib.mkOption {
type = lib.types.ints.unsigned;
description = "How many automatically configured event-persisters to set up";
default = 0;
};
useUserDirectoryWorker = lib.mkEnableOption "user directory worker";
instances = lib.mkOption {
type = lib.types.attrsOf (lib.types.submodule ({config, ...}: {
options.isAuto = lib.mkOption {
type = lib.types.bool;
internal = true;
default = false;
};
options.index = lib.mkOption {
internal = true;
type = lib.types.ints.positive;
};
# The custom string type here is mainly for the name to use for the metrics of custom worker types
options.type = lib.mkOption {
type = lib.types.either (lib.types.str) (lib.types.enum [ "fed-sender" "fed-receiver" ]);
};
options.settings = let
instanceCfg = config;
inherit (instanceCfg) type isAuto;
in lib.mkOption {
type = lib.types.submodule ({config, ...}: {
freeformType = format.type;
options.worker_app = let
mapTypeApp = t: {
"fed-sender" = "synapse.app.generic_worker";
"fed-receiver" = "synapse.app.generic_worker";
"initial-sync" = "synapse.app.generic_worker";
"normal-sync" = "synapse.app.generic_worker";
"event-persist" = "synapse.app.generic_worker";
"user-dir" = "synapse.app.generic_worker";
}.${t};
defaultApp = if (!isAuto)
then "synapse.app.generic_worker"
else mapTypeApp type;
in lib.mkOption {
type = lib.types.enum [
"synapse.app.generic_worker"
"synapse.app.appservice"
"synapse.app.media_repository"
"synapse.app.user_dir"
];
description = "The type of worker application";
default = defaultApp;
};
options.worker_replication_host = lib.mkOption {
type = lib.types.str;
default = cfg.workers.mainReplicationHost;
description = "The replication listeners ip on the main synapse process";
};
options.worker_replication_http_port = lib.mkOption {
type = lib.types.port;
default = cfg.workers.mainReplicationPort;
description = "The replication listeners port on the main synapse process";
};
options.worker_listeners = lib.mkOption {
type = lib.types.listOf (lib.types.submodule {
options.type = lib.mkOption {
type = lib.types.enum [ "http" "metrics" ];
description = "The type of the listener";
default = "http";
};
options.port = lib.mkOption {
type = lib.types.port;
description = "the TCP port to bind to";
};
options.bind_addresses = lib.mkOption {
type = lib.types.listOf lib.types.str;
description = "A list of local addresses to listen on";
default = [ cfg.workers.defaultListenerAddress ];
};
options.tls = lib.mkOption {
type = lib.types.bool;
description = "set to true to enable TLS for this listener. Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.";
default = false;
};
options.x_forwarded = lib.mkOption {
type = lib.types.bool;
description = '' description = ''
Only valid for an 'http' listener. Set to true to use the X-Forwarded-For header as the client IP. The domain where clients and such will connect.
Useful when Synapse is behind a reverse-proxy. This may be different from server_name if using delegation.
''; '';
default = true;
};
options.resources = let
typeToResources = t: {
"fed-receiver" = [ "federation" ];
"fed-sender" = [ ];
"initial-sync" = [ "client" ];
"normal-sync" = [ "client" ];
"event-persist" = [ "replication" ];
"user-dir" = [ "client" ];
}.${t};
in lib.mkOption {
type = lib.types.listOf (lib.types.submodule {
options.names = lib.mkOption {
type = lib.types.listOf (lib.types.enum [ "client" "consent" "federation" "keys" "media" "metrics" "openid" "replication" "static" "webclient" ]);
description = "A list of resources to host on this port";
default = lib.optionals isAuto (typeToResources type);
};
options.compress = lib.mkOption {
type = lib.types.bool;
description = "enable HTTP compression for this resource";
default = false;
};
});
default = [{ }];
};
});
description = "Listener configuration for the worker, similar to the main synapse listener";
default = [ ];
};
});
default = { };
};
}));
default = { };
description = "Worker configuration";
example = {
"federation_sender1" = {
settings = {
worker_name = "federation_sender1";
worker_app = "synapse.app.generic_worker";
worker_replication_host = "127.0.0.1";
worker_replication_http_port = 9093;
worker_listeners = [ ];
};
};
};
};
}; };
settings = lib.mkOption { mainLogConfig = mkOption {
type = lib.types.submodule { type = with types; coercedTo path lib.readFile lines;
default = ./matrix-synapse-log_config.yaml;
description = "A yaml python logging config file";
};
settings = mkOption {
type = types.submodule {
freeformType = format.type; freeformType = format.type;
options = {
options.server_name = lib.mkOption { server_name = mkOption {
type = lib.types.str; type = types.str;
description = '' description = ''
The server_name name will appear at the end of usernames and room addresses The server_name name will appear at the end of usernames and room addresses
created on this server. For example if the server_name was example.com, created on this server. For example if the server_name was example.com,
@ -300,55 +107,80 @@ in
example = "matrix.org"; example = "matrix.org";
}; };
options.use_presence = lib.mkOption { use_presence = mkOption {
type = lib.types.bool; type = types.bool;
description = "disable presence tracking, if you're having perfomance issues this can have a big impact"; description = "Disable presence tracking, if you're having perfomance issues this can have a big impact";
default = true; default = true;
}; };
options.listeners = lib.mkOption {
type = lib.types.listOf (lib.types.submodule { listeners = mkOption {
options.port = lib.mkOption { type = types.listOf (types.submodule {
type = lib.types.port; options = {
description = "the TCP port to bind to"; port = mkOption {
type = types.port;
description = "The TCP port to bind to";
example = 8448; example = 8448;
}; };
options.bind_addresses = lib.mkOption {
type = lib.types.listOf lib.types.str; bind_addresses = mkOption {
type = types.listOf types.str;
description = "A list of local addresses to listen on"; description = "A list of local addresses to listen on";
}; };
options.type = lib.mkOption {
type = lib.types.enum [ "http" "manhole" "metrics" "replication" ]; type = mkOption {
type = types.enum [ "http" "manhole" "metrics" "replication" ];
description = "The type of the listener"; description = "The type of the listener";
default = "http"; default = "http";
}; };
options.tls = lib.mkOption {
type = lib.types.bool; tls = mkOption {
description = "set to true to enable TLS for this listener. Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path."; type = types.bool;
description = ''
Set to true to enable TLS for this listener.
Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.
'';
default = false; default = false;
}; };
options.x_forwarded = lib.mkOption {
type = lib.types.bool; x_forwarded = mkOption {
type = types.bool;
description = '' description = ''
Only valid for an 'http' listener. Set to true to use the X-Forwarded-For header as the client IP. Set to true to use the X-Forwarded-For header as the client IP.
Only valid for an 'http' listener.
Useful when Synapse is behind a reverse-proxy. Useful when Synapse is behind a reverse-proxy.
''; '';
default = true; default = true;
}; };
options.resources = lib.mkOption {
type = lib.types.listOf (lib.types.submodule { resources = mkOption {
options.names = lib.mkOption { type = types.listOf (types.submodule {
type = lib.types.listOf (lib.types.enum [ "client" "consent" "federation" "keys" "media" "metrics" "openid" "replication" "static" "webclient" ]); options = {
names = mkOption {
type = with types; listOf (enum [
"client"
"consent"
"federation"
"keys"
"media"
"metrics"
"openid"
"replication"
"static"
"webclient"
]);
description = "A list of resources to host on this port"; description = "A list of resources to host on this port";
}; };
options.compress = lib.mkOption {
type = lib.types.bool; compress = mkEnableOption "HTTP compression for this resource";
description = "enable HTTP compression for this resource";
default = false;
}; };
}); });
}; };
};
}); });
description = "List of ports that Synapse should listen on, their purpose and their configuration"; description = "List of ports that Synapse should listen on, their purpose and their configuration";
# TODO: add defaultText
default = [ default = [
{ {
port = 8008; port = 8008;
@ -358,14 +190,14 @@ in
{ names = [ "federation" ]; compress = false; } { names = [ "federation" ]; compress = false; }
]; ];
} }
(lib.mkIf (wcfg.instances != { }) { (mkIf (wcfg.instances != { }) {
port = 9093; port = 9093;
bind_addresses = [ "127.0.0.1" ]; bind_addresses = [ "127.0.0.1" ];
resources = [ resources = [
{ names = [ "replication" ]; } { names = [ "replication" ]; }
]; ];
}) })
(lib.mkIf cfg.settings.enable_metrics { (mkIf cfg.settings.enable_metrics {
port = 9000; port = 9000;
bind_addresses = [ "127.0.0.1" ]; bind_addresses = [ "127.0.0.1" ];
resources = [ resources = [
@ -375,8 +207,8 @@ in
]; ];
}; };
options.federation_ip_range_blacklist = lib.mkOption { federation_ip_range_blacklist = mkOption {
type = lib.types.listOf lib.types.str; type = types.listOf types.str;
description = '' description = ''
Prevent federation requests from being sent to the following Prevent federation requests from being sent to the following
blacklist IP address CIDR ranges. If this option is not specified, or blacklist IP address CIDR ranges. If this option is not specified, or
@ -394,62 +226,55 @@ in
"fc00::/7" "fc00::/7"
]; ];
}; };
options.log_config = lib.mkOption {
type = lib.types.path; log_config = mkOption {
type = types.path;
description = '' description = ''
A yaml python logging config file as described by A yaml python logging config file as described by
https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
''; '';
default = pkgs.writeText "log_config.yaml" cfg.mainLogConfig; default = pkgs.writeText "log_config.yaml" cfg.mainLogConfig;
defaultText = "A config file generated from ${cfgText}.mainLogConfig";
}; };
options.media_store_path = lib.mkOption { media_store_path = mkOption {
type = lib.types.path; type = types.path;
description = "Directory where uploaded images and attachments are stored"; description = "Directory where uploaded images and attachments are stored";
default = "${cfg.dataDir}/media_store"; default = "${cfg.dataDir}/media_store";
defaultText = literalExpression ''''${${cfgText}.dataDir}/media_store'';
}; };
options.max_upload_size = lib.mkOption {
type = lib.types.str; max_upload_size = mkOption {
type = types.str;
description = "The largest allowed upload size in bytes"; description = "The largest allowed upload size in bytes";
default = "50M"; default = "50M";
example = "800K";
}; };
options.enable_registration = lib.mkOption { enable_registration = mkEnableOption "registration for new users";
type = lib.types.bool; enable_metrics = mkEnableOption "collection and rendering of performance metrics";
description = "Enable registration for new users"; report_stats = mkEnableOption "reporting usage stats";
default = false;
};
options.enable_metrics = lib.mkOption { app_service_config_files = mkOption {
type = lib.types.bool; type = types.listOf types.path;
description = "Enable collection and rendering of performance metrics";
default = false;
};
options.report_stats = lib.mkOption {
type = lib.types.bool;
description = "TODO: Enable and Disable reporting usage stats";
default = false;
};
options.app_service_config_files = lib.mkOption {
type = lib.types.listOf lib.types.path;
description = "A list of application service config files to use"; description = "A list of application service config files to use";
default = []; default = [];
}; };
options.signing_key_path = lib.mkOption { signing_key_path = mkOption {
type = lib.types.path; type = types.path;
description = "Path to the signing key to sign messages with"; description = "Path to the signing key to sign messages with";
default = "${cfg.dataDir}/homeserver.signing.key"; default = "${cfg.dataDir}/homeserver.signing.key";
defaultText = literalExpression ''''${${cfgText}.dataDir}/homeserver.signing.key'';
}; };
options.trusted_key_servers = lib.mkOption { trusted_key_servers = mkOption {
type = lib.types.listOf (lib.types.submodule { type = types.listOf (types.submodule {
freeformType = format.type; freeformType = format.type;
options.server_name = lib.mkOption { options.server_name = mkOption {
type = lib.types.str; type = types.str;
description = "the name of the server. required"; description = "The name of the server. This is required.";
}; };
}); });
description = "The trusted servers to download signing keys from"; description = "The trusted servers to download signing keys from";
@ -461,36 +286,42 @@ in
]; ];
}; };
options.federation_sender_instances = lib.mkOption { federation_sender_instances = mkOption {
type = lib.types.listOf lib.types.str; type = types.listOf types.str;
description = '' description = ''
This configuration must be shared between all federation sender workers, and if This configuration must be shared between all federation sender workers.
changed all federation sender workers must be stopped at the same time and then
started, to ensure that all instances are running with the same config (otherwise When changed, all federation sender workers must be stopped at the same time and
events may be dropped) restarted, to ensure that all instances are running with the same config.
Otherwise, events may be dropped.
''; '';
default = [ ]; default = [ ];
}; };
options.redis = lib.mkOption { redis = mkOption {
type = lib.types.submodule { type = types.submodule {
freeformType = format.type; freeformType = format.type;
options.enabled = lib.mkOption { options.enabled = mkOption {
type = lib.types.bool; type = types.bool;
description = "Enables using redis, required for worker support"; description = ''
default = (lib.lists.count (x: true) Whether to enable redis within synapse.
(lib.attrsets.attrValues cfg.workers.instances)) > 0;
This is required for worker support.
'';
default = wcfg.instances != { };
defaultText = literalExpression "${wcfgText}.instances != { }";
}; };
}; };
default = { }; default = { };
description = "configuration of redis for synapse and workers"; description = "Redis configuration for synapse and workers";
};
}; };
}; };
}; };
extraConfigFiles = lib.mkOption { extraConfigFiles = mkOption {
type = lib.types.listOf lib.types.path; type = types.listOf types.path;
default = []; default = [];
description = '' description = ''
Extra config files to include. Extra config files to include.
@ -500,11 +331,9 @@ in
NixOPS is in use. NixOPS is in use.
''; '';
}; };
}; };
config = lib.mkIf cfg.enable (lib.mkMerge [ config = mkIf cfg.enable {
({
users.users.matrix-synapse = { users.users.matrix-synapse = {
group = "matrix-synapse"; group = "matrix-synapse";
home = cfg.dataDir; home = cfg.dataDir;
@ -512,174 +341,56 @@ in
shell = "${pkgs.bash}/bin/bash"; shell = "${pkgs.bash}/bin/bash";
uid = config.ids.uids.matrix-synapse; uid = config.ids.uids.matrix-synapse;
}; };
users.groups.matrix-synapse = { users.groups.matrix-synapse = {
gid = config.ids.gids.matrix-synapse; gid = config.ids.gids.matrix-synapse;
}; };
systemd.targets.matrix-synapse = {
description = "Synapse parent target"; systemd = {
targets.matrix-synapse = {
description = "Matrix synapse parent target";
after = [ "network.target" ]; after = [ "network.target" ];
wantedBy = [ "multi-user.target" ]; wantedBy = [ "multi-user.target" ];
}; };
})
({ slices.system-matrix-synapse = {
systemd.services.matrix-synapse = { description = "Matrix synapse slice";
requires= [ "system.slice" ];
after= [ "system.slice" ];
};
services.matrix-synapse = {
description = "Synapse Matrix homeserver"; description = "Synapse Matrix homeserver";
partOf = [ "matrix-synapse.target" ]; partOf = [ "matrix-synapse.target" ];
wantedBy = [ "matrix-synapse.target" ]; wantedBy = [ "matrix-synapse.target" ];
preStart = ''
${cfg.package}/bin/synapse_homeserver \ preStart = let
${ lib.concatMapStringsSep "\n " (x: "--config-path ${x} \\") ([ matrix-synapse-common-config ] ++ cfg.extraConfigFiles) } flags = lib.cli.toGNUCommandLineShell {} {
--keys-directory ${cfg.dataDir} \ config-path = [ matrix-synapse-common-config ] ++ cfg.extraConfigFiles;
--generate-keys keys-directory = cfg.dataDir;
''; generate-keys = true;
environment.PYTHONPATH = lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [ pluginsEnv ]; };
in "${cfg.package}/bin/synapse_homeserver ${flags}";
environment.PYTHONPATH =
lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [ pluginsEnv ];
serviceConfig = { serviceConfig = {
Type = "notify"; Type = "notify";
User = "matrix-synapse"; User = "matrix-synapse";
Group = "matrix-synapse"; Group = "matrix-synapse";
Slice = "system-matrix-synapse.slice";
WorkingDirectory = cfg.dataDir; WorkingDirectory = cfg.dataDir;
ExecStart = '' ExecStart = let
${cfg.package}/bin/synapse_homeserver \ flags = lib.cli.toGNUCommandLineShell {} {
${ lib.concatMapStringsSep "\n " (x: "--config-path ${x} \\") ([ matrix-synapse-common-config ] ++ cfg.extraConfigFiles) } config-path = [ matrix-synapse-common-config ] ++ cfg.extraConfigFiles;
--keys-directory ${cfg.dataDir} keys-directory = cfg.dataDir;
''; };
in "${cfg.package}/bin/synapse_homeserver ${flags}";
ExecReload = "${pkgs.utillinux}/bin/kill -HUP $MAINPID"; ExecReload = "${pkgs.utillinux}/bin/kill -HUP $MAINPID";
Restart = "on-failure"; Restart = "on-failure";
}; };
}; };
})
(lib.mkMerge [
({
services.matrix-synapse-next.settings.federation_sender_instances = lib.genList (i: "auto-fed-sender${toString (i + 1)}") cfg.workers.federationSenders;
services.matrix-synapse-next.workers.instances = genAttrs' (lib.lists.range 1 cfg.workers.federationSenders)
(i: "auto-fed-sender${toString i}")
(i: {
isAuto = true; type = "fed-sender"; index = i;
settings.worker_listeners = lib.mkIf wcfg.enableMetrics [
{ port = cfg.workers.metricsStartingPort + i - 1;
resources = [ { names = [ "metrics" ]; } ];
}
];
});
})
({
services.matrix-synapse-next.workers.instances = genAttrs' (lib.lists.range 1 cfg.workers.federationReceivers)
(i: "auto-fed-receiver${toString i}")
(i: {
isAuto = true; type = "fed-receiver"; index = i;
settings.worker_listeners = [{ port = cfg.workers.workerStartingPort + i - 1; }]
++ lib.optional wcfg.enableMetrics { port = cfg.workers.metricsStartingPort + cfg.workers.federationSenders + i;
resources = [ { names = [ "metrics" ]; } ];
};
});
})
({
services.matrix-synapse-next.workers.instances = genAttrs' (lib.lists.range 1 cfg.workers.initialSyncers)
(i: "auto-initial-sync${toString i}")
(i: {
isAuto = true; type = "initial-sync"; index = i;
settings.worker_listeners = [{ port = cfg.workers.workerStartingPort + cfg.workers.federationReceivers + i - 1; }]
++ lib.optional wcfg.enableMetrics { port = cfg.workers.metricsStartingPort + cfg.workers.federationSenders + cfg.workers.federationReceivers + i;
resources = [ { names = [ "metrics" ]; } ];
};
});
})
({
services.matrix-synapse-next.workers.instances = genAttrs' (lib.lists.range 1 cfg.workers.normalSyncers)
(i: "auto-normal-sync${toString i}")
(i: {
isAuto = true; type = "normal-sync"; index = i;
settings.worker_listeners = [{ port = cfg.workers.workerStartingPort + cfg.workers.federationReceivers + cfg.workers.initialSyncers + i - 1; }]
++ lib.optional wcfg.enableMetrics { port = cfg.workers.metricsStartingPort + cfg.workers.federationSenders + cfg.workers.federationReceivers + cfg.workers.initialSyncers + i;
resources = [ { names = [ "metrics" ]; } ];
};
});
})
({
services.matrix-synapse-next.settings.instance_map = genAttrs' (lib.lists.range 1 cfg.workers.eventPersisters)
(i: "auto-event-persist${toString i}")
(i: let
isReplication = l: lib.lists.any (r: lib.lists.any (n: n == "replication") r.names) l.resources;
wRL = lib.lists.findFirst isReplication
(throw "No replication listener configured!")
cfg.workers.instances."auto-event-persist${toString i}".settings.worker_listeners;
wRH = lib.findFirst (x: true) (throw "Replication listener had no addresses")
wRL.bind_addresses;
wRP = wRL.port;
in {
host = wRH;
port = wRP;
}
);
services.matrix-synapse-next.settings.stream_writers.events = lib.mkIf (cfg.workers.eventPersisters > 0) (lib.genList (i: "auto-event-persist${toString (i + 1)}") cfg.workers.eventPersisters);
services.matrix-synapse-next.workers.instances = genAttrs' (lib.lists.range 1 cfg.workers.eventPersisters)
(i: "auto-event-persist${toString i}")
(i: {
isAuto = true; type = "event-persist"; index = i;
settings.worker_listeners = [{ port = cfg.workers.workerStartingPort + cfg.workers.federationReceivers + cfg.workers.initialSyncers + cfg.workers.normalSyncers + i - 1;}]
++ lib.optional wcfg.enableMetrics { port = cfg.workers.metricsStartingPort + cfg.workers.federationSenders + cfg.workers.federationReceivers + cfg.workers.initialSyncers + cfg.workers.normalSyncers + i;
resources = [ { names = [ "metrics" ]; } ];
};
});
})
(lib.mkIf cfg.workers.useUserDirectoryWorker {
services.matrix-synapse-next.workers.instances."auto-user-dir" = {
isAuto = true; type = "user-dir"; index = 1;
settings.worker_listeners = [{ port = cfg.workers.workerStartingPort + cfg.workers.federationReceivers + cfg.workers.initialSyncers + cfg.workers.normalSyncers + cfg.workers.eventPersisters + 1 - 1;}]
++ lib.optional wcfg.enableMetrics { port = cfg.workers.metricsStartingPort + cfg.workers.federationSenders + cfg.workers.federationReceivers + cfg.workers.initialSyncers + cfg.workers.normalSyncers + cfg.workers.eventPersisters + 1;
resources = [ { names = [ "metrics"]; } ];
};
};
services.matrix-synapse-next.settings.update_user_directory_from_worker = "auto-user-dir";
})
])
({
systemd.services = let
workerList = lib.mapAttrsToList (name: value: lib.nameValuePair name value ) cfg.workers.instances;
workerName = worker: worker.name;
workerSettings = worker: (worker.value.settings // {worker_name = (workerName worker);});
workerConfig = worker: format.generate "matrix-synapse-worker-${workerName worker}-config.yaml" (workerSettings worker);
in builtins.listToAttrs (map (worker:
{
name = "matrix-synapse-worker-${workerName worker}";
value = {
description = "Synapse Matrix Worker";
partOf = [ "matrix-synapse.target" ];
wantedBy = [ "matrix-synapse.target" ];
after = [ "matrix-synapse.service" ];
requires = [ "matrix-synapse.service" ];
environment.PYTHONPATH = lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [
pluginsEnv
];
serviceConfig = {
Type = "notify";
User = "matrix-synapse";
Group = "matrix-synapse";
WorkingDirectory = cfg.dataDir;
ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" ''
# From https://md.darmstadt.ccc.de/synapse-at-work
while ! systemctl is-active -q matrix-synapse.service; do
sleep 1
done
'';
ExecStart = ''
${cfg.package}/bin/synapse_worker \
${ lib.concatMapStringsSep "\n " (x: "--config-path ${x} \\") ([ matrix-synapse-common-config (workerConfig worker) ] ++ cfg.extraConfigFiles) }
--keys-directory ${cfg.dataDir}
'';
}; };
}; };
} }
) workerList);
})
]);
}

View File

@ -1,4 +1,4 @@
{ lib, pkgs, config, ...}: { pkgs, lib, config, ... }:
let let
cfg = config.services.matrix-synapse-next; cfg = config.services.matrix-synapse-next;
@ -140,7 +140,7 @@ in
services.nginx.upstreams.synapse_master.servers = let services.nginx.upstreams.synapse_master.servers = let
isMainListener = l: isListenerType "client" l && isListenerType "federation" l; isMainListener = l: isListenerType "client" l && isListenerType "federation" l;
firstMainListener = lib.findFirst isMainListener firstMainListener = lib.findFirst isMainListener
(throw "No cartch-all listener configured") cfg.settings.listeners; (throw "No catch-all listener configured") cfg.settings.listeners;
address = lib.findFirst (_: true) (throw "No address in main listener") firstMainListener.bind_addresses; address = lib.findFirst (_: true) (throw "No address in main listener") firstMainListener.bind_addresses;
port = firstMainListener.port; port = firstMainListener.port;
socketAddress = "${address}:${builtins.toString port}"; socketAddress = "${address}:${builtins.toString port}";

382
synapse-module/workers.nix Normal file
View File

@ -0,0 +1,382 @@
{ matrix-synapse-common-config,
pluginsEnv,
throw',
format
}:
{ pkgs, lib, config, ... }: let
cfg = config.services.matrix-synapse-next;
wcfg = config.services.matrix-synapse-next.workers;
# Used to generate proper defaultTexts.
cfgText = "config.services.matrix-synapse-next";
wcfgText = "config.services.matrix-synapse-next.workers";
inherit (lib) types mkOption mkEnableOption mkIf mkMerge literalExpression;
mkWorkerCountOption = workerType: mkOption {
type = types.ints.unsigned;
description = "How many automatically configured ${workerType} workers to set up";
default = 0;
};
genAttrs' = items: f: g: builtins.listToAttrs (map (i: lib.nameValuePair (f i) (g i)) items);
isReplicationListener =
l: lib.any (r: lib.any (n: n == "replication") r.names) l.resources;
mainReplicationListener = lib.lists.findFirst isReplicationListener
(throw' "No replication listener configured!")
cfg.settings.listeners;
mainReplicationListenerHost =
if mainReplicationListener.bind_addresses == []
then throw' "Replication listener had no addresses"
else builtins.head mainReplicationListener.bind_addresses;
mainReplicationListenerPort = mainReplicationListener.port;
in {
# See https://github.com/matrix-org/synapse/blob/develop/docs/workers.md for more info
options.services.matrix-synapse-next.workers = let
workerInstanceType = types.submodule ({ config, ... }: {
options = {
isAuto = mkOption {
type = types.bool;
internal = true;
default = false;
};
index = mkOption {
internal = true;
type = types.ints.positive;
};
# The custom string type here is mainly for the name to use
# for the metrics of custom worker types
type = mkOption {
type = types.str;
# TODO: add description and possibly default value?
};
settings = mkOption {
type = workerSettingsType config;
default = { };
};
};
});
workerSettingsType = instanceCfg: types.submodule {
freeformType = format.type;
options = {
worker_app = mkOption {
type = types.enum [
"synapse.app.generic_worker"
"synapse.app.appservice"
"synapse.app.media_repository"
"synapse.app.user_dir"
];
description = "The type of worker application";
default = "synapse.app.generic_worker";
};
worker_replication_host = mkOption {
type = types.str;
default = wcfg.mainReplicationHost;
defaultText = literalExpression "${wcfgText}.mainReplicationHost";
description = "The replication listeners IP on the main synapse process";
};
worker_replication_http_port = mkOption {
type = types.port;
default = wcfg.mainReplicationPort;
defaultText = literalExpression "${wcfgText}.mainReplicationPort";
description = "The replication listeners port on the main synapse process";
};
worker_listeners = mkOption {
type = types.listOf (workerListenerType instanceCfg);
description = "Listener configuration for the worker, similar to the main synapse listener";
default = [ ];
};
};
};
workerListenerType = instanceCfg: types.submodule {
options = {
type = mkOption {
type = types.enum [ "http" "metrics" ];
description = "The type of the listener";
default = "http";
};
port = mkOption {
type = types.port;
description = "The TCP port to bind to";
};
bind_addresses = mkOption {
type = with types; listOf str;
description = "A list of local addresses to listen on";
default = [ wcfg.defaultListenerAddress ];
defaultText = literalExpression "[ ${wcfgText}.defaultListenerAddress ]";
};
tls = mkOption {
type = types.bool;
description = ''
Whether to enable TLS for this listener.
Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.
'';
default = false;
example = true;
};
x_forwarded = mkOption {
type = types.bool;
description = ''
Whether to use the X-Forwarded-For HTTP header as the client IP.
This option is only valid for an 'http' listener.
It is useful when Synapse is running behind a reverse-proxy.
'';
default = true;
example = false;
};
resources = let
typeToResources = t: {
"fed-receiver" = [ "federation" ];
"fed-sender" = [ ];
"initial-sync" = [ "client" ];
"normal-sync" = [ "client" ];
"event-persist" = [ "replication" ];
"user-dir" = [ "client" ];
}.${t};
in mkOption {
type = types.listOf (types.submodule {
options = {
names = mkOption {
type = with types; listOf (enum [
"client"
"consent"
"federation"
"keys"
"media"
"metrics"
"openid"
"replication"
"static"
"webclient"
]);
description = "A list of resources to host on this port";
default = lib.optionals instanceCfg.isAuto (typeToResources instanceCfg.type);
defaultText = ''
If the worker is generated from other config, the resource type will
be determined automatically.
'';
};
compress = mkEnableOption "HTTP compression for this resource";
};
});
default = [{ }];
};
};
};
in {
mainReplicationHost = mkOption {
type = types.str;
default =
if builtins.elem mainReplicationListenerHost [ "0.0.0.0" "::" ]
then "127.0.0.1"
else mainReplicationListenerHost;
# TODO: add defaultText
description = "Host of the main synapse instance's replication listener";
};
mainReplicationPort = mkOption {
type = types.port;
default = mainReplicationListenerPort;
# TODO: add defaultText
description = "Port for the main synapse instance's replication listener";
};
defaultListenerAddress = mkOption {
type = types.str;
default = "127.0.0.1";
description = "The default listener address for the worker";
};
workerStartingPort = mkOption {
type = types.port;
description = "What port should the automatically configured workers start enumerating from";
default = 8083;
};
enableMetrics = mkOption {
type = types.bool;
default = cfg.settings.enable_metrics;
defaultText = literalExpression "${cfgText}.settings.enable_metrics";
# TODO: add description
};
metricsStartingPort = mkOption {
type = types.port;
default = 18083;
# TODO: add description
};
federationSenders = mkWorkerCountOption "federation-sender";
federationReceivers = mkWorkerCountOption "federation-reciever";
initialSyncers = mkWorkerCountOption "initial-syncer";
normalSyncers = mkWorkerCountOption "sync";
eventPersisters = mkWorkerCountOption "event-persister";
useUserDirectoryWorker = mkEnableOption "user directory worker";
instances = mkOption {
type = types.attrsOf workerInstanceType;
default = { };
description = "Worker configuration";
example = {
"federation_sender1" = {
settings = {
worker_name = "federation_sender1";
worker_app = "synapse.app.generic_worker";
worker_replication_host = "127.0.0.1";
worker_replication_http_port = 9093;
worker_listeners = [ ];
};
};
};
};
};
config = {
services.matrix-synapse-next.settings = {
federation_sender_instances =
lib.genList (i: "auto-fed-sender${toString (i + 1)}") wcfg.federationSenders;
instance_map = genAttrs' (lib.lists.range 1 wcfg.eventPersisters)
(i: "auto-event-persist${toString i}")
(i: let
wRL = lib.lists.findFirst isReplicationListener
(throw' "No replication listener configured!")
wcfg.instances."auto-event-persist${toString i}".settings.worker_listeners;
wRH = lib.findFirst (x: true) (throw' "Replication listener had no addresses")
wRL.bind_addresses;
wRP = wRL.port;
in {
host = wRH;
port = wRP;
});
stream_writers.events =
mkIf (wcfg.eventPersisters > 0)
(lib.genList (i: "auto-event-persist${toString (i + 1)}") wcfg.eventPersisters);
update_user_directory_from_worker =
mkIf wcfg.useUserDirectoryWorker "auto-user-dir";
};
services.matrix-synapse-next.workers.instances = let
sum = lib.foldl lib.add 0;
workerListenersWithMetrics = portOffset:
lib.singleton ({
port = wcfg.workerStartingPort + portOffset - 1;
})
++ lib.optional wcfg.enableMetrics {
port = wcfg.metricsStartingPort + portOffset;
resources = [ { names = [ "metrics" ]; } ];
};
makeWorkerInstances = {
type,
numberOfWorkers,
portOffset ? 0,
nameFn ? i: "auto-${type}${toString i}",
workerListenerFn ? i: workerListenersWithMetrics (portOffset + i)
}: genAttrs'
(lib.lists.range 1 numberOfWorkers)
nameFn
(i: {
isAuto = true;
inherit type;
index = i;
settings.worker_listeners = workerListenerFn i;
});
workerInstances = {
"fed-sender" = wcfg.federationSenders;
"fed-receiver" = wcfg.federationReceivers;
"initial-sync" = wcfg.initialSyncers;
"normal-sync" = wcfg.normalSyncers;
"event-persist" = wcfg.eventPersisters;
} // (lib.optionalAttrs wcfg.useUserDirectoryWorker {
"user-dir" = {
numberOfWorkers = 1;
nameFn = _: "auto-user-dir";
};
});
coerceWorker = { name, value }: if builtins.isInt value then {
type = name;
numberOfWorkers = value;
} else { type = name; } // value;
# Like foldl, but keeps all intermediate values
#
# (b -> a -> b) -> b -> [a] -> [b]
scanl = f: x1: list: let
x2 = lib.head list;
x1' = f x1 x2;
in if list == [] then [] else [x1'] ++ (scanl f x1' (lib.tail list));
f = { portOffset, numberOfWorkers, ... }: x: x // { portOffset = portOffset + numberOfWorkers; };
init = { portOffset = 0; numberOfWorkers = 0; };
in lib.pipe workerInstances [
(lib.mapAttrsToList lib.nameValuePair)
(map coerceWorker)
(scanl f init)
(map makeWorkerInstances)
mkMerge
];
systemd.services = let
workerList = lib.mapAttrsToList lib.nameValuePair wcfg.instances;
workerConfig = worker: format.generate "matrix-synapse-worker-${worker.name}-config.yaml"
(worker.value.settings // { worker_name = worker.name; });
in builtins.listToAttrs (lib.flip map workerList (worker: {
name = "matrix-synapse-worker-${worker.name}";
value = {
description = "Synapse Matrix Worker";
partOf = [ "matrix-synapse.target" ];
wantedBy = [ "matrix-synapse.target" ];
after = [ "matrix-synapse.service" ];
requires = [ "matrix-synapse.service" ];
environment.PYTHONPATH = lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [
pluginsEnv
];
serviceConfig = {
Type = "notify";
User = "matrix-synapse";
Group = "matrix-synapse";
Slice = "system-matrix-synapse.slice";
WorkingDirectory = cfg.dataDir;
ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" ''
# From https://md.darmstadt.ccc.de/synapse-at-work
while ! systemctl is-active -q matrix-synapse.service; do
sleep 1
done
'';
ExecStart = let
flags = lib.cli.toGNUCommandLineShell {} {
config-path = [ matrix-synapse-common-config (workerConfig worker) ] ++ cfg.extraConfigFiles;
keys-directory = cfg.dataDir;
};
in "${cfg.package}/bin/synapse_worker ${flags}";
};
};
}));
};
}