Week 3: ROS2 Publishers & Subscribers

Topic Communication and Data Streaming


Instructor

Dr. Abdulkarim Albanna

Department

Artificial Intelligence

University

University of Petra

Duration

3 hours (Week 3 of 16)

Week Overview

Learning Objectives

By the end of this week, students will be able to:

  1. Understand the publish-subscribe communication pattern
  2. Create publishers to send data on topics
  3. Create subscribers to receive data from topics
  4. Work with standard ROS2 message types
  5. Handle callback functions effectively
  6. Implement real-time data processing
  7. Build a multi-sensor data fusion system
  8. Debug topic communication issues

Topics Covered

  • Publish-subscribe pattern
  • Topic communication
  • Creating publishers
  • Creating subscribers
  • Message types
  • Callback functions
  • Timer-based publishing
  • Data processing
  • Multi-topic communication
  • Debugging techniques

Understanding Topic Communication

ROS2 Nodes in Practice

A node in ROS2 is a process representing the smallest program unit within the framework. Each node runs in its own independent memory space and is designed for a specific purpose, such as controlling a motor, capturing images from a camera, or performing path planning. Nodes communicate with one another by exchanging messages, facilitating collaborative functionality within the robotic system.

Example 1: Computer Vision Pipeline
Camera_Driver Filter Feature_Extraction Object_Detection
  • A camera_driver node captures raw image data
  • A Filter node processes these images to enhance their quality
  • A feature_extraction node analyzes the processed image to identify relevant patterns
  • An object_detection node uses these features to locate and identify objects
Example 2: Navigation System
Path_Planning Motor_Driver
  • A path_planning node calculates the optimal route for navigation
  • A Motor_Driver node translates the planned path into motion by controlling the robot's motors

The Publish-Subscribe Pattern

The publish-subscribe (pub-sub) pattern is a messaging paradigm where:

  • Publishers send messages to topics
  • Subscribers receive messages from topics
  • Communication is anonymous and decoupled
  • Multiple publishers and subscribers can use the same topic
  • No direct connection between publisher and subscriber

Node Communication Patterns

ROS2 supports different communication patterns between nodes. Understanding these patterns is crucial for designing efficient robotic systems.

Pattern 1: One-to-One (1:1)

One publisher sends data to exactly one subscriber.

1:1 Communication
Publisher Topic: /data Subscriber

Use Case: Direct sensor-to-controller communication, dedicated data streams.

Pattern 2: One-to-Many (1:M)

One publisher broadcasts data to multiple subscribers.

1:M Communication
Publisher Topic: /sensor Subscriber 1 Subscriber 2 Subscriber 3

Use Case: Broadcasting sensor data, system status updates, camera feeds.

Pattern 3: Many-to-One (M:1)

Multiple publishers send data to one subscriber (data aggregation).

M:1 Communication
Publisher 1 Publisher 2 Publisher 3 Topic: /sensors Aggregator

Use Case: Sensor fusion, multi-robot coordination, distributed data collection.

Pattern 4: Many-to-Many (M:M)

Multiple publishers and multiple subscribers share the same topic.

M:M Communication
Publisher 1 Publisher 2 Topic: /data Subscriber 1 Subscriber 2 Subscriber 3

Use Case: Multi-robot systems, distributed logging, collaborative tasks.

Communication Pattern Comparison

Pattern Publishers Subscribers Best Use Case
1:1 1 1 Direct dedicated communication
1:M 1 Many Broadcasting data to multiple nodes
M:1 Many 1 Data aggregation and fusion
M:M Many Many Complex distributed systems

Key Concepts

Concept Description
Topic Named channel for messages
Message Data structure sent over topic
Publisher Node that sends messages
Subscriber Node that receives messages
Callback Function called when message arrives
Queue Size Number of messages buffered

ROS2 Message Types

Standard Message Packages

ROS2 provides several standard message packages:

  • std_msgs: Basic data types (Int, Float, String, Bool)
  • geometry_msgs: Geometric data (Pose, Twist, Transform)
  • sensor_msgs: Sensor data (Image, LaserScan, Imu, Temperature)
  • nav_msgs: Navigation data (Odometry, Path, OccupancyGrid)

Common Message Types

Package Message Purpose
std_msgs String Text messages
std_msgs Int32 Integer values
std_msgs Float32 Floating point values
std_msgs Bool Boolean values
geometry_msgs Twist Velocity commands (linear & angular)
geometry_msgs Pose Position and orientation
sensor_msgs Temperature Temperature readings
sensor_msgs Imu IMU sensor data
sensor_msgs Image Camera images

Message Structure Example

geometry_msgs/Twist - Used for velocity commands
# Example: geometry_msgs/Twist
# Used for velocity commands

geometry_msgs/Vector3 linear
  float64 x  # Forward/backward velocity
  float64 y  # Left/right velocity
  float64 z  # Up/down velocity

geometry_msgs/Vector3 angular
  float64 x  # Roll rate
  float64 y  # Pitch rate
  float64 z  # Yaw rate (turning)

Creating a Publisher

Publisher Basics

A publisher node sends messages to a topic at regular or event-driven intervals.

Simple Publisher Example

temperature_publisher.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Temperature
import random


class TemperaturePublisher(Node):
    def __init__(self):
        super().__init__('temperature_publisher')

        # Create publisher
        self.publisher_ = self.create_publisher(
            Temperature,        # Message type
            'temperature_data', # Topic name
            10                  # Queue size
        )

        # Create timer (publish every 1 second)
        self.timer_ = self.create_timer(1.0, self.publish_temperature)

        self.get_logger().info('Temperature Publisher started')

    def publish_temperature(self):
        # Create message
        msg = Temperature()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'temperature_sensor'
        msg.temperature = 20.0 + random.uniform(-5.0, 5.0)  # Simulate sensor
        msg.variance = 0.5

        # Publish message
        self.publisher_.publish(msg)
        self.get_logger().info(f'Published temperature: {msg.temperature:.2f} C')


def main(args=None):
    rclpy.init(args=args)
    node = TemperaturePublisher()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

Publisher Code Breakdown

Creating a Publisher
Creating Publisher
self.publisher_ = self.create_publisher(
    Temperature,        # Message type
    'temperature_data', # Topic name
    10                  # Queue size (QoS)
)

Parameters:

  • msg_type: The message class to publish
  • topic: Name of the topic (string)
  • qos_profile: Queue size or QoS settings
Creating a Timer
Creating Timer
self.timer_ = self.create_timer(
    1.0,                        # Period in seconds
    self.publish_temperature    # Callback function
)
Publishing a Message
Publishing Message
# Create message instance
msg = Temperature()

# Fill message fields
msg.temperature = 25.5
msg.variance = 0.5

# Publish
self.publisher_.publish(msg)

Creating a Subscriber

Subscriber Basics

A subscriber node receives messages from a topic and processes them in a callback function.

Simple Subscriber Example

temperature_subscriber.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Temperature


class TemperatureSubscriber(Node):
    def __init__(self):
        super().__init__('temperature_subscriber')

        # Create subscriber
        self.subscription_ = self.create_subscription(
            Temperature,              # Message type
            'temperature_data',       # Topic name
            self.temperature_callback,  # Callback function
            10                        # Queue size
        )

        self.get_logger().info('Temperature Subscriber started')

    def temperature_callback(self, msg):
        """Called when a message is received"""
        temp = msg.temperature
        self.get_logger().info(f'Received temperature: {temp:.2f} C')

        # Process data
        if temp > 30.0:
            self.get_logger().warn('High temperature detected!')
        elif temp < 10.0:
            self.get_logger().warn('Low temperature detected!')


def main(args=None):
    rclpy.init(args=args)
    node = TemperatureSubscriber()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

Subscriber Code Breakdown

Creating a Subscriber
Creating Subscriber
self.subscription_ = self.create_subscription(
    Temperature,              # Message type
    'temperature_data',       # Topic name
    self.temperature_callback,  # Callback function
    10                        # Queue size
)
Callback Function
Callback Function
def temperature_callback(self, msg):
    """Automatically called when message arrives"""
    # Access message data
    temperature = msg.temperature
    variance = msg.variance

    # Process the data
    self.get_logger().info(f'Temperature: {temperature}')
Callback Function Rules
  • Must accept one parameter: the message
  • Called automatically when message arrives
  • Should be fast (non-blocking)
  • Avoid long computations
  • Don't use time.sleep()

Quality of Service (QoS)

Understanding QoS

Quality of Service (QoS) policies control how messages are delivered between publishers and subscribers.

Policy Description
Reliability Reliable (guaranteed) or Best Effort
Durability Transient Local (keep last) or Volatile
History Keep Last N or Keep All
Depth Queue size (number of messages)
Lifespan How long messages are valid
Deadline Maximum time between messages

QoS Profiles

QoS Configuration
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy

# Custom QoS
qos_profile = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=10
)

# Use with publisher
self.publisher_ = self.create_publisher(
    Temperature,
    'temperature',
    qos_profile
)

# Predefined profiles
from rclpy.qos import qos_profile_sensor_data
from rclpy.qos import qos_profile_system_default

self.sensor_pub_ = self.create_publisher(
    Temperature,
    'temperature',
    qos_profile_sensor_data
)

Debugging Topics

Command Line Tools

ROS2 Topic Commands
# List all topics
ros2 topic list

# Show topic information
ros2 topic info /temperature

# Echo topic messages
ros2 topic echo /temperature

# Show message type
ros2 topic type /temperature

# Publish from command line
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
  "{linear: {x: 1.0}, angular: {z: 0.5}}"

# Check publishing rate
ros2 topic hz /temperature

# Bandwidth usage
ros2 topic bw /camera/image_raw

# Find topics by type
ros2 topic find sensor_msgs/msg/Temperature

Common Issues and Solutions

Topic Not Found

Problem: Subscriber doesn't receive messages

Solutions:

  • Check topic names match exactly (case-sensitive)
  • Verify publisher is running: ros2 node list
  • Check QoS compatibility
  • Ensure ROS_DOMAIN_ID matches
QoS Incompatibility

Problem: Messages not received despite matching topics

Solution:

  • Publisher and subscriber must have compatible QoS
  • Use ros2 topic info /topic_name to check QoS
  • Match reliability settings

Best Practices

Publisher Best Practices

  • Use descriptive topic names: /robot/sensors/temperature
  • Set appropriate publishing rates
  • Fill all message fields properly
  • Include timestamps in header
  • Use appropriate QoS for data type
  • Log important events

Subscriber Best Practices

  • Keep callbacks fast and non-blocking
  • Don't use time.sleep() in callbacks
  • Handle missing or invalid data gracefully
  • Use class variables to store latest data
  • Implement proper error handling
  • Log warnings for unexpected data

Code Organization

Best Practice Structure
class RobotNode(Node):
    def __init__(self):
        super().__init__('robot_node')

        # Publishers
        self._setup_publishers()

        # Subscribers
        self._setup_subscribers()

        # Timers
        self._setup_timers()

        # State variables
        self._initialize_state()

        self.get_logger().info('Node initialized')

    def _setup_publishers(self):
        """Initialize all publishers"""
        self.cmd_pub_ = self.create_publisher(...)

    def _setup_subscribers(self):
        """Initialize all subscribers"""
        self.sensor_sub_ = self.create_subscription(...)

    def _setup_timers(self):
        """Initialize all timers"""
        self.timer_ = self.create_timer(...)

    def _initialize_state(self):
        """Initialize state variables"""
        self.robot_state_ = "idle"

Week 3 Project

Project: Multi-Sensor Data Fusion System

Create a complete sensor monitoring and fusion system for a simulated robot.

Requirements

Package Name: sensor_fusion_system

Nodes to Implement:
  1. Sensor Publisher Node (sensor_publisher.py)
    • Sensor Publisher Node (sensor_publisher.py)
      • Publish IMU data (10 Hz)
      • Publish GPS data (1 Hz)
      • Publish Temperature data (2 Hz)
      • Publish Barometer/Altitude data (2 Hz)
      • Simulate realistic sensor noise
    • Data Logger Node (data_logger.py)
      • Subscribe to all sensor topics
      • Log data with timestamps
      • Calculate statistics (min, max, average)
      • Print summary every 5 seconds
    • Alert System Node (alert_system.py)
      • Monitor all sensors
      • Detect anomalies (out of range values)
      • Publish alerts on /alerts topic
      • Log warnings for critical conditions
    • Fusion Node (sensor_fusion.py)
      • Subscribe to multiple sensors
      • Combine data for state estimation
      • Publish fused output
      • Calculate altitude from barometer

Technical Requirements

  • Use appropriate message types from standard packages
  • Implement proper error handling
  • Include comprehensive logging
  • Use timers for periodic operations
  • Follow ROS2 naming conventions
  • Document all functions

Bonus Challenges (+10 points)

  • Implement data filtering (moving average)
  • Add data visualization (plot over time)
  • Create launch file for all nodes
  • Implement QoS profiles appropriately
  • Add parameter configuration
  • Write unit tests

Deliverables

  • Source Code: Complete package with all 4 nodes
  • Documentation:
    • README with setup instructions
    • Node architecture diagram
    • Topic diagram showing all connections
    • API documentation for each node
  • Screenshots:
    • All nodes running simultaneously
    • Topic list output
    • Sample log outputs
    • Alert system in action
  • Demo Video: 3-5 minutes showing system operation

Grading Rubric

Component Points
Sensor Publisher Node 20
Data Logger Node 20
Alert System Node 20
Fusion Node 20
Code Quality & Documentation 10
Testing & Demonstration 10
Bonus Challenges +10
Total 100 (+10)

Resources & References

Official Documentation

Next Week Preview

Week 4: Services and Clients

  • Request-response communication
  • Creating service servers
  • Creating service clients
  • Synchronous vs asynchronous calls
  • Custom service definitions
  • Error handling in services
Preparation

Complete Week 3 project and review callback functions

Summary

This week covered topic-based communication in ROS2, including publishers, subscribers, message types, and data fusion concepts. We implemented practical examples based on the Computer Vision Pipeline and Navigation System architectures.

Key Takeaways
  • Publishers and subscribers enable decoupled communication
  • Topics are named channels for message exchange
  • Callbacks handle incoming messages asynchronously
  • QoS policies control message delivery guarantees
  • Multiple communication patterns support different architectures
  • Proper debugging tools help troubleshoot topic issues