Skip to content

Commit

Permalink
merge MessageBus and PubSub interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
andykswong committed Oct 6, 2023
1 parent 756ee68 commit 5b03d3f
Show file tree
Hide file tree
Showing 69 changed files with 1,472 additions and 1,404 deletions.
14 changes: 7 additions & 7 deletions packages/cqrs/src/__tests__/iterator.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ErrorName, immediate } from '@mithic/commons';
import { SimpleMessageBus } from '../bus/simple.js';
import { AsyncSubscriber } from '../iterator.js';
import { SimpleMessageBus } from '@mithic/messaging';

describe(AsyncSubscriber.name, () => {
let eventBus: SimpleMessageBus<number>;
Expand All @@ -17,7 +17,7 @@ describe(AsyncSubscriber.name, () => {

it('should return values from the subscription', async () => {
const events = [1, 2, 3];
events.forEach(eventBus.dispatch);
events.forEach((event) => eventBus.dispatch(event));
const result = [];

for await (const event of subscriber) {
Expand All @@ -36,7 +36,7 @@ describe(AsyncSubscriber.name, () => {

const error = new Error('stop');
const events = [1, 2, 3];
events.forEach(eventBus.dispatch);
events.forEach((event) => eventBus.dispatch(event));
const result = [];

try {
Expand All @@ -60,7 +60,7 @@ describe(AsyncSubscriber.name, () => {
const controller = new AbortController();
const subscriber = new AsyncSubscriber(eventBus, controller);
const events = [1, 2, 3];
events.forEach(eventBus.dispatch);
events.forEach((event) => eventBus.dispatch(event));
const result = [];

try {
Expand Down Expand Up @@ -96,7 +96,7 @@ describe(AsyncSubscriber.name, () => {

it('should resolve pending pulls on close', async () => {
const events = [1, 2];
events.forEach(eventBus.dispatch);
events.forEach((event) => eventBus.dispatch(event));
const result = [];
for (let i = 0; i < 4; ++i) {
result.push(subscriber.next());
Expand All @@ -113,7 +113,7 @@ describe(AsyncSubscriber.name, () => {
it('should drop overflowing values', async () => {
const subscriber = new AsyncSubscriber(eventBus, { bufferSize: 1 });
const events = [1, 2, 3];
events.forEach(eventBus.dispatch);
events.forEach((event) => eventBus.dispatch(event));

for await (const event of subscriber) {
expect(event).toEqual(events[2]); // previous events dropped
Expand All @@ -124,7 +124,7 @@ describe(AsyncSubscriber.name, () => {
it('should ignore new values on fcfs mode if buffer is full', async () => {
const subscriber = new AsyncSubscriber(eventBus, { bufferSize: 1, fcfs: true });
const events = [1, 2, 3];
events.forEach(eventBus.dispatch);
events.forEach((event) => eventBus.dispatch(event));

for await (const event of subscriber) {
expect(event).toEqual(events[0]); // later events dropped
Expand Down
14 changes: 8 additions & 6 deletions packages/cqrs/src/__tests__/mocks.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
import { MessageHandler, PubSub, PubSubMessage, SubscribeOptions, MessageValidator } from '@mithic/messaging';
import { MessageHandler, SubscribeOptions, MessageValidator, MessageBus, Unsubscribe } from '@mithic/messaging';

export const LIBP2P_KEY_CODE = 0x72;

export class MockPubSub implements PubSub<Uint8Array> {
topicHandlers = new Map<string, MessageHandler<PubSubMessage<Uint8Array>>>();
topicValidators = new Map<string, MessageValidator<PubSubMessage<Uint8Array>>>();
export class MockMessageBus<Msg = Uint8Array> implements MessageBus<Msg> {
topicHandlers = new Map<string, MessageHandler<Msg>>();
topicValidators = new Map<string, MessageValidator<Msg>>();

subscribe(topic: string, handler: MessageHandler<PubSubMessage<Uint8Array>>, options?: SubscribeOptions<PubSubMessage<Uint8Array>>): void {
subscribe(handler: MessageHandler<Msg>, options?: SubscribeOptions<Msg>): Unsubscribe {
const topic = options?.topic ?? '';
this.topicHandlers.set(topic, handler);
options?.validator && this.topicValidators.set(topic, options.validator);
return () => this.unsubscribe(topic);
}

unsubscribe(topic: string): void {
this.topicHandlers.delete(topic);
this.topicValidators.delete(topic);
}

publish(): void {
dispatch(): void {
// NO-OP
}

Expand Down
5 changes: 2 additions & 3 deletions packages/cqrs/src/__tests__/processor.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { jest } from '@jest/globals';
import { MessageConsumer, MessageSubscription, Unsubscribe } from '../bus.js';
import { SimpleMessageBus } from '../bus/index.js';
import { MessageProcessor } from '../processor.js';
import { MessageHandler, MessageSubscription, SimpleMessageBus, Unsubscribe } from '@mithic/messaging';

describe(MessageProcessor.name, () => {
let subscription: MessageSubscription<string>;
let mockConsumer: jest.MockedFunction<MessageConsumer<string>>;
let mockConsumer: jest.MockedFunction<MessageHandler<string>>;
let processor: MessageProcessor<string>;

beforeEach(() => {
Expand Down
23 changes: 0 additions & 23 deletions packages/cqrs/src/bus.ts

This file was deleted.

93 changes: 0 additions & 93 deletions packages/cqrs/src/bus/__tests__/pubsub.spec.ts

This file was deleted.

42 changes: 0 additions & 42 deletions packages/cqrs/src/bus/__tests__/simple.spec.ts

This file was deleted.

2 changes: 0 additions & 2 deletions packages/cqrs/src/bus/index.ts

This file was deleted.

52 changes: 0 additions & 52 deletions packages/cqrs/src/bus/pubsub.ts

This file was deleted.

25 changes: 0 additions & 25 deletions packages/cqrs/src/bus/simple.ts

This file was deleted.

2 changes: 0 additions & 2 deletions packages/cqrs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
export * from './bus.js';
export * from './event.js';
export * from './iterator.js';
export * from './processor.js';

export * from './bus/index.js';
export * from './processor/index.js';
export * from './preset/index.js';
2 changes: 1 addition & 1 deletion packages/cqrs/src/iterator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ArrayDeque } from '@mithic/collections';
import { AbortOptions, Closeable, MaybePromise } from '@mithic/commons';
import { MessageSubscription, Unsubscribe } from './bus.js';
import { MessageSubscription, Unsubscribe } from '@mithic/messaging';

/**
* Subscribe to an {@link MessageSubscription} as AsyncIterator.
Expand Down
Loading

0 comments on commit 5b03d3f

Please sign in to comment.