Scrapy: Как создать и использовать конвейер элементов?

Что такое конвейеры элементов и зачем они нужны?

Конвейеры элементов (Item Pipelines) в Scrapy — это компоненты, отвечающие за постобработку извлеченных данных (Items). Они позволяют выполнять очистку, валидацию, фильтрацию, обогащение и сохранение данных. Без конвейеров, вся извлеченная информация выгружалась бы в «сыром» виде, что затрудняет ее дальнейшее использование. Они позволяют превратить сырые данные, полученные от пауков, в структурированную и пригодную для анализа информацию.

Роль конвейеров элементов в процессе обработки данных Scrapy

После того, как паук извлек данные и создал объект Item, этот объект передается в конвейер. Конвейеры работают последовательно, как сборочная линия. Каждый конвейер принимает элемент, выполняет определенную операцию, и затем передает его следующему конвейеру. Это позволяет разделить логику обработки данных на отдельные, легко поддерживаемые компоненты.

Примеры задач, решаемых с помощью конвейеров элементов

Конвейеры элементов решают широкий спектр задач:

  1. Очистка данных: удаление лишних пробелов, преобразование типов данных.
  2. Валидация данных: проверка наличия обязательных полей, проверка соответствия форматам.
  3. Удаление дубликатов: предотвращение сохранения идентичных элементов.
  4. Сохранение данных: запись данных в базу данных, файл, API.
  5. Обогащение данных: добавление дополнительной информации, например, гео-координат на основе адреса.

Создание и настройка конвейера элементов

Определение класса конвейера элементов (Item Pipeline)

Для создания конвейера необходимо определить класс, который будет наследоваться от scrapy.ItemPipeline. Этот класс должен находиться в файле pipelines.py вашего проекта Scrapy.

from itemadapter import ItemAdapter

class PriceConverterPipeline:
    exchange_rate = 80  # Example rate for demonstration

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if adapter.get('price'):
            price = adapter['price']
            adapter['price_usd'] = float(price) / self.exchange_rate
        return item

Реализация методов process_item, open_spider и close_spider

Основным методом конвейера является process_item(self, item, spider). Этот метод вызывается для каждого элемента, проходящего через конвейер. Он принимает элемент (item) и объект паука (spider) в качестве аргументов и должен возвращать обработанный элемент или выбросить исключение DropItem, чтобы отбросить элемент. Также можно переопределить методы open_spider(self, spider) и close_spider(self, spider), которые вызываются при запуске и завершении работы паука, соответственно. Эти методы часто используются для установки соединения с базой данных или выполнения других операций инициализации и завершения.

import logging

import pymongo
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class MongoDBPipeline:

    collection_name = 'products'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if adapter.get('description') is None:
            raise DropItem(f"Missing description in {item}")

        self.db[self.collection_name].insert_one(item)
        logging.log(logging.INFO, f"Item added to MongoDB: {item['name']}")
        return item

Добавление конвейера в настройки Scrapy (settings.py)

Чтобы активировать конвейер, его необходимо добавить в настройках Scrapy (settings.py). Для этого используется параметр ITEM_PIPELINES.

ITEM_PIPELINES = {
   'myproject.pipelines.PriceConverterPipeline': 300,
   'myproject.pipelines.MongoDBPipeline': 400,
}
Реклама

Указание приоритета конвейера (ITEM_PIPELINES)

Числовые значения в ITEM_PIPELINES определяют приоритет конвейеров. Чем меньше число, тем выше приоритет, и тем раньше конвейер будет вызван для обработки элемента. Важно правильно настроить приоритет, чтобы конвейеры выполнялись в нужном порядке. Например, конвейер, очищающий данные, должен быть вызван до конвейера, сохраняющего данные в базу.

Примеры использования конвейеров элементов

Очистка данных (удаление лишних пробелов, приведение к нужному типу)

class CleanDataPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        adapter['title'] = adapter['title'].strip()
        try:
            adapter['price'] = float(adapter['price'])
        except ValueError:
            adapter['price'] = 0.0
        return item

Проверка данных (валидация обязательных полей, проверка форматов)

from scrapy.exceptions import DropItem

class ValidateDataPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if not adapter.get('title'):
            raise DropItem("Missing title")
        if not adapter.get('price') or adapter['price'] <= 0:
            raise DropItem("Invalid price")
        return item

Удаление дубликатов элементов

class DuplicatesPipeline:
    def __init__(self):
        self.seen = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        item_id = adapter['id']
        if item_id in self.seen:
            raise DropItem(f"Duplicate item found: {item}")
        else:
            self.seen.add(item_id)
            return item

Сохранение данных в базу данных (например, PostgreSQL, MongoDB)

Пример сохранения в MongoDB был приведен выше.

Продвинутые техники работы с конвейерами

Использование нескольких конвейеров для обработки одного элемента

Как уже упоминалось, конвейеры работают последовательно. Это позволяет реализовать сложные сценарии обработки данных, где каждый конвейер отвечает за свою часть работы.

Условная обработка элементов в зависимости от типа или содержимого

Можно реализовать логику условной обработки внутри process_item, чтобы разные типы элементов обрабатывались по-разному. Например:

class ConditionalPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if isinstance(item, ProductItem):
            # do product-specific processing
            pass
        elif isinstance(item, ReviewItem):
            # do review-specific processing
            pass
        return item

Обработка изображений и файлов с использованием конвейера ImagesPipeline и FilesPipeline

Scrapy предоставляет встроенные конвейеры ImagesPipeline и FilesPipeline для удобной загрузки и обработки изображений и файлов. Они автоматически скачивают файлы, генерируют миниатюры (для изображений) и хранят информацию о скачанных файлах.

Отладка и тестирование конвейеров элементов

Логирование ошибок и предупреждений в конвейере

Используйте модуль logging для записи информации о работе конвейера, включая ошибки и предупреждения.

import logging

class MyPipeline:
    def process_item(self, item, spider):
        try:
            # some operation that may fail
            pass
        except Exception as e:
            logging.error(f"Error processing item: {item}, error: {e}")

Использование scrapy.exceptions.DropItem для отбрасывания нежелательных элементов

Как было показано выше, исключение DropItem позволяет отбросить элемент, который не соответствует заданным критериям.

Тестирование конвейеров с использованием юнит-тестов

Напишите юнит-тесты для проверки корректности работы конвейеров. Используйте фреймворк unittest или pytest.

import unittest
from myproject.pipelines import CleanDataPipeline
from myproject.items import ProductItem

class TestCleanDataPipeline(unittest.TestCase):

    def test_clean_data(self):
        pipeline = CleanDataPipeline()
        item = ProductItem(title="  My Product  ", price="  100  ")
        cleaned_item = pipeline.process_item(item, None)
        self.assertEqual(cleaned_item['title'], "My Product")
        self.assertEqual(cleaned_item['price'], 100.0)

if __name__ == '__main__':
    unittest.main()

Добавить комментарий