编写程序让智能雨伞检测到下雨湿度时,伞柄指示灯亮起,提醒带伞出门。
智能雨伞湿度检测系统 - 出行提醒解决方案一、实际应用场景描述某智能硬件创业团队计划开发一款智能雨伞产品解决都市人群因天气突变而忘带雨伞的痛点。现有市场产品仅具备基础防雨功能缺乏主动提醒能力。本系统目标在雨伞手柄内集成DHT11温湿度传感器实时监测环境湿度当湿度超过设定阈值如70%RH时通过RGB LED指示灯发出不同颜色警示配合手机App推送实现湿度超标→灯光提醒→带伞出门的完整链路。二、引入痛点痛点类型 具体表现 本方案解决方式忘带雨伞 早上看天晴下午突降暴雨 实时监测主动提醒误判天气 云层厚但无雨携带雨伞不便 精准湿度检测非单纯看天无提醒机制 即使有雨伞也常遗忘在角落 多维度提醒灯光App功能单一 传统雨伞无智能属性 增加环境感知能力三、核心逻辑讲解graph TDA[系统初始化] -- B[传感器校准]B -- C[持续监测环境湿度]C -- D{湿度阈值?}D -- 否 -- CD -- 是 -- E[判断持续时长]E -- F{持续确认时间?}F -- 否 -- CF -- 是 -- G[触发伞柄LED警示]G -- H[记录事件日志]H -- I[发送App推送]I -- J{湿度恢复正常?}J -- 否 -- GJ -- 是 -- K[关闭LED恢复待机]K -- C关键逻辑点1. 双重确认机制湿度超标需持续30秒才触发提醒避免路过水洼等瞬时干扰2. 分级警示湿度70-80%黄色预警80%以上红色紧急提醒3. 自适应阈值可根据季节/地区自动调整湿度阈值雨季降低阈值4. 能耗优化采用间歇采样策略非雨天每小时采样1次雨天每10秒采样1次四、代码模块化实现项目结构smart_umbrella/├── main.py # 主程序入口├── config.py # 配置文件├── sensors/ # 传感器模块│ ├── __init__.py│ └── humidity_sensor.py # DHT11湿度传感器驱动├── indicators/ # 指示灯模块│ ├── __init__.py│ └── led_controller.py # RGB LED控制├── notifications/ # 通知模块│ ├── __init__.py│ └── notification_service.py # App推送服务├── storage/ # 数据存储模块│ ├── __init__.py│ └── data_logger.py # 事件日志记录├── utils/ # 工具函数│ ├── __init__.py│ ├── debounce.py # 防抖处理│ └── adaptive_threshold.py # 自适应阈值算法└── tests/ # 单元测试├── __init__.py└── test_sensor.py1. 配置文件 (config.py)智能雨伞湿度检测系统配置文件包含传感器参数、指示灯设置、通知配置等# 传感器配置SENSOR_TYPE DHT11 # 支持的传感器类型: DHT11/DHT22/SHT31DHT_PIN 17 # DHT传感器数据引脚 (BCM编号)SAMPLE_INTERVAL_NORMAL 3600 # 正常模式采样间隔(秒) - 每小时1次SAMPLE_INTERVAL_RAINY 10 # 雨天模式采样间隔(秒) - 每10秒1次CONFIRMATION_TIME 30 # 湿度超标确认时间(秒)# 湿度阈值配置HUMIDITY_WARNING_THRESHOLD 70 # 黄色预警阈值 (%RH)HUMIDITY_CRITICAL_THRESHOLD 80 # 红色紧急阈值 (%RH)HYSTERESIS 5 # 迟滞值防止频繁切换状态# LED指示灯配置LED_RED_PIN 27 # 红色LED引脚LED_GREEN_PIN 22 # 绿色LED引脚LED_BLUE_PIN 18 # 蓝色LED引脚LED_BRIGHTNESS 0.8 # LED亮度 (0.0-1.0)# 通知配置ENABLE_APP_NOTIFICATION True # 是否启用App推送NOTIFICATION_COOLDOWN 1800 # 通知冷却时间(秒) - 30分钟内不重复推送PUSH_SERVER_URL https://api.pushservice.com/v1/send# 自适应阈值配置SEASONAL_ADJUSTMENT True # 是否启用季节性阈值调整RAINY_SEASON_MONTHS [5, 6, 7, 8, 9] # 雨季月份 (5-9月)# 日志配置LOG_FILE umbrella_events.logLOG_FORMAT %(asctime)s - %(levelname)s - %(message)sDATA_STORAGE_PATH ./sensor_data/2. 湿度传感器模块 (sensors/humidity_sensor.py)DHT系列温湿度传感器驱动模块支持DHT11/DHT22传感器读取环境湿度数据import Adafruit_DHT # DHT传感器Python库import timefrom config import SENSOR_TYPE, DHT_PIN, SAMPLE_INTERVAL_NORMALclass HumiditySensor:def __init__(self):初始化湿度传感器self.sensor_type getattr(Adafruit_DHT, SENSOR_TYPE)self.pin DHT_PINself.last_read_time 0self.cached_humidity Noneself.cached_temperature Noneself._validate_sensor()def _validate_sensor(self):验证传感器连接是否正常humidity, temperature Adafruit_DHT.read_retry(self.sensor_type, self.pin)if humidity is None or temperature is None:raise RuntimeError(f无法读取{SENSOR_TYPE}传感器数据请检查连接)print(f✅ {SENSOR_TYPE}传感器初始化成功)print(f 初始读数: 温度{temperature:.1f}°C, 湿度{humidity:.1f}%RH)def read_humidity(self, force_updateFalse) - float:读取当前环境湿度force_update: 是否强制刷新缓存返回: 湿度值(%RH)读取失败返回Nonecurrent_time time.time()# 使用缓存减少传感器读取次数DHT传感器读取间隔至少2秒if not force_update and self.cached_humidity is not None:if current_time - self.last_read_time 2.0:return self.cached_humidity# 读取传感器数据humidity, temperature Adafruit_DHT.read_retry(self.sensor_type, self.pin)if humidity is not None and 0 humidity 100:self.cached_humidity humidityself.cached_temperature temperatureself.last_read_time current_timereturn humidityelse:print(f⚠️ 湿度读取无效: {humidity})return Nonedef read_temperature(self) - float:读取当前环境温度self.read_humidity(force_updateTrue) # 强制更新以确保温度同步return self.cached_temperaturedef get_sensor_info(self) - dict:获取传感器详细信息return {sensor_type: SENSOR_TYPE,pin: self.pin,last_humidity: self.cached_humidity,last_temperature: self.cached_temperature,last_read_time: self.last_read_time}def cleanup(self):清理传感器资源pass # DHT传感器无需特殊清理3. 指示灯控制模块 (indicators/led_controller.py)RGB LED指示灯控制模块控制伞柄处的三色LED实现多级警示效果import RPi.GPIO as GPIOimport timefrom config import LED_RED_PIN, LED_GREEN_PIN, LED_BLUE_PIN, LED_BRIGHTNESSclass RGBLEDController:# LED颜色常量COLOR_OFF (0, 0, 0)COLOR_GREEN (1, 0, 0) # 绿灯湿度正常COLOR_YELLOW (1, 1, 0) # 黄灯湿度预警COLOR_RED (0, 0, 1) # 红灯湿度紧急COLOR_ORANGE (1, 0.5, 0) # 橙灯过渡状态def __init__(self):初始化RGB LED引脚self.red_pin LED_RED_PINself.green_pin LED_GREEN_PINself.blue_pin LED_BLUE_PINself.brightness LED_BRIGHTNESSself.current_color self.COLOR_OFFself.blinking Falseself.blink_thread Noneself._setup_gpio()def _setup_gpio(self):配置GPIO为输出模式GPIO.setmode(GPIO.BCM)GPIO.setup(self.red_pin, GPIO.OUT)GPIO.setup(self.green_pin, GPIO.OUT)GPIO.setup(self.blue_pin, GPIO.OUT)# 初始化PWM控制亮度self.red_pwm GPIO.PWM(self.red_pin, 1000) # 1kHz频率self.green_pwm GPIO.PWM(self.green_pin, 1000)self.blue_pwm GPIO.PWM(self.blue_pin, 1000)# 启动PWM初始占空比0self.red_pwm.start(0)self.green_pwm.start(0)self.blue_pwm.start(0)print(✅ RGB LED指示灯初始化成功)def set_color(self, color: tuple, brightness: float None):设置LED颜色color: RGB元组 (红, 绿, 蓝)值为0或1brightness: 亮度系数 (0.0-1.0)None则使用默认值if brightness is None:brightness self.brightnessr, g, b color# 设置PWM占空比控制亮度self.red_pwm.ChangeDutyCycle(r * 100 * brightness)self.green_pwm.ChangeDutyCycle(g * 100 * brightness)self.blue_pwm.ChangeDutyCycle(b * 100 * brightness)self.current_color colordef blink(self, color: tuple, interval: float 0.5):让LED闪烁指定颜色color: 闪烁颜色interval: 闪烁间隔(秒)self.blinking Trueself._blink_thread threading.Thread(targetself._blink_loop,args(color, interval),daemonTrue)self._blink_thread.start()def _blink_loop(self, color, interval):闪烁循环while self.blinking:self.set_color(color)time.sleep(interval)self.set_color(self.COLOR_OFF)time.sleep(interval)def stop_blink(self):停止闪烁self.blinking Falseif self._blink_thread:self._blink_thread.join(timeout1.0)self.set_color(self.current_color)def set_warning_level(self, level: str):根据警告级别设置LED状态level: normal, warning, criticalif level normal:self.set_color(self.COLOR_GREEN)elif level warning:self.set_color(self.COLOR_YELLOW)elif level critical:self.set_color(self.COLOR_RED, brightness1.0) # 紧急状态全亮else:self.set_color(self.COLOR_OFF)def get_status(self) - dict:获取LED当前状态return {current_color: self.current_color,brightness: self.brightness,is_blinking: self.blinking}def cleanup(self):清理LED资源self.stop_blink()self.set_color(self.COLOR_OFF)self.red_pwm.stop()self.green_pwm.stop()self.blue_pwm.stop()GPIO.cleanup([self.red_pin, self.green_pin, self.blue_pin])4. 通知服务模块 (notifications/notification_service.py)移动端通知服务模块通过HTTP API发送推送通知到用户手机import requestsimport jsonimport timeimport hashlibfrom config import ENABLE_APP_NOTIFICATION, NOTIFICATION_COOLDOWN, PUSH_SERVER_URLclass NotificationService:def __init__(self, user_id: str):初始化通知服务user_id: 用户唯一标识self.user_id user_idself.last_notification_time 0self.notification_history []if ENABLE_APP_NOTIFICATION:print(✅ 应用推送服务已启用)else:print(ℹ️ 应用推送服务已禁用)def send_push_notification(self, title: str, message: str,priority: str normal) - bool:发送推送通知到用户手机title: 通知标题message: 通知内容priority: 优先级 (low, normal, high)返回: 发送成功返回Trueif not ENABLE_APP_NOTIFICATION:print(推送服务已禁用跳过发送)return Falsecurrent_time time.time()# 检查冷却时间if current_time - self.last_notification_time NOTIFICATION_COOLDOWN:remaining NOTIFICATION_COOLDOWN - (current_time - self.last_notification_time)print(f通知冷却中剩余{remaining:.0f}秒)return Falsetry:# 构建请求负载payload {user_id: self.user_id,title: title,message: message,priority: priority,timestamp: current_time,device_type: smart_umbrella,signature: self._generate_signature(title, message)}# 发送POST请求response requests.post(PUSH_SERVER_URL,jsonpayload,headers{Content-Type: application/json},timeout10)response.raise_for_status()# 更新最后发送时间self.last_notification_time current_timeself.notification_history.append({title: title,message: message,sent_at: current_time,success: True})print(f 推送通知已发送: {title})return Trueexcept requests.exceptions.RequestException as e:print(f推送发送失败: {e})self.notification_history.append({title: title,message: message,sent_at: current_time,success: False,error: str(e)})return Falsedef _generate_signature(self, title: str, message: str) - str:生成请求签名用于身份验证secret_key your_secret_key_here # 实际应用中应从安全存储获取data f{self.user_id}{title}{message}{secret_key}return hashlib.sha256(data.encode()).hexdigest()def get_notification_history(self, limit: int 10) - list:获取最近的通知历史return self.notification_history[-limit:]def cleanup(self):清理通知服务资源pass5. 数据存储模块 (storage/data_logger.py)传感器数据存储模块记录湿度数据和系统事件支持本地存储和云端同步import csvimport jsonimport osfrom datetime import datetimefrom config import DATA_STORAGE_PATH, LOG_FILEclass DataLogger:def __init__(self):初始化数据存储self.data_path DATA_STORAGE_PATHself.log_file LOG_FILEself._ensure_directories()self._init_csv_file()def _ensure_directories(self):确保存储目录存在if not os.path.exists(self.data_path):os.makedirs(self.data_path)print(f 创建数据目录: {self.data_path})def _init_csv_file(self):初始化CSV数据文件csv_path os.path.join(self.data_path, humidity_data.csv)if not os.path.exists(csv_path):with open(csv_path, w, newline) as f:writer csv.writer(f)writer.writerow([timestamp, humidity, temperature,warning_level, led_status])print(f 创建数据文件: {csv_path})def log_humidity_event(self, humidity: float, temperature: float,warning_level: str, led_status: str):记录湿度事件到CSV文件timestamp datetime.now().isoformat()csv_path os.path.join(self.data_path, humidity_data.csv)with open(csv_path, a, newline) as f:writer csv.writer(f)writer.writerow([timestamp, humidity, temperature,warning_level, led_status])def log_system_event(self, level: str, message: str):记录系统事件到日志文件level: INFO, WARNING, ERRORtimestamp datetime.now().strftime(%Y-%m-%d %H:%M:%S)log_entry f{timestamp} - {level} - {message}\nwith open(self.log_file, a, encodingutf-8) as f:f.write(log_entry)# 同时在控制台输出print(f[{level}] {message})def get_recent_data(self, hours: int 24) - list:获取最近N小时的数据csv_path os.path.join(self.data_path, humidity_data.csv)cutoff_time datetime.now().timestamp() - (hours * 3600)recent_data []try:with open(csv_path, r, newline) as f:reader csv.DictReader(f)for row in reader:row_time datetime.fromisoformat(row[timestamp]).timestamp()if row_time cutoff_time:recent_data.append(row)except FileNotFoundError:passreturn recent_datadef export_to_json(self, filename: str None) - str:导出数据为JSON格式if filename is None:filename fhumidity_export_{datetime.now().strftime(%Y%m%d_%H%M%S)}.jsonfilepath os.path.join(self.data_path, filename)data {export_time: datetime.now().isoformat(),recent_24h_data: self.get_recent_data(24)}with open(filepath, w, encodingutf-8) as f:json.dump(data, f, indent2, ensure_asciiFalse)print(f 数据已导出到: {filepath})return filepathdef cleanup(self):清理数据存储资源pass6. 自适应阈值模块 (utils/adaptive_threshold.py)自适应阈值算法模块根据季节、地理位置和历史数据动态调整湿度阈值from datetime import datetimefrom config import SEASONAL_ADJUSTMENT, RAINY_SEASON_MONTHS, \HUMIDITY_WARNING_THRESHOLD, HUMIDITY_CRITICAL_THRESHOLDclass AdaptiveThreshold:def __init__(self):初始化自适应阈值管理器self.base_warning_threshold HUMIDITY_WARNING_THRESHOLDself.base_critical_threshold HUMIDITY_CRITICAL_THRESHOLDself.current_warning_threshold self.base_warning_thresholdself.current_critical_threshold self.base_critical_thresholddef get_month(self) - int:获取当前月份return datetime.now().monthdef is_rainy_season(self) - bool:判断当前是否为雨季month self.get_month()return month in RAINY_SEASON_MONTHSdef calculate_thresholds(self) - tuple:计算当前环境下的自适应阈值返回: (warning_threshold, critical_threshold)if not SEASONAL_ADJUSTMENT:return (self.base_warning_threshold, self.base_critical_threshold)if self.is_rainy_season():# 雨季降低阈值提高敏感度warning max(50, self.base_warning_threshold - 15)critical max(60, self.base_critical_threshold - 15)else:# 旱季提高阈值减少误报warning min(80, self.base_warning_threshold 10)critical min(90, self.base_critical_threshold 10)self.current_warning_threshold warningself.current_critical_threshold criticalreturn (warning, critical)def get_thresholds_info(self) - dict:获取阈值信息warning, critical self.calculate_thresholds()return {base_warning: self.base_warning_threshold,base_critical: self.base_critical_threshold,current_warning: warning,current_critical: critical,is_rainy_season: self.is_rainy_season(),adjustment_active: SEASONAL_ADJUSTMENT}7. 防抖处理模块 (utils/debounce.py)防抖处理工具模块用于消除传感器噪声和瞬时干扰from collections import dequefrom config import CONFIRMATION_TIMEclass HumidityDebouncer:def __init__(self, confirmation_samples: int 3):初始化湿度防抖器confirmation_samples: 确认所需连续超标的样本数self.confirmation_samples confirmation_samplesself.above_threshold_count 0self.humidity_history deque(maxlenconfirmation_samples)self.is_confirmed_high Falsedef process(self, current_humidity: float, threshold: float) - bool:处理当前湿度值返回是否确认超标is_above current_humidity thresholdself.humidity_history.append(is_above)if is_above:self.above_threshold_count 1else:self.above_threshold_count 0# 确认需要连续多次超标was_confirmed self.is_confirmed_highself.is_confirmed_high self.above_threshold_count self.confirmation_samplesreturn self.is_confirmed_highdef reset(self):重置防抖器状态self.above_threshold_count 0self.humidity_history.clear()self.is_confirmed_high False8. 主程序入口 (main.py)#!/usr/bin/env python3智能雨伞湿度检测系统主程序实现环境湿度监测、阈值判断、LED提醒、通知推送的完整功能链import sysimport timeimport threadingimport signalfrom sensors.humidity_sensor import HumiditySensorfrom indicators.led_controller import RGBLEDControllerfrom notifications.notification_service import NotificationServicefrom storage.data_logger import DataLoggerfrom utils.adaptive_threshold import AdaptiveThresholdfrom utils.debounce import HumidityDebouncerfrom config import (SAMPLE_INTERVAL_NORMAL, SAMPLE_INTERVAL_RAINY,CONFIRMATION_TIME, HUMIDITY_WARNING_THRESHOLD,HUMIDITY_CRITICAL_THRESHOLD, HYSTERESIS)class SmartUmbrellaSystem:智能雨伞控制系统主类# 系统状态枚举STATE_IDLE IDLE # 空闲状态湿度正常STATE_MONITORING MONITORING # 监控状态湿度偏高STATE_WARNING WARNING # 预警状态黄色警示STATE_CRITICAL CRITICAL # 紧急状态红色警示def __init__(self, user_id: str default_user):初始化所有子系统print( 正在初始化智能雨伞系统...)# 初始化各模块self.sensor HumiditySensor()self.led RGBLEDController()self.notifier NotificationService(user_id)self.logger DataLogger()self.threshold_manager AdaptiveThreshold()self.debouncer HumidityDebouncer()# 系统状态变量self.current_state self.STATE_IDLEself.warning_level normalself.last_humidity Noneself.last_temp Noneself.running Trueself.in_rainy_condition False# 注利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛