From ecb253e0a87eb7e3f84fd9568ea5782c155d38a4 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Mon, 17 Mar 2025 22:28:25 +0100 Subject: [PATCH 01/19] initial version of config class --- spec/Dockerfile | 3 + spec/amqproxy/config_spec.cr | 71 ++++++++++++++++++++ spec/amqproxy/server_spec.cr | 18 ++--- spec/config.ini | 8 +++ spec/config_empty.ini | 0 src/amqproxy/cli.cr | 5 +- src/amqproxy/config.cr | 126 +++++++++++++++++++++++++++++++++++ 7 files changed, 221 insertions(+), 10 deletions(-) create mode 100644 spec/amqproxy/config_spec.cr create mode 100644 spec/config.ini create mode 100644 spec/config_empty.ini create mode 100644 src/amqproxy/config.cr diff --git a/spec/Dockerfile b/spec/Dockerfile index 6a46850..a17ff95 100644 --- a/spec/Dockerfile +++ b/spec/Dockerfile @@ -11,6 +11,9 @@ RUN shards install COPY src/ src/ COPY spec/ spec/ +COPY spec/config.ini /tmp/config.ini +COPY spec/config_empty.ini /tmp/config_empty.ini + COPY spec/entrypoint.sh /entrypoint.sh RUN chmod +x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr new file mode 100644 index 0000000..8cb1ac1 --- /dev/null +++ b/spec/amqproxy/config_spec.cr @@ -0,0 +1,71 @@ +require "spec" +require "../../src/amqproxy/config" + +describe AMQProxy::Config do + it "loads defaults when no env vars are set" do + previous_argv = ARGV.clone + ARGV.clear + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.listen_address.should eq "127.0.0.1" + config.listen_port.should eq 5673 + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from environment variables" do + previous_argv = ARGV.clone + ARGV.clear + + ENV["LISTEN_ADDRESS"] = "example.com" + ENV["LISTEN_PORT"] = "5674" + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.listen_address.should eq "example.com" + config.listen_port.should eq 5674 + + # Clean up + ENV.delete("LISTEN_ADDRESS") + ENV.delete("LISTEN_PORT") + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from command line arguments and overrules env vars" do + previous_argv = ARGV.clone + ARGV.clear + + ENV["LISTEN_ADDRESS"] = "example_env.com" + ARGV.concat(["--listen=example_arg.com"]) + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.listen_address.should eq "example_arg.com" + + # Clean Up + ENV.delete("LISTEN_ADDRESS") + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from empty config file returning default configuration" do + previous_argv = ARGV.clone + ARGV.clear + + config = AMQProxy::Config.load_with_cli(ARGV, "/tmp/config_empty.ini") + + config.listen_address.should eq "localhost" + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end +end diff --git a/spec/amqproxy/server_spec.cr b/spec/amqproxy/server_spec.cr index 7ffaaac..bd8c4ca 100644 --- a/spec/amqproxy/server_spec.cr +++ b/spec/amqproxy/server_spec.cr @@ -18,7 +18,7 @@ describe AMQProxy::Server do ch.basic_publish_confirm "foobar", "non-existing" end end - sleep 0.1 + sleep 0.1.seconds server.upstream_connections.should eq 1 end end @@ -55,7 +55,7 @@ describe AMQProxy::Server do queue.publish_confirm(message_payload) end end - sleep 0.1 + sleep 0.1.seconds AMQP::Client.start(proxy_url) do |conn| channel = conn.channel @@ -66,7 +66,7 @@ describe AMQProxy::Server do num_received_messages += 1 end end - sleep 0.1 + sleep 0.1.seconds end num_received_messages.should eq num_messages_to_publish @@ -84,7 +84,7 @@ describe AMQProxy::Server do server.client_connections.should eq 1 server.upstream_connections.should eq 2 end - sleep 0.1 + sleep 0.1.seconds server.client_connections.should eq 0 server.upstream_connections.should eq 2 end @@ -103,7 +103,7 @@ describe AMQProxy::Server do server.client_connections.should eq(1) server.upstream_connections.should eq(1) end - sleep 0.1 + sleep 0.1.seconds server.client_connections.should eq(0) server.upstream_connections.should eq(1) end @@ -116,7 +116,7 @@ describe AMQProxy::Server do AMQP::Client.start(proxy_url) do |conn| conn.channel end - sleep 2 + sleep 2.seconds server.client_connections.should eq(0) server.upstream_connections.should eq(1) ensure @@ -137,7 +137,7 @@ describe AMQProxy::Server do 10.times do server.client_connections.should be >= 1 server.upstream_connections.should be >= 1 - sleep 1 + sleep 1.seconds end end wait_for_channel.send(5) # send 5 @@ -149,7 +149,7 @@ describe AMQProxy::Server do AMQP::Client.start(proxy_url) do |conn| conn.channel wait_for_channel.send(2) # send 2 - sleep 2 + sleep 2.seconds end wait_for_channel.send(3) # send 3 end @@ -165,7 +165,7 @@ describe AMQProxy::Server do AMQP::Client.start(proxy_url) do |conn| conn.channel wait_for_channel.send(-1) # send 4 (this should not happen) - sleep 1 + sleep 1.seconds end rescue ex # ex.message.should be "Error reading socket: Connection reset by peer" diff --git a/spec/config.ini b/spec/config.ini new file mode 100644 index 0000000..5332912 --- /dev/null +++ b/spec/config.ini @@ -0,0 +1,8 @@ +[main] +log_level = info +idle_connection_timeout = 5 +upstream = amqp://localhost:5672 + +[listen] +address = 127.0.0.1 +port = 5673 \ No newline at end of file diff --git a/spec/config_empty.ini b/spec/config_empty.ini new file mode 100644 index 0000000..e69de29 diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 18dc78f..a1ebd7b 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -9,6 +9,7 @@ require "log" class AMQProxy::CLI Log = ::Log.for(self) + @config : Config @listen_address = ENV["LISTEN_ADDRESS"]? || "localhost" @listen_port = ENV["LISTEN_PORT"]? || 5673 @http_port = ENV["HTTP_PORT"]? || 15673 @@ -52,6 +53,8 @@ class AMQProxy::CLI def run(argv) raise "run cant be called multiple times" unless @server.nil? + @config = Config.new + p = OptionParser.parse(argv) do |parser| parser.banner = "Usage: amqproxy [options] [amqp upstream url]" parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| @@ -108,7 +111,7 @@ class AMQProxy::CLI # wait until all client connections are closed until server.client_connections.zero? - sleep 0.2 + sleep 0.2.seconds end Log.info { "No clients left. Exiting." } end diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr new file mode 100644 index 0000000..d67eeb8 --- /dev/null +++ b/src/amqproxy/config.cr @@ -0,0 +1,126 @@ +require "ini" +require "log" +require "option_parser" + +module AMQProxy + class Config + # Define instance variables and getters + getter listen_address : String + getter listen_port : Int32 + getter http_port : Int32 + getter log_level : Log::Severity + getter idle_connection_timeout : Int32 + getter term_timeout : Int32 + getter term_client_close_timeout : Int32 + getter upstream : String? + + private def initialize( + listen_address = "localhost", + listen_port = 5673, + http_port = 15673, + log_level = Log::Severity::Info, + idle_connection_timeout = 5, + term_timeout = -1, + term_client_close_timeout = 0, + @upstream = nil + ) + @listen_address = listen_address + @listen_port = listen_port + @http_port = http_port + @log_level = log_level + @idle_connection_timeout = idle_connection_timeout + @term_timeout = term_timeout + @term_client_close_timeout = term_client_close_timeout + end + + def load_from_file(path) # ameba:disable Metrics/CyclomaticComplexity + INI.parse(File.read(path)).each do |name, section| + case name + when "main", "" + section.each do |key, value| + case key + when "upstream" then @upstream = value + when "log_level" then @log_level = ::Log::Severity.parse(value) + when "idle_connection_timeout" then @idle_connection_timeout = value.to_i + when "term_timeout" then @term_timeout = value.to_i + when "term_client_close_timeout" then @term_client_close_timeout = value.to_i + else raise "Unsupported config #{name}/#{key}" + end + end + when "listen" + section.each do |key, value| + case key + when "port" then @listen_port = value.to_i + when "bind", "address" then @listen_address = value + when "log_level" then @log_level = ::Log::Severity.parse(value) + else raise "Unsupported config #{name}/#{key}" + end + end + else raise "Unsupported config section #{name}" + end + end + rescue ex + abort ex.message + end + + # Override config using environment variables + def load_with_env + @listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address + @listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port + @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port + @log_level = Log::Severity.parse(ENV["LOG_LEVEL"]? || @log_level.to_s) + @idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout + @term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout + @term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout + @upstream = ENV["UPSTREAM"]? || @upstream + end + + def load_from_options(args) + p = OptionParser.parse(args) do |parser| + parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| + @listen_address = v + end + parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i } + parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i } + parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v| + @idle_connection_timeout = v.to_i + end + parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| + @term_timeout = v.to_i + end + parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| + @term_client_close_timeout = v.to_i + end + end + end + + # Override config using command-line arguments + def self.load_with_cli(args, path = "config.ini") : Config + config = new + + # First, load config file + config.load_from_file(path) + + # Then, load environment variables + config.load_with_env + + config.load_from_options(args) + + config + end + + # Print configuration for debugging + def to_s + <<-CONFIG + listen_address: #{@listen_address} + listen_port: #{@listen_port} + http_port: #{@http_port} + log_level: #{@log_level} + idle_connection_timeout: #{@idle_connection_timeout} + term_timeout: #{@term_timeout} + term_client_close_timeout: #{@term_client_close_timeout} + upstream: #{@upstream} + CONFIG + end + end +end From a2e55a87eebcd8cc3a88e20601a08056a1d796fa Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Tue, 18 Mar 2025 15:29:23 +0100 Subject: [PATCH 02/19] Set load methods to protected Add --log-level switch Ignore missing ini file Add test cases --- spec/amqproxy/config_spec.cr | 17 ++++++++++++++++- src/amqproxy/cli.cr | 1 + src/amqproxy/config.cr | 29 ++++++++++------------------- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index 8cb1ac1..b2fda07 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -42,11 +42,13 @@ describe AMQProxy::Config do ARGV.clear ENV["LISTEN_ADDRESS"] = "example_env.com" - ARGV.concat(["--listen=example_arg.com"]) + ENV["LOG_LEVEL"] = "Error" + ARGV.concat(["--listen=example_arg.com", "--log-level=Warn"]) config = AMQProxy::Config.load_with_cli(ARGV) config.listen_address.should eq "example_arg.com" + config.log_level.should eq ::Log::Severity::Warn # Clean Up ENV.delete("LISTEN_ADDRESS") @@ -68,4 +70,17 @@ describe AMQProxy::Config do ARGV.clear ARGV.concat(previous_argv) end + + it "reads without error when ini file is missing" do + previous_argv = ARGV.clone + ARGV.clear + + config = AMQProxy::Config.load_with_cli(ARGV, "/tmp/non_existing_file.ini") + + config.listen_address.should eq "localhost" + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end end diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index a1ebd7b..cbd161f 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -71,6 +71,7 @@ class AMQProxy::CLI parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| @term_client_close_timeout = v.to_i end + parser.on("--log-level=LEVEL", "The log level (default: info)") { } parser.on("-d", "--debug", "Verbose logging") { @log_level = ::Log::Severity::Debug } parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index d67eeb8..0644a40 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -33,7 +33,9 @@ module AMQProxy @term_client_close_timeout = term_client_close_timeout end - def load_from_file(path) # ameba:disable Metrics/CyclomaticComplexity + protected def load_from_file(path) # ameba:disable Metrics/CyclomaticComplexity + return unless File.exists?(path) + INI.parse(File.read(path)).each do |name, section| case name when "main", "" @@ -64,7 +66,7 @@ module AMQProxy end # Override config using environment variables - def load_with_env + protected def load_with_env @listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address @listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port @@ -75,14 +77,15 @@ module AMQProxy @upstream = ENV["UPSTREAM"]? || @upstream end - def load_from_options(args) + # override config using command-line arguments + protected def load_from_options(args) p = OptionParser.parse(args) do |parser| parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| @listen_address = v end parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i } parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i } - parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v| + parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| @idle_connection_timeout = v.to_i end parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| @@ -91,36 +94,24 @@ module AMQProxy parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| @term_client_close_timeout = v.to_i end + parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| @log_level = Log::Severity.parse(v) } end end # Override config using command-line arguments def self.load_with_cli(args, path = "config.ini") : Config config = new - + # First, load config file config.load_from_file(path) # Then, load environment variables config.load_with_env + # Finally, load command-line arguments config.load_from_options(args) config end - - # Print configuration for debugging - def to_s - <<-CONFIG - listen_address: #{@listen_address} - listen_port: #{@listen_port} - http_port: #{@http_port} - log_level: #{@log_level} - idle_connection_timeout: #{@idle_connection_timeout} - term_timeout: #{@term_timeout} - term_client_close_timeout: #{@term_client_close_timeout} - upstream: #{@upstream} - CONFIG - end end end From f065f2de20227f7234e9a4b5200aff1cd2a18761 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Wed, 19 Mar 2025 20:23:10 +0100 Subject: [PATCH 03/19] Add record example --- spec/amqproxy/config_record_spec.cr | 136 ++++++++++++++++++++++++++ spec/amqproxy/config_spec.cr | 6 +- spec/config.ini | 15 ++- src/amqproxy/config.cr | 85 ++++++++++------ src/amqproxy/config_record.cr | 145 ++++++++++++++++++++++++++++ 5 files changed, 350 insertions(+), 37 deletions(-) create mode 100644 spec/amqproxy/config_record_spec.cr create mode 100644 src/amqproxy/config_record.cr diff --git a/spec/amqproxy/config_record_spec.cr b/spec/amqproxy/config_record_spec.cr new file mode 100644 index 0000000..0aece77 --- /dev/null +++ b/spec/amqproxy/config_record_spec.cr @@ -0,0 +1,136 @@ +require "spec" +require "../../src/amqproxy/config_record" + +describe AMQProxy::Configuration do + it "loads defaults when no ini file, env vars or options are available" do + previous_argv = ARGV.clone + ARGV.clear + + config = AMQProxy::Configuration.load_with_cli(ARGV, "/tmp/non_existing_file.ini") + + config.listen_address.should eq "localhost" + config.listen_port.should eq 5673 + config.http_port.should eq 15673 + config.log_level.should eq ::Log::Severity::Info + config.idle_connection_timeout.should eq 5 + config.term_timeout.should eq -1 + config.term_client_close_timeout.should eq 0 + config.upstream.should eq nil + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from environment variables and overwrites ini file values" do + previous_argv = ARGV.clone + ARGV.clear + + ENV["LISTEN_ADDRESS"] = "example.com" + ENV["LISTEN_PORT"] = "5674" + ENV["HTTP_PORT"] = "15674" + ENV["LOG_LEVEL"] = "Error" + ENV["IDLE_CONNECTION_TIMEOUT"] = "12" + ENV["TERM_TIMEOUT"] = "13" + ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" + ENV["UPSTREAM"] = "amqp://localhost:5674" + + config = AMQProxy::Configuration.load_with_cli(ARGV) + + config.listen_address.should eq "example.com" + config.listen_port.should eq 5674 + config.http_port.should eq 15674 + config.log_level.should eq ::Log::Severity::Error + config.idle_connection_timeout.should eq 12 + config.term_timeout.should eq 13 + config.term_client_close_timeout.should eq 14 + config.upstream.should eq "amqp://localhost:5674" + + # Clean up + ENV.delete("LISTEN_ADDRESS") + ENV.delete("LISTEN_PORT") + ENV.delete("HTTP_PORT") + ENV.delete("LOG_LEVEL") + ENV.delete("IDLE_CONNECTION_TIMEOUT") + ENV.delete("TERM_TIMEOUT") + ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") + ENV.delete("UPSTREAM") + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from command line arguments and overrules env vars" do + previous_argv = ARGV.clone + ARGV.clear + + ENV["LISTEN_ADDRESS"] = "example.com" + ENV["LISTEN_PORT"] = "5674" + ENV["HTTP_PORT"] = "15674" + ENV["LOG_LEVEL"] = "Error" + ENV["IDLE_CONNECTION_TIMEOUT"] = "12" + ENV["TERM_TIMEOUT"] = "13" + ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" + ENV["UPSTREAM"] = "amqp://localhost:5674" + + ARGV.concat(["--listen=example_arg.com", "--port=5675", "--http-port=15675", "--log-level=Warn", "--idle-connection-timeout=15", "--term-timeout=16", "--term-client-close-timeout=17"]) + + config = AMQProxy::Configuration.load_with_cli(ARGV) + + config.listen_address.should eq "example_arg.com" + config.log_level.should eq ::Log::Severity::Warn + config.listen_port.should eq 5675 + config.http_port.should eq 15675 + config.idle_connection_timeout.should eq 15 + config.term_timeout.should eq 16 + config.term_client_close_timeout.should eq 17 + + # Clean Up + ENV.delete("LISTEN_ADDRESS") + ENV.delete("LISTEN_PORT") + ENV.delete("HTTP_PORT") + ENV.delete("LOG_LEVEL") + ENV.delete("IDLE_CONNECTION_TIMEOUT") + ENV.delete("TERM_TIMEOUT") + ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") + ENV.delete("UPSTREAM") + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from empty config file returning default configuration" do + previous_argv = ARGV.clone + ARGV.clear + + config = AMQProxy::Configuration.load_with_cli(ARGV, "/tmp/config_empty.ini") + + config.listen_address.should eq "localhost" + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads init file without error" do + previous_argv = ARGV.clone + ARGV.clear + + config = AMQProxy::Configuration.load_with_cli(ARGV, "/tmp/config.ini") + + config.listen_address.should eq "127.0.0.2" + config.listen_port.should eq 5678 + config.http_port.should eq 15678 + config.log_level.should eq ::Log::Severity::Debug + config.idle_connection_timeout.should eq 55 + config.term_timeout.should eq 56 + config.term_client_close_timeout.should eq 57 + config.upstream.should eq "amqp://localhost:5678" + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end +end diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index b2fda07..2d9ff87 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -8,8 +8,8 @@ describe AMQProxy::Config do config = AMQProxy::Config.load_with_cli(ARGV) - config.listen_address.should eq "127.0.0.1" - config.listen_port.should eq 5673 + config.listen_address.should eq "127.0.0.2" + config.listen_port.should eq 5678 # Restore ARGV ARGV.clear @@ -43,6 +43,7 @@ describe AMQProxy::Config do ENV["LISTEN_ADDRESS"] = "example_env.com" ENV["LOG_LEVEL"] = "Error" + ARGV.concat(["--listen=example_arg.com", "--log-level=Warn"]) config = AMQProxy::Config.load_with_cli(ARGV) @@ -52,6 +53,7 @@ describe AMQProxy::Config do # Clean Up ENV.delete("LISTEN_ADDRESS") + ENV.delete("LOG_LEVEL") # Restore ARGV ARGV.clear diff --git a/spec/config.ini b/spec/config.ini index 5332912..89193fc 100644 --- a/spec/config.ini +++ b/spec/config.ini @@ -1,8 +1,13 @@ [main] -log_level = info -idle_connection_timeout = 5 -upstream = amqp://localhost:5672 +log_level = debug +http_port = 15678 +idle_connection_timeout = 55 +term_timeout = 56 +term_client_close_timeout = 57 +upstream = amqp://localhost:5678 [listen] -address = 127.0.0.1 -port = 5673 \ No newline at end of file +bind = 127.0.0.1 +address = 127.0.0.2 +port = 5678 +log_level = debug diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index 0644a40..0d6725e 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -33,69 +33,94 @@ module AMQProxy @term_client_close_timeout = term_client_close_timeout end - protected def load_from_file(path) # ameba:disable Metrics/CyclomaticComplexity - return unless File.exists?(path) + private def self.load_from_file(path, oldConfig : Config) : Config # ameba:disable Metrics/CyclomaticComplexity + return oldConfig unless File.exists?(path) + listen_address = oldConfig.listen_address + listen_port = oldConfig.listen_port + http_port = oldConfig.http_port + idle_connection_timeout = oldConfig.idle_connection_timeout + term_timeout = oldConfig.term_timeout + term_client_close_timeout = oldConfig.term_client_close_timeout + log_level = oldConfig.log_level + upstream = oldConfig.upstream + INI.parse(File.read(path)).each do |name, section| case name when "main", "" section.each do |key, value| case key - when "upstream" then @upstream = value - when "log_level" then @log_level = ::Log::Severity.parse(value) - when "idle_connection_timeout" then @idle_connection_timeout = value.to_i - when "term_timeout" then @term_timeout = value.to_i - when "term_client_close_timeout" then @term_client_close_timeout = value.to_i + when "http_port" then http_port = value.to_i + when "upstream" then upstream = value + when "log_level" then log_level = ::Log::Severity.parse(value) + when "idle_connection_timeout" then idle_connection_timeout = value.to_i + when "term_timeout" then term_timeout = value.to_i + when "term_client_close_timeout" then term_client_close_timeout = value.to_i else raise "Unsupported config #{name}/#{key}" end end when "listen" section.each do |key, value| case key - when "port" then @listen_port = value.to_i - when "bind", "address" then @listen_address = value - when "log_level" then @log_level = ::Log::Severity.parse(value) + when "port" then listen_port = value.to_i + when "bind", "address" then listen_address = value + when "log_level" then log_level = ::Log::Severity.parse(value) else raise "Unsupported config #{name}/#{key}" end end else raise "Unsupported config section #{name}" end end + + new listen_address, listen_port, http_port, log_level, idle_connection_timeout, term_timeout, term_client_close_timeout, upstream rescue ex abort ex.message end # Override config using environment variables - protected def load_with_env - @listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address - @listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port - @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port - @log_level = Log::Severity.parse(ENV["LOG_LEVEL"]? || @log_level.to_s) - @idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout - @term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout - @term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout - @upstream = ENV["UPSTREAM"]? || @upstream + private def self.load_with_env(oldConfig : Config = new) : Config + listen_address = ENV["LISTEN_ADDRESS"]? || oldConfig.listen_address + listen_port = ENV["LISTEN_PORT"]?.try &.to_i || oldConfig.listen_port + http_port = ENV["HTTP_PORT"]?.try &.to_i || oldConfig.http_port + log_level = Log::Severity.parse(ENV["LOG_LEVEL"]? || oldConfig.log_level.to_s) + idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || oldConfig.idle_connection_timeout + term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || oldConfig.term_timeout + term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || oldConfig.term_client_close_timeout + upstream = ENV["UPSTREAM"]? || oldConfig.upstream + + new listen_address, listen_port, http_port, log_level, idle_connection_timeout, term_timeout, term_client_close_timeout, upstream end # override config using command-line arguments - protected def load_from_options(args) + private def self.load_from_options(args, oldConfig : Config = new) : Config + listen_address = oldConfig.listen_address + listen_port = oldConfig.listen_port + http_port = oldConfig.http_port + idle_connection_timeout = oldConfig.idle_connection_timeout + term_timeout = oldConfig.term_timeout + term_client_close_timeout = oldConfig.term_client_close_timeout + log_level = oldConfig.log_level + upstream = oldConfig.upstream + p = OptionParser.parse(args) do |parser| parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| - @listen_address = v + listen_address = v end - parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i } - parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i } + parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| listen_port = v.to_i } + parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| http_port = v.to_i } parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| - @idle_connection_timeout = v.to_i + idle_connection_timeout = v.to_i end parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| - @term_timeout = v.to_i + term_timeout = v.to_i end parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| - @term_client_close_timeout = v.to_i + term_client_close_timeout = v.to_i end - parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| @log_level = Log::Severity.parse(v) } + parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| log_level = Log::Severity.parse(v) } end + + new listen_address, listen_port, http_port, log_level, idle_connection_timeout, term_timeout, term_client_close_timeout, upstream end # Override config using command-line arguments @@ -103,13 +128,13 @@ module AMQProxy config = new # First, load config file - config.load_from_file(path) + config = self.load_from_file(path, config) # Then, load environment variables - config.load_with_env + config = self.load_with_env(config) # Finally, load command-line arguments - config.load_from_options(args) + config = self.load_from_options(args, config) config end diff --git a/src/amqproxy/config_record.cr b/src/amqproxy/config_record.cr new file mode 100644 index 0000000..55df6d2 --- /dev/null +++ b/src/amqproxy/config_record.cr @@ -0,0 +1,145 @@ +require "ini" +require "log" +require "option_parser" + +module AMQProxy + record Configuration, + listen_address : String, + listen_port : Int32, + http_port : Int32, + log_level : Log::Severity, + idle_connection_timeout : Int32, + term_timeout : Int32, + term_client_close_timeout : Int32, + upstream : String? do + + # Factory method to create a Config with nullable parameters + def self.create( + listen_address : String? = nil, + listen_port : Int32? = nil, + http_port : Int32? = nil, + log_level : Log::Severity? = nil, + idle_connection_timeout : Int32? = nil, + term_timeout : Int32? = nil, + term_client_close_timeout : Int32? = nil, + upstream : String? = nil + ) + new( + listen_address || "localhost", + listen_port || 5673, + http_port || 15673, + log_level || Log::Severity::Info, + idle_connection_timeout || 5, + term_timeout || -1, + term_client_close_timeout || 0, + upstream || nil + ) + end + + # Method to return a new instance with modified fields (like C# `with`) + def with( + listen_address : String? = nil, + listen_port : Int32? = nil, + http_port : Int32? = nil, + log_level : Log::Severity? = nil, + idle_connection_timeout : Int32? = nil, + term_timeout : Int32? = nil, + term_client_close_timeout : Int32? = nil, + upstream : String? = nil + ) + Configuration.new( + listen_address || self.listen_address, + listen_port || self.listen_port, + http_port || self.http_port, + log_level || self.log_level, + idle_connection_timeout || self.idle_connection_timeout, + term_timeout || self.term_timeout, + term_client_close_timeout || self.term_client_close_timeout, + upstream || self.upstream + ) + end + + def load_from_file(path : String) # ameba:disable Metrics/CyclomaticComplexity + return self unless File.exists?(path) + + config = self + + INI.parse(File.read(path)).each do |name, section| + case name + when "main", "" + section.each do |key, value| + case key + when "http_port" then config = config.with(http_port: value.to_i) + when "upstream" then config = config.with(upstream: value) + when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) + when "idle_connection_timeout" then config = config.with(idle_connection_timeout: value.to_i) + when "term_timeout" then config = config.with(term_timeout: value.to_i) + when "term_client_close_timeout" then config = config.with(term_client_close_timeout: value.to_i) + else raise "Unsupported config #{name}/#{key}" + end + end + when "listen" + section.each do |key, value| + case key + when "port" then config = config.with(listen_port: value.to_i) + when "bind", "address" then config = config.with(listen_address: value) + when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) + else raise "Unsupported config #{name}/#{key}" + end + end + else raise "Unsupported config section #{name}" + end + end + + config + rescue ex + abort ex.message + end + + def load_from_env + self.with( + listen_address: ENV["LISTEN_ADDRESS"]?, + listen_port: ENV["LISTEN_PORT"]?.try &.to_i, + http_port: ENV["HTTP_PORT"]?.try &.to_i, + log_level: Log::Severity.parse(ENV["LOG_LEVEL"]? || self.log_level.to_s), + idle_connection_timeout: ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i, + term_timeout: ENV["TERM_TIMEOUT"]?.try &.to_i, + term_client_close_timeout: ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i, + upstream: ENV["UPSTREAM"]? + ) + end + + def load_from_options(args) + config = self + + p = OptionParser.parse(args) do |parser| + parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| + config = config.with(listen_address: v) + end + parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| config = config.with(listen_port: v.to_i) } + parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| config = config.with(http_port: v.to_i) } + parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| + config = config.with(idle_connection_timeout: v.to_i) + end + parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| + config = config.with(term_timeout: v.to_i) + end + parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| + config = config.with(term_client_close_timeout: v.to_i) + end + parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| config = config.with(log_level: Log::Severity.parse(v)) } + end + + config + end + + def self.load_with_cli(args, path = "config.ini") + config = self.create() + .load_from_file(path) + .load_from_env() + .load_from_options(args) + + config + end + end +end From 16fbe1b7408cd977fc3b237a537c535666cd4719 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Thu, 20 Mar 2025 19:13:26 +0100 Subject: [PATCH 04/19] Implement config record --- spec/amqproxy/config_record_spec.cr | 136 -------------------- spec/amqproxy/config_spec.cr | 104 ++++++++++++++-- src/amqproxy/cli.cr | 87 ++++--------- src/amqproxy/config.cr | 186 ++++++++++++++-------------- src/amqproxy/config_record.cr | 145 ---------------------- 5 files changed, 207 insertions(+), 451 deletions(-) delete mode 100644 spec/amqproxy/config_record_spec.cr delete mode 100644 src/amqproxy/config_record.cr diff --git a/spec/amqproxy/config_record_spec.cr b/spec/amqproxy/config_record_spec.cr deleted file mode 100644 index 0aece77..0000000 --- a/spec/amqproxy/config_record_spec.cr +++ /dev/null @@ -1,136 +0,0 @@ -require "spec" -require "../../src/amqproxy/config_record" - -describe AMQProxy::Configuration do - it "loads defaults when no ini file, env vars or options are available" do - previous_argv = ARGV.clone - ARGV.clear - - config = AMQProxy::Configuration.load_with_cli(ARGV, "/tmp/non_existing_file.ini") - - config.listen_address.should eq "localhost" - config.listen_port.should eq 5673 - config.http_port.should eq 15673 - config.log_level.should eq ::Log::Severity::Info - config.idle_connection_timeout.should eq 5 - config.term_timeout.should eq -1 - config.term_client_close_timeout.should eq 0 - config.upstream.should eq nil - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) - end - - it "reads from environment variables and overwrites ini file values" do - previous_argv = ARGV.clone - ARGV.clear - - ENV["LISTEN_ADDRESS"] = "example.com" - ENV["LISTEN_PORT"] = "5674" - ENV["HTTP_PORT"] = "15674" - ENV["LOG_LEVEL"] = "Error" - ENV["IDLE_CONNECTION_TIMEOUT"] = "12" - ENV["TERM_TIMEOUT"] = "13" - ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" - ENV["UPSTREAM"] = "amqp://localhost:5674" - - config = AMQProxy::Configuration.load_with_cli(ARGV) - - config.listen_address.should eq "example.com" - config.listen_port.should eq 5674 - config.http_port.should eq 15674 - config.log_level.should eq ::Log::Severity::Error - config.idle_connection_timeout.should eq 12 - config.term_timeout.should eq 13 - config.term_client_close_timeout.should eq 14 - config.upstream.should eq "amqp://localhost:5674" - - # Clean up - ENV.delete("LISTEN_ADDRESS") - ENV.delete("LISTEN_PORT") - ENV.delete("HTTP_PORT") - ENV.delete("LOG_LEVEL") - ENV.delete("IDLE_CONNECTION_TIMEOUT") - ENV.delete("TERM_TIMEOUT") - ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") - ENV.delete("UPSTREAM") - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) - end - - it "reads from command line arguments and overrules env vars" do - previous_argv = ARGV.clone - ARGV.clear - - ENV["LISTEN_ADDRESS"] = "example.com" - ENV["LISTEN_PORT"] = "5674" - ENV["HTTP_PORT"] = "15674" - ENV["LOG_LEVEL"] = "Error" - ENV["IDLE_CONNECTION_TIMEOUT"] = "12" - ENV["TERM_TIMEOUT"] = "13" - ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" - ENV["UPSTREAM"] = "amqp://localhost:5674" - - ARGV.concat(["--listen=example_arg.com", "--port=5675", "--http-port=15675", "--log-level=Warn", "--idle-connection-timeout=15", "--term-timeout=16", "--term-client-close-timeout=17"]) - - config = AMQProxy::Configuration.load_with_cli(ARGV) - - config.listen_address.should eq "example_arg.com" - config.log_level.should eq ::Log::Severity::Warn - config.listen_port.should eq 5675 - config.http_port.should eq 15675 - config.idle_connection_timeout.should eq 15 - config.term_timeout.should eq 16 - config.term_client_close_timeout.should eq 17 - - # Clean Up - ENV.delete("LISTEN_ADDRESS") - ENV.delete("LISTEN_PORT") - ENV.delete("HTTP_PORT") - ENV.delete("LOG_LEVEL") - ENV.delete("IDLE_CONNECTION_TIMEOUT") - ENV.delete("TERM_TIMEOUT") - ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") - ENV.delete("UPSTREAM") - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) - end - - it "reads from empty config file returning default configuration" do - previous_argv = ARGV.clone - ARGV.clear - - config = AMQProxy::Configuration.load_with_cli(ARGV, "/tmp/config_empty.ini") - - config.listen_address.should eq "localhost" - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) - end - - it "reads init file without error" do - previous_argv = ARGV.clone - ARGV.clear - - config = AMQProxy::Configuration.load_with_cli(ARGV, "/tmp/config.ini") - - config.listen_address.should eq "127.0.0.2" - config.listen_port.should eq 5678 - config.http_port.should eq 15678 - config.log_level.should eq ::Log::Severity::Debug - config.idle_connection_timeout.should eq 55 - config.term_timeout.should eq 56 - config.term_client_close_timeout.should eq 57 - config.upstream.should eq "amqp://localhost:5678" - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) - end -end diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index 2d9ff87..e3bd65c 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -2,35 +2,59 @@ require "spec" require "../../src/amqproxy/config" describe AMQProxy::Config do - it "loads defaults when no env vars are set" do + it "loads defaults when no ini file, env vars or options are available" do previous_argv = ARGV.clone ARGV.clear - config = AMQProxy::Config.load_with_cli(ARGV) + config = AMQProxy::Config.load_with_cli(ARGV, "/tmp/non_existing_file.ini") - config.listen_address.should eq "127.0.0.2" - config.listen_port.should eq 5678 + config.listen_address.should eq "localhost" + config.listen_port.should eq 5673 + config.http_port.should eq 15673 + config.log_level.should eq ::Log::Severity::Info + config.idle_connection_timeout.should eq 5 + config.term_timeout.should eq -1 + config.term_client_close_timeout.should eq 0 + config.upstream.should eq nil # Restore ARGV ARGV.clear ARGV.concat(previous_argv) end - it "reads from environment variables" do + it "reads from environment variables and overwrites ini file values" do previous_argv = ARGV.clone ARGV.clear ENV["LISTEN_ADDRESS"] = "example.com" ENV["LISTEN_PORT"] = "5674" - + ENV["HTTP_PORT"] = "15674" + ENV["LOG_LEVEL"] = "Error" + ENV["IDLE_CONNECTION_TIMEOUT"] = "12" + ENV["TERM_TIMEOUT"] = "13" + ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" + ENV["UPSTREAM"] = "amqp://localhost:5674" + config = AMQProxy::Config.load_with_cli(ARGV) config.listen_address.should eq "example.com" config.listen_port.should eq 5674 + config.http_port.should eq 15674 + config.log_level.should eq ::Log::Severity::Error + config.idle_connection_timeout.should eq 12 + config.term_timeout.should eq 13 + config.term_client_close_timeout.should eq 14 + config.upstream.should eq "amqp://localhost:5674" # Clean up ENV.delete("LISTEN_ADDRESS") ENV.delete("LISTEN_PORT") + ENV.delete("HTTP_PORT") + ENV.delete("LOG_LEVEL") + ENV.delete("IDLE_CONNECTION_TIMEOUT") + ENV.delete("TERM_TIMEOUT") + ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") + ENV.delete("UPSTREAM") # Restore ARGV ARGV.clear @@ -41,20 +65,46 @@ describe AMQProxy::Config do previous_argv = ARGV.clone ARGV.clear - ENV["LISTEN_ADDRESS"] = "example_env.com" + ENV["LISTEN_ADDRESS"] = "example.com" + ENV["LISTEN_PORT"] = "5674" + ENV["HTTP_PORT"] = "15674" ENV["LOG_LEVEL"] = "Error" - - ARGV.concat(["--listen=example_arg.com", "--log-level=Warn"]) + ENV["IDLE_CONNECTION_TIMEOUT"] = "12" + ENV["TERM_TIMEOUT"] = "13" + ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" + ENV["UPSTREAM"] = "amqp://localhost:5674" + + ARGV.concat([ + "--listen=example_arg.com", + "--port=5675", + "--http-port=15675", + "--log-level=Warn", + "--idle-connection-timeout=15", + "--term-timeout=16", + "--term-client-close-timeout=17", + "amqp://localhost:5679"]) config = AMQProxy::Config.load_with_cli(ARGV) config.listen_address.should eq "example_arg.com" config.log_level.should eq ::Log::Severity::Warn + config.listen_port.should eq 5675 + config.http_port.should eq 15675 + config.idle_connection_timeout.should eq 15 + config.term_timeout.should eq 16 + config.term_client_close_timeout.should eq 17 + config.upstream.should eq "amqp://localhost:5679" # Clean Up ENV.delete("LISTEN_ADDRESS") + ENV.delete("LISTEN_PORT") + ENV.delete("HTTP_PORT") ENV.delete("LOG_LEVEL") - + ENV.delete("IDLE_CONNECTION_TIMEOUT") + ENV.delete("TERM_TIMEOUT") + ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") + ENV.delete("UPSTREAM") + # Restore ARGV ARGV.clear ARGV.concat(previous_argv) @@ -73,13 +123,41 @@ describe AMQProxy::Config do ARGV.concat(previous_argv) end - it "reads without error when ini file is missing" do + it "reads init file without error" do previous_argv = ARGV.clone ARGV.clear - config = AMQProxy::Config.load_with_cli(ARGV, "/tmp/non_existing_file.ini") + config = AMQProxy::Config.load_with_cli(ARGV) - config.listen_address.should eq "localhost" + config.listen_address.should eq "127.0.0.2" + config.listen_port.should eq 5678 + config.http_port.should eq 15678 + config.log_level.should eq ::Log::Severity::Debug + config.idle_connection_timeout.should eq 55 + config.term_timeout.should eq 56 + config.term_client_close_timeout.should eq 57 + config.upstream.should eq "amqp://localhost:5678" + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads default ini file when null ini path is specified" do + previous_argv = ARGV.clone + ARGV.clear + + initPath : String? = nil + config = AMQProxy::Config.load_with_cli(ARGV, initPath) + + config.listen_address.should eq "127.0.0.2" + config.listen_port.should eq 5678 + config.http_port.should eq 15678 + config.log_level.should eq ::Log::Severity::Debug + config.idle_connection_timeout.should eq 55 + config.term_timeout.should eq 56 + config.term_client_close_timeout.should eq 57 + config.upstream.should eq "amqp://localhost:5678" # Restore ARGV ARGV.clear diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index cbd161f..809b006 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -1,3 +1,4 @@ +require "./config" require "./version" require "./server" require "./http_server" @@ -10,79 +11,33 @@ class AMQProxy::CLI Log = ::Log.for(self) @config : Config - @listen_address = ENV["LISTEN_ADDRESS"]? || "localhost" - @listen_port = ENV["LISTEN_PORT"]? || 5673 - @http_port = ENV["HTTP_PORT"]? || 15673 - @log_level : ::Log::Severity = ::Log::Severity::Info - @idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i - @term_timeout = -1 - @term_client_close_timeout = 0 - @upstream = ENV["AMQP_URL"]? @server : AMQProxy::Server? = nil - def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity - INI.parse(File.read(path)).each do |name, section| - case name - when "main", "" - section.each do |key, value| - case key - when "upstream" then @upstream = value - when "log_level" then @log_level = ::Log::Severity.parse(value) - when "idle_connection_timeout" then @idle_connection_timeout = value.to_i - when "term_timeout" then @term_timeout = value.to_i - when "term_client_close_timeout" then @term_client_close_timeout = value.to_i - else raise "Unsupported config #{name}/#{key}" - end - end - when "listen" - section.each do |key, value| - case key - when "port" then @listen_port = value - when "bind", "address" then @listen_address = value - when "log_level" then @log_level = ::Log::Severity.parse(value) - else raise "Unsupported config #{name}/#{key}" - end - end - else raise "Unsupported config section #{name}" - end - end - rescue ex - abort ex.message - end - def run(argv) raise "run cant be called multiple times" unless @server.nil? - @config = Config.new + file : String? = nil + # validate options and get config file when specified p = OptionParser.parse(argv) do |parser| parser.banner = "Usage: amqproxy [options] [amqp upstream url]" - parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| - @listen_address = v - end - parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i } - parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i } - parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v| - @idle_connection_timeout = v.to_i - end - parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| - @term_timeout = v.to_i - end - parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| - @term_client_close_timeout = v.to_i - end + parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") { } + parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { } + parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { } + parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") { } + parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") { } + parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") { } parser.on("--log-level=LEVEL", "The log level (default: info)") { } - parser.on("-d", "--debug", "Verbose logging") { @log_level = ::Log::Severity::Debug } - parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) } + parser.on("-d", "--debug", "Verbose logging") { } + parser.on("-c FILE", "--config=FILE", "Load config file") { |v| file = v } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } end - @upstream ||= argv.shift? - upstream_url = @upstream || abort p.to_s + @config = AMQProxy::Config.load_with_cli(argv, file) - u = URI.parse upstream_url + u = URI.parse @config.upstream abort "Invalid upstream URL" unless u.host default_port = case u.scheme @@ -98,15 +53,15 @@ class AMQProxy::CLI else ::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher) end - ::Log.setup_from_env(default_level: @log_level, backend: log_backend) + ::Log.setup_from_env(default_level: @config.log_level, backend: log_backend) Signal::INT.trap &->self.initiate_shutdown(Signal) Signal::TERM.trap &->self.initiate_shutdown(Signal) - server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) + server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @config.idle_connection_timeout) - HTTPServer.new(server, @listen_address, @http_port.to_i) - server.listen(@listen_address, @listen_port.to_i) + HTTPServer.new(server, @config.listen_address, @config.http_port) + server.listen(config.listen_address, @config.listen_port) shutdown @@ -136,16 +91,16 @@ class AMQProxy::CLI raise "Can't call shutdown before run" end if server.client_connections > 0 - if @term_client_close_timeout > 0 - wait_for_clients_to_close @term_client_close_timeout.seconds + if @config.term_client_close_timeout > 0 + wait_for_clients_to_close @config.term_client_close_timeout.seconds end server.disconnect_clients end if server.client_connections > 0 - if @term_timeout >= 0 + if @config.term_timeout >= 0 spawn do - sleep @term_timeout + sleep @config.term_timeout abort "Exiting with #{server.client_connections} client connections still open" end end diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index 0d6725e..b5254e9 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -3,68 +3,87 @@ require "log" require "option_parser" module AMQProxy - class Config - # Define instance variables and getters - getter listen_address : String - getter listen_port : Int32 - getter http_port : Int32 - getter log_level : Log::Severity - getter idle_connection_timeout : Int32 - getter term_timeout : Int32 - getter term_client_close_timeout : Int32 - getter upstream : String? - - private def initialize( - listen_address = "localhost", - listen_port = 5673, - http_port = 15673, - log_level = Log::Severity::Info, - idle_connection_timeout = 5, - term_timeout = -1, - term_client_close_timeout = 0, - @upstream = nil + record Config, + listen_address : String, + listen_port : Int32, + http_port : Int32, + log_level : Log::Severity, + idle_connection_timeout : Int32, + term_timeout : Int32, + term_client_close_timeout : Int32, + upstream : String? do + + # Factory method to create a Config with nullable parameters + private def self.create( + listen_address : String? = nil, + listen_port : Int32? = nil, + http_port : Int32? = nil, + log_level : Log::Severity? = nil, + idle_connection_timeout : Int32? = nil, + term_timeout : Int32? = nil, + term_client_close_timeout : Int32? = nil, + upstream : String? = nil + ) + new( + listen_address || "localhost", + listen_port || 5673, + http_port || 15673, + log_level || Log::Severity::Info, + idle_connection_timeout || 5, + term_timeout || -1, + term_client_close_timeout || 0, + upstream || nil + ) + end + + # Method to return a new instance with modified fields (like C# `with`) + protected def with( + listen_address : String? = nil, + listen_port : Int32? = nil, + http_port : Int32? = nil, + log_level : Log::Severity? = nil, + idle_connection_timeout : Int32? = nil, + term_timeout : Int32? = nil, + term_client_close_timeout : Int32? = nil, + upstream : String? = nil ) - @listen_address = listen_address - @listen_port = listen_port - @http_port = http_port - @log_level = log_level - @idle_connection_timeout = idle_connection_timeout - @term_timeout = term_timeout - @term_client_close_timeout = term_client_close_timeout + Config.new( + listen_address || self.listen_address, + listen_port || self.listen_port, + http_port || self.http_port, + log_level || self.log_level, + idle_connection_timeout || self.idle_connection_timeout, + term_timeout || self.term_timeout, + term_client_close_timeout || self.term_client_close_timeout, + upstream || self.upstream + ) end - - private def self.load_from_file(path, oldConfig : Config) : Config # ameba:disable Metrics/CyclomaticComplexity - return oldConfig unless File.exists?(path) - - listen_address = oldConfig.listen_address - listen_port = oldConfig.listen_port - http_port = oldConfig.http_port - idle_connection_timeout = oldConfig.idle_connection_timeout - term_timeout = oldConfig.term_timeout - term_client_close_timeout = oldConfig.term_client_close_timeout - log_level = oldConfig.log_level - upstream = oldConfig.upstream + + protected def load_from_file(path : String) # ameba:disable Metrics/CyclomaticComplexity + return self unless File.exists?(path) + + config = self INI.parse(File.read(path)).each do |name, section| case name when "main", "" section.each do |key, value| case key - when "http_port" then http_port = value.to_i - when "upstream" then upstream = value - when "log_level" then log_level = ::Log::Severity.parse(value) - when "idle_connection_timeout" then idle_connection_timeout = value.to_i - when "term_timeout" then term_timeout = value.to_i - when "term_client_close_timeout" then term_client_close_timeout = value.to_i + when "http_port" then config = config.with(http_port: value.to_i) + when "upstream" then config = config.with(upstream: value) + when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) + when "idle_connection_timeout" then config = config.with(idle_connection_timeout: value.to_i) + when "term_timeout" then config = config.with(term_timeout: value.to_i) + when "term_client_close_timeout" then config = config.with(term_client_close_timeout: value.to_i) else raise "Unsupported config #{name}/#{key}" end end when "listen" section.each do |key, value| case key - when "port" then listen_port = value.to_i - when "bind", "address" then listen_address = value - when "log_level" then log_level = ::Log::Severity.parse(value) + when "port" then config = config.with(listen_port: value.to_i) + when "bind", "address" then config = config.with(listen_address: value) + when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) else raise "Unsupported config #{name}/#{key}" end end @@ -72,69 +91,54 @@ module AMQProxy end end - new listen_address, listen_port, http_port, log_level, idle_connection_timeout, term_timeout, term_client_close_timeout, upstream + config rescue ex abort ex.message end - - # Override config using environment variables - private def self.load_with_env(oldConfig : Config = new) : Config - listen_address = ENV["LISTEN_ADDRESS"]? || oldConfig.listen_address - listen_port = ENV["LISTEN_PORT"]?.try &.to_i || oldConfig.listen_port - http_port = ENV["HTTP_PORT"]?.try &.to_i || oldConfig.http_port - log_level = Log::Severity.parse(ENV["LOG_LEVEL"]? || oldConfig.log_level.to_s) - idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || oldConfig.idle_connection_timeout - term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || oldConfig.term_timeout - term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || oldConfig.term_client_close_timeout - upstream = ENV["UPSTREAM"]? || oldConfig.upstream - new listen_address, listen_port, http_port, log_level, idle_connection_timeout, term_timeout, term_client_close_timeout, upstream + protected def load_from_env + self.with( + listen_address: ENV["LISTEN_ADDRESS"]?, + listen_port: ENV["LISTEN_PORT"]?.try &.to_i, + http_port: ENV["HTTP_PORT"]?.try &.to_i, + log_level: Log::Severity.parse(ENV["LOG_LEVEL"]? || self.log_level.to_s), + idle_connection_timeout: ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i, + term_timeout: ENV["TERM_TIMEOUT"]?.try &.to_i, + term_client_close_timeout: ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i, + upstream: ENV["UPSTREAM"]? + ) end - # override config using command-line arguments - private def self.load_from_options(args, oldConfig : Config = new) : Config - listen_address = oldConfig.listen_address - listen_port = oldConfig.listen_port - http_port = oldConfig.http_port - idle_connection_timeout = oldConfig.idle_connection_timeout - term_timeout = oldConfig.term_timeout - term_client_close_timeout = oldConfig.term_client_close_timeout - log_level = oldConfig.log_level - upstream = oldConfig.upstream + protected def load_from_options(args) + config = self p = OptionParser.parse(args) do |parser| parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| - listen_address = v + config = config.with(listen_address: v) end - parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| listen_port = v.to_i } - parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| http_port = v.to_i } + parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| config = config.with(listen_port: v.to_i) } + parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| config = config.with(http_port: v.to_i) } parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| - idle_connection_timeout = v.to_i + config = config.with(idle_connection_timeout: v.to_i) end parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| - term_timeout = v.to_i + config = config.with(term_timeout: v.to_i) end parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| - term_client_close_timeout = v.to_i + config = config.with(term_client_close_timeout: v.to_i) end - parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| log_level = Log::Severity.parse(v) } + parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| config = config.with(log_level: Log::Severity.parse(v)) } end - new listen_address, listen_port, http_port, log_level, idle_connection_timeout, term_timeout, term_client_close_timeout, upstream + config end - - # Override config using command-line arguments - def self.load_with_cli(args, path = "config.ini") : Config - config = new - # First, load config file - config = self.load_from_file(path, config) - - # Then, load environment variables - config = self.load_with_env(config) - - # Finally, load command-line arguments - config = self.load_from_options(args, config) + def self.load_with_cli(args, path : String? = nil) + config = self.create() + .load_from_file(path || "config.ini") + .load_from_env() + .load_from_options(args) + .with(upstream: args.shift?) config end diff --git a/src/amqproxy/config_record.cr b/src/amqproxy/config_record.cr deleted file mode 100644 index 55df6d2..0000000 --- a/src/amqproxy/config_record.cr +++ /dev/null @@ -1,145 +0,0 @@ -require "ini" -require "log" -require "option_parser" - -module AMQProxy - record Configuration, - listen_address : String, - listen_port : Int32, - http_port : Int32, - log_level : Log::Severity, - idle_connection_timeout : Int32, - term_timeout : Int32, - term_client_close_timeout : Int32, - upstream : String? do - - # Factory method to create a Config with nullable parameters - def self.create( - listen_address : String? = nil, - listen_port : Int32? = nil, - http_port : Int32? = nil, - log_level : Log::Severity? = nil, - idle_connection_timeout : Int32? = nil, - term_timeout : Int32? = nil, - term_client_close_timeout : Int32? = nil, - upstream : String? = nil - ) - new( - listen_address || "localhost", - listen_port || 5673, - http_port || 15673, - log_level || Log::Severity::Info, - idle_connection_timeout || 5, - term_timeout || -1, - term_client_close_timeout || 0, - upstream || nil - ) - end - - # Method to return a new instance with modified fields (like C# `with`) - def with( - listen_address : String? = nil, - listen_port : Int32? = nil, - http_port : Int32? = nil, - log_level : Log::Severity? = nil, - idle_connection_timeout : Int32? = nil, - term_timeout : Int32? = nil, - term_client_close_timeout : Int32? = nil, - upstream : String? = nil - ) - Configuration.new( - listen_address || self.listen_address, - listen_port || self.listen_port, - http_port || self.http_port, - log_level || self.log_level, - idle_connection_timeout || self.idle_connection_timeout, - term_timeout || self.term_timeout, - term_client_close_timeout || self.term_client_close_timeout, - upstream || self.upstream - ) - end - - def load_from_file(path : String) # ameba:disable Metrics/CyclomaticComplexity - return self unless File.exists?(path) - - config = self - - INI.parse(File.read(path)).each do |name, section| - case name - when "main", "" - section.each do |key, value| - case key - when "http_port" then config = config.with(http_port: value.to_i) - when "upstream" then config = config.with(upstream: value) - when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) - when "idle_connection_timeout" then config = config.with(idle_connection_timeout: value.to_i) - when "term_timeout" then config = config.with(term_timeout: value.to_i) - when "term_client_close_timeout" then config = config.with(term_client_close_timeout: value.to_i) - else raise "Unsupported config #{name}/#{key}" - end - end - when "listen" - section.each do |key, value| - case key - when "port" then config = config.with(listen_port: value.to_i) - when "bind", "address" then config = config.with(listen_address: value) - when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) - else raise "Unsupported config #{name}/#{key}" - end - end - else raise "Unsupported config section #{name}" - end - end - - config - rescue ex - abort ex.message - end - - def load_from_env - self.with( - listen_address: ENV["LISTEN_ADDRESS"]?, - listen_port: ENV["LISTEN_PORT"]?.try &.to_i, - http_port: ENV["HTTP_PORT"]?.try &.to_i, - log_level: Log::Severity.parse(ENV["LOG_LEVEL"]? || self.log_level.to_s), - idle_connection_timeout: ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i, - term_timeout: ENV["TERM_TIMEOUT"]?.try &.to_i, - term_client_close_timeout: ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i, - upstream: ENV["UPSTREAM"]? - ) - end - - def load_from_options(args) - config = self - - p = OptionParser.parse(args) do |parser| - parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| - config = config.with(listen_address: v) - end - parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| config = config.with(listen_port: v.to_i) } - parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| config = config.with(http_port: v.to_i) } - parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| - config = config.with(idle_connection_timeout: v.to_i) - end - parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| - config = config.with(term_timeout: v.to_i) - end - parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| - config = config.with(term_client_close_timeout: v.to_i) - end - parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| config = config.with(log_level: Log::Severity.parse(v)) } - end - - config - end - - def self.load_with_cli(args, path = "config.ini") - config = self.create() - .load_from_file(path) - .load_from_env() - .load_from_options(args) - - config - end - end -end From 98b0d692b3cfd3a13975ff7df173d8210f2d851f Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Fri, 21 Mar 2025 22:40:07 +0100 Subject: [PATCH 05/19] Fix config references --- spec/amqproxy/config_spec.cr | 31 ++++++++++++++++++++++++++ src/amqproxy/cli.cr | 43 ++++++++++++++++++++++-------------- src/amqproxy/config.cr | 13 ++++++++++- 3 files changed, 70 insertions(+), 17 deletions(-) diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index e3bd65c..cd08d1b 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -110,6 +110,37 @@ describe AMQProxy::Config do ARGV.concat(previous_argv) end + it "sets log level to debug when debug flag is present" do + previous_argv = ARGV.clone + ARGV.clear + + ARGV.concat([ + "--listen=example_arg.com", + "--port=5675", + "--http-port=15675", + "--log-level=Warn", + "--idle-connection-timeout=15", + "--term-timeout=16", + "--term-client-close-timeout=17", + "--debug", + "amqp://localhost:5679"]) + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.listen_address.should eq "example_arg.com" + config.log_level.should eq ::Log::Severity::Debug + config.listen_port.should eq 5675 + config.http_port.should eq 15675 + config.idle_connection_timeout.should eq 15 + config.term_timeout.should eq 16 + config.term_client_close_timeout.should eq 17 + config.upstream.should eq "amqp://localhost:5679" + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + it "reads from empty config file returning default configuration" do previous_argv = ARGV.clone ARGV.clear diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 809b006..4851950 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -10,16 +10,21 @@ require "log" class AMQProxy::CLI Log = ::Log.for(self) - @config : Config + @config : AMQProxy::Config? = nil @server : AMQProxy::Server? = nil def run(argv) raise "run cant be called multiple times" unless @server.nil? - file : String? = nil + Log.Debug { "Starting AMQProxy #{AMQProxy::VERSION} with options: #{ARGV.join(", ")}" } - # validate options and get config file when specified - p = OptionParser.parse(argv) do |parser| + ini_file : String? = nil + + # Need to clone the args, because OptionParser will modify them + argv_validation = argv.clone + + # validate options and get config ini file path when provided + p = OptionParser.parse(argv_validation) do |parser| parser.banner = "Usage: amqproxy [options] [amqp upstream url]" parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") { } parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { } @@ -29,15 +34,18 @@ class AMQProxy::CLI parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") { } parser.on("--log-level=LEVEL", "The log level (default: info)") { } parser.on("-d", "--debug", "Verbose logging") { } - parser.on("-c FILE", "--config=FILE", "Load config file") { |v| file = v } + parser.on("-c FILE", "--config=FILE", "Load config file") { |v| ini_file = v } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } end - @config = AMQProxy::Config.load_with_cli(argv, file) + # load cascading configuration: sequence defaults, file, env and cli + config = @config = AMQProxy::Config.load_with_cli(argv, ini_file) + + upstream_url = config.upstream || abort p.to_s + u = URI.parse upstream_url - u = URI.parse @config.upstream abort "Invalid upstream URL" unless u.host default_port = case u.scheme @@ -53,15 +61,15 @@ class AMQProxy::CLI else ::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher) end - ::Log.setup_from_env(default_level: @config.log_level, backend: log_backend) + ::Log.setup_from_env(default_level: config.log_level, backend: log_backend) Signal::INT.trap &->self.initiate_shutdown(Signal) Signal::TERM.trap &->self.initiate_shutdown(Signal) - server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @config.idle_connection_timeout) + server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, config.idle_connection_timeout) - HTTPServer.new(server, @config.listen_address, @config.http_port) - server.listen(config.listen_address, @config.listen_port) + HTTPServer.new(server, config.listen_address, config.http_port) + server.listen(config.listen_address, config.listen_port) shutdown @@ -90,17 +98,20 @@ class AMQProxy::CLI unless server = @server raise "Can't call shutdown before run" end + + config = @config.not_nil! + if server.client_connections > 0 - if @config.term_client_close_timeout > 0 - wait_for_clients_to_close @config.term_client_close_timeout.seconds + if config.term_client_close_timeout > 0 + wait_for_clients_to_close config.term_client_close_timeout.seconds end server.disconnect_clients end if server.client_connections > 0 - if @config.term_timeout >= 0 + if config.term_timeout >= 0 spawn do - sleep @config.term_timeout + sleep config.term_timeout abort "Exiting with #{server.client_connections} client connections still open" end end @@ -108,7 +119,7 @@ class AMQProxy::CLI end def wait_for_clients_to_close(close_timeout) - unless server = @server + unless server = @server raise "Can't call shutdown before run" end Log.info { "Waiting for clients to close their connections." } diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index b5254e9..ebb2bbb 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -44,7 +44,7 @@ module AMQProxy log_level : Log::Severity? = nil, idle_connection_timeout : Int32? = nil, term_timeout : Int32? = nil, - term_client_close_timeout : Int32? = nil, + term_client_close_timeout : Int32? = nil, upstream : String? = nil ) Config.new( @@ -112,6 +112,9 @@ module AMQProxy protected def load_from_options(args) config = self + Log.info { "listen_address: #{config.listen_address}" } + is_debug : Bool = false + p = OptionParser.parse(args) do |parser| parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| config = config.with(listen_address: v) @@ -128,8 +131,16 @@ module AMQProxy config = config.with(term_client_close_timeout: v.to_i) end parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| config = config.with(log_level: Log::Severity.parse(v)) } + parser.on("-d", "--debug", "Verbose logging") { is_debug = true } end + # the debug flag overrules the log level + if (is_debug) + config = config.with(log_level: Log::Severity::Debug) + end + + Log.info { "listen_address: #{config.listen_address}" } + config end From fc7e84c38d015f096b61f25b58eea6cac6f6277e Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 22 Mar 2025 08:50:41 +0100 Subject: [PATCH 06/19] fix log_level handling and add specs remove initial logging --- spec/amqproxy/config_spec.cr | 56 +++++++++++++++++++----------------- src/amqproxy/cli.cr | 2 -- src/amqproxy/config.cr | 5 ++-- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index cd08d1b..bcd3bbb 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -22,7 +22,27 @@ describe AMQProxy::Config do ARGV.concat(previous_argv) end - it "reads from environment variables and overwrites ini file values" do + it "reads from empty config file returning default configuration" do + previous_argv = ARGV.clone + ARGV.clear + + config = AMQProxy::Config.load_with_cli(ARGV, "/tmp/config_empty.ini") + + config.listen_address.should eq "localhost" + config.listen_port.should eq 5673 + config.http_port.should eq 15673 + config.log_level.should eq ::Log::Severity::Info + config.idle_connection_timeout.should eq 5 + config.term_timeout.should eq -1 + config.term_client_close_timeout.should eq 0 + config.upstream.should eq nil + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from environment variables and overrules ini file values" do previous_argv = ARGV.clone ARGV.clear @@ -141,45 +161,29 @@ describe AMQProxy::Config do ARGV.concat(previous_argv) end - it "reads from empty config file returning default configuration" do + it "keeps the log level to trace when debug flag is present" do previous_argv = ARGV.clone ARGV.clear - config = AMQProxy::Config.load_with_cli(ARGV, "/tmp/config_empty.ini") - - config.listen_address.should eq "localhost" - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) - end - - it "reads init file without error" do - previous_argv = ARGV.clone - ARGV.clear + ARGV.concat([ + "--log-level=Trace", + "--debug"]) config = AMQProxy::Config.load_with_cli(ARGV) - config.listen_address.should eq "127.0.0.2" - config.listen_port.should eq 5678 - config.http_port.should eq 15678 - config.log_level.should eq ::Log::Severity::Debug - config.idle_connection_timeout.should eq 55 - config.term_timeout.should eq 56 - config.term_client_close_timeout.should eq 57 - config.upstream.should eq "amqp://localhost:5678" - + config.log_level.should eq ::Log::Severity::Trace + # Restore ARGV ARGV.clear ARGV.concat(previous_argv) end - it "reads default ini file when null ini path is specified" do + it "reads default ini file when ini file path is null" do previous_argv = ARGV.clone ARGV.clear - initPath : String? = nil - config = AMQProxy::Config.load_with_cli(ARGV, initPath) + init_file_path : String? = nil + config = AMQProxy::Config.load_with_cli(ARGV, init_file_path) config.listen_address.should eq "127.0.0.2" config.listen_port.should eq 5678 diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 4851950..7078277 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -16,8 +16,6 @@ class AMQProxy::CLI def run(argv) raise "run cant be called multiple times" unless @server.nil? - Log.Debug { "Starting AMQProxy #{AMQProxy::VERSION} with options: #{ARGV.join(", ")}" } - ini_file : String? = nil # Need to clone the args, because OptionParser will modify them diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index ebb2bbb..097068d 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -134,8 +134,9 @@ module AMQProxy parser.on("-d", "--debug", "Verbose logging") { is_debug = true } end - # the debug flag overrules the log level - if (is_debug) + # the debug flag overrules the log level. Only set the level + # when it is not already set to debug or trace + if (is_debug && config.log_level > Log::Severity::Debug) config = config.with(log_level: Log::Severity::Debug) end From 224123ac5586d8df60995a72bd5eae2846d716a5 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 29 Mar 2025 10:34:16 +0100 Subject: [PATCH 07/19] Add options record --- spec/amqproxy/config_spec.cr | 129 ++++++++++++++-------------------- spec/amqproxy/options_spec.cr | 32 +++++++++ src/amqproxy/cli.cr | 57 ++++++++------- src/amqproxy/config.cr | 120 ++++++++++++++----------------- src/amqproxy/options.cr | 41 +++++++++++ 5 files changed, 208 insertions(+), 171 deletions(-) create mode 100644 spec/amqproxy/options_spec.cr create mode 100644 src/amqproxy/options.cr diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index bcd3bbb..73b830e 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -3,10 +3,10 @@ require "../../src/amqproxy/config" describe AMQProxy::Config do it "loads defaults when no ini file, env vars or options are available" do - previous_argv = ARGV.clone - ARGV.clear + options = AMQProxy::Options.new + options = options.with(ini_file: "/tmp/non_existing_file.ini") - config = AMQProxy::Config.load_with_cli(ARGV, "/tmp/non_existing_file.ini") + config = AMQProxy::Config.load_with_cli(options) config.listen_address.should eq "localhost" config.listen_port.should eq 5673 @@ -16,17 +16,13 @@ describe AMQProxy::Config do config.term_timeout.should eq -1 config.term_client_close_timeout.should eq 0 config.upstream.should eq nil - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) end it "reads from empty config file returning default configuration" do - previous_argv = ARGV.clone - ARGV.clear + options = AMQProxy::Options.new + options = options.with(ini_file: "/tmp/config_empty.ini") - config = AMQProxy::Config.load_with_cli(ARGV, "/tmp/config_empty.ini") + config = AMQProxy::Config.load_with_cli(options) config.listen_address.should eq "localhost" config.listen_port.should eq 5673 @@ -36,15 +32,10 @@ describe AMQProxy::Config do config.term_timeout.should eq -1 config.term_client_close_timeout.should eq 0 config.upstream.should eq nil - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) end it "reads from environment variables and overrules ini file values" do - previous_argv = ARGV.clone - ARGV.clear + options = AMQProxy::Options.new ENV["LISTEN_ADDRESS"] = "example.com" ENV["LISTEN_PORT"] = "5674" @@ -55,7 +46,7 @@ describe AMQProxy::Config do ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" ENV["UPSTREAM"] = "amqp://localhost:5674" - config = AMQProxy::Config.load_with_cli(ARGV) + config = AMQProxy::Config.load_with_cli(options) config.listen_address.should eq "example.com" config.listen_port.should eq 5674 @@ -75,16 +66,9 @@ describe AMQProxy::Config do ENV.delete("TERM_TIMEOUT") ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") ENV.delete("UPSTREAM") - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) end it "reads from command line arguments and overrules env vars" do - previous_argv = ARGV.clone - ARGV.clear - ENV["LISTEN_ADDRESS"] = "example.com" ENV["LISTEN_PORT"] = "5674" ENV["HTTP_PORT"] = "15674" @@ -94,17 +78,19 @@ describe AMQProxy::Config do ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" ENV["UPSTREAM"] = "amqp://localhost:5674" - ARGV.concat([ - "--listen=example_arg.com", - "--port=5675", - "--http-port=15675", - "--log-level=Warn", - "--idle-connection-timeout=15", - "--term-timeout=16", - "--term-client-close-timeout=17", - "amqp://localhost:5679"]) + options = AMQProxy::Options.new + options = options.with( + listen_address: "example_arg.com", + listen_port: 5675, + http_port: 15675, + log_level: ::Log::Severity::Warn, + idle_connection_timeout: 15, + term_timeout: 16, + term_client_close_timeout: 17, + upstream: "amqp://localhost:5679" + ) - config = AMQProxy::Config.load_with_cli(ARGV) + config = AMQProxy::Config.load_with_cli(options) config.listen_address.should eq "example_arg.com" config.log_level.should eq ::Log::Severity::Warn @@ -124,28 +110,23 @@ describe AMQProxy::Config do ENV.delete("TERM_TIMEOUT") ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") ENV.delete("UPSTREAM") - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) end it "sets log level to debug when debug flag is present" do - previous_argv = ARGV.clone - ARGV.clear - - ARGV.concat([ - "--listen=example_arg.com", - "--port=5675", - "--http-port=15675", - "--log-level=Warn", - "--idle-connection-timeout=15", - "--term-timeout=16", - "--term-client-close-timeout=17", - "--debug", - "amqp://localhost:5679"]) - - config = AMQProxy::Config.load_with_cli(ARGV) + options = AMQProxy::Options.new + options = options.with( + listen_address: "example_arg.com", + listen_port: 5675, + http_port: 15675, + log_level: ::Log::Severity::Warn, + idle_connection_timeout: 15, + term_timeout: 16, + term_client_close_timeout: 17, + is_debug: true, + upstream: "amqp://localhost:5679" + ) + + config = AMQProxy::Config.load_with_cli(options) config.listen_address.should eq "example_arg.com" config.log_level.should eq ::Log::Severity::Debug @@ -155,35 +136,31 @@ describe AMQProxy::Config do config.term_timeout.should eq 16 config.term_client_close_timeout.should eq 17 config.upstream.should eq "amqp://localhost:5679" - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) end it "keeps the log level to trace when debug flag is present" do - previous_argv = ARGV.clone - ARGV.clear - - ARGV.concat([ - "--log-level=Trace", - "--debug"]) - - config = AMQProxy::Config.load_with_cli(ARGV) + options = AMQProxy::Options.new + options = options.with( + listen_address: "example_arg.com", + listen_port: 5675, + http_port: 15675, + log_level: ::Log::Severity::Trace, + idle_connection_timeout: 15, + term_timeout: 16, + term_client_close_timeout: 17, + is_debug: true, + upstream: "amqp://localhost:5679" + ) + + config = AMQProxy::Config.load_with_cli(options) config.log_level.should eq ::Log::Severity::Trace - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) end it "reads default ini file when ini file path is null" do - previous_argv = ARGV.clone - ARGV.clear - - init_file_path : String? = nil - config = AMQProxy::Config.load_with_cli(ARGV, init_file_path) + options = AMQProxy::Options.new + + config = AMQProxy::Config.load_with_cli(options) config.listen_address.should eq "127.0.0.2" config.listen_port.should eq 5678 @@ -193,9 +170,5 @@ describe AMQProxy::Config do config.term_timeout.should eq 56 config.term_client_close_timeout.should eq 57 config.upstream.should eq "amqp://localhost:5678" - - # Restore ARGV - ARGV.clear - ARGV.concat(previous_argv) end end diff --git a/spec/amqproxy/options_spec.cr b/spec/amqproxy/options_spec.cr new file mode 100644 index 0000000..73a3845 --- /dev/null +++ b/spec/amqproxy/options_spec.cr @@ -0,0 +1,32 @@ +require "spec" +require "../../src/amqproxy/options" + +describe AMQProxy::Options do + it "can be changed by with yielding a new options record with correct property values" do + options = AMQProxy::Options.new + + options = options.with( + listen_port: 34, + http_port: 35, + idle_connection_timeout: 36, + term_timeout: 37, + term_client_close_timeout: 38, + log_level: ::Log::Severity::Trace, + is_debug: true, + ini_file: "the_init_file.config", + listen_address: "listen.example.com", + upstream: "upstream.example.com:39" + ) + + options.listen_address.should eq "listen.example.com" + options.listen_port.should eq 34 + options.upstream.should eq "upstream.example.com:39" + options.http_port.should eq 35 + options.idle_connection_timeout.should eq 36 + options.term_timeout.should eq 37 + options.term_client_close_timeout.should eq 38 + options.log_level.should eq ::Log::Severity::Trace + options.is_debug.should eq true + options.ini_file.should eq "the_init_file.config" + end +end diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 7078277..70be6e5 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -1,4 +1,5 @@ require "./config" +require "./options" require "./version" require "./server" require "./http_server" @@ -16,30 +17,45 @@ class AMQProxy::CLI def run(argv) raise "run cant be called multiple times" unless @server.nil? - ini_file : String? = nil - # Need to clone the args, because OptionParser will modify them - argv_validation = argv.clone - - # validate options and get config ini file path when provided - p = OptionParser.parse(argv_validation) do |parser| - parser.banner = "Usage: amqproxy [options] [amqp upstream url]" - parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") { } - parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { } - parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { } - parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") { } - parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") { } - parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") { } - parser.on("--log-level=LEVEL", "The log level (default: info)") { } - parser.on("-d", "--debug", "Verbose logging") { } - parser.on("-c FILE", "--config=FILE", "Load config file") { |v| ini_file = v } + options = AMQProxy::Options.new + + p = OptionParser.parse(argv) do |parser| + parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| + options = options.with(listen_address: v) + end + parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| options = options.with(listen_port: v.to_i) } + parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| options = options.with(http_port: v.to_i) } + parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| + options = options.with(idle_connection_timeout: v.to_i) + end + parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| + options = options.with(term_timeout: v.to_i) + end + parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| + options = options.with(term_client_close_timeout: v.to_i) + end + parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| options = options.with(log_level: ::Log::Severity.parse(v)) } + parser.on("-d", "--debug", "Verbose logging") { options = options.with(is_debug: true) } + parser.on("-c FILE", "--config=FILE", "Load config file") { |v| options = options.with(ini_file: v) } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } end + options = options.with(upstream: argv.shift?) + # load cascading configuration: sequence defaults, file, env and cli - config = @config = AMQProxy::Config.load_with_cli(argv, ini_file) + config = @config = AMQProxy::Config.load_with_cli(options) + + log_backend = if ENV.has_key?("JOURNAL_STREAM") + ::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher) + else + ::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher) + end + ::Log.setup_from_env(default_level: config.log_level, backend: log_backend) + + Log.debug { config.inspect } upstream_url = config.upstream || abort p.to_s u = URI.parse upstream_url @@ -54,13 +70,6 @@ class AMQProxy::CLI port = u.port || default_port tls = u.scheme == "amqps" - log_backend = if ENV.has_key?("JOURNAL_STREAM") - ::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher) - else - ::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher) - end - ::Log.setup_from_env(default_level: config.log_level, backend: log_backend) - Signal::INT.trap &->self.initiate_shutdown(Signal) Signal::TERM.trap &->self.initiate_shutdown(Signal) diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index 097068d..6d38eec 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -4,58 +4,58 @@ require "option_parser" module AMQProxy record Config, - listen_address : String, - listen_port : Int32, - http_port : Int32, - log_level : Log::Severity, - idle_connection_timeout : Int32, - term_timeout : Int32, + listen_address : String, + listen_port : Int32, + http_port : Int32, + log_level : Log::Severity, + idle_connection_timeout : Int32, + term_timeout : Int32, term_client_close_timeout : Int32, - upstream : String? do + upstream : String? do # Factory method to create a Config with nullable parameters private def self.create( - listen_address : String? = nil, - listen_port : Int32? = nil, - http_port : Int32? = nil, - log_level : Log::Severity? = nil, - idle_connection_timeout : Int32? = nil, - term_timeout : Int32? = nil, - term_client_close_timeout : Int32? = nil, - upstream : String? = nil + listen_address : String? = nil, + listen_port : Int32? = nil, + http_port : Int32? = nil, + log_level : Log::Severity? = nil, + idle_connection_timeout : Int32? = nil, + term_timeout : Int32? = nil, + term_client_close_timeout : Int32? = nil, + upstream : String? = nil ) new( - listen_address || "localhost", - listen_port || 5673, - http_port || 15673, - log_level || Log::Severity::Info, - idle_connection_timeout || 5, - term_timeout || -1, + listen_address || "localhost", + listen_port || 5673, + http_port || 15673, + log_level || Log::Severity::Info, + idle_connection_timeout || 5, + term_timeout || -1, term_client_close_timeout || 0, - upstream || nil + upstream || nil ) end # Method to return a new instance with modified fields (like C# `with`) protected def with( - listen_address : String? = nil, - listen_port : Int32? = nil, - http_port : Int32? = nil, - log_level : Log::Severity? = nil, - idle_connection_timeout : Int32? = nil, - term_timeout : Int32? = nil, - term_client_close_timeout : Int32? = nil, - upstream : String? = nil + listen_address : String? = nil, + listen_port : Int32? = nil, + http_port : Int32? = nil, + log_level : Log::Severity? = nil, + idle_connection_timeout : Int32? = nil, + term_timeout : Int32? = nil, + term_client_close_timeout : Int32? = nil, + upstream : String? = nil ) Config.new( - listen_address || self.listen_address, - listen_port || self.listen_port, - http_port || self.http_port, - log_level || self.log_level, - idle_connection_timeout || self.idle_connection_timeout, - term_timeout || self.term_timeout, + listen_address || self.listen_address, + listen_port || self.listen_port, + http_port || self.http_port, + log_level || self.log_level, + idle_connection_timeout || self.idle_connection_timeout, + term_timeout || self.term_timeout, term_client_close_timeout || self.term_client_close_timeout, - upstream || self.upstream + upstream || self.upstream ) end @@ -109,50 +109,32 @@ module AMQProxy ) end - protected def load_from_options(args) + protected def load_from_options(options) config = self - Log.info { "listen_address: #{config.listen_address}" } - is_debug : Bool = false - - p = OptionParser.parse(args) do |parser| - parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| - config = config.with(listen_address: v) - end - parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| config = config.with(listen_port: v.to_i) } - parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| config = config.with(http_port: v.to_i) } - parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| - config = config.with(idle_connection_timeout: v.to_i) - end - parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| - config = config.with(term_timeout: v.to_i) - end - parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| - config = config.with(term_client_close_timeout: v.to_i) - end - parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| config = config.with(log_level: Log::Severity.parse(v)) } - parser.on("-d", "--debug", "Verbose logging") { is_debug = true } - end + config = config.with(listen_address: options.listen_address, + listen_port: options.listen_port, + http_port: options.http_port, + idle_connection_timeout: options.idle_connection_timeout, + term_timeout: options.term_timeout, + term_client_close_timeout: options.term_client_close_timeout, + log_level: options.log_level, + upstream: options.upstream) # the debug flag overrules the log level. Only set the level # when it is not already set to debug or trace - if (is_debug && config.log_level > Log::Severity::Debug) + if (options.is_debug && config.log_level > Log::Severity::Debug) config = config.with(log_level: Log::Severity::Debug) end - Log.info { "listen_address: #{config.listen_address}" } - config end - def self.load_with_cli(args, path : String? = nil) - config = self.create() - .load_from_file(path || "config.ini") + def self.load_with_cli(options : Options) + self.create() + .load_from_file(options.ini_file || "config.ini") .load_from_env() - .load_from_options(args) - .with(upstream: args.shift?) - - config + .load_from_options(options) end end end diff --git a/src/amqproxy/options.cr b/src/amqproxy/options.cr new file mode 100644 index 0000000..a9504bc --- /dev/null +++ b/src/amqproxy/options.cr @@ -0,0 +1,41 @@ +module AMQProxy + record Options, + listen_address : String? = nil, + listen_port : Int32? = nil, + http_port : Int32? = nil, + idle_connection_timeout : Int32? = nil, + term_timeout : Int32? = nil, + term_client_close_timeout : Int32? = nil, + log_level : Log::Severity? = nil, + is_debug : Bool = false, + ini_file : String? = nil, + upstream : String? = nil do + + # Define `with` method to allow selective modifications + def with( + listen_address : String? = self.listen_address, + listen_port : Int32? = self.listen_port, + http_port : Int32? = self.http_port, + idle_connection_timeout : Int32? = self.idle_connection_timeout, + term_timeout : Int32? = self.term_timeout, + term_client_close_timeout : Int32? = self.term_client_close_timeout, + log_level : Log::Severity? = self.log_level, + is_debug : Bool = self.is_debug, + ini_file : String? = self.ini_file, + upstream : String? = self.upstream + ) + Options.new( + listen_address, + listen_port, + http_port, + idle_connection_timeout, + term_timeout, + term_client_close_timeout, + log_level, + is_debug, + ini_file, + upstream + ) + end + end +end From 8a0187aec7311825293a78c37a0243991b472639 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 29 Mar 2025 10:58:32 +0100 Subject: [PATCH 08/19] move http_port to listen section add AMQP_URL to upstream setting --- spec/amqproxy/config_spec.cr | 15 +++++++++++++++ spec/config.ini | 2 +- src/amqproxy/config.cr | 4 ++-- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index 73b830e..54e16bb 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -34,6 +34,21 @@ describe AMQProxy::Config do config.upstream.should eq nil end + it "reads from environment variables and use AMPQ_URL over UPSTREAM variable" do + options = AMQProxy::Options.new + + ENV["AMQP_URL"] = "amqp://localhost:5673" + ENV["UPSTREAM"] = "amqp://localhost:5674" + + config = AMQProxy::Config.load_with_cli(options) + + config.upstream.should eq "amqp://localhost:5673" + + # Clean up + ENV.delete("AMQP_URL") + ENV.delete("UPSTREAM") + end + it "reads from environment variables and overrules ini file values" do options = AMQProxy::Options.new diff --git a/spec/config.ini b/spec/config.ini index 89193fc..ca38d1c 100644 --- a/spec/config.ini +++ b/spec/config.ini @@ -1,6 +1,5 @@ [main] log_level = debug -http_port = 15678 idle_connection_timeout = 55 term_timeout = 56 term_client_close_timeout = 57 @@ -10,4 +9,5 @@ upstream = amqp://localhost:5678 bind = 127.0.0.1 address = 127.0.0.2 port = 5678 +http_port = 15678 log_level = debug diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index 6d38eec..912b564 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -69,7 +69,6 @@ module AMQProxy when "main", "" section.each do |key, value| case key - when "http_port" then config = config.with(http_port: value.to_i) when "upstream" then config = config.with(upstream: value) when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) when "idle_connection_timeout" then config = config.with(idle_connection_timeout: value.to_i) @@ -81,6 +80,7 @@ module AMQProxy when "listen" section.each do |key, value| case key + when "http_port" then config = config.with(http_port: value.to_i) when "port" then config = config.with(listen_port: value.to_i) when "bind", "address" then config = config.with(listen_address: value) when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) @@ -105,7 +105,7 @@ module AMQProxy idle_connection_timeout: ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i, term_timeout: ENV["TERM_TIMEOUT"]?.try &.to_i, term_client_close_timeout: ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i, - upstream: ENV["UPSTREAM"]? + upstream: ENV["AMQP_URL"]? || ENV["UPSTREAM"]? ) end From 43e26ebde00e74fe7e46f6dd217f1708a9244f3e Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 29 Mar 2025 11:05:11 +0100 Subject: [PATCH 09/19] removed trailing space --- src/amqproxy/cli.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 70be6e5..79c5dc7 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -126,7 +126,7 @@ class AMQProxy::CLI end def wait_for_clients_to_close(close_timeout) - unless server = @server + unless server = @server raise "Can't call shutdown before run" end Log.info { "Waiting for clients to close their connections." } From 09d829105fdd3f8c39123709b2c0706a22acba4c Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Mon, 31 Mar 2025 08:20:20 +0200 Subject: [PATCH 10/19] Run ci on pull requests --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 059f78a..f0816fa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,5 +1,8 @@ name: CI on: + pull_request: + branches: + - main push: paths: - 'run-specs-in-docker.sh' From 71a05d13ede253cebc14e3562bf76215313cd290 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 5 Apr 2025 11:02:34 +0200 Subject: [PATCH 11/19] Rework Options as struct --- spec/amqproxy/config_spec.cr | 63 ++++++++++++++++------------------- spec/amqproxy/options_spec.cr | 32 ------------------ src/amqproxy/cli.cr | 36 +++++++++++--------- src/amqproxy/options.cr | 51 +++++++--------------------- 4 files changed, 62 insertions(+), 120 deletions(-) delete mode 100644 spec/amqproxy/options_spec.cr diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index 54e16bb..a93425d 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -1,10 +1,11 @@ require "spec" require "../../src/amqproxy/config" +require "../../src/amqproxy/options" describe AMQProxy::Config do it "loads defaults when no ini file, env vars or options are available" do options = AMQProxy::Options.new - options = options.with(ini_file: "/tmp/non_existing_file.ini") + options.ini_file = "/tmp/non_existing_file.ini" config = AMQProxy::Config.load_with_cli(options) @@ -20,7 +21,7 @@ describe AMQProxy::Config do it "reads from empty config file returning default configuration" do options = AMQProxy::Options.new - options = options.with(ini_file: "/tmp/config_empty.ini") + options.ini_file = "/tmp/config_empty.ini" config = AMQProxy::Config.load_with_cli(options) @@ -94,16 +95,14 @@ describe AMQProxy::Config do ENV["UPSTREAM"] = "amqp://localhost:5674" options = AMQProxy::Options.new - options = options.with( - listen_address: "example_arg.com", - listen_port: 5675, - http_port: 15675, - log_level: ::Log::Severity::Warn, - idle_connection_timeout: 15, - term_timeout: 16, - term_client_close_timeout: 17, - upstream: "amqp://localhost:5679" - ) + options.listen_address = "example_arg.com" + options.listen_port = 5675 + options.http_port = 15675 + options.log_level = ::Log::Severity::Warn + options.idle_connection_timeout = 15 + options.term_timeout = 16 + options.term_client_close_timeout = 17 + options.upstream = "amqp://localhost:5679" config = AMQProxy::Config.load_with_cli(options) @@ -129,17 +128,15 @@ describe AMQProxy::Config do it "sets log level to debug when debug flag is present" do options = AMQProxy::Options.new - options = options.with( - listen_address: "example_arg.com", - listen_port: 5675, - http_port: 15675, - log_level: ::Log::Severity::Warn, - idle_connection_timeout: 15, - term_timeout: 16, - term_client_close_timeout: 17, - is_debug: true, - upstream: "amqp://localhost:5679" - ) + options.listen_address = "example_arg.com" + options.listen_port = 5675 + options.http_port = 15675 + options.log_level = ::Log::Severity::Warn + options.idle_connection_timeout = 15 + options.term_timeout = 16 + options.term_client_close_timeout = 17 + options.is_debug = true + options.upstream = "amqp://localhost:5679" config = AMQProxy::Config.load_with_cli(options) @@ -155,17 +152,15 @@ describe AMQProxy::Config do it "keeps the log level to trace when debug flag is present" do options = AMQProxy::Options.new - options = options.with( - listen_address: "example_arg.com", - listen_port: 5675, - http_port: 15675, - log_level: ::Log::Severity::Trace, - idle_connection_timeout: 15, - term_timeout: 16, - term_client_close_timeout: 17, - is_debug: true, - upstream: "amqp://localhost:5679" - ) + options.listen_address = "example_arg.com" + options.listen_port = 5675 + options.http_port = 15675 + options.log_level = ::Log::Severity::Trace + options.idle_connection_timeout = 15 + options.term_timeout = 16 + options.term_client_close_timeout = 17 + options.is_debug = true + options.upstream = "amqp://localhost:5679" config = AMQProxy::Config.load_with_cli(options) diff --git a/spec/amqproxy/options_spec.cr b/spec/amqproxy/options_spec.cr deleted file mode 100644 index 73a3845..0000000 --- a/spec/amqproxy/options_spec.cr +++ /dev/null @@ -1,32 +0,0 @@ -require "spec" -require "../../src/amqproxy/options" - -describe AMQProxy::Options do - it "can be changed by with yielding a new options record with correct property values" do - options = AMQProxy::Options.new - - options = options.with( - listen_port: 34, - http_port: 35, - idle_connection_timeout: 36, - term_timeout: 37, - term_client_close_timeout: 38, - log_level: ::Log::Severity::Trace, - is_debug: true, - ini_file: "the_init_file.config", - listen_address: "listen.example.com", - upstream: "upstream.example.com:39" - ) - - options.listen_address.should eq "listen.example.com" - options.listen_port.should eq 34 - options.upstream.should eq "upstream.example.com:39" - options.http_port.should eq 35 - options.idle_connection_timeout.should eq 36 - options.term_timeout.should eq 37 - options.term_client_close_timeout.should eq 38 - options.log_level.should eq ::Log::Severity::Trace - options.is_debug.should eq true - options.ini_file.should eq "the_init_file.config" - end -end diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 79c5dc7..f396380 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -14,38 +14,44 @@ class AMQProxy::CLI @config : AMQProxy::Config? = nil @server : AMQProxy::Server? = nil - def run(argv) - raise "run cant be called multiple times" unless @server.nil? - - # Need to clone the args, because OptionParser will modify them + def load_options(argv) options = AMQProxy::Options.new p = OptionParser.parse(argv) do |parser| parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| - options = options.with(listen_address: v) + options.listen_address = v end - parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| options = options.with(listen_port: v.to_i) } - parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| options = options.with(http_port: v.to_i) } + parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| options.listen_port = v.to_i } + parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| options.http_port = v.to_i } parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| - options = options.with(idle_connection_timeout: v.to_i) + options.idle_connection_timeout = v.to_i end parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| - options = options.with(term_timeout: v.to_i) + options.term_timeout = v.to_i end parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| - options = options.with(term_client_close_timeout: v.to_i) + options.term_client_close_timeout = v.to_i end - parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| options = options.with(log_level: ::Log::Severity.parse(v)) } - parser.on("-d", "--debug", "Verbose logging") { options = options.with(is_debug: true) } - parser.on("-c FILE", "--config=FILE", "Load config file") { |v| options = options.with(ini_file: v) } + parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| options.log_level = ::Log::Severity.parse(v) } + parser.on("-d", "--debug", "Verbose logging") { options.is_debug = true } + parser.on("-c FILE", "--config=FILE", "Load config file") { |v| options.ini_file = v } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } end - options = options.with(upstream: argv.shift?) + options.upstream = argv.shift? + + options + end + + def run(argv) + raise "run cant be called multiple times" unless @server.nil? + + # load options from command line arguments + options = load_options(argv) - # load cascading configuration: sequence defaults, file, env and cli + # load cascading configuration. load sequence: defaults -> file -> env -> cli config = @config = AMQProxy::Config.load_with_cli(options) log_backend = if ENV.has_key?("JOURNAL_STREAM") diff --git a/src/amqproxy/options.cr b/src/amqproxy/options.cr index a9504bc..013b3cb 100644 --- a/src/amqproxy/options.cr +++ b/src/amqproxy/options.cr @@ -1,41 +1,14 @@ module AMQProxy - record Options, - listen_address : String? = nil, - listen_port : Int32? = nil, - http_port : Int32? = nil, - idle_connection_timeout : Int32? = nil, - term_timeout : Int32? = nil, - term_client_close_timeout : Int32? = nil, - log_level : Log::Severity? = nil, - is_debug : Bool = false, - ini_file : String? = nil, - upstream : String? = nil do - - # Define `with` method to allow selective modifications - def with( - listen_address : String? = self.listen_address, - listen_port : Int32? = self.listen_port, - http_port : Int32? = self.http_port, - idle_connection_timeout : Int32? = self.idle_connection_timeout, - term_timeout : Int32? = self.term_timeout, - term_client_close_timeout : Int32? = self.term_client_close_timeout, - log_level : Log::Severity? = self.log_level, - is_debug : Bool = self.is_debug, - ini_file : String? = self.ini_file, - upstream : String? = self.upstream - ) - Options.new( - listen_address, - listen_port, - http_port, - idle_connection_timeout, - term_timeout, - term_client_close_timeout, - log_level, - is_debug, - ini_file, - upstream - ) - end - end + struct Options + property listen_address : String? = nil + property listen_port : Int32? = nil + property http_port : Int32? = nil + property idle_connection_timeout : Int32? = nil + property term_timeout : Int32? = nil + property term_client_close_timeout : Int32? = nil + property log_level : Log::Severity? = nil + property is_debug : Bool = false + property ini_file : String? = nil + property upstream : String? = nil + end end From efe7f516c13a29da6aaad35afb8bcb228c49e354 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 5 Apr 2025 15:51:43 +0200 Subject: [PATCH 12/19] change config to struct with properties --- src/amqproxy/config.cr | 138 ++++++++++++++--------------------------- 1 file changed, 45 insertions(+), 93 deletions(-) diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index 912b564..2f10076 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -3,87 +3,41 @@ require "log" require "option_parser" module AMQProxy - record Config, - listen_address : String, - listen_port : Int32, - http_port : Int32, - log_level : Log::Severity, - idle_connection_timeout : Int32, - term_timeout : Int32, - term_client_close_timeout : Int32, - upstream : String? do + struct Config + getter listen_address : String = "localhost" + getter listen_port : Int32 = 5673 + getter http_port : Int32 = 15673 + getter log_level : Log::Severity = Log::Severity::Info + getter idle_connection_timeout : Int32 = 5 + getter term_timeout : Int32 = -1 + getter term_client_close_timeout : Int32 = 0 + getter upstream : String? - # Factory method to create a Config with nullable parameters - private def self.create( - listen_address : String? = nil, - listen_port : Int32? = nil, - http_port : Int32? = nil, - log_level : Log::Severity? = nil, - idle_connection_timeout : Int32? = nil, - term_timeout : Int32? = nil, - term_client_close_timeout : Int32? = nil, - upstream : String? = nil - ) - new( - listen_address || "localhost", - listen_port || 5673, - http_port || 15673, - log_level || Log::Severity::Info, - idle_connection_timeout || 5, - term_timeout || -1, - term_client_close_timeout || 0, - upstream || nil - ) - end - - # Method to return a new instance with modified fields (like C# `with`) - protected def with( - listen_address : String? = nil, - listen_port : Int32? = nil, - http_port : Int32? = nil, - log_level : Log::Severity? = nil, - idle_connection_timeout : Int32? = nil, - term_timeout : Int32? = nil, - term_client_close_timeout : Int32? = nil, - upstream : String? = nil - ) - Config.new( - listen_address || self.listen_address, - listen_port || self.listen_port, - http_port || self.http_port, - log_level || self.log_level, - idle_connection_timeout || self.idle_connection_timeout, - term_timeout || self.term_timeout, - term_client_close_timeout || self.term_client_close_timeout, - upstream || self.upstream - ) - end - - protected def load_from_file(path : String) # ameba:disable Metrics/CyclomaticComplexity - return self unless File.exists?(path) - - config = self + protected def load_from_file(path : String?) # ameba:disable Metrics/CyclomaticComplexity + if (path.nil? || path.empty? || !File.exists?(path)) + return self + end INI.parse(File.read(path)).each do |name, section| case name when "main", "" section.each do |key, value| case key - when "upstream" then config = config.with(upstream: value) - when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) - when "idle_connection_timeout" then config = config.with(idle_connection_timeout: value.to_i) - when "term_timeout" then config = config.with(term_timeout: value.to_i) - when "term_client_close_timeout" then config = config.with(term_client_close_timeout: value.to_i) + when "upstream" then @upstream = value + when "log_level" then @log_level = ::Log::Severity.parse(value) + when "idle_connection_timeout" then @idle_connection_timeout = value.to_i + when "term_timeout" then @term_timeout = value.to_i + when "term_client_close_timeout" then @term_client_close_timeout = value.to_i else raise "Unsupported config #{name}/#{key}" end end when "listen" section.each do |key, value| case key - when "http_port" then config = config.with(http_port: value.to_i) - when "port" then config = config.with(listen_port: value.to_i) - when "bind", "address" then config = config.with(listen_address: value) - when "log_level" then config = config.with(log_level: ::Log::Severity.parse(value)) + when "http_port" then @http_port = value.to_i + when "port" then @listen_port = value.to_i + when "bind", "address" then @listen_address = value + when "log_level" then @log_level = ::Log::Severity.parse(value) else raise "Unsupported config #{name}/#{key}" end end @@ -91,47 +45,45 @@ module AMQProxy end end - config + self rescue ex abort ex.message end protected def load_from_env - self.with( - listen_address: ENV["LISTEN_ADDRESS"]?, - listen_port: ENV["LISTEN_PORT"]?.try &.to_i, - http_port: ENV["HTTP_PORT"]?.try &.to_i, - log_level: Log::Severity.parse(ENV["LOG_LEVEL"]? || self.log_level.to_s), - idle_connection_timeout: ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i, - term_timeout: ENV["TERM_TIMEOUT"]?.try &.to_i, - term_client_close_timeout: ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i, - upstream: ENV["AMQP_URL"]? || ENV["UPSTREAM"]? - ) + @listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address + @listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port + @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port + @log_level = Log::Severity.parse(ENV["LOG_LEVEL"]? || self.log_level.to_s) || @log_level + @idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout + @term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout + @term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout + @upstream = ENV["AMQP_URL"]? || ENV["UPSTREAM"]? || @upstream + + self end protected def load_from_options(options) - config = self - - config = config.with(listen_address: options.listen_address, - listen_port: options.listen_port, - http_port: options.http_port, - idle_connection_timeout: options.idle_connection_timeout, - term_timeout: options.term_timeout, - term_client_close_timeout: options.term_client_close_timeout, - log_level: options.log_level, - upstream: options.upstream) + @listen_address = options.listen_address || @listen_address + @listen_port = options.listen_port || @listen_port + @http_port = options.http_port || @http_port + @idle_connection_timeout = options.idle_connection_timeout || @idle_connection_timeout + @term_timeout = options.term_timeout || @term_timeout + @term_client_close_timeout = options.term_client_close_timeout || @term_client_close_timeout + @log_level = options.log_level || @log_level + @upstream = options.upstream || @upstream # the debug flag overrules the log level. Only set the level # when it is not already set to debug or trace - if (options.is_debug && config.log_level > Log::Severity::Debug) - config = config.with(log_level: Log::Severity::Debug) + if (options.is_debug && log_level > Log::Severity::Debug) + @log_level = Log::Severity::Debug end - config + self end def self.load_with_cli(options : Options) - self.create() + new() .load_from_file(options.ini_file || "config.ini") .load_from_env() .load_from_options(options) From a91687158a23aac35118b1ddd47f41cc2c1a96bb Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 5 Apr 2025 16:30:49 +0200 Subject: [PATCH 13/19] Fix lint issues --- spec/amqproxy/config_spec.cr | 8 ++++---- src/amqproxy/cli.cr | 10 ++++++---- src/amqproxy/config.cr | 8 ++++---- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index a93425d..342f583 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -24,7 +24,7 @@ describe AMQProxy::Config do options.ini_file = "/tmp/config_empty.ini" config = AMQProxy::Config.load_with_cli(options) - + config.listen_address.should eq "localhost" config.listen_port.should eq 5673 config.http_port.should eq 15673 @@ -40,7 +40,7 @@ describe AMQProxy::Config do ENV["AMQP_URL"] = "amqp://localhost:5673" ENV["UPSTREAM"] = "amqp://localhost:5674" - + config = AMQProxy::Config.load_with_cli(options) config.upstream.should eq "amqp://localhost:5673" @@ -61,7 +61,7 @@ describe AMQProxy::Config do ENV["TERM_TIMEOUT"] = "13" ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" ENV["UPSTREAM"] = "amqp://localhost:5674" - + config = AMQProxy::Config.load_with_cli(options) config.listen_address.should eq "example.com" @@ -169,7 +169,7 @@ describe AMQProxy::Config do it "reads default ini file when ini file path is null" do options = AMQProxy::Options.new - + config = AMQProxy::Config.load_with_cli(options) config.listen_address.should eq "127.0.0.2" diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 90da01e..060276b 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -17,7 +17,7 @@ class AMQProxy::CLI def load_options(argv) options = AMQProxy::Options.new - p = OptionParser.parse(argv) do |parser| + OptionParser.parse(argv) do |parser| parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| options.listen_address = v end @@ -44,7 +44,7 @@ class AMQProxy::CLI options end - + def run(argv) raise "run cant be called multiple times" unless @server.nil? @@ -63,7 +63,7 @@ class AMQProxy::CLI Log.debug { config.inspect } - upstream_url = config.upstream || abort p.to_s + upstream_url = config.upstream || abort "Upstream AMQP url is required. Add -h switch for help." u = URI.parse upstream_url abort "Invalid upstream URL" unless u.host @@ -112,7 +112,9 @@ class AMQProxy::CLI raise "Can't call shutdown before run" end - config = @config.not_nil! + unless config = @config + raise "Configuration has not been loaded" + end if server.client_connections > 0 if config.term_client_close_timeout > 0 diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index 2f10076..a743384 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -14,7 +14,7 @@ module AMQProxy getter upstream : String? protected def load_from_file(path : String?) # ameba:disable Metrics/CyclomaticComplexity - if (path.nil? || path.empty? || !File.exists?(path)) + if path.nil? || path.empty? || !File.exists?(path) return self end @@ -50,7 +50,7 @@ module AMQProxy abort ex.message end - protected def load_from_env + protected def load_from_env # ameba:disable Metrics/CyclomaticComplexity @listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address @listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port @@ -63,7 +63,7 @@ module AMQProxy self end - protected def load_from_options(options) + protected def load_from_options(options) # ameba:disable Metrics/CyclomaticComplexity @listen_address = options.listen_address || @listen_address @listen_port = options.listen_port || @listen_port @http_port = options.http_port || @http_port @@ -75,7 +75,7 @@ module AMQProxy # the debug flag overrules the log level. Only set the level # when it is not already set to debug or trace - if (options.is_debug && log_level > Log::Severity::Debug) + if options.is_debug && log_level > Log::Severity::Debug @log_level = Log::Severity::Debug end From 43b7f0c880e17043c1cec66a230a38cad99516a9 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 5 Apr 2025 16:47:01 +0200 Subject: [PATCH 14/19] run format --- src/amqproxy/cli.cr | 8 ++++---- src/amqproxy/config.cr | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 060276b..e83a9e8 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -55,10 +55,10 @@ class AMQProxy::CLI config = @config = AMQProxy::Config.load_with_cli(options) log_backend = if ENV.has_key?("JOURNAL_STREAM") - ::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher) - else - ::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher) - end + ::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher) + else + ::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher) + end ::Log.setup_from_env(default_level: config.log_level, backend: log_backend) Log.debug { config.inspect } diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index a743384..ee97c05 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -85,7 +85,7 @@ module AMQProxy def self.load_with_cli(options : Options) new() .load_from_file(options.ini_file || "config.ini") - .load_from_env() + .load_from_env .load_from_options(options) end end From 3d538b76f96bc155657f5f5fcfd0fc4af812aa57 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 5 Apr 2025 16:57:50 +0200 Subject: [PATCH 15/19] fix boolean syntax --- spec/amqproxy/config_spec.cr | 4 ++-- src/amqproxy/cli.cr | 2 +- src/amqproxy/config.cr | 2 +- src/amqproxy/options.cr | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index 342f583..9d04a7a 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -135,7 +135,7 @@ describe AMQProxy::Config do options.idle_connection_timeout = 15 options.term_timeout = 16 options.term_client_close_timeout = 17 - options.is_debug = true + options.debug = true options.upstream = "amqp://localhost:5679" config = AMQProxy::Config.load_with_cli(options) @@ -159,7 +159,7 @@ describe AMQProxy::Config do options.idle_connection_timeout = 15 options.term_timeout = 16 options.term_client_close_timeout = 17 - options.is_debug = true + options.debug = true options.upstream = "amqp://localhost:5679" config = AMQProxy::Config.load_with_cli(options) diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index e83a9e8..1082a1b 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -33,7 +33,7 @@ class AMQProxy::CLI options.term_client_close_timeout = v.to_i end parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| options.log_level = ::Log::Severity.parse(v) } - parser.on("-d", "--debug", "Verbose logging") { options.is_debug = true } + parser.on("-d", "--debug", "Verbose logging") { options.debug = true } parser.on("-c FILE", "--config=FILE", "Load config file") { |v| options.ini_file = v } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index ee97c05..da218d1 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -75,7 +75,7 @@ module AMQProxy # the debug flag overrules the log level. Only set the level # when it is not already set to debug or trace - if options.is_debug && log_level > Log::Severity::Debug + if options.debug? && log_level > Log::Severity::Debug @log_level = Log::Severity::Debug end diff --git a/src/amqproxy/options.cr b/src/amqproxy/options.cr index 013b3cb..878e858 100644 --- a/src/amqproxy/options.cr +++ b/src/amqproxy/options.cr @@ -7,7 +7,7 @@ module AMQProxy property term_timeout : Int32? = nil property term_client_close_timeout : Int32? = nil property log_level : Log::Severity? = nil - property is_debug : Bool = false + property? debug : Bool = false property ini_file : String? = nil property upstream : String? = nil end From 933e3104b9a99f81b7adafc35bdaed4bfa718786 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sat, 12 Apr 2025 21:51:03 +0200 Subject: [PATCH 16/19] Process review feedback. Removed option record, process cli options in the Configu class --- spec/amqproxy/config_spec.cr | 145 ++++++++++++++++++----------------- src/amqproxy/cli.cr | 37 +-------- src/amqproxy/config.cr | 79 +++++++++++-------- 3 files changed, 123 insertions(+), 138 deletions(-) diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index 9d04a7a..288bacf 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -1,13 +1,15 @@ require "spec" require "../../src/amqproxy/config" -require "../../src/amqproxy/options" describe AMQProxy::Config do it "loads defaults when no ini file, env vars or options are available" do - options = AMQProxy::Options.new - options.ini_file = "/tmp/non_existing_file.ini" + previous_argv = ARGV.clone + ARGV.clear - config = AMQProxy::Config.load_with_cli(options) + ARGV.concat([ + "--config=/tmp/non_existing_file.ini"]) + + config = AMQProxy::Config.load_with_cli(ARGV) config.listen_address.should eq "localhost" config.listen_port.should eq 5673 @@ -17,14 +19,20 @@ describe AMQProxy::Config do config.term_timeout.should eq -1 config.term_client_close_timeout.should eq 0 config.upstream.should eq nil + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) end it "reads from empty config file returning default configuration" do - options = AMQProxy::Options.new - options.ini_file = "/tmp/config_empty.ini" + previous_argv = ARGV.clone + ARGV.clear - config = AMQProxy::Config.load_with_cli(options) + ARGV.concat(["--config=/tmp/config_empty.ini"]) + config = AMQProxy::Config.load_with_cli(ARGV) + config.listen_address.should eq "localhost" config.listen_port.should eq 5673 config.http_port.should eq 15673 @@ -33,25 +41,15 @@ describe AMQProxy::Config do config.term_timeout.should eq -1 config.term_client_close_timeout.should eq 0 config.upstream.should eq nil - end - it "reads from environment variables and use AMPQ_URL over UPSTREAM variable" do - options = AMQProxy::Options.new - - ENV["AMQP_URL"] = "amqp://localhost:5673" - ENV["UPSTREAM"] = "amqp://localhost:5674" - - config = AMQProxy::Config.load_with_cli(options) - - config.upstream.should eq "amqp://localhost:5673" - - # Clean up - ENV.delete("AMQP_URL") - ENV.delete("UPSTREAM") + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) end it "reads from environment variables and overrules ini file values" do - options = AMQProxy::Options.new + previous_argv = ARGV.clone + ARGV.clear ENV["LISTEN_ADDRESS"] = "example.com" ENV["LISTEN_PORT"] = "5674" @@ -61,8 +59,8 @@ describe AMQProxy::Config do ENV["TERM_TIMEOUT"] = "13" ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" ENV["UPSTREAM"] = "amqp://localhost:5674" - - config = AMQProxy::Config.load_with_cli(options) + + config = AMQProxy::Config.load_with_cli(ARGV) config.listen_address.should eq "example.com" config.listen_port.should eq 5674 @@ -82,9 +80,16 @@ describe AMQProxy::Config do ENV.delete("TERM_TIMEOUT") ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") ENV.delete("UPSTREAM") + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) end it "reads from command line arguments and overrules env vars" do + previous_argv = ARGV.clone + ARGV.clear + ENV["LISTEN_ADDRESS"] = "example.com" ENV["LISTEN_PORT"] = "5674" ENV["HTTP_PORT"] = "15674" @@ -94,17 +99,17 @@ describe AMQProxy::Config do ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" ENV["UPSTREAM"] = "amqp://localhost:5674" - options = AMQProxy::Options.new - options.listen_address = "example_arg.com" - options.listen_port = 5675 - options.http_port = 15675 - options.log_level = ::Log::Severity::Warn - options.idle_connection_timeout = 15 - options.term_timeout = 16 - options.term_client_close_timeout = 17 - options.upstream = "amqp://localhost:5679" + ARGV.concat([ + "--listen=example_arg.com", + "--port=5675", + "--http-port=15675", + "--log-level=Warn", + "--idle-connection-timeout=15", + "--term-timeout=16", + "--term-client-close-timeout=17", + "amqp://localhost:5679"]) - config = AMQProxy::Config.load_with_cli(options) + config = AMQProxy::Config.load_with_cli(ARGV) config.listen_address.should eq "example_arg.com" config.log_level.should eq ::Log::Severity::Warn @@ -124,21 +129,28 @@ describe AMQProxy::Config do ENV.delete("TERM_TIMEOUT") ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") ENV.delete("UPSTREAM") + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) end it "sets log level to debug when debug flag is present" do - options = AMQProxy::Options.new - options.listen_address = "example_arg.com" - options.listen_port = 5675 - options.http_port = 15675 - options.log_level = ::Log::Severity::Warn - options.idle_connection_timeout = 15 - options.term_timeout = 16 - options.term_client_close_timeout = 17 - options.debug = true - options.upstream = "amqp://localhost:5679" - - config = AMQProxy::Config.load_with_cli(options) + previous_argv = ARGV.clone + ARGV.clear + + ARGV.concat([ + "--listen=example_arg.com", + "--port=5675", + "--http-port=15675", + "--log-level=Warn", + "--idle-connection-timeout=15", + "--term-timeout=16", + "--term-client-close-timeout=17", + "--debug", + "amqp://localhost:5679"]) + + config = AMQProxy::Config.load_with_cli(ARGV) config.listen_address.should eq "example_arg.com" config.log_level.should eq ::Log::Severity::Debug @@ -148,37 +160,26 @@ describe AMQProxy::Config do config.term_timeout.should eq 16 config.term_client_close_timeout.should eq 17 config.upstream.should eq "amqp://localhost:5679" + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) end it "keeps the log level to trace when debug flag is present" do - options = AMQProxy::Options.new - options.listen_address = "example_arg.com" - options.listen_port = 5675 - options.http_port = 15675 - options.log_level = ::Log::Severity::Trace - options.idle_connection_timeout = 15 - options.term_timeout = 16 - options.term_client_close_timeout = 17 - options.debug = true - options.upstream = "amqp://localhost:5679" - - config = AMQProxy::Config.load_with_cli(options) + previous_argv = ARGV.clone + ARGV.clear - config.log_level.should eq ::Log::Severity::Trace - end - - it "reads default ini file when ini file path is null" do - options = AMQProxy::Options.new + ARGV.concat([ + "--log-level=Trace", + "--debug"]) - config = AMQProxy::Config.load_with_cli(options) + config = AMQProxy::Config.load_with_cli(ARGV) - config.listen_address.should eq "127.0.0.2" - config.listen_port.should eq 5678 - config.http_port.should eq 15678 - config.log_level.should eq ::Log::Severity::Debug - config.idle_connection_timeout.should eq 55 - config.term_timeout.should eq 56 - config.term_client_close_timeout.should eq 57 - config.upstream.should eq "amqp://localhost:5678" + config.log_level.should eq ::Log::Severity::Trace + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) end end diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 1082a1b..511a0ef 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -1,5 +1,4 @@ require "./config" -require "./options" require "./version" require "./server" require "./http_server" @@ -14,45 +13,11 @@ class AMQProxy::CLI @config : AMQProxy::Config? = nil @server : AMQProxy::Server? = nil - def load_options(argv) - options = AMQProxy::Options.new - - OptionParser.parse(argv) do |parser| - parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| - options.listen_address = v - end - parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| options.listen_port = v.to_i } - parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| options.http_port = v.to_i } - parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| - options.idle_connection_timeout = v.to_i - end - parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| - options.term_timeout = v.to_i - end - parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| - options.term_client_close_timeout = v.to_i - end - parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| options.log_level = ::Log::Severity.parse(v) } - parser.on("-d", "--debug", "Verbose logging") { options.debug = true } - parser.on("-c FILE", "--config=FILE", "Load config file") { |v| options.ini_file = v } - parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } - parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } - parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } - end - - options.upstream = argv.shift? - - options - end - def run(argv) raise "run cant be called multiple times" unless @server.nil? - # load options from command line arguments - options = load_options(argv) - # load cascading configuration. load sequence: defaults -> file -> env -> cli - config = @config = AMQProxy::Config.load_with_cli(options) + config = @config = AMQProxy::Config.load_with_cli(argv) log_backend = if ENV.has_key?("JOURNAL_STREAM") ::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher) diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index da218d1..1d505ea 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -3,22 +3,24 @@ require "log" require "option_parser" module AMQProxy - struct Config - getter listen_address : String = "localhost" - getter listen_port : Int32 = 5673 - getter http_port : Int32 = 15673 - getter log_level : Log::Severity = Log::Severity::Info - getter idle_connection_timeout : Int32 = 5 - getter term_timeout : Int32 = -1 - getter term_client_close_timeout : Int32 = 0 - getter upstream : String? + class Config + getter listen_address = "localhost" + getter listen_port = 5673 + getter http_port = 15673 + getter log_level = Log::Severity::Info + getter idle_connection_timeout = 5 + getter term_timeout = -1 + getter term_client_close_timeout = 0 + getter upstream : String? = nil + getter debug = false + getter config_file = "config.ini" - protected def load_from_file(path : String?) # ameba:disable Metrics/CyclomaticComplexity - if path.nil? || path.empty? || !File.exists?(path) + protected def load_from_file # ameba:disable Metrics/CyclomaticComplexity + if config_file.empty? || !File.exists?(config_file) return self end - INI.parse(File.read(path)).each do |name, section| + INI.parse(File.read(config_file)).each do |name, section| case name when "main", "" section.each do |key, value| @@ -46,8 +48,6 @@ module AMQProxy end self - rescue ex - abort ex.message end protected def load_from_env # ameba:disable Metrics/CyclomaticComplexity @@ -63,30 +63,49 @@ module AMQProxy self end - protected def load_from_options(options) # ameba:disable Metrics/CyclomaticComplexity - @listen_address = options.listen_address || @listen_address - @listen_port = options.listen_port || @listen_port - @http_port = options.http_port || @http_port - @idle_connection_timeout = options.idle_connection_timeout || @idle_connection_timeout - @term_timeout = options.term_timeout || @term_timeout - @term_client_close_timeout = options.term_client_close_timeout || @term_client_close_timeout - @log_level = options.log_level || @log_level - @upstream = options.upstream || @upstream + protected def load_cli_options(argv) + OptionParser.parse(argv) do |parser| + parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| + @listen_address = v + end + parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i } + parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i } + parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| + @idle_connection_timeout = v.to_i + end + parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| + @term_timeout = v.to_i + end + parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| + @term_client_close_timeout = v.to_i + end + parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| @log_level = ::Log::Severity.parse(v) } + parser.on("-d", "--debug", "Verbose logging") { @debug = true } + parser.on("-c FILE", "--config=FILE", "Load config file") do |v| + @config_file = v + end + parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } + parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } + parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } + end - # the debug flag overrules the log level. Only set the level - # when it is not already set to debug or trace - if options.debug? && log_level > Log::Severity::Debug + @upstream = argv.shift? || @upstream + + if @debug && @log_level > Log::Severity::Debug @log_level = Log::Severity::Debug end self end - - def self.load_with_cli(options : Options) + + def self.load_with_cli(argv) new() - .load_from_file(options.ini_file || "config.ini") + .load_cli_options(argv.dup) # handle config file/help/version options + .load_from_file .load_from_env - .load_from_options(options) + .load_cli_options(argv) + rescue ex + abort ex.message end end end From 98cda91a0ab36be55bbab7e07f5fa9fd4f4919b8 Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sun, 13 Apr 2025 00:20:41 +0200 Subject: [PATCH 17/19] tool format --- amqproxy | 1 + spec/amqproxy/config_spec.cr | 22 +++++++++++++--------- src/amqproxy/config.cr | 6 +++--- 3 files changed, 17 insertions(+), 12 deletions(-) create mode 160000 amqproxy diff --git a/amqproxy b/amqproxy new file mode 160000 index 0000000..d09258d --- /dev/null +++ b/amqproxy @@ -0,0 +1 @@ +Subproject commit d09258d8dec72b178e756e33ec806b70ee9fc94a diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr index 288bacf..d9425cf 100644 --- a/spec/amqproxy/config_spec.cr +++ b/spec/amqproxy/config_spec.cr @@ -7,7 +7,8 @@ describe AMQProxy::Config do ARGV.clear ARGV.concat([ - "--config=/tmp/non_existing_file.ini"]) + "--config=/tmp/non_existing_file.ini", + ]) config = AMQProxy::Config.load_with_cli(ARGV) @@ -32,7 +33,7 @@ describe AMQProxy::Config do ARGV.concat(["--config=/tmp/config_empty.ini"]) config = AMQProxy::Config.load_with_cli(ARGV) - + config.listen_address.should eq "localhost" config.listen_port.should eq 5673 config.http_port.should eq 15673 @@ -59,7 +60,7 @@ describe AMQProxy::Config do ENV["TERM_TIMEOUT"] = "13" ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" ENV["UPSTREAM"] = "amqp://localhost:5674" - + config = AMQProxy::Config.load_with_cli(ARGV) config.listen_address.should eq "example.com" @@ -107,7 +108,8 @@ describe AMQProxy::Config do "--idle-connection-timeout=15", "--term-timeout=16", "--term-client-close-timeout=17", - "amqp://localhost:5679"]) + "amqp://localhost:5679", + ]) config = AMQProxy::Config.load_with_cli(ARGV) @@ -129,7 +131,7 @@ describe AMQProxy::Config do ENV.delete("TERM_TIMEOUT") ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") ENV.delete("UPSTREAM") - + # Restore ARGV ARGV.clear ARGV.concat(previous_argv) @@ -148,7 +150,8 @@ describe AMQProxy::Config do "--term-timeout=16", "--term-client-close-timeout=17", "--debug", - "amqp://localhost:5679"]) + "amqp://localhost:5679", + ]) config = AMQProxy::Config.load_with_cli(ARGV) @@ -160,7 +163,7 @@ describe AMQProxy::Config do config.term_timeout.should eq 16 config.term_client_close_timeout.should eq 17 config.upstream.should eq "amqp://localhost:5679" - + # Restore ARGV ARGV.clear ARGV.concat(previous_argv) @@ -172,12 +175,13 @@ describe AMQProxy::Config do ARGV.concat([ "--log-level=Trace", - "--debug"]) + "--debug", + ]) config = AMQProxy::Config.load_with_cli(ARGV) config.log_level.should eq ::Log::Severity::Trace - + # Restore ARGV ARGV.clear ARGV.concat(previous_argv) diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index 1d505ea..9871259 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -97,15 +97,15 @@ module AMQProxy self end - + def self.load_with_cli(argv) new() .load_cli_options(argv.dup) # handle config file/help/version options .load_from_file .load_from_env .load_cli_options(argv) - rescue ex - abort ex.message + rescue ex + abort ex.message end end end From 52c80e369217d05236812ab2d936ac5bbc20daab Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sun, 13 Apr 2025 00:23:14 +0200 Subject: [PATCH 18/19] Remove options file --- src/amqproxy/options.cr | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 src/amqproxy/options.cr diff --git a/src/amqproxy/options.cr b/src/amqproxy/options.cr deleted file mode 100644 index 878e858..0000000 --- a/src/amqproxy/options.cr +++ /dev/null @@ -1,14 +0,0 @@ -module AMQProxy - struct Options - property listen_address : String? = nil - property listen_port : Int32? = nil - property http_port : Int32? = nil - property idle_connection_timeout : Int32? = nil - property term_timeout : Int32? = nil - property term_client_close_timeout : Int32? = nil - property log_level : Log::Severity? = nil - property? debug : Bool = false - property ini_file : String? = nil - property upstream : String? = nil - end -end From a919055872a235cc9b52434c9c22d54657a87b9f Mon Sep 17 00:00:00 2001 From: Arjan Vermunt Date: Sun, 13 Apr 2025 00:33:41 +0200 Subject: [PATCH 19/19] fix ameba suggestion --- src/amqproxy/config.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr index 9871259..46626f7 100644 --- a/src/amqproxy/config.cr +++ b/src/amqproxy/config.cr @@ -12,7 +12,7 @@ module AMQProxy getter term_timeout = -1 getter term_client_close_timeout = 0 getter upstream : String? = nil - getter debug = false + getter debug : Bool? = nil getter config_file = "config.ini" protected def load_from_file # ameba:disable Metrics/CyclomaticComplexity