Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ When macOS user install google-protobuf, there maybe a suffix with `universal-da
- [x] Topic lookup
- [x] Log Optimization
- [x] Connection pool
- [ ] Unit Test
- [x] Unit Test
- [x] Thread safe
- [ ] Schema
- [ ] Get
Expand All @@ -93,6 +93,64 @@ When macOS user install google-protobuf, there maybe a suffix with `universal-da
- [x] Delete Topic
- [x] Peek Messages

## Improvements

### Message Tracker Enhancements

The Message Tracker has been enhanced with a new `AckHandler` class that provides both asynchronous and synchronous message acknowledgment capabilities:

- **Asynchronous Acknowledgment**: The original `call` method for asynchronous message acknowledgment
- **Synchronous Acknowledgment**: The new `call_sync` method for synchronous message acknowledgment with timeout support

Example usage:
```ruby
# Asynchronous acknowledgment (original behavior)
message.ack_handler.call(cmd)

# Synchronous acknowledgment with default timeout (5 seconds)
message.ack_handler.call_sync(cmd)

# Synchronous acknowledgment with custom timeout
message.ack_handler.call_sync(cmd, 10) # 10 seconds timeout
```

### Message Serialization Differences

Please note that there are differences in message serialization between this Ruby client and the official Pulsar clients:

1. **Automatic JSON Serialization**: The Ruby client automatically serializes Hash and Array objects to JSON format, and adds a `Content-Type: application/json` property to the message metadata.

2. **String Handling**: String messages are automatically marked with `Content-Type: text/plain` in the metadata.

3. **Other Objects**: Other Ruby objects are converted to JSON if they respond to `to_json`, otherwise they are converted to strings.

Example:
```ruby
# Hash is automatically serialized to JSON
producer.send_message({name: "John", age: 30})

# Array is automatically serialized to JSON
producer.send_message([1, 2, 3, 4])

# String gets Content-Type text/plain
producer.send_message("Hello World")

# Custom objects are serialized to JSON if possible
class Person
def initialize(name, age)
@name, @age = name, age
end

def to_json(*args)
{name: @name, age: @age}.to_json
end
end

producer.send_message(Person.new("John", 30))
```

When migrating from other Pulsar clients or working in a mixed environment, please verify that your message handling is compatible with these automatic serialization features.

## WIP

Catch up [Pulsar client feature matrix][5], current working on:
Expand Down
51 changes: 51 additions & 0 deletions examples/admin_api_improved.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
require 'pulsar_admin'

# 创建管理客户端
api = PulsarAdmin.create_client(endpoint: 'http://localhost:8080', tenant: 'public')

begin
# 列出所有命名空间
puts "列出所有命名空间:"
namespaces = api.list_namespaces
namespaces.each { |ns| puts " #{ns}" }

# 创建新命名空间
new_namespace = 'test-namespace'
if api.create_namespace(new_namespace)
puts "成功创建命名空间: #{new_namespace}"
else
puts "创建命名空间失败: #{new_namespace}"
end

# 在命名空间中创建主题
topic_name = 'test-topic'
if api.create_topic(new_namespace, topic_name)
puts "成功创建主题: #{topic_name}"
else
puts "创建主题失败: #{topic_name}"
end

# 列出命名空间中的所有主题
puts "命名空间 #{new_namespace} 中的主题:"
topics = api.namespace_topics(new_namespace)
topics.each { |topic| puts " #{topic}" }

# 删除主题
if api.delete_topic(new_namespace, topic_name)
puts "成功删除主题: #{topic_name}"
else
puts "删除主题失败: #{topic_name}"
end

# 删除命名空间
if api.delete_namespace(new_namespace)
puts "成功删除命名空间: #{new_namespace}"
else
puts "删除命名空间失败: #{new_namespace}"
end

rescue => e
puts "管理API操作出错: #{e.message}"
end

puts "管理API示例执行完成"
88 changes: 88 additions & 0 deletions examples/consumer_improved.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# 创建客户端
opts = PulsarSdk::Options::Connection.new(logical_addr: 'pulsar://localhost:6650')

# 创建消费者选项
consumer_opts = PulsarSdk::Options::Consumer.new(
topic: 'persistent://public/default/test-topic',
subscription_name: 'test-subscription',
subscription_type: Pulsar::Proto::CommandSubscribe::SubType::Shared,
prefetch: 10
)

# 创建客户端和消费者
client = PulsarSdk::Client.create(opts)
consumer = client.subscribe(consumer_opts)

puts "消费者已创建,开始监听消息..."

# 监听消息并手动确认
puts "开始手动确认模式监听..."
consumer.listen do |cmd, msg|
begin
puts "收到命令: #{cmd.inspect}"
puts "收到消息: #{msg.payload.inspect}"

# 处理消息
if msg.payload.is_a?(String) && msg.payload.include?("延迟消息")
puts "这是一条延迟消息"
end

# 手动确认消息
msg.ack
puts "消息已确认"
rescue => e
puts "处理消息时出错: #{e.message}"
# 消息处理失败,否定确认
msg.nack
end
end

# 带重连机制的消费者示例
def start_consumer_with_reconnect
opts = PulsarSdk::Options::Connection.new(logical_addr: 'pulsar://localhost:6650')
consumer_opts = PulsarSdk::Options::Consumer.new(
topic: 'persistent://public/default/test-topic',
subscription_name: 'reconnect-subscription'
)

loop do
begin
client = PulsarSdk::Client.create(opts)
consumer = client.subscribe(consumer_opts)

puts "消费者连接成功,开始接收消息..."

consumer.listen do |cmd, msg|
puts "收到消息: #{msg.payload.inspect}"
msg.ack
end

rescue => e
puts "消费者连接出错: #{e.message}"
puts "5秒后尝试重新连接..."
sleep 5
retry
ensure
consumer&.close
client&.close
end
end
end

# 错误处理示例
begin
# 尝试订阅不存在的主题
bad_consumer_opts = PulsarSdk::Options::Consumer.new(
topic: 'persistent://public/default/non-existent-topic',
subscription_name: 'bad-subscription'
)
bad_consumer = client.subscribe(bad_consumer_opts)
rescue => e
puts "订阅失败: #{e.message}"
end

# 关闭消费者和客户端
consumer.close
client.close

puts "Consumer示例执行完成"
60 changes: 60 additions & 0 deletions examples/error_handling.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# 错误处理和重连机制示例

def create_robust_producer
max_retries = 5
retry_count = 0

loop do
begin
client = PulsarSdk.create_client(logical_addr: 'pulsar://localhost:6650')
producer_opts = PulsarSdk::Options::Producer.new(
topic: 'persistent://public/default/robust-topic'
)
producer = client.create_producer(producer_opts)

puts "Producer创建成功"
return [client, producer]

rescue => e
retry_count += 1
puts "创建Producer失败 (尝试 #{retry_count}/#{max_retries}): #{e.message}"

if retry_count >= max_retries
raise "无法创建Producer: #{e.message}"
end

sleep(2 ** retry_count) # 指数退避
retry
end
end
end

# 创建健壮的producer
begin
client, producer = create_robust_producer

# 发送消息并处理可能的错误
base_cmd = Pulsar::Proto::BaseCommand.new(
type: Pulsar::Proto::BaseCommand::Type::SEND,
send: Pulsar::Proto::CommandSend.new(num_messages: 1)
)

5.times do |i|
begin
msg = PulsarSdk::Producer::Message.new("消息 #{i}")
result = producer.execute(base_cmd, msg)
puts "消息 #{i} 发送成功"
rescue => e
puts "发送消息 #{i} 失败: #{e.message}"
end
end

producer.close
client.close

rescue => e
puts "程序执行出错: #{e.message}"
puts e.backtrace.join("\n")
end

puts "错误处理示例执行完成"
118 changes: 118 additions & 0 deletions examples/message_serialization_example.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# 消息序列化示例
# 演示如何使用新的消息序列化功能和json_encoded?辅助方法

require 'bundler/setup'
require 'pulsar_sdk'

# 创建客户端
client_opts = PulsarSdk::Options::Connection.new(logical_addr: 'pulsar://localhost:6650')
client = PulsarSdk::Client.create(client_opts)

# 创建生产者
producer_opts = PulsarSdk::Options::Producer.new(
topic: 'persistent://public/default/serialization-test'
)
producer = client.create_producer(producer_opts)

# 创建消费者
consumer_opts = PulsarSdk::Options::Consumer.new(
topic: 'persistent://public/default/serialization-test',
subscription_name: 'serialization-subscription'
)
consumer = client.subscribe(consumer_opts)

puts "=== 消息序列化示例 ==="

# 1. 发送字符串消息
puts "\n1. 发送字符串消息:"
string_message = "这是一条纯文本消息"
msg1 = PulsarSdk::Producer::Message.new(string_message)
puts "消息内容: #{msg1.message}"
puts "是否JSON编码: #{msg1.json_encoded?}"

# 发送消息
send_cmd = Pulsar::Proto::BaseCommand.new(
type: Pulsar::Proto::BaseCommand::Type::SEND,
send: Pulsar::Proto::CommandSend.new(num_messages: 1)
)
producer.execute(send_cmd, msg1)

# 2. 发送哈希消息
puts "\n2. 发送哈希消息:"
hash_message = { name: "张三", age: 30, city: "北京" }
msg2 = PulsarSdk::Producer::Message.new(hash_message)
puts "消息内容: #{msg2.message}"
puts "是否JSON编码: #{msg2.json_encoded?}"

producer.execute(send_cmd, msg2)

# 3. 发送数组消息
puts "\n3. 发送数组消息:"
array_message = [1, 2, 3, "four", "five"]
msg3 = PulsarSdk::Producer::Message.new(array_message)
puts "消息内容: #{msg3.message}"
puts "是否JSON编码: #{msg3.json_encoded?}"

producer.execute(send_cmd, msg3)

# 4. 发送自定义对象消息
puts "\n4. 发送自定义对象消息:"
class Person
attr_accessor :name, :age

def initialize(name, age)
@name, @age = name, age
end

def to_json(*args)
{ name: @name, age: @age }.to_json
end
end

person = Person.new("李四", 25)
msg4 = PulsarSdk::Producer::Message.new(person)
puts "消息内容: #{msg4.message}"
puts "是否JSON编码: #{msg4.json_encoded?}"

producer.execute(send_cmd, msg4)

puts "\n所有消息已发送,现在开始接收消息...\n"

# 接收并处理消息
received_count = 0
consumer.listen do |cmd, msg|
received_count += 1
puts "\n--- 接收到第 #{received_count} 条消息 ---"

# 使用新的辅助方法判断消息是否需要JSON反序列化
if msg.json_encoded?
puts "消息是JSON编码的 (Content-Type: #{PulsarSdk::Producer::Message::CONTENT_TYPE_JSON})"
begin
# 尝试解析JSON内容
parsed_data = JSON.parse(msg.payload)
puts "解析后的数据: #{parsed_data}"
rescue JSON::ParserError => e
puts "JSON解析失败: #{e.message}"
puts "原始内容: #{msg.payload}"
end
else
puts "消息是纯文本 (Content-Type: #{PulsarSdk::Producer::Message::CONTENT_TYPE_TEXT})"
puts "消息内容: #{msg.payload}"
end

# 确认消息
msg.ack
puts "消息已确认"

# 接收4条消息后停止
if received_count >= 4
break
end
end

# 清理资源
producer.close
consumer.close
client.close

puts "\n示例执行完成"
Loading