Skip to content

Commit

Permalink
Merge pull request #13 from orlandos-nl/feature/jo/unique-keys
Browse files Browse the repository at this point in the history
Allow tasks to specify a unique key
  • Loading branch information
Joannis authored Oct 25, 2023
2 parents 2dc8e17 + e026b9a commit 7855bba
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 18 deletions.
41 changes: 39 additions & 2 deletions Sources/MongoQueue/MongoQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ import Meow
/// try await queue.queueTask(Reminder(username: "Joannis"))
/// ```
public final class MongoQueue {
public struct Option: Hashable {
internal enum _Option: Hashable {
case uniqueKeysEnabled
}

internal let raw: _Option

public static let enableUniqueKeys = Option(raw: .uniqueKeysEnabled)
}

internal let collection: MongoCollection
internal let logger = Logger(label: "org.openkitten.mongo-queues")
private var knownTypes = [KnownType]()
Expand All @@ -33,13 +43,20 @@ public final class MongoQueue {
private var maxParallelJobs = 1
private var task: Task<Void, Never>?
public var newTaskPollingFrequency = NIO.TimeAmount.milliseconds(1000)
public let options: Set<Option>

/// The frequency at which the queue will check for stalled tasks. Defaults to 30 seconds. This is the time after which a task is considered stalled and will be requeued.
public var stalledTaskPollingFrequency = NIO.TimeAmount.seconds(30)

/// Creates a new MongoQueue with the given collection as a backend for storing tasks.
public init(collection: MongoCollection, options: Set<Option>) {
self.collection = collection
self.options = options
}

public init(collection: MongoCollection) {
self.collection = collection
self.options = []
}

public func setMaxParallelJobs(to max: Int) {
Expand Down Expand Up @@ -173,6 +190,8 @@ public final class MongoQueue {
}

public func runUntilEmpty() async throws {
try await ensureIndexes()

struct TickResult {
var errors: [Error]
var tasksRan: Int
Expand Down Expand Up @@ -237,7 +256,9 @@ public final class MongoQueue {
}

let pool = collection.database.pool


try await ensureIndexes()

if
let wireVersion = await pool.wireVersion,
wireVersion.supportsCollectionChangeStream,
Expand All @@ -255,7 +276,23 @@ public final class MongoQueue {
public func shutdown() {
self.started = false
}


/// - Note: Gets called automatically when you `run` the application.
public func ensureIndexes() async throws {
if options.contains(.enableUniqueKeys) {
var uniqueKeyIndex = CreateIndexes.Index(
named: "unique-task-by-key",
keys: [
"uniqueKey": 1,
"category": 1
]
)
uniqueKeyIndex.unique = true
uniqueKeyIndex.partialFilterExpression = ["status": ["$in": ["scheduled", "executing"]]]
try await collection.createIndexes([uniqueKeyIndex])
}
}

private func startChangeStreamTicks() async throws {
// Using change stream cursor based polling
var options = ChangeStreamOptions()
Expand Down
8 changes: 7 additions & 1 deletion Sources/MongoQueue/QueuedTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ public protocol _QueuedTask: Codable {
/// The amount of urgency your task has. Tasks with higher priority take precedence over lower priorities.
/// When priorities are equal, the earlier-created task is executed first.
var priority: TaskPriority { get }


/// If you want only one task of this type to exist, use a static task key
/// If you want to have many tasks, but not duplicate the task, identify this task by the task key
/// If you don't want this task to be uniquely identified, and you want to spawn many of them, use `UUID().uuidString`
var uniqueTaskKey: String { get }

/// An internal configuration object that MongoQueue uses to pass around internal metadata
///
/// - Warning: Do not implement or use this yourself, if you need this hook let us know
Expand Down Expand Up @@ -54,6 +59,7 @@ extension _QueuedTask {
public static var category: String { String(describing: Self.self) }
public var priority: TaskPriority { .normal }
public var group: String? { nil }
public var uniqueTaskKey: String { UUID().uuidString }
public var maxTaskDuration: TimeInterval { 10 * 60 }
// public var allowsParallelisation: Bool { false }
}
9 changes: 2 additions & 7 deletions Sources/MongoQueue/RecurringTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ public protocol RecurringTask: _QueuedTask {
/// The moment that you want this to be first executed on (delay)
/// If you want it to be immediate, use `Date()`
var initialTaskExecutionDate: Date { get }

/// If you want only one task of this type to exist, use a static task key
/// If you want to have many tasks, but not duplicate the task, identify this task by the task key
/// If you don't want this task to be uniquely identified, and you want to spawn many of them, use `UUID().uuidString`
var uniqueTaskKey: String { get }

/// Tasks won't be executed after this moment
var taskExecutionDeadline: TimeInterval? { get }
Expand Down Expand Up @@ -91,7 +86,7 @@ struct ScheduledInterval: Codable {
extension RecurringTask {
/// The deadline for this task to be executed on. After this deadline, the task will not be executed, even if it is still in the queue.
public var taskExecutionDeadline: TimeInterval? { nil }

public func _onDequeueTask(_ task: TaskModel, withContext context: ExecutionContext, inQueue queue: MongoQueue) async throws -> _DequeueResult {
do {
guard case .recurring(let taskConfig) = try task.readConfiguration().value else {
Expand Down Expand Up @@ -132,7 +127,7 @@ extension RecurringTask {
public var configuration: _TaskConfiguration {
let recurring = RecurringTaskConfiguration(
scheduledDate: initialTaskExecutionDate,
key: uniqueTaskKey,
uniqueTaskKey: uniqueTaskKey,
deadline: taskExecutionDeadline
)
return _TaskConfiguration(value: .recurring(recurring))
Expand Down
1 change: 1 addition & 0 deletions Sources/MongoQueue/ScheduledTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ extension ScheduledTask {
public var configuration: _TaskConfiguration {
let scheduled = ScheduledTaskConfiguration(
scheduledDate: taskExecutionDate,
uniqueTaskKey: uniqueTaskKey,
executeBefore: taskExecutionDeadline
)
return _TaskConfiguration(value: .scheduled(scheduled))
Expand Down
9 changes: 6 additions & 3 deletions Sources/MongoQueue/TaskModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public struct TaskModel: Codable {
/// Contains `Task.name`, used to identify how to decode the `metadata`
let category: String
let group: String?

/// If set, only one Task with this `uniqueKey` can be queued or executing for a given `category`
let uniqueKey: String?

let creationDate: Date
Expand Down Expand Up @@ -132,13 +134,13 @@ public struct TaskModel: Codable {
switch task.configuration.value {
case .scheduled(let configuration):
self.configurationType = .scheduled
self.uniqueKey = nil
self.uniqueKey = configuration.uniqueTaskKey
self.executeAfter = configuration.scheduledDate
self.executeBefore = configuration.executeBefore
self.configuration = try BSONEncoder().encode(configuration)
case .recurring(let configuration):
self.configurationType = .recurring
self.uniqueKey = configuration.key
self.uniqueKey = configuration.uniqueTaskKey
self.executeAfter = configuration.scheduledDate
self.executeBefore = configuration.deadline.map { deadline in
configuration.scheduledDate.addingTimeInterval(deadline)
Expand Down Expand Up @@ -183,11 +185,12 @@ public struct _TaskConfiguration {

struct RecurringTaskConfiguration: Codable {
let scheduledDate: Date
let key: String
let uniqueTaskKey: String
let deadline: TimeInterval?
}

struct ScheduledTaskConfiguration: Codable {
let scheduledDate: Date
let uniqueTaskKey: String?
let executeBefore: Date?
}
98 changes: 93 additions & 5 deletions Tests/MongoQueueTests/MongoQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ final class MongoQueueTests: XCTestCase {
func testExample() async throws {
Self.ranTasks = 0
let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"])
queue.registerTask(_Task.self, context: ())
try await queue.queueTask(_Task(message: 0))
Expand All @@ -21,11 +22,14 @@ final class MongoQueueTests: XCTestCase {
try await Task.sleep(nanoseconds: 10_000_000_000)

XCTAssertEqual(Self.ranTasks, 4)
queue.shutdown()
}

@available(macOS 13.0, *)
func testMaxParallelJobs() async throws {
Self.ranTasks = 0
let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"])
queue.setMaxParallelJobs(to: 6)
queue.registerTask(SlowTask.self, context: ())
Expand All @@ -41,14 +45,17 @@ final class MongoQueueTests: XCTestCase {

try await queue.runUntilEmpty()

XCTAssertLessThanOrEqual(-start.timeIntervalSinceNow, 2)
XCTAssertLessThanOrEqual(Date().timeIntervalSince(start), 2)

XCTAssertEqual(Self.ranTasks, 6)
queue.shutdown()
}

@available(macOS 13.0, *)
func testMaxParallelJobsLow() async throws {
Self.ranTasks = 0
let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"])
queue.setMaxParallelJobs(to: 1)
queue.registerTask(SlowTask.self, context: ())
Expand All @@ -64,23 +71,104 @@ final class MongoQueueTests: XCTestCase {

try await queue.runUntilEmpty()

XCTAssertGreaterThanOrEqual(-start.timeIntervalSinceNow, 6)
XCTAssertLessThanOrEqual(Date().timeIntervalSince(start), 7)

XCTAssertEqual(Self.ranTasks, 6)
queue.shutdown()
}


func testNoDuplicateQueuedTasksOfSameUniqueKey() async throws {
struct UniqueTask: ScheduledTask {
var taskExecutionDate: Date { Date() }
var uniqueTaskKey: String { "static" }

func execute(withContext context: Void) async throws {}

func onExecutionFailure(failureContext: QueuedTaskFailure<()>) async throws -> TaskExecutionFailureAction {
return .dequeue()
}
}

let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"], options: [.enableUniqueKeys])
queue.registerTask(UniqueTask.self, context: ())
try await queue.ensureIndexes()
try await queue.queueTask(UniqueTask())

do {
try await queue.queueTask(UniqueTask())
XCTFail("Task should not be able to exist in queue twice")
} catch {}

try await queue.runUntilEmpty()
try await queue.queueTask(UniqueTask())
queue.shutdown()
}

func testDuplicatedOfDifferentTasksCanExist() async throws {
struct UniqueTask: ScheduledTask {
var taskExecutionDate: Date { Date() }
var uniqueTaskKey: String { "static" }

func execute(withContext context: Void) async throws {}

func onExecutionFailure(failureContext: QueuedTaskFailure<()>) async throws -> TaskExecutionFailureAction {
return .dequeue()
}
}

struct UniqueTask2: ScheduledTask {
var taskExecutionDate: Date { Date() }
var uniqueTaskKey: String { "static" }

func execute(withContext context: Void) async throws {}

func onExecutionFailure(failureContext: QueuedTaskFailure<()>) async throws -> TaskExecutionFailureAction {
return .dequeue()
}
}

let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"], options: [.enableUniqueKeys])
queue.registerTask(UniqueTask.self, context: ())
queue.registerTask(UniqueTask2.self, context: ())
try await queue.ensureIndexes()
try await queue.queueTask(UniqueTask())
try await queue.queueTask(UniqueTask2())

do {
try await queue.queueTask(UniqueTask())
XCTFail("Task should not be able to exist in queue twice")
} catch {}

do {
try await queue.queueTask(UniqueTask2())
XCTFail("Task should not be able to exist in queue twice")
} catch {}

try await queue.runUntilEmpty()
try await queue.queueTask(UniqueTask())
try await queue.queueTask(UniqueTask2())
queue.shutdown()
}

func test_recurringTask() async throws {
Self.ranTasks = 0
let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"])
queue.registerTask(RTRecurringTask.self, context: ())
try await queue.queueTask(RTRecurringTask())
queue.newTaskPollingFrequency = .milliseconds(100)
try queue.runInBackground()

// Sleep 30 sec, so each 5-second window is ran, +5 seconds to test if it runs only 5 times
try await Task.sleep(nanoseconds: 35_000_000_000)
try await Task.sleep(nanoseconds: 5_000_000_000)

XCTAssertEqual(Self.ranTasks, 5)
queue.shutdown()
}
}

Expand Down Expand Up @@ -124,7 +212,7 @@ struct RTRecurringTask: RecurringTask {
var uniqueTaskKey: String = "RecurringTask"

func getNextRecurringTaskDate(_ context: ExecutionContext) async throws -> Date? {
MongoQueueTests.ranTasks >= 5 ? nil : Date().addingTimeInterval(5)
MongoQueueTests.ranTasks >= 5 ? nil : Date().addingTimeInterval(1)
}

func execute(withContext context: Void) async throws {
Expand Down

0 comments on commit 7855bba

Please sign in to comment.