APScheduler的使用详解

发布时间:2025-05-24 16:41:33 作者:益华网络 来源:undefined 浏览量(1) 点赞(1)
摘要:1.简介 APScheduler 是一款Python开发的定时任务工具, 跨平台运行, 不依赖Linux系统的crontab服务, 在windows上也可以运行 官方文档的地址是 https://apscheduler.readthedocs.io/en/latest/index.html

  1.简介

  APScheduler 是一款Python开发的定时任务工具, 跨平台运行, 不依赖Linux系统的crontab服务, 在windows上也可以运行

  官方文档的地址是 https://apscheduler.readthedocs.io/en/latest/index.html

  简单介绍

  APScheduler具有四种组件

  触发器(triggers) 指定定时任务的执行的时机

  存储器(job stores) 可以定时持久化存储, 可以保存在数据库中或redis

  # 存储在redis中

  from apscheduler.jobstores.redis import RedisJobStore

  # 存储在mongo中

  from apscheduler.jobstores.mongodb import MongoDBJobStore

  # 存储在数据库中

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  执行器(executors) 在定时任务执行时, 进程或者线程的方式执行任务

  调度器(schedulers)

  # 以后台的方式运行

  from apscheduler.schedulers.background import BackgroundScheduler

  # 以阻塞的方式运行, 前台运行

  from apscheduler.schedulers.background import BlockingScheduler

  对添加的任务可以做持久保存

  2.安装

  pip install apscheduler

  3. 触发器 Trigger

  date在特定的时间日期执行

  from datetime import date

  from apscheduler.schedulers.blocking import BlockingScheduler

  sched = BlockingScheduler()

  def my_job(text):

  print(text)

  # 在2019年11月6日00:00:00执行

  sched.add_job(my_job, date, run_date=date(2019, 11, 6))

  # 在2019年11月6日16:30:05, 可以指定运行的详细时间

  sched.add_job(my_job, date, run_date=datetime(2009, 11, 6, 16, 30, 5))

  # 运行时间也可以是字符串的形式

  sched.add_job(my_job, date, run_date=2009-11-06 16:30:05, args=[text])

  # 立即执行

  sched.add_job(my_job, date)

  sched.start()

  interval:以固定的时间间隔运行作业时使用

  weeks (int) – 间隔的周数

  days (int) – 间隔的天数

  hours (int) – 间隔的小时

  minutes (int) –间隔的分钟

  seconds (int) – 间隔的秒

  start_date (datetime|str) – 间隔时间的起点

  end_date (datetime|str) – 间隔时间的结束点

  timezone (datetime.tzinfo|str) – 时区

  jitter (int|None) – 将作业执行 延迟的时间

  from datetime import datetime

  # 每两小时执行一次

  sched.add_job(job_function, interval, hours=2)

  # 在2018年10月10日09:30:00 到2019年6月15日11:00:00的时间内,每两小时执行一次

  sched.add_job(job_function, interval, hours=2, start_date=2018-10-10 09:30:00, end_date=2019-06-15 11:00:00)

  cron:在一天中的特定时间定期运行作业时使用

  常见的参数

  year (int|str) – 4位数的年份

  month (int|str) – month (1-12)

  day (int|str) – day (1-31)

  week (int|str) – ISO week (1-53)

  day_of_week (int|str) –工作日的编号或名称(0-6或周一,周二,周三,周四,周五,周六,周日)

  hour (int|str) – 小时(0-23)

  minute (int|str) – 分钟 (0-59)

  second (int|str) – 秒 (0-59)

  start_date (datetime|str) –最早触发的日期/时间(包括)

  end_date (datetime|str) – 结束触发的日期/时间(包括)

  timezone (datetime.tzinfo|str) – 时区

  jitter (int|None) – 将执行作业延迟几秒执行

  常见的表达式类型

  # 在6、7、8、11、12月的第三个周五的00:00, 01:00, 02:00和03:00 执行

  sched.add_job(job_function, cron, month=6-8,11-12, day=3rd fri, hour=0-3)

  # 在2014年5月30日前的周一到周五的5:30执行

  sched.add_job(job_function, cron, day_of_week=mon-fri, hour=5, minute=30, end_date=2014-05-30)

  # 执行的方式 用装饰器的形式, 每个月的最后一个星期日执行

  @sched.scheduled_job(cron, id=my_job_id, day=last sun)

  def some_decorated_task():

  print("I am printed at 00:00:00 on the last Sunday of every month!")

  # 可以使用标准的crontab表达式执行

  sched.add_job(job_function, CronTrigger.from_crontab(0 0 1-15 may-aug *))

  # 延迟120秒执行

  sched.add_job(job_function, cron, hour=*, jitter=120)

  calendarinterval:在一天的特定时间以日历为基础的间隔运行作业时使用

  参数和 interval 中的参数设置相同

  from datetime import datetime

  from apscheduler.schedulers.blocking import BlockingScheduler

  def job_function():

  print("Hello World")

  sched = BlockingScheduler()

  # 每个月的15:36:00 执行这个任务

  sched.add_job(job_function, calendarinterval, months=1, hour=15, minute=36)

  # 从今天开始 每两个月的 15点36分执行, 时间范围是 2019-6-16到 2020-3-26

  sched.add_job(job_function, calendarinterval, months=2, start_date=2019-06-16,

  end_date=2020-03-16, hour=15, minute=36)

  sched.start()

  4. 储存器

  REDIS_CONF = {

  "password": "xxxxx",

  "host": "192.168.137.120",

  "port": 6379,

  "db": 0}

  from apscheduler.jobstores.redis import RedisJobStore

  from apscheduler.jobstores.mongodb import MongoDBJobStore

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  # 存储器

  job_stores = {

  # 使用redis存储

  redis: RedisJobStore(jobs_key=jobs_key, run_times_key=run_times_key, **REDIS_CONF),

  # 使用mongo存储

  mongo: MongoDBJobStore(),

  # 数据库存储

  default: SQLAlchemyJobStore(url=sqlite:///jobs.sqlite)

  }

  # 执行器

  executors = {

  default: ThreadPoolExecutor(20), # 20个线程

  processpool: ProcessPoolExecutor(5) # 5个进程

  }

  job_defaults = {

  coalesce: False, # 相同任务触发多次

  max_instances: 3 # 每个任务最多同时触发三次

  }

  # 使用配置, 启动

  scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

  5. 执行器 在定时任务该执行时,以进程或线程方式执行任务

  # 线程的方式执行

  from apscheduler.executors.pool import ThreadPoolExecutor

  executors = {

  default: ThreadPoolExecutor(20) # 最多20个线程同时执行

  }

  scheduler = BackgroundScheduler(executors=executors)

  # 进程的方式

  executors = {

  default: ProcessPoolExecutor(5) # 最多5个进程同时执行

  }

  6.调度器

  BlockingScheduler: 作为独立进程时使用

  from apscheduler.schedulers.blocking import BlockingScheduler

  scheduler = BlockingScheduler()

  scheduler.start()

  # 此处程序会发生阻塞复制代码

  BackgroundScheduler 后台运行, 在框架中使用

  from apscheduler.schedulers.background import BackgroundScheduler

  scheduler = BackgroundScheduler()

  scheduler.start()

  # 此处程序不会发生阻塞复制代码

  AsyncIOScheduler : 当你的程序使用了asyncio的时候使用。

  GeventScheduler : 当你的程序使用了gevent的时候使用。

  TornadoScheduler : 当你的程序基于Tornado的时候使用。

  TwistedScheduler : 当你的程序使用了Twisted的时候使用

  QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。

  7. 配置的三中方法

  方法1

  from pytz import utc

  from apscheduler.schedulers.background import BackgroundScheduler

  from apscheduler.jobstores.mongodb import MongoDBJobStore

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

  jobstores = {

  mongo: MongoDBJobStore(),

  default: SQLAlchemyJobStore(url=sqlite:///jobs.sqlite)

  }

  executors = {

  default: ThreadPoolExecutor(20), # 最大线程数

  processpool: ProcessPoolExecutor(5) # 最大进程数

  }

  job_defaults = {

  coalesce: False,

  max_instances: 3 # 同一个任务启动实例的最大个数

  }

  # 配置的使用方式

  scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) 郑州妇科检查哪家好 http://www.zzkdfk.com/

  方法2

  from apscheduler.schedulers.background import BackgroundScheduler

  # 使用字典的形式添加配置

  scheduler = BackgroundScheduler({

  apscheduler.jobstores.mongo: {

  type: mongodb

  },

  apscheduler.jobstores.default: {

  type: sqlalchemy,

  url: sqlite:///jobs.sqlite

  },

  apscheduler.executors.default: {

  class: apscheduler.executors.pool:ThreadPoolExecutor,

  max_workers: 20

  },

  apscheduler.executors.processpool: {

  type: processpool,

  max_workers: 5

  },

  apscheduler.job_defaults.coalesce: false,

  apscheduler.job_defaults.max_instances: 3,

  apscheduler.timezone: UTC,

  })

  方法3

  from pytz import utc

  from apscheduler.schedulers.background import BackgroundScheduler

  from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

  from apscheduler.executors.pool import ProcessPoolExecutor

  jobstores = {

  mongo: {type: mongodb},

  default: SQLAlchemyJobStore(url=sqlite:///jobs.sqlite)

  }

  executors = {

  default: {type: threadpool, max_workers: 20},

  processpool: ProcessPoolExecutor(max_workers=5)

  }

  job_defaults = {

  coalesce: False,

  max_instances: 3

  }

  scheduler = BackgroundScheduler()

  # 使用调度器对象的 configure属性增加 存储器, 执行器 存储器 的配置

  scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

  8. 定时任务启动

  scheduler.start()

  对于BlockingScheduler ,程序会阻塞在这,防止退出,作为独立进程时使用。

  对于BackgroundScheduler,可以在应用程序中使用。不再以单独的进程使用。

  9. 任务管理

  方式1

  job = scheduler.add_job(myfunc, interval, minutes=2) # 添加任务

  job.remove() # 删除任务

  job.pause() # 暂定任务

  job.resume() # 恢复任务

  job.shutdown() # 关闭调度

  job.shutdown(wait=False) # 不等待正在运行的任务

  方式2

  scheduler.add_job(myfunc, interval, minutes=2, id=my_job_id) # 添加任务

  scheduler.remove_job(my_job_id) # 删除任务

  scheduler.pause_job(my_job_id) # 暂定任务

  scheduler.resume_job(my_job_id) # 恢复任务

  修改调度, 修改调度的配置属性

  job.modify(max_instances=6, name=Alternate name)

  # 更改触发器

  scheduler.reschedule_job(my_job_id, trigger=cron, minute=*/5)

  获取作业列表 get_jobs() 方法, 返回的是Job实例列表

  10.日志的使用

  项目中没有使用日志记录,

  import logging

  logging.basicConfig()

  logging.getLogger(apscheduler).setLevel(logging.DEBUG)

  集成到项目中的日志中

  logger = logging.getLogger("django")

  ......

  scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)

  scheduler._logger = logger

  11.完整的例子

  REDIS_CONF = {

  "password": "xxxxx",

  "host": "192.168.137.120",

  "port": 6379,

  "db": 0}

  logger = logging.getLogger("django")

  jobs_key = collection_api_apscheduler.jobs

  run_times_key = collection_api_apscheduler.run_times

  job_stores = {

  default: RedisJobStore(jobs_key=jobs_key, run_times_key=run_times_key, **REDIS_CONF)

  }

  executors = {

  default: {type: threadpool, max_workers: 60}

  }

  job_defaults = {

  coalesce: True, # 相同任务同时触发多次时,只运行一次

  max_instances: 3,

  misfire_grace_time: 30, # 过期30秒依然执行该任务

  }

  scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)

  scheduler._logger = logger

  # 如果持久化的调度器中作业列表, 调度器继续执行

  if scheduler.get_jobs():

  scheduler.resume()

  # 添加定时任务

  scheduler.add_job(handle_news_task, date, id=handle_news_task, replace_existing=True)

  scheduler.add_job(......)

  scheduler.start()

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!