Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions blinkpy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,50 @@ async def request_syncmodule(blink, network):
return await http_get(blink, url)


async def request_programs(blink, network):
"""
Request all programs (schedules) for a network.

:param blink: Blink instance.
:param network: Sync module network id.
"""
url = (
f"{blink.urls.base_url}/api/v1/accounts/{blink.account_id}"
f"/networks/{network}/programs"
)
return await http_get(blink, url)


async def request_program_enable(blink, network, program_id):
"""
Enable a program (schedule) on a network.

:param blink: Blink instance.
:param network: Sync module network id.
:param program_id: ID of the program to enable.
"""
url = (
f"{blink.urls.base_url}/api/v1/accounts/{blink.account_id}"
f"/networks/{network}/programs/{program_id}/enable"
)
return await http_post(blink, url)


async def request_program_disable(blink, network, program_id):
"""
Disable a program (schedule) on a network.

:param blink: Blink instance.
:param network: Sync module network id.
:param program_id: ID of the program to disable.
"""
url = (
f"{blink.urls.base_url}/api/v1/accounts/{blink.account_id}"
f"/networks/{network}/programs/{program_id}/disable"
)
return await http_post(blink, url)


@Throttle(seconds=MIN_THROTTLE_TIME)
async def request_system_arm(blink, network, **kwargs):
"""
Expand Down
142 changes: 140 additions & 2 deletions blinkpy/sync_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,34 @@ def arm(self):
self.available = False
return None

@property
def scheduler_enabled(self):
"""Return boolean status of schedule."""
if self.network_info is None:
return False
try:
network = self.network_info.get("network", {})
if network.get("active_program_id") is not None or network.get(
"autoarm_time_enable", False
):
return True
# Fallback: check the programs list cached during get_network_info.
# If the key is absent the network info has not yet been fully
# populated - log a debug message so callers are aware.
if "programs" not in self.network_info:
_LOGGER.debug(
"Programs list not yet populated for network %s; "
"scheduler_enabled may be inaccurate.",
self.name,
)
return False
for program in self.network_info["programs"]:
if program.get("status") == "enabled":
return True
return False
except (AttributeError, KeyError, TypeError):
return False

@property
def local_storage(self):
"""Indicate if local storage is activated or not (True/False)."""
Expand All @@ -127,6 +155,111 @@ async def async_arm(self, value):
return await api.request_system_arm(self.blink, self.network_id)
return await api.request_system_disarm(self.blink, self.network_id)

async def async_set_scheduler(self, enable):
"""Enable or disable the Blink native schedule for this network.

:param enable: True to enable the schedule, False to disable it.
"""
if self.scheduler_enabled == enable:
_LOGGER.debug("Schedule already in state %s, skipping.", enable)
return True

# Use the programs list already cached by get_network_info() where
# possible to avoid a redundant API call. Fall back to a fresh fetch
# only if the cache is absent (e.g. in tests or before first refresh).
cached_programs = (
self.network_info.get("programs") if self.network_info else None
)
if cached_programs is None:
_LOGGER.debug("Programs not cached; fetching from API.")
cached_programs = await api.request_programs(self.blink, self.network_id)

if not enable:
# Prefer active_program_id from the network summary if present
program_id = self.network_info.get("network", {}).get("active_program_id")

# Otherwise find the currently enabled program from the list
if program_id is None and isinstance(cached_programs, list):
program_id = next(
(
p.get("id")
for p in cached_programs
if p.get("status") == "enabled"
),
None,
)

# If scheduler_enabled is True but no program ID is found, we cannot call
# the API disable endpoint (which requires a program ID). Since the
# requested operation fails, this is logged as a warning.
if program_id is None:
_LOGGER.warning("Could not find an active program to disable.")
return False

result = await api.request_program_disable(
self.blink, self.network_id, program_id
)
# Optimistically update the cache so scheduler_enabled reflects
# the new state without requiring a full refresh.
if result:
self._update_cached_program_status(program_id, "disabled")
return result

# To enable, use the first available program from the cached list.
# Blink networks typically support only a single native schedule (program),
# so we default to enabling the first one in the list.
try:
program_id = cached_programs[0]["id"]
except (KeyError, IndexError, TypeError):
_LOGGER.warning(
"Could not find a program to enable on network %s.", self.name
)
return False

result = await api.request_program_enable(
self.blink, self.network_id, program_id
)
# Optimistically update the cache so scheduler_enabled reflects
# the new state without requiring a full refresh.
if result:
self._update_cached_program_status(program_id, "enabled")
return result

def _update_cached_program_status(self, program_id, status):
"""Update the cached programs list after an enable/disable call.

The Blink API response for request_program_enable/disable only returns
a generic success message rather than the new program state. By
optimistically updating our local cache, scheduler_enabled remains
accurate immediately following async_set_scheduler, saving an
expensive and redundant network refresh.

:param program_id: ID of the program that was changed.
:param status: New status string, e.g. ``"enabled"`` or ``"disabled"``.
"""
try:
programs = (
self.network_info.get("programs", []) if self.network_info else []
)
if isinstance(programs, list):
program = next((p for p in programs if p.get("id") == program_id), None)
if program:
program["status"] = status
_LOGGER.debug(
"Optimistically updated program %s status to %s.",
program_id,
status,
)
# active_program_id in the network summary takes priority in
# scheduler_enabled, so keep it consistent with the new state.
network = self.network_info.get("network", {}) if self.network_info else {}
if status == "disabled":
network["active_program_id"] = None
else:
network["active_program_id"] = program_id
except (AttributeError, TypeError):
pass

async def start(self):
"""Initialize the system."""
_LOGGER.debug("Initializing the sync module")
Expand Down Expand Up @@ -269,6 +402,12 @@ async def get_network_info(self):
try:
if self.network_info["network"]["sync_module_error"]:
raise KeyError

# Cache the programs list locally during refresh so scheduler_enabled
# can check it synchronously without triggering redundant network calls.
programs = await api.request_programs(self.blink, self.network_id)
if isinstance(programs, list):
self.network_info["programs"] = programs
except (TypeError, KeyError):
self.available = False
return False
Expand Down Expand Up @@ -378,8 +517,7 @@ async def check_new_videos(self):
# Exit the loop once there are no new videos in the list.
if not self.check_new_video_time(iso_timestamp, last_manifest_read):
_LOGGER.info(
"No new local storage videos since last manifest "
"read at %s.",
"No new local storage videos since last manifest read at %s.",
last_read_local,
)
break
Expand Down
22 changes: 22 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,25 @@ async def test_wait_for_command(self, mock_resp):

response = await api.wait_for_command(self.blink, None)
self.assertFalse(response)

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One test that should be added is to validate the programs flow if the api.request_programs method returns None (which is the fallback state for http_get)

async def test_request_programs(self, mock_resp):
"""Test request_programs."""
programs = [{"id": 123, "status": "enabled", "name": "My Schedule"}]
mock_resp.return_value = programs
self.assertEqual(await api.request_programs(self.blink, "network"), programs)

async def test_request_program_enable(self, mock_resp):
"""Test request_program_enable."""
mock_resp.return_value = {"status": 200}
self.assertEqual(
await api.request_program_enable(self.blink, "network", 123),
{"status": 200},
)

async def test_request_program_disable(self, mock_resp):
"""Test request_program_disable."""
mock_resp.return_value = {"status": 200}
self.assertEqual(
await api.request_program_disable(self.blink, "network", 123),
{"status": 200},
)
Loading
Loading