python定时任务异步任务工具celery与Scheduler

python定时任务异步任务工具celery与Scheduler

2023年6月29日发(作者:)

python定时任务异步任务⼯具celery与Schedulerpython定时任务/异步任务⼯具celery与Scheduler⽂章⽬录修改时间2018-07-01修改说明初次成稿修改⼈AyoCross本⽂以django/flask为例,介绍如何使⽤celery or Scheduler 来实现python的定时任务计划,并⽐较⼆者在使⽤过程中的不同之处,区分不同场景来选择适当的⼯具。1. why 异步 or 定时异步任务,指的是与同步任务相反的⼀种逻辑处理流程,在执⾏某任务A的过程中,需要执⾏任务B,但是B任务⽐较耗时,⽽且B的结果对A⽆影响。这种情况下,就需要考虑使⽤异步来实现任务B,从⽽使A的处理速度加快。⼀个典型的例⼦就是⽤户操作过程中的邮件提醒,⽐如业务逻辑如下:当⽤户修改web项⽬中的某⼏个敏感参数的时候,需要向该⽤户以及管理员⽤户发送通知邮件,并完成参数的修改。那么⽤户的修改操作即可看作是A任务,邮件发送即为B任务,B任务的发送结果(邮件发送成功还是失败)对修改任务A是否成功执⾏⽆影响。此时就需要考虑将B邮件发送任务异步实现,减少修改参数的服务器处理速度,提⾼⽤户体验。定时任务,就更好理解了,⽐如需要每隔5分钟检查⼀次数据库⾥的当天有效数据,如果发⽣变更,则需要执⾏某些逻辑。此时就需要定时任务来执⾏响应的任务计划。how to1. SchedulerScheduler作为⼀个轻量级的定时任务⼯具,在⼩型项⽬或者轻量级异步任务中经常会⽤到,最常见的使⽤⽅法如下:⾸先,安装Scheduler:pip install APScheduler;然后,在项⽬启动的位置,调⽤以下函数:from ound import BackgroundSchedulerdef initialize_cron(): apsched = BackgroundScheduler() _job(isp_sync , 'cron' , kwargs = {} , hour = 2 , minute = 10) _job(kafka_sync,'cron', hour = 6 , minute = 10) _job(workflow,'cron', minute = '*/10') _job(detect_isp_access,'cron', hour='*') _job(mass_report,'cron', minute = '*/5') ()感觉似乎没什么好介绍的,只要部署上去,就完事了,它会每隔多少时间,或者每天的定点时间,去执⾏响应的任务。⽐较⾼阶的⽤法,会出现在某些特殊的任务,可能需要在某个时间段内暂停任务,然后再恢复任务。就会需要⽤到pause / resume,详情可以查看,写的很详细。2. celery只要是项⽬上了点规模,或者说,既有定时任务,也有异步任务需要;再或者,待执⾏的任务占资源⽐较多,那就最好使⽤celery来实现,可以把任务从主任务中剥离出来,减少项⽬占⽤资源过多出问题的⼏率。在django/flask中使⽤celery,也是⾮常简单的,⾸先,安装celery,pip install celery==3.1.25pip install redis==2.10.5 # Broker,⽤于存放celery任务队列,除celery之外,也可使⽤rabbitMQ/Zookeeper等安装完毕后,需要在项⽬中配置celery,以flask为例:在创建flask_app时,对celery进⾏配置:app = Flask("cdn_console",static_folder='', static_url_path='')_object('DefaultConfig')cdn_celery = configure_celery(app)def configure_celery(app): platforms.C_FORCE_ROOT = True celery = Celery(_name, broker=['CELERY_BROKER_URL']) () Taskbase = class ContextTask(Taskbase): abstract = True def __call__(self, *args, **kw): with _context(): return Taskbase.__call__(self, *args, **kw) = ContextTask return celeryclass DefaultConfig(object): DEBUG = True TESTING = False _basedir = (h(e(e( e(__file__))))) if DEBUG==True: redis_host='127.0.0.1' else: redis_host='' REDIS_URL = "redis://%s:6379/1"%redis_host CELERY_BROKER_URL = 'redis://%s:6379'%redis_host, # Broker

地址 # CELERY_RESULT_BACKEND = 'redis://localhost:6379', #

结果存储地址

#

定时任务, CELERYBEAT_SCHEDULE = { 'task1': { #

定时任务1,每隔5分钟执⾏⼀次 'task': '_delay_func_1', "schedule": timedelta(minutes=5), "args": '', }, #'task2': { # 'task': '_delay_func_2', # "schedule": timedelta(minutes=30), # "args": '', #}, }}

# 中的定时任务:@cdn_f api_delay_func_1(log_type='5'): """ 定时任务 """ from run import app app_ctx = _context() app_() try: try: print '定时任务' app_() except Exception as e: app_() print e

配置中之所有有个ContextTask函数,是因此flask的应⽤上下⽂,这种配置也是flask官⽅推荐的配置模块。如果某个函数需要执⾏异步任务,添加⼀个装饰器即可:@cdn_f flow_converge(): """ 流量/带宽 汇聚 """ print '异步任务' return#

在需要调⽤该异步函数的位置,使⽤delay来调⽤from tasks import flow_convergeflow_()项⽬中添加完毕之后,在项⽬所在的⽬录运⾏脚本来执⾏celery:nohup celery worker -A tasks --loglevel=info -c 2 >../celery_ 2>&1 &nohup celery beat -A tasks --loglevel=info >../celery_ 2>&1 &celery架构Celery Beat,任务调度器,Beat进程会读取配置⽂件的内容,周期性的将配置中到期需要执⾏的任务发送给任务队列Celery Worker,执⾏任务的消费者 Broker,消息代理,接受⽣产者的任务消息,存进队列然后按序发送给消费者Producer,定时任务或者调⽤了API产⽣任务交给任务队列进⾏处理 Result Backend,任务处理完后保存状态信息和结果终⽌执⾏中的celery任务from l import inspectinspect().active()这将列出正在处理的活动任务的列表。 你可以在那⾥获得任务的id 。 ⼀旦获得了任务的id,就可以通过使⽤from l import revokerevoke(task_id, terminate=True)3. which one只要不是太⼩的项⽬,或者你确定项⽬以后不会有其他的定时任务出现了,那么就使⽤scheduler来实现,否则,加⼊celery的怀抱吧!毕竟celery真的是太好⽤了。。AyoCross 20180701

发布者:admin,转转请注明出处:http://www.yc00.com/web/1687978256a62967.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信