From 6a24750fd517478ebca85d7f3491650871ab3040 Mon Sep 17 00:00:00 2001 From: ngasparic Date: Mon, 20 May 2024 12:18:04 +0200 Subject: [PATCH 1/3] fixes --- README.md | 5 +++-- config/stomp.php | 2 +- src/Queue/Jobs/StompJob.php | 35 +++++++++++++++++++++++++++++------ src/Queue/StompQueue.php | 8 +++++--- 4 files changed, 38 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index a030dc2..4af16fe 100644 --- a/README.md +++ b/README.md @@ -44,9 +44,10 @@ topic::queue1;topic::queue2 <-- will read from queue1 and queue2 on the topic Subscribing with client acknowledgement option (ENV variables): -STOMP_CONSUMER_WIN_SIZE=1024 // number of bytes that Broker will send to client before it expects ACK +``` +STOMP_CONSUMER_WIN_SIZE=819200 // number of bytes that Broker will send to client before it expects ACK STOMP_CONSUMER_ACK_MODE=client // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case) - +``` You can see all other available ``.env`` variables, their defaults and usage explanation within the [config file](config/stomp.php). diff --git a/config/stomp.php b/config/stomp.php index c02d96c..c37d231 100644 --- a/config/stomp.php +++ b/config/stomp.php @@ -78,7 +78,7 @@ * Once that happens the client will not receive any more messages until it sends the appropriate ACK or NACK * frame for the messages it already has. */ - 'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 1048576), + 'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 819200), /** * Subscribe mode: auto, client. diff --git a/src/Queue/Jobs/StompJob.php b/src/Queue/Jobs/StompJob.php index 2475e01..2ee084d 100644 --- a/src/Queue/Jobs/StompJob.php +++ b/src/Queue/Jobs/StompJob.php @@ -4,6 +4,7 @@ use Asseco\Stomp\Queue\Stomp\Config; use Asseco\Stomp\Queue\StompQueue; +use Illuminate\Broadcasting\BroadcastEvent; use Illuminate\Container\Container; use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Queue\Jobs\Job; @@ -112,13 +113,37 @@ public function fire() protected function isNativeLaravelJob(): bool { - return array_key_exists('job', $this->payload); + $job = Arr::get($this->payload, 'job'); + return ($job && str_contains($job, 'CallQueuedHandler@call')); + } + + protected function laravelJobClassExists(): bool { + $eventClassName = Arr::get($this->payload, 'displayName'); + if ($eventClassName) { + return class_exists($eventClassName); + } + else { + $command = Arr::get($this->payload, 'data.command'); + $command = $command ?? unserialize($command); + /** @var BroadcastEvent $comand */ + if ($command & $comand->event && (class_exists(get_class($comand->event)))) { + return true; + } + } + + return false; } protected function fireLaravelJob(): void { - [$class, $method] = JobName::parse($this->payload['job']); - ($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']); + if ($this->laravelJobClassExists()) { + [$class, $method] = JobName::parse($this->payload['job']); + ($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']); + } + else { + $this->log->error("$this->session [STOMP] Laravel job class does not exist!"); + } + } protected function fireExternalJob(): void @@ -241,8 +266,6 @@ protected function failed($e) protected function ackIfNecessary() { - if (Config::get('consumer_ack_mode') == StompQueue::ACK_MODE_CLIENT && $this->frame) { - $this->stompQueue->client->ack($this->frame); - } + $this->stompQueue->ackLastFrameIfNecessary(); } } diff --git a/src/Queue/StompQueue.php b/src/Queue/StompQueue.php index c1de818..f5a2305 100644 --- a/src/Queue/StompQueue.php +++ b/src/Queue/StompQueue.php @@ -363,10 +363,12 @@ public function pop($queue = null) if (!$queueFromFrame) { $this->log->error("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true)); - + $this->_lastFrame = null; return null; } + $this->_lastFrame = $frame; + return new StompJob($this->container, $this, $frame, $queueFromFrame); } @@ -500,7 +502,7 @@ protected function subscribeToQueues(): void continue; } - $winSize = Config::get('consumer_window_size') ?? 512000; + $winSize = Config::get('consumer_window_size') ?? 819200; if ($this->_ackMode != self::ACK_MODE_CLIENT) { $winSize = -1; } @@ -521,7 +523,7 @@ protected function subscribeToQueues(): void * * @return void */ - protected function ackLastFrameIfNecessary() + public function ackLastFrameIfNecessary() { if ($this->_ackMode == self::ACK_MODE_CLIENT && $this->_lastFrame) { $this->client->ack($this->_lastFrame); From e2e76543932a58021a105c80d9195915bbd431e7 Mon Sep 17 00:00:00 2001 From: ngasparic Date: Mon, 20 May 2024 12:26:48 +0200 Subject: [PATCH 2/3] typo --- src/Queue/Jobs/StompJob.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Queue/Jobs/StompJob.php b/src/Queue/Jobs/StompJob.php index 2ee084d..07431cd 100644 --- a/src/Queue/Jobs/StompJob.php +++ b/src/Queue/Jobs/StompJob.php @@ -125,8 +125,8 @@ protected function laravelJobClassExists(): bool { else { $command = Arr::get($this->payload, 'data.command'); $command = $command ?? unserialize($command); - /** @var BroadcastEvent $comand */ - if ($command & $comand->event && (class_exists(get_class($comand->event)))) { + /** @var BroadcastEvent $command */ + if ($command & $command->event && (class_exists(get_class($command->event)))) { return true; } } From 37b72f3790926f80db509d8908ed967711d7403c Mon Sep 17 00:00:00 2001 From: StyleCI Bot Date: Mon, 20 May 2024 10:27:40 +0000 Subject: [PATCH 3/3] Apply fixes from StyleCI --- src/Queue/Jobs/StompJob.php | 15 +++++++-------- src/Queue/StompQueue.php | 1 + 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Queue/Jobs/StompJob.php b/src/Queue/Jobs/StompJob.php index 07431cd..1d01992 100644 --- a/src/Queue/Jobs/StompJob.php +++ b/src/Queue/Jobs/StompJob.php @@ -114,19 +114,20 @@ public function fire() protected function isNativeLaravelJob(): bool { $job = Arr::get($this->payload, 'job'); - return ($job && str_contains($job, 'CallQueuedHandler@call')); + + return $job && str_contains($job, 'CallQueuedHandler@call'); } - protected function laravelJobClassExists(): bool { + protected function laravelJobClassExists(): bool + { $eventClassName = Arr::get($this->payload, 'displayName'); if ($eventClassName) { return class_exists($eventClassName); - } - else { + } else { $command = Arr::get($this->payload, 'data.command'); $command = $command ?? unserialize($command); /** @var BroadcastEvent $command */ - if ($command & $command->event && (class_exists(get_class($command->event)))) { + if ($command & $command->event && class_exists(get_class($command->event))) { return true; } } @@ -139,11 +140,9 @@ protected function fireLaravelJob(): void if ($this->laravelJobClassExists()) { [$class, $method] = JobName::parse($this->payload['job']); ($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']); - } - else { + } else { $this->log->error("$this->session [STOMP] Laravel job class does not exist!"); } - } protected function fireExternalJob(): void diff --git a/src/Queue/StompQueue.php b/src/Queue/StompQueue.php index f5a2305..9782dfe 100644 --- a/src/Queue/StompQueue.php +++ b/src/Queue/StompQueue.php @@ -364,6 +364,7 @@ public function pop($queue = null) if (!$queueFromFrame) { $this->log->error("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true)); $this->_lastFrame = null; + return null; }