Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow tasks to specify a unique key #13

Merged
merged 3 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions Sources/MongoQueue/MongoQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ import Meow
/// try await queue.queueTask(Reminder(username: "Joannis"))
/// ```
public final class MongoQueue {
public struct Option: Hashable {
internal enum _Option: Hashable {
case uniqueKeysEnabled
}

internal let raw: _Option

public static let enableUniqueKeys = Option(raw: .uniqueKeysEnabled)
}

internal let collection: MongoCollection
internal let logger = Logger(label: "org.openkitten.mongo-queues")
private var knownTypes = [KnownType]()
Expand All @@ -33,13 +43,20 @@ public final class MongoQueue {
private var maxParallelJobs = 1
private var task: Task<Void, Never>?
public var newTaskPollingFrequency = NIO.TimeAmount.milliseconds(1000)
public let options: Set<Option>

/// The frequency at which the queue will check for stalled tasks. Defaults to 30 seconds. This is the time after which a task is considered stalled and will be requeued.
public var stalledTaskPollingFrequency = NIO.TimeAmount.seconds(30)

/// Creates a new MongoQueue with the given collection as a backend for storing tasks.
public init(collection: MongoCollection, options: Set<Option>) {
self.collection = collection
self.options = options
}

public init(collection: MongoCollection) {
self.collection = collection
self.options = []
}

public func setMaxParallelJobs(to max: Int) {
Expand Down Expand Up @@ -173,6 +190,8 @@ public final class MongoQueue {
}

public func runUntilEmpty() async throws {
try await ensureIndexes()

struct TickResult {
var errors: [Error]
var tasksRan: Int
Expand Down Expand Up @@ -237,7 +256,9 @@ public final class MongoQueue {
}

let pool = collection.database.pool


try await ensureIndexes()

if
let wireVersion = await pool.wireVersion,
wireVersion.supportsCollectionChangeStream,
Expand All @@ -255,7 +276,23 @@ public final class MongoQueue {
public func shutdown() {
self.started = false
}


/// - Note: Gets called automatically when you `run` the application.
public func ensureIndexes() async throws {
if options.contains(.enableUniqueKeys) {
var uniqueKeyIndex = CreateIndexes.Index(
named: "unique-task-by-key",
keys: [
"uniqueKey": 1,
"category": 1
]
)
uniqueKeyIndex.unique = true
uniqueKeyIndex.partialFilterExpression = ["status": ["$in": ["scheduled", "executing"]]]
try await collection.createIndexes([uniqueKeyIndex])
}
}

private func startChangeStreamTicks() async throws {
// Using change stream cursor based polling
var options = ChangeStreamOptions()
Expand Down
8 changes: 7 additions & 1 deletion Sources/MongoQueue/QueuedTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ public protocol _QueuedTask: Codable {
/// The amount of urgency your task has. Tasks with higher priority take precedence over lower priorities.
/// When priorities are equal, the earlier-created task is executed first.
var priority: TaskPriority { get }


/// If you want only one task of this type to exist, use a static task key
/// If you want to have many tasks, but not duplicate the task, identify this task by the task key
/// If you don't want this task to be uniquely identified, and you want to spawn many of them, use `UUID().uuidString`
var uniqueTaskKey: String { get }

/// An internal configuration object that MongoQueue uses to pass around internal metadata
///
/// - Warning: Do not implement or use this yourself, if you need this hook let us know
Expand Down Expand Up @@ -54,6 +59,7 @@ extension _QueuedTask {
public static var category: String { String(describing: Self.self) }
public var priority: TaskPriority { .normal }
public var group: String? { nil }
public var uniqueTaskKey: String { UUID().uuidString }
public var maxTaskDuration: TimeInterval { 10 * 60 }
// public var allowsParallelisation: Bool { false }
}
9 changes: 2 additions & 7 deletions Sources/MongoQueue/RecurringTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ public protocol RecurringTask: _QueuedTask {
/// The moment that you want this to be first executed on (delay)
/// If you want it to be immediate, use `Date()`
var initialTaskExecutionDate: Date { get }

/// If you want only one task of this type to exist, use a static task key
/// If you want to have many tasks, but not duplicate the task, identify this task by the task key
/// If you don't want this task to be uniquely identified, and you want to spawn many of them, use `UUID().uuidString`
var uniqueTaskKey: String { get }

/// Tasks won't be executed after this moment
var taskExecutionDeadline: TimeInterval? { get }
Expand Down Expand Up @@ -91,7 +86,7 @@ struct ScheduledInterval: Codable {
extension RecurringTask {
/// The deadline for this task to be executed on. After this deadline, the task will not be executed, even if it is still in the queue.
public var taskExecutionDeadline: TimeInterval? { nil }

public func _onDequeueTask(_ task: TaskModel, withContext context: ExecutionContext, inQueue queue: MongoQueue) async throws -> _DequeueResult {
do {
guard case .recurring(let taskConfig) = try task.readConfiguration().value else {
Expand Down Expand Up @@ -132,7 +127,7 @@ extension RecurringTask {
public var configuration: _TaskConfiguration {
let recurring = RecurringTaskConfiguration(
scheduledDate: initialTaskExecutionDate,
key: uniqueTaskKey,
uniqueTaskKey: uniqueTaskKey,
deadline: taskExecutionDeadline
)
return _TaskConfiguration(value: .recurring(recurring))
Expand Down
1 change: 1 addition & 0 deletions Sources/MongoQueue/ScheduledTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ extension ScheduledTask {
public var configuration: _TaskConfiguration {
let scheduled = ScheduledTaskConfiguration(
scheduledDate: taskExecutionDate,
uniqueTaskKey: uniqueTaskKey,
executeBefore: taskExecutionDeadline
)
return _TaskConfiguration(value: .scheduled(scheduled))
Expand Down
9 changes: 6 additions & 3 deletions Sources/MongoQueue/TaskModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public struct TaskModel: Codable {
/// Contains `Task.name`, used to identify how to decode the `metadata`
let category: String
let group: String?

/// If set, only one Task with this `uniqueKey` can be queued or executing for a given `category`
let uniqueKey: String?

let creationDate: Date
Expand Down Expand Up @@ -132,13 +134,13 @@ public struct TaskModel: Codable {
switch task.configuration.value {
case .scheduled(let configuration):
self.configurationType = .scheduled
self.uniqueKey = nil
self.uniqueKey = configuration.uniqueTaskKey
self.executeAfter = configuration.scheduledDate
self.executeBefore = configuration.executeBefore
self.configuration = try BSONEncoder().encode(configuration)
case .recurring(let configuration):
self.configurationType = .recurring
self.uniqueKey = configuration.key
self.uniqueKey = configuration.uniqueTaskKey
self.executeAfter = configuration.scheduledDate
self.executeBefore = configuration.deadline.map { deadline in
configuration.scheduledDate.addingTimeInterval(deadline)
Expand Down Expand Up @@ -183,11 +185,12 @@ public struct _TaskConfiguration {

struct RecurringTaskConfiguration: Codable {
let scheduledDate: Date
let key: String
let uniqueTaskKey: String
let deadline: TimeInterval?
}

struct ScheduledTaskConfiguration: Codable {
let scheduledDate: Date
let uniqueTaskKey: String?
let executeBefore: Date?
}
98 changes: 93 additions & 5 deletions Tests/MongoQueueTests/MongoQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ final class MongoQueueTests: XCTestCase {
func testExample() async throws {
Self.ranTasks = 0
let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"])
queue.registerTask(_Task.self, context: ())
try await queue.queueTask(_Task(message: 0))
Expand All @@ -21,11 +22,14 @@ final class MongoQueueTests: XCTestCase {
try await Task.sleep(nanoseconds: 10_000_000_000)

XCTAssertEqual(Self.ranTasks, 4)
queue.shutdown()
}

@available(macOS 13.0, *)
func testMaxParallelJobs() async throws {
Self.ranTasks = 0
let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"])
queue.setMaxParallelJobs(to: 6)
queue.registerTask(SlowTask.self, context: ())
Expand All @@ -41,14 +45,17 @@ final class MongoQueueTests: XCTestCase {

try await queue.runUntilEmpty()

XCTAssertLessThanOrEqual(-start.timeIntervalSinceNow, 2)
XCTAssertLessThanOrEqual(Date().timeIntervalSince(start), 2)

XCTAssertEqual(Self.ranTasks, 6)
queue.shutdown()
}

@available(macOS 13.0, *)
func testMaxParallelJobsLow() async throws {
Self.ranTasks = 0
let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"])
queue.setMaxParallelJobs(to: 1)
queue.registerTask(SlowTask.self, context: ())
Expand All @@ -64,23 +71,104 @@ final class MongoQueueTests: XCTestCase {

try await queue.runUntilEmpty()

XCTAssertGreaterThanOrEqual(-start.timeIntervalSinceNow, 6)
XCTAssertLessThanOrEqual(Date().timeIntervalSince(start), 7)

XCTAssertEqual(Self.ranTasks, 6)
queue.shutdown()
}


func testNoDuplicateQueuedTasksOfSameUniqueKey() async throws {
struct UniqueTask: ScheduledTask {
var taskExecutionDate: Date { Date() }
var uniqueTaskKey: String { "static" }

func execute(withContext context: Void) async throws {}

func onExecutionFailure(failureContext: QueuedTaskFailure<()>) async throws -> TaskExecutionFailureAction {
return .dequeue()
}
}

let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"], options: [.enableUniqueKeys])
queue.registerTask(UniqueTask.self, context: ())
try await queue.ensureIndexes()
try await queue.queueTask(UniqueTask())

do {
try await queue.queueTask(UniqueTask())
XCTFail("Task should not be able to exist in queue twice")
} catch {}

try await queue.runUntilEmpty()
try await queue.queueTask(UniqueTask())
queue.shutdown()
}

func testDuplicatedOfDifferentTasksCanExist() async throws {
struct UniqueTask: ScheduledTask {
var taskExecutionDate: Date { Date() }
var uniqueTaskKey: String { "static" }

func execute(withContext context: Void) async throws {}

func onExecutionFailure(failureContext: QueuedTaskFailure<()>) async throws -> TaskExecutionFailureAction {
return .dequeue()
}
}

struct UniqueTask2: ScheduledTask {
var taskExecutionDate: Date { Date() }
var uniqueTaskKey: String { "static" }

func execute(withContext context: Void) async throws {}

func onExecutionFailure(failureContext: QueuedTaskFailure<()>) async throws -> TaskExecutionFailureAction {
return .dequeue()
}
}

let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"], options: [.enableUniqueKeys])
queue.registerTask(UniqueTask.self, context: ())
queue.registerTask(UniqueTask2.self, context: ())
try await queue.ensureIndexes()
try await queue.queueTask(UniqueTask())
try await queue.queueTask(UniqueTask2())

do {
try await queue.queueTask(UniqueTask())
XCTFail("Task should not be able to exist in queue twice")
} catch {}

do {
try await queue.queueTask(UniqueTask2())
XCTFail("Task should not be able to exist in queue twice")
} catch {}

try await queue.runUntilEmpty()
try await queue.queueTask(UniqueTask())
try await queue.queueTask(UniqueTask2())
queue.shutdown()
}

func test_recurringTask() async throws {
Self.ranTasks = 0
let db = try await MongoDatabase.connect(to: settings)
try await db.drop()
let queue = MongoQueue(collection: db["tasks"])
queue.registerTask(RTRecurringTask.self, context: ())
try await queue.queueTask(RTRecurringTask())
queue.newTaskPollingFrequency = .milliseconds(100)
try queue.runInBackground()

// Sleep 30 sec, so each 5-second window is ran, +5 seconds to test if it runs only 5 times
try await Task.sleep(nanoseconds: 35_000_000_000)
try await Task.sleep(nanoseconds: 5_000_000_000)

XCTAssertEqual(Self.ranTasks, 5)
queue.shutdown()
}
}

Expand Down Expand Up @@ -124,7 +212,7 @@ struct RTRecurringTask: RecurringTask {
var uniqueTaskKey: String = "RecurringTask"

func getNextRecurringTaskDate(_ context: ExecutionContext) async throws -> Date? {
MongoQueueTests.ranTasks >= 5 ? nil : Date().addingTimeInterval(5)
MongoQueueTests.ranTasks >= 5 ? nil : Date().addingTimeInterval(1)
}

func execute(withContext context: Void) async throws {
Expand Down