diff --git a/Makefile b/Makefile index 63e6316..9f7d861 100644 --- a/Makefile +++ b/Makefile @@ -4,9 +4,9 @@ VERSION := $(shell cat rabbit/__init__.py | grep '__version__ ' | cut -d'"' -f 2 lint: ifeq ($(SKIP_STYLE), ) @echo "> running isort..." - isort --profile black rabbit - isort --profile black tests - isort --profile black setup.py + isort rabbit + isort tests + isort setup.py @echo "> running black..." black rabbit black tests @@ -30,7 +30,7 @@ docs: install-deps: @echo "> installing dependencies..." - pip install -r requirements-dev.txt + uv pip install -r requirements-dev.txt pre-commit install tox: diff --git a/pyproject.toml b/pyproject.toml index 225e117..82aedd1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,3 +3,6 @@ requires = [ "setuptools >= 46.4.0", ] build-backend = "setuptools.build_meta" + +[tool.isort] +profile = "black" diff --git a/rabbit/__init__.py b/rabbit/__init__.py index a2f99a9..eed860a 100644 --- a/rabbit/__init__.py +++ b/rabbit/__init__.py @@ -7,7 +7,7 @@ from .queue import Queue from .subscribe import Subscribe -__version__ = "3.2.0" +__version__ = "3.3.0" __all__ = [ "__version__", "AioRabbitClient", diff --git a/rabbit/_wait.py b/rabbit/_wait.py index 84dc887..1176809 100644 --- a/rabbit/_wait.py +++ b/rabbit/_wait.py @@ -1,15 +1,15 @@ import os -from typing import Optional +from typing import Union from .logger import logger def expo( - headers, - delay: Optional[int] = None, - base: Optional[int] = None, - factor: Optional[int] = None, - max_delay: Optional[int] = None, + headers: Union[None, dict], + delay: Union[None, int] = None, + base: Union[None, int] = None, + factor: Union[None, int] = None, + max_delay: Union[None, int] = None, ) -> int: """Exponential delay strategy.""" delay = delay or int(os.getenv("EXPO_DELAY", 300000)) @@ -30,9 +30,9 @@ def expo( def fibo( - headers, - delay: Optional[int] = None, - max_delay: Optional[int] = None, + headers: Union[None, dict], + delay: Union[None, int] = None, + max_delay: Union[None, int] = None, ) -> int: """Incremental delay strategy.""" delay = delay or int(os.getenv("FIBO_DELAY", 300000)) @@ -47,14 +47,14 @@ def fibo( return int(max_delay) -def constant(headers, delay: Optional[int] = None) -> int: +def constant(headers: Union[None, dict], delay: Union[None, int] = None) -> int: """Constant delay strategy.""" delay = delay or int(os.getenv("CONSTANT_DELAY", 300000)) logger.debug(f"constant delay strategy: [delay={delay}]") return delay -def _set_timeout(headers, delay: int) -> int: +def _set_timeout(headers: Union[None, dict], delay: int) -> int: if (headers is not None) and ("x-delay" in headers): delay = headers["x-delay"] return int(delay) diff --git a/rabbit/background_tasks.py b/rabbit/background_tasks.py index 703051b..d5ffab5 100644 --- a/rabbit/background_tasks.py +++ b/rabbit/background_tasks.py @@ -37,6 +37,9 @@ def discard(self, task: asyncio.Task) -> None: def tasks_by_name(self) -> List[str]: return [task_name for task_name in self._tasks.keys()] + def __getitem__(self, name: str) -> asyncio.Task: + return self._tasks[name] + def __iter__(self) -> Generator[asyncio.Task, None, None]: for _, task in self._tasks.items(): yield task diff --git a/rabbit/cli/__init__.py b/rabbit/cli/__init__.py index 1f23041..b90e533 100644 --- a/rabbit/cli/__init__.py +++ b/rabbit/cli/__init__.py @@ -202,19 +202,19 @@ def consumer( @click.option("--channels", default=1, show_default=True, help="Channel max.") @click.option("-v", "--verbose", is_flag=True, help="Extend output info.") def send_event( - payload, - events, - exchange, - key, - host, - port, - login, - password, - ssl, - verify, - channels, - verbose, -): + payload: Path, + events: int, + exchange: str, + key: str, + host: str, + port: int, + login: str, + password: str, + ssl: bool, + verify: bool, + channels: int, + verbose: bool, +) -> None: """Send a sample message 📤 to Consumer or PollingPublisher""" if verbose: table = Table.grid(padding=(0, 1)) diff --git a/rabbit/cli/consumer.py b/rabbit/cli/consumer.py index 9e75618..8539170 100644 --- a/rabbit/cli/consumer.py +++ b/rabbit/cli/consumer.py @@ -20,10 +20,10 @@ def __init__( exchange_topic: str, queue_name: str, concurrent: int, - ): + ) -> None: self.subscribe_client = AioRabbitClient() - self._loop = asyncio.get_event_loop() + self._loop = self.event_loop() # type: ignore self._loop.create_task( self.subscribe_client.persistent_connect( host=host, port=port, login=login, password=password @@ -37,7 +37,14 @@ def __init__( self.queue_name = queue_name self.concurrent = concurrent - def run(self, chaos_mode: bool = False, verbose: bool = True): + def event_loop(self): + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + return loop + + def run(self, chaos_mode: bool = False, verbose: bool = True) -> None: task = async_echo_job if chaos_mode: task = async_chaos_job @@ -45,7 +52,7 @@ def run(self, chaos_mode: bool = False, verbose: bool = True): self._loop.run_until_complete(self.init(task, verbose)) self._loop.run_forever() - async def init(self, task, verbose: bool = False): + async def init(self, task, verbose: bool = False) -> None: logger.info(f"Using '{task.__doc__}'") subscribe = Subscribe( task=task, diff --git a/rabbit/cli/publisher.py b/rabbit/cli/publisher.py index b0d0b5e..e026a5a 100644 --- a/rabbit/cli/publisher.py +++ b/rabbit/cli/publisher.py @@ -7,7 +7,7 @@ class Publisher: - def __init__(self, exchange_name: str, routing_key: str, **kwargs): + def __init__(self, exchange_name: str, routing_key: str, **kwargs) -> None: self.loop = asyncio.get_event_loop() self.client = AioRabbitClient() self.exchange_name = exchange_name diff --git a/rabbit/client.py b/rabbit/client.py index 1cfafd8..0e62975 100644 --- a/rabbit/client.py +++ b/rabbit/client.py @@ -1,6 +1,6 @@ import asyncio import random -from typing import List, Optional +from typing import List, Union from uuid import uuid4 import aioamqp @@ -40,7 +40,7 @@ def __repr__(self) -> str: return f"AioRabbitClient(connected={connected}, channels={channels}, max_channels={max_channels}, background_tasks={self._background_tasks})" @property - def server_properties(self) -> Optional[dict]: + def server_properties(self) -> Union[None, dict]: """Get server properties from the current connection.""" try: return self.protocol.server_properties # type: ignore @@ -67,14 +67,14 @@ async def connect(self, **kwargs) -> None: """Connect to message broker.""" self.transport, self.protocol = await aioamqp.connect(**kwargs) - async def persistent_connect(self, **kwargs): + async def persistent_connect(self, **kwargs) -> None: """Connect to message broker ensuring reconnection in case of error.""" while True: try: self.transport, self.protocol = await aioamqp.connect(**kwargs) await self.protocol.wait_closed() self.transport.close() - except (OSError, aioamqp.exceptions.AmqpClosedConnection) as err: + except (OSError, AmqpClosedConnection) as err: logger.error( f"ConnectionError: [error='{err}', host='{kwargs.get('host')}', port={kwargs.get('port')}, login='{kwargs.get('login')}']" ) diff --git a/rabbit/dlx.py b/rabbit/dlx.py index c61f620..b588879 100644 --- a/rabbit/dlx.py +++ b/rabbit/dlx.py @@ -24,7 +24,7 @@ class DLX: queue: Queue = field( validator=validators.instance_of(Queue), ) - delay_strategy: Callable = field( + delay_strategy: Callable[..., int] = field( default=constant, validator=validators.is_callable() ) _channel = field(init=False, repr=False) @@ -37,14 +37,13 @@ def channel(self) -> Channel: return self._channel @channel.setter - def channel(self, channel: Channel): + def channel(self, channel: Channel) -> None: self._channel = channel async def configure(self) -> None: """Configure DLX channel, queues and exchange.""" try: - await self._configure_queue() - await self._configure_exchange() + await asyncio.gather(self._configure_queue(), self._configure_exchange()) await self._configure_queue_bind() except AttributeNotInitialized: logger.debug("Waiting client initialization...DLX") diff --git a/rabbit/exceptions.py b/rabbit/exceptions.py index 0fbdb5c..7d96d50 100644 --- a/rabbit/exceptions.py +++ b/rabbit/exceptions.py @@ -1,18 +1,20 @@ class AttributeNotInitialized(Exception): - pass + def __init__(self, message: str = "Attribute not initialized") -> None: + super().__init__(message) class OperationError(Exception): - pass + def __init__(self, message: str = "OperationError") -> None: + super().__init__(message) class ExchangeNotFound(Exception): - def __init__( - self, exchange_name: str, message: str = "Exchange '{name}' not found" - ): - super().__init__(message.format(name=exchange_name)) + def __init__(self, exchange_name: str) -> None: + super().__init__(f"Exchange '{exchange_name}' not found") class ClientNotConnectedError(Exception): - def __init__(self, message="AioRabbitClient was not connected with RabbitMQ"): + def __init__( + self, message: str = "AioRabbitClient was not connected with RabbitMQ" + ) -> None: super().__init__(message) diff --git a/rabbit/publish.py b/rabbit/publish.py index f40b343..c183a18 100644 --- a/rabbit/publish.py +++ b/rabbit/publish.py @@ -1,5 +1,5 @@ import os -from typing import Optional +from typing import Union from aioamqp.channel import Channel from aioamqp.exceptions import ChannelClosed @@ -57,8 +57,8 @@ async def enable_publish_confirms(self) -> None: async def send_event( self, payload: bytes, - exchange_name: Optional[str] = None, - routing_key: Optional[str] = None, + exchange_name: Union[None, str] = None, + routing_key: Union[None, str] = None, **kwargs, ) -> None: """Sends event message to broker.""" diff --git a/rabbit/subscribe.py b/rabbit/subscribe.py index 19d62df..5538a9f 100644 --- a/rabbit/subscribe.py +++ b/rabbit/subscribe.py @@ -1,7 +1,7 @@ import asyncio import os from contextlib import suppress -from typing import Callable, Optional +from typing import Callable, Union from aioamqp.channel import Channel from aioamqp.envelope import Envelope @@ -86,12 +86,11 @@ def channel(self, channel: Channel) -> None: self._dlx.channel = channel self._channel = channel - async def configure(self, channel: Optional[Channel] = None) -> None: + async def configure(self, channel: Union[None, Channel] = None) -> None: """Configure subscriber channel, queues and exchange.""" await self.qos(prefetch_count=self.concurrent) with suppress(SynchronizationError): - await self._configure_queue() - await self._dlx.configure() + await asyncio.gather(self._configure_queue(), self._dlx.configure()) await self._configure_exchange() await self._configure_queue_bind() @@ -167,7 +166,7 @@ async def qos( prefetch_size: int = 0, prefetch_count: int = 0, connection_global: bool = False, - ): + ) -> None: """Configure qos feature in the subscriber channel.""" await self.channel.basic_qos( prefetch_size=prefetch_size, diff --git a/setup.cfg b/setup.cfg index 1d1f60e..d3bdbee 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,15 +72,23 @@ asyncio_mode = auto files = rabbit show_error_context = True verbosity = 0 + ignore_missing_imports = True +check_untyped_defs = True +extra_checks = True +strict_equality = True +no_implicit_optional = True +no_implicit_reexport = True + warn_unused_configs = True warn_return_any = True warn_unused_ignores = True warn_unreachable = True -no_implicit_optional = True + +disallow_untyped_calls = True [tox:tox] -envlist = py{38,39,310,311} +envlist = py{38,39,310,311,312} [testenv] deps = -rrequirements-dev.txt diff --git a/tests/unit/test_background_tasks.py b/tests/unit/test_background_tasks.py new file mode 100644 index 0000000..bd2d063 --- /dev/null +++ b/tests/unit/test_background_tasks.py @@ -0,0 +1,42 @@ +import asyncio + +import pytest + +from rabbit.background_tasks import BackgroundTasks +from rabbit.job import async_echo_job + + +@pytest.fixture +def background_tasks(): + return BackgroundTasks() + + +async def test_background_tasks_add(background_tasks): + background_tasks.add("test-task", async_echo_job, b'{"message": "test"}') + assert len(background_tasks) == 1 + + +async def test_background_tasks_multiple_add(background_tasks): + background_tasks.add("test-task-1", async_echo_job, b'{"message": "test"}') + background_tasks.add("test-task-1", async_echo_job, b'{"message": "test"}') + background_tasks.add("test-task-2", async_echo_job, b'{"message": "test2"}') + assert len(background_tasks) == 2 + + +async def test_background_tasks_by_name(background_tasks): + background_tasks.add("test-task", async_echo_job, b'{"message": "test"}') + for task in background_tasks: + assert task.get_name() == "test-task" + + +async def test_background_tasks_getitem(background_tasks): + background_tasks.add("test-task", async_echo_job, b'{"message": "test"}') + assert isinstance(background_tasks["test-task"], asyncio.Task) + + +def test_background_tasks_len(background_tasks): + assert len(background_tasks) == 0 + + +def test_background_tasks_repr(background_tasks): + assert repr(background_tasks) == "BackgroundTasks(tasks=0, tasks_by_name=[])" diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 7a23cdb..fa30e71 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -2,7 +2,7 @@ import pytest -from rabbit.client import aioamqp +from rabbit.client import AioRabbitClient, aioamqp from rabbit.exceptions import AttributeNotInitialized from tests.conftest import AioAmqpMock @@ -44,3 +44,8 @@ def test_server_properties_with_client_not_connected(client): def test_server_properties(client_mock): assert isinstance(client_mock.server_properties, dict) + + +@pytest.mark.parametrize("attribute", ["transport", "server_properties", "protocol"]) +def test_client_attributes(attribute): + assert hasattr(AioRabbitClient, attribute) diff --git a/tests/unit/test_dlx.py b/tests/unit/test_dlx.py index d1fb0a9..2ee4542 100644 --- a/tests/unit/test_dlx.py +++ b/tests/unit/test_dlx.py @@ -1,5 +1,6 @@ import pytest +from rabbit.dlx import DLX from rabbit.exceptions import OperationError from tests.conftest import EnvelopeMock, PropertiesMock @@ -20,3 +21,10 @@ async def test_send_event_error_without_client_connection(dlx): def test_dlx_repr(dlx): assert isinstance(repr(dlx), str) + + +@pytest.mark.parametrize( + "attribute", ["exchange", "dlq_exchange", "queue", "delay_strategy", "channel"] +) +def test_dlx_attributes(attribute): + assert hasattr(DLX, attribute) diff --git a/tests/unit/test_exceptions.py b/tests/unit/test_exceptions.py new file mode 100644 index 0000000..990ffa2 --- /dev/null +++ b/tests/unit/test_exceptions.py @@ -0,0 +1,14 @@ +from rabbit.exceptions import ClientNotConnectedError, ExchangeNotFound + + +def test_exchange_not_found_message(): + assert ( + str(ExchangeNotFound("test-exchange")) == "Exchange 'test-exchange' not found" + ) + + +def test_client_not_connected_error(): + assert ( + str(ClientNotConnectedError()) + == "AioRabbitClient was not connected with RabbitMQ" + ) diff --git a/tests/unit/test_jobs.py b/tests/unit/test_jobs.py index 186df2d..9ef8dd3 100644 --- a/tests/unit/test_jobs.py +++ b/tests/unit/test_jobs.py @@ -1,6 +1,14 @@ -from rabbit.job import async_echo_job +import pytest + +from rabbit.job import async_chaos_job, async_echo_job async def test_async_echo_job(): resp = await async_echo_job(b'{"process": 123}', skip_wait=True) assert resp == b'{"process": 123}' + + +@pytest.mark.xfail(reason="The test may fail due to randomness issue") +async def test_async_chaos_job(): + resp = await async_chaos_job(b'{"process": 123}', skip_wait=True) + assert resp == b'{"process": 123}' diff --git a/tests/unit/test_publish.py b/tests/unit/test_publish.py index 7029862..d85845d 100644 --- a/tests/unit/test_publish.py +++ b/tests/unit/test_publish.py @@ -24,3 +24,10 @@ async def test_publish_confirms_disabled(publish): async def test_publish_confirms_enabled(): publish = Publish(True) assert publish.publish_confirms is True + + +@pytest.mark.parametrize( + "attribute", ["publish_confirms", "name", "channel_id", "channel"] +) +def test_publish_attributes(attribute): + assert hasattr(Publish, attribute) diff --git a/tests/unit/test_queue.py b/tests/unit/test_queue.py index 0159755..3d349be 100644 --- a/tests/unit/test_queue.py +++ b/tests/unit/test_queue.py @@ -1,7 +1,6 @@ -import attr +import pytest -def test_attributes(queue): - values = ["queue", True, {}] - for value in values: - assert value in attr.asdict(queue).values() +@pytest.mark.parametrize("attribute", ["name", "durable", "arguments"]) +def test_attributes(queue, attribute): + assert hasattr(queue, attribute) diff --git a/tests/unit/test_subscribe.py b/tests/unit/test_subscribe.py index 4849c56..2fb6662 100644 --- a/tests/unit/test_subscribe.py +++ b/tests/unit/test_subscribe.py @@ -1,10 +1,9 @@ import pytest from rabbit.exceptions import ClientNotConnectedError +from rabbit.subscribe import Subscribe from tests.conftest import EnvelopeMock -PAYLOAD = b'{"a": 1}' - async def test_register_subscribe_without_client_connected(subscribe): with pytest.raises(ClientNotConnectedError): @@ -25,3 +24,10 @@ def test_subscribe_repr(subscribe_mock): async def test_nack_event(subscribe_mock): await subscribe_mock.nack_event(EnvelopeMock()) + + +@pytest.mark.parametrize( + "attribute", ["task", "exchange", "queue", "concurrent", "delay_strategy"] +) +def test_subscribe_attributes(attribute): + assert hasattr(Subscribe, attribute)