Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 69 additions & 4 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ extension Analytics {
return nil
}

/// Returns the current operating mode this instance was given.
public var operatingMode: OperatingMode {
return configuration.values.operatingMode
}

/// Adjusts the flush interval post configuration.
public var flushInterval: TimeInterval {
get {
Expand Down Expand Up @@ -196,16 +201,52 @@ extension Analytics {
}

/// Tells this instance of Analytics to flush any queued events up to Segment.com. This command will also
/// be sent to each plugin present in the system.
public func flush() {
/// be sent to each plugin present in the system. A completion handler can be optionally given and will be
/// called when flush has completed.
public func flush(completion: (() -> Void)? = nil) {
// only flush if we're enabled.
guard enabled == true else { return }

let flushGroup = DispatchGroup()
// gotta call enter at least once before we ask to be notified.
flushGroup.enter()

apply { plugin in
if let p = plugin as? EventPlugin {
p.flush()
operatingMode.run(queue: configuration.values.flushQueue) {
if let p = plugin as? FlushCompletion {
// this is async
// flush(group:completion:) handles the enter/leave.
p.flush(group: flushGroup) { plugin in
// we don't really care about the plugin value .. yet.
}
} else if let p = plugin as? EventPlugin {
// we have no idea if this will be async or not, assume it's sync.
flushGroup.enter()
p.flush()
flushGroup.leave()
}
}
}

// if we're not in server mode, we need to be notified when it's done.
if let completion, operatingMode != .server {
// set up our callback to know when the group has completed, if we're not
// in .server operating mode.
flushGroup.notify(queue: configuration.values.flushQueue) {
completion()
}
}

flushGroup.leave() // matches our initial enter().

// if we ARE in server mode, we need to wait on the group.
// This effectively ends up being a `sync` operation.
if operatingMode == .server {
flushGroup.wait()
// we need to call completion on our own since
// we skipped setting up notify.
if let completion { completion() }
}
}

/// Resets this instance of Analytics to a clean slate. Traits, UserID's, anonymousId, etc are all cleared or reset. This
Expand Down Expand Up @@ -384,3 +425,27 @@ extension Analytics {
return configuration.values.writeKey == Self.deadInstance
}
}

// MARK: Operating mode based scheduling

extension OperatingMode {
func run(queue: DispatchQueue, task: @escaping () -> Void) {
switch self {
case .client:
queue.async {
task()
}
case .server:
// we need to be careful about doing a sync on the same queue we're running
// on, and we have no way of knowing that. So we're gonna dispatch to a
// safe place elsewhere, and just wait.
let group = DispatchGroup()
group.enter()
DispatchQueue.global(qos: .utility).async {
task()
group.leave()
}
group.wait()
}
}
}
29 changes: 29 additions & 0 deletions Sources/Segment/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import Foundation
import FoundationNetworking
#endif

// MARK: - Operating Mode
/// Specifies the operating mode/context
public enum OperatingMode {
/// Running as an executable on a server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callout to project README.md to list differences.

case server
/// Running as a long-lived application
case client
}

// MARK: - Internal Configuration

public class Configuration {
Expand All @@ -26,6 +35,8 @@ public class Configuration {
var requestFactory: ((URLRequest) -> URLRequest)? = nil
var errorHandler: ((Error) -> Void)? = nil
var flushPolicies: [FlushPolicy] = [CountBasedFlushPolicy(), IntervalBasedFlushPolicy()]
var operatingMode: OperatingMode = .client
var flushQueue: DispatchQueue = DispatchQueue(label: "com.segment.flushQueue", qos: .background)
}

internal var values: Values
Expand Down Expand Up @@ -182,6 +193,24 @@ public extension Configuration {
values.flushPolicies = policies
return self
}

/// Informs the Analytics instance of its operating mode/context.
/// Use `.server` when operating in a web service, or when synchronous operation
/// is desired. Use `.client` when operating in a long lived process,
/// desktop/mobile application.
@discardableResult
func operatingMode(_ mode: OperatingMode) -> Configuration {
values.operatingMode = mode
return self
}

/// Specify a custom queue to use when performing a flush operation. The default
/// value is a Segment owned background queue.
@discardableResult
func flushQueue(_ queue: DispatchQueue) -> Configuration {
values.flushQueue = queue
return self
}
}

extension Analytics {
Expand Down
4 changes: 4 additions & 0 deletions Sources/Segment/Plugins.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public protocol VersionedPlugin {
static func version() -> String
}

public protocol FlushCompletion {
func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void)
}

// For internal platform-specific bits
internal protocol PlatformPlugin: Plugin { }

Expand Down
14 changes: 12 additions & 2 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Sovran
import FoundationNetworking
#endif

public class SegmentDestination: DestinationPlugin, Subscriber {
public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion {
internal enum Constants: String {
case integrationName = "Segment.io"
case apiHost = "apiHost"
Expand Down Expand Up @@ -113,6 +113,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
}

public func flush() {
// unused .. see flush(group:completion:)
}

public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
guard let httpClient = self.httpClient else { return }
Expand All @@ -131,7 +135,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
if pendingUploads == 0 {
for url in data {
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")

// enter the dispatch group
group.enter()
// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in
switch result {
case .success(_):
Expand All @@ -146,6 +152,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
self.cleanupUploads()
// call the completion
completion(self)
// leave the dispatch group
group.leave()
}
// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
Expand Down
4 changes: 3 additions & 1 deletion Sources/Segment/Settings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,14 @@ extension Analytics {
// we don't really wanna wait for this network call during tests...
// but we should make it work similarly.
store.dispatch(action: System.ToggleRunningAction(running: false))
DispatchQueue.main.async {

operatingMode.run(queue: DispatchQueue.main) {
if let state: System = self.store.currentState(), let settings = state.settings {
self.store.dispatch(action: System.UpdateSettingsAction(settings: settings))
}
self.store.dispatch(action: System.ToggleRunningAction(running: true))
}

return
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Startup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ extension Analytics: Subscriber {
plugins += VendorSystem.current.requiredPlugins

// setup lifecycle if desired
if configuration.values.trackApplicationLifecycleEvents {
if configuration.values.trackApplicationLifecycleEvents, operatingMode != .server {
#if os(iOS) || os(tvOS)
plugins.append(iOSLifecycleEvents())
#endif
Expand Down
7 changes: 5 additions & 2 deletions Sources/Segment/Utilities/Logging.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
import Foundation

extension Analytics {
internal enum LogKind {
internal enum LogKind: CustomStringConvertible, CustomDebugStringConvertible {
case error
case warning
case debug
case none

var description: String { return string }
var debugDescription: String { return string }

var string: String {
switch self {
case .error:
Expand All @@ -23,7 +26,7 @@ extension Analytics {
case .debug:
return "SEG_DEBUG: "
case .none:
return ""
return "SEG_INFO: "
}
}
}
Expand Down
43 changes: 40 additions & 3 deletions Tests/Segment-Tests/Analytics_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,10 @@ final class Analytics_Tests: XCTestCase {

func testPurgeStorage() {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either").flushInterval(9999).flushAt(9999))
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_do_not_reuse_this_writekey_either")
.flushInterval(9999)
.flushAt(9999)
.operatingMode(.server))

waitUntilStarted(analytics: analytics)

Expand All @@ -432,13 +435,13 @@ final class Analytics_Tests: XCTestCase {
analytics.track(name: "test")

var newPendingCount = analytics.pendingUploads!.count
XCTAssertEqual(newPendingCount, 4)
XCTAssertEqual(newPendingCount, 1)

let pending = analytics.pendingUploads!
analytics.purgeStorage(fileURL: pending.first!)

newPendingCount = analytics.pendingUploads!.count
XCTAssertEqual(newPendingCount, 3)
XCTAssertEqual(newPendingCount, 0)

analytics.purgeStorage()
newPendingCount = analytics.pendingUploads!.count
Expand Down Expand Up @@ -641,4 +644,38 @@ final class Analytics_Tests: XCTestCase {
XCTAssertTrue(shared2 === shared)

}

func testServerOperatingMode() {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_serverMode")
.flushInterval(9999)
.flushAt(9999)
.operatingMode(.server))

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

@Atomic var completionCalled = false

// put an event in the pipe ...
analytics.track(name: "completion test1")
// flush it, that'll get us an upload going
analytics.flush {
// verify completion is called.
completionCalled = true
}

// completion shouldn't be called before flush returned.
XCTAssertTrue(completionCalled)
XCTAssertEqual(analytics.pendingUploads!.count, 0)

// put another event in the pipe.
analytics.track(name: "completion test2")
analytics.flush()

// flush shouldn't return until all uploads are done, cuz
// it's running in sync mode.
XCTAssertEqual(analytics.pendingUploads!.count, 0)
}
}
14 changes: 10 additions & 4 deletions Tests/Segment-Tests/FlushPolicy_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,15 @@ class FlushPolicyTests: XCTestCase {

XCTAssertTrue(analytics.hasUnsentEvents)

// sleep for 4 seconds for 2 second flush policy
RunLoop.main.run(until: Date.init(timeIntervalSinceNow: 4))

XCTAssertFalse(analytics.hasUnsentEvents)
@Atomic var flushSent = false
while !flushSent {
RunLoop.main.run(until: Date.distantPast)
if analytics.pendingUploads!.count > 0 {
// flush was triggered
flushSent = true
}
}

XCTAssertTrue(flushSent)
}
}