分享好友 化工资讯首页 化工资讯分类 切换频道

让 Python 程序定时执行的 8 种姿势~

2023-01-11 23:44680admin
#python 14 #PyCharm教程 46 #Python架构师 80
作者:钱魏Way 来源:biaodianfu.com/python-schedule.html

在日常工作中,我们常常会用到需要周期性执行的任务,一种方式是采用 Linux 系统自带的 crond 结合命令行实现,另外一种方式是直接使用Python。

最近我整理了一下 Python 定时任务的实现方式,内容较长,建议收藏后学习,梳理不易,有所收获,点赞支持。

我们开始学习吧!

目录

点击领取Python面试题手册

Python从入门到进阶知识手册

1. 利用while True: + sleep()实现定时任务

位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。

基于这样的特性我们可以通过while死循环+sleep()的方式实现简单的定时任务。

代码示例:

import datetime import time def time_printer () : now = datetime.datetime.now() ts = now.strftime( ’%Y-%m-%d %H:%M:%S’ ) print( ’do func time :’ , ts) def loop_monitor () : while True : time_printer() time.sleep( 5 ) # 暂停5秒 if __name__ == "__main__" : loop_monitor()

主要缺点:

2. 使用Timeloop库运行定时任务

Timeloop是一个库,可用于运行多周期任务。这是一个简单的库,它使用decorator模式在线程中运行标记函数。

示例代码:

import time from timeloop import Timeloop from datetime import timedelta tl = Timeloop() @tl.job(interval=timedelta(seconds=2)) def sample_job_every_2s () : print "2s job current time : {}" .format(time.ctime()) @tl.job(interval=timedelta(seconds=5)) def sample_job_every_5s () : print "5s job current time : {}" .format(time.ctime()) @tl.job(interval=timedelta(seconds=10)) def sample_job_every_10s () : print "10s job current time : {}" .format(time.ctime())

3. 利用threading.Timer实现定时任务

threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。

Timer(interval, function, args=[ ], kwargs={ })

代码示例:

备注:Timer只能执行一次,这里需要循环调用,否则只能执行一次

4. 利用内置模块sched实现定时任务

sched模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。

class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time),delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。

代码示例:

import datetime import time import sched def time_printer () : now = datetime.datetime.now() ts = now.strftime( ’%Y-%m-%d %H:%M:%S’ ) print( ’do func time :’ , ts) loop_monitor() def loop_monitor () : s = sched.scheduler(time.time, time.sleep) # 生成调度器 s.enter( 5 , 1 , time_printer, ()) s.run() if __name__ == "__main__" : loop_monitor()

scheduler对象主要方法:

个人点评:比threading.Timer更好,不需要循环调用。

5. 利用调度模块schedule实现定时任务

schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。

先来看代码,是不是不看文档就能明白什么意思?

import schedule import time def job(): print ( "I’m working..." ) schedule.every( 10 ).seconds. do (job) schedule.every( 10 ).minutes. do (job) schedule.every().hour. do (job) schedule.every().day.at( "10:30" ). do (job) schedule.every( 5 ).to( 10 ).minutes. do (job) schedule.every().monday. do (job) schedule.every().wednesday.at( "13:15" ). do (job) schedule.every().minute.at( ":17" ). do (job) while True : schedule.run_pending() time.sleep( 1 )

装饰器:通过 @repeat() 装饰静态方法

import time from schedule import every, repeat, run_pending @repeat(every().second) def job () : print( ’working...’ ) while True : run_pending() time.sleep( 1 )

传递参数:

import schedule def greet (name) : print( ’Hello’ , name) schedule.every( 2 ).seconds.do(greet, name= ’Alice’ ) schedule.every( 4 ).seconds.do(greet, name= ’Bob’ ) while True : schedule.run_pending()

装饰器同样能传递参数:

from schedule import every, repeat, run_pending @repeat(every().second, ’World’) @repeat(every().minute, ’Mars’) def hello (planet) : print( ’Hello’ , planet) while True : run_pending()

取消任务:

import schedule i = 0 def some_task () : global i i += 1 print(i) if i == 10 : schedule.cancel_job(job) print( ’cancel job’ ) exit( 0 ) job = schedule.every().second.do(some_task) while True : schedule.run_pending()

运行一次任务:

import time import schedule def job_that_executes_once () : print( ’Hello’ ) return schedule.CancelJob schedule.every().minute.at( ’:34’ ).do(job_that_executes_once) while True : schedule.run_pending() time.sleep( 1 )

根据标签检索任务:

# 检索所有任务:schedule.get_jobs() import schedule def greet (name) : print( ’Hello {}’ .format(name)) schedule.every().day. do (greet, ’Andrea’ ).tag( ’daily-tasks’ , ’friend’ ) schedule.every().hour. do (greet, ’John’ ).tag( ’hourly-tasks’ , ’friend’ ) schedule.every().hour. do (greet, ’Monica’ ).tag( ’hourly-tasks’ , ’customer’ ) schedule.every().day. do (greet, ’Derek’ ).tag( ’daily-tasks’ , ’guest’ ) friends = schedule.get_jobs( ’friend’ ) print(friends)

根据标签取消任务:

# 取消所有任务:schedule.clear() import schedule def greet(name): print ( ’Hello {}’ .format(name)) if name == ’Cancel’ : schedule.clear( ’second-tasks’ ) print ( ’cancel second-tasks’ ) schedule.every().second. do (greet, ’Andrea’ ).tag( ’second-tasks’ , ’friend’ ) schedule.every().second. do (greet, ’John’ ).tag( ’second-tasks’ , ’friend’ ) schedule.every().hour. do (greet, ’Monica’ ).tag( ’hourly-tasks’ , ’customer’ ) schedule.every( 5 ).seconds. do (greet, ’Cancel’ ).tag( ’daily-tasks’ , ’guest’ ) while True : schedule.run_pending()

运行任务到某时间:

import schedule from datetime import datetime, timedelta, time def job () : print( ’working...’ ) schedule.every().second. until ( ’23:59’ ). do (job) # 今天23:59停止 schedule.every().second. until ( ’2030-01-01 18:30’ ). do (job) # 2030-01-01 18:30停止 schedule.every().second. until (timedelta(hours= 8 )). do (job) # 8小时后停止 schedule.every().second. until (time( 23 , 59 , 59 )). do (job) # 今天23:59:59停止 schedule.every().second. until (datetime( 2030 , 1 , 1 , 18 , 30 , 0 )). do (job) # 2030-01-01 18:30停止 while True: schedule.run_pending()

马上运行所有任务(主要用于测试):

import schedule def job () : print( ’working...’ ) def job1 () : print( ’Hello...’ ) schedule.every().monday.at( ’12:40’ ).do(job) schedule.every().tuesday.at( ’16:40’ ).do(job1) schedule.run_all() schedule.run_all(delay_seconds= 3 ) # 任务间延迟3秒

并行运行:使用 Python 内置队列实现:

import threading import time import schedule def job1 () : print( "I’m running on thread %s" % threading.current_thread()) def job2 () : print( "I’m running on thread %s" % threading.current_thread()) def job3 () : print( "I’m running on thread %s" % threading.current_thread()) def run_threaded (job_func) : job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every( 10 ).seconds.do(run_threaded, job1) schedule.every( 10 ).seconds.do(run_threaded, job2) schedule.every( 10 ).seconds.do(run_threaded, job3) while True : schedule.run_pending() time.sleep( 1 )

6. 利用任务框架APScheduler实现定时任务

APScheduler(advanceded python scheduler)基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个Python定时任务系统。

它有以下三个特点:

APScheduler有四种组成部分:

示例代码:

from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 输出时间 def job () : print(datetime.now().strftime( "%Y-%m-%d %H:%M:%S" )) # BlockingScheduler sched = BlockingScheduler() sched.add_job(my_job, ’interval’ , seconds= 5 , id= ’my_job_id’ ) sched.start()

 6.1 Job 作业

Job作为APScheduler最小执行单位。创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息。

构建说明:

 6.2 Trigger 触发器

Trigger绑定到Job,在scheduler调度筛选Job时,根据触发器的规则计算出Job的触发时间,然后与当前时间比较确定此Job是否会被执行,总之就是根据trigger规则计算出下一个执行时间。

目前APScheduler支持触发器:

触发器参数:date

date定时,作业只执行一次。

sched.add_job(my_job, ’date’ , run_date=date( 2009 , 11 , 6 ), args=[ ’text’ ]) sched.add_job(my_job,  ’date’ , run_date=datetime( 20197616305 ), args=[ ’text’ ])

触发器参数:interval

interval间隔调度

sched.add_job(job_function, ’interval’ , hours= 2 )

触发器参数:cron

cron调度

CronTrigger可用的表达式:

# 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行 sched.add_job(job_function, ’cron’ , month= ’6-8,11-12’ , day= ’3rd fri’ , hour= ’0-3’ ) # 每周一到周五运行 直到2024-05-30 00:00:00 sched.add_job(job_function,  ’cron’ , day_of_week= ’mon-fri’ , hour=5, minute=30, end_date= ’2024-05-30’

 6.3 Executor 执行器

Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor找到实际的执行器对象,然后根据执行器对象执行Job。

Executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择TornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以。

Executor的选择需要根据实际的scheduler来选择不同的执行器。目前APScheduler支持的Executor:

 6.4 Jobstore 作业存储

Jobstore在scheduler中初始化,另外也可通过scheduler的add_jobstore动态添加Jobstore。每个jobstore都会绑定一个alias,scheduler在Add Job时,根据指定的jobstore在scheduler中找到相应的jobstore,并将job添加到jobstore中。作业存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler支持的任务存储器有:

不同的任务存储器可以在调度器的配置中进行配置(见调度器)

 6.5 Event 事件

Event是APScheduler在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件,当触发某些Event时,做一些具体的操作。常见的比如。Job执行异常事件 EVENT_JOB_ERROR。Job执行时间错过事件 EVENT_JOB_MISSED。

目前APScheduler定义的Event:

Listener表示用户自定义监听的一些Event,比如当Job触发了EVENT_JOB_MISSED事件时可以根据需求做一些其他处理。

 6.6 调度器

Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。

APScheduler支持的调度器方式如下,比较常用的为BlockingScheduler和BackgroundScheduler

 6.7 Scheduler的工作流程

Scheduler添加job流程:

Scheduler调度流程:

7. 使用分布式消息系统Celery实现定时任务

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度。Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。异步任务比如是发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作 ,定时任务是需要在特定时间执行的任务。

需要注意,celery本身并不具备任务的存储功能,在调度任务的时候肯定是要把任务存起来的,因此在使用celery的时候还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等。官方推荐的是消息队列RabbitMQ,有些时候使用Redis也是不错的选择。

它的架构组成如下图:

Celery架构,它采用典型的生产者-消费者模式,主要由以下部分组成:

实际应用中,用户从Web前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。

Celery定时任务实例:

8. 使用数据流工具Apache Airflow实现定时任务

Apache Airflow 是Airbnb开源的一款数据流程工具,目前是Apache孵化项目。以非常灵活的方式来支持数据的ETL过程,同时还支持非常多的插件来完成诸如HDFS监控、邮件通知等功能。Airflow支持单机和分布式两种模式,支持Master-Slave模式,支持Mesos等资源调度,有非常好的扩展性。被大量公司采用。

Airflow使用Python开发,它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。比如,如下的工作流中,任务T1执行完成,T2和T3才能开始执行,T2和T3都执行完成,T4才能开始执行。

Airflow提供了各种Operator实现,可以完成各种任务实现:

除了以上这些 Operators 还可以方便的自定义 Operators 满足个性化的任务需求。

一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支。如:

这种需求可以使用BranchPythonOperator来实现。

 8.1 Airflow 产生的背景

通常,在一个运维系统,数据分析系统,或测试系统等大型系统中,我们会有各种各样的依赖需求。包括但不限于:

crontab 可以很好地处理定时执行任务的需求,但仅能管理时间上的依赖。Airflow 的核心概念 DAG(有向无环图)—— 来表现工作流。

 8.2 Airflow 核心概念

 8.3 Airflow 的架构

在一个可扩展的生产环境中,Airflow 含有以下组件:

Worker的具体实现由配置文件中的executor来指定,airflow支持多种Executor:

生产环境一般使用CeleryExecutor和KubernetesExecutor。

使用CeleryExecutor的架构如图:

使用KubernetesExecutor的架构如图:

其它参考:

程序员技术交流群

扫码进群记得备注: 城市、昵称和技术方向

  阅读更多

  1. 总结了90条简单实用的Python编程技巧
  2. 【PyCharm教程】PyCharm 创建测试
  3. 被Win11系统恶心到了

版权申明 以上内容由网友或网络信息收集整理呈现,不代表本站立场和观点!如对内容有疑问,请联系我们,谢谢!下载内容仅供研究和学习使用,务必24小时删除。
举报
收藏 0
打赏 0
评论 0
广东省科学院生态环境与土壤研究所流域水环境团队博士后招聘启事
课题组简介:流域水环境整治绿色技术与装备团队是由1名中科院院士,1名国家杰青,2名国家优青和1名海外优青等国家级人才组成

0评论2023-02-07363

汽轮机冲转暖机结束的标志到底是什么
汽轮机的启动是一个不稳定的加热过程,进行合理经济的启动就是寻求合理的加热方式,使启动过程中机组各部分热应力、热变形、胀差

0评论2023-02-07580

超冷极性分子、不平衡拓扑系统、量子微波光子学、铁电斯格明子 | 本周物理讲座
我们如何理解宇宙? | 科学公开课‍1 Understanding and controlling the loss

0评论2023-02-07314

天体物理理解的进步
以下文章来源于中国科学院理论物理研究所 ,作者韩超中国科学院理论物理研究所 . 理论物理所科研动态和综合新闻的发布;理

0评论2023-02-07345

中国科学院新疆理化技术研究所痕量化学物质感知方向招聘公告
一、招聘专业 材料、物理、化学、精仪相关专业,理论和实验相关方向均可。 二、招聘对象及支持政策享受研究所 “ 丝路人才计

0评论2023-02-07484

必看 | 2023复工复产落实“四不伤害”24条提示及反“三违”警示学习,对号入座.....
在工作之前一定要三问: 一是问问自己这样做安全吗? 如果你认为这样做安全了, 二是问一问一起工作的同事这样做安全吗? 三

0评论2023-02-07872

安全生产百错图【视频讲解】你的安全意识过关吗?
<iframe allowfullscreen="" class="video_iframe rich_pages"

0评论2023-02-07247

已致超3000人遇难!土耳其地震最新情况
点击蓝字 关注我们2月6日,土耳其一天内发生两次7.8级地震,截至目前已导致土耳其和叙利亚超3000人遇难。地震已造成土

0评论2023-02-07213

【一轮通知重磅发布】2023(第十八届)青岛国际水大会&水展
注:板块及分会场不定期更新中,请以会议正式通知为准。会议亮点01中国政府高端打造,全球各地行业组织共同支持02云集全球

0评论2023-02-07489

浙江工业大学王红宇课题组CEJ:可见光催化的Fe(VI)-Ti/Zn LDH体系高效降解水体微污染物的机制研究
以下文章来源于Environmental Advances ,作者王红宇课题组Environmental Advanc

0评论2023-02-07813