PR

 最近,ネットの一部で「Erlang」(発音はアーランまたはエアラン)というプログラミング言語が流行している(参考リンク)。Erlangはスウェーデンの通信機器メーカーであるエリクソンにて開発された言語で,エリクソン内部や通信業界はもちろん,Twitter(関連記事)などのオンライン・サービスでも採用されているという。

 Erlangの特徴は,複数のプロセスが通信をしながら計算をする「並行プログラミング」を,言語の機能として強力にサポートしていることだ。C言語などの逐次プログラミング言語は,プロセスや通信の機能を内蔵しておらず,こうした機能をオペレーティング・システムに「外注」している。しかし,そのような外付けの仕組みは,メモリー消費量や速度,開発効率などの点でオーバーへッドが大きい。Erlangのような並行プログラミング言語では,プロセスや通信といった機能をプリミティブとして実装することにより,メモリー消費などのオーバーへッドを極めて小さくすることに成功している。

並行言語のモデル「π計算」

 LispやML,Haskellといったいわゆる「関数型言語」は,λ計算(lambda-calculus)という理論に基づいて設計・開発されている。同じように,並行言語のモデルの一つとして,π計算(pi-calculus)という理論がよく知られている(ちなみに円周率とは無関係)。一口にπ計算といってもいろいろなバージョンがあるのだが,ここでは細かい違いは気にせず大ざっぱに考えることにしよう。

 π計算には,以下のような4種類のプロセスがある。

構文 直観的な意味
new x in P 新しい通信チャンネル(メッセージ・キュー)xを作ってからプロセスPを実行する
send x to y 通信チャンネルyに値xを送信する
recv x from y in P 通信チャンネルyから値xを受信し,プロセスPを実行する
fork P in Q プロセスPを生成してからプロセスQを実行する

 例えば,次のようなプロセスを考える。

new x in
  fork
    send 3 to x
  in
    recv y from x in print y

 このプロセスは,まず新しい通信チャンネルxを作ってから,xに3を送信するプロセスを生成する。それから,xから値y(すなわち3)を受信して,画面に表示する。

π計算をOCamlで実装してみる

 理論やモデルだけではおもしろくないので,今回はπ計算をOCamlで実装してみよう。ただし,前回までの命令型言語(MyC)や関数型言語(MyFun)のようにインタプリタを作成するのではなく,OCamlのデータ型と関数でπ計算の機能を実現してみる。

 まず,π計算の通信チャンネルを実現するために,次のような代数データ型を定義する。

type 'a chan =
  | Senders of 'a list
  | Receivers of ('a -> unit) list

 この定義は以下のような意味だ。'a chanは,'a型の値を送ったり受け取ったりするための通信チャンネルの型である。π計算の通信チャンネルはメッセージ・キューのことなので,'a型のメッセージm1, m2, m3, ...がたまっている状態を,Senders [m1; m2; m3; ...]という値で表現することにしよう。また,一般に,値が送信されていない通信チャンネルから受信しようとしたプロセスはブロックする。そのような通信チャンネルの状態をReceivers [f1; f2; f3; ...] で表そう。ただし,f1, f2, f3, ...はブロックしているプロセスを表す関数である(詳しくは後述する)。

 新しい通信チャンネルを作るπ計算のプロセス「new x in P」は,次のような形のOCamlの式で表すことにしよう。

let x = newc () in P

 ただし,newcは以下のように定義される関数とする。

let newc () = ref (Senders [])

 OCamlの「let x = 式1 in 式2」は,「式1の結果を変数xにおき,式2を計算する」という意味だった。したがって,「let x = newc () in P」は「空のメッセージ・キューSenders []を作り,それをxとして,Pを実行する」という式になっている。これは「新しい通信チャンネルxを作ってから,プロセスPを実行する」という意図に一致する。

 ただし,次で見るように,通信チャンネルに送信や受信をしたら,メッセージ・キューの中身を変更しなければならない。そのために,ここではOCamlの参照(reference)という機能を利用している。OCamlでは「ref 式」と書くと,「式」を初期値とする記憶領域(参照セル)が作られ,その記憶領域へのポインタ(参照)が返される。参照セルの値を読み出すときは「!x」のように!演算子を使う。また,参照セルの値を変更するときは「x := 式」のように書く。

 さて,送信プロセス「send x to y」は以下のような式で表すことにしよう。

send y x

 関数sendの定義は下の通りだ。

(* 通信チャンネルyに値xを送る *)
let send y x =
  match !y with
  | Senders ss -> (* 受信プロセスがない *)
      (* キューの後に値を付け加える *)
      y := Senders(ss @ [x])
  | Receivers [] -> (* 同上 *)
      y := Senders [x]
  | Receivers(f :: rs) -> (* 受信プロセスがある *)
      (* 一つ(f)を取り出して残り(rs)は戻す *)
      y := Receivers rs;
      (* 取り出した受信プロセスに値を渡す *)
      f x

 yは通信チャンネルつまりメッセージ・キューへの参照だから,まずその中身がどうなっているかをチェックする。もし値を待っている受信プロセスがなければ,キューの後に値を付け加える(@はリストとリストの連結なので,「ss @ [x]」でリストssの末尾に要素xが追加される)。もし受信プロセスがあれば,一つを取り出して,残りをキューに戻しておく。そして,取り出した受信プロセスrに,送信しようとしている値xを渡す。

 また,受信プロセス「recv x from y in P」は,次のように書くことにしよう。

recv y (fun x -> P)

 この関数recvは以下のように定義する。

(* 通信チャンネルyから値を受信し,関数fを適用する *)
let recv y f =
  match !y with
  | Receivers rs -> (* 値がない *)
      (* ブロックした受信プロセスをキューに追加 *)
      y := Receivers(rs @ [f])
  | Senders [] -> (* 同上 *)
      y := Receivers [f]
  | Senders(x :: ss) -> (* 値がある *)
      (* 一つだけ(x)を取り出して残り(ss)は戻す *)
      y := Senders ss;
      (* 取り出した値を受信プロセスに渡す *)
      f x

 まず,sendの場合と同様に,yは通信チャンネルへの参照なので,その中身を確認する。もしキューに値がなければ,「xを受け取ってPを実行する」という関数f(つまりfun x -> P)をキューに追加する。この関数が,先にも触れた「ブロックした受信プロセス」を表すわけだ。一方で,もしキューに値があれば,それを一つだけ取り出し,xとして関数fに与える。

 最後に,π計算のプロセス生成「fork P in Q」は,OCamlでは次のように表せる。

P ; Q

 これにより,まずプロセスPが実行され,それがブロックまたは終了したら,プロセスQが実行される。OCamlの;はあくまで逐次実行なので,PとQが同時に実行されるわけではない。しかし,もしPが通信チャンネルからの受信でブロックしたら,Pが実行している処理はメッセージ・キューに保存され,Qへ制御が移る。このように,並行プロセスの動作が実現される。

実際に並行プログラミングをしてみる

 以上に示した型('a chan)と関数(newc, send, recv)を定義したら,まず,前に示した簡単なプロセスの例を試してみよう。

# let x = newc () in
  send x 3;
  recv x (fun y ->
    Printf.printf "%d\n" y) ;;
3
- : unit = ()

 まずnewcにより新しい通信チャンネルxが作られる。次にsendにより,xの指すメッセージ・キューに整数3が入れられる。それがrecvにより取り出され,yとして渡されて画面に表示されるわけだ。

 次の例では,まず新しい通信チャンネルcを作り,cから整数を受信して表示するプロセスrepeat ( )を生成している。ただし,単に受信した整数を表示するだけでなく,再びrepeat ( )自身を生成する。これにより,1回だけでなく何回でも,cから整数を受信して画面に表示することが可能になる。

# (* 新しい通信チャンネルcを作る *)
  let c = newc () ;;
val c : '_a chan ref = {contents = Senders []}
# (* プロセスrepeat ()を再帰で定義 *)
  let rec repeat () =
    (* cから整数iを受信 *)
    recv c (fun i ->
      (* iを画面に表示 *)
      Printf.printf "%d\n" i;
      (* またrepeat ()自身を生成 *)
      repeat ()) ;;
val repeat : unit -> unit = <fun>
# (* 最初のrepeat ()を生成 *)
  repeat () ;;
- : unit = ()
# (* cに1を送信 *)
  send c 1 ;;
1
- : unit = ()
# (* cに2を送信 *)
  send c 2 ;;
2
- : unit = ()
# (* 何度でも送信できる *)
  send c 3 ;;
3
- : unit = ()

 また,π計算では「通信チャンネルを通して通信チャンネルを送る」ことも可能だ。これを利用して,いわば「関数サーバー」をプロセスで実現できる。

# (* サーバーがリクエストを受け付けるための
     通信チャンネルservcを作る *)
  let servc = newc () ;;
val servc : '_a chan ref = {contents = Senders []}
# (* サーバー・プロセスserv ()を再帰で定義 *)
  let rec serv () =
    (* servcから整数iと,返信のための
       通信チャンネルrepcの組を受け取る *)
    recv servc (fun (i, repc) ->
      (* repcにiの2乗を返す *)
      send repc (i * i);
      (* serv自身を再び生成 *)
      serv ()) ;;
val serv : unit -> unit = <fun>
# (* サーバー・プロセスを起動 *)
  serv () ;;
- : unit = ()
# (* 返信のためのチャンネルrを作る *)
  let r = newc () ;;
val r : '_a chan ref = {contents = Senders []}
# (* サーバーに整数123とrを送る *)
  send servc (123, r) ;;
- : unit = ()
# (* rから答えの整数jを受け取り表示 *)
  recv r (fun j ->
    Printf.printf "%d\n" j) ;;
15129
- : unit = ()
# (* サーバー・プロセスは何回でも
     呼び出すことができる *)
  send servc (45, r) ;;
- : unit = ()
# recv r (fun j ->
    Printf.printf "%d\n" j) ;;
2025
- : unit = ()

 これをさらに応用し,第3回で登場したフィボナッチ数列を,並行プロセスを利用して計算してみよう(ただし,今回の実装では実際に複数のプロセスが同時に動作するわけではないので,並列処理による性能向上はない)。

# (* サーバーがリクエストを受け付けるための
     通信チャンネルfibcを作る *)
  let fibc = newc () ;;
val fibc : '_a chan ref = {contents = Senders []}
# (* フィボナッチ・サーバーfib ()を定義 *)
  let rec fib () =
    (* fibcから引数nと,返値を送るための
       通信チャンネルrepcの組を受け取る *)
    recv fibc (fun (n, repc) ->
      (* またfib ()自身を生成しておく *)
      fib ();
      if n <= 1 then
        (* nが1以下ならnを返信 *)
        send repc n
      else
        (* フィボナッチ・サーバー自身を利用して
           引数がn-1とn-2の場合の返値を計算 *)
        let repc1 = newc () in
        let repc2 = newc () in
        send fibc (n - 1, repc1);
        send fibc (n - 2, repc2);
        recv repc1 (fun rep1 ->
          recv repc2 (fun rep2 ->
            (* 二つの返値を足してrepcに返信 *)
            send repc (rep1 + rep2)))) ;;
val fib : unit -> unit = <fun>
# (* フィボナッチ・サーバーを起動 *)
  fib () ;;
- : unit = ()
# (* 返値を受け取るための通信チャンネルrを作る *)
  let r = newc () ;;
val r : '_a chan ref = {contents = Senders []}
# (* 引数とrを送信 *)
  send fibc (10, r) ;;
- : unit = ()
# (* rから返値mを受け取って表示 *)
  recv r (fun m ->
    Printf.printf "fib(10) = %d\n" m) ;;
fib(10) = 55
- : unit = ()

プロセス・スケジューリングの改良

 今回の実装では,π計算のプロセス生成「fork P in Q」をOCamlの逐次実行「P ; Q」で済ませてしまったので,生成されるプロセスPは必ずQより先に実行される。これを変更するためには,例えば以下のような式でプロセス生成を実現すればいい。

fork
  (fun () -> P)
  (fun () -> Q)

 ただしforkは次のように定義される関数とする。

let fork f g =
  (* 乱数が0だったらPを先に,
     1だったらQを先に実行 *)
  if Random.int 2 = 0 then
    (f (); g ())
  else
    (g (); f ())

 また,今回の実装では,走り始めてしまったプロセスは,終了するか受信でブロックするまで永久に動き続ける。余裕のある方は,プロセスが他のプロセスに実行を譲る操作(yield)を実現する方法を考えてみると面白いだろう。

著者紹介 住井 英二郎

 東北大学 大学院 情報科学研究科 准教授。今回のような「超軽量並行プロセス」は,OCamlでなくとも,いわゆるクロージャ(高階関数)のある言語ならば同様に実装することが可能なはずです。RubyやJavascript等で書いたらどうなるかも考えてみると面白いのではないかと思います。また,高階関数ではなく,「継続」ないし「コルーチン」という仕組みにより実装することもでき,Fiberという名前で呼ばれることもあるようです。これらの機構は関数型言語だけでなくC言語やRubyなどでも利用できるようになりつつあり,今後の展開が注目されます。