Autonomous agents · 75 min read

Implementation Guide: Gather loss runs and applications and submit to multiple carriers for quotes

Step-by-step implementation guide for deploying AI that gathers loss runs and applications and submits them to multiple carriers for quotes, on behalf of insurance agency clients.

Hardware Procurement

Ricoh fi-8170 Document Scanner

Ricoh / PFU · PA03810-B055 · Qty: 1

$699 per unit (MSP cost) / $950 suggested resale with setup

High-volume duplex document scanner (70 ppm) for digitizing paper loss runs, ACORD applications, and supplemental forms received by mail or hand-delivery. Features a 100-sheet ADF, USB 3.2 and Gigabit Ethernet connectivity, and PaperStream IP driver for enhanced OCR preprocessing. Connects to the document ingestion pipeline via network scanning to a monitored hot folder or SharePoint library.

Fujitsu ScanSnap iX1600 Document Scanner

Ricoh / PFU · PA03770-B615 · Qty: 2

$413 per unit (MSP cost) / $550 suggested resale with setup

Mid-volume Wi-Fi-enabled scanner (40 ppm) for branch offices or individual producer desks. Features one-touch cloud-direct scanning profiles that can be configured to send directly to SharePoint or a monitored inbox folder. Used as secondary/branch scanning stations when the agency has multiple locations or producers who receive paper documents at their desks.

Dell OptiPlex 7020 Micro Desktop

Dell Technologies · OptiPlex 7020 MFF (i5-14500T, 16GB, 512GB NVMe) · Qty: 2

$950 per unit (MSP cost) / $1,250 suggested resale with setup and imaging

Standard business workstations for agency staff operating the AI submission workflow dashboard. All AI processing is cloud-based so no GPU is needed; the workstation only needs a modern browser and reliable network connectivity. These units serve as the primary workstations for the account managers / CSRs who review AI-extracted data and approve submissions. Quantity depends on number of staff interacting with the system.

Ubiquiti UniFi U6-Pro Access Point

Ubiquiti · U6-Pro · Qty: 2

$150 per unit (MSP cost) / $220 suggested resale with installation

Reliable Wi-Fi 6 coverage is required for the ScanSnap iX1600 wireless scanners and for ensuring consistent cloud connectivity to AI services and AMS platforms. Install one per floor/zone of the agency office.

APC Smart-UPS 1500VA LCD

APC by Schneider Electric · SMT1500C · Qty: 1

$550 per unit (MSP cost) / $725 suggested resale with installation

Uninterruptible power supply for network equipment and any on-premise infrastructure (router, switch, NAS if used). Ensures the document scanning pipeline and network connectivity are not disrupted by power events during batch submission processing.

Software Procurement

Microsoft 365 Business Premium

Microsoft · Microsoft 365 Business Premium · Qty: 10–30 users (typical agency)

$22/user/month (MSP cost) / $33/user/month suggested resale; typical agency 10–30 users = $220–$660/month MSP cost

Core productivity and identity platform. Provides Exchange Online for email ingestion of loss run PDFs, SharePoint Online for document storage and hot-folder scanning targets, Azure AD (Entra ID) for SSO and MFA across all platforms, Microsoft Teams for collaboration, and the Microsoft Graph API for programmatic email and file access. Business Premium tier includes Intune and Azure AD P1 for security compliance.

Azure OpenAI Service (GPT-4.1 / GPT-4.1-mini)

Microsoft / OpenAI · GPT-4.1 / GPT-4.1-mini

GPT-4.1-mini: $0.40/1M input + $1.60/1M output tokens; GPT-4.1: $2.00/1M input + $8.00/1M output tokens. Typical agency processing 200 submissions/month: ~$50–$200/month in API costs; resell bundled into managed service

Primary LLM engine for document understanding, data extraction from loss run PDFs and ACORD applications, intelligent field mapping, carrier-specific form population logic, and quote comparison summarization. Azure OpenAI is preferred over direct OpenAI API for enterprise data residency guarantees (US-only processing), Azure RBAC integration, content filtering, and compliance certifications (SOC 2, HIPAA BAA available).
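As a rough sanity check on the API cost figures above, a hypothetical estimator (the per-submission token counts are illustrative assumptions, not measurements):

```python
# Rough LLM cost estimator using the GPT-4.1 pricing quoted above.
# Token counts per submission are illustrative assumptions.
PRICES = {  # USD per 1M tokens: (input, output)
    "gpt-4.1-mini": (0.40, 1.60),
    "gpt-4.1": (2.00, 8.00),
}

def monthly_cost(model: str, submissions: int,
                 input_tokens: int, output_tokens: int) -> float:
    """Estimated monthly spend for one model at a given volume."""
    in_price, out_price = PRICES[model]
    per_submission = (input_tokens * in_price + output_tokens * out_price) / 1_000_000
    return round(per_submission * submissions, 2)

# e.g. 200 submissions/month, ~60k input + 8k output tokens each on the mini model
print(monthly_cost("gpt-4.1-mini", 200, 60_000, 8_000))
# → 7.36
```

Extraction-heavy submissions that escalate to the full GPT-4.1 model push the total toward the upper end of the quoted range.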

Azure AI Document Intelligence

Microsoft Azure

Read model: $1.25/100 pages; Prebuilt models: $10/1,000 pages. Typical agency: $25–$100/month; resell bundled into managed service

OCR and document structure extraction engine. Processes scanned PDFs and images of loss runs, ACORD forms, and carrier correspondence into structured text with layout preservation, table extraction, and key-value pair detection. Used as the first stage of the document intelligence pipeline before LLM-based semantic extraction.
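Downstream code typically flattens the analyze result into plain key-value pairs before handing it to the LLM. A minimal sketch against the JSON shape returned by the Document Intelligence layout/document models (treat the exact field paths as assumptions to verify against the current API version):

```python
def flatten_key_values(analyze_result: dict) -> dict:
    """Collapse Document Intelligence keyValuePairs into a flat dict.

    Expects the analyzeResult payload; pairs without a detected value
    are kept with an empty string so no field silently drops.
    """
    flat = {}
    for pair in analyze_result.get("keyValuePairs", []):
        key = pair.get("key", {}).get("content", "").strip()
        value = (pair.get("value") or {}).get("content", "").strip()
        if key:
            flat[key] = value
    return flat

sample = {
    "keyValuePairs": [
        {"key": {"content": "Policy Number:"}, "value": {"content": "WC-123456"}},
        {"key": {"content": "Total Incurred:"}, "value": {"content": "$14,250.00"}},
        {"key": {"content": "Claimant:"}, "value": None},
    ]
}
print(flatten_key_values(sample))
# → {'Policy Number:': 'WC-123456', 'Total Incurred:': '$14,250.00', 'Claimant:': ''}
```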

Tarmika Bridge

Applied Systems (Tarmika)

Contact vendor for agency-specific pricing; estimated $200–$500/month based on agency size and carrier count. 26% market share — leading commercial quoting platform

Primary commercial lines quoting and carrier submission platform. Provides single-entry rating for small business commercial insurance with streamlined underwriting questions and extensive NAICS mapping. The AI agent submits populated application data to Tarmika via its API, which then distributes to connected carriers. Supports the broadest carrier panel for small commercial.

Semsee

Semsee

Contact vendor; free tier available for basic access; premium plans for API access and market access program. Estimated $100–$400/month

Secondary/alternative commercial quoting platform. Provides rapid multi-carrier quoting (quotes back within minutes from 4+ carriers). Particularly valuable for its Market Access Program that grants agencies access to additional carrier markets without direct appointments. Used as a fallback or supplementary submission channel alongside Tarmika.

Applied Epic / Vertafore AMS360 / HawkSoft (existing AMS)

Applied Systems / Vertafore / HawkSoft · per-seat SaaS (existing client subscription)

Already licensed by agency; Applied Epic ~$150/user/mo; AMS360 ~$150/user/mo; HawkSoft ~$94/user/mo + base fee. MSP assists with API access enablement — no additional license cost

The agency's existing system of record. The AI agent reads client data, policy details, and contact information from the AMS via API to pre-populate applications. Completed quotes and submission records are written back to the AMS for the agency's workflow continuity. The MSP must ensure API access is enabled and properly credentialed.
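The pre-population step is essentially a field mapping from the AMS client record onto ACORD fields. A hypothetical sketch (the source field names are illustrative; each AMS exposes its own schema via its API):

```python
# Illustrative AMS-record → ACORD 125 field mapping; the AMS-side
# field names are hypothetical and vary by vendor.
ACORD_125_MAP = {
    "business_name": "NamedInsured",
    "fein": "FEIN",
    "naics_code": "NAICSCode",
    "entity_type": "LegalEntity",
    "annual_revenue": "GrossRevenue",
}

def prepopulate_acord_125(ams_record: dict) -> dict:
    """Return ACORD-keyed fields, skipping anything the AMS lacks."""
    return {
        acord_field: ams_record[ams_field]
        for ams_field, acord_field in ACORD_125_MAP.items()
        if ams_record.get(ams_field) not in (None, "")
    }

record = {"business_name": "Acme Fabrication LLC", "fein": "12-3456789", "naics_code": ""}
print(prepopulate_acord_125(record))
# → {'NamedInsured': 'Acme Fabrication LLC', 'FEIN': '12-3456789'}
```

Fields the AMS cannot supply are left for extraction from the submitted documents or flagged for the account manager during review.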

LangGraph (Agent Orchestration Framework)

LangChain Inc.

$0 for the library; hosting costs covered under Azure VM below

Core multi-agent orchestration framework. Implements the stateful submission workflow as a directed graph with nodes for document intake, data extraction, AMS lookup, form population, human review, carrier submission, and quote monitoring. LangGraph provides built-in persistence, checkpointing (for human-in-the-loop pauses), and parallel execution (for simultaneous multi-carrier submission).
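The node flow described above can be illustrated with a plain-Python reduction (this deliberately avoids the real LangGraph API and only shows the checkpoint-and-resume shape the graph implements):

```python
# Plain-Python illustration of the submission state machine: linear
# nodes with a pause at the human-review gate. Not the LangGraph API.
NODES = ["intake", "extract", "ams_lookup", "populate_forms",
         "human_review", "submit_carriers", "monitor_quotes"]

def run_until_review(state: dict) -> dict:
    """Advance node by node, checkpointing before the human-review gate."""
    done = state.setdefault("completed", [])
    for node in NODES:
        if node in done:
            continue  # resume skips nodes already executed
        if node == "human_review" and not state.get("approved"):
            state["paused_at"] = node  # checkpoint: wait for approval
            return state
        done.append(node)
    return state

state = run_until_review({"client": "acme"})
print(state["paused_at"])        # → human_review
state["approved"] = True
state = run_until_review(state)
print(state["completed"][-1])    # → monitor_quotes
```

In LangGraph proper, the pause is a checkpointed interrupt before the review node and the multi-carrier submission fans out in parallel rather than running as a single node.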

LangSmith (Observability Platform)

LangChain Inc. · Qty: 1–2 Plus seats

Developer tier: free (1 seat, 5,000 traces/month); Plus tier: $39/seat/month; $39–$78/month bundled into managed service fee

Production monitoring, tracing, and debugging platform for the LangGraph agent. Captures every LLM call, tool invocation, and decision point with full input/output logging. Critical for debugging extraction errors, monitoring accuracy metrics, tracking API costs, and providing compliance audit trails for every AI-processed submission.

n8n (Workflow Automation Platform)

n8n GmbH

Self-hosted: free software + ~$100–$200/month Azure VM hosting; Cloud: $50/month (Pro plan). Resell as part of managed service at $500–$800/month bundled

Low-code workflow automation platform connecting all integration points: email monitoring triggers, document preprocessing dispatch, AMS API calls, LangGraph agent invocation, Tarmika/Semsee API submission, and notification/approval workflows. n8n serves as the integration backbone and scheduling layer that orchestrates the end-to-end pipeline without requiring custom code for each connector.

Azure Virtual Machine (Agent Runtime)

Microsoft Azure · Standard_D2s_v5 / Standard_D4s_v5 · Qty: 1

Standard_D2s_v5 (2 vCPU, 8GB RAM): ~$70/month; Standard_D4s_v5 (4 vCPU, 16GB RAM): ~$140/month for higher workloads. Add $20–$40/month for managed disks and networking

Hosts the self-managed n8n instance, LangGraph agent runtime (Python), Redis for task queuing, and PostgreSQL for agent state persistence. Deployed in Azure East US or West US 2 region for proximity to Azure OpenAI endpoints. The MSP manages this VM as part of the ongoing managed service.

Azure Blob Storage

Microsoft Azure

~$0.018/GB/month (Hot tier); typical agency needs 100–500GB = $2–$9/month

Archival storage for all processed documents, extracted data JSON files, populated ACORD form PDFs, carrier responses, and audit logs. Provides immutable storage option for compliance record retention. Lifecycle policies automatically tier older documents to Cool/Archive storage for cost optimization.

Starting at $50/user/month; typical agency 5–10 users = $250–$500/month

Optional but recommended turnkey application management platform. Provides smart form technology for insurance application data gathering, client-facing digital application portals, and single-click submission to carriers. Can serve as an intermediate layer between the AI agent and carrier submission, providing validated form templates and submission tracking. Particularly valuable if the agency wants a branded client-facing portal.

Azure Database for PostgreSQL Flexible Server

Microsoft Azure

Burstable B1ms (1 vCore, 2GB): ~$25/month; General Purpose D2s (2 vCores, 8GB): ~$100/month

Persistent storage for LangGraph agent state, submission tracking records, extracted data cache, carrier response history, and operational metadata. Used by n8n for workflow execution history. PaaS deployment provides automated backups, high availability, and managed patching.

Prerequisites

  • Active Agency Management System (AMS) subscription — Applied Epic, Vertafore AMS360, HawkSoft, or EZLynx — with API access enabled. Contact the AMS vendor to request API credentials and verify the agency's plan includes API access (some plans require an upgrade).
  • Active subscription to at least one commercial quoting/submission platform — Tarmika Bridge (preferred) and/or Semsee. Verify API access is included in the subscription tier; contact the vendor if only the web portal is available.
  • Microsoft 365 Business Premium (or higher) for all agency staff who will interact with the AI submission workflow. Required for Exchange Online (email ingestion), SharePoint (document storage), Azure AD/Entra ID (SSO and MFA), and Microsoft Graph API access.
  • Business-grade internet service: minimum 100 Mbps download / 50 Mbps upload with <50ms latency to Azure US data centers. Verify with a speed test to Azure East US or West US 2 (use Azure Speed Test tool).
  • Azure subscription provisioned under the MSP's Azure tenant (for managing AI services) or under the client's tenant with MSP granted RBAC Contributor access. An Azure OpenAI resource can only be created with an approved Azure OpenAI access application (apply at https://aka.ms/oai/access).
  • Complete inventory of the agency's active carrier appointments, including: carrier names, lines of business written with each carrier, carrier portal URLs and login credentials (for RPA fallback), and whether each carrier is accessible via Tarmika/Semsee or requires direct portal submission.
  • List of all ACORD form types the agency commonly uses (typically ACORD 125 - Commercial Insurance Application, ACORD 126 - Commercial General Liability, ACORD 130 - Workers Compensation, ACORD 140 - Property, plus carrier-specific supplemental applications).
  • Sample documents for AI training: at least 10 representative loss run PDFs (from different carriers — each carrier formats loss runs differently), 10 completed ACORD applications, and 5 carrier-specific supplemental applications. These will be used to build and test the extraction prompts.
  • Domain admin or Global Admin access to the agency's Microsoft 365 tenant for configuring mail flow rules, SharePoint sites, Azure AD app registrations, and API permissions.
  • Written authorization from the agency principal/owner approving: (a) AI processing of client PII data, (b) API integration with their AMS, (c) automated submission to carrier platforms with human-in-the-loop approval, and (d) data processing via Azure OpenAI (with data residency in US). This authorization should be documented in the project SOW.
  • Current Written Information Security Plan (WISP) or willingness to create one. The WISP must be updated to include the AI system's data flows, vendor risk assessments, and incident response procedures per GLBA/FTC Safeguards Rule and applicable NAIC Model Law requirements.
  • Python 3.11+ development environment available to the MSP build team, with access to GitHub/GitLab for version-controlled deployment of agent code and configuration.
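The sample documents above seed prompt development. A minimal extraction-prompt sketch (the instruction wording is a hypothetical starting point; the requested keys mirror the loss_runs database fields defined in Step 4):

```python
# Hypothetical extraction prompt; the JSON keys mirror the loss_runs table.
LOSS_RUN_PROMPT = """You are extracting data from an insurance carrier loss run.
Return ONLY valid JSON with keys: carrier_name, policy_number,
effective_date, expiration_date, total_incurred, total_paid,
claims (array of objects with date, description, status, incurred, paid).
Use null for any value not present in the document.

Document text:
"""

def build_extraction_prompt(document_text: str) -> str:
    """Append the OCR'd document text to the fixed instruction block."""
    return LOSS_RUN_PROMPT + document_text

prompt = build_extraction_prompt("ACME MUTUAL LOSS RUN  POLICY WC-123456 ...")
print("WC-123456" in prompt)
# → True
```

Iterate the prompt against all 10 sample loss runs before go-live; carriers format dates, reserves, and claim status codes differently, and the prompt must name each variant explicitly.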

Installation Steps

...

Step 1: Provision Azure Infrastructure

Create the Azure resource group and deploy all required Azure services: Virtual Machine for the agent runtime, Azure OpenAI resource with GPT-4.1 and GPT-4.1-mini model deployments, Azure AI Document Intelligence resource, Azure Database for PostgreSQL Flexible Server, Azure Blob Storage account, and Virtual Network with NSG rules. All resources should be deployed in the same Azure region (East US 2 recommended for best Azure OpenAI availability).

bash
az login
az account set --subscription '<MSP-Azure-Subscription-ID>'
az group create --name rg-insurance-ai-agent --location eastus2
# Create Virtual Network
az network vnet create --resource-group rg-insurance-ai-agent --name vnet-agent --address-prefix 10.0.0.0/16 --subnet-name subnet-agent --subnet-prefix 10.0.1.0/24
# Create NSG with restricted inbound rules
az network nsg create --resource-group rg-insurance-ai-agent --name nsg-agent
az network nsg rule create --resource-group rg-insurance-ai-agent --nsg-name nsg-agent --name AllowSSH --priority 1000 --source-address-prefixes '<MSP-Office-IP>/32' --destination-port-ranges 22 --access Allow --protocol Tcp --direction Inbound
az network nsg rule create --resource-group rg-insurance-ai-agent --nsg-name nsg-agent --name AllowHTTPS --priority 1010 --source-address-prefixes '<MSP-Office-IP>/32' '<Client-Office-IP>/32' --destination-port-ranges 443 --access Allow --protocol Tcp --direction Inbound
# Create VM for agent runtime
az vm create --resource-group rg-insurance-ai-agent --name vm-agent-runtime --image Ubuntu2404LTSGen2 --size Standard_D4s_v5 --admin-username mspadmin --generate-ssh-keys --vnet-name vnet-agent --subnet subnet-agent --nsg nsg-agent --public-ip-sku Standard
# Create Azure OpenAI resource
az cognitiveservices account create --name oai-insurance-agent --resource-group rg-insurance-ai-agent --kind OpenAI --sku S0 --location eastus2
# Deploy GPT-4.1-mini model
az cognitiveservices account deployment create --name oai-insurance-agent --resource-group rg-insurance-ai-agent --deployment-name gpt-4-1-mini --model-name gpt-4.1-mini --model-version 2025-04-14 --model-format OpenAI --sku-capacity 80 --sku-name Standard
# Deploy GPT-4.1 model
az cognitiveservices account deployment create --name oai-insurance-agent --resource-group rg-insurance-ai-agent --deployment-name gpt-4-1 --model-name gpt-4.1 --model-version 2025-04-14 --model-format OpenAI --sku-capacity 40 --sku-name Standard
# Create Document Intelligence resource
az cognitiveservices account create --name docai-insurance-agent --resource-group rg-insurance-ai-agent --kind FormRecognizer --sku S0 --location eastus2
# Create delegated subnet for PostgreSQL Flexible Server (must contain no other resources)
az network vnet subnet create --resource-group rg-insurance-ai-agent --vnet-name vnet-agent --name subnet-pg --address-prefixes 10.0.2.0/24 --delegations Microsoft.DBforPostgreSQL/flexibleServers
# Create PostgreSQL Flexible Server (a private DNS zone is created automatically for VNet integration)
az postgres flexible-server create --resource-group rg-insurance-ai-agent --name pgdb-insurance-agent --location eastus2 --admin-user pgadmin --admin-password '<STRONG-PASSWORD>' --sku-name Standard_B2s --tier Burstable --storage-size 64 --version 16 --vnet vnet-agent --subnet subnet-pg
# Create Storage Account
az storage account create --name stinsuranceagent --resource-group rg-insurance-ai-agent --location eastus2 --sku Standard_LRS --kind StorageV2
az storage container create --name loss-runs --account-name stinsuranceagent
az storage container create --name acord-forms --account-name stinsuranceagent
az storage container create --name submissions --account-name stinsuranceagent
az storage container create --name audit-logs --account-name stinsuranceagent
Note

Replace placeholder values (<MSP-Azure-Subscription-ID>, <MSP-Office-IP>, <Client-Office-IP>, <STRONG-PASSWORD>) with actual values. Azure OpenAI access must be pre-approved — submit the application at https://aka.ms/oai/access at least 2 weeks before this step. The PostgreSQL subnet needs to be dedicated (no other resources); create it as a delegated subnet for Microsoft.DBforPostgreSQL/flexibleServers. Consider using Azure Key Vault for all secrets instead of passing passwords on the command line.

Step 2: Configure VM Runtime Environment

SSH into the Azure VM and install all required runtime dependencies: Python 3.11, Node.js 20 (for n8n), Docker (optional for containerized deployment), Redis for task queuing, and all Python packages for the LangGraph agent. Set up the project directory structure and configure systemd services for automatic startup.

bash
ssh mspadmin@<VM-PUBLIC-IP>
# System updates
sudo apt update && sudo apt upgrade -y
# Install Python 3.11
sudo apt install -y python3.11 python3.11-venv python3.11-dev python3-pip
# Install Node.js 20 LTS for n8n
curl -fsSL https://deb.nodesource.com/setup_20.x | sudo -E bash -
sudo apt install -y nodejs
# Install Redis
sudo apt install -y redis-server
sudo systemctl enable redis-server && sudo systemctl start redis-server
# Install n8n globally
sudo npm install -g n8n
# Create project directory structure
sudo mkdir -p /opt/insurance-ai-agent/{agent,config,logs,scripts,templates}
sudo chown -R mspadmin:mspadmin /opt/insurance-ai-agent
# Create Python virtual environment
cd /opt/insurance-ai-agent
python3.11 -m venv venv
source venv/bin/activate
# Install Python dependencies
pip install --upgrade pip
pip install langgraph langchain langchain-openai langchain-community langsmith azure-ai-documentintelligence azure-storage-blob azure-identity openai httpx psycopg2-binary redis celery pydantic python-dotenv fastapi uvicorn jinja2 pdfplumber PyPDF2 python-docx openpyxl requests beautifulsoup4 playwright
# Install Playwright browsers for carrier portal RPA
playwright install chromium
playwright install-deps
# Create environment file
cat > /opt/insurance-ai-agent/.env << 'EOF'
AZURE_OPENAI_ENDPOINT=https://oai-insurance-agent.openai.azure.com/
AZURE_OPENAI_API_KEY=<YOUR-AZURE-OPENAI-KEY>
AZURE_OPENAI_DEPLOYMENT_MINI=gpt-4-1-mini
AZURE_OPENAI_DEPLOYMENT_FULL=gpt-4-1
AZURE_OPENAI_API_VERSION=2025-04-01-preview
AZURE_DOC_INTELLIGENCE_ENDPOINT=https://docai-insurance-agent.cognitiveservices.azure.com/
AZURE_DOC_INTELLIGENCE_KEY=<YOUR-DOC-AI-KEY>
AZURE_STORAGE_CONNECTION_STRING=<YOUR-STORAGE-CONN-STRING>
DATABASE_URL=postgresql://pgadmin:<PASSWORD>@pgdb-insurance-agent.postgres.database.azure.com:5432/insuranceagent
REDIS_URL=redis://localhost:6379/0
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=<YOUR-LANGSMITH-KEY>
LANGCHAIN_PROJECT=insurance-submission-agent
TARMIKA_API_KEY=<TARMIKA-API-KEY>
TARMIKA_API_URL=<TARMIKA-API-ENDPOINT>
SEMSEE_API_KEY=<SEMSEE-API-KEY>
SEMSEE_API_URL=<SEMSEE-API-ENDPOINT>
AMS_API_BASE_URL=<AMS-API-ENDPOINT>
AMS_API_KEY=<AMS-API-KEY>
MS_GRAPH_CLIENT_ID=<AZURE-AD-APP-CLIENT-ID>
MS_GRAPH_CLIENT_SECRET=<AZURE-AD-APP-CLIENT-SECRET>
MS_GRAPH_TENANT_ID=<AZURE-AD-TENANT-ID>
EOF
chmod 600 /opt/insurance-ai-agent/.env
Note

All API keys and secrets should eventually be migrated to Azure Key Vault with managed identity access from the VM. The .env file is used for initial development and testing only. Playwright with Chromium is installed for RPA-based carrier portal submissions where API access is not available. Ensure the VM has at least 16GB RAM if running n8n + LangGraph agent + Redis concurrently.

Step 3: Configure Microsoft 365 Integration for Email Ingestion

Register an Azure AD (Entra ID) application for Microsoft Graph API access. This app will monitor the agency's shared submission inbox for incoming loss run PDFs and carrier correspondence, automatically downloading attachments and routing them to the document processing pipeline. Configure application-level permissions for Mail.Read, Mail.ReadWrite, and Files.ReadWrite.All.

  • In Azure Portal > Azure Active Directory > App Registrations > New Registration — Name: Insurance-AI-Agent-Graph-App, Supported account types: Single tenant, Redirect URI: (leave blank for daemon app)
  • After registration, note the Application (client) ID and Directory (tenant) ID
  • Add API Permissions (Application type, not Delegated): Microsoft Graph > Application > Mail.Read, Mail.ReadWrite, Files.ReadWrite.All, User.Read.All
  • Grant admin consent for the permissions
  • Create a client secret: Certificates & secrets > New client secret > Description: 'AI Agent Access' > Expires: 24 months — Copy the secret value immediately, it won't be shown again
  • Configure mail flow rule in Exchange Admin Center: Go to https://admin.exchange.microsoft.com > Mail flow > Rules
  • Create rule: 'Copy Loss Runs to Shared Mailbox' — Condition: Subject contains 'loss run' OR 'loss history' OR attachment name contains 'loss_run' — Action: Bcc to ai-submissions@<agency-domain>.com (the shared mailbox created below)
  • Create shared mailbox for AI agent monitoring (see PowerShell commands below)
  • Create SharePoint document library 'AI-Processed-Submissions' with folders: /loss-runs, /applications, /carrier-responses, /audit-trail
Create shared mailbox for AI agent monitoring via Exchange Online PowerShell
powershell
Connect-ExchangeOnline
New-Mailbox -Shared -Name 'AI Submissions Inbox' -DisplayName 'AI Submissions Inbox' -Alias ai-submissions
Set-Mailbox -Identity ai-submissions -MessageCopyForSendOnBehalfEnabled $true
Note

The shared mailbox approach is preferred over monitoring individual mailboxes for privacy and compliance reasons. The AI agent only accesses the shared submissions mailbox, not personal mailboxes. Application permissions require Global Admin consent. Set the client secret expiration reminder in the MSP's ticketing system — secrets must be rotated before expiration. Consider using a certificate instead of client secret for production deployments for enhanced security.
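Once admin consent is granted, the agent authenticates with the OAuth2 client-credentials flow. A sketch that only builds the token request against the standard Microsoft identity platform v2.0 endpoint (no network call made here):

```python
def build_token_request(tenant_id: str, client_id: str, client_secret: str):
    """Return (url, form_body) for the OAuth2 client-credentials grant."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    body = {
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://graph.microsoft.com/.default",  # app permissions consented above
        "grant_type": "client_credentials",
    }
    return url, body

url, body = build_token_request("<tenant-id>", "<app-client-id>", "<secret>")
print(body["grant_type"])
# → client_credentials
```

The body is POSTed form-encoded; the returned access token is then sent as a Bearer header on Graph calls such as listing messages with attachments in the shared submissions mailbox.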

Step 4: Initialize Database Schema and Storage

Connect to the PostgreSQL database and create the schema for tracking submissions, extracted data, carrier responses, and audit logs. Also configure Azure Blob Storage lifecycle policies for document retention and cost optimization.

bash
source /opt/insurance-ai-agent/venv/bin/activate
cd /opt/insurance-ai-agent

# Create database
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d postgres -c 'CREATE DATABASE insuranceagent;'

# Apply schema
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d insuranceagent << 'EOSQL'

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE clients (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    ams_client_id VARCHAR(100) UNIQUE NOT NULL,
    business_name VARCHAR(255) NOT NULL,
    dba VARCHAR(255),
    fein VARCHAR(20),
    sic_code VARCHAR(10),
    naics_code VARCHAR(10),
    entity_type VARCHAR(50),
    state_of_incorporation VARCHAR(2),
    years_in_business INTEGER,
    annual_revenue DECIMAL(15,2),
    num_employees INTEGER,
    primary_contact_name VARCHAR(255),
    primary_contact_email VARCHAR(255),
    primary_contact_phone VARCHAR(20),
    mailing_address JSONB,
    locations JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE loss_runs (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    client_id UUID REFERENCES clients(id),
    carrier_name VARCHAR(255),
    policy_number VARCHAR(100),
    policy_type VARCHAR(50),
    effective_date DATE,
    expiration_date DATE,
    claims JSONB,
    total_incurred DECIMAL(15,2),
    total_paid DECIMAL(15,2),
    total_reserved DECIMAL(15,2),
    loss_ratio DECIMAL(5,4),
    experience_mod DECIMAL(5,3),
    raw_text TEXT,
    extraction_confidence DECIMAL(5,4),
    source_document_url VARCHAR(500),
    extracted_at TIMESTAMP DEFAULT NOW(),
    reviewed_by VARCHAR(255),
    reviewed_at TIMESTAMP
);

CREATE TABLE submissions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    client_id UUID REFERENCES clients(id),
    submission_type VARCHAR(50) NOT NULL,
    lines_of_business TEXT[],
    target_carriers TEXT[],
    acord_forms_used TEXT[],
    populated_data JSONB,
    status VARCHAR(30) DEFAULT 'draft',
    human_approved_by VARCHAR(255),
    human_approved_at TIMESTAMP,
    submitted_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE carrier_submissions (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    submission_id UUID REFERENCES submissions(id),
    carrier_name VARCHAR(255) NOT NULL,
    platform VARCHAR(50),
    submission_method VARCHAR(30),
    platform_reference_id VARCHAR(100),
    status VARCHAR(30) DEFAULT 'pending',
    quote_received BOOLEAN DEFAULT false,
    quote_premium DECIMAL(15,2),
    quote_details JSONB,
    submitted_at TIMESTAMP,
    response_received_at TIMESTAMP,
    error_message TEXT
);

CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    event_type VARCHAR(50) NOT NULL,
    entity_type VARCHAR(50),
    entity_id UUID,
    actor VARCHAR(100),
    details JSONB,
    timestamp TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_submissions_client ON submissions(client_id);
CREATE INDEX idx_submissions_status ON submissions(status);
CREATE INDEX idx_carrier_sub_submission ON carrier_submissions(submission_id);
CREATE INDEX idx_carrier_sub_status ON carrier_submissions(status);
CREATE INDEX idx_audit_log_entity ON audit_log(entity_type, entity_id);
CREATE INDEX idx_audit_log_timestamp ON audit_log(timestamp);

EOSQL

# Configure Azure Blob Storage lifecycle policy (write the JSON to a file; az expects @file syntax)
cat > /tmp/lifecycle-policy.json << 'EOF'
{
  "rules": [
    {
      "name": "archiveOldDocuments",
      "type": "Lifecycle",
      "definition": {
        "filters": { "blobTypes": ["blockBlob"] },
        "actions": {
          "baseBlob": {
            "tierToCool": { "daysAfterModificationGreaterThan": 90 },
            "tierToArchive": { "daysAfterModificationGreaterThan": 365 }
          }
        }
      }
    }
  ]
}
EOF
az storage account management-policy create --account-name stinsuranceagent --resource-group rg-insurance-ai-agent --policy @/tmp/lifecycle-policy.json
Note

The JSONB fields (claims, populated_data, quote_details, locations, mailing_address) provide flexible schema for the varied data structures returned by different carriers and extracted from different document formats. The audit_log table is critical for compliance — every AI action must be logged. Consider enabling Azure PostgreSQL audit logging (pgAudit extension) for additional database-level auditing. Retention policies should align with the agency's document retention requirements (typically 7+ years for insurance records).
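Every agent action funnels through a helper like this sketch (demonstrated against an in-memory SQLite stand-in so it runs anywhere; production uses the PostgreSQL audit_log table above with %s placeholders instead of ?):

```python
import json
import sqlite3

def log_event(conn, event_type, entity_type, entity_id, actor, details):
    """Insert one audit row; details is serialized to JSON text."""
    conn.execute(
        "INSERT INTO audit_log (event_type, entity_type, entity_id, actor, details) "
        "VALUES (?, ?, ?, ?, ?)",
        (event_type, entity_type, entity_id, actor, json.dumps(details)),
    )
    conn.commit()

# Portable demo against a simplified SQLite stand-in for the table
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE audit_log (
    id INTEGER PRIMARY KEY, event_type TEXT, entity_type TEXT,
    entity_id TEXT, actor TEXT, details TEXT)""")
log_event(conn, "extraction_completed", "loss_run", "abc-123",
          "ai-agent", {"confidence": 0.97})
print(conn.execute("SELECT event_type, actor FROM audit_log").fetchone())
# → ('extraction_completed', 'ai-agent')
```

Parameterized inserts matter here: audit details contain client PII, so string-built SQL would be both an injection risk and a compliance finding.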

Step 5: Deploy n8n Workflow Automation Platform

Install and configure n8n as a systemd service on the Azure VM. n8n serves as the integration backbone connecting email monitoring, document processing dispatch, AMS API calls, LangGraph agent invocation, and notification workflows. Configure it with persistent PostgreSQL storage and set up the basic webhook and scheduling infrastructure.

bash
# Create n8n data directory
sudo mkdir -p /opt/n8n-data
sudo chown mspadmin:mspadmin /opt/n8n-data

# Create n8n environment file
cat > /opt/n8n-data/.env << 'EOF'
N8N_PORT=5678
N8N_PROTOCOL=https
N8N_HOST=<VM-FQDN-OR-IP>
WEBHOOK_URL=https://<VM-FQDN-OR-IP>:5678/
N8N_ENCRYPTION_KEY=<GENERATE-RANDOM-32-CHAR-KEY>
DB_TYPE=postgresdb
DB_POSTGRESDB_HOST=pgdb-insurance-agent.postgres.database.azure.com
DB_POSTGRESDB_PORT=5432
DB_POSTGRESDB_DATABASE=n8n
DB_POSTGRESDB_USER=pgadmin
DB_POSTGRESDB_PASSWORD=<PASSWORD>
DB_POSTGRESDB_SSL_REJECT_UNAUTHORIZED=false
GENERIC_TIMEZONE=America/New_York
N8N_BASIC_AUTH_ACTIVE=true
N8N_BASIC_AUTH_USER=mspadmin
N8N_BASIC_AUTH_PASSWORD=<STRONG-N8N-PASSWORD>
EOF

# Create n8n database
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d postgres -c 'CREATE DATABASE n8n;'

# Create systemd service for n8n
sudo tee /etc/systemd/system/n8n.service << 'EOF'
[Unit]
Description=n8n Workflow Automation
After=network.target redis-server.service

[Service]
Type=simple
User=mspadmin
WorkingDirectory=/opt/n8n-data
EnvironmentFile=/opt/n8n-data/.env
ExecStart=/usr/bin/n8n start
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable n8n
sudo systemctl start n8n

# Verify n8n is running
sudo systemctl status n8n
curl -k https://localhost:5678/healthz
Note

For production, place n8n behind an Nginx reverse proxy with a proper TLS certificate (Let's Encrypt). The basic auth is a minimum — consider integrating with Azure AD for SSO. Generate the N8N_ENCRYPTION_KEY with: openssl rand -hex 16. The n8n instance will be configured with workflows in a subsequent step after the LangGraph agent is deployed. n8n Community Edition is AGPL-licensed and free for self-hosting.

Step 6: Deploy LangGraph Agent Core

Deploy the main LangGraph-based autonomous agent that orchestrates the entire submission workflow. This agent implements a state machine with nodes for: email monitoring → document classification → OCR/extraction → AMS data enrichment → ACORD form population → human review gate → multi-carrier submission → quote monitoring → comparison report generation. The agent is deployed as a FastAPI service that n8n invokes via HTTP.

Deploy FastAPI wrapper for the LangGraph submission agent and register it as a systemd service
bash
cd /opt/insurance-ai-agent/agent
source /opt/insurance-ai-agent/venv/bin/activate

# Create the main agent file
cat > /opt/insurance-ai-agent/agent/submission_agent.py << 'PYEOF'
# See custom_ai_components section for full implementation
# This is the deployment wrapper
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List
import uvicorn
from agent.agent_graph import create_submission_graph, SubmissionState
from dotenv import load_dotenv
import logging

load_dotenv('/opt/insurance-ai-agent/.env')
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title='Insurance Submission Agent API', version='1.0.0')
graph = create_submission_graph()

class SubmissionRequest(BaseModel):
    client_ams_id: str
    document_urls: List[str]
    target_carriers: Optional[List[str]] = None
    lines_of_business: List[str]
    priority: str = 'normal'

class ApprovalRequest(BaseModel):
    submission_id: str
    approved: bool
    reviewer_name: str
    notes: Optional[str] = None

@app.post('/api/v1/submissions/start')
async def start_submission(req: SubmissionRequest, background_tasks: BackgroundTasks):
    initial_state = SubmissionState(
        client_ams_id=req.client_ams_id,
        document_urls=req.document_urls,
        target_carriers=req.target_carriers or [],
        lines_of_business=req.lines_of_business,
        priority=req.priority
    )
    # Run agent graph asynchronously
    background_tasks.add_task(graph.ainvoke, initial_state)
    return {'status': 'started', 'message': 'Submission workflow initiated'}

@app.post('/api/v1/submissions/{submission_id}/approve')
async def approve_submission(submission_id: str, req: ApprovalRequest):
    # Resume the paused graph at the human review checkpoint
    # Implementation in agent_graph.py
    return {'status': 'approved' if req.approved else 'rejected'}

@app.get('/api/v1/submissions/{submission_id}/status')
async def get_status(submission_id: str):
    # Query database for submission status
    return {'submission_id': submission_id, 'status': 'in_progress'}

@app.get('/health')
async def health():
    return {'status': 'healthy'}

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8000)
PYEOF

# Create systemd service for the agent API
sudo tee /etc/systemd/system/insurance-agent.service << 'EOF'
[Unit]
Description=Insurance Submission AI Agent API
After=network.target redis-server.service postgresql.service

[Service]
Type=simple
User=mspadmin
WorkingDirectory=/opt/insurance-ai-agent
EnvironmentFile=/opt/insurance-ai-agent/.env
ExecStart=/opt/insurance-ai-agent/venv/bin/uvicorn agent.submission_agent:app --host 0.0.0.0 --port 8000 --workers 2
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable insurance-agent
sudo systemctl start insurance-agent

# Verify agent is running
curl http://localhost:8000/health
Note

The full agent_graph.py implementation is provided in the custom_ai_components section. This step deploys the FastAPI wrapper that exposes the agent as an HTTP service for n8n to call. The agent runs with 2 Uvicorn workers to handle concurrent submissions. For agencies processing >50 submissions/day, increase workers to 4 and upgrade the VM to Standard_D8s_v5. The human-in-the-loop approval endpoint is critical — no submission goes to carriers without explicit human approval.

Step 7: Configure Document Intelligence Pipeline

Set up the Azure AI Document Intelligence pipeline for OCR and initial document structure extraction. Configure custom extraction models for the most common loss run formats and ACORD form types the agency encounters. This pipeline is called by the LangGraph agent during the extraction phase.

bash
cd /opt/insurance-ai-agent
source venv/bin/activate

# Create the document intelligence module
cat > /opt/insurance-ai-agent/agent/doc_intelligence.py << 'PYEOF'
import os
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest, DocumentAnalysisFeature
from azure.core.credentials import AzureKeyCredential
from azure.storage.blob import BlobServiceClient
import json
import logging

logger = logging.getLogger(__name__)

class DocumentProcessor:
    def __init__(self):
        self.doc_client = DocumentIntelligenceClient(
            endpoint=os.environ['AZURE_DOC_INTELLIGENCE_ENDPOINT'],
            credential=AzureKeyCredential(os.environ['AZURE_DOC_INTELLIGENCE_KEY'])
        )
        self.blob_client = BlobServiceClient.from_connection_string(
            os.environ['AZURE_STORAGE_CONNECTION_STRING']
        )
    
    def analyze_document(self, document_url: str = None, document_bytes: bytes = None) -> dict:
        """Run Azure Document Intelligence on a document and return structured output."""
        if document_url:
            poller = self.doc_client.begin_analyze_document(
                model_id='prebuilt-layout',
                analyze_request=AnalyzeDocumentRequest(url_source=document_url),
                features=[DocumentAnalysisFeature.KEY_VALUE_PAIRS]
            )
        elif document_bytes:
            poller = self.doc_client.begin_analyze_document(
                model_id='prebuilt-layout',
                body=document_bytes,
                content_type='application/pdf',
                features=[DocumentAnalysisFeature.KEY_VALUE_PAIRS]
            )
        else:
            raise ValueError('Must provide document_url or document_bytes')
        
        result = poller.result()
        
        extracted = {
            'content': result.content,
            'pages': [],
            'tables': [],
            'key_value_pairs': []
        }
        
        for page in result.pages:
            page_data = {
                'page_number': page.page_number,
                'width': page.width,
                'height': page.height,
                'lines': [{'content': line.content} for line in (page.lines or [])]  # per-line confidence isn't exposed; use page.words[].confidence if needed
            }
            extracted['pages'].append(page_data)
        
        for table in (result.tables or []):
            table_data = {
                'row_count': table.row_count,
                'column_count': table.column_count,
                'cells': [{'row': cell.row_index, 'col': cell.column_index, 'content': cell.content, 'kind': cell.kind} for cell in table.cells]
            }
            extracted['tables'].append(table_data)
        
        for kvp in (result.key_value_pairs or []):
            if kvp.key and kvp.value:
                extracted['key_value_pairs'].append({
                    'key': kvp.key.content,
                    'value': kvp.value.content,
                    'confidence': kvp.confidence
                })
        
        return extracted
    
    def upload_document(self, container: str, blob_name: str, data: bytes) -> str:
        """Upload a document to Azure Blob Storage and return the URL."""
        blob_client = self.blob_client.get_blob_client(container=container, blob=blob_name)
        blob_client.upload_blob(data, overwrite=True)
        return blob_client.url
    
    def classify_document(self, text_content: str) -> str:
        """Classify document type based on extracted text content."""
        text_lower = text_content[:2000].lower()
        if any(kw in text_lower for kw in ['loss run', 'loss history', 'claims history', 'loss experience']):
            return 'loss_run'
        elif any(kw in text_lower for kw in ['acord 125', 'commercial insurance application']):
            return 'acord_125'
        elif any(kw in text_lower for kw in ['acord 126', 'general liability']):
            return 'acord_126'
        elif any(kw in text_lower for kw in ['acord 130', 'workers compensation']):
            return 'acord_130'
        elif any(kw in text_lower for kw in ['acord 140', 'property section']):
            return 'acord_140'
        elif any(kw in text_lower for kw in ['supplemental', 'questionnaire']):
            return 'supplemental_app'
        elif any(kw in text_lower for kw in ['quote', 'proposal', 'indication']):
            return 'carrier_quote'
        else:
            return 'unknown'

PYEOF

echo 'Document intelligence module deployed successfully'
Note

The prebuilt-layout model from Azure AI Document Intelligence handles most document types well. For agencies with highly specialized loss run formats (e.g., from niche carriers), consider training a custom extraction model using Azure AI Document Intelligence Studio (https://formrecognizer.appliedai.azure.com/studio). This requires 5+ labeled sample documents per format. The classification function uses keyword matching as a fast first pass — the LLM-based agent refines classification when keywords are ambiguous.
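The keyword-first, LLM-second pattern described above can be sketched as follows. `classify_by_keywords` mirrors the `classify_document` logic, while the `llm_fallback` parameter is a hypothetical hook standing in for the agent's LLM call — it is not part of the module above:

```python
# Two-stage document classification sketch: cheap keyword pass first,
# expensive LLM fallback only when the keywords are ambiguous.
KEYWORD_MAP = [
    ('loss_run', ['loss run', 'loss history', 'claims history', 'loss experience']),
    ('acord_125', ['acord 125', 'commercial insurance application']),
    ('acord_126', ['acord 126', 'general liability']),
    ('acord_130', ['acord 130', 'workers compensation']),
    ('acord_140', ['acord 140', 'property section']),
    ('supplemental_app', ['supplemental', 'questionnaire']),
    ('carrier_quote', ['quote', 'proposal', 'indication']),
]

def classify_by_keywords(text: str) -> str:
    """First-pass classification over the first 2000 chars, in priority order."""
    text_lower = text[:2000].lower()
    for doc_type, keywords in KEYWORD_MAP:
        if any(kw in text_lower for kw in keywords):
            return doc_type
    return 'unknown'

def classify(text: str, llm_fallback=None) -> str:
    """Only invoke the LLM when the keyword pass cannot decide."""
    doc_type = classify_by_keywords(text)
    if doc_type == 'unknown' and llm_fallback is not None:
        return llm_fallback(text)  # e.g. a structured-output LLM call in the agent
    return doc_type

print(classify('ACORD 125 - Commercial Insurance Application for Acme LLC'))  # acord_125
```

Ordering matters: the list is checked top-down, so more specific document types should precede generic ones (e.g. `acord 126` before the bare `quote` keywords).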

Step 8: Configure n8n Integration Workflows

Create the core n8n workflows that connect all system components: (1) Email monitoring workflow that checks the shared mailbox every 5 minutes for new documents, (2) Document processing dispatch workflow that routes documents through the AI pipeline, (3) Human approval notification workflow that sends Teams/email alerts when submissions are ready for review, and (4) Carrier quote monitoring workflow that checks for returned quotes.

  • Access n8n at https://<VM-IP>:5678 and log in with the basic auth credentials configured earlier
  • WORKFLOW 1: Email Monitor — Runs on a 5-minute schedule: Trigger: Schedule (every 5 min) → Microsoft Graph: Get emails from ai-submissions@agency.com with hasAttachments=true and isRead=false → Loop: For each email → Microsoft Graph: Get attachments → Filter: Only PDF/DOCX/XLSX attachments → HTTP Request: Upload to Azure Blob Storage (loss-runs container) → HTTP Request: POST to Agent API /api/v1/documents/process → Microsoft Graph: Mark email as read → Slack/Teams: Notify 'New document received: <filename>'
  • WORKFLOW 2: Submission Orchestration — Triggered by webhook from Agent API: Trigger: Webhook POST /webhook/submission-ready → HTTP Request: GET Agent API /api/v1/submissions/{id}/status → IF status == 'awaiting_review': Microsoft Teams: Send adaptive card to #submissions channel with review details + Email: Send notification to assigned CSR → ELSE IF status == 'approved': HTTP Request: POST Agent API /api/v1/submissions/{id}/submit → ELSE IF status == 'completed': HTTP Request: GET quotes comparison + Email: Send quote comparison to producer
  • WORKFLOW 3: Quote Monitor — Runs every 30 minutes during business hours: Trigger: Schedule (every 30 min, Mon-Fri 8am-6pm) → Database: Query carrier_submissions WHERE status = 'submitted' AND response_received_at IS NULL → Loop: For each pending submission → HTTP Request: Check Tarmika API for quote status → HTTP Request: Check Semsee API for quote status → IF quote received: Database: Update carrier_submissions with quote details + HTTP Request: Notify Agent API of quote received
  • Configure n8n credentials: (1) Microsoft Graph OAuth2 — Use the App Registration from Step 3, (2) HTTP Header Auth for Agent API — Bearer token, (3) PostgreSQL — Direct database connection, (4) Azure Blob Storage — Connection string
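On the Agent API side, Workflow 2 is triggered by POSTing to the n8n webhook. A minimal sketch of that call is below; the payload shape (`submission_id` plus `status`) and the `notify_n8n` helper are assumptions to be aligned with the actual n8n workflow fields:

```python
import json
import urllib.request

# Webhook path from Workflow 2 above; host/port per the n8n deployment.
N8N_WEBHOOK_URL = 'https://localhost:5678/webhook/submission-ready'

def build_submission_event(submission_id: str, status: str) -> dict:
    """Event the agent POSTs when a submission changes state.
    Field names are illustrative -- keep them in sync with the n8n IF nodes."""
    assert status in {'awaiting_review', 'approved', 'completed'}
    return {'submission_id': submission_id, 'status': status}

def notify_n8n(submission_id: str, status: str) -> None:
    # Hypothetical helper: fire-and-forget notification to n8n.
    payload = json.dumps(build_submission_event(submission_id, status)).encode()
    req = urllib.request.Request(
        N8N_WEBHOOK_URL, data=payload,
        headers={'Content-Type': 'application/json'})
    urllib.request.urlopen(req, timeout=10)
```

Keeping the event minimal (ID plus status) lets n8n fetch full details from the `/status` endpoint, so the webhook contract stays stable as the submission schema evolves.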
Verify n8n access and locate workflow JSON templates
bash
echo 'Configure n8n workflows via the web UI at https://<VM-IP>:5678'
echo 'Import workflow JSON templates from /opt/insurance-ai-agent/config/n8n-workflows/'
Note

The n8n workflows are best built in the visual editor. The commands above describe the logical flow — actual implementation is drag-and-drop in the n8n UI. Export workflows as JSON and store in the Git repository for version control and disaster recovery. The Microsoft Graph credential in n8n should use the OAuth2 flow with the app registration from Step 3. Set up n8n error workflows that send alerts to the MSP monitoring system when any workflow fails. Consider using n8n's built-in error handling nodes with retry logic for transient API failures.
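For the retry logic on transient API failures mentioned above, a minimal async backoff helper on the Agent API side could look like this (names, attempt counts, and delays are illustrative):

```python
import asyncio
import random

async def with_retries(fn, *args, attempts: int = 4, base_delay: float = 1.0, **kwargs):
    """Retry an async call with exponential backoff plus jitter.
    This sketch retries on any exception; in practice narrow it to
    transient errors (e.g. httpx.TransportError, 5xx responses)."""
    for attempt in range(attempts):
        try:
            return await fn(*args, **kwargs)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)

# Example: a flaky call that succeeds on the third try
calls = {'n': 0}
async def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError('transient')
    return 'ok'

print(asyncio.run(with_retries(flaky, base_delay=0.01)))  # ok
```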

Step 9: Configure AMS API Integration

Set up the API connection to the agency's AMS (Applied Epic, Vertafore AMS360, or HawkSoft). This integration allows the AI agent to pull client data for pre-populating applications and write back submission records and carrier responses. The specific implementation varies by AMS platform.

Deploy AMS integration module supporting Applied Epic, Vertafore AMS360, and HawkSoft
bash
cd /opt/insurance-ai-agent
source venv/bin/activate

# Create AMS integration module (Applied Epic example)
cat > /opt/insurance-ai-agent/agent/ams_integration.py << 'PYEOF'
import os
import httpx
import logging
from typing import Optional, Dict, Any

logger = logging.getLogger(__name__)

class AMSClient:
    """
    Unified AMS client that abstracts the specific AMS API.
    Currently supports: Applied Epic, Vertafore AMS360, HawkSoft.
    Set AMS_PLATFORM env var to: 'epic', 'ams360', or 'hawksoft'
    """
    
    def __init__(self):
        self.platform = os.environ.get('AMS_PLATFORM', 'epic')
        self.base_url = os.environ['AMS_API_BASE_URL']
        self.api_key = os.environ['AMS_API_KEY']
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            headers=self._get_headers(),
            timeout=30.0
        )
    
    def _get_headers(self) -> dict:
        if self.platform == 'epic':
            return {
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            }
        elif self.platform == 'ams360':
            return {
                'Authorization': f'Basic {self.api_key}',
                'Content-Type': 'application/json'
            }
        elif self.platform == 'hawksoft':
            return {
                'X-API-Key': self.api_key,
                'Content-Type': 'application/json'
            }
        return {}
    
    async def get_client(self, client_id: str) -> Dict[str, Any]:
        """Retrieve client/account details from AMS."""
        if self.platform == 'epic':
            response = await self.client.get(f'/api/v1/customers/{client_id}')
        elif self.platform == 'ams360':
            response = await self.client.get(f'/api/customers/{client_id}')
        elif self.platform == 'hawksoft':
            response = await self.client.get(f'/api/v4/contacts/{client_id}')
        else:
            raise ValueError(f'Unsupported AMS platform: {self.platform}')
        
        response.raise_for_status()
        raw = response.json()
        return self._normalize_client_data(raw)
    
    async def get_policies(self, client_id: str) -> list:
        """Retrieve active policies for a client."""
        if self.platform == 'epic':
            response = await self.client.get(f'/api/v1/customers/{client_id}/policies')
        elif self.platform == 'ams360':
            response = await self.client.get(f'/api/customers/{client_id}/policies')
        elif self.platform == 'hawksoft':
            response = await self.client.get(f'/api/v4/contacts/{client_id}/policies')
        else:
            raise ValueError(f'Unsupported AMS platform: {self.platform}')
        
        response.raise_for_status()
        return response.json()
    
    async def create_activity(self, client_id: str, activity: dict) -> dict:
        """Create an activity/note in the AMS for audit trail."""
        if self.platform == 'epic':
            response = await self.client.post(
                f'/api/v1/customers/{client_id}/activities',
                json=activity
            )
        elif self.platform == 'ams360':
            response = await self.client.post(
                f'/api/customers/{client_id}/activities',
                json=activity
            )
        elif self.platform == 'hawksoft':
            response = await self.client.post(
                f'/api/v4/contacts/{client_id}/notes',
                json=activity
            )
        else:
            raise ValueError(f'Unsupported AMS platform: {self.platform}')
        response.raise_for_status()
        return response.json()
    
    def _normalize_client_data(self, raw: dict) -> dict:
        """Normalize AMS-specific data format to our standard schema."""
        if self.platform == 'epic':
            return {
                'ams_client_id': raw.get('customerId', ''),
                'business_name': raw.get('name', ''),
                'dba': raw.get('dba', ''),
                'fein': raw.get('fein', ''),
                'entity_type': raw.get('entityType', ''),
                'sic_code': raw.get('sicCode', ''),
                'naics_code': raw.get('naicsCode', ''),
                'primary_contact': {
                    'name': raw.get('primaryContact', {}).get('name', ''),
                    'email': raw.get('primaryContact', {}).get('email', ''),
                    'phone': raw.get('primaryContact', {}).get('phone', '')
                },
                'mailing_address': raw.get('mailingAddress', {}),
                'locations': raw.get('locations', [])
            }
        # Add normalization for ams360 and hawksoft
        return raw

PYEOF

echo 'AMS integration module deployed. Set AMS_PLATFORM in .env to match client AMS.'
Note

AMS API access requires vendor approval. Applied Epic uses the Applied SDK — register at developer.appliedsystems.com. Vertafore AMS360 API access is obtained through your Vertafore account team. HawkSoft provides Open API documentation at developers.hawksoft.com. Each AMS has different authentication mechanisms (OAuth2, API key, Basic Auth) — the module abstracts these differences. Always test API connectivity with read-only operations before enabling write-back. The _normalize_client_data method must be expanded with the full field mapping during discovery when you have access to the actual AMS API responses.
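To keep the read-only-first discipline, a smoke-test sketch that fetches one known client and validates the normalized schema before enabling any write-back might look like this. It assumes the AMSClient module above is importable with `.env` loaded; the required-field set is an assumption to refine during discovery:

```python
import asyncio

# Minimum fields every normalized client record must carry before
# submission pre-population can rely on it (illustrative set).
REQUIRED_FIELDS = {'ams_client_id', 'business_name', 'fein', 'primary_contact'}

def validate_client_schema(client: dict) -> list:
    """Return the sorted list of required fields missing from a normalized record."""
    return sorted(REQUIRED_FIELDS - set(client))

async def smoke_test(client_id: str) -> None:
    """Read-only connectivity + normalization check; never writes to the AMS."""
    from agent.ams_integration import AMSClient  # module deployed above
    ams = AMSClient()
    client = await ams.get_client(client_id)
    missing = validate_client_schema(client)
    if missing:
        raise RuntimeError(f'Normalization incomplete, missing: {missing}')
    print(f"OK: {client['business_name']}")

# Usage (against a known test record):
#   asyncio.run(smoke_test('TEST-CLIENT-ID'))
```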

Step 10: Configure Carrier Submission Integration

Set up the carrier submission layer that connects to Tarmika Bridge and Semsee APIs for multi-carrier quote submission. Also configure Playwright-based RPA for carriers that don't support API submission through these platforms. Map each of the agency's appointed carriers to the appropriate submission method.

bash
cd /opt/insurance-ai-agent
source venv/bin/activate

# Create carrier submission module
cat > /opt/insurance-ai-agent/agent/carrier_submit.py << 'PYEOF'
import os
import httpx
import json
import logging
from typing import Dict, List, Optional, Any
from playwright.async_api import async_playwright

logger = logging.getLogger(__name__)

class CarrierSubmissionEngine:
    def __init__(self):
        self.tarmika_url = os.environ.get('TARMIKA_API_URL', '')
        self.tarmika_key = os.environ.get('TARMIKA_API_KEY', '')
        self.semsee_url = os.environ.get('SEMSEE_API_URL', '')
        self.semsee_key = os.environ.get('SEMSEE_API_KEY', '')
        self.carrier_config = self._load_carrier_config()
    
    def _load_carrier_config(self) -> dict:
        config_path = '/opt/insurance-ai-agent/config/carrier_mapping.json'
        try:
            with open(config_path) as f:
                return json.load(f)
        except FileNotFoundError:
            logger.warning(f'Carrier config not found at {config_path}, using defaults')
            return {}
    
    async def submit_to_tarmika(self, submission_data: dict) -> dict:
        """Submit to carriers via Tarmika Bridge API."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f'{self.tarmika_url}/api/v1/submissions',
                headers={
                    'Authorization': f'Bearer {self.tarmika_key}',
                    'Content-Type': 'application/json'
                },
                json={
                    'applicant': submission_data.get('applicant', {}),
                    'business_info': submission_data.get('business_info', {}),
                    'coverage_requested': submission_data.get('coverage', {}),
                    'loss_history': submission_data.get('loss_history', {}),
                    'target_carriers': submission_data.get('carriers', [])
                },
                timeout=60.0
            )
            response.raise_for_status()
            return response.json()
    
    async def submit_to_semsee(self, submission_data: dict) -> dict:
        """Submit to carriers via Semsee API."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f'{self.semsee_url}/api/submissions',
                headers={
                    'Authorization': f'Bearer {self.semsee_key}',
                    'Content-Type': 'application/json'
                },
                json=submission_data,
                timeout=60.0
            )
            response.raise_for_status()
            return response.json()
    
    async def submit_via_rpa(self, carrier_name: str, submission_data: dict) -> dict:
        """Submit to carrier portal via browser automation (Playwright)."""
        carrier_cfg = self.carrier_config.get(carrier_name, {})
        portal_url = carrier_cfg.get('portal_url', '')
        username = carrier_cfg.get('username', '')
        password = carrier_cfg.get('password', '')
        
        if not portal_url:
            raise ValueError(f'No portal configuration for carrier: {carrier_name}')
        
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()
            
            try:
                # Login
                await page.goto(portal_url)
                await page.fill(carrier_cfg.get('username_selector', '#username'), username)
                await page.fill(carrier_cfg.get('password_selector', '#password'), password)
                await page.click(carrier_cfg.get('login_button_selector', 'button[type=submit]'))
                await page.wait_for_load_state('networkidle')
                
                # Navigate to new submission
                await page.goto(carrier_cfg.get('new_submission_url', ''))
                
                # Fill form fields from mapping
                for field_map in carrier_cfg.get('field_mappings', []):
                    selector = field_map['selector']
                    data_key = field_map['data_key']
                    value = self._get_nested_value(submission_data, data_key)
                    if value:
                        field_type = field_map.get('type', 'text')
                        if field_type == 'text':
                            await page.fill(selector, str(value))
                        elif field_type == 'select':
                            await page.select_option(selector, str(value))
                        elif field_type == 'checkbox':
                            if value:
                                await page.check(selector)
                
                # Upload loss run documents
                for upload_map in carrier_cfg.get('upload_mappings', []):
                    upload_selector = upload_map['selector']
                    file_key = upload_map['file_key']
                    file_path = submission_data.get(file_key)
                    if file_path:
                        await page.set_input_files(upload_selector, file_path)
                
                # Screenshot before submission for audit
                screenshot = await page.screenshot(full_page=True)
                
                # Submit (if auto_submit is enabled)
                if carrier_cfg.get('auto_submit', False):
                    await page.click(carrier_cfg.get('submit_button_selector', ''))
                    await page.wait_for_load_state('networkidle')
                    confirmation = await page.text_content(carrier_cfg.get('confirmation_selector', 'body'))
                else:
                    confirmation = 'MANUAL_SUBMIT_REQUIRED'
                
                return {
                    'status': 'submitted' if carrier_cfg.get('auto_submit') else 'form_filled',
                    'confirmation': confirmation,
                    'screenshot': screenshot
                }
            finally:
                await browser.close()
    
    def _get_nested_value(self, data: dict, key_path: str):
        keys = key_path.split('.')
        value = data
        for k in keys:
            if isinstance(value, dict):
                value = value.get(k)
            else:
                return None
        return value
    
    async def submit(self, carrier_name: str, submission_data: dict, method: str = 'auto') -> dict:
        """Route submission to the appropriate method."""
        if method == 'auto':
            carrier_cfg = self.carrier_config.get(carrier_name, {})
            method = carrier_cfg.get('preferred_method', 'tarmika')
        
        if method == 'tarmika':
            return await self.submit_to_tarmika(submission_data)
        elif method == 'semsee':
            return await self.submit_to_semsee(submission_data)
        elif method == 'rpa':
            return await self.submit_via_rpa(carrier_name, submission_data)
        else:
            raise ValueError(f'Unknown submission method: {method}')

PYEOF

# Create carrier mapping configuration template
cat > /opt/insurance-ai-agent/config/carrier_mapping.json << 'EOF'
{
  "_comment": "Populate this file during discovery with agency-specific carrier details",
  "Hartford": {
    "preferred_method": "tarmika",
    "tarmika_carrier_id": "HARTFORD",
    "lines": ["BOP", "WC", "GL", "CA"]
  },
  "Travelers": {
    "preferred_method": "tarmika",
    "tarmika_carrier_id": "TRAVELERS",
    "lines": ["BOP", "WC", "GL", "PROP", "CA"]
  },
  "AmTrust": {
    "preferred_method": "semsee",
    "semsee_carrier_id": "AMTRUST",
    "lines": ["WC", "GL", "BOP"]
  },
  "CNA": {
    "preferred_method": "rpa",
    "portal_url": "https://www.cna.com/web/guest/agentcenter",
    "username": "AGENT_USERNAME",
    "password": "ENCRYPTED_PASSWORD",
    "username_selector": "#userId",
    "password_selector": "#password",
    "login_button_selector": "#loginBtn",
    "new_submission_url": "https://www.cna.com/web/guest/agentcenter/new-business",
    "auto_submit": false,
    "field_mappings": [
      {"selector": "#insuredName", "data_key": "applicant.business_name", "type": "text"},
      {"selector": "#fein", "data_key": "applicant.fein", "type": "text"}
    ],
    "upload_mappings": [
      {"selector": "#lossRunUpload", "file_key": "loss_run_file_path"}
    ],
    "lines": ["GL", "PROP", "UMBR"]
  }
}
EOF

echo 'Carrier submission engine deployed. Update carrier_mapping.json with agency-specific carrier details.'
Note

The carrier mapping JSON must be customized per agency during the discovery phase. Carrier portal selectors (CSS selectors for RPA) are fragile and will break when carriers update their portals — this is the #1 ongoing maintenance burden. Budget 2–4 hours/month for carrier portal selector updates. For the initial deployment, start with Tarmika/Semsee API submissions only and add RPA for additional carriers in Phase 2. Store carrier portal credentials in Azure Key Vault, not in the JSON file — the example above is for illustration. Set auto_submit to false for RPA carriers during the initial deployment period; the agent fills the form and an agency staff member clicks Submit.
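A sketch of the Key Vault approach described above, using the azure-identity and azure-keyvault-secrets packages. The secret-naming convention and vault URL are assumptions, not an established standard — pick one convention per agency and document it:

```python
# Resolve carrier portal credentials from Azure Key Vault at runtime
# instead of storing them in carrier_mapping.json.
def portal_secret_names(carrier: str) -> tuple:
    """Map a carrier name to its (username, password) secret names.
    Naming convention is an assumption: carrier-<slug>-username/-password."""
    slug = carrier.lower().replace(' ', '-')
    return (f'carrier-{slug}-username', f'carrier-{slug}-password')

def fetch_portal_credentials(carrier: str, vault_url: str) -> tuple:
    # Requires: pip install azure-identity azure-keyvault-secrets
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient
    client = SecretClient(vault_url=vault_url, credential=DefaultAzureCredential())
    user_name, pass_name = portal_secret_names(carrier)
    return (client.get_secret(user_name).value, client.get_secret(pass_name).value)

print(portal_secret_names('CNA'))  # ('carrier-cna-username', 'carrier-cna-password')
```

With this in place, `submit_via_rpa` can call `fetch_portal_credentials(carrier_name, vault_url)` instead of reading `username`/`password` from the JSON config, and the VM's managed identity handles authentication with no credentials on disk.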

Step 11: Install and Configure Document Scanners

Install the Ricoh fi-8170 and ScanSnap iX1600 scanners at the agency office(s). Configure scanning profiles that automatically route scanned documents to the AI processing pipeline via SharePoint or a monitored network folder.

Ricoh fi-8170 and ScanSnap iX1600 setup and scanning profile configuration
bash
# Ricoh fi-8170 Setup:
# 1. Connect via Gigabit Ethernet to agency LAN
# 2. Access scanner web UI at assigned IP to configure network settings
# 3. Install PaperStream IP (TWAIN) driver on admin workstation: https://www.pfu.ricoh.com/global/scanners/fi/dl/
# 4. Install PaperStream Capture for batch scanning profiles
# 5. Configure scan profile 'Loss Runs':
#    - Color mode: Auto
#    - Resolution: 300 DPI
#    - File format: Searchable PDF (PDF/A)
#    - Destination: SharePoint > AI-Processed-Submissions > loss-runs
#    - Naming: LossRun_{date}_{time}_{counter}.pdf
# 6. Configure scan profile 'Applications':
#    - Same settings as above
#    - Destination: SharePoint > AI-Processed-Submissions > applications

# ScanSnap iX1600 Setup:
# 1. Download ScanSnap Home from https://www.pfu.ricoh.com/global/scanners/scansnap/dl/
# 2. Connect scanner to Wi-Fi network during initial setup wizard
# 3. Create profiles on the scanner touchscreen:
#    - Profile 1: 'Loss Runs' -> Scan to SharePoint (configure SharePoint connection)
#    - Profile 2: 'Applications' -> Scan to SharePoint
#    - Profile 3: 'General Docs' -> Scan to Email (forward to ai-submissions@agency.com)
# 4. Set default scan settings:
#    - Quality: Better (300 DPI)
#    - Color mode: Auto
#    - File format: PDF (Searchable)
#    - Blank page removal: ON
#    - Multi-feed detection: ON

echo 'Scanner installation complete. Test with sample documents before going live.'
Note

Always scan at 300 DPI for optimal OCR accuracy — 200 DPI is acceptable but degrades extraction quality on fine print. Enable 'Searchable PDF' mode on both scanners, which runs a local OCR pass that gives Azure Document Intelligence a head start. The ScanSnap iX1600 can scan directly to SharePoint via its cloud connection feature (configured in ScanSnap Home). The Ricoh fi-8170 integrates with SharePoint through PaperStream Capture's output connectors. Test scanning with a sample loss run document and verify the file appears in the correct SharePoint library within 60 seconds.

Step 12: Deploy Review Dashboard and Human-in-the-Loop Interface

Deploy a lightweight web dashboard that agency staff use to review AI-extracted data, approve or reject submissions before they go to carriers, and monitor the status of all active submissions. This is built as a simple FastAPI + HTML/JS application served from the existing agent VM.

bash
cd /opt/insurance-ai-agent
source venv/bin/activate

mkdir -p /opt/insurance-ai-agent/dashboard/templates
mkdir -p /opt/insurance-ai-agent/dashboard/static

# Create dashboard FastAPI app
cat > /opt/insurance-ai-agent/dashboard/app.py << 'PYEOF'
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import httpx
import os

app = FastAPI(title='Insurance AI Submission Dashboard')
app.mount('/static', StaticFiles(directory='/opt/insurance-ai-agent/dashboard/static'), name='static')
templates = Jinja2Templates(directory='/opt/insurance-ai-agent/dashboard/templates')
AGENT_API = 'http://localhost:8000'

@app.get('/', response_class=HTMLResponse)
async def dashboard(request: Request):
    async with httpx.AsyncClient() as client:
        subs = await client.get(f'{AGENT_API}/api/v1/submissions')
    return templates.TemplateResponse('dashboard.html', {
        'request': request,
        'submissions': subs.json() if subs.status_code == 200 else []
    })

@app.get('/submission/{submission_id}', response_class=HTMLResponse)
async def submission_detail(request: Request, submission_id: str):
    async with httpx.AsyncClient() as client:
        sub = await client.get(f'{AGENT_API}/api/v1/submissions/{submission_id}')
    return templates.TemplateResponse('submission_detail.html', {
        'request': request,
        'submission': sub.json() if sub.status_code == 200 else {}
    })

@app.post('/submission/{submission_id}/approve')
async def approve(submission_id: str, request: Request):
    form_data = await request.form()
    async with httpx.AsyncClient() as client:
        resp = await client.post(f'{AGENT_API}/api/v1/submissions/{submission_id}/approve', json={
            'submission_id': submission_id,
            'approved': True,
            'reviewer_name': form_data.get('reviewer_name', 'Unknown'),
            'notes': form_data.get('notes', '')
        })
    return {'status': 'approved' if resp.status_code == 200 else 'error'}

PYEOF

# Add dashboard to systemd (on port 8080)
sudo tee /etc/systemd/system/insurance-dashboard.service << 'EOF'
[Unit]
Description=Insurance AI Dashboard
After=insurance-agent.service

[Service]
Type=simple
User=mspadmin
WorkingDirectory=/opt/insurance-ai-agent/dashboard
EnvironmentFile=/opt/insurance-ai-agent/.env
ExecStart=/opt/insurance-ai-agent/venv/bin/uvicorn app:app --host 0.0.0.0 --port 8080
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable insurance-dashboard
sudo systemctl start insurance-dashboard

echo 'Dashboard deployed at http://<VM-IP>:8080 (plain HTTP; front with Nginx/TLS for production)'
Note

The dashboard HTML templates (dashboard.html, submission_detail.html) should be developed during the build phase with the agency's branding. For production, place behind Nginx with TLS and Azure AD SSO authentication (using MSAL.js). The dashboard is intentionally simple — complex analytics can be added later. The critical feature is the approve/reject button that gates every carrier submission. Consider using Microsoft Power Apps or Retool as alternatives if the agency prefers a more polished UI with less custom development.

Step 13: End-to-End Integration Testing

Conduct comprehensive testing of the entire pipeline using real sample documents from the agency. Test each path: email ingestion → OCR → data extraction → AMS lookup → form population → human review → carrier submission. Validate accuracy, error handling, and audit logging at each stage.

bash
cd /opt/insurance-ai-agent
source venv/bin/activate

# Run unit tests
python -m pytest tests/ -v

# Run integration test: Document processing pipeline
python3 << 'PY'
from agent.doc_intelligence import DocumentProcessor

dp = DocumentProcessor()
result = dp.analyze_document(document_url='https://stinsuranceagent.blob.core.windows.net/loss-runs/test_loss_run.pdf')
print(f"Pages extracted: {len(result['pages'])}")
print(f"Tables found: {len(result['tables'])}")
print(f"Key-value pairs: {len(result['key_value_pairs'])}")
doc_type = dp.classify_document(result['content'])
print(f'Document type: {doc_type}')
PY

# Run integration test: AMS connectivity
python << 'PYEOF'
import asyncio
from agent.ams_integration import AMSClient
ams = AMSClient()
client = asyncio.run(ams.get_client('TEST-CLIENT-ID'))
print(f'Client retrieved: {client["business_name"]}')
PYEOF

# Run integration test: Full agent workflow (dry run)
curl -X POST http://localhost:8000/api/v1/submissions/start \
  -H 'Content-Type: application/json' \
  -d '{
    "client_ams_id": "TEST-CLIENT-001",
    "document_urls": ["https://stinsuranceagent.blob.core.windows.net/loss-runs/test_loss_run.pdf"],
    "lines_of_business": ["GL", "PROP"],
    "target_carriers": ["Hartford", "Travelers"],
    "priority": "normal"
  }'

# Monitor agent execution in LangSmith
echo 'Check LangSmith dashboard at https://smith.langchain.com for trace details'

# Verify database records
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d insuranceagent -c 'SELECT id, status, created_at FROM submissions ORDER BY created_at DESC LIMIT 5;'

# Verify audit logs
PGPASSWORD='<PASSWORD>' psql -h pgdb-insurance-agent.postgres.database.azure.com -U pgadmin -d insuranceagent -c 'SELECT event_type, actor, timestamp FROM audit_log ORDER BY timestamp DESC LIMIT 20;'
Note

Testing must use real agency documents (loss runs, ACORD apps) from the sample set collected during prerequisites. Never test with production carrier submissions — use Tarmika/Semsee sandbox/test environments if available, or set auto_submit=false for all carriers during testing. Document all extraction accuracy metrics: target ≥95% field-level accuracy for structured forms (ACORD), ≥90% for unstructured loss runs. Any field below confidence threshold must trigger human review flag. Testing should include edge cases: multi-page loss runs, handwritten ACORD forms, poorly scanned documents, and loss runs from less common carriers.
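The field-level accuracy target above can be tracked with a small helper that compares each extraction against a hand-verified ground-truth record. A minimal sketch (field names are illustrative, not from the production schema):

```python
def field_accuracy(extracted: dict, ground_truth: dict) -> float:
    """Fraction of ground-truth fields the extraction got exactly right.

    Fields missing from the extraction count as misses; monetary values
    are compared after rounding to cents to tolerate float noise.
    """
    if not ground_truth:
        return 0.0
    hits = 0
    for field, expected in ground_truth.items():
        actual = extracted.get(field)
        if isinstance(expected, float) and isinstance(actual, (int, float)):
            hits += round(actual, 2) == round(expected, 2)
        else:
            hits += actual == expected
    return hits / len(ground_truth)


truth = {'policy_number': 'GL-1002', 'total_incurred': 12500.0, 'claim_status': 'closed'}
ai    = {'policy_number': 'GL-1002', 'total_incurred': 12500.004, 'claim_status': 'open'}
print(f'{field_accuracy(ai, truth):.0%}')  # 2 of 3 fields match -> 67%
```

Run this per document over the sample set and average by document type, so the ≥95% (ACORD) and ≥90% (loss run) thresholds can each be verified separately.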

Step 14: Production Cutover and Go-Live

After successful testing, transition to production. Enable live email monitoring, switch carrier submission from test mode to production, conduct final UAT with agency staff, and activate monitoring and alerting. Implement a parallel-run period where AI-processed submissions are verified against manual submissions before full autonomous operation.

Production cutover: enable monitoring, configure Azure alerts, and start parallel run period
bash
# 1. Switch to production mode
cd /opt/insurance-ai-agent

# Update .env with production API keys (replace test/sandbox keys)
# Verify all carrier API endpoints point to production

# 2. Enable email monitoring workflow in n8n
# In n8n UI: Activate 'Email Monitor' workflow
# In n8n UI: Activate 'Quote Monitor' workflow

# 3. Configure monitoring alerts
# Set up Azure Monitor alerts for the VM:
az monitor metrics alert create \
  --name 'agent-vm-cpu-high' \
  --resource-group rg-insurance-ai-agent \
  --scopes /subscriptions/<SUB>/resourceGroups/rg-insurance-ai-agent/providers/Microsoft.Compute/virtualMachines/vm-agent-runtime \
  --condition 'avg Percentage CPU > 85' \
  --window-size 10m \
  --action-group <MSP-ALERT-ACTION-GROUP>

az monitor metrics alert create \
  --name 'agent-vm-disk-high' \
  --resource-group rg-insurance-ai-agent \
  --scopes /subscriptions/<SUB>/resourceGroups/rg-insurance-ai-agent/providers/Microsoft.Compute/virtualMachines/vm-agent-runtime \
  --condition 'avg OS Disk Queue Depth > 10' \
  --window-size 5m \
  --action-group <MSP-ALERT-ACTION-GROUP>

# 4. Set up LangSmith alerts for agent failures
# In LangSmith dashboard: Configure webhook alerts for:
#   - Run failure rate > 5%
#   - Average latency > 120 seconds
#   - Token usage > daily budget threshold

# 5. Start parallel run period (2 weeks)
echo 'PARALLEL_RUN_MODE=true' >> /opt/insurance-ai-agent/.env
sudo systemctl restart insurance-agent
echo 'In parallel run mode: AI submissions require manual verification before carrier submission'

# 6. After 2-week parallel run with >95% accuracy, flip the flag in place
#    (editing rather than appending avoids duplicate entries in .env):
# sed -i 's/^PARALLEL_RUN_MODE=true/PARALLEL_RUN_MODE=false/' /opt/insurance-ai-agent/.env
# sudo systemctl restart insurance-agent

echo 'Production cutover complete. Monitoring active. Parallel run period: 2 weeks.'
Note

The parallel run period is critical for building agency confidence and catching edge cases. During this period, every AI submission is also manually processed by a CSR, and results are compared. Track: extraction accuracy, form population correctness, carrier acceptance rate, and time savings. Do not disable parallel mode until accuracy consistently exceeds 95% and the agency principal has signed off. Schedule daily check-in calls with the agency during the first week of go-live, then move to weekly during weeks 2–4.
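The AI-vs-manual comparison during the parallel run can itself be automated with a small diff helper, a sketch assuming both submissions are reduced to flat field dictionaries (the function and field names are illustrative):

```python
def parallel_run_report(ai: dict, manual: dict) -> dict:
    """Compare an AI-populated submission against the CSR's manual version.

    Returns per-field mismatches plus an overall match rate, which is the
    number tracked against the 95% go-live threshold.
    """
    fields = set(ai) | set(manual)
    mismatches = {f: {'ai': ai.get(f), 'manual': manual.get(f)}
                  for f in fields if ai.get(f) != manual.get(f)}
    rate = 1 - len(mismatches) / len(fields) if fields else 1.0
    return {'match_rate': rate, 'mismatches': mismatches}


report = parallel_run_report(
    {'fein': '12-3456789', 'annual_revenue': 2_400_000, 'sic_code': '1731'},
    {'fein': '12-3456789', 'annual_revenue': 2_450_000, 'sic_code': '1731'},
)
print(f"match rate: {report['match_rate']:.0%}")  # 2 of 3 fields agree -> 67%
```

Logging the mismatch dictionary alongside the match rate gives the daily check-in calls concrete examples to review, not just a percentage.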

Custom AI Components

Document Extraction Agent

Type: agent

LangGraph-based agent node that takes OCR output from Azure Document Intelligence and uses GPT-4.1-mini to extract structured insurance data from loss runs and ACORD applications. Handles diverse document formats from different carriers, assigns confidence scores to each extracted field, and flags low-confidence fields for human review. This is the core intelligence of the system: it converts unstructured PDF documents into structured data that can populate carrier submission forms.

Implementation:

/opt/insurance-ai-agent/agent/extraction_agent.py
python
# /opt/insurance-ai-agent/agent/extraction_agent.py

import json
import logging
from typing import Dict, Any, List, Optional
from langchain_openai import AzureChatOpenAI
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

# --- Pydantic models for structured extraction ---

class ClaimRecord(BaseModel):
    claim_number: Optional[str] = None
    date_of_loss: Optional[str] = None
    claimant_name: Optional[str] = None
    claim_type: Optional[str] = None
    claim_status: str = 'unknown'  # open, closed, reserved
    total_incurred: Optional[float] = None
    total_paid: Optional[float] = None
    total_reserved: Optional[float] = None
    description: Optional[str] = None
    confidence: float = 0.0

class LossRunExtraction(BaseModel):
    carrier_name: Optional[str] = None
    policy_number: Optional[str] = None
    named_insured: Optional[str] = None
    policy_type: Optional[str] = None  # GL, WC, PROP, AUTO, BOP, UMBR
    policy_effective_date: Optional[str] = None
    policy_expiration_date: Optional[str] = None
    report_date: Optional[str] = None
    valuation_date: Optional[str] = None
    claims: List[ClaimRecord] = []
    total_incurred: Optional[float] = None
    total_paid: Optional[float] = None
    total_reserved: Optional[float] = None
    loss_ratio: Optional[float] = None
    experience_mod: Optional[float] = None
    overall_confidence: float = 0.0
    extraction_notes: List[str] = []

class ACORDFieldExtraction(BaseModel):
    form_type: str = ''  # ACORD 125, 126, 130, 140, etc.
    applicant_name: Optional[str] = None
    dba: Optional[str] = None
    mailing_address: Optional[Dict[str, str]] = None
    fein: Optional[str] = None
    sic_code: Optional[str] = None
    naics_code: Optional[str] = None
    entity_type: Optional[str] = None
    business_description: Optional[str] = None
    years_in_business: Optional[int] = None
    annual_revenue: Optional[float] = None
    num_employees: Optional[int] = None
    requested_effective_date: Optional[str] = None
    requested_expiration_date: Optional[str] = None
    coverage_details: Dict[str, Any] = {}
    locations: List[Dict[str, Any]] = []
    additional_insureds: List[Dict[str, str]] = []
    prior_carrier: Optional[str] = None
    prior_policy_number: Optional[str] = None
    prior_premium: Optional[float] = None
    overall_confidence: float = 0.0
    low_confidence_fields: List[str] = []

# --- Extraction prompts ---

LOSS_RUN_EXTRACTION_PROMPT = """You are an expert insurance data extraction specialist. 
Analyze the following document text extracted from a loss run report and extract all 
structured data.

IMPORTANT RULES:
1. Extract EVERY claim listed, even if some fields are missing.
2. For monetary amounts, extract as numbers without currency symbols.
3. For dates, use YYYY-MM-DD format.
4. If a field is unclear or potentially incorrect, set its confidence lower.
5. Calculate overall_confidence as the average confidence across all fields.
6. Add notes about any ambiguities or data quality issues in extraction_notes.
7. A loss ratio = total incurred / total premium (if premium is shown).
8. For Workers Comp loss runs, look for Experience Modification Rate (EMR/e-mod).

DOCUMENT TEXT:
{document_text}

TABLE DATA (if available):
{table_data}

KEY-VALUE PAIRS (if available):
{kv_pairs}

Extract the data as a JSON object matching this exact schema:
{schema}
"""

ACORD_EXTRACTION_PROMPT = """You are an expert insurance forms specialist familiar with 
all ACORD form types. Analyze the following document text extracted from an ACORD 
application form and extract all structured data.

IMPORTANT RULES:
1. Identify the ACORD form type (125, 126, 130, 140, etc.) from the header.
2. Extract ALL filled fields, paying special attention to:
   - Named insured details (name, address, FEIN, entity type)
   - Business description and classification codes (SIC, NAICS)
   - Coverage requested (limits, deductibles, endorsements)
   - Locations and their details
   - Prior insurance information
3. For any field where the handwriting is unclear or the value seems 
   potentially incorrect, add the field name to low_confidence_fields.
4. Calculate overall_confidence as the percentage of fields successfully 
   extracted with high confidence.

DOCUMENT TEXT:
{document_text}

TABLE DATA (if available):
{table_data}

KEY-VALUE PAIRS (if available):
{kv_pairs}

Extract the data as a JSON object matching this exact schema:
{schema}
"""


class ExtractionAgent:
    def __init__(self):
        # Use GPT-4.1-mini for standard extractions (fast + cheap)
        self.llm_fast = AzureChatOpenAI(
            azure_deployment='gpt-4-1-mini',
            api_version='2025-04-01-preview',
            temperature=0.0,
            max_tokens=4096
        )
        # Use GPT-4.1 for complex/low-confidence re-extractions
        self.llm_strong = AzureChatOpenAI(
            azure_deployment='gpt-4-1',
            api_version='2025-04-01-preview',
            temperature=0.0,
            max_tokens=8192
        )
        self.CONFIDENCE_THRESHOLD = 0.85
    
    @staticmethod
    def _json_payload(response) -> str:
        """Models occasionally wrap JSON output in markdown code fences;
        strip them so Pydantic validation sees raw JSON."""
        text = response.content.strip()
        if text.startswith('```'):
            text = text.split('\n', 1)[1] if '\n' in text else ''
            text = text.rsplit('```', 1)[0]
        return text
    
    async def extract_loss_run(self, doc_intelligence_output: dict) -> LossRunExtraction:
        """Extract structured data from a loss run document."""
        prompt = LOSS_RUN_EXTRACTION_PROMPT.format(
            document_text=doc_intelligence_output.get('content', '')[:15000],
            table_data=json.dumps(doc_intelligence_output.get('tables', [])[:10], indent=2),
            kv_pairs=json.dumps(doc_intelligence_output.get('key_value_pairs', []), indent=2),
            schema=LossRunExtraction.model_json_schema()
        )
        
        # First pass with fast model
        response = await self.llm_fast.ainvoke(prompt)
        extraction = LossRunExtraction.model_validate_json(self._json_payload(response))
        
        # If confidence is low, re-extract with stronger model
        if extraction.overall_confidence < self.CONFIDENCE_THRESHOLD:
            logger.info(f'Low confidence ({extraction.overall_confidence:.2f}), re-extracting with GPT-4.1')
            response = await self.llm_strong.ainvoke(prompt)
            extraction = LossRunExtraction.model_validate_json(self._json_payload(response))
            extraction.extraction_notes.append('Re-extracted with GPT-4.1 due to low initial confidence')
        
        return extraction
    
    async def extract_acord_form(self, doc_intelligence_output: dict) -> ACORDFieldExtraction:
        """Extract structured data from an ACORD application form."""
        prompt = ACORD_EXTRACTION_PROMPT.format(
            document_text=doc_intelligence_output.get('content', '')[:15000],
            table_data=json.dumps(doc_intelligence_output.get('tables', [])[:10], indent=2),
            kv_pairs=json.dumps(doc_intelligence_output.get('key_value_pairs', []), indent=2),
            schema=ACORDFieldExtraction.model_json_schema()
        )
        
        response = await self.llm_fast.ainvoke(prompt)
        extraction = ACORDFieldExtraction.model_validate_json(self._json_payload(response))
        
        if extraction.overall_confidence < self.CONFIDENCE_THRESHOLD:
            logger.info(f'Low confidence ({extraction.overall_confidence:.2f}), re-extracting with GPT-4.1')
            response = await self.llm_strong.ainvoke(prompt)
            extraction = ACORDFieldExtraction.model_validate_json(self._json_payload(response))
        
        return extraction
    
    def needs_human_review(self, extraction) -> bool:
        """Determine if the extraction needs human review."""
        if hasattr(extraction, 'overall_confidence'):
            if extraction.overall_confidence < self.CONFIDENCE_THRESHOLD:
                return True
        if hasattr(extraction, 'low_confidence_fields'):
            if len(extraction.low_confidence_fields) > 3:
                return True
        return False
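The fast-then-strong pattern in ExtractionAgent generalizes beyond these two extract methods. Stripped of the LLM calls, it is just a confidence-gated retry; this sketch uses stub callables in place of the two models:

```python
from typing import Callable, Tuple

def escalate(fast: Callable[[], Tuple[dict, float]],
             strong: Callable[[], Tuple[dict, float]],
             threshold: float = 0.85) -> Tuple[dict, float, str]:
    """Run the cheap extractor first; rerun with the expensive one only when
    confidence falls below the threshold. Returns (data, confidence,
    model_used) so callers can log escalation rates."""
    data, conf = fast()
    if conf >= threshold:
        return data, conf, 'fast'
    data, conf = strong()
    return data, conf, 'strong'


# Stubs simulating a blurry scan that the fast model struggles with
blurry_fast   = lambda: ({'policy_number': 'G?-1002'}, 0.62)
blurry_strong = lambda: ({'policy_number': 'GL-1002'}, 0.93)
data, conf, model = escalate(blurry_fast, blurry_strong)
print(model, conf)  # strong 0.93
```

Tracking how often `model == 'strong'` is a useful health metric: a rising escalation rate usually means document quality (or a new carrier's loss-run layout) has drifted from what the fast model handles well.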

Submission Orchestration Graph

Type: workflow

The core LangGraph state machine that orchestrates the entire submission workflow from document intake to carrier quote collection. Implements a directed graph with conditional routing, parallel carrier submission, human-in-the-loop approval checkpointing, and error recovery. This is the 'brain' that coordinates all other components.

Implementation:

/opt/insurance-ai-agent/agent/agent_graph.py
python
# /opt/insurance-ai-agent/agent/agent_graph.py

import json
import logging
import uuid
from typing import Annotated, Any, Dict, List, Optional, TypedDict
from datetime import datetime

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph.message import add_messages

from agent.doc_intelligence import DocumentProcessor
from agent.extraction_agent import ExtractionAgent, LossRunExtraction, ACORDFieldExtraction
from agent.ams_integration import AMSClient
from agent.carrier_submit import CarrierSubmissionEngine
from agent.form_populator import ACORDFormPopulator
from agent.db import get_db_connection, save_submission, update_submission_status, save_audit_log

logger = logging.getLogger(__name__)

# --- State Definition ---

class SubmissionState(TypedDict):
    # Input
    submission_id: str
    client_ams_id: str
    document_urls: List[str]
    target_carriers: List[str]
    lines_of_business: List[str]
    priority: str
    
    # Processing state
    documents_processed: List[Dict[str, Any]]  # OCR results
    extractions: List[Dict[str, Any]]  # Structured extractions
    client_data: Dict[str, Any]  # From AMS
    merged_application_data: Dict[str, Any]  # Combined extraction + AMS data
    populated_forms: Dict[str, Any]  # ACORD forms ready for submission
    
    # Review state
    needs_review: bool
    review_flags: List[str]
    human_approved: bool
    reviewer_name: str
    review_notes: str
    
    # Submission state
    carrier_results: Dict[str, Dict[str, Any]]  # Per-carrier submission results
    quotes_received: Dict[str, Dict[str, Any]]  # Per-carrier quotes
    comparison_report: str
    
    # Metadata
    status: str
    errors: List[str]
    created_at: str
    updated_at: str


# --- Node Functions ---

async def process_documents(state: SubmissionState) -> SubmissionState:
    """Node 1: Process all input documents through Azure Document Intelligence."""
    logger.info(f'Processing {len(state["document_urls"])} documents')
    processor = DocumentProcessor()
    documents_processed = []
    
    for url in state['document_urls']:
        try:
            result = processor.analyze_document(document_url=url)
            doc_type = processor.classify_document(result['content'])
            documents_processed.append({
                'url': url,
                'type': doc_type,
                'ocr_result': result,
                'status': 'processed'
            })
            logger.info(f'Processed document: {url} -> type: {doc_type}')
        except Exception as e:
            logger.error(f'Failed to process document {url}: {e}')
            documents_processed.append({
                'url': url,
                'type': 'error',
                'error': str(e),
                'status': 'failed'
            })
    
    save_audit_log('documents_processed', 'submission', state['submission_id'],
                   'system', {'count': len(documents_processed)})
    
    return {
        **state,
        'documents_processed': documents_processed,
        'status': 'documents_processed',
        'updated_at': datetime.utcnow().isoformat()
    }


async def extract_data(state: SubmissionState) -> SubmissionState:
    """Node 2: Extract structured data from processed documents using LLM."""
    logger.info('Extracting structured data from documents')
    extractor = ExtractionAgent()
    extractions = []
    review_flags = []
    
    for doc in state['documents_processed']:
        if doc['status'] != 'processed':
            continue
        
        try:
            if doc['type'] == 'loss_run':
                extraction = await extractor.extract_loss_run(doc['ocr_result'])
                if extractor.needs_human_review(extraction):
                    review_flags.append(f"Loss run from {extraction.carrier_name}: confidence {extraction.overall_confidence:.0%}")
                extractions.append({
                    'doc_url': doc['url'],
                    'doc_type': 'loss_run',
                    'data': extraction.model_dump(),
                    'confidence': extraction.overall_confidence
                })
            elif doc['type'].startswith('acord_'):
                extraction = await extractor.extract_acord_form(doc['ocr_result'])
                if extractor.needs_human_review(extraction):
                    review_flags.append(f"ACORD form {extraction.form_type}: {len(extraction.low_confidence_fields)} low-confidence fields")
                extractions.append({
                    'doc_url': doc['url'],
                    'doc_type': doc['type'],
                    'data': extraction.model_dump(),
                    'confidence': extraction.overall_confidence
                })
        except Exception as e:
            logger.error(f'Extraction failed for {doc["url"]}: {e}')
            review_flags.append(f"Extraction failed for {doc['url']}: {str(e)}")
    
    save_audit_log('data_extracted', 'submission', state['submission_id'],
                   'system', {'extraction_count': len(extractions), 'flags': len(review_flags)})
    
    return {
        **state,
        'extractions': extractions,
        'review_flags': review_flags,
        'status': 'data_extracted',
        'updated_at': datetime.utcnow().isoformat()
    }


async def enrich_from_ams(state: SubmissionState) -> SubmissionState:
    """Node 3: Pull client data from AMS and merge with extracted data."""
    logger.info(f'Enriching with AMS data for client {state["client_ams_id"]}')
    ams = AMSClient()
    
    try:
        client_data = await ams.get_client(state['client_ams_id'])
        policies = await ams.get_policies(state['client_ams_id'])
        client_data['existing_policies'] = policies
    except Exception as e:
        logger.error(f'AMS lookup failed: {e}')
        client_data = {}
        state['review_flags'].append(f'AMS lookup failed: {str(e)} - manual data entry required')
    
    # Merge extracted data with AMS data (AMS data takes priority for identity fields)
    merged = {
        'applicant': {
            'business_name': client_data.get('business_name', ''),
            'dba': client_data.get('dba', ''),
            'fein': client_data.get('fein', ''),
            'entity_type': client_data.get('entity_type', ''),
            'sic_code': client_data.get('sic_code', ''),
            'naics_code': client_data.get('naics_code', ''),
            'primary_contact': client_data.get('primary_contact', {}),
            'mailing_address': client_data.get('mailing_address', {}),
            'locations': client_data.get('locations', [])
        },
        'business_info': {},
        'loss_history': [],
        'coverage': {},
        'existing_policies': client_data.get('existing_policies', [])
    }
    
    # Add extracted application data
    for ext in state['extractions']:
        if ext['doc_type'] == 'loss_run':
            merged['loss_history'].append(ext['data'])
        elif ext['doc_type'].startswith('acord_'):
            # Merge ACORD data, preferring non-null values
            acord_data = ext['data']
            for key, value in acord_data.items():
                if value and key not in ['overall_confidence', 'low_confidence_fields']:
                    if key == 'coverage_details':
                        merged['coverage'].update(value)
                    elif key == 'locations' and value:
                        merged['applicant']['locations'] = value
                    elif key in merged['applicant'] and not merged['applicant'].get(key):
                        merged['applicant'][key] = value
                    elif key not in merged['applicant']:
                        merged['business_info'][key] = value
    
    save_audit_log('ams_enrichment', 'submission', state['submission_id'],
                   'system', {'client_found': bool(client_data)})
    
    return {
        **state,
        'client_data': client_data,
        'merged_application_data': merged,
        'status': 'data_enriched',
        'updated_at': datetime.utcnow().isoformat()
    }


async def populate_forms(state: SubmissionState) -> SubmissionState:
    """Node 4: Populate ACORD forms and carrier-specific applications."""
    logger.info('Populating submission forms')
    populator = ACORDFormPopulator()
    populated = {}
    
    for lob in state['lines_of_business']:
        try:
            forms = populator.populate_for_line(lob, state['merged_application_data'])
            populated[lob] = forms
        except Exception as e:
            logger.error(f'Form population failed for {lob}: {e}')
            state['review_flags'].append(f'Form population failed for {lob}: {str(e)}')
    
    # Determine if human review is needed
    needs_review = (
        len(state['review_flags']) > 0 or
        any(ext['confidence'] < 0.85 for ext in state['extractions'])
    )
    
    save_audit_log('forms_populated', 'submission', state['submission_id'],
                   'system', {'lines': list(populated.keys()), 'needs_review': needs_review})
    
    return {
        **state,
        'populated_forms': populated,
        'needs_review': needs_review,
        'status': 'awaiting_review' if needs_review else 'ready_to_submit',
        'updated_at': datetime.utcnow().isoformat()
    }


def should_route_to_review(state: SubmissionState) -> str:
    """Conditional edge: route to human review or directly to submission."""
    # ALWAYS route to human review in production (safety first)
    # Even if confidence is high, human approval is required
    return 'human_review'


async def human_review_checkpoint(state: SubmissionState) -> SubmissionState:
    """Node 5: CHECKPOINT - Pause execution and wait for human approval.
    This node saves state and returns. The graph is resumed when the
    human approves/rejects via the dashboard API endpoint."""
    logger.info(f'Submission {state["submission_id"]} awaiting human review')
    
    # Save current state to database for dashboard display
    save_submission(state)
    
    # Send notification via n8n webhook
    import httpx
    try:
        async with httpx.AsyncClient() as client:
            await client.post('http://localhost:5678/webhook/submission-ready', json={
                'submission_id': state['submission_id'],
                'client_name': state['merged_application_data'].get('applicant', {}).get('business_name', 'Unknown'),
                'lines': state['lines_of_business'],
                'carriers': state['target_carriers'],
                'flags': state['review_flags'],
                'status': 'awaiting_review'
            })
    except Exception as e:
        logger.warning(f'Failed to send review notification: {e}')
    
    # Graph execution pauses here until resumed by approve API
    return {
        **state,
        'status': 'awaiting_review',
        'updated_at': datetime.utcnow().isoformat()
    }


async def submit_to_carriers(state: SubmissionState) -> SubmissionState:
    """Node 6: Submit populated applications to all target carriers."""
    if not state.get('human_approved', False):
        logger.error('Attempted to submit without human approval!')
        return {**state, 'status': 'error', 'errors': state['errors'] + ['No human approval']}
    
    logger.info(f'Submitting to {len(state["target_carriers"])} carriers')
    engine = CarrierSubmissionEngine()
    carrier_results = {}
    
    for carrier in state['target_carriers']:
        try:
            result = await engine.submit(
                carrier_name=carrier,
                submission_data=state['merged_application_data']
            )
            carrier_results[carrier] = {
                'status': result.get('status', 'submitted'),
                'reference_id': result.get('reference_id', ''),
                'submitted_at': datetime.utcnow().isoformat(),
                'platform': result.get('platform', 'unknown')
            }
            logger.info(f'Submitted to {carrier}: {result.get("status")}')
        except Exception as e:
            logger.error(f'Submission to {carrier} failed: {e}')
            carrier_results[carrier] = {
                'status': 'failed',
                'error': str(e),
                'submitted_at': datetime.utcnow().isoformat()
            }
    
    # Log to AMS
    try:
        ams = AMSClient()
        await ams.create_activity(state['client_ams_id'], {
            'type': 'submission',
            'description': f'AI agent submitted to {len(carrier_results)} carriers: {", ".join(state["target_carriers"])}',
            'details': json.dumps(carrier_results)
        })
    except Exception as e:
        logger.warning(f'Failed to log to AMS: {e}')
    
    save_audit_log('carriers_submitted', 'submission', state['submission_id'],
                   state.get('reviewer_name', 'system'),
                   {'carriers': list(carrier_results.keys()),
                    'results': {k: v['status'] for k, v in carrier_results.items()}})
    
    return {
        **state,
        'carrier_results': carrier_results,
        'status': 'submitted',
        'updated_at': datetime.utcnow().isoformat()
    }


async def generate_comparison(state: SubmissionState) -> SubmissionState:
    """Node 7: When quotes are received, generate comparison report."""
    # This node is called by the quote monitoring workflow when quotes come in
    if not state.get('quotes_received'):
        return {**state, 'status': 'awaiting_quotes'}
    
    from langchain_openai import AzureChatOpenAI
    llm = AzureChatOpenAI(azure_deployment='gpt-4-1-mini', api_version='2025-04-01-preview', temperature=0.0)
    
    prompt = f"""Generate a clear, professional quote comparison summary for an insurance agent.
    
    Client: {state['merged_application_data'].get('applicant', {}).get('business_name', 'N/A')}
    Lines of Business: {', '.join(state['lines_of_business'])}
    
    Quotes Received:
    {json.dumps(state['quotes_received'], indent=2)}
    
    Format as a table with columns: Carrier | Premium | Deductible | Key Coverage Highlights | Notable Exclusions
    Add a recommendation summary at the bottom noting best value and best coverage options.
    """
    
    response = await llm.ainvoke(prompt)
    
    return {
        **state,
        'comparison_report': response.content,
        'status': 'quotes_compared',
        'updated_at': datetime.utcnow().isoformat()
    }


# --- Build the Graph ---

def create_submission_graph():
    """Create and compile the submission workflow graph."""
    import os
    
    # Set up the PostgreSQL checkpointer for state persistence.
    # Note: in current langgraph-checkpoint-postgres releases,
    # PostgresSaver.from_conn_string() is a context manager, so for a
    # long-lived graph build the saver from an open psycopg connection.
    from psycopg import Connection
    conn = Connection.connect(os.environ['DATABASE_URL'], autocommit=True)
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()  # Creates checkpoint tables if they don't exist
    
    # Build the graph
    workflow = StateGraph(SubmissionState)
    
    # Add nodes
    workflow.add_node('process_documents', process_documents)
    workflow.add_node('extract_data', extract_data)
    workflow.add_node('enrich_from_ams', enrich_from_ams)
    workflow.add_node('populate_forms', populate_forms)
    workflow.add_node('human_review', human_review_checkpoint)
    workflow.add_node('submit_to_carriers', submit_to_carriers)
    workflow.add_node('generate_comparison', generate_comparison)
    
    # Add edges (sequential flow with conditional routing)
    workflow.set_entry_point('process_documents')
    workflow.add_edge('process_documents', 'extract_data')
    workflow.add_edge('extract_data', 'enrich_from_ams')
    workflow.add_edge('enrich_from_ams', 'populate_forms')
    workflow.add_conditional_edges(
        'populate_forms',
        should_route_to_review,
        {
            'human_review': 'human_review',
            'submit': 'submit_to_carriers'
        }
    )
    workflow.add_edge('human_review', 'submit_to_carriers')  # After approval
    workflow.add_edge('submit_to_carriers', 'generate_comparison')
    workflow.add_edge('generate_comparison', END)
    
    # Compile with checkpointing
    graph = workflow.compile(checkpointer=checkpointer, interrupt_before=['human_review'])
    
    return graph
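Conceptually, the `interrupt_before=['human_review']` checkpoint works like the sketch below: state is persisted when the gate is reached, and a later approval call resumes from that saved state. This is a plain-dict illustration of the pause/resume lifecycle, not the LangGraph API:

```python
# In-memory stand-in for the PostgreSQL checkpoint table
checkpoints: dict[str, dict] = {}

def run_until_review(submission_id: str, state: dict) -> dict:
    """Advance the workflow to the human-review gate, then persist and stop."""
    state = {**state, 'status': 'awaiting_review'}
    checkpoints[submission_id] = state          # checkpoint saved here
    return state

def resume_after_approval(submission_id: str, reviewer: str) -> dict:
    """Called by the dashboard's approve endpoint: reload state and continue."""
    state = checkpoints.pop(submission_id)      # resume from saved checkpoint
    state.update(human_approved=True, reviewer_name=reviewer, status='submitted')
    return state

run_until_review('sub-001', {'client_ams_id': 'C-42'})
final = resume_after_approval('sub-001', 'J. Alvarez')
print(final['status'], final['human_approved'])  # submitted True
```

In the real system the dashboard's approve endpoint resumes the compiled graph with the stored thread ID, which is why the checkpointer must be backed by PostgreSQL rather than memory: the process can restart between pause and approval without losing in-flight submissions.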

ACORD Form Populator

Type: skill

Takes merged application data (from AI extraction + AMS enrichment) and maps it to the correct fields on ACORD form templates. Supports ACORD 125 (Commercial Insurance Application), ACORD 126 (Commercial General Liability), ACORD 130 (Workers Compensation), and ACORD 140 (Property Section). Generates populated form data ready for submission via Tarmika/Semsee APIs or direct carrier portal entry.
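The field maps in this module address values by dotted path, including numeric list indices such as `existing_policies.0.carrier_name`. Resolving one of those paths against the merged application data can be sketched as follows (`resolve_path` is an illustrative helper, not part of the module above):

```python
def resolve_path(data, path: str, default=''):
    """Walk a dotted path through nested dicts/lists ('a.b.0.c' style);
    numeric segments index into lists. Returns default on any miss."""
    node = data
    for segment in path.split('.'):
        if isinstance(node, list) and segment.isdigit():
            idx = int(segment)
            node = node[idx] if idx < len(node) else None
        elif isinstance(node, dict):
            node = node.get(segment)
        else:
            return default
    return node if node is not None else default


merged = {'applicant': {'mailing_address': {'city': 'Tulsa'}},
          'existing_policies': [{'carrier_name': 'Hartford'}]}
print(resolve_path(merged, 'applicant.mailing_address.city'))            # Tulsa
print(resolve_path(merged, 'existing_policies.0.carrier_name'))          # Hartford
print(repr(resolve_path(merged, 'existing_policies.1.carrier_name')))    # ''
```

Returning a default on any miss matters here: a client with no prior policies should yield blank `PriorCarrier` fields on the ACORD 125, not a KeyError in the populator.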

Implementation:

/opt/insurance-ai-agent/agent/form_populator.py
python
# /opt/insurance-ai-agent/agent/form_populator.py

import json
import logging
from typing import Dict, Any, List
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

# ACORD form field mappings
# These map our normalized data schema to ACORD form field identifiers
# Field IDs correspond to standard ACORD XML element names

ACORD_125_FIELD_MAP = {
    # Section 1: Applicant Information
    'ApplicantName': 'applicant.business_name',
    'ApplicantDBA': 'applicant.dba',
    'ApplicantMailingAddr1': 'applicant.mailing_address.street1',
    'ApplicantMailingAddr2': 'applicant.mailing_address.street2',
    'ApplicantMailingCity': 'applicant.mailing_address.city',
    'ApplicantMailingState': 'applicant.mailing_address.state',
    'ApplicantMailingZip': 'applicant.mailing_address.zip',
    'ApplicantPhone': 'applicant.primary_contact.phone',
    'ApplicantEmail': 'applicant.primary_contact.email',
    'ApplicantWebsite': 'applicant.website',
    'ApplicantFEIN': 'applicant.fein',
    'ApplicantSICCode': 'applicant.sic_code',
    'ApplicantNAICSCode': 'applicant.naics_code',
    'EntityType': 'applicant.entity_type',
    'YearsInBusiness': 'business_info.years_in_business',
    'AnnualRevenue': 'business_info.annual_revenue',
    'NumEmployees': 'business_info.num_employees',
    'BusinessDescription': 'business_info.business_description',
    # Section 2: Policy Information
    'ProposedEffDate': 'coverage.effective_date',
    'ProposedExpDate': 'coverage.expiration_date',
    # Section 3: Prior Insurance
    'PriorCarrier': 'existing_policies.0.carrier_name',
    'PriorPolicyNumber': 'existing_policies.0.policy_number',
    'PriorPremium': 'existing_policies.0.premium',
    'PriorEffDate': 'existing_policies.0.effective_date',
    'PriorExpDate': 'existing_policies.0.expiration_date',
    # Section 4: Nature of Business
    'DescriptionOfOperations': 'business_info.business_description',
}

ACORD_126_FIELD_MAP = {
    # GL-specific fields (extends ACORD 125)
    'GLClassCode': 'coverage.gl.class_code',
    'GLPremisesOperations': 'coverage.gl.premises_operations_limit',
    'GLGeneralAggregate': 'coverage.gl.general_aggregate',
    'GLProductsCompOps': 'coverage.gl.products_completed_ops',
    'GLPersonalAdvInjury': 'coverage.gl.personal_advertising_injury',
    'GLEachOccurrence': 'coverage.gl.each_occurrence',
    'GLFireDamageLegal': 'coverage.gl.fire_damage_legal',
    'GLMedicalExpense': 'coverage.gl.medical_expense',
    'GLDeductible': 'coverage.gl.deductible',
    'GLRetroDate': 'coverage.gl.retro_date',
    'AdditionalInsuredRequired': 'coverage.gl.additional_insured_required',
    'WaiverOfSubrogation': 'coverage.gl.waiver_of_subrogation',
}

ACORD_130_FIELD_MAP = {
    # Workers Comp specific fields
    'WCState': 'coverage.wc.governing_state',
    'WCClassCode': 'coverage.wc.class_codes',
    'WCPayroll': 'coverage.wc.estimated_annual_payroll',
    'WCNumEmployees': 'business_info.num_employees',
    'WCExperienceMod': 'loss_history_summary.experience_mod',
    'WCDeductible': 'coverage.wc.deductible',
    'EachAccident': 'coverage.wc.each_accident_limit',
    'DiseasePolicyLimit': 'coverage.wc.disease_policy_limit',
    'DiseaseEachEmployee': 'coverage.wc.disease_each_employee',
}

ACORD_140_FIELD_MAP = {
    # Property specific fields
    'BuildingValue': 'coverage.property.building_value',
    'BPPValue': 'coverage.property.bpp_value',
    'BusinessIncomeValue': 'coverage.property.business_income',
    'PropertyDeductible': 'coverage.property.deductible',
    'ConstructionType': 'coverage.property.construction_type',
    'YearBuilt': 'coverage.property.year_built',
    'NumStories': 'coverage.property.num_stories',
    'SquareFootage': 'coverage.property.square_footage',
    'Sprinklered': 'coverage.property.sprinklered',
    'AlarmType': 'coverage.property.alarm_type',
    'RoofType': 'coverage.property.roof_type',
    'RoofAge': 'coverage.property.roof_age',
}

LINE_TO_FORMS = {
    'GL': ['ACORD_125', 'ACORD_126'],
    'WC': ['ACORD_125', 'ACORD_130'],
    'PROP': ['ACORD_125', 'ACORD_140'],
    'BOP': ['ACORD_125', 'ACORD_126', 'ACORD_140'],
    'CA': ['ACORD_125'],  # Plus ACORD 127 (not yet mapped)
    'UMBR': ['ACORD_125'],  # Plus ACORD 129 (not yet mapped)
}

FORM_FIELD_MAPS = {
    'ACORD_125': ACORD_125_FIELD_MAP,
    'ACORD_126': ACORD_126_FIELD_MAP,
    'ACORD_130': ACORD_130_FIELD_MAP,
    'ACORD_140': ACORD_140_FIELD_MAP,
}


class ACORDFormPopulator:
    def __init__(self):
        self.field_maps = FORM_FIELD_MAPS
        self.line_forms = LINE_TO_FORMS
    
    def populate_for_line(self, line_of_business: str, merged_data: dict) -> dict:
        """Populate all required ACORD forms for a given line of business."""
        lob = line_of_business.upper()
        required_forms = self.line_forms.get(lob, ['ACORD_125'])
        populated_forms = {}
        
        # Summarize loss history for form population
        if merged_data.get('loss_history'):
            merged_data['loss_history_summary'] = self._summarize_loss_history(
                merged_data['loss_history'], lob
            )
        
        # Set default dates if not provided
        if not merged_data.get('coverage', {}).get('effective_date'):
            merged_data.setdefault('coverage', {})['effective_date'] = (
                datetime.now() + timedelta(days=30)
            ).strftime('%Y-%m-%d')
            merged_data['coverage']['expiration_date'] = (
                datetime.now() + timedelta(days=395)
            ).strftime('%Y-%m-%d')
        
        for form_name in required_forms:
            field_map = self.field_maps.get(form_name, {})
            populated = {}
            missing_fields = []
            
            for acord_field, data_path in field_map.items():
                value = self._get_nested_value(merged_data, data_path)
                if value is not None:
                    populated[acord_field] = value
                else:
                    missing_fields.append(acord_field)
            
            populated_forms[form_name] = {
                'fields': populated,
                'missing_fields': missing_fields,
                'completeness': len(populated) / max(len(field_map), 1)
            }
            
            logger.info(
                f'{form_name}: {len(populated)}/{len(field_map)} fields populated '
                f'({populated_forms[form_name]["completeness"]:.0%} complete)'
            )
        
        return populated_forms
    
    def _get_nested_value(self, data: dict, path: str):
        """Navigate nested dict using dot notation (supports array indices)."""
        keys = path.split('.')
        value = data
        for key in keys:
            if isinstance(value, dict):
                value = value.get(key)
            elif isinstance(value, list):
                try:
                    value = value[int(key)]
                except (IndexError, ValueError):
                    return None
            else:
                return None
            if value is None:
                return None
        return value
    
    def _summarize_loss_history(self, loss_runs: list, line_of_business: str) -> dict:
        """Summarize loss history across multiple loss runs for form population."""
        summary = {
            'total_claims': 0,
            'open_claims': 0,
            'total_incurred': 0.0,
            'total_paid': 0.0,
            'years_covered': set(),
            'experience_mod': None,
            'largest_claim': 0.0
        }
        
        for lr in loss_runs:
            # Tolerate non-dict entries (e.g., failed extractions) gracefully
            data = lr if isinstance(lr, dict) else {}
            
            claims = data.get('claims', [])
            summary['total_claims'] += len(claims)
            
            for claim in claims:
                if isinstance(claim, dict):
                    if claim.get('claim_status', '').lower() == 'open':
                        summary['open_claims'] += 1
                    incurred = claim.get('total_incurred', 0) or 0
                    summary['total_incurred'] += incurred
                    summary['total_paid'] += claim.get('total_paid', 0) or 0
                    summary['largest_claim'] = max(summary['largest_claim'], incurred)
            
            if data.get('experience_mod'):
                summary['experience_mod'] = data['experience_mod']
            
            if data.get('policy_effective_date'):
                try:
                    year = int(data['policy_effective_date'][:4])
                    summary['years_covered'].add(year)
                except (ValueError, IndexError):
                    pass
        
        summary['years_covered'] = sorted(list(summary['years_covered']))
        return summary
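The populator's mapping tables resolve values with a dot-notation path lookup (including numeric list indices such as `existing_policies.0.carrier_name`). A minimal, self-contained sketch of that lookup, with illustrative data — `get_nested` mirrors the `_get_nested_value` method above:

```python
def get_nested(data, path):
    """Resolve a dot-notation path against nested dicts/lists; None if absent."""
    value = data
    for key in path.split('.'):
        if isinstance(value, dict):
            value = value.get(key)
        elif isinstance(value, list):
            try:
                value = value[int(key)]  # numeric segments index into lists
            except (IndexError, ValueError):
                return None
        else:
            return None
        if value is None:
            return None
    return value

# Illustrative merged-data fragment (hypothetical client)
merged = {
    'applicant': {'business_name': 'Acme Plumbing LLC'},
    'existing_policies': [{'carrier_name': 'Hartford', 'premium': 12500}],
}
print(get_nested(merged, 'applicant.business_name'))           # Acme Plumbing LLC
print(get_nested(merged, 'existing_policies.0.carrier_name'))  # Hartford
print(get_nested(merged, 'existing_policies.1.premium'))       # None (no second policy)
```

Missing paths return `None` rather than raising, which is what lets `populate_for_line` collect `missing_fields` and compute a completeness ratio instead of failing on incomplete data.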

Loss Run Data Extraction Prompt

Type: prompt Specialized prompt template for extracting structured claims and policy data from carrier loss run PDF documents. Handles the wide variety of loss run formats across different insurance carriers (each carrier formats their loss runs differently). Includes examples for common carriers like Hartford, Travelers, CNA, AmTrust, and Liberty Mutual.

Implementation:

Loss Run Extraction Prompt v1.0

## System Prompt

You are an expert insurance data extraction specialist with deep knowledge of P&C insurance loss run reports. You have extensive experience reading loss runs from all major carriers including Hartford, Travelers, CNA, AmTrust, Liberty Mutual, Zurich, Chubb, Nationwide, Erie, and regional carriers.

## Key Knowledge

- Loss runs report claims history for a specific policy period
- Each carrier formats their loss run differently, but all contain: policy holder, policy number, policy period, and claims detail
- Claims detail typically includes: date of loss, claim number, claimant, description, status, paid amounts, reserved amounts, incurred amounts
- Incurred = Paid + Reserved (outstanding)
- Loss ratio = Total Incurred / Earned Premium
- Workers Comp loss runs include the Experience Modification Rate (EMR/e-mod)
- Some loss runs show multiple policy periods in one document
- "Total Incurred" may be labeled "Total Losses", "Total Cost", or "Incurred Losses"
- Reserves may be labeled "Outstanding", "Case Reserves", or "O/S Reserves"
- A "clean" loss run means zero claims

## Extraction Instructions

Given the OCR text output from a loss run document:

1. **Identify the carrier** from the letterhead, logo text, or header
2. **Extract policy details**: policy number, named insured, policy type (GL/WC/AUTO/PROP/BOP), effective and expiration dates
3. **Extract each claim as a separate record** with all available fields
4. **Calculate totals** if not explicitly stated: sum all paid, reserved, and incurred amounts
5. **Calculate loss ratio** if premium information is available
6. **Assign confidence scores** (0.0 to 1.0) based on:
   - 1.0: Clear, unambiguous text
   - 0.8-0.9: Minor OCR artifacts but data is interpretable
   - 0.5-0.7: Significant ambiguity, multiple possible interpretations
   - Below 0.5: Essentially guessing; flag for human review
7. **Add extraction notes** for any anomalies:
   - Missing fields that are normally present
   - Inconsistent totals (sum of claims ≠ stated total)
   - Unusually formatted dates or amounts
   - Pages that appear to be missing or cut off

## Output Format

Return a JSON object matching the LossRunExtraction Pydantic schema. All monetary values as floats (no currency symbols). All dates as YYYY-MM-DD strings.

## Common Carrier Format Notes

- **Hartford**: Header says "The Hartford"; claims in table format; shows Indemnity and Expense separately for WC
- **Travelers**: Shows "Travelers Insurance"; detailed claim descriptions; Medical/Indemnity/Expense breakdown for WC
- **CNA**: Minimal formatting; sometimes PDF tables are poorly structured; look for "Valued as of" date
- **AmTrust**: Clean table format; often includes a "Loss Summary" section at the end
- **Liberty Mutual**: "Liberty Mutual Insurance"; shows Open/Closed status clearly; includes a Subrogation column
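Downstream code acts on the confidence scores this prompt assigns. A hedged sketch of that routing logic, using the 85% review threshold named in the testing criteria below (field names and the per-field `{value, confidence}` shape are illustrative, not the exact schema):

```python
# Threshold mirrors the acceptance criterion "all fields below 85%
# confidence are flagged for human review".
REVIEW_THRESHOLD = 0.85

def fields_needing_review(extraction: dict) -> list:
    """Return names of extracted fields whose confidence is below threshold."""
    return [
        field
        for field, meta in extraction.items()
        if meta.get('confidence', 0.0) < REVIEW_THRESHOLD
    ]

# Illustrative extraction result for one loss run
sample = {
    'policy_number': {'value': 'WC-123456', 'confidence': 0.98},
    'total_incurred': {'value': 45210.50, 'confidence': 0.72},  # OCR ambiguity
}
print(fields_needing_review(sample))  # ['total_incurred']
```

In practice this list would be attached to the submission record so the review dashboard can highlight exactly which fields the CSR needs to verify.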

Email Ingestion Integration

Type: integration Microsoft Graph API integration that monitors the agency's shared mailbox for incoming loss run documents and carrier correspondence. Automatically downloads PDF attachments, uploads them to Azure Blob Storage, and triggers the document processing pipeline. Handles duplicate detection, attachment filtering, and email classification.

Implementation:

/opt/insurance-ai-agent/agent/email_ingestion.py
python
# /opt/insurance-ai-agent/agent/email_ingestion.py

import os
import hashlib
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import httpx
import base64

logger = logging.getLogger(__name__)

class EmailIngestionService:
    def __init__(self):
        self.tenant_id = os.environ['MS_GRAPH_TENANT_ID']
        self.client_id = os.environ['MS_GRAPH_CLIENT_ID']
        self.client_secret = os.environ['MS_GRAPH_CLIENT_SECRET']
        self.mailbox = os.environ.get('MONITORED_MAILBOX', 'ai-submissions@agency.com')
        self.graph_base = 'https://graph.microsoft.com/v1.0'
        self._token = None
        self._token_expiry = None
        self._processed_hashes = set()  # In-memory dedup; use Redis in production
    
    async def _get_token(self) -> str:
        if self._token and self._token_expiry and datetime.utcnow() < self._token_expiry:
            return self._token
        
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f'https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token',
                data={
                    'grant_type': 'client_credentials',
                    'client_id': self.client_id,
                    'client_secret': self.client_secret,
                    'scope': 'https://graph.microsoft.com/.default'
                }
            )
            response.raise_for_status()
            token_data = response.json()
            self._token = token_data['access_token']
            self._token_expiry = datetime.utcnow() + timedelta(seconds=token_data['expires_in'] - 60)
            return self._token
    
    async def check_for_new_documents(self) -> List[Dict[str, Any]]:
        """Check shared mailbox for new emails with attachments."""
        token = await self._get_token()
        headers = {'Authorization': f'Bearer {token}'}
        
        # Get unread emails with attachments
        filter_query = "isRead eq false and hasAttachments eq true"
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f'{self.graph_base}/users/{self.mailbox}/messages',
                headers=headers,
                params={
                    '$filter': filter_query,
                    '$select': 'id,subject,from,receivedDateTime,hasAttachments',
                    '$top': 50,
                    '$orderby': 'receivedDateTime desc'
                }
            )
            response.raise_for_status()
            messages = response.json().get('value', [])
        
        new_documents = []
        for msg in messages:
            attachments = await self._get_attachments(msg['id'], headers)
            for att in attachments:
                if self._is_relevant_attachment(att):
                    # Check for duplicates
                    content_hash = hashlib.sha256(att['content']).hexdigest()
                    if content_hash not in self._processed_hashes:
                        self._processed_hashes.add(content_hash)
                        new_documents.append({
                            'email_id': msg['id'],
                            'subject': msg['subject'],
                            'from': msg.get('from', {}).get('emailAddress', {}).get('address', ''),
                            'received': msg['receivedDateTime'],
                            'filename': att['name'],
                            'content': att['content'],
                            'content_hash': content_hash,
                            'content_type': att['contentType']
                        })
            
            # Mark email as read
            await self._mark_as_read(msg['id'], headers)
        
        logger.info(f'Found {len(new_documents)} new documents from {len(messages)} emails')
        return new_documents
    
    async def _get_attachments(self, message_id: str, headers: dict) -> list:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f'{self.graph_base}/users/{self.mailbox}/messages/{message_id}/attachments',
                headers=headers
            )
            response.raise_for_status()
            attachments = []
            for att in response.json().get('value', []):
                if att.get('contentBytes'):
                    attachments.append({
                        'name': att['name'],
                        'contentType': att['contentType'],
                        'content': base64.b64decode(att['contentBytes'])
                    })
            return attachments
    
    async def _mark_as_read(self, message_id: str, headers: dict):
        async with httpx.AsyncClient() as client:
            await client.patch(
                f'{self.graph_base}/users/{self.mailbox}/messages/{message_id}',
                headers={**headers, 'Content-Type': 'application/json'},
                json={'isRead': True}
            )
    
    def _is_relevant_attachment(self, attachment: dict) -> bool:
        """Filter for relevant document types."""
        name = attachment.get('name', '').lower()
        content_type = attachment.get('contentType', '').lower()
        
        # Accept PDFs, Word docs, and Excel files
        relevant_extensions = ('.pdf', '.docx', '.doc', '.xlsx', '.xls', '.tiff', '.tif')
        relevant_mimes = ('application/pdf', 'application/vnd.openxmlformats', 'image/tiff')
        
        if any(name.endswith(ext) for ext in relevant_extensions):
            return True
        if any(mime in content_type for mime in relevant_mimes):
            return True
        
        return False
    
    def classify_email_intent(self, subject: str, sender: str) -> str:
        """Classify the intent of an incoming email from its subject line.

        The sender argument is accepted for future rules (e.g., carrier
        domain matching) but is not used by the current keyword heuristic.
        """
        subject_lower = subject.lower()
        
        if any(kw in subject_lower for kw in ['loss run', 'loss history', 'claims history', 'loss experience']):
            return 'loss_run_delivery'
        elif any(kw in subject_lower for kw in ['application', 'submission', 'acord', 'new business']):
            return 'application'
        elif any(kw in subject_lower for kw in ['quote', 'proposal', 'indication', 'premium']):
            return 'carrier_quote'
        elif any(kw in subject_lower for kw in ['renewal', 'expiring', 'upcoming renewal']):
            return 'renewal_notice'
        else:
            return 'general'
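The `_processed_hashes` set above is lost on restart, so duplicates can be reprocessed after a service bounce. A minimal persistent alternative, sketched with stdlib `sqlite3` (production would more likely use Redis with TTLs, as the inline comment suggests; class and table names here are hypothetical):

```python
import hashlib
import sqlite3

class ContentDeduper:
    """Persist SHA-256 content hashes so dedup survives service restarts."""

    def __init__(self, db_path: str = ':memory:'):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute('CREATE TABLE IF NOT EXISTS seen (hash TEXT PRIMARY KEY)')

    def is_new(self, content: bytes) -> bool:
        """Record the content hash; True only the first time it is seen."""
        digest = hashlib.sha256(content).hexdigest()
        try:
            self.conn.execute('INSERT INTO seen (hash) VALUES (?)', (digest,))
            self.conn.commit()
            return True
        except sqlite3.IntegrityError:  # primary-key conflict = already seen
            return False

deduper = ContentDeduper()
print(deduper.is_new(b'%PDF-1.7 sample'))  # True  (first time)
print(deduper.is_new(b'%PDF-1.7 sample'))  # False (duplicate)
```

The `INSERT` either succeeds or raises on the primary-key conflict, so the check-and-record step is atomic; with the in-memory set, two concurrent workers could both pass the `not in` check before either adds the hash.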

Quote Comparison Reporter

Type: skill Generates professional quote comparison reports when carrier quotes are received. Uses GPT-4.1-mini to analyze quotes from multiple carriers, highlight key differences in coverage, pricing, deductibles, and exclusions, and produce a formatted comparison that the agent/producer can present to the client.

Implementation:

Quote Comparison Report Prompt

You are an expert insurance advisor helping an independent insurance agent prepare a quote comparison for their client. Generate a professional, clear, and comprehensive quote comparison report.

## Client Information

- Business Name: {business_name}
- Lines of Business: {lines_of_business}
- Industry: {industry}
- Annual Revenue: {revenue}

## Carrier Quotes

{quotes_json}

## Loss History Summary

- Total Claims (5 years): {total_claims}
- Total Incurred: ${total_incurred:,.2f}
- Experience Mod (WC): {experience_mod}

## Instructions

Generate a report with these sections:

### 1. Executive Summary
Brief overview of quotes received and overall recommendation.

### 2. Premium Comparison Table
| Carrier | Line | Annual Premium | Deductible | Payment Terms |
(Include all carriers and all lines)

### 3. Coverage Comparison
For each line of business, compare:
- Limits (per occurrence, aggregate)
- Key endorsements included/excluded
- Notable coverage differences
- Deductible options

### 4. Carrier Strength & Service
- AM Best ratings (if known)
- Claims handling reputation
- Local service capabilities

### 5. Recommendation
- Best Overall Value (price + coverage balance)
- Best Coverage (most comprehensive)
- Best Price (lowest premium)
- Any concerns or caveats

Format the output as clean HTML that can be rendered in an email or dashboard. Use tables for comparisons. Highlight the recommended option.
/opt/insurance-ai-agent/agent/quote_comparison.py
python
import json
import logging
from typing import Dict, List, Any
from langchain_openai import AzureChatOpenAI

# COMPARISON_PROMPT is the Quote Comparison Report Prompt template shown
# above; the import path below is illustrative and depends on how prompt
# templates are packaged in your deployment.
from agent.prompts import COMPARISON_PROMPT

logger = logging.getLogger(__name__)


class QuoteComparisonReporter:
    def __init__(self):
        self.llm = AzureChatOpenAI(
            azure_deployment='gpt-4-1-mini',
            api_version='2025-04-01-preview',
            temperature=0.2,
            max_tokens=4096
        )
    
    async def generate_comparison(
        self,
        quotes: Dict[str, Dict[str, Any]],
        client_info: Dict[str, Any],
        loss_history_summary: Dict[str, Any]
    ) -> str:
        """Generate a formatted quote comparison report."""
        
        prompt = COMPARISON_PROMPT.format(
            business_name=client_info.get('business_name', 'N/A'),
            lines_of_business=', '.join(client_info.get('lines_of_business', [])),
            industry=client_info.get('business_description', 'N/A'),
            revenue=client_info.get('annual_revenue', 'N/A'),
            quotes_json=json.dumps(quotes, indent=2, default=str),
            total_claims=loss_history_summary.get('total_claims', 0),
            total_incurred=loss_history_summary.get('total_incurred', 0),
            experience_mod=loss_history_summary.get('experience_mod', 'N/A')
        )
        
        response = await self.llm.ainvoke(prompt)
        return response.content
    
    def generate_quick_summary(self, quotes: Dict[str, Dict[str, Any]]) -> str:
        """Generate a quick text summary without LLM (for notifications)."""
        lines = ['📊 Quote Summary:\n']
        
        for carrier, details in quotes.items():
            premium = details.get('premium', 'TBD')
            if isinstance(premium, (int, float)):
                premium = f'${premium:,.0f}'
            lines.append(f'  • {carrier}: {premium}')
        
        if quotes:
            premiums = [
                v.get('premium', 0) for v in quotes.values() 
                if isinstance(v.get('premium'), (int, float))
            ]
            if premiums:
                lines.append(f'\n  Lowest: ${min(premiums):,.0f} | Highest: ${max(premiums):,.0f}')
                lines.append(f'  Spread: ${max(premiums) - min(premiums):,.0f}')
        
        return '\n'.join(lines)
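A standalone illustration of the premium spread math in `generate_quick_summary`, using hypothetical carrier figures; note how non-numeric premiums (carrier referrals, "TBD") are excluded from the min/max calculation rather than crashing the formatter:

```python
# Hypothetical quotes: one carrier referred the risk instead of pricing it
quotes = {
    'Hartford': {'premium': 14200},
    'Travelers': {'premium': 15800},
    'CNA': {'premium': 'referred to underwriter'},  # excluded below
}

# Only numeric premiums participate in the spread calculation
premiums = [
    v['premium'] for v in quotes.values()
    if isinstance(v['premium'], (int, float))
]

print(f'Lowest: ${min(premiums):,.0f} | Highest: ${max(premiums):,.0f}')
print(f'Spread: ${max(premiums) - min(premiums):,.0f}')
```

Filtering before aggregating is what lets the notification fire as soon as the first numeric quote arrives, even while other carriers are still pending.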

Testing & Validation

  • DOCUMENT OCR TEST: Scan 3 different loss run PDFs (from different carriers — e.g., Hartford, Travelers, CNA) through the Azure Document Intelligence pipeline. Verify: (a) all pages are extracted, (b) tables are correctly identified with proper row/column structure, (c) text content is >98% accurate compared to the source document.
  • LOSS RUN EXTRACTION ACCURACY TEST: Process 10 sample loss runs through the full extraction pipeline (OCR → LLM extraction). For each, manually compare extracted fields against the source document. Success criteria: ≥90% of fields correctly extracted, ≥95% accuracy for critical fields (claim counts, total incurred, policy number), and all fields below 85% confidence are flagged for human review. Record results in a spreadsheet tracking field-by-field accuracy per carrier format.
  • ACORD FORM EXTRACTION TEST: Process 5 completed ACORD 125 forms through extraction. Verify: applicant name, FEIN, address, entity type, SIC/NAICS codes, and coverage dates are correctly extracted. Test with both typed/printed and handwritten forms. Success criteria: ≥95% accuracy on typed forms, ≥85% on handwritten forms.
  • AMS CONNECTIVITY TEST: Execute a read-only API call to the agency's AMS to retrieve a known test client record. Verify all standard fields are returned and correctly mapped by the normalization function.
  • FORM POPULATION COMPLETENESS TEST: For a known client with complete AMS data plus extracted loss run data, run the ACORD form populator for GL line of business. Verify: ACORD 125 has ≥80% of fields populated, ACORD 126 has ≥70% of GL-specific fields populated. Check that populated fields match the source data exactly (no data corruption during mapping).
  • EMAIL INGESTION PIPELINE TEST: Send a test email with a PDF attachment to the ai-submissions shared mailbox. Verify within 10 minutes: (a) the email is detected by the monitoring workflow, (b) the PDF attachment is downloaded and uploaded to Azure Blob Storage, (c) the document appears in the processing queue, (d) the email is marked as read. Check n8n execution log for the complete workflow trace.
  • HUMAN-IN-THE-LOOP APPROVAL TEST: Trigger a full submission workflow and verify it pauses at the human review checkpoint. Confirm: (a) the dashboard shows the pending submission with all extracted data, review flags, and populated form preview, (b) clicking 'Approve' resumes the workflow and proceeds to carrier submission, (c) clicking 'Reject' stops the workflow and logs the rejection reason, (d) the audit log records the reviewer name and timestamp.
  • CARRIER SUBMISSION TEST (SANDBOX): Submit a test application to the Tarmika sandbox environment (if available) or verify the API call is correctly formatted by capturing the HTTP request without actually sending. Verify: all required fields are present, field values match the populated ACORD data, and the API response is correctly parsed. For RPA carriers, run in non-submit mode (auto_submit=false) and verify the carrier portal form is correctly populated via screenshot comparison.
  • END-TO-END PIPELINE TEST: Run the complete workflow from a realistic starting point: (1) drop a loss run PDF and ACORD application PDF into the shared mailbox, (2) wait for automated pickup, (3) verify extraction in LangSmith traces, (4) verify AMS enrichment, (5) review populated forms in the dashboard, (6) approve the submission, (7) verify carrier submission API calls are made, (8) verify audit log has complete trail of all actions. Target: entire pipeline completes in <15 minutes for a single submission.
  • PARALLEL RUN ACCURACY TEST: During the 2-week parallel run period, compare AI-populated submissions against manually-prepared submissions for the same clients. Track: field-level match rate, data entry errors caught by AI that humans missed, data entry errors made by AI that humans caught, and total time difference (AI vs. manual). Success criteria for transitioning to production: ≥95% field-level accuracy and consistent time savings of ≥50% vs. manual process.
  • SECURITY AND COMPLIANCE AUDIT: Verify (a) all API communications use TLS 1.2+, (b) Azure Blob Storage has encryption at rest enabled, (c) PostgreSQL has SSL connections enforced, (d) all user-facing endpoints require authentication, (e) the audit_log table captures every AI action with timestamps, (f) PII data is not logged in application logs (only in the encrypted database), (g) Azure OpenAI content filtering is active and not returning filtered responses for insurance content.
  • DISASTER RECOVERY TEST: Simulate a VM failure by stopping the insurance-agent service. Verify: (a) Azure Monitor alert fires within 10 minutes, (b) n8n error workflow sends notification to MSP, (c) in-progress submissions can be resumed from the LangGraph checkpoint after service restart without data loss, (d) no duplicate carrier submissions occur during recovery.
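For the parallel-run accuracy test, the field-level match rate can be computed mechanically rather than eyeballed. A hedged sketch (field names and sample values are hypothetical; real comparisons would normalize formatting such as "12-3456789" vs "123456789" before comparing):

```python
def field_match_rate(ai_fields: dict, manual_fields: dict) -> float:
    """Fraction of fields where the AI and manual submissions agree exactly.

    Fields present in only one submission count as mismatches, so missed
    extractions and spurious extractions both lower the score.
    """
    keys = set(ai_fields) | set(manual_fields)
    if not keys:
        return 1.0
    matches = sum(1 for k in keys if ai_fields.get(k) == manual_fields.get(k))
    return matches / len(keys)

ai = {'ApplicantName': 'Acme Plumbing LLC', 'ApplicantFEIN': '12-3456789',
      'AnnualRevenue': 2500000}
manual = {'ApplicantName': 'Acme Plumbing LLC', 'ApplicantFEIN': '12-3456789',
          'AnnualRevenue': 2400000}

print(f'{field_match_rate(ai, manual):.0%}')  # 67%
```

Running this per submission and averaging per carrier format gives the field-by-field tracking spreadsheet called for in the extraction accuracy test, and makes the ≥95% production gate an objective number.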
DOCUMENT OCR TEST
bash
# verify pages, tables, and key-value pairs extracted by Azure Document
# Intelligence (heredoc avoids the quote-nesting problems of python -c)

python - <<'EOF'
from agent.doc_intelligence import DocumentProcessor

dp = DocumentProcessor()
result = dp.analyze_document(document_url='<test-url>')
print(f"Pages: {len(result['pages'])}, "
      f"Tables: {len(result['tables'])}, "
      f"KV Pairs: {len(result['key_value_pairs'])}")
EOF
AMS CONNECTIVITY TEST
bash
# retrieve known test client record; verify response contains business_name,
# fein, mailing_address, and primary_contact

curl 'http://localhost:8000/api/v1/test/ams-lookup?client_id=TEST-001'

Client Handoff

Client Handoff Checklist

Training Sessions (Schedule 3 sessions, 1 hour each)

Session 1: System Overview & Daily Workflow (All Staff)

  • How the AI submission agent works end-to-end (visual flow diagram)
  • What changes in their daily workflow (emphasize: AI assists, humans decide)
  • How documents enter the system (email forwarding, scanner profiles, manual upload)
  • The review dashboard: logging in, viewing pending submissions, understanding extraction results
  • How to approve or reject a submission (and when to reject)
  • Understanding confidence scores and review flags

Session 2: Power User Training (CSRs & Account Managers)

  • How to correct AI extraction errors in the review dashboard
  • How to add/modify carrier targets for a submission
  • How to handle edge cases: unusual documents, new carrier formats, multi-policy submissions
  • How to view submission history and audit trails
  • How to request loss runs to feed into the system
  • Tips for getting better extraction results (scan quality, email subject lines)

Session 3: Admin Training (Agency Owner/Principal + Operations Manager)

  • n8n dashboard overview: monitoring workflow health
  • LangSmith basics: how to check if the AI is performing well
  • Cost monitoring: Azure consumption and API usage dashboards
  • How to request changes (new carriers, new lines of business, workflow modifications)
  • Compliance documentation: where records are stored, how to produce audit reports
  • Escalation process: who to contact at the MSP for issues

Documentation to Leave Behind

1. Quick Start Guide (laminated 1-pager at each desk): Step-by-step for the most common task — receiving a loss run email and getting it through to carrier submission
2. Review Dashboard User Guide (PDF, 10-15 pages): Screenshots and instructions for every dashboard function
3. Scanner Quick Reference (laminated card next to each scanner): Scan profiles, which button for which document type
4. Troubleshooting FAQ (PDF/SharePoint page): Common issues and self-service fixes (e.g., 'Document not detected' → check it went to the right mailbox)
5. Escalation Contact Sheet: MSP help desk number, SLA response times, emergency contacts
6. Compliance Documentation Binder: Updated WISP, vendor risk assessments for all AI providers, data flow diagrams, data processing agreements

Success Criteria to Review Together (at 30-day and 90-day checkpoints)

  • Extraction accuracy rate (target: ≥95% by day 90)
  • Submission turnaround time (target: ≤2 hours from document receipt to carrier submission, vs. previous manual baseline)
  • Staff time savings (target: 20+ hours/week reduction in manual submission work)
  • Carrier submission success rate (target: ≥98% accepted by carrier platforms without errors)
  • User adoption rate (target: 100% of submissions going through AI pipeline by day 90)
  • Zero compliance incidents related to the AI system

Maintenance

Ongoing Maintenance Responsibilities

Weekly Tasks (MSP L2 Technician, ~2 hours/week)

  • Monday AM: Review LangSmith dashboard for prior week — check error rates, average latency, token consumption, and any failed runs. Investigate and resolve any recurring errors.
  • Monday AM: Review n8n execution log — verify all scheduled workflows executed successfully. Check for failed webhook deliveries or timeout errors.
  • Wednesday: Review Azure cost dashboard — verify AI API consumption is within expected range. Alert if spending exceeds 120% of monthly budget.
  • Friday PM: Check extraction accuracy metrics — review any submissions that were rejected by agency staff during the week. Identify patterns in extraction failures for prompt tuning.

Monthly Tasks (MSP L2/L3 Engineer, ~4-6 hours/month)

  • Carrier Portal Audit: Check all RPA-based carrier integrations for portal changes that may have broken selectors. Update Playwright scripts as needed. Budget 2-4 hours for this — carrier portals change frequently.
  • Security Patch Cycle: Apply OS updates to the Azure VM (Ubuntu unattended-upgrades should handle critical patches, but verify). Update Python packages: pip install --upgrade langgraph langchain-openai azure-ai-documentintelligence.
  • Database Maintenance: Run VACUUM ANALYZE on PostgreSQL tables. Check disk usage and archive old audit logs if needed.
  • LLM Model Review: Check for new Azure OpenAI model versions. Test on a sample set before upgrading deployment.
  • Backup Verification: Verify Azure PostgreSQL automated backups are completing. Test point-in-time restore on a quarterly basis.
  • Usage Report: Generate monthly usage report for the agency: submissions processed, time saved, accuracy rates, carrier response times.

Quarterly Tasks (MSP Solutions Architect, ~8 hours/quarter)

  • Accuracy Tune-Up: Review the past quarter's extraction accuracy data. Update prompts for any carrier loss run formats showing declining accuracy. Add new sample documents to the test suite.
  • Compliance Review: Update vendor risk assessments. Review any new state AI regulations. Verify WISP is current. Check that audit logs are complete and retention policies are enforced.
  • Capacity Planning: Review Azure resource utilization. Right-size VM and database tiers based on actual usage. Forecast growth if agency is adding staff or carriers.
  • Feature Review: Meet with agency to discuss new capabilities — additional carriers, new lines of business, workflow improvements, new AI model capabilities.
  • Disaster Recovery Test: Simulate a failure and verify recovery procedures work correctly.

Trigger-Based Maintenance

  • New Carrier Added: When agency gets a new carrier appointment — add carrier to carrier_mapping.json, configure submission method (Tarmika/Semsee/RPA), test with sample submission. Estimate: 2-4 hours per carrier.
  • New Line of Business: When agency wants to add a new LOB (e.g., Cyber, E&O) — add ACORD form mappings, update extraction prompts, test with sample documents. Estimate: 8-16 hours.
  • AMS Upgrade: When Applied/Vertafore/HawkSoft releases a new version — test API compatibility, update any changed endpoints or field mappings.
  • Azure OpenAI Model Update: When new model versions are available — test on sample set, compare accuracy and cost, update deployment if beneficial.
  • Extraction Accuracy Drop: If weekly accuracy metrics drop below 90% — investigate root cause (new document format, OCR degradation, prompt drift), update prompts or retrain custom models.
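For the "New Carrier Added" trigger, a small validator run before committing a new carrier_mapping.json entry catches most misconfigurations. The schema below (required keys, allowed submission methods) is an assumption based on the submission methods named above, not a published format:

```python
# Hypothetical pre-commit validator for a new carrier_mapping.json entry.
ALLOWED_METHODS = {"tarmika", "semsee", "rpa"}

def validate_carrier_entry(entry: dict) -> list:
    """Return a list of problems; an empty list means the entry looks valid."""
    problems = []
    for key in ("carrier_id", "display_name", "submission_method"):
        if not entry.get(key):
            problems.append(f"missing required field: {key}")
    method = entry.get("submission_method")
    if method and method not in ALLOWED_METHODS:
        problems.append(f"unknown submission_method: {method}")
    if method == "rpa" and not entry.get("portal_url"):
        problems.append("rpa carriers need a portal_url for the Playwright script")
    return problems
```

Wiring this into the test submission step keeps a typo in the mapping file from surfacing as a silent carrier-side failure weeks later.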

SLA Considerations

  • Critical Issues (system down, submissions failing): 1-hour response, 4-hour resolution during business hours
  • Standard Issues (accuracy problems, minor bugs): 4-hour response, next-business-day resolution
  • Enhancement Requests (new carriers, new features): acknowledged within 1 business day, scoped and scheduled within 1 week
  • Uptime Target: 99.5% during business hours (Mon-Fri 7am-7pm agency local time)
  • Maintenance Windows: Saturday 10pm-Sunday 6am for planned updates

Escalation Path

1. L1 (Agency Staff): Check FAQ, verify document quality, retry submission
2. L2 (MSP Help Desk): Check n8n logs, verify API connectivity, restart services
3. L3 (MSP Engineer): Debug LangGraph traces in LangSmith, update prompts/code, fix carrier integrations
4. L4 (MSP Solutions Architect): Architecture changes, vendor escalations, compliance issues

Alternatives

...

Turnkey Platform Approach (Indio + Tarmika)

Instead of building a custom AI agent, deploy Applied Systems' Indio platform for application management and data gathering combined with Tarmika Bridge for multi-carrier quoting. Indio provides smart form technology that auto-populates ACORD fields from prior submissions and offers a client-facing portal for digital application intake. Tarmika handles the carrier submission and quoting. Both are established, carrier-approved platforms with existing integrations to Applied Epic and other AMS platforms.

Hybrid Approach: Quoting Platform + Custom AI Extraction Layer

Use Tarmika or Semsee as the carrier submission platform but build a custom AI extraction layer using Azure OpenAI + Document Intelligence that automates the data entry into these platforms. The AI handles document ingestion, OCR, and data extraction, then populates the quoting platform via its API or UI automation. This gives the agency AI-powered efficiency without building the carrier submission infrastructure.

Syntora Purpose-Built Insurance Automation

Engage Syntora, a purpose-built insurance submission automation vendor, to build the automation. Syntora specializes in connecting to carrier portals and AMS systems with pre-built integrations for Applied Epic and Vertafore. They offer fixed-price project engagements (not hourly) with typical 6-week deployment timelines for 10 carriers.

Patra AI + BPO Hybrid Service

Outsource the entire submission process to Patra, which combines AI-powered document processing with human business process outsourcing (BPO). Patra's team handles loss run ordering, data extraction, application population, and carrier submission using their proprietary AI platform augmented by trained insurance processing staff.

Microsoft Power Automate + Copilot Studio Approach

For agencies already deep in the Microsoft ecosystem, build the automation using Microsoft Power Automate (workflow automation), Copilot Studio (AI agent builder), and Azure AI Document Intelligence. This approach uses Microsoft's low-code/no-code tools instead of LangGraph and n8n, potentially enabling the agency's own IT staff to maintain the solution.
