Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2484,6 +2484,19 @@ interface PathObjectBase {
listener: EventCallback<PathObjectSubscriptionEvent>,
options?: PathObjectSubscriptionOptions,
): SubscribeResponse;

/**
* Registers a subscription listener and returns an async iterator that yields
* subscription events each time the object or a primitive value at this path is updated.
*
* This method functions in the same way as the regular {@link PathObjectBase.subscribe | PathObject.subscribe()} method,
* but instead returns an async iterator that can be used in a `for await...of` loop for convenience.
*
* @param options - Optional subscription configuration.
* @returns An async iterator that yields {@link PathObjectSubscriptionEvent} objects.
* @experimental
*/
subscribeIterator(options?: PathObjectSubscriptionOptions): AsyncIterableIterator<PathObjectSubscriptionEvent>;
}

/**
Expand Down Expand Up @@ -2948,6 +2961,18 @@ interface InstanceBase<T extends Value> {
* @experimental
*/
subscribe(listener: EventCallback<InstanceSubscriptionEvent<T>>): SubscribeResponse;

/**
* Registers a subscription listener and returns an async iterator that yields
* subscription events each time this instance is updated.
*
* This method functions in the same way as the regular {@link InstanceBase.subscribe | Instance.subscribe()} method,
* but instead returns an async iterator that can be used in a `for await...of` loop for convenience.
*
* @returns An async iterator that yields {@link InstanceSubscriptionEvent} objects.
* @experimental
*/
subscribeIterator(): AsyncIterableIterator<InstanceSubscriptionEvent<T>>;
}

/**
Expand Down
53 changes: 53 additions & 0 deletions src/common/lib/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,3 +477,56 @@ export async function withTimeoutAsync<A>(promise: Promise<A>, timeout = 5000, e

type NonFunctionKeyNames<A> = { [P in keyof A]: A[P] extends Function ? never : P }[keyof A];
export type Properties<A> = Pick<A, NonFunctionKeyNames<A>>;

/**
* A subscription function that registers the provided listener and returns a function to deregister it.
*/
export type RegisterListenerFunction<T> = (listener: (event: T) => void) => () => void;

/**
* Converts a listener-based event emitter API into an async iterator
* that can be consumed using a `for await...of` loop.
*
* @param registerListener - A function that registers a listener and returns a function to remove it
* @returns An async iterator that yields events from the listener
*/
export async function* listenerToAsyncIterator<T>(
registerListener: RegisterListenerFunction<T>,
): AsyncIterableIterator<T> {
const eventQueue: T[] = [];
let resolveNext: ((event: T) => void) | null = null;

const removeListener = registerListener((event: T) => {
if (resolveNext) {
// If we have a waiting promise, resolve it immediately
const resolve = resolveNext;
resolveNext = null;
resolve(event);
} else {
// Otherwise, queue the event for later consumption
eventQueue.push(event);
}
});

try {
while (true) {
if (eventQueue.length > 0) {
// If we have queued events, yield the next one
yield eventQueue.shift()!;
} else {
if (resolveNext) {
throw new ErrorInfo('Concurrent next() calls are not supported', 40000, 400);
}

// Otherwise wait for the next event to arrive
const event = await new Promise<T>((resolve) => {
resolveNext = resolve;
});
yield event;
}
}
} finally {
// Clean up when iterator is done or abandoned
removeListener();
}
}
10 changes: 10 additions & 0 deletions src/plugins/objects/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,14 @@ export class DefaultInstance<T extends Value> implements AnyInstance<T> {
});
});
}

subscribeIterator(): AsyncIterableIterator<InstanceSubscriptionEvent<T>> {
if (!(this._value instanceof LiveObject)) {
throw new this._client.ErrorInfo('Cannot subscribe to a non-LiveObject instance', 40000, 400);
}
return this._client.Utils.listenerToAsyncIterator((listener) => {
const { unsubscribe } = this.subscribe(listener);
return unsubscribe;
});
}
}
7 changes: 7 additions & 0 deletions src/plugins/objects/pathobject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ export class DefaultPathObject implements AnyPathObject {
return this._realtimeObject.getPathObjectSubscriptionRegister().subscribe(this._path, listener, options ?? {});
}

subscribeIterator(options?: PathObjectSubscriptionOptions): AsyncIterableIterator<PathObjectSubscriptionEvent> {
return this._client.Utils.listenerToAsyncIterator((listener) => {
const { unsubscribe } = this.subscribe(listener, options);
return unsubscribe;
});
}

private _resolvePath(path: string[]): Value {
// TODO: remove type assertion when internal LiveMap is updated to support new path based type system
let current: Value = this._root as unknown as API.LiveMap;
Expand Down
7 changes: 6 additions & 1 deletion test/common/modules/private_api_recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.Defaults.getPort',
'call.Defaults.normaliseOptions',
'call.EventEmitter.emit',
'call.EventEmitter.listeners',
'call.LiveObject.getObjectId',
'call.LiveObject.isTombstoned',
'call.LiveObject.tombstonedAt',
Expand All @@ -28,6 +29,7 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'call.ProtocolMessage.setFlag',
'call.RealtimeObject._objectsPool._onGCInterval',
'call.RealtimeObject._objectsPool.get',
'call.RealtimeObject.getPathObjectSubscriptionRegister',
'call.Utils.copy',
'call.Utils.dataSizeBytes',
'call.Utils.getRetryTime',
Expand Down Expand Up @@ -77,9 +79,12 @@ define(['test/support/output_directory_paths'], function (outputDirectoryPaths)
'pass.clientOption.webSocketConnectTimeout',
'pass.clientOption.webSocketSlowTimeout',
'pass.clientOption.wsConnectivityCheckUrl', // actually ably-js public API (i.e. it’s in the TypeScript typings) but no other SDK has it. At the same time it's not entirely clear if websocket connectivity check should be considered an ably-js-specific functionality (as for other params above), so for the time being we consider it as private API
'read.DefaultInstance._value',
'read.Defaults.version',
'read.LiveMap._dataRef.data',
'read.EventEmitter.events',
'read.LiveMap._dataRef.data',
'read.LiveObject._instanceSubscriptions',
'read.PathObjectSubscriptionRegister._subscriptions',
'read.Platform.Config.push',
'read.ProtocolMessage.channelSerial',
'read.Realtime._transports',
Expand Down
Loading
Loading