Module asyncmysql

Author: Wang Tong
Version: 0.4.1

"You are not fighting alone!"

Asynchronous MySQL connector written in pure Nim. Releases are available as tags in this repository and can be fetched via nimble:

nimble install asyncmysql

This documentation is still being perfected.

Most of the procs provided by asyncmysql use a callback style; however, you can wrap them in procs that return a Future and use them with async/await.

Exec a simple query

Here is a simple example that executes a SQL statement:

import asyncdispatch, asyncmysql

type
  # One result of a query: the result packet paired with its rows.
  # The finish callback in this example receives a `seq[Reply]`.
  Reply = tuple[packet: ResultPacket, rows: seq[string]]

const
  # Connection settings; all of them are passed to `openMysqlConnection`
  # in `main` of this example.
  MysqlHost = "127.0.0.1"
  MysqlPort = Port(3306)
  MysqlUser = "mysql"
  MysqlPassword = "123456"
  MysqlDb = "mysql"
  # presumably the maximum number of pooled connections - TODO confirm
  MysqlConnLimit = 10

# Wrap the callback-style `execQuery` in your own proc that returns a Future.
proc execMyQuery(pool: AsyncMysqlPool, q: SqlQuery): Future[seq[Reply]] =
  ## Runs `q` on `pool` and returns a future that completes with the replies
  ## handed to the finish callback, or fails with the error it reports.
  # The future's payload type must match the declared return type
  # (`Future[seq[Reply]]`); otherwise neither `result = retFuture` nor
  # `retFuture.complete(replies)` type-checks.
  var retFuture = newFuture[seq[Reply]]("execMyQuery")
  result = retFuture

  proc finishCb(err: ref Exception, replies: seq[Reply]) {.async.} =
    # do something ...

    if err == nil:
      retFuture.complete(replies)
    else:
      retFuture.fail(err)

  pool.execQuery(q, finishCb)

proc main() {.async.} =
  ## Opens the pool, runs a single parameterised query through `execMyQuery`,
  ## prints the returned rows and closes the pool.
  var pool = await openMysqlConnection(AF_INET, MysqlPort, MysqlHost, MysqlUser,
                                       MysqlPassword, MysqlDb, MysqlConnLimit)
  var replies = await pool.execMyQuery(sql("select * from mytable where id = ?", "1"))

  # One statement was sent, so exactly one reply is expected.
  assert replies.len == 1

  echo ">>> select * from mytable where id = 1;"
  # `==` compares; a single `=` would be an assignment and is not a valid
  # assertion condition.
  assert replies[0].packet.kind == rpkResultSet

  echo replies[0].rows

  pool.close()

Transaction

The key operations of a transaction are commit and rollback:

import asyncdispatch, asyncmysql

type
  # One result of a query: the result packet paired with its rows.
  # The finish callbacks in this example receive a `seq[Reply]`.
  Reply = tuple[packet: ResultPacket, rows: seq[string]]

const
  # Connection settings; all of them are passed to `openMysqlConnection`
  # in `main` of this example.
  MysqlHost = "127.0.0.1"
  MysqlPort = Port(3306)
  MysqlUser = "mysql"
  MysqlPassword = "123456"
  MysqlDb = "mysql"
  # presumably the maximum number of pooled connections - TODO confirm
  MysqlConnLimit = 10

proc execMyTransaction(pool: AsyncMysqlPool): Future[void] =
  ## Requests a connection from `pool`, runs a multi-statement transaction on
  ## it and returns a future that completes once the transaction has either
  ## gone through or been rolled back. The future fails if the query or the
  ## rollback itself reports an error. The connection is released back to the
  ## pool in both cases.
  var retFuture = newFuture[void]("execMyTransaction")
  result = retFuture

  # a rollback wrapper
  proc execRollback(conn: AsyncMysqlConnection): Future[void] =
    # Named distinctly so it does not shadow the outer `retFuture`.
    var rollbackFuture = newFuture[void]("execRollback")
    result = rollbackFuture

    proc finishCb(err: ref Exception, replies: seq[Reply]) {.async.} =
      # do something ...

      if err == nil:
        rollbackFuture.complete()
      else:
        rollbackFuture.fail(err)

    conn.execQuery(sql("rollback"), finishCb)

  # a transaction wrapper
  proc execTransaction(conn: AsyncMysqlConnection): Future[void] =
    # Named distinctly so it does not shadow the outer `retFuture`.
    var txFuture = newFuture[void]("execTransaction")
    result = txFuture

    proc finishCb(err: ref Exception, replies: seq[Reply]) {.async.} =
      # do something ...

      if err == nil:
        # check whether the results are as expected: one non-error reply for
        # each of the five statements sent below
        if replies.len < 5 or
           replies[0].packet.kind == rpkError or
           replies[1].packet.kind == rpkError or
           replies[2].packet.kind == rpkError or
           replies[3].packet.kind == rpkError or
           replies[4].packet.kind == rpkError:
          var rollbackErr: ref Exception = nil
          try:
            # rollback
            await conn.execRollback()
          except:
            rollbackErr = getCurrentException()
          # Always resolve the future after the rollback attempt. Without
          # this, a successful rollback would leave `txFuture` pending
          # forever and the connection would never be released.
          if rollbackErr == nil:
            txFuture.complete()
          else:
            txFuture.fail(rollbackErr)
        else:
          txFuture.complete()
      else:
        txFuture.fail(err)

    # NOTE(review): the trailing `,,,` in the insert statement looks like a
    # deliberately malformed statement meant to exercise the rollback path -
    # confirm before "fixing" it.
    conn.execQuery(sql("""
      start transaction;
      select val from sample where id = ?;
      update sample set val = 1 where id = ?;
      insert into sample (val) values (200),,,;
      commit;
      """, "1", "1"), finishCb)

  proc cb(conn: AsyncMysqlConnection, connIx: int) {.async.} =
    try:
      await conn.execTransaction()
      pool.release(connIx) # release connection to the pool
      retFuture.complete()
    except:
      pool.release(connIx) # release connection to the pool
      retFuture.fail(getCurrentException())

  pool.request(cb)

proc main() {.async.} =
  ## Opens the pool, runs the example transaction and closes the pool.
  var pool = await openMysqlConnection(AF_INET, MysqlPort, MysqlHost, MysqlUser,
                                       MysqlPassword, MysqlDb, MysqlConnLimit)
  await pool.execMyTransaction()
  # Close the pool once the work is done, as the other examples do.
  pool.close()

Streaming large result sets field-by-field

import asyncdispatch, asyncmysql

type
  # One result of a query: the result packet paired with its rows.
  # The finish callback in this example receives a `seq[Reply]`.
  Reply = tuple[packet: ResultPacket, rows: seq[string]]

const
  # Connection settings; all of them are passed to `openMysqlConnection`
  # in `main` of this example.
  MysqlHost = "127.0.0.1"
  MysqlPort = Port(3306)
  MysqlUser = "mysql"
  MysqlPassword = "123456"
  MysqlDb = "mysql"
  # presumably the maximum number of pooled connections - TODO confirm
  MysqlConnLimit = 10

# Wrap the callback-style streaming `execQuery` in your own proc that returns
# a Future.
proc execMyEchoQuery(pool: AsyncMysqlPool): Future[void] =
  ## Sends two select statements in one query and writes every received field
  ## to stdout as it arrives. The returned future completes when the finish
  ## callback reports success and fails with the reported error otherwise.
  var retFuture = newFuture[void]("execMyEchoQuery")
  result = retFuture

  let querySql = sql("""
    select host, user from user where user = ?;
    select password from user;
    """, "root")
  # 1-based index of the statement whose result is currently being received;
  # incremented in `recvPacketCb` and read by the other callbacks.
  var queryPos = 0

  proc recvPacketCb(packet: ResultPacket) {.async.} =
    inc(queryPos)
    case queryPos
    of 1:
      echo ">>> select host, user from user where user = ?;"
      assert packet.kind == rpkResultSet
    of 2:
      echo ">>> select password from user;"
      assert packet.kind == rpkResultSet
    else:
      discard

  proc recvPacketEndCb() {.async.} =
    # Terminate the line of fields printed for the current statement.
    case queryPos
    of 1:
      write(stdout, "\n")
    of 2:
      write(stdout, "\n")
    else:
      discard

  proc recvFieldCb(field: string) {.async.} =
    # Print each field followed by a space.
    case queryPos
    of 1:
      write(stdout, field, " ")
    of 2:
      write(stdout, field, " ")
    else:
      discard

  proc finishCb(err: ref Exception, replies: seq[Reply]) {.async.} =
    if err == nil:
      # `retFuture` is a `Future[void]`, so it is completed without a value;
      # the fields have already been printed by `recvFieldCb`.
      retFuture.complete()
    else:
      retFuture.fail(err)

  pool.execQuery(querySql, finishCb, recvPacketCb, recvPacketEndCb, recvFieldCb)

proc main() {.async.} =
  ## Opens the pool, runs the streaming example query and closes the pool.
  let opening = openMysqlConnection(AF_INET, MysqlPort, MysqlHost, MysqlUser,
                                    MysqlPassword, MysqlDb, MysqlConnLimit)
  var pool = await opening
  let echoing = pool.execMyEchoQuery()
  await echoing
  pool.close()

Sub-modules