-
Notifications
You must be signed in to change notification settings - Fork 180
/
server.go
156 lines (132 loc) · 3.99 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
import (
"encoding/base64"
"sync"
"time"
)
// DefaultBufferSize size of the queue that holds the streams messages.
const DefaultBufferSize = 1024
// Server Is our main struct
type Server struct {
// Extra headers adding to the HTTP response to each client
Headers map[string]string
// Sets a ttl that prevents old events from being transmitted
EventTTL time.Duration
// Specifies the size of the message buffer for each stream
BufferSize int
// Encodes all data as base64
EncodeBase64 bool
// Splits an events data into multiple data: entries
SplitData bool
// Enables creation of a stream when a client connects
AutoStream bool
// Enables automatic replay for each new subscriber that connects
AutoReplay bool
// Specifies the function to run when client subscribe or un-subscribe
OnSubscribe func(streamID string, sub *Subscriber)
OnUnsubscribe func(streamID string, sub *Subscriber)
streams map[string]*Stream
muStreams sync.RWMutex
}
// New will create a server and setup defaults
func New() *Server {
return &Server{
BufferSize: DefaultBufferSize,
AutoStream: false,
AutoReplay: true,
streams: make(map[string]*Stream),
Headers: map[string]string{},
}
}
// NewWithCallback will create a server and setup defaults with callback function
func NewWithCallback(onSubscribe, onUnsubscribe func(streamID string, sub *Subscriber)) *Server {
return &Server{
BufferSize: DefaultBufferSize,
AutoStream: false,
AutoReplay: true,
streams: make(map[string]*Stream),
Headers: map[string]string{},
OnSubscribe: onSubscribe,
OnUnsubscribe: onUnsubscribe,
}
}
// Close shuts down the server, closes all of the streams and connections
func (s *Server) Close() {
s.muStreams.Lock()
defer s.muStreams.Unlock()
for id := range s.streams {
s.streams[id].close()
delete(s.streams, id)
}
}
// CreateStream will create a new stream and register it
func (s *Server) CreateStream(id string) *Stream {
s.muStreams.Lock()
defer s.muStreams.Unlock()
if s.streams[id] != nil {
return s.streams[id]
}
str := newStream(id, s.BufferSize, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe)
str.run()
s.streams[id] = str
return str
}
// RemoveStream will remove a stream
func (s *Server) RemoveStream(id string) {
s.muStreams.Lock()
defer s.muStreams.Unlock()
if s.streams[id] != nil {
s.streams[id].close()
delete(s.streams, id)
}
}
// StreamExists checks whether a stream by a given id exists
func (s *Server) StreamExists(id string) bool {
return s.getStream(id) != nil
}
// Publish sends a mesage to every client in a streamID.
// If the stream's buffer is full, it blocks until the message is sent out to
// all subscribers (but not necessarily arrived the clients), or when the
// stream is closed.
func (s *Server) Publish(id string, event *Event) {
stream := s.getStream(id)
if stream == nil {
return
}
select {
case <-stream.quit:
case stream.event <- s.process(event):
}
}
// TryPublish is the same as Publish except that when the operation would cause
// the call to be blocked, it simply drops the message and returns false.
// Together with a small BufferSize, it can be useful when publishing the
// latest message ASAP is more important than reliable delivery.
func (s *Server) TryPublish(id string, event *Event) bool {
stream := s.getStream(id)
if stream == nil {
return false
}
select {
case stream.event <- s.process(event):
return true
default:
return false
}
}
func (s *Server) getStream(id string) *Stream {
s.muStreams.RLock()
defer s.muStreams.RUnlock()
return s.streams[id]
}
func (s *Server) process(event *Event) *Event {
if s.EncodeBase64 {
output := make([]byte, base64.StdEncoding.EncodedLen(len(event.Data)))
base64.StdEncoding.Encode(output, event.Data)
event.Data = output
}
return event
}