From e3029ea47c9d4271bd654c77676a4f8b3e99a3cb Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 29 Oct 2020 09:38:08 +0200 Subject: [PATCH 1/8] Handle ZMQError Address already in use --- posttroll/publisher.py | 17 ++++++++++++++++- posttroll/tests/test_pubsub.py | 33 +++++++++++++++++++++++++-------- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/posttroll/publisher.py b/posttroll/publisher.py index 501e0fd..28a460a 100644 --- a/posttroll/publisher.py +++ b/posttroll/publisher.py @@ -28,6 +28,8 @@ import socket from datetime import datetime, timedelta from threading import Lock +import time + from six.moves.urllib.parse import urlsplit, urlunsplit import six import zmq @@ -37,6 +39,7 @@ from posttroll.message_broadcaster import sendaddressservice LOGGER = logging.getLogger(__name__) +BIND_RETRIES = 5 def get_own_ip(): @@ -121,7 +124,7 @@ def __init__(self, address, name="", min_port=None, max_port=None): self.destination = urlunsplit((u__.scheme, netloc, u__.path, u__.query, u__.fragment)) else: - self.publish.bind(self.destination) + self._bind_destination() self.port_number = port LOGGER.info("publisher started on port %s", str(self.port_number)) @@ -130,6 +133,18 @@ def __init__(self, address, name="", min_port=None, max_port=None): self._heartbeat = None self._pub_lock = Lock() + def _bind_destination(self): + """Bind publish destination.""" + last_error = "" + for _ in range(BIND_RETRIES): + try: + self.publish.bind(self.destination) + return + except zmq.error.ZMQError as err: + last_error = err.strerror + time.sleep(0.01) + raise OSError("Could not bind %s - %s" % (self.destination, last_error)) + def send(self, msg): """Send the given message. """ diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index 0b842df..b1cb633 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -1,27 +1,27 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- - +# # Copyright (c) 2014 Martin Raspaud - +# # Author(s): - +# # Martin Raspaud - +# Panu Lahtinen +# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. - +# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. - +# # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Test the publishing and subscribing facilities. -""" +"""Test the publishing and subscribing facilities.""" import unittest from unittest import mock from datetime import timedelta @@ -314,6 +314,23 @@ def test_pub_minmax_port(self): self.assertEqual(res, port) break + @mock.patch("posttroll.publisher.get_context") + def test_bind_retries(self, get_context): + """Test that the destination bind is retried on failure.""" + from zmq.error import ZMQError + from posttroll.publisher import Publish, BIND_RETRIES + context = mock.MagicMock() + context.bind.side_effect = ZMQError("mocked failure") + get_context.return_value.socket.return_value = context + + try: + with Publish("test_bind_retries", port=50000) as pub: + pass + raise AssertionError("OSError not raised") + except OSError: + pass + assert context.bind.call_count == BIND_RETRIES + def _get_port(min_port=None, max_port=None): from zmq.error import ZMQError From 1c39e57fba2a1b50acd6bb8ff25adc253c88e9ba Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 29 Oct 2020 09:46:19 +0200 Subject: [PATCH 2/8] Remove unused variable --- posttroll/tests/test_pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index b1cb633..5453e78 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -279,7 +279,7 @@ def test_pub_unicode(self): from posttroll.publisher import Publish message = Message("/pџтяöll", "info", 'hej') - with Publish("a_service", 9000) as pub: + with Publish("a_service", 9000): try: pub.send(message.encode()) except UnicodeDecodeError: From 25f791b9b468abbb60613266601f982001927f2d Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 29 Oct 2020 09:51:16 +0200 Subject: [PATCH 3/8] Revert publisher context removal --- posttroll/tests/test_pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index 5453e78..b1cb633 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -279,7 +279,7 @@ def test_pub_unicode(self): from posttroll.publisher import Publish message = Message("/pџтяöll", "info", 'hej') - with Publish("a_service", 9000): + with Publish("a_service", 9000) as pub: try: pub.send(message.encode()) except UnicodeDecodeError: From 254c58e396b41d1b1b62cbb612f90e785d6a9ed1 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 30 Oct 2020 07:28:08 +0200 Subject: [PATCH 4/8] Make bind retries configurable via env variables --- posttroll/publisher.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/posttroll/publisher.py b/posttroll/publisher.py index 28a460a..af5ed87 100644 --- a/posttroll/publisher.py +++ b/posttroll/publisher.py @@ -39,7 +39,8 @@ from posttroll.message_broadcaster import sendaddressservice LOGGER = logging.getLogger(__name__) -BIND_RETRIES = 5 +BIND_RETRIES = int(os.environ.get("PYTROLL_BIND_RETRIES", 5)) +BIND_RETRY_TIMEOUT = float(os.environ.get("PYTROLL_BIND_RETRY_TIMEOUT", 0.1)) def get_own_ip(): @@ -142,7 +143,7 @@ def _bind_destination(self): return except zmq.error.ZMQError as err: last_error = err.strerror - time.sleep(0.01) + time.sleep(BIND_RETRY_TIMEOUT) raise OSError("Could not bind %s - %s" % (self.destination, last_error)) def send(self, msg): From 6745515c850825e3ec6299950b7d1ed4cd0d235e Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 3 Nov 2020 14:03:05 +0200 Subject: [PATCH 5/8] Remove unused pub context --- posttroll/tests/test_pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index b1cb633..3f590db 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -324,7 +324,7 @@ def test_bind_retries(self, get_context): get_context.return_value.socket.return_value = context try: - with Publish("test_bind_retries", port=50000) as pub: + with Publish("test_bind_retries", port=50000): pass raise AssertionError("OSError not raised") except OSError: From 8cedf83c2706bfba991e7a4f4ed17e0aa0a8efbc Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 3 Nov 2020 14:11:55 +0200 Subject: [PATCH 6/8] Use pytest.raises to chech that OSError is raised --- posttroll/tests/test_pubsub.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index 3f590db..60bc631 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -29,6 +29,7 @@ import time import six +import pytest test_lock = Lock() @@ -323,12 +324,9 @@ def test_bind_retries(self, get_context): context.bind.side_effect = ZMQError("mocked failure") get_context.return_value.socket.return_value = context - try: + with pytest.raises(OSError): with Publish("test_bind_retries", port=50000): pass - raise AssertionError("OSError not raised") - except OSError: - pass assert context.bind.call_count == BIND_RETRIES From df3ed3a41df0eecf4947f5b60ffa05e5eb6bd354 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 3 Nov 2020 14:15:31 +0200 Subject: [PATCH 7/8] Test that bind failure and port number are in the raised error message --- posttroll/publisher.py | 2 +- posttroll/tests/test_pubsub.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/posttroll/publisher.py b/posttroll/publisher.py index af5ed87..d454789 100644 --- a/posttroll/publisher.py +++ b/posttroll/publisher.py @@ -39,7 +39,7 @@ from posttroll.message_broadcaster import sendaddressservice LOGGER = logging.getLogger(__name__) -BIND_RETRIES = int(os.environ.get("PYTROLL_BIND_RETRIES", 5)) +BIND_RETRIES = max(0, int(os.environ.get("PYTROLL_BIND_RETRIES", 5))) BIND_RETRY_TIMEOUT = float(os.environ.get("PYTROLL_BIND_RETRY_TIMEOUT", 0.1)) diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index 60bc631..1df37bf 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -324,10 +324,12 @@ def test_bind_retries(self, get_context): context.bind.side_effect = ZMQError("mocked failure") get_context.return_value.socket.return_value = context - with pytest.raises(OSError): + with pytest.raises(OSError) as err: with Publish("test_bind_retries", port=50000): pass assert context.bind.call_count == BIND_RETRIES + assert "Could not bind" in err.value.args[0] + assert "50000" in err.value.args[0] def _get_port(min_port=None, max_port=None): From a6bf18cecad3a1efa8ea1ed61d0d143a9ffc3339 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 27 Nov 2020 10:07:58 +0200 Subject: [PATCH 8/8] Add tests, fix number of retries --- posttroll/publisher.py | 2 +- posttroll/tests/test_pubsub.py | 38 +++++++++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/posttroll/publisher.py b/posttroll/publisher.py index d454789..36276e8 100644 --- a/posttroll/publisher.py +++ b/posttroll/publisher.py @@ -137,7 +137,7 @@ def __init__(self, address, name="", min_port=None, max_port=None): def _bind_destination(self): """Bind publish destination.""" last_error = "" - for _ in range(BIND_RETRIES): + for _ in range(BIND_RETRIES + 1): try: self.publish.bind(self.destination) return diff --git a/posttroll/tests/test_pubsub.py b/posttroll/tests/test_pubsub.py index 1df37bf..bbc9f35 100644 --- a/posttroll/tests/test_pubsub.py +++ b/posttroll/tests/test_pubsub.py @@ -327,10 +327,44 @@ def test_bind_retries(self, get_context): with pytest.raises(OSError) as err: with Publish("test_bind_retries", port=50000): pass - assert context.bind.call_count == BIND_RETRIES + assert context.bind.call_count == BIND_RETRIES + 1 assert "Could not bind" in err.value.args[0] assert "50000" in err.value.args[0] + @mock.patch("posttroll.publisher.BIND_RETRIES", 0) + @mock.patch("posttroll.publisher.get_context") + def test_bind_no_retries(self, get_context): + """Test that the destination bind retries can be turned off.""" + from zmq.error import ZMQError + from posttroll.publisher import Publish, BIND_RETRIES + # Just ensure the mock sets the variable correctly + assert BIND_RETRIES == 0 + context = mock.MagicMock() + context.bind.side_effect = ZMQError("mocked failure") + get_context.return_value.socket.return_value = context + + with pytest.raises(OSError) as err: + with Publish("test_bind_retries", port=50000): + pass + assert context.bind.call_count == 1 + + def test_bind_retries_env_variable(self): + """Test that the retry count env variable is handled correctly.""" + import os + os.environ["PYTROLL_BIND_RETRIES"] = "-1" + from posttroll.publisher import BIND_RETRIES + + assert BIND_RETRIES == 0 + + def test_bind_retry_timeout_env_variable(self): + """Test that the retry timeout env variable is handled correctly.""" + import os + val = 0.3 + os.environ["PYTROLL_BIND_RETRY_TIMEOUT"] = str(val) + from posttroll.publisher import BIND_RETRY_TIMEOUT + + assert BIND_RETRY_TIMEOUT == val + def _get_port(min_port=None, max_port=None): from zmq.error import ZMQError @@ -417,4 +451,6 @@ def suite(): mysuite.addTest(loader.loadTestsFromTestCase(TestListenerContainer)) mysuite.addTest(loader.loadTestsFromTestCase(TestPub)) mysuite.addTest(loader.loadTestsFromTestCase(TestAddressReceiver)) + mysuite.addTest(loader.loadTestsFromTestCase(TestBindRetryEnvVariable)) + mysuite.addTest(loader.loadTestsFromTestCase(TestBindRetryTimeoutEnvVariable)) return mysuite