11/*
2- * Copyright 2016-2023 the original author or authors.
2+ * Copyright 2016-2024 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
125125import org .springframework .kafka .test .context .EmbeddedKafka ;
126126import org .springframework .kafka .test .utils .ContainerTestUtils ;
127127import org .springframework .kafka .test .utils .KafkaTestUtils ;
128+ import org .springframework .lang .NonNull ;
128129import org .springframework .lang .Nullable ;
129130import org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
130131import org .springframework .transaction .PlatformTransactionManager ;
140141 * @author Lukasz Kaminski
141142 * @author Ray Chuan Tay
142143 * @author Daniel Gentes
144+ * @author Soby Chacko
143145 */
144146@ EmbeddedKafka (topics = { KafkaMessageListenerContainerTests .topic1 , KafkaMessageListenerContainerTests .topic2 ,
145147 KafkaMessageListenerContainerTests .topic3 , KafkaMessageListenerContainerTests .topic4 ,
@@ -928,7 +930,7 @@ public void testRecordAckAfterStop() throws Exception {
928930 Consumer <Integer , String > consumer = mock (Consumer .class );
929931 given (cf .createConsumer (eq ("grp" ), eq ("clientId" ), isNull (), any ())).willReturn (consumer );
930932 final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
931- records .put (new TopicPartition ("foo" , 0 ), Arrays . asList (
933+ records .put (new TopicPartition ("foo" , 0 ), List . of (
932934 new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" )));
933935 ConsumerRecords <Integer , String > consumerRecords = new ConsumerRecords <>(records );
934936 given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
@@ -1343,7 +1345,6 @@ else if (entry.getValue().offset() == 2) {
13431345 logger .info ("Stop batch listener manual" );
13441346 }
13451347
1346- @ SuppressWarnings ("deprecation" )
13471348 @ Test
13481349 public void testBatchListenerErrors () throws Exception {
13491350 logger .info ("Start batch listener errors" );
@@ -1416,7 +1417,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
14161417 logger .info ("Stop batch listener errors" );
14171418 }
14181419
1419- @ SuppressWarnings ({ "unchecked" , "deprecation" })
1420+ @ SuppressWarnings ({ "unchecked" })
14201421 @ Test
14211422 public void testBatchListenerAckAfterRecoveryMock () throws Exception {
14221423 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
@@ -1679,7 +1680,7 @@ public void testDefinedPartitions() throws Exception {
16791680 @ Override
16801681 protected KafkaConsumer <Integer , String > createKafkaConsumer (Map <String , Object > configs ) {
16811682 assertThat (configs ).containsKey (ConsumerConfig .MAX_POLL_RECORDS_CONFIG );
1682- return new KafkaConsumer <Integer , String >(props ) {
1683+ return new KafkaConsumer <>(props ) {
16831684
16841685 @ Override
16851686 public ConsumerRecords <Integer , String > poll (Duration timeout ) {
@@ -2280,10 +2281,8 @@ public void testStaticAssign() throws Exception {
22802281 Map <String , Object > props = KafkaTestUtils .consumerProps ("testStatic" , "false" , embeddedKafka );
22812282
22822283 DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(props );
2283- ContainerProperties containerProps = new ContainerProperties (new TopicPartitionOffset [] {
2284- new TopicPartitionOffset (topic22 , 0 ),
2285- new TopicPartitionOffset (topic22 , 1 )
2286- });
2284+ ContainerProperties containerProps = new ContainerProperties (new TopicPartitionOffset (topic22 , 0 ),
2285+ new TopicPartitionOffset (topic22 , 1 ));
22872286 final CountDownLatch latch = new CountDownLatch (1 );
22882287 final List <ConsumerRecord <Integer , String >> received = new ArrayList <>();
22892288 containerProps .setMessageListener ((MessageListener <Integer , String >) record -> {
@@ -2361,15 +2360,15 @@ public void testBadListenerType() {
23612360 containerProps .setMissingTopicsFatal (false );
23622361 KafkaMessageListenerContainer <Integer , Foo1 > badContainer =
23632362 new KafkaMessageListenerContainer <>(cf , containerProps );
2364- assertThatIllegalStateException ().isThrownBy (() -> badContainer . start () )
2363+ assertThatIllegalStateException ().isThrownBy (badContainer :: start )
23652364 .withMessageContaining ("implementation must be provided" );
23662365 badContainer .setupMessageListener ((GenericMessageListener <String >) data -> {
23672366 });
23682367 assertThat (badContainer .getAssignedPartitions ()).isNull ();
23692368 badContainer .pause ();
23702369 assertThat (badContainer .isContainerPaused ()).isFalse ();
23712370 assertThat (badContainer .metrics ()).isEqualTo (Collections .emptyMap ());
2372- assertThatIllegalArgumentException ().isThrownBy (() -> badContainer . start () )
2371+ assertThatIllegalArgumentException ().isThrownBy (badContainer :: start )
23732372 .withMessageContaining ("Listener must be" );
23742373 assertThat (badContainer .toString ()).contains ("none assigned" );
23752374
@@ -2386,7 +2385,7 @@ public void testBadAckMode() {
23862385 new KafkaMessageListenerContainer <>(cf , containerProps );
23872386 badContainer .setupMessageListener ((MessageListener <String , String >) m -> {
23882387 });
2389- assertThatIllegalStateException ().isThrownBy (() -> badContainer . start () )
2388+ assertThatIllegalStateException ().isThrownBy (badContainer :: start )
23902389 .withMessageContaining ("Consumer cannot be configured for auto commit for ackMode" );
23912390
23922391 }
@@ -2565,14 +2564,16 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
25652564 public void onMessage (ConsumerRecord <String , String > data ) {
25662565 if (data .partition () == 0 && data .offset () == 0 ) {
25672566 TopicPartition topicPartition = new TopicPartition (data .topic (), data .partition ());
2568- getSeekCallbackFor (topicPartition ).seekToBeginning (records .keySet ());
2567+ final ConsumerSeekCallback seekCallbackFor = getSeekCallbackFor (topicPartition );
2568+ assertThat (seekCallbackFor ).isNotNull ();
2569+ seekCallbackFor .seekToBeginning (records .keySet ());
25692570 Iterator <TopicPartition > iterator = records .keySet ().iterator ();
2570- getSeekCallbackFor ( topicPartition ) .seekToBeginning (Collections .singletonList (iterator .next ()));
2571- getSeekCallbackFor ( topicPartition ) .seekToBeginning (Collections .singletonList (iterator .next ()));
2572- getSeekCallbackFor ( topicPartition ) .seekToEnd (records .keySet ());
2571+ seekCallbackFor .seekToBeginning (Collections .singletonList (iterator .next ()));
2572+ seekCallbackFor .seekToBeginning (Collections .singletonList (iterator .next ()));
2573+ seekCallbackFor .seekToEnd (records .keySet ());
25732574 iterator = records .keySet ().iterator ();
2574- getSeekCallbackFor ( topicPartition ) .seekToEnd (Collections .singletonList (iterator .next ()));
2575- getSeekCallbackFor ( topicPartition ) .seekToEnd (Collections .singletonList (iterator .next ()));
2575+ seekCallbackFor .seekToEnd (Collections .singletonList (iterator .next ()));
2576+ seekCallbackFor .seekToEnd (Collections .singletonList (iterator .next ()));
25762577 }
25772578 }
25782579
@@ -2678,7 +2679,7 @@ public void dontResumePausedPartition() throws Exception {
26782679 containerProps .setAckMode (AckMode .RECORD );
26792680 containerProps .setClientId ("clientId" );
26802681 containerProps .setIdleEventInterval (100L );
2681- containerProps .setMessageListener ((MessageListener ) rec -> { });
2682+ containerProps .setMessageListener ((MessageListener <?, ?> ) rec -> { });
26822683 containerProps .setMissingTopicsFatal (false );
26832684 KafkaMessageListenerContainer <Integer , String > container =
26842685 new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -2745,7 +2746,7 @@ public void rePausePartitionAfterRebalance() throws Exception {
27452746 containerProps .setAckMode (AckMode .RECORD );
27462747 containerProps .setClientId ("clientId" );
27472748 containerProps .setIdleEventInterval (100L );
2748- containerProps .setMessageListener ((MessageListener ) rec -> { });
2749+ containerProps .setMessageListener ((MessageListener <?, ?> ) rec -> { });
27492750 containerProps .setMissingTopicsFatal (false );
27502751 KafkaMessageListenerContainer <Integer , String > container =
27512752 new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -2827,7 +2828,7 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
28272828 containerProps .setAckMode (AckMode .RECORD );
28282829 containerProps .setClientId ("clientId" );
28292830 containerProps .setIdleEventInterval (100L );
2830- containerProps .setMessageListener ((MessageListener ) rec -> { });
2831+ containerProps .setMessageListener ((MessageListener <?, ?> ) rec -> { });
28312832 containerProps .setMissingTopicsFatal (false );
28322833 KafkaMessageListenerContainer <Integer , String > container =
28332834 new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -2955,7 +2956,7 @@ public void testIdleEarlyExit() throws Exception {
29552956 container .start ();
29562957 assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
29572958 new DirectFieldAccessor (container ).setPropertyValue ("listenerConsumer.assignedPartitions" ,
2958- Arrays . asList (new TopicPartition ("foo" , 0 )));
2959+ List . of (new TopicPartition ("foo" , 0 )));
29592960 Thread .sleep (500 );
29602961 long t1 = System .currentTimeMillis ();
29612962 container .stop ();
@@ -3060,16 +3061,12 @@ public void testAckModeCount() throws Exception {
30603061 given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
30613062 Thread .sleep (50 );
30623063 int recordsToUse = which .incrementAndGet ();
3063- switch (recordsToUse ) {
3064- case 1 :
3065- return consumerRecords1 ;
3066- case 2 :
3067- return consumerRecords2 ;
3068- case 3 :
3069- return consumerRecords3 ;
3070- default :
3071- return emptyRecords ;
3072- }
3064+ return switch (recordsToUse ) {
3065+ case 1 -> consumerRecords1 ;
3066+ case 2 -> consumerRecords2 ;
3067+ case 3 -> consumerRecords3 ;
3068+ default -> emptyRecords ;
3069+ };
30733070 });
30743071 final CountDownLatch commitLatch = new CountDownLatch (3 );
30753072 willAnswer (i -> {
@@ -3107,7 +3104,7 @@ public void testAckModeCount() throws Exception {
31073104 container .stop ();
31083105 }
31093106
3110- @ SuppressWarnings ({ "unchecked" , "rawtypes" , "deprecation" })
3107+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
31113108 @ Test
31123109 public void testCommitErrorHandlerCalled () throws Exception {
31133110 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
@@ -3435,7 +3432,7 @@ public void testCooperativeRebalance() throws Exception {
34353432 ContainerProperties containerProps = new ContainerProperties ("foo" );
34363433 containerProps .setGroupId ("grp" );
34373434 containerProps .setClientId ("clientId" );
3438- containerProps .setMessageListener ((MessageListener ) msg -> { });
3435+ containerProps .setMessageListener ((MessageListener <?, ?> ) msg -> { });
34393436 Properties consumerProps = new Properties ();
34403437 KafkaMessageListenerContainer <Integer , String > container =
34413438 new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -3467,7 +3464,7 @@ void testCommitRebalanceInProgressRecord() throws Exception {
34673464 assertThat (commits .get (5 )).hasSize (2 ); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
34683465 assertThat (commits .get (5 ).get (new TopicPartition ("foo" , 1 )))
34693466 .isNotNull ()
3470- .extracting (om -> om . offset () )
3467+ .extracting (OffsetAndMetadata :: offset )
34713468 .isEqualTo (2L );
34723469 });
34733470 }
@@ -3527,7 +3524,7 @@ else if (call == 1) {
35273524 containerProps .setAckMode (ackMode );
35283525 containerProps .setClientId ("clientId" );
35293526 containerProps .setIdleEventInterval (100L );
3530- containerProps .setMessageListener ((MessageListener ) msg -> { });
3527+ containerProps .setMessageListener ((MessageListener <?, ?> ) msg -> { });
35313528 Properties consumerProps = new Properties ();
35323529 containerProps .setKafkaConsumerProperties (consumerProps );
35333530 KafkaMessageListenerContainer <Integer , String > container =
@@ -3538,7 +3535,7 @@ else if (call == 1) {
35383535 verifier .accept (commits );
35393536 }
35403537
3541- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
3538+ @ SuppressWarnings ({ "unchecked" })
35423539 @ Test
35433540 void testCommitFailsOnRevoke () throws Exception {
35443541 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
@@ -3671,7 +3668,7 @@ void commitAfterHandleManual() throws InterruptedException {
36713668 cfProps .put (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG , 45000 ); // wins
36723669 given (cf .getConfigurationProperties ()).willReturn (cfProps );
36733670 final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
3674- records .put (new TopicPartition ("foo" , 0 ), Arrays . asList (
3671+ records .put (new TopicPartition ("foo" , 0 ), List . of (
36753672 new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" )));
36763673 ConsumerRecords <Integer , String > consumerRecords = new ConsumerRecords <>(records );
36773674 ConsumerRecords <Integer , String > emptyRecords = new ConsumerRecords <>(Collections .emptyMap ());
@@ -3754,7 +3751,7 @@ void stopImmediately() throws InterruptedException {
37543751 }
37553752
37563753 @ Test
3757- @ SuppressWarnings ({"unchecked" , "deprecation" })
3754+ @ SuppressWarnings ({"unchecked" })
37583755 public void testInvokeRecordInterceptorSuccess () throws Exception {
37593756 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
37603757 Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -3796,7 +3793,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
37963793 RecordInterceptor <Integer , String > recordInterceptor = spy (new RecordInterceptor <Integer , String >() {
37973794
37983795 @ Override
3799- @ Nullable
3796+ @ NonNull
38003797 public ConsumerRecord <Integer , String > intercept (ConsumerRecord <Integer , String > record ,
38013798 Consumer <Integer , String > consumer ) {
38023799
@@ -3842,7 +3839,7 @@ private static Stream<Arguments> paramsForRecordAllSkipped() {
38423839
38433840 @ ParameterizedTest (name = "{index} testInvokeRecordInterceptorAllSkipped AckMode.{0} early intercept {1}" )
38443841 @ MethodSource ("paramsForRecordAllSkipped" )
3845- @ SuppressWarnings ({"unchecked" , "deprecation" })
3842+ @ SuppressWarnings ({"unchecked" })
38463843 public void testInvokeRecordInterceptorAllSkipped (AckMode ackMode , boolean early ) throws Exception {
38473844 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
38483845 Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -3869,7 +3866,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
38693866 containerProps .setGroupId ("grp" );
38703867 containerProps .setAckMode (ackMode );
38713868
3872- containerProps .setMessageListener ((MessageListener ) msg -> {
3869+ containerProps .setMessageListener ((MessageListener <?, ?> ) msg -> {
38733870 });
38743871 containerProps .setClientId ("clientId" );
38753872
@@ -3912,7 +3909,7 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
39123909
39133910 @ ParameterizedTest (name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}" )
39143911 @ ValueSource (booleans = { true , false })
3915- @ SuppressWarnings ({"unchecked" , "deprecation" })
3912+ @ SuppressWarnings ({"unchecked" })
39163913 public void testInvokeBatchInterceptorAllSkipped (boolean early ) throws Exception {
39173914 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
39183915 Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -3939,7 +3936,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
39393936 containerProps .setGroupId ("grp" );
39403937 containerProps .setAckMode (AckMode .BATCH );
39413938
3942- containerProps .setMessageListener ((BatchMessageListener ) msgs -> {
3939+ containerProps .setMessageListener ((BatchMessageListener <?, ?> ) msgs -> {
39433940 });
39443941 containerProps .setClientId ("clientId" );
39453942 if (!early ) {
@@ -3975,7 +3972,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
39753972 }
39763973
39773974 @ Test
3978- @ SuppressWarnings ({"unchecked" , "deprecation" })
3975+ @ SuppressWarnings ({"unchecked" })
39793976 public void testInvokeRecordInterceptorFailure () throws Exception {
39803977 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
39813978 Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -4015,7 +4012,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
40154012 RecordInterceptor <Integer , String > recordInterceptor = spy (new RecordInterceptor <Integer , String >() {
40164013
40174014 @ Override
4018- @ Nullable
4015+ @ NonNull
40194016 public ConsumerRecord <Integer , String > intercept (ConsumerRecord <Integer , String > record ,
40204017 Consumer <Integer , String > consumer ) {
40214018
0 commit comments