
Implementation Guide: Gather loss runs and applications and submit to multiple carriers for quotes
Step-by-step implementation guide for deploying AI to gather loss runs and applications and submit them to multiple carriers for quotes on behalf of insurance agency clients.
Hardware Procurement
Ricoh fi-8170 Document Scanner
$699 per unit (MSP cost) / $950 suggested resale with setup
High-volume duplex document scanner (70 ppm) for digitizing paper loss runs, ACORD applications, and supplemental forms received by mail or hand-delivery. Features a 100-sheet ADF, USB 3.2 and Gigabit Ethernet connectivity, and PaperStream IP driver for enhanced OCR preprocessing. Connects to the document ingestion pipeline via network scanning to a monitored hot folder or SharePoint library.
Fujitsu ScanSnap iX1600 Document Scanner
$413 per unit (MSP cost) / $550 suggested resale with setup
Mid-volume Wi-Fi-enabled scanner (40 ppm) for branch offices or individual producer desks. Features one-touch cloud-direct scanning profiles that can be configured to send directly to SharePoint or a monitored inbox folder. Used as secondary/branch scanning stations when the agency has multiple locations or producers who receive paper documents at their desks.
Dell OptiPlex 7020 Micro Desktop
$950 per unit (MSP cost) / $1,250 suggested resale with setup and imaging
Standard business workstations for agency staff operating the AI submission workflow dashboard. All AI processing is cloud-based so no GPU is needed; the workstation only needs a modern browser and reliable network connectivity. These units serve as the primary workstations for the account managers / CSRs who review AI-extracted data and approve submissions. Quantity depends on number of staff interacting with the system.
Ubiquiti UniFi U6-Pro Access Point
$150 per unit (MSP cost) / $220 suggested resale with installation
Reliable Wi-Fi 6 coverage is required for the ScanSnap iX1600 wireless scanners and for ensuring consistent cloud connectivity to AI services and AMS platforms. Install one per floor/zone of the agency office.
APC Smart-UPS 1500VA LCD
$550 per unit (MSP cost) / $725 suggested resale with installation
Uninterruptible power supply for network equipment and any on-premise infrastructure (router, switch, NAS if used). Ensures the document scanning pipeline and network connectivity are not disrupted by power events during batch submission processing.
Software Procurement
Microsoft 365 Business Premium
$22/user/month (MSP cost) / $33/user/month suggested resale; typical agency 10–30 users = $220–$660/month MSP cost
Core productivity and identity platform. Provides Exchange Online for email ingestion of loss run PDFs, SharePoint Online for document storage and hot-folder scanning targets, Azure AD (Entra ID) for SSO and MFA across all platforms, Microsoft Teams for collaboration, and the Microsoft Graph API for programmatic email and file access. Business Premium tier includes Intune and Azure AD P1 for security compliance.
Azure OpenAI Service (GPT-4.1 / GPT-4.1-mini)
GPT-4.1-mini: $0.40/1M input + $1.60/1M output tokens; GPT-4.1: $2.00/1M input + $8.00/1M output tokens. Typical agency processing 200 submissions/month: ~$50–$200/month in API costs; resell bundled into managed service
Primary LLM engine for document understanding, data extraction from loss run PDFs and ACORD applications, intelligent field mapping, carrier-specific form population logic, and quote comparison summarization. Azure OpenAI is preferred over direct OpenAI API for enterprise data residency guarantees (US-only processing), Azure RBAC integration, content filtering, and compliance certifications (SOC 2, HIPAA BAA available).
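As a rough check on the monthly estimate, API spend can be computed directly from the per-million-token rates listed above. The sketch below is illustrative only: the per-submission token counts used in the usage note (150,000 input and 15,000 output tokens) are hypothetical assumptions, not measurements from a real agency workload.

```python
from decimal import Decimal

# Per-1M-token prices quoted in this guide (USD).
PRICES = {
    "gpt-4.1-mini": {"input": Decimal("0.40"), "output": Decimal("1.60")},
    "gpt-4.1": {"input": Decimal("2.00"), "output": Decimal("8.00")},
}


def monthly_cost(model, submissions, input_tokens, output_tokens):
    """Estimate monthly API cost in USD for one model.

    input_tokens and output_tokens are per-submission averages (assumed values).
    """
    price = PRICES[model]
    million = Decimal(1_000_000)
    per_submission = (
        Decimal(input_tokens) / million * price["input"]
        + Decimal(output_tokens) / million * price["output"]
    )
    return (per_submission * submissions).quantize(Decimal("0.01"))
```

With the assumed token counts and 200 submissions per month, `monthly_cost("gpt-4.1", 200, 150_000, 15_000)` gives 84.00 and the same call for `"gpt-4.1-mini"` gives 16.80. Actual usage depends on document length and on how many extraction passes the agent makes.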
Azure AI Document Intelligence
Read model: $1.25/100 pages; Prebuilt models: $10/1,000 pages. Typical agency: $25–$100/month; resell bundled into managed service
OCR and document structure extraction engine. Processes scanned PDFs and images of loss runs, ACORD forms, and carrier correspondence into structured text with layout preservation, table extraction, and key-value pair detection. Used as the first stage of the document intelligence pipeline before LLM-based semantic extraction.
Tarmika Bridge
Contact vendor for agency-specific pricing; estimated $200–$500/month based on agency size and carrier count. 26% market share — leading commercial quoting platform
Primary commercial lines quoting and carrier submission platform. Provides single-entry rating for small business commercial insurance with streamlined underwriting questions and extensive NAICS mapping. The AI agent submits populated application data to Tarmika via its API, which then distributes to connected carriers. Supports the broadest carrier panel for small commercial.
Semsee
Contact vendor; free tier available for basic access; premium plans for API access and market access program. Estimated $100–$400/month
Secondary/alternative commercial quoting platform. Provides rapid multi-carrier quoting (quotes back within minutes from 4+ carriers). Particularly valuable for its Market Access Program that grants agencies access to additional carrier markets without direct appointments. Used as a fallback or supplementary submission channel alongside Tarmika.
Applied Epic / Vertafore AMS360 / HawkSoft (existing AMS)
Already licensed by agency; Applied Epic ~$150/user/mo; AMS360 ~$150/user/mo; HawkSoft ~$94/user/mo + base fee. MSP assists with API access enablement — no additional license cost
The agency's existing system of record. The AI agent reads client data, policy details, and contact information from the AMS via API to pre-populate applications. Completed quotes and submission records are written back to the AMS for the agency's workflow continuity. The MSP must ensure API access is enabled and properly credentialed.
LangGraph (Agent Orchestration Framework)
$0 for the library; hosting costs covered under Azure VM below
Core multi-agent orchestration framework. Implements the stateful submission workflow as a directed graph with nodes for document intake, data extraction, AMS lookup, form population, human review, carrier submission, and quote monitoring. LangGraph provides built-in persistence, checkpointing (for human-in-the-loop pauses), and parallel execution (for simultaneous multi-carrier submission).
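The idea of a staged workflow that pauses at a human review checkpoint can be illustrated without any framework. The following is a plain-Python stand-in, not LangGraph's API: the `WorkflowState` class and `advance` function are hypothetical names, and a real deployment would rely on LangGraph's own persistence and checkpointing instead.

```python
from dataclasses import dataclass, field

# Stage names taken from the node list in the description above.
STAGES = [
    "document_intake",
    "data_extraction",
    "ams_lookup",
    "form_population",
    "human_review",
    "carrier_submission",
    "quote_monitoring",
]


@dataclass
class WorkflowState:
    """Hypothetical per-submission state; a real agent would persist this."""
    completed: list = field(default_factory=list)
    approved: bool = False


def advance(state: WorkflowState) -> WorkflowState:
    """Run remaining stages in order, pausing at human_review until approved."""
    for stage in STAGES[len(state.completed):]:
        if stage == "human_review" and not state.approved:
            break  # checkpoint: stop here and wait for a reviewer
        state.completed.append(stage)
    return state
```

Calling `advance` on a fresh state stops after `form_population`. Setting `approved = True` and calling it again resumes from the checkpoint and runs the remaining stages.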
LangSmith (Observability Platform)
Developer tier: free (1 seat, 5,000 traces/month); Plus tier: $39/seat/month; $39–$78/month bundled into managed service fee
Production monitoring, tracing, and debugging platform for the LangGraph agent. Captures every LLM call, tool invocation, and decision point with full input/output logging. Critical for debugging extraction errors, monitoring accuracy metrics, tracking API costs, and providing compliance audit trails for every AI-processed submission.
n8n (Workflow Automation Platform)
Self-hosted: free software + ~$100–$200/month Azure VM hosting; Cloud: $50/month (Pro plan). Resell as part of managed service at $500–$800/month bundled
Low-code workflow automation platform connecting all integration points: email monitoring triggers, document preprocessing dispatch, AMS API calls, LangGraph agent invocation, Tarmika/Semsee API submission, and notification/approval workflows. n8n serves as the integration backbone and scheduling layer that orchestrates the end-to-end pipeline without requiring custom code for each connector.
Azure Virtual Machine (Agent Runtime)
Standard_D2s_v5 (2 vCPU, 8GB RAM): ~$70/month; Standard_D4s_v5 (4 vCPU, 16GB RAM): ~$140/month for higher workloads. Add $20–$40/month for managed disks and networking
Hosts the self-managed n8n instance, LangGraph agent runtime (Python), Redis for task queuing, and PostgreSQL for agent state persistence. Deployed in Azure East US or West US 2 region for proximity to Azure OpenAI endpoints. The MSP manages this VM as part of the ongoing managed service.
Azure Blob Storage
~$0.018/GB/month (Hot tier); typical agency needs 100–500GB = $2–$9/month
Archival storage for all processed documents, extracted data JSON files, populated ACORD form PDFs, carrier responses, and audit logs. Provides immutable storage option for compliance record retention. Lifecycle policies automatically tier older documents to Cool/Archive storage for cost optimization.
Indio (Application Management Platform)
Starting at $50/user/month; typical agency 5–10 users = $250–$500/month
Optional but recommended turnkey application management platform. Provides smart form technology for insurance application data gathering, client-facing digital application portals, and single-click submission to carriers. Can serve as an intermediate layer between the AI agent and carrier submission, providing validated form templates and submission tracking. Particularly valuable if the agency wants a branded client-facing portal.
PostgreSQL (Azure Database for PostgreSQL - Flexible Server)
Burstable B1ms (1 vCore, 2GB): ~$25/month; General Purpose D2s (2 vCores, 8GB): ~$100/month
Persistent storage for LangGraph agent state, submission tracking records, extracted data cache, carrier response history, and operational metadata. Used by n8n for workflow execution history. PaaS deployment provides automated backups, high availability, and managed patching.
Prerequisites
- Active Agency Management System (AMS) subscription — Applied Epic, Vertafore AMS360, HawkSoft, or EZLynx — with API access enabled. Contact the AMS vendor to request API credentials and verify the agency's plan includes API access (some plans require an upgrade).
- Active subscription to at least one commercial quoting/submission platform — Tarmika Bridge (preferred) and/or Semsee. Verify API access is included in the subscription tier; contact the vendor if only the web portal is available.
- Microsoft 365 Business Premium (or higher) for all agency staff who will interact with the AI submission workflow. Required for Exchange Online (email ingestion), SharePoint (document storage), Azure AD/Entra ID (SSO and MFA), and Microsoft Graph API access.
- Business-grade internet service: minimum 100 Mbps download / 50 Mbps upload with <50ms latency to Azure US data centers. Verify with a speed test to Azure East US or West US 2 (use Azure Speed Test tool).
- Azure subscription provisioned under the MSP's Azure tenant (for managing AI services) or under the client's tenant with MSP granted RBAC Contributor access. An Azure OpenAI resource can only be created with an approved Azure OpenAI access application (apply at https://aka.ms/oai/access).
- Complete inventory of the agency's active carrier appointments, including: carrier names, lines of business written with each carrier, carrier portal URLs and login credentials (for RPA fallback), and whether each carrier is accessible via Tarmika/Semsee or requires direct portal submission.
- List of all ACORD form types the agency commonly uses (typically ACORD 125 - Commercial Insurance Application, ACORD 126 - Commercial General Liability, ACORD 130 - Workers Compensation, ACORD 140 - Property, plus carrier-specific supplemental applications).
- Sample documents for AI training: at least 10 representative loss run PDFs (from different carriers — each carrier formats loss runs differently), 10 completed ACORD applications, and 5 carrier-specific supplemental applications. These will be used to build and test the extraction prompts.
- Domain admin or Global Admin access to the agency's Microsoft 365 tenant for configuring mail flow rules, SharePoint sites, Azure AD app registrations, and API permissions.
- Written authorization from the agency principal/owner approving: (a) AI processing of client PII data, (b) API integration with their AMS, (c) automated submission to carrier platforms with human-in-the-loop approval, and (d) data processing via Azure OpenAI (with data residency in US). This authorization should be documented in the project SOW.
- Current Written Information Security Plan (WISP) or willingness to create one. The WISP must be updated to include the AI system's data flows, vendor risk assessments, and incident response procedures per GLBA/FTC Safeguards Rule and applicable NAIC Model Law requirements.
- Python 3.11+ development environment available to the MSP build team, with access to GitHub/GitLab for version-controlled deployment of agent code and configuration.
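The carrier appointment inventory described in the prerequisites can be captured in a small structure that also records which submission channel applies to each carrier. This is a minimal sketch under assumptions: the field names and the `submission_channel` helper are hypothetical, and the Tarmika-first ordering simply reflects the preference stated in this guide.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class CarrierAppointment:
    """One row of the carrier inventory (hypothetical field names)."""
    carrier_name: str
    lines_of_business: List[str]
    portal_url: Optional[str] = None  # credentials belong in a secrets store, not here
    on_tarmika: bool = False
    on_semsee: bool = False


def submission_channel(appt: CarrierAppointment) -> str:
    """Pick a channel: Tarmika first, Semsee as fallback, else direct portal."""
    if appt.on_tarmika:
        return "tarmika"
    if appt.on_semsee:
        return "semsee"
    return "direct_portal"
```

Carriers that resolve to `direct_portal` are the ones that will need the RPA fallback, so this list is a useful early indicator of how much portal automation the project involves.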
Installation Steps
...
Step 1: Provision Azure Infrastructure
Create the Azure resource group and deploy all required Azure services: Virtual Machine for the agent runtime, Azure OpenAI resource with GPT-4.1 and GPT-4.1-mini model deployments, Azure AI Document Intelligence resource, Azure Database for PostgreSQL Flexible Server, Azure Blob Storage account, and Virtual Network with NSG rules. All resources should be deployed in the same Azure region (East US 2 recommended for best Azure OpenAI availability).
az login
az account set --subscription '<MSP-Azure-Subscription-ID>'
az group create --name rg-insurance-ai-agent --location eastus2
# Create Virtual Network
az network vnet create --resource-group rg-insurance-ai-agent --name vnet-agent --address-prefix 10.0.0.0/16 --subnet-name subnet-agent --subnet-prefix 10.0.1.0/24
# Create NSG with restricted inbound rules
az network nsg create --resource-group rg-insurance-ai-agent --name nsg-agent
az network nsg rule create --resource-group rg-insurance-ai-agent --nsg-name nsg-agent --name AllowSSH --priority 1000 --source-address-prefixes '<MSP-Office-IP>/32' --destination-port-ranges 22 --access Allow --protocol Tcp --direction Inbound
az network nsg rule create --resource-group rg-insurance-ai-agent --nsg-name nsg-agent --name AllowHTTPS --priority 1010 --source-address-prefixes '<MSP-Office-IP>/32' '<Client-Office-IP>/32' --destination-port-ranges 443 --access Allow --protocol Tcp --direction Inbound
# Create VM for agent runtime
az vm create --resource-group rg-insurance-ai-agent --name vm-agent-runtime --image Ubuntu2404LTSGen2 --size Standard_D4s_v5 --admin-username mspadmin --generate-ssh-keys --vnet-name vnet-agent --subnet subnet-agent --nsg nsg-agent --public-ip-sku Standard
# Create Azure OpenAI resource
az cognitiveservices account create --name oai-insurance-agent --resource-group rg-insurance-ai-agent --kind OpenAI --sku S0 --location eastus2
# Deploy GPT-4.1-mini model
az cognitiveservices account deployment create --name oai-insurance-agent --resource-group rg-insurance-ai-agent --deployment-name gpt-4-1-mini --model-name gpt-4.1-mini --model-version 2025-04-14 --model-format OpenAI --sku-capacity 80 --sku-name Standard
# Deploy GPT-4.1 model
az cognitiveservices account deployment create --name oai-insurance-agent --resource-group rg-insurance-ai-agent --deployment-name gpt-4-1 --model-name gpt-4.1 --model-version 2025-04-14 --model-format OpenAI --sku-capacity 40 --sku-name Standard
# Create Document Intelligence resource
az cognitiveservices account create --name docai-insurance-agent --resource-group rg-insurance-ai-agent --kind FormRecognizer --sku S0 --location eastus2
# Create PostgreSQL Flexible Server
az postgres flexible-server create --resource-group rg-insurance-ai-agent --name pgdb-insurance-agent --location eastus2 --admin-user pgadmin --admin-password '<STRONG-PASSWORD>' --sku-name Standard_B2s --tier Burstable --storage-size 64 --version 16 --vnet vnet-agent --subnet subnet-pg --private-dns-zone pgdb-insurance-agent.private.postgres.database.azure.com
# Create Storage Account
az storage account create --name stinsuranceagent --resource-group rg-insurance-ai-agent --location eastus2 --sku Standard_LRS --kind StorageV2
az storage container create --name loss-runs --account-name stinsuranceagent
az storage container create --name acord-forms --account-name stinsuranceagent
az storage container create --name submissions --account-name stinsuranceagent
az storage container create --name audit-logs --account-name stinsuranceagent
Replace placeholder values (<MSP-Azure-Subscription-ID>, <MSP-Office-IP>, <Client-Office-IP>, <STRONG-PASSWORD>) with actual values. Azure OpenAI access must be pre-approved; submit the application at https://aka.ms/oai/access at least 2 weeks before this step. The PostgreSQL subnet must be dedicated (no other resources); create it as a delegated subnet for Microsoft.DBforPostgreSQL/flexibleServers. Consider using Azure Key Vault for all secrets instead of passing passwords on the command line.
Step 2: Configure VM Runtime Environment
SSH into the Azure VM and install all required runtime dependencies: Python 3.11, Node.js 20 (for n8n), Docker (optional for containerized deployment), Redis for task queuing, and all Python packages for the LangGraph agent. Set up the project directory structure and configure systemd services for automatic startup.
ssh mspadmin@<VM-PUBLIC-IP>
# System updates
sudo apt update && sudo apt upgrade -y
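# Install Python 3.11 (Ubuntu 24.04 ships Python 3.12, so 3.11 is installed from the deadsnakes PPA)
sudo add-apt-repository -y ppa:deadsnakes/ppa
sudo apt update
sudo apt install -y python3.11 python3.11-venv python3.11-dev python3-pip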
# Install Node.js 20 LTS for n8n
curl -fsSL https://deb.nodesource.com/setup_20.x | sudo -E bash -
sudo apt install -y nodejs
# Install Redis
sudo apt install -y redis-server
sudo systemctl enable redis-server && sudo systemctl start redis-server
# Install n8n globally
sudo npm install -g n8n
# Create project directory structure
sudo mkdir -p /opt/insurance-ai-agent/{agent,config,logs,scripts,templates}
sudo chown -R mspadmin:mspadmin /opt/insurance-ai-agent
# Create Python virtual environment
cd /opt/insurance-ai-agent
python3.11 -m venv venv
source venv/bin/activate
# Install Python dependencies
pip install --upgrade pip
pip install langgraph langchain langchain-openai langchain-community langsmith azure-ai-documentintelligence azure-storage-blob azure-identity openai httpx psycopg2-binary redis celery pydantic python-dotenv fastapi uvicorn jinja2 pdfplumber PyPDF2 python-docx openpyxl requests beautifulsoup4 playwright
# Install Playwright browsers for carrier portal RPA
playwright install chromium
playwright install-deps
# Create environment file
cat > /opt/insurance-ai-agent/.env << 'EOF'
AZURE_OPENAI_ENDPOINT=https://oai-insurance-agent.openai.azure.com/
AZURE_OPENAI_API_KEY=<YOUR-AZURE-OPENAI-KEY>
AZURE_OPENAI_DEPLOYMENT_MINI=gpt-4-1-mini
AZURE_OPENAI_DEPLOYMENT_FULL=gpt-4-1
AZURE_OPENAI_API_VERSION=2025-04-01-preview
AZURE_DOC_INTELLIGENCE_ENDPOINT=https://docai-insurance-agent.cognitiveservices.azure.com/
AZURE_DOC_INTELLIGENCE_KEY=<YOUR-DOC-AI-KEY>
AZURE_STORAGE_CONNECTION_STRING=<YOUR-STORAGE-CONN-STRING>
DATABASE_URL=postgresql://pgadmin:<PASSWORD>@pgdb-insurance-agent.postgres.database.azure.com:5432/insuranceagent
REDIS_URL=redis://localhost:6379/0
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=<YOUR-LANGSMITH-KEY>
LANGCHAIN_PROJECT=insurance-submission-agent
TARMIKA_API_KEY=<TARMIKA-API-KEY>
TARMIKA_API_URL=<TARMIKA-API-ENDPOINT>
SEMSEE_API_KEY=<SEMSEE-API-KEY>
SEMSEE_API_URL=<SEMSEE-API-ENDPOINT>
AMS_API_BASE_URL=<AMS-API-ENDPOINT>
AMS_API_KEY=<AMS-API-KEY>
MS_GRAPH_CLIENT_ID=<AZURE-AD-APP-CLIENT-ID>
MS_GRAPH_CLIENT_SECRET=<AZURE-AD-APP-CLIENT-SECRET>
MS_GRAPH_TENANT_ID=<AZURE-AD-TENANT-ID>
EOF
chmod 600 /opt/insurance-ai-agent/.env
All API keys and secrets should eventually be migrated to Azure Key Vault with managed identity access from the VM. The .env file is used for initial development and testing only. Playwright with Chromium is installed for RPA-based carrier portal submissions where API access is not available. Ensure the VM has at least 16GB RAM if running n8n + LangGraph agent + Redis concurrently.
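Before starting any service, it can help to confirm that no `<PLACEHOLDER>` values remain in the environment file and that its permissions are still owner-only. The sketch below is one possible check; the function names are hypothetical and the placeholder pattern assumes the angle-bracket convention used in this guide.

```python
import re
import stat
from pathlib import Path

# Matches angle-bracket placeholders such as <AMS-API-KEY> or <PASSWORD>.
PLACEHOLDER = re.compile(r"<[A-Za-z0-9-]+>")


def unresolved_keys(env_text: str) -> list:
    """Return names of variables whose value still contains a placeholder."""
    missing = []
    for line in env_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        if PLACEHOLDER.search(value):
            missing.append(key)
    return missing


def is_owner_only(path: Path) -> bool:
    """True when the file mode is exactly 600 (owner read/write only)."""
    return stat.S_IMODE(path.stat().st_mode) == 0o600
```

For example, `unresolved_keys(Path('/opt/insurance-ai-agent/.env').read_text())` returns an empty list once every placeholder has been replaced.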
Step 3: Configure Microsoft 365 Integration for Email Ingestion
Register an Azure AD (Entra ID) application for Microsoft Graph API access. This app will monitor the agency's shared submission inbox for incoming loss run PDFs and carrier correspondence, automatically downloading attachments and routing them to the document processing pipeline. Configure application-level permissions for Mail.Read, Mail.ReadWrite, Files.ReadWrite.All, and User.Read.All.
- In Azure Portal > Azure Active Directory > App Registrations > New Registration — Name: Insurance-AI-Agent-Graph-App, Supported account types: Single tenant, Redirect URI: (leave blank for daemon app)
- After registration, note the Application (client) ID and Directory (tenant) ID
- Add API Permissions (Application type, not Delegated): Microsoft Graph > Application > Mail.Read, Mail.ReadWrite, Files.ReadWrite.All, User.Read.All
- Grant admin consent for the permissions
- Create a client secret: Certificates & secrets > New client secret > Description: 'AI Agent Access' > Expires: 24 months — Copy the secret value immediately, it won't be shown again
- Configure mail flow rule in Exchange Admin Center: Go to https://admin.exchange.microsoft.com > Mail flow > Rules
- Create rule: 'Copy Loss Runs to Shared Mailbox'. Condition: Subject contains 'loss run' OR 'loss history' OR attachment name contains 'loss_run'. Action: Bcc to ai-submissions@<agency-domain>.com (the shared mailbox created in the next step)
- Create shared mailbox for AI agent monitoring (see PowerShell commands below)
- Create SharePoint document library 'AI-Processed-Submissions' with folders: /loss-runs, /applications, /carrier-responses, /audit-trail
Connect-ExchangeOnline
New-Mailbox -Shared -Name 'AI Submissions Inbox' -DisplayName 'AI Submissions Inbox' -Alias ai-submissions
Set-Mailbox -Identity ai-submissions -MessageCopyForSendOnBehalfEnabled $true
The shared mailbox approach is preferred over monitoring individual mailboxes for privacy and compliance reasons. The AI agent only accesses the shared submissions mailbox, not personal mailboxes. Application permissions require Global Admin consent. Set the client secret expiration reminder in the MSP's ticketing system; secrets must be rotated before expiration. Consider using a certificate instead of a client secret for production deployments for enhanced security.
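The mail flow rule condition can be mirrored in the agent so that messages arriving in the shared mailbox are filtered by the same criteria, and so the criteria can be tested locally. This is a sketch only: Exchange evaluates the real rule server-side, and the case-insensitive matching here is an assumption.

```python
from typing import Iterable

# Keywords copied from the mail flow rule condition above.
SUBJECT_KEYWORDS = ("loss run", "loss history")
ATTACHMENT_KEYWORD = "loss_run"


def looks_like_loss_run(subject: str, attachment_names: Iterable[str]) -> bool:
    """Mirror the rule: subject keyword OR attachment-name keyword."""
    subject_lc = subject.lower()
    if any(keyword in subject_lc for keyword in SUBJECT_KEYWORDS):
        return True
    return any(ATTACHMENT_KEYWORD in name.lower() for name in attachment_names)
```

Running sample subjects and attachment names from the agency's real mail through this predicate is a quick way to see how many loss runs the rule would miss, for example attachments named `LossRuns.pdf` with no underscore.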
Step 4: Initialize Database Schema and Storage
Connect to the PostgreSQL database and create the schema for tracking submissions, extracted data, carrier responses, and audit logs. Also configure Azure Blob Storage lifecycle policies for document retention and cost optimization.
source /opt/insurance-ai-agent/venv/bin/activate
cd /opt/insurance-ai-agent
# Create database
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d postgres -c 'CREATE DATABASE insuranceagent;'
# Apply schema
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d insuranceagent << 'EOSQL'
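-- On Azure Flexible Server, add uuid-ossp to the azure.extensions server parameter before running this statement
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";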
CREATE TABLE clients (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
ams_client_id VARCHAR(100) UNIQUE NOT NULL,
business_name VARCHAR(255) NOT NULL,
dba VARCHAR(255),
fein VARCHAR(20),
sic_code VARCHAR(10),
naics_code VARCHAR(10),
entity_type VARCHAR(50),
state_of_incorporation VARCHAR(2),
years_in_business INTEGER,
annual_revenue DECIMAL(15,2),
num_employees INTEGER,
primary_contact_name VARCHAR(255),
primary_contact_email VARCHAR(255),
primary_contact_phone VARCHAR(20),
mailing_address JSONB,
locations JSONB,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE loss_runs (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
client_id UUID REFERENCES clients(id),
carrier_name VARCHAR(255),
policy_number VARCHAR(100),
policy_type VARCHAR(50),
effective_date DATE,
expiration_date DATE,
claims JSONB,
total_incurred DECIMAL(15,2),
total_paid DECIMAL(15,2),
total_reserved DECIMAL(15,2),
loss_ratio DECIMAL(5,4),
experience_mod DECIMAL(5,3),
raw_text TEXT,
extraction_confidence DECIMAL(5,4),
source_document_url VARCHAR(500),
extracted_at TIMESTAMP DEFAULT NOW(),
reviewed_by VARCHAR(255),
reviewed_at TIMESTAMP
);
CREATE TABLE submissions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
client_id UUID REFERENCES clients(id),
submission_type VARCHAR(50) NOT NULL,
lines_of_business TEXT[],
target_carriers TEXT[],
acord_forms_used TEXT[],
populated_data JSONB,
status VARCHAR(30) DEFAULT 'draft',
human_approved_by VARCHAR(255),
human_approved_at TIMESTAMP,
submitted_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE carrier_submissions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
submission_id UUID REFERENCES submissions(id),
carrier_name VARCHAR(255) NOT NULL,
platform VARCHAR(50),
submission_method VARCHAR(30),
platform_reference_id VARCHAR(100),
status VARCHAR(30) DEFAULT 'pending',
quote_received BOOLEAN DEFAULT false,
quote_premium DECIMAL(15,2),
quote_details JSONB,
submitted_at TIMESTAMP,
response_received_at TIMESTAMP,
error_message TEXT
);
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
event_type VARCHAR(50) NOT NULL,
entity_type VARCHAR(50),
entity_id UUID,
actor VARCHAR(100),
details JSONB,
timestamp TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_submissions_client ON submissions(client_id);
CREATE INDEX idx_submissions_status ON submissions(status);
CREATE INDEX idx_carrier_sub_submission ON carrier_submissions(submission_id);
CREATE INDEX idx_carrier_sub_status ON carrier_submissions(status);
CREATE INDEX idx_audit_log_entity ON audit_log(entity_type, entity_id);
CREATE INDEX idx_audit_log_timestamp ON audit_log(timestamp);
EOSQL
# Configure Azure Blob Storage lifecycle policy
az storage account management-policy create --account-name stinsuranceagent --resource-group rg-insurance-ai-agent --policy @- << 'EOF'
{
"rules": [
{
"name": "archiveOldDocuments",
"type": "Lifecycle",
"definition": {
"filters": { "blobTypes": ["blockBlob"] },
"actions": {
"baseBlob": {
"tierToCool": { "daysAfterModificationGreaterThan": 90 },
"tierToArchive": { "daysAfterModificationGreaterThan": 365 }
}
}
}
}
]
}
EOF
The JSONB fields (claims, populated_data, quote_details, locations, mailing_address) provide flexible schema for the varied data structures returned by different carriers and extracted from different document formats. The audit_log table is critical for compliance: every AI action must be logged. Consider enabling Azure PostgreSQL audit logging (pgAudit extension) for additional database-level auditing. Retention policies should align with the agency's document retention requirements (typically 7+ years for insurance records).
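The lifecycle rule above uses strict greater-than thresholds, which is easy to misread. The small helper below (a hypothetical function, useful mainly for sanity-checking retention expectations) returns the tier a blob should eventually reach for a given age. The actual transition may lag the threshold because Azure applies lifecycle policies on its own schedule.

```python
def expected_tier(days_since_modified: int) -> str:
    """Tier a block blob should reach under the 90/365-day rule.

    Both thresholds are strict: a blob modified exactly 90 days ago is
    still Hot, and one modified exactly 365 days ago is still Cool.
    """
    if days_since_modified > 365:
        return "Archive"
    if days_since_modified > 90:
        return "Cool"
    return "Hot"
```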
Step 5: Deploy n8n Workflow Automation Platform
Install and configure n8n as a systemd service on the Azure VM. n8n serves as the integration backbone connecting email monitoring, document processing dispatch, AMS API calls, LangGraph agent invocation, and notification workflows. Configure it with persistent PostgreSQL storage and set up the basic webhook and scheduling infrastructure.
# Create n8n data directory
sudo mkdir -p /opt/n8n-data
sudo chown mspadmin:mspadmin /opt/n8n-data
# Create n8n environment file
cat > /opt/n8n-data/.env << 'EOF'
N8N_PORT=5678
N8N_PROTOCOL=https
N8N_HOST=<VM-FQDN-OR-IP>
WEBHOOK_URL=https://<VM-FQDN-OR-IP>:5678/
N8N_ENCRYPTION_KEY=<GENERATE-RANDOM-32-CHAR-KEY>
DB_TYPE=postgresdb
DB_POSTGRESDB_HOST=pgdb-insurance-agent.postgres.database.azure.com
DB_POSTGRESDB_PORT=5432
DB_POSTGRESDB_DATABASE=n8n
DB_POSTGRESDB_USER=pgadmin
DB_POSTGRESDB_PASSWORD=<PASSWORD>
DB_POSTGRESDB_SSL_REJECT_UNAUTHORIZED=false
GENERIC_TIMEZONE=America/New_York
N8N_BASIC_AUTH_ACTIVE=true
N8N_BASIC_AUTH_USER=mspadmin
N8N_BASIC_AUTH_PASSWORD=<STRONG-N8N-PASSWORD>
EOF
# Create n8n database
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d postgres -c 'CREATE DATABASE n8n;'
# Create systemd service for n8n
sudo tee /etc/systemd/system/n8n.service << 'EOF'
[Unit]
Description=n8n Workflow Automation
After=network.target redis-server.service
[Service]
Type=simple
User=mspadmin
WorkingDirectory=/opt/n8n-data
EnvironmentFile=/opt/n8n-data/.env
ExecStart=/usr/bin/n8n start
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
sudo systemctl enable n8n
sudo systemctl start n8n
# Verify n8n is running
sudo systemctl status n8n
curl -k https://localhost:5678/healthz
For production, place n8n behind an Nginx reverse proxy with a proper TLS certificate (Let's Encrypt). Basic auth is a minimum; consider integrating with Azure AD for SSO. Generate the N8N_ENCRYPTION_KEY with: openssl rand -hex 16. The n8n instance will be configured with workflows in a subsequent step after the LangGraph agent is deployed. n8n Community Edition is free to self-host, but it is distributed under n8n's Sustainable Use License rather than AGPL; confirm that the license terms cover the MSP's hosting arrangement.
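If the key is generated from a provisioning script rather than by hand, Python's standard library produces a value of the same shape as `openssl rand -hex 16`. A minimal sketch, with `generate_n8n_key` as a hypothetical helper name:

```python
import secrets


def generate_n8n_key() -> str:
    """Return 32 hex characters (16 random bytes) for N8N_ENCRYPTION_KEY."""
    return secrets.token_hex(16)
```

Store the generated value somewhere recoverable, since credentials already saved in n8n cannot be decrypted with a different key.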
Step 6: Deploy LangGraph Agent Core
Deploy the main LangGraph-based autonomous agent that orchestrates the entire submission workflow. This agent implements a state machine with nodes for: email monitoring → document classification → OCR/extraction → AMS data enrichment → ACORD form population → human review gate → multi-carrier submission → quote monitoring → comparison report generation. The agent is deployed as a FastAPI service that n8n invokes via HTTP.
cd /opt/insurance-ai-agent/agent
source /opt/insurance-ai-agent/venv/bin/activate
# Create the main agent file
cat > /opt/insurance-ai-agent/agent/submission_agent.py << 'PYEOF'
# See custom_ai_components section for full implementation
# This is the deployment wrapper
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List
import uvicorn
from agent_graph import create_submission_graph, SubmissionState
from dotenv import load_dotenv
import logging
load_dotenv('/opt/insurance-ai-agent/.env')
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title='Insurance Submission Agent API', version='1.0.0')
graph = create_submission_graph()
class SubmissionRequest(BaseModel):
    client_ams_id: str
    document_urls: List[str]
    target_carriers: Optional[List[str]] = None
    lines_of_business: List[str]
    priority: str = 'normal'

class ApprovalRequest(BaseModel):
    submission_id: str
    approved: bool
    reviewer_name: str
    notes: Optional[str] = None

@app.post('/api/v1/submissions/start')
async def start_submission(req: SubmissionRequest, background_tasks: BackgroundTasks):
    initial_state = SubmissionState(
        client_ams_id=req.client_ams_id,
        document_urls=req.document_urls,
        target_carriers=req.target_carriers or [],
        lines_of_business=req.lines_of_business,
        priority=req.priority
    )
    # Run agent graph asynchronously
    background_tasks.add_task(graph.ainvoke, initial_state)
    return {'status': 'started', 'message': 'Submission workflow initiated'}

@app.post('/api/v1/submissions/{submission_id}/approve')
async def approve_submission(submission_id: str, req: ApprovalRequest):
    # Resume the paused graph at the human review checkpoint
    # Implementation in agent_graph.py
    return {'status': 'approved' if req.approved else 'rejected'}

@app.get('/api/v1/submissions/{submission_id}/status')
async def get_status(submission_id: str):
    # Query database for submission status
    return {'submission_id': submission_id, 'status': 'in_progress'}

@app.get('/health')
async def health():
    return {'status': 'healthy'}

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8000)
PYEOF
# Create systemd service for the agent API
sudo tee /etc/systemd/system/insurance-agent.service << 'EOF'
[Unit]
Description=Insurance Submission AI Agent API
After=network.target redis-server.service postgresql.service
[Service]
Type=simple
User=mspadmin
WorkingDirectory=/opt/insurance-ai-agent
EnvironmentFile=/opt/insurance-ai-agent/.env
ExecStart=/opt/insurance-ai-agent/venv/bin/uvicorn submission_agent:app --app-dir /opt/insurance-ai-agent/agent --host 0.0.0.0 --port 8000 --workers 2
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
sudo systemctl enable insurance-agent
sudo systemctl start insurance-agent
# Verify agent is running
curl http://localhost:8000/health
The full agent_graph.py implementation is provided in the custom_ai_components section. This step deploys the FastAPI wrapper that exposes the agent as an HTTP service for n8n to call. The agent runs with 2 Uvicorn workers to handle concurrent submissions. For agencies processing >50 submissions/day, increase workers to 4 and upgrade the VM to Standard_D8s_v5. The human-in-the-loop approval endpoint is critical: no submission goes to carriers without explicit human approval.
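Callers such as n8n need to send a JSON body whose fields match the SubmissionRequest model. The sketch below builds such a body with the standard library only. The `build_start_payload` helper is hypothetical, and the check that at least one line of business is present is an added assumption; the Pydantic model itself accepts an empty list.

```python
import json
from typing import List, Optional


def build_start_payload(
    client_ams_id: str,
    document_urls: List[str],
    lines_of_business: List[str],
    target_carriers: Optional[List[str]] = None,
    priority: str = "normal",
) -> str:
    """Serialize a request body with the fields SubmissionRequest expects."""
    if not client_ams_id:
        raise ValueError("client_ams_id is required")
    if not lines_of_business:
        # Assumed business rule, not enforced by the API model itself.
        raise ValueError("at least one line of business is required")
    body = {
        "client_ams_id": client_ams_id,
        "document_urls": document_urls,
        "target_carriers": target_carriers,
        "lines_of_business": lines_of_business,
        "priority": priority,
    }
    return json.dumps(body)
```

The resulting string can be posted to http://localhost:8000/api/v1/submissions/start with a `Content-Type: application/json` header. Leaving `target_carriers` as `None` matches the optional field in the model, which the endpoint converts to an empty list.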
Step 7: Configure Document Intelligence Pipeline
Set up the Azure AI Document Intelligence pipeline for OCR and initial document structure extraction. Configure custom extraction models for the most common loss run formats and ACORD form types the agency encounters. This pipeline is called by the LangGraph agent during the extraction phase.
cd /opt/insurance-ai-agent
source venv/bin/activate
# Create the document intelligence module
cat > /opt/insurance-ai-agent/agent/doc_intelligence.py << 'PYEOF'
import os
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest, DocumentAnalysisFeature
from azure.core.credentials import AzureKeyCredential
from azure.storage.blob import BlobServiceClient
import json
import logging
logger = logging.getLogger(__name__)
class DocumentProcessor:
    def __init__(self):
        self.doc_client = DocumentIntelligenceClient(
            endpoint=os.environ['AZURE_DOC_INTELLIGENCE_ENDPOINT'],
            credential=AzureKeyCredential(os.environ['AZURE_DOC_INTELLIGENCE_KEY'])
        )
        self.blob_client = BlobServiceClient.from_connection_string(
            os.environ['AZURE_STORAGE_CONNECTION_STRING']
        )

    def analyze_document(self, document_url: str = None, document_bytes: bytes = None) -> dict:
        """Run Azure Document Intelligence on a document and return structured output."""
        if document_url:
            poller = self.doc_client.begin_analyze_document(
                model_id='prebuilt-layout',
                analyze_request=AnalyzeDocumentRequest(url_source=document_url),
                features=[DocumentAnalysisFeature.KEY_VALUE_PAIRS]
            )
        elif document_bytes:
            poller = self.doc_client.begin_analyze_document(
                model_id='prebuilt-layout',
                body=document_bytes,
                content_type='application/pdf',
                features=[DocumentAnalysisFeature.KEY_VALUE_PAIRS]
            )
        else:
            raise ValueError('Must provide document_url or document_bytes')
        result = poller.result()
        extracted = {
            'content': result.content,
            'pages': [],
            'tables': [],
            'key_value_pairs': []
        }
        for page in result.pages:
            page_data = {
                'page_number': page.page_number,
                'width': page.width,
                'height': page.height,
                # Layout lines carry no confidence score, so the span offset is
                # stored under an accurate key instead of being labeled 'confidence'.
                'lines': [{'content': line.content, 'offset': line.spans[0].offset if line.spans else 0} for line in (page.lines or [])]
            }
            extracted['pages'].append(page_data)
        for table in (result.tables or []):
            table_data = {
                'row_count': table.row_count,
                'column_count': table.column_count,
                'cells': [{'row': cell.row_index, 'col': cell.column_index, 'content': cell.content, 'kind': cell.kind} for cell in table.cells]
            }
            extracted['tables'].append(table_data)
        for kvp in (result.key_value_pairs or []):
            if kvp.key and kvp.value:
                extracted['key_value_pairs'].append({
                    'key': kvp.key.content,
                    'value': kvp.value.content,
                    'confidence': kvp.confidence
                })
        return extracted

    def upload_document(self, container: str, blob_name: str, data: bytes) -> str:
        """Upload a document to Azure Blob Storage and return the URL."""
        blob_client = self.blob_client.get_blob_client(container=container, blob=blob_name)
        blob_client.upload_blob(data, overwrite=True)
        return blob_client.url

    def classify_document(self, text_content: str) -> str:
        """Classify document type based on extracted text content."""
        text_lower = text_content[:2000].lower()
        if any(kw in text_lower for kw in ['loss run', 'loss history', 'claims history', 'loss experience']):
            return 'loss_run'
        elif any(kw in text_lower for kw in ['acord 125', 'commercial insurance application']):
            return 'acord_125'
        elif any(kw in text_lower for kw in ['acord 126', 'general liability']):
            return 'acord_126'
        elif any(kw in text_lower for kw in ['acord 130', 'workers compensation']):
            return 'acord_130'
        elif any(kw in text_lower for kw in ['acord 140', 'property section']):
            return 'acord_140'
        elif any(kw in text_lower for kw in ['supplemental', 'questionnaire']):
            return 'supplemental_app'
        elif any(kw in text_lower for kw in ['quote', 'proposal', 'indication']):
            return 'carrier_quote'
        else:
            return 'unknown'
PYEOF
echo 'Document intelligence module deployed successfully'
The prebuilt-layout model from Azure AI Document Intelligence handles most document types well. For agencies with highly specialized loss run formats (e.g., from niche carriers), consider training a custom extraction model using Azure AI Document Intelligence Studio (https://formrecognizer.appliedai.azure.com/studio). This requires 5+ labeled sample documents per format. The classification function uses keyword matching as a fast first pass; the LLM-based agent refines classification when keywords are ambiguous.
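To illustrate what "ambiguous" can mean for the keyword pass: a loss run for a general liability policy matches both the 'loss_run' and 'acord_126' keyword lists. The sketch below reuses the keyword lists from classify_document and additionally reports whether more than one type matched. The function name and the ambiguity flag are assumptions for illustration, not part of the deployed module.

```python
from typing import Dict, List, Tuple

# Keyword lists copied from classify_document; the ambiguity flag is illustrative.
KEYWORDS: Dict[str, List[str]] = {
    'loss_run': ['loss run', 'loss history', 'claims history', 'loss experience'],
    'acord_125': ['acord 125', 'commercial insurance application'],
    'acord_126': ['acord 126', 'general liability'],
    'acord_130': ['acord 130', 'workers compensation'],
    'acord_140': ['acord 140', 'property section'],
    'supplemental_app': ['supplemental', 'questionnaire'],
    'carrier_quote': ['quote', 'proposal', 'indication'],
}


def classify_with_ambiguity(text_content: str) -> Tuple[str, bool]:
    """Return (best_guess, is_ambiguous) from keyword hits in the first 2000 chars."""
    text_lower = text_content[:2000].lower()
    matched = [doc_type for doc_type, kws in KEYWORDS.items()
               if any(kw in text_lower for kw in kws)]
    if not matched:
        # Nothing matched: treat as ambiguous so the LLM pass takes a look.
        return 'unknown', True
    # Dict order mirrors the priority of the original if/elif chain.
    return matched[0], len(matched) > 1
```

A caller could send only the documents flagged as ambiguous to the LLM-based classifier, which keeps the fast path cheap for clear-cut documents.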
Step 8: Configure n8n Integration Workflows
Create the core n8n workflows that connect all system components: (1) Email monitoring workflow that checks the shared mailbox every 5 minutes for new documents, (2) Document processing dispatch workflow that routes documents through the AI pipeline, (3) Human approval notification workflow that sends Teams/email alerts when submissions are ready for review, and (4) Carrier quote monitoring workflow that checks for returned quotes.
- Access n8n at https://<VM-IP>:5678 and login with the basic auth credentials configured earlier
- WORKFLOW 1: Email Monitor — Runs on a 5-minute schedule: Trigger: Schedule (every 5 min) → Microsoft Graph: Get emails from ai-submissions@agency.com with hasAttachments=true and isRead=false → Loop: For each email → Microsoft Graph: Get attachments → Filter: Only PDF/DOCX/XLSX attachments → HTTP Request: Upload to Azure Blob Storage (loss-runs container) → HTTP Request: POST to Agent API /api/v1/documents/process → Microsoft Graph: Mark email as read → Slack/Teams: Notify 'New document received: {filename}'
- WORKFLOW 2: Submission Orchestration — Triggered by webhook from Agent API: Trigger: Webhook POST /webhook/submission-ready → HTTP Request: GET Agent API /api/v1/submissions/{id}/status → IF status == 'awaiting_review': Microsoft Teams: Send adaptive card to #submissions channel with review details + Email: Send notification to assigned CSR → ELSE IF status == 'approved': HTTP Request: POST Agent API /api/v1/submissions/{id}/submit → ELSE IF status == 'completed': HTTP Request: GET quotes comparison + Email: Send quote comparison to producer
- WORKFLOW 3: Quote Monitor — Runs every 30 minutes during business hours: Trigger: Schedule (every 30 min, Mon-Fri 8am-6pm) → Database: Query carrier_submissions WHERE status = 'submitted' AND response_received_at IS NULL → Loop: For each pending submission → HTTP Request: Check Tarmika API for quote status → HTTP Request: Check Semsee API for quote status → IF quote received: Database: Update carrier_submissions with quote details + HTTP Request: Notify Agent API of quote received
- Configure n8n credentials: (1) Microsoft Graph OAuth2 — Use the App Registration from Step 3, (2) HTTP Header Auth for Agent API — Bearer token, (3) PostgreSQL — Direct database connection, (4) Azure Blob Storage — Connection string
echo 'Configure n8n workflows via the web UI at https://<VM-IP>:5678'
echo 'Import workflow JSON templates from /opt/insurance-ai-agent/config/n8n-workflows/'
The n8n workflows are best built in the visual editor. The commands above describe the logical flow; the actual implementation is drag-and-drop in the n8n UI. Export workflows as JSON and store them in the Git repository for version control and disaster recovery. The Microsoft Graph credential in n8n should use the OAuth2 flow with the app registration from Step 3. Set up n8n error workflows that send alerts to the MSP monitoring system when any workflow fails. Consider using n8n's built-in error handling nodes with retry logic for transient API failures.
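n8n's retry settings cover the workflow nodes themselves. If the same behavior is wanted inside the Python agent (for example around carrier or AMS calls), it could look roughly like the sketch below. The helper name, the default attempt count, and the choice of which exceptions count as transient are all assumptions.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar('T')


def call_with_retry(func: Callable[[], T],
                    max_attempts: int = 3,
                    base_delay: float = 1.0,
                    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func, retrying transient failures with exponential backoff."""
    if max_attempts < 1:
        raise ValueError('max_attempts must be at least 1')
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: let the error reach alerting.
            # Delay doubles each time: base_delay, 2x, 4x, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError('unreachable')
```

The sleep parameter is injectable so the backoff can be exercised in tests without real waiting. Errors that are not transient (validation failures, authentication errors) are deliberately not retried.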
Step 9: Configure AMS API Integration
Set up the API connection to the agency's AMS (Applied Epic, Vertafore AMS360, or HawkSoft). This integration allows the AI agent to pull client data for pre-populating applications and write back submission records and carrier responses. The specific implementation varies by AMS platform.
cd /opt/insurance-ai-agent
source venv/bin/activate
# Create AMS integration module (Applied Epic example)
cat > /opt/insurance-ai-agent/agent/ams_integration.py << 'PYEOF'
import os
import httpx
import logging
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class AMSClient:
    """
    Unified AMS client that abstracts the specific AMS API.
    Currently supports: Applied Epic, Vertafore AMS360, HawkSoft.
    Set AMS_PLATFORM env var to: 'epic', 'ams360', or 'hawksoft'
    """

    def __init__(self):
        self.platform = os.environ.get('AMS_PLATFORM', 'epic')
        # Fail fast on an unsupported platform; otherwise the request methods
        # below would reference an unassigned response variable.
        if self.platform not in ('epic', 'ams360', 'hawksoft'):
            raise ValueError(f'Unsupported AMS_PLATFORM: {self.platform}')
        self.base_url = os.environ['AMS_API_BASE_URL']
        self.api_key = os.environ['AMS_API_KEY']
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            headers=self._get_headers(),
            timeout=30.0
        )

    def _get_headers(self) -> dict:
        if self.platform == 'epic':
            return {
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            }
        elif self.platform == 'ams360':
            return {
                'Authorization': f'Basic {self.api_key}',
                'Content-Type': 'application/json'
            }
        elif self.platform == 'hawksoft':
            return {
                'X-API-Key': self.api_key,
                'Content-Type': 'application/json'
            }
        return {}

    async def get_client(self, client_id: str) -> Dict[str, Any]:
        """Retrieve client/account details from AMS."""
        if self.platform == 'epic':
            response = await self.client.get(f'/api/v1/customers/{client_id}')
        elif self.platform == 'ams360':
            response = await self.client.get(f'/api/customers/{client_id}')
        elif self.platform == 'hawksoft':
            response = await self.client.get(f'/api/v4/contacts/{client_id}')
        response.raise_for_status()
        raw = response.json()
        return self._normalize_client_data(raw)

    async def get_policies(self, client_id: str) -> list:
        """Retrieve active policies for a client."""
        if self.platform == 'epic':
            response = await self.client.get(f'/api/v1/customers/{client_id}/policies')
        elif self.platform == 'ams360':
            response = await self.client.get(f'/api/customers/{client_id}/policies')
        elif self.platform == 'hawksoft':
            response = await self.client.get(f'/api/v4/contacts/{client_id}/policies')
        response.raise_for_status()
        return response.json()

    async def create_activity(self, client_id: str, activity: dict) -> dict:
        """Create an activity/note in the AMS for audit trail."""
        if self.platform == 'epic':
            response = await self.client.post(
                f'/api/v1/customers/{client_id}/activities',
                json=activity
            )
        elif self.platform == 'ams360':
            response = await self.client.post(
                f'/api/customers/{client_id}/activities',
                json=activity
            )
        elif self.platform == 'hawksoft':
            response = await self.client.post(
                f'/api/v4/contacts/{client_id}/notes',
                json=activity
            )
        response.raise_for_status()
        return response.json()

    def _normalize_client_data(self, raw: dict) -> dict:
        """Normalize AMS-specific data format to our standard schema."""
        if self.platform == 'epic':
            return {
                'ams_client_id': raw.get('customerId', ''),
                'business_name': raw.get('name', ''),
                'dba': raw.get('dba', ''),
                'fein': raw.get('fein', ''),
                'entity_type': raw.get('entityType', ''),
                'sic_code': raw.get('sicCode', ''),
                'naics_code': raw.get('naicsCode', ''),
                'primary_contact': {
                    'name': raw.get('primaryContact', {}).get('name', ''),
                    'email': raw.get('primaryContact', {}).get('email', ''),
                    'phone': raw.get('primaryContact', {}).get('phone', '')
                },
                'mailing_address': raw.get('mailingAddress', {}),
                'locations': raw.get('locations', [])
            }
        # Add normalization for ams360 and hawksoft
        return raw
PYEOF
echo 'AMS integration module deployed. Set AMS_PLATFORM in .env to match client AMS.'
AMS API access requires vendor approval. Applied Epic uses the Applied SDK; register at developer.appliedsystems.com. Vertafore AMS360 API access is obtained through your Vertafore account team. HawkSoft provides Open API documentation at developers.hawksoft.com. Each AMS has different authentication mechanisms (OAuth2, API key, Basic Auth); the module abstracts these differences. Always test API connectivity with read-only operations before enabling write-back. The _normalize_client_data method must be expanded with the full field mapping during discovery when you have access to the actual AMS API responses.
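One way to expand the normalization during discovery is a per-platform mapping table rather than one if-branch per AMS. The sketch below shows the idea for three fields only. The 'epic' source names come from the module above; the 'ams360' and 'hawksoft' source names are placeholders invented for illustration and must be replaced with the field names seen in real API responses.

```python
from typing import Any, Dict

# Standard field name -> source field name, per platform.
# The ams360 and hawksoft source names are HYPOTHETICAL placeholders.
FIELD_MAPS: Dict[str, Dict[str, str]] = {
    'epic': {'ams_client_id': 'customerId', 'business_name': 'name', 'fein': 'fein'},
    'ams360': {'ams_client_id': 'CustomerNumber', 'business_name': 'FirmName', 'fein': 'FEIN'},
    'hawksoft': {'ams_client_id': 'clientNumber', 'business_name': 'businessName', 'fein': 'taxId'},
}


def normalize_client(platform: str, raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map a raw AMS record onto the standard schema; missing fields become ''."""
    try:
        field_map = FIELD_MAPS[platform]
    except KeyError:
        raise ValueError(f'Unsupported AMS platform: {platform}') from None
    return {standard: raw.get(source, '') for standard, source in field_map.items()}
```

With a table like this, adding a field means editing data rather than code, and the mapping can be reviewed with agency staff as a plain list.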
Step 10: Configure Carrier Submission Integration
Set up the carrier submission layer that connects to Tarmika Bridge and Semsee APIs for multi-carrier quote submission. Also configure Playwright-based RPA for carriers that don't support API submission through these platforms. Map each of the agency's appointed carriers to the appropriate submission method.
cd /opt/insurance-ai-agent
source venv/bin/activate
# Create carrier submission module
cat > /opt/insurance-ai-agent/agent/carrier_submit.py << 'PYEOF'
import os
import httpx
import json
import logging
from typing import Dict, List, Optional, Any
from playwright.async_api import async_playwright
logger = logging.getLogger(__name__)
class CarrierSubmissionEngine:
    def __init__(self):
        self.tarmika_url = os.environ.get('TARMIKA_API_URL', '')
        self.tarmika_key = os.environ.get('TARMIKA_API_KEY', '')
        self.semsee_url = os.environ.get('SEMSEE_API_URL', '')
        self.semsee_key = os.environ.get('SEMSEE_API_KEY', '')
        self.carrier_config = self._load_carrier_config()

    def _load_carrier_config(self) -> dict:
        config_path = '/opt/insurance-ai-agent/config/carrier_mapping.json'
        try:
            with open(config_path) as f:
                return json.load(f)
        except FileNotFoundError:
            logger.warning(f'Carrier config not found at {config_path}, using defaults')
            return {}

    async def submit_to_tarmika(self, submission_data: dict) -> dict:
        """Submit to carriers via Tarmika Bridge API."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f'{self.tarmika_url}/api/v1/submissions',
                headers={
                    'Authorization': f'Bearer {self.tarmika_key}',
                    'Content-Type': 'application/json'
                },
                json={
                    'applicant': submission_data.get('applicant', {}),
                    'business_info': submission_data.get('business_info', {}),
                    'coverage_requested': submission_data.get('coverage', {}),
                    'loss_history': submission_data.get('loss_history', {}),
                    'target_carriers': submission_data.get('carriers', [])
                },
                timeout=60.0
            )
            response.raise_for_status()
            return response.json()

    async def submit_to_semsee(self, submission_data: dict) -> dict:
        """Submit to carriers via Semsee API."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f'{self.semsee_url}/api/submissions',
                headers={
                    'Authorization': f'Bearer {self.semsee_key}',
                    'Content-Type': 'application/json'
                },
                json=submission_data,
                timeout=60.0
            )
            response.raise_for_status()
            return response.json()

    async def submit_via_rpa(self, carrier_name: str, submission_data: dict) -> dict:
        """Submit to carrier portal via browser automation (Playwright)."""
        carrier_cfg = self.carrier_config.get(carrier_name, {})
        portal_url = carrier_cfg.get('portal_url', '')
        username = carrier_cfg.get('username', '')
        password = carrier_cfg.get('password', '')
        if not portal_url:
            raise ValueError(f'No portal configuration for carrier: {carrier_name}')
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()
            try:
                # Login
                await page.goto(portal_url)
                await page.fill(carrier_cfg.get('username_selector', '#username'), username)
                await page.fill(carrier_cfg.get('password_selector', '#password'), password)
                await page.click(carrier_cfg.get('login_button_selector', 'button[type=submit]'))
                await page.wait_for_load_state('networkidle')
                # Navigate to new submission
                await page.goto(carrier_cfg.get('new_submission_url', ''))
                # Fill form fields from mapping
                for field_map in carrier_cfg.get('field_mappings', []):
                    selector = field_map['selector']
                    data_key = field_map['data_key']
                    value = self._get_nested_value(submission_data, data_key)
                    if value:
                        field_type = field_map.get('type', 'text')
                        if field_type == 'text':
                            await page.fill(selector, str(value))
                        elif field_type == 'select':
                            await page.select_option(selector, str(value))
                        elif field_type == 'checkbox':
                            if value:
                                await page.check(selector)
                # Upload loss run documents
                for upload_map in carrier_cfg.get('upload_mappings', []):
                    upload_selector = upload_map['selector']
                    file_key = upload_map['file_key']
                    file_path = submission_data.get(file_key)
                    if file_path:
                        await page.set_input_files(upload_selector, file_path)
                # Screenshot before submission for audit
                screenshot = await page.screenshot(full_page=True)
                # Submit (if auto_submit is enabled)
                if carrier_cfg.get('auto_submit', False):
                    await page.click(carrier_cfg.get('submit_button_selector', ''))
                    await page.wait_for_load_state('networkidle')
                    confirmation = await page.text_content(carrier_cfg.get('confirmation_selector', 'body'))
                else:
                    confirmation = 'MANUAL_SUBMIT_REQUIRED'
                return {
                    'status': 'submitted' if carrier_cfg.get('auto_submit') else 'form_filled',
                    'confirmation': confirmation,
                    'screenshot': screenshot
                }
            finally:
                await browser.close()

    def _get_nested_value(self, data: dict, key_path: str):
        keys = key_path.split('.')
        value = data
        for k in keys:
            if isinstance(value, dict):
                value = value.get(k)
            else:
                return None
        return value

    async def submit(self, carrier_name: str, submission_data: dict, method: str = 'auto') -> dict:
        """Route submission to the appropriate method."""
        if method == 'auto':
            carrier_cfg = self.carrier_config.get(carrier_name, {})
            method = carrier_cfg.get('preferred_method', 'tarmika')
        if method == 'tarmika':
            return await self.submit_to_tarmika(submission_data)
        elif method == 'semsee':
            return await self.submit_to_semsee(submission_data)
        elif method == 'rpa':
            return await self.submit_via_rpa(carrier_name, submission_data)
        else:
            raise ValueError(f'Unknown submission method: {method}')
PYEOF
# Create carrier mapping configuration template
cat > /opt/insurance-ai-agent/config/carrier_mapping.json << 'EOF'
{
"_comment": "Populate this file during discovery with agency-specific carrier details",
"Hartford": {
"preferred_method": "tarmika",
"tarmika_carrier_id": "HARTFORD",
"lines": ["BOP", "WC", "GL", "CA"]
},
"Travelers": {
"preferred_method": "tarmika",
"tarmika_carrier_id": "TRAVELERS",
"lines": ["BOP", "WC", "GL", "PROP", "CA"]
},
"AmTrust": {
"preferred_method": "semsee",
"semsee_carrier_id": "AMTRUST",
"lines": ["WC", "GL", "BOP"]
},
"CNA": {
"preferred_method": "rpa",
"portal_url": "https://www.cna.com/web/guest/agentcenter",
"username": "AGENT_USERNAME",
"password": "ENCRYPTED_PASSWORD",
"username_selector": "#userId",
"password_selector": "#password",
"login_button_selector": "#loginBtn",
"new_submission_url": "https://www.cna.com/web/guest/agentcenter/new-business",
"auto_submit": false,
"field_mappings": [
{"selector": "#insuredName", "data_key": "applicant.business_name", "type": "text"},
{"selector": "#fein", "data_key": "applicant.fein", "type": "text"}
],
"upload_mappings": [
{"selector": "#lossRunUpload", "file_key": "loss_run_file_path"}
],
"lines": ["GL", "PROP", "UMBR"]
}
}
EOF
echo 'Carrier submission engine deployed. Update carrier_mapping.json with agency-specific carrier details.'
The carrier mapping JSON must be customized per agency during the discovery phase. Carrier portal selectors (CSS selectors for RPA) are fragile and will break when carriers update their portals; this is the #1 ongoing maintenance burden. Budget 2–4 hours/month for carrier portal selector updates. For the initial deployment, start with Tarmika/Semsee API submissions only and add RPA for additional carriers in Phase 2. Store carrier portal credentials in Azure Key Vault, not in the JSON file; the example above is for illustration. Set auto_submit to false for RPA carriers during the initial deployment period; the agent fills the form and an agency staff member clicks Submit.
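Because the mapping file is edited by hand for every agency, a quick pre-flight check can catch the mistakes this note warns about. The following is a minimal sketch under assumptions: the function name, the list of required RPA keys, and the wording of the messages are illustrative and not part of the deployed module.

```python
from typing import Any, Dict, List

# Keys an RPA entry needs before submit_via_rpa can do anything useful (assumed list).
RPA_REQUIRED_KEYS = ('portal_url', 'new_submission_url', 'field_mappings')


def validate_carrier_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems found in carrier_mapping.json."""
    problems: List[str] = []
    for name, cfg in config.items():
        if name.startswith('_') or not isinstance(cfg, dict):
            continue  # Skip metadata entries such as "_comment".
        method = cfg.get('preferred_method')
        if method not in ('tarmika', 'semsee', 'rpa'):
            problems.append(f'{name}: unknown preferred_method {method!r}')
            continue
        if method != 'rpa':
            continue
        for key in RPA_REQUIRED_KEYS:
            if not cfg.get(key):
                problems.append(f'{name}: missing {key}')
        if 'password' in cfg:
            problems.append(f'{name}: password stored inline; move it to Azure Key Vault')
        if cfg.get('auto_submit', False):
            problems.append(f'{name}: auto_submit is enabled; keep it false during initial deployment')
    return problems
```

Running a check like this in CI or at service start makes a broken or unsafe mapping visible before the first submission attempt rather than during it.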
Step 11: Install and Configure Document Scanners
Install the Ricoh fi-8170 and ScanSnap iX1600 scanners at the agency office(s). Configure scanning profiles that automatically route scanned documents to the AI processing pipeline via SharePoint or a monitored network folder.
# Ricoh fi-8170 Setup:
# 1. Connect via Gigabit Ethernet to agency LAN
# 2. Access scanner web UI at assigned IP to configure network settings
# 3. Install PaperStream IP (TWAIN) driver on admin workstation: https://www.pfu.ricoh.com/global/scanners/fi/dl/
# 4. Install PaperStream Capture for batch scanning profiles
# 5. Configure scan profile 'Loss Runs':
# - Color mode: Auto
# - Resolution: 300 DPI
# - File format: Searchable PDF (PDF/A)
# - Destination: SharePoint > AI-Processed-Submissions > loss-runs
# - Naming: LossRun_{date}_{time}_{counter}.pdf
# 6. Configure scan profile 'Applications':
# - Same settings as above
# - Destination: SharePoint > AI-Processed-Submissions > applications
# ScanSnap iX1600 Setup:
# 1. Download ScanSnap Home from https://www.pfu.ricoh.com/global/scanners/scansnap/dl/
# 2. Connect scanner to Wi-Fi network during initial setup wizard
# 3. Create profiles on the scanner touchscreen:
# - Profile 1: 'Loss Runs' -> Scan to SharePoint (configure SharePoint connection)
# - Profile 2: 'Applications' -> Scan to SharePoint
# - Profile 3: 'General Docs' -> Scan to Email (forward to ai-submissions@agency.com)
# 4. Set default scan settings:
# - Quality: Better (300 DPI)
# - Color mode: Auto
# - File format: PDF (Searchable)
# - Blank page removal: ON
# - Multi-feed detection: ON
echo 'Scanner installation complete. Test with sample documents before going live.'
Always scan at 300 DPI for optimal OCR accuracy; 200 DPI is acceptable but degrades extraction quality on fine print. Enable 'Searchable PDF' mode on both scanners, which runs a local OCR pass that gives Azure Document Intelligence a head start. The ScanSnap iX1600 can scan directly to SharePoint via its cloud connection feature (configured in ScanSnap Home). The Ricoh fi-8170 integrates with SharePoint through PaperStream Capture's output connectors. Test scanning with a sample loss run document and verify the file appears in the correct SharePoint library within 60 seconds.
Step 12: Deploy Review Dashboard and Human-in-the-Loop Interface
Deploy a lightweight web dashboard that agency staff use to review AI-extracted data, approve or reject submissions before they go to carriers, and monitor the status of all active submissions. This is built as a simple FastAPI + HTML/JS application served from the existing agent VM.
cd /opt/insurance-ai-agent
source venv/bin/activate
mkdir -p /opt/insurance-ai-agent/dashboard/templates
mkdir -p /opt/insurance-ai-agent/dashboard/static
# Create dashboard FastAPI app
cat > /opt/insurance-ai-agent/dashboard/app.py << 'PYEOF'
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import httpx
import os
app = FastAPI(title='Insurance AI Submission Dashboard')
app.mount('/static', StaticFiles(directory='/opt/insurance-ai-agent/dashboard/static'), name='static')
templates = Jinja2Templates(directory='/opt/insurance-ai-agent/dashboard/templates')
AGENT_API = 'http://localhost:8000'


@app.get('/', response_class=HTMLResponse)
async def dashboard(request: Request):
    async with httpx.AsyncClient() as client:
        subs = await client.get(f'{AGENT_API}/api/v1/submissions')
    return templates.TemplateResponse('dashboard.html', {
        'request': request,
        'submissions': subs.json() if subs.status_code == 200 else []
    })


@app.get('/submission/{submission_id}', response_class=HTMLResponse)
async def submission_detail(request: Request, submission_id: str):
    async with httpx.AsyncClient() as client:
        sub = await client.get(f'{AGENT_API}/api/v1/submissions/{submission_id}')
    return templates.TemplateResponse('submission_detail.html', {
        'request': request,
        'submission': sub.json() if sub.status_code == 200 else {}
    })


@app.post('/submission/{submission_id}/approve')
async def approve(submission_id: str, request: Request):
    form_data = await request.form()
    async with httpx.AsyncClient() as client:
        resp = await client.post(f'{AGENT_API}/api/v1/submissions/{submission_id}/approve', json={
            'submission_id': submission_id,
            'approved': True,
            'reviewer_name': form_data.get('reviewer_name', 'Unknown'),
            'notes': form_data.get('notes', '')
        })
    # Do not report success unless the agent API actually accepted the approval.
    resp.raise_for_status()
    return {'status': 'approved'}
PYEOF
# Add dashboard to systemd (on port 8080)
sudo tee /etc/systemd/system/insurance-dashboard.service << 'EOF'
[Unit]
Description=Insurance AI Dashboard
After=insurance-agent.service
[Service]
Type=simple
User=mspadmin
WorkingDirectory=/opt/insurance-ai-agent/dashboard
EnvironmentFile=/opt/insurance-ai-agent/.env
ExecStart=/opt/insurance-ai-agent/venv/bin/uvicorn app:app --host 0.0.0.0 --port 8080
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
sudo systemctl enable insurance-dashboard
sudo systemctl start insurance-dashboard
echo 'Dashboard deployed at http://<VM-IP>:8080'
The dashboard HTML templates (dashboard.html, submission_detail.html) should be developed during the build phase with the agency's branding. As deployed here, Uvicorn serves plain HTTP on port 8080. For production, place it behind Nginx with TLS and Azure AD SSO authentication (using MSAL.js). The dashboard is intentionally simple; complex analytics can be added later. The critical feature is the approve/reject button that gates every carrier submission. Consider using Microsoft Power Apps or Retool as alternatives if the agency prefers a more polished UI with less custom development.
Step 13: End-to-End Integration Testing
Conduct comprehensive testing of the entire pipeline using real sample documents from the agency. Test each path: email ingestion → OCR → data extraction → AMS lookup → form population → human review → carrier submission. Validate accuracy, error handling, and audit logging at each stage.
cd /opt/insurance-ai-agent
source venv/bin/activate
# Run unit tests
python -m pytest tests/ -v
# Run integration test: Document processing pipeline
python -c "
import asyncio
from agent.doc_intelligence import DocumentProcessor
dp = DocumentProcessor()
result = dp.analyze_document(document_url='https://stinsuranceagent.blob.core.windows.net/loss-runs/test_loss_run.pdf')
print('Pages extracted:', len(result['pages']))
print('Tables found:', len(result['tables']))
print('Key-value pairs:', len(result['key_value_pairs']))
doc_type = dp.classify_document(result['content'])
print(f'Document type: {doc_type}')
"
# Run integration test: AMS connectivity
python -c "
import asyncio
from agent.ams_integration import AMSClient
ams = AMSClient()
client = asyncio.run(ams.get_client('TEST-CLIENT-ID'))
print('Client retrieved:', client['business_name'])
"
# Run integration test: Full agent workflow (dry run)
curl -X POST http://localhost:8000/api/v1/submissions/start \
-H 'Content-Type: application/json' \
-d '{
"client_ams_id": "TEST-CLIENT-001",
"document_urls": ["https://stinsuranceagent.blob.core.windows.net/loss-runs/test_loss_run.pdf"],
"lines_of_business": ["GL", "PROP"],
"target_carriers": ["Hartford", "Travelers"],
"priority": "normal"
}'
# Monitor agent execution in LangSmith
echo 'Check LangSmith dashboard at https://smith.langchain.com for trace details'
# Verify database records
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d insuranceagent -c 'SELECT id, status, created_at FROM submissions ORDER BY created_at DESC LIMIT 5;'
# Verify audit logs
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d insuranceagent -c 'SELECT event_type, actor, timestamp FROM audit_log ORDER BY timestamp DESC LIMIT 20;'
Testing must use real agency documents (loss runs, ACORD apps) from the sample set collected during prerequisites. Never test with production carrier submissions: use Tarmika/Semsee sandbox/test environments if available, or set auto_submit=false for all carriers during testing. Document all extraction accuracy metrics: target ≥95% field-level accuracy for structured forms (ACORD), ≥90% for unstructured loss runs. Any field below the confidence threshold must trigger a human review flag. Testing should include edge cases: multi-page loss runs, handwritten ACORD forms, poorly scanned documents, and loss runs from less common carriers.
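The accuracy targets and the review flag can be measured with very little code. The sketch below is one possible way to do it, assuming exact-match comparison against a hand-labeled expected record and a review threshold of 0.85. Both the helper names and the 0.85 default are assumptions; the guide does not fix a threshold value.

```python
from typing import Any, Dict, List


def field_accuracy(expected: Dict[str, Any], extracted: Dict[str, Any]) -> float:
    """Fraction of expected fields whose extracted value matches exactly."""
    if not expected:
        raise ValueError('expected must contain at least one field')
    correct = sum(1 for key, value in expected.items() if extracted.get(key) == value)
    return correct / len(expected)


def fields_needing_review(confidences: Dict[str, float], threshold: float = 0.85) -> List[str]:
    """Names of fields whose confidence falls below the (assumed) threshold."""
    return sorted(name for name, conf in confidences.items() if conf < threshold)


# Example: one of four fields was extracted incorrectly.
expected = {'policy_number': 'GL-1', 'named_insured': 'Acme',
            'total_incurred': 1000.0, 'carrier_name': 'Hartford'}
extracted = dict(expected, carrier_name='Travelers')
accuracy = field_accuracy(expected, extracted)
flagged = fields_needing_review({'fein': 0.99, 'dba': 0.40, 'sic_code': 0.80})
```

Exact matching is deliberately strict. If the agency treats formatting differences (such as 'ACME' vs. 'Acme') as correct, normalize both sides before comparing and record that choice alongside the metrics.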
Step 14: Production Cutover and Go-Live
After successful testing, transition to production. Enable live email monitoring, switch carrier submission from test mode to production, conduct final UAT with agency staff, and activate monitoring and alerting. Implement a parallel-run period where AI-processed submissions are verified against manual submissions before full autonomous operation.
# 1. Switch to production mode
cd /opt/insurance-ai-agent
# Update .env with production API keys (replace test/sandbox keys)
# Verify all carrier API endpoints point to production
# 2. Enable email monitoring workflow in n8n
# In n8n UI: Activate 'Email Monitor' workflow
# In n8n UI: Activate 'Quote Monitor' workflow
# 3. Configure monitoring alerts
# Set up Azure Monitor alerts for the VM:
az monitor metrics alert create \
--name 'agent-vm-cpu-high' \
--resource-group rg-insurance-ai-agent \
--scopes /subscriptions/<SUB>/resourceGroups/rg-insurance-ai-agent/providers/Microsoft.Compute/virtualMachines/vm-agent-runtime \
--condition 'avg Percentage CPU > 85' \
--window-size 10m \
--action-group <MSP-ALERT-ACTION-GROUP>
az monitor metrics alert create \
--name 'agent-vm-disk-high' \
--resource-group rg-insurance-ai-agent \
--scopes /subscriptions/<SUB>/resourceGroups/rg-insurance-ai-agent/providers/Microsoft.Compute/virtualMachines/vm-agent-runtime \
--condition 'avg OS Disk Queue Depth > 10' \
--window-size 5m \
--action-group <MSP-ALERT-ACTION-GROUP>
# 4. Set up LangSmith alerts for agent failures
# In LangSmith dashboard: Configure webhook alerts for:
# - Run failure rate > 5%
# - Average latency > 120 seconds
# - Token usage > daily budget threshold
# 5. Start parallel run period (2 weeks)
echo 'PARALLEL_RUN_MODE=true' >> /opt/insurance-ai-agent/.env
echo 'In parallel run mode: AI submissions require manual verification before carrier submission'
# 6. After 2-week parallel run with >95% accuracy:
# sed -i 's/^PARALLEL_RUN_MODE=.*/PARALLEL_RUN_MODE=false/' /opt/insurance-ai-agent/.env
# sudo systemctl restart insurance-agent
echo 'Production cutover complete. Monitoring active. Parallel run period: 2 weeks.'
The parallel run period is critical for building agency confidence and catching edge cases. During this period, every AI submission is also manually processed by a CSR, and results are compared. Track: extraction accuracy, form population correctness, carrier acceptance rate, and time savings. Do not disable parallel mode until accuracy consistently exceeds 95% and the agency principal has signed off. Schedule daily check-in calls with the agency during the first week of go-live, then move to weekly during weeks 2–4.
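The exit criterion for the parallel run can be written down as an explicit check so the decision is not made informally. This sketch interprets "consistently exceeds 95%" as every tracked day being above the threshold and assumes 10 business days of data for a 2-week run. Both interpretations, and the function name, are assumptions to be agreed with the agency.

```python
from typing import Sequence


def ready_to_exit_parallel_run(daily_accuracy: Sequence[float],
                               principal_signed_off: bool,
                               threshold: float = 0.95,
                               min_days: int = 10) -> bool:
    """True only with enough days of data, every day above threshold, and sign-off."""
    if len(daily_accuracy) < min_days:
        return False  # Not enough evidence yet.
    return principal_signed_off and all(day > threshold for day in daily_accuracy)
```

A single day at or below the threshold returns False here; a rolling average would be more forgiving but could hide a bad day caused by a new document format.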
Custom AI Components
Document Extraction Agent
Type: agent
LangGraph-based agent node that takes OCR output from Azure Document Intelligence and uses GPT-4.1-mini to extract structured insurance data from loss runs and ACORD applications. Handles diverse document formats from different carriers, assigns confidence scores to each extracted field, and flags low-confidence fields for human review. This is the core intelligence of the system: it converts unstructured PDF documents into structured data that can populate carrier submission forms.
Implementation:
# /opt/insurance-ai-agent/agent/extraction_agent.py
import json
import logging
from typing import Dict, Any, List, Optional
from langchain_openai import AzureChatOpenAI
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
# --- Pydantic models for structured extraction ---
class ClaimRecord(BaseModel):
    claim_number: Optional[str] = None
    date_of_loss: Optional[str] = None
    claimant_name: Optional[str] = None
    claim_type: Optional[str] = None
    claim_status: str = 'unknown'  # open, closed, reserved
    total_incurred: Optional[float] = None
    total_paid: Optional[float] = None
    total_reserved: Optional[float] = None
    description: Optional[str] = None
    confidence: float = 0.0


class LossRunExtraction(BaseModel):
    carrier_name: Optional[str] = None
    policy_number: Optional[str] = None
    named_insured: Optional[str] = None
    policy_type: Optional[str] = None  # GL, WC, PROP, AUTO, BOP, UMBR
    policy_effective_date: Optional[str] = None
    policy_expiration_date: Optional[str] = None
    report_date: Optional[str] = None
    valuation_date: Optional[str] = None
    claims: List[ClaimRecord] = []
    total_incurred: Optional[float] = None
    total_paid: Optional[float] = None
    total_reserved: Optional[float] = None
    loss_ratio: Optional[float] = None
    experience_mod: Optional[float] = None
    overall_confidence: float = 0.0
    extraction_notes: List[str] = []


class ACORDFieldExtraction(BaseModel):
    form_type: str = ''  # ACORD 125, 126, 130, 140, etc.
    applicant_name: Optional[str] = None
    dba: Optional[str] = None
    mailing_address: Optional[Dict[str, str]] = None
    fein: Optional[str] = None
    sic_code: Optional[str] = None
    naics_code: Optional[str] = None
    entity_type: Optional[str] = None
    business_description: Optional[str] = None
    years_in_business: Optional[int] = None
    annual_revenue: Optional[float] = None
    num_employees: Optional[int] = None
    requested_effective_date: Optional[str] = None
    requested_expiration_date: Optional[str] = None
    coverage_details: Dict[str, Any] = {}
    locations: List[Dict[str, Any]] = []
    additional_insureds: List[Dict[str, str]] = []
    prior_carrier: Optional[str] = None
    prior_policy_number: Optional[str] = None
    prior_premium: Optional[float] = None
    overall_confidence: float = 0.0
    low_confidence_fields: List[str] = []

# --- Extraction prompts ---
LOSS_RUN_EXTRACTION_PROMPT = """You are an expert insurance data extraction specialist.
Analyze the following document text extracted from a loss run report and extract all
structured data.
IMPORTANT RULES:
1. Extract EVERY claim listed, even if some fields are missing.
2. For monetary amounts, extract as numbers without currency symbols.
3. For dates, use YYYY-MM-DD format.
4. If a field is unclear or potentially incorrect, set its confidence lower.
5. Calculate overall_confidence as the average confidence across all fields.
6. Add notes about any ambiguities or data quality issues in extraction_notes.
7. A loss ratio = total incurred / total premium (if premium is shown).
8. For Workers Comp loss runs, look for Experience Modification Rate (EMR/e-mod).
DOCUMENT TEXT:
{document_text}
TABLE DATA (if available):
{table_data}
KEY-VALUE PAIRS (if available):
{kv_pairs}
Extract the data as a JSON object matching this exact schema:
{schema}
"""
ACORD_EXTRACTION_PROMPT = """You are an expert insurance forms specialist familiar with
all ACORD form types. Analyze the following document text extracted from an ACORD
application form and extract all structured data.
IMPORTANT RULES:
1. Identify the ACORD form type (125, 126, 130, 140, etc.) from the header.
2. Extract ALL filled fields, paying special attention to:
- Named insured details (name, address, FEIN, entity type)
- Business description and classification codes (SIC, NAICS)
- Coverage requested (limits, deductibles, endorsements)
- Locations and their details
- Prior insurance information
3. For any field where the handwriting is unclear or the value seems
potentially incorrect, add the field name to low_confidence_fields.
4. Calculate overall_confidence as the percentage of fields successfully
extracted with high confidence.
DOCUMENT TEXT:
{document_text}
TABLE DATA (if available):
{table_data}
KEY-VALUE PAIRS (if available):
{kv_pairs}
Extract the data as a JSON object matching this exact schema:
{schema}
"""


class ExtractionAgent:
    def __init__(self):
        # Use GPT-4.1-mini for standard extractions (fast + cheap)
        self.llm_fast = AzureChatOpenAI(
            azure_deployment='gpt-4-1-mini',
            api_version='2025-04-01-preview',
            temperature=0.0,
            max_tokens=4096
        )
        # Use GPT-4.1 for complex/low-confidence re-extractions
        self.llm_strong = AzureChatOpenAI(
            azure_deployment='gpt-4-1',
            api_version='2025-04-01-preview',
            temperature=0.0,
            max_tokens=8192
        )
        self.CONFIDENCE_THRESHOLD = 0.85

    async def extract_loss_run(self, doc_intelligence_output: dict) -> LossRunExtraction:
        """Extract structured data from a loss run document."""
        prompt = LOSS_RUN_EXTRACTION_PROMPT.format(
            document_text=doc_intelligence_output.get('content', '')[:15000],
            table_data=json.dumps(doc_intelligence_output.get('tables', [])[:10], indent=2),
            kv_pairs=json.dumps(doc_intelligence_output.get('key_value_pairs', []), indent=2),
            # Serialize the schema as JSON; formatting the raw dict would embed a Python repr
            schema=json.dumps(LossRunExtraction.model_json_schema(), indent=2)
        )
        # First pass with fast model
        response = await self.llm_fast.ainvoke(prompt)
        extraction = LossRunExtraction.model_validate_json(response.content)
        # If confidence is low, re-extract with stronger model
        if extraction.overall_confidence < self.CONFIDENCE_THRESHOLD:
            logger.info(f'Low confidence ({extraction.overall_confidence:.2f}), re-extracting with GPT-4.1')
            response = await self.llm_strong.ainvoke(prompt)
            extraction = LossRunExtraction.model_validate_json(response.content)
            extraction.extraction_notes.append('Re-extracted with GPT-4.1 due to low initial confidence')
        return extraction

    async def extract_acord_form(self, doc_intelligence_output: dict) -> ACORDFieldExtraction:
        """Extract structured data from an ACORD application form."""
        prompt = ACORD_EXTRACTION_PROMPT.format(
            document_text=doc_intelligence_output.get('content', '')[:15000],
            table_data=json.dumps(doc_intelligence_output.get('tables', [])[:10], indent=2),
            kv_pairs=json.dumps(doc_intelligence_output.get('key_value_pairs', []), indent=2),
            schema=json.dumps(ACORDFieldExtraction.model_json_schema(), indent=2)
        )
        response = await self.llm_fast.ainvoke(prompt)
        extraction = ACORDFieldExtraction.model_validate_json(response.content)
        if extraction.overall_confidence < self.CONFIDENCE_THRESHOLD:
            logger.info(f'Low confidence ({extraction.overall_confidence:.2f}), re-extracting with GPT-4.1')
            response = await self.llm_strong.ainvoke(prompt)
            extraction = ACORDFieldExtraction.model_validate_json(response.content)
        return extraction

    def needs_human_review(self, extraction) -> bool:
        """Determine if the extraction needs human review."""
        if hasattr(extraction, 'overall_confidence'):
            if extraction.overall_confidence < self.CONFIDENCE_THRESHOLD:
                return True
        if hasattr(extraction, 'low_confidence_fields'):
            if len(extraction.low_confidence_fields) > 3:
                return True
        return False
Submission Orchestration Graph
Type: workflow
The core LangGraph state machine that orchestrates the entire submission workflow from document intake to carrier quote collection. Implements a directed graph with conditional routing, parallel carrier submission, human-in-the-loop approval checkpointing, and error recovery. This is the 'brain' that coordinates all other components.
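The description mentions parallel carrier submission, whereas the submission node in the listing iterates over carriers one at a time. One way to fan out concurrently is `asyncio.gather` with `return_exceptions=True`, so that a single failing carrier does not cancel the others. This is a sketch under assumptions: `submit_to_all` and the injected `submit` callable are hypothetical stand-ins, not the `CarrierSubmissionEngine` API.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical signature for a per-carrier submit coroutine.
SubmitFn = Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def submit_to_all(carriers: List[str],
                        payload: Dict[str, Any],
                        submit: SubmitFn) -> Dict[str, Dict[str, Any]]:
    """Submit to every carrier concurrently and collect per-carrier outcomes."""
    outcomes = await asyncio.gather(
        *(submit(carrier, payload) for carrier in carriers),
        return_exceptions=True,  # exceptions are returned as values, not raised
    )
    results: Dict[str, Dict[str, Any]] = {}
    for carrier, outcome in zip(carriers, outcomes):
        if isinstance(outcome, BaseException):
            results[carrier] = {'status': 'failed', 'error': str(outcome)}
        else:
            results[carrier] = {
                'status': outcome.get('status', 'submitted'),
                'reference_id': outcome.get('reference_id', ''),
            }
    return results
```

Because `gather` preserves argument order, zipping the carrier list with the outcomes pairs each result with the right carrier. Carrier portals with rate limits may still call for a semaphore around `submit`.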
Implementation:
# /opt/insurance-ai-agent/agent/agent_graph.py
import json
import logging
import uuid
from typing import Annotated, Any, Dict, List, Optional, TypedDict
from datetime import datetime

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph.message import add_messages

from agent.doc_intelligence import DocumentProcessor
from agent.extraction_agent import ExtractionAgent, LossRunExtraction, ACORDFieldExtraction
from agent.ams_integration import AMSClient
from agent.carrier_submit import CarrierSubmissionEngine
from agent.form_populator import ACORDFormPopulator
from agent.db import get_db_connection, save_submission, update_submission_status, save_audit_log

logger = logging.getLogger(__name__)


# --- State Definition ---
class SubmissionState(TypedDict):
    # Input
    submission_id: str
    client_ams_id: str
    document_urls: List[str]
    target_carriers: List[str]
    lines_of_business: List[str]
    priority: str
    # Processing state
    documents_processed: List[Dict[str, Any]]  # OCR results
    extractions: List[Dict[str, Any]]  # Structured extractions
    client_data: Dict[str, Any]  # From AMS
    merged_application_data: Dict[str, Any]  # Combined extraction + AMS data
    populated_forms: Dict[str, Any]  # ACORD forms ready for submission
    # Review state
    needs_review: bool
    review_flags: List[str]
    human_approved: bool
    reviewer_name: str
    review_notes: str
    # Submission state
    carrier_results: Dict[str, Dict[str, Any]]  # Per-carrier submission results
    quotes_received: Dict[str, Dict[str, Any]]  # Per-carrier quotes
    comparison_report: str
    # Metadata
    status: str
    errors: List[str]
    created_at: str
    updated_at: str


# --- Node Functions ---
async def process_documents(state: SubmissionState) -> SubmissionState:
    """Node 1: Process all input documents through Azure Document Intelligence."""
    logger.info(f'Processing {len(state["document_urls"])} documents')
    processor = DocumentProcessor()
    documents_processed = []
    for url in state['document_urls']:
        try:
            result = processor.analyze_document(document_url=url)
            doc_type = processor.classify_document(result['content'])
            documents_processed.append({
                'url': url,
                'type': doc_type,
                'ocr_result': result,
                'status': 'processed'
            })
            logger.info(f'Processed document: {url} -> type: {doc_type}')
        except Exception as e:
            logger.error(f'Failed to process document {url}: {e}')
            documents_processed.append({
                'url': url,
                'type': 'error',
                'error': str(e),
                'status': 'failed'
            })
    save_audit_log('documents_processed', 'submission', state['submission_id'],
                   'system', {'count': len(documents_processed)})
    return {
        **state,
        'documents_processed': documents_processed,
        'status': 'documents_processed',
        'updated_at': datetime.utcnow().isoformat()
    }


async def extract_data(state: SubmissionState) -> SubmissionState:
    """Node 2: Extract structured data from processed documents using LLM."""
    logger.info('Extracting structured data from documents')
    extractor = ExtractionAgent()
    extractions = []
    review_flags = []
    for doc in state['documents_processed']:
        if doc['status'] != 'processed':
            continue
        try:
            if doc['type'] == 'loss_run':
                extraction = await extractor.extract_loss_run(doc['ocr_result'])
                if extractor.needs_human_review(extraction):
                    review_flags.append(f"Loss run from {extraction.carrier_name}: confidence {extraction.overall_confidence:.0%}")
                extractions.append({
                    'doc_url': doc['url'],
                    'doc_type': 'loss_run',
                    'data': extraction.model_dump(),
                    'confidence': extraction.overall_confidence
                })
            elif doc['type'].startswith('acord_'):
                extraction = await extractor.extract_acord_form(doc['ocr_result'])
                if extractor.needs_human_review(extraction):
                    review_flags.append(f"ACORD form {extraction.form_type}: {len(extraction.low_confidence_fields)} low-confidence fields")
                extractions.append({
                    'doc_url': doc['url'],
                    'doc_type': doc['type'],
                    'data': extraction.model_dump(),
                    'confidence': extraction.overall_confidence
                })
        except Exception as e:
            logger.error(f'Extraction failed for {doc["url"]}: {e}')
            review_flags.append(f"Extraction failed for {doc['url']}: {str(e)}")
    save_audit_log('data_extracted', 'submission', state['submission_id'],
                   'system', {'extraction_count': len(extractions), 'flags': len(review_flags)})
    return {
        **state,
        'extractions': extractions,
        'review_flags': review_flags,
        'status': 'data_extracted',
        'updated_at': datetime.utcnow().isoformat()
    }


async def enrich_from_ams(state: SubmissionState) -> SubmissionState:
    """Node 3: Pull client data from AMS and merge with extracted data."""
    logger.info(f'Enriching with AMS data for client {state["client_ams_id"]}')
    ams = AMSClient()
    try:
        client_data = await ams.get_client(state['client_ams_id'])
        policies = await ams.get_policies(state['client_ams_id'])
        client_data['existing_policies'] = policies
    except Exception as e:
        logger.error(f'AMS lookup failed: {e}')
        client_data = {}
        state['review_flags'].append(f'AMS lookup failed: {str(e)} - manual data entry required')
    # Merge extracted data with AMS data (AMS data takes priority for identity fields)
    merged = {
        'applicant': {
            'business_name': client_data.get('business_name', ''),
            'dba': client_data.get('dba', ''),
            'fein': client_data.get('fein', ''),
            'entity_type': client_data.get('entity_type', ''),
            'sic_code': client_data.get('sic_code', ''),
            'naics_code': client_data.get('naics_code', ''),
            'primary_contact': client_data.get('primary_contact', {}),
            'mailing_address': client_data.get('mailing_address', {}),
            'locations': client_data.get('locations', [])
        },
        'business_info': {},
        'loss_history': [],
        'coverage': {},
        'existing_policies': client_data.get('existing_policies', [])
    }
    # Add extracted application data
    for ext in state['extractions']:
        if ext['doc_type'] == 'loss_run':
            merged['loss_history'].append(ext['data'])
        elif ext['doc_type'].startswith('acord_'):
            # Merge ACORD data, preferring non-null values
            acord_data = ext['data']
            for key, value in acord_data.items():
                if value and key not in ['overall_confidence', 'low_confidence_fields']:
                    if key == 'coverage_details':
                        merged['coverage'].update(value)
                    elif key == 'locations' and value:
                        merged['applicant']['locations'] = value
                    elif key in merged['applicant'] and not merged['applicant'].get(key):
                        merged['applicant'][key] = value
                    elif key not in merged['applicant']:
                        merged['business_info'][key] = value
    save_audit_log('ams_enrichment', 'submission', state['submission_id'],
                   'system', {'client_found': bool(client_data)})
    return {
        **state,
        'client_data': client_data,
        'merged_application_data': merged,
        'status': 'data_enriched',
        'updated_at': datetime.utcnow().isoformat()
    }


async def populate_forms(state: SubmissionState) -> SubmissionState:
    """Node 4: Populate ACORD forms and carrier-specific applications."""
    logger.info('Populating submission forms')
    populator = ACORDFormPopulator()
    populated = {}
    for lob in state['lines_of_business']:
        try:
            forms = populator.populate_for_line(lob, state['merged_application_data'])
            populated[lob] = forms
        except Exception as e:
            logger.error(f'Form population failed for {lob}: {e}')
            state['review_flags'].append(f'Form population failed for {lob}: {str(e)}')
    # Determine if human review is needed
    needs_review = (
        len(state['review_flags']) > 0 or
        any(ext['confidence'] < 0.85 for ext in state['extractions'])
    )
    save_audit_log('forms_populated', 'submission', state['submission_id'],
                   'system', {'lines': list(populated.keys()), 'needs_review': needs_review})
    return {
        **state,
        'populated_forms': populated,
        'needs_review': needs_review,
        'status': 'awaiting_review' if needs_review else 'ready_to_submit',
        'updated_at': datetime.utcnow().isoformat()
    }


def should_route_to_review(state: SubmissionState) -> str:
    """Conditional edge: route to human review or directly to submission."""
    # ALWAYS route to human review in production (safety first)
    # Even if confidence is high, human approval is required
    return 'human_review'


async def human_review_checkpoint(state: SubmissionState) -> SubmissionState:
    """Node 5: CHECKPOINT - Pause execution and wait for human approval.

    This node saves state and returns. The graph is resumed when the
    human approves/rejects via the dashboard API endpoint."""
    logger.info(f'Submission {state["submission_id"]} awaiting human review')
    # Save current state to database for dashboard display
    save_submission(state)
    # Send notification via n8n webhook
    import httpx
    try:
        async with httpx.AsyncClient() as client:
            await client.post('http://localhost:5678/webhook/submission-ready', json={
                'submission_id': state['submission_id'],
                'client_name': state['merged_application_data'].get('applicant', {}).get('business_name', 'Unknown'),
                'lines': state['lines_of_business'],
                'carriers': state['target_carriers'],
                'flags': state['review_flags'],
                'status': 'awaiting_review'
            })
    except Exception as e:
        logger.warning(f'Failed to send review notification: {e}')
    # Graph execution pauses here until resumed by approve API
    return {
        **state,
        'status': 'awaiting_review',
        'updated_at': datetime.utcnow().isoformat()
    }


async def submit_to_carriers(state: SubmissionState) -> SubmissionState:
    """Node 6: Submit populated applications to all target carriers."""
    if not state.get('human_approved', False):
        logger.error('Attempted to submit without human approval!')
        return {**state, 'status': 'error', 'errors': state['errors'] + ['No human approval']}
    logger.info(f'Submitting to {len(state["target_carriers"])} carriers')
    engine = CarrierSubmissionEngine()
    carrier_results = {}
    for carrier in state['target_carriers']:
        try:
            result = await engine.submit(
                carrier_name=carrier,
                submission_data=state['merged_application_data']
            )
            carrier_results[carrier] = {
                'status': result.get('status', 'submitted'),
                'reference_id': result.get('reference_id', ''),
                'submitted_at': datetime.utcnow().isoformat(),
                'platform': result.get('platform', 'unknown')
            }
            logger.info(f'Submitted to {carrier}: {result.get("status")}')
        except Exception as e:
            logger.error(f'Submission to {carrier} failed: {e}')
            carrier_results[carrier] = {
                'status': 'failed',
                'error': str(e),
                'submitted_at': datetime.utcnow().isoformat()
            }
    # Log to AMS
    try:
        ams = AMSClient()
        await ams.create_activity(state['client_ams_id'], {
            'type': 'submission',
            'description': f'AI agent submitted to {len(carrier_results)} carriers: {", ".join(state["target_carriers"])}',
            'details': json.dumps(carrier_results)
        })
    except Exception as e:
        logger.warning(f'Failed to log to AMS: {e}')
    save_audit_log('carriers_submitted', 'submission', state['submission_id'],
                   state.get('reviewer_name', 'system'),
                   {'carriers': list(carrier_results.keys()),
                    'results': {k: v['status'] for k, v in carrier_results.items()}})
    return {
        **state,
        'carrier_results': carrier_results,
        'status': 'submitted',
        'updated_at': datetime.utcnow().isoformat()
    }


async def generate_comparison(state: SubmissionState) -> SubmissionState:
    """Node 7: When quotes are received, generate comparison report."""
    # This node is called by the quote monitoring workflow when quotes come in
    if not state.get('quotes_received'):
        return {**state, 'status': 'awaiting_quotes'}
    from langchain_openai import AzureChatOpenAI
    llm = AzureChatOpenAI(azure_deployment='gpt-4-1-mini', temperature=0.0)
    prompt = f"""Generate a clear, professional quote comparison summary for an insurance agent.
Client: {state['merged_application_data'].get('applicant', {}).get('business_name', 'N/A')}
Lines of Business: {', '.join(state['lines_of_business'])}
Quotes Received:
{json.dumps(state['quotes_received'], indent=2)}
Format as a table with columns: Carrier | Premium | Deductible | Key Coverage Highlights | Notable Exclusions
Add a recommendation summary at the bottom noting best value and best coverage options.
"""
    response = await llm.ainvoke(prompt)
    return {
        **state,
        'comparison_report': response.content,
        'status': 'quotes_compared',
        'updated_at': datetime.utcnow().isoformat()
    }


# --- Build the Graph ---
def create_submission_graph():
    """Create and compile the submission workflow graph."""
    import os
    # Set up PostgreSQL checkpointer for state persistence.
    # NOTE: in langgraph-checkpoint-postgres 2.x, from_conn_string() is a context
    # manager; on those versions use `with PostgresSaver.from_conn_string(...) as cp:`
    # or construct PostgresSaver from a long-lived psycopg connection or pool.
    checkpointer = PostgresSaver.from_conn_string(os.environ['DATABASE_URL'])
    checkpointer.setup()  # Creates checkpoint tables if not exist
    # Build the graph
    workflow = StateGraph(SubmissionState)
    # Add nodes
    workflow.add_node('process_documents', process_documents)
    workflow.add_node('extract_data', extract_data)
    workflow.add_node('enrich_from_ams', enrich_from_ams)
    workflow.add_node('populate_forms', populate_forms)
    workflow.add_node('human_review', human_review_checkpoint)
    workflow.add_node('submit_to_carriers', submit_to_carriers)
    workflow.add_node('generate_comparison', generate_comparison)
    # Add edges (sequential flow with conditional routing)
    workflow.set_entry_point('process_documents')
    workflow.add_edge('process_documents', 'extract_data')
    workflow.add_edge('extract_data', 'enrich_from_ams')
    workflow.add_edge('enrich_from_ams', 'populate_forms')
    workflow.add_conditional_edges(
        'populate_forms',
        should_route_to_review,
        {
            'human_review': 'human_review',
            'submit': 'submit_to_carriers'
        }
    )
    workflow.add_edge('human_review', 'submit_to_carriers')  # After approval
    workflow.add_edge('submit_to_carriers', 'generate_comparison')
    workflow.add_edge('generate_comparison', END)
    # Compile with checkpointing. Interrupt before submission rather than before
    # human_review, so the review node first saves state and notifies the reviewer,
    # and the graph then pauses until the approval API resumes it.
    graph = workflow.compile(checkpointer=checkpointer, interrupt_before=['submit_to_carriers'])
    return graph
ACORD Form Populator
Type: skill
Takes merged application data (from AI extraction + AMS enrichment) and maps it to the correct fields on ACORD form templates. Supports ACORD 125 (Commercial Insurance Application), ACORD 126 (Commercial General Liability), ACORD 130 (Workers Compensation), and ACORD 140 (Property Section). Generates populated form data ready for submission via Tarmika/Semsee APIs or direct carrier portal entry.
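A worked example may help show what "mapping to form fields" means in practice: each form field name points at a dot-separated path into the merged data, with numeric segments indexing into lists. The sketch below is illustrative only; `EXAMPLE_FIELD_MAP`, `lookup`, and the sample data are hypothetical and much smaller than the real maps.

```python
from typing import Any, Dict, Optional

# Hypothetical two-entry map in the "form field -> dot path" style.
EXAMPLE_FIELD_MAP: Dict[str, str] = {
    'ApplicantName': 'applicant.business_name',
    'PriorCarrier': 'existing_policies.0.carrier_name',
}


def lookup(data: Any, path: str) -> Optional[Any]:
    """Walk dicts by key and lists by integer index; None if any step is missing."""
    current = data
    for key in path.split('.'):
        if isinstance(current, dict):
            current = current.get(key)
        elif isinstance(current, list) and key.isdigit() and int(key) < len(current):
            current = current[int(key)]
        else:
            return None
        if current is None:
            return None
    return current


merged = {
    'applicant': {'business_name': 'Acme Roofing LLC'},
    'existing_policies': [],  # no prior policy on file
}
fields = {name: lookup(merged, path) for name, path in EXAMPLE_FIELD_MAP.items()}
print(fields)  # {'ApplicantName': 'Acme Roofing LLC', 'PriorCarrier': None}
```

Fields that resolve to `None` are the ones a reviewer would need to complete by hand before submission.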
Implementation:
# /opt/insurance-ai-agent/agent/form_populator.py
import json
import logging
from typing import Dict, Any, List
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
# ACORD form field mappings
# These map our normalized data schema to ACORD form field identifiers
# Field IDs correspond to standard ACORD XML element names
ACORD_125_FIELD_MAP = {
# Section 1: Applicant Information
'ApplicantName': 'applicant.business_name',
'ApplicantDBA': 'applicant.dba',
'ApplicantMailingAddr1': 'applicant.mailing_address.street1',
'ApplicantMailingAddr2': 'applicant.mailing_address.street2',
'ApplicantMailingCity': 'applicant.mailing_address.city',
'ApplicantMailingState': 'applicant.mailing_address.state',
'ApplicantMailingZip': 'applicant.mailing_address.zip',
'ApplicantPhone': 'applicant.primary_contact.phone',
'ApplicantEmail': 'applicant.primary_contact.email',
'ApplicantWebsite': 'applicant.website',
'ApplicantFEIN': 'applicant.fein',
'ApplicantSICCode': 'applicant.sic_code',
'ApplicantNAICSCode': 'applicant.naics_code',
'EntityType': 'applicant.entity_type',
'YearsInBusiness': 'business_info.years_in_business',
'AnnualRevenue': 'business_info.annual_revenue',
'NumEmployees': 'business_info.num_employees',
'BusinessDescription': 'business_info.business_description',
# Section 2: Policy Information
'ProposedEffDate': 'coverage.effective_date',
'ProposedExpDate': 'coverage.expiration_date',
# Section 3: Prior Insurance
'PriorCarrier': 'existing_policies.0.carrier_name',
'PriorPolicyNumber': 'existing_policies.0.policy_number',
'PriorPremium': 'existing_policies.0.premium',
'PriorEffDate': 'existing_policies.0.effective_date',
'PriorExpDate': 'existing_policies.0.expiration_date',
# Section 4: Nature of Business
'DescriptionOfOperations': 'business_info.business_description',
}
ACORD_126_FIELD_MAP = {
# GL-specific fields (extends ACORD 125)
'GLClassCode': 'coverage.gl.class_code',
'GLPremisesOperations': 'coverage.gl.premises_operations_limit',
'GLGeneralAggregate': 'coverage.gl.general_aggregate',
'GLProductsCompOps': 'coverage.gl.products_completed_ops',
'GLPersonalAdvInjury': 'coverage.gl.personal_advertising_injury',
'GLEachOccurrence': 'coverage.gl.each_occurrence',
'GLFireDamageLegal': 'coverage.gl.fire_damage_legal',
'GLMedicalExpense': 'coverage.gl.medical_expense',
'GLDeductible': 'coverage.gl.deductible',
'GLRetroDate': 'coverage.gl.retro_date',
'AdditionalInsuredRequired': 'coverage.gl.additional_insured_required',
'WaiverOfSubrogation': 'coverage.gl.waiver_of_subrogation',
}
ACORD_130_FIELD_MAP = {
# Workers Comp specific fields
'WCState': 'coverage.wc.governing_state',
'WCClassCode': 'coverage.wc.class_codes',
'WCPayroll': 'coverage.wc.estimated_annual_payroll',
'WCNumEmployees': 'business_info.num_employees',
'WCExperienceMod': 'loss_history_summary.experience_mod',
'WCDeductible': 'coverage.wc.deductible',
'EachAccident': 'coverage.wc.each_accident_limit',
'DiseasePolicyLimit': 'coverage.wc.disease_policy_limit',
'DiseaseEachEmployee': 'coverage.wc.disease_each_employee',
}
ACORD_140_FIELD_MAP = {
# Property specific fields
'BuildingValue': 'coverage.property.building_value',
'BPPValue': 'coverage.property.bpp_value',
'BusinessIncomeValue': 'coverage.property.business_income',
'PropertyDeductible': 'coverage.property.deductible',
'ConstructionType': 'coverage.property.construction_type',
'YearBuilt': 'coverage.property.year_built',
'NumStories': 'coverage.property.num_stories',
'SquareFootage': 'coverage.property.square_footage',
'Sprinklered': 'coverage.property.sprinklered',
'AlarmType': 'coverage.property.alarm_type',
'RoofType': 'coverage.property.roof_type',
'RoofAge': 'coverage.property.roof_age',
}
LINE_TO_FORMS = {
'GL': ['ACORD_125', 'ACORD_126'],
'WC': ['ACORD_125', 'ACORD_130'],
'PROP': ['ACORD_125', 'ACORD_140'],
'BOP': ['ACORD_125', 'ACORD_126', 'ACORD_140'],
'CA': ['ACORD_125'], # Plus ACORD 127 (not yet mapped)
'UMBR': ['ACORD_125'], # Plus ACORD 129 (not yet mapped)
}
FORM_FIELD_MAPS = {
'ACORD_125': ACORD_125_FIELD_MAP,
'ACORD_126': ACORD_126_FIELD_MAP,
'ACORD_130': ACORD_130_FIELD_MAP,
'ACORD_140': ACORD_140_FIELD_MAP,
}


class ACORDFormPopulator:
    def __init__(self):
        self.field_maps = FORM_FIELD_MAPS
        self.line_forms = LINE_TO_FORMS

    def populate_for_line(self, line_of_business: str, merged_data: dict) -> dict:
        """Populate all required ACORD forms for a given line of business."""
        lob = line_of_business.upper()
        required_forms = self.line_forms.get(lob, ['ACORD_125'])
        populated_forms = {}
        # Summarize loss history for form population
        if merged_data.get('loss_history'):
            merged_data['loss_history_summary'] = self._summarize_loss_history(
                merged_data['loss_history'], lob
            )
        # Set default dates if not provided
        if not merged_data.get('coverage', {}).get('effective_date'):
            merged_data.setdefault('coverage', {})['effective_date'] = (
                datetime.now() + timedelta(days=30)
            ).strftime('%Y-%m-%d')
            merged_data['coverage']['expiration_date'] = (
                datetime.now() + timedelta(days=395)
            ).strftime('%Y-%m-%d')
        for form_name in required_forms:
            field_map = self.field_maps.get(form_name, {})
            populated = {}
            missing_fields = []
            for acord_field, data_path in field_map.items():
                value = self._get_nested_value(merged_data, data_path)
                if value is not None:
                    populated[acord_field] = value
                else:
                    missing_fields.append(acord_field)
            populated_forms[form_name] = {
                'fields': populated,
                'missing_fields': missing_fields,
                'completeness': len(populated) / max(len(field_map), 1)
            }
            logger.info(
                f'{form_name}: {len(populated)}/{len(field_map)} fields populated '
                f'({populated_forms[form_name]["completeness"]:.0%} complete)'
            )
        return populated_forms

    def _get_nested_value(self, data: dict, path: str):
        """Navigate nested dict using dot notation (supports array indices)."""
        keys = path.split('.')
        value = data
        for key in keys:
            if isinstance(value, dict):
                value = value.get(key)
            elif isinstance(value, list):
                try:
                    value = value[int(key)]
                except (IndexError, ValueError):
                    return None
            else:
                return None
            if value is None:
                return None
        return value

    def _summarize_loss_history(self, loss_runs: list, line_of_business: str) -> dict:
        """Summarize loss history across multiple loss runs for form population."""
        summary = {
            'total_claims': 0,
            'open_claims': 0,
            'total_incurred': 0.0,
            'total_paid': 0.0,
            'years_covered': set(),
            'experience_mod': None,
            'largest_claim': 0.0
        }
        for lr in loss_runs:
            # Skip anything that is not a dict-shaped loss run
            data = lr if isinstance(lr, dict) else {}
            claims = data.get('claims', [])
            summary['total_claims'] += len(claims)
            for claim in claims:
                if isinstance(claim, dict):
                    # Guard against a null status before lower-casing
                    if (claim.get('claim_status') or '').lower() == 'open':
                        summary['open_claims'] += 1
                    incurred = claim.get('total_incurred', 0) or 0
                    summary['total_incurred'] += incurred
                    summary['total_paid'] += claim.get('total_paid', 0) or 0
                    summary['largest_claim'] = max(summary['largest_claim'], incurred)
            if data.get('experience_mod'):
                summary['experience_mod'] = data['experience_mod']
            if data.get('policy_effective_date'):
                try:
                    year = int(data['policy_effective_date'][:4])
                    summary['years_covered'].add(year)
                except (ValueError, IndexError):
                    pass
        summary['years_covered'] = sorted(list(summary['years_covered']))
        return summary
Loss Run Data Extraction Prompt
Type: prompt
Specialized prompt template for extracting structured claims and policy data from carrier loss run PDF documents. Handles the wide variety of loss run formats across insurance carriers, each of which formats its loss runs differently. Includes examples for common carriers like Hartford, Travelers, CNA, AmTrust, and Liberty Mutual.
Implementation:
Loss Run Extraction Prompt v1.0
Email Ingestion Integration
Type: integration
Microsoft Graph API integration that monitors the agency's shared mailbox for incoming loss run documents and carrier correspondence. Automatically downloads PDF attachments, uploads them to Azure Blob Storage, and triggers the document processing pipeline. Handles duplicate detection, attachment filtering, and email classification.
Implementation:
# /opt/insurance-ai-agent/agent/email_ingestion.py
import os
import hashlib
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta

import httpx
import base64

logger = logging.getLogger(__name__)


class EmailIngestionService:
    def __init__(self):
        self.tenant_id = os.environ['MS_GRAPH_TENANT_ID']
        self.client_id = os.environ['MS_GRAPH_CLIENT_ID']
        self.client_secret = os.environ['MS_GRAPH_CLIENT_SECRET']
        self.mailbox = os.environ.get('MONITORED_MAILBOX', 'ai-submissions@agency.com')
        self.graph_base = 'https://graph.microsoft.com/v1.0'
        self._token = None
        self._token_expiry = None
        self._processed_hashes = set()  # In-memory dedup; use Redis in production

    async def _get_token(self) -> str:
        if self._token and self._token_expiry and datetime.utcnow() < self._token_expiry:
            return self._token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f'https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token',
                data={
                    'grant_type': 'client_credentials',
                    'client_id': self.client_id,
                    'client_secret': self.client_secret,
                    'scope': 'https://graph.microsoft.com/.default'
                }
            )
            response.raise_for_status()
            token_data = response.json()
            self._token = token_data['access_token']
            self._token_expiry = datetime.utcnow() + timedelta(seconds=token_data['expires_in'] - 60)
            return self._token

    async def check_for_new_documents(self) -> List[Dict[str, Any]]:
        """Check shared mailbox for new emails with attachments."""
        token = await self._get_token()
        headers = {'Authorization': f'Bearer {token}'}
        # Get unread emails with attachments. Microsoft Graph requires every
        # $orderby property to appear first in $filter, otherwise the request
        # fails with an InefficientFilter error.
        filter_query = (
            "receivedDateTime ge 1900-01-01T00:00:00Z "
            "and isRead eq false and hasAttachments eq true"
        )
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f'{self.graph_base}/users/{self.mailbox}/messages',
                headers=headers,
                params={
                    '$filter': filter_query,
                    '$select': 'id,subject,from,receivedDateTime,hasAttachments',
                    '$top': 50,
                    '$orderby': 'receivedDateTime desc'
                }
            )
            response.raise_for_status()
            messages = response.json().get('value', [])
        new_documents = []
        for msg in messages:
            attachments = await self._get_attachments(msg['id'], headers)
            for att in attachments:
                if self._is_relevant_attachment(att):
                    # Check for duplicates
                    content_hash = hashlib.sha256(att['content']).hexdigest()
                    if content_hash not in self._processed_hashes:
                        self._processed_hashes.add(content_hash)
                        new_documents.append({
                            'email_id': msg['id'],
                            'subject': msg['subject'],
                            'from': msg.get('from', {}).get('emailAddress', {}).get('address', ''),
                            'received': msg['receivedDateTime'],
                            'filename': att['name'],
                            'content': att['content'],
                            'content_hash': content_hash,
                            'content_type': att['contentType']
                        })
            # Mark email as read
            await self._mark_as_read(msg['id'], headers)
        logger.info(f'Found {len(new_documents)} new documents from {len(messages)} emails')
        return new_documents

    async def _get_attachments(self, message_id: str, headers: dict) -> list:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f'{self.graph_base}/users/{self.mailbox}/messages/{message_id}/attachments',
                headers=headers
            )
            response.raise_for_status()
            attachments = []
            for att in response.json().get('value', []):
                if att.get('contentBytes'):
                    attachments.append({
                        'name': att['name'],
                        'contentType': att['contentType'],
                        'content': base64.b64decode(att['contentBytes'])
                    })
            return attachments

    async def _mark_as_read(self, message_id: str, headers: dict):
        async with httpx.AsyncClient() as client:
            await client.patch(
                f'{self.graph_base}/users/{self.mailbox}/messages/{message_id}',
                headers={**headers, 'Content-Type': 'application/json'},
                json={'isRead': True}
            )

    def _is_relevant_attachment(self, attachment: dict) -> bool:
        """Filter for relevant document types."""
        name = attachment.get('name', '').lower()
        content_type = attachment.get('contentType', '').lower()
        # Accept PDFs, Word docs, Excel files, and TIFF scans
        relevant_extensions = ('.pdf', '.docx', '.doc', '.xlsx', '.xls', '.tiff', '.tif')
        relevant_mimes = ('application/pdf', 'application/vnd.openxmlformats', 'image/tiff')
        if any(name.endswith(ext) for ext in relevant_extensions):
            return True
        if any(mime in content_type for mime in relevant_mimes):
            return True
        return False

    def classify_email_intent(self, subject: str, sender: str) -> str:
        """Classify the intent of an incoming email based on subject and sender."""
        subject_lower = subject.lower()
        if any(kw in subject_lower for kw in ['loss run', 'loss history', 'claims history', 'loss experience']):
            return 'loss_run_delivery'
        elif any(kw in subject_lower for kw in ['application', 'submission', 'acord', 'new business']):
            return 'application'
        elif any(kw in subject_lower for kw in ['quote', 'proposal', 'indication', 'premium']):
            return 'carrier_quote'
        elif any(kw in subject_lower for kw in ['renewal', 'expiring', 'upcoming renewal']):
            return 'renewal_notice'
        else:
            return 'general'
Quote Comparison Reporter
Type: skill
Generates professional quote comparison reports when carrier quotes are received. Uses GPT-4.1-mini to analyze quotes from multiple carriers, highlight key differences in coverage, pricing, deductibles, and exclusions, and produce a formatted comparison that the agent/producer can present to the client.
Implementation:
Quote Comparison Report Prompt
import json
import logging
from typing import Any, Dict

from langchain_openai import AzureChatOpenAI

logger = logging.getLogger(__name__)

# COMPARISON_PROMPT (the quote comparison report prompt template) must be
# defined in this module before generate_comparison() is called; its text
# is not reproduced here.


class QuoteComparisonReporter:
    def __init__(self):
        # Endpoint and API key are read from the AZURE_OPENAI_ENDPOINT and
        # AZURE_OPENAI_API_KEY environment variables.
        self.llm = AzureChatOpenAI(
            azure_deployment='gpt-4-1-mini',
            api_version='2025-04-01-preview',
            temperature=0.2,
            max_tokens=4096
        )

    async def generate_comparison(
        self,
        quotes: Dict[str, Dict[str, Any]],
        client_info: Dict[str, Any],
        loss_history_summary: Dict[str, Any]
    ) -> str:
        """Generate a formatted quote comparison report."""
        prompt = COMPARISON_PROMPT.format(
            business_name=client_info.get('business_name', 'N/A'),
            lines_of_business=', '.join(client_info.get('lines_of_business', [])),
            industry=client_info.get('business_description', 'N/A'),
            revenue=client_info.get('annual_revenue', 'N/A'),
            quotes_json=json.dumps(quotes, indent=2, default=str),
            total_claims=loss_history_summary.get('total_claims', 0),
            total_incurred=loss_history_summary.get('total_incurred', 0),
            experience_mod=loss_history_summary.get('experience_mod', 'N/A')
        )
        response = await self.llm.ainvoke(prompt)
        return response.content

    def generate_quick_summary(self, quotes: Dict[str, Dict[str, Any]]) -> str:
        """Generate a quick text summary without LLM (for notifications)."""
        lines = ['📊 Quote Summary:\n']
        for carrier, details in quotes.items():
            premium = details.get('premium', 'TBD')
            if isinstance(premium, (int, float)):
                premium = f'${premium:,.0f}'
            lines.append(f' • {carrier}: {premium}')
        # Only numeric premiums take part in the lowest/highest/spread figures
        premiums = [
            v['premium'] for v in quotes.values()
            if isinstance(v.get('premium'), (int, float))
        ]
        if premiums:
            lines.append(f'\n Lowest: ${min(premiums):,.0f} | Highest: ${max(premiums):,.0f}')
            lines.append(f' Spread: ${max(premiums) - min(premiums):,.0f}')
        return '\n'.join(lines)

Testing & Validation
- DOCUMENT OCR TEST: Scan 3 different loss run PDFs (from different carriers — e.g., Hartford, Travelers, CNA) through the Azure Document Intelligence pipeline. Verify: (a) all pages are extracted, (b) tables are correctly identified with proper row/column structure, (c) text content is >98% accurate compared to the source document.
- LOSS RUN EXTRACTION ACCURACY TEST: Process 10 sample loss runs through the full extraction pipeline (OCR → LLM extraction). For each, manually compare extracted fields against the source document. Success criteria: ≥90% of fields correctly extracted, ≥95% accuracy for critical fields (claim counts, total incurred, policy number), and all fields below 85% confidence are flagged for human review. Record results in a spreadsheet tracking field-by-field accuracy per carrier format.
- ACORD FORM EXTRACTION TEST: Process 5 completed ACORD 125 forms through extraction. Verify: applicant name, FEIN, address, entity type, SIC/NAICS codes, and coverage dates are correctly extracted. Test with both typed/printed and handwritten forms. Success criteria: ≥95% accuracy on typed forms, ≥85% on handwritten forms.
- AMS CONNECTIVITY TEST: Execute a read-only API call to the agency's AMS to retrieve a known test client record. Verify all standard fields are returned and correctly mapped by the normalization function.
- FORM POPULATION COMPLETENESS TEST: For a known client with complete AMS data plus extracted loss run data, run the ACORD form populator for GL line of business. Verify: ACORD 125 has ≥80% of fields populated, ACORD 126 has ≥70% of GL-specific fields populated. Check that populated fields match the source data exactly (no data corruption during mapping).
- EMAIL INGESTION PIPELINE TEST: Send a test email with a PDF attachment to the ai-submissions shared mailbox. Verify within 10 minutes: (a) the email is detected by the monitoring workflow, (b) the PDF attachment is downloaded and uploaded to Azure Blob Storage, (c) the document appears in the processing queue, (d) the email is marked as read. Check n8n execution log for the complete workflow trace.
- HUMAN-IN-THE-LOOP APPROVAL TEST: Trigger a full submission workflow and verify it pauses at the human review checkpoint. Confirm: (a) the dashboard shows the pending submission with all extracted data, review flags, and populated form preview, (b) clicking 'Approve' resumes the workflow and proceeds to carrier submission, (c) clicking 'Reject' stops the workflow and logs the rejection reason, (d) the audit log records the reviewer name and timestamp.
- CARRIER SUBMISSION TEST (SANDBOX): Submit a test application to the Tarmika sandbox environment (if available) or verify the API call is correctly formatted by capturing the HTTP request without actually sending. Verify: all required fields are present, field values match the populated ACORD data, and the API response is correctly parsed. For RPA carriers, run in non-submit mode (auto_submit=false) and verify the carrier portal form is correctly populated via screenshot comparison.
- END-TO-END PIPELINE TEST: Run the complete workflow from a realistic starting point: (1) drop a loss run PDF and ACORD application PDF into the shared mailbox, (2) wait for automated pickup, (3) verify extraction in LangSmith traces, (4) verify AMS enrichment, (5) review populated forms in the dashboard, (6) approve the submission, (7) verify carrier submission API calls are made, (8) verify audit log has complete trail of all actions. Target: entire pipeline completes in <15 minutes for a single submission.
- PARALLEL RUN ACCURACY TEST: During the 2-week parallel run period, compare AI-populated submissions against manually prepared submissions for the same clients. Track: field-level match rate, data entry errors caught by AI that humans missed, data entry errors made by AI that humans caught, and total time difference (AI vs. manual). Success criteria for transitioning to production: ≥95% field-level accuracy and consistent time savings of ≥50% vs. manual process.
- SECURITY AND COMPLIANCE AUDIT: Verify (a) all API communications use TLS 1.2+, (b) Azure Blob Storage has encryption at rest enabled, (c) PostgreSQL has SSL connections enforced, (d) all user-facing endpoints require authentication, (e) the audit_log table captures every AI action with timestamps, (f) PII data is not logged in application logs (only in the encrypted database), (g) Azure OpenAI content filtering is active but does not filter responses for legitimate insurance content.
- DISASTER RECOVERY TEST: Simulate a VM failure by stopping the insurance-agent service. Verify: (a) Azure Monitor alert fires within 10 minutes, (b) n8n error workflow sends notification to MSP, (c) in-progress submissions can be resumed from the LangGraph checkpoint after service restart without data loss, (d) no duplicate carrier submissions occur during recovery.
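The accuracy criteria in the loss run extraction test can be tallied with a small script rather than by eye. The following is a minimal sketch, assuming each extracted field is available as a (value, confidence) pair and that the reviewer supplies hand-checked values; the field names, `CRITICAL_FIELDS`, and `score_document` are hypothetical and not part of the pipeline described in this guide.

```python
from typing import Any, Dict, Tuple

# Hypothetical field names for the critical fields named in the test criteria.
CRITICAL_FIELDS = {"claim_count", "total_incurred", "policy_number"}
REVIEW_THRESHOLD = 0.85  # fields below this confidence must be flagged for review


def score_document(
    extracted: Dict[str, Tuple[Any, float]],
    expected: Dict[str, Any],
) -> Dict[str, Any]:
    """Compare extracted (value, confidence) pairs against hand-checked values."""
    # A field counts as correct only if it was extracted and matches exactly.
    correct = [
        name for name, value in expected.items()
        if extracted.get(name, (None, 0.0))[0] == value
    ]
    critical = [name for name in expected if name in CRITICAL_FIELDS]
    critical_correct = [name for name in critical if name in correct]
    flagged = sorted(
        name for name, (_, confidence) in extracted.items()
        if confidence < REVIEW_THRESHOLD
    )
    return {
        "overall_accuracy": len(correct) / len(expected) if expected else 0.0,
        "critical_accuracy": len(critical_correct) / len(critical) if critical else 1.0,
        "flagged_for_review": flagged,
    }
```

Averaging `overall_accuracy` and `critical_accuracy` across the 10 sample loss runs, grouped by carrier format, gives the numbers to compare against the ≥90% and ≥95% thresholds.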
# Verify pages, tables, and key-value pairs extracted by Azure Document Intelligence
python -c 'from agent.doc_intelligence import DocumentProcessor; dp = DocumentProcessor(); result = dp.analyze_document(document_url="<test-url>"); print("Pages:", len(result["pages"]), "Tables:", len(result["tables"]), "KV Pairs:", len(result["key_value_pairs"]))'
# Retrieve a known test client record; verify the response contains business_name, fein, mailing_address, and primary_contact
curl 'http://localhost:8000/api/v1/test/ams-lookup?client_id=TEST-001'

Client Handoff
Client Handoff Checklist
Training Sessions (Schedule 3 sessions, 1 hour each)
Session 1: System Overview & Daily Workflow (All Staff)
- How the AI submission agent works end-to-end (visual flow diagram)
- What changes in their daily workflow (emphasize: AI assists, humans decide)
- How documents enter the system (email forwarding, scanner profiles, manual upload)
- The review dashboard: logging in, viewing pending submissions, understanding extraction results
- How to approve or reject a submission (and when to reject)
- Understanding confidence scores and review flags
Session 2: Power User Training (CSRs & Account Managers)
- How to correct AI extraction errors in the review dashboard
- How to add/modify carrier targets for a submission
- How to handle edge cases: unusual documents, new carrier formats, multi-policy submissions
- How to view submission history and audit trails
- How to request loss runs to feed into the system
- Tips for getting better extraction results (scan quality, email subject lines)
Session 3: Admin Training (Agency Owner/Principal + Operations Manager)
- n8n dashboard overview: monitoring workflow health
- LangSmith basics: how to check if the AI is performing well
- Cost monitoring: Azure consumption and API usage dashboards
- How to request changes (new carriers, new lines of business, workflow modifications)
- Compliance documentation: where records are stored, how to produce audit reports
- Escalation process: who to contact at the MSP for issues
Documentation to Leave Behind
Success Criteria to Review Together (at 30-day and 90-day checkpoints)
- Extraction accuracy rate (target: ≥95% by day 90)
- Submission turnaround time (target: ≤2 hours from document receipt to carrier submission, vs. previous manual baseline)
- Staff time savings (target: 20+ hours/week reduction in manual submission work)
- Carrier submission success rate (target: ≥98% accepted by carrier platforms without errors)
- User adoption rate (target: 100% of submissions going through AI pipeline by day 90)
- Zero compliance incidents related to the AI system
Maintenance
Ongoing Maintenance Responsibilities
Weekly Tasks (MSP L2 Technician, ~2 hours/week)
- Monday AM: Review LangSmith dashboard for prior week — check error rates, average latency, token consumption, and any failed runs. Investigate and resolve any recurring errors.
- Monday AM: Review n8n execution log — verify all scheduled workflows executed successfully. Check for failed webhook deliveries or timeout errors.
- Wednesday: Review Azure cost dashboard — verify AI API consumption is within expected range. Alert if spending exceeds 120% of monthly budget.
- Friday PM: Check extraction accuracy metrics — review any submissions that were rejected by agency staff during the week. Identify patterns in extraction failures for prompt tuning.
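For the Wednesday cost review, a weekly check against a monthly budget is easier to interpret if the budget is prorated to the current day of the month. The sketch below shows one way to do that; prorating is an assumption added here (the task above only states the 120% threshold), and `spend_alert` is a hypothetical helper that expects the month-to-date spend figure to be read from the Azure cost dashboard by hand or by export.

```python
import calendar
from datetime import date

ALERT_RATIO = 1.20  # threshold from the weekly task: 120% of budget


def spend_alert(month_to_date_spend: float, monthly_budget: float, today: date) -> bool:
    """Return True when month-to-date spend exceeds 120% of the prorated budget."""
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    # Assumption: spend is expected to accrue evenly across the month.
    prorated_budget = monthly_budget * today.day / days_in_month
    return month_to_date_spend > ALERT_RATIO * prorated_budget
```

With a $300 monthly budget, the prorated budget on April 15 is $150, so the alert fires once month-to-date spend passes $180.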
Monthly Tasks (MSP L2/L3 Engineer, ~4-6 hours/month)
- Carrier Portal Audit: Check all RPA-based carrier integrations for portal changes that may have broken selectors. Update Playwright scripts as needed. Budget 2-4 hours for this — carrier portals change frequently.
- Security Patch Cycle: Apply OS updates to the Azure VM (Ubuntu unattended-upgrades should handle critical patches, but verify). Update Python packages: pip install --upgrade langgraph langchain-openai azure-ai-documentintelligence.
- Database Maintenance: Run VACUUM ANALYZE on PostgreSQL tables. Check disk usage and archive old audit logs if needed.
- LLM Model Review: Check for new Azure OpenAI model versions. Test on a sample set before upgrading deployment.
- Backup Verification: Verify Azure PostgreSQL automated backups are completing. Test point-in-time restore on a quarterly basis.
- Usage Report: Generate monthly usage report for the agency: submissions processed, time saved, accuracy rates, carrier response times.
Quarterly Tasks (MSP Solutions Architect, ~8 hours/quarter)
- Accuracy Tune-Up: Review the past quarter's extraction accuracy data. Update prompts for any carrier loss run formats showing declining accuracy. Add new sample documents to the test suite.
- Compliance Review: Update vendor risk assessments. Review any new state AI regulations. Verify WISP is current. Check that audit logs are complete and retention policies are enforced.
- Capacity Planning: Review Azure resource utilization. Right-size VM and database tiers based on actual usage. Forecast growth if agency is adding staff or carriers.
- Feature Review: Meet with agency to discuss new capabilities — additional carriers, new lines of business, workflow improvements, new AI model capabilities.
- Disaster Recovery Test: Simulate a failure and verify recovery procedures work correctly.
Trigger-Based Maintenance
- New Carrier Added: When agency gets a new carrier appointment — add carrier to carrier_mapping.json, configure submission method (Tarmika/Semsee/RPA), test with sample submission. Estimate: 2-4 hours per carrier.
- New Line of Business: When agency wants to add a new LOB (e.g., Cyber, E&O) — add ACORD form mappings, update extraction prompts, test with sample documents. Estimate: 8-16 hours.
- AMS Upgrade: When Applied/Vertafore/HawkSoft releases a new version — test API compatibility, update any changed endpoints or field mappings.
- Azure OpenAI Model Update: When new model versions are available — test on sample set, compare accuracy and cost, update deployment if beneficial.
- Extraction Accuracy Drop: If weekly accuracy metrics drop below 90% — investigate root cause (new document format, OCR degradation, prompt drift), update prompts or retrain custom models.
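When a new carrier is added, a quick validation pass over carrier_mapping.json can catch a mistyped submission method before the sample submission is run. This is a minimal sketch under stated assumptions: the actual schema of carrier_mapping.json is not shown in this section, so the layout used here (carrier name mapped to an object with a `method` key, plus a `portal_url` key for RPA carriers) and the `validate_carrier_mapping` helper are hypothetical.

```python
import json
from typing import Dict, List

# Submission methods named in this guide; the lowercase spelling is an assumption.
ALLOWED_METHODS = {"tarmika", "semsee", "rpa"}


def validate_carrier_mapping(raw_json: str) -> List[str]:
    """Return a list of problems found in a carrier mapping document."""
    problems: List[str] = []
    mapping: Dict[str, dict] = json.loads(raw_json)
    for carrier, entry in mapping.items():
        method = str(entry.get("method", "")).lower()
        if method not in ALLOWED_METHODS:
            problems.append(f"{carrier}: unknown submission method {method!r}")
        elif method == "rpa" and not entry.get("portal_url"):
            # Hypothetical rule: RPA scripts need to know which portal to drive.
            problems.append(f"{carrier}: RPA carriers need a portal_url")
    return problems
```

An empty list means the file passed these checks; it does not replace the sample submission test.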
SLA Considerations
- Response Time: Critical issues (system down, submissions failing) — 1 hour response, 4 hour resolution during business hours
- Standard Issues: (accuracy problems, minor bugs) — 4 hour response, next business day resolution
- Enhancement Requests: (new carriers, features) — acknowledged within 1 business day, scoped and scheduled within 1 week
- Uptime Target: 99.5% during business hours (Mon-Fri 7am-7pm agency local time)
- Maintenance Windows: Saturday 10pm-Sunday 6am for planned updates
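To make the uptime target concrete, it helps to convert 99.5% into an allowed-downtime budget over the covered hours only (Mon-Fri, 7am-7pm). The short calculation below is a sketch of that arithmetic; `allowed_downtime_minutes` is a hypothetical helper name.

```python
BUSINESS_HOURS_PER_DAY = 12   # 7am to 7pm
BUSINESS_DAYS_PER_WEEK = 5    # Monday through Friday
UPTIME_TARGET = 0.995


def allowed_downtime_minutes(weeks: int = 1) -> float:
    """Downtime budget, in minutes, over the covered business hours."""
    covered_minutes = BUSINESS_HOURS_PER_DAY * BUSINESS_DAYS_PER_WEEK * 60 * weeks
    return covered_minutes * (1 - UPTIME_TARGET)


if __name__ == "__main__":
    print(f"Per week: {allowed_downtime_minutes(1):.0f} minutes")
    print(f"Per year: {allowed_downtime_minutes(52) / 60:.1f} hours")
```

A week has 60 covered hours (3,600 minutes), so the target allows roughly 18 minutes of business-hours downtime per week, or about 15.6 hours per year. Outages during the Saturday night maintenance window fall outside the covered hours and do not count against this budget.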
Escalation Path
Alternatives
...
Turnkey Platform Approach (Indio + Tarmika)
Instead of building a custom AI agent, deploy Applied Systems' Indio platform for application management and data gathering combined with Tarmika Bridge for multi-carrier quoting. Indio provides smart form technology that auto-populates ACORD fields from prior submissions and offers a client-facing portal for digital application intake. Tarmika handles the carrier submission and quoting. Both are established, carrier-approved platforms with existing integrations to Applied Epic and other AMS platforms.
Hybrid Approach: Quoting Platform + Custom AI Extraction Layer
Use Tarmika or Semsee as the carrier submission platform but build a custom AI extraction layer using Azure OpenAI + Document Intelligence that automates the data entry into these platforms. The AI handles document ingestion, OCR, and data extraction, then populates the quoting platform via its API or UI automation. This gives the agency AI-powered efficiency without building the carrier submission infrastructure.
Syntora Purpose-Built Insurance Automation
Engage Syntora, a purpose-built insurance submission automation vendor, to build the automation. Syntora specializes in connecting to carrier portals and AMS systems with pre-built integrations for Applied Epic and Vertafore. They offer fixed-price project engagements (not hourly) with typical 6-week deployment timelines for 10 carriers.
Patra AI + BPO Hybrid Service
Outsource the entire submission process to Patra, which combines AI-powered document processing with human business process outsourcing (BPO). Patra's team handles loss run ordering, data extraction, application population, and carrier submission using their proprietary AI platform augmented by trained insurance processing staff.
Microsoft Power Automate + Copilot Studio Approach
For agencies already deep in the Microsoft ecosystem, build the automation using Microsoft Power Automate (workflow automation), Copilot Studio (AI agent builder), and Azure AI Document Intelligence. This approach uses Microsoft's low-code/no-code tools instead of LangGraph and n8n, potentially enabling the agency's own IT staff to maintain the solution.