aoi学院

Aisaka's Blog, School of Aoi, Aisaka University

Python-第10篇《数据清洗:让脏数据变干净》

📖 开篇语

还记得你第一次收到客户信息表的情景吗?

手机号有的是13800138000,有的是138-0013-8000,还有的是空着的
日期格式五花八门:2024/1/1、2024-01-01、1-Jan-2024
客户名称前后有空格,有的还多了个“有限公司”
邮箱地址明显写错了:zhangsan@email、lisi123@com
这就是“脏数据”——现实世界中的数据从来都不是完美的!

今天我们要学的,就是让Python帮你把这些“脏数据”变成“干净数据”。
就像财务做账前的整理工作一样,数据清洗是数据分析的第一步,也是最重要的一步!


🎯 今日学习目标

  • 识别和处理缺失值(空值、NaN)
  • 统一数据格式(日期、数字、文本)
  • 实战清理客户信息表(比手工清洗快100倍)

🧹 什么是数据清洗?

脏数据的常见类型

脏数据类型示例清洗方法
缺失值手机号:空值填充或删除
格式不统一日期:2024/1/1 vs 2024-01-01标准化格式
数据错误邮箱:zhangsan@email校验和修正
重复数据同一个客户录入两次去重
多余空格“ 张三 ”去除空格
数据类型错误“123.45元”(文本)转换为数字

为什么数据清洗很重要?

举个财务的例子🌰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
原始数据:应收账款
- 客户A:15,000元 ✅
- 客户B:空值 ❌(是0元还是未统计?)
- 客户C:"12,500元"(文本)❌(无法计算)
- 客户D:12,500.00 ✅

清洗后:
- 客户A:15000.00 ✅
- 客户B:0.00 ✅(明确为0)
- 客户C:12500.00 ✅(转换为数字)
- 客户D:12500.00 ✅

总计:15000 + 0 + 12500 + 12500 = 50000元
(清洗前无法准确计算总计!)

🔍 识别脏数据:先发现问题

创建示例客户信息表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pandas as pd
import numpy as np

# 创建一个"脏兮兮"的客户信息表
客户信息 = pd.DataFrame({
'客户ID': ['C001', 'C002', 'C003', 'C004', 'C005', 'C006', 'C007', 'C008', 'C009', 'C010'],
'客户名称': [' 张三 ', '李四', '王五', None, '赵六', '钱七', '孙八', '周九', '吴十', '郑十一'],
'联系电话': ['13800138000', '138-0013-8001', None, '1381234567', '13800138004', 'not-a-phone', '13800138005', '', '138-0013-8006', '13800138007'],
'邮箱': ['zhangsan@email.com', 'lisi@company', None, 'wangwu123@', 'zhaoliu@email.com', 'qianqi@email.com', 'sunba@email.com', 'zhoujiu@', 'wushi@email.com', 'zhengshiyi@email.com'],
'注册日期': ['2024-01-01', '2024/01/02', '2024.01.03', None, '2024-01-05', '2024-01-06', '2024-01-07', '2024-01-08', '2024-01-09', '2024-01-10'],
'信用等级': ['A', 'B', 'A', 'C', None, 'B', 'A', 'C', 'B', 'A'],
'应收账款': ['15000.50', '25000', None, '18000.75', 'not-a-number', '22000', '19500.25', '', '16500', '21000.80'],
'状态': ['活跃', '活跃', ' 不活跃 ', '活跃', '活跃', '不活跃', '活跃', '活跃', '不活跃', '活跃']
})

print("原始客户信息表:")
print(客户信息)
print(f"\n数据形状:{客户信息.shape}")
print(f"数据类型:\n{客户信息.dtypes}")

数据质量检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def 数据质量报告(df, 表名="数据表"):
"""生成数据质量检查报告"""
print(f"\n{'='*50}")
print(f"{表名} - 数据质量检查报告")
print(f"{'='*50}")

print(f"数据形状:{df.shape}")
print(f"列名:{df.columns.tolist()}")

print(f"\n缺失值统计:")
缺失统计 = df.isnull().sum()
for 列名, 缺失数 in 缺失统计.items():
缺失率 = 缺失数 / len(df) * 100
print(f" {列名}{缺失数}个 ({缺失率:.1f}%)")

print(f"\n重复数据:{df.duplicated().sum()}行")

print(f"\n数据类型检查:")
for 列名 in df.columns:
数据类型 = df[列名].dtype
唯一值数量 = df[列名].nunique()
print(f" {列名}{数据类型},唯一值:{唯一值数量}个")

# 显示前5个唯一值
唯一值 = df[列名].dropna().unique()[:5]
print(f" 示例值:{唯一值.tolist()}")

# 生成数据质量报告
数据质量报告(客户信息, "客户信息表")

🧽 处理缺失值:把“空”变成“满”

识别缺失值

1
2
3
4
5
6
7
8
9
10
11
12
13
# 查看缺失值分布
print("缺失值分布:")
print(客户信息.isnull().sum())

# 查看包含缺失值的行
print("\n包含缺失值的行:")
缺失行 = 客户信息[客户信息.isnull().any(axis=1)]
print(缺失行)

# 查看特定列的缺失值
print("\n邮箱地址缺失的行:")
邮箱缺失 = 客户信息[客户信息['邮箱'].isnull()]
print(邮箱缺失[['客户ID', '客户名称', '邮箱']])

方法1:删除缺失值(适合缺失比例小的数据)

1
2
3
4
5
6
7
8
9
10
11
12
13
# 方法1.1:删除包含缺失值的行
客户信息_删除缺失 = 客户信息.dropna()
print(f"删除缺失值后:{客户信息_删除缺失.shape}")

# 方法1.2:删除特定列缺失的行
客户信息_删除邮箱缺失 = 客户信息.dropna(subset=['邮箱'])
print(f"删除邮箱缺失后:{客户信息_删除邮箱缺失.shape}")

# 方法1.3:删除缺失比例高的列
缺失比例 = 客户信息.isnull().sum() / len(客户信息)
高缺失列 = 缺失比例[缺失比例 > 0.3].index # 缺失超过30%的列
客户信息_删除高缺失列 = 客户信息.drop(columns=高缺失列)
print(f"删除高缺失列后:{客户信息_删除高缺失列.shape}")

方法2:填充缺失值(更常用的方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 方法2.1:用固定值填充
客户信息_填充1 = 客户信息.copy()
客户信息_填充1['信用等级'] = 客户信息_填充1['信用等级'].fillna('C') # 默认C级

# 方法2.2:用前一个值或后一个值填充
客户信息_填充2 = 客户信息.copy()
客户信息_填充2['信用等级'] = 客户信息_填充2['信用等级'].fillna(method='ffill') # 前向填充

# 方法2.3:用平均值填充(适合数值型)
客户信息_填充3 = 客户信息.copy()
# 先转换应收账款为数值类型
客户信息_填充3['应收账款'] = pd.to_numeric(客户信息_填充3['应收账款'], errors='coerce')
客户信息_填充3['应收账款'] = 客户信息_填充3['应收账款'].fillna(客户信息_填充3['应收账款'].mean())

# 方法2.4:智能填充(根据其他列推断)
客户信息_智能填充 = 客户信息.copy()

# 根据销售员推断信用等级(同级别的销售员信用等级相同)
信用等级映射 = 客户信息_智能填充.groupby('销售员')['信用等级'].first().to_dict()
客户信息_智能填充['信用等级'] = 客户信息_智能填充.apply(
lambda row: row['信用等级'] if pd.notna(row['信用等级']) else
信用等级映射.get(row['销售员'], 'C'), axis=1
)

print("智能填充后的信用等级:")
print(客户信息_智能填充[['销售员', '信用等级']])

方法3:插值填充(适合时间序列)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 创建时间序列数据
时间序列数据 = pd.DataFrame({
'日期': pd.date_range('2024-01-01', periods=10),
'销售额': [10000, 12000, np.nan, 15000, 18000, np.nan, 22000, 25000, 28000, 30000]
})

print("原始时间序列:")
print(时间序列数据)

# 线性插值填充
时间序列_插值 = 时间序列数据.copy()
时间序列_插值['销售额'] = 时间序列_插值['销售额'].interpolate(method='linear')

print("\n插值填充后:")
print(时间序列_插值)

# 可视化插值效果
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.plot(时间序列数据['日期'], 时间序列数据['销售额'], 'ro-', label='原始数据(含缺失)')
plt.plot(时间序列_插值['日期'], 时间序列_插值['销售额'], 'bo-', label='插值填充后')
plt.title('时间序列数据插值填充')
plt.xlabel('日期')
plt.ylabel('销售额')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('时间序列插值.png', dpi=300, bbox_inches='tight')
plt.close()

🔄 统一数据格式:让数据“整齐划一”

文本数据清洗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 创建示例文本数据
文本数据 = pd.DataFrame({
'客户名称': [' 张三 ', '李四', '王五', ' 赵六 ', '钱七'],
'产品名称': [' 产品A ', '产品B ', ' 产品C', '产品D', ' 产品E '],
'状态': [' 活跃 ', '活跃', ' 不活跃', '活跃 ', ' 活跃 ']
})

print("原始文本数据:")
print(文本数据)

# 去除前后空格
文本数据_清洗 = 文本数据.copy()
文本数据_清洗 = 文本数据_清洗.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)

print("\n去除空格后:")
print(文本数据_清洗)

# 统一大小写
文本数据_大小写 = 文本数据_清洗.copy()
文本数据_大小写['状态'] = 文本数据_大小写['状态'].str.upper()

print("\n统一为大写后:")
print(文本数据_大小写)

# 替换特定文本
文本数据_替换 = 文本数据_清洗.copy()
文本数据_替换['状态'] = 文本数据_替换['状态'].replace('不活跃', 'Inactive')
文本数据_替换['状态'] = 文本数据_替换['状态'].replace('活跃', 'Active')

print("\n文本替换后:")
print(文本数据_替换)

数字数据清洗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 创建混合格式的数字数据
数字数据 = pd.DataFrame({
'金额': ['15000.50', '25000', None, '18,000.75', 'not-a-number', '22000', '19500.25', '', '16500', '21000.80元'],
'数量': ['100', '150件', '200个', '180', 'not-a-number', '220', '195', '', '165', '210台'],
'单价': ['150.50元', '166.67', '152.50', '158.33', 'not-a-number', '157.50', '156.25', '', '160.00', '157.50元/件']
})

print("原始数字数据:")
print(数字数据)
print(f"\n数据类型:\n{数字数据.dtypes}")

# 清洗数字数据函数
def 清洗数字列(系列, 列名="数字列"):
"""清洗包含非数字字符的数字列"""
清洗后的系列 = 系列.astype(str) # 先转换为字符串

# 移除常见的非数字字符
清洗后的系列 = 清洗后的系列.str.replace(',', '') # 移除千分位逗号
清洗后的系列 = 清洗后的系列.str.replace('元', '') # 移除货币符号
清洗后的系列 = 清洗后的系列.str.replace('件', '') # 移除单位
清洗后的系列 = 清洗后的系列.str.replace('个', '')
清洗后的系列 = 清洗后的系列.str.replace('台', '')
清洗后的系列 = 清洗后的系列.str.replace('/件', '')
清洗后的系列 = 清洗后的系列.str.strip() # 去除空格

# 将无法转换的值设为NaN
清洗后的系列 = pd.to_numeric(清洗后的系列, errors='coerce')

return 清洗后的系列

# 应用数字清洗
数字数据_清洗 = 数字数据.copy()
for 列名 in 数字数据.columns:
数字数据_清洗[列名] = 清洗数字列(数字数据[列名], 列名)

print("\n清洗后的数字数据:")
print(数字数据_清洗)
print(f"\n清洗后数据类型:\n{数字数据_清洗.dtypes}")

# 处理清洗后的缺失值
数字数据_最终 = 数字数据_清洗.copy()
数字数据_最终 = 数字数据_最终.fillna(0) # 将无法转换的值设为0

print("\n最终清洗结果:")
print(数字数据_最终)

日期数据清洗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 创建混乱的日期数据
日期数据 = pd.DataFrame({
'原始日期': ['2024-01-01', '2024/01/02', '2024.01.03', '01-Jan-2024', '20240104',
'2024年1月5日', '1/6/2024', 'Jan 7, 2024', None, 'not-a-date'],
'事件': ['订单1', '订单2', '订单3', '订单4', '订单5', '订单6', '订单7', '订单8', '订单9', '订单10']
})

print("原始日期数据:")
print(日期数据)

# 智能日期清洗函数
def 智能日期清洗(日期系列):
"""智能识别和清洗各种格式的日期"""
清洗后的日期 = pd.to_datetime(日期系列, errors='coerce', infer_datetime_format=True)

# 如果还有无法识别的,尝试手动处理
无法识别 = 清洗后的日期.isnull() & 日期系列.notnull()

if 无法识别.sum() > 0:
print(f"发现{无法识别.sum()}个无法识别的日期格式")

# 尝试其他格式
for i, 原始值 in enumerate(日期系列[无法识别]):
try:
# 移除中文和特殊字符
清理值 = str(原始值).replace('年', '-').replace('月', '-').replace('日', '')
清理值 = 清理值.replace('Jan', '01').replace('Feb', '02').replace('Mar', '03')
清理值 = 清理值.replace(' ', '-')

# 尝试解析
解析日期 = pd.to_datetime(清理值, errors='coerce')
if pd.notna(解析日期):
清洗后的日期.loc[清洗后的日期.index[无法识别][i]] = 解析日期
except:
continue

return 清洗后的日期

# 应用日期清洗
日期数据_清洗 = 日期数据.copy()
日期数据_清洗['标准日期'] = 智能日期清洗(日期数据['原始日期'])

print("\n日期清洗结果:")
print(日期数据_清洗)

# 提取日期组件
日期数据_增强 = 日期数据_清洗.copy()
日期数据_增强['年'] = 日期数据_增强['标准日期'].dt.year
日期数据_增强['月'] = 日期_data_增强['标准日期'].dt.month
日期数据_增强['日'] = 日期_data_增强['标准日期'].dt.day
日期数据_增强['星期'] = 日期_data_增强['标准日期'].dt.day_name()

print("\n增强后的日期数据:")
print(日期数据_增强)

🧪 数据验证:确保清洗质量

创建数据验证函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def 数据清洗质量检查(原始数据, 清洗后数据):
"""检查数据清洗的质量"""

print("="*60)
print("数据清洗质量检查报告")
print("="*60)

print(f"原始数据形状:{原始数据.shape}")
print(f"清洗后数据形状:{清洗后_data.shape}")

# 缺失值检查
print(f"\n缺失值对比:")
原始缺失 = 原始数据.isnull().sum()
清洗缺失 = 清洗后数据.isnull().sum()

for 列名 in 原始数据.columns:
原始缺失数 = 原始缺失[列名]
清洗缺失数 = 清洗缺失[列名]
改善 = 原始缺失数 - 清洗缺失数
if 改善 > 0:
print(f" {列名}{原始缺失数}{清洗缺失数}(改善{改善}个)")
elif 改善 < 0:
print(f" {列名}{原始缺失数}{清洗缺失数}(新增{abs(改善)}个)⚠️")
else:
print(f" {列名}{原始缺失数}{清洗缺失数}(无变化)")

# 数据类型检查
print(f"\n数据类型变化:")
for 列名 in 原始数据.columns:
if 列名 in 清洗后_data.columns:
原始类型 = 原始数据[列名].dtype
清洗类型 = 清洗后_data[列名].dtype
if 原始类型 != 清洗类型:
print(f" {列名}{原始类型}{清洗_type} ✅")

# 数据范围检查
print(f"\n数据范围检查:")
for 列名 in 清洗后_data.select_dtypes(include=[np.number]).columns:
最小值 = 清洗后_data[列名].min()
最大值 = 清洗后_data[列名].max()
平均值 = 清洗后_data[列名].mean()
print(f" {列名}:范围[{最小值:.2f}, {最大值:.2f}],平均值{平均值:.2f}")

# 使用质量检查函数
数据清洗质量检查(客户信息, 客户信息_智能填充)

💼 实战项目:清理客户信息表

版本1.0:基础清洗
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 客户信息清洗 - 基础版
print("=== 客户信息清洗系统 ===")

def 基础客户信息清洗(df):
"""基础版客户信息清洗"""

print("🔍 开始基础数据清洗...")

# 1. 去除文本前后空格
print("1. 去除文本空格...")
文本列 = ['客户名称', '状态']
for 列名 in 文本列:
if 列名 in df.columns:
df[列名] = df[列名].str.strip()

# 2. 统一大小写
print("2. 统一状态大小写...")
if '状态' in df.columns:
df['状态'] = df['状态'].str.title()

# 3. 处理缺失值
print("3. 处理缺失值...")
# 客户名称缺失:标记为"待补充"
if '客户名称' in df.columns:
df['客户名称'] = df['客户名称'].fillna('待补充')

# 信用等级缺失:默认为C级
if '信用等级' in df.columns:
df['信用等级'] = df['信用等级'].fillna('C')

# 4. 清洗数字数据
print("4. 清洗应收账款数据...")
if '应收账款' in df.columns:
df['应收账款'] = pd.to_numeric(df['应收账款'], errors='coerce')
df['应收账款'] = df['应收账款'].fillna(0)

# 5. 清洗电话号码
print("5. 清洗电话号码...")
if '联系电话' in df.columns:
# 移除所有非数字字符
df['联系电话_清洗'] = df['联系电话'].astype(str).str.replace(r'[^0-9]', '', regex=True)
# 验证手机号长度(11位)
df['电话有效性'] = df['联系电话_清洗'].str.len() == 11

# 6. 清洗邮箱地址
print("6. 清洗邮箱地址...")
if '邮箱' in df.columns:
# 基本邮箱格式验证
df['邮箱有效性'] = df['邮箱'].str.contains('@', na=False)
# 填充无效邮箱
df.loc[~df['邮箱有效性'], '邮箱'] = '待补充@company.com'

print("✅ 基础清洗完成!")
return df

# 使用基础清洗
客户信息_基础清洗 = 基础客户信息清洗(客户信息.copy())

print("\n基础清洗结果:")
print(客户信息_基础清洗.head())
版本2.0:高级清洗
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# 客户信息清洗 - 高级版
print("\n=== 高级客户信息清洗系统 ===")

import re
from datetime import datetime

class 高级客户信息清洗器:
def __init__(self):
self.清洗规则 = {
'手机号规则': r'^1[3-9]\d{9}$',
'邮箱规则': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
'日期规则': r'^\d{4}-\d{2}-\d{2}$'
}

def 清洗客户信息(self, df):
"""高级客户信息清洗"""
print("🔍 开始高级数据清洗...")

df_清洗 = df.copy()

# 1. 智能文本清洗
print("1. 智能文本清洗...")
df_清洗 = self._智能文本清洗(df_清洗)

# 2. 电话号码标准化
print("2. 电话号码标准化...")
df_清洗 = self._电话号码标准化(df_清洗)

# 3. 邮箱地址验证和修正
print("3. 邮箱地址验证和修正...")
df_清洗 = self._邮箱地址清洗(df_清洗)

# 4. 日期格式统一
print("4. 日期格式统一...")
df_清洗 = self._日期格式统一(df_清洗)

# 5. 信用等级标准化
print("5. 信用等级标准化...")
df_清洗 = self._信用等级标准化(df_清洗)

# 6. 应收账款标准化
print("6. 应收账款标准化...")
df_清洗 = self._应收账款标准化(df_清洗)

# 7. 数据验证
print("7. 数据验证...")
df_清洗 = self._数据验证(df_清洗)

print("✅ 高级清洗完成!")
return df_清洗

def _智能文本清洗(self, df):
"""智能文本清洗"""
文本列 = ['客户名称', '状态']

for 列名 in 文本列:
if 列名 in df.columns:
# 去除各种空白字符
df[列名] = df[列名].astype(str).str.strip()
# 去除多余空格
df[列名] = df[列名].str.replace(r'\s+', ' ', regex=True)
# 统一大小写格式
if 列名 == '状态':
df[列名] = df[列名].str.title()

return df

def _电话号码标准化(self, df):
"""电话号码标准化"""
if '联系电话' not in df.columns:
return df

# 创建新列保存清洗结果
df['联系电话_标准化'] = df['联系电话'].astype(str)

# 移除非数字字符
df['联系电话_标准化'] = df['联系电话_标准化'].str.replace(r'[^0-9]', '', regex=True)

# 验证手机号格式
df['电话有效性'] = df['联系电话_标准化'].str.match(self.清洗规则['手机号规则'])

# 对无效电话进行分类处理
无效电话 = ~df['电话有效性']

# 长度不对的
长度不对 = 无效电话 & (df['联系电话_标准化'].str.len() != 11)
df.loc[长度不对, '电话错误类型'] = '长度错误'

# 格式不对的
格式不对 = 无效电话 & (df['联系电话_标准化'].str.len() == 11) & \
(~df['联系电话_标准化'].str.startswith(('13', '14', '15', '16', '17', '18', '19')))
df.loc[格式不对, '电话错误类型'] = '格式错误'

# 缺失值
缺失值 = 无效电话 & (df['联系电话'].isnull() | (df['联系电话'] == 'nan'))
df.loc[缺失值, '电话错误类型'] = '缺失'

# 特殊值
特殊值 = 无效电话 & df['联系电话'].astype(str).str.contains('not-a-phone', na=False)
df.loc[特殊值, '电话错误类型'] = '无效数据'

# 对不同类型的错误进行处理
# 缺失值:标记为待补充
df.loc[缺失值, '联系电话_标准化'] = '待补充'

# 格式错误但长度正确:尝试修正
可修正 = 格式不对 & (df['联系电话_标准化'].str.len() == 11)
# 这里可以添加更复杂的修正逻辑

return df

def _邮箱地址清洗(self, df):
"""邮箱地址验证和修正"""
if '邮箱' not in df.columns:
return df

# 邮箱有效性验证
df['邮箱有效性'] = df['邮箱'].str.match(self.清洗规则['邮箱规则'], na=False)

# 识别常见邮箱错误
无效邮箱 = ~df['邮箱有效性']

# 缺失@符号
缺少at = 无效邮箱 & (~df['邮箱'].str.contains('@', na=False))
df.loc[缺少at, '邮箱错误类型'] = '缺少@符号'

# 域名不完整
域名不完整 = 无效邮箱 & df['邮箱'].str.contains('@', na=False) & \
(~df['邮箱'].str.contains('.', na=False))
df.loc[域名不完整, '邮箱错误类型'] = '域名不完整'

# 常见域名修正
常见域名修正 = {
'@email': '@email.com',
'@company': '@company.com',
'@gmail': '@gmail.com',
'@qq': '@qq.com',
'@163': '@163.com'
}

for 错误域名, 正确域名 in 常见域名修正.items():
需要修正 = df['邮箱'].str.contains(错误域名, na=False)
df.loc[需要修正, '邮箱'] = df.loc[需要修正, '邮箱'].str.replace(错误域名, 正确域名)

# 对无法修正的邮箱提供默认值
仍然无效 = ~df['邮箱'].str.match(self.清洗规则['邮箱规则'], na=False)
df.loc[仍然无效, '邮箱'] = df.loc[仍然无效, '客户ID'] + '@company.com'

return df

def _日期格式统一(self, df):
"""日期格式统一"""
if '注册日期' not in df.columns:
return df

# 尝试多种日期格式
日期格式列表 = [
'%Y-%m-%d', # 2024-01-01
'%Y/%m/%d', # 2024/01/01
'%Y.%m.%d', # 2024.01.01
'%d-%b-%Y', # 01-Jan-2024
'%d %b %Y', # 01 Jan 2024
'%Y年%m月%d日', # 2024年1月1日
'%m/%d/%Y', # 01/01/2024
'%Y%m%d' # 20240101
]

df['注册日期_标准化'] = pd.NaT

for 日期格式 in 日期格式列表:
未解析 = df['注册日期_标准化'].isnull()
if 未解析.sum() == 0:
break

try:
解析日期 = pd.to_datetime(df.loc[未解析, '注册日期'], format=日期格式, errors='coerce')
df.loc[未解析 & 解析日期.notnull(), '注册日期_标准化'] = 解析日期[解析日期.notnull()]
except:
continue

# 对仍然无法解析的日期使用智能解析
仍然未解析 = df['注册日期_标准化'].isnull() & df['注册日期'].notnull()
if 仍然未解析.sum() > 0:
智能解析 = pd.to_datetime(df.loc[仍然未解析, '注册日期'], errors='coerce')
df.loc[仍然未解析, '注册日期_标准化'] = 智能解析

# 对仍然无效的日期提供默认值
最终无效 = df['注册日期_标准化'].isnull() & (df['注册日期'].notnull())
if 最终无效.sum() > 0:
# 使用当前日期作为默认值
df.loc[最终无效, '注册日期_标准化'] = datetime.now().date()
df.loc[最终无效, '日期错误标记'] = '使用默认值'

return df

def _信用等级标准化(self, df):
"""信用等级标准化"""
if '信用等级' not in df.columns:
return df

# 定义标准信用等级
标准等级 = ['A', 'B', 'C', 'D']

# 统一格式
df['信用等级'] = df['信用等级'].astype(str).str.upper().str.strip()

# 处理非标准等级
非标准等级 = ~df['信用等级'].isin(标准等级)

# 根据客户历史推断等级(这里简化处理)
# 在实际应用中,可以基于客户的历史交易数据推断
df.loc[非标准等级, '信用等级'] = 'C' # 默认C级

return df

def _应收账款标准化(self, df):
"""应收账款标准化"""
if '应收账款' not in df.columns:
return df

# 转换为数值类型
df['应收账款_数值'] = pd.to_numeric(df['应收账款'], errors='coerce')

# 处理异常值
# 负数应收账款(预收账款)
负数 = df['应收账款_数值'] < 0
df.loc[负数, '账款类型'] = '预收账款'
df.loc[负数, '应收账款_标准化'] = abs(df.loc[负数, '应收账款_数值'])

# 过大的数值(可能需要验证)
过大数值 = df['应收账款_数值'] > 1000000 # 超过100万
df.loc[过大数值, '数值异常标记'] = '金额过大需核实'

# 正常应收账款
正常数值 = (df['应收账款_数值'] >= 0) & (df['应收账款_数值'] <= 1000000)
df.loc[正常数值, '应收账款_标准化'] = df.loc[正常数值, '应收账款_数值']

# 缺失值处理
缺失值 = df['应收账款_数值'].isnull()
df.loc[缺失值, '应收账款_标准化'] = 0
df.loc[缺失值, '账款类型'] = '无应收账款'

return df

def _数据验证(self, df):
"""数据验证"""
# 创建验证结果列
df['数据质量评分'] = 100

# 电话有效性扣分
if '电话有效性' in df.columns:
无效电话 = ~df['电话有效性']
df.loc[无效电话, '数据质量评分'] -= 20

# 邮箱有效性扣分
if '邮箱有效性' in df.columns:
无效邮箱 = ~df['邮箱有效性']
df.loc[无效邮箱, '数据质量评分'] -= 15

# 日期有效性扣分
if '日期错误标记' in df.columns:
日期错误 = df['日期错误标记'].notnull()
df.loc[日期错误, '数据质量评分'] -= 10

# 数据质量分级
df['数据质量等级'] = pd.cut(df['数据质量评分'],
bins=[0, 60, 80, 90, 100],
labels=['差', '一般', '良好', '优秀'])

return df

# 使用高级清洗器
高级清洗器 = 高级客户信息清洗器()
客户信息_高级清洗 = 高级清洗器.清洗客户信息(客户信息.copy())

print("\n高级清洗结果示例:")
print(客户信息_高级清洗[['客户ID', '客户名称', '联系电话_标准化', '电话有效性', '邮箱有效性', '数据质量评分']].head())
版本3.0:专业数据清洗系统
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# 专业数据清洗系统
print("\n=== 专业数据清洗系统 ===")

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, LabelEncoder
import joblib

class 专业数据清洗系统:
def __init__(self):
self.清洗规则 = {}
self.清洗模型 = {}
self.清洗历史 = []

def 分析数据质量(self, df):
"""全面分析数据质量"""
print("🔍 分析数据质量...")

质量报告 = {
'总体情况': {},
'缺失值分析': {},
'异常值分析': {},
'格式一致性': {},
'重复数据': {}
}

# 总体情况
质量报告['总体情况'] = {
'数据形状': df.shape,
'列数': len(df.columns),
'行数': len(df),
'内存使用': df.memory_usage(deep=True).sum()
}

# 缺失值分析
缺失统计 = df.isnull().sum()
缺失比例 = (缺失统计 / len(df) * 100).round(2)

质量报告['缺失值分析'] = {
'总缺失值': 缺失统计.sum(),
'缺失值比例': 缺失比例.to_dict(),
'完全缺失的行': df.isnull().all(axis=1).sum(),
'任何缺失的行': df.isnull().any(axis=1).sum()
}

# 异常值分析(使用IQR方法)
数值列 = df.select_dtypes(include=[np.number]).columns
异常值统计 = {}

for 列名 in 数值列:
Q1 = df[列名].quantile(0.25)
Q3 = df[列名].quantile(0.75)
IQR = Q3 - Q1
下限 = Q1 - 1.5 * IQR
上限 = Q3 + 1.5 * IQR

异常值 = df[(df[列名] < 下限) | (df[列名] > 上限)][列名]
异常值统计[列名] = {
'异常值数量': len(异常值),
'异常值比例': len(异常值) / len(df) * 100,
'下限': 下限,
'上限': 上限
}

质量报告['异常值分析'] = 异常值统计

# 重复数据
完全重复 = df.duplicated().sum()
部分重复 = df.duplicated(subset=df.columns[:3]).sum() # 检查前3列

质量报告['重复数据'] = {
'完全重复行': 完全重复,
'部分重复行': 部分重复 - 完全重复,
'重复率': (完全重复 + max(0, 部分重复 - 完全重复)) / len(df) * 100
}

return 质量报告

def 智能清洗(self, df, 清洗配置=None):
"""智能数据清洗"""
print("🤖 开始智能数据清洗...")

if 清洗配置 is None:
清洗配置 = self._生成默认清洗配置(df)

df_清洗 = df.copy()

# 1. 根据数据类型自动选择清洗策略
for 列名 in df.columns:
数据类型 = df[列名].dtype
唯一值数量 = df[列名].nunique()
缺失比例 = df[列名].isnull().sum() / len(df)

print(f" 处理列:{列名}(类型:{数据类型},缺失率:{缺失比例:.1%})")

if pd.api.types.is_numeric_dtype(df[列名]):
df_清洗 = self._清洗数值列(df_清洗, 列名, 缺失比例)
elif pd.api.types.is_datetime64_dtype(df[列名]):
df_清洗 = self._清洗日期列(df_清洗, 列名, 缺失比例)
else:
df_清洗 = self._清洗文本列(df_清洗, 列名, 唯一值数量, 缺失比例)

# 2. 高级清洗技术
print(" 应用高级清洗技术...")
df_清洗 = self._高级清洗技术(df_清洗)

# 3. 数据验证
print(" 验证清洗结果...")
验证结果 = self._验证清洗结果(df, df_清洗)

print("✅ 智能清洗完成!")
return df_清洗, 验证结果

def _生成默认清洗配置(self, df):
"""根据数据特征生成默认清洗配置"""
配置 = {}

for 列名 in df.columns:
缺失比例 = df[列名].isnull().sum() / len(df)
数据类型 = df[列名].dtype

配置[列名] = {
'缺失值处理': '智能填充' if 缺失比例 < 0.3 else '删除行',
'异常值处理': '标记',
'格式标准化': True,
'数据验证': True
}

return 配置

def _清洗数值列(self, df, 列名, 缺失比例):
"""清洗数值列"""
# 转换为数值类型
df[列名] = pd.to_numeric(df[列名], errors='coerce')

# 处理缺失值
if 缺失比例 < 0.1: # 缺失率小于10%,用中位数填充
中位数 = df[列名].median()
df[列名] = df[列名].fillna(中位数)
elif 缺失比例 < 0.3: # 缺失率10-30%,用均值填充
均值 = df[列名].mean()
df[列名] = df[列名].fillna(均值)
else: # 缺失率大于30%,创建指示变量
df[f'{列名}_是否缺失'] = df[列名].isnull()
df[列名] = df[列名].fillna(0)

# 处理异常值
Q1 = df[列名].quantile(0.25)
Q3 = df[列名].quantile(0.75)
IQR = Q3 - Q1
下限 = Q1 - 1.5 * IQR
上限 = Q3 + 1.5 * IQR

异常值 = (df[列名] < 下限) | (df[列名] > 上限)
if 异常值.sum() > 0:
df[f'{列名}_异常标记'] = 异常值
# 用边界值替换异常值
df.loc[df[列名] < 下限, 列名] = 下限
df.loc[df[列名] > 上限, 列名] = 上限

return df

def _清洗日期列(self, df, 列名, 缺失比例):
"""清洗日期列"""
# 尝试多种日期格式
日期格式列表 = [
'%Y-%m-%d', '%Y/%m/%d', '%Y.%m.%d', '%d-%b-%Y',
'%d %b %Y', '%Y年%m月%d日', '%m/%d/%Y', '%Y%m%d'
]

成功解析 = pd.Series([False] * len(df))

for 日期格式 in 日期格式列表:
未解析 = ~成功解析 & df[列名].notnull()
if 未解析.sum() == 0:
break

try:
解析结果 = pd.to_datetime(df.loc[未解析, 列名], format=日期格式, errors='coerce')
有效解析 = 解析结果.notnull()
df.loc[未解析 & 有效解析, 列名] = 解析结果[有效解析]
成功解析 |= 有效解析
except:
continue

# 对无法解析的使用智能解析
仍然未解析 = ~成功解析 & df[列名].notnull()
if 仍然未解析.sum() > 0:
智能结果 = pd.to_datetime(df.loc[仍然未解析, 列名], errors='coerce')
df.loc[仍然未解析, 列名] = 智能结果

# 处理缺失值
if 缺失比例 < 0.2:
# 用中位数日期填充(对于时间序列)
df[列名] = pd.to_datetime(df[列名])
中位数日期 = df[列名].dropna().quantile(0.5)
df[列名] = df[列名].fillna(中位数日期)
else:
# 用当前日期填充
df[列名] = pd.to_datetime(df[列名])
df[列名] = df[列名].fillna(datetime.now())

return df

def _清洗文本列(self, df, 列名, 唯一值数量, 缺失比例):
"""清洗文本列"""
# 去除空白字符
df[列名] = df[列名].astype(str).str.strip()
df[列名] = df[列名].str.replace(r'\s+', ' ', regex=True)

# 处理缺失值
if 缺失比例 > 0:
if 唯一值数量 < 20: # 分类变量
# 用众数填充
众数 = df[列名].mode()[0] if len(df[列名].mode()) > 0 else '未知'
df[列名] = df[列名].fillna(众数)
else: # 文本变量
df[列名] = df[列名].fillna('待补充')

# 标准化处理
if 列名 == '状态':
df[列名] = df[列名].str.title()
elif 列名 == '信用等级':
# 标准化信用等级
有效等级 = ['A', 'B', 'C', 'D']
无效等级 = ~df[列名].isin(有效等级)
df.loc[无效等级, 列名] = 'C' # 默认C级

return df

def _高级清洗技术(self, df):
"""高级清洗技术"""
# 1. 重复数据处理
重复掩码 = df.duplicated()
if 重复掩码.sum() > 0:
df['是否重复'] = 重复掩码
df = df.drop_duplicates()

# 2. 基于机器学习的缺失值填充(简化版)
# 使用相关列预测缺失值
数值列 = df.select_dtypes(include=[np.number]).columns
for 列名 in 数值列:
if df[列名].isnull().sum() > 0:
# 使用其他数值列的均值进行预测
其他列 = [col for col in 数值列 if col != 列名]
if len(其他列) > 0:
缺失索引 = df[列名].isnull()
预测值 = df.loc[缺失索引, 其他列].mean(axis=1)
df.loc[缺失索引, 列名] = 预测值

return df

def _验证清洗结果(self, 原始数据, 清洗后数据):
"""验证清洗结果"""
验证报告 = {
'清洗效果': {},
'数据质量提升': {},
'建议': []
}

# 清洗效果评估
原始缺失率 = 原始数据.isnull().sum().sum() / (原始数据.shape[0] * 原始数据.shape[1])
清洗缺失率 = 清洗后_data.isnull().sum().sum() / (清洗后_data.shape[0] * 清洗后_data.shape[1])

验证报告['清洗效果'] = {
'原始缺失率': f"{原始缺失率:.2%}",
'清洗缺失率': f"{清洗缺失率:.2%}",
'缺失率降低': f"{(原始缺失率 - 清洗缺失率):.2%}"
}

# 数据质量提升
原始重复率 = 原始数据.duplicated().sum() / len(原始数据)
清洗重复率 = 清洗后_data.duplicated().sum() / len(清洗后_data)

验证报告['数据质量提升'] = {
'重复率降低': f"{(原始重复率 - 清洗重复率):.2%}",
'数据完整性': f"{(1 - 清洗缺失率):.2%}"
}

# 建议
if 清洗缺失率 > 0.1:
验证报告['建议'].append("缺失率仍然较高,建议检查数据源质量")

if 清洗后_data.select_dtypes(include=[np.number]).empty:
验证报告['建议'].append("没有数值型数据,可能类型转换有问题")

验证报告['建议'].append("建议定期执行数据清洗流程")

return 验证报告

# 使用专业清洗系统
专业系统 = 专业数据清洗系统()

# 分析原始数据质量
质量报告 = 专业系统.分析数据质量(客户信息)
print("📊 数据质量分析报告:")
for 分类, 数据 in 质量报告.items():
print(f"\n{分类}:")
for 项目, 值 in 数据.items():
print(f" {项目}{值}")

# 执行智能清洗
客户信息_专业清洗, 验证结果 = 专业系统.智能清洗(客户信息)

print(f"\n✅ 专业清洗完成!")
print(f"清洗效果:缺失率从{验证结果['清洗效果']['原始缺失率']}降低到{验证结果['清洗效果']['清洗缺失率']}")

# 保存清洗模型
joblib.dump(专业系统, '客户信息清洗模型.pkl')
print("✅ 清洗模型已保存!")

🏃‍♀️ 进阶小挑战

挑战1:发票数据清洗
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# 发票数据清洗挑战
print("=== 发票数据清洗挑战 ===")

# 创建复杂的发票数据
发票数据 = pd.DataFrame({
'发票号码': ['12345678', '87654321', 'ABCD1234', '1234567', '1234567890123', None, '12345678', '87654322'],
'开票日期': ['2024-01-01', '2024/01/02', '2024.01.03', '01-Jan-2024', '20240105', '2024年1月6日', None, '2024-01-08'],
'购买方名称': [' 张三公司 ', '李四企业', '王五集团 ', ' 赵六有限公司', None, '钱七股份', '孙八实业', '周九集团'],
'金额': ['15000.50', '25,000.00', '18000.75元', 'not-a-number', '22000', None, '19500.25', '21000.80'],
'税额': ['1950.07', '3,250.00', '2,340.10', 'invalid', '2,860.00', None, '2,535.03', '2,730.10'],
'税率': ['13%', '13%', '13%', 'not-a-rate', '13%', None, '13%', '13%'],
'发票类型': ['专用发票', ' 普通发票 ', '专用发票', None, '电子发票', '专用发票', '普通发票', '专用发票'],
'发票状态': ['正常', '正常', ' 作废 ', '正常', None, '正常', '正常', '正常']
})

def 清洗发票数据(发票df):
"""专业发票数据清洗"""

print("🔍 开始清洗发票数据...")
df = 发票df.copy()

# 1. 发票号码标准化
print("1. 发票号码标准化...")
# 移除所有非数字字符
df['发票号码_清洗'] = df['发票号码'].astype(str).str.replace(r'[^0-9]', '', regex=True)
# 验证发票号码长度(8位)
df['发票号码有效性'] = (df['发票号码_清洗'].str.len() == 8) & (df['发票号码_清洗'] != 'nan')
# 处理无效发票号码
df.loc[~df['发票号码有效性'], '发票号码_清洗'] = 'INVALID'

# 2. 开票日期标准化
print("2. 开票日期标准化...")
df['开票日期_标准'] = pd.to_datetime(df['开票日期'], errors='coerce')
# 处理超出合理范围的日期
当前日期 = pd.Timestamp.now()
未来日期 = df['开票日期_标准'] > 当前日期
过早日期 = df['开票日期_标准'] < pd.Timestamp('2020-01-01')

df.loc[未来日期, '日期异常标记'] = '未来日期'
df.loc[过早日期, '日期异常标记'] = '过早日期'
df.loc[df['开票日期_标准'].isnull(), '日期异常标记'] = '无效日期'

# 3. 购买方名称清洗
print("3. 购买方名称清洗...")
df['购买方名称_清洗'] = df['购买方名称'].astype(str).str.strip()
# 统一有限公司格式
df['购买方名称_清洗'] = df['购买方名称_清洗'].str.replace('有限公司', '有限公司', regex=False)
df['购买方名称_清洗'] = df['购买方名称_清洗'].str.replace('有限责任', '有限公司', regex=False)

# 4. 金额和税额清洗
print("4. 金额和税额清洗...")
for 列名 in ['金额', '税额']:
# 移除货币符号和千分位
df[f'{列名}_数值'] = df[列名].astype(str).str.replace(r'[,\元]', '', regex=True)
# 转换为数值
df[f'{列名}_数值'] = pd.to_numeric(df[f'{列名}_数值'], errors='coerce')
# 处理异常值
df[f'{列名}_数值'] = df[f'{列名}_数值'].fillna(0)

# 5. 税率标准化
print("5. 税率标准化...")
df['税率_数值'] = df['税率'].astype(str).str.replace('%', '').str.strip()
df['税率_数值'] = pd.to_numeric(df['税率_数值'], errors='coerce')
# 标准化为百分比形式
df['税率_标准'] = df['税率_数值'] / 100
# 处理异常税率
异常税率 = (df['税率_标准'] < 0) | (df['税率_标准'] > 0.2) # 假设税率不超过20%
df.loc[异常税率, '税率异常标记'] = '税率超出正常范围'
df.loc[异常税率, '税率_标准'] = 0.13 # 默认13%

# 6. 发票类型标准化
print("6. 发票类型标准化...")
df['发票类型_标准'] = df['发票类型'].astype(str).str.strip()
# 统一命名
类型映射 = {
'专用发票': '增值税专用发票',
'普通发票': '增值税普通发票',
'电子发票': '增值税电子普通发票'
}
for 原始, 标准 in 类型映射.items():
df['发票类型_标准'] = df['发票类型_标准'].str.replace(原始, 标准, regex=False)

# 7. 发票状态标准化
print("7. 发票状态标准化...")
df['发票状态_标准'] = df['发票状态'].astype(str).str.strip().str.title()
# 统一状态名称
状态映射 = {
'正常': '有效',
'作废': '无效',
'冲红': '已冲红'
}
for 原始, 标准 in 状态映射.items():
df['发票状态_标准'] = df['发票状态_标准'].str.replace(原始, 标准, regex=False)

# 8. 数据验证
print("8. 数据验证...")
# 验证税额计算
df['计算税额'] = df['金额_数值'] * df['税率_标准']
df['税额差异'] = abs(df['税额_数值'] - df['计算税额'])
税额不匹配 = df['税额差异'] > 0.01 # 差异超过1分钱
df.loc[税额不匹配, '税额验证标记'] = '税额计算不匹配'

# 验证发票状态一致性
作废发票有金额 = (df['发票状态_标准'] == '无效') & (df['金额_数值'] > 0)
df.loc[作废发票有金额, '状态一致性标记'] = '作废发票不应有金额'

print("✅ 发票数据清洗完成!")
return df

# 清洗发票数据
发票_清洗结果 = 清洗发票数据(发票数据)

print("\n清洗结果摘要:")
print(f"有效发票号码:{发票_清洗结果['发票号码有效性'].sum()}个")
print(f"有效日期:{发票_清洗结果['开票日期_标准'].notnull().sum()}个")
print(f"税额验证通过:{(~发票_清洗结果['税额验证标记'].notnull()).sum()}个")
print(f"状态一致:{(~发票_清洗结果['状态一致性标记'].notnull()).sum()}个")

# 显示异常数据
print("\n⚠️ 发现的数据异常:")
异常列 = [col for col in 发票_清洗结果.columns if '异常' in col or '错误' in col or '无效' in col]
for 异常列名 in 异常列:
异常数量 = 发票_清洗结果[异常列名].notnull().sum()
if 异常数量 > 0:
print(f" {异常列名}{异常数量}个")
挑战2:银行流水智能分类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# 银行流水智能分类系统
print("=== 银行流水智能分类系统 ===")

# 创建银行流水数据
银行流水 = pd.DataFrame({
'交易日期': ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04', '2024-01-05', '2024-01-06', '2024-01-07', '2024-01-08'],
'摘要': ['工资收入-张三', '支付宝转账-花呗还款', '微信支付-超市购物', '跨行转账-客户A货款',
'ATM取款', '信用卡还款', '微信红包-新年红包', '对公转账-供应商B货款'],
'金额': [15000.00, -2500.00, -156.50, 35000.00, -3000.00, -5800.00, 88.88, -45000.00],
'对方账户': ['工资专户', '支付宝(中国)网络技术有限公司', '财付通支付科技有限公司', '张三',
'ATM机', '中国工商银行信用卡中心', '微信红包', 'ABC有限公司'],
'交易类型': ['转入', '转出', '转出', '转入', '转出', '转出', '转入', '转出']
})

def 智能流水分类(流水df):
"""智能银行流水分类"""

print("🤖 开始智能流水分类...")
df = 流水df.copy()

# 定义分类关键词
分类规则 = {
'工资收入': {
'关键词': ['工资', '薪资', '奖金', '津贴'],
'金额特征': '>0',
'优先级': 1
},
'投资理财': {
'关键词': ['理财', '基金', '股票', '证券', '投资'],
'金额特征': '任意',
'优先级': 2
},
'生活消费': {
'关键词': ['支付宝', '微信', '购物', '餐饮', '超市'],
'金额特征': '<0',
'优先级': 3
},
'信用卡还款': {
'关键词': ['信用卡', '还款', 'creditcard'],
'金额特征': '<0',
'优先级': 1
},
'转账汇款': {
'关键词': ['转账', '汇款', '跨行'],
'金额特征': '任意',
'优先级': 4
},
'现金业务': {
'关键词': ['ATM', '取款', '存款', '现金'],
'金额特征': '任意',
'优先级': 5
},
'其他收入': {
'关键词': ['红包', '退款', '返利'],
'金额特征': '>0',
'优先级': 6
},
'其他支出': {
'关键词': [],
'金额特征': '<0',
'优先级': 7
}
}

# 创建分类列
df['分类结果'] = '未分类'
df['分类置信度'] = 0.0
df['分类依据'] = ''

def 计算置信度(关键词匹配数, 总关键词数, 金额匹配, 规则优先级):
"""计算分类置信度"""
关键词置信度 = 关键词匹配数 / max(总关键词数, 1)
金额置信度 = 1.0 if 金额匹配 else 0.5
优先级权重 = 1.0 / 规则优先级

return (关键词置信度 * 0.6 + 金额置信度 * 0.3 + 优先级权重 * 0.1)

# 逐行分析
for 索引, 行 in df.iterrows():
摘要 = str(行['摘要']).lower()
对方账户 = str(行['对方账户']).lower()
金额 = 行['金额']
交易类型 = 行['交易类型']

最佳分类 = None
最高置信度 = 0

# 检查每个分类规则
for 分类名, 规则 in 分类规则.items():
关键词 = 规则['关键词']
金额特征 = 规则['金额特征']
优先级 = 规则['优先级']

# 关键词匹配
关键词匹配数 = 0
匹配关键词 = []

for 关键词项 in 关键词:
if 关键词项 in 摘要 or 关键词项 in 对方账户:
关键词匹配数 += 1
匹配关键词.append(关键词项)

# 金额特征匹配
金额匹配 = False
if 金额特征 == '任意':
金额匹配 = True
elif 金额特征 == '>0' and 金额 > 0:
金额匹配 = True
elif 金额特征 == '<0' and 金额 < 0:
金额匹配 = True

# 计算置信度
置信度 = 计算置信度(关键词匹配数, len(关键词), 金额匹配, 优先级)

if 置信度 > 最高置信度:
最高置信度 = 置信度
最佳分类 = 分类名
分类依据 = f"关键词:{匹配关键词},金额:{金额},优先级:{优先级}"

# 记录最佳分类结果
if 最佳分类 and 最高置信度 > 0.3: # 置信度阈值
df.loc[索引, '分类结果'] = 最佳分类
df.loc[索引, '分类置信度'] = 最高置信度
df.loc[索引, '分类依据'] = 分类依据
else:
# 使用默认分类
if 金额 > 0:
df.loc[索引, '分类结果'] = '其他收入'
df.loc[索引, '分类置信度'] = 0.3
else:
df.loc[索引, '分类结果'] = '其他支出'
df.loc[索引, '分类置信度'] = 0.3

# 高级分析
print("📊 执行高级分析...")

# 计算各类别的统计信息
分类统计 = df.groupby('分类结果').agg({
'金额': ['count', 'sum', 'mean'],
'分类置信度': 'mean'
}).round(2)

分类统计.columns = ['笔数', '总金额', '平均金额', '平均置信度']
分类统计 = 分类统计.sort_values('总金额', ascending=False)

print("\n分类统计结果:")
print(分类统计)

# 异常检测
print("\n🔍 异常检测:")

# 大额异常检测
大额阈值 = df['金额'].abs().quantile(0.95)
大额交易 = df[df['金额'].abs() > 大额阈值]
if len(大额交易) > 0:
print(f"发现{len(大额交易)}笔大额交易(>{大额阈值:.2f}元):")
for _, 交易 in 大额交易.iterrows():
print(f" {交易['交易日期']}{交易['分类结果']} - {交易['金额']:,.2f}元")

# 低置信度检测
低置信度阈值 = 0.5
低置信度交易 = df[df['分类置信度'] < 低置信度阈值]
if len(低置信度交易) > 0:
print(f"\n发现{len(低置信度交易)}笔低置信度分类:")
for _, 交易 in 低置信度交易.iterrows():
print(f" {交易['交易日期']}{交易['分类结果']}(置信度:{交易['分类置信度']:.2f})")

print("✅ 智能分类完成!")
return df

# 执行智能分类
流水_分类结果 = 智能流水分类(银行流水)

# 生成分类报告
print(f"\n📈 分类结果摘要:")
分类汇总 = 流水_分类结果['分类结果'].value_counts()
for 分类, 数量 in 分类汇总.items():
平均置信度 = 流水_分类结果[流水_分类结果['分类结果'] == 分类]['分类置信度'].mean()
print(f" {分类}{数量}笔(平均置信度:{平均置信度:.2f})")

# 可视化分类结果
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 8))

# 子图1:分类饼图
plt.subplot(2, 2, 1)
分类统计 = 流水_分类结果['分类结果'].value_counts()
plt.pie(分类统计.values, labels=分类统计.index, autopct='%1.1f%%', startangle=90)
plt.title('交易分类分布')

# 子图2:收入vs支出
plt.subplot(2, 2, 2)
收支对比 = 流水_分类结果.groupby(['分类结果', 流水_分类结果['金额'] > 0])['金额'].sum().unstack(fill_value=0)
收支对比.plot(kind='bar', stacked=True)
plt.title('各类别收入vs支出')
plt.xlabel('分类')
plt.ylabel('金额')
plt.xticks(rotation=45)

# 子图3:时间序列
plt.subplot(2, 2, 3)
日分类金额 = 流水_分类结果.groupby(['交易日期', '分类结果'])['金额'].sum().unstack(fill_value=0)
日分类金额.plot(kind='line', marker='o')
plt.title('各类别金额时间序列')
plt.xlabel('日期')
plt.ylabel('金额')
plt.xticks(rotation=45)

# 子图4:置信度分布
plt.subplot(2, 2, 4)
plt.hist(流水_分类结果['分类置信度'], bins=10, edgecolor='black')
plt.title('分类置信度分布')
plt.xlabel('置信度')
plt.ylabel('频数')

plt.tight_layout()
plt.savefig('银行流水智能分类分析.png', dpi=300, bbox_inches='tight')
plt.close()

print("✅ 可视化分析图表已生成!")

💭 今日思考

通过今天的学习,我们发现:

  • 现实世界的数据总是“脏”的,需要清洗才能使用
  • Python可以自动识别和处理各种数据质量问题
  • 数据清洗比手工清洗更快、更准确、更一致
  • 专业的数据清洗系统能大幅提升数据质量

📝 课后小结

  • ✅ 学会了识别和处理缺失值
  • ✅ 掌握了统一数据格式的方法
  • ✅ 能够处理文本、数字、日期等各种数据类型
  • ✅ 完成了专业的客户信息清洗系统

🎯 下节预告

下节我们将学习简单的数据分析,掌握基础统计运算、学会分组统计,用Python做些“简单”的数据分析,让老板对你的分析能力刮目相看!


💡 小贴士

  • 数据清洗是数据分析的基础,一定要重视
  • 不同的数据类型要用不同的清洗方法
  • 清洗后要验证数据质量,确保清洗效果
  • 建立标准化的清洗流程,提高工作效率

🤖 Powered by Kimi K2 0905 💻 内容经葵葵🌻审核与修改