Skip to content

Commit 8664809

Browse files
authored
fix: Error for create same consumer (#124)
1 parent edc4580 commit 8664809

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

src/StreamManager.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export class StreamManager extends Root {
1919
private readonly GREATER_WILDCARD = '>';
2020
private readonly TWO_WEEKS_IN_SECOND = 1209600;
2121
private readonly ONE_DAY_IN_SECOND = 86400;
22+
private readonly CONSUMER_NOT_FOUND = 'consumer not found';
2223

2324
private readonly defaultStreamOption: Omit<Required<StreamAction>, 'action' | 'maxBytes'> &
2425
Pick<StreamAction, 'maxBytes'> = {
@@ -155,7 +156,15 @@ export class StreamManager extends Root {
155156
}
156157

157158
if (isConsumerOptsBuilder(options)) {
158-
await this.jsm.consumers.add(streamName, { ...options.config, filter_subject: subject });
159+
const isConsumerExist = await this.jsm.consumers.info(streamName, consumerName).catch(async error => {
160+
if (error.message === this.CONSUMER_NOT_FOUND) {
161+
return false;
162+
}
163+
throw error;
164+
});
165+
if (!isConsumerExist) {
166+
await this.jsm.consumers.add(streamName, { ...options.config, filter_subject: subject });
167+
}
159168
}
160169

161170
return isPullConsumer

src/__tests__/Client.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ describe('Testing Client class methods', () => {
2626
},
2727
consumers: {
2828
add: jetstreamManagerConsumersAdd.mockResolvedValue(true),
29+
info: jetstreamManagerConsumersAdd.mockResolvedValue(false),
2930
},
3031
}),
3132
};
@@ -91,7 +92,7 @@ describe('Testing Client class methods', () => {
9192
});
9293
});
9394

94-
describe('Fetch event butchs', () => {
95+
describe('Fetch event batch', () => {
9596
test('Successful fetch events from stream', done => {
9697
const payload = { data: { elapsed: 42 } };
9798
const subscribe = new PassThrough({ objectMode: true });

0 commit comments

Comments
 (0)