
Queries

Databasez supports queries using SQLAlchemy core as well as raw SQL. This means you can take advantage of the full power of the package while remaining free to write your own custom queries.

Declarations

To make queries using SQLAlchemy core, you need to declare your tables in your codebase using the proper syntax.

This is good practice in any case, as it makes it much easier to keep your database schema in sync with the code that accesses it. It also lets you use a database migration tool of your choice to manage schema changes.

import sqlalchemy

metadata = sqlalchemy.MetaData()

users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)

Declaring a database table with SQLAlchemy core is straightforward and should follow the best practices recommended by the SQLAlchemy author. You can create as many tables and columns as you wish.

Regarding the columns, you can use any of the SQLAlchemy column types such as sqlalchemy.JSON or a custom column type.

Another example, this time with a JSON column:

import sqlalchemy

metadata = sqlalchemy.MetaData()

users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
    sqlalchemy.Column("config", sqlalchemy.JSON(none_as_null=True)),
)
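The text above mentions custom column types without showing one. The following is a minimal sketch using SQLAlchemy's `TypeDecorator`; the `LowerCaseString` type and the `accounts` table are hypothetical names made up for illustration and are not part of Databasez or SQLAlchemy.

```python
import sqlalchemy
from sqlalchemy.types import TypeDecorator


class LowerCaseString(TypeDecorator):
    """Hypothetical custom type: normalises strings to lower case on write."""

    impl = sqlalchemy.String  # underlying column type
    cache_ok = True  # safe to cache: the type holds no mutable state

    def process_bind_param(self, value, dialect):
        # Called for values bound into a statement
        return value.lower() if value is not None else None

    def process_result_value(self, value, dialect):
        # Called for values read back; returned unchanged here
        return value


metadata = sqlalchemy.MetaData()

# Hypothetical table; the custom type is used like any built-in one
accounts = sqlalchemy.Table(
    "accounts",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("email", LowerCaseString(length=255)),
)
```

Whether the bind and result hooks run for a given query depends on how the statement is compiled and executed by the driver, so it is worth verifying the behaviour against your own backend.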

Creating tables

As a fork of Databases, Databasez likewise does not use SQLAlchemy's engine for database access internally. The usual SQLAlchemy core way of creating tables with create_all is therefore not available.

To circumvent this, you can use SQLAlchemy to compile the query to SQL and then execute it with Databasez.

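import sqlalchemy
from sqlalchemy.dialects import postgresql
from databasez import Database

database = Database("postgresql+asyncpg://localhost/example")

# Establish the connection pool
await database.connect()

metadata = sqlalchemy.MetaData()
# Import the dialect module explicitly; `import sqlalchemy` alone
# does not guarantee that `sqlalchemy.dialects.postgresql` is loaded
dialect = postgresql.dialect()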

# Define your table(s)
users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)

# Create tables
for table in metadata.tables.values():
    # Set `if_not_exists=False` if you want the query to throw an
    # exception when the table already exists
    schema = sqlalchemy.schema.CreateTable(table, if_not_exists=True)
    query = str(schema.compile(dialect=dialect))
    await database.execute(query=query)

# Close all connections in the connection pool
await database.disconnect()

Note

This way of creating tables is only useful for local testing and experimentation. For larger, production-grade projects, you should use Alembic or another proper migration tool.
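To see what the compile step produces before sending anything to a database, the DDL can be rendered as a plain string. This is a minimal sketch that needs no connection; it assumes SQLAlchemy 1.4 or later, where `CreateTable` accepts `if_not_exists`.

```python
import sqlalchemy
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

metadata = sqlalchemy.MetaData()

users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)

# Compile the CREATE TABLE construct against the PostgreSQL dialect
# and turn it into the SQL string that would be passed to execute()
ddl = str(CreateTable(users, if_not_exists=True).compile(dialect=postgresql.dialect()))

print(ddl)
```

Printing the statement like this is a convenient way to check the generated SQL for a given dialect before executing it.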

Queries

Since you can use SQLAlchemy core, you can also use its query expressions. Check out the official tutorial.

from databasez import Database

database = Database("postgresql+asyncpg://localhost/example")


# Establish the connection pool
await database.connect()

# Execute a single insert (`users` is the table declared earlier)
query = users.insert()
values = {"name": "databasez", "address": "London, United Kingdom"}
await database.execute(query=query, values=values)

# Execute many
query = users.insert()
values = [
    {"name": "databasez", "address": "London, United Kingdom"},
    {"name": "another name", "address": "The Hague, Netherlands"},
]
await database.execute_many(query=query, values=values)

# Fetch multiple rows
query = users.select()
rows = await database.fetch_all(query=query)

# Fetch single row
query = users.select()
row = await database.fetch_one(query=query)

# Fetch single value, defaults to `column=0`.
query = users.select()
value = await database.fetch_val(query=query)

# Fetch multiple rows without loading them all into memory at once
query = users.select()
async for row in database.iterate(query=query):
    ...

# Close all connections in the connection pool
await database.disconnect()

Connections are managed as task-local state, with driver implementations using connection pooling behind the scenes.
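The examples above select every row. As a minimal sketch of a filtered query, the expression below adds a `where` clause and an ordering, then compiles it locally to show the SQL and bound values without a database connection. The filter value is illustrative only.

```python
import sqlalchemy
from sqlalchemy.dialects import postgresql

metadata = sqlalchemy.MetaData()

users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)

# Build a filtered, ordered SELECT using SQLAlchemy core expressions
query = (
    users.select()
    .where(users.c.address == "London, United Kingdom")
    .order_by(users.c.name)
)

# Compile locally to inspect the SQL text and the bound parameters
compiled = query.compile(dialect=postgresql.dialect())
sql = str(compiled)
params = compiled.params

print(sql)
print(params)
```

The same `query` object can then be passed to `database.fetch_all(query=query)` or `database.iterate(query=query)` in the same way as the unfiltered selects above.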

Raw Queries

from databasez import Database

database = Database("postgresql+asyncpg://localhost/example")


# Establish the connection pool
await database.connect()

# Execute
query = "INSERT INTO users(name, address) VALUES (:name, :address)"
values = {"name": "databasez", "address": "London, United Kingdom"}
await database.execute(query=query, values=values)

# Execute many
query = "INSERT INTO users(name, address) VALUES (:name, :address)"
values = [
    {"name": "databasez", "address": "London, United Kingdom"},
    {"name": "another name", "address": "The Hague, Netherlands"},
]
await database.execute_many(query=query, values=values)

# Fetch multiple rows
query = "SELECT * FROM users WHERE address = :address"
rows = await database.fetch_all(query=query, values={"address": "London, United Kingdom"})

# Fetch single row
query = "SELECT * FROM users WHERE id = :id"
result = await database.fetch_one(query=query, values={"id": 1})

Tip

The query arguments should follow the :query_arg style.
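Every `:query_arg` placeholder in a raw query needs a matching key in `values`, and a mismatched key is an easy mistake to make. The sketch below is a hypothetical pre-flight helper, `missing_params`, that is not part of Databasez. It uses a simple regular expression, so it does not understand SQL string literals or comments and should be treated as a debugging aid only.

```python
import re

# Matches `:name` style placeholders. The lookbehind skips
# PostgreSQL casts such as `id::text`.
_PLACEHOLDER = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


def missing_params(query, values):
    """Return the sorted placeholder names in `query` that have no key in `values`."""
    return sorted(set(_PLACEHOLDER.findall(query)) - set(values))


query = "INSERT INTO users(name, address) VALUES (:name, :address)"

# The key "text" does not match the `:name` placeholder
bad_values = {"text": "databasez", "address": "London, United Kingdom"}
good_values = {"name": "databasez", "address": "London, United Kingdom"}

print(missing_params(query, bad_values))   # ['name']
print(missing_params(query, good_values))  # []
```

Running such a check before `database.execute(query=query, values=values)` can surface a misspelled key earlier than a driver error would.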

Databasez offers the same level of flexibility as its ancestor and adds its own extra features on top.