aoi学院

Aisaka's Blog, School of Aoi, Aisaka University

Python-第13篇《股票数据监控器》

导语

经过前两阶段的学习,你已经掌握了Python的数据处理基础。今天,我们要把这些知识用起来,做一个真正“有用”的小工具——股票数据监控器。别担心,即使你不炒股,这个项目也能让你学会如何从互联网上获取实时数据,这可是打开无数可能性的关键技能!


本篇目标

  • 理解什么是网络API,学会从网络获取数据
  • 获取真实的股票数据并解析
  • 实现定时监控和自动提醒功能
  • 完成一个可运行的个人股票监控脚本

一、准备工作:安装必要的库

在开始前,我们需要安装一个强大的工具—— akshare 库。这是一个免费的中文财经数据接口库,能帮我们轻松获取股票数据。

1
2
3
4
5
# 在命令行中运行以下命令安装
# pip install akshare

# 同时确保pandas和numpy已经安装
# pip install pandas numpy

安装完成后,我们先导入必要的库:

1
2
3
4
5
import akshare as ak  # 财经数据接口库
import pandas as pd # 数据处理
import time # 时间控制
from datetime import datetime # 日期时间处理
import sys # 系统相关(用于退出程序)

二、获取网络数据基础

2.1 什么是API?

API(应用程序接口)就像餐厅的服务员——你告诉服务员想吃什么(发送请求),服务员把订单传给厨房(服务器),再把做好的菜端给你(返回数据)。对于股票数据,我们只需要调用现成的API接口,就能拿到实时股价信息,而不需要自己去交易所爬取。

2.2 获取单只股票数据

让我们先试试获取一只股票的实时数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 获取茅台(600519)的实时行情
def get_stock_price(stock_code):
"""
获取股票当前价格
stock_code: 股票代码(如"600519")
"""
try:
# 使用akshare获取实时数据
df = ak.stock_zh_a_spot_em() # 获取所有A股实时数据

# 在数据中查找我们的股票
stock_data = df[df['代码'] == stock_code]

if not stock_data.empty:
# 提取关键信息
name = stock_data['名称'].values[0]
price = stock_data['最新价'].values[0]
change = stock_data['涨跌幅'].values[0]
volume = stock_data['成交量'].values[0]

print(f"📊 {name}({stock_code})")
print(f"当前价格: ¥{price:.2f}")
print(f"涨跌幅: {change:.2f}%")
print(f"成交量: {volume}手")
print("-" * 30)

return price
else:
print(f"未找到股票代码: {stock_code}")
return None

except Exception as e:
print(f"获取数据时出错: {e}")
return None

# 测试一下
get_stock_price("600519")

运行这段代码,你会看到类似这样的输出:

1
2
3
4
5
📊 贵州茅台(600519)
当前价格: ¥1685.00
涨跌幅: -1.20%
成交量: 12580手
------------------------------

知识点: 我们使用了 ak.stock_zh_a_spot_em() 这个函数,它会从东方财富网获取所有A股的实时数据,返回一个DataFrame。然后我们通过股票代码筛选出我们关心的那一只股票。


三、实时监控股价变化

现在我们来实现“监控”功能。思路很简单:每隔一段时间(比如30秒),自动获取一次股价并显示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def monitor_stock(stock_code, check_interval=30, duration_minutes=10):
"""
监控股票价格变化
stock_code: 股票代码
check_interval: 检查间隔(秒)
duration_minutes: 总监控时长(分钟)
"""
print(f"开始监控 {stock_code},每{check_interval}秒刷新一次")
print(f"总时长: {duration_minutes}分钟")
print("=" * 40)

# 计算总检查次数
total_checks = int(duration_minutes * 60 / check_interval)

for i in range(total_checks):
print(f"\n第 {i+1}/{total_checks} 次检查 - {datetime.now().strftime('%H:%M:%S')}")

# 获取当前股价
current_price = get_stock_price(stock_code)

# 如果获取失败,跳过这次
if current_price is None:
continue

# 如果是第一次,记录初始价格
if i == 0:
initial_price = current_price
print(f"初始价格: ¥{initial_price:.2f}")

# 等待指定时间
time.sleep(check_interval)

print("\n监控结束!")

# 测试监控功能(只监控2分钟,每30秒刷新)
# monitor_stock("600519", check_interval=30, duration_minutes=2)

代码解析:

  • time.sleep(seconds):让程序暂停指定的秒数
  • datetime.now().strftime('%H:%M:%S'):获取当前时间并格式化为时分秒
  • for i in range(total_checks):循环指定次数,每次循环就是一次检查

四、设置价格提醒

监控的核心价值在于”提醒”。我们来添加一个功能:当股价达到某个预警值时,程序会主动提醒我们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def monitor_with_alert(stock_code, upper_price=None, lower_price=None, check_interval=30):
"""
带价格提醒的股票监控器
upper_price: 价格上限(高于此值提醒)
lower_price: 价格下限(低于此值提醒)
"""
print(f"🔔 启动智能监控 - {stock_code}")
print(f"上限预警: {upper_price}" if upper_price else "无上限预警")
print(f"下限预警: {lower_price}" if lower_price else "无下限预警")
print("按 Ctrl+C 可停止监控")
print("=" * 50)

try:
while True: # 无限循环,直到用户手动停止
current_time = datetime.now().strftime('%H:%M:%S')
print(f"\n检查时间: {current_time}")

# 获取股价
df = ak.stock_zh_a_spot_em()
stock_data = df[df['代码'] == stock_code]

if not stock_data.empty:
name = stock_data['名称'].values[0]
current_price = stock_data['最新价'].values[0]
change = stock_data['涨跌幅'].values[0]

print(f"当前价格: ¥{current_price:.2f} (涨跌幅: {change:.2f}%)")

# 检查是否需要提醒
alert_triggered = False

if upper_price and current_price >= upper_price:
print(f"🚨 **价格预警!** {name}当前价格¥{current_price:.2f} 已达到上限 ¥{upper_price}")
alert_triggered = True

if lower_price and current_price <= lower_price:
print(f"🚨 **价格预警!** {name}当前价格¥{current_price:.2f} 已跌破下限 ¥{lower_price}")
alert_triggered = True

# 如果有预警,可以在这里添加邮件或微信通知功能
if alert_triggered:
# 发出提示音(仅在Windows有效)
try:
import winsound
winsound.Beep(1000, 500) # 频率1000Hz,持续500ms
except:
print("\a") # 通用响铃符

else:
print(f"未找到股票数据: {stock_code}")

# 等待下次检查
time.sleep(check_interval)

except KeyboardInterrupt:
print("\n\n监控已手动停止,谢谢使用!")
sys.exit(0) # 正常退出程序

# 测试提醒功能(设置合理的预警价格)
# monitor_with_alert("600519", upper_price=1700, lower_price=1600, check_interval=30)

五、完整项目代码

现在,我们把所有功能整合成一个完整的、用户友好的脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# ========== 股票数据监控器 - 完整版 ==========
import akshare as ak
import pandas as pd
import time
from datetime import datetime
import sys

def get_stock_info(stock_code):
"""获取股票基本信息"""
try:
df = ak.stock_zh_a_spot_em()
stock_data = df[df['代码'] == stock_code]

if not stock_data.empty:
return {
'name': stock_data['名称'].values[0],
'price': stock_data['最新价'].values[0],
'change': stock_data['涨跌幅'].values[0],
'volume': stock_data['成交量'].values[0]
}
except Exception as e:
print(f"获取数据失败: {e}")
return None

def print_stock_info(stock_info):
"""打印股票信息"""
if stock_info:
print(f"📊 {stock_info['name']}")
print(f"价格: ¥{stock_info['price']:.2f}")
print(f"涨跌: {stock_info['change']:.2f}%")
print(f"成交量: {stock_info['volume']}手")
print("-" * 30)
return stock_info['price']
return None

def main():
"""主程序"""
print("=" * 50)
print("股票数据监控器 v1.0")
print("=" * 50)

# 1. 输入股票代码
stock_code = input("请输入股票代码(如600519):").strip()

# 2. 获取并显示当前信息
print("\n正在获取数据...")
info = get_stock_info(stock_code)

if not info:
print("股票代码错误或网络连接失败!")
return

current_price = print_stock_info(info)

# 3. 设置预警价格
print("\n请设置价格预警(直接回车跳过):")
try:
upper = input("上限价格:").strip()
lower = input("下限价格:").strip()

upper_price = float(upper) if upper else None
lower_price = float(lower) if lower else None
except:
print("输入格式错误,将不使用价格预警")
upper_price = lower_price = None

# 4. 设置检查频率
try:
interval = int(input("\n检查间隔(秒,建议30):") or 30)
except:
interval = 30

print(f"\n✅ 监控启动!每{interval}秒检查一次")
print("按 Ctrl+C 停止监控\n")

# 5. 开始监控
try:
while True:
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"[{now}] 检查中...", end=" ")

info = get_stock_info(stock_code)
if info:
price = info['price']
print(f"¥{price:.2f} ({info['change']:.2f}%)", end="")

# 检查预警
alerted = False
if upper_price and price >= upper_price:
print(f" 🚨 突破上限!")
alerted = True
elif lower_price and price <= lower_price:
print(f" 🚨 跌破下限!")
alerted = True
else:
print() # 换行

# 如果有预警,可以在这里添加通知功能
if alerted:
print("\a") # 响铃提示
else:
print("获取数据失败")

time.sleep(interval)

except KeyboardInterrupt:
print("\n\n监控结束,感谢使用!")

if __name__ == "__main__":
main()

六、运行你的监控器

保存上面的代码为 stock_monitor.py,然后运行:python stock_monitor.py

使用流程:

  1. 输入股票代码(如600519)
  2. 查看当前股价
  3. 可选:设置上下限预警价格
  4. 设置检查间隔(秒)
  5. 开始自动监控

七、知识加油站

7.1 常用股票代码示例

  • 贵州茅台:600519
  • 中国平安:601318
  • 比亚迪:002594
  • 宁德时代:300750

7.2 如何扩展功能?

你可以尝试添加这些功能:

  1. 记录价格变化:把每次检查的价格存入CSV文件,方便后续分析
  2. 邮件提醒:达到预警价格时发送邮件通知
  3. 监控多只股票:同时监控你自己关注的股票组合
  4. 价格走势图:用matplotlib画出实时价格变化曲线

7.3 注意事项

  • 实时数据有延迟,仅供参考
  • 不要过于频繁请求数据(建议间隔≥30秒)
  • akshare库免费但依赖网络,请确保网络畅通

八、总结

  • ✅ 使用akshare库获取网络实时数据
  • ✅ 实现定时任务(用time.sleep
  • ✅ 添加条件判断和提醒功能
  • ✅ 将代码整合成用户友好的完整程序

下篇预告

第14篇《智能记账本》——我们将学习如何自动分类收支、设置预算提醒,打造一个懂你的私人理财助手!


🤖 Powered by Kimi K2 Thinking 💻 内容经葵葵🌻审核与修改