
Implementation Guide: Identify anomalies and outliers in transaction data that may signal errors or fraud
Step-by-step implementation guide for deploying AI to identify anomalies and outliers in transaction data that may signal errors or fraud for Accounting & Bookkeeping clients.
Hardware Procurement
Business-Class Client Workstations
$850 per unit MSP cost / $1,150 suggested resale
Standard endpoint for accounting staff accessing cloud-based anomaly detection dashboards, QBO, and Copilot for Finance. All AI processing occurs in the cloud; workstations need only support modern browsers and Microsoft 365 apps.
Firewall / Security Appliance
$500 MSP cost / $800 suggested resale (hardware only; FortiGuard UTM bundle ~$300/year)
Perimeter security for the client office. Required by FTC Safeguards Rule for accounting firms handling Nonpublic Personal Information (NPPI). Provides IPS, web filtering, and VPN for remote staff accessing financial systems.
Encrypted NAS for Financial Data Backup
$550 MSP cost (unit) + $300 for 2x 4TB Seagate IronWolf drives in RAID 1 = $850 total / $1,200 suggested resale
AES-256 encrypted local backup of financial data exports, anomaly detection reports, audit logs, and WISP documentation. Required by FTC Safeguards Rule for data backup and recovery. Supplements cloud backup.
UPS Battery Backup
$230 MSP cost / $350 suggested resale
Power protection for NAS and firewall. Ensures backup integrity and prevents data corruption during power events.
Analytics Server
$1,800 MSP cost (configured) / $2,500 suggested resale
On-premises server for running custom PyOD/Isolation Forest anomaly detection models if client requires data residency or has transaction volumes exceeding 500K/month. Optional — most implementations use cloud compute instead (Azure VM B2s at ~$30/month).
Software Procurement
QuickBooks Online Advanced
$235/month per client organization (retail); MSP may receive a 10-20% discount via the Intuit ProAdvisor wholesale program
Core accounting platform with built-in AI anomaly detection for balance sheets, P&L reports, and transaction categorization. Anomaly detection is included at the Plus ($92/mo) and Advanced ($235/mo) tiers. Advanced is recommended for multi-user firms needing advanced reporting and custom roles.
Intuit Accountant Suite Accelerate
$149/month for the accounting firm (MSP client); available from July 1, 2026 — use Core (free) until then
Centralized dashboard for the accounting firm to monitor anomalies across all their business clients. AI-powered anomaly detection proactively identifies trends and anomalies across financial KPIs, payroll data, and AP/bill pay data. Includes Books Close workflow.
Intuit Books Close Add-On
$8/client/month (under 50 clients) or $6/client/month (50+ clients)
Automated month-end close workflow with anomaly flagging. Surfaces balance sheet discrepancies, uncategorized transactions, and reconciliation anomalies for each client during the close process.
Microsoft 365 Business Standard + Copilot Business
$30.50/user/month bundle via CSP; MSP receives 15-35% partner discount depending on volume commitment
Microsoft Copilot for Finance provides AI-driven variance analysis directly in Excel, identifying anomalies in financial performance and explaining key drivers in natural language. Also provides MFA via Entra ID (required by FTC Safeguards Rule), Teams for collaboration, and SharePoint for document management.
Dext Prepare (Practice Plan)
$239.19/month for 10 business clients; $848.99/month for larger portfolios; partner discounts of 15-25% available
AI-powered document capture and data extraction with 99.9% accuracy. Flags anomalies at the source document level — duplicate receipts, unusual amounts, mismatched vendor data — before transactions even enter the GL. Integrates directly with QBO, Xero, and Sage.
PyOD (Python Outlier Detection)
$0 (software); cloud compute costs of ~$30-60/month on Azure VM or AWS EC2
Phase 3 custom anomaly detection engine. Library with 45+ outlier detection algorithms including Isolation Forest, Local Outlier Factor, AutoEncoder, and ECOD. Used to build tailored detection models trained on each client's historical transaction data.
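For orientation, a minimal PyOD example on synthetic amounts (illustrative only; the production pipeline in Step 9 adds feature engineering and composite scoring):
# pyod_quickstart.py - flag outlier amounts with Isolation Forest (sketch)
import numpy as np
from pyod.models.iforest import IForest

rng = np.random.default_rng(42)
# 980 routine payments around $500 plus 20 outsized ones around $25,000
amounts = np.concatenate([rng.normal(500, 120, 980), rng.normal(25000, 500, 20)])
X = amounts.reshape(-1, 1)

clf = IForest(contamination=0.02, random_state=42)
clf.fit(X)
print('outliers flagged:', int(clf.labels_.sum()))  # labels_: 1 = outlier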
Finomaly
$0 (open-source; runs on the same Phase 3 compute as PyOD)
Supplementary Python library specifically designed for financial anomaly detection. Supports both rule-based methods (Benford's Law, threshold checks) and ML methods. Used alongside PyOD for specialized financial checks.
Azure Virtual Machine (Phase 3)
B2s instance: ~$30/month; B4ms for larger workloads: ~$120/month
Cloud compute for running custom Python anomaly detection models. Preferred over on-prem for most clients due to lower upfront cost, automatic scaling, and managed patching. Azure preferred for M365 integration via Entra ID SSO.
SentinelOne Endpoint Protection
$4-6/endpoint/month MSP cost via Pax8; suggest $10-12/endpoint/month resale
License type: SaaS per-seat
AI-powered endpoint protection required by FTC Safeguards Rule. Protects workstations and server handling financial data. Provides EDR capabilities and audit logging for compliance.
Veeam Backup for Microsoft 365
$2-4/user/month via MSP partner program
Backup of Microsoft 365 data including SharePoint-hosted anomaly reports and Excel workbooks. Ensures compliance with data retention requirements.
Prerequisites
- Client must have an active QuickBooks Online Plus ($92/mo minimum) or Advanced ($235/mo recommended) subscription, or Xero Growing/Established plan, with at least 6 months of historical transaction data for meaningful anomaly baselining
- Client must have a current Written Information Security Plan (WISP) on file per FTC Safeguards Rule. If no WISP exists, MSP must create one before deploying AI tools (billable service at $1,500-$3,500)
- All users accessing financial data must have Multi-Factor Authentication (MFA) enabled — mandatory under FTC Safeguards Rule and IRS Publication 1075 (2025). Microsoft Entra ID or Intuit's built-in MFA satisfy this requirement
- Internet connectivity of 25 Mbps download / 10 Mbps upload minimum at each client location. All anomaly detection platforms are cloud-based SaaS
- Modern web browsers: Chrome 90+, Microsoft Edge 90+, Safari 15+, or Firefox 90+ on all workstations
- Microsoft 365 Business Standard or higher licenses for all accounting staff who will review anomaly alerts and use Copilot for Finance
- Client must provide MSP with QuickBooks Online Accountant access (master admin or firm admin role) for the accounting firm, or equivalent access in Xero/Sage
- Bank feeds must be active and reconciled in QBO/Xero for all accounts to be monitored. Anomaly detection relies on matched and categorized transactions
- Client must designate at least one internal 'Anomaly Review Owner' — a senior bookkeeper or accountant who will triage flagged items
- Existing chart of accounts must be reviewed and cleaned up. Miscategorized historical transactions will generate false positive anomalies. Budget 4-8 hours for chart of accounts cleanup per client
- Data Processing Agreement (DPA) must be signed between client and MSP, and between client and each AI vendor, documenting how financial data is processed, stored, and protected
- If client serves public company clients, confirm SOX compliance requirements and ensure selected AI tools maintain SOC 2 Type II certification
Installation Steps
Step 1: Conduct AI Readiness Assessment
Before any technical deployment, assess the client's current accounting environment, data quality, and compliance posture. This assessment determines which phase(s) to implement and identifies blockers. Document current software stack (QBO version, add-ons, integrations), transaction volume per month, number of business clients (if accounting firm), current reconciliation cadence, and any known fraud/error history.
Bill this as a professional service: $2,500-$5,000. Use the assessment to upsell the full implementation. Create a standardized assessment template in your PSA (ConnectWise/Autotask) to reuse across clients. Key output: a written recommendation document specifying Phase 1, 2, or 3 deployment.
Step 2: Update or Create WISP Documentation
Before deploying any AI tools that touch financial data, the client's Written Information Security Plan must be updated (or created) to include the new AI anomaly detection tooling. The WISP must document: what AI tools are being deployed, what data they access, how data flows between systems, vendor security certifications, access controls, incident response procedures for AI-detected anomalies, and data retention policies. This is a regulatory requirement under FTC Safeguards Rule.
Use the IRS WISP template (Publication 4557) as a starting point. Customize with AI-specific sections. This is a billable deliverable ($1,500-$3,500). Store the WISP on the encrypted NAS and in SharePoint with restricted access. WISP must be reviewed and signed by the client's designated security coordinator.
Step 3: Deploy Security Infrastructure
Install and configure the FortiGate 40F firewall, set up VPN for remote workers, deploy SentinelOne endpoint protection on all workstations, and configure the Synology NAS with AES-256 encryption. This security baseline is required before any financial AI tools can be activated.
# FortiGate initial setup via CLI (connect via console cable)
config system interface
edit port1
set ip 192.168.1.1 255.255.255.0
set allowaccess https ssh ping
next
end

# Enable FortiGuard UTM services
config utm

# Configure SSL VPN for remote accounting staff
config vpn ssl settings
set servercert 'Fortinet_Factory'
set tunnel-ip-pools 'SSLVPN_TUNNEL_ADDR1'
set port 10443
set source-interface 'wan1'
end

Synology NAS — Enable AES-256 Encryption via DSM
SentinelOne Deployment via MSP Console
FortiGate should be registered with FortiCloud for centralized management. Set up FortiGuard UTM bundle ($300/year) for IPS, antivirus, and web filtering. SentinelOne deployment can be automated via your RMM platform. Synology encryption key MUST be stored separately from the NAS — recommend Azure Key Vault or physical safe.
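For the Synology key, a minimal sketch of the Key Vault approach using azure-keyvault-secrets (the vault name matches the one created later in Step 9; the exported key file name is an assumption about what DSM produces during encrypted-share setup):
# store_nas_key.py - sketch: keep the NAS encryption key off the NAS itself
import base64
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

vault = SecretClient(
    vault_url='https://kv-anomaly-detect.vault.azure.net/',
    credential=DefaultAzureCredential(),
)
# Base64-encode the key file since Key Vault secret values are strings
with open('synology_encryption.key', 'rb') as f:
    vault.set_secret('synology-nas-encryption-key', base64.b64encode(f.read()).decode())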
Step 4: Configure Microsoft 365 and Entra ID Security
Ensure all accounting staff have M365 Business Standard licenses. Enable and enforce MFA via Entra ID Conditional Access policies. Create a Security Group for anomaly detection users. Configure SharePoint document library for anomaly reports. Enable audit logging in Microsoft Purview.
# PowerShell - Connect to Microsoft Graph
Install-Module Microsoft.Graph -Scope CurrentUser
Connect-MgGraph -Scopes 'User.ReadWrite.All','Group.ReadWrite.All','Policy.ReadWrite.ConditionalAccess'
# Create security group for anomaly detection users
New-MgGroup -DisplayName 'Anomaly Detection Users' -MailEnabled:$false -SecurityEnabled:$true -MailNickname 'anomaly-detection-users'

MFA is mandatory under FTC Safeguards Rule. If client has M365 Business Basic (no Conditional Access), upgrade to Business Premium or add Entra ID P1 ($6/user/month). Document all Conditional Access policies in the WISP. Enable sign-in risk policies if client has Entra ID P2.
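Conditional Access policies can also be created programmatically via Microsoft Graph. A hedged Python sketch (the group ID and policy name are placeholders; the caller needs a token carrying the Policy.ReadWrite.ConditionalAccess scope granted above):
# create_ca_policy.py - sketch: require MFA for the anomaly detection group
import requests

GRAPH = 'https://graph.microsoft.com/v1.0'
ACCESS_TOKEN = '<token with Policy.ReadWrite.ConditionalAccess>'  # placeholder
GROUP_ID = '<object id of Anomaly Detection Users group>'         # placeholder

policy = {
    'displayName': 'Require MFA - Anomaly Detection Users',
    'state': 'enabled',
    'conditions': {
        'users': {'includeGroups': [GROUP_ID]},
        'applications': {'includeApplications': ['All']},
        'clientAppTypes': ['all'],
    },
    'grantControls': {'operator': 'OR', 'builtInControls': ['mfa']},
}
resp = requests.post(
    f'{GRAPH}/identity/conditionalAccess/policies',
    headers={'Authorization': f'Bearer {ACCESS_TOKEN}'},
    json=policy,
)
resp.raise_for_status()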
Step 5: Upgrade QuickBooks Online and Enable Anomaly Detection
Verify or upgrade each client organization to QBO Plus or Advanced. Access QBO as the accounting firm via Intuit Accountant portal. Enable the built-in AI anomaly detection features which automatically scan balance sheets, P&L reports, and transaction categorizations for anomalies.
QBO anomaly detection is automatic on Plus and Advanced — it identifies anomalies in balance sheets and P&L reports and surfaces detailed explanations. For payroll anomaly detection, the client must be on QBO Payroll Premium or Elite. The anomaly detection improves over time as it learns the client's transaction patterns. Allow 2-4 weeks of baseline learning before relying on alerts.
Step 6: Set Up Intuit Accountant Suite with Books Close
Configure the Intuit Accountant Suite (Core is free; Accelerate at $149/month when available) to centralize anomaly monitoring across all client companies. Enable Books Close for each client to automate month-end anomaly review. Configure alert preferences and notification routing.
The Accelerate tier ($149/mo, available July 2026) adds AI-powered anomaly detection that proactively identifies trends across financial KPIs, payroll data, and AP/bill pay data across the entire client portfolio. Until then, Core (free) plus Books Close ($6-8/client/month) provides good baseline coverage. Configure different alert thresholds per client based on their transaction volume and risk profile.
Step 7: Deploy Dext for Document-Level Anomaly Detection
Set up Dext Prepare to capture and analyze source documents (receipts, invoices, bills) before they enter the GL. Dext's AI flags duplicate documents, unusual amounts, mismatched vendor data, and OCR confidence issues. This catches anomalies at the point of data entry rather than after the fact.
Dext processes documents before they enter QBO, catching errors at the source. The 99.9% accuracy rate for OCR reduces manual data entry errors. Duplicate detection is particularly valuable — it catches the same invoice submitted via email and physical mail. Allow 1-2 weeks for Dext to learn the client's vendor patterns and optimize categorization.
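Dext's matching logic is proprietary, but the idea behind a similarity threshold (like the 0.92 used later in the custom pipeline's configuration) can be illustrated with a simple string-ratio check:
# duplicate_similarity.py - conceptual sketch of fuzzy duplicate matching
from difflib import SequenceMatcher

def similarity(a: str, b: str) -> float:
    """Return a 0-1 ratio; 1.0 means identical strings."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()

# The same invoice submitted through two different channels
print(similarity('ACME Supplies Invoice #10423', 'Acme Supplies Inv 10423'))  # ~0.90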
Step 8: Configure Microsoft Copilot for Finance
Activate Microsoft Copilot for Finance for accounting staff who perform variance analysis and financial review. Copilot for Finance works within Excel to automatically identify anomalies in financial data, explain variance drivers in natural language, and flag unusual patterns in budget-vs-actual comparisons.
Prerequisites: M365 Business Standard + Copilot Business license assigned in M365 Admin Center
1. Assign Copilot License
2. Enable Copilot for Finance
3. Connect Financial Data Source
4. Configure Variance Analysis
Analyze variances and identify anomalies
5. Create Template Workbooks
- Monthly P&L variance analysis template
- Balance sheet trend analysis template
- Cash flow anomaly detection template
- Save to SharePoint for team access
Copilot for Finance excels at explaining WHY variances occurred, not just flagging them. It uses natural language to describe anomaly drivers, making it accessible to non-technical accounting staff. For SMB clients, the bundled Business Standard + Copilot Business at $30.50/user/month is the most cost-effective option. Copilot improves with use as it learns the firm's financial patterns and terminology.
Step 9: Deploy Custom Python Anomaly Detection Pipeline (Phase 3)
For clients requiring deeper analysis beyond SaaS capabilities — those with 50K+ monthly transactions, complex multi-entity structures, or specific fraud detection requirements — deploy a custom anomaly detection pipeline using PyOD and Finomaly on an Azure VM. This pipeline connects to QBO via API, extracts transaction data nightly, runs multiple anomaly detection algorithms, and pushes alerts to Microsoft Teams and email.
az login
az group create --name rg-anomaly-detection --location eastus2
az vm create --resource-group rg-anomaly-detection --name vm-anomaly-detect --image Ubuntu2204 --size Standard_B2s --admin-username anomalyadmin --generate-ssh-keys --nsg-rule SSH

ssh anomalyadmin@<vm-public-ip>
sudo apt update && sudo apt upgrade -y
sudo apt install -y python3.11 python3.11-venv python3-pip postgresql postgresql-contrib nginx certbot

sudo mkdir -p /opt/anomaly-detection && sudo chown $USER: /opt/anomaly-detection
python3.11 -m venv /opt/anomaly-detection/venv
source /opt/anomaly-detection/venv/bin/activate

pip install pyod==2.0.2 scikit-learn==1.5.2 pandas==2.2.3 numpy==1.26.4 finomaly requests python-quickbooks flask gunicorn psycopg2-binary python-dotenv schedule msrest azure-keyvault-secrets azure-identity

sudo -u postgres psql -c "CREATE USER anomaly_user WITH PASSWORD 'CHANGE_ME_STRONG_PASSWORD';"
sudo -u postgres psql -c "CREATE DATABASE anomaly_db OWNER anomaly_user;"
sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE anomaly_db TO anomaly_user;"CREATE TABLE transactions (
id SERIAL PRIMARY KEY,
client_id VARCHAR(50) NOT NULL,
txn_id VARCHAR(100) UNIQUE NOT NULL,
txn_date DATE NOT NULL,
amount DECIMAL(15,2) NOT NULL,
account_name VARCHAR(255),
category VARCHAR(255),
vendor_name VARCHAR(255),
txn_type VARCHAR(50),
memo TEXT,
imported_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE anomaly_results (
id SERIAL PRIMARY KEY,
txn_id VARCHAR(100) REFERENCES transactions(txn_id),
client_id VARCHAR(50) NOT NULL,
detection_method VARCHAR(100),
anomaly_score DECIMAL(10,6),
is_anomaly BOOLEAN,
explanation TEXT,
detected_at TIMESTAMP DEFAULT NOW(),
reviewed BOOLEAN DEFAULT FALSE,
reviewer_notes TEXT
);
CREATE INDEX idx_txn_date ON transactions(txn_date);
CREATE INDEX idx_anomaly_client ON anomaly_results(client_id, detected_at);psql -U anomaly_user -d anomaly_db -c "
CREATE TABLE transactions (
id SERIAL PRIMARY KEY,
client_id VARCHAR(50) NOT NULL,
txn_id VARCHAR(100) UNIQUE NOT NULL,
txn_date DATE NOT NULL,
amount DECIMAL(15,2) NOT NULL,
account_name VARCHAR(255),
category VARCHAR(255),
vendor_name VARCHAR(255),
txn_type VARCHAR(50),
memo TEXT,
imported_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE anomaly_results (
id SERIAL PRIMARY KEY,
txn_id VARCHAR(100) REFERENCES transactions(txn_id),
client_id VARCHAR(50) NOT NULL,
detection_method VARCHAR(100),
anomaly_score DECIMAL(10,6),
is_anomaly BOOLEAN,
explanation TEXT,
detected_at TIMESTAMP DEFAULT NOW(),
reviewed BOOLEAN DEFAULT FALSE,
reviewer_notes TEXT
);
CREATE INDEX idx_txn_date ON transactions(txn_date);
CREATE INDEX idx_anomaly_client ON anomaly_results(client_id, detected_at);"

az keyvault create --name kv-anomaly-detect --resource-group rg-anomaly-detection --location eastus2
az keyvault secret set --vault-name kv-anomaly-detect --name qbo-client-id --value 'YOUR_INTUIT_APP_CLIENT_ID'
az keyvault secret set --vault-name kv-anomaly-detect --name qbo-client-secret --value 'YOUR_INTUIT_APP_CLIENT_SECRET'
az keyvault secret set --vault-name kv-anomaly-detect --name db-password --value 'CHANGE_ME_STRONG_PASSWORD'

cd /opt/anomaly-detection
git clone https://your-msp-repo.git app
cd app
cp .env.example .env
# Edit .env with Azure Key Vault URI and client configurations

sudo tee /etc/systemd/system/anomaly-detector.service << 'EOF'
[Unit]
Description=Financial Anomaly Detection Service
After=network.target postgresql.service
[Service]
Type=simple
User=anomalyadmin
WorkingDirectory=/opt/anomaly-detection/app
Environment=PATH=/opt/anomaly-detection/venv/bin
ExecStart=/opt/anomaly-detection/venv/bin/python scheduler.py
Restart=always
RestartSec=60
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl enable anomaly-detector
sudo systemctl start anomaly-detector

sudo tee /etc/nginx/sites-available/anomaly-dashboard << 'EOF'
server {
listen 443 ssl;
server_name anomaly.yourclient.com;
ssl_certificate /etc/letsencrypt/live/anomaly.yourclient.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/anomaly.yourclient.com/privkey.pem;
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
EOF
sudo ln -s /etc/nginx/sites-available/anomaly-dashboard /etc/nginx/sites-enabled/
sudo certbot --nginx -d anomaly.yourclient.com
sudo systemctl restart nginx

Phase 3 is only needed for clients with complex requirements beyond what QBO + Intuit Accelerate + Copilot for Finance provide. Most SMB accounting firms will be well-served by Phases 1 and 2 alone. The Azure VM B2s ($30/month) handles up to ~500K transactions/month. For larger volumes, scale to B4ms ($120/month). All QBO API credentials must be stored in Azure Key Vault, never in plaintext config files. The QBO API uses OAuth 2.0 with refresh tokens that expire after 100 days — the scheduler must handle token refresh automatically.
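Because Intuit may return a new refresh token on each exchange, the scheduler should persist the latest pair as soon as the token refresh returns. A minimal sketch using azure-keyvault-secrets (per-realm secret naming is an assumed convention, not part of the pipeline code later in this guide):
# persist_tokens.py - sketch: store rotated QBO OAuth tokens in Key Vault
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

def persist_qbo_tokens(vault_url: str, realm_id: str,
                       access_token: str, refresh_token: str) -> None:
    client = SecretClient(vault_url=vault_url, credential=DefaultAzureCredential())
    # One secret pair per client realm (names are an assumed convention)
    client.set_secret(f'qbo-access-{realm_id}', access_token)
    client.set_secret(f'qbo-refresh-{realm_id}', refresh_token)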
Step 10: Configure Alert Routing and Escalation Workflow
Set up the alert routing so detected anomalies flow from each detection layer (QBO built-in, Dext, Copilot, custom pipeline) into a unified triage workflow. Configure Microsoft Teams channels, email distribution lists, and PSA ticket creation for different anomaly severity levels.
- Trigger: When a new email arrives (from: alerts@intuit.com, noreply@dext.com)
- Condition: Subject contains 'anomaly' OR 'discrepancy' OR 'unusual'
- Action: Post message to Teams > Anomaly Alerts channel
- Action: Create item in SharePoint list 'Anomaly Tracker'
Priority levels:
- P1 (Critical): Potential fraud indicators, missing funds, unauthorized access
- P2 (High): Large dollar anomalies (>$10K variance), duplicate payments detected
- P3 (Medium): Benford's Law violations, unusual vendor patterns
- P4 (Low): Minor categorization anomalies, rounding discrepancies
TEAMS_WEBHOOK_URL=https://outlook.office.com/webhook/YOUR_WEBHOOK_URL
ALERT_THRESHOLD_HIGH=0.85
ALERT_THRESHOLD_MEDIUM=0.70
PSA_API_KEY=your_connectwise_api_key

The goal is a single-pane-of-glass view of all anomalies regardless of detection source. Power Automate handles the routing between Intuit alerts, Dext notifications, and custom pipeline alerts. Create escalation rules: P1 anomalies trigger immediate notification to the firm partner; P2 within 4 hours; P3/P4 in daily digest. Document the escalation matrix in the WISP.
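The severity mapping can be encoded wherever alerts are generated; a minimal sketch mirroring the P1-P4 levels above and the alert_thresholds in the later configuration template:
# priority_router.py - sketch: map a composite anomaly score to a PSA priority
def route_priority(score: float) -> str:
    if score >= 0.95:
        return 'P1'  # critical: immediate notification to the firm partner
    if score >= 0.85:
        return 'P2'  # high: notify within 4 hours
    if score >= 0.75:
        return 'P3'  # medium: include in daily digest
    return 'P4'      # low: include in daily digest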
Step 11: Baseline Calibration and Threshold Tuning
After all systems are active, run a 2-4 week calibration period where anomalies are detected but not auto-escalated. During this period, the accounting team reviews all flagged items and classifies them as true anomalies or false positives. Use this feedback to tune detection thresholds and reduce noise.
source /opt/anomaly-detection/venv/bin/activate
cd /opt/anomaly-detection/app
python import_historical.py --client-id CLIENT001 --months 12
python calibrate.py --client-id CLIENT001 --output calibration_report.csv

anomaly_thresholds:
isolation_forest_contamination: 0.05 # Start at 5%, adjust down to reduce FP
benford_chi_squared_pvalue: 0.01 # 1% significance level
amount_zscore_threshold: 3.0 # Flag transactions >3 std deviations
duplicate_similarity_threshold: 0.92 # 92% match score for duplicate detection
min_anomaly_score: 0.75 # Minimum composite score to flag

python update_thresholds.py --config config.yaml

The calibration period is critical for client satisfaction. Too many false positives will cause alert fatigue and the accounting team will ignore the system. Too few will miss real anomalies. Target: <10% false positive rate after tuning. Common false positive sources: regular large payments (rent, payroll), seasonal patterns, legitimate one-time transactions. Whitelist known recurring large transactions. Re-calibrate quarterly or when the client's business changes significantly (new vendor, new revenue stream, acquisition).
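Whitelisting can be implemented as a post-detection filter; a sketch keyed to the whitelist entries in the configuration template (frequency matching omitted for brevity, and txn_by_id is a hypothetical lookup built by the caller):
# whitelist_filter.py - sketch: suppress alerts for known recurring payments
def is_whitelisted(txn: dict, whitelist: list) -> bool:
    """True if a flagged transaction matches a whitelist entry from config.yaml."""
    amt = abs(txn['amount'])
    for entry in whitelist:
        if txn['vendor_name'] != entry['vendor']:
            continue
        if 'amount' in entry and abs(amt - entry['amount']) < 0.01:
            return True
        if 'amount_range' in entry and entry['amount_range'][0] <= amt <= entry['amount_range'][1]:
            return True
    return False

# Usage (hypothetical lookup): anomalies = [a for a in anomalies
#     if not is_whitelisted(txn_by_id[a['txn_id']], client_whitelist)]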
Step 12: Create Anomaly Review Standard Operating Procedure
Document the complete anomaly review workflow as an SOP that the client's accounting team follows. This SOP becomes part of the WISP appendix and the client handoff documentation. It specifies who reviews what, when, how they escalate, and how they document resolution.
The SOP should include: 1) Daily review of anomaly alerts in Teams/email, 2) Triage criteria for each severity level, 3) Investigation steps for common anomaly types (duplicate payments, unusual vendor, round-number transactions, Benford violations), 4) Documentation requirements for each reviewed anomaly (resolved/escalated/false-positive with explanation), 5) Monthly anomaly summary report template, 6) Quarterly threshold review process with MSP, 7) Annual WISP review incorporating anomaly detection effectiveness metrics. Create the SOP as a branded document in the client's SharePoint library.
Custom AI Components
Transaction Anomaly Detection Engine
Type: agent
Core Python-based anomaly detection agent that extracts transactions from QuickBooks Online via API, runs them through multiple detection algorithms (Isolation Forest, ECOD, Benford's Law, Z-score, duplicate detection), computes a composite anomaly score, stores results in PostgreSQL, and triggers alerts for items exceeding configurable thresholds. Runs nightly via scheduler.
Implementation:
# Core anomaly detection engine with QBO client, multi-algorithm detector,
# PostgreSQL storage, and Teams alerting
# anomaly_engine.py - Core anomaly detection engine
import os
import json
import logging
from datetime import datetime, timedelta
from decimal import Decimal
import numpy as np
import pandas as pd
from pyod.models.iforest import IForest
from pyod.models.ecod import ECOD
from pyod.models.lof import LOF
from scipy.stats import chi2
from collections import Counter
import psycopg2
from psycopg2.extras import execute_values
import requests
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
class QBOClient:
"""QuickBooks Online API client for transaction extraction."""
BASE_URL = 'https://quickbooks.api.intuit.com/v3'
def __init__(self, realm_id, access_token, refresh_token, client_id, client_secret):
self.realm_id = realm_id
self.access_token = access_token
self.refresh_token = refresh_token
self.client_id = client_id
self.client_secret = client_secret
def _refresh_access_token(self):
resp = requests.post(
'https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer',
data={
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token
},
auth=(self.client_id, self.client_secret)
)
resp.raise_for_status()
tokens = resp.json()
self.access_token = tokens['access_token']
self.refresh_token = tokens['refresh_token']
# Persist new tokens to secure storage
return tokens
def query(self, query_string):
headers = {
'Authorization': f'Bearer {self.access_token}',
'Accept': 'application/json',
'Content-Type': 'application/text'
}
url = f'{self.BASE_URL}/company/{self.realm_id}/query?query={query_string}'
resp = requests.get(url, headers=headers)
if resp.status_code == 401:
self._refresh_access_token()
headers['Authorization'] = f'Bearer {self.access_token}'
resp = requests.get(url, headers=headers)
resp.raise_for_status()
return resp.json()
def get_transactions(self, start_date, end_date):
"""Extract all transactions (purchases, sales, journal entries) for a date range."""
transactions = []
# Get Purchase transactions
query = f"SELECT * FROM Purchase WHERE TxnDate >= '{start_date}' AND TxnDate <= '{end_date}' MAXRESULTS 1000"
result = self.query(query)
if 'QueryResponse' in result and 'Purchase' in result['QueryResponse']:
for txn in result['QueryResponse']['Purchase']:
transactions.append({
'txn_id': f"PUR-{txn['Id']}",
'txn_date': txn['TxnDate'],
'amount': float(txn.get('TotalAmt', 0)),
'account_name': txn.get('AccountRef', {}).get('name', 'Unknown'),
'category': txn.get('PaymentType', 'Purchase'),
'vendor_name': txn.get('EntityRef', {}).get('name', 'Unknown'),
'txn_type': 'Purchase',
'memo': txn.get('PrivateNote', '')
})
# Get Sales Receipt transactions
query = f"SELECT * FROM SalesReceipt WHERE TxnDate >= '{start_date}' AND TxnDate <= '{end_date}' MAXRESULTS 1000"
result = self.query(query)
if 'QueryResponse' in result and 'SalesReceipt' in result['QueryResponse']:
for txn in result['QueryResponse']['SalesReceipt']:
transactions.append({
'txn_id': f"SR-{txn['Id']}",
'txn_date': txn['TxnDate'],
'amount': float(txn.get('TotalAmt', 0)),
'account_name': txn.get('DepositToAccountRef', {}).get('name', 'Unknown'),
'category': 'Sales Receipt',
'vendor_name': txn.get('CustomerRef', {}).get('name', 'Unknown'),
'txn_type': 'SalesReceipt',
'memo': txn.get('PrivateNote', '')
})
# Get Journal Entries (high fraud risk)
query = f"SELECT * FROM JournalEntry WHERE TxnDate >= '{start_date}' AND TxnDate <= '{end_date}' MAXRESULTS 1000"
result = self.query(query)
if 'QueryResponse' in result and 'JournalEntry' in result['QueryResponse']:
for txn in result['QueryResponse']['JournalEntry']:
for line in txn.get('Line', []):
if line.get('JournalEntryLineDetail'):
transactions.append({
'txn_id': f"JE-{txn['Id']}-{line.get('Id', 0)}",
'txn_date': txn['TxnDate'],
'amount': float(line.get('Amount', 0)),
'account_name': line['JournalEntryLineDetail'].get('AccountRef', {}).get('name', 'Unknown'),
'category': 'Journal Entry',
'vendor_name': line['JournalEntryLineDetail'].get('Entity', {}).get('EntityRef', {}).get('name', 'Manual Entry'),
'txn_type': f"JournalEntry-{line['JournalEntryLineDetail'].get('PostingType', 'Unknown')}",
'memo': line.get('Description', txn.get('PrivateNote', ''))
})
# Get Bill transactions
query = f"SELECT * FROM Bill WHERE TxnDate >= '{start_date}' AND TxnDate <= '{end_date}' MAXRESULTS 1000"
result = self.query(query)
if 'QueryResponse' in result and 'Bill' in result['QueryResponse']:
for txn in result['QueryResponse']['Bill']:
transactions.append({
'txn_id': f"BILL-{txn['Id']}",
'txn_date': txn['TxnDate'],
'amount': float(txn.get('TotalAmt', 0)),
'account_name': txn.get('APAccountRef', {}).get('name', 'Accounts Payable'),
'category': 'Bill',
'vendor_name': txn.get('VendorRef', {}).get('name', 'Unknown'),
'txn_type': 'Bill',
'memo': txn.get('PrivateNote', '')
})
return transactions
class AnomalyDetector:
"""Multi-algorithm anomaly detection for financial transactions."""
def __init__(self, config):
self.config = config
self.contamination = config.get('isolation_forest_contamination', 0.05)
self.zscore_threshold = config.get('amount_zscore_threshold', 3.0)
self.benford_pvalue = config.get('benford_chi_squared_pvalue', 0.01)
self.duplicate_threshold = config.get('duplicate_similarity_threshold', 0.92)
self.min_anomaly_score = config.get('min_anomaly_score', 0.75)
def _prepare_features(self, df):
"""Create numerical features from transaction data."""
features = pd.DataFrame()
features['amount'] = df['amount'].abs()
features['log_amount'] = np.log1p(features['amount'])
features['day_of_week'] = pd.to_datetime(df['txn_date']).dt.dayofweek
features['day_of_month'] = pd.to_datetime(df['txn_date']).dt.day
features['is_weekend'] = (features['day_of_week'] >= 5).astype(int)
features['is_round_number'] = ((features['amount'] % 100 == 0) & (features['amount'] > 0)).astype(int)
features['is_just_below_threshold'] = (
((features['amount'] >= 9000) & (features['amount'] < 10000)) |
((features['amount'] >= 4500) & (features['amount'] < 5000)) |
((features['amount'] >= 2400) & (features['amount'] < 2500))
).astype(int)
# Vendor frequency (rare vendors are more suspicious)
vendor_counts = df['vendor_name'].value_counts()
features['vendor_frequency'] = df['vendor_name'].map(vendor_counts)
features['is_rare_vendor'] = (features['vendor_frequency'] <= 2).astype(int)
# Account-level statistics
account_means = df.groupby('account_name')['amount'].transform('mean')
account_stds = df.groupby('account_name')['amount'].transform('std').fillna(1)
features['account_zscore'] = ((df['amount'].abs() - account_means.abs()) / account_stds).abs()
# Transaction type encoding
txn_type_map = {'Purchase': 0, 'SalesReceipt': 1, 'JournalEntry-Debit': 2, 'JournalEntry-Credit': 3, 'Bill': 4}
features['txn_type_encoded'] = df['txn_type'].map(txn_type_map).fillna(5)
features = features.fillna(0)
return features
def detect_isolation_forest(self, df, features):
"""Isolation Forest anomaly detection."""
if len(features) < 20:
return pd.Series([0.0] * len(features), index=features.index)
model = IForest(contamination=self.contamination, random_state=42, n_estimators=200)
model.fit(features.values)
scores = model.decision_scores_
# Normalize to 0-1 range
normalized = (scores - scores.min()) / (scores.max() - scores.min() + 1e-10)
return pd.Series(normalized, index=features.index)
def detect_ecod(self, df, features):
"""ECOD (Empirical Cumulative Distribution) anomaly detection."""
if len(features) < 20:
return pd.Series([0.0] * len(features), index=features.index)
model = ECOD(contamination=self.contamination)
model.fit(features.values)
scores = model.decision_scores_
normalized = (scores - scores.min()) / (scores.max() - scores.min() + 1e-10)
return pd.Series(normalized, index=features.index)
def detect_zscore_anomalies(self, df):
"""Statistical Z-score anomaly detection on amounts."""
amounts = df['amount'].abs()
mean_amt = amounts.mean()
std_amt = amounts.std()
if std_amt == 0:
return pd.Series([0.0] * len(df), index=df.index)
zscores = ((amounts - mean_amt) / std_amt).abs()
# Normalize: z-score of 3+ maps to 1.0
normalized = (zscores / self.zscore_threshold).clip(0, 1)
return normalized
def detect_benfords_law(self, df):
"""Benford's Law analysis for first-digit distribution."""
amounts = df['amount'].abs()
amounts = amounts[amounts >= 10] # Need at least 2-digit numbers
if len(amounts) < 50: # Need sufficient sample size
return pd.Series([0.0] * len(df), index=df.index), None
first_digits = amounts.apply(lambda x: int(str(x).lstrip('0').lstrip('.')[0]) if x > 0 else 0)
first_digits = first_digits[first_digits > 0]
observed = Counter(first_digits)
total = sum(observed.values())
# Benford's expected distribution
expected_pcts = {d: np.log10(1 + 1/d) for d in range(1, 10)}
chi2_stat = 0
for digit in range(1, 10):
observed_count = observed.get(digit, 0)
expected_count = expected_pcts[digit] * total
chi2_stat += (observed_count - expected_count) ** 2 / expected_count
p_value = 1 - chi2.cdf(chi2_stat, df=8)
# Score individual transactions: digits that deviate most from Benford's get higher scores
scores = []
for idx, row in df.iterrows():
amt = abs(row['amount'])
if amt >= 10:
fd = int(str(amt).lstrip('0').lstrip('.')[0])
if fd > 0:
observed_pct = observed.get(fd, 0) / total
expected_pct = expected_pcts.get(fd, 0.1)
deviation = abs(observed_pct - expected_pct) / expected_pct
scores.append(min(deviation, 1.0))
else:
scores.append(0.0)
else:
scores.append(0.0)
benford_result = {
'chi2_statistic': chi2_stat,
'p_value': p_value,
'is_violation': p_value < self.benford_pvalue
}
return pd.Series(scores, index=df.index), benford_result
def detect_duplicates(self, df):
"""Detect potential duplicate transactions."""
# Pairwise scan is O(n^2); fine for nightly batches, but consider pre-grouping
# by (vendor, amount) for very large transaction sets
scores = pd.Series([0.0] * len(df), index=df.index)
for i, row in df.iterrows():
for j, other in df.iterrows():
if i >= j:
continue
# Same amount, same vendor, within 7 days
if (abs(row['amount']) == abs(other['amount']) and
row['vendor_name'] == other['vendor_name'] and
abs((pd.to_datetime(row['txn_date']) - pd.to_datetime(other['txn_date'])).days) <= 7 and
row['txn_id'] != other['txn_id']):
scores[i] = max(scores[i], 0.9)
scores[j] = max(scores[j], 0.9)
# Same amount, different vendor, same day (possible split payment fraud)
elif (abs(row['amount']) == abs(other['amount']) and
row['vendor_name'] != other['vendor_name'] and
row['txn_date'] == other['txn_date'] and
abs(row['amount']) > 1000):
scores[i] = max(scores[i], 0.6)
scores[j] = max(scores[j], 0.6)
return scores
def compute_composite_score(self, scores_dict):
"""Weighted composite anomaly score from all detection methods."""
# Keep in sync with the 'weights' section of config.yaml
weights = {
'isolation_forest': 0.25,
'ecod': 0.20,
'zscore': 0.15,
'benfords': 0.15,
'duplicates': 0.25
}
# Align with the per-method score indexes (df.index) to avoid NaN on addition
composite = pd.Series(0.0, index=scores_dict['isolation_forest'].index)
for method, weight in weights.items():
if method in scores_dict:
composite += scores_dict[method] * weight
return composite
def generate_explanation(self, row, scores):
"""Generate human-readable explanation of why a transaction was flagged."""
explanations = []
if scores.get('isolation_forest', 0) > 0.7:
explanations.append('Unusual pattern detected by ML model (multiple features deviate from norm)')
if scores.get('zscore', 0) > 0.7:
explanations.append(f'Amount (${abs(row["amount"]):,.2f}) is statistically unusual for this account')
if scores.get('benfords', 0) > 0.5:
explanations.append('First-digit distribution deviates from Benford\'s Law (potential manipulation)')
if scores.get('duplicates', 0) > 0.7:
explanations.append('Possible duplicate: matching amount and vendor within 7-day window')
if scores.get('ecod', 0) > 0.7:
explanations.append('Transaction falls in an empirically rare distribution region')
if row.get('is_round_number'):
explanations.append('Suspiciously round dollar amount')
if row.get('is_just_below_threshold'):
explanations.append('Amount just below common approval threshold (possible structuring)')
return '; '.join(explanations) if explanations else 'Composite score exceeded threshold across multiple factors'
def run_detection(self, df):
"""Run all anomaly detection methods and return results."""
if len(df) == 0:
return [], None  # keep the (results, benford_result) return shape
features = self._prepare_features(df)
scores = {}
scores['isolation_forest'] = self.detect_isolation_forest(df, features)
scores['ecod'] = self.detect_ecod(df, features)
scores['zscore'] = self.detect_zscore_anomalies(df)
benford_scores, benford_result = self.detect_benfords_law(df)
scores['benfords'] = benford_scores
scores['duplicates'] = self.detect_duplicates(df)
composite = self.compute_composite_score(scores)
results = []
for idx in df.index:
if composite[idx] >= self.min_anomaly_score:
row_scores = {method: float(s[idx]) for method, s in scores.items()}
results.append({
'txn_id': df.loc[idx, 'txn_id'],
'client_id': df.loc[idx, 'client_id'],
'composite_score': float(composite[idx]),
'method_scores': row_scores,
'is_anomaly': True,
'explanation': self.generate_explanation(df.loc[idx], row_scores),
'detection_method': 'composite'
})
logger.info(f'Detected {len(results)} anomalies out of {len(df)} transactions')
return results, benford_result
class DatabaseManager:
"""PostgreSQL database operations."""
def __init__(self, connection_string):
self.connection_string = connection_string
def get_connection(self):
return psycopg2.connect(self.connection_string)
def store_transactions(self, transactions, client_id):
conn = self.get_connection()
cur = conn.cursor()
values = [
(client_id, t['txn_id'], t['txn_date'], t['amount'],
t['account_name'], t['category'], t['vendor_name'],
t['txn_type'], t['memo'])
for t in transactions
]
execute_values(cur, """
INSERT INTO transactions (client_id, txn_id, txn_date, amount,
account_name, category, vendor_name, txn_type, memo)
VALUES %s
ON CONFLICT (txn_id) DO NOTHING
""", values)
conn.commit()
cur.close()
conn.close()
def store_anomaly_results(self, results):
conn = self.get_connection()
cur = conn.cursor()
values = [
(r['txn_id'], r['client_id'], r['detection_method'],
r['composite_score'], r['is_anomaly'], r['explanation'])
for r in results
]
execute_values(cur, """
INSERT INTO anomaly_results (txn_id, client_id, detection_method,
anomaly_score, is_anomaly, explanation)
VALUES %s
""", values)
conn.commit()
cur.close()
conn.close()
def get_historical_transactions(self, client_id, months=6):
conn = self.get_connection()
start_date = (datetime.now() - timedelta(days=months*30)).strftime('%Y-%m-%d')
df = pd.read_sql(
f"SELECT * FROM transactions WHERE client_id = %s AND txn_date >= %s",
conn, params=[client_id, start_date]
)
conn.close()
return df
class AlertManager:
"""Send anomaly alerts to Teams and email."""
def __init__(self, teams_webhook_url, smtp_config=None):
self.teams_webhook_url = teams_webhook_url
self.smtp_config = smtp_config
def send_teams_alert(self, anomalies, client_name, benford_result=None):
if not anomalies:
return
severity = 'High' if any(a['composite_score'] > 0.9 for a in anomalies) else 'Medium'
color = 'FF0000' if severity == 'High' else 'FFA500'
anomaly_rows = ''
for a in sorted(anomalies, key=lambda x: x['composite_score'], reverse=True)[:10]:
anomaly_rows += f"- **{a['txn_id']}** (Score: {a['composite_score']:.2f}): {a['explanation']}\n"
benford_note = ''
if benford_result and benford_result.get('is_violation'):
benford_note = f"\n\n⚠️ **Benford's Law Violation Detected** (p-value: {benford_result['p_value']:.4f}) — overall digit distribution in this client's transactions deviates from expected patterns, which may indicate systematic data manipulation."
card = {
'@type': 'MessageCard',
'@context': 'http://schema.org/extensions',
'themeColor': color,
'summary': f'{severity} Priority: {len(anomalies)} anomalies detected for {client_name}',
'sections': [{
'activityTitle': f'🔍 {severity} Priority: {len(anomalies)} Transaction Anomalies Detected',
'activitySubtitle': f'Client: {client_name} | {datetime.now().strftime("%Y-%m-%d %H:%M")}',
'text': f'{anomaly_rows}{benford_note}',
'markdown': True
}]
}
resp = requests.post(self.teams_webhook_url, json=card)
resp.raise_for_status()
logger.info(f'Teams alert sent for {client_name}: {len(anomalies)} anomalies')

Nightly Transaction Extraction Scheduler
Type: workflow
Scheduled workflow that runs nightly at 2:00 AM, iterates through all configured client QBO connections, extracts the last 24 hours of transactions, feeds them through the anomaly detection engine along with historical context, stores results, and sends alerts. Handles OAuth token refresh, error recovery, and logging.
Implementation:
# Nightly anomaly detection scheduler with QBO extraction, anomaly
# detection, alerting, and weekly reporting
# scheduler.py - Nightly anomaly detection scheduler
import os
import yaml
import logging
import schedule
import time
from datetime import datetime, timedelta
from anomaly_engine import QBOClient, AnomalyDetector, DatabaseManager, AlertManager
import pandas as pd
import requests
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s',
handlers=[
logging.FileHandler('/opt/anomaly-detection/logs/scheduler.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def load_config():
with open('/opt/anomaly-detection/app/config.yaml', 'r') as f:
return yaml.safe_load(f)
def get_secrets(vault_url):
credential = DefaultAzureCredential()
client = SecretClient(vault_url=vault_url, credential=credential)
return {
'qbo_client_id': client.get_secret('qbo-client-id').value,
'qbo_client_secret': client.get_secret('qbo-client-secret').value,
'db_password': client.get_secret('db-password').value
}
def run_anomaly_detection():
logger.info('=== Starting nightly anomaly detection run ===')
config = load_config()
secrets = get_secrets(config['azure_keyvault_url'])
db_conn_string = f"host={config['db_host']} dbname={config['db_name']} user={config['db_user']} password={secrets['db_password']}"
db = DatabaseManager(db_conn_string)
detector = AnomalyDetector(config.get('anomaly_thresholds', {}))
alert_mgr = AlertManager(
teams_webhook_url=config['teams_webhook_url']
)
end_date = datetime.now().strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
for client_config in config.get('clients', []):
client_id = client_config['id']
client_name = client_config['name']
realm_id = client_config['qbo_realm_id']
try:
logger.info(f'Processing client: {client_name} ({client_id})')
# Initialize QBO client
qbo = QBOClient(
realm_id=realm_id,
access_token=client_config.get('access_token', ''),
refresh_token=client_config['refresh_token'],
client_id=secrets['qbo_client_id'],
client_secret=secrets['qbo_client_secret']
)
# Extract new transactions
new_txns = qbo.get_transactions(start_date, end_date)
logger.info(f' Extracted {len(new_txns)} new transactions')
if not new_txns:
logger.info(f' No new transactions for {client_name}, skipping')
continue
# Add client_id to transactions
for txn in new_txns:
txn['client_id'] = client_id
# Store in database
db.store_transactions(new_txns, client_id)
# Get historical transactions for context (6 months)
historical_df = db.get_historical_transactions(client_id, months=6)
# Combine historical + new for detection (model needs context)
new_df = pd.DataFrame(new_txns)
combined_df = pd.concat([historical_df, new_df], ignore_index=True)
combined_df = combined_df.drop_duplicates(subset=['txn_id'])
# Run anomaly detection
all_anomalies, benford_result = detector.run_detection(combined_df)
# Filter to only anomalies in new transactions
new_txn_ids = set(t['txn_id'] for t in new_txns)
new_anomalies = [a for a in all_anomalies if a['txn_id'] in new_txn_ids]
logger.info(f' Found {len(new_anomalies)} anomalies in new transactions')
# Store anomaly results
if new_anomalies:
db.store_anomaly_results(new_anomalies)
alert_mgr.send_teams_alert(new_anomalies, client_name, benford_result)
# Update stored tokens if refreshed
client_config['access_token'] = qbo.access_token
client_config['refresh_token'] = qbo.refresh_token
except Exception as e:
logger.error(f' ERROR processing {client_name}: {str(e)}', exc_info=True)
# Send error alert to MSP team
alert_mgr.send_teams_alert(
[{'txn_id': 'SYSTEM', 'composite_score': 1.0,
'explanation': f'Pipeline error for {client_name}: {str(e)}'}],
f'SYSTEM ERROR - {client_name}'
)
continue
# Save updated tokens back to config
with open('/opt/anomaly-detection/app/config.yaml', 'w') as f:
yaml.dump(config, f)
logger.info('=== Nightly anomaly detection run complete ===')
def run_weekly_report():
"""Generate weekly summary report of all anomalies across all clients."""
logger.info('=== Generating weekly anomaly summary ===')
config = load_config()
secrets = get_secrets(config['azure_keyvault_url'])
db_conn_string = f"host={config['db_host']} dbname={config['db_name']} user={config['db_user']} password={secrets['db_password']}"
db = DatabaseManager(db_conn_string)
alert_mgr = AlertManager(teams_webhook_url=config['teams_webhook_url'])
conn = db.get_connection()
week_ago = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
summary = pd.read_sql(f"""
SELECT client_id, COUNT(*) as anomaly_count,
AVG(anomaly_score) as avg_score,
MAX(anomaly_score) as max_score,
SUM(CASE WHEN reviewed THEN 1 ELSE 0 END) as reviewed_count
FROM anomaly_results
WHERE detected_at >= %s
GROUP BY client_id
ORDER BY anomaly_count DESC
""", conn, params=[week_ago])
conn.close()
if not summary.empty:
report_lines = []
for _, row in summary.iterrows():
review_pct = (row['reviewed_count'] / row['anomaly_count'] * 100) if row['anomaly_count'] > 0 else 0
report_lines.append(
f"- **{row['client_id']}**: {int(row['anomaly_count'])} anomalies "
f"(avg score: {row['avg_score']:.2f}, max: {row['max_score']:.2f}, "
f"{review_pct:.0f}% reviewed)"
)
card = {
'@type': 'MessageCard',
'@context': 'http://schema.org/extensions',
'themeColor': '0076D7',
'summary': 'Weekly Anomaly Detection Summary',
'sections': [{
'activityTitle': '📊 Weekly Anomaly Detection Summary',
'activitySubtitle': f'Week ending {datetime.now().strftime("%Y-%m-%d")}',
'text': '\n'.join(report_lines),
'markdown': True
}]
}
requests.post(config['teams_webhook_url'], json=card)
logger.info('=== Weekly report complete ===')
if __name__ == '__main__':
# Schedule nightly run at 2:00 AM
schedule.every().day.at('02:00').do(run_anomaly_detection)
# Schedule weekly summary on Monday at 8:00 AM
schedule.every().monday.at('08:00').do(run_weekly_report)
logger.info('Anomaly detection scheduler started')
logger.info('Nightly detection: 2:00 AM daily')
logger.info('Weekly summary: Monday 8:00 AM')
while True:
schedule.run_pending()
time.sleep(60)

Configuration Template
Type: integration
YAML configuration file that defines client connections, detection thresholds, alert routing, and operational parameters. Each client is configured with their QBO realm ID, OAuth tokens, and custom threshold overrides.
Implementation:
# Anomaly Detection Pipeline Configuration (store at /opt/anomaly-
# detection/app/config.yaml)
# config.yaml - Anomaly Detection Pipeline Configuration
# Store this file at /opt/anomaly-detection/app/config.yaml
# SENSITIVE: Tokens are stored here temporarily; migrate to Azure Key Vault for production
azure_keyvault_url: 'https://kv-anomaly-detect.vault.azure.net/'
# Database configuration
db_host: 'localhost'
db_name: 'anomaly_db'
db_user: 'anomaly_user'
# db_password stored in Azure Key Vault as 'db-password'
# Microsoft Teams webhook for alerts
teams_webhook_url: 'https://outlook.office.com/webhook/YOUR-WEBHOOK-URL-HERE'
# Global anomaly detection thresholds (can be overridden per client)
anomaly_thresholds:
isolation_forest_contamination: 0.05 # Expected % of anomalies (5%)
amount_zscore_threshold: 3.0 # Standard deviations for amount outlier
benford_chi_squared_pvalue: 0.01 # P-value for Benford's Law violation
duplicate_similarity_threshold: 0.92 # Similarity score for duplicate detection
min_anomaly_score: 0.75 # Minimum composite score to flag as anomaly
# Algorithm weights (must sum to 1.0)
weights:
isolation_forest: 0.25
ecod: 0.20
zscore: 0.15
benfords: 0.15
duplicates: 0.25
# Alert severity thresholds
alert_thresholds:
critical: 0.95 # P1 - Immediate escalation
high: 0.85 # P2 - Within 4 hours
medium: 0.75 # P3 - Daily digest
# Client configurations
clients:
- id: 'CLIENT001'
name: 'Acme Manufacturing LLC'
qbo_realm_id: '1234567890'
refresh_token: 'STORED_IN_KEYVAULT_OR_REFRESHED_AUTOMATICALLY'
access_token: ''
# Override global thresholds for this client (optional)
threshold_overrides:
min_anomaly_score: 0.80 # Higher threshold = fewer alerts for this client
# Whitelist known large recurring transactions
whitelist:
- vendor: 'ABC Landlord LLC'
amount: 15000.00
frequency: 'monthly'
- vendor: 'ADP Payroll'
amount_range: [45000, 55000]
frequency: 'biweekly'
- id: 'CLIENT002'
name: 'Smith & Associates CPA'
qbo_realm_id: '0987654321'
refresh_token: 'STORED_IN_KEYVAULT_OR_REFRESHED_AUTOMATICALLY'
access_token: ''
threshold_overrides:
min_anomaly_score: 0.70 # Lower threshold for this high-risk client
whitelist: []
# Scheduler settings
scheduler:
daily_run_time: '02:00'
weekly_report_day: 'monday'
weekly_report_time: '08:00'
retry_on_failure: true
max_retries: 3
retry_delay_seconds: 300
# Data retention
retention:
transaction_history_months: 24
anomaly_results_months: 36
log_retention_days: 90

Anomaly Review Dashboard
Type: integration
A lightweight Flask web dashboard that displays detected anomalies, allows the accounting team to review and classify them (true positive, false positive, needs investigation), and tracks review completion metrics. Protected by Azure Entra ID SSO.
Implementation:
# Flask-based anomaly review dashboard with embedded HTML template,
# filtering, review actions, and REST API endpoint
# dashboard.py - Flask-based anomaly review dashboard
from flask import Flask, render_template_string, request, redirect, jsonify
import psycopg2
import psycopg2.extras
import os
from datetime import datetime, timedelta
app = Flask(__name__)
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'change-me-in-production')
# NOTE: Entra ID SSO is assumed to be enforced in front of this app (e.g. at the
# reverse proxy); the dashboard itself performs no authentication
DB_CONN = os.environ.get('DATABASE_URL', 'host=localhost dbname=anomaly_db user=anomaly_user password=changeme')
DASHBOARD_HTML = '''
<!DOCTYPE html>
<html>
<head>
<title>Financial Anomaly Detection Dashboard</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; background: #f5f6fa; color: #2d3436; }
.header { background: #2d3436; color: white; padding: 20px 40px; display: flex; justify-content: space-between; align-items: center; }
.header h1 { font-size: 1.4em; }
.stats { display: grid; grid-template-columns: repeat(4, 1fr); gap: 20px; padding: 20px 40px; }
.stat-card { background: white; border-radius: 8px; padding: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.stat-card h3 { color: #636e72; font-size: 0.85em; text-transform: uppercase; }
.stat-card .value { font-size: 2em; font-weight: bold; margin-top: 5px; }
.stat-card .value.critical { color: #e74c3c; }
.stat-card .value.warning { color: #f39c12; }
.stat-card .value.success { color: #27ae60; }
.filters { padding: 10px 40px; display: flex; gap: 10px; align-items: center; }
.filters select, .filters input { padding: 8px 12px; border: 1px solid #ddd; border-radius: 4px; }
table { width: calc(100% - 80px); margin: 20px 40px; border-collapse: collapse; background: white; border-radius: 8px; overflow: hidden; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
th { background: #636e72; color: white; padding: 12px 16px; text-align: left; font-size: 0.85em; text-transform: uppercase; }
td { padding: 12px 16px; border-bottom: 1px solid #eee; }
tr:hover { background: #f8f9fa; }
.score-badge { display: inline-block; padding: 4px 10px; border-radius: 12px; font-weight: bold; font-size: 0.85em; }
.score-high { background: #ffe0e0; color: #c0392b; }
.score-medium { background: #fff3e0; color: #e67e22; }
.score-low { background: #e8f5e9; color: #27ae60; }
.btn { padding: 6px 14px; border: none; border-radius: 4px; cursor: pointer; font-size: 0.85em; }
.btn-approve { background: #27ae60; color: white; }
.btn-fp { background: #95a5a6; color: white; }
.btn-investigate { background: #e74c3c; color: white; }
.review-actions { display: flex; gap: 5px; }
</style>
</head>
<body>
<div class="header">
<h1>🔍 Financial Anomaly Detection Dashboard</h1>
<span>{{ current_date }}</span>
</div>
<div class="stats">
<div class="stat-card">
<h3>Open Anomalies</h3>
<div class="value critical">{{ stats.open_count }}</div>
</div>
<div class="stat-card">
<h3>This Week</h3>
<div class="value warning">{{ stats.week_count }}</div>
</div>
<div class="stat-card">
<h3>Reviewed</h3>
<div class="value success">{{ stats.reviewed_count }}</div>
</div>
<div class="stat-card">
<h3>Review Rate</h3>
<div class="value">{{ stats.review_rate }}%</div>
</div>
</div>
<div class="filters">
<form method="GET">
<select name="client_id">
<option value="">All Clients</option>
{% for c in clients %}
<option value="{{ c }}" {{ "selected" if c == selected_client }}>{{ c }}</option>
{% endfor %}
</select>
<select name="status">
<option value="open" {{ "selected" if status == "open" }}>Open</option>
<option value="all" {{ "selected" if status == "all" }}>All</option>
<option value="reviewed" {{ "selected" if status == "reviewed" }}>Reviewed</option>
</select>
<input type="date" name="from_date" value="{{ from_date }}">
<input type="date" name="to_date" value="{{ to_date }}">
<button class="btn btn-approve" type="submit">Filter</button>
</form>
</div>
<table>
<thead>
<tr>
<th>Date Detected</th>
<th>Client</th>
<th>Transaction</th>
<th>Score</th>
<th>Explanation</th>
<th>Status</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{% for a in anomalies %}
<tr>
<td>{{ a.detected_at.strftime("%Y-%m-%d %H:%M") }}</td>
<td>{{ a.client_id }}</td>
<td>{{ a.txn_id }}</td>
<td>
<span class="score-badge {% if a.anomaly_score > 0.9 %}score-high{% elif a.anomaly_score > 0.8 %}score-medium{% else %}score-low{% endif %}">
{{ "%.2f"|format(a.anomaly_score) }}
</span>
</td>
<td>{{ a.explanation }}</td>
<td>{{ "✅ Reviewed" if a.reviewed else "⏳ Open" }}</td>
<td>
{% if not a.reviewed %}
<div class="review-actions">
<form method="POST" action="/review/{{ a.id }}">
<input type="hidden" name="action" value="true_positive">
<button class="btn btn-investigate" type="submit">Confirm</button>
</form>
<form method="POST" action="/review/{{ a.id }}">
<input type="hidden" name="action" value="false_positive">
<button class="btn btn-fp" type="submit">False +</button>
</form>
</div>
{% else %}
{{ a.reviewer_notes or "-" }}
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</body>
</html>
'''
def get_db():
    return psycopg2.connect(DB_CONN)

@app.route('/')
def dashboard():
    conn = get_db()
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    client_id = request.args.get('client_id', '')
    status = request.args.get('status', 'open')
    from_date = request.args.get('from_date', (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d'))
    to_date = request.args.get('to_date', datetime.now().strftime('%Y-%m-%d'))
    # Summary stats for the dashboard header cards
    cur.execute('SELECT COUNT(*) AS cnt FROM anomaly_results WHERE NOT reviewed')
    open_count = cur.fetchone()['cnt']
    week_ago = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
    cur.execute('SELECT COUNT(*) AS cnt FROM anomaly_results WHERE detected_at >= %s', [week_ago])
    week_count = cur.fetchone()['cnt']
    cur.execute('SELECT COUNT(*) AS cnt FROM anomaly_results WHERE reviewed')
    reviewed_count = cur.fetchone()['cnt']
    total = open_count + reviewed_count
    review_rate = round((reviewed_count / total * 100) if total > 0 else 0)
    # Distinct clients for the filter dropdown
    cur.execute('SELECT DISTINCT client_id FROM anomaly_results ORDER BY client_id')
    clients = [r['client_id'] for r in cur.fetchall()]
    # Filtered anomaly list. detected_at is a timestamp, so the upper bound is
    # exclusive of the next day; otherwise rows detected on to_date after
    # midnight would be silently dropped.
    query = ("SELECT * FROM anomaly_results "
             "WHERE detected_at >= %s AND detected_at < %s::date + INTERVAL '1 day'")
    params = [from_date, to_date]
    if client_id:
        query += ' AND client_id = %s'
        params.append(client_id)
    if status == 'open':
        query += ' AND NOT reviewed'
    elif status == 'reviewed':
        query += ' AND reviewed'
    query += ' ORDER BY anomaly_score DESC, detected_at DESC LIMIT 200'
    cur.execute(query, params)
    anomalies = cur.fetchall()
    cur.close()
    conn.close()
    return render_template_string(
        DASHBOARD_HTML,
        current_date=datetime.now().strftime('%B %d, %Y'),
        stats={'open_count': open_count, 'week_count': week_count,
               'reviewed_count': reviewed_count, 'review_rate': review_rate},
        clients=clients, selected_client=client_id, status=status,
        from_date=from_date, to_date=to_date, anomalies=anomalies)

@app.route('/review/<int:anomaly_id>', methods=['POST'])
def review_anomaly(anomaly_id):
    action = request.form.get('action', 'false_positive')
    notes = ('Confirmed anomaly - needs investigation' if action == 'true_positive'
             else 'Reviewed - false positive')
    conn = get_db()
    cur = conn.cursor()
    cur.execute(
        'UPDATE anomaly_results SET reviewed = TRUE, reviewer_notes = %s WHERE id = %s',
        [notes, anomaly_id]
    )
    conn.commit()
    cur.close()
    conn.close()
    return redirect(request.referrer or '/')

@app.route('/api/anomalies')
def api_anomalies():
    """REST API endpoint for PSA integration."""
    conn = get_db()
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cur.execute(
        'SELECT * FROM anomaly_results WHERE NOT reviewed ORDER BY anomaly_score DESC LIMIT 50'
    )
    results = cur.fetchall()
    cur.close()
    conn.close()
    # Convert non-JSON-serializable column types before returning
    for r in results:
        r['detected_at'] = r['detected_at'].isoformat()
        r['anomaly_score'] = float(r['anomaly_score'])
    return jsonify(results)

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=8000, debug=False)
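Once the app is running, the PSA endpoint can be spot-checked from the VM with a few lines of Python (a minimal sketch, using the requests library):
import requests

# Fetch the top open anomalies exactly as a PSA integration would.
resp = requests.get('http://127.0.0.1:8000/api/anomalies', timeout=10)
resp.raise_for_status()
for a in resp.json():
    print(a['client_id'], a['txn_id'], a['anomaly_score'], a['explanation'])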
Benford's Law Monthly Audit Prompt
Copilot for Finance Prompt Template: Benford's Law Audit
Instructions for Accounting Staff
Excel Copilot prompt — Transaction anomaly detection for monthly review
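An illustrative prompt in this spirit (the wording is a sketch; adapt column references to the client's export layout):
"Analyze the transaction amounts in this worksheet using Benford's Law. Compare the observed first-digit frequency distribution to the expected Benford distribution, flag any digits deviating by more than 5 percentage points, list the transactions contributing most to each deviation, and summarize in plain language whether the pattern is more consistent with data-entry error, duplication, or potential manipulation."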
Expected Output
Copilot will generate a structured analysis table and narrative that the bookkeeper can:
- Include in the monthly close package
- Share with the client as evidence of thorough review
- Use to prioritize manual investigation
- Archive for audit trail purposes
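As a sanity check on Copilot's analysis, the same first-digit distribution can be computed offline in a few lines of Python (a sketch; assumes a transactions.csv export with an amount column):
import csv
import math
from collections import Counter

# Benford's expected frequency for leading digit d is log10(1 + 1/d).
EXPECTED = {d: math.log10(1 + 1 / d) for d in range(1, 10)}

def first_digit(amount):
    """Return the leading nonzero digit of an amount, or None if there isn't one."""
    digits = str(abs(amount)).lstrip('0.')
    return int(digits[0]) if digits and digits[0].isdigit() else None

with open('transactions.csv', newline='') as f:  # illustrative export file name
    leading = [d for row in csv.DictReader(f)
               if (d := first_digit(float(row['amount']))) is not None]

counts = Counter(leading)
total = len(leading) or 1  # avoid division by zero on an empty export
for d in range(1, 10):
    print(f'{d}: observed {counts.get(d, 0) / total:.3f} vs expected {EXPECTED[d]:.3f}')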
Frequency
Run this prompt monthly during the close process for each client. Save each month's output in the client's SharePoint folder under:
[Client Name]/Anomaly Reports/YYYY-MM Benford Analysis.xlsx
PSA Ticket Creation Integration
Type: integration
Power Automate flow specification that automatically creates tickets in ConnectWise PSA when high-severity anomalies are detected, enabling the MSP to track anomaly resolution through their standard service delivery workflow.
Implementation:
Power Automate Flow: Anomaly-to-PSA-Ticket
Flow Name
Financial Anomaly - Create PSA Ticket
Trigger
- Type: When an HTTP request is received (webhook)
- URL: Auto-generated by Power Automate
- Method: POST
- JSON Schema: (see below)
{
  "type": "object",
  "properties": {
    "client_id": {"type": "string"},
    "client_name": {"type": "string"},
    "anomaly_count": {"type": "integer"},
    "max_score": {"type": "number"},
    "anomalies": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "txn_id": {"type": "string"},
          "score": {"type": "number"},
          "explanation": {"type": "string"}
        }
      }
    }
  }
}
Flow Steps
Step 1: Initialize Variables
varPriority (String): Set based on max_score:
- If max_score >= 0.95 → "Priority 1 - Critical"
- If max_score >= 0.85 → "Priority 2 - High"
- If max_score >= 0.75 → "Priority 3 - Medium"
- Else → "Priority 4 - Low"
Step 2: Compose Ticket Description
Power Automate's expression language has no map() with inline lambdas, so build the per-anomaly lines with a Select action first. Add a Select action named "Select_Top_Anomalies" with From set to take(triggerBody()?['anomalies'], 5) and Map (text mode) set to:
concat('- ', item()?['txn_id'], ' (Score: ', string(item()?['score']), '): ', item()?['explanation'])
Then use this expression in the Compose action:
concat(
  'Automated Anomaly Detection Alert\n\n',
  'Client: ', triggerBody()?['client_name'], '\n',
  'Anomalies Detected: ', string(triggerBody()?['anomaly_count']), '\n',
  'Highest Score: ', string(triggerBody()?['max_score']), '\n\n',
  'Top Anomalies:\n',
  join(body('Select_Top_Anomalies'), '\n'),
  '\n\nAction Required: Review flagged transactions in the Anomaly Detection Dashboard.\n',
  'Dashboard URL: https://anomaly.yourclient.com'
)
Step 3: Create ConnectWise Ticket (HTTP Action)
- Method: POST
- URI:
https://api-na.myconnectwise.net/v4_6_release/apis/3.0/service/tickets
Headers:
Authorization: Basic [Base64 encoded company+publickey:privatekey]
Content-Type: application/json
clientId: [your-connectwise-client-id]
Body (note that the priority id is emitted unquoted so ConnectWise receives an integer):
{
  "summary": "[AI Alert] @{triggerBody()?['anomaly_count']} Transaction Anomalies - @{triggerBody()?['client_name']}",
  "board": {"id": 123, "name": "Financial Anomaly Review"},
  "company": {"identifier": "@{triggerBody()?['client_id']}"},
  "priority": {"id": @{if(greaterOrEquals(triggerBody()?['max_score'], 0.95), 1, if(greaterOrEquals(triggerBody()?['max_score'], 0.85), 2, if(greaterOrEquals(triggerBody()?['max_score'], 0.75), 3, 4)))}},
  "status": {"name": "New"},
  "type": {"name": "AI Anomaly Alert"},
  "initialDescription": "@{outputs('Compose_Ticket_Description')}",
  "automaticEmailContactFlag": true,
  "automaticEmailResourceFlag": true
}
Step 4: Condition - If Priority 1 (Critical)
- Send immediate email to MSP service manager and client firm partner
- Post to dedicated Teams channel with @mention
Step 5: Log Flow Execution
- Create SharePoint list item for audit trail
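With the flow in place, the pipeline only needs to POST a payload matching the trigger schema above. A minimal pipeline-side sketch (the function name, config handling, and placeholder URL are illustrative):
import requests

# Illustrative: PSA_WEBHOOK_URL is the auto-generated trigger URL,
# stored in config.yaml under psa_webhook_url.
PSA_WEBHOOK_URL = 'https://prod-00.westus.logic.azure.com/workflows/...'

def post_psa_alert(client_id, client_name, anomalies):
    """POST high-severity anomalies (dicts with txn_id/score/explanation) to the flow."""
    payload = {
        'client_id': client_id,
        'client_name': client_name,
        'anomaly_count': len(anomalies),
        'max_score': max(a['score'] for a in anomalies),
        'anomalies': anomalies,
    }
    resp = requests.post(PSA_WEBHOOK_URL, json=payload, timeout=30)
    resp.raise_for_status()  # surface delivery failures to the pipeline log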
Setup Instructions for MSP Technician
After saving the flow, copy the auto-generated HTTP POST URL from the trigger step and store it in the anomaly pipeline configuration (config.yaml key: psa_webhook_url) so the nightly run can deliver alerts, as in the sketch above. Send a test payload and confirm a ticket is created before enabling the flow in production.
Testing & Validation
Connectivity Test
Log into QuickBooks Online via the Intuit Accountant portal and verify you can access each client company. Navigate to Reports > Balance Sheet and confirm the AI anomaly indicators appear (small lightbulb/insight icons) on Plus and Advanced plans.
Expected result: anomaly suggestions surface for at least one line item if there are 30+ days of transaction history.
Dext Integration Test
Submit the same receipt twice (for example, once via the Dext mobile app and once via the client's email-in address) and confirm Dext flags the second copy.
Expected result: duplicate warning appears in the Dext review queue.
Copilot for Finance Test
In Excel, open a transaction export for a test client and run the Benford's Law prompt template from the prompt library.
Expected result: Copilot generates a complete analysis within 30 seconds, including a narrative summary.
MFA Verification Test
From a fresh browser profile, attempt to sign in to QuickBooks Online and Microsoft 365 and confirm each prompts for a second factor before granting access.
Expected result: no financial system is accessible without MFA.
Custom Pipeline Data Extraction Test (Phase 3)
SSH into the Azure VM and run the import script manually:
python import_historical.py --client-id CLIENT001 --months 1
Verify transactions are extracted from QBO and stored in PostgreSQL:
psql -U anomaly_user -d anomaly_db -c 'SELECT COUNT(*) FROM transactions WHERE client_id = '\''CLIENT001'\'''
Expected result: transaction count matches QBO transaction register for the same period (within 5% tolerance for pagination).
Custom Pipeline Anomaly Detection Test (Phase 3)
Insert 5 known-anomalous test transactions into the database:
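For example, via psycopg2 (a sketch: the amount and txn_date columns are assumptions, so match the INSERT to the actual transactions schema):
import psycopg2
from datetime import date

# Illustrative: five rows far outside normal posting patterns so the
# detector has known anomalies to surface.
TEST_ROWS = [
    ('CLIENT001', f'TEST-ANOM-{i}', amount, date.today())
    for i, amount in enumerate([99999.99, -45000.00, 0.01, 87654.32, 71000.00], start=1)
]

conn = psycopg2.connect(DB_CONN)  # DB_CONN as defined in the pipeline config
with conn, conn.cursor() as cur:  # commits on success, rolls back on error
    cur.executemany(
        'INSERT INTO transactions (client_id, txn_id, amount, txn_date) '
        'VALUES (%s, %s, %s, %s)',
        TEST_ROWS,
    )
conn.close()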
Run the detection calibration script:
python calibrate.py --client-id CLIENT001
Verify all 5 test transactions appear in the anomaly results with scores > 0.75.
Expected result: 5/5 test anomalies detected.
Alert Routing Test
Trigger a test alert from the anomaly detection pipeline (or manually send a webhook payload to the Power Automate flow). Verify the following:
- A ConnectWise ticket is created on the Financial Anomaly Review board with the correct priority
- The email notification reaches the configured recipients
- The Teams channel post appears
Expected result: all three alert channels receive the notification.
Dashboard Functional Test
Open the anomaly dashboard in a browser (https://anomaly.yourclient.com). Verify the following:
- The stats cards, filters, and anomaly table render with current data
- Filtering by client, status, and date range returns the expected rows
- The Confirm and False + actions mark anomalies as reviewed and record reviewer notes
Expected result: all CRUD operations work and UI renders correctly on Chrome, Edge, and Safari.
Backup and Recovery Test
Restore the most recent NAS or cloud backup of the PostgreSQL database (anomaly_db) to a scratch location and verify the restored data matches production.
Expected result: full database recovery within 30 minutes with zero data loss from the last backup window.
Compliance Documentation Test
Review the updated WISP with the client's designated security coordinator and verify that the anomaly detection systems, data flows, and vendor relationships introduced by this deployment are documented.
Expected result: WISP passes review checklist with zero gaps.
False Positive Rate Validation
After the 2–4 week calibration period, calculate the false positive rate by reviewing all flagged anomalies: (false positives / total flagged) × 100. Target: less than 10% false positive rate.
If the false positive rate exceeds 15%, adjust thresholds in config.yaml (increase min_anomaly_score by 0.05 increments) and re-run calibration. Document the final calibrated thresholds.
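The rate can be computed directly from the review data, relying on the reviewer_notes convention set by the dashboard's review endpoint (a sketch):
import psycopg2

conn = psycopg2.connect(DB_CONN)  # DB_CONN as defined in the pipeline config
cur = conn.cursor()
cur.execute("""
    SELECT COUNT(*) FILTER (WHERE reviewer_notes ILIKE '%false positive%') AS fp,
           COUNT(*) AS total
    FROM anomaly_results
    WHERE reviewed
""")
fp, total = cur.fetchone()
if total:
    print(f'False positive rate: {fp / total * 100:.1f}%')
else:
    print('No reviewed anomalies yet')
cur.close()
conn.close()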
End-to-End Smoke Test
On a Monday morning, verify the previous night's scheduled run completed successfully by checking:
- The pipeline log on the Azure VM shows a completed run with no errors
- New rows appear in anomaly_results with the expected detection timestamps
- Any anomalies above the alert threshold generated notifications and PSA tickets
Expected result: complete pipeline execution with no errors in the log.
Client Handoff
The client handoff should be conducted as a 2-hour on-site or video meeting with the firm's partners, senior bookkeepers, and the designated Anomaly Review Owner. Cover the following topics in order:
Documentation to Leave Behind
- Anomaly Review SOP (printed and in SharePoint)
- Updated WISP (printed and in SharePoint)
- Quick Reference Card: 1-page laminated guide with dashboard URL, Teams channel, escalation contacts, and severity definitions
- Copilot Prompt Library: All prompt templates saved in a shared Excel workbook
- Vendor contact sheet: Intuit support, Dext support, MSP helpdesk number
- Training recording: Record the handoff session and store in the client's SharePoint
- Architecture diagram: Visual diagram of all data flows for compliance reference
Maintenance
ONGOING MSP RESPONSIBILITIES:
Weekly (30 minutes per client)
Monthly (2 hours per client)
Update OS packages and anomaly detection libraries on the Azure VM:
sudo apt update && sudo apt upgrade -y && pip install --upgrade pyod scikit-learn
Quarterly (4 hours per client)
Semi-Annually (1 day)
Annually
SLA Considerations
- P1 (Critical) anomalies: MSP acknowledges within 1 hour during business hours; client reviews within 4 hours
- P2 (High) anomalies: MSP acknowledges within 4 hours; client reviews within 24 hours
- P3/P4 anomalies: Included in daily/weekly digest; reviewed within 72 hours
- System uptime SLA: 99.5% for custom pipeline (Azure VM); SaaS platforms governed by vendor SLAs
- Data extraction SLA: No more than 24-hour lag between QBO transaction posting and anomaly detection
Escalation Paths
- Level 1: MSP L2 technician (configuration issues, alert routing, dashboard access)
- Level 2: MSP L3 solutions architect (threshold tuning, model performance, integration failures)
- Level 3: MSP data engineer (custom pipeline bugs, ML model retraining, database issues)
- Level 4: Vendor support (QBO API issues → Intuit Developer Support; Azure issues → Microsoft Support; Dext issues → Dext Partner Support)
- Fraud escalation: If confirmed fraud is detected, MSP notifies client firm partner immediately. Client engages legal counsel and forensic accountant. MSP preserves all detection logs and audit trails.
Alternatives
MindBridge AI Platform (Dedicated Anomaly Detection)
Instead of the phased approach starting with QBO built-in features, deploy MindBridge as the primary anomaly detection platform from day one. MindBridge combines Benford's Law tests, statistical models, and neural networks to analyze 100% of transactions, going substantially deeper than the general-purpose AI features in QBO. It is purpose-built for audit and accounting use cases, with risk scoring, audit trail documentation, and regulatory-ready reporting.
Tradeoffs:
- Cost: $15,000-$50,000+/year vs. ~$2,000-$4,000/year for the Phase 1 approach.
- Complexity: 8-14 week implementation vs. 1-2 weeks.
- Capability: Significantly deeper detection with ML models trained specifically on financial fraud patterns, but may be overkill for small bookkeeping firms with <$5M in annual client transactions.
- Recommend when: the client is a mid-size accounting or audit firm (50+ clients), has experienced fraud or error losses exceeding $50K, or needs audit-ready anomaly documentation for compliance purposes. MindBridge does not have public pricing, so negotiation is required.
Botkeeper AI Bookkeeping Platform
Replace the client's existing bookkeeping workflow with Botkeeper, which combines AI-powered transaction categorization with human-in-the-loop review. Anomaly detection is embedded in the categorization workflow rather than running as a separate layer. Botkeeper handles the entire bookkeeping process — data entry, categorization, reconciliation, and anomaly flagging — as a managed service.
Tradeoffs:
- Cost: $199-$299/license/month per business entity, which may be lower than the combined cost of QBO Advanced + Dext + Copilot for firms managing many small clients.
- Complexity: Lower ongoing MSP involvement since Botkeeper provides its own support team for bookkeeping operations.
- Capability: Less customizable anomaly detection (no custom ML models), but the integrated approach catches errors earlier in the workflow.
- Recommend when: the MSP client is a small bookkeeping firm (2-10 people) that wants to outsource more of the bookkeeping process, not just add anomaly detection on top of existing workflows.
- Not recommended for firms that want to maintain full control of their bookkeeping process.
Xero + Dext + Custom Python Pipeline (Non-QuickBooks Stack)
For clients who use Xero instead of QuickBooks, implement the same phased approach but substitute Xero's native AI features and API. Xero's API is well-documented and RESTful, making it straightforward to build the custom extraction pipeline. Dext integrates natively with Xero. Microsoft Copilot for Finance works with Xero data exported to Excel.
Tradeoffs
- Cost: Xero plans are generally less expensive ($20-$80/month vs. QBO's $92-$235/month) with unlimited users on all plans, reducing per-seat costs for larger firms.
- Capability: Xero's built-in AI anomaly detection is less mature than QBO's current offering, so the custom pipeline (Phase 3) becomes more important earlier.
- Complexity: The Python pipeline requires Xero API OAuth 2.0 implementation instead of QuickBooks, but the code changes are straightforward (swap QBOClient class for a XeroClient class).
- Recommend when: the client is already on Xero and migration to QBO would be disruptive, or when the client has international operations (Xero has stronger multi-currency support).
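The seam is small if extraction already sits behind a client class; a sketch of the shared interface (names and signature are illustrative, not the pipeline's actual code):
from typing import Protocol

class LedgerClient(Protocol):
    """What the pipeline needs from an accounting platform; QBOClient and a
    future XeroClient would both satisfy this interface."""
    def fetch_transactions(self, client_id: str, months: int) -> list[dict]: ...
Downstream scoring, dashboarding, and alerting read from PostgreSQL and should be unaffected by the swap.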
Sage Intacct with Built-In GL Outlier Detection
For mid-market clients (50-500 employees, multi-entity), deploy Sage Intacct as the core accounting platform with its native GL Outlier Detection and Sage Copilot Variance Analysis features. Sage Intacct's dimensional reporting provides richer context for anomaly detection than QBO or Xero.
Tradeoffs
- Cost: Significantly higher — $25,000-$35,000/year for Sage Intacct vs. $1,000-$3,000/year for QBO Advanced.
- Complexity: 3-6 month implementation for Sage Intacct migration, requiring certified Sage implementation partner involvement.
- Capability: Far superior for multi-entity, multi-dimensional analysis. GL Outlier Detection provides continuous monitoring, and Sage Copilot adds AI-generated explanations of variance drivers.
- Recommend when: the client is a growing firm managing complex entities (multi-location, multi-department), has revenue exceeding $10M, or needs GAAP/IFRS compliance with robust audit trails.
- Not appropriate for micro-businesses or solo bookkeepers.
Fully Custom Open-Source Solution (No SaaS Dependencies)
Skip all commercial SaaS platforms and build an entirely custom anomaly detection system using open-source tools: PostgreSQL for data storage, Apache Airflow for orchestration, PyOD + scikit-learn for ML models, Grafana for dashboards, and direct bank data ingestion via Plaid API. This approach maximizes customization and eliminates recurring SaaS licensing costs.
Tradeoffs
- Cost: Higher upfront ($25,000-$75,000 development) but lower ongoing ($100-$300/month for cloud infrastructure vs. $500-$3,000/month for SaaS licenses).
- Complexity: Very high — requires a data engineer and ML specialist for initial build (16-26 weeks) and ongoing maintenance. This is not a standard MSP competency and may require subcontracting.
- Capability: Maximum flexibility — every algorithm, threshold, and workflow can be customized. However, you lose the pre-built accounting domain expertise embedded in commercial platforms.
Recommend only when: the client has unique requirements that no commercial platform addresses, regulatory constraints prevent cloud SaaS usage, or the MSP has in-house data engineering talent and wants to build a proprietary offering to resell across multiple clients.