Skip to content

Commit aa93b35

Browse files
authored
Update v2 sync receive to align with GA behavior (#41817)
1 parent 73fe2ff commit aa93b35

File tree

3 files changed

+119
-144
lines changed

3 files changed

+119
-144
lines changed

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ public class EventHubConsumerClient implements Closeable {
127127
private final Duration timeout;
128128
private final AtomicInteger idGenerator = new AtomicInteger();
129129
private final EventHubsTracer tracer;
130-
private final SynchronousReceiver syncReceiver;
130+
private final SynchronousPartitionReceiver syncReceiver;
131131

132132
EventHubConsumerClient(EventHubConsumerAsyncClient consumer, Duration tryTimeout) {
133133
Objects.requireNonNull(tryTimeout, "'tryTimeout' cannot be null.");
134134

135135
this.consumer = Objects.requireNonNull(consumer, "'consumer' cannot be null.");
136136
this.timeout = tryTimeout;
137137
this.tracer = consumer.getInstrumentation().getTracer();
138-
this.syncReceiver = new SynchronousReceiver(LOGGER, consumer);
138+
this.syncReceiver = new SynchronousPartitionReceiver(consumer); // used in V2 mode.
139139
}
140140

141141
/**
@@ -337,6 +337,7 @@ public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, i
337337
*/
338338
@Override
339339
public void close() {
340+
syncReceiver.dispose();
340341
consumer.close();
341342
}
342343

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.eventhubs;
5+
6+
import com.azure.core.amqp.implementation.WindowedSubscriber;
7+
import com.azure.core.amqp.implementation.WindowedSubscriber.WindowedSubscriberOptions;
8+
import com.azure.core.util.Context;
9+
import com.azure.core.util.IterableStream;
10+
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
11+
import com.azure.messaging.eventhubs.models.EventPosition;
12+
import com.azure.messaging.eventhubs.models.PartitionEvent;
13+
import com.azure.messaging.eventhubs.models.ReceiveOptions;
14+
import reactor.core.publisher.Flux;
15+
16+
import java.time.Duration;
17+
import java.time.Instant;
18+
import java.util.Collections;
19+
import java.util.Objects;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;
23+
24+
/**
25+
* A type that channels synchronous receive requests to a backing asynchronous receiver client.
26+
*/
27+
final class SynchronousPartitionReceiver {
28+
private static final String TERMINAL_MESSAGE = "The receiver client is terminated. Re-create the client to continue receive attempt.";
29+
private static final String SYNC_RECEIVE_SPAN_NAME = "EventHubs.receiveFromPartition";
30+
private final EventHubsTracer tracer;
31+
private final AtomicReference<Receiver> receiver = new AtomicReference<>(null);
32+
33+
/**
34+
* Creates a SynchronousPartitionReceiver.
35+
*
36+
* @param client the backing asynchronous client to connect to the broker, delegate message requesting and receive.
37+
*/
38+
SynchronousPartitionReceiver(EventHubConsumerAsyncClient client) {
39+
Objects.requireNonNull(client, "'client' cannot be null.");
40+
this.receiver.set(new DelegatingReceiver(client));
41+
this.tracer = client.getInstrumentation().getTracer();
42+
}
43+
44+
/**
45+
* Request a specified number of event from a parition and obtain an {@link IterableStream} streaming the received
46+
* event.
47+
*
48+
* @param partitionId Identifier of the partition to read events from.
49+
* @param startingPosition Position within the Event Hub partition to begin consuming events.
50+
* @param receiveOptions Options when receiving events from the partition.
51+
* @param maxEvents the maximum number of event to receive.
52+
* @param maxWaitTime the upper bound for the time to wait to receive the requested number of event.
53+
*
54+
* @return an {@link IterableStream} of at most {@code maxEvents} event.
55+
*/
56+
IterableStream<PartitionEvent> receive(String partitionId, EventPosition startingPosition,
57+
ReceiveOptions receiveOptions, int maxEvents, Duration maxWaitTime) {
58+
Objects.requireNonNull(partitionId, "'partitionId' cannot be null.");
59+
Objects.requireNonNull(startingPosition, "'startingPosition' cannot be null.");
60+
Objects.requireNonNull(receiveOptions, "'receiveOptions' cannot be null.");
61+
62+
final WindowedSubscriber<PartitionEvent> subscriber = createSubscriber(partitionId);
63+
final Flux<PartitionEvent> upstream = receiver.get().receive(partitionId, startingPosition, receiveOptions);
64+
upstream.subscribeWith(subscriber);
65+
final Flux<PartitionEvent> windowFlux = subscriber.enqueueRequestFlux(maxEvents, maxWaitTime);
66+
return new IterableStream<>(windowFlux.doOnComplete(subscriber::cancel).doOnError(__ -> subscriber.cancel()));
67+
}
68+
69+
/**
70+
* Disposes the SynchronousReceiver.
71+
* <p>
72+
* Once disposed, the {@link IterableStream} for any future or pending receive requests will receive terminated error.
73+
* </p>
74+
*/
75+
void dispose() {
76+
receiver.set(Receiver.DISPOSED);
77+
}
78+
79+
/**
80+
* Create a {@link WindowedSubscriber} capable of bridging synchronous receive requests to an upstream of
81+
* asynchronous event.
82+
*
83+
* @param partitionId Identifier of the partition to read events from.
84+
*
85+
* @return The subscriber.
86+
*/
87+
private WindowedSubscriber<PartitionEvent> createSubscriber(String partitionId) {
88+
final WindowedSubscriberOptions<PartitionEvent> options = new WindowedSubscriberOptions<>();
89+
options.setWindowDecorator(toDecorate -> {
90+
// Decorates the provided 'toDecorate' flux for tracing the signals (events, termination) it produces.
91+
final Instant startTime = tracer.isEnabled() ? Instant.now() : null;
92+
return tracer.reportSyncReceiveSpan(SYNC_RECEIVE_SPAN_NAME, startTime, toDecorate, Context.NONE);
93+
});
94+
return new WindowedSubscriber<>(Collections.singletonMap(PARTITION_ID_KEY, partitionId), TERMINAL_MESSAGE, options);
95+
}
96+
97+
private interface Receiver {
98+
Receiver DISPOSED = (partitionId, startingPosition, receiveOptions) -> Flux.error(new RuntimeException(TERMINAL_MESSAGE));
99+
100+
Flux<PartitionEvent> receive(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions);
101+
}
102+
103+
private static final class DelegatingReceiver implements Receiver {
104+
private final EventHubConsumerAsyncClient client;
105+
106+
DelegatingReceiver(EventHubConsumerAsyncClient client) {
107+
this.client = Objects.requireNonNull(client, "'client' cannot be null.");
108+
}
109+
110+
@Override
111+
public Flux<PartitionEvent> receive(String partitionId, EventPosition startingPosition, ReceiveOptions receiveOptions) {
112+
assert client.isV2();
113+
return client.receiveFromPartition(partitionId, startingPosition, receiveOptions);
114+
}
115+
}
116+
}

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/SynchronousReceiver.java

Lines changed: 0 additions & 142 deletions
This file was deleted.

0 commit comments

Comments
 (0)