PR

並行HaskellとSTM

 それでは,parIOとそこで使われている並行処理について見ていくことにしましょう。parIOの定義を再掲しておきます。

module Por where
import Prelude hiding (catch)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import System.IO.Unsafe

parIO a1 a2
 = do m <- newEmptyTMVarIO
      c1 <- forkIO (child m a1)
      c2 <- forkIO (child m a2)
      r <- atomically $ takeTMVar m
      killThread c1
      killThread c2
      return r
 where
     child m a = catch (do
         b <- a
         atomically $ putTMVar m b)
       (\e -> return ())

 このうち,childのcatchの部分は上で紹介した例外処理であることがわかると思います。無限ループになる変数gがNonTerminationかAsyncException StackOverflowの例外を生じるため,catchを使って例外を握りつぶしています(本来はcase式を使って扱う例外のみを取得し,残りの例外はthrowを使って再発生(redirect)させたほうがよいのですが,わかりやすさのために今回は省略しました)。

Prelude Control.Exception> :i AsyncException
data AsyncException = StackOverflow | HeapOverflow | ThreadKilled
        -- Defined in GHC.IOBase
instance Eq AsyncException -- Defined in GHC.IOBase
instance Ord AsyncException -- Defined in GHC.IOBase
instance Show AsyncException -- Defined in GHC.IOBase

data Exception = ... | AsyncException AsyncException | ...
        -- Defined in GHC.IOBase
Prelude Control.Exception> evaluate g
*** Exception: stack overflow

 次にparIOの肝である,スレッドを使った処理について見ていきましょう。スレッドを利用するための処理の多くは,Control.Concurrentモジュールから利用できます。

 forkIOはスレッドを生成して,そのスレッドでI/Oアクションを行うためのものです。

Prelude Control.Concurrent> :i forkIO
forkIO :: IO () -> IO ThreadId  -- Defined in GHC.Conc

 forkIOで生成したスレッドは,一般的なスレッドと同様に親スレッドが終了すれば終了します。また,killThreadにThreadIdを与えて明示的に終了させることもできます。

Prelude Control.Concurrent> :i killThread
killThread :: ThreadId -> IO ()         -- Defined in GHC.Conc

 なお,forkIOで生成されるスレッドはネイティブ・スレッドを利用したものではなく,あくまで処理系の側で実装されるユーザーレベル・スレッドです。ネイティブ・スレッドを利用するには,GHCやHaskell'で用意されているforkOSを使う必要があります。

Prelude Control.Concurrent> :i forkOS
forkOS :: IO () -> IO ThreadId  -- Defined in Control.Concurrent

 forkOSなどのOSのネイティブ・スレッドと結びついた「結合スレッド(bound thread)」を使うための機能は,Haskell'では「処理系が特に対応する必要がない選択可能な機能」になる予定です。なので,処理系によってはforkIOのような「非結合スレッド(unbound thread)」を使った処理しか利用できない可能性があることに注意してください。

 なお,GHCでforkOSを利用する場合には,-threadedオプションをつけてコンパイルするのを忘れないでください。-threadedオプションを忘れると,以下のエラーが出ます。

$ ./Por
Por: user error (RTS doesn't support multiple OS threads (use ghc -threaded when linking))

 それぞれのスレッド間で値を受け渡すのには,stm(Software Transactional Memory)パッケージのControl.Concurrent.STMモジュールから利用できるSTMモナドを利用しています。STMは共有メモリーの利用方法としてロック(lock)による排他制御ではなく,データベースのトランザクションのアイデアを取り入れたものです。STMは,以下の三つの特徴を持ちます。

  • STMを使ったアクションは「原子性(atomicity)」を持つので,途中で処理に割り込まれて不正な状態になるのを防げる

  • STMではロックを使用しないので,デッドロック(dead lock)が起こることはない

  • STMではロックを使用しないので,STMを使って書いた処理同士を何の問題もなく互いに組み合わせられる(合成性を持つ)

 STMモナドではこれらを,TVar型の値を監視することで実現しています。TVarは「トランザクション変数(Transactional Varaiable)」という意味です。STMモナドを使った処理の最中にTVar型の値が別の処理によって書き換えられた場合,それまでの実行を取り消し(cancell)てもう一度処理を実行するのです。

 こうした処理がきちんとした形で行われるためには,STMが実行するアクションの内部に取り消し不可能な処理が入り込むことを防がなくてはなりません。内部にI/Oアクションが入り込まないように,STMはI/Oとは別のモナド(型)になっているのです。これによってバグが入り込むのを未然に防げます。代わりに,原子性を持った実際のI/OアクションにSTMアクションを移すには,atomically関数を使用する必要があります。

Prelude Control.Concurrent.STM> :i atomically
atomically :: STM a -> IO a     -- Defined in GHC.Conc

 newEmptyTMVarIO,takeTMVar,putTMVarの定義は,以下の通りです。

newtype TMVar a = TMVar (TVar (Maybe a))

newTMVarIO :: a -> IO (TMVar a)
newTMVarIO a = do
  t <- newTVarIO (Just a)
  return (TMVar t)

takeTMVar :: TMVar a -> STM a
takeTMVar (TMVar t) = do
  m <- readTVar t
  case m of
    Nothing -> retry
    Just a  -> do writeTVar t Nothing; return a

putTMVar :: TMVar a -> a -> STM ()
putTMVar (TMVar t) a = do
  m <- readTVar t
  case m of
    Nothing -> do writeTVar t (Just a); return ()
    Just _  -> retry

 readTVarとwriteTVarは名前の通り,TVar型の値を読み書きするためのものです。newTVarIOは,TVar型を作成する関数newTVarの派生形で,STM型ではなくIO型にくるんだ形でTVar型の値を作成します。あまり複雑な値を生成しない場合,いちいちatomicallyを使って値を取り出すのは面倒なので,補助関数としてnewTVarIOを別途提供しているのです。

Prelude Control.Concurrent.STM> :i readTVar
readTVar :: TVar a -> STM a     -- Defined in GHC.Conc
Prelude Control.Concurrent.STM> :i writeTVar
writeTVar :: TVar a -> a -> STM ()      -- Defined in GHC.Conc
Prelude Control.Concurrent.STM> :i newTVar
newTVar :: a -> STM (TVar a)    -- Defined in GHC.Conc
Prelude Control.Concurrent.STM> :i newTVarIO
newTVarIO :: a -> IO (TVar a)   -- Defined in GHC.Conc

 retryはユーザー自身の手で,STMアクションの取り消しや再実行を行うためのものです。

Prelude Control.Concurrent.STM> :i retry
retry :: STM a  -- Defined in GHC.Conc

 これを使うことで,TVarの値が望ましいものになるまで何度も処理をやり直すことができます。takeTMVarでは,putTMVarなどによってTMVar型の中身が空(Nothing)でなくなるまで,何度も処理をやり直します。逆にputTMVarでは,まだtakeMVarなどによってTMVarの中身が空になっておらず,何らかの値を持つ(Just a)の場合,処理をやり直します。

 こう書くと,retryによって無駄なビジー・ループ(busy loop)が実行されるように見えるかもしれません。しかし実際には,STMではTVarの値を監視し,TVarの値が変化するまでアクションの再試行が行われないように実装することで,このような非効率性の問題を回避しています。

 parIOでは,これらを利用することで,「どちらかのアクションが終了するまで待ち,どちらかが終了した時点で値を確定する」ような処理を記述しているのです。なお,Control.Concurrent.MVarモジュールには,排他制御のためにSTMやretryではなくロックを使用した,MVar型という別の実装もあります。先ほどから出てきているTMVarという名前は,「TVarでMVarを実装したもの」という意味です。

 さて,ここでparIOの実装に使用されている「TMVar型の変数と.変数に対する操作」について見てみることにしましょう。parIOで使用されているTMVar型の変数は,最初に作成したmただ一つです。また,変数mに対する操作も,それそれのスレッドで1度だけしか行われません。このため,TVarで定義したTNVar型を使ってSTMで処理させずに,ロックを使うMVarのほうを使用したとしても,特に問題が生じることはないでしょう。なので,parIO程度の実装では,今回のようにわざわざSTMモナドを利用する必要はないかもしれませんね。

 MVarは,先ほどのTVarによる定義を見てわかる通り,「空かあるいは値を持つ,変更可能な記憶領域(mutable location that is either empty or contains a value)を示す変数」という意味です(参考リンク)。

Prelude Control.Concurrent.MVar> :i newMVar
newMVar :: a -> IO (MVar a)     -- Defined in GHC.Conc
Prelude Control.Concurrent.MVar> :i takeMVar
takeMVar :: MVar a -> IO a      -- Defined in GHC.Conc
Prelude Control.Concurrent.MVar> :i putMVar
putMVar :: MVar a -> a -> IO ()         -- Defined in GHC.Conc

 それでは||||演算子を実際に使用してみましょう。以下のように,正しい結果が出力されていることがわかります。

*Por> f |||| g && g |||| f && (not $ False |||| False)
Loading package stm-2.0 ... linking ... done.
True
*Por> f |||| g && g |||| f && (False |||| False)
False
*Por> f |||| g && g |||| f && (not $ False |||| False) && (f |||| g)
True

 ただ,gが無限ループであるため,使用する環境によっては実装上の問題から答えを返さないこともあるかもしれません。手元のWindows XPとGHC6.6.1の組み合わせでは,GHCiの対話環境または最適化オプション(-O<n>)を使って作成した実行可能ファイルでのみ,答えが返ってきました。

参照型(**Ref)とMVarやTVarの違い

 今回は,スレッド間で「状態」を共有するための型として,新たにTVarとMVarという型を紹介しました。

 参照型とTVarやMVarには以下のような違いがあります。

 参照型自身には,同期のための仕組みは何も組み込まれていません。そのため軽量ですが,複数スレッド間での使用には適しません。ただ,TVarやMVarとは異なり,I/Oアクションの中で利用する必要のあるIORefに加えて,状態を外に持ち出さないならI/O以外の場所でも使用できるSTRefが用意されています。

 TVarでは,トランザクション・メモリーを使って同期処理を行います。このため初期化に少々負荷がかかりますが,一度初期化した後は,ロックを使った場合に比べ比較的低いコストでアクセスできます(参考リンク)。

 MVarでは,同期処理を「中に値が存在するとき,あるいは中身が空のとき」といった特定条件下で処理をブロック(block)することによって行います。よって,デッドロックを引き起こす可能性があることに注意しなければなりません。

型名使用時の負荷複数スレッドでの共用I/O以外での使用
参照型(**Ref)
TVar△(初期コストが高い)×
MVar△~×(排他ロックが重い)○(デッドロックに注意)×

 なお,MVarは空の状態を持つという性質上,本文中で見たように「値の取得と書き込みを行う組み込み処理」が他のものとは異なっています。とはいえ,ライブラリでは補助関数としてtakeMVarとputMVarをくっつけたreadMVarやmodifyMVar*なども定義されいるので,この違いについてはあまり考えなくてよいと思います(少々型は異なりますが)。

Prelude Data.IORef> :i modifyIORef
modifyIORef :: IORef a -> (a -> a) -> IO ()
         -- Defined in Data.IORef
Prelude Data.IORef> :m Control.Concurrent.MVar
Prelude Control.Concurrent.MVar> :i newMVar
newMVar :: a -> IO (MVar a)     -- Defined in GHC.Conc
Prelude Control.Concurrent.MVar> :i readMVar
readMVar :: MVar a -> IO a      -- Defined in Control.Concurrent.MVar
Prelude Control.Concurrent.MVar> :i modifyMVar
modifyMVar :: MVar a -> (a -> IO (a, b)) -> IO b
         -- Defined in Control.Concurrent.MVar
Prelude Control.Concurrent.MVar> :i modifyMVar_
modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
         -- Defined in Control.Concurrent.MVar

 modify**は,write**とは違って,引数に新しい値ではなく現在の値を変化させるための関数を取ります。もっとも,\_ -> 11 やPreludeのconstなどを使い,現在の値を棄ててしまえば同じことです。

Prelude Data.IORef> newIORef 11 >>= \x -> modifyIORef x (\_ -> 12) >> readIORef x
12
Prelude Data.IORef> const 12 11
12
Prelude Data.IORef> newIORef 11 >>= \x -> modifyIORef x (const 12) >> readIORef x
12
Prelude Data.IORef> :m Control.Concurrent.MVar
Prelude Control.Concurrent.MVar> newMVar 11 >>= \x -> modifyMVar_ x (\y -> return $ y+1) >> readMVar x
12
Prelude Control.Concurrent.MVar> newMVar 11 >>= \x -> modifyMVar_ x (\_ -> return $ 12) >> readMVar x
12

 ただし,modifyMVarは少し特殊です。組のうち一つ目を書き込み,もう一つをそのまま返します(二つ目の型は異なっていても構いません)。

Prelude Control.Concurrent.MVar> newMVar 11 >>= \x -> modifyMVar x (\y -> return (y+1, "11")) >> readMVar x
12
Prelude Control.Concurrent.MVar> newMVar 11 >>= \x -> modifyMVar x (\y -> return (y+1, "11"))
"11"

著者紹介 shelarcy

 STM(Software Transactional Memory)という名前は,ソフトの側で,内部的にロックなどを使用して「トランザクション・メモリー(TM:Transactional Memory,トランザクショナル・メモリー)」を実現することから付けられたものです。一方,最近ではCPUのマルチコア化に伴い,トランザクション・メモリーの実装にハードウエアを利用するHTM(Hardware Transactional Memory)などの話もよく聞くようになりました(参考リンク1参考リンク2)。

 しかし,こうしたトランザクション・メモリーの技術の隆盛が一つの問題を生んでいます。ここまでの話の流れからわかるように,**TMという名前の略語があふれかえり始めているのです。GHCのスレッドの再実装にも,STMよりも低レベルのトランザクション・メモリーに対してPTM(Primitive Transactional Memory)などの名前が使われています。まあ,これはあくまで型名として用いられているので,あまり周囲を混乱させるようなことはないかもしれませんが…。似たような略語が乱立するとわかりにくいので,略語の増加は最小限に留めて欲しいような気がします。例えば,「HTM」のようにすべての単語の頭文字を取った略称を使うのではなく,「Hardware TM」のように他と比べて特徴的な部分を示す単語を省略しないほうが,その用語が何を指すかがわかりやすいでしょう。

 しかし,現状を見る限り,そういう方向には当分向かいそうにありません。この流れが収束するのは,はたしていつのことになるやら。