Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 86 additions & 36 deletions src/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
from typing import Optional, Dict, Any, List

from mappers import datetime_from_str
from mappers import datetime_from_str, datetime_to_str
from models import (
SettingsConfig,
BitwardenApiKey,
Expand All @@ -10,11 +11,25 @@
from splunk_api import SplunkApi
from utils import get_logger, set_logging_level, obj_to_json, app_name, secure_url

from solnlib.modular_input.checkpointer import KVStoreCheckpointer

CHECKPOINT_COLLECTION = "bitwarden_checkpoints"
CHECKPOINT_KEY = "event_logs"


class Config:
def __init__(self, splunk_api: SplunkApi):
self.logger = get_logger()
self.splunk_api = splunk_api
self._checkpointer = self._create_checkpointer()

def _create_checkpointer(self) -> KVStoreCheckpointer:
return KVStoreCheckpointer(
CHECKPOINT_COLLECTION,
self.splunk_api.service.token,
app_name,
owner="nobody",
)

def get_settings_config(self):
settings_config_dict = self.splunk_api.get_configuration('script')
Expand All @@ -33,11 +48,17 @@ def get_bitwarden_api_key(self):
return bitwarden_api_key

def get_checkpoint(self) -> EventLogsCheckpoint:
events_api_config_list = self.splunk_api.get_storage_configuration('eventsapi')
checkpoint = Config.__parse_checkpoint(events_api_config_list)
try:
data = self._checkpointer.get(CHECKPOINT_KEY)
except Exception as e:
self.logger.warning('failed to read checkpoint from KVStore: %s', e)
data = None

self.logger.debug('checkpoint %s', checkpoint)
if data is None:
data = self._migrate_legacy_checkpoint()

checkpoint = Config._parse_checkpoint_data(data)
self.logger.debug('checkpoint %s', checkpoint)
return checkpoint

def update_checkpoint(self,
Expand All @@ -48,21 +69,41 @@ def update_checkpoint(self,
if next_request is None and checkpoint.next_request is not None:
last_log_date = checkpoint.next_request.end

new_checkpoint = EventLogsCheckpoint(checkpoint.key_id,
next_request,
last_log_date)
new_checkpoint = EventLogsCheckpoint(next_request=next_request,
last_log_date=last_log_date)

checkpoint_data = Config._serialize_checkpoint(new_checkpoint)

try:
self._checkpointer.update(CHECKPOINT_KEY, checkpoint_data)
except Exception as e:
self.logger.error('failed to update checkpoint: %s', e)
raise

return new_checkpoint

new_checkpoint_json = obj_to_json(new_checkpoint)
def _migrate_legacy_checkpoint(self) -> Optional[Dict[str, Any]]:
"""Attempt to read from the legacy 'eventsapi' KVStore collection."""
try:
legacy_data_list = self.splunk_api.get_storage_configuration('eventsapi')
if legacy_data_list and len(legacy_data_list) > 0:
legacy = legacy_data_list[0]
self.logger.info('migrating legacy checkpoint from eventsapi KVStore')

if new_checkpoint.key_id is None:
self.splunk_api.create_storage_configuration('eventsapi',
new_checkpoint_json)
return self.get_checkpoint()
else:
self.splunk_api.update_storage_configuration('eventsapi',
new_checkpoint.key_id,
new_checkpoint_json)
return new_checkpoint
migrated = {}
next_req = legacy.get('next_request', None)
if next_req and isinstance(next_req, dict):
migrated['next_request_start'] = next_req.get('start')
migrated['next_request_end'] = next_req.get('end')
migrated['next_request_continuation_token'] = next_req.get('continuation_token')

migrated['last_log_date'] = legacy.get('last_log_date')

self._checkpointer.update(CHECKPOINT_KEY, migrated)
return migrated
except Exception as e:
self.logger.debug('no legacy checkpoint to migrate: %s', e)
return None

@classmethod
def __parse_settings_config(cls, settings: Optional[Dict[str, Dict[str, Any]]]) -> SettingsConfig:
Expand Down Expand Up @@ -99,25 +140,34 @@ def __parse_bitwarden_api_key(cls, bitwarden_api_key: Optional[str]) -> Bitwarde

return BitwardenApiKey(client_id, client_secret)

@classmethod
def __parse_checkpoint(cls, events_api_config_list: Optional[List[Any]]) -> EventLogsCheckpoint:
if events_api_config_list is None or len(events_api_config_list) == 0:
@staticmethod
def _parse_checkpoint_data(data: Optional[Dict[str, Any]]) -> EventLogsCheckpoint:
if data is None:
return EventLogsCheckpoint()

events_api_config = events_api_config_list[0]

if events_api_config is None or '_key' not in events_api_config:
raise Exception("Invalid checkpoint")

next_request = None
next_request_dict = events_api_config.get('next_request', None)
if next_request_dict is not None:
next_request = BitwardenEventsRequest(start=datetime_from_str(next_request_dict['start']),
end=datetime_from_str(next_request_dict['end']),
continuation_token=next_request_dict.get('continuation_token', None))

last_log_date = datetime_from_str(events_api_config.get('last_log_date', None))

return EventLogsCheckpoint(events_api_config['_key'],
next_request,
last_log_date)
start = data.get('next_request_start')
end = data.get('next_request_end')
if start and end:
next_request = BitwardenEventsRequest(
start=datetime_from_str(start),
end=datetime_from_str(end),
continuation_token=data.get('next_request_continuation_token')
)

last_log_date = datetime_from_str(data.get('last_log_date'))

return EventLogsCheckpoint(next_request=next_request,
last_log_date=last_log_date)

@staticmethod
def _serialize_checkpoint(checkpoint: EventLogsCheckpoint) -> Dict[str, Any]:
data = {}
if checkpoint.next_request is not None:
data['next_request_start'] = datetime_to_str(checkpoint.next_request.start)
data['next_request_end'] = datetime_to_str(checkpoint.next_request.end)
if checkpoint.next_request.continuation_token:
data['next_request_continuation_token'] = checkpoint.next_request.continuation_token
if checkpoint.last_log_date is not None:
data['last_log_date'] = datetime_to_str(checkpoint.last_log_date)
return data
11 changes: 11 additions & 0 deletions src/event_logs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import time
from dataclasses import asdict
from datetime import datetime, timedelta, timezone
from typing import Any, Optional, List
Expand All @@ -16,6 +17,12 @@
from utils import get_logger, obj_to_json


# Minimum delay between API calls in seconds.
# Bitwarden rate limit is 400 calls/minute β‰ˆ 6.67 calls/second.
# 0.2s delay = 5 calls/second (safe margin under the limit).
API_CALL_DELAY = 0.2


class EventLogsWriter:
def __init__(self,
bitwarden_api: BitwardenApi,
Expand Down Expand Up @@ -51,6 +58,10 @@ def read_events(self):
if events_response.continuationToken is None:
break

# Rate limiting: pause between API calls to stay under Bitwarden's
# rate limit of 400 calls/minute
time.sleep(API_CALL_DELAY)

def write_events(self, events: List[Any]):
self.logger.info('writing %s events', len(events))

Expand Down
1 change: 0 additions & 1 deletion src/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,5 @@ class BitwardenEventsResponse:

@dataclass
class EventLogsCheckpoint:
key_id: Optional[str] = None
next_request: Optional[BitwardenEventsRequest] = None
last_log_date: Optional[datetime] = None
124 changes: 124 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Tests for checkpoint serialization/deserialization.

These test the static _serialize_checkpoint and _parse_checkpoint_data methods
without requiring splunklib or other Splunk dependencies.
"""

from datetime import datetime, timezone

import sys
import os

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))

from models import EventLogsCheckpoint, BitwardenEventsRequest
from mappers import datetime_from_str, datetime_to_str


# Inline the static methods to avoid importing Config (which pulls in splunklib)
def _serialize_checkpoint(checkpoint: EventLogsCheckpoint):
data = {}
if checkpoint.next_request is not None:
data['next_request_start'] = datetime_to_str(checkpoint.next_request.start)
data['next_request_end'] = datetime_to_str(checkpoint.next_request.end)
if checkpoint.next_request.continuation_token:
data['next_request_continuation_token'] = checkpoint.next_request.continuation_token
if checkpoint.last_log_date is not None:
data['last_log_date'] = datetime_to_str(checkpoint.last_log_date)
return data


def _parse_checkpoint_data(data):
if data is None:
return EventLogsCheckpoint()

next_request = None
start = data.get('next_request_start')
end = data.get('next_request_end')
if start and end:
next_request = BitwardenEventsRequest(
start=datetime_from_str(start),
end=datetime_from_str(end),
continuation_token=data.get('next_request_continuation_token')
)

last_log_date = datetime_from_str(data.get('last_log_date'))

return EventLogsCheckpoint(next_request=next_request,
last_log_date=last_log_date)


def test_serialize_checkpoint_empty():
checkpoint = EventLogsCheckpoint()
data = _serialize_checkpoint(checkpoint)
assert data == {}


def test_serialize_checkpoint_with_last_log_date():
checkpoint = EventLogsCheckpoint(
last_log_date=datetime(2024, 6, 15, 12, 0, 0, tzinfo=timezone.utc)
)
data = _serialize_checkpoint(checkpoint)
assert data == {'last_log_date': '2024-06-15T12:00:00.000000Z'}


def test_serialize_checkpoint_with_next_request():
checkpoint = EventLogsCheckpoint(
next_request=BitwardenEventsRequest(
start=datetime(2024, 6, 15, 12, 0, 0, tzinfo=timezone.utc),
end=datetime(2024, 6, 16, 12, 0, 0, tzinfo=timezone.utc),
continuation_token='abc123'
),
last_log_date=datetime(2024, 6, 14, 0, 0, 0, tzinfo=timezone.utc)
)
data = _serialize_checkpoint(checkpoint)
assert data == {
'next_request_start': '2024-06-15T12:00:00.000000Z',
'next_request_end': '2024-06-16T12:00:00.000000Z',
'next_request_continuation_token': 'abc123',
'last_log_date': '2024-06-14T00:00:00.000000Z'
}


def test_parse_checkpoint_data_none():
checkpoint = _parse_checkpoint_data(None)
assert checkpoint.next_request is None
assert checkpoint.last_log_date is None


def test_parse_checkpoint_data_empty():
checkpoint = _parse_checkpoint_data({})
assert checkpoint.next_request is None
assert checkpoint.last_log_date is None


def test_parse_checkpoint_data_full():
data = {
'next_request_start': '2024-06-15T12:00:00.000000Z',
'next_request_end': '2024-06-16T12:00:00.000000Z',
'next_request_continuation_token': 'abc123',
'last_log_date': '2024-06-14T00:00:00.000000Z'
}
checkpoint = _parse_checkpoint_data(data)
assert checkpoint.next_request is not None
assert checkpoint.next_request.continuation_token == 'abc123'
assert checkpoint.last_log_date == datetime(2024, 6, 14, 0, 0, 0, tzinfo=timezone.utc)


def test_roundtrip_checkpoint():
"""Verify that serialize -> parse produces the same checkpoint."""
original = EventLogsCheckpoint(
next_request=BitwardenEventsRequest(
start=datetime(2024, 6, 15, 12, 30, 45, 123456, tzinfo=timezone.utc),
end=datetime(2024, 6, 16, 12, 30, 45, 654321, tzinfo=timezone.utc),
continuation_token='token-xyz'
),
last_log_date=datetime(2024, 6, 14, 0, 0, 0, tzinfo=timezone.utc)
)
data = _serialize_checkpoint(original)
restored = _parse_checkpoint_data(data)

assert restored.next_request.start == original.next_request.start
assert restored.next_request.end == original.next_request.end
assert restored.next_request.continuation_token == original.next_request.continuation_token
assert restored.last_log_date == original.last_log_date
Loading