When developing applications that require high concurrency, efficiency, and robust error handling, the Async Worker Pool pattern in Python can be a valuable tool. It's particularly useful when you need:
- High-Concurrency Requirement: Ideal for handling a large number of tasks simultaneously, such as multiple API calls or file fetches.
- Rate Limiting: Helps you respect API rate limits by controlling the number of workers.
- Efficient Resource Utilization: Minimizes system overhead by reusing worker coroutines instead of creating new threads or processes for each task.
- Fetching Data from Multiple Sources: Perfect for simultaneous data gathering from various providers.
- Dynamic Task Addition: The task queue holds incoming tasks until a worker is available, offering flexibility.
- Centralized Error Handling: Manages errors in a centralized way, easing the debugging process.
In this blog post, I'll break down the code that demonstrates this pattern, helping you understand how to implement it in your own projects for enhanced efficiency and control.
Overview
The code example uses the `asyncio` library to create a pool of worker coroutines that execute tasks from a queue. The main features are:
- Async Functions: Asynchronous functions (`async def`) for fetching data and for the worker.
- Task Queue: An `asyncio.Queue` to store tasks.
- Workers: Multiple workers consuming tasks from the queue and executing them.
Breaking Down the Code
Asynchronous Function for Data Fetching
```python
async def fetch_data(root_path):
    await asyncio.sleep(0.001)
    print(f"Done {root_path}")
    return [root_path]
```
🔍 This function simulates fetching data and takes `root_path` as an argument.
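In a real project, the fetch would typically be an actual I/O call. As a rough, hypothetical sketch (assuming the third-party aiohttp library and a JSON endpoint, neither of which the original example uses), it could look like this:

```python
import aiohttp

async def fetch_data(url):
    # Hypothetical real fetch: GET a URL and return its JSON payload.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            payload = await response.json()
    print(f"Done {url}")
    return [payload]
```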
Main Function with Workers
```python
async def main(run_column):
    result_data = []

    async def worker(task_queue):
        # Some worker logic here (shown in the next section)
        ...
```
🔍 The `main()` function contains the worker function and the logic for task execution.
Worker Function
Inside `main()`, the `worker` function does the following:
- Takes tasks from the queue.
- Executes them.
- Stores the result.
```python
while not task_queue.empty():
    task = await task_queue.get()
    try:
        returned = await task
        result_data.extend(returned)
    except Exception as e:
        print(f"Error executing task: {e}")
```
Task Queue and Execution
```python
task_queue = asyncio.Queue()
for index, row in run_column.iterrows():
    task_queue.put_nowait(asyncio.create_task(fetch_data(root_path=row['1'])))
await asyncio.gather(*[worker(task_queue) for _ in range(10)])
```
🔍 Tasks are added to `task_queue`. Then, 10 worker coroutines are launched to consume the tasks.
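One nuance worth noting: `asyncio.create_task()` schedules each `fetch_data` coroutine on the event loop immediately, so all of them start running right away and the ten workers mostly just collect results. If you want the pool size itself to cap concurrency (for example, to respect an API rate limit), a small variation is to enqueue the bare coroutine objects and let each worker await them one at a time. Here is a sketch of that drop-in replacement for the snippet above:

```python
# Variant: queue unscheduled coroutines so at most 10 fetches run at any one time.
task_queue = asyncio.Queue()
for index, row in run_column.iterrows():
    task_queue.put_nowait(fetch_data(root_path=row['1']))
await asyncio.gather(*[worker(task_queue) for _ in range(10)])
```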
Running the Main Function
```python
if __name__ == '__main__':
    import pandas as pd

    with open('./dummy.csv', newline='') as csvfile:
        data = pd.read_csv(csvfile)
    result = asyncio.run(main(data))
    print(result)
```
🔍 This part reads data from a CSV file and then runs the `main()` function.
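Note that `main()` indexes each row with `row['1']`, so the CSV needs a column whose header is literally 1. If you want a quick file to test with, a throwaway snippet like this (purely illustrative, not part of the original code) will create one:

```python
import pandas as pd

# Write a minimal dummy.csv with a column named '1' containing fake paths.
pd.DataFrame({'1': ['path_a', 'path_b', 'path_c']}).to_csv('./dummy.csv', index=False)
```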
Key Takeaways
- A pool of async workers can efficiently execute multiple tasks.
- `asyncio.Queue` is useful for managing tasks in a queue.
- Multiple workers can run concurrently, improving speed and resource utilization.
Putting it all together, the full code for this pattern looks like this:
```python
import asyncio


async def fetch_data(root_path):
    # Simulate an I/O-bound fetch; a real implementation would hit an API or disk.
    await asyncio.sleep(0.001)
    print(f"Done {root_path}")
    return [root_path]


async def main(run_column):
    result_data = []

    async def worker(task_queue):
        # Pull tasks off the queue until it is empty, collecting results
        # and reporting per-task failures without stopping the pool.
        while not task_queue.empty():
            task = await task_queue.get()
            try:
                returned = await task
                result_data.extend(returned)
            except Exception as e:
                print(f"Error executing task: {e}")

    try:
        # Fill the queue with one task per row, then run 10 workers concurrently.
        task_queue = asyncio.Queue()
        for index, row in run_column.iterrows():
            task_queue.put_nowait(asyncio.create_task(fetch_data(root_path=row['1'])))
        await asyncio.gather(*[worker(task_queue) for _ in range(10)])
    except Exception as e:
        print(f"\nUnable to get data: {e}\n")
    return result_data


if __name__ == '__main__':
    import pandas as pd

    with open('./dummy.csv', newline='') as csvfile:
        data = pd.read_csv(csvfile)
    result = asyncio.run(main(data))
    print(result)
```
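Finally, if you care about the "dynamic task addition" benefit mentioned at the top, that is, producers adding work after the workers have already started, the `while not task_queue.empty()` loop isn't enough, because a worker exits as soon as it sees an empty queue. A common variation (a sketch under the same assumptions, not the original post's code) keeps the workers alive and uses `task_done()` together with `task_queue.join()` to know when everything has been processed:

```python
import asyncio


async def fetch_data(root_path):
    await asyncio.sleep(0.001)
    return [root_path]


async def main(paths):
    result_data = []
    task_queue = asyncio.Queue()

    async def worker():
        # Long-lived worker: keeps pulling work until it is cancelled.
        while True:
            coro = await task_queue.get()
            try:
                result_data.extend(await coro)
            except Exception as e:
                print(f"Error executing task: {e}")
            finally:
                task_queue.task_done()  # tell join() this item is finished

    workers = [asyncio.create_task(worker()) for _ in range(10)]
    for path in paths:
        # Producers can keep adding tasks here, even after the workers have started.
        task_queue.put_nowait(fetch_data(root_path=path))
    await task_queue.join()  # wait until every queued item has been processed
    for w in workers:
        w.cancel()           # shut the idle workers down
    return result_data


if __name__ == '__main__':
    print(asyncio.run(main(['a', 'b', 'c'])))
```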