From f5bb4ac8c23f3e03b1ce754ccc72d3aa4ad06632 Mon Sep 17 00:00:00 2001 From: h7x4 Date: Tue, 1 Aug 2023 22:34:33 +0200 Subject: [PATCH] WIP: make workers use path --- synapse-module/workers.nix | 211 +++++++++++++++---------------------- 1 file changed, 83 insertions(+), 128 deletions(-) diff --git a/synapse-module/workers.nix b/synapse-module/workers.nix index cdf1ba9..558829e 100644 --- a/synapse-module/workers.nix +++ b/synapse-module/workers.nix @@ -19,6 +19,7 @@ type = types.ints.unsigned; description = "How many automatically configured ${workerType} workers to set up"; default = 0; + example = 1; }; genAttrs' = items: f: g: builtins.listToAttrs (map (i: lib.nameValuePair (f i) (g i)) items); @@ -33,11 +34,25 @@ in { type = types.bool; internal = true; default = false; + description = '' + This is an internal flag that signals that this worker is part of the + workers generated by either of the following: + + - federationSenders + - federationReceivers + - initialSyncers + - normalSyncers + - eventPersisters + - useUserDirectoryWorker + ''; }; index = mkOption { internal = true; type = types.ints.positive; + description = '' + This is an internal variable that indexes the worker of this type. + ''; }; # The custom string type here is mainly for the name to use @@ -85,17 +100,25 @@ in { default = "http"; }; - port = mkOption { - type = types.port; - description = "The TCP port to bind to"; + path = mkOption { + type = types.path; + default = instanceCfg.name; + description = '' + A path and filename for a Unix socket. + ''; }; - bind_addresses = mkOption { - type = with types; listOf str; - description = "A list of local addresses to listen on"; - default = [ wcfg.defaultListenerAddress ]; - defaultText = literalExpression "[ ${wcfgText}.defaultListenerAddress ]"; - }; + # port = mkOption { + # type = types.port; + # description = "The TCP port to bind to"; + # }; + + # bind_addresses = mkOption { + # type = with types; listOf str; + # description = "A list of local addresses to listen on"; + # default = [ wcfg.defaultListenerAddress ]; + # defaultText = literalExpression "[ ${wcfgText}.defaultListenerAddress ]"; + # }; tls = mkOption { type = types.bool; @@ -160,38 +183,6 @@ in { }; }; in { - mainReplicationHost = mkOption { - type = types.str; - default = let - host = (matrix-lib.connectionInfo mainReplicationListener).host; - in - # To avoid connecting to 0.0.0.0 and so on - if builtins.elem host [ "0.0.0.0" "::" ] - then "127.0.0.1" - else host; - # TODO: add defaultText - description = "Host of the main synapse instance's replication listener"; - }; - - mainReplicationPort = mkOption { - type = types.port; - default = mainReplicationListener.port; - # TODO: add defaultText - description = "Port for the main synapse instance's replication listener"; - }; - - defaultListenerAddress = mkOption { - type = types.str; - default = "127.0.0.1"; - description = "The default listener address for the worker"; - }; - - workerStartingPort = mkOption { - type = types.port; - description = "What port should the automatically configured workers start enumerating from"; - default = 8083; - }; - enableMetrics = mkOption { type = types.bool; default = cfg.settings.enable_metrics; @@ -199,12 +190,6 @@ in { # TODO: add description }; - metricsStartingPort = mkOption { - type = types.port; - default = 18083; - # TODO: add description - }; - federationSenders = mkWorkerCountOption "federation-sender"; federationReceivers = mkWorkerCountOption "federation-reciever"; initialSyncers = mkWorkerCountOption "initial-syncer"; @@ -218,13 +203,14 @@ in { default = { }; description = "Worker configuration"; example = { - "federation_sender1" = { + "federation-sender-1" = { settings = { - worker_name = "federation_sender1"; + worker_name = "federation-sender-1"; worker_app = "synapse.app.generic_worker"; - worker_replication_host = "127.0.0.1"; - worker_replication_http_port = 9093; + path = "/run/matrix-synapse/federation-sender-1.sock"; + # worker_replication_host = "127.0.0.1"; + # worker_replication_http_port = 9093; worker_listeners = [ ]; }; }; @@ -232,94 +218,59 @@ in { }; }; - config = { + config = let + genList1 = f: builtins.genList (i: f (i + 1)); + in { services.matrix-synapse-next.settings = { federation_sender_instances = - lib.genList (i: "auto-fed-sender${toString (i + 1)}") wcfg.federationSenders; + genList1 (i: "auto-fed-sender-${toString i}") wcfg.federationSenders; - instance_map = (lib.mkIf (cfg.workers.instances != { }) ({ - main = let - host = lib.head mainReplicationListener.bind_addresses; - in { - host = if builtins.elem host [ "0.0.0.0" "::"] then "127.0.0.1" else host; - port = mainReplicationListener.port; - }; - } // genAttrs' (lib.lists.range 1 wcfg.eventPersisters) - (i: "auto-event-persist${toString i}") - (i: let - wRL = matrix-lib.firstListenerOfType "replication" wcfg.instances."auto-event-persist${toString i}".settings.worker_listeners; - in matrix-lib.connectionInfo wRL))); + instance_map = lib.mkIf (cfg.workers.instances != { }) ({ + main.path = "/run/matrix-synapse/main-replication-worker.sock"; + } // builtins.mapAttrs (n: v: { + inherit (builtins.head v.settings.worker_listeners) path; + }) wcfg.instances); stream_writers.events = mkIf (wcfg.eventPersisters > 0) - (lib.genList (i: "auto-event-persist${toString (i + 1)}") wcfg.eventPersisters); + (genList1 (i: "auto-event-persist-${toString i}") wcfg.eventPersisters); update_user_directory_from_worker = - mkIf wcfg.useUserDirectoryWorker "auto-user-dir"; + mkIf wcfg.useUserDirectoryWorker "auto-user-dir-1"; }; - services.matrix-synapse-next.workers.instances = let - sum = lib.foldl lib.add 0; - workerListenersWithMetrics = portOffset: - lib.singleton ({ - port = wcfg.workerStartingPort + portOffset - 1; - }) - ++ lib.optional wcfg.enableMetrics { - port = wcfg.metricsStartingPort + portOffset; - resources = [ { names = [ "metrics" ]; } ]; + services.matrix-synapse-next.workers.instances = + let + workerInstances = { + "fed-sender" = wcfg.federationSenders; + "fed-receiver" = wcfg.federationReceivers; + "initial-sync" = wcfg.initialSyncers; + "normal-sync" = wcfg.normalSyncers; + "event-persist" = wcfg.eventPersisters; + "user-dir" = if wcfg.useUserDirectoryWorker then 1 else 0; }; - - makeWorkerInstances = { - type, - numberOfWorkers, - portOffset ? 0, - nameFn ? i: "auto-${type}${toString i}", - workerListenerFn ? i: workerListenersWithMetrics (portOffset + i) - }: genAttrs' - (lib.lists.range 1 numberOfWorkers) - nameFn - (i: { - isAuto = true; - inherit type; - index = i; - settings.worker_listeners = workerListenerFn i; - }); - - workerInstances = { - "fed-sender" = wcfg.federationSenders; - "fed-receiver" = wcfg.federationReceivers; - "initial-sync" = wcfg.initialSyncers; - "normal-sync" = wcfg.normalSyncers; - "event-persist" = wcfg.eventPersisters; - } // (lib.optionalAttrs wcfg.useUserDirectoryWorker { - "user-dir" = { - numberOfWorkers = 1; - nameFn = _: "auto-user-dir"; - }; - }); - - coerceWorker = { name, value }: if builtins.isInt value then { - type = name; - numberOfWorkers = value; - } else { type = name; } // value; - - # Like foldl, but keeps all intermediate values - # - # (b -> a -> b) -> b -> [a] -> [b] - scanl = f: x1: list: let - x2 = lib.head list; - x1' = f x1 x2; - in if list == [] then [] else [x1'] ++ (scanl f x1' (lib.tail list)); - - f = { portOffset, numberOfWorkers, ... }: x: x // { portOffset = portOffset + numberOfWorkers; }; - init = { portOffset = 0; numberOfWorkers = 0; }; - in lib.pipe workerInstances [ - (lib.mapAttrsToList lib.nameValuePair) - (map coerceWorker) - (scanl f init) - (map makeWorkerInstances) - mkMerge - ]; + in + lib.pipe workerInstances [ + (lib.mapAttrsToList (type: count: { inherit type count; })) + (map ({ type, count }: genList1 (i: rec { + name = "auto-${type}-${toString i}"; + value = { + inherit type; + isAuto = true; + index = i; + settings.worker_listeners = + [ + { path = "/run/matrix-synapse/${name}.sock"; } + ] ++ lib.optionals wcfg.enableMetrics [{ + path = "/run/matrix-synapse/${name}-metrics.sock"; + resources = [{ names = [ "metrics" ]; }]; + type = "metrics"; + }]; + }; + }) count)) + lib.flatten + builtins.listToAttrs + ]; systemd.services = let workerList = lib.mapAttrsToList lib.nameValuePair wcfg.instances; @@ -328,22 +279,26 @@ in { in builtins.listToAttrs (lib.flip map workerList (worker: { name = "matrix-synapse-worker-${worker.name}"; value = { + documentation = [ "https://github.com/matrix-org/synapse/blob/develop/docs/workers.md" ]; description = "Synapse Matrix Worker"; partOf = [ "matrix-synapse.target" ]; wantedBy = [ "matrix-synapse.target" ]; after = [ "matrix-synapse.service" ]; requires = [ "matrix-synapse.service" ]; + restartTriggers = [ matrix-synapse-common-config (workerConfig worker) ] ++ cfg.extraConfigFiles; environment = { PYTHONPATH = lib.makeSearchPathOutput "lib" cfg.package.python.sitePackages [ pluginsEnv ]; }; serviceConfig = { + Restart = "always"; Type = "notify"; User = "matrix-synapse"; Group = "matrix-synapse"; Slice = "system-matrix-synapse.slice"; WorkingDirectory = cfg.dataDir; + RuntimeDirectory = [ "matrix-synapse" ]; ExecStartPre = pkgs.writers.writeBash "wait-for-synapse" '' # From https://md.darmstadt.ccc.de/synapse-at-work while ! systemctl is-active -q matrix-synapse.service; do