
Implementation Guide: Analyze production yield, scrap rates, and downtime to identify root causes

Step-by-step implementation guide for deploying AI to analyze production yield, scrap rates, and downtime to identify root causes for Manufacturing clients.

Hardware Procurement

...

Advantech EPC-R7300 Edge AI Computer

Advantech | EPC-R7300 | Qty: 1

$1,500–$2,500 per unit (MSP cost) / $3,000–$4,000 suggested resale (configured and installed)

Industrial-hardened fanless edge gateway deployed on the shop floor. Runs KEPServerEX for protocol translation (OPC UA, Modbus, EtherNet/IP to MQTT), local data buffering when cloud connectivity is intermittent, and lightweight edge inference for real-time anomaly detection. Rated for -20°C to 60°C operation in harsh manufacturing environments with dust, vibration, and temperature swings.

Dell PowerEdge T360 Tower Server

Dell Technologies | PowerEdge T360 | Qty: 1

$3,500–$5,000 per unit (MSP cost, configured with 32GB RAM, 2x1TB NVMe RAID1, Intel Xeon E-2488) / $5,500–$7,500 suggested resale

On-premise analytics server hosting TimescaleDB for time-series data storage, Python ML model training and inference, local Grafana instance for shop-floor dashboards, and data pipeline orchestration. Sits in the client's server room or IT closet and acts as the primary data warehouse for all production metrics with 90-day hot retention.

ifm efector VVB001 Vibration Sensor

ifm efector | VVB001 | Qty: 8

$400–$495 per unit (MSP cost) / $700–$850 suggested resale per unit

IO-Link 3-axis vibration sensors (0–50g, 2–10,000 Hz) mounted on critical rotating equipment such as spindles, motors, pumps, and compressors that lack built-in monitoring. Provides vibration velocity and acceleration data for predictive maintenance and correlating machine health degradation with scrap rate increases.

Banner Engineering Wireless Vibration/Temperature Sensor Node

Banner Engineering | QM42VT2 (Sure Cross Wireless) | Qty: 4

$400–$600 per node (MSP cost) / $700–$900 suggested resale per node

Wireless mesh vibration and temperature sensors for machines in locations where running IO-Link or Ethernet cabling is impractical. Up to 2-mile range via wireless mesh. Used to monitor auxiliary equipment (hydraulic units, cooling systems, air compressors) that influence yield and scrap but are often unmonitored.

Banner Engineering Wireless Gateway

Banner Engineering | DXM700 (Sure Cross Wireless Gateway) | Qty: 1

$800–$1,200 (MSP cost) / $1,400–$1,800 suggested resale

Central wireless gateway that aggregates data from all Banner wireless sensor nodes and forwards it via Modbus TCP or MQTT to the edge gateway (Advantech EPC-R7300). Provides web-based configuration and diagnostics.

Cisco Catalyst IE3300 Rugged Industrial Switch

Cisco | IE-3300-8T2S-E | Qty: 2

$1,200–$1,800 per unit (MSP cost) / $1,800–$2,800 suggested resale per unit

Industrial-rated managed Ethernet switches for the OT network VLAN. One switch connects PLCs and edge devices on the shop floor; the second sits at the IT/OT DMZ boundary. Supports VLAN segmentation, QoS for real-time data, and DIN-rail mounting in industrial enclosures.

Samsung Galaxy Tab Active5 Enterprise Edition

Samsung | SM-X306B | Qty: 3

$450–$550 per unit (MSP cost) / $700–$900 suggested resale per unit

Ruggedized tablets mounted at workstations for operators to view real-time OEE dashboards, enter downtime reason codes, and review scrap alerts. IP68 rated, MIL-STD-810H compliant, suitable for shop floor environments with oil, dust, and vibration.

APC Smart-UPS 1500VA

APC by Schneider Electric | SMT1500RM2UC | Qty: 1

$600–$800 (MSP cost) / $900–$1,200 suggested resale

Rack-mount UPS protecting the Dell PowerEdge T360 analytics server and Advantech edge gateway from power fluctuations common in manufacturing environments. Provides 10–15 minutes of battery backup for graceful shutdown and prevents data corruption during power events.

Software Procurement

MachineMetrics Professional

MachineMetrics | Professional | Qty: 12 machines

$200–$400/machine/month; estimate $2,400–$4,800/month for 12 machines ($28,800–$57,600/year)

Cloud-native machine monitoring platform providing out-of-the-box connectivity to CNC machines, injection molders, and stamping presses. Automatically captures cycle times, utilization, OEE, downtime events, and part counts. Provides real-time dashboards and historical analytics. Primary data source for production yield and downtime analysis.

KEPServerEX Manufacturing Suite

PTC / Kepware | Manufacturing Suite

$2,875 one-time (Manufacturing Suite) + ~$575/year annual subscription; individual drivers from $452

OPC UA/DA server with 100+ industrial protocol drivers for bridging legacy PLCs (Modbus RTU/TCP, EtherNet/IP, PROFINET, FANUC, Mitsubishi) to modern MQTT and OPC UA. Critical for connecting older machines that MachineMetrics cannot natively reach. Includes IoT Gateway module for MQTT publishing to Azure IoT Hub.

Microsoft Power BI Pro

Microsoft | Per-seat SaaS (via Microsoft CSP) | Qty: 8 users

$14/user/month; estimate 8 users = $112/month ($1,344/year)

Executive and management dashboards showing yield trends, scrap Pareto charts, downtime root cause analysis, shift-over-shift comparisons, and financial impact of production losses. Connects to Azure IoT Hub, TimescaleDB, and MachineMetrics APIs for unified reporting.

Azure IoT Hub (S1 Tier)

Microsoft Azure | S1 Tier | Qty: 2 units

$25/unit/month; 2 units = $50/month ($600/year) for up to 800K messages/day

Cloud message broker and device management for ingesting production data from the edge gateway. Provides device twins, message routing to Azure services, and secure TLS-encrypted communication. Routes data to Azure SQL Database and Azure Blob Storage for long-term analytics.

Azure SQL Database (General Purpose)

Microsoft Azure | General Purpose

$150–$300/month (2 vCores, 32GB storage) ($1,800–$3,600/year)

Cloud-hosted relational database storing aggregated KPIs, downtime event records, scrap summaries, and ML model outputs. Serves as the data warehouse for Power BI dashboards and long-term trend analysis (5+ year retention for compliance).

Azure Blob Storage

Microsoft Azure | Blob Storage

$20–$50/month for 500GB–1TB ($240–$600/year)

Cold storage for raw sensor telemetry, historical time-series data beyond 90-day on-premise retention, and ML training datasets. Provides cost-effective long-term retention for compliance requirements (ISO 9001, 21 CFR Part 11).

TimescaleDB Community Edition

Timescale | Community Edition

Free (self-hosted on Dell PowerEdge T360); optional Timescale Cloud from $29/month

High-performance time-series database deployed on the on-premise analytics server. Stores 90 days of hot sensor data (vibration, temperature, cycle times, power consumption) at 1-second to 1-minute resolution. Supports hypertables with automatic partitioning, continuous aggregates for dashboard queries, and native PostgreSQL compatibility.

Grafana OSS

Grafana Labs

Free (self-hosted on Dell PowerEdge T360)

Shop-floor real-time dashboards displayed on Samsung tablets at workstations. Shows live OEE gauges, current machine status, active alarms, and shift progress. Connects directly to TimescaleDB via native PostgreSQL data source. Lower latency than cloud-based Power BI for real-time monitoring.

Python (Anaconda Distribution)

Anaconda | Anaconda Individual Edition

Free (Anaconda Individual Edition)

Runtime environment for custom ML models performing root cause analysis, anomaly detection, and scrap prediction. Key libraries: scikit-learn, pandas, numpy, prophet, shap, imbalanced-learn. Deployed on Dell PowerEdge T360 with scheduled model training via cron jobs.
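To make the root-cause workflow concrete, here is a minimal scikit-learn sketch on synthetic data that ranks which process features best explain scrap. The feature names, thresholds, and random-forest choice are illustrative assumptions, not the deployed model:

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier

rng = np.random.default_rng(42)
n = 2000

# Synthetic process features; in this illustration scrap is driven by vibration.
vibration = rng.normal(2.0, 0.5, n)     # mm/s RMS
temperature = rng.normal(60.0, 5.0, n)  # degC
cycle_time = rng.normal(30.0, 2.0, n)   # s
scrap = (vibration + rng.normal(0, 0.2, n) > 2.8).astype(int)

X = np.column_stack([vibration, temperature, cycle_time])
model = RandomForestClassifier(n_estimators=100, random_state=0).fit(X, scrap)

# Feature importances point at the dominant driver (vibration here);
# in production, SHAP values give per-event attributions on top of this.
for name, imp in zip(["vibration", "temperature", "cycle_time"],
                     model.feature_importances_):
    print(f"{name}: {imp:.2f}")
```

On real data, the same pattern applies: join telemetry and scrap events from TimescaleDB, train on lagged features, and report the top drivers per machine.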

Node-RED

OpenJS Foundation

Free (self-hosted on Advantech edge gateway)

Low-code data integration and transformation running on the edge gateway. Provides visual flow-based programming for bridging OPC UA data from KEPServerEX to MQTT topics, performing data normalization, applying threshold-based alerts, and handling protocol conversions for non-standard machines.

Eclipse Mosquitto

Eclipse Foundation

Free (self-hosted on Advantech edge gateway)

Lightweight MQTT message broker on the edge gateway that receives machine data from KEPServerEX IoT Gateway and Node-RED flows, then forwards to Azure IoT Hub. Provides local message buffering during cloud connectivity outages ensuring no data loss.

Prerequisites

  • Existing PLC infrastructure accessible via at least one standard industrial protocol (OPC UA, EtherNet/IP, PROFINET, Modbus TCP/RTU, or MTConnect). Document all PLC makes, models, and firmware versions during site survey.
  • Functioning ERP system (SAP Business One, Epicor Kinetic, Infor SyteLine, Microsoft Dynamics 365 Business Central, JobBOSS², or equivalent) with accessible database (ODBC/SQL) or REST API for work order, BOM, and cost data extraction.
  • Dedicated VLAN or ability to create a new VLAN on the plant floor network for OT device isolation. Network infrastructure must support 802.1Q VLAN tagging.
  • Gigabit Ethernet backbone on the shop floor with available switch ports near each PLC or machine controller. Minimum Cat5e cabling; Cat6 recommended for new runs.
  • Minimum 50 Mbps symmetrical internet uplink for cloud SaaS connectivity (MachineMetrics, Azure IoT Hub, Power BI). 100 Mbps recommended.
  • Server room or IT closet with adequate cooling, 120/240V power, and space for the Dell PowerEdge T360 tower server and the 2U rack-mount UPS (APC SMT1500RM2UC).
  • Active Directory or Azure AD tenant for identity management and single sign-on across Power BI, Azure services, and MachineMetrics.
  • Microsoft 365 or Azure tenant with Global Admin access for provisioning Azure IoT Hub, Azure SQL Database, and Power BI Pro licenses via CSP.
  • Client-designated project stakeholders: plant manager or production supervisor (defines KPIs and downtime categories), maintenance lead (sensor placement and machine access), IT administrator (network access and firewall rules), and quality manager (scrap categories and compliance requirements).
  • Downtime reason code taxonomy agreed upon by client operations team before deployment. Minimum 15–30 standardized codes covering categories: mechanical failure, electrical failure, tooling, material, operator, quality hold, changeover, scheduled maintenance, no demand.
  • Physical access to all machines for sensor installation (ifm VVB001, Banner wireless nodes) during scheduled downtime windows. Coordinate with maintenance team for lockout/tagout procedures.
  • Baseline production data: minimum 30 days of historical production records (even if manual/spreadsheet-based) to validate initial KPI calculations against known values.
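The baseline-data prerequisite can be sanity-checked in a few lines. A minimal sketch (pure Python, with hypothetical daily counts) computing yield and scrap rate for comparison against the client's known values:

```python
# Sketch: validate baseline KPIs against client-known values.
# The (good_count, reject_count) pairs below are hypothetical illustration data,
# e.g. transcribed from the client's manual production spreadsheet.
records = [
    (950, 50), (900, 100), (980, 20),
]

good = sum(g for g, _ in records)
reject = sum(r for _, r in records)
total = good + reject

yield_pct = 100.0 * good / total   # first-pass yield
scrap_pct = 100.0 * reject / total # scrap rate

print(f"Yield: {yield_pct:.1f}%  Scrap: {scrap_pct:.1f}%")
```

If the computed figures disagree materially with what the client believes, resolve that before go-live; it is usually a counting-convention issue (reworked parts, setup scrap) that will otherwise poison every dashboard.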

Installation Steps

...

Step 1: Site Survey and Data Source Audit

Conduct a comprehensive on-site audit of the manufacturing facility to document every machine, PLC, sensor, and software system that will feed into the analytics platform. Walk the entire production floor with the plant engineer and maintenance lead. For each machine, record: make/model, PLC type and firmware version, available communication protocols, existing sensors, network connectivity status, and current monitoring capabilities. Document the ERP system version, database type, and available APIs. Map the existing network topology including all VLANs, firewalls, and internet uplinks. Photograph PLC cabinets, network panels, and proposed sensor mounting locations. Identify machines that have no PLC or digital connectivity (candidates for retrofit sensors). Produce a Machine Connectivity Matrix spreadsheet as the primary deliverable.

  • Create Machine Connectivity Matrix template
  • Columns: Machine_ID, Machine_Name, Make_Model, PLC_Type, PLC_FW_Version, Protocol_Available, Network_Connected(Y/N), IP_Address, Sensors_Existing, Retrofit_Needed(Y/N), MachineMetrics_Compatible(Y/N), Notes
  • Save as: [ClientName]_Machine_Connectivity_Matrix_v1.xlsx
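The matrix template can be generated programmatically so every site survey starts from the same column set. A sketch using only the standard library (the generic output filename is a placeholder; rename per the convention above):

```python
import csv

# Column set from the Machine Connectivity Matrix template above.
COLUMNS = [
    "Machine_ID", "Machine_Name", "Make_Model", "PLC_Type", "PLC_FW_Version",
    "Protocol_Available", "Network_Connected(Y/N)", "IP_Address",
    "Sensors_Existing", "Retrofit_Needed(Y/N)",
    "MachineMetrics_Compatible(Y/N)", "Notes",
]

def write_matrix_template(path: str) -> None:
    """Write an empty survey template with the standard header row."""
    with open(path, "w", newline="") as f:
        csv.writer(f).writerow(COLUMNS)

write_matrix_template("Machine_Connectivity_Matrix_v1.csv")
```

This writes CSV for portability; convert to .xlsx in Excel (or with openpyxl/pandas) if the client deliverable requires it.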
Note

Schedule this visit during a production shift so you can observe actual operations. Allow 4–8 hours depending on facility size. Bring a laptop with Nmap for network scanning and a Fluke LinkIQ cable tester. Do NOT connect any devices to the OT network during the survey — observation only. Request all PLC program documentation from the client in advance.

Step 2: Network Infrastructure — OT VLAN and IT/OT DMZ Setup

Configure the network infrastructure required to safely collect data from OT devices without exposing the control network. This follows the Purdue Model (IEC 62443) with a dedicated OT VLAN (Level 1-2), a DMZ VLAN (Level 3), and the existing corporate IT network (Level 4-5). Install the two Cisco IE3300 industrial switches. Switch 1 connects to PLCs and edge devices on the shop floor OT VLAN. Switch 2 sits at the DMZ boundary. Configure VLAN tagging, inter-VLAN routing restrictions, and firewall rules on the client's existing firewall to allow only outbound data flow from OT to IT/cloud (never inbound to OT).

Cisco IE3300 Switch 1 and Switch 2 IOS configuration plus firewall rule definitions for OT VLAN segmentation
cisco-ios
# Cisco IE3300 Switch 1 (Shop Floor OT VLAN) - Basic Configuration
enable
configure terminal
hostname IE3300-OT-FLOOR
vlan 100
  name OT-Production
  exit
vlan 200
  name DMZ-Analytics
  exit
interface range GigabitEthernet1/1-8
  switchport mode access
  switchport access vlan 100
  spanning-tree portfast
  exit
interface GigabitEthernet1/9
  description Uplink-to-DMZ-Switch
  switchport mode trunk
  switchport trunk allowed vlan 100,200
  exit
ip default-gateway 10.100.1.1
end
write memory

# Cisco IE3300 Switch 2 (DMZ Boundary) - Basic Configuration
enable
configure terminal
hostname IE3300-DMZ
vlan 200
  name DMZ-Analytics
  exit
interface GigabitEthernet1/1
  description Downlink-from-OT-Switch
  switchport mode trunk
  switchport trunk allowed vlan 100,200
  exit
interface GigabitEthernet1/2
  description Edge-Gateway-Advantech
  switchport mode access
  switchport access vlan 200
  exit
interface GigabitEthernet1/3
  description Analytics-Server-Dell
  switchport mode access
  switchport access vlan 200
  exit
interface GigabitEthernet1/9
  description Uplink-to-Corporate-Firewall
  switchport mode access
  switchport access vlan 200
  exit
end
write memory

# Firewall Rules (example for pfSense/Fortinet - adapt to client's firewall)
# Rule 1: Allow Edge Gateway (10.200.1.10) -> OT VLAN (10.100.1.0/24) on ports 4840 (OPC UA), 502 (Modbus TCP), 44818 (EtherNet/IP) for KEPServerEX polling
# Rule 2: Allow DMZ VLAN (10.200.1.0/24) -> Internet on ports 443 (HTTPS), 8883 (MQTTS to Azure IoT Hub)
# Rule 3: DENY ALL from Internet/Corporate -> OT VLAN
# Rule 4: DENY ALL other traffic to or from the OT VLAN (default deny; OT devices never initiate outbound connections)
Critical

CRITICAL: Never allow any inbound connections from the corporate network or internet to the OT VLAN. Production data flows unidirectionally OT -> DMZ -> Cloud; the only traffic permitted into the OT VLAN is the edge gateway's PLC polling, scoped to specific source IPs and ports at the firewall. Use the client's existing firewall for inter-VLAN routing and ACLs: the Cisco IE3300 switches provide Layer 2 segmentation, and Layer 3 filtering happens at the firewall. Coordinate with the plant engineer to schedule switch installation during a maintenance window; do NOT disrupt the production network. Label all cables and ports clearly.
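The segmentation intent can be unit-tested before touching the real firewall by modeling it as a first-match rule table. A sketch under one consistent reading of the design (the edge gateway initiates all polling into the OT VLAN; `evaluate` is a hypothetical helper, not firewall syntax):

```python
# First-match ACL model of the OT/DMZ segmentation policy.
RULES = [
    # (src_zone, dst_zone, ports or None for all, action)
    ("DMZ", "OT",  {4840, 502, 44818}, "ALLOW"),  # edge gateway polls PLCs
    ("DMZ", "NET", {443, 8883},        "ALLOW"),  # HTTPS + MQTTS to Azure
    ("NET", "OT",  None,               "DENY"),   # no inbound to OT, ever
    ("OT",  "NET", None,               "DENY"),   # OT never talks out directly
]

def evaluate(src: str, dst: str, port: int) -> str:
    """Return the action of the first matching rule; default deny."""
    for rsrc, rdst, ports, action in RULES:
        if rsrc == src and rdst == dst and (ports is None or port in ports):
            return action
    return "DENY"

assert evaluate("DMZ", "OT", 44818) == "ALLOW"  # KEPServerEX -> PLC
assert evaluate("NET", "OT", 44818) == "DENY"   # corporate/internet blocked
assert evaluate("DMZ", "OT", 3389) == "DENY"    # anything unlisted is denied
```

Encoding the policy this way makes firewall change reviews concrete: every proposed rule change can be checked against the assertions before it is applied.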

Step 3: Edge Gateway Deployment — Advantech EPC-R7300

Install and configure the Advantech EPC-R7300 edge AI computer in the DMZ network zone. This device runs KEPServerEX for industrial protocol translation, Eclipse Mosquitto as the local MQTT broker, and Node-RED for data integration flows. Mount it in an industrial enclosure or DIN-rail near the main PLC cabinet. Connect it to the DMZ switch (Cisco IE3300 Switch 2) via Ethernet. The edge gateway must have network access to PLCs on the OT VLAN (through firewall-controlled rules) and outbound internet access for Azure IoT Hub.

Edge gateway OS setup, Mosquitto MQTT broker installation, Node-RED installation with manufacturing palettes, and connectivity verification
bash
# 1. Physical installation
# Mount Advantech EPC-R7300 on DIN rail or in industrial enclosure
# Connect Ethernet to Cisco IE3300-DMZ port Gi1/2
# Connect power supply (24VDC industrial or included AC adapter)
# Assign static IP: 10.200.1.10/24, Gateway: 10.200.1.1, DNS: client's DNS servers

# 2. OS Setup (Ubuntu 22.04 LTS pre-installed or install via USB)
sudo apt update && sudo apt upgrade -y
sudo hostnamectl set-hostname edge-gw-prod

# 3. Install Eclipse Mosquitto MQTT Broker
sudo apt install -y mosquitto mosquitto-clients
sudo systemctl enable mosquitto

# Create Mosquitto config
sudo tee /etc/mosquitto/conf.d/production.conf << 'EOF'
listener 1883 0.0.0.0
allow_anonymous false
password_file /etc/mosquitto/passwd
persistence true
persistence_location /var/lib/mosquitto/
log_dest file /var/log/mosquitto/mosquitto.log
EOF

# Create MQTT user
sudo mosquitto_passwd -c /etc/mosquitto/passwd edgeuser
# Enter password when prompted (use strong password, document in password manager)
sudo systemctl restart mosquitto

# 4. Install Node-RED
sudo apt install -y nodejs npm
sudo npm install -g --unsafe-perm node-red
sudo npm install -g pm2
pm2 start node-red -- --userDir /home/admin/.node-red
pm2 startup systemd
pm2 save

# Install Node-RED manufacturing palettes
cd /home/admin/.node-red
npm install node-red-contrib-opcua
npm install node-red-contrib-mqtt-broker
npm install node-red-contrib-modbus
npm install node-red-dashboard
npm install node-red-contrib-postgresql   # used by the MQTT-to-TimescaleDB pipeline flow in Step 7
pm2 restart node-red

# 5. Install KEPServerEX (Windows VM or Wine, or use dedicated Windows IoT if Advantech ships with Windows)
# If Windows IoT Enterprise is installed:
# Download KEPServerEX installer from PTC/Kepware partner portal
# Install Manufacturing Suite with the following drivers based on site survey:
#   - Allen-Bradley EtherNet/IP
#   - Siemens TCP/IP Ethernet (S7-1200/1500)
#   - Modbus TCP/IP
#   - FANUC Focas
#   - Mitsubishi MC Protocol
#   - MTConnect Agent
#   - IoT Gateway (MQTT Client)
# Activate license with PTC-provided activation code

# 6. Verify connectivity
ping -c 3 10.100.1.101  # Test reach to a known PLC on the OT VLAN (requires ICMP permitted from the edge gateway)
mosquitto_sub -h localhost -t 'test/#' -u edgeuser -P <password> &
mosquitto_pub -h localhost -t 'test/hello' -u edgeuser -P <password> -m 'Edge gateway online'
Note

If the Advantech EPC-R7300 ships with Windows IoT Enterprise (common for industrial PCs), install KEPServerEX natively. If it ships with Ubuntu, consider running KEPServerEX in a lightweight Windows VM via KVM, or deploy KEPServerEX on a separate small Windows PC and use the Advantech box exclusively for Mosquitto + Node-RED. Ensure NTP is configured to synchronize time with the same source as PLCs — time alignment is critical for correlating production events across systems. Set up automatic OS security updates.

Step 4: KEPServerEX Configuration — PLC and Machine Connectivity

Configure KEPServerEX on the edge gateway to connect to all PLCs and machine controllers identified in the site survey. Create channels and devices for each protocol, define tag groups for production-relevant data points (cycle counts, part counts, reject counts, machine state, alarms, temperatures, pressures, speeds), and configure the IoT Gateway module to publish all tags to the local Mosquitto MQTT broker in a standardized Sparkplug B or JSON format.

1. Create Channel: Allen-Bradley EtherNet/IP — Channel Name: AB_Line1, Driver: Allen-Bradley EtherNet/IP, Network Adapter: 10.200.1.10 (edge gateway IP)
2. Add Device under AB_Line1 — Device Name: Press01, IP Address: 10.100.1.101 (PLC IP on OT VLAN), Model: CompactLogix 5380, Scan Rate: 1000ms (1 second)
3. Add Tags for Press01: Press01.CycleCount (DINT, Read Only), Press01.PartCountGood (DINT, Read Only), Press01.PartCountReject (DINT, Read Only), Press01.MachineState (INT, Read Only) [0=Off, 1=Running, 2=Idle, 3=Fault, 4=Setup], Press01.CycleTimeActual_ms (REAL, Read Only), Press01.CycleTimeTarget_ms (REAL, Read Only), Press01.ActiveFaultCode (INT, Read Only), Press01.SpindleSpeed_RPM (REAL, Read Only), Press01.ToolID (STRING, Read Only), Press01.Temperature_C (REAL, Read Only)
4. Repeat for all machines (create appropriate channels per protocol) — Siemens S7-1500: Driver = Siemens TCP/IP Ethernet; FANUC CNC: Driver = FANUC Focas; Modbus devices: Driver = Modbus TCP/IP
5. Configure IoT Gateway Module — Agent Type: MQTT Client, Broker URL: mqtt://localhost:1883, Username: edgeuser, Password: (from Mosquitto setup), Topic Structure: kepware/{channel}/{device}/{tag}, Publish Rate: 1000ms (1 second for critical tags), Data Format: JSON, Publish on Data Change: Enabled, Store and Forward: Enabled (buffer if MQTT broker unavailable)
Test connectivity via Mosquitto CLI
bash
mosquitto_sub -h localhost -t 'kepware/#' -u edgeuser -P <password> -v

# Expected output:
# kepware/AB_Line1/Press01/CycleCount 4523
# kepware/AB_Line1/Press01/MachineState 1
# kepware/AB_Line1/Press01/PartCountGood 4501
# kepware/AB_Line1/Press01/PartCountReject 22
Note

Tag naming conventions are critical — establish a standard early and enforce it. Recommended: {Line}_{Machine}_{Parameter}_{Unit}. Example: Line1_Press01_CycleTime_ms. Set scan rates based on data criticality: 1s for machine state and cycle counts, 5–10s for temperatures and pressures, 60s for slow-changing parameters. Enable KEPServerEX store-and-forward to prevent data loss during network hiccups. Document all tag mappings in a Tag Dictionary spreadsheet that maps PLC addresses to KEPServerEX tag names to MQTT topics.
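A naming convention only helps if it is enforced mechanically, e.g. when building the Tag Dictionary. A sketch of a validator for the recommended {Line}_{Machine}_{Parameter}_{Unit} pattern (the regex details are an assumption; tighten them to the client's actual line and machine names):

```python
import re

# {Line}_{Machine}_{Parameter}_{Unit}, e.g. Line1_Press01_CycleTime_ms
TAG_PATTERN = re.compile(
    r"^(?P<line>Line\d+)_(?P<machine>[A-Za-z]+\d+)_"
    r"(?P<parameter>[A-Za-z]+)_(?P<unit>[A-Za-z%]+)$"
)

def parse_tag(tag: str) -> dict:
    """Return the tag's components, or raise ValueError if non-conforming."""
    m = TAG_PATTERN.match(tag)
    if not m:
        raise ValueError(f"Tag does not follow naming convention: {tag}")
    return m.groupdict()

print(parse_tag("Line1_Press01_CycleTime_ms"))
# A non-conforming tag such as "press1-cycletime" raises ValueError.
```

Running every proposed tag name through the validator during Step 4 catches drift before hundreds of tags are configured in KEPServerEX.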

Step 5: MachineMetrics Deployment — Cloud Machine Monitoring

Deploy MachineMetrics on CNC machines and other supported equipment for automated, cloud-native machine monitoring. MachineMetrics provides plug-and-play adapters for most CNC controls (FANUC, Haas, Mazak, DMG Mori, Siemens SINUMERIK) and connects directly to the machine controller without PLC modification. Install MachineMetrics Edge devices on each compatible machine, connect them to the DMZ network, and configure the MachineMetrics cloud portal with machine profiles, shift schedules, and downtime reason codes.

1. Physical Installation (per machine): a) Mount MachineMetrics Edge device near machine controller, b) Connect Ethernet cable from Edge device to Cisco IE3300-DMZ switch, c) Connect machine interface cable (RS-232, Ethernet, FOCAS, MTConnect), d) Power on Edge device — it will auto-register with MachineMetrics cloud
2. MachineMetrics Cloud Portal Configuration (https://app.machinemetrics.com): a) Admin > Machines > Add Machine, b) Enter: Machine Name, Make, Model, Control Type, c) Associate Edge device serial number, d) Configure Expected Cycle Time per part program, e) Set Ideal Cycle Time for OEE calculation
3. Configure Shift Schedule: a) Admin > Shifts > Add Shift, b) Define: Shift 1 (6:00 AM – 2:30 PM), Shift 2 (2:30 PM – 11:00 PM), Shift 3 (11:00 PM – 6:00 AM), c) Define break times, planned downtime windows
4. Configure Downtime Reason Codes: a) Admin > Downtime Categories, b) Add categories matching the client-approved taxonomy: Mechanical Failure (subcodes: bearing, hydraulic, pneumatic, spindle), Electrical Failure (subcodes: drive, sensor, wiring, power), Tooling (subcodes: tool break, tool wear, tool change), Material (subcodes: material defect, wrong material, material shortage), Operator (subcodes: setup error, programming error, training), Quality Hold (subcodes: dimension OOT, surface finish, visual defect), Changeover (subcodes: setup, first article, program change), Scheduled Maintenance (subcodes: PM, calibration, cleaning), No Demand (subcodes: no order, scheduling gap)
5. Configure Alerts: a) Admin > Alerts, b) Machine Down > 15 minutes: notify production supervisor via email + SMS, c) Reject Rate > 5% in rolling 1-hour window: notify quality manager, d) OEE < 50% for any machine in current shift: notify plant manager
MachineMetrics API — fetch machine list and production data
bash
# MachineMetrics API — for custom data extraction:
curl -X GET 'https://api.machinemetrics.com/v1/machines' \
  -H 'Authorization: Bearer <API_KEY>' \
  -H 'Content-Type: application/json'

# Fetch production data for a specific machine and date range:
curl -X GET 'https://api.machinemetrics.com/v1/report/production?machineId=<ID>&startDate=2025-01-01&endDate=2025-01-31' \
  -H 'Authorization: Bearer <API_KEY>'
Note

MachineMetrics charges per connected machine, so prioritize the most critical and highest-value machines first. Start with CNC machines (highest compatibility) and expand to other equipment types. The MachineMetrics Edge device requires outbound HTTPS (port 443) to *.machinemetrics.com — ensure the firewall allows this from the DMZ VLAN. MachineMetrics provides an API for extracting data programmatically — we will use this in later steps to feed data into the custom ML pipeline. Operator downtime reason code entry is critical for root cause analysis — work with the plant manager to enforce compliance.
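The rolling reject-rate alert configured in the portal can be prototyped locally before it is trusted in production. A pandas sketch on synthetic cycle events (the 5% threshold matches the alert above; the event data is illustrative):

```python
import pandas as pd

# Synthetic cycle events: one row per completed part, reject flag set on bad
# parts (illustration only; real data comes from MachineMetrics/TimescaleDB).
events = pd.DataFrame({
    "time": pd.date_range("2025-01-01 06:00", periods=120, freq="1min"),
    "reject": [1 if i % 12 == 0 else 0 for i in range(120)],  # ~8.3% rejects
}).set_index("time")

# Rolling 1-hour reject rate, matching the MachineMetrics alert definition.
hourly_rate = events["reject"].rolling("1h").mean()

# Fire the alert whenever the rolling rate exceeds 5%.
alerts = hourly_rate[hourly_rate > 0.05]
print(f"Alert windows: {len(alerts)}")
```

Replaying a month of historical events through this logic shows how often the alert would have fired, which is the fastest way to tune the threshold with the quality manager.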

Step 6: On-Premise Analytics Server Setup — Dell PowerEdge T360

Install and configure the Dell PowerEdge T360 as the on-premise analytics server. This server runs TimescaleDB for time-series data, Grafana for shop-floor dashboards, Python for ML models, and acts as the local data processing hub. Install Ubuntu Server 22.04 LTS, configure RAID1 for the two NVMe drives, set up remote management via iDRAC, and install all required software components.

  • Physical Setup: Install the Dell PowerEdge T360 tower in the client server room
  • Connect to APC UPS via USB for graceful shutdown on power loss
  • Connect Ethernet to Cisco IE3300-DMZ switch port
  • Assign static IP: 10.200.1.20/24, Gateway: 10.200.1.1
  • Configure iDRAC remote management: 10.200.1.21 (separate NIC)
  • OS Installation: Install Ubuntu Server 22.04 LTS from USB
  • Configure RAID1 (mirror) across two NVMe drives via PERC controller in BIOS
  • Create user: mspadmin (add to sudo group)
  • Enable SSH: sudo apt install openssh-server
Enable UFW firewall and open required ports
bash
sudo ufw allow ssh
sudo ufw allow 3000/tcp   # Grafana
sudo ufw allow 5432/tcp   # TimescaleDB (from edge gateway and Power BI only)
sudo ufw allow 8888/tcp   # Jupyter Lab (for ML development, restrict to MSP IPs)
sudo ufw enable
Install TimescaleDB
bash
sudo apt install -y gnupg postgresql-common apt-transport-https lsb-release wget
sudo /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh -y
echo "deb https://packagecloud.io/timescale/timescaledb/ubuntu/ $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/timescaledb.list
wget --quiet -O - https://packagecloud.io/timescale/timescaledb/gpgkey | sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/timescaledb.gpg
sudo apt update
sudo apt install -y timescaledb-2-postgresql-16
sudo timescaledb-tune --quiet --yes
sudo systemctl restart postgresql
Create production database and application user
bash
sudo -u postgres psql -c "CREATE DATABASE production_analytics;"
sudo -u postgres psql -d production_analytics -c "CREATE EXTENSION IF NOT EXISTS timescaledb;"
sudo -u postgres psql -c "CREATE USER analytics_app WITH PASSWORD 'CHANGE_THIS_STRONG_PASSWORD';"
sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE production_analytics TO analytics_app;"
Create TimescaleDB schema
sql
-- hypertables for telemetry, production events, downtime, OEE, and scrap

sudo -u postgres psql -d production_analytics << 'EOSQL'
-- Machine telemetry (high-frequency sensor data)
CREATE TABLE machine_telemetry (
    time        TIMESTAMPTZ NOT NULL,
    machine_id  TEXT NOT NULL,
    metric_name TEXT NOT NULL,
    value       DOUBLE PRECISION,
    unit        TEXT
);
SELECT create_hypertable('machine_telemetry', 'time');
CREATE INDEX idx_telemetry_machine ON machine_telemetry (machine_id, time DESC);

-- Production events (cycle completions, rejects, state changes)
CREATE TABLE production_events (
    time            TIMESTAMPTZ NOT NULL,
    machine_id      TEXT NOT NULL,
    event_type      TEXT NOT NULL,  -- 'cycle_complete', 'reject', 'state_change', 'alarm'
    part_number     TEXT,
    work_order      TEXT,
    cycle_time_ms   INTEGER,
    good_count      INTEGER DEFAULT 0,
    reject_count    INTEGER DEFAULT 0,
    reject_reason   TEXT,
    machine_state   TEXT,
    fault_code      TEXT,
    operator_id     TEXT
);
SELECT create_hypertable('production_events', 'time');
CREATE INDEX idx_events_machine ON production_events (machine_id, time DESC);
CREATE INDEX idx_events_type ON production_events (event_type, time DESC);

-- Downtime events
CREATE TABLE downtime_events (
    time_start      TIMESTAMPTZ NOT NULL,
    time_end        TIMESTAMPTZ,
    machine_id      TEXT NOT NULL,
    duration_min    DOUBLE PRECISION,
    reason_code     TEXT,
    reason_category TEXT,
    reason_detail   TEXT,
    shift           TEXT,
    operator_id     TEXT,
    work_order      TEXT,
    notes           TEXT
);
SELECT create_hypertable('downtime_events', 'time_start');

-- OEE summary (hourly rollup table, populated by the analytics pipeline)
CREATE TABLE oee_hourly (
    time_bucket     TIMESTAMPTZ NOT NULL,
    machine_id      TEXT NOT NULL,
    availability    DOUBLE PRECISION,
    performance     DOUBLE PRECISION,
    quality         DOUBLE PRECISION,
    oee             DOUBLE PRECISION,
    good_count      INTEGER,
    reject_count    INTEGER,
    planned_time_min DOUBLE PRECISION,
    run_time_min    DOUBLE PRECISION
);
SELECT create_hypertable('oee_hourly', 'time_bucket');

-- Scrap summary
CREATE TABLE scrap_events (
    time            TIMESTAMPTZ NOT NULL,
    machine_id      TEXT NOT NULL,
    part_number     TEXT,
    work_order      TEXT,
    quantity         INTEGER,
    scrap_reason    TEXT,
    scrap_category  TEXT,
    material_cost   DOUBLE PRECISION,
    labor_cost      DOUBLE PRECISION,
    operator_id     TEXT
);
SELECT create_hypertable('scrap_events', 'time');

-- Data retention policies
SELECT add_retention_policy('machine_telemetry', INTERVAL '90 days');
SELECT add_retention_policy('production_events', INTERVAL '365 days');
-- downtime_events and oee_hourly retained indefinitely for trend analysis
EOSQL
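The oee_hourly table above is filled using the standard OEE decomposition, OEE = Availability x Performance x Quality. A minimal sketch of that calculation (the input numbers are hypothetical):

```python
def compute_oee(planned_time_min, run_time_min, ideal_cycle_s,
                total_count, good_count):
    """Standard OEE decomposition used to fill the oee_hourly table."""
    availability = run_time_min / planned_time_min
    performance = (ideal_cycle_s * total_count) / (run_time_min * 60.0)
    quality = good_count / total_count
    return availability, performance, quality, availability * performance * quality

# Hypothetical hour: 60 min planned, 48 min running, 30 s ideal cycle,
# 90 parts produced, 86 good.
a, p, q, oee = compute_oee(60, 48, 30, 90, 86)
print(f"A={a:.2f} P={p:.2f} Q={q:.2f} OEE={oee:.2f}")
# → A=0.80 P=0.94 Q=0.96 OEE=0.72
```

Keeping the three factors alongside the composite OEE (as the schema does) matters for root-cause work: a 72% OEE driven by availability calls for different fixes than one driven by quality.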
Install Grafana OSS
bash
sudo apt install -y apt-transport-https software-properties-common wget
sudo mkdir -p /etc/apt/keyrings/
wget -q -O - https://apt.grafana.com/gpg.key | gpg --dearmor | sudo tee /etc/apt/keyrings/grafana.gpg > /dev/null
echo "deb [signed-by=/etc/apt/keyrings/grafana.gpg] https://apt.grafana.com stable main" | sudo tee /etc/apt/sources.list.d/grafana.list
sudo apt update
sudo apt install -y grafana
sudo systemctl enable grafana-server
sudo systemctl start grafana-server
# Grafana accessible at http://10.200.1.20:3000 (default admin/admin, change immediately)
Install Python ML environment
bash
sudo apt install -y python3 python3-pip python3-venv
python3 -m venv /opt/analytics/ml-env
source /opt/analytics/ml-env/bin/activate
pip install --upgrade pip
pip install pandas numpy scikit-learn scipy statsmodels prophet shap imbalanced-learn psycopg2-binary sqlalchemy matplotlib seaborn jupyterlab requests schedule
deactivate
Install and configure Chrony for NTP time synchronization
bash
sudo apt install -y chrony
sudo tee /etc/chrony/chrony.conf << 'EOF'
server time.windows.com iburst
server pool.ntp.org iburst
makestep 1.0 3
rtcsync
EOF
sudo systemctl restart chrony
Configure automatic security updates
bash
sudo apt install -y unattended-upgrades
sudo dpkg-reconfigure -plow unattended-upgrades
Note

RAID1 is essential — a single drive failure should not take down the analytics platform. Configure iDRAC email alerts to the MSP monitoring system. TimescaleDB retention policies automatically purge raw telemetry after 90 days to manage disk usage — aggregated data is retained indefinitely. Set PostgreSQL max_connections to 100 and shared_buffers to 8GB (25% of 32GB RAM) for optimal performance. Schedule weekly pg_dump backups to Azure Blob Storage for disaster recovery.

Step 7: Data Pipeline — Edge to On-Premise to Cloud

Build the data pipeline that flows production data from machines through the edge gateway to both the on-premise TimescaleDB and the Azure cloud. Node-RED on the edge gateway subscribes to MQTT topics from KEPServerEX, normalizes the data, writes to TimescaleDB via PostgreSQL, and forwards to Azure IoT Hub for cloud analytics. Additionally, configure a Python script to pull data from the MachineMetrics API and merge it into the same TimescaleDB schema.

1
Node-RED Flow Configuration (access at http://10.200.1.10:1880) — Import the following flow JSON into Node-RED: Flow: MQTT-to-TimescaleDB — MQTT In node: broker=localhost:1883, topic=kepware/#, QoS=1 — Function node: Parse JSON payload, extract machine_id from topic, add timestamp — PostgreSQL node: INSERT INTO machine_telemetry (time, machine_id, metric_name, value, unit) VALUES ($1, $2, $3, $4, $5) — MQTT Out node: broker=<Azure IoT Hub hostname>, topic=devices/<device-id>/messages/events/, TLS enabled
2
Azure IoT Hub Device Registration
3
Configure Azure IoT Hub message routing: Route 1: All telemetry -> Azure SQL Database (via Azure Stream Analytics or Azure Functions) — Route 2: Alert-level events -> Azure Service Bus -> Email/SMS notifications
4
MachineMetrics API Data Sync Script — Deploy on Dell PowerEdge T360: /opt/analytics/scripts/mm_sync.py
5
Set up cron job for MachineMetrics sync
6
Set up cron job for ERP work order sync — Similar script pulling work orders, BOMs, and cost data from ERP API — Schedule: every 15 minutes
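The Function node's normalization step can be sketched in Python. The topic layout (`kepware/<channel>/<device>/<tag>`) and the JSON payload shape are assumptions; match them to the actual KEPServerEX IoT Gateway publish format before relying on this:

```python
import json
from datetime import datetime, timezone

def normalize_mqtt_message(topic, payload_bytes):
    """Parse a KEPServerEX MQTT message into a machine_telemetry row:
    (time, machine_id, metric_name, value, unit).

    Assumes topics shaped like kepware/<channel>/<device>/<tag> and a
    JSON payload such as {"value": 1180.5, "unit": "rpm"} -- both are
    assumptions to verify against the IoT Gateway configuration.
    """
    parts = topic.split('/')
    machine_id = parts[2] if len(parts) >= 3 else 'unknown'
    metric_name = parts[3] if len(parts) >= 4 else parts[-1]
    data = json.loads(payload_bytes)
    return (
        data.get('timestamp') or datetime.now(timezone.utc).isoformat(),
        machine_id,
        metric_name,
        float(data['value']),
        data.get('unit', ''),
    )
```

The returned tuple maps one-to-one onto the parameterized INSERT used by the PostgreSQL node.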
Azure IoT Hub device registration commands
bash
az iot hub device-identity create --hub-name <iot-hub-name> --device-id edge-gateway-prod
az iot hub device-identity connection-string show --hub-name <iot-hub-name> --device-id edge-gateway-prod
MachineMetrics API sync script
python
# deploy to /opt/analytics/scripts/mm_sync.py on Dell PowerEdge T360

cat > /opt/analytics/scripts/mm_sync.py << 'PYEOF'
#!/usr/bin/env python3
"""Sync MachineMetrics data to local TimescaleDB every 5 minutes."""
import requests
import psycopg2
from datetime import datetime, timedelta
import json
import os
import logging

logging.basicConfig(level=logging.INFO, filename='/var/log/analytics/mm_sync.log')
logger = logging.getLogger(__name__)

MM_API_KEY = os.environ.get('MM_API_KEY', 'YOUR_API_KEY')
MM_BASE_URL = 'https://api.machinemetrics.com/v1'
DB_CONN = os.environ.get('DB_CONN', 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS')

def sync_production_data():
    """Fetch last 10 minutes of production data from MachineMetrics."""
    conn = psycopg2.connect(DB_CONN)
    cur = conn.cursor()
    
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(minutes=10)
    
    headers = {'Authorization': f'Bearer {MM_API_KEY}', 'Content-Type': 'application/json'}
    
    # Fetch machines (bounded timeout; fail loudly on HTTP errors)
    resp = requests.get(f'{MM_BASE_URL}/machines', headers=headers, timeout=10)
    resp.raise_for_status()
    machines = resp.json()
    
    for machine in machines:
        machine_id = machine.get('id')
        machine_name = machine.get('name', f'machine_{machine_id}')
        
        # Fetch production metrics
        params = {
            'machineId': machine_id,
            'startDate': start_time.isoformat() + 'Z',
            'endDate': end_time.isoformat() + 'Z'
        }
        
        try:
            prod_resp = requests.get(f'{MM_BASE_URL}/report/production', headers=headers, params=params, timeout=30)
            if prod_resp.status_code == 200:
                data = prod_resp.json()
                for record in data.get('data', []):
                    cur.execute(
                        """INSERT INTO production_events 
                        (time, machine_id, event_type, good_count, reject_count, cycle_time_ms) 
                        VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING""",
                        (record.get('timestamp'), machine_name, 'cycle_complete',
                         record.get('goodCount', 0), record.get('rejectCount', 0),
                         record.get('cycleTime'))
                    )
        except Exception as e:
            logger.error(f'Error syncing machine {machine_name}: {e}')
    
    conn.commit()
    cur.close()
    conn.close()
    logger.info(f'Sync completed at {datetime.utcnow()}')

if __name__ == '__main__':
    sync_production_data()
PYEOF
Cron job setup for MachineMetrics sync (every 5 min) and ERP sync (every 15 min)
bash
sudo mkdir -p /var/log/analytics
chmod +x /opt/analytics/scripts/mm_sync.py
(crontab -l 2>/dev/null; echo '*/5 * * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/mm_sync.py') | crontab -
(crontab -l 2>/dev/null; echo '*/15 * * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/erp_sync.py') | crontab -
Note

The Node-RED flow is the critical real-time pipeline; ensure it has error handling for database connection failures with automatic retry. The MachineMetrics API sync runs every 5 minutes as a batch supplement — this captures data from machines connected directly to MachineMetrics that bypass KEPServerEX. For the ERP sync script, you'll need to customize it based on the client's specific ERP system and available APIs. Store all API keys and database passwords in environment variables, never hardcoded. Monitor the cron jobs with a simple health check that alerts the MSP if a sync hasn't run in 15 minutes.

Step 8: Grafana Dashboard Configuration — Shop Floor Real-Time Displays

Configure Grafana on the Dell PowerEdge T360 with production dashboards designed for three audiences: (1) operators on the shop floor viewing real-time machine status on Samsung tablets, (2) production supervisors monitoring shift performance, and (3) plant managers reviewing daily/weekly trends. Connect Grafana to TimescaleDB as the primary data source and create dashboard templates.

1
Add TimescaleDB as Grafana data source — Access Grafana at http://10.200.1.20:3000 → Configuration > Data Sources > Add PostgreSQL. Set Host: localhost:5432, Database: production_analytics, User: analytics_app, Password: (from setup), TLS/SSL Mode: disable (local connection), TimescaleDB: toggle ON
2
Import pre-built manufacturing dashboard templates — Download from Grafana Dashboard Library: OEE Dashboard (ID: 16919 or custom), Machine Status Overview, Downtime Pareto
3
Create custom dashboard: Production Overview with 5 panels (see queries below)
4
Configure alerts in Grafana: Machine down > 15 minutes | Scrap rate > 5% in rolling 1-hour window | OEE < 50% for any machine. Notification channel: Email to production supervisor + webhook to MSP monitoring
5
Configure Samsung tablets for kiosk mode — Set Chrome kiosk URL: http://10.200.1.20:3000/d/<dashboard-uid>?orgId=1&kiosk, Auto-refresh: 10 seconds, Use Grafana anonymous auth for shop floor displays
Panel 1: Machine Status Grid
sql
-- latest state per machine (Stat panel)

SELECT machine_id, 
  CASE WHEN machine_state = 'Running' THEN 1 WHEN machine_state = 'Idle' THEN 0.5 ELSE 0 END as status
FROM (
  SELECT DISTINCT ON (machine_id) machine_id, machine_state
  FROM production_events 
  WHERE event_type = 'state_change' AND time > now() - interval '5 minutes'
  ORDER BY machine_id, time DESC
) latest_states;
Panel 2: Real-Time OEE Gauge
sql
-- rolling 8-hour OEE per machine (Gauge panel)

SELECT time_bucket AS time, machine_id, oee * 100 AS oee_pct
FROM oee_hourly
WHERE time_bucket > now() - interval '8 hours'
ORDER BY time_bucket;
Panel 3: Scrap Rate Trend
sql
-- hourly scrap percentage over 24 hours (Time series panel)

SELECT time_bucket('1 hour', time) as time,
  (SUM(reject_count)::float / NULLIF(SUM(good_count) + SUM(reject_count), 0)) * 100 as scrap_pct
FROM production_events
WHERE event_type = 'cycle_complete' AND time > now() - interval '24 hours'
GROUP BY 1 ORDER BY 1;
Panel 4: Downtime Pareto
sql
-- top 10 downtime reasons over 7 days (Bar chart panel)

SELECT reason_category, SUM(duration_min) as total_minutes
FROM downtime_events
WHERE time_start > now() - interval '7 days'
GROUP BY reason_category
ORDER BY total_minutes DESC
LIMIT 10;
Panel 5: Top Scrap Reasons
sql
-- scrap quantity by category over 7 days (Pie chart panel)

SELECT scrap_category, SUM(quantity) as total_scrap
FROM scrap_events
WHERE time > now() - interval '7 days'
GROUP BY scrap_category
ORDER BY total_scrap DESC;
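The Pareto panels above plot sorted totals; the cumulative-percent line used in the Power BI Pareto charts (Step 9) is a simple running sum. A sketch of that calculation, with illustrative category names:

```python
def pareto(totals):
    """Sort category totals descending and attach cumulative percent.

    totals: dict of category -> minutes (or cost). Returns a list of
    (category, value, cumulative_pct) tuples, the shape behind a
    Pareto "bar + cumulative line" chart.
    """
    ordered = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    grand_total = sum(totals.values()) or 1  # avoid divide-by-zero on empty input
    running = 0.0
    rows = []
    for category, value in ordered:
        running += value
        rows.append((category, value, round(100 * running / grand_total, 1)))
    return rows
```

For example, `pareto({'Tooling': 300, 'Setup': 150, 'Material wait': 50})` yields cumulative percentages of 60.0, 90.0, and 100.0.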
Enable Grafana anonymous auth for shop floor kiosk displays and restart service
bash
sudo tee -a /etc/grafana/grafana.ini << 'EOF'
[auth.anonymous]
enabled = true
org_name = Main Org.
org_role = Viewer
EOF
sudo systemctl restart grafana-server
Note

Design shop-floor dashboards with large fonts, high contrast, and minimal text — operators glance at them from 10+ feet away. Use color coding: green = running, yellow = idle, red = down. The Samsung Galaxy Tab Active5 in kiosk mode prevents operators from navigating away from the dashboard. Create separate dashboards for each audience — operators need real-time status, supervisors need shift summaries, managers need weekly trends. Set Grafana auto-refresh to 10 seconds for shop-floor displays and 5 minutes for management dashboards.

Step 9: Power BI Dashboard Configuration — Executive Analytics

Configure Power BI Pro dashboards for management-level analytics connecting to Azure SQL Database (cloud data) and optionally to the on-premise TimescaleDB via Power BI Gateway. Build executive reports showing yield trends, scrap cost analysis, downtime root cause Pareto charts, and OEE benchmarking across machines and lines. These dashboards focus on business impact (dollars lost, trends over weeks/months) rather than real-time machine status.

1
Install Power BI Gateway (Windows only; use a lightweight Windows VM on the Dell PowerEdge T360 or a dedicated Windows PC): Download from https://powerbi.microsoft.com/en-us/gateway/ — Install in Standard mode, register with client's Power BI tenant, configure data source: PostgreSQL → 10.200.1.20:5432/production_analytics
2
Power BI Desktop — Create Production Analytics Report. Data connections: Azure SQL Database (cloud aggregated data), PostgreSQL via Gateway (on-premise TimescaleDB), MachineMetrics API (DirectQuery via custom connector or import)
3
Key Report Pages — Page 1: Executive Summary: OEE trend (line chart, weekly, by machine/line), Yield % trend (line chart, weekly), Total scrap cost (card, MTD and YTD), Downtime hours (card, MTD), Top 5 downtime reasons (bar chart), Top 5 scrap reasons (bar chart)
4
Key Report Pages — Page 2: Downtime Root Cause Analysis: Downtime Pareto by reason category (bar + cumulative line), Downtime by machine (stacked bar), Downtime by shift (grouped bar), Downtime trend over time (line chart), Drill-through to individual downtime events
5
Key Report Pages — Page 3: Scrap & Yield Analysis: Scrap Pareto by reason (bar + cumulative line), Scrap by part number (table with conditional formatting), Scrap by machine (bar chart), Scrap cost by material type (pie chart), First-pass yield trend by product family
6
Key Report Pages — Page 4: Machine Performance Benchmarking: OEE comparison across all machines (bar chart), Availability vs Performance vs Quality decomposition, Best/worst performing machines (table), Cycle time actual vs target (scatter plot)
7
Key Report Pages — Page 5: ML Insights (connected to Python model outputs): Predicted scrap risk for current production runs, Anomaly detection alerts (table with severity), Root cause correlation matrix (heatmap), Recommended actions from ML model
DAX Measures
dax
// Key OEE and scrap cost calculations for Power BI
// (measure references use [brackets]; DIVIDE() guards against divide-by-zero)

OEE = [Availability] * [Performance] * [Quality]
Availability = DIVIDE([Run Time], [Planned Production Time])
Performance = DIVIDE([Ideal Cycle Time] * [Total Count], [Run Time])
Quality = DIVIDE([Good Count], [Total Count])
Scrap Rate = DIVIDE([Reject Count], [Total Count])
Scrap Cost = SUMX(ScrapEvents, ScrapEvents[quantity] * ScrapEvents[material_cost])
MTBF = DIVIDE([Total Operating Time], [Number of Failures])
MTTR = DIVIDE([Total Repair Time], [Number of Failures])
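The OEE decomposition behind these measures can be sanity-checked with a worked example. All production numbers below are illustrative, not client data:

```python
def compute_oee(planned_min, downtime_min, ideal_cycle_s, total_count, good_count):
    """Standard OEE decomposition: Availability x Performance x Quality.

    planned_min: planned production time (minutes)
    downtime_min: unplanned downtime (minutes)
    ideal_cycle_s: ideal cycle time per part (seconds)
    """
    run_time_min = planned_min - downtime_min
    availability = run_time_min / planned_min
    performance = (ideal_cycle_s / 60 * total_count) / run_time_min
    quality = good_count / total_count
    return availability * performance * quality

# Illustrative shift: 480 planned minutes, 48 min down, 30 s ideal cycle,
# 800 parts produced, 760 good -> OEE of roughly 0.79
oee = compute_oee(480, 48, 30, 800, 760)
```

Working it through: Availability 0.90, Performance about 0.926, Quality 0.95, so OEE is approximately 79%.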
1
Publish report to client's Power BI workspace
2
Configure scheduled refresh: every 2 hours via Gateway
3
Share with executives and plant management via Power BI app
4
Configure email subscriptions for daily OEE summary
Note

Power BI Pro is licensed per user; every user who needs interactive access requires a Pro license (standalone or included with Microsoft 365 E5). Read-only consumers can use Power BI Free if reports are published to a Premium workspace. Schedule data refreshes during low-production times (early morning) to minimize load on TimescaleDB. The Power BI Gateway runs only as a Windows service, so host it in a lightweight Windows VM on the Dell T360 or on a separate small Windows PC with network access to TimescaleDB. For DAX measures, work with the client's financial team to get accurate material costs and labor rates for scrap cost calculations.

Step 10: Vibration Sensor Installation — ifm VVB001 and Banner Wireless

Install vibration and temperature sensors on critical machines that lack built-in condition monitoring. The ifm VVB001 IO-Link sensors are hardwired to machines with PLC connectivity; the Banner Engineering wireless nodes are deployed on auxiliary equipment. These sensors provide the data needed for predictive maintenance ML models that correlate machine health degradation with increasing scrap rates and unplanned downtime.

ifm VVB001 and Banner Wireless sensor installation, configuration, and data pipeline verification
bash
# 1. ifm VVB001 Installation (8 sensors on critical machines)
# Physical mounting:
#   - Clean mounting surface with isopropyl alcohol
#   - Apply thin film of coupling grease
#   - Use M8 stud mount (included) directly on bearing housing
#   - Torque to 5 Nm
#   - Route M12 cable to nearest PLC I/O module or IO-Link master

# IO-Link Configuration (via ifm moneo or PLC):
#   - Set measurement mode: Velocity RMS (mm/s) + Acceleration Peak (g)
#   - Set sampling rate: 1 Hz continuous
#   - Set alarm thresholds per ISO 10816:
#     * Good: < 2.8 mm/s (Zone A)
#     * Satisfactory: 2.8 - 7.1 mm/s (Zone B)
#     * Unsatisfactory: 7.1 - 18.0 mm/s (Zone C)
#     * Unacceptable: > 18.0 mm/s (Zone D) -> ALARM

# PLC Tag Mapping (add to KEPServerEX):
#   {Machine}_Vibration_Velocity_mms
#   {Machine}_Vibration_Acceleration_g
#   {Machine}_Temperature_C
#   {Machine}_Vibration_Alarm (BOOL)

# 2. Banner Engineering Wireless Sensor Installation (4 nodes)
# Physical mounting:
#   - Mount sensor on bearing housing or motor casing using supplied adhesive mount or M6 stud
#   - Orient sensor with arrow pointing toward primary vibration axis
#   - Ensure line-of-sight to DXM700 gateway (or within mesh range)

# DXM700 Gateway Configuration:
#   - Access web interface at assigned IP
#   - Add each wireless node by serial number
#   - Configure reporting interval: 60 seconds
#   - Configure output: Modbus TCP register map
#   - Assign Modbus registers:
#     Register 40001-40002: Node 1 Vibration (FLOAT32)
#     Register 40003-40004: Node 1 Temperature (FLOAT32)
#     Register 40005-40006: Node 2 Vibration (FLOAT32)
#     ...

# 3. Add Banner Modbus data to KEPServerEX:
#   Channel: Modbus_Wireless
#   Driver: Modbus TCP/IP
#   Device: BannerGW, IP: <DXM700 IP>, Port: 502
#   Tags: Map all holding registers to named tags

# 4. Verify sensor data flowing through pipeline:
mosquitto_sub -h 10.200.1.10 -t 'kepware/Modbus_Wireless/#' -u edgeuser -P <password> -v
# Expected: vibration and temperature values updating every 60 seconds

# Verify in TimescaleDB:
sudo -u postgres psql -d production_analytics -c "SELECT * FROM machine_telemetry WHERE metric_name LIKE '%Vibration%' ORDER BY time DESC LIMIT 10;"
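Each wireless value arrives as two 16-bit holding registers that must be recombined into an IEEE-754 float32. A sketch of the decode, assuming big-endian word order (high word in the lower register); the DXM700's word order is configurable, so verify against a known reading and swap the arguments if values look wrong:

```python
import struct

def registers_to_float32(reg_hi, reg_lo):
    """Combine two 16-bit Modbus holding registers into a float32.
    Assumes big-endian word order (high word first); swap the
    arguments if the gateway is configured little-endian."""
    return struct.unpack('>f', struct.pack('>HH', reg_hi, reg_lo))[0]

def float32_to_registers(value):
    """Inverse helper, useful for testing the register map."""
    return struct.unpack('>HH', struct.pack('>f', value))
```

Note that KEPServerEX can also do this decode natively by declaring the tag as a Float spanning two registers; the helper is mainly useful for spot-checking raw register dumps.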
Note

Sensor placement is critical — mount on the bearing housing as close to the bearing as possible, not on sheet metal covers or guards. For CNC machines, mount on the spindle bearing housing. For pumps, mount on the drive-end and non-drive-end bearings. Take baseline vibration readings immediately after installation when the machine is known to be in good condition — these baselines are essential for the ML models. The ifm VVB001 requires an IO-Link master or 4–20mA analog input on the PLC. If the PLC has no available I/O, use an ifm AL1100 IO-Link master connected via Ethernet to KEPServerEX. Document sensor locations with photographs for the maintenance team.
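The alarm thresholds configured on the sensors above can be encoded as a simple classifier for use in the analytics pipeline. The boundaries mirror the sensor configuration in this guide; actual ISO 10816 zone limits vary by machine class and mounting, so confirm them for each machine before alarming:

```python
def iso10816_zone(velocity_rms_mms):
    """Classify vibration velocity (mm/s RMS) into ISO 10816 zones,
    using the same boundaries configured on the VVB001 sensors above.
    Zone limits vary by machine class; confirm per machine."""
    if velocity_rms_mms < 2.8:
        return 'A'   # Good
    if velocity_rms_mms < 7.1:
        return 'B'   # Satisfactory
    if velocity_rms_mms < 18.0:
        return 'C'   # Unsatisfactory
    return 'D'       # Unacceptable -> alarm
```

Storing the zone letter alongside each reading makes trend queries ("hours per week in Zone C or worse") trivial in TimescaleDB.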

Step 11: ERP Integration — Work Order and Cost Data Sync

Build the integration between the client's ERP system and the analytics platform to enrich production data with work order context, bill of materials (BOM), part costs, and customer information. This enables calculating the financial impact of scrap and downtime, and correlating yield issues with specific products, materials, or suppliers. The specific integration method depends on the client's ERP; this step covers the most common scenarios.

ERP integration script template covering REST API and ODBC/SQL connection options, TimescaleDB table creation, and cron scheduling
bash
# ERP Integration Script Template
# Deploy on Dell PowerEdge T360: /opt/analytics/scripts/erp_sync.py

cat > /opt/analytics/scripts/erp_sync.py << 'PYEOF'
#!/usr/bin/env python3
"""
ERP Data Sync - Pulls work orders, BOM, and cost data into production_analytics DB.
Customize the fetch_from_erp() function for the client's specific ERP system.
"""
import psycopg2
import requests
import pyodbc  # For ODBC connections to ERP databases
import os
import logging
from datetime import datetime, timedelta

logging.basicConfig(level=logging.INFO, filename='/var/log/analytics/erp_sync.log')
logger = logging.getLogger(__name__)

DB_CONN = os.environ.get('DB_CONN', 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS')

# ===== OPTION A: REST API (for Epicor Kinetic, Dynamics 365, NetSuite) =====
def fetch_work_orders_api():
    """Fetch active work orders from ERP REST API."""
    erp_url = os.environ.get('ERP_API_URL', 'https://erp.client.com/api/v1')
    erp_key = os.environ.get('ERP_API_KEY', 'YOUR_KEY')
    headers = {'Authorization': f'Bearer {erp_key}', 'Content-Type': 'application/json'}
    
    # Adjust endpoint and query parameters for the specific ERP
    since = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'
    resp = requests.get(f'{erp_url}/workorders', headers=headers,
                        params={'status': 'active', 'modified_after': since}, timeout=30)
    resp.raise_for_status()
    return resp.json().get('data', [])

# ===== OPTION B: ODBC/SQL (for JobBOSS2, E2 Shop, SAP Business One) =====
def fetch_work_orders_sql():
    """Fetch work orders directly from ERP database via ODBC and
    normalize column names to the keys sync_to_timescaledb() expects."""
    conn_str = os.environ.get('ERP_ODBC', 'DRIVER={ODBC Driver 18 for SQL Server};SERVER=erp-server;DATABASE=erp_db;UID=readonly;PWD=CHANGE_THIS')
    conn = pyodbc.connect(conn_str)
    cursor = conn.cursor()
    cursor.execute("""
        SELECT WorkOrderNumber, PartNumber, PartDescription, OrderQty, CompletedQty, 
               ScrapQty, MaterialCost, LaborRate, CustomerName, DueDate, Status
        FROM WorkOrders 
        WHERE Status IN ('Active', 'In Progress') 
          AND ModifiedDate > DATEADD(hour, -1, GETDATE())
    """)
    # Map ERP column order to the dict keys used by sync_to_timescaledb()
    keys = ('work_order', 'part_number', 'part_description', 'order_qty',
            'completed_qty', 'scrap_qty', 'material_cost', 'labor_rate',
            'customer_name', 'due_date', 'status')
    rows = [dict(zip(keys, row)) for row in cursor.fetchall()]
    conn.close()
    return rows

def sync_to_timescaledb(work_orders):
    """Write work order data to local analytics database."""
    conn = psycopg2.connect(DB_CONN)
    cur = conn.cursor()
    
    for wo in work_orders:
        cur.execute("""
            INSERT INTO work_orders (
                work_order, part_number, part_description, order_qty, 
                completed_qty, scrap_qty, material_cost_per_unit, 
                labor_rate_per_hour, customer_name, due_date, status, last_synced
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
            ON CONFLICT (work_order) DO UPDATE SET
                completed_qty = EXCLUDED.completed_qty,
                scrap_qty = EXCLUDED.scrap_qty,
                status = EXCLUDED.status,
                last_synced = NOW()
        """, (
            wo.get('work_order'), wo.get('part_number'), wo.get('part_description'),
            wo.get('order_qty'), wo.get('completed_qty'), wo.get('scrap_qty'),
            wo.get('material_cost'), wo.get('labor_rate'), wo.get('customer_name'),
            wo.get('due_date'), wo.get('status')
        ))
    
    conn.commit()
    cur.close()
    conn.close()
    logger.info(f'Synced {len(work_orders)} work orders at {datetime.utcnow()}')

if __name__ == '__main__':
    try:
        # Use API or SQL depending on ERP type
        work_orders = fetch_work_orders_api()  # or fetch_work_orders_sql()
        sync_to_timescaledb(work_orders)
    except Exception as e:
        logger.error(f'ERP sync failed: {e}')
PYEOF

# Create work_orders table in TimescaleDB
sudo -u postgres psql -d production_analytics << 'EOSQL'
CREATE TABLE IF NOT EXISTS work_orders (
    work_order      TEXT PRIMARY KEY,
    part_number     TEXT,
    part_description TEXT,
    order_qty       INTEGER,
    completed_qty   INTEGER,
    scrap_qty       INTEGER,
    material_cost_per_unit DOUBLE PRECISION,
    labor_rate_per_hour DOUBLE PRECISION,
    customer_name   TEXT,
    due_date        DATE,
    status          TEXT,
    last_synced     TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_wo_part ON work_orders (part_number);
CREATE INDEX IF NOT EXISTS idx_wo_status ON work_orders (status);
EOSQL

# Schedule ERP sync every 15 minutes
(crontab -l 2>/dev/null; echo '*/15 * * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/erp_sync.py') | crontab -
Note

ERP integration is the most client-specific part of this deployment — allocate extra time for this step. For JobBOSS² and E2 Shop (very common in small CNC shops), use ODBC with a read-only SQL account. For Epicor Kinetic, use the REST API v2. For SAP Business One, use the Service Layer REST API. For Dynamics 365 Business Central, use OData/REST APIs with Azure AD OAuth2. NEVER write to the ERP database — read-only access only. Create a dedicated read-only database account with the client's DBA. Test the ERP sync thoroughly with the client's financial controller to verify cost data accuracy.

Step 12: Azure Cloud Infrastructure Setup

Provision Azure cloud resources for long-term data storage, cloud-based analytics, and Power BI connectivity. Set up Azure IoT Hub for device management and message ingestion, Azure SQL Database for aggregated analytics data, Azure Blob Storage for raw data archival, and Azure Stream Analytics for real-time cloud processing. All resources provisioned through the MSP's Azure CSP subscription for the client.

Azure CLI commands to provision all cloud infrastructure resources
bash
# 1. Create Resource Group
az group create --name rg-manufacturing-analytics --location eastus

# 2. Create IoT Hub (S1 tier, 2 units)
az iot hub create \
  --name iot-hub-<client-short-name>-prod \
  --resource-group rg-manufacturing-analytics \
  --sku S1 \
  --unit 2 \
  --location eastus

# 3. Register edge gateway as IoT device
az iot hub device-identity create \
  --hub-name iot-hub-<client-short-name>-prod \
  --device-id edge-gateway-prod \
  --auth-method shared_private_key

# Get connection string for edge gateway
az iot hub device-identity connection-string show \
  --hub-name iot-hub-<client-short-name>-prod \
  --device-id edge-gateway-prod \
  --output tsv

# 4. Create Azure SQL Database
az sql server create \
  --name sql-mfg-<client-short-name>-prod \
  --resource-group rg-manufacturing-analytics \
  --location eastus \
  --admin-user sqladmin \
  --admin-password '<STRONG_PASSWORD>'

az sql db create \
  --name production-analytics \
  --resource-group rg-manufacturing-analytics \
  --server sql-mfg-<client-short-name>-prod \
  --compute-model Provisioned \
  --edition GeneralPurpose \
  --family Gen5 \
  --capacity 2 \
  --max-size 32GB

# Configure firewall to allow Azure services
az sql server firewall-rule create \
  --resource-group rg-manufacturing-analytics \
  --server sql-mfg-<client-short-name>-prod \
  --name AllowAzureServices \
  --start-ip-address 0.0.0.0 \
  --end-ip-address 0.0.0.0

# 5. Create Blob Storage for data archival
az storage account create \
  --name stmfg<client>prod \
  --resource-group rg-manufacturing-analytics \
  --location eastus \
  --sku Standard_LRS \
  --kind StorageV2

az storage container create \
  --name raw-telemetry \
  --account-name stmfg<client>prod

az storage container create \
  --name ml-training-data \
  --account-name stmfg<client>prod

# Set lifecycle management for cost optimization
az storage account management-policy create \
  --account-name stmfg<client>prod \
  --resource-group rg-manufacturing-analytics \
  --policy '{"rules":[{"name":"archive-old-data","type":"Lifecycle","definition":{"actions":{"baseBlob":{"tierToCool":{"daysAfterModificationGreaterThan":90},"tierToArchive":{"daysAfterModificationGreaterThan":365}}},"filters":{"blobTypes":["blockBlob"],"prefixMatch":["raw-telemetry/"]}}}]}'

# 6. Create Stream Analytics Job (optional, for real-time cloud processing)
az stream-analytics job create \
  --name sa-mfg-<client-short-name>-prod \
  --resource-group rg-manufacturing-analytics \
  --location eastus \
  --sku Standard

# 7. Configure diagnostic logging
az monitor diagnostic-settings create \
  --name mfg-analytics-diagnostics \
  --resource /subscriptions/<sub-id>/resourceGroups/rg-manufacturing-analytics/providers/Microsoft.Devices/IotHubs/iot-hub-<client-short-name>-prod \
  --logs '[{"category":"Connections","enabled":true},{"category":"DeviceTelemetry","enabled":true}]' \
  --storage-account stmfg<client>prod
Note

Use the MSP's Azure CSP subscription to provision all resources — this allows centralized billing and management. For defense contractors or ITAR-regulated manufacturers, use Azure Government regions (usgovvirginia, usgovarizona) instead of commercial Azure. Enable Azure Defender for IoT Hub for security monitoring. Set up budget alerts at 80% and 100% of estimated monthly spend to prevent cost overruns. The Stream Analytics job is optional for the initial deployment — it can be added later for real-time cloud-side anomaly detection.

Step 13: Custom ML Model Deployment — Root Cause Analysis Engine

Deploy the custom Python-based machine learning models on the Dell PowerEdge T360 that analyze production data to identify root causes of yield loss, excessive scrap, and unplanned downtime. This includes three ML components: (1) a downtime root cause classifier, (2) a scrap anomaly detector, and (3) a yield prediction model. Models are trained on historical data and retrained weekly as new data accumulates.
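Before the full models come online, the scrap anomaly detector can be approximated with a control chart over hourly scrap rates. This is a deliberately simple stand-in, not the production model; the 3-sigma threshold is an assumption to tune:

```python
from statistics import mean, stdev

def scrap_anomalies(hourly_scrap_pct, sigma=3.0):
    """Flag hours whose scrap rate exceeds mean + sigma * stddev of the
    series -- a control-chart stand-in for the ML scrap anomaly
    detector while training data accumulates.

    hourly_scrap_pct: list of (hour_label, scrap_pct) tuples.
    Returns the flagged (hour_label, scrap_pct) tuples.
    """
    values = [pct for _, pct in hourly_scrap_pct]
    if len(values) < 2:
        return []  # not enough data to estimate spread
    upper_limit = mean(values) + sigma * stdev(values)
    return [(hour, pct) for hour, pct in hourly_scrap_pct if pct > upper_limit]
```

Feeding it the hourly scrap query from Step 8 gives a usable alert source on day one, and its flags later become labeled examples for the ML detector.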

ML model directory setup, initial training, and scheduled inference/retraining
bash
# 1. Create model directory structure
sudo mkdir -p /opt/analytics/models/{downtime_classifier,scrap_detector,yield_predictor}
sudo mkdir -p /opt/analytics/scripts
sudo mkdir -p /var/log/analytics
sudo chown -R mspadmin:mspadmin /opt/analytics /var/log/analytics

# 2. Deploy root cause analysis models (see custom_ai_components for full code)
# Files to create:
#   /opt/analytics/scripts/train_models.py
#   /opt/analytics/scripts/run_inference.py
#   /opt/analytics/scripts/generate_insights.py

# 3. Initial model training (run after 30+ days of data collection)
source /opt/analytics/ml-env/bin/activate
python /opt/analytics/scripts/train_models.py --mode initial
deactivate

# 4. Schedule daily inference and weekly retraining
(crontab -l 2>/dev/null; echo '0 6 * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/run_inference.py >> /var/log/analytics/inference.log 2>&1') | crontab -
(crontab -l 2>/dev/null; echo '0 2 * * 0 /opt/analytics/ml-env/bin/python /opt/analytics/scripts/train_models.py --mode retrain >> /var/log/analytics/training.log 2>&1') | crontab -

# 5. Test inference
source /opt/analytics/ml-env/bin/activate
python /opt/analytics/scripts/run_inference.py --test
# Expected output: JSON with predicted downtime causes, scrap risk scores, yield predictions
deactivate
Note

Do NOT deploy ML models until you have at least 30 days of clean production data. The first 30 days should focus on data collection, validation, and establishing baselines. Start with simple statistical analysis (Pareto charts, control charts) before introducing ML — clients need to trust the data before they trust the models. Model accuracy will improve significantly after 90+ days of data. Schedule model retraining weekly on Sunday at 2 AM when production is typically idle. Monitor model drift by comparing prediction accuracy against actuals monthly.
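The drift check suggested above can start as a comparison of recent prediction accuracy against the accuracy recorded at training time. A minimal sketch; the 10-point drop threshold and the illustrative labels are assumptions to tune per client:

```python
def accuracy(predictions, actuals):
    """Fraction of predictions matching the recorded actual root cause."""
    if not predictions:
        return 0.0
    hits = sum(1 for p, a in zip(predictions, actuals) if p == a)
    return hits / len(predictions)

def drift_detected(baseline_accuracy, recent_accuracy, max_drop=0.10):
    """Flag for retraining/investigation when accuracy falls more than
    max_drop (10 percentage points by default, an assumption to tune)
    below the accuracy measured at training time."""
    return (baseline_accuracy - recent_accuracy) > max_drop

baseline = 0.82  # illustrative accuracy recorded at training time
recent = accuracy(['tooling', 'setup', 'material'], ['tooling', 'setup', 'operator'])
```

Run this monthly against downtime events where the operator later entered a confirmed reason code, and alert the MSP rather than silently retraining so a data-quality regression is not masked.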

Step 14: System Integration Testing and Validation

Conduct comprehensive end-to-end testing of the entire analytics pipeline from sensor to dashboard. Verify data accuracy by comparing system-calculated OEE, scrap rates, and downtime against manually tracked values. Test all alert conditions, dashboard displays, and ML model outputs. Document all test results for client sign-off and compliance records.
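The acceptance comparison against manual records can be codified for the test report. A small helper, assuming the 2% tolerance is measured in absolute percentage points; agree the exact definition with the client before sign-off:

```python
def oee_within_tolerance(system_oee_pct, manual_oee_pct, tolerance_pct=2.0):
    """PASS if system- and manually-calculated OEE agree within the
    tolerance, expressed in absolute percentage points (e.g. 71.0 vs
    72.5 passes at 2.0). Both inputs are OEE percentages, 0-100."""
    return abs(system_oee_pct - manual_oee_pct) <= tolerance_pct
```

The same helper works for the sensor (+/- 10%) check if the tolerance argument is widened accordingly.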

End-to-end pipeline validation: MQTT, TimescaleDB, Azure IoT Hub, OEE checks, alert simulation, and test report generation
bash
# 1. Data pipeline integrity test
# Manually trigger a known event on a machine (e.g., press the cycle start button)
# Verify the event appears in:
#   a. KEPServerEX Quick Client (within 1 second)
#   b. Mosquitto MQTT subscription (within 2 seconds)
mosquitto_sub -h 10.200.1.10 -t 'kepware/#' -u edgeuser -P <password> -C 5 -v
#   c. TimescaleDB (within 5 seconds)
sudo -u postgres psql -d production_analytics -c "SELECT * FROM production_events ORDER BY time DESC LIMIT 5;"
#   d. Grafana dashboard (within 10 seconds)
#   e. Azure IoT Hub (within 30 seconds)
az iot hub monitor-events --hub-name iot-hub-<client-short-name>-prod --device-id edge-gateway-prod --timeout 60
#   f. Power BI (within 2 hours, after scheduled refresh)

# 2. OEE calculation validation
# Select one machine, one shift
# Manually calculate OEE from paper records:
#   Availability = (Shift Time - Downtime) / Shift Time
#   Performance = (Ideal Cycle Time * Parts Produced) / Run Time
#   Quality = Good Parts / Total Parts
#   OEE = A * P * Q
# Compare with system-calculated OEE — should be within 2% tolerance

# 3. Alert testing
# Simulate machine down event > 15 minutes
# Verify email/SMS alert fires to production supervisor
# Simulate scrap rate > 5% (manually enter reject events)
# Verify quality manager alert fires

# 4. Sensor data validation
# Compare ifm VVB001 readings with a reference vibration meter
# Readings should match within +/- 10%

# 5. ERP data validation
# Compare work order data in TimescaleDB with ERP screen
# Verify part numbers, quantities, and costs match exactly

# 6. Generate test report
mkdir -p /opt/analytics/docs
REPORT=/opt/analytics/docs/test_report_$(date +%Y%m%d).txt
{
  echo 'Test Results Summary'
  echo '===================='
  echo "Date: $(date)"
  echo 'Data Pipeline Latency: ___ms (sensor to TimescaleDB)'
  echo 'OEE Accuracy: ___% (vs manual calculation)'
  echo 'Alert Response Time: ___seconds'
  echo 'Sensor Accuracy: +/- ___%'
  echo 'ERP Data Match: PASS/FAIL'
} > "$REPORT"
Note

Validation against manual records is the single most important step for client buy-in. If the system shows different OEE numbers than what operators believe, investigate the discrepancy immediately — it could be a data issue or it could reveal that manual tracking was inaccurate (common). Document all test results with screenshots for compliance requirements (ISO 9001, 21 CFR Part 11). Have the plant manager and quality manager sign off on test results before declaring the system production-ready.

Custom AI Components

Downtime Root Cause Classifier

Type: skill

A supervised machine learning model that classifies unplanned downtime events into root cause categories and identifies the most likely underlying factors. The model uses production context (machine, part number, shift, operator, preceding sensor readings, time since last maintenance) to predict the root cause category when operators fail to enter reason codes, and to validate operator-entered codes against sensor evidence. It uses a Random Forest classifier with SHAP explainability.

Implementation:

python
#!/usr/bin/env python3
"""
Downtime Root Cause Classifier
File: /opt/analytics/models/downtime_classifier/train.py

Trains a Random Forest classifier on historical downtime events
to predict root cause categories from production context features.
"""
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from imblearn.over_sampling import SMOTE
import shap
import joblib
import psycopg2
import json
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MODEL_DIR = '/opt/analytics/models/downtime_classifier'
DB_CONN = 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS'

def load_training_data():
    """Load and merge downtime events with sensor context."""
    conn = psycopg2.connect(DB_CONN)
    
    # Load downtime events with reason codes (labeled data)
    downtime_df = pd.read_sql("""
        SELECT d.time_start, d.time_end, d.machine_id, d.duration_min,
               d.reason_category, d.reason_detail, d.shift, d.operator_id,
               d.work_order, w.part_number, w.material_cost_per_unit
        FROM downtime_events d
        LEFT JOIN work_orders w ON d.work_order = w.work_order
        WHERE d.reason_category IS NOT NULL 
          AND d.reason_category != ''
          AND d.duration_min > 1
    """, conn)
    
    # For each downtime event, get preceding sensor readings (5 min before)
    enriched_rows = []
    for _, row in downtime_df.iterrows():
        sensor_df = pd.read_sql("""
            SELECT metric_name, AVG(value) as avg_val,
                   STDDEV(value) as std_val, MAX(value) as max_val
            FROM machine_telemetry
            WHERE machine_id = %(machine_id)s
              AND time BETWEEN %(t0)s::timestamptz - interval '5 minutes'
                          AND %(t0)s::timestamptz
            GROUP BY metric_name
        """, conn, params={'machine_id': row['machine_id'], 't0': str(row['time_start'])})
        
        sensor_features = {}
        for _, s in sensor_df.iterrows():
            name = s['metric_name'].replace(' ', '_')
            sensor_features[f'{name}_avg'] = s['avg_val']
            sensor_features[f'{name}_std'] = s['std_val']
            sensor_features[f'{name}_max'] = s['max_val']
        
        enriched_row = row.to_dict()
        enriched_row.update(sensor_features)
        enriched_rows.append(enriched_row)
    
    conn.close()
    return pd.DataFrame(enriched_rows)

def engineer_features(df):
    """Create features for the classifier."""
    features = pd.DataFrame()
    
    # Temporal features
    df['time_start'] = pd.to_datetime(df['time_start'])
    features['hour_of_day'] = df['time_start'].dt.hour
    features['day_of_week'] = df['time_start'].dt.dayofweek
    features['is_night_shift'] = ((df['time_start'].dt.hour >= 23) | (df['time_start'].dt.hour < 6)).astype(int)
    
    # Duration feature
    features['duration_min'] = df['duration_min']
    
    # Encode categorical features
    le_machine = LabelEncoder()
    features['machine_encoded'] = le_machine.fit_transform(df['machine_id'].fillna('unknown'))
    joblib.dump(le_machine, f'{MODEL_DIR}/le_machine.pkl')
    
    le_shift = LabelEncoder()
    features['shift_encoded'] = le_shift.fit_transform(df['shift'].fillna('unknown'))
    joblib.dump(le_shift, f'{MODEL_DIR}/le_shift.pkl')
    
    le_part = LabelEncoder()
    features['part_encoded'] = le_part.fit_transform(df['part_number'].fillna('unknown'))
    joblib.dump(le_part, f'{MODEL_DIR}/le_part.pkl')
    
    # Sensor features (vibration, temperature, etc.)
    sensor_cols = [c for c in df.columns if c.endswith(('_avg', '_std', '_max'))]
    for col in sensor_cols:
        features[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
    
    # Material cost (proxy for part complexity)
    features['material_cost'] = pd.to_numeric(df['material_cost_per_unit'], errors='coerce').fillna(0)
    
    return features

def train_model():
    """Train the downtime root cause classifier."""
    logger.info('Loading training data...')
    df = load_training_data()
    
    if len(df) < 100:
        logger.warning(f'Only {len(df)} labeled events. Need 100+ for reliable model. Using rule-based fallback.')
        return None
    
    logger.info(f'Training on {len(df)} downtime events')
    
    # Prepare features and target
    X = engineer_features(df)
    le_target = LabelEncoder()
    y = le_target.fit_transform(df['reason_category'])
    joblib.dump(le_target, f'{MODEL_DIR}/le_target.pkl')
    
    # Handle class imbalance with SMOTE (skip when the rarest class is too small,
    # since k_neighbors must be at least 1)
    min_class = int(pd.Series(y).value_counts().min())
    if min_class > 1:
        smote = SMOTE(random_state=42, k_neighbors=min(5, min_class - 1))
        X_resampled, y_resampled = smote.fit_resample(X.fillna(0), y)
    else:
        logger.warning('A reason category has only one sample; skipping SMOTE oversampling.')
        X_resampled, y_resampled = X.fillna(0), y
    
    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(X_resampled, y_resampled, test_size=0.2, random_state=42)
    
    # Train Random Forest
    model = RandomForestClassifier(
        n_estimators=200,
        max_depth=15,
        min_samples_split=5,
        min_samples_leaf=2,
        class_weight='balanced',
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)
    
    # Evaluate
    y_pred = model.predict(X_test)
    report = classification_report(y_test, y_pred, target_names=le_target.classes_, output_dict=True)
    logger.info(f'Model accuracy: {report["accuracy"]:.3f}')
    logger.info(f'Classification Report:\n{classification_report(y_test, y_pred, target_names=le_target.classes_)}')
    
    # SHAP explainability
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test[:100])
    
    # Save model and artifacts
    joblib.dump(model, f'{MODEL_DIR}/rf_model.pkl')
    joblib.dump(explainer, f'{MODEL_DIR}/shap_explainer.pkl')
    
    # Save feature importance
    importance = pd.DataFrame({
        'feature': X.columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    importance.to_csv(f'{MODEL_DIR}/feature_importance.csv', index=False)
    
    # Save metadata
    metadata = {
        'trained_at': datetime.utcnow().isoformat(),
        'training_samples': len(X_resampled),
        'features': list(X.columns),
        'classes': list(le_target.classes_),
        'accuracy': float(report['accuracy']),
        'f1_weighted': float(report['weighted avg']['f1-score'])
    }
    with open(f'{MODEL_DIR}/metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)
    
    logger.info(f'Model saved to {MODEL_DIR}')
    return model

if __name__ == '__main__':
    train_model()
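train.py only trains and saves artifacts; scoring a live event needs a companion script. A hypothetical inference sketch is below. It mirrors train.py's feature order but omits the dynamic sensor-context columns for brevity (production use would need the full feature list from metadata.json), and it assumes an 'unknown' category exists in each encoder, which train.py's fillna('unknown') guarantees only when missing values occurred in the training data:

```python
#!/usr/bin/env python3
"""Hypothetical companion to train.py: classify one unlabeled downtime event."""

MODEL_DIR = '/opt/analytics/models/downtime_classifier'

def encode_or_unknown(encoder, value):
    """Map categories unseen at training time to the 'unknown' class."""
    try:
        return int(encoder.transform([value])[0])
    except ValueError:
        return int(encoder.transform(['unknown'])[0])

def build_feature_row(event, le_machine, le_shift, le_part):
    """Assemble one row in the same order train.py's engineer_features() used.

    Sensor-context features are omitted here for brevity; a production script
    must supply every feature recorded in metadata.json, in order.
    """
    hour = event['time_start'].hour
    return {
        'hour_of_day': hour,
        'day_of_week': event['time_start'].weekday(),
        'is_night_shift': int(hour >= 23 or hour < 6),
        'duration_min': event['duration_min'],
        'machine_encoded': encode_or_unknown(le_machine, event['machine_id']),
        'shift_encoded': encode_or_unknown(le_shift, event['shift']),
        'part_encoded': encode_or_unknown(le_part, event['part_number']),
        'material_cost': event.get('material_cost_per_unit', 0),
    }

def classify_event(event):
    """Return (predicted_category, confidence) for one downtime event dict."""
    import joblib       # imported lazily so the module loads without the artifacts
    import pandas as pd
    model = joblib.load(f'{MODEL_DIR}/rf_model.pkl')
    le_target = joblib.load(f'{MODEL_DIR}/le_target.pkl')
    row = build_feature_row(event,
                            joblib.load(f'{MODEL_DIR}/le_machine.pkl'),
                            joblib.load(f'{MODEL_DIR}/le_shift.pkl'),
                            joblib.load(f'{MODEL_DIR}/le_part.pkl'))
    proba = model.predict_proba(pd.DataFrame([row]))[0]
    best = proba.argmax()
    return str(le_target.classes_[best]), float(proba[best])
```

A sensible policy is to auto-fill the predicted category only above a confidence threshold (say 0.7) and queue lower-confidence events for operator review.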

Scrap Anomaly Detector

Type: skill

An unsupervised anomaly detection model that monitors real-time scrap rates and identifies statistically significant deviations from normal patterns. Uses Isolation Forest for multivariate anomaly detection across scrap rate, reject counts, cycle time deviation, and sensor readings. When anomalies are detected, the system generates alerts with correlated potential causes by analyzing which process parameters deviated simultaneously.

Implementation:

Scrap Anomaly Detector
python
#!/usr/bin/env python3
"""
Scrap Anomaly Detector
File: /opt/analytics/models/scrap_detector/detect.py

Monitors scrap rates and identifies anomalies using Isolation Forest.
Correlates anomalies with process parameter deviations.
"""
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import psycopg2
import joblib
import json
import logging
from datetime import datetime, timedelta

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MODEL_DIR = '/opt/analytics/models/scrap_detector'
DB_CONN = 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS'

def load_production_data(hours_back=24*30):
    """Load production and sensor data for training/inference."""
    conn = psycopg2.connect(DB_CONN)
    
    # Hourly aggregated production metrics
    production_df = pd.read_sql(f"""
        SELECT 
            time_bucket('1 hour', time) as hour,
            machine_id,
            SUM(good_count) as good_parts,
            SUM(reject_count) as reject_parts,
            AVG(cycle_time_ms) as avg_cycle_time,
            STDDEV(cycle_time_ms) as std_cycle_time,
            COUNT(*) as cycle_count
        FROM production_events
        WHERE event_type = 'cycle_complete'
          AND time > NOW() - interval '{hours_back} hours'
        GROUP BY 1, 2
        HAVING SUM(good_count) + SUM(reject_count) > 0
        ORDER BY 1
    """, conn)
    
    # Hourly sensor averages
    sensor_df = pd.read_sql(f"""
        SELECT
            time_bucket('1 hour', time) as hour,
            machine_id,
            metric_name,
            AVG(value) as avg_value,
            STDDEV(value) as std_value,
            MAX(value) as max_value,
            MIN(value) as min_value
        FROM machine_telemetry
        WHERE time > NOW() - interval '{hours_back} hours'
        GROUP BY 1, 2, 3
    """, conn)
    
    conn.close()
    
    # Pivot sensor data to wide format
    if not sensor_df.empty:
        sensor_pivot = sensor_df.pivot_table(
            index=['hour', 'machine_id'],
            columns='metric_name',
            values=['avg_value', 'std_value', 'max_value'],
            aggfunc='first'
        )
        sensor_pivot.columns = [f'{col[1]}_{col[0]}' for col in sensor_pivot.columns]
        sensor_pivot = sensor_pivot.reset_index()
        
        merged = production_df.merge(sensor_pivot, on=['hour', 'machine_id'], how='left')
    else:
        merged = production_df
    
    return merged

def engineer_scrap_features(df):
    """Create features for anomaly detection."""
    features = pd.DataFrame()
    
    # Core scrap metrics
    total_parts = df['good_parts'] + df['reject_parts']
    features['scrap_rate'] = df['reject_parts'] / total_parts.replace(0, np.nan)
    features['reject_count'] = df['reject_parts']
    
    # Cycle time metrics
    features['avg_cycle_time'] = df['avg_cycle_time']
    features['cycle_time_variability'] = df['std_cycle_time'] / df['avg_cycle_time'].replace(0, np.nan)
    features['throughput'] = df['cycle_count']
    
    # Temporal features
    df['hour'] = pd.to_datetime(df['hour'])
    features['hour_of_day'] = df['hour'].dt.hour
    features['is_shift_start'] = df['hour'].dt.hour.isin([6, 14, 22]).astype(int)
    
    # Sensor features (dynamic based on available sensors)
    sensor_cols = [c for c in df.columns if any(x in c for x in ['Vibration', 'Temperature', 'Pressure', 'Speed'])]
    for col in sensor_cols:
        features[col] = pd.to_numeric(df[col], errors='coerce')
    
    return features.fillna(0)

def train_anomaly_detector():
    """Train Isolation Forest on historical production data."""
    logger.info('Loading training data for anomaly detector...')
    df = load_production_data(hours_back=24*60)  # 60 days of history
    
    if len(df) < 200:
        logger.warning(f'Only {len(df)} hourly records. Need 200+ for reliable model.')
        return None
    
    features = engineer_scrap_features(df)
    
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(features)
    
    model = IsolationForest(
        n_estimators=200,
        max_samples='auto',
        contamination=0.05,  # Expect ~5% of data points to be anomalous
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_scaled)
    
    joblib.dump(model, f'{MODEL_DIR}/iso_forest.pkl')
    joblib.dump(scaler, f'{MODEL_DIR}/scaler.pkl')
    
    metadata = {
        'trained_at': datetime.utcnow().isoformat(),
        'training_samples': len(features),
        'features': list(features.columns),
        'contamination': 0.05
    }
    with open(f'{MODEL_DIR}/metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)
    
    logger.info(f'Anomaly detector trained on {len(features)} samples')
    return model

def detect_anomalies(hours_back=24):
    """Run anomaly detection on recent production data."""
    model = joblib.load(f'{MODEL_DIR}/iso_forest.pkl')
    scaler = joblib.load(f'{MODEL_DIR}/scaler.pkl')
    
    df = load_production_data(hours_back=hours_back)
    if df.empty:
        return []
    
    features = engineer_scrap_features(df)
    # Align columns to the training feature set: sensor columns are discovered
    # dynamically, so inference data can expose a different set than training did
    with open(f'{MODEL_DIR}/metadata.json') as f:
        trained_features = json.load(f)['features']
    features = features.reindex(columns=trained_features, fill_value=0)
    X_scaled = scaler.transform(features)
    
    # Predict: -1 = anomaly, 1 = normal
    predictions = model.predict(X_scaled)
    scores = model.decision_function(X_scaled)
    
    anomalies = []
    anomaly_mask = predictions == -1
    
    for idx in df[anomaly_mask].index:
        row = df.loc[idx]
        feat_row = features.loc[idx]
        
        # Identify which features are most anomalous (per-feature z-scores)
        feature_z_scores = {}
        for col_idx, col in enumerate(features.columns):
            z = abs(X_scaled[idx][col_idx])
            if z > 2.0:  # More than 2 standard deviations
                feature_z_scores[col] = round(float(z), 2)
        
        anomaly = {
            'timestamp': str(row['hour']),
            'machine_id': row['machine_id'],
            'scrap_rate': round(float(feat_row.get('scrap_rate', 0)) * 100, 2),
            'anomaly_score': round(float(scores[idx]), 4),
            'severity': 'HIGH' if scores[idx] < -0.3 else 'MEDIUM',
            'deviating_parameters': feature_z_scores,
            'likely_causes': generate_cause_hypothesis(feature_z_scores, row)
        }
        anomalies.append(anomaly)
    
    # Write anomalies to database
    if anomalies:
        write_anomalies_to_db(anomalies)
    
    return anomalies

def generate_cause_hypothesis(deviating_params, production_row):
    """Generate human-readable cause hypotheses based on deviating parameters."""
    causes = []
    
    for param, z_score in sorted(deviating_params.items(), key=lambda x: -x[1]):
        if 'Vibration' in param and z_score > 2.5:
            causes.append(f'Elevated vibration (z={z_score}) may indicate bearing wear, imbalance, or loose mounting')
        elif 'Temperature' in param and z_score > 2.5:
            causes.append(f'Abnormal temperature (z={z_score}) may indicate coolant issue, friction, or overload')
        elif 'cycle_time_variability' in param and z_score > 2.0:
            causes.append(f'High cycle time variability (z={z_score}) suggests inconsistent material or tool wear')
        elif 'scrap_rate' in param and z_score > 2.0:
            causes.append(f'Scrap rate spike (z={z_score}) - check material batch, tool condition, and setup parameters')
        elif 'is_shift_start' in param and z_score > 1.5:
            causes.append('Anomaly coincides with shift change - possible setup or warmup issue')
    
    if not causes:
        causes.append('Multiple parameters deviating simultaneously - investigate holistically')
    
    return causes

def write_anomalies_to_db(anomalies):
    """Write detected anomalies to the analytics database."""
    conn = psycopg2.connect(DB_CONN)
    cur = conn.cursor()
    
    for a in anomalies:
        cur.execute("""
            INSERT INTO ml_anomalies (time, machine_id, anomaly_type, severity,
                                      scrap_rate_pct, anomaly_score, 
                                      deviating_parameters, likely_causes)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            a['timestamp'], a['machine_id'], 'scrap_anomaly', a['severity'],
            a['scrap_rate'], a['anomaly_score'],
            json.dumps(a['deviating_parameters']),
            json.dumps(a['likely_causes'])
        ))
    
    conn.commit()
    cur.close()
    conn.close()
    logger.info(f'Wrote {len(anomalies)} anomalies to database')

if __name__ == '__main__':
    import sys
    if '--train' in sys.argv:
        train_anomaly_detector()
    else:
        anomalies = detect_anomalies(hours_back=24)
        print(json.dumps(anomalies, indent=2))
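For shift handoffs it helps to condense the per-hour anomaly records that detect_anomalies() returns into a few lines per machine. A small illustrative sketch (the grouping and wording are assumptions, not part of the detector itself):

```python
from collections import defaultdict

def summarize_anomalies(anomalies):
    """Group anomaly records by machine, worst (most negative) score first."""
    by_machine = defaultdict(list)
    for a in anomalies:
        by_machine[a['machine_id']].append(a)
    lines = []
    # Machines with the most severe anomaly (lowest Isolation Forest score) first
    for machine, events in sorted(by_machine.items(),
                                  key=lambda kv: min(e['anomaly_score'] for e in kv[1])):
        worst = min(events, key=lambda e: e['anomaly_score'])
        lines.append(f"{machine}: {len(events)} flagged hour(s), worst severity "
                     f"{worst['severity']} (scrap {worst['scrap_rate']}%)")
    return lines
```

This pairs naturally with the existing cron-driven detection run: pipe the summary into the morning email or post it to the shop-floor dashboard.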

Yield Prediction and Optimization Agent

Type: agent

A predictive model that forecasts expected yield for upcoming production runs based on historical patterns, current machine conditions, and environmental factors. Uses Facebook Prophet for time-series forecasting of yield trends combined with a gradient boosting model for run-specific yield prediction. Outputs predicted yield, confidence interval, and actionable recommendations to optimize yield for the upcoming shift.

Implementation:

Yield Prediction & Optimization Agent
python
#!/usr/bin/env python3
"""
Yield Prediction & Optimization Agent
File: /opt/analytics/models/yield_predictor/predict.py

Forecasts expected yield and generates optimization recommendations.
"""
import pandas as pd
import numpy as np
from prophet import Prophet
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score
import psycopg2
import joblib
import json
import logging
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MODEL_DIR = '/opt/analytics/models/yield_predictor'
DB_CONN = 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS'

def load_yield_history():
    """Load historical yield data aggregated by shift."""
    conn = psycopg2.connect(DB_CONN)
    df = pd.read_sql("""
        SELECT 
            date_trunc('day', time) + 
              CASE 
                WHEN EXTRACT(hour FROM time) BETWEEN 6 AND 13 THEN interval '6 hours'
                WHEN EXTRACT(hour FROM time) BETWEEN 14 AND 21 THEN interval '14 hours'
                ELSE interval '22 hours'
              END as shift_start,
            machine_id,
            SUM(good_count) as good_parts,
            SUM(reject_count) as reject_parts,
            AVG(cycle_time_ms) as avg_cycle_time,
            STDDEV(cycle_time_ms) as std_cycle_time
        FROM production_events
        WHERE event_type = 'cycle_complete'
          AND time > NOW() - interval '180 days'
        GROUP BY 1, 2
        HAVING SUM(good_count) + SUM(reject_count) > 10
        ORDER BY 1
    """, conn)
    conn.close()
    
    df['total_parts'] = df['good_parts'] + df['reject_parts']
    df['yield_pct'] = (df['good_parts'] / df['total_parts']) * 100
    return df

def train_prophet_forecast(df):
    """Train Prophet model for yield trend forecasting."""
    # Aggregate across all machines for plant-level trend
    daily = df.groupby(pd.to_datetime(df['shift_start']).dt.date).agg({
        'good_parts': 'sum',
        'reject_parts': 'sum'
    }).reset_index()
    daily.columns = ['ds', 'good_parts', 'reject_parts']
    daily['ds'] = pd.to_datetime(daily['ds'])
    daily['y'] = (daily['good_parts'] / (daily['good_parts'] + daily['reject_parts'])) * 100
    
    model = Prophet(
        changepoint_prior_scale=0.05,
        seasonality_prior_scale=10,
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=False
    )
    model.add_country_holidays(country_name='US')
    model.fit(daily[['ds', 'y']])
    
    # Save model
    with open(f'{MODEL_DIR}/prophet_model.json', 'w') as f:
        from prophet.serialize import model_to_json
        f.write(model_to_json(model))
    
    return model

def train_yield_predictor(df):
    """Train gradient boosting model for per-run yield prediction."""
    features = pd.DataFrame()
    features['hour_of_day'] = pd.to_datetime(df['shift_start']).dt.hour
    features['day_of_week'] = pd.to_datetime(df['shift_start']).dt.dayofweek
    features['avg_cycle_time'] = df['avg_cycle_time']
    features['cycle_time_cv'] = df['std_cycle_time'] / df['avg_cycle_time'].replace(0, np.nan)
    features['total_parts'] = df['total_parts']
    
    from sklearn.preprocessing import LabelEncoder
    le = LabelEncoder()
    features['machine_encoded'] = le.fit_transform(df['machine_id'])
    joblib.dump(le, f'{MODEL_DIR}/le_machine.pkl')
    
    target = df['yield_pct']
    
    # Remove NaN rows
    valid_mask = features.notna().all(axis=1) & target.notna()
    X = features[valid_mask].fillna(0)
    y = target[valid_mask]
    
    model = GradientBoostingRegressor(
        n_estimators=200,
        max_depth=6,
        learning_rate=0.05,
        subsample=0.8,
        random_state=42
    )
    
    scores = cross_val_score(model, X, y, cv=5, scoring='r2')
    logger.info(f'Yield predictor CV R\u00b2: {scores.mean():.3f} (+/- {scores.std():.3f})')
    
    model.fit(X, y)
    joblib.dump(model, f'{MODEL_DIR}/gb_model.pkl')
    
    metadata = {
        'trained_at': datetime.utcnow().isoformat(),
        'training_samples': len(X),
        'cv_r2_mean': round(float(scores.mean()), 3),
        'cv_r2_std': round(float(scores.std()), 3),
        'features': list(X.columns)
    }
    with open(f'{MODEL_DIR}/metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2)
    
    return model

def generate_recommendations(current_conditions, predicted_yield, historical_df):
    """Generate actionable recommendations to improve yield."""
    recommendations = []
    
    # Compare current conditions to high-yield historical periods
    high_yield = historical_df[historical_df['yield_pct'] > historical_df['yield_pct'].quantile(0.9)]
    low_yield = historical_df[historical_df['yield_pct'] < historical_df['yield_pct'].quantile(0.1)]
    
    if current_conditions.get('avg_cycle_time', 0) > high_yield['avg_cycle_time'].mean() * 1.1:
        recommendations.append({
            'priority': 'HIGH',
            'area': 'Cycle Time',
            'action': f"Current avg cycle time ({current_conditions.get('avg_cycle_time', 0):.0f}ms) is {((current_conditions.get('avg_cycle_time', 0) / high_yield['avg_cycle_time'].mean()) - 1) * 100:.1f}% above optimal. Check tool wear, feed rates, and material hardness.",
            'expected_impact': f"Optimizing to {high_yield['avg_cycle_time'].mean():.0f}ms could improve yield by {high_yield['yield_pct'].mean() - predicted_yield:.1f}%"
        })
    
    if current_conditions.get('cycle_time_cv', 0) > 0.15:
        recommendations.append({
            'priority': 'HIGH',
            'area': 'Process Stability',
            'action': f"Cycle time coefficient of variation ({current_conditions.get('cycle_time_cv', 0):.2f}) indicates unstable process. Investigate material consistency, fixture clamping, and environmental factors.",
            'expected_impact': 'Reducing variability below 0.10 historically correlates with 3-5% yield improvement'
        })
    
    hour = current_conditions.get('hour_of_day', 12)
    if hour in [6, 7, 22, 23]:  # Shift start hours
        recommendations.append({
            'priority': 'MEDIUM',
            'area': 'Shift Transition',
            'action': 'Current/upcoming shift change period. Ensure thorough handoff, machine warmup cycle, and first-article inspection.',
            'expected_impact': 'Proper shift transitions reduce shift-start scrap by 30-50% historically'
        })
    
    return recommendations

def predict_yield():
    """Generate yield prediction for the next shift."""
    df = load_yield_history()
    
    # Prophet trend forecast
    from prophet.serialize import model_from_json
    with open(f'{MODEL_DIR}/prophet_model.json', 'r') as f:
        prophet_model = model_from_json(f.read())
    
    future = prophet_model.make_future_dataframe(periods=7)
    forecast = prophet_model.predict(future)
    next_day_forecast = forecast.iloc[-7:][['ds', 'yhat', 'yhat_lower', 'yhat_upper']]  # 7-day forward window
    
    # Current conditions for GB prediction
    conn = psycopg2.connect(DB_CONN)
    current = pd.read_sql("""
        SELECT machine_id, AVG(cycle_time_ms) as avg_cycle_time,
               STDDEV(cycle_time_ms) as std_cycle_time,
               SUM(good_count) + SUM(reject_count) as total_parts
        FROM production_events
        WHERE event_type = 'cycle_complete'
          AND time > NOW() - interval '4 hours'
        GROUP BY machine_id
    """, conn)
    conn.close()
    
    results = {
        'generated_at': datetime.utcnow().isoformat(),
        'trend_forecast': next_day_forecast.to_dict('records'),
        'machine_predictions': [],
        'recommendations': []
    }
    
    if not current.empty:
        gb_model = joblib.load(f'{MODEL_DIR}/gb_model.pkl')
        le_machine = joblib.load(f'{MODEL_DIR}/le_machine.pkl')
        
        for _, row in current.iterrows():
            conditions = {
                'avg_cycle_time': row['avg_cycle_time'],
                'cycle_time_cv': row['std_cycle_time'] / max(row['avg_cycle_time'], 1),
                'total_parts': row['total_parts'],
                'hour_of_day': datetime.now().hour,
                'day_of_week': datetime.now().weekday()
            }
            
            try:
                machine_enc = le_machine.transform([row['machine_id']])[0]
            except ValueError:
                machine_enc = 0
            
            X_pred = pd.DataFrame([{
                'hour_of_day': conditions['hour_of_day'],
                'day_of_week': conditions['day_of_week'],
                'avg_cycle_time': conditions['avg_cycle_time'],
                'cycle_time_cv': conditions['cycle_time_cv'],
                'total_parts': conditions['total_parts'],
                'machine_encoded': machine_enc
            }])
            
            predicted_yield = float(gb_model.predict(X_pred)[0])
            
            results['machine_predictions'].append({
                'machine_id': row['machine_id'],
                'predicted_yield_pct': round(predicted_yield, 1),
                'current_conditions': conditions
            })
            
            recs = generate_recommendations(conditions, predicted_yield, df[df['machine_id'] == row['machine_id']])
            results['recommendations'].extend(recs)
    
    # Write results to database
    conn = psycopg2.connect(DB_CONN)
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO ml_predictions (time, prediction_type, results)
        VALUES (NOW(), 'yield_forecast', %s)
    """, (json.dumps(results),))
    conn.commit()
    cur.close()
    conn.close()
    
    return results

if __name__ == '__main__':
    import sys
    if '--train' in sys.argv:
        df = load_yield_history()
        train_prophet_forecast(df)
        train_yield_predictor(df)
        logger.info('All yield prediction models trained successfully')
    else:
        results = predict_yield()
        print(json.dumps(results, indent=2, default=str))
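predict_yield() can emit several recommendations per machine, and supervisors only act on a handful at shift start. A hypothetical post-processing sketch that orders and de-duplicates the recommendations list (the priority ranking and per-area de-duplication are assumptions about how a client would want this presented):

```python
def prioritize(recommendations, limit=5):
    """Order recommendations HIGH-first and keep one per area, up to a limit."""
    rank = {'HIGH': 0, 'MEDIUM': 1, 'LOW': 2}
    seen, out = set(), []
    # sorted() is stable, so ties keep their original (per-machine) order
    for rec in sorted(recommendations, key=lambda r: rank.get(r['priority'], 3)):
        if rec['area'] in seen:
            continue
        seen.add(rec['area'])
        out.append(rec)
    return out[:limit]
```

Feed `prioritize(results['recommendations'])` into the shift-start briefing rather than the raw list, so repeated cycle-time findings across machines collapse into one actionable item.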

OEE Calculation Workflow

Type: workflow

An automated data pipeline workflow that calculates Overall Equipment Effectiveness (OEE) hourly for each machine by combining availability, performance, and quality metrics from multiple data sources. The workflow pulls real-time data from TimescaleDB, computes OEE components using standard manufacturing formulas, stores results in the oee_hourly table, and triggers alerts when OEE drops below configurable thresholds.
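The standard formulas are easy to verify by hand. A worked example with illustrative numbers (60 planned minutes, 10 minutes downtime, a 0.5-minute ideal cycle time, and 85 good / 5 reject parts):

```python
planned_min, downtime_min = 60, 10
ideal_ct_min = 0.5
good, reject = 85, 5

run_time = planned_min - downtime_min                     # 50 min
availability = run_time / planned_min                     # 50/60 = 0.8333
performance = ideal_ct_min * (good + reject) / run_time   # 0.5*90/50 = 0.9
quality = good / (good + reject)                          # 85/90 = 0.9444
oee = availability * performance * quality                # ~0.708, i.e. 70.8% OEE
```

Working one hour by hand like this during acceptance testing is also the fastest way to confirm the pipeline's oee_hourly rows against operator expectations.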

Implementation:

OEE Calculation Script
python
#!/usr/bin/env python3
"""
OEE Calculation Workflow
File: /opt/analytics/scripts/oee_calculator.py

Runs hourly via cron to calculate OEE for each machine.
Also supports real-time calculation triggered by Node-RED.

Crontab entry:
0 * * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/oee_calculator.py
"""
import psycopg2
import pandas as pd
import json
import logging
import smtplib
from email.mime.text import MIMEText
from datetime import datetime, timedelta
import os

logging.basicConfig(
    level=logging.INFO,
    filename='/var/log/analytics/oee_calculator.log',
    format='%(asctime)s %(levelname)s %(message)s'
)
logger = logging.getLogger(__name__)

DB_CONN = os.environ.get('DB_CONN', 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS')

# OEE alert thresholds (configurable per client)
ALERT_THRESHOLDS = {
    'oee_critical': 40.0,       # % - trigger immediate alert
    'oee_warning': 60.0,        # % - trigger warning
    'availability_critical': 70.0,
    'quality_critical': 95.0,   # Manufacturing quality typically >95%
    'performance_critical': 75.0
}

# Shift schedule (customize per client)
SHIFT_SCHEDULE = {
    'shift_1': {'start': 6, 'end': 14, 'break_min': 30},
    'shift_2': {'start': 14, 'end': 22, 'break_min': 30},
    'shift_3': {'start': 22, 'end': 6, 'break_min': 30}
}

def get_planned_production_time(hour, shift_schedule=SHIFT_SCHEDULE):
    """Return planned production minutes for a given hour.

    Note: break_min is not yet subtracted from the hour; adjust per client
    if scheduled breaks should be excluded from planned time.
    """
    for shift in shift_schedule.values():
        in_shift = shift['start'] <= hour < shift['end']
        # Overnight shifts (e.g. 22:00-06:00) wrap past midnight
        overnight = shift['start'] > shift['end'] and (hour >= shift['start'] or hour < shift['end'])
        if in_shift or overnight:
            return 60  # Full hour of planned production
    return 0  # Outside planned production time

def calculate_oee_hourly(target_hour=None):
    """Calculate OEE for all machines for the specified hour."""
    conn = psycopg2.connect(DB_CONN)
    
    if target_hour is None:
        target_hour = datetime.utcnow().replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
    
    hour_end = target_hour + timedelta(hours=1)
    planned_minutes = get_planned_production_time(target_hour.hour)
    
    if planned_minutes == 0:
        logger.info(f'Hour {target_hour} is outside planned production. Skipping.')
        return []
    
    # Get production data for the hour
    production = pd.read_sql(f"""
        SELECT 
            machine_id,
            SUM(good_count) as good_parts,
            SUM(reject_count) as reject_parts,
            AVG(cycle_time_ms) as avg_actual_cycle_time_ms,
            COUNT(*) as total_cycles
        FROM production_events
        WHERE event_type = 'cycle_complete'
          AND time >= '{target_hour}'::timestamptz
          AND time < '{hour_end}'::timestamptz
        GROUP BY machine_id
    """, conn)
    
    # Get downtime for the hour
    downtime = pd.read_sql(f"""
        SELECT 
            machine_id,
            SUM(
                EXTRACT(EPOCH FROM (
                    LEAST(time_end, '{hour_end}'::timestamptz) - 
                    GREATEST(time_start, '{target_hour}'::timestamptz)
                )) / 60.0
            ) as downtime_minutes
        FROM downtime_events
        WHERE time_start < '{hour_end}'::timestamptz
          AND (time_end IS NULL OR time_end > '{target_hour}'::timestamptz)
        GROUP BY machine_id
    """, conn)
    
    # Get ideal cycle times from machine configuration
    # (stored in a config table or hardcoded per machine)
    ideal_cycle_times = pd.read_sql("""
        SELECT machine_id, ideal_cycle_time_ms 
        FROM machine_config
    """, conn)
    
    results = []
    
    for _, prod_row in production.iterrows():
        machine_id = prod_row['machine_id']
        good_parts = int(prod_row['good_parts'])
        reject_parts = int(prod_row['reject_parts'])
        total_parts = good_parts + reject_parts
        
        # Availability = (Planned Time - Downtime) / Planned Time
        machine_downtime = downtime[downtime['machine_id'] == machine_id]
        dt_minutes = float(machine_downtime['downtime_minutes'].iloc[0]) if not machine_downtime.empty else 0
        run_time = max(planned_minutes - dt_minutes, 0)
        availability = run_time / planned_minutes if planned_minutes > 0 else 0
        
        # Performance = (Ideal Cycle Time * Total Parts) / Run Time
        ideal_ct = ideal_cycle_times[ideal_cycle_times['machine_id'] == machine_id]
        ideal_ct_ms = float(ideal_ct['ideal_cycle_time_ms'].iloc[0]) if not ideal_ct.empty else prod_row['avg_actual_cycle_time_ms']
        ideal_ct_min = ideal_ct_ms / 60000.0
        performance = (ideal_ct_min * total_parts) / run_time if run_time > 0 else 0
        performance = min(performance, 1.0)  # Cap at 100%
        
        # Quality = Good Parts / Total Parts
        quality = good_parts / total_parts if total_parts > 0 else 0
        
        # OEE = Availability * Performance * Quality
        oee = availability * performance * quality
        
        result = {
            'time_bucket': target_hour,
            'machine_id': machine_id,
            'availability': round(availability, 4),
            'performance': round(performance, 4),
            'quality': round(quality, 4),
            'oee': round(oee, 4),
            'good_count': good_parts,
            'reject_count': reject_parts,
            'planned_time_min': planned_minutes,
            'run_time_min': round(run_time, 1)
        }
        results.append(result)
        
        # Check alert thresholds
        check_alerts(result)
    
    # Write results to database
    cur = conn.cursor()
    for r in results:
        cur.execute("""
            INSERT INTO oee_hourly (time_bucket, machine_id, availability, performance, 
                                    quality, oee, good_count, reject_count,
                                    planned_time_min, run_time_min)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT DO NOTHING
        """, (
            r['time_bucket'], r['machine_id'], r['availability'], r['performance'],
            r['quality'], r['oee'], r['good_count'], r['reject_count'],
            r['planned_time_min'], r['run_time_min']
        ))
    
    conn.commit()
    cur.close()
    conn.close()
    
    logger.info(f'Calculated OEE for {len(results)} machines at {target_hour}')
    return results

def check_alerts(oee_result):
    """Check OEE result against thresholds and send alerts."""
    alerts = []
    
    if oee_result['oee'] * 100 < ALERT_THRESHOLDS['oee_critical']:
        alerts.append(f"CRITICAL: {oee_result['machine_id']} OEE={oee_result['oee']*100:.1f}% (threshold: {ALERT_THRESHOLDS['oee_critical']}%)")
    elif oee_result['oee'] * 100 < ALERT_THRESHOLDS['oee_warning']:
        alerts.append(f"WARNING: {oee_result['machine_id']} OEE={oee_result['oee']*100:.1f}% (threshold: {ALERT_THRESHOLDS['oee_warning']}%)")
    
    if oee_result['quality'] * 100 < ALERT_THRESHOLDS['quality_critical']:
        alerts.append(f"CRITICAL QUALITY: {oee_result['machine_id']} Quality={oee_result['quality']*100:.1f}% (threshold: {ALERT_THRESHOLDS['quality_critical']}%)")
    
    if not alerts:
        return

    for alert_msg in alerts:
        logger.warning(alert_msg)

    # Write alerts to database for dashboard display
    # (one connection for the batch, not one per alert)
    try:
        conn = psycopg2.connect(DB_CONN)
        cur = conn.cursor()
        for alert_msg in alerts:
            cur.execute("""
                INSERT INTO alerts (time, machine_id, alert_type, severity, message)
                VALUES (NOW(), %s, 'oee', %s, %s)
            """, (
                oee_result['machine_id'],
                'CRITICAL' if 'CRITICAL' in alert_msg else 'WARNING',
                alert_msg
            ))
        conn.commit()
        cur.close()
        conn.close()
    except Exception as e:
        logger.error(f'Failed to write alerts: {e}')

if __name__ == '__main__':
    results = calculate_oee_hourly()
    for r in results:
        print(f"{r['machine_id']}: OEE={r['oee']*100:.1f}% (A={r['availability']*100:.1f}% P={r['performance']*100:.1f}% Q={r['quality']*100:.1f}%)")

Daily Insights Report Generator

Type: prompt. A templated report generator that produces a daily executive summary of production performance, combining OEE metrics, scrap analysis, downtime root causes, and ML-generated insights into a formatted email sent to plant management every morning. Builds the HTML body with Python templates and pulls all data from TimescaleDB.

Implementation:

Daily Insights Report Generator
python
#!/usr/bin/env python3
"""
Daily Insights Report Generator
File: /opt/analytics/scripts/generate_insights.py

Generates and emails a daily production insights report.
Schedule via cron: 0 5 * * * (5 AM daily, before shift 1 starts)

Crontab entry:
0 5 * * * /opt/analytics/ml-env/bin/python /opt/analytics/scripts/generate_insights.py
"""
import psycopg2
import pandas as pd
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
import os
import logging

logging.basicConfig(level=logging.INFO, filename='/var/log/analytics/insights.log')
logger = logging.getLogger(__name__)

DB_CONN = os.environ.get('DB_CONN', 'host=localhost dbname=production_analytics user=analytics_app password=CHANGE_THIS')
SMTP_SERVER = os.environ.get('SMTP_SERVER', 'smtp.office365.com')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '587'))
SMTP_USER = os.environ.get('SMTP_USER', 'analytics@client.com')
SMTP_PASS = os.environ.get('SMTP_PASS', 'CHANGE_THIS')
RECIPIENTS = os.environ.get('REPORT_RECIPIENTS', 'plantmanager@client.com,qualitymanager@client.com').split(',')

def generate_report():
    conn = psycopg2.connect(DB_CONN)
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    today = datetime.now().strftime('%Y-%m-%d')
    
    # 1. OEE Summary
    oee_df = pd.read_sql(f"""
        SELECT machine_id,
               AVG(availability) * 100 as avg_availability,
               AVG(performance) * 100 as avg_performance,
               AVG(quality) * 100 as avg_quality,
               AVG(oee) * 100 as avg_oee,
               SUM(good_count) as total_good,
               SUM(reject_count) as total_rejects
        FROM oee_hourly
        WHERE time_bucket >= '{yesterday}'::date
          AND time_bucket < '{today}'::date
        GROUP BY machine_id
        ORDER BY avg_oee ASC
    """, conn)
    
    plant_oee = oee_df['avg_oee'].mean() if not oee_df.empty else 0
    
    # 2. Top Downtime Causes
    downtime_df = pd.read_sql(f"""
        SELECT reason_category, COUNT(*) as event_count,
               SUM(duration_min) as total_minutes,
               AVG(duration_min) as avg_duration
        FROM downtime_events
        WHERE time_start >= '{yesterday}'::date
          AND time_start < '{today}'::date
          AND reason_category IS NOT NULL
        GROUP BY reason_category
        ORDER BY total_minutes DESC
        LIMIT 5
    """, conn)
    
    # 3. Top Scrap Causes
    scrap_df = pd.read_sql(f"""
        SELECT scrap_category, SUM(quantity) as total_qty,
               SUM(quantity * material_cost) as total_cost
        FROM scrap_events
        WHERE time >= '{yesterday}'::date
          AND time < '{today}'::date
        GROUP BY scrap_category
        ORDER BY total_cost DESC
        LIMIT 5
    """, conn)
    
    total_scrap_cost = scrap_df['total_cost'].sum() if not scrap_df.empty else 0
    
    # 4. ML Anomalies
    anomaly_df = pd.read_sql(f"""
        SELECT machine_id, severity, scrap_rate_pct, likely_causes
        FROM ml_anomalies
        WHERE time >= '{yesterday}'::date
          AND time < '{today}'::date
        ORDER BY severity DESC, scrap_rate_pct DESC
        LIMIT 5
    """, conn)
    
    # 5. ML Yield Predictions
    prediction_df = pd.read_sql(f"""
        SELECT results
        FROM ml_predictions
        WHERE prediction_type = 'yield_forecast'
        ORDER BY time DESC LIMIT 1
    """, conn)
    
    conn.close()
    
    # Build HTML report
    html = f"""
    <html><body style='font-family: Arial, sans-serif; max-width: 800px; margin: auto;'>
    <h1 style='color: #2c3e50;'>📊 Daily Production Insights Report</h1>
    <p style='color: #7f8c8d;'>Report Date: {yesterday} | Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}</p>
    
    <h2 style='color: #2980b9;'>🏭 Plant OEE Summary</h2>
    <div style='background: {"#27ae60" if plant_oee >= 85 else "#f39c12" if plant_oee >= 65 else "#e74c3c"}; 
         color: white; padding: 20px; border-radius: 8px; text-align: center; font-size: 36px; margin: 10px 0;'>
        Plant OEE: {plant_oee:.1f}%
    </div>
    
    <table style='width: 100%; border-collapse: collapse; margin: 10px 0;'>
    <tr style='background: #34495e; color: white;'>
        <th style='padding: 8px;'>Machine</th>
        <th>Availability</th><th>Performance</th><th>Quality</th>
        <th>OEE</th><th>Good Parts</th><th>Rejects</th>
    </tr>"""
    
    for _, row in oee_df.iterrows():
        color = '#27ae60' if row['avg_oee'] >= 85 else '#f39c12' if row['avg_oee'] >= 65 else '#e74c3c'
        html += f"""
        <tr style='border-bottom: 1px solid #ddd;'>
            <td style='padding: 6px;'>{row['machine_id']}</td>
            <td style='text-align:center;'>{row['avg_availability']:.1f}%</td>
            <td style='text-align:center;'>{row['avg_performance']:.1f}%</td>
            <td style='text-align:center;'>{row['avg_quality']:.1f}%</td>
            <td style='text-align:center; color: {color}; font-weight: bold;'>{row['avg_oee']:.1f}%</td>
            <td style='text-align:center;'>{int(row['total_good']):,}</td>
            <td style='text-align:center; color: #e74c3c;'>{int(row['total_rejects']):,}</td>
        </tr>"""
    
    html += "</table>"
    
    # Downtime section
    html += f"<h2 style='color: #e67e22;'>⏱ Top Downtime Causes ({downtime_df['total_minutes'].sum():.0f} total minutes)</h2><ol>"
    for _, row in downtime_df.iterrows():
        html += f"<li><b>{row['reason_category']}</b>: {row['total_minutes']:.0f} min ({row['event_count']} events, avg {row['avg_duration']:.0f} min each)</li>"
    html += "</ol>"
    
    # Scrap section
    html += f"<h2 style='color: #e74c3c;'>🗑 Scrap Summary (${total_scrap_cost:,.0f} total cost)</h2><ol>"
    for _, row in scrap_df.iterrows():
        html += f"<li><b>{row['scrap_category']}</b>: {int(row['total_qty']):,} units (${row['total_cost']:,.0f})</li>"
    html += "</ol>"
    
    # ML Insights
    if not anomaly_df.empty:
        html += "<h2 style='color: #8e44ad;'>🤖 AI-Detected Anomalies</h2><ul>"
        for _, row in anomaly_df.iterrows():
            causes = json.loads(row['likely_causes']) if isinstance(row['likely_causes'], str) else row['likely_causes']
            html += f"<li><b>[{row['severity']}] {row['machine_id']}</b> - Scrap rate: {row['scrap_rate_pct']:.1f}%<br>"
            html += '<br>'.join(causes if isinstance(causes, list) else [str(causes)])
            html += "</li>"
        html += "</ul>"
    
    html += """<hr><p style='color: #95a5a6; font-size: 12px;'>This report was automatically generated by the Production Analytics Platform. 
    View interactive dashboards at <a href='http://10.200.1.20:3000'>Grafana</a> or 
    <a href='https://app.powerbi.com'>Power BI</a>. Contact your MSP for support.</p>
    </body></html>"""
    
    # Send email
    msg = MIMEMultipart('alternative')
    msg['Subject'] = f'📊 Production Report {yesterday} | Plant OEE: {plant_oee:.1f}%'
    msg['From'] = SMTP_USER
    msg['To'] = ', '.join(RECIPIENTS)
    msg.attach(MIMEText(html, 'html'))
    
    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_USER, SMTP_PASS)
            server.send_message(msg)
        logger.info(f'Daily report sent to {RECIPIENTS}')
    except Exception as e:
        logger.error(f'Failed to send report: {e}')
    
    return html

if __name__ == '__main__':
    generate_report()

Node-RED Data Normalization Flow

Type: integration. A Node-RED flow deployed on the Advantech edge gateway that subscribes to raw MQTT data from KEPServerEX, normalizes tag names and values into a standardized JSON schema, calculates derived metrics (e.g., incremental part counts from cumulative counters), and publishes normalized data to both the local TimescaleDB and Azure IoT Hub. This flow handles the critical data transformation layer between raw PLC data and the analytics platform.

Implementation:

Node-RED Flow JSON
json
// Node-RED Flow JSON
// Import via: Menu > Import > Clipboard in Node-RED UI at http://10.200.1.10:1880
// 
// This flow:
// 1. Subscribes to all KEPServerEX MQTT topics (kepware/#)
// 2. Parses and normalizes the data
// 3. Calculates incremental counts from cumulative PLC counters
// 4. Writes to TimescaleDB
// 5. Forwards to Azure IoT Hub

[
    {
        "id": "mqtt-in-kepware",
        "type": "mqtt in",
        "name": "KEPServerEX Data",
        "topic": "kepware/#",
        "qos": "1",
        "datatype": "json",
        "broker": "local-mosquitto",
        "x": 150,
        "y": 200,
        "wires": [["parse-topic"]]
    },
    {
        "id": "local-mosquitto",
        "type": "mqtt-broker",
        "name": "Local Mosquitto",
        "broker": "localhost",
        "port": "1883",
        "clientid": "nodered-edge",
        "autoConnect": true,
        "usetls": false,
        "credentials": {
            "user": "edgeuser",
            "password": "CHANGE_THIS"
        }
    },
    {
        "id": "parse-topic",
        "type": "function",
        "name": "Parse & Normalize",
        "func": "// Parse MQTT topic: kepware/{channel}/{device}/{tag}\nvar parts = msg.topic.split('/');\nif (parts.length < 4) return null;\n\nvar channel = parts[1];\nvar device = parts[2];\nvar tag = parts.slice(3).join('/');\n\n// Normalize machine_id\nvar machine_id = device; // e.g., 'Press01'\n\n// Determine metric name from tag\nvar metric_name = tag.replace(device + '.', '');\n\n// Extract value\nvar value = msg.payload;\nif (typeof value === 'object' && value.hasOwnProperty('value')) {\n    value = value.value;\n}\nvalue = parseFloat(value);\nif (isNaN(value)) return null;\n\n// Determine unit based on metric name\nvar units = {\n    'CycleTimeActual_ms': 'ms',\n    'SpindleSpeed_RPM': 'RPM',\n    'Temperature_C': '°C',\n    'Vibration_Velocity_mms': 'mm/s',\n    'Vibration_Acceleration_g': 'g',\n    'Pressure_bar': 'bar',\n    'Power_kW': 'kW'\n};\nvar unit = units[metric_name] || 'raw';\n\n// Build normalized message\nmsg.normalized = {\n    timestamp: new Date().toISOString(),\n    machine_id: machine_id,\n    metric_name: metric_name,\n    value: value,\n    unit: unit,\n    source: 'kepware',\n    channel: channel\n};\n\nreturn msg;",
        "outputs": 1,
        "x": 400,
        "y": 200,
        "wires": [["increment-calc", "write-timescaledb", "forward-azure"]]
    },
    {
        "id": "increment-calc",
        "type": "function",
        "name": "Incremental Counter Calc",
        "func": "// Calculate incremental part counts from cumulative PLC counters\n// PLC counters are cumulative; we need incremental counts for analytics\n\nvar metric = msg.normalized.metric_name;\nvar machine = msg.normalized.machine_id;\nvar value = msg.normalized.value;\n\n// Only process counter-type metrics\nvar counterMetrics = ['CycleCount', 'PartCountGood', 'PartCountReject'];\nif (counterMetrics.indexOf(metric) === -1) return null;\n\n// Get previous value from context\nvar key = machine + '_' + metric;\nvar prevValue = context.get(key) || value;\ncontext.set(key, value);\n\n// Calculate increment\nvar increment = value - prevValue;\n\n// Handle counter reset (PLC restart or overflow)\nif (increment < 0) increment = value;\nif (increment === 0) return null; // No change\n\n// Create production event\nvar eventType = 'cycle_complete';\nvar event = {\n    timestamp: msg.normalized.timestamp,\n    machine_id: machine,\n    event_type: eventType,\n    good_count: metric === 'PartCountGood' ? increment : 0,\n    reject_count: metric === 'PartCountReject' ? increment : 0,\n    cycle_time_ms: null\n};\n\nmsg.production_event = event;\nreturn msg;",
        "outputs": 1,
        "x": 650,
        "y": 100,
        "wires": [["write-production-events"]]
    },
    {
        "id": "write-timescaledb",
        "type": "function",
        "name": "Format for TimescaleDB",
        "func": "// Format INSERT for machine_telemetry table\nmsg.query = `INSERT INTO machine_telemetry (time, machine_id, metric_name, value, unit) VALUES ($1, $2, $3, $4, $5)`;\nmsg.params = [\n    msg.normalized.timestamp,\n    msg.normalized.machine_id,\n    msg.normalized.metric_name,\n    msg.normalized.value,\n    msg.normalized.unit\n];\nreturn msg;",
        "outputs": 1,
        "x": 650,
        "y": 200,
        "wires": [["postgres-insert"]]
    },
    {
        "id": "write-production-events",
        "type": "function",
        "name": "Format Production Event",
        "func": "var e = msg.production_event;\nmsg.query = `INSERT INTO production_events (time, machine_id, event_type, good_count, reject_count, cycle_time_ms) VALUES ($1, $2, $3, $4, $5, $6)`;\nmsg.params = [e.timestamp, e.machine_id, e.event_type, e.good_count, e.reject_count, e.cycle_time_ms];\nreturn msg;",
        "outputs": 1,
        "x": 900,
        "y": 100,
        "wires": [["postgres-insert"]]
    },
    {
        "id": "postgres-insert",
        "type": "postgresql",
        "name": "TimescaleDB Insert",
        "query": "",
        "postgreSQLConfig": "timescaledb-config",
        "x": 1100,
        "y": 200,
        "wires": [[]]
    },
    {
        "id": "timescaledb-config",
        "type": "postgreSQLConfig",
        "name": "TimescaleDB",
        "host": "10.200.1.20",
        "port": "5432",
        "database": "production_analytics",
        "ssl": false,
        "credentials": {
            "user": "analytics_app",
            "password": "CHANGE_THIS"
        }
    },
    {
        "id": "forward-azure",
        "type": "function",
        "name": "Format for Azure IoT Hub",
        "func": "// Rate limit: only forward every 5th reading per metric to reduce costs\nvar key = msg.normalized.machine_id + '_' + msg.normalized.metric_name + '_count';\nvar count = context.get(key) || 0;\ncount++;\ncontext.set(key, count);\n\nif (count % 5 !== 0) return null; // Skip 4 out of 5 readings\n\nmsg.payload = JSON.stringify(msg.normalized);\nreturn msg;",
        "outputs": 1,
        "x": 650,
        "y": 300,
        "wires": [["mqtt-out-azure"]]
    },
    {
        "id": "mqtt-out-azure",
        "type": "mqtt out",
        "name": "Azure IoT Hub",
        "topic": "devices/edge-gateway-prod/messages/events/",
        "qos": "1",
        "retain": false,
        "broker": "azure-iot-hub",
        "x": 900,
        "y": 300,
        "wires": []
    },
    {
        "id": "azure-iot-hub",
        "type": "mqtt-broker",
        "name": "Azure IoT Hub",
        "broker": "<iot-hub-name>.azure-devices.net",
        "port": "8883",
        "clientid": "edge-gateway-prod",
        "autoConnect": true,
        "usetls": true,
        "credentials": {
            "user": "<iot-hub-name>.azure-devices.net/edge-gateway-prod/?api-version=2021-04-12",
            "password": "<SAS-TOKEN>"
        }
    }
]

// NOTE: After importing, you must:
// 1. Install node-red-contrib-postgresql via npm in the Node-RED user directory
// 2. Update the Azure IoT Hub broker credentials with actual SAS token
// 3. Update the TimescaleDB credentials
// 4. Deploy the flow and verify data flowing to both destinations
// 5. Monitor the Node-RED debug panel for any errors
// 
// The rate limiting in the Azure forwarding function reduces IoT Hub message
// volume by 80%, keeping costs within the S1 tier allocation while still
// providing sufficient data resolution for cloud analytics.

Testing & Validation

  • DATA PIPELINE END-TO-END: Trigger a manual cycle on one machine. Verify the event appears in KEPServerEX Quick Client within 1 second, in Mosquitto MQTT subscription within 2 seconds, in TimescaleDB machine_telemetry table within 5 seconds, on the Grafana shop-floor dashboard within 10 seconds, and in Azure IoT Hub device telemetry within 30 seconds. Document latency at each stage.
  • OEE CALCULATION ACCURACY: Select one machine and one complete shift (8 hours). Manually calculate OEE from paper production logs using the standard formula (Availability × Performance × Quality). Compare against the system-calculated OEE in the oee_hourly table. Results must match within ±2 percentage points. If they do not match, investigate whether the discrepancy is in availability (downtime tracking), performance (cycle time measurement), or quality (reject counting).
  • SCRAP RATE VALIDATION: For a 24-hour period, compare total reject counts in the analytics system against the client's existing quality tracking system (QC log, ERP scrap report, or manual tally sheet). Counts must match within ±1%. Cross-reference scrap reason codes to ensure they are correctly categorized.
  • DOWNTIME EVENT CAPTURE: Simulate three types of downtime events: (1) stop a machine for 5 minutes and enter reason code 'Tool Change', (2) trigger a fault condition if safely possible, (3) record a planned maintenance stop. Verify all three events appear in the downtime_events table with correct start/end times, durations, and reason codes.
  • ALERT SYSTEM VERIFICATION: Test all configured alert thresholds: (a) Machine down > 15 minutes → verify production supervisor receives email/SMS within 2 minutes; (b) Scrap rate > 5% → verify quality manager alert; (c) OEE < 50% → verify plant manager alert; (d) Vibration exceeds Zone C threshold → verify maintenance alert. Document alert delivery time for each.
  • SENSOR DATA ACCURACY: Compare ifm VVB001 vibration readings against a calibrated reference vibration meter (e.g., Fluke 805). Readings must match within ±10%. Verify temperature readings against an IR thermometer. For Banner wireless sensors, verify signal strength and data delivery rate (should be >99% over a 24-hour period).
  • GRAFANA DASHBOARD FUNCTIONALITY: Verify all dashboard panels load correctly on Samsung tablets. Test: (a) real-time machine status updates within 10 seconds; (b) OEE gauges show correct values; (c) downtime Pareto chart filters work by date range, machine, and shift; (d) scrap rate trend displays correctly; (e) alert notifications appear on dashboard. Test with 3 different tablet devices simultaneously.
  • POWER BI REPORT VALIDATION: Verify Power BI scheduled refresh completes successfully. Check: (a) data gateway connectivity to TimescaleDB; (b) all report pages render correctly; (c) DAX measures calculate OEE, scrap cost, and MTBF correctly; (d) drill-through from summary to detail works; (e) date slicers filter correctly; (f) report renders on mobile Power BI app.
  • ERP DATA INTEGRITY: Verify ERP sync script pulls correct work order data. Compare 10 random work orders between the ERP screen and the analytics database work_orders table. Check: work order number, part number, order quantity, completed quantity, scrap quantity, and material cost. All fields must match exactly.
  • ML MODEL BASELINE TEST: After initial model training (30+ days of data), run the scrap anomaly detector on a known anomaly period (a date when scrap was unusually high). Verify the model flags this period as anomalous. Also run on a known normal period and verify no false positives. Run the downtime classifier on 20 labeled events and verify >70% classification accuracy.
  • NETWORK SECURITY VERIFICATION: Perform the following security tests: (a) attempt to ping the OT VLAN from the corporate network — should be blocked; (b) attempt to initiate a connection from the internet to the edge gateway — should be blocked; (c) verify all MQTT traffic between edge and Azure uses TLS (port 8883); (d) verify no unauthorized devices on the OT VLAN using Nmap scan; (e) verify all user accounts have unique credentials and default passwords are changed.
  • FAILOVER AND RECOVERY: Test system resilience: (a) disconnect internet — verify local Grafana dashboards continue functioning and Mosquitto buffers messages; (b) restart the Dell PowerEdge T360 — verify all services (TimescaleDB, Grafana, cron jobs) auto-start; (c) disconnect one sensor — verify alert fires and other sensors continue; (d) simulate power outage — verify UPS graceful shutdown and no data corruption on restart.
  • DAILY REPORT GENERATION: Trigger the daily insights report generator manually. Verify: (a) email arrives at all configured recipients; (b) OEE summary table shows all machines; (c) downtime Pareto and scrap summary sections are populated; (d) ML anomaly section shows detected anomalies (if any); (e) HTML formatting renders correctly in Outlook and Gmail; (f) all numbers match dashboard values for the same date.
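The OEE accuracy check above is easier to repeat consistently as a short script. A sketch of the manual cross-check; every input figure below is a made-up example standing in for the paper production log, and `system_oee` stands in for the value read from the oee_hourly table:

```python
# Manual OEE cross-check for one machine over one 8-hour shift.
# All inputs are example figures; substitute the actual paper-log values.

planned_min = 480          # 8-hour shift
downtime_min = 55          # from the paper downtime log
good_parts = 1180
reject_parts = 45
ideal_cycle_s = 18.0       # from the machine_config table

run_min = planned_min - downtime_min
availability = run_min / planned_min
performance = min((ideal_cycle_s / 60.0) * (good_parts + reject_parts) / run_min, 1.0)
quality = good_parts / (good_parts + reject_parts)
oee = availability * performance * quality

system_oee = 0.75          # example value read from oee_hourly for the same shift
delta_pts = abs(oee - system_oee) * 100
print(f"Manual: A={availability:.1%} P={performance:.1%} Q={quality:.1%} OEE={oee:.1%}")
print(f"System OEE={system_oee:.1%}, delta={delta_pts:.1f} pts (tolerance 2.0)")
```

If the delta exceeds 2 points, the factor (A, P, or Q) with the largest disagreement tells you whether to look at downtime tracking, cycle-time measurement, or reject counting first.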

Client Handoff

The client handoff meeting should be a 4-hour session with the plant manager, production supervisors, quality manager, maintenance lead, and IT administrator present. Cover the following topics:

1
System Architecture Overview (30 min): Walk through the data flow from machines → edge gateway → TimescaleDB → dashboards/cloud. Provide a laminated architecture diagram for the server room. Explain what each component does and why it matters.
2
Dashboard Training — Operators (45 min): Demonstrate the Grafana shop-floor dashboards on the Samsung tablets. Train operators on: reading real-time OEE gauges, understanding machine status colors (green/yellow/red), entering downtime reason codes (CRITICAL — emphasize this is the most important operator action), and reviewing scrap alerts. Provide a laminated quick-reference card for each workstation.
3
Dashboard Training — Supervisors & Managers (45 min): Demonstrate Power BI reports. Train on: navigating report pages, using date/shift/machine filters, interpreting Pareto charts for downtime and scrap, reading the daily insights email report, and understanding ML-generated anomaly alerts and yield predictions. Provide a printed Power BI navigation guide.
4
Downtime Reason Code Compliance (30 min): Present the agreed-upon downtime reason code taxonomy. Explain why accurate reason codes are essential for root cause analysis. Establish the expectation that every downtime event > 2 minutes must have a reason code entered within 5 minutes of the event. Define who is responsible for enforcing compliance (production supervisor).
5
Alert Response Procedures (30 min): Document and review the alert escalation matrix: who receives which alerts, expected response times, and escalation paths. Provide a printed alert response procedure document.
6
Ongoing Support & SLA Review (30 min): Review the managed services agreement: MSP response times, monthly reporting, model retraining schedule, and system update cadence. Provide emergency contact information. Schedule the first monthly review meeting.
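Reason-code compliance (item 4) is worth auditing with a script rather than spot checks. A sketch that computes the compliance rate from downtime_events rows already fetched from the database; the dict keys mirror the column names used elsewhere in this guide, and the function name is illustrative:

```python
# Downtime reason-code compliance audit.
# Input: downtime_events rows as dicts with duration_min and reason_category.
# Events of 2 minutes or less are exempt per the handoff agreement.

def reason_code_compliance(events, min_duration_min=2.0):
    """Return (compliance_pct, uncoded_events) for events that need a reason code."""
    required = [e for e in events if e['duration_min'] > min_duration_min]
    if not required:
        return 100.0, []
    uncoded = [e for e in required if not e.get('reason_category')]
    pct = 100.0 * (len(required) - len(uncoded)) / len(required)
    return round(pct, 1), uncoded

# Example: three events need codes, one is missing -> 66.7% compliance
events = [
    {'duration_min': 1.5, 'reason_category': None},            # exempt (< 2 min)
    {'duration_min': 12.0, 'reason_category': 'Tool Change'},
    {'duration_min': 8.0, 'reason_category': None},            # violation
    {'duration_min': 45.0, 'reason_category': 'Maintenance'},
]
pct, uncoded = reason_code_compliance(events)
print(f"Compliance: {pct}% ({len(uncoded)} uncoded events)")
```

Reviewing the compliance percentage per shift with the production supervisor makes the enforcement expectation concrete.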

Documentation to Leave Behind

  • System Architecture Diagram (laminated, posted in server room)
  • Network Topology Diagram with IP addresses and VLAN assignments
  • Tag Dictionary spreadsheet (PLC addresses → MQTT topics → database fields)
  • Downtime Reason Code Taxonomy (laminated, posted at each workstation)
  • Operator Quick-Reference Cards (laminated, one per tablet/workstation)
  • Power BI Navigation Guide (printed, one per manager)
  • Alert Escalation Matrix (printed, posted in supervisor office)
  • Admin Credentials Document (sealed envelope, stored in client's safe)
  • Maintenance Runbook (digital, shared via SharePoint/OneDrive)

Success Criteria to Review Together

Maintenance

Monthly Maintenance Tasks (MSP Responsibility)

1
Dashboard Review & KPI Audit (2 hours/month): Review all Grafana and Power BI dashboards for accuracy. Compare system OEE against any manual tracking the client still performs. Verify all machines are reporting data. Check for and fix any broken dashboard panels or data source connection issues.
2
Data Pipeline Health Check (1 hour/month): Verify Node-RED flows are running without errors. Check Mosquitto MQTT broker logs for connection failures. Verify TimescaleDB disk usage and retention policy execution. Confirm Azure IoT Hub message delivery rates. Review cron job execution logs for MachineMetrics sync and ERP sync scripts.
3
ML Model Performance Review (2 hours/month): Evaluate scrap anomaly detector precision and recall — check for false positives that erode trust and false negatives that miss real issues. Review downtime classifier accuracy against newly labeled events. Compare yield predictions against actuals. Document model drift metrics.

Quarterly Maintenance Tasks

1
ML Model Retraining (4 hours/quarter): Retrain all three ML models with the latest 90 days of data. Evaluate model performance against the previous quarter's metrics. Deploy updated models if accuracy improves; roll back if it degrades. Update the model metadata JSON files.
2
Security Patch Management (2 hours/quarter): Apply OS security updates to the Dell PowerEdge T360 and Advantech EPC-R7300 during a scheduled maintenance window. Update KEPServerEX, Grafana, TimescaleDB, and Node-RED to latest stable versions. Review firewall rules and access logs for anomalies.
3
Hardware Inspection (1 hour/quarter): Check sensor cable connections, edge gateway and server fan operation, UPS battery health, and industrial switch status LEDs. Replace any degraded sensors. Verify NTP time synchronization across all devices.
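The deploy-or-roll-back decision in the quarterly retraining task can be made mechanical: promote the retrained model only if it regresses on no metric. A minimal sketch; the metric names are assumptions standing in for whatever the model metadata JSON files actually record:

```python
# Promote a retrained model only if every tracked metric is at least as good
# as the currently deployed model's. Metric names below are placeholders.

def should_deploy(old_metrics, new_metrics, min_gain=0.0):
    """Compare metrics where higher is better; ties count as no regression."""
    return all(new_metrics[k] >= old_metrics[k] + min_gain for k in old_metrics)

old = {'downtime_classifier_acc': 0.74, 'yield_r2': 0.61}
new = {'downtime_classifier_acc': 0.78, 'yield_r2': 0.58}  # R^2 regressed

if should_deploy(old, new):
    print("Deploy: retrained model wins or ties on every metric")
else:
    print("Roll back: at least one metric degraded; keep the previous model")
```

Recording both metric sets in the model metadata JSON files at each quarter makes the next comparison trivial.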

Annual Maintenance Tasks

1
Comprehensive System Audit (8 hours/year): Full review of architecture, performance, security, and compliance. Update network topology documentation. Review and update downtime reason code taxonomy with client. Assess whether additional machines or production lines should be added.

Monitoring & SLA Considerations

  • Set up automated monitoring alerts (via Grafana, Azure Monitor, or an RMM tool) for: server CPU/memory/disk > 80%, database connection failures, cron job missed executions, sensor data gaps > 15 minutes, Azure IoT Hub throttling.
  • SLA Target: 99.5% data pipeline uptime (measured monthly). Exclude planned maintenance windows.
  • Response Times: Critical alerts (production data loss, server down): 1-hour response during business hours, 4-hour response after hours. Non-critical (dashboard issue, cosmetic): next business day.
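The "sensor data gaps > 15 minutes" monitor above reduces to finding oversized gaps between consecutive telemetry timestamps. In production this would be a SQL query or Grafana alert against machine_telemetry; the pure-Python sketch below shows the gap logic itself, with illustrative names:

```python
from datetime import datetime, timedelta

# Find gaps longer than max_gap in a list of telemetry timestamps.
# In production this check runs per machine_id against machine_telemetry.

def find_gaps(timestamps, max_gap=timedelta(minutes=15)):
    """Return (start, end) pairs where consecutive readings are too far apart."""
    ts = sorted(timestamps)
    return [(a, b) for a, b in zip(ts, ts[1:]) if b - a > max_gap]

t0 = datetime(2024, 1, 15, 8, 0)
readings = [t0, t0 + timedelta(minutes=5), t0 + timedelta(minutes=10),
            t0 + timedelta(minutes=40)]   # 30-minute gap -> should alert
gaps = find_gaps(readings)
for start, end in gaps:
    print(f"GAP: {start:%H:%M} -> {end:%H:%M} ({(end - start).total_seconds() / 60:.0f} min)")
```

A gap usually means a dead sensor, a wireless dropout, or a stalled Node-RED flow, so route these alerts to Level 2.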

Escalation Path

  • Level 1: MSP help desk — dashboard access issues, password resets, simple queries
  • Level 2: MSP senior engineer — data pipeline issues, sensor troubleshooting, network problems
  • Level 3: MSP data engineer / ML specialist — model retraining, new analytics development, database tuning
  • Level 4: Vendor support — KEPServerEX (PTC), MachineMetrics (MM support), Azure (Microsoft), Grafana (Grafana Labs)

Model Retraining Triggers (outside regular schedule)

  • Client adds new machines or production lines
  • Client introduces new product families or materials
  • Major process change (new tooling, new suppliers, layout change)
  • Model accuracy drops below 65% on the downtime classifier or R² drops below 0.5 on the yield predictor
  • Scrap anomaly detector generates >20% false positive rate over a 30-day period
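The >20% false-positive trigger above can be computed from the 30-day window of reviewed anomaly alerts. A sketch assuming each alert has been labeled confirmed/not-confirmed by the quality team; the `confirmed` field name is an assumption, so adapt it to however the ml_anomalies review workflow records the label:

```python
# Retrain trigger: anomaly-detector false positive rate over a review window.
# 'confirmed' is a placeholder field for the quality team's true/false label.

def false_positive_rate(reviewed_alerts):
    """FP rate = alerts reviewed as not-real / total reviewed alerts."""
    if not reviewed_alerts:
        return 0.0
    false_pos = sum(1 for a in reviewed_alerts if not a['confirmed'])
    return false_pos / len(reviewed_alerts)

alerts = [{'confirmed': True}] * 7 + [{'confirmed': False}] * 3
fp = false_positive_rate(alerts)
print(f"False positive rate: {fp:.0%}")
if fp > 0.20:
    print("Retrain trigger hit: schedule out-of-cycle retraining")
```

Tracking this number monthly (task 3 above) gives early warning before the 20% trigger is hit.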

...

Option B: Full MES Platform (TrakSYS or Tulip)

Replace the custom-built analytics stack with a purpose-built Manufacturing Execution System (MES). Use Parsec TrakSYS ($1,999+/month) or Tulip ($1,000+/month) as the primary platform, which provides OEE tracking, scrap tracking, downtime management, quality workflows, and dashboards out of the box. Supplement with Power BI for executive reporting and retain KEPServerEX for legacy machine connectivity. Estimated Year 1 cost: $60,000–$100,000 (MES subscription + hardware + integration labor).

Option D: Existing SCADA Vendor Extension

If the client already has an Ignition SCADA system or Rockwell FactoryTalk deployment, extend the existing platform with analytics modules rather than deploying a separate analytics stack. For Ignition: add the Reporting Module ($2,400), Perspective Module ($3,800) for web dashboards, and SPC Module ($2,400). For FactoryTalk: add FactoryTalk Analytics Edge or FactoryTalk DataFlowML. Retain custom Python ML models for advanced root cause analysis. Estimated Year 1 cost: $20,000–$50,000 (module licenses + configuration labor).

Want early access to the full toolkit?