返回顶部

Python 实现定时任务的四种方案!

[复制链接]
luckyLv.2 显示全部楼层 发表于 2021-10-20 15:12:56 |阅读模式 打印 上一主题 下一主题
  1、利用 while True: + sleep() 实现定时任务

  位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。

  基于这样的特性我们可以通过 while 死循环+sleep() 的方式实现简单的定时任务。

  代码示例:


[PowerShell] 纯文本查看 复制代码
import datetime

import time

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

def loop_monitor():

    while True:

        time_printer()

        time.sleep(5)  # 暂停 5 秒

if __name__ == "__main__":

    loop_monitor()


  主要缺点:

  只能设定间隔,不能指定具体的时间,比如每天早上 8:00

  sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。

  2、使用 Timeloop 库运行定时任务

  Timeloop[2] 是一个库,可用于运行多周期任务。这是一个简单的库,它使用 decorator 模式在线程中运行标记函数。

  示例代码:

[Python] 纯文本查看 复制代码
import time

from timeloop import Timeloop

from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=2))

def sample_job_every_2s():

    print "2s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=5))

def sample_job_every_5s():

    print "5s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=10))

def sample_job_every_10s():

    print "10s job current time : {}".format(time.ctime())


  3、利用 threading.Timer 实现定时任务

  threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer 最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。

  Timer(interval, function, args=[ ], kwargs={ })

  interval: 指定的时间

  function: 要执行的方法

  args/kwargs: 方法的参数

  代码示例:

[Python] 纯文本查看 复制代码
import datetime

from threading import Timer

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    t = Timer(5, time_printer)

    t.start()

if __name__ == "__main__":

    loop_monitor()


  4、利用内置模块 sched 实现定时任务

  sched 模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。

  class sched.scheduler(timefunc, delayfunc) 这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc 是一个没有参数的返回时间类型数字的函数(常用使用的如 time 模块里面的 time),delayfunc 应该是一个需要一个参数来调用、与 timefunc 的输出兼容、并且作用为延迟多个时间单位的函数(常用的如 time 模块的 sleep)。

  代码示例:

[Python] 纯文本查看 复制代码
import datetime

import time

import sched

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

    loop_monitor()

def loop_monitor():

    s = sched.scheduler(time.time, time.sleep)  # 生成调度器

    s.enter(5, 1, time_printer, ())

    s.run()

if __name__ == "__main__":

    loop_monitor()


  scheduler 对象主要方法:

  enter(delay, priority, action, argument),安排一个事件来延迟 delay 个时间单位。

  cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个 ValueError。

  run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的 delayfunc() 函数),然后执行事件,直到不再有预定的事件。


  
【免责声明】本文部分系转载,原文来源:标点符,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与联系我们,我们会予以更改或删除相关文章,以保证您的权益!

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

达内教育:成立于2002年。致力于面向IT互联网行业,培养软件开发工程师、测试工程师、系统管理员、智能硬件工程师、UI设计师、网络营销、会计等职场人才 达内使命:缔造年轻人的中国梦、缔造达内员工的中国梦 达内愿景:做管理一流的教育公司
  • 商务合作

  • 微信公众号

  • Powered by Discuz! X3.4 | Copyright © 2002-2021, 达内教育 Tedu.cn
  • 京ICP备08000853号-56 |网站地图 | 京公网安备 11010802029508号