Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Examples/Examples.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@
"LD_RUNPATH_SEARCH_PATHS[sdk=macosx*]" = "@executable_path/../Frameworks";
LOCALIZATION_PREFERS_STRING_CATALOGS = YES;
MARKETING_VERSION = 1.0;
PRODUCT_BUNDLE_IDENTIFIER = com.supabase.SlackClone;
PRODUCT_BUNDLE_IDENTIFIER = "com.supabase.slack-clone";
PRODUCT_NAME = "$(TARGET_NAME)";
SDKROOT = auto;
SUPPORTED_PLATFORMS = "iphoneos iphonesimulator macosx xros xrsimulator";
Expand Down Expand Up @@ -809,7 +809,7 @@
"LD_RUNPATH_SEARCH_PATHS[sdk=macosx*]" = "@executable_path/../Frameworks";
LOCALIZATION_PREFERS_STRING_CATALOGS = YES;
MARKETING_VERSION = 1.0;
PRODUCT_BUNDLE_IDENTIFIER = com.supabase.SlackClone;
PRODUCT_BUNDLE_IDENTIFIER = "com.supabase.slack-clone";
PRODUCT_NAME = "$(TARGET_NAME)";
SDKROOT = auto;
SUPPORTED_PLATFORMS = "iphoneos iphonesimulator macosx xros xrsimulator";
Expand Down
36 changes: 14 additions & 22 deletions Examples/SlackClone/AuthView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,20 @@ final class AuthViewModel {
var email = ""
var toast: ToastState?

func signInButtonTapped() {
Task {
do {
try await supabase.auth.signInWithOTP(
email: email,
redirectTo: URL(string: "slackclone://sign-in")
)
toast = ToastState(status: .success, title: "Check your inbox.")
} catch {
toast = ToastState(status: .error, title: "Error", description: error.localizedDescription)
}
}
}
func signInButtonTapped() async {
do {
try await supabase.auth.signInWithOTP(email: email)
toast = ToastState(status: .success, title: "Check your inbox.")

func handle(_ url: URL) {
Task {
do {
try await supabase.auth.session(from: url)
} catch {
toast = ToastState(status: .error, title: "Error", description: error.localizedDescription)
}
try? await Task.sleep(for: .seconds(1))

#if os(macOS)
NSWorkspace.shared.open(URL(string: "http://127.0.0.1:54324")!)
#else
await UIApplication.shared.open(URL(string: "http://127.0.0.1:54324")!)
#endif
} catch {
toast = ToastState(status: .error, title: "Error", description: error.localizedDescription)
}
}
}
Expand All @@ -54,12 +47,11 @@ struct AuthView: View {
.autocorrectionDisabled()
}
Button("Sign in with Magic Link") {
model.signInButtonTapped()
Task { await model.signInButtonTapped() }
}
}
.padding()
.toast(state: $model.toast)
.onOpenURL { model.handle($0) }
}
}

Expand Down
5 changes: 2 additions & 3 deletions Examples/SlackClone/Info.plist
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
<key>CFBundleURLIconFile</key>
<string></string>
<key>CFBundleURLName</key>
<string>com.supabase.SlackClone</string>
<string>com.supabase.slack-clone</string>
<key>CFBundleURLSchemes</key>
<array>
<string>slackclone</string>
<string>com.supabase.slack-clone</string>
</array>
</dict>
<dict/>
</array>
</dict>
</plist>
3 changes: 3 additions & 0 deletions Examples/SlackClone/SlackCloneApp.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ struct SlackCloneApp: App {
var body: some Scene {
WindowGroup {
AppView(model: model)
.onOpenURL { url in
supabase.handle(url)
}
}
}
}
1 change: 1 addition & 0 deletions Examples/SlackClone/Supabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ let supabase = SupabaseClient(
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
options: SupabaseClientOptions(
db: .init(encoder: encoder, decoder: decoder),
auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://")),
global: SupabaseClientOptions.GlobalOptions(logger: LogStore.shared)
)
)
2 changes: 1 addition & 1 deletion Examples/SlackClone/supabase/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ enabled = true
# in emails.
site_url = "http://127.0.0.1:3000"
# A list of *exact* URLs that auth providers are permitted to redirect to post authentication.
additional_redirect_urls = ["https://127.0.0.1:3000", "slackclone://*"]
additional_redirect_urls = ["https://127.0.0.1:3000", "com.supabase.slack-clone://"]
# How long tokens are valid for, in seconds. Defaults to 3600 (1 hour), maximum 604,800 (1 week).
jwt_expiry = 3600
# If disabled, the refresh token will never expire.
Expand Down
140 changes: 14 additions & 126 deletions Sources/Realtime/V2/WebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,7 @@ protocol WebSocketClient: Sendable {
func send(_ message: RealtimeMessageV2) async throws
func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error>
func connect() -> AsyncStream<ConnectionStatus>
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode)
}

extension WebSocketClient {
func disconnect() {
disconnect(closeCode: .normalClosure)
}
func disconnect()
}

final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @unchecked Sendable {
Expand All @@ -39,7 +33,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @

struct MutableState {
var continuation: AsyncStream<ConnectionStatus>.Continuation?
var stream: SocketStream?
var connection: WebSocketConnection<RealtimeMessageV2, RealtimeMessageV2>?
}

private let mutableState = LockIsolated(MutableState())
Expand All @@ -57,7 +51,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
mutableState.withValue { state in
let session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil)
let task = session.webSocketTask(with: realtimeURL)
state.stream = SocketStream(task: task)
state.connection = WebSocketConnection(task: task)
task.resume()

let (stream, continuation) = AsyncStream<ConnectionStatus>.makeStream()
Expand All @@ -66,51 +60,27 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
}
}

func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) {
func disconnect() {
mutableState.withValue { state in
state.stream?.cancel(with: closeCode)
state.connection?.close()
}
}

func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error> {
mutableState.withValue { mutableState in
guard let stream = mutableState.stream else {
return .finished(
throwing: RealtimeError(
"receive() called before connect(). Make sure to call `connect()` before calling `receive()`."
)
guard let connection = mutableState.connection else {
return .finished(
throwing: RealtimeError(
"receive() called before connect(). Make sure to call `connect()` before calling `receive()`."
)
}

return stream.map { message in
switch message {
case let .string(stringMessage):
self.logger?.verbose("Received message: \(stringMessage)")

guard let data = stringMessage.data(using: .utf8) else {
throw RealtimeError("Expected a UTF8 encoded message.")
}

let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
return message

case .data:
fallthrough

default:
throw RealtimeError("Unsupported message type.")
}
}
.eraseToThrowingStream()
)
}

return connection.receive()
}

func send(_ message: RealtimeMessageV2) async throws {
let data = try JSONEncoder().encode(message)
let string = String(decoding: data, as: UTF8.self)

logger?.verbose("Sending message: \(string)")
try await mutableState.stream?.send(.string(string))
logger?.verbose("Sending message: \(message)")
try await mutableState.connection?.send(message)
}

// MARK: - URLSessionWebSocketDelegate
Expand Down Expand Up @@ -145,85 +115,3 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
mutableState.continuation?.yield(.error(error))
}
}

typealias WebSocketStream = AsyncThrowingStream<URLSessionWebSocketTask.Message, any Error>

final class SocketStream: AsyncSequence, Sendable {
typealias AsyncIterator = WebSocketStream.Iterator
typealias Element = URLSessionWebSocketTask.Message

struct MutableState {
var continuation: WebSocketStream.Continuation?
var stream: WebSocketStream?
}

private let task: URLSessionWebSocketTask
private let mutableState = LockIsolated(MutableState())

private func makeStreamIfNeeded() -> WebSocketStream {
mutableState.withValue { state in
if let stream = state.stream {
return stream
}

let stream = WebSocketStream { continuation in
state.continuation = continuation
waitForNextValue()
}

state.stream = stream
return stream
}
}

private func waitForNextValue() {
guard task.closeCode == .invalid else {
mutableState.continuation?.finish()
return
}

task.receive { [weak self] result in
guard let continuation = self?.mutableState.continuation else { return }

do {
let message = try result.get()
continuation.yield(message)
self?.waitForNextValue()
} catch {
continuation.finish(throwing: error)
}
}
}

init(task: URLSessionWebSocketTask) {
self.task = task
}

deinit {
mutableState.continuation?.finish()
}

func makeAsyncIterator() -> WebSocketStream.Iterator {
makeStreamIfNeeded().makeAsyncIterator()
}

func cancel(with closeCode: URLSessionWebSocketTask.CloseCode = .goingAway) {
task.cancel(with: closeCode, reason: nil)
mutableState.continuation?.finish()
}

func send(_ message: URLSessionWebSocketTask.Message) async throws {
try await task.send(message)
}
}

#if os(Linux) || os(Windows)
extension URLSessionWebSocketTask {
func receive(completionHandler: @Sendable @escaping (Result<URLSessionWebSocketTask.Message, any Error>) -> Void) {
Task {
let result = await Result(catching: { try await self.receive() })
completionHandler(result)
}
}
}
#endif
77 changes: 77 additions & 0 deletions Sources/Realtime/V2/WebSocketConnection.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//
// WebSocketConnection.swift
//
//
// Created by Guilherme Souza on 29/03/24.
//

import Foundation

#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

enum WebSocketConnectionError: Error {
case unsupportedData
}

final class WebSocketConnection<Incoming: Codable, Outgoing: Codable>: Sendable {
private let task: URLSessionWebSocketTask
private let encoder: JSONEncoder
private let decoder: JSONDecoder

init(
task: URLSessionWebSocketTask,
encoder: JSONEncoder = JSONEncoder(),
decoder: JSONDecoder = JSONDecoder()
) {
self.task = task
self.encoder = encoder
self.decoder = decoder

task.resume()
}

deinit {
task.cancel(with: .goingAway, reason: nil)
}

func receiveOnce() async throws -> Incoming {
switch try await task.receive() {
case let .data(data):
let message = try decoder.decode(Incoming.self, from: data)
return message

case let .string(string):
guard let data = string.data(using: .utf8) else {
throw WebSocketConnectionError.unsupportedData
}

let message = try decoder.decode(Incoming.self, from: data)
return message

@unknown default:
assertionFailure("Unsupported message type.")
task.cancel(with: .unsupportedData, reason: nil)
throw WebSocketConnectionError.unsupportedData
}
}

func send(_ message: Outgoing) async throws {
let data = try encoder.encode(message)
try await task.send(.data(data))
}

func receive() -> AsyncThrowingStream<Incoming, any Error> {
AsyncThrowingStream { [weak self] in
guard let self else { return nil }

let message = try await receiveOnce()
return Task.isCancelled ? nil : message
}
}

func close() {
task.cancel(with: .normalClosure, reason: nil)
}
}
4 changes: 2 additions & 2 deletions Sources/TestHelpers/HTTPClientMock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ package actor HTTPClientMock: HTTPClientType {
package struct MockNotFound: Error {}

private var mocks = [@Sendable (HTTPRequest) async throws -> HTTPResponse?]()

/// Requests received by this client in order.
package var receivedRequests: [HTTPRequest] = []

Expand Down Expand Up @@ -47,7 +47,7 @@ package actor HTTPClientMock: HTTPClientType {
package func send(_ request: HTTPRequest) async throws -> HTTPResponse {
receivedRequests.append(request)

for mock in mocks{
for mock in mocks {
do {
if let response = try await mock(request) {
returnedResponses.append(.success(response))
Expand Down
Loading