Skip to content

Commit

Permalink
Merge pull request #4 from catmullet/maintenance
Browse files Browse the repository at this point in the history
Maintenance
  • Loading branch information
catmullet authored Nov 13, 2020
2 parents 8d9a797 + 1d9649a commit 0406f84
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 185 deletions.
17 changes: 1 addition & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,7 @@ If you wanted to process based on any change to the storage object you could pas
```go
key := one.MakeKey(event)
```
## Implementing the OneStore
Once you have created your key you're going to need a place to store it. This is so you can check against it later on.
Currently there are only two implementations of storage.
#### Local Storage _(Only for single instance Apps)_
```go
var oneStore OneStore
oneStore = localstore.NewLocalOneStore(ctx, time.Minute * 5)
```

#### Redis
```go
// import "gopkg.in/redis.v5" for redis.Options
Expand All @@ -71,11 +64,3 @@ if !ok {

// Key doesn't exist and was added to the one store
```
## Checking Keys
```go
exists, err := oneStore.HasKey(key)
if !exists {
// Key does not exist in the one store
}

// Key exists
37 changes: 23 additions & 14 deletions examples/expiredkey/expiredkey_example.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"context"
"github.com/catmullet/one"
"github.com/catmullet/one/localstore"
"github.com/catmullet/one/redisstore"
"gopkg.in/redis.v5"
"log"
"time"
)
Expand All @@ -16,30 +16,35 @@ type Event struct {
}

func main() {
ctx := context.Background()

evt := Event{
Bucket: "gcs://test-bucket",
Object: "test_data.txt",
Version: 1,
UpdateTime: time.Now(),
}

var localStore one.OneStore
localStore = localstore.NewLocalOneStore(ctx, time.Second*2)
var redisStore one.Store
redisStore = redisstore.NewRedisStore(&redis.Options{
Network: "tcp",
Addr: "localhost:6379",
}, time.Second*2)

log.Println("PubSub messages coming in for storage events...")

for i := 0; i < 10; i++ {
evt.Version = i
oneKey := one.MakeKey(evt.Bucket, evt.Object, evt.Version)

err := localStore.AddKey(oneKey)
if err == one.ErrKeyExist {
ok, err := redisStore.AddKey(oneKey)
if err != nil {
log.Println("error", err)
continue
}
if !ok {
log.Println("key already exists")
} else {
log.Println("key was added")
continue
}
log.Println("key was added")
}

log.Println("Sleep to let ttl pass on keys...")
Expand All @@ -51,12 +56,16 @@ func main() {
evt.Version = i
oneKey := one.MakeKey(evt.Bucket, evt.Object, evt.Version)

err := localStore.AddKey(oneKey)
if err == one.ErrKeyExist {
ok, err := redisStore.AddKey(oneKey)
if err != nil {
log.Println("error", err)
continue
}
if !ok {
log.Println("key already exists")
} else {
log.Println("key was added")
continue
}
log.Println("key was added")
}

log.Println("finished")
Expand Down
37 changes: 23 additions & 14 deletions examples/storageevent/storageevent_example.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"context"
"github.com/catmullet/one"
"github.com/catmullet/one/localstore"
"github.com/catmullet/one/redisstore"
"gopkg.in/redis.v5"
"log"
"time"
)
Expand All @@ -16,30 +16,35 @@ type Event struct {
}

func main() {
ctx := context.Background()

evt := Event{
Bucket: "gcs://test-bucket",
Object: "test_data.txt",
Version: 1,
UpdateTime: time.Now(),
}

var localStore one.OneStore
localStore = localstore.NewLocalOneStore(ctx, time.Second*15)
var redisStore one.Store
redisStore = redisstore.NewRedisStore(&redis.Options{
Network: "tcp",
Addr: "localhost:6379",
}, time.Second*15)

log.Println("PubSub messages coming in for storage events...")

for i := 0; i < 10; i++ {
evt.Version = i
oneKey := one.MakeKey(evt.Bucket, evt.Object, evt.Version)

err := localStore.AddKey(oneKey)
if err == one.ErrKeyExist {
ok, err := redisStore.AddKey(oneKey)
if err != nil {
log.Println("error", err)
continue
}
if !ok {
log.Println("key already exists")
} else {
log.Println("key was added")
continue
}
log.Println("key was added")
}

log.Println("Duplicate PubSub messages coming in for storage events...")
Expand All @@ -48,12 +53,16 @@ func main() {
evt.Version = i
oneKey := one.MakeKey(evt.Bucket, evt.Object, evt.Version)

err := localStore.AddKey(oneKey)
if err == one.ErrKeyExist {
ok, err := redisStore.AddKey(oneKey)
if err != nil {
log.Println("error", err)
continue
}
if !ok {
log.Println("key already exists")
} else {
log.Println("key was added")
continue
}
log.Println("key was added")
}

log.Println("finished")
Expand Down
108 changes: 0 additions & 108 deletions localstore/localonestore.go

This file was deleted.

2 changes: 1 addition & 1 deletion one.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
)

// MakeKey Creates key based on field values
// MakeKey Creates key based on field values.
func MakeKey(fields ...interface{}) (key string) {
var sha = sha256.New()
sha.Write([]byte(fmt.Sprintf("%v", fields...)))
Expand Down
12 changes: 4 additions & 8 deletions one_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ package one
import "fmt"

var (
// ErrKeyExist Error for notifying of existing key
ErrKeyExist = fmt.Errorf("Error: Key Exists")

// ErrNoKeyExist Error for notifying of no key exists
ErrNoKeyExist = fmt.Errorf("Error: No Key Exists")
// ErrKeyExist Error for notifying of existing key.
ErrKeyExist = fmt.Errorf("error: key exists")
)

// OneStore interface for existing and custom key storage
type OneStore interface {
// OneStore interface for existing and custom key storage.
type Store interface {
AddKey(key string) (ok bool, err error)
HasKey(key string) (exists bool, err error)
}
40 changes: 16 additions & 24 deletions redisstore/redisonestore.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,36 @@
package redisstore

import (
"github.com/catmullet/one"
"github.com/Kochava/collective2-ingestion-file-watcher/pkg/idempotency"
"gopkg.in/redis.v5"
"time"
)

// RedisOneStore struct to hold redis client and ttl
type RedisOneStore struct {
// RedisStore struct to hold redis client and ttl.
type RedisStore struct {
client *redis.Client
ttl time.Duration
}

// NewRedisOneStore returns a new RedisOneStore with redis client using gopkg.in/redis.v5
func NewRedisOneStore(options *redis.Options, ttl time.Duration) *RedisOneStore {
return &RedisOneStore{
// NewRedisStore returns a new RedisStore with redis client using https://gopkg.in/redis.v5.
func NewRedisStore(options *redis.Options, ttl time.Duration) *RedisStore {
return &RedisStore{
client: redis.NewClient(options),
ttl: ttl,
}
}

// AddKey adds key to redis if one does not exist otherwise returns error
func (rds *RedisOneStore) AddKey(key string) (ok bool, err error) {
isExisting, err := rds.HasKey(key)
if isExisting || err == nil {
return
}

_, err = rds.client.Set(key, nil, rds.ttl).Result()
// AddKey adds key to redis if one does not exist otherwise returns error.
func (rds *RedisStore) AddKey(key string) (bool, error) {
incr, err := rds.client.Incr(key).Result()
if err != nil {
return
return false, err
}
return true, nil
}

// HasKey returns error if key does not exist
func (rds *RedisOneStore) HasKey(key string) (exists bool, err error) {
_, err = rds.client.Get(key).Result()
if err != nil {
return false, one.ErrNoKeyExist
if incr > 1 {
return false, idempotency.ErrKeyExists()
}
return true, nil

_, err = rds.client.Set(key, int64(1), rds.ttl).Result()

return err == nil, err
}

0 comments on commit 0406f84

Please sign in to comment.