読者です 読者をやめる 読者になる 読者になる

Re: golang の channel を使って Dispatcher-Worker を作り goroutine 爆発させないようにする

こちらを読みました。

blog.kaneshin.co

channel自体にdispatch機構があるからもっとシンプルに書けるのでは?と思って書き直したのがこちら

コードだけぶん投げてもアレなので、あとで解説書きます。

ついでに「go1.7で標準化されたcontext使ったらどうなるか」も気力次第で書くかもしれません。

というわけで追記というか、本編というか、解説です。

channel整理

元のコードを読んでいくと、

  1. dispatcherのqueueにjobを突っ込む
  2. dispatcherがidle状態のworkerをpoolから取り出す
  3. 同じ行で取り出したworkerのqueueにjobを渡す
  4. workerがjobを受け取って処理する

というカンジの処理の流れになってるんですが、dispatch機構自体がchannelにはあるので、

  1. dispatcherのqueueにjobを突っ込む
  2. workerがjobを受け取って処理する

とするだけでOKです。

ここまでで、idle状態のworkerを保存するpoolとworker毎のqueueのchannelが不要になりました。

終了処理

終了処理用にわざわざ別でquiteというchannelを使っていますが、このくらいシンプルなケースではchannelをcloseするだけでOKです。

worker側も単純にforループ回すだけになります。 ループはchannelがcloseされて空になった時点で抜けてくれます。

ここまででworkerが単なるgoroutineになりました。
また、queueをcloseすれば最終的に全workerが終了するので、dispatcherがworkerのリストを保持する必要もなくなりました。

Wait??

元コードのWaitは「queueにつっこんだjobの処理が全部終わって完全にidleになるまでブロックする」というコードになってます。

main()で一回しか呼ばれていないのでなんともいえないのですが、このWait()の実装意図が以下のように「ループで登録されたモノが全部処理されるまで待ち合わせたい」ものだとこの実装ではマズイです。

func run(d *Dispatcher) {
    for i := 0; i < 100; i++ {
        url := fmt.Sprintf("http://placehold.it/%dx%d", i, i)
        d.Add(url)
    }
    d.Wait()
    // 上でAddした処理が全部終わったらナニカしたい!!!
}

このrun()が並列でよばれると、自分で登録したモノだけじゃなく、他ので登録されたモノ全部が終わるまで待ってしまうからです。 この場合はrun()内で、個別にWaitGroupを作ってあげて、ゴニョゴニョしてあげる必要があります。

また、もしかするとWait()の実装意図は単に終了処理用で「全部のjobの処理が終わるまで待ちたい!」というモノかもしれません。 その場合はStop()に統合しちゃえばいいわけですし、書き直した方ではqueueをcloseした後、全部のgoroutineが処理を終えるまで待つようにしています。

Stop??

元コードは引数でimmediatelyかどうかを区別していますが「Stop()にboolな引数がある」というのが個人的には驚き最小ではないのでStop()とStopImmediately()でメソッド自体分けてしまってます。
こうしておけば「Stop()があってStopImmediately()があるということは、StopImmediately()はきっとなにか通常ではやらないことをするんだろうな」と身構えることができるので。

んでImmediatelyな実装の方は「queueにつっこまれたけど処理しないjob」が発生するというモノなんですけど、それを単にqueueから読み捨てるというカタチで書きなおしています。
ただし、workerで処理中のモノがあればその分だけは処理完了するまで待ち合わせます。

元コードが処理中のモノは終了処理用のchannelに書き込むだけで、処理中のモノの完了を待ち合わせていない(!?)ので、その点は挙動が異なっています。

余談ですけど、値に意味の無い終了処理用のchannelは、ダミー値を書き込むよりもcloseした方がいいです。
readしようとしているgoroutineが一つだけだったらいいんですけど、複数あったらその分書き込まなきゃいけませんから。

// コレよりも
w.quit <- struct{}{}
// こっちがベター
close(w.quit)

感想

書きなおしてみると、ただのworkerパターンになっちゃいました(;・∀・)
dispatcher周りのサンプルとしてなら、pub/subパターンの方を実装した方が良かったんじゃないかなぁ……

あと、これにcontextを使うと、標準的な方法でworker側で処理の中断制御ができるようになる……というハナシはまた後日で。