Skip to content

Commit

Permalink
MF-521 - Remove the dynamic creation of tables (#530)
Browse files Browse the repository at this point in the history
* MF-521 - Remove the dynamic creation of tables

* Add consts

* Rename the JSON Transformer field from valueFields to valuesFilter

* Fix the indentation
  • Loading branch information
majabirmancevic authored Oct 2, 2024
1 parent 5c0f00f commit eea9e08
Show file tree
Hide file tree
Showing 28 changed files with 586 additions and 460 deletions.
7 changes: 5 additions & 2 deletions consumers/writers/influxdb/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
influxdb2write "github.com/influxdata/influxdb-client-go/v2/api/write"
)

const senmlPoints = "messages"
const (
senmlPoints = "messages"
jsonPoints = "json"
)

var _ consumers.Consumer = (*influxRepo)(nil)

Expand Down Expand Up @@ -123,7 +126,7 @@ func (repo *influxRepo) jsonPoints(msgs json.Messages) ([]*influxdb2write.Point,
"subtopic": m.Subtopic,
"publisher": m.Publisher,
}
pt := influxdb2.NewPoint(msgs.Format, tags, fields, t)
pt := influxdb2.NewPoint(jsonPoints, tags, fields, t)
pts = append(pts, pt)
}

Expand Down
26 changes: 12 additions & 14 deletions consumers/writers/influxdb/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import (
"github.com/stretchr/testify/require"
)

const valueFields = 5
const (
valueFields = 5
httpProt = "http"
mqttProt = "mqtt"
)

var (
testLog, _ = log.New(os.Stdout, log.Info.String())
Expand All @@ -37,7 +41,7 @@ var (

rowCountJson = fmt.Sprintf(`from(bucket: "%s")
|> range(start: -1h, stop: 1h)
|> filter(fn: (r) => r["_measurement"] == "some_json")
|> filter(fn: (r) => r["_measurement"] == "json")
|> filter(fn: (r) => r["_field"] == "field_1" or r["_field"] == "field_2" or r["_field"] == "field_3" or r["_field"] == "field_4" or r["_field"] == "field_5/field_1" or r["_field"] == "field_5/field_2")
|> count()
|> yield(name: "count")`, repoCfg.Bucket)
Expand Down Expand Up @@ -149,7 +153,7 @@ func TestSaveSenML(t *testing.T) {
for i := 0; i < tc.msgsNum; i++ {
msg := senml.Message{
Publisher: pubID,
Protocol: "http",
Protocol: httpProt,
Name: "test name",
Unit: "km",
UpdateTime: 5456565466,
Expand Down Expand Up @@ -191,8 +195,8 @@ func TestSaveJSON(t *testing.T) {
msg := json.Message{
Publisher: pubID,
Created: time.Now().UnixNano(),
Subtopic: "subtopic/format/some_json",
Protocol: "mqtt",
Subtopic: subtopic,
Protocol: mqttProt,
Payload: map[string]interface{}{
"field_1": 123,
"field_2": "value",
Expand Down Expand Up @@ -231,15 +235,9 @@ func TestSaveJSON(t *testing.T) {
}

now := time.Now().UnixNano()
msgs := json.Messages{
Format: "some_json",
}
invalidKeySepMsgs := json.Messages{
Format: "some_json",
}
invalidKeyNameMsgs := json.Messages{
Format: "some_json",
}
msgs := json.Messages{}
invalidKeySepMsgs := json.Messages{}
invalidKeyNameMsgs := json.Messages{}

for i := 0; i < streamsSize; i++ {
msg.Created = now
Expand Down
7 changes: 5 additions & 2 deletions consumers/writers/mongodb/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"github.com/MainfluxLabs/mainflux/pkg/transformers/senml"
)

const senmlCollection string = "messages"
const (
senmlCollection = "messages"
jsonCollection = "json"
)

var _ consumers.Consumer = (*mongoRepo)(nil)

Expand Down Expand Up @@ -62,7 +65,7 @@ func (repo *mongoRepo) saveJSON(msgs json.Messages) error {
m = append(m, msg)
}

coll := repo.db.Collection(msgs.Format)
coll := repo.db.Collection(jsonCollection)

_, err := coll.InsertMany(context.Background(), m)
if err != nil {
Expand Down
9 changes: 4 additions & 5 deletions consumers/writers/mongodb/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
msgsNum = 100
valueFields = 5
subtopic = "topic"
mqttProt = "mqtt"
)

var (
Expand Down Expand Up @@ -103,8 +104,8 @@ func TestSaveJSON(t *testing.T) {
msg := json.Message{
Publisher: pubid.String(),
Created: time.Now().Unix(),
Subtopic: "subtopic/format/some_json",
Protocol: "mqtt",
Subtopic: subtopic,
Protocol: mqttProt,
Payload: map[string]interface{}{
"field_1": 123,
"field_2": "value",
Expand All @@ -118,9 +119,7 @@ func TestSaveJSON(t *testing.T) {
}

now := time.Now().Unix()
msgs := json.Messages{
Format: "some_json",
}
msgs := json.Messages{}

for i := 0; i < msgsNum; i++ {
msg.Created = now + int64(i)
Expand Down
44 changes: 8 additions & 36 deletions consumers/writers/postgres/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package postgres
import (
"context"
"encoding/json"
"fmt"

"github.com/MainfluxLabs/mainflux/consumers"
"github.com/MainfluxLabs/mainflux/pkg/errors"
Expand All @@ -20,7 +19,6 @@ import (
var (
errInvalidMessage = errors.New("invalid message representation")
errTransRollback = errors.New("failed to rollback transaction")
errNoTable = errors.New("relation does not exist")
)

var _ consumers.Consumer = (*postgresRepo)(nil)
Expand Down Expand Up @@ -89,20 +87,14 @@ func (pr postgresRepo) saveSenml(messages interface{}) (err error) {
return err
}

func (pr postgresRepo) saveJSON(msgs mfjson.Messages) error {
if err := pr.insertJSON(msgs); err != nil {
if err == errNoTable {
if err := pr.createTable(msgs.Format); err != nil {
return err
}
return pr.insertJSON(msgs)
}
return err
func (pr postgresRepo) saveJSON(messages interface{}) error {
msgs, ok := messages.(mfjson.Messages)
if !ok {
return errors.ErrSaveMessage
}
return nil
}
q := `INSERT INTO json (created, subtopic, publisher, protocol, payload)
VALUES (:created, :subtopic, :publisher, :protocol, :payload);`

func (pr postgresRepo) insertJSON(msgs mfjson.Messages) error {
tx, err := pr.db.BeginTxx(context.Background(), nil)
if err != nil {
return errors.Wrap(errors.ErrSaveMessage, err)
Expand All @@ -120,10 +112,6 @@ func (pr postgresRepo) insertJSON(msgs mfjson.Messages) error {
}
}()

q := `INSERT INTO %s (created, subtopic, publisher, protocol, payload)
VALUES (:created, :subtopic, :publisher, :protocol, :payload);`
q = fmt.Sprintf(q, msgs.Format)

for _, m := range msgs.Data {
var dbmsg jsonMessage
dbmsg, err = toJSONMessage(m)
Expand All @@ -137,28 +125,12 @@ func (pr postgresRepo) insertJSON(msgs mfjson.Messages) error {
switch pgErr.Code {
case pgerrcode.InvalidTextRepresentation:
return errors.Wrap(errors.ErrSaveMessage, errInvalidMessage)
case pgerrcode.UndefinedTable:
return errNoTable
}
}
return err

return errors.Wrap(errors.ErrSaveMessage, err)
}
}
return nil
}

func (pr postgresRepo) createTable(name string) error {
q := `CREATE TABLE IF NOT EXISTS %s (
created BIGINT,
subtopic VARCHAR(254),
publisher VARCHAR(254),
protocol TEXT,
payload JSONB,
PRIMARY KEY (publisher, subtopic, created)
)`
q = fmt.Sprintf(q, name)

_, err := pr.db.Exec(q)
return err
}

Expand Down
9 changes: 4 additions & 5 deletions consumers/writers/postgres/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
const (
msgsNum = 42
valueFields = 5
mqttProt = "mqtt"
subtopic = "topic"
)

Expand Down Expand Up @@ -77,8 +78,8 @@ func TestSaveJSON(t *testing.T) {
msg := json.Message{
Publisher: pubid.String(),
Created: time.Now().Unix(),
Subtopic: "subtopic/format/some_json",
Protocol: "mqtt",
Subtopic: subtopic,
Protocol: mqttProt,
Payload: map[string]interface{}{
"field_1": 123,
"field_2": "value",
Expand All @@ -92,9 +93,7 @@ func TestSaveJSON(t *testing.T) {
}

now := time.Now().Unix()
msgs := json.Messages{
Format: "some_json",
}
msgs := json.Messages{}

for i := 0; i < msgsNum; i++ {
msg.Created = now + int64(i)
Expand Down
37 changes: 23 additions & 14 deletions consumers/writers/postgres/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,32 @@ func migrateDB(db *sqlx.DB) error {
Id: "messages_1",
Up: []string{
`CREATE TABLE IF NOT EXISTS messages (
subtopic VARCHAR(254),
publisher UUID,
protocol TEXT,
name TEXT,
unit TEXT,
value FLOAT,
string_value TEXT,
bool_value BOOL,
data_value BYTEA,
sum FLOAT,
time FLOAT,
update_time FLOAT,
PRIMARY KEY (time, publisher, subtopic, name)
)`,
subtopic VARCHAR(254),
publisher UUID,
protocol TEXT,
name TEXT,
unit TEXT,
value FLOAT,
string_value TEXT,
bool_value BOOL,
data_value BYTEA,
sum FLOAT,
time FLOAT,
update_time FLOAT,
PRIMARY KEY (time, publisher, subtopic, name)
)`,
`CREATE TABLE IF NOT EXISTS json (
created BIGINT,
subtopic VARCHAR(254),
publisher VARCHAR(254),
protocol TEXT,
payload JSONB,
PRIMARY KEY (publisher, subtopic, created)
)`,
},
Down: []string{
"DROP TABLE messages",
"DROP TABLE json",
},
},
},
Expand Down
45 changes: 9 additions & 36 deletions consumers/writers/timescale/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package timescale
import (
"context"
"encoding/json"
"fmt"

"github.com/MainfluxLabs/mainflux/consumers"
"github.com/MainfluxLabs/mainflux/pkg/errors"
Expand All @@ -20,7 +19,6 @@ import (
var (
errInvalidMessage = errors.New("invalid message representation")
errTransRollback = errors.New("failed to rollback transaction")
errNoTable = errors.New("relation does not exist")
)

var _ consumers.Consumer = (*timescaleRepo)(nil)
Expand Down Expand Up @@ -89,20 +87,14 @@ func (tr timescaleRepo) saveSenml(messages interface{}) (err error) {
return err
}

func (tr timescaleRepo) saveJSON(msgs mfjson.Messages) error {
if err := tr.insertJSON(msgs); err != nil {
if err == errNoTable {
if err := tr.createTable(msgs.Format); err != nil {
return err
}
return tr.insertJSON(msgs)
}
return err
func (tr timescaleRepo) saveJSON(messages interface{}) error {
msgs, ok := messages.(mfjson.Messages)
if !ok {
return errors.ErrSaveMessage
}
return nil
}
q := `INSERT INTO json (created, subtopic, publisher, protocol, payload)
VALUES (:created, :subtopic, :publisher, :protocol, :payload);`

func (tr timescaleRepo) insertJSON(msgs mfjson.Messages) error {
tx, err := tr.db.BeginTxx(context.Background(), nil)
if err != nil {
return errors.Wrap(errors.ErrSaveMessage, err)
Expand All @@ -120,44 +112,25 @@ func (tr timescaleRepo) insertJSON(msgs mfjson.Messages) error {
}
}()

q := `INSERT INTO %s (created, subtopic, publisher, protocol, payload)
VALUES (:created, :subtopic, :publisher, :protocol, :payload);`
q = fmt.Sprintf(q, msgs.Format)

for _, m := range msgs.Data {
var dbmsg jsonMessage
dbmsg, err = toJSONMessage(m)
if err != nil {
return errors.Wrap(errors.ErrSaveMessage, err)
}

if _, err = tx.NamedExec(q, dbmsg); err != nil {
pgErr, ok := err.(*pgconn.PgError)
if ok {
switch pgErr.Code {
case pgerrcode.InvalidTextRepresentation:
return errors.Wrap(errors.ErrSaveMessage, errInvalidMessage)
case pgerrcode.UndefinedTable:
return errNoTable
}
}
return err

return errors.Wrap(errors.ErrSaveMessage, err)
}
}
return nil
}

func (tr timescaleRepo) createTable(name string) error {
q := `CREATE TABLE IF NOT EXISTS %s (
created BIGINT NOT NULL,
subtopic VARCHAR(254),
publisher VARCHAR(254),
protocol TEXT,
payload JSONB,
PRIMARY KEY (created, publisher, subtopic)
);`
q = fmt.Sprintf(q, name)

_, err := tr.db.Exec(q)
return err
}

Expand Down
Loading

0 comments on commit eea9e08

Please sign in to comment.