Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 44 additions & 34 deletions launchable/commands/record/attachment.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class AttachmentStatus:
SUCCESS = "✓ Recorded successfully"
FAILED = "⚠ Failed to record"
SKIPPED_NON_TEXT = "⚠ Skipped: not a valid text file"
SKIPPED_DUPLICATE = "⚠ Skipped: duplicate"


@click.command()
Expand Down Expand Up @@ -66,8 +67,12 @@ def attachment(
[zip_info.filename, AttachmentStatus.SKIPPED_NON_TEXT])
continue

file_name = normalize_filename(zip_info.filename)
file_name = get_unique_filename(file_name, used_filenames)
file_name = get_unique_filename(zip_info.filename, used_filenames)
if not file_name:
summary_rows.append(
[zip_info.filename, AttachmentStatus.SKIPPED_DUPLICATE])
continue

status = post_attachment(
client, session, file_content, file_name)
summary_rows.append([file_name, status])
Expand All @@ -93,8 +98,12 @@ def attachment(
[tar_info.name, AttachmentStatus.SKIPPED_NON_TEXT])
continue

file_name = normalize_filename(tar_info.name)
file_name = get_unique_filename(file_name, used_filenames)
file_name = get_unique_filename(tar_info.name, used_filenames)
if not file_name:
summary_rows.append(
[tar_info.name, AttachmentStatus.SKIPPED_DUPLICATE])
continue

status = post_attachment(
client, session, file_content, file_name)
summary_rows.append([file_name, status])
Expand All @@ -108,8 +117,12 @@ def attachment(
[a, AttachmentStatus.SKIPPED_NON_TEXT])
continue

file_name = normalize_filename(a)
file_name = get_unique_filename(file_name, used_filenames)
file_name = get_unique_filename(a, used_filenames)
if not file_name:
summary_rows.append(
[a, AttachmentStatus.SKIPPED_DUPLICATE])
continue

status = post_attachment(client, session, file_content, file_name)
summary_rows.append([file_name, status])

Expand All @@ -119,40 +132,37 @@ def attachment(
display_summary_as_table(summary_rows)


def get_unique_filename(filepath: str, used_filenames: Set[str]) -> str:
def get_unique_filename(filepath: str, used_filenames: Set[str]) -> Optional[str]:
"""
Get a unique filename by extracting the basename and prepending parent folder if needed.
Get a unique filename by extracting the basename and prepending parent folders if needed.
Strategy:
1. First occurrence: use basename (e.g., app.log)
2. Duplicate: prepend parent folder (e.g., nested-app.log)
3. Still duplicate: append .1, .2, etc. (e.g., nested-app.1.log)
2. Duplicate: prepend parent directories until unique
"""
filename = os.path.basename(filepath)
# Normalize path separators to forward slash (archives always use forward slash in both linux, and windows)
normalized_path = filepath.replace(os.sep, '/')
normalized_path = normalize_filename(normalized_path)

basename = normalized_path.split('/')[-1]

# If basename is not used, return it
if filename not in used_filenames:
used_filenames.add(filename)
return filename

# Try prepending the parent directory name
parent_dir = os.path.basename(os.path.dirname(filepath))
if parent_dir: # Has a parent directory
name, ext = os.path.splitext(filename)
filename = f"{parent_dir}-{name}{ext}"

if filename not in used_filenames:
used_filenames.add(filename)
return filename

# If still duplicate, append numbers
name, ext = os.path.splitext(filename)
counter = 1
while True:
filename = f"{name}.{counter}{ext}"
if filename not in used_filenames:
used_filenames.add(filename)
return filename
counter += 1
if basename not in used_filenames:
used_filenames.add(basename)
return basename

# Try prepending parents from nearest to farthest
path_parts = normalized_path.split('/')
parent_parts = [p for p in path_parts[:-1] if p]

prefixed_name = basename
for parent in reversed(parent_parts):
prefixed_name = f"{parent}/{prefixed_name}"

if prefixed_name not in used_filenames:
used_filenames.add(prefixed_name)
return prefixed_name

return None


def matches_include_patterns(filename: str, include_patterns: Tuple[str, ...]) -> bool:
Expand Down
56 changes: 49 additions & 7 deletions tests/commands/record/test_attachment.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def test_attachment_with_identical_file_names(self):
with tempfile.TemporaryDirectory() as temp_dir:
# Create temporary files
text_file_1 = os.path.join(temp_dir, "app.log")
text_file_2 = os.path.join(temp_dir, "nested", "app.log")
text_file_2 = os.path.join(temp_dir, "nested_2", "app.log")
zip_path = os.path.join(temp_dir, "logs.zip")

# Create directory structure
Expand All @@ -183,7 +183,8 @@ def test_attachment_with_identical_file_names(self):
# Create zip file
with zipfile.ZipFile(zip_path, 'w') as zf:
zf.write(text_file_1, 'app.log')
zf.write(text_file_2, 'nested/app.log')
zf.write(text_file_2, 'nested_1/app.log')
zf.write(text_file_2, 'nested_2/app.log')

responses.add(
responses.POST,
Expand All @@ -196,15 +197,23 @@ def test_attachment_with_identical_file_names(self):
responses.POST,
"{}/intake/organizations/{}/workspaces/{}/builds/{}/test_sessions/{}/attachment".format(
get_base_url(), self.organization, self.workspace, self.build_name, self.session_id),
match=[responses.matchers.header_matcher({"Content-Disposition": 'attachment;filename="nested-app.log"'})],
match=[responses.matchers.header_matcher({"Content-Disposition": 'attachment;filename="nested_1/app.log"'})],
status=200)

responses.add(
responses.POST,
"{}/intake/organizations/{}/workspaces/{}/builds/{}/test_sessions/{}/attachment".format(
get_base_url(), self.organization, self.workspace, self.build_name, self.session_id),
match=[responses.matchers.header_matcher({"Content-Disposition": 'attachment;filename="nested_2/app.log"'})],
status=200)

result = self.cli("record", "attachment", "--session", self.session, zip_path)

expect = """| File | Status |
|----------------|-------------------------|
| app.log | ✓ Recorded successfully |
| nested-app.log | ✓ Recorded successfully |
expect = """| File | Status |
|------------------|-------------------------|
| app.log | ✓ Recorded successfully |
| nested_1/app.log | ✓ Recorded successfully |
| nested_2/app.log | ✓ Recorded successfully |
"""
self.assertEqual(expect, result.output)

Expand Down Expand Up @@ -236,3 +245,36 @@ def test_attachment_with_whitespace_in_filename(self):

self.assert_success(result)
self.assertIn("✓ Recorded successfully", result.output)

@responses.activate
@mock.patch.dict(os.environ, {"LAUNCHABLE_TOKEN": CliTestCase.launchable_token})
def test_attachment_duplicate_file_paths(self):
with tempfile.TemporaryDirectory() as temp_dir:
# Create temporary files
text_file_1 = os.path.join(temp_dir, "app.log")
zip_path = os.path.join(temp_dir, "logs.zip")

# Write test content
with open(text_file_1, 'w') as f:
f.write("[INFO] Test log entry")

# Create zip file
with zipfile.ZipFile(zip_path, 'w') as zf:
zf.write(text_file_1, 'app.log')

responses.add(
responses.POST,
"{}/intake/organizations/{}/workspaces/{}/builds/{}/test_sessions/{}/attachment".format(
get_base_url(), self.organization, self.workspace, self.build_name, self.session_id),
match=[responses.matchers.header_matcher({"Content-Disposition": 'attachment;filename="app.log"'})],
status=200)

expect = """| File | Status |
|---------|-------------------------|
| app.log | ✓ Recorded successfully |
| app.log | ⚠ Skipped: duplicate |
"""

result = self.cli("record", "attachment", "--session", self.session, zip_path, zip_path)

self.assertIn(expect, result.output)
Loading