python线程安全队列的实现代码是什么
Admin 2022-08-19 群英技术资讯 1044 次浏览
这篇文章给大家分享的是python线程安全队列的实现代码是什么。小编觉得挺实用的,因此分享给大家做个参考,文中的介绍得很详细,而要易于理解和学习,有需要的朋友可以参考,接下来就跟随小编一起了解看看吧。一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。

包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。
class ThreadSafeQueue(object): def __init__(self, max_size=0): self.queue = [] self.max_size = max_size # max_size为0表示无限大 self.lock = threading.Lock() # 互斥量 self.condition = threading.Condition() # 条件变量 def size(self): """ 获取当前队列的大小 :return: 队列长度 """ # 加锁 self.lock.acquire() size = len(self.queue) self.lock.release() return size def put(self, item): """ 将单个元素放入队列 :param item: :return: """ # 队列已满 max_size为0表示无限大 if self.max_size != 0 and self.size() >= self.max_size: return ThreadSafeException() # 加锁 self.lock.acquire() self.queue.append(item) self.lock.release() self.condition.acquire() # 通知等待读取的线程 self.condition.notify() self.condition.release() return item def batch_put(self, item_list): """ 批量添加元素 :param item_list: :return: """ if not isinstance(item_list, list): item_list = list(item_list) res = [self.put(item) for item in item_list] return res def pop(self, block=False, timeout=0): """ 从队列头部取出元素 :param block: 是否阻塞线程 :param timeout: 等待时间 :return: """ if self.size() == 0: if block: self.condition.acquire() self.condition.wait(timeout) self.condition.release() else: return None # 加锁 self.lock.acquire() item = None if len(self.queue): item = self.queue.pop() self.lock.release() return item def get(self, index): """ 获取指定位置的元素 :param index: :return: """ if self.size() == 0 or index >= self.size(): return None # 加锁 self.lock.acquire() item = self.queue[index] self.lock.release() return item class ThreadSafeException(Exception): pass
def thread_queue_test_1():
thread_queue = ThreadSafeQueue(10)
def producer():
while True:
thread_queue.put(random.randint(0, 10))
time.sleep(2)
def consumer():
while True:
print('current time before pop is %d' % time.time())
item = thread_queue.pop(block=True, timeout=3)
# item = thread_queue.get(2)
if item is not None:
print('get value from queue is %s' % item)
else:
print(item)
print('current time after pop is %d' % time.time())
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
测试结果:
我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。

def thread_queue_test_2():
thread_queue = ThreadSafeQueue(10)
def producer():
while True:
thread_queue.put(random.randint(0, 10))
time.sleep(2)
def consumer(name):
while True:
item = thread_queue.pop(block=True, timeout=1)
# item = thread_queue.get(2)
if item is not None:
print('%s get value from queue is %s' % (name, item))
else:
print('%s get value from queue is None' % name)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=('thread1',))
t3 = threading.Thread(target=consumer, args=('thread2',))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
测试结果:
生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
Python内置函数-print() 函数。print() 方法用于打印输出,最常见的一个函数。
这篇文章给大家分享的是Python scrapy框架的相关内容,下文介绍了创建scrapy项目的操作,scrapy框架常用命令,scrapy项目下载文件和scrapy框架的整体执行流程,对大家了解和学习scrapy框架的使用会有一定的帮助,感兴趣的朋友就继续往下看吧。
Python 海象运算符是在 PEP 572 中提出,并在 Python3.8 版本并入和发布。本文就来为大家详细讲讲Python海象运算符的用法,感兴趣的可以了解一下
大家还记不记得曾经印在我们的铅笔盒的九九乘法表?我们从小就开始学习背诵九九乘法表,经常被家长和数学老师检查背诵。九九乘法表我们进行计算的前提,没了九九乘法表,我们就难以计算。小编之前向大家介绍过用while循环打印九九乘法表,那大家知不知道如何用for循环打印九九乘法表呢?今天,小编就向大家介绍如何打印九九乘法表。
在Python里一切都是对象(object),基本数据类型,如数字,字符串,函数都是对象。对象可以由类(class)进行创建。那么既然一切都是对象,那么类是对象吗?是的,类也是对象,那么又是谁创造了类呢?答案也很简单,也是类,一个能创作类的类,称之为(type)元类
成为群英会员,开启智能安全云计算之旅
立即注册关注或联系群英网络
7x24小时售前:400-678-4567
7x24小时售后:0668-2555666
24小时QQ客服
群英微信公众号
CNNIC域名投诉举报处理平台
服务电话:010-58813000
服务邮箱:service@cnnic.cn
投诉与建议:0668-2555555
Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 ICP核准(ICP备案)粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008