Продвинутые паттерны конкурентности в Go
Перевод первой статьи от Roberto Clapis про продвинутые паттерны конкурентности в Go.
Написание кода это сложно. Написание кода, связанного с параллелизмом и конкурентностью, ещё сложнее. Делать всё это, поддерживая эффективность программы - это настоящий вызов.
Сегодня* (23.02.2019) я решил поделиться несколькими трюками для некоторых необычных ситуаций.
Каналы временных операций
Иногда вы хотите ограничить время работы канала: попробовать выполнить что-то и если не получилось в срок - отступиться.
Для решения этой задачи можно воспользоваться пакетами context
или time
, оба подходят отлично. context
может быть более идиоматично, а time
немного эффективнее, но в целом они идентичны:
func ToChanTimedContext(ctx context.Context, d time.Duration, message Type, c chan<- Type) (written bool) {
ctx, cancel := context.WithTimeout(ctx, d)
defer cancel()
select {
case c <- message:
return true
case <-ctx.Done():
return false
}
}
func ToChanTimedTimer(d time.Duration, message Type, c chan<- Type) (written bool) {
t := time.NewTimer(d)
defer t.Stop()
select {
case c <- message:
return true
case <-t.C:
return false
}
}
Попробовать context в Playground Попробовать timer в Playground
С точки зрения перфоманса здесь нет особых различий, единственная разница, которую я нашёл это то, что решение, использующее context
, выделяет больше памяти (и ещё таймер можно переиспользовать в будущем).
Но будьте осторожны при переиспользовании таймера, просто помните, что это может не стоить риска и вы просто сэкономите 10 allocs/op.
Если вам интересны трюки с таймерами, можете посмотреть их здесь.
Первым пришёл первым обслужен
Иногда вы хотите отправить одно и то же сообщение в несколько каналов, причём сначала записывать в тот, который освободится первым, но никогда не писать одно сообщение дважды в один и тот же канал.
Чтобы сделать это есть два пути: вы можете скрыть канал с помощью локальной переменной и тем самым отключить соответствующие case
‘ы внутри select
или использовать горутины и синронизацию.
func FirstComeFirstServedSelect(message Type, a, b chan<- Type) {
for i := 0; i < 2; i++ {
select {
case a <- message:
a = nil
case b <- message:
b = nil
}
}
}
func FirstComeFirstServedGoroutines(message Type, a, b chan<- Type) {
var wg sync.WaitGroup
wg.Add(2)
go func() { a <- message; wg.Done() }()
go func() { b <- message; wg.Done() }()
wg.Wait()
}
Обратите внимание, что в этом случае производительность может иметь значение, и на момент написания статьи решение, порождающее горутины, занимало почти в 4 раза больше времени, чем решение с select
.
Если во время компиляции количество каналов неизвестно, то первое решение становится сложнее, но всё ещё возможно, в то же время второе решение в основном остаётся без изменений.
Примечание: если ваша программа имеет много мест с непредопределенными размерами каналов, возможно стоит пересмотреть дизайн, скорее всего это можно упростить.
Если после анализа кода, количество каналов всё ещё неопределено, то вот два решения для этой ситуации:
func FirstComeFirstServedGoroutinesVariadic(message Type, chs ...chan<- Type) {
var wg sync.WaitGroup
wg.Add(len(chs))
for _, c := range chs {
go func() { c <- message; wg.Done() }()
}
wg.Wait()
}
func FirstComeFirstServedSelectVariadic(message Type, chs ...chan<- Type) {
cases := make([]reflect.SelectCase, len(chs))
for i, ch := range chs {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ch),
Send: reflect.ValueOf(message),
}
}
for i := 0; i < len(chs); i++ {
chosen, _, _ := reflect.Select(cases)
cases[chosen].Chan = reflect.ValueOf(nil)
}
}
Не нужно и говорить, что решение, использующее рефлексию, почти на два порядка медленее чем решение с горутинами, а также оно менее читаемо, пожалуйста не используйте его.
Объединение подходов
В случае если вам нужно решить обе вышеописанные проблемы: писать в несколько каналов и отменять запись, если она занимает продолжительное время, то здесь есть также два решения. Первое с использованием time
+ select
и другое с context
+ go
. Первое больше подходит, если количество каналов во время компиляции известно, второе можно использовать, когда количество каналов неизвестно.
func ToChansTimedTimerSelect(d time.Duration, message Type, a, b chan Type) (written int) {
t := time.NewTimer(d)
for i := 0; i < 2; i++ {
select {
case a <- message:
a = nil
case b <- message:
b = nil
case <-t.C:
return i
}
}
t.Stop()
return 2
}
func ToChansTimedContextGoroutines(ctx context.Context, d time.Duration, message Type, ch ...chan Type) (written int) {
ctx, cancel := context.WithTimeout(ctx, d)
defer cancel()
var (
wr int32
wg sync.WaitGroup
)
wg.Add(len(ch))
for _, c := range ch {
go func() {
defer wg.Done()
select {
case c <- message:
atomic.AddInt32(&wr, 1)
case <-ctx.Done():
}
}()
}
wg.Wait()
return int(wr)
}
Обновлено: 23.07.2025