Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control-metadata.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.quota" />
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
<subpackage name="authorizer">
Expand Down
5 changes: 5 additions & 0 deletions checkstyle/import-control-server-common.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@
<allow pkg="com.yammer.metrics.core" />
</subpackage>

<subpackage name="quota">
<allow pkg="org.apache.kafka.server.network" />
<allow pkg="org.apache.kafka.server.util" />
</subpackage>

<subpackage name="share">
<allow pkg="org.apache.kafka.server.share" />
<subpackage name="persister">
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/kafka/server/QuotaFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.publisher.QuotaManagersProvider;
import org.apache.kafka.server.config.ClientQuotaManagerConfig;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
Expand Down Expand Up @@ -58,7 +59,7 @@ public record QuotaManagers(ClientQuotaManager fetch,
ReplicationQuotaManager leader,
ReplicationQuotaManager follower,
ReplicationQuotaManager alterLogDirs,
Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) {
Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) implements QuotaManagersProvider {

public void shutdown() {
fetch.shutdown();
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.server.network.Session

import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
Expand Down Expand Up @@ -494,7 +494,7 @@ class BrokerServer(
),
new DynamicTopicClusterQuotaPublisher(
clusterId,
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
quotaManagers,
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
Expand All @@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
Expand Down Expand Up @@ -346,7 +346,7 @@ class ControllerServer(
// Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics.
metadataPublishers.add(new DynamicTopicClusterQuotaPublisher(
clusterId,
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
quotaManagers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.quota.{ClientQuotaManager, ThrottleCallback}
import org.junit.jupiter.api.AfterEach
import org.mockito.Mockito.mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.apache.kafka.network.Session
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaManager, ClientQuotaType, QuotaType}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ import org.apache.kafka.controller.{Controller, ControllerRequestContext, Result
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.network.Session
import org.apache.kafka.raft.{QuorumConfig, RaftManager}
import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, RequestLocal}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager}
import org.apache.kafka.server.util.FutureUtils
import org.apache.kafka.storage.internals.log.CleanerConfig
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTes
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache, MockConfigRepository}
import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
Expand All @@ -93,6 +92,7 @@ import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupV
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.logger.LoggingController
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey}
import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager, ThrottleCallback}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.network.Session
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs}
import org.apache.kafka.server.network.Session
import org.apache.kafka.server.quota.QuotaType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.publisher;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.fault.FaultHandler;

public class DynamicTopicClusterQuotaPublisher implements MetadataPublisher {
private final String clusterId;
private final Integer nodeId;
private final FaultHandler faultHandler;
private final String nodeType;
private final QuotaManagersProvider quotaManagersProvider;

public DynamicTopicClusterQuotaPublisher(
String clusterId,
Integer nodeId,
FaultHandler faultHandler,
String nodeType,
QuotaManagersProvider quotaManagers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you consider de-coupling those components by using lambda function? for example:

    public DynamicTopicClusterQuotaPublisher(
        String clusterId,
        int nodeId,
        FaultHandler faultHandler,
        String nodeType,
        Plugin<ClientQuotaCallback> clientQuotaCallbackPlugin,
        Runnable updateQuotaMetricConfigs
    ) {
        this.clusterId = clusterId;
        this.nodeId = nodeId;
        this.faultHandler = faultHandler;
        this.nodeType = nodeType;
        this.clientQuotaCallbackPlugin = clientQuotaCallbackPlugin;
        this.updateQuotaMetricConfigs = updateQuotaMetricConfigs;
    }

    @Override
    public String name() {
        return "DynamicTopicClusterQuotaPublisher " + nodeType + " id=" + nodeId;
    }

    @Override
    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
        try {
            if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
                Cluster cluster = MetadataCache.toCluster(clusterId, newImage);
                if (clientQuotaCallbackPlugin.get().updateClusterMetadata(cluster)) {
                    updateQuotaMetricConfigs.run();
                }
            }
        } catch (Exception e) {
            String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
            faultHandler.handleFault("Uncaught exception while publishing dynamic topic or cluster changes from " + deltaName, e);
        }
    }
  1. we could create DynamicTopicClusterQuotaPublisher only if the clientQuotaCallbackPlugin is existent
  2. we wrap all callbacks in a single runnable object to avoid move many classes into server-common module

@JimmyWang6 WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chia7712 ,
I just tried and found that clientQuotaCallbackPlugin also belongs to class ClientQuotaManager, so if we are going to de-coupe those components, the code will be shown like this:

public class DynamicTopicClusterQuotaPublisher implements MetadataPublisher {
    private final FaultHandler faultHandler;
    private final String nodeType;
    private final Runnable updateQuotaMetricConfigs;

    public DynamicTopicClusterQuotaPublisher(
        FaultHandler faultHandler,
        String nodeType,
        Runnable updateQuotaMetricConfigs
    ) {
        this.faultHandler = faultHandler;
        this.nodeType = nodeType;
        this.updateQuotaMetricConfigs = updateQuotaMetricConfigs;
    }

    @Override
    public String name() {
        return "DynamicTopicClusterQuotaPublisher " + nodeType;
    }

    @Override
    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
        try {
            if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
                updateQuotaMetricConfigs.run();
            }
        } catch (Exception e) {
            String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
            faultHandler.handleFault("Uncaught exception while publishing dynamic topic or cluster changes from " + deltaName, e);
        }
    }
}

And the logic of the original DynamicTopicClusterQuotaPublisher will still remain in the new class updateQuotaMetricConfigs somewhere within the core module. However, such a modification might deviate from the original intent of this issue. What's your perspective on this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes but DynamicTopicClusterQuotaPublisher can generate the Cluster and check the result of updateClusterMetadata

) {
this.clusterId = clusterId;
this.nodeId = nodeId;
this.faultHandler = faultHandler;
this.nodeType = nodeType;
this.quotaManagersProvider = quotaManagers;
}

@Override
public String name() {
return "DynamicTopicClusterQuotaPublisher " + nodeType + " id=" + nodeId;
}

@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
onMetadataUpdate(delta, newImage);
}

public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
try {
quotaManagersProvider.clientQuotaCallbackPlugin().ifPresent(plugin -> {
if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
Cluster cluster = MetadataCache.toCluster(clusterId, newImage);
if (plugin.get().updateClusterMetadata(cluster)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The necessity of this method is being discussed in KIP-1200. While I hope the vote succeeds, there is no harm in migrating the code though

quotaManagersProvider.fetch().updateQuotaMetricConfigs();
quotaManagersProvider.produce().updateQuotaMetricConfigs();
quotaManagersProvider.request().updateQuotaMetricConfigs();
quotaManagersProvider.controllerMutation().updateQuotaMetricConfigs();
}
}
});
} catch (Exception e) {
String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
faultHandler.handleFault("Uncaught exception while publishing dynamic topic or cluster changes from " + deltaName, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.publisher;

import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaManager;

import java.util.Optional;

/**
* Interface to provide access to quota managers for metadata publishers.
* This abstraction allows the metadata module to access quota managers without
* directly depending on the core module.
*/
public interface QuotaManagersProvider {
ClientQuotaManager fetch();

ClientQuotaManager produce();

ClientQuotaManager request();

ClientQuotaManager controllerMutation();

Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;
package org.apache.kafka.server.network;

import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Sanitizer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Sanitizer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.network.Session;
import org.apache.kafka.server.config.ClientQuotaManagerConfig;
import org.apache.kafka.server.network.Session;
import org.apache.kafka.server.util.ShutdownableThread;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import org.apache.kafka.server.network.Session;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
Expand Down
Loading