Conversational AI & Capstone Integration
Learning Objectives
By the end of this week, you will be able to:
- Implement multimodal conversational AI for humanoid robots
- Integrate speech recognition, natural language understanding, and dialogue management
- Combine all previous concepts into a complete autonomous humanoid system
- Deploy and test the full capstone project with conversational capabilities
- Evaluate the performance of the integrated system
Introduction to Conversational AI for Robotics
Conversational AI in robotics enables natural human-robot interaction through multiple modalities including speech, gesture, and vision. This integration represents the culmination of all concepts learned throughout the course.
Key Components of Conversational AI Systems
- Automatic Speech Recognition (ASR): Converting speech to text
- Natural Language Understanding (NLU): Interpreting user intent
- Dialogue Management: Managing conversation flow and context
- Natural Language Generation (NLG): Creating appropriate responses
- Speech Synthesis: Converting text to speech
- Multimodal Integration: Combining speech with visual and gestural cues
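Taken together, these components form a processing pipeline: audio flows through ASR into text, NLU turns text into a structured intent, the dialogue manager decides what to do with it, and NLG plus speech synthesis close the loop. The following minimal, ROS-free sketch illustrates that flow; every function in it is a placeholder standing in for a real component, not an actual implementation:
# conversation_pipeline_sketch.py
def asr(audio_chunk: bytes) -> str:
    # Placeholder: a real ASR engine would decode the audio here
    return "go to kitchen"

def nlu(text: str) -> dict:
    # Placeholder intent classifier: treat the last word as the location
    return {"type": "navigation", "location": text.rsplit(" ", 1)[-1]}

def dialogue_manager(intent: dict, context: dict) -> tuple:
    # Placeholder policy: remember the intent and pass it on as the action
    context["last_intent"] = intent
    return intent, context

def nlg(action: dict) -> str:
    return f"Okay, heading to the {action.get('location', 'goal')}."

def speak(reply: str) -> None:
    # Placeholder: a real system would synthesize speech here
    print(reply)

def run_turn(audio_chunk: bytes, context: dict) -> dict:
    """One conversational turn: ASR -> NLU -> dialogue management -> NLG -> TTS."""
    action, context = dialogue_manager(nlu(asr(audio_chunk)), context)
    speak(nlg(action))
    return context

if __name__ == "__main__":
    run_turn(b"", {})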
Speech Recognition and Natural Language Understanding
Step 1: Speech Recognition System
Implement a robust speech recognition system that works in real-world environments:
# speech_recognition_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from audio_common_msgs.msg import AudioData
import speech_recognition as sr
import threading
import queue
import numpy as np
class SpeechRecognitionNode(Node):
def __init__(self):
super().__init__('speech_recognition_node')
# Publisher for recognized text
self.text_pub = self.create_publisher(String, 'speech_recognized', 10)
# Audio data subscriber
self.audio_sub = self.create_subscription(
AudioData,
'audio_input',
self.audio_callback,
10
)
# Initialize speech recognizer
self.recognizer = sr.Recognizer()
self.recognizer.energy_threshold = 4000 # Adjust based on environment
self.recognizer.dynamic_energy_threshold = True
self.recognizer.pause_threshold = 0.8 # Pause duration to consider phrase complete
self.audio_queue = queue.Queue()
# Start audio processing thread
self.audio_thread = threading.Thread(target=self.process_audio)
self.audio_thread.daemon = True
self.audio_thread.start()
self.get_logger().info('Speech recognition node initialized')
def audio_callback(self, msg):
"""Callback for audio data"""
try:
            # Convert the ROS AudioData byte array for the speech_recognition library
            audio_data = sr.AudioData(
                bytes(msg.data),
                sample_rate=16000,  # Must match your audio source's sample rate
                sample_width=2      # 16-bit audio
            )
# Add to processing queue
self.audio_queue.put(audio_data)
except Exception as e:
self.get_logger().error(f'Error processing audio callback: {str(e)}')
def process_audio(self):
"""Process audio data in a separate thread"""
while rclpy.ok():
try:
# Get audio from queue
audio_data = self.audio_queue.get(timeout=1.0)
# Perform speech recognition
try:
# Using Google Web Speech API (requires internet)
# For offline option, consider using pocketsphinx or vosk
text = self.recognizer.recognize_google(audio_data)
# Publish recognized text
text_msg = String()
text_msg.data = text
self.text_pub.publish(text_msg)
self.get_logger().info(f'Recognized: {text}')
except sr.UnknownValueError:
self.get_logger().info('Speech recognition could not understand audio')
except sr.RequestError as e:
self.get_logger().error(f'Could not request results from speech recognition service; {e}')
except Exception as e:
self.get_logger().error(f'Unexpected error in speech recognition: {str(e)}')
except queue.Empty:
continue
def main(args=None):
rclpy.init(args=args)
speech_node = SpeechRecognitionNode()
try:
rclpy.spin(speech_node)
except KeyboardInterrupt:
pass
finally:
speech_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
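The example above uses the Google Web Speech API, which requires network access. For fully offline operation, the recognition call can be swapped for a local engine such as Vosk. A minimal sketch, assuming `pip install vosk` and a model downloaded into `vosk-model-small-en-us` (the path is an assumption; adjust it to your setup):
# offline_asr_sketch.py
import json
from vosk import Model, KaldiRecognizer

class OfflineRecognizer:
    """Thin wrapper around Vosk for 16-bit mono PCM audio."""

    def __init__(self, model_path='vosk-model-small-en-us', sample_rate=16000):
        self.recognizer = KaldiRecognizer(Model(model_path), sample_rate)

    def recognize(self, pcm_bytes: bytes) -> str:
        # Returns recognized text once Vosk decides the utterance is
        # complete, otherwise an empty string
        if self.recognizer.AcceptWaveform(pcm_bytes):
            return json.loads(self.recognizer.Result()).get('text', '')
        return ''
Inside process_audio, the recognize_google call would then be replaced by a call to this wrapper with the raw PCM bytes.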
Step 2: Natural Language Understanding
Implement natural language understanding to interpret commands and questions:
# nlu_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Pose
import re
import json
import spacy
from typing import Dict, List, Tuple
class NaturalLanguageUnderstandingNode(Node):
def __init__(self):
super().__init__('nlu_node')
# Subscribe to recognized speech
self.speech_sub = self.create_subscription(
String,
'speech_recognized',
self.speech_callback,
10
)
        # Publishers for various commands
        self.nav_goal_pub = self.create_publisher(Pose, 'navigation_goal', 10)
        self.cmd_pub = self.create_publisher(String, 'robot_command', 10)
        # Structured intent output consumed by the multimodal coordinator
        self.nlu_pub = self.create_publisher(String, 'nlu_output', 10)
        self.response_pub = self.create_publisher(String, 'robot_response', 10)
# Load spaCy model for NLP (install with: python -m spacy download en_core_web_sm)
try:
self.nlp = spacy.load("en_core_web_sm")
except OSError:
self.get_logger().warn("spaCy model not found. Install with: python -m spacy download en_core_web_sm")
self.nlp = None
# Define command patterns and intents
self.intent_patterns = {
'navigation': [
r'.*go to (.+)',
r'.*move to (.+)',
r'.*navigate to (.+)',
r'.*go (.+)',
r'.*walk to (.+)',
r'.*come to (.+)'
],
'manipulation': [
r'.*pick up (.+)',
r'.*grasp (.+)',
r'.*take (.+)',
r'.*get (.+)',
r'.*bring me (.+)'
],
'information': [
r'.*what is (.+)',
r'.*tell me about (.+)',
r'.*describe (.+)',
r'.*how many (.+)'
],
'social': [
r'.*hello',
r'.*hi',
r'.*good morning',
r'.*good afternoon',
r'.*good evening'
],
'control': [
r'.*stop',
r'.*halt',
r'.*wait',
r'.*pause',
r'.*follow me',
r'.*dance',
r'.*perform (.+)'
]
}
# Location mappings
self.location_map = {
'kitchen': (2.0, 3.0, 0.0),
'living room': (0.0, 0.0, 0.0),
'bedroom': (-2.0, 1.0, 0.0),
'office': (1.0, -2.0, 0.0),
'entrance': (0.0, 3.0, 0.0)
}
self.get_logger().info('Natural Language Understanding node initialized')
def speech_callback(self, msg):
"""Process recognized speech"""
text = msg.data.lower().strip()
# Process the command using NLP if available
if self.nlp:
doc = self.nlp(text)
command = self.process_with_nlp(doc, text)
else:
# Fallback to pattern matching
command = self.interpret_command(text)
if command:
self.get_logger().info(f'Interpreted command: {command}')
            # Publish the structured command for the multimodal coordinator
            cmd_msg = String()
            cmd_msg.data = json.dumps(command)
            self.nlu_pub.publish(cmd_msg)
# Handle specific commands
self.handle_command(command)
# Generate response
response = self.generate_response(command)
response_msg = String()
response_msg.data = response
self.response_pub.publish(response_msg)
def process_with_nlp(self, doc, text):
"""Process text using spaCy NLP"""
# Extract entities and dependencies
entities = [(ent.text, ent.label_) for ent in doc.ents]
        tokens = [(token.text, token.pos_, token.dep_) for token in doc]  # available for deeper parsing
# Identify intent based on root verb or action
intent = self.identify_intent_nlp(doc)
if intent:
return {
'type': intent,
'entities': entities,
'original_text': text,
'confidence': 0.8 # Placeholder confidence
}
# Fallback to pattern matching
return self.interpret_command(text)
def identify_intent_nlp(self, doc):
"""Identify intent using NLP analysis"""
# Look for action verbs
action_verbs = [token.lemma_ for token in doc if token.pos_ == "VERB"]
# Map verbs to intents
verb_to_intent = {
'go': 'navigation',
'move': 'navigation',
'navigate': 'navigation',
'pick': 'manipulation',
'grasp': 'manipulation',
'take': 'manipulation',
'get': 'manipulation',
'bring': 'manipulation',
'tell': 'information',
'describe': 'information',
'what': 'information',
'how': 'information',
'stop': 'control',
'halt': 'control',
'wait': 'control',
'follow': 'control',
'dance': 'control',
'perform': 'control'
}
for verb in action_verbs:
if verb in verb_to_intent:
return verb_to_intent[verb]
# Check for social intents
social_indicators = ['hello', 'hi', 'good']
for token in doc:
if token.lemma_ in social_indicators:
return 'social'
return None
def interpret_command(self, text):
"""Interpret natural language command using pattern matching"""
for intent, patterns in self.intent_patterns.items():
for pattern in patterns:
match = re.match(pattern, text)
if match:
if intent == 'navigation':
location = match.group(1).strip()
return {
'type': intent,
'location': location,
                            'coordinates': self.location_map.get(location, (0.0, 0.0, 0.0))
}
elif intent in ['manipulation', 'information']:
target = match.group(1).strip()
return {
'type': intent,
'target': target
}
elif intent == 'social':
return {
'type': intent,
'greeting': text
}
else: # control commands
return {
'type': intent,
'command': text
}
# If no pattern matches, return None
return None
def handle_command(self, command):
"""Handle the interpreted command"""
cmd_type = command['type']
if cmd_type == 'navigation':
self.handle_navigation_command(command)
elif cmd_type == 'manipulation':
self.handle_manipulation_command(command)
elif cmd_type == 'information':
self.handle_information_command(command)
elif cmd_type == 'social':
self.handle_social_command(command)
elif cmd_type == 'control':
self.handle_control_command(command)
def handle_navigation_command(self, command):
"""Handle navigation commands"""
location = command.get('location', 'unknown')
        coords = command.get('coordinates', (0.0, 0.0, 0.0))
self.get_logger().info(f'Navigating to: {location} at coordinates {coords}')
# Create and publish navigation goal
pose_msg = Pose()
pose_msg.position.x = coords[0]
pose_msg.position.y = coords[1]
pose_msg.position.z = coords[2]
# Add orientation as needed
self.nav_goal_pub.publish(pose_msg)
def handle_manipulation_command(self, command):
"""Handle manipulation commands"""
target = command.get('target', 'unknown')
self.get_logger().info(f'Attempting to manipulate: {target}')
# In a real system, this would trigger perception and manipulation
# For now, just log the command
cmd_msg = String()
cmd_msg.data = f"manipulation_request:{target}"
self.cmd_pub.publish(cmd_msg)
def handle_information_command(self, command):
"""Handle information requests"""
target = command.get('target', 'unknown')
self.get_logger().info(f'Information requested about: {target}')
# In a real system, this would query knowledge base or perception
cmd_msg = String()
cmd_msg.data = f"information_request:{target}"
self.cmd_pub.publish(cmd_msg)
def handle_social_command(self, command):
"""Handle social/greeting commands"""
greeting = command.get('greeting', 'hello')
self.get_logger().info(f'Received greeting: {greeting}')
def handle_control_command(self, command):
"""Handle control commands"""
cmd = command.get('command', 'unknown')
self.get_logger().info(f'Received control command: {cmd}')
# Handle specific control commands
if 'stop' in cmd or 'halt' in cmd:
stop_msg = String()
stop_msg.data = 'stop_robot'
self.cmd_pub.publish(stop_msg)
def generate_response(self, command):
"""Generate an appropriate response to the user"""
cmd_type = command['type']
responses = {
'navigation': f"Okay, I'm navigating to {command.get('location', 'that location')}.",
'manipulation': f"Okay, I'll try to get the {command.get('target', 'object')}.",
'information': f"I'll look for information about {command.get('target', 'that')}.",
'social': "Hello! How can I help you today?",
'control': "I understand the command."
}
return responses.get(cmd_type, "I'm not sure I understand. Could you please repeat that?")
def main(args=None):
rclpy.init(args=args)
nlu_node = NaturalLanguageUnderstandingNode()
try:
rclpy.spin(nlu_node)
except KeyboardInterrupt:
pass
finally:
nlu_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
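The regex intents are easy to get subtly wrong, so it pays to test them without launching ROS. Below is a standalone sketch of the same pattern-matching logic; note that captured phrases keep their articles, so "go to the kitchen" yields "the kitchen", which will miss the location map unless leading articles are stripped:
# test_intent_patterns.py
import re

INTENT_PATTERNS = {
    'navigation': [r'.*go to (.+)', r'.*move to (.+)'],
    'manipulation': [r'.*pick up (.+)', r'.*grasp (.+)'],
}

def match_intent(text: str):
    """Return (intent, captured_phrase) or (None, None)."""
    for intent, patterns in INTENT_PATTERNS.items():
        for pattern in patterns:
            match = re.match(pattern, text.lower())
            if match:
                return intent, match.group(1).strip()
    return None, None

assert match_intent('please go to the kitchen') == ('navigation', 'the kitchen')
assert match_intent('pick up the red cup') == ('manipulation', 'the red cup')
print('intent patterns behave as expected')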
Multimodal Interaction Integration
Step 3: Combining Speech, Vision, and Action
Create a coordinator node that integrates all modalities:
# multimodal_coordinator_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Bool
from geometry_msgs.msg import Pose, Point
from sensor_msgs.msg import Image, CameraInfo
from vision_msgs.msg import Detection2DArray
from cv_bridge import CvBridge
import json
import time
from collections import deque
class MultimodalCoordinatorNode(Node):
def __init__(self):
super().__init__('multimodal_coordinator')
# Subscribers
self.speech_sub = self.create_subscription(
String,
'speech_recognized',
self.speech_callback,
10
)
self.nlu_sub = self.create_subscription(
String,
'nlu_output',
self.nlu_callback,
10
)
self.vision_sub = self.create_subscription(
Detection2DArray,
'object_detections',
self.vision_callback,
10
)
self.gaze_sub = self.create_subscription(
Point,
'gaze_target',
self.gaze_callback,
10
)
# Publishers
self.cmd_pub = self.create_publisher(String, 'robot_command', 10)
self.response_pub = self.create_publisher(String, 'robot_response', 10)
self.gesture_pub = self.create_publisher(String, 'gesture_command', 10)
self.attention_pub = self.create_publisher(Point, 'attention_target', 10)
# Initialize components
self.bridge = CvBridge()
self.current_task = None
self.context = {}
self.response_queue = deque(maxlen=10) # Keep last 10 responses
self.last_interaction_time = time.time()
# State management
self.interaction_state = 'idle' # idle, listening, processing, responding
self.attention_target = Point(x=0.0, y=0.0, z=0.0)
# Timer for state management
self.state_timer = self.create_timer(0.1, self.state_callback)
self.get_logger().info('Multimodal coordinator node initialized')
def speech_callback(self, msg):
"""Handle speech input"""
        self.interaction_state = 'processing'
        previous_interaction_time = self.last_interaction_time
        self.last_interaction_time = time.time()
        # Process speech with NLU
        self.process_speech(msg.data, previous_interaction_time)
def nlu_callback(self, msg):
"""Handle NLU output"""
try:
command = json.loads(msg.data)
self.execute_command(command)
except json.JSONDecodeError:
self.get_logger().error('Invalid JSON in NLU output')
def vision_callback(self, msg):
"""Handle visual input"""
        # Update context with detected objects
        detected_objects = []
        for detection in msg.detections:
            if not detection.results:
                continue
            best_hypothesis = detection.results[0]
            obj_info = {
                'class': best_hypothesis.hypothesis.class_id,
                'confidence': best_hypothesis.hypothesis.score,
                'position': best_hypothesis.pose.pose.position  # geometry_msgs/Point
            }
            detected_objects.append(obj_info)
self.context['detected_objects'] = detected_objects
self.update_attention_target()
def gaze_callback(self, msg):
"""Handle gaze input"""
self.attention_target = msg
self.context['gaze_target'] = (msg.x, msg.y, msg.z)
    def process_speech(self, text, previous_interaction_time):
        """Process speech input and determine action"""
        # This would typically call an NLU service
        # For now, we'll do simple processing
        self.get_logger().info(f'Processing speech: {text}')
        # Determine if this is a follow-up to the previous utterance (compare
        # against the timestamp captured before last_interaction_time was updated,
        # otherwise the elapsed time is always near zero)
        time_since_last = time.time() - previous_interaction_time
        is_follow_up = time_since_last < 10.0  # 10 seconds
# Update context
self.context['last_speech'] = text
self.context['is_follow_up'] = is_follow_up
# Simple intent detection
if any(word in text.lower() for word in ['look', 'see', 'find', 'where']):
self.interaction_state = 'searching'
elif any(word in text.lower() for word in ['go', 'move', 'navigate']):
self.interaction_state = 'navigating'
elif any(word in text.lower() for word in ['grasp', 'pick', 'take', 'get']):
self.interaction_state = 'manipulating'
else:
self.interaction_state = 'responding'
def execute_command(self, command):
"""Execute the interpreted command"""
cmd_type = command.get('type', 'unknown')
if cmd_type == 'navigation':
self.execute_navigation(command)
elif cmd_type == 'manipulation':
self.execute_manipulation(command)
elif cmd_type == 'information':
self.execute_information(command)
elif cmd_type == 'social':
self.execute_social(command)
elif cmd_type == 'control':
self.execute_control(command)
self.interaction_state = 'responding'
def execute_navigation(self, command):
"""Execute navigation command"""
location = command.get('location', 'unknown')
coords = command.get('coordinates', (0, 0, 0))
# Publish navigation command
nav_cmd = String()
nav_cmd.data = f"navigate_to:{location}:{coords[0]}:{coords[1]}:{coords[2]}"
self.cmd_pub.publish(nav_cmd)
self.get_logger().info(f'Navigating to {location}')
def execute_manipulation(self, command):
"""Execute manipulation command"""
target = command.get('target', 'unknown')
# First, look for the object
look_cmd = String()
look_cmd.data = f"look_for:{target}"
self.cmd_pub.publish(look_cmd)
        # Then attempt to grasp (a real system would wait for the look action to finish first)
grasp_cmd = String()
grasp_cmd.data = f"grasp:{target}"
self.cmd_pub.publish(grasp_cmd)
self.get_logger().info(f'Attempting to manipulate {target}')
def execute_information(self, command):
"""Execute information command"""
target = command.get('target', 'unknown')
# Query knowledge base or perception system
query_cmd = String()
query_cmd.data = f"query_info:{target}"
self.cmd_pub.publish(query_cmd)
self.get_logger().info(f'Querying information about {target}')
def execute_social(self, command):
"""Execute social command"""
greeting = command.get('greeting', 'hello')
# Generate appropriate social response
response = self.generate_social_response(greeting)
response_msg = String()
response_msg.data = response
self.response_pub.publish(response_msg)
# Possibly trigger gesture
gesture_cmd = String()
gesture_cmd.data = "wave"
self.gesture_pub.publish(gesture_cmd)
def execute_control(self, command):
"""Execute control command"""
cmd = command.get('command', 'unknown')
# Publish control command
control_msg = String()
control_msg.data = cmd
self.cmd_pub.publish(control_msg)
def generate_social_response(self, greeting):
"""Generate appropriate social response"""
if any(word in greeting for word in ['hello', 'hi']):
return "Hello there! How can I assist you today?"
elif 'good morning' in greeting:
return "Good morning! What would you like to do today?"
elif 'good afternoon' in greeting:
return "Good afternoon! How can I help you?"
elif 'good evening' in greeting:
return "Good evening! What can I do for you?"
else:
return "Hello! How can I help you?"
def update_attention_target(self):
"""Update attention target based on context"""
if 'gaze_target' in self.context:
# Use explicit gaze target
x, y, z = self.context['gaze_target']
target = Point(x=x, y=y, z=z)
elif 'detected_objects' in self.context and self.context['detected_objects']:
# Use most confident detection
detections = self.context['detected_objects']
best_detection = max(detections, key=lambda x: x['confidence'])
target = best_detection['position']
else:
# Default to center
target = Point(x=0.0, y=0.0, z=1.0)
self.attention_target = target
self.attention_pub.publish(target)
def state_callback(self):
"""Manage interaction state"""
current_time = time.time()
time_since_interaction = current_time - self.last_interaction_time
# Reset to idle if no interaction for 30 seconds
if time_since_interaction > 30.0 and self.interaction_state != 'idle':
self.interaction_state = 'idle'
self.get_logger().info('Resetting to idle state due to inactivity')
def generate_response(self, command):
"""Generate a response based on command and context"""
cmd_type = command.get('type', 'unknown')
responses = {
'navigation': f"Okay, I'm navigating to {command.get('location', 'that location')}.",
'manipulation': f"Okay, I'll try to get the {command.get('target', 'object')}.",
'information': f"I'll look for information about {command.get('target', 'that')}.",
'social': "Hello! How can I help you today?",
'control': "I understand the command."
}
return responses.get(cmd_type, "I'm processing your request.")
def main(args=None):
rclpy.init(args=args)
coordinator = MultimodalCoordinatorNode()
try:
rclpy.spin(coordinator)
except KeyboardInterrupt:
pass
finally:
coordinator.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
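Before wiring in a real camera, the coordinator's vision path can be exercised with synthetic detections. A minimal sketch, assuming the Humble-era vision_msgs layout (results[i].hypothesis and results[i].pose):
# fake_detection_publisher.py
import rclpy
from rclpy.node import Node
from vision_msgs.msg import Detection2DArray, Detection2D, ObjectHypothesisWithPose

class FakeDetectionPublisher(Node):
    def __init__(self):
        super().__init__('fake_detection_publisher')
        self.pub = self.create_publisher(Detection2DArray, 'object_detections', 10)
        self.timer = self.create_timer(1.0, self.publish_detection)

    def publish_detection(self):
        # One synthetic "cup" detection, one meter ahead of the robot
        hyp = ObjectHypothesisWithPose()
        hyp.hypothesis.class_id = 'cup'
        hyp.hypothesis.score = 0.9
        hyp.pose.pose.position.x = 1.0
        detection = Detection2D()
        detection.results.append(hyp)
        msg = Detection2DArray()
        msg.detections.append(detection)
        self.pub.publish(msg)

def main():
    rclpy.init()
    node = FakeDetectionPublisher()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
With this running, the coordinator should publish the synthetic cup's position on attention_target, confirming the vision-to-attention path end to end.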
Capstone Integration
Step 4: Complete System Integration
Create the capstone integration node that brings all components together:
# capstone_integration_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Bool, Float32
from geometry_msgs.msg import Pose, Twist, Point
from sensor_msgs.msg import Image, LaserScan, JointState
from nav_msgs.msg import Odometry
from visualization_msgs.msg import Marker
import json
import time
from threading import Lock
class CapstoneIntegrationNode(Node):
def __init__(self):
super().__init__('capstone_integration')
# Subscribers for all system components
self.speech_sub = self.create_subscription(
String,
'speech_recognized',
self.speech_callback,
10
)
        # NOTE: the coordinator consumes vision_msgs on 'object_detections',
        # so this node listens for a JSON summary on a separate topic to avoid
        # a message-type conflict
        self.vision_sub = self.create_subscription(
            String,
            'object_detections_json',
            self.vision_callback,
            10
        )
self.nav_sub = self.create_subscription(
String,
'navigation_status',
self.nav_callback,
10
)
self.manip_sub = self.create_subscription(
String,
'manipulation_status',
self.manip_callback,
10
)
self.odom_sub = self.create_subscription(
Odometry,
'odom',
self.odom_callback,
10
)
self.scan_sub = self.create_subscription(
LaserScan,
'scan',
self.scan_callback,
10
)
# Publishers for integrated system
self.cmd_pub = self.create_publisher(String, 'system_command', 10)
self.response_pub = self.create_publisher(String, 'system_response', 10)
self.status_pub = self.create_publisher(String, 'system_status', 10)
self.behavior_pub = self.create_publisher(String, 'behavior_command', 10)
# Internal state
self.system_state = {
'current_task': 'idle',
'task_queue': [],
'robot_pose': Pose(),
'battery_level': 100.0,
'safety_status': 'safe',
'active_components': [],
'last_interaction': time.time()
}
self.state_lock = Lock()
self.task_execution_timer = self.create_timer(0.1, self.task_execution_callback)
self.status_timer = self.create_timer(1.0, self.status_callback)
self.get_logger().info('Capstone integration node initialized')
def speech_callback(self, msg):
"""Handle speech input and update system state"""
with self.state_lock:
self.system_state['last_interaction'] = time.time()
self.get_logger().info(f'Processing speech command: {msg.data}')
# Parse and queue task based on speech
task = self.parse_speech_command(msg.data)
if task:
self.system_state['task_queue'].append(task)
self.system_state['current_task'] = 'processing_command'
def vision_callback(self, msg):
"""Handle vision input and update system state"""
with self.state_lock:
# Update object detections in system state
try:
detections = json.loads(msg.data)
self.system_state['detections'] = detections
except json.JSONDecodeError:
self.get_logger().error('Invalid vision data format')
def nav_callback(self, msg):
"""Handle navigation status updates"""
with self.state_lock:
status = json.loads(msg.data) if msg.data.startswith('{') else {'status': msg.data}
self.system_state['navigation'] = status
def manip_callback(self, msg):
"""Handle manipulation status updates"""
with self.state_lock:
status = json.loads(msg.data) if msg.data.startswith('{') else {'status': msg.data}
self.system_state['manipulation'] = status
def odom_callback(self, msg):
"""Handle odometry updates"""
with self.state_lock:
self.system_state['robot_pose'] = msg.pose.pose
def scan_callback(self, msg):
"""Handle laser scan for safety"""
with self.state_lock:
            # Check for obstacles, ignoring invalid readings (inf, NaN, out of range)
            valid_ranges = [r for r in msg.ranges if msg.range_min < r < msg.range_max]
            min_range = min(valid_ranges) if valid_ranges else float('inf')
            if min_range < 0.5:  # Less than 50 cm to an obstacle
self.system_state['safety_status'] = 'unsafe_approaching'
else:
self.system_state['safety_status'] = 'safe'
def parse_speech_command(self, text):
"""Parse speech command and create task"""
text_lower = text.lower()
if any(word in text_lower for word in ['go to', 'navigate to', 'move to']):
# Extract location
for loc in ['kitchen', 'living room', 'bedroom', 'office', 'entrance']:
if loc in text_lower:
return {
'type': 'navigation',
'target': loc,
'priority': 'high',
'timestamp': time.time()
}
elif any(word in text_lower for word in ['pick up', 'grasp', 'get', 'take']):
# Extract object
            words = text_lower.split()
            obj_idx = -1
            for i, word in enumerate(words):
                if word in ['pick', 'grasp', 'get', 'take']:
                    obj_idx = i + 1
                    break
            # Skip filler words so "pick up the red cup" yields "red cup", not "up"
            while 0 <= obj_idx < len(words) and words[obj_idx] in ('up', 'the', 'a', 'an', 'me'):
                obj_idx += 1
            if 0 <= obj_idx < len(words):
                obj = ' '.join(words[obj_idx:])
return {
'type': 'manipulation',
'target': obj,
'priority': 'high',
'timestamp': time.time()
}
elif any(word in text_lower for word in ['tell me', 'what is', 'describe']):
return {
'type': 'information',
'query': text,
'priority': 'medium',
'timestamp': time.time()
}
elif any(word in text_lower for word in ['hello', 'hi', 'good']):
return {
'type': 'social',
'greeting': text,
'priority': 'low',
'timestamp': time.time()
}
return None
def task_execution_callback(self):
"""Execute tasks from the queue"""
with self.state_lock:
if not self.system_state['task_queue']:
# Check if we need to switch from processing to idle
if (self.system_state['current_task'] == 'processing_command' and
time.time() - self.system_state['last_interaction'] > 5.0):
self.system_state['current_task'] = 'idle'
return
# Get highest priority task
            task = self.system_state['task_queue'][0]  # Simple FIFO; a priority-based alternative is sketched after this node
# Execute based on task type
if task['type'] == 'navigation':
self.execute_navigation_task(task)
elif task['type'] == 'manipulation':
self.execute_manipulation_task(task)
elif task['type'] == 'information':
self.execute_information_task(task)
elif task['type'] == 'social':
self.execute_social_task(task)
# Remove executed task
self.system_state['task_queue'].pop(0)
def execute_navigation_task(self, task):
"""Execute navigation task"""
location = task['target']
# In a real system, this would call navigation stack
nav_cmd = String()
nav_cmd.data = f"navigate_to:{location}"
self.cmd_pub.publish(nav_cmd)
self.system_state['current_task'] = f'navigating_to_{location}'
self.get_logger().info(f'Executing navigation task to {location}')
def execute_manipulation_task(self, task):
"""Execute manipulation task"""
obj = task['target']
# First localize object, then manipulate
local_cmd = String()
local_cmd.data = f"localize:{obj}"
self.cmd_pub.publish(local_cmd)
        # After localization, attempt manipulation (this sketch publishes
        # immediately; a real system would wait for localization feedback)
manip_cmd = String()
manip_cmd.data = f"manipulate:{obj}"
self.cmd_pub.publish(manip_cmd)
self.system_state['current_task'] = f'manipulating_{obj}'
self.get_logger().info(f'Executing manipulation task for {obj}')
def execute_information_task(self, task):
"""Execute information task"""
query = task['query']
# Query knowledge base or perception
info_cmd = String()
info_cmd.data = f"query:{query}"
self.cmd_pub.publish(info_cmd)
self.system_state['current_task'] = 'answering_query'
self.get_logger().info(f'Executing information task for query: {query}')
def execute_social_task(self, task):
"""Execute social task"""
greeting = task['greeting']
# Generate response
response = self.generate_social_response(greeting)
response_msg = String()
response_msg.data = response
self.response_pub.publish(response_msg)
# Trigger social behavior
behavior_cmd = String()
behavior_cmd.data = "greet_user"
self.behavior_pub.publish(behavior_cmd)
self.system_state['current_task'] = 'social_interaction'
self.get_logger().info(f'Executing social task: {greeting}')
def generate_social_response(self, greeting):
"""Generate appropriate social response"""
greeting_lower = greeting.lower()
if any(word in greeting_lower for word in ['hello', 'hi']):
return "Hello! How can I assist you today?"
elif 'good morning' in greeting_lower:
return "Good morning! It's great to see you."
elif 'good afternoon' in greeting_lower:
return "Good afternoon! What would you like to do?"
elif 'good evening' in greeting_lower:
return "Good evening! How can I help you?"
else:
return "I heard you. How can I assist you?"
def status_callback(self):
"""Publish system status periodically"""
with self.state_lock:
status_msg = String()
status_msg.data = json.dumps({
'current_task': self.system_state['current_task'],
'task_queue_size': len(self.system_state['task_queue']),
'robot_pose': {
'x': self.system_state['robot_pose'].position.x,
'y': self.system_state['robot_pose'].position.y,
'z': self.system_state['robot_pose'].position.z
},
'battery_level': self.system_state['battery_level'],
'safety_status': self.system_state['safety_status'],
'active_components': self.system_state['active_components']
})
self.status_pub.publish(status_msg)
def check_system_integrity(self):
"""Check if all components are operational"""
with self.state_lock:
# This would check if all expected nodes are running
# For now, we'll assume they are
return True
def main(args=None):
rclpy.init(args=args)
integration_node = CapstoneIntegrationNode()
try:
rclpy.spin(integration_node)
except KeyboardInterrupt:
pass
finally:
integration_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
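The task queue above is plain FIFO, so a low-priority greeting queued first will delay a high-priority stop command behind it. One way to honor the priority field that parse_speech_command already sets is sketched below; inside task_execution_callback, pop_next_task(self.system_state['task_queue']) would replace the index-0 read and the later pop(0). The rank values are assumptions matching the priority strings used above:
# priority_selection_sketch.py
PRIORITY_RANK = {'high': 2, 'medium': 1, 'low': 0}

def pop_next_task(task_queue: list) -> dict:
    """Remove and return the highest-priority task; FIFO order breaks ties.

    Assumes a non-empty queue (the caller already checks for that).
    """
    best_index = max(
        range(len(task_queue)),
        key=lambda i: (PRIORITY_RANK.get(task_queue[i]['priority'], 0), -i)
    )
    return task_queue.pop(best_index)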
System Launch and Testing
Step 5: Integration Launch File
Create a launch file to bring up the complete system:
# launch/capstone_system_launch.py
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
def generate_launch_description():
# Launch arguments
use_sim_time = LaunchConfiguration('use_sim_time', default='true')
robot_namespace = LaunchConfiguration('robot_namespace', default='humanoid_robot')
# Speech recognition node
speech_recognition_node = Node(
package='capstone_speech',
executable='speech_recognition_node',
name='speech_recognition',
parameters=[{'use_sim_time': use_sim_time}],
output='screen'
)
# Natural language understanding node
nlu_node = Node(
package='capstone_nlu',
executable='nlu_node',
name='natural_language_understanding',
parameters=[{'use_sim_time': use_sim_time}],
output='screen'
)
    # Perception pipeline node (executable and parameter names vary across
    # Isaac ROS releases; check your installed isaac_ros_detectnet docs)
perception_node = Node(
package='isaac_ros_detectnet',
executable='isaac_ros_detectnet',
name='perception_system',
parameters=[{
'input_topic': '/camera/image_raw',
'model_name': 'ssd_mobilenet_v2_coco',
'confidence_threshold': 0.5
}],
output='screen'
)
    # Navigation system: nav2_bringup provides launch files rather than an
    # executable, so include its bringup launch (it also expects a 'map'
    # argument pointing at your map YAML)
    navigation_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            PathJoinSubstitution([
                FindPackageShare('nav2_bringup'), 'launch', 'bringup_launch.py'
            ])
        ),
        launch_arguments={
            'use_sim_time': use_sim_time,
            'autostart': 'true'
        }.items()
    )
    # Manipulation system: MoveIt 2's planning node is move_group; it also
    # needs the robot_description, SRDF, and planner configuration from your
    # MoveIt config package (omitted here for brevity)
    manipulation_node = Node(
        package='moveit_ros_move_group',
        executable='move_group',
        name='manipulation_system',
        parameters=[{
            'use_sim_time': use_sim_time
        }],
        output='screen'
    )
# Multimodal coordinator
multimodal_coordinator = Node(
package='capstone_integration',
executable='multimodal_coordinator_node',
name='multimodal_coordinator',
parameters=[{'use_sim_time': use_sim_time}],
output='screen'
)
# Capstone integration node
capstone_integration = Node(
package='capstone_integration',
executable='capstone_integration_node',
name='capstone_integration',
parameters=[{'use_sim_time': use_sim_time}],
output='screen'
)
    # Text-to-speech node (tts_package is a placeholder; substitute your own TTS implementation)
tts_node = Node(
package='tts_package',
executable='text_to_speech_node',
name='text_to_speech',
parameters=[{'use_sim_time': use_sim_time}],
output='screen'
)
    # Robot state publisher (also needs the robot_description parameter, omitted here)
robot_state_publisher = Node(
package='robot_state_publisher',
executable='robot_state_publisher',
name='robot_state_publisher',
parameters=[{
'use_sim_time': use_sim_time,
'publish_frequency': 50.0
}],
output='screen'
)
return LaunchDescription([
DeclareLaunchArgument(
'use_sim_time',
default_value='true',
description='Use simulation (Gazebo) clock if true'
),
DeclareLaunchArgument(
'robot_namespace',
default_value='humanoid_robot',
description='Robot namespace for multi-robot systems'
),
speech_recognition_node,
nlu_node,
perception_node,
        navigation_launch,
manipulation_node,
multimodal_coordinator,
capstone_integration,
tts_node,
robot_state_publisher
])
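Assuming the launch file is installed in a package named capstone_integration (an assumption; substitute your own package name), the full system can be built and started with colcon build --packages-select capstone_integration, then source install/setup.bash, and finally ros2 launch capstone_integration capstone_system_launch.py use_sim_time:=true. Remember that launch files must be declared as data files in the package's setup.py (or installed via CMake) before ros2 launch can find them.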
Assessment and Evaluation
Step 6: Capstone Assessment
- What are the key components of a multimodal conversational AI system for robotics?
- How does the capstone integration node handle conflicting tasks from different modalities?
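Answering these questions is easier with data. The sketch below addresses the final learning objective, evaluating the integrated system, by logging the latency from each recognized utterance to the robot's spoken response. Topic names match this chapter; the CSV filename is an arbitrary choice:
# evaluation_logger_sketch.py
import csv
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class EvaluationLogger(Node):
    def __init__(self):
        super().__init__('evaluation_logger')
        self.create_subscription(String, 'speech_recognized', self.on_speech, 10)
        self.create_subscription(String, 'robot_response', self.on_response, 10)
        self.pending_since = None
        self.last_command = ''
        self.log_file = open('interaction_log.csv', 'w', newline='')
        self.writer = csv.writer(self.log_file)
        self.writer.writerow(['command', 'latency_s'])

    def on_speech(self, msg):
        # Start the clock when a command is recognized
        self.last_command = msg.data
        self.pending_since = time.monotonic()

    def on_response(self, msg):
        # Stop the clock when the robot answers and record the latency
        if self.pending_since is not None:
            latency = time.monotonic() - self.pending_since
            self.writer.writerow([self.last_command, f'{latency:.3f}'])
            self.log_file.flush()
            self.get_logger().info(f'"{self.last_command}" answered in {latency:.2f}s')
            self.pending_since = None

def main():
    rclpy.init()
    node = EvaluationLogger()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
Aggregating the CSV over a test session gives concrete figures, such as mean response latency and the fraction of commands that received a response, to report in the capstone evaluation.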
Capstone Project Completion
Step 7: Final Integration Testing
To complete the capstone project, test the integrated system against the following scenarios (a scripted driver for replaying them is sketched after the list):
- Basic Interaction Test:
  - Say "Hello" to the robot
  - Verify it responds appropriately with speech and gesture
- Navigation Test:
  - Command "Go to kitchen"
  - Verify the robot plans a path and navigates safely
- Manipulation Test:
  - Command "Pick up the red cup"
  - Verify the robot localizes the object and attempts manipulation
- Multimodal Test:
  - Point to an object and say "What is that?"
  - Verify the robot looks at the pointed location and identifies the object
- Complex Task Test:
  - Command "Go to the kitchen and bring me the water bottle"
  - Verify the robot breaks this into subtasks and executes them sequentially
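These scenarios can be replayed without a microphone by publishing the spoken commands directly onto the speech_recognized topic. A minimal driver sketch (the 10-second gap between commands is an arbitrary choice; lengthen it for slow tasks):
# scenario_driver_sketch.py
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

SCENARIO_COMMANDS = [
    'hello',
    'go to kitchen',
    'pick up the red cup',
    'what is that',
    'go to the kitchen and bring me the water bottle',
]

def main():
    rclpy.init()
    node = Node('scenario_driver')
    pub = node.create_publisher(String, 'speech_recognized', 10)
    time.sleep(1.0)  # give subscribers time to discover the publisher
    for command in SCENARIO_COMMANDS:
        msg = String()
        msg.data = command
        pub.publish(msg)
        node.get_logger().info(f'Published: "{command}"')
        time.sleep(10.0)  # leave time for the robot to act before the next command
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()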
Project Deliverables
Complete the following to finish the capstone project:
- Integrated System: All components working together in a cohesive system
- Conversational Interface: Natural language interaction with the robot
- Multimodal Capabilities: Integration of speech, vision, and action
- Safety Features: Proper safety checks and emergency stops
- Documentation: Complete documentation of the integrated system
- Testing Results: Evidence of successful testing across all scenarios
Extension Activities
- Learning Capabilities: Implement machine learning for improved interaction over time
- Emotion Recognition: Add facial expression recognition for more natural interaction
- Multi-Robot Coordination: Extend to multiple robots working together
- Cloud Integration: Connect to cloud services for enhanced capabilities
- Mobile App Interface: Create a mobile app for remote interaction
This concludes the Physical AI & Humanoid Robotics textbook. You now have a comprehensive understanding of how to build, integrate, and deploy complete humanoid robotics systems with conversational AI capabilities.