-
Notifications
You must be signed in to change notification settings - Fork 292
Reduce address comparisons for network topology replica calculation #532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
257c9a9
928ff70
74eb1b9
5b35f2d
2501425
f38c0a0
a78cc90
402119e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -388,6 +388,14 @@ inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) { | |
return true; | ||
} | ||
|
||
class RawPtrHostSet : public DenseHashSet<Host*> { | ||
public: | ||
RawPtrHostSet() { | ||
set_empty_key(0x0); | ||
set_deleted_key(reinterpret_cast<Host*>(0x1)); | ||
} | ||
}; | ||
|
||
template <class Partitioner> | ||
void ReplicationStrategy<Partitioner>::build_replicas_network_topology( | ||
const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const { | ||
|
@@ -435,6 +443,9 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology( | |
CopyOnWriteHostVec replicas(new HostVec()); | ||
replicas->reserve(num_replicas); | ||
|
||
RawPtrHostSet replicas_set; | ||
replicas_set.resize(num_replicas); | ||
|
||
// Clear datacenter and rack information for the next token | ||
for (typename DatacenterRackInfoMap::iterator j = dc_racks.begin(), end = dc_racks.end(); | ||
j != end; ++j) { | ||
|
@@ -444,7 +455,7 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology( | |
} | ||
|
||
for (typename TokenHostVec::const_iterator j = tokens.begin(), end = tokens.end(); | ||
j != end && replicas->size() < num_replicas; ++j) { | ||
j != end && replicas_set.size() < num_replicas; ++j) { | ||
typename TokenHostVec::const_iterator curr_token_it = token_it; | ||
Host* host = curr_token_it->second; | ||
uint32_t dc = host->dc_id(); | ||
|
@@ -476,15 +487,17 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology( | |
// datacenter only then consider hosts in the same rack | ||
|
||
if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) { | ||
if (add_replica(replicas, Host::Ptr(host))) { | ||
if (replicas_set.insert(host).second) { | ||
replicas->push_back(Host::Ptr(host)); | ||
++replica_count_this_dc; | ||
} | ||
} else { | ||
TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints; | ||
if (racks_observed_this_dc.count(rack) > 0) { | ||
skipped_endpoints_this_dc.push_back(curr_token_it); | ||
} else { | ||
if (add_replica(replicas, Host::Ptr(host))) { | ||
if (replicas_set.insert(host).second) { | ||
|
||
replicas->push_back(Host::Ptr(host)); | ||
++replica_count_this_dc; | ||
racks_observed_this_dc.insert(rack); | ||
} | ||
|
@@ -494,7 +507,8 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology( | |
if (racks_observed_this_dc.size() == rack_count_this_dc) { | ||
while (!skipped_endpoints_this_dc.empty() && | ||
replica_count_this_dc < replication_factor) { | ||
if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) { | ||
if (replicas_set.insert(skipped_endpoints_this_dc.front()->second).second) { | ||
replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second)); | ||
++replica_count_this_dc; | ||
} | ||
skipped_endpoints_this_dc.pop_front(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,6 +79,8 @@ class BufferBuilder { | |
|
||
static size_t size_of(const String& value) { return value.size(); } | ||
|
||
static size_t size_of(const Address& value) { char buf[16]; return value.to_inet(buf); } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These changes fix test warnings. |
||
|
||
static void encode(char* buf, uint16_t value) { datastax::internal::encode_uint16(buf, value); } | ||
|
||
static void encode(char* buf, int32_t value) { datastax::internal::encode_int32(buf, value); } | ||
|
@@ -87,6 +89,8 @@ class BufferBuilder { | |
|
||
static void encode(char* buf, const String& value) { memcpy(buf, value.data(), value.size()); } | ||
|
||
static void encode(char* buf, const Address& value) { value.to_inet(buf); } | ||
|
||
private: | ||
String buffer_; | ||
}; | ||
|
@@ -154,9 +158,11 @@ class RowResultResponseBuilder : protected BufferBuilder { | |
++row_count_; | ||
} | ||
|
||
void append_local_peers_row_v3(const TokenVec& tokens, const String& partitioner, | ||
void append_local_peers_row_v3(const Address& rpc_address, | ||
const TokenVec& tokens, const String& partitioner, | ||
const String& dc, const String& rack, | ||
const String& release_version) { | ||
append_value<Address>(rpc_address); | ||
append_value<String>(rack); | ||
append_value<String>(dc); | ||
append_value<String>(release_version); | ||
|
@@ -306,8 +312,10 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens, | |
Host::Ptr host(new Host(address)); | ||
|
||
DataType::ConstPtr varchar_data_type(new DataType(CASS_VALUE_TYPE_VARCHAR)); | ||
DataType::ConstPtr inet_data_type(new DataType(CASS_VALUE_TYPE_INET)); | ||
|
||
ColumnMetadataVec column_metadata; | ||
column_metadata.push_back(ColumnMetadata("rpc_address", inet_data_type)); | ||
column_metadata.push_back(ColumnMetadata("data_center", varchar_data_type)); | ||
column_metadata.push_back(ColumnMetadata("rack", varchar_data_type)); | ||
column_metadata.push_back(ColumnMetadata("release_version", varchar_data_type)); | ||
|
@@ -318,7 +326,7 @@ inline Host::Ptr create_host(const Address& address, const TokenVec& tokens, | |
ColumnMetadata("tokens", CollectionType::list(varchar_data_type, true))); | ||
|
||
RowResultResponseBuilder builder(column_metadata); | ||
builder.append_local_peers_row_v3(tokens, partitioner, dc, rack, release_version); | ||
builder.append_local_peers_row_v3(address, tokens, partitioner, dc, rack, release_version); | ||
|
||
host->set(&builder.finish()->first_row(), true); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,12 +129,12 @@ TEST(TokenMapUnitTest, Murmur3MultipleTokensPerHost) { | |
TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) { | ||
TestTokenMap<Murmur3Partitioner> test_murmur3; | ||
|
||
size_t num_dcs = 3; | ||
size_t num_racks = 3; | ||
size_t num_hosts = 4; | ||
size_t num_dcs = 2; | ||
size_t num_racks = 1; | ||
size_t num_hosts = 27; | ||
size_t num_vnodes = 256; | ||
size_t replication_factor = 3; | ||
size_t total_replicas = std::min(num_hosts, replication_factor) * num_dcs; | ||
size_t replication_factor = 54; | ||
size_t total_replicas = replication_factor; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should likely be reverted. This is a pathological use case though. |
||
|
||
ReplicationMap replication; | ||
MT19937_64 rng; | ||
|
@@ -166,9 +166,13 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) { | |
} | ||
} | ||
|
||
printf("Populating token map finished finished\n"); | ||
|
||
// Build token map | ||
add_keyspace_network_topology("ks1", replication, token_map); | ||
printf("Building replicas\n"); | ||
token_map->build(); | ||
printf("Building replicas finished\n"); | ||
|
||
const String keys[] = { "test", "abc", "def", "a", "b", "c", "d" }; | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.