forked from Teamwork/nylas-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
delta.go
150 lines (131 loc) · 3.93 KB
/
delta.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package nylas
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"github.com/google/go-querystring/query"
)
// DeltaResponse contains the response of a Delta API request.
type DeltaResponse struct {
CursorStart string `json:"cursor_start"`
CursorEnd string `json:"cursor_end"`
Deltas []Delta `json:"deltas"`
}
// Delta represents a change in the Nylas system.
// See: https://docs.nylas.com/reference#deltas
type Delta struct {
ID string `json:"id"`
Object string `json:"object"`
Event string `json:"event"`
Cursor string `json:"cursor"`
Attributes json.RawMessage `json:"attributes"`
}
// Message unmarshals the receivers Attributes field into a Message.
func (d Delta) Message() (Message, error) {
var message Message
return message, json.Unmarshal(d.Attributes, &message)
}
// Thread unmarshals the receivers Attributes field into a Thread.
func (d Delta) Thread() (Thread, error) {
var thread Thread
return thread, json.Unmarshal(d.Attributes, &thread)
}
// LatestDeltaCursor returns latest delta cursor for a users mailbox.
// See: https://docs.nylas.com/reference#obtaining-a-delta-cursor
func (c *Client) LatestDeltaCursor(ctx context.Context) (string, error) {
req, err := c.newUserRequest(ctx, http.MethodPost, "/delta/latest_cursor", nil)
if err != nil {
return "", err
}
latestCursor := struct {
Cursor string `json:"cursor"`
}{}
return latestCursor.Cursor, c.do(req, &latestCursor)
}
// DeltasOptions provides optional parameters to the Deltas method.
type DeltasOptions struct {
IncludeTypes []string `url:"include_types,comma,omitempty"`
ExcludeTypes []string `url:"exclude_types,comma,omitempty"`
View string `url:"view,omitempty"`
}
// Deltas requests a set of changes starting at cursor for a users mailbox.
//
// Note: this may not return all the changes that have happened since the start
// of the cursor and so you should keep requesting using DeltaResponse.CursorEnd
// until a response is given with CursorStart equal to CursorEnd.
//
// See: https://docs.nylas.com/reference#requesting-a-set-of-deltas
func (c *Client) Deltas(
ctx context.Context, cursor string, opts *DeltasOptions,
) (DeltaResponse, error) {
req, err := c.newUserRequest(ctx, http.MethodGet, "/delta", nil)
if err != nil {
return DeltaResponse{}, err
}
appendQueryValues(req, url.Values{"cursor": {cursor}})
if opts != nil {
vs, err := query.Values(opts)
if err != nil {
return DeltaResponse{}, err
}
appendQueryValues(req, vs)
}
var resp DeltaResponse
return resp, c.do(req, &resp)
}
// StreamDeltas streams deltas for a users mailbox with a long lived connection
// calling the provided function with each delta.
//
// This method will block until the context is cancelled or an error occurs.
// Ensure you set a http.Client with appropriate timeout settings, e.g:
// &http.Client{
// Transport: &http.Transport{
// Dial: (&net.Dialer{
// Timeout: 5 * time.Second,
// }).Dial,
// ResponseHeaderTimeout: 10 * time.Second,
// TLSHandshakeTimeout: 5 * time.Second,
// },
// }
//
// See: https://docs.nylas.com/reference#streaming-delta-updates
func (c *Client) StreamDeltas(ctx context.Context, cursor string, fn func(Delta)) error {
req, err := c.newUserRequest(ctx, http.MethodGet, "/delta/streaming", nil)
if err != nil {
return err
}
q := req.URL.Query()
q.Add("cursor", cursor)
req.URL.RawQuery = q.Encode()
resp, err := c.c.Do(req)
if err != nil {
return err
}
defer resp.Body.Close() // nolint: errcheck
if resp.StatusCode > 299 {
return NewError(resp)
}
reader := bufio.NewReader(resp.Body)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
line, err := reader.ReadBytes('\n')
if err != nil {
return err
}
if len(line) == 1 { // keep alive
continue
}
var delta Delta
if err := json.Unmarshal(line, &delta); err != nil {
return fmt.Errorf("unmarshal delta: %q: %w", line, err)
}
fn(delta)
}
}
}