From 1c9a15c5cd186a02f388a16b876c28000f0960ec Mon Sep 17 00:00:00 2001 From: Caleb Moore Date: Wed, 31 May 2023 16:46:48 +1000 Subject: [PATCH 1/3] Add epoch parameter to reconnect --- src/connection.rs | 5 ++++- src/producer.rs | 5 +++++ src/retry_op.rs | 2 ++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/connection.rs b/src/connection.rs index e55c086d..6ba50ea6 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -431,9 +431,10 @@ impl ConnectionSender { producer_id: u64, producer_name: Option, options: ProducerOptions, + epoch: u64, ) -> Result { let request_id = self.request_id.get(); - let msg = messages::create_producer(topic, producer_name, producer_id, request_id, options); + let msg = messages::create_producer(topic, producer_name, producer_id, request_id, options, epoch); self.send_message(msg, RequestKey::RequestId(request_id), |resp| { resp.command.producer_success }) @@ -1227,6 +1228,7 @@ pub(crate) mod messages { producer_id: u64, request_id: u64, options: ProducerOptions, + epoch: u64, ) -> Message { Message { command: proto::BaseCommand { @@ -1247,6 +1249,7 @@ pub(crate) mod messages { .collect(), schema: options.schema, producer_access_mode: options.access_mode, + epoch: Some(epoch), ..Default::default() }), ..Default::default() diff --git a/src/producer.rs b/src/producer.rs index 9a8c00ec..b7d2b7a8 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -433,6 +433,7 @@ struct TopicProducer { batch: Option>, compression: Option, options: ProducerOptions, + epoch: std::sync::atomic::AtomicU64, } impl TopicProducer { @@ -455,6 +456,7 @@ impl TopicProducer { let batch_byte_size = options.batch_byte_size; let compression = options.compression.clone(); let mut connection = client.manager.get_connection(&addr).await?; + let epoch = std::sync::atomic::AtomicU64::new(0); let producer_name = retry_create_producer( &client, @@ -464,6 +466,7 @@ impl TopicProducer { producer_id, name, &options, + &epoch, ) .await?; @@ -481,6 +484,7 @@ impl TopicProducer { batch, compression, options, + epoch, }) } @@ -727,6 +731,7 @@ impl TopicProducer { self.id, Some(self.name.clone()), &self.options, + &self.epoch, ) .await?; diff --git a/src/retry_op.rs b/src/retry_op.rs index 74f3f529..450562e9 100644 --- a/src/retry_op.rs +++ b/src/retry_op.rs @@ -156,6 +156,7 @@ pub async fn retry_create_producer( producer_id: u64, producer_name: Option, options: &ProducerOptions, + epoch: &std::sync::atomic::AtomicU64, ) -> Result { *connection = client.manager.get_connection(&addr).await?; let mut current_retries = 0u32; @@ -174,6 +175,7 @@ pub async fn retry_create_producer( producer_id, producer_name.clone(), options.clone(), + epoch.fetch_add(0, std::sync::atomic::Ordering::Relaxed) ) .await { From 00a4b1e21c1304424663615ffd47392aa93e3336 Mon Sep 17 00:00:00 2001 From: CMoore-Darwinium <84005891+CMoore-Darwinium@users.noreply.github.com> Date: Wed, 31 May 2023 17:22:38 +1000 Subject: [PATCH 2/3] Update retry_op.rs Add 1 to epoch --- src/retry_op.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/retry_op.rs b/src/retry_op.rs index 450562e9..f255b6b2 100644 --- a/src/retry_op.rs +++ b/src/retry_op.rs @@ -175,7 +175,7 @@ pub async fn retry_create_producer( producer_id, producer_name.clone(), options.clone(), - epoch.fetch_add(0, std::sync::atomic::Ordering::Relaxed) + epoch.fetch_add(1, std::sync::atomic::Ordering::Relaxed) ) .await { From 070709f060d879bcdf928266663be19da5dafd02 Mon Sep 17 00:00:00 2001 From: Caleb Moore Date: Wed, 31 May 2023 17:38:47 +1000 Subject: [PATCH 3/3] Pass whether producer name is user provided or not --- src/connection.rs | 5 ++++- src/producer.rs | 5 +++++ src/retry_op.rs | 4 +++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 6ba50ea6..6b9b8c0a 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -432,9 +432,10 @@ impl ConnectionSender { producer_name: Option, options: ProducerOptions, epoch: u64, + user_provided_producer_name: bool, ) -> Result { let request_id = self.request_id.get(); - let msg = messages::create_producer(topic, producer_name, producer_id, request_id, options, epoch); + let msg = messages::create_producer(topic, producer_name, producer_id, request_id, options, epoch, user_provided_producer_name); self.send_message(msg, RequestKey::RequestId(request_id), |resp| { resp.command.producer_success }) @@ -1229,6 +1230,7 @@ pub(crate) mod messages { request_id: u64, options: ProducerOptions, epoch: u64, + user_provided_producer_name: bool, ) -> Message { Message { command: proto::BaseCommand { @@ -1237,6 +1239,7 @@ pub(crate) mod messages { topic, producer_id, request_id, + user_provided_producer_name: Some(user_provided_producer_name), producer_name, encrypted: options.encrypted, metadata: options diff --git a/src/producer.rs b/src/producer.rs index b7d2b7a8..85e76939 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -434,6 +434,7 @@ struct TopicProducer { compression: Option, options: ProducerOptions, epoch: std::sync::atomic::AtomicU64, + user_provided_producer_name: bool, } impl TopicProducer { @@ -457,6 +458,7 @@ impl TopicProducer { let compression = options.compression.clone(); let mut connection = client.manager.get_connection(&addr).await?; let epoch = std::sync::atomic::AtomicU64::new(0); + let user_provided_producer_name = name.is_some(); let producer_name = retry_create_producer( &client, @@ -467,6 +469,7 @@ impl TopicProducer { name, &options, &epoch, + user_provided_producer_name, ) .await?; @@ -485,6 +488,7 @@ impl TopicProducer { compression, options, epoch, + user_provided_producer_name, }) } @@ -732,6 +736,7 @@ impl TopicProducer { Some(self.name.clone()), &self.options, &self.epoch, + self.user_provided_producer_name, ) .await?; diff --git a/src/retry_op.rs b/src/retry_op.rs index f255b6b2..5c41b760 100644 --- a/src/retry_op.rs +++ b/src/retry_op.rs @@ -157,6 +157,7 @@ pub async fn retry_create_producer( producer_name: Option, options: &ProducerOptions, epoch: &std::sync::atomic::AtomicU64, + user_provided_producer_name: bool, ) -> Result { *connection = client.manager.get_connection(&addr).await?; let mut current_retries = 0u32; @@ -175,7 +176,8 @@ pub async fn retry_create_producer( producer_id, producer_name.clone(), options.clone(), - epoch.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + epoch.fetch_add(1, std::sync::atomic::Ordering::Relaxed), + user_provided_producer_name ) .await {