Просунутий паралелізм Go

Просунутий паралелізм Go
Переклад 11 хв. читання
19 вересня 2023

Якщо ви використовували Go деякий час, ви, ймовірно, знаєте деякі з основних примітивів паралелізму Go:

  • Ключове слово go для створення підпрограм
  • Канали, для зв'язку між підпрограмами
  • Контекстний пакет для поширення скасування (propagating cancellation)
  • Пакети sync та sync/atomic для примітивів нижчого рівня, таких як м'ютекси та атомарний доступ до пам'яті

Ці можливості мови та пакети надають багатий набір інструментів для створення паралельних програм. Можливо, ви ще не знаєте про набір високорівневих примітивів паралелізму, доступних у "розширеній стандартній бібліотеці", що знаходиться на golang.org/x/sync. У цій статті ми їх розглянемо.

Пакет singleflight

Як зазначено у документації до пакета, цей модуль надає механізм придушення дублюючих викликів функцій.

Цей пакет надзвичайно корисний у випадках, коли у відповідь на дії користувача ви виконуєте щось обчислювально дороге (або просто повільне, наприклад, доступ до мережі). Наприклад, скажімо, у вас є база даних з інформацією про погоду у кожному місті, і ви хочете надати її у вигляді API. У деяких випадках кілька користувачів можуть одночасно запитувати погоду в одному місті.

Коли таке трапляється, чи не було б чудово, якби ви могли просто зробити запит до бази даних, а потім поділитися результатом з усіма запитами, що очікують на нього? Це саме те, що робить пакет singleflight!

Щоб використовувати його, створіть singleflight.Group. Для коректної роботи він має бути спільним для всіх запитів. Потім оберніть повільну або дорогу операцію у виклик group.Do(key, fn). Кілька паралельних запитів з одним і тим же ключем викличуть fn лише один раз, і результат буде повернуто всім, хто його викликав, як тільки fn повернеться.

Ось як це виглядає на практиці:

package weather

type Info struct {
    TempC, TempF int // temperature in Celsius and Farenheit
    Conditions string // "sunny", "snowing", etc
}

var group singleflight.Group

func City(city string) (*Info, error) {
    results, err, _ := group.Do(city, func() (interface{}, error) {
        info, err := fetchWeatherFromDB(city) // slow operation
        return info, err
    })
    if err != nil {
        return nil, fmt.Errorf("weather.City %s: %w", city, err)
    }
    return results.(*Info), nil
}

Зверніть увагу, що замикання, яке ми передаємо в group.Do, повинно повертати (interface{}, error), щоб працювати з системою типу Go. Третє значення, що повертається з group.Do, яке ігнорується у наведеному вище прикладі, вказує на те, чи був результат спільним для декількох викликів, чи ні.

Пакет errgroup

Ще одним неоціненним пакетом є пакет errgroup. Його найкраще можна описати як sync.WaitGroup, але в ньому задачі повертають помилки, які передаються назад очікувачу.

Цей пакет корисний, коли у вас є кілька операцій, на виконання яких ви хочете зачекати, але також хочете визначити, чи всі вони завершилися успішно. Наприклад, спираючись на приклад з погодою, наведений вище, припустимо, що ви хочете переглянути погоду для кількох міст одночасно, і припинити роботу, якщо один з пошуків завершиться невдачею.

Почніть з визначення errgroup.Group і використовуйте метод group.Go(fn func() error) для кожного міста. Цей метод створює підпрограму для виконання завдання. Коли ви створили всі потрібні виклики, скористайтеся методом group.Wait(), щоб дочекатися їхнього завершення. Зверніть увагу, що цей метод повертає помилку, на відміну від еквівалента sync.WaitGroup. Помилка дорівнює нулю тоді та тільки тоді, коли всі завдання повернули нульову помилку.

На практиці це виглядає так:

func Cities(cities ...string) ([]*Info, error) {
    var g errgroup.Group
    var mu sync.Mutex
    res := make([]*Info, len(cities)) // res[i] corresponds to cities[i]

    for i, city := range cities {
        i, city := i, city // create locals for closure below
        g.Go(func() error {
            info, err := City(city)
            mu.Lock()
            res[i] = info
            mu.Unlock()
            return err
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return res, nil
}

Тут ми створюємо зріз результатів, щоб кожна підпрограма могла писати у свій власний індекс. Хоча наведений вище код безпечний навіть без м'ютексу mu, оскільки кожна підпрограма записує у власний запис у зрізі, ми все одно використовуємо його на випадок, якщо з часом код буде змінено.

Обмежений паралелізм

Наведений вище код буде шукати інформацію про погоду для всіх заданих міст одночасно. Це добре, коли кількість міст невелика, але може спричинити проблеми з продуктивністю, якщо кількість міст велика. У таких випадках корисно ввести обмежений паралелізм.

У Go дуже легко створити обмежений паралелізм за допомогою семафорів. Семафор - це примітив паралелізму, з яким ви могли зустрічатися, якщо вивчали комп'ютерні науки, але якщо ні, не хвилюйтеся. Ви можете використовувати семафори для різних цілей, але ми будемо використовувати їх лише для відстеження кількості запущених задач і блокування, поки не звільниться місце для запуску іншої задачі.

У Go ми можемо досягти цього завдяки розумному використанню каналів! Якщо ми хочемо дозволити виконувати до 10 завдань одночасно, ми створюємо канал з місцем для 10 елементів: semaphore := make(chan struct{}, 10). Ви можете уявити це як трубу, в яку може поміститися 10 кульок.

Для запуску нового завдання, блокування, якщо вже запущено занадто багато задач, ми просто намагаємося відправити значення по каналу: semaphore <- struct{}{}. Це аналогічно спробі проштовхнути ще одну кульку в трубу. Якщо труба переповнена, то чекаємо, поки звільниться місце.

Коли задача завершиться, позначте її як виконану, забравши значення з каналу: <-semaphore. Це аналогічно витягуванню кульки на іншому кінці труби, що залишає місце для проштовхування іншої кульки (початок виконання іншого завдання).

І це все! Наша модифікована функція Cities має такий вигляд:

func Cities(cities ...string) ([]*Info, error) {
    var g errgroup.Group
    var mu sync.Mutex
    res := make([]*Info, len(cities)) // res[i] corresponds to cities[i]
    sem := make(chan struct{}, 10)
    for i, city := range cities {
        i, city := i, city // create locals for closure below
        sem <- struct{}{}
        g.Go(func() error {
            info, err := City(city)
            mu.Lock()
            res[i] = info
            mu.Unlock()
            <-sem
            return err
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return res, nil
}

Зважений обмежений паралелізм

І нарешті, іноді вам потрібен обмежений паралелізм, але не всі завдання однаково дорогі. В такому випадку кількість ресурсів, які ми будемо споживати, буде сильно відрізнятися в залежності від розподілу дешевих і дорогих завдань, а також від того, як вони починаються.

Кращим рішенням для цього випадку є використання зваженого обмеженого паралелізму. Це працює просто: замість того, щоб міркувати про кількість завдань, які ми хочемо запустити одночасно, ми придумуємо "вартість" для кожного завдання та отримуємо та звільняємо цю вартість від семафора.

Ми більше не можемо моделювати це за допомогою каналів, оскільки нам потрібна вся вартість, отримана і звільнена одночасно. На щастя, "розширена стандартна бібліотека" знову приходить нам на допомогу! Пакет golang.org/x/sync/sempahore надає зважену реалізацію семафора саме для цієї мети.

Операція sem <- struct{}{} називається "Отримати", а операція <-sem - "Звільнити". Ви можете помітити, що метод semaphore.Acquire повертає помилку; це тому, що його можна використовувати з контекстним пакетом для дострокового переривання операції. У цьому прикладі ми проігноруємо цю можливість.

Приклад пошуку прогнозу погоди насправді занадто простий, щоб вимагати використання зваженого семафора, але для простоти уявімо, що вартість залежить від довжини назви міста. Тоді ми прийдемо до наступного:

func Cities(cities ...string) ([]*Info, error) {
    ctx := context.TODO() // replace with a real context
    var g errgroup.Group
    var mu sync.Mutex
    res := make([]*Info, len(cities)) // res[i] corresponds to cities[i]
    sem := semaphore.NewWeighted(100) // 100 chars processed concurrently
    for i, city := range cities {
        i, city := i, city // create locals for closure below
        cost := int64(len(city))
        if err := sem.Acquire(ctx, cost); err != nil {
            break
        }
        g.Go(func() error {
            info, err := City(city)
            mu.Lock()
            res[i] = info
            mu.Unlock()
            sem.Release(cost)
            return err
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    } else if err := ctx.Err(); err != nil {
        return nil, err
    }
    return res, nil
}

Висновок

Наведені вище приклади показують, як легко додати паралелізм до програми на Go, а потім налаштувати його відповідно до ваших потреб.

Джерело: Advanced Go Concurrency
Помітили помилку? Повідомте автору, для цього достатньо виділити текст з помилкою та натиснути Ctrl+Enter
Коментарі (0)

    Ще немає коментарів

Щоб залишити коментар необхідно авторизуватися.

Вхід / Реєстрація

Читайте також: fmt.errorf w, fmt.errorf, go if and