やってみる

アウトプットすべく己を導くためのブログ。その試行錯誤すらたれ流す。

組込ライブラリ(ConditionVariable)

 スレッドの同期機構のひとつ。超絶むずかしい。時期尚早。

成果物

情報源

ConditionVariable

スレッドの同期機構の一つである状態変数を実現するクラスです。

Condition Variable とは

あるスレッド A が排他領域で動いていたとします。スレッド A は現在空いていないリソースが必要になったので空くまで待つことにしたとします。これはうまくいきません。なぜなら、スレッド A は排他領域で動いているわけですから、他のスレッドは動くことができません。リソースを空けることもできません。スレッド A がリソースの空きを待っていても、いつまでも空くことはありません。

 排他領域ってなんすか? たぶん以下コードにおけるmutex.synchronize {...}のブロック内だと思われる。仮にそうだとしてもmutex.synchronizeが何なのか知らない。先にそれを説明してくれ。

Mutex(Mutal Exclusion = 相互排他ロック)は共有データを並行アクセスから保護するためにあります。

mutex = Mutex.new
mutex.synchronize {
  # mutex によって保護されたクリティカルセクション
}

 あと、以下URLもあった。

https://ruby-doc.com/docs/ProgrammingRuby/html/tut_threads.html#UF

 英語だったので日本語に翻訳する。

重要なデータを保護するためにミューテックスを使用するだけでは不十分な場合があります。クリティカルセクションにいるが、特定のリソースを待つ必要があるとします。スレッドがこのリソースを待ってスリープ状態になると、クリティカルセクションに入ることができないため、他のスレッドがリソースを解放できない可能性があります---元のプロセスはまだリソースをロックしています。重要な地域の独占的な使用を一時的に放棄し、同時にリソースを待っていることを人々に伝えることができる必要があります。リソースが利用可能になると、あなたはそれをつかむことができるようにする必要がありますし、ワンステップですべて、クリティカル領域のロックをreobtain。

 複数のスレッドで同一ファイルを使用するときの話。あるスレッドAでファイルを書込モードで開いたままでいると、その間ほかのスレッドはそのファイルを書込モードで開けない。なのでずっと待機しつづけるハメになる。ということで合っているかな? それを解決するのがConditionVariableだってのが次の話だよね?

ここで条件変数が登場します。条件変数は、リソースに関連付けられ、特定のミューテックスの保護内で使用される単なるセマフォです。利用できないリソースが必要な場合は、条件変数を待ちます。このアクションにより、対応するミューテックスのロックが解除されます。他のスレッドがリソースが使用可能であることを通知すると、元のスレッドは待機を解除し、同時にクリティカル領域のロックを回復します。

 セマフォってなんだよ。ググった。

セマフォとは、並列プログラミング環境での複数の実行単位(主にプロセス)が共有する資源にアクセスするのを制御する際の、単純だが便利な抽象化を提供する変数または抽象データ型である。

 セマフォ=条件変数(ConditionVariable)ということかな? なんの説明にもなっていない気がするけど。

セマフォは、ある資源が何個使用可能かを示す記録と考えればわかりやすく、それにその資源を使用する際や解放する際にその記録を「安全に」(すなわち競合状態となることなく)書き換え、必要に応じて資源が使用可能になるまで待つ操作が結びついている。

 ごめん、全然わかんない。

セマフォを使うことでプログラムにおける競合状態がなくなると保証するものではない。任意個の資源を扱うセマフォをカウンティングセマフォ、値が0と1に制限されている(ロック/アンロック、使用可能/使用不可の意味がある)セマフォをバイナリセマフォと呼ぶ。後者はミューテックスと同等の機能を持つ。

 いきなり矛盾している。「「安全に」(すなわち競合状態となることなく)」と書いていたのに、その舌の根も乾かぬうちに「競合状態がなくなると保証するものではない」と言っている。じゃあセマフォ、お前は結局なんなんだよ。たぶん使用可能数のことなのだろうが、よくわからん。

 たぶんミューテックス0/1の2値しか持てない。セマフォはそれを整数で持てるようになったものなのだろう。

 この競合問題を一般化したのが「食事する哲学者の問題」というやつらしい。

食事する哲学者の問題

 上記の円卓において、一人がスパゲティを食べるためには両手に1本ずつフォークを持つ必要がある。でもこの円卓では全員が食べれなくなる恐れがある。全員が一本ずつフォークを持ったら誰も食べれない。だれかがフォークを明け渡すまで。

 これは興味深い。車道の信号に似ている。道は一本しかないが、異なる方向に行く人々が同時にその道を使うと衝突してしまう。そのため信号で同期をとって解決している。

 同じ問題に、電車の降りる人と乗る人の動きがある。降りる人を先にすべて降ろしてから、次に乗る人を全員のせて出発する。

 食堂の座席もおなじ。10席しかないので同時に10人までしか食べられない。それ以上は待機の行列ができる。

 同時に使えるリソースには限りがある。そのため超過した人員はどうしても待機させられる。「何もできない」という無為な時間を大勢がすごす。なんたるムダ。まあ今はものによってスマホで予約や通知ができて同期問題が解決されつつある気がするが。

 車、電車、食堂はすべて「早いもの勝ち」である。もし同時だったらどうするか、というのが今回の問題だと思われる。

 公共の利益を最大化するための合理的な解決手段がほしいという話にも聞こえる。そうなれば全員の私益も必然的に最大化される。でも全員が欲張って常にチャンスを狙って、一本ずつフォークを持っていたら永久にチャンスが巡ってこない。個人の立場からみれば最適解のはずが、全体の構図からみると最悪の結果になる。とてもマヌケだが、いかにもやりそう。皮肉なことに勝者が誰もいない結果になる。そこが面白い。囚人のジレンマや自己矛盾につうじるもどかしさがある。

 これってお金や社会の話にも通じるのではないだろうか。庶民はギリギリ生活するだけのお金しか稼げない。だれも学習につぎこむお金や時間がない。大衆がバカなので一向に状況を改善できない。どころか愚衆と化して悪化する。民主主義では皆で赤信号を渡るようになる。民主主義崩壊。一部の権力者がそれを利用し、民衆を貧困化させ、民主主義から独裁に変化させるべく増税などで絞り上げてゆく。こうして独裁政治が復権した。私たち庶民は自らの欲望と、権力者の欲望によってデッドロックしているのではないか? そこに気づかぬ限り、永久に支配され続ける未来が確定してしまったデットロックの中を抜け出せないのでは?

 貯金しすぎて経済が回らないとかもデッドロックなんだろうな。まあ、実際は生活する金がないし、老後2000万円不足問題とか詐欺などもあって絶対に経済が回っていない理由はそんなことではないと思う。金を使わないのは金持ちであって、庶民は生活するだけで精一杯。「使える金がない」の間違い。富の再分配をしたとしても、支配者層に金がまわる仕組みになっているから金は流通せずデッドロックする。一部の裕福層間だけで金をつかんで離さないから天下の回りものではなくなり、社会も人の生活も回らなくなって、首が回らなくなる。アカン。

 とにかく、同時に使えるリソースが限られており、複数人で同時に使おうとして競合する。このとき条件の一部を保持しつづけていると、だれもすべての条件を満たせなくなってしまい、全員が条件を満たせないという状態になる。これがデッドロック。ということでいいよね?

 ドキュメントに戻ろう。

以上のような状況を解決するのが Condition Variable です。

 本当に?

スレッド a で条件(リソースが空いているかなど)が満たされるまで wait メソッドでスレッドを止めます。他のスレッド b において条件が満たされたなら signal メソッドでスレッド a に対して条件が成立したことを通知します。これが典型的な使用例です。

mutex = Mutex.new
cv = ConditionVariable.new

a = Thread.start {
    mutex.synchronize {
      ...
      while (条件が満たされない)
        cv.wait(mutex)
      end
      ...
    }
}

b = Thread.start {
    mutex.synchronize {
      # 上の条件を満たすための操作
      cv.signal
    }
}

 あらかじめ優先順位をつけておく、ということかな?

以下は [ruby-list:14445] で紹介されている例です。@q が空になった場合、あるいは満タンになった場合に Condition Variable を使って wait しています。

require 'thread'

class TinyQueue
  def initialize(max=2)
    @max = max
    @full = ConditionVariable.new
    @empty = ConditionVariable.new
    @mutex = Mutex.new
    @q = []
  end

  def count
    @q.size
  end

  def enq(v)
    @mutex.synchronize{
      @full.wait(@mutex) if count == @max
      @q.push v
      @empty.signal if count == 1
    }
  end

  def deq
    @mutex.synchronize{
      @empty.wait(@mutex) if count == 0
      v = @q.shift
      @full.signal if count == (@max - 1)
      v
    }
  end

  alias send enq
  alias recv deq
end

if __FILE__ == $0
  q = TinyQueue.new(1)
  foods = 'Apple Banana Strawberry Udon Rice Milk'.split
  l = []

  th = Thread.new {
    for obj in foods
      q.send(obj)
      print "sent ", obj, "\n"
    end
    q.send nil
  }

  l.push th

  th = Thread.new {
    while obj = q.recv
      print "recv ", obj, "\n"
    end
  }
  l.push th

  l.each do |t|
    t.join
  end
end

実行すると以下のように出力します。

$ ruby condvar.rb
sent Apple
recv Apple
sent Banana
recv Banana
sent Strawberry
recv Strawberry
sent Udon
recv Udon
sent Rice
recv Rice
sent Milk
recv Milk

 コード読んだけどわかんないや。

 たぶんセマフォはスレッド間で共有する変数のことなんだろう。でもコードではwaitsignalしかしていないんだよなぁ。まあいいや。スレッドは難しいので一旦放置する。

対象環境

$ uname -a
Linux raspberrypi 5.10.52-v7l+ #1441 SMP Tue Aug 3 18:11:56 BST 2021 armv7l GNU/Linux