Статичні методи Atomics.wait() і Atomics.notify() представляють собою низькорівневі примітиви синхронізації, які можна застосовувати для реалізації м'ютексів та інших подібних механізмів. Але, оскільки метод Atomics.wait()
є блокуючим, його не можна викликати в головному потоці (якщо спробувати це зробити - буде видана помилка TypeError
).
Рушій V8, починаючи з версії 8.7, підтримує неблокуючий варіант Atomics.waitAsync(). Цим новим методом можна користуватися в головному потоці. Сьогодні ми розповімо про те, як застосувати ці низькорівневі API для створення мьютекса, який може працювати і в синхронному режимі (в потоках Воркер) і асинхронно (в потоках Воркер або в головному потоці).
Atomics.wait() і Atomics.waitAsync()
Методи Atomics.wait()
і Atomics.waitAsync()
приймають такі параметри:
buffer
: масив типуInt32Array
абоBigInt64Array
, в основі якого лежитьSharedArrayBuffer
.index
: дійсний індекс елемента в масиві.expectedValue
: значення, яке, як ми очікуємо, повинно бути представлено в пам'яті, в тому місці, яке описано за допомогоюbuffer, index
.timeout
: таймаут в мілісекундах (необов'язковий параметр, за замовчуванням встановлений вInfinity
).
Atomics.wait()
повертає рядок. Якщо в зазначеному місці пам'яті не виявляється очікуваного значення - Atomics.wait()
негайно завершує роботу, повертаючи рядок not-equal
. В іншому випадку потік блокується. Для того щоб блокування було б зняте, має відбутися одна з наступних події. Перша - це виклик з іншого потоку методу Atomics.notify()
із зазначенням того місця в пам'яті, яке цікавить метод Atomics.wait()
. Друга - це завершення таймауту. У першому випадку Atomics.wait()
поверне рядок ok
, у другому - рядок зі значенням timed-out
.
Метод Atomics.notify()
приймає такі параметри:
typedArray
: масив типуInt32Array
абоBigInt64Array
, в основі якого лежитьSharedArrayBuffer
.index
: дійсний індекс елемента в масиві.count
: кількість агентів, які очікують повідомлення (необов'язковий параметр, за замовчуванням встановлений вInfinity
).
Метод Atomics.notify()
повідомляє вказану кількість агентів, які очікують повідомлення на адресу, описуваного typedArray
та index
, обходячи їх в порядку FIFO-черзі. Якщо було зроблено кілька викликів Atomics.wait()
або Atomics.waitAsync()
, що спостерігають за одним і тим же місцем в пам'яті, то всі вони виявляються в одній і тій же черзі.
На відміну від методу Atomics.wait()
, метод Atomics.waitAsync()
відразу ж повертає значення в місце виклику. Це може бути одне з наступних значень:
{ async: false, value: 'not-equal' }
- якщо вказане місце в пам'яті не містить очікуваного значення.{ async: false, value: 'timed-out' }
- тільки в тих випадках, коли тайм-аут встановлений в 0.{ async: true, value: promise }
- в інших випадках.
Проміс, після деякого часу, може бути успішно розв'язаний строковим значенням ok
(якщо був викликаний метод Atomics.notify()
, до якого віднесені відомості про те місце в пам'яті, яке було передано Atomics.waitAsync()
) або timed-out
. Цей проміс ніколи не відхиляється.
У наступному прикладі продемонстровані основи використання Atomics.waitAsync()
:
const sab = new SharedArrayBuffer(16); const i32a = new Int32Array(sab); const result = Atomics.waitAsync(i32a, 0, 0, 1000); // | | ^ timeout (opt) // | ^ expected value // ^ index if (result.value === 'not-equal') { // The value in the SharedArrayBuffer was not the expected one. } else { result.value instanceof Promise; // true result.value.then( (value) => { if (value == 'ok') { /* notified */ } else { /* value is 'timed-out' */ } }); } // In this thread, or in another thread: Atomics.notify(i32a, 0);
Тепер давайте поговоримо про те, як створити м'ютекс, яким можна використовувати і в синхронному, і в асинхронному режимах. Треба відзначити, що реалізація синхронної версії мьютекса раніше вже обговорювалася. Наприклад - в цьому матеріалі.
У цьому прикладі ми не будемо використовувати параметр timeout
при виклику Atomics.wait()
і Atomics.waitAsync()
. Цей параметр може бути використаний для реалізації умовних конструкцій, пов'язаних з тайм-аутом.
Наш клас AsyncLock
, що представляє м'ютекс, працює з буфером SharedArrayBuffer
і реалізує наступні методи:
lock()
: блокує потік до того моменту, поки у нас не з'явиться можливість захопити м'ютекс (застосовується лише в потоці Воркер).unlock()
: звільняє мьютекс (цей - протилежністьlock()
).executeLocked(callback)
: намагається захопити блокування не блокуючи при цьому потік. Цей метод може бути використаний в головному потоці. Він планує виконання коллбека на той момент, коли ми зможемо захопити блокування.
Погляньмо на те, як можуть бути реалізовані ці методи. Оголошення класу містить у собі константи і конструктор, який приймає буфер SharedArrayBuffer
.
class AsyncLock { static INDEX = 0; static UNLOCKED = 0; static LOCKED = 1; constructor(sab) { this.sab = sab; this.i32a = new Int32Array(sab); } lock() { /* … */ } unlock() { /* … */ } executeLocked(f) { /* … */ } }
Тут елемент i32a[0]
містить значення LOCKED
або UNLOCKED
. Він, крім того, являє те місце в пам'яті, яке цікавить Atomics.wait()
і Atomics.waitAsync()
. Клас AsyncLock
забезпечує наступні базові можливості:
- Якщо
i32a[0] == LOCKED
і потік виявляється в стані очікування (після викликуAtomics.wait()
абоAtomics.waitAsync()
), спостерігаючи заi32a[0]
, він, в результаті, буде повідомлений. - Після того, як потік отримає повідомлення, він спробує захопити блокування. Якщо йому це вдасться, то, він, коли буде звільняти блокування, викличе
Atomics.notify()
.
Синхронні захоплення і звільнення блокування
Розглянемо код методу lock()
, який можна викликати тільки з потоку воркера.
lock() { while (true) { const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX, /* old value >>> */ AsyncLock.UNLOCKED, /* new value >>> */ AsyncLock.LOCKED); if (oldValue == AsyncLock.UNLOCKED) { return; } Atomics.wait(this.i32a, AsyncLock.INDEX, AsyncLock.LOCKED); // <<< expected value at start } }
Коли з потоку викликається метод lock()
, спочатку він намагається захопити блокування, використовуючи Atomics.compareExchange()
для зміни стану блокування з UNLOCKED
на LOCKED
. Метод Atomics.compareExchange()
намагається виконати атомарну операцію зміни стану блокування, він повертає початкове значення, що знаходиться в заданій області пам'яті. Якщо вихідним значенням було UNLOCKED
, завдяки цьому ми дізнаємося про те, що зміна стану пройшло успішно, і про те, що потік захопив блокування. Нічого більше робити не потрібно.
Якщо ж Atomics.compareExchange()
не зміг змінити стан блокування, це означає, що блокування утримує інший потік. В результаті потік, з якого викликаний метод lock()
, намагається скористатися методом Atomics.wait()
для того щоб дочекатися моменту звільнення блокування іншим потоком. Якщо в області що нас цікавить пам'яті все ще зберігається очікуване значення (в нашому випадку - AsyncLock.LOCKED
), то виклик Atomics.wait()
заблокує потік. Повернення з Atomics.wait()
відбудеться тільки тоді, коли інший потік викличе Atomics.notify()
.
Метод unlock()
звільняє блокування, встановлюючи його в стан UNLOCKED
, і викликає Atomics.notify()
для того щоб повідомити агентів, які очікують зняття цього блокування. Передбачається, що операція зміни стану блокування завжди завершується успішно. Це так через те, що потік, що виконує цю операцію, утримує блокування. Тому ніщо інше в цей час не повинно викликати метод unlock()
.
unlock() { const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX, /* old value >>> */ AsyncLock.LOCKED, /* new value >>> */ AsyncLock.UNLOCKED); if (oldValue != AsyncLock.LOCKED) { throw new Error('Tried to unlock while not holding the mutex'); } Atomics.notify(this.i32a, AsyncLock.INDEX, 1); }
У типовому випадку все відбувається так: блокування вільне і потік T1 захоплює його, змінюючи його стан за допомогою Atomics.compareExchange()
. Потік T2 намагається захопити блокування, викликаючи Atomics.compareExchange()
, але не може змінити його стан. Потім T2 викликає Atomics.wait()
, цей виклик блокує потік. Через деякий час потік T1 звільняє блокування і викликає Atomics.notify()
. Це призводить до того, що виклик Atomics.wait()
у T2 повертає ok
і потік T2 виходить з блокування. Після цього T2 намагається захопити блокування знову. На цей раз йому це вдається.
Тут можуть виникнути два особливих випадки. Їх розбір покликаний продемонструвати причини того, що Atomics.wait()
і Atomics.waitAsync()
перевіряють наявність конкретного значення за заданим індексом елемента масиву. Ось ці випадки:
- T1 утримує блокування, а T2 намагається його захопити. Спочатку T2 намагається змінити стан блокування, користуючись
Atomics.compareExchange()
, але йому це не вдається. Але потім T1 звільняє блокування до того, як T2 встигає викликатиAtomics.wait()
. А вже після цього T2 викликаєAtomics.wait()
, звідки тут же відбувається повернення значенняnot-equal
. У подібному випадку T2 переходить на наступну ітерацію циклу і знову намагається захопити блокування. - T1 утримує блокування, а T2 викликає
Atomics.wait()
і очікує його звільнення. T1 звільняє блокування, T2 активується (здійснюється повернення зAtomics.wait()
) і намагається виконати операціюAtomics.compareExchange()
для захоплення блокування. Але інший потік, T3, виявився швидшим. Він уже встиг сам захопити це блокування. В результаті викликAtomics.compareExchange()
не дозволяє T2 захопити блокування. Після цього T2 знову викликаєAtomics.wait()
і виявляється заблокованим до того моменту, поки T3 не звільнить блокування.
Останній особливий випадок демонструє той факт, що наш м'ютекс працює «нечесно». Може трапитися так, що потік T2 очікував звільнення блокування, але T3 встиг захопити його негайно після його звільнення. Реалізація блокування, найбільш підходяща для реального застосування, може використовувати кілька станів блокування, які існують для того щоб розрізняти ситуації, в яких блокування було просто «захоплене», і в яких «при захопленні стався конфлікт».
Асинхронне захоплення блокування
Неблокуючий метод executeLocked()
можна, на відміну від методу lock()
, викликати з головного потоку. Він отримує, в якості єдиного параметра, коллбек, і планує виклик коллбека після успішного захоплення блокування.
executeLocked(f) { const self = this; async function tryGetLock() { while (true) { const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX, /* old value >>> */ AsyncLock.UNLOCKED, /* new value >>> */ AsyncLock.LOCKED); if (oldValue == AsyncLock.UNLOCKED) { f(); self.unlock(); return; } const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX, AsyncLock.LOCKED); // ^ expected value at start await result.value; } } tryGetLock(); }
Внутрішня функція tryGetLock()
спочатку, як і раніше, намагається захопити блокування за допомогою Atomics.compareExchange()
. Якщо виклик цього методу призводить до успішної зміни стану блокування, функція може викликати колбек, а після цього - звільнити блокування і завершити роботу.
Якщо виклик Atomics.compareExchange()
не дозволив захопити блокування, нам потрібно спробувати зробити це знову, в той момент, коли блокування, можливо, буде вільне. Але ми не можемо заблокувати потік і чекати звільнення блокування. Замість цього ми плануємо нову спробу захоплення блокування з використанням методу Atomics.waitAsync()
то повернутий цим методом проміс виконається тоді, коли потік, який утримував блокування, викличе Atomics.notify()
. Після цього потік, який хотів захопити блокування, як і раніше, знову намагається це зробити.
Тут можливі ті особливі випадки, що характерні для синхронної версії (блокування звільняється від одного виклику Atomics.compareExchange()
і Atomics.waitAsync()
; блокування захоплює інший потік, роблячи це між моментами вирішення Проміу і виклику Atomics.compareExchange()
). Тому в подібному коді, в реальних проектах, це необхідно врахувати.
Підсумки
У цьому матеріалі ми розповіли про низькорівневі примітиви синхронізації Atomics.wait()
, Atomics.waitAsync()
і Atomics.notify()
. Ми розібрали приклад створення на їх основі мьютекса, який можна застосовувати і в головному потоці, і в потоках Воркер.
Джерело: v8.dev
Ще немає коментарів