Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 96 additions & 6 deletions launchable/commands/record/attachment.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import tarfile
import zipfile
from io import BytesIO
from typing import Optional

import click
from tabulate import tabulate

from ...utils.launchable_client import LaunchableClient
from ..helper import require_session


class AttachmentStatus:
SUCCESS = "✓ Recorded successfully"
FAILED = "⚠ Failed to record"
SKIPPED_NON_TEXT = "⚠ Skipped: not a valid text file"


@click.command()
@click.option(
'--session',
Expand All @@ -21,15 +31,95 @@ def attachment(
session: Optional[str] = None
):
client = LaunchableClient(app=context.obj)
summary_rows = []
try:
session = require_session(session)
assert session is not None

for a in attachments:
click.echo("Sending {}".format(a))
with open(a, mode='rb') as f:
res = client.request(
"post", "{}/attachment".format(session), compress=True, payload=f,
additional_headers={"Content-Disposition": "attachment;filename=\"{}\"".format(a)})
res.raise_for_status()
# If zip file
if zipfile.is_zipfile(a):
with zipfile.ZipFile(a, 'r') as zip_file:
for zip_info in zip_file.infolist():
if zip_info.is_dir():
continue

file_content = zip_file.read(zip_info.filename)

if not valid_utf8_file(file_content):
summary_rows.append(
[zip_info.filename, AttachmentStatus.SKIPPED_NON_TEXT])
continue

status = post_attachment(
client, session, file_content, zip_info.filename)
summary_rows.append([zip_info.filename, status])

# If tar file (tar, tar.gz, tar.bz2, tgz, etc.)
elif tarfile.is_tarfile(a):
with tarfile.open(a, 'r:*') as tar_file:
for tar_info in tar_file:
if tar_info.isdir():
continue

file_obj = tar_file.extractfile(tar_info)
if file_obj is None:
continue

file_content = file_obj.read()

if not valid_utf8_file(file_content):
summary_rows.append(
[tar_info.name, AttachmentStatus.SKIPPED_NON_TEXT])
continue

status = post_attachment(
client, session, file_content, tar_info.name)
summary_rows.append([tar_info.name, status])

else:
with open(a, mode='rb') as f:
file_content = f.read()

if not valid_utf8_file(file_content):
summary_rows.append(
[a, AttachmentStatus.SKIPPED_NON_TEXT])
continue

status = post_attachment(client, session, file_content, a)
summary_rows.append([a, status])

except Exception as e:
client.print_exception_and_recover(e)

display_summary_as_table(summary_rows)


def valid_utf8_file(file_content: bytes) -> bool:
# Check for null bytes (binary files)
if b'\x00' in file_content:
return False

try:
file_content.decode('utf-8')
return True
except UnicodeDecodeError:
return False


def post_attachment(client: LaunchableClient, session: str, file_content: bytes, filename: str) -> str:
try:
res = client.request(
"post", "{}/attachment".format(session), compress=True, payload=BytesIO(file_content),
additional_headers={"Content-Disposition": "attachment;filename=\"{}\"".format(filename)})
res.raise_for_status()
return AttachmentStatus.SUCCESS
except Exception as e:
click.echo("Failed to upload {}: {}".format(
filename, str(e)), err=True)
return AttachmentStatus.FAILED


def display_summary_as_table(rows):
headers = ["File", "Status"]
click.echo(tabulate(rows, headers, tablefmt="github"))
67 changes: 67 additions & 0 deletions tests/commands/record/test_attachment.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import gzip
import os
import tarfile
import tempfile
import zipfile
from unittest import mock

import responses # type: ignore
Expand Down Expand Up @@ -44,3 +46,68 @@ def verify_body(request):
self.assertEqual(TEST_CONTENT, body)

os.unlink(attachment.name)

@responses.activate
@mock.patch.dict(os.environ, {"LAUNCHABLE_TOKEN": CliTestCase.launchable_token})
def test_attachment_zip_file(self):
with tempfile.TemporaryDirectory() as temp_dir:
# Create temporary files
text_file_1 = os.path.join(temp_dir, "app.log")
text_file_2 = os.path.join(temp_dir, "nested", "debug.log")
binary_file = os.path.join(temp_dir, "binary.dat")
zip_path = os.path.join(temp_dir, "logs.zip")
tar_path = os.path.join(temp_dir, "logs.tar.gz")

# Create directory structure
os.makedirs(os.path.dirname(text_file_2))

# Write test content
with open(text_file_1, 'w') as f:
f.write("[INFO] Test log entry")
with open(text_file_2, 'w') as f:
f.write("[DEBUG] Nested log entry")
with open(binary_file, 'wb') as f:
f.write(b'\x00\x01\x02\x03')

# Create zip file
with zipfile.ZipFile(zip_path, 'w') as zf:
zf.write(text_file_1, 'app.log')
zf.write(text_file_2, 'nested/debug.log')
zf.write(binary_file, 'binary.dat')

# Create tar.gz file
with tarfile.open(tar_path, 'w:gz') as tf:
tf.add(text_file_1, 'app.log')
tf.add(text_file_2, 'nested/debug.log')
tf.add(binary_file, 'binary.dat')

responses.add(
responses.POST,
"{}/intake/organizations/{}/workspaces/{}/builds/{}/test_sessions/{}/attachment".format(
get_base_url(), self.organization, self.workspace, self.build_name, self.session_id),
match=[responses.matchers.header_matcher({"Content-Disposition": 'attachment;filename="app.log"'})],
json={"error": "Log file of the same name already exists"},
status=400)

responses.add(
responses.POST,
"{}/intake/organizations/{}/workspaces/{}/builds/{}/test_sessions/{}/attachment".format(
get_base_url(), self.organization, self.workspace, self.build_name, self.session_id),
match=[responses.matchers.header_matcher({"Content-Disposition": 'attachment;filename="nested/debug.log"'})],
status=200)

expect = """
| File | Status |
|------------------|----------------------------------|
| app.log | ⚠ Failed to record |
| nested/debug.log | ✓ Recorded successfully |
| binary.dat | ⚠ Skipped: not a valid text file |
"""

result = self.cli("record", "attachment", "--session", self.session, zip_path)

self.assertIn(expect, result.output)

result = self.cli("record", "attachment", "--session", self.session, tar_path)

self.assertIn(expect, result.output)
Loading