ROS2 Action 实战进阶:从零构建异步任务框架与 MoveIt! 运动控制集成
1. ROS2 Action 异步任务框架设计在机器人开发中经常需要处理长时间运行的任务比如导航到指定位置、执行抓取动作等。ROS2的Action机制就是为了解决这类场景而设计的它比简单的Service更适合处理需要反馈、可能被取消的长时间任务。我曾经在一个工业分拣项目中遇到过这样的需求机器人需要按照优先级顺序执行多个分拣任务同时还要实时反馈任务进度。这时候基础的Action机制就显得有些力不从心了我们需要构建一个更强大的异步任务框架。1.1 自定义复杂Action消息首先我们需要设计一个足够灵活的消息结构。标准的Action消息包含Goal、Feedback和Result三部分但实际项目中往往需要更丰富的信息。# ComplexTask.action # Goal部分 uint32 task_id string task_type # 如pick_and_place、inspection geometry_msgs/Pose target_pose float32 timeout uint8 priority # 0-255优先级 string parameters # JSON格式的额外参数 --- # Result部分 bool success string message duration execution_time --- # Feedback部分 uint8 state # 0等待 1执行中 2暂停 3取消中 float32 progress string status_message geometry_msgs/Pose current_pose这种设计有几个优点支持任务标识和类型区分包含空间位置信息内置优先级字段通过JSON参数支持扩展详细的执行状态反馈1.2 任务队列与优先级调度单纯的Action Server一次只能处理一个Goal要实现任务队列需要自己实现调度逻辑。这里我分享一个经过实战检验的方案class TaskScheduler: def __init__(self): self.pending_tasks [] self.current_task None self.lock threading.Lock() def add_task(self, goal): with self.lock: # 按优先级插入队列 bisect.insort(self.pending_tasks, goal, keylambda x: -x.priority) def get_next_task(self): with self.lock: if not self.pending_tasks: return None self.current_task self.pending_tasks.pop(0) return self.current_task def cancel_task(self, task_id): with self.lock: # 取消当前任务 if self.current_task and self.current_task.task_id task_id: self.current_task None return True # 取消队列中的任务 for i, task in enumerate(self.pending_tasks): if task.task_id task_id: self.pending_tasks.pop(i) return True return False这个调度器实现了线程安全的队列操作基于优先级的任务排序任务取消功能简单的任务管理接口2. 可靠的任务执行与监控2.1 状态监控与重试机制在实际部署中网络波动或硬件故障可能导致任务中断。我们需要实现自动恢复机制class TaskExecutor: def __init__(self): self.max_retries 3 self.retry_delay 1.0 # 秒 def execute_task(self, goal_handle): retry_count 0 while retry_count self.max_retries: try: return self._execute_single(goal_handle) except Exception as e: retry_count 1 if retry_count self.max_retries: goal_handle.abort() return ComplexTask.Result(successFalse, messagestr(e)) time.sleep(self.retry_delay) def _execute_single(self, goal_handle): # 实际任务执行逻辑 for i in range(10): if goal_handle.is_cancel_requested: goal_handle.canceled() return ComplexTask.Result(successFalse, messageCanceled) # 更新进度 feedback ComplexTask.Feedback() feedback.progress i / 10.0 feedback.state 1 # 执行中 goal_handle.publish_feedback(feedback) time.sleep(0.5) return ComplexTask.Result(successTrue, messageCompleted)这个执行器提供了自动重试失败的任务完善的错误处理取消请求检查进度反馈2.2 客户端状态管理一个健壮的客户端需要处理各种异常情况class RobustActionClient: def __init__(self, node, action_name, action_type): self.node node self.client ActionClient(node, action_type, action_name) self.last_goal_handle None def send_goal(self, goal, callbackNone): if not self.client.wait_for_server(timeout_sec5.0): self.node.get_logger().error(Server not available) return None send_goal_future self.client.send_goal_async( goal, feedback_callbackself._feedback_callback ) send_goal_future.add_done_callback( lambda future: self._goal_response_callback(future, callback) ) return send_goal_future def _goal_response_callback(self, future, callback): goal_handle future.result() self.last_goal_handle goal_handle if not goal_handle.accepted: self.node.get_logger().warn(Goal rejected) if callback: callback(goal_handle, None) return get_result_future goal_handle.get_result_async() get_result_future.add_done_callback( lambda future: self._result_callback(future, callback) ) def _result_callback(self, future, callback): result future.result().result if callback: callback(self.last_goal_handle, result) def _feedback_callback(self, feedback_msg): self.node.get_logger().info( fProgress: {feedback_msg.feedback.progress:.1%} ) def cancel_current_goal(self): if self.last_goal_handle: return self.last_goal_handle.cancel_goal_async() return None这个客户端实现了服务器可用性检查异步结果处理目标取消接口回调机制3. MoveIt! 运动控制集成3.1 MoveIt! Action 接口分析MoveIt! 已经提供了完善的Action接口主要包含MoveGroupAction处理运动规划请求ExecuteTrajectoryAction执行规划好的轨迹PickPlaceAction处理抓取放置动作理解这些接口是集成的基础。我曾经花了不少时间研究这些接口的细节这里分享一些关键发现MoveGroupAction的Goal包含运动约束、路径约束等信息执行结果会包含规划时间和错误信息反馈中包含当前状态和进度3.2 复合任务编排真正的机器人任务往往是多个动作的组合。比如一个典型的抓取-放置任务包含移动到预抓取位置执行抓取动作移动到放置位置执行放置动作下面是一个实现示例class PickAndPlaceManager: def __init__(self, move_group): self.move_group move_group self.state idle def execute(self, pick_pose, place_pose): self.state moving_to_pick # 移动到预抓取位置 if not self._move_to_pre_grasp(pick_pose): return False self.state grasping # 执行抓取 if not self._execute_grasp(): return False self.state moving_to_place # 移动到放置位置 if not self._move_to_place(place_pose): return False self.state releasing # 执行放置 if not self._execute_release(): return False self.state completed return True def _move_to_pre_grasp(self, pose): self.move_group.set_pose_target(pose) success self.move_group.go(waitTrue) self.move_group.stop() self.move_group.clear_pose_targets() return success def _execute_grasp(self): # 实际抓取逻辑 return True def _move_to_place(self, pose): # 类似_move_to_pre_grasp pass def _execute_release(self): # 实际放置逻辑 return True这个管理器提供了状态跟踪错误处理可扩展的任务流程3.3 与自定义框架的集成将MoveIt!集成到我们的异步任务框架中需要创建一个适配器class MoveItTaskAdapter: def __init__(self, move_group): self.move_group move_group def execute(self, goal_handle): task_type goal_handle.request.task_type params json.loads(goal_handle.request.parameters) if task_type move_to_pose: return self._handle_move_to_pose(goal_handle, params) elif task_type pick_and_place: return self._handle_pick_and_place(goal_handle, params) else: goal_handle.abort() return ComplexTask.Result(successFalse, messageUnknown task type) def _handle_move_to_pose(self, goal_handle, params): pose Pose() pose.position.x params[x] pose.position.y params[y] pose.position.z params[z] self.move_group.set_pose_target(pose) success self.move_group.go(waitTrue) self.move_group.stop() self.move_group.clear_pose_targets() return ComplexTask.Result( successsuccess, messageCompleted if success else Failed )这个适配器实现了任务类型路由参数解析统一的接口封装4. 实战案例智能分拣系统4.1 系统架构设计基于上述框架我们可以构建一个完整的分拣系统任务接收层通过ROS2 Topic接收分拣请求任务调度层管理任务队列和优先级执行层与MoveIt!交互控制机械臂监控层提供状态反馈和错误恢复class SortingSystem: def __init__(self): self.node rclpy.create_node(sorting_system) # 初始化各组件 self.move_group MoveGroupCommander(arm_group) self.task_adapter MoveItTaskAdapter(self.move_group) self.scheduler TaskScheduler() # 创建Action Server self.action_server ActionServer( self.node, ComplexTask, robot_task, self.execute_callback ) # 订阅任务请求 self.task_sub self.node.create_subscription( String, sorting_requests, self.request_callback, 10 ) def request_callback(self, msg): # 解析请求并创建任务 goal ComplexTask.Goal() goal.task_id generate_task_id() goal.task_type pick_and_place goal.priority parse_priority(msg) goal.parameters build_parameters(msg) self.scheduler.add_task(goal) def execute_callback(self, goal_handle): # 从调度器获取任务 task self.scheduler.get_next_task() if not task: goal_handle.abort() return ComplexTask.Result(successFalse, messageNo tasks available) # 执行任务 return self.task_adapter.execute(goal_handle)4.2 性能优化技巧在实际部署中我发现几个关键的性能优化点轨迹规划预计算在空闲时预先计算常用轨迹碰撞检测优化调整MoveIt!的碰撞检测参数任务批处理将多个小任务合并执行状态缓存减少重复的状态查询class OptimizedMoveItAdapter(MoveItTaskAdapter): def __init__(self, move_group): super().__init__(move_group) self.trajectory_cache {} def _handle_move_to_pose(self, goal_handle, params): cache_key f{params[x]},{params[y]},{params[z]} if cache_key in self.trajectory_cache: # 使用缓存的轨迹 trajectory self.trajectory_cache[cache_key] success self.move_group.execute(trajectory, waitTrue) else: # 首次计算并缓存 self.move_group.set_pose_target(params) success self.move_group.go(waitTrue) if success: self.trajectory_cache[cache_key] ( self.move_group.remember_trajectory() ) self.move_group.stop() self.move_group.clear_pose_targets() return ComplexTask.Result( successsuccess, messageCompleted if success else Failed )4.3 常见问题排查在开发过程中我遇到过不少问题这里分享几个典型案例任务卡死通常是由于没有正确处理取消请求解决方案是增加超时检查和取消处理运动抖动调整MoveIt!的加速度和速度参数可以改善规划失败检查碰撞物体设置和约束条件通信延迟优化网络配置或增加本地缓冲def monitor_system_health(): while True: # 检查任务执行时间 for task in active_tasks: if time.time() - task.start_time MAX_TASK_TIME: task.cancel() log(Task timeout canceled) # 检查系统资源 if cpu_usage 90%: throttle_new_tasks() time.sleep(1.0)