Skip to content

Commit

Permalink
Support google pubsub as adapter (#12)
Browse files Browse the repository at this point in the history
* Support google pubsub as adapter

* Fix testcase

* Support pubsub and fix some typo
  • Loading branch information
Wh1isper authored Sep 10, 2024
1 parent 64d825f commit eae0bc4
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Motivation: Redis provided on the cloud is usually only available within a VPC a
## Supported queue service

- [x] AWS SQS
- [x] Google PubSub

Welcome to contribute more queue service, see [adapter/impl](./redis_canal/adapter/impl/) for more details.

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ classifiers = [
]

[project.optional-dependencies]
all = ["redis_canal[sqs]"]
all = ["redis_canal[sqs, pubsub]"]
sqs = ["boto3"]
pubsub = ["google-cloud-pubsub"]
test = [
"pytest",
"pytest-asyncio",
Expand Down
152 changes: 152 additions & 0 deletions redis_canal/adapter/impl/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from __future__ import annotations

import asyncio
from functools import cached_property
from typing import TYPE_CHECKING, Awaitable

from redis_canal.adapter.plugin import Adapter, hookimpl
from redis_canal.log import logger
from redis_canal.models import Message
from redis_canal.tools import run_in_threadpool

if TYPE_CHECKING:
from google.cloud import pubsub_v1


class PubSubAdapter(Adapter):
"""
PubSub Adapter for Google Cloud Pub/Sub.
Args:
queue_url (str): The topic path, e.g. projects/{project}/topics/{topic}
poll_time (float): The time to poll the queue
poll_size (int): The number of messages to poll at a time
"""

register_name = "pubsub"

def __init__(self, queue_url: str, poll_time: float, poll_size: int, *args, **kwargs):
super().__init__(queue_url, poll_time, poll_size, *args, **kwargs)

if self.poll_time < 1:
self.poll_time = 1
if self.poll_size > 10:
self.poll_size = 10

self.poll_time = self.poll_time
self.ensure_queue_exists()

@property
def topic_path(self) -> str:
return self.queue_url

@property
def subscription_path(self) -> str:
return f"projects/{self.project_id}/subscriptions/{self.subscribe_id}"

@property
def project_id(self) -> str:
return self.topic_path.split("/")[1]

@property
def subscribe_id(self) -> str:
return "redis_cannal_pubsub"

@cached_property
def publisher(self) -> "pubsub_v1.PublisherClient":
try:
from google.cloud import pubsub_v1
except ImportError:
raise RuntimeError(
"Google Cloud Pub/Sub SDK is required to use PubSub Adapter, try install redis_canal with `pip install redis_canal[all]` for all components or `pip install redis_canal[pubsub]` for pubsub"
)

return pubsub_v1.PublisherClient()

@cached_property
def subscriber(self) -> "pubsub_v1.SubscriberClient":
try:
from google.cloud import pubsub_v1
except ImportError:
raise RuntimeError(
"Google Cloud Pub/Sub SDK is required to use PubSub Adapter, try install redis_canal with `pip install redis_canal[all]` for all components or `pip install redis_canal[pubsub]` for pubsub"
)

return pubsub_v1.SubscriberClient()

def ensure_queue_exists(self):
from google.api_core import exceptions

try:
self.publisher.create_topic(
name=self.topic_path,
)
except exceptions.AlreadyExists:
pass

try:
self.subscriber.create_subscription(
name=self.subscription_path,
topic=self.topic_path,
)
except exceptions.AlreadyExists:
pass

async def emit(self, message: Message) -> None:
def _():
response = self.publisher.publish(
topic=self.topic_path,
data=message.model_dump_json().encode("utf-8"),
)

logger.debug(f"Published message {response.result()} to {self.topic_path}")

await run_in_threadpool(_)

async def poll(self, process_func: Awaitable[Message], *args, **kwargs) -> None:
pubsub_messages = await self._poll_message()
await asyncio.gather(
*[
self._process_messages(process_func, message, ack_id)
for message, ack_id in pubsub_messages
]
)

async def _poll_message(self) -> list[tuple[Message, str]]:
def _():
response = self.subscriber.pull(
request={
"subscription": self.subscription_path,
"max_messages": self.poll_size,
}
)
return [
(
Message.model_validate_json(message.message.data.decode("utf-8")),
message.ack_id,
)
for message in response.received_messages
]

return await run_in_threadpool(_)

async def _process_messages(
self, process_func: Awaitable[Message], message: Message, ack_id: str
):
try:
await process_func(message)
except Exception as e:
logger.error(f"Error processing message {message}: {e}")
else:
await run_in_threadpool(
self.subscriber.acknowledge,
request={
"subscription": self.subscription_path,
"ack_ids": [ack_id],
},
)


@hookimpl
def register(manager):
manager.register(PubSubAdapter)
2 changes: 1 addition & 1 deletion redis_canal/adapter/impl/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def client(self):
import boto3
except ImportError:
raise RuntimeError(
"boto3 is not installed, try install moriarty with `pip install moriarty[matrix]` for all components or `pip install moriarty[sqs]` for sqs only"
"boto3 is not installed, try install redis_canal with `pip install redis_canal[all]` for all components or `pip install redis_canal[sqs]` for sqs"
)
return boto3.client("sqs")

Expand Down
54 changes: 54 additions & 0 deletions tests/adapter/test_pubsub_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import os

import pytest

from redis_canal.adapter.impl.pubsub import PubSubAdapter
from redis_canal.models import Message


@pytest.fixture
def pubsub_adapter(case_id):
try:
import google.api_core.exceptions as exceptions
import google.cloud.pubsub_v1

except ImportError:
pytest.skip("Google Cloud Pub/Sub SDK is not installed")

project_id = os.getenv("TEST_PUBSUB_PROJECT_ID")
if not project_id:
pytest.skip("TEST_PUBSUB_PROJECT_ID is not configured")

queue_url = f"projects/{project_id}/topics/test-topic"
try:
adapter = PubSubAdapter(
queue_url=queue_url,
poll_time=1,
poll_size=10,
)
except Exception as e:
pytest.skip(f"Google Cloud Pub/Sub SDK is not configured correctly: {e}")

yield adapter
try:
adapter.publisher.delete_topic(request={"topic": queue_url})
except exceptions.NotFound:
pass


async def test_pubsub_adapter(pubsub_adapter):

message_input = Message(
redis_key="test",
message_id="123-345",
message_content={"f1": "v1"},
)

async def validate(message):
assert message == message_input
print("validated!")

await pubsub_adapter.emit(message_input)
await pubsub_adapter.poll(
process_func=validate,
)
2 changes: 1 addition & 1 deletion tests/test_adapter_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


def test_bridge_manager():
registed = ["sqs"]
registed = ["sqs", "pubsub"]
manager = AdapterManager()

assert sorted(registed) == sorted(manager.registed_cls.keys())

0 comments on commit eae0bc4

Please sign in to comment.