1616
1717package org .springframework .kafka .support .micrometer ;
1818
19+ import org .springframework .lang .NonNull ;
1920import org .springframework .util .StringUtils ;
2021
2122import io .micrometer .common .KeyValues ;
2829 * Spring for Apache Kafka Observation for listeners.
2930 *
3031 * @author Gary Russell
32+ * @author Christian Mergenthaler
33+ * @author Wang Zhiyang
34+ *
3135 * @since 3.0
3236 *
3337 */
@@ -44,19 +48,32 @@ public Class<? extends ObservationConvention<? extends Context>> getDefaultConve
4448 }
4549
4650 @ Override
51+ @ NonNull
4752 public String getPrefix () {
4853 return "spring.kafka.listener" ;
4954 }
5055
5156 @ Override
57+ @ NonNull
5258 public KeyName [] getLowCardinalityKeyNames () {
5359 return ListenerLowCardinalityTags .values ();
5460 }
5561
62+ @ Override
63+ @ NonNull
64+ public KeyName [] getHighCardinalityKeyNames () {
65+ return ListenerHighCardinalityTags .values ();
66+ }
67+
5668 };
5769
5870 /**
5971 * Low cardinality tags.
72+ *
73+ * @author Christian Mergenthaler
74+ * @author Wang Zhiyang
75+ *
76+ * @since 3.2
6077 */
6178 public enum ListenerLowCardinalityTags implements KeyName {
6279
@@ -66,6 +83,7 @@ public enum ListenerLowCardinalityTags implements KeyName {
6683 LISTENER_ID {
6784
6885 @ Override
86+ @ NonNull
6987 public String asString () {
7088 return "spring.kafka.listener.id" ;
7189 }
@@ -78,6 +96,7 @@ public String asString() {
7896 MESSAGING_SYSTEM {
7997
8098 @ Override
99+ @ NonNull
81100 public String asString () {
82101 return "messaging.system" ;
83102 }
@@ -90,30 +109,20 @@ public String asString() {
90109 MESSAGING_OPERATION {
91110
92111 @ Override
112+ @ NonNull
93113 public String asString () {
94114 return "messaging.operation" ;
95115 }
96116
97117 },
98118
99- /**
100- * Messaging consumer id.
101- */
102- MESSAGING_CONSUMER_ID {
103-
104- @ Override
105- public String asString () {
106- return "messaging.consumer.id" ;
107- }
108-
109- },
110-
111119 /**
112120 * Messaging source name.
113121 */
114122 MESSAGING_SOURCE_NAME {
115123
116124 @ Override
125+ @ NonNull
117126 public String asString () {
118127 return "messaging.source.name" ;
119128 }
@@ -126,42 +135,71 @@ public String asString() {
126135 MESSAGING_SOURCE_KIND {
127136
128137 @ Override
138+ @ NonNull
129139 public String asString () {
130140 return "messaging.source.kind" ;
131141 }
132142
133143 },
134144
135145 /**
136- * Messaging consumer group.
146+ * Messaging the consumer group.
137147 */
138148 MESSAGING_CONSUMER_GROUP {
139149
140150 @ Override
151+ @ NonNull
141152 public String asString () {
142153 return "messaging.kafka.consumer.group" ;
143154 }
144155
145156 },
146157
158+ }
159+
160+ /**
161+ * High cardinality tags.
162+ *
163+ * @author Wang Zhiyang
164+ * @author Christian Mergenthaler
165+ *
166+ * @since 3.2
167+ */
168+ public enum ListenerHighCardinalityTags implements KeyName {
169+
147170 /**
148171 * Messaging client id.
149172 */
150173 MESSAGING_CLIENT_ID {
151174
152175 @ Override
176+ @ NonNull
153177 public String asString () {
154178 return "messaging.kafka.client_id" ;
155179 }
156180
157181 },
158182
183+ /**
184+ * Messaging consumer id (consumer group and client id).
185+ */
186+ MESSAGING_CONSUMER_ID {
187+
188+ @ Override
189+ @ NonNull
190+ public String asString () {
191+ return "messaging.consumer.id" ;
192+ }
193+
194+ },
195+
159196 /**
160197 * Messaging partition.
161198 */
162199 MESSAGING_PARTITION {
163200
164201 @ Override
202+ @ NonNull
165203 public String asString () {
166204 return "messaging.kafka.source.partition" ;
167205 }
@@ -174,18 +212,22 @@ public String asString() {
174212 MESSAGING_OFFSET {
175213
176214 @ Override
215+ @ NonNull
177216 public String asString () {
178217 return "messaging.kafka.message.offset" ;
179218 }
180219
181- }
220+ },
182221
183222 }
184223
185224 /**
186225 * Default {@link KafkaListenerObservationConvention} for Kafka listener key values.
187226 *
188227 * @author Gary Russell
228+ * @author Christian Mergenthaler
229+ * @author Wang Zhiyang
230+ *
189231 * @since 3.0
190232 *
191233 */
@@ -199,30 +241,31 @@ public static class DefaultKafkaListenerObservationConvention implements KafkaLi
199241
200242 @ Override
201243 public KeyValues getLowCardinalityKeyValues (KafkaRecordReceiverContext context ) {
202- KeyValues keyValues = KeyValues .of (
244+
245+ return KeyValues .of (
203246 ListenerLowCardinalityTags .LISTENER_ID .withValue (context .getListenerId ()),
204- ListenerLowCardinalityTags .MESSAGING_CONSUMER_ID .withValue (getConsumerId (context )),
205247 ListenerLowCardinalityTags .MESSAGING_SYSTEM .withValue ("kafka" ),
206248 ListenerLowCardinalityTags .MESSAGING_OPERATION .withValue ("receive" ),
207249 ListenerLowCardinalityTags .MESSAGING_SOURCE_NAME .withValue (context .getSource ()),
208250 ListenerLowCardinalityTags .MESSAGING_SOURCE_KIND .withValue ("topic" ),
209251 ListenerLowCardinalityTags .MESSAGING_CONSUMER_GROUP .withValue (context .getGroupId ())
210252 );
211-
212- if (StringUtils .hasText (context .getClientId ())) {
213- keyValues = keyValues .and (ListenerLowCardinalityTags .MESSAGING_CLIENT_ID .withValue (context .getClientId ()));
214- }
215-
216- return keyValues ;
217253 }
218254
219255 @ Override
256+ @ NonNull
220257 public KeyValues getHighCardinalityKeyValues (KafkaRecordReceiverContext context ) {
221258 KeyValues keyValues = KeyValues .of (
222- ListenerLowCardinalityTags .MESSAGING_PARTITION .withValue (context .getPartition ()),
223- ListenerLowCardinalityTags .MESSAGING_OFFSET .withValue (context .getOffset ())
259+ ListenerHighCardinalityTags .MESSAGING_PARTITION .withValue (context .getPartition ()),
260+ ListenerHighCardinalityTags .MESSAGING_OFFSET .withValue (context .getOffset ())
224261 );
225262
263+ if (StringUtils .hasText (context .getClientId ())) {
264+ keyValues = keyValues
265+ .and (ListenerHighCardinalityTags .MESSAGING_CLIENT_ID .withValue (context .getClientId ()))
266+ .and (ListenerHighCardinalityTags .MESSAGING_CONSUMER_ID .withValue (getConsumerId (context )));
267+ }
268+
226269 return keyValues ;
227270 }
228271
@@ -231,11 +274,6 @@ public String getContextualName(KafkaRecordReceiverContext context) {
231274 return context .getSource () + " receive" ;
232275 }
233276
234- @ Override
235- public String getName () {
236- return "spring.kafka.listener" ;
237- }
238-
239277 private String getConsumerId (KafkaRecordReceiverContext context ) {
240278 if (StringUtils .hasText (context .getClientId ())) {
241279 return context .getGroupId () + " - " + context .getClientId ();
0 commit comments