As a developer, it can be very useful to learn how to run functions in the background while being able to monitor the queue in another tab or different system. This is incredibly helpful when managing heavy workloads that might not work efficiently when called all at once, or when making large numbers of calls to a database that returns data slowly over time rather than all at once.
In this tutorial, we will implement an RQ queue in Python with the help of Redis to schedule and execute tasks in a timely manner.
- Python 3.6 or newer. If your operating system does not provide a Python interpreter, you can go to python.org to download an installer.
Let’s talk about task queues
Task queues are a great way to allow tasks to work asynchronously outside of the main application flow. There are many task queues in Python to assist you in your project, however, we’ll be discussing a solution today known as RQ.
RQ, also known as Redis Queue, is a Python library that allows developers to enqueue jobs to be processed in the background with workers. The RQ workers will be called when it's time to execute the queue in the background. Using a connection to Redis, it’s no surprise that this library is super lightweight and offers support for those getting started for the first time.
By using this particular task queue, it is possible to process jobs in the background with little to no hassle.
Set up the environment
Create a project directory in your terminal called “rq-test” to follow along.
$ mkdir rq-test
$ cd rq-test
Install a virtual environment and copy and paste the commands to install rq and related packages. If you are using a Unix or macOS system, enter the following commands:
$ python3 -m venv venv
$ source venv/bin/activate
(venv) $ pip install rq
If you are on a Windows machine, enter the following commands in a prompt window:
$ python -m venv venv
$ venv\Scripts\activate
(venv) $ pip install rq
RQ requires a Redis installation on your machine, which can be done with the following commands using wget. Redis is on version 6.0.6 at the time of this article's publication.
If you are using a Unix or MacOS system, enter these commands to install Redis. This is my personal favorite way to install Redis, but there are alternatives below:
$ wget http://download.redis.io/releases/redis-6.0.6.tar.gz
$ tar xzf redis-6.0.6.tar.gz
$ cd redis-6.0.6
$ make
If you have Homebrew installed, you can type
brew install redis in the terminal and refer to this GitHub gist to install Redis on the Mac. For developers using Ubuntu Linux, the command
sudo apt-get install redis would get the job done as well.
Run the Redis server in a separate terminal window on the default port with the command
src/redis-server from the directory where it's installed.
For Windows users, you would have to follow a separate tutorial to run Redis on Windows. Download the latest zip file on GitHub and extract the contents. Run the
redis-server.exe file that was extracted from the zip file to start the Redis server.
The output should look similar to the following after running Redis:
55154:C 25 Aug 2020 16:41:18.968 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
55154:C 25 Aug 2020 16:41:18.969 # Redis version=6.0.6, bits=64, commit=c10e5f1e, modified=1, pid=55154, just started
55154:C 25 Aug 2020 16:41:18.969 # Warning: no config file specified, using the default config. In order to specify a config file use src/redis-server /path/to/redis.conf
55154:M 25 Aug 2020 16:41:18.970 * Increased maximum number of open files to 10032 (it was originally set to 2560).
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 6.0.6 (c10e5f1e/1) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 55154
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'
55154:M 25 Aug 2020 16:41:18.971 # Server initialized
55154:M 25 Aug 2020 16:41:18.971 * Ready to accept connections
Build out the tasks
In this case, a task for Redis Queue is merely a Python function. For this article, we'll have each task print a message to the terminal for "x" seconds to demonstrate the use of RQ.
Copy and paste the following code to a file named “tasks.py” in your directory.
from datetime import datetime, timedelta
import time

def print_task(seconds):
    print("Starting task")
    for num in range(seconds):
        print(num, ". Hello World!")
        time.sleep(1)
    print("Task completed")

def print_numbers(seconds):
    print("Starting num task")
    for num in range(seconds):
        print(num)
        time.sleep(1)
    print("Task to print_numbers completed")
These are simple tasks that print out numbers and text on the terminal so that we can see whether the tasks are executed properly. The time.sleep(1) call from the Python time library suspends the task for one second per iteration, extending each task's run time so that we can examine its progress.
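As a quick standalone sketch (separate from the tutorial files), you can time one of these functions directly to see how time.sleep stretches out its run:

```python
import time

# The same print_numbers task from tasks.py, reproduced here so the
# snippet is self-contained.
def print_numbers(seconds):
    print("Starting num task")
    for num in range(seconds):
        print(num)
        time.sleep(1)
    print("Task to print_numbers completed")

start = time.monotonic()
print_numbers(2)
elapsed = time.monotonic() - start
print(f"elapsed: {elapsed:.1f} seconds")  # roughly 2 seconds
```

Each iteration sleeps for one second, so the run time grows with the seconds argument, which is what makes these tasks slow enough to watch move through the queue.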
Feel free to alter this code after the tutorial and create your own tasks. Some other popular tasks are sending a fax message or email by connecting to your email client.
Create your queue
Create another file in the root directory and name it “app.py”. Copy and paste the following code:
from datetime import datetime, timedelta
import time
from redis import Redis
from rq import Queue

import tasks

queue = Queue(connection=Redis())

def queue_tasks():
    queue.enqueue(tasks.print_task, 5)
    queue.enqueue_in(timedelta(seconds=10), tasks.print_numbers, 5)

def main():
    queue_tasks()

if __name__ == "__main__":
    main()
The queue object sets up a connection to Redis and initializes a queue based on that connection. This queue can hold all the jobs required to run in the background with workers.
As seen in the code, the tasks.print_task function is added using the enqueue function, which means that the task added to the queue will be executed immediately. The enqueue_in function is another nifty RQ function because it expects a timedelta in order to schedule the specified job. In this case, seconds is specified, but this value can be changed according to the schedule expected for your usage. Check out other ways to schedule a job on this GitHub README.
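One of those other scheduling options is enqueue_at, which takes an absolute datetime instead of a relative timedelta. A minimal sketch, assuming the queue and tasks objects are the ones from app.py (the schedule_at helper name is hypothetical):

```python
from datetime import datetime, timedelta, timezone

def schedule_at(queue, tasks):
    # Schedule tasks.print_numbers for a specific moment in time,
    # equivalent in effect to queue.enqueue_in(timedelta(seconds=10), ...).
    run_at = datetime.now(timezone.utc) + timedelta(seconds=10)
    return queue.enqueue_at(run_at, tasks.print_numbers, 5)
```

enqueue_in is the natural fit for "run this 10 seconds from now", while enqueue_at reads better for "run this at 9:00 tomorrow".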
Since we are testing out the RQ queue, I have enqueued both the tasks.print_task and tasks.print_numbers functions so that we can see their output on the terminal. The 5 passed after each function is the argument forwarded to the respective function. In this case, we are expecting print_task() to print "Hello World!" five times and print_numbers() to print five numbers in order.
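You can double-check these expectations without involving Redis at all by calling the task function directly and capturing its output; this standalone sketch reproduces print_task from tasks.py:

```python
import io
import time
from contextlib import redirect_stdout

# print_task as defined in tasks.py, copied here so the snippet runs on its own.
def print_task(seconds):
    print("Starting task")
    for num in range(seconds):
        print(num, ". Hello World!")
        time.sleep(1)
    print("Task completed")

buffer = io.StringIO()
with redirect_stdout(buffer):
    print_task(5)
lines = buffer.getvalue().splitlines()

# Count the greeting lines that the task printed.
hello_count = sum("Hello World!" in line for line in lines)
print(hello_count)  # 5
```

Running tasks as plain functions like this is a handy sanity check before handing them to a worker.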
If you have created any additional tasks, be sure to import them at the top of the file so that all the tasks in your Python file can be accessed.
Run the queue
For the purposes of this article, the gif demo below will show a perfect execution of the tasks in queue so no exceptions will be raised.
The Redis server should still be running in a tab from earlier in the tutorial at this point. If it stopped, run the command src/redis-server inside the redis-6.0.6 folder on one tab; for developers with a Windows machine, start redis-server.exe instead. Open another tab solely to run the RQ worker with the scheduler, using the command rq worker --with-scheduler.
This should be the output after running the command above.
09:05:05 Worker rq:worker:1c0a81e7e160458283ebe1e21d92a26e: started, version 1.5.0
09:05:05 *** Listening on default...
09:05:05 Cleaning registries for queue: default
INFO:rq.worker:Cleaning registries for queue: default
The worker command activated a worker process in order to connect to Redis and look for any jobs assigned to the queue from the code in app.py.
Lastly, open a third tab in the terminal for the root project directory. Start up the virtual environment again with the command
source venv/bin/activate. Then type
python app.py to run the project.
Go back to the tab that is running rq worker --with-scheduler. Wait 5 more seconds after the first task is executed to see the next task. Although the live demo gif below doesn't capture the timing perfectly, since the program had to be run and recorded at the same time, you can see that there was a pause between tasks until execution and that both tasks were completed within 15 seconds.
Here's the sample output inside of the tab running rq worker --with-scheduler:
20:17:15 Worker rq:worker:9b4bf20b70694bfa9ad2621721136b06: started, version 1.5.1
20:17:15 *** Listening on default...
20:17:15: Trying to acquire locks for default
20:17:15 Cleaning registries for queue: default
20:17:15: Scheduler for default started with PID 93149
20:17:28 default: tasks.print_task(5) (eb70f0fd-c01a-4826-95a4-575919b8d8dd)
Starting task
0 . Hello World!
1 . Hello World!
2 . Hello World!
3 . Hello World!
4 . Hello World!
Task completed
20:17:33 default: Job OK (eb70f0fd-c01a-4826-95a4-575919b8d8dd)
20:17:33 Result is kept for 500 seconds
20:17:38 default: tasks.print_numbers(5) (d4d6fbd4-de7f-49ea-bc96-85a10c2a08c0)
Starting num task
0
1
2
3
4
Task to print_numbers completed
20:17:43 default: Job OK (d4d6fbd4-de7f-49ea-bc96-85a10c2a08c0)
20:17:43 Result is kept for 500 seconds
As seen in the output above, if the tasks written in tasks.py returned anything, the result of each task would be kept for 500 seconds, which is the default. A developer can alter a return value's time to live by passing a result_ttl parameter when adding tasks to the queue.
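As a sketch of how that might look, assuming the queue and tasks objects from app.py (the helper name and one-hour value are just illustrative):

```python
from datetime import timedelta

# Keep a successful job's return value for one hour instead of the
# default 500 seconds.
ONE_HOUR = int(timedelta(hours=1).total_seconds())

def queue_tasks_with_ttl(queue, tasks):
    # result_ttl controls how long RQ keeps the job's return value in Redis.
    queue.enqueue(tasks.print_task, 5, result_ttl=ONE_HOUR)
```

Setting result_ttl=0 discards results immediately, which is a common choice for fire-and-forget tasks whose return value is never read.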
Handle exceptions and try again
If a job were to fail, you can always set up a log to keep track of the error messages, or you can use the RQ queue to enqueue and retry failed jobs. By using RQ's FailedJobRegistry class, you can keep track of the jobs that failed during runtime. The RQ documentation discusses how it handles exceptions and how data regarding the job can help the developer figure out how to resubmit it.
However, RQ also supports developers in handling exceptions in their own way by injecting your own logic into the RQ workers. This may be a helpful option for you if you are executing many tasks in your project and those that failed are not worth retrying.
Force a failed task to retry
Since this is an introductory article to run your first task with RQ, let's try to purposely fail one of the tasks from earlier to test out RQ's retry functionality.
Go to the tasks.py file and alter the print_task() function so that a randomly generated number determines whether the function executes or raises an error. We will be using the random Python library to assist us in generating numbers. Don't forget to include import random at the top of the file.
Copy and paste the following lines of code to change the
print_task() function in the tasks.py file.
import random

def print_task(seconds):
    print("Starting task")
    random_num = random.randrange(1, 3, 1)
    # optional print statement to see the numbers
    # print("the randomly generated number = ", random_num)
    if random_num == 2:
        raise RuntimeError('Sorry, I failed! Let me try again.')
    else:
        for num in range(seconds):
            print(num, ". Hello World!")
            time.sleep(1)
        print("Task completed")
Go back to the app.py file to change the queue. Instead of using the plain enqueue call to execute the tasks.print_task function, delete that line and replace it with queue.enqueue(tasks.print_task, 5, retry=Retry(max=2)).
The Retry object is imported from rq, so make sure you add from rq import Retry at the top of the file as well in order to use this functionality. This object accepts max and interval arguments to specify how often and when the particular function will be retried. In the newly changed line, we pass in the function we want to retry, the argument 5, which stands for the seconds of execution, and lastly the maximum number of times we want the queue to retry.
The tasks in queue should now look like this:
def queue_tasks():
    queue.enqueue(tasks.print_task, 5, retry=Retry(max=2))
    queue.enqueue_in(timedelta(seconds=10), tasks.print_numbers, 5)
When running the print_task task, there is a 50/50 chance that tasks.print_task() will execute properly, since we're only generating a 1 or a 2, and the print statements only happen when a 1 is generated. Otherwise, a RuntimeError is raised and the queue immediately retries the task, up to the two additional attempts allowed by Retry(max=2), to try to print "Hello World!" successfully.
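To make the retry semantics concrete, here is a toy, pure-Python sketch of what "one attempt plus up to two retries" means; this illustrates the idea only and is not RQ's actual implementation:

```python
attempts = []

def scripted_task():
    # Stands in for the flaky print_task: fails twice, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError('Sorry, I failed! Let me try again.')
    return "Task completed"

def run_with_retries(task, max_retries):
    # Retry(max=2) amounts to one initial attempt plus up to two retries;
    # if every attempt fails, the last exception propagates.
    for attempt in range(max_retries + 1):
        try:
            return task()
        except RuntimeError:
            if attempt == max_retries:
                raise

result = run_with_retries(scripted_task, max_retries=2)
print(result)           # Task completed
print(len(attempts))    # 3
```

If the task failed a third time here, the loop would re-raise the RuntimeError, which mirrors a job landing in RQ's failed job registry once its retries are exhausted.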
What’s next for task queues?
Congratulations! You have successfully learned and implemented the basics of scheduling tasks in the RQ queue. Perhaps now you can tell the worker command to add a task that prints out an infinite number of "Congratulations" messages in a timely manner!
Otherwise, check out these different tasks that you can build into your Redis Queue:
- Schedule Twilio SMS to a list of contacts quickly!
- Use Redis Queue to generate a fan fiction with OpenAI GPT-3
- Queue Emails with Twilio SendGrid using Redis Queue
Let me know what you have been building by reaching out to me over email!
Diane Phan is a developer on the Developer Voices team. She loves to help programmers tackle difficult challenges that might prevent them from bringing their projects to life. She can be reached at dphan [at] twilio.com or LinkedIn.