Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion core/spec/scenarios/basic-workflow.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { configureWorkflow, WorkflowHost, WorkflowBuilder, WorkflowStatus, WorkflowBase, StepBody, StepExecutionContext, ExecutionResult, WorkflowInstance, ConsoleLogger } from "../../src";
import { configureWorkflow, WorkflowHost, WorkflowBuilder, WorkflowStatus, WorkflowBase, StepBody, StepExecutionContext, ExecutionResult, WorkflowInstance, ConsoleLogger, PollWorker, EventQueueWorker, WorkflowQueueWorker } from "../../src";
import { MemoryPersistenceProvider } from "../../src/services/memory-persistence-provider";
import { spinWaitCallback } from "../helpers/spin-wait";

Expand Down Expand Up @@ -41,6 +41,13 @@ let basicWorkflowScope = {
let config = configureWorkflow();
config.useLogger(new ConsoleLogger());
config.usePersistence(persistence);
const pollWorker = new PollWorker();
pollWorker.setInterval(500);
config.usePollWorker(pollWorker);
const eventQueueWorker = new EventQueueWorker();
config.useEventQueueWorker(eventQueueWorker);
const workflowQueueWorker = new WorkflowQueueWorker();
config.useWorkflowQueueWorker(workflowQueueWorker);
let host = config.getHost();

jasmine.DEFAULT_TIMEOUT_INTERVAL = 20000;
Expand Down
3 changes: 3 additions & 0 deletions core/src/abstractions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ export * from "./abstractions/workflow-host";
export * from "./abstractions/workflow-registry";
export * from "./abstractions/logger";
export * from "./abstractions/background-worker";
export * from "./abstractions/poll-worker";
export * from "./abstractions/event-queue-worker";
export * from "./abstractions/workflow-queue-worker";
export * from "./abstractions/types";
export * from "./abstractions/execution-result-processor";
export * from "./abstractions/execution-pointer-factory";
2 changes: 2 additions & 0 deletions core/src/abstractions/background-worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Container } from "inversify";

export interface IBackgroundWorker {
updateFromContainer(container: Container);
start();
stop();
}
4 changes: 4 additions & 0 deletions core/src/abstractions/event-queue-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { IBackgroundWorker } from "./background-worker";

export interface IEventQueueWorker extends IBackgroundWorker {
}
4 changes: 4 additions & 0 deletions core/src/abstractions/poll-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { IBackgroundWorker } from "./background-worker";

export interface IPollWorker extends IBackgroundWorker {
}
3 changes: 3 additions & 0 deletions core/src/abstractions/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ let TYPES = {
IPersistenceProvider: Symbol("IPersistenceProvider"),
IQueueProvider: Symbol("IQueueProvider"),
IBackgroundWorker: Symbol("IBackgroundWorker"),
IEventQueueWorker: Symbol("IEventQueueWorker"),
IPollWorker: Symbol("IPollWorker"),
IWorkflowQueueWorker: Symbol("IWorkflowQueueWorker"),
IWorkflowExecutor: Symbol("IWorkflowExecutor"),
IExecutionResultProcessor: Symbol("IExecutionResultProcessor"),
IExecutionPointerFactory: Symbol("IExecutionPointerFactory")
Expand Down
4 changes: 4 additions & 0 deletions core/src/abstractions/workflow-queue-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { IBackgroundWorker } from "./background-worker";

export interface IWorkflowQueueWorker extends IBackgroundWorker {
}
23 changes: 19 additions & 4 deletions core/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import "reflect-metadata";
import { Container, ContainerModule, interfaces, injectable, inject } from "inversify";
import { TYPES, IWorkflowRegistry, IQueueProvider, IWorkflowHost, IPersistenceProvider, IDistributedLockProvider, IWorkflowExecutor, IBackgroundWorker, IExecutionResultProcessor, IExecutionPointerFactory, ILogger } from "./abstractions";
import { TYPES, IWorkflowRegistry, IQueueProvider, IWorkflowHost, IPersistenceProvider, IDistributedLockProvider, IWorkflowExecutor, IPollWorker, IWorkflowQueueWorker, IEventQueueWorker, IExecutionResultProcessor, IExecutionPointerFactory, ILogger } from "./abstractions";
import { SingleNodeQueueProvider, SingleNodeLockProvider, MemoryPersistenceProvider, WorkflowExecutor, WorkflowQueueWorker, EventQueueWorker, PollWorker, WorkflowRegistry, WorkflowHost, ExecutionResultProcessor, ExecutionPointerFactory, NullLogger, ConsoleLogger } from "./services";

export class WorkflowConfig {
Expand Down Expand Up @@ -30,6 +30,21 @@ export class WorkflowConfig {
this.container.rebind<IDistributedLockProvider>(TYPES.IDistributedLockProvider).toConstantValue(service);
}

public usePollWorker(service: IPollWorker) {
service.updateFromContainer(this.container);
this.container.rebind<IPollWorker>(TYPES.IPollWorker).toConstantValue(service);
}

public useEventQueueWorker(service: IEventQueueWorker) {
service.updateFromContainer(this.container);
this.container.rebind<IEventQueueWorker>(TYPES.IEventQueueWorker).toConstantValue(service);
}

public useWorkflowQueueWorker(service: IWorkflowQueueWorker) {
service.updateFromContainer(this.container);
this.container.rebind<IWorkflowQueueWorker>(TYPES.IWorkflowQueueWorker).toConstantValue(service);
}

public getHost(): IWorkflowHost {
return this.container.get<IWorkflowHost>(TYPES.IWorkflowHost);
}
Expand All @@ -46,9 +61,9 @@ export function configureWorkflow(): WorkflowConfig {
bind<IExecutionResultProcessor>(TYPES.IExecutionResultProcessor).to(ExecutionResultProcessor);
bind<IExecutionPointerFactory>(TYPES.IExecutionPointerFactory).to(ExecutionPointerFactory);

bind<IBackgroundWorker>(TYPES.IBackgroundWorker).to(WorkflowQueueWorker);
bind<IBackgroundWorker>(TYPES.IBackgroundWorker).to(EventQueueWorker);
bind<IBackgroundWorker>(TYPES.IBackgroundWorker).to(PollWorker);
bind<IWorkflowQueueWorker>(TYPES.IWorkflowQueueWorker).to(WorkflowQueueWorker);
bind<IEventQueueWorker>(TYPES.IEventQueueWorker).to(EventQueueWorker);
bind<IPollWorker>(TYPES.IPollWorker).to(PollWorker);

bind<IWorkflowHost>(TYPES.IWorkflowHost).to(WorkflowHost).inSingletonScope();

Expand Down
29 changes: 23 additions & 6 deletions core/src/services/event-queue-worker.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { inject, injectable } from "inversify";
import { inject, injectable, Container } from "inversify";
import { WorkflowInstance, WorkflowStatus, ExecutionPointer, EventSubscription, Event } from "../models";
import { WorkflowBase, IPersistenceProvider, IWorkflowHost, IQueueProvider, IDistributedLockProvider, IWorkflowExecutor, ILogger, TYPES, QueueType, IBackgroundWorker } from "../abstractions";
import { WorkflowBase, IPersistenceProvider, IWorkflowHost, IQueueProvider, IDistributedLockProvider, IWorkflowExecutor, ILogger, TYPES, QueueType, IEventQueueWorker } from "../abstractions";
import { WorkflowRegistry } from "./workflow-registry";
import { WorkflowExecutor } from "./workflow-executor";

@injectable()
export class EventQueueWorker implements IBackgroundWorker {
export class EventQueueWorker implements IEventQueueWorker {

@inject(TYPES.IWorkflowExecutor)
private executor: IWorkflowExecutor;
private executor: IWorkflowExecutor;

@inject(TYPES.IPersistenceProvider)
private persistence: IPersistenceProvider;
Expand All @@ -17,15 +17,32 @@ export class EventQueueWorker implements IBackgroundWorker {
private lockProvider: IDistributedLockProvider;

@inject(TYPES.IQueueProvider)
private queueProvider: IQueueProvider;
private queueProvider: IQueueProvider;

@inject(TYPES.ILogger)
private logger: ILogger;

private processTimer: any;
private interval = 500;

public setInterval(ms: number) {
this.interval = ms;
}

public getInterval() {
return this.interval;
}

public updateFromContainer(container: Container) {
this.executor = container.get(TYPES.IWorkflowExecutor);
this.persistence = container.get(TYPES.IPersistenceProvider);
this.lockProvider = container.get(TYPES.IDistributedLockProvider);
this.queueProvider = container.get(TYPES.IQueueProvider);
this.logger = container.get(TYPES.ILogger);
}

public start() {
this.processTimer = setInterval(this.processQueue, 500, this);
this.processTimer = setInterval(this.processQueue, this.interval, this);
}

public stop() {
Expand Down
24 changes: 20 additions & 4 deletions core/src/services/poll-worker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { inject, injectable } from "inversify";
import { inject, injectable, Container } from "inversify";
import { WorkflowInstance, WorkflowStatus, ExecutionPointer, EventSubscription, Event } from "../models";
import { WorkflowBase, IPersistenceProvider, IWorkflowHost, IQueueProvider, IDistributedLockProvider, IWorkflowExecutor, ILogger, TYPES, QueueType, IBackgroundWorker } from "../abstractions";
import { WorkflowBase, IPersistenceProvider, IWorkflowHost, IQueueProvider, IDistributedLockProvider, IWorkflowExecutor, ILogger, TYPES, QueueType, IPollWorker } from "../abstractions";
import { WorkflowRegistry } from "./workflow-registry";
import { WorkflowExecutor } from "./workflow-executor";

@injectable()
export class PollWorker implements IBackgroundWorker {
export class PollWorker implements IPollWorker {

@inject(TYPES.IPersistenceProvider)
private persistence: IPersistenceProvider;
Expand All @@ -20,9 +20,25 @@ export class PollWorker implements IBackgroundWorker {
private logger: ILogger;

private processTimer: any;
private interval = 10000;

public setInterval(ms: number) {
this.interval = ms;
}

public getInterval() {
return this.interval;
}

public updateFromContainer(container: Container) {
this.persistence = container.get(TYPES.IPersistenceProvider);
this.lockProvider = container.get(TYPES.IDistributedLockProvider);
this.queueProvider = container.get(TYPES.IQueueProvider);
this.logger = container.get(TYPES.ILogger);
}

public start() {
this.processTimer = setInterval(this.process, 10000, this);
this.processTimer = setInterval(this.process, this.interval, this);
}

public stop() {
Expand Down
22 changes: 14 additions & 8 deletions core/src/services/workflow-host.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { injectable, inject, multiInject } from "inversify";
import { WorkflowInstance, WorkflowStatus, ExecutionPointer, EventSubscription, Event } from "../models";
import { WorkflowBase, IWorkflowRegistry, IPersistenceProvider, IWorkflowHost, IQueueProvider, QueueType, IDistributedLockProvider, IBackgroundWorker, TYPES, ILogger, IExecutionPointerFactory } from "../abstractions";
import { WorkflowBase, IWorkflowRegistry, IPersistenceProvider, IWorkflowHost, IQueueProvider, QueueType, IDistributedLockProvider, IPollWorker, IEventQueueWorker, IWorkflowQueueWorker, TYPES, ILogger, IExecutionPointerFactory } from "../abstractions";
import { WorkflowQueueWorker } from "./workflow-queue-worker";

import { MemoryPersistenceProvider } from "./memory-persistence-provider";
Expand All @@ -12,10 +12,16 @@ import { NullLogger } from "./null-logger";
export class WorkflowHost implements IWorkflowHost {

@inject(TYPES.IWorkflowRegistry)
private registry : IWorkflowRegistry;
private registry: IWorkflowRegistry;

@multiInject(TYPES.IBackgroundWorker)
private workers: IBackgroundWorker[];
@inject(TYPES.IPollWorker)
private pollWorker: IPollWorker

@inject(TYPES.IEventQueueWorker)
private eventQueueWorker: IEventQueueWorker

@inject(TYPES.IWorkflowQueueWorker)
private workflowQueueWorker: IWorkflowQueueWorker

@inject(TYPES.IPersistenceProvider)
private persistence: IPersistenceProvider;
Expand All @@ -24,17 +30,17 @@ export class WorkflowHost implements IWorkflowHost {
private lockProvider: IDistributedLockProvider;

@inject(TYPES.IQueueProvider)
private queueProvider: IQueueProvider = new SingleNodeQueueProvider();
private queueProvider: IQueueProvider = new SingleNodeQueueProvider();

@inject(TYPES.IExecutionPointerFactory)
private pointerFactory : IExecutionPointerFactory;
private pointerFactory: IExecutionPointerFactory;

@inject(TYPES.ILogger)
private logger: ILogger;

public start(): Promise<void> {
this.logger.log("Starting workflow host...");
for (let worker of this.workers) {
for (let worker of [ this.pollWorker, this.eventQueueWorker, this.workflowQueueWorker ]) {
worker.start();
}
this.registerCleanCallbacks();
Expand All @@ -44,7 +50,7 @@ export class WorkflowHost implements IWorkflowHost {
public stop() {
this.logger.log("Stopping workflow host...");

for (let worker of this.workers) {
for (let worker of [ this.pollWorker, this.eventQueueWorker, this.workflowQueueWorker ]) {
worker.stop();
}
}
Expand Down
25 changes: 21 additions & 4 deletions core/src/services/workflow-queue-worker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { inject, injectable } from "inversify";
import { inject, injectable, Container } from "inversify";
import { WorkflowInstance, WorkflowStatus, ExecutionPointer, EventSubscription, Event } from "../models";
import { WorkflowBase, IPersistenceProvider, IWorkflowHost, IQueueProvider, IDistributedLockProvider, IWorkflowExecutor, ILogger, TYPES, QueueType, IBackgroundWorker } from "../abstractions";
import { WorkflowBase, IPersistenceProvider, IWorkflowHost, IQueueProvider, IDistributedLockProvider, IWorkflowExecutor, ILogger, TYPES, QueueType, IWorkflowQueueWorker } from "../abstractions";
import { WorkflowRegistry } from "./workflow-registry";
import { WorkflowExecutor } from "./workflow-executor";

@injectable()
export class WorkflowQueueWorker implements IBackgroundWorker {
export class WorkflowQueueWorker implements IWorkflowQueueWorker {

@inject(TYPES.IWorkflowExecutor)
private executor: IWorkflowExecutor;
Expand All @@ -23,9 +23,26 @@ export class WorkflowQueueWorker implements IBackgroundWorker {
private logger: ILogger;

private processTimer: any;
private interval = 100;

public setInterval(ms: number) {
this.interval = ms;
}

public getInterval() {
return this.interval;
}

public updateFromContainer(container: Container) {
this.executor = container.get(TYPES.IWorkflowExecutor);
this.persistence = container.get(TYPES.IPersistenceProvider);
this.lockProvider = container.get(TYPES.IDistributedLockProvider);
this.queueProvider = container.get(TYPES.IQueueProvider);
this.logger = container.get(TYPES.ILogger);
}

public start() {
this.processTimer = setInterval(this.processQueue, 100, this);
this.processTimer = setInterval(this.processQueue, this.interval, this);
}

public stop() {
Expand Down