Skip to content

Commit 423d1a8

Browse files
gleipgleip
andauthored
feat: add rollup support (#138)
Co-authored-by: gleip <[email protected]>
1 parent 51d63a1 commit 423d1a8

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

src/Service.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ export class Service<E extends Emitter = Emitter> extends Root {
4848
* de-duplication within the configured Duplicate Window
4949
*/
5050
private readonly UNIQ_ID_HEADER = 'Nats-Msg-Id';
51+
/**
52+
* Nats-Rollup header indicating all prior messages should be purged
53+
*/
54+
private readonly ROLLUP_HEADER = 'Nats-Rollup';
55+
/**
56+
* Roll-up only same subject message in the stream
57+
*/
58+
private readonly ROLLUP_STRATEGY = 'sub';
5159

5260
constructor(private options: ServiceOptions<E>) {
5361
super(options.brokerConnection, options.loggerOutputFormatter);
@@ -57,7 +65,7 @@ export class Service<E extends Emitter = Emitter> extends Root {
5765
if (options.events) {
5866
const events = Object.keys(options.events.list) as [keyof E];
5967
this.emitter = events.reduce((result, eventName) => {
60-
result[eventName] = ((params: unknown, uniqId?: string) => {
68+
result[eventName] = ((params: unknown, uniqId?: string, rollupId?: string) => {
6169
const subject: string[] = [options.name];
6270

6371
const eventOptions = options.events?.list[eventName];
@@ -79,6 +87,11 @@ export class Service<E extends Emitter = Emitter> extends Root {
7987
settings = { headers: headers() };
8088
settings.headers.append(this.UNIQ_ID_HEADER, uniqId);
8189
}
90+
if (rollupId) {
91+
settings = settings ?? { headers: headers() };
92+
settings.headers.append(this.ROLLUP_HEADER, this.ROLLUP_STRATEGY);
93+
subject.push(rollupId);
94+
}
8295

8396
this.broker.publish(subject.join('.'), this.buildMessage(params), settings);
8497
}) as E[keyof E];

src/StreamManager.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ export class StreamManager extends Root {
9696
): Promise<StreamSingleMsgFetcher | StreamBatchMsgFetcher> {
9797
const consumerName = this.capitalizeFirstLetter(serviceNameFrom) + this.capitalizeFirstLetter(eventName);
9898
const prefix = this.param.options.prefix;
99-
const subject = `${this.param.serviceName}.${prefix}.${eventName}`;
99+
const subject = `${this.param.serviceName}.${prefix}.${eventName}.*`;
100100

101101
if (!this.jsm) {
102102
this.jsm = await this.param.broker.jetstreamManager();
@@ -154,7 +154,7 @@ export class StreamManager extends Root {
154154
if (!isConsumerExist) {
155155
await this.jsm.consumers.add(streamName, { ...options.config, filter_subject: subject });
156156
} else {
157-
await this.jsm.consumers.update(streamName, consumerName, options.config);
157+
await this.jsm.consumers.update(streamName, consumerName, { ...options.config, filter_subject: subject });
158158
}
159159
}
160160

0 commit comments

Comments
 (0)