
Implementation Guide: Flag abnormal lab trends across patient panels and alert care coordinators

Step-by-step implementation guide for deploying AI to flag abnormal lab trends across patient panels and alert care coordinators for healthcare clients.

Hardware Procurement

Integration & Analytics Server

Dell Technologies PowerEdge T360 (Intel Xeon E-2434, 32GB DDR5 ECC, 2x 960GB SSD RAID-1, iDRAC9 Enterprise), Qty: 1

$4,200 MSP cost / $6,500 suggested resale

On-premise integration engine host running Mirth Connect and local FHIR data cache. Receives HL7 messages from lab information systems and EHRs, performs data transformation, and forwards normalized data to cloud analytics. Also serves as local encrypted backup target for PHI data in transit. Required only if client prefers hybrid architecture or has unreliable internet connectivity; cloud-only deployments can skip this.

Next-Generation Firewall

Fortinet FortiGate 60F (FG-60F-BDL-950-12), Qty: 1

$1,400 MSP cost (appliance + 1yr UTP bundle) / $2,800 suggested resale

HIPAA-compliant perimeter security with IDS/IPS, application control, SSL inspection, and VPN for site-to-cloud encrypted tunnel. Replaces or augments existing consumer-grade router. Required for all PHI data flows between practice network and cloud infrastructure.

Encrypted Backup Appliance

Axcient x360Recover BDR - 2TB (Direct-to-Cloud or with local appliance), Qty: 1

$1,800 MSP cost (appliance) + $250/mo service / $3,200 resale + $500/mo service

HIPAA-compliant backup and disaster recovery for on-premise integration server, EHR database snapshots, and configuration data. Provides AES-256 encrypted local and cloud backup with 1-hour RPO. Required for HIPAA Security Rule compliance on systems hosting PHI.

Care Coordinator Workstation Upgrade (if needed)

Dell Technologies OptiPlex 7020 Micro (Intel Core i5-14500T, 16GB DDR5, 512GB NVMe SSD, Windows 11 Pro), Qty: 2

$950 MSP cost per unit / $1,500 suggested resale per unit

Upgraded workstations for care coordinators who will be primary users of the analytics dashboard. Dual-monitor capable for simultaneous EHR and analytics dashboard viewing. Only procure if existing workstations are older than 5 years or have less than 8GB RAM.

Dual Monitor Arms and 24-inch Monitors

Dell Technologies P2425H 24" IPS Monitor + MDA20 Dual Monitor Arm, Qty: 2

$380 MSP cost per set / $600 suggested resale per set

Dual-screen setup for care coordinators to view EHR and lab trend dashboard simultaneously without alt-tabbing. Significantly improves workflow efficiency for alert triage.

Software Procurement

...

Azara DRVS Population Health Platform

Azara Healthcare (a Figment company), DRVS

$1.50–$3.00 per member per year wholesale; suggested resale at $3.00–$5.00 per member per year. For 10,000 patients: $1,250–$2,500/mo MSP cost, $2,500–$4,167/mo resale

Primary analytics platform for population-level lab trend analysis, care gap identification, and care coordinator workflow management. Provides 600+ built-in clinical measures, drill-down from population to patient level, and pre-built dashboards for lab result trending. Handles HEDIS/UDS reporting. Recommended as primary platform for practices in value-based care.

Mirth Connect Integration Engine

NextGen Healthcare, Mirth Connect

Enterprise license: $5,000–$15,000 one-time + $2,000–$5,000/yr maintenance. Open-source (v3.x): free but unsupported. Resale as managed service: $800–$1,500/mo

Healthcare integration engine that receives HL7 v2.x messages (ORU for lab results, ADT for demographics) from EHR and LIS systems, transforms them to FHIR R4 resources, applies filtering and routing logic, and forwards normalized data to the analytics platform and FHIR data store. License type: Perpetual license + annual maintenance (commercial version 4.6+); older open-source versions available under MPL 2.0

Microsoft Azure Health Data Services (FHIR Server)

Microsoft Azure Health Data Services

$0.27/data store/hour (~$194/mo base) + storage at $0.10/GB/mo + API calls. Typical 10-provider practice: $250–$500/mo. Resale at $500–$1,000/mo as managed FHIR service

HIPAA-compliant FHIR R4 data store serving as the canonical clinical data repository. Stores normalized lab results (Observation resources), patient demographics (Patient resources), and practitioner assignments. Provides FHIR API for analytics queries, supports $everything operations for patient-level data export, and enables SMART on FHIR app authorization.

Microsoft Azure Virtual Machine (Integration Host)

Microsoft D4s_v5 (4 vCPU, 16GB RAM), Qty: 1

~$140/mo + managed disk. Resale at $280–$400/mo

Cloud VM hosting Mirth Connect integration engine (if client opts for fully cloud-hosted architecture instead of on-premise server). Also hosts custom Python analytics services for trend detection.

Azure Communication Services (Email/SMS Alerts)

Microsoft Azure Communication Services

Email: $0.00025/message; SMS: $0.0075/message. Typical monthly cost: $15–$50/mo

Sends care coordinator alert notifications via email and SMS when abnormal lab trends are detected. Integrates with custom alerting workflow.

Apache Superset (Self-Hosted BI)

Apache Software Foundation, Open Source (Apache 2.0)

Free software; hosting on Azure VM included above. Configuration labor: $3,000–$8,000 one-time

Open-source business intelligence platform for building custom lab trend dashboards, care coordinator alert queues, and provider performance reports. Connects to PostgreSQL analytical database. Provides row-level security for PHI access control.

PostgreSQL Database

Microsoft Azure Database for PostgreSQL Flexible Server

Burstable B2s (2 vCPU, 4GB): ~$50/mo + storage. Resale at $120–$200/mo

Analytical database that stores denormalized lab result data optimized for trend queries. Receives transformed data from FHIR server via scheduled ETL jobs. Serves as data source for Apache Superset dashboards and custom trend detection algorithms.

Twilio (Backup SMS/Voice Alerts)

Twilio, Usage-based API

SMS: $0.0079/message; Voice: $0.014/min. Monthly estimate: $20–$80

Backup alert delivery channel for critical/urgent lab trend alerts that require escalation beyond email. Provides voice call capability for high-priority alerts (e.g., critical potassium trends) when care coordinator has not acknowledged email/SMS within configurable timeout.

Alternative FHIR Server (Optional)

Self-hosted: free (hosting costs only, ~$200–$400/mo on Azure); Cloud: $2,000/mo production plan. Resale: $400–$800/mo self-hosted managed; $3,000–$4,000/mo cloud managed

Alternative FHIR server with built-in subscription/webhook support, HIPAA+SOC2 compliance, and BAA included in cloud plans. Better developer experience than Azure FHIR for smaller deployments. Includes built-in bot framework for custom automation. License type: Open Source (Apache 2.0) self-hosted; or Cloud hosted at $2,000/mo

Prerequisites

  • Active EHR system with API access enabled — must support at minimum HL7 v2.x ADT/ORU message export; FHIR R4 API strongly preferred. Supported EHRs: athenahealth, eClinicalWorks, Epic, NextGen, Cerner. Verify API licensing fees with EHR vendor before project start.
  • Functioning lab interface already configured between EHR and at least one reference laboratory (Quest, LabCorp, or regional lab). Lab results must be flowing electronically as HL7 ORU messages — practices still receiving lab results via fax/paper cannot participate without first establishing electronic lab interfaces.
  • LOINC-coded lab results in the EHR. Most modern lab interfaces use LOINC codes, but some legacy interfaces use proprietary codes. Run a report of the top 50 lab orders to verify LOINC coding before project start (a verification sketch follows this list). If not LOINC-coded, budget an additional 2–4 weeks for code mapping.
  • Minimum 12 months of historical lab data accessible via EHR API or database export. Trend detection algorithms require historical baseline data to establish patient-specific normal ranges and detect deviations. Practices with less than 6 months of digital lab data will have limited trend detection capability at launch.
  • HIPAA Security Risk Assessment completed within the past 12 months (or budget for one as part of this project). Required by HIPAA Security Rule and essential for documenting compliance posture before introducing new PHI data flows.
  • Dedicated care coordinator role(s) identified — at least one staff member whose job function includes proactive patient outreach, chronic disease management, and care gap closure. This person will be the primary user of the alert system. If no care coordinator exists, the practice must designate this role before go-live.
  • Network infrastructure capable of supporting encrypted site-to-cloud connectivity: minimum 25 Mbps symmetrical internet, business-grade router/firewall (or budget for FortiGate 60F procurement), and static IP address or dynamic DNS for VPN endpoint.
  • Microsoft Azure subscription with HIPAA BAA executed, or AWS account with BAA in place. MSP should use their own Azure CSP tenant and provision client resources under their management. BAA must be signed before any PHI enters the cloud environment.
  • Windows Active Directory or Azure AD (Entra ID) domain for user identity management. Required for RBAC implementation on analytics dashboard and FHIR server. Practices without domain infrastructure need Azure AD P1 licenses provisioned.
  • Written authorization from practice medical director or HIPAA Privacy Officer approving the data flows described in this project. Document the specific PHI data elements being processed (patient demographics, lab results, provider assignments) and the business purpose (quality improvement, care coordination).
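A quick way to run that LOINC verification is to tally the coding systems on recent laboratory results via the EHR's FHIR API. A minimal sketch, assuming the EHR exposes a FHIR R4 Observation search and a bearer token in an EHR_TOKEN environment variable (both names are placeholders):

python
# loinc_audit.py — count coding systems on recent laboratory Observations
# to spot proprietary (non-LOINC) codes before project start.
import os
from collections import Counter
import requests

EHR_FHIR_BASE = os.environ['EHR_FHIR_BASE']  # e.g. https://<ehr-fhir-endpoint>
HEADERS = {'Authorization': f"Bearer {os.environ['EHR_TOKEN']}"}

def audit_lab_coding(max_pages=10):
    systems = Counter()
    url = f'{EHR_FHIR_BASE}/Observation?category=laboratory&_count=200'
    for _ in range(max_pages):
        bundle = requests.get(url, headers=HEADERS, timeout=30).json()
        for entry in bundle.get('entry', []):
            for coding in entry['resource'].get('code', {}).get('coding', []):
                systems[coding.get('system', 'MISSING')] += 1
        # Follow Bundle pagination links
        url = next((l['url'] for l in bundle.get('link', [])
                    if l['relation'] == 'next'), None)
        if not url:
            break
    total = sum(systems.values()) or 1
    for system, n in systems.most_common():
        print(f'{system}: {n} ({n / total:.0%})')  # expect http://loinc.org to dominate

if __name__ == '__main__':
    audit_lab_coding()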

Installation Steps

...

Step 1: Environment Assessment and Documentation

Conduct a thorough assessment of the client's existing IT environment, EHR system, lab interfaces, and network infrastructure. Document all findings in a standardized assessment template. This step prevents costly surprises during integration and establishes the baseline for compliance documentation.

Network assessment commands (run from admin workstation)
bash
# Network assessment commands (run from admin workstation)
nmap -sP 192.168.1.0/24 > network_scan_results.txt
# Test internet bandwidth
speedtest-cli --simple > bandwidth_test.txt
# Verify DNS resolution to Azure endpoints
nslookup azurehealthcareapis.com
nslookup login.microsoftonline.com
# Check existing firewall rules
# (vendor-specific — document current firewall make/model and export config)
# Verify EHR API endpoint accessibility
curl -I https://<ehr-fhir-endpoint>/metadata
Note

Create a formal assessment document including: EHR vendor and version, lab interface vendor and message type (HL7 v2.3 vs v2.5.1), number of active patients, number of providers, current network topology diagram, existing security controls. This document becomes the foundation for the implementation plan and compliance documentation. Allow 2–3 on-site visits for thorough assessment.

Step 2: Azure Environment Provisioning and HIPAA Hardening

Set up the Azure subscription under your MSP CSP tenant, execute the BAA, provision the resource group with appropriate tagging, and apply HIPAA/HITRUST security baseline. All resources must be deployed in a single Azure region with encryption enabled by default.

bash
# Login to Azure CLI
az login

# Create resource group for this client
az group create --name rg-clientname-labtrends --location eastus --tags environment=production compliance=hipaa client=ClientName

# Enable Azure Policy for HIPAA HITRUST baseline
az policy assignment create \
  --name 'hipaa-hitrust-baseline' \
  --display-name 'HIPAA HITRUST Compliance Baseline' \
  --policy-set-definition '/providers/Microsoft.Authorization/policySetDefinitions/a169a624-5599-4385-a696-c8d643089fab' \
  --scope '/subscriptions/<subscription-id>/resourceGroups/rg-clientname-labtrends'

# Create Key Vault for encryption keys and secrets
az keyvault create \
  --name kv-clientname-labtrends \
  --resource-group rg-clientname-labtrends \
  --location eastus \
  --sku premium \
  --enable-purge-protection true \
  --enable-soft-delete true \
  --retention-days 90

# Create Log Analytics workspace for audit logging
az monitor log-analytics workspace create \
  --resource-group rg-clientname-labtrends \
  --workspace-name law-clientname-labtrends \
  --retention-time 365

# Create storage account for FHIR export and backups
az storage account create \
  --name stclientnamelabtrends \
  --resource-group rg-clientname-labtrends \
  --location eastus \
  --sku Standard_GRS \
  --encryption-services blob file \
  --min-tls-version TLS1_2 \
  --allow-blob-public-access false
Note

Ensure the BAA is signed at the Azure subscription level BEFORE provisioning any resources that will contain PHI. Use Azure CSP (Cloud Solution Provider) model so the MSP retains administrative control. Tag all resources consistently for cost tracking and compliance auditing. Enable diagnostic logging on every resource from day one — retroactive log enablement creates compliance gaps.

Step 3: Deploy Azure Health Data Services (FHIR Server)

Provision the FHIR R4 server that will serve as the canonical clinical data repository for all lab results, patient demographics, and care team assignments. Configure SMART on FHIR authentication, enable audit logging, and set up CORS for dashboard access.

bash
# Create Azure Health Data Services workspace
az healthcareapis workspace create \
  --name hw-clientname-labtrends \
  --resource-group rg-clientname-labtrends \
  --location eastus

# Create FHIR service within the workspace
az healthcareapis fhir-service create \
  --name fhir-clientname \
  --workspace-name hw-clientname-labtrends \
  --resource-group rg-clientname-labtrends \
  --kind fhir-R4 \
  --authority https://login.microsoftonline.com/<tenant-id> \
  --audience https://hw-clientname-labtrends-fhir-clientname.fhir.azurehealthcareapis.com \
  --smart-proxy-enabled true

# Enable diagnostic logging to Log Analytics
az monitor diagnostic-settings create \
  --name fhir-diagnostics \
  --resource /subscriptions/<sub-id>/resourceGroups/rg-clientname-labtrends/providers/Microsoft.HealthcareApis/workspaces/hw-clientname-labtrends/fhirservices/fhir-clientname \
  --workspace /subscriptions/<sub-id>/resourceGroups/rg-clientname-labtrends/providers/Microsoft.OperationalInsights/workspaces/law-clientname-labtrends \
  --logs '[{"category":"AuditLogs","enabled":true,"retentionPolicy":{"enabled":true,"days":365}}]'

# Register application for FHIR API access
az ad app create --display-name 'LabTrends-FHIR-Client' --sign-in-audience AzureADMyOrg
# Note the appId from output, then create service principal
az ad sp create --id <appId>
# Create client secret
az ad app credential reset --id <appId> --append
# Assign FHIR Data Contributor role
az role assignment create \
  --assignee <service-principal-id> \
  --role 'FHIR Data Contributor' \
  --scope /subscriptions/<sub-id>/resourceGroups/rg-clientname-labtrends/providers/Microsoft.HealthcareApis/workspaces/hw-clientname-labtrends/fhirservices/fhir-clientname
Note

The FHIR endpoint URL will be: https://hw-clientname-labtrends-fhir-clientname.fhir.azurehealthcareapis.com. Store the client ID and secret in Azure Key Vault immediately — never in code or config files. Enable SMART on FHIR proxy if you plan to integrate third-party SMART apps later. The FHIR Data Contributor role allows read/write/delete — assign FHIR Data Reader role for dashboard-only service accounts.
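As a quick smoke test after provisioning, exchange the service principal credentials for a token and read the capability statement. A minimal sketch: the TENANT_ID/FHIR_CLIENT_ID/FHIR_CLIENT_SECRET variable names are placeholders for the values created above.

python
# fhir_smoke_test.py — verify the service principal can reach the FHIR API.
import os
import requests

FHIR_URL = 'https://hw-clientname-labtrends-fhir-clientname.fhir.azurehealthcareapis.com'

token_resp = requests.post(
    f"https://login.microsoftonline.com/{os.environ['TENANT_ID']}/oauth2/v2.0/token",
    data={
        'grant_type': 'client_credentials',
        'client_id': os.environ['FHIR_CLIENT_ID'],        # appId from az ad app create
        'client_secret': os.environ['FHIR_CLIENT_SECRET'],
        'scope': f'{FHIR_URL}/.default',                  # FHIR audience
    },
    timeout=30,
)
token_resp.raise_for_status()
token = token_resp.json()['access_token']

# The capability statement is the cheapest authenticated round-trip
meta = requests.get(f'{FHIR_URL}/metadata',
                    headers={'Authorization': f'Bearer {token}'}, timeout=30)
meta.raise_for_status()
print(meta.json()['fhirVersion'])  # expect 4.0.1 for FHIR R4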

Step 4: Deploy Azure PostgreSQL for Analytical Database

Provision the PostgreSQL Flexible Server that will store denormalized lab data optimized for trend analysis queries. The FHIR server stores the canonical data; PostgreSQL stores derived analytical views for fast dashboard queries and trend calculations.

Create PostgreSQL Flexible Server, private endpoint, and analytical database schema
bash
# Create PostgreSQL Flexible Server
az postgres flexible-server create \
  --name pg-clientname-labtrends \
  --resource-group rg-clientname-labtrends \
  --location eastus \
  --sku-name Standard_B2s \
  --storage-size 128 \
  --version 16 \
  --admin-user labtrends_admin \
  --admin-password '<STORE-IN-KEYVAULT>' \
  --public-access None \
  --tier Burstable

# Create private endpoint for secure access
az network private-endpoint create \
  --name pe-pg-labtrends \
  --resource-group rg-clientname-labtrends \
  --vnet-name vnet-clientname \
  --subnet subnet-data \
  --private-connection-resource-id /subscriptions/<sub-id>/resourceGroups/rg-clientname-labtrends/providers/Microsoft.DBforPostgreSQL/flexibleServers/pg-clientname-labtrends \
  --group-id postgresqlServer \
  --connection-name pg-labtrends-connection

# Connect and create database schema
psql -h pg-clientname-labtrends.postgres.database.azure.com -U labtrends_admin -d postgres

# SQL: Create analytical database
CREATE DATABASE labtrends_analytics;
\c labtrends_analytics

CREATE SCHEMA lab_data;

CREATE TABLE lab_data.patients (
  patient_id UUID PRIMARY KEY,
  fhir_id VARCHAR(64) NOT NULL,
  mrn VARCHAR(50),
  first_name VARCHAR(100),
  last_name VARCHAR(100),
  date_of_birth DATE,
  sex VARCHAR(10),
  primary_provider_id UUID,
  panel_assignment VARCHAR(100),
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE lab_data.lab_results (
  result_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  patient_id UUID REFERENCES lab_data.patients(patient_id),
  loinc_code VARCHAR(20) NOT NULL,
  loinc_display VARCHAR(255),
  value_numeric DECIMAL(12,4),
  value_string VARCHAR(500),
  unit VARCHAR(50),
  reference_range_low DECIMAL(12,4),
  reference_range_high DECIMAL(12,4),
  interpretation_code VARCHAR(10),
  effective_date TIMESTAMPTZ NOT NULL,
  issued_date TIMESTAMPTZ,
  performing_lab VARCHAR(255),
  ordering_provider_id UUID,
  fhir_observation_id VARCHAR(64) UNIQUE,  -- unique constraint required for the ETL's ON CONFLICT upserts (Step 8)
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_lab_results_patient_date ON lab_data.lab_results(patient_id, effective_date DESC);
CREATE INDEX idx_lab_results_loinc ON lab_data.lab_results(loinc_code);
CREATE INDEX idx_lab_results_effective ON lab_data.lab_results(effective_date);

CREATE TABLE lab_data.trend_alerts (
  alert_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  patient_id UUID REFERENCES lab_data.patients(patient_id),
  loinc_code VARCHAR(20) NOT NULL,
  alert_type VARCHAR(50) NOT NULL,
  severity VARCHAR(20) NOT NULL,
  trend_direction VARCHAR(20),
  trend_slope DECIMAL(12,6),
  data_points_count INTEGER,
  first_result_date TIMESTAMPTZ,
  latest_result_date TIMESTAMPTZ,
  latest_value DECIMAL(12,4),
  message TEXT NOT NULL,
  status VARCHAR(20) DEFAULT 'new',
  assigned_coordinator_id UUID,
  acknowledged_at TIMESTAMPTZ,
  resolved_at TIMESTAMPTZ,
  resolution_notes TEXT,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_alerts_status ON lab_data.trend_alerts(status, severity);
CREATE INDEX idx_alerts_coordinator ON lab_data.trend_alerts(assigned_coordinator_id, status);

CREATE TABLE lab_data.clinical_rules (
  rule_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  loinc_code VARCHAR(20) NOT NULL,
  rule_name VARCHAR(255) NOT NULL,
  rule_type VARCHAR(50) NOT NULL,
  threshold_json JSONB NOT NULL,
  severity VARCHAR(20) NOT NULL,
  is_active BOOLEAN DEFAULT TRUE,
  created_by VARCHAR(100),
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);
Note

Use Private Endpoints — never expose PostgreSQL to public internet. Store the admin password in Azure Key Vault. The Burstable B2s tier is sufficient for practices up to 20,000 patients; upgrade to General Purpose D2s_v4 if query performance degrades. Enable automated backups with 35-day retention (default) and geo-redundant backup storage.

Step 5: Deploy Integration Engine (Mirth Connect)

Install and configure Mirth Connect on either the on-premise Dell PowerEdge T360 or the Azure VM. Configure HL7 v2 listeners for lab result (ORU) messages and ADT messages from the EHR/LIS, and set up FHIR output channels to push normalized data to the Azure FHIR server.

Option A: Azure VM deployment. Option B: On-premise Dell PowerEdge T360 — install Ubuntu Server 22.04 LTS, then follow same steps above after configuring site-to-site VPN to Azure (Step 6).
bash
# Option A: Azure VM deployment
az vm create \
  --resource-group rg-clientname-labtrends \
  --name vm-mirth-labtrends \
  --image Ubuntu2204 \
  --size Standard_D4s_v5 \
  --admin-username mirthadmin \
  --generate-ssh-keys \
  --nsg nsg-mirth \
  --vnet-name vnet-clientname \
  --subnet subnet-integration \
  --public-ip-address ''

# SSH into VM
ssh mirthadmin@<private-ip>

# Install Java 17 (required for Mirth Connect 4.x)
sudo apt update && sudo apt upgrade -y
sudo apt install -y openjdk-17-jre-headless
java -version

# Install Mirth Connect (commercial version — download from NextGen portal with license)
wget https://downloads.mirthcorp.com/connect/4.5.0/mirthconnect-4.5.0-unix.tar.gz
tar -xzf mirthconnect-4.5.0-unix.tar.gz
cd Mirth\ Connect/

# Configure memory allocation
sed -i 's/-Xmx256m/-Xmx4096m/' mcservice.vmoptions
sed -i 's/-Xms256m/-Xms2048m/' mcservice.vmoptions

# Start Mirth Connect
./mcservice start

# Verify it's running
curl -k https://localhost:8443/api/server/status

# Option B: On-premise Dell PowerEdge T360
# Install Ubuntu Server 22.04 LTS, then follow same steps above
# Configure site-to-site VPN to Azure first (Step 6)
Note

Mirth Connect 4.6+ requires a commercial license from NextGen Healthcare. Budget $5,000–$15,000 for the license. For cost-sensitive deployments, Mirth Connect 3.12 (last open-source version) is available but no longer receives security patches. ALWAYS run Mirth behind a firewall with no direct internet exposure. The Mirth Administrator GUI runs on port 8443 (HTTPS) — access only via VPN or SSH tunnel. Never expose Mirth ports (HL7 listeners typically on 6661, 6662, etc.) to the public internet.

Step 6: Configure Network Security and VPN

Establish encrypted connectivity between the practice network and Azure cloud infrastructure. Configure the FortiGate firewall, set up site-to-site VPN, and implement network segmentation to isolate PHI-carrying traffic.

bash
# Azure side: Create VPN Gateway
az network vnet-gateway create \
  --name vpngw-clientname \
  --resource-group rg-clientname-labtrends \
  --vnet vnet-clientname \
  --gateway-type Vpn \
  --vpn-type RouteBased \
  --sku VpnGw1 \
  --generation Generation2

# Create Local Network Gateway (represents on-prem FortiGate)
az network local-gateway create \
  --name lgw-clientname-onprem \
  --resource-group rg-clientname-labtrends \
  --gateway-ip-address <practice-public-ip> \
  --local-address-prefixes 192.168.1.0/24

# Create VPN connection
az network vpn-connection create \
  --name vpnconn-to-practice \
  --resource-group rg-clientname-labtrends \
  --vnet-gateway1 vpngw-clientname \
  --local-gateway2 lgw-clientname-onprem \
  --shared-key '<STORE-IN-KEYVAULT>' \
  --connection-protocol IKEv2

# FortiGate side: Configure via CLI (SSH to FortiGate)
config vpn ipsec phase1-interface
  edit azure-vpn
    set interface port1
    set ike-version 2
    set peertype any
    set net-device disable
    set proposal aes256-sha256
    set remote-gw <azure-vpn-gateway-ip>
    set psksecret <shared-key>
  next
end

config vpn ipsec phase2-interface
  edit azure-vpn-p2
    set phase1name azure-vpn
    set proposal aes256-sha256
    set src-addr-type subnet
    set src-subnet 192.168.1.0/24
    set dst-addr-type subnet
    set dst-subnet 10.0.0.0/16
  next
end

# Create firewall policies for HL7/FHIR traffic only
config firewall policy
  edit 100
    set srcintf internal
    set dstintf azure-vpn
    set srcaddr LAN
    set dstaddr Azure-FHIR-Subnet
    set action accept
    set schedule always
    set service HTTPS CUSTOM-HL7
    set logtraffic all
    set comments 'Lab Trend Analytics - PHI Traffic'
  next
end
Note

VPN Gateway VpnGw1 costs ~$140/month. For smaller deployments, consider Azure VPN P2S (Point-to-Site) at lower cost, but S2S is preferred for always-on integration engine connectivity. If the practice already has a site-to-site VPN to Azure (e.g., for hosted applications), reuse the existing tunnel and add route entries. Test VPN throughput with iperf3 before going live — lab HL7 messages are small but need reliable low-latency delivery.

Step 7: Configure Mirth Connect Integration Channels

Create and configure the Mirth Connect channels that receive HL7 v2 lab messages, parse them, normalize lab codes to LOINC, transform data to FHIR R4 Observation/Patient resources, and POST them to the Azure FHIR server. This is the most technically complex step and requires HL7/FHIR expertise.

Channel 1: HL7 ORU Lab Result Receiver
javascript
// Source Transformer and FHIR Destination

// Channel 1: HL7 ORU Lab Result Receiver
// Source: TCP Listener on port 6661 (HL7 v2 MLLP)
// Destination 1: FHIR Server (HTTP POST)
// Destination 2: PostgreSQL (JDBC)

// In Mirth Connect Administrator, create new channel:
// Name: LAB_RESULTS_INBOUND
// Source Connector: TCP Listener
//   - Port: 6661
//   - Mode: MLLP
//   - Receive Timeout: 30000ms

// Source Transformer (JavaScript):
// Parse ORU^R01 message and extract lab results
var obxSegments = msg['OBX'];
var patientId = msg['PID']['PID.3']['PID.3.1'].toString();
var patientName = msg['PID']['PID.5'];

for (var i = 0; i < obxSegments.length(); i++) {
  var obx = obxSegments[i];
  var loincCode = obx['OBX.3']['OBX.3.1'].toString();
  var loincDisplay = obx['OBX.3']['OBX.3.2'].toString();
  var value = obx['OBX.5']['OBX.5.1'].toString();
  var units = obx['OBX.6']['OBX.6.1'].toString();
  var refRange = obx['OBX.7']['OBX.7.1'].toString();
  var abnormalFlag = obx['OBX.8']['OBX.8.1'].toString();
  var resultDate = obx['OBX.14']['OBX.14.1'].toString();
  
  // Store in channel map for FHIR transformation
  channelMap.put('loinc_' + i, loincCode);
  channelMap.put('loincDisplay_' + i, loincDisplay);  // read back by the FHIR transformer below
  channelMap.put('value_' + i, value);
  channelMap.put('unit_' + i, units);
  channelMap.put('flag_' + i, abnormalFlag);
  channelMap.put('result_date', resultDate);  // simplified: last OBX wins
}
channelMap.put('obx_count', obxSegments.length());

// Destination 1: FHIR Observation POST
// Type: HTTP Sender
// URL: https://hw-clientname-labtrends-fhir-clientname.fhir.azurehealthcareapis.com/Observation
// Method: POST
// Headers: Authorization: Bearer <token>, Content-Type: application/fhir+json

// Destination 1 Transformer - Build FHIR Observation resource:
var observation = {
  resourceType: 'Observation',
  status: 'final',
  category: [{
    coding: [{
      system: 'http://terminology.hl7.org/CodeSystem/observation-category',
      code: 'laboratory',
      display: 'Laboratory'
    }]
  }],
  code: {
    coding: [{
      system: 'http://loinc.org',
      code: channelMap.get('loinc_0'),
      display: channelMap.get('loincDisplay_0')
    }]
  },
  subject: { reference: 'Patient/' + channelMap.get('fhir_patient_id') },
  effectiveDateTime: channelMap.get('result_date'),
  valueQuantity: {
    value: parseFloat(channelMap.get('value_0')),
    unit: channelMap.get('unit_0'),
    system: 'http://unitsofmeasure.org'
  }
};
msg = JSON.stringify(observation);
Note

This is a simplified representation — the actual Mirth channel will be much more complex. Key considerations: (1) Handle LOINC mapping for labs that use proprietary codes — maintain a lookup table in Mirth's Code Template library. (2) Implement patient matching logic — match incoming PID segments to existing FHIR Patient resources using MRN. (3) Handle batch ORU messages with multiple OBX segments. (4) Implement error handling and dead letter queues for failed transformations. (5) Obtain OAuth2 tokens for FHIR server access using client credentials flow with token caching. Budget 40–60 hours of integration engineering time for this step. Consider engaging a Mirth-certified consultant if your team lacks HL7/FHIR transformation experience.

Step 8: Deploy FHIR-to-PostgreSQL ETL Pipeline

Create a scheduled ETL (Extract, Transform, Load) process that queries the FHIR server for new Observation resources and denormalizes them into the PostgreSQL analytical database. This separation allows the FHIR server to remain the canonical data store while PostgreSQL provides fast analytical queries.

ETL pipeline script: etl_fhir_to_postgres.py — deploy as Azure Function or cron job
python
# Deploy as Azure Function (Python) or cron job on integration VM
# File: etl_fhir_to_postgres.py

import requests
import psycopg2
import json
from datetime import datetime, timedelta
import os

FHIR_BASE = os.environ['FHIR_BASE_URL']
PG_CONN = os.environ['PG_CONNECTION_STRING']

def get_fhir_token():
    """OAuth2 client credentials flow against Azure AD for the FHIR audience.
    Env var names TENANT_ID / FHIR_CLIENT_ID / FHIR_CLIENT_SECRET are
    illustrative -- wire them to Key Vault references in production."""
    resp = requests.post(
        f"https://login.microsoftonline.com/{os.environ['TENANT_ID']}/oauth2/v2.0/token",
        data={
            'grant_type': 'client_credentials',
            'client_id': os.environ['FHIR_CLIENT_ID'],
            'client_secret': os.environ['FHIR_CLIENT_SECRET'],
            'scope': f'{FHIR_BASE}/.default',
        },
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()['access_token']

FHIR_TOKEN = get_fhir_token()

def sync_observations(since_hours=1):
    """Fetch recent lab observations from FHIR and insert into PostgreSQL"""
    since = (datetime.utcnow() - timedelta(hours=since_hours)).isoformat() + 'Z'
    
    url = f"{FHIR_BASE}/Observation?category=laboratory&_lastUpdated=gt{since}&_count=100"
    conn = psycopg2.connect(PG_CONN)
    cur = conn.cursor()
    
    while url:
        resp = requests.get(url, headers={'Authorization': f'Bearer {FHIR_TOKEN}'})
        bundle = resp.json()
        
        for entry in bundle.get('entry', []):
            obs = entry['resource']
            patient_ref = obs.get('subject', {}).get('reference', '')
            loinc_code = obs.get('code', {}).get('coding', [{}])[0].get('code', '')
            loinc_display = obs.get('code', {}).get('coding', [{}])[0].get('display', '')
            
            value_numeric = None
            value_string = None
            unit = None
            ref_low = None
            ref_high = None
            
            if 'valueQuantity' in obs:
                value_numeric = obs['valueQuantity'].get('value')
                unit = obs['valueQuantity'].get('unit')
            elif 'valueString' in obs:
                value_string = obs['valueString']
            
            if 'referenceRange' in obs and obs['referenceRange']:
                rr = obs['referenceRange'][0]
                ref_low = rr.get('low', {}).get('value')
                ref_high = rr.get('high', {}).get('value')
            
            interpretation = ''
            if 'interpretation' in obs and obs['interpretation']:
                interpretation = obs['interpretation'][0].get('coding', [{}])[0].get('code', '')
            
            cur.execute("""
                INSERT INTO lab_data.lab_results 
                (patient_id, loinc_code, loinc_display, value_numeric, value_string,
                 unit, reference_range_low, reference_range_high, interpretation_code,
                 effective_date, fhir_observation_id)
                SELECT p.patient_id, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
                FROM lab_data.patients p WHERE p.fhir_id = %s
                ON CONFLICT (fhir_observation_id) DO NOTHING
            """, (loinc_code, loinc_display, value_numeric, value_string,
                  unit, ref_low, ref_high, interpretation,
                  obs.get('effectiveDateTime'), obs.get('id'),
                  patient_ref.replace('Patient/', '')))
        
        # Handle pagination
        url = None
        for link in bundle.get('link', []):
            if link['relation'] == 'next':
                url = link['url']
    
    conn.commit()
    cur.close()
    conn.close()

# Deploy as Azure Function with timer trigger (every 15 minutes)
# function.json:
# {
#   "bindings": [{
#     "name": "timer",
#     "type": "timerTrigger",
#     "direction": "in",
#     "schedule": "0 */15 * * * *"
#   }]
# }
Note

Run the ETL every 15 minutes for near-real-time data freshness. For practices with high lab volume (>200 results/day), consider reducing the interval to every 5 minutes. The UNIQUE constraint on fhir_observation_id (created in the Step 4 schema) is what lets ON CONFLICT skip duplicate inserts. Implement retry logic with exponential backoff for FHIR API failures (a sketch follows below). Monitor ETL execution via Azure Application Insights — set alerts for consecutive failures. For the initial historical data load, run a one-time bulk sync with a wider date range (12+ months) and increase _count to 500.
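A minimal retry sketch for the FHIR reads referenced above (a hypothetical helper; swap it in for the bare requests.get call in sync_observations):

python
# fhir_retry.py — exponential backoff for transient FHIR API failures.
import time
import requests

RETRYABLE = {429, 500, 502, 503, 504}

def fhir_get(url, headers, max_attempts=5, base_delay=2.0):
    """GET with exponential backoff on throttling/5xx and network errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            resp = requests.get(url, headers=headers, timeout=30)
        except requests.RequestException:
            resp = None  # network-level failure: retryable
        if resp is not None and resp.status_code not in RETRYABLE:
            resp.raise_for_status()  # non-retryable 4xx surfaces immediately
            return resp
        if attempt == max_attempts:
            if resp is not None:
                resp.raise_for_status()
            raise RuntimeError(f'FHIR GET failed after {max_attempts} attempts: {url}')
        time.sleep(base_delay * 2 ** (attempt - 1))  # 2s, 4s, 8s, ...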

Step 9: Deploy Trend Detection Analytics Engine

Deploy the core lab trend detection service that analyzes patient lab histories, applies clinical rules, calculates statistical trends (linear regression slopes, moving averages, threshold crossings), and generates alerts for care coordinators. This is the AI/analytics heart of the system.

bash
# Deploy as Azure Function App or standalone Python service
# File: trend_detection_service.py
# See custom_ai_components section for full implementation

# Create Azure Function App
az functionapp create \
  --resource-group rg-clientname-labtrends \
  --consumption-plan-location eastus \
  --runtime python \
  --runtime-version 3.11 \
  --functions-version 4 \
  --name func-labtrends-analytics \
  --storage-account stclientnamelabtrends \
  --os-type Linux

# Set application settings
az functionapp config appsettings set \
  --name func-labtrends-analytics \
  --resource-group rg-clientname-labtrends \
  --settings \
    PG_CONNECTION_STRING='@Microsoft.KeyVault(SecretUri=https://kv-clientname-labtrends.vault.azure.net/secrets/pg-connection-string/)' \
    FHIR_BASE_URL='https://hw-clientname-labtrends-fhir-clientname.fhir.azurehealthcareapis.com' \
    ALERT_EMAIL_ENDPOINT='https://labtrends-comm.communication.azure.com' \
    TWILIO_ACCOUNT_SID='@Microsoft.KeyVault(SecretUri=https://kv-clientname-labtrends.vault.azure.net/secrets/twilio-sid/)' \
    TWILIO_AUTH_TOKEN='@Microsoft.KeyVault(SecretUri=https://kv-clientname-labtrends.vault.azure.net/secrets/twilio-token/)'

# Enable managed identity for Key Vault access
az functionapp identity assign --name func-labtrends-analytics --resource-group rg-clientname-labtrends

# Grant Key Vault access
az keyvault set-policy \
  --name kv-clientname-labtrends \
  --object-id <managed-identity-principal-id> \
  --secret-permissions get list

# Deploy function code
cd labtrends-analytics/
func azure functionapp publish func-labtrends-analytics
Note

The trend detection service runs on a 30-minute timer trigger by default. For urgent critical lab detection (e.g., rapidly deteriorating kidney function), configure Mirth Connect to send a direct webhook to the analytics function for immediate processing of flagged results. The Consumption plan is cost-effective for typical practice volumes (<1000 analyses/day). If processing consistently exceeds 5 minutes per run, upgrade to Azure Functions Premium plan ($175/month) for pre-warmed instances.
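A sketch of the timer-trigger entry point, assuming the TrendDetectionEngine class from the Custom AI Components section lives in a trend_detection_service module (the module path is illustrative):

python
# __init__.py for the timer-triggered function (pairs with a function.json
# timerTrigger binding on a 30-minute schedule: "0 */30 * * * *")
import os
import logging

import azure.functions as func

from trend_detection_service import TrendDetectionEngine  # hypothetical module path

def main(timer: func.TimerRequest) -> None:
    engine = TrendDetectionEngine(os.environ['PG_CONNECTION_STRING'])
    alerts = engine.run_analysis()
    logging.info('Trend analysis complete: %d new alerts', len(alerts))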

Step 10: Deploy Apache Superset Care Coordinator Dashboard

Install and configure Apache Superset as the primary care coordinator interface for viewing lab trend alerts, patient panels, and quality metrics. Configure role-based access control, connect to PostgreSQL analytical database, and create pre-built dashboards.

Deploy Superset on Azure VM using Docker Compose
bash
# Deploy Superset on Azure VM (can share with Mirth VM or separate)
# Using Docker Compose for easier management

# Install Docker on Ubuntu VM
sudo apt-get update
sudo apt-get install -y docker.io docker-compose-v2
sudo usermod -aG docker $USER

# Create Superset directory
mkdir -p /opt/superset && cd /opt/superset

# Create docker-compose.yml
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  superset:
    image: apache/superset:3.1.0
    container_name: superset
    ports:
      - '8088:8088'
    environment:
      - SUPERSET_SECRET_KEY=${SUPERSET_SECRET_KEY}
      - SUPERSET_META_DB_PASS=${SUPERSET_META_DB_PASS}  # superset_config.py reads this at startup
      - SUPERSET_LOAD_EXAMPLES=no
    volumes:
      - ./superset_config.py:/app/pythonpath/superset_config.py
      - superset_home:/app/superset_home
    restart: unless-stopped
  redis:
    image: redis:7-alpine
    container_name: superset_redis
    restart: unless-stopped
    volumes:
      - redis_data:/data
  postgres_meta:
    image: postgres:16-alpine
    container_name: superset_meta_db
    environment:
      - POSTGRES_DB=superset
      - POSTGRES_USER=superset
      - POSTGRES_PASSWORD=${SUPERSET_META_DB_PASS}
    volumes:
      - pg_meta_data:/var/lib/postgresql/data
    restart: unless-stopped
volumes:
  superset_home:
  redis_data:
  pg_meta_data:
EOF

# Create Superset config
cat > superset_config.py << 'PYEOF'
import os
SQLALCHEMY_DATABASE_URI = f"postgresql://superset:{os.environ.get('SUPERSET_META_DB_PASS')}@postgres_meta:5432/superset"
SECRET_KEY = os.environ.get('SUPERSET_SECRET_KEY')
ENABLE_PROXY_FIX = True
ROW_LIMIT = 50000
CACHE_CONFIG = {
    'CACHE_TYPE': 'RedisCache',
    'CACHE_DEFAULT_TIMEOUT': 300,
    'CACHE_KEY_PREFIX': 'superset_',
    'CACHE_REDIS_HOST': 'redis',
    'CACHE_REDIS_PORT': 6379,
}
# HIPAA: Disable telemetry
SCARF_ANALYTICS = False
# Session security
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SECURE = True
WTF_CSRF_ENABLED = True
PYEOF

# Generate secrets (hex for the DB password so the SQLAlchemy URI stays parseable)
export SUPERSET_SECRET_KEY=$(openssl rand -base64 42)
export SUPERSET_META_DB_PASS=$(openssl rand -hex 32)

# Start services
docker compose up -d

# Initialize Superset
docker exec -it superset superset db upgrade
docker exec -it superset superset fab create-admin \
  --username admin \
  --firstname Admin \
  --lastname User \
  --email admin@msp-domain.com \
  --password '<STORE-IN-KEYVAULT>'
docker exec -it superset superset init

# Add PostgreSQL analytical database connection (via Superset UI)
# Navigate to: Settings > Database Connections > + Database
# SQLAlchemy URI: postgresql://labtrends_readonly:<password>@pg-clientname-labtrends.postgres.database.azure.com:5432/labtrends_analytics?sslmode=require
Critical

Place Superset behind an Azure Application Gateway with WAF and TLS termination — never expose port 8088 directly to the internet. Configure SAML/OAuth2 SSO integration with Azure AD for user authentication. Create a read-only PostgreSQL user (labtrends_readonly) for Superset connections — never use the admin account. Enable row-level security in Superset to restrict care coordinators to their assigned patient panels. Pre-build 4 dashboards: (1) Active Alerts Queue, (2) Patient Lab Trend Detail, (3) Provider Panel Overview, (4) Quality Metrics Summary.

Step 11: Configure Alert Notification System

Set up the multi-channel notification system that delivers trend alerts to care coordinators via email, SMS, and optionally EHR in-basket messages. Configure escalation rules for unacknowledged alerts.

bash
# Create Azure Communication Services resource
az communication create \
  --name acs-clientname-labtrends \
  --resource-group rg-clientname-labtrends \
  --data-location unitedstates \
  --location global

# Get connection string
az communication list-key \
  --name acs-clientname-labtrends \
  --resource-group rg-clientname-labtrends

# File: alert_notification.py
import os
from azure.communication.email import EmailClient
from twilio.rest import Client as TwilioClient
from datetime import datetime
import psycopg2

class AlertNotifier:
    def __init__(self):
        self.email_client = EmailClient.from_connection_string(
            os.environ['ACS_CONNECTION_STRING']
        )
        self.twilio_client = TwilioClient(
            os.environ['TWILIO_ACCOUNT_SID'],
            os.environ['TWILIO_AUTH_TOKEN']
        )
        self.pg_conn_str = os.environ['PG_CONNECTION_STRING']
    
    def send_alert(self, alert):
        severity = alert['severity']
        
        # Always send email
        self._send_email(alert)
        
        # SMS for HIGH and CRITICAL
        if severity in ('high', 'critical'):
            self._send_sms(alert)
        
        # Voice call for CRITICAL (unacknowledged after 30 min)
        if severity == 'critical':
            self._schedule_escalation(alert, minutes=30)
    
    def _send_email(self, alert):
        message = {
            'senderAddress': 'LabTrends@<verified-domain>.azurecomm.net',
            'recipients': {
                'to': [{'address': alert['coordinator_email']}]
            },
            'content': {
                'subject': f"[{alert['severity'].upper()}] Lab Trend Alert: {alert['patient_name']}",
                'plainText': self._format_alert_text(alert),
                'html': self._format_alert_html(alert)
            }
        }
        self.email_client.begin_send(message)
    
    def _send_sms(self, alert):
        self.twilio_client.messages.create(
            body=f"Lab Alert [{alert['severity'].upper()}]: {alert['patient_name']} - {alert['message'][:140]}. Review in dashboard.",
            from_=os.environ['TWILIO_PHONE_NUMBER'],
            to=alert['coordinator_phone']
        )
    
    def _format_alert_html(self, alert):
        color = {'critical': '#dc3545', 'high': '#fd7e14', 'medium': '#ffc107', 'low': '#17a2b8'}
        return f"""
        <div style='font-family:Arial; max-width:600px;'>
            <div style='background:{color.get(alert["severity"], "#6c757d")}; color:white; padding:12px; border-radius:4px 4px 0 0;'>
                <h2 style='margin:0;'>Lab Trend Alert - {alert['severity'].upper()}</h2>
            </div>
            <div style='border:1px solid #ddd; padding:16px;'>
                <p><strong>Patient:</strong> {alert['patient_name']} (MRN: {alert['mrn']})</p>
                <p><strong>Lab Test:</strong> {alert['loinc_display']} ({alert['loinc_code']})</p>
                <p><strong>Finding:</strong> {alert['message']}</p>
                <p><strong>Latest Value:</strong> {alert['latest_value']} {alert.get('unit', '')}</p>
                <p><strong>Trend:</strong> {alert['trend_direction']} over {alert['data_points_count']} results</p>
                <p><strong>Date Range:</strong> {alert['first_result_date']} to {alert['latest_result_date']}</p>
                <hr>
                <p><a href='{alert["dashboard_url"]}'>View Full Trend in Dashboard →</a></p>
            </div>
        </div>
        """
Note

Register a custom domain in Azure Communication Services for sending emails (avoid the default azurecomm.net domain which may be flagged as spam). Configure SPF, DKIM, and DMARC records. For practices that want alerts in their EHR in-basket, use the EHR's FHIR Communication resource or Task resource API — this requires additional EHR-specific development. Set up daily digest emails summarizing all open alerts as an additional touchpoint. Test email delivery to common healthcare email providers (Microsoft 365, Google Workspace) to ensure no spam filtering issues.

Step 12: Historical Data Migration and Baseline Establishment

Perform the initial bulk load of historical lab data (minimum 12 months) from the EHR to establish patient baselines for trend detection. This is essential — the system cannot detect trends without historical context.

Historical data migration options (FHIR Bulk Export, database extract, HL7 replay) and patient baseline calculation SQL
bash
# Option 1: FHIR Bulk Data Export (if EHR supports SMART Backend Services)
# Request bulk export of Observation resources
curl -X GET \
  'https://<ehr-fhir-endpoint>/$export?_type=Observation&_typeFilter=Observation%3Fcategory%3Dlaboratory' \
  -H 'Authorization: Bearer <ehr-access-token>' \
  -H 'Accept: application/fhir+json' \
  -H 'Prefer: respond-async'

# Poll the Content-Location URL until export is ready
# Download NDJSON files and import into FHIR server

# Option 2: Database extract (coordinate with EHR vendor)
# Export lab results as CSV from EHR reporting module
# Required columns: PatientMRN, LOINC, Value, Units, ReferenceRange, Date, OrderingProvider

# Option 3: HL7 replay through Mirth
# If the EHR can regenerate historical HL7 ORU messages, replay them through
# the Mirth channel (slower but most accurate transformation)

# After historical load, run baseline calculation:
psql -h pg-clientname-labtrends.postgres.database.azure.com -U labtrends_admin -d labtrends_analytics

-- Calculate patient-specific baselines
CREATE MATERIALIZED VIEW lab_data.patient_baselines AS
SELECT 
    patient_id,
    loinc_code,
    COUNT(*) as result_count,
    AVG(value_numeric) as mean_value,
    STDDEV(value_numeric) as stddev_value,
    MIN(value_numeric) as min_value,
    MAX(value_numeric) as max_value,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value_numeric) as median_value,
    MIN(effective_date) as first_result_date,
    MAX(effective_date) as latest_result_date
FROM lab_data.lab_results
WHERE value_numeric IS NOT NULL
GROUP BY patient_id, loinc_code
HAVING COUNT(*) >= 3;

CREATE UNIQUE INDEX idx_baselines_pk ON lab_data.patient_baselines(patient_id, loinc_code);

-- Refresh this view daily
-- Add to cron: 0 2 * * * psql -c 'REFRESH MATERIALIZED VIEW CONCURRENTLY lab_data.patient_baselines;'
Note

Historical data migration is often the longest phase (2–4 weeks). Work closely with the EHR vendor or practice IT to determine the most efficient extraction method. FHIR Bulk Export is cleanest but not all EHRs support it. Direct database extraction requires a BAA with whoever performs the extract. Validate data quality after migration: check for missing LOINC codes, null values, duplicate records, and date accuracy. A minimum of 3 data points per patient per lab type is needed for meaningful trend detection. Expect 60–80% of patients to meet this threshold for common labs (CBC, CMP, lipids, HbA1c).
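For Option 1, a minimal import sketch for the downloaded NDJSON files, posting them to the Azure FHIR server as batch bundles (file layout and token handling are assumptions; reuse the token helper from Step 8):

python
# import_ndjson.py — load bulk-exported Observation NDJSON into the FHIR server.
import glob
import json
import requests

FHIR_BASE = 'https://hw-clientname-labtrends-fhir-clientname.fhir.azurehealthcareapis.com'
HEADERS = {'Authorization': 'Bearer <token>',
           'Content-Type': 'application/fhir+json'}

def post_batch(resources):
    # FHIR batch bundle: one POST per resource, entries processed independently
    bundle = {
        'resourceType': 'Bundle',
        'type': 'batch',
        'entry': [{'resource': r,
                   'request': {'method': 'POST', 'url': r['resourceType']}}
                  for r in resources],
    }
    requests.post(FHIR_BASE, headers=HEADERS, json=bundle, timeout=120).raise_for_status()

for path in glob.glob('export/Observation*.ndjson'):
    batch = []
    with open(path) as f:
        for line in f:
            batch.append(json.loads(line))
            if len(batch) == 100:   # keep bundles small to avoid timeouts
                post_batch(batch)
                batch = []
    if batch:
        post_batch(batch)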

Step 13: Clinical Rule Configuration and Validation

Work with the practice medical director or clinical informatics consultant to configure the specific lab trend detection rules, thresholds, and alert priorities. This step MUST involve clinical oversight — MSP technicians should NOT define clinical thresholds independently.

Example clinical rules for lab trend detection — requires medical director review and sign-off before activation
sql
-- Insert clinical rules into the rules engine
-- These are EXAMPLE rules — must be validated by the practice's clinical team

psql -h pg-clientname-labtrends.postgres.database.azure.com -U labtrends_admin -d labtrends_analytics

-- Rule: Rising HbA1c trend (diabetes management)
INSERT INTO lab_data.clinical_rules (loinc_code, rule_name, rule_type, threshold_json, severity) VALUES
('4548-4', 'Rising HbA1c Trend', 'trend_slope', '{
  "min_data_points": 3,
  "lookback_months": 18,
  "slope_threshold": 0.3,
  "direction": "increasing",
  "absolute_threshold": 7.0,
  "description": "HbA1c increasing by >=0.3% per measurement interval over 3+ results"
}', 'high');

-- Rule: Declining eGFR (kidney function deterioration)
INSERT INTO lab_data.clinical_rules (loinc_code, rule_name, rule_type, threshold_json, severity) VALUES
('33914-3', 'Declining eGFR', 'trend_slope', '{
  "min_data_points": 3,
  "lookback_months": 24,
  "slope_threshold": -5.0,
  "direction": "decreasing",
  "absolute_threshold": 60,
  "critical_threshold": 30,
  "description": "eGFR declining by >=5 mL/min/1.73m2 per year over 3+ results"
}', 'high');

-- Rule: Rising creatinine
INSERT INTO lab_data.clinical_rules (loinc_code, rule_name, rule_type, threshold_json, severity) VALUES
('2160-0', 'Rising Creatinine', 'trend_slope', '{
  "min_data_points": 3,
  "lookback_months": 12,
  "slope_threshold": 0.3,
  "direction": "increasing",
  "absolute_threshold": 1.3,
  "description": "Serum creatinine rising >=0.3 mg/dL over 3+ results"
}', 'high');

-- Rule: Worsening LDL cholesterol
INSERT INTO lab_data.clinical_rules (loinc_code, rule_name, rule_type, threshold_json, severity) VALUES
('2089-1', 'Worsening LDL Cholesterol', 'trend_slope', '{
  "min_data_points": 2,
  "lookback_months": 24,
  "slope_threshold": 20.0,
  "direction": "increasing",
  "absolute_threshold": 130,
  "description": "LDL increasing by >=20 mg/dL between measurements"
}', 'medium');

-- Rule: Declining hemoglobin (anemia)
INSERT INTO lab_data.clinical_rules (loinc_code, rule_name, rule_type, threshold_json, severity) VALUES
('718-7', 'Declining Hemoglobin', 'trend_slope', '{
  "min_data_points": 2,
  "lookback_months": 12,
  "slope_threshold": -1.5,
  "direction": "decreasing",
  "absolute_threshold_male": 13.0,
  "absolute_threshold_female": 11.5,
  "description": "Hemoglobin declining >=1.5 g/dL over 2+ results"
}', 'medium');

-- Rule: Rising ALT/AST (liver function)
INSERT INTO lab_data.clinical_rules (loinc_code, rule_name, rule_type, threshold_json, severity) VALUES
('1742-6', 'Rising ALT', 'threshold_crossing', '{
  "min_data_points": 2,
  "lookback_months": 12,
  "upper_threshold": 56,
  "consecutive_abnormal": 2,
  "multiplier_critical": 3,
  "description": "ALT above normal range on 2+ consecutive results"
}', 'medium');

-- Rule: Rising TSH (thyroid)
INSERT INTO lab_data.clinical_rules (loinc_code, rule_name, rule_type, threshold_json, severity) VALUES
('3016-3', 'Abnormal TSH Trend', 'threshold_crossing', '{
  "min_data_points": 2,
  "lookback_months": 12,
  "upper_threshold": 4.5,
  "lower_threshold": 0.4,
  "consecutive_abnormal": 2,
  "description": "TSH outside normal range on 2+ consecutive results"
}', 'medium');

-- Rule: Missing follow-up labs (overdue)
INSERT INTO lab_data.clinical_rules (loinc_code, rule_name, rule_type, threshold_json, severity) VALUES
('4548-4', 'Overdue HbA1c for Diabetic', 'missing_followup', '{
  "last_result_older_than_months": 6,
  "condition_filter": "last_value >= 6.5 OR diagnosis_code LIKE E11%",
  "description": "Diabetic patient with no HbA1c in past 6 months"
}', 'medium');
Critical

CRITICAL: These rules are examples only and MUST be reviewed and approved by the practice's medical director before activation. Schedule a 2-hour clinical configuration session with the medical director and lead care coordinator. Prepare a printed rule summary document for sign-off. Start with conservative thresholds (fewer, higher-confidence alerts) and tune downward over 60–90 days based on feedback. Alert fatigue is the #1 reason clinical decision support systems fail — it is far better to start with 5 high-quality rules than 20 noisy ones. Document the clinical rationale for each rule and keep the sign-off document in the project file for compliance purposes.

Step 14: User Account Setup and RBAC Configuration

Create user accounts for all care coordinators, providers, and administrators. Configure role-based access control to ensure minimum necessary access to PHI. Set up Azure AD groups, Superset roles, and FHIR server access policies.

bash
# Create Azure AD security groups
az ad group create --display-name 'LabTrends-CareCoordinators' --mail-nickname 'labtrends-cc'
az ad group create --display-name 'LabTrends-Providers' --mail-nickname 'labtrends-providers'
az ad group create --display-name 'LabTrends-Admins' --mail-nickname 'labtrends-admins'

# Add users to groups
az ad group member add --group 'LabTrends-CareCoordinators' --member-id <user-object-id>

# Assign FHIR roles
# Care coordinators: Read-only access
az role assignment create \
  --assignee-object-id <cc-group-id> \
  --role 'FHIR Data Reader' \
  --scope /subscriptions/<sub-id>/resourceGroups/rg-clientname-labtrends/providers/Microsoft.HealthcareApis/workspaces/hw-clientname-labtrends/fhirservices/fhir-clientname

# Admins: Full access
az role assignment create \
  --assignee-object-id <admin-group-id> \
  --role 'FHIR Data Contributor' \
  --scope /subscriptions/<sub-id>/resourceGroups/rg-clientname-labtrends/providers/Microsoft.HealthcareApis/workspaces/hw-clientname-labtrends/fhirservices/fhir-clientname

# In Superset, create matching roles:
# Role: CareCoordinator - datasource access to lab_data schema, dashboard access to Alert Queue + Patient Trend dashboards
# Role: Provider - read-only access to Provider Panel Overview dashboard
# Role: Admin - full access including SQL Lab

# PostgreSQL: Create read-only user for Superset
psql -h pg-clientname-labtrends.postgres.database.azure.com -U labtrends_admin -d labtrends_analytics
CREATE ROLE labtrends_readonly WITH LOGIN PASSWORD '<store-in-keyvault>';
GRANT USAGE ON SCHEMA lab_data TO labtrends_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA lab_data TO labtrends_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA lab_data GRANT SELECT ON TABLES TO labtrends_readonly;
Note

Enforce MFA for all users accessing the analytics dashboard. Use Azure AD Conditional Access policies to restrict access to managed/compliant devices only. Create a 'break glass' admin account with MFA stored in a physical safe for emergency access. Document all user accounts and roles in the HIPAA Security documentation. Review user access quarterly and remove access for terminated or role-changed employees within 24 hours.

Step 15: Go-Live Preparation and Phased Rollout

Prepare for production launch by running the system in shadow mode (generating alerts but not delivering them) for 2 weeks, reviewing alert accuracy with clinical staff, tuning rules, and then activating notifications in phases.

Phased rollout commands: shadow mode setup, alert review query, and progressive severity enablement
bash
# Phase 1: Shadow Mode (Week 1-2)
# Set all alerts to 'shadow' status — written to database but no notifications sent
psql -h pg-clientname-labtrends.postgres.database.azure.com -U labtrends_admin -d labtrends_analytics
UPDATE lab_data.clinical_rules SET is_active = TRUE;
-- In alert_notification.py, add shadow mode flag:
-- if os.environ.get('SHADOW_MODE') == 'true': log_only, skip notification

# Set shadow mode
az functionapp config appsettings set \
  --name func-labtrends-analytics \
  --resource-group rg-clientname-labtrends \
  --settings SHADOW_MODE=true

# After 2 weeks, review shadow alerts with clinical team
SELECT rule_name, severity, COUNT(*) as alert_count,
  COUNT(DISTINCT patient_id) as patients_affected
FROM lab_data.trend_alerts ta
JOIN lab_data.clinical_rules cr ON ta.loinc_code = cr.loinc_code
WHERE ta.created_at > NOW() - INTERVAL '14 days'
GROUP BY rule_name, severity
ORDER BY alert_count DESC;

# Phase 2: Enable notifications for HIGH/CRITICAL only (Week 3-4)
az functionapp config appsettings set \
  --name func-labtrends-analytics \
  --resource-group rg-clientname-labtrends \
  --settings SHADOW_MODE=false MIN_ALERT_SEVERITY=high

# Phase 3: Full go-live (Week 5+)
az functionapp config appsettings set \
  --name func-labtrends-analytics \
  --resource-group rg-clientname-labtrends \
  --settings MIN_ALERT_SEVERITY=low
Note

The shadow mode period is critical for building clinical trust and reducing alert fatigue. During the review session, present the shadow alerts to the medical director and ask: (1) Are these clinically meaningful? (2) Would you want your care coordinator to act on this? (3) Is the severity level appropriate? Expect to disable or modify 20–40% of initial rules based on this feedback. Target a final alert volume of 5–15 actionable alerts per care coordinator per day. More than 25/day leads to alert fatigue; fewer than 3/day may indicate rules are too conservative.
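A sketch of the SHADOW_MODE / MIN_ALERT_SEVERITY guard referenced in the rollout commands above (wrap it around the notifier from Step 11):

python
# rollout_guard.py — gate alert delivery by shadow mode and severity floor.
import logging
import os

SEVERITY_RANK = {'low': 0, 'medium': 1, 'high': 2, 'critical': 3}

def deliver_alert(notifier, alert):
    floor = os.environ.get('MIN_ALERT_SEVERITY', 'low')
    if SEVERITY_RANK[alert['severity']] < SEVERITY_RANK[floor]:
        return  # below the configured severity floor (Phase 2)
    if os.environ.get('SHADOW_MODE', 'false').lower() == 'true':
        # Phase 1: the alert is already persisted in trend_alerts; log only
        logging.info('SHADOW: suppressed notification for alert %s', alert.get('alert_id'))
        return
    notifier.send_alert(alert)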

Custom AI Components

Lab Trend Detection Engine

Type: agent

Core analytics engine that runs on a scheduled timer (every 30 minutes), queries the PostgreSQL analytical database for patients with new lab results, applies configurable clinical rules to detect abnormal trends, and generates prioritized alerts. Implements three detection strategies: (1) linear regression slope analysis for gradual trends, (2) threshold-crossing detection for values moving out of normal range, (3) missing follow-up detection for overdue labs. The engine uses patient-specific baselines when available and falls back to population reference ranges.

Implementation

Lab Trend Detection Engine — Azure Function (Python)
python
import psycopg2
import psycopg2.extras
import numpy as np
from scipy import stats
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import json
import uuid
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('trend_detection')

class TrendDetectionEngine:
    """
    Lab Trend Detection Engine
    Analyzes patient lab results for abnormal trends and generates care coordinator alerts.
    
    Detection Strategies:
    1. trend_slope: Linear regression on time-series lab values to detect gradual worsening
    2. threshold_crossing: Consecutive abnormal results beyond reference ranges
    3. missing_followup: Patients overdue for expected follow-up labs
    """
    
    def __init__(self, pg_connection_string: str):
        self.pg_conn_str = pg_connection_string
        self.conn = None
    
    def connect(self):
        self.conn = psycopg2.connect(self.pg_conn_str)
        self.conn.autocommit = False
    
    def close(self):
        if self.conn:
            self.conn.close()
    
    def run_analysis(self):
        """Main entry point: run all active rules against patients with recent lab results."""
        self.connect()
        try:
            rules = self._load_active_rules()
            logger.info(f'Loaded {len(rules)} active clinical rules')
            
            new_alerts = []
            for rule in rules:
                if rule['rule_type'] == 'trend_slope':
                    alerts = self._analyze_trend_slope(rule)
                elif rule['rule_type'] == 'threshold_crossing':
                    alerts = self._analyze_threshold_crossing(rule)
                elif rule['rule_type'] == 'missing_followup':
                    alerts = self._analyze_missing_followup(rule)
                else:
                    logger.warning(f"Unknown rule type: {rule['rule_type']}")
                    continue
                
                new_alerts.extend(alerts)
                logger.info(f"Rule '{rule['rule_name']}': {len(alerts)} new alerts")
            
            # Deduplicate: don't re-alert for same patient+loinc within 7 days
            deduplicated = self._deduplicate_alerts(new_alerts)
            
            # Save alerts to database
            self._save_alerts(deduplicated)
            
            self.conn.commit()
            logger.info(f'Analysis complete: {len(deduplicated)} new alerts generated')
            return deduplicated
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f'Analysis failed: {str(e)}')
            raise
        finally:
            self.close()
    
    def _load_active_rules(self) -> List[Dict]:
        cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute("""
            SELECT rule_id, loinc_code, rule_name, rule_type, 
                   threshold_json, severity
            FROM lab_data.clinical_rules 
            WHERE is_active = TRUE
        """)
        rules = cur.fetchall()
        cur.close()
        # Parse threshold_json
        for rule in rules:
            if isinstance(rule['threshold_json'], str):
                rule['threshold_json'] = json.loads(rule['threshold_json'])
        return rules
    
    def _analyze_trend_slope(self, rule: Dict) -> List[Dict]:
        """
        Linear regression analysis on time-series lab values.
        Detects gradual worsening trends (e.g., HbA1c slowly rising over 18 months).
        
        Algorithm:
        1. Query patients with >= min_data_points results for this LOINC code
        2. For each patient, fit OLS linear regression: value = slope * days + intercept
        3. If slope exceeds threshold in the specified direction AND latest value
           exceeds absolute threshold, generate alert
        4. Calculate p-value to filter out statistically insignificant trends
        """
        alerts = []
        thresholds = rule['threshold_json']
        min_points = thresholds.get('min_data_points', 3)
        lookback_months = thresholds.get('lookback_months', 12)
        slope_threshold = thresholds.get('slope_threshold', 0)
        direction = thresholds.get('direction', 'increasing')
        absolute_threshold = thresholds.get('absolute_threshold')
        
        lookback_date = datetime.utcnow() - timedelta(days=lookback_months * 30)
        
        cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        
        # Get patients with sufficient data points for this lab
        cur.execute("""
            SELECT lr.patient_id, p.first_name, p.last_name, p.mrn,
                   array_agg(lr.value_numeric ORDER BY lr.effective_date) as values,
                   array_agg(lr.effective_date ORDER BY lr.effective_date) as dates,
                   array_agg(lr.unit ORDER BY lr.effective_date) as units,
                   COUNT(*) as result_count
            FROM lab_data.lab_results lr
            JOIN lab_data.patients p ON p.patient_id = lr.patient_id
            WHERE lr.loinc_code = %s
              AND lr.value_numeric IS NOT NULL
              AND lr.effective_date >= %s
            GROUP BY lr.patient_id, p.first_name, p.last_name, p.mrn
            HAVING COUNT(*) >= %s
        """, (rule['loinc_code'], lookback_date, min_points))
        
        patients = cur.fetchall()
        cur.close()
        
        for patient in patients:
            values = [float(v) for v in patient['values'] if v is not None]
            dates = patient['dates']
            
            if len(values) < min_points:
                continue
            
            # Convert dates to days since first measurement for regression
            day_offsets = [(d - dates[0]).total_seconds() / 86400.0 for d in dates]
            
            # Perform linear regression
            slope, intercept, r_value, p_value, std_err = stats.linregress(
                day_offsets, values
            )
            
            # Normalize slope to per-month for interpretability
            slope_per_month = slope * 30.44  # avg days per month
            
            # Check if trend meets criteria
            trend_detected = False
            latest_value = values[-1]
            
            if direction == 'increasing':
                # Slope must be positive and exceed threshold
                # AND latest value should be approaching/exceeding clinical threshold
                if slope_per_month > 0 and abs(slope_per_month) >= abs(slope_threshold / (lookback_months / len(values))):
                    if absolute_threshold is None or latest_value >= absolute_threshold * 0.85:
                        trend_detected = True
            elif direction == 'decreasing':
                if slope_per_month < 0 and abs(slope_per_month) >= abs(slope_threshold / (lookback_months / len(values))):
                    if absolute_threshold is None or latest_value <= absolute_threshold * 1.15:
                        trend_detected = True
            
            # Require statistical significance (p < 0.1 for clinical use)
            if trend_detected and p_value < 0.10:
                severity = rule['severity']
                
                # Escalate to critical if crossing critical threshold
                critical_threshold = thresholds.get('critical_threshold')
                if critical_threshold:
                    if (direction == 'decreasing' and latest_value <= critical_threshold) or \
                       (direction == 'increasing' and latest_value >= critical_threshold):
                        severity = 'critical'
                
                alert = {
                    'patient_id': str(patient['patient_id']),
                    'patient_name': f"{patient['first_name']} {patient['last_name']}",
                    'mrn': patient['mrn'],
                    'loinc_code': rule['loinc_code'],
                    'loinc_display': thresholds.get('description', rule['rule_name']),
                    'alert_type': 'trend_slope',
                    'severity': severity,
                    'trend_direction': direction,
                    'trend_slope': round(slope_per_month, 4),
                    'data_points_count': len(values),
                    'first_result_date': dates[0].isoformat(),
                    'latest_result_date': dates[-1].isoformat(),
                    'latest_value': round(latest_value, 2),
                    'unit': patient['units'][-1] if patient['units'] else '',
                    'r_squared': round(r_value**2, 3),
                    'p_value': round(p_value, 4),
                    'message': self._generate_trend_message(
                        rule['rule_name'], patient, values, dates,
                        slope_per_month, direction, latest_value
                    )
                }
                alerts.append(alert)
        
        return alerts
    
    def _analyze_threshold_crossing(self, rule: Dict) -> List[Dict]:
        """
        Detects consecutive results outside reference ranges.
        Useful for labs where the absolute value matters more than the trend
        (e.g., liver enzymes, TSH).
        """
        alerts = []
        thresholds = rule['threshold_json']
        min_points = thresholds.get('min_data_points', 2)
        lookback_months = thresholds.get('lookback_months', 12)
        upper_threshold = thresholds.get('upper_threshold')
        lower_threshold = thresholds.get('lower_threshold')
        consecutive_required = thresholds.get('consecutive_abnormal', 2)
        
        lookback_date = datetime.utcnow() - timedelta(days=lookback_months * 30)
        
        cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute("""
            SELECT lr.patient_id, p.first_name, p.last_name, p.mrn,
                   array_agg(lr.value_numeric ORDER BY lr.effective_date) as values,
                   array_agg(lr.effective_date ORDER BY lr.effective_date) as dates,
                   array_agg(lr.unit ORDER BY lr.effective_date) as units
            FROM lab_data.lab_results lr
            JOIN lab_data.patients p ON p.patient_id = lr.patient_id
            WHERE lr.loinc_code = %s
              AND lr.value_numeric IS NOT NULL
              AND lr.effective_date >= %s
            GROUP BY lr.patient_id, p.first_name, p.last_name, p.mrn
            HAVING COUNT(*) >= %s
        """, (rule['loinc_code'], lookback_date, min_points))
        
        patients = cur.fetchall()
        cur.close()
        
        for patient in patients:
            values = [float(v) for v in patient['values'] if v is not None]
            dates = patient['dates']
            
            # Skip patients with fewer results than the consecutive requirement,
            # otherwise a short history could falsely satisfy the all() checks below
            if len(values) < consecutive_required:
                continue

            # Check the last N values for consecutive abnormal results
            recent_values = values[-consecutive_required:]
            
            consecutive_high = all(
                v > upper_threshold for v in recent_values
            ) if upper_threshold else False
            
            consecutive_low = all(
                v < lower_threshold for v in recent_values
            ) if lower_threshold else False
            
            if consecutive_high or consecutive_low:
                latest_value = values[-1]
                severity = rule['severity']
                
                # Check for critical multiplier
                multiplier = thresholds.get('multiplier_critical')
                if multiplier and upper_threshold and latest_value > upper_threshold * multiplier:
                    severity = 'critical'
                
                direction = 'high' if consecutive_high else 'low'
                threshold_val = upper_threshold if consecutive_high else lower_threshold
                
                alert = {
                    'patient_id': str(patient['patient_id']),
                    'patient_name': f"{patient['first_name']} {patient['last_name']}",
                    'mrn': patient['mrn'],
                    'loinc_code': rule['loinc_code'],
                    'loinc_display': thresholds.get('description', rule['rule_name']),
                    'alert_type': 'threshold_crossing',
                    'severity': severity,
                    'trend_direction': direction,
                    'trend_slope': None,
                    'data_points_count': len(values),
                    'first_result_date': dates[0].isoformat(),
                    'latest_result_date': dates[-1].isoformat(),
                    'latest_value': round(latest_value, 2),
                    'unit': patient['units'][-1] if patient['units'] else '',
                    'message': f"{rule['rule_name']}: {consecutive_required} consecutive results "
                               f"{'above' if consecutive_high else 'below'} threshold "
                               f"({threshold_val}). Latest: {round(latest_value, 2)}."
                }
                alerts.append(alert)
        
        return alerts
    
    def _analyze_missing_followup(self, rule: Dict) -> List[Dict]:
        """
        Identifies patients overdue for follow-up labs based on their last result
        and clinical criteria (e.g., diabetic patient without HbA1c in 6 months).
        """
        alerts = []
        thresholds = rule['threshold_json']
        overdue_months = thresholds.get('last_result_older_than_months', 6)
        
        cutoff_date = datetime.utcnow() - timedelta(days=overdue_months * 30)
        
        cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute("""
            SELECT lr.patient_id, p.first_name, p.last_name, p.mrn,
                   MAX(lr.effective_date) as last_result_date,
                   (array_agg(lr.value_numeric ORDER BY lr.effective_date DESC))[1] as last_value
            FROM lab_data.lab_results lr
            JOIN lab_data.patients p ON p.patient_id = lr.patient_id
            WHERE lr.loinc_code = %s
              AND lr.value_numeric IS NOT NULL
            GROUP BY lr.patient_id, p.first_name, p.last_name, p.mrn
            HAVING MAX(lr.effective_date) < %s
               AND MAX(lr.effective_date) > %s
        """, (rule['loinc_code'], cutoff_date, 
              cutoff_date - timedelta(days=365)))  # Only if they had a result in past 18 months
        
        patients = cur.fetchall()
        cur.close()
        
        for patient in patients:
            last_value = float(patient['last_value']) if patient['last_value'] else None
            days_overdue = (datetime.utcnow().date() - patient['last_result_date'].date()).days
            
            alert = {
                'patient_id': str(patient['patient_id']),
                'patient_name': f"{patient['first_name']} {patient['last_name']}",
                'mrn': patient['mrn'],
                'loinc_code': rule['loinc_code'],
                'loinc_display': thresholds.get('description', rule['rule_name']),
                'alert_type': 'missing_followup',
                'severity': rule['severity'],
                'trend_direction': 'overdue',
                'trend_slope': None,
                'data_points_count': 0,
                'first_result_date': None,
                'latest_result_date': patient['last_result_date'].isoformat(),
                'latest_value': round(last_value, 2) if last_value else None,
                'unit': '',
                'message': f"{rule['rule_name']}: Last result was {days_overdue} days ago "
                           f"(last value: {round(last_value, 2) if last_value else 'N/A'}). "
                           f"Follow-up lab is overdue."
            }
            alerts.append(alert)
        
        return alerts
    
    def _deduplicate_alerts(self, alerts: List[Dict]) -> List[Dict]:
        """Remove alerts that duplicate an existing open alert within the past 7 days."""
        cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        deduplicated = []
        
        for alert in alerts:
            cur.execute("""
                SELECT alert_id FROM lab_data.trend_alerts
                WHERE patient_id = %s::uuid
                  AND loinc_code = %s
                  AND alert_type = %s
                  -- include 'snoozed' so snoozed alerts aren't immediately re-raised
                  AND status IN ('new', 'acknowledged', 'snoozed')
                  AND created_at > NOW() - INTERVAL '7 days'
                LIMIT 1
            """, (alert['patient_id'], alert['loinc_code'], alert['alert_type']))
            
            if cur.fetchone() is None:
                deduplicated.append(alert)
        
        cur.close()
        return deduplicated
    
    def _save_alerts(self, alerts: List[Dict]):
        """Persist new alerts to the database."""
        if not alerts:
            return
        
        cur = self.conn.cursor()
        for alert in alerts:
            cur.execute("""
                INSERT INTO lab_data.trend_alerts
                (patient_id, loinc_code, alert_type, severity, trend_direction,
                 trend_slope, data_points_count, first_result_date, latest_result_date,
                 latest_value, message, status)
                VALUES (%s::uuid, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'new')
                RETURNING alert_id
            """, (
                alert['patient_id'], alert['loinc_code'], alert['alert_type'],
                alert['severity'], alert['trend_direction'], alert.get('trend_slope'),
                alert['data_points_count'],
                alert.get('first_result_date'), alert.get('latest_result_date'),
                alert.get('latest_value'), alert['message']
            ))
            alert['alert_id'] = str(cur.fetchone()[0])
        cur.close()
    
    def _generate_trend_message(self, rule_name, patient, values, dates,
                                 slope_per_month, direction, latest_value) -> str:
        """Generate human-readable trend description for care coordinator."""
        first_val = round(values[0], 2)
        last_val = round(latest_value, 2)
        duration_days = (dates[-1] - dates[0]).days
        duration_months = round(duration_days / 30.44, 1)
        
        direction_word = 'increasing' if direction == 'increasing' else 'decreasing'
        
        return (
            f"{rule_name}: {patient['first_name']} {patient['last_name']} shows "
            f"{direction_word} trend over {duration_months} months "
            f"({len(values)} results). Values moved from {first_val} to {last_val} "
            f"(slope: {round(slope_per_month, 3)}/month). Review recommended."
        )


# Azure Function entry point
def main(timer):
    """Timer-triggered Azure Function that runs trend detection every 30 minutes."""
    pg_conn = os.environ['PG_CONNECTION_STRING']
    shadow_mode = os.environ.get('SHADOW_MODE', 'false').lower() == 'true'
    min_severity = os.environ.get('MIN_ALERT_SEVERITY', 'low')
    
    severity_order = {'low': 0, 'medium': 1, 'high': 2, 'critical': 3}
    min_severity_level = severity_order.get(min_severity, 0)
    
    engine = TrendDetectionEngine(pg_conn)
    alerts = engine.run_analysis()
    
    if not shadow_mode:
        from alert_notification import AlertNotifier
        notifier = AlertNotifier()
        
        for alert in alerts:
            alert_severity_level = severity_order.get(alert['severity'], 0)
            if alert_severity_level >= min_severity_level:
                # Look up care coordinator assignment
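                # NOTE: get_assigned_coordinator is assumed to be implemented
                # alongside this function (e.g., a lookup against a coordinator
                # assignment table keyed by patient_id); it is not shown here.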
                coordinator = get_assigned_coordinator(alert['patient_id'])
                if coordinator:
                    alert['coordinator_email'] = coordinator['email']
                    alert['coordinator_phone'] = coordinator['phone']
                    alert['dashboard_url'] = f"https://dashboard.clientname.com/superset/dashboard/patient-trends/?patient_id={alert['patient_id']}"
                    notifier.send_alert(alert)
    
    logger.info(f'Function completed. {len(alerts)} alerts processed. Shadow mode: {shadow_mode}')


# requirements.txt:
# psycopg2-binary==2.9.9
# numpy==1.26.4
# scipy==1.12.0
# azure-functions==1.19.0
# azure-communication-email==1.0.0
# twilio==9.0.0
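
The engine is driven entirely by rows in lab_data.clinical_rules, so adding or tuning a rule requires no code change. A minimal seeding sketch for an HbA1c rising-trend rule (the threshold values are placeholders; actual values should come from the medical director sign-off described under Client Handoff):

python
# Illustrative rule seed for the trend_slope detector above. Column names
# match _load_active_rules(); threshold keys match _analyze_trend_slope().
import json
import os
import psycopg2

threshold_json = json.dumps({
    'description': 'Hemoglobin A1c',
    'min_data_points': 3,
    'lookback_months': 18,
    'direction': 'increasing',
    'slope_threshold': 0.5,      # see the per-month scaling in _analyze_trend_slope
    'absolute_threshold': 8.0,   # alert only when the latest value nears this
    'critical_threshold': 9.0,   # escalate severity at/above this value
})

conn = psycopg2.connect(os.environ['PG_CONNECTION_STRING'])
with conn, conn.cursor() as cur:
    cur.execute("""
        INSERT INTO lab_data.clinical_rules
            (loinc_code, rule_name, rule_type, severity, threshold_json, is_active)
        VALUES (%s, %s, 'trend_slope', 'high', %s, TRUE)
    """, ('4548-4', 'HbA1c rising trend', threshold_json))
conn.close()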

LOINC Normalization Mapper

Type: integration

Maps proprietary lab codes from EHR/LIS systems to standardized LOINC codes. Many legacy lab interfaces use internal codes that must be translated before trend detection can work across different lab vendors. This component maintains a mapping table and provides fuzzy matching for unmapped codes.

Implementation:

python
import psycopg2
import psycopg2.extras
from difflib import SequenceMatcher
import csv
import logging

logger = logging.getLogger('loinc_mapper')

class LOINCMapper:
    """
    Maps proprietary lab codes to standard LOINC codes.
    
    Usage:
    1. Load the LOINC database subset (common lab tests) into PostgreSQL
    2. Create practice-specific mapping entries for known proprietary codes
    3. Call map_code() during Mirth channel processing for each OBX segment
    4. Unmapped codes are logged for manual review and mapping
    """
    
    # Common lab LOINC codes for medical practices
    COMMON_LOINC_CODES = {
        # Metabolic Panel
        '2345-7': 'Glucose [Mass/volume] in Serum or Plasma',
        '3094-0': 'Urea nitrogen [Mass/volume] in Serum or Plasma',
        '2160-0': 'Creatinine [Mass/volume] in Serum or Plasma',
        '17861-6': 'Calcium [Mass/volume] in Serum or Plasma',
        '2951-2': 'Sodium [Moles/volume] in Serum or Plasma',
        '2823-3': 'Potassium [Moles/volume] in Serum or Plasma',
        '2075-0': 'Chloride [Moles/volume] in Serum or Plasma',
        '1963-8': 'Bicarbonate [Moles/volume] in Serum or Plasma',
        '1751-7': 'Albumin [Mass/volume] in Serum or Plasma',
        '1975-2': 'Bilirubin.total [Mass/volume] in Serum or Plasma',
        '6768-6': 'Alkaline phosphatase [Enzymatic activity/volume] in Serum or Plasma',
        '1742-6': 'Alanine aminotransferase [Enzymatic activity/volume] in Serum or Plasma',
        '1920-8': 'Aspartate aminotransferase [Enzymatic activity/volume] in Serum or Plasma',
        '2885-2': 'Protein [Mass/volume] in Serum or Plasma',
        
        # CBC
        '6690-2': 'Leukocytes [#/volume] in Blood',
        '789-8': 'Erythrocytes [#/volume] in Blood',
        '718-7': 'Hemoglobin [Mass/volume] in Blood',
        '4544-3': 'Hematocrit [Volume Fraction] of Blood',
        '777-3': 'Platelets [#/volume] in Blood',
        '787-2': 'MCV [Entitic volume]',
        '786-4': 'MCHC [Mass/volume]',
        '785-6': 'MCH [Entitic mass]',
        
        # Lipid Panel
        '2093-3': 'Cholesterol [Mass/volume] in Serum or Plasma',
        '2571-8': 'Triglyceride [Mass/volume] in Serum or Plasma',
        '2085-9': 'HDL Cholesterol [Mass/volume] in Serum or Plasma',
        '2089-1': 'LDL Cholesterol [Mass/volume] in Serum or Plasma',
        
        # Diabetes
        '4548-4': 'Hemoglobin A1c/Hemoglobin.total in Blood',
        '14749-6': 'Glucose [Mass/volume] in Serum or Plasma --fasting',
        
        # Thyroid
        '3016-3': 'Thyrotropin [Units/volume] in Serum or Plasma',
        '3053-6': 'Triiodothyronine (T3) Free [Mass/volume] in Serum or Plasma',
        '3024-7': 'Thyroxine (T4) Free [Mass/volume] in Serum or Plasma',
        
        # Kidney
        '33914-3': 'Glomerular filtration rate/1.73 sq M predicted',
        '14959-1': 'Microalbumin [Mass/volume] in Urine',
        '13705-9': 'Albumin/Creatinine [Mass Ratio] in Urine',
        
        # Vitamins/Minerals
        '1989-3': 'Vitamin D [Mass/volume] in Serum or Plasma',
        '2132-9': 'Vitamin B12 [Mass/volume] in Serum or Plasma',
        '2498-4': 'Iron [Mass/volume] in Serum or Plasma',
        '2502-3': 'Ferritin [Mass/volume] in Serum or Plasma',
        
        # Inflammation
        '1988-5': 'C reactive protein [Mass/volume] in Serum or Plasma',
        '4537-7': 'Erythrocyte sedimentation rate',
        
        # Prostate
        '2857-1': 'Prostate specific Ag [Mass/volume] in Serum or Plasma',
        
        # Urinalysis
        '5811-5': 'Specific gravity of Urine',
        '2756-5': 'pH of Urine',
    }
    
    def __init__(self, pg_connection_string: str):
        self.pg_conn_str = pg_connection_string
    
    def initialize_mapping_table(self):
        """Create the mapping table and seed with common LOINC codes."""
        conn = psycopg2.connect(self.pg_conn_str)
        cur = conn.cursor()
        
        cur.execute("""
            CREATE TABLE IF NOT EXISTS lab_data.loinc_mappings (
                mapping_id SERIAL PRIMARY KEY,
                source_system VARCHAR(100) NOT NULL,
                source_code VARCHAR(50) NOT NULL,
                source_display VARCHAR(255),
                loinc_code VARCHAR(20),
                loinc_display VARCHAR(255),
                confidence VARCHAR(20) DEFAULT 'manual',
                is_verified BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMPTZ DEFAULT NOW(),
                verified_by VARCHAR(100),
                UNIQUE(source_system, source_code)
            );
            
            CREATE TABLE IF NOT EXISTS lab_data.unmapped_codes (
                id SERIAL PRIMARY KEY,
                source_system VARCHAR(100),
                source_code VARCHAR(50),
                source_display VARCHAR(255),
                sample_value VARCHAR(100),
                sample_unit VARCHAR(50),
                occurrence_count INTEGER DEFAULT 1,
                first_seen TIMESTAMPTZ DEFAULT NOW(),
                last_seen TIMESTAMPTZ DEFAULT NOW(),
                suggested_loinc VARCHAR(20),
                suggested_confidence DECIMAL(3,2),
                resolved BOOLEAN DEFAULT FALSE,
                UNIQUE(source_system, source_code)
            );
        """)
        
        # Seed LOINC reference data
        for code, display in self.COMMON_LOINC_CODES.items():
            cur.execute("""
                INSERT INTO lab_data.loinc_mappings 
                (source_system, source_code, source_display, loinc_code, loinc_display, confidence, is_verified)
                VALUES ('LOINC', %s, %s, %s, %s, 'canonical', TRUE)
                ON CONFLICT (source_system, source_code) DO NOTHING
            """, (code, display, code, display))
        
        conn.commit()
        cur.close()
        conn.close()
        logger.info(f'Initialized LOINC mapping table with {len(self.COMMON_LOINC_CODES)} codes')
    
    def map_code(self, source_system: str, source_code: str, 
                 source_display: str = None) -> dict:
        """
        Map a proprietary lab code to LOINC.
        Returns: {'loinc_code': str, 'loinc_display': str, 'confidence': str}
        or None if unmapped.
        """
        conn = psycopg2.connect(self.pg_conn_str)
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        
        # First: check if it's already a valid LOINC code
        if source_code in self.COMMON_LOINC_CODES:
            cur.close()
            conn.close()
            return {
                'loinc_code': source_code,
                'loinc_display': self.COMMON_LOINC_CODES[source_code],
                'confidence': 'direct'
            }
        
        # Second: check mapping table
        cur.execute("""
            SELECT loinc_code, loinc_display, confidence
            FROM lab_data.loinc_mappings
            WHERE source_system = %s AND source_code = %s AND is_verified = TRUE
        """, (source_system, source_code))
        
        mapping = cur.fetchone()
        if mapping:
            cur.close()
            conn.close()
            return dict(mapping)
        
        # Third: fuzzy match on display name
        if source_display:
            best_match = None
            best_score = 0
            for loinc_code, loinc_display in self.COMMON_LOINC_CODES.items():
                score = SequenceMatcher(None, 
                    source_display.lower(), 
                    loinc_display.lower()
                ).ratio()
                if score > best_score:
                    best_score = score
                    best_match = (loinc_code, loinc_display)
            
            if best_score >= 0.7:
                # Log as a suggested mapping for human review, then return early
                # so the plain insert below doesn't bump occurrence_count twice
                cur.execute("""
                    INSERT INTO lab_data.unmapped_codes 
                    (source_system, source_code, source_display, suggested_loinc, suggested_confidence)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (source_system, source_code) 
                    DO UPDATE SET occurrence_count = lab_data.unmapped_codes.occurrence_count + 1,
                                 last_seen = NOW(),
                                 suggested_loinc = EXCLUDED.suggested_loinc,
                                 suggested_confidence = EXCLUDED.suggested_confidence
                """, (source_system, source_code, source_display, best_match[0], round(best_score, 2)))
                conn.commit()
                cur.close()
                conn.close()
                return None
        
        # Log unmapped code (no fuzzy suggestion found)
        cur.execute("""
            INSERT INTO lab_data.unmapped_codes 
            (source_system, source_code, source_display)
            VALUES (%s, %s, %s)
            ON CONFLICT (source_system, source_code) 
            DO UPDATE SET occurrence_count = lab_data.unmapped_codes.occurrence_count + 1,
                         last_seen = NOW()
        """, (source_system, source_code, source_display))
        conn.commit()
        
        cur.close()
        conn.close()
        return None
    
    def get_unmapped_codes_report(self) -> list:
        """Generate report of unmapped codes for clinical review."""
        conn = psycopg2.connect(self.pg_conn_str)
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute("""
            SELECT source_system, source_code, source_display, 
                   occurrence_count, suggested_loinc, suggested_confidence,
                   first_seen, last_seen
            FROM lab_data.unmapped_codes
            WHERE resolved = FALSE
            ORDER BY occurrence_count DESC
        """)
        results = cur.fetchall()
        cur.close()
        conn.close()
        return results
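
A usage sketch, as it would be called from a Mirth transformer step or a small sidecar script (the source-system name and codes are placeholders):

python
# Sketch: map an incoming OBX code before writing the result downstream.
import os

mapper = LOINCMapper(os.environ['PG_CONNECTION_STRING'])
mapper.initialize_mapping_table()  # one-time setup per environment

mapped = mapper.map_code(
    source_system='QUEST_LIS',       # placeholder source-system name
    source_code='GLU-SER',           # placeholder proprietary code
    source_display='Glucose, Serum'
)
if mapped:
    print(f"Use LOINC {mapped['loinc_code']} (confidence: {mapped['confidence']})")
else:
    # Unmapped: the code was logged to lab_data.unmapped_codes for review
    print('No mapping yet; review get_unmapped_codes_report() output')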

Care Coordinator Alert Dashboard (Superset Configuration)

Type: workflow

Pre-built Apache Superset dashboard configuration for care coordinators to view, acknowledge, and manage lab trend alerts. Includes four dashboard tabs: Active Alerts Queue, Patient Trend Detail, Provider Panel Overview, and Quality Metrics.

Implementation:

Superset Dashboard Configuration
yaml
# Superset Dashboard Configuration
# Import via Superset CLI: superset import-dashboards -p dashboard_export.json
# Below is the specification for manual dashboard creation:

---
dashboard_name: "Lab Trend Alerts - Care Coordinator"
refresh_interval: 300  # 5 minutes

database_connection:
  name: "LabTrends Analytics"
  sqlalchemy_uri: "postgresql://labtrends_readonly:{password}@pg-clientname-labtrends.postgres.database.azure.com:5432/labtrends_analytics?sslmode=require"
  extra:
    allows_virtual_table_explore: true

datasets:
  - name: "active_alerts"
    sql: |
      SELECT 
        ta.alert_id,
        ta.created_at as alert_date,
        p.first_name || ' ' || p.last_name as patient_name,
        p.mrn,
        ta.loinc_code,
        cr.rule_name,
        ta.severity,
        ta.alert_type,
        ta.trend_direction,
        ta.latest_value,
        ta.message,
        ta.status,
        ta.data_points_count,
        ta.first_result_date,
        ta.latest_result_date,
        EXTRACT(EPOCH FROM (NOW() - ta.created_at))/3600 as hours_since_alert,
        ta.acknowledged_at,
        ta.resolved_at,
        ta.resolution_notes
      FROM lab_data.trend_alerts ta
      JOIN lab_data.patients p ON p.patient_id = ta.patient_id
      LEFT JOIN lab_data.clinical_rules cr ON cr.loinc_code = ta.loinc_code AND cr.rule_type = ta.alert_type

  - name: "patient_lab_trends"
    sql: |
      SELECT
        p.patient_id,
        p.first_name || ' ' || p.last_name as patient_name,
        p.mrn,
        lr.loinc_code,
        lr.loinc_display as lab_test,
        lr.value_numeric,
        lr.unit,
        lr.reference_range_low,
        lr.reference_range_high,
        lr.interpretation_code,
        lr.effective_date,
        lr.performing_lab
      FROM lab_data.lab_results lr
      JOIN lab_data.patients p ON p.patient_id = lr.patient_id
      WHERE lr.value_numeric IS NOT NULL

  - name: "alert_summary_metrics"
    sql: |
      SELECT
        DATE_TRUNC('week', created_at) as week,
        severity,
        alert_type,
        status,
        COUNT(*) as alert_count,
        COUNT(DISTINCT patient_id) as patients_affected,
        AVG(EXTRACT(EPOCH FROM (COALESCE(acknowledged_at, NOW()) - created_at))/3600) as avg_hours_to_acknowledge
      FROM lab_data.trend_alerts
      WHERE created_at > NOW() - INTERVAL '90 days'
      GROUP BY DATE_TRUNC('week', created_at), severity, alert_type, status

charts:
  - name: "Active Alerts Queue"
    chart_type: "table"
    dataset: "active_alerts"
    description: "Primary work queue for care coordinators"
    filters:
      - column: status
        values: ["new", "acknowledged"]
    columns:
      - severity  # Color-coded: critical=red, high=orange, medium=yellow, low=blue
      - patient_name
      - mrn
      - rule_name
      - message
      - latest_value
      - alert_date
      - hours_since_alert
      - status
    sort:
      - column: severity
        order: "desc"  # critical first
      - column: alert_date
        order: "desc"
    row_limit: 100

  - name: "Alerts by Severity (This Week)"
    chart_type: "pie"
    dataset: "active_alerts"
    filters:
      - column: status
        values: ["new"]
      - column: alert_date
        operator: ">="
        value: "{{ from_dttm }}"  # This week
    metric: "COUNT(*)"
    group_by: "severity"
    color_scheme: "supersetColors"

  - name: "Alert Volume Trend"
    chart_type: "line"
    dataset: "alert_summary_metrics"
    x_axis: "week"
    metrics:
      - "SUM(alert_count) as total_alerts"
      - "SUM(patients_affected) as unique_patients"
    group_by: "severity"

  - name: "Patient Lab Trend Chart"
    chart_type: "line"
    dataset: "patient_lab_trends"
    description: "Time-series chart for individual patient lab values"
    x_axis: "effective_date"
    y_axis: "value_numeric"
    filters:
      - column: patient_id
        type: "filter_box"  # Dropdown for patient selection
      - column: loinc_code
        type: "filter_box"
    annotation_layers:
      - name: "Reference Range"
        type: "formula"
        formulas:
          - "reference_range_high"  # Upper reference line
          - "reference_range_low"   # Lower reference line

  - name: "Mean Time to Acknowledge"
    chart_type: "big_number"
    dataset: "alert_summary_metrics"
    metric: "AVG(avg_hours_to_acknowledge)"
    subheader: "hours"
    filters:
      - column: week
        operator: ">="
        value: "{{ from_dttm }}"

  - name: "Open Critical Alerts Count"
    chart_type: "big_number"
    dataset: "active_alerts"
    metric: "COUNT(*)"
    filters:
      - column: status
        values: ["new"]
      - column: severity
        values: ["critical"]
    subheader: "critical alerts unresolved"
    color: "#dc3545"

dashboard_layout:
  tabs:
    - name: "Alert Queue"
      charts:
        - "Open Critical Alerts Count"
        - "Alerts by Severity (This Week)"
        - "Active Alerts Queue"
    - name: "Patient Detail"
      charts:
        - "Patient Lab Trend Chart"  # With filter boxes for patient/lab selection
    - name: "Metrics"
      charts:
        - "Alert Volume Trend"
        - "Mean Time to Acknowledge"
    - name: "Provider Panel"
      charts:
        - "Alerts by Provider"  # Additional chart grouping alerts by primary_provider_id

roles:
  care_coordinator:
    permissions:
      - datasource_access: ["active_alerts", "patient_lab_trends", "alert_summary_metrics"]
      - dashboard_access: ["Lab Trend Alerts - Care Coordinator"]
      - can_explore: true
      - can_csv: true  # Allow export for outreach lists
  provider:
    permissions:
      - datasource_access: ["patient_lab_trends", "alert_summary_metrics"]
      - dashboard_access: ["Lab Trend Alerts - Care Coordinator"]
      - can_explore: false
  admin:
    permissions:
      - all_datasource_access: true
      - all_database_access: true
      - can_sql_lab: true

Alert Acknowledgment and Resolution API

Type: skill

REST API endpoints that allow care coordinators to acknowledge, resolve, and add notes to lab trend alerts directly from the dashboard or via external integrations. Tracks response times for quality metrics.

Implementation:

alert_api.py
python
# File: alert_api.py
# Deploy as Azure Function App HTTP triggers or Flask/FastAPI app

from fastapi import FastAPI, HTTPException, Depends, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
import psycopg2
import psycopg2.extras
import os
import jwt
import logging

logger = logging.getLogger('alert_api')
app = FastAPI(title='Lab Trend Alert API', version='1.0.0')
security = HTTPBearer()

PG_CONN_STR = os.environ['PG_CONNECTION_STRING']
JWT_SECRET = os.environ['JWT_SECRET']  # PEM public key for validating Azure AD RS256 tokens; fetch from the tenant JWKS endpoint in production
AZURE_AD_TENANT = os.environ['AZURE_AD_TENANT_ID']


class AlertAcknowledge(BaseModel):
    coordinator_id: str
    notes: Optional[str] = None

class AlertResolve(BaseModel):
    coordinator_id: str
    resolution_notes: str = Field(..., min_length=10, description='Describe action taken')
    outcome: str = Field(..., pattern='^(contacted_patient|ordered_labs|referred_provider|no_action_needed|other)$')

class AlertSnooze(BaseModel):
    coordinator_id: str
    snooze_days: int = Field(..., ge=1, le=30)
    reason: str


def get_db():
    conn = psycopg2.connect(PG_CONN_STR)
    try:
        yield conn
    finally:
        conn.close()

def validate_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Validate Azure AD JWT token."""
    try:
        # In production, validate against Azure AD JWKS endpoint
        payload = jwt.decode(
            credentials.credentials,
            JWT_SECRET,
            algorithms=['RS256'],
            audience=os.environ.get('API_AUDIENCE'),
            issuer=f'https://sts.windows.net/{AZURE_AD_TENANT}/'
        )
        return payload
    except jwt.InvalidTokenError as e:
        raise HTTPException(status_code=401, detail=f'Invalid token: {str(e)}')


@app.get('/api/alerts')
def list_alerts(
    status: str = 'new',
    severity: Optional[str] = None,
    limit: int = 50,
    conn=Depends(get_db),
    user=Depends(validate_token)
):
    """List alerts filtered by status and severity."""
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    query = """
        SELECT ta.*, p.first_name, p.last_name, p.mrn
        FROM lab_data.trend_alerts ta
        JOIN lab_data.patients p ON p.patient_id = ta.patient_id
        WHERE ta.status = %s
    """
    params = [status]
    
    if severity:
        query += ' AND ta.severity = %s'
        params.append(severity)
    
    # severity is text, so map it to a numeric rank to get 'critical' first
    query += """ ORDER BY CASE ta.severity
            WHEN 'critical' THEN 0 WHEN 'high' THEN 1
            WHEN 'medium' THEN 2 ELSE 3 END,
        ta.created_at DESC LIMIT %s"""
    params.append(limit)
    
    cur.execute(query, params)
    alerts = cur.fetchall()
    cur.close()
    
    # Convert datetime objects to ISO strings
    for alert in alerts:
        for key, val in alert.items():
            if isinstance(val, datetime):
                alert[key] = val.isoformat()
    
    return {'alerts': alerts, 'count': len(alerts)}


@app.post('/api/alerts/{alert_id}/acknowledge')
def acknowledge_alert(
    alert_id: str,
    body: AlertAcknowledge,
    conn=Depends(get_db),
    user=Depends(validate_token)
):
    """Mark an alert as acknowledged by a care coordinator."""
    cur = conn.cursor()
    cur.execute("""
        UPDATE lab_data.trend_alerts
        SET status = 'acknowledged',
            assigned_coordinator_id = %s::uuid,
            acknowledged_at = NOW()
        WHERE alert_id = %s::uuid AND status = 'new'
        RETURNING alert_id
    """, (body.coordinator_id, alert_id))
    
    result = cur.fetchone()
    if not result:
        conn.rollback()
        raise HTTPException(status_code=404, detail='Alert not found or already acknowledged')
    
    conn.commit()
    cur.close()
    
    logger.info(f'Alert {alert_id} acknowledged by {body.coordinator_id}')
    return {'status': 'acknowledged', 'alert_id': alert_id}


@app.post('/api/alerts/{alert_id}/resolve')
def resolve_alert(
    alert_id: str,
    body: AlertResolve,
    conn=Depends(get_db),
    user=Depends(validate_token)
):
    """Mark an alert as resolved with resolution notes and outcome."""
    cur = conn.cursor()
    cur.execute("""
        UPDATE lab_data.trend_alerts
        SET status = 'resolved',
            resolved_at = NOW(),
            resolution_notes = %s
        WHERE alert_id = %s::uuid AND status IN ('new', 'acknowledged')
        RETURNING alert_id, created_at, acknowledged_at
    """, (f'[{body.outcome}] {body.resolution_notes}', alert_id))
    
    result = cur.fetchone()
    if not result:
        conn.rollback()
        raise HTTPException(status_code=404, detail='Alert not found or already resolved')
    
    conn.commit()
    cur.close()
    
    logger.info(f'Alert {alert_id} resolved by {body.coordinator_id}: {body.outcome}')
    return {'status': 'resolved', 'alert_id': alert_id, 'outcome': body.outcome}


@app.post('/api/alerts/{alert_id}/snooze')
def snooze_alert(
    alert_id: str,
    body: AlertSnooze,
    conn=Depends(get_db),
    user=Depends(validate_token)
):
    """Snooze an alert for N days (suppresses re-alerting)."""
    cur = conn.cursor()
    cur.execute("""
        UPDATE lab_data.trend_alerts
        SET status = 'snoozed',
            resolution_notes = %s
        WHERE alert_id = %s::uuid AND status IN ('new', 'acknowledged')
        RETURNING alert_id
    """, (f'Snoozed for {body.snooze_days} days: {body.reason}', alert_id))
    
    result = cur.fetchone()
    if not result:
        conn.rollback()
        raise HTTPException(status_code=404, detail='Alert not found')
    
    conn.commit()
    cur.close()
    return {'status': 'snoozed', 'alert_id': alert_id, 'snooze_days': body.snooze_days}


@app.get('/api/patients/{patient_id}/lab-history')
def get_patient_lab_history(
    patient_id: str,
    loinc_code: Optional[str] = None,
    months: int = 24,
    conn=Depends(get_db),
    user=Depends(validate_token)
):
    """Get complete lab history for a patient, optionally filtered by LOINC code."""
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    
    query = """
        SELECT lr.loinc_code, lr.loinc_display, lr.value_numeric, lr.value_string,
               lr.unit, lr.reference_range_low, lr.reference_range_high,
               lr.interpretation_code, lr.effective_date, lr.performing_lab
        FROM lab_data.lab_results lr
        WHERE lr.patient_id = %s::uuid
          AND lr.effective_date > NOW() - (%s * INTERVAL '1 month')
    """
    params = [patient_id, months]
    
    if loinc_code:
        query += ' AND lr.loinc_code = %s'
        params.append(loinc_code)
    
    query += ' ORDER BY lr.loinc_code, lr.effective_date'
    
    cur.execute(query, params)
    results = cur.fetchall()
    cur.close()
    
    for r in results:
        for key, val in r.items():
            if isinstance(val, datetime):
                r[key] = val.isoformat()
    
    return {'patient_id': patient_id, 'lab_results': results, 'count': len(results)}


# Run with: uvicorn alert_api:app --host 0.0.0.0 --port 8000
# requirements.txt additions:
# fastapi==0.109.0
# uvicorn==0.27.0
# pyjwt==2.8.0
# cryptography==42.0.0
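
A quick client-side sketch of the acknowledge flow (uses the requests library; the API URL, token source, and coordinator UUID are placeholders, and Azure AD token acquisition is outside this snippet):

python
# Sketch: list new high-severity alerts and acknowledge the first one.
import os
import requests

API = os.environ.get('ALERT_API_URL', 'https://alerts.example.com')
headers = {'Authorization': f"Bearer {os.environ['AAD_ACCESS_TOKEN']}"}

r = requests.get(f'{API}/api/alerts',
                 params={'status': 'new', 'severity': 'high'},
                 headers=headers, timeout=30)
r.raise_for_status()
alerts = r.json()['alerts']

if alerts:
    alert_id = alerts[0]['alert_id']
    r = requests.post(f'{API}/api/alerts/{alert_id}/acknowledge',
                      json={'coordinator_id': '<coordinator-uuid>'},
                      headers=headers, timeout=30)
    r.raise_for_status()
    print(r.json())  # expected: {'status': 'acknowledged', 'alert_id': ...}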

Daily Alert Digest Email Generator

Type: prompt

Generates a daily summary email for each care coordinator containing their unresolved alert count, new alerts from the past 24 hours, overdue acknowledgments, and a quality metrics snapshot. Runs as a scheduled Azure Function at 6:30 AM local time.

Implementation:

daily_digest.py
python
# File: daily_digest.py
# Azure Function timer trigger: 0 30 6 * * * (6:30 AM daily)

import psycopg2
import psycopg2.extras
import os
from datetime import datetime, timedelta
from azure.communication.email import EmailClient
import logging

logger = logging.getLogger('daily_digest')

def generate_daily_digest():
    conn = psycopg2.connect(os.environ['PG_CONNECTION_STRING'])
    email_client = EmailClient.from_connection_string(os.environ['ACS_CONNECTION_STRING'])
    
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    
    # Get all care coordinators with assigned alerts
    cur.execute("""
        SELECT DISTINCT assigned_coordinator_id as coordinator_id
        FROM lab_data.trend_alerts
        WHERE status IN ('new', 'acknowledged')
          AND assigned_coordinator_id IS NOT NULL
    """)
    coordinators = cur.fetchall()
    
    # Also include coordinators from a coordinators table if you have one
    # For now, generate a practice-wide digest if no coordinator assignment exists
    
    for coord in coordinators:
        coord_id = coord['coordinator_id']
        
        # Get alert counts by severity
        cur.execute("""
            SELECT severity, COUNT(*) as count
            FROM lab_data.trend_alerts
            WHERE status = 'new'
              AND (assigned_coordinator_id = %s OR assigned_coordinator_id IS NULL)
            GROUP BY severity
            ORDER BY CASE severity 
                WHEN 'critical' THEN 1 WHEN 'high' THEN 2 
                WHEN 'medium' THEN 3 WHEN 'low' THEN 4 END
        """, (str(coord_id),))
        severity_counts = cur.fetchall()
        
        # Get new alerts from last 24 hours
        cur.execute("""
            SELECT ta.severity, p.first_name || ' ' || p.last_name as patient_name,
                   p.mrn, ta.message, ta.latest_value, ta.created_at
            FROM lab_data.trend_alerts ta
            JOIN lab_data.patients p ON p.patient_id = ta.patient_id
            WHERE ta.created_at > NOW() - INTERVAL '24 hours'
              AND (ta.assigned_coordinator_id = %s OR ta.assigned_coordinator_id IS NULL)
            ORDER BY ta.severity DESC, ta.created_at DESC
            LIMIT 20
        """, (str(coord_id),))
        new_alerts = cur.fetchall()
        
        # Get overdue acknowledgments (new alerts > 4 hours old)
        cur.execute("""
            SELECT COUNT(*) as overdue_count
            FROM lab_data.trend_alerts
            WHERE status = 'new'
              AND created_at < NOW() - INTERVAL '4 hours'
              AND (assigned_coordinator_id = %s OR assigned_coordinator_id IS NULL)
        """, (str(coord_id),))
        overdue = cur.fetchone()['overdue_count']
        
        # Get weekly resolution stats
        cur.execute("""
            SELECT COUNT(*) as resolved_count,
                   AVG(EXTRACT(EPOCH FROM (resolved_at - created_at))/3600) as avg_resolution_hours
            FROM lab_data.trend_alerts
            WHERE status = 'resolved'
              AND resolved_at > NOW() - INTERVAL '7 days'
        """) 
        weekly_stats = cur.fetchone()
        
        # Build HTML email
        total_open = sum(s['count'] for s in severity_counts)
        critical_count = next((s['count'] for s in severity_counts if s['severity'] == 'critical'), 0)
        
        severity_html = ''.join([
            f"<tr><td style='padding:4px 12px;'>"
            f"<span style='background:{_severity_color(s['severity'])};color:white;padding:2px 8px;border-radius:3px;'>"
            f"{s['severity'].upper()}</span></td>"
            f"<td style='padding:4px 12px;font-weight:bold;'>{s['count']}</td></tr>"
            for s in severity_counts
        ])
        
        new_alerts_html = ''.join([
            f"<tr style='border-bottom:1px solid #eee;'>"
            f"<td style='padding:8px;'><span style='background:{_severity_color(a['severity'])};color:white;padding:1px 6px;border-radius:3px;font-size:11px;'>{a['severity'].upper()}</span></td>"
            f"<td style='padding:8px;'>{a['patient_name']}<br><small style='color:#666;'>MRN: {a['mrn']}</small></td>"
            f"<td style='padding:8px;font-size:13px;'>{a['message'][:120]}...</td></tr>"
            for a in new_alerts
        ]) if new_alerts else "<tr><td colspan='3' style='padding:12px;color:#666;'>No new alerts in the past 24 hours.</td></tr>"
        
        html_body = f"""
        <div style='font-family:Arial,sans-serif;max-width:700px;margin:0 auto;'>
            <div style='background:#2c3e50;color:white;padding:20px;border-radius:8px 8px 0 0;'>
                <h1 style='margin:0;font-size:22px;'>Lab Trend Alert Daily Digest</h1>
                <p style='margin:4px 0 0;opacity:0.8;'>{datetime.now().strftime('%A, %B %d, %Y')}</p>
            </div>
            
            <div style='padding:20px;border:1px solid #ddd;'>
                {'<div style="background:#dc3545;color:white;padding:12px;border-radius:4px;margin-bottom:16px;"><strong>⚠ ' + str(critical_count) + ' CRITICAL alerts require immediate attention</strong></div>' if critical_count > 0 else ''}
                
                <h2 style='color:#2c3e50;border-bottom:2px solid #3498db;padding-bottom:8px;'>Open Alerts Summary</h2>
                <table style='width:100%;border-collapse:collapse;'>
                    <tr style='background:#f8f9fa;'><th style='text-align:left;padding:4px 12px;'>Severity</th><th style='text-align:left;padding:4px 12px;'>Count</th></tr>
                    {severity_html}
                    <tr style='background:#f8f9fa;font-weight:bold;'><td style='padding:4px 12px;'>TOTAL</td><td style='padding:4px 12px;'>{total_open}</td></tr>
                </table>
                
                {'<p style="color:#dc3545;font-weight:bold;">⏰ ' + str(overdue) + ' alerts are overdue for acknowledgment (>4 hours)</p>' if overdue > 0 else ''}
                
                <h2 style='color:#2c3e50;border-bottom:2px solid #3498db;padding-bottom:8px;margin-top:24px;'>New Alerts (Last 24 Hours)</h2>
                <table style='width:100%;border-collapse:collapse;'>
                    {new_alerts_html}
                </table>
                
                <h2 style='color:#2c3e50;border-bottom:2px solid #3498db;padding-bottom:8px;margin-top:24px;'>Weekly Performance</h2>
                <p>Alerts resolved this week: <strong>{weekly_stats['resolved_count'] or 0}</strong></p>
                <p>Average resolution time: <strong>{round(weekly_stats['avg_resolution_hours'] or 0, 1)} hours</strong></p>
                
                <div style='margin-top:24px;padding:16px;background:#f0f7ff;border-radius:4px;text-align:center;'>
                    <a href='https://dashboard.clientname.com/superset/dashboard/alert-queue/' 
                       style='background:#3498db;color:white;padding:12px 24px;border-radius:4px;text-decoration:none;font-weight:bold;'>Open Alert Dashboard →</a>
                </div>
            </div>
            
            <div style='padding:12px;text-align:center;color:#999;font-size:12px;'>
                This is an automated notification from the Lab Trend Analytics System.<br>
                Contact your IT administrator to modify alert preferences.
            </div>
        </div>
        """
        
        # Send email
        # In production, look up coordinator email from a coordinators table
        message = {
            'senderAddress': 'LabTrends@<verified-domain>.azurecomm.net',
            'recipients': {
                'to': [{'address': f'coordinator-{coord_id}@practice.com'}]  # Replace with actual lookup
            },
            'content': {
                'subject': f'Lab Trend Digest: {total_open} Open Alerts ({critical_count} Critical) - {datetime.now().strftime("%m/%d")}',
                'html': html_body
            }
        }
        
        try:
            email_client.begin_send(message)
            logger.info(f'Digest sent to coordinator {coord_id}')
        except Exception as e:
            logger.error(f'Failed to send digest to {coord_id}: {str(e)}')
    
    cur.close()
    conn.close()


def _severity_color(severity: str) -> str:
    return {
        'critical': '#dc3545',
        'high': '#fd7e14', 
        'medium': '#ffc107',
        'low': '#17a2b8'
    }.get(severity, '#6c757d')


# Azure Function entry point
def main(timer):
    generate_daily_digest()

Testing & Validation

  • CONNECTIVITY TEST: From the Mirth Connect server, verify HL7 MLLP connectivity to the EHR/LIS interface engine by sending a test message to the configured HL7 listener port and confirming an ACK is returned. Use the Mirth Channel Statistics dashboard to confirm messages are being received. Expected: At least 1 HL7 ORU message received within 30 minutes during business hours when labs are being resulted.
  • FHIR SERVER TEST: Execute a FHIR metadata query against the Azure Health Data Services endpoint: curl -H 'Authorization: Bearer <token>' https://hw-clientname-labtrends-fhir-clientname.fhir.azurehealthcareapis.com/metadata. Verify the response contains a CapabilityStatement with resourceType entries for Patient, Observation, and Practitioner.
  • DATA TRANSFORMATION TEST: Send a sample HL7 ORU^R01 message through the Mirth Connect LAB_RESULTS_INBOUND channel containing a known LOINC code (e.g., 4548-4 for HbA1c with value 7.2). Verify that: (1) The Mirth channel processes without errors, (2) A FHIR Observation resource is created in the Azure FHIR server with the correct LOINC code and value, (3) The PostgreSQL lab_results table contains a corresponding row with matching data.
  • LOINC MAPPING TEST: Send an HL7 message with a proprietary lab code through the integration engine. Verify that: (1) If a mapping exists, the result is correctly translated to the LOINC code, (2) If no mapping exists, the code is logged in the unmapped_codes table for review, (3) Run the unmapped codes report and confirm the proprietary code appears with any fuzzy-match suggestions.
  • ETL PIPELINE TEST: After sending 10 test lab results through Mirth, wait 15 minutes for the ETL cycle, then query PostgreSQL: SELECT COUNT(*) FROM lab_data.lab_results WHERE created_at > NOW() - INTERVAL '1 hour'. Verify count matches the number of test results sent. Check for data accuracy by comparing value_numeric, loinc_code, and effective_date against the original HL7 messages.
  • TREND DETECTION TEST — RISING TREND: Insert synthetic lab data for a test patient with 4 HbA1c results over 12 months showing a clear upward trend (6.5, 7.0, 7.4, 7.9). Run the trend detection engine manually. Verify: (1) An alert is generated with alert_type='trend_slope' and severity='high', (2) The trend_slope value is positive, (3) The message accurately describes the trend direction and values. A scripted version of this test is sketched after this list.
  • TREND DETECTION TEST — THRESHOLD CROSSING: Insert synthetic lab data for a test patient with 2 consecutive ALT results above the threshold (e.g., 78, 92 with threshold of 56). Run detection. Verify an alert is generated with alert_type='threshold_crossing'.
  • TREND DETECTION TEST — MISSING FOLLOWUP: Insert a test patient with last HbA1c result dated 8 months ago with value 8.2. Run detection. Verify an alert is generated with alert_type='missing_followup'.
  • DEDUPLICATION TEST: Run the trend detection engine twice within 5 minutes. Verify that the second run does NOT generate duplicate alerts for the same patient/LOINC code combination. Check the trend_alerts table to confirm only one alert per patient per rule within the 7-day deduplication window.
  • ALERT NOTIFICATION TEST — EMAIL: Trigger a high-severity alert for a test patient and verify: (1) Email arrives at the designated care coordinator address within 5 minutes, (2) Email contains correct patient name, MRN, lab test name, trend description, and dashboard link, (3) Email renders correctly in Outlook/Gmail, (4) The dashboard link navigates to the correct patient view.
  • ALERT NOTIFICATION TEST — SMS: Trigger a critical-severity alert and verify an SMS message arrives at the care coordinator's mobile phone within 2 minutes. Verify the message is under 160 characters and contains actionable information.
  • DASHBOARD TEST: Log into Apache Superset as a care coordinator role user. Verify: (1) The Active Alerts Queue shows all unresolved alerts sorted by severity, (2) Clicking a patient name navigates to the Patient Lab Trend Detail view, (3) The lab trend chart correctly displays historical values with reference range lines, (4) Filters work correctly (severity, date range, provider panel), (5) Export to CSV works for outreach list generation.
  • ALERT API TEST: Using curl or Postman with a valid JWT token: (1) GET /api/alerts?status=new returns the expected alerts, (2) POST /api/alerts/{id}/acknowledge updates the alert status and sets acknowledged_at timestamp, (3) POST /api/alerts/{id}/resolve with resolution notes updates status to 'resolved', (4) GET /api/patients/{id}/lab-history returns correct lab history. Verify all endpoints return 401 without a valid token.
  • RBAC TEST: Log in as three different role types (care coordinator, provider, admin) and verify: (1) Care coordinators can view alerts and patient data but cannot access SQL Lab, (2) Providers can view dashboards in read-only mode, (3) Admins have full access including SQL Lab and user management. Attempt to access a restricted resource and confirm a 403 error.
  • HIPAA AUDIT LOG TEST: Perform several FHIR queries, dashboard accesses, and alert acknowledgments. Check Azure Log Analytics workspace to verify all PHI access events are captured with user identity, timestamp, resource accessed, and action performed. Run KQL query: AzureDiagnostics | where ResourceType == 'MICROSOFT.HEALTHCAREAPIS/WORKSPACES/FHIRSERVICES' | take 20.
  • VPN AND ENCRYPTION TEST: Capture network traffic on the practice LAN (using Wireshark) during a lab result flow and verify: (1) No PHI is visible in plaintext, (2) All traffic to Azure traverses the IPsec VPN tunnel, (3) FHIR API calls use TLS 1.2+. Verify: az network vpn-connection show shows connection status as 'Connected'.
  • BACKUP AND RECOVERY TEST: Trigger a backup of the PostgreSQL database and verify it completes successfully. Perform a test restore to a separate database instance and verify data integrity by comparing row counts and checksums against the production database.
  • PERFORMANCE TEST: With 10,000 patients and 200,000 lab results loaded, run the full trend detection analysis and measure execution time. Acceptable threshold: complete analysis in under 10 minutes. Dashboard page load time should be under 5 seconds for the Alert Queue and under 8 seconds for the Patient Trend Detail with 2 years of data.
  • CLINICAL VALIDATION: Present 20 randomly selected alerts to the practice medical director for clinical review. Track: (1) True positive rate (clinically actionable alerts), (2) False positive rate (alerts that don't warrant action), (3) Severity accuracy (was the alert classified at the right urgency level). Target: >80% true positive rate. If below 70%, return to rule tuning before proceeding to full go-live.
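
To make these checks repeatable across staging and production, script the synthetic-data tests. The sketch below covers the rising-trend and deduplication cases; the lab_data.lab_results and lab_data.trend_alerts names come from earlier in this guide, while the DSN and the trend_engine.run entry point are placeholders to swap for the deployed engine's actual invocation.

    # test_trend_detection.py -- scripted rising-trend and deduplication tests.
    # Assumptions: the lab_data.lab_results / lab_data.trend_alerts tables from
    # this guide, and a hypothetical "trend_engine.run" module for the engine.
    import datetime
    import subprocess
    import psycopg2

    TEST_PATIENT = "TEST-PAT-0001"
    HBA1C_LOINC = "4548-4"  # LOINC for Hemoglobin A1c
    SERIES = [(365, 6.5), (240, 7.0), (120, 7.4), (10, 7.9)]  # (days ago, value)

    conn = psycopg2.connect("dbname=labtrends user=trend_svc")  # placeholder DSN
    with conn, conn.cursor() as cur:
        # Seed four HbA1c results over ~12 months with a clear upward slope.
        for days_ago, value in SERIES:
            cur.execute(
                """INSERT INTO lab_data.lab_results
                       (patient_id, loinc_code, value_numeric, effective_date)
                   VALUES (%s, %s, %s, %s)""",
                (TEST_PATIENT, HBA1C_LOINC, value,
                 datetime.date.today() - datetime.timedelta(days=days_ago)))

    def run_engine():
        # Hypothetical invocation; substitute the deployed engine's command.
        subprocess.run(["python", "-m", "trend_engine.run"], check=True)

    def high_slope_alerts():
        with conn.cursor() as cur:
            cur.execute(
                """SELECT COUNT(*) FROM lab_data.trend_alerts
                   WHERE patient_id = %s AND loinc_code = %s
                     AND alert_type = 'trend_slope' AND severity = 'high'""",
                (TEST_PATIENT, HBA1C_LOINC))
            return cur.fetchone()[0]

    run_engine()
    assert high_slope_alerts() == 1, "expected one high-severity trend_slope alert"
    run_engine()  # second run inside the 7-day dedup window
    assert high_slope_alerts() == 1, "dedup failed: duplicate alert generated"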

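The Alert API checks also lend themselves to a small smoke test. In this sketch the base URL, the JWT, and the response field names (id, acknowledged_at, status) are assumptions to verify against the deployed API:

    # alert_api_smoke_test.py -- scripted version of the Alert API test above.
    # BASE_URL, the JWT, and the response field names are deployment-specific.
    import requests

    BASE_URL = "https://analytics.example-practice.com"  # placeholder
    TOKEN = "REPLACE_WITH_VALID_JWT"
    AUTH = {"Authorization": f"Bearer {TOKEN}"}

    # 1. Every endpoint must reject unauthenticated requests.
    r = requests.get(f"{BASE_URL}/api/alerts", params={"status": "new"}, timeout=10)
    assert r.status_code == 401, f"expected 401 without a token, got {r.status_code}"

    # 2. Fetch new alerts with a valid token.
    r = requests.get(f"{BASE_URL}/api/alerts", params={"status": "new"},
                     headers=AUTH, timeout=10)
    r.raise_for_status()
    alerts = r.json()
    assert alerts, "no new alerts returned; seed a test alert first"

    # 3. Acknowledge, then resolve, and verify the status transitions.
    alert_id = alerts[0]["id"]
    r = requests.post(f"{BASE_URL}/api/alerts/{alert_id}/acknowledge",
                      headers=AUTH, timeout=10)
    r.raise_for_status()
    assert r.json().get("acknowledged_at"), "acknowledged_at was not set"

    r = requests.post(f"{BASE_URL}/api/alerts/{alert_id}/resolve",
                      headers=AUTH, timeout=10,
                      json={"resolution_notes": "API smoke test -- disregard"})
    r.raise_for_status()
    assert r.json().get("status") == "resolved", "alert did not reach 'resolved'"
    print("Alert API smoke test passed")
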
Client Handoff

The client handoff should be conducted as a structured half-day engagement with three sessions, each tailored to a distinct audience.

Session 1 (1 hour) — Practice Leadership and Medical Director

  • Review project scope and objectives
  • Demonstrate the system end-to-end with real patient examples (use de-identified data if working in a training environment)
  • Review clinical rules and get formal sign-off on rule configurations
  • Discuss expected alert volumes and the phased rollout plan
  • Review quality metrics the system will track (alert response time, resolution rate, care gap closure improvement)
  • Establish the quarterly clinical rule review cadence
  • Present the ROI model (expected improvement in HEDIS/UDS scores, potential savings from avoided ED visits)

Session 2 (2 hours) — Care Coordinators (Primary Users)

  • Hands-on training with the Superset dashboard — navigating the alert queue, filtering by severity, viewing patient lab trends, acknowledging and resolving alerts with proper documentation, exporting patient lists for outreach campaigns
  • Walk through the daily digest email and explain each section
  • Practice the complete workflow: receive alert → review patient history in EHR → take action → document resolution in dashboard
  • Cover escalation procedures: when to escalate to a provider, when to call a patient directly, when to schedule an urgent appointment
  • Provide a printed Quick Reference Card (laminated, desk-sized) with: dashboard URL, login instructions, severity definitions, escalation contacts, and common resolution workflows

Session 3 (1 hour) — IT Staff / Office Manager

  • Review system architecture at a high level
  • Demonstrate the admin dashboard in Superset
  • Show how to add/remove user accounts
  • Review the support escalation path (internal IT → MSP helpdesk → MSP engineering)
  • Cover basic troubleshooting (Is Mirth running? Is VPN connected? Are alerts being generated?)
  • Review backup verification procedures
  • Hand over the complete documentation package

Documentation Package to Leave Behind

1. System Architecture Diagram showing all data flows
2. User Guide for Care Coordinators (20-page PDF with screenshots)
3. Admin Guide for IT staff (10-page PDF)
4. Clinical Rules Reference Document (signed by medical director)
5. HIPAA Compliance Documentation including BAAs, risk assessment update, data flow diagram, and access control matrix
6. Emergency Contact Card with MSP support numbers
7. Runbook for common issues and their resolutions

Success Criteria to Review Together

Note: Schedule the 30-day post-go-live review meeting before the handoff session concludes.

Maintenance

Weekly Tasks (MSP Technician, 1-2 hours/week)

  • Check Mirth Connect channel statistics for error rates and investigate any channel above a 2% error rate (a scripted check is sketched after this list).
  • Review Azure Monitor alerts for failed ETL runs, FHIR API errors, or VM health issues.
  • Check VPN tunnel status and latency.
  • Review the unmapped LOINC codes report and map any new proprietary codes that have appeared (coordinate with clinical staff for verification).
  • Verify backup completion for PostgreSQL and integration server.
  • Monitor Azure cost dashboard for unexpected spend spikes.
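
A minimal sketch of the weekly error-rate check against the Mirth Connect REST API follows. The _login and /channels/statistics endpoints and the JSON field names reflect recent NextGen Connect releases and should be confirmed against the deployed version's API documentation; keep the service credentials in a secrets manager, not in the script.

    # mirth_error_rate.py -- flag channels whose error rate exceeds 2%.
    # Endpoint paths and field names must be verified against your Mirth version.
    import requests

    MIRTH = "https://mirth.practice.local:8443"  # placeholder host
    HEADERS = {"X-Requested-With": "OpenAPI",    # CSRF header expected by recent versions
               "Accept": "application/json"}
    THRESHOLD = 0.02  # the 2% threshold from this checklist

    session = requests.Session()
    session.verify = "/etc/ssl/certs/mirth-ca.pem"  # internal CA bundle
    session.post(f"{MIRTH}/api/users/_login",
                 data={"username": "svc_monitor", "password": "REPLACE_ME"},
                 headers=HEADERS).raise_for_status()

    stats = session.get(f"{MIRTH}/api/channels/statistics",
                        headers=HEADERS).json()
    # The response shape (list -> channelStatistics) is an assumption to confirm.
    for ch in stats.get("list", {}).get("channelStatistics", []):
        received, errored = int(ch.get("received", 0)), int(ch.get("error", 0))
        if received and errored / received > THRESHOLD:
            print(f"INVESTIGATE {ch['channelId']}: "
                  f"{errored}/{received} = {errored / received:.1%} errors")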

Monthly Tasks (MSP Senior Engineer, 2-4 hours/month)

  • Apply OS security patches to integration VM and Superset host (schedule during off-hours maintenance window, typically Saturday 2-6 AM).
  • Update Mirth Connect and Superset Docker images if security patches are available.
  • Review FHIR server audit logs for unauthorized access attempts.
  • Generate and review the monthly quality metrics report: total alerts generated, acknowledgment rate, mean time to acknowledge, mean time to resolve, false positive rate (from care coordinator feedback).
  • Review Azure AD sign-in logs for the application.
  • Rotate service account credentials and API keys stored in Key Vault.
  • Test backup restoration (quarterly at minimum, monthly preferred); a verification sketch follows this list.
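
A quick way to verify a test restore is to fingerprint each table in both databases. The sketch below compares row counts and order-independent checksums; the table list and DSNs are illustrative, and the checksum query can be slow on very large tables, so run it against a quiesced replica or snapshot where needed.

    # verify_restore.py -- compare a test-restored database against production.
    # Run against a quiesced production replica or snapshot so counts are stable.
    import psycopg2

    TABLES = ["lab_data.lab_results", "lab_data.trend_alerts"]  # extend as needed

    def fingerprint(dsn, table):
        """Return (row_count, md5 checksum) for a table; the checksum orders
        rows by their text form, so it is stable across dump/restore."""
        with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
            cur.execute(
                f"SELECT COUNT(*), md5(string_agg(t::text, ',' ORDER BY t::text)) "
                f"FROM {table} t")
            return cur.fetchone()

    PROD = "dbname=labtrends host=prod-db user=readonly"          # placeholder DSNs
    RESTORED = "dbname=labtrends host=restore-test user=readonly"

    for table in TABLES:
        prod, restored = fingerprint(PROD, table), fingerprint(RESTORED, table)
        status = "OK" if prod == restored else "MISMATCH -- investigate"
        print(f"{table}: prod={prod[0]} rows, restored={restored[0]} rows: {status}")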

Quarterly Tasks (MSP + Clinical Staff, 4-8 hours/quarter)

  • Clinical rule review meeting with the medical director — review alert effectiveness, adjust thresholds, add or retire rules based on 3 months of data.
  • Present quality metrics dashboard to practice leadership.
  • Review and update the HIPAA risk assessment with any system changes.
  • Refresh the PostgreSQL patient_baselines materialized view with updated statistics (scripted in the sketch after this list).
  • Performance tune PostgreSQL queries if dashboard response times have degraded (VACUUM ANALYZE, index review).
  • Review Mirth Connect logs for deprecated HL7 message formats or new interface requirements.
  • Update documentation for any system changes.
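
The baseline refresh and vacuum pass can be scripted for the maintenance window. A minimal sketch, assuming patient_baselines carries the unique index that REFRESH MATERIALIZED VIEW CONCURRENTLY requires:

    # quarterly_db_maintenance.py -- refresh baselines and re-analyze hot tables.
    import psycopg2

    conn = psycopg2.connect("dbname=labtrends host=prod-db user=maint_svc")
    conn.autocommit = True  # VACUUM cannot run inside a transaction block

    with conn.cursor() as cur:
        # CONCURRENTLY keeps the dashboard readable during the refresh, but
        # requires a unique index on the materialized view.
        cur.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY lab_data.patient_baselines")
        cur.execute("VACUUM (ANALYZE) lab_data.lab_results")
        cur.execute("VACUUM (ANALYZE) lab_data.trend_alerts")
    conn.close()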

Semi-Annual Tasks

  • Comprehensive HIPAA Security Risk Assessment review.
  • Penetration testing of externally accessible components (VPN endpoint, any exposed APIs).
  • Review Azure subscription for cost optimization opportunities (Reserved Instances, right-sizing VMs).
  • Evaluate new features from EHR vendor that might improve integration (new FHIR API capabilities, bulk export support).
  • Update disaster recovery plan and test full system recovery.

Trigger-Based Maintenance

  • If the EHR system is upgraded or migrated, re-test all integration channels immediately — EHR upgrades are the #1 cause of integration failures.
  • If a new laboratory is added to the practice's reference lab network, configure a new Mirth channel and LOINC mappings.
  • If the practice adds providers or care coordinators, update Azure AD groups, Superset roles, and alert routing.
  • If alert volume exceeds 25/coordinator/day, schedule an emergency rule tuning session — this indicates alert fatigue risk.
  • If the trend detection engine execution time exceeds 15 minutes, investigate PostgreSQL query performance (see the sketch after this list) and consider scaling up the database tier.
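
When detection runtime degrades, start with statement-level timings from pg_stat_statements (the extension must be enabled on the server; the timing columns below are the PostgreSQL 13+ names, while older releases use mean_time/total_time). A minimal sketch:

    # slow_query_check.py -- list the costliest statements when trend detection
    # exceeds its runtime budget. Requires the pg_stat_statements extension.
    import psycopg2

    with psycopg2.connect("dbname=labtrends host=prod-db user=maint_svc") as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT LEFT(query, 80) AS query_head, calls,
                       ROUND(mean_exec_time::numeric, 1) AS mean_ms,
                       ROUND(total_exec_time::numeric, 0) AS total_ms
                FROM pg_stat_statements
                ORDER BY total_exec_time DESC
                LIMIT 10""")
            for row in cur.fetchall():
                print(row)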

SLA Considerations

  • Critical system down (no alerts flowing): 1-hour response, 4-hour resolution.
  • Alert delivery failure: 4-hour response, 8-hour resolution.
  • Dashboard unavailable: 8-hour response, 24-hour resolution.
  • Non-urgent (report request, minor UI issue): Next business day.

Include quarterly business review (QBR) meetings in the managed services contract. Recommend a 3-year managed services agreement with annual price escalation of 3-5% to account for infrastructure cost increases.

Alternatives

...

EHR-Native Population Health Module Activation (Option C)

Instead of building a custom integration and analytics platform, activate and configure the population health / care management module already included in the practice's existing EHR system (athenahealth Population Intelligence, eClinicalWorks Population Health with HEDIS Analytics, or NextGen Population Health Analytics). Most practices pay for these features in their EHR subscription but never configure them. The MSP would configure care gap rules, lab result alert thresholds, and care coordinator workflows within the native EHR environment.

Azara DRVS or Lightbeam Full Platform (Option A)

Partner with an established population health analytics vendor (Azara Healthcare DRVS or Lightbeam Health Solutions) as a channel reseller. Deploy their turnkey SaaS platform, which includes pre-built lab trend analytics, care gap identification, quality measure reporting, and care coordinator workflow tools. The MSP handles the integration work (connecting EHR data to the platform) and provides ongoing managed services around hosting, security, and user support, while the vendor provides the analytics engine.

Medplum Open-Source FHIR Platform (Cost-Optimized Custom Build)

Replace Azure Health Data Services with Medplum's open-source FHIR platform (Apache 2.0 license) self-hosted on Azure VMs. Medplum provides a complete FHIR R4 server with built-in bot framework for custom automation, subscription/webhook support, and TypeScript/React SDK. This reduces the cloud FHIR cost from ~$194/month to the cost of hosting a VM (~$140/month) while adding programmability features that Azure FHIR lacks.

Pearl Practice Intelligence (Dental-Specific)

For dental practices specifically, deploy Pearl's Practice Intelligence platform which provides AI-powered clinical quality analysis, financial performance metrics, and appointment analytics with native integrations to Open Dental, Dentrix, Eaglesoft, and Carestack. Pearl focuses on dental radiograph AI analysis and practice performance rather than medical lab trends, but addresses the dental vertical's specific needs.
