运用数据分析优化供应链管理
供应链(Supply Chain)是指围绕核心企业,通过对物流、信息流、资金流的控制,从采购原材料开始,制成中间产品和最终产品,最后由销售网络把产品送到消费者手中的将供应商、制造商、分销商、零售商直到最终用户连成一个整体的功能网链结构。
一条完整的供应链通常包含以下五个核心环节:
# 用字典模拟一条简化供应链的各环节数据
supply_chain = {
"采购": {"供应商数量": 12, "平均采购周期(天)": 7, "月采购额(万元)": 350},
"生产": {"产线数量": 5, "日均产量(件)": 2000, "良品率": 0.97},
"仓储": {"仓库数量": 3, "总库容(平方米)": 15000, "库存周转天数": 25},
"物流": {"合作物流商": 6, "平均配送时效(天)": 3, "准时率": 0.95},
"销售": {"分销渠道": 4, "月均订单量": 8500, "客户满意度": 0.92}
}
# 遍历输出各环节关键指标
for stage, metrics in supply_chain.items():
print(f"\n【{stage}环节】")
for key, value in metrics.items():
print(f" {key}: {value}")
供应链管理(Supply Chain Management, SCM)的核心目标是在满足客户需求的前提下,实现整个供应链的成本最小化和效率最大化。具体来说,供应链管理追求以下关键目标:
供应链中产生的数据种类繁多,按照数据来源和用途可以分为以下几类:
| 数据类型 | 典型数据 | 分析用途 |
|---|---|---|
| 交易数据 | 订单、发票、付款记录 | 销售趋势、收入分析 |
| 库存数据 | 库存水平、出入库记录 | 库存优化、缺货预警 |
| 物流数据 | 运输轨迹、配送时效 | 物流效率、成本控制 |
| 供应商数据 | 交货记录、质量报告 | 供应商评估与选择 |
| 市场数据 | 需求预测、价格波动 | 需求规划、采购决策 |
| 生产数据 | 产能利用率、良品率 | 生产优化、质量控制 |
import pandas as pd
# 模拟供应链各环节数据
data = {
"日期": pd.date_range("2025-01-01", periods=10, freq="D"),
"订单量": [120, 135, 98, 150, 142, 160, 175, 130, 145, 168],
"库存量": [500, 470, 430, 460, 420, 380, 350, 330, 310, 280],
"发货量": [110, 130, 105, 145, 138, 155, 170, 128, 140, 162],
"退货量": [5, 8, 3, 10, 6, 7, 12, 4, 9, 8]
}
df = pd.DataFrame(data)
print("=== 供应链数据概览 ===")
print(df.describe())
print(f"\n日均订单量: {df['订单量'].mean():.1f}")
print(f"日均发货量: {df['发货量'].mean():.1f}")
print(f"平均退货率: {df['退货量'].sum() / df['订单量'].sum() * 100:.2f}%")
在数字化时代,数据分析已成为供应链管理的核心驱动力。通过系统地收集、处理和分析供应链数据,企业能够获得以下关键价值:
供应链数据分析中,以下三个核心指标是衡量供应链健康度的关键:
库存周转率衡量企业在一定时期内库存资产的周转速度,反映库存管理的效率。计算公式为:
库存周转率 = 销售成本 / 平均库存
库存周转率越高,说明库存变现速度越快,资金利用效率越高。但过高的周转率也可能意味着库存不足,存在缺货风险。
订单满足率(Fill Rate)衡量企业能够按时、按量满足客户订单的能力。计算公式为:
订单满足率 = 完全满足的订单数 / 总订单数 x 100%
高订单满足率意味着客户需求得到充分满足,有助于提升客户满意度和忠诚度。
准时交付率(On-Time Delivery Rate, OTDR)衡量企业按时交付订单的能力。计算公式为:
准时交付率 = 准时交付的订单数 / 总订单数 x 100%
# 计算供应链核心KPI指标
import pandas as pd
# 模拟月度数据
monthly_data = {
"月份": ["1月", "2月", "3月", "4月", "5月", "6月"],
"销售成本(万元)": [280, 310, 295, 340, 365, 390],
"平均库存(万元)": [180, 175, 185, 190, 178, 170],
"总订单数": [850, 920, 880, 960, 1010, 1050],
"完全满足订单数": [780, 860, 815, 910, 965, 1000],
"准时交付订单数": [800, 875, 840, 920, 980, 1020]
}
df = pd.DataFrame(monthly_data)
# 计算各月KPI
df["库存周转率"] = df["销售成本(万元)"] / df["平均库存(万元)"]
df["订单满足率(%)"] = df["完全满足订单数"] / df["总订单数"] * 100
df["准时交付率(%)"] = df["准时交付订单数"] / df["总订单数"] * 100
print("=== 供应链KPI月度报表 ===")
print(df[["月份", "库存周转率", "订单满足率(%)", "准时交付率(%)"]].to_string(index=False))
print(f"\n半年平均库存周转率: {df['库存周转率'].mean():.2f}")
print(f"半年平均订单满足率: {df['订单满足率(%)'].mean():.1f}%")
print(f"半年平均准时交付率: {df['准时交付率(%)'].mean():.1f}%")
库存管理是供应链管理的核心环节之一。合理的库存水平既能保证生产和销售的连续性,又能最大限度地降低库存持有成本。库存管理面临的核心矛盾是:库存过多会导致资金占用和仓储成本增加,库存过少则可能导致缺货和销售损失。
ABC分类法(又称帕累托分析法)是库存管理中最常用的分类方法。它基于"二八定律",将库存物品按其价值贡献分为三类:
import pandas as pd
import numpy as np
# 模拟50个SKU的库存数据
np.random.seed(42)
n_skus = 50
skus = [f"SKU-{str(i).zfill(3)}" for i in range(1, n_skus + 1)]
unit_cost = np.random.uniform(5, 500, n_skus) # 单价 5~500元
annual_demand = np.random.randint(50, 5000, n_skus) # 年需求量
df = pd.DataFrame({
"SKU": skus,
"单价(元)": np.round(unit_cost, 2),
"年需求量": annual_demand
})
df["年消耗金额(元)"] = df["单价(元)"] * df["年需求量"]
df = df.sort_values("年消耗金额(元)", ascending=False).reset_index(drop=True)
df["累计金额占比(%)"] = df["年消耗金额(元)"].cumsum() / df["年消耗金额(元)"].sum() * 100
# ABC分类
def abc_classify(cum_pct):
if cum_pct <= 80:
return "A"
elif cum_pct <= 95:
return "B"
else:
return "C"
df["ABC分类"] = df["累计金额占比(%)"].apply(abc_classify)
print("=== ABC分类结果 ===")
for cls in ["A", "B", "C"]:
subset = df[df["ABC分类"] == cls]
print(f"\n{cls}类物品: {len(subset)}个SKU ({len(subset)/n_skus*100:.0f}%), "
f"金额占比: {subset['年消耗金额(元)'].sum()/df['年消耗金额(元)'].sum()*100:.1f}%")
print(subset[["SKU", "单价(元)", "年需求量", "年消耗金额(元)"]].head(5).to_string(index=False))
安全库存(Safety Stock)是为应对需求波动和供应不确定性而额外持有的库存量。合理设置安全库存是平衡缺货风险和库存成本的关键。
安全库存的经典计算公式为:
安全库存 = Z x σd x √L
其中,Z为服务水平对应的标准正态分布分位数,σd为日需求标准差,L为平均补货提前期(天)。
| 服务水平 | Z值 | 缺货概率 |
|---|---|---|
| 90% | 1.28 | 10% |
| 95% | 1.65 | 5% |
| 97.5% | 1.96 | 2.5% |
| 99% | 2.33 | 1% |
| 99.9% | 3.09 | 0.1% |
import numpy as np
from scipy import stats
# 安全库存计算函数
def calculate_safety_stock(daily_demand_std, lead_time_days, service_level=0.95):
"""
计算安全库存
:param daily_demand_std: 日需求标准差
:param lead_time_days: 平均补货提前期(天)
:param service_level: 目标服务水平
:return: 安全库存量
"""
z_score = stats.norm.ppf(service_level)
safety_stock = z_score * daily_demand_std * np.sqrt(lead_time_days)
return round(safety_stock), z_score
# 不同服务水平下的安全库存
daily_demand_std = 30 # 日需求标准差:30件
lead_time = 7 # 补货提前期:7天
print(f"日需求标准差: {daily_demand_std}件, 补货提前期: {lead_time}天\n")
print("=== 不同服务水平下的安全库存 ===")
for sl in [0.90, 0.95, 0.975, 0.99, 0.999]:
ss, z = calculate_safety_stock(daily_demand_std, lead_time, sl)
print(f"服务水平 {sl*100:.1f}% (Z={z:.2f}): 安全库存 = {ss}件")
# 计算再订货点
avg_daily_demand = 150
reorder_point = avg_daily_demand * lead_time
ss_95, _ = calculate_safety_stock(daily_demand_std, lead_time, 0.95)
print(f"\n再订货点(95%服务水平) = {reorder_point} + {ss_95} = {reorder_point + ss_95}件")
库存周转分析是评估库存管理效率的重要手段。通过跟踪库存周转率、周转天数等指标,可以判断库存管理是否健康,并识别需要优化的品类。
import pandas as pd
import numpy as np
# 模拟12个月的库存数据
months = [f"{m}月" for m in range(1, 13)]
sales_cost = [280, 310, 295, 340, 365, 390, 410, 385, 420, 445, 460, 480]
avg_inventory = [180, 175, 185, 190, 178, 170, 165, 172, 160, 155, 150, 148]
df = pd.DataFrame({
"月份": months,
"销售成本(万元)": sales_cost,
"平均库存(万元)": avg_inventory
})
# 计算周转率和周转天数
df["库存周转率"] = df["销售成本(万元)"] / df["平均库存(万元)"]
df["库存周转天数"] = 365 / df["库存周转率"]
print("=== 月度库存周转分析 ===")
print(df.to_string(index=False))
print(f"\n年度平均库存周转率: {df['库存周转率'].mean():.2f}次")
print(f"年度平均库存周转天数: {df['库存周转天数'].mean():.1f}天")
# 趋势分析
q1_turn = df.loc[0:2, "库存周转率"].mean()
q4_turn = df.loc[9:11, "库存周转率"].mean()
print(f"\nQ1平均周转率: {q1_turn:.2f}次 → Q4平均周转率: {q4_turn:.2f}次")
print(f"周转率提升: {(q4_turn - q1_turn) / q1_turn * 100:.1f}%")
缺货(Stockout)是指客户需求发生时库存不足以满足需求的情况。缺货不仅会导致直接的销售损失,还会影响客户满意度和品牌声誉。缺货分析的目标是识别缺货原因、评估缺货影响,并制定预防措施。
import pandas as pd
# 模拟缺货事件记录
stockout_data = {
"日期": ["2025-01-15", "2025-02-03", "2025-02-20", "2025-03-08", "2025-03-25",
"2025-04-12", "2025-05-01", "2025-05-18", "2025-06-05", "2025-06-22"],
"SKU": ["SKU-001", "SKU-015", "SKU-003", "SKU-008", "SKU-001",
"SKU-022", "SKU-001", "SKU-015", "SKU-003", "SKU-030"],
"缺货数量": [50, 30, 80, 25, 45, 60, 35, 40, 70, 20],
"缺货原因": ["需求突增", "供应商延迟", "需求突增", "预测偏差", "需求突增",
"供应商延迟", "质量问题", "供应商延迟", "需求突增", "预测偏差"],
"损失金额(元)": [5000, 3000, 8000, 2500, 4500, 6000, 3500, 4000, 7000, 2000]
}
df = pd.DataFrame(stockout_data)
# 按原因统计
print("=== 缺货原因分析 ===")
reason_stats = df.groupby("缺货原因").agg(
发生次数=("缺货数量", "count"),
平均缺货量=("缺货数量", "mean"),
总损失金额=("损失金额(元)", "sum")
).sort_values("总损失金额", ascending=False)
print(reason_stats)
# 按SKU统计
print("\n=== 高频缺货SKU ===")
sku_stats = df.groupby("SKU").agg(
缺货次数=("缺货数量", "count"),
总缺货量=("缺货数量", "sum"),
总损失=("损失金额(元)", "sum")
).sort_values("总损失", ascending=False)
print(sku_stats)
print(f"\n总缺货损失: {df['损失金额(元)'].sum():,}元")
print(f"平均每次缺货损失: {df['损失金额(元)'].mean():,.0f}元")
本节将综合运用前面学到的知识,通过一个完整的Python实战案例,对库存数据进行全面分析,包括ABC分类、安全库存计算、周转分析和缺货预警。
import pandas as pd
import numpy as np
from scipy import stats
# ========== 综合库存分析实战 ==========
# 1. 生成模拟数据
np.random.seed(42)
products = [f"产品-{chr(65+i)}" for i in range(20)]
df = pd.DataFrame({
"产品": products,
"单价(元)": np.round(np.random.uniform(10, 300, 20), 2),
"月均需求量": np.random.randint(100, 3000, 20),
"当前库存": np.random.randint(50, 5000, 20),
"日需求标准差": np.random.randint(5, 80, 20),
"补货提前期(天)": np.random.choice([3, 5, 7, 10, 14], 20)
})
# 2. 计算关键指标
df["月消耗金额"] = df["单价(元)"] * df["月均需求量"]
df = df.sort_values("月消耗金额", ascending=False).reset_index(drop=True)
df["累计金额占比"] = df["月消耗金额"].cumsum() / df["月消耗金额"].sum()
# ABC分类
df["ABC分类"] = df["累计金额占比"].apply(lambda x: "A" if x <= 0.8 else ("B" if x <= 0.95 else "C"))
# 安全库存(95%服务水平)
df["安全库存"] = df.apply(
lambda r: round(1.65 * r["日需求标准差"] * np.sqrt(r["补货提前期(天)"])), axis=1
)
# 再订货点
df["日均需求"] = df["月均需求量"] / 30
df["再订货点"] = (df["日均需求"] * df["补货提前期(天)"] + df["安全库存"]).round(0)
# 缺货预警
df["库存状态"] = df.apply(
lambda r: "⚠️ 库存不足" if r["当前库存"] < r["再订货点"] else "✅ 正常", axis=1
)
# 3. 输出分析报告
print("=" * 60)
print(" 库存综合分析报告")
print("=" * 60)
print(f"\n总SKU数: {len(df)}")
print(f"总库存金额: {df['单价(元)'].sum():,.0f}元")
print(f"A类产品: {len(df[df['ABC分类']=='A'])}个, "
f"B类产品: {len(df[df['ABC分类']=='B'])}个, "
f"C类产品: {len(df[df['ABC分类']=='C'])}个")
print("\n--- 缺货预警产品 ---")
alert = df[df["库存状态"].str.contains("不足")]
if len(alert) > 0:
print(alert[["产品", "当前库存", "再订货点", "ABC分类"]].to_string(index=False))
else:
print("所有产品库存正常")
print("\n--- TOP5 高库存周转产品 ---")
df["月周转率"] = df["月均需求量"] / df["当前库存"]
print(df.nlargest(5, "月周转率")[["产品", "月周转率", "ABC分类"]].to_string(index=False))
需求预测是供应链管理中最重要的分析任务之一。准确的需求预测能够帮助企业合理安排生产计划、优化库存水平、降低运营成本。需求预测的方法大致可以分为两类:
时间序列(Time Series)是按时间顺序排列的一系列数据点。在供应链中,销售数据、库存数据、需求数据等都是典型的时间序列数据。时间序列分析的核心思想是:利用历史数据中的模式和规律来预测未来值。
时间序列通常包含以下四种成分:
import pandas as pd
import numpy as np
# 生成模拟的24个月销售时间序列数据
np.random.seed(42)
months = pd.date_range("2024-01-01", periods=24, freq="MS")
# 构建时间序列:趋势 + 季节 + 随机
trend = np.linspace(100, 200, 24) # 线性上升趋势
seasonal = 30 * np.sin(2 * np.pi * np.arange(24) / 12) # 年度季节性
noise = np.random.normal(0, 10, 24) # 随机噪声
sales = trend + seasonal + noise
sales = np.maximum(sales, 0) # 确保非负
df = pd.DataFrame({"月份": months, "实际销量": np.round(sales, 0)})
df["月份标签"] = df["月份"].dt.strftime("%Y-%m")
print("=== 24个月销售数据 ===")
for _, row in df.iterrows():
bar = "█" * int(row["实际销量"] / 5)
print(f"{row['月份标签']} {row['实际销量']:6.0f} {bar}")
# 基本统计
print(f"\n均值: {df['实际销量'].mean():.1f}")
print(f"标准差: {df['实际销量'].std():.1f}")
print(f"最小值: {df['实际销量'].min():.0f} (月份: {df.loc[df['实际销量'].idxmin(), '月份标签']})")
print(f"最大值: {df['实际销量'].max():.0f} (月份: {df.loc[df['实际销量'].idxmax(), '月份标签']})")
移动平均法(Moving Average)是最简单直观的时间序列预测方法。它通过计算最近N个时期的平均值来预测下一期的值。移动平均法能够平滑短期波动,突出数据的趋势变化。
常用的移动平均方法包括:
import pandas as pd
import numpy as np
# 模拟12周销售数据
weeks = list(range(1, 13))
sales = [200, 215, 210, 230, 225, 240, 235, 250, 260, 255, 270, 280]
df = pd.DataFrame({"周次": weeks, "实际销量": sales})
# 简单移动平均(3期)
df["SMA_3"] = df["实际销量"].rolling(window=3).mean().round(1)
# 简单移动平均(5期)
df["SMA_5"] = df["实际销量"].rolling(window=5).mean().round(1)
# 加权移动平均(3期,权重:3, 2, 1)
def weighted_ma(series, weights):
result = []
for i in range(len(series)):
if i < len(weights) - 1:
result.append(np.nan)
else:
wma = sum(series[i-j] * w for j, w in enumerate(reversed(weights))) / sum(weights)
result.append(round(wma, 1))
return result
df["WMA_3"] = weighted_ma(sales, [1, 2, 3])
print("=== 移动平均预测对比 ===")
print(df.to_string(index=False))
# 计算预测误差(MAE)
actual = df["实际销量"].values
sma3_pred = df["SMA_3"].values
wma3_pred = df["WMA_3"].values
# 从第4周开始有SMA_3预测值
sma3_mae = np.nanmean(np.abs(actual[3:] - sma3_pred[3:]))
wma3_mae = np.nanmean(np.abs(actual[3:] - wma3_pred[3:]))
print(f"\nSMA(3) 平均绝对误差: {sma3_mae:.1f}")
print(f"WMA(3) 平均绝对误差: {wma3_mae:.1f}")
# 预测第13周
next_sma3 = df["实际销量"].iloc[-3:].mean()
print(f"\n第13周预测(SMA-3): {next_sma3:.1f}")
指数平滑法(Exponential Smoothing)是对移动平均法的改进。它对不同时期的数据赋予指数递减的权重,越近期的数据权重越大。指数平滑法比移动平均法更加灵活,常用的变体包括:
一次指数平滑的公式为:
St = alpha x Yt + (1 - alpha) x St-1
其中,alpha为平滑系数(0 < alpha < 1),Yt为第t期的实际值,St为第t期的平滑值。
import numpy as np
# 一次指数平滑实现
def simple_exponential_smoothing(data, alpha, forecast_periods=3):
"""
一次指数平滑
:param data: 历史数据列表
:param alpha: 平滑系数 (0~1)
:param forecast_periods: 预测期数
:return: 平滑值列表, 预测值列表
"""
smoothed = [data[0]] # 初始平滑值 = 第一个实际值
for t in range(1, len(data)):
s = alpha * data[t] + (1 - alpha) * smoothed[t-1]
smoothed.append(round(s, 1))
# 预测:所有未来期的预测值 = 最后一个平滑值
forecasts = [smoothed[-1]] * forecast_periods
return smoothed, forecasts
# 12个月的销售数据
sales = [200, 215, 210, 230, 225, 240, 235, 250, 260, 255, 270, 280]
# 不同alpha值的效果对比
print("=== 不同alpha值的指数平滑结果 ===")
for alpha in [0.1, 0.3, 0.5, 0.7]:
smoothed, forecasts = simple_exponential_smoothing(sales, alpha, 3)
mae = np.mean(np.abs(np.array(sales) - np.array(smoothed)))
print(f"\nalpha = {alpha}, MAE = {mae:.1f}")
print(f" 平滑值: {smoothed}")
print(f" 未来3期预测: {forecasts}")
# 使用statsmodels的Holt-Winters方法
print("\n=== 使用statsmodels进行Holt-Winters预测 ===")
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import pandas as pd
# 转换为时间序列
ts = pd.Series(sales, index=pd.date_range("2025-01", periods=12, freq="MS"))
# 拟合Holt-Winters模型(加法趋势 + 加法季节)
model = ExponentialSmoothing(ts, trend="add", seasonal="add", seasonal_periods=12)
fit = model.fit()
forecast = fit.forecast(3)
print(f"拟合参数: alpha={fit.params['smoothing_level']:.3f}, "
f"beta={fit.params['smoothing_trend']:.3f}, "
f"gamma={fit.params['smoothing_seasonal']:.3f}")
for date, val in forecast.items():
print(f" {date.strftime('%Y-%m')}: {val:.1f}")
本节将综合运用多种预测方法,对一个完整的销售数据集进行需求预测实战,包括数据探索、模型训练、预测评估和结果可视化。
import pandas as pd
import numpy as np
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
# ========== 需求预测完整实战 ==========
# 1. 准备数据:36个月的历史销售数据
np.random.seed(42)
periods = 36
dates = pd.date_range("2023-01-01", periods=periods, freq="MS")
# 构建:趋势 + 季节 + 噪声
trend = np.linspace(500, 900, periods)
seasonal = 100 * np.sin(2 * np.pi * np.arange(periods) / 12)
noise = np.random.normal(0, 30, periods)
sales = np.round(trend + seasonal + noise, 0)
sales = np.maximum(sales, 100)
df = pd.DataFrame({"日期": dates, "销量": sales})
# 2. 划分训练集和测试集
train = df.iloc[:-6].copy() # 前30个月
test = df.iloc[-6:].copy() # 后6个月
print(f"训练集: {train['日期'].iloc[0].strftime('%Y-%m')} ~ "
f"{train['日期'].iloc[-1].strftime('%Y-%m')} ({len(train)}个月)")
print(f"测试集: {test['日期'].iloc[0].strftime('%Y-%m')} ~ "
f"{test['日期'].iloc[-1].strftime('%Y-%m')} ({len(test)}个月)")
# 3. 模型训练与预测
train_ts = pd.Series(train["销量"].values, index=train["日期"])
# 方法1: 简单移动平均
sma_forecast = [train["销量"].iloc[-3:].mean()] * 6
# 方法2: 一次指数平滑
ses_model = ExponentialSmoothing(train_ts, trend=None, seasonal=None)
ses_fit = ses_model.fit()
ses_forecast = ses_model.fit(smoothing_level=0.3).forecast(6).values
# 方法3: Holt-Winters
hw_model = ExponentialSmoothing(train_ts, trend="add", seasonal="add", seasonal_periods=12)
hw_fit = hw_model.fit()
hw_forecast = hw_fit.forecast(6).values
# 4. 模型评估
actual = test["销量"].values
results = {
"简单移动平均": sma_forecast,
"一次指数平滑": ses_forecast,
"Holt-Winters": hw_forecast
}
print("\n=== 预测模型评估 ===")
print(f"{'方法':<16} {'MAE':>8} {'MAPE(%)':>10}")
print("-" * 38)
best_mape = float("inf")
best_method = ""
for method, pred in results.items():
mae = mean_absolute_error(actual, pred)
mape = mean_absolute_percentage_error(actual, pred) * 100
marker = " *" if mape < best_mape else ""
if mape < best_mape:
best_mape = mape
best_method = method
print(f"{method:<16} {mae:>8.1f} {mape:>10.1f}{marker}")
print(f"\n最佳模型: {best_method} (MAPE={best_mape:.1f}%)")
# 5. 使用最佳模型预测未来6个月
print("\n=== 未来6个月需求预测(Holt-Winters) ===")
full_ts = pd.Series(df["销量"].values, index=df["日期"])
final_model = ExponentialSmoothing(full_ts, trend="add", seasonal="add", seasonal_periods=12)
final_fit = final_model.fit()
future_forecast = final_fit.forecast(6)
for date, val in future_forecast.items():
print(f" {date.strftime('%Y-%m')}: {val:.0f}件")
物流是供应链中连接生产和消费的关键环节。物流数据分析的核心目标是优化运输成本、提高配送效率、降低物流损耗。物流数据分析通常关注以下关键维度:
import pandas as pd
import numpy as np
# 模拟物流运输数据
np.random.seed(42)
n = 200
routes = ["广州-北京", "广州-上海", "广州-成都", "广州-武汉", "广州-杭州"]
modes = ["公路", "铁路", "航空"]
df = pd.DataFrame({
"运单号": [f"WB{str(i).zfill(6)}" for i in range(1, n+1)],
"线路": np.random.choice(routes, n),
"运输方式": np.random.choice(modes, n, p=[0.5, 0.3, 0.2]),
"距离(km)": np.random.randint(500, 2500, n),
"运输成本(元)": np.random.randint(200, 3000, n),
"实际时效(天)": np.random.randint(1, 8, n),
"标准时效(天)": np.random.choice([2, 3, 5, 7], n),
"货损": np.random.choice([0, 0, 0, 0, 0, 1], n) # 5%货损率
})
# 计算单位成本和时效达标率
df["单位成本(元/km)"] = (df["运输成本(元)"] / df["距离(km)"]).round(2)
df["是否准时"] = df["实际时效(天)"] <= df["标准时效(天)"]
# 1. 按运输方式分析
print("=== 按运输方式分析 ===")
mode_stats = df.groupby("运输方式").agg(
运单数=("运单号", "count"),
平均成本=("运输成本(元)", "mean"),
平均单位成本=("单位成本(元/km)", "mean"),
平均时效=("实际时效(天)", "mean"),
准时率=("是否准时", "mean")
).round(2)
mode_stats["准时率"] = (mode_stats["准时率"] * 100).round(1).astype(str) + "%"
print(mode_stats)
# 2. 按线路分析
print("\n=== 按线路分析 ===")
route_stats = df.groupby("线路").agg(
运单数=("运单号", "count"),
平均距离=("距离(km)", "mean"),
平均成本=("运输成本(元)", "mean"),
平均时效=("实际时效(天)", "mean")
).round(1).sort_values("平均成本", ascending=False)
print(route_stats)
print(f"\n总货损率: {df['货损'].mean()*100:.1f}%")
print(f"整体准时率: {df['是否准时'].mean()*100:.1f}%")
供应商评估是供应链管理中的重要环节,直接关系到采购质量、成本和交付的稳定性。一个完善的供应商评估体系通常包含以下核心维度:
| 评估维度 | 关键指标 | 权重建议 |
|---|---|---|
| 质量 | 来料合格率、质量事故次数 | 30% |
| 交付 | 准时交货率、订单满足率 | 25% |
| 成本 | 价格竞争力、付款条件 | 20% |
| 服务 | 响应速度、售后支持 | 15% |
| 创新 | 技术能力、持续改进意愿 | 10% |
import pandas as pd
import numpy as np
# 模拟8个供应商的评估数据
suppliers = pd.DataFrame({
"供应商": ["供应商A", "供应商B", "供应商C", "供应商D",
"供应商E", "供应商F", "供应商G", "供应商H"],
"来料合格率(%)": [98.5, 95.2, 99.1, 93.8, 97.6, 96.5, 98.0, 94.5],
"准时交货率(%)": [95.0, 88.5, 97.2, 85.0, 92.3, 90.5, 96.0, 87.0],
"价格竞争力(分)": [8, 9, 7, 9, 8, 7, 6, 8],
"响应速度(分)": [9, 7, 8, 6, 8, 7, 9, 6],
"技术能力(分)": [8, 6, 9, 5, 7, 7, 8, 6]
})
# 评分标准化(将百分比转为百分制分数)
suppliers["质量得分"] = suppliers["来料合格率(%)"]
suppliers["交付得分"] = suppliers["准时交货率(%)"]
suppliers["成本得分"] = suppliers["价格竞争力(分)"] * 10 # 转为百分制
suppliers["服务得分"] = suppliers["响应速度(分)"] * 10
suppliers["创新得分"] = suppliers["技术能力(分)"] * 10
# 加权计算综合得分
weights = {"质量得分": 0.30, "交付得分": 0.25, "成本得分": 0.20,
"服务得分": 0.15, "创新得分": 0.10}
suppliers["综合得分"] = 0
for col, w in weights.items():
suppliers["综合得分"] += suppliers[col] * w
suppliers = suppliers.sort_values("综合得分", ascending=False).reset_index(drop=True)
suppliers["排名"] = range(1, len(suppliers) + 1)
# 供应商分级
def grade(score):
if score >= 95:
return "A (优秀)"
elif score >= 85:
return "B (良好)"
elif score >= 75:
return "C (合格)"
else:
return "D (待改进)"
suppliers["等级"] = suppliers["综合得分"].apply(grade)
print("=== 供应商综合评估排名 ===")
print(suppliers[["排名", "供应商", "综合得分", "等级",
"质量得分", "交付得分", "成本得分"]].to_string(index=False))
供应商绩效评分模型是对供应商进行量化评估的系统方法。通过建立科学的评分模型,企业可以客观地比较不同供应商的表现,做出更明智的采购决策。本节将构建一个基于多维度加权评分的供应商绩效模型。
import pandas as pd
import numpy as np
# ========== 供应商绩效评分模型 ==========
class SupplierScoringModel:
"""供应商绩效评分模型"""
def __init__(self, weights=None):
# 默认权重
self.weights = weights or {
"来料合格率": 0.25,
"准时交货率": 0.20,
"价格竞争力": 0.20,
"订单满足率": 0.15,
"响应速度": 0.10,
"合作年限": 0.10
}
def normalize(self, value, min_val, max_val, higher_better=True):
"""将原始值归一化到0-100分"""
if max_val == min_val:
return 50
score = (value - min_val) / (max_val - min_val) * 100
if not higher_better:
score = 100 - score
return round(score, 1)
def score_supplier(self, data):
"""计算单个供应商的综合得分"""
scores = {}
for metric, weight in self.weights.items():
raw = data.get(metric, 0)
if metric == "来料合格率":
scores[metric] = self.normalize(raw, 85, 100)
elif metric == "准时交货率":
scores[metric] = self.normalize(raw, 75, 100)
elif metric == "价格竞争力":
scores[metric] = self.normalize(raw, 1, 10)
elif metric == "订单满足率":
scores[metric] = self.normalize(raw, 80, 100)
elif metric == "响应速度":
scores[metric] = self.normalize(raw, 1, 10)
elif metric == "合作年限":
scores[metric] = self.normalize(raw, 0, 10)
total = sum(scores[m] * w for m, w in self.weights.items())
return round(total, 1), scores
def evaluate_batch(self, supplier_data_list):
"""批量评估供应商"""
results = []
for data in supplier_data_list:
total, scores = self.score_supplier(data)
result = {"供应商": data["名称"], "综合得分": total}
result.update(scores)
results.append(result)
df = pd.DataFrame(results)
df = df.sort_values("综合得分", ascending=False).reset_index(drop=True)
df["排名"] = range(1, len(df) + 1)
return df
# 使用模型评估供应商
model = SupplierScoringModel()
suppliers_data = [
{"名称": "华达电子", "来料合格率": 98.5, "准时交货率": 96.0,
"价格竞争力": 8, "订单满足率": 97.0, "响应速度": 9, "合作年限": 5},
{"名称": "鑫源科技", "来料合格率": 95.2, "准时交货率": 88.5,
"价格竞争力": 9, "订单满足率": 92.0, "响应速度": 7, "合作年限": 3},
{"名称": "恒信精密", "来料合格率": 99.1, "准时交货率": 97.2,
"价格竞争力": 7, "订单满足率": 98.0, "响应速度": 8, "合作年限": 8},
{"名称": "博远材料", "来料合格率": 93.8, "准时交货率": 85.0,
"价格竞争力": 9, "订单满足率": 88.0, "响应速度": 6, "合作年限": 2},
{"名称": "正泰工业", "来料合格率": 97.6, "准时交货率": 92.3,
"价格竞争力": 8, "订单满足率": 95.0, "响应速度": 8, "合作年限": 6},
]
result_df = model.evaluate_batch(suppliers_data)
print("=== 供应商绩效评分结果 ===")
print(result_df.to_string(index=False))
# 输出各维度权重
print("\n=== 评分权重配置 ===")
for metric, weight in model.weights.items():
bar = "█" * int(weight * 50)
print(f" {metric}: {weight*100:.0f}% {bar}")
本节将通过一个完整的实战案例,对供应商的历史交易数据进行全面分析,包括供应商绩效评估、风险识别和采购优化建议。
import pandas as pd
import numpy as np
# ========== 供应商综合分析实战 ==========
# 1. 模拟供应商历史交易数据(6个月)
np.random.seed(42)
n_records = 500
suppliers = ["华达电子", "鑫源科技", "恒信精密", "博远材料", "正泰工业",
"天合光电", "瑞升机械", "中鼎零件"]
records = []
for month in range(1, 7):
for supplier in suppliers:
n_orders = np.random.randint(3, 12)
for _ in range(n_orders):
records.append({
"月份": f"2025-{month:02d}",
"供应商": supplier,
"订单金额(万元)": round(np.random.uniform(5, 100), 2),
"来料合格率(%)": round(np.random.normal(96, 2), 1),
"是否准时": np.random.choice([True, False], p=[0.85, 0.15]),
"订单满足率(%)": round(np.random.normal(94, 4), 1),
"响应时间(小时)": max(1, int(np.random.normal(8, 3)))
})
df = pd.DataFrame(records)
# 2. 汇总各供应商绩效
print("=" * 60)
print(" 供应商综合分析报告 (2025上半年)")
print("=" * 60)
summary = df.groupby("供应商").agg(
总订单数=("订单金额(万元)", "count"),
总采购额=("订单金额(万元)", "sum"),
平均订单金额=("订单金额(万元)", "mean"),
平均合格率=("来料合格率(%)", "mean"),
准时交货率=("是否准时", "mean"),
平均满足率=("订单满足率(%)", "mean"),
平均响应时间=("响应时间(小时)", "mean")
).round(2)
summary["准时交货率"] = (summary["准时交货率"] * 100).round(1)
summary = summary.sort_values("总采购额", ascending=False)
print("\n--- 供应商采购额排名 ---")
print(summary[["总订单数", "总采购额", "平均订单金额"]].to_string())
print("\n--- 供应商质量与交付表现 ---")
print(summary[["平均合格率", "准时交货率", "平均满足率", "平均响应时间"]].to_string())
# 3. 供应商风险识别
print("\n--- 供应商风险预警 ---")
risks = []
for supplier in suppliers:
sub = df[df["供应商"] == supplier]
quality = sub["来料合格率(%)"].mean()
otd = sub["是否准时"].mean()
fill = sub["订单满足率(%)"].mean()
issues = []
if quality < 95:
issues.append(f"来料合格率偏低({quality:.1f}%)")
if otd < 0.85:
issues.append(f"准时交货率偏低({otd*100:.1f}%)")
if fill < 90:
issues.append(f"订单满足率偏低({fill:.1f}%)")
if issues:
risks.append({"供应商": supplier, "风险项": len(issues), "详情": "; ".join(issues)})
if risks:
risk_df = pd.DataFrame(risks)
print(risk_df.to_string(index=False))
else:
print("所有供应商表现正常,无风险预警")
# 4. 采购优化建议
print("\n--- 采购优化建议 ---")
# 识别高采购额低合格率的供应商
for _, row in summary.iterrows():
if row["平均合格率"] < 95 and row["总采购额"] > summary["总采购额"].median():
print(f" [重点] {row.name}: 采购额{row['总采购额']:.0f}万元, "
f"但合格率仅{row['平均合格率']:.1f}%, 建议加强质量管控或寻找替代供应商")
# 识别高准时率低采购额的潜在优质供应商
print("\n 潜在优质供应商(高准时率、低采购额,可考虑增加份额):")
for _, row in summary.iterrows():
if row["准时交货率"] > 90 and row["总采购额"] < summary["总采购额"].quantile(0.3):
print(f" - {row.name}: 准时率{row['准时交货率']:.1f}%, "
f"当前采购额{row['总采购额']:.0f}万元")