From 82dcc0537df34e42beb4537c0ee3e83dedee2cdd Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 6 Aug 2021 11:20:06 +0800 Subject: [PATCH 1/6] add epoch to producer --- pulsar/producer_partition.go | 47 +++++++++++++++++++++--------------- pulsar/producer_test.go | 37 ++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 19 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index abec4fc1f7..350c097388 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -79,6 +79,9 @@ type partitionProducer struct { schemaInfo *SchemaInfo partitionIdx int32 metrics *internal.TopicMetrics + + epoch uint64 + userProvidedProducerName bool } func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int, @@ -101,19 +104,21 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions logger := client.log.SubLogger(log.Fields{"topic": topic}) p := &partitionProducer{ - client: client, - topic: topic, - log: logger, - options: options, - producerID: client.rpcClient.NewProducerID(), - eventsChan: make(chan interface{}, maxPendingMessages), - connectClosedCh: make(chan connectionClosed, 10), - batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), - publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), - pendingQueue: internal.NewBlockingQueue(maxPendingMessages), - lastSequenceID: -1, - partitionIdx: int32(partitionIdx), - metrics: metrics, + client: client, + topic: topic, + log: logger, + options: options, + producerID: client.rpcClient.NewProducerID(), + eventsChan: make(chan interface{}, maxPendingMessages), + connectClosedCh: make(chan connectionClosed, 10), + batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), + publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), + pendingQueue: internal.NewBlockingQueue(maxPendingMessages), + lastSequenceID: -1, + partitionIdx: int32(partitionIdx), + metrics: metrics, + epoch: 0, + userProvidedProducerName: false, } p.setProducerState(producerInit) @@ -125,6 +130,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions if options.Name != "" { p.producerName = options.Name + p.userProvidedProducerName = true } err := p.grabCnx() @@ -136,6 +142,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions p.log = p.log.SubLogger(log.Fields{ "producer_name": p.producerName, "producerID": p.producerID, + "epoch": atomic.LoadUint64(&p.epoch), }) p.log.WithField("cnx", p.cnx.ID()).Info("Created producer") @@ -177,11 +184,13 @@ func (p *partitionProducer) grabCnx() error { } cmdProducer := &pb.CommandProducer{ - RequestId: proto.Uint64(id), - Topic: proto.String(p.topic), - Encrypted: nil, - ProducerId: proto.Uint64(p.producerID), - Schema: pbSchema, + RequestId: proto.Uint64(id), + Topic: proto.String(p.topic), + Encrypted: nil, + ProducerId: proto.Uint64(p.producerID), + Schema: pbSchema, + Epoch: proto.Uint64(atomic.LoadUint64(&p.epoch)), + UserProvidedProducerName: proto.Bool(p.userProvidedProducerName), } if p.producerName != "" { @@ -298,7 +307,7 @@ func (p *partitionProducer) reconnectToBroker() { d := backoff.Next() p.log.Info("Reconnecting to broker in ", d) time.Sleep(d) - + atomic.AddUint64(&p.epoch, 1) err := p.grabCnx() if err == nil { // Successfully reconnected diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index bbe8028e55..865865fa31 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1127,3 +1127,40 @@ func TestProducerSendAfterClose(t *testing.T) { assert.Nil(t, ID) assert.Error(t, err) } + +func TestExactlyOnceWithProducerNameSpecified(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + Name: "p-name-1", + }) + + assert.NoError(t, err) + assert.NotNil(t, producer) + defer producer.Close() + + producer2, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + Name: "p-name-2", + }) + + assert.NoError(t, err) + assert.NotNil(t, producer2) + defer producer2.Close() + + producer3, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + Name: "p-name-2", + }) + + assert.NotNil(t, err) + assert.Nil(t, producer3) + defer producer3.Close() +} From d60fec5a1a89b3dafe02b93c4bee3ac0db552d4f Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 6 Aug 2021 11:33:29 +0800 Subject: [PATCH 2/6] fix CI --- pulsar/producer_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 865865fa31..dc7a5efd21 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1162,5 +1162,4 @@ func TestExactlyOnceWithProducerNameSpecified(t *testing.T) { assert.NotNil(t, err) assert.Nil(t, producer3) - defer producer3.Close() } From ede3b087dbbcc38a2ab78a07a487ceae98fffa80 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Mon, 9 Aug 2021 09:42:28 +0800 Subject: [PATCH 3/6] address comments --- pulsar/producer_partition.go | 38 ++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 350c097388..2b039cf9ac 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -80,8 +80,7 @@ type partitionProducer struct { partitionIdx int32 metrics *internal.TopicMetrics - epoch uint64 - userProvidedProducerName bool + epoch uint64 } func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int, @@ -104,21 +103,20 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions logger := client.log.SubLogger(log.Fields{"topic": topic}) p := &partitionProducer{ - client: client, - topic: topic, - log: logger, - options: options, - producerID: client.rpcClient.NewProducerID(), - eventsChan: make(chan interface{}, maxPendingMessages), - connectClosedCh: make(chan connectionClosed, 10), - batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), - publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), - pendingQueue: internal.NewBlockingQueue(maxPendingMessages), - lastSequenceID: -1, - partitionIdx: int32(partitionIdx), - metrics: metrics, - epoch: 0, - userProvidedProducerName: false, + client: client, + topic: topic, + log: logger, + options: options, + producerID: client.rpcClient.NewProducerID(), + eventsChan: make(chan interface{}, maxPendingMessages), + connectClosedCh: make(chan connectionClosed, 10), + batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), + publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), + pendingQueue: internal.NewBlockingQueue(maxPendingMessages), + lastSequenceID: -1, + partitionIdx: int32(partitionIdx), + metrics: metrics, + epoch: 0, } p.setProducerState(producerInit) @@ -130,7 +128,6 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions if options.Name != "" { p.producerName = options.Name - p.userProvidedProducerName = true } err := p.grabCnx() @@ -183,6 +180,8 @@ func (p *partitionProducer) grabCnx() error { p.log.Debug("The partition consumer schema is nil") } + userProvidedProducerName := p.producerName != "" + cmdProducer := &pb.CommandProducer{ RequestId: proto.Uint64(id), Topic: proto.String(p.topic), @@ -190,7 +189,7 @@ func (p *partitionProducer) grabCnx() error { ProducerId: proto.Uint64(p.producerID), Schema: pbSchema, Epoch: proto.Uint64(atomic.LoadUint64(&p.epoch)), - UserProvidedProducerName: proto.Bool(p.userProvidedProducerName), + UserProvidedProducerName: proto.Bool(userProvidedProducerName), } if p.producerName != "" { @@ -308,6 +307,7 @@ func (p *partitionProducer) reconnectToBroker() { p.log.Info("Reconnecting to broker in ", d) time.Sleep(d) atomic.AddUint64(&p.epoch, 1) + p.log.WithField("epoch", atomic.LoadUint64(&p.epoch)) err := p.grabCnx() if err == nil { // Successfully reconnected From 089f7311904f0e32362c28c416ba41d1e63adcb6 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Mon, 9 Aug 2021 15:57:10 +0800 Subject: [PATCH 4/6] address comments --- pulsar/producer_partition.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 2b039cf9ac..64f981f60e 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -139,7 +139,6 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions p.log = p.log.SubLogger(log.Fields{ "producer_name": p.producerName, "producerID": p.producerID, - "epoch": atomic.LoadUint64(&p.epoch), }) p.log.WithField("cnx", p.cnx.ID()).Info("Created producer") @@ -307,7 +306,7 @@ func (p *partitionProducer) reconnectToBroker() { p.log.Info("Reconnecting to broker in ", d) time.Sleep(d) atomic.AddUint64(&p.epoch, 1) - p.log.WithField("epoch", atomic.LoadUint64(&p.epoch)) + p.log.WithField("epoch", atomic.LoadUint64(&p.epoch)).Debug("Reconnecting to broker with epoch ", atomic.LoadUint64(&p.epoch)) err := p.grabCnx() if err == nil { // Successfully reconnected From c57d8c1c6352f0cff2c59d4ce5f202d76eabeade Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Mon, 9 Aug 2021 15:59:14 +0800 Subject: [PATCH 5/6] update style --- pulsar/producer_partition.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 64f981f60e..4c52f1ca7d 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -306,7 +306,8 @@ func (p *partitionProducer) reconnectToBroker() { p.log.Info("Reconnecting to broker in ", d) time.Sleep(d) atomic.AddUint64(&p.epoch, 1) - p.log.WithField("epoch", atomic.LoadUint64(&p.epoch)).Debug("Reconnecting to broker with epoch ", atomic.LoadUint64(&p.epoch)) + p.log.WithField("epoch", atomic.LoadUint64(&p.epoch)).Debug( + "Reconnecting to broker with epoch ", atomic.LoadUint64(&p.epoch)) err := p.grabCnx() if err == nil { // Successfully reconnected From 9c83721dd360e9bcaea81d7bb1c5879cc5ee7bae Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 10 Aug 2021 10:23:28 +0800 Subject: [PATCH 6/6] better logging --- pulsar/producer_partition.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 4c52f1ca7d..ca6850d1d3 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -237,7 +237,10 @@ func (p *partitionProducer) grabCnx() error { } p.cnx = res.Cnx p.cnx.RegisterListener(p.producerID, p) - p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer") + p.log.WithFields(log.Fields{ + "cnx": res.Cnx.ID(), + "epoch": atomic.LoadUint64(&p.epoch), + }).Debug("Connected producer") pendingItems := p.pendingQueue.ReadableSlice() viewSize := len(pendingItems) @@ -306,8 +309,6 @@ func (p *partitionProducer) reconnectToBroker() { p.log.Info("Reconnecting to broker in ", d) time.Sleep(d) atomic.AddUint64(&p.epoch, 1) - p.log.WithField("epoch", atomic.LoadUint64(&p.epoch)).Debug( - "Reconnecting to broker with epoch ", atomic.LoadUint64(&p.epoch)) err := p.grabCnx() if err == nil { // Successfully reconnected