tensorflow 队列管理之异步操作

xiaoxiao2025-05-25  44

# -*- coding: utf-8 -*- """ Created on Thu Oct 25 09:49:34 2018 @author: Grey """ import tensorflow as tf #模拟异步子线程存入样本,主线程读取样本 #1.定义一个队列,100 Q = tf.FIFOQueue(100,tf.float32) #2.定义子线程做的操作,循环+1 放入队列 var = tf.Variable(0.0)#变量op #实现自增tf.assign_add() data=tf.assign_add(var,tf.constant(1.0)) en_q=Q.enqueue(data) #3.定义队列管理器op,指定子线程的数量和操作 qr=tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2) #定义会话,执行op with tf.Session() as sess: #初始化var变量op tf.global_variables_initializer().run() # 正式开启子线程 threads=qr.create_threads(sess,start=True) # 主线程,不断读取训练数据 for i in range(300): print(sess.run(Q.dequeue())) pass

输出正确,但是还是会报错 CancelledError (see above for traceback): Enqueue operation was cancelled          [[Node: fifo_queue_57_enqueue = QueueEnqueue[Tcomponents=[DT_FLOAT], _class=["loc:@fifo_queue_57"], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](fifo_queue_57, AssignAdd_31)]]

原因:主线程结束,意味着Session结束,资源释放,但是子线程还在运行,所以出错。

解决方案:线程协调器关闭子线程

异步执行的关键是 创建子线程并及进行关闭

如下图为添加关闭子线程的异步执行的框架:

代码:

# -*- coding: utf-8 -*- """ Created on Thu Oct 25 09:49:34 2018 @author: Grey """ import tensorflow as tf Q = tf.FIFOQueue(1000,dtypes=tf.float32) var = tf.Variable(0.0) data=tf.assign_add(var,tf.constant(1.0)) en_q=Q.enqueue(data) qr=tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2) with tf.Session() as sess: tf.global_variables_initializer().run() coord=tf.train.Coordinator() threads=qr.create_threads(sess,coord=coord,start=True) # 主线程,不断读取训练数据 for i in range(100): print(sess.run(Q.dequeue())) # 回收子线程 coord.request_stop() coord.join(threads) pass

输出结果正确且无任何报错:

转载请注明原文地址: https://www.6miu.com/read-5030712.html

最新回复(0)