Згадайте, як часто вам доводилося завантажувати дані зі стороннього ресурсу до власного проєкту. Якщо ви щасливчик, то ці дані серіалізовані як JSON або YAML. Якщо ні, то отримуєте Excel-таблицю або CSV-файл, який обов'язково (незрозуміло, чому) повинен бути некоректним.
Дані великих компаній або старих систем завжди якось дивно закодовані, заархівовані або розбиті на менші файли з рандомними назвами.
Сучасні сервіси можуть, звичайно, пропонувати хороший API, однак нам не часто потрібно отримувати файл з FTP, SFTP, S3 або якогось застарілого сховища, яке працює лише на Windows.
У статті ми розглянемо найкращий спосіб імпортувати великий обсяг даних з віддаленого джерела в PostgreSQL БД.
Спочатку про передумови:
- Дані отримуються з віддаленого джерела.
- Дані — неформатовані, потребують перетворень.
- Великий обсяг даних.
Налаштування
Для прикладу будемо використовувати дані з API за посиланням: ними й заповнимо таблицю в базі даних.
$ curl https://api.punkapi.com/v2/beers/?per_page=1&page=1
[
{
"id": 1,
"name": "Buzz",
"tagline": "A Real Bitter Experience.",
"first_brewed": "09/2007",
"description": "A light, crisp and bitter IPA ...",
"image_url": "https://images.punkapi.com/v2/keg.png",
"abv": 4.5,
"ibu": 60,
"target_fg": 1010,
"target_og": 1044,
"ebc": 20,
"srm": 10,
"ph": 4.4,
"attenuation_level": 75,
"volume": {
"value": 20,
"unit": "litres"
},
"contributed_by": "Sam Mason <samjbmason>"
"brewers_tips": "The earthy and floral aromas from...",
"boil_volume": {},
"method": {},
"ingredients": {},
"food_pairing": [],
}
]
У статті ми будемо імпортувати усі рядки до brewers_tips
в таблицю бази даних.
Поле volume
— вкладене. Нам необхідно отримати лише value
з поля та зберегти його в таблиці в колонці volume
.
volume = beer['volume']['data']
Поле first_brewed
містить лише рік та місяць, а в деяких випадках лише рік. Нам необхідно перетворити це значення на валідну дату. Наприклад, значення 09/2007
буде перетворено на дату 2007-09-01
. Значення 2006
— на 2016-01-01
.
Напишемо просту функцію, щоб перетворити текстові значення в datetime.date
у Python:
import datetime
def parse_first_brewed(text: str) -> datetime.date:
parts = text.split('/')
if len(parts) == 2:
return datetime.date(int(parts[1]), int(parts[0]), 1)
elif len(parts) == 1:
return datetime.date(int(parts[0]), 1, 1)
else:
assert False, 'Unknown date format'
Переконаємось, що все працює:
>>> parse_first_brewed('09/2007')
datetime.date(2007, 9, 1)
>>> parse_first_brewed('2006')
datetime.date(2006, 1, 1)
В реальних ситуаціях перетворення можуть бути набагато складнішими. Але для демонстрації цього достатньо.
Отримуємо дані
API вказує дані посторінково. Щоб інкапсулювати цю логіку, ми створимо генератор, який видаватиме елемент один за одним:
from typing import Iterator, Dict, Any
from urllib.parse import urlencode
import requests
def iter_beers_from_api(page_size: int = 5) -> Iterator[Dict[str, Any]]:
session = requests.Session()
page = 1
while True:
response = session.get('https://web.archive.org/web/20230610000809/https://api.punkapi.com/v2/beers?' + urlencode({
'page': page,
'per_page': page_size
}))
response.raise_for_status()
data = response.json()
if not data:
break
yield from data
page += 1
Подивимось на нашу функцію-генератор у дії:
>>> beers = iter_beers_from_api()
>>> next(beers)
{'id': 1,
'name': 'Buzz',
'tagline': 'A Real Bitter Experience.',
'first_brewed': '09/2007',
'description': 'A light, crisp and bitter IPA brewed...',
'image_url': 'https://web.archive.org/web/20230610000809/https://images.punkapi.com/v2/keg.png',
'abv': 4.5,
'ibu': 60,
'target_fg': 1010,
...
}
>>> next(beers)
{'id': 2,
'name': 'Trashy Blonde',
'tagline': "You Know You Shouldn't",
'first_brewed': '04/2008',
'description': 'A titillating, ...',
'image_url': 'https://web.archive.org/web/20230610000809/https://images.punkapi.com/v2/2.png',
'abv': 4.1,
'ibu': 41.5,
Ви можете помітити, що перший результат кожної сторінки отримується найдовше. Усе тому, що здійснюється запит для отримання сторінки.
Створюємо таблицю в БД
Наступний крок — створити таблицю, куди ми будемо імпортувати дані.
Створюємо саму БД:
$ createdb -O haki testload
Замініть haki
на вашого локального користувача.
Щоб з'єднатися з PostgreSQL базою даних з Python, ми можемо використовувати pycopg:
python -m pip install psycopg2
За допомогою psycopg створюємо з'єднання з БД:
import psycopg2
connection = psycopg2.connect(
host="localhost",
database="testload",
user="haki",
password=None,
)
connection.autocommit = True
Ми встановлюємо autocommit=True
, щоб кожна команда, яку ми виконуємо, застосовувалась миттєво. Це саме те, що нам потрібно для прикладу.
Тепер у нас встановлено з'єднання, тож можемо написати функцію для створення таблиці:
def create_staging_table(cursor) -> None:
cursor.execute("""
DROP TABLE IF EXISTS staging_beers;
CREATE UNLOGGED TABLE staging_beers (
id INTEGER,
name TEXT,
tagline TEXT,
first_brewed DATE,
description TEXT,
image_url TEXT,
abv DECIMAL,
ibu DECIMAL,
target_fg DECIMAL,
target_og DECIMAL,
ebc DECIMAL,
srm DECIMAL,
ph DECIMAL,
attenuation_level DECIMAL,
brewers_tips TEXT,
contributed_by TEXT,
volume INTEGER
);
""")
Функція отримує курсор та створює unlogged-таблицю staging_beers
.
Дані, додані до unlogged таблиці, не будуть логовані до журналу випереджального запису (write-ahead-log, WAL). А це ідеальний варіант для проміжних таблиць Зверніть увагу, що UNLOGGED
-таблиці не будуть відновлені у разі збою.
Використовуючи з'єднання, яке ми створили раніше, викличемо нашу функцію:
>>> with connection.cursor() as cursor:
>>> create_staging_table(cursor)
Тепер ми готові рухатись далі.
Показники
В матеріалі нас цікавитимуть два основні показники: час та пам'ять.
Вимірювання часу
Щоб визначити час виконання кожного методу, ми можемо використати вбудований модуль time
:
>>> import time
>>> start = time.perf_counter()
>>> time.sleep(1) # do work
>>> elapsed = time.perf_counter() - start
>>> print(f'Time {elapsed:0.4}')
Time 1.001
Функція perf_counter
ідеально підійде для виміру часу виконання наших методів.
Вимірювання пам'яті
Визначити обсяг використаної пам'яті нам допоможе пакет memory-profiler.
$ python -m pip install memory-profiler
З цим пакетом ми можемо визначити як звичайне, так і додаткове використання пам'яті для кожного рядка коду. Саме така інформація стане нам у пригоді для оптимізації. Звернемося до прикладу PyPI:
$ python -m memory_profiler example.py
Line # Mem usage Increment Line Contents
==============================================
3 @profile
4 5.97 MB 0.00 MB def my_func():
5 13.61 MB 7.64 MB a = [1] * (10 ** 6)
6 166.20 MB 152.59 MB b = [2] * (2 * 10 ** 7)
7 13.61 MB -152.59 MB del b
8 13.61 MB 0.00 MB return a
Варто звернути увагу на колонку Increment
, яка демонструє додаткову пам'ять, виділену кодом у кожному рядку.
У статті нас цікавитиме використання функціями «peak memory», тобто різниці між значеннями колонок mem usage
та найвищим значенням.
Щоб отримати значення mem usage
, ми використаємо функцію memory_usage
з memory_profiler
:
>>> from memory_profiler import memory_usage
>>> mem, retval = memory_usage((fn, args, kwargs), retval=True, interval=1e-7)
При такому використанні функція memory_usage
виконує функцію fn
з передбаченими args
та kwargs
, але також запускає інший фоновий процес, щоб контролювати використання пам'яті кожні interval
секунд.
Для дуже швидких операцій, функція fn
може виконатись більше ніж один раз. Якщо як interval
ми встановимо значення менше, ніж 1e-6
, функція виконається лише раз.
Аргумент retval
вказує функції повернути результат fn
.
Декоратор profile
Щоб з'єднати все докупи, ми створюємо декоратор, аби виміряти показники та дати відповідну інформацію про використання пам'яті та час виконання.
import time
from functools import wraps
from memory_profiler import memory_usage
def profile(fn):
@wraps(fn)
def inner(*args, **kwargs):
fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
print(f'\
{fn.__name__}({fn_kwargs_str})')
# Вимірювання часу
t = time.perf_counter()
retval = fn(*args, **kwargs)
elapsed = time.perf_counter() - t
print(f'Time {elapsed:0.4}')
# Вимірювання пам'яті
mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
print(f'Memory {max(mem) - min(mem)}')
return retval
return inner
Щоб уникнути взаємного впливу часу на пам'ять і навпаки, ми виконаємо функцію два рази: один раз для вимірювання часу, другий — для пам'яті.
Декоратор виведе назву функції та будь-які kwargs
, а також відзвітує про використання пам'яті та час.
>>> @profile
>>> def work(n):
>>> for i in range(n):
>>> 2 ** n
>>> work(10)
work()
Time 0.06269
Memory 0.0
>>> work(n=10000)
work(n=10000)
Time 0.3865
Memory 0.0234375
Ми навмисно виводимо тут лише kwargs
, які збираємось використовувати також у параметризованих тестах далі.
Контрольні показники
Коли писалась ця стаття, у API було лише 325 елементів. Щоб отримати більш об'ємні дані, ми продублювали вихідні дані 100 разів та зберегли їх у пам'яті. Отриманий набір даних містить 32 500 елементів:
>> beers = list(iter_beers_from_api()) * 100
>>> len(beers)
32,500
Для імітації стороннього API наші функції прийматимуть ітератори, подібні до поверненого функцією iter_beers_from_api
значення:
def process(beers: Iterator[Dict[str, Any]])) -> None:
# Обробка
Для отримання контрольних показників, ми імпортуємо дані до БД. Щоб усунути вплив зовнішніх факторів (наприклад, мережі), ми отримуємо дані з API заздалегідь та обробляємо їх локально.
Щоб отримати точне значення часу, ми «підробляємо» стороннє API:
>>> beers = list(iter_beers_from_api()) * 100
>>> process(beers)
В реальних ситуаціях ви використаєте функцію iter_beers_from_api
напряму:
>>> process(iter_beers_from_api())
Тепер усе готово, щоб почати тестувати різні методи!
insert_one_by_one
Почнемо з найпростішого прикладу: вставимо рядки один за одним:
@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
for beer in beers:
cursor.execute("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", {
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
})
Зверніть увагу, як під час ітерації елементами, ми перетворюємо first_brewed
на datetime.date
, а також отримуємо значення з вкладеного поля volume
.
Запустивши цю функцію, ми отримаємо такий результат:
>>> insert_one_by_one(connection, beers)
insert_one_by_one()
Time 128.8
Memory 0.08203125
Функції знадобилось 129 секунд, щоб імпортувати 32 тисячі рядків. memory profiler показує, що було використано зовсім небагато пам'яті.
Видається, що вставка рядків один за одним не надто ефективний метод: постійне перемикання між програмою і БД сповільнює цей процес.
executemany
Psycopg2 дозволяє вставляти багато рядків за один раз за допомогою executemany
. Ось що сказано в документації:
Функція виконує операцію БД (запит чи команду) для всіх кортежів параметрів або зіставлень, знайдених у vars_list
.
Спробуймо застосувати цю функцію на практиці:
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
all_beers = [{
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers]
cursor.executemany("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", all_beers)
Все дуже схоже на попередній приклад, особливо всі перетворення даних. Основна відмінність в тому, що ми спочатку робимо всі перетворення в пам'яті, а вже потім імпортуємо їх в БД.
Результат виконання функції:
>>> insert_executemany(connection, beers)
insert_executemany()
Time 124.7
Memory 2.765625
Результати засмучують. Все відпрацювало трохи швидше, ніж у попередньому прикладі, проте використання пам'яті тепер становить 2.7МБ.
Зараз JSON-файл з даними, які ми імпортували, важить 25МБ на диску. З таким підходом файл в 1ГБ потребуватиме 110МБ пам'яті.
executemany
з ітератором
Попередній метод не був оптимальним через використання пам'яті, адже перетворені дані зберігались у пам'яті перед обробкою psycopg.
Перевіримо, чи допоможе ітератор не зберігати дані локально:
@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
cursor.executemany("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers))
Основна відмінність у тому, що перетворені дані переходять потоком в executemany
, використовуючи ітератор.
Після виконання отримуємо такий результат:
>>> insert_executemany_iterator(connection, beers)
insert_executemany_iterator()
Time 129.3
Memory 0.0
Цей метод працює, як і очікувалось, тому нам вдалось звести використання пам'яті до 0. Однак час виконання залишається приблизно таким самим, як і у методі one-by-one
.
execute_batch
Документація psycopg дає таку інформацію про executemany
в розділі «Fast execution helpers»:
Поточна реалізація executemany()
не дуже продуктивна. Така функція може використовуватись для прискорення повторного виконання виразу для певного набору параметрів. Зменшивши кількість звертань до сервера, ви скоріше збільшите продуктивність, ніж якщо будете використовувати executemany()
.
Тож весь цей час ми робили все неправильно!
Але документація пропонує нам також функцію execute_batch
:
Виконує групу виразів з мінімальною кількістю звернень до сервера.
Реалізуємо завантаження з execute_batch
:
import psycopg2.extras
@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
all_beers = [{
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers]
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", all_beers)
Результат виконання:
>>> insert_execute_batch(connection, beers)
insert_execute_batch()
Time 3.917
Memory 2.50390625
Результати вражають: функція впоралась майже за 4 секунди, а це приблизно в 33 рази швидше за показник, з якого ми починали.
execute_batch
з ітератором
Функція execute_batch
використовує менше пам'яті, ніж executemany
для однакового обсягу даних. Спробуймо ще покращити цей показник, використавши execute_batch
з ітератором:
@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
iter_beers = ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers)
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", iter_beers)
Результати:
>>> insert_execute_batch_iterator(connection, beers)
insert_execute_batch_iterator()
Time 4.333
Memory 0.2265625
Час виконання майже не змінився, проте пам'яті використовується значно менше.
execute_batch
з ітератором та page_size
В розділі документації про execute_batch
можна натрапити на аргумент page_size
.
page_size
— максимальна кількість елементів arglist
, які можна додати до одного виразу. Якщо елементів більше, функція виконуватиме понад один вираз.
Раніше ми вже дізналися з документації, що продуктивність функції покращується зі зменшенням звернень до БД. Якщо в цьому суть, більший розмір сторінки повинен зменшити кількість таких звернень, а отже — й час завантаження.
Додамо аргумент page_size
до нашої функції, щоб ми могли дослідити результат:
@profile
def insert_execute_batch_iterator(
connection,
beers: Iterator[Dict[str, Any]],
page_size: int = 100,
) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
iter_beers = ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers)
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", iter_beers, page_size=page_size)
Значення за замовчуванням для page_size
— 100. Протестуємо різні значення та порівняємо результати виконання:
>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1)
insert_execute_batch_iterator(page_size=1)
Time 130.2
Memory 0.0
>>> insert_execute_batch_iterator(connection, iter(beers), page_size=100)
insert_execute_batch_iterator(page_size=100)
Time 4.333
Memory 0.0
>>> insert_execute_batch_iterator(connection, iter(beers), page_size=1000)
insert_execute_batch_iterator(page_size=1000)
Time 2.537
Memory 0.2265625
>>> insert_execute_batch_iterator(connection, iter(beers), page_size=10000)
insert_execute_batch_iterator(page_size=10000)
Time 2.585
Memory 25.4453125
Ми отримали дещо цікаві результати. Розглянемо кожен окремо:
-
1
: Результат подібний доinsert_one_by_one
; -
100
: Значенняpage_size
за замовчуванням, тому результат подібний до нашого попереднього випробування; -
1000
: Час виконання на 40% кращий і пам'яті використовується небагато; -
10000
: Показник часу не набагато ліпший за попередній результат, проте показник використання пам'яті на порядок вищий.
Результати показують, що є деякий компроміс між пам'яттю та швидкістю. Здається, найкращим варіантом буде використання значення 1000
для page_size
.
execute_values
В документації про execute_values
вказано:
Виконує вираз, використовуючи значення з послідовністю параметрів.
Функція execute_values
генерує великий список VALUES
для запиту.
Спробуймо функцію на практиці:
import psycopg2.extras
@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", [(
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers])
Проаналізуємо результат виконання:
>>> insert_execute_values(connection, beers)
insert_execute_values()
Time 3.666
Memory 4.50390625
Тож одразу з коробки ми отримуємо невелике пришвидшення, якщо порівняти з execute_bulk
. Однак показник пам'яті трохи завищений.
execute_values
з ітератором
Як і раніше, щоб зменшити використання пам'яті, ми намагаємось уникати збереження даних локально, використовуючи ітератор замість списку:
@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", ((
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers))
Отримаємо такий результат:
>>> insert_execute_values_iterator(connection, beers)
insert_execute_values_iterator()
Time 3.677
Memory 0.0
Час виконання майже той самий, однак використання пам'яті зводиться до 0.
execute_values
з ітератором та page_size
Подібно до execute_bulk
, execute_values
приймає параметр page_size
:
@profile
def insert_execute_values_iterator(
connection,
beers: Iterator[Dict[str, Any]],
page_size: int = 100,
) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", ((
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers), page_size=page_size)
Протестуємо на різних значеннях параметра page_size
:
>>> insert_execute_values_iterator(connection, iter(beers), page_size=1)
insert_execute_values_iterator(page_size=1)
Time 127.4
Memory 0.0
>>> insert_execute_values_iterator(connection, iter(beers), page_size=100)
insert_execute_values_iterator(page_size=100)
Time 3.677
Memory 0.0
>>> insert_execute_values_iterator(connection, iter(beers), page_size=1000)
insert_execute_values_iterator(page_size=1000)
Time 1.468
Memory 0.0
>>> insert_execute_values_iterator(connection, iter(beers), page_size=10000)
insert_execute_values_iterator(page_size=10000)
Time 1.503
Memory 2.25
Як і у випадку з execute_bulk
, ми бачимо компроміс між пам'яттю та швидкістю. Тут також оптимальним рішенням буде значення за замовчуванням 1000
. Однак з використанням execute_values
наш результат приблизно на 20% швидший за execute_bulk
з тим самим значенням параметра page_size
.
copy
Офіційна документація PostgreSQL містить цілий розділ про заповнення БД. Відповідно до документації, найкращий спосіб додати дані в БД — використати команду copy
.
psycopg реалізовує функціонал copy
за допомогою спеціальної функції copy_from
. Команда copy
вимагає CSV-файл. Тож спробуймо перетворити наші дані на CSV та завантажити їх в БД з copy_from
:
import io
def clean_csv_value(value: Optional[Any]) -> str:
if value is None:
return r'\\N'
return str(value).replace('\
', '\\\
')
@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
csv_file_like_object = io.StringIO()
for beer in beers:
csv_file_like_object.write('|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['contributed_by'],
beer['brewers_tips'],
beer['volume']['value'],
))) + '\
')
csv_file_like_object.seek(0)
cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
Розглянемо послідовно:
-
clean_csv_value
: перетворює значенняЕкранування нового рядка: деякі текстові поля містять перехід на новий рядок, тому ми перетворюємо
\ -> \\\
;Пусті значення перетворюються на
\\N
: рядок\\N
використовується PostgreSQL, щоб виявити значенняNULL
вCOPY
(можна змінити за допомогою параметраNULL
). -
csv_file_like_object
: генерує файловий об'єкт, використовуючиio.StringIO
. Об'єктStringIO
містить рядок, який можна використовувати як файл. У нашому випадку це.CSV
-файл. -
csv_file_like_object.write
: перетворює елемент на рядок CSVПеретворення даних: тут відбуваються перетворення значень
first_brewed
таvolume
;Встановлення розділювача: деякі поля в наборі даних містять текст с комами. Щоб запобігати конфліктам, ми встановлюємо як розділювач
|
(можна використовувати параметрQUOTE
).
Поглянемо на результати нашої важкої роботи:
>>> copy_stringio(connection, beers)
copy_stringio()
Time 0.6274
Memory 99.109375
Команда copy
— найшвидша з тих, що ми робили. З її використанням увесь процес зайняв трохи менше ніж секунду. Однак з погляду використання пам'яті цей метод не такий вже й оптимальний. Функція використовує 99МБ пам'яті, а це в понад два рази більше за розмір JSON-файлу на диску.
copy
та рядковий ітератор
Один з основних недоліків використання copy
зі StringIO
в тому, що весь файл створюється локально. А якби ми могли створити об'єкт, що виконував би роль буфера між віддаленим джерелом та командою COPY
? Буфер буде зчитувати JSON через ітератор, очищувати та перетворювати дані, а на виході отримаємо чистий CSV.
Надихнувшись відповіддю на Stack Overflow, ми створили об'єкт, який отримує дані з ітератора та передбачає інтерфейс, подібний до файлу:
from typing import Iterator, Optional
import io
class StringIteratorIO(io.TextIOBase):
def __init__(self, iter: Iterator[str]):
self._iter = iter
self._buff = ''
def readable(self) -> bool:
return True
def _read1(self, n: Optional[int] = None) -> str:
while not self._buff:
try:
self._buff = next(self._iter)
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret):]
return ret
def read(self, n: Optional[int] = None) -> str:
line = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
line.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
line.append(m)
return ''.join(line)
Розглянемо спочатку, як можна згенерувати зі списку чисел CSV-об'єкт, подібний до файлу:
>>> gen = (f'{i},{i**2}\
' for i in range(3))
>>> gen
<generator object <genexpr> at 0x7f58bde7f5e8>
>>> f = StringIteratorIO(gen)
>>> print(f.read())
0,0
1,1
2,4
Зверніть увагу, що ми використовували f
як файл. Він отримував рядки з gen
, тільки коли його внутрішній рядковий буфер був пустим.
Завантаження даних з StringIteratorIO
матиме такий вигляд:
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
beers_string_iterator = StringIteratorIO((
'|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']).isoformat(),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
))) + '\
'
for beer in beers
))
cursor.copy_from(beers_string_iterator, 'beers', sep='|')
Основна відмінність у тому, що CSV-файл використовується за потребою, а дані не зберігаються у пам'яті після того, як були використані.
Проаналізуємо результати виконання функції:
>>> copy_string_iterator(connection, beers)
copy_string_iterator()
Time 0.4596
Memory 0.0
Чудово! Час виконання та використання пам'яті наближуються до нуля.
Копіювання даних з рядкового ітератора з вказаним розміром буфера
У спробах отримати від продуктивності все, звернемо увагу ще на параметр, подібний до page_size
, але для команди copy
.
size
— розмір буфера, який використовується для зчитування з файлу.
Додамо до нашої функції вказаний аргумент:
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
beers_string_iterator = StringIteratorIO((
'|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']).isoformat(),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
))) + '\
'
for beer in beers
))
cursor.copy_from(beers_string_iterator, 'beers', sep='|', size=size)
Значення за замовчуванням для size
становить 8192, а це 2 ** 13
, тому ми й для прикладів будемо використовувати значення, які є ступенями числа 2.
>>> copy_string_iterator(connection, iter(beers), size=1024)
copy_string_iterator(size=1024)
Time 0.4536
Memory 0.0
>>> copy_string_iterator(connection, iter(beers), size=8192)
copy_string_iterator(size=8192)
Time 0.4596
Memory 0.0
>>> copy_string_iterator(connection, iter(beers), size=16384)
copy_string_iterator(size=16384)
Time 0.4649
Memory 0.0
>>> copy_string_iterator(connection, iter(beers), size=65536)
copy_string_iterator(size=65536)
Time 0.6171
Memory 0.0
На відміну від попередніх прикладів, тут немає компромісу між швидкістю та пам'яттю. І зрозуміло, чому так: адже метод був створений саме для того, щоб зовсім не використовувати пам'ять. Однак час все ж відрізняється, коли змінюємо size
. Оптимальним варіантом буде значення 8192
.
Таблиця результатів
Тож можемо порівняти усі отримані результати:
Функція | Час (с) | Пам'ять (МБ) |
---|---|---|
insert_one_by_one() |
128.8 | 0.08203125 |
insert_executemany() |
124.7 | 2.765625 |
insert_executemany_iterator() |
129.3 | 0.0 |
insert_execute_batch() |
3.917 | 2.50390625 |
insert_execute_batch_iterator(page_size=1) |
130.2 | 0.0 |
insert_execute_batch_iterator(page_size=100) |
4.333 | 0.0 |
insert_execute_batch_iterator(page_size=1000) |
2.537 | 0.2265625 |
insert_execute_batch_iterator(page_size=10000) |
2.585 | 25.4453125 |
insert_execute_values() |
3.666 | 4.50390625 |
insert_execute_values_iterator(page_size=1) |
127.4 | 0.0 |
insert_execute_values_iterator(page_size=100) |
3.677 | 0.0 |
insert_execute_values_iterator(page_size=1000) |
1.468 | 0.0 |
insert_execute_values_iterator(page_size=10000) |
1.503 | 2.25 |
copy_stringio() |
0.6274 | 99.109375 |
copy_string_iterator(size=1024) |
0.4536 | 0.0 |
copy_string_iterator(size=8196) |
0.4596 | 0.0 |
copy_string_iterator(size=16384) |
0.4649 | 0.0 |
copy_string_iterator(size=65536) |
0.6171 | 0.0 |
Висновок
Як завжди, у всіх виникає запитання: «Що я повинен використовувати?». І, як завжди, відповідь на це: «Все залежить від ситуації».
Кожен метод має власні переваги та недоліки і пасуватиме до різних ситуацій:
Віддавайте перевагу вбудованим підходам для складних типів даних.
Execute many
, execute values
та batch
турбуються про перетворення типів даних Python в типи даних БД. CSV-підходи потребують додаткової обробки.
Віддавайте перевагу вбудованим підходам для невеликих обсягів даних.
Такі підходи більш читабельні та менш ймовірно, що вийдуть з ладу у майбутньому. Тож якщо пам'ять та час для вас не проблема, keep it simple.
Віддавайте перевагу підходам з copy
для великих обсягів даних.
Такий підхід буде вдалим для випадків, коли використання пам'яті може стати проблемою.
Сирцевий код зі статті можна знайти за посиланням.
Ще немає коментарів