diff --git a/README.md b/README.md index a030dc2..4af16fe 100644 --- a/README.md +++ b/README.md @@ -44,9 +44,10 @@ topic::queue1;topic::queue2 <-- will read from queue1 and queue2 on the topic Subscribing with client acknowledgement option (ENV variables): -STOMP_CONSUMER_WIN_SIZE=1024 // number of bytes that Broker will send to client before it expects ACK +``` +STOMP_CONSUMER_WIN_SIZE=819200 // number of bytes that Broker will send to client before it expects ACK STOMP_CONSUMER_ACK_MODE=client // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case) - +``` You can see all other available ``.env`` variables, their defaults and usage explanation within the [config file](config/stomp.php). diff --git a/config/stomp.php b/config/stomp.php index c02d96c..c37d231 100644 --- a/config/stomp.php +++ b/config/stomp.php @@ -78,7 +78,7 @@ * Once that happens the client will not receive any more messages until it sends the appropriate ACK or NACK * frame for the messages it already has. */ - 'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 1048576), + 'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 819200), /** * Subscribe mode: auto, client. diff --git a/src/Queue/Jobs/StompJob.php b/src/Queue/Jobs/StompJob.php index 2475e01..1d01992 100644 --- a/src/Queue/Jobs/StompJob.php +++ b/src/Queue/Jobs/StompJob.php @@ -4,6 +4,7 @@ use Asseco\Stomp\Queue\Stomp\Config; use Asseco\Stomp\Queue\StompQueue; +use Illuminate\Broadcasting\BroadcastEvent; use Illuminate\Container\Container; use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Queue\Jobs\Job; @@ -112,13 +113,36 @@ public function fire() protected function isNativeLaravelJob(): bool { - return array_key_exists('job', $this->payload); + $job = Arr::get($this->payload, 'job'); + + return $job && str_contains($job, 'CallQueuedHandler@call'); + } + + protected function laravelJobClassExists(): bool + { + $eventClassName = Arr::get($this->payload, 'displayName'); + if ($eventClassName) { + return class_exists($eventClassName); + } else { + $command = Arr::get($this->payload, 'data.command'); + $command = $command ?? unserialize($command); + /** @var BroadcastEvent $command */ + if ($command & $command->event && class_exists(get_class($command->event))) { + return true; + } + } + + return false; } protected function fireLaravelJob(): void { - [$class, $method] = JobName::parse($this->payload['job']); - ($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']); + if ($this->laravelJobClassExists()) { + [$class, $method] = JobName::parse($this->payload['job']); + ($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']); + } else { + $this->log->error("$this->session [STOMP] Laravel job class does not exist!"); + } } protected function fireExternalJob(): void @@ -241,8 +265,6 @@ protected function failed($e) protected function ackIfNecessary() { - if (Config::get('consumer_ack_mode') == StompQueue::ACK_MODE_CLIENT && $this->frame) { - $this->stompQueue->client->ack($this->frame); - } + $this->stompQueue->ackLastFrameIfNecessary(); } } diff --git a/src/Queue/StompQueue.php b/src/Queue/StompQueue.php index c1de818..9782dfe 100644 --- a/src/Queue/StompQueue.php +++ b/src/Queue/StompQueue.php @@ -363,10 +363,13 @@ public function pop($queue = null) if (!$queueFromFrame) { $this->log->error("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true)); + $this->_lastFrame = null; return null; } + $this->_lastFrame = $frame; + return new StompJob($this->container, $this, $frame, $queueFromFrame); } @@ -500,7 +503,7 @@ protected function subscribeToQueues(): void continue; } - $winSize = Config::get('consumer_window_size') ?? 512000; + $winSize = Config::get('consumer_window_size') ?? 819200; if ($this->_ackMode != self::ACK_MODE_CLIENT) { $winSize = -1; } @@ -521,7 +524,7 @@ protected function subscribeToQueues(): void * * @return void */ - protected function ackLastFrameIfNecessary() + public function ackLastFrameIfNecessary() { if ($this->_ackMode == self::ACK_MODE_CLIENT && $this->_lastFrame) { $this->client->ack($this->_lastFrame);