@@ -6,7 +6,8 @@ Standalone admin panel with all data stored in SQLite database
66- [ Broker-agnostic admin panel for Taskiq] ( #broker-agnostic-admin-panel-for-taskiq )
77 - [ Previews] ( #previews )
88 - [ Usage] ( #usage )
9- - [ Docker Compose Examples] ( #docker-compose-examples )
9+ - [ Docker Compose Example] ( #docker-compose-example )
10+ - [ Task States] ( #task-states )
1011 - [ Development] ( #development )
1112
1213### Previews
@@ -16,7 +17,7 @@ Tasks Page | Task Details Page
1617
1718### Usage
1819
19- 1 ) Add this middleware to your taskiq broker :
20+ 1 ) Add this middleware to your project :
2021
2122``` python
2223from typing import Any
@@ -26,30 +27,49 @@ from datetime import datetime, UTC
2627import httpx
2728from taskiq import TaskiqMiddleware, TaskiqResult, TaskiqMessage
2829
29- TASKIQ_ADMIN_URL = " ..." # or os.getenv() to use .env vars
30- TASKIQ_ADMIN_API_TOKEN = " ..." # or os.getenv() to use .env vars
31-
32-
3330class TaskiqAdminMiddleware (TaskiqMiddleware ):
34- def __init__ (self , taskiq_broker_name : str | None = None ):
31+ def __init__ (
32+ self ,
33+ url : str ,
34+ api_token : str ,
35+ taskiq_broker_name : str | None = None ,
36+ ):
3537 super ().__init__ ()
38+ self .url = url
39+ self .api_token = api_token
3640 self .__ta_broker_name = taskiq_broker_name
3741
42+ async def post_send (self , message ):
43+ now = datetime.now(UTC ).replace(tzinfo = None ).isoformat()
44+ async with httpx.AsyncClient() as client:
45+ await client.post(
46+ headers = {" access-token" : self .api_token},
47+ url = urljoin(self .url, f " /api/tasks/ { message.task_id} /queued " ),
48+ json = {
49+ " args" : message.args,
50+ " kwargs" : message.kwargs,
51+ " taskName" : message.task_name,
52+ " worker" : self .__ta_broker_name,
53+ " queuedAt" : now,
54+ },
55+ )
56+ return super ().post_send(message)
57+
3858 async def pre_execute (self , message : TaskiqMessage):
3959 """ """
60+ now = datetime.now(UTC ).replace(tzinfo = None ).isoformat()
4061 async with httpx.AsyncClient() as client:
4162 await client.post(
42- headers = {" access-token" : TASKIQ_ADMIN_API_TOKEN },
43- url = urljoin(TASKIQ_ADMIN_URL , f " /api/tasks/ { message.task_id} /started " ),
63+ headers = {" access-token" : self .api_token },
64+ url = urljoin(self .url , f " /api/tasks/ { message.task_id} /started " ),
4465 json = {
66+ " startedAt" : now,
4567 " args" : message.args,
4668 " kwargs" : message.kwargs,
4769 " taskName" : message.task_name,
4870 " worker" : self .__ta_broker_name,
49- " startedAt" : datetime.now(UTC ).replace(tzinfo = None ).isoformat(),
5071 },
5172 )
52-
5373 return super ().pre_execute(message)
5474
5575 async def post_execute (
@@ -58,26 +78,50 @@ class TaskiqAdminMiddleware(TaskiqMiddleware):
5878 result : TaskiqResult[Any],
5979 ):
6080 """ """
81+ now = datetime.now(UTC ).replace(tzinfo = None ).isoformat()
6182 async with httpx.AsyncClient() as client:
6283 await client.post(
63- headers = {" access-token" : TASKIQ_ADMIN_API_TOKEN },
64- url = urljoin(TASKIQ_ADMIN_URL , f " /api/tasks/ { message.task_id} /executed " ),
84+ headers = {" access-token" : self .api_token},
85+ url = urljoin(
86+ self .url,
87+ f " /api/tasks/ { message.task_id} /executed " ,
88+ ),
6589 json = {
90+ " finishedAt" : now,
6691 " error" : result.error
6792 if result.error is None
6893 else repr (result.error),
6994 " executionTime" : result.execution_time,
7095 " returnValue" : {" return_value" : result.return_value},
71- " finishedAt" : datetime.now(UTC ).replace(tzinfo = None ).isoformat(),
7296 },
7397 )
74-
7598 return super ().post_execute(message, result)
7699```
77100
78- 2 ) Pull the image from GitHub Container Registry: ` docker pull ghcr.io/taskiq-python/taskiq-admin:latest `
101+ 2 ) Connect the middleware to your broker:
102+
103+ ``` python
104+ ...
105+ broker = (
106+ ListQueueBroker(
107+ url = redis_url,
108+ queue_name = " my_lovely_queue" ,
109+ )
110+ .with_result_backend(result_backend)
111+ .with_middlewares(
112+ TaskiqAdminMiddleware(
113+ url = " http://localhost:3000" , # the url to your taskiq-admin instance
114+ api_token = " supersecret" , # any secret enough string
115+ taskiq_broker_name = " mybroker" ,
116+ )
117+ )
118+ )
119+ ...
120+ ```
79121
80- 3 ) Replace ` TASKIQ_ADMIN_API_TOKEN ` with any secret enough string and run:
122+ 3 ) Pull the image from GitHub Container Registry: ` docker pull ghcr.io/taskiq-python/taskiq-admin:latest `
123+
124+ 4 ) Replace ` TASKIQ_ADMIN_API_TOKEN ` with any secret enough string and run:
81125``` bash
82126docker run -d --rm \
83127 -p " 3000:3000" \
@@ -87,17 +131,10 @@ docker run -d --rm \
87131 " ghcr.io/taskiq-python/taskiq-admin:latest"
88132```
89133
90- 4 ) Go to ` http://localhost:3000/tasks `
134+ 5 ) Go to ` http://localhost:3000/tasks `
91135
92- ### Docker Compose Examples
136+ ### Docker Compose Example
93137
94- .env file example:
95- ``` bash
96- TASKIQ_ADMIN_URL=" http://taskiq_admin:3000"
97- TASKIQ_ADMIN_API_TOKEN=" supersecret"
98- ```
99-
100- compose.yml file example
101138``` yaml
102139services :
103140 queue :
@@ -106,8 +143,9 @@ services:
106143 dockerfile : ./Dockerfile
107144 container_name : my_queue
108145 command : taskiq worker app.tasks.queue:broker --workers 1 --max-async-tasks 20
109- env_file :
110- - .env
146+ environment :
147+ - TASKIQ_ADMIN_URL=http://taskiq_admin:3000
148+ - TASKIQ_ADMIN_API_TOKEN=supersecret
111149 depends_on :
112150 - redis
113151 - taskiq_admin
@@ -117,15 +155,23 @@ services:
117155 container_name : taskiq_admin
118156 ports :
119157 - 3000:3000
120- env_file :
121- - .env
158+ environment :
159+ - TASKIQ_ADMIN_API_TOKEN=supersecret
122160 volumes :
123161 - admin_data:/usr/database/
124162
125163volumes :
126164 admin_data :
127165` ` `
128166
167+ ### Task States
168+ Let's assume we have a task 'do_smth', there are all states it can embrace:
169+ 1) ` queued` - the task has been sent to the queue without an error
170+ 2) `running` - the task is grabbed by a worker and is being processed
171+ 3) `success` - the task is fully processed without any errors
172+ 4) `failure` - an error occured during the task processing
173+ 5) `abandoned` - taskiq-admin sets all 'running' tasks as 'abandoned' if there was a downtime between the time these tasks were in 'running' state and the time of next startup of taskiq-admin
174+
129175# ## Development
1301761) Run `pnpm install` to install all dependencies
1311772) Run `pnpm db:push` to create the sqlite database if needed
0 commit comments