Skip to content

Commit f90f9b9

Browse files
authored
feat: configure internal proxy ports via explicit lists (#709)
1 parent deaa481 commit f90f9b9

File tree

8 files changed

+68
-13
lines changed

8 files changed

+68
-13
lines changed

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ dev.node2:
2929
PROXY_PORT_SESSION="5442" \
3030
PROXY_PORT_TRANSACTION="6553" \
3131
PROXY_PORT="5402" \
32+
SESSION_PROXY_PORTS="12200,12201,12202,12203" \
33+
TRANSACTION_PROXY_PORTS="12204,12205,12206,12207" \
3234
NODE_IP=localhost \
3335
AVAILABILITY_ZONE="ap-southeast-1c" \
3436
ERL_AFLAGS="-kernel shell_history enabled" \
@@ -45,6 +47,8 @@ dev.node3:
4547
CLUSTER_POSTGRES="true" \
4648
PROXY_PORT_SESSION="5443" \
4749
PROXY_PORT_TRANSACTION="6554" \
50+
SESSION_PROXY_PORTS="12300,12301,12302,12303" \
51+
TRANSACTION_PROXY_PORTS="12304,12305,12306,12307" \
4852
ERL_AFLAGS="-kernel shell_history enabled" \
4953
iex --name [email protected] --cookie cookie -S mix phx.server
5054

config/runtime.exs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import Config
22

33
require Logger
4-
alias Supavisor.Helpers
4+
5+
parse_integer_list = fn numbers when is_binary(numbers) ->
6+
numbers
7+
|> String.split(",", trim: true)
8+
|> Enum.map(&String.to_integer/1)
9+
end
510

611
secret_key_base =
712
if config_env() in [:dev, :test] do
@@ -158,7 +163,12 @@ reconnect_retries =
158163

159164
if config_env() != :test do
160165
config :supavisor,
161-
local_proxy_shards: System.get_env("LOCAL_PROXY_SHARDS", "4") |> String.to_integer(),
166+
session_proxy_ports:
167+
System.get_env("SESSION_PROXY_PORTS", "12100,12101,12102,12103")
168+
|> parse_integer_list.(),
169+
transaction_proxy_ports:
170+
System.get_env("TRANSACTION_PROXY_PORTS", "12104,12105,12106,12107")
171+
|> parse_integer_list.(),
162172
availability_zone: System.get_env("AVAILABILITY_ZONE"),
163173
region: System.get_env("REGION") || System.get_env("FLY_REGION"),
164174
fly_alloc_id: System.get_env("FLY_ALLOC_ID"),

config/test.exs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import Config
22

3+
parse_integer_list = fn numbers when is_binary(numbers) ->
4+
numbers
5+
|> String.split(",", trim: true)
6+
|> Enum.map(&String.to_integer/1)
7+
end
8+
39
config :supavisor,
410
region: "eu",
511
fly_alloc_id: "123e4567-e89b-12d3-a456-426614174000",
@@ -18,7 +24,10 @@ config :supavisor,
1824
metrics_blocklist: [],
1925
node_host: System.get_env("NODE_IP", "127.0.0.1"),
2026
availability_zone: System.get_env("AVAILABILITY_ZONE"),
21-
local_proxy_shards: System.get_env("LOCAL_PROXY_SHARDS", "4") |> String.to_integer(),
27+
session_proxy_ports:
28+
System.get_env("SESSION_PROXY_PORTS", "12100,12101,12102,12103") |> parse_integer_list.(),
29+
transaction_proxy_ports:
30+
System.get_env("TRANSACTION_PROXY_PORTS", "12104,12105,12106,12107") |> parse_integer_list.(),
2231
max_pools: 5,
2332
reconnect_retries: System.get_env("RECONNECT_RETRIES", "5") |> String.to_integer(),
2433
subscribe_retries: System.get_env("SUBSCRIBE_RETRIES", "5") |> String.to_integer()

docs/configuration/env.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ This document provides a categorized list of all environment variables used in S
3232
| `GLOBAL_UPSTREAM_CA_PATH` | Upstream CA certificate path | - | No |
3333
| `GLOBAL_DOWNSTREAM_CERT_PATH` | Downstream certificate path | - | No |
3434
| `GLOBAL_DOWNSTREAM_KEY_PATH` | Downstream private key path | - | No |
35-
| `LOCAL_PROXY_SHARDS` | Number of local proxy shards per mode (session/transaction) | `4` | No |
35+
| `SESSION_PROXY_PORTS` | Comma-separated list of ports for session proxy shards | `12100,12101,12102,12103` | No |
36+
| `TRANSACTION_PROXY_PORTS` | Comma-separated list of ports for transaction proxy shards | `12104,12105,12106,12107` | No |
3637

3738
---
3839

lib/supavisor.ex

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,14 @@ defmodule Supavisor do
413413
@spec get_local_server(id, atom) :: map()
414414
def get_local_server(id, mode) do
415415
host = Application.get_env(:supavisor, :node_host)
416-
local_proxy_shards = Application.fetch_env!(:supavisor, :local_proxy_shards)
417-
shard = :erlang.phash2(id, local_proxy_shards)
416+
417+
ports =
418+
case mode do
419+
:session -> Application.fetch_env!(:supavisor, :session_proxy_ports)
420+
:transaction -> Application.fetch_env!(:supavisor, :transaction_proxy_ports)
421+
end
422+
423+
shard = :erlang.phash2(id, length(ports))
418424
%{host: host, port: :ranch.get_port({:pg_proxy_internal, mode, shard})}
419425
end
420426

lib/supavisor/application.ex

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,15 @@ defmodule Supavisor.Application do
4848
{Supavisor.SignalHandler, []}
4949
)
5050

51-
local_proxy_shards_count = Application.fetch_env!(:supavisor, :local_proxy_shards)
51+
session_shards =
52+
:supavisor
53+
|> Application.fetch_env!(:session_proxy_ports)
54+
|> build_shards(:session)
5255

53-
local_proxy_shards =
54-
for shard <- 0..(local_proxy_shards_count - 1), mode <- [:session, :transaction] do
55-
{{:pg_proxy_internal, mode, shard}, 0, %{mode: mode, local: true, shard: shard},
56-
Supavisor.ClientHandler}
57-
end
56+
transaction_shards =
57+
:supavisor
58+
|> Application.fetch_env!(:transaction_proxy_ports)
59+
|> build_shards(:transaction)
5860

5961
proxy_ports =
6062
[
@@ -64,7 +66,7 @@ defmodule Supavisor.Application do
6466
%{mode: :session, local: false}, Supavisor.ClientHandler},
6567
{:pg_proxy, Application.get_env(:supavisor, :proxy_port), %{mode: :proxy, local: false},
6668
Supavisor.ClientHandler}
67-
] ++ local_proxy_shards
69+
] ++ session_shards ++ transaction_shards
6870

6971
for {key, port, opts, handler} <- proxy_ports do
7072
case :ranch.start_listener(
@@ -159,6 +161,14 @@ defmodule Supavisor.Application do
159161
:ok
160162
end
161163

164+
@spec build_shards([pos_integer()], atom()) :: term()
165+
defp build_shards(ports, mode) do
166+
for {port, shard} <- Enum.with_index(ports) do
167+
{{:pg_proxy_internal, mode, shard}, port, %{mode: mode, local: true, shard: shard},
168+
Supavisor.ClientHandler}
169+
end
170+
end
171+
162172
@spec short_node_id() :: String.t() | nil
163173
defp short_node_id do
164174
with {:ok, fly_alloc_id} when is_binary(fly_alloc_id) <-

lib/supavisor/helpers.ex

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,13 @@ defmodule Supavisor.Helpers do
337337
result -> result
338338
end
339339

340+
@spec parse_integer_list(String.t()) :: [integer()]
341+
def parse_integer_list(numbers) when is_binary(numbers) do
342+
numbers
343+
|> String.split(",", trim: true)
344+
|> Enum.map(&String.to_integer/1)
345+
end
346+
340347
@doc """
341348
Sets the maximum heap size for the current process. The `max_heap_size` parameter is in megabytes.
342349

test/support/cluster.ex

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ defmodule Supavisor.Support.Cluster do
5252
{:supavisor, :availability_zone} ->
5353
"ap-southeast-1c"
5454

55+
{:supavisor, :session_proxy_ports} ->
56+
Application.get_env(:supavisor, :session_proxy_ports)
57+
|> Enum.map(&(&1 + 100))
58+
59+
{:supavisor, :transaction_proxy_ports} ->
60+
Application.get_env(:supavisor, :transaction_proxy_ports)
61+
|> Enum.map(&(&1 + 100))
62+
5563
_ ->
5664
val
5765
end

0 commit comments

Comments
 (0)