Skip to content

Commit

Permalink
Merge pull request #17 from orlandos-nl/jo/concurrency-improvements
Browse files Browse the repository at this point in the history
Concurrency improvements
  • Loading branch information
Joannis authored Apr 15, 2024
2 parents 3e60550 + 0db6346 commit 8354892
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 70 deletions.
62 changes: 47 additions & 15 deletions Sources/MongoQueue/KnownType.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ internal struct KnownType {
metadata = try BSONDecoder().decode(type, from: task.metadata)
} catch {
logger.error("Task of category \"\(T.category)\" has changed metadata format")
queue.jobsInvalid.increment()
try await collection.deleteOne(where: "_id" == task._id)
queue.jobsRemoved.increment()
throw error
}

Expand All @@ -49,12 +51,15 @@ internal struct KnownType {
switch taskConfig {
case .scheduled(let scheduleConfig):
if let executeBefore = scheduleConfig.executeBefore, executeBefore < Date() {
queue.jobsExpired.increment()

logger.info("Task of category \"\(T.category)\" expired and will not be executed")
do {
// TODO: We assume this succeeds, but what if it does not?
var concern = WriteConcern()
concern.acknowledgement = .majority
try await collection.deleteOne(where: "_id" == task._id, writeConcern: concern)
queue.jobsRemoved.increment()
} catch {
logger.critical("Failed to delete task \(task._id) of category \"\(T.category))\" after execution: \(error.localizedDescription)")
}
Expand All @@ -63,12 +68,15 @@ internal struct KnownType {
case .recurring(let recurringConfig):
// No filters exist (yet) that prevent a task from executing
if let deadline = recurringConfig.deadline, recurringConfig.scheduledDate.addingTimeInterval(deadline) < Date() {
queue.jobsExpired.increment()

logger.info("Task of category \"\(T.category)\" expired and will not be executed")
do {
// TODO: We assume this succeeds, but what if it does not?
var concern = WriteConcern()
concern.acknowledgement = .majority
try await collection.deleteOne(where: "_id" == task._id, writeConcern: concern)
queue.jobsRemoved.increment()
} catch {
logger.critical("Failed to delete task \(task._id) of category \"\(T.category))\" after execution: \(error.localizedDescription)")
}
Expand All @@ -83,25 +91,42 @@ internal struct KnownType {

// We're early on the updates, so that we don't get dequeued
let interval = Swift.max(task.maxTaskDuration - 15, 1)
let executionUpdates = Task {
while !Task.isCancelled {
try await Task.sleep(nanoseconds: UInt64(interval) * 1_000_000_000)
_ = try await collection.findOneAndUpdate(
where: "_id" == taskId ,
to: [
"$set": [
"execution.lastUpdate": Date()
try await withThrowingTaskGroup(of: T.self) { taskGroup in
taskGroup.addTask {
while !Task.isCancelled {
try await Task.sleep(nanoseconds: UInt64(interval) * 1_000_000_000)
_ = try await collection.findOneAndUpdate(
where: "_id" == taskId ,
to: [
"$set": [
"execution.lastUpdate": Date()
]
]
]
).execute()
).execute()
}

throw CancellationError()
}

taskGroup.addTask { [metadata, task] in
var metadata = metadata
queue.jobsRan.increment()
try await metadata.execute(withContext: context)
queue.jobsSucceeded.increment()
logger.debug("Successful execution: task \(task._id) of category \"\(T.category)\"")
_ = try await metadata._onDequeueTask(task, withContext: context, inQueue: queue)
return metadata
}

guard let _metadata = try await taskGroup.next() else {
throw CancellationError()
}

metadata = _metadata
taskGroup.cancelAll()
}

defer { executionUpdates.cancel() }
try await metadata.execute(withContext: context)
logger.debug("Successful execution: task \(task._id) of category \"\(T.category)\"")
_ = try await metadata._onDequeueTask(task, withContext: context, inQueue: queue)
} catch {
queue.jobsFailed.increment()
logger.debug("Execution failure for task \(task._id) in category \"\(T.category))\": \(error.localizedDescription)")
let failureContext = QueuedTaskFailure(
executionContext: context,
Expand Down Expand Up @@ -130,6 +155,7 @@ internal struct KnownType {
throw MongoQueueError.dequeueTaskFailed
}
}
queue.jobsRemoved.increment()
}

switch onFailure.raw {
Expand All @@ -146,6 +172,7 @@ internal struct KnownType {
guard try await collection.upsertEncoded(task, where: "_id" == task._id).updatedCount == 1 else {
throw MongoQueueError.reschedulingFailedTaskFailed
}
queue.jobsRequeued.increment()
}
case .retryAfter(let nextInterval, maxAttempts: let maxAttempts, let removal):
if let maxAttempts = maxAttempts, task.attempts >= maxAttempts {
Expand All @@ -155,6 +182,11 @@ internal struct KnownType {
task.status = .scheduled
task.execution = nil
task.executeAfter = Date().addingTimeInterval(nextInterval)

guard try await collection.upsertEncoded(task, where: "_id" == task._id).updatedCount == 1 else {
throw MongoQueueError.reschedulingFailedTaskFailed
}
queue.jobsRequeued.increment()
}
}

Expand Down
129 changes: 74 additions & 55 deletions Sources/MongoQueue/MongoQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import MongoCore
import NIOConcurrencyHelpers
import Foundation
import Meow
import Tracing
import Metrics

/// A MongoQueue is a queue that uses MongoDB as a backend for storing tasks. It is designed to be used in a distributed environment.
///
Expand Down Expand Up @@ -51,6 +53,14 @@ public final class MongoQueue: @unchecked Sendable {
}
private let checkServerNotifications = AsyncStream<Void>.makeStream(bufferingPolicy: .bufferingNewest(2))
private var maxParallelJobs = 1
package let jobsRan = Counter(label: "org.orlandos-nl.mongoqueue.jobsRan")
package let jobsSucceeded = Counter(label: "org.orlandos-nl.mongoqueue.jobsSucceeded")
package let jobsInvalid = Counter(label: "org.orlandos-nl.mongoqueue.jobsIgnored")
package let jobsRequeued = Counter(label: "org.orlandos-nl.mongoqueue.jobsRequeued")
package let jobsExpired = Counter(label: "org.orlandos-nl.mongoqueue.jobsExpired")
package let jobsRemoved = Counter(label: "org.orlandos-nl.mongoqueue.jobsRemoved")
package let jobsKilled = Counter(label: "org.orlandos-nl.mongoqueue.jobsKilled")
package let jobsFailed = Counter(label: "org.orlandos-nl.mongoqueue.jobsFailed")
public var newTaskPollingFrequency = NIO.TimeAmount.milliseconds(1000)
public let options: Set<Option>

Expand Down Expand Up @@ -148,47 +158,51 @@ public final class MongoQueue: @unchecked Sendable {
}

func runNextTask() async throws -> TaskExecutionResult {
let context = try BSONEncoder().encode(TaskModel.ExecutingContext())
var writeConcern = WriteConcern()
writeConcern.acknowledgement = .majority

var filter: Document = "status" == TaskStatus.scheduled.raw.rawValue
let executeAfterFilter: Document = "executeAfter" <= Date()
filter = (filter && executeAfterFilter).makeDocument()

let reply = try await collection.findOneAndUpdate(
where: filter,
to: [
"$set": [
"status": TaskStatus.executing.raw.rawValue,
"execution": context
] as Document
],
returnValue: .modified
)
.sort([
"priority": .descending,
"executeBefore": .ascending,
"creationDate": .ascending
])
.writeConcern(writeConcern)
.execute()

guard let taskDocument = reply.value else {
// No task found
return .noneExecuted
}

var task = try BSONDecoder().decode(TaskModel.self, from: taskDocument)
guard let knownType = knownTypes.first(where: { $0.category == task.category }) else {
throw MongoQueueError.unknownTaskCategory
}

do {
try await knownType.performTask(&task)
return .taskSuccessful
} catch {
return .taskFailure(error)
try await withSpan("MongoQueue.\(#function)") { span in
let context = try BSONEncoder().encode(TaskModel.ExecutingContext())
var writeConcern = WriteConcern()
writeConcern.acknowledgement = .majority

var filter: Document = "status" == TaskStatus.scheduled.raw.rawValue
let executeAfterFilter: Document = "executeAfter" <= Date()
filter = (filter && executeAfterFilter).makeDocument()

let reply = try await collection.findOneAndUpdate(
where: filter,
to: [
"$set": [
"status": TaskStatus.executing.raw.rawValue,
"execution": context
] as Document
],
returnValue: .modified
)
.sort([
"priority": .descending,
"executeBefore": .ascending,
"creationDate": .ascending
])
.writeConcern(writeConcern)
.execute()

guard let taskDocument = reply.value else {
// No task found
return .noneExecuted
}

var task = try BSONDecoder().decode(TaskModel.self, from: taskDocument)
guard let knownType = knownTypes.first(where: { $0.category == task.category }) else {
logger.error("Job of type '\(task.category)' is not known and ignored")
jobsInvalid.increment()
throw MongoQueueError.unknownTaskCategory
}

do {
try await knownType.performTask(&task)
return .taskSuccessful
} catch {
return .taskFailure(error)
}
}
}

Expand Down Expand Up @@ -259,6 +273,7 @@ public final class MongoQueue: @unchecked Sendable {
started = true

try await ensureIndexes()

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
while !Task.isCancelled {
Expand Down Expand Up @@ -286,7 +301,7 @@ public final class MongoQueue: @unchecked Sendable {

taskGroup.addTask {
for await () in self.checkServerNotifications.stream {
try? await self.tick()
try? await self.findTasks()
}
}

Expand Down Expand Up @@ -326,7 +341,7 @@ public final class MongoQueue: @unchecked Sendable {
while !Task.isCancelled, self.started {
do {
// Kick off the first tick, because we might already have backlog
try await self.tick()
try await self.findTasks()

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
Expand All @@ -353,19 +368,22 @@ public final class MongoQueue: @unchecked Sendable {
}
}

private func tick() async throws {
private func findTasks() async throws {
do {
repeat {
switch try await self.runNextTask() {
case .taskFailure(let error):
logger.debug("Failed to run task: \(error)")
fallthrough
case .taskSuccessful:
serverHasData = true
case .noneExecuted:
serverHasData = false
}
} while !Task.isCancelled && serverHasData
try await withSpan("MongoQueue.\(#function)") { span in
repeat {
switch try await self.runNextTask() {
case .taskFailure(let error):
span.recordError(error)
logger.debug("Failed to run task: \(error)")
fallthrough
case .taskSuccessful:
serverHasData = true
case .noneExecuted:
serverHasData = false
}
} while !Task.isCancelled && serverHasData
}
} catch {
// Task execution failed due to a MongoDB error
// Otherwise the return type would specify the task status
Expand Down Expand Up @@ -445,6 +463,7 @@ public final class MongoQueue: @unchecked Sendable {
"execution": Null()
] as Document
]).execute()
jobsKilled.increment()
} catch {
self.logger.error("Failed to dequeue stale task id \(task._id) of type \(task.category)")
}
Expand Down

0 comments on commit 8354892

Please sign in to comment.