From 1179ddd8ac85eb0830c33a32ca797351b244dc55 Mon Sep 17 00:00:00 2001 From: Sergey Kuchin <3on.gleip@gmail.com> Date: Fri, 8 Nov 2024 12:36:13 +0300 Subject: [PATCH] feat: Uniq key for event --- examples/MathService/interfaces.ts | 6 +++--- src/Service.ts | 18 +++++++++++++++--- src/__tests__/Service.ts | 15 +++++++++++++++ 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/examples/MathService/interfaces.ts b/examples/MathService/interfaces.ts index 61c9bf6..b6807a4 100644 --- a/examples/MathService/interfaces.ts +++ b/examples/MathService/interfaces.ts @@ -15,9 +15,9 @@ export interface FibonacciNumberEvent { } export type EmitterMath = { - Elapsed: (params: ElapsedEvent) => void; - Notify: (params: NotifyEvent) => void; - FibonacciNumber: (params: FibonacciNumberEvent) => void; + Elapsed: (params: ElapsedEvent, uniqId?: string) => void; + Notify: (params: NotifyEvent, uniqId?: string) => void; + FibonacciNumber: (params: FibonacciNumberEvent, uniqId?: string) => void; }; export type EmitterMathExternal = { diff --git a/src/Service.ts b/src/Service.ts index 55c0967..1546211 100644 --- a/src/Service.ts +++ b/src/Service.ts @@ -1,5 +1,5 @@ import { Root } from './Root'; -import { JSONCodec, Subscription, DebugEvents, Events } from 'nats'; +import { JSONCodec, Subscription, DebugEvents, Events, headers, MsgHdrs } from 'nats'; import { Message, Emitter, @@ -43,6 +43,12 @@ export class Service extends Root { private httpMethods = new Map(); private rootSpans = new Map(); + /** + * Unique identifier NATS header for a message that will be used by the server apply + * de-duplication within the configured Duplicate Window + */ + private readonly UNIQ_ID_HEADER = 'Nats-Msg-Id'; + constructor(private options: ServiceOptions) { super(options.brokerConnection, options.loggerOutputFormatter); @@ -51,7 +57,7 @@ export class Service extends Root { if (options.events) { const events = Object.keys(options.events.list) as [keyof E]; this.emitter = events.reduce((result, eventName) => { - result[eventName] = ((params: unknown) => { + result[eventName] = ((params: unknown, uniqId?: string) => { const subject: string[] = [options.name]; const eventOptions = options.events?.list[eventName]; @@ -68,7 +74,13 @@ export class Service extends Root { subject.push(String(eventName)); - this.broker.publish(subject.join('.'), this.buildMessage(params)); + let settings: { headers: MsgHdrs } | undefined; + if (uniqId) { + settings = { headers: headers() }; + settings.headers.append(this.UNIQ_ID_HEADER, uniqId); + } + + this.broker.publish(subject.join('.'), this.buildMessage(params), settings); }) as E[keyof E]; return result; }, this.emitter); diff --git a/src/__tests__/Service.ts b/src/__tests__/Service.ts index 854726d..073c0a8 100644 --- a/src/__tests__/Service.ts +++ b/src/__tests__/Service.ts @@ -32,6 +32,7 @@ describe('Testing Service class methods', () => { subscribe: jetstreamSubscribeMock, }), status: () => brokerEvents, + publish: jest.fn(), }; const codec = JSONCodec(); @@ -234,6 +235,20 @@ describe('Testing Service class methods', () => { await setTimeout(1); expect(result).toMatchObject(streamResponse); }); + + test('Event sending', async () => { + const mathService = new Service({ + name, + brokerConnection: broker as any, + methods: [], + events, + }); + + await mathService.start(); + + mathService.emitter.Notify({ method: 'Test' }, 'uniq_key'); + expect(broker.publish).toBeCalledTimes(1); + }); }); describe('Other', () => {