Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix 4 #38

Merged
merged 4 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions config/stomp.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@
'auto_tries' => env('STOMP_AUTO_TRIES', true),
'auto_backoff' => env('STOMP_AUTO_BACKOFF', true),

/*
* Will failed job be re-queued ?
* We experienced issues with pushing Jobs back to the topic/queue, so we're turning this OFF
*/
'fail_job_requeue' => env('STOMP_FAILED_JOB_REQUEUE', false),

/** If all messages should fail on timeout. Set to false in order to revert to default (looking in event payload) */
'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true),
/** Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly. */
'timeout' => env('STOMP_TIMEOUT', 10),
'timeout' => env('STOMP_TIMEOUT', 45),

/**
* Incremental multiplier for failed job redelivery.
Expand Down Expand Up @@ -70,15 +76,15 @@
/**
* Heartbeat which we will be sending to server at given millisecond period.
*/
'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 20000),
'send_heartbeat' => env('STOMP_SEND_HEARTBEAT', 50000),

/**
* Setting consumer-window-size to a value greater than 0 will allow it to receive messages until
* the cumulative bytes of those messages reaches the configured size.
* Once that happens the client will not receive any more messages until it sends the appropriate ACK or NACK
* frame for the messages it already has.
*/
'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 819200),
'consumer_window_size' => env('STOMP_CONSUMER_WIN_SIZE', 8192000),

/**
* Subscribe mode: auto, client.
Expand Down
18 changes: 5 additions & 13 deletions src/Queue/Jobs/StompJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use Illuminate\Queue\Jobs\JobName;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Str;
use Psr\Log\LoggerInterface;
use Stomp\Transport\Frame;
Expand Down Expand Up @@ -108,7 +107,6 @@ public function fire()
{
$this->log->info("$this->session [STOMP] Executing event...");
$this->isNativeLaravelJob() ? $this->fireLaravelJob() : $this->fireExternalJob();
$this->ackIfNecessary();
}

protected function isNativeLaravelJob(): bool
Expand Down Expand Up @@ -202,9 +200,10 @@ public function release($delay = 0)
{
parent::release($delay);

$payload = $this->createStompPayload($delay);

$this->stompQueue->pushRaw($payload, $this->queue, []);
if (Config::get('fail_job_requeue')) {
$payload = $this->createStompPayload($delay);
$this->stompQueue->pushRaw($payload, $this->queue, []);
}
}

protected function createStompPayload(int $delay): Message
Expand Down Expand Up @@ -245,8 +244,6 @@ protected function getBackoff(int $attempts): int
*/
protected function failed($e)
{
$this->ackIfNecessary();

// External events don't have failed method to call.
if (!$this->payload || !$this->isNativeLaravelJob()) {
return;
Expand All @@ -259,12 +256,7 @@ protected function failed($e)
$this->instance->failed($this->payload['data'], $e, $this->payload['uuid']);
}
} catch (\Exception $e) {
Log::error('Exception in job failing: ' . $e->getMessage());
$this->log->error('Exception in job failing: ' . $e->getMessage());
}
}

protected function ackIfNecessary()
{
$this->stompQueue->ackLastFrameIfNecessary();
}
}
33 changes: 18 additions & 15 deletions src/Queue/StompQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class StompQueue extends Queue implements QueueInterface
protected array $subscribedTo = [];

protected LoggerInterface $log;
protected static int $circuitBreaker = 0;
protected int $circuitBreaker = 0;
protected string $session;

protected $_lastFrame = null;
Expand Down Expand Up @@ -238,7 +238,7 @@ protected function writeToMultipleQueues(array $writeQueues, Message $payload):
return $allEventsSent;
}

protected function write($queue, Message $payload): bool
protected function write($queue, Message $payload, $tryAgain = true): bool
{
// This will write all the events received in a single batch, then send disconnect frame
try {
Expand All @@ -247,13 +247,17 @@ protected function write($queue, Message $payload): bool
$this->log->info("$this->session [STOMP] Message sent successfully? " . ($sent ? 't' : 'f'));

return $sent;
} catch (Exception) {
$this->log->error("$this->session [STOMP] PUSH failed. Reconnecting...");
} catch (Exception $e) {
$this->log->error("$this->session [STOMP] PUSH failed. Reconnecting... " . $e->getMessage());
$this->reconnect(false);

$this->log->info("$this->session [STOMP] Trying to send again...");
if ($tryAgain) {
$this->log->info("$this->session [STOMP] Trying to send again...");

return $this->write($queue, $payload, false);
}

return $this->write($queue, $payload);
return false;
}
}

Expand Down Expand Up @@ -453,25 +457,25 @@ protected function reconnect(bool $subscribe = true)

try {
$this->client->getClient()->connect();

$this->log->info("$this->session [STOMP] Reconnected successfully.");
} catch (Exception $e) {
self::$circuitBreaker++;
$cb = self::$circuitBreaker;
$this->circuitBreaker++;

$this->log->error("$this->session [STOMP] Failed reconnecting (tries: $cb), retrying in 2s..." . print_r($e->getMessage(), true));
$this->log->error("$this->session [STOMP] Failed reconnecting (tries: {$this->circuitBreaker}),
retrying..." . print_r($e->getMessage(), true));

if (self::$circuitBreaker <= 5) {
if ($this->circuitBreaker <= 5) {
usleep(100);
$this->reconnect($subscribe);
}

$this->log->error("$this->session [STOMP] Circuit breaker executed after $cb tries, exiting.");
$this->log->error("$this->session [STOMP] Circuit breaker executed after {$this->circuitBreaker} tries, exiting.");

return;
}

// By this point it should be connected, so it is safe to subscribe
if ($subscribe) {
if ($this->client->getClient()->isConnected() && $subscribe) {
$this->log->info("$this->session [STOMP] Connected, subscribing...");
$this->subscribedTo = [];
$this->subscribeToQueues();
Expand All @@ -486,7 +490,6 @@ public function disconnect()

try {
$this->ackLastFrameIfNecessary();

$this->log->info("$this->session [STOMP] Disconnecting...");
$this->client->getClient()->disconnect();
} catch (Exception $e) {
Expand All @@ -503,7 +506,7 @@ protected function subscribeToQueues(): void
continue;
}

$winSize = Config::get('consumer_window_size') ?? 819200;
$winSize = Config::get('consumer_window_size') ?: 8192000;
if ($this->_ackMode != self::ACK_MODE_CLIENT) {
$winSize = -1;
}
Expand Down
2 changes: 0 additions & 2 deletions src/StompServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class StompServiceProvider extends ServiceProvider
public function register()
{
$this->mergeConfigFrom(__DIR__ . '/../config/asseco-stomp.php', 'asseco-stomp');

$this->mergeConfigFrom(__DIR__ . '/../config/stomp.php', 'queue.connections.stomp');
}

Expand All @@ -35,7 +34,6 @@ public function boot()
app()->singleton(Config::class);
app()->singleton(ConnectionWrapper::class);
app()->singleton(ClientWrapper::class);

app()->singleton(StompQueue::class);

/** @var QueueManager $queue */
Expand Down
Loading