Here I put together brief examples and descriptions of efficient concurrent I/O handling in different languages. At a lower level than mere usage of readily available asynchronous libraries, rather focusing on their implementations. By "efficient", I mean relying on non-blocking functions and file descriptor polling, as opposed to waiting on blocking functions in multiple system threads. I plan to cover more canonical, common, simple, and built-in ways first, though there can be different options within a language.
The following code examples open a couple of files using libc
functions (ensuring general suitability for FFI), wait for one
of them to become available for reading, then close them and
exit. For a test environment, run mkfifo a
b
, cat - > a
, and cat - >
b
. Then observe execution with strace
,
possibly checking ls -l /proc/$PID/fd/
or lsof a b
to compare file descriptor numbers.
The simplest case, with explicit polling:
#include <fcntl.h> #include <unistd.h> #include <sys/select.h> int main() { int fd_a = open("a", O_RDONLY | O_NONBLOCK, 0); int fd_b = open("b", O_RDONLY | O_NONBLOCK, 0); fd_set rfds; FD_ZERO(&rfds); FD_SET(fd_a, &rfds); FD_SET(fd_b, &rfds); select((fd_a > fd_b ? fd_a : fd_b) + 1, &rfds, NULL, NULL, NULL); close(fd_a); close(fd_b); return 0; }
Though select
is not particularly efficient, and
other options are not as portable.
When implementing a library, it can pass its file descriptors and timings to the library user, to poll them together with others.
Some may prefer libev, libevent, or libuv, which are more easily composable, and do use efficient polling methods in a portable way. An example with libevent:
#include <fcntl.h> #include <unistd.h> #include <event2/event.h> void callback(int fd, short events, void *eb_ptr) { (void) events; struct event_base *eb = eb_ptr; printf("Event on %d\n", fd); event_base_loopbreak(eb); } int main() { int fd_a = open("a", O_RDONLY | O_NONBLOCK, 0); int fd_b = open("b", O_RDONLY | O_NONBLOCK, 0); struct event_base *eb = event_base_new(); struct event* ev_a = event_new(eb, fd_a, EV_READ | EV_PERSIST, callback, eb); struct event* ev_b = event_new(eb, fd_b, EV_READ | EV_PERSIST, callback, eb); event_add(ev_a, NULL); event_add(ev_b, NULL); event_base_dispatch(eb); close(fd_a); close(fd_b); event_base_free(eb); event_free(ev_a); event_free(ev_b); return 0; }
The "Extending the Haskell Foreign Function Interface with
Concurrency" (2004) paper covers the origins, but for more
concrete and up-to-date documentation, see the Haskell's base
library, GHC User's Guide's Multi-threading and the FFI.
Usually there is I/O multiplexing,
the threadWaitRead
function takes care of it by
blocking a lightweight Haskell thread until a file descriptor
becomes readable, while the runtime system polls all such file
descriptors together. MVar
-based synchronization is
used here.
{-# LANGUAGE CApiFFI #-} import Foreign.C (CInt(..), CString, withCString) import Control.Concurrent (forkIO, threadWaitRead) import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) import System.Posix.Types (CMode(..)) foreign import capi "fcntl.h value O_NONBLOCK" nonBlock :: CInt foreign import capi "fcntl.h open" c_open :: CString -> CInt -> CMode -> IO CInt foreign import capi "unistd.h close" c_close :: CInt -> IO CInt cOpen :: String -> IO CInt cOpen path = withCString path $ \cpath -> c_open cpath nonBlock 0 main :: IO () main = do fa <- cOpen "a" fb <- cOpen "b" mvEnd <- newEmptyMVar _ta <- forkIO $ threadWaitRead (fromIntegral fa) >> putMVar mvEnd () _tb <- forkIO $ threadWaitRead (fromIntegral fb) >> putMVar mvEnd () _ <- takeMVar mvEnd _ <- c_close (fromIntegral fa) _ <- c_close (fromIntegral fb) pure ()
Compared to manual polling in C, this approach simplifies composition, but multi-threading (even while the threads are lightweight) comes with its own issues, including synchronization.
Haskell also has the async package and a few other libraries aiming to help with concurrency, but the base functions are sufficient for most tasks.
Python has the selectors module for high-level I/O multiplexing, as well as lower-level primitives for socket polling, and a few asynchronous I/O libraries, which are more conventional and provide better composability, particularly asyncio. Selectors by themselves are straightforward to use. With asynio, we create a future, hook a file descriptor to the current event loop, setting a callback, which sets the future's result, as below.
from ctypes import cdll, c_char_p import asyncio libc = cdll.LoadLibrary("libc.so.6") nonBlock = 2048 def c_open(fn): return libc.open(c_char_p(fn.encode('utf-8')), nonBlock, 0) fa = c_open("a") fb = c_open("b") def wait_fd(loop, fd): future = loop.create_future() def callback(): print(fd, 'is readable') future.set_result(None) loop.remove_reader(fd) loop.add_reader(fd, callback) return future async def wait_for_one(): loop = asyncio.get_running_loop() t1 = wait_fd(loop, fa) t2 = wait_fd(loop, fb) await asyncio.wait([t1, t2], return_when=asyncio.FIRST_COMPLETED) libc.close(fa) libc.close(fb) asyncio.run(wait_for_one())
Though there are alternative asynchronous I/O libraries for Python as well. See also: "asyncio: a library with too many sharp corners".
More options, possibly to cover them here later: