続、workerパターンをcontext化してみたら……

みなさん「みんなのGo言語」は予約ポチりましたか? 私はポチりました!

そんな「みんなのGo言語」の著者の一人であるid:lestrrat さんからの前エントリに対してマサカリが飛んできてます。

workerパターンをcontext化してみたら…… - okzkメモ

context.Contextを揮発性のないものでラップして持つのはよくないと思う。このほうがよりGoっぽいと思うが、どうだろう https://gist.github.com/lestrrat/c9b78369cf9b9c5d9b0c909ed1e2452e

2016/08/23 18:34

コメント内のgistのリンクはこちら

ちょっと時間あいちゃいましたけど、これについて思ったことを2点ほど……

まずはcontext関係ないとこから……

contextの使い方へのツッコミなのにいきなりcontextとは無関係なトコに言及しちゃうのですが、同時実行数の制御の仕方が前回までのコードと異なってるのにモヤモヤしちゃいました。

前回までのコードの方は以下のようなカンジ(以下「worker方式)」で

  1. 同時実行数分のworkerのgoroutineを立ち上げる
  2. その中で、ループでchannel経由で回ってきたjobを処理する

んで、頂いたgistの方は、こんなカンジ(以下「セマフォ方式)」

  1. job分goroutineを立ち上げておいて
  2. セマフォで同時実行数を制限する

元々の「goroutineを爆発させないため」という記事の流れもあったんでworker方式だったというのもあるんですけど、 実際どちらの実装方式も選べるなら、個人的には以下のような理由でやっぱりworker方式を選ぶような気がします。

  • なんとなくセマフォ方式よりworker方式の方が可読性が高い気がする
  • goroutineのそのもののコスト(どんなに軽量でもゼロではない)

可読性は個人の主観によるのでどうしても宗教論争になっちゃいますけど、後者の実行コストの方を検証してみましょう。

というわけで、簡易なベンチマークを用意してみました。

package main

import (
  "sync"
  "testing"
  "time"
)

const concurrency = 4

func BenchmarkLimitByWorkers(b *testing.B) {
  ch := make(chan struct{}, 10000)

  wg := sync.WaitGroup{}
  wg.Add(concurrency)
  for i := 0; i < concurrency; i++ {
    go func() {
      for range ch {
        // 何かしら処理の代わり
        // ベンチマークそのものに支配的にならないようにマイクロ秒だけsleep
        time.Sleep(time.Microsecond)
      }
      wg.Done()
    }()
  }

  for i := 0; i < b.N; i++ {
    ch <- struct{}{}
  }
  close(ch)
  wg.Wait()
}

func BenchmarkLimitBySemaphore(b *testing.B) {
  ch := make(chan struct{}, concurrency)

  wg := sync.WaitGroup{}
  wg.Add(b.N)
  for i := 0; i < b.N; i++ {
    go func() {
      ch <- struct{}{}
      // 何かしら処理の代わり
      // ベンチマークそのものに支配的にならないようにマイクロ秒だけsleep
      time.Sleep(time.Microsecond)
      <-ch
      wg.Done()
    }()
  }
  wg.Wait()
}

手元で実行してみた結果は以下のようなカンジ。

BenchmarkLimitByWorkers-4         300000              4872 ns/op
BenchmarkLimitBySemaphore-4       200000             12835 ns/op

想像どおりセマフォ方式の方がオーバーヘッドが大きいようです。
……が、結局のトコ無視できるくらいですね。

そんなわけでよっぽどパフォーマンス重視のトコ以外は可読性で判断すればいいという結論なんですけど、みなさん、どちらがお好みでしょうか?

contextの揮発性について

えーっと、そもそもなんですけど、contextって揮発性が求められるんですっけ???
極端な例でいうと、context.Background()で帰ってくるcontextなんかは全然揮発しないpackageのvarで定義されちゃってますけど……

さてさて、contextの用途的に、

  • Dispatcher全体のcontext
  • その処理の一部であるjobの処理のcontext

という流れで派生関係があるというのは、個人的には違和感ないですし、その派生関係を利用してDispacher全体をキャンセルさせるStopImmediately()みたいな実装もできたわけで……
# というかあの例はStopImmediately()のためにcontext対応したようなもんですケド(;・∀・)

んでもって、Start/StopみたいにライフサイクルがはっきりしているDispatcherのようなモノのcontextは、自身で(ラップして)管理させるほうがキレイなんじゃないかなぁと思います。


とはいえこういう規約もあるわけで、判断に悩むトコではあるんですけどね……

// Do not store Contexts inside a struct type; instead, pass a Context
// explicitly to each function that needs it. The Context should be the first
// parameter, typically named ctx:
//
//  func DoSomething(ctx context.Context, arg Arg) error {
//    // ... use ctx ...
//  }

でもこれはfunctionで使うcontextの渡し方に限定した内容だと思えば、Dispatcherが内部にcontextを保持するのは矛盾しないかなぁ……と思ってみたり。

い、いかがでしょうか???

……やっぱり、struct typeでラップするのは、、、ダメですかね???

workerパターンをcontext化してみたら……

はい、というわけで、前記事のworkerパターンをcontextつかったらどーなるか、についてです。
前の記事や、その元記事のソースを読んでいる前提ですので、未読の方はそちらの確認からお願いします。

さて、ざっくりとした変更の方針ですけど、以下の2点です。

  • contextを使うことで、workerの実装側でキャンセルできるようにする
  • workerに渡す値はcontext経由にする。

というのをヤッツケでやってつくってみました

以下変更点の解説です。

まず、元実装ではworkerで実行される処理がベタ書きだったのを汎用化するため、dispatcherのqueueに入れるjobをとqueueの定義を変更します。

type (
  job struct {
    proc func(context.Context)
    ctx  context.Context
  }

  Dispatcher struct {
    queue chan *job
    // ...
  }
)

workerの実装も合わせて変更します。

go func() {
  defer d.wg.Done()
  for j := range d.queue {
    j.proc(j.ctx)
  }
}()

さらにdispatcherは親contextとCancelFunc持つようにします。

type (
  Dispatcher struct {
    // ...
    ctx    context.Context
    cancel context.CancelFunc
  }
)

そして使う側がcontextを拡張して使えるようにcontextを返す関数を用意します。

func (d *Dispatcher) Context() context.Context {
  return d.ctx
}

job登録もcontextを使わないパターンと使うパターンで2種類用意します。
使わないパターンでは、dispatcherのctxをそのまま渡します。

func (d *Dispatcher) Add(proc func(context.Context)) {
  d.queue <- &job{proc: proc, ctx: d.ctx}
}

func (d *Dispatcher) AddWithContext(proc func(context.Context), ctx context.Context) {
  d.queue <- &job{proc: proc, ctx: ctx}
}

終了処理も変更しています。

Immediatelyな実装の方では、queueの読み捨てをやめてcontextのcancelを先に呼ぶようにしました。
ちゃんとImmediatelyに帰ってくるかはjobの実装次第ですが、読み捨てよりはお行儀が良い気がしてます。

func (d *Dispatcher) StopImmediately() {
  d.cancel()
  close(d.queue)
  d.wg.Wait()
}

Immediatelyじゃない実装もworkerが全部終了した後ですが、cancel()を実行しています。コレをしないとリークが発生します

func (d *Dispatcher) Stop() {
  close(d.queue)
  d.wg.Wait()
  d.cancel()
}

さて、ここまでやったので、元のgetをcontextを受け取って使うようにちょちょいっと修正して……

func get(ctx context.Context) {
  url := ctx.Value("url").(string)
  req, err := http.NewRequest("GET", url, nil)
  if err != nil {
    log.Fatal(err)
  }

  resp, err := http.DefaultClient.Do(req.WithContext(ctx))
  // ...
}

jobを登録するトコをこんなカンジで、context.WithValueでurlを渡すように修正します。

  for i := 0; i < 10; i++ {
    url := fmt.Sprintf("http://placehold.it/%dx%d", i, i)
    ctx := context.WithValue(d.Context(), "url", url)
    d.AddWithContext(get, ctx)
  }

こんなカンジでworkerパターンのcontext対応化が完了しましたー。

感想

httpのclientとかはcontextに対応していて、中断処理に対応したモノをさっくり作ることができてラクですねー。


(8/24 追記)
id:lestrrat さんからマサカリが飛んできております((((;゚Д゚))))ガクガクブルブル

workerパターンをcontext化してみたら…… - okzkメモ

context.Contextを揮発性のないものでラップして持つのはよくないと思う。このほうがよりGoっぽいと思うが、どうだろう https://gist.github.com/lestrrat/c9b78369cf9b9c5d9b0c909ed1e2452e

2016/08/23 18:34
b.hatena.ne.jp

コチラについて、別エントリを書くかも……

Re: golang の channel を使って Dispatcher-Worker を作り goroutine 爆発させないようにする

こちらを読みました。

blog.kaneshin.co

channel自体にdispatch機構があるからもっとシンプルに書けるのでは?と思って書き直したのがこちら

コードだけぶん投げてもアレなので、あとで解説書きます。

ついでに「go1.7で標準化されたcontext使ったらどうなるか」も気力次第で書くかもしれません。

というわけで追記というか、本編というか、解説です。

channel整理

元のコードを読んでいくと、

  1. dispatcherのqueueにjobを突っ込む
  2. dispatcherがidle状態のworkerをpoolから取り出す
  3. 同じ行で取り出したworkerのqueueにjobを渡す
  4. workerがjobを受け取って処理する

というカンジの処理の流れになってるんですが、dispatch機構自体がchannelにはあるので、

  1. dispatcherのqueueにjobを突っ込む
  2. workerがjobを受け取って処理する

とするだけでOKです。

ここまでで、idle状態のworkerを保存するpoolとworker毎のqueueのchannelが不要になりました。

終了処理

終了処理用にわざわざ別でquiteというchannelを使っていますが、このくらいシンプルなケースではchannelをcloseするだけでOKです。

worker側も単純にforループ回すだけになります。 ループはchannelがcloseされて空になった時点で抜けてくれます。

ここまででworkerが単なるgoroutineになりました。
また、queueをcloseすれば最終的に全workerが終了するので、dispatcherがworkerのリストを保持する必要もなくなりました。

Wait??

元コードのWaitは「queueにつっこんだjobの処理が全部終わって完全にidleになるまでブロックする」というコードになってます。

main()で一回しか呼ばれていないのでなんともいえないのですが、このWait()の実装意図が以下のように「ループで登録されたモノが全部処理されるまで待ち合わせたい」ものだとこの実装ではマズイです。

func run(d *Dispatcher) {
    for i := 0; i < 100; i++ {
        url := fmt.Sprintf("http://placehold.it/%dx%d", i, i)
        d.Add(url)
    }
    d.Wait()
    // 上でAddした処理が全部終わったらナニカしたい!!!
}

このrun()が並列でよばれると、自分で登録したモノだけじゃなく、他ので登録されたモノ全部が終わるまで待ってしまうからです。 この場合はrun()内で、個別にWaitGroupを作ってあげて、ゴニョゴニョしてあげる必要があります。

また、もしかするとWait()の実装意図は単に終了処理用で「全部のjobの処理が終わるまで待ちたい!」というモノかもしれません。 その場合はStop()に統合しちゃえばいいわけですし、書き直した方ではqueueをcloseした後、全部のgoroutineが処理を終えるまで待つようにしています。

Stop??

元コードは引数でimmediatelyかどうかを区別していますが「Stop()にboolな引数がある」というのが個人的には驚き最小ではないのでStop()とStopImmediately()でメソッド自体分けてしまってます。
こうしておけば「Stop()があってStopImmediately()があるということは、StopImmediately()はきっとなにか通常ではやらないことをするんだろうな」と身構えることができるので。

んでImmediatelyな実装の方は「queueにつっこまれたけど処理しないjob」が発生するというモノなんですけど、それを単にqueueから読み捨てるというカタチで書きなおしています。
ただし、workerで処理中のモノがあればその分だけは処理完了するまで待ち合わせます。

元コードが処理中のモノは終了処理用のchannelに書き込むだけで、処理中のモノの完了を待ち合わせていない(!?)ので、その点は挙動が異なっています。

余談ですけど、値に意味の無い終了処理用のchannelは、ダミー値を書き込むよりもcloseした方がいいです。
readしようとしているgoroutineが一つだけだったらいいんですけど、複数あったらその分書き込まなきゃいけませんから。

// コレよりも
w.quit <- struct{}{}
// こっちがベター
close(w.quit)

感想

書きなおしてみると、ただのworkerパターンになっちゃいました(;・∀・)
dispatcher周りのサンプルとしてなら、pub/subパターンの方を実装した方が良かったんじゃないかなぁ……

あと、これにcontextを使うと、標準的な方法でworker側で処理の中断制御ができるようになる……というハナシはまた後日で。

Re: Dockerに載せたサービスをホットデプロイする

こちらを拝見したところ、やりたいコトはdocker1.12のswarmモードで解決するんじゃないかなー、と思ってみたので試してみたテスト。

h3poteto.hatenablog.com

とりあえず、最新版のdocker(1.12)をインストールです。
手元の環境はCentOS7なので、インストールガイドに従ってゴニョゴニョと。

んでもって、swarm初期化。

# docker swarm init

今回1台だけなので、ノード追加は行いません。

次に実験用にnginxのサービスを立ち上げます。
ポイントはimage差し替え時に「停止→起動」の順番で動くので、あらかじめレプリカ数を2にしておいて、ちゃんとローリングで切り替わるようdelayを設定することです。

# docker service create --update-delay 20s -p 80:80 --name test --replicas 2 nginx

この状態でアクセスすると、オーバーレイネットワークでラウンドロビンしながら各コンテナが応答してくれます。

imageの更新は以下の様なカンジです。

# docker service update --image nginx:stable test

swarmがローリングでアップデートしてくれます。
応答も途切れません。インストールもdocker最新版だけでいいので最小限だし、オペレーションも簡単!やったね!


……のつもりだったんですけど、何度もservice updateを繰り返しているウチにレプリカ2コのウチ片方にアクセスが寄ってしまうという現象が発生してしまいました。
当然その状態で寄ってる方から停止すると、無応答時間が発生するわけで……(´;ω;`)ブワッ

再現したりしなかったりで、今のとこ原因特定には至ってません(;・∀・)
なにか、私の設定がマズイのかなぁ……

golangのGCとかgoroutineの状況を確認するライブラリ

golangで作った長時間動かすアプリで「goroutineリークやメモリリークがないか知りたい」とか「GCの影響がどの程度か知りたい」とかないですか?ありますよね?
そのためのログをダラダラ出力するためのライブラリを公開しました。

# 元々はクローズドなトコで作ったモノを、公開のため完全フルスクラッチで書きなおしてます。

github.com

使い方はmainのアタマとかに適当に組み込むだけです。

func main() {
  // 1分ごとにログにjson出力
  t := stats.SchedulePeriodically(time.Minute, func(s *stats.Stats) { log.Println(s) })
  defer t.Stop()

  // あと本来の処理を……
}

# 標準ロガーがアレなら、お好みのロガー使ってください。

String()で生成されるjsonはその時点のgoroutineの数とruntime.MemStatsをそのままMarshalしただけのやる気ないモノです。

……はい、なんというか、完全に手抜きですね(;・∀・)
とはいえ、まったく情報がないのと比較すると、トラブったときの調査の捗り方が違います( ー`дー´)キリッ

なお、個人的には適当にローカルに書き出しておいて後からjqで眺めてみたりぐらいしかやってないんですけど、ElasticsearchなりNorikraなりでアレコレするのも面白いかと思います。
# その場合はfluent/fluent-logger-golangを使うと捗る……のかな???

そんなこんなで、もうすぐリリースされるハズのgo1.7でGCがどれくらい改善されたか、とか確認できるといいですねー

marisa-trieのgo bindingを書いた

id:s-yataさんが公開してくださっているmarisa-triegolangで使えるようなバインディングを公開しました。

github.com

marisa-trieは非常に省メモリなtrie実装です。
特徴等は公式ドキュメントを参照してください。

インストールも普通にgo getするだけでOKです。よければどーぞ。
# g++とかstdlib++とかは必要になりますけど。

使い方等のドキュメントは……今後の課題ということで(;・∀・)

go getでインスコできるc/c++なライブラリのラッパーの作り方?

go buildはパッケージディレクトリにおいている.cや.ccファイルは一緒にコンパイルしてくれるのですが、サブディレクトリまでは面倒見てくれません。
かといってライブラリのソースをパッケージディレクトリにコピーしたりするとライブラリの更新の追従とかがしんどくなってきます。

そこで今回は、元ライブラリをgitのsubmoduleとした上で、コンパイル対象のファイルをincludeしただけのやる気ない.ccファイルを用意してみました。

このやり方がいいのかわかりませんが、とりあえずはgo getでまとめて全部コンパイルしてくれるし、ライブラリ更新の追従もgit submoduleでゴニョるだけでよいので、結果的にシンプルに出来たような気がします。

他にこういう時のうまいやり方があるのなら、教えて頂けるとウレシイなぁ。。。

golangで書いたアプリケーションをどう動かすか?

まとまりなく、何パターンか列挙します。

アプリケーションコンテナで動かす

通常ステートレスなアプリに限られると思いますけど、dockerで動かすというやり方です。
# 個人的にはdocker 1.12で組み込まれたswarmモードがすごくお手軽でよいと最近思ってます。

バイナリはstatic linkでビルドして、alpineで動かすと軽量でイイカンジです。
Dockerfileは以下みたいなカンジ

FROM alpine
RUN apk add --no-cache ca-certificates
COPY your_app /usr/local/bin/
CMD ["your_app"]

外部サービスにssl/tls接続するのに必要なのでca-certificatesを突っ込んでます。
証明書周りを自分でなんとかするんなら、busyboxにするのもアリかと。

supervisordで動かす

定番ですね。
個人的にはOSのお決まりのサービス管理方法(init.dやsystemd)と異なるレイヤが増えるので好きではないです。

initスクリプトを頑張って書く

/etc/init.d/hogehoge みたいなスクリプトを用意しておいて、以下のように使うイメージです。

# service hogehoge start

中身はnohupを使ってなんちゃってdaemonize、とかですね。
スクリプト書くのは正直ダルいですけど、前処理/後処理を柔軟に書けるので、systemd以前はコレを好んでやってました。

systemdのサービスで動かす

/etc/systemd/system/ 以下にserviceファイルを書いて、systemctlで頑張ります。

serviceファイルはミニマムだと以下のようなカンジです。

[Unit]
Description=hogehoge

[Service]
Type=simple
ExecStart=/path/to/hogehoge
ExecStop=/bin/kill -SIGTERM $MAINPID

[Install]
WantedBy = multi-user.target

ExecStartPre/ExecStartPost/ExecStopPostで前処理、後処理も書けます。
ExecStopPreは存在しませんが、停止時の前処理はExecStopが複数記述できるので、それで対応しましょう

なお、limitsの設定とか実行ユーザの指定も簡単です。

[Service]
LimitNOFILE=65536
User=hogehoge

また、Type=simpleの場合は、起動したタイミングでsystemdは起動完了とみなしてくれるんですが、 アプリケーション的に初期化に時間が掛かるケースもあると思います。

そんな時はアプリをsd_notifyに対応させた上で、Type=notifyにしてあげればイイカンジになります。

というわけでsd_notifyに対応するためのライブラリを書いてみました。
使い方はsampleを見ればすぐわかると思います。

Google App Engineで動かす

書いといてアレですけど、すみません、やったコトないです(;・∀・)
docker以上に軽量でいいんじゃないですかね? 知らんけど。

Herokuで動かす

やったコトないですけど、選択肢の一つとして。

AWS lamda?

公式対応があるといいなぁ。。。
# node経由やり方もある、らしいけど。。。