From 28426180706c1b8569886d669cb4db2fb0ed94fa Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Tue, 7 May 2024 14:00:45 -0700 Subject: [PATCH 1/9] Adding more stressful tests for debugging. --- .../Storage/Types/DirectoryStore.swift | 4 + Tests/Segment-Tests/StressTests.swift | 136 +++++++++++++++++- 2 files changed, 137 insertions(+), 3 deletions(-) diff --git a/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift b/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift index 26f062c2..6f70464c 100644 --- a/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift +++ b/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift @@ -8,6 +8,8 @@ import Foundation public class DirectoryStore: DataStore { + internal static var fileValidator: ((URL) -> Void)? = nil + public typealias StoreConfiguration = Configuration public struct Configuration { @@ -178,6 +180,8 @@ extension DirectoryStore { try? FileManager.default.moveItem(at: url, to: newURL) self.writer = nil incrementIndex() + + DirectoryStore.fileValidator?(newURL) } } diff --git a/Tests/Segment-Tests/StressTests.swift b/Tests/Segment-Tests/StressTests.swift index 36ed6454..22331f4a 100644 --- a/Tests/Segment-Tests/StressTests.swift +++ b/Tests/Segment-Tests/StressTests.swift @@ -20,6 +20,95 @@ class StressTests: XCTestCase { // Linux doesn't know what URLProtocol is and on watchOS it somehow works differently and isn't hit. #if !os(Linux) && !os(watchOS) + func testDirectoryStorageStress2() throws { + // register our network blocker + guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return } + + let analytics = Analytics(configuration: Configuration(writeKey: "stressTest").errorHandler({ error in + XCTFail("Storage Error: \(error)") + })) + analytics.storage.hardReset(doYouKnowHowToUseThis: true) + analytics.storage.onFinish = { url in + // check that each one is valid json + do { + let json = try Data(contentsOf: url) + _ = try JSONSerialization.jsonObject(with: json) + } catch { + XCTFail("\(error) in \(url)") + } + } + + DirectoryStore.fileValidator = { url in + do { + let eventBundle = try JSONSerialization.jsonObject(with: Data(contentsOf: url)) + XCTAssertNotNil(eventBundle, "The event bundle parsed out to null. \(url)") + } catch { + XCTFail("Unable to parse JSON bundle; It must be corrupt! \(error), \(url)") + } + } + + waitUntilStarted(analytics: analytics) + + // set the httpclient to use our blocker session + let segment = analytics.find(pluginType: SegmentDestination.self) + let configuration = URLSessionConfiguration.ephemeral + configuration.allowsCellularAccess = true + configuration.timeoutIntervalForResource = 30 + configuration.timeoutIntervalForRequest = 60 + configuration.httpMaximumConnectionsPerHost = 2 + configuration.protocolClasses = [BlockNetworkCalls.self] + configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8", + "Authorization": "Basic test", + "User-Agent": "analytics-ios/\(Analytics.version())"] + let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil) + segment?.httpClient?.session = blockSession + + @Atomic var ready = false + var queues = [DispatchQueue]() + for i in 0..<30 { + queues.append(DispatchQueue(label: "write queue \(i))", attributes: .concurrent)) + } + + let group = DispatchGroup() + group.enter() + + let lock = NSLock() + var eventsWritten = 0 + let writeBlock: (Int) -> Void = { queueNum in + analytics.track(name: "dummy event") + lock.lock() + eventsWritten += 1 + lock.unlock() + } + + // schedule a bunch of events to go out + for i in 0..<1_000_000 { + let randomInt = Int.random(in: 0..<30) + let queue = queues[randomInt] + group.enter() + queue.async { + writeBlock(randomInt) + group.leave() + } + } + + group.notify(queue: DispatchQueue.main) { + ready = false + print("\(eventsWritten) events written, across 30 queues.") + print("all queues finished.") + } + + ready = true + + group.leave() + + while (ready) { + RunLoop.main.run(until: Date.distantPast) + } + + analytics.purgeStorage() + } + func testDirectoryStorageStress() throws { // register our network blocker guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return } @@ -37,6 +126,15 @@ class StressTests: XCTestCase { XCTFail("\(error) in \(url)") } } + + DirectoryStore.fileValidator = { url in + do { + let eventBundle = try JSONSerialization.jsonObject(with: Data(contentsOf: url)) + XCTAssertNotNil(eventBundle, "The event bundle parsed out to null. \(url)") + } catch { + XCTFail("Unable to parse JSON bundle; It must be corrupt! \(url)") + } + } waitUntilStarted(analytics: analytics) @@ -54,13 +152,17 @@ class StressTests: XCTestCase { let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil) segment?.httpClient?.session = blockSession - let writeQueue1 = DispatchQueue(label: "write queue 1") - let writeQueue2 = DispatchQueue(label: "write queue 2") + let writeQueue1 = DispatchQueue(label: "write queue 1", attributes: .concurrent) + let writeQueue2 = DispatchQueue(label: "write queue 2", attributes: .concurrent) + let writeQueue3 = DispatchQueue(label: "write queue 3", attributes: .concurrent) + let writeQueue4 = DispatchQueue(label: "write queue 4", attributes: .concurrent) let flushQueue = DispatchQueue(label: "flush queue") @Atomic var ready = false @Atomic var queue1Done = false @Atomic var queue2Done = false + @Atomic var queue3Done = false + @Atomic var queue4Done = false writeQueue1.async { while (ready == false) { usleep(1) } @@ -90,12 +192,40 @@ class StressTests: XCTestCase { queue2Done = true } + writeQueue3.async { + while (ready == false) { usleep(1) } + var eventsWritten = 0 + while (eventsWritten < 10000) { + let event = "write queue 3: \(eventsWritten)" + analytics.track(name: event) + eventsWritten += 1 + //usleep(0001) + RunLoop.main.run(until: Date.distantPast) + } + print("queue 3 wrote \(eventsWritten) events.") + queue3Done = true + } + + writeQueue4.async { + while (ready == false) { usleep(1) } + var eventsWritten = 0 + while (eventsWritten < 10000) { + let event = "write queue 4: \(eventsWritten)" + analytics.track(name: event) + eventsWritten += 1 + //usleep(0001) + RunLoop.main.run(until: Date.distantPast) + } + print("queue 4 wrote \(eventsWritten) events.") + queue4Done = true + } + flushQueue.async { while (ready == false) { usleep(1) } var counter = 0 //sleep(1) RunLoop.main.run(until: Date(timeIntervalSinceNow: 1)) - while (queue1Done == false || queue2Done == false) { + while (queue1Done == false || queue2Done == false || queue3Done == false || queue4Done == false) { let sleepTime = UInt32.random(in: 1..<3000) //usleep(sleepTime) RunLoop.main.run(until: Date(timeIntervalSinceNow: Double(sleepTime / 1000) )) From cd52ee03e6c8f264334931eb0049b6555559f1f7 Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Wed, 8 May 2024 09:09:41 -0700 Subject: [PATCH 2/9] Remove unused onFinish closure. --- Sources/Segment/Utilities/Storage/Storage.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Sources/Segment/Utilities/Storage/Storage.swift b/Sources/Segment/Utilities/Storage/Storage.swift index 12fd4d1d..3cd37432 100644 --- a/Sources/Segment/Utilities/Storage/Storage.swift +++ b/Sources/Segment/Utilities/Storage/Storage.swift @@ -13,7 +13,6 @@ internal class Storage: Subscriber { let userDefaults: UserDefaults static let MAXFILESIZE = 475000 // Server accepts max 500k per batch - internal var onFinish: ((URL) -> Void)? = nil internal weak var analytics: Analytics? = nil internal let dataStore: TransientDB From e75f0b90e78b2ca7a01a3d109b1ed73591fa7ad7 Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Thu, 9 May 2024 10:50:09 -0700 Subject: [PATCH 3/9] Updated references to atomic --- Sources/Segment/Analytics.swift | 10 ++- .../Platforms/Vendors/AppleUtils.swift | 6 +- .../Platforms/iOS/iOSLifecycleEvents.swift | 4 +- .../Segment/Plugins/SegmentDestination.swift | 4 +- Sources/Segment/Plugins/StartupQueue.swift | 2 +- Sources/Segment/Utilities/Atomic.swift | 68 +++++++++++++------ .../Policies/CountBasedFlushPolicy.swift | 4 +- Sources/Segment/Utilities/QueueTimer.swift | 4 +- Sources/Segment/Utilities/UserAgent.swift | 9 ++- Tests/Segment-Tests/Atomic_Tests.swift | 2 +- Tests/Segment-Tests/FlushPolicy_Tests.swift | 2 +- Tests/Segment-Tests/Storage_Tests.swift | 2 +- Tests/Segment-Tests/StressTests.swift | 68 ++++++------------- 13 files changed, 98 insertions(+), 87 deletions(-) diff --git a/Sources/Segment/Analytics.swift b/Sources/Segment/Analytics.swift index 85dd400c..031ae6a4 100644 --- a/Sources/Segment/Analytics.swift +++ b/Sources/Segment/Analytics.swift @@ -423,12 +423,16 @@ extension Analytics { } internal static func addActiveWriteKey(_ writeKey: String) { - Self.activeWriteKeys.append(writeKey) + Self._activeWriteKeys.mutate { keys in + keys.append(writeKey) + } } internal static func removeActiveWriteKey(_ writeKey: String) { - Self.activeWriteKeys.removeAll { key in - writeKey == key + Self._activeWriteKeys.mutate { keys in + keys.removeAll { key in + writeKey == key + } } } } diff --git a/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift b/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift index 901b5ace..0c5106a2 100644 --- a/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift +++ b/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift @@ -348,17 +348,17 @@ internal class ConnectionMonitor { SCNetworkReachabilityCreateWithAddress(nil, zeroSockAddress) } }) else { - connectionStatus = .unknown + _connectionStatus.set(.unknown) return } var flags : SCNetworkReachabilityFlags = [] if !SCNetworkReachabilityGetFlags(defaultRouteReachability, &flags) { - connectionStatus = .unknown + _connectionStatus.set(.unknown) return } - connectionStatus = ConnectionStatus(reachabilityFlags: flags) + _connectionStatus.set(ConnectionStatus(reachabilityFlags: flags)) } } diff --git a/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift b/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift index edfb91d7..dcc340e5 100644 --- a/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift +++ b/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift @@ -28,7 +28,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle { // Make sure we aren't double calling application:didFinishLaunchingWithOptions // by resetting the check at the start - didFinishLaunching = true + _didFinishLaunching.set(true) if analytics?.configuration.values.trackApplicationLifecycleEvents == false { return @@ -88,7 +88,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle { } func applicationDidEnterBackground(application: UIApplication?) { - didFinishLaunching = false + _didFinishLaunching.set(false) if analytics?.configuration.values.trackApplicationLifecycleEvents == false { return } diff --git a/Sources/Segment/Plugins/SegmentDestination.swift b/Sources/Segment/Plugins/SegmentDestination.swift index 1ab2260f..8a741dcf 100644 --- a/Sources/Segment/Plugins/SegmentDestination.swift +++ b/Sources/Segment/Plugins/SegmentDestination.swift @@ -116,7 +116,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion guard let storage = self.storage else { return } // Send Event to File System storage.write(.events, value: event) - self._eventCount.withValue { count in + self._eventCount.mutate { count in count += 1 } } @@ -135,7 +135,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion // don't flush if analytics is disabled. guard analytics.enabled == true else { return } - eventCount = 0 + _eventCount.set(0) cleanupUploads() let type = storage.dataStore.transactionType diff --git a/Sources/Segment/Plugins/StartupQueue.swift b/Sources/Segment/Plugins/StartupQueue.swift index 8f316f8e..6e7a3479 100644 --- a/Sources/Segment/Plugins/StartupQueue.swift +++ b/Sources/Segment/Plugins/StartupQueue.swift @@ -47,7 +47,7 @@ public class StartupQueue: Plugin, Subscriber { extension StartupQueue { internal func runningUpdate(state: System) { - running = state.running + _running.set(state.running) if state.running { replayEvents() } diff --git a/Sources/Segment/Utilities/Atomic.swift b/Sources/Segment/Utilities/Atomic.swift index 1dc6c07e..922c4819 100644 --- a/Sources/Segment/Utilities/Atomic.swift +++ b/Sources/Segment/Utilities/Atomic.swift @@ -7,34 +7,62 @@ import Foundation -// NOTE: Revised from previous implementation which used a struct and NSLock's. -// Thread Sanitizer was *correctly* capturing this issue, which was a little obscure -// given the property wrapper PLUS the semantics of a struct. Moving to `class` -// removes the semantics problem and lets TSan approve of what's happening. -// -// Additionally, moving to a lock free version is just desirable, so moved to a queue. -// -// Also see thread here: https://github.com/apple/swift-evolution/pull/1387 +/* + Revised the implementation yet again. Tiziano Coriano noticed that this wrapper + can be misleading about it's atomicity. A single set would be atomic, but a compound + operation like += would cause an atomic read, and a separate atomic write, in which + point another thread could've changed the value we're now working off of. + + This implementation removes the ability to set wrappedValue, and callers now must use + the set() or mutate() functions explicitly to ensure a proper atomic mutation. + + The use of a dispatch queue was also removed in favor of an unfair lock (yes, it's + implemented correctly). + */ @propertyWrapper public class Atomic { - private var value: T - private let queue = DispatchQueue(label: "com.segment.atomic.\(UUID().uuidString)") - + internal typealias os_unfair_lock_t = UnsafeMutablePointer + internal var unfairLock: os_unfair_lock_t + + internal var value: T + public init(wrappedValue value: T) { + self.unfairLock = UnsafeMutablePointer.allocate(capacity: 1) + self.unfairLock.initialize(to: os_unfair_lock()) self.value = value } - + + deinit { + unfairLock.deallocate() + } + public var wrappedValue: T { - get { return queue.sync { return value } } - set { queue.sync { value = newValue } } + get { + lock() + defer { unlock() } + return value + } + // set is not allowed, use set() or mutate() } + + public func set(_ newValue: T) { + mutate { $0 = newValue } + } + + public func mutate(_ mutation: (inout T) -> Void) { + lock() + defer { unlock() } + mutation(&value) + } +} - @discardableResult - public func withValue(_ operation: (inout T) -> Void) -> T { - queue.sync { - operation(&self.value) - return self.value - } +extension Atomic { + internal func lock() { + os_unfair_lock_lock(unfairLock) + } + + internal func unlock() { + os_unfair_lock_unlock(unfairLock) } } diff --git a/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift b/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift index 0a07edf1..d5be3d0d 100644 --- a/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift +++ b/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift @@ -37,12 +37,12 @@ public class CountBasedFlushPolicy: FlushPolicy { } public func updateState(event: RawEvent) { - _count.withValue { value in + _count.mutate { value in value += 1 } } public func reset() { - count = 0 + _count.set(0) } } diff --git a/Sources/Segment/Utilities/QueueTimer.swift b/Sources/Segment/Utilities/QueueTimer.swift index da6d97fd..4f01a700 100644 --- a/Sources/Segment/Utilities/QueueTimer.swift +++ b/Sources/Segment/Utilities/QueueTimer.swift @@ -57,7 +57,7 @@ internal class QueueTimer { if state == .suspended { return } - state = .suspended + _state.set(.suspended) timer.suspend() } @@ -65,7 +65,7 @@ internal class QueueTimer { if state == .resumed { return } - state = .resumed + _state.set(.resumed) timer.resume() } } diff --git a/Sources/Segment/Utilities/UserAgent.swift b/Sources/Segment/Utilities/UserAgent.swift index 60ab8a5e..6530f2b9 100644 --- a/Sources/Segment/Utilities/UserAgent.swift +++ b/Sources/Segment/Utilities/UserAgent.swift @@ -35,13 +35,18 @@ internal struct UserAgent { private static let defaultWebKitAppName = "" #endif - internal static var _value: String = "" + @Atomic internal static var _value: String = "" + internal static let lock = NSLock() public static var value: String { + lock.lock() + defer { lock.unlock() } + if _value.isEmpty { - _value = value(applicationName: defaultWebKitAppName) + __value.set(value(applicationName: defaultWebKitAppName)) } return _value + //return "someUserAgent" } private static func version() -> String { diff --git a/Tests/Segment-Tests/Atomic_Tests.swift b/Tests/Segment-Tests/Atomic_Tests.swift index 6e1d1216..d6b420b0 100644 --- a/Tests/Segment-Tests/Atomic_Tests.swift +++ b/Tests/Segment-Tests/Atomic_Tests.swift @@ -13,7 +13,7 @@ final class Atomic_Tests: XCTestCase { // `queue.sync { counter = oldValue + 1 }` // And the threads are free to suspend in between the two calls to `queue.sync`. - _counter.withValue { value in + _counter.mutate { value in value += 1 } } diff --git a/Tests/Segment-Tests/FlushPolicy_Tests.swift b/Tests/Segment-Tests/FlushPolicy_Tests.swift index 0de096c0..636a5792 100644 --- a/Tests/Segment-Tests/FlushPolicy_Tests.swift +++ b/Tests/Segment-Tests/FlushPolicy_Tests.swift @@ -142,7 +142,7 @@ class FlushPolicyTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) if analytics.pendingUploads!.count > 0 { // flush was triggered - flushSent = true + _flushSent.set(true) } } diff --git a/Tests/Segment-Tests/Storage_Tests.swift b/Tests/Segment-Tests/Storage_Tests.swift index d4c89894..4d6cb7e7 100644 --- a/Tests/Segment-Tests/Storage_Tests.swift +++ b/Tests/Segment-Tests/Storage_Tests.swift @@ -290,7 +290,7 @@ class StorageTests: XCTestCase { @Atomic var done = false analytics.flush { print("flush completed") - done = true + _done.set(true) } while !done { diff --git a/Tests/Segment-Tests/StressTests.swift b/Tests/Segment-Tests/StressTests.swift index 22331f4a..f4553621 100644 --- a/Tests/Segment-Tests/StressTests.swift +++ b/Tests/Segment-Tests/StressTests.swift @@ -24,19 +24,11 @@ class StressTests: XCTestCase { // register our network blocker guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return } - let analytics = Analytics(configuration: Configuration(writeKey: "stressTest").errorHandler({ error in + let analytics = Analytics(configuration: Configuration(writeKey: "stressTest2").errorHandler({ error in XCTFail("Storage Error: \(error)") })) + analytics.purgeStorage() analytics.storage.hardReset(doYouKnowHowToUseThis: true) - analytics.storage.onFinish = { url in - // check that each one is valid json - do { - let json = try Data(contentsOf: url) - _ = try JSONSerialization.jsonObject(with: json) - } catch { - XCTFail("\(error) in \(url)") - } - } DirectoryStore.fileValidator = { url in do { @@ -83,22 +75,22 @@ class StressTests: XCTestCase { // schedule a bunch of events to go out for i in 0..<1_000_000 { - let randomInt = Int.random(in: 0..<30) - let queue = queues[randomInt] - group.enter() - queue.async { - writeBlock(randomInt) - group.leave() - } + let randomInt = Int.random(in: 0..<30) + let queue = queues[randomInt] + group.enter() + queue.async { + writeBlock(randomInt) + group.leave() + } } group.notify(queue: DispatchQueue.main) { - ready = false + _ready.set(false) print("\(eventsWritten) events written, across 30 queues.") print("all queues finished.") } - ready = true + _ready.set(true) group.leave() @@ -117,15 +109,6 @@ class StressTests: XCTestCase { XCTFail("Storage Error: \(error)") })) analytics.storage.hardReset(doYouKnowHowToUseThis: true) - analytics.storage.onFinish = { url in - // check that each one is valid json - do { - let json = try Data(contentsOf: url) - _ = try JSONSerialization.jsonObject(with: json) - } catch { - XCTFail("\(error) in \(url)") - } - } DirectoryStore.fileValidator = { url in do { @@ -175,7 +158,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 1 wrote \(eventsWritten) events.") - queue1Done = true + _queue1Done.set(true) } writeQueue2.async { @@ -189,7 +172,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 2 wrote \(eventsWritten) events.") - queue2Done = true + _queue2Done.set(true) } writeQueue3.async { @@ -203,7 +186,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 3 wrote \(eventsWritten) events.") - queue3Done = true + _queue3Done.set(true) } writeQueue4.async { @@ -217,7 +200,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 4 wrote \(eventsWritten) events.") - queue4Done = true + _queue4Done.set(true) } flushQueue.async { @@ -233,10 +216,10 @@ class StressTests: XCTestCase { counter += 1 } print("flushed \(counter) times.") - ready = false + _ready.set(false) } - ready = true + _ready.set(true) while (ready) { RunLoop.main.run(until: Date.distantPast) @@ -257,15 +240,6 @@ class StressTests: XCTestCase { XCTFail("Storage Error: \(error)") })) analytics.storage.hardReset(doYouKnowHowToUseThis: true) - analytics.storage.onFinish = { url in - // check that each one is valid json - do { - let json = try Data(contentsOf: url) - _ = try JSONSerialization.jsonObject(with: json) - } catch { - XCTFail("\(error) in \(url)") - } - } waitUntilStarted(analytics: analytics) @@ -302,7 +276,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 1 wrote \(eventsWritten) events.") - queue1Done = true + _queue1Done.set(true) } writeQueue2.async { @@ -316,7 +290,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 2 wrote \(eventsWritten) events.") - queue2Done = true + _queue2Done.set(true) } flushQueue.async { @@ -332,10 +306,10 @@ class StressTests: XCTestCase { counter += 1 } print("flushed \(counter) times.") - ready = false + _ready.set(false) } - ready = true + _ready.set(true) while (ready) { RunLoop.main.run(until: Date.distantPast) From 7b19f1b107fe74a57055af5ef97fe3a3ad69fe34 Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Thu, 9 May 2024 11:28:39 -0700 Subject: [PATCH 4/9] Some platform specific fixes. --- Examples/other_plugins/IDFACollection.swift | 2 +- .../Platforms/Mac/macOSLifecycleEvents.swift | 2 +- Sources/Segment/Utilities/Atomic.swift | 18 ++++++++++++++++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/Examples/other_plugins/IDFACollection.swift b/Examples/other_plugins/IDFACollection.swift index a2fe52e6..f3ceadb7 100644 --- a/Examples/other_plugins/IDFACollection.swift +++ b/Examples/other_plugins/IDFACollection.swift @@ -78,7 +78,7 @@ extension IDFACollection: iOSLifecycle { func applicationDidBecomeActive(application: UIApplication?) { let status = ATTrackingManager.trackingAuthorizationStatus - _alreadyAsked.withValue { alreadyAsked in + _alreadyAsked.mutate { alreadyAsked in if status == .notDetermined && !alreadyAsked { // we don't know, so should ask the user. alreadyAsked = true diff --git a/Sources/Segment/Plugins/Platforms/Mac/macOSLifecycleEvents.swift b/Sources/Segment/Plugins/Platforms/Mac/macOSLifecycleEvents.swift index a34e28c9..1664c770 100644 --- a/Sources/Segment/Plugins/Platforms/Mac/macOSLifecycleEvents.swift +++ b/Sources/Segment/Plugins/Platforms/Mac/macOSLifecycleEvents.swift @@ -27,7 +27,7 @@ class macOSLifecycleEvents: PlatformPlugin, macOSLifecycle { func application(didFinishLaunchingWithOptions launchOptions: [String : Any]?) { // Make sure we aren't double calling application:didFinishLaunchingWithOptions // by resetting the check at the start - didFinishLaunching = true + _didFinishLaunching.set(true) if analytics?.configuration.values.trackApplicationLifecycleEvents == false { return diff --git a/Sources/Segment/Utilities/Atomic.swift b/Sources/Segment/Utilities/Atomic.swift index 922c4819..84694651 100644 --- a/Sources/Segment/Utilities/Atomic.swift +++ b/Sources/Segment/Utilities/Atomic.swift @@ -22,19 +22,29 @@ import Foundation @propertyWrapper public class Atomic { + #if os(Linux) + let lock: NSLock + #else internal typealias os_unfair_lock_t = UnsafeMutablePointer internal var unfairLock: os_unfair_lock_t + #endif internal var value: T public init(wrappedValue value: T) { + #if os(Linux) + self.lock = NSLock() + #else self.unfairLock = UnsafeMutablePointer.allocate(capacity: 1) self.unfairLock.initialize(to: os_unfair_lock()) + #endif self.value = value } deinit { + #if !os(Linux) unfairLock.deallocate() + #endif } public var wrappedValue: T { @@ -59,10 +69,18 @@ public class Atomic { extension Atomic { internal func lock() { + #if os(Linux) + self.lock.lock() + #else os_unfair_lock_lock(unfairLock) + #endif } internal func unlock() { + #if os(Linux) + self.lock.unlock() + #else os_unfair_lock_unlock(unfairLock) + #endif } } From 75a238f240053ede84cdc23ff70b28f5d893b6b5 Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Thu, 9 May 2024 11:36:06 -0700 Subject: [PATCH 5/9] some linux fixes --- Sources/Segment/Utilities/Atomic.swift | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Sources/Segment/Utilities/Atomic.swift b/Sources/Segment/Utilities/Atomic.swift index 84694651..e3bbf266 100644 --- a/Sources/Segment/Utilities/Atomic.swift +++ b/Sources/Segment/Utilities/Atomic.swift @@ -23,7 +23,7 @@ import Foundation @propertyWrapper public class Atomic { #if os(Linux) - let lock: NSLock + let swiftLock: NSLock #else internal typealias os_unfair_lock_t = UnsafeMutablePointer internal var unfairLock: os_unfair_lock_t @@ -33,7 +33,7 @@ public class Atomic { public init(wrappedValue value: T) { #if os(Linux) - self.lock = NSLock() + self.swiftLock = NSLock() #else self.unfairLock = UnsafeMutablePointer.allocate(capacity: 1) self.unfairLock.initialize(to: os_unfair_lock()) @@ -70,7 +70,7 @@ public class Atomic { extension Atomic { internal func lock() { #if os(Linux) - self.lock.lock() + swiftLock.lock() #else os_unfair_lock_lock(unfairLock) #endif @@ -78,7 +78,7 @@ extension Atomic { internal func unlock() { #if os(Linux) - self.lock.unlock() + swiftLock.unlock() #else os_unfair_lock_unlock(unfairLock) #endif From e216a7ba0f41a133bab0e2ecb4339b5a9325cae7 Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Thu, 9 May 2024 12:15:33 -0700 Subject: [PATCH 6/9] Modified directory store to validate prior to rename. --- .../Segment/Utilities/Storage/Types/DirectoryStore.swift | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift b/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift index 6f70464c..575e4add 100644 --- a/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift +++ b/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift @@ -176,12 +176,15 @@ extension DirectoryStore { try? writer.writeLine(fileEnding) let url = writer.url + + // do validation before we rename to prevent the file disappearing out from under us. + DirectoryStore.fileValidator?(url) + + // move it to make availble for flushing ... let newURL = url.appendingPathExtension(Self.tempExtension) try? FileManager.default.moveItem(at: url, to: newURL) self.writer = nil incrementIndex() - - DirectoryStore.fileValidator?(newURL) } } From 36414a988f3e23641efd09910e14fc5a05c2a488 Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Mon, 13 May 2024 09:38:48 -0700 Subject: [PATCH 7/9] Reduced test size --- Tests/Segment-Tests/StressTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/Segment-Tests/StressTests.swift b/Tests/Segment-Tests/StressTests.swift index f4553621..cc89a9d4 100644 --- a/Tests/Segment-Tests/StressTests.swift +++ b/Tests/Segment-Tests/StressTests.swift @@ -74,7 +74,7 @@ class StressTests: XCTestCase { } // schedule a bunch of events to go out - for i in 0..<1_000_000 { + for _ in 0..<500_000 { let randomInt = Int.random(in: 0..<30) let queue = queues[randomInt] group.enter() From a5a7ecc54e54379af10152141ed81bab517f9a19 Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Mon, 13 May 2024 09:51:04 -0700 Subject: [PATCH 8/9] wait for flushes to finish out ... --- Tests/Segment-Tests/StressTests.swift | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Tests/Segment-Tests/StressTests.swift b/Tests/Segment-Tests/StressTests.swift index cc89a9d4..8adbddf7 100644 --- a/Tests/Segment-Tests/StressTests.swift +++ b/Tests/Segment-Tests/StressTests.swift @@ -98,6 +98,11 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } + // wait for everything to settle down flush-wise... + while (analytics.hasUnsentEvents) { + RunLoop.main.run(until: Date(timeIntervalSinceNow: .seconds(5))) + } + analytics.purgeStorage() } From 0de31c9b9ceaf11c919bac83a73e9c510145500c Mon Sep 17 00:00:00 2001 From: Brandon Sneed Date: Mon, 13 May 2024 10:32:45 -0700 Subject: [PATCH 9/9] Combined linux/tvos/watchos skips. --- Tests/Segment-Tests/StressTests.swift | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/Tests/Segment-Tests/StressTests.swift b/Tests/Segment-Tests/StressTests.swift index 8adbddf7..4617f530 100644 --- a/Tests/Segment-Tests/StressTests.swift +++ b/Tests/Segment-Tests/StressTests.swift @@ -18,8 +18,8 @@ class StressTests: XCTestCase { // Put teardown code here. This method is called after the invocation of each test method in the class. } - // Linux doesn't know what URLProtocol is and on watchOS it somehow works differently and isn't hit. - #if !os(Linux) && !os(watchOS) + // Linux doesn't know what URLProtocol is and on tvOS/watchOS it somehow works differently and isn't hit. + #if !os(Linux) && !os(tvOS) && !os(watchOS) func testDirectoryStorageStress2() throws { // register our network blocker guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return } @@ -230,10 +230,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } } - #endif - - // Linux doesn't know what URLProtocol is and on watchOS it somehow works differently and isn't hit. - #if !os(Linux) && !os(watchOS) + func testMemoryStorageStress() throws { // register our network blocker guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return }