Skip to content

Commit d8b0ce2

Browse files
authored
fix: grow subscriptions (#130)
1 parent dd1945f commit d8b0ce2

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

src/Client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ export class Client<E extends Emitter = Emitter> extends Root {
8181
private async startBatchWatch(fetcher: StreamFetcher, listener: EventEmitter, eventName: string) {
8282
while (true) {
8383
const batch: Partial<EmitterStreamEvent<any>>[] = [];
84-
const events = fetcher.fetch();
84+
const events = await fetcher.fetch();
8585

8686
for await (const event of events) {
8787
let data: unknown;
@@ -97,7 +97,7 @@ export class Client<E extends Emitter = Emitter> extends Root {
9797
batch.push(message);
9898
}
9999
if (batch.length > 0) listener.emit(eventName, batch);
100-
events.stop();
100+
await events.close();
101101
}
102102
}
103103

src/StreamFetcher.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { JetStreamClient, JsMsg, QueuedIterator } from 'nats';
1+
import { ConsumerMessages, JetStreamClient } from 'nats';
22

33
interface BatcherOptions {
44
batchSize?: number;
@@ -13,11 +13,11 @@ export class StreamFetcher {
1313
private options: BatcherOptions,
1414
) {}
1515

16-
public fetch(noWait?: boolean, size?: number, expires?: number): QueuedIterator<JsMsg> {
17-
return this.jsClient.fetch(this.streamName, this.consumerName, {
18-
batch: size ?? this.options.batchSize,
16+
public async fetch(size?: number, expires?: number): Promise<ConsumerMessages> {
17+
const consumer = await this.jsClient.consumers.get(this.streamName, this.consumerName);
18+
return await consumer.fetch({
19+
max_messages: size ?? this.options.batchSize,
1920
expires: expires ?? this.options.batchTimeout,
20-
no_wait: noWait ?? this.options.noWait,
2121
});
2222
}
2323
}

src/__tests__/Client.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ describe('Testing Client class methods', () => {
1818
request: jest.fn(),
1919
jetstream: () => ({
2020
subscribe: jetstreamSubscribeMock,
21-
fetch: jetstreamFetchMock,
21+
consumers: {
22+
get: jest.fn().mockResolvedValue({
23+
fetch: jetstreamFetchMock,
24+
}),
25+
},
2226
}),
2327
jetstreamManager: jest.fn().mockResolvedValue({
2428
streams: {
@@ -97,10 +101,10 @@ describe('Testing Client class methods', () => {
97101
const payload = { data: { elapsed: 42 } };
98102
const subscribe = new PassThrough({ objectMode: true });
99103
const secondSubscribe = new PassThrough({ objectMode: true });
100-
subscribe['stop'] = jest.fn();
101-
secondSubscribe['stop'] = jest.fn();
102-
jetstreamFetchMock.mockReturnValueOnce(subscribe);
103-
jetstreamFetchMock.mockReturnValueOnce(secondSubscribe);
104+
subscribe['close'] = jest.fn().mockResolvedValue('Ok');
105+
secondSubscribe['close'] = jest.fn().mockResolvedValue('Ok');
106+
jetstreamFetchMock.mockResolvedValueOnce(subscribe);
107+
jetstreamFetchMock.mockResolvedValueOnce(secondSubscribe);
104108

105109
const result = mathClient.getListener('Test', { batch: true });
106110

0 commit comments

Comments
 (0)