process mining · OCEL · manufacturing · bottleneck detection · Python

Process Mining with Synthetic Manufacturing Data and OCEL 2.0

Before Six Sigma consultants spend months mapping your processes, let process mining show you where the bottlenecks are. Here is how to do it with VynFi's manufacturing event logs.

VynFi Team · Engineering · April 11, 2026 · 9 min read

Before Six Sigma consultants spend months mapping your processes, process mining can show you where the bottlenecks are in hours. Instead of hand-drawn process models that describe how processes are supposed to work, process mining extracts models directly from event logs — showing how processes actually execute. The gap between the two is usually where the problems are.

VynFi's manufacturing sector generates OCEL 2.0 (Object-Centric Event Log) data, where a single event can reference multiple business objects. This is a richer representation than a traditional single-case event log, because real manufacturing processes rarely follow a single thread: one production order consumes multiple materials, one invoice covers multiple receipts, one payment settles multiple invoices.
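
To make that concrete, here is an illustrative sketch of a single OCEL 2.0 event, written as a Python dict following the OCEL 2.0 JSON exchange format. The IDs, qualifiers, and values are invented for illustration, not taken from the actual dataset:

Python
# Illustrative OCEL 2.0 event (all IDs hypothetical): one goods-receipt
# event that references a purchase order, a material, and a production
# order at the same time. The multi-object relationships list is the
# object-centric part.
ocel_event = {
    "id": "e-1042",
    "type": "Goods Receipt",
    "time": "2026-01-15T09:32:00.000000",
    "attributes": [{"name": "quantity", "value": 250}],
    "relationships": [
        {"objectId": "po-88", "qualifier": "purchase_order"},
        {"objectId": "mat-17", "qualifier": "material"},
        {"objectId": "prod-5", "qualifier": "production_order"},
    ],
}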

This tutorial walks through the process mining notebook: load an event log from a synthetic manufacturing dataset, discover process variants, detect bottlenecks, analyze organizational handovers, and export to PM4Py-compatible XES format for further analysis.

**DataSynth 3.1.1 update:** Skip the generation step — the regenerated VynFi/vynfi-ocel-manufacturing and VynFi/vynfi-supply-chain-ocel Hugging Face datasets are already native OCEL 2.0 with microsecond timestamps (pandas-safe, 100% row retention) and realistic variant imperfection rates (rework 15% / skip 10% / out-of-order 8%). Load directly via `datasets.load_dataset("VynFi/vynfi-ocel-manufacturing", "events")`.

Generate Manufacturing Process Data

Request the p2p, s2c (source-to-contract), and manufacturing process models to get the full suite of manufacturing artifacts. The manufacturing model generates OCEL event data alongside the standard document flow files.

Python
import os

from vynfi import VynFi

client = VynFi(api_key=os.environ["VYNFI_API_KEY"])

config = {
    "sector": "manufacturing",
    "country": "US",
    "accountingFramework": "us_gaap",
    "rows": 1000,
    "companies": 5,
    "periods": 3,
    "periodLength": "monthly",
    "processModels": ["p2p", "s2c", "manufacturing"],
    "exportFormat": "json",
    "fraudPacks": [],
    "fraudRate": 0.0,
}

job = client.jobs.generate_config(config=config)
completed = client.jobs.wait(job.id)
archive = client.jobs.download_archive(completed.id)

# The archive includes:
# - ocel-event-log table (OCEL 2.0 native) if available
# - document_flows/ for reconstruction when native OCEL is absent
# - events/organizational_events.json
# - events/process_evolution_events.json
print("Archive contents:")
for f in archive.files():
    print(f"  {f}")

Build the Directly-Follows Graph

The directly-follows graph (DFG) is the foundation of process discovery. For each pair of consecutive activities in a case, the DFG counts how many times activity A is directly followed by activity B. This produces the transition matrix from which process models are extracted.

Python
import pandas as pd


def build_dfg(df: pd.DataFrame) -> pd.DataFrame:
    """Build a directly-follows graph from an event log.

    For each case, events are sorted by timestamp and consecutive pairs
    (A -> B) are counted. Returns DataFrame with columns: source, target, count.
    """
    transitions = []
    for _case_id, case_events in df.groupby("case_id"):
        sorted_events = case_events.sort_values("timestamp")
        activities = sorted_events["activity"].tolist()
        for i in range(len(activities) - 1):
            transitions.append((activities[i], activities[i + 1]))
    if not transitions:
        return pd.DataFrame(columns=["source", "target", "count"])
    dfg = pd.DataFrame(transitions, columns=["source", "target"])
    dfg = dfg.groupby(["source", "target"]).size().reset_index(name="count")
    return dfg.sort_values("count", ascending=False).reset_index(drop=True)


dfg = build_dfg(events_df)
print("Top 10 transitions in directly-follows graph:")
print(dfg.head(10).to_string(index=False))

Extract Process Variants

A process variant is the unique sequence of activities observed for a case. The most common variant is the happy path — the intended process. Everything else is a deviation. Variant analysis answers three questions: what is the happy path, how much deviation exists, and what causes deviations.

Python
from collections import Counter


def extract_variants(df: pd.DataFrame) -> pd.DataFrame:
    """Extract the activity sequence (variant) for each case."""
    case_variants = {}
    for case_id, case_events in df.groupby("case_id"):
        sorted_acts = case_events.sort_values("timestamp")["activity"].tolist()
        case_variants[case_id] = tuple(sorted_acts)
    variant_counts = Counter(case_variants.values())
    total = sum(variant_counts.values())
    rows, cumulative = [], 0.0
    for variant, count in variant_counts.most_common():
        pct = count / total * 100
        cumulative += pct
        rows.append({"variant": variant, "count": count,
                     "pct": round(pct, 1), "cumulative_pct": round(cumulative, 1),
                     "length": len(variant)})
    return pd.DataFrame(rows)


variants_df = extract_variants(events_df)
print(f"Total unique variants: {len(variants_df)}")
print(f"Total cases: {variants_df['count'].sum()}")

# Happy path
happy = variants_df.iloc[0]
print(f"\nHappy path ({happy['count']} cases, {happy['pct']}%):")
print(f"  {' -> '.join(happy['variant'])}")

# Conformance rates
for n in [1, 3, 5]:
    cases_in_top_n = variants_df.head(n)["count"].sum()
    rate = cases_in_top_n / variants_df["count"].sum() * 100
    print(f"Conformance rate (top {n} variant{'s' if n > 1 else ''}): {rate:.1f}%")

Low conformance rates (below 50% in the top 5 variants) suggest an uncontrolled process where many ad-hoc paths exist. In manufacturing, this often indicates bypassed approval steps, rework loops, or exceptions that were handled informally rather than through the standard workflow.
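
To see which cases those are, invert the variant table back to case IDs. A short sketch that recomputes the per-case variant (extract_variants above only returns the aggregated table):

Python
# List every case whose activity sequence differs from the happy path.
happy_path = variants_df.iloc[0]["variant"]
deviant_cases = []
for case_id, case_events in events_df.groupby("case_id"):
    variant = tuple(case_events.sort_values("timestamp")["activity"])
    if variant != happy_path:
        deviant_cases.append(case_id)

share = len(deviant_cases) / events_df["case_id"].nunique()
print(f"{len(deviant_cases)} deviant cases ({share:.1%} of all cases)")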

Bottleneck Detection

Bottlenecks are transitions where cases spend disproportionate time waiting. The compute_waiting_times function calculates the sojourn time between each consecutive pair of activities in a case, aggregated across all cases to find the transitions with the highest mean wait.

Python
def compute_waiting_times(df: pd.DataFrame) -> pd.DataFrame:
    """Compute waiting time between consecutive activities in each case.

    Returns a DataFrame with columns: source, target, waiting_hours.
    """
    records = []
    for _case_id, case_events in df.groupby("case_id"):
        sorted_events = case_events.sort_values("timestamp")
        timestamps = sorted_events["timestamp"].tolist()
        activities = sorted_events["activity"].tolist()
        for i in range(len(activities) - 1):
            dt = (timestamps[i + 1] - timestamps[i]).total_seconds() / 3600
            records.append({"source": activities[i],
                            "target": activities[i + 1],
                            "waiting_hours": dt})
    return pd.DataFrame(records)


waiting_df = compute_waiting_times(events_df)
transition_stats = waiting_df.groupby(["source", "target"])["waiting_hours"].agg(
    ["mean", "median", "count"]
).reset_index()
transition_stats.columns = ["source", "target", "mean_h", "median_h", "count"]
transition_stats = transition_stats.sort_values("mean_h", ascending=False)
print("Top 10 bottleneck transitions by mean waiting time:")
print(transition_stats.head(10).to_string(index=False, float_format="%.1f"))

In manufacturing P2P data, the GR-to-invoice transition is typically the largest bottleneck — goods are received but vendor invoices take days or weeks to arrive. The approval-to-payment transition is often the second largest, reflecting payment terms and treasury cycle times. These are the transitions where process improvement has the highest leverage.
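
One caveat: a mean can be inflated by a handful of extreme cases. Before acting on a bottleneck, check the shape of its waiting-time distribution, for example:

Python
# Percentiles for the worst transition: a high p99 with a modest median
# points at outlier cases rather than a systemic bottleneck.
worst = transition_stats.iloc[0]
mask = ((waiting_df["source"] == worst["source"])
        & (waiting_df["target"] == worst["target"]))
dist = waiting_df.loc[mask, "waiting_hours"]
print(f"{worst['source']} -> {worst['target']} ({len(dist)} observations)")
print(dist.quantile([0.50, 0.75, 0.90, 0.95, 0.99]).round(1))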

Case Duration Analysis

Case-level statistics — duration from first to last event, event count, and completion rate — reveal outliers at the process instance level rather than the transition level. Cases with unusually long durations are potential bottleneck victims. Cases with unusually high event counts may have rework loops. Cases that never reach a terminal activity represent process leakage.

Python
# Sort by timestamp first so the "first"/"last" aggregations reflect
# process order rather than row order.
events_sorted = events_df.sort_values(["case_id", "timestamp"])
case_stats = events_sorted.groupby("case_id").agg(
    first_event=("timestamp", "min"),
    last_event=("timestamp", "max"),
    event_count=("event_id", "count"),
    unique_activities=("activity", "nunique"),
    first_activity=("activity", "first"),
    last_activity=("activity", "last"),
)
case_stats["duration_hours"] = (
    (case_stats["last_event"] - case_stats["first_event"]).dt.total_seconds() / 3600
)

print(f"Total cases: {len(case_stats)}")
print("\nDuration statistics (hours):")
print(case_stats["duration_hours"].describe().round(2))

print("\n5 slowest cases:")
for case_id, row in case_stats.nlargest(5, "duration_hours").iterrows():
    print(f"  {case_id}: {row['duration_hours']:.1f}h | "
          f"{row['event_count']} events | "
          f"{row['first_activity']} -> {row['last_activity']}")

Organizational Mining

The handover-of-work analysis builds a matrix showing how many times each resource hands work to every other resource in the same case. Dense handover patterns between two resources indicate either a strong working relationship or a dependency that could become a bottleneck when one resource is unavailable.

Python
def build_handover_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Build a resource handover matrix from an event log."""
    handovers = []
    for _case_id, case_events in df.groupby("case_id"):
        sorted_events = case_events.sort_values("timestamp")
        resources = sorted_events["resource"].tolist()
        for i in range(len(resources) - 1):
            if resources[i] != resources[i + 1]:  # only count actual handovers
                handovers.append((resources[i], resources[i + 1]))
    if not handovers:
        return pd.DataFrame(columns=["from_resource", "to_resource", "count"])
    ho_df = pd.DataFrame(handovers, columns=["from_resource", "to_resource"])
    return (ho_df.groupby(["from_resource", "to_resource"])
            .size().reset_index(name="count")
            .sort_values("count", ascending=False))


if "resource" in events_df.columns:
    handovers = build_handover_matrix(events_df)
    print("Top 15 handover patterns:")
    print(handovers.head(15).to_string(index=False))
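
If you want a heatmap-style view instead of an edge list, the same data pivots into a square from/to matrix (assuming the resource column was present, so that handovers exists):

Python
# Pivot the handover edge list into a square matrix; missing pairs
# become 0. Each (from, to) pair is unique in the edge list, so the
# default aggfunc is harmless. Feed this to any plotting library.
ho_matrix = handovers.pivot_table(index="from_resource",
                                  columns="to_resource",
                                  values="count",
                                  fill_value=0)
print(ho_matrix.astype(int))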

Export for Process Mining Tools

For production process mining, dedicated tools provide algorithms that go beyond what you can build from scratch in pandas. VynFi event data can be exported to three formats: XES (for PM4Py, ProM, and Disco), CSV (for Celonis and ARIS), and OCEL 2.0 JSON (for PM4Py's object-centric mining module).

Python
from xml.sax.saxutils import quoteattr  # escapes and quotes XML attribute values


def to_xes(df: pd.DataFrame, output_path: str) -> None:
    """Export a pandas event log to XES format (IEEE 1849-2016)."""
    lines = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<log xes.version="2.0" xmlns="http://www.xes-standard.org/">',
        '  <extension name="Concept" prefix="concept" uri="http://www.xes-standard.org/concept.xesext"/>',
        '  <extension name="Time" prefix="time" uri="http://www.xes-standard.org/time.xesext"/>',
        '  <extension name="Organizational" prefix="org" uri="http://www.xes-standard.org/org.xesext"/>',
    ]
    for case_id, case_events in df.groupby("case_id"):
        lines.append('  <trace>')
        lines.append(f'    <string key="concept:name" value={quoteattr(str(case_id))}/>')
        for _, event in case_events.sort_values("timestamp").iterrows():
            ts = event["timestamp"].isoformat() if hasattr(event["timestamp"], "isoformat") else str(event["timestamp"])
            lines.append('    <event>')
            lines.append(f'      <string key="concept:name" value={quoteattr(str(event["activity"]))}/>')
            lines.append(f'      <date key="time:timestamp" value="{ts}"/>')
            if "resource" in event.index and pd.notna(event.get("resource")):
                lines.append(f'      <string key="org:resource" value={quoteattr(str(event["resource"]))}/>')
            lines.append('    </event>')
        lines.append('  </trace>')
    lines.append('</log>')
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    print(f"Exported {len(df)} events to {output_path}")


to_xes(events_df, "vynfi_manufacturing.xes")

# If PM4Py is installed, discover a Petri net directly
try:
    import pm4py

    pm4py_df = events_df.rename(columns={
        "case_id": "case:concept:name",
        "activity": "concept:name",
        "timestamp": "time:timestamp",
    })
    net, im, fm = pm4py.discover_petri_net_inductive(pm4py_df)
    fitness = pm4py.fitness_token_based_replay(pm4py_df, net, im, fm)
    print(f"Petri net: {len(net.places)} places, {len(net.transitions)} transitions")
    print(f"Fitness: {fitness['average_trace_fitness']:.3f}")
except ImportError:
    print("Install pm4py for Petri net discovery: pip install pm4py")

Fraud Detection Through Variant Analysis

Set fraudRate > 0 in your generation config and run the same process mining pipeline on the resulting data. Fraudulent process variants (unusual activity sequences, bypassed approval steps, or entries posted by resources outside their normal role) appear as low-frequency deviations from the happy path. Process mining surfaces these anomalies without requiring labeled data.
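
A simple, unsupervised way to surface those deviants is to score each case by how rare its variant is. A sketch, with an illustrative 1% threshold:

Python
# Flag cases whose variant occurs in fewer than 1% of cases; rare
# variants are the natural suspects when fraudRate > 0. The threshold
# is illustrative, not a calibrated value.
variant_freq = {row["variant"]: row["pct"] for _, row in variants_df.iterrows()}
suspicious = []
for case_id, case_events in events_df.groupby("case_id"):
    variant = tuple(case_events.sort_values("timestamp")["activity"])
    if variant_freq.get(variant, 0.0) < 1.0:
        suspicious.append(case_id)

print(f"{len(suspicious)} cases follow variants seen in <1% of the log")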

Next Steps

The manufacturing notebook covers temporal drift detection (how process behavior changes across the three monthly periods), role discovery (clustering resources by the activities they perform), and object-centric interaction analysis (which object types appear together in the same case). For production deployments, the Celonis EMS and Minit connectors accept the CSV export format directly, with the case_id, activity, and timestamp columns mapping to their standard schema.
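
The CSV side is a one-liner with pandas; the output filename below is illustrative:

Python
# Export the columns the Celonis/Minit connectors map to their standard
# schema; keep resource when the log has one.
cols = ["case_id", "activity", "timestamp"]
if "resource" in events_df.columns:
    cols.append("resource")
events_df[cols].to_csv("vynfi_manufacturing_events.csv", index=False)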

The full notebook is available at 05_process_mining_ocel.ipynb in the VynFi Python SDK repository. It includes PM4Py integration, XES/OCEL 2.0 export, and organizational mining with handover matrices.

Ready to try VynFi?

Start generating synthetic financial data with 10,000 free credits. No credit card required.