Skip to content

Commit

Permalink
Add burst mode to worker
Browse files Browse the repository at this point in the history
This allows to run a pulpcore-worker in the foreground that will handle
tasks until no more are available.
This can help to drain the task queue during an upgrade, or overcome a
congestion short term.

fixes #4341
  • Loading branch information
mdellweg authored and ipanova committed Sep 1, 2023
1 parent 4a2e54b commit 96ee562
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGES/4341.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added ``--burst`` flag to pulpcore-worker so it will terminate instead of sleeping.
6 changes: 6 additions & 0 deletions docs/plugin_dev/plugin-writer/concepts/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -769,3 +769,9 @@ older codebases could run on newer, upgraded workers. To ensure this always work
forever backwards compatible until the next major Pulp version. For example, you cannot have a
breaking signature change in tasking code and if this is needed you need to make a new task name and
preserve the old code until the next major Pulp version.

.. note::

Users not performing zero downtime upgrades who are still wary of any task incompatibilities,
should consider running the pulpcore worker in burst mode (`pulpcore-worker --burst`) after
shutting down all the api and content workers to drain the task queue.
9 changes: 6 additions & 3 deletions pulpcore/tasking/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
_logger = logging.getLogger(__name__)


@click.option("--pid", help="Write the process ID number to a file at the specified path")
@click.option("--pid", help="Write the process ID number to a file at the specified path.")
@click.option(
"--burst/--no-burst", help="Run in burst mode; terminate when no more tasks are available."
)
@click.command()
def worker(pid):
def worker(pid, burst):
"""A Pulp worker."""

if pid:
Expand All @@ -25,4 +28,4 @@ def worker(pid):

_logger.info("Starting distributed type worker")

NewPulpWorker().run_forever()
NewPulpWorker().run(burst=burst)
16 changes: 10 additions & 6 deletions pulpcore/tasking/pulpcore_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,22 +393,26 @@ def supervise_task(self, task):
self.notify_workers()
self.task = None

def run_forever(self):
def run(self, burst=False):
with WorkerDirectory(self.name):
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGHUP, self._signal_handler)
# Subscribe to pgsql channels
connection.connection.add_notify_handler(self._pg_notify_handler)
self.cursor.execute("LISTEN pulp_worker_wakeup")
self.cursor.execute("LISTEN pulp_worker_cancel")
while not self.shutdown_requested:
if burst:
for task in self.iter_tasks():
self.supervise_task(task)
if not self.shutdown_requested:
self.sleep()
else:
self.cursor.execute("LISTEN pulp_worker_wakeup")
while not self.shutdown_requested:
for task in self.iter_tasks():
self.supervise_task(task)
if not self.shutdown_requested:
self.sleep()
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.cursor.execute("UNLISTEN pulp_worker_cancel")
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.shutdown()


Expand Down

0 comments on commit 96ee562

Please sign in to comment.