nixos-matrix-modules/synapse-module/workers.nix

319 lines
11 KiB
Nix

{ matrix-synapse-common-config,
matrix-lib,
pluginsEnv,
throw',
format
}:
{ pkgs, lib, config, ... }: let
cfg = config.services.matrix-synapse-next;
wcfg = config.services.matrix-synapse-next.workers;
# Used to generate proper defaultTexts.
cfgText = "config.services.matrix-synapse-next";
wcfgText = "config.services.matrix-synapse-next.workers";
inherit (lib) types mkOption mkEnableOption mkIf mkMerge literalExpression;
mkWorkerCountOption = workerType: mkOption {
type = types.ints.unsigned;
description = "How many automatically configured ${workerType} workers to set up";
default = 0;
example = 1;
};
genAttrs' = items: f: g: builtins.listToAttrs (map (i: lib.nameValuePair (f i) (g i)) items);
mainReplicationListener = matrix-lib.firstListenerOfType "replication" cfg.settings.listeners;
in {
# See https://github.com/matrix-org/synapse/blob/develop/docs/workers.md for more info
options.services.matrix-synapse-next.workers = let
workerInstanceType = types.submodule ({ config, ... }: {
options = {
isAuto = mkOption {
type = types.bool;
internal = true;
default = false;
description = ''
This is an internal flag that signals that this worker is part of the
workers generated by either of the following:
- federationSenders
- federationReceivers
- initialSyncers
- normalSyncers
- eventPersisters
- useUserDirectoryWorker
'';
};
index = mkOption {
internal = true;
type = types.ints.positive;
description = ''
This is an internal variable that indexes the worker of this type.
'';
};
# The custom string type here is mainly for the name to use
# for the metrics of custom worker types
type = mkOption {
type = types.str;
# TODO: add description and possibly default value?
};
settings = mkOption {
type = workerSettingsType config;
default = { };
};
};
});
workerSettingsType = instanceCfg: types.submodule {
freeformType = format.type;
options = {
worker_app = mkOption {
type = types.enum [
"synapse.app.generic_worker"
"synapse.app.appservice"
"synapse.app.media_repository"
"synapse.app.user_dir"
];
description = "The type of worker application";
default = "synapse.app.generic_worker";
};
worker_listeners = mkOption {
type = types.listOf (workerListenerType instanceCfg);
description = "Listener configuration for the worker, similar to the main synapse listener";
default = [ ];
};
};
};
workerListenerType = instanceCfg: types.submodule {
options = {
type = mkOption {
type = types.enum [ "http" "metrics" ];
description = "The type of the listener";
default = "http";
};
path = mkOption {
type = types.path;
default = instanceCfg.name;
description = ''
A path and filename for a Unix socket.
'';
};
# port = mkOption {
# type = types.port;
# description = "The TCP port to bind to";
# };
# bind_addresses = mkOption {
# type = with types; listOf str;
# description = "A list of local addresses to listen on";
# default = [ wcfg.defaultListenerAddress ];
# defaultText = literalExpression "[ ${wcfgText}.defaultListenerAddress ]";
# };
tls = mkOption {
type = types.bool;
description = ''
Whether to enable TLS for this listener.
Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.
'';
default = false;
example = true;
};
x_forwarded = mkOption {
type = types.bool;
description = ''
Whether to use the X-Forwarded-For HTTP header as the client IP.
This option is only valid for an 'http' listener.
It is useful when Synapse is running behind a reverse-proxy.
'';
default = true;
example = false;
};
resources = let
typeToResources = t: {
"fed-receiver" = [ "federation" ];
"fed-sender" = [ ];
"initial-sync" = [ "client" ];
"normal-sync" = [ "client" ];
"event-persist" = [ "replication" ];
"user-dir" = [ "client" ];
}.${t};
in mkOption {
type = types.listOf (types.submodule {
options = {
names = mkOption {
type = with types; listOf (enum [
"client"
"consent"
"federation"
"keys"
"media"
"metrics"
"openid"
"replication"
"static"
"webclient"
]);
description = "A list of resources to host on this port";
default = lib.optionals instanceCfg.isAuto (typeToResources instanceCfg.type);
defaultText = ''
If the worker is generated from other config, the resource type will
be determined automatically.
'';
};
compress = mkEnableOption "HTTP compression for this resource";
};
});
default = [{ }];
};
};
};
in {
enableMetrics = mkOption {
type = types.bool;
default = cfg.settings.enable_metrics;
defaultText = literalExpression "${cfgText}.settings.enable_metrics";
# TODO: add description
};
federationSenders = mkWorkerCountOption "federation-sender";
federationReceivers = mkWorkerCountOption "federation-reciever";
initialSyncers = mkWorkerCountOption "initial-syncer";
normalSyncers = mkWorkerCountOption "sync";
eventPersisters = mkWorkerCountOption "event-persister";
useUserDirectoryWorker = mkEnableOption "user directory worker";
instances = mkOption {
type = types.attrsOf workerInstanceType;
default = { };
description = "Worker configuration";
example = {
"federation-sender-1" = {
settings = {
worker_name = "federation-sender-1";
worker_app = "synapse.app.generic_worker";
path = "/run/matrix-synapse/federation-sender-1.sock";
# worker_replication_host = "127.0.0.1";
# worker_replication_http_port = 9093;
worker_listeners = [ ];
};
};
};
};
};
config = let
genList1 = f: builtins.genList (i: f (i + 1));
in {
services.matrix-synapse-next.settings = {
federation_sender_instances =
genList1 (i: "auto-fed-sender-${toString i}") wcfg.federationSenders;
instance_map = lib.mkIf (cfg.workers.instances != { }) ({
main.path = "/run/matrix-synapse/main-replication-worker.sock";
} // builtins.mapAttrs (n: v: {
inherit (builtins.head v.settings.worker_listeners) path;
}) wcfg.instances);
stream_writers.events =
mkIf (wcfg.eventPersisters > 0)
(genList1 (i: "auto-event-persist-${toString i}") wcfg.eventPersisters);
update_user_directory_from_worker =
mkIf wcfg.useUserDirectoryWorker "auto-user-dir-1";
};
services.matrix-synapse-next.workers.instances =
let
workerInstances = {
"fed-sender" = wcfg.federationSenders;
"fed-receiver" = wcfg.federationReceivers;
"initial-sync" = wcfg.initialSyncers;
"normal-sync" = wcfg.normalSyncers;
"event-persist" = wcfg.eventPersisters;
"user-dir" = if wcfg.useUserDirectoryWorker then 1 else 0;
};
in
lib.pipe workerInstances [
(lib.mapAttrsToList (type: count: { inherit type count; }))
(map ({ type, count }: genList1 (i: rec {
name = "auto-${type}-${toString i}";
value = {
inherit type;
isAuto = true;
index = i;
settings.worker_listeners =
[
{ path = "/run/matrix-synapse/${name}.sock"; }
] ++ lib.optionals wcfg.enableMetrics [{
path = "/run/matrix-synapse/${name}-metrics.sock";
resources = [{ names = [ "metrics" ]; }];
type = "metrics";
}];
};
}) count))
lib.flatten
builtins.listToAttrs
];
systemd.services = let
workerList = lib.mapAttrsToList lib.nameValuePair wcfg.instances;
workerConfig = worker: format.generate "matrix-synapse-worker-${worker.name}-config.yaml"
(worker.value.settings // { worker_name = worker.name; });
in builtins.listToAttrs (lib.flip map workerList (worker: {
name = "matrix-synapse-worker-${worker.name}";
value = {
documentation = [ "https://github.com/matrix-org/synapse/blob/develop/docs/workers.md" ];
description = "Synapse Matrix Worker";
partOf = [ "matrix-synapse.target" ];
wantedBy = [ "matrix-synapse.target" ];
after = [ "matrix-synapse.service" ];
requires = [ "matrix-synapse.service" ];
restartTriggers = [ matrix-synapse-common-config (workerConfig worker) ] ++ cfg.extraConfigFiles;
environment = {
PYTHONPATH = lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [
pluginsEnv
];
};
serviceConfig = {
Restart = "always";
Type = "notify";
User = "matrix-synapse";
Group = "matrix-synapse";
Slice = "system-matrix-synapse.slice";
WorkingDirectory = cfg.dataDir;
RuntimeDirectory = [ "matrix-synapse" ];
ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" ''
# From https://md.darmstadt.ccc.de/synapse-at-work
while ! systemctl is-active -q matrix-synapse.service; do
sleep 1
done
'';
ExecStart = let
flags = lib.cli.toGNUCommandLineShell {} {
config-path = [ matrix-synapse-common-config (workerConfig worker) ] ++ cfg.extraConfigFiles;
keys-directory = cfg.dataDir;
};
in "${cfg.package}/bin/synapse_worker ${flags}";
};
};
}));
};
}