Что такое Scrapy Pipeline и зачем он нужен?
Scrapy Pipeline – это ключевой компонент фреймворка Scrapy, отвечающий за постобработку извлеченных данных (items). После того, как паук (spider) извлекает данные со страницы, они передаются в Pipeline для выполнения различных операций: очистки, валидации, сохранения в базу данных, фильтрации дубликатов и других преобразований. Pipeline позволяет структурировать процесс обработки данных и сделать его более гибким и управляемым.
Зачем он нужен?
- Очистка данных: Удаление лишних пробелов, приведение типов данных к нужному формату.
- Валидация данных: Проверка соответствия данных определенным критериям, например, формату email или диапазону цен.
- Сохранение данных: Запись данных в базу данных (MongoDB, PostgreSQL, MySQL и т.д.) или в файлы (JSON, CSV, XML).
- Фильтрация дубликатов: Предотвращение сохранения повторяющихся данных.
- Обработка изображений/файлов: Загрузка изображений и других файлов, связанных с извлеченными данными.
Архитектура Scrapy и место Pipeline в ней
Scrapy имеет модульную архитектуру. Основные компоненты:
- Scrapy Engine: Ядро фреймворка, управляющее потоком данных между компонентами.
- Spiders: Пауки, определяющие, как извлекать данные с веб-сайтов.
- Scheduler: Планировщик запросов, определяющий порядок посещения страниц.
- Downloader: Загрузчик страниц.
- Item Pipeline: Компонент для постобработки извлеченных данных.
- Middlewares: Промежуточное ПО для обработки запросов и ответов.
Данные проходят следующий путь: Spider -> Item -> Item Pipeline. После извлечения данных пауком (Spider), они представляются в виде Item. Item передается в Item Pipeline, где проходит через последовательность компонентов, каждый из которых выполняет определенную операцию. Результат работы Pipeline может быть сохранен в базу данных, файл или использован для других целей.
Основные функции и возможности Pipeline
Основные функции:
- Гибкая настройка: Можно настроить несколько Pipeline, каждый из которых выполняет определенную задачу.
- Приоритеты: Можно установить приоритет для каждого Pipeline, определяющий порядок их выполнения.
- Модульность: Каждый Pipeline представляет собой отдельный модуль, что упрощает разработку и поддержку.
- Обработка ошибок: Pipeline может обрабатывать исключения и ошибки, возникающие при обработке данных.
Настройка Scrapy Pipeline
Определение Pipeline в файле settings.py
Чтобы активировать Pipeline, его необходимо добавить в файл settings.py проекта Scrapy. Это делается с помощью настройки ITEM_PIPELINES. Значение этой настройки – словарь, где ключи – это классы Pipeline, а значения – их приоритеты.
ITEM_PIPELINES = {
'myproject.pipelines.MyPipeline': 300,
'myproject.pipelines.AnotherPipeline': 100,
}
В этом примере определены два Pipeline: MyPipeline и AnotherPipeline. MyPipeline имеет приоритет 300, а AnotherPipeline – 100. Pipeline с более высоким приоритетом выполняется раньше.
Приоритеты Pipeline: зачем и как их устанавливать
Приоритеты Pipeline определяют порядок, в котором они будут выполняться. Это важно, когда Pipeline зависят друг от друга или когда необходимо выполнить определенные операции в определенной последовательности. Например, сначала может потребоваться очистить данные, а затем проверить их на валидность.
Приоритеты устанавливаются в файле settings.py с помощью настройки ITEM_PIPELINES. Значение приоритета – целое число. Чем выше число, тем выше приоритет.
Активация и деактивация Pipeline
Чтобы деактивировать Pipeline, достаточно закомментировать или удалить соответствующую строку в настройке ITEM_PIPELINES в файле settings.py.
Реализация пользовательского Pipeline
Создание класса Pipeline: основные методы (processitem, openspider, close_spider)
Каждый Pipeline представляет собой класс, который должен реализовывать определенные методы. Основные методы:
process_item(self, item: dict, spider) -> dict:Этот метод вызывается для каждого Item, который проходит через Pipeline. Он принимает Item и объект Spider в качестве аргументов и должен возвращать Item (обработанный или нет) или вызыватьDropItemexception, чтобы отбросить Item. Типизацияitem: dictуказана для наглядности, фактически item этоscrapy.Itemилиdict.open_spider(self, spider):Этот метод вызывается при запуске Spider. Он может быть использован для инициализации ресурсов, например, для подключения к базе данных.close_spider(self, spider):Этот метод вызывается при завершении работы Spider. Он может быть использован для освобождения ресурсов, например, для закрытия соединения с базой данных.
Пример реализации Pipeline:
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class PriceConverterPipeline:
exchange_rate = 80 # Abstract rate
def process_item(self, item: dict, spider) -> dict:
adapter = ItemAdapter(item)
if adapter.get('price'):
price = adapter['price']
adapter['price_usd'] = float(price) / self.exchange_rate
return item
else:
raise DropItem(f"Missing price in {item}")
class DuplicatesPipeline:
def __init__(self):
self.ids_seen = set()
def process_item(self, item: dict, spider) -> dict:
adapter = ItemAdapter(item)
if adapter['id'] in self.ids_seen:
raise DropItem(f"Duplicate item found: {item!r}")
else:
self.ids_seen.add(adapter['id'])
return item
Примеры реализации Pipeline для различных задач (очистка данных, валидация, сохранение в базу данных, экспорт в файлы)
-
Очистка данных: Удаление лишних пробелов, приведение типов данных. Например, удаление пробелов из названия товара:
class CleanDataPipeline: def process_item(self, item: dict, spider) -> dict: adapter = ItemAdapter(item) if adapter.get('name'): adapter['name'] = adapter['name'].strip() return item -
Валидация данных: Проверка на соответствие определенным критериям. Например, проверка формата email:
import re from scrapy.exceptions import DropItem from itemadapter import ItemAdapter class ValidateDataPipeline: def process_item(self, item: dict, spider) -> dict: adapter = ItemAdapter(item) if adapter.get('email') and not re.match(r"^[\w\.-]+@\w+\.\w+", adapter['email']): raise DropItem(f"Invalid email address: {item['email']}") return item -
Сохранение в базу данных: Запись данных в MongoDB:
import pymongo from itemadapter import ItemAdapter class MongoPipeline: collection_name = 'products'def __init__(self, mongo_uri, mongo_db): self.mongo_uri = mongo_uri self.mongo_db = mongo_db @classmethod def from_crawler(cls, crawler): return cls( mongo_uri=crawler.settings.get('MONGO_URI'), mongo_db=crawler.settings.get('MONGO_DATABASE', 'items') ) def open_spider(self, spider): self.client = pymongo.MongoClient(self.mongo_uri) self.db = self.client[self.mongo_db] def close_spider(self, spider): self.client.close() def process_item(self, item: dict, spider) -> dict: self.db[self.collection_name].insert_one(ItemAdapter(item).asdict()) return item -
Экспорт в файлы: Сохранение данных в JSON-файл:
import json class JsonWriterPipeline:def open_spider(self, spider): self.file = open('items.json', 'w') def close_spider(self, spider): self.file.close() def process_item(self, item: dict, spider) -> dict: line = json.dumps(item) + "\n" self.file.write(line) return item
Обработка исключений в Pipeline
В Pipeline необходимо обрабатывать исключения, которые могут возникнуть при обработке данных. Например, если не удается подключиться к базе данных, необходимо обработать исключение pymongo.errors.ConnectionFailure.
Исключения можно обрабатывать в методе process_item. Если возникает исключение, которое не позволяет обработать Item, можно вызвать DropItem, чтобы отбросить Item.
from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter
class ExceptionHandlerPipeline:
def process_item(self, item: dict, spider) -> dict:
try:
adapter = ItemAdapter(item)
# Some operation that can raise an exception
value = int(adapter['price'])
adapter['price'] = value
return item
except ValueError:
raise DropItem(f"Invalid price format: {item['price']}")
Примеры использования Scrapy Pipeline
Сохранение данных в MongoDB через Pipeline
(Пример кода приведен в разделе «Примеры реализации Pipeline для различных задач»)
Фильтрация дубликатов данных через Pipeline
(Пример кода приведен в разделе «Создание класса Pipeline: основные методы»)
Конвейер для загрузки изображений
Scrapy имеет встроенный конвейер для загрузки изображений — ImagesPipeline. Для его использования необходимо:
- Добавить
ImagesPipelineвITEM_PIPELINES. - Указать параметры
IMAGES_STORE(путь к папке для сохранения изображений),IMAGES_EXPIRES(срок хранения изображений),IMAGES_THUMBS(создание миниатюр) иMEDIA_ALLOW_REDIRECTSвsettings.py. - В Item добавить поле
image_urlsс списком URL изображений.
# settings.py
ITEM_PIPELINES = {
'scrapy.pipelines.images.ImagesPipeline': 1
}
IMAGES_STORE = './images'
# Item
import scrapy
class Product(scrapy.Item):
image_urls = scrapy.Field()
images = scrapy.Field()
Продвинутые техники работы с Pipeline
Использование нескольких Pipeline для одной задачи
Можно использовать несколько Pipeline для выполнения одной задачи, например, для очистки и валидации данных. В этом случае необходимо правильно настроить приоритеты Pipeline, чтобы они выполнялись в нужной последовательности.
Передача данных между Pipeline
Данные можно передавать между Pipeline, используя spider.meta. Этот словарь доступен в каждом Pipeline и может использоваться для хранения промежуточных результатов.
class PipelineA:
def process_item(self, item: dict, spider) -> dict:
# Some operation
spider.meta['value'] = 123
return item
class PipelineB:
def process_item(self, item: dict, spider) -> dict:
value = spider.meta.get('value')
# Use the value
return item
Тестирование Pipeline
Для тестирования Pipeline можно использовать модульные тесты. Необходимо создать тестовый Item и проверить, что Pipeline правильно обрабатывает его.