From 240e550d4c35ebcef863ffd4adfe35207b8e53d2 Mon Sep 17 00:00:00 2001 From: Daniel Olsen Date: Fri, 9 Dec 2022 09:33:31 +0100 Subject: [PATCH] Implement sync workers --- synapse-module/default.nix | 158 ++++++++--------------------- synapse-module/nginx.nix | 199 +++++++++++++++++++++++++++++++++++++ 2 files changed, 243 insertions(+), 114 deletions(-) create mode 100644 synapse-module/nginx.nix diff --git a/synapse-module/default.nix b/synapse-module/default.nix index aa41bec..7fd6463 100644 --- a/synapse-module/default.nix +++ b/synapse-module/default.nix @@ -15,6 +15,8 @@ let isListenerType = type: listener: lib.lists.any (r: lib.lists.any (n: n == type) r.names) listener.resources; in { + imports = [ ./nginx.nix ]; + options.services.matrix-synapse-next = { enable = lib.mkEnableOption "matrix-synapse"; @@ -117,6 +119,18 @@ in default = 0; }; + initialSyncers = lib.mkOption { + type = lib.types.ints.unsigned; + description = "How many automatically configured intial syncers to set up"; + default = 0; + }; + + normalSyncers = lib.mkOption { + type = lib.types.ints.unsigned; + description = "How many automatically configured sync workers to set up"; + default = 0; + }; + instances = lib.mkOption { type = lib.types.attrsOf (lib.types.submodule ({config, ...}: { @@ -147,6 +161,8 @@ in mapTypeApp = t: { "fed-sender" = "synapse.app.generic_worker"; "fed-receiver" = "synapse.app.generic_worker"; + "initial-sync" = "synapse.app.generic_worker"; + "normal-sync" = "synapse.app.generic_worker"; }.${t}; defaultApp = if (!isAuto) then "synapse.app.generic_worker" @@ -204,6 +220,8 @@ in typeToResources = t: { "fed-receiver" = [ "federation" ]; "fed-sender" = [ ]; + "initial-sync" = [ "client" ]; + "normal-sync" = [ "client" ]; }.${t}; in lib.mkOption { type = lib.types.listOf (lib.types.submodule { @@ -546,6 +564,31 @@ in }; }); }) + + + ({ + services.matrix-synapse-next.workers.instances = genAttrs' (lib.lists.range 1 cfg.workers.initialSyncers) + (i: "auto-initial-sync${toString i}") + (i: { + isAuto = true; type = "initial-sync"; index = i; + settings.worker_listeners = [{ port = cfg.workers.workerStartingPort + cfg.workers.federationReceivers + i - 1; }] + ++ lib.optional wcfg.enableMetrics { port = cfg.workers.metricsStartingPort + cfg.workers.federationSenders + cfg.workers.federationReceivers + i; + resources = [ { names = [ "metrics" ]; } ]; + }; + }); + }) + + ({ + services.matrix-synapse-next.workers.instances = genAttrs' (lib.lists.range 1 cfg.workers.normalSyncers) + (i: "auto-normal-sync${toString i}") + (i: { + isAuto = true; type = "normal-sync"; index = i; + settings.worker_listeners = [{ port = cfg.workers.workerStartingPort + cfg.workers.federationReceivers + cfg.workers.initialSyncers + i - 1; }] + ++ lib.optional wcfg.enableMetrics { port = cfg.workers.metricsStartingPort + cfg.workers.federationSenders + cfg.workers.federationReceivers + cfg.workers.initialSyncers + i; + resources = [ { names = [ "metrics" ]; } ]; + }; + }); + }) ]) ({ @@ -586,119 +629,6 @@ in }; } ) workerList); - }) - - (lib.mkIf cfg.enableNginx { - services.nginx.commonHttpConfig = '' - map $request_uri $synapse_backend { - default synapse_master; - - # Sync requests - ~*^/_matrix/client/(v2_alpha|r0)/sync$ synapse_client; - ~*^/_matrix/client/(api/v1|v2_alpha|r0)/events$ synapse_client; - ~*^/_matrix/client/(api/v1|r0)/initialSync$ synapse_client; - ~*^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$ synapse_client; - - # Federation requests - ~*^/_matrix/federation/v1/event/ synapse_federation; - ~*^/_matrix/federation/v1/state/ synapse_federation; - ~*^/_matrix/federation/v1/state_ids/ synapse_federation; - ~*^/_matrix/federation/v1/backfill/ synapse_federation; - ~*^/_matrix/federation/v1/get_missing_events/ synapse_federation; - ~*^/_matrix/federation/v1/publicRooms synapse_federation; - ~*^/_matrix/federation/v1/query/ synapse_federation; - ~*^/_matrix/federation/v1/make_join/ synapse_federation; - ~*^/_matrix/federation/v1/make_leave/ synapse_federation; - ~*^/_matrix/federation/v1/send_join/ synapse_federation; - ~*^/_matrix/federation/v2/send_join/ synapse_federation; - ~*^/_matrix/federation/v1/send_leave/ synapse_federation; - ~*^/_matrix/federation/v2/send_leave/ synapse_federation; - ~*^/_matrix/federation/v1/invite/ synapse_federation; - ~*^/_matrix/federation/v2/invite/ synapse_federation; - ~*^/_matrix/federation/v1/query_auth/ synapse_federation; - ~*^/_matrix/federation/v1/event_auth/ synapse_federation; - ~*^/_matrix/federation/v1/exchange_third_party_invite/ synapse_federation; - ~*^/_matrix/federation/v1/user/devices/ synapse_federation; - ~*^/_matrix/federation/v1/get_groups_publicised$ synapse_federation; - ~*^/_matrix/key/v2/query synapse_federation; - - # Inbound federation transaction request - ~*^/_matrix/federation/v1/send/ synapse_federation; - - # Client API requests - ~*^/_matrix/client/(api/v1|r0|unstable)/publicRooms$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/account/3pid$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/devices$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/keys/query$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/keys/changes$ synapse_client; - ~*^/_matrix/client/versions$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/joined_groups$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/event/ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/joined_rooms$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/search$ synapse_client; - - # Registration/login requests - ~*^/_matrix/client/(api/v1|r0|unstable)/login$ synapse_client; - ~*^/_matrix/client/(r0|unstable)/register$ synapse_client; - - # Event sending requests - ~*^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/redact synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/join/ synapse_client; - ~*^/_matrix/client/(api/v1|r0|unstable)/profile/ synapse_client; - } - ''; - - services.nginx.upstreams.synapse_master.servers = let - isMainListener = l: isListenerType "client" l && isListenerType "federation" l; - firstMainListener = lib.findFirst isMainListener - (throw "No cartch-all listener configured") cfg.settings.listeners; - address = lib.findFirst (_: true) (throw "No address in main listener") firstMainListener.bind_addresses; - port = firstMainListener.port; - socketAddress = "${address}:${builtins.toString port}"; - in { - "${socketAddress}" = { }; - }; - - services.nginx.upstreams.synapse_federation.servers = let - fedReceivers = lib.filterAttrs (_: w: w.type == "fed-receiver") cfg.workers.instances; - isListenerType = type: listener: lib.lists.any (r: lib.lists.any (n: n == type) r.names) listener.resources; - isFedListener = l: isListenerType "federation" l; - - firstFedListener = w: lib.lists.findFirst isFedListener (throw "No federation endpoint on receiver") w.settings.worker_listeners; - - wAddress = w: lib.lists.findFirst (_: true) (throw "No address in receiver") (firstFedListener w).bind_addresses; - wPort = w: (firstFedListener w).port; - - socketAddress = w: "${wAddress w}:${builtins.toString (wPort w)}"; - socketAddresses = lib.mapAttrsToList (_: value: "${socketAddress value}") fedReceivers; - in if fedReceivers != [ ] then lib.genAttrs socketAddresses (_: { }) else config.services.nginx.upstreams.synapse_master.servers; - - services.nginx.upstreams.synapse_client.servers = config.services.nginx.upstreams.synapse_master.servers; - - - services.nginx.virtualHosts."${cfg.public_baseurl}" = { - enableACME = true; - forceSSL = true; - locations."/_matrix" = { - proxyPass = "http://$synapse_backend"; - extraConfig = '' - client_max_body_size ${cfg.settings.max_upload_size}; - ''; - }; - locations."/_synapse/client" = { - proxyPass = "http://$synapse_backend"; - }; - }; - }) + }) ]); } \ No newline at end of file diff --git a/synapse-module/nginx.nix b/synapse-module/nginx.nix new file mode 100644 index 0000000..6e42a3f --- /dev/null +++ b/synapse-module/nginx.nix @@ -0,0 +1,199 @@ +{ lib, pkgs, config, ...}: +let + cfg = config.services.matrix-synapse-next; + + getWorkersOfType = type: lib.filterAttrs (_: w: w.type == type) cfg.workers.instances; + isListenerType = type: listener: lib.lists.any (r: lib.lists.any (n: n == type) r.names) listener.resources; + firstListenerOfType = type: worker: lib.lists.findFirst (isListenerType type) (throw "No federation endpoint on receiver") worker.settings.worker_listeners; + wAddressOfType = type: w: lib.lists.findFirst (_: true) (throw "No address in receiver") (firstListenerOfType type w).bind_addresses; + wPortOfType = type: w: (firstListenerOfType type w).port; + wSocketAddressOfType = type: w: "${wAddressOfType type w}:${builtins.toString (wPortOfType type w)}"; + generateSocketAddresses = type: workers: lib.mapAttrsToList (_: w: "${wSocketAddressOfType type w}") workers; +in +{ + config = lib.mkIf cfg.enableNginx { + services.nginx.commonHttpConfig = '' + # No since argument means its initialSync + map $arg_since $synapse_unknown_sync { + default synapse_normal_sync; + ''' synapse_initial_sync; + } + + map $request_uri $synapse_uri_group { + # Sync requests + ^/_matrix/client/(r0|v3)/sync$ $synapse_unknown_sync; + ^/_matrix/client/(api/v1|r0|v3)/event$ synapse_normal_sync; + ^/_matrix/client/(api/v1|r0|v3)/initialSync$ synapse_initial_sync; + ^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$ synapse_initial_sync; + + # Federation requests + ^/_matrix/federation/v1/event/ synapse_federation; + ^/_matrix/federation/v1/state/ synapse_federation; + ^/_matrix/federation/v1/state_ids/ synapse_federation; + ^/_matrix/federation/v1/backfill/ synapse_federation; + ^/_matrix/federation/v1/get_missing_events/ synapse_federation; + ^/_matrix/federation/v1/publicRooms synapse_federation; + ^/_matrix/federation/v1/query/ synapse_federation; + ^/_matrix/federation/v1/make_join/ synapse_federation; + ^/_matrix/federation/v1/make_leave/ synapse_federation; + ^/_matrix/federation/(v1|v2)/send_join/ synapse_federation; + ^/_matrix/federation/(v1|v2)/send_leave/ synapse_federation; + ^/_matrix/federation/(v1|v2)/invite/ synapse_federation; + ^/_matrix/federation/v1/event_auth/ synapse_federation; + ^/_matrix/federation/v1/timestamp_to_event/ synapse_federation; + ^/_matrix/federation/v1/exchange_third_party_invite/ synapse_federation; + ^/_matrix/federation/v1/user/devices/ synapse_federation; + ^/_matrix/key/v2/query synapse_federation; + ^/_matrix/federation/v1/hierarchy/ synapse_federation; + + # Inbound federation transaction request + ^/_matrix/federation/v1/send/ synapse_federation_transaction; + + # Client API requests + ^/_matrix/client/(api/v1|r0|v3|unstable)/createRoom$ synapse_client_interaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$ synapse_client_interaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$ synapse_client_interaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$ synapse_client_interaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$ synapse_client_interaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$ synapse_client_interaction; + ^/_matrix/client/v1/rooms/.*/hierarchy$ synapse_client_interaction; + ^/_matrix/client/(v1|unstable)/rooms/.*/relations/ synapse_client_interaction; + ^/_matrix/client/v1/rooms/.*/threads$ synapse_client_interaction; + ^/_matrix/client/unstable/org.matrix.msc2716/rooms/.*/batch_send$ synapse_client_interaction; + ^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$ synapse_client_interaction; + ^/_matrix/client/(r0|v3|unstable)/account/3pid$ synapse_client_interaction; + ^/_matrix/client/(r0|v3|unstable)/account/whoami$ synapse_client_interaction; + ^/_matrix/client/(r0|v3|unstable)/devices$ synapse_client_interaction; + ^/_matrix/client/versions$ synapse_client_interaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$ synapse_client_interaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event/ synapse_client_interaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$ synapse_client_interaction; + ^/_matrix/client/v1/rooms/.*/timestamp_to_event$ synapse_client_interaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/search$ synapse_client_interaction; + + # Encryption requests + ^/_matrix/client/(r0|v3|unstable)/keys/query$ synapse_client_encryption; + ^/_matrix/client/(r0|v3|unstable)/keys/changes$ synapse_client_encryption; + ^/_matrix/client/(r0|v3|unstable)/keys/claim$ synapse_client_encryption; + ^/_matrix/client/(r0|v3|unstable)/room_keys/ synapse_client_encryption; + ^/_matrix/client/(r0|v3|unstable)/keys/upload/ synapse_client_encryption; + + # Registration/login requests + ^/_matrix/client/(api/v1|r0|v3|unstable)/login$ synapse_client_login; + ^/_matrix/client/(r0|v3|unstable)/register$ synapse_client_login; + ^/_matrix/client/v1/register/m.login.registration_token/validity$ synapse_client_login; + + # Event sending requests + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact synapse_client_transaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send synapse_client_transaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state/ synapse_client_transaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$ synapse_client_transaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/join/ synapse_client_transaction; + ^/_matrix/client/(api/v1|r0|v3|unstable)/profile/ synapse_client_transaction; + + # Account data requests + ^/_matrix/client/(r0|v3|unstable)/.*/tags synapse_client_data; + ^/_matrix/client/(r0|v3|unstable)/.*/account_data synapse_client_data; + + # Receipts requests + ^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt synapse_client_interaction; + ^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers synapse_client_interaction; + + # Presence requests + ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/ synapse_client_presence; + + # User directory search requests; + ^/_matrix/client/(r0|v3|unstable)/user_directory/search$ synapse_client_search; + } + + #Plugboard for url -> workers + map $synapse_uri_group $synapse_backend { + default synapse_master; + + # synapse_initial_sync synapse_worker_initial_sync; + # synapse_normal_sync synapse_worker_normal_sync; + + synapse_federation synapse_worker_federation; + synapse_federation_transaction synapse_worker_federation; + } + + # from https://github.com/tswfi/synapse/commit/b3704b936663cc692241e978dce4ac623276b1a6 + map $arg_access_token $accesstoken_from_urlparam { + # Defaults to just passing back the whole accesstoken + default $arg_access_token; + # Try to extract username part from accesstoken URL parameter + "~syt_(?.*?)_.*" $username; + } + + map $http_authorization $mxid_localpart { + # Defaults to just passing back the whole accesstoken + default $http_authorization; + # Try to extract username part from accesstoken header + "~Bearer syt_(?.*?)_.*" $username; + # if no authorization-header exist, try mapper for URL parameter "access_token" + "" $accesstoken_from_urlparam; + } + ''; + + services.nginx.upstreams.synapse_master.servers = let + isMainListener = l: isListenerType "client" l && isListenerType "federation" l; + firstMainListener = lib.findFirst isMainListener + (throw "No cartch-all listener configured") cfg.settings.listeners; + address = lib.findFirst (_: true) (throw "No address in main listener") firstMainListener.bind_addresses; + port = firstMainListener.port; + socketAddress = "${address}:${builtins.toString port}"; + in { + "${socketAddress}" = { }; + }; + + + services.nginx.upstreams.synapse_worker_federation = { + servers = let + fedReceivers = getWorkersOfType "fed-receiver"; + socketAddresses = generateSocketAddresses "federation" fedReceivers; + in if fedReceivers != [ ] then + lib.genAttrs socketAddresses (_: { }) + else config.services.nginx.upstreams.synapse_master.servers; + extraConfig = '' + hash $mxid_localpart consistent; + ''; + }; + + + services.nginx.upstreams.synapse_worker_initial_sync = { + servers = let + initialSyncers = getWorkersOfType "inital-sync"; + socketAddresses = generateSocketAddresses "client" initialSyncers; + in if initialSyncers != [ ] then + lib.genAttrs socketAddresses (_: { }) + else config.services.nginx.upstreams.synapse_master.server; + extraConfig = '' + hash $mxid_localpart consistent; + ''; + }; + + + services.nginx.upstreams.synapse_worker_normal_sync.servers = let + normalSyncers = getWorkersOfType "normal-sync"; + socketAddresses = generateSocketAddresses "client" normalSyncers; + in if normalSyncers != [ ] then + lib.genAttrs socketAddresses (_: { }) + else config.services.nginx.upstreams.synapse_master.server; + + + + services.nginx.virtualHosts."${cfg.public_baseurl}" = { + enableACME = true; + forceSSL = true; + locations."/_matrix" = { + proxyPass = "http://$synapse_backend"; + extraConfig = '' + client_max_body_size ${cfg.settings.max_upload_size}; + ''; + }; + locations."/_synapse/client" = { + proxyPass = "http://$synapse_backend"; + }; + }; +}; +} \ No newline at end of file