Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.streamnative.pulsar.handlers.kop;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.Producer;
Expand All @@ -27,6 +28,9 @@
*/
@Slf4j
public class InternalServerCnx extends ServerCnx {

public static final SocketAddress MOCKED_REMOTE_ADDRESS = new InetSocketAddress("localhost", 9999);

@Getter
KafkaRequestHandler kafkaRequestHandler;

Expand All @@ -39,7 +43,7 @@ public InternalServerCnx(KafkaRequestHandler kafkaRequestHandler) {
// mock some values, or Producer create will meet NPE.
// used in test, which will not call channel.active, and not call updateCtx.
if (this.remoteAddress == null) {
this.remoteAddress = new InetSocketAddress("localhost", 9999);
this.remoteAddress = MOCKED_REMOTE_ADDRESS;
}
}

Expand All @@ -56,8 +60,8 @@ public void closeProducer(Producer producer) {
}

// called after channel active
public void updateCtx() {
this.remoteAddress = kafkaRequestHandler.remoteAddress;
public void updateCtx(final SocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
getTopicManager().updateCtx();
topicManager.setRemoteAddress(ctx.channel().remoteAddress());
if (authenticator != null) {
authenticator.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import io.streamnative.pulsar.handlers.kop.utils.OffsetSearchPredicate;
import java.io.Closeable;
import java.util.ArrayList;
Expand All @@ -24,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
Expand All @@ -42,6 +44,10 @@
*/
@Slf4j
public class KafkaTopicConsumerManager implements Closeable {

private static final AtomicIntegerFieldUpdater<KafkaTopicConsumerManager> NUM_CREATED_CURSORS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(KafkaTopicConsumerManager.class, "numCreatedCursors");

private final PersistentTopic topic;
private final KafkaRequestHandler requestHandler;

Expand All @@ -60,6 +66,9 @@ public class KafkaTopicConsumerManager implements Closeable {
@Getter
private final Map<Long, Long> lastAccessTimes;

// Record the number of created cursor
private volatile int numCreatedCursors = 0;

KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) {
this.topic = topic;
this.cursors = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -191,6 +200,7 @@ public void close() {
log.debug("[{}] Close TCM for topic {}.",
requestHandler.ctx.channel(), topic.getName());
}
NUM_CREATED_CURSORS_UPDATER.set(this, 0);
final List<CompletableFuture<Pair<ManagedCursor, Long>>> cursorFuturesToClose = new ArrayList<>();
cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture));
cursors.clear();
Expand Down Expand Up @@ -235,6 +245,7 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
}
try {
final ManagedCursor newCursor = ledger.newNonDurableCursor(previous, cursorName);
NUM_CREATED_CURSORS_UPDATER.incrementAndGet(this);
createdCursors.putIfAbsent(newCursor.getName(), newCursor);
lastAccessTimes.put(offset, System.currentTimeMillis());
return Pair.of(newCursor, offset);
Expand All @@ -249,4 +260,9 @@ private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long
public ManagedLedger getManagedLedger() {
return topic.getManagedLedger();
}

@VisibleForTesting
public int getNumCreatedCursors() {
return numCreatedCursors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

/**
* The cache for {@link KafkaTopicConsumerManager}, aka TCM.
*/
@Slf4j
public class KafkaTopicConsumerManagerCache {

private static final KafkaTopicConsumerManagerCache TCM_CACHE = new KafkaTopicConsumerManagerCache();

// The 1st key is the full topic name, the 2nd key is the remote address of Kafka client.
// Because a topic could have multiple connected consumers, for different consumers we should maintain different
// KafkaTopicConsumerManagers, which are responsible for maintaining the cursors.
private final Map<String, Map<SocketAddress, CompletableFuture<KafkaTopicConsumerManager>>>
cache = new ConcurrentHashMap<>();

public static KafkaTopicConsumerManagerCache getInstance() {
return TCM_CACHE;
}

private KafkaTopicConsumerManagerCache() {
// No ops
}

public CompletableFuture<KafkaTopicConsumerManager> computeIfAbsent(
final String fullTopicName,
final SocketAddress remoteAddress,
final Supplier<CompletableFuture<KafkaTopicConsumerManager>> mappingFunction) {
return cache.computeIfAbsent(fullTopicName, ignored -> new ConcurrentHashMap<>())
.computeIfAbsent(remoteAddress, ignored -> mappingFunction.get());
}

public void forEach(final Consumer<CompletableFuture<KafkaTopicConsumerManager>> action) {
cache.values().forEach(internalMap -> {
internalMap.values().forEach(action);
});
}

public void removeAndClose(final String fullTopicName) {
// The TCM future could be completed with null, so we should process this case
Optional.ofNullable(cache.remove(fullTopicName)).ifPresent(map ->
map.forEach((remoteAddress, future) -> {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Remove and close TCM", fullTopicName, remoteAddress);
}
// Use thenAccept to avoid blocking
future.thenAccept(tcm -> {
if (tcm != null) {
tcm.close();
}
});
}));
}

public void close() {
cache.forEach((fullTopicName, internalMap) -> {
internalMap.forEach((remoteAddress, future) -> {
try {
Optional.ofNullable(future.get(100, TimeUnit.MILLISECONDS))
.ifPresent(KafkaTopicConsumerManager::close);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("[{}][{}] Failed to get TCM future when trying to close it", fullTopicName, remoteAddress);
}
});
});
}

@VisibleForTesting
public int getCount() {
final AtomicInteger count = new AtomicInteger(0);
forEach(ignored -> count.incrementAndGet());
return count.get();
}

@VisibleForTesting
public @NonNull List<KafkaTopicConsumerManager> getTopicConsumerManagers(final String fullTopicName) {
return cache.getOrDefault(fullTopicName, Collections.emptyMap()).values().stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
*/
package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -41,14 +40,12 @@
@Slf4j
public class KafkaTopicManager {

private static final KafkaTopicConsumerManagerCache TCM_CACHE = KafkaTopicConsumerManagerCache.getInstance();

private final KafkaRequestHandler requestHandler;
private final PulsarService pulsarService;
private final BrokerService brokerService;
private final LookupClient lookupClient;

// consumerTopicManagers for consumers cache.
private static final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>>
consumerTopicManagers = new ConcurrentHashMap<>();
private volatile SocketAddress remoteAddress;

// cache for topics: <topicName, persistentTopic>, for removing producer
@Getter
Expand Down Expand Up @@ -77,7 +74,7 @@ public class KafkaTopicManager {

KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
this.requestHandler = kafkaRequestHandler;
this.pulsarService = kafkaRequestHandler.getPulsarService();
PulsarService pulsarService = kafkaRequestHandler.getPulsarService();
this.brokerService = pulsarService.getBrokerService();
this.internalServerCnx = new InternalServerCnx(requestHandler);
this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
Expand All @@ -92,7 +89,7 @@ private static void initializeCursorExpireTask(final ScheduledExecutorService ex
// check expired cursor every 1 min.
cursorExpireTask = executor.scheduleWithFixedDelay(() -> {
long current = System.currentTimeMillis();
consumerTopicManagers.values().forEach(future -> {
TCM_CACHE.forEach(future -> {
if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
future.join().deleteExpiredCursor(current, expirePeriodMillis);
}
Expand All @@ -104,8 +101,9 @@ private static void initializeCursorExpireTask(final ScheduledExecutorService ex
}

// update Ctx information, since at internalServerCnx create time there is no ctx passed into kafkaRequestHandler.
public void updateCtx() {
internalServerCnx.updateCtx();
public void setRemoteAddress(SocketAddress remoteAddress) {
internalServerCnx.updateCtx(remoteAddress);
this.remoteAddress = remoteAddress;
}

// topicName is in pulsar format. e.g. persistent://public/default/topic-partition-0
Expand All @@ -118,11 +116,17 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri
}
return CompletableFuture.completedFuture(null);
}
return consumerTopicManagers.computeIfAbsent(
if (remoteAddress == null) {
log.error("[{}] Try to getTopicConsumerManager({}) while remoteAddress is not set",
requestHandler.ctx.channel(), topicName);
return CompletableFuture.completedFuture(null);
}
return TCM_CACHE.computeIfAbsent(
topicName,
t -> {
remoteAddress,
() -> {
final CompletableFuture<KafkaTopicConsumerManager> tcmFuture = new CompletableFuture<>();
getTopic(t).whenComplete((persistentTopic, throwable) -> {
getTopic(topicName).whenComplete((persistentTopic, throwable) -> {
if (persistentTopic.isPresent() && throwable == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Call getTopicConsumerManager for {}, and create TCM for {}.",
Expand Down Expand Up @@ -321,45 +325,20 @@ public static void deReference(String topicName) {
try {
removeTopicManagerCache(topicName);

Optional.ofNullable(consumerTopicManagers.remove(topicName)).ifPresent(
// Use thenAccept to avoid blocking
tcmFuture -> tcmFuture.thenAccept(tcm -> {
if (tcm != null) {
tcm.close();
}
})
);

TCM_CACHE.removeAndClose(topicName);
removePersistentTopicAndReferenceProducer(topicName);
} catch (Exception e) {
log.error("Failed to close reference for individual topic {}. exception:", topicName, e);
}
}

public static void removeKafkaTopicConsumerManager(String topicName) {
consumerTopicManagers.remove(topicName);
}

public static void closeKafkaTopicConsumerManagers() {
synchronized (KafkaTopicManager.class) {
if (cursorExpireTask != null) {
cursorExpireTask.cancel(true);
cursorExpireTask = null;
}
}
consumerTopicManagers.forEach((topic, tcmFuture) -> {
try {
Optional.ofNullable(tcmFuture.get(300, TimeUnit.SECONDS))
.ifPresent(KafkaTopicConsumerManager::close);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("Failed to get TCM future of {} when trying to close it", topic);
}
});
consumerTopicManagers.clear();
}

@VisibleForTesting
public static int getNumberOfKafkaTopicConsumerManagers() {
return consumerTopicManagers.size();
TCM_CACHE.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private void handlePartitionData(final TopicPartition topicPartition,
statsLogger.getPrepareMetadataStats().registerFailedEvent(
MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
// remove null future cache
KafkaTopicManager.removeKafkaTopicConsumerManager(fullTopicName);
KafkaTopicConsumerManagerCache.getInstance().removeAndClose(fullTopicName);
addErrorPartitionResponse(topicPartition, Errors.NOT_LEADER_FOR_PARTITION);
return;
}
Expand Down Expand Up @@ -315,7 +315,7 @@ private void handlePartitionData(final TopicPartition topicPartition,
// tcm is closed, just return a NONE error because the channel may be still active
log.warn("[{}] KafkaTopicConsumerManager is closed, remove TCM of {}",
requestHandler.ctx, fullTopicName);
KafkaTopicManager.removeKafkaTopicConsumerManager(fullTopicName);
KafkaTopicConsumerManagerCache.getInstance().removeAndClose(fullTopicName);
addErrorPartitionResponse(topicPartition, Errors.NONE);
return;
}
Expand Down
Loading