-- https://github.com/simonmar/parconc-examples/blob/master/geturls1.hs
import Control.Concurrent
import Data.ByteString.Char8 as B
import System.Random
getURL :: String -> IO ByteString
getURL url = do
delay <- randomRIO (500000, 1500000) -- URLのコンテンツを取得する時間ということにする
threadDelay delay
pure (B.pack url)
example1 :: IO ()
example1 = do
m1 <- newEmptyMVar
m2 <- newEmptyMVar
forkIO $ do
r <- getURL "https://en.wikipedia.org/wiki/Shovel"
putMVar m1 r
forkIO $ do
r <- getURL "https://en.wikipedia.org/wiki/Spade"
putMVar m2 r
r1 <- takeMVar m1
r2 <- takeMVar m2
print (B.length r1, B.length r2)
これは『Haskellによる並列・並行プログラミング』リモート輪講 #10の発表資料です。
Haskellの非同期処理を使って並行に入出力を伴う処理を行うプログラムを書く方法について見ていきます。まず、複数のWebページを並行にダウンロードするようなタスクを考えます
newEmptyMVarで結果を受け取る箱を作る- 非同期に取得処理を行い、結果を
MVarに詰める MVarに結果が入るのを待つ
という共通する実装のパターンが見え隠れしているので共通化してみましょう
-- https://github.com/simonmar/parconc-examples/blob/master/geturls2.hs
newtype Async a = Async (MVar a)
async :: IO a -> IO (Async a)
async action = do
var <- newEmptyMVar
forkIO (action >>= putMVar var)
pure (Async var)
wait :: Async a -> IO a
wait (Async var) = readMVar var意図しないデッドロックを防ぐために wait では takeMVar ではなく readMVar を使っています
これを使えば example1 を以下のように書き換えることができます
-- https://github.com/simonmar/parconc-examples/blob/master/geturls2.hs
example2 :: IO ()
example2 = do
a1 <- async (getURL "https://en.wikipedia.org/wiki/Shovel")
a2 <- async (getURL "https://en.wikipedia.org/wiki/Spade")
r1 <- wait a1
r2 <- wait a2
print (B.length r1, B.length r2)
example2(36,35)
モナディックなコンビネータを使って更に簡潔に書くことも可能です
-- https://github.com/simonmar/parconc-examples/blob/master/geturls3.hs
sites =
[ "http://www.google.com"
, "http://www.bing.com"
, "http://www.yahoo.com"
, "http://www.wikipedia.com/wiki/Spade"
, "http://www.wikipedia.com/wiki/Shovel"
]
example3 :: IO ()
example3 = do
as <- mapM (async . getURL) sites
result <- mapM wait as
print $ fmap B.length result
example3[21,19,20,35,36]
Asyncでのエラー処理
getURL の中でエラーが起こった場合の挙動を見てみましょう
import Control.Exception
getURL' :: String -> IO ByteString
getURL' url = do
throwIO (ErrorCall "oops!")
pure (B.pack url)
example4 :: IO ()
example4 = do
as <- mapM (async . getURL') sites
result <- mapM wait as
print $ fmap B.length result
example4:
thread blocked indefinitely in an MVar operation
getURL' では async の中で putMVar が実行される前に例外が投げられてしまうので wait における readMVar が永遠にスレッドをブロックしてしまいます
これを安全な挙動に変えるために Async 周りの実装を修正してみしましょう
-- https://github.com/simonmar/parconc-examples/blob/master/geturls4.hs
newtype Async a = Async (MVar (Either SomeException a))
async :: IO a -> IO (Async a)
async action = do
var <- newEmptyMVar
forkIO (try action >>= putMVar var)
pure (Async var)
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch (Async var) = readMVar var
wait :: Async a -> IO a
wait a = do
r <- waitCatch a
case r of
Left e -> throwIO e
Right a -> pure aasync と wait は以前のものと同じ型ですが、例外を適切に伝搬する仕組みを備えています
example5 :: IO ()
example5 = do
as <- mapM (async . getURL') sites
result <- mapM wait as
print $ fmap B.length result
example5:
oops!
最初に非同期処理が例外を投げた時点でプログラム全体が停止しているのが分かります
非同期処理の合流
ここでは並行に実行している非同期処理のどれか一つでも結果を返した時点で何らかの処理を行いたいような場合について見ていきます。
以下の例は並行に複数のWebサイトをダウンロードして
- 最初にダウンロードが完了したWebサイトの情報を表示する
- 残りのダウンロードが完了するのを待つ
という挙動を実装しています。
-- https://github.com/simonmar/parconc-examples/blob/master/geturls5.hs
import Control.Monad
example6 :: IO ()
example6 = do
m <- newEmptyMVar
let download url = do
r <- getURL url
putMVar m (url, r)
mapM_ (forkIO . download) sites
(url, r) <- takeMVar m
print $ url ++ ": " ++ show (B.length r)
replicateM_ 4 (takeMVar m)
-- 実行するたびに結果が変わる
example6"http://www.wikipedia.com/wiki/Shovel: 36"
これを明示的に MVar を用いずに Async を使って実装する事を考えましょう。
以下の waitEither は2つの Async を受け取って最初に完了した値を IO で返す関数です。
waitEither :: Async a -> Async b -> IO (Either a b)
waitEither a b = do
m <- newEmptyMVar
forkIO $ try (fmap Left (wait a)) >>= putMVar m
forkIO $ try (fmap Right (wait b)) >>= putMVar m
wait (Async m)この仕組は複数の Async のリストにも拡張することができます
waitAny :: [Async a] -> IO a
waitAny as = do
m <- newEmptyMVar
let forkwait a = forkIO $ try (wait a) >>= putMVar m
mapM_ forkwait as
wait (Async m)
-- 勉強会中に逆に全てのAsyncを待つようなコンビネータが作れるか?という質問があったので実装してみた例
waitAll :: [Async a] -> IO [a]
waitAll = mapM wait以上の実装を使って example6 は明示的に MVar を使わない形で書き換えることができます。
-- https://github.com/simonmar/parconc-examples/blob/master/geturls6.hs
example7 :: IO ()
example7 = do
let download url = do
r <- getURL url
pure (url, r)
as <- mapM (async . download) sites
(url, r) <- waitAny as
print $ url ++ ": " ++ show (B.length r)
mapM_ wait as
example7"http://www.wikipedia.com/wiki/Spade: 35"