From 341670172dffd728d7d0f3da948b2e8e998925e6 Mon Sep 17 00:00:00 2001 From: gdarko Date: Tue, 5 Jul 2022 12:18:04 +0200 Subject: [PATCH] Init --- .gitignore | 3 + README.md | 11 ++ config.sample.php | 23 ++++ src/Abstracts/BaseAdapter.php | 68 +++++++++++ src/Abstracts/BaseConnection.php | 62 ++++++++++ src/Adapters/RabbitMQ/RabbitMQAdapter.php | 116 ++++++++++++++++++ src/Adapters/RabbitMQ/RabbitMQConnection.php | 63 ++++++++++ src/Exchange.php | 54 ++++++++ src/Manager.php | 122 +++++++++++++++++++ src/Message.php | 54 ++++++++ 10 files changed, 576 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config.sample.php create mode 100644 src/Abstracts/BaseAdapter.php create mode 100644 src/Abstracts/BaseConnection.php create mode 100644 src/Adapters/RabbitMQ/RabbitMQAdapter.php create mode 100644 src/Adapters/RabbitMQ/RabbitMQConnection.php create mode 100644 src/Exchange.php create mode 100644 src/Manager.php create mode 100644 src/Message.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3ba0b2e --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +vendor/ +composer.json +composer.lock \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..99985b3 --- /dev/null +++ b/README.md @@ -0,0 +1,11 @@ +# Messages + +Abstractions for queue brokering packages like RabbitMQ, etc used in Microservices. + +### Installation + +``` +composer require ignitekit/messages +``` + + diff --git a/config.sample.php b/config.sample.php new file mode 100644 index 0000000..2316d5d --- /dev/null +++ b/config.sample.php @@ -0,0 +1,23 @@ + 'RabbitMQ', + 'adapters' => [ + 'RabbitMQ' => [ + 'hostname' => 'localhost', + 'username' => 'guest', + 'password' => 'guest', + 'port' => 5672, + 'settings' => [ + 'passive' => 0, + 'durable' => 1, + 'exclusive' => 0, + 'auto_delete' => 0 + ] + ] + ] +]; \ No newline at end of file diff --git a/src/Abstracts/BaseAdapter.php b/src/Abstracts/BaseAdapter.php new file mode 100644 index 0000000..b7f68c2 --- /dev/null +++ b/src/Abstracts/BaseAdapter.php @@ -0,0 +1,68 @@ +connection = $connection; + } + + /** + * Create a queue + * + * @return mixed + */ + abstract public function create( Exchange $exchange ); + + /** + * Dispatch a message + * + * @param Exchange $exchange + * @param Message $message + * @param array $config + * + * @return void + */ + abstract public function send( Exchange $exchange, Message $message, $config = array() ); + + /** + * Receive a message + * + * @param Exchange $exchange + * @param callable $callback + * @param array $config + * + * @return void + */ + abstract public function receive( Exchange $exchange, callable $callback, $config = array() ); + + /** + * The connection instance + * @return BaseConnection + */ + public function get_connection() { + return $this->connection; + } +} diff --git a/src/Abstracts/BaseConnection.php b/src/Abstracts/BaseConnection.php new file mode 100644 index 0000000..9a06511 --- /dev/null +++ b/src/Abstracts/BaseConnection.php @@ -0,0 +1,62 @@ +user = $user; + $this->pass = $pass; + $this->host = $host; + $this->port = $port; + + if ( $connect ) { + $this->open(); + } + } + + /** + * The destructor + */ + public function __destruct() { + $this->close(); + } + + /** + * Create the connection + * @return mixed + */ + abstract public function open(); + + /** + * Close the active connection + * @return void + */ + abstract public function close(); + + /** + * Returns the connection + * @return mixed + */ + abstract public function get(); + + /** + * Check if connected + * @return bool + */ + abstract public function is_connected(); + +} diff --git a/src/Adapters/RabbitMQ/RabbitMQAdapter.php b/src/Adapters/RabbitMQ/RabbitMQAdapter.php new file mode 100644 index 0000000..28f14a2 --- /dev/null +++ b/src/Adapters/RabbitMQ/RabbitMQAdapter.php @@ -0,0 +1,116 @@ +exchanges[ $exchange->get_name() ] ) ) { + return false; + } + + $this->exchanges[ $exchange->get_name() ] = $exchange; + + $this->connection + ->get() + ->channel() + ->queue_declare( + $exchange->get_name(), + $exchange->get_config( 'passive', false ), + $exchange->get_config( 'durable', true ), + $exchange->get_config( 'exclusive', false ), + $exchange->get_config( 'auto_delete', false ) + ); + + return true; + } + + /** + * Dispatch a message + * + * @param Exchange $exchange + * @param Message $message + * @param array $config + * + * @return void + */ + public function send( Exchange $exchange, Message $message, $config = array() ) { + + if ( ! isset( $this->exchanges[ $exchange->get_name() ] ) ) { + $this->create( $exchange ); + } + + $amq_message = new AMQPMessage( $message->get_body() ); + + $channel = $this->connection + ->get() + ->channel(); + + $channel->basic_publish( $amq_message, '', $exchange->get_name() ); + } + + /** + * Dispatch a message and close connection + * + * @param Exchange $exchange + * @param Message $message + * @param array $config + * + * @return void + */ + public function send_close( Exchange $exchange, Message $message, $config = array() ) { + $this->send( $exchange, $message, $config ); + $this->connection->close(); + } + + /** + * Receive a message + * + * @param Exchange $exchange + * @param callable $callback + * @param array $config + * + * @return void + */ + public function receive( Exchange $exchange, callable $callback, $config = array() ) { + + if ( ! isset( $this->exchanges[ $exchange->get_name() ] ) ) { + $this->create( $exchange ); + } + + $channel = $this->connection->get()->channel(); + + $channel->basic_consume( + $exchange->get_name(), + $exchange->get_config( 'consumer_tag', '' ), + $exchange->get_config( 'no_local', false ), + $exchange->get_config( 'no_ack', true ), + $exchange->get_config( 'exclusive', false ), + $exchange->get_config( 'nowait', false ), + $callback + ); + + while ( $channel->is_open() ) { + $channel->wait(); + } + + } +} diff --git a/src/Adapters/RabbitMQ/RabbitMQConnection.php b/src/Adapters/RabbitMQ/RabbitMQConnection.php new file mode 100644 index 0000000..3d596e4 --- /dev/null +++ b/src/Adapters/RabbitMQ/RabbitMQConnection.php @@ -0,0 +1,63 @@ +instance = new AMQPStreamConnection( + $this->host, + $this->port, + $this->user, + $this->pass + ); + + return $this->instance->isConnected(); + } + + /** + * Close the active connection + * @return void + */ + public function close() { + if ( ! $this->is_connected() ) { + return; + } + + try { + $this->instance->channel()->close(); + $this->instance->close(); + } catch ( \Exception $e ) { + return; + } + } + + /** + * Returns the connection + * @return AMQPStreamConnection + */ + public function get() { + return $this->instance; + } + + /** + * Check if connected + * @return bool + */ + public function is_connected() { + return $this->instance && $this->instance->isConnected(); + } +} diff --git a/src/Exchange.php b/src/Exchange.php new file mode 100644 index 0000000..5083db8 --- /dev/null +++ b/src/Exchange.php @@ -0,0 +1,54 @@ +name = $name; + $this->config = $config; + } + + /** + * The queue name + * @return string + */ + public function get_name() { + return $this->name; + } + + /** + * The queue config + * @return array|mixed + */ + public function get_config( $key = null, $default = null ) { + + if ( ! is_null( $key ) ) { + return isset( $this->config[ $key ] ) ? $this->config[ $key ] : $default; + } + + return $this->config; + } + +} diff --git a/src/Manager.php b/src/Manager.php new file mode 100644 index 0000000..5414932 --- /dev/null +++ b/src/Manager.php @@ -0,0 +1,122 @@ +config = $config; + if ( empty( $this->config ) ) { + throw new \Exception( 'No configuration found.' ); + } + + $current = isset( $this->config['default'] ) && isset( $this->config['adapters'][ $this->config['default'] ] ) ? $this->config['default'] : null; + + if ( is_null( $current ) ) { + throw new \Exception( 'The default message broker adapter is not found.' ); + } + + if ( 'RabbitMQ' === $current ) { + + $user = isset( $this->config['adapters'][ $current ]['username'] ) ? $this->config['adapters'][ $current ]['username'] : null; + $pass = isset( $this->config['adapters'][ $current ]['password'] ) ? $this->config['adapters'][ $current ]['password'] : null; + $host = isset( $this->config['adapters'][ $current ]['hostname'] ) ? $this->config['adapters'][ $current ]['hostname'] : null; + $port = isset( $this->config['adapters'][ $current ]['port'] ) ? $this->config['adapters'][ $current ]['port'] : null; + + $connection = new RabbitMQConnection( $user, $pass, $host, $port, true ); + $this->adapter = new RabbitMQAdapter( $connection ); + + } else { + throw new \Exception( 'Unsupported message broker adapter.' ); // Perhaps other? + } + + } + + /** + * The adapter instance + * @return BaseAdapter|RabbitMQAdapter + */ + public function get_adapter() { + return $this->adapter; + } + + /** + * Create queue + * + * @param Exchange $exchange + * + * @return void + */ + public function create( Exchange $exchange ) { + $this->get_adapter()->create( $exchange ); + } + + /** + * Dispatch a message and close + * @return void + * @throws \Exception + */ + public function send( Exchange $exchange, Message $message ) { + $this->get_adapter()->send( $exchange, $message ); + } + + /** + * Dispatch a message and close + * @return void + * @throws \Exception + */ + public function send_close( Exchange $exchange, Message $message ) { + $this->get_adapter()->send_close( $exchange, $message ); + } + + /** + * Receive a message from channel + * + * @param Exchange $exchange + * @param callable $callback + * + * @return void + * @throws \Exception + */ + public function receive( Exchange $exchange, callable $callback ) { + $this->get_adapter()->receive( $exchange, $callback ); + } + + /** + * Close the connection + * @return void + * @throws \Exception + */ + public function close() { + $connection = $this->get_adapter()->get_connection(); + if ( $connection && $connection->is_connected() ) { + $this->get_adapter()->get_connection()->close(); + } + } + +} diff --git a/src/Message.php b/src/Message.php new file mode 100644 index 0000000..d30dd7d --- /dev/null +++ b/src/Message.php @@ -0,0 +1,54 @@ +body = $body; + $this->data = $data; + } + + /** + * Returns the body + * @return mixed + */ + public function get_body() { + return $this->body; + } + + /** + * Returns data + * @return array|mixed + */ + public function get_data() { + return $this->data; + } + +}