Skip to main content

Chapter 1.6: Nodes, Topics, Services, Actions

Learning Objectives

By the end of this chapter, students will be able to:

  • Implement ROS 2 nodes with proper lifecycle management
  • Create and use topics for publish-subscribe communication
  • Design and implement services for request-reply communication
  • Build action servers and clients for goal-oriented operations
  • Choose appropriate communication patterns for different scenarios

Introduction

In the previous chapter, we introduced the core communication patterns of ROS 2. Now we'll dive deeper into each pattern, exploring their implementation details, best practices, and appropriate use cases. Understanding these patterns in depth is essential for building robust and efficient robot systems, particularly in complex applications like humanoid robotics where multiple subsystems must coordinate seamlessly.

The choice of communication pattern significantly impacts system performance, reliability, and maintainability. In this chapter, we'll explore each pattern with practical examples, showing how to implement them effectively and when to use each one.

Nodes: The Building Blocks of ROS 2

Node Lifecycle Management

ROS 2 provides a structured lifecycle for nodes that allows for better resource management and coordination:

import rclpy
from rclpy.lifecycle import LifecycleNode, LifecycleState, TransitionCallbackReturn
from rclpy.lifecycle import Node as rclpy_Node

class LifecycleExampleNode(LifecycleNode):
def __init__(self):
super().__init__('lifecycle_example_node')
self.get_logger().info('Lifecycle node created')

def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Configuring node')
# Initialize resources here
return TransitionCallbackReturn.SUCCESS

def on_activate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Activating node')
# Activate publishers, subscribers, etc.
return TransitionCallbackReturn.SUCCESS

def on_deactivate(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Deactivating node')
# Deactivate publishers, subscribers, etc.
return TransitionCallbackReturn.SUCCESS

def on_cleanup(self, state: LifecycleState) -> TransitionCallbackReturn:
self.get_logger().info('Cleaning up node')
# Release resources here
return TransitionCallbackReturn.SUCCESS

Best Practices for Node Design

  1. Single Responsibility: Each node should have a well-defined purpose
  2. Error Handling: Implement proper error handling and recovery
  3. Resource Management: Properly manage memory and system resources
  4. Logging: Use appropriate logging levels for debugging and monitoring
import rclpy
from rclpy.node import Node
import traceback

class BestPracticeNode(Node):
def __init__(self):
super().__init__('best_practice_node')

# Initialize components
try:
self.setup_components()
except Exception as e:
self.get_logger().error(f'Failed to initialize components: {e}')
self.get_logger().error(traceback.format_exc())
raise

def setup_components(self):
"""Initialize node components safely"""
self.publisher = self.create_publisher(String, 'topic', 10)
self.subscription = self.create_subscription(
String, 'input_topic', self.callback, 10)

# Timer for periodic tasks
self.timer = self.create_timer(1.0, self.timer_callback)

def callback(self, msg):
"""Process incoming messages with error handling"""
try:
# Process the message
result = self.process_message(msg)
if result:
self.publisher.publish(result)
except Exception as e:
self.get_logger().error(f'Error processing message: {e}')
self.get_logger().error(traceback.format_exc())

def timer_callback(self):
"""Handle periodic tasks with error handling"""
try:
# Perform periodic tasks
self.perform_periodic_task()
except Exception as e:
self.get_logger().error(f'Error in periodic task: {e}')
self.get_logger().error(traceback.format_exc())

Topics: Publish-Subscribe Communication

Advanced Topic Usage

Topics are ideal for streaming data like sensor readings, system status, or control commands:

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32

class AdvancedTopicNode(Node):
def __init__(self):
super().__init__('advanced_topic_node')

# Multiple publishers with different QoS settings
self.cmd_vel_pub = self.create_publisher(
Twist, 'cmd_vel',
qos_profile=self.get_control_qos_profile())

self.status_pub = self.create_publisher(
String, 'status',
qos_profile=self.get_status_qos_profile())

# Multiple subscribers
self.scan_sub = self.create_subscription(
LaserScan, 'scan',
self.scan_callback,
qos_profile=self.get_sensor_qos_profile())

self.speed_sub = self.create_subscription(
Float32, 'target_speed',
self.speed_callback,
10)

def get_control_qos_profile(self):
"""QoS for control commands - reliable delivery"""
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
return QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1
)

def get_status_qos_profile(self):
"""QoS for status messages - best effort"""
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
return QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_ALL
)

def get_sensor_qos_profile(self):
"""QoS for sensor data - best effort, keep last"""
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
return QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=5
)

def scan_callback(self, msg):
"""Process laser scan data"""
# Example: Find minimum distance
min_distance = min(msg.ranges) if msg.ranges else float('inf')

# Publish control command based on sensor data
cmd = Twist()
if min_distance > 1.0:
cmd.linear.x = 0.5 # Move forward
else:
cmd.angular.z = 0.5 # Turn to avoid

self.cmd_vel_pub.publish(cmd)

def speed_callback(self, msg):
"""Update target speed"""
self.target_speed = msg.data

Topic Synchronization

For systems that need to process multiple streams of data together:

from rclpy.qos import QoSProfile
from message_filters import ApproximateTimeSynchronizer, Subscriber
from sensor_msgs.msg import Image, CameraInfo

class SynchronizedNode(Node):
def __init__(self):
super().__init__('synchronized_node')

# Create subscribers
self.image_sub = Subscriber(
self, Image, 'image_raw',
qos_profile=QoSProfile(depth=10))
self.info_sub = Subscriber(
self, CameraInfo, 'camera_info',
qos_profile=QoSProfile(depth=10))

# Synchronize messages with a time tolerance of 0.1 seconds
self.sync = ApproximateTimeSynchronizer(
[self.image_sub, self.info_sub],
queue_size=10,
slop=0.1) # Time tolerance in seconds
self.sync.registerCallback(self.sync_callback)

def sync_callback(self, image_msg, info_msg):
"""Process synchronized image and camera info"""
# Process the synchronized data
self.process_image_with_calibration(image_msg, info_msg)

Services: Request-Reply Communication

Service Implementation Best Practices

Services are appropriate for operations that have a clear request-response pattern:

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
import threading
from concurrent.futures import ThreadPoolExecutor

class ServiceNode(Node):
def __init__(self):
super().__init__('service_node')

# Create service server with custom executor for async handling
self.srv = self.create_service(
AddTwoInts,
'add_two_ints',
self.add_two_ints_callback)

# Thread pool for handling long-running requests
self.executor = ThreadPoolExecutor(max_workers=4)

def add_two_ints_callback(self, request, response):
"""Handle service requests"""
self.get_logger().info(f'Request: {request.a} + {request.b}')

# Perform computation (could be complex)
try:
result = self.complex_calculation(request.a, request.b)
response.sum = result
except Exception as e:
self.get_logger().error(f'Calculation error: {e}')
response.sum = 0 # Default value on error

self.get_logger().info(f'Response: {response.sum}')
return response

def complex_calculation(self, a, b):
"""Simulate a potentially complex calculation"""
# In a real implementation, this might involve:
# - Complex mathematical operations
# - Database queries
# - External API calls
# - File operations
return a + b

Service Client Implementation

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
from rclpy.qos import QoSProfile

class ServiceClientNode(Node):
def __init__(self):
super().__init__('service_client_node')

# Create client
self.cli = self.create_client(AddTwoInts, 'add_two_ints')

# Wait for service to be available
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting again...')

# Timer to periodically call the service
self.timer = self.create_timer(2.0, self.call_service)
self.req = AddTwoInts.Request()

def call_service(self):
"""Call the service asynchronously"""
self.req.a = 42
self.req.b = 24

# Make asynchronous call
self.future = self.cli.call_async(self.req)
self.future.add_done_callback(self.service_callback)

def service_callback(self, future):
"""Handle service response"""
try:
response = future.result()
self.get_logger().info(f'Result of add_two_ints: {response.sum}')
except Exception as e:
self.get_logger().error(f'Service call failed: {e}')

Actions: Goal-Oriented Communication

Action Server Implementation

Actions are ideal for long-running tasks that provide feedback and can be preempted:

import rclpy
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.node import Node
from example_interfaces.action import Fibonacci
import time

class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')

# Create action server
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback
)

def goal_callback(self, goal_request):
"""Accept or reject a goal"""
self.get_logger().info('Received goal request')
# Accept all goals for this example
return GoalResponse.ACCEPT

def cancel_callback(self, goal_handle):
"""Accept or reject a cancel request"""
self.get_logger().info('Received cancel request')
# Accept all cancel requests for this example
return CancelResponse.ACCEPT

async def execute_callback(self, goal_handle):
"""Execute the goal"""
self.get_logger().info('Executing goal...')

# Get the goal order
order = goal_handle.request.order

# Feedback and result messages
feedback_msg = Fibonacci.Feedback()
result_msg = Fibonacci.Result()

# Initialize the sequence
feedback_msg.sequence = [0, 1]

# Check if order is small enough to process
if order < 2:
result_msg.sequence = feedback_msg.sequence[:order+1]
goal_handle.succeed()
return result_msg

# Process the sequence with feedback
for i in range(1, order):
# Check if cancel was requested
if goal_handle.is_cancel_requested:
result_msg.sequence = feedback_msg.sequence
goal_handle.canceled()
self.get_logger().info('Goal canceled')
return result_msg

# Calculate next Fibonacci number
next_fib = feedback_msg.sequence[i] + feedback_msg.sequence[i-1]
feedback_msg.sequence.append(next_fib)

# Publish feedback
goal_handle.publish_feedback(feedback_msg)
self.get_logger().info(f'Feedback: {feedback_msg.sequence}')

# Simulate processing time
time.sleep(0.5)

# Complete successfully
goal_handle.succeed()
result_msg.sequence = feedback_msg.sequence
self.get_logger().info(f'Result: {result_msg.sequence}')

return result_msg

Action Client Implementation

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from example_interfaces.action import Fibonacci

class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')

# Create action client
self._action_client = ActionClient(
self,
Fibonacci,
'fibonacci'
)

def send_goal(self, order):
"""Send a goal to the action server"""
# Wait for action server
self._action_client.wait_for_server()

# Create goal request
goal_msg = Fibonacci.Goal()
goal_msg.order = order

# Send goal asynchronously
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)

self._send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
"""Handle goal response"""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return

self.get_logger().info('Goal accepted')

# Get result asynchronously
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)

def feedback_callback(self, feedback_msg):
"""Handle feedback"""
self.get_logger().info(f'Received feedback: {feedback_msg.feedback.sequence}')

def get_result_callback(self, future):
"""Handle result"""
result = future.result().result
self.get_logger().info(f'Result: {result.sequence}')

# Shutdown after receiving result
rclpy.shutdown()

def main(args=None):
rclpy.init(args=args)

action_client = FibonacciActionClient()

# Send a goal
action_client.send_goal(10)

# Spin to process callbacks
rclpy.spin(action_client)

Choosing the Right Communication Pattern

When to Use Each Pattern

PatternUse CaseCharacteristics
TopicsStreaming data, broadcastingAsynchronous, many-to-many, real-time
ServicesRequest-response operationsSynchronous, one-to-one, definitive result
ActionsLong-running, goal-oriented tasksAsynchronous, with feedback and preemption

Decision Framework

def choose_communication_pattern(requirements):
"""
Decision framework for choosing communication pattern
"""
if requirements.is_streaming:
return "Use Topics (publish-subscribe)"

if requirements.is_request_response and requirements.is_short_running:
return "Use Services (request-reply)"

if (requirements.is_long_running or
requirements.needs_feedback or
requirements.can_be_preempted):
return "Use Actions (goal-oriented)"

if requirements.is_configuration:
return "Use Parameters (configuration)"

return "Default to Topics for general communication"

Hands-On Exercise: Communication Pattern Selection

Objective

Design and implement a robot navigation system using appropriate communication patterns.

Prerequisites

  • ROS 2 Humble installed
  • Understanding of all communication patterns
  • TurtleBot3 simulation environment

Steps

  1. Analyze the navigation system requirements
  2. Identify which components need which communication patterns
  3. Implement the navigation system using appropriate patterns:
    • Topics for sensor data streaming
    • Services for map queries
    • Actions for navigation goals
  4. Test the system with different scenarios

Expected Result

Students will implement a navigation system that correctly uses topics, services, and actions based on the requirements of each component.

Assessment Questions

Multiple Choice

Q1: Which communication pattern is most appropriate for a robot's laser scan data that multiple nodes need to process simultaneously?

  • a) Service
  • b) Action
  • c) Topic
  • d) Parameter
Details

Click to reveal answer Answer: c
Explanation: Topics use the publish-subscribe pattern which allows multiple nodes to receive the same data stream simultaneously, making them ideal for sensor data.

Short Answer

Q2: Explain when you would use an action instead of a service in a robot system, providing a specific example.

Practical Exercise

Q3: Create a ROS 2 system that includes all three communication patterns: (1) a topic publisher for sensor data, (2) a service server for configuration requests, and (3) an action server for executing navigation goals. Demonstrate how they work together in a coordinated robot behavior.

Further Reading

  1. "Effective ROS 2" - Best practices for ROS 2 development
  2. "ROS 2 Design Patterns" - Architectural patterns for ROS 2 systems
  3. "Robot Systems Design" - Communication patterns in robotics

Summary

In this chapter, we've explored the three primary communication patterns in ROS 2 in detail: topics, services, and actions. We've learned how to implement each pattern effectively, understand their appropriate use cases, and design systems that combine them appropriately.

Choosing the right communication pattern is crucial for building efficient and maintainable robot systems. Topics are ideal for streaming data, services for request-response operations, and actions for long-running, goal-oriented tasks. Understanding these patterns is fundamental to creating complex robot systems that can operate effectively in the real world.

In the next chapter, we'll explore how to build ROS 2 packages with Python, implementing these communication patterns in practical applications.