Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/StreamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class StreamManager extends Root {
private readonly GREATER_WILDCARD = '>';
private readonly TWO_WEEKS_IN_SECOND = 1209600;
private readonly ONE_DAY_IN_SECOND = 86400;
private readonly CONSUMER_NOT_FOUND = 'consumer not found';

private readonly defaultStreamOption: Omit<Required<StreamAction>, 'action' | 'maxBytes'> &
Pick<StreamAction, 'maxBytes'> = {
Expand Down Expand Up @@ -155,7 +156,15 @@ export class StreamManager extends Root {
}

if (isConsumerOptsBuilder(options)) {
await this.jsm.consumers.add(streamName, { ...options.config, filter_subject: subject });
const isConsumerExist = await this.jsm.consumers.info(streamName, consumerName).catch(async error => {
if (error.message === this.CONSUMER_NOT_FOUND) {
return false;
}
throw error;
});
if (!isConsumerExist) {
await this.jsm.consumers.add(streamName, { ...options.config, filter_subject: subject });
}
}

return isPullConsumer
Expand Down
3 changes: 2 additions & 1 deletion src/__tests__/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe('Testing Client class methods', () => {
},
consumers: {
add: jetstreamManagerConsumersAdd.mockResolvedValue(true),
info: jetstreamManagerConsumersAdd.mockResolvedValue(false),
},
}),
};
Expand Down Expand Up @@ -91,7 +92,7 @@ describe('Testing Client class methods', () => {
});
});

describe('Fetch event butchs', () => {
describe('Fetch event batch', () => {
test('Successful fetch events from stream', done => {
const payload = { data: { elapsed: 42 } };
const subscribe = new PassThrough({ objectMode: true });
Expand Down
Loading