Skip to content

Commit

Permalink
Merge pull request #34 from tidepool-org/add-full-name
Browse files Browse the repository at this point in the history
Various improvements of cloud events consumer
  • Loading branch information
toddkazakov authored Oct 14, 2020
2 parents 39fb0d4 + 47b614e commit 0883b62
Show file tree
Hide file tree
Showing 49 changed files with 30,942 additions and 64 deletions.
75 changes: 38 additions & 37 deletions clients/mongo/mongo_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package mongo

import (
"testing"
)
Expand All @@ -8,90 +9,90 @@ func TestNoDatabase(t *testing.T) {
_, err := x.ToConnectionString()

if err == nil {
t.Error("database is required")
}
t.Error("database is required")
}
}

func TestDatabase(t *testing.T) {
x := Config{ Database: "admin"}
x := Config{Database: "admin"}
s, err := x.ToConnectionString()

if err != nil {
t.Error("should not error")
}
t.Error("should not error")
}
if s != "mongodb://localhost/admin?ssl=false" {
t.Errorf("found %v", s)
}
t.Errorf("found %v", s)
}
}

func TestScheme(t *testing.T) {
x := Config{ Database: "admin", Scheme: "mongodb+srv"}
x := Config{Database: "admin", Scheme: "mongodb+srv"}
s, err := x.ToConnectionString()

if err != nil {
t.Error("should not error")
}
t.Error("should not error")
}
if s != "mongodb+srv://localhost/admin?ssl=false" {
t.Errorf("found %v", s)
}
t.Errorf("found %v", s)
}
}

func TestUser(t *testing.T) {
x := Config{ Database: "admin", User: "derrick"}
x := Config{Database: "admin", User: "derrick"}
s, err := x.ToConnectionString()

if err != nil {
t.Error("should not error")
}
t.Error("should not error")
}
if s != "mongodb://derrick@localhost/admin?ssl=false" {
t.Errorf("found %v", s)
}
t.Errorf("found %v", s)
}
}

func TestPassword(t *testing.T) {
x := Config{ Database: "admin", User: "derrick", Password: "password"}
x := Config{Database: "admin", User: "derrick", Password: "password"}
s, err := x.ToConnectionString()

if err != nil {
t.Error("should not error")
}
t.Error("should not error")
}
if s != "mongodb://derrick:password@localhost/admin?ssl=false" {
t.Errorf("found %v", s)
}
t.Errorf("found %v", s)
}
}

func TestSsl(t *testing.T) {
x := Config{ Database: "admin", User: "derrick", Password: "password", Ssl: true}
x := Config{Database: "admin", User: "derrick", Password: "password", Ssl: true}
s, err := x.ToConnectionString()

if err != nil {
t.Error("should not error")
}
t.Error("should not error")
}
if s != "mongodb://derrick:password@localhost/admin?ssl=true" {
t.Errorf("found %v", s)
}
t.Errorf("found %v", s)
}
}

func TestHosts(t *testing.T) {
x := Config{ Database: "admin", User: "derrick", Password: "password", Ssl: true, Hosts: "mongodb1,mongodb2"}
x := Config{Database: "admin", User: "derrick", Password: "password", Ssl: true, Hosts: "mongodb1,mongodb2"}
s, err := x.ToConnectionString()

if err != nil {
t.Error("should not error")
}
t.Error("should not error")
}
if s != "mongodb://derrick:password@mongodb1,mongodb2/admin?ssl=true" {
t.Errorf("found %v", s)
}
t.Errorf("found %v", s)
}
}

func TestOptParams(t *testing.T) {
x := Config{ Database: "admin", User: "derrick", Password: "password", Ssl: true, Hosts: "mongodb1,mongodb2", OptParams: "x=y"}
x := Config{Database: "admin", User: "derrick", Password: "password", Ssl: true, Hosts: "mongodb1,mongodb2", OptParams: "x=y"}
s, err := x.ToConnectionString()

if err != nil {
t.Error("should not error")
}
t.Error("should not error")
}
if s != "mongodb://derrick:password@mongodb1,mongodb2/admin?ssl=true&x=y" {
t.Errorf("found %v", s)
}
t.Errorf("found %v", s)
}
}
3 changes: 3 additions & 0 deletions events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (k *CloudEventsConfig) LoadFromEnv() error {
k.SaramaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
k.SaramaConfig.Net.SASL.User = k.KafkaUsername
k.SaramaConfig.Net.SASL.Password = k.KafkaPassword
k.SaramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
}

return nil
Expand Down
56 changes: 32 additions & 24 deletions events/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

type SaramaConsumer struct {
config *CloudEventsConfig
consumerGroup sarama.ConsumerGroup
ready chan bool
topic string
Expand All @@ -23,33 +24,14 @@ func NewSaramaCloudEventsConsumer(config *CloudEventsConfig) (EventConsumer, err
return nil, err
}

cg, err := sarama.NewConsumerGroup(config.KafkaBrokers, config.KafkaConsumerGroup, config.SaramaConfig)
if err != nil {
return nil, err
}

var deadLetterProducer *KafkaCloudEventsProducer
if config.IsDeadLettersEnabled() {
deadLetterProducer, err = NewKafkaCloudEventsProducerForDeadLetters(config)
if err != nil {
return nil, err
}
}

return &SaramaConsumer{
consumerGroup: cg,
ready: make(chan bool),
topic: config.GetPrefixedTopic(),
handlers: make([]EventHandler, 0),
deadLetterProducer: deadLetterProducer,
config: config,
ready: make(chan bool),
topic: config.GetPrefixedTopic(),
handlers: make([]EventHandler, 0),
}, nil
}

func newDeadLetterProducerConfig(config CloudEventsConfig) CloudEventsConfig {
config.KafkaTopic = config.KafkaTopic + DeadLetterSuffix
return config
}

func (s *SaramaConsumer) Setup(session sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(s.ready)
Expand Down Expand Up @@ -99,6 +81,10 @@ func (s *SaramaConsumer) RegisterHandler(handler EventHandler) {
}

func (s *SaramaConsumer) Start(ctx context.Context) error {
if err := s.initialize(); err != nil {
return err
}

wg := &sync.WaitGroup{}
wg.Add(1)

Expand All @@ -109,7 +95,8 @@ func (s *SaramaConsumer) Start(ctx context.Context) error {
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := s.consumerGroup.Consume(ctx, []string{s.topic}, s); err != nil {
log.Panicf("Error from consumer: %v", err)
log.Printf("Error from consumer: %v", err)
return
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
Expand All @@ -122,3 +109,24 @@ func (s *SaramaConsumer) Start(ctx context.Context) error {
wg.Wait()
return s.consumerGroup.Close()
}

func (s *SaramaConsumer) initialize() error {
cg, err := sarama.NewConsumerGroup(
s.config.KafkaBrokers,
s.config.KafkaConsumerGroup,
s.config.SaramaConfig,
)
if err != nil {
return err
}

if s.config.IsDeadLettersEnabled() {
s.deadLetterProducer, err = NewKafkaCloudEventsProducerForDeadLetters(s.config)
if err != nil {
return err
}
}

s.consumerGroup = cg
return nil
}
6 changes: 5 additions & 1 deletion events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var _ Event = DeleteUserEvent{}

type DeleteUserEvent struct {
shoreline.UserData `json:",inline"`
ProfileFullName string `json:"fullName"`
}

func (d DeleteUserEvent) GetEventType() string {
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewUserEventsHandler(delegate UserEventsHandler) EventHandler {
}

var _ EventHandler = &delegatingUserEventsHandler{}

type delegatingUserEventsHandler struct {
delegate UserEventsHandler
}
Expand Down Expand Up @@ -108,7 +110,8 @@ func (d *delegatingUserEventsHandler) Handle(ce cloudevents.Event) error {
return nil
}

type DebugEventHandler struct {}
type DebugEventHandler struct{}

var _ EventHandler = &DebugEventHandler{}

func (d *DebugEventHandler) CanHandle(ce cloudevents.Event) bool {
Expand All @@ -121,6 +124,7 @@ func (d *DebugEventHandler) Handle(ce cloudevents.Event) error {
}

type NoopUserEventsHandler struct{}

var _ UserEventsHandler = &NoopUserEventsHandler{}

func (d *NoopUserEventsHandler) HandleUpdateUserEvent(payload UpdateUserEvent) error {
Expand Down
33 changes: 33 additions & 0 deletions events/scram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package events

import (
"crypto/sha512"
"github.com/xdg/scram"
"hash"
)

var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}
5 changes: 3 additions & 2 deletions events/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (

const (
producerRetryPeriod = 5 * time.Second
producerMaxRetries = 5
producerMaxRetries = 5
)

type EventProducer interface {
Send(ctx context.Context, event Event) error
}

var _ EventProducer = &KafkaCloudEventsProducer{}

type KafkaCloudEventsProducer struct {
client cloudevents.Client
source string
Expand Down Expand Up @@ -66,7 +67,7 @@ func (c *KafkaCloudEventsProducer) Send(ctx context.Context, event Event) error

func (c *KafkaCloudEventsProducer) SendCloudEvent(ctx context.Context, event cloudevents.Event) error {
return c.client.Send(
cloudevents.ContextWithRetriesExponentialBackoff(ctx, producerRetryPeriod,producerMaxRetries),
cloudevents.ContextWithRetriesExponentialBackoff(ctx, producerRetryPeriod, producerMaxRetries),
event,
)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ require (
github.com/cloudevents/sdk-go/v2 v2.2.0
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
github.com/kelseyhightower/envconfig v1.4.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgh
github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
Expand Down
Empty file.
11 changes: 11 additions & 0 deletions vendor/github.com/xdg/scram/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0883b62

Please sign in to comment.