一、前言

  Celery是三个依据python开发的分布式任务队列,若是不了然请阅读作者上1篇博文Celery入门与进阶,而做python
WEB开发最为盛行的框架莫属Django,可是Django的伸手处理进度都以共同的一筹莫展兑现异步职务,若要完结异步任务处理须求通过其余事办公室法(前端的1般化解方案是ajax操作),而后台Celery正是天经地义的选项。假使3个用户在实践有个别操作要求等待很久才回去,那大大降低了网址的吞吐量。上面将讲述Django的央求处理大概流程(图片来源互联网):

金沙注册送58 1

恳请进程不难表明:浏览器发起呼吁–>请求处理–>请求经过中间件–>路由映射–>视图处理工科作逻辑–>响应请求(template或response)

网上有为数不少celery +
django达成定时职责的科目,不过它们当先肆分之一是依照djcelery + celery3的;
只怕是应用django_celery_beat配置较为麻烦的。

持有演示均基于Django二.0

Celery
是多少个总结、灵活且保障的,处理大量音讯的分布式系统,并且提供保障这么一个类别的画龙点睛工具。
它是贰个在意于实时处理的职务队列,同时也协助任务调度。
以上是celery自个儿官网的牵线

二、配置利用

  celery很不难集成到Django框架中,当然假若想要完成定时义务的话还亟需安装django-celery-beta插件,前边会表明。须要专注的是Celery四.0只援助Django版本>=1.八的,借使是低于1.8版本须求动用Celery3.1。

精晓简洁而连忙才是我们最终的求偶,而celery四已经不必要额外插件即可与django结合达成定时任务了,原生的celery
beat就能够很好的达成定时任务成效。

celery是1个依照python开发的简约、灵活且有限扶助的分布式职责队列框架,协理使用任务队列的主意在分布式的机械/进度/线程上推行职分调度。采纳独立的劳动者-消费者模型,主要由三片段组成:

celery的利用场景很常见

配置

  新建立项目taskproj,目录结构(各个app下多了个tasks文件,用于定义职务):

taskproj
├── app01
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── taskproj
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

在档次目录taskproj/taskproj/目录下新建celery.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')  # 设置django环境

app = Celery('taskproj')

app.config_from_object('django.conf:settings', namespace='CELERY') #  使用CELERY_ 作为前缀,在settings中写配置

app.autodiscover_tasks()  # 发现任务文件每个app下的task.py

taskproj/taskproj/__init__施行异步任务和定时职分,分布式职分队列学习笔记。.py:

from __future__ import absolute_import, unicode_literals

from .celery import app as celery_app

__all__ = ['celery_app']

taskproj/taskproj/settings.py

CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

跻身项指标taskproj目录运行worker:

celery worker -A taskproj -l debug

自然使用原生方案的同时有几点插件所拉动的便宜被咱们屏弃了:

  • 金沙注册送58,音讯队列broker:broker实际上正是贰个MQ队列服务,能够选拔redis、rabbitmq等作为broker
  • 处理职务的消费者workers:broker通告worker队列中有职分,worker去队列中取出职分履行,每多少个worker就是八个进度
  • 积存结果的backend:执行结果存款和储蓄在backend,暗许也会储存在broker使用的MQ队列服务中,也足以独自安顿用何种服务做backend
  • 处理异步任务
  • 职分调度
  • 拍卖定时任务
  • 分布式调度

概念与触发义务

  职责定义在每一种tasks文件中,app01/tasks.py:

from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y

视图中触发职务

from django.http import JsonResponse
from app01 import tasks

# Create your views here.

def index(request,*args,**kwargs):
    res=tasks.add.delay(1,3)
    #任务逻辑
    return JsonResponse({'status':'successful','task_id':res.task_id})

访问

金沙注册送58 2

 若想获得职责结果,能够通过task_id使用AsyncResult获取结果,还足以一直通过backend获取:

金沙注册送58 3

 

  • 插件提供的定时职责管理将不在可用,当大家只需求任务定期执行而不必要人工资调整度的时候这一点忽略不计。
  • 不能火速的保管或追踪定时职责,定时职分的跟踪其实交给日志更合理,可是对义务的修改就不曾那么便宜了,不过假若不必要平常改变/增减职分的话那点也在可承受范围内。

金沙注册送58 4

利益也很多,越发在选取python创设的使用系统中,无缝衔接,使用一定有益。

扩展

  除了redis、rabbitmq能做结果存款和储蓄外,还足以采纳Django的orm作为结果存款和储蓄,当然需求设置依赖插件,那样的便宜在于大家可以直接通过django的数据查看到职务状态,同时为能够制定愈多的操作,上边介绍怎么着行使orm作为结果存款和储蓄。

1.安装

pip install django-celery-results

2.配置settings.py,注册app

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

4.修改backend配置,将redis改为django-db

#CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_BACKEND = 'django-db'  #使用django orm 作为结果存储

5.改动数据库

python3 manage.py migrate django_celery_results

此刻会合到数据库会多成立:

金沙注册送58 5 当然你有时候要求对task表举办操作,以下源码的表结构定义:

class TaskResult(models.Model):
    """Task result/status."""

    task_id = models.CharField(_('task id'), max_length=255, unique=True)
    task_name = models.CharField(_('task name'), null=True, max_length=255)
    task_args = models.TextField(_('task arguments'), null=True)
    task_kwargs = models.TextField(_('task kwargs'), null=True)
    status = models.CharField(_('state'), max_length=50,
                              default=states.PENDING,
                              choices=TASK_STATE_CHOICES
                              )
    content_type = models.CharField(_('content type'), max_length=128)
    content_encoding = models.CharField(_('content encoding'), max_length=64)
    result = models.TextField(null=True, default=None, editable=False)
    date_done = models.DateTimeField(_('done at'), auto_now=True)
    traceback = models.TextField(_('traceback'), blank=True, null=True)
    hidden = models.BooleanField(editable=False, default=False, db_index=True)
    meta = models.TextField(null=True, default=None, editable=False)

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
        }

    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)

 

Celery定时职责安排

在拓展布署前先来看望项目组织:

.├── linux_news│   ├── celery.py│   ├── __init__.py│   ├── settings.py│   ├── urls.py│   └── wsgi.py├── manage.py├── news│   ├── admin.py│   ├── apps.py│   ├── __init__.py│   ├── migrations│   ├── models│   ├── tasks.py│   ├── tests.py│   └── views└── start-celery.sh

其中news是大家的app,用于从局地rss订阅源获取情报音信,linux_news则是我们的project。大家要求关爱的关键是celery.py,settings.py,tasks.py和start-celery.sh。

先是是celery.py,想让celery执行职分就务须实例化三个celery
app,并把settings.py里的布局传入app:

import osfrom celery import Celery# set the default Django settings module for the 'celery' program.os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'linux_news.settings')app = Celery('linux_news')# 'django.conf:settings'表示django,conf.settings也就是django项目的配置,celery会根据前面设置的环境变量自动查找并导入# - namespace表示在settings.py中celery配置项的名字的统一前缀,这里是'CELERY_',配置项的名字也需要大写app.config_from_object('django.conf:settings', namespace='CELERY')# Load task modules from all registered Django app configs.app.autodiscover_tasks()

配置正是那样不难,为了能在django里使用这么些app,大家须求在__init__.py中程导弹入它:

from .celery import app as celery_app

然后我们来看tasks.py,它应该置身你的app目录中,前边大家安插了自动发现,所以celery会自动找到那个tasks,大家的tasks将写在这一模块中,代码涉及了有个别orm的使用,为了契合核心笔者做了些简洁明了:

from linux_news.celery import celery_app as appfrom .models import *import timeimport feedparserimport pytzimport html@app.task(ignore_result=True)def fetch_news(origin_name):    """    fetch all news from origin_name    """    origin = get_feeds_origin(origin_name)    feeds = feedparser.parse(origin.feed_link)    for item in feeds['entries']:        entry = NewsEntry()        entry.title = item.title        entry.origin = origin        entry.author = item.author        entry.link = item.link        # add timezone        entry.publish_time = item.time.replace(tzinfo=pytz.utc)        entry.summary = html.escape(item.summary)        entry.save()@app.task(ignore_result=True)def fetch_all_news():    """    这是我们的定时任务    fetch all origins' news to db    """    origins = NewsOrigin.objects.all()    for origin in origins:        fetch_news.delay(origin.origin_name)

tasks里是局地耗费时间操作,比如互连网IO也许数据库读写,因为大家不关切义务的再次回到值,所以利用@app.task(ignore_result=True)将其屏蔽了。

职务布置达成后大家就要陈设celery了,大家选拔redis作为天职队列,小编强烈提议在生育条件中应用rabbitmq大概redis作为天职队列或结果缓存后端,而不应有接纳关系型数据库:

# redisREDIS_PORT = 6379REDIS_DB = 0# 从环境变量中取得redis服务器地址REDIS_HOST = os.environ.get('REDIS_ADDR', 'redis')# celery settings# 这两项必须设置,否则不能正常启动celery beatCELERY_ENABLE_UTC = TrueCELERY_TIMEZONE = TIME_ZONE# 任务队列配置CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_ACCEPT_CONTENT = ['application/json', ]CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'CELERY_TASK_SERIALIZER = 'json'

然后是我们的定时任务设置:

from celery.schedules import crontabCELERY_BEAT_SCHEDULE={        'fetch_news_every-1-hour': {            'task': 'news.tasks.fetch_all_news',            'schedule': crontab(minute=0, hour='*/1'),        }}

定时职责布置对象是一个dict,由任务名和安顿项组成,首要安顿想如下:

  • task:职分函数所在的模块,模块路径得写全,不然找不到将相当小概运转该职务
  • schedule:定时方针,一般选拔celery.schedules.crontab,上边例子为每小时的0分执行贰次职责,具体写法与linux的crontab类似能够参照文书档案表明
  • args:是个元组,给出职务必要的参数,若是不须要参数也得以不写进配置,就如例子中的一样
  • 任何配置项较少用,能够参考文书档案
    至此,配置celery beat的部分就病逝了。

异步职责

Celery

3、Django中使用定时任务

  假如想要在django中使用定时职责效用雷同是靠beat实现职责发送成效,当在Django中使用定时职责时,必要设置django-celery-beat插件。以下将介绍使用进程。

启动celery beat

布置实现后只必要运行celery了。

启航此前布署一下条件。不要用root运营celery!不要用root运营celery!不要用root运营celery!主要的作业说壹遍。

start-celery.sh:

export REDIS_ADDR=127.0.0.1celery -A linux_news worker -l info -B -f /path/to/log

-A 表示app所在的目录,-B代表运维celery beat运维定时任务。
celery符合规律运行后就能够由此日记来查看职责是或不是正常运营了:

[2018-12-21 13:00:00,022: INFO/MainProcess] Received task: news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d]  [2018-12-21 13:00:00,046: INFO/MainProcess] Received task: news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b]  [2018-12-21 13:00:00,051: INFO/ForkPoolWorker-2] Task news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d] succeeded in 0.02503809699555859s: None[2018-12-21 13:00:00,052: INFO/MainProcess] Received task: news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25]  [2018-12-21 13:00:00,449: INFO/ForkPoolWorker-5] Task news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25] succeeded in 0.39487219898728654s: None[2018-12-21 13:00:00,606: INFO/ForkPoolWorker-3] Task news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b] succeeded in 0.5523456179944333s: None

以上就是celery四运转定时职分的剧情,如有错误和疏漏,欢迎指正。

本身的异步使用处境为项目上线:前端web上有个上线按钮,点击按钮后发请求给后端,后端执行上线进度要六秒钟,后端在收取到请求后把任务放入队列异步执行,同时马上重返给前端2个任务执行中的结果。若果未有异步执行会怎样呢?同步的景观正是履行进度中前端一直在等后端再次来到结果,页面转呀转的就转超时了。

安装

安装配置

壹.beat插件安装

pip3 install django-celery-beat

2.注册APP

INSTALLED_APPS = [
    ....   
    'django_celery_beat',
]

叁.数据库变更

python3 manage.py migrate django_celery_beat

肆.分别运行woker和beta

celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler  #启动beta 调度器使用数据库

celery worker -A taskproj -l info #启动woker

5.配置admin

urls.py

# urls.py
from django.conf.urls import url
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', admin.site.urls),
]

陆.开立用户

python3 manage.py createsuperuser 

七.登录admin举办政管理制(地址

金沙注册送58 6

 

 使用示例:

金沙注册送58 7

 

 

 

 

金沙注册送58 8

 

 

 查看结果:

金沙注册送58 9

 

异步职务计划

安装Celery

推荐使用pip安装,假如您利用的是虚拟环境,请在虚拟环境里设置

$ pip install celery

二回开发

  django-celery-beat插件本质上是对数码库表变化检查,一旦有多少库表改变,调度珍视新读取职责举办调度,所以如若想协调定制的职分页面,只需求操作beat插件的四张表就能够了。当然你还是能团结定义调度器,django-celery-beat插件已经嵌入了model,只要求进行导入便可进行orm操作,以下笔者用django
reset api实行出现说法:

settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'django_celery_results',
    'django_celery_beat',
    'rest_framework',
]

urls.py

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^index$', views.index),
    url(r'^res$', views.get_res),
    url(r'^tasks$', views.TaskView.as_view({'get':'list'})),
]

views.py

from django_celery_beat.models import PeriodicTask  #倒入插件model
from rest_framework import serializers
from rest_framework import pagination
from rest_framework.viewsets import ModelViewSet
class Userserializer(serializers.ModelSerializer):
    class Meta:
        model = PeriodicTask
        fields = '__all__'

class Mypagination(pagination.PageNumberPagination):
    """自定义分页"""
    page_size=2
    page_query_param = 'p'
    page_size_query_param='size'
    max_page_size=4

class TaskView(ModelViewSet):
    queryset = PeriodicTask.objects.all()
    serializer_class = Userserializer
    permission_classes = []
    pagination_class = Mypagination

访问

金沙注册送58 10

 

一.安装rabbitmq,那里大家应用rabbitmq作为broker,安装到位后暗许运转了,也不须求任何任何配置

设置音讯中间件

Celery 帮助 RabbitMQ、Redis 甚至别的数据库系统作为其新闻代理中间件

您期望用什么中间件和后端就请自行安装,一般都应用redis也许RabbitMQ

# apt-get install rabbitmq-server

安装Redis

在Ubuntu系统下行使apt-get命令就能够

$ sudo apt-get install redis-server

假使你利用redis作为中间件,还索要设置redis援助包,同样利用pip安装即可

$ pip install redis

能出现以下结果即为成功

redis 127.0.0.1:6379>

其余的redis知识那里不左介绍,借使有趣味,可以自行明白

假使你利用RabbitMQ,也请安装RabbitMQ

2.安装celery

安装RabbitMQ

$ sudo apt-get install rabbitmq-server
# pip3 install celery

使用Celery

三.celery用在django项目中,django项目目录结构(简化)如下

简短直接运用

可以在要求的地方直接引进Celery,直接利用即可。最简单易行的艺术只需求安排3个职责和中间人即可

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/3')

@app.task
def add(x, y):
    return x + y

自家那边运用了redis作为中间件,那是能够按本身的习惯替换的

是因为暗中认可的配备不是最适合我们的花色其实须求,一般的话大家都亟需按大家友好的供给布署部分,
然则出于须求将项目解耦,也好维护,我们最棒利用单独的四个文本编写制定配置。

website/
|-- deploy
|  |-- admin.py
|  |-- apps.py
|  |-- __init__.py
|  |-- models.py
|  |-- tasks.py
|  |-- tests.py
|  |-- urls.py
|  `-- views.py
|-- manage.py
|-- README
`-- website
  |-- celery.py
  |-- __init__.py
  |-- settings.py
  |-- urls.py
  `-- wsgi.py

单独安插配置文件

比地方的略微复杂一点,大家须要创立多少个公文,二个为config.py的celery配置文件,在中间填写适合大家项目标配置,在开创1个tasks.py文件来编排大家的职分。文件的名字能够按您的喜好本人取名。

config.py内容为:

# coding=utf-8
# 配置文件同一配置celery
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# 把“脏活”路由到专用的队列:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

# 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

配备好之后能够用以下命令检查布置文件是不是正确(config为布局文件名)

$ python -m config

tasks.py内容为:

# coding=utf-8
from celery import Celery

app = Celery()
# 参数为配置文件的文件名
app.config_from_object('config')

@app.task
def add(x, y):
    return x + y

再有壹种同等设置配置的点子,不是很推荐

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

在app使用前先必要用上述措施批量立异配备文件。

4.创建 website/celery.py 主文件

在运用上使用

工程目录结构为

proj/
    __init__.py
    # 存放配置和启动celery代码
    celery.py
    # 存放任务
    tasks.py

celery.py为:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/3',
             backend='redis://localhost:6379/4',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py为:

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

启航celery只必要在proj同级目录下:

$ celery -A proj worker -l info
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')

app = Celery('website')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# 允许root 用户运行celery
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
  print('Request: {0!r}'.format(self.request))

在django中使用celery

咱俩的django的品种的目录结构相似如下

proj/
    manage.py
    myapp/
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

想要在django项目中选取celery,大家首先需求在django中配置celery

我们供给在与工程名同名的子文件夹中添加celery.py文件
在本例中也便是proj/proj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# 第二个参数为工程名.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# 括号里的参数为工程名
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

下一场大家须求在同级目录下的**init.py文件中配备如下内容 proj/proj/init.py**

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

然后大家就能够把须要的职分放到须求的app下的tasks.py中,今后项目目录结构如下

proj/
    manage.py
    myapp1/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    myapp2/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

或者的多少个tasks.py文件内容如下:
myapp1/tasks.py为:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    # 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
    time.sleep(5)
    print(x+y)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

@shared_task修饰器可以让你创立task不必要app实体

在急需的地点调用相关职务即可,例如在myapp1/views.py中调用

from django.shortcuts import render
from .tasks import add


def index(request):
    # 测试celery任务
    add.delay(4,5)
    return render(request,'index.html')

然后就可以运行项目,celery须要单独运维,所以要求开两个极端,分别

启航web应用服务器

$ python manage.py runserver

启动celery

$ celery -A proj worker -l info

接下来访问浏览器就能够在运转celery的极限中看出输出

金沙注册送58 11

测试结果

5.在 website/__init__.py
文件中追加如下内容,确认保证django运行的时候那几个app能够被加载到

扩展

  • 假定你的花色需求在admin中管理调度,请使用django-celery-beat
  1. 使用pip安装django-celery-beat

$ pip install django-celery-beat

并非在动用django-celery,这些项目现已甘休更新好好多年。。。。

  1. 在settings.py中丰盛那么些app

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
  1. 联机一下数据库

$ python manage.py migrate
  1. 设置celery
    beat
    劳动应用django_celery_beat.schedulers:DatabaseScheduler
    scheduler

$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

然后在就足以admin界面看到了。

  • 就算您想利用Django-OHummerH二M只怕Django
    Cache作为后端,须要安装django-celery-results扩张(我不提出)
  1. 使用pip安装django-celery-results

$ pip install django-celery-results

不用在动用django-celery,这些类型现已终止更新好好多年。。。。

  1. 在settings.py中加上那么些app

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
  1. 协助实行一下数据库

$ python manage.py migrate django_celery_results
  1. 布局后端,在settings.py中陈设

# 使用数据库作为结果后端
CELERY_RESULT_BACKEND = 'django-db'

# 使用缓存作为结果后端
CELERY_RESULT_BACKEND = 'django-cache'

着力使用大致就是上述这几个,别的实际布置和应用还需自身研读法定文书档案

注:

  • 上述条件在ubuntu16.04 lts django1.9中搭建测试成功
  • 上述文字皆为个体见解,如有错误或建议请登时交流自己
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

六.各使用成立tasks.py文件,那里为 deploy/tasks.py

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
  return x + y

专注tasks.py必须建在各app的根目录下,且不得不叫tasks.py,不能够随随便便命名

7.views.py中援引使用那些tasks异步处理

from deploy.tasks import add

def post(request):
  result = add.delay(2, 3)


result.ready()
result.get(timeout=1)
result.traceback

8.启动celery

# celery -A website worker -l info

九.这样在调用post这么些主意时,里边的add就足以异步处理了

定时职责

定时职分的应用情况就很广泛了,比如本身要求定时发送报告给业主~

定时任务安插

  1. website/celery.py 文件添加如下配置以支撑定时职分crontab
from celery.schedules import crontab

app.conf.update(
  CELERYBEAT_SCHEDULE = {
    'sum-task': {
      'task': 'deploy.tasks.add',
      'schedule': timedelta(seconds=20),
      'args': (5, 6)
    }
    'send-report': {
      'task': 'deploy.tasks.report',
      'schedule': crontab(hour=4, minute=30, day_of_week=1),
    }
  }
)

概念了多个task:

  • 名为’sum-task’的task,每20秒执行1遍add函数,并传了几个参数伍和陆
  • 名为’send-report’的task,每一周1深夜4:30推行report函数

timedelta是datetime中的一个对象,须要 from datetime import timedelta
引进,有如下多少个参数

  • days
  • seconds
  • microseconds
  • milliseconds
  • minutes
  • hours

crontab的参数有:

month_of_year
day_of_month
day_of_week
hour
minute

  1. deploy/tasks.py 文件添加report方法:
@shared_task
def report():
  return 5

三.运营celery
beat,celery运营了二个beat进度一贯在不停的判定是还是不是有任务急需履行

# celery -A website beat -l info

Tips

一.只要您还要选拔了异步任务和安顿职务,有一种更简约的起步格局
celery -A website worker -b -l info ,可同时开动worker和beat

二.万壹使用的不是rabbitmq做队列那么必要在主配置文件中 website/celery.py
配置broker和backend,如下:

# redis做MQ配置
app = Celery('website', backend='redis', broker='redis://localhost')
# rabbitmq做MQ配置
app = Celery('website', backend='amqp', broker='amqp://admin:admin@localhost')

三.celery不能用root用户运维的话须要在主配置文件中添加
platforms.C_FORCE_ROOT = True

4.celery在长日子运作后也许出现内部存款和储蓄器泄漏,要求加上配置
CELERYD_MAX_TASKS_PER_CHILD = 10
,表示各样worker执行了多少个任务就死掉

如上正是本文的全体内容,希望对大家的学习抱有支持,也可望大家多多帮助脚本之家。

你只怕感兴趣的文章:

  • 异步任务队列Celery在Django中的利用方式
  • Django使用Celery异步职分队列的运用
  • Django中使用celery完毕异步职责的演示代码

相关文章

网站地图xml地图