Продвинутые паттерны конкурентности в Go

Перевод первой статьи от Roberto Clapis про продвинутые паттерны конкурентности в Go.

Написание кода это сложно. Написание кода, связанного с параллелизмом и конкурентностью, ещё сложнее. Делать всё это, поддерживая эффективность программы - это настоящий вызов.
Сегодня* (23.02.2019) я решил поделиться несколькими трюками для некоторых необычных ситуаций.

Каналы временных операций

Иногда вы хотите ограничить время работы канала: попробовать выполнить что-то и если не получилось в срок - отступиться.
Для решения этой задачи можно воспользоваться пакетами context или time, оба подходят отлично. context может быть более идиоматично, а time немного эффективнее, но в целом они идентичны:

func ToChanTimedContext(ctx context.Context, d time.Duration, message Type, c chan<- Type) (written bool) {
	ctx, cancel := context.WithTimeout(ctx, d)
	defer cancel()
	select {
	case c <- message:
		return true
	case <-ctx.Done():
		return false
	}
}

func ToChanTimedTimer(d time.Duration, message Type, c chan<- Type) (written bool) {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case c <- message:
		return true
	case <-t.C:
		return false
	}
}

Попробовать context в Playground Попробовать timer в Playground

С точки зрения перфоманса здесь нет особых различий, единственная разница, которую я нашёл это то, что решение, использующее context, выделяет больше памяти (и ещё таймер можно переиспользовать в будущем).

Но будьте осторожны при переиспользовании таймера, просто помните, что это может не стоить риска и вы просто сэкономите 10 allocs/op.

Если вам интересны трюки с таймерами, можете посмотреть их здесь.

Первым пришёл первым обслужен

Иногда вы хотите отправить одно и то же сообщение в несколько каналов, причём сначала записывать в тот, который освободится первым, но никогда не писать одно сообщение дважды в один и тот же канал.

Чтобы сделать это есть два пути: вы можете скрыть канал с помощью локальной переменной и тем самым отключить соответствующие case‘ы внутри select или использовать горутины и синронизацию.

func FirstComeFirstServedSelect(message Type, a, b chan<- Type) {  
    for i := 0; i < 2; i++ {  
       select {  
       case a <- message:  
          a = nil  
       case b <- message:  
          b = nil  
       }  
    }  
}  
  
func FirstComeFirstServedGoroutines(message Type, a, b chan<- Type) {  
    var wg sync.WaitGroup  
    wg.Add(2)  
    go func() { a <- message; wg.Done() }()  
    go func() { b <- message; wg.Done() }()  
    wg.Wait()  
}

Обратите внимание, что в этом случае производительность может иметь значение, и на момент написания статьи решение, порождающее горутины, занимало почти в 4 раза больше времени, чем решение с select.

Если во время компиляции количество каналов неизвестно, то первое решение становится сложнее, но всё ещё возможно, в то же время второе решение в основном остаётся без изменений.

Примечание: если ваша программа имеет много мест с непредопределенными размерами каналов, возможно стоит пересмотреть дизайн, скорее всего это можно упростить.

Если после анализа кода, количество каналов всё ещё неопределено, то вот два решения для этой ситуации:

func FirstComeFirstServedGoroutinesVariadic(message Type, chs ...chan<- Type) {  
    var wg sync.WaitGroup  
    wg.Add(len(chs))  
    for _, c := range chs {  
       go func() { c <- message; wg.Done() }()  
    }  
    wg.Wait()  
}  
  
func FirstComeFirstServedSelectVariadic(message Type, chs ...chan<- Type) {  
    cases := make([]reflect.SelectCase, len(chs))  
    for i, ch := range chs {  
       cases[i] = reflect.SelectCase{  
          Dir:  reflect.SelectSend,  
          Chan: reflect.ValueOf(ch),  
          Send: reflect.ValueOf(message),  
       }  
    }  
    for i := 0; i < len(chs); i++ {  
       chosen, _, _ := reflect.Select(cases)  
       cases[chosen].Chan = reflect.ValueOf(nil)  
    }  
}

Не нужно и говорить, что решение, использующее рефлексию, почти на два порядка медленее чем решение с горутинами, а также оно менее читаемо, пожалуйста не используйте его.

Объединение подходов

В случае если вам нужно решить обе вышеописанные проблемы: писать в несколько каналов и отменять запись, если она занимает продолжительное время, то здесь есть также два решения. Первое с использованием time + select и другое с context + go. Первое больше подходит, если количество каналов во время компиляции известно, второе можно использовать, когда количество каналов неизвестно.

func ToChansTimedTimerSelect(d time.Duration, message Type, a, b chan Type) (written int) {  
    t := time.NewTimer(d)  
    for i := 0; i < 2; i++ {  
       select {  
       case a <- message:  
          a = nil  
       case b <- message:  
          b = nil  
       case <-t.C:  
          return i  
       }  
    }  
    t.Stop()  
    return 2  
}  
  
func ToChansTimedContextGoroutines(ctx context.Context, d time.Duration, message Type, ch ...chan Type) (written int) {  
    ctx, cancel := context.WithTimeout(ctx, d)  
    defer cancel()  
    var (  
       wr int32  
       wg sync.WaitGroup  
    )  
    wg.Add(len(ch))  
    for _, c := range ch {  
       go func() {  
          defer wg.Done()  
          select {  
          case c <- message:  
             atomic.AddInt32(&wr, 1)  
          case <-ctx.Done():  
          }  
       }()  
    }  
    wg.Wait()  
    return int(wr)  
}

Создано: 10.07.2025
Обновлено: 23.07.2025
Технологии