Parallelism, Concurrency and AsyncIO in Python

Concurrency vs Parallelism

Concurrency and parallelism are similar terms, but they are not the same thing.

Concurrency is the ability to make progress on multiple tasks during overlapping time periods: tasks can start, run, and complete while other tasks are still in flight, without necessarily executing at the same instant. On a single CPU, this is achieved through context switching, where the state of a task is saved so that it can be resumed later.

Parallelism, meanwhile, is the ability to run multiple tasks at the same time across multiple CPU cores.

Though they can increase the speed of your application, concurrency and parallelism should not be used everywhere. The use case depends on whether the task is CPU-bound or IO-bound.

Tasks that are limited by the speed of the CPU are CPU-bound. For example, mathematical computations are CPU-bound, since they finish faster when more processing power is available. Parallelism is for CPU-bound tasks. In theory, if a task is divided into n subtasks, these subtasks can run in parallel on n cores and reduce the run time to 1/n of the original, non-parallel task. Concurrency is preferred for IO-bound tasks, as you can do something else while waiting on IO resources.

A good example of CPU-bound tasks comes from data science. Data scientists deal with huge chunks of data. For data preprocessing, they can split the data into multiple batches and process them in parallel, decreasing the total processing time. Increasing the number of cores generally results in faster processing.
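As a rough sketch of the batching idea, the snippet below splits a list into batches and hands each batch to a worker process. The helper names (split_into_batches, preprocess_batch, preprocess_in_parallel) and the toy "preprocessing" step of squaring each value are assumptions made for illustration; they do not come from any particular data science library.

```python
from concurrent.futures import ProcessPoolExecutor


def split_into_batches(data, batch_size):
    """Split a list into consecutive batches of at most batch_size items."""
    return [data[i:i + batch_size] for i in range(0, len(data), batch_size)]


def preprocess_batch(batch):
    """Hypothetical CPU-bound step: square every value in the batch."""
    return [value * value for value in batch]


def preprocess_in_parallel(data, batch_size=1000):
    """Process each batch in a worker process and merge the results."""
    batches = split_into_batches(data, batch_size)
    with ProcessPoolExecutor() as executor:
        # executor.map yields results in the same order as the batches
        processed = executor.map(preprocess_batch, batches)
        return [value for batch in processed for value in batch]


if __name__ == "__main__":
    result = preprocess_in_parallel(list(range(10_000)))
    print(len(result), result[:5])
```

For a step this cheap, the cost of starting processes and moving data between them will likely outweigh any gain; batching pays off when each batch involves substantial computation.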

Web scraping is IO-bound: the task makes little use of the CPU, since most of the time is spent reading from and writing to the network. Other common IO-bound tasks include database calls and reading and writing files to disk. Web applications, like those built with Django and Flask, are typically IO-bound.
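To make the "do something else while waiting" idea concrete, here is a minimal sketch that uses time.sleep as a stand-in for a network or disk call. The function names (fake_io_call, run_concurrently) and the delay values are hypothetical. No real IO happens; the point is only that waits can overlap when each one runs in its own thread.

```python
import threading
import time


def fake_io_call(delay, results, index):
    """Stand-in for a network or disk call: the thread just waits."""
    time.sleep(delay)  # the CPU is idle while "waiting on IO"
    results[index] = f"response {index}"


def run_concurrently(num_calls=5, delay=0.2):
    """Start one thread per simulated IO call and wait for all of them."""
    results = [None] * num_calls
    threads = [
        threading.Thread(target=fake_io_call, args=(delay, results, i))
        for i in range(num_calls)
    ]

    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = time.perf_counter() - start

    return results, elapsed


if __name__ == "__main__":
    results, elapsed = run_concurrently()
    # Run one after another, five 0.2s waits would take about 1 second;
    # overlapped, the total should be much closer to a single wait.
    print(results, f"{elapsed:.2f}s")
```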

Scenario

With that, let's take a look at how to speed up the following tasks:

# tasks.py

import os
from multiprocessing import current_process
from threading import current_thread

import requests

def make_request(num):
    # io-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    requests.get("https://httpbin.org/ip")

async def make_request_async(num, client):
    # io-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    await client.get("https://httpbin.org/ip")

def get_prime_numbers(num):
    # cpu-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    numbers = []

    prime = [True for i in range(num + 1)]
    p = 2

    while p * p <= num:
        if prime[p]:
            # cross out every multiple of p
            for i in range(p * 2, num + 1, p):
                prime[i] = False
        p += 1

    prime[0] = False
    prime[1] = False

    for p in range(num + 1):
        if prime[p]:
            numbers.append(p)

    return numbers


Notes:

  • make_request makes a single HTTP request to https://httpbin.org/ip. We'll call it X number of times.
  • make_request_async makes the same HTTP request asynchronously with HTTPX.
  • get_prime_numbers calculates the prime numbers, via the Sieve of Eratosthenes method, from two to the provided limit.
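For readers who want to see the sieve without the process and thread logging, here is a compact, standalone sketch. The name primes_up_to is hypothetical, and this version differs slightly from get_prime_numbers: it starts crossing out at p * p rather than p * 2 and returns an empty list for limits below 2.

```python
def primes_up_to(limit):
    """Return all primes from 2 to limit (inclusive) using a sieve."""
    if limit < 2:
        return []

    is_prime = [True] * (limit + 1)
    is_prime[0] = is_prime[1] = False

    p = 2
    while p * p <= limit:
        if is_prime[p]:
            # Multiples below p * p were already crossed out by smaller primes
            for multiple in range(p * p, limit + 1, p):
                is_prime[multiple] = False
        p += 1

    return [n for n, flag in enumerate(is_prime) if flag]


print(primes_up_to(30))  # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
```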

We'll be using the following modules from the standard library to speed up the above tasks:

Library              Class/Method          Processing Type
-------------------  --------------------  ---------------------------
threading            Thread                concurrent
concurrent.futures   ThreadPoolExecutor    concurrent
asyncio              gather                concurrent (via coroutines)
multiprocessing      Pool                  parallel
concurrent.futures   ProcessPoolExecutor   parallel

IO-bound Operation

Again, IO-bound tasks spend more time on IO than on the CPU.

Since web scraping is IO-bound, we can use threading to speed it up: retrieving the HTML (IO) is much slower than parsing it (CPU).

Scenario: How to speed up a Python-based web scraping and crawling script?

Sync Example

Let's start with a benchmark.

# io-bound_sync.py

import time

from tasks import make_request

def main():
    for num in range(1, 101):
        make_request(num)

if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we made 100 HTTP requests using the make_request function. Since requests happen synchronously, each task is executed sequentially.

Elapsed run time: 15.710984757 seconds.

So, that's roughly 0.16 seconds per request.

Threading Example

# io-bound_concurrent_1.py

import threading
import time

from tasks import make_request

def main():
    tasks = []

    for num in range(1, 101):
        tasks.append(threading.Thread(target=make_request, args=(num,)))
        tasks[-1].start()

    for task in tasks:
        task.join()

if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, the same make_request function is called 100 times. This time the threading library is used to create a thread for each request.

Elapsed run time: 1.020112515 seconds.

The total time decreases from ~16s to ~1s.

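Since we're using a separate thread for each request, you might be wondering why the whole thing didn't take ~0.16s to finish. The extra time is overhead: creating and managing 100 threads has a cost, and the Global Interpreter Lock (GIL) in CPython ensures that only one thread executes Python bytecode at a time, so the Python-level work in each thread still runs one piece at a time. The GIL is released while a thread waits on network IO, which is why the requests themselves can overlap.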
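One way to observe the effect of the GIL is to run a pure-Python, CPU-bound loop both sequentially and in threads and compare the timings. The sketch below does that with hypothetical helper names (count_down, run_sequential, run_threaded). No timings are shown because they depend on the machine and interpreter; on a standard CPython build with the GIL the two numbers are typically close, while a free-threaded build may behave differently.

```python
import threading
import time


def count_down(n):
    """Pure-Python CPU-bound loop; it needs the GIL for every iteration."""
    while n > 0:
        n -= 1


def run_sequential(n, repeats):
    """Run count_down `repeats` times, one after another."""
    start = time.perf_counter()
    for _ in range(repeats):
        count_down(n)
    return time.perf_counter() - start


def run_threaded(n, repeats):
    """Run count_down in `repeats` threads at once."""
    threads = [
        threading.Thread(target=count_down, args=(n,)) for _ in range(repeats)
    ]
    start = time.perf_counter()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {run_sequential(2_000_000, 4):.2f}s")
    print(f"threaded:   {run_threaded(2_000_000, 4):.2f}s")
```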

concurrent.futures Example

# io-bound_concurrent_2.py

import time
from concurrent.futures import ThreadPoolExecutor, wait

from tasks import make_request

def main():
    futures = []

    with ThreadPoolExecutor() as executor:
        for num in range(1, 101):
            futures.append(executor.submit(make_request, num))

    wait(futures)

if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here we used concurrent.futures.ThreadPoolExecutor to achieve multithreading. After all the futures/promises are created, we used wait to wait for all of them to complete.

Elapsed run time: 1.340592231 seconds

concurrent.futures.ThreadPoolExecutor is an abstraction over the threading module that makes it easier to use. In the previous example, we assigned each request to its own thread, so 100 threads were used in total. ThreadPoolExecutor, by contrast, defaults the number of worker threads to min(32, os.cpu_count() + 4), so the requests are spread over a smaller pool of reused threads. If you want more control over multithreading, use the threading module directly.
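The pool size does not have to be left at its default. As a small sketch, the snippet below passes max_workers explicitly and uses executor.map to collect return values. fake_request and run_requests are hypothetical names, and time.sleep stands in for the real HTTP call so the example runs without network access.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(num):
    """Stand-in for make_request: wait briefly, then return a value."""
    time.sleep(0.05)
    return num * 2


def run_requests(count, max_workers):
    """Run `count` simulated requests on a pool of `max_workers` threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map returns results in the same order as the inputs
        return list(executor.map(fake_request, range(count)))


if __name__ == "__main__":
    print(run_requests(count=10, max_workers=5))
```

A larger max_workers lets more waits overlap, at the cost of more threads to manage; the right value depends on the workload and on how many simultaneous requests the remote service tolerates.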

AsyncIO Example

# io-bound_concurrent_3.py

import asyncio
import time

import httpx

from tasks import make_request_async

async def main():
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(
            *[make_request_async(num, client) for num in range(1, 101)]
        )

if __name__ == "__main__":
    start_time = time.perf_counter()

    # asyncio.run creates the event loop, runs main(), and closes the loop
    asyncio.run(main())

    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

Here, we used asyncio to achieve concurrency.

Elapsed run time: 0.553961068 seconds

asyncio is faster than the other methods here because it avoids the cost of OS (Operating System) threads. With threading, the threads are managed by the OS, which preempts them and decides when to switch. asyncio uses coroutines, which run in a single thread. With coroutines, the program itself decides when to switch tasks: each await is a point where a coroutine can hand control back. This switching is handled by the event loop in asyncio.
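The cooperative switching described above can be seen in a small sketch that needs no network access. Two coroutines record when they start and finish; asyncio.sleep stands in for an awaited IO call. The names worker and log and the delay values are assumptions chosen for illustration.

```python
import asyncio


async def worker(name, delay, log):
    log.append(f"{name} started")
    # await hands control back to the event loop until the sleep finishes
    await asyncio.sleep(delay)
    log.append(f"{name} finished")


async def main():
    log = []
    await asyncio.gather(
        worker("slow", 0.2, log),
        worker("fast", 0.1, log),
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
    # ['slow started', 'fast started', 'fast finished', 'slow finished']
```

Both coroutines start before either finishes, and "fast" completes first even though it was scheduled second, all within a single thread.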

CPU-bound Operation

Scenario: How to speed up a simple data processing script?

Sync Example

Again, let's start with a benchmark.

# cpu-bound_sync.py

import time

from tasks import get_prime_numbers

def main():
    for num in range(1000, 16000):
        get_prime_numbers(num)

if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we executed the get_prime_numbers function for every number from 1000 up to, but not including, 16000.

Elapsed run time: 17.863046316 seconds.

Multiprocessing Example

# cpu-bound_parallel_1.py

import time
from multiprocessing import Pool, cpu_count

from tasks import get_prime_numbers

def main():
    # leave one core free, but always use at least one worker
    with Pool(max(1, cpu_count() - 1)) as p:
        # map passes each number to get_prime_numbers as its single argument
        p.map(get_prime_numbers, range(1000, 16000))
        p.close()
        p.join()

if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we used multiprocessing to calculate the prime numbers.

Elapsed run time: 2.9848740599999997 seconds.

concurrent.futures Example

# cpu-bound_parallel_2.py

import time
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count

from tasks import get_prime_numbers

def main():
    futures = []

    # leave one core free, but always use at least one worker
    with ProcessPoolExecutor(max(1, cpu_count() - 1)) as executor:
        for num in range(1000, 16000):
            futures.append(executor.submit(get_prime_numbers, num))

    wait(futures)

if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Here, we achieved multiprocessing using concurrent.futures.ProcessPoolExecutor. Once the jobs are added to futures, wait(futures) waits for them to finish.

Elapsed run time: 4.452427557 seconds.

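concurrent.futures.ProcessPoolExecutor is built on top of the multiprocessing module. Like ThreadPoolExecutor, it trades some control for convenience. Here, each number is submitted as a separate task, whereas multiprocessing.Pool sends the inputs to the workers in chunks, which likely explains part of the difference in run time. If you want more control over multiprocessing, use multiprocessing.Pool. concurrent.futures provides an abstraction over both multiprocessing and threading, making it easy to switch between the two.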
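Because both executors share the same interface, switching between threads and processes can be as small as passing a different class. The sketch below illustrates this with a hypothetical run_with helper and a toy square task; neither name comes from the examples above.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(n):
    """Toy task; defined at module level so worker processes can find it."""
    return n * n


def run_with(executor_cls, numbers, max_workers=2):
    """Run `square` over `numbers` using the given executor class."""
    with executor_cls(max_workers=max_workers) as executor:
        return list(executor.map(square, numbers))


if __name__ == "__main__":
    numbers = range(10)
    print(run_with(ThreadPoolExecutor, numbers))
    print(run_with(ProcessPoolExecutor, numbers))
```

Both calls return the same list; only the place where the work runs changes. With ProcessPoolExecutor, the task and its arguments must be picklable, which is why square is a plain module-level function.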