一、队列
队列是一种列表,不同的是队列只能在队尾插入元素,在队首删除元素。队列用于存储按顺序排列的数据,先进先出,这点和栈不一样,在栈中,最后入栈的元素反而被优先处理。可以将队列想象成在银行前排队的人群,排在最前面的人第一个办理业务,新来的人只能在后面排队,直到轮到他们为止。但是在python中,它内置了一个queue模块,它不但提供普通的队列,还提供一些特殊的队列
- queue.Queue :先进先出队列
- queue.LifoQueue :后进先出队列
- queue.PriorityQueue :优先级队列
- queue.deque :双向队列
队列的使用场景:
- 提高并发
- 流量削峰
- 程序解耦
先进先出模型:
import queueq=queue.Queue()q.put(1) #给队列传三个值q.put(2)q.put(3)size=q.qsize() #查看队列里的个数print(size)print('第一次取值',q.get() ) #取出队列的一个值q.task_done() #调用告诉队列该任务已经处理完毕print('第二次取值:',q.get())q.task_done()print('第三次取值:',q.get())q.task_done() #每取出一个需要告诉joinq.join() #阻塞调用线程,知道队列中的所有任务被处理掉.print('此时队列剩余:',q.qsize())#显示结果3第一次取值 1第二次取值: 2第三次取值: 3此时队列剩余: 0
主要方法:
Queue.qsize()
返回队列的近似大小。注意,队列大小大于0并不保证接下来的get()调用不会被阻塞,队列大小小于maxsize也不保证接下来的put()调用不会被阻塞。Queue.empty()
如果队列为空返回True,否则返回False。如果empty()返回True并不保证接下来的put()调用不会被阻塞。类似的,如果empty()返回False也不能保证接下来的get()调用不会被阻塞。Queue.full()
如果队列是满的返回True,否则返回False。如果full()返回True并不能保证接下来的get()调用不会被阻塞。类似的,如果full()返回False并不能保证接下来的put()调用不会被阻塞。Queue.put(item[, block[, timeout]])
将item放入队列中。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),如有必要(比如队列满),阻塞调用线程,直到有空闲槽可用。如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空闲槽可用,抛出Full异常(带超时的阻塞调用)。如果block为假,如果有空闲槽可用将数据放入队列,否则立即抛出Full异常(非阻塞调用,timeout被忽略)。Queue.put_nowait(item)
等同于put(item, False)(非阻塞调用)。Queue.get([block[, timeout]])
从队列中移除并返回一个数据。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),阻塞调用进程直到有数据可用。如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无数据可用,抛出Empty异常(带超时的阻塞调用)。如果block为假,如果有数据可用返回数据,否则立即抛出Empty异常(非阻塞调用,timeout被忽略)。Queue.get_nowait()
等同于get(False)(非阻塞调用)。为了跟踪入队任务被消费者线程完全的处理掉,Queue对象提供了两个额外的方法。
Queue.task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
如果该方法被调用的次数多于被放入队列中的任务的个数,ValueError异常会被抛出。
后进先出:
import queueq=queue.LifoQueue() #后进先出模式q.put(1) #给队列传三个值q.put(2)q.put(3)size=q.qsize() #查看队列里的个数print(size)print('第一次取值',q.get() ) #取出队列的一个值q.task_done() #调用告诉队列该任务已经处理完毕print('第二次取值:',q.get())q.task_done()print('第三次取值:',q.get())q.task_done() #每取出一个需要告诉joinq.join() #阻塞调用线程,知道队列中的所有任务被处理掉.print('此时队列剩余:',q.qsize())#显示结果3第一次取值 3第二次取值: 2第三次取值: 1此时队列剩余: 0
优先级:
import queueq=queue.PriorityQueue()q.put((5,"tom1")) #给队列传三个值q.put((2,"jerry2"))q.put((3,"Hello"))size=q.qsize() #查看队列里的个数print(size)print('第一次取值',q.get() ) #取出队列的一个值q.task_done() #调用告诉队列该任务已经处理完毕print('第二次取值:',q.get())q.task_done()print('第三次取值:',q.get())q.task_done() #每取出一个需要告诉joinq.join() #阻塞调用线程,知道队列中的所有任务被处理掉.print('此时队列剩余:',q.qsize())#显示结果3第一次取值 (2, 'jerry2') #值越小,权重越高第二次取值: (3, 'Hello')第三次取值: (5, 'tom1')此时队列剩余: 0
双向队列:
q=queue.deque()q.append(123)q.append(456)q.append(4789)q.appendleft(90)q.appendleft(234)q.appendleft(235)print(q.pop()) #从右边取print(q.popleft()) #从左边去#显示结果4789235
二、由队列引发的生产者消费者模型:
什么是生产者消费者模式?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者之间彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔该阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
为什么要使用生产者和消费者模式?
在线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
以下实例,就是一个同时有一百个人请求,三个后台处理情况,
主要知识点:
多线程、队列
import threadingimport queueimport timeq=queue.Queue()def productor(arg): ''' 买票 :param arg: :return: ''' q.put(arg,"请求") #把请求都放在队列中for i in range(100): #生成100 个请求 t=threading.Thread(target=productor,args=(i,)) t.start()def consumer(arg): ''' 服务后台处理请求 :param arg: :return: ''' while True: print(arg,q.get()) #哪一个线程处理的和谁的请求 print("请求个数%s"%q.qsize()) #队列里还剩多少个请求 time.sleep(1)for j in range(3): #后台处理,多线程 t=threading.Thread(target=consumer,args=(j,)) t.start()#显示结果0 0请求个数991 1请求个数982 2请求个数971 3请求个数960 4请求个数952 5请求个数940 6请求个数932 7请求个数921 8请求个数911 9请求个数900 10请求个数89。。。。
线程池:
一个比较篓的实例:
在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化的数量的线程,让过来的任务立刻能够使用,就形成了线程池。在python中,没有内置的较好的线程池模块,需要自己实现或使用第三方模块。下面是一个简单的线程池:
import threadingimport timeimport queueclass Mythrea(): def __init__(self,maxsize=5): self.maxsize=maxsize self.q=queue.Queue(maxsize) for i in range(maxsize): self.q.put(threading.Thread) def get_thread(self): return self.q.get() def put_thread(self): self.q.put(threading.Thread)pool=Mythrea(5)def task(arg,p): print(arg) time.sleep(1) p.put_thread()for i in range(100): t=pool.get_thread() obj=t(target=task,args=(i,pool,)) obj.start()
这个是程序已启动,是把线程当做元素传到队列中,每个线程使用后都被抛弃了,没有重复利用,而且一开始就把线程开满了,无论有没有被利用,此版本性能差,比较糙!
再来一个nb的版本:
#更新版本2, 自定义线程池import queueimport threadingimport timeimport contextlibStopEvent = object()class ThreadPool: def __init__(self,max_num, max_task_num = None ): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self,func, args, callback=None): ''' 线程池执行一个任务 :param func: 任务函数 :param args: 任务函数所需要的参数 :param callback: 任务执行失败或者成功之后执行的回调函数, 回调函数有两个参数: 1. 任务函数的执行状态 2. 任务函数返回值(默认为None,即,不执行回调函数) :return: 如果线程已经终止, 则返回True,否则None ''' if self.cancel: return True if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): ''' 创建一个线程 :return: ''' t = threading.Thread(target=self.call) t.start() def call(self): ''' 循环去获取任务函数并执行任务函数 ''' current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: res = func(*arguments) success = True except Exception: success = False res = None if callback is not None: try: callback(success, res) except Exception: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def terminal(self): ''' 无论是否还有任务,终止线程 :return: ''' self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() def close(self): ''' 执行完所有任务后,所有线程都停止 :return: ''' self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 @contextlib.contextmanager def worker_state(self,state_list, worker_thread): ''' 用于记录线程中正在等候的线程数 :param state_list: :param worker_thread: :return: ''' state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread)#How to usepool = ThreadPool(5)def callback(status, result): # status, execute action status # result, execute action return value passdef action(i): print(i)for i in range(30): res = pool.run(action, (i,), callback)time.sleep(2)print(len(pool.generate_list), len(pool.free_list))print(len(pool.generate_list), len(pool.free_list))pool.close()# pool.terminal()