Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 70 additions & 4 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ extension Analytics {
return nil
}

/// Returns the current operating mode this instance was given.
public var operatingMode: OperatingMode {
return configuration.values.operatingMode
}

/// Adjusts the flush interval post configuration.
public var flushInterval: TimeInterval {
get {
Expand Down Expand Up @@ -196,16 +201,52 @@ extension Analytics {
}

/// Tells this instance of Analytics to flush any queued events up to Segment.com. This command will also
/// be sent to each plugin present in the system.
public func flush() {
/// be sent to each plugin present in the system. A completion handler can be optionally given and will be
/// called when flush has completed.
public func flush(completion: (() -> Void)? = nil) {
// only flush if we're enabled.
guard enabled == true else { return }

let flushGroup = DispatchGroup()
// gotta call enter at least once before we ask to be notified.
flushGroup.enter()

apply { plugin in
if let p = plugin as? EventPlugin {
p.flush()
operatingMode.run(queue: configuration.values.flushQueue) {
if let p = plugin as? FlushCompletion {
// this is async
// flush(group:completion:) handles the enter/leave.
p.flush(group: flushGroup) { plugin in
// we don't really care about the plugin value .. yet.
}
} else if let p = plugin as? EventPlugin {
// we have no idea if this will be async or not, assume it's sync.
flushGroup.enter()
p.flush()
flushGroup.leave()
}
}
}

// if we're not in server mode, we need to be notified when it's done.
if let completion, operatingMode != .synchronous {
// set up our callback to know when the group has completed, if we're not
// in .server operating mode.
flushGroup.notify(queue: configuration.values.flushQueue) {
DispatchQueue.main.async { completion() }
}
}

flushGroup.leave() // matches our initial enter().

// if we ARE in server mode, we need to wait on the group.
// This effectively ends up being a `sync` operation.
if operatingMode == .synchronous {
flushGroup.wait()
// we need to call completion on our own since
// we skipped setting up notify.
if let completion { DispatchQueue.main.async { completion() }}
}
}

/// Resets this instance of Analytics to a clean slate. Traits, UserID's, anonymousId, etc are all cleared or reset. This
Expand Down Expand Up @@ -384,3 +425,28 @@ extension Analytics {
return configuration.values.writeKey == Self.deadInstance
}
}

// MARK: Operating mode based scheduling

extension OperatingMode {
func run(queue: DispatchQueue, task: @escaping () -> Void) {
//
switch self {
case .asynchronous:
queue.async {
task()
}
case .synchronous:
// if for some reason, we're told to do all this stuff on
// main, ignore it, and use the default queue. this prevents
// a possible deadlock.
if queue === DispatchQueue.main {
OperatingMode.defaultQueue.asyncAndWait {
task()
}
} else {
queue.asyncAndWait { task() }
}
}
}
}
30 changes: 30 additions & 0 deletions Sources/Segment/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ import Foundation
import FoundationNetworking
#endif

// MARK: - Operating Mode
/// Specifies the operating mode/context
public enum OperatingMode {
/// The operation of the Analytics client are synchronous.
case synchronous
/// The operation of the Analytics client are asynchronous.
case asynchronous

static internal let defaultQueue = DispatchQueue(label: "com.segment.operatingModeQueue", qos: .utility)
}

// MARK: - Internal Configuration

public class Configuration {
Expand All @@ -26,6 +37,9 @@ public class Configuration {
var requestFactory: ((URLRequest) -> URLRequest)? = nil
var errorHandler: ((Error) -> Void)? = nil
var flushPolicies: [FlushPolicy] = [CountBasedFlushPolicy(), IntervalBasedFlushPolicy()]

var operatingMode: OperatingMode = .asynchronous
var flushQueue: DispatchQueue = OperatingMode.defaultQueue
var userAgent: String? = nil
}

Expand Down Expand Up @@ -184,6 +198,22 @@ public extension Configuration {
return self
}

/// Informs the Analytics instance of its operating mode/context.
/// Use `.server` when operating in a web service, or when synchronous operation
/// is desired. Use `.client` when operating in a long lived process,
/// desktop/mobile application.
@discardableResult
func operatingMode(_ mode: OperatingMode) -> Configuration {
values.operatingMode = mode
return self
}

/// Specify a custom queue to use when performing a flush operation. The default
/// value is a Segment owned background queue.
@discardableResult
func flushQueue(_ queue: DispatchQueue) -> Configuration {
values.flushQueue = queue

@discardableResult
func userAgent(_ userAgent: String) -> Configuration {
values.userAgent = userAgent
Expand Down
4 changes: 4 additions & 0 deletions Sources/Segment/Plugins.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public protocol VersionedPlugin {
static func version() -> String
}

public protocol FlushCompletion {
func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void)
}

// For internal platform-specific bits
internal protocol PlatformPlugin: Plugin { }

Expand Down
14 changes: 12 additions & 2 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Sovran
import FoundationNetworking
#endif

public class SegmentDestination: DestinationPlugin, Subscriber {
public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion {
internal enum Constants: String {
case integrationName = "Segment.io"
case apiHost = "apiHost"
Expand Down Expand Up @@ -113,6 +113,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
}

public func flush() {
// unused .. see flush(group:completion:)
}

public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
guard let httpClient = self.httpClient else { return }
Expand All @@ -131,7 +135,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
if pendingUploads == 0 {
for url in data {
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")

// enter the dispatch group
group.enter()
// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in
switch result {
case .success(_):
Expand All @@ -146,6 +152,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
self.cleanupUploads()
// call the completion
completion(self)
// leave the dispatch group
group.leave()
}
// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
Expand Down
4 changes: 3 additions & 1 deletion Sources/Segment/Settings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,14 @@ extension Analytics {
// we don't really wanna wait for this network call during tests...
// but we should make it work similarly.
store.dispatch(action: System.ToggleRunningAction(running: false))
DispatchQueue.main.async {

operatingMode.run(queue: DispatchQueue.main) {
if let state: System = self.store.currentState(), let settings = state.settings {
self.store.dispatch(action: System.UpdateSettingsAction(settings: settings))
}
self.store.dispatch(action: System.ToggleRunningAction(running: true))
}

return
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Startup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ extension Analytics: Subscriber {
plugins += VendorSystem.current.requiredPlugins

// setup lifecycle if desired
if configuration.values.trackApplicationLifecycleEvents {
if configuration.values.trackApplicationLifecycleEvents, operatingMode != .synchronous {
#if os(iOS) || os(tvOS)
plugins.append(iOSLifecycleEvents())
#endif
Expand Down
7 changes: 5 additions & 2 deletions Sources/Segment/Utilities/Logging.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
import Foundation

extension Analytics {
internal enum LogKind {
internal enum LogKind: CustomStringConvertible, CustomDebugStringConvertible {
case error
case warning
case debug
case none

var description: String { return string }
var debugDescription: String { return string }

var string: String {
switch self {
case .error:
Expand All @@ -23,7 +26,7 @@ extension Analytics {
case .debug:
return "SEG_DEBUG: "
case .none:
return ""
return "SEG_INFO: "
}
}
}
Expand Down
43 changes: 40 additions & 3 deletions Tests/Segment-Tests/Analytics_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,10 @@ final class Analytics_Tests: XCTestCase {

func testPurgeStorage() {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either").flushInterval(9999).flushAt(9999))
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either")
.flushInterval(9999)
.flushAt(9999)
.operatingMode(.synchronous))

waitUntilStarted(analytics: analytics)

Expand All @@ -479,13 +482,13 @@ final class Analytics_Tests: XCTestCase {
analytics.track(name: "test")

var newPendingCount = analytics.pendingUploads!.count
XCTAssertEqual(newPendingCount, 4)
XCTAssertEqual(newPendingCount, 1)

let pending = analytics.pendingUploads!
analytics.purgeStorage(fileURL: pending.first!)

newPendingCount = analytics.pendingUploads!.count
XCTAssertEqual(newPendingCount, 3)
XCTAssertEqual(newPendingCount, 0)

analytics.purgeStorage()
newPendingCount = analytics.pendingUploads!.count
Expand Down Expand Up @@ -688,4 +691,38 @@ final class Analytics_Tests: XCTestCase {
XCTAssertTrue(shared2 === shared)

}

func testServerOperatingMode() {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_serverMode")
.flushInterval(9999)
.flushAt(9999)
.operatingMode(.synchronous))

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

@Atomic var completionCalled = false

// put an event in the pipe ...
analytics.track(name: "completion test1")
// flush it, that'll get us an upload going
analytics.flush {
// verify completion is called.
completionCalled = true
}

// completion shouldn't be called before flush returned.
XCTAssertTrue(completionCalled)
XCTAssertEqual(analytics.pendingUploads!.count, 0)

// put another event in the pipe.
analytics.track(name: "completion test2")
analytics.flush()

// flush shouldn't return until all uploads are done, cuz
// it's running in sync mode.
XCTAssertEqual(analytics.pendingUploads!.count, 0)
}
}
14 changes: 10 additions & 4 deletions Tests/Segment-Tests/FlushPolicy_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,15 @@ class FlushPolicyTests: XCTestCase {

XCTAssertTrue(analytics.hasUnsentEvents)

// sleep for 4 seconds for 2 second flush policy
RunLoop.main.run(until: Date.init(timeIntervalSinceNow: 4))

XCTAssertFalse(analytics.hasUnsentEvents)
@Atomic var flushSent = false
while !flushSent {
RunLoop.main.run(until: Date.distantPast)
if analytics.pendingUploads!.count > 0 {
// flush was triggered
flushSent = true
}
}

XCTAssertTrue(flushSent)
}
}