PR

共有メモリーを使ったI/Oの排他制御

 制御の本質が状態の操作にあるのなら,STMを使ってI/O処理を制御するのは難しくありません。例として,複数のスレッドでファイルの書き込みを行うプログラムを見ていきましょう。

 特に制御を行わない素朴な実装は以下のようになります。

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import System.IO

data Work = Running | Finished deriving Eq

text = "sample.txt"

write :: Int -> String -> IO ()
write n str = sequence_ $ replicate n $ writeFile text str

main = do
    let times = 6
    xVar <- newTVarIO Running
    yVar <- newTVarIO Running
    forkIO $ do
        write times "IO 1"
        atomically $ writeTVar xVar Finished
    forkIO $ do
        write times "IO 2"
        atomically $ writeTVar yVar Finished
    atomically $ do
        x <- readTVar xVar
        y <- readTVar yVar
        check (x == Finished && y == Finished)

 Work型は,mainで実行されているスレッド(メイン・スレッド,main thread)が子スレッドの実行の終了を待つために定義しています。このプログラムは,GHCiなどの対話環境を使った場合には問題なく動作しますが,実行可能ファイルとして作成した場合には,メイン・スレッドが終了した時点でプログラムが終了してしまいます(参考リンク)。これでは思った通りの動作を実現できません。

 そこで,前回TMVarを実装するために使った「retryを用いてロックを実現する」というテクニックを採用することにしました。checkは,値がTrueにならない場合にretryを呼ぶ関数です。

check :: Bool -> STM a
check b = if b then return undefined else retry

 ここに条件式を置くことで,その条件を満たさない限りretryを呼ぶという処理を簡潔に記述できます。ここでは,他のすべてのスレッドが実行を終えてFinishedという値を設定するまで待つ,という定義にしています。

 次に,ファイルの書き込み処理を行うwriteを見てみましょう。replicateは,ある値をn回繰り返したリストを作成する関数で,Preludeに用意されています。

Prelude> :t replicate
replicate :: Int -> a -> [a]
Prelude> replicate 6 12
[12,12,12,12,12,12]

 これを使ってI/O処理のリストを作成し,第7回で説明したsequence*を使って先頭から順にアクションをつなぎ合わせることで,n回の繰り返し処理を実現できます。Control.Monadモジュールには,replicateとsequence*を組み合わせたreplicateM*がすでに定義されているので,代わりにそれを利用することもできます。

Prelude Control.Monad> :t replicateM
replicateM :: (Monad m) => Int -> m a -> m [a]
Prelude Control.Monad> :t replicateM_
replicateM_ :: (Monad m) => Int -> m a -> m ()

 それでは実行してみましょう。System.IOは,同じファイルに対する書き込みは一つしか許さないというロック機構を持つように定められています(参考リンク)。したがって,プログラムの実行は失敗に終わるはずです。

*Main> :main
Loading package stm-2.0 ... linking ... done.
<interactive>: sample.txt: openFile: permission denied (Permission denied)

Interrupted.

 あるいは

*Main> :main
Loading package stm-2.0 ... linking ... done.
<interactive>: sample.txt: openFile: resource busy (file is locked)

Interrupted.

 GHCiでは,ファイルにアクセスできなかったことを示すエラー・メッセージが表示され,そのまま答が返ってこなくなります。処理系が答を返さなくなるのは,ファイルにアクセスするスレッドがエラーを起こして終了した結果,メイン・スレッドがいつまでもブロックされた状態になってしまうからです。

 実際,実行可能ファイルを作成して実行した場合には,スレッドがずっとブロックされていることを示すエラーが表示され,プログラムが終了します。

$ ghc --make STMHandle.hs -O
[1 of 1] Compiling Main             ( STMHandle.hs, STMHandle.o )
Linking STMHandle.exe ...

$ ./STMHandle.exe
STMHandle.exe: sample.txt: openFile: permission denied (Permission denied)

STMHandle.exe: thread blocked indefinitely

 では,いよいよ「状態」を使って明示的にI/O処理を制御してみましょう。

 まず簡単に思いつくのが,同時に処理を行わせようとすると破綻する可能性がある部分,「危険領域(Critical Section,クリティカル・セクション)」をロックにより保護する方法です。特に,同時に一つの処理しか行わせないようにする手法を「ミューテックス(Mutex)」といいます(参考リンク1参考リンク2)。Mutexは,MUTual EXclusion(相互排他)の略です。同じファイルに対して同時に複数の書き込みを行うことは許されないので,I/O処理の制御にはミューテックスが使えそうです。

 ところが,System.IOは暗黙的にファイルに対してロックを行うものの,ロックを明示的に制御するための仕組みは何も用意していません。hIsWritable:: Handle -> IO Boolのような問い合わせのためのアクションがあるだけです。すぐに考え付くのは,hIsWritableでハンドルの状態をチェックしながらハンドルを利用できるようになるまでループで待ち続ける,いわゆるビジー・ウェイト(busy waiting)というやり方でしょう。しかし,ビジー・ウェイトよりも,retry後のTVarの値の更新待ちやMVarなどを使って処理をロックしたほうが,CPUの処理を無駄に費やさなくて済みます。ここではLock型を定義し,それを使ってSTMで処理をブロックすることにします。

data Lock = Open | Close deriving Eq

lock res = writeTVar res Close
unlock res = writeTVar res Open
isLock res = if res == Close then True else False

write :: Int -> String -> TVar Lock -> IO ()
write n str res = replicateM_ n $ blockAct n (writeFile text str) res

blockAct :: Int -> IO a -> TVar Lock -> IO ()
blockAct n act res = do
    atomically $ do
        v <- readTVar res
        check (not $ isLock v)
        lock res
    act
    atomically $ unlock res

main = do
    let times = 6
    res <- newTVarIO Open
    xVar <- newTVarIO Running
    yVar <- newTVarIO Running
    forkIO $ do
        write times "IO 1" res
        atomically $ writeTVar xVar Finished
    forkIO $ do
        write times "IO 2" res
        atomically $ writeTVar yVar Finished
    atomically $ do
        x <- readTVar xVar
        y <- readTVar yVar
        check (x == Finished && y == Finished)
    -- return ()

 blockActの部分で,「他のスレッドによって処理が行われていないとき(=処理がブロックされていないとき)に他の処理をブロックする形で処理を行い,自分の仕事が終わったら処理を解放する」という制御を行っているのがわかりますね。

 ブロックによってスレッド間の処理の衝突を未然に防げるようになったため,今度は問題なく実行できます。

$ ghc --make STMHandle.hs -O
[1 of 1] Compiling Main             ( STMHandle.hs, STMHandle.o )
Linking STMHandle.exe ...

$ ./STMHandle.exe

 対話環境では評価結果がundefinedであることを示すエラーが出ますが,これはメイン・スレッドの最後のcheckの返り値であるundefinedを評価したものなので,基本的には無害です。

Prelude Main> :main
*** Exception: Prelude.undefined

 どうしても気になる場合には,コメントアウトしている最後の行のreturn ( )を定義に加えることで,main変数の返り値を変更してください。

*Main> :main
*Main>

 ただ,writeではblockActを使った処理以外のアクションを行っていないため,解放したロックを同じスレッドがまたすぐにかける可能性が高くなってしまっています。これではスレッド間の処理の公平性(fairness)が満たされず,スレッドを使って並行に処理させた意味がありません。どうすればよいでしょうか?

 実はControl.Concurrentには,指定したミリ秒の間,スレッドを一時停止(suspend)させるためのthreadDelayという処理が用意されています。

Prelude Control.Concurrent> :t threadDelay
threadDelay :: Int -> IO ()

 threadDelayとSystem.Randomモジュールを使って生成した乱数を組み合わせてrandomDelayというアクションを作ることで,スレッドを偏りなく実行できるようになります。なお,System.Randomモジュールは現在はbaseパッケージですが,GHC6.8以降は独立したrandomパッケージに入る予定です。

import System.Random

randomDelay :: IO ()
-- Delay for a random time between 1 and 100 microseconds
randomDelay = do { waitTime <- getStdRandom (randomR (1, 100))
                 ; threadDelay waitTime }

blockAct :: Int -> IO a -> TVar Lock -> IO ()
blockAct n act res = do
    randomDelay
    atomically $ do
        v <- readTVar res
        check (not $ isLock v)
        lock res
    act
    atomically $ unlock res

 randomRは乱数の範囲を決める関数,getStdRandomはその範囲から乱数を実際に生成するためのアクションです。

Prelude System.Random> :t randomR
randomR :: (Random a, RandomGen g) => (a, a) -> g -> (a, g)
Prelude System.Random> :t getStdRandom
getStdRandom :: (StdGen -> (a, StdGen)) -> IO a

 getStdRandomは,大域に一つ存在するStdGen型の乱数生成機を暗黙的に取得し,それを使って乱数を生成するアクションとして定義されています(参考リンク)。ごく単純な乱数の生成はそう難しくありません。

 writeのwriteFile text strの部分をprint strに置き換えれば,スレッドによる処理の実行に偏りがないことが実際にわかります。

*Main> :main
"IO 1"
"IO 2"
"IO 1"
"IO 2"
"IO 2"
"IO 1"
"IO 2"
"IO 1"
"IO 2"
"IO 1"
"IO 2"
"IO 1"