Chapter 7: Python: From psycopg & asyncpg to Production
If you've arrived at this chapter, you write Python. Perhaps you use Django with its ORM and Celery for background tasks. Perhaps you prefer FastAPI with SQLAlchemy and asyncpg for raw speed. Perhaps you use Flask, or no framework at all, just psycopg and honest SQL.
Regardless, the principles from Part II apply identically. A materialized view appears to your Python application as a table. You query it. You schedule its refresh. Your users experience single-digit millisecond response times where they previously experienced seconds of loading spinners.
This chapter covers every major Python pathway to PostgreSQL materialized views, organized by increasing abstraction: raw drivers first, then SQLAlchemy and FastAPI, then the full Django integration with ORM mapping, migrations, dedicated packages, connection pooling, and production scheduling. Find your stack and begin. Each section stands alone.
The Drivers: psycopg & asyncpg
Python has two primary PostgreSQL drivers, and the choice between them shapes everything that follows.
psycopg2 is the veteran — synchronous, battle-tested, and the default driver for Django and most of the Python PostgreSQL ecosystem for over a decade. Its successor, psycopg3 (installed as the psycopg package), adds native async support, built-in connection pooling, pipeline mode for batched queries, and is now Django's recommended driver. psycopg3 is required for Django 5.1's native connection pooling feature, which we'll cover in the production section. For new projects, use psycopg3. For existing codebases, psycopg2 continues to work and will for some time yet.
asyncpg is purpose-built for asynchronous Python. Written in Cython, it speaks the PostgreSQL binary protocol natively rather than through libpq, which gives it approximately 3x speed advantage over psycopg2 in benchmarks. It is the natural choice for FastAPI, Starlette, and any asyncio-based application where throughput and latency are paramount.
Materialized views with psycopg
```python
import psycopg2  # or: import psycopg

conn = psycopg2.connect("dbname=myapp user=postgres")
# REFRESH ... CONCURRENTLY cannot run inside a transaction block
conn.autocommit = True

# Create the materialized view (run once — in a migration or setup script)
with conn.cursor() as cur:
    cur.execute("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS mv_dashboard AS
        SELECT
            date_trunc('day', created_at)::date AS day,
            COUNT(*) AS events,
            COUNT(DISTINCT user_id) AS unique_users,
            SUM(CASE WHEN event_type = 'purchase'
                THEN amount ELSE 0 END)::numeric(13,2) AS revenue
        FROM events
        WHERE created_at >= CURRENT_DATE - INTERVAL '90 days'
        GROUP BY 1
        WITH DATA
    """)
    cur.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_dashboard_day
        ON mv_dashboard (day)
    """)

# Refresh
with conn.cursor() as cur:
    cur.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY mv_dashboard")

# Query — identical to querying any table
with conn.cursor() as cur:
    cur.execute("""
        SELECT day, unique_users, revenue
        FROM mv_dashboard
        ORDER BY day DESC
        LIMIT 30
    """)
    rows = cur.fetchall()

for row in rows:
    print(f"{row[0]}: {row[1]} users, ${row[2]} revenue")
```

The pattern with psycopg3 is nearly identical — the connection API is compatible by design. The primary difference is that psycopg3 supports async with await psycopg.AsyncConnection.connect(...) for async contexts, and its connection pool (psycopg_pool.ConnectionPool) is a first-class feature rather than a third-party addition.
Materialized views with asyncpg
```python
import asyncpg
import asyncio

async def main():
    # Connection pool — reuse across your application's lifetime
    pool = await asyncpg.create_pool(
        "postgresql://postgres@localhost/myapp",
        min_size=5,
        max_size=20
    )

    # Query — returns list of Record objects with attribute access
    rows = await pool.fetch("""
        SELECT day, unique_users, revenue
        FROM mv_dashboard
        ORDER BY day DESC
        LIMIT 30
    """)
    for row in rows:
        print(f"{row['day']}: {row['unique_users']} users, ${row['revenue']}")

    # Refresh
    await pool.execute(
        "REFRESH MATERIALIZED VIEW CONCURRENTLY mv_dashboard"
    )

    await pool.close()

asyncio.run(main())
```

asyncpg's connection pool handles concurrency natively. For a FastAPI application serving hundreds of concurrent requests, each request acquires a connection from the pool, executes a single indexed query against the materialized view, and returns — without contention, without connection setup overhead, and without the query recomputation that would occur against the raw source tables.
Background refresh with asyncpg
For applications that manage their own event loop — FastAPI, Starlette, aiohttp — a background refresh task integrates naturally:
```python
import asyncio
import logging

import asyncpg

logger = logging.getLogger(__name__)

async def refresh_loop(pool: asyncpg.Pool, interval: int = 900):
    """Refresh the dashboard materialized view every 15 minutes."""
    while True:
        try:
            await pool.execute(
                "REFRESH MATERIALIZED VIEW CONCURRENTLY mv_dashboard"
            )
            logger.info("Dashboard MV refreshed successfully")
        except Exception as e:
            logger.error(f"Dashboard MV refresh failed: {e}")
        await asyncio.sleep(interval)
```

Start this alongside your application: asyncio.create_task(refresh_loop(app.state.pool)) in FastAPI's lifespan handler. The refresh runs in the background, on schedule, without blocking request handling.
SQLAlchemy & FastAPI
SQLAlchemy serves Python developers who want more structure than raw SQL but aren't using Django. Materialized views are not first-class citizens in SQLAlchemy — there is no MaterializedView base class — but the integration is clean and the ORM works exactly as you'd expect.
The model
```python
from sqlalchemy import Column, Integer, Date, Numeric
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class DashboardMetrics(Base):
    __tablename__ = 'mv_dashboard'

    day = Column(Date, primary_key=True)
    events = Column(Integer, nullable=False)
    unique_users = Column(Integer, nullable=False)
    revenue = Column(Numeric(13, 2), nullable=False)

    # Prevent Alembic from trying to CREATE/ALTER this table
    __table_args__ = {'info': {'is_view': True}}
```

The info dictionary is a convention — Alembic won't act on it automatically, but your migration configuration can check it to skip materialized view tables during autogeneration. The model is queryable with standard SQLAlchemy ORM methods:
```python
from sqlalchemy.orm import Session

with Session(engine) as session:
    recent = (
        session.query(DashboardMetrics)
        .order_by(DashboardMetrics.day.desc())
        .limit(30)
        .all()
    )

for row in recent:
    print(f"{row.day}: {row.unique_users} users, ${row.revenue}")
```

Alembic migration
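One housekeeping note before the migration itself: the is_view flag from the model above can be honored during autogeneration with Alembic's include_object hook in env.py. A minimal sketch, assuming the convention shown earlier:

```python
def include_object(obj, name, type_, reflected, compare_to):
    """Tell Alembic autogenerate to skip tables flagged as views."""
    if type_ == "table" and obj.info.get("is_view"):
        return False
    return True

# In env.py, pass it to context.configure:
# context.configure(connection=connection,
#                   target_metadata=Base.metadata,
#                   include_object=include_object)
```

With the hook in place, autogenerate never tries to drop or recreate the materialized view; its lifecycle stays in the hand-written migration below.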
```python
"""create mv_dashboard materialized view

Revision ID and down_revision are auto-generated by `alembic revision`.
"""
from alembic import op

def upgrade():
    op.execute("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS mv_dashboard AS
        SELECT
            date_trunc('day', created_at)::date AS day,
            COUNT(*)::integer AS events,
            COUNT(DISTINCT user_id)::integer AS unique_users,
            SUM(CASE WHEN event_type = 'purchase'
                THEN amount ELSE 0 END)::numeric(13,2) AS revenue
        FROM events
        WHERE created_at >= CURRENT_DATE - INTERVAL '90 days'
        GROUP BY 1
        WITH DATA
    """)
    op.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_dashboard_day
        ON mv_dashboard (day)
    """)

def downgrade():
    op.execute("DROP MATERIALIZED VIEW IF EXISTS mv_dashboard")
```

Refresh
```python
from sqlalchemy import text

# REFRESH ... CONCURRENTLY cannot run inside a transaction block,
# so execute it on an autocommit connection
with engine.connect().execution_options(
    isolation_level="AUTOCOMMIT"
) as conn:
    conn.execute(text(
        "REFRESH MATERIALIZED VIEW CONCURRENTLY mv_dashboard"
    ))
```

FastAPI integration
The complete pattern for a FastAPI application using asyncpg and materialized views:
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
import asyncpg
import asyncio

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create pool and start refresh task
    app.state.pool = await asyncpg.create_pool(
        "postgresql://postgres@localhost/myapp",
        min_size=5, max_size=20
    )
    refresh_task = asyncio.create_task(
        refresh_loop(app.state.pool, interval=900)
    )
    yield
    # Shutdown: cancel refresh and close pool
    refresh_task.cancel()
    await app.state.pool.close()

app = FastAPI(lifespan=lifespan)

@app.get("/api/dashboard")
async def get_dashboard(request: Request):
    pool = request.app.state.pool
    rows = await pool.fetch("""
        SELECT day, unique_users, revenue
        FROM mv_dashboard
        ORDER BY day DESC
        LIMIT 30
    """)
    return [dict(row) for row in rows]
```

This replaces the pattern where a FastAPI endpoint would either join four tables on every request (slow) or read from a Redis cache that might be stale (fragile). The materialized view serves pre-computed data at index-scan speed. The background task refreshes it on schedule. No Redis. No cache invalidation logic. No additional infrastructure.
For applications using SQLAlchemy's async engine with FastAPI, the same model class works with create_async_engine("postgresql+asyncpg://...") and AsyncSession. The query syntax is identical; the engine handles the async execution.
Django: The Full Integration
Django is where most Python developers encounter PostgreSQL, and where materialized view integration requires the most framework-specific knowledge. Three approaches, in order of increasing sophistication.
The model
The critical Django pattern for materialized views:
```python
from django.db import models, connection

class DashboardMetrics(models.Model):
    day = models.DateField(primary_key=True)
    events = models.IntegerField()
    unique_users = models.IntegerField()
    revenue = models.DecimalField(max_digits=13, decimal_places=2)

    class Meta:
        managed = False
        db_table = 'mv_dashboard'
        ordering = ['-day']

    def __str__(self):
        return f"{self.day}: {self.unique_users} users"

    @classmethod
    def refresh(cls, concurrently=True):
        keyword = "CONCURRENTLY " if concurrently else ""
        with connection.cursor() as cursor:
            cursor.execute(
                f"REFRESH MATERIALIZED VIEW {keyword}{cls._meta.db_table}"
            )
```

managed = False is the essential declaration. Without it, Django's migration system will attempt to create a regular table with the same name as your materialized view — which will either fail (if the MV already exists) or create a conflicting object. With managed = False, Django treats the model as a read interface to an existing database object, which is exactly what a materialized view is.
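One caveat about the refresh() classmethod above: it interpolates the table name into SQL, which is safe with a hard-coded db_table but worth guarding if the view name ever comes from configuration. A sketch of the quoting, using a hypothetical build_refresh_sql helper that mirrors what Django's connection.ops.quote_name does for identifiers:

```python
def build_refresh_sql(view_name: str, concurrently: bool = True) -> str:
    """Build a REFRESH statement with the view name quoted as an identifier."""
    # Double-quote the identifier; escape embedded quotes by doubling them
    quoted = '"{}"'.format(view_name.replace('"', '""'))
    keyword = " CONCURRENTLY" if concurrently else ""
    return f"REFRESH MATERIALIZED VIEW{keyword} {quoted}"
```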
The model is immediately usable with the full Django ORM:
```python
# In a view or API endpoint
from rest_framework import serializers, viewsets

from .models import DashboardMetrics

# Standard queryset — filters, ordering, slicing, aggregation all work
recent = DashboardMetrics.objects.filter(
    day__gte='2026-01-01'
).order_by('-day')[:30]

# Works with Django REST Framework
class DashboardSerializer(serializers.ModelSerializer):
    class Meta:
        model = DashboardMetrics
        fields = ['day', 'events', 'unique_users', 'revenue']

class DashboardViewSet(viewsets.ReadOnlyModelViewSet):
    queryset = DashboardMetrics.objects.all()
    serializer_class = DashboardSerializer
    filterset_fields = ['day']
```

The migration
```python
from django.db import migrations

class Migration(migrations.Migration):
    dependencies = [('myapp', '0001_initial')]

    operations = [
        migrations.RunSQL(
            sql="""
                CREATE MATERIALIZED VIEW mv_dashboard AS
                SELECT
                    date_trunc('day', created_at)::date AS day,
                    COUNT(*)::integer AS events,
                    COUNT(DISTINCT user_id)::integer AS unique_users,
                    SUM(CASE WHEN event_type = 'purchase'
                        THEN amount ELSE 0 END)::numeric(13,2) AS revenue
                FROM myapp_events
                WHERE created_at >= CURRENT_DATE - INTERVAL '90 days'
                GROUP BY 1
                WITH DATA;

                CREATE UNIQUE INDEX idx_mv_dashboard_day
                    ON mv_dashboard (day);
            """,
            reverse_sql="""
                DROP MATERIALIZED VIEW IF EXISTS mv_dashboard CASCADE;
            """
        ),
    ]
```

Note myapp_events — Django's default table naming convention. Reference the actual database table names in your SQL, not the Python model class names.
The reverse_sql parameter enables python manage.py migrate myapp 0001 to roll back cleanly. When the materialized view's defining query needs to change, create a new migration that drops and recreates:
```python
migrations.RunSQL(
    sql="""
        DROP MATERIALIZED VIEW IF EXISTS mv_dashboard CASCADE;

        CREATE MATERIALIZED VIEW mv_dashboard AS
        -- updated query here
        WITH DATA;

        CREATE UNIQUE INDEX idx_mv_dashboard_day ON mv_dashboard (day);
    """,
    reverse_sql="DROP MATERIALIZED VIEW IF EXISTS mv_dashboard CASCADE;"
),
```

django-pgviews-redux: for teams managing many views
When your application grows to three or more materialized views — especially when they have dependencies on each other (a daily view feeding a weekly rollup) — the RunSQL migration pattern becomes tedious. django-pgviews-redux, maintained by xelixdev, provides a dedicated base class with automatic dependency tracking, a management command for creation and refresh, and recent integration with Django's migration system.
```shell
pip install django-pgviews-redux
```

```python
# settings.py
INSTALLED_APPS = [
    # ...
    'django_pgviews',
]
```

```python
# models.py
from django.db import models
from django_pgviews import view as pg

class DashboardMetrics(pg.MaterializedView):
    concurrent_index = 'day'
    sql = """
        SELECT
            date_trunc('day', created_at)::date AS day,
            COUNT(*)::integer AS events,
            COUNT(DISTINCT user_id)::integer AS unique_users,
            SUM(CASE WHEN event_type = 'purchase'
                THEN amount ELSE 0 END)::numeric(13,2) AS revenue
        FROM myapp_events
        WHERE created_at >= CURRENT_DATE - INTERVAL '90 days'
        GROUP BY 1
    """

    day = models.DateField(primary_key=True)
    events = models.IntegerField()
    unique_users = models.IntegerField()
    revenue = models.DecimalField(max_digits=13, decimal_places=2)

    class Meta:
        managed = False
        db_table = 'mv_dashboard'
        indexes = [
            models.Index(fields=['-day'], name='idx_mv_dashboard_day_desc'),
        ]
```

The concurrent_index attribute specifies which column to use for the UNIQUE index required by REFRESH CONCURRENTLY. The sql attribute contains the defining query. The dependencies attribute (not shown here) can list other views that must exist before this one is created — django-pgviews-redux respects the ordering during sync.
```shell
# Create or update all materialized views
python manage.py sync_pgviews

# Refresh all materialized views
python manage.py refresh_pgviews --concurrently
```

The recent migration system integration means that when you remove a MaterializedView class from your code and run makemigrations, django-pgviews-redux generates a migration to DROP the view from the database. It also detects when a view's SQL definition has changed and recreates it accordingly — controlled by the MATERIALIZED_VIEWS_CHECK_SQL_CHANGED setting, which defaults to True.
When to use django-pgviews-redux: multiple materialized views, especially with dependencies between them, where manual RunSQL migrations would be repetitive and error-prone. When the raw approach suffices: one or two independent materialized views where the overhead of a package isn't justified. Both approaches produce the same result — a materialized view in PostgreSQL and a Django model that reads from it.
Production Operations
Celery Beat refresh scheduling
The standard production pattern for Django applications with Celery:
```python
# tasks.py
from celery import shared_task
from django.db import connection
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def refresh_dashboard(self):
    """Refresh the dashboard materialized view with retry logic."""
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                "REFRESH MATERIALIZED VIEW CONCURRENTLY mv_dashboard"
            )
        logger.info("Dashboard MV refreshed successfully")
    except Exception as exc:
        logger.error(f"Dashboard MV refresh failed: {exc}")
        self.retry(exc=exc)
```

```python
# settings.py or celery.py
CELERY_BEAT_SCHEDULE = {
    'refresh-dashboard-mv': {
        'task': 'myapp.tasks.refresh_dashboard',
        'schedule': 900.0,  # every 15 minutes
    },
}
```

Three retries with 60-second delays between each. If all three fail, Celery surfaces the exception to your error tracking system — Sentry, Datadog, or whatever your team uses. The refresh runs in a Celery worker process, completely independent of your web request cycle.
A note on Celery's broker: by default, Celery uses Redis or RabbitMQ as its message broker. If your goal is eliminating Redis entirely, Celery supports a PostgreSQL broker via the sqlalchemy transport — though it is less performant than Redis for high-volume task queues. For the specific use case of scheduling a materialized view refresh every 15 minutes, the simpler alternative is pg_cron (Chapter 5): three lines of SQL inside PostgreSQL, no Celery dependency, no broker dependency, no application code at all.
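For reference, the pg_cron alternative really is that short. A sketch, assuming the pg_cron extension is available in your database (Chapter 5 covers the details):

```sql
-- Run once, in the database where pg_cron is installed
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Every 15 minutes, refresh the dashboard materialized view
SELECT cron.schedule(
    'refresh-dashboard-mv',
    '*/15 * * * *',
    'REFRESH MATERIALIZED VIEW CONCURRENTLY mv_dashboard'
);
```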
APScheduler: for Flask and lightweight applications
When Celery is more infrastructure than the task requires:
```python
from apscheduler.schedulers.background import BackgroundScheduler
import psycopg2

def refresh_dashboard():
    conn = psycopg2.connect("dbname=myapp")
    conn.autocommit = True
    with conn.cursor() as cur:
        cur.execute(
            "REFRESH MATERIALIZED VIEW CONCURRENTLY mv_dashboard"
        )
    conn.close()

scheduler = BackgroundScheduler()
scheduler.add_job(refresh_dashboard, 'interval', minutes=15)
scheduler.start()
```
scheduler.start() APScheduler runs in-process — no external broker, no worker process, no additional infrastructure. For a Flask application with one or two materialized views, this is the path of least resistance. For anything more complex, Celery or pg_cron is more appropriate.
Django 5.1 native connection pooling
Django 5.1, released August 2024, introduced native PostgreSQL connection pooling — eliminating the need for PgBouncer in many Django deployments. The configuration is straightforward, with one critical requirement that has caught multiple production deployments:
```python
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'myapp',
        'USER': 'postgres',
        'PASSWORD': 'secret',
        'HOST': 'localhost',
        'PORT': '5432',
        'CONN_MAX_AGE': 0,  # Required: pooling is incompatible with persistent connections
        'OPTIONS': {
            'pool': {
                'min_size': 2,
                'max_size': 10,
                'timeout': 10,
            }
        },
    }
}
```

Two requirements are non-negotiable. First, psycopg3 must be installed with the pool extra (pip install "psycopg[pool]"); if the pool option is set while psycopg2 is the driver, Django raises ImproperlyConfigured rather than silently falling back. Second, CONN_MAX_AGE must be 0, its default: the pool manages connection lifetimes internally, and Django rejects the combination of pooling with persistent connections, including CONN_MAX_AGE = None, with an ImproperlyConfigured error.
For materialized view-heavy applications, connection pooling matters because the REFRESH operation holds a connection for the duration of the refresh — potentially seconds or minutes. Without a pool, this can starve application requests of database connections. With a pool sized appropriately (max_size set to your expected concurrent connections plus headroom for refresh operations), the refresh and the application coexist without contention.
Testing materialized views
Testing materialized views in Django requires a real PostgreSQL instance. SQLite — Django's default test database — does not support materialized views. There is no workaround for this. If your test suite uses SQLite, materialized view tests will fail.
```python
from django.test import TransactionTestCase
from django.db import connection

class DashboardMetricsTest(TransactionTestCase):
    """
    TransactionTestCase required — materialized views need
    real committed transactions, not the savepoint rollbacks
    that TestCase uses.
    """

    def setUp(self):
        # Insert test data into source tables
        with connection.cursor() as cursor:
            cursor.execute("""
                INSERT INTO myapp_events
                    (user_id, event_type, amount, created_at)
                VALUES
                    (1, 'purchase', 49.99, NOW()),
                    (2, 'purchase', 29.99, NOW()),
                    (1, 'pageview', 0, NOW())
            """)
        # Refresh the materialized view
        with connection.cursor() as cursor:
            cursor.execute("REFRESH MATERIALIZED VIEW mv_dashboard")

    def test_dashboard_returns_aggregated_data(self):
        from myapp.models import DashboardMetrics

        metrics = DashboardMetrics.objects.first()
        self.assertEqual(metrics.unique_users, 2)
        self.assertEqual(metrics.events, 3)
        self.assertAlmostEqual(float(metrics.revenue), 79.98, places=2)
```

TransactionTestCase is essential — Django's standard TestCase wraps each test in a savepoint that it rolls back after the test. Materialized views require actually committed data to refresh against. TransactionTestCase commits transactions and truncates tables between tests, which is slower but necessary.
For CI environments, use Testcontainers or a dedicated PostgreSQL service in your pipeline:
```yaml
# GitHub Actions
services:
  postgres:
    image: postgres:17
    env:
      POSTGRES_DB: test
      POSTGRES_PASSWORD: test
    ports:
      - 5432:5432
    options: >-
      --health-cmd pg_isready
      --health-interval 10s
      --health-timeout 5s
      --health-retries 5
```
```python
from django.test import TransactionTestCase
from django.db import connection
from unittest import skipUnless

@skipUnless(
    connection.vendor == 'postgresql',
    'Materialized views require PostgreSQL'
)
class DashboardMetricsTest(TransactionTestCase):
    ...
```

That is every Python pathway — from raw psycopg through asyncpg to SQLAlchemy to FastAPI to the full Django integration with ORM mapping, migrations, django-pgviews-redux, Celery scheduling, native connection pooling, and production testing.
The pattern is consistent across all of them: create the materialized view in a migration, map it to a model or class, query it like any table, and schedule its refresh through whatever mechanism your application already uses for periodic tasks. The SQL is the same in every case. The driver and framework handle the delivery. The materialized view handles the performance.
The materialized view does not care which driver or framework reads from it. It is a table. Python is very good at reading from tables.
In the next chapter, we attend to Node.js — where the ORM landscape is more fragmented but the principles are, I assure you, identical.