@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2023 the original author or authors.
+ * Copyright 2016-2024 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -125,6 +125,7 @@
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.kafka.test.utils.ContainerTestUtils;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.lang.NonNull;
 import org.springframework.lang.Nullable;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.transaction.PlatformTransactionManager;
@@ -140,6 +141,7 @@
  * @author Lukasz Kaminski
  * @author Ray Chuan Tay
  * @author Daniel Gentes
+ * @author Soby Chacko
  */
 @EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
 		KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
@@ -235,8 +237,7 @@ public void testDelegateType() throws Exception {
 		container.setBeanName("delegate");
 		AtomicReference<List<TopicPartitionOffset>> offsets = new AtomicReference<>();
 		container.setApplicationEventPublisher(e -> {
-			if (e instanceof ConsumerStoppingEvent) {
-				ConsumerStoppingEvent event = (ConsumerStoppingEvent) e;
+			if (e instanceof ConsumerStoppingEvent event) {
 				offsets.set(event.getPartitions().stream()
 						.map(p -> new TopicPartitionOffset(p.topic(), p.partition(),
 								event.getConsumer().position(p, Duration.ofMillis(10_000))))
@@ -929,7 +930,7 @@ public void testRecordAckAfterStop() throws Exception {
 		Consumer<Integer, String> consumer = mock(Consumer.class);
 		given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
 		final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
-		records.put(new TopicPartition("foo", 0), Arrays.asList(
+		records.put(new TopicPartition("foo", 0), List.of(
 				new ConsumerRecord<>("foo", 0, 0L, 1, "foo")));
 		ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
 		given(consumer.poll(any(Duration.class))).willAnswer(i -> {
@@ -1344,7 +1345,6 @@ else if (entry.getValue().offset() == 2) {
 		logger.info("Stop batch listener manual");
 	}
 
-	@SuppressWarnings("deprecation")
 	@Test
 	public void testBatchListenerErrors() throws Exception {
 		logger.info("Start batch listener errors");
@@ -1417,7 +1417,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
 		logger.info("Stop batch listener errors");
 	}
 
-	@SuppressWarnings({ "unchecked", "deprecation" })
+	@SuppressWarnings({ "unchecked" })
 	@Test
 	public void testBatchListenerAckAfterRecoveryMock() throws Exception {
 		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
@@ -1680,7 +1680,7 @@ public void testDefinedPartitions() throws Exception {
 			@Override
 			protected KafkaConsumer<Integer, String> createKafkaConsumer(Map<String, Object> configs) {
 				assertThat(configs).containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
-				return new KafkaConsumer<Integer, String>(props) {
+				return new KafkaConsumer<>(props) {
 
 					@Override
 					public ConsumerRecords<Integer, String> poll(Duration timeout) {
@@ -2281,10 +2281,8 @@ public void testStaticAssign() throws Exception {
 		Map<String, Object> props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka);
 
 		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
-		ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset[] {
-				new TopicPartitionOffset(topic22, 0),
-				new TopicPartitionOffset(topic22, 1)
-		});
+		ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topic22, 0),
+				new TopicPartitionOffset(topic22, 1));
 		final CountDownLatch latch = new CountDownLatch(1);
 		final List<ConsumerRecord<Integer, String>> received = new ArrayList<>();
 		containerProps.setMessageListener((MessageListener<Integer, String>) record -> {
@@ -2362,15 +2360,15 @@ public void testBadListenerType() {
 		containerProps.setMissingTopicsFatal(false);
 		KafkaMessageListenerContainer<Integer, Foo1> badContainer =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
-		assertThatIllegalStateException().isThrownBy(() -> badContainer.start())
+		assertThatIllegalStateException().isThrownBy(badContainer::start)
 				.withMessageContaining("implementation must be provided");
 		badContainer.setupMessageListener((GenericMessageListener<String>) data -> {
 		});
 		assertThat(badContainer.getAssignedPartitions()).isNull();
 		badContainer.pause();
 		assertThat(badContainer.isContainerPaused()).isFalse();
 		assertThat(badContainer.metrics()).isEqualTo(Collections.emptyMap());
-		assertThatIllegalArgumentException().isThrownBy(() -> badContainer.start())
+		assertThatIllegalArgumentException().isThrownBy(badContainer::start)
 				.withMessageContaining("Listener must be");
 		assertThat(badContainer.toString()).contains("none assigned");
 
@@ -2387,7 +2385,7 @@ public void testBadAckMode() {
 				new KafkaMessageListenerContainer<>(cf, containerProps);
 		badContainer.setupMessageListener((MessageListener<String, String>) m -> {
 		});
-		assertThatIllegalStateException().isThrownBy(() -> badContainer.start())
+		assertThatIllegalStateException().isThrownBy(badContainer::start)
 				.withMessageContaining("Consumer cannot be configured for auto commit for ackMode");
 
 	}
@@ -2566,14 +2564,16 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
 			public void onMessage(ConsumerRecord<String, String> data) {
 				if (data.partition() == 0 && data.offset() == 0) {
 					TopicPartition topicPartition = new TopicPartition(data.topic(), data.partition());
-					getSeekCallbackFor(topicPartition).seekToBeginning(records.keySet());
+					final ConsumerSeekCallback seekCallbackFor = getSeekCallbackFor(topicPartition);
+					assertThat(seekCallbackFor).isNotNull();
+					seekCallbackFor.seekToBeginning(records.keySet());
 					Iterator<TopicPartition> iterator = records.keySet().iterator();
-					getSeekCallbackFor(topicPartition).seekToBeginning(Collections.singletonList(iterator.next()));
-					getSeekCallbackFor(topicPartition).seekToBeginning(Collections.singletonList(iterator.next()));
-					getSeekCallbackFor(topicPartition).seekToEnd(records.keySet());
+					seekCallbackFor.seekToBeginning(Collections.singletonList(iterator.next()));
+					seekCallbackFor.seekToBeginning(Collections.singletonList(iterator.next()));
+					seekCallbackFor.seekToEnd(records.keySet());
 					iterator = records.keySet().iterator();
-					getSeekCallbackFor(topicPartition).seekToEnd(Collections.singletonList(iterator.next()));
-					getSeekCallbackFor(topicPartition).seekToEnd(Collections.singletonList(iterator.next()));
+					seekCallbackFor.seekToEnd(Collections.singletonList(iterator.next()));
+					seekCallbackFor.seekToEnd(Collections.singletonList(iterator.next()));
 				}
 			}
 
@@ -2679,7 +2679,7 @@ public void dontResumePausedPartition() throws Exception {
 		containerProps.setAckMode(AckMode.RECORD);
 		containerProps.setClientId("clientId");
 		containerProps.setIdleEventInterval(100L);
-		containerProps.setMessageListener((MessageListener) rec -> { });
+		containerProps.setMessageListener((MessageListener<?, ?>) rec -> { });
 		containerProps.setMissingTopicsFatal(false);
 		KafkaMessageListenerContainer<Integer, String> container =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2746,7 +2746,7 @@ public void rePausePartitionAfterRebalance() throws Exception {
 		containerProps.setAckMode(AckMode.RECORD);
 		containerProps.setClientId("clientId");
 		containerProps.setIdleEventInterval(100L);
-		containerProps.setMessageListener((MessageListener) rec -> { });
+		containerProps.setMessageListener((MessageListener<?, ?>) rec -> { });
 		containerProps.setMissingTopicsFatal(false);
 		KafkaMessageListenerContainer<Integer, String> container =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2828,7 +2828,7 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
 		containerProps.setAckMode(AckMode.RECORD);
 		containerProps.setClientId("clientId");
 		containerProps.setIdleEventInterval(100L);
-		containerProps.setMessageListener((MessageListener) rec -> { });
+		containerProps.setMessageListener((MessageListener<?, ?>) rec -> { });
 		containerProps.setMissingTopicsFatal(false);
 		KafkaMessageListenerContainer<Integer, String> container =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -2956,7 +2956,7 @@ public void testIdleEarlyExit() throws Exception {
 		container.start();
 		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
 		new DirectFieldAccessor(container).setPropertyValue("listenerConsumer.assignedPartitions",
-				Arrays.asList(new TopicPartition("foo", 0)));
+				List.of(new TopicPartition("foo", 0)));
 		Thread.sleep(500);
 		long t1 = System.currentTimeMillis();
 		container.stop();
@@ -3061,16 +3061,12 @@ public void testAckModeCount() throws Exception {
 		given(consumer.poll(any(Duration.class))).willAnswer(i -> {
 			Thread.sleep(50);
 			int recordsToUse = which.incrementAndGet();
-			switch (recordsToUse) {
-				case 1:
-					return consumerRecords1;
-				case 2:
-					return consumerRecords2;
-				case 3:
-					return consumerRecords3;
-				default:
-					return emptyRecords;
-			}
+			return switch (recordsToUse) {
+				case 1 -> consumerRecords1;
+				case 2 -> consumerRecords2;
+				case 3 -> consumerRecords3;
+				default -> emptyRecords;
+			};
 		});
 		final CountDownLatch commitLatch = new CountDownLatch(3);
 		willAnswer(i -> {
@@ -3108,7 +3104,7 @@ public void testAckModeCount() throws Exception {
 		container.stop();
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
+	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	public void testCommitErrorHandlerCalled() throws Exception {
 		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
@@ -3436,7 +3432,7 @@ public void testCooperativeRebalance() throws Exception {
 		ContainerProperties containerProps = new ContainerProperties("foo");
 		containerProps.setGroupId("grp");
 		containerProps.setClientId("clientId");
-		containerProps.setMessageListener((MessageListener) msg -> { });
+		containerProps.setMessageListener((MessageListener<?, ?>) msg -> { });
 		Properties consumerProps = new Properties();
 		KafkaMessageListenerContainer<Integer, String> container =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
@@ -3468,7 +3464,7 @@ void testCommitRebalanceInProgressRecord() throws Exception {
 			assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partitions should be re-committed before partition 1 is revoked
 			assertThat(commits.get(5).get(new TopicPartition("foo", 1)))
 					.isNotNull()
-					.extracting(om -> om.offset())
+					.extracting(OffsetAndMetadata::offset)
 					.isEqualTo(2L);
 		});
 	}
@@ -3528,7 +3524,7 @@ else if (call == 1) {
 		containerProps.setAckMode(ackMode);
 		containerProps.setClientId("clientId");
 		containerProps.setIdleEventInterval(100L);
-		containerProps.setMessageListener((MessageListener) msg -> { });
+		containerProps.setMessageListener((MessageListener<?, ?>) msg -> { });
 		Properties consumerProps = new Properties();
 		containerProps.setKafkaConsumerProperties(consumerProps);
 		KafkaMessageListenerContainer<Integer, String> container =
@@ -3539,7 +3535,7 @@ else if (call == 1) {
 		verifier.accept(commits);
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@SuppressWarnings({ "unchecked" })
 	@Test
 	void testCommitFailsOnRevoke() throws Exception {
 		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
@@ -3672,7 +3668,7 @@ void commitAfterHandleManual() throws InterruptedException {
 		cfProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 45000); // wins
 		given(cf.getConfigurationProperties()).willReturn(cfProps);
 		final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
-		records.put(new TopicPartition("foo", 0), Arrays.asList(
+		records.put(new TopicPartition("foo", 0), List.of(
 				new ConsumerRecord<>("foo", 0, 0L, 1, "foo")));
 		ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
 		ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
@@ -3755,7 +3751,7 @@ void stopImmediately() throws InterruptedException {
 	}
 
 	@Test
-	@SuppressWarnings({"unchecked", "deprecation"})
+	@SuppressWarnings({"unchecked"})
 	public void testInvokeRecordInterceptorSuccess() throws Exception {
 		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
 		Consumer<Integer, String> consumer = mock(Consumer.class);
@@ -3797,7 +3793,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
 		RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
 
 			@Override
-			@Nullable
+			@NonNull
 			public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
 					Consumer<Integer, String> consumer) {
 
@@ -3843,7 +3839,7 @@ private static Stream<Arguments> paramsForRecordAllSkipped() {
 
 	@ParameterizedTest(name = "{index} testInvokeRecordInterceptorAllSkipped AckMode.{0} early intercept {1}")
 	@MethodSource("paramsForRecordAllSkipped")
-	@SuppressWarnings({"unchecked", "deprecation"})
+	@SuppressWarnings({"unchecked"})
 	public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early) throws Exception {
 		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
 		Consumer<Integer, String> consumer = mock(Consumer.class);
@@ -3870,7 +3866,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
 		containerProps.setGroupId("grp");
 		containerProps.setAckMode(ackMode);
 
-		containerProps.setMessageListener((MessageListener) msg -> {
+		containerProps.setMessageListener((MessageListener<?, ?>) msg -> {
 		});
 		containerProps.setClientId("clientId");
 
@@ -3913,7 +3909,7 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
 
 	@ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}")
 	@ValueSource(booleans = { true, false })
-	@SuppressWarnings({"unchecked", "deprecation"})
+	@SuppressWarnings({"unchecked"})
 	public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception {
 		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
 		Consumer<Integer, String> consumer = mock(Consumer.class);
@@ -3940,7 +3936,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
 		containerProps.setGroupId("grp");
 		containerProps.setAckMode(AckMode.BATCH);
 
-		containerProps.setMessageListener((BatchMessageListener) msgs -> {
+		containerProps.setMessageListener((BatchMessageListener<?, ?>) msgs -> {
 		});
 		containerProps.setClientId("clientId");
 		if (!early) {
@@ -3976,7 +3972,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
 	}
 
 	@Test
-	@SuppressWarnings({"unchecked", "deprecation"})
+	@SuppressWarnings({"unchecked"})
 	public void testInvokeRecordInterceptorFailure() throws Exception {
 		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
 		Consumer<Integer, String> consumer = mock(Consumer.class);
@@ -4016,7 +4012,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
 		RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
 
 			@Override
-			@Nullable
+			@NonNull
 			public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
 					Consumer<Integer, String> consumer) {
 