Skip to content
8 changes: 8 additions & 0 deletions controller_manager/controller_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
set_hardware_component_state,
switch_controllers,
unload_controller,
get_parameter_from_param_file,
set_controller_parameters,
set_controller_parameters_from_param_file,
bcolors,
)

__all__ = [
Expand All @@ -36,4 +40,8 @@
"set_hardware_component_state",
"switch_controllers",
"unload_controller",
"get_parameter_from_param_file",
"set_controller_parameters",
"set_controller_parameters_from_param_file",
"bcolors",
]
117 changes: 117 additions & 0 deletions controller_manager/controller_manager/controller_manager_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,29 @@
)

import rclpy
import yaml
from rcl_interfaces.msg import Parameter

# @note: The versions conditioning is added here to support the source-compatibility with Humble
# The `get_parameter_value` function is moved to `rclpy.parameter` module from `ros2param.api` module from version 3.6.0
try:
from rclpy.parameter import get_parameter_value
except ImportError:
from ros2param.api import get_parameter_value
from ros2param.api import call_set_parameters


# from https://stackoverflow.com/a/287944
class bcolors:
MAGENTA = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"


class ServiceNotFoundError(Exception):
Expand Down Expand Up @@ -220,3 +243,97 @@ def unload_controller(node, controller_manager_name, controller_name, service_ti
request,
service_timeout,
)


def get_parameter_from_param_file(controller_name, namespace, parameter_file, parameter_name):
with open(parameter_file) as f:
namespaced_controller = (
controller_name if namespace == "/" else f"{namespace}/{controller_name}"
)
parameters = yaml.safe_load(f)
if namespaced_controller in parameters:
value = parameters[namespaced_controller]
if not isinstance(value, dict) or "ros__parameters" not in value:
raise RuntimeError(
f"YAML file : {parameter_file} is not a valid ROS parameter file for controller : {namespaced_controller}"
)
if parameter_name in parameters[namespaced_controller]["ros__parameters"]:
return parameters[namespaced_controller]["ros__parameters"][parameter_name]
else:
return None
else:
return None


def set_controller_parameters(
node, controller_manager_name, controller_name, parameter_name, parameter_value
):
parameter = Parameter()
parameter.name = controller_name + "." + parameter_name
parameter_string = str(parameter_value)
parameter.value = get_parameter_value(string_value=parameter_string)

response = call_set_parameters(
node=node, node_name=controller_manager_name, parameters=[parameter]
)
assert len(response.results) == 1
result = response.results[0]
if result.successful:
node.get_logger().info(
bcolors.OKCYAN
+ 'Setting controller param "'
+ parameter_name
+ '" to "'
+ parameter_string
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
else:
node.get_logger().fatal(
bcolors.FAIL
+ 'Could not set controller param "'
+ parameter_name
+ '" to "'
+ parameter_string
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
return False
return True


def set_controller_parameters_from_param_file(
node, controller_manager_name, controller_name, parameter_file, namespace=None
):
if parameter_file:
spawner_namespace = namespace if namespace else node.get_namespace()
set_controller_parameters(
node, controller_manager_name, controller_name, "param_file", parameter_file
)

controller_type = get_parameter_from_param_file(
controller_name, spawner_namespace, parameter_file, "type"
)
if controller_type:
if not set_controller_parameters(
node, controller_manager_name, controller_name, "type", controller_type
):
return False

fallback_controllers = get_parameter_from_param_file(
controller_name, spawner_namespace, parameter_file, "fallback_controllers"
)
if fallback_controllers:
if not set_controller_parameters(
node,
controller_manager_name,
controller_name,
"fallback_controllers",
fallback_controllers,
):
return False
return True
14 changes: 1 addition & 13 deletions controller_manager/controller_manager/hardware_spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from controller_manager import (
list_hardware_components,
set_hardware_component_state,
bcolors,
)
from controller_manager.controller_manager_services import ServiceNotFoundError

Expand All @@ -28,19 +29,6 @@
from rclpy.signals import SignalHandlerOptions


# from https://stackoverflow.com/a/287944
class bcolors:
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"


def first_match(iterable, predicate):
return next((n for n in iterable if predicate(n)), None)

Expand Down
165 changes: 9 additions & 156 deletions controller_manager/controller_manager/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,21 @@
import sys
import time
import warnings
import yaml

from controller_manager import (
configure_controller,
list_controllers,
load_controller,
switch_controllers,
unload_controller,
set_controller_parameters_from_param_file,
bcolors,
)
from controller_manager.controller_manager_services import ServiceNotFoundError

import rclpy
from rcl_interfaces.msg import Parameter
from rclpy.node import Node

# @note: The versions conditioning is added here to support the source-compatibility with Humble
# The `get_parameter_value` function is moved to `rclpy.parameter` module from `ros2param.api` module from version 3.6.0
try:
from rclpy.parameter import get_parameter_value
except ImportError:
from ros2param.api import get_parameter_value
from rclpy.signals import SignalHandlerOptions
from ros2param.api import call_set_parameters

# from https://stackoverflow.com/a/287944


class bcolors:
MAGENTA = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"


def first_match(iterable, predicate):
Expand Down Expand Up @@ -87,24 +65,6 @@ def is_controller_loaded(node, controller_manager, controller_name, service_time
return any(c.name == controller_name for c in controllers)


def get_parameter_from_param_file(controller_name, namespace, parameter_file, parameter_name):
with open(parameter_file) as f:
namespaced_controller = (
controller_name if namespace == "/" else f"{namespace}/{controller_name}"
)
parameters = yaml.safe_load(f)
if namespaced_controller in parameters:
value = parameters[namespaced_controller]
if not isinstance(value, dict) or "ros__parameters" not in value:
raise RuntimeError(
f"YAML file : {parameter_file} is not a valid ROS parameter file for controller : {namespaced_controller}"
)
if parameter_name in parameters[namespaced_controller]["ros__parameters"]:
return parameters[namespaced_controller]["ros__parameters"][parameter_name]
else:
return None


def main(args=None):
rclpy.init(args=args, signal_handler_options=SignalHandlerOptions.NO)
parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -162,14 +122,6 @@ def main(args=None):
action="store_true",
required=False,
)
parser.add_argument(
"--fallback_controllers",
help="Fallback controllers list are activated as a fallback strategy when the"
" spawned controllers fail. When the argument is provided, it takes precedence over"
" the fallback_controllers list in the param file",
default=None,
nargs="+",
)

command_line_args = rclpy.utilities.remove_ros_args(args=sys.argv)[1:]
args = parser.parse_args(command_line_args)
Expand Down Expand Up @@ -210,7 +162,6 @@ def main(args=None):

try:
for controller_name in controller_names:
fallback_controllers = args.fallback_controllers

if is_controller_loaded(
node, controller_manager_name, controller_name, controller_manager_timeout
Expand All @@ -221,112 +172,14 @@ def main(args=None):
+ bcolors.ENDC
)
else:
controller_type = (
None
if param_file is None
else get_parameter_from_param_file(
controller_name, spawner_namespace, param_file, "type"
)
)
if controller_type:
parameter = Parameter()
parameter.name = controller_name + ".type"
parameter.value = get_parameter_value(string_value=controller_type)

response = call_set_parameters(
node=node, node_name=controller_manager_name, parameters=[parameter]
)
assert len(response.results) == 1
result = response.results[0]
if result.successful:
node.get_logger().info(
bcolors.OKCYAN
+ 'Set controller type to "'
+ controller_type
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
else:
node.get_logger().fatal(
bcolors.FAIL
+ 'Could not set controller type to "'
+ controller_type
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
return 1

if param_file:
parameter = Parameter()
parameter.name = controller_name + ".params_file"
parameter.value = get_parameter_value(string_value=param_file)

response = call_set_parameters(
node=node, node_name=controller_manager_name, parameters=[parameter]
)
assert len(response.results) == 1
result = response.results[0]
if result.successful:
node.get_logger().info(
bcolors.OKCYAN
+ 'Set controller params file to "'
+ param_file
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
else:
node.get_logger().fatal(
bcolors.FAIL
+ 'Could not set controller params file to "'
+ param_file
+ '" for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
return 1

if not fallback_controllers and param_file:
fallback_controllers = get_parameter_from_param_file(
controller_name, spawner_namespace, param_file, "fallback_controllers"
)

if fallback_controllers:
parameter = Parameter()
parameter.name = controller_name + ".fallback_controllers"
parameter.value = get_parameter_value(string_value=str(fallback_controllers))

response = call_set_parameters(
node=node, node_name=controller_manager_name, parameters=[parameter]
)
assert len(response.results) == 1
result = response.results[0]
if result.successful:
node.get_logger().info(
bcolors.OKCYAN
+ 'Setting fallback_controllers to ["'
+ ",".join(fallback_controllers)
+ '"] for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
else:
node.get_logger().fatal(
bcolors.FAIL
+ 'Could not set fallback_controllers to ["'
+ ",".join(fallback_controllers)
+ '"] for '
+ bcolors.BOLD
+ controller_name
+ bcolors.ENDC
)
if not set_controller_parameters_from_param_file(
node,
controller_manager_name,
controller_name,
param_file,
spawner_namespace,
):
return 1

ret = load_controller(node, controller_manager_name, controller_name)
Expand Down
Loading