3.1 队列与线程

3.1.1 队列

TensorFlow中主要有FIFOQueue和RandomShuffleQueue两种队列,下面就详细介绍这两种队列的使用方法和应用场景。

1.FIFOQueue

FIFOQueue是先进先出队列,主要是针对一些序列样本。例如,在使用循环神经网络以及需要处理语音、文字、视频等序列信息的时候,用户希望TensorFlow能够按照顺序进行处理,这时候就需要使用FIFOQueue队列。

【例3-1】 以下为FIFOQueue队列的实例,其是一个先进先出队列。

运行程序,输出如下:

由以上结果可以发现,队列的操作是在主线程的对话中依次完成的。

2.RandomShuffleQueue

RandomShuffleQueue是随机队列,在执行出队操作的时候,队列是以随机的顺序进行的。随机队列一般应用在训练模型的时候,希望可以无序的获取样本来进行训练。例如,在训练图像分类模型时,需要输入的样本是无序的,这时就可以利用多线程来读取样本,将样本放到随机队列中,然后再利用主线程每次从随机队列中获取一个 batch 进行模型的训练。

【例3-2】 用RandomShuffleQueue函数以随机的顺序产生元素。

3.1.2 队列管理器

在训练模型时,需要将样本从硬盘读取到内存后才能进行训练。会话中可以运行多个线程,可以在队列管理器中创建一系列新的线程进行入队操作,主线程可以利用队列中的数据进行训练,而不需要等到所有的样本都读取完成之后才开始,即数据的读取和模型的训练是异步的,这样可以节省很多时间。

【例3-3】 队列管理器演示实例。

程序结束的时候,还报了一个 tensorflow.python.framework.errors_impl.CancelledError:Enqueue operation was cancelled的异常。那是因为主线程已经完成了,而入队线程还在继续执行,导致程序没法结束。由于计数器加1操作和入队操作不同步,可能会因为计数器还没来得及进行加1操作就再次执行入队操作,从而导致多次入队同样的数字,这就是出队的时候会出现同样数字的原因。

3.1.3 线程协调器

Coordinator类用来帮助多个线程协同工作,多个线程同步终止。其主要方法有:

● should_stop():如果线程应该停止则返回True。

● request_stop(<exception>):请求该线程停止。

● join(<list of threads>):等待被指定的线程终止。

其大致操作过程为:首先创建一个 Coordinator 对象,然后建立一些使用Coordinator对象的线程。这些线程通常一直循环运行直到should_stop()返回True时停止。任何线程都可以决定什么时候应该停止计算,只需要调用 request_stop()即可,同时其他线程的should_stop()将会返回True,最终停止。

【例3-4】 线程协调器实例演示。

运行程序,输出如下:

第二种情况:在队列线程关闭之后调用出队操作,处理tf.errors.OutOfRange错误。

3.1.4 组合使用

在TensorFlow中使用Queue的经典模式有两种,都是配合QueueRunner和Coordinator一起使用的。

第一种,显式地创建QueueRunner,然后调用它的create_threads方法启动线程。例如下面这段代码:

第二种,使用全局的start_queue_runners方法启动线程。例如下面这段代码:

在例子中,tf.train.string_input_produecer 会将一个隐含的 QueueRunner 添加到全局图中(类似的操作还有 tf.train.shuffle_batch 等)。由于没有显式地返回 QueueRunner 来用create_threads启动线程,这里使用了tf.train.start_queue_runners的方法直接启动tf.GraphKeys.QUEUE_RUNNERS集合中的所有队列线程。

这两种方式在效果上是相似的。