Python开发中多线程通讯

前言

有时候我们需要在分线程中执行方法,但是有时需要通知主线程做某些操作。

Python中多进程、多线程、协程对比

https://www.psvmc.cn/article/2021-11-24-python-async.html

queue.Queue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import queue
import threading
import time

# 创建一个队列用于通信
task_queue = queue.Queue()


def background_thread():
# 放入一个任务到队列
task_queue.put('execute_method')


def main_thread():
while True:
# 检查队列中有无任务
if not task_queue.empty():
task = task_queue.get()
if task == 'execute_method':
print("主线程执行方法")
time.sleep(1)


if __name__ == "__main__":
# 创建后台线程
bg_thread = threading.Thread(target=background_thread)
bg_thread.daemon = True
bg_thread.start()

# 主线程
main_thread()

multiprocessing.Queue

multiprocessing.Queue 允许不同进程之间传递数据。

不同于 queue.Queue(线程安全队列),multiprocessing.Queue 是进程安全的,适合在多进程环境中使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import multiprocessing
import time

def worker(queue):
time.sleep(2) # 模拟一些工作
queue.put("execute_method") # 向队列中添加消息

def main_method():
print("Main method executed")

def main():
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()

while True:
if not queue.empty():
message = queue.get()
if message == "execute_method":
main_method() # 执行主进程的方法
break # 根据需要可以继续处理其他消息
time.sleep(0.1) # 避免过度占用CPU

p.join()

if __name__ == "__main__":
main()

multiprocessing.Pipe()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import multiprocessing
import time


def worker(s_conn, p_conn):
"""工作进程,负责与主进程进行双向通信"""
while True:
# 接收主进程发送的消息
message = s_conn.recv()
print(f"Worker received: {message}")

# 模拟处理时间
time.sleep(1)

# 向主进程发送响应
response = f"Response to {message}"
s_conn.send(response)


def main():
# 创建双向管道
p_comm, s_comm = multiprocessing.Pipe()

# 创建工作进程
p = multiprocessing.Process(target=worker, args=(s_comm, p_comm))
p.start()

message = f"Message From p_comm"
p_comm.send(message)
print(f"Main sent: {message}")

# 接收工作进程的响应
response = p_comm.recv()
print(f"Main received: {response}")

# 结束工作进程
p.terminate()
p.join()


if __name__ == "__main__":
main()

关键点

  • 双向通信multiprocessing.Pipe 允许两个进程之间进行双向数据传输。
  • 轻量级PipeQueue 更轻量级,适合一对一的通信。
  • 高性能Pipe 的性能通常比 Queue 更高,尤其是在频繁通信的场景下。
  • 阻塞操作send()recv() 是阻塞的,适合用来同步两个进程。

获取所在线程

1
2
3
4
import threading     

current_thread = threading.current_thread()
print(f"Worker thread: {current_thread.name} (ID: {current_thread.ident})")