Scrapy пайплайн: как настроить и использовать для обработки данных?

Что такое Scrapy Pipeline и зачем он нужен?

Scrapy Pipeline – это ключевой компонент фреймворка Scrapy, отвечающий за постобработку извлеченных данных (items). После того, как паук (spider) извлекает данные со страницы, они передаются в Pipeline для выполнения различных операций: очистки, валидации, сохранения в базу данных, фильтрации дубликатов и других преобразований. Pipeline позволяет структурировать процесс обработки данных и сделать его более гибким и управляемым.

Зачем он нужен?

  • Очистка данных: Удаление лишних пробелов, приведение типов данных к нужному формату.
  • Валидация данных: Проверка соответствия данных определенным критериям, например, формату email или диапазону цен.
  • Сохранение данных: Запись данных в базу данных (MongoDB, PostgreSQL, MySQL и т.д.) или в файлы (JSON, CSV, XML).
  • Фильтрация дубликатов: Предотвращение сохранения повторяющихся данных.
  • Обработка изображений/файлов: Загрузка изображений и других файлов, связанных с извлеченными данными.

Архитектура Scrapy и место Pipeline в ней

Scrapy имеет модульную архитектуру. Основные компоненты:

  1. Scrapy Engine: Ядро фреймворка, управляющее потоком данных между компонентами.
  2. Spiders: Пауки, определяющие, как извлекать данные с веб-сайтов.
  3. Scheduler: Планировщик запросов, определяющий порядок посещения страниц.
  4. Downloader: Загрузчик страниц.
  5. Item Pipeline: Компонент для постобработки извлеченных данных.
  6. Middlewares: Промежуточное ПО для обработки запросов и ответов.

Данные проходят следующий путь: Spider -> Item -> Item Pipeline. После извлечения данных пауком (Spider), они представляются в виде Item. Item передается в Item Pipeline, где проходит через последовательность компонентов, каждый из которых выполняет определенную операцию. Результат работы Pipeline может быть сохранен в базу данных, файл или использован для других целей.

Основные функции и возможности Pipeline

Основные функции:

  • Гибкая настройка: Можно настроить несколько Pipeline, каждый из которых выполняет определенную задачу.
  • Приоритеты: Можно установить приоритет для каждого Pipeline, определяющий порядок их выполнения.
  • Модульность: Каждый Pipeline представляет собой отдельный модуль, что упрощает разработку и поддержку.
  • Обработка ошибок: Pipeline может обрабатывать исключения и ошибки, возникающие при обработке данных.

Настройка Scrapy Pipeline

Определение Pipeline в файле settings.py

Чтобы активировать Pipeline, его необходимо добавить в файл settings.py проекта Scrapy. Это делается с помощью настройки ITEM_PIPELINES. Значение этой настройки – словарь, где ключи – это классы Pipeline, а значения – их приоритеты.

ITEM_PIPELINES = {
    'myproject.pipelines.MyPipeline': 300,
    'myproject.pipelines.AnotherPipeline': 100,
}

В этом примере определены два Pipeline: MyPipeline и AnotherPipeline. MyPipeline имеет приоритет 300, а AnotherPipeline – 100. Pipeline с более высоким приоритетом выполняется раньше.

Приоритеты Pipeline: зачем и как их устанавливать

Приоритеты Pipeline определяют порядок, в котором они будут выполняться. Это важно, когда Pipeline зависят друг от друга или когда необходимо выполнить определенные операции в определенной последовательности. Например, сначала может потребоваться очистить данные, а затем проверить их на валидность.

Приоритеты устанавливаются в файле settings.py с помощью настройки ITEM_PIPELINES. Значение приоритета – целое число. Чем выше число, тем выше приоритет.

Активация и деактивация Pipeline

Чтобы деактивировать Pipeline, достаточно закомментировать или удалить соответствующую строку в настройке ITEM_PIPELINES в файле settings.py.

Реализация пользовательского Pipeline

Создание класса Pipeline: основные методы (processitem, openspider, close_spider)

Каждый Pipeline представляет собой класс, который должен реализовывать определенные методы. Основные методы:

  • process_item(self, item: dict, spider) -> dict: Этот метод вызывается для каждого Item, который проходит через Pipeline. Он принимает Item и объект Spider в качестве аргументов и должен возвращать Item (обработанный или нет) или вызывать DropItem exception, чтобы отбросить Item. Типизация item: dict указана для наглядности, фактически item это scrapy.Item или dict.
  • open_spider(self, spider): Этот метод вызывается при запуске Spider. Он может быть использован для инициализации ресурсов, например, для подключения к базе данных.
  • close_spider(self, spider): Этот метод вызывается при завершении работы Spider. Он может быть использован для освобождения ресурсов, например, для закрытия соединения с базой данных.

Пример реализации Pipeline:

Реклама
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class PriceConverterPipeline:
    exchange_rate = 80  # Abstract rate

    def process_item(self, item: dict, spider) -> dict:
        adapter = ItemAdapter(item)
        if adapter.get('price'):
            price = adapter['price']
            adapter['price_usd'] = float(price) / self.exchange_rate
            return item
        else:
            raise DropItem(f"Missing price in {item}")


class DuplicatesPipeline:
    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item: dict, spider) -> dict:
        adapter = ItemAdapter(item)
        if adapter['id'] in self.ids_seen:
            raise DropItem(f"Duplicate item found: {item!r}")
        else:
            self.ids_seen.add(adapter['id'])
            return item

Примеры реализации Pipeline для различных задач (очистка данных, валидация, сохранение в базу данных, экспорт в файлы)

  • Очистка данных: Удаление лишних пробелов, приведение типов данных. Например, удаление пробелов из названия товара:

    class CleanDataPipeline:
        def process_item(self, item: dict, spider) -> dict:
            adapter = ItemAdapter(item)
            if adapter.get('name'):
                adapter['name'] = adapter['name'].strip()
            return item
    
  • Валидация данных: Проверка на соответствие определенным критериям. Например, проверка формата email:

    import re
    from scrapy.exceptions import DropItem
    from itemadapter import ItemAdapter
    
    class ValidateDataPipeline:
        def process_item(self, item: dict, spider) -> dict:
            adapter = ItemAdapter(item)
            if adapter.get('email') and not re.match(r"^[\w\.-]+@\w+\.\w+", adapter['email']):
                raise DropItem(f"Invalid email address: {item['email']}")
            return item
    
  • Сохранение в базу данных: Запись данных в MongoDB:

    import pymongo
    from itemadapter import ItemAdapter
    
    class MongoPipeline:
        collection_name = 'products'
    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
    
    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )
    
    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
    
    def close_spider(self, spider):
        self.client.close()
    
    def process_item(self, item: dict, spider) -> dict:
        self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
        return item
    

  • Экспорт в файлы: Сохранение данных в JSON-файл:

    import json
    
    class JsonWriterPipeline:
    def open_spider(self, spider):
        self.file = open('items.json', 'w')
    
    def close_spider(self, spider):
        self.file.close()
    
    def process_item(self, item: dict, spider) -> dict:
        line = json.dumps(item) + "\n"
        self.file.write(line)
        return item
    

Обработка исключений в Pipeline

В Pipeline необходимо обрабатывать исключения, которые могут возникнуть при обработке данных. Например, если не удается подключиться к базе данных, необходимо обработать исключение pymongo.errors.ConnectionFailure.

Исключения можно обрабатывать в методе process_item. Если возникает исключение, которое не позволяет обработать Item, можно вызвать DropItem, чтобы отбросить Item.

from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter

class ExceptionHandlerPipeline:
    def process_item(self, item: dict, spider) -> dict:
        try:
            adapter = ItemAdapter(item)
            # Some operation that can raise an exception
            value = int(adapter['price'])
            adapter['price'] = value
            return item
        except ValueError:
            raise DropItem(f"Invalid price format: {item['price']}")

Примеры использования Scrapy Pipeline

Сохранение данных в MongoDB через Pipeline

(Пример кода приведен в разделе «Примеры реализации Pipeline для различных задач»)

Фильтрация дубликатов данных через Pipeline

(Пример кода приведен в разделе «Создание класса Pipeline: основные методы»)

Конвейер для загрузки изображений

Scrapy имеет встроенный конвейер для загрузки изображений — ImagesPipeline. Для его использования необходимо:

  1. Добавить ImagesPipeline в ITEM_PIPELINES.
  2. Указать параметры IMAGES_STORE (путь к папке для сохранения изображений), IMAGES_EXPIRES (срок хранения изображений), IMAGES_THUMBS (создание миниатюр) и MEDIA_ALLOW_REDIRECTS в settings.py.
  3. В Item добавить поле image_urls с списком URL изображений.
# settings.py
ITEM_PIPELINES = {
    'scrapy.pipelines.images.ImagesPipeline': 1
}

IMAGES_STORE = './images'

# Item
import scrapy

class Product(scrapy.Item):
    image_urls = scrapy.Field()
    images = scrapy.Field()

Продвинутые техники работы с Pipeline

Использование нескольких Pipeline для одной задачи

Можно использовать несколько Pipeline для выполнения одной задачи, например, для очистки и валидации данных. В этом случае необходимо правильно настроить приоритеты Pipeline, чтобы они выполнялись в нужной последовательности.

Передача данных между Pipeline

Данные можно передавать между Pipeline, используя spider.meta. Этот словарь доступен в каждом Pipeline и может использоваться для хранения промежуточных результатов.

class PipelineA:
    def process_item(self, item: dict, spider) -> dict:
        # Some operation
        spider.meta['value'] = 123
        return item

class PipelineB:
    def process_item(self, item: dict, spider) -> dict:
        value = spider.meta.get('value')
        # Use the value
        return item

Тестирование Pipeline

Для тестирования Pipeline можно использовать модульные тесты. Необходимо создать тестовый Item и проверить, что Pipeline правильно обрабатывает его.


Добавить комментарий