From ccf990dee03fb58c09cb4a9aae2d7cf06d8b4392 Mon Sep 17 00:00:00 2001 From: Mikhail Sazonov Date: Wed, 12 Jun 2024 21:42:05 +0300 Subject: [PATCH] HW#14 --- hw12_13_14_15_calendar/cmd/scheduler/main.go | 389 +++++++++--------- hw12_13_14_15_calendar/cmd/sender/main.go | 156 +++---- .../internal/queue/rabbit/consumer.go | 192 ++++----- .../internal/queue/rabbit/producer.go | 173 ++++---- 4 files changed, 464 insertions(+), 446 deletions(-) diff --git a/hw12_13_14_15_calendar/cmd/scheduler/main.go b/hw12_13_14_15_calendar/cmd/scheduler/main.go index 34fedca..7b9ef11 100644 --- a/hw12_13_14_15_calendar/cmd/scheduler/main.go +++ b/hw12_13_14_15_calendar/cmd/scheduler/main.go @@ -1,188 +1,201 @@ -package main - -import ( - "context" - "fmt" - "os" - "os/signal" - "sync" - "syscall" - "time" - - "github.com/spf13/pflag" - "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/app" - "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/logger" - rabbit "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/queue/rabbit" - sqlstorage "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/storage/sql" -) - -var configFile string - -func init() { - pflag.StringVar(&configFile, "config", "/etc/calendar/scheduler_config.toml", "Path to configuration file") - pflag.Parse() -} - -func main() { - config, err := NewConfig(configFile) - if err != nil { - fmt.Printf("error reading config: %v\n", err) - return - } - - log, err := logger.New(config.Logger.Level, os.Stderr) - if err != nil { - fmt.Printf("error creating logger: %v\n", err) - return - } - - log.Debug("created logger", log) - - ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - defer cancel() - - done := make(chan bool) - - var storage app.Storage - switch config.Storage.Type { - case "inmemory": - log.Error("error creating storage: inmemory storage not allowed here") - return - case "postgres": - sqlStorage := sqlstorage.New(config.Postgres.Dsn) - ctx := context.Background() - err = sqlStorage.Connect(ctx) - if err != nil { - log.Error(fmt.Sprintf("error connecting to database: %v\n", err)) - return - } - defer sqlStorage.Close(ctx) - storage = sqlStorage - default: - log.Error("error creating storage: unknown storage type") - return - } - log.Debug("create storage", storage) - - producer := rabbit.NewProducer(config.Rabbit.URI, config.Rabbit.Exchange) - defer producer.Disconnect() - - err = producer.Connect() - if err != nil { - log.Error(fmt.Sprint("connect error:", err)) - return - } - log.Debug("create producer and connected", producer) - - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - runScheduler( - context.Background(), - log, - storage, - config.Scheduler.EventsNotifyCheckFrequency, - done, - producer, - ) - }() - - wg.Add(1) - go func() { - defer wg.Done() - runOldEventsCleaner( - context.Background(), - log, - storage, - config.Scheduler.OldEventsCleanerFrequency, - done, - ) - }() - - <-ctx.Done() - close(done) - - wg.Wait() -} - -func runScheduler( - ctx context.Context, - log *logger.Logger, - storage app.Storage, - frequency time.Duration, - doneCh <-chan bool, - producer *rabbit.Producer, -) { - checkEventsForNotify(ctx, log, storage, producer) - - ticker := time.NewTicker(frequency) - defer ticker.Stop() - - for { - select { - case <-doneCh: - log.Info("Stopping events notify checker") - return - case <-ticker.C: - checkEventsForNotify(ctx, log, storage, producer) - } - } -} - -func runOldEventsCleaner( - ctx context.Context, - log *logger.Logger, - storage app.Storage, - frequency time.Duration, - doneCh <-chan bool, -) { - removeOldEvents(ctx, log, storage) - - ticker := time.NewTicker(frequency) - defer ticker.Stop() - - for { - select { - case <-doneCh: - log.Info("Stopping old events cleaner") - return - case <-ticker.C: - removeOldEvents(ctx, log, storage) - } - } -} - -func checkEventsForNotify(ctx context.Context, log *logger.Logger, storage app.Storage, producer *rabbit.Producer) { - events := storage.GetEventsForNotify(ctx, time.Now().Format(time.DateOnly)) - log.Info(fmt.Sprintf("Fetched events for notify: %d", len(events))) - - for i := range events { - err := producer.ProduceEvent(events[i]) - if err != nil { - log.Error(fmt.Sprint("error consume event:", err)) - } else { - event := events[i] - event.Notified = true - err = storage.UpdateEvent(ctx, event.ID, event) - if err != nil { - log.Error(fmt.Sprint("error updating event:", err)) - } - log.Info("Consume event: " + events[i].ID) - } - } -} - -func removeOldEvents(ctx context.Context, log *logger.Logger, storage app.Storage) { - yearAgo := time.Now().AddDate(-1, 0, 0) - events := storage.GetEventsListByDates(ctx, nil, &yearAgo) - - log.Info(fmt.Sprintf("Fetched old events for clean: %d", len(events))) - - for i := range events { - err := storage.DeleteEvent(ctx, events[i].ID) - if err != nil { - log.Error(fmt.Sprint("error while deleting old event:", err)) - } - } -} +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/spf13/pflag" + "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/app" + "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/logger" + rabbit "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/queue/rabbit" + sqlstorage "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/storage/sql" +) + +var configFile string + +func init() { + pflag.StringVar(&configFile, "config", "/etc/calendar/scheduler_config.toml", "Path to configuration file") + pflag.Parse() +} + +func main() { + config, err := NewConfig(configFile) + if err != nil { + fmt.Printf("error reading config: %v\n", err) + return + } + + log, err := logger.New(config.Logger.Level, os.Stderr) + if err != nil { + fmt.Printf("error creating logger: %v\n", err) + return + } + + log.Debug("created logger", log) + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + defer cancel() + + done := make(chan bool) + + var storage app.Storage + + switch config.Storage.Type { + case "inmemory": + log.Error("error creating storage: inmemory storage not allowed here") + return + case "postgres": + sqlStorage := sqlstorage.New(config.Postgres.Dsn) + + ctx := context.Background() + err = sqlStorage.Connect(ctx) + if err != nil { + log.Error(fmt.Sprintf("error connecting to database: %v\n", err)) + return + } + defer sqlStorage.Close(ctx) + + storage = sqlStorage + + default: + log.Error("error creating storage: unknown storage type") + return + } + + log.Debug("create storage", storage) + + producer := rabbit.NewProducer(config.Rabbit.URI, config.Rabbit.Exchange) + + err = producer.Connect() + if err != nil { + log.Error(fmt.Sprint("connect error:", err)) + return + } + defer producer.Disconnect() + + log.Debug("create producer and connected", producer) + + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + runScheduler( + context.Background(), + log, + storage, + config.Scheduler.EventsNotifyCheckFrequency, + done, + producer, + ) + }() + + wg.Add(1) + go func() { + defer wg.Done() + runOldEventsCleaner( + context.Background(), + log, + storage, + config.Scheduler.OldEventsCleanerFrequency, + done, + ) + }() + + <-ctx.Done() + + close(done) + + wg.Wait() +} + +func runScheduler( + ctx context.Context, + log *logger.Logger, + storage app.Storage, + frequency time.Duration, + doneCh <-chan bool, + producer *rabbit.Producer, +) { + checkEventsForNotify(ctx, log, storage, producer) + + ticker := time.NewTicker(frequency) + defer ticker.Stop() + + for { + select { + case <-doneCh: + log.Info("Stopping events notify checker") + return + case <-ticker.C: + checkEventsForNotify(ctx, log, storage, producer) + } + } +} + +func runOldEventsCleaner( + ctx context.Context, + log *logger.Logger, + storage app.Storage, + frequency time.Duration, + doneCh <-chan bool, +) { + removeOldEvents(ctx, log, storage) + + ticker := time.NewTicker(frequency) + defer ticker.Stop() + + for { + select { + case <-doneCh: + log.Info("Stopping old events cleaner") + return + case <-ticker.C: + removeOldEvents(ctx, log, storage) + } + } +} + +func checkEventsForNotify(ctx context.Context, log *logger.Logger, storage app.Storage, producer *rabbit.Producer) { + events := storage.GetEventsForNotify(ctx, time.Now().Format(time.DateOnly)) + + log.Info(fmt.Sprintf("Fetched events for notify: %d", len(events))) + + for i := range events { + event := events[i] + + err := producer.ProduceEvent(event) + + if err != nil { + log.Error(fmt.Sprint("error consume event:", err)) + } else { + event.Notified = true + + err = storage.UpdateEvent(ctx, event.ID, event) + if err != nil { + log.Error(fmt.Sprint("error updating event:", err)) + } + + log.Info("Consume event: " + events[i].ID) + } + } +} + +func removeOldEvents(ctx context.Context, log *logger.Logger, storage app.Storage) { + yearAgo := time.Now().AddDate(-1, 0, 0) + events := storage.GetEventsListByDates(ctx, nil, &yearAgo) + + log.Info(fmt.Sprintf("Fetched old events for clean: %d", len(events))) + + for i := range events { + err := storage.DeleteEvent(ctx, events[i].ID) + if err != nil { + log.Error(fmt.Sprint("error while deleting old event:", err)) + } + } +} diff --git a/hw12_13_14_15_calendar/cmd/sender/main.go b/hw12_13_14_15_calendar/cmd/sender/main.go index 7e65246..427b5e6 100644 --- a/hw12_13_14_15_calendar/cmd/sender/main.go +++ b/hw12_13_14_15_calendar/cmd/sender/main.go @@ -1,76 +1,80 @@ -package main - -import ( - "context" - "fmt" - "os" - "os/signal" - "syscall" - - "github.com/spf13/pflag" - "github.com/streadway/amqp" - "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/logger" - rabbit "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/queue/rabbit" -) - -var configFile string - -func init() { - pflag.StringVar(&configFile, "config", "/etc/calendar/scheduler_config.toml", "Path to configuration file") - pflag.Parse() -} - -func main() { - config, err := NewConfig(configFile) - if err != nil { - fmt.Printf("error reading config: %v\n", err) - return - } - - log, err := logger.New(config.Logger.Level, os.Stderr) - if err != nil { - fmt.Printf("error creating logger: %v\n", err) - return - } - - log.Debug("created logger", log) - - ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - defer cancel() - - consumer := rabbit.NewConsumer(config.Rabbit.URI, config.Rabbit.Exchange, config.Rabbit.Queue) - defer consumer.Disconnect() - log.Debug("create consumer", consumer) - - err = consumer.Connect() - if err != nil { - log.Error(fmt.Sprint("connect error:", err)) - return - } - log.Debug("connect consumer") - - deliveries, err := consumer.ConsumeEvents() - if err != nil { - fmt.Printf("error consuming events: %v\n", err) - return - } - - go handleEvents(log, deliveries) - - <-ctx.Done() -} - -func handleEvents(log *logger.Logger, deliveries <-chan amqp.Delivery) { - for d := range deliveries { - log.Info( - fmt.Sprintf( - "got %dB delivery: [%v] %q", - len(d.Body), - d.DeliveryTag, - d.Body, - ), - ) - d.Ack(false) - } - log.Debug("handle: deliveries channel closed") -} +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/spf13/pflag" + "github.com/streadway/amqp" + "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/logger" + rabbit "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/queue/rabbit" +) + +var configFile string + +func init() { + pflag.StringVar(&configFile, "config", "/etc/calendar/scheduler_config.toml", "Path to configuration file") + pflag.Parse() +} + +func main() { + config, err := NewConfig(configFile) + if err != nil { + fmt.Printf("error reading config: %v\n", err) + return + } + + log, err := logger.New(config.Logger.Level, os.Stderr) + if err != nil { + fmt.Printf("error creating logger: %v\n", err) + return + } + + log.Debug("created logger", log) + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + defer cancel() + + consumer := rabbit.NewConsumer(config.Rabbit.URI, config.Rabbit.Exchange, config.Rabbit.Queue) + defer consumer.Disconnect() + + log.Debug("create consumer", consumer) + + err = consumer.Connect() + if err != nil { + log.Error(fmt.Sprint("connect error:", err)) + return + } + + log.Debug("connect consumer") + + deliveries, err := consumer.ConsumeEvents() + if err != nil { + fmt.Printf("error consuming events: %v\n", err) + return + } + + go handleEvents(log, deliveries) + + <-ctx.Done() +} + +func handleEvents(log *logger.Logger, deliveries <-chan amqp.Delivery) { + for d := range deliveries { + log.Info( + fmt.Sprintf( + "got %dB delivery: [%v] %q", + len(d.Body), + d.DeliveryTag, + d.Body, + ), + ) + + d.Ack(false) + } + + log.Debug("handle: deliveries channel closed") +} diff --git a/hw12_13_14_15_calendar/internal/queue/rabbit/consumer.go b/hw12_13_14_15_calendar/internal/queue/rabbit/consumer.go index b360469..3eb6989 100644 --- a/hw12_13_14_15_calendar/internal/queue/rabbit/consumer.go +++ b/hw12_13_14_15_calendar/internal/queue/rabbit/consumer.go @@ -1,96 +1,96 @@ -package rabbit - -import ( - "fmt" - - "github.com/streadway/amqp" -) - -type Consumer struct { - uri string - exchange string - queue string - connection *amqp.Connection - channel *amqp.Channel -} - -func NewConsumer(uri, exchange, queue string) *Consumer { - return &Consumer{ - uri: uri, - exchange: exchange, - queue: queue, - } -} - -func (c *Consumer) Connect() error { - connection, err := amqp.Dial(c.uri) - if err != nil { - return fmt.Errorf("dial error: %w", err) - } - - channel, err := connection.Channel() - if err != nil { - return fmt.Errorf("channel error: %w", err) - } - - if err = channel.ExchangeDeclare( - c.exchange, // name - "direct", // type - true, // durable - false, // auto-deleted - false, // internal - false, // noWait - nil, // arguments - ); err != nil { - return fmt.Errorf("exchange declare error: %w", err) - } - - if _, err = channel.QueueDeclare( - c.queue, // name of the queue - true, // durable - false, // delete when unused - false, // exclusive - false, // noWait - nil, // arguments - ); err != nil { - return fmt.Errorf("queue declare error: %w", err) - } - - if err = channel.QueueBind( - c.queue, // name of the queue - "", // bindingKey - c.exchange, // sourceExchange - false, // noWait - nil, // arguments - ); err != nil { - return fmt.Errorf("queue bind error: %w", err) - } - - c.connection = connection - c.channel = channel - - return nil -} - -func (c *Consumer) Disconnect() { - if c.connection != nil { - c.connection.Close() - } -} - -func (c *Consumer) ConsumeEvents() (<-chan amqp.Delivery, error) { - deliveries, err := c.channel.Consume( - c.queue, - "simple-consumer", - false, // noAck - false, // exclusive - false, // noLocal - false, // noWait - nil, // arguments - ) - if err != nil { - return nil, fmt.Errorf("queue consume error: %w", err) - } - - return deliveries, nil -} +package rabbit + +import ( + "fmt" + + "github.com/streadway/amqp" +) + +type Consumer struct { + uri string + exchange string + queue string + connection *amqp.Connection + channel *amqp.Channel +} + +func NewConsumer(uri, exchange, queue string) *Consumer { + return &Consumer{ + uri: uri, + exchange: exchange, + queue: queue, + } +} + +func (c *Consumer) Connect() error { + connection, err := amqp.Dial(c.uri) + if err != nil { + return fmt.Errorf("dial error: %w", err) + } + + channel, err := connection.Channel() + if err != nil { + return fmt.Errorf("channel error: %w", err) + } + + if err = channel.ExchangeDeclare( + c.exchange, // name + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // noWait + nil, // arguments + ); err != nil { + return fmt.Errorf("exchange declare error: %w", err) + } + + if _, err = channel.QueueDeclare( + c.queue, // name of the queue + true, // durable + false, // delete when unused + false, // exclusive + false, // noWait + nil, // arguments + ); err != nil { + return fmt.Errorf("queue declare error: %w", err) + } + + if err = channel.QueueBind( + c.queue, // name of the queue + "", // bindingKey + c.exchange, // sourceExchange + false, // noWait + nil, // arguments + ); err != nil { + return fmt.Errorf("queue bind error: %w", err) + } + + c.connection = connection + c.channel = channel + + return nil +} + +func (c *Consumer) Disconnect() { + if c.connection != nil { + c.connection.Close() + } +} + +func (c *Consumer) ConsumeEvents() (<-chan amqp.Delivery, error) { + deliveries, err := c.channel.Consume( + c.queue, + "simple-consumer", + false, // noAck + false, // exclusive + false, // noLocal + false, // noWait + nil, // arguments + ) + if err != nil { + return nil, fmt.Errorf("queue consume error: %w", err) + } + + return deliveries, nil +} diff --git a/hw12_13_14_15_calendar/internal/queue/rabbit/producer.go b/hw12_13_14_15_calendar/internal/queue/rabbit/producer.go index 575be78..fd0d7a3 100644 --- a/hw12_13_14_15_calendar/internal/queue/rabbit/producer.go +++ b/hw12_13_14_15_calendar/internal/queue/rabbit/producer.go @@ -1,86 +1,87 @@ -package rabbit - -import ( - "fmt" - - "github.com/streadway/amqp" - "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/storage" -) - -type Producer struct { - uri string - exchange string - connection *amqp.Connection - channel *amqp.Channel -} - -func NewProducer(uri, exchange string) *Producer { - return &Producer{ - uri: uri, - exchange: exchange, - } -} - -func (p *Producer) Connect() error { - connection, err := amqp.Dial(p.uri) - if err != nil { - return fmt.Errorf("dial error: %w", err) - } - - channel, err := connection.Channel() - if err != nil { - return fmt.Errorf("channel error: %w", err) - } - - if err := channel.ExchangeDeclare( - p.exchange, // name - "direct", // type - true, // durable - false, // auto-deleted - false, // internal - false, // noWait - nil, // arguments - ); err != nil { - return fmt.Errorf("exchange declare error: %w", err) - } - - p.connection = connection - p.channel = channel - - return nil -} - -func (p *Producer) Disconnect() { - if p.connection != nil { - p.connection.Close() - p.connection = nil - p.channel = nil - } -} - -func (p *Producer) ProduceEvent(event storage.Event) error { - eventJSON, err := event.MarshalJSON() - if err != nil { - return fmt.Errorf("error marshalling event: %w", err) - } - - err = p.channel.Publish( - p.exchange, - "", - false, - false, - amqp.Publishing{ - Headers: amqp.Table{}, - ContentType: "application/json", - ContentEncoding: "", - Body: eventJSON, - DeliveryMode: amqp.Transient, - Priority: 0, - }, - ) - if err != nil { - return fmt.Errorf("error while publishing to rabbit mq: %w", err) - } - - return nil -} +package rabbit + +import ( + "fmt" + + "github.com/streadway/amqp" + "github.com/wursta/otus_go/hw12_13_14_15_calendar/internal/storage" +) + +type Producer struct { + uri string + exchange string + connection *amqp.Connection + channel *amqp.Channel +} + +func NewProducer(uri, exchange string) *Producer { + return &Producer{ + uri: uri, + exchange: exchange, + } +} + +func (p *Producer) Connect() error { + connection, err := amqp.Dial(p.uri) + if err != nil { + return fmt.Errorf("dial error: %w", err) + } + + channel, err := connection.Channel() + if err != nil { + return fmt.Errorf("channel error: %w", err) + } + + if err := channel.ExchangeDeclare( + p.exchange, // name + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // noWait + nil, // arguments + ); err != nil { + return fmt.Errorf("exchange declare error: %w", err) + } + + p.connection = connection + p.channel = channel + + return nil +} + +func (p *Producer) Disconnect() { + if p.connection != nil { + p.connection.Close() + p.connection = nil + p.channel = nil + } +} + +func (p *Producer) ProduceEvent(event storage.Event) error { + eventJSON, err := event.MarshalJSON() + if err != nil { + return fmt.Errorf("error marshalling event: %w", err) + } + + err = p.channel.Publish( + p.exchange, + "", + false, + false, + amqp.Publishing{ + Headers: amqp.Table{}, + ContentType: "application/json", + ContentEncoding: "", + Body: eventJSON, + DeliveryMode: amqp.Transient, + Priority: 0, + }, + ) + + if err != nil { + return fmt.Errorf("error while publishing to rabbit mq: %w", err) + } + + return nil +}