Граємо з потоками в Node.JS 10.5.0

Граємо з потоками в Node.JS 10.5.0
7 хв. читання
15 червня 2020

У мене на роботі виникла суперечка між мною і дотнетчиками щодо потоків у новій версії Node.JS і необхідності їх синхронізувати. Для початку вирішили вибрати задачу про паралельний запис рядків у файлі.

Трохи про самі потоки. Вони є експериментальною технологією Node.JS 10.5.0, і для того, щоб мати доступ до модуля «worker_threads», необхідно запускати наше Node.JS застосунок з прапором "--experimental-worker". Я прописав цей прапор start скрипті у файлі package.json:

{
    "name": "worker-test",
    "version": "1.0.0",
    "description": "",
    "main": "app.js",
    "scripts": {
        "start": "node --max-old-space-size=4096 --experimental-worker app.js "
    },
    "author": "",
    "license": "ISC"
}

Тепер про саму логіку. Головний потік породжує N робочих потоків, всі вони пишуть з якимось інтервалом у файл. На відміну від всіх прикладів, де головні та дочірні потоки стартують з одного файлу, я відділив потоки в окремий, мені це здається чистішим та елегантнішим.

Власне, код.

Головний файл app.js — точка входу.

const { Worker } = require('worker_threads');
const path = require('path');

const WORKERS_NUMBER = 100;

console.log('Hello from main!');

for (var i = 1; i <= WORKERS_NUMBER ; i++) {
    const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
}

Тут ми просто створюємо дочірні потоки використовуючи клас Worker і вказуючи шлях до стартового файлу для потоку './writer-worker-app/app.js'. При створенні потоку передаємо самописний ID як дані workerData.

Стартовий файл для потоку ./writer-worker-app/app.js:

const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');

const id = workerData.id;

console.log(`Worker $ initializad.`);

while (true) {
    sendMessage();
}

function sendMessage() {
    logger.log(`Hello from worker number $\r\n`);
}


Ну і найпростіший клас-логер: ./writer-worker-app/logger.js

const fs = require('fs');

function log(message) {
return fs.appendFileSync('./my-file.txt', message);
    }

module.exports = {
    log
};

При запуску цього додатка ми всі сподівалися на те, що в результаті отримаємо кашу в файлі і дотнетчики закричать, потрібні блокування з семафорами та іншими радощами паралельного виконання. Але ні! У файлі всі рядки йдуть не перериваючись, хіба що у випадковому порядку:

Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11

Чудовий експеримент, чергова маленька перемога Ноди :-) Моє припущення у тому, що вся синхронізація відбувається на рівні I\O потоків Ноди, але буду радий дізнатися в коментарях правильний варіант. На всякий випадок ми перевірили роботу, використовуючи не fs.appendFileSync, а fs.createWriteStream і методу stream.write

Результат вийшов такий же.

Але ми на цьому не зупинилися.

Колега запропонував задачу про синхронізацію потоків. Для нашого конкретного прикладу, нехай це буде завдання послідовного запису у файл за збільшенням id. Спочатку пише перший потік, потім другий, потім третій і так далі.

Для цього я запровадив ще один потік-Менеджер. Можна було обійтися головним, але мені так приємно створювати цих ізольованих робітників і вибудовувати спілкування за допомогою повідомлень. Перш ніж почати писати імплементацію потоку-Менеджера, необхідно створити канал зв'язку між ним і записувачами-робочими. Для цього був використаний клас MessageChannel. Інстанси цього класу мають два поля: port1 та port2, кожен з яких вміє слухати та відправляти повідомлення іншим за допомогою методів .on('message') та .postMessage(). Цей клас і був створений в рамках модуля «worker_threads» для комунікації між потоками, тому що зазвичай при передачі об'єкта відбувається просто його клонування, і в ізольованому середовищі виконання потоку він буде марний.

Для комунікації між 2 потоками ми кожному повинні дати по порту. 

Цікавий факт: на 10.5.0 неможливо передати порт через конструктор воркера, необхідно це робити тільки через worker.postMessage(), причому обов'язково вказуючи порт в transferList параметрі!

Сам потік-менеджер буде відсилати команди потокам-записувачам в порядку зростання їх ідентифікаторів, причому таку команду він відправить тільки після отримання відповіді записувача про успішну операцію.

Недо-UML-діаграма додатки:

Наш видозмінений головний файл ./app.js:

const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');

const WORKERS_NUMBER = 100;

console.log('Main app initialized and started.');

const workersMeta = [];

for (var i = 1; i <= WORKERS_NUMBER; i++) {
    const channel = new MessageChannel();
    const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
    workersMeta.push({ id: i, worker, channel });
}

workersMeta.forEach(({ worker, channel }) => {
    worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]);
})

setTimeout(() => {
    const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js'));
    const orchestratorData = workersMeta.map((meta) => ({ id: meta.id port: meta.channel.port2 }));
    orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port));

    console.log('All worker threads have been initialized');
}, WORKERS_NUMBER * 10);


Тут ми спочатку створюємо воркерів, потім кожному відправляємо порт для зв'язку з диспетчером (і тільки так, через конструктор це зробити неможливо).

Потім створюємо потік-менеджер, відправляємо йому список портів для зв'язку з потоками-записувачами. 
Updated: емпіричним шляхом з'ясував, що при роботі з потоками краще спочатку дати їм настоятися (проініціюватися як треба). По хорошому треба було слухати якісь відповіді від потоків в стилі «Я готовий!», але я вирішив піти легшим шляхом.

Змінимо поведінку потока-записника, щоб він відправляв повідомлення тільки коли йому скажуть, а також повертав результат, коли операція запису закінчена:
./writer-worer-app/app.js

const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');

const id = workerData.id;

console.log(`Worker $ initializad.`);

parentPort.on('message', value => {
const orchestratorPort = value.orchestratorPort;
orchestratorPort.on('message', data => {
if (data.command == 'write') {
console.log(`Worker $ received write command`);
sendMessage();
sendResult(orchestratorPort);
}
});
console.log(`Worker $ started.`);
});


function sendMessage() {
logger.log(`Hello from worker number $\r\n`);
}

function sendResult(port) {
port.postMessage({ id status: 'completed' });
}

Ми правильно проініціювали повідомлення від батьківського потоку, почали злучати канал потоку-менеджера, при отриманні команди спочатку пишемо в файл, потім відправляємо результат. Потрібно зауважити, що в файл пишеться синхронно, тому sendResult() викликається відразу за sendMessage().

Все, що залишилося — написати імплементацію нашого розумного менеджера
./orchestrator-worker-app/app.js:

const { parentPort } = require('worker_threads');

console.log('Orchestrator initialized.')

let workerPorts;

parentPort.on('message', (value) => {
workerPorts = value.workerPorts;
workerPorts.forEach(wp => wp.port.on('message', handleResponse));
console.log('Orchestrator started.');
sendCommand(workerPorts[0]);
});

function handleResponse(status) {
const responseWorkerId = status.id;
let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1);
if (!nextWorker) {
nextWorker = workerPorts[0];
}
sendCommand(nextWorker);
}

function sendCommand(worker) {
worker.port.postMessage({ command: 'write' });
}

Отримали список портів, впорядкували, для кожного порту встановили callback на response, ну і відправили команду першому. У самому callback шукаємо наступного записника і відправляємо команду йому. Щоб не сильно напружувати систему, був встановлений інтервал між командами.

Ось і весь, наш багатопотоковий застосунок з керуванням потоків готово. Ми навчилися не просто породжувати воркер-потоки в Node.JS, але і створювати ефективні способи комунікації між ними. На мій особистий погляд, архітектура ізольованих потоків в Node.JS з очікуванням і посиланням повідомлень більш зручна і перспективна. Всім дякую за увагу.

Весь вихідний код може бути знайдений тут.

Помітили помилку? Повідомте автору, для цього достатньо виділити текст з помилкою та натиснути Ctrl+Enter
Коментарі (0)

    Ще немає коментарів

Щоб залишити коментар необхідно авторизуватися.

Вхід / Реєстрація