Найшвидший спосіб завантажити дані в PostgreSQL за допомогою Python

57 хв. читання

Згадайте, як часто вам доводилося завантажувати дані зі стороннього ресурсу до власного проєкту. Якщо ви щасливчик, то ці дані серіалізовані як JSON або YAML. Якщо ні, то отримуєте Excel-таблицю або CSV-файл, який обов'язково (незрозуміло, чому) повинен бути некоректним.

Дані великих компаній або старих систем завжди якось дивно закодовані, заархівовані або розбиті на менші файли з рандомними назвами.

Сучасні сервіси можуть, звичайно, пропонувати хороший API, однак нам не часто потрібно отримувати файл з FTP, SFTP, S3 або якогось застарілого сховища, яке працює лише на Windows.

У статті ми розглянемо найкращий спосіб імпортувати великий обсяг даних з віддаленого джерела в PostgreSQL БД.

Спочатку про передумови:

  1. Дані отримуються з віддаленого джерела.
  2. Дані — неформатовані, потребують перетворень.
  3. Великий обсяг даних.

Налаштування

Для прикладу будемо використовувати дані з API за посиланням: ними й заповнимо таблицю в базі даних.

$ curl https://api.punkapi.com/v2/beers/?per_page=1&page=1
[
    {
        "id": 1,
        "name": "Buzz",
        "tagline": "A Real Bitter Experience.",
        "first_brewed": "09/2007",
        "description": "A light, crisp and bitter IPA ...",
        "image_url": "https://images.punkapi.com/v2/keg.png",
        "abv": 4.5,
        "ibu": 60,
        "target_fg": 1010,
        "target_og": 1044,
        "ebc": 20,
        "srm": 10,
        "ph": 4.4,
        "attenuation_level": 75,
        "volume": {
            "value": 20,
            "unit": "litres"
        },
        "contributed_by": "Sam Mason <samjbmason>"
        "brewers_tips": "The earthy and floral aromas from...",
        "boil_volume": {},
        "method": {},
        "ingredients": {},
        "food_pairing": [],
    }
]

У статті ми будемо імпортувати усі рядки до brewers_tips в таблицю бази даних.

Поле volume — вкладене. Нам необхідно отримати лише value з поля та зберегти його в таблиці в колонці volume.

volume = beer['volume']['data']

Поле first_brewed містить лише рік та місяць, а в деяких випадках лише рік. Нам необхідно перетворити це значення на валідну дату. Наприклад, значення 09/2007 буде перетворено на дату 2007-09-01. Значення 2006 — на 2016-01-01.

Напишемо просту функцію, щоб перетворити текстові значення в datetime.date у Python:

import datetime

def parse_first_brewed(text: str) -> datetime.date:
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    elif len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    else:
        assert False, 'Unknown date format'

Переконаємось, що все працює:

>>> parse_first_brewed('09/2007')
datetime.date(2007, 9, 1)

>>> parse_first_brewed('2006')
datetime.date(2006, 1, 1)

В реальних ситуаціях перетворення можуть бути набагато складнішими. Але для демонстрації цього достатньо.

Отримуємо дані

API вказує дані посторінково. Щоб інкапсулювати цю логіку, ми створимо генератор, який видаватиме елемент один за одним:

from typing import Iterator, Dict, Any
from urllib.parse import urlencode
import requests


def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]:
    session = requests.Session()
    page = 1
    while True:
        response = session.get('https://web.archive.org/web/20230610000809/https://api.punkapi.com/v2/beers?' + urlencode({
            'page': page,
            'per_page': page_size
        }))
        response.raise_for_status()

        data = response.json()
        if not data:
            break

        yield from data

        page += 1

Подивимось на нашу функцію-генератор у дії:

>>> beers = iter_beers_from_api()
>>> next(beers)
{'id': 1,
 'name': 'Buzz',
 'tagline': 'A Real Bitter Experience.',
 'first_brewed': '09/2007',
 'description': 'A light, crisp and bitter IPA brewed...',
 'image_url': 'https://web.archive.org/web/20230610000809/https://images.punkapi.com/v2/keg.png',
 'abv': 4.5,
 'ibu': 60,
 'target_fg': 1010,
...
}
>>> next(beers)
{'id': 2,
 'name': 'Trashy Blonde',
 'tagline': "You Know You Shouldn't",
 'first_brewed': '04/2008',
 'description': 'A titillating, ...',
 'image_url': 'https://web.archive.org/web/20230610000809/https://images.punkapi.com/v2/2.png',
 'abv': 4.1,
 'ibu': 41.5,

Ви можете помітити, що перший результат кожної сторінки отримується найдовше. Усе тому, що здійснюється запит для отримання сторінки.

Створюємо таблицю в БД

Наступний крок — створити таблицю, куди ми будемо імпортувати дані.

Створюємо саму БД:

$ createdb -O haki testload

Замініть haki на вашого локального користувача.

Щоб з'єднатися з PostgreSQL базою даних з Python, ми можемо використовувати pycopg:

python -m pip install psycopg2

За допомогою psycopg створюємо з'єднання з БД:

import psycopg2

connection = psycopg2.connect(
    host="localhost",
    database="testload",
    user="haki",
    password=None,
)
connection.autocommit = True

Ми встановлюємо autocommit=True, щоб кожна команда, яку ми виконуємо, застосовувалась миттєво. Це саме те, що нам потрібно для прикладу.

Тепер у нас встановлено з'єднання, тож можемо написати функцію для створення таблиці:

def create_staging_table(cursor) -> None:
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE UNLOGGED TABLE staging_beers (
            id                  INTEGER,
            name                TEXT,
            tagline             TEXT,
            first_brewed        DATE,
            description         TEXT,
            image_url           TEXT,
            abv                 DECIMAL,
            ibu                 DECIMAL,
            target_fg           DECIMAL,
            target_og           DECIMAL,
            ebc                 DECIMAL,
            srm                 DECIMAL,
            ph                  DECIMAL,
            attenuation_level   DECIMAL,
            brewers_tips        TEXT,
            contributed_by      TEXT,
            volume              INTEGER
        );
    """)

Функція отримує курсор та створює unlogged-таблицю staging_beers.

Дані, додані до unlogged таблиці, не будуть логовані до журналу випереджального запису (write-ahead-log, WAL). А це ідеальний варіант для проміжних таблиць Зверніть увагу, що UNLOGGED-таблиці не будуть відновлені у разі збою.

Використовуючи з'єднання, яке ми створили раніше, викличемо нашу функцію:

>>> with connection.cursor() as cursor:
>>>     create_staging_table(cursor)

Тепер ми готові рухатись далі.

Показники

В матеріалі нас цікавитимуть два основні показники: час та пам'ять.

Вимірювання часу

Щоб визначити час виконання кожного методу, ми можемо використати вбудований модуль time:

>>> import time
>>> start = time.perf_counter()
>>> time.sleep(1) # do work
>>> elapsed = time.perf_counter() - start
>>> print(f'Time {elapsed:0.4}')
Time 1.001

Функція perf_counter ідеально підійде для виміру часу виконання наших методів.

Вимірювання пам'яті

Визначити обсяг використаної пам'яті нам допоможе пакет memory-profiler.

$ python -m pip install memory-profiler

З цим пакетом ми можемо визначити як звичайне, так і додаткове використання пам'яті для кожного рядка коду. Саме така інформація стане нам у пригоді для оптимізації. Звернемося до прикладу PyPI:

$ python -m memory_profiler example.py

Line #    Mem usage  Increment   Line Contents
==============================================
     3                           @profile
     4      5.97 MB    0.00 MB   def my_func():
     5     13.61 MB    7.64 MB       a = [1] * (10 ** 6)
     6    166.20 MB  152.59 MB       b = [2] * (2 * 10 ** 7)
     7     13.61 MB -152.59 MB       del b
     8     13.61 MB    0.00 MB       return a

Варто звернути увагу на колонку Increment, яка демонструє додаткову пам'ять, виділену кодом у кожному рядку.

У статті нас цікавитиме використання функціями «peak memory», тобто різниці між значеннями колонок mem usage та найвищим значенням.

Щоб отримати значення mem usage, ми використаємо функцію memory_usage з memory_profiler:

>>> from memory_profiler import memory_usage
>>> mem, retval = memory_usage((fn, args, kwargs), retval=True, interval=1e-7)

При такому використанні функція memory_usage виконує функцію fn з передбаченими args та kwargs, але також запускає інший фоновий процес, щоб контролювати використання пам'яті кожні interval секунд.

Для дуже швидких операцій, функція fn може виконатись більше ніж один раз. Якщо як interval ми встановимо значення менше, ніж 1e-6, функція виконається лише раз.

Аргумент retval вказує функції повернути результат fn.

Декоратор profile

Щоб з'єднати все докупи, ми створюємо декоратор, аби виміряти показники та дати відповідну інформацію про використання пам'яті та час виконання.

import time
from functools import wraps
from memory_profiler import memory_usage

def profile(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\
{fn.__name__}({fn_kwargs_str})')

        # Вимірювання часу
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        print(f'Time   {elapsed:0.4}')

        # Вимірювання пам'яті
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)

        print(f'Memory {max(mem) - min(mem)}')
        return retval

    return inner

Щоб уникнути взаємного впливу часу на пам'ять і навпаки, ми виконаємо функцію два рази: один раз для вимірювання часу, другий — для пам'яті.

Декоратор виведе назву функції та будь-які kwargs, а також відзвітує про використання пам'яті та час.

>>> @profile
>>> def work(n):
>>>     for i in range(n):
>>>         2 ** n

>>> work(10)
work()
Time   0.06269
Memory 0.0

>>> work(n=10000)
work(n=10000)
Time   0.3865
Memory 0.0234375

Ми навмисно виводимо тут лише kwargs, які збираємось використовувати також у параметризованих тестах далі.

Контрольні показники

Коли писалась ця стаття, у API було лише 325 елементів. Щоб отримати більш об'ємні дані, ми продублювали вихідні дані 100 разів та зберегли їх у пам'яті. Отриманий набір даних містить 32 500 елементів:

>> beers = list(iter_beers_from_api()) * 100
>>> len(beers)
32,500

Для імітації стороннього API наші функції прийматимуть ітератори, подібні до поверненого функцією iter_beers_from_api значення:

def process(beers: Iterator[Dict[str, Any]])) -> None:
    # Обробка

Для отримання контрольних показників, ми імпортуємо дані до БД. Щоб усунути вплив зовнішніх факторів (наприклад, мережі), ми отримуємо дані з API заздалегідь та обробляємо їх локально.

Щоб отримати точне значення часу, ми «підробляємо» стороннє API:

>>> beers = list(iter_beers_from_api()) * 100
>>> process(beers)

В реальних ситуаціях ви використаєте функцію iter_beers_from_api напряму:

>>> process(iter_beers_from_api())

Тепер усе готово, щоб почати тестувати різні методи!

insert_one_by_one

Почнемо з найпростішого прикладу: вставимо рядки один за одним:

@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            cursor.execute("""
                INSERT INTO staging_beers VALUES (
                    %(id)s,
                    %(name)s,
                    %(tagline)s,
                    %(first_brewed)s,
                    %(description)s,
                    %(image_url)s,
                    %(abv)s,
                    %(ibu)s,
                    %(target_fg)s,
                    %(target_og)s,
                    %(ebc)s,
                    %(srm)s,
                    %(ph)s,
                    %(attenuation_level)s,
                    %(brewers_tips)s,
                    %(contributed_by)s,
                    %(volume)s
                );
            """, {
                **beer,
                'first_brewed': parse_first_brewed(beer['first_brewed']),
                'volume': beer['volume']['value'],
            })

Зверніть увагу, як під час ітерації елементами, ми перетворюємо first_brewed на datetime.date, а також отримуємо значення з вкладеного поля volume.

Запустивши цю функцію, ми отримаємо такий результат:

>>> insert_one_by_one(connection, beers)
insert_one_by_one()
Time   128.8
Memory 0.08203125

Функції знадобилось 129 секунд, щоб імпортувати 32 тисячі рядків. memory profiler показує, що було використано зовсім небагато пам'яті.

Видається, що вставка рядків один за одним не надто ефективний метод: постійне перемикання між програмою і БД сповільнює цей процес.

executemany

Psycopg2 дозволяє вставляти багато рядків за один раз за допомогою executemany. Ось що сказано в документації:

Функція виконує операцію БД (запит чи команду) для всіх кортежів параметрів або зіставлень, знайдених у vars_list.

Спробуймо застосувати цю функцію на практиці:

@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers)

Все дуже схоже на попередній приклад, особливо всі перетворення даних. Основна відмінність в тому, що ми спочатку робимо всі перетворення в пам'яті, а вже потім імпортуємо їх в БД.

Результат виконання функції:

>>> insert_executemany(connection, beers)
insert_executemany()
Time   124.7
Memory 2.765625

Результати засмучують. Все відпрацювало трохи швидше, ніж у попередньому прикладі, проте використання пам'яті тепер становить 2.7МБ.

Зараз JSON-файл з даними, які ми імпортували, важить 25МБ на диску. З таким підходом файл в 1ГБ потребуватиме 110МБ пам'яті.

executemany з ітератором

Попередній метод не був оптимальним через використання пам'яті, адже перетворені дані зберігались у пам'яті перед обробкою psycopg.

Перевіримо, чи допоможе ітератор не зберігати дані локально:

@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers))

Основна відмінність у тому, що перетворені дані переходять потоком в executemany, використовуючи ітератор.

Після виконання отримуємо такий результат:

>>> insert_executemany_iterator(connection, beers)
insert_executemany_iterator()
Time   129.3
Memory 0.0

Цей метод працює, як і очікувалось, тому нам вдалось звести використання пам'яті до 0. Однак час виконання залишається приблизно таким самим, як і у методі one-by-one.

execute_batch

Документація psycopg дає таку інформацію про executemany в розділі «Fast execution helpers»:

Поточна реалізація executemany() не дуже продуктивна. Така функція може використовуватись для прискорення повторного виконання виразу для певного набору параметрів. Зменшивши кількість звертань до сервера, ви скоріше збільшите продуктивність, ніж якщо будете використовувати executemany().

Тож весь цей час ми робили все неправильно!

Але документація пропонує нам також функцію execute_batch:

Виконує групу виразів з мінімальною кількістю звернень до сервера.

Реалізуємо завантаження з execute_batch:

import psycopg2.extras

@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers)

Результат виконання:

>>> insert_execute_batch(connection, beers)
insert_execute_batch()
Time   3.917
Memory 2.50390625

Результати вражають: функція впоралась майже за 4 секунди, а це приблизно в 33 рази швидше за показник, з якого ми починали.

execute_batch з ітератором

Функція execute_batch використовує менше пам'яті, ніж executemany для однакового обсягу даних. Спробуймо ще покращити цей показник, використавши execute_batch з ітератором:

@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        iter_beers = ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers)

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, iter_beers)

Результати:

>>> insert_execute_batch_iterator(connection, beers)
insert_execute_batch_iterator()
Time   4.333
Memory 0.2265625

Час виконання майже не змінився, проте пам'яті використовується значно менше.

execute_batch з ітератором та page_size

В розділі документації про execute_batch можна натрапити на аргумент page_size.

page_size — максимальна кількість елементів arglist, які можна додати до одного виразу. Якщо елементів більше, функція виконуватиме понад один вираз.

Раніше ми вже дізналися з документації, що продуктивність функції покращується зі зменшенням звернень до БД. Якщо в цьому суть, більший розмір сторінки повинен зменшити кількість таких звернень, а отже — й час завантаження.

Додамо аргумент page_size до нашої функції, щоб ми могли дослідити результат:

@profile
def insert_execute_batch_iterator(
    connection,
    beers: Iterator[Dict[str, Any]],
    page_size: int = 100,
) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        iter_beers = ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers)

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, iter_beers, page_size=page_size)

Значення за замовчуванням для page_size — 100. Протестуємо різні значення та порівняємо результати виконання:

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1)
insert_execute_batch_iterator(page_size=1)
Time   130.2
Memory 0.0

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=100)
insert_execute_batch_iterator(page_size=100)
Time   4.333
Memory 0.0

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1000)
insert_execute_batch_iterator(page_size=1000)
Time   2.537
Memory 0.2265625

>>> insert_execute_batch_iterator(connection, iter(beers), page_size=10000)
insert_execute_batch_iterator(page_size=10000)
Time   2.585
Memory 25.4453125

Ми отримали дещо цікаві результати. Розглянемо кожен окремо:

  • 1: Результат подібний до insert_one_by_one;
  • 100: Значення page_size за замовчуванням, тому результат подібний до нашого попереднього випробування;
  • 1000: Час виконання на 40% кращий і пам'яті використовується небагато;
  • 10000: Показник часу не набагато ліпший за попередній результат, проте показник використання пам'яті на порядок вищий.

Результати показують, що є деякий компроміс між пам'яттю та швидкістю. Здається, найкращим варіантом буде використання значення 1000 для page_size.

execute_values

В документації про execute_values вказано:

Виконує вираз, використовуючи значення з послідовністю параметрів.

Функція execute_values генерує великий список VALUES для запиту.

Спробуймо функцію на практиці:

import psycopg2.extras

@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, [(
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers])

Проаналізуємо результат виконання:

>>> insert_execute_values(connection, beers)
insert_execute_values()
Time   3.666
Memory 4.50390625

Тож одразу з коробки ми отримуємо невелике пришвидшення, якщо порівняти з execute_bulk. Однак показник пам'яті трохи завищений.

execute_values з ітератором

Як і раніше, щоб зменшити використання пам'яті, ми намагаємось уникати збереження даних локально, використовуючи ітератор замість списку:

@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, ((
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers))

Отримаємо такий результат:

>>> insert_execute_values_iterator(connection, beers)
insert_execute_values_iterator()
Time   3.677
Memory 0.0

Час виконання майже той самий, однак використання пам'яті зводиться до 0.

execute_values з ітератором та page_size

Подібно до execute_bulk, execute_values приймає параметр page_size:

@profile
def insert_execute_values_iterator(
    connection,
    beers: Iterator[Dict[str, Any]],
    page_size: int = 100,
) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, ((
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers), page_size=page_size)

Протестуємо на різних значеннях параметра page_size:

>>> insert_execute_values_iterator(connection, iter(beers), page_size=1)
insert_execute_values_iterator(page_size=1)
Time   127.4
Memory 0.0

>>> insert_execute_values_iterator(connection, iter(beers), page_size=100)
insert_execute_values_iterator(page_size=100)
Time   3.677
Memory 0.0

>>> insert_execute_values_iterator(connection, iter(beers), page_size=1000)
insert_execute_values_iterator(page_size=1000)
Time   1.468
Memory 0.0

>>> insert_execute_values_iterator(connection, iter(beers), page_size=10000)
insert_execute_values_iterator(page_size=10000)
Time   1.503
Memory 2.25

Як і у випадку з execute_bulk, ми бачимо компроміс між пам'яттю та швидкістю. Тут також оптимальним рішенням буде значення за замовчуванням 1000. Однак з використанням execute_values наш результат приблизно на 20% швидший за execute_bulk з тим самим значенням параметра page_size.

copy

Офіційна документація PostgreSQL містить цілий розділ про заповнення БД. Відповідно до документації, найкращий спосіб додати дані в БД — використати команду copy.

psycopg реалізовує функціонал copy за допомогою спеціальної функції copy_from. Команда copy вимагає CSV-файл. Тож спробуймо перетворити наші дані на CSV та завантажити їх в БД з copy_from:

import io

def clean_csv_value(value: Optional[Any]) -> str:
    if value is None:
        return r'\\N'
    return str(value).replace('\
', '\\\
')

@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['contributed_by'],
                beer['brewers_tips'],
                beer['volume']['value'],
            ))) + '\
')
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')

Розглянемо послідовно:

  • clean_csv_value: перетворює значення

    Екранування нового рядка: деякі текстові поля містять перехід на новий рядок, тому ми перетворюємо \ -> \\\;

    Пусті значення перетворюються на \\N: рядок \\N використовується PostgreSQL, щоб виявити значення NULL в COPY (можна змінити за допомогою параметра NULL).

  • csv_file_like_object: генерує файловий об'єкт, використовуючи io.StringIO. Об'єкт StringIO містить рядок, який можна використовувати як файл. У нашому випадку це .CSV-файл.

  • csv_file_like_object.write: перетворює елемент на рядок CSV

    Перетворення даних: тут відбуваються перетворення значень first_brewed та volume;

    Встановлення розділювача: деякі поля в наборі даних містять текст с комами. Щоб запобігати конфліктам, ми встановлюємо як розділювач | (можна використовувати параметр QUOTE).

Поглянемо на результати нашої важкої роботи:

>>> copy_stringio(connection, beers)
copy_stringio()
Time   0.6274
Memory 99.109375

Команда copy— найшвидша з тих, що ми робили. З її використанням увесь процес зайняв трохи менше ніж секунду. Однак з погляду використання пам'яті цей метод не такий вже й оптимальний. Функція використовує 99МБ пам'яті, а це в понад два рази більше за розмір JSON-файлу на диску.

copy та рядковий ітератор

Один з основних недоліків використання copy зі StringIO в тому, що весь файл створюється локально. А якби ми могли створити об'єкт, що виконував би роль буфера між віддаленим джерелом та командою COPY? Буфер буде зчитувати JSON через ітератор, очищувати та перетворювати дані, а на виході отримаємо чистий CSV.

Найшвидший спосіб завантажити дані в PostgreSQL за допомогою Python
Копіювання даних з рядкового ітератора (Джерело: yuml.me)

Надихнувшись відповіддю на Stack Overflow, ми створили об'єкт, який отримує дані з ітератора та передбачає інтерфейс, подібний до файлу:

from typing import Iterator, Optional
import io

class StringIteratorIO(io.TextIOBase):
    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self, n: Optional[int] = None) -> str:
        line = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                line.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                line.append(m)
        return ''.join(line)

Розглянемо спочатку, як можна згенерувати зі списку чисел CSV-об'єкт, подібний до файлу:

>>> gen = (f'{i},{i**2}\
' for i in range(3))
>>> gen
<generator object <genexpr> at 0x7f58bde7f5e8>
>>> f = StringIteratorIO(gen)
>>> print(f.read())
0,0
1,1
2,4

Зверніть увагу, що ми використовували f як файл. Він отримував рядки з gen, тільки коли його внутрішній рядковий буфер був пустим.

Завантаження даних з StringIteratorIO матиме такий вигляд:

@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\
'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'beers', sep='|')

Основна відмінність у тому, що CSV-файл використовується за потребою, а дані не зберігаються у пам'яті після того, як були використані.

Проаналізуємо результати виконання функції:

>>> copy_string_iterator(connection, beers)
copy_string_iterator()
Time   0.4596
Memory 0.0

Чудово! Час виконання та використання пам'яті наближуються до нуля.

Копіювання даних з рядкового ітератора з вказаним розміром буфера

У спробах отримати від продуктивності все, звернемо увагу ще на параметр, подібний до page_size, але для команди copy.

size — розмір буфера, який використовується для зчитування з файлу.

Додамо до нашої функції вказаний аргумент:

@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\
'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'beers', sep='|', size=size)

Значення за замовчуванням для size становить 8192, а це 2 ** 13, тому ми й для прикладів будемо використовувати значення, які є ступенями числа 2.

>>> copy_string_iterator(connection, iter(beers), size=1024)
copy_string_iterator(size=1024)
Time   0.4536
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=8192)
copy_string_iterator(size=8192)
Time   0.4596
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=16384)
copy_string_iterator(size=16384)
Time   0.4649
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=65536)
copy_string_iterator(size=65536)
Time   0.6171
Memory 0.0

На відміну від попередніх прикладів, тут немає компромісу між швидкістю та пам'яттю. І зрозуміло, чому так: адже метод був створений саме для того, щоб зовсім не використовувати пам'ять. Однак час все ж відрізняється, коли змінюємо size. Оптимальним варіантом буде значення 8192.

Таблиця результатів

Тож можемо порівняти усі отримані результати:

Функція Час (с) Пам'ять (МБ)
insert_one_by_one() 128.8 0.08203125
insert_executemany() 124.7 2.765625
insert_executemany_iterator() 129.3 0.0
insert_execute_batch() 3.917 2.50390625
insert_execute_batch_iterator(page_size=1) 130.2 0.0
insert_execute_batch_iterator(page_size=100) 4.333 0.0
insert_execute_batch_iterator(page_size=1000) 2.537 0.2265625
insert_execute_batch_iterator(page_size=10000) 2.585 25.4453125
insert_execute_values() 3.666 4.50390625
insert_execute_values_iterator(page_size=1) 127.4 0.0
insert_execute_values_iterator(page_size=100) 3.677 0.0
insert_execute_values_iterator(page_size=1000) 1.468 0.0
insert_execute_values_iterator(page_size=10000) 1.503 2.25
copy_stringio() 0.6274 99.109375
copy_string_iterator(size=1024) 0.4536 0.0
copy_string_iterator(size=8196) 0.4596 0.0
copy_string_iterator(size=16384) 0.4649 0.0
copy_string_iterator(size=65536) 0.6171 0.0

Висновок

Як завжди, у всіх виникає запитання: «Що я повинен використовувати?». І, як завжди, відповідь на це: «Все залежить від ситуації».

Кожен метод має власні переваги та недоліки і пасуватиме до різних ситуацій:

Віддавайте перевагу вбудованим підходам для складних типів даних.

Execute many, execute values та batch турбуються про перетворення типів даних Python в типи даних БД. CSV-підходи потребують додаткової обробки.

Віддавайте перевагу вбудованим підходам для невеликих обсягів даних.

Такі підходи більш читабельні та менш ймовірно, що вийдуть з ладу у майбутньому. Тож якщо пам'ять та час для вас не проблема, keep it simple.

Віддавайте перевагу підходам з copy для великих обсягів даних.

Такий підхід буде вдалим для випадків, коли використання пам'яті може стати проблемою.

Сирцевий код зі статті можна знайти за посиланням.

Помітили помилку? Повідомте автору, для цього достатньо виділити текст з помилкою та натиснути Ctrl+Enter
Codeguida 3.9K
Приєднався: 11 місяців тому
Коментарі (0)

    Ще немає коментарів

Щоб залишити коментар необхідно авторизуватися.

Вхід / Реєстрація