Полный справочник по Airflow API для DAG: создание, запуск и мониторинг рабочих процессов

Apache Airflow зарекомендовал себя как мощный инструмент для оркестрации сложных рабочих процессов, позволяя инженерам данных и MLOps-специалистам эффективно управлять ETL-конвейерами, задачами машинного обучения и другими автоматизированными процессами. Однако по мере роста масштаба и сложности систем, ручное взаимодействие с веб-интерфейсом Airflow становится неэффективным и ограничивающим.

Именно здесь на помощь приходит программный интерфейс Airflow. Он предоставляет гибкие возможности для автоматизации практически всех аспектов управления DAG (Directed Acyclic Graphs) — от их создания и модификации до запуска, мониторинга и получения детальной информации о выполнении.

Данный справочник призван стать всеобъемлющим руководством по использованию различных API Airflow, включая REST API, Python API и CLI, для полного контроля над жизненным циклом ваших рабочих процессов. Мы рассмотрим, как программно создавать и обновлять DAG, инициировать их выполнение с передачей параметров, а также эффективно отслеживать статус задач и получать доступ к важным метаданным. Цель — дать вам инструменты для построения полностью автоматизированных и масштабируемых решений на базе Airflow.

Обзор Airflow API и его роль в управлении DAG

Apache Airflow предлагает три основных интерфейса для взаимодействия с DAG, каждый со своим назначением. Airflow REST API служит для внешнего программного управления, позволяя автоматизировать запуск, мониторинг и управление DAG через HTTP-запросы. Это идеальный выбор для интеграции с внешними системами и создания пользовательских интерфейсов.

Airflow Python API предназначен для глубокой интеграции и расширения функциональности Airflow. Он предоставляет прямой доступ к внутренним объектам, что критично при разработке кастомных операторов, хуков и плагинов. Airflow CLI (Command Line Interface) используется для ручного управления и скриптинга, предлагая быстрый доступ к основным командам из терминала.

Программное управление DAG через эти API критически важно для сложных рабочих процессов. Оно обеспечивает автоматизацию, масштабируемость, интеграцию в CI/CD конвейеры и возможность динамического создания DAG, значительно повышая гибкость и эффективность оркестрации.

Различия и назначение Airflow REST API, Python API и CLI

Apache Airflow предоставляет несколько интерфейсов для взаимодействия, каждый из которых предназначен для различных сценариев использования. Понимание их различий критически важно для эффективного управления рабочими процессами.

  • Airflow REST API служит для программного взаимодействия с Airflow из внешних систем. Он идеально подходит для интеграции с CI/CD пайплайнами, создания кастомных пользовательских интерфейсов или автоматизации кросс-платформенных задач. REST API обеспечивает стандартизированный HTTP-интерфейс с JSON-ответами, позволяя управлять DAG, запускать их и мониторить статус выполнения.

  • Airflow Python API используется непосредственно для определения DAG и задач внутри файлов DAG. Это основной способ создания логики рабочих процессов, написания кастомных операторов, хуков и сенсоров. Он предоставляет прямой доступ к внутренним компонентам Airflow, позволяя разработчикам гибко описывать сложные зависимости и операции.

  • Airflow CLI (Command Line Interface) предназначен для ручного управления и автоматизации административных задач через командную строку. С его помощью можно запускать DAG, проверять их статус, управлять соединениями, переменными и выполнять отладку. CLI удобен для скриптов и быстрой диагностики.

Преимущества программного управления DAG в сложных рабочих процессах

Программное управление DAG через API становится незаменимым инструментом в условиях сложных, динамически изменяющихся рабочих процессов. Оно позволяет автоматизировать рутинные операции, такие как создание, обновление и удаление DAG, что критически важно для сред с большим количеством постоянно эволюционирующих пайплайнов.

Основные преимущества включают:

  • Масштабируемость и автоматизация: Возможность генерировать и развертывать сотни DAG-ов на основе шаблонов или внешних конфигураций, значительно сокращая ручной труд.

  • Интеграция с CI/CD: Встраивание управления DAG в конвейеры непрерывной интеграции и доставки, обеспечивая версионирование, тестирование и автоматическое развертывание изменений.

  • Динамическое конфигурирование: Передача параметров запуска (DAG Run Config) для адаптации поведения DAG без изменения его кода, что полезно для обработки различных источников данных или сценариев.

  • Централизованный контроль и мониторинг: Создание кастомных панелей управления или систем оповещения, которые взаимодействуют с Airflow для получения детального статуса и управления рабочими процессами из единой точки.

  • Улучшенная обработка ошибок: Программное реагирование на сбои, автоматический перезапуск задач или DAG-ов, а также реализация сложной логики восстановления.

Подготовка среды для работы с Airflow API

Для эффективного взаимодействия с Airflow API необходимо правильно подготовить рабочую среду. В первую очередь, это касается установки необходимых клиентских библиотек. Для работы с Python API и CLI достаточно установить основной пакет apache-airflow. Если же вы планируете использовать REST API, то, помимо стандартных HTTP-клиентов вроде requests или httpx, специализированные клиентские библиотеки могут упростить взаимодействие, хотя прямые HTTP-запросы также являются распространенной практикой.

Ключевым аспектом является безопасная аутентификация. Airflow поддерживает несколько методов:

  • Базовая аутентификация (Basic Auth): Использование имени пользователя и пароля, настроенных в Airflow.

  • Токен-аутентификация: Создание и использование токенов доступа через веб-интерфейс Airflow, что предпочтительнее для автоматизированных скриптов.

  • OAuth/OpenID Connect: Для более сложных корпоративных сред, интегрированных с внешними провайдерами идентификации.

Важно обеспечить безопасное хранение и передачу учетных данных, избегая их жесткого кодирования в коде.

Установка и настройка клиентских библиотек для Airflow API

Для взаимодействия с Airflow API, в зависимости от выбранного подхода, требуются различные клиентские библиотеки.

Для использования Python API и CLI достаточно установить основной пакет Apache Airflow. Это обеспечивает доступ ко всем необходимым классам и функциям для программного взаимодействия, а также к утилите командной строки airflow:

pip install apache-airflow

При работе с Airflow REST API специализированная клиентская библиотека от Airflow не требуется. Вместо этого используются стандартные HTTP-клиенты, доступные в большинстве языков программирования. Например, в Python широко применяется библиотека requests:

pip install requests

После установки необходимо настроить клиент для подключения к вашему экземпляру Airflow. Для CLI и Python API это часто делается через переменные окружения, такие как AIRFLOW_HOME, или через файл airflow.cfg. Для REST API достаточно указать базовый URL веб-сервера Airflow (например, http://localhost:8080) при выполнении HTTP-запросов.

Методы аутентификации и авторизации для безопасного доступа к API

После настройки клиентских библиотек и определения URL веб-сервера Airflow, критически важно обеспечить безопасный доступ к API. Airflow поддерживает несколько методов аутентификации для REST API, обеспечивая гибкость и безопасность:

  • Базовая аутентификация (Basic Auth): Это наиболее распространенный метод, при котором имя пользователя и пароль (закодированные в Base64) передаются в заголовке Authorization каждого HTTP-запроса. Пользователи должны быть предварительно созданы в Airflow с соответствующими ролями и разрешениями. Рекомендуется использовать этот метод только через HTTPS для предотвращения перехвата учетных данных.

  • Токены-носители (Bearer Tokens): Airflow также поддерживает аутентификацию на основе токенов, обычно JSON Web Tokens (JWT). Токен получается путем отправки учетных данных на специальный эндпоинт (/api/v1/login), а затем передается в заголовке Authorization: Bearer <token> для последующих запросов. Этот метод предпочтителен для автоматизированных систем, так как токены могут иметь ограниченный срок действия и быть отозваны.

Для Python API и CLI аутентификация обычно происходит неявно, используя учетные данные текущего пользователя системы или настройки, определенные в airflow.cfg или переменных окружения, если команды выполняются на том же сервере, где развернут Airflow. При удаленном доступе через Python-клиенты к REST API применяются те же методы, что и для любого другого HTTP-клиента.

Программное создание и модификация DAG

После настройки безопасного доступа к Airflow API, следующим шагом является понимание того, как программно определять и управлять DAG. Хотя Airflow DAGs традиционно создаются как Python-файлы, их определение и модификация являются ключевым аспектом программного управления.

Определение DAG и задач с использованием Python API и TaskFlow API

Основой любого DAG является его определение в Python-файле. Airflow предоставляет Python API для создания объектов DAG и определения задач с помощью операторов (BaseOperator). Современный подход, TaskFlow API, значительно упрощает этот процесс, позволяя определять задачи как обычные Python-функции с помощью декоратора @task. Это улучшает читаемость и упрощает передачу данных между задачами (XCom).

Реклама
  • Python API: Использует DAG контекстный менеджер и явное создание экземпляров операторов (например, BashOperator, PythonOperator).

  • TaskFlow API: Позволяет превращать Python-функции в задачи, автоматически управляя зависимостями и передачей данных.

Управление жизненным циклом DAG-файлов и их обновление через API

Хотя REST API не используется для написания Python-кода DAG, он играет роль в управлении их жизненным циклом. После создания или модификации Python-файла DAG, он должен быть размещен в dags_folder Airflow. Airflow автоматически обнаруживает и парсит эти файлы. Через REST API можно:

  • Проверить статус DAG: Убедиться, что новый или обновленный DAG был успешно загружен планировщиком.

  • Включить/отключить DAG: Изменить активность DAG без прямого редактирования файла.

Автоматизация развертывания DAG-файлов в dags_folder часто реализуется через CI/CD-пайплайны, которые, по сути, являются программным методом управления жизненным циклом DAG.

Определение DAG и задач с использованием Python API и TaskFlow API

Определение DAG и задач в Airflow традиционно осуществляется с помощью Python API. Это включает создание экземпляра класса DAG и последующее инстанцирование операторов (например, BashOperator, PythonOperator) для каждой задачи. Операторы связываются между собой с использованием битовых сдвигов (>>, <<) для определения зависимостей, формируя граф выполнения.

С появлением TaskFlow API процесс определения задач значительно упростился, особенно для Python-ориентированных рабочих процессов. Используя декоратор @task (или специализированные @task.bash, @task.python и т.д.), обычные Python-функции могут быть преобразованы в задачи Airflow. TaskFlow API автоматически обрабатывает передачу данных между задачами через XCom, что делает код более читаемым и лаконичным, минимизируя бойлерплейт.

Управление жизненным циклом DAG-файлов и их обновление через API

После определения DAG и его задач, как было показано ранее, файл с определением DAG необходимо разместить в папке DAGs, доступной для планировщика Airflow. Airflow автоматически обнаруживает новые или измененные файлы DAG, парсит их и обновляет внутреннее представление. Этот процесс обычно не требует прямого взаимодействия с API для "загрузки" файла, так как он основан на файловой системе.

Программное управление жизненным циклом DAG через API в основном касается их активации, деактивации и получения метаданных. REST API Airflow предоставляет эндпоинты для этих целей. Например, для включения или отключения DAG можно использовать метод PATCH /dags/{dag_id} с соответствующим телом запроса, устанавливающим статус is_paused. Это позволяет динамически управлять доступностью и видимостью DAG без прямого доступа к файловой системе или веб-интерфейсу.

При обновлении файла DAG Airflow автоматически перечитывает его, и изменения применяются при следующем запуске DAG или при обновлении его состояния планировщиком. API позволяет убедиться, что изменения были зарегистрированы, запрашивая текущие метаданные DAG.

Запуск и управление выполнением DAG

После того как DAG определен и активирован, следующим шагом является управление его выполнением. Airflow API предоставляет мощные инструменты для программного запуска DAG-ов по требованию и контроля их жизненного цикла.

Инициирование запусков DAG по требованию и передача параметров (Config)

Для запуска DAG программно используется эндпоинт REST API /dags/{dag_id}/dagRuns. Этот метод позволяет не только инициировать новый запуск, но и передавать пользовательские параметры в виде JSON-объекта в поле conf. Эти параметры становятся доступными внутри DAG через объект dag_run.conf, что обеспечивает гибкость и динамичность рабочих процессов. Например, можно передать дату обработки, идентификатор файла или другие конфигурационные данные.

Приостановка, возобновление и остановка активных DAG-ов

Хотя активация/деактивация DAG-ов управляет их планированием, API также позволяет взаимодействовать с активными запусками DAG. Вы можете приостановить или возобновить выполнение отдельных задач в рамках запущенного DAG, а также отметить запуск как успешный или неуспешный, что может быть полезно для ручного вмешательства или исправления ошибок. Для этого используются соответствующие эндпоинты, например, для изменения состояния задач или DAG-ранов.

Инициирование запусков DAG по требованию и передача параметров (Config)

Для инициирования запуска DAG по требованию через Airflow REST API используется эндпоинт /api/v1/dags/{dag_id}/dagRuns с методом POST. Это позволяет внешним системам или скриптам запускать рабочие процессы, когда это необходимо, без ожидания планового запуска.

Ключевой особенностью является возможность передачи произвольных параметров конфигурации (Config) в запускаемый DAG. Эти параметры передаются в теле запроса JSON в поле conf. Например:

{
  "conf": {
    "data_interval_start": "2026-03-01T00:00:00Z",
    "target_table": "my_reporting_table"
  }
}

Внутри DAG эти параметры доступны через объект dag_run.conf. Это обеспечивает гибкость для динамической настройки поведения DAG, например, для обработки данных за определенный период или изменения целевых ресурсов.

Приостановка, возобновление и остановка активных DAG-ов

После запуска DAG может возникнуть необходимость временно приостановить его выполнение или полностью остановить активные запуски. Airflow API предоставляет для этого соответствующие эндпоинты.

Для приостановки или возобновления DAG можно использовать метод PATCH к эндпоинту /dags/{dag_id}. В теле запроса необходимо передать JSON-объект с полем is_paused, установив его в true для приостановки или false для возобновления.

Пример запроса для приостановки:

{
  "is_paused": true
}

Остановка активных запусков DAG или отдельных задач обычно реализуется через очистку (clearing) их статуса. Это можно сделать, используя эндпоинт /dagRuns/{dag_run_id}/clear или /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/clear для более гранулярного контроля. Очистка сбрасывает статус выполнения, позволяя планировщику перезапустить задачи или пометить их как пропущенные, в зависимости от конфигурации.

Мониторинг и получение информации о DAG

После того как мы научились программно управлять запусками DAG, следующим логичным шагом является их мониторинг и получение детальной информации. Airflow API предоставляет мощные инструменты для отслеживания статуса выполнения, доступа к логам и извлечения метаданных.

Получение статуса выполнения DAG и отдельных задач через API

Для контроля за ходом выполнения рабочих процессов можно использовать REST API Airflow. Эндпоинты /api/v1/dagRuns и /api/v1/taskInstances позволяют получить текущий статус запусков DAG и отдельных задач соответственно. Вы можете фильтровать результаты по dag_id, state (например, success, failed, running) и другим параметрам, чтобы быстро оценить состояние вашей оркестрации.

Доступ к логам задач, XCom-ам и метаданным DAG

  • Логи задач: Для отладки и анализа ошибок критически важен доступ к логам. Airflow REST API предоставляет эндпоинт /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number} для получения логов конкретной попытки выполнения задачи.

  • XCom-ы: Данные, передаваемые между задачами (XComs), также доступны через API по эндпоинту /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcoms. Это позволяет программно извлекать результаты выполнения задач.

  • Метаданные DAG: Общая информация о DAG, такая как расписание, статус паузы (is_paused) и другие конфигурационные параметры, может быть получена через эндпоинт /api/v1/dags/{dag_id}.

Получение статуса выполнения DAG и отдельных задач через API

Для эффективного мониторинга рабочих процессов Airflow предоставляет мощные возможности через REST API. Получение статуса выполнения DAG-ов и отдельных задач является ключевым аспектом оперативного контроля:

  • Статус DAG-ранов: Используйте эндпоинт /api/v1/dags/{dag_id}/dagRuns для получения списка всех запусков DAG с их текущим статусом (например, running, success, failed). Можно применять фильтрацию по статусу или временному диапазону.

  • Статус экземпляров задач: Для детализированного контроля, эндпоинт /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances позволяет получить статус каждого экземпляра задачи в конкретном DAG-ране. Это дает представление о прогрессе и возможных ошибках на уровне отдельных шагов.

Доступ к логам задач, XCom-ам и метаданным DAG

Для глубокого анализа и отладки рабочих процессов Airflow API предоставляет обширные возможности доступа к метаданным:

  • Логи задач: Через REST API можно получить потоки логов для конкретных экземпляров задач, используя эндпоинт /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs. Это критически важно для удаленной диагностики и мониторинга ошибок.

  • XCom-ы: Значения XCom, используемые для обмена данными между задачами, доступны через эндпоинт /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcoms. Это позволяет программно извлекать промежуточные результаты.

  • Метаданные DAG: Помимо статуса, API предоставляет доступ к детальной информации о конфигурации DAG, его последних запусках и параметрах через соответствующие эндпоинты, например, /dags/{dag_id}.

Заключение

В данном справочнике мы подробно рассмотрели возможности Airflow API для комплексного управления DAG: от программного создания и модификации до запуска, мониторинга и получения детальной информации о выполнении. Использование REST API, Python API и CLI предоставляет разработчикам и инженерам мощные инструменты для автоматизации, интеграции и масштабирования рабочих процессов. Освоение этих интерфейсов позволяет значительно повысить эффективность эксплуатации Airflow, создавая более гибкие, надежные и легко управляемые решения для оркестрации данных и задач.


Добавить комментарий