
Implementation Guide: Benchmark cohort performance vs. standards and prior cohorts

A step-by-step guide for MSPs deploying an AI-assisted analytics stack that benchmarks cohort performance against standards and prior cohorts for education clients.

Hardware Procurement

Analytics & Data Warehouse Server

Dell Technologies PowerEdge T150 (Xeon E-2324G, 32GB ECC, 2x 1TB SSD RAID-1), Qty: 1

$1,800 MSP cost / $2,700 suggested resale (includes setup)

On-premises data warehouse and Metabase analytics host for privacy-sensitive clients who cannot use cloud. Runs PostgreSQL, Metabase, and the Python ETL pipeline. For cloud-hosted clients, this is replaced by an Azure VM or AWS EC2 instance.

Network Switch

USW-24-POE (24-port PoE Managed Switch)

Ubiquiti USW-24-POE, Qty: 1

$400 MSP cost / $600 suggested resale

Core network switch to ensure reliable, low-latency connectivity between the analytics server, staff workstations, and student testing devices. PoE ports power wireless access points.

Wireless Access Point

Ubiquiti U6-Pro (Wi-Fi 6), Qty: 2

$150 per unit MSP cost / $225 per unit suggested resale

Provides reliable Wi-Fi coverage for student testing areas and staff accessing dashboards. Quantity scales by building size — 2 units covers a typical tutoring center or small school wing.

Network Attached Storage (Backup)

Synology DS224+ with 2x Seagate IronWolf 4TB (ST4000VN006), Qty: 1

$650 NAS + $220 drives = $870 MSP cost / $1,300 suggested resale

Nightly encrypted backup of the PostgreSQL data warehouse and Metabase configuration. Provides 30-day point-in-time restore capability for FERPA compliance.

Student Testing Devices (optional procurement)

Acer Chromebook 314 (CB314-4H), Qty: 25

$250 per unit MSP cost / $325 per unit suggested resale

Managed Chromebooks for students taking benchmark assessments. Quantity varies by client need. Enrolled in Google Workspace for Education with Chrome Enterprise management.

Software Procurement

Metabase (Starter/Enterprise)

$85/month base (5 users) + $5/user/month additional; MSP resells at $150/month base + $10/user/month

White-labeled analytics dashboard platform. Connects directly to the PostgreSQL data warehouse and renders cohort comparison dashboards, trend charts, and drill-down reports. Enterprise tier enables white-labeling with client branding, SSO integration, and row-level permissions.

NWEA MAP Growth

NWEA, per-student annual

$8–$12/student/year (volume dependent); MSP bundles into managed service fee

Computer-adaptive benchmark assessments providing nationally-normed RIT scores. Generates the primary assessment data used for cohort-vs-standards and cohort-vs-cohort comparisons. Includes national percentile ranks and growth projections.

Microsoft 365 A3 Education

Microsoft, per-user monthly (CSP)

$3.25/user/month MSP cost via CSP / $5.50/user/month suggested resale

Provides Azure AD for single sign-on, Exchange Online for automated report delivery, SharePoint for document storage, and Power BI Pro licenses as a fallback visualization option. CSP margin of 15–20% for the MSP.

Azure Virtual Machine (Cloud Deployment)

Microsoft Azure, B2ms instance (2 vCPU, 8GB RAM) + managed PostgreSQL Flexible Server (General Purpose 2 vCore)

$145–$200/month

Cloud-hosted alternative to the on-premises PowerEdge server. Runs dockerized Metabase, Python ETL services, and connects to Azure Database for PostgreSQL. Preferred for clients without server rooms or IT staff.

OpenAI API (GPT-5.4)

OpenAI, GPT-5.4, usage-based (API)

$15–$40/month typical usage for narrative insight generation (~500K input tokens + ~100K output tokens per reporting cycle)

Powers the natural-language insight generator that converts statistical findings into plain-English narratives for non-technical educators. Generates automated report summaries, trend descriptions, and recommended actions.
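The prompt assembly for the insight generator can be sketched as follows. This is a minimal illustration, not the shipped pipeline code: the function name, metric keys, and prompt wording are all assumptions.

```python
# Hypothetical sketch of the narrative insight generator's prompt assembly.
# Function name and metric keys are illustrative, not the production pipeline.
import json

def build_insight_prompt(cohort_name: str, metrics: dict) -> str:
    """Render cohort statistics into a prompt for the narrative model."""
    return (
        "You are an education data analyst. Write a plain-English summary "
        "for non-technical educators.\n"
        f"Cohort: {cohort_name}\n"
        f"Metrics: {json.dumps(metrics, sort_keys=True)}\n"
        "Describe the trend, compare against the standard, and suggest one action."
    )

# The actual API call (requires OPENAI_API_KEY in the environment):
# from openai import OpenAI
# client = OpenAI()
# resp = client.chat.completions.create(
#     model="gpt-5.4",
#     messages=[{"role": "user", "content": build_insight_prompt("Grade 3 - 2024-2025", {...})}],
# )
```

Keeping the statistics in a structured JSON payload, rather than free text, makes the generated narratives reproducible and auditable against the data_context column in fact_insights.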

Clever Secure Sync

Clever, SaaS (free for data recipients)

Free for the MSP/analytics platform as a data recipient; the school pays if they don't already have Clever

Automated rostering and student demographic sync from the SIS. Provides a standardized API to pull student, section, teacher, and school data without building custom SIS integrations for every vendor.

Python Runtime & Libraries

Open source (MIT/BSD/Apache licenses)

$0 (included in server/VM setup)

Core ETL and AI pipeline runtime. Key packages: pandas, scikit-learn, scipy, sqlalchemy, psycopg2, openai, schedule, python-dotenv. All installed via pip in a virtual environment.

Docker & Docker Compose

Docker Inc., Docker Engine

$0 for Docker Engine; $0 for single-node deployment

Containerizes all application services (Metabase, ETL pipeline, PostgreSQL on-prem) for reproducible deployment and easy updates across multiple client sites.

Certbot (Let's Encrypt)

EFF / Let's Encrypt, open-source

$0

Automated TLS certificate provisioning for the Metabase web interface. Ensures all dashboard access is encrypted in transit per FERPA technical safeguard requirements.

Prerequisites

  • Client has an active Student Information System (SIS) — PowerSchool, Infinite Campus, Skyward, or Alma — with at least one full year of historical enrollment and demographic data
  • Client uses or is willing to adopt a benchmark assessment platform (NWEA MAP Growth recommended; Renaissance Star or LinkIt! also supported) with at least one prior testing cycle of data available
  • Client has or can provision a Clever account for automated rostering (or can provide OneRoster 1.2 CSV exports on a scheduled basis)
  • Internet connectivity of at least 25 Mbps symmetrical at the primary site; 100 Mbps recommended for 500+ students
  • An Azure AD or Google Workspace domain for staff SSO authentication (M365 A3 Education provides this)
  • A designated client staff member ('Data Champion') with authority to sign the FERPA Data Processing Agreement and serve as the ongoing point of contact for data questions
  • Physical or virtual server environment: either a dedicated on-premises server location with UPS power and ventilation, or authorization to provision cloud resources in the client's Azure tenant
  • Written FERPA-compliant Data Processing Agreement (DPA) executed between the MSP and the client before any student data is transferred — template provided by the MSP
  • State-specific student privacy law review completed (e.g., California SOPIPA, Texas TDPSA, New York Ed Law 2-d) to identify any additional consent or registration requirements
  • All assessment vendor API credentials or SFTP access credentials obtained and documented in the MSP's secure credential vault (e.g., IT Glue or Hudu)

Installation Steps

Step 1: Execute FERPA/COPPA Compliance Documentation

Before touching any student data, complete all legal and compliance prerequisites. This protects both the MSP and the client. Prepare and execute the Data Processing Agreement (DPA) using the Student Data Privacy Consortium template. Identify all data elements to be collected (student PII, assessment scores, demographic data). Document the data flow diagram showing where student data will be stored, processed, and who has access. Complete the client's state-specific vendor registration if required (e.g., New York Education Law 2-d requires vendor registration with NYSED).

Critical

CRITICAL: No student data should be transferred, imported, or accessed until the DPA is fully executed. Keep a signed copy in the client's documentation folder. Many states now maintain public registries of approved education data processors — check if your state requires MSP registration.

Step 2: Provision Infrastructure (Cloud Path)

Provision the core Azure infrastructure for the cloud deployment path; for on-premises clients, skip ahead to Step 3. Create a resource group, provision an Azure Database for PostgreSQL Flexible Server (General Purpose, 2 vCores, 8GB RAM, 128GB storage), and a B2ms VM for running Metabase and the ETL pipeline. Configure networking with a Virtual Network, NSG rules allowing only HTTPS (443) and SSH (22 from the MSP IP range), and a static public IP.

Azure CLI commands to provision resource group, PostgreSQL Flexible Server, VM, NSG rule, and firewall rule
bash
az group create --name rg-cohort-analytics --location eastus
az postgres flexible-server create --resource-group rg-cohort-analytics --name cohort-db-server --location eastus --admin-user msp_admin --admin-password '<STRONG_PASSWORD>' --sku-name Standard_D2ds_v4 --storage-size 128 --version 16 --yes
az vm create --resource-group rg-cohort-analytics --name vm-analytics --image Ubuntu2404 --size Standard_B2ms --admin-username mspadmin --ssh-key-values ~/.ssh/id_rsa.pub --public-ip-sku Standard --nsg-rule SSH
az network nsg rule create --resource-group rg-cohort-analytics --nsg-name vm-analyticsNSG --name AllowHTTPS --priority 100 --destination-port-ranges 443 --protocol Tcp --access Allow
az postgres flexible-server firewall-rule create --resource-group rg-cohort-analytics --name cohort-db-server --rule-name AllowAnalyticsVM --start-ip-address <VM_PRIVATE_IP> --end-ip-address <VM_PRIVATE_IP>
Note

Replace <STRONG_PASSWORD> with a 20+ character password stored in the MSP's credential vault. For on-premises deployments, install Ubuntu Server 24.04 LTS on the PowerEdge T150, configure static IP, and install PostgreSQL 16 directly. Azure Education credits may be available — check with the client.

Step 3: Provision Infrastructure (On-Premises Path)

For clients requiring on-premises hosting, install Ubuntu Server 24.04 LTS on the Dell PowerEdge T150. Configure RAID-1 mirroring during OS installation for the two SSDs. Set up static IP addressing, configure UFW firewall, and install Docker Engine and Docker Compose.

Install UFW, Docker Engine, and Docker Compose on Ubuntu Server 24.04 LTS
bash
sudo apt update && sudo apt upgrade -y
sudo apt install -y ufw openssh-server curl gnupg
sudo ufw allow 22/tcp
sudo ufw allow 443/tcp
sudo ufw enable
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo usermod -aG docker mspadmin
sudo apt install -y docker-compose-plugin
docker compose version
Note

Ensure the server is placed in a physically secured location (locked server room or cabinet). Configure UPS with at least 30 minutes of runtime. Set BIOS to auto-power-on after power loss.

Step 4: Create PostgreSQL Data Warehouse Schema

Design and create the Ed-Fi-aligned data warehouse schema that will store student demographics, enrollment, assessment results, cohort definitions, and benchmark standards. The schema uses a star-schema design optimized for analytical queries with fact tables for assessment events and dimension tables for students, schools, cohorts, standards, and time periods.

bash
psql -h cohort-db-server.postgres.database.azure.com -U msp_admin -d postgres -c "CREATE DATABASE cohort_analytics;"
psql -h cohort-db-server.postgres.database.azure.com -U msp_admin -d cohort_analytics -f /opt/cohort-analytics/sql/001_create_schema.sql
Note

The full SQL schema file (001_create_schema.sql) is provided in the custom AI components section under 'Data Warehouse Schema'. Run it as the msp_admin user. Create a read-only 'metabase_reader' role for the Metabase connection and an 'etl_writer' role for the Python pipeline.

Step 5: Deploy Metabase via Docker Compose

Deploy the Metabase Enterprise container with PostgreSQL as its application database (separate from the analytics warehouse). Configure environment variables for the database connection, set up the Nginx reverse proxy with TLS via Certbot, and apply white-label branding with the client's logo and color scheme.

Create Docker Compose file and start Metabase Enterprise container
bash
mkdir -p /opt/cohort-analytics/metabase
cat > /opt/cohort-analytics/metabase/docker-compose.yml << 'EOF'
version: '3.9'
services:
  metabase:
    image: metabase/metabase-enterprise:latest
    container_name: metabase
    ports:
      - "3000:3000"
    environment:
      MB_DB_TYPE: postgres
      MB_DB_DBNAME: metabase_app
      MB_DB_PORT: 5432
      MB_DB_USER: metabase_app_user
      MB_DB_PASS: ${MB_DB_PASS}
      MB_DB_HOST: ${DB_HOST}
      MB_SITE_NAME: "${CLIENT_NAME} Analytics"
      MB_PREMIUM_EMBEDDING_TOKEN: ${MB_TOKEN}
      JAVA_OPTS: "-Xmx2g"
    restart: unless-stopped
    volumes:
      - metabase_plugins:/plugins
volumes:
  metabase_plugins:
EOF
cd /opt/cohort-analytics/metabase && docker compose up -d
1. Install Nginx and Certbot for TLS
Install Nginx and provision TLS certificate via Certbot
bash
sudo apt install -y nginx certbot python3-certbot-nginx
sudo certbot --nginx -d analytics.clientdomain.com --non-interactive --agree-tos -m alerts@mspname.com
Note

The MB_PREMIUM_EMBEDDING_TOKEN is provided with the Metabase Enterprise license. Store all secrets in a .env file with 600 permissions. The Nginx config should proxy_pass to localhost:3000 and enforce HSTS headers. Verify the Metabase UI loads at https://analytics.clientdomain.com before proceeding.

Step 6: Configure Metabase Data Source and Initial Admin

Connect Metabase to the cohort_analytics data warehouse using the read-only metabase_reader role. Create the initial admin account using the MSP's service account, then create the client admin account. Configure row-level security so school-level users can only see their school's data (critical for multi-school clients).

1. Complete the Metabase setup wizard (navigate to https://analytics.clientdomain.com/setup)
2. Add database: PostgreSQL, host=cohort-db-server, port=5432, db=cohort_analytics, user=metabase_reader
3. Enable data sandboxing under Admin > Permissions
4. Create groups: 'School Admin', 'Teacher', 'District Admin'
5. Upload client logo under Admin > Settings > Whitelabel > Logo
Note

Metabase Enterprise is required for row-level security (data sandboxing) and white-labeling. If the client is a single tutoring center with no multi-site needs, Metabase Starter ($85/month) is sufficient. Test the database connection before proceeding.

Step 7: Configure Clever Integration for Rostering

Set up Clever Secure Sync to automatically pull student, teacher, section, and school data from the client's SIS into the data warehouse. Register the MSP's analytics application with Clever, obtain API credentials, and configure the data sharing agreement with the school. The Clever integration replaces the need for custom SIS API work.

1. Log in to https://apps.clever.com/signup as the MSP developer account
2. Create a new application: 'Cohort Analytics by [MSP Name]'
3. Request data: students (name, grade, school, demographics), sections, teachers
4. The client's Clever district admin approves data sharing
5. Note the CLEVER_API_TOKEN from the application dashboard
Store Clever credentials in .env
bash
echo 'CLEVER_API_TOKEN=your_token_here' >> /opt/cohort-analytics/.env
echo 'CLEVER_DISTRICT_ID=your_district_id' >> /opt/cohort-analytics/.env
Note

Clever is free for applications receiving data. If the client doesn't have Clever, the alternative is to use OneRoster 1.2 CSV exports (many SIS platforms can generate these) or direct SIS API integration. Clever Complete (paid by the school) adds attendance data which enriches the analytics.
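The nightly roster pull can be sketched as a paginated walk of Clever's student endpoint. This is a sketch under assumptions: the v3.0 `users` path and `starting_after` cursor follow Clever's documented pattern but should be verified against current API docs. The page fetcher is injected so the pagination logic can be exercised without network access.

```python
# Sketch of a paginated student pull from the Clever Data API.
# Endpoint path and cursor parameter are assumptions to verify against Clever docs.
from typing import Callable

API_ROOT = "https://api.clever.com/v3.0"

def fetch_all_students(get_page: Callable[[str], dict]) -> list[dict]:
    """Walk the paginated student list; get_page maps a URL to a decoded JSON body."""
    url = f"{API_ROOT}/users?role=student&limit=100"
    students = []
    while url:
        body = get_page(url)
        records = body.get("data", [])
        students.extend(r["data"] for r in records)
        if records:
            # Cursor pagination: resume after the last record's id
            last_id = records[-1]["data"]["id"]
            url = f"{API_ROOT}/users?role=student&limit=100&starting_after={last_id}"
        else:
            url = None
    return students
```

In production, get_page would wrap requests.get with an `Authorization: Bearer $CLEVER_API_TOKEN` header and error handling.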

Step 8: Configure Assessment Data Ingestion

Set up the assessment data pipeline to pull benchmark results from NWEA MAP Growth (primary) or Renaissance Star (alternative). For NWEA, use the MAP Growth API to pull student RIT scores, growth projections, and national percentile ranks. For Renaissance, use the Star API. Both support automated data pulls.

1. Log into https://teach.mapnwea.org as the client's MAP admin
2. Navigate to Admin > Manage API Access
3. Generate API credentials and note the client_id and client_secret
Set NWEA MAP Growth environment variables
bash
echo 'NWEA_CLIENT_ID=your_client_id' >> /opt/cohort-analytics/.env
echo 'NWEA_CLIENT_SECRET=your_client_secret' >> /opt/cohort-analytics/.env
echo 'NWEA_BASE_URL=https://api.mapnwea.org' >> /opt/cohort-analytics/.env
1. Contact Renaissance support to enable API access
2. Obtain OAuth2 credentials for the Star Assessment API
Set Renaissance Star environment variables (alternative)
bash
echo 'STAR_CLIENT_ID=your_client_id' >> /opt/cohort-analytics/.env
echo 'STAR_CLIENT_SECRET=your_client_secret' >> /opt/cohort-analytics/.env
Note

NWEA MAP Growth tests are typically administered 3 times per year (fall, winter, spring). The API should be polled within 48 hours of each testing window close. If the client uses a different assessment platform, CSV/SFTP import is the fallback — configure a watched folder at /opt/cohort-analytics/data/incoming/ for manual uploads.
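The watched-folder CSV fallback can be sketched with pandas. The column names below are assumptions about a typical assessment export, so map them per vendor before loading.

```python
# Minimal sketch of the watched-folder CSV fallback. Column names are
# illustrative assumptions; real vendor exports vary and need per-vendor mapping.
from pathlib import Path
import pandas as pd

REQUIRED = ["student_sis_id", "subject", "test_date", "rit_score", "percentile_rank"]

def load_incoming(folder: str) -> pd.DataFrame:
    """Read every CSV in the watched folder, validate columns, and concatenate."""
    frames = []
    for path in sorted(Path(folder).glob("*.csv")):
        df = pd.read_csv(path)
        missing = [c for c in REQUIRED if c not in df.columns]
        if missing:
            raise ValueError(f"{path.name} missing columns: {missing}")
        frames.append(df[REQUIRED])
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=REQUIRED)
```

The resulting frame would then be written into analytics.fact_assessments via sqlalchemy's to_sql, with the original row preserved in the raw_data JSONB column for audit.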

Step 9: Deploy the ETL Pipeline

Install and configure the Python-based ETL pipeline that orchestrates data collection from Clever (rostering), NWEA/Star (assessments), and any manual CSV uploads. The pipeline runs on a schedule via cron, extracts data from sources, transforms it to the warehouse schema, loads it into PostgreSQL, and triggers the AI analysis components.

Install Python 3.12 and required ETL dependencies
bash
sudo apt install -y python3.12 python3.12-venv python3-pip
python3.12 -m venv /opt/cohort-analytics/venv
source /opt/cohort-analytics/venv/bin/activate
pip install pandas==2.2.2 sqlalchemy==2.0.30 psycopg2-binary==2.9.9 requests==2.32.3 scipy==1.13.1 scikit-learn==1.5.0 openai==1.35.0 python-dotenv==1.0.1 schedule==1.2.2 jinja2==3.1.4
1. Deploy the ETL code by creating required directories
2. Copy ETL scripts from MSP repository to /opt/cohort-analytics/etl/
3. Set up cron job for nightly sync
Create directory structure and register nightly cron job for mspadmin
bash
mkdir -p /opt/cohort-analytics/{etl,sql,templates,logs}
(sudo crontab -u mspadmin -l 2>/dev/null; echo '0 2 * * * cd /opt/cohort-analytics && /opt/cohort-analytics/venv/bin/python etl/main_pipeline.py >> logs/etl.log 2>&1') | sudo crontab -u mspadmin -
Note

The ETL pipeline code is provided in full in the custom AI components section. The cron job runs at 2 AM local time. For the initial deployment, run the pipeline manually first to verify data flows correctly. Monitor /opt/cohort-analytics/logs/etl.log for errors.
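The orchestration inside main_pipeline.py can be sketched as ordered, logged steps. The step names here are illustrative; the full pipeline code is in the custom AI components section.

```python
# Skeleton of the ETL orchestration: run named steps in order, log progress,
# and abort on the first failure so a partial load is never silently committed.
import logging
from typing import Callable, Dict

log = logging.getLogger("etl")

def run_pipeline(steps: Dict[str, Callable[[], None]]) -> list[str]:
    """Run named steps in insertion order; log and re-raise on the first failure."""
    completed = []
    for name, step in steps.items():
        log.info("starting %s", name)
        try:
            step()
        except Exception:
            log.exception("step %s failed; aborting run", name)
            raise
        completed.append(name)
    return completed
```

On success, the final step would post to the monitoring heartbeat URL (Step 14) so a silent cron failure is caught within 24 hours.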

Step 10: Load Historical Data and Define Cohorts

Perform the initial historical data load covering all available prior assessment cycles and enrollment records. Define cohort groupings based on the client's needs: typically by grade level, school year, and optionally by program, teacher, or demographic subgroup. Load national/state benchmark standards for comparison.

1. Run initial full data pull from Clever
2. Import historical assessment data (CSV files from client)
3. Define cohorts
4. Load benchmark standards
5. Run initial analysis
Initial data load, cohort definition, and analysis pipeline
bash
source /opt/cohort-analytics/venv/bin/activate
python etl/clever_sync.py --full-sync
python etl/csv_importer.py --source /opt/cohort-analytics/data/incoming/historical/ --type nwea_map
python etl/cohort_builder.py --method grade-year --min-size 10
python etl/standards_loader.py --source nwea_norms_2024
python etl/analysis_engine.py --full-rebuild
Note

The client must provide historical CSV exports from their assessment platform for prior years. NWEA publishes national norm tables annually — the 2024 norms are included in the standards_loader. For state-specific standards, download from the state DOE website and import via the CSV path. Minimum cohort size of 10 students prevents re-identification of individuals (FERPA safe harbor).
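The grade-year cohort grouping with the minimum-size filter can be sketched in pandas; the column names mirror the warehouse dimensions, but the function itself is illustrative, not the shipped cohort_builder.

```python
# Sketch of grade-year cohort construction with the FERPA minimum-size filter.
# Column names mirror the warehouse schema; the function is illustrative.
import pandas as pd

def build_grade_year_cohorts(enrollments: pd.DataFrame, min_size: int = 10) -> pd.DataFrame:
    """Group enrollments into grade/year cohorts, dropping any below min_size."""
    cohorts = (
        enrollments.groupby(["school_year", "grade_level"])["student_id"]
        .nunique()
        .reset_index(name="student_count")
    )
    # FERPA safe harbor: suppress cohorts too small to prevent re-identification
    kept = cohorts[cohorts["student_count"] >= min_size].copy()
    kept["cohort_name"] = (
        "Grade " + kept["grade_level"].astype(str) + " - " + kept["school_year"]
    )
    return kept
```

Cohorts that fall below the threshold are dropped rather than merged; merging across grades would distort the cohort-vs-standards comparisons.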

Step 11: Build Metabase Dashboards

Create the five core dashboards in Metabase that provide the client-facing analytics experience. Each dashboard uses saved SQL questions connected to the data warehouse views created by the schema. Dashboards include: (1) Cohort vs. Standards Overview, (2) Year-over-Year Cohort Comparison, (3) Subgroup Performance Analysis, (4) Growth Trajectory & Predictions, (5) AI-Generated Insight Report.

1. New Question > Native Query
2. Paste SQL from the 'cohort_vs_standards_view' provided in AI components
3. Visualize as: Bar chart (grouped by cohort, colored by above/below standard)
4. Add filter widgets: School Year, Grade Level, Subject, School
5. Save to 'Cohort Analytics' collection
6. Create Dashboard, add the question, add filter connections
7. Repeat for all 5 dashboards using the SQL views provided
8. Enable auto-refresh on Dashboard 1 (daily)
9. Set up dashboard subscriptions: email PDF to client stakeholders weekly
Note

Full SQL for all dashboard views is provided in the AI components section. Use Metabase's dashboard subscription feature to email weekly PDF snapshots to principals and academic directors. Set up a 'Pulse' for critical alerts (e.g., any cohort dropping >10 percentile points).
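The '>10 percentile point' alert condition can be pre-computed in the ETL before the Pulse fires. A minimal sketch with illustrative field names and threshold default:

```python
# Sketch of the percentile-drop alert check behind the Metabase Pulse.
# Cohort keys, inputs, and the threshold default are illustrative.
def percentile_drop_alerts(current: dict, prior: dict, threshold: float = 10.0) -> list[str]:
    """Compare mean percentile rank per cohort and flag drops beyond threshold."""
    alerts = []
    for cohort, now in current.items():
        before = prior.get(cohort)
        if before is not None and (before - now) > threshold:
            alerts.append(
                f"{cohort}: mean percentile fell {before - now:.1f} points "
                f"({before:.0f} -> {now:.0f})"
            )
    return alerts
```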

Step 12: Configure SSO and User Provisioning

Integrate Metabase with the client's Azure AD (via M365 A3 Education) for single sign-on. Map Azure AD groups to Metabase groups for automatic permission assignment. This ensures teachers see only their classes, school admins see their school, and district admins see everything.

In Azure AD (Entra ID)

1. Register a new Enterprise Application: 'Cohort Analytics'
2. Configure SAML SSO with the following: Identifier (Entity ID): https://analytics.clientdomain.com, Reply URL: https://analytics.clientdomain.com/auth/sso, Sign-on URL: https://analytics.clientdomain.com
3. Map claims: user.mail -> email, user.displayname -> first_name + last_name
4. Assign Azure AD groups: 'Teachers', 'School Admins', 'District Admins'

In Metabase Admin > Authentication > SAML

1. Enable SAML
2. Paste IdP metadata URL from Azure AD
3. Map SAML groups to Metabase groups
4. Enable 'Create accounts on first login'
Note

For Google Workspace clients, use SAML federation with Google as the IdP instead of Azure AD. The Metabase SAML integration requires the Enterprise license. Test SSO with a non-admin account before rolling out to all users.

Step 13: Configure Automated Backups

Set up automated nightly backups of the PostgreSQL data warehouse and Metabase application database. For on-premises, back up to the Synology NAS with encryption. For cloud, use Azure Backup. Implement a 30-day retention policy and monthly verification of backup integrity.

1. On-premises backup to Synology NAS: create backup directory and script
On-premises backup script to Synology NAS with GPG encryption and 30-day pruning
bash
mkdir -p /mnt/nas/backups/cohort-analytics
cat > /opt/cohort-analytics/backup.sh << 'EOFBACKUP'
#!/bin/bash
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR=/mnt/nas/backups/cohort-analytics

# Dump analytics database
pg_dump -h localhost -U msp_admin -Fc cohort_analytics > $BACKUP_DIR/cohort_analytics_$DATE.dump

# Dump Metabase app database
pg_dump -h localhost -U msp_admin -Fc metabase_app > $BACKUP_DIR/metabase_app_$DATE.dump

# Encrypt
for f in $BACKUP_DIR/*_$DATE.dump; do
  gpg --batch --yes --passphrase-file /opt/cohort-analytics/.backup_key --symmetric --cipher-algo AES256 $f
  rm $f
done

# Prune backups older than 30 days
find $BACKUP_DIR -name '*.gpg' -mtime +30 -delete
EOFBACKUP
chmod +x /opt/cohort-analytics/backup.sh
(sudo crontab -u mspadmin -l 2>/dev/null; echo '30 3 * * * /opt/cohort-analytics/backup.sh >> /opt/cohort-analytics/logs/backup.log 2>&1') | sudo crontab -u mspadmin -
1. For the Azure cloud path: configure backup retention for the PostgreSQL Flexible Server
Azure cloud backup
bash
# Flexible Server's built-in automated backups cover the 30-day policy; raise retention from the default
az postgres flexible-server update --resource-group rg-cohort-analytics --name cohort-db-server --backup-retention 30
# Geo-redundant backup must be selected at server creation (--geo-redundant-backup Enabled);
# for vaulted long-term retention, configure Azure Backup (Data Protection vault) per current Azure docs
Note

The GPG encryption passphrase must be stored in the MSP's credential vault, NOT on the server itself (use a mounted secret or vault agent in production). Test a full restore from backup during the initial deployment to verify the backup chain. For Azure, enable geo-redundant backup on the PostgreSQL Flexible Server.

Step 14: Configure Monitoring and Alerting

Set up monitoring for all system components: server health, database performance, ETL pipeline success/failure, and Metabase availability. Configure alerts to the MSP's NOC or RMM platform. Use a lightweight monitoring stack appropriate for a single-client deployment.

1. Install monitoring agent (example: Datadog or the MSP's RMM agent). For a lightweight self-hosted approach, use Uptime Kuma.
2. Configure monitors: HTTPS check: https://analytics.clientdomain.com (interval: 60s), TCP check: PostgreSQL port 5432 (interval: 60s), and Heartbeat: ETL pipeline posts to Uptime Kuma heartbeat URL on success.
3. Add heartbeat call to ETL pipeline (end of main_pipeline.py).
4. Configure notifications: email + Slack/Teams webhook to MSP NOC channel.
Deploy Uptime Kuma monitoring container
bash
docker run -d --name uptime-kuma --restart=unless-stopped -p 3001:3001 -v uptime-kuma:/app/data louislam/uptime-kuma:1
ETL pipeline heartbeat call — add to end of main_pipeline.py
python
import requests

# Post a success heartbeat; a missed push raises the Uptime Kuma alert
requests.get('https://monitoring.mspname.com/api/push/YOUR_HEARTBEAT_ID?status=up', timeout=10)
Note

If the MSP uses ConnectWise, Datto, or NinjaRMM, integrate via their respective monitoring agents instead of Uptime Kuma. The key monitors are: (1) Metabase is reachable, (2) ETL pipeline ran successfully in the last 24 hours, (3) database disk usage is below 80%, (4) backup completed successfully.
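Monitor (2) can be implemented as a simple log-freshness check on the monitoring host; a sketch assuming the log path from Step 9:

```python
# Freshness check for the ETL log: True only if the file exists and was
# modified within the last max_age_hours. Path follows the guide's layout.
import time
from pathlib import Path

def etl_ran_recently(log_path: str, max_age_hours: float = 24.0) -> bool:
    """True if the log file exists and was modified within max_age_hours."""
    p = Path(log_path)
    if not p.exists():
        return False
    return (time.time() - p.stat().st_mtime) <= max_age_hours * 3600
```

Wire the boolean into the RMM check or the Uptime Kuma push URL so a cron failure surfaces the next morning.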

Custom AI Components

Data Warehouse Schema

Type: integration

The PostgreSQL schema that stores all student, enrollment, assessment, cohort, and benchmark data. Follows Ed-Fi data standard naming conventions and uses a star-schema design optimized for analytical queries. Includes materialized views for common dashboard queries.

Implementation:

/opt/cohort-analytics/sql/001_create_schema.sql
sql
-- File: /opt/cohort-analytics/sql/001_create_schema.sql

-- Roles
CREATE ROLE etl_writer LOGIN PASSWORD 'CHANGE_ME';
CREATE ROLE metabase_reader LOGIN PASSWORD 'CHANGE_ME';

-- Schema
CREATE SCHEMA IF NOT EXISTS analytics;
GRANT USAGE ON SCHEMA analytics TO metabase_reader;
ALTER DEFAULT PRIVILEGES IN SCHEMA analytics GRANT SELECT ON TABLES TO metabase_reader;
GRANT ALL ON SCHEMA analytics TO etl_writer;

-- Dimension: Schools
CREATE TABLE analytics.dim_schools (
    school_id SERIAL PRIMARY KEY,
    clever_id VARCHAR(50) UNIQUE,
    school_name VARCHAR(200) NOT NULL,
    nces_id VARCHAR(20),
    school_type VARCHAR(50), -- 'elementary', 'middle', 'high', 'tutoring_center'
    district_name VARCHAR(200),
    state_code CHAR(2),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Dimension: Students
CREATE TABLE analytics.dim_students (
    student_id SERIAL PRIMARY KEY,
    clever_id VARCHAR(50) UNIQUE,
    sis_id VARCHAR(50),
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    date_of_birth DATE,
    gender VARCHAR(20),
    race_ethnicity VARCHAR(100),
    ell_status BOOLEAN DEFAULT FALSE,
    sped_status BOOLEAN DEFAULT FALSE,
    frl_status VARCHAR(20), -- 'free', 'reduced', 'none'
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Dimension: Enrollments (SCD Type 2 for tracking cohort movement)
CREATE TABLE analytics.dim_enrollments (
    enrollment_id SERIAL PRIMARY KEY,
    student_id INT REFERENCES analytics.dim_students(student_id),
    school_id INT REFERENCES analytics.dim_schools(school_id),
    grade_level VARCHAR(5) NOT NULL, -- 'K', '1', '2', ..., '12'
    school_year VARCHAR(9) NOT NULL, -- '2024-2025'
    section_name VARCHAR(100),
    teacher_name VARCHAR(200),
    enrollment_start DATE,
    enrollment_end DATE,
    is_current BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_enrollments_year_grade ON analytics.dim_enrollments(school_year, grade_level);

-- Dimension: Cohorts
CREATE TABLE analytics.dim_cohorts (
    cohort_id SERIAL PRIMARY KEY,
    cohort_name VARCHAR(200) NOT NULL, -- e.g., 'Grade 3 - 2024-2025'
    cohort_type VARCHAR(50) NOT NULL, -- 'grade-year', 'program', 'teacher', 'demographic'
    school_year VARCHAR(9) NOT NULL,
    grade_level VARCHAR(5),
    school_id INT REFERENCES analytics.dim_schools(school_id),
    filter_criteria JSONB, -- flexible filters for custom cohorts
    student_count INT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Bridge: Cohort Membership
CREATE TABLE analytics.bridge_cohort_students (
    cohort_id INT REFERENCES analytics.dim_cohorts(cohort_id),
    student_id INT REFERENCES analytics.dim_students(student_id),
    PRIMARY KEY (cohort_id, student_id)
);

-- Dimension: Benchmark Standards
CREATE TABLE analytics.dim_standards (
    standard_id SERIAL PRIMARY KEY,
    standard_name VARCHAR(200) NOT NULL, -- e.g., 'NWEA 2024 National Norm'
    standard_type VARCHAR(50) NOT NULL, -- 'national_norm', 'state_standard', 'client_target'
    subject VARCHAR(50) NOT NULL, -- 'math', 'reading', 'language', 'science'
    grade_level VARCHAR(5) NOT NULL,
    season VARCHAR(10), -- 'fall', 'winter', 'spring'
    metric_name VARCHAR(50) NOT NULL, -- 'rit_score', 'percentile', 'proficiency_level'
    percentile_25 NUMERIC(8,2),
    percentile_50 NUMERIC(8,2), -- median / 'meets standard'
    percentile_75 NUMERIC(8,2),
    mean_score NUMERIC(8,2),
    std_deviation NUMERIC(8,2),
    source_year VARCHAR(9),
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_standards_lookup ON analytics.dim_standards(subject, grade_level, season, standard_type);

-- Fact: Assessment Results
CREATE TABLE analytics.fact_assessments (
    assessment_id SERIAL PRIMARY KEY,
    student_id INT REFERENCES analytics.dim_students(student_id),
    school_id INT REFERENCES analytics.dim_schools(school_id),
    assessment_platform VARCHAR(50) NOT NULL, -- 'nwea_map', 'star', 'linkit', 'state_test'
    assessment_type VARCHAR(50) NOT NULL, -- 'benchmark', 'interim', 'summative'
    subject VARCHAR(50) NOT NULL,
    grade_level_tested VARCHAR(5) NOT NULL,
    school_year VARCHAR(9) NOT NULL,
    season VARCHAR(10), -- 'fall', 'winter', 'spring'
    test_date DATE NOT NULL,
    rit_score NUMERIC(8,2), -- NWEA RIT score
    scaled_score NUMERIC(8,2), -- Generic scaled score
    percentile_rank INT, -- National percentile
    proficiency_level VARCHAR(30), -- 'below', 'approaching', 'meets', 'exceeds'
    growth_percentile INT, -- Student Growth Percentile (SGP)
    lexile_score INT, -- Reading level (if applicable)
    quantile_score INT, -- Math level (if applicable)
    raw_data JSONB, -- Full API response for audit
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_assessments_cohort ON analytics.fact_assessments(school_year, grade_level_tested, subject, season);
CREATE INDEX idx_assessments_student ON analytics.fact_assessments(student_id, test_date);

-- Fact: Cohort Aggregate Metrics (pre-computed for dashboard performance)
CREATE TABLE analytics.fact_cohort_metrics (
    metric_id SERIAL PRIMARY KEY,
    cohort_id INT REFERENCES analytics.dim_cohorts(cohort_id),
    subject VARCHAR(50) NOT NULL,
    season VARCHAR(10),
    school_year VARCHAR(9) NOT NULL,
    student_count INT NOT NULL,
    mean_score NUMERIC(8,2),
    median_score NUMERIC(8,2),
    std_deviation NUMERIC(8,2),
    pct_below_standard NUMERIC(5,2),
    pct_approaching_standard NUMERIC(5,2),
    pct_meets_standard NUMERIC(5,2),
    pct_exceeds_standard NUMERIC(5,2),
    mean_percentile_rank NUMERIC(5,2),
    mean_growth_percentile NUMERIC(5,2),
    comparison_standard_id INT REFERENCES analytics.dim_standards(standard_id),
    effect_size_vs_standard NUMERIC(8,4), -- Cohen's d
    p_value_vs_standard NUMERIC(8,6), -- t-test p-value
    computed_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_cohort_metrics_lookup ON analytics.fact_cohort_metrics(cohort_id, subject, season);

-- Fact: Cohort-vs-Cohort Comparisons
CREATE TABLE analytics.fact_cohort_comparisons (
    comparison_id SERIAL PRIMARY KEY,
    cohort_a_id INT REFERENCES analytics.dim_cohorts(cohort_id),
    cohort_b_id INT REFERENCES analytics.dim_cohorts(cohort_id),
    subject VARCHAR(50) NOT NULL,
    season VARCHAR(10),
    mean_diff NUMERIC(8,2),
    effect_size NUMERIC(8,4), -- Cohen's d
    p_value NUMERIC(8,6),
    is_significant BOOLEAN, -- p < 0.05
    direction VARCHAR(20), -- 'improvement', 'decline', 'no_change'
    narrative TEXT, -- AI-generated plain-English description
    computed_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (cohort_a_id, cohort_b_id, subject, season)
);

-- Fact: AI-Generated Insights
CREATE TABLE analytics.fact_insights (
    insight_id SERIAL PRIMARY KEY,
    cohort_id INT REFERENCES analytics.dim_cohorts(cohort_id),
    insight_type VARCHAR(50) NOT NULL, -- 'trend', 'alert', 'recommendation', 'summary'
    severity VARCHAR(20), -- 'info', 'warning', 'critical'
    title VARCHAR(300),
    body TEXT NOT NULL,
    data_context JSONB, -- The metrics that triggered this insight
    generated_by VARCHAR(50) DEFAULT 'gpt-5.4',
    school_year VARCHAR(9),
    season VARCHAR(10),
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Materialized View: Dashboard - Cohort vs Standards
CREATE MATERIALIZED VIEW analytics.mv_cohort_vs_standards AS
SELECT
    c.cohort_name,
    c.school_year,
    c.grade_level,
    s.school_name,
    cm.subject,
    cm.season,
    cm.student_count,
    cm.mean_score,
    cm.median_score,
    cm.mean_percentile_rank,
    cm.pct_meets_standard + cm.pct_exceeds_standard AS pct_proficient,
    st.percentile_50 AS standard_median,
    cm.mean_score - st.percentile_50 AS diff_from_standard,
    cm.effect_size_vs_standard,
    cm.p_value_vs_standard,
    CASE
        WHEN cm.effect_size_vs_standard > 0.2 AND cm.p_value_vs_standard < 0.05 THEN 'Above Standard'
        WHEN cm.effect_size_vs_standard < -0.2 AND cm.p_value_vs_standard < 0.05 THEN 'Below Standard'
        ELSE 'At Standard'
    END AS performance_category
FROM analytics.fact_cohort_metrics cm
JOIN analytics.dim_cohorts c ON cm.cohort_id = c.cohort_id
JOIN analytics.dim_schools s ON c.school_id = s.school_id
LEFT JOIN analytics.dim_standards st ON cm.comparison_standard_id = st.standard_id
ORDER BY c.school_year DESC, c.grade_level, cm.subject;

CREATE UNIQUE INDEX idx_mv_cvs ON analytics.mv_cohort_vs_standards(cohort_name, subject, season);

-- Materialized View: Year-over-Year Comparison
CREATE MATERIALIZED VIEW analytics.mv_yoy_comparison AS
SELECT
    cc.comparison_id,
    ca.cohort_name AS current_cohort,
    ca.school_year AS current_year,
    cb.cohort_name AS prior_cohort,
    cb.school_year AS prior_year,
    ca.grade_level,
    cc.subject,
    cc.season,
    cma.mean_score AS current_mean,
    cmb.mean_score AS prior_mean,
    cc.mean_diff,
    cc.effect_size,
    cc.p_value,
    cc.is_significant,
    cc.direction,
    cc.narrative
FROM analytics.fact_cohort_comparisons cc
JOIN analytics.dim_cohorts ca ON cc.cohort_a_id = ca.cohort_id
JOIN analytics.dim_cohorts cb ON cc.cohort_b_id = cb.cohort_id
JOIN analytics.fact_cohort_metrics cma ON cma.cohort_id = ca.cohort_id AND cma.subject = cc.subject AND cma.season = cc.season
JOIN analytics.fact_cohort_metrics cmb ON cmb.cohort_id = cb.cohort_id AND cmb.subject = cc.subject AND cmb.season = cc.season
ORDER BY ca.school_year DESC, ca.grade_level;

-- REFRESH ... CONCURRENTLY (used below) requires a unique index on each view
CREATE UNIQUE INDEX idx_mv_yoy ON analytics.mv_yoy_comparison(comparison_id);

-- Refresh function
CREATE OR REPLACE FUNCTION analytics.refresh_materialized_views() RETURNS void AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY analytics.mv_cohort_vs_standards;
    REFRESH MATERIALIZED VIEW CONCURRENTLY analytics.mv_yoy_comparison;
END;
$$ LANGUAGE plpgsql;

Clever Rostering Sync

Type: integration

Python module that pulls student, teacher, section, and school data from the Clever API and upserts it into the data warehouse dimension tables. Idempotent upserts make it safe to run as a full initial load or as a nightly re-sync. Handles Clever's link-based pagination.

Implementation:

File: /opt/cohort-analytics/etl/clever_sync.py
python
import os
import sys
import logging
import argparse
import requests
from datetime import datetime
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv('/opt/cohort-analytics/.env')
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger('clever_sync')

CLEVER_BASE = 'https://api.clever.com/v3.0'
CLEVER_TOKEN = os.getenv('CLEVER_API_TOKEN')
DB_URL = os.getenv('DATABASE_URL')  # postgresql://etl_writer:pass@host:5432/cohort_analytics

def clever_get(endpoint, params=None):
    """GET from Clever API with pagination and error handling."""
    headers = {'Authorization': f'Bearer {CLEVER_TOKEN}', 'Accept': 'application/json'}
    results = []
    url = f'{CLEVER_BASE}{endpoint}'
    while url:
        resp = requests.get(url, headers=headers, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        results.extend(data.get('data', []))
        links = data.get('links', [])
        url = next((l['uri'] for l in links if l['rel'] == 'next'), None)
        if url and not url.startswith('http'):
            url = f'https://api.clever.com{url}'
        params = None  # params only on first request
    return results

def sync_schools(engine):
    logger.info('Syncing schools...')
    schools = clever_get('/schools')
    with engine.begin() as conn:
        for s in schools:
            d = s['data']
            conn.execute(text("""
                INSERT INTO analytics.dim_schools (clever_id, school_name, nces_id, school_type, state_code, updated_at)
                VALUES (:clever_id, :name, :nces_id, :school_type, :state, NOW())
                ON CONFLICT (clever_id) DO UPDATE SET
                    school_name = EXCLUDED.school_name,
                    nces_id = EXCLUDED.nces_id,
                    updated_at = NOW()
            """), {'clever_id': d['id'], 'name': d.get('name',''), 'nces_id': d.get('nces_id'),
                   'school_type': ('high' if d.get('high_school')
                                   else 'middle' if d.get('middle_school') else 'elementary'),
                   'state': d.get('location',{}).get('state','')})
    logger.info(f'Synced {len(schools)} schools')

def sync_students(engine):
    logger.info('Syncing students...')
    students = clever_get('/students')
    with engine.begin() as conn:
        for s in students:
            d = s['data']
            conn.execute(text("""
                INSERT INTO analytics.dim_students (clever_id, sis_id, first_name, last_name, gender, race_ethnicity, ell_status, updated_at)
                VALUES (:clever_id, :sis_id, :first_name, :last_name, :gender, :race, :ell, NOW())
                ON CONFLICT (clever_id) DO UPDATE SET
                    sis_id = EXCLUDED.sis_id,
                    first_name = EXCLUDED.first_name,
                    last_name = EXCLUDED.last_name,
                    gender = EXCLUDED.gender,
                    race_ethnicity = EXCLUDED.race_ethnicity,
                    ell_status = EXCLUDED.ell_status,
                    updated_at = NOW()
            """), {'clever_id': d['id'], 'sis_id': d.get('sis_id',''),
                   'first_name': d.get('name',{}).get('first',''),
                   'last_name': d.get('name',{}).get('last',''),
                   'gender': d.get('gender',''), 'race': d.get('race',''),
                   'ell': d.get('ell_status','') == 'Y'})
    logger.info(f'Synced {len(students)} students')

def sync_enrollments(engine, school_year):
    logger.info('Syncing enrollments...')
    sections = clever_get('/sections')
    with engine.begin() as conn:
        for s in sections:
            d = s['data']
            school_clever_id = d.get('school')
            teacher_name = d.get('teacher',{}).get('name','') if isinstance(d.get('teacher'), dict) else ''
            grade = d.get('grade','')
            section_name = d.get('name','')
            student_ids = d.get('students', [])
            for sid in student_ids:
                conn.execute(text("""
                    INSERT INTO analytics.dim_enrollments (student_id, school_id, grade_level, school_year, section_name, teacher_name, is_current)
                    SELECT s.student_id, sc.school_id, :grade, :school_year, :section_name, :teacher_name, TRUE
                    FROM analytics.dim_students s, analytics.dim_schools sc
                    WHERE s.clever_id = :student_clever_id AND sc.clever_id = :school_clever_id
                    ON CONFLICT DO NOTHING
                """), {'grade': grade, 'school_year': school_year, 'section_name': section_name,
                       'teacher_name': teacher_name, 'student_clever_id': sid,
                       'school_clever_id': school_clever_id})
    logger.info(f'Synced enrollments from {len(sections)} sections')

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--full-sync', action='store_true')
    parser.add_argument('--school-year', default='2024-2025')
    args = parser.parse_args()
    
    engine = create_engine(DB_URL)
    sync_schools(engine)
    sync_students(engine)
    sync_enrollments(engine, args.school_year)
    logger.info('Clever sync complete')

if __name__ == '__main__':
    main()
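Note that `clever_get` raises immediately on any error response, while Clever enforces API rate limits (HTTP 429). A production sync should retry rate-limited requests with exponential backoff; a minimal sketch follows (the helper names and delay schedule are illustrative, not part of the module above or of Clever's API):

```python
import time

def backoff_delays(max_retries=5, base=1.0, cap=30.0):
    """Exponential backoff schedule in seconds: 1, 2, 4, ... capped at `cap`."""
    return [min(base * (2 ** i), cap) for i in range(max_retries)]

def call_with_retry(fn, is_rate_limited, max_retries=5, sleep=time.sleep):
    """Call fn(); while is_rate_limited(result) is true, sleep per the schedule and retry."""
    result = fn()
    for delay in backoff_delays(max_retries):
        if not is_rate_limited(result):
            return result
        sleep(delay)
        result = fn()
    return result
```

Inside `clever_get`, the HTTP call could then be wrapped as `call_with_retry(lambda: requests.get(url, headers=headers, timeout=30), lambda r: r.status_code == 429)` before `raise_for_status()`.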

Assessment Data Importer

Type: integration

Python module that imports assessment data from the NWEA MAP Growth API and/or CSV files from various assessment platforms. Normalizes all assessment data into the unified fact_assessments table. Supports NWEA MAP Growth (API), Renaissance Star (CSV/API), LinkIt! (CSV), and generic state test CSV formats.

Implementation:

File: /opt/cohort-analytics/etl/assessment_importer.py
python
import os
import logging
import json
import pandas as pd
import requests
from datetime import datetime
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv('/opt/cohort-analytics/.env')
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger('assessment_importer')

DB_URL = os.getenv('DATABASE_URL')
NWEA_CLIENT_ID = os.getenv('NWEA_CLIENT_ID')
NWEA_CLIENT_SECRET = os.getenv('NWEA_CLIENT_SECRET')
NWEA_BASE = os.getenv('NWEA_BASE_URL', 'https://api.mapnwea.org')

class NWEAImporter:
    def __init__(self, engine):
        self.engine = engine
        self.token = None
    
    def authenticate(self):
        resp = requests.post(f'{NWEA_BASE}/oauth2/token',
            data={'grant_type': 'client_credentials'},
            auth=(NWEA_CLIENT_ID, NWEA_CLIENT_SECRET), timeout=30)
        resp.raise_for_status()
        self.token = resp.json()['access_token']
        logger.info('NWEA authentication successful')
    
    def fetch_results(self, term_name=None):
        headers = {'Authorization': f'Bearer {self.token}', 'Accept': 'application/json'}
        params = {}
        if term_name:
            params['termName'] = term_name
        results = []
        url = f'{NWEA_BASE}/v1/assessmentResults'
        while url:
            resp = requests.get(url, headers=headers, params=params, timeout=60)
            resp.raise_for_status()
            data = resp.json()
            results.extend(data.get('assessmentResults', []))
            url = data.get('nextPageUrl')
            params = None
        logger.info(f'Fetched {len(results)} NWEA results')
        return results
    
    def load_results(self, results, school_year, season):
        with self.engine.begin() as conn:
            for r in results:
                student_id_result = conn.execute(text(
                    "SELECT student_id FROM analytics.dim_students WHERE sis_id = :sid"),
                    {'sid': r.get('studentId','')})
                row = student_id_result.fetchone()
                if not row:
                    logger.warning(f"Student not found: {r.get('studentId')}")
                    continue
                student_id = row[0]
                conn.execute(text("""
                    INSERT INTO analytics.fact_assessments
                    (student_id, school_id, assessment_platform, assessment_type, subject,
                     grade_level_tested, school_year, season, test_date, rit_score,
                     percentile_rank, growth_percentile, lexile_score, quantile_score, raw_data)
                    VALUES (:student_id,
                        (SELECT school_id FROM analytics.dim_enrollments WHERE student_id = :student_id AND is_current = TRUE LIMIT 1),
                        'nwea_map', 'benchmark', :subject, :grade, :school_year, :season,
                        :test_date, :rit, :percentile, :growth, :lexile, :quantile, :raw)
                    ON CONFLICT DO NOTHING
                """), {
                    'student_id': student_id,
                    'subject': r.get('subject','').lower().replace('mathematics','math'),
                    'grade': r.get('grade',''), 'school_year': school_year, 'season': season,
                    'test_date': r.get('testDate'), 'rit': r.get('ritScore'),
                    'percentile': r.get('percentile'), 'growth': r.get('growthPercentile'),
                    'lexile': r.get('lexileScore'), 'quantile': r.get('quantileScore'),
                    'raw': json.dumps(r)
                })
        logger.info(f'Loaded {len(results)} NWEA results into warehouse')

class CSVImporter:
    """Generic CSV importer with configurable column mappings."""
    COLUMN_MAPS = {
        'nwea_map': {
            'StudentID': 'sis_id', 'TermName': 'season', 'Subject': 'subject',
            'Grade': 'grade', 'TestRITScore': 'rit_score', 'TestPercentile': 'percentile_rank',
            'GrowthPercentile': 'growth_percentile', 'TestDate': 'test_date'
        },
        'star_reading': {
            'Student ID': 'sis_id', 'Assessment Date': 'test_date', 'Grade Equivalent': 'grade',
            'Scaled Score': 'scaled_score', 'Percentile Rank': 'percentile_rank',
            'Lexile': 'lexile_score'
        },
        'generic': {
            'student_id': 'sis_id', 'test_date': 'test_date', 'subject': 'subject',
            'grade': 'grade', 'scaled_score': 'scaled_score', 'percentile': 'percentile_rank'
        }
    }
    
    def __init__(self, engine, platform='nwea_map'):
        self.engine = engine
        self.platform = platform
        self.col_map = self.COLUMN_MAPS.get(platform, self.COLUMN_MAPS['generic'])
    
    def import_file(self, filepath, school_year, season):
        df = pd.read_csv(filepath)
        df = df.rename(columns=self.col_map)
        logger.info(f'Importing {len(df)} rows from {filepath}')
        with self.engine.begin() as conn:
            for _, row in df.iterrows():
                student_result = conn.execute(text(
                    "SELECT student_id FROM analytics.dim_students WHERE sis_id = :sid"),
                    {'sid': str(row.get('sis_id',''))})
                srow = student_result.fetchone()
                if not srow:
                    continue
                conn.execute(text("""
                    INSERT INTO analytics.fact_assessments
                    (student_id, school_id, assessment_platform, assessment_type, subject,
                     grade_level_tested, school_year, season, test_date,
                     rit_score, scaled_score, percentile_rank, growth_percentile, lexile_score)
                    VALUES (:sid,
                        (SELECT school_id FROM analytics.dim_enrollments WHERE student_id = :sid AND is_current = TRUE LIMIT 1),
                        :platform, 'benchmark', :subject, :grade, :year, :season, :test_date,
                        :rit, :scaled, :pct, :growth, :lexile)
                    ON CONFLICT DO NOTHING
                """), {
                    'sid': srow[0], 'platform': self.platform,
                    'subject': str(row.get('subject','')).lower(),
                    'grade': str(row.get('grade','')), 'year': school_year, 'season': season,
                    'test_date': row.get('test_date'), 'rit': row.get('rit_score'),
                    'scaled': row.get('scaled_score'), 'pct': row.get('percentile_rank'),
                    'growth': row.get('growth_percentile'), 'lexile': row.get('lexile_score')
                })
        logger.info(f'CSV import complete for {filepath}')
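The `nwea_map` column map routes `TermName` into the season field, but NWEA term names typically look like `Fall 2024-2025` rather than the warehouse's lowercase `fall`. A small normalizer can be applied to the DataFrame before insert (hypothetical helper; adjust if your export's term format differs):

```python
def parse_term_name(term_name):
    """Split an NWEA-style term name such as 'Fall 2024-2025' into
    (season, school_year), matching the warehouse's lowercase season values.
    Returns (None, None) if the string doesn't have that shape."""
    parts = str(term_name).strip().split()
    if len(parts) != 2:
        return None, None
    season = parts[0].lower()
    if season not in ('fall', 'winter', 'spring'):
        return None, None
    return season, parts[1]
```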

Cohort Builder

Type: workflow

Python module that automatically generates cohort groupings from enrollment data. Creates grade-year cohorts (default), and optionally teacher-level, program-level, and demographic subgroup cohorts. Enforces a minimum cohort size of 10 to prevent FERPA re-identification. Populates the dim_cohorts and bridge_cohort_students tables.

Implementation:

File: /opt/cohort-analytics/etl/cohort_builder.py
python
import os
import logging
import argparse
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv('/opt/cohort-analytics/.env')
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger('cohort_builder')

DB_URL = os.getenv('DATABASE_URL')
MIN_COHORT_SIZE = 10  # FERPA safe harbor minimum

def build_grade_year_cohorts(engine):
    """Create cohorts by grade level and school year for each school."""
    with engine.begin() as conn:
        rows = conn.execute(text("""
            SELECT DISTINCT e.school_id, e.grade_level, e.school_year, s.school_name, COUNT(DISTINCT e.student_id) as cnt
            FROM analytics.dim_enrollments e
            JOIN analytics.dim_schools s ON e.school_id = s.school_id
            GROUP BY e.school_id, e.grade_level, e.school_year, s.school_name
            HAVING COUNT(DISTINCT e.student_id) >= :min_size
        """), {'min_size': MIN_COHORT_SIZE}).fetchall()
        
        for r in rows:
            school_id, grade, year, school_name, count = r
            cohort_name = f'{school_name} - Grade {grade} - {year}'
            result = conn.execute(text("""
                INSERT INTO analytics.dim_cohorts (cohort_name, cohort_type, school_year, grade_level, school_id, student_count)
                VALUES (:name, 'grade-year', :year, :grade, :school_id, :count)
                ON CONFLICT DO NOTHING
                RETURNING cohort_id
            """), {'name': cohort_name, 'year': year, 'grade': grade, 'school_id': school_id, 'count': count})
            cohort_row = result.fetchone()
            if cohort_row:
                cohort_id = cohort_row[0]
                conn.execute(text("""
                    INSERT INTO analytics.bridge_cohort_students (cohort_id, student_id)
                    SELECT DISTINCT :cid, e.student_id
                    FROM analytics.dim_enrollments e
                    WHERE e.school_id = :sid AND e.grade_level = :grade AND e.school_year = :year
                    ON CONFLICT DO NOTHING
                """), {'cid': cohort_id, 'sid': school_id, 'grade': grade, 'year': year})
        logger.info(f'Built {len(rows)} grade-year cohorts')

def build_subgroup_cohorts(engine, school_year):
    """Create demographic subgroup cohorts (ELL, SPED, FRL) for equity analysis."""
    subgroups = [
        ('ELL', "s.ell_status = TRUE"),
        ('SPED', "s.sped_status = TRUE"),
        ('FRL', "s.frl_status IN ('free','reduced')"),
    ]
    with engine.begin() as conn:
        for sg_name, sg_filter in subgroups:
            rows = conn.execute(text(f"""
                SELECT DISTINCT e.school_id, e.grade_level, sc.school_name, COUNT(DISTINCT e.student_id) as cnt
                FROM analytics.dim_enrollments e
                JOIN analytics.dim_students s ON e.student_id = s.student_id
                JOIN analytics.dim_schools sc ON e.school_id = sc.school_id
                WHERE e.school_year = :year AND {sg_filter}
                GROUP BY e.school_id, e.grade_level, sc.school_name
                HAVING COUNT(DISTINCT e.student_id) >= :min_size
            """), {'year': school_year, 'min_size': MIN_COHORT_SIZE}).fetchall()
            
            for r in rows:
                school_id, grade, school_name, count = r
                cohort_name = f'{school_name} - Grade {grade} - {sg_name} - {school_year}'
                result = conn.execute(text("""
                    INSERT INTO analytics.dim_cohorts (cohort_name, cohort_type, school_year, grade_level, school_id, student_count, filter_criteria)
                    VALUES (:name, 'demographic', :year, :grade, :sid, :count, :criteria)
                    ON CONFLICT DO NOTHING
                    RETURNING cohort_id
                """), {'name': cohort_name, 'year': school_year, 'grade': grade, 'sid': school_id,
                       'count': count, 'criteria': f'{{"subgroup": "{sg_name}"}}'})
                # ... populate bridge table similarly
        logger.info(f'Built subgroup cohorts for {school_year}')

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--method', choices=['grade-year','all'], default='grade-year')
    parser.add_argument('--min-size', type=int, default=10)
    parser.add_argument('--school-year', default='2024-2025')
    args = parser.parse_args()
    
    global MIN_COHORT_SIZE
    MIN_COHORT_SIZE = args.min_size
    engine = create_engine(DB_URL)
    build_grade_year_cohorts(engine)
    if args.method == 'all':
        build_subgroup_cohorts(engine, args.school_year)

if __name__ == '__main__':
    main()
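The builder enforces the n >= 10 floor at cohort creation, but any downstream breakdown of a cohort (for example, proficiency counts per subgroup in a report) should apply the same suppression rule before publication. A sketch of that check (hypothetical helper, not part of the module above):

```python
MIN_CELL = 10  # same floor the cohort builder enforces

def suppress_small_cells(counts, min_cell=MIN_CELL):
    """Replace any count below min_cell with None so small groups are
    never reported, mirroring the builder's re-identification guard."""
    return {label: (n if n >= min_cell else None) for label, n in counts.items()}
```

For example, `suppress_small_cells({'ELL': 14, 'SPED': 7})` keeps the ELL count and suppresses SPED.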

Cohort Analysis Engine

Type: skill

Core AI/statistics engine that computes cohort aggregate metrics, performs statistical significance testing (t-tests, effect sizes) for cohort-vs-standard and cohort-vs-cohort comparisons, and identifies statistically significant trends. Uses scipy for statistical tests and scikit-learn for growth prediction. Populates the fact_cohort_metrics and fact_cohort_comparisons tables.

Implementation:

File: /opt/cohort-analytics/etl/analysis_engine.py
python
import os
import logging
import argparse
import numpy as np
from scipy import stats
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv('/opt/cohort-analytics/.env')
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger('analysis_engine')

DB_URL = os.getenv('DATABASE_URL')

def compute_cohort_metrics(engine):
    """Compute aggregate metrics for each cohort-subject-season combination."""
    with engine.begin() as conn:
        cohorts = conn.execute(text("""
            SELECT c.cohort_id, c.grade_level, c.school_year
            FROM analytics.dim_cohorts c
            WHERE c.student_count >= 10
        """)).fetchall()
        
        for cohort_id, grade, school_year in cohorts:
            # Get subjects available for this cohort
            subjects = conn.execute(text("""
                SELECT DISTINCT fa.subject, fa.season
                FROM analytics.fact_assessments fa
                JOIN analytics.bridge_cohort_students bcs ON fa.student_id = bcs.student_id
                WHERE bcs.cohort_id = :cid AND fa.school_year = :year
            """), {'cid': cohort_id, 'year': school_year}).fetchall()
            
            for subject, season in subjects:
                # Get individual scores for statistical calculations
                scores_result = conn.execute(text("""
                    SELECT fa.rit_score, fa.scaled_score, fa.percentile_rank, fa.growth_percentile,
                           fa.proficiency_level
                    FROM analytics.fact_assessments fa
                    JOIN analytics.bridge_cohort_students bcs ON fa.student_id = bcs.student_id
                    WHERE bcs.cohort_id = :cid AND fa.subject = :subj AND fa.season = :season
                        AND fa.school_year = :year
                """), {'cid': cohort_id, 'subj': subject, 'season': season, 'year': school_year}).fetchall()
                
                if len(scores_result) < 10:
                    continue
                
                # Use RIT score if available, else scaled_score
                raw_scores = [r[0] if r[0] is not None else r[1] for r in scores_result if (r[0] is not None or r[1] is not None)]
                percentiles = [r[2] for r in scores_result if r[2] is not None]
                growth_pcts = [r[3] for r in scores_result if r[3] is not None]
                proficiency_levels = [r[4] for r in scores_result if r[4] is not None]
                
                if len(raw_scores) < 10:
                    continue
                
                scores_arr = np.array(raw_scores, dtype=float)
                mean_score = float(np.mean(scores_arr))
                median_score = float(np.median(scores_arr))
                std_dev = float(np.std(scores_arr, ddof=1))
                
                # Proficiency percentages
                total_prof = len(proficiency_levels) or 1
                pct_below = sum(1 for p in proficiency_levels if p == 'below') / total_prof * 100
                pct_approaching = sum(1 for p in proficiency_levels if p == 'approaching') / total_prof * 100
                pct_meets = sum(1 for p in proficiency_levels if p == 'meets') / total_prof * 100
                pct_exceeds = sum(1 for p in proficiency_levels if p == 'exceeds') / total_prof * 100
                
                mean_pct_rank = float(np.mean(percentiles)) if percentiles else None
                mean_growth = float(np.mean(growth_pcts)) if growth_pcts else None
                
                # Compare to national standard
                standard_result = conn.execute(text("""
                    SELECT standard_id, percentile_50, std_deviation
                    FROM analytics.dim_standards
                    WHERE subject = :subj AND grade_level = :grade AND season = :season
                        AND standard_type = 'national_norm'
                    ORDER BY source_year DESC LIMIT 1
                """), {'subj': subject, 'grade': grade, 'season': season}).fetchone()
                
                standard_id = None
                effect_size = None
                p_value = None
                
                if standard_result:
                    standard_id = standard_result[0]
                    standard_mean = float(standard_result[1])
                    standard_sd = float(standard_result[2]) if standard_result[2] else std_dev
                    
                    # One-sample t-test: cohort mean vs. national norm
                    t_stat, p_value = stats.ttest_1samp(scores_arr, standard_mean)
                    p_value = float(p_value)
                    
                    # Cohen's d effect size
                    pooled_sd = np.sqrt((std_dev**2 + standard_sd**2) / 2)
                    effect_size = float((mean_score - standard_mean) / pooled_sd) if pooled_sd > 0 else 0.0
                
                conn.execute(text("""
                    INSERT INTO analytics.fact_cohort_metrics
                    (cohort_id, subject, season, school_year, student_count, mean_score, median_score,
                     std_deviation, pct_below_standard, pct_approaching_standard, pct_meets_standard,
                     pct_exceeds_standard, mean_percentile_rank, mean_growth_percentile,
                     comparison_standard_id, effect_size_vs_standard, p_value_vs_standard)
                    VALUES (:cid, :subj, :season, :year, :count, :mean, :median, :std,
                            :below, :approaching, :meets, :exceeds, :pct_rank, :growth,
                            :std_id, :effect, :pval)
                    ON CONFLICT DO NOTHING
                """), {
                    'cid': cohort_id, 'subj': subject, 'season': season, 'year': school_year,
                    'count': len(raw_scores), 'mean': mean_score, 'median': median_score,
                    'std': std_dev, 'below': pct_below, 'approaching': pct_approaching,
                    'meets': pct_meets, 'exceeds': pct_exceeds, 'pct_rank': mean_pct_rank,
                    'growth': mean_growth, 'std_id': standard_id, 'effect': effect_size, 'pval': p_value
                })
        logger.info(f'Computed metrics for {len(cohorts)} cohorts')

def compute_yoy_comparisons(engine):
    """Compare each cohort to the same grade/subject/season from the prior year."""
    with engine.begin() as conn:
        # Get all current year cohort metrics
        current_metrics = conn.execute(text("""
            SELECT cm.cohort_id, c.grade_level, c.school_id, cm.subject, cm.season, c.school_year,
                   cm.mean_score, cm.std_deviation, cm.student_count
            FROM analytics.fact_cohort_metrics cm
            JOIN analytics.dim_cohorts c ON cm.cohort_id = c.cohort_id
            WHERE c.cohort_type = 'grade-year'
        """)).fetchall()
        
        comparisons_made = 0
        for curr in current_metrics:
            cid_a, grade, school_id, subject, season, year_a, mean_a, std_a, n_a = curr
            
            # Determine prior year
            start_year = int(year_a.split('-')[0]) - 1
            year_b = f'{start_year}-{start_year+1}'
            
            # Find matching prior cohort
            prior = conn.execute(text("""
                SELECT cm.cohort_id, cm.mean_score, cm.std_deviation, cm.student_count
                FROM analytics.fact_cohort_metrics cm
                JOIN analytics.dim_cohorts c ON cm.cohort_id = c.cohort_id
                WHERE c.grade_level = :grade AND c.school_id = :sid AND c.school_year = :year_b
                    AND cm.subject = :subj AND cm.season = :season AND c.cohort_type = 'grade-year'
                LIMIT 1
            """), {'grade': grade, 'sid': school_id, 'year_b': year_b, 'subj': subject, 'season': season}).fetchone()
            
            if not prior:
                continue
            
            cid_b, mean_b, std_b, n_b = prior
            mean_diff = float(mean_a) - float(mean_b)
            
            # Welch's t-test for independent samples (different cohorts, same grade)
            se = np.sqrt((float(std_a)**2/float(n_a)) + (float(std_b)**2/float(n_b)))
            if se > 0:
                t_stat = mean_diff / se
                # Welch-Satterthwaite degrees of freedom
                df = ((float(std_a)**2/float(n_a) + float(std_b)**2/float(n_b))**2 /
                      ((float(std_a)**2/float(n_a))**2/(float(n_a)-1) + (float(std_b)**2/float(n_b))**2/(float(n_b)-1)))
                p_val = float(2 * stats.t.sf(abs(t_stat), df))
            else:
                p_val = 1.0
            
            # Cohen's d
            pooled_sd_effect = np.sqrt((float(std_a)**2 + float(std_b)**2) / 2)
            effect = float(mean_diff / pooled_sd_effect) if pooled_sd_effect > 0 else 0.0
            
            is_sig = p_val < 0.05
            direction = 'improvement' if mean_diff > 0 and is_sig else ('decline' if mean_diff < 0 and is_sig else 'no_change')
            
            conn.execute(text("""
                INSERT INTO analytics.fact_cohort_comparisons
                (cohort_a_id, cohort_b_id, subject, season, mean_diff, effect_size, p_value, is_significant, direction)
                VALUES (:a, :b, :subj, :season, :diff, :effect, :pval, :sig, :dir)
                ON CONFLICT DO NOTHING
            """), {'a': cid_a, 'b': cid_b, 'subj': subject, 'season': season,
                   'diff': mean_diff, 'effect': effect, 'pval': p_val, 'sig': is_sig, 'dir': direction})
            comparisons_made += 1
        
        logger.info(f'Computed {comparisons_made} year-over-year comparisons')

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--full-rebuild', action='store_true')
    args = parser.parse_args()
    
    engine = create_engine(DB_URL)
    compute_cohort_metrics(engine)
    compute_yoy_comparisons(engine)
    
    # Refresh materialized views
    with engine.begin() as conn:
        conn.execute(text('SELECT analytics.refresh_materialized_views()'))
    logger.info('Analysis engine complete. Materialized views refreshed.')

if __name__ == '__main__':
    main()
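The hand-rolled Welch computation in compute_yoy_comparisons can be sanity-checked against scipy's summary-statistics form of the same test. A standalone sketch with made-up cohort numbers (RIT-scale values chosen for illustration only):

```python
import numpy as np
from scipy import stats

# Illustrative summary stats, in the shape fact_cohort_metrics stores them
mean_a, std_a, n_a = 215.0, 12.0, 48   # current cohort
mean_b, std_b, n_b = 211.5, 13.0, 52   # prior-year cohort

# Welch's t-test from summary statistics (equal_var=False) -- equivalent
# to the manual Welch-Satterthwaite computation in the engine
t_stat, p_val = stats.ttest_ind_from_stats(mean_a, std_a, n_a,
                                           mean_b, std_b, n_b,
                                           equal_var=False)

# Cohen's d with the same pooled-SD convention the engine uses
pooled_sd = np.sqrt((std_a**2 + std_b**2) / 2)
effect = (mean_a - mean_b) / pooled_sd   # t ~ 1.40, d ~ 0.28 here
```

Agreement between the two t statistics is a useful regression test whenever the manual formula is touched.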

AI Insight Narrative Generator

Type: agent Uses OpenAI GPT-5.4 to generate plain-English narrative insights from the statistical analysis results. Takes cohort metrics, comparisons, and trends as structured input and produces educator-friendly summaries, alerts, and recommendations. Stores generated insights in fact_insights table and can produce full PDF-ready report sections.

Implementation:

File: /opt/cohort-analytics/etl/insight_generator.py
python
import os
import json
import logging
from datetime import datetime
from dotenv import load_dotenv
from openai import OpenAI
from sqlalchemy import create_engine, text

load_dotenv('/opt/cohort-analytics/.env')
logger = logging.getLogger('insight_generator')

DB_URL = os.getenv('DATABASE_URL')
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

SYSTEM_PROMPT = """You are an expert educational data analyst working for a school. 
You translate statistical findings about student cohort performance into clear, actionable 
insights for educators who may not have a statistics background.

Rules:
1. Use plain, professional language — avoid jargon like "effect size" or "p-value"
2. Always frame findings in terms of student impact
3. When performance is below standard, suggest specific instructional strategies
4. When performance is above standard, highlight what's working
5. Flag any equity concerns (subgroup gaps)
6. Be honest about limitations (small sample sizes, missing data)
7. NEVER include individual student names or identifiable information
8. Use phrases like "this group of students" not specific counts under 10
9. Include specific numbers: mean scores, percentages, growth metrics
10. Structure responses with: Summary → Key Findings → Recommendations"""

def generate_cohort_summary(engine, cohort_id):
    """Generate a narrative summary for a single cohort's performance."""
    with engine.connect() as conn:
        metrics = conn.execute(text("""
            SELECT cm.subject, cm.season, cm.student_count, cm.mean_score, cm.median_score,
                   cm.pct_meets_standard, cm.pct_exceeds_standard, cm.pct_below_standard,
                   cm.mean_percentile_rank, cm.mean_growth_percentile,
                   cm.effect_size_vs_standard, cm.p_value_vs_standard,
                   st.standard_name, st.percentile_50 as standard_median,
                   c.cohort_name, c.school_year, c.grade_level
            FROM analytics.fact_cohort_metrics cm
            JOIN analytics.dim_cohorts c ON cm.cohort_id = c.cohort_id
            LEFT JOIN analytics.dim_standards st ON cm.comparison_standard_id = st.standard_id
            WHERE cm.cohort_id = :cid
        """), {'cid': cohort_id}).fetchall()
        
        if not metrics:
            return None
        
        comparisons = conn.execute(text("""
            SELECT cb.cohort_name as prior_cohort, cc.subject, cc.mean_diff,
                   cc.is_significant, cc.direction
            FROM analytics.fact_cohort_comparisons cc
            JOIN analytics.dim_cohorts cb ON cc.cohort_b_id = cb.cohort_id
            WHERE cc.cohort_a_id = :cid
        """), {'cid': cohort_id}).fetchall()
    
    # Build context for GPT-5.4 (positional indices follow the SELECT column order above)
    cohort_name = metrics[0][14]
    context = {
        'cohort_name': cohort_name,
        'school_year': metrics[0][15],
        'grade_level': metrics[0][16],
        'subjects': []
    }
    
    for m in metrics:
        subj_data = {
            'subject': m[0], 'season': m[1], 'student_count': m[2],
            # Use explicit None checks so a legitimate zero value is not dropped
            'mean_score': float(m[3]) if m[3] is not None else None,
            'median_score': float(m[4]) if m[4] is not None else None,
            'pct_proficient': float((m[5] or 0) + (m[6] or 0)),
            'pct_below': float(m[7]) if m[7] is not None else None,
            'mean_national_percentile': float(m[8]) if m[8] is not None else None,
            'mean_growth_percentile': float(m[9]) if m[9] is not None else None,
            'national_standard_median': float(m[13]) if m[13] is not None else None,
            'diff_from_standard': float(m[3] - m[13]) if (m[3] is not None and m[13] is not None) else None,
            'statistically_significant_vs_standard': float(m[11]) < 0.05 if m[11] is not None else None
        }
        context['subjects'].append(subj_data)
    
    context['yoy_comparisons'] = []
    for c in comparisons:
        context['yoy_comparisons'].append({
            'prior_cohort': c[0], 'subject': c[1],
            'score_change': float(c[2]) if c[2] else None,
            'statistically_significant': bool(c[3]),
            'direction': c[4]
        })
    
    user_prompt = f"""Analyze the following cohort performance data and generate a comprehensive insight report.

Cohort Data:
{json.dumps(context, indent=2, default=str)}

Generate:
1. An executive summary (2-3 sentences)
2. Key findings for each subject (bullet points)
3. Year-over-year trend analysis (if comparison data available)
4. Specific, actionable recommendations for educators
5. Any areas of concern or celebration"""
    
    response = client.chat.completions.create(
        model='gpt-5.4',
        messages=[
            {'role': 'system', 'content': SYSTEM_PROMPT},
            {'role': 'user', 'content': user_prompt}
        ],
        temperature=0.3,
        max_tokens=2000
    )
    
    narrative = response.choices[0].message.content
    
    # Store insight (reuse the caller's engine rather than creating a second connection pool)
    with engine.begin() as conn:
        conn.execute(text("""
            INSERT INTO analytics.fact_insights
            (cohort_id, insight_type, severity, title, body, data_context, school_year, season)
            VALUES (:cid, 'summary', 'info', :title, :body, :context, :year, :season)
        """), {
            'cid': cohort_id,
            'title': f'Performance Summary: {cohort_name}',
            'body': narrative,
            'context': json.dumps(context, default=str),
            'year': context['school_year'],
            'season': metrics[0][1]
        })
    
    logger.info(f'Generated insight for cohort {cohort_id}: {cohort_name}')
    return narrative

def generate_alert_insights(engine):
    """Generate critical alerts for cohorts with significant declines or low performance."""
    with engine.connect() as conn:
        alerts = conn.execute(text("""
            SELECT cc.cohort_a_id, ca.cohort_name, cc.subject, cc.mean_diff, cc.effect_size
            FROM analytics.fact_cohort_comparisons cc
            JOIN analytics.dim_cohorts ca ON cc.cohort_a_id = ca.cohort_id
            WHERE cc.direction = 'decline' AND cc.is_significant = TRUE
                AND ABS(cc.effect_size) > 0.3
        """)).fetchall()
    
    for alert in alerts:
        cohort_id, cohort_name, subject, diff, effect = alert
        prompt = f"""ALERT: {cohort_name} shows a statistically significant DECLINE in {subject}.
Score dropped by {abs(float(diff)):.1f} points (effect size: {abs(float(effect)):.2f}).
This is a {'moderate' if abs(float(effect)) < 0.5 else 'large'} negative change.

Generate a brief (3-4 sentence) alert message for school administrators that:
1. States the finding clearly
2. Puts it in context (what this means for students)
3. Suggests immediate next steps"""
        
        response = client.chat.completions.create(
            model='gpt-5.4',
            messages=[
                {'role': 'system', 'content': SYSTEM_PROMPT},
                {'role': 'user', 'content': prompt}
            ],
            temperature=0.2,
            max_tokens=500
        )
        
        body = response.choices[0].message.content
        with engine.begin() as conn:
            conn.execute(text("""
                INSERT INTO analytics.fact_insights
                (cohort_id, insight_type, severity, title, body, data_context, is_active)
                VALUES (:cid, 'alert', 'critical', :title, :body, :ctx, TRUE)
            """), {
                'cid': cohort_id,
                'title': f'DECLINE ALERT: {cohort_name} - {subject}',
                'body': body,
                'ctx': json.dumps({'subject': subject, 'mean_diff': float(diff), 'effect_size': float(effect)})
            })
        logger.info(f'Generated alert for {cohort_name} - {subject}')

def run_all(engine):
    with engine.connect() as conn:
        cohorts = conn.execute(text(
            "SELECT cohort_id FROM analytics.dim_cohorts WHERE student_count >= 10"
        )).fetchall()
    
    for (cid,) in cohorts:
        generate_cohort_summary(engine, cid)
    generate_alert_insights(engine)
    logger.info('All insights generated')

if __name__ == '__main__':
    engine = create_engine(DB_URL)
    run_all(engine)
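Because run_all makes one API call per cohort, a transient rate-limit or network error can abort a nightly run partway through. A simple retry helper — a sketch, with a helper name and backoff constants of our own — could wrap each `client.chat.completions.create` call site:

```python
import time
import logging

logger = logging.getLogger('insight_generator')

def with_retries(fn, attempts=3, base_delay=2.0):
    """Call fn(); on failure, retry with exponential backoff (2s, 4s, ...)."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as e:
            if attempt == attempts:
                raise  # out of attempts: surface the error to the orchestrator
            delay = base_delay * (2 ** (attempt - 1))
            logger.warning(f'Attempt {attempt} failed ({e}); retrying in {delay:.0f}s')
            time.sleep(delay)

# Usage at each call site, e.g.:
# response = with_retries(lambda: client.chat.completions.create(...))
```

Keeping the wrapper generic (catching Exception rather than specific SDK error classes) avoids coupling to a particular OpenAI client version.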

Main ETL Pipeline Orchestrator

Type: workflow Master orchestrator script that runs the complete nightly ETL pipeline: Clever sync → Assessment import → Cohort rebuild → Statistical analysis → AI insight generation → Materialized view refresh → Heartbeat notification. This is the script invoked by the cron job.

Implementation:

File: /opt/cohort-analytics/etl/main_pipeline.py
python
import os
import sys
import logging
import traceback
import requests
from datetime import datetime
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

# Add project root to path
sys.path.insert(0, '/opt/cohort-analytics')
load_dotenv('/opt/cohort-analytics/.env')

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(name)s %(levelname)s %(message)s',
    handlers=[
        logging.FileHandler('/opt/cohort-analytics/logs/etl.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('main_pipeline')

DB_URL = os.getenv('DATABASE_URL')
HEARTBEAT_URL = os.getenv('HEARTBEAT_URL', '')  # Uptime Kuma heartbeat URL
ALERT_WEBHOOK = os.getenv('ALERT_WEBHOOK', '')  # Teams/Slack webhook for failures
SCHOOL_YEAR = os.getenv('CURRENT_SCHOOL_YEAR', '2024-2025')

def send_alert(message):
    if ALERT_WEBHOOK:
        try:
            requests.post(ALERT_WEBHOOK, json={'text': f'🚨 Cohort Analytics ETL Alert: {message}'}, timeout=10)
        except Exception as e:
            logger.warning(f'Failed to deliver alert webhook: {e}')

def main():
    start_time = datetime.now()
    logger.info(f'=== ETL Pipeline Starting: {start_time.isoformat()} ===')
    engine = create_engine(DB_URL)
    
    steps = [
        ('Clever Rostering Sync', lambda: _run_clever(engine)),
        ('Assessment Data Import', lambda: _run_assessments(engine)),
        ('Cohort Builder', lambda: _run_cohorts(engine)),
        ('Analysis Engine', lambda: _run_analysis(engine)),
        ('Insight Generator', lambda: _run_insights(engine)),
        ('Materialized View Refresh', lambda: _refresh_views(engine)),
    ]
    
    failed_steps = []
    for step_name, step_fn in steps:
        try:
            logger.info(f'--- Starting: {step_name} ---')
            step_fn()
            logger.info(f'--- Completed: {step_name} ---')
        except Exception as e:
            logger.error(f'FAILED: {step_name}: {e}')
            logger.error(traceback.format_exc())
            failed_steps.append(step_name)
            # Continue to next step — don't fail the entire pipeline
    
    elapsed = (datetime.now() - start_time).total_seconds()
    
    if failed_steps:
        msg = f'Pipeline completed with errors in: {", ".join(failed_steps)}. Elapsed: {elapsed:.0f}s'
        logger.warning(msg)
        send_alert(msg)
    else:
        logger.info(f'Pipeline completed successfully. Elapsed: {elapsed:.0f}s')
        # Send heartbeat to monitoring
        if HEARTBEAT_URL:
            try:
                requests.get(HEARTBEAT_URL, timeout=10)
            except Exception as e:
                logger.warning(f'Heartbeat ping failed: {e}')

def _run_clever(engine):
    from etl.clever_sync import sync_schools, sync_students, sync_enrollments
    sync_schools(engine)
    sync_students(engine)
    sync_enrollments(engine, SCHOOL_YEAR)

def _run_assessments(engine):
    from etl.assessment_importer import NWEAImporter, CSVImporter
    import glob
    # Try NWEA API first
    if os.getenv('NWEA_CLIENT_ID'):
        nwea = NWEAImporter(engine)
        nwea.authenticate()
        results = nwea.fetch_results()
        if results:
            season = _determine_current_season()
            nwea.load_results(results, SCHOOL_YEAR, season)
    # Process any CSV files in the incoming directory
    os.makedirs('/opt/cohort-analytics/data/processed', exist_ok=True)
    incoming = glob.glob('/opt/cohort-analytics/data/incoming/*.csv')
    for f in incoming:
        csv_imp = CSVImporter(engine, platform='nwea_map')  # platform is hardcoded here; extend to detect from filename if other sources land in this folder
        season = _determine_current_season()
        csv_imp.import_file(f, SCHOOL_YEAR, season)
        os.rename(f, f.replace('/incoming/', '/processed/'))

def _run_cohorts(engine):
    from etl.cohort_builder import build_grade_year_cohorts, build_subgroup_cohorts
    build_grade_year_cohorts(engine)
    build_subgroup_cohorts(engine, SCHOOL_YEAR)

def _run_analysis(engine):
    from etl.analysis_engine import compute_cohort_metrics, compute_yoy_comparisons
    compute_cohort_metrics(engine)
    compute_yoy_comparisons(engine)

def _run_insights(engine):
    from etl.insight_generator import run_all
    run_all(engine)

def _refresh_views(engine):
    with engine.begin() as conn:
        conn.execute(text('SELECT analytics.refresh_materialized_views()'))

def _determine_current_season():
    """Map the current month to a testing season (Aug-Nov: fall; Dec-Feb: winter; Mar-Jul: spring)."""
    month = datetime.now().month
    if month in [8, 9, 10, 11]:
        return 'fall'
    elif month in [12, 1, 2]:
        return 'winter'
    else:
        return 'spring'

if __name__ == '__main__':
    main()
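The orchestrator is invoked by cron. A crontab entry for the nightly run might look like the following — a sketch, where the virtualenv interpreter path and the `analytics` service account are assumptions, not part of the documented deployment:

```shell
# /etc/cron.d/cohort-analytics — run the nightly ETL pipeline at 01:30,
# appending stdout/stderr to a cron-specific log alongside etl.log
30 1 * * * analytics /opt/cohort-analytics/venv/bin/python /opt/cohort-analytics/etl/main_pipeline.py >> /opt/cohort-analytics/logs/cron.log 2>&1
```

Running at 01:30 leaves headroom for the pipeline to finish before the NAS backup window, and a missed Uptime Kuma heartbeat the next morning signals a failed or hung run.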

NWEA National Norms Loader

Type: integration Loads NWEA national norm reference data into the dim_standards table. Uses the publicly available NWEA 2024 Norms Study data to populate median RIT scores, standard deviations, and percentile breakpoints by grade, subject, and season. This data is essential for cohort-vs-standard comparisons.

Implementation:

File: /opt/cohort-analytics/etl/standards_loader.py
python
import os
import logging
import argparse
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv('/opt/cohort-analytics/.env')
logger = logging.getLogger('standards_loader')

DB_URL = os.getenv('DATABASE_URL')

# NWEA 2024 National Norm RIT Score Reference Data
# Source: NWEA 2024 MAP Growth Normative Data (approximate values from published norms)
# Format: (grade, subject, season, p25, p50, p75, mean, std_dev)
NWEA_2024_NORMS = [
    # Math
    ('K', 'math', 'fall', 130, 140, 150, 140.0, 12.0),
    ('K', 'math', 'winter', 143, 153, 163, 153.0, 12.0),
    ('K', 'math', 'spring', 151, 161, 171, 161.0, 12.0),
    ('1', 'math', 'fall', 152, 162, 172, 162.0, 12.5),
    ('1', 'math', 'winter', 163, 173, 183, 173.0, 12.5),
    ('1', 'math', 'spring', 170, 180, 190, 180.0, 12.5),
    ('2', 'math', 'fall', 170, 180, 190, 180.0, 13.0),
    ('2', 'math', 'winter', 180, 190, 200, 190.0, 13.0),
    ('2', 'math', 'spring', 186, 192, 202, 192.0, 13.0),
    ('3', 'math', 'fall', 188, 198, 208, 198.0, 13.5),
    ('3', 'math', 'winter', 196, 204, 214, 204.0, 13.5),
    ('3', 'math', 'spring', 201, 209, 219, 209.0, 13.5),
    ('4', 'math', 'fall', 200, 210, 220, 210.0, 14.0),
    ('4', 'math', 'winter', 208, 216, 226, 216.0, 14.0),
    ('4', 'math', 'spring', 213, 221, 231, 221.0, 14.0),
    ('5', 'math', 'fall', 211, 221, 231, 221.0, 14.5),
    ('5', 'math', 'winter', 218, 227, 237, 227.0, 14.5),
    ('5', 'math', 'spring', 222, 231, 241, 231.0, 14.5),
    ('6', 'math', 'fall', 217, 227, 237, 227.0, 15.0),
    ('6', 'math', 'winter', 222, 232, 242, 232.0, 15.0),
    ('6', 'math', 'spring', 225, 235, 245, 235.0, 15.0),
    ('7', 'math', 'fall', 222, 232, 244, 233.0, 16.0),
    ('7', 'math', 'winter', 227, 237, 249, 238.0, 16.0),
    ('7', 'math', 'spring', 230, 240, 252, 241.0, 16.0),
    ('8', 'math', 'fall', 226, 238, 252, 239.0, 17.0),
    ('8', 'math', 'winter', 230, 242, 256, 243.0, 17.0),
    ('8', 'math', 'spring', 232, 244, 258, 245.0, 17.0),
    # Reading
    ('K', 'reading', 'fall', 128, 141, 152, 141.0, 14.0),
    ('K', 'reading', 'winter', 140, 153, 164, 153.0, 14.0),
    ('K', 'reading', 'spring', 149, 160, 170, 160.0, 14.0),
    ('1', 'reading', 'fall', 153, 163, 175, 164.0, 14.5),
    ('1', 'reading', 'winter', 165, 177, 188, 177.0, 14.5),
    ('1', 'reading', 'spring', 172, 185, 195, 184.0, 14.5),
    ('2', 'reading', 'fall', 174, 186, 197, 186.0, 15.0),
    ('2', 'reading', 'winter', 182, 193, 203, 193.0, 15.0),
    ('2', 'reading', 'spring', 186, 198, 208, 198.0, 15.0),
    ('3', 'reading', 'fall', 188, 199, 210, 199.0, 15.0),
    ('3', 'reading', 'winter', 193, 204, 215, 204.0, 15.0),
    ('3', 'reading', 'spring', 196, 207, 218, 207.0, 15.0),
    ('4', 'reading', 'fall', 196, 207, 218, 207.0, 15.5),
    ('4', 'reading', 'winter', 200, 211, 222, 211.0, 15.5),
    ('4', 'reading', 'spring', 202, 214, 224, 214.0, 15.5),
    ('5', 'reading', 'fall', 202, 214, 224, 214.0, 15.5),
    ('5', 'reading', 'winter', 206, 217, 228, 217.0, 15.5),
    ('5', 'reading', 'spring', 208, 219, 230, 219.0, 15.5),
    ('6', 'reading', 'fall', 207, 218, 229, 218.0, 16.0),
    ('6', 'reading', 'winter', 210, 221, 232, 221.0, 16.0),
    ('6', 'reading', 'spring', 212, 223, 234, 223.0, 16.0),
    ('7', 'reading', 'fall', 211, 222, 233, 222.0, 16.0),
    ('7', 'reading', 'winter', 213, 224, 235, 224.0, 16.0),
    ('7', 'reading', 'spring', 215, 226, 237, 226.0, 16.0),
    ('8', 'reading', 'fall', 214, 226, 237, 226.0, 16.5),
    ('8', 'reading', 'winter', 217, 228, 239, 228.0, 16.5),
    ('8', 'reading', 'spring', 218, 230, 241, 230.0, 16.5),
]

def load_nwea_norms(engine):
    with engine.begin() as conn:
        for grade, subject, season, p25, p50, p75, mean, std in NWEA_2024_NORMS:
            conn.execute(text("""
                INSERT INTO analytics.dim_standards
                (standard_name, standard_type, subject, grade_level, season,
                 metric_name, percentile_25, percentile_50, percentile_75, mean_score, std_deviation, source_year)
                VALUES (:name, 'national_norm', :subj, :grade, :season,
                        'rit_score', :p25, :p50, :p75, :mean, :std, '2024-2025')
                ON CONFLICT DO NOTHING
            """), {'name': f'NWEA 2024 National Norm - Grade {grade} {subject} {season}',
                   'subj': subject, 'grade': grade, 'season': season,
                   'p25': p25, 'p50': p50, 'p75': p75, 'mean': mean, 'std': std})
    logger.info(f'Loaded {len(NWEA_2024_NORMS)} NWEA 2024 norm entries')

def main():
    logging.basicConfig(level=logging.INFO)  # ensure logger output when run standalone
    parser = argparse.ArgumentParser()
    parser.add_argument('--source', default='nwea_norms_2024')
    args = parser.parse_args()
    engine = create_engine(DB_URL)
    if args.source == 'nwea_norms_2024':
        load_nwea_norms(engine)

if __name__ == '__main__':
    main()
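Because the norm table is hand-transcribed, it is worth checking for internal consistency (percentiles in order, medians non-decreasing across seasons within a grade/subject) before loading. A minimal sketch — the `validate_norms` helper is our own addition, not part of the loader:

```python
def validate_norms(norms):
    """Check each entry has p25 <= p50 <= p75 and a positive std_dev, and that
    within each grade/subject the median rises fall -> winter -> spring."""
    season_order = {'fall': 0, 'winter': 1, 'spring': 2}
    medians = {}
    for grade, subject, season, p25, p50, p75, mean, std in norms:
        assert p25 <= p50 <= p75, f'{grade}/{subject}/{season}: percentiles out of order'
        assert std > 0, f'{grade}/{subject}/{season}: non-positive std_dev'
        medians.setdefault((grade, subject), {})[season_order[season]] = p50
    for key, by_season in medians.items():
        vals = [by_season[i] for i in sorted(by_season)]
        assert vals == sorted(vals), f'{key}: median declines across seasons'
    return len(norms)
```

Calling `validate_norms(NWEA_2024_NORMS)` at the top of `load_nwea_norms` would catch transcription errors before they reach dim_standards.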

Cohort Performance Dashboard SQL Queries

Type: prompt

Complete set of SQL queries used in Metabase to power the five core dashboards. Each query is designed to be used as a Metabase 'Native Query' (custom SQL question) with filter variable syntax for interactive filtering.

Implementation:

Dashboard 1: Cohort vs. Standards Overview
sql
-- =============================================
-- DASHBOARD 1: Cohort vs. Standards Overview
-- =============================================
-- Metabase Native Query: Cohort Performance vs National Standards
SELECT
    cohort_name,
    school_year,
    grade_level,
    school_name,
    subject,
    season,
    student_count,
    ROUND(mean_score, 1) AS "Cohort Mean",
    ROUND(standard_median, 1) AS "National Median",
    ROUND(diff_from_standard, 1) AS "Difference",
    ROUND(pct_proficient, 1) AS "% Proficient",
    performance_category AS "Status"
FROM analytics.mv_cohort_vs_standards
WHERE 1=1
  [[AND school_year = {{school_year}}]]
  [[AND grade_level = {{grade_level}}]]
  [[AND subject = {{subject}}]]
  [[AND school_name = {{school}}]]
ORDER BY school_year DESC, grade_level, subject;
Dashboard 2: Year-over-Year Comparison
sql
-- =============================================
-- DASHBOARD 2: Year-over-Year Comparison
-- =============================================
SELECT
    current_cohort,
    prior_cohort,
    grade_level AS "Grade",
    subject AS "Subject",
    season AS "Season",
    ROUND(current_mean, 1) AS "Current Mean",
    ROUND(prior_mean, 1) AS "Prior Year Mean",
    ROUND(mean_diff, 1) AS "Change",
    CASE WHEN is_significant THEN '✅ Yes' ELSE '❌ No' END AS "Statistically Significant",
    INITCAP(direction) AS "Trend",
    narrative AS "AI Analysis"
FROM analytics.mv_yoy_comparison
WHERE 1=1
  [[AND current_year = {{school_year}}]]
  [[AND grade_level = {{grade_level}}]]
  [[AND subject = {{subject}}]]
ORDER BY grade_level, subject;
Dashboard 3: Subgroup Equity Analysis
sql
-- =============================================
-- DASHBOARD 3: Subgroup Equity Analysis
-- =============================================
SELECT
    c.cohort_name,
    c.filter_criteria->>'subgroup' AS "Subgroup",
    c.grade_level AS "Grade",
    cm.subject AS "Subject",
    cm.season AS "Season",
    cm.student_count AS "N",
    ROUND(cm.mean_score, 1) AS "Mean Score",
    ROUND(cm.mean_percentile_rank, 1) AS "Avg Percentile",
    ROUND(cm.pct_meets_standard + cm.pct_exceeds_standard, 1) AS "% Proficient",
    ROUND(overall.mean_score - cm.mean_score, 1) AS "Gap vs All Students"
FROM analytics.fact_cohort_metrics cm
JOIN analytics.dim_cohorts c ON cm.cohort_id = c.cohort_id
LEFT JOIN (
    SELECT cm2.subject, cm2.season, cm2.school_year, c2.grade_level, c2.school_id, cm2.mean_score
    FROM analytics.fact_cohort_metrics cm2
    JOIN analytics.dim_cohorts c2 ON cm2.cohort_id = c2.cohort_id
    WHERE c2.cohort_type = 'grade-year'
) overall ON overall.subject = cm.subject AND overall.season = cm.season
    AND overall.school_year = c.school_year AND overall.grade_level = c.grade_level
    AND overall.school_id = c.school_id
WHERE c.cohort_type = 'demographic'
  [[AND c.school_year = {{school_year}}]]
  [[AND c.grade_level = {{grade_level}}]]
ORDER BY c.grade_level, cm.subject, c.filter_criteria->>'subgroup';
Dashboard 4: Growth Trajectory
sql
-- =============================================
-- DASHBOARD 4: Growth Trajectory
-- =============================================
SELECT
    c.cohort_name,
    c.grade_level,
    cm.subject,
    cm.season,
    cm.school_year,
    ROUND(cm.mean_score, 1) AS "Mean Score",
    ROUND(cm.mean_growth_percentile, 1) AS "Avg Growth Percentile",
    CASE
        WHEN cm.mean_growth_percentile >= 60 THEN '🟢 High Growth'
        WHEN cm.mean_growth_percentile >= 40 THEN '🟡 Typical Growth'
        ELSE '🔴 Low Growth'
    END AS "Growth Category",
    ROUND(cm.mean_percentile_rank, 1) AS "Avg Achievement Percentile"
FROM analytics.fact_cohort_metrics cm
JOIN analytics.dim_cohorts c ON cm.cohort_id = c.cohort_id
WHERE c.cohort_type = 'grade-year'
  [[AND c.school_year = {{school_year}}]]
  [[AND c.grade_level = {{grade_level}}]]
  [[AND cm.subject = {{subject}}]]
ORDER BY c.school_year, c.grade_level, cm.subject,
  CASE cm.season WHEN 'fall' THEN 1 WHEN 'winter' THEN 2 WHEN 'spring' THEN 3 END;
Dashboard 5: AI Insight Feed
sql
-- =============================================
-- DASHBOARD 5: AI Insight Feed
-- =============================================
SELECT
    i.created_at AS "Date",
    CASE i.severity
        WHEN 'critical' THEN '🔴'
        WHEN 'warning' THEN '🟡'
        ELSE '🟢'
    END || ' ' || INITCAP(i.severity) AS "Priority",
    i.title AS "Insight",
    i.body AS "Details",
    c.cohort_name AS "Cohort",
    i.insight_type AS "Type"
FROM analytics.fact_insights i
JOIN analytics.dim_cohorts c ON i.cohort_id = c.cohort_id
WHERE i.is_active = TRUE
  [[AND i.school_year = {{school_year}}]]
  [[AND i.severity = {{severity}}]]
ORDER BY
  CASE i.severity WHEN 'critical' THEN 1 WHEN 'warning' THEN 2 ELSE 3 END,
  i.created_at DESC
LIMIT 50;

Testing & Validation

  • FERPA Compliance Check: Verify that the Metabase 'metabase_reader' database role has SELECT-only permissions — attempt INSERT/UPDATE/DELETE and confirm they fail. Verify row-level security by logging in as a teacher-role user and confirming they can only see their assigned school's data.
  • Clever Sync Validation: Run 'python etl/clever_sync.py --full-sync' and verify that dim_schools, dim_students, and dim_enrollments tables contain the expected row counts matching the SIS. Spot-check 5 random students by comparing warehouse records to the SIS directly.
  • Assessment Import Validation: Import a known test CSV file with 50 pre-calculated records. Query fact_assessments and verify all 50 rows are present with correct RIT scores, percentiles, and grade levels. Confirm duplicate imports are rejected (run the same CSV again and verify no new rows).
  • Cohort Builder Validation: After running cohort_builder.py, verify that: (a) every grade-year combination with 10+ students has a cohort in dim_cohorts, (b) bridge_cohort_students row counts match cohort student_count values, (c) no cohort has fewer than 10 members (FERPA check).
  • Statistical Analysis Validation: Manually calculate the mean and standard deviation for one cohort-subject combination using a spreadsheet. Compare against the values in fact_cohort_metrics — they should match within 0.01. Verify the p-value from a manual one-sample t-test matches the stored value.
  • Year-over-Year Comparison Validation: Identify a grade-subject pair with data for both the current and prior year. Verify that fact_cohort_comparisons contains a row, the mean_diff matches (current_mean - prior_mean), and the direction field correctly reflects improvement/decline/no_change.
  • AI Insight Generation Validation: Run insight_generator.py for one cohort and verify: (a) a row appears in fact_insights, (b) the narrative text is coherent and references the correct cohort name and scores, (c) no individual student names or identifying information appear in the generated text, (d) response latency is under 30 seconds.
  • Dashboard Rendering Test: Open each of the 5 Metabase dashboards and verify: (a) data loads without SQL errors, (b) all filter widgets work (school year, grade, subject, school), (c) charts render correctly with appropriate labels and colors, (d) page load time is under 5 seconds.
  • SSO Integration Test: Log into Metabase via the Azure AD SSO flow with three test accounts (one per role: teacher, school admin, district admin). Verify each account lands in the correct Metabase group and sees only their authorized data.
  • Backup & Recovery Test: Trigger a manual backup, then simulate a disaster by dropping a test table. Restore from the backup and verify the table and its data are fully recovered. Document the recovery time (target: under 30 minutes for full restore).
  • TLS Certificate Test: Navigate to https://analytics.clientdomain.com and verify the TLS certificate is valid (issued by Let's Encrypt, no mixed content warnings). Run 'curl -I https://analytics.clientdomain.com' and verify HSTS header is present.
  • Monitoring Heartbeat Test: Manually run main_pipeline.py and verify the Uptime Kuma heartbeat is received (check the Uptime Kuma dashboard for a green status). Simulate a pipeline failure and verify the alert webhook fires to the MSP's Slack/Teams channel within 5 minutes.
  • End-to-End Smoke Test: Upload a new batch of assessment data (CSV), wait for the nightly pipeline to run (or trigger manually), then verify the new data appears on all dashboards, cohort metrics are recalculated, year-over-year comparisons update, and a new AI insight is generated for the affected cohorts.
  • Performance Load Test: Using the Metabase API, query the cohort_vs_standards dashboard with 5 concurrent users and verify response time remains under 5 seconds. For schools with 2,000+ students, verify the materialized view refresh completes within 60 seconds.
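For the statistical-analysis validation step, the stored one-sample t-test can be reproduced independently of the pipeline with scipy. A sketch — the RIT scores and norm median below are invented for illustration:

```python
import numpy as np
from scipy import stats

# Invented RIT scores for one cohort-subject combination
scores = np.array([198, 204, 211, 195, 207, 202, 199, 210, 205, 201], dtype=float)
standard_median = 198.0  # national norm median for this grade/subject/season

mean = scores.mean()
sd = scores.std(ddof=1)  # sample standard deviation, same as a spreadsheet's STDEV
t_stat, p_val = stats.ttest_1samp(scores, standard_median)

print(f'mean={mean:.2f} sd={sd:.2f} t={t_stat:.3f} p={p_val:.4f}')
```

The printed mean, standard deviation, and p-value should match fact_cohort_metrics within 0.01 for the same cohort; a mismatch points to a query or aggregation bug rather than a statistics library difference.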

Client Handoff

The client handoff should be a structured 2-hour session with the Data Champion and key stakeholders (principal, academic director, lead teachers). Cover the following:

1. Dashboard Training (45 minutes)

  • Walk through each of the 5 dashboards with the client's actual data
  • Demonstrate filter usage (selecting grade levels, subjects, school years)
  • Explain what each metric means in plain English (mean score, percentile rank, proficiency %, growth percentile)
  • Show how to interpret the performance categories (Above/At/Below Standard)
  • Demonstrate the AI Insight Feed and how to read generated narratives
  • Show how to use dashboard subscriptions to receive weekly email reports

2. Data Interpretation Guide (30 minutes)

  • Explain what 'statistically significant' means in practical terms (the difference is unlikely to be random noise, though significance alone does not mean the difference is large)
  • Walk through how to compare the current 3rd grade cohort to last year's 3rd graders
  • Demonstrate the subgroup equity analysis and what achievement gaps look like
  • Show the growth trajectory dashboard and explain the difference between achievement (where they are) and growth (how fast they're moving)

3. Operational Procedures (20 minutes)

  • Who to contact for support (MSP help desk number and email)
  • What happens automatically (nightly data sync, weekly email reports, AI insight generation)
  • What requires manual action (uploading CSV files for assessments not connected via API, adding new assessment windows)
  • Where to upload CSV files if needed (shared folder path or SFTP location)
  • How to request a new cohort grouping or custom report

4. Documentation Package (leave behind)

  • Quick-start guide (2-page PDF) with dashboard URLs, login instructions, and key metric definitions
  • Data dictionary explaining every column and metric in the dashboards
  • FERPA compliance summary showing what data is collected, where it's stored, who has access, and how to request deletion
  • Escalation path: Tier 1 (MSP help desk) → Tier 2 (MSP data engineer) → Tier 3 (MSP solutions architect)
  • Quarterly business review calendar showing when the MSP will present trend analyses to leadership

5. Success Criteria Review (15 minutes)

Maintenance

Monthly Maintenance (2-4 hours/month MSP time)

  • Review ETL pipeline logs for errors or warnings; investigate and resolve any data sync failures
  • Verify backup integrity: restore a test table from the most recent backup monthly
  • Check disk usage on the database server/VM; scale storage if approaching 80% capacity
  • Review Metabase usage analytics: identify unused dashboards and high-demand queries for optimization
  • Update Metabase container to latest patch version (docker compose pull && docker compose up -d)
  • Rotate API credentials for NWEA/Clever if approaching expiration

Quarterly Maintenance (4-8 hours/quarter MSP time)

  • Conduct a Quarterly Business Review (QBR) with the client: present trend analysis, discuss insights, gather feedback
  • Review and update the NWEA national norms if NWEA publishes updated data
  • Add new school year configuration when the academic calendar rolls over (update CURRENT_SCHOOL_YEAR in .env)
  • Review FERPA compliance: audit who has accessed the dashboards via Metabase audit logs
  • Update the OpenAI GPT model if a new version offers better performance (e.g., GPT-5.4 → GPT-5.4 mini for cost savings or next-gen model for quality)
  • Run a performance review of materialized view refresh times and database query plans; add indexes if dashboard latency exceeds 5 seconds

Assessment Window Support (3x/year, 2-4 hours each)

  • After each benchmark testing window (fall, winter, spring), verify new assessment data flows correctly through the pipeline
  • Trigger a full analysis rebuild after each window closes
  • Generate and review the AI insight report with the client's Data Champion before sharing broadly
  • Update dashboards if any new subjects or grade levels were tested

Annual Maintenance (8-16 hours/year MSP time)

  • School year rollover: archive prior year data, create new cohort definitions, update enrollment data
  • Review and renew all software licenses (Metabase Enterprise, NWEA MAP Growth, M365 A3)
  • Conduct a security review: rotate all passwords, review firewall rules, update TLS certificates, patch OS
  • Review state privacy law changes and update DPA if needed (especially COPPA 2025 amendments)
  • Upgrade PostgreSQL if a new major version is available (test upgrade in staging first)
  • Re-evaluate the solution: assess if the client has outgrown the current tier and recommend scaling
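The rollover step hinges on computing the correct academic-year label. A small helper, assuming a July rollover (make the month configurable per client); a label in this shape could feed the CURRENT_SCHOOL_YEAR value referenced in the quarterly checklist:

```python
from datetime import date

def school_year(d: date, rollover_month: int = 7) -> str:
    """Return the academic-year label (e.g., '2025-26') for a date,
    assuming the year rolls over in `rollover_month` (default July)."""
    start = d.year if d.month >= rollover_month else d.year - 1
    return f"{start}-{str(start + 1)[-2:]}"
```

Deriving the label from the date rather than hand-editing it removes the most common rollover mistake: a stale year string silently mislabeling the fall testing window.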

SLA Considerations

  • Dashboard availability target: 99.5% uptime (allows ~3.65 hours of downtime per month, including maintenance windows)
  • Data freshness: assessment data available within 24 hours of test administration
  • AI insight generation: within 48 hours of assessment data import
  • Issue response: P1 (dashboard down) — 1 hour response, 4 hour resolution; P2 (data error) — 4 hour response, 24 hour resolution; P3 (feature request) — acknowledged within 1 business day
  • Escalation: Tier 1 (MSP help desk) → Tier 2 (assigned data engineer) → Tier 3 (solutions architect, engaged for complex data issues or platform changes)
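The uptime figure above follows from simple arithmetic: allowed downtime is (1 − availability) × hours in the period. A quick check, using a ~730-hour average month:

```python
def monthly_downtime_hours(availability: float, hours_per_month: float = 730.0) -> float:
    """Allowed downtime per month for a given availability target."""
    return round((1 - availability) * hours_per_month, 2)
```

Running the same formula at proposal time makes it easy to show a client what moving from 99.5% to 99.9% actually buys before pricing the tighter SLA.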

Alternatives

Turnkey Platform: Otus or Renaissance Schoolzilla

Instead of building a custom analytics stack, deploy a purpose-built education analytics platform like Otus ($3-8/student/year) or Renaissance's Schoolzilla (bundled with Star assessments). These platforms include pre-built cohort comparison dashboards, national norm benchmarking, and compliance features out of the box. The MSP role shifts to implementation, SSO configuration, training, and ongoing support rather than custom development.

Microsoft Power BI Education Analytics

Use Microsoft Power BI for dashboards and Azure SQL Database as the data warehouse, with Azure Data Factory handling ETL. A natural fit for districts already standardized on Microsoft 365 Education and Azure; note that Power BI licensing (Pro or Premium) should be confirmed separately from the M365 A3 bundle.

Google Looker Studio + BigQuery (Free Tier)

Use Google Looker Studio (free) for dashboards, Google BigQuery (free tier: 1TB queries/month, 10GB storage) as the data warehouse, and Google Cloud Functions for ETL. Ideal for Google Workspace schools that want to avoid Microsoft licensing.

LinkIt! Platform for Standards-Aligned Benchmarking

Deploy LinkIt! as an all-in-one benchmark assessment and analytics platform. LinkIt! provides both the assessment content (standards-aligned benchmarks for ELA and Math) and the analytics dashboards with drill-down by district, school, subgroup, teacher, class, student, standard, and question level. Eliminates the need for a separate assessment platform.

Fully On-Premises Air-Gapped Deployment

For maximum data privacy (e.g., working with a tribal school, a military-connected district, or under strict state privacy mandates), deploy the entire stack on-premises with no cloud dependencies. Replace the OpenAI API with a locally hosted LLM (Ollama + Llama 3.1 8B) for narrative generation. All data stays within the school's physical network.
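Swapping in a local model mostly means redirecting the narrative-generation call. A sketch against Ollama's local REST API (`/api/generate` on the default port); the prompt wording and function names are illustrative, not part of the pipeline:

```python
import json
import urllib.request

OLLAMA_URL = "http://localhost:11434/api/generate"  # Ollama's default local endpoint

def build_prompt(cohort: str, summary: str) -> str:
    """Assemble the insight prompt (wording is illustrative)."""
    return (
        f"You are an education data analyst. Write a short, plain-language "
        f"summary of benchmark results for {cohort}. Do not include any "
        f"student names or identifiers.\n\nData summary:\n{summary}"
    )

def generate_insight(cohort: str, summary: str, model: str = "llama3.1:8b") -> str:
    """Call the locally hosted model; no data leaves the building's network."""
    payload = json.dumps({
        "model": model,
        "prompt": build_prompt(cohort, summary),
        "stream": False,  # return one JSON object instead of a token stream
    }).encode()
    req = urllib.request.Request(
        OLLAMA_URL, data=payload, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())["response"]
```

Because the prompt is aggregated data only (no student identifiers), the same call pattern works whether the backend is OpenAI or the air-gapped Ollama host.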
