-APScheduler介绍
Flask-APScheduler是基于APScheduler库开发的Flask拓展库。APScheduler的全称是Advanced Python 。允许您将Python开发公司代码安排为稍后执行,开发公司可以只执行一次,开发公司也可以定期执行。开发公司您可以随时添加新作业开发公司或删除旧作业。开发公司如果您将作业存储在数据库中,开发公司那么调度程序重启后它们也将存活下来并保持其状态。当调度器重新启动时,它将运行它在离线时应该运行的所有作业,。
pip install flask-apscheduler
实例展示
使用flask配置启动定时任务
APSchedule可以使用很多方式进行启动任务,比如interval,或者cron等等,下面就分别介绍一下这两种方式启动任务。
interval间隔时间执行
我们可以通过配置如下参数来每间隔多少时间来启动任务
- JOBS = [
- {
- 'id': 'job1',
- 'func': 'scheduler:task',
- 'args': (1, 2),
- 'trigger': 'interval',
- 'seconds': 10
- }
- ]
其中func
表示你要启动的函数,trigger
表示触发方式,这里使用的interval
表示间隔触发,second
表示间隔的时间长短。
我们可以通过flask配置启动定时任务,例子如下
- from flask import Flask
- import datetime
- from flask_apscheduler import APScheduler
-
- aps = APScheduler()
-
-
- class Config(object):
- JOBS = [
- {
- 'id': 'job1',
- 'func': 'scheduler:task',
- 'args': (1, 2),
- 'trigger': 'interval',
- 'seconds': 10
- }
- ]
- SCHEDULER_API_ENABLED = True
-
-
- def task(a, b):
- print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))
-
-
- if __name__ == '__main__':
- app = Flask(__name__)
- app.config.from_object(Config())
-
- scheduler = APScheduler()
- scheduler.init_app(app)
- scheduler.start()
-
- app.run(port=8000)
上述代码中,通过APScheduler每间隔10秒钟执行一次task函数。
cron启动任务
是Linux中定时任务启动程序,我们可以通过配置crontab的配置文件来定时启动任务。在APScheduler中也可以通过cron的形式来定时启动任务。下载的例子来说明配置方式。
- from flask import Flask
- import datetime
- from flask_apscheduler import APScheduler
-
- aps = APScheduler()
-
-
- class Config(object):
- JOBS = [
- {
- 'id': 'job1',
- 'func': 'scheduler:task',
- 'args': (1, 2),
- 'trigger': 'cron',
- 'day': '*',
- 'hour': '13',
- 'minute': '16',
- 'second': '20'
- }
- ]
- SCHEDULER_API_ENABLED = True
-
-
- def task(a, b):
- print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))
-
-
- if __name__ == '__main__':
- app = Flask(__name__)
- app.config.from_object(Config())
-
- scheduler = APScheduler()
- scheduler.init_app(app)
- scheduler.start()
-
- app.run(port=8000)
上述的代码表示,在每天的13:16:20秒启动task()
函数。其实看配置就能理解意思,一目了然,其中*代表任意的意思。
使用定时启动任务
除了上面通过配置的方式来启动定时任务外,我们还可以使用装饰器的方式来定时启动任务。例子如下所示
from flask import Flask from flask_apscheduler import APScheduler import datetime class Config(object): SCHEDULER_API_ENABLED = True scheduler = APScheduler() # interval examples @scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900) def job1(): print(str(datetime.datetime.now()) + ' Job 1 executed') # cron examples @scheduler.task('cron', id='do_job_2', minute='*') def job2(): print(str(datetime.datetime.now()) + ' Job 2 executed') @scheduler.task('cron', id='do_job_3', week='*', day_of_week='sun') def job3(): print(str(datetime.datetime.now()) + ' Job 3 executed') @scheduler.task('cron', id='do_job_3', day='*', hour='13', minute='26', second='05') def job4(): print(str(datetime.datetime.now()) + ' Job 4 executed') if __name__ == '__main__': app = Flask(__name__) app.config.from_object(Config()) # it is also possible to enable the API directly # scheduler.api_enabled = True scheduler.init_app(app) scheduler.start() app.run(port=8000)上述代码的含义如下:
- job1: 每间隔30s执行一次函数
- job2: 每分钟执行一次函数
- job3: 每周的星期天执行一次函数
- job4: 每天的13:26:05时刻执行一次函数