Queries¶
Databasez supports queries using SQLAlchemy core as well as raw SQL. This means you can take advantage of the full power of the package while remaining free to write your own custom queries.
Declarations¶
To be able to make queries using SQLAlchemy core, you will need to create/declare your tables in your codebase using the proper syntax.
It is good practice in any case, as this makes it a lot easier to keep your database schema in sync with the code that is accessing it. It also allows you to use a database migration tool of your choice to manage any schema changes.
import sqlalchemy

metadata = sqlalchemy.MetaData()

user = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)
Declaring a database table using SQLAlchemy core is very simple and should follow the best practices recommended by the SQLAlchemy authors. You can create as many tables and columns as you wish.
Regarding the columns, you can use any of the SQLAlchemy column types, such as sqlalchemy.JSON, or a custom column type (a sketch of a custom type follows the next example).
Another example could be:
import sqlalchemy

metadata = sqlalchemy.MetaData()

user = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)
Creating tables¶
Like Databases, from which it was forked, Databasez does not use SQLAlchemy's engine for database access internally. The usual SQLAlchemy core way of creating tables with create_all is therefore not available.
To circumvent this, you can use SQLAlchemy to compile the query to SQL and then execute it with Databasez.
import sqlalchemy
from sqlalchemy.dialects import postgresql
from databasez import Database

database = Database("postgresql+asyncpg://localhost/example")

# Establish the connection pool
await database.connect()

metadata = sqlalchemy.MetaData()
dialect = postgresql.dialect()

# Define your table(s)
users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)

# Create tables (manually)
for table in metadata.tables.values():
    # Set `if_not_exists=False` if you want the query to throw an
    # exception when the table already exists
    schema = sqlalchemy.schema.CreateTable(table, if_not_exists=True)
    query = str(schema.compile(dialect=dialect))
    await database.execute(query=query)

# Alternatively, create tables automatically
await database.create_all(metadata)

# Close all connections in the connection pool
await database.disconnect()
Note
Note that this way of creating tables is only useful for local testing and experimentation. For serious projects, you should use a proper migration tool such as Alembic.
Queries¶
Since you can use SQLAlchemy core, you can also use its query constructs. Check out the official tutorial.
import sqlalchemy
from databasez import Database

database = Database("postgresql+asyncpg://localhost/example")

# Establish the connection pool
await database.connect()

metadata = sqlalchemy.MetaData()

# Define your table(s)
users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(length=150)),
    sqlalchemy.Column("address", sqlalchemy.String(length=500)),
)

# Execute
query = users.insert()
values = {"name": "databasez", "address": "London, United Kingdom"}
await database.execute(query=query, values=values)

# Execute many
query = users.insert()
values = [
    {"name": "databasez", "address": "London, United Kingdom"},
    {"name": "another name", "address": "The Hague, Netherlands"},
]
await database.execute_many(query=query, values=values)

# Fetch multiple rows
query = users.select()
rows = await database.fetch_all(query=query)

# Fetch single row
query = users.select()
row = await database.fetch_one(query=query)

# Fetch single value, defaults to `column=0`.
query = users.select()
value = await database.fetch_val(query=query)

# Fetch multiple rows without loading them all into memory at once
query = users.select()
async for row in database.iterate(query=query):
    ...

# Fetch multiple rows in batches
query = users.select()
async for batch_tuple in database.batched_iterate(query=query, batch_size=10):
    ...

# Fetch multiple rows in batches and wrap each batch in a list
query = users.select()
async for batch_list in database.batched_iterate(query=query, batch_size=10, batch_wrapper=list):
    ...

# Close all connections in the connection pool
await database.disconnect()
Connections are managed as task-local state, with the driver implementations using connection pooling behind the scenes.
Because the state is task-local, you need a new task to get a new connection. This is important when querying during iterations: wrap the nested query in a new task, e.g. via asyncio.create_task, as in the sketch below.
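The following is a minimal sketch, assuming an already connected database, the users table from the examples above, and attribute-style row access (row.id); the fetch_user helper is illustrative, not part of Databasez:

import asyncio


async def fetch_user(database, users, user_id):
    # Runs in its own task and therefore gets its own task-local connection.
    query = users.select().where(users.c.id == user_id)
    return await database.fetch_one(query=query)


async def main(database, users):
    async for row in database.iterate(query=users.select()):
        # Wrap the nested query in a new task; the current task's connection
        # is still busy serving the ongoing iteration.
        user = await asyncio.create_task(fetch_user(database, users, row.id))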
Raw Queries¶
from databasez import Database

database = Database("postgresql+asyncpg://localhost/example")

# Establish the connection pool
await database.connect()

# Execute
query = "INSERT INTO users(name, address) VALUES (:name, :address)"
values = {"name": "databasez", "address": "London, United Kingdom"}
await database.execute(query=query, values=values)

# Execute many
query = "INSERT INTO users(name, address) VALUES (:name, :address)"
values = [
    {"name": "databasez", "address": "London, United Kingdom"},
    {"name": "another name", "address": "The Hague, Netherlands"},
]
await database.execute_many(query=query, values=values)

# Fetch multiple rows
query = "SELECT * FROM users WHERE address = :address"
rows = await database.fetch_all(query=query, values={"address": "London, United Kingdom"})

# Fetch single row
query = "SELECT * FROM users WHERE id = :id"
result = await database.fetch_one(query=query, values={"id": 1})
Tip
The query arguments should follow the :query_arg style.
Databasez allows the same level of flexibility as its ancestor and adds its own extra flavours on top.
Timeouts¶
Most query methods, as well as the connection() method of the database object, support a timeout parameter. It can be used to limit the time spent on an operation.
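For example, a sketch reusing the database and users objects from the examples above, and assuming the timeout is given in seconds:

# Limit a single query
rows = await database.fetch_all(query=users.select(), timeout=5)

# Limit work done on a dedicated connection
async with database.connection(timeout=5) as connection:
    row = await connection.fetch_one(query=users.select())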
Note: There is a stronger setting which should only be used for debugging (multithreading):
databasez.utils.DATABASEZ_RESULT_TIMEOUT
It is a stronger timeout that applies everywhere arun_coroutine_threadsafe is used with a different loop (most methods).