
Implementation Guide: Analyze production yield, scrap rates, and downtime to identify root causes
Step-by-step implementation guide for deploying AI to analyze production yield, scrap rates, and downtime to identify root causes for Manufacturing clients.
Hardware Procurement
...
Advantech EPC-R7300 Edge AI Computer
$1,500–$2,500 per unit (MSP cost) / $3,000–$4,000 suggested resale (configured and installed)
Industrial-hardened fanless edge gateway deployed on the shop floor. Runs KEPServerEX for protocol translation (OPC UA, Modbus, EtherNet/IP to MQTT), local data buffering when cloud connectivity is intermittent, and lightweight edge inference for real-time anomaly detection. Rated for -20°C to 60°C operation in harsh manufacturing environments with dust, vibration, and temperature swings.
Dell PowerEdge T360 Tower Server
$3,500–$5,000 per unit (MSP cost, configured with 32GB RAM, 2x1TB NVMe RAID1, Intel Xeon E-2488) / $5,500–$7,500 suggested resale
On-premise analytics server hosting TimescaleDB for time-series data storage, Python ML model training and inference, local Grafana instance for shop-floor dashboards, and data pipeline orchestration. Sits in the client's server room or IT closet and acts as the primary data warehouse for all production metrics with 90-day hot retention.
ifm efector VVB001 Vibration Sensor
$400–$495 per unit (MSP cost) / $700–$850 suggested resale per unit
IO-Link 3-axis vibration sensors (0–50g, 2–10,000 Hz) mounted on critical rotating equipment such as spindles, motors, pumps, and compressors that lack built-in monitoring. Provides vibration velocity and acceleration data for predictive maintenance and correlating machine health degradation with scrap rate increases.
Banner Engineering Wireless Vibration/Temperature Sensor Node
$400–$600 per node (MSP cost) / $700–$900 suggested resale per node
Wireless mesh vibration and temperature sensors for machines in locations where running IO-Link or Ethernet cabling is impractical. Up to 2-mile range via wireless mesh. Used to monitor auxiliary equipment (hydraulic units, cooling systems, air compressors) that influence yield and scrap but are often unmonitored.
Banner Engineering Wireless Gateway
$800–$1,200 (MSP cost) / $1,400–$1,800 suggested resale
Central wireless gateway that aggregates data from all Banner wireless sensor nodes and forwards it via Modbus TCP or MQTT to the edge gateway (Advantech EPC-R7300). Provides web-based configuration and diagnostics.
Cisco Catalyst IE3300 Rugged Industrial Switch
$1,200–$1,800 per unit (MSP cost) / $1,800–$2,800 suggested resale per unit
Industrial-rated managed Ethernet switches for the OT network VLAN. One switch connects PLCs and edge devices on the shop floor; the second sits at the IT/OT DMZ boundary. Supports VLAN segmentation, QoS for real-time data, and DIN-rail mounting in industrial enclosures.
Samsung Galaxy Tab Active5 Enterprise Edition
$450–$550 per unit (MSP cost) / $700–$900 suggested resale per unit
Ruggedized tablets mounted at workstations for operators to view real-time OEE dashboards, enter downtime reason codes, and review scrap alerts. IP68 rated, MIL-STD-810H compliant, suitable for shop floor environments with oil, dust, and vibration.
APC Smart-UPS 1500VA
$600–$800 (MSP cost) / $900–$1,200 suggested resale
Rack-mount UPS protecting the Dell PowerEdge T360 analytics server and Advantech edge gateway from power fluctuations common in manufacturing environments. Provides 10–15 minutes of battery backup for graceful shutdown and prevents data corruption during power events.
Software Procurement
MachineMetrics Professional
$200–$400/machine/month; estimate $2,400–$4,800/month for 12 machines ($28,800–$57,600/year)
Cloud-native machine monitoring platform providing out-of-the-box connectivity to CNC machines, injection molders, and stamping presses. Automatically captures cycle times, utilization, OEE, downtime events, and part counts. Provides real-time dashboards and historical analytics. Primary data source for production yield and downtime analysis.
KEPServerEX Manufacturing Suite
$2,875 one-time (Manufacturing Suite) + ~$575/year annual subscription; individual drivers from $452
OPC UA/DA server with 100+ industrial protocol drivers for bridging legacy PLCs (Modbus RTU/TCP, EtherNet/IP, PROFINET, FANUC, Mitsubishi) to modern MQTT and OPC UA. Critical for connecting older machines that MachineMetrics cannot natively reach. Includes IoT Gateway module for MQTT publishing to Azure IoT Hub.
Microsoft Power BI Pro
$14/user/month; estimate 8 users = $112/month ($1,344/year)
Executive and management dashboards showing yield trends, scrap Pareto charts, downtime root cause analysis, shift-over-shift comparisons, and financial impact of production losses. Connects to Azure IoT Hub, TimescaleDB, and MachineMetrics APIs for unified reporting.
Azure IoT Hub (S1 Tier)
$25/unit/month; 2 units = $50/month ($600/year) for up to 800K messages/day
Cloud message broker and device management for ingesting production data from the edge gateway. Provides device twins, message routing to Azure services, and secure TLS-encrypted communication. Routes data to Azure SQL Database and Azure Blob Storage for long-term analytics.
Azure SQL Database (General Purpose)
$150–$300/month (2 vCores, 32GB storage) ($1,800–$3,600/year)
Cloud-hosted relational database storing aggregated KPIs, downtime event records, scrap summaries, and ML model outputs. Serves as the data warehouse for Power BI dashboards and long-term trend analysis (5+ year retention for compliance).
Azure Blob Storage (Hot Tier)
$20–$50/month for 500GB–1TB ($240–$600/year)
Object storage for raw sensor telemetry, historical time-series data beyond the 90-day on-premise retention window, and ML training datasets. Hot tier keeps recent archives query-ready; move aged blobs to the Cool or Archive tier for cheaper long-term compliance retention (ISO 9001, 21 CFR Part 11).
TimescaleDB Community Edition
Free (self-hosted on Dell PowerEdge T360); optional Timescale Cloud from $29/month
High-performance time-series database deployed on the on-premise analytics server. Stores 90 days of hot sensor data (vibration, temperature, cycle times, power consumption) at 1-second to 1-minute resolution. Supports hypertables with automatic partitioning, continuous aggregates for dashboard queries, and native PostgreSQL compatibility.
Grafana OSS
Free (self-hosted on Dell PowerEdge T360)
Shop-floor real-time dashboards displayed on Samsung tablets at workstations. Shows live OEE gauges, current machine status, active alarms, and shift progress. Connects directly to TimescaleDB via native PostgreSQL data source. Lower latency than cloud-based Power BI for real-time monitoring.
Python (Anaconda Distribution)
Free (Anaconda Individual Edition; note that organizations above Anaconda's free-use size threshold need a commercial license, or substitute Miniforge/conda-forge)
Runtime environment for custom ML models performing root cause analysis, anomaly detection, and scrap prediction. Key libraries: scikit-learn, pandas, numpy, prophet, shap, imbalanced-learn. Deployed on Dell PowerEdge T360 with scheduled model training via cron jobs.
Node-RED
Free (self-hosted on Advantech edge gateway)
Low-code data integration and transformation running on the edge gateway. Provides visual flow-based programming for bridging OPC UA data from KEPServerEX to MQTT topics, performing data normalization, applying threshold-based alerts, and handling protocol conversions for non-standard machines.
Eclipse Mosquitto MQTT Broker
Free (self-hosted on Advantech edge gateway)
Lightweight MQTT message broker on the edge gateway that receives machine data from KEPServerEX IoT Gateway and Node-RED flows, then forwards to Azure IoT Hub. Provides local message buffering during cloud connectivity outages ensuring no data loss.
Prerequisites
- Existing PLC infrastructure accessible via at least one standard industrial protocol (OPC UA, EtherNet/IP, PROFINET, Modbus TCP/RTU, or MTConnect). Document all PLC makes, models, and firmware versions during site survey.
- Functioning ERP system (SAP Business One, Epicor Kinetic, Infor SyteLine, Microsoft Dynamics 365 Business Central, JobBOSS², or equivalent) with accessible database (ODBC/SQL) or REST API for work order, BOM, and cost data extraction.
- Dedicated VLAN or ability to create a new VLAN on the plant floor network for OT device isolation. Network infrastructure must support 802.1Q VLAN tagging.
- Gigabit Ethernet backbone on the shop floor with available switch ports near each PLC or machine controller. Minimum Cat5e cabling; Cat6 recommended for new runs.
- Minimum 50 Mbps symmetrical internet uplink for cloud SaaS connectivity (MachineMetrics, Azure IoT Hub, Power BI). 100 Mbps recommended.
- Server room or IT closet with adequate cooling, 120/240V power, and space for the Dell PowerEdge T360 tower server and a 2U rack-mount UPS (APC SMT1500RM2UC).
- Active Directory or Azure AD tenant for identity management and single sign-on across Power BI, Azure services, and MachineMetrics.
- Microsoft 365 or Azure tenant with Global Admin access for provisioning Azure IoT Hub, Azure SQL Database, and Power BI Pro licenses via CSP.
- Client-designated project stakeholders: plant manager or production supervisor (defines KPIs and downtime categories), maintenance lead (sensor placement and machine access), IT administrator (network access and firewall rules), and quality manager (scrap categories and compliance requirements).
- Downtime reason code taxonomy agreed upon by client operations team before deployment. Minimum 15–30 standardized codes covering categories: mechanical failure, electrical failure, tooling, material, operator, quality hold, changeover, scheduled maintenance, no demand.
- Physical access to all machines for sensor installation (ifm VVB001, Banner wireless nodes) during scheduled downtime windows. Coordinate with maintenance team for lockout/tagout procedures.
- Baseline production data: minimum 30 days of historical production records (even if manual/spreadsheet-based) to validate initial KPI calculations against known values.
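The baseline-data prerequisite above is worth operationalizing: before trusting any dashboard, recompute the client's historical KPIs and compare them against the figures the client already reports. A minimal Python sketch; the column layout, machine names, and numbers are illustrative, not real client data:

```python
# Validate computed KPIs against the client's historical records.
# All machine names and figures below are illustrative placeholders.

def scrap_rate(good: int, reject: int) -> float:
    """Scrap rate = rejects / total parts produced."""
    total = good + reject
    return reject / total if total else 0.0

def availability(run_min: float, planned_min: float) -> float:
    """Availability = run time / planned production time."""
    return run_min / planned_min if planned_min else 0.0

# 30 days of client-supplied records: (machine, good, reject, run_min, planned_min)
baseline = [
    ("Press01", 4501, 22, 410, 480),
    ("Press02", 3890, 95, 385, 480),
]

for machine, good, reject, run_min, planned_min in baseline:
    print(f"{machine}: scrap={scrap_rate(good, reject):.2%} "
          f"availability={availability(run_min, planned_min):.1%}")

# Flag machines whose computed scrap rate deviates more than 20% (relative)
# from the client-reported value before trusting any dashboard number.
client_reported = {"Press01": 0.005, "Press02": 0.024}
for machine, good, reject, *_ in baseline:
    computed, reported = scrap_rate(good, reject), client_reported[machine]
    if abs(computed - reported) / reported > 0.20:
        print(f"WARNING: {machine} mismatch (computed {computed:.3%}, reported {reported:.3%})")
```

Mismatches at this stage usually trace to definition differences (e.g. whether rework counts as scrap), which are far cheaper to resolve before go-live.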
Installation Steps
...
Step 1: Site Survey and Data Source Audit
Conduct a comprehensive on-site audit of the manufacturing facility to document every machine, PLC, sensor, and software system that will feed into the analytics platform. Walk the entire production floor with the plant engineer and maintenance lead. For each machine, record: make/model, PLC type and firmware version, available communication protocols, existing sensors, network connectivity status, and current monitoring capabilities. Document the ERP system version, database type, and available APIs. Map the existing network topology including all VLANs, firewalls, and internet uplinks. Photograph PLC cabinets, network panels, and proposed sensor mounting locations. Identify machines that have no PLC or digital connectivity (candidates for retrofit sensors). Produce a Machine Connectivity Matrix spreadsheet as the primary deliverable.
- Create Machine Connectivity Matrix template
- Columns: Machine_ID, Machine_Name, Make_Model, PLC_Type, PLC_FW_Version, Protocol_Available, Network_Connected(Y/N), IP_Address, Sensors_Existing, Retrofit_Needed(Y/N), MachineMetrics_Compatible(Y/N), Notes
- Save as: [ClientName]_Machine_Connectivity_Matrix_v1.xlsx
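The matrix template can be generated programmatically so every site survey starts from an identical layout. A sketch using Python's csv module (Excel opens CSV directly and can save as .xlsx); the sample row values are illustrative:

```python
import csv

# Columns match the Machine Connectivity Matrix defined above.
COLUMNS = [
    "Machine_ID", "Machine_Name", "Make_Model", "PLC_Type", "PLC_FW_Version",
    "Protocol_Available", "Network_Connected(Y/N)", "IP_Address",
    "Sensors_Existing", "Retrofit_Needed(Y/N)", "MachineMetrics_Compatible(Y/N)",
    "Notes",
]

def write_matrix_template(path: str, rows: list[dict]) -> None:
    """Write the survey template (or partially filled rows) to CSV.
    Keys missing from a row are left blank for on-site completion."""
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=COLUMNS)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)

# Example survey entry (make/model and addresses are illustrative)
write_matrix_template("Machine_Connectivity_Matrix_v1.csv", [
    {"Machine_ID": "M-001", "Machine_Name": "Press01",
     "Make_Model": "Stamping press (example)", "PLC_Type": "Allen-Bradley CompactLogix",
     "Protocol_Available": "EtherNet/IP", "Network_Connected(Y/N)": "Y",
     "IP_Address": "10.100.1.50", "Retrofit_Needed(Y/N)": "N",
     "MachineMetrics_Compatible(Y/N)": "N"},
])
```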
Schedule this visit during a production shift so you can observe actual operations. Allow 4–8 hours depending on facility size. Bring a laptop with Nmap for network scanning and a Fluke LinkIQ cable tester. Do NOT connect any devices to the OT network during the survey — observation only. Request all PLC program documentation from the client in advance.
Step 2: Network Infrastructure — OT VLAN and IT/OT DMZ Setup
Configure the network infrastructure required to safely collect data from OT devices without exposing the control network. This follows the Purdue Model (IEC 62443) with a dedicated OT VLAN (Level 1-2), a DMZ VLAN (Level 3), and the existing corporate IT network (Level 4-5). Install the two Cisco IE3300 industrial switches. Switch 1 connects to PLCs and edge devices on the shop floor OT VLAN. Switch 2 sits at the DMZ boundary. Configure VLAN tagging, inter-VLAN routing restrictions, and firewall rules on the client's existing firewall to allow only outbound data flow from OT to IT/cloud (never inbound to OT).
# Cisco IE3300 Switch 1 (Shop Floor OT VLAN) - Basic Configuration
enable
configure terminal
hostname IE3300-OT-FLOOR
vlan 100
name OT-Production
exit
vlan 200
name DMZ-Analytics
exit
interface range GigabitEthernet1/1-8
switchport mode access
switchport access vlan 100
spanning-tree portfast
exit
interface GigabitEthernet1/9
description Uplink-to-DMZ-Switch
switchport mode trunk
switchport trunk allowed vlan 100,200
exit
ip default-gateway 10.100.1.1
end
write memory
# Cisco IE3300 Switch 2 (DMZ Boundary) - Basic Configuration
enable
configure terminal
hostname IE3300-DMZ
vlan 200
name DMZ-Analytics
exit
interface GigabitEthernet1/1
description Downlink-from-OT-Switch
switchport mode trunk
switchport trunk allowed vlan 100,200
exit
interface GigabitEthernet1/2
description Edge-Gateway-Advantech
switchport mode access
switchport access vlan 200
exit
interface GigabitEthernet1/3
description Analytics-Server-Dell
switchport mode access
switchport access vlan 200
exit
interface GigabitEthernet1/9
description Uplink-to-Corporate-Firewall
switchport mode access
switchport access vlan 200
exit
end
write memory
# Firewall Rules (example for pfSense/Fortinet - adapt to client's firewall)
# Rule 1: Allow OT VLAN (10.100.1.0/24) -> DMZ VLAN (10.200.1.0/24) on ports 4840 (OPC UA), 1883 (MQTT), 502 (Modbus)
# Rule 2: Allow Edge Gateway (10.200.1.10) only -> OT VLAN (10.100.1.0/24) on PLC protocol ports 4840 (OPC UA), 502 (Modbus TCP), 44818 (EtherNet/IP) for KEPServerEX polling
# Rule 3: Allow DMZ VLAN (10.200.1.0/24) -> Internet on ports 443 (HTTPS), 8883 (MQTTS to Azure IoT Hub)
# Rule 4: DENY ALL from Internet/Corporate -> OT VLAN
# Rule 5: DENY ALL other traffic from DMZ -> OT VLAN
CRITICAL: Never allow any inbound connections from the corporate network or internet to the OT VLAN. Apart from the edge gateway's tightly scoped PLC-polling rule, data must flow unidirectionally: OT -> DMZ -> Cloud. Use the client's existing firewall for inter-VLAN routing and ACLs; the Cisco IE3300 switches provide Layer 2 segmentation, and Layer 3 filtering happens at the firewall. Coordinate with the plant engineer to schedule switch installation during a maintenance window, and do NOT disrupt the production network. Label all cables and ports clearly.
Step 3: Edge Gateway Deployment — Advantech EPC-R7300
Install and configure the Advantech EPC-R7300 edge AI computer in the DMZ network zone. This device runs KEPServerEX for industrial protocol translation, Eclipse Mosquitto as the local MQTT broker, and Node-RED for data integration flows. Mount it in an industrial enclosure or DIN-rail near the main PLC cabinet. Connect it to the DMZ switch (Cisco IE3300 Switch 2) via Ethernet. The edge gateway must have network access to PLCs on the OT VLAN (through firewall-controlled rules) and outbound internet access for Azure IoT Hub.
# 1. Physical installation
# Mount Advantech EPC-R7300 on DIN rail or in industrial enclosure
# Connect Ethernet to Cisco IE3300-DMZ port Gi1/2
# Connect power supply (24VDC industrial or included AC adapter)
# Assign static IP: 10.200.1.10/24, Gateway: 10.200.1.1, DNS: client's DNS servers
# 2. OS Setup (Ubuntu 22.04 LTS pre-installed or install via USB)
sudo apt update && sudo apt upgrade -y
sudo hostnamectl set-hostname edge-gw-prod
# 3. Install Eclipse Mosquitto MQTT Broker
sudo apt install -y mosquitto mosquitto-clients
sudo systemctl enable mosquitto
# Create Mosquitto config
sudo tee /etc/mosquitto/conf.d/production.conf << 'EOF'
listener 1883 0.0.0.0
allow_anonymous false
password_file /etc/mosquitto/passwd
persistence true
persistence_location /var/lib/mosquitto/
log_dest file /var/log/mosquitto/mosquitto.log
EOF
# Create MQTT user
sudo mosquitto_passwd -c /etc/mosquitto/passwd edgeuser
# Enter password when prompted (use strong password, document in password manager)
sudo systemctl restart mosquitto
# 4. Install Node-RED
sudo apt install -y nodejs npm
sudo npm install -g --unsafe-perm node-red
sudo npm install -g pm2
pm2 start node-red -- --userDir /home/admin/.node-red
pm2 startup systemd
pm2 save
# Install Node-RED manufacturing palettes
cd /home/admin/.node-red
npm install node-red-contrib-opcua
npm install node-red-contrib-mqtt-broker
npm install node-red-contrib-modbus
npm install node-red-dashboard
pm2 restart node-red
# 5. Install KEPServerEX (Windows VM or Wine, or use dedicated Windows IoT if Advantech ships with Windows)
# If Windows IoT Enterprise is installed:
# Download KEPServerEX installer from PTC/Kepware partner portal
# Install Manufacturing Suite with the following drivers based on site survey:
# - Allen-Bradley EtherNet/IP
# - Siemens TCP/IP Ethernet (S7-1200/1500)
# - Modbus TCP/IP
# - FANUC Focas
# - Mitsubishi MC Protocol
# - MTConnect Agent
# - IoT Gateway (MQTT Client)
# Activate license with PTC-provided activation code
# 6. Verify connectivity
ping 10.100.1.50 # Test reach to a PLC on the OT VLAN (substitute a known PLC IP; should respond per the edge-gateway firewall rules)
mosquitto_sub -h localhost -t 'test/#' -u edgeuser -P <password> &
mosquitto_pub -h localhost -t 'test/hello' -u edgeuser -P <password> -m 'Edge gateway online'
If the Advantech EPC-R7300 ships with Windows IoT Enterprise (common for industrial PCs), install KEPServerEX natively. If it ships with Ubuntu, consider running KEPServerEX in a lightweight Windows VM via KVM, or deploy KEPServerEX on a separate small Windows PC and use the Advantech box exclusively for Mosquitto + Node-RED. Ensure NTP is configured to synchronize time with the same source as the PLCs; time alignment is critical for correlating production events across systems. Set up automatic OS security updates.
Step 4: KEPServerEX Configuration — PLC and Machine Connectivity
Configure KEPServerEX on the edge gateway to connect to all PLCs and machine controllers identified in the site survey. Create channels and devices for each protocol, define tag groups for production-relevant data points (cycle counts, part counts, reject counts, machine state, alarms, temperatures, pressures, speeds), and configure the IoT Gateway module to publish all tags to the local Mosquitto MQTT broker in a standardized Sparkplug B or JSON format.
mosquitto_sub -h localhost -t 'kepware/#' -u edgeuser -P <password> -v
# expected output:
# kepware/AB_Line1/Press01/CycleCount 4523
# kepware/AB_Line1/Press01/MachineState 1
# kepware/AB_Line1/Press01/PartCountGood 4501
# kepware/AB_Line1/Press01/PartCountReject 22
Tag naming conventions are critical: establish a standard early and enforce it. Recommended: {Line}_{Machine}_{Parameter}_{Unit}. Example: Line1_Press01_CycleTime_ms. Set scan rates based on data criticality: 1 s for machine state and cycle counts, 5–10 s for temperatures and pressures, 60 s for slow-changing parameters. Enable KEPServerEX store-and-forward to prevent data loss during network hiccups. Document all tag mappings in a Tag Dictionary spreadsheet that maps PLC addresses to KEPServerEX tag names to MQTT topics.
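A small validator run against the Tag Dictionary export catches nonconforming names before go-live rather than in broken dashboard queries later. A sketch, assuming the {Line}_{Machine}_{Parameter}_{Unit} convention above; adjust the character classes to the site's actual line and machine naming:

```python
import re

# Pattern for the {Line}_{Machine}_{Parameter}_{Unit} convention,
# e.g. Line1_Press01_CycleTime_ms. The classes below are assumptions.
TAG_PATTERN = re.compile(
    r"^(?P<line>Line\d+)_(?P<machine>[A-Za-z]+\d+)_(?P<param>[A-Za-z]+)_(?P<unit>[A-Za-z%]+)$"
)

def validate_tag(tag: str) -> bool:
    """True if the tag name conforms to the site convention."""
    return TAG_PATTERN.match(tag) is not None

# Run over the exported Tag Dictionary column and report offenders.
tags = ["Line1_Press01_CycleTime_ms", "Line2_Mill03_SpindleTemp_C", "press1-cycletime"]
bad = [t for t in tags if not validate_tag(t)]
print(f"{len(bad)} nonconforming tags: {bad}")
```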
Step 5: MachineMetrics Deployment — Cloud Machine Monitoring
Deploy MachineMetrics on CNC machines and other supported equipment for automated, cloud-native machine monitoring. MachineMetrics provides plug-and-play adapters for most CNC controls (FANUC, Haas, Mazak, DMG Mori, Siemens SINUMERIK) and connects directly to the machine controller without PLC modification. Install MachineMetrics Edge devices on each compatible machine, connect them to the DMZ network, and configure the MachineMetrics cloud portal with machine profiles, shift schedules, and downtime reason codes.
# MachineMetrics API — for custom data extraction:
curl -X GET 'https://api.machinemetrics.com/v1/machines' \
-H 'Authorization: Bearer <API_KEY>' \
-H 'Content-Type: application/json'
# Fetch production data for a specific machine and date range:
curl -X GET 'https://api.machinemetrics.com/v1/report/production?machineId=<ID>&startDate=2025-01-01&endDate=2025-01-31' \
-H 'Authorization: Bearer <API_KEY>'
MachineMetrics charges per connected machine, so prioritize the most critical and highest-value machines first. Start with CNC machines (highest compatibility) and expand to other equipment types. The MachineMetrics Edge device requires outbound HTTPS (port 443) to *.machinemetrics.com; ensure the firewall allows this from the DMZ VLAN. MachineMetrics provides an API for extracting data programmatically, which later steps use to feed the custom ML pipeline. Operator entry of downtime reason codes is critical for root cause analysis, so work with the plant manager to enforce compliance.
Step 6: On-Premise Analytics Server Setup — Dell PowerEdge T360
Install and configure the Dell PowerEdge T360 as the on-premise analytics server. This server runs TimescaleDB for time-series data, Grafana for shop-floor dashboards, Python for ML models, and acts as the local data processing hub. Install Ubuntu Server 22.04 LTS, configure RAID1 for the two NVMe drives, set up remote management via iDRAC, and install all required software components.
- Physical Setup: Place the Dell PowerEdge T360 tower in the client server room
- Connect to APC UPS via USB for graceful shutdown on power loss
- Connect Ethernet to Cisco IE3300-DMZ switch port
- Assign static IP: 10.200.1.20/24, Gateway: 10.200.1.1
- Configure iDRAC remote management: 10.200.1.21 (separate NIC)
- OS Installation: Install Ubuntu Server 22.04 LTS from USB
- Configure RAID1 (mirror) across two NVMe drives via PERC controller in BIOS
- Create user: mspadmin (add to sudo group)
- Enable SSH: sudo apt install openssh-server
sudo ufw allow ssh
sudo ufw allow 3000/tcp # Grafana
sudo ufw allow 5432/tcp # TimescaleDB (from edge gateway and Power BI only)
sudo ufw allow 8888/tcp # Jupyter Lab (for ML development, restrict to MSP IPs)
sudo ufw enable
# TimescaleDB installation
sudo apt install -y gnupg postgresql-common apt-transport-https lsb-release wget
sudo /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh -y
echo "deb https://packagecloud.io/timescale/timescaledb/ubuntu/ $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/timescaledb.list
wget --quiet -O - https://packagecloud.io/timescale/timescaledb/gpgkey | sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/timescaledb.gpg
sudo apt update
sudo apt install -y timescaledb-2-postgresql-16
sudo timescaledb-tune --quiet --yes
sudo systemctl restart postgresql
sudo -u postgres psql -c "CREATE DATABASE production_analytics;"
sudo -u postgres psql -d production_analytics -c "CREATE EXTENSION IF NOT EXISTS timescaledb;"
sudo -u postgres psql -c "CREATE USER analytics_app WITH PASSWORD 'CHANGE_THIS_STRONG_PASSWORD';"
sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE production_analytics TO analytics_app;"
-- hypertables for telemetry, production events, downtime, OEE, and scrap
sudo -u postgres psql -d production_analytics << 'EOSQL'
-- Machine telemetry (high-frequency sensor data)
CREATE TABLE machine_telemetry (
time TIMESTAMPTZ NOT NULL,
machine_id TEXT NOT NULL,
metric_name TEXT NOT NULL,
value DOUBLE PRECISION,
unit TEXT
);
SELECT create_hypertable('machine_telemetry', 'time');
CREATE INDEX idx_telemetry_machine ON machine_telemetry (machine_id, time DESC);
-- Production events (cycle completions, rejects, state changes)
CREATE TABLE production_events (
time TIMESTAMPTZ NOT NULL,
machine_id TEXT NOT NULL,
event_type TEXT NOT NULL, -- 'cycle_complete', 'reject', 'state_change', 'alarm'
part_number TEXT,
work_order TEXT,
cycle_time_ms INTEGER,
good_count INTEGER DEFAULT 0,
reject_count INTEGER DEFAULT 0,
reject_reason TEXT,
machine_state TEXT,
fault_code TEXT,
operator_id TEXT
);
SELECT create_hypertable('production_events', 'time');
CREATE INDEX idx_events_machine ON production_events (machine_id, time DESC);
CREATE INDEX idx_events_type ON production_events (event_type, time DESC);
-- Downtime events
CREATE TABLE downtime_events (
time_start TIMESTAMPTZ NOT NULL,
time_end TIMESTAMPTZ,
machine_id TEXT NOT NULL,
duration_min DOUBLE PRECISION,
reason_code TEXT,
reason_category TEXT,
reason_detail TEXT,
shift TEXT,
operator_id TEXT,
work_order TEXT,
notes TEXT
);
SELECT create_hypertable('downtime_events', 'time_start');
-- OEE summary (continuous aggregate)
CREATE TABLE oee_hourly (
time_bucket TIMESTAMPTZ NOT NULL,
machine_id TEXT NOT NULL,
availability DOUBLE PRECISION,
performance DOUBLE PRECISION,
quality DOUBLE PRECISION,
oee DOUBLE PRECISION,
good_count INTEGER,
reject_count INTEGER,
planned_time_min DOUBLE PRECISION,
run_time_min DOUBLE PRECISION
);
SELECT create_hypertable('oee_hourly', 'time_bucket');
-- Scrap summary
CREATE TABLE scrap_events (
time TIMESTAMPTZ NOT NULL,
machine_id TEXT NOT NULL,
part_number TEXT,
work_order TEXT,
quantity INTEGER,
scrap_reason TEXT,
scrap_category TEXT,
material_cost DOUBLE PRECISION,
labor_cost DOUBLE PRECISION,
operator_id TEXT
);
SELECT create_hypertable('scrap_events', 'time');
-- Data retention policies
SELECT add_retention_policy('machine_telemetry', INTERVAL '90 days');
SELECT add_retention_policy('production_events', INTERVAL '365 days');
-- downtime_events and oee_hourly retained indefinitely for trend analysis
EOSQL
# Grafana installation
sudo apt install -y apt-transport-https software-properties-common wget
sudo mkdir -p /etc/apt/keyrings/
wget -q -O - https://apt.grafana.com/gpg.key | gpg --dearmor | sudo tee /etc/apt/keyrings/grafana.gpg > /dev/null
echo "deb [signed-by=/etc/apt/keyrings/grafana.gpg] https://apt.grafana.com stable main" | sudo tee /etc/apt/sources.list.d/grafana.list
sudo apt update
sudo apt install -y grafana
sudo systemctl enable grafana-server
sudo systemctl start grafana-server
# Grafana accessible at http://10.200.1.20:3000 (default admin/admin, change immediately)
# Python ML environment
sudo apt install -y python3 python3-pip python3-venv
python3 -m venv /opt/analytics/ml-env
source /opt/analytics/ml-env/bin/activate
pip install --upgrade pip
pip install pandas numpy scikit-learn scipy statsmodels prophet shap imbalanced-learn psycopg2-binary sqlalchemy matplotlib seaborn jupyterlab requests schedule
deactivate
# NTP time sync (use the same time source as the PLCs)
sudo apt install -y chrony
sudo tee /etc/chrony/chrony.conf << 'EOF'
server time.windows.com iburst
server pool.ntp.org iburst
makestep 1.0 3
rtcsync
EOF
sudo systemctl restart chrony
# Automatic security updates
sudo apt install -y unattended-upgrades
sudo dpkg-reconfigure -plow unattended-upgrades
RAID1 is essential: a single drive failure should not take down the analytics platform. Configure iDRAC email alerts to the MSP monitoring system. TimescaleDB retention policies automatically purge raw telemetry after 90 days to manage disk usage; aggregated data is retained indefinitely. Set PostgreSQL max_connections to 100 and shared_buffers to 8GB (25% of 32GB RAM) for optimal performance. Schedule weekly pg_dump backups to Azure Blob Storage for disaster recovery.
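A quick back-of-the-envelope check confirms the 90-day hot window fits comfortably on the 1TB RAID1 mirror. The machine count, tags per machine, blended sample rate, and per-row overhead below are assumptions to replace with actual survey values:

```python
# Rough disk-sizing estimate for the machine_telemetry hot window.
# All constants are assumptions; substitute values from the site survey.
MACHINES = 12
METRICS_PER_MACHINE = 20        # tags sampled into machine_telemetry
AVG_SAMPLE_PERIOD_S = 5         # blended rate across 1 s / 10 s / 60 s tags
BYTES_PER_ROW = 120             # row plus index overhead, uncompressed estimate
RETENTION_DAYS = 90

rows_per_day = MACHINES * METRICS_PER_MACHINE * (86_400 // AVG_SAMPLE_PERIOD_S)
total_bytes = rows_per_day * RETENTION_DAYS * BYTES_PER_ROW
print(f"{rows_per_day:,} rows/day, ~{total_bytes / 1e9:.0f} GB raw over {RETENTION_DAYS} days")
# → roughly 4.1M rows/day and ~45 GB uncompressed under these assumptions
```

TimescaleDB's native compression typically shrinks this further, so even doubling the tag count leaves ample headroom on the 1TB mirror.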
Step 7: Data Pipeline — Edge to On-Premise to Cloud
Build the data pipeline that flows production data from machines through the edge gateway to both the on-premise TimescaleDB and the Azure cloud. Node-RED on the edge gateway subscribes to MQTT topics from KEPServerEX, normalizes the data, writes to TimescaleDB via PostgreSQL, and forwards to Azure IoT Hub for cloud analytics. Additionally, configure a Python script to pull data from the MachineMetrics API and merge it into the same TimescaleDB schema.
az iot hub device-identity create --hub-name <iot-hub-name> --device-id edge-gateway-prod
az iot hub device-identity connection-string show --hub-name <iot-hub-name> --device-id edge-gateway-prod
# deploy to /opt/analytics/scripts/mm_sync.py on Dell PowerEdge T360
sudo mkdir -p /opt/analytics/scripts /var/log/analytics
cat > /opt/analytics/scripts/mm_sync.py << 'PYEOF'
#!/usr/bin/env python3
"""Sync MachineMetrics data to local TimescaleDB every 5 minutes."""
import logging
import os
from datetime import datetime, timedelta

import psycopg2
import requests

logging.basicConfig(level=logging.INFO, filename='/var/log/analytics/mm_sync.log')
logger = logging.getLogger(__name__)

MM_API_KEY = os.environ.get('MM_API_KEY', 'YOUR_API_KEY')
MM_BASE_URL = 'https://api.machinemetrics.com/v1'
DB_CONN = os.environ.get('DB_CONN', 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS')


def sync_production_data():
    """Fetch the last 10 minutes of production data from MachineMetrics."""
    conn = psycopg2.connect(DB_CONN)
    cur = conn.cursor()
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(minutes=10)
    headers = {'Authorization': f'Bearer {MM_API_KEY}', 'Content-Type': 'application/json'}
    # Fetch machines
    resp = requests.get(f'{MM_BASE_URL}/machines', headers=headers, timeout=30)
    machines = resp.json()
    for machine in machines:
        machine_id = machine.get('id')
        machine_name = machine.get('name', f'machine_{machine_id}')
        # Fetch production metrics
        params = {
            'machineId': machine_id,
            'startDate': start_time.isoformat() + 'Z',
            'endDate': end_time.isoformat() + 'Z'
        }
        try:
            prod_resp = requests.get(f'{MM_BASE_URL}/report/production',
                                     headers=headers, params=params, timeout=30)
            if prod_resp.status_code == 200:
                data = prod_resp.json()
                for record in data.get('data', []):
                    cur.execute(
                        """INSERT INTO production_events
                           (time, machine_id, event_type, good_count, reject_count, cycle_time_ms)
                           VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING""",
                        (record.get('timestamp'), machine_name, 'cycle_complete',
                         record.get('goodCount', 0), record.get('rejectCount', 0),
                         record.get('cycleTime'))
                    )
        except Exception as e:
            logger.error(f'Error syncing machine {machine_name}: {e}')
    conn.commit()
    cur.close()
    conn.close()
    logger.info(f'Sync completed at {datetime.utcnow()}')


if __name__ == '__main__':
    sync_production_data()
PYEOF
chmod +x /opt/analytics/scripts/mm_sync.py
(crontab -l 2>/dev/null; echo '*/5 * * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/mm_sync.py') | crontab -
(crontab -l 2>/dev/null; echo '*/15 * * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/erp_sync.py') | crontab -
The Node-RED flow is the critical real-time pipeline; ensure it has error handling for database connection failures with automatic retry. The MachineMetrics API sync runs every 5 minutes as a batch supplement, capturing data from machines connected directly to MachineMetrics that bypass KEPServerEX. Customize the ERP sync script for the client's specific ERP system and available APIs. Store all API keys and database passwords in environment variables, never hardcoded. Monitor the cron jobs with a simple health check that alerts the MSP if a sync hasn't run in 15 minutes.
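The health check can be as simple as a separate cron entry that watches the sync log's modification time. A sketch; the log path matches the sync script above, but the alerting hook is a placeholder to wire into the MSP's RMM/PSA tooling:

```python
#!/usr/bin/env python3
"""Alert if mm_sync.py has not written its log recently.
The alert action below is a placeholder; replace the print with an
RMM API call, email, or webhook POST."""
import os
import time

LOG_PATH = "/var/log/analytics/mm_sync.log"
MAX_AGE_S = 15 * 60  # alert if no sync activity in 15 minutes


def check(log_path: str = LOG_PATH, max_age_s: int = MAX_AGE_S) -> bool:
    """Return True if the log exists and was touched within max_age_s."""
    try:
        age = time.time() - os.path.getmtime(log_path)
    except FileNotFoundError:
        return False
    return age <= max_age_s


if __name__ == "__main__":
    if not check():
        print("ALERT: MachineMetrics sync stale or log missing")
```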
Step 8: Grafana Dashboard Configuration — Shop Floor Real-Time Displays
Configure Grafana on the Dell PowerEdge T360 with production dashboards designed for three audiences: (1) operators on the shop floor viewing real-time machine status on Samsung tablets, (2) production supervisors monitoring shift performance, and (3) plant managers reviewing daily/weekly trends. Connect Grafana to TimescaleDB as the primary data source and create dashboard templates.
-- latest state per machine (Stat panel)
SELECT machine_id,
CASE WHEN machine_state = 'Running' THEN 1 WHEN machine_state = 'Idle' THEN 0.5 ELSE 0 END as status
FROM (
SELECT DISTINCT ON (machine_id) machine_id, machine_state
FROM production_events
WHERE event_type = 'state_change' AND time > now() - interval '5 minutes'
ORDER BY machine_id, time DESC
) latest_states;
-- rolling 8-hour OEE per machine (Gauge panel)
SELECT time_bucket('1 hour', time_bucket) as time, machine_id, oee * 100 as oee_pct
FROM oee_hourly
WHERE time_bucket > now() - interval '8 hours'
ORDER BY time;
-- hourly scrap percentage over 24 hours (Time series panel)
SELECT time_bucket('1 hour', time) as time,
(SUM(reject_count)::float / NULLIF(SUM(good_count) + SUM(reject_count), 0)) * 100 as scrap_pct
FROM production_events
WHERE event_type = 'cycle_complete' AND time > now() - interval '24 hours'
GROUP BY 1 ORDER BY 1;
-- top 10 downtime reasons over 7 days (Bar chart panel)
SELECT reason_category, SUM(duration_min) as total_minutes
FROM downtime_events
WHERE time_start > now() - interval '7 days'
GROUP BY reason_category
ORDER BY total_minutes DESC
LIMIT 10;

-- scrap quantity by category over 7 days (Pie chart panel)
SELECT scrap_category, SUM(quantity) as total_scrap
FROM scrap_events
WHERE time > now() - interval '7 days'
GROUP BY scrap_category
ORDER BY total_scrap DESC;

# Enable anonymous viewer access for the kiosk-mode tablets
sudo tee -a /etc/grafana/grafana.ini << 'EOF'
[auth.anonymous]
enabled = true
org_name = Main Org.
org_role = Viewer
EOF
sudo systemctl restart grafana-server
Design shop-floor dashboards with large fonts, high contrast, and minimal text — operators glance at them from 10+ feet away. Use color coding: green = running, yellow = idle, red = down. The Samsung Galaxy Tab Active5 in kiosk mode prevents operators from navigating away from the dashboard. Create separate dashboards for each audience — operators need real-time status, supervisors need shift summaries, managers need weekly trends. Set Grafana auto-refresh to 10 seconds for shop-floor displays and 5 minutes for management dashboards.
Step 9: Power BI Dashboard Configuration — Executive Analytics
Configure Power BI Pro dashboards for management-level analytics connecting to Azure SQL Database (cloud data) and optionally to the on-premise TimescaleDB via Power BI Gateway. Build executive reports showing yield trends, scrap cost analysis, downtime root cause Pareto charts, and OEE benchmarking across machines and lines. These dashboards focus on business impact (dollars lost, trends over weeks/months) rather than real-time machine status.
# Key OEE and scrap cost calculations for Power BI
OEE = Availability * Performance * Quality
Availability = Run Time / Planned Production Time
Performance = (Ideal Cycle Time * Total Count) / Run Time
Quality = Good Count / Total Count
Scrap Rate = Reject Count / Total Count
Scrap Cost = SUM(ScrapEvents[quantity] * ScrapEvents[material_cost])
MTBF = Total Operating Time / Number of Failures
MTTR = Total Repair Time / Number of Failures
Power BI Pro requires Microsoft 365 licenses for each user who needs interactive access. Read-only consumers can use Power BI Free if reports are published to a Premium workspace. Schedule data refreshes during low-production times (early morning) to minimize load on TimescaleDB. The Power BI Gateway runs only as a Windows service, so it cannot be installed directly on the Linux-based Dell T360; run it in a lightweight Windows VM or on a separate small Windows PC with network access to TimescaleDB. For DAX measures, work with the client's financial team to get accurate material costs and labor rates for scrap cost calculations.
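The DAX measures above can be sanity-checked with the same arithmetic in plain Python before publishing a report. Every number below is illustrative, not client data:

```python
# Worked OEE example mirroring the Power BI measures above (all numbers made up).
planned_time_min = 480            # one 8-hour shift
downtime_min = 45
run_time_min = planned_time_min - downtime_min   # 435
ideal_cycle_time_min = 0.5        # 30 s ideal cycle per part
total_count = 800
good_count = 760

availability = run_time_min / planned_time_min                      # 0.90625
performance = (ideal_cycle_time_min * total_count) / run_time_min   # ~0.9195
quality = good_count / total_count                                  # 0.95
oee = availability * performance * quality
print(round(oee, 3))  # -> 0.792
```

Running the same inputs through the published report and through this script is a quick way to catch a mis-typed DAX measure.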
Step 10: Vibration Sensor Installation — ifm VVB001 and Banner Wireless
Install vibration and temperature sensors on critical machines that lack built-in condition monitoring. The ifm VVB001 IO-Link sensors are hardwired to machines with PLC connectivity; the Banner Engineering wireless nodes are deployed on auxiliary equipment. These sensors provide the data needed for predictive maintenance ML models that correlate machine health degradation with increasing scrap rates and unplanned downtime.
# 1. ifm VVB001 Installation (8 sensors on critical machines)
# Physical mounting:
# - Clean mounting surface with isopropyl alcohol
# - Apply thin film of coupling grease
# - Use M8 stud mount (included) directly on bearing housing
# - Torque to 5 Nm
# - Route M12 cable to nearest PLC I/O module or IO-Link master
# IO-Link Configuration (via ifm moneo or PLC):
# - Set measurement mode: Velocity RMS (mm/s) + Acceleration Peak (g)
# - Set sampling rate: 1 Hz continuous
# - Set alarm thresholds per ISO 10816:
# * Good: < 2.8 mm/s (Zone A)
# * Satisfactory: 2.8 - 7.1 mm/s (Zone B)
# * Unsatisfactory: 7.1 - 18.0 mm/s (Zone C)
# * Unacceptable: > 18.0 mm/s (Zone D) -> ALARM
# PLC Tag Mapping (add to KEPServerEX):
# {Machine}_Vibration_Velocity_mms
# {Machine}_Vibration_Acceleration_g
# {Machine}_Temperature_C
# {Machine}_Vibration_Alarm (BOOL)
# 2. Banner Engineering Wireless Sensor Installation (4 nodes)
# Physical mounting:
# - Mount sensor on bearing housing or motor casing using supplied adhesive mount or M6 stud
# - Orient sensor with arrow pointing toward primary vibration axis
# - Ensure line-of-sight to DXM700 gateway (or within mesh range)
# DXM700 Gateway Configuration:
# - Access web interface at assigned IP
# - Add each wireless node by serial number
# - Configure reporting interval: 60 seconds
# - Configure output: Modbus TCP register map
# - Assign Modbus registers:
# Register 40001-40002: Node 1 Vibration (FLOAT32)
# Register 40003-40004: Node 1 Temperature (FLOAT32)
# Register 40005-40006: Node 2 Vibration (FLOAT32)
# ...
# 3. Add Banner Modbus data to KEPServerEX:
# Channel: Modbus_Wireless
# Driver: Modbus TCP/IP
# Device: BannerGW, IP: <DXM700 IP>, Port: 502
# Tags: Map all holding registers to named tags
# 4. Verify sensor data flowing through pipeline:
mosquitto_sub -h 10.200.1.10 -t 'kepware/Modbus_Wireless/#' -u edgeuser -P <password> -v
# Expected: vibration and temperature values updating every 60 seconds
# Verify in TimescaleDB:
sudo -u postgres psql -d production_analytics -c "SELECT * FROM machine_telemetry WHERE metric_name LIKE '%Vibration%' ORDER BY time DESC LIMIT 10;"
Sensor placement is critical — mount on the bearing housing as close to the bearing as possible, not on sheet metal covers or guards. For CNC machines, mount on the spindle bearing housing. For pumps, mount on the drive-end and non-drive-end bearings. Take baseline vibration readings immediately after installation when the machine is known to be in good condition — these baselines are essential for the ML models. The ifm VVB001 requires an IO-Link master or 4–20mA analog input on the PLC. If the PLC has no available I/O, use an ifm AL1100 IO-Link master connected via Ethernet to KEPServerEX. Document sensor locations with photographs for the maintenance team.
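The ISO 10816 zone thresholds configured in the sensors above are worth encoding once so baseline readings and later alerts use identical boundaries. A minimal sketch — the thresholds come from the step above; the baseline helper and its output shape are assumptions:

```python
import statistics

def iso10816_zone(velocity_rms_mms):
    """Map velocity RMS (mm/s) to the ISO 10816 zones used for the VVB001 alarms above."""
    if velocity_rms_mms < 2.8:
        return 'A'   # Good
    if velocity_rms_mms < 7.1:
        return 'B'   # Satisfactory
    if velocity_rms_mms < 18.0:
        return 'C'   # Unsatisfactory
    return 'D'       # Unacceptable -> alarm

def capture_baseline(readings_mms):
    """Summarize healthy-machine readings taken right after installation."""
    mean = statistics.mean(readings_mms)
    return {'mean': mean, 'std': statistics.pstdev(readings_mms),
            'zone': iso10816_zone(mean)}

print(capture_baseline([1.9, 2.1, 2.0]))  # healthy spindle -> zone 'A'
```

Store the baseline dict alongside the sensor's documented location so later drift is measured against a known-good state.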
Step 11: ERP Integration — Work Order and Cost Data Sync
Build the integration between the client's ERP system and the analytics platform to enrich production data with work order context, bill of materials (BOM), part costs, and customer information. This enables calculating the financial impact of scrap and downtime, and correlating yield issues with specific products, materials, or suppliers. The specific integration method depends on the client's ERP; this step covers the most common scenarios.
# ERP Integration Script Template
# Deploy on Dell PowerEdge T360: /opt/analytics/scripts/erp_sync.py
cat > /opt/analytics/scripts/erp_sync.py << 'PYEOF'
#!/usr/bin/env python3
"""
ERP Data Sync - Pulls work orders, BOM, and cost data into production_analytics DB.
Customize the fetch_from_erp() function for the client's specific ERP system.
"""
import psycopg2
import requests
import pyodbc # For ODBC connections to ERP databases
import os
import logging
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO, filename='/var/log/analytics/erp_sync.log')
logger = logging.getLogger(__name__)
DB_CONN = os.environ.get('DB_CONN', 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS')
# ===== OPTION A: REST API (for Epicor Kinetic, Dynamics 365, NetSuite) =====
def fetch_work_orders_api():
"""Fetch active work orders from ERP REST API."""
erp_url = os.environ.get('ERP_API_URL', 'https://erp.client.com/api/v1')
erp_key = os.environ.get('ERP_API_KEY', 'YOUR_KEY')
headers = {'Authorization': f'Bearer {erp_key}', 'Content-Type': 'application/json'}
# Adjust endpoint for specific ERP
since = (datetime.utcnow() - timedelta(hours=1)).isoformat()
resp = requests.get(f'{erp_url}/workorders?status=active&modified_after={since}', headers=headers, timeout=30)
resp.raise_for_status()
return resp.json().get('data', [])
# ===== OPTION B: ODBC/SQL (for JobBOSS2, E2 Shop, SAP Business One) =====
def fetch_work_orders_sql():
"""Fetch work orders directly from ERP database via ODBC."""
conn_str = os.environ.get('ERP_ODBC', 'DRIVER={ODBC Driver 18 for SQL Server};SERVER=erp-server;DATABASE=erp_db;UID=readonly;PWD=CHANGE_THIS')
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.execute("""
SELECT WorkOrderNumber, PartNumber, PartDescription, OrderQty, CompletedQty,
ScrapQty, MaterialCost, LaborRate, CustomerName, DueDate, Status
FROM WorkOrders
WHERE Status IN ('Active', 'In Progress')
AND ModifiedDate > DATEADD(hour, -1, GETDATE())
""")
rows = cursor.fetchall()
conn.close()
return rows
def sync_to_timescaledb(work_orders):
"""Write work order data to local analytics database."""
conn = psycopg2.connect(DB_CONN)
cur = conn.cursor()
for wo in work_orders:
cur.execute("""
INSERT INTO work_orders (
work_order, part_number, part_description, order_qty,
completed_qty, scrap_qty, material_cost_per_unit,
labor_rate_per_hour, customer_name, due_date, status, last_synced
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (work_order) DO UPDATE SET
completed_qty = EXCLUDED.completed_qty,
scrap_qty = EXCLUDED.scrap_qty,
status = EXCLUDED.status,
last_synced = NOW()
""", (
wo.get('work_order'), wo.get('part_number'), wo.get('part_description'),
wo.get('order_qty'), wo.get('completed_qty'), wo.get('scrap_qty'),
wo.get('material_cost'), wo.get('labor_rate'), wo.get('customer_name'),
wo.get('due_date'), wo.get('status')
))
conn.commit()
cur.close()
conn.close()
logger.info(f'Synced {len(work_orders)} work orders at {datetime.utcnow()}')
if __name__ == '__main__':
try:
# Use API or SQL depending on ERP type
work_orders = fetch_work_orders_api() # or fetch_work_orders_sql()
sync_to_timescaledb(work_orders)
except Exception as e:
logger.error(f'ERP sync failed: {e}')
PYEOF
# Create work_orders table in TimescaleDB
sudo -u postgres psql -d production_analytics << 'EOSQL'
CREATE TABLE IF NOT EXISTS work_orders (
work_order TEXT PRIMARY KEY,
part_number TEXT,
part_description TEXT,
order_qty INTEGER,
completed_qty INTEGER,
scrap_qty INTEGER,
material_cost_per_unit DOUBLE PRECISION,
labor_rate_per_hour DOUBLE PRECISION,
customer_name TEXT,
due_date DATE,
status TEXT,
last_synced TIMESTAMPTZ
);
CREATE INDEX idx_wo_part ON work_orders (part_number);
CREATE INDEX idx_wo_status ON work_orders (status);
EOSQL
# Schedule ERP sync every 15 minutes
(crontab -l 2>/dev/null; echo '*/15 * * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/erp_sync.py') | crontab -
ERP integration is the most client-specific part of this deployment — allocate extra time for this step. For JobBOSS² and E2 Shop (very common in small CNC shops), use ODBC with a read-only SQL account. For Epicor Kinetic, use the REST API v2. For SAP Business One, use the Service Layer REST API. For Dynamics 365 Business Central, use OData/REST APIs with Azure AD OAuth2. NEVER write to the ERP database — read-only access only. Create a dedicated read-only database account with the client's DBA. Test the ERP sync thoroughly with the client's financial controller to verify cost data accuracy.
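One gap in the template above: `sync_to_timescaledb()` reads records with `wo.get(...)`, which works for the API path (dicts) but not for the raw pyodbc rows returned by `fetch_work_orders_sql()`. A small adapter normalizes the SQL path; the ERP-to-analytics column mapping here is an assumption — adjust it to the client's actual `WorkOrders` schema:

```python
def rows_to_dicts(cursor_description, rows):
    """Convert pyodbc row tuples into the dict shape sync_to_timescaledb() expects."""
    colmap = {  # ERP column -> analytics key; names are illustrative assumptions
        'WorkOrderNumber': 'work_order', 'PartNumber': 'part_number',
        'PartDescription': 'part_description', 'OrderQty': 'order_qty',
        'CompletedQty': 'completed_qty', 'ScrapQty': 'scrap_qty',
        'MaterialCost': 'material_cost', 'LaborRate': 'labor_rate',
        'CustomerName': 'customer_name', 'DueDate': 'due_date', 'Status': 'status',
    }
    erp_cols = [d[0] for d in cursor_description]  # column names from cursor.description
    return [{colmap.get(c, c): v for c, v in zip(erp_cols, row)} for row in rows]

# Usage inside fetch_work_orders_sql(), before conn.close():
#     rows = rows_to_dicts(cursor.description, cursor.fetchall())
```

With this in place, both the API and ODBC paths feed `sync_to_timescaledb()` the same record shape.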
Step 12: Azure Cloud Infrastructure Setup
Provision Azure cloud resources for long-term data storage, cloud-based analytics, and Power BI connectivity. Set up Azure IoT Hub for device management and message ingestion, Azure SQL Database for aggregated analytics data, Azure Blob Storage for raw data archival, and Azure Stream Analytics for real-time cloud processing. All resources provisioned through the MSP's Azure CSP subscription for the client.
# 1. Create Resource Group
az group create --name rg-manufacturing-analytics --location eastus
# 2. Create IoT Hub (S1 tier, 2 units)
az iot hub create \
--name iot-hub-<client-short-name>-prod \
--resource-group rg-manufacturing-analytics \
--sku S1 \
--unit 2 \
--location eastus
# 3. Register edge gateway as IoT device
az iot hub device-identity create \
--hub-name iot-hub-<client-short-name>-prod \
--device-id edge-gateway-prod \
--auth-method shared_private_key
# Get connection string for edge gateway
az iot hub device-identity connection-string show \
--hub-name iot-hub-<client-short-name>-prod \
--device-id edge-gateway-prod \
--output tsv
# 4. Create Azure SQL Database
az sql server create \
--name sql-mfg-<client-short-name>-prod \
--resource-group rg-manufacturing-analytics \
--location eastus \
--admin-user sqladmin \
--admin-password '<STRONG_PASSWORD>'
az sql db create \
--name production-analytics \
--resource-group rg-manufacturing-analytics \
--server sql-mfg-<client-short-name>-prod \
--compute-model Provisioned \
--edition GeneralPurpose \
--family Gen5 \
--capacity 2 \
--max-size 32GB
# Configure firewall to allow Azure services
az sql server firewall-rule create \
--resource-group rg-manufacturing-analytics \
--server sql-mfg-<client-short-name>-prod \
--name AllowAzureServices \
--start-ip-address 0.0.0.0 \
--end-ip-address 0.0.0.0
# 5. Create Blob Storage for data archival
az storage account create \
--name stmfg<client>prod \
--resource-group rg-manufacturing-analytics \
--location eastus \
--sku Standard_LRS \
--kind StorageV2
az storage container create \
--name raw-telemetry \
--account-name stmfg<client>prod
az storage container create \
--name ml-training-data \
--account-name stmfg<client>prod
# Set lifecycle management for cost optimization
az storage account management-policy create \
--account-name stmfg<client>prod \
--resource-group rg-manufacturing-analytics \
--policy '{"rules":[{"name":"archive-old-data","type":"Lifecycle","definition":{"actions":{"baseBlob":{"tierToCool":{"daysAfterModificationGreaterThan":90},"tierToArchive":{"daysAfterModificationGreaterThan":365}}},"filters":{"blobTypes":["blockBlob"],"prefixMatch":["raw-telemetry/"]}}}]}'
# 6. Create Stream Analytics Job (optional, for real-time cloud processing)
az stream-analytics job create \
--name sa-mfg-<client-short-name>-prod \
--resource-group rg-manufacturing-analytics \
--location eastus \
--sku Standard
# 7. Configure diagnostic logging
az monitor diagnostic-settings create \
--name mfg-analytics-diagnostics \
--resource /subscriptions/<sub-id>/resourceGroups/rg-manufacturing-analytics/providers/Microsoft.Devices/IotHubs/iot-hub-<client-short-name>-prod \
--logs '[{"category":"Connections","enabled":true},{"category":"DeviceTelemetry","enabled":true}]' \
--storage-account stmfg<client>prod
Use the MSP's Azure CSP subscription to provision all resources — this allows centralized billing and management. For defense contractors or ITAR-regulated manufacturers, use Azure Government regions (usgovvirginia, usgovarizona) instead of commercial Azure. Enable Azure Defender for IoT Hub for security monitoring. Set up budget alerts at 80% and 100% of estimated monthly spend to prevent cost overruns. The Stream Analytics job is optional for the initial deployment — it can be added later for real-time cloud-side anomaly detection.
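Before settling on the S1 × 2-unit IoT Hub sizing above, run a quick message-quota check. Azure's published quota is 400,000 device-to-cloud messages per day per S1 unit (verify against current Azure pricing before relying on it); the machine count and message rate below are illustrative assumptions:

```python
# IoT Hub capacity sanity check; machine counts and rates are made up.
S1_MSGS_PER_UNIT_PER_DAY = 400_000   # published S1 quota -- confirm before relying on it
units = 2
machines = 12
msgs_per_machine_per_min = 4         # batched edge telemetry, an assumption

daily_msgs = machines * msgs_per_machine_per_min * 60 * 24
capacity = S1_MSGS_PER_UNIT_PER_DAY * units
print(daily_msgs, capacity, daily_msgs <= capacity)  # -> 69120 800000 True
```

If the edge gateway batches multiple tag updates per MQTT message (as the pipeline above does), even a full shop floor typically fits comfortably within one or two S1 units.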
Step 13: Custom ML Model Deployment — Root Cause Analysis Engine
Deploy the custom Python-based machine learning models on the Dell PowerEdge T360 that analyze production data to identify root causes of yield loss, excessive scrap, and unplanned downtime. This includes three ML components: (1) a downtime root cause classifier, (2) a scrap anomaly detector, and (3) a yield prediction model. Models are trained on historical data and retrained weekly as new data accumulates.
# 1. Create model directory structure
sudo mkdir -p /opt/analytics/models/{downtime_classifier,scrap_detector,yield_predictor}
sudo mkdir -p /opt/analytics/scripts
sudo mkdir -p /var/log/analytics
sudo chown -R mspadmin:mspadmin /opt/analytics /var/log/analytics
# 2. Deploy root cause analysis models (see custom_ai_components for full code)
# Files to create:
# /opt/analytics/scripts/train_models.py
# /opt/analytics/scripts/run_inference.py
# /opt/analytics/scripts/generate_insights.py
# 3. Initial model training (run after 30+ days of data collection)
source /opt/analytics/ml-env/bin/activate
python /opt/analytics/scripts/train_models.py --mode initial
deactivate
# 4. Schedule daily inference and weekly retraining
(crontab -l 2>/dev/null; echo '0 6 * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/run_inference.py >> /var/log/analytics/inference.log 2>&1') | crontab -
(crontab -l 2>/dev/null; echo '0 2 * * 0 /opt/analytics/ml-env/bin/python /opt/analytics/scripts/train_models.py --mode retrain >> /var/log/analytics/training.log 2>&1') | crontab -
# 5. Test inference
source /opt/analytics/ml-env/bin/activate
python /opt/analytics/scripts/run_inference.py --test
# Expected output: JSON with predicted downtime causes, scrap risk scores, yield predictions
deactivate
Do NOT deploy ML models until you have at least 30 days of clean production data. The first 30 days should focus on data collection, validation, and establishing baselines. Start with simple statistical analysis (Pareto charts, control charts) before introducing ML — clients need to trust the data before they trust the models. Model accuracy will improve significantly after 90+ days of data. Schedule model retraining weekly on Sunday at 2 AM when production is typically idle. Monitor model drift by comparing prediction accuracy against actuals monthly.
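The monthly drift check mentioned above can be as simple as comparing recent classifier accuracy on labeled downtime events against the accuracy recorded at training time (stored in `metadata.json` by the training script). The 10-point tolerance below is an assumed threshold — tune it per client:

```python
def accuracy(prediction_actual_pairs):
    """Fraction of (predicted, actual) label pairs that match."""
    pairs = list(prediction_actual_pairs)
    return sum(p == a for p, a in pairs) / len(pairs)

def drift_detected(baseline_acc, recent_acc, tolerance=0.10):
    """Flag drift when accuracy drops more than `tolerance` (10 points, assumed)
    below the accuracy measured right after training."""
    return (baseline_acc - recent_acc) > tolerance

# Illustrative month of operator-confirmed root causes vs model predictions
recent = [('Tooling', 'Tooling'), ('Material', 'Setup'),
          ('Tooling', 'Tooling'), ('Setup', 'Setup')]
print(accuracy(recent), drift_detected(0.92, accuracy(recent)))  # -> 0.75 True
```

When drift is flagged, trigger an out-of-cycle retrain rather than waiting for the weekly cron run.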
Step 14: System Integration Testing and Validation
Conduct comprehensive end-to-end testing of the entire analytics pipeline from sensor to dashboard. Verify data accuracy by comparing system-calculated OEE, scrap rates, and downtime against manually tracked values. Test all alert conditions, dashboard displays, and ML model outputs. Document all test results for client sign-off and compliance records.
# 1. Data pipeline integrity test
# Manually trigger a known event on a machine (e.g., press the cycle start button)
# Verify the event appears in:
# a. KEPServerEX Quick Client (within 1 second)
# b. Mosquitto MQTT subscription (within 2 seconds)
mosquitto_sub -h 10.200.1.10 -t 'kepware/#' -u edgeuser -P <password> -C 5 -v
# c. TimescaleDB (within 5 seconds)
sudo -u postgres psql -d production_analytics -c "SELECT * FROM production_events ORDER BY time DESC LIMIT 5;"
# d. Grafana dashboard (within 10 seconds)
# e. Azure IoT Hub (within 30 seconds)
az iot hub monitor-events --hub-name iot-hub-<client-short-name>-prod --device-id edge-gateway-prod --timeout 60
# f. Power BI (within 2 hours, after scheduled refresh)
# 2. OEE calculation validation
# Select one machine, one shift
# Manually calculate OEE from paper records:
# Availability = (Shift Time - Downtime) / Shift Time
# Performance = (Ideal Cycle Time * Parts Produced) / Run Time
# Quality = Good Parts / Total Parts
# OEE = A * P * Q
# Compare with system-calculated OEE — should be within 2% tolerance
# 3. Alert testing
# Simulate machine down event > 15 minutes
# Verify email/SMS alert fires to production supervisor
# Simulate scrap rate > 5% (manually enter reject events)
# Verify quality manager alert fires
# 4. Sensor data validation
# Compare ifm VVB001 readings with a reference vibration meter
# Readings should match within +/- 10%
# 5. ERP data validation
# Compare work order data in TimescaleDB with ERP screen
# Verify part numbers, quantities, and costs match exactly
# 6. Generate test report
echo 'Test Results Summary' > /opt/analytics/docs/test_report_$(date +%Y%m%d).txt
echo '====================' >> /opt/analytics/docs/test_report_$(date +%Y%m%d).txt
echo 'Date: '$(date) >> /opt/analytics/docs/test_report_$(date +%Y%m%d).txt
echo 'Data Pipeline Latency: ___ms (sensor to TimescaleDB)' >> /opt/analytics/docs/test_report_$(date +%Y%m%d).txt
echo 'OEE Accuracy: ___% (vs manual calculation)' >> /opt/analytics/docs/test_report_$(date +%Y%m%d).txt
echo 'Alert Response Time: ___seconds' >> /opt/analytics/docs/test_report_$(date +%Y%m%d).txt
echo 'Sensor Accuracy: +/- ___%' >> /opt/analytics/docs/test_report_$(date +%Y%m%d).txt
echo 'ERP Data Match: PASS/FAIL' >> /opt/analytics/docs/test_report_$(date +%Y%m%d).txt
Validation against manual records is the single most important step for client buy-in. If the system shows different OEE numbers than what operators believe, investigate the discrepancy immediately — it could be a data issue or it could reveal that manual tracking was inaccurate (common). Document all test results with screenshots for compliance requirements (ISO 9001, 21 CFR Part 11). Have the plant manager and quality manager sign off on test results before declaring the system production-ready.
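The manual OEE validation above reduces to a few lines of Python, using ideal cycle time consistent with the Power BI measures. The shift numbers are illustrative, and the tolerance check reads "within 2%" as absolute OEE points — confirm that interpretation with the client:

```python
def manual_oee(shift_min, downtime_min, ideal_cycle_min, total_parts, good_parts):
    """OEE computed from paper shift records: Availability * Performance * Quality."""
    run_time = shift_min - downtime_min
    availability = run_time / shift_min
    performance = (ideal_cycle_min * total_parts) / run_time
    quality = good_parts / total_parts
    return availability * performance * quality

def within_tolerance(system_oee, manual, tol=0.02):
    # 'within 2%' interpreted as absolute OEE points, an assumption to confirm
    return abs(system_oee - manual) <= tol

oee = manual_oee(480, 45, 0.5, 800, 760)           # illustrative shift data
print(round(oee, 3), within_tolerance(0.78, oee))  # -> 0.792 True
```

Record both numbers and the pass/fail result in the test report generated above.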
Custom AI Components
Downtime Root Cause Classifier
Type: skill
A supervised machine learning model that classifies unplanned downtime events into root cause categories and identifies the most likely underlying factors. The model uses production context (machine, part number, shift, operator, preceding sensor readings, time since last maintenance) to predict the root cause category when operators fail to enter reason codes, and to validate operator-entered codes against sensor evidence. It uses a Random Forest classifier with SHAP explainability.
Implementation:
#!/usr/bin/env python3
"""
Downtime Root Cause Classifier
File: /opt/analytics/models/downtime_classifier/train.py
Trains a Random Forest classifier on historical downtime events
to predict root cause categories from production context features.
"""
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
from imblearn.over_sampling import SMOTE
import shap
import joblib
import psycopg2
import json
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
MODEL_DIR = '/opt/analytics/models/downtime_classifier'
DB_CONN = 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS'
def load_training_data():
"""Load and merge downtime events with sensor context."""
conn = psycopg2.connect(DB_CONN)
# Load downtime events with reason codes (labeled data)
downtime_df = pd.read_sql("""
SELECT d.time_start, d.time_end, d.machine_id, d.duration_min,
d.reason_category, d.reason_detail, d.shift, d.operator_id,
d.work_order, w.part_number, w.material_cost_per_unit
FROM downtime_events d
LEFT JOIN work_orders w ON d.work_order = w.work_order
WHERE d.reason_category IS NOT NULL
AND d.reason_category != ''
AND d.duration_min > 1
""", conn)
# For each downtime event, get preceding sensor readings (5 min before)
enriched_rows = []
for _, row in downtime_df.iterrows():
# Parameterized query avoids SQL injection from machine IDs or timestamps
sensor_df = pd.read_sql("""
SELECT metric_name, AVG(value) as avg_val,
STDDEV(value) as std_val, MAX(value) as max_val
FROM machine_telemetry
WHERE machine_id = %(machine_id)s
AND time BETWEEN %(t0)s::timestamptz - interval '5 minutes'
AND %(t0)s::timestamptz
GROUP BY metric_name
""", conn, params={'machine_id': row['machine_id'], 't0': str(row['time_start'])})
sensor_features = {}
for _, s in sensor_df.iterrows():
name = s['metric_name'].replace(' ', '_')
sensor_features[f'{name}_avg'] = s['avg_val']
sensor_features[f'{name}_std'] = s['std_val']
sensor_features[f'{name}_max'] = s['max_val']
enriched_row = row.to_dict()
enriched_row.update(sensor_features)
enriched_rows.append(enriched_row)
conn.close()
return pd.DataFrame(enriched_rows)
def engineer_features(df):
"""Create features for the classifier."""
features = pd.DataFrame()
# Temporal features
df['time_start'] = pd.to_datetime(df['time_start'])
features['hour_of_day'] = df['time_start'].dt.hour
features['day_of_week'] = df['time_start'].dt.dayofweek
features['is_night_shift'] = ((df['time_start'].dt.hour >= 23) | (df['time_start'].dt.hour < 6)).astype(int)
# Duration feature
features['duration_min'] = df['duration_min']
# Encode categorical features
le_machine = LabelEncoder()
features['machine_encoded'] = le_machine.fit_transform(df['machine_id'].fillna('unknown'))
joblib.dump(le_machine, f'{MODEL_DIR}/le_machine.pkl')
le_shift = LabelEncoder()
features['shift_encoded'] = le_shift.fit_transform(df['shift'].fillna('unknown'))
joblib.dump(le_shift, f'{MODEL_DIR}/le_shift.pkl')
le_part = LabelEncoder()
features['part_encoded'] = le_part.fit_transform(df['part_number'].fillna('unknown'))
joblib.dump(le_part, f'{MODEL_DIR}/le_part.pkl')
# Sensor features (vibration, temperature, etc.)
sensor_cols = [c for c in df.columns if c.endswith(('_avg', '_std', '_max'))]
for col in sensor_cols:
features[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
# Material cost (proxy for part complexity)
features['material_cost'] = pd.to_numeric(df['material_cost_per_unit'], errors='coerce').fillna(0)
return features
def train_model():
"""Train the downtime root cause classifier."""
logger.info('Loading training data...')
df = load_training_data()
if len(df) < 100:
logger.warning(f'Only {len(df)} labeled events. Need 100+ for reliable model. Using rule-based fallback.')
return None
logger.info(f'Training on {len(df)} downtime events')
# Prepare features and target
X = engineer_features(df)
le_target = LabelEncoder()
y = le_target.fit_transform(df['reason_category'])
joblib.dump(le_target, f'{MODEL_DIR}/le_target.pkl')
# Handle class imbalance with SMOTE (needs at least 2 samples in the rarest class)
min_class = int(pd.Series(y).value_counts().min())
if min_class > 1:
    smote = SMOTE(random_state=42, k_neighbors=min(5, min_class - 1))
    X_resampled, y_resampled = smote.fit_resample(X.fillna(0), y)
else:
    X_resampled, y_resampled = X.fillna(0), y
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(X_resampled, y_resampled, test_size=0.2, random_state=42)
# Train Random Forest
model = RandomForestClassifier(
n_estimators=200,
max_depth=15,
min_samples_split=5,
min_samples_leaf=2,
class_weight='balanced',
random_state=42,
n_jobs=-1
)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
report = classification_report(y_test, y_pred, target_names=le_target.classes_, output_dict=True)
logger.info(f'Model accuracy: {report["accuracy"]:.3f}')
logger.info(f'Classification Report:\n{classification_report(y_test, y_pred, target_names=le_target.classes_)}')
# SHAP explainability
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test[:100])
# Save model and artifacts
joblib.dump(model, f'{MODEL_DIR}/rf_model.pkl')
joblib.dump(explainer, f'{MODEL_DIR}/shap_explainer.pkl')
# Save feature importance
importance = pd.DataFrame({
'feature': X.columns,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
importance.to_csv(f'{MODEL_DIR}/feature_importance.csv', index=False)
# Save metadata
metadata = {
'trained_at': datetime.utcnow().isoformat(),
'training_samples': len(X_resampled),
'features': list(X.columns),
'classes': list(le_target.classes_),
'accuracy': float(report['accuracy']),
'f1_weighted': float(report['weighted avg']['f1-score'])
}
with open(f'{MODEL_DIR}/metadata.json', 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f'Model saved to {MODEL_DIR}')
return model
if __name__ == '__main__':
train_model()
Scrap Anomaly Detector
Type: skill
An unsupervised anomaly detection model that monitors real-time scrap rates and identifies statistically significant deviations from normal patterns. Uses Isolation Forest for multivariate anomaly detection across scrap rate, reject counts, cycle time deviation, and sensor readings. When anomalies are detected, the system generates alerts with correlated potential causes by analyzing which process parameters deviated simultaneously.
Implementation:
#!/usr/bin/env python3
"""
Scrap Anomaly Detector
File: /opt/analytics/models/scrap_detector/detect.py
Monitors scrap rates and identifies anomalies using Isolation Forest.
Correlates anomalies with process parameter deviations.
"""
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import psycopg2
import joblib
import json
import logging
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
MODEL_DIR = '/opt/analytics/models/scrap_detector'
DB_CONN = 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS'
def load_production_data(hours_back=24*30):
"""Load production and sensor data for training/inference."""
conn = psycopg2.connect(DB_CONN)
# Hourly aggregated production metrics
production_df = pd.read_sql(f"""
SELECT
time_bucket('1 hour', time) as hour,
machine_id,
SUM(good_count) as good_parts,
SUM(reject_count) as reject_parts,
AVG(cycle_time_ms) as avg_cycle_time,
STDDEV(cycle_time_ms) as std_cycle_time,
COUNT(*) as cycle_count
FROM production_events
WHERE event_type = 'cycle_complete'
AND time > NOW() - interval '{hours_back} hours'
GROUP BY 1, 2
HAVING SUM(good_count) + SUM(reject_count) > 0
ORDER BY 1
""", conn)
# Hourly sensor averages
sensor_df = pd.read_sql(f"""
SELECT
time_bucket('1 hour', time) as hour,
machine_id,
metric_name,
AVG(value) as avg_value,
STDDEV(value) as std_value,
MAX(value) as max_value,
MIN(value) as min_value
FROM machine_telemetry
WHERE time > NOW() - interval '{hours_back} hours'
GROUP BY 1, 2, 3
""", conn)
conn.close()
# Pivot sensor data to wide format
if not sensor_df.empty:
sensor_pivot = sensor_df.pivot_table(
index=['hour', 'machine_id'],
columns='metric_name',
values=['avg_value', 'std_value', 'max_value'],
aggfunc='first'
)
sensor_pivot.columns = [f'{col[1]}_{col[0]}' for col in sensor_pivot.columns]
sensor_pivot = sensor_pivot.reset_index()
merged = production_df.merge(sensor_pivot, on=['hour', 'machine_id'], how='left')
else:
merged = production_df
return merged
def engineer_scrap_features(df):
"""Create features for anomaly detection."""
features = pd.DataFrame()
# Core scrap metrics
total_parts = df['good_parts'] + df['reject_parts']
features['scrap_rate'] = df['reject_parts'] / total_parts.replace(0, np.nan)
features['reject_count'] = df['reject_parts']
# Cycle time metrics
features['avg_cycle_time'] = df['avg_cycle_time']
features['cycle_time_variability'] = df['std_cycle_time'] / df['avg_cycle_time'].replace(0, np.nan)
features['throughput'] = df['cycle_count']
# Temporal features
df['hour'] = pd.to_datetime(df['hour'])
features['hour_of_day'] = df['hour'].dt.hour
features['is_shift_start'] = df['hour'].dt.hour.isin([6, 14, 22]).astype(int)
# Sensor features (dynamic based on available sensors)
sensor_cols = [c for c in df.columns if any(x in c for x in ['Vibration', 'Temperature', 'Pressure', 'Speed'])]
for col in sensor_cols:
features[col] = pd.to_numeric(df[col], errors='coerce')
return features.fillna(0)
def train_anomaly_detector():
"""Train Isolation Forest on historical production data."""
logger.info('Loading training data for anomaly detector...')
df = load_production_data(hours_back=24*60) # 60 days of history
if len(df) < 200:
logger.warning(f'Only {len(df)} hourly records. Need 200+ for reliable model.')
return None
features = engineer_scrap_features(df)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(features)
model = IsolationForest(
n_estimators=200,
max_samples='auto',
contamination=0.05, # Expect ~5% of data points to be anomalous
random_state=42,
n_jobs=-1
)
model.fit(X_scaled)
joblib.dump(model, f'{MODEL_DIR}/iso_forest.pkl')
joblib.dump(scaler, f'{MODEL_DIR}/scaler.pkl')
metadata = {
'trained_at': datetime.utcnow().isoformat(),
'training_samples': len(features),
'features': list(features.columns),
'contamination': 0.05
}
with open(f'{MODEL_DIR}/metadata.json', 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f'Anomaly detector trained on {len(features)} samples')
return model
def detect_anomalies(hours_back=24):
"""Run anomaly detection on recent production data."""
model = joblib.load(f'{MODEL_DIR}/iso_forest.pkl')
scaler = joblib.load(f'{MODEL_DIR}/scaler.pkl')
df = load_production_data(hours_back=hours_back)
if df.empty:
return []
features = engineer_scrap_features(df)
X_scaled = scaler.transform(features)
# Predict: -1 = anomaly, 1 = normal
predictions = model.predict(X_scaled)
scores = model.decision_function(X_scaled)
anomalies = []
anomaly_mask = predictions == -1
for idx in df[anomaly_mask].index:
row = df.loc[idx]
feat_row = features.loc[idx]
# Identify which features are most anomalous
feature_z_scores = {}
        for col_idx, col in enumerate(features.columns):
            z = abs(X_scaled[idx][col_idx])
            if z > 2.0:  # more than 2 standard deviations from the training mean
                feature_z_scores[col] = round(float(z), 2)
anomaly = {
'timestamp': str(row['hour']),
'machine_id': row['machine_id'],
'scrap_rate': round(float(feat_row.get('scrap_rate', 0)) * 100, 2),
'anomaly_score': round(float(scores[idx]), 4),
'severity': 'HIGH' if scores[idx] < -0.3 else 'MEDIUM',
'deviating_parameters': feature_z_scores,
'likely_causes': generate_cause_hypothesis(feature_z_scores, row)
}
anomalies.append(anomaly)
# Write anomalies to database
if anomalies:
write_anomalies_to_db(anomalies)
return anomalies
def generate_cause_hypothesis(deviating_params, production_row):
"""Generate human-readable cause hypotheses based on deviating parameters."""
causes = []
for param, z_score in sorted(deviating_params.items(), key=lambda x: -x[1]):
if 'Vibration' in param and z_score > 2.5:
causes.append(f'Elevated vibration (z={z_score}) may indicate bearing wear, imbalance, or loose mounting')
elif 'Temperature' in param and z_score > 2.5:
causes.append(f'Abnormal temperature (z={z_score}) may indicate coolant issue, friction, or overload')
elif 'cycle_time_variability' in param and z_score > 2.0:
causes.append(f'High cycle time variability (z={z_score}) suggests inconsistent material or tool wear')
elif 'scrap_rate' in param and z_score > 2.0:
causes.append(f'Scrap rate spike (z={z_score}) - check material batch, tool condition, and setup parameters')
elif 'is_shift_start' in param and z_score > 1.5:
causes.append('Anomaly coincides with shift change - possible setup or warmup issue')
if not causes:
causes.append('Multiple parameters deviating simultaneously - investigate holistically')
return causes
def write_anomalies_to_db(anomalies):
"""Write detected anomalies to the analytics database."""
conn = psycopg2.connect(DB_CONN)
cur = conn.cursor()
for a in anomalies:
cur.execute("""
INSERT INTO ml_anomalies (time, machine_id, anomaly_type, severity,
scrap_rate_pct, anomaly_score,
deviating_parameters, likely_causes)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (
a['timestamp'], a['machine_id'], 'scrap_anomaly', a['severity'],
a['scrap_rate'], a['anomaly_score'],
json.dumps(a['deviating_parameters']),
json.dumps(a['likely_causes'])
))
conn.commit()
cur.close()
conn.close()
logger.info(f'Wrote {len(anomalies)} anomalies to database')
if __name__ == '__main__':
import sys
if '--train' in sys.argv:
train_anomaly_detector()
else:
anomalies = detect_anomalies(hours_back=24)
        print(json.dumps(anomalies, indent=2))
Yield Prediction and Optimization Agent
Type: agent
A predictive model that forecasts expected yield for upcoming production runs based on historical patterns, current machine conditions, and environmental factors. Uses Facebook Prophet for time-series forecasting of yield trends combined with a gradient boosting model for run-specific yield prediction. Outputs predicted yield, confidence interval, and actionable recommendations to optimize yield for the upcoming shift.
Implementation:
# /opt/analytics/models/yield_predictor/predict.py
#!/usr/bin/env python3
"""
Yield Prediction & Optimization Agent
File: /opt/analytics/models/yield_predictor/predict.py
Forecasts expected yield and generates optimization recommendations.
"""
import pandas as pd
import numpy as np
from prophet import Prophet
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score
import psycopg2
import joblib
import json
import logging
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
MODEL_DIR = '/opt/analytics/models/yield_predictor'
DB_CONN = 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS'
def load_yield_history():
"""Load historical yield data aggregated by shift."""
conn = psycopg2.connect(DB_CONN)
df = pd.read_sql("""
SELECT
date_trunc('day', time) +
CASE
WHEN EXTRACT(hour FROM time) BETWEEN 6 AND 13 THEN interval '6 hours'
WHEN EXTRACT(hour FROM time) BETWEEN 14 AND 21 THEN interval '14 hours'
ELSE interval '22 hours'
END as shift_start,
machine_id,
SUM(good_count) as good_parts,
SUM(reject_count) as reject_parts,
AVG(cycle_time_ms) as avg_cycle_time,
STDDEV(cycle_time_ms) as std_cycle_time
FROM production_events
WHERE event_type = 'cycle_complete'
AND time > NOW() - interval '180 days'
GROUP BY 1, 2
HAVING SUM(good_count) + SUM(reject_count) > 10
ORDER BY 1
""", conn)
conn.close()
df['total_parts'] = df['good_parts'] + df['reject_parts']
df['yield_pct'] = (df['good_parts'] / df['total_parts']) * 100
return df
def train_prophet_forecast(df):
"""Train Prophet model for yield trend forecasting."""
# Aggregate across all machines for plant-level trend
daily = df.groupby(pd.to_datetime(df['shift_start']).dt.date).agg({
'good_parts': 'sum',
'reject_parts': 'sum'
}).reset_index()
daily.columns = ['ds', 'good_parts', 'reject_parts']
daily['ds'] = pd.to_datetime(daily['ds'])
daily['y'] = (daily['good_parts'] / (daily['good_parts'] + daily['reject_parts'])) * 100
model = Prophet(
changepoint_prior_scale=0.05,
seasonality_prior_scale=10,
daily_seasonality=False,
weekly_seasonality=True,
yearly_seasonality=False
)
model.add_country_holidays(country_name='US')
model.fit(daily[['ds', 'y']])
# Save model
with open(f'{MODEL_DIR}/prophet_model.json', 'w') as f:
from prophet.serialize import model_to_json
f.write(model_to_json(model))
return model
def train_yield_predictor(df):
"""Train gradient boosting model for per-run yield prediction."""
features = pd.DataFrame()
features['hour_of_day'] = pd.to_datetime(df['shift_start']).dt.hour
features['day_of_week'] = pd.to_datetime(df['shift_start']).dt.dayofweek
features['avg_cycle_time'] = df['avg_cycle_time']
features['cycle_time_cv'] = df['std_cycle_time'] / df['avg_cycle_time'].replace(0, np.nan)
features['total_parts'] = df['total_parts']
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
features['machine_encoded'] = le.fit_transform(df['machine_id'])
joblib.dump(le, f'{MODEL_DIR}/le_machine.pkl')
target = df['yield_pct']
# Remove NaN rows
valid_mask = features.notna().all(axis=1) & target.notna()
X = features[valid_mask].fillna(0)
y = target[valid_mask]
model = GradientBoostingRegressor(
n_estimators=200,
max_depth=6,
learning_rate=0.05,
subsample=0.8,
random_state=42
)
    scores = cross_val_score(model, X, y, cv=5, scoring='r2')  # note: TimeSeriesSplit is stricter for time-ordered data
logger.info(f'Yield predictor CV R\u00b2: {scores.mean():.3f} (+/- {scores.std():.3f})')
model.fit(X, y)
joblib.dump(model, f'{MODEL_DIR}/gb_model.pkl')
metadata = {
'trained_at': datetime.utcnow().isoformat(),
'training_samples': len(X),
'cv_r2_mean': round(float(scores.mean()), 3),
'cv_r2_std': round(float(scores.std()), 3),
'features': list(X.columns)
}
with open(f'{MODEL_DIR}/metadata.json', 'w') as f:
json.dump(metadata, f, indent=2)
return model
def generate_recommendations(current_conditions, predicted_yield, historical_df):
"""Generate actionable recommendations to improve yield."""
recommendations = []
# Compare current conditions to high-yield historical periods
high_yield = historical_df[historical_df['yield_pct'] > historical_df['yield_pct'].quantile(0.9)]
low_yield = historical_df[historical_df['yield_pct'] < historical_df['yield_pct'].quantile(0.1)]
if current_conditions.get('avg_cycle_time', 0) > high_yield['avg_cycle_time'].mean() * 1.1:
recommendations.append({
'priority': 'HIGH',
'area': 'Cycle Time',
'action': f"Current avg cycle time ({current_conditions.get('avg_cycle_time', 0):.0f}ms) is {((current_conditions.get('avg_cycle_time', 0) / high_yield['avg_cycle_time'].mean()) - 1) * 100:.1f}% above optimal. Check tool wear, feed rates, and material hardness.",
'expected_impact': f"Optimizing to {high_yield['avg_cycle_time'].mean():.0f}ms could improve yield by {high_yield['yield_pct'].mean() - predicted_yield:.1f}%"
})
if current_conditions.get('cycle_time_cv', 0) > 0.15:
recommendations.append({
'priority': 'HIGH',
'area': 'Process Stability',
'action': f"Cycle time coefficient of variation ({current_conditions.get('cycle_time_cv', 0):.2f}) indicates unstable process. Investigate material consistency, fixture clamping, and environmental factors.",
'expected_impact': 'Reducing variability below 0.10 historically correlates with 3-5% yield improvement'
})
hour = current_conditions.get('hour_of_day', 12)
if hour in [6, 7, 22, 23]: # Shift start hours
recommendations.append({
'priority': 'MEDIUM',
'area': 'Shift Transition',
'action': 'Current/upcoming shift change period. Ensure thorough handoff, machine warmup cycle, and first-article inspection.',
'expected_impact': 'Proper shift transitions reduce shift-start scrap by 30-50% historically'
})
return recommendations
def predict_yield():
"""Generate yield prediction for the next shift."""
df = load_yield_history()
# Prophet trend forecast
from prophet.serialize import model_from_json
with open(f'{MODEL_DIR}/prophet_model.json', 'r') as f:
prophet_model = model_from_json(f.read())
future = prophet_model.make_future_dataframe(periods=7)
forecast = prophet_model.predict(future)
    next_day_forecast = forecast.iloc[-7:][['ds', 'yhat', 'yhat_lower', 'yhat_upper']]  # 7-day trend window
# Current conditions for GB prediction
conn = psycopg2.connect(DB_CONN)
current = pd.read_sql("""
SELECT machine_id, AVG(cycle_time_ms) as avg_cycle_time,
STDDEV(cycle_time_ms) as std_cycle_time,
SUM(good_count) + SUM(reject_count) as total_parts
FROM production_events
WHERE event_type = 'cycle_complete'
AND time > NOW() - interval '4 hours'
GROUP BY machine_id
""", conn)
conn.close()
results = {
'generated_at': datetime.utcnow().isoformat(),
'trend_forecast': next_day_forecast.to_dict('records'),
'machine_predictions': [],
'recommendations': []
}
if not current.empty:
gb_model = joblib.load(f'{MODEL_DIR}/gb_model.pkl')
le_machine = joblib.load(f'{MODEL_DIR}/le_machine.pkl')
for _, row in current.iterrows():
conditions = {
'avg_cycle_time': row['avg_cycle_time'],
                'cycle_time_cv': (0 if pd.isna(row['std_cycle_time']) else row['std_cycle_time']) / max(row['avg_cycle_time'], 1),  # STDDEV is NULL with a single cycle
'total_parts': row['total_parts'],
'hour_of_day': datetime.now().hour,
'day_of_week': datetime.now().weekday()
}
try:
machine_enc = le_machine.transform([row['machine_id']])[0]
except ValueError:
machine_enc = 0
X_pred = pd.DataFrame([{
'hour_of_day': conditions['hour_of_day'],
'day_of_week': conditions['day_of_week'],
'avg_cycle_time': conditions['avg_cycle_time'],
'cycle_time_cv': conditions['cycle_time_cv'],
'total_parts': conditions['total_parts'],
'machine_encoded': machine_enc
}])
predicted_yield = float(gb_model.predict(X_pred)[0])
results['machine_predictions'].append({
'machine_id': row['machine_id'],
'predicted_yield_pct': round(predicted_yield, 1),
'current_conditions': conditions
})
recs = generate_recommendations(conditions, predicted_yield, df[df['machine_id'] == row['machine_id']])
results['recommendations'].extend(recs)
# Write results to database
conn = psycopg2.connect(DB_CONN)
cur = conn.cursor()
cur.execute("""
INSERT INTO ml_predictions (time, prediction_type, results)
VALUES (NOW(), 'yield_forecast', %s)
""", (json.dumps(results),))
conn.commit()
cur.close()
conn.close()
return results
if __name__ == '__main__':
import sys
if '--train' in sys.argv:
df = load_yield_history()
train_prophet_forecast(df)
train_yield_predictor(df)
logger.info('All yield prediction models trained successfully')
else:
results = predict_yield()
        print(json.dumps(results, indent=2, default=str))
OEE Calculation Workflow
Type: workflow
An automated data pipeline workflow that calculates Overall Equipment Effectiveness (OEE) hourly for each machine by combining availability, performance, and quality metrics from multiple data sources. The workflow pulls real-time data from TimescaleDB, computes OEE components using standard manufacturing formulas, stores results in the oee_hourly table, and triggers alerts when OEE drops below configurable thresholds.
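The standard formulas multiply into a single OEE figure. A quick worked example with hypothetical numbers (illustrative only, not from any client) shows how the workflow's arithmetic behaves for one machine-hour:

```python
# Illustrative OEE calculation for a single machine-hour (hypothetical values).
planned_min = 60.0        # planned production time in the hour
downtime_min = 9.0        # recorded downtime
good, rejects = 410, 15   # part counts for the hour
ideal_cycle_ms = 6000.0   # ideal cycle time (as stored in machine_config)

run_time = planned_min - downtime_min                      # 51.0 min
availability = run_time / planned_min                      # 0.85
# Performance = ideal cycle time (min) * total parts / run time
performance = (ideal_cycle_ms / 60000.0) * (good + rejects) / run_time
quality = good / (good + rejects)
oee = availability * performance * quality

print(round(availability, 4), round(performance, 4),
      round(quality, 4), round(oee, 4))
# prints: 0.85 0.8333 0.9647 0.6833
```

Note that a low OEE here decomposes cleanly: availability lost 9 minutes, performance lost ~17% to slow cycles, and quality lost ~3.5% to rejects, which is exactly the breakdown the alerting thresholds below act on.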
Implementation:
# /opt/analytics/scripts/oee_calculator.py
#!/usr/bin/env python3
"""
OEE Calculation Workflow
File: /opt/analytics/scripts/oee_calculator.py
Runs hourly via cron to calculate OEE for each machine.
Also supports real-time calculation triggered by Node-RED.
Crontab entry:
0 * * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/oee_calculator.py
"""
import psycopg2
import pandas as pd
import json
import logging
from datetime import datetime, timedelta
import os
logging.basicConfig(
level=logging.INFO,
filename='/var/log/analytics/oee_calculator.log',
format='%(asctime)s %(levelname)s %(message)s'
)
logger = logging.getLogger(__name__)
DB_CONN = os.environ.get('DB_CONN', 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS')
# OEE alert thresholds (configurable per client)
ALERT_THRESHOLDS = {
'oee_critical': 40.0, # % - trigger immediate alert
'oee_warning': 60.0, # % - trigger warning
'availability_critical': 70.0,
'quality_critical': 95.0, # Manufacturing quality typically >95%
'performance_critical': 75.0
}
# Shift schedule (customize per client)
SHIFT_SCHEDULE = {
'shift_1': {'start': 6, 'end': 14, 'break_min': 30},
'shift_2': {'start': 14, 'end': 22, 'break_min': 30},
'shift_3': {'start': 22, 'end': 6, 'break_min': 30}
}
def get_planned_production_time(hour, shift_schedule=SHIFT_SCHEDULE):
"""Return planned production minutes for a given hour."""
for shift_name, shift in shift_schedule.items():
if shift['start'] <= hour < shift['end'] or \
(shift['start'] > shift['end'] and (hour >= shift['start'] or hour < shift['end'])):
return 60 # Full hour of planned production
return 0 # Outside planned production time
def calculate_oee_hourly(target_hour=None):
"""Calculate OEE for all machines for the specified hour."""
conn = psycopg2.connect(DB_CONN)
if target_hour is None:
target_hour = datetime.utcnow().replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
hour_end = target_hour + timedelta(hours=1)
planned_minutes = get_planned_production_time(target_hour.hour)
if planned_minutes == 0:
logger.info(f'Hour {target_hour} is outside planned production. Skipping.')
return []
# Get production data for the hour
production = pd.read_sql(f"""
SELECT
machine_id,
SUM(good_count) as good_parts,
SUM(reject_count) as reject_parts,
AVG(cycle_time_ms) as avg_actual_cycle_time_ms,
COUNT(*) as total_cycles
FROM production_events
WHERE event_type = 'cycle_complete'
AND time >= '{target_hour}'::timestamptz
AND time < '{hour_end}'::timestamptz
GROUP BY machine_id
""", conn)
# Get downtime for the hour
downtime = pd.read_sql(f"""
SELECT
machine_id,
SUM(
EXTRACT(EPOCH FROM (
LEAST(time_end, '{hour_end}'::timestamptz) -
GREATEST(time_start, '{target_hour}'::timestamptz)
)) / 60.0
) as downtime_minutes
FROM downtime_events
WHERE time_start < '{hour_end}'::timestamptz
AND (time_end IS NULL OR time_end > '{target_hour}'::timestamptz)
GROUP BY machine_id
""", conn)
# Get ideal cycle times from machine configuration
# (stored in a config table or hardcoded per machine)
ideal_cycle_times = pd.read_sql("""
SELECT machine_id, ideal_cycle_time_ms
FROM machine_config
""", conn)
results = []
for _, prod_row in production.iterrows():
machine_id = prod_row['machine_id']
good_parts = int(prod_row['good_parts'])
reject_parts = int(prod_row['reject_parts'])
total_parts = good_parts + reject_parts
# Availability = (Planned Time - Downtime) / Planned Time
machine_downtime = downtime[downtime['machine_id'] == machine_id]
dt_minutes = float(machine_downtime['downtime_minutes'].iloc[0]) if not machine_downtime.empty else 0
run_time = max(planned_minutes - dt_minutes, 0)
availability = run_time / planned_minutes if planned_minutes > 0 else 0
# Performance = (Ideal Cycle Time * Total Parts) / Run Time
ideal_ct = ideal_cycle_times[ideal_cycle_times['machine_id'] == machine_id]
ideal_ct_ms = float(ideal_ct['ideal_cycle_time_ms'].iloc[0]) if not ideal_ct.empty else prod_row['avg_actual_cycle_time_ms']
ideal_ct_min = ideal_ct_ms / 60000.0
performance = (ideal_ct_min * total_parts) / run_time if run_time > 0 else 0
performance = min(performance, 1.0) # Cap at 100%
# Quality = Good Parts / Total Parts
quality = good_parts / total_parts if total_parts > 0 else 0
# OEE = Availability * Performance * Quality
oee = availability * performance * quality
result = {
'time_bucket': target_hour,
'machine_id': machine_id,
'availability': round(availability, 4),
'performance': round(performance, 4),
'quality': round(quality, 4),
'oee': round(oee, 4),
'good_count': good_parts,
'reject_count': reject_parts,
'planned_time_min': planned_minutes,
'run_time_min': round(run_time, 1)
}
results.append(result)
# Check alert thresholds
check_alerts(result)
# Write results to database
cur = conn.cursor()
for r in results:
cur.execute("""
INSERT INTO oee_hourly (time_bucket, machine_id, availability, performance,
quality, oee, good_count, reject_count,
planned_time_min, run_time_min)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
""", (
r['time_bucket'], r['machine_id'], r['availability'], r['performance'],
r['quality'], r['oee'], r['good_count'], r['reject_count'],
r['planned_time_min'], r['run_time_min']
))
conn.commit()
cur.close()
conn.close()
logger.info(f'Calculated OEE for {len(results)} machines at {target_hour}')
return results
def check_alerts(oee_result):
"""Check OEE result against thresholds and send alerts."""
alerts = []
if oee_result['oee'] * 100 < ALERT_THRESHOLDS['oee_critical']:
alerts.append(f"CRITICAL: {oee_result['machine_id']} OEE={oee_result['oee']*100:.1f}% (threshold: {ALERT_THRESHOLDS['oee_critical']}%)")
elif oee_result['oee'] * 100 < ALERT_THRESHOLDS['oee_warning']:
alerts.append(f"WARNING: {oee_result['machine_id']} OEE={oee_result['oee']*100:.1f}% (threshold: {ALERT_THRESHOLDS['oee_warning']}%)")
if oee_result['quality'] * 100 < ALERT_THRESHOLDS['quality_critical']:
alerts.append(f"QUALITY ALERT: {oee_result['machine_id']} Quality={oee_result['quality']*100:.1f}%")
for alert_msg in alerts:
logger.warning(alert_msg)
# Write alert to database for dashboard display
try:
conn = psycopg2.connect(DB_CONN)
cur = conn.cursor()
cur.execute("""
INSERT INTO alerts (time, machine_id, alert_type, severity, message)
VALUES (NOW(), %s, 'oee', %s, %s)
""", (
oee_result['machine_id'],
                'CRITICAL' if ('CRITICAL' in alert_msg or 'QUALITY' in alert_msg) else 'WARNING',
alert_msg
))
conn.commit()
cur.close()
conn.close()
except Exception as e:
logger.error(f'Failed to write alert: {e}')
if __name__ == '__main__':
results = calculate_oee_hourly()
for r in results:
        print(f"{r['machine_id']}: OEE={r['oee']*100:.1f}% (A={r['availability']*100:.1f}% P={r['performance']*100:.1f}% Q={r['quality']*100:.1f}%)")
Daily Insights Report Generator
Type: prompt
A templated report generator that produces a daily executive summary of production performance, combining OEE metrics, scrap analysis, downtime root causes, and ML-generated insights into a formatted HTML email sent to plant management every morning. The HTML template is assembled in Python and all data is pulled from TimescaleDB.
Implementation:
# /opt/analytics/scripts/generate_insights.py
#!/usr/bin/env python3
"""
Daily Insights Report Generator
File: /opt/analytics/scripts/generate_insights.py
Generates and emails a daily production insights report.
Schedule via cron: 0 5 * * * (5 AM daily, before shift 1 starts)
Crontab entry:
0 5 * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/generate_insights.py
"""
import psycopg2
import pandas as pd
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
import os
import logging
logging.basicConfig(level=logging.INFO, filename='/var/log/analytics/insights.log')
logger = logging.getLogger(__name__)
DB_CONN = os.environ.get('DB_CONN', 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS')
SMTP_SERVER = os.environ.get('SMTP_SERVER', 'smtp.office365.com')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '587'))
SMTP_USER = os.environ.get('SMTP_USER', 'analytics@client.com')
SMTP_PASS = os.environ.get('SMTP_PASS', 'CHANGE_THIS')
RECIPIENTS = os.environ.get('REPORT_RECIPIENTS', 'plantmanager@client.com,qualitymanager@client.com').split(',')
def generate_report():
conn = psycopg2.connect(DB_CONN)
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
today = datetime.now().strftime('%Y-%m-%d')
# 1. OEE Summary
oee_df = pd.read_sql(f"""
SELECT machine_id,
AVG(availability) * 100 as avg_availability,
AVG(performance) * 100 as avg_performance,
AVG(quality) * 100 as avg_quality,
AVG(oee) * 100 as avg_oee,
SUM(good_count) as total_good,
SUM(reject_count) as total_rejects
FROM oee_hourly
WHERE time_bucket >= '{yesterday}'::date
AND time_bucket < '{today}'::date
GROUP BY machine_id
ORDER BY avg_oee ASC
""", conn)
plant_oee = oee_df['avg_oee'].mean() if not oee_df.empty else 0
# 2. Top Downtime Causes
downtime_df = pd.read_sql(f"""
SELECT reason_category, COUNT(*) as event_count,
SUM(duration_min) as total_minutes,
AVG(duration_min) as avg_duration
FROM downtime_events
WHERE time_start >= '{yesterday}'::date
AND time_start < '{today}'::date
AND reason_category IS NOT NULL
GROUP BY reason_category
ORDER BY total_minutes DESC
LIMIT 5
""", conn)
# 3. Top Scrap Causes
scrap_df = pd.read_sql(f"""
SELECT scrap_category, SUM(quantity) as total_qty,
SUM(quantity * material_cost) as total_cost
FROM scrap_events
WHERE time >= '{yesterday}'::date
AND time < '{today}'::date
GROUP BY scrap_category
ORDER BY total_cost DESC
LIMIT 5
""", conn)
total_scrap_cost = scrap_df['total_cost'].sum() if not scrap_df.empty else 0
# 4. ML Anomalies
anomaly_df = pd.read_sql(f"""
SELECT machine_id, severity, scrap_rate_pct, likely_causes
FROM ml_anomalies
WHERE time >= '{yesterday}'::date
AND time < '{today}'::date
ORDER BY severity DESC, scrap_rate_pct DESC
LIMIT 5
""", conn)
# 5. ML Yield Predictions
prediction_df = pd.read_sql(f"""
SELECT results
FROM ml_predictions
WHERE prediction_type = 'yield_forecast'
ORDER BY time DESC LIMIT 1
""", conn)
conn.close()
# Build HTML report
html = f"""
<html><body style='font-family: Arial, sans-serif; max-width: 800px; margin: auto;'>
<h1 style='color: #2c3e50;'>📊 Daily Production Insights Report</h1>
<p style='color: #7f8c8d;'>Report Date: {yesterday} | Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}</p>
<h2 style='color: #2980b9;'>🏭 Plant OEE Summary</h2>
<div style='background: {"#27ae60" if plant_oee >= 85 else "#f39c12" if plant_oee >= 65 else "#e74c3c"};
color: white; padding: 20px; border-radius: 8px; text-align: center; font-size: 36px; margin: 10px 0;'>
Plant OEE: {plant_oee:.1f}%
</div>
<table style='width: 100%; border-collapse: collapse; margin: 10px 0;'>
<tr style='background: #34495e; color: white;'>
<th style='padding: 8px;'>Machine</th>
<th>Availability</th><th>Performance</th><th>Quality</th>
<th>OEE</th><th>Good Parts</th><th>Rejects</th>
</tr>"""
for _, row in oee_df.iterrows():
color = '#27ae60' if row['avg_oee'] >= 85 else '#f39c12' if row['avg_oee'] >= 65 else '#e74c3c'
html += f"""
<tr style='border-bottom: 1px solid #ddd;'>
<td style='padding: 6px;'>{row['machine_id']}</td>
<td style='text-align:center;'>{row['avg_availability']:.1f}%</td>
<td style='text-align:center;'>{row['avg_performance']:.1f}%</td>
<td style='text-align:center;'>{row['avg_quality']:.1f}%</td>
<td style='text-align:center; color: {color}; font-weight: bold;'>{row['avg_oee']:.1f}%</td>
<td style='text-align:center;'>{int(row['total_good']):,}</td>
<td style='text-align:center; color: #e74c3c;'>{int(row['total_rejects']):,}</td>
</tr>"""
html += "</table>"
# Downtime section
html += f"<h2 style='color: #e67e22;'>⏱ Top Downtime Causes ({downtime_df['total_minutes'].sum():.0f} total minutes)</h2><ol>"
for _, row in downtime_df.iterrows():
html += f"<li><b>{row['reason_category']}</b>: {row['total_minutes']:.0f} min ({row['event_count']} events, avg {row['avg_duration']:.0f} min each)</li>"
html += "</ol>"
# Scrap section
html += f"<h2 style='color: #e74c3c;'>🗑 Scrap Summary (${total_scrap_cost:,.0f} total cost)</h2><ol>"
for _, row in scrap_df.iterrows():
html += f"<li><b>{row['scrap_category']}</b>: {int(row['total_qty']):,} units (${row['total_cost']:,.0f})</li>"
html += "</ol>"
# ML Insights
if not anomaly_df.empty:
html += "<h2 style='color: #8e44ad;'>🤖 AI-Detected Anomalies</h2><ul>"
for _, row in anomaly_df.iterrows():
causes = json.loads(row['likely_causes']) if isinstance(row['likely_causes'], str) else row['likely_causes']
html += f"<li><b>[{row['severity']}] {row['machine_id']}</b> - Scrap rate: {row['scrap_rate_pct']:.1f}%<br>"
html += '<br>'.join(causes if isinstance(causes, list) else [str(causes)])
html += "</li>"
html += "</ul>"
html += """<hr><p style='color: #95a5a6; font-size: 12px;'>This report was automatically generated by the Production Analytics Platform.
View interactive dashboards at <a href='http://10.200.1.20:3000'>Grafana</a> or
<a href='https://app.powerbi.com'>Power BI</a>. Contact your MSP for support.</p>
</body></html>"""
# Send email
msg = MIMEMultipart('alternative')
msg['Subject'] = f'📊 Production Report {yesterday} | Plant OEE: {plant_oee:.1f}%'
msg['From'] = SMTP_USER
msg['To'] = ', '.join(RECIPIENTS)
msg.attach(MIMEText(html, 'html'))
try:
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASS)
server.send_message(msg)
logger.info(f'Daily report sent to {RECIPIENTS}')
except Exception as e:
logger.error(f'Failed to send report: {e}')
return html
if __name__ == '__main__':
    generate_report()
Node-RED Data Normalization Flow
Type: integration
A Node-RED flow deployed on the Advantech edge gateway that subscribes to raw MQTT data from KEPServerEX, normalizes tag names and values into a standardized JSON schema, calculates derived metrics (e.g., incremental part counts from cumulative counters), and publishes normalized data to both the local TimescaleDB and Azure IoT Hub. This flow handles the critical data transformation layer between raw PLC data and the analytics platform.
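The cumulative-to-incremental conversion is the most error-prone part of this flow and is hard to read inside the escaped function-node JSON, so here is the same logic as a standalone Python sketch (function and variable names are illustrative; the counter-reset rule mirrors the function node below):

```python
_last_values = {}  # per machine+metric: last cumulative reading seen

def incremental_count(machine_id, metric, cumulative_value):
    """Convert a cumulative PLC counter reading into an incremental count.

    A negative delta means the counter reset (PLC restart or rollover),
    in which case the new cumulative value is taken as the increment.
    Returns None when nothing changed (or on the seeding first reading).
    """
    key = f'{machine_id}_{metric}'
    prev = _last_values.get(key, cumulative_value)
    _last_values[key] = cumulative_value
    increment = cumulative_value - prev
    if increment < 0:          # counter reset or rollover
        increment = cumulative_value
    return increment if increment > 0 else None

# First reading seeds the state (no increment); later readings yield deltas.
print(incremental_count('Press01', 'PartCountGood', 1000))  # None (seed)
print(incremental_count('Press01', 'PartCountGood', 1004))  # 4
print(incremental_count('Press01', 'PartCountGood', 3))     # 3 (after reset)
```

In the actual flow this state lives in Node-RED function-node context rather than a module-level dict, so it is scoped per node and survives redeploys only if a persistent context store is configured.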
Implementation:
// Node-RED Flow JSON
// Import via: Menu > Import > Clipboard in Node-RED UI at http://10.200.1.10:1880
//
// This flow:
// 1. Subscribes to all KEPServerEX MQTT topics (kepware/#)
// 2. Parses and normalizes the data
// 3. Calculates incremental counts from cumulative PLC counters
// 4. Writes to TimescaleDB
// 5. Forwards to Azure IoT Hub
[
{
"id": "mqtt-in-kepware",
"type": "mqtt in",
"name": "KEPServerEX Data",
"topic": "kepware/#",
"qos": "1",
"datatype": "json",
"broker": "local-mosquitto",
"x": 150,
"y": 200,
"wires": [["parse-topic"]]
},
{
"id": "local-mosquitto",
"type": "mqtt-broker",
"name": "Local Mosquitto",
"broker": "localhost",
"port": "1883",
"clientid": "nodered-edge",
"autoConnect": true,
"usetls": false,
"credentials": {
"user": "edgeuser",
"password": "CHANGE_THIS"
}
},
{
"id": "parse-topic",
"type": "function",
"name": "Parse & Normalize",
"func": "// Parse MQTT topic: kepware/{channel}/{device}/{tag}\nvar parts = msg.topic.split('/');\nif (parts.length < 4) return null;\n\nvar channel = parts[1];\nvar device = parts[2];\nvar tag = parts.slice(3).join('/');\n// Normalize machine_id\nvar machine_id = device; // e.g., 'Press01'\n\n// Determine metric name from tag\nvar metric_name = tag.replace(device + '.', '');\n\n// Extract value\nvar value = msg.payload;\nif (typeof value === 'object' && value.hasOwnProperty('value')) {\n value = value.value;\n}\nvalue = parseFloat(value);\nif (isNaN(value)) return null;\n\n// Determine unit based on metric name\nvar units = {\n 'CycleTimeActual_ms': 'ms',\n 'SpindleSpeed_RPM': 'RPM',\n 'Temperature_C': '°C',\n 'Vibration_Velocity_mms': 'mm/s',\n 'Vibration_Acceleration_g': 'g',\n 'Pressure_bar': 'bar',\n 'Power_kW': 'kW'\n};\nvar unit = units[metric_name] || 'raw';\n\n// Build normalized message\nmsg.normalized = {\n timestamp: new Date().toISOString(),\n machine_id: machine_id,\n metric_name: metric_name,\n value: value,\n unit: unit,\n source: 'kepware',\n channel: channel\n};\n\nreturn msg;",
"outputs": 1,
"x": 400,
"y": 200,
"wires": [["increment-calc", "write-timescaledb", "forward-azure"]]
},
{
"id": "increment-calc",
"type": "function",
"name": "Incremental Counter Calc",
"func": "// Calculate incremental part counts from cumulative PLC counters\n// PLC counters are cumulative; we need incremental counts for analytics\n\nvar metric = msg.normalized.metric_name;\nvar machine = msg.normalized.machine_id;\nvar value = msg.normalized.value;\n\n// Only process counter-type metrics\nvar counterMetrics = ['CycleCount', 'PartCountGood', 'PartCountReject'];\nif (counterMetrics.indexOf(metric) === -1) return null;\n\n// Get previous value from context\nvar key = machine + '_' + metric;\nvar prevValue = context.get(key) || value;\ncontext.set(key, value);\n\n// Calculate increment\nvar increment = value - prevValue;\n\n// Handle counter reset (PLC restart or overflow)\nif (increment < 0) increment = value;\nif (increment === 0) return null; // No change\n\n// Create production event\nvar eventType = 'cycle_complete';\nvar event = {\n timestamp: msg.normalized.timestamp,\n machine_id: machine,\n event_type: eventType,\n good_count: metric === 'PartCountGood' ? increment : 0,\n reject_count: metric === 'PartCountReject' ? increment : 0,\n cycle_time_ms: null\n};\n\nmsg.production_event = event;\nreturn msg;",
"outputs": 1,
"x": 650,
"y": 100,
"wires": [["write-production-events"]]
},
{
"id": "write-timescaledb",
"type": "function",
"name": "Format for TimescaleDB",
"func": "// Format INSERT for machine_telemetry table\nmsg.query = `INSERT INTO machine_telemetry (time, machine_id, metric_name, value, unit) VALUES ($1, $2, $3, $4, $5)`;\nmsg.params = [\n msg.normalized.timestamp,\n msg.normalized.machine_id,\n msg.normalized.metric_name,\n msg.normalized.value,\n msg.normalized.unit\n];\nreturn msg;",
"outputs": 1,
"x": 650,
"y": 200,
"wires": [["postgres-insert"]]
},
{
"id": "write-production-events",
"type": "function",
"name": "Format Production Event",
"func": "var e = msg.production_event;\nmsg.query = `INSERT INTO production_events (time, machine_id, event_type, good_count, reject_count, cycle_time_ms) VALUES ($1, $2, $3, $4, $5, $6)`;\nmsg.params = [e.timestamp, e.machine_id, e.event_type, e.good_count, e.reject_count, e.cycle_time_ms];\nreturn msg;",
"outputs": 1,
"x": 900,
"y": 100,
"wires": [["postgres-insert"]]
},
{
"id": "postgres-insert",
"type": "postgresql",
"name": "TimescaleDB Insert",
"query": "",
"postgreSQLConfig": "timescaledb-config",
"x": 1100,
"y": 200,
"wires": [[]]
},
{
"id": "timescaledb-config",
"type": "postgreSQLConfig",
"name": "TimescaleDB",
"host": "10.200.1.20",
"port": "5432",
"database": "production_analytics",
"ssl": false,
"credentials": {
"user": "analytics_app",
"password": "CHANGE_THIS"
}
},
{
"id": "forward-azure",
"type": "function",
"name": "Format for Azure IoT Hub",
"func": "// Rate limit: only forward every 5th reading per metric to reduce costs\nvar key = msg.normalized.machine_id + '_' + msg.normalized.metric_name + '_count';\nvar count = context.get(key) || 0;\ncount++;\ncontext.set(key, count);\n\nif (count % 5 !== 0) return null; // Skip 4 out of 5 readings\n\nmsg.payload = JSON.stringify(msg.normalized);\nreturn msg;",
"outputs": 1,
"x": 650,
"y": 300,
"wires": [["mqtt-out-azure"]]
},
{
"id": "mqtt-out-azure",
"type": "mqtt out",
"name": "Azure IoT Hub",
"topic": "devices/edge-gateway-prod/messages/events/",
"qos": "1",
"retain": false,
"broker": "azure-iot-hub",
"x": 900,
"y": 300,
"wires": []
},
{
"id": "azure-iot-hub",
"type": "mqtt-broker",
"name": "Azure IoT Hub",
"broker": "<iot-hub-name>.azure-devices.net",
"port": "8883",
"clientid": "edge-gateway-prod",
"autoConnect": true,
"usetls": true,
"credentials": {
"user": "<iot-hub-name>.azure-devices.net/edge-gateway-prod/?api-version=2021-04-12",
"password": "<SAS-TOKEN>"
}
}
]
// NOTE: JSON does not allow comments — delete everything from this line down
// before importing the flow into Node-RED. After importing, you must:
// 1. Install node-red-contrib-postgresql via npm in the Node-RED user directory
// 2. Update the Azure IoT Hub broker credentials with actual SAS token
// 3. Update the TimescaleDB credentials
// 4. Deploy the flow and verify data flowing to both destinations
// 5. Monitor the Node-RED debug panel for any errors
//
// The rate limiting in the Azure forwarding function reduces IoT Hub message
// volume by 80%, keeping costs within the S1 tier allocation while still
// providing sufficient data resolution for cloud analytics.
Testing & Validation
- DATA PIPELINE END-TO-END: Trigger a manual cycle on one machine. Verify the event appears in KEPServerEX Quick Client within 1 second, in Mosquitto MQTT subscription within 2 seconds, in TimescaleDB machine_telemetry table within 5 seconds, on the Grafana shop-floor dashboard within 10 seconds, and in Azure IoT Hub device telemetry within 30 seconds. Document latency at each stage.
- OEE CALCULATION ACCURACY: Select one machine and one complete shift (8 hours). Manually calculate OEE from paper production logs using the standard formula (Availability × Performance × Quality). Compare against the system-calculated OEE in the oee_hourly table. Results must match within ±2 percentage points. If they do not match, investigate whether the discrepancy is in availability (downtime tracking), performance (cycle time measurement), or quality (reject counting).
- SCRAP RATE VALIDATION: For a 24-hour period, compare total reject counts in the analytics system against the client's existing quality tracking system (QC log, ERP scrap report, or manual tally sheet). Counts must match within ±1%. Cross-reference scrap reason codes to ensure they are correctly categorized.
- DOWNTIME EVENT CAPTURE: Simulate three types of downtime events: (1) stop a machine for 5 minutes and enter reason code 'Tool Change', (2) trigger a fault condition if safely possible, (3) record a planned maintenance stop. Verify all three events appear in the downtime_events table with correct start/end times, durations, and reason codes.
- ALERT SYSTEM VERIFICATION: Test all configured alert thresholds: (a) Machine down > 15 minutes → verify production supervisor receives email/SMS within 2 minutes; (b) Scrap rate > 5% → verify quality manager alert; (c) OEE < 50% → verify plant manager alert; (d) Vibration exceeds Zone C threshold → verify maintenance alert. Document alert delivery time for each.
- SENSOR DATA ACCURACY: Compare ifm VVB001 vibration readings against a calibrated reference vibration meter (e.g., Fluke 805). Readings must match within ±10%. Verify temperature readings against an IR thermometer. For Banner wireless sensors, verify signal strength and data delivery rate (should be >99% over a 24-hour period).
- GRAFANA DASHBOARD FUNCTIONALITY: Verify all dashboard panels load correctly on Samsung tablets. Test: (a) real-time machine status updates within 10 seconds; (b) OEE gauges show correct values; (c) downtime Pareto chart filters work by date range, machine, and shift; (d) scrap rate trend displays correctly; (e) alert notifications appear on dashboard. Test with 3 different tablet devices simultaneously.
- POWER BI REPORT VALIDATION: Verify Power BI scheduled refresh completes successfully. Check: (a) data gateway connectivity to TimescaleDB; (b) all report pages render correctly; (c) DAX measures calculate OEE, scrap cost, and MTBF correctly; (d) drill-through from summary to detail works; (e) date slicers filter correctly; (f) report renders on mobile Power BI app.
- ERP DATA INTEGRITY: Verify ERP sync script pulls correct work order data. Compare 10 random work orders between the ERP screen and the analytics database work_orders table. Check: work order number, part number, order quantity, completed quantity, scrap quantity, and material cost. All fields must match exactly.
- ML MODEL BASELINE TEST: After initial model training (30+ days of data), run the scrap anomaly detector on a known anomaly period (a date when scrap was unusually high). Verify the model flags this period as anomalous. Also run on a known normal period and verify no false positives. Run the downtime classifier on 20 labeled events and verify >70% classification accuracy.
- NETWORK SECURITY VERIFICATION: Perform the following security tests: (a) attempt to ping the OT VLAN from the corporate network — should be blocked; (b) attempt to initiate a connection from the internet to the edge gateway — should be blocked; (c) verify all MQTT traffic between edge and Azure uses TLS (port 8883); (d) verify no unauthorized devices on the OT VLAN using Nmap scan; (e) verify all user accounts have unique credentials and default passwords are changed.
- FAILOVER AND RECOVERY: Test system resilience: (a) disconnect internet — verify local Grafana dashboards continue functioning and Mosquitto buffers messages; (b) restart the Dell PowerEdge T360 — verify all services (TimescaleDB, Grafana, cron jobs) auto-start; (c) disconnect one sensor — verify alert fires and other sensors continue; (d) simulate power outage — verify UPS graceful shutdown and no data corruption on restart.
- DAILY REPORT GENERATION: Trigger the daily insights report generator manually. Verify: (a) email arrives at all configured recipients; (b) OEE summary table shows all machines; (c) downtime Pareto and scrap summary sections are populated; (d) ML anomaly section shows detected anomalies (if any); (e) HTML formatting renders correctly in Outlook and Gmail; (f) all numbers match dashboard values for the same date.
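The manual OEE cross-check described in the validation list above can be reproduced with a short script. This is a sketch using the standard formula (Availability × Performance × Quality); the function and parameter names are illustrative, not the actual `oee_hourly` schema.

```python
def compute_oee(planned_minutes, downtime_minutes,
                ideal_cycle_s, total_count, good_count):
    """Standard OEE = Availability x Performance x Quality.

    planned_minutes  : scheduled production time for the shift
    downtime_minutes : total stops during that window
    ideal_cycle_s    : ideal (nameplate) cycle time per part, in seconds
    total_count      : all parts produced (good + rejects)
    good_count       : parts passing quality checks
    """
    run_minutes = planned_minutes - downtime_minutes
    availability = run_minutes / planned_minutes
    performance = (ideal_cycle_s * total_count) / (run_minutes * 60)
    quality = good_count / total_count
    return availability * performance * quality

# Example: 8-hour shift, 60 min downtime, 30 s ideal cycle,
# 800 parts produced, 760 good
oee = compute_oee(480, 60, 30, 800, 760)
print(round(oee * 100, 1))  # -> 79.2
```

When the paper-log calculation and the system value disagree by more than ±2 points, comparing the three factors individually pinpoints whether downtime tracking, cycle-time measurement, or reject counting is at fault.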
Client Handoff
The client handoff meeting should be a 4-hour session with the plant manager, production supervisors, quality manager, maintenance lead, and IT administrator present. Cover the following topics:
Documentation to Leave Behind
- System Architecture Diagram (laminated, posted in server room)
- Network Topology Diagram with IP addresses and VLAN assignments
- Tag Dictionary spreadsheet (PLC addresses → MQTT topics → database fields)
- Downtime Reason Code Taxonomy (laminated, posted at each workstation)
- Operator Quick-Reference Cards (laminated, one per tablet/workstation)
- Power BI Navigation Guide (printed, one per manager)
- Alert Escalation Matrix (printed, posted in supervisor office)
- Admin Credentials Document (sealed envelope, stored in client's safe)
- Maintenance Runbook (digital, shared via SharePoint/OneDrive)
Success Criteria to Review Together
Maintenance
Monthly Maintenance Tasks (MSP Responsibility)
Quarterly Maintenance Tasks
Annual Maintenance Tasks
Monitoring & SLA Considerations
- Set up automated monitoring alerts (via Grafana, Azure Monitor, or an RMM tool) for: server CPU/memory/disk > 80%, database connection failures, cron job missed executions, sensor data gaps > 15 minutes, Azure IoT Hub throttling.
- SLA Target: 99.5% data pipeline uptime (measured monthly). Exclude planned maintenance windows.
- Response Times: Critical alerts (production data loss, server down): 1-hour response during business hours, 4-hour response after hours. Non-critical (dashboard issue, cosmetic): next business day.
Escalation Path
- Level 1: MSP help desk — dashboard access issues, password resets, simple queries
- Level 2: MSP senior engineer — data pipeline issues, sensor troubleshooting, network problems
- Level 3: MSP data engineer / ML specialist — model retraining, new analytics development, database tuning
- Level 4: Vendor support — KEPServerEX (PTC), MachineMetrics (MM support), Azure (Microsoft), Grafana (Grafana Labs)
Model Retraining Triggers (outside regular schedule)
- Client adds new machines or production lines
- Client introduces new product families or materials
- Major process change (new tooling, new suppliers, layout change)
- Model accuracy drops below 65% on the downtime classifier or R² drops below 0.5 on the yield predictor
- Scrap anomaly detector generates >20% false positive rate over a 30-day period
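The false-positive-rate trigger above can be checked with a small metric helper run against a labeled sample of detector output. This is a sketch; the function names are hypothetical, and the 20% limit comes from the trigger list.

```python
def false_positive_rate(flags, labels):
    """flags: model anomaly flags (True = flagged as anomalous);
    labels: ground truth (True = genuinely anomalous).
    FPR = FP / (FP + TN), i.e. share of normal periods wrongly flagged."""
    fp = sum(1 for f, l in zip(flags, labels) if f and not l)
    tn = sum(1 for f, l in zip(flags, labels) if not f and not l)
    return fp / (fp + tn) if (fp + tn) else 0.0

def needs_retraining(flags, labels, fpr_limit=0.20):
    """True when the detector exceeds the retraining threshold."""
    return false_positive_rate(flags, labels) > fpr_limit

# Example: 10 labeled periods, 1 false alarm out of 8 normals -> FPR 12.5%
flags  = [True, True, False, False, False, True, False, False, False, False]
labels = [True, False, False, False, False, True, False, False, False, False]
print(needs_retraining(flags, labels))  # -> False
```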
...
Option B: Full MES Platform (TrakSYS or Tulip)
Replace the custom-built analytics stack with a purpose-built Manufacturing Execution System (MES). Use Parsec TrakSYS ($1,999+/month) or Tulip ($1,000+/month) as the primary platform, which provides OEE tracking, scrap tracking, downtime management, quality workflows, and dashboards out of the box. Supplement with Power BI for executive reporting and retain KEPServerEX for legacy machine connectivity. Estimated Year 1 cost: $60,000–$100,000 (MES subscription + hardware + integration labor).
Option D: Existing SCADA Vendor Extension
If the client already has an Ignition SCADA system or Rockwell FactoryTalk deployment, extend the existing platform with analytics modules rather than deploying a separate analytics stack. For Ignition: add the Reporting Module ($2,400), Perspective Module ($3,800) for web dashboards, and SPC Module ($2,400). For FactoryTalk: add FactoryTalk Analytics Edge or FactoryTalk DataFlowML. Retain custom Python ML models for advanced root cause analysis. Estimated Year 1 cost: $20,000–$50,000 (module licenses + configuration labor).