
SQLAlchemy

SQLAlchemy connects to DB9 using the psycopg (psycopg3) driver over pgwire. The modern SQLAlchemy 2.0 API with mapped_column, type-annotated models, and session-based transactions works without modification.

DB9’s E2E test suite validates CRUD, JSONB, joins, subqueries, CTEs, window functions, DDL, transactions, savepoints, and vector operations through SQLAlchemy.

You'll need:

  • A DB9 database (create one)
  • Python 3.10+
  • SQLAlchemy 2.0+
  • psycopg 3.1+ (with binary support)
Create a database:

Terminal
db9 create --name sqlalchemy-app

Get the connection string:

Terminal
db9 db status sqlalchemy-app
Set up a project and install the dependencies:

Terminal
mkdir sqlalchemy-db9 && cd sqlalchemy-db9
python -m venv .venv && source .venv/bin/activate
pip install "sqlalchemy>=2.0.0" "psycopg[binary]>=3.1.0"

For vector search, also install:

Terminal
pip install "pgvector>=0.2.5"

SQLAlchemy requires the postgresql+psycopg:// prefix. Normalize the connection string from DB9:

db.py
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker

def build_engine(database_url: str) -> Engine:
    # DB9 hands out postgresql:// URLs; SQLAlchemy needs the psycopg driver prefix.
    url = database_url.strip()
    if url.startswith("postgresql://"):
        url = "postgresql+psycopg://" + url[len("postgresql://"):]
    elif url.startswith("postgres://"):
        url = "postgresql+psycopg://" + url[len("postgres://"):]
    return create_engine(url, pool_pre_ping=True)

engine = build_engine(
    "postgresql://sqlalchemy-app.admin:YOUR_PASSWORD@pg.db9.io:5433/postgres"
)
SessionFactory = sessionmaker(bind=engine, expire_on_commit=False)

Key engine options:

  • pool_pre_ping=True verifies each connection is still alive before it is checked out of the pool.
  • expire_on_commit=False keeps loaded attributes accessible after commit without triggering a refresh query.

Use SQLAlchemy 2.0 declarative style with mapped_column:

models.py
from datetime import datetime
from sqlalchemy import Integer, String, Text, DateTime, ForeignKey, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    # "metadata" is reserved on declarative classes, so the attribute is metadata_
    # while the column itself is still named "metadata".
    metadata_: Mapped[dict] = mapped_column("metadata", JSONB, default=dict)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author", cascade="all, delete-orphan"
    )

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    content: Mapped[str | None] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"))
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    author: Mapped["User"] = relationship(back_populates="posts")

Use Base.metadata.create_all to create tables from your model definitions:

setup.py
from db import engine
from models import Base
Base.metadata.create_all(engine)
print("Tables created")

For production, manage schema changes with raw SQL rather than relying on Alembic’s introspection, which may not fully work with DB9’s information_schema support:

Python
from sqlalchemy import text
from db import engine

with engine.begin() as conn:
    conn.execute(text("ALTER TABLE users ADD COLUMN IF NOT EXISTS bio TEXT"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_users_email ON users (email)"))
Basic CRUD operations through the session API:

Python
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from db import SessionFactory
from models import User, Post

# Create
with SessionFactory() as session:
    user = User(email="alice@example.com", name="Alice", metadata_={"role": "admin"})
    session.add(user)
    session.commit()
    post = Post(title="Getting Started", content="Hello DB9", author_id=user.id)
    session.add(post)
    session.commit()

# Read
with SessionFactory() as session:
    user = session.get(User, 1)
    print(user.name)  # "Alice"
    users = session.execute(
        select(User).where(User.name == "Alice")
    ).scalars().all()

# Update
with SessionFactory() as session:
    user = session.get(User, 1)
    user.name = "Alice Updated"
    session.commit()

# Upsert (INSERT ... ON CONFLICT)
with SessionFactory() as session:
    stmt = insert(User).values(email="alice@example.com", name="Alice V2")
    stmt = stmt.on_conflict_do_update(
        index_elements=["email"],
        set_={"name": stmt.excluded.name},
    )
    session.execute(stmt)
    session.commit()

# Delete
with SessionFactory() as session:
    user = session.get(User, 1)
    session.delete(user)  # cascades to posts
    session.commit()

SQLAlchemy’s PostgreSQL JSONB support works with DB9:

Python
from sqlalchemy import select, text
from db import SessionFactory, engine
from models import User

with SessionFactory() as session:
    # Containment query (@>)
    admins = session.execute(
        select(User).where(User.metadata_.contains({"role": "admin"}))
    ).scalars().all()

# Raw SQL for advanced JSONB operations
with engine.begin() as conn:
    conn.execute(text(
        "UPDATE users SET metadata = jsonb_set(metadata, '{level}', '5'::jsonb) WHERE id = 1"
    ))
Transactions support explicit commit, rollback, and savepoints:

Python
from sqlalchemy import text
from db import SessionFactory, engine
from models import User

# Explicit commit
with SessionFactory() as session:
    session.add(User(email="bob@example.com", name="Bob"))
    session.flush()   # sends to DB but doesn't commit
    session.commit()  # commits the transaction

# Rollback
with SessionFactory() as session:
    session.add(User(email="temp@example.com", name="Temp"))
    session.flush()
    session.rollback()  # user was not persisted

# Savepoints
with engine.begin() as conn:
    conn.execute(text("SAVEPOINT sp1"))
    conn.execute(text("INSERT INTO users (email, name) VALUES ('x@x.com', 'X')"))
    conn.execute(text("ROLLBACK TO SAVEPOINT sp1"))
    conn.execute(text("RELEASE SAVEPOINT sp1"))

DB9 supports the READ COMMITTED and REPEATABLE READ isolation levels; requesting SERIALIZABLE is accepted but runs as REPEATABLE READ.

Python
from sqlalchemy import text
from db import engine

with engine.begin() as conn:
    conn.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))
    # ... queries here run at snapshot isolation
Joins, aggregations, and subqueries work through the standard query API:

Python
from sqlalchemy import select, func
from db import SessionFactory
from models import User, Post

with SessionFactory() as session:
    # Join with aggregation
    results = session.execute(
        select(User.name, func.count(Post.id).label("post_count"))
        .join(Post, User.id == Post.author_id)
        .group_by(User.name)
        .having(func.count(Post.id) >= 1)
        .order_by(User.name)
    ).all()

    # Subquery
    subq = (
        select(Post.author_id, func.count().label("cnt"))
        .group_by(Post.author_id)
        .subquery()
    )
    top_authors = session.execute(
        select(User.name)
        .join(subq, User.id == subq.c.author_id)
        .where(subq.c.cnt >= 2)
    ).scalars().all()

Use raw SQL for window functions and CTEs:

Python
from sqlalchemy import text
from db import engine

with engine.begin() as conn:
    # Window function
    rows = conn.execute(text("""
        SELECT name, email,
               ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
        FROM users
    """)).fetchall()

    # CTE
    rows = conn.execute(text("""
        WITH recent AS (
            SELECT * FROM posts WHERE created_at > now() - interval '7 days'
        )
        SELECT u.name, r.title
        FROM users u JOIN recent r ON u.id = r.author_id
    """)).fetchall()

Install pgvector for SQLAlchemy vector type support:

vector_models.py
from pgvector.sqlalchemy import Vector
from sqlalchemy import Integer, Text
from sqlalchemy.orm import Mapped, mapped_column
from models import Base

EMBEDDING_DIM = 1024

class Document(Base):
    __tablename__ = "documents"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    embedding: Mapped[list[float]] = mapped_column(Vector(EMBEDDING_DIM), nullable=False)

Create the table and HNSW index:

Python
from sqlalchemy import text
from db import engine
from vector_models import Base

with engine.begin() as conn:
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
    Base.metadata.create_all(conn)
    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS idx_documents_embedding
        ON documents USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
    """))

Search by cosine distance:

Python
from sqlalchemy import select
from db import SessionFactory
from vector_models import Document

def search(session, query_embedding: list[float], top_k: int = 5):
    distance = Document.embedding.cosine_distance(query_embedding)
    stmt = (
        select(Document, distance.label("distance"))
        .order_by(distance)
        .limit(top_k)
    )
    return session.execute(stmt).all()

with SessionFactory() as session:
    results = search(session, query_embedding=[0.1, 0.2, ...])
    for doc, dist in results:
        print(f"{doc.content} (distance: {dist:.4f})")

DB9 supports HNSW indexes with vector_cosine_ops, vector_l2_ops, and vector_ip_ops. IVFFlat is not supported.
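The opclass chosen at index-creation time determines which query operator the index accelerates. A minimal sketch of that mapping (the `knn_sql` helper is hypothetical, written here for illustration; the `documents`/`embedding` names mirror the examples above):

```python
# Map each HNSW opclass DB9 supports to the pgvector operator a query
# must use in ORDER BY for the index to apply.
OPERATORS = {
    "vector_l2_ops": "<->",      # Euclidean (L2) distance
    "vector_cosine_ops": "<=>",  # cosine distance
    "vector_ip_ops": "<#>",      # negative inner product
}

def knn_sql(opclass: str, top_k: int = 5) -> str:
    """Build a parameterized nearest-neighbor query for the given opclass."""
    op = OPERATORS[opclass]
    return (
        f"SELECT id, content, embedding {op} :query AS distance "
        f"FROM documents ORDER BY embedding {op} :query LIMIT {top_k}"
    )
```

pgvector's SQLAlchemy comparators (`l2_distance`, `cosine_distance`, `max_inner_product`) emit these same operators, so ORM queries pick up the matching index automatically.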

DB9 supports PostgreSQL full-text search with to_tsvector, websearch_to_tsquery, and ts_rank:

Python
from sqlalchemy import text
from db import SessionFactory

with SessionFactory() as session:
    results = session.execute(text("""
        SELECT content,
               ts_rank(to_tsvector('english', content),
                       websearch_to_tsquery('english', :query)) AS rank
        FROM documents
        WHERE to_tsvector('english', content) @@ websearch_to_tsquery('english', :query)
        ORDER BY rank DESC
        LIMIT :top_k
    """), {"query": "database serverless", "top_k": 10}).mappings().all()

For GIN indexes on text columns, DB9 accepts the syntax but falls back to a full table scan at query time. Performance is comparable for small-to-medium datasets.

  • Driver: Use psycopg (psycopg3), not psycopg2. The connection string prefix must be postgresql+psycopg://.
  • Connection pooling: SQLAlchemy’s built-in pool works with DB9. Use pool_pre_ping=True to handle idle connection drops.
  • Alembic migrations: Alembic’s autogenerate relies on information_schema introspection that may not fully work with DB9. Use explicit raw SQL for schema changes instead.
  • hstore: Pass use_native_hstore=False to create_engine if you use hstore columns. This keeps hstore values as strings rather than relying on psycopg’s automatic conversion.
  • Connection string: Use the format postgresql+psycopg://sqlalchemy-app.admin:{password}@pg.db9.io:5433/postgres.
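Several of these pitfalls can be caught before connecting. A quick sanity check based only on the URL conventions listed above (`check_db9_url` is a hypothetical helper, not part of DB9's tooling):

```python
from urllib.parse import urlsplit

def check_db9_url(url: str) -> list[str]:
    """Return a list of likely problems with a DB9 SQLAlchemy URL."""
    problems = []
    if not url.startswith("postgresql+psycopg://"):
        problems.append("prefix should be postgresql+psycopg:// (psycopg3 driver)")
    parts = urlsplit(url)
    if parts.username and not parts.username.endswith(".admin"):
        problems.append("username should be {database-name}.admin")
    return problems

# A well-formed URL produces no warnings:
assert check_db9_url(
    "postgresql+psycopg://sqlalchemy-app.admin:pw@pg.db9.io:5433/postgres"
) == []
```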

OperationalError: FATAL: authentication failed


Verify the tenant ID in the username. DB9 expects {database-name}.admin as the username (e.g., sqlalchemy-app.admin).

DB9’s SQLAlchemy integration uses psycopg (v3), not psycopg2. Install psycopg[binary] and use the postgresql+psycopg:// prefix, not postgresql+psycopg2://.

DB9 includes pgvector natively. The extension may already be available. Wrap the call in a try/except or use CREATE EXTENSION IF NOT EXISTS vector.

Alembic autogenerate produces empty migrations


DB9’s information_schema coverage is limited. Write migration SQL manually and use Alembic’s op.execute() to run it, or manage migrations as numbered SQL files.
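For the numbered-SQL-files approach, a minimal sketch (the file layout and helper names are illustrative; a real setup would also record which migrations have already been applied so reruns skip them):

```python
from pathlib import Path

def ordered_migrations(paths: list[str]) -> list[str]:
    """Sort migration files by numeric prefix: 001_init.sql, 002_add_bio.sql, ..."""
    return sorted(paths, key=lambda p: int(Path(p).name.split("_", 1)[0]))

def apply_migrations(engine, migrations_dir: str = "migrations") -> None:
    """Run every .sql file in numeric order, each in its own transaction."""
    from sqlalchemy import text  # deferred so the ordering helper has no dependencies
    paths = [str(p) for p in Path(migrations_dir).glob("*.sql")]
    for path in ordered_migrations(paths):
        with engine.begin() as conn:
            conn.execute(text(Path(path).read_text()))
```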

Tested with SQLAlchemy 2.0+ and psycopg 3.1+ against DB9. E2E smoke tests cover:

Category                           Status
Connection and pooling             Pass
CRUD operations                    Pass
JSONB containment queries          Pass
Transactions and savepoints        Pass
Joins and aggregations             Pass
Subqueries and CTEs                Pass
Window functions                   Pass
DDL (ALTER TABLE, CREATE INDEX)    Pass
Vector operations and HNSW         Pass
Upsert (ON CONFLICT)               Pass