Вставка данных в Google BigQuery с использованием Python: Полное руководство и примеры кода

Google BigQuery – это мощное и масштабируемое хранилище данных в облаке. Python, с его богатой экосистемой библиотек, является отличным инструментом для взаимодействия с BigQuery, включая вставку данных. Эта статья предоставит вам все необходимые знания и примеры кода для эффективной вставки данных в BigQuery с использованием Python.

Мы рассмотрим различные методы вставки: от простой построчной записи до пакетной загрузки из файлов, а также затронем вопросы оптимизации производительности и обработки ошибок. Данное руководство предназначено для разработчиков, инженеров данных и аналитиков, работающих с Python и Google Cloud Platform.

Настройка окружения и подготовка к работе

Прежде чем приступить к вставке данных, необходимо настроить окружение Python и установить необходимые библиотеки, а также настроить аутентификацию для доступа к BigQuery.

Установка и настройка библиотеки google-cloud-bigquery

Библиотека google-cloud-bigquery предоставляет Python API для работы с BigQuery. Установите её с помощью pip:

pip install google-cloud-bigquery

Настройка аутентификации для доступа к BigQuery (Service Accounts)

Для аутентификации рекомендуется использовать Service Accounts. Вот шаги по настройке:

  1. Создайте Service Account в Google Cloud Console.

  2. Предоставьте Service Account права на доступ к BigQuery (например, роль BigQuery Data Editor).

  3. Загрузите JSON-файл с ключом Service Account.

  4. Установите переменную окружения GOOGLE_APPLICATION_CREDENTIALS, указывающую на путь к этому файлу:

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"

Теперь Python сможет автоматически аутентифицироваться при работе с BigQuery.

Вставка данных построчно в BigQuery

Построчная вставка – это самый простой, но наименее эффективный способ вставки данных, особенно при больших объемах.

Пример вставки одной строки данных

from google.cloud import bigquery

client = bigquery.Client()
table_id = "your-project.your_dataset.your_table" # Replace with your table ID

rows_to_insert = [
    {"field1": "value1", "field2": 123, "field3": True}
]

errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f"Encountered errors while inserting rows: {errors}")
else:
    print("Rows inserted successfully.")

Замените your-project.your_dataset.your_table на идентификатор вашей таблицы BigQuery. Структура rows_to_insert должна соответствовать схеме таблицы BigQuery.

Обработка ошибок при вставке данных построчно

Функция insert_rows_json возвращает список ошибок, если они возникли. Важно проверять этот список и обрабатывать ошибки соответствующим образом. Обычно, ошибки связаны с несоответствием типов данных или нарушением ограничений таблицы.

Вставка данных пакетами (Batch Insert)

Пакетная вставка значительно эффективнее построчной, так как позволяет отправлять несколько строк данных в одном запросе.

Вставка данных из списка Python объектов (Dictionaries)

from google.cloud import bigquery

client = bigquery.Client()
table_id = "your-project.your_dataset.your_table" # Replace with your table ID


data = [
    {"field1": "value1", "field2": 123, "field3": True},
    {"field1": "value2", "field2": 456, "field3": False},
    {"field1": "value3", "field2": 789, "field3": True},
]

errors = client.insert_rows_json(table_id, data)
if errors:
    print(f"Encountered errors while inserting rows: {errors}")
else:
    print("Rows inserted successfully.")

В этом примере мы создаем список словарей data и передаем его в функцию insert_rows_json. BigQuery примет список строк за один запрос и обработает его атомарно, либо все строки будут добавлены, либо все отклонены.

Оптимизация пакетной вставки: размер пакетов и количество запросов

Размер пакета влияет на производительность. Слишком маленькие пакеты снижают эффективность, а слишком большие могут привести к ошибкам из-за ограничений размера запроса. Рекомендуется экспериментировать с разными размерами пакетов, чтобы найти оптимальный. Начните с пакетов размером 1000-10000 строк и корректируйте в зависимости от результатов.

Реклама

Вставка данных из DataFrame Pandas

Pandas DataFrame – распространенный формат для хранения и обработки данных в Python. Библиотека google-cloud-bigquery предоставляет удобный способ вставки данных из DataFrame в BigQuery.

Преобразование DataFrame в формат, подходящий для BigQuery

import pandas as pd
from google.cloud import bigquery

client = bigquery.Client()
table_id = "your-project.your_dataset.your_table" # Replace with your table ID

df = pd.DataFrame({
    "field1": ["value1", "value2", "value3"],
    "field2": [123, 456, 789],
    "field3": [True, False, True]
})

df.to_gbq(destination_table=table_id, project_id=client.project, if_exists='append')

print("DataFrame inserted successfully.")

Функция to_gbq упрощает процесс вставки. Аргумент if_exists определяет, что делать, если таблица уже существует (‘append’ – добавить данные, ‘replace’ – перезаписать таблицу, ‘fail’ – вызвать ошибку). Для работы этого метода необходима установка pandas-gbq.

pip install pandas-gbq

Обработка ошибок и оптимизация для больших DataFrame

При работе с большими DataFrame может потребоваться оптимизация. Разбейте DataFrame на части и вставляйте их последовательно, чтобы избежать проблем с памятью. Также, убедитесь, что типы данных в DataFrame соответствуют типам данных в таблице BigQuery. Использование dtype в Pandas может помочь в этом.

Работа с данными из файлов (CSV и JSON)

BigQuery поддерживает загрузку данных из файлов CSV и JSON. Это удобный способ вставки данных, особенно если они уже хранятся в этих форматах.

Загрузка данных из CSV файлов в BigQuery

from google.cloud import bigquery

client = bigquery.Client()
table_id = "your-project.your_dataset.your_table" # Replace with your table ID

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("field1", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("field2", bigquery.enums.SqlTypeNames.INTEGER),
        bigquery.SchemaField("field3", bigquery.enums.SqlTypeNames.BOOLEAN),
    ],
    skip_leading_rows=1, # Skip header row
    source_format=bigquery.SourceFormat.CSV,
)

with open("/path/to/your/data.csv", "rb") as source_file:
    job = client.load_table_from_file(source_file, table_id, job_config=job_config)

job.result()  # Waits for the job to complete.

table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows into {}.".format(table.num_rows, table_id))

Определите схему таблицы в job_config и укажите путь к CSV-файлу. Укажите skip_leading_rows=1, если CSV-файл содержит строку заголовков. Параметр source_format устанавливаем в bigquery.SourceFormat.CSV.

Загрузка данных из JSON файлов в BigQuery

from google.cloud import bigquery

client = bigquery.Client()
table_id = "your-project.your_dataset.your_table" # Replace with your table ID

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("field1", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("field2", bigquery.enums.SqlTypeNames.INTEGER),
        bigquery.SchemaField("field3", bigquery.enums.SqlTypeNames.BOOLEAN),
    ],
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)

with open("/path/to/your/data.json", "rb") as source_file:
    job = client.load_table_from_file(source_file, table_id, job_config=job_config)

job.result()  # Waits for the job to complete.

table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows into {}.".format(table.num_rows, table_id))

Схема таблицы также задается в job_config. Установите source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON. Убедитесь, что JSON-файл имеет формат newline-delimited JSON, где каждая строка является валидным JSON-объектом.

Заключение

В этой статье мы рассмотрели различные способы вставки данных в Google BigQuery с использованием Python. Вы научились настраивать окружение, вставлять данные построчно, пакетами, из DataFrame Pandas и из файлов CSV и JSON. Выбор оптимального метода зависит от объема данных, формата данных и требований к производительности. Не забывайте об обработке ошибок и оптимизации для достижения максимальной эффективности.


Добавить комментарий