Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Segment.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
46A018D425E6C9C200F9CCD8 /* LinuxUtils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46A018D325E6C9C200F9CCD8 /* LinuxUtils.swift */; };
46A018DA25E97FDF00F9CCD8 /* AppleUtils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46A018D925E97FDF00F9CCD8 /* AppleUtils.swift */; };
46A018EE25E9A74F00F9CCD8 /* VendorSystem.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46A018ED25E9A74F00F9CCD8 /* VendorSystem.swift */; };
46B1AC6927346D3D00846DE8 /* StressTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46B1AC6827346D3D00846DE8 /* StressTests.swift */; };
46E382E72654429A00BA2502 /* Utils.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46E382E62654429A00BA2502 /* Utils.swift */; };
46F7485D26C718710042798E /* ObjCAnalytics.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46F7485B26C718710042798E /* ObjCAnalytics.swift */; };
46F7485E26C718710042798E /* ObjCConfiguration.swift in Sources */ = {isa = PBXBuildFile; fileRef = 46F7485C26C718710042798E /* ObjCConfiguration.swift */; };
Expand Down Expand Up @@ -125,6 +126,7 @@
46A018D325E6C9C200F9CCD8 /* LinuxUtils.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LinuxUtils.swift; sourceTree = "<group>"; };
46A018D925E97FDF00F9CCD8 /* AppleUtils.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppleUtils.swift; sourceTree = "<group>"; };
46A018ED25E9A74F00F9CCD8 /* VendorSystem.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = VendorSystem.swift; sourceTree = "<group>"; };
46B1AC6827346D3D00846DE8 /* StressTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = StressTests.swift; sourceTree = "<group>"; };
46D98E3D26D6FEF300E7A86A /* FlurryDestination.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FlurryDestination.swift; sourceTree = "<group>"; };
46D98E3E26D6FEF300E7A86A /* AdjustDestination.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AdjustDestination.swift; sourceTree = "<group>"; };
46D98E3F26D6FEF300E7A86A /* MixpanelDestination.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MixpanelDestination.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -377,6 +379,7 @@
46F7485F26C720F60042798E /* ObjC_Tests.swift */,
967C40D9258D472C008EB0B6 /* SegmentLog_Tests.swift */,
46FE4D1C25A7A850003A7362 /* Storage_Tests.swift */,
46B1AC6827346D3D00846DE8 /* StressTests.swift */,
4621082D2609206D00EBC4A8 /* Support */,
96DBF37A26F39B5500724B0B /* Timeline_Tests.swift */,
OBJ_13 /* XCTestManifests.swift */,
Expand Down Expand Up @@ -581,6 +584,7 @@
OBJ_30 /* Analytics_Tests.swift in Sources */,
46F7486026C720F60042798E /* ObjC_Tests.swift in Sources */,
OBJ_31 /* XCTestManifests.swift in Sources */,
46B1AC6927346D3D00846DE8 /* StressTests.swift in Sources */,
4658175425BA4C20006B2809 /* HTTPClient_Tests.swift in Sources */,
46210811260538BE00EBC4A8 /* KeyPath_Tests.swift in Sources */,
967C40E3258D4DAF008EB0B6 /* Metrics_Tests.swift in Sources */,
Expand Down
33 changes: 21 additions & 12 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class SegmentDestination: DestinationPlugin {

private var httpClient: HTTPClient?
private var uploads = [UploadTaskInfo]()
private let uploadsQueue = DispatchQueue(label: "uploadsQueue.segment.com")
private var storage: Storage?

private var apiKey: String? = nil
Expand Down Expand Up @@ -162,25 +163,33 @@ extension SegmentDestination {
// lets go through and get rid of any tasks that aren't running.
// either they were suspended because a background task took too
// long, or the os orphaned it due to device constraints (like a watch).
let before = uploads.count
var newPending = uploads
newPending.removeAll { uploadInfo in
let shouldRemove = uploadInfo.task.state != .running
if shouldRemove, let cleanup = uploadInfo.cleanup {
cleanup()
uploadsQueue.sync {
let before = uploads.count
var newPending = uploads
newPending.removeAll { uploadInfo in
let shouldRemove = uploadInfo.task.state != .running
if shouldRemove, let cleanup = uploadInfo.cleanup {
cleanup()
}
return shouldRemove
}
return shouldRemove
uploads = newPending
let after = uploads.count
analytics?.log(message: "Cleaned up \(before - after) non-running uploads.")
}
uploads = newPending
let after = uploads.count
analytics?.log(message: "Cleaned up \(before - after) non-running uploads.")
}

internal var pendingUploads: Int {
return uploads.count
var uploadsCount = 0
uploadsQueue.sync {
uploadsCount = uploads.count
}
return uploadsCount
}

internal func add(uploadTask: UploadTaskInfo) {
uploads.append(uploadTask)
uploadsQueue.sync {
uploads.append(uploadTask)
}
}
}
19 changes: 12 additions & 7 deletions Sources/Segment/Plugins/StartupQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ internal class StartupQueue: Plugin, Subscriber {
}
}

let syncQueue = DispatchQueue(label: "startupQueue.segment.com")
var queuedEvents = [RawEvent]()

required init() { }

func execute<T: RawEvent>(event: T?) -> T? {
if running == false, let e = event {
// timeline hasn't started, so queue it up.
if queuedEvents.count >= Self.maxSize {
// if we've exceeded the max queue size start dropping events
queuedEvents.removeFirst()
syncQueue.sync {
if queuedEvents.count >= Self.maxSize {
// if we've exceeded the max queue size start dropping events
queuedEvents.removeFirst()
}
queuedEvents.append(e)
}
queuedEvents.append(e)
return nil
}
// the timeline has started, so let the event pass.
Expand All @@ -50,9 +53,11 @@ extension StartupQueue {

internal func replayEvents() {
// replay the queued events to the instance of Analytics we're working with.
for event in queuedEvents {
analytics?.process(event: event)
syncQueue.sync {
for event in queuedEvents {
analytics?.process(event: event)
}
queuedEvents.removeAll()
}
queuedEvents.removeAll()
}
}
Loading