
Airflow TaskFlow API: внутреннее устройство современного способа писать DAG-и
eager_igor 11 минут назад Airflow TaskFlow API: внутреннее устройство современного способа писать DAG-и Средний 11 мин 398 Data Engineering * Python * Туториал Apache Airflow долгое время ассоциировался с таким стилем...
Anthropic — What company has the best second artificial intelligence model at the end of June?
В сфере искусственного интеллекта произошло заметное событие. eager_igor 11 минут назад Airflow TaskFlow API: внутреннее устройство современного способа писать DAG-и Средний 11 мин 398 Data Engineering * Python * Туториал Apache Airflow долгое время ассоциировался с таким стилем описания workflow:# объявляем задачи-таски task1 = PythonOperator(... ) task2 = BashOperator(... ) # проставляем зависимости между ними task1 >> task2Это рабочий и до сих пор актуальный подход, но с Airflow 2.
0 появился TaskFlow API — способ описывать DAG-и через обычные Python функции и декораторы:@dag(dag_id="linear_demo def tutorial_dag() @task def extract(): return 42 @task def transform(x): return x * 2 # описываем зависимости и строим Flow y = transform(extract()) # создаем даг tutorial_dag()TaskFlow в Airflow позволяет описывать DAG как обычный Python-код: @dag задает сам workflow/DAG, а @task превращает Python-функции в задачи Airflow. При вызове декорированных функций, например transform(extract()), выполняется не сам расчет, а создаются объекты задач, связи между ними и ссылки на их будущие результаты (через объект XComArg). То есть TaskFlow - это декларативный DSL(domain-specific language) для построения DAG, где вызовы функций не выполняют вычисления, а описывают граф зависимостей.
Технические детали
Задачи статьиВ этой статье попробуем:Заглянуть внутрь Airflow и понять, как работает TaskFlow API (для версии 3. 1)На основе этих идей написать собственный микро-фреймворк для закрепления понимания. Сохранить названия и общую логику внутренних объектов Airflow.
Понять главный архитектурный принцип: описание DAG ≠ выполнение DAG. Итак, давайте еще раз рассмотрим основные фазы TaskFlow:Создается DAG через @dag и вызов функции DAG-а. В этот момент Airflow создает объект DAG, входит в его контекст и начинает исполнять тело функции DAG-а для сборки графа.
Внутри тела DAG-а @task декорирует Python-функции. То есть extract, transform становятся не обычными функциями, а объектами-декораторами (_TaskDecorator), которые умеют создавать Airflow-задачи. При вызове декорированных функций, напримерy = transform(extract()) создаются объекты задач (операторы) и зависимости между задачами.
Отраслевые последствия
Именно здесь фактически собирается граф DAG. Позже scheduler и worker исполняют уже собранный DAG. Что делает @taskОбъект task в Airflow является специальным вызываемым объектом (TaskDecoratorCollection).
При декорировании функции он создаёт другой объект _TaskDecorator. исходники в Упрощённо:class TaskDecoratorCollection: def __call__(self, function): return _TaskDecorator(function) task = TaskDecoratorCollection()Напомню:@task def extract():эквивалентно: extract = task(extract)А значит будет вызван:TaskDecoratorCollection.
Этот прогресс даёт важные сигналы о будущем отрасли, и технологический мир внимательно наблюдает.





