Skip to content

Commit

Permalink
Merge branch 'rl-3.3.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
amenezes committed Mar 23, 2024
2 parents 6eefadf + 2423e74 commit 8876a80
Show file tree
Hide file tree
Showing 22 changed files with 178 additions and 68 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ VERSION := $(shell cat rabbit/__init__.py | grep '__version__ ' | cut -d'"' -f 2
lint:
ifeq ($(SKIP_STYLE), )
@echo "> running isort..."
isort --profile black rabbit
isort --profile black tests
isort --profile black setup.py
isort rabbit
isort tests
isort setup.py
@echo "> running black..."
black rabbit
black tests
Expand All @@ -30,7 +30,7 @@ docs:

install-deps:
@echo "> installing dependencies..."
pip install -r requirements-dev.txt
uv pip install -r requirements-dev.txt
pre-commit install

tox:
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ requires = [
"setuptools >= 46.4.0",
]
build-backend = "setuptools.build_meta"

[tool.isort]
profile = "black"
2 changes: 1 addition & 1 deletion rabbit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .queue import Queue
from .subscribe import Subscribe

__version__ = "3.2.0"
__version__ = "3.3.0"
__all__ = [
"__version__",
"AioRabbitClient",
Expand Down
22 changes: 11 additions & 11 deletions rabbit/_wait.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import os
from typing import Optional
from typing import Union

from .logger import logger


def expo(
headers,
delay: Optional[int] = None,
base: Optional[int] = None,
factor: Optional[int] = None,
max_delay: Optional[int] = None,
headers: Union[None, dict],
delay: Union[None, int] = None,
base: Union[None, int] = None,
factor: Union[None, int] = None,
max_delay: Union[None, int] = None,
) -> int:
"""Exponential delay strategy."""
delay = delay or int(os.getenv("EXPO_DELAY", 300000))
Expand All @@ -30,9 +30,9 @@ def expo(


def fibo(
headers,
delay: Optional[int] = None,
max_delay: Optional[int] = None,
headers: Union[None, dict],
delay: Union[None, int] = None,
max_delay: Union[None, int] = None,
) -> int:
"""Incremental delay strategy."""
delay = delay or int(os.getenv("FIBO_DELAY", 300000))
Expand All @@ -47,14 +47,14 @@ def fibo(
return int(max_delay)


def constant(headers, delay: Optional[int] = None) -> int:
def constant(headers: Union[None, dict], delay: Union[None, int] = None) -> int:
"""Constant delay strategy."""
delay = delay or int(os.getenv("CONSTANT_DELAY", 300000))
logger.debug(f"constant delay strategy: [delay={delay}]")
return delay


def _set_timeout(headers, delay: int) -> int:
def _set_timeout(headers: Union[None, dict], delay: int) -> int:
if (headers is not None) and ("x-delay" in headers):
delay = headers["x-delay"]
return int(delay)
3 changes: 3 additions & 0 deletions rabbit/background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def discard(self, task: asyncio.Task) -> None:
def tasks_by_name(self) -> List[str]:
return [task_name for task_name in self._tasks.keys()]

def __getitem__(self, name: str) -> asyncio.Task:
return self._tasks[name]

def __iter__(self) -> Generator[asyncio.Task, None, None]:
for _, task in self._tasks.items():
yield task
Expand Down
26 changes: 13 additions & 13 deletions rabbit/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,19 @@ def consumer(
@click.option("--channels", default=1, show_default=True, help="Channel max.")
@click.option("-v", "--verbose", is_flag=True, help="Extend output info.")
def send_event(
payload,
events,
exchange,
key,
host,
port,
login,
password,
ssl,
verify,
channels,
verbose,
):
payload: Path,
events: int,
exchange: str,
key: str,
host: str,
port: int,
login: str,
password: str,
ssl: bool,
verify: bool,
channels: int,
verbose: bool,
) -> None:
"""Send a sample message 📤 to Consumer or PollingPublisher"""
if verbose:
table = Table.grid(padding=(0, 1))
Expand Down
15 changes: 11 additions & 4 deletions rabbit/cli/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ def __init__(
exchange_topic: str,
queue_name: str,
concurrent: int,
):
) -> None:
self.subscribe_client = AioRabbitClient()

self._loop = asyncio.get_event_loop()
self._loop = self.event_loop() # type: ignore
self._loop.create_task(
self.subscribe_client.persistent_connect(
host=host, port=port, login=login, password=password
Expand All @@ -37,15 +37,22 @@ def __init__(
self.queue_name = queue_name
self.concurrent = concurrent

def run(self, chaos_mode: bool = False, verbose: bool = True):
def event_loop(self):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
return loop

def run(self, chaos_mode: bool = False, verbose: bool = True) -> None:
task = async_echo_job
if chaos_mode:
task = async_chaos_job

self._loop.run_until_complete(self.init(task, verbose))
self._loop.run_forever()

async def init(self, task, verbose: bool = False):
async def init(self, task, verbose: bool = False) -> None:
logger.info(f"Using '{task.__doc__}'")
subscribe = Subscribe(
task=task,
Expand Down
2 changes: 1 addition & 1 deletion rabbit/cli/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


class Publisher:
def __init__(self, exchange_name: str, routing_key: str, **kwargs):
def __init__(self, exchange_name: str, routing_key: str, **kwargs) -> None:
self.loop = asyncio.get_event_loop()
self.client = AioRabbitClient()
self.exchange_name = exchange_name
Expand Down
8 changes: 4 additions & 4 deletions rabbit/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import random
from typing import List, Optional
from typing import List, Union
from uuid import uuid4

import aioamqp
Expand Down Expand Up @@ -40,7 +40,7 @@ def __repr__(self) -> str:
return f"AioRabbitClient(connected={connected}, channels={channels}, max_channels={max_channels}, background_tasks={self._background_tasks})"

@property
def server_properties(self) -> Optional[dict]:
def server_properties(self) -> Union[None, dict]:
"""Get server properties from the current connection."""
try:
return self.protocol.server_properties # type: ignore
Expand All @@ -67,14 +67,14 @@ async def connect(self, **kwargs) -> None:
"""Connect to message broker."""
self.transport, self.protocol = await aioamqp.connect(**kwargs)

async def persistent_connect(self, **kwargs):
async def persistent_connect(self, **kwargs) -> None:
"""Connect to message broker ensuring reconnection in case of error."""
while True:
try:
self.transport, self.protocol = await aioamqp.connect(**kwargs)
await self.protocol.wait_closed()
self.transport.close()
except (OSError, aioamqp.exceptions.AmqpClosedConnection) as err:
except (OSError, AmqpClosedConnection) as err:
logger.error(
f"ConnectionError: [error='{err}', host='{kwargs.get('host')}', port={kwargs.get('port')}, login='{kwargs.get('login')}']"
)
Expand Down
7 changes: 3 additions & 4 deletions rabbit/dlx.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class DLX:
queue: Queue = field(
validator=validators.instance_of(Queue),
)
delay_strategy: Callable = field(
delay_strategy: Callable[..., int] = field(
default=constant, validator=validators.is_callable()
)
_channel = field(init=False, repr=False)
Expand All @@ -37,14 +37,13 @@ def channel(self) -> Channel:
return self._channel

@channel.setter
def channel(self, channel: Channel):
def channel(self, channel: Channel) -> None:
self._channel = channel

async def configure(self) -> None:
"""Configure DLX channel, queues and exchange."""
try:
await self._configure_queue()
await self._configure_exchange()
await asyncio.gather(self._configure_queue(), self._configure_exchange())
await self._configure_queue_bind()
except AttributeNotInitialized:
logger.debug("Waiting client initialization...DLX")
Expand Down
16 changes: 9 additions & 7 deletions rabbit/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
class AttributeNotInitialized(Exception):
pass
def __init__(self, message: str = "Attribute not initialized") -> None:
super().__init__(message)


class OperationError(Exception):
pass
def __init__(self, message: str = "OperationError") -> None:
super().__init__(message)


class ExchangeNotFound(Exception):
def __init__(
self, exchange_name: str, message: str = "Exchange '{name}' not found"
):
super().__init__(message.format(name=exchange_name))
def __init__(self, exchange_name: str) -> None:
super().__init__(f"Exchange '{exchange_name}' not found")


class ClientNotConnectedError(Exception):
def __init__(self, message="AioRabbitClient was not connected with RabbitMQ"):
def __init__(
self, message: str = "AioRabbitClient was not connected with RabbitMQ"
) -> None:
super().__init__(message)
6 changes: 3 additions & 3 deletions rabbit/publish.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import Optional
from typing import Union

from aioamqp.channel import Channel
from aioamqp.exceptions import ChannelClosed
Expand Down Expand Up @@ -57,8 +57,8 @@ async def enable_publish_confirms(self) -> None:
async def send_event(
self,
payload: bytes,
exchange_name: Optional[str] = None,
routing_key: Optional[str] = None,
exchange_name: Union[None, str] = None,
routing_key: Union[None, str] = None,
**kwargs,
) -> None:
"""Sends event message to broker."""
Expand Down
9 changes: 4 additions & 5 deletions rabbit/subscribe.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import os
from contextlib import suppress
from typing import Callable, Optional
from typing import Callable, Union

from aioamqp.channel import Channel
from aioamqp.envelope import Envelope
Expand Down Expand Up @@ -86,12 +86,11 @@ def channel(self, channel: Channel) -> None:
self._dlx.channel = channel
self._channel = channel

async def configure(self, channel: Optional[Channel] = None) -> None:
async def configure(self, channel: Union[None, Channel] = None) -> None:
"""Configure subscriber channel, queues and exchange."""
await self.qos(prefetch_count=self.concurrent)
with suppress(SynchronizationError):
await self._configure_queue()
await self._dlx.configure()
await asyncio.gather(self._configure_queue(), self._dlx.configure())
await self._configure_exchange()
await self._configure_queue_bind()

Expand Down Expand Up @@ -167,7 +166,7 @@ async def qos(
prefetch_size: int = 0,
prefetch_count: int = 0,
connection_global: bool = False,
):
) -> None:
"""Configure qos feature in the subscriber channel."""
await self.channel.basic_qos(
prefetch_size=prefetch_size,
Expand Down
12 changes: 10 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,23 @@ asyncio_mode = auto
files = rabbit
show_error_context = True
verbosity = 0

ignore_missing_imports = True
check_untyped_defs = True
extra_checks = True
strict_equality = True
no_implicit_optional = True
no_implicit_reexport = True

warn_unused_configs = True
warn_return_any = True
warn_unused_ignores = True
warn_unreachable = True
no_implicit_optional = True

disallow_untyped_calls = True

[tox:tox]
envlist = py{38,39,310,311}
envlist = py{38,39,310,311,312}

[testenv]
deps = -rrequirements-dev.txt
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/test_background_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import asyncio

import pytest

from rabbit.background_tasks import BackgroundTasks
from rabbit.job import async_echo_job


@pytest.fixture
def background_tasks():
return BackgroundTasks()


async def test_background_tasks_add(background_tasks):
background_tasks.add("test-task", async_echo_job, b'{"message": "test"}')
assert len(background_tasks) == 1


async def test_background_tasks_multiple_add(background_tasks):
background_tasks.add("test-task-1", async_echo_job, b'{"message": "test"}')
background_tasks.add("test-task-1", async_echo_job, b'{"message": "test"}')
background_tasks.add("test-task-2", async_echo_job, b'{"message": "test2"}')
assert len(background_tasks) == 2


async def test_background_tasks_by_name(background_tasks):
background_tasks.add("test-task", async_echo_job, b'{"message": "test"}')
for task in background_tasks:
assert task.get_name() == "test-task"


async def test_background_tasks_getitem(background_tasks):
background_tasks.add("test-task", async_echo_job, b'{"message": "test"}')
assert isinstance(background_tasks["test-task"], asyncio.Task)


def test_background_tasks_len(background_tasks):
assert len(background_tasks) == 0


def test_background_tasks_repr(background_tasks):
assert repr(background_tasks) == "BackgroundTasks(tasks=0, tasks_by_name=[])"
Loading

0 comments on commit 8876a80

Please sign in to comment.