6 Star 3 Fork 0

张思聪/jiuyun

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
test_add_task.py 689 Bytes
一键复制 编辑 原始数据 按行查看 历史
disenQF 提交于 2021-07-08 17:46 . celery任务
import os
import uuid
from celery.result import AsyncResult
import django
from approve.tasks import add_approve
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jiuyun.settings')
django.setup()
tasks = []
for i in range(10):
tasks.append(add_approve.delay(appr_id=uuid.uuid4().hex).id)
print('所有的任务', tasks)
i = 0
while tasks:
if i >= len(tasks): i = 0
task_id = tasks[i]
try:
ret = AsyncResult(id=task_id).get(timeout=0.1) # 同步的方法,阻塞
except:
continue
finally:
i += 1
print(task_id, ret)
if ret == 'Ok':
tasks.remove(task_id) # tasks.pop(-1)
else:
print(task_id, '处理失败')
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sicongzhang/jiuyun.git
[email protected]:sicongzhang/jiuyun.git
sicongzhang
jiuyun
jiuyun
master

搜索帮助