开发公司flask 定时任务 flask-apscheduler

-APScheduler介绍

Flask-APScheduler是基于APScheduler库开发的Flask拓展库。APScheduler的全称是Advanced Python 。允许您将Python开发公司代码安排为稍后执行,开发公司可以只执行一次,开发公司也可以定期执行。开发公司您可以随时添加新作业开发公司或删除旧作业。开发公司如果您将作业存储在数据库中,开发公司那么调度程序重启后它们也将存活下来并保持其状态。当调度器重新启动时,它将运行它在离线时应该运行的所有作业,。

pip install flask-apscheduler

 实例展示

 

使用flask配置启动定时任务

APSchedule可以使用很多方式进行启动任务,比如interval,或者cron等等,下面就分别介绍一下这两种方式启动任务。

interval间隔时间执行

我们可以通过配置如下参数来每间隔多少时间来启动任务

  1. JOBS = [
  2. {
  3. 'id': 'job1',
  4. 'func': 'scheduler:task',
  5. 'args': (1, 2),
  6. 'trigger': 'interval',
  7. 'seconds': 10
  8. }
  9. ]

其中func表示你要启动的函数,trigger表示触发方式,这里使用的interval表示间隔触发,second表示间隔的时间长短。

我们可以通过flask配置启动定时任务,例子如下

 

  1. from flask import Flask
  2. import datetime
  3. from flask_apscheduler import APScheduler
  4. aps = APScheduler()
  5. class Config(object):
  6. JOBS = [
  7. {
  8. 'id': 'job1',
  9. 'func': 'scheduler:task',
  10. 'args': (1, 2),
  11. 'trigger': 'interval',
  12. 'seconds': 10
  13. }
  14. ]
  15. SCHEDULER_API_ENABLED = True
  16. def task(a, b):
  17. print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))
  18. if __name__ == '__main__':
  19. app = Flask(__name__)
  20. app.config.from_object(Config())
  21. scheduler = APScheduler()
  22. scheduler.init_app(app)
  23. scheduler.start()
  24. app.run(port=8000)

 上述代码中,通过APScheduler每间隔10秒钟执行一次task函数。

cron启动任务

是Linux中定时任务启动程序,我们可以通过配置crontab的配置文件来定时启动任务。在APScheduler中也可以通过cron的形式来定时启动任务。下载的例子来说明配置方式。

  1. from flask import Flask
  2. import datetime
  3. from flask_apscheduler import APScheduler
  4. aps = APScheduler()
  5. class Config(object):
  6. JOBS = [
  7. {
  8. 'id': 'job1',
  9. 'func': 'scheduler:task',
  10. 'args': (1, 2),
  11. 'trigger': 'cron',
  12. 'day': '*',
  13. 'hour': '13',
  14. 'minute': '16',
  15. 'second': '20'
  16. }
  17. ]
  18. SCHEDULER_API_ENABLED = True
  19. def task(a, b):
  20. print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))
  21. if __name__ == '__main__':
  22. app = Flask(__name__)
  23. app.config.from_object(Config())
  24. scheduler = APScheduler()
  25. scheduler.init_app(app)
  26. scheduler.start()
  27. app.run(port=8000)

 上述的代码表示,在每天的13:16:20秒启动task()函数。其实看配置就能理解意思,一目了然,其中*代表任意的意思。

使用定时启动任务

除了上面通过配置的方式来启动定时任务外,我们还可以使用装饰器的方式来定时启动任务。例子如下所示

  1. from flask import Flask
  2. from flask_apscheduler import APScheduler
  3. import datetime
  4. class Config(object):
  5. SCHEDULER_API_ENABLED = True
  6. scheduler = APScheduler()
  7. # interval examples
  8. @scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
  9. def job1():
  10. print(str(datetime.datetime.now()) + ' Job 1 executed')
  11. # cron examples
  12. @scheduler.task('cron', id='do_job_2', minute='*')
  13. def job2():
  14. print(str(datetime.datetime.now()) + ' Job 2 executed')
  15. @scheduler.task('cron', id='do_job_3', week='*', day_of_week='sun')
  16. def job3():
  17. print(str(datetime.datetime.now()) + ' Job 3 executed')
  18. @scheduler.task('cron', id='do_job_3', day='*', hour='13', minute='26', second='05')
  19. def job4():
  20. print(str(datetime.datetime.now()) + ' Job 4 executed')
  21. if __name__ == '__main__':
  22. app = Flask(__name__)
  23. app.config.from_object(Config())
  24. # it is also possible to enable the API directly
  25. # scheduler.api_enabled = True
  26. scheduler.init_app(app)
  27. scheduler.start()
  28. app.run(port=8000)

上述代码的含义如下:

  1. job1: 每间隔30s执行一次函数
  2. job2: 每分钟执行一次函数
  3. job3: 每周的星期天执行一次函数
  4. job4: 每天的13:26:05时刻执行一次函数
网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发