The Async Worker Pool
When developing Python applications that require high concurrency, efficiency, and robust error handling, the Async Worker Pool pattern can be a valuable tool. It's particularly useful when:
- High-Concurrency Requirement - Ideal for handling a large number of tasks simultaneously, such as multiple API calls or file fetches
- Rate Limiting - Helps you respect API rate limits by controlling the number of workers
- Efficient Resource Utilization - Minimizes system overhead by reusing worker coroutines instead of creating new threads or processes for each task
- Fetching Data from Multiple Sources - Perfect for simultaneous data gathering from various providers
- Dynamic Task Addition - The task queue holds incoming tasks until a worker is available, offering flexibility
- Centralized Error Handling - Manages errors in a centralized way, easing the debugging process
In this blog post, I'll break down the code that demonstrates this pattern, helping you understand how to implement it in your own projects for enhanced efficiency and control.
Overview
The code example uses the asyncio library to create a pool of worker coroutines that execute tasks from a queue. The main features are:
- Async Functions - Asynchronous functions (async def) for fetching data and for the worker
- Task Queue - An asyncio.Queue to store tasks
- Workers - Multiple workers consuming tasks from the queue and executing them
Implementation Details
Asynchronous Function for Data Fetching
async def fetch_data(root_path):
    await asyncio.sleep(0.001)
    print(f"Done {root_path}")
    return [root_path]
This function simulates fetching data and takes root_path as an argument. The asyncio.sleep(0.001) stands in for the I/O operations that would occur in real-world scenarios.
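In a real project, fetch_data would do actual I/O rather than sleeping. As a rough sketch of what that might look like with aiohttp (assuming aiohttp is installed; the URL below is a made-up placeholder):

import aiohttp

async def fetch_data(root_path):
    # Hypothetical version that fetches JSON over HTTP instead of sleeping.
    # The endpoint is a placeholder; in practice you would also want to share
    # a single ClientSession across tasks rather than creating one per call.
    url = f"https://example.com/data/{root_path}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            payload = await response.json()
    print(f"Done {root_path}")
    return [payload]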
Main Function with Workers
async def main(run_column):
    result_data = []
    async def worker(task_queue):
        # Worker logic implementation
The main() function contains the worker function and orchestrates the entire task execution process.
Worker Function Logic
Inside main(), the worker function performs three key operations:
- Takes tasks from the queue - Continuously monitors for available tasks
- Executes them - Processes each task asynchronously
- Stores the result - Aggregates results in a shared data structure
while not task_queue.empty():
    task = await task_queue.get()
    try:
        returned = await task
        result_data.extend(returned)
    except Exception as e:
        print(f"Error executing task: {e}")
Task Queue Setup and Execution
task_queue = asyncio.Queue()
[task_queue.put_nowait(asyncio.create_task(fetch_data(root_path=row['1']))) for index, row in run_column.iterrows()]
await asyncio.gather(*[worker(task_queue) for _ in range(10)])
Tasks are added to task_queue using a list comprehension. Then, 10 worker coroutines are launched simultaneously to consume the tasks.
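One subtlety worth noting: asyncio.create_task() schedules each coroutine to run immediately, so every fetch_data call starts as soon as it is enqueued, and the 10 workers mainly control how results are collected. If you want the pool itself to cap how many fetches are in flight at once (for example, to stay under an API rate limit), a small variation is to enqueue the bare coroutine objects and let the workers await them directly. A minimal sketch of that variant, keeping the rest of the structure unchanged:

task_queue = asyncio.Queue()
for index, row in run_column.iterrows():
    # Enqueue the coroutine itself; it only starts running when a worker awaits it,
    # so at most 10 fetches are in flight at any time.
    task_queue.put_nowait(fetch_data(root_path=row['1']))
await asyncio.gather(*[worker(task_queue) for _ in range(10)])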
Running the Application
if __name__ == '__main__':
    import pandas as pd
    with open('./dummy.csv', newline='') as csvfile:
        data = pd.read_csv(csvfile)
    result = asyncio.run(main(data))
    print(result)
This section reads data from a CSV file using pandas and executes the main async function.
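Note that row['1'] means the script expects dummy.csv to have a column literally named 1. If you want to run the example end to end, a tiny stand-in file can be generated like this (the paths are placeholders):

import pandas as pd

# Create a minimal dummy.csv with a single column named '1', as main() expects.
pd.DataFrame({'1': ['path/a', 'path/b', 'path/c']}).to_csv('./dummy.csv', index=False)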
Key Benefits
The Async Worker Pool pattern provides several advantages for high-performance Python applications:
- Efficient Task Execution - A pool of async workers can process multiple tasks simultaneously without the overhead of thread creation
- Queue Management - asyncio.Queue provides safe task distribution and load balancing between coroutines on the event loop
- Controlled Concurrency - Multiple workers operate concurrently while respecting system limits and API rate constraints (a semaphore-based alternative is sketched after this list)
- Resource Optimization - Worker reuse minimizes memory allocation and improves overall system performance
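For comparison, if the only thing you need is a cap on concurrent calls and the explicit queue isn't important to you, the same limit can be expressed with asyncio.Semaphore. A rough sketch (reusing fetch_data from above; the function names here are just illustrative):

async def fetch_with_limit(root_path, semaphore):
    # At most 10 coroutines can hold the semaphore at once,
    # so at most 10 fetches run concurrently.
    async with semaphore:
        return await fetch_data(root_path)

async def main_with_semaphore(run_column):
    semaphore = asyncio.Semaphore(10)
    results = await asyncio.gather(
        *[fetch_with_limit(row['1'], semaphore) for index, row in run_column.iterrows()],
        return_exceptions=True,
    )
    # Flatten successful results and skip failures, mirroring the worker-pool version.
    return [item for r in results if not isinstance(r, Exception) for item in r]

The worker-pool version still has the edge when tasks arrive dynamically or when you want error handling centralized in one place, which is why it is the focus of this post.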
Complete Implementation
Here's the full implementation of the Async Worker Pool pattern:
import asyncio

async def fetch_data(root_path):
    # Simulate an I/O-bound call (e.g. an API request or a file read).
    await asyncio.sleep(0.001)
    print(f"Done {root_path}")
    return [root_path]

async def main(run_column):
    result_data = []

    async def worker(task_queue):
        # Keep pulling tasks until the queue has been drained.
        while not task_queue.empty():
            task = await task_queue.get()
            try:
                returned = await task
                result_data.extend(returned)
            except Exception as e:
                print(f"Error executing task: {e}")

    try:
        task_queue = asyncio.Queue()
        # Schedule one fetch per row and queue it up for the workers.
        [task_queue.put_nowait(asyncio.create_task(fetch_data(root_path=row['1'])))
         for index, row in run_column.iterrows()]
        # Launch 10 workers that drain the queue concurrently.
        await asyncio.gather(*[worker(task_queue) for _ in range(10)])
    except Exception as e:
        print(f"\nUnable to get data: {e}\n")
    return result_data

if __name__ == '__main__':
    import pandas as pd
    with open('./dummy.csv', newline='') as csvfile:
        data = pd.read_csv(csvfile)
    result = asyncio.run(main(data))
    print(result)
This implementation demonstrates how to effectively manage concurrent task execution while maintaining clean error handling and resource efficiency. The pattern scales well for applications requiring high-throughput data processing with controlled concurrency limits.