From f279a6e4cf50669b4e2ecd736562fe25b0e365e2 Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Wed, 30 Apr 2025 15:25:46 +0200 Subject: [PATCH 01/11] Using CRDTs for the processes only with Horde --- lib/lightning/application.ex | 15 +++ lib/lightning/extensions/rate_limiting.ex | 11 +- lib/lightning/rate_limiters.ex | 52 +++++++++ lib/lightning/services/rate_limiter.ex | 2 +- lib/lightning/webhook_rate_limiter.ex | 100 ++++++++++++++++++ .../controllers/webhooks_controller.ex | 92 +++++++++------- mix.exs | 3 +- mix.lock | 2 + test/lightning/webhook_rate_limiter_test.exs | 100 ++++++++++++++++++ test/test_helper.exs | 7 ++ 10 files changed, 334 insertions(+), 50 deletions(-) create mode 100644 lib/lightning/rate_limiters.ex create mode 100644 lib/lightning/webhook_rate_limiter.ex create mode 100644 test/lightning/webhook_rate_limiter_test.exs diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index 7b811ced49..36a15a7bbb 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -107,6 +107,12 @@ defmodule Lightning.Application do [ Lightning.PromEx, {Cluster.Supervisor, [topologies, [name: Lightning.ClusterSupervisor]]}, + {Horde.Registry, + name: Lightning.HordeRegistry, keys: :unique, members: :auto}, + {Horde.DynamicSupervisor, + name: Lightning.DistributedSupervisor, + strategy: :one_for_one, + members: :auto}, {Lightning.Vault, Application.get_env(:lightning, Lightning.Vault, [])}, # Start the Ecto repository Lightning.Repo, @@ -174,6 +180,15 @@ defmodule Lightning.Application do :ok end + def start_phase(:init_rate_limiter, :normal, _args) do + Horde.DynamicSupervisor.start_child( + Lightning.DistributedSupervisor, + Lightning.WebhookRateLimiter + ) + + :ok + end + def oban_opts do opts = Application.get_env(:lightning, Oban) diff --git a/lib/lightning/extensions/rate_limiting.ex b/lib/lightning/extensions/rate_limiting.ex index 9371b79198..aae3a0232d 100644 --- a/lib/lightning/extensions/rate_limiting.ex +++ b/lib/lightning/extensions/rate_limiting.ex @@ -8,16 +8,9 @@ defmodule Lightning.Extensions.RateLimiting do @type message :: Lightning.Extensions.Message.t() defmodule Context do - @moduledoc """ - Which user is making the request for a certain project. - """ + @type t :: %Context{project_id: Ecto.UUID.t()} - @type t :: %Context{ - project_id: Ecto.UUID.t(), - user_id: Ecto.UUID.t() | nil - } - - defstruct [:project_id, :user_id] + defstruct [:project_id] end @callback limit_request( diff --git a/lib/lightning/rate_limiters.ex b/lib/lightning/rate_limiters.ex new file mode 100644 index 0000000000..4db0763cbf --- /dev/null +++ b/lib/lightning/rate_limiters.ex @@ -0,0 +1,52 @@ +defmodule Lightning.RateLimiters do + @moduledoc false + + defmodule Mail do + @moduledoc false + + # WARNING: When changing the algorithm, you must also update the mnesia table name. + # The default is to use __MODULE__, passing `:table` to the `use Hammer` macro + # allows you to specify a custom table name. + use Hammer, + backend: Hammer.Mnesia, + algorithm: :leaky_bucket, + table: :mail_limiter + + @type hit_result :: + {:allow, + %{ + count: non_neg_integer(), + time_scale: non_neg_integer(), + rate_limit: non_neg_integer() + }} + | {:deny, non_neg_integer()} + end + + @spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result() + def hit({:failure_email, workflow_id, user_id}) do + [time_scale: time_scale, rate_limit: rate_limit] = + Application.fetch_env!(:lightning, Lightning.FailureAlerter) + + Mail.hit("#{workflow_id}::#{user_id}", time_scale, rate_limit) + |> case do + {:allow, count} -> + {:allow, %{count: count, time_scale: time_scale, rate_limit: rate_limit}} + + {:deny, count} -> + {:deny, count} + end + end + + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]}, + type: :supervisor + } + end + + def start_link(opts) do + children = [{Mail, opts}] + Supervisor.start_link(children, strategy: :one_for_one) + end +end diff --git a/lib/lightning/services/rate_limiter.ex b/lib/lightning/services/rate_limiter.ex index f069314aee..c11f952e5e 100644 --- a/lib/lightning/services/rate_limiter.ex +++ b/lib/lightning/services/rate_limiter.ex @@ -7,7 +7,7 @@ defmodule Lightning.Services.RateLimiter do import Lightning.Services.AdapterHelper @impl true - def limit_request(conn, context, opts) do + def limit_request(conn, context, opts \\ []) do adapter().limit_request(conn, context, opts) end diff --git a/lib/lightning/webhook_rate_limiter.ex b/lib/lightning/webhook_rate_limiter.ex new file mode 100644 index 0000000000..294ee3942d --- /dev/null +++ b/lib/lightning/webhook_rate_limiter.ex @@ -0,0 +1,100 @@ +defmodule Lightning.WebhookRateLimiter do + @moduledoc false + use GenServer + + @capacity 10 + @refill_per_sec 2 + + require Logger + + def child_spec(opts) do + {id, name} = + if name = Keyword.get(opts, :name) do + {"#{__MODULE__}_#{name}", name} + else + {__MODULE__, __MODULE__} + end + + %{ + id: id, + start: {__MODULE__, :start_link, [name]}, + shutdown: 10_000, + restart: :transient + } + end + + def start_link(name) do + with {:error, {:already_started, pid}} <- + GenServer.start_link(__MODULE__, [], name: via_tuple(name)) do + Logger.info("already started at #{inspect(pid)}, returning :ignore") + :ignore + end + end + + @impl true + def init([]) do + Process.flag(:trap_exit, true) + + {:ok, %{table: :ets.new(:table, [:set])}} + end + + def check_rate(bucket, cost \\ 1, name \\ __MODULE__) do + name + |> via_tuple() + |> GenServer.call({:check_rate, bucket, cost}) + end + + def inspect_table(name \\ __MODULE__) do + name + |> via_tuple() + |> GenServer.call(:inspect_table) + end + + @impl true + def handle_call({:check_rate, bucket, cost}, _from, %{table: table} = state) do + {:reply, do_check_rate(table, bucket, cost), state} + end + + @impl true + def handle_call(:inspect_table, _from, %{table: table} = state) do + {:reply, :ets.info(table), state} + end + + @impl true + def handle_info( + {:EXIT, _from, {:name_conflict, {_key, _value}, registry, pid}}, + state + ) do + Logger.info( + "Stopping #{inspect({registry, pid})} as it has already started in another node." + ) + + {:stop, :normal, state} + end + + def do_check_rate(table, bucket, cost) do + now = System.monotonic_time(:millisecond) + + :ets.insert_new(table, {bucket, {@capacity, now}}) + [{^bucket, {level, updated}}] = :ets.lookup(table, bucket) + + refilled = div(now - updated, 1_000) * @refill_per_sec + current = min(@capacity, level + refilled) + + if current >= cost do + level = current - cost + :ets.insert(table, {bucket, {level, now}}) + + {:allow, level} + else + # can retry after 1 second + {:deny, 1} + end + end + + def capacity, do: @capacity + def refill_per_second, do: @refill_per_sec + + def via_tuple(name), + do: {:via, Horde.Registry, {Lightning.HordeRegistry, name}} +end diff --git a/lib/lightning_web/controllers/webhooks_controller.ex b/lib/lightning_web/controllers/webhooks_controller.ex index 8839ada8d5..c5967610d2 100644 --- a/lib/lightning_web/controllers/webhooks_controller.ex +++ b/lib/lightning_web/controllers/webhooks_controller.ex @@ -1,7 +1,6 @@ defmodule LightningWeb.WebhooksController do use LightningWeb, :controller - alias Lightning.Extensions.RateLimiting alias Lightning.Extensions.UsageLimiting.Action alias Lightning.Extensions.UsageLimiting.Context alias Lightning.Services.RateLimiter @@ -10,6 +9,7 @@ defmodule LightningWeb.WebhooksController do alias Lightning.WorkOrders plug :reject_unfetched when action in [:create] + plug :check_rate when action in [:create] # Reject requests with unfetched body params, as they are not supported # See Plug.Parsers in Endpoint for more information. @@ -27,7 +27,38 @@ defmodule LightningWeb.WebhooksController do end end - @spec create(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t() + # Note: Plug.Parsers is called before the Router. + def check_rate(conn, _opts) do + with %Workflows.Trigger{enabled: true, workflow: %{project_id: project_id}} <- + conn.assigns.trigger, + :ok <- RateLimiter.limit_request(conn, %Context{project_id: project_id}) do + conn + else + %Workflows.Trigger{enabled: false} -> + conn + |> put_status(:forbidden) + |> json(%{ + message: + "Unable to process request, trigger is disabled. Enable it on OpenFn to allow requests to this endpoint." + }) + |> halt() + + {:error, :too_many_requests, %{text: message}} -> + conn + |> put_status(:too_many_requests) + |> put_resp_header("retry-after", "1") + |> json(%{"error" => message}) + |> halt() + + _no_trigger_or_workflow -> + conn + |> put_status(:not_found) + |> json(%{"error" => "Webhook not found"}) + |> halt() + end + end + + @spec check(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t() def check(conn, _params) do put_status(conn, :ok) |> json(%{ @@ -37,30 +68,25 @@ defmodule LightningWeb.WebhooksController do end @spec create(Plug.Conn.t(), %{path: binary()}) :: Plug.Conn.t() - def create(conn, _params) do - with %Workflows.Trigger{enabled: true, workflow: %{project_id: project_id}} = - trigger <- conn.assigns.trigger, - {:ok, without_run?} <- check_skip_run_creation(project_id), - :ok <- - RateLimiter.limit_request( - conn, - %RateLimiting.Context{project_id: project_id}, - [] - ) do - {:ok, work_order} = - WorkOrders.create_for(trigger, - workflow: trigger.workflow, - dataclip: %{ - body: conn.body_params, - request: build_request(conn), - type: :http_request, - project_id: project_id - }, - without_run: without_run? - ) - - conn |> json(%{work_order_id: work_order.id}) - else + def create(%{assigns: %{trigger: trigger}} = conn, _params) do + %Workflows.Trigger{workflow: workflow} = trigger + + case check_skip_run_creation(workflow.project_id) do + {:ok, without_run?} -> + {:ok, work_order} = + WorkOrders.create_for(trigger, + workflow: workflow, + dataclip: %{ + body: conn.body_params, + request: build_request(conn), + type: :http_request, + project_id: workflow.project_id + }, + without_run: without_run? + ) + + json(conn, %{work_order_id: work_order.id}) + {:error, reason, %{text: message}} -> status = if reason == :too_many_requests, @@ -69,19 +95,7 @@ defmodule LightningWeb.WebhooksController do conn |> put_status(status) - |> json(%{"error" => message}) - - nil -> - conn - |> put_status(:not_found) - |> json(%{"error" => "Webhook not found"}) - - _disabled -> - put_status(conn, :forbidden) - |> json(%{ - message: - "Unable to process request, trigger is disabled. Enable it on OpenFn to allow requests to this endpoint." - }) + |> json(%{error: message}) end end diff --git a/mix.exs b/mix.exs index 9572f2d0ec..4cd63b75d1 100644 --- a/mix.exs +++ b/mix.exs @@ -56,7 +56,7 @@ defmodule Lightning.MixProject do [ mod: {Lightning.Application, [:timex]}, extra_applications: [:logger, :runtime_tools, :os_mon, :scrivener], - start_phases: [seed_prom_ex_telemetry: []] + start_phases: [seed_prom_ex_telemetry: [], init_rate_limiter: []] ] end @@ -70,6 +70,7 @@ defmodule Lightning.MixProject do defp deps do [ # {:rexbug, ">= 1.0.0", only: :test}, + {:horde, "~> 0.9.0"}, {:bcrypt_elixir, "~> 3.2"}, {:bodyguard, "~> 2.2"}, {:broadway_kafka, "~> 0.4.2"}, diff --git a/mix.lock b/mix.lock index adbca248fa..f847d3ed37 100644 --- a/mix.lock +++ b/mix.lock @@ -60,6 +60,7 @@ "hammer": {:hex, :hammer, "6.2.1", "5ae9c33e3dceaeb42de0db46bf505bd9c35f259c8defb03390cd7556fea67ee2", [:mix], [{:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}], "hexpm", "b9476d0c13883d2dc0cc72e786bac6ac28911fba7cc2e04b70ce6a6d9c4b2bdc"}, "hammer_backend_mnesia": {:hex, :hammer_backend_mnesia, "0.6.1", "d10d94fc29cbffbf04ecb3c3127d705ce4cc1cecfb9f3d6b18a554c3cae9af2c", [:mix], [{:hammer, "~> 6.1", [hex: :hammer, repo: "hexpm", optional: false]}], "hexpm", "85ad2ef6ebe035207dd9a03a116dc6a7ee43fbd53e8154cf32a1e33b9200fb62"}, "heroicons": {:hex, :heroicons, "0.5.6", "95d730e7179c633df32d95c1fdaaecdf81b0da11010b89b737b843ac176a7eb5", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:phoenix_live_view, ">= 0.18.2", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}], "hexpm", "ca267f02a5fa695a4178a737b649fb6644a2e399639d4ba7964c18e8a58c2352"}, + "horde": {:hex, :horde, "0.9.0", "522342bd7149aeed453c97692a8bca9cf7c9368c5a489afd802e575dc8df54a6", [:mix], [{:delta_crdt, "~> 0.6.2", [hex: :delta_crdt, repo: "hexpm", optional: false]}, {:libring, "~> 1.4", [hex: :libring, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 0.5.0 or ~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "fae11e5bc9c980038607d0c3338cdf7f97124a5d5382fd4b6fb6beaab8e214fe"}, "hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"}, "httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, @@ -71,6 +72,7 @@ "junit_formatter": {:hex, :junit_formatter, "3.4.0", "d0e8db6c34dab6d3c4154c3b46b21540db1109ae709d6cf99ba7e7a2ce4b1ac2", [:mix], [], "hexpm", "bb36e2ae83f1ced6ab931c4ce51dd3dbef1ef61bb4932412e173b0cfa259dacd"}, "kafka_protocol": {:hex, :kafka_protocol, "4.1.9", "7c10d9adaba84c6f176f152e6ba8029c46dfb7cb12432587009128836cf9a44a", [:rebar3], [{:crc32cer, "0.1.11", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "14f89eed8329ff4c7b5448e318ee20a98bf5c1e5dc41b74b8af459dfb7590cef"}, "libcluster": {:hex, :libcluster, "3.4.1", "271d2da892763bbef53c2872036c936fe8b80111eb1feefb2d30a3bb15c9b4f6", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.3", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1d568157f069c6afa70ec0d736704cf799734bdbb6343f0322af4a980301c853"}, + "libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, "makeup_eex": {:hex, :makeup_eex, "0.1.2", "93a5ef3d28ed753215dba2d59cb40408b37cccb4a8205e53ef9b5319a992b700", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.16 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_html, "~> 0.1.0 or ~> 1.0", [hex: :makeup_html, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "6140eafb28215ad7182282fd21d9aa6dcffbfbe0eb876283bc6b768a6c57b0c3"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, diff --git a/test/lightning/webhook_rate_limiter_test.exs b/test/lightning/webhook_rate_limiter_test.exs new file mode 100644 index 0000000000..bc38c82054 --- /dev/null +++ b/test/lightning/webhook_rate_limiter_test.exs @@ -0,0 +1,100 @@ +defmodule Lightning.WebhookRateLimiterTest do + @moduledoc false + use ExUnit.Case + + alias Lightning.WebhookRateLimiter + + @default_capacity 10 + + describe "check_rate/2" do + test "allows up to the capacity and refills on multiple buckets" do + initial_capacity = @default_capacity + bucket1 = "project#{System.unique_integer()}" + bucket2 = "project#{System.unique_integer()}" + + Enum.each(1..initial_capacity, fn i -> + level = initial_capacity - i + assert match?({:allow, ^level}, WebhookRateLimiter.check_rate(bucket1)) + assert match?({:allow, ^level}, WebhookRateLimiter.check_rate(bucket2)) + end) + end + + test "denies after consuming the bucket" do + initial_capacity = @default_capacity + bucket1 = "project#{System.unique_integer()}" + bucket2 = "project#{System.unique_integer()}" + + Enum.each(1..initial_capacity, fn i -> + assert {:allow, level} = WebhookRateLimiter.check_rate(bucket1) + assert level == initial_capacity - i + end) + + assert {:allow, level} = WebhookRateLimiter.check_rate(bucket2) + assert level == initial_capacity - 1 + + assert {:deny, wait_ms} = WebhookRateLimiter.check_rate(bucket1) + assert 0 < wait_ms and wait_ms < 1_000 + end + + # Synthetic cluster not working. + # For testing use manual procedure: + # 0. Disable Endpoint server + # 1. Run node1 on one terminal: iex --sname node1@localhost --cookie hordecookie -S mix phx.server + # 2. Run node2 on another terminal: iex --sname node2@localhost --cookie hordecookie -S mix phx.server + # 3. Call Lightning.WebhookRateLimiter.inspect_table() on both iex and they show the same ets table process and node. + @tag skip: true + test "consumes the bucket remotely" do + {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost") + + :rpc.call(node2, Application, :ensure_all_started, [:mix]) + :rpc.call(node2, Application, :ensure_all_started, [:lightning]) + + # Copy current code paths to the peer node + :rpc.call(node2, :code, :add_paths, [:code.get_path()]) + + assert [ + {Lightning.DistributedSupervisor, :node1@localhost}, + {Lightning.DistributedSupervisor, :node2@localhost} + ] = Horde.Cluster.members(Lightning.DistributedSupervisor) + + # initial_capacity = @default_capacity + bucket = "project#{System.unique_integer()}" + + dbg(WebhookRateLimiter.check_rate(bucket)) + + # dbg :rpc.block_call(node1, WebhookRateLimiter, :inspect, [WebhookRateLimiter]) + # dbg :rpc.block_call(node2, WebhookRateLimiter, :inspect, [WebhookRateLimiter]) + + # Enum.each(1..initial_capacity-1, fn i -> + # assert {:allow, level} = :rpc.call(node2, WebhookRateLimiter, :check_rate, [bucket, 1]) + # assert level == initial_capacity - i - 1 + # end) + + # assert {:deny, wait_ms} = WebhookRateLimiter.check_rate(bucket) + # assert 0 < wait_ms and wait_ms < 1_000 + + :peer.stop(peer) + end + end + + defp start_nodes(node1, node2, host) do + # Start the main node + node1_sname = :"#{node1}@#{host}" + {:ok, _pid} = Node.start(node1_sname, :shortnames) + true = Node.set_cookie(:delicious_cookie) + cookie = Node.get_cookie() |> to_charlist() + + # Start the peer node + {:ok, peer, node2_sname} = + :peer.start(%{ + name: node2, + host: host, + cookie: cookie, + args: [~c"-setcookie", cookie] + }) + + assert node2_sname in Node.list() + + {:ok, peer, node1_sname, node2_sname} + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index cc0209588f..9a710135a2 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -54,5 +54,12 @@ Application.put_env(:lightning, Lightning.Extensions, external_metrics: Lightning.Extensions.ExternalMetrics ) +epmd_path = System.find_executable("epmd") +port = Port.open({:spawn_executable, epmd_path}, []) +os_pid = Keyword.get(Port.info(port), :os_pid) + +# Configuring a "shutdown hook" to stop epmd after everything is done. +System.at_exit(fn _ -> System.shell("kill -TERM #{os_pid}") end) + ExUnit.start() Ecto.Adapters.SQL.Sandbox.mode(Lightning.Repo, :manual) From df8b6933718528a887822241ae88e5ba94cf6555 Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Thu, 1 May 2025 22:29:09 +0200 Subject: [PATCH 02/11] Move capacity and refill to config and allow calling with opts --- .iex.exs | 5 ++- config/config.exs | 5 +++ config/test.exs | 2 ++ lib/lightning/application.ex | 15 +++++--- lib/lightning/webhook_rate_limiter.ex | 50 ++++++++++++++++----------- 5 files changed, 51 insertions(+), 26 deletions(-) diff --git a/.iex.exs b/.iex.exs index 4289c885a9..a26dcdf6f4 100644 --- a/.iex.exs +++ b/.iex.exs @@ -1,2 +1,5 @@ -import Ecto.Query +if Code.loaded?(Ecto.Query) do + import Ecto.Query +end + alias Lightning.Repo diff --git a/config/config.exs b/config/config.exs index 896270e90c..ab6fb0f3a0 100644 --- a/config/config.exs +++ b/config/config.exs @@ -30,6 +30,11 @@ config :lightning, LightningWeb.Endpoint, pubsub_server: Lightning.PubSub, live_view: [signing_salt: "EfrmuOUr"] +config :lightning, Lightning.WebhookRateLimiter, + start: false, + capacity: 10, + refill_per_second: 2 + config :lightning, Lightning.Extensions, rate_limiter: Lightning.Extensions.RateLimiter, usage_limiter: Lightning.Extensions.UsageLimiter, diff --git a/config/test.exs b/config/test.exs index 11e0893b9c..73977bb4a0 100644 --- a/config/test.exs +++ b/config/test.exs @@ -53,6 +53,8 @@ config :lightning, LightningWeb.Endpoint, "/8zedVJLxvmGGFoRExE3e870g7CGZZQ1Vq11A5MbQGPKOpK57MahVsPW6Wkkv61n", server: true +config :lightning, Lightning.WebhookRateLimiter, start: true + config :lightning, Lightning.Runtime.RuntimeManager, ws_url: "ws://localhost:4002/worker" diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index 36a15a7bbb..fcd9636e85 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -7,6 +7,11 @@ defmodule Lightning.Application do require Logger + @rate_limiter_opts Application.compile_env!( + :lightning, + Lightning.WebhookRateLimiter + ) + @impl true def start(_type, _args) do # Initialize ETS table for adapter lookup @@ -181,10 +186,12 @@ defmodule Lightning.Application do end def start_phase(:init_rate_limiter, :normal, _args) do - Horde.DynamicSupervisor.start_child( - Lightning.DistributedSupervisor, - Lightning.WebhookRateLimiter - ) + if @rate_limiter_opts[:start] do + Horde.DynamicSupervisor.start_child( + Lightning.DistributedSupervisor, + {Lightning.WebhookRateLimiter, @rate_limiter_opts} + ) + end :ok end diff --git a/lib/lightning/webhook_rate_limiter.ex b/lib/lightning/webhook_rate_limiter.ex index 294ee3942d..9a506bb629 100644 --- a/lib/lightning/webhook_rate_limiter.ex +++ b/lib/lightning/webhook_rate_limiter.ex @@ -2,9 +2,6 @@ defmodule Lightning.WebhookRateLimiter do @moduledoc false use GenServer - @capacity 10 - @refill_per_sec 2 - require Logger def child_spec(opts) do @@ -17,31 +14,43 @@ defmodule Lightning.WebhookRateLimiter do %{ id: id, - start: {__MODULE__, :start_link, [name]}, + start: {__MODULE__, :start_link, [Keyword.put(opts, :name, name)]}, shutdown: 10_000, restart: :transient } end - def start_link(name) do + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + with {:error, {:already_started, pid}} <- - GenServer.start_link(__MODULE__, [], name: via_tuple(name)) do + GenServer.start_link(__MODULE__, opts, name: via_tuple(name)) do Logger.info("already started at #{inspect(pid)}, returning :ignore") :ignore end end @impl true - def init([]) do + def init(opts) do Process.flag(:trap_exit, true) - {:ok, %{table: :ets.new(:table, [:set])}} + capacity = Keyword.fetch!(opts, :capacity) + refill = Keyword.fetch!(opts, :refill_per_second) + + {:ok, + %{ + table: :ets.new(:table, [:set]), + capacity: capacity, + refill_per_second: refill + }} end - def check_rate(bucket, cost \\ 1, name \\ __MODULE__) do + def check_rate(bucket, opts \\ []) do + name = Keyword.get(opts, :name, __MODULE__) + name |> via_tuple() - |> GenServer.call({:check_rate, bucket, cost}) + |> GenServer.call({:check_rate, bucket, opts}) end def inspect_table(name \\ __MODULE__) do @@ -51,8 +60,8 @@ defmodule Lightning.WebhookRateLimiter do end @impl true - def handle_call({:check_rate, bucket, cost}, _from, %{table: table} = state) do - {:reply, do_check_rate(table, bucket, cost), state} + def handle_call({:check_rate, bucket, opts}, _from, state) do + {:reply, do_check_rate(state, bucket, opts), state} end @impl true @@ -72,17 +81,19 @@ defmodule Lightning.WebhookRateLimiter do {:stop, :normal, state} end - def do_check_rate(table, bucket, cost) do + def do_check_rate(%{table: table} = config, bucket, opts) do now = System.monotonic_time(:millisecond) + capacity = opts[:capacity] || config[:capacity] + refill_per_sec = opts[:refill_per_second] || config[:refill_per_second] - :ets.insert_new(table, {bucket, {@capacity, now}}) + :ets.insert_new(table, {bucket, {capacity, now}}) [{^bucket, {level, updated}}] = :ets.lookup(table, bucket) - refilled = div(now - updated, 1_000) * @refill_per_sec - current = min(@capacity, level + refilled) + refilled = div(now - updated, 1_000) * refill_per_sec + current = min(capacity, level + refilled) - if current >= cost do - level = current - cost + if current >= 1 do + level = current - 1 :ets.insert(table, {bucket, {level, now}}) {:allow, level} @@ -92,9 +103,6 @@ defmodule Lightning.WebhookRateLimiter do end end - def capacity, do: @capacity - def refill_per_second, do: @refill_per_sec - def via_tuple(name), do: {:via, Horde.Registry, {Lightning.HordeRegistry, name}} end From c9e2c8fb3a447862f0978bf1d5d4a5e31651e3d5 Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Fri, 2 May 2025 09:47:53 +0200 Subject: [PATCH 03/11] Remove the replicated and rename the webhook to generic term --- config/config.exs | 2 +- config/test.exs | 2 +- lib/lightning/application.ex | 4 +-- ...limiter.ex => distributed_rate_limiter.ex} | 6 ++-- lib/lightning/extensions/rate_limiting.ex | 3 ++ ....exs => distributed_rate_limiter_test.exs} | 36 +++++++++++-------- .../extensions/rate_limiter_test.exs | 5 +-- test/lightning/rate_limiters_test.exs | 13 +++++++ 8 files changed, 46 insertions(+), 25 deletions(-) rename lib/lightning/{webhook_rate_limiter.ex => distributed_rate_limiter.ex} (95%) rename test/lightning/{webhook_rate_limiter_test.exs => distributed_rate_limiter_test.exs} (68%) create mode 100644 test/lightning/rate_limiters_test.exs diff --git a/config/config.exs b/config/config.exs index ab6fb0f3a0..20748e73d6 100644 --- a/config/config.exs +++ b/config/config.exs @@ -30,7 +30,7 @@ config :lightning, LightningWeb.Endpoint, pubsub_server: Lightning.PubSub, live_view: [signing_salt: "EfrmuOUr"] -config :lightning, Lightning.WebhookRateLimiter, +config :lightning, Lightning.DistributedRateLimiter, start: false, capacity: 10, refill_per_second: 2 diff --git a/config/test.exs b/config/test.exs index 73977bb4a0..5dc4c563a0 100644 --- a/config/test.exs +++ b/config/test.exs @@ -53,7 +53,7 @@ config :lightning, LightningWeb.Endpoint, "/8zedVJLxvmGGFoRExE3e870g7CGZZQ1Vq11A5MbQGPKOpK57MahVsPW6Wkkv61n", server: true -config :lightning, Lightning.WebhookRateLimiter, start: true +config :lightning, Lightning.DistributedRateLimiter, start: true config :lightning, Lightning.Runtime.RuntimeManager, ws_url: "ws://localhost:4002/worker" diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index fcd9636e85..07d7c318e7 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -9,7 +9,7 @@ defmodule Lightning.Application do @rate_limiter_opts Application.compile_env!( :lightning, - Lightning.WebhookRateLimiter + Lightning.DistributedRateLimiter ) @impl true @@ -189,7 +189,7 @@ defmodule Lightning.Application do if @rate_limiter_opts[:start] do Horde.DynamicSupervisor.start_child( Lightning.DistributedSupervisor, - {Lightning.WebhookRateLimiter, @rate_limiter_opts} + {Lightning.DistributedRateLimiter, @rate_limiter_opts} ) end diff --git a/lib/lightning/webhook_rate_limiter.ex b/lib/lightning/distributed_rate_limiter.ex similarity index 95% rename from lib/lightning/webhook_rate_limiter.ex rename to lib/lightning/distributed_rate_limiter.ex index 9a506bb629..c981872123 100644 --- a/lib/lightning/webhook_rate_limiter.ex +++ b/lib/lightning/distributed_rate_limiter.ex @@ -1,4 +1,4 @@ -defmodule Lightning.WebhookRateLimiter do +defmodule Lightning.DistributedRateLimiter do @moduledoc false use GenServer @@ -98,8 +98,8 @@ defmodule Lightning.WebhookRateLimiter do {:allow, level} else - # can retry after 1 second - {:deny, 1} + wait_ms = 1_000 - (now - updated) + {:deny, wait_ms} end end diff --git a/lib/lightning/extensions/rate_limiting.ex b/lib/lightning/extensions/rate_limiting.ex index aae3a0232d..4f8c891d1e 100644 --- a/lib/lightning/extensions/rate_limiting.ex +++ b/lib/lightning/extensions/rate_limiting.ex @@ -8,6 +8,9 @@ defmodule Lightning.Extensions.RateLimiting do @type message :: Lightning.Extensions.Message.t() defmodule Context do + @moduledoc """ + Context for the object (bucket) under rate limiting. + """ @type t :: %Context{project_id: Ecto.UUID.t()} defstruct [:project_id] diff --git a/test/lightning/webhook_rate_limiter_test.exs b/test/lightning/distributed_rate_limiter_test.exs similarity index 68% rename from test/lightning/webhook_rate_limiter_test.exs rename to test/lightning/distributed_rate_limiter_test.exs index bc38c82054..78a6a1800a 100644 --- a/test/lightning/webhook_rate_limiter_test.exs +++ b/test/lightning/distributed_rate_limiter_test.exs @@ -1,8 +1,8 @@ -defmodule Lightning.WebhookRateLimiterTest do +defmodule Lightning.DistributedRateLimiterTest do @moduledoc false use ExUnit.Case - alias Lightning.WebhookRateLimiter + alias Lightning.DistributedRateLimiter @default_capacity 10 @@ -14,8 +14,16 @@ defmodule Lightning.WebhookRateLimiterTest do Enum.each(1..initial_capacity, fn i -> level = initial_capacity - i - assert match?({:allow, ^level}, WebhookRateLimiter.check_rate(bucket1)) - assert match?({:allow, ^level}, WebhookRateLimiter.check_rate(bucket2)) + + assert match?( + {:allow, ^level}, + DistributedRateLimiter.check_rate(bucket1) + ) + + assert match?( + {:allow, ^level}, + DistributedRateLimiter.check_rate(bucket2) + ) end) end @@ -25,15 +33,15 @@ defmodule Lightning.WebhookRateLimiterTest do bucket2 = "project#{System.unique_integer()}" Enum.each(1..initial_capacity, fn i -> - assert {:allow, level} = WebhookRateLimiter.check_rate(bucket1) + assert {:allow, level} = DistributedRateLimiter.check_rate(bucket1) assert level == initial_capacity - i end) - assert {:allow, level} = WebhookRateLimiter.check_rate(bucket2) + assert {:allow, level} = DistributedRateLimiter.check_rate(bucket2) assert level == initial_capacity - 1 - assert {:deny, wait_ms} = WebhookRateLimiter.check_rate(bucket1) - assert 0 < wait_ms and wait_ms < 1_000 + assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket1) |> dbg + assert 500 < wait_ms and wait_ms <= 1_000 end # Synthetic cluster not working. @@ -41,7 +49,7 @@ defmodule Lightning.WebhookRateLimiterTest do # 0. Disable Endpoint server # 1. Run node1 on one terminal: iex --sname node1@localhost --cookie hordecookie -S mix phx.server # 2. Run node2 on another terminal: iex --sname node2@localhost --cookie hordecookie -S mix phx.server - # 3. Call Lightning.WebhookRateLimiter.inspect_table() on both iex and they show the same ets table process and node. + # 3. Call Lightning.DistributedRateLimiter.inspect_table() on both iex and they show the same ets table process and node. @tag skip: true test "consumes the bucket remotely" do {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost") @@ -60,17 +68,17 @@ defmodule Lightning.WebhookRateLimiterTest do # initial_capacity = @default_capacity bucket = "project#{System.unique_integer()}" - dbg(WebhookRateLimiter.check_rate(bucket)) + dbg(DistributedRateLimiter.check_rate(bucket)) - # dbg :rpc.block_call(node1, WebhookRateLimiter, :inspect, [WebhookRateLimiter]) - # dbg :rpc.block_call(node2, WebhookRateLimiter, :inspect, [WebhookRateLimiter]) + # dbg :rpc.block_call(node1, DistributedRateLimiter, :inspect, [DistributedRateLimiter]) + # dbg :rpc.block_call(node2, DistributedRateLimiter, :inspect, [DistributedRateLimiter]) # Enum.each(1..initial_capacity-1, fn i -> - # assert {:allow, level} = :rpc.call(node2, WebhookRateLimiter, :check_rate, [bucket, 1]) + # assert {:allow, level} = :rpc.call(node2, DistributedRateLimiter, :check_rate, [bucket, 1]) # assert level == initial_capacity - i - 1 # end) - # assert {:deny, wait_ms} = WebhookRateLimiter.check_rate(bucket) + # assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket) # assert 0 < wait_ms and wait_ms < 1_000 :peer.stop(peer) diff --git a/test/lightning/extensions/rate_limiter_test.exs b/test/lightning/extensions/rate_limiter_test.exs index e32a9f9ac1..af71e33c76 100644 --- a/test/lightning/extensions/rate_limiter_test.exs +++ b/test/lightning/extensions/rate_limiter_test.exs @@ -8,10 +8,7 @@ defmodule Lightning.Extensions.RateLimiterTest do Enum.each(1..100, fn _i -> assert RateLimiter.limit_request( conn, - %Context{ - project_id: Ecto.UUID.generate(), - user_id: Ecto.UUID.generate() - }, + %Context{project_id: Ecto.UUID.generate()}, [] ) == :ok end) diff --git a/test/lightning/rate_limiters_test.exs b/test/lightning/rate_limiters_test.exs new file mode 100644 index 0000000000..47998f50a8 --- /dev/null +++ b/test/lightning/rate_limiters_test.exs @@ -0,0 +1,13 @@ +defmodule Lightning.RateLimitersTest do + use ExUnit.Case, async: true + + alias Lightning.RateLimiters + + describe "Mail" do + test "returns a hit result" do + id = Ecto.UUID.generate() + assert RateLimiters.Mail.hit(id, 1, 1) == {:allow, 1} + assert RateLimiters.Mail.hit(id, 1, 1) == {:deny, 1000} + end + end +end From 024ad8c9f2d3de508d537b5acceb30a5fb37357d Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Fri, 2 May 2025 11:51:16 +0200 Subject: [PATCH 04/11] Fix test case and check for a single process/worker --- mix.exs | 2 +- mix.lock | 2 ++ .../distributed_rate_limiter_test.exs | 27 +++++-------------- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/mix.exs b/mix.exs index 4cd63b75d1..7bad42888e 100644 --- a/mix.exs +++ b/mix.exs @@ -134,7 +134,7 @@ defmodule Lightning.MixProject do {:timex, "~> 3.7"}, {:replug, "~> 0.1.0"}, {:phoenix_swoosh, "~> 1.2.1"}, - {:hammer_backend_mnesia, "~> 0.6"}, + {:hammer_backend_mnesia, "~> 0.6.0"}, {:hammer, "~> 6.0"}, {:dotenvy, "~> 0.8.0"}, {:goth, "~> 1.3"}, diff --git a/mix.lock b/mix.lock index f847d3ed37..a21f77f289 100644 --- a/mix.lock +++ b/mix.lock @@ -25,6 +25,7 @@ "db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"}, "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, + "delta_crdt": {:hex, :delta_crdt, "0.6.5", "c7bb8c2c7e60f59e46557ab4e0224f67ba22f04c02826e273738f3dcc4767adc", [:mix], [{:merkle_map, "~> 0.2.0", [hex: :merkle_map, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c6ae23a525d30f96494186dd11bf19ed9ae21d9fe2c1f1b217d492a7cc7294ae"}, "dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"}, "dotenvy": {:hex, :dotenvy, "0.8.0", "777486ad485668317c56afc53a7cbcd74f43e4e34588ba8e95a73e15a360050e", [:mix], [], "hexpm", "1f535066282388cbd109743d337ac46ff0708195780d4b5778bb83491ab1b654"}, "earmark": {:hex, :earmark, "1.4.47", "7e7596b84fe4ebeb8751e14cbaeaf4d7a0237708f2ce43630cfd9065551f94ca", [:mix], [], "hexpm", "3e96bebea2c2d95f3b346a7ff22285bc68a99fbabdad9b655aa9c6be06c698f8"}, @@ -79,6 +80,7 @@ "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "makeup_html": {:hex, :makeup_html, "0.1.1", "c3d4abd39d5f7e925faca72ada6e9cc5c6f5fa7cd5bc0158315832656cf14d7f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "44f2a61bc5243645dd7fafeaa6cc28793cd22f3c76b861e066168f9a5b2c26a4"}, "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, + "merkle_map": {:hex, :merkle_map, "0.2.1", "01a88c87a6b9fb594c67c17ebaf047ee55ffa34e74297aa583ed87148006c4c8", [:mix], [], "hexpm", "fed4d143a5c8166eee4fa2b49564f3c4eace9cb252f0a82c1613bba905b2d04d"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"}, "mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"}, diff --git a/test/lightning/distributed_rate_limiter_test.exs b/test/lightning/distributed_rate_limiter_test.exs index 78a6a1800a..664edeb29f 100644 --- a/test/lightning/distributed_rate_limiter_test.exs +++ b/test/lightning/distributed_rate_limiter_test.exs @@ -40,18 +40,16 @@ defmodule Lightning.DistributedRateLimiterTest do assert {:allow, level} = DistributedRateLimiter.check_rate(bucket2) assert level == initial_capacity - 1 - assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket1) |> dbg + assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket1) assert 500 < wait_ms and wait_ms <= 1_000 end - # Synthetic cluster not working. - # For testing use manual procedure: + # For testing the replication use manual procedure: # 0. Disable Endpoint server # 1. Run node1 on one terminal: iex --sname node1@localhost --cookie hordecookie -S mix phx.server # 2. Run node2 on another terminal: iex --sname node2@localhost --cookie hordecookie -S mix phx.server # 3. Call Lightning.DistributedRateLimiter.inspect_table() on both iex and they show the same ets table process and node. - @tag skip: true - test "consumes the bucket remotely" do + test "works on top of a single worker of a distributed dynamic supervisor" do {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost") :rpc.call(node2, Application, :ensure_all_started, [:mix]) @@ -65,21 +63,10 @@ defmodule Lightning.DistributedRateLimiterTest do {Lightning.DistributedSupervisor, :node2@localhost} ] = Horde.Cluster.members(Lightning.DistributedSupervisor) - # initial_capacity = @default_capacity - bucket = "project#{System.unique_integer()}" - - dbg(DistributedRateLimiter.check_rate(bucket)) - - # dbg :rpc.block_call(node1, DistributedRateLimiter, :inspect, [DistributedRateLimiter]) - # dbg :rpc.block_call(node2, DistributedRateLimiter, :inspect, [DistributedRateLimiter]) - - # Enum.each(1..initial_capacity-1, fn i -> - # assert {:allow, level} = :rpc.call(node2, DistributedRateLimiter, :check_rate, [bucket, 1]) - # assert level == initial_capacity - i - 1 - # end) - - # assert {:deny, wait_ms} = DistributedRateLimiter.check_rate(bucket) - # assert 0 < wait_ms and wait_ms < 1_000 + assert [{:undefined, _pid, :worker, [Lightning.DistributedRateLimiter]}] = + Horde.DynamicSupervisor.which_children( + Lightning.DistributedSupervisor + ) :peer.stop(peer) end From 1bee752b6ae48c553e90045268fb1e8bbefa511f Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Fri, 2 May 2025 16:43:17 +0200 Subject: [PATCH 05/11] Remove unused and unrelated change to Webhook rate limiter --- lib/lightning/rate_limiters.ex | 52 --------------------------- test/lightning/rate_limiters_test.exs | 13 ------- 2 files changed, 65 deletions(-) delete mode 100644 lib/lightning/rate_limiters.ex delete mode 100644 test/lightning/rate_limiters_test.exs diff --git a/lib/lightning/rate_limiters.ex b/lib/lightning/rate_limiters.ex deleted file mode 100644 index 4db0763cbf..0000000000 --- a/lib/lightning/rate_limiters.ex +++ /dev/null @@ -1,52 +0,0 @@ -defmodule Lightning.RateLimiters do - @moduledoc false - - defmodule Mail do - @moduledoc false - - # WARNING: When changing the algorithm, you must also update the mnesia table name. - # The default is to use __MODULE__, passing `:table` to the `use Hammer` macro - # allows you to specify a custom table name. - use Hammer, - backend: Hammer.Mnesia, - algorithm: :leaky_bucket, - table: :mail_limiter - - @type hit_result :: - {:allow, - %{ - count: non_neg_integer(), - time_scale: non_neg_integer(), - rate_limit: non_neg_integer() - }} - | {:deny, non_neg_integer()} - end - - @spec hit({:failure_email, String.t(), String.t()}) :: Mail.hit_result() - def hit({:failure_email, workflow_id, user_id}) do - [time_scale: time_scale, rate_limit: rate_limit] = - Application.fetch_env!(:lightning, Lightning.FailureAlerter) - - Mail.hit("#{workflow_id}::#{user_id}", time_scale, rate_limit) - |> case do - {:allow, count} -> - {:allow, %{count: count, time_scale: time_scale, rate_limit: rate_limit}} - - {:deny, count} -> - {:deny, count} - end - end - - def child_spec(opts) do - %{ - id: __MODULE__, - start: {__MODULE__, :start_link, [opts]}, - type: :supervisor - } - end - - def start_link(opts) do - children = [{Mail, opts}] - Supervisor.start_link(children, strategy: :one_for_one) - end -end diff --git a/test/lightning/rate_limiters_test.exs b/test/lightning/rate_limiters_test.exs deleted file mode 100644 index 47998f50a8..0000000000 --- a/test/lightning/rate_limiters_test.exs +++ /dev/null @@ -1,13 +0,0 @@ -defmodule Lightning.RateLimitersTest do - use ExUnit.Case, async: true - - alias Lightning.RateLimiters - - describe "Mail" do - test "returns a hit result" do - id = Ecto.UUID.generate() - assert RateLimiters.Mail.hit(id, 1, 1) == {:allow, 1} - assert RateLimiters.Mail.hit(id, 1, 1) == {:deny, 1000} - end - end -end From cb615cbf63ef7e17aa7851d4f43170851c8cd5ed Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Fri, 2 May 2025 17:03:00 +0200 Subject: [PATCH 06/11] Tag distributed test as such to avoid flaky on regular ones --- test/lightning/distributed_rate_limiter_test.exs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/lightning/distributed_rate_limiter_test.exs b/test/lightning/distributed_rate_limiter_test.exs index 664edeb29f..563c79b86f 100644 --- a/test/lightning/distributed_rate_limiter_test.exs +++ b/test/lightning/distributed_rate_limiter_test.exs @@ -44,11 +44,12 @@ defmodule Lightning.DistributedRateLimiterTest do assert 500 < wait_ms and wait_ms <= 1_000 end - # For testing the replication use manual procedure: + # For testing the replication use manual procedure or use this test case isolated to avoid interfering the tests above: # 0. Disable Endpoint server # 1. Run node1 on one terminal: iex --sname node1@localhost --cookie hordecookie -S mix phx.server # 2. Run node2 on another terminal: iex --sname node2@localhost --cookie hordecookie -S mix phx.server # 3. Call Lightning.DistributedRateLimiter.inspect_table() on both iex and they show the same ets table process and node. + @tag :dist_integration test "works on top of a single worker of a distributed dynamic supervisor" do {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost") From 66469a25cb51adde29272228643fb463a3074a9d Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Fri, 2 May 2025 17:24:42 +0200 Subject: [PATCH 07/11] Improve coverage --- test/lightning/distributed_rate_limiter_test.exs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/test/lightning/distributed_rate_limiter_test.exs b/test/lightning/distributed_rate_limiter_test.exs index 563c79b86f..0971ba52ba 100644 --- a/test/lightning/distributed_rate_limiter_test.exs +++ b/test/lightning/distributed_rate_limiter_test.exs @@ -6,6 +6,20 @@ defmodule Lightning.DistributedRateLimiterTest do @default_capacity 10 + describe "inspect_table/0" do + test "shows the process info of the ets" do + %{table: table} = + Horde.DynamicSupervisor.which_children(Lightning.DistributedSupervisor) + |> then(fn [{:undefined, pid, :worker, _name}] -> + :sys.get_state(pid) + end) + + ets_info = :ets.info(table) + assert ^ets_info = DistributedRateLimiter.inspect_table() + assert Keyword.has_key?(ets_info, :node) + end + end + describe "check_rate/2" do test "allows up to the capacity and refills on multiple buckets" do initial_capacity = @default_capacity From 83be24a57aeda7b8fff11105a83ddf896a042bd5 Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Fri, 2 May 2025 19:03:24 +0200 Subject: [PATCH 08/11] Changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index de01a454d8..bca3283ea1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ and this project adheres to ### Added +- Support a rate-limiter on the /i endpoint + [#3185](https://github.com/OpenFn/lightning/issues/3185) - Added a custom metric to track projects that could benefit from additional worker pods. [#3189](https://github.com/OpenFn/lightning/issues/3189) From 1271253bc962bb23fe534371377a50ffe1f8d63f Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Tue, 6 May 2025 08:35:04 +0200 Subject: [PATCH 09/11] Adds a script for local dev clusters --- bin/local_cluster | 173 ++++++++++++++++++ config/dev.exs | 2 + lib/lightning/config/bootstrap.ex | 117 +++++++----- .../distributed_rate_limiter_test.exs | 11 +- 4 files changed, 254 insertions(+), 49 deletions(-) create mode 100755 bin/local_cluster diff --git a/bin/local_cluster b/bin/local_cluster new file mode 100755 index 0000000000..3762e3f9ed --- /dev/null +++ b/bin/local_cluster @@ -0,0 +1,173 @@ +#!/usr/bin/env bash + +COOKIE="rate-limiter-secret" + +# Function to show usage +show_usage() { + echo "Usage:" + echo " $0 [--proxy] --count Start local cluster (default: 2 instances)" + echo " $0 connect Connect to a specific node (1-4)" + echo "" + echo "Options:" + echo " --proxy Start a Caddy reverse proxy on port 4000 (nodes will start from 4001)" + echo " --count Number of nodes to start (1-4, default: 2)" + exit 1 +} + +# Handle connect subcommand +if [ "$1" = "connect" ]; then + if [ -z "$2" ] || ! [[ "$2" =~ ^[1-4]$ ]]; then + echo "Error: Please specify a valid node number (1-4)" + show_usage + fi + + NODE_NUM=$2 + echo "Connecting to node${NODE_NUM}@127.0.0.1..." + exec iex --name "remote_shell${NODE_NUM}@127.0.0.1" --cookie "${COOKIE}" --remsh "node${NODE_NUM}@127.0.0.1" + # The exec command replaces the current process, so we don't need an explicit exit + # If we reach this point, it means the exec failed, so we'll exit with its status code + exit $? +fi + +# Parse arguments +USE_PROXY=false +INSTANCES=2 + +while [[ $# -gt 0 ]]; do + case $1 in + --proxy) + USE_PROXY=true + shift + ;; + --count) + if [ -z "$2" ] || ! [[ "$2" =~ ^[0-9]+$ ]]; then + echo "Error: --count requires a numeric argument" + show_usage + fi + INSTANCES=$2 + shift 2 + ;; + *) + echo "Unknown argument: $1" + show_usage + ;; + esac +done + +# Validate number of instances +if ! [[ "$INSTANCES" =~ ^[0-9]+$ ]]; then + echo "Error: Number of instances must be a positive integer" + show_usage +fi + +if [ "$INSTANCES" -lt 1 ] || [ "$INSTANCES" -gt 4 ]; then + echo "Error: Number of instances must be between 1 and 4" + show_usage +fi + +# Check for Caddy if proxy is requested +if [ "$USE_PROXY" = true ]; then + if ! command -v caddy &>/dev/null; then + echo "Error: Caddy is required for proxy mode but it's not installed" + echo "Please install Caddy first:" + echo " Mac: brew install caddy" + echo " Linux: sudo apt install caddy" + echo " Or visit: https://caddyserver.com/docs/install" + exit 1 + fi +fi + +# Array to store background PIDs +declare -a PIDS + +# Colors for different processes +declare -a COLORS=( + "\033[0;36m" # Cyan + "\033[0;32m" # Green + "\033[0;35m" # Purple + "\033[0;33m" # Yellow + "\033[0;37m" # Gray (for proxy) +) +RESET="\033[0m" + +# Cleanup function to kill all child processes +cleanup() { + echo "Shutting down all processes..." + for pid in "${PIDS[@]}"; do + kill "$pid" 2>/dev/null + done + exit 0 +} + +# Set up trap for cleanup +trap cleanup INT TERM + +# Function to run a command with colored output +run_with_color() { + local color=$1 + local prefix=$2 + shift 2 + # Run the command and color its output + "$@" 2>&1 | while read -r line; do + echo -e "${color}${prefix} | ${line}${RESET}" + done +} + +# Create Caddy configuration if proxy is enabled +if [ "$USE_PROXY" = true ]; then + BASE_PORT=4001 + CADDY_CONFIG=$(mktemp) + echo "Creating Caddy configuration..." + cat >"$CADDY_CONFIG" <" + +# Wait for all background processes +wait diff --git a/config/dev.exs b/config/dev.exs index cb37276e1f..9ad4a26f2b 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -48,6 +48,8 @@ config :lightning, Lightning.Vault, config :lightning, Lightning.Runtime.RuntimeManager, start: true +config :lightning, Lightning.DistributedRateLimiter, start: true + config :lightning, :workers, private_key: """ -----BEGIN PRIVATE KEY----- diff --git a/lib/lightning/config/bootstrap.ex b/lib/lightning/config/bootstrap.ex index 0bf1f15013..8da7c67058 100644 --- a/lib/lightning/config/bootstrap.ex +++ b/lib/lightning/config/bootstrap.ex @@ -114,7 +114,8 @@ defmodule Lightning.Config.Bootstrap do "RTM", &Utils.ensure_boolean/1, Utils.get_env([:lightning, Lightning.Runtime.RuntimeManager, :start]) - ) + ), + port: env!("RTM_PORT", :integer, 2222) config :lightning, :workers, private_key: @@ -408,6 +409,10 @@ defmodule Lightning.Config.Bootstrap do config :logger, :level, log_level end + if log_level == :debug do + config :libcluster, debug: true + end + database_url = env!("DATABASE_URL", :string, nil) config :lightning, Lightning.Repo, @@ -418,7 +423,6 @@ defmodule Lightning.Config.Bootstrap do queue_interval: env!("DATABASE_QUEUE_INTERVAL", :integer, 1000) host = env!("URL_HOST", :string, "example.com") - port = env!("PORT", :integer, 4000) url_port = env!("URL_PORT", :integer, 443) config :lightning, @@ -464,18 +468,6 @@ defmodule Lightning.Config.Bootstrap do You can generate one by calling: mix phx.gen.secret """ - listen_address = - env!( - "LISTEN_ADDRESS", - fn address -> - address - |> String.split(".") - |> Enum.map(&String.to_integer/1) - |> List.to_tuple() - end, - {127, 0, 0, 1} - ) - origins = env!( "ORIGINS", @@ -490,40 +482,10 @@ defmodule Lightning.Config.Bootstrap do url_scheme = env!("URL_SCHEME", :string, "https") - idle_timeout = - env!( - "IDLE_TIMEOUT", - fn str -> - case Integer.parse(str) do - :error -> 60_000 - {val, _} -> val * 1_000 - end - end, - 60_000 - ) - config :lightning, LightningWeb.Endpoint, url: [host: host, port: url_port, scheme: url_scheme], secret_key_base: secret_key_base, check_origin: origins, - http: [ - ip: listen_address, - port: port, - compress: true, - protocol_options: [ - # Note that if a request is more than 10x the max dataclip size, we cut - # the connection immediately to prevent memory issues via the - # :max_skip_body_length setting. - max_skip_body_length: - Application.get_env( - :lightning, - :max_dataclip_size_bytes, - 10_000_000 - ) * - 10, - idle_timeout: idle_timeout - ] - ], server: true end @@ -539,6 +501,8 @@ defmodule Lightning.Config.Bootstrap do assert_receive_timeout: env!("ASSERT_RECEIVE_TIMEOUT", :integer, 1000) end + config :lightning, LightningWeb.Endpoint, http: http_config(config_env()) + config :sentry, dsn: env!("SENTRY_DSN", :string, nil), filter: Lightning.SentryEventFilter, @@ -814,4 +778,69 @@ defmodule Lightning.Config.Bootstrap do value -> value end end + + defp http_config(env, opts \\ []) + # Production environment configuration + defp http_config(:prod, opts) do + port = Keyword.get(opts, :port) || env!("PORT", :integer, 4000) + + listen_address = + env!( + "LISTEN_ADDRESS", + fn address -> + address + |> String.split(".") + |> Enum.map(&String.to_integer/1) + |> List.to_tuple() + end, + {127, 0, 0, 1} + ) + + idle_timeout = + env!( + "IDLE_TIMEOUT", + fn str -> + case Integer.parse(str) do + :error -> 60_000 + {val, _} -> val * 1_000 + end + end, + 60_000 + ) + + [ + ip: listen_address, + port: port, + compress: true, + protocol_options: [ + # Note that if a request is more than 10x the max dataclip size, we cut + # the connection immediately to prevent memory issues via the + # :max_skip_body_length setting. + max_skip_body_length: + Application.get_env( + :lightning, + :max_dataclip_size_bytes, + 10_000_000 + ) * 10, + idle_timeout: idle_timeout + ] + ] + end + + # Default configuration for non-production environments + defp http_config(_env, opts) do + port = + Keyword.get(opts, :port) || + env!( + "PORT", + :integer, + get_env(:lightning, [LightningWeb.Endpoint, :http, :port]) + ) + + [ + ip: {0, 0, 0, 0}, + port: port, + compress: true + ] + end end diff --git a/test/lightning/distributed_rate_limiter_test.exs b/test/lightning/distributed_rate_limiter_test.exs index 0971ba52ba..a36defc9d3 100644 --- a/test/lightning/distributed_rate_limiter_test.exs +++ b/test/lightning/distributed_rate_limiter_test.exs @@ -58,11 +58,12 @@ defmodule Lightning.DistributedRateLimiterTest do assert 500 < wait_ms and wait_ms <= 1_000 end - # For testing the replication use manual procedure or use this test case isolated to avoid interfering the tests above: - # 0. Disable Endpoint server - # 1. Run node1 on one terminal: iex --sname node1@localhost --cookie hordecookie -S mix phx.server - # 2. Run node2 on another terminal: iex --sname node2@localhost --cookie hordecookie -S mix phx.server - # 3. Call Lightning.DistributedRateLimiter.inspect_table() on both iex and they show the same ets table process and node. + # Run this distributed integration test case separately to avoid interfering with the tests above. + # For testing the replication use `bin/local_cluster` on the shell: + # 1. In one shell run `./bin/local_cluster` + # 2. On another shell run `./bin/local_cluster connect 2` + # 3. Type `Lightning.DistributedRateLimiter.inspect_table()` to see that the ETS table is distributed + # (on node1 or vice-versa if it was spawned on node2 when you connect to node 1). @tag :dist_integration test "works on top of a single worker of a distributed dynamic supervisor" do {:ok, peer, _node1, node2} = start_nodes(:node1, :node2, ~c"localhost") From b56a424cbb7535f3c731b8262fcbdceede430d04 Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Mon, 26 May 2025 12:09:52 +0200 Subject: [PATCH 10/11] exclude-cluster-integration-test --- bin/ci_tests | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/ci_tests b/bin/ci_tests index dd0efab1d3..1e9f480877 100755 --- a/bin/ci_tests +++ b/bin/ci_tests @@ -26,7 +26,7 @@ set -euo pipefail set +e -mix coveralls.json --export-coverage coverage -o test/reports +mix coveralls.json --exclude dist_integration --export-coverage coverage -o test/reports EXIT_CODE=$? set -e @@ -39,7 +39,7 @@ fi # Only retry if exit code is exactly 2 if [ $EXIT_CODE -eq 2 ]; then set +e - mix coveralls.json --import-cover coverage --failed -o test/reports + mix coveralls.json --exclude dist_integration --import-cover coverage --failed -o test/reports EXIT_CODE=$? # Overwrite with the second run's exit code set -e # Rename the failed test report if it exists From 93a2320bd5a3cde687685e2d30c09a661a3cb4a4 Mon Sep 17 00:00:00 2001 From: Rogerio Pontual Date: Mon, 2 Jun 2025 09:30:17 +0200 Subject: [PATCH 11/11] Fix child spec to restart process automatically --- lib/lightning/distributed_rate_limiter.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/lightning/distributed_rate_limiter.ex b/lib/lightning/distributed_rate_limiter.ex index c981872123..deb7f88736 100644 --- a/lib/lightning/distributed_rate_limiter.ex +++ b/lib/lightning/distributed_rate_limiter.ex @@ -15,8 +15,7 @@ defmodule Lightning.DistributedRateLimiter do %{ id: id, start: {__MODULE__, :start_link, [Keyword.put(opts, :name, name)]}, - shutdown: 10_000, - restart: :transient + shutdown: 10_000 } end