126 changes: 62 additions & 64 deletions notebook.py
@@ -11,7 +11,7 @@

import marimo

__generated_with = "0.22.4"
__generated_with = "0.23.0"
app = marimo.App(width="medium")


@@ -77,20 +77,8 @@ def parse_s3_keys(dataframe: pd.DataFrame) -> pd.DataFrame:
key_parts = dataframe["key"].str.split("/", expand=True)

dataframe.loc[:, "bagname"] = key_parts[8] if key_parts.shape[1] > 8 else ""
dataframe.loc[:, "uuid"] = (
key_parts[0]
+ key_parts[1]
+ "-"
+ key_parts[2]
+ "-"
+ key_parts[3]
+ "-"
+ key_parts[4]
+ "-"
+ key_parts[5]
+ key_parts[6]
if key_parts.shape[1] > 6
else "" + key_parts[7] if key_parts.shape[1] > 7 else ""
dataframe["uuid"] = dataframe["key"].str.extract(
r"(\S{8}-\S{4}-\S{4}-\S{4}-\S{12})"
)
dataframe["accession_name"] = dataframe["bagname"].str.split("-").str[0]
dataframe.loc[:, "file"] = dataframe["key"].str.split("/").str[-1]
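For reference, a minimal sketch of what the new regex-based extraction does, using an invented key rather than a real inventory record: str.extract finds the 8-4-4-4-12 UUID shape anywhere in the key, so it no longer depends on the key splitting into a fixed number of "/" segments the way the removed concatenation did.

# Minimal sketch of the regex-based UUID extraction; the key below is invented.
import pandas as pd

df = pd.DataFrame(
    {"key": ["prefix/1234/abcd/12345678-1234-1234-1234-123456789abc/bag-001/data/file.tif"]}
)

# Same pattern as in parse_s3_keys: 8-4-4-4-12 runs of non-whitespace characters.
df["uuid"] = df["key"].str.extract(r"(\S{8}-\S{4}-\S{4}-\S{4}-\S{12})")

print(df["uuid"].iloc[0])  # 12345678-1234-1234-1234-123456789abc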
@@ -309,10 +297,15 @@ def _(datetime, mo, timedelta):


@app.cell(hide_code=True)
def _(date_selector, pd):
def _(date_selector, mo, pd, symlink_dict):
# Retrieve parquet files from the selected date

selected_date = pd.to_datetime(date_selector.value).strftime("%Y-%m-%d")

mo.stop(
selected_date not in symlink_dict,
mo.md(f"No S3 Inventory data found for {selected_date}, select a different date"),
)
Comment on lines +305 to +308
👍 - after the marimo research we did about how and when to use mo.stop(...), this feels like the right approach.

And maybe most importantly, how descendant cells are determined: defining selected_date here makes all the other cells descendants of this one, so they also stop with this mo.stop() (a minimal sketch of this behavior follows below).

return (selected_date,)
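A minimal sketch of the behavior discussed in the comment above, written as a standalone marimo app with hypothetical stand-ins (the hard-coded symlink_dict and selected_date) rather than the notebook's real inputs: when the condition passed to mo.stop() is true, the cell halts and shows the given output, and descendant cells that read selected_date are not run.

import marimo

app = marimo.App()


@app.cell
def _():
    import marimo as mo
    return (mo,)


@app.cell
def _(mo):
    # Hypothetical stand-ins for the notebook's symlink_dict and date_selector.
    symlink_dict = {"2024-01-01": ["symlink.txt"]}
    selected_date = "2024-01-02"  # not present in symlink_dict

    # When the condition is true, execution of this cell stops here and the
    # markdown below becomes the cell's output.
    mo.stop(
        selected_date not in symlink_dict,
        mo.md(f"No S3 Inventory data found for {selected_date}"),
    )
    return (selected_date,)


@app.cell
def _(selected_date):
    # Descendant cell: it reads selected_date, so it only runs when the
    # guarded cell above completes without stopping.
    print(f"Loading inventory for {selected_date}")
    return


if __name__ == "__main__":
    app.run()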


@@ -329,6 +322,7 @@ def _(
ClientError,
io,
logger,
mo,
parquet_file_uri_cache,
pd,
s3,
@@ -338,7 +332,6 @@
):
# Add parquet file URIs to cache if not already present
if not parquet_file_uri_cache.get(selected_date):
logger.info(f"Collecting parquet file URIs for date: {selected_date}")
parquet_file_uris = []
for symlink in symlink_dict[selected_date]:
# Get parquet file URI from symlink.txt
@@ -358,37 +351,42 @@

# Retrieve parquet files
parquet_dfs = []
logger.info(f"Processing parquet file URIs for date: {selected_date}")
for parquet_file_uri in parquet_file_uri_cache[selected_date]:
# Parse parquet file URI
parsed_parquet_file_uri = urlparse(parquet_file_uri)
parquet_bucket = parsed_parquet_file_uri.netloc
parquet_key = parsed_parquet_file_uri.path.lstrip("/")

# Get parquet file and convert to dataframe
try:
logger.info(f"Retrieving parquet file: s3://{parquet_bucket}/{parquet_key}")
s3_object = s3.get_object(Bucket=parquet_bucket, Key=parquet_key)
except ClientError:
logger.exception("Client error while retrieving parquet file:")
raise
parquet_df = pd.read_parquet(io.BytesIO(s3_object["Body"].read()))
parquet_df.loc[:, "parquet_file"] = parquet_key.split("/")[-1]
parquet_dfs.append(parquet_df)

# Concatenate parquet dataframes
inventory_df = (
pd.concat(parquet_dfs, ignore_index=True).drop_duplicates().reset_index(drop=True)
)
with mo.status.spinner(title="Loading S3 inventory data..."):
logger.info(f"Processing parquet file URIs for date: {selected_date}")
for parquet_file_uri in parquet_file_uri_cache[selected_date]:
# Parse parquet file URI
parsed_parquet_file_uri = urlparse(parquet_file_uri)
parquet_bucket = parsed_parquet_file_uri.netloc
parquet_key = parsed_parquet_file_uri.path.lstrip("/")

# Get parquet file and convert to dataframe
try:
logger.info(
f"Retrieving parquet file: s3://{parquet_bucket}/{parquet_key}"
)
s3_object = s3.get_object(Bucket=parquet_bucket, Key=parquet_key)
except ClientError:
logger.exception("Client error while retrieving parquet file:")
raise
parquet_df = pd.read_parquet(io.BytesIO(s3_object["Body"].read()))
parquet_df.loc[:, "parquet_file"] = parquet_key.split("/")[-1]
parquet_dfs.append(parquet_df)

# Concatenate parquet dataframes
inventory_df = (
pd.concat(parquet_dfs, ignore_index=True)
.drop_duplicates()
.reset_index(drop=True)
)

# Keep only current objects in dataframe
inventory_df.loc[:, "is_current"] = (
inventory_df["is_latest"] & ~inventory_df["is_delete_marker"]
)
current_df = (
inventory_df.loc[inventory_df["is_current"]].copy().reset_index(drop=True)
)
logger.info(f"Current CDPS dataframe built with {len(current_df)} records.")
# Keep only current objects in dataframe
inventory_df.loc[:, "is_current"] = (
inventory_df["is_latest"] & ~inventory_df["is_delete_marker"]
)
current_df = (
inventory_df.loc[inventory_df["is_current"]].copy().reset_index(drop=True)
)
logger.info(f"Current CDPS dataframe built with {len(current_df)} records.")
return (current_df,)


@@ -401,24 +399,26 @@ def _(
is_normalized_file,
is_replica,
mime_types,
mo,
parse_s3_keys,
preservation_level,
rename_bucket,
set_status,
):
# Update dataframe with additional metadata
cdps_df = (
current_df.pipe(rename_bucket)
.pipe(parse_s3_keys)
.pipe(is_metadata)
.pipe(preservation_level)
.pipe(mime_types)
.pipe(is_digitized_aip)
.pipe(is_replica)
.pipe(is_normalized_file)
.pipe(set_status)
.pipe(is_av_image)
)
with mo.status.spinner(title="Processing data..."):
cdps_df = (
current_df.pipe(rename_bucket)
.pipe(parse_s3_keys)
.pipe(is_metadata)
.pipe(preservation_level)
.pipe(mime_types)
.pipe(is_digitized_aip)
.pipe(is_replica)
.pipe(is_normalized_file)
.pipe(set_status)
.pipe(is_av_image)
)
return (cdps_df,)


@@ -613,26 +613,24 @@ def _(cdps_df, convert_size, go, mo):

largest_file = cdps_df.loc[cdps_df["size"].idxmax()]
largest_file_data = {
"File name": largest_file["file"],
"File extension": largest_file["extension"],
"Storage size": convert_size(largest_file["size"]),
"Bag": largest_file["bagname"],
"Parquet file": largest_file["parquet_file"],
"File path": largest_file["filepath"],
}

metadata_files_data = cdps_df[cdps_df["status"] == "metadata"]
largest_metadata_file = metadata_files_data.loc[metadata_files_data["size"].idxmax()]
largest_metadata_file_data = {
"File name": largest_metadata_file["file"],
"File extension": largest_metadata_file["extension"],
"Storage size": convert_size(largest_metadata_file["size"]),
"Bag": largest_metadata_file["bagname"],
"Parquet file": largest_metadata_file["parquet_file"],
"File path": largest_metadata_file["filepath"],
}

top10_largest_files_data = (
cdps_df.sort_values(by="size", ascending=False)
.loc[:, ["file", "size"]]
.loc[:, ["extension", "bagname", "size"]]
.assign(size=lambda x: x["size"].apply(convert_size))
.reset_index(drop=True)[:10]
)