Author: | Wang Tong |
---|---|
Version: | 0.4.1 |
"You are not fighting alone!"
Asynchronous MySQL connector written in pure Nim. Releases are available as tags in this repository and can be fetched via nimble:
nimble install asyncmysql
This documentation is still being perfected.
Most of the procs API provided by asyncmysql are callback styles, however, the user can wrap them to async/await Future styles.
Exec a simple query
Here is a simple example to excute a SQL statement:
import asyncdispatch, asyncmysql type Reply = tuple[packet: ResultPacket, rows: seq[string]] const MysqlHost = "127.0.0.1" MysqlPort = Port(3306) MysqlUser = "mysql" MysqlPassword = "123456" MysqlDb = "mysql" MysqlConnLimit = 10 # wrap to yourself Future proc proc execMyQuery(pool: AsyncMysqlPool, q: SqlQuery): Future[seq[Reply]] = var retFuture = newFuture[void]("execMyQuery") result = retFuture proc finishCb(err: ref Exception, replies: seq[Reply]) {.async.} = # do something ... if err == nil: retFuture.complete(replies) else: retFuture.fail(err) pool.execQuery(q, finishCb) proc main() {.async.} = var pool = await openMysqlConnection(AF_INET, MysqlPort, MysqlHost, MysqlUser, MysqlPassword, MysqlDb, MysqlConnLimit) var replies = await pool.execMyQuery(sql("select * from mytable where id = ?", "1")) assert replies.len == 1 echo ">>> select * from mytable where id = 1;" assert replies[0].packet.kind = rpkResultSet echo replies[0].rows pool.close()
Transaction
The important factors of transaction are commit and rollback:
import asyncdispatch, asyncmysql type Reply = tuple[packet: ResultPacket, rows: seq[string]] const MysqlHost = "127.0.0.1" MysqlPort = Port(3306) MysqlUser = "mysql" MysqlPassword = "123456" MysqlDb = "mysql" MysqlConnLimit = 10 proc execMyTransaction(pool: AsyncMysqlPool): Future[void] = var retFuture = newFuture[void]("execMyTransaction") result = retFuture # a rollback wrapper proc execRollback(conn: AsyncMysqlConnection): Future[void] = var retFuture = newFuture[void]("execRollback") result = retFuture proc finishCb(err: ref Exception, replies: seq[Reply]) {.async.} = # do something ... if err == nil: retFuture.complete() else: retFuture.fail(err) conn.execQuery(sql("rollback"), finishCb) # a transaction wrapper proc execTransaction(conn: AsyncMysqlConnection): Future[void] = var retFuture = newFuture[void]("execTransaction") result = retFuture proc finishCb(err: ref Exception, replies: seq[Reply]) {.async.} = # do something ... if err == nil: # check whether the results are as expected if replies.len < 5 or replies[0].packet.kind == rpkError or replies[1].packet.kind == rpkError or replies[2].packet.kind == rpkError or replies[3].packet.kind == rpkError or replies[4].packet.kind == rpkError: try: # rollback await conn.execRollback() except: retFuture.fail(getCurrentException()) else: retFuture.complete() else: retFuture.fail(err) conn.execQuery(sql(""" start transaction; select val from sample where id = ?; update sample set val = 1 where id = ?; insert into sample (val) values (200),,,; commit; """, "1", "1"), finishCb) proc cb(conn: AsyncMysqlConnection, connIx: int) {.async.} = try: await conn.execTransaction() pool.release(connIx) # release connection to the pool retFuture.complete() except: pool.release(connIx) # release connection to the pool retFuture.fail(getCurrentException()) pool.request(cb) proc main() {.async.} = var pool = await openMysqlConnection(AF_INET, MysqlPort, MysqlHost, MysqlUser, MysqlPassword, MysqlDb, MysqlConnLimit) await pool.execMyTransaction()
Streaming large result sets field-by-field
import asyncdispatch, asyncmysql type Reply = tuple[packet: ResultPacket, rows: seq[string]] const MysqlHost = "127.0.0.1" MysqlPort = Port(3306) MysqlUser = "mysql" MysqlPassword = "123456" MysqlDb = "mysql" MysqlConnLimit = 10 # wrap to yourself Future proc proc execMyEchoQuery(pool: AsyncMysqlPool): Future[void] = var retFuture = newFuture[void]("execMyQuery") result = retFuture var queryLen = 2 var querySql = sql(""" select host, user from user where user = ?; select password from user; """, "root") var queryPos = 0 proc recvPacketCb(packet: ResultPacket) {.async.} = inc(queryPos) case queryPos of 1: echo ">>> select host, user from user where user = ?;" assert packet.kind == rpkResultSet of 2: echo ">>> select password from user;" assert packet.kind == rpkResultSet else: discard proc recvPacketEndCb() {.async.} = case queryPos of 1: write(stdout, "\n") of 2: write(stdout, "\n") else: discard proc recvFieldCb(field: string) {.async.} = case queryPos of 1: write(stdout, field, " ") of 2: write(stdout, field, " ") else: discard proc finishCb(err: ref Exception, replies: seq[Reply]) {.async.} = if err == nil: retFuture.complete(replies) else: retFuture.fail(err) pool.execQuery(querySql, finishCb, recvPacketCb, recvPacketEndCb, recvFieldCb) proc main() {.async.} = var pool = await openMysqlConnection(AF_INET, MysqlPort, MysqlHost, MysqlUser, MysqlPassword, MysqlDb, MysqlConnLimit) await pool.execMyEchoQuery() pool.close()