# SQLAlchemy
SQLAlchemy connects to DB9 using the psycopg (psycopg3) driver over pgwire. The modern SQLAlchemy 2.0 API with mapped_column, type-annotated models, and session-based transactions works without modification.
DB9’s E2E test suite validates CRUD, JSONB, joins, subqueries, CTEs, window functions, DDL, transactions, savepoints, and vector operations through SQLAlchemy.
## Prerequisites

- A DB9 database (create one)
- Python 3.10+
- SQLAlchemy 2.0+
- psycopg 3.1+ (with binary support)
## Create a DB9 Database

```sh
db9 create --name sqlalchemy-app
```

Get the connection string:
```sh
db9 db status sqlalchemy-app
```

## Project Setup

```sh
mkdir sqlalchemy-db9 && cd sqlalchemy-db9
python -m venv .venv && source .venv/bin/activate
pip install "sqlalchemy>=2.0.0" "psycopg[binary]>=3.1.0"
```

For vector search, also install:
```sh
pip install "pgvector>=0.2.5"
```

## Connection

SQLAlchemy requires the `postgresql+psycopg://` prefix. Normalize the connection string from DB9:
```python
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker

def build_engine(database_url: str) -> Engine:
    url = database_url.strip()
    if url.startswith("postgresql://"):
        url = "postgresql+psycopg://" + url[len("postgresql://"):]
    elif url.startswith("postgres://"):
        url = "postgresql+psycopg://" + url[len("postgres://"):]
    return create_engine(url, pool_pre_ping=True)

engine = build_engine(
    "postgresql://sqlalchemy-app.admin:YOUR_PASSWORD@pg.db9.io:5433/postgres"
)
SessionFactory = sessionmaker(bind=engine, expire_on_commit=False)
```

Key engine options:

- `pool_pre_ping=True` verifies the connection before each use.
- `expire_on_commit=False` keeps loaded attributes accessible after commit.
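The URL rewrite can be sanity-checked without opening a connection, since SQLAlchemy parses URLs lazily. A minimal sketch — the helper name `normalize_db9_url` is ours, not a DB9 API:

```python
from sqlalchemy.engine import make_url

def normalize_db9_url(database_url: str) -> str:
    # DB9 hands back a plain postgresql:// URL; SQLAlchemy needs the
    # +psycopg dialect marker to select the psycopg3 driver
    url = database_url.strip()
    for prefix in ("postgresql://", "postgres://"):
        if url.startswith(prefix):
            return "postgresql+psycopg://" + url[len(prefix):]
    return url

url = make_url(normalize_db9_url(
    "postgresql://sqlalchemy-app.admin:YOUR_PASSWORD@pg.db9.io:5433/postgres"
))
print(url.drivername, url.host, url.database)
```

`make_url` validates the string and exposes its parts, which is handy for asserting the tenant-qualified username before the first real connection attempt.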
## Define Models

Use the SQLAlchemy 2.0 declarative style with `mapped_column`:

```python
from datetime import datetime

from sqlalchemy import Integer, String, Text, DateTime, ForeignKey, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    metadata_: Mapped[dict] = mapped_column("metadata", JSONB, default=dict)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author", cascade="all, delete-orphan"
    )

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    content: Mapped[str | None] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"))
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    author: Mapped["User"] = relationship(back_populates="posts")
```

## Create Tables

Use `Base.metadata.create_all` to create tables from your model definitions:
```python
from db import engine
from models import Base

Base.metadata.create_all(engine)
print("Tables created")
```

For production, manage schema changes with raw SQL rather than relying on Alembic's introspection, which may not fully work with DB9's information_schema support:

```python
from sqlalchemy import text

with engine.begin() as conn:
    conn.execute(text("ALTER TABLE users ADD COLUMN IF NOT EXISTS bio TEXT"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_users_email ON users (email)"))
```
## CRUD Operations

```python
from sqlalchemy import select, text
from db import SessionFactory
from models import User, Post

# Create
with SessionFactory() as session:
    user = User(email="alice@example.com", name="Alice", metadata_={"role": "admin"})
    session.add(user)
    session.commit()

    post = Post(title="Getting Started", content="Hello DB9", author_id=user.id)
    session.add(post)
    session.commit()

# Read
with SessionFactory() as session:
    user = session.get(User, 1)
    print(user.name)  # "Alice"

    users = session.execute(
        select(User).where(User.name == "Alice")
    ).scalars().all()

# Update
with SessionFactory() as session:
    user = session.get(User, 1)
    user.name = "Alice Updated"
    session.commit()

# Upsert (INSERT ... ON CONFLICT)
from sqlalchemy.dialects.postgresql import insert

with SessionFactory() as session:
    stmt = insert(User).values(email="alice@example.com", name="Alice V2")
    stmt = stmt.on_conflict_do_update(
        index_elements=["email"],
        set_={"name": stmt.excluded.name},
    )
    session.execute(stmt)
    session.commit()

# Delete
with SessionFactory() as session:
    user = session.get(User, 1)
    session.delete(user)  # cascades to posts
    session.commit()
```

## JSONB Queries

SQLAlchemy's PostgreSQL JSONB support works with DB9:
```python
from sqlalchemy import select
from models import User

with SessionFactory() as session:
    # Containment query
    admins = session.execute(
        select(User).where(User.metadata_.contains({"role": "admin"}))
    ).scalars().all()
```

Raw SQL covers advanced JSONB operations:

```python
from sqlalchemy import text

with engine.begin() as conn:
    conn.execute(text(
        "UPDATE users SET metadata = jsonb_set(metadata, '{level}', '5'::jsonb) WHERE id = 1"
    ))
```
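Between containment and raw SQL sits key access: indexing a JSONB column and taking `.astext` compiles to the `->>` operator, which lets you compare a key to a plain string. In the app you would write `User.metadata_["role"].astext`; this sketch compiles the expression against a standalone `column()` so you can inspect the SQL that would be sent, assuming DB9 supports `->>`:

```python
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import column

# Standalone column for illustration; in the app this is User.metadata_
meta = column("metadata", JSONB)

# ->> extracts a key as text, so it can be compared to a plain string
expr = meta["role"].astext == "admin"
print(expr.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
```

Compiling with `literal_binds` is a quick way to preview dialect-specific operators without a live connection.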
## Transactions

```python
# Explicit commit
with SessionFactory() as session:
    session.add(User(email="bob@example.com", name="Bob"))
    session.flush()   # sends to DB but doesn't commit
    session.commit()  # commits the transaction
```
```python
# Rollback
with SessionFactory() as session:
    session.add(User(email="temp@example.com", name="Temp"))
    session.flush()
    session.rollback()  # user was not persisted

# Savepoints
with engine.begin() as conn:
    conn.execute(text("SAVEPOINT sp1"))
    conn.execute(text("INSERT INTO users (email, name) VALUES ('x@x.com', 'X')"))
    conn.execute(text("ROLLBACK TO SAVEPOINT sp1"))
    conn.execute(text("RELEASE SAVEPOINT sp1"))
```

Supported isolation levels: READ COMMITTED, REPEATABLE READ, and SERIALIZABLE (which runs as REPEATABLE READ on DB9).

```python
with engine.begin() as conn:
    conn.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))
    # ... queries run at snapshot isolation
```

## Joins and Aggregations
```python
from sqlalchemy import select, func
from models import User, Post

with SessionFactory() as session:
    # Join with aggregation
    results = session.execute(
        select(User.name, func.count(Post.id).label("post_count"))
        .join(Post, User.id == Post.author_id)
        .group_by(User.name)
        .having(func.count(Post.id) >= 1)
        .order_by(User.name)
    ).all()

    # Subquery
    subq = (
        select(Post.author_id, func.count().label("cnt"))
        .group_by(Post.author_id)
        .subquery()
    )
    top_authors = session.execute(
        select(User.name)
        .join(subq, User.id == subq.c.author_id)
        .where(subq.c.cnt >= 2)
    ).scalars().all()
```

## Window Functions and CTEs

Use raw SQL for window functions and CTEs:
```python
from sqlalchemy import text

with engine.begin() as conn:
    # Window function
    rows = conn.execute(text("""
        SELECT name, email,
               ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
        FROM users
    """)).fetchall()

    # CTE
    rows = conn.execute(text("""
        WITH recent AS (
            SELECT * FROM posts
            WHERE created_at > now() - interval '7 days'
        )
        SELECT u.name, r.title
        FROM users u JOIN recent r ON u.id = r.author_id
    """)).fetchall()
```
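Raw SQL is what DB9's test suite validates, but if you prefer composable statements, SQLAlchemy Core can express the same window query and you can inspect the SQL it would emit; whether DB9 executes it depends only on that emitted SQL. A sketch using a lightweight `table()` stand-in for the `users` model:

```python
from sqlalchemy import column, func, select, table
from sqlalchemy.dialects import postgresql

# Lightweight table construct; in the app you would use the User model's columns
users = table("users", column("name"), column("email"), column("created_at"))

# Same ROW_NUMBER query as above, expressed with Core constructs
stmt = select(
    users.c.name,
    users.c.email,
    func.row_number().over(order_by=users.c.created_at).label("row_num"),
)
print(stmt.compile(dialect=postgresql.dialect()))
```

Printing the compiled statement is a cheap way to confirm the generated SQL matches what you would have written by hand before trusting it against DB9.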
## Vector Search

Install pgvector for SQLAlchemy vector type support:
```python
from pgvector.sqlalchemy import Vector
from sqlalchemy import Integer, Text
from sqlalchemy.orm import Mapped, mapped_column
from models import Base

EMBEDDING_DIM = 1024

class Document(Base):
    __tablename__ = "documents"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    embedding: Mapped[list[float]] = mapped_column(Vector(EMBEDDING_DIM), nullable=False)
```

Create the table and HNSW index:

```python
from sqlalchemy import text
from db import engine
from vector_models import Base

with engine.begin() as conn:
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
    Base.metadata.create_all(conn)
    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS idx_documents_embedding
        ON documents USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
    """))
```

Search by cosine distance:

```python
from sqlalchemy import select
from vector_models import Document

def search(session, query_embedding: list[float], top_k: int = 5):
    distance = Document.embedding.cosine_distance(query_embedding)
    stmt = (
        select(Document, distance.label("distance"))
        .order_by(distance)
        .limit(top_k)
    )
    return session.execute(stmt).all()

with SessionFactory() as session:
    results = search(session, query_embedding=[0.1, 0.2, ...])
    for doc, dist in results:
        print(f"{doc.content} (distance: {dist:.4f})")
```

DB9 supports HNSW indexes with `vector_cosine_ops`, `vector_l2_ops`, and `vector_ip_ops`. IVFFlat is not supported.
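Cosine distance ignores vector magnitude, so normalizing embeddings to unit length before insert makes cosine and inner-product rankings agree and keeps stored values comparable across embedding batches. A small helper of ours (not a DB9 or pgvector API) you could apply before writing `Document` rows:

```python
import math

def l2_normalize(vec: list[float]) -> list[float]:
    # Scale the vector to unit length; rankings under vector_cosine_ops
    # and vector_ip_ops then coincide
    norm = math.sqrt(sum(x * x for x in vec))
    if norm == 0.0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in vec]

print(l2_normalize([3.0, 4.0]))
```

If your embedding provider already returns unit-length vectors, this is a no-op and can be skipped.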
## Full-Text Search

DB9 supports PostgreSQL full-text search with to_tsvector, websearch_to_tsquery, and ts_rank:

```python
from sqlalchemy import text

with SessionFactory() as session:
    results = session.execute(text("""
        SELECT content,
               ts_rank(to_tsvector('english', content),
                       websearch_to_tsquery('english', :query)) AS rank
        FROM documents
        WHERE to_tsvector('english', content) @@ websearch_to_tsquery('english', :query)
        ORDER BY rank DESC
        LIMIT :top_k
    """), {"query": "database serverless", "top_k": 10}).mappings().all()
```

For GIN indexes on text columns, note that DB9 accepts the syntax but falls back to a table scan. Performance is comparable for small-to-medium datasets.
## Production Notes

- Driver: Use psycopg (psycopg3), not psycopg2. The connection string prefix must be `postgresql+psycopg://`.
- Connection pooling: SQLAlchemy's built-in pool works with DB9. Use `pool_pre_ping=True` to handle idle connection drops.
- Alembic migrations: Alembic's autogenerate relies on information_schema introspection that may not fully work with DB9. Use explicit raw SQL for schema changes instead.
- hstore: Pass `use_native_hstore=False` to `create_engine` if you use hstore columns. This keeps hstore values as strings rather than relying on psycopg's automatic conversion.
- Connection string: Use the format `postgresql+psycopg://sqlalchemy-app.admin:{password}@pg.db9.io:5433/postgres`.
## Troubleshooting

### OperationalError: FATAL: authentication failed

Verify the tenant ID in the username. DB9 expects `{database-name}.admin` as the username (e.g., `sqlalchemy-app.admin`).
### psycopg2 import errors

DB9's SQLAlchemy integration uses psycopg (v3), not psycopg2. Install `psycopg[binary]` and use the `postgresql+psycopg://` prefix, not `postgresql+psycopg2://`.
### CREATE EXTENSION vector fails

DB9 includes pgvector natively, so the extension may already be available. Wrap the call in a try/except or use `CREATE EXTENSION IF NOT EXISTS vector`.
### Alembic autogenerate produces empty migrations

DB9's information_schema coverage is limited. Write migration SQL manually and run it with Alembic's `op.execute()`, or manage migrations as numbered SQL files.
## Verified Compatibility

Tested with SQLAlchemy 2.0+ and psycopg 3.1+ against DB9. E2E smoke tests cover:
| Category | Status |
|---|---|
| Connection and pooling | Pass |
| CRUD operations | Pass |
| JSONB containment queries | Pass |
| Transactions and savepoints | Pass |
| Joins and aggregations | Pass |
| Subqueries and CTEs | Pass |
| Window functions | Pass |
| DDL (ALTER TABLE, CREATE INDEX) | Pass |
| Vector operations and HNSW | Pass |
| Upsert (ON CONFLICT) | Pass |
## Next Pages

- RAG with Built-in Embeddings — vector search with DB9-native embeddings
- Connect — connection strings and authentication
- Vector Extension — HNSW indexes and distance operators
- Compatibility Matrix — full PostgreSQL compatibility surface
- Prisma — Node.js ORM alternative
- Drizzle — TypeScript ORM alternative