Concurry is a unified, delightful concurrency library for Python. It replaces the fragmented landscape of threading, multiprocessing, asyncio, and Ray with a single, elegant API. Write your code once, and run it on a single thread, multiple cores, or a distributed cluster, without changing a line of business logic.
Calling LLMs sequentially is painfully slow. With Concurry, you can parallelize your existing code instantly.
Prerequisites: pip install concurry litellm
from pydantic import BaseModel
import litellm
# Line 1. Import concurry
from concurry import worker, gather
# Line 2. Add the @worker decorator to an existing class
@worker(mode="thread", max_workers=50)
class LLM(BaseModel):
    model: str

    def call(self, prompt: str) -> str:
        # This runs in a separate thread!
        return litellm.completion(
            model=self.model,
            messages=[{"role": "user", "content": prompt}]
        ).choices[0].message.content
# Initialize your worker (looks just like a normal class)
llm = LLM(model="gpt-3.5-turbo")
prompts = [f"What is {i} + {i}?" for i in range(100)]
futures = [llm.call(prompt) for prompt in prompts]  # Returns futures instantly, runs in parallel
# Line 3. gather futures
results = gather(futures, progress=True)  # Waits for all results
print(f"Processed {len(results)} prompts!")
llm.stop()

The Result:
- Sequential: ~780 seconds
- Concurry: ~16 seconds (50x faster)
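For context, the sequential baseline behind the ~780-second figure is just a plain loop over the same litellm call. A minimal sketch, reusing the prompts list and litellm import from the snippet above (actual timings depend on model latency):

# Sequential baseline: each completion blocks until the previous one finishes.
sequential_results = []
for prompt in prompts:
    response = litellm.completion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    sequential_results.append(response.choices[0].message.content)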
No refactoring. No concurrent.futures. No async def virus. No ray.remote.
Just your code, parallelized.
We think that's delicious 🤤
pip install concurry

For distributed computing support:
pip install "concurry[ray]"

For all features:
pip install "concurry[all]"

Python's concurrency tools are scattered.
- Threading: Good for I/O, bad API (concurrent.futures).
- Multiprocessing: Good for CPU, hard to debug, pickling errors.
- Asyncio: High throughput, but requires rewriting everything (async/await).
- Ray: Powerful for clusters, but heavyweight for scripts.
Concurry abstracts all of these into a single interface.
You have to learn 4 different APIs to do the same thing.
# ❌ Threading API
with ThreadPoolExecutor() as executor:
    future = executor.submit(task, arg)

# ❌ Multiprocessing API (Different behavior!)
with ProcessPoolExecutor() as executor:
    future = executor.submit(task, arg)

# ❌ Asyncio API (Rewrite everything!)
async def main():
    await asyncio.create_task(async_task(arg))

# ❌ Ray API (Another new API!)
ray.get(ray_task.remote(arg))

One API, any backend.
from concurry import worker, gather
@worker
class MyWorker:
    def do_work(self, x: int) -> int:
        return x * 2
# Run on threads?
w = MyWorker.options(mode="thread", max_workers=10).init()
# Run on processes? Uncomment below.
# w = MyWorker.options(mode="process", max_workers=10).init()
# Run on a ray cluster? Uncomment below.
# w = MyWorker.options(mode="ray", max_workers=10).init()
# Run on asyncio? Uncomment below.
# w = MyWorker.options(mode="asyncio").init()
# The submission code NEVER changes:
futures = [w.do_work(i) for i in range(1000)]
# The collection code NEVER changes:
results = gather(futures, progress=True)
w.stop()

Stateful workers that persist across calls. Perfect for database connections, model weights, or session management.
from concurry import worker
@worker(mode="thread")
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self) -> int:
        self.count += 1
        return self.count
# State is preserved!
counter = Counter()
print(counter.increment().result()) # 1
print(counter.increment().result()) # 2
counter.stop()

Built-in rate limiting for APIs. Token buckets, sliding windows, and more, enforced globally across all workers.
from concurry import worker, gather, CallLimit
@worker(
    mode="thread",
    max_workers=20,
    # Limit to 100 calls per minute across ALL 20 threads
    limits=[CallLimit(window_seconds=60, capacity=100)]
)
class APIWorker:
    def fetch(self, url: str):
        # Rate limit is automatically checked here
        return f"Fetched {url}"
pool = APIWorker()
futures = [pool.fetch(f"url_{i}") for i in range(200)]
results = gather(futures, progress=True) # Smoothly throttled!
pool.stop()

Don't let flaky networks break your batch jobs. Configure retries declaratively.
from concurry import worker, RetryConfig
@worker(
    mode="thread",
    retry_config=RetryConfig(
        max_retries=5,
        retry_on=(ConnectionError, TimeoutError),
        backoff_factor=2.0  # Exponential backoff: 1s, 2s, 4s, ...
    )
)
class FlakyWorker:
    def fetch(self):
        # Automatically retried on failure!
        pass
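A possible usage sketch for the worker above, hypothetical and reusing only the call/result/stop pattern shown in the other examples:

# Hypothetical usage: failed calls are retried transparently before the future resolves.
flaky = FlakyWorker()
future = flaky.fetch()
future.result()  # Blocks until the call (including any retries) completes
flaky.stop()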
Full support for Pydantic models. Arguments are validated and coerced before they even reach the worker.

from concurry import worker
from pydantic import BaseModel, Field

@worker(mode="process")
class DataWorker(BaseModel):
    db_url: str = Field(..., pattern=r"^postgres://")

    def process(self, data: dict):
        return data
# Validated at initialization!
try:
    w = DataWorker(db_url="invalid-url")
except Exception as e:
    print(f"Validation failed!: {e}")  # Caught before worker starts
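And a minimal sketch of the happy path, assuming a hypothetical URL that matches the pattern above:

# A matching URL passes validation and the worker starts normally.
w = DataWorker(db_url="postgres://localhost/mydb")
print(w.process({"id": 1}).result())  # {'id': 1}
w.stop()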
Just want to run a function in parallel? You don't need a class.

from concurry import task, gather
import time
@task(mode="process", max_workers=4)
def heavy_computation(x: int) -> int:
    time.sleep(1)  # Example heavy computation
    return x
# Run 100 heavy computations in parallel
futures = [heavy_computation(i) for i in range(100)]
results = gather(futures, progress=True)
heavy_computation.stop()

- User Guide: Tutorials and best practices.
- API Reference: Detailed API specs.
- Gallery: Production-ready examples (LLM pipelines, web scrapers).
We love contributions! Check out CONTRIBUTING.md to get started.
Apache 2.0 - See LICENSE for details.
Made with ❤️ by Amazon Scientists
