Haskellの非同期処理を使った入出力の重ね合わせ

Author

lotz

Published

June 29, 2020

これは『Haskellによる並列・並行プログラミング』リモート輪講 #10の発表資料です。

Haskellの非同期処理を使って並行に入出力を伴う処理を行うプログラムを書く方法について見ていきます。まず、複数のWebページを並行にダウンロードするようなタスクを考えます

-- https://github.com/simonmar/parconc-examples/blob/master/geturls1.hs

import Control.Concurrent
import Data.ByteString.Char8 as B
import System.Random

getURL :: String -> IO ByteString
getURL url = do
  delay <- randomRIO (500000, 1500000) -- URLのコンテンツを取得する時間ということにする
  threadDelay delay
  pure (B.pack url)

example1 :: IO ()
example1 = do
  m1 <- newEmptyMVar
  m2 <- newEmptyMVar
  
  forkIO $ do
    r <- getURL "https://en.wikipedia.org/wiki/Shovel"
    putMVar m1 r

  forkIO $ do
    r <- getURL "https://en.wikipedia.org/wiki/Spade"
    putMVar m2 r

  r1 <- takeMVar m1
  r2 <- takeMVar m2
  print (B.length r1, B.length r2)

という共通する実装のパターンが見え隠れしているので共通化してみましょう

-- https://github.com/simonmar/parconc-examples/blob/master/geturls2.hs

newtype Async a = Async (MVar a)

async :: IO a -> IO (Async a)
async action = do
  var <- newEmptyMVar
  forkIO (action >>= putMVar var)
  pure (Async var)

wait :: Async a -> IO a
wait (Async var) = readMVar var

意図しないデッドロックを防ぐために wait では takeMVar ではなく readMVar を使っています

これを使えば example1 を以下のように書き換えることができます

-- https://github.com/simonmar/parconc-examples/blob/master/geturls2.hs

example2 :: IO ()
example2 = do
  a1 <- async (getURL "https://en.wikipedia.org/wiki/Shovel")
  a2 <- async (getURL "https://en.wikipedia.org/wiki/Spade")
  r1 <- wait a1
  r2 <- wait a2
  print (B.length r1, B.length r2)

example2
(36,35)

モナディックなコンビネータを使って更に簡潔に書くことも可能です

-- https://github.com/simonmar/parconc-examples/blob/master/geturls3.hs

sites =
  [ "http://www.google.com"
  , "http://www.bing.com"
  , "http://www.yahoo.com"
  , "http://www.wikipedia.com/wiki/Spade"
  , "http://www.wikipedia.com/wiki/Shovel"
  ]

example3 :: IO ()
example3 = do
  as <- mapM (async . getURL) sites
  result <- mapM wait as
  print $ fmap B.length result

example3
[21,19,20,35,36]

Asyncでのエラー処理

getURL の中でエラーが起こった場合の挙動を見てみましょう

import Control.Exception

getURL' :: String -> IO ByteString
getURL' url = do
  throwIO (ErrorCall "oops!")
  pure (B.pack url)

example4 :: IO ()
example4 = do
  as <- mapM (async . getURL') sites
  result <- mapM wait as
  print $ fmap B.length result

example4
: 
thread blocked indefinitely in an MVar operation

getURL' では async の中で putMVar が実行される前に例外が投げられてしまうので wait における readMVar が永遠にスレッドをブロックしてしまいます

これを安全な挙動に変えるために Async 周りの実装を修正してみしましょう

-- https://github.com/simonmar/parconc-examples/blob/master/geturls4.hs

newtype Async a = Async (MVar (Either SomeException a))

async :: IO a -> IO (Async a)
async action = do
  var <- newEmptyMVar
  forkIO (try action >>= putMVar var)
  pure (Async var)

waitCatch :: Async a -> IO (Either SomeException a)
waitCatch (Async var) = readMVar var

wait :: Async a -> IO a
wait a = do
  r <- waitCatch a
  case r of
    Left e  -> throwIO e
    Right a -> pure a

asyncwait は以前のものと同じ型ですが、例外を適切に伝搬する仕組みを備えています

example5 :: IO ()
example5 = do
  as <- mapM (async . getURL') sites
  result <- mapM wait as
  print $ fmap B.length result

example5
: 
oops!

最初に非同期処理が例外を投げた時点でプログラム全体が停止しているのが分かります

非同期処理の合流

ここでは並行に実行している非同期処理のどれか一つでも結果を返した時点で何らかの処理を行いたいような場合について見ていきます。

以下の例は並行に複数のWebサイトをダウンロードして

  • 最初にダウンロードが完了したWebサイトの情報を表示する
  • 残りのダウンロードが完了するのを待つ

という挙動を実装しています。

-- https://github.com/simonmar/parconc-examples/blob/master/geturls5.hs

import Control.Monad

example6 :: IO ()
example6 = do
  m <- newEmptyMVar
  let download url = do
        r <- getURL url
        putMVar m (url, r)

  mapM_ (forkIO . download) sites
  
  (url, r) <- takeMVar m
  print $ url ++ ": " ++ show (B.length r)
  replicateM_ 4 (takeMVar m)

-- 実行するたびに結果が変わる
example6
"http://www.wikipedia.com/wiki/Shovel: 36"

これを明示的に MVar を用いずに Async を使って実装する事を考えましょう。

以下の waitEither は2つの Async を受け取って最初に完了した値を IO で返す関数です。

waitEither :: Async a -> Async b -> IO (Either a b)
waitEither a b = do
  m <- newEmptyMVar
  forkIO $ try (fmap Left  (wait a)) >>= putMVar m
  forkIO $ try (fmap Right (wait b)) >>= putMVar m
  wait (Async m)

この仕組は複数の Async のリストにも拡張することができます

waitAny :: [Async a] -> IO a
waitAny as = do
  m <- newEmptyMVar
  let forkwait a = forkIO $ try (wait a) >>= putMVar m
  mapM_ forkwait as
  wait (Async m)


-- 勉強会中に逆に全てのAsyncを待つようなコンビネータが作れるか?という質問があったので実装してみた例
waitAll :: [Async a] -> IO [a]
waitAll = mapM wait

以上の実装を使って example6 は明示的に MVar を使わない形で書き換えることができます。

-- https://github.com/simonmar/parconc-examples/blob/master/geturls6.hs

example7 :: IO ()
example7 = do
  let download url = do
        r <- getURL url
        pure (url, r)

  as <- mapM (async . download) sites
  
  (url, r) <- waitAny as
  
  print $ url ++ ": " ++ show (B.length r)
  mapM_ wait as

example7
"http://www.wikipedia.com/wiki/Spade: 35"