Skip to content

Connections and Transactions

Connections

Connections are the heart-piece of databasez. They are enabling rollback behavior, via transactions and provide all manipulation functions. Despite there are shortcut methods on the Database object, they are all using connections internally. When wanting for performance reasons one connection doing multiple jobs, retrieving a connection via connection() and perform all tasks within the connection context (connections are async contextmanagers) are the way to go.

Multithreading

sqlalchemy's async addon is also multi-threading capable but we cannot use this for two reasons:

  • Only the NullPool is supported which is afaik just no pooling.
  • No global connection which is reliable resetted would be possible.

Therefor databasez uses an other approach: isolation:

Event-loops are per thread. So in this approach for every new event-loop detected the database object is copied and assigned to it. This way all connections are running in a per-thread pool. The global connection is a bit different from the rest. Whenever it is accessed by an other thread, asyncio.run_coroutine_threadsafe is used. In the full-isolation mode (default) the global connection is even moved to an own thread.

Global connection with auto-rollback

Sometimes, especially for tests, you want a connection in which all changes are resetted when the database is closed. For having this reliable, a global connection is lazily initialized when requested on the database object via the force_rollback contextmanager/parameter/pseudo-attribute.

Whenever connection() is called, the same global connection is returned. It behaves nearly normally except you have just one connection available. This means you have to be careful when using iterate to not open another connection via connection (except you set force_rollback to False before opening the connection). Otherwise it will deadlock.

Example for force_rollback:

from databasez import Database

database = Database("sqlite:///foo.sqlite", force_rollback=True)


def test_foo():
    async with database:
        ...
        # do the tests
    # and now everything is rolled back

    async with database:
        ...
        # do the tests
    # and now everything is rolled back again

Transactions

Transactions are lazily initialized. You dont't need a connected database to create them via database.transaction().

When created, there are three ways to activate the Transaction

  1. The async context manager way via async with transaction: ...
  2. Entering a method decorated with a transaction.
  3. The manual way via await transaction

Whenever the transaction is activated, a connected database is required.

A second way to get a transaction is via the Connection. It has also a transaction() method.

The three ways to use a transaction

Via the async context manager protocol

from databasez import Database


async def main():
    async with Database("<URL>") as database:
        # do something
        async with database.transaction():
            ...

        # check something and then reset
        async with database.transaction(force_rollback=True):
            ...
It can also be acquired from a specific database connection:

from databasez import Database


async def main():
    async with Database("<URL>") as database, database.connection() as connection:
        # do something
        async with connection.transaction():
            ...

        # check something and then reset
        async with connection.transaction(force_rollback=True):
            ...

Via decorating

You can also use .transaction() as a function decorator on any async function:

from databasez import Database

database = Database("<URL>")


@database.transaction()
async def create_users(request):
    ...
    # do something


async def main():
    async with database:
        # now the transaction is activated
        await create_users()

Manually

For a lower-level transaction API:

from databasez import Database


async def main():
    async with Database("<URL>") as database, database.connection() as connection:
        # get the activated transaction
        transaction = await connection.transaction()

        try:
            # do something
            ...
        except Exception:
            await transaction.rollback()
        else:
            await transaction.commit()

Auto-rollback (force_rollback)

Transactions support an keyword parameter named force_rollback which default to False. When set to True at the end of the transaction a rollback is tried. This means all changes are undone.

This is a simpler variant of force_rollback of the database object.

from databasez import Database

database = Database("sqlite:///foo.sqlite")


def test_foo():
    async with database:
        await database.execute(...)

        async with database.transaction(force_rollback=True):
            # this is rolled back
            await database.execute(...)
            async with database.transaction(force_rollback=False):
                ...
                # this is saved

Isolation level

The state of a transaction is liked to the connection used in the currently executing async task. If you would like to influence an active transaction from another task, the connection must be shared:

Transaction isolation-level can be specified if the driver backend supports that:

import asyncio

from databasez import Database, core


async def add_excitement(connection: core.Connection, id: int):
    await connection.execute(
        "UPDATE notes SET text = CONCAT(text, '!!!') WHERE id = :id", {"id": id}
    )


async with Database("database_url") as database:
    async with database.transaction():
        # This note won't exist until the transaction closes...
        await database.execute("INSERT INTO notes(id, text) values (1, 'databases is cool')")
        # ...but child tasks can use this connection now!
        await asyncio.create_task(add_excitement(database.connection(), id=1))

    async with database.transaction(isolation_level="serializable"):
        await database.fetch_val("SELECT text FROM notes WHERE id=1")
        # ^ returns: "databases is cool!!!"

Nested transactions

Nested transactions are fully supported, and are implemented using database savepoints:

import contextlib

import databasez

async with databasez.Database("database_url") as db:
    async with db.transaction() as outer:
        # Do something in the outer transaction
        ...

        # Suppress to prevent influence on the outer transaction
        with contextlib.suppress(ValueError):
            async with db.transaction():
                # Do something in the inner transaction
                ...

                raise ValueError("Abort the inner transaction")

    # Observe the results of the outer transaction,
    # without effects from the inner transaction.
    await db.fetch_all("SELECT * FROM ...")