Skip to content

Commit 237d39e

Browse files
committed
Fix the failing tests due to connection profile missing STREAM type
Signed-off-by: Rishabh Maurya <[email protected]>
1 parent 46afedd commit 237d39e

File tree

7 files changed

+58
-18
lines changed

7 files changed

+58
-18
lines changed

plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightTransportTestBase.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,15 @@
3636
import java.io.IOException;
3737
import java.net.InetAddress;
3838
import java.util.Collections;
39+
import java.util.concurrent.atomic.AtomicInteger;
3940

4041
import static org.mockito.Mockito.mock;
4142
import static org.mockito.Mockito.spy;
4243

4344
public abstract class FlightTransportTestBase extends OpenSearchTestCase {
4445

46+
private static final AtomicInteger portCounter = new AtomicInteger(0);
47+
4548
protected DiscoveryNode remoteNode;
4649
protected Location serverLocation;
4750
protected HeaderContext headerContext;
@@ -57,14 +60,21 @@ public abstract class FlightTransportTestBase extends OpenSearchTestCase {
5760
public void setUp() throws Exception {
5861
super.setUp();
5962

60-
TransportAddress streamAddress = new TransportAddress(InetAddress.getLoopbackAddress(), 9401);
61-
TransportAddress transportAddress = new TransportAddress(InetAddress.getLoopbackAddress(), 9300);
63+
int basePort = getBasePort(9500);
64+
int streamPort = basePort + portCounter.incrementAndGet();
65+
int transportPort = basePort + portCounter.incrementAndGet();
66+
67+
TransportAddress streamAddress = new TransportAddress(InetAddress.getLoopbackAddress(), streamPort);
68+
TransportAddress transportAddress = new TransportAddress(InetAddress.getLoopbackAddress(), transportPort);
6269
remoteNode = new DiscoveryNode(new DiscoveryNode("test-node-id", transportAddress, Version.CURRENT), streamAddress);
6370
boundAddress = new BoundTransportAddress(new TransportAddress[] { transportAddress }, transportAddress);
64-
serverLocation = Location.forGrpcInsecure("localhost", 9401);
71+
serverLocation = Location.forGrpcInsecure("localhost", streamPort);
6572
headerContext = new HeaderContext();
6673

67-
Settings settings = Settings.builder().put("node.name", getTestName()).build();
74+
Settings settings = Settings.builder()
75+
.put("node.name", getTestName())
76+
.put("aux.transport.transport-flight.port", streamPort)
77+
.build();
6878
ServerConfig.init(settings);
6979
threadPool = new ThreadPool(settings, ServerConfig.getClientExecutorBuilder(), ServerConfig.getServerExecutorBuilder());
7080
namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
@@ -151,10 +161,6 @@ public void writeTo(StreamOutput out) throws IOException {
151161
protected static class TestResponse extends TransportResponse {
152162
private final String data;
153163

154-
public TestResponse() {
155-
this.data = null;
156-
}
157-
158164
public TestResponse(String data) {
159165
this.data = data;
160166
}

server/src/main/java/org/opensearch/transport/ConnectionProfile.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings)
112112
// if we are not a data-node we don't need any dedicated channels for recovery
113113
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
114114
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
115-
// TODO use different setting for connectionsPerNodeReg for stream request
116-
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.STREAM);
115+
// we build a single channel profile with only supported type as STREAM for stream transport defined in StreamTransportService
116+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
117117
return builder.build();
118118
}
119119

server/src/main/java/org/opensearch/transport/RemoteConnectionStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ static ConnectionProfile buildConnectionProfile(String clusterAlias, Settings se
188188
TransportRequestOptions.Type.BULK,
189189
TransportRequestOptions.Type.STATE,
190190
TransportRequestOptions.Type.RECOVERY,
191-
TransportRequestOptions.Type.PING
191+
TransportRequestOptions.Type.PING,
192+
TransportRequestOptions.Type.STREAM
192193
)
193194
.addConnections(mode.numberOfChannels, TransportRequestOptions.Type.REG);
194195
return builder.build();

server/src/main/java/org/opensearch/transport/StreamTransportService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public Transport.Connection getConnection(DiscoveryNode node) {
113113
try {
114114
return connectionManager.getConnection(node);
115115
} catch (Exception e) {
116-
logger.error("Failed to get streaming connection to node [{}]", node, e);
116+
logger.error("Failed to get streaming connection to node [{}]: {}", node, e.getMessage());
117117
throw new ConnectTransportException(node, "Failed to get streaming connection", e);
118118
}
119119
}

server/src/test/java/org/opensearch/transport/ConnectionProfileTests.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,26 @@ public void testBuildConnectionProfile() {
7676
builder.addConnections(1, TransportRequestOptions.Type.BULK);
7777
builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY);
7878
builder.addConnections(3, TransportRequestOptions.Type.PING);
79+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
80+
7981
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, builder::build);
80-
assertEquals("not all types are added for this connection profile - missing types: [REG]", illegalStateException.getMessage());
82+
assertEquals(
83+
"not all types are added for this connection profile - missing types: [REG, STREAM]",
84+
illegalStateException.getMessage()
85+
);
8186

8287
IllegalArgumentException illegalArgumentException = expectThrows(
8388
IllegalArgumentException.class,
8489
() -> builder.addConnections(4, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING)
8590
);
8691
assertEquals("type [PING] is already registered", illegalArgumentException.getMessage());
8792
builder.addConnections(4, TransportRequestOptions.Type.REG);
93+
builder.addConnections(1, TransportRequestOptions.Type.STREAM);
8894
ConnectionProfile build = builder.build();
8995
if (randomBoolean()) {
9096
build = new ConnectionProfile.Builder(build).build();
9197
}
92-
assertEquals(10, build.getNumConnections());
98+
assertEquals(11, build.getNumConnections());
9399
if (setConnectTimeout) {
94100
assertEquals(connectTimeout, build.getConnectTimeout());
95101
} else {
@@ -114,12 +120,12 @@ public void testBuildConnectionProfile() {
114120
assertNull(build.getPingInterval());
115121
}
116122

117-
List<Integer> list = new ArrayList<>(10);
118-
for (int i = 0; i < 10; i++) {
123+
List<Integer> list = new ArrayList<>(11);
124+
for (int i = 0; i < 11; i++) {
119125
list.add(i);
120126
}
121127
final int numIters = randomIntBetween(5, 10);
122-
assertEquals(4, build.getHandles().size());
128+
assertEquals(5, build.getHandles().size());
123129
assertEquals(0, build.getHandles().get(0).offset);
124130
assertEquals(1, build.getHandles().get(0).length);
125131
assertEquals(EnumSet.of(TransportRequestOptions.Type.BULK), build.getHandles().get(0).getTypes());
@@ -155,11 +161,20 @@ public void testBuildConnectionProfile() {
155161
assertThat(channel, Matchers.anyOf(Matchers.is(6), Matchers.is(7), Matchers.is(8), Matchers.is(9)));
156162
}
157163

164+
assertEquals(10, build.getHandles().get(4).offset);
165+
assertEquals(1, build.getHandles().get(4).length);
166+
assertEquals(EnumSet.of(TransportRequestOptions.Type.STREAM), build.getHandles().get(4).getTypes());
167+
channel = build.getHandles().get(4).getChannel(list);
168+
for (int i = 0; i < numIters; i++) {
169+
assertEquals(10, channel.intValue());
170+
}
171+
158172
assertEquals(3, build.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
159173
assertEquals(4, build.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
160174
assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
161175
assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
162176
assertEquals(1, build.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
177+
assertEquals(1, build.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
163178
}
164179

165180
public void testNoChannels() {
@@ -169,7 +184,8 @@ public void testNoChannels() {
169184
TransportRequestOptions.Type.BULK,
170185
TransportRequestOptions.Type.STATE,
171186
TransportRequestOptions.Type.RECOVERY,
172-
TransportRequestOptions.Type.REG
187+
TransportRequestOptions.Type.REG,
188+
TransportRequestOptions.Type.STREAM
173189
);
174190
builder.addConnections(0, TransportRequestOptions.Type.PING);
175191
ConnectionProfile build = builder.build();
@@ -188,6 +204,7 @@ public void testConnectionProfileResolve() {
188204
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG);
189205
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE);
190206
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING);
207+
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STREAM);
191208

192209
final boolean connectionTimeoutSet = randomBoolean();
193210
if (connectionTimeoutSet) {
@@ -235,6 +252,7 @@ public void testDefaultConnectionProfile() {
235252
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
236253
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
237254
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
255+
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
238256
assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getConnectTimeout());
239257
assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getHandshakeTimeout());
240258
assertEquals(TransportSettings.TRANSPORT_COMPRESS.get(Settings.EMPTY), profile.getCompressionEnabled());
@@ -247,6 +265,7 @@ public void testDefaultConnectionProfile() {
247265
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
248266
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
249267
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
268+
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
250269

251270
profile = ConnectionProfile.buildDefaultConnectionProfile(nonDataNode());
252271
assertEquals(11, profile.getNumConnections());
@@ -255,6 +274,7 @@ public void testDefaultConnectionProfile() {
255274
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
256275
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
257276
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
277+
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
258278

259279
profile = ConnectionProfile.buildDefaultConnectionProfile(
260280
removeRoles(
@@ -267,5 +287,6 @@ public void testDefaultConnectionProfile() {
267287
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
268288
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
269289
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
290+
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
270291
}
271292
}

test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2178,6 +2178,7 @@ public void testTimeoutPerConnection() throws IOException {
21782178
TransportRequestOptions.Type.REG,
21792179
TransportRequestOptions.Type.STATE
21802180
);
2181+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
21812182
// connection with one connection and a large timeout -- should consume the one spot in the backlog queue
21822183
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null, Settings.EMPTY, true, false)) {
21832184
IOUtils.close(service.openConnection(first, builder.build()));
@@ -2214,6 +2215,7 @@ public void testHandshakeWithIncompatVersion() {
22142215
TransportRequestOptions.Type.REG,
22152216
TransportRequestOptions.Type.STATE
22162217
);
2218+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
22172219
expectThrows(ConnectTransportException.class, () -> serviceA.openConnection(node, builder.build()));
22182220
}
22192221
}
@@ -2242,6 +2244,7 @@ public void testHandshakeUpdatesVersion() throws IOException {
22422244
TransportRequestOptions.Type.REG,
22432245
TransportRequestOptions.Type.STATE
22442246
);
2247+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
22452248
try (Transport.Connection connection = serviceA.openConnection(node, builder.build())) {
22462249
assertEquals(version, connection.getVersion());
22472250
}
@@ -2313,6 +2316,7 @@ public void testTcpHandshakeTimeout() throws IOException {
23132316
TransportRequestOptions.Type.REG,
23142317
TransportRequestOptions.Type.STATE
23152318
);
2319+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
23162320
builder.setHandshakeTimeout(TimeValue.timeValueMillis(1));
23172321
ConnectTransportException ex = expectThrows(
23182322
ConnectTransportException.class,
@@ -2355,6 +2359,7 @@ public void run() {
23552359
TransportRequestOptions.Type.REG,
23562360
TransportRequestOptions.Type.STATE
23572361
);
2362+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
23582363
builder.setHandshakeTimeout(TimeValue.timeValueHours(1));
23592364
ConnectTransportException ex = expectThrows(
23602365
ConnectTransportException.class,
@@ -2478,6 +2483,8 @@ public String executor() {
24782483
TransportRequestOptions.Type.REG,
24792484
TransportRequestOptions.Type.STATE
24802485
);
2486+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
2487+
24812488
try (Transport.Connection connection = serviceB.openConnection(serviceC.getLocalNode(), builder.build())) {
24822489
serviceC.close();
24832490
serviceB.sendRequest(
@@ -2549,6 +2556,7 @@ public String executor() {
25492556
TransportRequestOptions.Type.REG,
25502557
TransportRequestOptions.Type.STATE
25512558
);
2559+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
25522560

25532561
try (Transport.Connection connection = serviceB.openConnection(serviceC.getLocalNode(), builder.build())) {
25542562
serviceB.sendRequest(
@@ -2629,6 +2637,7 @@ public String executor() {
26292637
TransportRequestOptions.Type.REG,
26302638
TransportRequestOptions.Type.STATE
26312639
);
2640+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
26322641
try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
26332642
assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
26342643
TransportStats transportStats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
@@ -2750,6 +2759,7 @@ public String executor() {
27502759
TransportRequestOptions.Type.REG,
27512760
TransportRequestOptions.Type.STATE
27522761
);
2762+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
27532763
try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
27542764
assertBusy(() -> { // netty for instance invokes this concurrently so we better use assert busy here
27552765
TransportStats transportStats = serviceC.transport.getStats(); // request has been sent
@@ -3035,6 +3045,7 @@ public void onConnectionClosed(Transport.Connection connection) {
30353045
TransportRequestOptions.Type.REG,
30363046
TransportRequestOptions.Type.STATE
30373047
);
3048+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
30383049
final ConnectTransportException e = expectThrows(
30393050
ConnectTransportException.class,
30403051
() -> service.openConnection(nodeA, builder.build())

test/framework/src/main/java/org/opensearch/transport/TestProfiles.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ private TestProfiles() {}
5959
TransportRequestOptions.Type.REG,
6060
TransportRequestOptions.Type.STATE
6161
);
62+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
6263
LIGHT_PROFILE = builder.build();
6364
}
6465
}

0 commit comments

Comments
 (0)