Skip to content
This repository was archived by the owner on May 18, 2023. It is now read-only.

Commit 2db71b5

Browse files
committed
Get the pool configured, and the worker started.
1 parent 149696f commit 2db71b5

File tree

2 files changed

+14
-41
lines changed

2 files changed

+14
-41
lines changed

requirements_dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ pytest
1616
pytest-celery
1717
pytest-cov
1818
pytest-runner
19+
pytest-asyncio

tests/test_celery_pool_asyncio.py

Lines changed: 13 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,6 @@
11
#!/usr/bin/env python
2-
3-
import pathlib
42
import asyncio
5-
import time
6-
from pprint import pformat
7-
#from concurrent.futures import TimeoutError
8-
9-
import pytest
10-
from kombu import Queue
11-
12-
13-
from celery_pool_asyncio import TaskPool
3+
import pathlib
144

155
root = pathlib.Path()
166
brocker = root / 'brocker'
@@ -25,45 +15,27 @@
2515
dir_processed = brocker / 'processed'
2616
dir_processed.mkdir(exist_ok=True)
2717

18+
2819
async def tack_function(input_data):
2920
await asyncio.sleep(1)
3021
return input_data.upper()
3122

3223

33-
@pytest.fixture(scope='session')
34-
def celery_config():
35-
return {
24+
def test_create_task(celery_app, celery_worker):
25+
from celery.contrib.testing.app import TestApp
26+
app = TestApp(config={
3627
'broker_url': 'filesystem:// %s' % dir_messages,
3728
'broker_transport_options': {
3829
'data_folder_in': '%s' % dir_out,
3930
'data_folder_out': '%s' % dir_out,
4031
'data_folder_processed': '%s' % dir_processed,
4132
},
4233
'result_persistent': True,
43-
}
44-
45-
46-
@pytest.fixture(scope='session')
47-
def celery_includes():
48-
return [
49-
'test_celery_pool_asyncio',
50-
]
51-
52-
53-
@pytest.fixture(scope='session')
54-
def celery_worker_pool():
55-
return 'celery_pool_asyncio:TaskPool'
56-
57-
58-
def test_create_task(celery_app, celery_worker):
59-
@celery_app.task
60-
async def tack_function(input_data):
61-
await asyncio.sleep(1)
62-
print('tack_function', input_data)
63-
return input_data.upper()
64-
65-
task = tack_function.delay('hello, world!')
66-
print('task', task)
67-
result = task.get(timeout=10)
68-
print('result', result)
69-
34+
'worker_pool': 'celery_pool_asyncio:TaskPool',
35+
})
36+
wrapped = app.task(tack_function)
37+
app.register_task(wrapped)
38+
msg = 'hello, world!'
39+
task = wrapped.delay(msg)
40+
reply = task.get(timeout=10)
41+
assert reply == msg.upper()

0 commit comments

Comments
 (0)