Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lifecycle_py/launch/lifecycle_demo_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from launch import LaunchDescription
from launch import LaunchDescription # type: ignore[attr-defined]
from launch_ros.actions import LifecycleNode
from launch_ros.actions import Node


def generate_launch_description():
def generate_launch_description() -> LaunchDescription:
return LaunchDescription([
LifecycleNode(package='lifecycle_py', executable='lifecycle_talker',
name='lc_talker', namespace='', output='screen'),
Expand Down
27 changes: 19 additions & 8 deletions lifecycle_py/lifecycle_py/talker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any
from typing import Optional

import example_interfaces.msg
Expand All @@ -31,17 +32,17 @@
from rclpy.timer import Timer


class LifecycleTalker(Node):
class LifecycleTalker(Node): # type: ignore[misc]
"""Our lifecycle talker node."""

def __init__(self, node_name, **kwargs):
def __init__(self, node_name: str, **kwargs: Any) -> None:
"""Construct the node."""
self._count: int = 0
self._pub: Optional[Publisher] = None
self._timer: Optional[Timer] = None
super().__init__(node_name, **kwargs)

def publish(self):
def publish(self) -> None:
"""Publish a new message when enabled."""
msg = example_interfaces.msg.String()
msg.data = 'Lifecycle HelloWorld #' + str(self._count)
Expand Down Expand Up @@ -114,8 +115,13 @@ def on_cleanup(self, state: State) -> TransitionCallbackReturn:
TransitionCallbackReturn.FAILURE transitions to "inactive".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
if self._timer is not None:
self.destroy_timer(self._timer)
self._timer = None

if self._pub is not None:
self.destroy_publisher(self._pub)
self._pub = None

self.get_logger().info('on_cleanup() is called.')
return TransitionCallbackReturn.SUCCESS
Expand All @@ -133,8 +139,13 @@ def on_shutdown(self, state: State) -> TransitionCallbackReturn:
TransitionCallbackReturn.FAILURE transitions to "inactive".
TransitionCallbackReturn.ERROR or any uncaught exceptions to "errorprocessing"
"""
self.destroy_timer(self._timer)
self.destroy_publisher(self._pub)
if self._timer is not None:
self.destroy_timer(self._timer)
self._timer = None

if self._pub is not None:
self.destroy_publisher(self._pub)
self._pub = None

self.get_logger().info('on_shutdown() is called.')
return TransitionCallbackReturn.SUCCESS
Expand All @@ -144,7 +155,7 @@ def on_shutdown(self, state: State) -> TransitionCallbackReturn:
# as a regular node. This means we can spawn a
# node, give it a name and add it to the executor.

def main():
def main() -> None:
try:
with rclpy.init():
executor = SingleThreadedExecutor()
Expand Down
1 change: 1 addition & 0 deletions lifecycle_py/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>ament_xmllint</test_depend>
<test_depend>ament_mypy</test_depend>
<test_depend>lifecycle</test_depend>
<test_depend>ros_testing</test_depend>

Expand Down
2 changes: 1 addition & 1 deletion lifecycle_py/test/test_copyright.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
def test_copyright() -> None:
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'
2 changes: 1 addition & 1 deletion lifecycle_py/test/test_flake8.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
def test_flake8() -> None:
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
Expand Down
14 changes: 10 additions & 4 deletions lifecycle_py/test/test_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# limitations under the License.

import re
from typing import Any, Dict, Tuple
import unittest

import launch
import launch.actions
import launch.event_handlers.on_process_start
import launch.events

import launch_ros.actions
import launch_ros.events
Expand All @@ -33,7 +35,9 @@


@pytest.mark.rostest
def generate_test_description():
def generate_test_description() -> Tuple[ # type: ignore[name-defined]
launch.LaunchDescription, Dict[str, Any]
]:
talker_node = launch_ros.actions.LifecycleNode(
package='lifecycle_py', executable='lifecycle_talker',
name='lc_talker', namespace='', output='screen'
Expand All @@ -42,7 +46,7 @@ def generate_test_description():
package='lifecycle', executable='lifecycle_listener',
name='listener', output='screen'
)
return launch.LaunchDescription([
return launch.LaunchDescription([ # type: ignore[attr-defined]
talker_node, listener_node,
# Right after the talker starts, make it take the 'configure' transition.
launch.actions.RegisterEventHandler(
Expand Down Expand Up @@ -120,7 +124,9 @@ def generate_test_description():

class TestLifecyclePubSub(unittest.TestCase):

def test_talker_lifecycle(self, proc_info, proc_output, talker_node, listener_node):
def test_talker_lifecycle(
self, proc_info: Any, proc_output: Any, talker_node: Any, listener_node: Any
) -> None:
"""Test lifecycle talker."""
proc_output.assertWaitFor('on_configure() is called', process=talker_node, timeout=5)
proc_output.assertWaitFor('on_activate() is called', process=talker_node, timeout=10)
Expand All @@ -140,6 +146,6 @@ def test_talker_lifecycle(self, proc_info, proc_output, talker_node, listener_no
@launch_testing.post_shutdown_test()
class TestLifecyclePubSubAfterShutdown(unittest.TestCase):

def test_talker_graceful_shutdown(self, proc_info, talker_node):
def test_talker_graceful_shutdown(self, proc_info: Any, talker_node: Any) -> None:
"""Test lifecycle talker graceful shutdown."""
launch_testing.asserts.assertExitCodes(proc_info, process=talker_node)
65 changes: 65 additions & 0 deletions lifecycle_py/test/test_mypy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2019 Canonical, Ltd.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fragile? if the package directory structure ever changes, or if a subdirectory is renamed, this test will silently skip files or fail for structural reasons rather than type errors. this should be replaced with the standard minimal form. IMO, if main(argv=[]) doesn't discover the right files in the ament test harness, that's an ament_mypy bug, not something to work around in every consumer package.

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path

from ament_mypy.main import main

import pytest


@pytest.mark.mypy
@pytest.mark.linter
def test_mypy() -> None:
def is_package_root(candidate: Path) -> bool:
return (
(candidate / 'package.xml').is_file()
and (candidate / 'setup.py').is_file()
and (candidate / 'lifecycle_py').is_dir()
and (candidate / 'launch').is_dir()
and (candidate / 'test').is_dir()
)

def find_package_root() -> Path:
here = Path(__file__).resolve()
cwd = Path.cwd().resolve()

candidates = [
here.parent.parent,
cwd,
cwd / 'lifecycle_py',
]

for candidate in candidates:
if is_package_root(candidate):
return candidate

# As a last resort, walk parents of __file__ and cwd.
for candidate in [*here.parents, *cwd.parents]:
if is_package_root(candidate):
return candidate

# Keep mypy scoped if no package root is detected.
return here.parent.parent

package_root = find_package_root()
paths_to_check = [
str(package_root / 'lifecycle_py'),
str(package_root / 'launch'),
str(package_root / 'test'),
str(package_root / 'setup.py'),
]

rc = main(argv=paths_to_check)
assert rc == 0, 'Found code style errors / warnings'
2 changes: 1 addition & 1 deletion lifecycle_py/test/test_pep257.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
def test_pep257() -> None:
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'