Використання Atomics.wait, Atomics.notify і Atomics.waitAsync

Alex Alex 28 жовтня
Використання Atomics.wait, Atomics.notify і Atomics.waitAsync

Статичні методи Atomics.wait() і Atomics.notify() представляють собою низькорівневі примітиви синхронізації, які можна застосовувати для реалізації м'ютексів та інших подібних механізмів. Але, оскільки метод Atomics.wait() є блокуючим, його не можна викликати в головному потоці (якщо спробувати це зробити - буде видана помилка TypeError).

Рушій V8, починаючи з версії 8.7, підтримує неблокуючий варіант Atomics.waitAsync(). Цим новим методом можна користуватися в головному потоці. Сьогодні ми розповімо про те, як застосувати ці низькорівневі API для створення мьютекса, який може працювати і в синхронному режимі (в потоках Воркер) і асинхронно (в потоках Воркер або в головному потоці).

Atomics.wait() і Atomics.waitAsync()

Методи Atomics.wait() і Atomics.waitAsync() приймають такі параметри:

  • buffer: масив типу Int32Array або BigInt64Array, в основі якого лежить SharedArrayBuffer.
  • index: дійсний індекс елемента в масиві.
  • expectedValue: значення, яке, як ми очікуємо, повинно бути представлено в пам'яті, в тому місці, яке описано за допомогою buffer, index.
  • timeout: таймаут в мілісекундах (необов'язковий параметр, за замовчуванням встановлений в Infinity).

Atomics.wait() повертає рядок. Якщо в зазначеному місці пам'яті не виявляється очікуваного значення - Atomics.wait() негайно завершує роботу, повертаючи рядок not-equal. В іншому випадку потік блокується. Для того щоб блокування було б зняте, має відбутися одна з наступних події. Перша - це виклик з іншого потоку методу Atomics.notify() із зазначенням того місця в пам'яті, яке цікавить метод Atomics.wait(). Друга - це завершення таймауту. У першому випадку Atomics.wait() поверне рядок ok, у другому - рядок зі значенням  timed-out.

Метод Atomics.notify() приймає такі параметри:

  • typedArray: масив типу Int32Array або BigInt64Array, в основі якого лежить SharedArrayBuffer.
  • index: дійсний індекс елемента в масиві.
  • count: кількість агентів, які очікують повідомлення (необов'язковий параметр, за замовчуванням встановлений в Infinity).

Метод Atomics.notify() повідомляє вказану кількість агентів, які очікують повідомлення на адресу, описуваного typedArray та index, обходячи їх в порядку FIFO-черзі. Якщо було зроблено кілька викликів Atomics.wait() або Atomics.waitAsync(), що спостерігають за одним і тим же місцем в пам'яті, то всі вони виявляються в одній і тій же черзі.

На відміну від методу Atomics.wait(), метод Atomics.waitAsync() відразу ж повертає значення в місце виклику. Це може бути одне з наступних значень:

  • { async: false, value: 'not-equal' } - якщо вказане місце в пам'яті не містить очікуваного значення.
  • { async: false, value: 'timed-out' } - тільки в тих випадках, коли тайм-аут встановлений в 0.
  • { async: true, value: promise } - в інших випадках.

Проміс, після деякого часу, може бути успішно розв'язаний строковим значенням ok (якщо був викликаний метод Atomics.notify(), до якого віднесені відомості про те місце в пам'яті, яке було передано Atomics.waitAsync()) або timed-out. Цей проміс ніколи не відхиляється.

У наступному прикладі продемонстровані основи використання Atomics.waitAsync():

const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
//                                     |  |  ^ timeout (opt)
//                                     |  ^ expected value
//                                     ^ index

if (result.value === 'not-equal') {
  // The value in the SharedArrayBuffer was not the expected one.
} else {
  result.value instanceof Promise; // true
  result.value.then(
    (value) => {
      if (value == 'ok') { /* notified */ }
      else { /* value is 'timed-out' */ }
    });
}

// In this thread, or in another thread:
Atomics.notify(i32a, 0);

Тепер давайте поговоримо про те, як створити м'ютекс, яким можна використовувати і в синхронному, і в асинхронному режимах. Треба відзначити, що реалізація синхронної версії мьютекса раніше вже обговорювалася. Наприклад - в цьому матеріалі.

У цьому прикладі ми не будемо використовувати параметр timeout при виклику Atomics.wait() і Atomics.waitAsync(). Цей параметр може бути використаний для реалізації умовних конструкцій, пов'язаних з тайм-аутом.

Наш клас AsyncLock, що представляє м'ютекс, працює з буфером SharedArrayBuffer і реалізує наступні методи:

  • lock(): блокує потік до того моменту, поки у нас не з'явиться можливість захопити м'ютекс (застосовується лише в потоці Воркер).
  • unlock(): звільняє мьютекс (цей - протилежність lock()).
  • executeLocked(callback): намагається захопити блокування не блокуючи при цьому потік. Цей метод може бути використаний в головному потоці. Він планує виконання коллбека на той момент, коли ми зможемо захопити блокування.

Погляньмо на те, як можуть бути реалізовані ці методи. Оголошення класу містить у собі константи і конструктор, який приймає буфер SharedArrayBuffer.

class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab;
    this.i32a = new Int32Array(sab);
  }

  lock() {
    /* … */
  }

  unlock() {
    /* … */
  }

  executeLocked(f) {
    /* … */
  }
}

Тут елемент i32a[0] містить значення LOCKED або UNLOCKED. Він, крім того, являє те місце в пам'яті, яке цікавить Atomics.wait() і Atomics.waitAsync(). Клас AsyncLock забезпечує наступні базові можливості:

  1. Якщо i32a[0] == LOCKED і потік виявляється в стані очікування (після виклику Atomics.wait() або Atomics.waitAsync()), спостерігаючи за i32a[0], він, в результаті, буде повідомлений.
  2. Після того, як потік отримає повідомлення, він спробує захопити блокування. Якщо йому це вдасться, то, він, коли буде звільняти блокування, викличе Atomics.notify().

Синхронні захоплення і звільнення блокування

Розглянемо код методу lock(), який можна викликати тільки з потоку воркера.

lock() {
  while (true) {
    const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                        /* old value >>> */  AsyncLock.UNLOCKED,
                        /* new value >>> */  AsyncLock.LOCKED);
    if (oldValue == AsyncLock.UNLOCKED) {
      return;
    }
    Atomics.wait(this.i32a, AsyncLock.INDEX,
                 AsyncLock.LOCKED); // <<< expected value at start
  }
}

Коли з потоку викликається метод lock(), спочатку він намагається захопити блокування, використовуючи Atomics.compareExchange() для зміни стану блокування з UNLOCKED на LOCKED. Метод Atomics.compareExchange() намагається виконати атомарну операцію зміни стану блокування, він повертає початкове значення, що знаходиться в заданій області пам'яті. Якщо вихідним значенням було UNLOCKED, завдяки цьому ми дізнаємося про те, що зміна стану пройшло успішно, і про те, що потік захопив блокування. Нічого більше робити не потрібно.

Якщо ж Atomics.compareExchange() не зміг змінити стан блокування, це означає, що блокування утримує інший потік. В результаті потік, з якого викликаний метод lock(), намагається скористатися методом Atomics.wait() для того щоб дочекатися моменту звільнення блокування іншим потоком. Якщо в області що нас цікавить  пам'яті все ще зберігається очікуване значення (в нашому випадку - AsyncLock.LOCKED), то виклик Atomics.wait() заблокує потік. Повернення з Atomics.wait() відбудеться тільки тоді, коли інший потік викличе Atomics.notify().

Метод unlock() звільняє блокування, встановлюючи його в стан UNLOCKED, і викликає Atomics.notify() для того щоб повідомити агентів, які очікують зняття цього блокування. Передбачається, що операція зміни стану блокування завжди завершується успішно. Це так через те, що потік, що виконує цю операцію, утримує блокування. Тому ніщо інше в цей час не повинно викликати метод unlock().

unlock() {
  const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                      /* old value >>> */  AsyncLock.LOCKED,
                      /* new value >>> */  AsyncLock.UNLOCKED);
  if (oldValue != AsyncLock.LOCKED) {
    throw new Error('Tried to unlock while not holding the mutex');
  }
  Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}

У типовому випадку все відбувається так: блокування вільне і потік T1 захоплює його, змінюючи його стан за допомогою Atomics.compareExchange(). Потік T2 намагається захопити блокування, викликаючи Atomics.compareExchange(), але не може змінити його стан. Потім T2 викликає Atomics.wait(), цей виклик блокує потік. Через деякий час потік T1 звільняє блокування і викликає Atomics.notify(). Це призводить до того, що виклик Atomics.wait() у T2 повертає ok і потік T2 виходить з блокування. Після цього T2 намагається захопити блокування знову. На цей раз йому це вдається.

Тут можуть виникнути два особливих випадки. Їх розбір покликаний продемонструвати причини того, що Atomics.wait() і Atomics.waitAsync() перевіряють наявність конкретного значення за заданим індексом елемента масиву. Ось ці випадки:

  • T1 утримує блокування, а T2 намагається його захопити. Спочатку T2 намагається змінити стан блокування, користуючись Atomics.compareExchange(), але йому це не вдається. Але потім T1 звільняє блокування до того, як T2 встигає викликати Atomics.wait(). А вже після цього T2 викликає Atomics.wait(), звідки тут же відбувається повернення значення not-equal. У подібному випадку T2 переходить на наступну ітерацію циклу і знову намагається захопити блокування.
  • T1 утримує блокування, а T2 викликає Atomics.wait() і очікує його звільнення. T1 звільняє блокування, T2 активується (здійснюється повернення з Atomics.wait()) і намагається виконати операцію Atomics.compareExchange() для захоплення блокування. Але інший потік, T3, виявився швидшим. Він уже встиг сам захопити це блокування. В результаті виклик Atomics.compareExchange() не дозволяє T2 захопити блокування. Після цього T2 знову викликає Atomics.wait() і виявляється заблокованим до того моменту, поки T3 не звільнить блокування.

Останній особливий випадок демонструє той факт, що наш м'ютекс працює «нечесно». Може трапитися так, що потік T2 очікував звільнення блокування, але T3 встиг захопити його негайно після його звільнення. Реалізація блокування, найбільш підходяща для реального застосування, може використовувати кілька станів блокування, які існують для того щоб розрізняти ситуації, в яких блокування було просто «захоплене», і в яких «при захопленні стався конфлікт».

Асинхронне захоплення блокування

Неблокуючий метод executeLocked() можна, на відміну від методу lock(), викликати з головного потоку. Він отримує, в якості єдиного параметра, коллбек, і планує виклик коллбека після успішного захоплення блокування.

executeLocked(f) {
  const self = this;

  async function tryGetLock() {
    while (true) {
      const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
                          /* old value >>> */  AsyncLock.UNLOCKED,
                          /* new value >>> */  AsyncLock.LOCKED);
      if (oldValue == AsyncLock.UNLOCKED) {
        f();
        self.unlock();
        return;
      }
      const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
                                       AsyncLock.LOCKED);
                                   //  ^ expected value at start
      await result.value;
    }
  }

  tryGetLock();
}

Внутрішня функція tryGetLock() спочатку, як і раніше, намагається захопити блокування за допомогою Atomics.compareExchange(). Якщо виклик цього методу призводить до успішної зміни стану блокування, функція може викликати колбек, а після цього - звільнити блокування і завершити роботу.

Якщо виклик Atomics.compareExchange() не дозволив захопити блокування, нам потрібно спробувати зробити це знову, в той момент, коли блокування, можливо, буде вільне. Але ми не можемо заблокувати потік і чекати звільнення блокування. Замість цього ми плануємо нову спробу захоплення блокування з використанням методу Atomics.waitAsync()   то повернутий цим методом проміс виконається тоді, коли потік, який утримував блокування, викличе Atomics.notify(). Після цього потік, який хотів захопити блокування, як і раніше, знову намагається це зробити.

Тут можливі ті особливі випадки, що характерні для синхронної версії (блокування звільняється від одного виклику Atomics.compareExchange() і Atomics.waitAsync(); блокування захоплює інший потік, роблячи це між моментами вирішення Проміу і виклику Atomics.compareExchange()). Тому в подібному коді, в реальних проектах, це необхідно врахувати.

Підсумки

У цьому матеріалі ми розповіли про низькорівневі примітиви синхронізації Atomics.wait(), Atomics.waitAsync() і Atomics.notify(). Ми розібрали приклад створення на їх основі мьютекса, який можна застосовувати і в головному потоці, і в потоках Воркер.

Джерело: v8.dev

Коментарі (0)

    Ще немає коментарів

Щоб залишити коментар необхідно авторизуватися.

Війти / Зареєструватися