Chapter 1.6: Nodes, Topics, Services, Actions
Learning Objectives
By the end of this chapter, students will be able to:
- Implement ROS 2 nodes with proper lifecycle management
- Create and use topics for publish-subscribe communication
- Design and implement services for request-reply communication
- Build action servers and clients for goal-oriented operations
- Choose appropriate communication patterns for different scenarios
Introduction
In the previous chapter, we introduced the core communication patterns of ROS 2. Now we'll dive deeper into each pattern, exploring their implementation details, best practices, and appropriate use cases. Understanding these patterns in depth is essential for building robust and efficient robot systems, particularly in complex applications like humanoid robotics where multiple subsystems must coordinate seamlessly.
The choice of communication pattern significantly impacts system performance, reliability, and maintainability. In this chapter, we'll explore each pattern with practical examples, showing how to implement them effectively and when to use each one.
Nodes: The Building Blocks of ROS 2
Node Lifecycle Management
ROS 2 provides a structured lifecycle for nodes that allows for better resource management and coordination:
import rclpy
from rclpy.lifecycle import LifecycleNode, LifecycleState, TransitionCallbackReturn
from rclpy.lifecycle import Node as rclpy_Node
class LifecycleExampleNode(LifecycleNode):
def __init__(self):
super().__init__('lifecycle_example_node')
self.get_logger().info('Lifecycle node created')
def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Configuring node')
# Initialize resources here
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Activating node')
# Activate publishers, subscribers, etc.
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Deactivating node')
# Deactivate publishers, subscribers, etc.
return TransitionCallbackReturn.SUCCESS
def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Cleaning up node')
# Release resources here
return TransitionCallbackReturn.SUCCESS
Best Practices for Node Design
- Single Responsibility: Each node should have a well-defined purpose
- Error Handling: Implement proper error handling and recovery
- Resource Management: Properly manage memory and system resources
- Logging: Use appropriate logging levels for debugging and monitoring
import rclpy
from rclpy.node import Node
import traceback
class BestPracticeNode(Node):
def __init__(self):
super().__init__('best_practice_node')
# Initialize components
try:
self.setup_components()
except Exception as e:
self.get_logger().error(f'Failed to initialize components: {e}')
self.get_logger().error(traceback.format_exc())
raise
def setup_components(self):
"""Initialize node components safely"""
self.publisher = self.create_publisher(String, 'topic', 10)
self.subscription = self.create_subscription(
String, 'input_topic', self.callback, 10)
# Timer for periodic tasks
self.timer = self.create_timer(1.0, self.timer_callback)
def callback(self, msg):
"""Process incoming messages with error handling"""
try:
# Process the message
result = self.process_message(msg)
if result:
self.publisher.publish(result)
except Exception as e:
self.get_logger().error(f'Error processing message: {e}')
self.get_logger().error(traceback.format_exc())
def timer_callback(self):
"""Handle periodic tasks with error handling"""
try:
# Perform periodic tasks
self.perform_periodic_task()
except Exception as e:
self.get_logger().error(f'Error in periodic task: {e}')
self.get_logger().error(traceback.format_exc())
Topics: Publish-Subscribe Communication
Advanced Topic Usage
Topics are ideal for streaming data like sensor readings, system status, or control commands:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32
class AdvancedTopicNode(Node):
def __init__(self):
super().__init__('advanced_topic_node')
# Multiple publishers with different QoS settings
self.cmd_vel_pub = self.create_publisher(
Twist, 'cmd_vel',
qos_profile=self.get_control_qos_profile())
self.status_pub = self.create_publisher(
String, 'status',
qos_profile=self.get_status_qos_profile())
# Multiple subscribers
self.scan_sub = self.create_subscription(
LaserScan, 'scan',
self.scan_callback,
qos_profile=self.get_sensor_qos_profile())
self.speed_sub = self.create_subscription(
Float32, 'target_speed',
self.speed_callback,
10)
def get_control_qos_profile(self):
"""QoS for control commands - reliable delivery"""
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
return QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1
)
def get_status_qos_profile(self):
"""QoS for status messages - best effort"""
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
return QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_ALL
)
def get_sensor_qos_profile(self):
"""QoS for sensor data - best effort, keep last"""
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
return QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=5
)
def scan_callback(self, msg):
"""Process laser scan data"""
# Example: Find minimum distance
min_distance = min(msg.ranges) if msg.ranges else float('inf')
# Publish control command based on sensor data
cmd = Twist()
if min_distance > 1.0:
cmd.linear.x = 0.5 # Move forward
else:
cmd.angular.z = 0.5 # Turn to avoid
self.cmd_vel_pub.publish(cmd)
def speed_callback(self, msg):
"""Update target speed"""
self.target_speed = msg.data
Topic Synchronization
For systems that need to process multiple streams of data together:
from rclpy.qos import QoSProfile
from message_filters import ApproximateTimeSynchronizer, Subscriber
from sensor_msgs.msg import Image, CameraInfo
class SynchronizedNode(Node):
def __init__(self):
super().__init__('synchronized_node')
# Create subscribers
self.image_sub = Subscriber(
self, Image, 'image_raw',
qos_profile=QoSProfile(depth=10))
self.info_sub = Subscriber(
self, CameraInfo, 'camera_info',
qos_profile=QoSProfile(depth=10))
# Synchronize messages with a time tolerance of 0.1 seconds
self.sync = ApproximateTimeSynchronizer(
[self.image_sub, self.info_sub],
queue_size=10,
slop=0.1) # Time tolerance in seconds
self.sync.registerCallback(self.sync_callback)
def sync_callback(self, image_msg, info_msg):
"""Process synchronized image and camera info"""
# Process the synchronized data
self.process_image_with_calibration(image_msg, info_msg)
Services: Request-Reply Communication
Service Implementation Best Practices
Services are appropriate for operations that have a clear request-response pattern:
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
import threading
from concurrent.futures import ThreadPoolExecutor
class ServiceNode(Node):
def __init__(self):
super().__init__('service_node')
# Create service server with custom executor for async handling
self.srv = self.create_service(
AddTwoInts,
'add_two_ints',
self.add_two_ints_callback)
# Thread pool for handling long-running requests
self.executor = ThreadPoolExecutor(max_workers=4)
def add_two_ints_callback(self, request, response):
"""Handle service requests"""
self.get_logger().info(f'Request: {request.a} + {request.b}')
# Perform computation (could be complex)
try:
result = self.complex_calculation(request.a, request.b)
response.sum = result
except Exception as e:
self.get_logger().error(f'Calculation error: {e}')
response.sum = 0 # Default value on error
self.get_logger().info(f'Response: {response.sum}')
return response
def complex_calculation(self, a, b):
"""Simulate a potentially complex calculation"""
# In a real implementation, this might involve:
# - Complex mathematical operations
# - Database queries
# - External API calls
# - File operations
return a + b
Service Client Implementation
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
from rclpy.qos import QoSProfile
class ServiceClientNode(Node):
def __init__(self):
super().__init__('service_client_node')
# Create client
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
# Wait for service to be available
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting again...')
# Timer to periodically call the service
self.timer = self.create_timer(2.0, self.call_service)
self.req = AddTwoInts.Request()
def call_service(self):
"""Call the service asynchronously"""
self.req.a = 42
self.req.b = 24
# Make asynchronous call
self.future = self.cli.call_async(self.req)
self.future.add_done_callback(self.service_callback)
def service_callback(self, future):
"""Handle service response"""
try:
response = future.result()
self.get_logger().info(f'Result of add_two_ints: {response.sum}')
except Exception as e:
self.get_logger().error(f'Service call failed: {e}')
Actions: Goal-Oriented Communication
Action Server Implementation
Actions are ideal for long-running tasks that provide feedback and can be preempted:
import rclpy
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.node import Node
from example_interfaces.action import Fibonacci
import time
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
# Create action server
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback
)
def goal_callback(self, goal_request):
"""Accept or reject a goal"""
self.get_logger().info('Received goal request')
# Accept all goals for this example
return GoalResponse.ACCEPT
def cancel_callback(self, goal_handle):
"""Accept or reject a cancel request"""
self.get_logger().info('Received cancel request')
# Accept all cancel requests for this example
return CancelResponse.ACCEPT
async def execute_callback(self, goal_handle):
"""Execute the goal"""
self.get_logger().info('Executing goal...')
# Get the goal order
order = goal_handle.request.order
# Feedback and result messages
feedback_msg = Fibonacci.Feedback()
result_msg = Fibonacci.Result()
# Initialize the sequence
feedback_msg.sequence = [0, 1]
# Check if order is small enough to process
if order < 2:
result_msg.sequence = feedback_msg.sequence[:order+1]
goal_handle.succeed()
return result_msg
# Process the sequence with feedback
for i in range(1, order):
# Check if cancel was requested
if goal_handle.is_cancel_requested:
result_msg.sequence = feedback_msg.sequence
goal_handle.canceled()
self.get_logger().info('Goal canceled')
return result_msg
# Calculate next Fibonacci number
next_fib = feedback_msg.sequence[i] + feedback_msg.sequence[i-1]
feedback_msg.sequence.append(next_fib)
# Publish feedback
goal_handle.publish_feedback(feedback_msg)
self.get_logger().info(f'Feedback: {feedback_msg.sequence}')
# Simulate processing time
time.sleep(0.5)
# Complete successfully
goal_handle.succeed()
result_msg.sequence = feedback_msg.sequence
self.get_logger().info(f'Result: {result_msg.sequence}')
return result_msg
Action Client Implementation
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from example_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
# Create action client
self._action_client = ActionClient(
self,
Fibonacci,
'fibonacci'
)
def send_goal(self, order):
"""Send a goal to the action server"""
# Wait for action server
self._action_client.wait_for_server()
# Create goal request
goal_msg = Fibonacci.Goal()
goal_msg.order = order
# Send goal asynchronously
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""Handle goal response"""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted')
# Get result asynchronously
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def feedback_callback(self, feedback_msg):
"""Handle feedback"""
self.get_logger().info(f'Received feedback: {feedback_msg.feedback.sequence}')
def get_result_callback(self, future):
"""Handle result"""
result = future.result().result
self.get_logger().info(f'Result: {result.sequence}')
# Shutdown after receiving result
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
# Send a goal
action_client.send_goal(10)
# Spin to process callbacks
rclpy.spin(action_client)
Choosing the Right Communication Pattern
When to Use Each Pattern
| Pattern | Use Case | Characteristics |
|---|---|---|
| Topics | Streaming data, broadcasting | Asynchronous, many-to-many, real-time |
| Services | Request-response operations | Synchronous, one-to-one, definitive result |
| Actions | Long-running, goal-oriented tasks | Asynchronous, with feedback and preemption |
Decision Framework
def choose_communication_pattern(requirements):
"""
Decision framework for choosing communication pattern
"""
if requirements.is_streaming:
return "Use Topics (publish-subscribe)"
if requirements.is_request_response and requirements.is_short_running:
return "Use Services (request-reply)"
if (requirements.is_long_running or
requirements.needs_feedback or
requirements.can_be_preempted):
return "Use Actions (goal-oriented)"
if requirements.is_configuration:
return "Use Parameters (configuration)"
return "Default to Topics for general communication"
Hands-On Exercise: Communication Pattern Selection
Objective
Design and implement a robot navigation system using appropriate communication patterns.
Prerequisites
- ROS 2 Humble installed
- Understanding of all communication patterns
- TurtleBot3 simulation environment
Steps
- Analyze the navigation system requirements
- Identify which components need which communication patterns
- Implement the navigation system using appropriate patterns:
- Topics for sensor data streaming
- Services for map queries
- Actions for navigation goals
- Test the system with different scenarios
Expected Result
Students will implement a navigation system that correctly uses topics, services, and actions based on the requirements of each component.
Assessment Questions
Multiple Choice
Q1: Which communication pattern is most appropriate for a robot's laser scan data that multiple nodes need to process simultaneously?
- a) Service
- b) Action
- c) Topic
- d) Parameter
Details
Click to reveal answer
Answer: cExplanation: Topics use the publish-subscribe pattern which allows multiple nodes to receive the same data stream simultaneously, making them ideal for sensor data.
Short Answer
Q2: Explain when you would use an action instead of a service in a robot system, providing a specific example.
Practical Exercise
Q3: Create a ROS 2 system that includes all three communication patterns: (1) a topic publisher for sensor data, (2) a service server for configuration requests, and (3) an action server for executing navigation goals. Demonstrate how they work together in a coordinated robot behavior.
Further Reading
- "Effective ROS 2" - Best practices for ROS 2 development
- "ROS 2 Design Patterns" - Architectural patterns for ROS 2 systems
- "Robot Systems Design" - Communication patterns in robotics
Summary
In this chapter, we've explored the three primary communication patterns in ROS 2 in detail: topics, services, and actions. We've learned how to implement each pattern effectively, understand their appropriate use cases, and design systems that combine them appropriately.
Choosing the right communication pattern is crucial for building efficient and maintainable robot systems. Topics are ideal for streaming data, services for request-response operations, and actions for long-running, goal-oriented tasks. Understanding these patterns is fundamental to creating complex robot systems that can operate effectively in the real world.
In the next chapter, we'll explore how to build ROS 2 packages with Python, implementing these communication patterns in practical applications.