Python程序中的进程操作-进程间通信(multiprocess.Queue)

2019-08-14

Python程序中的进程操作-进程间通信(multiprocess.Queue)

一、进程间通信

? IPC(Intal-Process Communication)

二、队列

2.1 multiprocess.Queue

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue是实现多进程之间的数据传递。

Queue([maxsize])创建共享的进程队列。

参数:maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。

底层队列使用管道和锁定实现。

q.get([block[,timeout]])返回q中的一个项目。如果q为空,此方法阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True,如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常,

q.get_nowait():同q.get(False)方法。

q.put(item[,block[,timeout]]):将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize:返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotlmplementedError异常,

q.empty:如果调用此方法时q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full():如果q已满,返回为True,由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。

q.close():关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上 关闭。如果q被垃圾收集,将自动调用此方法,关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread:不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

from multiprocessing import Queue

q = Queue(5)  # 括号可以传参数 表示的是这个队列的最大存储数

# 往队列中添加数据
q.put(1)
q.put(2)
q.put(3)
# print(q.full())  # 判断队列是否满了
q.put(4)
q.put(5)
# print(q.full())
# q.put(6)  # 当队列满了之后,再放入数据 不会报错 会原地等待 直到队列中有数据被取走(阻塞态)

print(q.get())
print(q.get())
print(q.get())
# print(q.empty())  # 判断队列中的数据是否取完
print(q.get())
print(q.get())
# print(q.empty())
print(q.get_nowait())  # 取值 没有值不等待直接报错
# print(q.get())  # 当队列中的数据被取完之后 再次获取 程序会阻塞 知道有人往队列中放入值

full,get_nowait,empty都不使用于多进程的情况。

小结

Queue(5)  # 括号可以传参数 表示的是这个队列的最大存储数
q.put(1)  # 往队列中添加数据
q.full()  # 判断队列是否满了
q.put(6)  # 当队列满了之后,再放入数据 不会报错 会原地等待 直到队列中有数据被取走(阻塞态)
q.get()  # 从队列取值 只要有值就取
q.empty() # 判断队列中的数据是否取完
q.get_nowait()  # 取值 没有值不等待直接报错
q.get()  # 当队列中的数据被取完之后 再次获取 程序会阻塞 知道有人往队列中放入值

进程间通信

子进程放数据 主进程获取数据,两个子进程相互放 取数据

from multiprocessing import Process,Queue

def producer(q):
    q.put("hello GF~")  # 往通道发值

def consumer(q):
    print(q.get())  # 从通道取值


if __name__ == '__main__':
    q = Queue()  # 通道的作用
    p = Process(target=producer,args=(q,))  # 产生子进程p
    c = Process(target=consumer,args=(q,))  # 产生子进程c
    p.start()
    c.start()
# hello GF~

生产者消费者模型

生产者:生产/制造数据的
消费者:消费/处理数据的
例子:做包子的,买包子的
1.做包子远比买包子的多
2.做包子的远比包子的少
供需不平衡的问题
from multiprocessing import Process,Queue,JoinableQueue
import random
import time


def producer(name,food,q):
    for i in range(10):
        data = f"{name}生产了{food}{i}"
        time.sleep(random.random())
        q.put(data)
        print(data)

def consumer(name,q):
    while True:
        data = q.get()
        if data == None:break
        print(f"{name}吃了{data}")
        time.sleep(random.random())
        q.task_done()  # 告诉队列你已经从队列中取出了一个数据 并且处理完毕了


if __name__ == '__main__':
    q = JoinableQueue()

    p = Process(target=producer,args=("大厨egon","馒头",q))
    p1 = Process(target=producer,args=("跟班tank",'生蚝',q))

    c = Process(target=consumer,args=('nick',q))
    c1 = Process(target=consumer,args=("吃货jerry",q))
    p.start()
    p1.start()
    # 下面的意思是要确保所有生产者的产生东西已经完全生产出来
    c.daemon = True  # 守护c进程  主进程代码运行结束,守护进程随即终止。
    c1.daemon = True # 守护c进程  主进程代码运行结束,守护进程随即终止。
    c.start()
    c1.start()
    # 这两句的代码意思是要确定每一个消费者已经吃完了通道中的食物才结束。
    p.join()  # 主进程等待子进程的结束
    p1.join() # 主进程等待子进程的结束
    q.join()  # 等到队列中数据全部取出
    # q.put(None)
    # q.put(None)