import time
from datetime import datetime

import pandas as pd
import requests
from bs4 import BeautifulSoup
class FinanceNewsAggregator:
    """财经新闻聚合器 — aggregates finance headlines from Sina Finance and Eastmoney.

    Scraped items accumulate in ``self.all_news`` as dicts keyed by the
    Chinese column names 序号/标题/来源/发布时间/链接/摘要/关键词, which are
    also the CSV headers written by :meth:`save_to_csv`.
    """

    def __init__(self):
        # Minimal browser-like User-Agent so the portals serve normal HTML.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        self.all_news = []

    def _append_news(self, link_tag, source, pub_time):
        """Build one news dict from an ``<a>`` tag and append it to all_news.

        Shared by both fetchers so the record layout stays consistent.
        ``link_tag`` must support ``get_text(strip=...)`` and ``get('href')``.
        """
        title = link_tag.get_text(strip=True)
        self.all_news.append({
            '序号': len(self.all_news) + 1,
            '标题': title,
            '来源': source,
            '发布时间': pub_time,
            # .get avoids a KeyError when an <a> has no href attribute.
            '链接': link_tag.get('href', ''),
            '摘要': '',
            '关键词': self._extract_keywords(title),
        })

    def fetch_sina_finance(self, max_news=10):
        """抓取新浪财经 — scrape up to *max_news* headlines from Sina Finance.

        Returns True on success, False on any network/parse failure.
        """
        url = "https://finance.sina.com.cn/china/"
        try:
            resp = requests.get(url, headers=self.headers, timeout=10)
            resp.encoding = 'utf-8'
            soup = BeautifulSoup(resp.text, 'html.parser')
            appended = 0
            for item in soup.select('ul.list_009 li')[:max_news]:
                link_tag = item.find('a')
                if link_tag is None:
                    # Skip list items without a link (separators/ads) instead
                    # of crashing the whole fetch on a None attribute access.
                    continue
                time_tag = item.find('span')
                self._append_news(
                    link_tag, '新浪财经',
                    time_tag.get_text(strip=True) if time_tag else '')
                appended += 1
            print(f"✅ 新浪: {appended}条")
            return True
        except Exception as e:
            # Best-effort scraper: report the failure and keep running so the
            # other source can still be fetched.
            print(f"❌ 新浪失败: {e}")
            return False

    def fetch_eastmoney(self, max_news=10):
        """抓取东方财富 — scrape up to *max_news* headlines from Eastmoney.

        The page shows no per-item timestamp, so the fetch time is recorded.
        Returns True on success, False on any network/parse failure.
        """
        url = "https://www.eastmoney.com/"
        try:
            resp = requests.get(url, headers=self.headers, timeout=10)
            resp.encoding = 'utf-8'
            soup = BeautifulSoup(resp.text, 'html.parser')
            appended = 0
            for item in soup.select('div.newlist li')[:max_news]:
                link_tag = item.find('a')
                if link_tag is None:
                    # Same None-guard as fetch_sina_finance.
                    continue
                self._append_news(
                    link_tag, '东方财富',
                    datetime.now().strftime('%m-%d %H:%M'))
                appended += 1
            print(f"✅ 东财: {appended}条")
            return True
        except Exception as e:
            print(f"❌ 东财失败: {e}")
            return False

    def _extract_keywords(self, title: str):
        """简单关键词提取 — return matched hot keywords joined by commas.

        Matches against a fixed keyword list (list order, not title order);
        returns '—' when nothing matches.
        """
        keywords = ['股市', '央行', '利率', '汇率', '房价', '通胀', 'GDP', '就业']
        found = [kw for kw in keywords if kw in title]
        return ','.join(found) if found else '—'

    def fetch_all(self, max_per_source=8):
        """抓取所有来源 — clear all_news and refresh it from every source."""
        print("\n" + "=" * 40)
        print("开始抓取财经新闻...")
        print("=" * 40)
        self.all_news = []
        self.fetch_sina_finance(max_per_source)
        time.sleep(1)  # polite pause between sites
        self.fetch_eastmoney(max_per_source)
        print(f"\n🎉 总计抓取 {len(self.all_news)} 条新闻")

    def show_dashboard(self):
        """显示新闻仪表板 — print per-source stats, hot keywords, and the list."""
        if not self.all_news:
            print("⚠️ 暂无数据")
            return
        print("\n" + "=" * 70)
        print("📊 财经新闻聚合 - 实时更新")
        print("=" * 70)
        df = pd.DataFrame(self.all_news)
        source_stats = df['来源'].value_counts()
        print("\n来源统计:")
        for source, count in source_stats.items():
            print(f" {source}: {count}条")
        keywords = []
        for kw in df['关键词']:
            if kw != '—':
                keywords.extend(kw.split(','))
        print("\n今日热点关键词:")
        if keywords:
            # Guard the empty case: pd.Series([]) emits a dtype FutureWarning.
            kw_series = pd.Series(keywords).value_counts()
            for kw, count in kw_series.head(5).items():
                print(f" {kw}: {count}次")
        print("\n" + "-" * 70)
        print("新闻列表(按抓取顺序):")
        print("-" * 70)
        for news in self.all_news:
            print(f"\n{news['序号']}. [{news['来源']}] {news['标题']}")
            if news['关键词'] != '—':
                print(f" 🔑 {news['关键词']}")
            print(f" 🔗 {news['链接']}")

    def save_to_csv(self):
        """保存所有新闻 — write a timestamped UTF-8-BOM CSV.

        Returns the filename on success, None when there is nothing to save.
        """
        if not self.all_news:
            print("⚠️ 无数据可保存")
            return None
        filename = f"财经新闻聚合_{datetime.now().strftime('%Y%m%d_%H%M')}.csv"
        df = pd.DataFrame(self.all_news)
        # utf-8-sig adds a BOM so Excel opens the Chinese text correctly.
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        # Bug fix: report the actual filename (was a literal "(unknown)").
        print(f"\n✅ 已保存: {filename} ({len(df)}条)")
        return filename
def main():
    """主程序 — interactive console menu driving FinanceNewsAggregator."""
    aggregator = FinanceNewsAggregator()
    divider = "=" * 45
    menu_lines = ("1. 抓取新闻", "2. 显示新闻", "3. 保存到CSV", "4. 退出")
    while True:
        # Redraw the menu before every prompt.
        print("\n" + divider)
        print("财经新闻聚合器")
        print(divider)
        for line in menu_lines:
            print(line)
        print(divider)
        choice = input("请选择: ").strip()
        if choice == "4":
            print("👋 再见!")
            break
        if choice == "1":
            aggregator.fetch_all(max_per_source=8)
        elif choice == "2":
            aggregator.show_dashboard()
        elif choice == "3":
            aggregator.save_to_csv()
        else:
            print("请输入1-4!")
# Script entry point: launch the interactive menu only when executed
# directly, not when this module is imported.
if __name__ == "__main__":
    main()