Skip to content

Commit

Permalink
Enqueue directly to Redis unless in a transaction (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
bray authored Jan 31, 2024
1 parent 9b74043 commit 684ca83
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 47 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ that modifies the system of record for the application. It also allows jobs to
be created even when Sidekiq/Redis is temporarily unavailable. The separate
publisher process handles retries and ensure that each job is delivered to Sidekiq.

> :warning: Not all jobs are staged in Postgres. This is determined dynamically:
> if the job is enqueued from within an `ActiveRecord` transaction, then it is
> staged in Postgres. If not, then it bypasses Postgres and is enqueued directly
> to Redis via Sidekiq.
## Installation

Add this line to your application's Gemfile:
Expand Down
19 changes: 16 additions & 3 deletions lib/active_job/queue_adapters/sidekiq_publisher_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,28 @@ class SidekiqPublisherAdapter
JOB_WRAPPER_CLASS = ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.to_s.freeze

def enqueue(job)
internal_enqueue(job)
if SidekiqPublisher::DatabaseConnection.transaction_open?
create_job_record(job)
else
sidekiq_adapter.enqueue(job)
end
end

def enqueue_at(job, timestamp)
internal_enqueue(job, timestamp)
if SidekiqPublisher::DatabaseConnection.transaction_open?
create_job_record(job, timestamp)
else
sidekiq_adapter.enqueue_at(job, timestamp)
end
end

private

def internal_enqueue(job, timestamp = nil)
def create_job_record(job, timestamp = nil)
job.provider_job_id = SidekiqPublisher::Job.generate_sidekiq_jid
attributes = job_attributes(job)
attributes[:run_at] = timestamp if timestamp.present?

SidekiqPublisher::Job.create!(attributes).job_id
end

Expand All @@ -36,6 +45,10 @@ def job_attributes(job)
args: [job.serialize],
}
end

def sidekiq_adapter
@_sidekiq_adapter ||= ActiveJob::QueueAdapters::SidekiqAdapter.new
end
end
end
end
1 change: 1 addition & 0 deletions lib/sidekiq_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "active_support/core_ext/numeric/time"
require "sidekiq_publisher/version"
require "sidekiq_publisher/compatibility"
require "sidekiq_publisher/database_connection"
require "sidekiq_publisher/instrumenter"
require "sidekiq_publisher/metrics_reporter"
require "sidekiq_publisher/exception_reporter"
Expand Down
9 changes: 9 additions & 0 deletions lib/sidekiq_publisher/database_connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

module SidekiqPublisher
module DatabaseConnection
def self.transaction_open?
ActiveRecord::Base.connection.transaction_open?
end
end
end
6 changes: 5 additions & 1 deletion lib/sidekiq_publisher/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ def self.included(base)

module ClassMethods
def client_push(item)
SidekiqPublisher::Job.create_job!(item)
if SidekiqPublisher::DatabaseConnection.transaction_open?
SidekiqPublisher::Job.create_job!(item)
else
super
end
end
end
end
Expand Down
128 changes: 112 additions & 16 deletions spec/active_job/queue_adapters/sidekiq_publisher_adapter_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require "sidekiq/api"

RSpec.describe ActiveJob::QueueAdapters::SidekiqPublisherAdapter do
let(:job_class) do
Class.new(ActiveJob::Base) do
Expand All @@ -14,38 +16,132 @@ def perform(*args); end

before do
stub_const("TestJob", job_class)
clear_redis
end

describe "#enqueue" do
it "creates a SidekiqPublisher job" do
active_job.enqueue
context "when in a transaction" do
it "creates a SidekiqPublisher job record" do
active_job.enqueue

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.first).to include(
"job_class" => "TestJob",
"arguments" => args,
"provider_job_id" => job.job_id
)
end

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.first).to include("job_class" => "TestJob", "arguments" => args, "provider_job_id" => job.job_id)
it "does not enqueue directly to Redis" do
active_job.enqueue

queue = Sidekiq::Queue.new("default")
expect(queue.size).to eq(0)
end
end

context "when not in a transaction", skip_db_clean: true do
it "does not create a SidekiqPublisher job record" do
active_job.enqueue

expect(job).to be_nil
end

it "enqueues directly to Redis via Sidekiq" do
active_job.enqueue

queue = Sidekiq::Queue.new("default")
expect(queue.size).to eq(1)

sidekiq_job = queue.first
expect(sidekiq_job.display_class).to eq("TestJob")
expect(sidekiq_job.display_args).to eq(args)
end
end
end

describe "#enqueue_at" do
let(:scheduled_at) { 1.hour.from_now }

it "creates a SidekiqPublisher job with a run_at value" do
active_job.enqueue(wait_until: scheduled_at)
context "when in a transaction" do
it "creates a SidekiqPublisher job record with a run_at value" do
active_job.enqueue(wait_until: scheduled_at)

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.first).to include(
"job_class" => "TestJob",
"arguments" => args,
"provider_job_id" => job.job_id
)
expect(job.run_at).to be_within(1).of(scheduled_at.to_f)
end

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.first).to include("job_class" => "TestJob", "arguments" => args, "provider_job_id" => job.job_id)
expect(job.run_at).to be_within(1).of(scheduled_at.to_f)
it "does not enqueue directly to Redis" do
active_job.enqueue(wait_until: scheduled_at)

queue = Sidekiq::ScheduledSet.new
expect(queue.size).to eq(0)
end
end

context "when not in a transaction", skip_db_clean: true do
it "does not create a SidekiqPublisher job record" do
active_job.enqueue(wait_until: scheduled_at)

expect(job).to be_nil
end

it "enqueues directly to Redis via Sidekiq" do
active_job.enqueue(wait_until: scheduled_at)

queue = Sidekiq::ScheduledSet.new
expect(queue.size).to eq(1)

sidekiq_job = queue.first
expect(sidekiq_job.display_class).to eq("TestJob")
expect(sidekiq_job.display_args).to eq(args)
expect(sidekiq_job.at).to be_within(1).of(scheduled_at)
end
end
end

describe "ActiveJob::Base.perform_later" do
it "creates a SidekiqPublisher job" do
job_class.perform_later(*args)
context "when in a transaction" do
it "creates a SidekiqPublisher job record" do
job_class.perform_later(*args)

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.dig(0, "arguments")).to eq(args)
end

it "does not enqueue directly to Redis" do
job_class.perform_later(*args)

queue = Sidekiq::Queue.new("default")
expect(queue.size).to eq(0)
end
end

context "when not in a transaction", skip_db_clean: true do
it "does not create a SidekiqPublisher job record" do
job_class.perform_later(*args)

expect(job).to be_nil
end

it "enqueues directly to Redis via Sidekiq" do
job_class.perform_later(*args)

queue = Sidekiq::Queue.new("default")
expect(queue.size).to eq(1)

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.dig(0, "arguments")).to eq(args)
sidekiq_job = queue.first
expect(sidekiq_job.display_class).to eq("TestJob")
expect(sidekiq_job.display_args).to eq(args)
end
end
end
end
Loading

0 comments on commit 684ca83

Please sign in to comment.