-- https://github.com/simonmar/parconc-examples/blob/master/geturls1.hs
import Control.Concurrent
import Data.ByteString.Char8 as B
import System.Random
getURL :: String -> IO ByteString
= do
getURL url <- randomRIO (500000, 1500000) -- URLのコンテンツを取得する時間ということにする
delay
threadDelay delaypure (B.pack url)
example1 :: IO ()
= do
example1 <- newEmptyMVar
m1 <- newEmptyMVar
m2
$ do
forkIO <- getURL "https://en.wikipedia.org/wiki/Shovel"
r
putMVar m1 r
$ do
forkIO <- getURL "https://en.wikipedia.org/wiki/Spade"
r
putMVar m2 r
<- takeMVar m1
r1 <- takeMVar m2
r2 print (B.length r1, B.length r2)
これは『Haskellによる並列・並行プログラミング』リモート輪講 #10の発表資料です。
Haskellの非同期処理を使って並行に入出力を伴う処理を行うプログラムを書く方法について見ていきます。まず、複数のWebページを並行にダウンロードするようなタスクを考えます
newEmptyMVar
で結果を受け取る箱を作る- 非同期に取得処理を行い、結果を
MVar
に詰める MVar
に結果が入るのを待つ
という共通する実装のパターンが見え隠れしているので共通化してみましょう
-- https://github.com/simonmar/parconc-examples/blob/master/geturls2.hs
newtype Async a = Async (MVar a)
async :: IO a -> IO (Async a)
= do
async action <- newEmptyMVar
var >>= putMVar var)
forkIO (action pure (Async var)
wait :: Async a -> IO a
Async var) = readMVar var wait (
意図しないデッドロックを防ぐために wait
では takeMVar
ではなく readMVar
を使っています
これを使えば example1
を以下のように書き換えることができます
-- https://github.com/simonmar/parconc-examples/blob/master/geturls2.hs
example2 :: IO ()
= do
example2 <- async (getURL "https://en.wikipedia.org/wiki/Shovel")
a1 <- async (getURL "https://en.wikipedia.org/wiki/Spade")
a2 <- wait a1
r1 <- wait a2
r2 print (B.length r1, B.length r2)
example2
(36,35)
モナディックなコンビネータを使って更に簡潔に書くことも可能です
-- https://github.com/simonmar/parconc-examples/blob/master/geturls3.hs
=
sites "http://www.google.com"
[ "http://www.bing.com"
, "http://www.yahoo.com"
, "http://www.wikipedia.com/wiki/Spade"
, "http://www.wikipedia.com/wiki/Shovel"
,
]
example3 :: IO ()
= do
example3 <- mapM (async . getURL) sites
as <- mapM wait as
result print $ fmap B.length result
example3
[21,19,20,35,36]
Asyncでのエラー処理
getURL
の中でエラーが起こった場合の挙動を見てみましょう
import Control.Exception
getURL' :: String -> IO ByteString
= do
getURL' url ErrorCall "oops!")
throwIO (pure (B.pack url)
example4 :: IO ()
= do
example4 <- mapM (async . getURL') sites
as <- mapM wait as
result print $ fmap B.length result
example4
:
thread blocked indefinitely in an MVar operation
getURL'
では async
の中で putMVar
が実行される前に例外が投げられてしまうので wait
における readMVar
が永遠にスレッドをブロックしてしまいます
これを安全な挙動に変えるために Async
周りの実装を修正してみしましょう
-- https://github.com/simonmar/parconc-examples/blob/master/geturls4.hs
newtype Async a = Async (MVar (Either SomeException a))
async :: IO a -> IO (Async a)
= do
async action <- newEmptyMVar
var >>= putMVar var)
forkIO (try action pure (Async var)
waitCatch :: Async a -> IO (Either SomeException a)
Async var) = readMVar var
waitCatch (
wait :: Async a -> IO a
= do
wait a <- waitCatch a
r case r of
Left e -> throwIO e
Right a -> pure a
async
と wait
は以前のものと同じ型ですが、例外を適切に伝搬する仕組みを備えています
example5 :: IO ()
= do
example5 <- mapM (async . getURL') sites
as <- mapM wait as
result print $ fmap B.length result
example5
:
oops!
最初に非同期処理が例外を投げた時点でプログラム全体が停止しているのが分かります
非同期処理の合流
ここでは並行に実行している非同期処理のどれか一つでも結果を返した時点で何らかの処理を行いたいような場合について見ていきます。
以下の例は並行に複数のWebサイトをダウンロードして
- 最初にダウンロードが完了したWebサイトの情報を表示する
- 残りのダウンロードが完了するのを待つ
という挙動を実装しています。
-- https://github.com/simonmar/parconc-examples/blob/master/geturls5.hs
import Control.Monad
example6 :: IO ()
= do
example6 <- newEmptyMVar
m let download url = do
<- getURL url
r
putMVar m (url, r)
mapM_ (forkIO . download) sites
<- takeMVar m
(url, r) print $ url ++ ": " ++ show (B.length r)
4 (takeMVar m)
replicateM_
-- 実行するたびに結果が変わる
example6
"http://www.wikipedia.com/wiki/Shovel: 36"
これを明示的に MVar
を用いずに Async
を使って実装する事を考えましょう。
以下の waitEither
は2つの Async
を受け取って最初に完了した値を IO
で返す関数です。
waitEither :: Async a -> Async b -> IO (Either a b)
= do
waitEither a b <- newEmptyMVar
m $ try (fmap Left (wait a)) >>= putMVar m
forkIO $ try (fmap Right (wait b)) >>= putMVar m
forkIO Async m) wait (
この仕組は複数の Async
のリストにも拡張することができます
waitAny :: [Async a] -> IO a
= do
waitAny as <- newEmptyMVar
m let forkwait a = forkIO $ try (wait a) >>= putMVar m
mapM_ forkwait as
Async m)
wait (
-- 勉強会中に逆に全てのAsyncを待つようなコンビネータが作れるか?という質問があったので実装してみた例
waitAll :: [Async a] -> IO [a]
= mapM wait waitAll
以上の実装を使って example6
は明示的に MVar
を使わない形で書き換えることができます。
-- https://github.com/simonmar/parconc-examples/blob/master/geturls6.hs
example7 :: IO ()
= do
example7 let download url = do
<- getURL url
r pure (url, r)
<- mapM (async . download) sites
as
<- waitAny as
(url, r)
print $ url ++ ": " ++ show (B.length r)
mapM_ wait as
example7
"http://www.wikipedia.com/wiki/Spade: 35"