-
Notifications
You must be signed in to change notification settings - Fork 322
JSONRPCConnection handled in swift instead of DispatchIO #2315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -58,8 +58,9 @@ public final class JSONRPCConnection: Connection { | |||||||||||||||||
/// The queue on which we send data. | ||||||||||||||||||
private let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated) | ||||||||||||||||||
|
||||||||||||||||||
private let receiveIO: DispatchIO | ||||||||||||||||||
private let sendIO: DispatchIO | ||||||||||||||||||
private let inFD: FileHandle | ||||||||||||||||||
private let outFD: FileHandle | ||||||||||||||||||
let ioGroup: DispatchGroup | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be |
||||||||||||||||||
private let messageRegistry: MessageRegistry | ||||||||||||||||||
|
||||||||||||||||||
/// If non-nil, all input received by this `JSONRPCConnection` will be written to the file handle | ||||||||||||||||||
|
@@ -86,7 +87,6 @@ public final class JSONRPCConnection: Connection { | |||||||||||||||||
/// Buffer of received bytes that haven't been parsed. | ||||||||||||||||||
/// | ||||||||||||||||||
/// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are | ||||||||||||||||||
/// - The `receiveIO` handler: This is synchronized on `queue`. | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We now have a corresponding use in /// - The `inFD.readabilityHandler`: This is synchronized on `queue`. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yup I'll need to update the docs once this settles |
||||||||||||||||||
/// - `requestBufferIsEmpty`: Also synchronized on `queue`. | ||||||||||||||||||
private nonisolated(unsafe) var requestBuffer: [UInt8] = [] | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -136,45 +136,12 @@ public final class JSONRPCConnection: Connection { | |||||||||||||||||
state = .created | ||||||||||||||||||
self.messageRegistry = messageRegistry | ||||||||||||||||||
|
||||||||||||||||||
let ioGroup = DispatchGroup() | ||||||||||||||||||
|
||||||||||||||||||
#if os(Windows) | ||||||||||||||||||
let rawInFD = dispatch_fd_t(bitPattern: inFD._handle) | ||||||||||||||||||
#else | ||||||||||||||||||
let rawInFD = inFD.fileDescriptor | ||||||||||||||||||
#endif | ||||||||||||||||||
|
||||||||||||||||||
ioGroup.enter() | ||||||||||||||||||
receiveIO = DispatchIO( | ||||||||||||||||||
type: .stream, | ||||||||||||||||||
fileDescriptor: rawInFD, | ||||||||||||||||||
queue: queue, | ||||||||||||||||||
cleanupHandler: { (error: Int32) in | ||||||||||||||||||
if error != 0 { | ||||||||||||||||||
logger.fault("IO error \(error)") | ||||||||||||||||||
} | ||||||||||||||||||
ioGroup.leave() | ||||||||||||||||||
} | ||||||||||||||||||
) | ||||||||||||||||||
self.ioGroup = DispatchGroup() | ||||||||||||||||||
|
||||||||||||||||||
#if os(Windows) | ||||||||||||||||||
let rawOutFD = dispatch_fd_t(bitPattern: outFD._handle) | ||||||||||||||||||
#else | ||||||||||||||||||
let rawOutFD = outFD.fileDescriptor | ||||||||||||||||||
#endif | ||||||||||||||||||
|
||||||||||||||||||
ioGroup.enter() | ||||||||||||||||||
sendIO = DispatchIO( | ||||||||||||||||||
type: .stream, | ||||||||||||||||||
fileDescriptor: rawOutFD, | ||||||||||||||||||
queue: sendQueue, | ||||||||||||||||||
cleanupHandler: { (error: Int32) in | ||||||||||||||||||
if error != 0 { | ||||||||||||||||||
logger.fault("IO error \(error)") | ||||||||||||||||||
} | ||||||||||||||||||
ioGroup.leave() | ||||||||||||||||||
} | ||||||||||||||||||
) | ||||||||||||||||||
self.inFD = inFD | ||||||||||||||||||
self.outFD = outFD | ||||||||||||||||||
|
||||||||||||||||||
self.ioGroup.enter() | ||||||||||||||||||
|
||||||||||||||||||
ioGroup.notify(queue: queue) { [weak self] in | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the entire So, if we just executed this code inside Incidentally, now that I look at this, we might have been missing a few |
||||||||||||||||||
guard let self else { return } | ||||||||||||||||||
|
@@ -187,13 +154,6 @@ public final class JSONRPCConnection: Connection { | |||||||||||||||||
await self.closeHandler?() | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// We cannot assume the client will send us bytes in packets of any particular size, so set the lower limit to 1. | ||||||||||||||||||
receiveIO.setLimit(lowWater: 1) | ||||||||||||||||||
receiveIO.setLimit(highWater: Int.max) | ||||||||||||||||||
|
||||||||||||||||||
sendIO.setLimit(lowWater: 1) | ||||||||||||||||||
sendIO.setLimit(highWater: Int.max) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/// Creates and starts a `JSONRPCConnection` that connects to a subprocess launched with the specified arguments. | ||||||||||||||||||
|
@@ -293,27 +253,19 @@ public final class JSONRPCConnection: Connection { | |||||||||||||||||
state = .running | ||||||||||||||||||
self.receiveHandler = receiveHandler | ||||||||||||||||||
self.closeHandler = closeHandler | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
receiveIO.read(offset: 0, length: Int.max, queue: queue) { done, data, errorCode in | ||||||||||||||||||
guard errorCode == 0 else { | ||||||||||||||||||
#if !os(Windows) | ||||||||||||||||||
if errorCode != POSIXError.ECANCELED.rawValue { | ||||||||||||||||||
logger.fault("IO error reading \(errorCode)") | ||||||||||||||||||
self.inFD.readabilityHandler = { fileHandle in | ||||||||||||||||||
let data = fileHandle.availableData | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I unfortunately can’t remember the example when |
||||||||||||||||||
if data.isEmpty { | ||||||||||||||||||
fileHandle.readabilityHandler = nil | ||||||||||||||||||
self.queue.async { | ||||||||||||||||||
self.closeAssumingOnQueue() | ||||||||||||||||||
} | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need to Also, |
||||||||||||||||||
#endif | ||||||||||||||||||
if done { self.closeAssumingOnQueue() } | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
if done { | ||||||||||||||||||
self.closeAssumingOnQueue() | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
guard let data = data, !data.isEmpty else { | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
self.queue.sync { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm now using async in the last iteration, but I'm not sure I'm doing it right in each particular case There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you think we shouldn’t be using |
||||||||||||||||||
orLog("Writing input mirror file") { | ||||||||||||||||||
try self.inputMirrorFile?.write(contentsOf: data) | ||||||||||||||||||
} | ||||||||||||||||||
|
@@ -554,16 +506,16 @@ public final class JSONRPCConnection: Connection { | |||||||||||||||||
orLog("Writing output mirror file") { | ||||||||||||||||||
try outputMirrorFile?.write(contentsOf: dispatchData) | ||||||||||||||||||
} | ||||||||||||||||||
sendIO.write(offset: 0, data: dispatchData, queue: sendQueue) { [weak self] done, _, errorCode in | ||||||||||||||||||
if errorCode != 0 { | ||||||||||||||||||
logger.fault("IO error sending message \(errorCode)") | ||||||||||||||||||
if done, let self { | ||||||||||||||||||
// An unrecoverable error occurs on the channel’s file descriptor. | ||||||||||||||||||
// Close the connection. | ||||||||||||||||||
self.queue.async { | ||||||||||||||||||
self.closeAssumingOnQueue() | ||||||||||||||||||
} | ||||||||||||||||||
sendQueue.sync { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to my other |
||||||||||||||||||
do { | ||||||||||||||||||
try outFD.write(contentsOf: dispatchData) | ||||||||||||||||||
} catch { | ||||||||||||||||||
logger.fault("IO error sending message \(error.forLogging)") | ||||||||||||||||||
self.queue.async { | ||||||||||||||||||
self.ioGroup.leave() | ||||||||||||||||||
self.closeAssumingOnQueue() | ||||||||||||||||||
} | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
@@ -646,7 +598,10 @@ public final class JSONRPCConnection: Connection { | |||||||||||||||||
/// The user-provided close handler will be called *asynchronously* when all outstanding I/O | ||||||||||||||||||
/// operations have completed. No new I/O will be accepted after `close` returns. | ||||||||||||||||||
public func close() { | ||||||||||||||||||
queue.sync { closeAssumingOnQueue() } | ||||||||||||||||||
queue.sync { | ||||||||||||||||||
closeAssumingOnQueue() | ||||||||||||||||||
ioGroup.leave() | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/// Close the connection, assuming that the code is already executing on `queue`. | ||||||||||||||||||
|
@@ -660,9 +615,12 @@ public final class JSONRPCConnection: Connection { | |||||||||||||||||
|
||||||||||||||||||
logger.log("Closing JSONRPCConnection...") | ||||||||||||||||||
// Attempt to close the reader immediately; we do not need to accept remaining inputs. | ||||||||||||||||||
receiveIO.close(flags: .stop) | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we set the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I'm doing this now but might be doing it wrong :) |
||||||||||||||||||
// Close the writer after it finishes outstanding work. | ||||||||||||||||||
sendIO.close() | ||||||||||||||||||
do { | ||||||||||||||||||
try outFD.close() | ||||||||||||||||||
} catch { | ||||||||||||||||||
logger.error("Failed to close outFD: \(error.forLogging)") | ||||||||||||||||||
} | ||||||||||||||||||
Comment on lines
+619
to
+623
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be simplified slightly.
Suggested change
|
||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we match the previous naming and call this
receiveFD
?inFD
can get confusing if you eg. set up aJSONRPCConnection
from SourceKit-LSP to a BSP server, in which case thereceiveFD
will be stdout of the BSP server process and thusreceiveFD
wouldn’t be stdin.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming follows
JSONRPCConnection.init()
args names, are you suggesting we change these as well?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think changing the argument names
JSONRPCConnection.init()
would be my preference. Happy to change the argument names in a follow-up PR though, this PR is delicate enough that we don’t need to litter it with other renames. But I would prefer to name the members in herereceiveFD
andsendFD
now already.