-
Notifications
You must be signed in to change notification settings - Fork 0
/
pkg.go
284 lines (262 loc) · 7.52 KB
/
pkg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
// Package dietdog couples [log.Logger] from the standard library with the DataDog logs API.
package dietdog
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
)
// Option customizes the behavior of this package.
// Options are passed to [New] and applied in the order given.
type Option func(*config)
// WithAuth configures the DataDog API key to use.
// Without an API key the writer silently discards everything written to it.
func WithAuth(token string) Option {
	return func(c *config) {
		c.auth = token
	}
}
// New returns an io.WriteCloser that can be used as the underlying writer
// of a log.Logger. It buffers logged messages and submits them to the
// DataDog logs API endpoint.
//
// To get it working, provide an API key when creating the writer:
//
//	sink := dietdog.New(dietdog.WithAuth(apiKey))
//	defer sink.Close()
//	logger := log.New(io.MultiWriter(os.Stderr, sink), "", 0)
//	logger.Println("Hello, world!")
//
// When used without an API key, writes to it become no-op.
//
// On a high log rate writes may occasionally block until the background
// log consumer catches up.
//
// To release resources and trigger delivery of unsubmitted logs, call the
// Close method. Be careful not to call Fatal* and Panic* methods of the
// parent Logger: this package delivers logs to DataDog asynchronously,
// and those methods terminate the program before the final messages can
// be delivered.
func New(opts ...Option) *writer {
	w := &writer{}
	for _, o := range opts {
		o(&w.config)
	}
	if w.auth == "" && w.log != nil {
		w.log.Print("dietdog.New called without an auth option, its writes would be no-op")
	}
	return w
}
// WithEndpoint overrides which endpoint logs are sent to
// (DefaultIntakeEndpoint when unset).
func WithEndpoint(url string) Option {
	return func(c *config) {
		c.url = url
	}
}
// WithMetadata adds additional metadata to every log message sent.
func WithMetadata(m Metadata) Option {
	return func(c *config) {
		c.source = m.Source
		c.service = m.Service
		c.hostname = m.Hostname
		c.tags = m.Tags
	}
}
// Metadata describes additional fields of a log message.
// The wire name of each field is given in its trailing comment;
// fields left empty are omitted from the payload (see the omitempty
// tags in sendBatch). See [DataDog API documentation] for more details.
//
// [DataDog API documentation]: https://docs.datadoghq.com/api/latest/logs/#send-logs
type Metadata struct {
	Tags     string // ddtags
	Source   string // ddsource
	Service  string // service
	Hostname string // hostname
}
// WithLogger configures the writer with an additional logger to report its
// own errors to. Errors it may log include failed delivery attempts and
// messages dropped because of internal queue overflow.
//
// Be careful not to provide a Logger that you plan to reconfigure to write
// to this writer. Such a configuration may eventually result in a deadlock.
//
// It is recommended to use a dedicated logger with a custom prefix:
//
//	sink := dietdog.New(dietdog.WithAuth(apiKey),
//		dietdog.WithLogger(log.New(os.Stderr, "DataDog logging: ", 0)))
func WithLogger(l *log.Logger) Option {
	return func(c *config) {
		c.log = l
	}
}
// config carries the settings accumulated by applying Option values.
// It is embedded in writer, so writer methods read these fields directly.
type config struct {
	tags     string // ddtags value attached to every message
	source   string // ddsource value attached to every message
	service  string // service value attached to every message
	hostname string // hostname value attached to every message
	auth     string // DataDog API key; when empty, writes are no-op
	url      string // intake endpoint; empty means DefaultIntakeEndpoint
	log      *log.Logger // sink for this package's own diagnostics; may be nil until init runs
}
// writer implements io.WriteCloser: Write queues log lines and a
// background goroutine (loop, started lazily by init via once) delivers
// them in batches to the DataDog API.
type writer struct {
	config
	once     sync.Once          // guards one-time setup in init
	ch       chan rawMessage    // queue between Write and loop
	loopDone chan struct{}      // closed on loop exit
	ctx      context.Context    // cancelled by Close to stop loop
	cancel   context.CancelFunc
	gzw      *gzip.Writer // reused by sendBatch calls
}
// Write puts a message coming from a *log.Logger into an internal queue.
// On a high log rate it may occasionally block until the background log
// consumer catches up. Without an API key configured it is a no-op.
func (s *writer) Write(p []byte) (int, error) {
	if s.auth == "" {
		return len(p), nil
	}
	s.once.Do(s.init)
	if len(p) == 0 {
		return 0, nil
	}
	// io.Writer implementations must not retain p, so keep a private copy.
	buf := make([]byte, len(p))
	copy(buf, p)
	msg := rawMessage{t: time.Now().UnixMilli(), b: buf}
	select {
	case s.ch <- msg:
		return len(p), nil
	case <-s.ctx.Done():
		return 0, errors.New("sink is already closed")
	}
}
// Close triggers final delivery of unsent logs and releases resources.
// It blocks until the final delivery attempt finishes.
// Without an API key configured it is a no-op.
func (s *writer) Close() error {
	if s.auth != "" {
		s.once.Do(s.init) // ensure loop exists, so loopDone gets closed
		s.cancel()
		<-s.loopDone
	}
	return nil
}
// init performs one-time setup (called via s.once): it fills in defaults,
// allocates the queue and control channels, and starts the background
// consumer goroutine.
func (s *writer) init() {
	if s.url == "" {
		s.url = DefaultIntakeEndpoint
	}
	if s.log == nil {
		// Keep s.log always non-nil so callers need no nil checks.
		s.log = log.New(io.Discard, "", 0)
	}
	s.ch = make(chan rawMessage, 1000)
	s.loopDone = make(chan struct{})
	// BestSpeed is a valid level, so NewWriterLevel cannot fail here.
	s.gzw, _ = gzip.NewWriterLevel(io.Discard, gzip.BestSpeed)
	s.ctx, s.cancel = context.WithCancel(context.Background())
	go s.loop()
}
// loop is the single background consumer: it accumulates messages from
// s.ch into a batch and flushes it when the batch fills up, on a
// 5-second tick, or once more on shutdown. It runs in its own goroutine
// (started by init) and signals its exit by closing s.loopDone.
//
// Fixed: the shutdown drain previously used `for l := len(batch); l < maxBatchSize;`
// which evaluated the length once, so the batch could grow past
// maxBatchSize and the excess would be silently truncated by sendBatch.
// The bound is now re-checked on every iteration.
func (s *writer) loop() {
	defer close(s.loopDone)
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	var batch []rawMessage
	// resetBatch drops references to message payloads so they can be
	// garbage-collected, while keeping the slice capacity for reuse.
	resetBatch := func() {
		for i := range batch {
			batch[i].b = nil
		}
		batch = batch[:0]
	}
	for {
		select {
		case m := <-s.ch:
			if len(batch) == maxBatchSize {
				// A previous send failed with a retryable error, so a full
				// batch is still pending; drop the newest message.
				s.log.Print("outgoing queue overflow, dropping message")
				continue
			}
			batch = append(batch, m)
			if len(batch) < maxBatchSize {
				continue // keep accumulating until full or the ticker fires
			}
		case <-ticker.C:
		case <-s.ctx.Done():
			// Final flush: drain whatever is already queued, but never grow
			// the batch beyond what a single API call may carry.
		drain:
			for len(batch) < maxBatchSize {
				select {
				case m := <-s.ch:
					batch = append(batch, m)
				default:
					break drain
				}
			}
			if err := s.sendBatch(batch); err != nil {
				s.log.Printf("final batch send: %v, dropped %d messages", err, len(batch))
			}
			return
		}
		if len(batch) == 0 {
			continue
		}
		if err := s.sendBatch(batch); err != nil {
			// Transient failures (retryable HTTP status or a timeout) keep
			// the batch around to be retried on the next tick.
			var r *errBadStatus
			if errors.As(err, &r) && r.retryable() {
				continue
			}
			var t interface{ Timeout() bool }
			if errors.Is(err, context.DeadlineExceeded) || (errors.As(err, &t) && t.Timeout()) {
				continue
			}
			s.log.Printf("batch send: %v, dropping %d messages", err, len(batch))
		}
		resetBatch()
	}
}
// rawMessage is a single captured log line with its capture time.
type rawMessage struct {
	t int64  // unix timestamp in milliseconds
	b []byte // one log line as written by the parent log.Logger
}

// maxBatchSize is the maximum number of messages per API call.
// https://docs.datadoghq.com/api/latest/logs/#send-logs
const maxBatchSize = 1000
// sendBatch gzip-compresses up to maxBatchSize messages as a JSON array
// and POSTs them to the configured intake endpoint. A nil error means the
// API answered with a 2xx status; any other status is reported as
// *errBadStatus so the caller can decide whether to retry. It is only
// called from the loop goroutine, so reusing s.gzw is race-free.
//
// Fixed: the response body is now drained before closing; without reading
// it to EOF the Transport cannot reuse the keep-alive connection for the
// next batch.
func (s *writer) sendBatch(batch []rawMessage) error {
	if len(batch) == 0 {
		return nil
	}
	if len(batch) > maxBatchSize {
		batch = batch[:maxBatchSize] // API limit, see maxBatchSize
	}
	// msg mirrors the wire format of a single log entry:
	// https://docs.datadoghq.com/api/latest/logs/#send-logs
	type msg struct {
		Source   string `json:"ddsource,omitempty"`
		Tags     string `json:"ddtags,omitempty"`
		Hostname string `json:"hostname,omitempty"`
		Service  string `json:"service,omitempty"`
		Message  string `json:"message"`
		TS       int64  `json:"timestamp"`
	}
	out := make([]msg, len(batch))
	for i := range batch {
		out[i] = msg{
			Source:   s.source,
			Tags:     s.tags,
			Hostname: s.hostname,
			Service:  s.service,
			Message:  string(batch[i].b),
			TS:       batch[i].t,
		}
	}
	buf := new(bytes.Buffer) // TODO: maybe pool
	s.gzw.Reset(buf)
	if err := json.NewEncoder(s.gzw).Encode(out); err != nil {
		return err
	}
	if err := s.gzw.Close(); err != nil {
		return err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url, buf)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Encoding", "gzip")
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", "github.com/artyom/dietdog")
	req.Header.Set("DD-API-KEY", s.auth)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Drain the body (best effort) so the underlying keep-alive connection
	// can be reused by subsequent calls.
	_, _ = io.Copy(io.Discard, resp.Body)
	if resp.StatusCode >= 200 && resp.StatusCode < 300 {
		return nil
	}
	return &errBadStatus{code: resp.StatusCode}
}
// errBadStatus reports a non-2xx HTTP status returned by the intake API.
type errBadStatus struct{ code int }

func (e *errBadStatus) Error() string {
	return fmt.Sprintf("unexpected status code: %d", e.code)
}

// retryable reports whether a send that failed with this status is worth
// retrying: 429 (rate limited) or any 5xx server error.
func (e *errBadStatus) retryable() bool {
	switch {
	case e.code == http.StatusTooManyRequests:
		return true
	case e.code >= 500:
		return true
	default:
		return false
	}
}
// DefaultIntakeEndpoint is the endpoint logs are sent to unless
// overridden with WithEndpoint.
const DefaultIntakeEndpoint = "https://http-intake.logs.datadoghq.com/api/v2/logs"