diff --git a/Sources/Segment/Analytics.swift b/Sources/Segment/Analytics.swift index 0667f03f..4ea2b92b 100644 --- a/Sources/Segment/Analytics.swift +++ b/Sources/Segment/Analytics.swift @@ -212,40 +212,49 @@ extension Analytics { flushGroup.enter() apply { plugin in + // we want to enter as soon as possible. waiting to do it from + // another queue just takes too long. operatingMode.run(queue: configuration.values.flushQueue) { if let p = plugin as? FlushCompletion { - // this is async - // flush(group:completion:) handles the enter/leave. + // flush handles the groups enter/leave calls p.flush(group: flushGroup) { plugin in // we don't really care about the plugin value .. yet. } } else if let p = plugin as? EventPlugin { - // we have no idea if this will be async or not, assume it's sync. flushGroup.enter() + // we have no idea if this will be async or not, assume it's sync. p.flush() flushGroup.leave() } } } - // if we're not in server mode, we need to be notified when it's done. - if let completion, operatingMode != .synchronous { - // set up our callback to know when the group has completed, if we're not - // in .server operating mode. - flushGroup.notify(queue: configuration.values.flushQueue) { - DispatchQueue.main.async { completion() } - } - } - flushGroup.leave() // matches our initial enter(). - // if we ARE in server mode, we need to wait on the group. + // if we ARE in sync mode, we need to wait on the group. // This effectively ends up being a `sync` operation. if operatingMode == .synchronous { flushGroup.wait() // we need to call completion on our own since - // we skipped setting up notify. - if let completion { DispatchQueue.main.async { completion() }} + // we skipped setting up notify. we don't need to do it on + // .main since we are in synchronous mode. + if let completion { completion() } + } else if operatingMode == .asynchronous { + // if we're not, flip over to our serial queue, tell it to wait on the flush + // group to complete if we have a completion to hit. Otherwise, no need to + // wait on completion. + if let completion { + // NOTE: DispatchGroup's `notify` method on linux ended up getting called + // before the tasks have actually completed, so we went with this instead. + OperatingMode.defaultQueue.async { [weak self] in + let timedOut = flushGroup.wait(timeout: .now() + 15 /*seconds*/) + if timedOut == .timedOut { + self?.log(message: "flush(completion:) timed out waiting for completion.") + } + completion() + //DispatchQueue.main.async { completion() } + } + } } } @@ -437,16 +446,11 @@ extension OperatingMode { task() } case .synchronous: - // if for some reason, we're told to do all this stuff on - // main, ignore it, and use the default queue. this prevents - // a possible deadlock. - if queue === DispatchQueue.main { - OperatingMode.defaultQueue.asyncAndWait { - task() - } - } else { - queue.asyncAndWait { task() } - } + // in synchronous mode, always use our own queue to + // prevent deadlocks. + let workItem = DispatchWorkItem(block: task) + OperatingMode.defaultQueue.asyncAndWait(execute: workItem) } } } + diff --git a/Sources/Segment/Plugins/SegmentDestination.swift b/Sources/Segment/Plugins/SegmentDestination.swift index 91257438..0cf6fea6 100644 --- a/Sources/Segment/Plugins/SegmentDestination.swift +++ b/Sources/Segment/Plugins/SegmentDestination.swift @@ -124,8 +124,11 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion // don't flush if analytics is disabled. guard analytics.enabled == true else { return } + // enter for the high level flush, allow us time to run through any existing files.. + group.enter() + // Read events from file system - guard let data = storage.read(Storage.Constants.events) else { return } + guard let data = storage.read(Storage.Constants.events) else { group.leave(); return } eventCount = 0 cleanupUploads() @@ -134,9 +137,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion if pendingUploads == 0 { for url in data { - analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)") - // enter the dispatch group + // enter for this url we're going to kick off group.enter() + analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)") // set up the task let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in switch result { @@ -154,7 +157,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion self.cleanupUploads() // call the completion completion(self) - // leave the dispatch group + // leave for the url we kicked off. group.leave() } // we have a legit upload in progress now, so add it to our list. @@ -165,6 +168,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion } else { analytics.log(message: "Skipping processing; Uploads in progress.") } + + // leave for the high level flush + group.leave() } } diff --git a/Sources/Segment/Utilities/Utils.swift b/Sources/Segment/Utilities/Utils.swift index 3e8d0387..e317738e 100644 --- a/Sources/Segment/Utilities/Utils.swift +++ b/Sources/Segment/Utilities/Utils.swift @@ -7,6 +7,17 @@ import Foundation +#if os(Linux) +extension DispatchQueue { + func asyncAndWait(execute workItem: DispatchWorkItem) { + async { + workItem.perform() + } + workItem.wait() + } +} +#endif + /// Inquire as to whether we are within a Unit Testing environment. #if DEBUG internal var isUnitTesting: Bool = { @@ -58,3 +69,36 @@ extension Optional: Flattenable { } } +class TrackingDispatchGroup: CustomStringConvertible { + internal let group = DispatchGroup() + + var description: String { + return "DispatchGroup Enters: \(enters), Leaves: \(leaves)" + } + + var enters: Int = 0 + var leaves: Int = 0 + var current: Int = 0 + + func enter() { + enters += 1 + current += 1 + group.enter() + } + + func leave() { + leaves += 1 + current -= 1 + group.leave() + } + + init() { } + + func wait() { + group.wait() + } + + public func notify(qos: DispatchQoS = .unspecified, flags: DispatchWorkItemFlags = [], queue: DispatchQueue, execute work: @escaping @convention(block) () -> Void) { + group.notify(qos: qos, flags: flags, queue: queue, execute: work) + } +} diff --git a/Tests/Segment-Tests/Analytics_Tests.swift b/Tests/Segment-Tests/Analytics_Tests.swift index 1db8c2d5..64fb1b44 100644 --- a/Tests/Segment-Tests/Analytics_Tests.swift +++ b/Tests/Segment-Tests/Analytics_Tests.swift @@ -692,9 +692,38 @@ final class Analytics_Tests: XCTestCase { } - func testServerOperatingMode() { + func testAsyncOperatingMode() { // Use a specific writekey to this test so we do not collide with other cached items. - let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_serverMode") + let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_asyncMode") + .flushInterval(9999) + .flushAt(9999) + .operatingMode(.asynchronous)) + + waitUntilStarted(analytics: analytics) + + analytics.storage.hardReset(doYouKnowHowToUseThis: true) + + @Atomic var completionCalled = false + + // put an event in the pipe ... + analytics.track(name: "completion test1") + // flush it, that'll get us an upload going + analytics.flush { + // verify completion is called. + completionCalled = true + } + + while !completionCalled { + RunLoop.main.run(until: Date.distantPast) + } + + XCTAssertTrue(completionCalled) + XCTAssertEqual(analytics.pendingUploads!.count, 0) + } + + func testSyncOperatingMode() { + // Use a specific writekey to this test so we do not collide with other cached items. + let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_syncMode") .flushInterval(9999) .flushAt(9999) .operatingMode(.synchronous)) diff --git a/Tests/Segment-Tests/StressTests.swift b/Tests/Segment-Tests/StressTests.swift index 75aa393d..651955c3 100644 --- a/Tests/Segment-Tests/StressTests.swift +++ b/Tests/Segment-Tests/StressTests.swift @@ -69,7 +69,8 @@ class StressTests: XCTestCase { let event = "write queue 1: \(eventsWritten)" analytics.track(name: event) eventsWritten += 1 - usleep(0001) + //usleep(0001) + RunLoop.main.run(until: Date.distantPast) } print("queue 1 wrote \(eventsWritten) events.") queue1Done = true @@ -82,7 +83,8 @@ class StressTests: XCTestCase { let event = "write queue 2: \(eventsWritten)" analytics.track(name: event) eventsWritten += 1 - usleep(0001) + //usleep(0001) + RunLoop.main.run(until: Date.distantPast) } print("queue 2 wrote \(eventsWritten) events.") queue2Done = true @@ -91,10 +93,12 @@ class StressTests: XCTestCase { flushQueue.async { while (ready == false) { usleep(1) } var counter = 0 - sleep(1) + //sleep(1) + RunLoop.main.run(until: Date(timeIntervalSinceNow: 1)) while (queue1Done == false || queue2Done == false) { let sleepTime = UInt32.random(in: 1..<3000) - usleep(sleepTime) + //usleep(sleepTime) + RunLoop.main.run(until: Date(timeIntervalSinceNow: Double(sleepTime / 1000) )) analytics.flush() counter += 1 }