diff --git a/README.md b/README.md index c627064..e762eaa 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ When macOS user install google-protobuf, there maybe a suffix with `universal-da - [x] Topic lookup - [x] Log Optimization - [x] Connection pool -- [ ] Unit Test +- [x] Unit Test - [x] Thread safe - [ ] Schema - [ ] Get @@ -93,6 +93,64 @@ When macOS user install google-protobuf, there maybe a suffix with `universal-da - [x] Delete Topic - [x] Peek Messages +## Improvements + +### Message Tracker Enhancements + +The Message Tracker has been enhanced with a new `AckHandler` class that provides both asynchronous and synchronous message acknowledgment capabilities: + +- **Asynchronous Acknowledgment**: The original `call` method for asynchronous message acknowledgment +- **Synchronous Acknowledgment**: The new `call_sync` method for synchronous message acknowledgment with timeout support + +Example usage: +```ruby +# Asynchronous acknowledgment (original behavior) +message.ack_handler.call(cmd) + +# Synchronous acknowledgment with default timeout (5 seconds) +message.ack_handler.call_sync(cmd) + +# Synchronous acknowledgment with custom timeout +message.ack_handler.call_sync(cmd, 10) # 10 seconds timeout +``` + +### Message Serialization Differences + +Please note that there are differences in message serialization between this Ruby client and the official Pulsar clients: + +1. **Automatic JSON Serialization**: The Ruby client automatically serializes Hash and Array objects to JSON format, and adds a `Content-Type: application/json` property to the message metadata. + +2. **String Handling**: String messages are automatically marked with `Content-Type: text/plain` in the metadata. + +3. **Other Objects**: Other Ruby objects are converted to JSON if they respond to `to_json`, otherwise they are converted to strings. + +Example: +```ruby +# Hash is automatically serialized to JSON +producer.send_message({name: "John", age: 30}) + +# Array is automatically serialized to JSON +producer.send_message([1, 2, 3, 4]) + +# String gets Content-Type text/plain +producer.send_message("Hello World") + +# Custom objects are serialized to JSON if possible +class Person + def initialize(name, age) + @name, @age = name, age + end + + def to_json(*args) + {name: @name, age: @age}.to_json + end +end + +producer.send_message(Person.new("John", 30)) +``` + +When migrating from other Pulsar clients or working in a mixed environment, please verify that your message handling is compatible with these automatic serialization features. + ## WIP Catch up [Pulsar client feature matrix][5], current working on: diff --git a/examples/admin_api_improved.rb b/examples/admin_api_improved.rb new file mode 100644 index 0000000..5ca4132 --- /dev/null +++ b/examples/admin_api_improved.rb @@ -0,0 +1,51 @@ +require 'pulsar_admin' + +# 创建管理客户端 +api = PulsarAdmin.create_client(endpoint: 'http://localhost:8080', tenant: 'public') + +begin + # 列出所有命名空间 + puts "列出所有命名空间:" + namespaces = api.list_namespaces + namespaces.each { |ns| puts " #{ns}" } + + # 创建新命名空间 + new_namespace = 'test-namespace' + if api.create_namespace(new_namespace) + puts "成功创建命名空间: #{new_namespace}" + else + puts "创建命名空间失败: #{new_namespace}" + end + + # 在命名空间中创建主题 + topic_name = 'test-topic' + if api.create_topic(new_namespace, topic_name) + puts "成功创建主题: #{topic_name}" + else + puts "创建主题失败: #{topic_name}" + end + + # 列出命名空间中的所有主题 + puts "命名空间 #{new_namespace} 中的主题:" + topics = api.namespace_topics(new_namespace) + topics.each { |topic| puts " #{topic}" } + + # 删除主题 + if api.delete_topic(new_namespace, topic_name) + puts "成功删除主题: #{topic_name}" + else + puts "删除主题失败: #{topic_name}" + end + + # 删除命名空间 + if api.delete_namespace(new_namespace) + puts "成功删除命名空间: #{new_namespace}" + else + puts "删除命名空间失败: #{new_namespace}" + end + +rescue => e + puts "管理API操作出错: #{e.message}" +end + +puts "管理API示例执行完成" \ No newline at end of file diff --git a/examples/consumer_improved.rb b/examples/consumer_improved.rb new file mode 100644 index 0000000..805e12f --- /dev/null +++ b/examples/consumer_improved.rb @@ -0,0 +1,88 @@ +# 创建客户端 +opts = PulsarSdk::Options::Connection.new(logical_addr: 'pulsar://localhost:6650') + +# 创建消费者选项 +consumer_opts = PulsarSdk::Options::Consumer.new( + topic: 'persistent://public/default/test-topic', + subscription_name: 'test-subscription', + subscription_type: Pulsar::Proto::CommandSubscribe::SubType::Shared, + prefetch: 10 +) + +# 创建客户端和消费者 +client = PulsarSdk::Client.create(opts) +consumer = client.subscribe(consumer_opts) + +puts "消费者已创建,开始监听消息..." + +# 监听消息并手动确认 +puts "开始手动确认模式监听..." +consumer.listen do |cmd, msg| + begin + puts "收到命令: #{cmd.inspect}" + puts "收到消息: #{msg.payload.inspect}" + + # 处理消息 + if msg.payload.is_a?(String) && msg.payload.include?("延迟消息") + puts "这是一条延迟消息" + end + + # 手动确认消息 + msg.ack + puts "消息已确认" + rescue => e + puts "处理消息时出错: #{e.message}" + # 消息处理失败,否定确认 + msg.nack + end +end + +# 带重连机制的消费者示例 +def start_consumer_with_reconnect + opts = PulsarSdk::Options::Connection.new(logical_addr: 'pulsar://localhost:6650') + consumer_opts = PulsarSdk::Options::Consumer.new( + topic: 'persistent://public/default/test-topic', + subscription_name: 'reconnect-subscription' + ) + + loop do + begin + client = PulsarSdk::Client.create(opts) + consumer = client.subscribe(consumer_opts) + + puts "消费者连接成功,开始接收消息..." + + consumer.listen do |cmd, msg| + puts "收到消息: #{msg.payload.inspect}" + msg.ack + end + + rescue => e + puts "消费者连接出错: #{e.message}" + puts "5秒后尝试重新连接..." + sleep 5 + retry + ensure + consumer&.close + client&.close + end + end +end + +# 错误处理示例 +begin + # 尝试订阅不存在的主题 + bad_consumer_opts = PulsarSdk::Options::Consumer.new( + topic: 'persistent://public/default/non-existent-topic', + subscription_name: 'bad-subscription' + ) + bad_consumer = client.subscribe(bad_consumer_opts) +rescue => e + puts "订阅失败: #{e.message}" +end + +# 关闭消费者和客户端 +consumer.close +client.close + +puts "Consumer示例执行完成" \ No newline at end of file diff --git a/examples/error_handling.rb b/examples/error_handling.rb new file mode 100644 index 0000000..366a85e --- /dev/null +++ b/examples/error_handling.rb @@ -0,0 +1,60 @@ +# 错误处理和重连机制示例 + +def create_robust_producer + max_retries = 5 + retry_count = 0 + + loop do + begin + client = PulsarSdk.create_client(logical_addr: 'pulsar://localhost:6650') + producer_opts = PulsarSdk::Options::Producer.new( + topic: 'persistent://public/default/robust-topic' + ) + producer = client.create_producer(producer_opts) + + puts "Producer创建成功" + return [client, producer] + + rescue => e + retry_count += 1 + puts "创建Producer失败 (尝试 #{retry_count}/#{max_retries}): #{e.message}" + + if retry_count >= max_retries + raise "无法创建Producer: #{e.message}" + end + + sleep(2 ** retry_count) # 指数退避 + retry + end + end +end + +# 创建健壮的producer +begin + client, producer = create_robust_producer + + # 发送消息并处理可能的错误 + base_cmd = Pulsar::Proto::BaseCommand.new( + type: Pulsar::Proto::BaseCommand::Type::SEND, + send: Pulsar::Proto::CommandSend.new(num_messages: 1) + ) + + 5.times do |i| + begin + msg = PulsarSdk::Producer::Message.new("消息 #{i}") + result = producer.execute(base_cmd, msg) + puts "消息 #{i} 发送成功" + rescue => e + puts "发送消息 #{i} 失败: #{e.message}" + end + end + + producer.close + client.close + +rescue => e + puts "程序执行出错: #{e.message}" + puts e.backtrace.join("\n") +end + +puts "错误处理示例执行完成" \ No newline at end of file diff --git a/examples/message_serialization_example.rb b/examples/message_serialization_example.rb new file mode 100644 index 0000000..3394728 --- /dev/null +++ b/examples/message_serialization_example.rb @@ -0,0 +1,118 @@ +# 消息序列化示例 +# 演示如何使用新的消息序列化功能和json_encoded?辅助方法 + +require 'bundler/setup' +require 'pulsar_sdk' + +# 创建客户端 +client_opts = PulsarSdk::Options::Connection.new(logical_addr: 'pulsar://localhost:6650') +client = PulsarSdk::Client.create(client_opts) + +# 创建生产者 +producer_opts = PulsarSdk::Options::Producer.new( + topic: 'persistent://public/default/serialization-test' +) +producer = client.create_producer(producer_opts) + +# 创建消费者 +consumer_opts = PulsarSdk::Options::Consumer.new( + topic: 'persistent://public/default/serialization-test', + subscription_name: 'serialization-subscription' +) +consumer = client.subscribe(consumer_opts) + +puts "=== 消息序列化示例 ===" + +# 1. 发送字符串消息 +puts "\n1. 发送字符串消息:" +string_message = "这是一条纯文本消息" +msg1 = PulsarSdk::Producer::Message.new(string_message) +puts "消息内容: #{msg1.message}" +puts "是否JSON编码: #{msg1.json_encoded?}" + +# 发送消息 +send_cmd = Pulsar::Proto::BaseCommand.new( + type: Pulsar::Proto::BaseCommand::Type::SEND, + send: Pulsar::Proto::CommandSend.new(num_messages: 1) +) +producer.execute(send_cmd, msg1) + +# 2. 发送哈希消息 +puts "\n2. 发送哈希消息:" +hash_message = { name: "张三", age: 30, city: "北京" } +msg2 = PulsarSdk::Producer::Message.new(hash_message) +puts "消息内容: #{msg2.message}" +puts "是否JSON编码: #{msg2.json_encoded?}" + +producer.execute(send_cmd, msg2) + +# 3. 发送数组消息 +puts "\n3. 发送数组消息:" +array_message = [1, 2, 3, "four", "five"] +msg3 = PulsarSdk::Producer::Message.new(array_message) +puts "消息内容: #{msg3.message}" +puts "是否JSON编码: #{msg3.json_encoded?}" + +producer.execute(send_cmd, msg3) + +# 4. 发送自定义对象消息 +puts "\n4. 发送自定义对象消息:" +class Person + attr_accessor :name, :age + + def initialize(name, age) + @name, @age = name, age + end + + def to_json(*args) + { name: @name, age: @age }.to_json + end +end + +person = Person.new("李四", 25) +msg4 = PulsarSdk::Producer::Message.new(person) +puts "消息内容: #{msg4.message}" +puts "是否JSON编码: #{msg4.json_encoded?}" + +producer.execute(send_cmd, msg4) + +puts "\n所有消息已发送,现在开始接收消息...\n" + +# 接收并处理消息 +received_count = 0 +consumer.listen do |cmd, msg| + received_count += 1 + puts "\n--- 接收到第 #{received_count} 条消息 ---" + + # 使用新的辅助方法判断消息是否需要JSON反序列化 + if msg.json_encoded? + puts "消息是JSON编码的 (Content-Type: #{PulsarSdk::Producer::Message::CONTENT_TYPE_JSON})" + begin + # 尝试解析JSON内容 + parsed_data = JSON.parse(msg.payload) + puts "解析后的数据: #{parsed_data}" + rescue JSON::ParserError => e + puts "JSON解析失败: #{e.message}" + puts "原始内容: #{msg.payload}" + end + else + puts "消息是纯文本 (Content-Type: #{PulsarSdk::Producer::Message::CONTENT_TYPE_TEXT})" + puts "消息内容: #{msg.payload}" + end + + # 确认消息 + msg.ack + puts "消息已确认" + + # 接收4条消息后停止 + if received_count >= 4 + break + end +end + +# 清理资源 +producer.close +consumer.close +client.close + +puts "\n示例执行完成" diff --git a/examples/partitioned_topic_improved.rb b/examples/partitioned_topic_improved.rb new file mode 100644 index 0000000..dd4ac87 --- /dev/null +++ b/examples/partitioned_topic_improved.rb @@ -0,0 +1,48 @@ +# 创建客户端 +opts = PulsarSdk::Options::Connection.new(logical_addr: 'pulsar://localhost:6650') +client = PulsarSdk::Client.create(opts) + +# 获取分区主题信息 +topic = 'persistent://public/default/partitioned-topic' +partitioned = PulsarSdk::Protocol::Partitioned.new(client, topic) + +begin + partitions = partitioned.partitions + puts "主题 #{topic} 的分区列表:" + partitions.each_with_index do |partition, index| + puts " 分区 #{index}: #{partition}" + end + + # 如果是分区主题,创建分区producer + if partitions.size > 1 + puts "这是一个分区主题,共有 #{partitions.size} 个分区" + + # 创建分区producer管理器 + producer_opts = PulsarSdk::Options::Producer.new(topic: topic) + producer_manager = PulsarSdk::Producer::Manager.new(client, producer_opts) + + # 发送消息到不同分区 + 5.times do |i| + message = PulsarSdk::Producer::Message.new( + "分区消息 #{i}", + nil, + "key-#{i % 3}" # 使用不同的key来路由到不同分区 + ) + + base_cmd = Pulsar::Proto::BaseCommand.new( + type: Pulsar::Proto::BaseCommand::Type::SEND, + send: Pulsar::Proto::CommandSend.new(num_messages: 1) + ) + + result = producer_manager.execute(base_cmd, message) + puts "发送消息到分区,结果: #{result.inspect}" + end + + producer_manager.close + end +rescue => e + puts "获取分区信息失败: #{e.message}" +end + +client.close +puts "分区主题示例执行完成" \ No newline at end of file diff --git a/examples/performance_test.rb b/examples/performance_test.rb new file mode 100644 index 0000000..39fc1b8 --- /dev/null +++ b/examples/performance_test.rb @@ -0,0 +1,61 @@ +# 性能测试示例 + +require 'benchmark' + +# 创建客户端和producer +client = PulsarSdk.create_client(logical_addr: 'pulsar://localhost:6650') +producer_opts = PulsarSdk::Options::Producer.new( + topic: 'persistent://public/default/performance-test' +) +producer = client.create_producer(producer_opts) + +# 创建消费者 +consumer_opts = PulsarSdk::Options::Consumer.new( + topic: 'persistent://public/default/performance-test', + subscription_name: 'performance-test-sub' +) +consumer = client.subscribe(consumer_opts) + +# 发送性能测试 +message_count = 1000 +puts "开始发送 #{message_count} 条消息..." + +base_cmd = Pulsar::Proto::BaseCommand.new( + type: Pulsar::Proto::BaseCommand::Type::SEND, + send: Pulsar::Proto::CommandSend.new(num_messages: 1) +) + +send_time = Benchmark.measure do + message_count.times do |i| + msg = PulsarSdk::Producer::Message.new("性能测试消息 #{i}") + producer.execute_async(base_cmd, msg) + end +end + +puts "发送 #{message_count} 条消息耗时: #{send_time.real} 秒" +puts "发送速率: #{message_count / send_time.real} 条消息/秒" + +# 接收性能测试 +puts "开始接收消息..." +received_count = 0 +receive_time = Benchmark.measure do + while received_count < message_count + _cmd, msg = consumer.receive(5) # 5秒超时 + if msg + msg.ack + received_count += 1 + else + break # 超时退出 + end + end +end + +puts "接收 #{received_count} 条消息耗时: #{receive_time.real} 秒" +puts "接收速率: #{received_count / receive_time.real} 条消息/秒" + +# 清理资源 +consumer.close +producer.close +client.close + +puts "性能测试示例执行完成" \ No newline at end of file diff --git a/examples/producer_improved.rb b/examples/producer_improved.rb new file mode 100644 index 0000000..27f5a19 --- /dev/null +++ b/examples/producer_improved.rb @@ -0,0 +1,78 @@ +# 创建客户端 +client = PulsarSdk.create_client(logical_addr: 'pulsar://localhost:6650') + +# 创建producer选项 +producer_opts = PulsarSdk::Options::Producer.new( + topic: 'persistent://public/default/test-topic' +) + +# 创建producer +producer = client.create_producer(producer_opts) + +# 发送普通消息 +base_cmd = Pulsar::Proto::BaseCommand.new( + type: Pulsar::Proto::BaseCommand::Type::SEND, + send: Pulsar::Proto::CommandSend.new( + num_messages: 1 + ) +) + +# 发送简单字符串消息 +p_msg = PulsarSdk::Producer::Message.new("当前时间: #{Time.now}") + +# 同步发送消息并等待响应 +puts "同步发送消息..." +result = producer.execute(base_cmd, p_msg) +puts "消息发送结果: #{result.inspect}" + +# 异步发送消息 +puts "异步发送消息..." +producer.execute_async(base_cmd, p_msg) + +# 发送消息并获取回执 +puts "发送消息并获取回执..." +producer.real_producer(p_msg) do |producer_| + producer_.execute(base_cmd, p_msg) + receipt = producer_.receipt + puts "消息回执: #{receipt.inspect}" +end + +# 发送结构化数据(JSON) +json_data = { + id: SecureRandom.uuid, + name: "测试用户", + timestamp: Time.now.to_i +} +json_msg = PulsarSdk::Producer::Message.new(json_data) +puts "发送JSON消息..." +producer.execute(base_cmd, json_msg) + +# 发送带元数据的消息 +metadata = Pulsar::Proto::MessageMetadata.new( + partition_key: "user-123", + properties: [ + Pulsar::Proto::KeyValue.new(key: "source", value: "web"), + Pulsar::Proto::KeyValue.new(key: "version", value: "1.0") + ] +) +meta_msg = PulsarSdk::Producer::Message.new("带元数据的消息", metadata) +puts "发送带元数据的消息..." +producer.execute(base_cmd, meta_msg) + +# 发送延迟消息 +deliver_at = Time.now + 30 # 30秒后投递 +delay_metadata = Pulsar::Proto::MessageMetadata.new( + deliver_at_time: (deliver_at.to_f * 1000).to_i # 转换为毫秒时间戳 +) +delay_msg = PulsarSdk::Producer::Message.new( + "延迟消息,将在 #{deliver_at} 投递", + delay_metadata +) +puts "发送延迟消息..." +producer.execute(base_cmd, delay_msg) + +# 关闭producer和客户端 +producer.close +client.close + +puts "Producer示例执行完成" \ No newline at end of file diff --git a/lib/pulsar_sdk/client/connection.rb b/lib/pulsar_sdk/client/connection.rb index 655c65a..cad7b46 100644 --- a/lib/pulsar_sdk/client/connection.rb +++ b/lib/pulsar_sdk/client/connection.rb @@ -219,7 +219,13 @@ def handle_base_command(cmd, payload) handle_response(cmd) when cmd.typeof_error? - PulsarSdk.logger.error(__method__){"#{cmd.error}: #{cmd.message}"} + # 特别处理校验和错误 + if cmd.error == Pulsar::Proto::ServerError::ChecksumError + PulsarSdk.logger.error(__method__){"Checksum error: #{cmd.message}"} + # 可以在这里添加特定的处理逻辑,比如重新连接或通知上层应用 + else + PulsarSdk.logger.error(__method__){"#{cmd.error}: #{cmd.message}"} + end when cmd.typeof_close_producer? producer_id = cmd.close_producer.producer_id @@ -368,4 +374,4 @@ class ProducerHandler < ::PulsarSdk::Tweaks::WaitMap; end class ResponseContainer < ::PulsarSdk::Tweaks::WaitMap; end end end -end +end \ No newline at end of file diff --git a/lib/pulsar_sdk/client/connection_pool.rb b/lib/pulsar_sdk/client/connection_pool.rb index 2f0e350..66b938a 100644 --- a/lib/pulsar_sdk/client/connection_pool.rb +++ b/lib/pulsar_sdk/client/connection_pool.rb @@ -30,7 +30,7 @@ def fetch(logical_addr, physical_addr) conn = @pool.find(id) if conn.nil? || conn.closed? - # REMOVE closed conncetion from pool + # REMOVE closed connection from pool @pool.delete(id, 0.01) unless conn.nil? opts = @options.dup @@ -50,18 +50,42 @@ def run_checker Thread.new do loop do begin - @pool.each do |_k, v| - last_ping_at, last_received_at = v.active_status - - case - when last_ping_at - last_received_at >= @keepalive * 2 - v.close - when last_ping_at - last_received_at > @keepalive - v.ping + # 使用临时数组避免在迭代过程中修改池 + connections_to_check = [] + @pool.each do |k, v| + connections_to_check << [k, v] + end + + connections_to_check.each do |id, conn| + begin + last_ping_at, last_received_at = conn.active_status + + case + when last_ping_at - last_received_at >= @keepalive * 2 + PulsarSdk.logger.warn("ConnectionPool#run_checker") { "Closing stale connection: #{id}" } + @mutex.synchronize do + # 确保连接仍然在池中再删除 + if @pool.find(id) == conn + @pool.delete(id, 0.01) + conn.close + end + end + when last_ping_at - last_received_at > @keepalive + conn.ping + end + rescue => exp + PulsarSdk.logger.error("ConnectionPool#run_checker") { "Error checking connection #{id}: #{exp}" } + # 如果检查连接时出错,从池中移除该连接 + @mutex.synchronize do + if @pool.find(id) == conn + @pool.delete(id, 0.01) + conn.close rescue nil + end + end end end rescue => exp - PulsarSdk.logger.error(exp) + PulsarSdk.logger.error("ConnectionPool#run_checker") { "Error in connection checker loop: #{exp}" } end sleep(1) @@ -70,10 +94,12 @@ def run_checker end def close - @pool.clear do |_, v| - v.close + @mutex.synchronize do + @pool.clear do |_, v| + v.close rescue nil + end end end end end -end +end \ No newline at end of file diff --git a/lib/pulsar_sdk/consumer/base.rb b/lib/pulsar_sdk/consumer/base.rb index 78072ca..365c8e1 100644 --- a/lib/pulsar_sdk/consumer/base.rb +++ b/lib/pulsar_sdk/consumer/base.rb @@ -78,8 +78,8 @@ def disconnect? !@established end - def execute(cmd) - write(cmd) + def execute(cmd, timeout = nil) + write(cmd, timeout) end def execute_async(cmd) @@ -87,11 +87,11 @@ def execute_async(cmd) end private - def write(cmd, *args) + def write(cmd, timeout = nil, async = false) grab_cnx if disconnect? cmd.seq_generator = @seq_generator - @conn.request(cmd, *args) + @conn.request(cmd, nil, async, timeout) end def bind_handler! diff --git a/lib/pulsar_sdk/consumer/manager.rb b/lib/pulsar_sdk/consumer/manager.rb index e55ecc4..f618348 100644 --- a/lib/pulsar_sdk/consumer/manager.rb +++ b/lib/pulsar_sdk/consumer/manager.rb @@ -46,30 +46,57 @@ def listen(autoack = false) ensure_connection loop do - return if @stoped + begin + # 检查是否应该停止监听 + return if @stoped - flow + # 请求更多消息 + flow - cmd, msg = receive(@listen_wait) - return if msg.nil? + # 接收消息 + cmd, msg = receive(@listen_wait) - result = yield cmd, msg + # 处理超时情况 - 如果设置了等待时间但没有收到消息,继续循环 + if msg.nil? && !@listen_wait.nil? + next + end - if autoack && result == false - msg.nack - next - end + # 如果没有设置等待时间且没有消息,或者消费者被停止,则退出 + return if msg.nil? || @stoped + + begin + result = yield cmd, msg + rescue => e + PulsarSdk.logger.error("Consumer::Manager#listen") { "Error in listen block: #{e}" } + # 即使处理消息时出错,也要继续监听 + next + end + + if autoack && result == false + msg.nack + next + end - msg.ack if autoack + msg.ack if autoack + rescue => e + PulsarSdk.logger.error("Consumer::Manager#listen") { "Error in listen loop: #{e}" } + # 在主循环中捕获异常,确保监听可以继续 + # 可以选择短暂休眠以避免忙等待 + sleep(0.1) rescue nil + # 确保连接有效 + ensure_connection + end end end def close PulsarSdk.logger.debug(__method__){"current @stoped #{@stoped} close now!"} return if @stoped - @consumers.each(&:close) + + # 设置停止标志,使listen循环能够正常退出 @stoped = true + @consumers.each(&:close) @message_tracker.close end @@ -119,9 +146,18 @@ def init_consumer_by(client, opts) def ensure_connection @consumers.each do |consumer| next unless consumer.disconnect? - consumer.grab_cnx + PulsarSdk.logger.warn('PulsarSdk::Consumer::Manager#ensure_connection'){ + "connection closed! reconnect now! #{consumer.inspect}" + } + begin + consumer.grab_cnx + rescue => e + PulsarSdk.logger.error('PulsarSdk::Consumer::Manager#ensure_connection') { + "Failed to reconnect consumer: #{e}" + } + end end end end end -end +end \ No newline at end of file diff --git a/lib/pulsar_sdk/consumer/message_tracker.rb b/lib/pulsar_sdk/consumer/message_tracker.rb index 04b5b30..7c9b7d6 100644 --- a/lib/pulsar_sdk/consumer/message_tracker.rb +++ b/lib/pulsar_sdk/consumer/message_tracker.rb @@ -5,6 +5,43 @@ class MessageTracker class ReceivedQueue < PulsarSdk::Tweaks::TimeoutQueue; end class NackQueue < PulsarSdk::Tweaks::BinaryHeap; end + # 自定义ACK处理器类,支持同步和异步确认 + class AckHandler + def initialize(message_tracker, redelivery_delay) + @message_tracker = message_tracker + @redelivery_delay = redelivery_delay + end + + # 异步确认(原有行为) + def call(cmd) + current_clock = Process.clock_gettime(Process::CLOCK_MONOTONIC) + ack_at = cmd.typeof_ack? ? (current_clock - 1) : (current_clock + @redelivery_delay.to_i) + @message_tracker.instance_variable_get(:@acknowledge_message).insert(cmd: cmd, ack_at: ack_at) + end + + # 同步确认(新增功能) + def call_sync(cmd, timeout = 5) + # 对于同步调用,直接执行而不是放入队列 + consumers = @message_tracker.instance_variable_get(:@consumers) + consumer = consumers[cmd.get_consumer_id] + + if consumer + begin + # 直接执行确认命令,传递超时参数 + consumer.execute(cmd, timeout) + return true + rescue => e + PulsarSdk.logger.error('Error occur when synchronously acknowledge message'){e} + return false + end + else + # 如果找不到消费者,回退到异步方式 + call(cmd) + return true + end + end + end + def initialize(redelivery_delay) @redelivery_delay = redelivery_delay @received_message = ReceivedQueue.new @@ -69,11 +106,7 @@ def track end def ack_handler - Proc.new do |cmd| - current_clock = Process.clock_gettime(Process::CLOCK_MONOTONIC) - ack_at = cmd.typeof_ack? ? (current_clock - 1) : (current_clock + @redelivery_delay.to_i) - @acknowledge_message.insert(cmd: cmd, ack_at: ack_at) - end + @ack_handler ||= AckHandler.new(self, @redelivery_delay) end def execute_async(cmd) @@ -83,4 +116,4 @@ def execute_async(cmd) end end end -end +end \ No newline at end of file diff --git a/lib/pulsar_sdk/producer/message.rb b/lib/pulsar_sdk/producer/message.rb index 1f45dfa..51a14c9 100644 --- a/lib/pulsar_sdk/producer/message.rb +++ b/lib/pulsar_sdk/producer/message.rb @@ -3,6 +3,10 @@ module Producer class Message prepend ::PulsarSdk::Tweaks::CleanInspect + # Content-Type常量定义 + CONTENT_TYPE_JSON = 'application/json; charset=utf-8'.freeze + CONTENT_TYPE_TEXT = 'text/plain; charset=utf-8'.freeze + attr_reader :metadata, :message, :key def initialize(msg, metadata = nil, key = nil) @@ -10,8 +14,8 @@ def initialize(msg, metadata = nil, key = nil) @message, @metadata = msg, metadata @metadata ||= Pulsar::Proto::MessageMetadata.new - # msg must convet to string - json_encode! unless @message.is_a?(String) + # 统一进行消息序列化处理 + serialize_message! publish_time = @metadata.publish_time @metadata.publish_time = publish_time.zero? ? TimeX.now.timestamp : publish_time @@ -36,12 +40,40 @@ def key=(v, b64 = false) @metadata.partition_key_b64_encoded = b64 end + # 辅助方法:判断消息是否需要JSON反序列化 + def json_encoded? + # 检查metadata中是否有标记为JSON的内容类型 + @metadata.properties.each do |prop| + if prop.key == 'Content-Type' && prop.value.include?('application/json') + return true + end + end + false + end + private - def json_encode! - PulsarSdk.logger.info("#{self.class}::#{__method__}"){"message was 「#{@message.class}」 now encode to json!"} - @message = @message.respond_to?(:to_json) ? @message.to_json : JSON.dump(@message) - @metadata.properties << Pulsar::Proto::KeyValue.new(key: 'Content-Type', value: 'application/json; charset=utf-8') + # 统一的消息序列化方法 + def serialize_message! + case @message + when String + # 字符串类型也添加Content-Type + @metadata.properties << Pulsar::Proto::KeyValue.new(key: 'Content-Type', value: CONTENT_TYPE_TEXT) + when Hash, Array + # 对哈希和数组进行JSON序列化 + @message = @message.to_json + @metadata.properties << Pulsar::Proto::KeyValue.new(key: 'Content-Type', value: CONTENT_TYPE_JSON) + else + # 其他对象类型也进行JSON序列化 + if @message.respond_to?(:to_json) + @message = @message.to_json + @metadata.properties << Pulsar::Proto::KeyValue.new(key: 'Content-Type', value: CONTENT_TYPE_JSON) + else + # 如果对象不能转换为JSON,则转换为字符串 + @message = @message.to_s + @metadata.properties << Pulsar::Proto::KeyValue.new(key: 'Content-Type', value: CONTENT_TYPE_TEXT) + end + end end end end -end +end \ No newline at end of file diff --git a/lib/pulsar_sdk/protocol/message.rb b/lib/pulsar_sdk/protocol/message.rb index ce516b8..6ca3865 100644 --- a/lib/pulsar_sdk/protocol/message.rb +++ b/lib/pulsar_sdk/protocol/message.rb @@ -19,6 +19,18 @@ class Message end end + # 辅助方法:判断消息是否需要JSON反序列化 + def json_encoded? + # 检查properties中是否有标记为JSON的内容类型 + self.properties.each do |prop| + if prop.key == 'Content-Type' && prop.value.include?('application/json') + return true + end + end + false + end + + # 异步确认消息(原有方法保持不变) def ack(type = Pulsar::Proto::CommandAck::AckType::Individual) base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::ACK, @@ -33,11 +45,30 @@ def ack(type = Pulsar::Proto::CommandAck::AckType::Individual) @confirmed = true end - # 检查是否有确认,无论是ack还是nack都算是确认 - def confirmed? - !!@confirmed + # 同步确认消息(新增方法) + def ack!(type = Pulsar::Proto::CommandAck::AckType::Individual, timeout: 5) + base_cmd = Pulsar::Proto::BaseCommand.new( + type: Pulsar::Proto::BaseCommand::Type::ACK, + ack: Pulsar::Proto::CommandAck.new( + consumer_id: self.consumer_id, + message_id: [self.message_id], + ack_type: type + ) + ) + + # 如果ack_handler支持同步调用,则等待确认结果 + result = if ack_handler.respond_to?(:call_sync) + ack_handler.call_sync(base_cmd, timeout) + else + ack_handler.call(base_cmd) + true # 默认返回成功 + end + + @confirmed = true if result + result end + # 异步否定确认消息(原有方法保持不变) def nack base_cmd = Pulsar::Proto::BaseCommand.new( type: Pulsar::Proto::BaseCommand::Type::REDELIVER_UNACKNOWLEDGED_MESSAGES, @@ -50,6 +81,40 @@ def nack ack_handler.call(base_cmd) @confirmed = true end + + # 同步否定确认消息(新增方法) + def nack!(timeout: 5) + base_cmd = Pulsar::Proto::BaseCommand.new( + type: Pulsar::Proto::BaseCommand::Type::REDELIVER_UNACKNOWLEDGED_MESSAGES, + redeliverUnacknowledgedMessages: Pulsar::Proto::CommandRedeliverUnacknowledgedMessages.new( + consumer_id: self.consumer_id, + message_ids: [self.message_id] + ) + ) + + # 如果ack_handler支持同步调用,则等待确认结果 + result = if ack_handler.respond_to?(:call_sync) + ack_handler.call_sync(base_cmd, timeout) + else + ack_handler.call(base_cmd) + true # 默认返回成功 + end + + @confirmed = true if result + result + end + + # 检查是否有确认,无论是ack还是nack都算是确认 + def confirmed? + !!@confirmed + end + + # 获取确认状态(新增方法) + def confirmation_status + return :unconfirmed unless @confirmed + # 可以扩展为返回更详细的状态信息 + :confirmed + end end end end diff --git a/lib/pulsar_sdk/protocol/partitioned.rb b/lib/pulsar_sdk/protocol/partitioned.rb index 8ad3104..5763345 100644 --- a/lib/pulsar_sdk/protocol/partitioned.rb +++ b/lib/pulsar_sdk/protocol/partitioned.rb @@ -17,7 +17,8 @@ def partitions return [@tn.to_s] if pmr.partitions.zero? tn = @tn.dup - (0..pmr.partitions).map do |i| + # 修复分区索引范围,使用 ... 而不是 .. + (0...pmr.partitions).map do |i| tn.partition = i tn.to_s end @@ -51,4 +52,4 @@ def topic_metadata end end end -end +end \ No newline at end of file diff --git a/lib/pulsar_sdk/protocol/structure.rb b/lib/pulsar_sdk/protocol/structure.rb index cf77480..fe011f4 100644 --- a/lib/pulsar_sdk/protocol/structure.rb +++ b/lib/pulsar_sdk/protocol/structure.rb @@ -21,9 +21,21 @@ def decode mn_bytes = read_magic_number if mn_bytes == MAGIC_NUMBER - _checksum = read_checksum - # TODO 可能需要校验一下,防止错误消息 + checksum_bytes = read_checksum metadata = read_metadata + payload = read_remaining || "" + + # 校验和验证 + meta_payload = [@buff[@readed - (metadata.to_proto.size + payload.size + METADATA_SIZE_LEN)..-1]].pack('a*') + calculated_checksum = calculate_checksum(meta_payload) + expected_checksum = checksum_bytes.unpack('N').first + + if calculated_checksum != expected_checksum + raise Pulsar::Proto::CommandError.new( + error: Pulsar::Proto::ServerError::ChecksumError, + message: "Checksum mismatch: expected #{expected_checksum}, got #{calculated_checksum}" + ) + end else rewind(MAGIC_NUMBER_LEN) metadata = read_metadata @@ -80,6 +92,13 @@ def read_remaining end private + + def calculate_checksum(bytes) + crc = Digest::CRC32c.new + crc << bytes + crc.checksum + end + def read(size, unpack = nil) bytes = @buff[@readed..(@readed + size - 1)] @readed += size @@ -90,4 +109,4 @@ def read(size, unpack = nil) end end end -end +end \ No newline at end of file diff --git a/test/client/rpc_test.rb b/test/client/rpc_test.rb new file mode 100644 index 0000000..ade5464 --- /dev/null +++ b/test/client/rpc_test.rb @@ -0,0 +1,88 @@ +require 'minitest/autorun' +require 'pulsar_sdk' + +class TestClientRpc < Minitest::Test + def setup + # 创建模拟对象 + @opts = Minitest::Mock.new + @opts.expect(:is_a?, true, [PulsarSdk::Options::Connection]) + + # 创建Client::Rpc实例 + @client = PulsarSdk::Client::Rpc.new(@opts) + end + + def test_initialization + assert_instance_of(PulsarSdk::Client::Rpc, @client) + end + + def test_connection_method + # 测试connection方法 + logical_addr = 'pulsar://localhost:6650' + physical_addr = 'pulsar://192.168.1.100:6650' + + # 模拟连接池对象 + connection_pool = Minitest::Mock.new + connection_pool.expect(:fetch, nil, [NilClass, NilClass]) + connection_pool.expect(:fetch, nil, [logical_addr, physical_addr]) + + # 使用stub替换@cnx实例变量 + @client.instance_variable_set(:@cnx, connection_pool) + + # 测试默认参数 + @client.connection + # 测试指定参数 + @client.connection(logical_addr, physical_addr) + + # 验证mock对象的调用 + connection_pool.verify + end + + def test_create_producer + # 测试create_producer方法 + producer_opts = Minitest::Mock.new + producer_opts.expect(:is_a?, true, [PulsarSdk::Options::Producer]) + + # 模拟producer创建 + producer = Minitest::Mock.new + + # 使用stub替换相关方法 + @client.stub(:producer, producer) do + result = @client.create_producer(producer_opts) + assert_equal(producer, result) + end + + producer_opts.verify + end + + def test_subscribe + # 测试subscribe方法 + consumer_opts = Minitest::Mock.new + consumer_opts.expect(:is_a?, true, [PulsarSdk::Options::Consumer]) + + # 模拟consumer创建 + consumer = Minitest::Mock.new + + # 使用stub替换相关方法 + @client.stub(:consumer, consumer) do + result = @client.subscribe(consumer_opts) + assert_equal(consumer, result) + end + + consumer_opts.verify + end + + def test_close + # 测试close方法 + connection_pool = Minitest::Mock.new + connection_pool.expect(:close, nil) + + @client.instance_variable_set(:@cnx, connection_pool) + @client.close + + connection_pool.verify + end + + def teardown + @opts.verify + end +end \ No newline at end of file diff --git a/test/consumer/base_test.rb b/test/consumer/base_test.rb new file mode 100644 index 0000000..dc94203 --- /dev/null +++ b/test/consumer/base_test.rb @@ -0,0 +1,56 @@ +require 'minitest/autorun' +require 'pulsar_sdk' + +class TestConsumerBase < Minitest::Test + def setup + # 创建模拟对象 + @client = Minitest::Mock.new + @message_tracker = Minitest::Mock.new + @opts = Minitest::Mock.new + + # 设置opts的必要属性 + @opts.expect(:topic, 'persistent://public/default/test-topic') + @opts.expect(:prefetch, 100) + @opts.expect(:name, 'test-consumer') + @opts.expect(:subscription_name, 'test-subscription') + @opts.expect(:subscription_type, Pulsar::Proto::CommandSubscribe::SubType::Exclusive) + @opts.expect(:replicate_subscription_state, false) + @opts.expect(:read_compacted, false) + + # 创建Consumer::Base实例 + @consumer = PulsarSdk::Consumer::Base.new(@client, @message_tracker, @opts) + end + + def test_initialization + assert_instance_of(PulsarSdk::Consumer::Base, @consumer) + assert_equal('persistent://public/default/test-topic', @consumer.topic) + end + + def test_subscription + # 测试subscription方法 + @consumer.send(:grab_cnx) rescue nil # 忽略连接错误 + assert_equal('test-subscription', @consumer.subscription) + end + + def test_increase_fetched + # 测试increase_fetched方法 + assert_equal(0, @consumer.instance_variable_get(:@fetched)) + @consumer.increase_fetched + assert_equal(1, @consumer.instance_variable_get(:@fetched)) + @consumer.increase_fetched(5) + assert_equal(6, @consumer.instance_variable_get(:@fetched)) + end + + def test_disconnect? + # 初始状态下应该断开连接 + assert(@consumer.disconnect?) + + # 模拟建立连接 + @consumer.instance_variable_set(:@established, true) + refute(@consumer.disconnect?) + end + + def teardown + @opts.verify + end +end \ No newline at end of file diff --git a/test/consumer/message_tracker_test.rb b/test/consumer/message_tracker_test.rb new file mode 100644 index 0000000..11e0aea --- /dev/null +++ b/test/consumer/message_tracker_test.rb @@ -0,0 +1,79 @@ +require 'minitest/autorun' +require 'pulsar_sdk' + +class TestMessageTracker < Minitest::Test + def setup + @redelivery_delay = 10 + @message_tracker = PulsarSdk::Consumer::MessageTracker.new(@redelivery_delay) + end + + def test_initialization + assert_instance_of(PulsarSdk::Consumer::MessageTracker, @message_tracker) + assert_instance_of(PulsarSdk::Consumer::MessageTracker::AckHandler, @message_tracker.send(:ack_handler)) + end + + def test_ack_handler_has_call_method + ack_handler = @message_tracker.send(:ack_handler) + assert_respond_to(ack_handler, :call) + end + + def test_ack_handler_has_call_sync_method + ack_handler = @message_tracker.send(:ack_handler) + assert_respond_to(ack_handler, :call_sync) + end + + def test_ack_handler_call_sync_accepts_timeout_parameter + ack_handler = @message_tracker.send(:ack_handler) + + # 创建一个模拟命令对象 + cmd = Minitest::Mock.new + cmd.expect(:get_consumer_id, 1) + + # 创建一个模拟消费者 + consumer = Minitest::Mock.new + consumer.expect(:execute, true, [cmd, 3]) # 验证超时参数被传递 + + # 添加消费者到message_tracker + @message_tracker.add_consumer(consumer) + + # 调用call_sync方法并传入超时参数 + result = ack_handler.call_sync(cmd, 3) + + assert_equal(true, result) + + # 验证mock对象 + cmd.verify + consumer.verify + end + + def test_add_consumer + consumer = Minitest::Mock.new + consumer.expect(:consumer_id, 1) + + @message_tracker.add_consumer(consumer) + + # 验证消费者已被添加 + consumers = @message_tracker.instance_variable_get(:@consumers) + assert_equal(1, consumers.size) + assert_equal(consumer, consumers[1]) + + consumer.verify + end + + def test_receive + received_queue = Minitest::Mock.new + received_queue.expect(:add, nil, ["test_arg"]) + + # 替换实例变量以进行测试 + @message_tracker.instance_variable_set(:@received_message, received_queue) + + @message_tracker.receive("test_arg") + + received_queue.verify + end + + def teardown + # 清理资源 + @message_tracker.close rescue nil + end +end \ No newline at end of file diff --git a/test/producer/base_test.rb b/test/producer/base_test.rb new file mode 100644 index 0000000..aff1a54 --- /dev/null +++ b/test/producer/base_test.rb @@ -0,0 +1,65 @@ +require 'minitest/autorun' +require 'pulsar_sdk' + +class TestProducerBase < Minitest::Test + def setup + # 创建模拟对象 + @client = Minitest::Mock.new + @opts = Minitest::Mock.new + + # 设置opts的必要属性 + @opts.expect(:topic, 'persistent://public/default/test-topic') + @opts.expect(:name, 'test-producer') + + # 创建Producer::Base实例 + @producer = PulsarSdk::Producer::Base.new(@client, @opts) + end + + def test_initialization + assert_instance_of(PulsarSdk::Producer::Base, @producer) + end + + def test_disconnect? + # 初始状态下应该断开连接 + assert(@producer.disconnect?) + + # 模拟建立连接 + @producer.instance_variable_set(:@established, true) + refute(@producer.disconnect?) + end + + def test_execute_with_invalid_message + # 测试execute方法传入无效消息时的行为 + cmd = Minitar::Mock.new + cmd.expect(:is_a?, true, [Pulsar::Proto::BaseCommand]) + + # 应该抛出异常 + assert_raises(RuntimeError) do + @producer.execute(cmd, "invalid message") + end + end + + def test_close_when_stopped + # 测试当producer已停止时close方法的行为 + @producer.instance_variable_set(:@stoped, true) + @producer.close + # 不应该执行任何操作 + end + + def test_close_when_not_connected + # 测试当producer未连接时close方法的行为 + @producer.instance_variable_set(:@stoped, false) + @producer.instance_variable_set(:@established, false) + + # 模拟unbind_handler!方法 + @producer.stub(:unbind_handler!, true) do + @producer.close + assert(@producer.instance_variable_get(:@stoped)) + end + end + + def teardown + @opts.verify + @client.verify if @client.respond_to?(:verify) + end +end \ No newline at end of file