Closable thread safe Queue.

Gribouillis

When several producer threads put items in a queue.Queue and a consumer thread gets items from it, the standard class gives the consumer no way to tell the producers to stop feeding the queue. This snippet defines a subclass of Queue with a close method. After the queue has been closed, producer threads which attempt to put items in the queue receive a Closed exception.
Priority queues and LIFO queues are provided as well.
This code has been tested with Python 2.6 and 3.1.
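Before the full module, here is a much smaller sketch of the contract described above. It is not the code of this snippet: MiniClosableQueue is a hypothetical name, the sketch is Python 3 only, and it assumes (as CPython's queue.Queue does) that put() calls the _put hook while holding the queue's lock. Producers loop until they catch Closed; the consumer closes the queue and keeps reading until every producer has stopped.

```python
import queue
import threading

class Closed(Exception):
    """Raised by put() once the queue has been closed."""

class MiniClosableQueue(queue.Queue):
    # Hypothetical, simplified stand-in for the Queue class of this snippet.
    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._closed = False

    def close(self):
        with self.mutex:          # same lock that put() holds
            self._closed = True

    def _put(self, item):
        # Assumption: queue.Queue.put() calls this hook with the lock held.
        if self._closed:
            raise Closed
        super()._put(item)

def producer(q, name, stopped):
    try:
        while True:
            q.put(name)
    except Closed:
        stopped.append(name)      # this producer noticed the close

q = MiniClosableQueue(maxsize=2)
stopped = []
threads = [threading.Thread(target=producer, args=(q, n, stopped))
           for n in ("a", "b")]
for t in threads:
    t.start()

received = [q.get() for _ in range(5)]
q.close()
# Keep reading: a producer blocked on a full queue only sees Closed
# once a free slot wakes it up.
while any(t.is_alive() for t in threads):
    try:
        received.append(q.get(timeout=0.1))
    except queue.Empty:
        pass
for t in threads:
    t.join()
print(sorted(stopped))
```

Note that closing alone does not wake a producer that is blocked because the queue is full; something still has to remove an item, which is why the consumer keeps reading after the close.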

"""closablequeue.py
Thread safe queues that can be closed (Tested with python 2.6 and 3.1).

This module defines a class Queue, a subclass of (Q/q)ueue.Queue,
with an additional thread safe method Queue.close(). After this call,
no thread can put items in the queue: the method Queue.put raises
a new Closed exception. Items may remain in the queue when close
is called; these items can be extracted by calling items = queue.close(empty=True)
or by calling Queue.get() after the close.

Subclasses PriorityQueue and LifoQueue are provided as well.
"""

import threading
from time import time as _time  # needed by Queue.put() when a timeout is given
try:
    import queue as _Queue
    from queue import Empty, Full
except ImportError:
    import Queue as _Queue
    from Queue import Empty, Full

from functools import update_wrapper

class Closed(Exception):
    pass

class Queue(_Queue.Queue):
    def __init__(self, maxsize=0):
        _Queue.Queue.__init__(self, maxsize)
        self.use_put = threading.Lock()
        self._can_put = True

    def close(self, empty=False):
        """Close the queue, forbidding subsequent 'put'.
        If 'empty' is true, empty the queue and return the removed items."""
        self.use_put.acquire()
        self._can_put = False
        self.use_put.release()
        items = []
        if empty:
            try:
                while True:
                    items.append(self.get_nowait())
            except Empty:
                pass
        return items

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        """
        self.not_full.acquire()
        try:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)

            self.use_put.acquire()
            try:
                if self._can_put:
                    self._put(item)
                    self.unfinished_tasks += 1
                    self.not_empty.notify()
                else:
                    raise Closed
            finally:
                self.use_put.release()
        finally:
            self.not_full.release()

for name in ("PriorityQueue", "LifoQueue"):
    exec("""
class {0}(Queue):
    _init = _Queue.{0}._init
    _qsize = _Queue.{0}._qsize
    _put = _Queue.{0}._put
    _get = _Queue.{0}._get
""".format(name))

#************************************************
#   Put this test code in a different file:
#************************************************
#!/usr/bin/env python
"""testqueue.py

Test program for module closablequeue.

Description:
    In this example, 2 workers fill a truck with bags. Each worker
    has its own thread and puts the bags in a closable queue created
    by the main thread. When a worker catches the Closed exception,
    it stops filling the queue and exits.
    The main thread creates the truck, the workers and a queue, and starts
    the workers' threads. When the truck is full, it closes the queue,
    waits for the end of the other threads and prints the content of
    the truck. Each bag in the truck shows which worker put that bag
    in the truck.
    Note that we use a queue of maximum size 3. It means that when the
    main thread closes the queue, there may be 0, 1, 2 or 3 bags in the queue.
    The main thread therefore closes the queue before the truck is completely full,
    so that it can add up to 3 more bags to the truck if necessary.
"""
from closablequeue import Queue, Closed
import threading

def main():
    class Truck(object):
        def __init__(self, capacity):
            self.capacity = capacity
            self.content = []
        def add(self, package):
            self.content.append(package)
    class Worker(object):
        def __init__(self, index):
            self.index = index
            self.thread = threading.Thread(target=self._fill)
        def fill(self, queue):
            self.queue = queue
            self.thread.start()
        def _fill(self):
            print("worker {0} starting".format(self.index))
            try:
                while True:
                    self.queue.put([self.index])
            except Closed:
                print("worker {0} found a closed queue".format(self.index))
    truck = Truck(50)
    queue = Queue(3)
    workers = []
    for i in range(2):
        workers.append(Worker(i))
        workers[i].fill(queue)
    load = 0
    while load < truck.capacity - queue.maxsize:
        truck.add(queue.get())
        load += 1
    print("closing queue")
    items = queue.close(True)
    truck.content.extend(items)
    for w in workers:
        w.thread.join()
        print("joined worker {0}".format(w.index))
    print("truck's content: {0}".format(truck.content))
main()
Gribouillis

ERRATUM:
A line self.task_done() should be added after line 44 above. The code should read:

        if empty:
            try:
                while True:
                    items.append(self.get_nowait())
                    self.task_done()
            except Empty:
                pass
        return items

Otherwise, trying to join() the queue may block.
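
To see why, here is a small check using only the standard queue.Queue (Python 3 spelling of the module name). The helper names drain and filled_queue are made up for this illustration. join() waits until the unfinished_tasks counter drops to zero; put() increments that counter and only task_done() decrements it, so draining with get_nowait() alone leaves it untouched.

```python
import queue

def drain(q, mark_done):
    """Remove every item currently in q and return them as a list."""
    items = []
    try:
        while True:
            items.append(q.get_nowait())
            if mark_done:
                q.task_done()  # balance the put() that added this item
    except queue.Empty:
        pass
    return items

def filled_queue(n):
    """Return a standard queue holding the integers 0..n-1."""
    q = queue.Queue()
    for i in range(n):
        q.put(i)
    return q

without_fix = filled_queue(3)
drain(without_fix, mark_done=False)
print(without_fix.unfinished_tasks)  # still 3: join() would block here

with_fix = filled_queue(3)
drain(with_fix, mark_done=True)
print(with_fix.unfinished_tasks)     # 0
with_fix.join()                      # returns immediately
```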
