Concurrent Request Optimization
Concurrent processing allows you to handle multiple API requests simultaneously, significantly improving performance when dealing with batch operations or multiple independent queries.
Asynchronous Batch Requests
import asyncio
import aiohttp
import json
import time
class AsyncAIClient:
    """Asynchronous client for an OpenAI-compatible chat-completions API.

    Uses a semaphore to cap the number of in-flight requests so large
    batches do not overwhelm the server or trip API rate limits.
    """

    def __init__(self, api_key, base_url, max_concurrent=5):
        """Store credentials and create the concurrency limiter.

        Args:
            api_key: Bearer token sent in the Authorization header.
            base_url: API root, e.g. "https://ai.machinefi.com" (no trailing slash).
            max_concurrent: Maximum number of simultaneous requests.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def chat_single(self, session, prompt, model="gpt-3.5-turbo"):
        """Send one chat-completion request.

        Args:
            session: An open aiohttp.ClientSession to reuse connections.
            prompt: User message content.
            model: Model identifier to request.

        Returns:
            Dict with "prompt", "response" (assistant text) and "usage"
            (token accounting, empty dict if the server omits it).

        Raises:
            aiohttp.ClientResponseError: On a non-2xx HTTP status.
        """
        async with self.semaphore:  # Control concurrency
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            data = {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 1000
            }
            async with session.post(
                f"{self.base_url}/v1/chat/completions",
                headers=headers,
                json=data
            ) as response:
                # Fail fast with a clear HTTP error instead of an opaque
                # KeyError when an error payload has no "choices" field.
                response.raise_for_status()
                result = await response.json()
                return {
                    "prompt": prompt,
                    "response": result["choices"][0]["message"]["content"],
                    "usage": result.get("usage", {})
                }

    async def chat_batch(self, prompts, model="gpt-3.5-turbo"):
        """Run all prompts concurrently; results are returned in input order."""
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.chat_single(session, prompt, model)
                for prompt in prompts
            ]
            return await asyncio.gather(*tasks)
# Usage example
async def main():
    """Demonstrate concurrent batch processing and report wall-clock timing."""
    client = AsyncAIClient("your-api-key", "https://ai.machinefi.com")
    questions = [
        "What is Python?",
        "What is JavaScript?",
        "What is Go language?",
        "What is Rust?",
        "What is Java?",
    ]
    started = time.time()
    answers = await client.chat_batch(questions)
    finished = time.time()
    print(f"Processed {len(questions)} requests in {finished - started:.2f} seconds")
    # Show each answer truncated to 100 characters plus its token accounting.
    for item in answers:
        print(f"Question: {item['prompt']}")
        print(f"Answer: {item['response'][:100]}...")
        print(f"Token usage: {item['usage']}")
        print("-" * 50)
asyncio.run(main())

Advanced Concurrent Processing
Rate-Limited Concurrent Client
Batch Processing with Progress Tracking
Performance Optimization Strategies
Connection Pooling and Session Reuse
Error Handling in Concurrent Requests
Best Practices for Concurrent Processing
Rate Limiting: Respect API rate limits to avoid throttling
Connection Pooling: Reuse HTTP connections for better performance
Error Handling: Implement robust retry mechanisms with exponential backoff
Memory Management: Monitor memory usage with large batch operations
Progress Tracking: Provide feedback for long-running batch operations
Graceful Shutdown: Properly close connections and handle interruptions
Resource Limits: Use semaphores to control maximum concurrent requests

