Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 24 additions & 30 deletions cyberdrop_dl/crawlers/_one_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,13 @@
from cyberdrop_dl.data_structures.url_objects import AbsoluteHttpURL, ScrapeItem


class Selectors:
class Selector:
TABLE = "table#list-table"
FILE_LINK = "a.download"
FOLDER_LINK = "a[name='folderlist']"
FILE = f"tr:has({FILE_LINK})"
FOLDER = f"tr:has({FOLDER_LINK})"
DATE = "td.updated_at"
README = "div#head.markdown-body"


_SELECTORS = Selectors()


class OneManagerCrawler(Crawler, is_abc=True):
Expand All @@ -41,51 +37,49 @@ class OneManagerCrawler(Crawler, is_abc=True):
async def fetch(self, scrape_item: ScrapeItem) -> None:
scrape_item.url = scrape_item.url.with_query(None)
if self.PRIMARY_URL not in scrape_item.parent_threads:
self.init_item(scrape_item)
await self.process_path(scrape_item)
self._init_item(scrape_item)
await self._path(scrape_item)

async def __async_post_init__(self) -> None:
self.manager.client_manager.download_slots.update({self.DOMAIN: 2})

@error_handling_wrapper
async def process_path(self, scrape_item: ScrapeItem) -> None:
async def _path(self, scrape_item: ScrapeItem) -> None:
try:
soup = await self.request_soup(scrape_item.url)
except InvalidContentTypeError: # This is a file, not html
except InvalidContentTypeError: # This is a file, not HTML
scrape_item.parent_title = scrape_item.parent_title.rsplit("/", 1)[0]
link = scrape_item.url
scrape_item.url = link.parent
return await self._process_file(scrape_item, link)

# TODO: save readme as a sidecar
if soup.select_one(_SELECTORS.README):
pass
return await self._file(scrape_item, link)

# The href values are not full links; they contain only the name of the next path segment
table = css.select(soup, _SELECTORS.TABLE)
for file in css.iselect(table, _SELECTORS.FILE):
await self.process_file(scrape_item, file)
table = css.select(soup, Selector.TABLE)

for file in css.iselect(table, Selector.FILE):
await self.file(scrape_item, file)
scrape_item.add_children()

for folder in css.iselect(table, _SELECTORS.FOLDER):
link = scrape_item.url / css.select(folder, _SELECTORS.FOLDER_LINK, "href")
new_scrape_item = scrape_item.create_child(link, new_title_part=link.name)
self.create_task(self.run(new_scrape_item))
for folder in css.iselect(table, Selector.FOLDER):
link = scrape_item.url / css.select(folder, Selector.FOLDER_LINK, "href")
new_item = scrape_item.create_child(link)
new_item.add_to_parent_title(link.name)
self.create_task(self.run(new_item))
scrape_item.add_children()

@error_handling_wrapper
async def process_file(self, scrape_item: ScrapeItem, file: Tag) -> None:
datetime = self.parse_date(css.select_text(file, _SELECTORS.DATE))
link = scrape_item.url / css.select(file, _SELECTORS.FILE_LINK, "href")
await self._process_file(scrape_item, link, datetime)
async def file(self, scrape_item: ScrapeItem, file: Tag) -> None:
datetime = self.parse_iso_date(css.select_text(file, Selector.DATE))
link = scrape_item.url / css.select(file, Selector.FILE_LINK, "href")
await self._file(scrape_item, link, datetime)

async def _process_file(self, scrape_item: ScrapeItem, link: AbsoluteHttpURL, datetime: int | None = None) -> None:
async def _file(self, scrape_item: ScrapeItem, link: AbsoluteHttpURL, uploaded_at: int | None = None) -> None:
preview_url = link.with_query("preview") # The query param needs to be `?preview` exactly, with no value or `=`
new_scrape_item = scrape_item.create_child(preview_url, possible_datetime=datetime)
filename, ext = self.get_filename_and_ext(link.name)
await self.handle_file(link, new_scrape_item, filename, ext)
new_item = scrape_item.create_child(preview_url)
new_item.uploaded_at = uploaded_at
await self.direct_file(new_item, link)

def init_item(self, scrape_item: ScrapeItem) -> None:
def _init_item(self, scrape_item: ScrapeItem) -> None:
scrape_item.setup_as_album(self.FOLDER_DOMAIN, album_id=self.DOMAIN)
for part in scrape_item.url.parts[1:]:
scrape_item.add_to_parent_title(part)
Expand Down
190 changes: 50 additions & 140 deletions cyberdrop_dl/crawlers/dirtyship.py
Original file line number Diff line number Diff line change
@@ -1,178 +1,88 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING, ClassVar, NamedTuple
from typing import TYPE_CHECKING, ClassVar

from cyberdrop_dl.crawlers.crawler import Crawler, SupportedPaths
from cyberdrop_dl.data_structures.mediaprops import Resolution
from cyberdrop_dl.data_structures.url_objects import AbsoluteHttpURL
from cyberdrop_dl.exceptions import ScrapeError
from cyberdrop_dl.utils import css
from cyberdrop_dl.utils.utilities import error_handling_wrapper
from cyberdrop_dl.utils.utilities import error_handling_wrapper, parse_url

if TYPE_CHECKING:
from collections.abc import Generator

from bs4 import BeautifulSoup

from cyberdrop_dl.data_structures.url_objects import ScrapeItem


class Selectors:
class Selector:
VIDEO = "video#fp-video-0 > source"
FLOWPLAYER_VIDEO = "div.freedomplayer"
FLOWPLAYER = ".freedomplayer"
PLAYLIST_ITEM = "li.thumi > a"
GALLERY_TITLE = "div#album p[style='text-align: center;']"
GALLERY_ALTERNATIVE_TITLE = "h1.singletitle"
GALLERY_THUMBNAILS = "div.gallery_grid img.gallery-img"
GALLERY_ALTERNATIVE_THUMBNAILS = "div#gallery-1 img"
GALLERY_DECODING_ASYNC = "div#album img[decoding='async']"
SINGLE_PHOTO = "div.resolutions a"


_SELECTORS = Selectors()


class Format(NamedTuple):
    """A candidate video source: its resolution paired with its URL."""

    # Resolution of the source as an integer, or ``None`` when it is not known.
    resolution: int | None
    # Absolute URL of the media file for this source.
    url: AbsoluteHttpURL


PRIMARY_URL = AbsoluteHttpURL("https://dirtyship.com")


class DirtyShipCrawler(Crawler):
SUPPORTED_PATHS: ClassVar[SupportedPaths] = {
"Category": "/category/...",
"Tag": "/tag/...",
"Video": "/<video_name>",
"Gallery": "/gallery/...",
"Photo": "/gallery/.../...",
"Category": "/category/<name>",
"Tag": "/tag/<name>",
"Video": "/<slug>",
}
PRIMARY_URL: ClassVar[AbsoluteHttpURL] = PRIMARY_URL
PRIMARY_URL: ClassVar[AbsoluteHttpURL] = AbsoluteHttpURL("https://dirtyship.com")
NEXT_PAGE_SELECTOR: ClassVar[str] = "a.page-next"
DOMAIN: ClassVar[str] = "dirtyship"
FOLDER_DOMAIN: ClassVar[str] = "DirtyShip"

async def fetch(self, scrape_item: ScrapeItem) -> None:
if any(p in scrape_item.url.parts for p in ("tag", "category")):
return await self.playlist(scrape_item)
if "gallery" in scrape_item.url.parts:
if len(scrape_item.url.parts) >= 4:
return await self.photo(scrape_item)
else:
return await self.gallery(scrape_item)
return await self.video(scrape_item)

@error_handling_wrapper
async def photo(self, scrape_item: ScrapeItem) -> None:
if await self.check_complete_from_referer(scrape_item):
return
if not scrape_item.url.suffix == ".jpg":
soup = await self.request_soup(scrape_item.url)
url = self.parse_url(
next(css.attr(a, "href") for a in soup.select(_SELECTORS.SINGLE_PHOTO) if "full" in a.get_text())
)
else:
url = scrape_item.url
filename, ext = self.get_filename_and_ext(url.name)
await self.handle_file(url, scrape_item, filename, ext)
match scrape_item.url.parts[1:]:
case ["tag" | "category" as type_, _]:
return await self.playlist(scrape_item, type_)
case [_]:
return await self.video(scrape_item)
case _:
raise ValueError

@error_handling_wrapper
async def gallery(self, scrape_item: ScrapeItem) -> None:
async def playlist(self, scrape_item: ScrapeItem, type_: str) -> None:
title: str = ""
async for soup in self.web_pager(scrape_item.url):
if not title:
title_tag = soup.select_one(_SELECTORS.GALLERY_TITLE) or soup.select_one(
_SELECTORS.GALLERY_ALTERNATIVE_TITLE
)
assert title_tag
title: str = title_tag.get_text(strip=True)
title = self.create_title(title)
name = css.select_text(soup, "title").split("Archives", 1)[0]
title = self.create_title(f"{name} [{type_}]")
scrape_item.setup_as_album(title)

thumbnails = (
soup.select(_SELECTORS.GALLERY_THUMBNAILS)
or soup.select(_SELECTORS.GALLERY_ALTERNATIVE_THUMBNAILS)
or soup.select(_SELECTORS.GALLERY_DECODING_ASYNC)
)

for img in thumbnails:
url = (
css.attr(img, "src")
if img.get("decoding") == "async"
else get_highest_resolution_picture(css.attr(img, "srcset"))
)
if not url:
raise ScrapeError(404)
url = self.parse_url(url)
filename, ext = self.get_filename_and_ext(url.name)
await self.handle_file(url, scrape_item, filename, ext)

@error_handling_wrapper
async def playlist(self, scrape_item: ScrapeItem) -> None:
title: str = ""
async for soup in self.web_pager(scrape_item.url):
if not title:
title: str = css.select_text(soup, "title")
title = title.split("Archives - DirtyShip")[0]
title = self.create_title(title)
scrape_item.setup_as_album(title)

for _, new_scrape_item in self.iter_children(scrape_item, soup, _SELECTORS.PLAYLIST_ITEM):
for _, new_scrape_item in self.iter_children(scrape_item, soup, Selector.PLAYLIST_ITEM):
self.create_task(self.run(new_scrape_item))

@error_handling_wrapper
async def video(self, scrape_item: ScrapeItem) -> None:
soup = await self.request_soup(scrape_item.url)

title: str = css.select_text(soup, "title")
title = title.split(" - DirtyShip")[0]
videos = soup.select(_SELECTORS.VIDEO)

def get_formats():
for video in videos:
link_str: str = css.attr(video, "src")
if link_str.startswith("type="):
continue
res: str = css.attr(video, "title")
link = self.parse_url(link_str)
yield (Format(int(res), link))

formats = set(get_formats())
if not formats:
formats = self.get_flowplayer_sources(soup)
if not formats:
raise ScrapeError(422, message="No video source found")

res, link = sorted(formats)[-1]
filename, ext = self.get_filename_and_ext(link.name)
custom_filename = self.create_custom_filename(title, ext, resolution=res)
await self.handle_file(link, scrape_item, filename, ext, custom_filename=custom_filename)

def get_flowplayer_sources(self, soup: BeautifulSoup) -> set[Format]:
flow_player = soup.select_one(_SELECTORS.FLOWPLAYER_VIDEO)
data_item: str | None = css.attr_or_none(flow_player, "data-item") if flow_player else None
if not data_item:
return set()
data_item = data_item.replace(r"\/", "/")
json_data = json.loads(data_item)
sources = json_data["sources"]
return {Format(None, self.parse_url(s["src"])) for s in sources}


"""~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"""


def get_highest_resolution_picture(srcset: str) -> str | None:
    """Return the URL of the highest-resolution candidate in a ``srcset`` string.

    Candidates are comma-separated ``"<url> <descriptor>"`` entries. Width
    descriptors (``800w``) take priority: if at least one candidate has a
    parseable width, the URL with the largest width is returned. Only when no
    width candidate exists does the function fall back to pixel-density
    descriptors (``2x``), treating a bare URL as ``1x``, so that such a srcset
    still yields an image instead of ``None``.

    Args:
        srcset: Raw value of an ``<img srcset="...">`` attribute.

    Returns:
        The selected URL, or ``None`` if no usable candidate was found.

    Note:
        The string is split on every comma, so URLs that themselves contain
        commas are not supported.
    """
    by_width: list[tuple[int, str]] = []
    by_density: list[tuple[float, str]] = []

    for candidate in srcset.split(","):
        parts = candidate.split()
        if len(parts) == 1:
            # A bare URL carries an implicit ``1x`` density descriptor.
            by_density.append((1.0, parts[0]))
            continue
        if len(parts) != 2:
            # Empty entry (e.g. trailing comma) or malformed candidate.
            continue

        url, descriptor = parts
        if descriptor.endswith("x"):
            try:
                density = float(descriptor[:-1])
            except ValueError:
                continue
            # The chained comparison also rejects NaN and infinity.
            if 0 < density < float("inf"):
                by_density.append((density, url))
            continue

        try:
            # Keep the ``try`` limited to the only call that can raise.
            width = int(descriptor.rstrip("w"))
        except ValueError:
            continue
        by_width.append((width, url))

    # Tuple comparison: largest descriptor wins; ties resolve on the URL string.
    ranked = by_width or by_density
    return max(ranked)[1] if ranked else None
props = css.json_ld(soup)["@graph"]
article: dict[str, str] = next(prop for prop in props if prop["@type"] == "Article")
title = css.unescape(article["headline"])
_preview = next(prop["contentUrl"] for prop in props if prop["@type"] == "ImageObject")
scrape_item.uploaded_at = self.parse_iso_date(article["datePublished"])

try:
resolution, src = max(_parse_flowplayer_sources(soup))
except css.SelectorError:
resolution, src = max(_parse_html5_formats(soup))

filename, ext = self.get_filename_and_ext(src.name)
custom_filename = self.create_custom_filename(title, ext, resolution=resolution)
await self.handle_file(src, scrape_item, filename, ext, custom_filename=custom_filename)


def _parse_html5_formats(soup: BeautifulSoup) -> Generator[tuple[Resolution, AbsoluteHttpURL]]:
    """Yield a ``(resolution, url)`` pair for every element matching ``Selector.VIDEO``.

    The resolution is parsed from the element's ``title`` attribute and the
    URL from its ``src`` attribute.
    """
    for source_tag in css.iselect(soup, Selector.VIDEO):
        # Read ``title`` before ``src`` so attribute errors surface in the same order.
        resolution = Resolution.parse(css.attr(source_tag, "title"))
        yield resolution, parse_url(css.attr(source_tag, "src"))


def _parse_flowplayer_sources(soup: BeautifulSoup) -> Generator[tuple[Resolution, AbsoluteHttpURL]]:
    """Yield a ``(resolution, url)`` pair for each source in the player's ``data-item`` JSON.

    The ``data-item`` attribute of the element matching ``Selector.FLOWPLAYER``
    is decoded as JSON and its ``"sources"`` list is walked. Every pair carries
    ``Resolution.unknown()`` because no resolution is read from the payload.

    NOTE(review): the lookup presumably raises ``css.SelectorError`` when the
    player element is absent - confirm against ``css.select``.
    """
    raw_item = css.select(soup, Selector.FLOWPLAYER, "data-item")
    # The attribute stores slashes escaped as ``\/``; restore them before decoding.
    payload = json.loads(raw_item.replace(r"\/", "/"))
    entry: dict[str, str]
    for entry in payload["sources"]:
        yield Resolution.unknown(), parse_url(entry["src"])
Loading
Loading