Skip to content

Commit 01c1785

Browse files
committed
Added redis broker.
1 parent 30df2ef commit 01c1785

File tree

4 files changed

+142
-9
lines changed

4 files changed

+142
-9
lines changed

.flake8

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ ignore =
7878
; Consider possible security implications associated with pickle module
7979
; Pickle and modules that wrap it can be unsafe when used to deserialize untrusted data, possible security issue
8080
S403, S301
81+
; Found `%` string formatting
82+
WPS323
8183

8284
per-file-ignores =
8385
; all tests

poetry.lock

Lines changed: 33 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ packages = [{ include = "taskiq", from = "taskiq_redis" }]
77

88
[tool.poetry.dependencies]
99
python = "^3.7"
10-
taskiq = { git = "https://github.com/taskiq-python/taskiq", branch = "master" }
1110
redis = "4.3.4"
1211
flake8 = "^4.0.1"
12+
taskiq = {path = "../taskiq"}
1313

1414
[tool.poetry.dev-dependencies]
1515
pytest = "^7.0"
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import pickle
2+
from logging import getLogger
3+
from typing import Any, AsyncGenerator, Callable, Optional, TypeVar
4+
5+
from redis.asyncio import ConnectionPool, Redis
6+
from taskiq.abc.broker import AsyncBroker
7+
from taskiq.abc.result_backend import AsyncResultBackend
8+
from taskiq.message import BrokerMessage
9+
10+
_T = TypeVar("_T") # noqa: WPS111
11+
12+
logger = getLogger("taskiq.redis_broker")
13+
14+
15+
class RedisBroker(AsyncBroker):
16+
"""Broker that works with Redis."""
17+
18+
def __init__(
19+
self,
20+
url: Optional[str] = None,
21+
task_id_generator: Optional[Callable[[], str]] = None,
22+
result_backend: Optional[AsyncResultBackend[_T]] = None,
23+
queue_name: str = "taskiq",
24+
max_connection_pool_size: Optional[int] = None,
25+
**connection_kwargs: Any,
26+
) -> None:
27+
"""
28+
Constructs a new broker.
29+
30+
:param url: url to redis.
31+
:param task_id_generator: custom task_id generator.
32+
:param result_backend: custom result backend.
33+
:param queue_name: name for a list in redis.
34+
:param max_connection_pool_size: maximum number of connections in pool.
35+
:param connection_kwargs: additional arguments for aio-redis ConnectionPool.
36+
"""
37+
super().__init__(
38+
result_backend=result_backend,
39+
task_id_generator=task_id_generator,
40+
)
41+
42+
self.connection_pool: ConnectionPool = ConnectionPool.from_url(
43+
url=url,
44+
max_connections=max_connection_pool_size,
45+
**connection_kwargs,
46+
)
47+
48+
self.redis_list_name = queue_name
49+
50+
async def shutdown(self) -> None:
51+
"""Closes redis connection pool."""
52+
self.connection_pool.disconnect()
53+
54+
async def kick(self, message: BrokerMessage) -> None:
55+
"""
56+
Sends a message to the redis broker list.
57+
58+
This function constructs message for redis
59+
and sends it.
60+
61+
The message is pickled dict object with message,
62+
task_id, task_name and labels.
63+
64+
:param message: message to send.
65+
"""
66+
async with Redis(connection_pool=self.connection_pool) as redis_conn:
67+
await redis_conn.rpush(
68+
self.redis_list_name,
69+
pickle.dumps(message),
70+
)
71+
72+
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
73+
"""
74+
Listen redis list for new messages.
75+
76+
This function listens to list and yields new messages.
77+
78+
:yields: parsed broker messages.
79+
"""
80+
async with Redis(connection_pool=self.connection_pool) as redis_conn:
81+
while True:
82+
redis_pickled_message = await redis_conn.blpop(
83+
self.redis_list_name,
84+
)
85+
if redis_pickled_message:
86+
try: # noqa: WPS448
87+
redis_message = pickle.loads( # noqa: WPS220
88+
redis_pickled_message[1],
89+
)
90+
yield BrokerMessage( # noqa: WPS220
91+
task_id=redis_message.task_id,
92+
task_name=redis_message.task_name,
93+
message=redis_message.message,
94+
labels=redis_message.labels,
95+
)
96+
except (
97+
ValueError,
98+
LookupError,
99+
AttributeError,
100+
pickle.UnpicklingError,
101+
) as exc:
102+
logger.debug( # noqa: WPS220
103+
"Cannot read broker message %s",
104+
exc,
105+
exc_info=True,
106+
)

0 commit comments

Comments
 (0)