@@ -2424,22 +2424,22 @@ && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.las
2424
2424
releaseAcquisitionLockOnTimeoutForPerOffsetBatch (inFlightBatch , stateBatches , memberId , firstOffset , lastOffset );
2425
2425
}
2426
2426
}
2427
-
2428
- if (!stateBatches .isEmpty ()) {
2429
- writeShareGroupState (stateBatches ).whenComplete ((result , exception ) -> {
2430
- if (exception != null ) {
2431
- log .debug ("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}" ,
2432
- groupId , topicIdPartition , memberId , exception );
2433
- }
2434
- // Even if write share group state RPC call fails, we will still go ahead with the state transition.
2435
- // Update the cached state and start and end offsets after releasing the acquisition lock on timeout.
2436
- maybeUpdateCachedStateAndOffsets ();
2437
- });
2438
- }
2439
2427
} finally {
2440
2428
lock .writeLock ().unlock ();
2441
2429
}
2442
2430
2431
+ if (!stateBatches .isEmpty ()) {
2432
+ writeShareGroupState (stateBatches ).whenComplete ((result , exception ) -> {
2433
+ if (exception != null ) {
2434
+ log .debug ("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}" ,
2435
+ groupId , topicIdPartition , memberId , exception );
2436
+ }
2437
+ // Even if write share group state RPC call fails, we will still go ahead with the state transition.
2438
+ // Update the cached state and start and end offsets after releasing the acquisition lock on timeout.
2439
+ maybeUpdateCachedStateAndOffsets ();
2440
+ });
2441
+ }
2442
+
2443
2443
// If we have an acquisition lock timeout for a share-partition, then we should check if
2444
2444
// there is a pending share fetch request for the share-partition and complete it.
2445
2445
// Skip null check for stateBatches, it should always be initialized if reached here.
0 commit comments