Request-Response Communication Patterns
Dr. Abdulkarim Albanna
Artificial Intelligence
University of Petra
3 hours (Week 4 of 16)
By the end of this week, students will be able to:
Services in ROS2 provide a request/response communication pattern, unlike topics which use publish/subscribe. Services are ideal for:
In this architecture:
| Feature | Topics (Pub/Sub) | Services (Req/Res) |
|---|---|---|
| Pattern | Publish/Subscribe | Request/Response |
| Communication | One-to-many | One-to-one |
| Timing | Asynchronous (fire and forget) | Synchronous (wait for response) |
| Connection | Anonymous, decoupled | Direct connection required |
| Data Flow | Continuous streaming | Discrete transactions |
| Best For | Sensor data, status updates | Commands, queries, computations |
| Example Use | Camera feed, odometry | Start navigation, compute path |
Continuous data streaming
Request-response transaction
Service definitions use .srv files with two parts separated by ---:
# Request
int64 a
int64 b
---
# Response
int64 sum
# Request
string task_type # "move", "scan", "report"
string target # target location or object
int32 priority # 1-10 (10 = highest)
---
# Response
bool success # whether task was accepted
string message # status message
int32 task_id # unique task identifier
float32 estimated_time # estimated completion time (seconds)
---)#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class AddTwoIntsServer(Node):
def __init__(self):
super().__init__('add_two_ints_server')
# Create service server
self.srv = self.create_service(
AddTwoInts, # Service type
'add_two_ints', # Service name
self.add_two_ints_callback # Callback function
)
self.get_logger().info('Add Two Ints Server is ready')
def add_two_ints_callback(self, request, response):
"""Handle incoming service requests"""
# Access request data
a = request.a
b = request.b
# Perform computation
response.sum = a + b
# Log the operation
self.get_logger().info(
f'Incoming request: {a} + {b} = {response.sum}'
)
# Return response
return response
def main(args=None):
rclpy.init(args=args)
node = AddTwoIntsServer()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import RobotTask
from queue import PriorityQueue
import threading
import time
class TaskManagementServer(Node):
def __init__(self):
super().__init__('task_management_server')
# Create service
self.srv = self.create_service(
RobotTask,
'robot_task',
self.handle_task_request
)
# Task management
self.task_queue = PriorityQueue()
self.task_counter = 0
self.current_task = None
self.lock = threading.Lock()
# Start task processor thread
self.processor_thread = threading.Thread(
target=self.process_tasks,
daemon=True
)
self.processor_thread.start()
self.get_logger().info('Task Management Server ready')
def handle_task_request(self, request, response):
"""Handle incoming task requests"""
with self.lock:
self.task_counter += 1
task_id = self.task_counter
# Validate task type
valid_tasks = ['move', 'scan', 'report']
if request.task_type not in valid_tasks:
response.success = False
response.message = f'Invalid task: {request.task_type}'
response.task_id = -1
response.estimated_time = 0.0
return response
# Calculate estimated time
time_map = {'move': 5.0, 'scan': 3.0, 'report': 1.0}
estimated_time = time_map.get(request.task_type, 1.0)
# Add to priority queue (negative for max-heap)
task_data = {
'id': task_id,
'type': request.task_type,
'target': request.target,
'priority': request.priority
}
self.task_queue.put((-request.priority, task_id, task_data))
# Prepare response
response.success = True
response.message = f'Task queued: {request.task_type}'
response.task_id = task_id
response.estimated_time = estimated_time
self.get_logger().info(
f'Task {task_id} queued: {request.task_type} '
f'to {request.target} (priority: {request.priority})'
)
return response
def process_tasks(self):
"""Background thread to process queued tasks"""
while True:
if not self.task_queue.empty():
_, task_id, task_data = self.task_queue.get()
self.execute_task(task_data)
time.sleep(0.1)
def execute_task(self, task_data):
"""Execute a single task"""
self.current_task = task_data
self.get_logger().info(
f'Executing task {task_data["id"]}: '
f'{task_data["type"]} - {task_data["target"]}'
)
# Simulate task execution
time_map = {'move': 5.0, 'scan': 3.0, 'report': 1.0}
time.sleep(time_map.get(task_data['type'], 1.0))
self.get_logger().info(
f'Task {task_data["id"]} completed'
)
self.current_task = None
def main(args=None):
rclpy.init(args=args)
server = TaskManagementServer()
rclpy.spin(server)
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
import sys
class AddTwoIntsClient(Node):
def __init__(self):
super().__init__('add_two_ints_client')
# Create client
self.client = self.create_client(
AddTwoInts, # Service type
'add_two_ints' # Service name
)
# Wait for service to be available
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting...')
self.get_logger().info('Connected to service')
def send_request(self, a, b):
"""Send a service request"""
# Create request
request = AddTwoInts.Request()
request.a = a
request.b = b
# Call service asynchronously
future = self.client.call_async(request)
# Wait for result
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
return future.result()
else:
self.get_logger().error('Service call failed')
return None
def main(args=None):
rclpy.init(args=args)
if len(sys.argv) != 3:
print('Usage: ros2 run pkg client ')
return
client = AddTwoIntsClient()
a = int(sys.argv[1])
b = int(sys.argv[2])
result = client.send_request(a, b)
if result:
client.get_logger().info(f'Result: {a} + {b} = {result.sum}')
client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import RobotTask
class TaskClient(Node):
def __init__(self):
super().__init__('task_client')
self.client = self.create_client(RobotTask, 'robot_task')
# Wait for service
if not self.client.wait_for_service(timeout_sec=5.0):
self.get_logger().error('Service not available')
raise RuntimeError('Service unavailable')
self.get_logger().info('Task client ready')
def send_task(self, task_type, target, priority):
"""Send task request asynchronously"""
request = RobotTask.Request()
request.task_type = task_type
request.target = target
request.priority = priority
future = self.client.call_async(request)
future.add_done_callback(self.response_callback)
return future
def response_callback(self, future):
"""Handle service response"""
try:
response = future.result()
if response.success:
self.get_logger().info(
f'Task accepted! ID: {response.task_id}, '
f'ETA: {response.estimated_time}s - {response.message}'
)
else:
self.get_logger().warn(
f'Task rejected: {response.message}'
)
except Exception as e:
self.get_logger().error(f'Service call failed: {e}')
def send_multiple_tasks(self):
"""Send multiple tasks"""
tasks = [
('move', 'warehouse', 8),
('scan', 'area_a', 9),
('move', 'station_1', 5),
('report', 'central', 3)
]
for task_type, target, priority in tasks:
self.send_task(task_type, target, priority)
def main(args=None):
rclpy.init(args=args)
client = TaskClient()
# Send tasks
client.send_multiple_tasks()
# Keep node alive to receive callbacks
rclpy.spin(client)
client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
| Aspect | Synchronous | Asynchronous |
|---|---|---|
| Blocking | Yes - waits for response | No - returns immediately |
| Complexity | Simple, sequential | More complex, callbacks |
| Performance | Can block other operations | Better for concurrent tasks |
| Use Case | Simple scripts, testing | Real-time systems, multiple requests |
Use Synchronous when:
Use Asynchronous when:
def send_request_with_timeout(self, request, timeout=5.0):
"""Send request with timeout handling"""
future = self.client.call_async(request)
start_time = self.get_clock().now()
while rclpy.ok():
rclpy.spin_once(self, timeout_sec=0.1)
if future.done():
try:
response = future.result()
return response
except Exception as e:
self.get_logger().error(f'Service failed: {e}')
return None
# Check timeout
elapsed = (self.get_clock().now() - start_time).nanoseconds / 1e9
if elapsed > timeout:
self.get_logger().warn('Service call timed out')
return None
return None
import time
def send_request_with_retry(self, request, max_retries=3):
"""Send request with automatic retry"""
for attempt in range(max_retries):
try:
future = self.client.call_async(request)
rclpy.spin_until_future_complete(
self,
future,
timeout_sec=5.0
)
if future.done():
response = future.result()
if response:
return response
self.get_logger().warn(
f'Attempt {attempt + 1} failed, retrying...'
)
except Exception as e:
self.get_logger().error(
f'Attempt {attempt + 1} error: {e}'
)
# Exponential backoff: 1s, 2s, 4s
time.sleep(2 ** attempt)
self.get_logger().error('All retry attempts failed')
return None