1 change: 1 addition & 0 deletions checkstyle/import-control-metadata.xml
@@ -167,6 +167,7 @@
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.quota" />
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="authorizer">
5 changes: 5 additions & 0 deletions checkstyle/import-control-server-common.xml
@@ -105,6 +105,11 @@
<allow pkg="com.yammer.metrics.core" />
</subpackage>

<subpackage name="quota">
<allow pkg="org.apache.kafka.server.network" />
<allow pkg="org.apache.kafka.server.util" />
</subpackage>

<subpackage name="share">
<allow pkg="org.apache.kafka.server.share" />
<subpackage name="persister">
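Taken together, these import-control additions appear to let the metadata module reference the quota classes under org.apache.kafka.server.quota, and let the new quota subpackage in server-common use org.apache.kafka.server.network (for Session) and org.apache.kafka.server.util, so the relocated code passes the Checkstyle ImportControl check.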
3 changes: 2 additions & 1 deletion core/src/main/java/kafka/server/QuotaFactory.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.publisher.QuotaManagersProvider;
import org.apache.kafka.server.config.ClientQuotaManagerConfig;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
@@ -58,7 +59,7 @@ public record QuotaManagers(ClientQuotaManager fetch,
ReplicationQuotaManager leader,
ReplicationQuotaManager follower,
ReplicationQuotaManager alterLogDirs,
Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) {
Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) implements QuotaManagersProvider {

public void shutdown() {
fetch.shutdown();
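Note on the QuotaManagers change: a Java record automatically exposes an accessor per component, so it can implement an interface whose methods match those component names with no extra code. A minimal, hypothetical sketch of what the QuotaManagersProvider contract could look like under that assumption (the real interface in org.apache.kafka.metadata.publisher may declare more or different methods):

```java
package org.apache.kafka.metadata.publisher;

import java.util.Optional;

import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaManager;

// Hypothetical sketch: the method names mirror the QuotaManagers record components,
// so the record satisfies this interface through its generated accessors.
public interface QuotaManagersProvider {

    // Quota manager that throttles fetch traffic.
    ClientQuotaManager fetch();

    // Optional user-supplied quota callback plugin, if one is configured.
    Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin();
}
```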
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/network/RequestChannel.scala
@@ -31,11 +31,11 @@ import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.server.network.Session

import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
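Session is now imported from org.apache.kafka.server.network instead of org.apache.kafka.network, and the test changes further down switch from field-style access (session.principal) to explicit method calls (session.principal()), which is consistent with the class now being plain Java. A rough sketch of the relocated class, with its shape inferred only from the accessors the updated tests use (principal(), clientAddress(), sanitizedUser()); the actual implementation may differ:

```java
package org.apache.kafka.server.network;

import java.net.InetAddress;

import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Sanitizer;

// Hypothetical sketch of the relocated Session class.
public class Session {
    private final KafkaPrincipal principal;
    private final InetAddress clientAddress;
    private final String sanitizedUser;

    public Session(KafkaPrincipal principal, InetAddress clientAddress) {
        this.principal = principal;
        this.clientAddress = clientAddress;
        // Sanitize the principal name so it can be used safely in quota metric names.
        this.sanitizedUser = Sanitizer.sanitize(principal.getName());
    }

    public KafkaPrincipal principal() {
        return principal;
    }

    public InetAddress clientAddress() {
        return clientAddress;
    }

    public String sanitizedUser() {
        return sanitizedUser;
    }
}
```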
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
@@ -494,7 +494,7 @@ class BrokerServer(
),
new DynamicTopicClusterQuotaPublisher(
clusterId,
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
quotaManagers,
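Passing config.nodeId rather than the whole config presumably keeps the relocated DynamicTopicClusterQuotaPublisher free of core's KafkaConfig; a sketch of the resulting constructor is shown after the ControllerServer diff below.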
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
@@ -346,7 +346,7 @@ class ControllerServer(
// Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics.
metadataPublishers.add(new DynamicTopicClusterQuotaPublisher(
clusterId,
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
quotaManagers,
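Both call sites now hand the publisher a plain node id. A rough, hypothetical outline of how the relocated DynamicTopicClusterQuotaPublisher could be shaped around that constructor, assuming it implements the standard MetadataPublisher interface (field names, the name() string, and the elided quota-manager parameters are guesses, not taken from this diff):

```java
package org.apache.kafka.metadata.publisher;

import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.server.fault.FaultHandler;

// Hypothetical outline: the publisher receives the plain node id rather than the
// whole KafkaConfig, so it no longer depends on core's config class.
public class DynamicTopicClusterQuotaPublisher implements MetadataPublisher {
    private final String clusterId;
    private final int nodeId;
    private final FaultHandler faultHandler;
    private final String nodeType;

    public DynamicTopicClusterQuotaPublisher(
        String clusterId,
        int nodeId,
        FaultHandler faultHandler,
        String nodeType /* , quota managers ... */
    ) {
        this.clusterId = clusterId;
        this.nodeId = nodeId;
        this.faultHandler = faultHandler;
        this.nodeType = nodeType;
    }

    @Override
    public String name() {
        return "DynamicTopicClusterQuotaPublisher " + nodeType + " id=" + nodeId;
    }

    @Override
    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
        // Apply topic and cluster quota changes carried by the metadata delta here.
    }
}
```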
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
@@ -221,7 +221,7 @@ class BrokerMetadataPublisher(
dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage, manifest)

// Apply topic or cluster quotas delta.
dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage)
dynamicTopicClusterQuotaPublisher.onMetadataUpdate(delta, newImage, manifest)

// Apply SCRAM delta.
scramPublisher.onMetadataUpdate(delta, newImage, manifest)
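Forwarding the manifest here brings the call in line with the three-argument onMetadataUpdate(delta, newImage, manifest) signature used by the other publishers invoked in this method (see the sketch above).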

This file was deleted.

Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ class RequestConvertToJsonTest {
expectedNode.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs))
expectedNode.set("sendTimeMs", new DoubleNode(responseSendTimeMs))
expectedNode.set("securityProtocol", new TextNode(req.context.securityProtocol.toString))
expectedNode.set("principal", new TextNode(req.session.principal.toString))
expectedNode.set("principal", new TextNode(req.session.principal().toString))
expectedNode.set("listener", new TextNode(req.context.listenerName.value))
expectedNode.set("clientInformation", RequestConvertToJson.clientInfoNode(req.context.clientInformation))
expectedNode.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes))
Original file line number Diff line number Diff line change
@@ -1068,7 +1068,7 @@ class SocketServerTest {
val socket = connect()
val bytes = new Array[Byte](40)
sendRequest(socket, bytes, Some(0))
assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.dataPlaneRequestChannel).session.principal)
assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.dataPlaneRequestChannel).session.principal())
}

/* Test that we update request metrics if the client closes the connection while the broker response is in flight. */
Original file line number Diff line number Diff line change
@@ -29,8 +29,8 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.quota.{ClientQuotaManager, ThrottleCallback}
import org.junit.jupiter.api.AfterEach
import org.mockito.Mockito.mock
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.apache.kafka.network.Session
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaManager, ClientQuotaType, QuotaType}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
Original file line number Diff line number Diff line change
@@ -54,12 +54,12 @@ import org.apache.kafka.controller.{Controller, ControllerRequestContext, Result
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.network.Session
import org.apache.kafka.raft.{QuorumConfig, RaftManager}
import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, RequestLocal}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager}
import org.apache.kafka.server.util.FutureUtils
import org.apache.kafka.storage.internals.log.CleanerConfig
20 changes: 10 additions & 10 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -83,7 +83,6 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTes
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache, MockConfigRepository}
import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
@@ -93,6 +92,7 @@ import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupV
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.logger.LoggingController
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey}
import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager, ThrottleCallback}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -2517,10 +2517,10 @@ class KafkaApisTest extends Logging {
// Verify the Session data passed to fetch quota manager is exactly what was defined in the test
val capturedSession = sessionCaptorFetch.getValue
assertNotNull(capturedSession)
assertNotNull(capturedSession.principal)
assertEquals(KafkaPrincipal.USER_TYPE, capturedSession.principal.getPrincipalType)
assertEquals("test-user", capturedSession.principal.getName)
assertEquals(testClientAddress, capturedSession.clientAddress)
assertNotNull(capturedSession.principal())
assertEquals(KafkaPrincipal.USER_TYPE, capturedSession.principal().getPrincipalType)
assertEquals("test-user", capturedSession.principal().getName)
assertEquals(testClientAddress, capturedSession.clientAddress())
assertEquals("test-user", capturedSession.sanitizedUser)

// Verify client ID passed to fetch quota manager matches what was defined
@@ -2733,11 +2733,11 @@ class KafkaApisTest extends Logging {
// Verify the Session data passed to fetch quota manager is exactly what was defined in the test
val capturedSession = sessionCaptorFetch.getValue
assertNotNull(capturedSession)
assertNotNull(capturedSession.principal)
assertEquals(KafkaPrincipal.USER_TYPE, capturedSession.principal.getPrincipalType)
assertEquals("test-user", capturedSession.principal.getName)
assertEquals(testClientAddress, capturedSession.clientAddress)
assertEquals("test-user", capturedSession.sanitizedUser)
assertNotNull(capturedSession.principal())
assertEquals(KafkaPrincipal.USER_TYPE, capturedSession.principal().getPrincipalType)
assertEquals("test-user", capturedSession.principal().getName)
assertEquals(testClientAddress, capturedSession.clientAddress())
assertEquals("test-user", capturedSession.sanitizedUser())

// Verify client ID passed to fetch quota manager matches what was defined
val capturedClientId = clientIdCaptor.getValue
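The assertion updates in these two hunks switch to explicit accessor calls (principal(), clientAddress(), sanitizedUser()), matching the Java-style Session class sketched earlier.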
Original file line number Diff line number Diff line change
@@ -38,9 +38,9 @@ import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.network.Session
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs}
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}