forked from ThreeDotsLabs/esja
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
91 lines (73 loc) · 1.85 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package esja
import (
"errors"
"fmt"
)
// Stream represents a queue of events and basic stream properties.
type Stream[T any] struct {
id string
streamType string
version int
queue []VersionedEvent[T]
}
// NewStream creates a new instance of a Stream with provided ID.
func NewStream[T any](id string) (*Stream[T], error) {
if id == "" {
return nil, errors.New("empty id")
}
return &Stream[T]{
id: id,
}, nil
}
// NewStreamWithType creates a new instance of a Stream with provided ID and custom type.
func NewStreamWithType[T any](id string, streamType string) (*Stream[T], error) {
s, err := NewStream[T](id)
if err != nil {
return nil, err
}
s.streamType = streamType
return s, nil
}
func (s *Stream[T]) ID() string {
return s.id
}
func (s *Stream[T]) Type() string {
return s.streamType
}
// Record applies the provided Event to the entity
// and puts it into the stream's event queue as a next VersionedEvent.
func (s *Stream[T]) Record(entity *T, event Event[T]) error {
err := event.ApplyTo(entity)
if err != nil {
return err
}
s.version += 1
s.queue = append(s.queue, VersionedEvent[T]{
Event: event,
StreamVersion: s.version,
})
return nil
}
// PopEvents returns the slice of queued VersionedEvents and clears it.
func (s *Stream[T]) PopEvents() []VersionedEvent[T] {
tmp := make([]VersionedEvent[T], len(s.queue))
copy(tmp, s.queue)
s.queue = []VersionedEvent[T]{}
return tmp
}
// HasEvents returns true if there are any queued stream.
func (s *Stream[T]) HasEvents() bool {
return len(s.queue) > 0
}
func newStream[T any](id string, events []VersionedEvent[T]) (*Stream[T], error) {
if len(events) == 0 {
return nil, fmt.Errorf("no stream to load")
}
e, err := NewStream[T](id)
if err != nil {
return nil, err
}
e.version = events[len(events)-1].StreamVersion
e.queue = events
return e, nil
}