Airflow: Настройка соединений с базами данных через SQLAlchemy, conn_id и extra args

Apache Airflow – мощный инструмент для оркестрации рабочих процессов, часто используемый для задач ETL и работы с данными. Ключевым аспектом работы с Airflow является настройка соединений с базами данных. В этой статье мы рассмотрим, как настроить соединения с базами данных в Airflow с использованием SQLAlchemy, conn_id и параметра extra_args, а также обсудим лучшие практики обеспечения безопасности.

Основы работы с соединениями в Airflow

Обзор типов соединений в Airflow и их назначение

Airflow поддерживает различные типы соединений для взаимодействия с разными системами, включая базы данных, облачные сервисы и API. Тип соединения определяет, как Airflow будет взаимодействовать с целевой системой. Для работы с базами данных часто используются соединения, поддерживающие SQLAlchemy. Примеры: PostgreSQL, MySQL, SQLite.

Настройка соединений через интерфейс Airflow UI и CLI

Соединения можно настроить двумя способами:

  1. Через веб-интерфейс Airflow UI:

    • Перейдите в раздел Admin -> Connections.

    • Нажмите кнопку Create.

    • Выберите тип соединения (например, PostgreSQL).

    • Заполните необходимые поля, такие как Host, Schema, Login, Password, Port.

    • Укажите Conn Id — уникальный идентификатор соединения.

  2. Через CLI (Command Line Interface):

    Используйте команду airflow connections add. Например:

    airflow connections add postgres_db --conn-type postgres --conn-host localhost --conn-schema public --conn-login airflow --conn-password airflow --conn-port 5432
    

SQLAlchemy и Airflow: Первые шаги

Настройка SQLAlchemy для подключения к базам данных (установка зависимостей)

SQLAlchemy – это мощная библиотека Python, предоставляющая гибкий и удобный способ работы с базами данных. Для использования SQLAlchemy в Airflow необходимо установить соответствующий драйвер для вашей базы данных. Например, для PostgreSQL:

pip install psycopg2-binary

Для MySQL:

pip install pymysql

Создание базового DAG для подключения к базе данных через SQLAlchemy

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

with DAG(
    dag_id='postgres_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres_db',
        sql="""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(255)
            );
        """
    )

В этом примере postgres_conn_id указывает на соединение, настроенное для PostgreSQL.

Использование conn_id для управления соединениями

Что такое conn_id и как он используется в Airflow

conn_id – это уникальный идентификатор соединения, используемый в Airflow для ссылки на определенную конфигурацию соединения. Вместо того чтобы жестко задавать параметры подключения в коде DAG, вы указываете conn_id, и Airflow использует соответствующие учетные данные и параметры из конфигурации соединения.

Практическое применение conn_id в DAG: примеры с различными базами данных

from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from datetime import datetime

with DAG(
    dag_id='mysql_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:
    create_table = MySqlOperator(
        task_id='create_table',
        mysql_conn_id='mysql_db',
        sql="""
            CREATE TABLE IF NOT EXISTS products (
                id INT AUTO_INCREMENT PRIMARY KEY,
                product_name VARCHAR(255)
            );
        """
    )
Реклама

Здесь mysql_conn_id ссылается на соединение MySQL, настроенное в Airflow.

Передача дополнительных аргументов (extra_args) для SQLAlchemy

Объяснение параметра extra_args и его роли в настройке соединений

Параметр extra_args (или extra в некоторых версиях Airflow) позволяет передавать дополнительные, специфичные для базы данных аргументы в строку подключения SQLAlchemy. Это полезно для настройки параметров, которые не предусмотрены стандартными полями в интерфейсе Airflow, например, для настройки SSL, указания кодировки и т.д.

Примеры передачи специфичных для базы данных аргументов (SSL, кодировка, т.д.)

Пример для PostgreSQL с SSL:

В интерфейсе Airflow UI в поле Extra для соединения PostgreSQL добавьте JSON:

{
  "sslmode": "require",
  "sslcert": "/path/to/client.crt",
  "sslkey": "/path/to/client.key",
  "sslrootcert": "/path/to/server.crt"
}

В коде DAG это будет автоматически учтено при создании подключения SQLAlchemy.

Пример для MySQL с указанием кодировки:

{
  "charset": "utf8mb4"
}

Теперь все соединения с MySQL будут использовать кодировку utf8mb4.

Безопасность и лучшие практики при работе с соединениями

Рекомендации по безопасному хранению учетных данных: использование переменных и секретов Airflow

  • Не храните учетные данные в коде DAG. Используйте conn_id для ссылки на соединения, настроенные в Airflow.

  • Используйте переменные Airflow (Variables). Для хранения конфиденциальных данных, таких как пароли, можно использовать переменные Airflow. Доступ к переменным можно получить в DAG.

  • Используйте секреты Airflow (Secrets Backends). Для более безопасного хранения учетных данных рекомендуется использовать секреты, хранящиеся в внешних хранилищах, таких как HashiCorp Vault, AWS Secrets Manager или Google Secret Manager. Airflow поддерживает интеграцию с этими системами.

  • Используйте зашифрованные соединения. Настройте SSL для соединений с базами данных для защиты передаваемых данных.

Обзор connection hooks и их применение для работы с соединениями

Connection hooks – это классы в Airflow, предоставляющие абстракции для работы с различными типами соединений. Они упрощают процесс подключения к внешним системам и выполнения операций. Например, PostgresHook предоставляет методы для выполнения SQL-запросов к базе данных PostgreSQL. Использование connection hooks делает код более читаемым и упрощает обработку ошибок.

Заключение

Настройка соединений с базами данных – важная часть работы с Apache Airflow. Используя SQLAlchemy, conn_id и extra_args, можно гибко и безопасно управлять подключениями к различным базам данных. Не забывайте о лучших практиках обеспечения безопасности, таких как использование секретов и зашифрованных соединений, чтобы защитить ваши учетные данные и данные.


Добавить комментарий