Redis на практичних прикладах

Alex Alex 09 липня 2020
Redis на практичних прикладах
Redis — досить популярний інструмент, який з коробки підтримує велику кількість різних типів даних і методів роботи з ними. У багатьох проєктах він використовується в якості шару кешування, але його можливості набагато ширші. Про деякі цікаві кейси використання цієї in-memory key-value бази даних я розповім на прикладах. Сподіваюся, вам вони будуть корисні, і ви зможете застосувати у своїх проектах.

Розглянемо такі кейси:
  • Кешування даних (так, банально і нудно, але це класний інструмент для кешування і обійти стороною цей кейс, здається буде не правильно)
  • Робота з чергами на базі redis
  • Організація блокувань (mutex)
  • Робимо систему rate-limit
  • Pubsub — робимо розсилки повідомлень на клієнти
Буду працювати з сирими redis командами, щоб не зав'язуватися на яку-небудь конкретну бібліотеку, що надає обгортку над цими командами. Код буду писати на PHP з використанням ext-redis, але він тут для наочності, використовувати представлені підходи можна в зв'язці з будь-якою іншою мовою програмування.

Кешування даних

Давайте почнемо з найпростішого, один з найпопулярніших кейсів використання Redis — кешування даних. Буде корисно для тих, хто не працював з Redis. Для тих, хто вже давно користується цим інструментом можна сміливо переходити до наступного кейсу. Для того, щоб знизити навантаження на БД, мати можливість запитувати часто використовувані дані максимально швидко, використовується кеш. Redis — це in-memory сховище, тобто дані зберігаються в оперативній пам'яті. Ще це key-value сховище, де доступ до даних по їх ключам має складність O(1) — тому дані ми отримуємо дуже швидко.

Отримання даних зі сховища виглядає наступним чином:
public function getValueFromCache(string $key)
{
    return $this->getRedis()->rawCommand('GET', $key);
}
Але для того, щоб дані з кеша отримати, їх потрібно спочатку туди покласти. Простий приклад запису:
public function setValueToCache(string $key, $value)
{
    $this->getRedis()->rawCommand('SET', $key, $value);
} 
Таким чином, ми запишемо дані в Redis і зможемо їх зчитати по тому самому ключу в будь-який потрібний нам момент. Але якщо ми будемо весь час писати у Redis, дані в ньому будуть займати все більше і більше місця в оперативній пам'яті. Нам потрібно видаляти релевантні дані, контролювати це вручну досить проблематично, тому нехай redis займається цим самостійно. Додамо до нашого ключу TTL (час життя ключа):
public function setValueToCache(string $key, $value, int $ttl = 3600)
{
    $this->getRedis()->rawCommand('SET', $key, $value, 'EX', $ttl);
}
Після закінчення часу ttl (в секундах) дані по цьому ключу будуть автоматично видалені.

Як кажуть, в програмуванні існує дві найбільш складних речі: придумування назв змінних та інвалідація кеша. Для того, щоб примусово видалити значення з Redis по ключу, достатньо виконати наступну команду:
public function dropValueFromCache(string $key)
{
    $this->getRedis()->rawCommand('DEL', $key);
}
Також Redis дозволяє отримати масив значень за списком ключів:
public function getValuesFromCache(array $keys)
{
    return $this->getRedis()->rawCommand('MGET', ...$keys);
}
І відповідно масове видалення даних по масиву ключів:
public function dropValuesFromCache(array $keys)
{
    $this->getRedis()->rawCommand('MDEL', ...$keys);
}

Черги

Використовуючи наявні в Redis структури даних, ми можемо запросто реалізувати стандартні черги FIFO або LIFO. Для цього використовуємо структуру List і методи роботи з нею. Робота з чергами складається з двох основних дій: надіслати завдання в чергу, і взяти завдання з черги. Відправляти завдання в чергу ми можемо з будь-якої частини системи. Отриманням завдання з черги і її обробкою зазвичай займається виділений процес, який називається споживачем (consumer).

Отже, для того, щоб відправити наше завдання в чергу, нам достатньо використовувати наступний метод:
public function pushToQueue(string $queueName, $payload)
{
    $this->getRedis()->rawCommand('RPUSH', $queueName, serialize($payload));
}
Тим самим ми додамо в кінець списку з назвою $queueName якийсь $payload, який може представляти із себе JSON для ініціалізації потрібної нам бізнес-логіки (наприклад дані по грошовій транзакції, дані для ініціалізації відправки списка користувачу, etc.). Якщо ж в нашому сховищі не існує листа з ім'ям $queueName, він буде автоматично створений і туди потрапить перший елемент $payload.

З боку консьюмера нам необхідно забезпечити отримання завдань з черги, це реалізується простий командою читання зі списку. Для реалізації черги FIFO ми використовуємо читання із зворотного запису сторони (у нашому випадку ми писали через RPUSH), тобто читати будемо через LPOP:
public function popFromQueue(string $queueName)
{
    return $this->getRedis()->rawCommand('LPOP', $queueName);
}
Для реалізації LIFO черги, нам потрібно буде читати список з тієї ж сторони, з якої ми в нього пишемо, тобто через RPOP.
Redis на практичних прикладах
Тим самим ми вичитуємо по одному повідомленню з черги. У разі якщо списку не існує (він порожній), то ми отримаємо NULL. Каркас консьюмера міг би виглядати так:
class Consumer {

    private string $queueName;

    public function __construct(string $queueName)
    {
        $this->queueName = $queueName;
    }

    public function run()
    {
        while (true) { //Вичитуєму в нескінченному циклі нашу чергу
            $payload = $this->popFromQueue();
            if ($payload === null) { //Якщо ми отримали NULL, значить черга порожня, зробимо невеличку паузу в очікуванні нових повідомлень
                sleep(1);
                continue;
            }
            //Якщо черга не порожня і ми отримали $payload, то запускаємо обробку цього $payload
            $this->process($payload);
        }
    }

    private function popFromQueue()
    {
        return $this->getRedis()->rawCommand('LPOP', $this->queueName);
    }
}
Для того, щоб отримати інформацію про глибину черги (скільки значень зберігається в нашому листі), можемо скористатися наступною командою:
public function getQueueLength(string $queueName)
{
    return $this->getRedis()->rawCommand('LLEN', $queueName);
}
Ми розглянули базову реалізацію простих черг, але Redis дозволяє будувати складніші черги. Наприклад, ми хочемо знати про час останньої активності наших користувачів на сайті. Нам не важливо знати з точністю аж до секунди, прийнятна похибка — 3 хвилини. Ми можемо оновлювати поле last_visit користувача при кожному запиті на наш бекенд від цього користувача. Але якщо цих користувачів велика кількість в онлайні — 10,000 або 100,000? А якщо у нас ще і SPA, яке відправляє багато асинхронних запитів? Якщо на кожен такий запит оновлювати поле в бд, ми отримаємо велику кількість тупих запитів до нашої БД. Цю задачу можна вирішувати різними способами, один з варіантів — це зробити якусь відкладену чергу, в рамках якої ми будемо об'єднувати однакові завдання в одне в певний проміжок часу. Тут на допомогу нам прийде така структура, як Sorted SET. Це зважена множина, кожен елемент якого має свою вагу (score). А що якщо в якості score ми будемо використовувати timestamp додавання елемента до цього sorted set? Тоді ми зможемо організувати чергу, в якій можна буде відкладати деякі події на певний час. Для цього використовуємо наступну функцію:
public function pushToDelayedQueue(string $queueName, $payload, int $delay = 180)
{
    $this->getRedis()->rawCommand('ZADD', $queueName, 'NX', time() + $delay, serialize($payload))
}
У такій схемі ідентифікатор користувача, що зайшов на сайт, потрапить в чергу $queueName і буде висіти там протягом 180 секунд. Всі інші запити в рамках цього часу будуть також вирушати в цю чергу, але вони не будуть туди додані, оскілки ідентифікатор користувача вже існує у цій черзі і продубльований він не буде (за це відповідає параметр 'NX'). Так ми відсікаємо все зайве навантаження і кожен користувач буде генерувати не більше одного запиту в 3 хвилини на оновлення поля last_visit.

Тепер виникає питання про те, як читати цю чергу. Якщо методи LPOP і RPOP для списка читають значення і видаляють його зі списка атомарно (це означає, що одне і те саме значення не може бути взято кількома споживачами), то sorted set такого методу з коробки не має. Ми можемо зробити читання і видалення елемента тільки двома послідовними командами. Але ми можемо виконати ці команди атомарно, використовуючи простий LUA скрипт!
public function popFromDelayedQueue(string $queueName)
{
    $command = 'eval "
        local val = redis.call(\'ZRANGEBYSCORE\', KEYS[1], 0, ARGV[1], \'LIMIT\', 0, 1)[1]
        if val then
            redis.call(\'ZREM\', KEYS[1], val)
        end
        return val"
';
    return $this->getRedis()->rawCommand($command, 1, $queueName, time());
}
У цьому LUA скрипті ми намагаємося отримати перше значення з вагою в діапазоні від 0 до поточного timestamp змінну val з допомогою команди ZRANGEBYSCORE, якщо нам вдалося отримати це значення, то видаляємо його з sorted set командою ZREM і повертаємо саме значення val. Всі ці операції виконуються атомарно. Таким чином ми можемо вичитувати нашу чергу в споживачі, аналогічно з прикладом черги побудованої на структурі LIST.

Я розповів про кілька базових патернів черг, реалізованих в нашій системі. На поточний момент у нас в продакшені існують складніші механізми побудови черг — лінійних, складових, шардінгових. При цьому Redis дозволяє все це робити за допомогою кмітливості та готових круто працюючих структур з коробки, без складного програмування.

Блокування (Mutex)

Mutex (блокування) — це механізм синхронізації доступу до shared ресурсу декількох процесів, тим самим гарантуючи, що тільки один процес буде взаємодіяти з цим ресурсом в одиницю часу. Цей механізм часто застосовується в біллінгу та інших системах, де важливо дотримуватися потокової безпеки (thread safety).

Для реалізації mutex на базі Redis чудово підійде стандартний метод SET з додатковими параметрами:
public function lock(string $key, string $hash, int $ttl = 10): bool
{
    return (bool)$this->getRedis()->rawCommand('SET', $key, $hash, 'NX', 'EX', $ttl);
}
де параметрами для установки mutex є:
  • $key — ключ ідентифікує mutex;
  • $hash — генеруємо підпис, який ідентифікує того, хто поставив mutex. Ми ж не хочемо, щоб хтось в іншому місці випадково зняв блокування і вся наша логіка розсипалася.
  • $ttl — час в секундах, який ми відводимо на блокування (на той випадок, якщо щось піде не так, наприклад процес, який поставив блокування, з якоїсь причини помер і не зняв його, щоб це блокування не висіло нескінченно).
Основна відмінність від методу SET, що використовує механізм кешування — це параметр NX, який говорить Redis про те, що значення, яке зберігається в Redis по ключу $key, не буде записано повторно. У результаті, якщо в Redis немає значення по ключу $key, туди проводиться запис і у відповіді ми отримаємо 'OK', якщо значення по ключу вже є в Redis, воно не буде туди додано (оновлено) і у відповіді ми отримаємо NULL. Результат методу lock(): bool, де true – блокування поставлена, false – вже є активна блокування, створити нову неможливо.

Найчастіше, коли ми пишемо код, який намагається працювати з shared ресурсом, який заблокований, ми хочемо дочекатися його розблокування і продовжити роботу з цим ресурсом. Для цього можемо реалізувати простий метод для очікування вивільненого ресурсу:
public function tryLock(string $key, string $hash, int $timeout, int $ttl = 10): bool
{
     $startTime = microtime(true);
     while (!this->lock($key, $hash, $ttl)) {
         if ((microtime(true) - $startTime) > $timeout) {
             return false; // не вдалося взяти shared ресурс під блокування за вказаний $timeout
         }
         usleep(500 * 1000) //чекаємо 500 мілісекунд до наступної спроби поставити блокування
     }
     return true; //блокування успішно поставлене
}
Ми розібралися як ставити блокування, тепер нам треба навчитися його знімати. Для того, щоб гарантувати зняття блокування тим процесом, який його встановив, нам знадобиться перед видаленням значення зі сховища Redis, звірити чи зберігається хеш по цьому ключу. Для того, щоб зробити це атомарно, скористаємося LUA скриптом:
public function releaseLock(string $key, string $hash): bool
{
    $command = 'eval "
    if redis.call("GET",KEYS[1])==ARGV[1] then
        return redis.call("DEL",KEYS[1])
    else
        return 0
    end"
';
     return (bool) $this->getRedis()->rawCommand($command, 1, $key, $hash);
}
Тут ми намагаємося знайти за допомогою команди GET значення по ключу $key, якщо воно дорівнює значенню $hash, то видаляємо його за допомогою команди DELETE, яка поверне нам кількість віддалених ключів, якщо ж значення по ключу $key не існує, або вона не дорівнює значенню $hash, то ми повертаємо 0, що означає блокування зняти не вдалося. Базовий приклад використання mutex:
class Billing {
    public function charge(int $userId, int $amount)
    {
        $mutexName = sprintf('billing_%d', $userId);
        $hash = sha1(sprintf('billing_%d_%d'), $userId, mt_rand()); //генеруємо якийсь хеш запущеного потоку
        if (!$this->tryLock($mutexName, $hash, 10)) { //намагаємося поставити блокування протягом 10 секунд
            throw new Exception('Не вийшло поставити lock, shared ресурс зайнятий");
        }
        //lock отримано, виконуємо бізнес-логіку
        $this->doSomeLogick();
        //звільняємо shared ресурс, знімаємо блокування
        $this->releaseLock($mutexName, $hash);
    }
}

Rate limiter

Досить часте завдання, коли ми хочемо обмежити кількість запитів до нашого апі. Наприклад на один API endpoint від одного аккаунта ми хочемо приймати не більше 100 запитів в хвилину. Ця задача легко вирішується за допомогою нашого улюбленого Redis:
public function isLimitReached(string $method, int $userId, int $limit): bool
{
     $currentTime = time();
     $timeWindow = $currentTime - ($currentTime % 60); //Так як наш rate limit має обмеження 100 запитів в хвилину, 
    //то округляємо поточний timestamp до початку хвилини — це буде частиною нашого ключа, //за яким ми будемо вважати кількість запитів
     $key = sprintf('api_%s_%d_%d', $method, $userId, $timeWindow); //генеруємо ключ для лічильника, відповідно кожну хвилину він буде мінятися виходячи з $timeWindow
     $count = $this->getRedis()->rawCommand('INCR', $key); //метод INCR збільшує значення за вказаною ключу, і повертає нове значення. 
    //Якщо ключа не існує, він буде ініційований зі значенням 0 і після цього збільшений
     $this->getRedis()->rawCommand('EXPIRE', $key, 60); // Оновлюємо TTL нашому ключу, виставляючи його в хвилину, для того, щоб не накопичувати не актуальні дані
     if ($count > $limit) { //limit досягнутий
          return true;
    }
    return false;
} 
Таким простим методом ми можемо лімітувати кількість запитів до нашого API, базовий каркас нашого контролера міг би виглядати наступним чином:
class FooController {

     public function actionBar()
    {
        if ($this->isLimitReached(__METHOD__, $this->getUserId(), 100)) {
             throw new Exception('API method max limit reached');
        }
        $this->doSomeLogick();
    }
}

Pub/sub

Pub/sub — цікавий механізм, який дозволяє, з одного боку, підписатися на канал і отримувати повідомлення з нього, з іншого боку — відправляти в цей канал повідомлення, яке буде отримано усіма підписниками. Напевно у багатьох, хто працював з вебсокетами, виникла аналогія з цим механізмом, вони дійсно дуже схожі. Механізм pub/sub не гарантує доставки повідомлень, він не гарантує консистентності, тому не варто його використовувати в системах, для яких важливі ці критерії. Проте розглянемо цей механізм на практичному прикладі. Припустимо, що у нас є велика кількість демонізованих команд, якими ми хочемо централізовано керувати. При ініціалізації нашої команди ми підписуємося на канал, через який будемо отримувати повідомлення з інструкціями. З іншого боку у нас є керуючий скрипт, який відправляє повідомлення з інструкцій у зазначений канал. На жаль, стандартний PHP працює в одному блокувальному потоці; для того, щоб реалізувати задумане, використовуємо ReactPHP і реалізований під нього клієнт Redis.

Підписка на канал:
class FooDaemon {

     private $throttleParam = 10;

     public function run()
    {
         $loop = React\EventLoop\Factory::create(); //ініціалізуємо event-loop ReactPHP
         $redisClient = $this->getRedis($loop); //ініціалізуємо клієнта Redis для ReactPHP
         $redisClient->subscribe(__CLASS__); // підписуємося на потрібний нам канал в Redis, у нашому прикладі назву каналу відповідає назві класу
         $redisClient->on('message', static function($channel, $payload) { //слухаємо події message, при виникненні такої події, отримуємо channel і payload
             switch (true) { // Тут може бути будь-яка логіка обробки повідомлень, в якості прикладу нехай буде так:
                 case \is_int($payload): //Якщо до нас прийшло число – оновимо параметр $throttleParam на отримане значення
                     $this->throttleParam = $payload;
                     break;
                 case $payload === 'exit': //Якщо до нас прийшла команда 'exit' – завершимо виконання скрипта
                    exit;
                 default: //Якщо прийшло щось інше, то просто залогируем це
                    $this->log($payload);
                    break;
            }
        });

         $loop->addPeriodicTimer(0, function() {
             $this->doSomeLogick(); // Тут в нескінченному циклі може виконуватися якась логіка, наприклад читання задач з черги і їх обробка
        });

         $loop->run(); //Запускаємо наш event-loop
    }
}
Відправлення повідомлення в канал — простіша дія, ми можемо зробити це абсолютно з будь-якого місця системи однією командою:
public function publishMessage($channel, $message)
{
       $this->getRedis()->publish($channel, $message);
}
В результаті такого надсилання повідомлення в канал, всі клієнти, які підписані на даний канал, отримають це повідомлення.
Redis на практичних прикладах

Підсумок

Ми розглянули 5 прикладів використання Redis на практиці, сподіваюся, що кожен знайде для себе щось цікаве. 

Коментарі (0)

    Ще немає коментарів

Щоб залишити коментар необхідно авторизуватися.