
Как мы переписывали логику очередей: Celery => aio-pika => FastStream
GregTMJ 3 минуты назад Как мы переписывали логику очередей: Celery => aio-pika => FastStream Уровень сложности Средний Время на прочтение 8 мин Охват и читатели 0 Python * Микросервисы * Кейс Наш путь активной работы с...
Anthropic — What company has the best second artificial intelligence model at the end of June?
В сфере искусственного интеллекта произошло заметное событие. GregTMJ 3 минуты назад Как мы переписывали логику очередей: Celery => aio-pika => FastStream Уровень сложности Средний Время на прочтение 8 мин Охват и читатели 0 Python * Микросервисы * Кейс Наш путь активной работы с очередями RabbitMQ начался с классического Celery. Осознав критичность низкоуровневого контроля системы, принялись работать с aio-pika. Но и этот уровень слишком местами сложный (далее расскажу почему), и нашли отличное решение, на текущий момент, в лице FastStream.
Сразу оставлю такую пометку, что каждый инструмент подходит для решения своей задачи. Мы больше хотели сделать акцент на удобство и скорость разработки относительно затрачиваемого времени на миграции решений. : Код возможно покажется неоптимальным или старым.
Технические детали
Это всё наш дорогой Легаси. Постановка задачи Наша система построена на основе микросервисов, работающих с RabbitMQ. Внутри - обычный асинхронный код для похода на внешние API и в БД.
Требования: Надежный консьюминг - это для нас критично, чтобы сообщение шло по всему флоу и нигде не останавливалось без причин. Если ошибка падает, то это должно отражаться в 3 местах: БД, логи и метрики. Ретраи при ошибках обработки.
Трейсинг - поддержка OpenTelemetry. Мониторинг сервиса через healthcheck’и. Решение №1: Celery как консьюмер Почему Celery Celery — классический инструмент для фоновых задач, знакомый большинству Python-разработчиков.
Отраслевые последствия
Из коробки: декларативное описание задач, ретраи с экспоненциальной задержкой, хранение результатов, мониторинг через Flower, интеграции с фреймворками. Логика проста: пишешь @app. task , запускаешь воркер — и сообщения из очереди начинают обрабатываться.
Как мы его использовали Мы не отправляли задачи из кода в духе my_task. delay() , а настраивали Celery на прослушивание внешней очереди, куда сообщения попадали от других систем. По сути, Celery выступал как consumer: подключался к брокеру, забирал сообщения, десериализовал и передавал в наши обработчики.
Настройки вроде max_retries , default_retry_delay , countdown позволяли гибко управлять поведением при сбоях. Важно ещё подсветить, что результат всегда игнорируется с помощью параметра ignore_result=True поскольку все результаты записываются в БД. Пример инициализации воркера: def create_app( name, broker, include, backend=None, task_queues=None, liveness_probe=1, update_period=60, watcher_config={}, ): # Создание само приложение + наложение дополнительных конфигурации app = Celery(name, broker=broker, include=include, backend=backend) app.
Этот прогресс даёт важные сигналы о будущем отрасли, и технологический мир внимательно наблюдает.





