Week 4: Services and Client-Server Architecture

Request-Response Communication Patterns


Instructor

Dr. Abdulkarim Albanna

Department

Artificial Intelligence

University

University of Petra

Duration

3 hours (Week 4 of 16)

Week Overview

Learning Objectives

By the end of this week, students will be able to:

  1. Understand the request-response communication pattern
  2. Create service servers to handle requests
  3. Create service clients to send requests
  4. Work with custom service definitions
  5. Implement synchronous and asynchronous service calls
  6. Handle errors and timeouts in service communication
  7. Build a complete robot task management system
  8. Debug service communication issues

Topics Covered

  • Request-response pattern
  • Service architecture
  • Creating service servers
  • Creating service clients
  • Custom service definitions
  • Synchronous vs asynchronous calls
  • Error handling and timeouts
  • Retry mechanisms
  • Concurrent service requests
  • Debugging techniques

Introduction to Service-Based Communication

What are Services?

Services in ROS2 provide a request/response communication pattern, unlike topics which use publish/subscribe. Services are ideal for:

  • One-time operations (e.g., trigger scan, reset system)
  • Request-response interactions (e.g., compute path, get status)
  • Actions requiring acknowledgment
  • Remote procedure calls (RPC)

Service Communication Flow

Basic Service Communication
Service Client Request Service Server Response
Key Characteristics of Services
  • Synchronous: Client waits for response
  • One-to-One: Direct connection between client and server
  • Blocking: Can block execution until complete
  • Reliable: Guaranteed delivery and response

Service Architecture

Example: Robot Task Management

Robot Task Management System
Client 1 Client 2 Client 3 Task Server /robot_task Priority Queue Task Executor Robot

In this architecture:

  • Multiple clients send task requests (move, scan, report)
  • The task server receives and queues tasks by priority
  • A task executor processes tasks sequentially
  • The robot executes the actual commands
  • See example

Services vs Topics

Feature Topics (Pub/Sub) Services (Req/Res)
Pattern Publish/Subscribe Request/Response
Communication One-to-many One-to-one
Timing Asynchronous (fire and forget) Synchronous (wait for response)
Connection Anonymous, decoupled Direct connection required
Data Flow Continuous streaming Discrete transactions
Best For Sensor data, status updates Commands, queries, computations
Example Use Camera feed, odometry Start navigation, compute path
Topic Communication
Publisher Sub 1 Sub 2 Sub 3

Continuous data streaming

Service Communication
Client Request Response Server

Request-response transaction

Service Definition Files

Creating Custom Service Types

Service definitions use .srv files with two parts separated by ---:

Example: AddTwoInts.srv
# Request
int64 a
int64 b
---
# Response
int64 sum

Robot Task Service Definition

RobotTask.srv - For task management system
# Request
string task_type    # "move", "scan", "report"
string target       # target location or object
int32 priority      # 1-10 (10 = highest)
---
# Response
bool success        # whether task was accepted
string message      # status message
int32 task_id       # unique task identifier
float32 estimated_time  # estimated completion time (seconds)
Service Definition Structure
  • Request section: Parameters sent by client
  • Separator: Three dashes (---)
  • Response section: Data returned by server
  • Use standard ROS2 data types or custom messages

Creating Service Servers

Basic Service Server Example

add_two_ints_server.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts


class AddTwoIntsServer(Node):
    def __init__(self):
        super().__init__('add_two_ints_server')

        # Create service server
        self.srv = self.create_service(
            AddTwoInts,              # Service type
            'add_two_ints',          # Service name
            self.add_two_ints_callback  # Callback function
        )

        self.get_logger().info('Add Two Ints Server is ready')

    def add_two_ints_callback(self, request, response):
        """Handle incoming service requests"""
        # Access request data
        a = request.a
        b = request.b

        # Perform computation
        response.sum = a + b

        # Log the operation
        self.get_logger().info(
            f'Incoming request: {a} + {b} = {response.sum}'
        )

        # Return response
        return response


def main(args=None):
    rclpy.init(args=args)
    node = AddTwoIntsServer()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

Robot Task Server with Priority Queue

task_server.py - Complete implementation
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import RobotTask
from queue import PriorityQueue
import threading
import time


class TaskManagementServer(Node):
    def __init__(self):
        super().__init__('task_management_server')

        # Create service
        self.srv = self.create_service(
            RobotTask,
            'robot_task',
            self.handle_task_request
        )

        # Task management
        self.task_queue = PriorityQueue()
        self.task_counter = 0
        self.current_task = None
        self.lock = threading.Lock()

        # Start task processor thread
        self.processor_thread = threading.Thread(
            target=self.process_tasks,
            daemon=True
        )
        self.processor_thread.start()

        self.get_logger().info('Task Management Server ready')

    def handle_task_request(self, request, response):
        """Handle incoming task requests"""
        with self.lock:
            self.task_counter += 1
            task_id = self.task_counter

        # Validate task type
        valid_tasks = ['move', 'scan', 'report']
        if request.task_type not in valid_tasks:
            response.success = False
            response.message = f'Invalid task: {request.task_type}'
            response.task_id = -1
            response.estimated_time = 0.0
            return response

        # Calculate estimated time
        time_map = {'move': 5.0, 'scan': 3.0, 'report': 1.0}
        estimated_time = time_map.get(request.task_type, 1.0)

        # Add to priority queue (negative for max-heap)
        task_data = {
            'id': task_id,
            'type': request.task_type,
            'target': request.target,
            'priority': request.priority
        }
        self.task_queue.put((-request.priority, task_id, task_data))

        # Prepare response
        response.success = True
        response.message = f'Task queued: {request.task_type}'
        response.task_id = task_id
        response.estimated_time = estimated_time

        self.get_logger().info(
            f'Task {task_id} queued: {request.task_type} '
            f'to {request.target} (priority: {request.priority})'
        )

        return response

    def process_tasks(self):
        """Background thread to process queued tasks"""
        while True:
            if not self.task_queue.empty():
                _, task_id, task_data = self.task_queue.get()
                self.execute_task(task_data)
            time.sleep(0.1)

    def execute_task(self, task_data):
        """Execute a single task"""
        self.current_task = task_data
        self.get_logger().info(
            f'Executing task {task_data["id"]}: '
            f'{task_data["type"]} - {task_data["target"]}'
        )

        # Simulate task execution
        time_map = {'move': 5.0, 'scan': 3.0, 'report': 1.0}
        time.sleep(time_map.get(task_data['type'], 1.0))

        self.get_logger().info(
            f'Task {task_data["id"]} completed'
        )
        self.current_task = None


def main(args=None):
    rclpy.init(args=args)
    server = TaskManagementServer()
    rclpy.spin(server)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Creating Service Clients

Basic Service Client Example

add_two_ints_client.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
import sys


class AddTwoIntsClient(Node):
    def __init__(self):
        super().__init__('add_two_ints_client')

        # Create client
        self.client = self.create_client(
            AddTwoInts,      # Service type
            'add_two_ints'   # Service name
        )

        # Wait for service to be available
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service not available, waiting...')

        self.get_logger().info('Connected to service')

    def send_request(self, a, b):
        """Send a service request"""
        # Create request
        request = AddTwoInts.Request()
        request.a = a
        request.b = b

        # Call service asynchronously
        future = self.client.call_async(request)

        # Wait for result
        rclpy.spin_until_future_complete(self, future)

        if future.result() is not None:
            return future.result()
        else:
            self.get_logger().error('Service call failed')
            return None


def main(args=None):
    rclpy.init(args=args)

    if len(sys.argv) != 3:
        print('Usage: ros2 run pkg client  ')
        return

    client = AddTwoIntsClient()

    a = int(sys.argv[1])
    b = int(sys.argv[2])

    result = client.send_request(a, b)

    if result:
        client.get_logger().info(f'Result: {a} + {b} = {result.sum}')

    client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Robot Task Client with Multiple Requests

task_client.py - Asynchronous client
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import RobotTask


class TaskClient(Node):
    def __init__(self):
        super().__init__('task_client')

        self.client = self.create_client(RobotTask, 'robot_task')

        # Wait for service
        if not self.client.wait_for_service(timeout_sec=5.0):
            self.get_logger().error('Service not available')
            raise RuntimeError('Service unavailable')

        self.get_logger().info('Task client ready')

    def send_task(self, task_type, target, priority):
        """Send task request asynchronously"""
        request = RobotTask.Request()
        request.task_type = task_type
        request.target = target
        request.priority = priority

        future = self.client.call_async(request)
        future.add_done_callback(self.response_callback)

        return future

    def response_callback(self, future):
        """Handle service response"""
        try:
            response = future.result()
            if response.success:
                self.get_logger().info(
                    f'Task accepted! ID: {response.task_id}, '
                    f'ETA: {response.estimated_time}s - {response.message}'
                )
            else:
                self.get_logger().warn(
                    f'Task rejected: {response.message}'
                )
        except Exception as e:
            self.get_logger().error(f'Service call failed: {e}')

    def send_multiple_tasks(self):
        """Send multiple tasks"""
        tasks = [
            ('move', 'warehouse', 8),
            ('scan', 'area_a', 9),
            ('move', 'station_1', 5),
            ('report', 'central', 3)
        ]

        for task_type, target, priority in tasks:
            self.send_task(task_type, target, priority)


def main(args=None):
    rclpy.init(args=args)
    client = TaskClient()

    # Send tasks
    client.send_multiple_tasks()

    # Keep node alive to receive callbacks
    rclpy.spin(client)

    client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Synchronous vs Asynchronous Calls

Synchronous Call Flow
Send Request ⏳ WAIT (Blocked) Receive Response Continue Execution
Asynchronous Call Flow
Send Request ✓ Continue (Non-blocking) Do Other Work Callback Triggered
Aspect Synchronous Asynchronous
Blocking Yes - waits for response No - returns immediately
Complexity Simple, sequential More complex, callbacks
Performance Can block other operations Better for concurrent tasks
Use Case Simple scripts, testing Real-time systems, multiple requests
When to Use Each Approach

Use Synchronous when:

  • Writing simple command-line tools
  • Sequential task execution is required
  • Testing and debugging services

Use Asynchronous when:

  • Node needs to remain responsive
  • Handling multiple concurrent requests
  • Building real-time robotic systems
  • Processing callbacks from other sources

Error Handling and Timeouts

Timeout Handling

Service call with timeout
def send_request_with_timeout(self, request, timeout=5.0):
    """Send request with timeout handling"""
    future = self.client.call_async(request)

    start_time = self.get_clock().now()
    while rclpy.ok():
        rclpy.spin_once(self, timeout_sec=0.1)

        if future.done():
            try:
                response = future.result()
                return response
            except Exception as e:
                self.get_logger().error(f'Service failed: {e}')
                return None

        # Check timeout
        elapsed = (self.get_clock().now() - start_time).nanoseconds / 1e9
        if elapsed > timeout:
            self.get_logger().warn('Service call timed out')
            return None

    return None

Retry Logic with Exponential Backoff

Automatic retry with backoff
import time

def send_request_with_retry(self, request, max_retries=3):
    """Send request with automatic retry"""
    for attempt in range(max_retries):
        try:
            future = self.client.call_async(request)
            rclpy.spin_until_future_complete(
                self,
                future,
                timeout_sec=5.0
            )

            if future.done():
                response = future.result()
                if response:
                    return response

            self.get_logger().warn(
                f'Attempt {attempt + 1} failed, retrying...'
            )

        except Exception as e:
            self.get_logger().error(
                f'Attempt {attempt + 1} error: {e}'
            )

        # Exponential backoff: 1s, 2s, 4s
        time.sleep(2 ** attempt)

    self.get_logger().error('All retry attempts failed')
    return None
Timeout Best Practices
  • Set timeout to 2-5x expected execution time
  • Consider network latency in distributed systems
  • Too short: False timeouts and wasted resources
  • Too long: Poor user experience
  • Always log timeout events for debugging

Complete Example: Task Management System

Complete System Architecture
Application Layer Task Client 1 Task Client 2 Service Layer Task Server /robot_task service Business Logic Priority Queue Task Executor Hardware Robot
System Features
  • ✓ Multiple clients can submit tasks concurrently
  • ✓ Tasks are prioritized in a queue (1-10, highest first)
  • ✓ Server validates and acknowledges requests immediately
  • ✓ Task executor processes tasks sequentially
  • ✓ Three task types: move, scan, report
  • ✓ Estimated completion time provided to clients

Debugging Services

Command Line Tools

ROS2 Service Commands