diff --git a/launchable/commands/record/attachment.py b/launchable/commands/record/attachment.py index 5ffb6de56..b1f05bf7d 100644 --- a/launchable/commands/record/attachment.py +++ b/launchable/commands/record/attachment.py @@ -1,11 +1,21 @@ +import tarfile +import zipfile +from io import BytesIO from typing import Optional import click +from tabulate import tabulate from ...utils.launchable_client import LaunchableClient from ..helper import require_session +class AttachmentStatus: + SUCCESS = "✓ Recorded successfully" + FAILED = "⚠ Failed to record" + SKIPPED_NON_TEXT = "⚠ Skipped: not a valid text file" + + @click.command() @click.option( '--session', @@ -21,15 +31,95 @@ def attachment( session: Optional[str] = None ): client = LaunchableClient(app=context.obj) + summary_rows = [] try: session = require_session(session) + assert session is not None for a in attachments: - click.echo("Sending {}".format(a)) - with open(a, mode='rb') as f: - res = client.request( - "post", "{}/attachment".format(session), compress=True, payload=f, - additional_headers={"Content-Disposition": "attachment;filename=\"{}\"".format(a)}) - res.raise_for_status() + # If zip file + if zipfile.is_zipfile(a): + with zipfile.ZipFile(a, 'r') as zip_file: + for zip_info in zip_file.infolist(): + if zip_info.is_dir(): + continue + + file_content = zip_file.read(zip_info.filename) + + if not valid_utf8_file(file_content): + summary_rows.append( + [zip_info.filename, AttachmentStatus.SKIPPED_NON_TEXT]) + continue + + status = post_attachment( + client, session, file_content, zip_info.filename) + summary_rows.append([zip_info.filename, status]) + + # If tar file (tar, tar.gz, tar.bz2, tgz, etc.) + elif tarfile.is_tarfile(a): + with tarfile.open(a, 'r:*') as tar_file: + for tar_info in tar_file: + if tar_info.isdir(): + continue + + file_obj = tar_file.extractfile(tar_info) + if file_obj is None: + continue + + file_content = file_obj.read() + + if not valid_utf8_file(file_content): + summary_rows.append( + [tar_info.name, AttachmentStatus.SKIPPED_NON_TEXT]) + continue + + status = post_attachment( + client, session, file_content, tar_info.name) + summary_rows.append([tar_info.name, status]) + + else: + with open(a, mode='rb') as f: + file_content = f.read() + + if not valid_utf8_file(file_content): + summary_rows.append( + [a, AttachmentStatus.SKIPPED_NON_TEXT]) + continue + + status = post_attachment(client, session, file_content, a) + summary_rows.append([a, status]) + except Exception as e: client.print_exception_and_recover(e) + + display_summary_as_table(summary_rows) + + +def valid_utf8_file(file_content: bytes) -> bool: + # Check for null bytes (binary files) + if b'\x00' in file_content: + return False + + try: + file_content.decode('utf-8') + return True + except UnicodeDecodeError: + return False + + +def post_attachment(client: LaunchableClient, session: str, file_content: bytes, filename: str) -> str: + try: + res = client.request( + "post", "{}/attachment".format(session), compress=True, payload=BytesIO(file_content), + additional_headers={"Content-Disposition": "attachment;filename=\"{}\"".format(filename)}) + res.raise_for_status() + return AttachmentStatus.SUCCESS + except Exception as e: + click.echo("Failed to upload {}: {}".format( + filename, str(e)), err=True) + return AttachmentStatus.FAILED + + +def display_summary_as_table(rows): + headers = ["File", "Status"] + click.echo(tabulate(rows, headers, tablefmt="github")) diff --git a/tests/commands/record/test_attachment.py b/tests/commands/record/test_attachment.py index 2bf002e2c..4a4743c5d 100644 --- a/tests/commands/record/test_attachment.py +++ b/tests/commands/record/test_attachment.py @@ -1,6 +1,8 @@ import gzip import os +import tarfile import tempfile +import zipfile from unittest import mock import responses # type: ignore @@ -44,3 +46,68 @@ def verify_body(request): self.assertEqual(TEST_CONTENT, body) os.unlink(attachment.name) + + @responses.activate + @mock.patch.dict(os.environ, {"LAUNCHABLE_TOKEN": CliTestCase.launchable_token}) + def test_attachment_zip_file(self): + with tempfile.TemporaryDirectory() as temp_dir: + # Create temporary files + text_file_1 = os.path.join(temp_dir, "app.log") + text_file_2 = os.path.join(temp_dir, "nested", "debug.log") + binary_file = os.path.join(temp_dir, "binary.dat") + zip_path = os.path.join(temp_dir, "logs.zip") + tar_path = os.path.join(temp_dir, "logs.tar.gz") + + # Create directory structure + os.makedirs(os.path.dirname(text_file_2)) + + # Write test content + with open(text_file_1, 'w') as f: + f.write("[INFO] Test log entry") + with open(text_file_2, 'w') as f: + f.write("[DEBUG] Nested log entry") + with open(binary_file, 'wb') as f: + f.write(b'\x00\x01\x02\x03') + + # Create zip file + with zipfile.ZipFile(zip_path, 'w') as zf: + zf.write(text_file_1, 'app.log') + zf.write(text_file_2, 'nested/debug.log') + zf.write(binary_file, 'binary.dat') + + # Create tar.gz file + with tarfile.open(tar_path, 'w:gz') as tf: + tf.add(text_file_1, 'app.log') + tf.add(text_file_2, 'nested/debug.log') + tf.add(binary_file, 'binary.dat') + + responses.add( + responses.POST, + "{}/intake/organizations/{}/workspaces/{}/builds/{}/test_sessions/{}/attachment".format( + get_base_url(), self.organization, self.workspace, self.build_name, self.session_id), + match=[responses.matchers.header_matcher({"Content-Disposition": 'attachment;filename="app.log"'})], + json={"error": "Log file of the same name already exists"}, + status=400) + + responses.add( + responses.POST, + "{}/intake/organizations/{}/workspaces/{}/builds/{}/test_sessions/{}/attachment".format( + get_base_url(), self.organization, self.workspace, self.build_name, self.session_id), + match=[responses.matchers.header_matcher({"Content-Disposition": 'attachment;filename="nested/debug.log"'})], + status=200) + + expect = """ +| File | Status | +|------------------|----------------------------------| +| app.log | ⚠ Failed to record | +| nested/debug.log | ✓ Recorded successfully | +| binary.dat | ⚠ Skipped: not a valid text file | +""" + + result = self.cli("record", "attachment", "--session", self.session, zip_path) + + self.assertIn(expect, result.output) + + result = self.cli("record", "attachment", "--session", self.session, tar_path) + + self.assertIn(expect, result.output)