Bidirectional Pydantic serialization for Celery, Dramatiq, and Arq. One shared wire codec: pass models on enqueue, get models back from results.
Celery 5.5+ pydantic=True only validates on the worker. Callers still model_dump() before .delay(), and .get() returns a dict. Dramatiq chokes on models and UUIDs. Arq defaults to pickle. queuebridge fixes all three with a thin codec + backend adapters.
pip install queuebridgeExtras:
pip install queuebridge[celery] # Celery + Kombu
pip install queuebridge[dramatiq] # Dramatiq
pip install queuebridge[arq] # Arq + msgpack
pip install queuebridge[all] # all backendsRequires Python 3.10+ and Pydantic v2.
Documentation: https://queuebridge.readthedocs.io
from celery import Celery
from queuebridge.celery import register_queuebridge, typed_result
from myapp.models import OrderCreate, OrderResult
app = Celery("orders", broker="redis://localhost:6379/0")
register_queuebridge(app)
@app.task(pydantic=True)
def process_order(order: OrderCreate) -> OrderResult:
return OrderResult(id=order.id, status="processed")
ar = process_order.delay(OrderCreate(id=1, sku="ABC"))
result = typed_result(ar, OrderResult).get(timeout=10)import dramatiq
from pydantic import validate_call
from queuebridge.dramatiq import register_queuebridge
from myapp.models import OrderCreate
register_queuebridge()
@dramatiq.actor
@validate_call
def process(order: OrderCreate):
print(order)
process.send(OrderCreate(id=1, sku="ABC"))from arq.connections import RedisSettings
from pydantic import validate_call
from queuebridge.arq import get_serializer_pair, qb_task, typed_result
from myapp.models import OrderCreate, OrderResult
serialize, deserialize = get_serializer_pair()
@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
return OrderResult(id=order.id, status="ok")
class WorkerSettings:
functions = [process_order]
redis_settings = RedisSettings()
job_serializer = serialize
job_deserializer = deserializeRecursively transform a Python value into a JSON-serializable structure.
Required
Type: Any
The value to encode: Pydantic models, nested containers, UUID, datetime, Decimal, Enum, etc.
Type: boolean
Default: true
When true, BaseModel instances are wrapped in a __qb__ envelope with a fully-qualified type name. When false, models are dumped with model_dump(mode="json") only.
from queuebridge import encode, decode
from myapp.models import OrderCreate
wire = encode(OrderCreate(id=1, sku="ABC"))
restored = decode(wire, OrderCreate)Recursively decode a wire value back to Python using an optional type hint.
Required
Type: Any
Wire value: primitives, lists, dicts, or __qb__ envelopes.
Type: Any
Default: Any
Type hint used for validation. TypeAdapter(hint).validate_python() is used when the hint is concrete.
Type: boolean
Default: false
When true, raise QueuebridgeDecodeError if the value cannot be decoded.
Recursively unwrap __qb__ envelopes without type hints. Used internally by Dramatiq's decoder.
Type: Any -> Any
Register the queuebridge-json Kombu serializer on a Celery app. Idempotent: safe to call twice.
Required
Type: celery.Celery
Type: boolean
Default: false
Reserved for future strict decode behavior.
Sets task_serializer, result_serializer, and accept_content on the app.
Wrap a Celery AsyncResult so .get() returns a Pydantic model instead of a dict.
Required
Type: celery.result.AsyncResult
Required
Type: type[T]
Returns TypedAsyncResult[T], which proxies .id, .state, .ready(), etc.
Celery cannot safely monkey-patch
AsyncResult.get()globally. Usetyped_result()on the client.
Install QueuebridgeEncoder via dramatiq.set_encoder(). Call once at process startup.
Type: dramatiq.Broker | None
Default: None
If provided, also calls dramatiq.set_broker(broker).
Returns (serialize, deserialize) callables for job_serializer / job_deserializer.
serialize, deserialize = get_serializer_pair()Uses msgpack over queuebridge-encoded dicts. Set on both WorkerSettings and create_pool().
Decorator that decodes wire args/kwargs using function type hints before your async task runs.
Apply outside @validate_call:
@qb_task
@validate_call
async def process_order(ctx, order: OrderCreate) -> OrderResult:
...result = await typed_result(job, OrderResult)Decode the job.result() payload into a Pydantic model.
Non-JSON-native values use a tagged envelope:
{
"__qb__": {
"t": "myapp.models.OrderCreate",
"v": 1,
"d": {"id": 1, "sku": "ABC"}
}
}| Python type | Encode | Decode |
|---|---|---|
BaseModel |
envelope + model_dump(mode="json") |
model_validate or FQN import |
UUID, datetime, Decimal, Enum |
tagged envelope | builtin dispatch |
list, dict, set, tuple |
recurse | recurse via hint |
| Primitives | pass through | pass through |
A plain dict + OrderCreate hint still validates. Tags are for ambiguity, not required when hints are known.
Producer Worker Client
------ ------ ------
.delay(model) FAIL> pydantic=True validates .get() -> dict
(model_dump() required) args on worker only
- celery#9442: models not JSON-serializable on enqueue
- dramatiq#660: no Pydantic support
- arq#497: pickle default, Pydantic requested
| Solution | Celery | Dramatiq | Arq | Typed .get() |
|---|---|---|---|---|
Celery pydantic=True |
worker only | n/a | n/a | no |
| Blog / msgpack hacks | partial | partial | partial | varies |
| queuebridge | yes | yes | yes | yes |
Deserialization resolves types by fully-qualified name (import_fqn). Only deserialize from brokers you trust.
ALLOWED_MODULE_PREFIXES allowlisting is planned for v0.2.
| Path | Description |
|---|---|
examples/celery_fastapi/ |
FastAPI enqueue + typed result polling |
examples/dramatiq_example/ |
Dramatiq + validate_call |
examples/arq_example/ |
Arq worker with custom serializers |
examples/smoke_test_complex.py |
End-to-end smoke test (no Redis) |
pypi_verify/run_complex.py |
PyPI install verification script |
- Celery Pydantic docs: worker-only validation
- Arq custom serializers: msgpack hook point
- Dramatiq encoders:
set_encoder()extension point
MIT. See LICENSE.