6.1 Tensorflow笔记(基础篇):队列与线程

xiaoxiao2021-02-27  256

前言

在Tensorflow的实际应用中,队列与线程是必不可少,主要应用于数据的加载等,不同的情况下使用不同的队列,主线程与其他线程异步进行数据的训练与读取,所以队列与线程的知识也是Tensorflow必须要学会的重要知识 另一方面,Tensorflow作为符号编程框架,在构图后,加载数据有三种方式,预加载与填充数据都存在,数据量大消耗内存等情况的出现.使用第三种方式文件读取避免了前两者的缺点,但是其实现则必须要使用队列与此线程,因此为了进一步的学习,也需要在这个时候掌握队列与线程的知识

FIFOQueue : 先入先出的队列

# 在使用循环神经网络时,希望读入的训练样本是有序的,就要用到FIFOQueue # 先创建一个先入先出的队列,初始化队列插入0.1,0.2,0.3三个数字 q = tf.FIFOQueue(3,tf.float32) init = q.enqueue_many(([0.1,0.2,0.3],)) # 定义出队,+1,入队操作 x = q.dequeue() y = x+1 q_inc = q.enqueue(y) with tf.Session() as sess: sess.run(init) # quelen = sess.run(q.size()) for i in range(2): sess.run(q_inc) # 执行两次操作,队列中的值变为0.3,1.1,1.2 for j in range(sess.run(q.size())): print(sess.run(q.dequeue())) # 输出队列的值

运行结果

0.3 1.1 1.2

RandomShuffleQueue:随机队列

随机队列,再出队列时,是以随机的顺序产生元素的 # 例如,我们在训练一些图像样本时,使用CNN的网络结构,希望可以无序的读入训练样本 q = tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes=tf.float32) # capacity 队列最大长度,min_after_dequeue 出队后最小的长度 with tf.Session() as sess : for i in range(0,10): # 10次入队 sess.run(q.enqueue(i)) for i in range(0,8): # 8次出队 print(sess.run(q.dequeue())) # 在队列长度等于最小值时,执行出队操作,会发生阻断 # 在队列长度等于最大值时,执行入队操作,会发生阻断 # 解除阻断的一种方法---设置等待时间 # run_options = tf.RunOptions(time_out_in_ms = 100000) # 等待十秒 # try: # sess.run(q.dequeue(),options=run_options) # except tf.errors.DeadlineExceededError: # print('out of range')

运行结果:

1.0 6.0 3.0 0.0 7.0 4.0 8.0 9.0

队列管理器

q = tf.FIFOQueue(1000, tf.float32) counter = tf.Variable(0.0) # 计数器 increment_op = tf.assign_add(counter, tf.constant(1.0)) # 操作给计数器加一 enquence_op = q.enqueue(counter) # 操作: 让计数器加入队列 # 创建一个队列管理器QueueRunner,用这两个操作相对列q中添加元素,目前我们只使用一个线程 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enquence_op] * 1) # 启动一个会话,从队列管理器qr中创建线程 # 主线程 with tf.Session() as sess: sess.run(tf.global_variables_initializer()) enquence_threads = qr.create_threads(sess, start=True) # 启用入队线程 # 主线程 for i in range(10): print(sess.run(q.dequeue())) # tensorflow.python.framework.errors_impl.CancelledError: Run call was cancelled

运行结果:

187.0 190.0 195.0 201.0 206.0 209.0 214.0 219.0 223.0 230.0 报错:......

线程和协调器

使用协调器来管理线程 这是要注意的的是在队列线程关闭后,再执行出队操作,将会报错”tf.errors.OutOfRange” 第一点会报错很好理解,队列关闭自然不能出队了(如果你学过数据结构那么你应该能够理解队列的意义,如果没能理解,建议重新掌握队列这一基本概念), 第二点则需要注意一下,报的错误是超出范围,而非队列不存在,希望当你遇到这个错误的时候能够想到,实际上可能是队列线程已经被取消了,而非真的超出了范围.

q = tf.FIFOQueue(1000, tf.float32) counter = tf.Variable(0.0) # 计数器 increment_op = tf.assign_add(counter, tf.constant(1.0)) # 操作给计数器加一 enquence_op = q.enqueue(counter) # 操作: 让计数器加入队列 # 第一种情况,在关闭其他线程之后(除主线程之外的其它线程),调用出队操作 print('第一种情况,在关闭其他线程之后(除主线程之外的其它线程),调用出队操作') # 创建一个队列管理器QueueRunner,用这两个操作相对列q中添加元素,目前我们只使用一个线程 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enquence_op] * 1) # 主线程 sess = tf.Session() sess.run(tf.global_variables_initializer()) # Coordinator: 协调器, 协调线程间的关系,可以视为一种信号量,用来做同步 coord = tf.train.Coordinator() # 启动入队线程,协调器是线程的参数 enqueue_threads = qr.create_threads(sess,coord=coord,start=True) # 主线程 for i in range(0,10): print(sess.run(q.dequeue())) coord.request_stop() # 通知其他线程关闭 coord.join(enqueue_threads) # join 操作等待其他线程结束,其他所有线程关闭之后,这一函数才能返回 print('第二种情况: 在队列线程关闭之后,调用出队操作-->处理tf.errors.OutOfRange错误') # q启动入队线程 enqueue_threads = qr.create_threads(sess,coord=coord,start=True) # 主线程 coord.request_stop() # 通知其他线程关闭 for j in range(0,10): try: print(sess.run(q.dequeue())) except tf.errors.OutOfRangeError: break coord.join(enqueue_threads) # join 操作等待其他线程结束,其他所有线程关闭之后,这一函数才能返回

运行结果:

第一种情况,在关闭其他线程之后(除主线程之外的其它线程),调用出队操作 5.0 7.0 10.0 17.0 23.0 29.0 34.0 38.0 46.0 52.0 第二种情况: 在队列线程关闭之后,调用出队操作-->处理tf.errors.OutOfRange错误 53.0 53.0 53.0 53.0 53.0 53.0 53.0 53.0 53.0 53.0

更详细的内容你可以参考github中的源代码

转载请注明原文地址: https://www.6miu.com/read-10009.html

最新回复(0)