Skip to content

Commit 5087aff

Browse files
committed
Replace the placeholder client request and response mechanisms
Motivation: The client code for async-await has some placeholder implementations for receiving responses and sending requests. Now that we have the necessary types we can replace the placeholders. Modifications: - Replace `AsyncStream` usage with a `PassthroughMessageSequence` - Add a `GRPCAsyncRequestStreamWriter` which request streaming RPCs may use to write requests and finish the request stream. - This writer depends on a delegate which uses callbacks to send messages on the underlying `Call`. - Request streaming RPCs now have a `requestStream` on which to send requests. - Wrap up `AsyncWriterError` as a `public` `struct`. Result: Clients may send requests on a `requestStream`.
1 parent fc2d640 commit 5087aff

10 files changed

+292
-201
lines changed

Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
239239
///
240240
/// The call may be suspend if the writer is paused.
241241
///
242-
/// Throws: ``AsyncWriterError`` if the writer has already been finished or too many write tasks
242+
/// Throws: ``GRPCAsyncWriterError`` if the writer has already been finished or too many write tasks
243243
/// have been suspended.
244244
@inlinable
245245
internal func write(_ element: Element) async throws {
@@ -256,15 +256,15 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
256256
// - error (the writer is complete or the queue is full).
257257

258258
if self._completionState.isPendingOrCompleted {
259-
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
259+
continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
260260
} else if !self._isPaused, self._pendingElements.isEmpty {
261261
self._delegate.write(element)
262262
continuation.resume()
263263
} else if self._pendingElements.count < self._maxPendingElements {
264264
// The continuation will be resumed later.
265265
self._pendingElements.append(PendingElement(element, continuation: continuation))
266266
} else {
267-
continuation.resume(throwing: AsyncWriterError.tooManyPendingWrites)
267+
continuation.resume(throwing: GRPCAsyncWriterError.tooManyPendingWrites)
268268
}
269269
}
270270

@@ -279,7 +279,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
279279
@inlinable
280280
internal func _finish(_ end: End, continuation: CheckedContinuation<Void, Error>) {
281281
if self._completionState.isPendingOrCompleted {
282-
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
282+
continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
283283
} else if !self._isPaused, self._pendingElements.isEmpty {
284284
self._completionState = .completed
285285
self._delegate.writeEnd(end)
@@ -291,10 +291,35 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
291291
}
292292
}
293293

294-
@usableFromInline
295-
internal enum AsyncWriterError: Error, Hashable {
296-
case tooManyPendingWrites
297-
case alreadyFinished
294+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
295+
extension AsyncWriter where End == Void {
296+
@inlinable
297+
internal func finish() async throws {
298+
try await self.finish(())
299+
}
300+
}
301+
302+
public struct GRPCAsyncWriterError: Error, Hashable {
303+
private let wrapped: Wrapped
304+
305+
@usableFromInline
306+
internal enum Wrapped {
307+
case tooManyPendingWrites
308+
case alreadyFinished
309+
}
310+
311+
@usableFromInline
312+
internal init(_ wrapped: Wrapped) {
313+
self.wrapped = wrapped
314+
}
315+
316+
/// There are too many writes pending. This may occur when too many Tasks are writing
317+
/// concurrently.
318+
public static let tooManyPendingWrites = Self(.tooManyPendingWrites)
319+
320+
/// The writer has already finished. This may occur when the RPC completes prematurely, or when
321+
/// a user calls finish more than once.
322+
public static let alreadyFinished = Self(.alreadyFinished)
298323
}
299324

300325
@usableFromInline
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.5)
17+
18+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
19+
extension Call {
20+
internal func makeRequestStreamWriter() -> GRPCAsyncRequestStreamWriter<Request> {
21+
let delegate = GRPCAsyncRequestStreamWriter<Request>.Delegate(
22+
compressionEnabled: self.options.messageEncoding.enabledForRequests
23+
) { request, metadata in
24+
self.send(.message(request, metadata), promise: nil)
25+
} finish: {
26+
self.send(.end, promise: nil)
27+
}
28+
29+
return GRPCAsyncRequestStreamWriter(asyncWriter: .init(delegate: delegate))
30+
}
31+
}
32+
33+
#endif // compiler(>=5.5)

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift

Lines changed: 34 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@
1515
*/
1616
#if compiler(>=5.5)
1717

18-
import _NIOConcurrency
1918
import NIOHPACK
2019

2120
/// Async-await variant of BidirectionalStreamingCall.
2221
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
2322
public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
2423
private let call: Call<Request, Response>
2524
private let responseParts: StreamingResponseParts<Response>
25+
private let responseSource: PassthroughMessageSource<Response, Error>
26+
27+
/// A request stream writer for sending messages to the server.
28+
public let requestStream: GRPCAsyncRequestStreamWriter<Request>
2629

2730
/// The stream of responses from the server.
2831
public let responses: GRPCAsyncResponseStream<Response>
@@ -70,93 +73,43 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
7073

7174
private init(call: Call<Request, Response>) {
7275
self.call = call
73-
// Initialise `responseParts` with an empty response handler because we
74-
// provide the responses as an AsyncSequence in `responseStream`.
7576
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
76-
77-
// Call and StreamingResponseParts are reference types so we grab a
78-
// referecence to them here to avoid capturing mutable self in the closure
79-
// passed to the AsyncThrowingStream initializer.
80-
//
81-
// The alternative would be to declare the responseStream as:
82-
// ```
83-
// public private(set) var responseStream: AsyncThrowingStream<ResponsePayload>!
84-
// ```
85-
//
86-
// UPDATE: Additionally we expect to replace this soon with an AsyncSequence
87-
// implementation that supports yielding values from outside the closure.
88-
let call = self.call
89-
let responseParts = self.responseParts
90-
let responseStream = AsyncThrowingStream(Response.self) { continuation in
91-
call.invokeStreamingRequests { error in
92-
responseParts.handleError(error)
93-
continuation.finish(throwing: error)
94-
} onResponsePart: { responsePart in
95-
responseParts.handle(responsePart)
96-
switch responsePart {
97-
case let .message(response):
98-
continuation.yield(response)
99-
case .metadata:
100-
break
101-
case .end:
102-
continuation.finish()
103-
}
104-
}
105-
}
106-
self.responses = .init(responseStream)
77+
self.responseSource = PassthroughMessageSource<Response, Error>()
78+
self.responses = .init(PassthroughMessageSequence(consuming: self.responseSource))
79+
self.requestStream = call.makeRequestStreamWriter()
10780
}
10881

10982
/// We expose this as the only non-private initializer so that the caller
11083
/// knows that invocation is part of initialisation.
11184
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
112-
Self(call: call)
113-
}
114-
115-
// MARK: - Requests
116-
117-
/// Sends a message to the service.
118-
///
119-
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
120-
///
121-
/// - Parameters:
122-
/// - message: The message to send.
123-
/// - compression: Whether compression should be used for this message. Ignored if compression
124-
/// was not enabled for the RPC.
125-
public func sendMessage(
126-
_ message: Request,
127-
compression: Compression = .deferToCallDefault
128-
) async throws {
129-
let compress = self.call.compress(compression)
130-
let promise = self.call.eventLoop.makePromise(of: Void.self)
131-
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
132-
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
133-
try await promise.futureResult.get()
134-
}
135-
136-
/// Sends a sequence of messages to the service.
137-
///
138-
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
139-
///
140-
/// - Parameters:
141-
/// - messages: The sequence of messages to send.
142-
/// - compression: Whether compression should be used for this message. Ignored if compression
143-
/// was not enabled for the RPC.
144-
public func sendMessages<S>(
145-
_ messages: S,
146-
compression: Compression = .deferToCallDefault
147-
) async throws where S: Sequence, S.Element == Request {
148-
let promise = self.call.eventLoop.makePromise(of: Void.self)
149-
self.call.sendMessages(messages, compression: compression, promise: promise)
150-
try await promise.futureResult.get()
151-
}
85+
let asyncCall = Self(call: call)
86+
87+
asyncCall.call.invokeStreamingRequests { error in
88+
asyncCall.responseParts.handleError(error)
89+
asyncCall.responseSource.finish(throwing: error)
90+
} onResponsePart: { responsePart in
91+
// Handle the metadata, trailers and status.
92+
asyncCall.responseParts.handle(responsePart)
93+
94+
// Handle the response messages and status.
95+
switch responsePart {
96+
case .metadata:
97+
()
98+
99+
case let .message(response):
100+
// TODO: when we support backpressure we will need to stop ignoring the return value.
101+
_ = asyncCall.responseSource.yield(response)
102+
103+
case let .end(status, _):
104+
if status.isOk {
105+
asyncCall.responseSource.finish()
106+
} else {
107+
asyncCall.responseSource.finish(throwing: status)
108+
}
109+
}
110+
}
152111

153-
/// Terminates a stream of messages sent to the service.
154-
///
155-
/// - Important: This should only ever be called once.
156-
public func sendEnd() async throws {
157-
let promise = self.call.eventLoop.makePromise(of: Void.self)
158-
self.call.send(.end, promise: promise)
159-
try await promise.futureResult.get()
112+
return asyncCall
160113
}
161114
}
162115

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
2323
private let call: Call<Request, Response>
2424
private let responseParts: UnaryResponseParts<Response>
2525

26+
/// A request stream writer for sending messages to the server.
27+
public let requestStream: GRPCAsyncRequestStreamWriter<Request>
28+
2629
/// The options used to make the RPC.
2730
public var options: CallOptions {
2831
return self.call.options
@@ -79,60 +82,14 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
7982
onError: self.responseParts.handleError(_:),
8083
onResponsePart: self.responseParts.handle(_:)
8184
)
85+
self.requestStream = call.makeRequestStreamWriter()
8286
}
8387

8488
/// We expose this as the only non-private initializer so that the caller
8589
/// knows that invocation is part of initialisation.
8690
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
8791
Self(call: call)
8892
}
89-
90-
// MARK: - Requests
91-
92-
/// Sends a message to the service.
93-
///
94-
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
95-
///
96-
/// - Parameters:
97-
/// - message: The message to send.
98-
/// - compression: Whether compression should be used for this message. Ignored if compression
99-
/// was not enabled for the RPC.
100-
public func sendMessage(
101-
_ message: Request,
102-
compression: Compression = .deferToCallDefault
103-
) async throws {
104-
let compress = self.call.compress(compression)
105-
let promise = self.call.eventLoop.makePromise(of: Void.self)
106-
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
107-
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
108-
try await promise.futureResult.get()
109-
}
110-
111-
/// Sends a sequence of messages to the service.
112-
///
113-
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
114-
///
115-
/// - Parameters:
116-
/// - messages: The sequence of messages to send.
117-
/// - compression: Whether compression should be used for this message. Ignored if compression
118-
/// was not enabled for the RPC.
119-
public func sendMessages<S>(
120-
_ messages: S,
121-
compression: Compression = .deferToCallDefault
122-
) async throws where S: Sequence, S.Element == Request {
123-
let promise = self.call.eventLoop.makePromise(of: Void.self)
124-
self.call.sendMessages(messages, compression: compression, promise: promise)
125-
try await promise.futureResult.get()
126-
}
127-
128-
/// Terminates a stream of messages sent to the service.
129-
///
130-
/// - Important: This should only ever be called once.
131-
public func sendEnd() async throws {
132-
let promise = self.call.eventLoop.makePromise(of: Void.self)
133-
self.call.send(.end, promise: promise)
134-
try await promise.futureResult.get()
135-
}
13693
}
13794

13895
#endif

0 commit comments

Comments
 (0)