Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,21 @@ installation:

```
$ dfindexeddb -h
usage: dfindexeddb [-h] {blink,gecko,db,ldb,log} ...
usage: dfindexeddb [-h] {blink,gecko,db,db_info,ldb,log} ...

A cli tool for parsing IndexedDB files

positional arguments:
{blink,gecko,db,ldb,log}
{blink,gecko,db,db_info,ldb,log}
blink Parse a file as a blink-encoded value.
gecko Parse a file as a gecko-encoded value.
db Parse a directory/file as IndexedDB.
db_info Extract and filter IndexedDB metadata.
ldb Parse a ldb file as IndexedDB.
log Parse a log file as IndexedDB.

options:
-h, --help show this help message and exit
-h, --help show this help message and exit
```

#### Examples:
Expand All @@ -101,6 +102,7 @@ options:
| **Chrome** (.ldb) | JSON-L | `dfindexeddb ldb -s SOURCE -o jsonl` |
| **Chrome** (.log) | Python repr | `dfindexeddb log -s SOURCE -o repr` |
| **Chrome** (Blink) | JSON | `dfindexeddb blink -s SOURCE` |
| **Database/Object Store information** (All) | JSON | `dfindexeddb db_info -s SOURCE --format [chrome\|firefox\|safari]` |


#### Options:
Expand All @@ -109,6 +111,8 @@ options:
| :--- | :--- |
| `--filter_key [term]` | Performs a substring match on the string representation of the record's key. |
| `--filter_value [term]` | Performs a substring match on the string representation of the record's value. If `--load_blobs` is used, it also searches within any associated blob data. |
| `--database_id [id]` | Filters records or metadata by database ID (where available). |
| `--object_store_id [id]` | Filters records or metadata by object store ID. |
| `--include_raw_data` | Include the raw key and value bytes in the record output. |
| `--load_blobs` | For Firefox, Safari and Chromium-based browsers, attempt to find and read associated blob files. |

Expand Down
86 changes: 81 additions & 5 deletions dfindexeddb/indexeddb/chromium/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,79 @@ def FromDecoder(
return cls(offset=base_offset + offset, entries=entries)


@dataclass
class ChromiumLevelDBObjectStoreInfo:
  """Metadata for a single Chromium IndexedDB object store.

  Instances are assembled from the LevelDB metadata records Chromium writes
  per object store (name, key path, auto-increment flag), combined with the
  database-name records that map a database ID to its name.

  Attributes:
    database_id: the database ID.
    id: the object store ID.
    name: the object store name.
    key_path: the object store key path.
    auto_increment: whether the object store is auto increment.
    database_name: the database name.
  """

  database_id: int
  id: int
  name: str = ""
  key_path: Optional[Union[str, list[str]]] = None
  auto_increment: bool = False
  database_name: str = ""

  @classmethod
  def FromRecords(
      cls, records: Generator[Any, None, None]
  ) -> Generator[ChromiumLevelDBObjectStoreInfo, None, None]:
    """Yields ChromiumLevelDBObjectStoreInfo from leveldb records.

    Args:
      records: an iterable or generator of records
          (ChromiumIndexedDBRecord).

    Yields:
      One ChromiumLevelDBObjectStoreInfo per (database ID, object store ID)
      pair observed in the records.
    """
    # (database_id, object_store_id) -> partially populated store info.
    store_map: dict[tuple[int, int], ChromiumLevelDBObjectStoreInfo] = {}
    # database_id -> database name, filled from DatabaseNameKey records.
    name_by_db: dict[int, str] = {}

    for rec in records:
      key = getattr(rec, "key", None)
      if key is None:
        continue

      if isinstance(key, DatabaseNameKey):
        # For a DatabaseNameKey record the value holds the database ID.
        name_by_db[rec.value] = key.database_name
      elif isinstance(key, ObjectStoreMetaDataKey):
        ident = (key.key_prefix.database_id, key.object_store_id)
        store = store_map.setdefault(
            ident, cls(database_id=ident[0], id=ident[1]))

        meta_types = definitions.ObjectStoreMetaDataKeyType
        value = rec.value
        if key.metadata_type == meta_types.OBJECT_STORE_NAME:
          store.name = value
        elif key.metadata_type == meta_types.KEY_PATH:
          # Key paths may arrive wrapped in an object exposing .value.
          store.key_path = getattr(value, "value", value)
        elif key.metadata_type == meta_types.AUTO_INCREMENT_FLAG:
          store.auto_increment = value

    for store in store_map.values():
      # Attach the database name when one was seen; otherwise keep default.
      store.database_name = name_by_db.get(
          store.database_id, store.database_name)
      yield store


@dataclass
class ChromiumIndexedDBRecord:
"""An IndexedDB Record parsed from LevelDB.
Expand Down Expand Up @@ -1746,10 +1819,10 @@ def __init__(self, folder_name: pathlib.Path):
folder_name: the source blob folder.

Raises:
ValueError: if folder_name is None or not a directory.
FileNotFoundError: if folder_name is None or not a directory.
"""
if not folder_name or not folder_name.is_dir():
raise ValueError(f"{folder_name} is None or not a directory")
raise FileNotFoundError(f"{folder_name} is None or not a directory")
self.folder_name = folder_name.absolute()

def ReadBlob(self, database_id: int, blob_id: int) -> tuple[str, bytes]:
Expand Down Expand Up @@ -1837,9 +1910,12 @@ def __init__(self, folder_name: pathlib.Path):
# Locate the corresponding blob folder. The folder_name should be
# <origin>.leveldb and the blob folder should be <origin>.blob
if str(self.folder_name).endswith(".leveldb"):
self.blob_folder_reader = BlobFolderReader(
pathlib.Path(str(self.folder_name).replace(".leveldb", ".blob"))
)
try:
self.blob_folder_reader = BlobFolderReader(
pathlib.Path(str(self.folder_name).replace(".leveldb", ".blob"))
)
except FileNotFoundError:
self.blob_folder_reader = None # type: ignore[assignment]
else:
self.blob_folder_reader = None # type: ignore[assignment]

Expand Down
111 changes: 111 additions & 0 deletions dfindexeddb/indexeddb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def _MatchesFilters(
"""Returns True if the record matches the filter criteria.

Supported filters:
* database_id - filters by database ID
* object_store_id - filters by object store ID
* filter_key - filters by key
* filter_value - filters by value
Expand All @@ -131,6 +132,11 @@ def _MatchesFilters(
):
return False

if args.database_id is not None:
db_id_record = getattr(record, "database_id", None)
if db_id_record is not None and db_id_record != args.database_id:
return False

if args.filter_value is not None:
if not record.is_value_filterable:
return False
Expand Down Expand Up @@ -243,6 +249,53 @@ def DbCommand(args: argparse.Namespace) -> None:
HandleSafariDB(args)


def DbInfoCommand(args: argparse.Namespace) -> None:
  """The CLI for providing information about an IndexedDB's object stores.

  Args:
    args: the arguments for processing the IndexedDB metadata.  Reads
        source, format, database_id, object_store_id, output and, for
        chrome/chromium folders, use_manifest / use_sequence_number.
  """
  # Sentinel so an attribute explicitly set to None still counts as present,
  # matching hasattr() semantics.
  _missing = object()

  def _Excluded(item: Any) -> bool:
    """Returns True when item fails the database/object store ID filters."""
    if args.database_id is not None:
      db_id = getattr(item, "database_id", _missing)
      if db_id is not _missing and db_id != args.database_id:
        return True
    if args.object_store_id is not None:
      store_id = getattr(item, "id", _missing)
      if store_id is not _missing and store_id != args.object_store_id:
        return True
      store_id = getattr(item, "object_store_id", _missing)
      if store_id is not _missing and store_id != args.object_store_id:
        return True
    return False

  records: Any = []
  if args.format in ("chrome", "chromium"):
    if args.source.is_file():
      # A single file is treated as a sqlite3-backed IndexedDB.
      records = sqlite.DatabaseReader(str(args.source)).ObjectStores()
    else:
      # Otherwise the source is a LevelDB folder.
      leveldb_records = chromium_record.FolderReader(args.source).GetRecords(
          use_manifest=args.use_manifest,
          use_sequence_number=args.use_sequence_number,
          include_raw_data=False,
          load_blobs=False,
      )
      records = chromium_record.ChromiumLevelDBObjectStoreInfo.FromRecords(
          leveldb_records)
  elif args.format == "firefox":
    records = firefox_record.FileReader(str(args.source)).ObjectStores()
  elif args.format == "safari":
    records = safari_record.FileReader(str(args.source)).ObjectStores()

  for item in records:
    if not _Excluded(item):
      _Output(item, output=args.output)


def LdbCommand(args: argparse.Namespace) -> None:
"""The CLI for processing a LevelDB table (.ldb) file as IndexedDB.

Expand Down Expand Up @@ -360,6 +413,11 @@ def App() -> None:
type=int,
help="The object store ID to filter by.",
)
parser_db.add_argument(
"--database_id",
type=int,
help="The database ID to filter by.",
)
parser_db.add_argument(
"--include_raw_data",
action="store_true",
Expand Down Expand Up @@ -396,6 +454,59 @@ def App() -> None:
)
parser_db.set_defaults(func=DbCommand)

parser_db_info = subparsers.add_parser(
"db_info", help="Extract and filter IndexedDB metadata."
)
parser_db_info.add_argument(
"-s",
"--source",
required=True,
type=pathlib.Path,
help=(
"The source IndexedDB folder (for chrome/chromium) "
"or sqlite3 file (for firefox/safari)."
),
)
recover_group_info = parser_db_info.add_mutually_exclusive_group()
recover_group_info.add_argument(
"--use_manifest",
action="store_true",
help="Use manifest file to determine active/deleted records.",
)
recover_group_info.add_argument(
"--use_sequence_number",
action="store_true",
help=(
"Use sequence number and file offset to determine active/deleted "
"records."
),
)
parser_db_info.add_argument(
"-f",
"--format",
required=True,
choices=["chromium", "chrome", "firefox", "safari"],
help="The type of IndexedDB to parse.",
)
parser_db_info.add_argument(
"--database_id",
type=int,
help="The database ID to filter by.",
)
parser_db_info.add_argument(
"--object_store_id",
type=int,
help="The object store ID to filter by.",
)
parser_db_info.add_argument(
"-o",
"--output",
choices=["json", "jsonl", "repr"],
default="json",
help="Output format. Default is json.",
)
parser_db_info.set_defaults(func=DbInfoCommand)

parser_ldb = subparsers.add_parser(
"ldb", help="Parse a ldb file as IndexedDB."
)
Expand Down
8 changes: 4 additions & 4 deletions dfindexeddb/indexeddb/safari/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class SafariBlobInfo:


@dataclass
class ObjectStoreInfo:
"""An ObjectStoreInfo.
class SafariObjectStoreInfo:
"""A Safari ObjectStoreInfo.

Attributes:
id: the object store ID.
Expand Down Expand Up @@ -243,7 +243,7 @@ def LoadBlobsForRecordId(self, record_id: int) -> List[SafariBlobInfo]:
)
return blobs

def ObjectStores(self) -> Generator[ObjectStoreInfo, None, None]:
def ObjectStores(self) -> Generator[SafariObjectStoreInfo, None, None]:
"""Returns the Object Store information from the IndexedDB database.

Yields:
Expand All @@ -256,7 +256,7 @@ def ObjectStores(self) -> Generator[ObjectStoreInfo, None, None]:
results = cursor.fetchall()
for result in results:
key_path = plistlib.loads(result[2])
yield ObjectStoreInfo(
yield SafariObjectStoreInfo(
id=result[0],
name=self._DecodeString(result[1]),
key_path=key_path,
Expand Down
2 changes: 1 addition & 1 deletion dfindexeddb/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Version information for dfIndexeddb."""


__version__ = "20260210"
__version__ = "20260327"


def GetVersion() -> str:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dfindexeddb"
version = "20260210"
version = "20260327"
requires-python = ">=3.9"
description = "dfindexeddb is an experimental Python tool for performing digital forensic analysis of IndexedDB and leveldb files."
license = "Apache-2.0"
Expand Down
35 changes: 35 additions & 0 deletions tests/dfindexeddb/indexeddb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def setUp(self) -> None:
self.args = argparse.Namespace(
source="source_file",
format="chrome",
database_id=None,
object_store_id=None,
include_raw_data=False,
filter_value=None,
Expand Down Expand Up @@ -80,6 +81,40 @@ def test_matches_filters_object_store_id(self) -> None:
# pylint: disable=protected-access
self.assertFalse(cli._MatchesFilters(record, self.args))

def test_matches_filters_database_id(self) -> None:
  """Tests _MatchesFilters with the database_id filter.

  Covers three cases: a matching ID passes, a mismatched ID is rejected,
  and a record type without a database_id attribute is never filtered
  out (soft filtering).
  """
  chromium_rec = chromium_record.ChromiumIndexedDBRecord(
      path="path",
      offset=0,
      key="key",
      value="val",
      sequence_number=1,
      type=1,
      level=0,
      recovered=False,
      database_id=1,
      object_store_id=1,
  )

  # Matching ID -> the record is kept.
  self.args.database_id = 1
  # pylint: disable=protected-access
  self.assertTrue(cli._MatchesFilters(chromium_rec, self.args))

  # Mismatched ID -> the record is rejected.
  self.args.database_id = 2
  # pylint: disable=protected-access
  self.assertFalse(cli._MatchesFilters(chromium_rec, self.args))

  # Safari records carry no database_id attribute, so the filter must not
  # exclude them (soft filtering).
  safari_rec = safari_record.SafariIndexedDBRecord(
      key="key",
      value="val",
      object_store_id=1,
      object_store_name="store",
      database_name="db",
      record_id=1,
  )
  # pylint: disable=protected-access
  self.assertTrue(cli._MatchesFilters(safari_rec, self.args))

def test_matches_filters_value(self) -> None:
"""Tests _MatchesFilters with value filter."""
record = chromium_record.ChromiumIndexedDBRecord(
Expand Down
Loading