diff --git a/Segment.xcodeproj/project.pbxproj b/Segment.xcodeproj/project.pbxproj index 568cfec2..be9a39e5 100644 --- a/Segment.xcodeproj/project.pbxproj +++ b/Segment.xcodeproj/project.pbxproj @@ -38,6 +38,7 @@ 46A018D425E6C9C200F9CCD8 /* LinuxUtils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46A018D325E6C9C200F9CCD8 /* LinuxUtils.swift */; }; 46A018DA25E97FDF00F9CCD8 /* AppleUtils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46A018D925E97FDF00F9CCD8 /* AppleUtils.swift */; }; 46A018EE25E9A74F00F9CCD8 /* VendorSystem.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46A018ED25E9A74F00F9CCD8 /* VendorSystem.swift */; }; + 46B1AC6927346D3D00846DE8 /* StressTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46B1AC6827346D3D00846DE8 /* StressTests.swift */; }; 46E382E72654429A00BA2502 /* Utils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46E382E62654429A00BA2502 /* Utils.swift */; }; 46F7485D26C718710042798E /* ObjCAnalytics.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46F7485B26C718710042798E /* ObjCAnalytics.swift */; }; 46F7485E26C718710042798E /* ObjCConfiguration.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46F7485C26C718710042798E /* ObjCConfiguration.swift */; }; @@ -125,6 +126,7 @@ 46A018D325E6C9C200F9CCD8 /* LinuxUtils.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LinuxUtils.swift; sourceTree = ""; }; 46A018D925E97FDF00F9CCD8 /* AppleUtils.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppleUtils.swift; sourceTree = ""; }; 46A018ED25E9A74F00F9CCD8 /* VendorSystem.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = VendorSystem.swift; sourceTree = ""; }; + 46B1AC6827346D3D00846DE8 /* StressTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = StressTests.swift; sourceTree = ""; }; 46D98E3D26D6FEF300E7A86A /* FlurryDestination.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FlurryDestination.swift; sourceTree = ""; }; 46D98E3E26D6FEF300E7A86A /* AdjustDestination.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AdjustDestination.swift; sourceTree = ""; }; 46D98E3F26D6FEF300E7A86A /* MixpanelDestination.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MixpanelDestination.swift; sourceTree = ""; }; @@ -377,6 +379,7 @@ 46F7485F26C720F60042798E /* ObjC_Tests.swift */, 967C40D9258D472C008EB0B6 /* SegmentLog_Tests.swift */, 46FE4D1C25A7A850003A7362 /* Storage_Tests.swift */, + 46B1AC6827346D3D00846DE8 /* StressTests.swift */, 4621082D2609206D00EBC4A8 /* Support */, 96DBF37A26F39B5500724B0B /* Timeline_Tests.swift */, OBJ_13 /* XCTestManifests.swift */, @@ -581,6 +584,7 @@ OBJ_30 /* Analytics_Tests.swift in Sources */, 46F7486026C720F60042798E /* ObjC_Tests.swift in Sources */, OBJ_31 /* XCTestManifests.swift in Sources */, + 46B1AC6927346D3D00846DE8 /* StressTests.swift in Sources */, 4658175425BA4C20006B2809 /* HTTPClient_Tests.swift in Sources */, 46210811260538BE00EBC4A8 /* KeyPath_Tests.swift in Sources */, 967C40E3258D4DAF008EB0B6 /* Metrics_Tests.swift in Sources */, diff --git a/Sources/Segment/Plugins/SegmentDestination.swift b/Sources/Segment/Plugins/SegmentDestination.swift index d3c9ceb4..fa11e2ff 100644 --- a/Sources/Segment/Plugins/SegmentDestination.swift +++ b/Sources/Segment/Plugins/SegmentDestination.swift @@ -42,6 +42,7 @@ public class SegmentDestination: DestinationPlugin { private var httpClient: HTTPClient? private var uploads = [UploadTaskInfo]() + private let uploadsQueue = DispatchQueue(label: "uploadsQueue.segment.com") private var storage: Storage? private var apiKey: String? = nil @@ -162,25 +163,33 @@ extension SegmentDestination { // lets go through and get rid of any tasks that aren't running. // either they were suspended because a background task took too // long, or the os orphaned it due to device constraints (like a watch). - let before = uploads.count - var newPending = uploads - newPending.removeAll { uploadInfo in - let shouldRemove = uploadInfo.task.state != .running - if shouldRemove, let cleanup = uploadInfo.cleanup { - cleanup() + uploadsQueue.sync { + let before = uploads.count + var newPending = uploads + newPending.removeAll { uploadInfo in + let shouldRemove = uploadInfo.task.state != .running + if shouldRemove, let cleanup = uploadInfo.cleanup { + cleanup() + } + return shouldRemove } - return shouldRemove + uploads = newPending + let after = uploads.count + analytics?.log(message: "Cleaned up \(before - after) non-running uploads.") } - uploads = newPending - let after = uploads.count - analytics?.log(message: "Cleaned up \(before - after) non-running uploads.") } internal var pendingUploads: Int { - return uploads.count + var uploadsCount = 0 + uploadsQueue.sync { + uploadsCount = uploads.count + } + return uploadsCount } internal func add(uploadTask: UploadTaskInfo) { - uploads.append(uploadTask) + uploadsQueue.sync { + uploads.append(uploadTask) + } } } diff --git a/Sources/Segment/Plugins/StartupQueue.swift b/Sources/Segment/Plugins/StartupQueue.swift index 5e3b39ce..fcc9a6a1 100644 --- a/Sources/Segment/Plugins/StartupQueue.swift +++ b/Sources/Segment/Plugins/StartupQueue.swift @@ -21,6 +21,7 @@ internal class StartupQueue: Plugin, Subscriber { } } + let syncQueue = DispatchQueue(label: "startupQueue.segment.com") var queuedEvents = [RawEvent]() required init() { } @@ -28,11 +29,13 @@ internal class StartupQueue: Plugin, Subscriber { func execute(event: T?) -> T? { if running == false, let e = event { // timeline hasn't started, so queue it up. - if queuedEvents.count >= Self.maxSize { - // if we've exceeded the max queue size start dropping events - queuedEvents.removeFirst() + syncQueue.sync { + if queuedEvents.count >= Self.maxSize { + // if we've exceeded the max queue size start dropping events + queuedEvents.removeFirst() + } + queuedEvents.append(e) } - queuedEvents.append(e) return nil } // the timeline has started, so let the event pass. @@ -50,9 +53,11 @@ extension StartupQueue { internal func replayEvents() { // replay the queued events to the instance of Analytics we're working with. - for event in queuedEvents { - analytics?.process(event: event) + syncQueue.sync { + for event in queuedEvents { + analytics?.process(event: event) + } + queuedEvents.removeAll() } - queuedEvents.removeAll() } } diff --git a/Sources/Segment/Utilities/Storage.swift b/Sources/Segment/Utilities/Storage.swift index 73e1c387..f581e0c2 100644 --- a/Sources/Segment/Utilities/Storage.swift +++ b/Sources/Segment/Utilities/Storage.swift @@ -11,10 +11,13 @@ import Sovran internal class Storage: Subscriber { let store: Store let writeKey: String - let syncQueue = DispatchQueue(label: "storage.segment.com") let userDefaults: UserDefaults? - static let MAXFILESIZE = 475000 // Server accepts max 500k per batch - + static let MAXFILESIZE = 475000 // Server accepts max 500k per batch + + // This queue synchronizes reads/writes. + // Do NOT use it outside of: write, read, reset, remove. + let syncQueue = DispatchQueue(label: "storage.segment.com") + private var fileHandle: FileHandle? = nil init(store: Store, writeKey: String) { @@ -24,91 +27,82 @@ internal class Storage: Subscriber { store.subscribe(self, handler: userInfoUpdate) store.subscribe(self, handler: systemUpdate) } -} - -// MARK: - String Contants - -extension Storage { - static let tempExtension = "temp" - enum Constants: String, CaseIterable { - case userId = "segment.userId" - case traits = "segment.traits" - case anonymousId = "segment.anonymousId" - case settings = "segment.settings" - case events = "segment.events" - } -} - -// MARK: - Read/Write - -extension Storage { func write(_ key: Storage.Constants, value: T?) { - switch key { - case .events: - if let event = value as? RawEvent { - let eventStoreFile = currentFile(key) - self.storeEvent(toFile: eventStoreFile, event: event) - } - break - default: - if isBasicType(value: value) { - // we can write it like normal - userDefaults?.set(value, forKey: key.rawValue) - } else { - // encode it to a data object to store - let encoder = PropertyListEncoder() - if let plistValue = try? encoder.encode(value) { - userDefaults?.set(plistValue, forKey: key.rawValue) + syncQueue.sync { + switch key { + case .events: + if let event = value as? RawEvent { + let eventStoreFile = currentFile(key) + self.storeEvent(toFile: eventStoreFile, event: event) + } + break + default: + if isBasicType(value: value) { + // we can write it like normal + userDefaults?.set(value, forKey: key.rawValue) + } else { + // encode it to a data object to store + let encoder = PropertyListEncoder() + if let plistValue = try? encoder.encode(value) { + userDefaults?.set(plistValue, forKey: key.rawValue) + } } } + userDefaults?.synchronize() } - userDefaults?.synchronize() } func read(_ key: Storage.Constants) -> [URL]? { - switch key { - case .events: - return eventFiles(includeUnfinished: false) - default: - break + var result: [URL]? = nil + syncQueue.sync { + switch key { + case .events: + result = eventFiles(includeUnfinished: false) + default: + break + } } - return nil + return result } func read(_ key: Storage.Constants) -> T? { var result: T? = nil - switch key { - case .events: - // do nothing - break - default: - let decoder = PropertyListDecoder() - let raw = userDefaults?.object(forKey: key.rawValue) - if let r = raw as? Data { - // it's an encoded object, not a basic type - result = try? decoder.decode(T.self, from: r) - } else { - // it's a basic type - result = userDefaults?.object(forKey: key.rawValue) as? T + syncQueue.sync { + switch key { + case .events: + // do nothing + break + default: + let decoder = PropertyListDecoder() + let raw = userDefaults?.object(forKey: key.rawValue) + if let r = raw as? Data { + // it's an encoded object, not a basic type + result = try? decoder.decode(T.self, from: r) + } else { + // it's a basic type + result = userDefaults?.object(forKey: key.rawValue) as? T + } } } return result } func hardReset(doYouKnowHowToUseThis: Bool) { - if doYouKnowHowToUseThis != true { return } + syncQueue.sync { + if doYouKnowHowToUseThis != true { return } - let urls = eventFiles(includeUnfinished: true) - for key in Constants.allCases { - // on linux, setting a key's value to nil just deadlocks. - // however just removing it works, which is what we really - // wanna do anyway. - userDefaults?.removeObject(forKey: key.rawValue) - } + let urls = eventFiles(includeUnfinished: true) + for key in Constants.allCases { + // on linux, setting a key's value to nil just deadlocks. + // however just removing it works, which is what we really + // wanna do anyway. + userDefaults?.removeObject(forKey: key.rawValue) + } - for url in urls { - try? FileManager.default.removeItem(atPath: url.path) + for url in urls { + try? FileManager.default.removeItem(atPath: url.path) + } } } @@ -135,28 +129,43 @@ extension Storage { return result } - func currentFile(_ key: Storage.Constants) -> URL { - var currentFile = 0 + func remove(file: URL) { syncQueue.sync { - let index: Int = userDefaults?.integer(forKey: key.rawValue) ?? 0 - userDefaults?.set(index, forKey: key.rawValue) - currentFile = index + // remove the temp file. + try? FileManager.default.removeItem(atPath: file.path) + // remove the unfinished event storage file. + let actualFile = file.deletingPathExtension() + try? FileManager.default.removeItem(atPath: actualFile.path) } - return self.eventsFile(index: currentFile) + } + +} + +// MARK: - String Contants + +extension Storage { + private static let tempExtension = "temp" + + enum Constants: String, CaseIterable { + case userId = "segment.userId" + case traits = "segment.traits" + case anonymousId = "segment.anonymousId" + case settings = "segment.settings" + case events = "segment.events" } } // MARK: - State Subscriptions extension Storage { - func userInfoUpdate(state: UserInfo) { + internal func userInfoUpdate(state: UserInfo) { // write new stuff to disk write(.userId, value: state.userId) write(.traits, value: state.traits) write(.anonymousId, value: state.anonymousId) } - func systemUpdate(state: System) { + internal func systemUpdate(state: System) { // write new stuff to disk if let s = state.settings { write(.settings, value: s) @@ -167,7 +176,15 @@ extension Storage { // MARK: - Utility Methods extension Storage { - func eventStorageDirectory() -> URL { + private func currentFile(_ key: Storage.Constants) -> URL { + var currentFile = 0 + let index: Int = userDefaults?.integer(forKey: key.rawValue) ?? 0 + userDefaults?.set(index, forKey: key.rawValue) + currentFile = index + return self.eventsFile(index: currentFile) + } + + private func eventStorageDirectory() -> URL { let urls = FileManager.default.urls(for: .documentDirectory, in: .userDomainMask) let docURL = urls[0] let segmentURL = docURL.appendingPathComponent("segment/\(writeKey)/") @@ -177,41 +194,36 @@ extension Storage { return segmentURL } - func eventsFile(index: Int) -> URL { + private func eventsFile(index: Int) -> URL { let docs = eventStorageDirectory() let fileURL = docs.appendingPathComponent("\(index)-segment-events") return fileURL } - func eventFiles(includeUnfinished: Bool) -> [URL] { + internal func eventFiles(includeUnfinished: Bool) -> [URL] { // synchronized against finishing/creating files while we're getting // a list of files to send. - + var result = [URL]() + // finish out any file in progress - var index: Int = 0 - syncQueue.sync { - index = userDefaults?.integer(forKey: Constants.events.rawValue) ?? 0 - } + let index = userDefaults?.integer(forKey: Constants.events.rawValue) ?? 0 finish(file: eventsFile(index: index)) - var result = [URL]() - syncQueue.sync { - let allFiles = try? FileManager.default.contentsOfDirectory(at: eventStorageDirectory(), includingPropertiesForKeys: [], options: .skipsHiddenFiles) - var files = allFiles - - if includeUnfinished == false { - files = allFiles?.filter({ (file) -> Bool in - return file.pathExtension == Storage.tempExtension - }) - } - - let sorted = files?.sorted { (left, right) -> Bool in - return left.lastPathComponent > right.lastPathComponent - } - if let s = sorted { - result = s + let allFiles = try? FileManager.default.contentsOfDirectory(at: eventStorageDirectory(), includingPropertiesForKeys: [], options: .skipsHiddenFiles) + var files = allFiles + + if includeUnfinished == false { + files = allFiles?.filter { (file) -> Bool in + return file.pathExtension == Storage.tempExtension } } + + let sorted = files?.sorted { (left, right) -> Bool in + return left.lastPathComponent > right.lastPathComponent + } + if let s = sorted { + result = s + } return result } } @@ -219,9 +231,7 @@ extension Storage { // MARK: - Event Storage extension Storage { - - func storeEvent(toFile file: URL, event: RawEvent) { - + private func storeEvent(toFile file: URL, event: RawEvent) { var storeFile = file let fm = FileManager.default @@ -242,72 +252,66 @@ extension Storage { newFile = true } - syncQueue.sync { - let jsonString = event.toString() - if let jsonData = jsonString.data(using: .utf8) { - fileHandle?.seekToEndOfFile() - // prepare for the next entry - if newFile == false { - fileHandle?.write(",".data(using: .utf8)!) - } - // write the data - fileHandle?.write(jsonData) - if #available(tvOS 13, *) { - try? fileHandle?.synchronize() - } - } else { - assert(false, "Storage: Unable to convert event to json!") + let jsonString = event.toString() + if let jsonData = jsonString.data(using: .utf8) { + fileHandle?.seekToEndOfFile() + // prepare for the next entry + if newFile == false { + fileHandle?.write(",".data(using: .utf8)!) } + // write the data + fileHandle?.write(jsonData) + if #available(tvOS 13, *) { + try? fileHandle?.synchronize() + } + } else { + assert(false, "Storage: Unable to convert event to json!") } } - func start(file: URL) { - syncQueue.sync { - let contents = "{ \"batch\": [" - do { - FileManager.default.createFile(atPath: file.path, contents: contents.data(using: .utf8)) - fileHandle = try FileHandle(forWritingTo: file) - } catch { - assert(false, "Storage: failed to write \(file), error: \(error)") - } + private func start(file: URL) { + let contents = "{ \"batch\": [" + do { + FileManager.default.createFile(atPath: file.path, contents: contents.data(using: .utf8)) + fileHandle = try FileHandle(forWritingTo: file) + } catch { + assert(false, "Storage: failed to write \(file), error: \(error)") } } - func finish(file: URL) { - syncQueue.sync { - let sentAt = Date().iso8601() + private func finish(file: URL) { + guard let fileHandle = self.fileHandle else { + // we haven't actually started a file yet and being told to flush + // so ignore it and get out. + return + } + + let sentAt = Date().iso8601() - // write it to the existing file - let fileEnding = "],\"sentAt\":\"\(sentAt)\"}" - let endData = fileEnding.data(using: .utf8) - if let endData = endData { - fileHandle?.seekToEndOfFile() - fileHandle?.write(endData) - if #available(tvOS 13, *) { - try? fileHandle?.synchronize() - } - fileHandle?.closeFile() - fileHandle = nil - } else { - // something is wrong with this file, maybe delete it? - //assert(false, "Storage: event storage \(file) is messed up!") + // write it to the existing file + let fileEnding = "],\"sentAt\":\"\(sentAt)\"}" + let endData = fileEnding.data(using: .utf8) + if let endData = endData { + fileHandle.seekToEndOfFile() + fileHandle.write(endData) + if #available(tvOS 13, *) { + try? fileHandle.synchronize() } - - let tempFile = file.appendingPathExtension(Storage.tempExtension) - try? FileManager.default.copyItem(at: file, to: tempFile) - - let currentFile: Int = (userDefaults?.integer(forKey: Constants.events.rawValue) ?? 0) + 1 - userDefaults?.set(currentFile, forKey: Constants.events.rawValue) + fileHandle.closeFile() + self.fileHandle = nil + } else { + // something is wrong with this file :S + Analytics.segmentLog(message: "Event storage \(file) has some kind of problem.", kind: .error) } - } - - func remove(file: URL) { - syncQueue.sync { - // remove the temp file. - try? FileManager.default.removeItem(atPath: file.path) - // remove the unfinished event storage file. - let actualFile = file.deletingPathExtension() - try? FileManager.default.removeItem(atPath: actualFile.path) + + let tempFile = file.appendingPathExtension(Storage.tempExtension) + do { + try FileManager.default.copyItem(at: file, to: tempFile) + } catch { + Analytics.segmentLog(message: "Unable to rename to temp: \(file), Error: \(error)", kind: .error) } + + let currentFile: Int = (userDefaults?.integer(forKey: Constants.events.rawValue) ?? 0) + 1 + userDefaults?.set(currentFile, forKey: Constants.events.rawValue) } } diff --git a/Tests/Segment-Tests/Storage_Tests.swift b/Tests/Segment-Tests/Storage_Tests.swift index 17d2d421..4c4f95de 100644 --- a/Tests/Segment-Tests/Storage_Tests.swift +++ b/Tests/Segment-Tests/Storage_Tests.swift @@ -110,5 +110,5 @@ class StorageTests: XCTestCase { XCTAssertTrue(fileURL.lastPathComponent == "1-segment-events.temp") XCTAssertTrue(FileManager.default.fileExists(atPath: fileURL.path)) } - + } diff --git a/Tests/Segment-Tests/StressTests.swift b/Tests/Segment-Tests/StressTests.swift new file mode 100644 index 00000000..c5cf327e --- /dev/null +++ b/Tests/Segment-Tests/StressTests.swift @@ -0,0 +1,82 @@ +// +// StressTests.swift +// Segment-Tests +// +// Created by Brandon Sneed on 11/4/21. +// + +import XCTest +@testable import Segment + +class StressTests: XCTestCase { + + override func setUpWithError() throws { + // Put setup code here. This method is called before the invocation of each test method in the class. + } + + override func tearDownWithError() throws { + // Put teardown code here. This method is called after the invocation of each test method in the class. + } + /* re-enable when network is mocked */ + /* + func testStorageStress() { + let analytics = Analytics(configuration: Configuration(writeKey: "test")) + analytics.storage.hardReset(doYouKnowHowToUseThis: true) + + let writeQueue1 = DispatchQueue(label: "write queue 1") + let writeQueue2 = DispatchQueue(label: "write queue 2") + let flushQueue = DispatchQueue(label: "flush queue") + + @Atomic var ready = false + @Atomic var queue1Done = false + @Atomic var queue2Done = false + + writeQueue1.async { + while (ready == false) { usleep(1) } + var eventsWritten = 0 + while (eventsWritten < 10000) { + let event = "write queue 1: \(eventsWritten)" + analytics.track(name: event) + eventsWritten += 1 + usleep(0001) + } + print("queue 1 wrote \(eventsWritten) events.") + queue1Done = true + } + + writeQueue2.async { + while (ready == false) { usleep(1) } + var eventsWritten = 0 + while (eventsWritten < 10000) { + let event = "write queue 2: \(eventsWritten)" + analytics.track(name: event) + eventsWritten += 1 + usleep(0001) + } + print("queue 2 wrote \(eventsWritten) events.") + queue2Done = true + } + + flushQueue.async { + while (ready == false) { usleep(1) } + var counter = 0 + sleep(1) + while (queue1Done == false || queue2Done == false) { + let sleepTime = UInt32.random(in: 1..<3000) + usleep(sleepTime) + analytics.flush() + counter += 1 + } + print("flushed \(counter) times.") + ready = false + } + + ready = true + + while (ready) { + RunLoop.main.run(until: Date.distantPast) + } + }*/ + + +}