Module pool

This module implements a connection pool. The pool is efficient: it assigns connections automatically, and when no connection is available it queues requests until a connection becomes available.

Types

AsyncMysqlPool = ref object
  conns: seq[tuple[conn: AsyncMysqlConnection, flags: set[ConnectionFlag]]]
  connFutures: Deque[Future[int]]
  config: Config
The connection pool.   Source Edit
RequestCb = proc (conn: AsyncMysqlConnection; connIx: int): Future[void] {.
closure, gcsafe
.}
  Source Edit

Procs

proc openMysqlPool(domain: Domain = AF_INET; port = Port(3306); host = "127.0.0.1";
                  user: string; password: string; database: string;
                  charset = DefaultClientCharset;
                  capabilities = DefaultClientCapabilities; connectionLimit = 10): Future[
    AsyncMysqlPool] {.
raises: [FutureError], tags: [RootEffect]
.}

Creates a new connection pool.

You can set the following options:

  • domain - the protocol family of the underlying socket for this connection.
  • port - the port number to connect to.
  • host - the hostname of the database you are connecting to.
  • user - the MySQL user to authenticate as.
  • password - the password of the MySQL user.
  • database - name of the database to use for this connection.
  • charset - the charset for the connection. (Default: DefaultClientCharset). All available charset constants are in a sub-module called charset.
  • capabilities - the client capabilities, which is a flag bitmask. (Default: DefaultClientCapabilities). This can be used to affect the connection's behavior. All available capability constants are in a sub-module called capabilities.
  • connectionLimit - The maximum number of connections to create at once.

The pool accepts all the same options as a connection. When creating a new connection, the options are simply passed to the connection.

  Source Edit
proc close(p: AsyncMysqlPool) {.
raises: [SslError, OSError, Exception], tags: [RootEffect]
.}
Closes the pool p, all the connections in the pool will be closed immediately.   Source Edit
proc countAvailableConnections(p: AsyncMysqlPool): int {.
raises: [], tags: []
.}
Returns count of the current available connections.   Source Edit
proc release(p: AsyncMysqlPool; connIx: int) {.
raises: [IndexError, Exception, FutureError], tags: [RootEffect]
.}
Signals that the connection is returned to the pool, ready to be used again by someone else. connIx is the connection ID in the pool.   Source Edit
proc request(p: AsyncMysqlPool; cb: RequestCb) {.
raises: [Exception, FutureError], tags: [RootEffect]
.}

Requests pool p to assign a connection.

When an available connection is obtained, the connection is passed to the callback proc, and the callback cb is called.

Notes: don't close the connection in cb. When cb is finished, the connection is automatically released to the pool inside, and other requests can reuse this connection.

This proc is particularly useful when handling transactions. If there is no error in the previous queries, commit is executed; otherwise rollback must be executed to protect the data.

proc execMyRequest(pool: AsyncMysqlPool): Future[void] =
  ## Runs a multi-statement transaction on a connection requested from `pool`.
  ##
  ## The returned future completes when every statement succeeded. It fails
  ## when the query could not be executed, when any statement was answered
  ## with an error packet (a rollback is issued first), or when the rollback
  ## itself fails.
  var retFuture = newFuture[void]("myRequest")
  result = retFuture

  proc execRollback(conn: AsyncMysqlConnection): Future[void] =
    ## Issues `rollback` on `conn`; the future mirrors the query outcome.
    var retFuture = newFuture[void]("execRollback")
    result = retFuture

    proc finishCb(
      err: ref Exception,
      replies: seq[tuple[packet: ResultPacket, rows: seq[string]]]
    ) {.async.} =
      # do something if needed ...
      if err == nil:
        retFuture.complete()
      else:
        retFuture.fail(err)

    conn.execQuery(sql("rollback"), finishCb)

  proc execTransaction(conn: AsyncMysqlConnection): Future[void] =
    ## Sends the five statements below in one query and settles the returned
    ## future on every path, so the caller can never be left waiting.
    var retFuture = newFuture[void]("execTransaction")
    result = retFuture

    proc finishCb(
      err: ref Exception,
      replies: seq[tuple[packet: ResultPacket, rows: seq[string]]]
    ) {.async.} =
      # do something
      if err == nil:
        # Five statements were sent, so five non-error replies are expected.
        var failed = replies.len < 5
        if not failed:
          for i in 0 ..< 5:
            if replies[i].packet.kind == rpkError:
              failed = true
              break
        if failed:
          var rollbackErr: ref Exception = nil
          try:
            await conn.execRollback()
          except:
            rollbackErr = getCurrentException()
          if rollbackErr == nil:
            # The rollback succeeded, but the transaction itself did not:
            # report that instead of leaving the future pending forever.
            retFuture.fail(newException(ValueError,
              "transaction failed and was rolled back"))
          else:
            retFuture.fail(rollbackErr)
        else:
          retFuture.complete()
      else:
        retFuture.fail(err)

    conn.execQuery(sql("""
      start transaction;
      select val from sample where id = ?;
      update sample set val = 1 where id = ?;
      insert into sample (val) values (200),,,;
      commit;
      """, "1", "1"), finishCb)

  proc cb(conn: AsyncMysqlConnection, connIx: int) {.async.} =
    # Release the connection before settling the outer future, on both the
    # success path and the failure path.
    try:
      await conn.execTransaction()
      pool.release(connIx)
      retFuture.complete()
    except:
      pool.release(connIx)
      retFuture.fail(getCurrentException())

  pool.request(cb)

proc main() {.async.} =
  # `pool` is assumed to be an AsyncMysqlPool created elsewhere
  # (e.g. with openMysqlPool); it is not defined in this example.
  await pool.execMyRequest()
  Source Edit
proc execQuery(p: AsyncMysqlPool; q: SqlQuery; finishCb: proc (err: ref Exception): Future[
    void] {.
closure, gcsafe
.}; recvPacketCb: proc (packet: ResultPacket): Future[void] {.
closure, gcsafe
.} = nil; recvPacketEndCb: proc (): Future[void] {.
closure, gcsafe
.} = nil; recvFieldCb: proc (field: string): Future[void] {.
closure, gcsafe
.} = nil) {.
raises: [Exception, FutureError, IndexError, ValueError], tags: [RootEffect]
.}

Executes the SQL statements in q.

This proc is especially useful when dealing with large result sets. The query process is made up of many different stages. At each stage, a different callback proc is called:

  • recvPacketCb - called when a SQL statement is beginning.
  • recvFieldCb - called when a complete field is made.
  • recvPacketEndCb - called when a SQL statement is finished.
  • finishCb - called when all SQL statements are finished or an error occurs.

For example, when the following statements are executed:

select host from user;
select id from test;
insert into test (name) values ('name1') where id = 1;

the query process is like this:

1. packet stage

receives a result packet from select host from user;, calls recvPacketCb, and the kind field of the argument packet is set to rpkResultSet.

2. field stage

receives a field(column) from select host from user;, calls recvFieldCb. Then, receives next field(column), calls recvFieldCb again.

recvFieldCb is called again and again until there are no more fields from select host from user;.

3. packet stage

receives a result packet from select id from test;, calls recvPacketCb, and the kind field of the argument packet is set to rpkResultSet.

4. field stage

receives a field(column) from select id from test;, calls recvFieldCb. Then, receives next field(column), calls recvFieldCb again.

recvFieldCb is called again and again until there are no more fields from select id from test;.

5. packet stage

receives a result packet from insert into test (name) values ('name1') where id = 1;, calls recvPacketCb, and the kind field of the argument packet is set to rpkOk.

6. finished stage

all SQL statements are finished, calls finishCb.

Notes: if any errors occur in the above steps, calls finishCb immediately and ignores other callback procs.

  Source Edit
proc execQuery(p: AsyncMysqlPool; q: SqlQuery; bufferSize: int; finishCb: proc (
    err: ref Exception): Future[void] {.
closure, gcsafe
.}; recvPacketCb: proc ( packet: ResultPacket): Future[void] {.
closure, gcsafe
.} = nil; recvPacketEndCb: proc (): Future[void] {.
closure, gcsafe
.} = nil; recvFieldCb: proc (buffer: string): Future[void] {.
closure, gcsafe
.} = nil; recvFieldEndCb: proc (): Future[void] {.
closure, gcsafe
.} = nil) {.
raises: [Exception, FutureError, IndexError, ValueError], tags: [RootEffect]
.}

Executes the SQL statements in q. bufferSize specifies the size of field buffer.

This proc is especially useful when dealing with large result sets. The query process is made up of many different stages. At each stage, a different callback proc is called:

  • recvPacketCb - called when a SQL statement is beginning.
  • recvFieldCb - called when the content of a field completely fills the internal buffer.
  • recvFieldEndCb - called when a complete field is made.
  • recvPacketEndCb - called when a SQL statement is finished.
  • finishCb - called when all SQL statements are finished or an error occurs.
  Source Edit
proc execQuery(p: AsyncMysqlPool; q: SqlQuery; finishCb: proc (err: ref Exception;
    replies: seq[tuple[packet: ResultPacket, rows: seq[string]]]): Future[void] {.
closure, gcsafe
.}) {.
raises: [Exception, FutureError, IndexError, ValueError], tags: [RootEffect]
.}

Executes the SQL statements in q.

This proc places all the results in memory. When dealing with large result sets, this can be inefficient and take up a lot of memory, so you can try the other two execQuery procs at this point.

  • finishCb - called when all SQL statements are finished or an error occurs.
  Source Edit
proc execInitDb(p: AsyncMysqlPool; database: string; finishCb: proc (
    err: ref Exception; reply: ResultPacket): Future[void] {.
closure, gcsafe
.}) {.
raises: [Exception, FutureError, IndexError, ValueError], tags: [RootEffect]
.}
Changes the default database on the connection. Equivalent to use <database>;
  • finishCb - called when this task is finished or an error occurs.
  Source Edit
proc execChangeUser(p: AsyncMysqlPool; user: string; password: string;
                   database: string; charset = DefaultClientCharset; finishCb: proc (
    err: ref Exception; reply: ResultPacket): Future[void] {.
closure, gcsafe
.}) {.
raises: [Exception, FutureError, IndexError, ValueError], tags: [RootEffect]
.}

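Changes the user and causes the database specified by database to become the default (current) database on the connection.

The new user is authenticated with password; charset is the charset for the connection. (Default: DefaultClientCharset).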

  • finishCb - called when this task is finished or an error occurs.
  Source Edit