Skip to content

Commit

Permalink
refactor crdt model
Browse files Browse the repository at this point in the history
  • Loading branch information
andykswong committed Jan 3, 2024
1 parent d555e2c commit 8cf76f6
Show file tree
Hide file tree
Showing 17 changed files with 1,217 additions and 793 deletions.
9 changes: 9 additions & 0 deletions packages/crdt/src/__tests__/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { SyncOrAsyncIterable } from '@mithic/commons';

export async function collect<T>(entries: SyncOrAsyncIterable<T>): Promise<T[]> {
const results = [];
for await (const entry of entries) {
results.push(entry);
}
return results;
}
27 changes: 5 additions & 22 deletions packages/crdt/src/aggregate.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,16 @@
import { AbortOptions, MaybePromise } from '@mithic/commons';

/** Aggregate command haler. */
export interface AggregateCommandHandler<Store, Command, Event> {
export interface AggregateCommandHandler<State, Command, Event> {
/** Handles a command and produces an event. */
handle(store: Store, command: Command, options?: AbortOptions): MaybePromise<Event | undefined>;
handle(store: State, command: Command, options?: AbortOptions): MaybePromise<Event | undefined>;
}

/** Aggregate state projection. */
export interface AggregateProjection<Store, Event> {
export interface AggregateProjection<State, Event> {
/** Applies an event to state. */
reduce(store: Store, event: Event, options?: AbortOptions): MaybePromise<Store>;
reduce(store: State, event: Event, options?: AbortOptions): MaybePromise<State>;

/** Validates an event and returns any error. */
validate(store: Store, event: Event, options?: AbortOptions): MaybePromise<Error | undefined>;
validate(store: State, event: Event, options?: AbortOptions): MaybePromise<Error | undefined>;
}

/** Aggregate state query resolver. */
export interface AggregateQueryResolver<Store, Query> {
/** Resolves query to current state. */
resolve(store: Store, query: Query, options?: AbortOptions): AggregateQueryResult<Query>;
}

declare const ResultMarker: unique symbol;

/** Aggregate query object type. */
export interface AggregateQuery<Result = unknown> {
/** Result type marker */
[ResultMarker]?: Result;
}

/** Aggregate query result type. */
export type AggregateQueryResult<Query> = Query extends AggregateQuery<infer Result> ? Result : unknown;
16 changes: 12 additions & 4 deletions packages/crdt/src/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export const getCID = await (async () => {
const dagCbor = await import('@ipld/dag-cbor');
return async <K, Event>(event: Event) =>
CID.create(1, dagCbor.code, await sha256.digest(dagCbor.encode(event))) as unknown as K;
} catch (_) {
} catch {
return () => { throw new InvalidStateError('multiformats or @ipld/dag-cbor not available'); };
}
})();
Expand All @@ -21,10 +21,18 @@ export const decodeCID = await (async () => {
return function decodeCID<K>(key: string) {
return CID.parse(key) as unknown as K;
};
} catch (_) {
} catch {
return () => { throw new InvalidStateError('multiformats not available'); };
}
})();

/** Default value hash function using JSON stringify. */
export const defaultHash = <V>(value: V) => JSON.stringify(value);
/** Default value hash function. */
export const defaultHash = await (async () => {
try {
const { base32hex } = await import('multiformats/bases/base32');
const dagCbor = await import('@ipld/dag-cbor');
return async <V>(value: V) => base32hex.baseEncode(dagCbor.encode(value));
} catch {
return <V>(value: V) => JSON.stringify(value);
}
})();
Loading

0 comments on commit 8cf76f6

Please sign in to comment.