It may be wrong to use async/await or coroutines to perform low-level I/O.

I have spent several years working with Node.js and libuv, and I have come to feel strongly that callback functions, Promises, and async/await in Node.js work well together. Callback functions are responsible for low-level I/O, where data is read and written chunk by chunk. A Promise wraps multiple callback functions into one atomic I/O operation. And async/await is responsible only for composing the logical workflow.

I also have two to three years of experience with asyncdispatch, asyncnet, and asynchttpserver in Nim, and I have written several asynchronous modules. I am now writing an asynchronous MySQL client/connector in pure Nim. However, I now feel that it is not appropriate to rely on async/await for everything.

Suppose we use a MySQL client to execute some queries. Here is some pseudocode:

var conn = newAsyncSocket()

proc query(sql: string): Future[Stream] {.async.} =
  # Send the formatted query, then read and parse the response header.
  await conn.send(mysql_format_function(sql))
  let data = await conn.recv_mysql_function()
  mysql_parse_function(data)
  # The returned stream reads the remaining rows from the same connection.
  return newStream(conn)

proc main() {.async.} =
  let stream1 = await query("select 100; select 200;")
  while true:
    let data = await stream1.read()
    echo data # 100, 200
    if data == "":
      break

  let stream2 = await query("select 300; select 400;")
  while true:
    let data = await stream2.read()
    echo data # 300, 400
    if data == "":
      break

waitFor main()

The code works fine as long as stream2 is only started after stream1 has been fully read. But what if it is not?

proc main() {.async.} =
  let stream1 = await query("select 100; select 200;")
  let stream2 = await query("select 300; select 400;")

  while true:
    let data = await stream1.read()
    echo data
    if data == "":
      break

  while true:
    let data = await stream2.read()
    echo data
    if data == "":
      break

Or:

proc do1() {.async.} =
  let stream1 = await query("select 100; select 200;")
  while true:
    let data = await stream1.read()
    echo data
    if data == "":
      break

proc do2() {.async.} =
  let stream2 = await query("select 300; select 400;")
  while true:
    let data = await stream2.read()
    echo data
    if data == "":
      break

proc main() {.async.} =
  asyncCheck do1()
  asyncCheck do2()

What happens then? The main function blocks at (await) stream2 because stream1 never finishes. What does "finished" mean? stream1 is finished once it has received all the data that belongs to its query, select 100; select 200;. Both streams share one connection, so the data for stream2 arrives only after the data for stream1:

     stream1 data           stream2 data
|......................|....................|
|                      |                    |
v                      v                    v
stream1 begin          stream1 finished     stream2 finished
                       stream2 begin
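
The ordering in the diagram can be reproduced without a real server. The following is only a minimal sketch under stated assumptions: `FakeConn`, `FakeStream`, `Packet`, and this version of `query` are hypothetical stand-ins, not the MySQL client's API. The "wire" is a queue of packets in arrival order, and a stream may only take a packet when the head of the queue belongs to it.

```nim
import std/[asyncdispatch, deques]

type
  Packet = tuple[owner: int, data: string]
  FakeConn = ref object          # hypothetical stand-in for one shared connection
    wire: Deque[Packet]          # packets in the order the server sent them
    nextId: int
  FakeStream = ref object        # hypothetical streaming handle
    conn: FakeConn
    id: int
  DemoResult = tuple[early: bool, first: string]

proc query(conn: FakeConn, rows: seq[string]): FakeStream =
  # Pretend the server has already answered: the rows, then "" as end marker.
  inc conn.nextId
  for row in rows:
    conn.wire.addLast((owner: conn.nextId, data: row))
  conn.wire.addLast((owner: conn.nextId, data: ""))
  result = FakeStream(conn: conn, id: conn.nextId)

proc read(s: FakeStream): Future[string] {.async.} =
  # A stream has to wait until the head of the wire belongs to it.
  while s.conn.wire.len == 0 or s.conn.wire.peekFirst().owner != s.id:
    await sleepAsync(5)
  return s.conn.wire.popFirst().data

proc demo(): Future[DemoResult] {.async.} =
  let conn = FakeConn(wire: initDeque[Packet]())
  let stream1 = conn.query(@["100", "200"])
  let stream2 = conn.query(@["300", "400"])

  # stream2 cannot make progress while the data of stream1 is still unread.
  let pending = stream2.read()
  let early = await withTimeout(pending, 50)

  # Drain stream1; only now does stream2 reach its own data.
  while true:
    let row = await stream1.read()
    if row == "":
      break
  let first = await pending
  return (early: early, first: first)

let (early, first) = waitFor demo()
echo "stream2 readable before stream1 was drained: ", early
echo "first row of stream2 afterwards: ", first
```

If nothing ever drains stream1, as in do2 above, the read on stream2 would keep waiting indefinitely; the timeout is there only so the sketch terminates.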

One solution is to use a lock or a queue. This solves the second problem, but it also introduces a data race:

proc do1() {.async.} =
  await conn.lock()
  let stream1 = await query("select 100; select 200;")
  # ...
  conn.unlock()

proc do2() {.async.} =
  await conn.lock()
  let stream2 = await query("select 300; select 400;")
  # ...
  conn.unlock()
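
For illustration, here is a rough sketch of what such a lock could look like on top of asyncdispatch. As far as I know the standard library does not ship one, so `AsyncLock`, `acquire`, `release`, and `Trace` are hypothetical names; `acquire` and `release` play the role of conn.lock() and conn.unlock() above.

```nim
import std/[asyncdispatch, deques]

type
  AsyncLock = ref object              # hypothetical; not a stdlib type
    held: bool
    waiters: Deque[Future[void]]      # tasks waiting for the lock, in order
  Trace = ref object                  # records what ran, for inspection
    events: seq[string]

proc acquire(l: AsyncLock): Future[void] =
  result = newFuture[void]("acquire")
  if l.held:
    l.waiters.addLast(result)         # completed later by release()
  else:
    l.held = true
    result.complete()

proc release(l: AsyncLock) =
  if l.waiters.len > 0:
    l.waiters.popFirst().complete()   # hand the lock to the next waiter
  else:
    l.held = false

proc worker(l: AsyncLock, name: string, trace: Trace) {.async.} =
  await l.acquire()
  try:
    trace.events.add(name & " begin")
    await sleepAsync(5)               # stands in for query + reading the stream
    trace.events.add(name & " end")
  finally:
    l.release()                       # release even if the body raises

proc main(): Future[seq[string]] {.async.} =
  let l = AsyncLock(waiters: initDeque[Future[void]]())
  let trace = Trace()
  let a = worker(l, "do1", trace)
  let b = worker(l, "do2", trace)
  await a
  await b
  return trace.events

let events = waitFor main()
echo events
```

The lock serializes the two workers, so do2 only begins after do1 has ended. It does nothing for code that never takes the lock.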

And it cannot prevent programmers from writing the first kind of code:

proc main() {.async.} =
  let stream1 = await query("select 100; select 200;")
  let stream2 = await query("select 300; select 400;")

Each of your asynchronous functions must be atomic; otherwise, there are traps in them. But an atomic function means that you cannot build streaming interfaces:

proc main() {.async.} =
  let all_data1 = await query("select 100; select 200;")
  let all_data2 = await query("select 300; select 400;")

This is very bad for transferring large amounts of data, because each result must be received in full before the call returns, and it is a huge issue whenever you use async/await to write any program.


Ideally, you use callback functions to perform low-level I/O, use Futures to wrap these callbacks into atomic operations, and use async/await to construct the final logical workflow. Then you get the perfect solution:

# `Future` wrapper
proc query(sql: string, fileStream: FileStream): Future[void] =
  var retFuture = newFuture[void]()

  let stream = conn.send(mysql_format_function(sql)) # I/O interface

  stream.onData() do (data: string): # I/O interface, read loop
    fileStream.write(data)

  stream.onFinished() do (): # I/O interface
    complete(retFuture)

  # Hand the future back so the caller can await the whole operation.
  return retFuture

# logical workflow
proc main() {.async.} =
  await query("select 100; select 200;", fileStream1) # write to file
  await query("select 300; select 400;", fileStream2) # write to file
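
To make the three layers concrete, here is a small runnable sketch under loud assumptions: `ChunkSource`, `pump`, and `Buffer` are hypothetical stand-ins for the socket layer and the file stream, and this `query` takes the chunks directly instead of talking to a server. Only the shape matters: callbacks move the chunks, one Future marks the operation as finished, and async/await sees nothing but whole operations.

```nim
import std/asyncdispatch

type
  ChunkSource = ref object            # hypothetical callback-driven reader
    chunks: seq[string]
    onData: proc (data: string)
    onFinished: proc ()
  Buffer = ref object                 # stands in for the FileStream above
    data: seq[string]

proc pump(src: ChunkSource) {.async.} =
  # Stand-in for the event loop delivering data chunk by chunk.
  for chunk in src.chunks:
    await sleepAsync(1)
    src.onData(chunk)
  src.onFinished()

# `Future` wrapper: one atomic operation built from callbacks
proc query(chunks: seq[string], buf: Buffer): Future[void] =
  let retFuture = newFuture[void]("query")
  let src = ChunkSource(chunks: chunks)
  src.onData = proc (data: string) =
    buf.data.add(data)                # low-level read loop, no await involved
  src.onFinished = proc () =
    retFuture.complete()
  asyncCheck pump(src)                # start delivery after both callbacks are set
  return retFuture

# logical workflow
proc main(): Future[seq[string]] {.async.} =
  let buf1 = Buffer()
  let buf2 = Buffer()
  await query(@["100", "200"], buf1)
  await query(@["300", "400"], buf2)
  return buf1.data & buf2.data

let received = waitFor main()
echo received
```

Because the caller never holds a half-read stream, the second query cannot start while the first is still on the wire, and the data still flows through in chunks instead of being collected by the caller.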