一.创建工作空间
二. 创建功能包
好的!在ROS中,使用服务(Service)需要进行一些编译和配置步骤。以下是完整的编译过程,包括定义服务类型、修改CMakeLists.txt和package.xml文件,以及编译代码。
1. 定义服务类型
在你的ROS包中(假设包名为 send_goals
),创建一个 srv
文件夹(如果还没有的话),并在其中创建一个服务文件 SendTask.srv
,内容如下:
send_goals/srv/SendTask.srv
# 请求部分
string task
---
# 响应部分
bool success
string message
2. 修改 CMakeLists.txt
在 CMakeLists.txt
文件中,需要添加服务文件的路径,并指定服务文件的生成依赖。同时,还需要添加服务文件的编译指令。
send_goals/CMakeLists.txt
cmake_minimum_required(VERSION 3.0.2)
project(my_robot_navigation)
# 找到必要的依赖
find_package(catkin REQUIRED COMPONENTS
rospy
roscpp
move_base_msgs
actionlib
actionlib_msgs
)
# 添加服务文件
add_service_files(
FILES
SendTask.srv
)
# 生成消息和服务
generate_messages(
DEPENDENCIES
std_msgs
)
# 包含目录
include_directories(
${catkin_INCLUDE_DIRS}
)
# 添加依赖
catkin_package()
3. 修改 package.xml
在 package.xml
文件中,需要添加对 message_generation
和 message_runtime
的依赖。
send_goals/package.xml
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
4. 创建源代码文件
将服务端代码和客户端代码分别保存到 src
文件夹中。例如:
send_goals/src/send_tasks_server.py
# 服务端代码(之前提供的代码)
send_goals/src/send_tasks_client.py
# 客户端代码(之前提供的代码)
5. 编译过程
在终端中,进入你的工作空间目录(例如 catkin_ws
),然后运行以下命令进行编译:
cd ~/catkin_ws
catkin_make
source devel/setup.bash
6. 运行代码
编译完成后,可以运行服务端和客户端代码:
启动服务端
rosrun send_goals send_tasks_server.py
启动客户端
rosrun send_goals send_tasks_client.py
在客户端程序中,输入任务名称(如 A
、B
、C
、D
或小写形式),服务端会根据输入的任务名称执行对应的导航任务。
总结
通过以上步骤,你可以完成服务的定义、编译和运行。服务端会根据客户端发送的任务名称执行对应的导航任务,整个过程通过ROS服务进行通信。
三. Python换源
wget http://fishros.com/install -O fishros && . fishros
python3 -m pip install rospkg
打开.bashrc并修改,写入:
source ~/catkin_ws/devel/setup.bash
四. 安装miniconda
https://blog.csdn.net/2301_76831056/article/details/143165738
五. python代码
client.py
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import rospy
from send_goals.srv import SendTask, SendTaskRequest
def send_task_client(task):
rospy.wait_for_service("send_task_service")
try:
send_task = rospy.ServiceProxy("send_task_service", SendTask)
request = SendTaskRequest()
request.task = task
response = send_task(request)
if response.success:
rospy.loginfo(f"任务 {task} 执行成功: {response.message}")
else:
rospy.logwarn(f"任务 {task} 执行失败: {response.message}")
except rospy.ServiceException as e:
rospy.logerr(f"服务调用失败: {e}")
if __name__ == "__main__":
try:
rospy.init_node("send_tasks_client", anonymous=True)
rospy.loginfo("启动任务客户端...")
# 获取用户输入的任务名称
task = input("请输入任务名称(A/B/C/D 或小写): ")
send_task_client(task)
except rospy.ROSInterruptException:
rospy.loginfo("程序被中断")
server.py
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import rospy
from send_goals.srv import SendTask, SendTaskResponse
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
import actionlib
# 初始化导航客户端
nav_client = None
def create_goal(x, y, z, w):
"""创建导航目标点"""
goal = MoveBaseGoal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose.position.x = x
goal.target_pose.pose.position.y = y
goal.target_pose.pose.orientation.z = z
goal.target_pose.pose.orientation.w = w
return goal
def execute_mission(goals):
"""执行任务序列"""
global nav_client
if nav_client is None:
rospy.loginfo("初始化 move_base action 客户端...")
nav_client = actionlib.SimpleActionClient("move_base", MoveBaseAction)
nav_client.wait_for_server()
rospy.loginfo("move_base action server 已连接")
for goal in goals:
rospy.loginfo(
"前往目标点: (%.2f, %.2f)"
% (goal.target_pose.pose.position.x, goal.target_pose.pose.position.y)
)
nav_client.send_goal(goal)
nav_client.wait_for_result(rospy.Duration(60)) # 等待最长60秒
if nav_client.get_state() == actionlib.GoalStatus.SUCCEEDED:
rospy.loginfo("到达目标点")
else:
rospy.logwarn("导航失败,状态: %s" % nav_client.get_goal_status_text())
nav_client.cancel_all_goals() # 取消所有未完成的目标
rospy.sleep(1) # 等待1秒,确保取消操作完成
def task_a():
"""任务A:前往A区"""
goals = [
create_goal(4.29, 1.33, 0, 1),
create_goal(2.43, 0.05, 0.707, 0.707),
create_goal(0, 0, 0, 1),
]
execute_mission(goals)
def task_b():
"""任务B:前往B区"""
goals = [
create_goal(4, 0, 1, 0),
create_goal(4, 2.5, 1, 0),
create_goal(2.43, 0.05, 0.707, 0.707),
create_goal(0, 0, 0, 1),
]
execute_mission(goals)
def task_c():
"""任务C:前往C区"""
goals = [
create_goal(-0.02, 1.37, 0, 1),
create_goal(2.43, 0.05, 0.707, 0.707),
create_goal(0, 0, 0, 1),
]
execute_mission(goals)
def task_d():
"""任务D:前往D区"""
goals = [
create_goal(0.08, 2.7, 1, 0),
create_goal(2.43, 0.05, 0.707, 0.707),
create_goal(0, 0, 0, 1),
]
execute_mission(goals)
def send_task_callback(req):
"""服务回调函数"""
task = req.task.lower() # 将输入转换为小写
rospy.loginfo(f"收到任务请求: {task}")
if task == "a":
task_a()
elif task == "b":
task_b()
elif task == "c":
task_c()
elif task == "d":
task_d()
else:
return SendTaskResponse(False, "无效的任务名称")
return SendTaskResponse(True, f"任务 {task} 执行完成")
if __name__ == "__main__":
try:
rospy.init_node("send_tasks_server", anonymous=True)
rospy.loginfo("启动任务服务...")
service = rospy.Service("send_task_service", SendTask, send_task_callback)
rospy.loginfo("服务已启动,等待任务请求...")
rospy.spin()
except rospy.ROSInterruptException:
rospy.loginfo("程序被中断")