|
25 | 25 | import org.apache.flink.api.common.serialization.SimpleStringSchema; |
26 | 26 | import org.apache.flink.connector.gcp.pubsub.source.PubSubSource; |
27 | 27 | import org.apache.flink.streaming.api.datastream.DataStream; |
28 | | -import org.apache.flink.streaming.api.datastream.DataStreamUtils; |
29 | 28 | import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
30 | 29 | import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; |
31 | 30 | import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; |
@@ -126,16 +125,17 @@ public void testFlinkSource(boolean testWithFailure) throws Exception { |
126 | 125 | fromPubSub = fromPubSub.map(new FailureMapFunction<>(3)); |
127 | 126 | } |
128 | 127 |
|
129 | | - List<String> output = new ArrayList<>(); |
130 | | - DataStreamUtils.collect(fromPubSub).forEachRemaining(output::add); |
| 128 | + // Asking for any more elements would wait forever, and there isn't a graceful way to |
| 129 | + // indicate end of stream. |
| 130 | + List<String> output = fromPubSub.executeAndCollect(input.size()); |
131 | 131 |
|
132 | 132 | assertEquals("Wrong number of elements", input.size(), output.size()); |
133 | 133 | for (String test : input) { |
134 | 134 | assertTrue("Missing " + test, output.contains(test)); |
135 | 135 | } |
136 | 136 | } |
137 | 137 |
|
138 | | - private class FailureMapFunction<T> extends RichMapFunction<T, T> { |
| 138 | + private static class FailureMapFunction<T> extends RichMapFunction<T, T> { |
139 | 139 | private final long numberOfRecordsUntilFailure; |
140 | 140 | private long numberOfRecordsProcessed; |
141 | 141 |
|
|
0 commit comments