-
Notifications
You must be signed in to change notification settings - Fork 107
/
transaction.go
131 lines (110 loc) · 3.95 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package stomp
import (
"github.com/go-stomp/stomp/frame"
)
// A Transaction applies to the sending of messages to the STOMP server,
// and the acknowledgement of messages received from the STOMP server.
// All messages sent and and acknowledged in the context of a transaction
// are processed atomically by the STOMP server.
//
// Transactions are committed with the Commit method. When a transaction is
// committed, all sent messages, acknowledgements and negative acknowledgements,
// are processed by the STOMP server. Alternatively transactions can be aborted,
// in which case all sent messages, acknowledgements and negative
// acknowledgements are discarded by the STOMP server.
type Transaction struct {
id string
conn *Conn
completed bool
}
// Id returns the unique identifier for the transaction.
func (tx *Transaction) Id() string {
return tx.id
}
// Conn returns the connection associated with this transaction.
func (tx *Transaction) Conn() *Conn {
return tx.conn
}
// Abort will abort the transaction. Any calls to Send, SendWithReceipt,
// Ack and Nack on this transaction will be discarded.
func (tx *Transaction) Abort() error {
if tx.completed {
return ErrCompletedTransaction
}
f := frame.New(frame.ABORT, frame.Transaction, tx.id)
tx.conn.sendFrame(f)
tx.completed = true
return nil
}
// Commit will commit the transaction. All messages and acknowledgements
// sent to the STOMP server on this transaction will be processed atomically.
func (tx *Transaction) Commit() error {
if tx.completed {
return ErrCompletedTransaction
}
f := frame.New(frame.COMMIT, frame.Transaction, tx.id)
tx.conn.sendFrame(f)
tx.completed = true
return nil
}
// Send sends a message to the STOMP server as part of a transaction. The server will not process the
// message until the transaction is committed.
// This method returns without confirming that the STOMP server has received the message. If the STOMP server
// does fail to receive the message for any reason, the connection will close.
//
// The content type should be specified, according to the STOMP specification, but if contentType is an empty
// string, the message will be delivered without a content type header entry. The body array contains the
// message body, and its content should be consistent with the specified content type.
//
// TODO: document opts
func (tx *Transaction) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error {
if tx.completed {
return ErrCompletedTransaction
}
f, err := createSendFrame(destination, contentType, body, opts)
if err != nil {
return err
}
f.Header.Set(frame.Transaction, tx.id)
tx.conn.sendFrame(f)
return nil
}
// Ack sends an acknowledgement for the message to the server. The STOMP
// server will not process the acknowledgement until the transaction
// has been committed. If the subscription has an AckMode of AckAuto, calling
// this function has no effect.
func (tx *Transaction) Ack(msg *Message) error {
if tx.completed {
return ErrCompletedTransaction
}
f, err := tx.conn.createAckNackFrame(msg, true)
if err != nil {
return err
}
if f != nil {
f.Header.Set(frame.Transaction, tx.id)
tx.conn.sendFrame(f)
}
return nil
}
// Nack sends a negative acknowledgement for the message to the server,
// indicating that this client cannot or will not process the message and
// that it should be processed elsewhere. The STOMP server will not process
// the negative acknowledgement until the transaction has been committed.
// It is an error to call this method if the subscription has an AckMode
// of AckAuto, because the STOMP server will not be expecting any kind
// of acknowledgement (positive or negative) for this message.
func (tx *Transaction) Nack(msg *Message) error {
if tx.completed {
return ErrCompletedTransaction
}
f, err := tx.conn.createAckNackFrame(msg, false)
if err != nil {
return err
}
if f != nil {
f.Header.Set(frame.Transaction, tx.id)
tx.conn.sendFrame(f)
}
return nil
}