Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions demo_nodes_py/demo_nodes_py/events/matched_event_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union

from example_interfaces.msg import String
import rclpy
from rclpy.event_handler import PublisherEventCallbacks
Expand Down Expand Up @@ -39,7 +41,7 @@

class MatchedEventDetectNode(Node):

def __init__(self, pub_topic_name: String, sub_topic_name: String):
def __init__(self, pub_topic_name: str, sub_topic_name: str):
super().__init__('matched_event_detection_node')
self.__any_subscription_connected = False # used for publisher event
self.__any_publisher_connected = False # used for subscription event
Expand All @@ -49,10 +51,10 @@ def __init__(self, pub_topic_name: String, sub_topic_name: String):
event_callbacks=pub_event_callback)

sub_event_callback = SubscriptionEventCallbacks(matched=self.__sub_matched_event_callback)
self.sub = self.create_subscription(String, sub_topic_name, lambda msg: ...,
self.sub = self.create_subscription(String, sub_topic_name, lambda msg: None,
10, event_callbacks=sub_event_callback)

def __pub_matched_event_callback(self, info: QoSPublisherMatchedInfo):
def __pub_matched_event_callback(self, info: QoSPublisherMatchedInfo) -> None:
if self.__any_subscription_connected:
if info.current_count == 0:
self.get_logger().info('Last subscription is disconnected.')
Expand All @@ -69,7 +71,7 @@ def __pub_matched_event_callback(self, info: QoSPublisherMatchedInfo):

self.future.set_result(True)

def __sub_matched_event_callback(self, info: QoSSubscriptionMatchedInfo):
def __sub_matched_event_callback(self, info: QoSSubscriptionMatchedInfo) -> None:
if self.__any_publisher_connected:
if info.current_count == 0:
self.get_logger().info('Last publisher is disconnected.')
Expand All @@ -86,25 +88,25 @@ def __sub_matched_event_callback(self, info: QoSSubscriptionMatchedInfo):

self.future.set_result(True)

def get_future(self):
self.future = Future()
def get_future(self) -> Future[bool]:
self.future: Future[bool] = Future()
return self.future


class MultiSubNode(Node):

def __init__(self, topic_name: String):
def __init__(self, topic_name: str):
super().__init__('multi_sub_node')
self.__subs = []
self.__subs: list[Subscription[String]] = []
self.__topic_name = topic_name

def create_one_sub(self) -> Subscription:
def create_one_sub(self) -> Subscription[String]:
self.get_logger().info('Create a new subscription.')
sub = self.create_subscription(String, self.__topic_name, lambda msg: ..., 10)
sub = self.create_subscription(String, self.__topic_name, lambda msg: None, 10)
self.__subs.append(sub)
return sub

def destroy_one_sub(self, sub: Subscription):
def destroy_one_sub(self, sub: Subscription[String]) -> None:

if sub in self.__subs:
self.get_logger().info('Destroy a subscription.')
Expand All @@ -114,26 +116,26 @@ def destroy_one_sub(self, sub: Subscription):

class MultiPubNode(Node):

def __init__(self, topic_name: String):
def __init__(self, topic_name: str):
super().__init__('multi_pub_node')
self.__pubs = []
self.__pubs: list[Publisher[String]] = []
self.__topic_name = topic_name

def create_one_pub(self) -> Publisher:
def create_one_pub(self) -> Publisher[String]:
self.get_logger().info('Create a new publisher.')
pub = self.create_publisher(String, self.__topic_name, 10)
self.__pubs.append(pub)
return pub

def destroy_one_pub(self, pub: Publisher):
def destroy_one_pub(self, pub: Publisher[String]) -> None:

if pub in self.__pubs:
self.get_logger().info('Destroy a publisher.')
self.__pubs.remove(pub)
self.destroy_publisher(pub)


def main(args=None):
def main(args: Union[list[str], None] = None) -> None:
try:
with rclpy.init(args=args):
topic_name_for_detect_pub_matched_event = 'pub_topic_matched_event_detect'
Expand Down
38 changes: 24 additions & 14 deletions demo_nodes_py/demo_nodes_py/logging/use_logger_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python3
# Copyright 2023 Sony Group Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -15,6 +16,8 @@
import argparse
Comment thread
cronenberg64 marked this conversation as resolved.
import threading
import time
from typing import Literal
from typing import Union

from example_interfaces.msg import String
from rcl_interfaces.msg import LoggerLevel
Expand Down Expand Up @@ -66,12 +69,12 @@

class LoggerServiceNode(Node):

def __init__(self):
def __init__(self) -> None:
super().__init__('LoggerServiceNode', enable_logger_service=True)
self.child_logger = self.get_logger().get_child('child')
self.sub = self.create_subscription(String, 'output', self.callback, 10)

def callback(self, msg):
def callback(self, msg: String) -> None:
self.get_logger().debug(msg.data + ' with DEBUG logger level.')
self.get_logger().info(msg.data + ' with INFO logger level.')
self.get_logger().warning(msg.data + ' with WARN logger level.')
Expand All @@ -84,7 +87,7 @@ def callback(self, msg):

class TestNode(Node):

def __init__(self, remote_node_name):
def __init__(self, remote_node_name: str) -> None:
super().__init__('TestNode')
self.pub = self.create_publisher(String, 'output', 10)
self.logger_get_client = self.create_client(
Expand All @@ -93,15 +96,17 @@ def __init__(self, remote_node_name):
SetLoggerLevels, remote_node_name + '/set_logger_levels')
self._remote_node_name = remote_node_name

def set_logger_level_on_remote_node(self, logger_level, logger_name='') -> bool:
def set_logger_level_on_remote_node(
self, logger_level: int, logger_name: str = '',
) -> bool:
if not self._logger_set_client.service_is_ready():
return False

request = SetLoggerLevels.Request()
set_logger_level = LoggerLevel()
set_logger_level.name = logger_name if logger_name else self._remote_node_name
set_logger_level.level = logger_level
request.levels.append(set_logger_level)
request.levels.append(set_logger_level) # type: ignore[attr-defined]

future = self._logger_set_client.call_async(request)
rclpy.spin_until_future_complete(self, future)
Expand All @@ -117,37 +122,42 @@ def set_logger_level_on_remote_node(self, logger_level, logger_name='') -> bool:

return True

def get_logger_level_on_remote_node(self, logger_name=''):
def get_logger_level_on_remote_node(
self, logger_name: str = '',
) -> Union[tuple[Literal[False], None], tuple[Literal[True], int]]:
if not self.logger_get_client.service_is_ready():
return [False, None]
return False, None

request = GetLoggerLevels.Request()
request.names.append(logger_name if logger_name else self._remote_node_name)
name = logger_name if logger_name else self._remote_node_name
request.names = [name]

future = self.logger_get_client.call_async(request)
rclpy.spin_until_future_complete(self, future)

ret_results = future.result()
if not ret_results:
return [False, None]
return False, None

return [True, ret_results.levels[0].level]
return True, ret_results.levels[0].level


def get_logger_level_func(test_node, child_logger_name):
ret, level = test_node.get_logger_level_on_remote_node()
def get_logger_level_func(test_node: TestNode, child_logger_name: str) -> None:
results = test_node.get_logger_level_on_remote_node()
ret, level = results
if ret:
test_node.get_logger().info('Current logger level: ' + str(level))
else:
test_node.get_logger().error('Failed to get logger level via logger service !')
ret, child_level = test_node.get_logger_level_on_remote_node(child_logger_name)
child_results = test_node.get_logger_level_on_remote_node(child_logger_name)
ret, child_level = child_results
if ret:
test_node.get_logger().info('Current child logger level: ' + str(child_level))
else:
test_node.get_logger().error('Failed to get child logger level via logger service !')


def main(args=None):
def main(args: Union[list[str], None] = None) -> None:
# Check for --service-only flag before ROS 2 consumes the arguments
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--service-only', action='store_true', default=False)
Expand Down
56 changes: 29 additions & 27 deletions demo_nodes_py/demo_nodes_py/parameters/async_param_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python3
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -23,7 +24,7 @@
from rclpy.parameter_client import AsyncParameterClient


def main(args=None):
def main(args: list[str] | None = None) -> None:
try:
with rclpy.init():
node = rclpy.create_node('async_param_client')
Expand All @@ -44,63 +45,64 @@ def main(args=None):
rclpy.spin_until_future_complete(node, future)
set_parameters_result = future.result()
if set_parameters_result is not None:
for i, v in enumerate(set_parameters_result.results):
for i, set_param_result in enumerate(set_parameters_result.results):
logger.info(f' {param_list[i]}:')
logger.info(f' successful: {v.successful}')
logger.info(f' successful: {set_param_result.successful}')
else:
logger.error(f'Error setting parameters: {future.exception()}')

logger.info('Listing Parameters:')
future = client.list_parameters(param_list, 10)
rclpy.spin_until_future_complete(node, future)
list_parameters_result = future.result()
list_future = client.list_parameters(param_list, 10)
rclpy.spin_until_future_complete(node, list_future)
list_parameters_result = list_future.result()
if list_parameters_result is not None:
for param_name in list_parameters_result.result.names:
logger.info(f' - {param_name}')
else:
logger.error(f'Error listing parameters: {future.exception()}')
logger.error(f'Error listing parameters: {list_future.exception()}')

logger.info('Getting parameters:')
future = client.get_parameters(param_list)
rclpy.spin_until_future_complete(node, future)
get_parameters_result = future.result()
get_future = client.get_parameters(param_list)
rclpy.spin_until_future_complete(node, get_future)
get_parameters_result = get_future.result()
if get_parameters_result is not None:
for i, v in enumerate(get_parameters_result.values):
logger.info(f' - {param_list[i]}: {parameter_value_to_python(v)}')
for i, get_param_value in enumerate(get_parameters_result.values):
logger.info(
f' - {param_list[i]}: {parameter_value_to_python(get_param_value)}')
else:
logger.error(f'Error getting parameters: {future.exception()}')
logger.error(f'Error getting parameters: {get_future.exception()}')

logger.info('Loading parameters: ')
param_dir = get_package_share_directory('demo_nodes_py')
param_file_path = os.path.join(param_dir, 'params.yaml')
future = client.load_parameter_file(param_file_path)
rclpy.spin_until_future_complete(node, future)
load_parameter_results = future.result()
load_future = client.load_parameter_file(param_file_path)
rclpy.spin_until_future_complete(node, load_future)
load_parameter_results = load_future.result()
if load_parameter_results is not None:
param_file_dict = parameter_dict_from_yaml_file(
param_file_path, False, target_nodes=['parameter_blackboard'])
for i, v in enumerate(param_file_dict.keys()):
logger.info(f' {v}:')
for i, k in enumerate(param_file_dict.keys()):
logger.info(f' {k}:')
logger.info(f' successful: '
f'{load_parameter_results.results[i].successful}')
logger.info(f' value: '
f'{parameter_value_to_python(param_file_dict[v].value)}')
f'{parameter_value_to_python(param_file_dict[k].value)}')
else:
logger.error(f'Error loading parameters: {future.exception()}')
logger.error(f'Error loading parameters: {load_future.exception()}')

logger.info('Deleting parameters: ')
params_to_delete = ['other_int_parameter', 'other_string_parameter',
'string_parameter']
future = client.delete_parameters(params_to_delete)
rclpy.spin_until_future_complete(node, future)
delete_parameters_result = future.result()
del_future = client.delete_parameters(params_to_delete)
rclpy.spin_until_future_complete(node, del_future)
delete_parameters_result = del_future.result()
if delete_parameters_result is not None:
for i, v in enumerate(delete_parameters_result.results):
for i, del_param_result in enumerate(delete_parameters_result.results):
logger.info(f' {params_to_delete[i]}:')
logger.info(f' successful: {v.successful}')
logger.info(f' reason: {v.reason}')
logger.info(f' successful: {del_param_result.successful}')
logger.info(f' reason: {del_param_result.reason}')
else:
logger.error(f'Error deleting parameters: {future.exception()}')
logger.error(f'Error deleting parameters: {del_future.exception()}')
except (KeyboardInterrupt, ExternalShutdownException):
pass

Expand Down
21 changes: 16 additions & 5 deletions demo_nodes_py/demo_nodes_py/parameters/set_parameters_callback.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python3
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -12,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union

from rcl_interfaces.msg import SetParametersResult

import rclpy
Expand All @@ -26,9 +29,11 @@

# node for demonstrating correct usage of pre_set, on_set
# and post_set parameter callbacks


class SetParametersCallback(Node):

def __init__(self):
def __init__(self) -> None:
super().__init__('set_parameters_callback')

# tracks 'param1' value
Expand All @@ -38,7 +43,9 @@ def __init__(self):

# setting another parameter from the callback is possible
# we expect the callback to be called for param2
def pre_set_parameter_callback(parameter_list):
def pre_set_parameter_callback(
parameter_list: list[Parameter], # type: ignore[type-arg]
) -> list[Parameter]: # type: ignore[type-arg]
modified_parameters = parameter_list.copy()
for param in parameter_list:
if param.name == 'param1':
Expand All @@ -47,7 +54,9 @@ def pre_set_parameter_callback(parameter_list):
return modified_parameters

# validation callback
def on_set_parameter_callback(parameter_list):
def on_set_parameter_callback(
parameter_list: list[Parameter], # type: ignore[type-arg]
) -> SetParametersResult:
result = SetParametersResult()
for param in parameter_list:
if param.name == 'param1':
Expand All @@ -60,7 +69,9 @@ def on_set_parameter_callback(parameter_list):
return result

# can change internally tracked class attributes
def post_set_parameter_callback(parameter_list):
def post_set_parameter_callback(
parameter_list: list[Parameter], # type: ignore[type-arg]
) -> None:
for param in parameter_list:
if param.name == 'param1':
self.internal_tracked_param_1 = param.value
Expand All @@ -72,7 +83,7 @@ def post_set_parameter_callback(parameter_list):
self.add_post_set_parameters_callback(post_set_parameter_callback)


def main(args=None):
def main(args: Union[list[str], None] = None) -> None:
try:
with rclpy.init(args=args):
node = SetParametersCallback()
Expand Down
Loading