Dagster – это мощный оркестратор данных, предназначенный для построения, тестирования и мониторинга сложных data pipeline. Одним из ключевых аспектов эффективной работы с Dagster является правильное использование ресурсов (resources). Ресурсы позволяют абстрагироваться от деталей реализации, делая ваш код более модульным, тестируемым и переиспользуемым. В этой статье мы подробно рассмотрим, как создавать и использовать ресурсы в Dagster для эффективного построения data pipeline.
Основы работы с ресурсами в Dagster
Что такое ресурсы (resources) в Dagster и зачем они нужны?
Ресурсы в Dagster – это объекты, предоставляющие доступ к внешним системам или сервисам, таким как базы данных, API или файловые системы. Они инкапсулируют логику подключения и взаимодействия с этими системами, позволяя вашим ops (операциям) сосредоточиться на бизнес-логике.
Использование ресурсов дает несколько преимуществ:
-
Переиспользуемость: Ресурсы можно переиспользовать в разных ops и jobs (задачах), избегая дублирования кода.
-
Тестируемость: Ресурсы можно легко подменять мок-объектами во время тестирования, изолируя код от внешних зависимостей.
-
Конфигурация: Ресурсы позволяют централизованно управлять конфигурацией внешних систем.
-
Управление зависимостями: Dagster управляет жизненным циклом ресурсов, гарантируя их инициализацию перед использованием и освобождение после завершения.
Различия между ResourceDefinition и ConfigurableResourceDefinition
В Dagster существуют два основных способа определения ресурсов: ResourceDefinition и ConfigurableResourceDefinition.
-
ResourceDefinition: Используется для ресурсов, которые не требуют конфигурации. Например, ресурс, предоставляющий доступ к глобальной переменной окружения. -
ConfigurableResourceDefinition: Используется для ресурсов, которые требуют конфигурации. Например, ресурс для подключения к базе данных, требующий имя хоста, имя пользователя и пароль.ConfigurableResourceDefinitionавтоматически генерирует схему конфигурации, позволяя пользователям задавать значения параметров через UI Dagster или командную строку.
Определение и настройка ресурсов
Пошаговое руководство по созданию ResourceDefinition
Создадим простой ресурс, который будет возвращать текущее время:
from dagster import ResourceDefinition, OpExecutionContext
import datetime
def get_current_time(context: OpExecutionContext):
return datetime.datetime.now()
current_time_resource = ResourceDefinition(resource_fn=get_current_time)
Этот код определяет ресурс current_time_resource, который при вызове возвращает текущее время. resource_fn – это функция, которая будет вызвана для создания экземпляра ресурса. Для использования этого ресурса, необходимо добавить его к определению job или graph.
Передача конфигурации ресурсам: примеры и лучшие практики
Для ресурсов, требующих конфигурацию, используется ConfigurableResourceDefinition. Рассмотрим пример ресурса для подключения к базе данных PostgreSQL:
from dagster import ConfigurableResource, OpExecutionContext
import psycopg2
class PostgresResource(ConfigurableResource):
host: str
port: int
database: str
user: str
password: str
def get_connection(self):
return psycopg2.connect(
host=self.host,
port=self.port,
database=self.database,
user=self.user,
password=self.password
)
@asset
def some_asset(context: AssetExecutionContext):
postgres = context.resources.postgres
connection = postgres.get_connection()
# ... работа с базой данных ...
connection.close()
postgres_resource = PostgresResource.configure_at_launch()
В этом примере мы определили класс PostgresResource, который наследуется от ConfigurableResource. Атрибуты класса (host, port, database, user, password) автоматически преобразуются в параметры конфигурации. Метод get_connection создает подключение к базе данных, используя значения параметров конфигурации. configure_at_launch — метод, который позволяет задать конфигурацию ресурса при запуске job или asset.
Лучшие практики:
-
Используйте типы данных для параметров конфигурации, чтобы Dagster мог автоматически проверять значения.
-
Используйте значения по умолчанию для параметров конфигурации, чтобы упростить использование ресурса.
-
Разделите конфигурацию на разные уровни (например, development, staging, production), используя разные файлы конфигурации.
Использование ресурсов в Data Pipelines
Примеры использования ресурсов для подключения к базам данных, API, файловым системам
Рассмотрим несколько примеров использования ресурсов для подключения к различным системам:
- Подключение к API:
import requests
from dagster import ConfigurableResource, OpExecutionContext
class APIResource(ConfigurableResource):
api_url: str
def get_data(self, endpoint: str):
response = requests.get(f"{self.api_url}/{endpoint}")
response.raise_for_status()
return response.json()
- Подключение к файловой системе (S3):
import boto3
from dagster import ConfigurableResource, OpExecutionContext
class S3Resource(ConfigurableResource):
bucket_name: str
aws_access_key_id: str
aws_secret_access_key: str
def get_client(self):
return boto3.client(
's3',
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key
)
Интеграция ресурсов с IO Manager для управления вводом/выводом данных
IO Manager в Dagster отвечает за сериализацию и десериализацию данных между ops. Ресурсы можно интегрировать с IO Manager для управления вводом/выводом данных в различные системы хранения. Например, можно создать IO Manager, который будет сохранять данные в базе данных PostgreSQL, используя ресурс PostgresResource, описанный ранее.
from dagster import IOManager, InputContext, OutputContext, ConfigurableResource
import pandas as pd
class PostgresIOManager(IOManager):
def __init__(self, postgres_resource):
self.postgres_resource = postgres_resource
def handle_output(self, context: OutputContext, obj: pd.DataFrame):
connection = self.postgres_resource.get_connection()
# ... сохранение DataFrame в базу данных ...
connection.close()
def load_input(self, context: InputContext):
connection = self.postgres_resource.get_connection()
# ... загрузка DataFrame из базы данных ...
connection.close()
Продвинутые техники и лучшие практики
Тестирование кода, использующего ресурсы Dagster
Для тестирования кода, использующего ресурсы, рекомендуется использовать мок-объекты. Мок-объекты позволяют имитировать поведение внешних систем, изолируя код от реальных зависимостей.
from unittest.mock import MagicMock
from dagster import job, op, ResourceDefinition
@op
def my_op(context):
data = context.resources.my_resource.get_data()
assert data == "mock data"
def test_my_op():
mock_resource = MagicMock()
mock_resource.get_data.return_value = "mock data"
@job(resource_defs={"my_resource": ResourceDefinition.hardcoded_resource(mock_resource)})
def my_job():
my_op()
result = my_job.execute_in_process()
assert result.success
Обработка ошибок и повторные попытки при работе с ресурсами
При работе с внешними системами необходимо учитывать возможность возникновения ошибок. Dagster предоставляет возможность настраивать повторные попытки (retries) для ops, использующих ресурсы. Это позволяет автоматически повторять выполнение ops в случае временных сбоев.
from dagster import retry_policy, job, op
@op(retry_policy=retry_policy(max_retries=3))
def my_op(context):
# ... код, использующий ресурсы ...
pass
Также можно использовать блоки try...except для обработки исключений и логирования ошибок.
Заключение
Ресурсы в Dagster – это мощный инструмент для построения модульных, тестируемых и переиспользуемых data pipeline. Правильное использование ресурсов позволяет упростить управление конфигурацией, улучшить тестируемость и повысить надежность ваших pipelines. В этой статье мы рассмотрели основные концепции, примеры использования и лучшие практики работы с ресурсами в Dagster. Надеемся, что эта информация поможет вам эффективно использовать Dagster для решения ваших задач оркестрации данных.