У мене на роботі виникла суперечка між мною і дотнетчиками щодо потоків у новій версії Node.JS і необхідності їх синхронізувати. Для початку вирішили вибрати задачу про паралельний запис рядків у файлі.
Трохи про самі потоки. Вони є експериментальною технологією Node.JS 10.5.0, і для того, щоб мати доступ до модуля «worker_threads», необхідно запускати наше Node.JS застосунок з прапором "--experimental-worker". Я прописав цей прапор start скрипті у файлі package.json:
{
"name": "worker-test",
"version": "1.0.0",
"description": "",
"main": "app.js",
"scripts": {
"start": "node --max-old-space-size=4096 --experimental-worker app.js "
},
"author": "",
"license": "ISC"
}
Тепер про саму логіку. Головний потік породжує N робочих потоків, всі вони пишуть з якимось інтервалом у файл. На відміну від всіх прикладів, де головні та дочірні потоки стартують з одного файлу, я відділив потоки в окремий, мені це здається чистішим та елегантнішим.
Власне, код.
Головний файл app.js — точка входу.
const { Worker } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;
console.log('Hello from main!');
for (var i = 1; i <= WORKERS_NUMBER ; i++) {
const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
}
Тут ми просто створюємо дочірні потоки використовуючи клас Worker і вказуючи шлях до стартового файлу для потоку './writer-worker-app/app.js'. При створенні потоку передаємо самописний ID як дані workerData.
Стартовий файл для потоку ./writer-worker-app/app.js:
const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;
console.log(`Worker $ initializad.`);
while (true) {
sendMessage();
}
function sendMessage() {
logger.log(`Hello from worker number $\r\n`);
}
Ну і найпростіший клас-логер: ./writer-worker-app/logger.js
const fs = require('fs');
function log(message) {
return fs.appendFileSync('./my-file.txt', message);
}
module.exports = {
log
};
При запуску цього додатка ми всі сподівалися на те, що в результаті отримаємо кашу в файлі і дотнетчики закричать, потрібні блокування з семафорами та іншими радощами паралельного виконання. Але ні! У файлі всі рядки йдуть не перериваючись, хіба що у випадковому порядку:Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11
Чудовий експеримент, чергова маленька перемога Ноди :-) Моє припущення у тому, що вся синхронізація відбувається на рівні I\O потоків Ноди, але буду радий дізнатися в коментарях правильний варіант. На всякий випадок ми перевірили роботу, використовуючи не fs.appendFileSync, а fs.createWriteStream і методу stream.write.
Результат вийшов такий же.
Але ми на цьому не зупинилися.
Колега запропонував задачу про синхронізацію потоків. Для нашого конкретного прикладу, нехай це буде завдання послідовного запису у файл за збільшенням id. Спочатку пише перший потік, потім другий, потім третій і так далі.
Для цього я запровадив ще один потік-Менеджер. Можна було обійтися головним, але мені так приємно створювати цих ізольованих робітників і вибудовувати спілкування за допомогою повідомлень. Перш ніж почати писати імплементацію потоку-Менеджера, необхідно створити канал зв'язку між ним і записувачами-робочими. Для цього був використаний клас MessageChannel. Інстанси цього класу мають два поля: port1 та port2, кожен з яких вміє слухати та відправляти повідомлення іншим за допомогою методів .on('message') та .postMessage(). Цей клас і був створений в рамках модуля «worker_threads» для комунікації між потоками, тому що зазвичай при передачі об'єкта відбувається просто його клонування, і в ізольованому середовищі виконання потоку він буде марний.
Для комунікації між 2 потоками ми кожному повинні дати по порту.
Цікавий факт: на 10.5.0 неможливо передати порт через конструктор воркера, необхідно це робити тільки через worker.postMessage(), причому обов'язково вказуючи порт в transferList параметрі!
Сам потік-менеджер буде відсилати команди потокам-записувачам в порядку зростання їх ідентифікаторів, причому таку команду він відправить тільки після отримання відповіді записувача про успішну операцію.
Недо-UML-діаграма додатки:
Наш видозмінений головний файл ./app.js:
const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;
console.log('Main app initialized and started.');
const workersMeta = [];
for (var i = 1; i <= WORKERS_NUMBER; i++) {
const channel = new MessageChannel();
const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
workersMeta.push({ id: i, worker, channel });
}
workersMeta.forEach(({ worker, channel }) => {
worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]);
})
setTimeout(() => {
const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js'));
const orchestratorData = workersMeta.map((meta) => ({ id: meta.id port: meta.channel.port2 }));
orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port));
console.log('All worker threads have been initialized');
}, WORKERS_NUMBER * 10);
Тут ми спочатку створюємо воркерів, потім кожному відправляємо порт для зв'язку з диспетчером (і тільки так, через конструктор це зробити неможливо).
Потім створюємо потік-менеджер, відправляємо йому список портів для зв'язку з потоками-записувачами.
Updated: емпіричним шляхом з'ясував, що при роботі з потоками краще спочатку дати їм настоятися (проініціюватися як треба). По хорошому треба було слухати якісь відповіді від потоків в стилі «Я готовий!», але я вирішив піти легшим шляхом.
Змінимо поведінку потока-записника, щоб він відправляв повідомлення тільки коли йому скажуть, а також повертав результат, коли операція запису закінчена:
./writer-worer-app/app.js
const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;
console.log(`Worker $ initializad.`);
parentPort.on('message', value => {
const orchestratorPort = value.orchestratorPort;
orchestratorPort.on('message', data => {
if (data.command == 'write') {
console.log(`Worker $ received write command`);
sendMessage();
sendResult(orchestratorPort);
}
});
console.log(`Worker $ started.`);
});
function sendMessage() {
logger.log(`Hello from worker number $\r\n`);
}
function sendResult(port) {
port.postMessage({ id status: 'completed' });
}
Ми правильно проініціювали повідомлення від батьківського потоку, почали злучати канал потоку-менеджера, при отриманні команди спочатку пишемо в файл, потім відправляємо результат. Потрібно зауважити, що в файл пишеться синхронно, тому sendResult() викликається відразу за sendMessage().
Все, що залишилося — написати імплементацію нашого розумного менеджера
./orchestrator-worker-app/app.js:
const { parentPort } = require('worker_threads');
console.log('Orchestrator initialized.')
let workerPorts;
parentPort.on('message', (value) => {
workerPorts = value.workerPorts;
workerPorts.forEach(wp => wp.port.on('message', handleResponse));
console.log('Orchestrator started.');
sendCommand(workerPorts[0]);
});
function handleResponse(status) {
const responseWorkerId = status.id;
let nextWorker = workerPorts.find(wp => wp.id == responseWorkerId + 1);
if (!nextWorker) {
nextWorker = workerPorts[0];
}
sendCommand(nextWorker);
}
function sendCommand(worker) {
worker.port.postMessage({ command: 'write' });
}
Отримали список портів, впорядкували, для кожного порту встановили callback на response, ну і відправили команду першому. У самому callback шукаємо наступного записника і відправляємо команду йому. Щоб не сильно напружувати систему, був встановлений інтервал між командами.
Ось і весь, наш багатопотоковий застосунок з керуванням потоків готово. Ми навчилися не просто породжувати воркер-потоки в Node.JS, але і створювати ефективні способи комунікації між ними. На мій особистий погляд, архітектура ізольованих потоків в Node.JS з очікуванням і посиланням повідомлень більш зручна і перспективна. Всім дякую за увагу.
Ще немає коментарів