WIP: make workers use path

This commit is contained in:
Oystein Kristoffer Tveit 2023-08-01 22:34:33 +02:00
parent 1f1475aec6
commit f5bb4ac8c2
Signed by untrusted user: oysteikt
GPG Key ID: 9F2F7D8250F35146
1 changed files with 83 additions and 128 deletions

View File

@ -19,6 +19,7 @@
type = types.ints.unsigned; type = types.ints.unsigned;
description = "How many automatically configured ${workerType} workers to set up"; description = "How many automatically configured ${workerType} workers to set up";
default = 0; default = 0;
example = 1;
}; };
genAttrs' = items: f: g: builtins.listToAttrs (map (i: lib.nameValuePair (f i) (g i)) items); genAttrs' = items: f: g: builtins.listToAttrs (map (i: lib.nameValuePair (f i) (g i)) items);
@ -33,11 +34,25 @@ in {
type = types.bool; type = types.bool;
internal = true; internal = true;
default = false; default = false;
description = ''
This is an internal flag that signals that this worker is part of the
workers generated by either of the following:
- federationSenders
- federationReceivers
- initialSyncers
- normalSyncers
- eventPersisters
- useUserDirectoryWorker
'';
}; };
index = mkOption { index = mkOption {
internal = true; internal = true;
type = types.ints.positive; type = types.ints.positive;
description = ''
This is an internal variable that indexes the worker of this type.
'';
}; };
# The custom string type here is mainly for the name to use # The custom string type here is mainly for the name to use
@ -85,17 +100,25 @@ in {
default = "http"; default = "http";
}; };
port = mkOption { path = mkOption {
type = types.port; type = types.path;
description = "The TCP port to bind to"; default = instanceCfg.name;
description = ''
A path and filename for a Unix socket.
'';
}; };
bind_addresses = mkOption { # port = mkOption {
type = with types; listOf str; # type = types.port;
description = "A list of local addresses to listen on"; # description = "The TCP port to bind to";
default = [ wcfg.defaultListenerAddress ]; # };
defaultText = literalExpression "[ ${wcfgText}.defaultListenerAddress ]";
}; # bind_addresses = mkOption {
# type = with types; listOf str;
# description = "A list of local addresses to listen on";
# default = [ wcfg.defaultListenerAddress ];
# defaultText = literalExpression "[ ${wcfgText}.defaultListenerAddress ]";
# };
tls = mkOption { tls = mkOption {
type = types.bool; type = types.bool;
@ -160,38 +183,6 @@ in {
}; };
}; };
in { in {
mainReplicationHost = mkOption {
type = types.str;
default = let
host = (matrix-lib.connectionInfo mainReplicationListener).host;
in
# To avoid connecting to 0.0.0.0 and so on
if builtins.elem host [ "0.0.0.0" "::" ]
then "127.0.0.1"
else host;
# TODO: add defaultText
description = "Host of the main synapse instance's replication listener";
};
mainReplicationPort = mkOption {
type = types.port;
default = mainReplicationListener.port;
# TODO: add defaultText
description = "Port for the main synapse instance's replication listener";
};
defaultListenerAddress = mkOption {
type = types.str;
default = "127.0.0.1";
description = "The default listener address for the worker";
};
workerStartingPort = mkOption {
type = types.port;
description = "What port should the automatically configured workers start enumerating from";
default = 8083;
};
enableMetrics = mkOption { enableMetrics = mkOption {
type = types.bool; type = types.bool;
default = cfg.settings.enable_metrics; default = cfg.settings.enable_metrics;
@ -199,12 +190,6 @@ in {
# TODO: add description # TODO: add description
}; };
metricsStartingPort = mkOption {
type = types.port;
default = 18083;
# TODO: add description
};
federationSenders = mkWorkerCountOption "federation-sender"; federationSenders = mkWorkerCountOption "federation-sender";
federationReceivers = mkWorkerCountOption "federation-reciever"; federationReceivers = mkWorkerCountOption "federation-reciever";
initialSyncers = mkWorkerCountOption "initial-syncer"; initialSyncers = mkWorkerCountOption "initial-syncer";
@ -218,13 +203,14 @@ in {
default = { }; default = { };
description = "Worker configuration"; description = "Worker configuration";
example = { example = {
"federation_sender1" = { "federation-sender-1" = {
settings = { settings = {
worker_name = "federation_sender1"; worker_name = "federation-sender-1";
worker_app = "synapse.app.generic_worker"; worker_app = "synapse.app.generic_worker";
worker_replication_host = "127.0.0.1"; path = "/run/matrix-synapse/federation-sender-1.sock";
worker_replication_http_port = 9093; # worker_replication_host = "127.0.0.1";
# worker_replication_http_port = 9093;
worker_listeners = [ ]; worker_listeners = [ ];
}; };
}; };
@ -232,94 +218,59 @@ in {
}; };
}; };
config = { config = let
genList1 = f: builtins.genList (i: f (i + 1));
in {
services.matrix-synapse-next.settings = { services.matrix-synapse-next.settings = {
federation_sender_instances = federation_sender_instances =
lib.genList (i: "auto-fed-sender${toString (i + 1)}") wcfg.federationSenders; genList1 (i: "auto-fed-sender-${toString i}") wcfg.federationSenders;
instance_map = (lib.mkIf (cfg.workers.instances != { }) ({ instance_map = lib.mkIf (cfg.workers.instances != { }) ({
main = let main.path = "/run/matrix-synapse/main-replication-worker.sock";
host = lib.head mainReplicationListener.bind_addresses; } // builtins.mapAttrs (n: v: {
in { inherit (builtins.head v.settings.worker_listeners) path;
host = if builtins.elem host [ "0.0.0.0" "::"] then "127.0.0.1" else host; }) wcfg.instances);
port = mainReplicationListener.port;
};
} // genAttrs' (lib.lists.range 1 wcfg.eventPersisters)
(i: "auto-event-persist${toString i}")
(i: let
wRL = matrix-lib.firstListenerOfType "replication" wcfg.instances."auto-event-persist${toString i}".settings.worker_listeners;
in matrix-lib.connectionInfo wRL)));
stream_writers.events = stream_writers.events =
mkIf (wcfg.eventPersisters > 0) mkIf (wcfg.eventPersisters > 0)
(lib.genList (i: "auto-event-persist${toString (i + 1)}") wcfg.eventPersisters); (genList1 (i: "auto-event-persist-${toString i}") wcfg.eventPersisters);
update_user_directory_from_worker = update_user_directory_from_worker =
mkIf wcfg.useUserDirectoryWorker "auto-user-dir"; mkIf wcfg.useUserDirectoryWorker "auto-user-dir-1";
}; };
services.matrix-synapse-next.workers.instances = let services.matrix-synapse-next.workers.instances =
sum = lib.foldl lib.add 0; let
workerListenersWithMetrics = portOffset: workerInstances = {
lib.singleton ({ "fed-sender" = wcfg.federationSenders;
port = wcfg.workerStartingPort + portOffset - 1; "fed-receiver" = wcfg.federationReceivers;
}) "initial-sync" = wcfg.initialSyncers;
++ lib.optional wcfg.enableMetrics { "normal-sync" = wcfg.normalSyncers;
port = wcfg.metricsStartingPort + portOffset; "event-persist" = wcfg.eventPersisters;
resources = [ { names = [ "metrics" ]; } ]; "user-dir" = if wcfg.useUserDirectoryWorker then 1 else 0;
}; };
in
makeWorkerInstances = { lib.pipe workerInstances [
type, (lib.mapAttrsToList (type: count: { inherit type count; }))
numberOfWorkers, (map ({ type, count }: genList1 (i: rec {
portOffset ? 0, name = "auto-${type}-${toString i}";
nameFn ? i: "auto-${type}${toString i}", value = {
workerListenerFn ? i: workerListenersWithMetrics (portOffset + i) inherit type;
}: genAttrs' isAuto = true;
(lib.lists.range 1 numberOfWorkers) index = i;
nameFn settings.worker_listeners =
(i: { [
isAuto = true; { path = "/run/matrix-synapse/${name}.sock"; }
inherit type; ] ++ lib.optionals wcfg.enableMetrics [{
index = i; path = "/run/matrix-synapse/${name}-metrics.sock";
settings.worker_listeners = workerListenerFn i; resources = [{ names = [ "metrics" ]; }];
}); type = "metrics";
}];
workerInstances = { };
"fed-sender" = wcfg.federationSenders; }) count))
"fed-receiver" = wcfg.federationReceivers; lib.flatten
"initial-sync" = wcfg.initialSyncers; builtins.listToAttrs
"normal-sync" = wcfg.normalSyncers; ];
"event-persist" = wcfg.eventPersisters;
} // (lib.optionalAttrs wcfg.useUserDirectoryWorker {
"user-dir" = {
numberOfWorkers = 1;
nameFn = _: "auto-user-dir";
};
});
coerceWorker = { name, value }: if builtins.isInt value then {
type = name;
numberOfWorkers = value;
} else { type = name; } // value;
# Like foldl, but keeps all intermediate values
#
# (b -> a -> b) -> b -> [a] -> [b]
scanl = f: x1: list: let
x2 = lib.head list;
x1' = f x1 x2;
in if list == [] then [] else [x1'] ++ (scanl f x1' (lib.tail list));
f = { portOffset, numberOfWorkers, ... }: x: x // { portOffset = portOffset + numberOfWorkers; };
init = { portOffset = 0; numberOfWorkers = 0; };
in lib.pipe workerInstances [
(lib.mapAttrsToList lib.nameValuePair)
(map coerceWorker)
(scanl f init)
(map makeWorkerInstances)
mkMerge
];
systemd.services = let systemd.services = let
workerList = lib.mapAttrsToList lib.nameValuePair wcfg.instances; workerList = lib.mapAttrsToList lib.nameValuePair wcfg.instances;
@ -328,22 +279,26 @@ in {
in builtins.listToAttrs (lib.flip map workerList (worker: { in builtins.listToAttrs (lib.flip map workerList (worker: {
name = "matrix-synapse-worker-${worker.name}"; name = "matrix-synapse-worker-${worker.name}";
value = { value = {
documentation = [ "https://github.com/matrix-org/synapse/blob/develop/docs/workers.md" ];
description = "Synapse Matrix Worker"; description = "Synapse Matrix Worker";
partOf = [ "matrix-synapse.target" ]; partOf = [ "matrix-synapse.target" ];
wantedBy = [ "matrix-synapse.target" ]; wantedBy = [ "matrix-synapse.target" ];
after = [ "matrix-synapse.service" ]; after = [ "matrix-synapse.service" ];
requires = [ "matrix-synapse.service" ]; requires = [ "matrix-synapse.service" ];
restartTriggers = [ matrix-synapse-common-config (workerConfig worker) ] ++ cfg.extraConfigFiles;
environment = { environment = {
PYTHONPATH = lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [ PYTHONPATH = lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [
pluginsEnv pluginsEnv
]; ];
}; };
serviceConfig = { serviceConfig = {
Restart = "always";
Type = "notify"; Type = "notify";
User = "matrix-synapse"; User = "matrix-synapse";
Group = "matrix-synapse"; Group = "matrix-synapse";
Slice = "system-matrix-synapse.slice"; Slice = "system-matrix-synapse.slice";
WorkingDirectory = cfg.dataDir; WorkingDirectory = cfg.dataDir;
RuntimeDirectory = [ "matrix-synapse" ];
ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" '' ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" ''
# From https://md.darmstadt.ccc.de/synapse-at-work # From https://md.darmstadt.ccc.de/synapse-at-work
while ! systemctl is-active -q matrix-synapse.service; do while ! systemctl is-active -q matrix-synapse.service; do