python线程安全队列的实现代码是什么
Admin 2022-08-19 群英技术资讯 477 次浏览
一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。
包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。
class ThreadSafeQueue(object): def __init__(self, max_size=0): self.queue = [] self.max_size = max_size # max_size为0表示无限大 self.lock = threading.Lock() # 互斥量 self.condition = threading.Condition() # 条件变量 def size(self): """ 获取当前队列的大小 :return: 队列长度 """ # 加锁 self.lock.acquire() size = len(self.queue) self.lock.release() return size def put(self, item): """ 将单个元素放入队列 :param item: :return: """ # 队列已满 max_size为0表示无限大 if self.max_size != 0 and self.size() >= self.max_size: return ThreadSafeException() # 加锁 self.lock.acquire() self.queue.append(item) self.lock.release() self.condition.acquire() # 通知等待读取的线程 self.condition.notify() self.condition.release() return item def batch_put(self, item_list): """ 批量添加元素 :param item_list: :return: """ if not isinstance(item_list, list): item_list = list(item_list) res = [self.put(item) for item in item_list] return res def pop(self, block=False, timeout=0): """ 从队列头部取出元素 :param block: 是否阻塞线程 :param timeout: 等待时间 :return: """ if self.size() == 0: if block: self.condition.acquire() self.condition.wait(timeout) self.condition.release() else: return None # 加锁 self.lock.acquire() item = None if len(self.queue): item = self.queue.pop() self.lock.release() return item def get(self, index): """ 获取指定位置的元素 :param index: :return: """ if self.size() == 0 or index >= self.size(): return None # 加锁 self.lock.acquire() item = self.queue[index] self.lock.release() return item class ThreadSafeException(Exception): pass
def thread_queue_test_1(): thread_queue = ThreadSafeQueue(10) def producer(): while True: thread_queue.put(random.randint(0, 10)) time.sleep(2) def consumer(): while True: print('current time before pop is %d' % time.time()) item = thread_queue.pop(block=True, timeout=3) # item = thread_queue.get(2) if item is not None: print('get value from queue is %s' % item) else: print(item) print('current time after pop is %d' % time.time()) t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t1.start() t2.start() t1.join() t2.join()
测试结果:
我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。
def thread_queue_test_2(): thread_queue = ThreadSafeQueue(10) def producer(): while True: thread_queue.put(random.randint(0, 10)) time.sleep(2) def consumer(name): while True: item = thread_queue.pop(block=True, timeout=1) # item = thread_queue.get(2) if item is not None: print('%s get value from queue is %s' % (name, item)) else: print('%s get value from queue is None' % name) t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer, args=('thread1',)) t3 = threading.Thread(target=consumer, args=('thread2',)) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join()
测试结果:
生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
python在 Python 中字符串也可以使用三个单引号或三个双引号来表示字符串,这样字符串中的内容就可以多行书写,并且被多行输出。
Python内置函数-tuple 函数。tuple 函数将可迭代系列(如列表)转换为元组。
这篇文章主要介绍了Python 多线程之threading 模块的使用,帮助大家更好的理解和学习使用python,感兴趣的朋友可以了解下
python中的str:1、Python中包含字符串,字符串的类型为str。str函数是Python的内置函数,它将参数转换成字符串类型,即人适合阅读的形式;2、主要使用有无参调用和不省略参数。
对List进行排序,Python提供了两个方法方法1 用List的内建函数list sort进行排序list sort(func=None, key=None, reverse=False)Python实
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008