Skip to content
Merged
34 changes: 33 additions & 1 deletion .idea/inspectionProfiles/Druid.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void setup() throws IOException

factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public String getFormatString()
strategySelector,
new GroupByQueryQueryToolChest(
strategySelector,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,10 @@ public void setup() throws IOException
0,
Integer.MAX_VALUE
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public String getFormatString()
strategySelector,
new GroupByQueryQueryToolChest(
strategySelector,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
);
}

public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public void setup() throws IOException
new SearchStrategySelector(Suppliers.ofInstance(config)),
new SearchQueryQueryToolChest(
config,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void setup() throws IOException
factory = new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(
JSON_MAPPER,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator(),
selectConfigSupplier
),
new SelectQueryEngine(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void setup() throws IOException

factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ public void setup() throws IOException
0,
Integer.MAX_VALUE
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ private void setupQueries()
0,
Integer.MAX_VALUE
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
}
Expand Down Expand Up @@ -270,7 +273,7 @@ private void setupQueries()

timeseriesQuery = timeseriesQueryBuilder.build();
timeseriesFactory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryQueryToolChest(QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,19 @@ public void emit(Event event)
{
try {
URI uri = uriExtractor.apply(event);
// get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed.
// See https://github.com/apache/incubator-druid/pull/6898#discussion_r251384586.
HttpPostEmitter emitter = emitters.get(uri);
if (emitter == null) {
try {
emitter = emitters.computeIfAbsent(uri, u -> {
try {
return innerLifecycle.addMaybeStartManagedInstance(
new HttpPostEmitter(
config.buildHttpEmitterConfig(u.toString()),
client,
jsonMapper
)
new HttpPostEmitter(config.buildHttpEmitterConfig(u.toString()), client, jsonMapper)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -123,13 +126,23 @@ protected void finalize() throws Throwable
public void close()
{
closed.set(true);
final Map<K, ImmediateCreationResourceHolder<K, V>> mapView = pool.asMap();
for (K k : ImmutableSet.copyOf(mapView.keySet())) {
mapView.remove(k).close();
final ConcurrentMap<K, ImmediateCreationResourceHolder<K, V>> mapView = pool.asMap();
Closer closer = Closer.create();
for (Iterator<Map.Entry<K, ImmediateCreationResourceHolder<K, V>>> iterator =
mapView.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<K, ImmediateCreationResourceHolder<K, V>> e = iterator.next();
iterator.remove();
closer.register(e.getValue());
}
try {
closer.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

private static class ImmediateCreationResourceHolder<K, V>
private static class ImmediateCreationResourceHolder<K, V> implements Closeable
{
private final int maxSize;
private final K key;
Expand Down Expand Up @@ -265,7 +278,8 @@ private boolean holderListContains(V object)
return resourceHolderList.stream().anyMatch(a -> a.getResource().equals(object));
}

void close()
@Override
public void close()
{
synchronized (this) {
closed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testMakePostComputeManipulatorFn()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public int getNumThreads()
strategySelector,
new GroupByQueryQueryToolChest(
strategySelector,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static Iterable<Object[]> constructorFeeder() throws IOException
SelectQueryRunnerFactory factory = new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(
new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator(),
selectConfigSupplier
),
new SelectQueryEngine(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setup() throws IOException
new StupidPool<>("map-virtual-column-test", () -> ByteBuffer.allocate(1024)),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static Iterable<Object[]> constructorFeeder()
defaultPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
Expand All @@ -91,7 +91,7 @@ public static Iterable<Object[]> constructorFeeder()
customPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static Iterable<Object[]> constructorFeeder()
defaultPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
Expand All @@ -91,7 +91,7 @@ public static Iterable<Object[]> constructorFeeder()
customPool,
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -184,7 +185,7 @@ public boolean start()
final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
cacheHandler = cacheManager.createCache();
final Map<String, String> map = cacheHandler.getCache();
final ConcurrentMap<String, String> map = cacheHandler.getCache();
mapRef.set(map);
// Enable publish-subscribe
kafkaProperties.setProperty("auto.offset.reset", "smallest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -123,8 +124,8 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
private static final EmittingLogger log = new EmittingLogger(LegacyKafkaIndexTaskRunner.class);
private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";

private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Long> endOffsets = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Long> nextOffsets = new ConcurrentHashMap<>();

// The pause lock and associated conditions are to support coordination between the Jetty threads and the main
// ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskLock;
Expand Down Expand Up @@ -2393,8 +2392,7 @@ public boolean checkPointDataSourceMetadata(
);
return true;
}
},
new Counters()
}
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -341,7 +342,8 @@ private void rescheduleRunnable(long delayMillis)

private ScheduledExecutorService scheduledExec;

private final Map<StreamPartition<String>, PartitionResource> partitionResources = new ConcurrentHashMap<>();
private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources =
new ConcurrentHashMap<>();
private BlockingQueue<OrderedPartitionableRecord<String, String>> records;

private volatile boolean checkPartitionsStarted = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskLock;
Expand Down Expand Up @@ -2705,8 +2704,7 @@ public boolean checkPointDataSourceMetadata(
);
return true;
}
},
new Counters()
}
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
Expand Down
Loading