Обробляємо замовлення з інтернет магазину за допомогою RabbitMQ і TypeScript

Обробляємо замовлення з інтернет магазину за допомогою RabbitMQ і TypeScript
23 хв. читання
06 жовтня 2019

Популярність інтернет комерції зростає все більше, як і частка інформатизації всіх суміжних з торгівлею видів діяльності. Разом з цим зростає і складність обробки інформації. Кожне замовлення, зроблений клієнтом інтернет магазину, породжує за собою велику кількість інтеграцій з різними сервісами. Такими сервісами можуть бути сервіси обробки платежів, доставки, системи обліку і лояльності. Кожне замовлення має бути оплачений, врахований, зібраний і доставлений, а також доступний для подальшого аналізу. Цю, і так не просту ситуацію, ускладнює і той факт, що користувач інтернет магазину не хоче довго і болісно чогось чекати при оформленні замовлення. Відгук від інтернет магазину повинен бути швидким, адже кожна мілісекунда затримки збільшує шанс втрати клієнта, а в наслідку і прибутку. У цій статті я хочу розповісти про брокер повідомлень RabbitMQ і як з його допомогою можна організувати процес обробки замовлень використовуючи Node.js і TypeScript. Ласкаво просимо під кат.

Необхідна теорія

Думаю, багато хто чув про RabbitMQ, адже перша open source версія цього брокера повідомлень, заснованого на протоколі AMQP, була випущена аж в 2007 році. Брокер повідомлень необхідний для зв'язку різних компонентів системи в єдине ціле, як клей необхідний для реанімації розбитою вази. За допомогою брокера повідомлень можна реалізувати асинхронну обробку подій, що надходять в систему. Якраз така асинхронна обробка замовлень і потрібна інтернет магазину. Але для початку необхідно розібратися з основними компонентами RabbitMQ. У цього брокера є три основних компоненти, за допомогою яких ми будемо вибудовувати процес обробки:

Message. Це мінімальна одиниця інформації в рамках брокера повідомлень і нашого сервісу обробки, яка може бути оброблена. Сам RabbitMQ зберігає повідомлення в бінарному вигляді, але для нашої системи і для статті це не важливо. Повідомлення ми будемо приймати і обробляти у вигляді JSON. Так само варто згадати, що повідомлення в RabbitMQ мають заголовки. Вони схожі з заголовками http запитів. Це асоціативний масив, в який можна записати необхідну інформацію.

Message queue. Це чергу, в якій RabbitMQ зберігає повідомлення. На чергу повідомлень можуть бути підписані один або кілька споживачів (consumer). Кожне повідомлення з черги rabbit розподіляє по споживачах, використовуючи алгоритм round-robin.

Exchange. Це, як зрозуміло з назви, точка обміну. До цієї точки можуть бути прив'язані черзі або інші обмінники. Точка обміну не зберігає повідомлення, основна її функція - це маршрутизація повідомлень в одну або кілька черг, або такі ж точки обміну. Кожна чергу або обмінник прив'язується по ключу маршрутизації (routing key). У RabbitMQ є кілька різних типів обмінників, які впливають на те, як саме exchange буде маршрутизировать яке надійшло в нього повідомлення.

Для того, щоб описати, як працюють різні типи обмінників, необхідно розібратися, що з себе представляють ключі маршрутизації. Ключ маршрутизації є як у прив'язки (binding) черги до обмінника, так і у самого повідомлення. Routing key це просто рядок, поділена на блоки. Кожен блок розділений точкою. Наприклад, "notify.sendEmail.sendSms". При цьому для ключа маршрутизації повідомлення можна задавати шаблони з використанням спеціальних символів # і *. * - говорить що після точки може йти один будь-який блок, а ось після # може йти будь-яка кількість блоків. Наприклад "notify.sendSms. *" Або "notify. #". Тепер можна переходити до типам точок обміну.

Є чотири типи обмінників:

Fanout. Логіка маршрутизації даного exchange'a проста, він перенаправляє надійшло повідомлення в усі черги або обмінники, які прив'язані до нього.

Обробляємо замовлення з інтернет магазину за допомогою RabbitMQ і TypeScript

Direct. Цей exchange перенаправляє повідомлення в залежності від того, чи збігається routing key повідомлення з routing key прив'язки.

Обробляємо замовлення з інтернет магазину за допомогою RabbitMQ і TypeScript

Topic. Exchange цього типу також як і Direct маршрутизує повідомлення в залежності від routing key. Але як ключ маршрутизації може виступати шаблон.

Обробляємо замовлення з інтернет магазину за допомогою RabbitMQ і TypeScript

Headers. Цей exchange, на відміну від інших, використовує для маршрутизації заголовки повідомлень. При цьому черзі до обмінника бінді також за допомогою асоціативного масиву. Логіку, за якою обмінник буде маршрутизировать повідомлення, можна змінювати за допомогою спеціального ключа "x-match", який задається в асоціативному масиві прив'язки. Ключу можна задати два значення all або any. Якщо значення all, то заголовки повідомлення повинні повністю збігатися з асоціативним масивом прив'язки, якщо значення any, то значення має збігатися хоча б у одного ключа.

Обробляємо замовлення з інтернет магазину за допомогою RabbitMQ і TypeScript

Це основні компоненти RabbitMQ. Більш докладно про ці компонентах можна почитати в специфікації протоколу AMQP. Далі ми будемо проектувати і реалізовувати систему обробки замовлень на прикладі TypeScript, попутно розбираючись з настройками кожного компонента.

Проєктування

Для спрощення прикладу вважатимемо, що для успішної обробки інтернет замовлення у нас повинен бути наступний функціонал:

Зберігати надходять замовлення

Відправляти sms клієнту з номером замовлення, а також статусом замовлення

Відправляти повідомлення в службу кур'єрської доставки про нове замовлення з нашого інтернет магазину, якщо клієнт вибрав цей спосіб доставки

Але мало реалізувати цей функціонал, адже наш інтернет магазин планує розширювати функціонал і надавати в подальшому більше різних можливостей своїм клієнтам (а таке відбувається завжди). Наприклад, повідомляти клієнта по email або надавати на вибір кілька способів доставки замовлення. З цього випливає, що нам потрібно спроектувати систему таким чином, щоб додавати функціонал було просто.

Також варто згадати, що я буду використовувати шаблон відкладених повідомлень для того, щоб була можливість, при недоступності зовнішнього сервісу, повторювати логіку кілька разів. Про цей шаблон можна почитати тут

Щоб більш наочно уявити кінцеву мету, я намалюю схему.

Обробляємо замовлення з інтернет магазину за допомогою RabbitMQ і TypeScript

Давайте розберемося по порядку, як влаштований процес обробки замовлення на цій схемі. Схема розбита на блоки і різні кольори. Блоки білого кольору позначають зовнішні сервіси, які ми розглядати не будемо. Блоки сірого кольору позначають елементи RabbitMQ. Черги і обмінники. Зеленим кольором відображені блоки бізнес логіки, які необхідно реалізовувати. Також кожен блок, що має відношення до нашої логіки, пронумерований. Цифри позначають процес і подпроцесс відповідно до порядку.

Насамперед, повідомлення по HTTP API потрапляє в наш сервіс. Після цього ми повинні привласнити номер замовлення, зберегти замовлення в базі даних зі статусом "новий" і відправити відповідь про успішне створення замовлення, з його номером, назад. Клієнт, отримавши повідомлення про успішне створення замовлення, йде займатися своїми справами. Відправивши позитивну відповідь, ми відправляємо об'єкт замовлення в exchange постобработки, з якого він потрапляє в worker формування routing key. Цей Воркер, отримавши об'єкт замовлення з черги, на його основі (чи є в замовленні email або телефон клієнта, який спосіб доставки був обраний) повинен сформувати ключ маршрутизації замовлення. Сформувавши routing key, worker відправляє повідомлення назад в exchange постобработки, але тепер ключ маршрутизації у замовлення змінився і обмінник може відправити його вже по потрібному маршруту. Залежно від ключа, замовлення може бути відправлений у exchange, який відповідає за повідомлення, exchange інтеграцій або відразу в обидва. А далі за такою ж логікою в черзі і Воркер.

Воркер відправки sms і служби доставки намагатимуться обробити повідомлення кілька разів. Кількість таких спроб можна передати в змінної оточення. Але нескінченно обробляти повідомлення не варто, адже помилка може критися в самому повідомленні або логіці Воркер. Тому після перевищення кількості допустимих спроб повідомлення буде віддалятися з черг і відправлятися в сховище помилок, з якого його можна буде повторно відправити назад на потрібний рівень обробки.

Реалізація

Для перевірки реалізації буде потрібно сам rabbit. Я рекомендую використовувати для цієї мети docker і офіційний образ брокера. Встановити і запустити контейнер можна наступною командою.

docker run -d --name rabbit -p 5672: 5672 -e rabbitmq: 3.7.15-management-alpine

Це образ з web інтерфейсом, доступним на порту 15672, для зручної налагодження.

Реалізовувати задумане будемо за допомогою TypeScript і бібліотеки amqplib (реалізація клієнта RabbitMQ для Node.js) тому для початок необхідно описати кілька інтерфейсів. Наведемо інтерфейси замовлення і повідомлень, які ми будемо відправляти в rabbit

// інтерфейс товара
експортний інтерфейс Продукт {
  id: рядок;
  назва: рядок;
  ціна: номер;
}

// Загальний інтерфейс замовлення
експортний інтерфейс Замовлення {
  clientName: рядок;
  місто: рядок;
  електронна пошта ?: рядок;
  телефон ?: рядок;
  продукція: Товар [];
  totalSum: число;
  deliveryAddress ?: рядок;
}

// Інтерфейс замовлення у якого є номер телефону клієнта
експортний інтерфейс OrderWithPhone розширює замовлення {
  телефон: рядок;
}
// Інтерфейс замовлення у якого є адреса доставки
експортний інтерфейс OrderWithDeliveryAddress розширює замовлення {
  deliveryAddress: рядок;
}

// Types Guard'и для визначення якої замовлення до нас прийшов
експорт const isOrderWithPhone = (замовлення: замовлення): замовлення є OrderWithPhone => булевий (order.phone);

експорт const isOrderWithDeliveryAddress = (замовлення: Замовлення): замовлення є OrderWithDeliveryAddress =>
  Булева (order.deliveryAddress);

// Повідомлення в рамках системи.
експортний інтерфейс Повідомлення {
  помилки: string [];
  повторити: номер;
  порядок: O;

// Інтерфейс повідомлення яке буде надіслано в сховище помилок
експортний інтерфейс FailOrder розширює Повідомлення {
  обмін: рядок;
  routingKey: рядок;
}

Тепер потрібно описати інтерфейс конфігурації черг і обмінників, на основі якої будемо будувати структуру обробки в rabbit.

імпортувати {Типи, ExchangeTypes} з '../constants';
імпортувати {Параметри} з 'amqplib';

// Типи об'єктів RabbitMQ які ми будемо використовувати для конфігурації
типи експорту перерахунків {
  QUEUE = 'черга',
  ОБМІН = 'обмін',
}

// Типи обмінників які будемо використовувати
export enum ExchangeTypes {
  TOPIC = "тема",
}

// Інтерфейс опису черги
черга на експорт
  назва: рядок;
  параметри: Options.AssertQueue;
}

// Інтерфейс опису обмінника
експортний інтерфейс Exchange {
  назва: рядок;
  тип: ExchangeTypes;
}

// Інтерфейс опису прив'язки
експортний інтерфейс Прив'язка {
  тип: типи;
  призначення: рядок;
  джерело: рядок;
  routingKey: рядок;
}

// Інтерфейс конфігурації RabbitMQ
експортний інтерфейс PipelineConfig {
  черги: черга [];
  обміни: обмін [];
  прив’язки: Пов'язування [];
}

Описавши основні компоненти системи, опишемо конфігурацію, яка була намальована на схемі за допомогою об'єкта.

Черги

експортувати за замовчуванням [
  // Черга для повідомлень для яких потрібно згенерувати routingKey
  {
    name: 'generateRoutingKey',
    options: {
      durable: true,
    },
  },
  // Черга відправки sms
  {
    name: 'sendSms',
    options: {
      durable: true,
    },
  },
  // Черга інтеграції зі службою доствки 
  {
    name: 'delivery',
    options: {
      durable: true,
    },
  },
  // Відкладена черга для повідомлень які очікують повторної відправки по SMS
  {
    name: 'sendSmsHold',
    options: {
      durable: true,
      deadLetterExchange: 'notify',
      deadLetterRoutingKey: 'sendSms',
      messageTtl: 60000,
    },
  },
  //  Відкладена черга для повідомлень які очікують повторної відправки в службу доставки
  {
    name: 'deliveryHold',
    options: {
      durable: true,
      deadLetterExchange: 'integrates',
      deadLetterRoutingKey: 'delivery',
      messageTtl: 60000,
    },
  },
];

При описі черг використовуються наступні опції для черги

durable. За замовчуванням всі повідомлення черзі зберігаються в пам'яті. Отже, при перезавантаженні брокера повідомлення пропадуть. Для уникнення цього можна використовувати цю опцію. З цієї налаштуванням rabbit буде скидати повідомлення на диск. Але тут є один нюанс. Щоб повідомлення збереглися після рестарту брокера, мало цієї настройки, потрібно, щоб повідомлення відправлялися в чергу з опцією persistent.

messageTtl. Час життя повідомлення. Задається в мілісекундах

deadLetterExchange. Ім'я обмінника, куди повідомлення відправиться з черги при закінченні її терміну життя

deadLetterRoutingKey. RoutingKey, з яким повідомлення буде відправлено в обмінник з попередньої опції

Exchanges

import { ExchangeTypes } from '../constants';

export default [
  {
    name: 'postprocessing',
    type: ExchangeTypes.TOPIC,
  },
  {
    name: 'notify',
    type: ExchangeTypes.TOPIC,
  },
  {
    name: 'integrates',
    type: ExchangeTypes.TOPIC,
  },
];

Bindings

import { Types } from '../constants';

export default [
  {
    type: Types.EXCHANGE,
    destination: 'notify',
    source: 'postprocessing',
    routingKey: '#.notify.#',
  },
  {
    type: Types.EXCHANGE,
    destination: 'integrates',
    source: 'postprocessing',
    routingKey: '#.integrates.#',
  },
  {
    type: Types.QUEUE,
    destination: 'generateRoutingKey',
    source: 'postprocessing',
    routingKey: 'generateRoutingKey',
  },
  {
    type: Types.QUEUE,
    destination: 'sendSms',
    source: 'notify',
    routingKey: '#.sendSms.#',
  },
  {
    type: Types.QUEUE,
    destination: 'delivery',
    source: 'integrates',
    routingKey: '#.delivery.#',
  },
  {
    type: Types.QUEUE,
    destination: 'sendSmsHold',
    source: 'notify',
    routingKey: 'sendSmsHold',
  },
  {
    type: Types.QUEUE,
    destination: 'deliveryHold',
    source: 'integrates',
    routingKey: 'deliveryHold',
  },
];

повна конфігурація

import { PipelineConfig } from '../interfaces';
import exchanges from './exchanges';
import queues from './queues';
import bindings from './bindigs';

export const pipelineConfig: PipelineConfig = {
  exchanges,
  queues,
  bindings,
};

Для підключення до rabbit напишемо клас.

import { connect, Connection, Channel } from 'amqplib';

export class RabbitConnect {
  private _uri: string;
  private _connection: Connection;
  private _chanel: Channel;
  constructor() {
    // Рядок підключення до Rabbit буде братися з оточення середовища
    this._uri = process.env.RABBIT_URI || 'amqp://localhost';
  }
  protected async connect() {
    this._connection = await connect(this._uri);
    this._chanel = await this._connection.createChannel();
  }
  protected async disconnect() {
    await this._chanel.close();
    return this._connection.close();
  }
  protected get chanel() {
    return this._chanel;
  }
}

Напишемо клас Pipeline, який при старті буде створювати всю необхідну інфраструктуру в rabbit за описаною раніше конфігурації.

import { RabbitConnect } from './RabbitConnect';
import { PipelineConfig } from './interfaces';
import { Types } from './constants';

export class Pipeline extends RabbitConnect {
  private _pipeline: PipelineConfig;
  constructor(pipelineConfig: PipelineConfig) {
    super();
    this._pipeline = pipelineConfig;
  }
  public async create() {
    try {
      await this.connect();

      // Створюємо черги
      const createQueues = this._pipeline.queues.map(queue =>
        this.chanel.assertQueue(queue.name, queue.options),
      ) as PromiseLike[];
      // Створюємо обмінники
      const createExchanges = this._pipeline.exchanges.map(exchange =>
        this.chanel.assertExchange(exchange.name, exchange.type),
      ) as PromiseLike[];

      await Promise.all([...createQueues, ...createExchanges]);
      // Після створення необхідних компонентів створюємо біндінги
      const createBindings = this._pipeline.bindings.map(binding => {
        if (binding.type === Types.QUEUE) {
          return this.chanel.bindQueue(binding.destination, binding.source, binding.routingKey);
        }
        return this.chanel.bindExchange(binding.destination, binding.source, binding.routingKey);
      });

      await Promise.all(createBindings);
      return this.disconnect();
    } catch (error) {
      console.error(error);
      throw new Error(error);
    }
  }
}

Тепер напишемо абстрактний клас Воркер із загальним для всіх Воркер функціоналом, від якого можна буде успадковуватися.

import { RabbitConnect } from './RabbitConnect';
import { Message, Order, FailOrder } from './interfaces';
import { ConsumeMessage } from 'amqplib';

export interface WorkerParams {
  maxRetry?: number; // Максимальна кількість повторів обробки
  active: string; // Ім'я активної черги
  exchange: string; // Ім'я обмінника з якого прийшло повідомлення
  holdKey?: string; // Ключ маршрутизації для відкладеної черги
}

export abstract class Worker extends RabbitConnect {
  private _maxRetry: number;
  private _active: string;
  private _holdKey: string | undefined;
  protected exchange: string;
  private _currentMessage: Message;
  private _currentConsumeMessage: ConsumeMessage;
  constructor({ active, holdKey, exchange, maxRetry }: WorkerParams) {
    super();
    this._maxRetry = maxRetry || 0;
    this._active = active;
    this._holdKey = holdKey;
    this.exchange = exchange;
  }
  public async subscribe() {
    await this.connect();
    this.chanel.consume(this._active, this.checkMessage.bind(this));
  }
  // Метод перевірки для повідомлень у яких перевищено ліміт повторів
  private async checkMessage(message: ConsumeMessage) {
    this._currentConsumeMessage = message;
    const orderMessage: Message = JSON.parse(message.content.toString());
    if (orderMessage.retry >= this._maxRetry) {
      await this.sendToErrorStorage('Перевищено ліміт спроб');
    }
    this._currentMessage = orderMessage;
    // Якщо кількість спроб не перевищено викликаємо метод з бізнес логікою
    await this.handler(orderMessage.order || orderMessage);
  }
  // Метод відправки повідомлення в сховище помилок
  protected async sendToErrorStorage(error: string) {
    const message: FailOrder = {
      order: this._currentMessage.order,
      errors: [...this._currentMessage.errors, error],
      retry: this._currentMessage.retry + 1,
      exchange: this.exchange,
      routingKey: this._active
    };
    console.log('Відправлення в сховище помилок', message);
    this.ack();
  }
  // Метод відправки повідомлення в відкладену чергу
  protected async hold(error: string) {
    if (!this._holdKey) {
      return;
    }
    const orderMessage = {
      order: this._currentMessage.order,
      errors: [...this._currentMessage.errors, error],
      retry: this._currentMessage.retry + 1,
    };
    const orderData = Buffer.from(JSON.stringify(orderMessage));
    return this.chanel.publish(this.exchange, this._holdKey, orderData);
  }
  // Метод підтвердження вдалої обробки повідомлення
  protected async ack() {
    return this.chanel.ack(this._currentConsumeMessage);
  }
  protected abstract handler(message: M): void;
}

За замовчуванням rabbit вимагає підтвердження уcпешно обробки повідомлення від Воркер. Для цього у каналу підключення є метод ack. Якщо Воркер не зміг обробити повідомлення, то існує метод nack, який говорить rabbit'у, щоб він відправив повідомлення іншому Воркер. Тепер ми можемо написати кілька простих Воркер зі схеми. Воркер генерації ключа маршрутизації.

import { Worker } from '../Worker';
import {
  isOrderWithPhone,
  isOrderWithDeliveryAddress,
  Order,
  Message,
} from '../interfaces';
import { Keys } from '../constants';

export class GenerateRoutingKey extends Worker {
  constructor() {
    super({
      active: 'generateRoutingKey',
      exchange: 'postprocessing',
    });
  }
  protected async handler(order: Order) {
    try {
      const routingKey: string[] = [];
      if (isOrderWithPhone(order)) {
        routingKey.push(Keys.SEND_SMS);
      }
      if (isOrderWithDeliveryAddress(order)) {
        routingKey.push(Keys.SEND_TO_DELIVERY);
      }
      const message: Message = {
        retry: 0,
        errors: [],
        order,
      };
      await this.chanel.publish(
        this.exchange,
        routingKey.join('.'),
        Buffer.from(JSON.stringify(message)),
      );
      await this.ack();
    } catch (error) {
      console.error(error);
      await this.sendToErrorStorage(error);
    }
  }
}

Воркер відправки sms.

import { Worker } from '../Worker';
import { OrderWithPhone } from '../interfaces';

export class SendSms extends Worker {
  constructor() {
    super({
      active: 'sendSms',
      exchange: 'notify',
      holdKey: 'sendSmsHold',
      maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5,
    });
  }
  protected async handler(message: OrderWithPhone) {
    try {
      console.log('Відправка sms на номер: ', message.phone);
      this.ack();
    } catch (error) {
      console.error(error);
      await this.hold(error);
    }
  }
}

Воркер інтеграції зі службою доставки.

import { Worker } from '../Worker';
import { OrderWithDeliveryAddress } from '../interfaces';

export class Delivery extends Worker {
  constructor() {
    super({
      active: 'delivery',
      exchange: 'interates',
      holdKey: 'deliveryHold',
      maxRetry: process.env.MAX_RETRY ? parseInt(process.env.MAX_RETRY) : 5,
    });
  }
  protected async handler(message: OrderWithDeliveryAddress) {
    try {
      console.log('Відправка замовлення в службу доставки на адресу: ', message.deliveryAddress);
      this.ack();
    } catch (error) {
      console.error(error);
      await this.hold(error);
    }
  }
}

Точка входу в застосунок.

import { Pipeline } from './Pipeline';
import { pipelineConfig } from './pipeline';
import { GenerateRoutingKey } from './workers/GenerateRoutingKey';
import { SendSms } from './workers/SendSms';
import { Delivery } from './workers/Delivery';

(async () => {
  try {
    const pipeline = new Pipeline(pipelineConfig);
    const generateRoutingKey = new GenerateRoutingKey();
    const sendSms = new SendSms();
    const delivery = new Delivery();
    await pipeline.create();
    чекайте Promise.all ([generatorRoutingKey.subscribe (), sendSms.subscribe (), delivery.subscribe ()]);
  } catch (Error) {
    console.error (помилка);
    process.exit (1);
  }
})();

Наводити приклад коду класу записи замовлення в базу і генерації номера інтернет замовлення я не буду. Це виходить за рамки даної статті. Для перевірки коду можна скористатися веб інтерфейсом rabbit'а, відправивши в обмінник posrprocessing json замовлення.

Висновок

Така схема побудови обробки інтернет замовлення дозволяє легко масштабувати систему. Нам не важко додати в цю схему кілька черг і Воркер, щоб додати потрібний функціонал. Наприклад, можна додати відправку повідомлень на email або відправку замовлення для обліку в 1С.

Помітили помилку? Повідомте автору, для цього достатньо виділити текст з помилкою та натиснути Ctrl+Enter
Коментарі (0)

    Ще немає коментарів

Щоб залишити коментар необхідно авторизуватися.

Вхід / Реєстрація