Using Async Worker Pool in Python for Efficient Task Execution

Published: September 23, 2023

When developing applications that require high concurrency, efficiency, and robust error handling, the Async Worker Pool pattern in Python can be a valuable tool. It's particularly useful in situations like these:

  1. High-Concurrency Requirement: Ideal for handling a large number of tasks simultaneously, such as multiple API calls or file fetches.
  2. Rate Limiting: Helps you respect API rate limits by controlling the number of workers.
  3. Efficient Resource Utilization: Minimizes system overhead by reusing worker coroutines instead of creating new threads or processes for each task.
  4. Fetching Data from Multiple Sources: Perfect for simultaneous data gathering from various providers.
  5. Dynamic Task Addition: The task queue holds incoming tasks until a worker is available, offering flexibility.
  6. Centralized Error Handling: Manages errors in a centralized way, easing the debugging process.

In this blog post, I'll break down the code that demonstrates this pattern, helping you understand how to implement it in your own projects for enhanced efficiency and control.

Overview

The code example uses the asyncio library to create a pool of worker coroutines that execute tasks from a queue. The main features are:

  1. Async Functions: Asynchronous functions (async def) for fetching data and for the worker.
  2. Task Queue: An asyncio.Queue to store tasks.
  3. Workers: Multiple workers consuming tasks from the queue and executing them.

Breaking Down the Code

Asynchronous Function for Data Fetching

async def fetch_data(root_path):
    # Simulate I/O latency; a real implementation would await a network call.
    await asyncio.sleep(0.001)
    print(f"Done {root_path}")
    return [root_path]

🔍 This function simulates fetching data: it sleeps briefly in place of real I/O, then returns its root_path argument wrapped in a list.
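
In a real project, fetch_data would typically await actual I/O rather than a sleep. As a minimal sketch (the aiohttp dependency and the idea of fetching a URL are my additions, not part of the original example), it might look like:

import aiohttp

async def fetch_data(url):
    # Hypothetical variant: fetch a URL instead of sleeping.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            body = await response.text()
    print(f"Done {url}")
    return [body]

In practice you'd share one ClientSession across all fetches; opening one per call just keeps the sketch self-contained.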

Main Function with Workers

async def main(run_column):
    result_data = []

    async def worker(task_queue):
        ...  # worker logic, detailed in the next section

🔍 The main() function contains the worker function and the logic for task execution.

Worker Function

Inside main(), the worker function does the following:

  1. Takes tasks from the queue.
  2. Executes them.
  3. Stores the result.

while not task_queue.empty():
    # Safe here: get() returns immediately when the queue is non-empty, and
    # no other coroutine can run between the empty() check and the get().
    task = await task_queue.get()
    try:
        returned = await task
        result_data.extend(returned)
    except Exception as e:
        print(f"Error executing task: {e}")

Task Queue and Execution

task_queue = asyncio.Queue()
# Enqueue coroutine objects rather than already-started tasks, so the
# number of workers is what actually bounds concurrency.
for _, row in run_column.iterrows():
    task_queue.put_nowait(fetch_data(root_path=row['1']))
await asyncio.gather(*[worker(task_queue) for _ in range(10)])

🔍 Each row of the DataFrame contributes one fetch_data coroutine to task_queue. Then, 10 worker coroutines are launched to consume them, so at most 10 fetches run at any moment.
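
Worth knowing: if a bounded number of in-flight calls is all you need, asyncio.Semaphore achieves the same cap without an explicit queue. A minimal sketch reusing fetch_data from above (the wrapper name is mine):

async def fetch_with_limit(root_path, semaphore):
    async with semaphore:                  # at most N of these run at once
        return await fetch_data(root_path)

async def main(run_column):
    semaphore = asyncio.Semaphore(10)      # same cap as the 10 workers
    batches = await asyncio.gather(
        *(fetch_with_limit(row['1'], semaphore) for _, row in run_column.iterrows())
    )
    return [item for batch in batches for item in batch]

The queue-based pool still wins when you want dynamic task addition or long-lived workers.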

Running the Main Function

if __name__ == '__main__':
    import pandas as pd
    data = pd.read_csv('./dummy.csv')
    result = asyncio.run(main(data))
    print(result)

🔍 This part reads the CSV into a pandas DataFrame, then starts an event loop with asyncio.run() and executes main().
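
Note that row['1'] means the CSV must contain a column literally named 1. For a quick local test you can skip the file and build the DataFrame inline, reusing main() from above (the sample paths are made up):

import asyncio
import pandas as pd

data = pd.DataFrame({'1': ['a.txt', 'b.txt', 'c.txt']})
result = asyncio.run(main(data))
print(result)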

Key Takeaways

  • A pool of async workers can efficiently execute multiple tasks.
  • asyncio.Queue is useful for managing tasks in a queue.
  • Multiple workers can work concurrently, improving speed and resource utilization.

Putting it all together, the full code for this pattern looks like this:

import asyncio


async def fetch_data(root_path):
    # Simulate I/O latency; a real implementation would await a network call.
    await asyncio.sleep(0.001)
    print(f"Done {root_path}")
    return [root_path]


async def main(run_column):
    result_data = []

    async def worker(task_queue):
        while not task_queue.empty():
            task = await task_queue.get()
            try:
                returned = await task
                result_data.extend(returned)
            except Exception as e:
                print(f"Error executing task: {e}")

    try:
        task_queue = asyncio.Queue()
        # Enqueue coroutine objects rather than already-started tasks, so the
        # number of workers is what actually bounds concurrency.
        for _, row in run_column.iterrows():
            task_queue.put_nowait(fetch_data(root_path=row['1']))
        await asyncio.gather(*[worker(task_queue) for _ in range(10)])

    except Exception as e:
        print(f"\nUnable to get data: {e}\n")

    return result_data


if __name__ == '__main__':
    import pandas as pd

    data = pd.read_csv('./dummy.csv')

    result = asyncio.run(main(data))
    print(result)