Topic Communication and Data Streaming
Dr. Abdulkarim Albanna
Artificial Intelligence
University of Petra
3 hours (Week 3 of 16)
By the end of this week, students will be able to:
A node in ROS2 is a process representing the smallest program unit within the framework. Each node runs in its own independent memory space and is designed for a specific purpose, such as controlling a motor, capturing images from a camera, or performing path planning. Nodes communicate with one another by exchanging messages, facilitating collaborative functionality within the robotic system.
The publish-subscribe (pub-sub) pattern is a messaging paradigm where:
ROS2 supports different communication patterns between nodes. Understanding these patterns is crucial for designing efficient robotic systems.
One publisher sends data to exactly one subscriber.
Use Case: Direct sensor-to-controller communication, dedicated data streams.
One publisher broadcasts data to multiple subscribers.
Use Case: Broadcasting sensor data, system status updates, camera feeds.
Multiple publishers send data to one subscriber (data aggregation).
Use Case: Sensor fusion, multi-robot coordination, distributed data collection.
Multiple publishers and multiple subscribers share the same topic.
Use Case: Multi-robot systems, distributed logging, collaborative tasks.
| Pattern | Publishers | Subscribers | Best Use Case |
|---|---|---|---|
| 1:1 | 1 | 1 | Direct dedicated communication |
| 1:M | 1 | Many | Broadcasting data to multiple nodes |
| M:1 | Many | 1 | Data aggregation and fusion |
| M:M | Many | Many | Complex distributed systems |
| Concept | Description |
|---|---|
| Topic | Named channel for messages |
| Message | Data structure sent over topic |
| Publisher | Node that sends messages |
| Subscriber | Node that receives messages |
| Callback | Function called when message arrives |
| Queue Size | Number of messages buffered |
ROS2 provides several standard message packages:
std_msgs: Basic data types (Int, Float, String, Bool)geometry_msgs: Geometric data (Pose, Twist, Transform)sensor_msgs: Sensor data (Image, LaserScan, Imu, Temperature)nav_msgs: Navigation data (Odometry, Path, OccupancyGrid)| Package | Message | Purpose |
|---|---|---|
| std_msgs | String | Text messages |
| std_msgs | Int32 | Integer values |
| std_msgs | Float32 | Floating point values |
| std_msgs | Bool | Boolean values |
| geometry_msgs | Twist | Velocity commands (linear & angular) |
| geometry_msgs | Pose | Position and orientation |
| sensor_msgs | Temperature | Temperature readings |
| sensor_msgs | Imu | IMU sensor data |
| sensor_msgs | Image | Camera images |
# Example: geometry_msgs/Twist
# Used for velocity commands
geometry_msgs/Vector3 linear
float64 x # Forward/backward velocity
float64 y # Left/right velocity
float64 z # Up/down velocity
geometry_msgs/Vector3 angular
float64 x # Roll rate
float64 y # Pitch rate
float64 z # Yaw rate (turning)
A publisher node sends messages to a topic at regular or event-driven intervals.
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Temperature
import random
class TemperaturePublisher(Node):
def __init__(self):
super().__init__('temperature_publisher')
# Create publisher
self.publisher_ = self.create_publisher(
Temperature, # Message type
'temperature_data', # Topic name
10 # Queue size
)
# Create timer (publish every 1 second)
self.timer_ = self.create_timer(1.0, self.publish_temperature)
self.get_logger().info('Temperature Publisher started')
def publish_temperature(self):
# Create message
msg = Temperature()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'temperature_sensor'
msg.temperature = 20.0 + random.uniform(-5.0, 5.0) # Simulate sensor
msg.variance = 0.5
# Publish message
self.publisher_.publish(msg)
self.get_logger().info(f'Published temperature: {msg.temperature:.2f} C')
def main(args=None):
rclpy.init(args=args)
node = TemperaturePublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
self.publisher_ = self.create_publisher(
Temperature, # Message type
'temperature_data', # Topic name
10 # Queue size (QoS)
)
Parameters:
msg_type: The message class to publishtopic: Name of the topic (string)qos_profile: Queue size or QoS settingsself.timer_ = self.create_timer(
1.0, # Period in seconds
self.publish_temperature # Callback function
)
# Create message instance
msg = Temperature()
# Fill message fields
msg.temperature = 25.5
msg.variance = 0.5
# Publish
self.publisher_.publish(msg)
A subscriber node receives messages from a topic and processes them in a callback function.
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Temperature
class TemperatureSubscriber(Node):
def __init__(self):
super().__init__('temperature_subscriber')
# Create subscriber
self.subscription_ = self.create_subscription(
Temperature, # Message type
'temperature_data', # Topic name
self.temperature_callback, # Callback function
10 # Queue size
)
self.get_logger().info('Temperature Subscriber started')
def temperature_callback(self, msg):
"""Called when a message is received"""
temp = msg.temperature
self.get_logger().info(f'Received temperature: {temp:.2f} C')
# Process data
if temp > 30.0:
self.get_logger().warn('High temperature detected!')
elif temp < 10.0:
self.get_logger().warn('Low temperature detected!')
def main(args=None):
rclpy.init(args=args)
node = TemperatureSubscriber()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
self.subscription_ = self.create_subscription(
Temperature, # Message type
'temperature_data', # Topic name
self.temperature_callback, # Callback function
10 # Queue size
)
def temperature_callback(self, msg):
"""Automatically called when message arrives"""
# Access message data
temperature = msg.temperature
variance = msg.variance
# Process the data
self.get_logger().info(f'Temperature: {temperature}')
time.sleep()Quality of Service (QoS) policies control how messages are delivered between publishers and subscribers.
| Policy | Description |
|---|---|
| Reliability | Reliable (guaranteed) or Best Effort |
| Durability | Transient Local (keep last) or Volatile |
| History | Keep Last N or Keep All |
| Depth | Queue size (number of messages) |
| Lifespan | How long messages are valid |
| Deadline | Maximum time between messages |
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
# Custom QoS
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
# Use with publisher
self.publisher_ = self.create_publisher(
Temperature,
'temperature',
qos_profile
)
# Predefined profiles
from rclpy.qos import qos_profile_sensor_data
from rclpy.qos import qos_profile_system_default
self.sensor_pub_ = self.create_publisher(
Temperature,
'temperature',
qos_profile_sensor_data
)
# List all topics
ros2 topic list
# Show topic information
ros2 topic info /temperature
# Echo topic messages
ros2 topic echo /temperature
# Show message type
ros2 topic type /temperature
# Publish from command line
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
"{linear: {x: 1.0}, angular: {z: 0.5}}"
# Check publishing rate
ros2 topic hz /temperature
# Bandwidth usage
ros2 topic bw /camera/image_raw
# Find topics by type
ros2 topic find sensor_msgs/msg/Temperature
Problem: Subscriber doesn't receive messages
Solutions:
ros2 node listProblem: Messages not received despite matching topics
Solution:
ros2 topic info /topic_name to check QoS/robot/sensors/temperaturetime.sleep() in callbacksclass RobotNode(Node):
def __init__(self):
super().__init__('robot_node')
# Publishers
self._setup_publishers()
# Subscribers
self._setup_subscribers()
# Timers
self._setup_timers()
# State variables
self._initialize_state()
self.get_logger().info('Node initialized')
def _setup_publishers(self):
"""Initialize all publishers"""
self.cmd_pub_ = self.create_publisher(...)
def _setup_subscribers(self):
"""Initialize all subscribers"""
self.sensor_sub_ = self.create_subscription(...)
def _setup_timers(self):
"""Initialize all timers"""
self.timer_ = self.create_timer(...)
def _initialize_state(self):
"""Initialize state variables"""
self.robot_state_ = "idle"
Create a complete sensor monitoring and fusion system for a simulated robot.
Package Name: sensor_fusion_system
sensor_publisher.py)
sensor_publisher.py)
data_logger.py)
alert_system.py)
sensor_fusion.py)
| Component | Points |
|---|---|
| Sensor Publisher Node | 20 |
| Data Logger Node | 20 |
| Alert System Node | 20 |
| Fusion Node | 20 |
| Code Quality & Documentation | 10 |
| Testing & Demonstration | 10 |
| Bonus Challenges | +10 |
| Total | 100 (+10) |
Complete Week 3 project and review callback functions
This week covered topic-based communication in ROS2, including publishers, subscribers, message types, and data fusion concepts. We implemented practical examples based on the Computer Vision Pipeline and Navigation System architectures.