Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 95 additions & 90 deletions Sources/ServiceLifecycle/ServiceGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ public actor ServiceGroup: Sendable {
group: &group,
gracefulShutdownManagers: gracefulShutdownManagers
)
return .success(())
} catch {
return .failure(error)
}
Expand Down Expand Up @@ -425,6 +426,7 @@ public actor ServiceGroup: Sendable {
group: &group,
gracefulShutdownManagers: gracefulShutdownManagers
)
return .success(())
} catch {
return .failure(error)
}
Expand All @@ -451,6 +453,7 @@ public actor ServiceGroup: Sendable {
group: &group,
gracefulShutdownManagers: gracefulShutdownManagers
)
return .success(())
} catch {
return .failure(error)
}
Expand Down Expand Up @@ -511,7 +514,7 @@ public actor ServiceGroup: Sendable {
// We have to shutdown the services in reverse. To do this
// we are going to signal each child task the graceful shutdown and then wait for
// its exit.
for (gracefulShutdownIndex, gracefulShutdownManager) in gracefulShutdownManagers.lazy.enumerated().reversed() {
gracefulShutdownLoop: for (gracefulShutdownIndex, gracefulShutdownManager) in gracefulShutdownManagers.lazy.enumerated().reversed() {
guard let service = services[gracefulShutdownIndex] else {
self.logger.debug(
"Service already finished. Skipping shutdown"
Expand All @@ -527,112 +530,114 @@ public actor ServiceGroup: Sendable {

gracefulShutdownManager.shutdownGracefully()

let result = try await group.next()
while let result = try await group.next() {
switch result {
case .serviceFinished(let service, let index):
if group.isCancelled {
// The group is cancelled and we expect all services to finish
continue gracefulShutdownLoop
}

switch result {
case .serviceFinished(let service, let index):
if group.isCancelled {
// The group is cancelled and we expect all services to finish
continue
}
if index == gracefulShutdownIndex {
// The service that we signalled graceful shutdown did exit.
// We can continue to the next one.
self.logger.debug(
"Service finished",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
]
)
continue gracefulShutdownLoop
} else {
// Another service exited unexpectedly
self.logger.debug(
"Service finished unexpectedly during graceful shutdown. Cancelling all other services now",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
]
)

if index == gracefulShutdownIndex {
// The service that we signalled graceful shutdown did exit.
// We can continue to the next one.
self.logger.debug(
"Service finished",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
]
)
continue
} else {
// Another service exited unexpectedly
self.logger.debug(
"Service finished unexpectedly during graceful shutdown. Cancelling all other services now",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
]
)
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask)
throw ServiceGroupError.serviceFinishedUnexpectedly()
}

self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask)
throw ServiceGroupError.serviceFinishedUnexpectedly()
}
case .serviceThrew(let service, _, let serviceError):
switch service.failureTerminationBehavior.behavior {
case .cancelGroup:
self.logger.debug(
"Service threw error during graceful shutdown. Cancelling group.",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
self.loggingConfiguration.keys.errorKey: "\(serviceError)",
]
)
group.cancelAll()
throw serviceError

case .serviceThrew(let service, _, let serviceError):
switch service.failureTerminationBehavior.behavior {
case .cancelGroup:
self.logger.debug(
"Service threw error during graceful shutdown. Cancelling group.",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
self.loggingConfiguration.keys.errorKey: "\(serviceError)",
]
)
group.cancelAll()
throw serviceError
case .gracefullyShutdownGroup:
self.logger.debug(
"Service threw error during graceful shutdown.",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
self.loggingConfiguration.keys.errorKey: "\(serviceError)",
]
)

case .gracefullyShutdownGroup:
self.logger.debug(
"Service threw error during graceful shutdown.",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
self.loggingConfiguration.keys.errorKey: "\(serviceError)",
]
)
if error == nil {
error = serviceError
}

if error == nil {
error = serviceError
// We can continue shutting down the next service now
continue gracefulShutdownLoop

case .ignore:
self.logger.debug(
"Service threw error during graceful shutdown.",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
self.loggingConfiguration.keys.errorKey: "\(serviceError)",
]
)

// We can continue shutting down the next service now
continue gracefulShutdownLoop
}

case .ignore:
self.logger.debug(
"Service threw error during graceful shutdown.",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
self.loggingConfiguration.keys.errorKey: "\(serviceError)",
]
)
case .signalCaught(let signal):
if self.cancellationSignals.contains(signal) {
// We got signalled cancellation after graceful shutdown
self.logger.debug(
"Signal caught. Cancelling the group.",
metadata: [
self.loggingConfiguration.keys.signalKey: "\(signal)",
]
)

continue
}
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask)
}

case .signalCaught(let signal):
if self.cancellationSignals.contains(signal) {
// We got signalled cancellation after graceful shutdown
case .gracefulShutdownTimedOut:
// Gracefully shutting down took longer than the user configured
// so we have to escalate it now.
self.logger.debug(
"Signal caught. Cancelling the group.",
"Graceful shutdown took longer than allowed by the configuration. Cancelling the group now.",
metadata: [
self.loggingConfiguration.keys.signalKey: "\(signal)",
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
]
)

self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask)
}

case .gracefulShutdownTimedOut:
// Gracefully shutting down took longer than the user configured
// so we have to escalate it now.
self.logger.debug(
"Graceful shutdown took longer than allowed by the configuration. Cancelling the group now.",
metadata: [
self.loggingConfiguration.keys.serviceKey: "\(service.service)",
]
)
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask)

case .cancellationCaught:
// We caught cancellation in our child task so we have to spawn
// our cancellation timeout task if needed
self.logger.debug("Caught cancellation.")
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask)

case .signalSequenceFinished, .gracefulShutdownCaught, .gracefulShutdownFinished:
// We just have to tolerate this since signals and parent graceful shutdowns can race.
continue
case .cancellationCaught:
// We caught cancellation in our child task so we have to spawn
// our cancellation timeout task if needed
self.logger.debug("Caught cancellation.")
self.cancelGroupAndSpawnTimeoutIfNeeded(group: &group, cancellationTimeoutTask: &cancellationTimeoutTask)

case nil:
fatalError("Invalid result from group.next().")
case .signalSequenceFinished, .gracefulShutdownCaught, .gracefulShutdownFinished:
// We just have to tolerate this since signals and parent graceful shutdowns can race.
// We only break out of the switch here, so the surrounding result loop keeps waiting for the next result.
break
}
}
}

Expand Down
34 changes: 28 additions & 6 deletions Tests/ServiceLifecycleTests/ServiceGroupTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,10 @@ final class ServiceGroupTests: XCTestCase {
await service2.resumeRunContinuation(with: .success(()))

// Waiting to see that the remaining service is still running
await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully)
service1.sendPing()
await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing)

// The first service should now receive the signal
await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully)

// Waiting to see that the one remaining service is still running
service1.sendPing()
await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing)
Expand Down Expand Up @@ -523,12 +521,10 @@ final class ServiceGroupTests: XCTestCase {
await service3.resumeRunContinuation(with: .success(()))

// Waiting to see that the remaining service is still running
await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully)
service1.sendPing()
await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing)

// The first service should now receive the signal
await XCTAsyncAssertEqual(await eventIterator1.next(), .shutdownGracefully)

// Waiting to see that the one remaining service is still running
service1.sendPing()
await XCTAsyncAssertEqual(await eventIterator1.next(), .runPing)
Expand Down Expand Up @@ -1112,6 +1108,32 @@ final class ServiceGroupTests: XCTestCase {
}
}

/// Runs a single-service group configured with a maximum graceful shutdown duration,
/// triggers a graceful shutdown, lets the service finish, and asserts that running
/// the group does not throw.
///
/// NOTE(review): presumably this guards against a leftover timeout result breaking a
/// shutdown that completes in time — confirm against the `ServiceGroup` implementation.
func testGracefulShutdownWithMaximumDuration() async throws {
// A single mock service whose lifecycle events we can observe and control.
let mockService = MockService(description: "Service1")
let serviceGroup = self.makeServiceGroup(
services: [.init(service: mockService)],
gracefulShutdownSignals: [.sigalrm],
// Configure a 0.1 second maximum for graceful shutdown.
maximumGracefulShutdownDuration: .seconds(0.1)
)

try await withThrowingTaskGroup(of: Void.self) { group in
// Run the service group in a child task so the test body can drive it.
group.addTask {
try await serviceGroup.run()
}

// Wait until the mock service reports that it has started running.
var eventIterator = mockService.events.makeAsyncIterator()
await XCTAsyncAssertEqual(await eventIterator.next(), .run)

// Ask the group to shut down gracefully.
await serviceGroup.triggerGracefulShutdown()

// The service should be told to shut down gracefully.
await XCTAsyncAssertEqual(await eventIterator.next(), .shutdownGracefully)

// Let the service's run method return successfully.
await mockService.resumeRunContinuation(with: .success(()))

// The child task running the group is expected to complete without throwing.
try await XCTAsyncAssertNoThrow(await group.next())
}
}

func testGracefulShutdownEscalation_whenNoCancellationEscalation() async throws {
let mockService = MockService(description: "Service1")
let serviceGroup = self.makeServiceGroup(
Expand Down