読者です 読者をやめる 読者になる 読者になる

golangのチャンネルでセマフォ的なナニカ

mattnさんのエントリに触発されて、某所で使用したちょっと変わったgolangのチャンネルの使い方をご紹介します。

mattn.kaoriya.net


特定の処理の並列度をある程度までに抑えたい、みたいなコトありますよね?
例えばCPUヘビーな処理の並列数をたかだかコア数くらいまでに抑えたい、とか。

そんなときはバッファ付きチャンネルを用意しておいて、当該処理の前後でそのチャンネルにwrite/readをすることで、セマフォ的な制御ができます。

以下のようなカンジです。

package main

import (
    "fmt"
    "sync"
    "time"
)

var ch chan int = make(chan int, 4) // 並列度を4に制限

func heavyFunc(i int) {
    ch <- 1 // チャンネルのバッファがイッパイになっていたら、ブロックする
    defer func() { <-ch }()

    fmt.Println("start:", i)
    time.Sleep(time.Second)
    fmt.Println("end:", i)
}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(i int) {
            heavyFunc(i)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

並列数が上限に達している際にブロックさせたくない場合は、ちょっと変えて以下のようなカンジにすると良いです。

func heavyFunc(i int) error {
    if len(ch) == cap(ch) {
        // バッファがイッパイだったらさっさとerrorを返す
        return errors.New("too busy!")

    }
    ch <- 1
    defer func() { <-ch }()

    fmt.Println("start:", i)
    time.Sleep(time.Second)
    fmt.Println("end:", i)

    return nil
}

(追記もみてください)
厳密にはlen/capのチェックを同時にくぐり抜けたgoroutineがあるとブロックしてしまう可能性があります。
回避するにはさらなる同期処理が必要になります……けど、そこまでする必要があるケースは少ないんじゃないかなぁ……と。


余談ですけど、これ使うときに並列数をコア数より少なめに設定しちゃうと、結果的にCPUを使い切れなくなるので

  • 並列数はコア数
  • GOMAXPROCSはコア数より少し多め

としておくと重い処理でCPUをフルで使いつつ、平行してそれ以外の処理も細々と処理できるという状況を作れます。

こちらからは以上です


2016-07-07 追記

golangのチャンネルでセマフォ的なナニカ - okzkメモ

len==capのところは、https://play.golang.org/p/CBvBeQ-jO8 のように select を使うとraceが発生しないんじゃないかな

2016/07/07 10:38
b.hatena.ne.jp

上記コメントのPlaygroundのリンクです。

( ゚д゚)ハッ!
channelのwrite時にもselect使えるって知りませんでした! ありがとうございます!!!


ちなみにさくっと諦めてエラーを返すヤツですけど、Web APIみたいなので503 Service Temporarily Unavailable 返すのに使ったことあります。

もっと作りこめばバックプレッシャー制御もできるかもしれませんね。夢ひろがりんぐ