python线程安全队列的实现代码是什么
Admin 2022-08-19 群英技术资讯 766 次浏览
这篇文章给大家分享的是python线程安全队列的实现代码是什么。小编觉得挺实用的,因此分享给大家做个参考,文中的介绍得很详细,而要易于理解和学习,有需要的朋友可以参考,接下来就跟随小编一起了解看看吧。一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。

包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。
class ThreadSafeQueue(object): def __init__(self, max_size=0): self.queue = [] self.max_size = max_size # max_size为0表示无限大 self.lock = threading.Lock() # 互斥量 self.condition = threading.Condition() # 条件变量 def size(self): """ 获取当前队列的大小 :return: 队列长度 """ # 加锁 self.lock.acquire() size = len(self.queue) self.lock.release() return size def put(self, item): """ 将单个元素放入队列 :param item: :return: """ # 队列已满 max_size为0表示无限大 if self.max_size != 0 and self.size() >= self.max_size: return ThreadSafeException() # 加锁 self.lock.acquire() self.queue.append(item) self.lock.release() self.condition.acquire() # 通知等待读取的线程 self.condition.notify() self.condition.release() return item def batch_put(self, item_list): """ 批量添加元素 :param item_list: :return: """ if not isinstance(item_list, list): item_list = list(item_list) res = [self.put(item) for item in item_list] return res def pop(self, block=False, timeout=0): """ 从队列头部取出元素 :param block: 是否阻塞线程 :param timeout: 等待时间 :return: """ if self.size() == 0: if block: self.condition.acquire() self.condition.wait(timeout) self.condition.release() else: return None # 加锁 self.lock.acquire() item = None if len(self.queue): item = self.queue.pop() self.lock.release() return item def get(self, index): """ 获取指定位置的元素 :param index: :return: """ if self.size() == 0 or index >= self.size(): return None # 加锁 self.lock.acquire() item = self.queue[index] self.lock.release() return item class ThreadSafeException(Exception): pass
def thread_queue_test_1():
thread_queue = ThreadSafeQueue(10)
def producer():
while True:
thread_queue.put(random.randint(0, 10))
time.sleep(2)
def consumer():
while True:
print('current time before pop is %d' % time.time())
item = thread_queue.pop(block=True, timeout=3)
# item = thread_queue.get(2)
if item is not None:
print('get value from queue is %s' % item)
else:
print(item)
print('current time after pop is %d' % time.time())
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
测试结果:
我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。

def thread_queue_test_2():
thread_queue = ThreadSafeQueue(10)
def producer():
while True:
thread_queue.put(random.randint(0, 10))
time.sleep(2)
def consumer(name):
while True:
item = thread_queue.pop(block=True, timeout=1)
# item = thread_queue.get(2)
if item is not None:
print('%s get value from queue is %s' % (name, item))
else:
print('%s get value from queue is None' % name)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=('thread1',))
t3 = threading.Thread(target=consumer, args=('thread2',))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
测试结果:
生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
这篇文章主要为大家介绍了python数据可视化绘制火山图示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
冒泡排序(Bubble Sort)是一种简单的排序算法。它重复地遍历要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。遍历数列的工作是重复地进行直到没有再需要交换,也就是说该数列已经排序完成。这个算法的名字由来是因为越小的元素会经由交换慢慢“浮”到数列的顶端。
这篇文章主要介绍了pyecharts绘制各种数据可视化图表案例并附效果和代码,文章围绕主题展开详细的内容介绍,感兴趣的小伙伴可以参考一下
字符串是 Python 中最常用的数据类型。我们一般使用引号来创建字符串。创建字符串很简单,只要为变量分配一个值即可。
这篇文章主要介绍了Python打印数据类型的全过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008