Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lifecycle_py/launch/lifecycle_demo_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from launch_ros.actions import Node


def generate_launch_description():
def generate_launch_description() -> LaunchDescription:
return LaunchDescription([
LifecycleNode(package='lifecycle_py', executable='lifecycle_talker',
name='lc_talker', namespace='', output='screen'),
Expand Down
28 changes: 18 additions & 10 deletions lifecycle_py/lifecycle_py/talker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.executors import SingleThreadedExecutor

# Node, State and Publisher are aliases for LifecycleNode, LifecycleState and LifecyclePublisher
# respectively.
# In case of ambiguity, the more explicit names can be imported.
Expand All @@ -28,20 +27,25 @@
from rclpy.lifecycle import Publisher
from rclpy.lifecycle import State
from rclpy.lifecycle import TransitionCallbackReturn
from rclpy.lifecycle.node import LifecycleNodeArgs
from rclpy.timer import Timer
from typing_extensions import Unpack


class LifecycleTalker(Node):
"""Our lifecycle talker node."""

def __init__(self, node_name, **kwargs):
def __init__(self,
node_name: str,
**kwargs: Unpack[LifecycleNodeArgs] # type: ignore
) -> None:
"""Construct the node."""
self._count: int = 0
self._pub: Optional[Publisher] = None
self._pub: Optional[Publisher[example_interfaces.msg.String]] = None
self._timer: Optional[Timer] = None
super().__init__(node_name, **kwargs)

def publish(self):
def publish(self) -> None:
"""Publish a new message when enabled."""
msg = example_interfaces.msg.String()
msg.data = 'Lifecycle HelloWorld #' + str(self._count)
Expand Down Expand Up @@ -74,7 +78,7 @@ def on_configure(self, state: State) -> TransitionCallbackReturn:
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
self._pub = self.create_lifecycle_publisher(
example_interfaces.msg.String, 'lifecycle_chatter', 10)
example_interfaces.msg.String, 'lifecycle_chatter', 10)
self._timer = self.create_timer(1.0, self.publish)

self.get_logger().info('on_configure() is called.')
Expand Down Expand Up @@ -114,8 +118,10 @@ def on_cleanup(self, state: State) -> TransitionCallbackReturn:
TransitionCallbackReturn.FAILURE transitions to "inactive".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
if self._timer is not None:
self.destroy_timer(self._timer)
if self._pub is not None:
self.destroy_publisher(self._pub)

self.get_logger().info('on_cleanup() is called.')
return TransitionCallbackReturn.SUCCESS
Expand All @@ -133,8 +139,10 @@ def on_shutdown(self, state: State) -> TransitionCallbackReturn:
TransitionCallbackReturn.FAILURE transitions to "inactive".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
if self._timer is not None:
self.destroy_timer(self._timer)
if self._pub is not None:
self.destroy_publisher(self._pub)

self.get_logger().info('on_shutdown() is called.')
return TransitionCallbackReturn.SUCCESS
Expand All @@ -144,7 +152,7 @@ def on_shutdown(self, state: State) -> TransitionCallbackReturn:
# as a regular node. This means we can spawn a
# node, give it a name and add it to the executor.

def main():
def main() -> None:
try:
with rclpy.init():
executor = SingleThreadedExecutor()
Expand Down
1 change: 1 addition & 0 deletions lifecycle_py/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<test_depend>ament_xmllint</test_depend>
<test_depend>lifecycle</test_depend>
<test_depend>ros_testing</test_depend>
<test_depend>ament_mypy</test_depend>

<export>
<build_type>ament_python</build_type>
Expand Down
2 changes: 1 addition & 1 deletion lifecycle_py/test/test_copyright.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
def test_copyright() -> None:
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'
2 changes: 1 addition & 1 deletion lifecycle_py/test/test_flake8.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
def test_flake8() -> None:
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
Expand Down
15 changes: 12 additions & 3 deletions lifecycle_py/test/test_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,32 @@
# limitations under the License.

import re
from typing import Any
import unittest

import launch

import launch.actions
import launch.event_handlers.on_process_start

import launch_ros.actions

import launch_ros.events
import launch_ros.events.lifecycle

import launch_testing
import launch_testing.actions
import launch_testing.asserts
from launch_testing.io_handler import ActiveIoHandler
from launch_testing.proc_info_handler import ActiveProcInfoHandler

import lifecycle_msgs.msg

import pytest


@pytest.mark.rostest
def generate_test_description():
def generate_test_description() -> tuple[launch.LaunchDescription, dict[str, Any]]:
talker_node = launch_ros.actions.LifecycleNode(
package='lifecycle_py', executable='lifecycle_talker',
name='lc_talker', namespace='', output='screen'
Expand Down Expand Up @@ -120,7 +125,10 @@ def generate_test_description():

class TestLifecyclePubSub(unittest.TestCase):

def test_talker_lifecycle(self, proc_info, proc_output, talker_node, listener_node):
def test_talker_lifecycle(self, proc_info: ActiveProcInfoHandler,
proc_output: ActiveIoHandler,
talker_node: launch_ros.actions.LifecycleNode,
listener_node: launch_ros.actions.Node) -> None:
"""Test lifecycle talker."""
proc_output.assertWaitFor('on_configure() is called', process=talker_node, timeout=5)
proc_output.assertWaitFor('on_activate() is called', process=talker_node, timeout=10)
Expand All @@ -140,6 +148,7 @@ def test_talker_lifecycle(self, proc_info, proc_output, talker_node, listener_no
@launch_testing.post_shutdown_test()
class TestLifecyclePubSubAfterShutdown(unittest.TestCase):

def test_talker_graceful_shutdown(self, proc_info, talker_node):
def test_talker_graceful_shutdown(self, proc_info: ActiveProcInfoHandler,
talker_node: launch_ros.actions.LifecycleNode) -> None:
"""Test lifecycle talker graceful shutdown."""
launch_testing.asserts.assertExitCodes(proc_info, process=talker_node)
21 changes: 21 additions & 0 deletions lifecycle_py/test/test_mypy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2019 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ament_mypy.main import main


def test_mypy() -> None:
rc = main(argv=[])
assert rc == 0, 'Found code style errors / warnings'
2 changes: 1 addition & 1 deletion lifecycle_py/test/test_pep257.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
def test_pep257() -> None:
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'