Python怎么用sched模块实现定时任务
时间:2023-04-18 17:08
我们先来看下面这个案例,代码如下 那么上述的代码中,第一步首先则是实例化一个定时器,通过如下的代码 接下来我们通过 当然要是延迟的时间相等的时候,我们可以设置任务执行的优先级来指定函数方法运行的顺序,例如有如下的代码 如上述代码,尽管延迟的时间都是一样的,但是 除了让函数延迟执行,我们还可以让其重复执行,具体这样来操作,代码如下 这里我们新建了一个 同时我们还可以让任务在指定的时间执行,这里用到 我们传入其中参数使其在指定的时间,也就是距离当下的5秒钟之后来执行任务 这里仍然是调用 这里定义了两个函数, 这回我们给 output Task One - Hello, world! 上述的代码会在停顿两秒之后运行 我们给定时任务添加上取消的方法,代码如下 我们将两秒钟之后执行的 我们来写一个备份的脚本,在每天固定的时间将文件备份,代码如下 我们通过 最后我们来执行定时分发邮件的程序,代码如下 以上就是Python怎么用sched模块实现定时任务的详细内容,更多请关注Gxl网其它相关文章!牛刀小试
import schedimport timedef say_hello(name): print(f"Hello, world, {name}")scheduler = sched.scheduler(time.time, time.sleep)scheduler.enter(5, 1, say_hello, ("张三", ))scheduler.run()
import schedscheduler = sched.scheduler()
enter()
方法来执行定时任务的操作,其中的参数分别是延迟的时间、任务的优先级以及具体的执行函数和执行函数中的参数。像如上的代码就会在延迟5秒钟之后执行say_hello()
函数import schedimport timedef say_hello(name): print(f"Hello, world, {name}")def say_hello_2(name): print(f"Hello, {name}")scheduler = sched.scheduler(time.time, time.sleep)scheduler.enter(5, 2, say_hello, ("张三", ))scheduler.enter(5, 1, say_hello_2, ("李四", ))scheduler.run()
say_hello()
方法的优先级明显要比say_hello_2()
方法要低一些,因此后者会优先执行。进阶使用
import schedimport timedef say_hello(): print("Hello, world!")scheduler = sched.scheduler(time.time, time.sleep)def repeat_task(): scheduler.enter(5, 1, say_hello, ()) scheduler.enter(5, 1, repeat_task, ())repeat_task()scheduler.run()
repeat_task()
自定义函数,调用了scheduler.enter()
方法5秒钟执行一次之前定义的say_hello()
函数在固定时间执行任务
scheduler.entertabs()
方法,代码如下import schedimport timedef say_hello(): print("Hello, world!")scheduler = sched.scheduler(time.time, time.sleep)# 指定时间执行任务specific_time = time.time() + 5 # 距离现在的5秒钟之后执行scheduler.enterabs(specific_time, 1, say_hello, ())scheduler.run()
执行多个任务
enter()
方法来运行多个任务,代码如下import schedimport timedef task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!")scheduler = sched.scheduler(time.time, time.sleep)# 任务一在两秒钟只有执行scheduler.enter(2, 1, task_one, ())# 任务二在五秒钟之后运行scheduler.enter(5, 1, task_two, ())scheduler.run()
task_one
和task_two
里面分是同样的执行逻辑,打印出“Hello, world!”,然后task_one()
是在两秒钟之后执行而task_two()
则是在5秒钟之后执行,两者执行的优先级都是一样的。以不同的优先级执行不同的任务
task_one()
和task_two()
赋予不同的优先级,看一看执行的结果如下import schedimport timedef task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!")scheduler = sched.scheduler(time.time, time.sleep)# 优先级是1scheduler.enter(2, 2, task_one, ())# 优先级是2scheduler.enter(5, 1, task_two, ())scheduler.run()
Task Two - Hello, world!task_one()
函数,再停顿3秒之后执行task_two()
函数定时任务加上取消方法
import schedimport timedef task_one(): print("Task One - Hello, world!") def task_two(): print("Task Two - Hello, world!")scheduler = sched.scheduler(time.time, time.sleep)# 任务一在两秒钟只有执行task_one_event = scheduler.enter(2, 1, task_one, ())# 任务二在五秒钟之后运行task_two_event = scheduler.enter(5, 1, task_two, ())# 取消执行task_onescheduler.cancel(task_one_event)scheduler.run()
task_one()
方法给取消掉,最后就只执行了task_two()
方法,也就打印出来“Task Two - Hello, world!”执行备份程序
import schedimport timeimport shutildef backup_files(): source = '路径/files' destination = '路径二' shutil.copytree(source, destination)def schedule_backup(): # 创建新的定时器 scheduler = sched.scheduler(time.time, time.sleep) # 备份程序在每天的1点来执行 backup_time = time.strptime('01:00:00', '%H:%M:%S') backup_event = scheduler.enterabs(time.mktime(backup_time), 1, backup_files, ()) # 开启定时任务 scheduler.run()schedule_backup()
shutil
模块当中的copytree()
方法来执行拷贝文件,然后在每天的1点准时执行执行定时分发邮件的程序
import schedimport timeimport smtplibfrom email.mime.text import MIMETextdef send_email(subject, message, from_addr, to_addr, smtp_server): # 邮件的主体信息 email = MIMEText(message) email['Subject'] = subject email['From'] = from_addr email['To'] = to_addr # 发邮件 with smtplib.SMTP(smtp_server) as server: server.send_message(email)def send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time): # 创建定时任务的示例 scheduler = sched.scheduler(time.time, time.sleep) # 定时邮件 scheduler.enterabs(scheduled_time, 1, send_email, argument=(subject, message, from_addr, to_addr, smtp_server)) # 开启定时器 scheduler.run()subject = 'Test Email'message = 'This is a test email'from_addr = 'test@example.com'to_addr = 'test@example.com'smtp_server = 'smtp.test.com'scheduled_time = time.time() + 60 # 一分钟之后执行程序send_scheduled_email(subject, message, from_addr, to_addr, smtp_server, scheduled_time)