Apache Airflow – мощный инструмент для оркестрации рабочих процессов, часто используемый для задач ETL и работы с данными. Ключевым аспектом работы с Airflow является настройка соединений с базами данных. В этой статье мы рассмотрим, как настроить соединения с базами данных в Airflow с использованием SQLAlchemy, conn_id и параметра extra_args, а также обсудим лучшие практики обеспечения безопасности.
Основы работы с соединениями в Airflow
Обзор типов соединений в Airflow и их назначение
Airflow поддерживает различные типы соединений для взаимодействия с разными системами, включая базы данных, облачные сервисы и API. Тип соединения определяет, как Airflow будет взаимодействовать с целевой системой. Для работы с базами данных часто используются соединения, поддерживающие SQLAlchemy. Примеры: PostgreSQL, MySQL, SQLite.
Настройка соединений через интерфейс Airflow UI и CLI
Соединения можно настроить двумя способами:
-
Через веб-интерфейс Airflow UI:
-
Перейдите в раздел Admin -> Connections.
-
Нажмите кнопку Create.
-
Выберите тип соединения (например, PostgreSQL).
-
Заполните необходимые поля, такие как Host, Schema, Login, Password, Port.
-
Укажите
Conn Id— уникальный идентификатор соединения.
-
-
Через CLI (Command Line Interface):
Используйте команду
airflow connections add. Например:airflow connections add postgres_db --conn-type postgres --conn-host localhost --conn-schema public --conn-login airflow --conn-password airflow --conn-port 5432
SQLAlchemy и Airflow: Первые шаги
Настройка SQLAlchemy для подключения к базам данных (установка зависимостей)
SQLAlchemy – это мощная библиотека Python, предоставляющая гибкий и удобный способ работы с базами данных. Для использования SQLAlchemy в Airflow необходимо установить соответствующий драйвер для вашей базы данных. Например, для PostgreSQL:
pip install psycopg2-binary
Для MySQL:
pip install pymysql
Создание базового DAG для подключения к базе данных через SQLAlchemy
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
with DAG(
dag_id='postgres_example',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
create_table = PostgresOperator(
task_id='create_table',
postgres_conn_id='postgres_db',
sql="""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(255)
);
"""
)
В этом примере postgres_conn_id указывает на соединение, настроенное для PostgreSQL.
Использование conn_id для управления соединениями
Что такое conn_id и как он используется в Airflow
conn_id – это уникальный идентификатор соединения, используемый в Airflow для ссылки на определенную конфигурацию соединения. Вместо того чтобы жестко задавать параметры подключения в коде DAG, вы указываете conn_id, и Airflow использует соответствующие учетные данные и параметры из конфигурации соединения.
Практическое применение conn_id в DAG: примеры с различными базами данных
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from datetime import datetime
with DAG(
dag_id='mysql_example',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
create_table = MySqlOperator(
task_id='create_table',
mysql_conn_id='mysql_db',
sql="""
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
product_name VARCHAR(255)
);
"""
)
Здесь mysql_conn_id ссылается на соединение MySQL, настроенное в Airflow.
Передача дополнительных аргументов (extra_args) для SQLAlchemy
Объяснение параметра extra_args и его роли в настройке соединений
Параметр extra_args (или extra в некоторых версиях Airflow) позволяет передавать дополнительные, специфичные для базы данных аргументы в строку подключения SQLAlchemy. Это полезно для настройки параметров, которые не предусмотрены стандартными полями в интерфейсе Airflow, например, для настройки SSL, указания кодировки и т.д.
Примеры передачи специфичных для базы данных аргументов (SSL, кодировка, т.д.)
Пример для PostgreSQL с SSL:
В интерфейсе Airflow UI в поле Extra для соединения PostgreSQL добавьте JSON:
{
"sslmode": "require",
"sslcert": "/path/to/client.crt",
"sslkey": "/path/to/client.key",
"sslrootcert": "/path/to/server.crt"
}
В коде DAG это будет автоматически учтено при создании подключения SQLAlchemy.
Пример для MySQL с указанием кодировки:
{
"charset": "utf8mb4"
}
Теперь все соединения с MySQL будут использовать кодировку utf8mb4.
Безопасность и лучшие практики при работе с соединениями
Рекомендации по безопасному хранению учетных данных: использование переменных и секретов Airflow
-
Не храните учетные данные в коде DAG. Используйте
conn_idдля ссылки на соединения, настроенные в Airflow. -
Используйте переменные Airflow (Variables). Для хранения конфиденциальных данных, таких как пароли, можно использовать переменные Airflow. Доступ к переменным можно получить в DAG.
-
Используйте секреты Airflow (Secrets Backends). Для более безопасного хранения учетных данных рекомендуется использовать секреты, хранящиеся в внешних хранилищах, таких как HashiCorp Vault, AWS Secrets Manager или Google Secret Manager. Airflow поддерживает интеграцию с этими системами.
-
Используйте зашифрованные соединения. Настройте SSL для соединений с базами данных для защиты передаваемых данных.
Обзор connection hooks и их применение для работы с соединениями
Connection hooks – это классы в Airflow, предоставляющие абстракции для работы с различными типами соединений. Они упрощают процесс подключения к внешним системам и выполнения операций. Например, PostgresHook предоставляет методы для выполнения SQL-запросов к базе данных PostgreSQL. Использование connection hooks делает код более читаемым и упрощает обработку ошибок.
Заключение
Настройка соединений с базами данных – важная часть работы с Apache Airflow. Используя SQLAlchemy, conn_id и extra_args, можно гибко и безопасно управлять подключениями к различным базам данных. Не забывайте о лучших практиках обеспечения безопасности, таких как использование секретов и зашифрованных соединений, чтобы защитить ваши учетные данные и данные.