Concurrent I/O

Here I put together brief examples and descriptions of efficient concurrent I/O handling in different languages, at a lower level than mere usage of readily available asynchronous libraries, focusing rather on how such libraries are implemented. By "efficient", I mean relying on non-blocking functions and file descriptor polling, as opposed to waiting on blocking functions in multiple system threads. I plan to cover the more canonical, common, simple, and built-in ways first, though a language can offer several options.
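To illustrate what "non-blocking" means in practice, here is a minimal sketch assuming a Unix-like system, with a pipe standing in for the files used below. A read on a non-blocking descriptor with no data fails immediately with EAGAIN, which Python reports as BlockingIOError, instead of suspending the calling thread. Polling then tells us when a read would succeed.

```python
import os

# A pipe stands in for any file descriptor that has no data available yet.
r, w = os.pipe()
os.set_blocking(r, False)  # same effect as opening with O_NONBLOCK

try:
    data = os.read(r, 1)
except BlockingIOError:  # raised for EAGAIN / EWOULDBLOCK
    data = None  # nothing to read yet; a poller would wait for readiness instead

os.write(w, b"x")
later = os.read(r, 1)  # now returns immediately with the pending byte

os.close(r)
os.close(w)
print(data, later)
```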

The following code examples open a couple of files using libc functions (ensuring general suitability for FFI), wait for one of them to become available for reading, then close them and exit. For a test environment, run mkfifo a b, then cat - > a and cat - > b in separate terminals, since each cat keeps running; typing a line into one of them makes the corresponding FIFO readable. Then observe execution with strace, possibly checking ls -l /proc/$PID/fd/ or lsof a b to compare file descriptor numbers.
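For a self-contained check without extra terminals, the same setup can be sketched in Python: temporary FIFOs, non-blocking read ends, and a single writer standing in for cat - > a. The temporary directory, the one-line write, and the 5 second timeout are assumptions of this sketch, not part of the procedure above.

```python
import os
import select
import tempfile

# Recreate the "mkfifo a b" setup in a temporary directory.
tmp = tempfile.mkdtemp()
path_a = os.path.join(tmp, "a")
path_b = os.path.join(tmp, "b")
os.mkfifo(path_a)
os.mkfifo(path_b)

# Open the read ends the same way as the examples: O_RDONLY | O_NONBLOCK.
fd_a = os.open(path_a, os.O_RDONLY | os.O_NONBLOCK)
fd_b = os.open(path_b, os.O_RDONLY | os.O_NONBLOCK)

# Stand-in for "cat - > a": write one line to FIFO a only.
# A non-blocking write-only open succeeds because a reader is already open.
writer_a = os.open(path_a, os.O_WRONLY | os.O_NONBLOCK)
os.write(writer_a, b"hello\n")

# Wait for either FIFO to become readable (timeout as a safeguard).
readable, _, _ = select.select([fd_a, fd_b], [], [], 5)
print(readable)

for fd in (writer_a, fd_a, fd_b):
    os.close(fd)
os.unlink(path_a)
os.unlink(path_b)
os.rmdir(tmp)
```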

C

Plain

The simplest case, with explicit polling:

#include <fcntl.h>
#include <unistd.h>
#include <sys/select.h>

int main() {
  int fd_a = open("a", O_RDONLY | O_NONBLOCK, 0);
  int fd_b = open("b", O_RDONLY | O_NONBLOCK, 0);

  fd_set rfds;
  FD_ZERO(&rfds);
  FD_SET(fd_a, &rfds);
  FD_SET(fd_b, &rfds);
  select((fd_a > fd_b ? fd_a : fd_b) + 1, &rfds, NULL, NULL, NULL);
  close(fd_a);
  close(fd_b);
  return 0;
}

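However, select is not particularly efficient: it is limited to FD_SETSIZE descriptors, and the whole set is scanned on every call. More efficient alternatives, such as epoll on Linux or kqueue on the BSDs, are not as portable.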
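A common middle ground is poll(2), which is also POSIX but takes a list of descriptor/event pairs instead of fd_set bitmaps, so FD_SETSIZE does not apply (it still examines every entry on each call). A sketch of the same wait through Python's select.poll, a thin wrapper over poll(2), using pipes instead of the FIFOs so that it runs on its own (Unix only):

```python
import os
import select

# Two pipes stand in for the FIFOs "a" and "b".
r_a, w_a = os.pipe()
r_b, w_b = os.pipe()

# Register each descriptor with the events of interest, as with struct pollfd.
poller = select.poll()
poller.register(r_a, select.POLLIN)
poller.register(r_b, select.POLLIN)

os.write(w_b, b"x")  # make only "b" readable

# Wait up to 5000 ms; passing None would wait indefinitely,
# like the NULL timeout given to select in the C example.
events = poller.poll(5000)
ready = [fd for fd, mask in events if mask & select.POLLIN]
print(ready)

for fd in (r_a, w_a, r_b, w_b):
    os.close(fd)
```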

A library built this way can pass its file descriptors and timeouts to the library user, who then polls them together with other descriptors.
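A sketch of that pattern, assuming a hypothetical Ticker library (the class and its timeout and process methods are illustrative names, not an existing API). The library exposes a descriptor through fileno() plus the time until it next needs attention; the user folds both into their own selector loop next to their own descriptors.

```python
import os
import selectors
import time


class Ticker:
    """Hypothetical library that exposes a descriptor and a timeout
    instead of running its own event loop."""

    def __init__(self, interval):
        self.interval = interval
        self.deadline = time.monotonic() + interval
        self.ticks = 0
        # Internal wake-up pipe; the library would write to it when it has
        # work for the user's loop (never written in this sketch).
        self._r, self._w = os.pipe()

    def fileno(self):
        # selectors accept any object with a fileno() method.
        return self._r

    def timeout(self):
        # Seconds until the library next needs attention.
        return max(0.0, self.deadline - time.monotonic())

    def process(self):
        # Called by the user after each poll; handles an expired timer.
        if time.monotonic() >= self.deadline:
            self.ticks += 1
            self.deadline += self.interval

    def close(self):
        os.close(self._r)
        os.close(self._w)


# User side: one selector for the library's descriptor and the user's own.
ticker = Ticker(0.01)
user_r, user_w = os.pipe()
sel = selectors.DefaultSelector()
sel.register(ticker, selectors.EVENT_READ, "library")
sel.register(user_r, selectors.EVENT_READ, "user")

os.write(user_w, b"x")  # some activity on the user's own descriptor
seen = []
while ticker.ticks < 3:
    # Poll with the library's timeout, so its timer is not missed.
    for key, _ in sel.select(ticker.timeout()):
        seen.append(key.data)
        if key.data == "user":
            os.read(user_r, 1)
    ticker.process()

sel.close()
ticker.close()
os.close(user_r)
os.close(user_w)
print(ticker.ticks, seen)
```

Real C libraries follow similar designs, for instance libcurl's multi interface with curl_multi_fdset and curl_multi_timeout.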

libevent

Some may prefer libev, libevent, or libuv, which are more easily composable and use efficient polling mechanisms (such as epoll or kqueue) in a portable way. An example with libevent:

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <event2/event.h>

void callback(int fd, short events, void *eb_ptr) {
  (void) events;
  struct event_base *eb = eb_ptr;
  printf("Event on %d\n", fd);
  event_base_loopbreak(eb);
}

int main() {
  int fd_a = open("a", O_RDONLY | O_NONBLOCK, 0);
  int fd_b = open("b", O_RDONLY | O_NONBLOCK, 0);

  struct event_base *eb = event_base_new();
  struct event* ev_a = event_new(eb, fd_a, EV_READ | EV_PERSIST, callback, eb);
  struct event* ev_b = event_new(eb, fd_b, EV_READ | EV_PERSIST, callback, eb);
  event_add(ev_a, NULL);
  event_add(ev_b, NULL);
  event_base_dispatch(eb);

  /* Free the events before the base they belong to, then close the descriptors. */
  event_free(ev_a);
  event_free(ev_b);
  event_base_free(eb);
  close(fd_a);
  close(fd_b);
  return 0;
}

Haskell

The "Extending the Haskell Foreign Function Interface with Concurrency" (2004) paper covers the origins; for more concrete and up-to-date documentation, see the base library (Control.Concurrent) and the "Multi-threading and the FFI" section of the GHC User's Guide. The runtime system usually performs I/O multiplexing itself: the threadWaitRead function blocks a lightweight Haskell thread until a file descriptor becomes readable, while the runtime polls all such file descriptors together. Here, an MVar signals the main thread as soon as either waiting thread finishes.

{-# LANGUAGE CApiFFI #-}
import Foreign.C (CInt(..), CString, withCString)
import Control.Concurrent (forkIO, threadWaitRead)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import System.Posix.Types (CMode(..))

foreign import capi "fcntl.h value O_NONBLOCK" nonBlock :: CInt
foreign import capi "fcntl.h open" c_open
  :: CString -> CInt -> CMode -> IO CInt
foreign import capi "unistd.h close" c_close
  :: CInt -> IO CInt

cOpen :: String -> IO CInt
cOpen path = withCString path $ \cpath -> c_open cpath nonBlock 0

main :: IO ()
main = do
  fa <- cOpen "a"
  fb <- cOpen "b"
  mvEnd <- newEmptyMVar
  _ta <- forkIO $ threadWaitRead (fromIntegral fa) >> putMVar mvEnd ()
  _tb <- forkIO $ threadWaitRead (fromIntegral fb) >> putMVar mvEnd ()
  _ <- takeMVar mvEnd
  _ <- c_close (fromIntegral fa)
  _ <- c_close (fromIntegral fb)
  pure ()

Compared to manual polling in C, this approach simplifies composition, but multi-threading (even with lightweight threads) comes with its own issues, including synchronization.

Haskell also has the async package and a few other libraries aiming to help with concurrency, but the base functions are sufficient for most tasks.

Python

Python has the selectors module for high-level I/O multiplexing, lower-level polling primitives in the select module, and a few asynchronous I/O libraries, particularly asyncio, which are more conventional and provide better composability. Selectors by themselves are straightforward to use. With asyncio, as below, we create a future and register the file descriptor with the running event loop, along with a callback that sets the future's result.

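from ctypes import cdll, c_char_p
import asyncio
import os

libc = cdll.LoadLibrary("libc.so.6")  # glibc on Linux
nonBlock = os.O_RDONLY | os.O_NONBLOCK  # instead of hardcoding 2048, which varies by platform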

def c_open(fn):
    return libc.open(c_char_p(fn.encode('utf-8')), nonBlock, 0)

fa = c_open("a")
fb = c_open("b")

def wait_fd(loop, fd):
    future = loop.create_future()

    def callback():
        print(fd, 'is readable')
        future.set_result(None)
        loop.remove_reader(fd)
        
    loop.add_reader(fd, callback)
    return future

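async def wait_for_one():
    loop = asyncio.get_running_loop()
    t1 = wait_fd(loop, fa)
    t2 = wait_fd(loop, fb)
    await asyncio.wait([t1, t2], return_when=asyncio.FIRST_COMPLETED)
    # Unregister the reader that is still pending before closing its descriptor.
    loop.remove_reader(fa)
    loop.remove_reader(fb)
    libc.close(fa)
    libc.close(fb)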

asyncio.run(wait_for_one())
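For comparison, the selectors module handles the same wait without an event loop or futures. A minimal sketch, using pipes in place of the FIFOs so that it runs on its own:

```python
import os
import selectors

# Pipes stand in for the FIFOs "a" and "b".
r_a, w_a = os.pipe()
r_b, w_b = os.pipe()

# DefaultSelector picks the most efficient mechanism the platform offers,
# e.g. epoll on Linux or kqueue on BSD/macOS, falling back to poll or select.
sel = selectors.DefaultSelector()
sel.register(r_a, selectors.EVENT_READ, "a")
sel.register(r_b, selectors.EVENT_READ, "b")

os.write(w_a, b"x")  # make only "a" readable

# Wait (up to 5 seconds) for at least one registered descriptor to be ready.
ready = [key.data for key, _ in sel.select(timeout=5)]
print(ready)

sel.close()
for fd in (r_a, w_a, r_b, w_b):
    os.close(fd)
```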

There are alternative asynchronous I/O libraries for Python as well. See also: "asyncio: a library with too many sharp corners".

Others

More options, possibly to cover them here later: