1. 项目概述边缘新闻聚合的诞生最近在折腾一个挺有意思的小项目我把它叫做“News — At The Edge”。这个名字听起来可能有点抽象但它的核心想法其实很直接我们每天都被海量的新闻信息淹没但真正有价值、与我们切身相关的可能只是其中很小一部分。传统的新闻App要么推送过于庞杂要么算法推荐形成“信息茧房”。这个项目的初衷就是想做一个完全运行在“边缘”的、高度定制化的新闻筛选与聚合器。这里的“边缘”并不是一个地理概念而是技术架构上的“边缘计算”思想。我不希望依赖一个庞大的中心服务器去抓取、分析和推送新闻那样延迟高、隐私存疑而且模式僵化。我希望这个工具能更“轻”、更“近”、更“智能”地运行在我自己的设备或家庭网络环境中只为我一个人服务。它就像一个驻扎在我网络边缘的私人新闻管家按照我设定的规则从指定的、可信的新闻源主要是各大媒体的RSS抓取内容然后根据我关心的关键词、主题进行实时过滤、去重和优先级排序最后生成一份简洁的、属于我个人的“边缘新闻简报”。这个项目特别适合那些对信息质量有要求又希望保持技术掌控感的开发者、科技爱好者或深度阅读者。它不追求大而全而是追求精准和效率。在5月12日这个版本里我基本跑通了核心流程实现了一个可用的最小可行产品。接下来我就把这套从设计思路到踩坑实录的完整过程分享出来。2. 核心架构与设计思路拆解2.1 为什么选择“边缘”架构在项目启动前我首先评估了三种常见的方案一是使用现成的新闻API服务二是自建中心服务器进行爬取和分发三是采用边缘计算模式。现成的API如News API、GNews等虽然方便但存在调用次数限制、内容可能被二次处理、以及无法定制特定小众信源的问题。自建中心服务器则意味着需要维护一台24小时运行的VPS处理爬虫管理、反爬对抗、数据存储和多个终端的数据同步运维成本和复杂度陡然上升。而边缘架构的优势在于隐私与数据自主所有抓取、过滤、分析的动作都发生在你控制的设备上比如家里的NAS、一台始终开机的旧电脑甚至是一个树莓派原始新闻数据不过任何第三方的手阅读历史、偏好设置完全本地化。成本与灵活性省去了中心服务器的费用。规则可以极端个性化你可以轻松添加任何一个提供RSS的小众博客而无需担心服务器爬虫策略的通用性问题。实时性与低延迟边缘设备直接向新闻源发起请求减少了中间转发环节。结合定时任务可以实现近实时的新闻获取。学习与可控性整个数据流水线透明可见从网络请求到文本处理每个环节都可以根据你的需求调整和优化是一个绝佳的学习和实践项目。因此我决定采用边缘架构。核心模型是“一个调度中心 多个轻量级工作节点”。调度中心Cron Job或系统定时任务负责按计划触发工作节点Python脚本则执行具体的抓取、解析、过滤和生成任务。2.2 技术栈选型与考量技术选型上我遵循“轻量、高效、易维护”的原则。核心语言Python 3.8。选择Python是因为其在数据处理、网络爬虫和快速原型开发上无与伦比的生态优势。requests、feedparser、BeautifulSoup4这些库能极大简化开发。依赖管理Pipenv。它比单纯的pip能更好地管理项目依赖和虚拟环境特别是当项目需要部署到不同边缘设备时能确保环境一致性。数据解析Feedparser BeautifulSoup4。绝大多数新闻网站和博客都提供RSS或Atom订阅源feedparser是解析它们的利器稳定且兼容性好。对于少数需要从HTML中提取额外信息的源用BeautifulSoup4补足。文本处理与过滤Jieba (中文) / NLTK (英文) 正则表达式。关键词过滤是核心。对于中文新闻我选用jieba进行分词结合自定义词典加入专业词汇以提高关键词匹配的准确性。初步过滤用正则表达式处理标题和摘要更复杂的语义分析在这个MVP版本中暂未深入可以留待后续。数据存储SQLite。边缘场景下SQLite是天然的选择。它无需单独的服务进程一个文件就是一个数据库非常适合存储订阅源列表、抓取记录、新闻条目和用户过滤规则。轻量且足够可靠。任务调度Systemd Timer (Linux) 或 Crontab。为了保持简单我没有引入Celery这类重型任务队列。在Linux边缘设备上systemd timer是比crontab更现代和强大的选择它能更好地管理日志、处理服务依赖和自动重启。对于macOS或Windows也有相应的定时任务方案。输出与通知Markdown文件 邮件 / Webhook。将最终生成的新闻简报保存为Markdown文件便于阅读和归档。同时可以通过smtplib发送邮件到Kindle或邮箱或者调用requests发送Webhook到钉钉、Slack等协作工具实现推送。这个技术栈组合在保证功能完整的前提下最大限度地降低了资源消耗和架构复杂度非常适合在资源有限的边缘设备上长期稳定运行。3. 核心模块实现细节解析3.1 新闻源管理与RSS抓取器新闻源的配置是整个系统的输入源头我设计了一个JSON格式的配置文件sources.json因为它比YAML更易于Python原生解析也比INI格式能表达更复杂的结构。[ { name: Solidot, url: https://www.solidot.org/index.rss, language: zh, category: tech, enabled: true, priority: 1 }, { name: Reuters Technology, url: http://feeds.reuters.com/reuters/technologyNews, language: en, category: tech, enabled: true, priority: 2 }, { name: 某个人博客, url: https://example.com/feed, language: zh, category: blog, enabled: true, priority: 3, requires_js: false, custom_parser: null } ]每个源除了基本的名称和URL还定义了语言、类别、启用状态和优先级。优先级用于在最终简报中排序。requires_js和custom_parser是为未来扩展预留的用于处理那些动态加载或结构特殊的网站。抓取器的核心函数如下import feedparser import requests from datetime import datetime, timedelta import sqlite3 import logging logger logging.getLogger(__name__) def fetch_feed(source_config): 抓取单个RSS源 name source_config[name] url source_config[url] logger.info(f开始抓取源: {name}) try: # 设置请求头模拟浏览器避免被简单的反爬拦截 headers { User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 } # 对于简单情况feedparser可以直接处理URL feed feedparser.parse(url) if feed.bozo: # bozo标志位表示解析可能出错 logger.warning(f源 {name} RSS解析异常尝试使用requests获取内容后解析。异常信息: {feed.bozo_exception}) # 降级方案使用requests获取原始内容 resp requests.get(url, headersheaders, timeout30) resp.raise_for_status() feed feedparser.parse(resp.content) entries [] for entry in feed.entries: # 标准化条目信息处理可能缺失的字段 pub_time entry.get(published_parsed) or entry.get(updated_parsed) if pub_time: pub_time datetime(*pub_time[:6]) # 转换为datetime对象 else: pub_time datetime.utcnow() # 如果没有时间使用当前时间 # 只抓取最近24小时的内容避免历史数据堆积 if datetime.utcnow() - pub_time timedelta(hours24): continue standardized_entry { title: entry.title, link: entry.link, summary: entry.get(summary, ), published: pub_time, source: name, category: source_config.get(category, general) } entries.append(standardized_entry) logger.info(f源 {name} 抓取到 {len(entries)} 条新条目) return entries except requests.exceptions.RequestException as e: logger.error(f抓取源 {name} 时网络错误: {e}) return [] except Exception as e: logger.error(f抓取源 {name} 时发生未知错误: {e}) return []注意feedparser的bozo标志位非常有用它能捕获到XML格式不规范等解析错误。当bozo为True时通过bozo_exception可以查看具体错误。采用requests获取原始内容再解析是一个有效的降级策略。另外务必为网络请求设置超时如30秒避免因某个源响应慢而阻塞整个抓取流程。3.2 基于关键词与规则的内容过滤引擎抓取到的新闻条目是原始的我们需要一个过滤引擎来筛选出真正感兴趣的内容。我设计了一个两层过滤机制规则过滤和关键词评分。首先在rules.json中定义规则{ must_have_keywords: [人工智能, 机器学习, LLM, 开源], block_keywords: [八卦, 娱乐, 彩票], category_weights: { tech: 2.0, science: 1.5, finance: 1.2, general: 1.0, blog: 0.8 }, source_priorities: { Solidot: 3, Reuters Technology: 2 } }过滤引擎的工作流程如下硬性规则过滤Must/Block如果一条新闻的标题或摘要中包含block_keywords中的任何一个词则直接丢弃。这是一个“一票否决”机制用于过滤完全不感兴趣的内容。关键词匹配与评分对通过硬性过滤的新闻计算其与must_have_keywords的匹配度。这里不是简单的布尔判断而是引入评分制。基础分新闻条目所在类别的权重category_weights。加分项标题中每匹配一个关键词加1分摘要中每匹配一个关键词加0.5分。同时匹配关键词的权重可以不同例如“人工智能”比“开源”权重更高这可以通过在关键词列表中用元组(“人工智能”, 1.5)来表示。源优先级加成乘以新闻来源的优先级系数source_priorities。时间衰减因子新闻的价值随时间推移而降低。可以为发布时间引入一个衰减因子例如发布后每小时得分衰减1%鼓励系统优先呈现更新鲜的内容。import jieba from datetime import datetime class ContentFilter: def __init__(self, rules_path): self.load_rules(rules_path) # 初始化jieba加载用户词典如果有 jieba.initialize() # 可以添加用户词典 jieba.load_userdict(my_dict.txt) def calculate_score(self, entry): 计算新闻条目的得分 score self.category_weights.get(entry[category], 1.0) # 关键词匹配加分 title_words set(jieba.lcut_for_search(entry[title])) summary_words set(jieba.lcut_for_search(entry[summary])) for keyword, weight in self.keywords_with_weight: if keyword in title_words: score weight * 1.0 # 标题匹配权重高 elif keyword in summary_words: score weight * 0.5 # 摘要匹配权重低 # 源优先级加成 score * self.source_priorities.get(entry[source], 1.0) # 时间衰减示例每过1小时得分乘以0.99 hours_passed (datetime.utcnow() - entry[published]).total_seconds() / 3600 time_decay 0.99 ** hours_passed score * time_decay return score def filter_and_rank(self, entries, top_n20): 过滤并排名新闻条目 filtered [] for entry in entries: # 1. 硬性屏蔽词检查 if self.has_blocked_keywords(entry): continue # 2. 计算得分 entry[score] self.calculate_score(entry) filtered.append(entry) # 按得分降序排序返回前top_n条 filtered.sort(keylambda x: x[score], reverseTrue) return filtered[:top_n]实操心得关键词匹配的准确性严重依赖分词质量。对于中文jieba.lcut_for_search比lcut更适合搜索场景它能将长词拆开。务必根据你的领域构建一个自定义词典例如加入“Stable Diffusion”、“Transformer”等专有名词避免被错误切分。评分规则是系统的“大脑”需要反复调整权重参数来逼近你个人的阅读偏好这是一个持续迭代的过程。3.3 数据持久化与去重机制为了避免每次抓取都推送相同的新闻一个健壮的去重机制是必须的。我选择使用SQLite数据库并设计了两张核心表。-- sources 表记录订阅源信息 CREATE TABLE IF NOT EXISTS sources ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, url TEXT NOT NULL, last_fetched TIMESTAMP DEFAULT NULL ); -- articles 表记录抓取到的文章 CREATE TABLE IF NOT EXISTS articles ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, link TEXT UNIQUE NOT NULL, -- 链接作为唯一标识用于去重 summary TEXT, published TIMESTAMP NOT NULL, source_id INTEGER NOT NULL, fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_read BOOLEAN DEFAULT 0, score REAL DEFAULT 0.0, FOREIGN KEY (source_id) REFERENCES sources (id) ); -- 为link和published字段创建索引以提高查询速度 CREATE INDEX IF NOT EXISTS idx_articles_link ON articles (link); CREATE INDEX IF NOT EXISTS idx_articles_published ON articles (published);去重逻辑在存储抓取结果时实现def save_articles_if_new(articles, db_pathnews_edge.db): 保存文章列表自动去重 conn sqlite3.connect(db_path) cursor conn.cursor() new_count 0 for article in articles: # 以link作为唯一键进行去重 cursor.execute(SELECT id FROM articles WHERE link ?, (article[link],)) if cursor.fetchone() is None: # 获取source_id如果不存在则插入新源 cursor.execute(INSERT OR IGNORE INTO sources (name, url) VALUES (?, ?), (article[source], article.get(source_url, ))) cursor.execute(SELECT id FROM sources WHERE name ?, (article[source],)) source_id cursor.fetchone()[0] # 插入新文章 cursor.execute( INSERT INTO articles (title, link, summary, published, source_id, score) VALUES (?, ?, ?, ?, ?, ?) , (article[title], article[link], article[summary], article[published].isoformat(), source_id, article.get(score, 0))) new_count 1 conn.commit() # 更新源的最近抓取时间 for source_name in set([a[source] for a in articles]): cursor.execute(UPDATE sources SET last_fetched CURRENT_TIMESTAMP WHERE name ?, (source_name,)) conn.commit() conn.close() logger.info(f保存了 {new_count} 篇新文章去重了 {len(articles) - new_count} 篇重复文章。)这个机制确保了即使抓取脚本被多次执行同一篇新闻也只会被记录和推送一次。is_read字段可以用于未来实现“已读”状态同步。4. 系统集成与自动化部署4.1 使用Systemd Timer实现可靠调度在Linux边缘设备上crontab虽然简单但在服务管理、日志集成和故障重启方面不如systemd。我选择使用systemd timer来调度我的新闻抓取脚本。首先创建一个系统服务单元文件/etc/systemd/system/news-edge-fetcher.service[Unit] DescriptionNews At The Edge Fetcher Service Afternetwork-online.target Wantsnetwork-online.target [Service] Typeoneshot Userpi # 替换为你的用户名 Grouppi # 替换为你的用户组 WorkingDirectory/home/pi/news-at-the-edge # 替换为你的项目路径 EnvironmentPATH/home/pi/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin ExecStart/usr/bin/bash /home/pi/news-at-the-edge/run_fetch.sh StandardOutputjournal StandardErrorjournal # 如果脚本失败在10秒后重试一次 Restarton-failure RestartSec10这个服务单元定义了做什么。它指定了执行的身份、工作目录、环境变量和具体的启动命令。Typeoneshot表示这是一个执行完就退出的任务。Restarton-failure确保了脚本因临时网络问题失败时会自动重试。然后创建对应的定时器单元文件/etc/systemd/system/news-edge-fetcher.timer[Unit] DescriptionRun News Fetcher every 2 hours Requiresnews-edge-fetcher.service [Timer] OnCalendar*-*-* 0/2:00:00 # 每两小时执行一次 Persistenttrue # 如果错过执行时间如设备关机下次启动后立即补执行 RandomizedDelaySec300 # 随机延迟0-300秒避免所有边缘设备同时请求对新闻源友好 [Install] WantedBytimers.target这个定时器单元定义了何时做。OnCalendar使用了systemd的时间表达式非常灵活。Persistenttrue是一个关键设置它保证了即使设备在计划执行时间处于关机状态开机后也会尽快运行一次任务不会因为关机而完全错过一次抓取。RandomizedDelaySec则是一个礼貌性的设置将请求时间分散开。启用并启动定时器sudo systemctl daemon-reload sudo systemctl enable news-edge-fetcher.timer sudo systemctl start news-edge-fetcher.timer # 查看定时器状态 sudo systemctl list-timers --all4.2 输出格式化与推送渠道经过过滤和排序的新闻需要以一种友好的格式呈现。我选择生成Markdown文件因为它格式简单、通用便于后续处理如转换为PDF、HTML或发送到支持Markdown的阅读器。def generate_markdown_report(filtered_articles, output_pathnews_report.md): 生成Markdown格式的新闻简报 with open(output_path, w, encodingutf-8) as f: f.write(f# 边缘新闻简报 {datetime.now().strftime(%Y-%m-%d %H:%M)}\n\n) f.write(f 生成时间{datetime.now().strftime(%Y-%m-%d %H:%M:%S)} | 共 {len(filtered_articles)} 条精选新闻\n\n) current_category None for article in filtered_articles: if article[category] ! current_category: current_category article[category] f.write(f\n## {current_category.upper()}\n\n) # 使用星级表示得分高低简单可视化 score_star ★ * min(5, int(article.get(score, 0) / 2)) f.write(f### {article[title]} {score_star}\n) f.write(f**来源**{article[source]} | **发布时间**{article[published].strftime(%m-%d %H:%M)}\n\n) f.write(f{article[summary][:200]}...\n\n) f.write(f[阅读原文]({article[link]})\n\n) f.write(---\n\n) logger.info(fMarkdown简报已生成{output_path})生成文件后可以通过多种方式推送邮件推送使用smtplib将Markdown内容作为邮件正文发送到指定邮箱。可以设置邮件客户端规则自动将此类邮件转发到Kindle。Webhook通知将简报的核心内容如前5条标题和链接通过requests库发送到钉钉、飞书或Slack的Webhook实现即时通知。同步到云存储使用rclone或云服务商SDK将生成的news_report.md自动同步到Dropbox、iCloud或坚果云方便在手机、平板等多设备查看。我在run_fetch.sh脚本中集成了这些步骤#!/bin/bash cd /home/pi/news-at-the-edge source $(pipenv --venv)/bin/activate # 激活虚拟环境 # 1. 运行抓取和过滤脚本 python main.py fetch python main.py filter # 2. 生成简报 python main.py generate # 3. 可选发送邮件 # python main.py send_mail # 4. 可选发送Webhook通知 # python main.py send_webhook logger News Edge Fetcher completed at $(date)5. 常见问题、优化与排查实录在实际部署和运行过程中我遇到了不少典型问题这里记录下排查思路和解决方案。5.1 抓取失败与网络问题处理问题现象日志中频繁出现网络超时或连接拒绝错误导致部分新闻源抓取失败。排查与解决增加重试机制与超时设置在requests请求中必须设置timeout参数如timeout(10, 30)表示连接超时10秒读取超时30秒。对于重要源可以封装一个带重试的请求函数。from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry def requests_retry_session(retries3, backoff_factor0.5, status_forcelist(500, 502, 504)): session requests.Session() retry Retry( totalretries, readretries, connectretries, backoff_factorbackoff_factor, status_forceliststatus_forcelist, ) adapter HTTPAdapter(max_retriesretry) session.mount(http://, adapter) session.mount(https://, adapter) return session session requests_retry_session() response session.get(url, timeout30)使用备用源或镜像对于一些知名的新闻源可以配置多个URL如官方源和社区维护的镜像。在抓取时按顺序尝试直到有一个成功。错峰抓取与随机延迟在定时任务中不要在整点准时运行。使用systemd timer的RandomizedDelaySec或在脚本开始执行时time.sleep(random.randint(0, 300))将请求分散开避免对目标服务器造成压力也降低被屏蔽的风险。记录详细日志每次抓取都应记录成功/失败状态、响应时间、数据条数。长期观察日志可以帮助你发现某些源是否变得不稳定以便及时调整或寻找替代源。5.2 内容解析异常与格式兼容问题现象feedparser解析某些RSS源时抛出异常或者解析出的标题、链接为空。排查与解决检查bozo_exception如前所述这是第一道防线。根据异常信息判断是网络问题、编码问题还是XML格式问题。处理编码问题有些源的HTTP头声明编码为UTF-8但实际内容可能是GB2312。可以在使用feedparser.parse(response.content)前先尝试用chardet库检测编码然后手动解码。import chardet detected chardet.detect(response.content) content response.content.decode(detected[encoding] or utf-8, errorsignore) feed feedparser.parse(content)字段缺失的容错处理在提取entry.title、entry.link等字段时务必使用entry.get(title, )的方式并提供默认值。对于时间字段如果published_parsed不存在可以尝试updated_parsed或者直接使用当前时间并在日志中标记警告。HTML标签清理有些RSS的summary字段包含大量HTML标签。可以使用BeautifulSoup快速清理只获取文本。from bs4 import BeautifulSoup summary_text BeautifulSoup(entry.get(summary, ), html.parser).get_text(stripTrue)5.3 过滤效果不理想与规则调优问题现象推送的新闻要么太多无关内容要么漏掉了重要新闻。排查与解决建立反馈循环在生成的Markdown简报中为每条新闻添加一个简单的反馈机制。例如在每条新闻末尾加上[有用]、[无关]的伪链接实际可以是本地脚本的调用。运行一个辅助脚本定期收集这些反馈日志并统计哪些关键词或源经常被标记为“无关”哪些被标记为“有用”。引入否定关键词除了block_keywords完全屏蔽可以增加negative_keywords列表。包含这些词的新闻不会直接被丢弃但会在评分中被大幅扣减分数。这提供了更细粒度的控制。动态调整权重可以根据历史阅读行为如果实现了“已读”标记动态调整规则。例如某个类别下的新闻如果连续多次被标记为未读或快速跳过可以自动调低其category_weights。人工干预入口保留一个“白名单”或“紧急推送”功能。可以创建一个特殊的配置文件urgent_rules.json其中定义的规则如包含某个特定关键词组合会让新闻直接获得最高分确保重要信息不被遗漏。5.4 系统资源与长期运行维护问题现象运行一段时间后数据库文件变大内存使用增加或日志文件占满磁盘。排查与解决数据库清理定期清理旧数据。可以在抓取脚本中增加一个维护任务例如每周删除published时间超过30天的文章。# 定期清理旧数据 cutoff_date (datetime.utcnow() - timedelta(days30)).isoformat() cursor.execute(DELETE FROM articles WHERE published ?, (cutoff_date,)) logger.info(f清理了 {cursor.rowcount} 条30天前的旧文章。)日志轮转使用logrotate工具管理应用日志。创建一个配置文件/etc/logrotate.d/news-edge/home/pi/news-at-the-edge/logs/*.log { daily missingok rotate 7 compress delaycompress notifempty create 644 pi pi }这会将日志按天切割保留最近7天并压缩旧日志。监控与告警为关键指标添加简单监控。例如检查每次抓取的新文章数量是否在正常范围内如非零。如果连续多次抓取到0篇文章可能意味着抓取逻辑失效或网络异常可以通过发送邮件或Webhook通知自己。这个“News — At The Edge”项目从构思到落地让我深刻体会到边缘计算的魅力所在——将控制权和隐私权牢牢握在自己手中通过可编程的方式定制专属的信息流。它现在每天安静地运行在我的树莓派上早晨准时将一份精心筛选的简报推送到我的邮箱成为了我高效获取行业信息不可或缺的工具。整个系统模块清晰扩展性强你可以很容易地为其添加新的数据源比如监控特定GitHub仓库的Release、Hacker News的特定主题、更复杂的过滤逻辑如简单的情绪分析或者更丰富的输出格式。希望这份详细的构建实录能帮助你打造属于自己的那个“边缘信息管家”。