
Creating a Simple Work Queue Behind a Flask API⚓︎

Summary⚓︎

Like all of us, I am always looking for ways to stop wasting my time. I often find myself needing to run a command over and over again to test something (e.g. to simulate load). My preference is always to have a button I can click that does the thing I need done. Below are the steps involved in setting up a simple Flask API (with a Swagger page) backed by a simple in-memory work queue, which a configurable number of workers monitor to perform the required task.

Setup⚓︎

First install gunicorn, which we will use to run the web server, and flask-restx, which provides the Flask framework plus the Swagger UI.

python3 -m pip install gunicorn flask-restx

Create the App⚓︎

Then on to creating the application.

Setup Work Queue and Configure Workers⚓︎

from queue import Queue
from threading import Thread
import time

number_of_threads = 5  # obviously this is configurable
my_work_queue = Queue()

def doFileUploads(i, q):
    while True:
        print("%s: Looking for the next record" % i)
        record = q.get()
        print("%s: Uploading:" % i, record)
        time.sleep(10)  # do the thing that needs to be done
        q.task_done()

# Set up the workers and point them at the shared queue
for i in range(number_of_threads):
    worker = Thread(target=doFileUploads, args=(i, my_work_queue))
    worker.daemon = True  # Thread.setDaemon() is deprecated; set the attribute instead
    worker.start()
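
If you want to sanity-check the worker pool before wiring up the API, a minimal sketch like the following (run in the same module, using the names above) puts a few records on the queue and waits for them to drain:

# Quick standalone check of the worker pool (assumes the code above is in the same module)
for n in range(3):
    my_work_queue.put({"data": n})  # each record is a small dict, matching what the API will enqueue

my_work_queue.join()                # blocks until every queued record has been task_done()'d
print("All queued records processed")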

Create the Flask App and Add The Routes⚓︎

from flask import Flask
from flask_restx import Resource, Api, reqparse

app = Flask(__name__)
api = Api(app)


ns = api.namespace("queue", description="Simple Queue API operations")

parser = reqparse.RequestParser()
# recordDetails comes from the JSON body; the other arguments come from the query string
parser.add_argument(
    "recordDetails", type=int, help="recordDetails cannot be converted: {error_msg}", location="json"
)
parser.add_argument(
    "numberOfQueueEntriesToCreate", required=True, type=int, location="args"
)
parser.add_argument(
    "priority", choices=(1, 2), type=int, help="Bad choice: {error_msg}", location="args"
)


@ns.route("/")
class Add_Queue_Record(Resource):
    @api.expect(parser)
    def put(self):
        """
        Add a record to the queue
        """
        args = parser.parse_args()
        for _ in range(args['numberOfQueueEntriesToCreate']):
            my_work_queue.put({"data": args['recordDetails']})
        return {
            "message": "Record Added to Work Queue",
            "queue_size": my_work_queue.qsize(),
            "args": args,
        }

    def delete(self):
        """
        Remove everything from the queue
        """
        while my_work_queue.qsize() > 0:
            my_work_queue.get()
            my_work_queue.task_done()
        return {"message": "Work Queue Emptied", "queue_size": my_work_queue.qsize()}


@ns.route("/count")
class Count_Queue_Records(Resource):
    def get(self):
        """
        Count the number of records in the queue
        """
        return {"queue_size": my_work_queue.qsize()}

Configure Interface and Port⚓︎

if __name__ == '__main__':
    # Only used when running the module directly (python app.py); gunicorn below ignores this block
    app.run(debug=True, host='0.0.0.0', port=9999)

Start the Web Server⚓︎

gunicorn app:app --reload -b '0.0.0.0:9999' -w 1 -t 300 
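
Once the server is up you can drive it from the Swagger page at http://localhost:9999/, or from a short script. Here is a sketch using the requests library (a separate install, not included in the setup above) and assuming the server is listening on localhost:9999:

import requests

BASE = "http://localhost:9999/queue"

# Enqueue five records with priority 1 and a JSON payload
resp = requests.put(
    BASE + "/",
    params={"numberOfQueueEntriesToCreate": 5, "priority": 1},
    json={"recordDetails": 123},
)
print(resp.json())

# Poll the queue depth while the workers drain it
print(requests.get(BASE + "/count").json())

# Clear out anything left over
print(requests.delete(BASE + "/").json())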

That's it. Note that only one gunicorn worker can be used: if we started multiple workers, each would have its own in-memory work queue.