Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.reactivex.mantis.network.push;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -37,12 +40,21 @@ public class ConsistentHashingRouter<K, V> extends Router<KeyValuePair<K, V>> {
private static long validCacheAgeMSec = 5000;
private HashFunction hashFunction;
private AtomicReference<SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>>> cachedRingRef = new AtomicReference<>();
private final MetricGroupId metricGroup = new MetricGroupId("ConsistentHashingRouter");
private final Metrics metrics;
private final Counter collisionsCounter;

public ConsistentHashingRouter(String name,
Func1<KeyValuePair<K, V>, byte[]> dataEncoder,
HashFunction hashFunction) {
super("ConsistentHashingRouter_" + name, dataEncoder);
this.hashFunction = hashFunction;
this.metrics = new Metrics.Builder()
.id(metricGroup)
.addCounter("numHashCollisions")
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don' t think this is ever going to get registered. The underlying Router implementation has a getMetrics method that is used to register the metrics. You could override that here but then you would need to re-instantiate numEventsRouted and numEventsProcessed to make sure those metrics are still sent (and probably use a metricsGroup like the parent class to avoid a breaking change).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you need to register with something like "MetricsRegistry.getInstance().registerAndGet"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gracias y'all, used the singleton to register the metrics!


this.collisionsCounter = this.metrics.getCounter("numHashCollisions");
}

@Override
Expand Down Expand Up @@ -108,7 +120,7 @@ private void computeRing(Set<AsyncConnection<KeyValuePair<K, V>>> connections) {
byte[] connectionBytes = (connectionId + "-" + i).getBytes();
long hash = hashFunction.computeHash(connectionBytes);
if (ring.containsKey(hash)) {
logger.error("Hash collision when computing ring. {} hashed to a value already in the ring.", connectionId + "-" + i);
this.collisionsCounter.increment();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might still be nice to have logging for which connections are duplicates. Could we maybe track at the connection level (instead of the partition level) and emit one log line with all of the duplicate connections (not virtual nodes in the hash ring)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a debug log that should come out in a structure like:

{
  <hash>: [
    0: <first connection with hash>
    1..N: <colliding connections>
  ]
}

I chose a debug log that we'll have to explicitly opt into because I expect the structure to be huge and potentially cause this pausing like behavior we've been seeing today.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh and toString() contains all the interesting bits of info we'd need, taken from AsyncConnection.java:

    @Override
    public String toString() {
        return "AsyncConnection [host=" + host + ", port=" + port
            + ", groupId=" + groupId + ", slotId=" + slotId + ", id=" + id
            + "]";
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I think toString will be called by the logger lib so I don't need to call it myself

}
ring.put(hash, connection);
}
Expand Down
Loading