-
Notifications
You must be signed in to change notification settings - Fork 93
/
subscription_manager.go
251 lines (210 loc) · 7.57 KB
/
subscription_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
package river
import (
"context"
"fmt"
"sync"
"time"
"github.com/riverqueue/river/internal/jobcompleter"
"github.com/riverqueue/river/internal/jobstats"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)
type subscriptionManager struct {
baseservice.BaseService
startstop.BaseStartStop
subscribeCh <-chan []jobcompleter.CompleterJobUpdated
statsMu sync.Mutex // protects stats fields
statsAggregate jobstats.JobStatistics
statsNumJobs int
mu sync.Mutex // protects subscription fields
subscriptions map[int]*eventSubscription
subscriptionsSeq int // used for generating simple IDs
}
func newSubscriptionManager(archetype *baseservice.Archetype, subscribeCh <-chan []jobcompleter.CompleterJobUpdated) *subscriptionManager {
return baseservice.Init(archetype, &subscriptionManager{
subscribeCh: subscribeCh,
subscriptions: make(map[int]*eventSubscription),
})
}
// ResetSubscribeChan is used to change the channel that the subscription
// manager listens on. It must only be called when the subscription manager is
// stopped.
func (sm *subscriptionManager) ResetSubscribeChan(subscribeCh <-chan []jobcompleter.CompleterJobUpdated) {
sm.subscribeCh = subscribeCh
}
func (sm *subscriptionManager) Start(ctx context.Context) error {
ctx, shouldStart, started, stopped := sm.StartInit(ctx)
if !shouldStart {
return nil
}
go func() {
started()
defer stopped() // this defer should come first so it's last out
sm.Logger.DebugContext(ctx, sm.Name+": Run loop started")
defer sm.Logger.DebugContext(ctx, sm.Name+": Run loop stopped")
// On shutdown, close and remove all active subscriptions.
defer func() {
sm.mu.Lock()
defer sm.mu.Unlock()
for subID, sub := range sm.subscriptions {
close(sub.Chan)
delete(sm.subscriptions, subID)
}
}()
for {
select {
case <-ctx.Done():
// Distribute remaining subscriptions until the channel is
// closed. This does make the subscription manager a little
// problematic in that it requires the subscription channel to
// be closed before it will fully stop. This always happens in
// the case of a real client by virtue of the completer always
// stopping at the same time as the subscription manager, but
// one has to be careful in tests.
sm.Logger.DebugContext(ctx, sm.Name+": Stopping; distributing subscriptions until channel is closed")
for updates := range sm.subscribeCh {
sm.distributeJobUpdates(updates)
}
return
case updates := <-sm.subscribeCh:
sm.distributeJobUpdates(updates)
}
}
}()
return nil
}
func (sm *subscriptionManager) logStats(ctx context.Context, svcName string) {
sm.statsMu.Lock()
defer sm.statsMu.Unlock()
sm.Logger.DebugContext(ctx, svcName+": Job stats (since last stats line)",
"num_jobs_run", sm.statsNumJobs,
"average_complete_duration", sm.safeDurationAverage(sm.statsAggregate.CompleteDuration, sm.statsNumJobs),
"average_queue_wait_duration", sm.safeDurationAverage(sm.statsAggregate.QueueWaitDuration, sm.statsNumJobs),
"average_run_duration", sm.safeDurationAverage(sm.statsAggregate.RunDuration, sm.statsNumJobs))
sm.statsAggregate = jobstats.JobStatistics{}
sm.statsNumJobs = 0
}
// Handles a potential divide by zero.
func (sm *subscriptionManager) safeDurationAverage(d time.Duration, n int) time.Duration {
if n == 0 {
return 0
}
return d / time.Duration(n)
}
// Receives updates from the completer and prompts the client to update
// statistics and distribute jobs into any listening subscriber channels.
// (Subscriber channels are non-blocking so this should be quite fast.)
func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.CompleterJobUpdated) {
func() {
sm.statsMu.Lock()
defer sm.statsMu.Unlock()
for _, update := range updates {
stats := update.JobStats
sm.statsAggregate.CompleteDuration += stats.CompleteDuration
sm.statsAggregate.QueueWaitDuration += stats.QueueWaitDuration
sm.statsAggregate.RunDuration += stats.RunDuration
sm.statsNumJobs++
}
}()
sm.mu.Lock()
defer sm.mu.Unlock()
// Quick path so we don't need to allocate anything if no one is listening.
if len(sm.subscriptions) < 1 {
return
}
for _, update := range updates {
sm.distributeJobEvent(update.Job, jobStatisticsFromInternal(update.JobStats))
}
}
// Distribute a single event into any listening subscriber channels.
//
// Job events should specify the job and stats, while queue events should only specify
// the queue.
//
// MUST be called with sm.mu already held.
func (sm *subscriptionManager) distributeJobEvent(job *rivertype.JobRow, stats *JobStatistics) {
var event *Event
switch job.State {
case rivertype.JobStateCancelled:
event = &Event{Kind: EventKindJobCancelled, Job: job, JobStats: stats}
case rivertype.JobStateCompleted:
event = &Event{Kind: EventKindJobCompleted, Job: job, JobStats: stats}
case rivertype.JobStateScheduled:
event = &Event{Kind: EventKindJobSnoozed, Job: job, JobStats: stats}
case rivertype.JobStateAvailable, rivertype.JobStateDiscarded, rivertype.JobStateRetryable, rivertype.JobStateRunning:
event = &Event{Kind: EventKindJobFailed, Job: job, JobStats: stats}
case rivertype.JobStatePending:
panic("completion subscriber unexpectedly received job in pending state, river bug")
default:
// linter exhaustive rule prevents this from being reached
panic("unreachable state to distribute, river bug")
}
// All subscription channels are non-blocking so this is always fast and
// there's no risk of falling behind what producers are sending.
for _, sub := range sm.subscriptions {
if sub.ListensFor(event.Kind) {
// TODO: THIS IS UNSAFE AND WILL LEAD TO DROPPED EVENTS.
//
// We are allocating subscriber channels with a fixed size of 1000, but
// potentially processing job events in batches of 5000 (batch completer
// max batch size). It's probably not possible for the subscriber to keep
// up with these bursts.
select {
case sub.Chan <- event:
default:
}
}
}
}
func (sm *subscriptionManager) distributeQueueEvent(event *Event) {
sm.mu.Lock()
defer sm.mu.Unlock()
// All subscription channels are non-blocking so this is always fast and
// there's no risk of falling behind what producers are sending.
for _, sub := range sm.subscriptions {
if sub.ListensFor(event.Kind) {
select {
case sub.Chan <- event:
default:
}
}
}
}
// Special internal variant that lets us inject an overridden size.
func (sm *subscriptionManager) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
if config.ChanSize < 0 {
panic("SubscribeConfig.ChanSize must be greater or equal to 1")
}
if config.ChanSize == 0 {
config.ChanSize = subscribeChanSizeDefault
}
for _, kind := range config.Kinds {
if _, ok := allKinds[kind]; !ok {
panic(fmt.Errorf("unknown event kind: %s", kind))
}
}
subChan := make(chan *Event, config.ChanSize)
sm.mu.Lock()
defer sm.mu.Unlock()
// Just gives us an easy way of removing the subscription again later.
subID := sm.subscriptionsSeq
sm.subscriptionsSeq++
sm.subscriptions[subID] = &eventSubscription{
Chan: subChan,
Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
}
cancel := func() {
sm.mu.Lock()
defer sm.mu.Unlock()
// May no longer be present in case this was called after a stop.
sub, ok := sm.subscriptions[subID]
if !ok {
return
}
close(sub.Chan)
delete(sm.subscriptions, subID)
}
return subChan, cancel
}