Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions blinkpy/livestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,12 @@ async def recv(self):
_LOGGER.debug("Starting copy from target to clients")
while not self.target_reader.at_eof():
# Read header from the target server
data = await self.target_reader.read(9)

# Check if we have enough data for the header
if len(data) < 9:
try:
data = await self.target_reader.readexactly(9)
except asyncio.IncompleteReadError as err:
_LOGGER.warning(
"Insufficient data for header: %d bytes, expected 9",
len(data),
len(err.partial),
)
break

Expand All @@ -208,13 +207,12 @@ async def recv(self):
continue

# Read payload from the target server
data = await self.target_reader.read(payload_length)

# Check if we have enough data for the payload
if len(data) < payload_length:
try:
data = await self.target_reader.readexactly(payload_length)
except asyncio.IncompleteReadError as err:
_LOGGER.warning(
"Insufficient data for payload: %d bytes, expected %d",
len(data),
len(err.partial),
payload_length,
)
break
Expand Down
51 changes: 26 additions & 25 deletions tests/test_livestream.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for BlinkLiveStream class."""

import asyncio
import ssl
import urllib.parse
from unittest import mock
Expand Down Expand Up @@ -259,8 +260,8 @@ async def test_join_client_connect_disconnect(self, mock_resp):
mock_writer = mock.Mock()

# Mock client reading data then disconnecting
mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = [b"test_data", b""]
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = [b"test_data", b""]
mock_writer.is_closing.return_value = False

# Start the join coroutine
Expand All @@ -276,8 +277,8 @@ async def test_join_connection_reset(self, mock_resp):
mock_writer = mock.Mock()

# Mock connection reset error
mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = ConnectionResetError()
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = ConnectionResetError()
mock_writer.is_closing.return_value = False

with mock.patch.object(self.livestream, "stop") as mock_stop:
Expand All @@ -294,8 +295,8 @@ async def test_join_general_exception_logging(self, mock_resp):
mock_writer = mock.Mock()

# Mock general exception (not ConnectionResetError)
mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = ValueError("Test join exception")
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = ValueError("Test join exception")
mock_writer.is_closing.return_value = False

with (
Expand Down Expand Up @@ -335,8 +336,8 @@ async def test_recv_valid_packet(self, mock_resp):
# Mock payload starting with 0x47 (transport stream packet start)
payload_data = bytearray([0x47] + [0x00] * 187) # 188 bytes total

mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = [header_data, payload_data, b""]
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = [header_data, payload_data, asyncio.IncompleteReadError(b"", 9)]
mock_reader.at_eof.side_effect = [False, False, True]
mock_client.is_closing.return_value = False
mock_client.write = mock.Mock()
Expand Down Expand Up @@ -374,8 +375,8 @@ async def test_recv_invalid_msgtype(self, mock_resp):

payload_data = bytearray([0x47] + [0x00] * 187) # 188 bytes total

mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = [header_data_invalid, payload_data, b""]
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = [header_data_invalid, payload_data, asyncio.IncompleteReadError(b"", 9)]
mock_reader.at_eof.side_effect = [False, False, True]

self.livestream.target_reader = mock_reader
Expand All @@ -393,8 +394,8 @@ async def test_recv_incomplete_header(self, mock_resp):
mock_client = mock.Mock()

# Simulate reading incomplete header
mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = [b"short", b""]
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = [asyncio.IncompleteReadError(b"short", 9)]
mock_reader.at_eof.side_effect = [False, True]

self.livestream.target_reader = mock_reader
Expand Down Expand Up @@ -446,8 +447,8 @@ async def test_recv_empty_payload_skipped(self, mock_resp):
]
)

mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = [header_data_empty, header_data, b"short", b""]
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = [header_data_empty, header_data, asyncio.IncompleteReadError(b"short", 188)]
mock_reader.at_eof.side_effect = [False, False, True]

self.livestream.target_reader = mock_reader
Expand All @@ -462,7 +463,7 @@ async def test_recv_empty_payload_skipped(self, mock_resp):
mock_logger.assert_called_once()

# Verify that the first payload read was skipped (empty payload)
self.assertEqual(mock_reader.read.call_count, 3) # odd number of reads
self.assertEqual(mock_reader.readexactly.call_count, 3) # odd number of reads

# Verify no data was written to client (incomplete header)
mock_client.write.assert_not_called()
Expand Down Expand Up @@ -490,8 +491,8 @@ async def test_recv_invalid_stream_marker(self, mock_resp):
# Mock payload starting with 0x42 (invalid transport stream packet start)
payload_data = bytearray([0x42] + [0x00] * 187) # 188 bytes total

mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = [header_data, payload_data, b""]
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = [header_data, payload_data, asyncio.IncompleteReadError(b"", 9)]
mock_reader.at_eof.side_effect = [False, False, True]
mock_client.is_closing.return_value = False
mock_client.write = mock.Mock()
Expand Down Expand Up @@ -612,8 +613,8 @@ async def test_recv_ssl_error(self, mock_resp):
# Mock SSL error
ssl_error = ssl.SSLError()
ssl_error.reason = "APPLICATION_DATA_AFTER_CLOSE_NOTIFY"
mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = ssl_error
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = ssl_error
mock_reader.at_eof.return_value = False

self.livestream.target_reader = mock_reader
Expand All @@ -634,8 +635,8 @@ async def test_recv_ssl_error_other_reason(self, mock_resp):
# Mock SSL error with different reason
ssl_error = ssl.SSLError()
ssl_error.reason = "SOME_OTHER_SSL_ERROR"
mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = ssl_error
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = ssl_error
mock_reader.at_eof.return_value = False

self.livestream.target_reader = mock_reader
Expand All @@ -658,8 +659,8 @@ async def test_recv_exception_logging(self, mock_resp):
mock_writer = mock.Mock()

# Mock general exception
mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = Exception("Test exception")
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = Exception("Test exception")
mock_reader.at_eof.return_value = False

self.livestream.target_reader = mock_reader
Expand All @@ -681,8 +682,8 @@ async def test_recv_timeout_exception(self, mock_resp):
mock_writer = mock.Mock()

# Mock asyncio timeout exception
mock_reader.read = mock.AsyncMock()
mock_reader.read.side_effect = TimeoutError()
mock_reader.readexactly = mock.AsyncMock()
mock_reader.readexactly.side_effect = TimeoutError()
mock_reader.at_eof.return_value = False

self.livestream.target_reader = mock_reader
Expand Down
Loading