Skip to content

Commit

Permalink
Enqueue directly to Redis unless in a transaction (ActiveJob)
Browse files Browse the repository at this point in the history
  • Loading branch information
bray committed Jan 31, 2024
1 parent 99f1687 commit a73e831
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 19 deletions.
19 changes: 16 additions & 3 deletions lib/active_job/queue_adapters/sidekiq_publisher_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,28 @@ class SidekiqPublisherAdapter
JOB_WRAPPER_CLASS = ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.to_s.freeze

def enqueue(job)
internal_enqueue(job)
if ActiveRecord::Base.connection.transaction_open?
create_job_record(job)
else
sidekiq_adapter.enqueue(job)
end
end

def enqueue_at(job, timestamp)
internal_enqueue(job, timestamp)
if ActiveRecord::Base.connection.transaction_open?
create_job_record(job, timestamp)
else
sidekiq_adapter.enqueue_at(job, timestamp)
end
end

private

def internal_enqueue(job, timestamp = nil)
def create_job_record(job, timestamp = nil)
job.provider_job_id = SidekiqPublisher::Job.generate_sidekiq_jid
attributes = job_attributes(job)
attributes[:run_at] = timestamp if timestamp.present?

SidekiqPublisher::Job.create!(attributes).job_id
end

Expand All @@ -36,6 +45,10 @@ def job_attributes(job)
args: [job.serialize],
}
end

def sidekiq_adapter
@_sidekiq_adapter ||= ActiveJob::QueueAdapters::SidekiqAdapter.new
end
end
end
end
95 changes: 79 additions & 16 deletions spec/active_job/queue_adapters/sidekiq_publisher_adapter_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require "sidekiq/api"

RSpec.describe ActiveJob::QueueAdapters::SidekiqPublisherAdapter do
let(:job_class) do
Class.new(ActiveJob::Base) do
Expand All @@ -14,38 +16,99 @@ def perform(*args); end

before do
stub_const("TestJob", job_class)
clear_redis
end

describe "#enqueue" do
it "creates a SidekiqPublisher job" do
active_job.enqueue
context "when in a transaction" do
it "creates a SidekiqPublisher job record" do
active_job.enqueue

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.first).to include(
"job_class" => "TestJob",
"arguments" => args,
"provider_job_id" => job.job_id
)
end
end

context "when not in a transaction", skip_db_clean: true do
it "enqueues directly to Redis via Sidekiq" do
active_job.enqueue

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.first).to include("job_class" => "TestJob", "arguments" => args, "provider_job_id" => job.job_id)
expect(job).to be_nil

queue = Sidekiq::Queue.new("default")
expect(queue.size).to eq(1)

sidekiq_job = queue.first
expect(sidekiq_job.display_class).to eq("TestJob")
expect(sidekiq_job.display_args).to eq(args)
end
end
end

describe "#enqueue_at" do
let(:scheduled_at) { 1.hour.from_now }

it "creates a SidekiqPublisher job with a run_at value" do
active_job.enqueue(wait_until: scheduled_at)
context "when in a transaction" do
it "creates a SidekiqPublisher job with a run_at value" do
active_job.enqueue(wait_until: scheduled_at)

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.first).to include(
"job_class" => "TestJob",
"arguments" => args,
"provider_job_id" => job.job_id
)
expect(job.run_at).to be_within(1).of(scheduled_at.to_f)
end
end

context "when not in a transaction", skip_db_clean: true do
it "enqueues directly to Redis via Sidekiq" do
active_job.enqueue(wait_until: scheduled_at)

expect(job).to be_nil

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.first).to include("job_class" => "TestJob", "arguments" => args, "provider_job_id" => job.job_id)
expect(job.run_at).to be_within(1).of(scheduled_at.to_f)
queue = Sidekiq::ScheduledSet.new
expect(queue.size).to eq(1)

sidekiq_job = queue.first
expect(sidekiq_job.display_class).to eq("TestJob")
expect(sidekiq_job.display_args).to eq(args)
expect(sidekiq_job.at).to be_within(1).of(scheduled_at)
end
end
end

describe "ActiveJob::Base.perform_later" do
it "creates a SidekiqPublisher job" do
job_class.perform_later(*args)
context "when in a transaction" do
it "creates a SidekiqPublisher job record" do
job_class.perform_later(*args)

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.dig(0, "arguments")).to eq(args)
end
end

context "when not in a transaction", skip_db_clean: true do
it "enqueues directly to Redis via Sidekiq" do
job_class.perform_later(*args)

expect(job).to be_nil

queue = Sidekiq::Queue.new("default")
expect(queue.size).to eq(1)

expect(job.job_class).to eq(described_class::JOB_WRAPPER_CLASS)
expect(job.wrapped).to eq("TestJob")
expect(job.args.dig(0, "arguments")).to eq(args)
sidekiq_job = queue.first
expect(sidekiq_job.display_class).to eq("TestJob")
expect(sidekiq_job.display_args).to eq(args)
end
end
end
end

0 comments on commit a73e831

Please sign in to comment.