Python中定时任务

延迟执行

sched模块是 Python 内置的一个简单调度器。

阻塞调用

后续的任务会在延迟任务执行后调用

示例 延迟5秒执行

1
2
3
4
5
6
7
8
9
10
11
import sched
import time

def print_time():
print("任务执行时间:", time.strftime("%Y-%m-%d %H:%M:%S"))

my_scheduler = sched.scheduler(time.time, time.sleep)
my_scheduler.enter(10, 1, print_time)
my_scheduler.run()

print("后续代码开始执行")

这个例子中,首先创建了一个调度器对象,其中time.time用于获取当前时间,time.sleep用于暂停程序。

然后定义了一个print_time函数,它将打印当前时间。接着使用scheduler.enter()方法安排任务

  • 第一个参数10表示 10 秒后执行任务
  • 第二个参数1表示任务的优先级(数字越小优先级越高)
  • 第三个参数是要执行的任务函数。

最后通过scheduler.run()运行调度器,等待任务执行。

不阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import sched
import time
import threading

# 定义要执行的任务函数
def scan_nopaper():
print("执行扫描无纸任务")

# 创建一个调度器对象
my_scheduler = sched.scheduler(time.time, time.sleep)

# 安排一个任务,10 秒后执行 scan_nopaper 函数,优先级为 1
my_scheduler.enter(10, 1, scan_nopaper)

# 定义一个函数,用于在新线程中运行调度器
def run_scheduler():
my_scheduler.run()

# 创建一个新线程并启动
scheduler_thread = threading.Thread(target=run_scheduler)
scheduler_thread.start()

# 后续代码
print("后续代码开始执行")

间隔循环执行

添加依赖

1
pip install apscheduler

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
import time
from apscheduler.schedulers.background import BackgroundScheduler

def job():
print("任务执行,时间:", time.strftime("%Y-%m-%d %H:%M:%S"))


# 创建调度器
scheduler = BackgroundScheduler()
# 添加任务,每3秒执行一次
scheduler.add_job(job, 'interval', seconds=3)
# 启动调度器
scheduler.start()

关闭调度器

1
scheduler.shutdown()

这里使用APSchedulerBackgroundScheduler类型创建一个调度器。

通过add_job()方法添加任务,第一个参数job是要执行的任务函数,第二个参数interval表示任务执行的间隔类型(这里是按固定间隔),第三个参数seconds = 3表示间隔时间为 3 秒。

默认会在一个独立的线程(非主线程)中执行任务(job)。