diff --git a/blinkpy/livestream.py b/blinkpy/livestream.py index 25c4429e..25c55965 100644 --- a/blinkpy/livestream.py +++ b/blinkpy/livestream.py @@ -181,13 +181,12 @@ async def recv(self): _LOGGER.debug("Starting copy from target to clients") while not self.target_reader.at_eof(): # Read header from the target server - data = await self.target_reader.read(9) - - # Check if we have enough data for the header - if len(data) < 9: + try: + data = await self.target_reader.readexactly(9) + except asyncio.IncompleteReadError as err: _LOGGER.warning( "Insufficient data for header: %d bytes, expected 9", - len(data), + len(err.partial), ) break @@ -208,13 +207,12 @@ async def recv(self): continue # Read payload from the target server - data = await self.target_reader.read(payload_length) - - # Check if we have enough data for the payload - if len(data) < payload_length: + try: + data = await self.target_reader.readexactly(payload_length) + except asyncio.IncompleteReadError as err: _LOGGER.warning( "Insufficient data for payload: %d bytes, expected %d", - len(data), + len(err.partial), payload_length, ) break diff --git a/tests/test_livestream.py b/tests/test_livestream.py index b3e845cf..98a4c2b9 100644 --- a/tests/test_livestream.py +++ b/tests/test_livestream.py @@ -1,5 +1,6 @@ """Tests for BlinkLiveStream class.""" +import asyncio import ssl import urllib.parse from unittest import mock @@ -259,8 +260,8 @@ async def test_join_client_connect_disconnect(self, mock_resp): mock_writer = mock.Mock() # Mock client reading data then disconnecting - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = [b"test_data", b""] + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = [b"test_data", b""] mock_writer.is_closing.return_value = False # Start the join coroutine @@ -276,8 +277,8 @@ async def test_join_connection_reset(self, mock_resp): mock_writer = mock.Mock() # Mock connection reset error - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = ConnectionResetError() + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = ConnectionResetError() mock_writer.is_closing.return_value = False with mock.patch.object(self.livestream, "stop") as mock_stop: @@ -294,8 +295,8 @@ async def test_join_general_exception_logging(self, mock_resp): mock_writer = mock.Mock() # Mock general exception (not ConnectionResetError) - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = ValueError("Test join exception") + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = ValueError("Test join exception") mock_writer.is_closing.return_value = False with ( @@ -335,8 +336,8 @@ async def test_recv_valid_packet(self, mock_resp): # Mock payload starting with 0x47 (transport stream packet start) payload_data = bytearray([0x47] + [0x00] * 187) # 188 bytes total - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = [header_data, payload_data, b""] + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = [header_data, payload_data, asyncio.IncompleteReadError(b"", 9)] mock_reader.at_eof.side_effect = [False, False, True] mock_client.is_closing.return_value = False mock_client.write = mock.Mock() @@ -374,8 +375,8 @@ async def test_recv_invalid_msgtype(self, mock_resp): payload_data = bytearray([0x47] + [0x00] * 187) # 188 bytes total - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = [header_data_invalid, payload_data, b""] + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = [header_data_invalid, payload_data, asyncio.IncompleteReadError(b"", 9)] mock_reader.at_eof.side_effect = [False, False, True] self.livestream.target_reader = mock_reader @@ -393,8 +394,8 @@ async def test_recv_incomplete_header(self, mock_resp): mock_client = mock.Mock() # Simulate reading incomplete header - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = [b"short", b""] + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = [asyncio.IncompleteReadError(b"short", 9)] mock_reader.at_eof.side_effect = [False, True] self.livestream.target_reader = mock_reader @@ -446,8 +447,8 @@ async def test_recv_empty_payload_skipped(self, mock_resp): ] ) - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = [header_data_empty, header_data, b"short", b""] + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = [header_data_empty, header_data, asyncio.IncompleteReadError(b"short", 188)] mock_reader.at_eof.side_effect = [False, False, True] self.livestream.target_reader = mock_reader @@ -462,7 +463,7 @@ async def test_recv_empty_payload_skipped(self, mock_resp): mock_logger.assert_called_once() # Verify that the first payload read was skipped (empty payload) - self.assertEqual(mock_reader.read.call_count, 3) # odd number of reads + self.assertEqual(mock_reader.readexactly.call_count, 3) # odd number of reads # Verify no data was written to client (incomplete header) mock_client.write.assert_not_called() @@ -490,8 +491,8 @@ async def test_recv_invalid_stream_marker(self, mock_resp): # Mock payload starting with 0x42 (invalid transport stream packet start) payload_data = bytearray([0x42] + [0x00] * 187) # 188 bytes total - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = [header_data, payload_data, b""] + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = [header_data, payload_data, asyncio.IncompleteReadError(b"", 9)] mock_reader.at_eof.side_effect = [False, False, True] mock_client.is_closing.return_value = False mock_client.write = mock.Mock() @@ -612,8 +613,8 @@ async def test_recv_ssl_error(self, mock_resp): # Mock SSL error ssl_error = ssl.SSLError() ssl_error.reason = "APPLICATION_DATA_AFTER_CLOSE_NOTIFY" - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = ssl_error + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = ssl_error mock_reader.at_eof.return_value = False self.livestream.target_reader = mock_reader @@ -634,8 +635,8 @@ async def test_recv_ssl_error_other_reason(self, mock_resp): # Mock SSL error with different reason ssl_error = ssl.SSLError() ssl_error.reason = "SOME_OTHER_SSL_ERROR" - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = ssl_error + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = ssl_error mock_reader.at_eof.return_value = False self.livestream.target_reader = mock_reader @@ -658,8 +659,8 @@ async def test_recv_exception_logging(self, mock_resp): mock_writer = mock.Mock() # Mock general exception - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = Exception("Test exception") + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = Exception("Test exception") mock_reader.at_eof.return_value = False self.livestream.target_reader = mock_reader @@ -681,8 +682,8 @@ async def test_recv_timeout_exception(self, mock_resp): mock_writer = mock.Mock() # Mock asyncio timeout exception - mock_reader.read = mock.AsyncMock() - mock_reader.read.side_effect = TimeoutError() + mock_reader.readexactly = mock.AsyncMock() + mock_reader.readexactly.side_effect = TimeoutError() mock_reader.at_eof.return_value = False self.livestream.target_reader = mock_reader