diff --git a/README.md b/README.md index c86d1e65..4c06cc00 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,232 @@ -# How to create a PR with a homework task +RSS reader +========= -1. Create fork from the following repo: https://github.com/E-P-T/Homework. (Docs: https://docs.github.com/en/get-started/quickstart/fork-a-repo ) -2. Clone your forked repo in your local folder. -3. Create separate branches for each session.Example(`session_2`, `session_3` and so on) -4. Create folder with you First and Last name in you forked repo in the created session. -5. Add your task into created folder -6. Push finished session task in the appropriate branch in accordance with written above. - You should get the structure that looks something like that +This is RSS reader version 1.0. +rss_reader.py is a python script intended to get RSS feed from given source URL +and write its content to standard output. + +Please be careful with redirecting output to files. In this case CPython implementation +of Python interpreter will change encoding from UTF-8 to +the system locale encoding (i.e. the ANSI codepage). + +This script will try to install all required packages from PyPI with pip in +the current environment. + +Tests +------ + +To launch tests run + +on Windows + +```shell +python -m unittest tests.py +``` + +on Linux + +```bash +python3 -m unittest tests.py +``` + +To check test coverage run + +on Windows + +```shell +python -m coverage run --source=rss_reader -m unittest tests.py +python -m coverage report -m +``` + +on Linux + +```bash +python3 -m coverage run --source=rss_reader -m unittest tests.py +python3 -m coverage report -m +``` + +All specified above commands should be used when current directory is the directory with rss_reader.py + +How to execute without installation +------ + +Before installation there are two ways to start RSS reader + +1. Using module loading. 
Run from directory with rss_reader.py file the following command + + on Windows + + ```shell + python -m rss_reader ... + ``` + + on Linux + + ```bash + python3 -m rss_reader ... + ``` + +2. Specifying the script file. Run from directory with rss_reader.py file the following command + + on Windows + + ```shell + python rss_reader.py ... + ``` + + on Linux + + ```bash + python3 rss_reader.py ... + ``` + +Installation +------ + +To install the script as site-package to python environment run the following command + +on Windows + +```shell +python setup.py install ``` - Branch: Session_2 - DzmitryKolb - |___Task1.py - |___Task2.py - Branch: Session_3 - DzmitryKolb - |___Task1.py - |___Task2.py + +on Linux + +```bash +python3 setup.py install ``` -7. When you finish your work on task you should create Pull request to the appropriate branch of the main repo https://github.com/E-P-T/Homework (Docs: https://docs.github.com/en/github/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork). -Please use the following instructions to prepare good description of the pull request: - - Pull request header should be: `Session - `. - Example: `Session 2 - Dzmitry Kolb` - - Pull request body: You should write here what tasks were implemented. - Example: `Finished: Task 1.2, Task 1.3, Task 1.6` +How to execute after installation +------ + +After installation there are three ways to start RSS reader + +1. Using module loading. Run from any directory + + on Windows + + ```shell + python -m rss_reader ... + ``` + + on Linux + + ```bash + python3 -m rss_reader ... + ``` + +2. Specifying the script file. Run from directory with rss_reader.py file the following command + + on Windows + + ```shell + python rss_reader.py ... + ``` + + on Linux + + ```bash + python3 rss_reader.py ... + ``` + +3. Using entry point. Run from any directory + + ```shell + rss_reader ... 
+ ``` + +Command line format +------- + + usage: rss_reader.py [-h] [--version] [--json] [--verbose] [--limit LIMIT] [--date DATE] [--to-html HTML_DEST] + [--to-pdf PDF_DEST] + source + + Pure Python command-line RSS reader. + + positional arguments: + source RSS URL + + optional arguments: + -h, --help show this help message and exit + --version Print version info + --json Print result as JSON in stdout + --verbose Outputs verbose status messages + --limit LIMIT Limit news topics if this parameter provided + --date DATE Get from cache news that was published after specified date (date should be specified in format + YYYYmmdd, for example --date 20191020) + --to-html HTML_DEST Store feed in HTML as specified file + --to-pdf PDF_DEST Store feed in PDF as specified file + +JSON representation +------- + +```json +{ + "title": Title of the feed, + "link": URL of feed, + "description": Description of the feed, + "items": [ + { + "title": Item title if present, + "pubDate": Publication date if present, + "link": URL of the item if present, + "description": Description of the item, + "links": [ + [ + Link URL, + Link type + ], + ... + ] + }, + ... + ] +} +``` + +Cache storage format +------ + +News cache is stored in file rss_reader.cache in current working directory + +Content of the cache file is a dictionary serialized with the `pickle` module. + +Keys of the dictionary are URLs of retrieved feeds. + +For each key in dictionary appropriate value is the result of parsing feed with merged item lists. + +Items from all retrievals of the same URL are merged together into a single list. + +The result of parsing feed is the dictionary with following keys: + + - `title` - title of the feed + + - `link` - link to the feed + + - `description` - description of the feed + + - `items` - list of parsed items of the feed. 
+ +The result of parsing item of feed is dictionary with following keys: + + - `title` - title of the item + + - `pubDate` - publication date of the item + + - `link` - link to resource related to the item + + - `description` - description of the item + + - `images` - dictionary of collected images (keys are URLs of images and values are their content as bytes object) + + - `links` - a list of links collected for the item. + +There is a tuple in the list `links` for each link collected for the item. The tuple has two elements: + - URL of the link + + - type of the link. It may be just link if type is html or unknown. + Or it may be type part of MIME type of the resource. \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..67ca1062 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +beautifulsoup4==4.11.1 +bs4==0.0.1 +certifi==2022.6.15 +charset-normalizer==2.1.0 +defusedxml==0.7.1 +fpdf2==2.5.5 +idna==3.3 +lxml==4.9.0 +Pillow==9.1.1 +python-dateutil==2.8.2 +requests==2.28.1 +six==1.16.0 +soupsieve==2.3.2.post1 +urllib3==1.26.9 diff --git a/rss_reader.py b/rss_reader.py new file mode 100644 index 00000000..e0d461f2 --- /dev/null +++ b/rss_reader.py @@ -0,0 +1,535 @@ +""" +This rss_reader.py is a python script intended to get RSS feed from given source URL +and write its content to standart output. + +Please be carefull with redirecting output to files. In this case CPython implementation +of Python interpreter will change encoding from UTF-8 to +the system locale encoding (i.e. the ANSI codepage). + +This script will try to install all required packages from PyPI with pip in +the current environment. +""" + +import sys +import logging +import json +from functools import cached_property + + +def install_and_import(module_name, package_name=None): + """ + This function tries to import module `module_name`. 
+ In case of failure of that operation the function installs package `package_name` + """ + from importlib import import_module + try: + logging.debug(f'Trying to import module {module_name}') + return import_module(module_name) + except ImportError: + from subprocess import run + if package_name is None: + package_name = module_name + run([sys.executable, '-m', 'pip', 'install', package_name]) + try: + return import_module(module_name) + except ImportError: + print(f'Failed to install package {package_name}', file=sys.stderr) + + +def install_modules(): + """ + Try to import nonstandard modules and install them in case of failure + """ + for module_name in 'lxml', 'bs4', 'requests': + install_and_import(module_name) + for module_name, package_name in ('dateutil', 'python-dateutil'), ('fpdf', 'fpdf2'): + install_and_import(module_name, package_name) + + +def parse_args(args=None): + """ + Parse command line arguments from args or if not provided from sys.argv + + Args should not contain name of program. 
+ """ + import argparse + from datetime import datetime + parser = argparse.ArgumentParser( + description='Pure Python command-line RSS reader.', + exit_on_error=False) + parser.add_argument('--version', action='version', help='Print version info', version='4.0') + parser.add_argument('--json', action='store_true', default=False, help='Print result as JSON in stdout') + parser.add_argument('--verbose', action='store_true', default=False, help='Outputs verbose status messages') + parser.add_argument('--limit', type=int, help='Limit news topics if this parameter provided') + parser.add_argument('--date', type=lambda s: datetime.strptime(s, '%Y%m%d').astimezone(), + help='Get from cache news that was published after specified date\n' + '(date should be specified in format YYYYmmdd, for example --date 20191020)') + parser.add_argument('--to-html', dest='html_dest', help='Store feed in HTML as specified file') + parser.add_argument('--to-pdf', dest='pdf_dest', help='Store feed in PDF as specified file') + parser.add_argument('source', help='RSS URL') + parsed_args = parser.parse_args(args) + return parsed_args + + +def request_feed(source): + """ + Get content of feed by URL + """ + import requests + resp = requests.get(source) + return resp.text + + +def get_text(element): + """ + Return text of element or None if no element + """ + return element.text if element is not None else '' + + +def get_date(element): + """ + Create datetime object from text of element. If element is None then datetime.min is returned + """ + from dateutil import parser + from datetime import datetime + return parser.parse(element.text) if element is not None else datetime.min + + +def get_link(elem): + """ + Get information of link element: url and type + + Return tuple with url and kind of resource + """ + url = elem['url'] + type_ = elem['type'].split('/')[0] if 'type' in elem.attrs else 'image' + return (url, type_) + + +def recieve_image(src): + """ + Get content of resource. 
Intended to store images in cache. + Returns bytes object with content downloaded from specified URL. + """ + from requests import get + from shutil import copyfileobj + from io import BytesIO + + r = get(src, stream=True) + with BytesIO() as f: + r.raw.decode_content = True + copyfileobj(r.raw, f) + return f.getvalue() + + +def parse_item(item): + """ + Parse item element + + Returns dict with keys title, pubDate, link, description and links. + For keys title, pubDate and description correspond information is stored. + For key links dict stores list of tuples: + first element of the tuple is URL of the link and + second element of the tuple is type (link or image etc). + """ + from bs4 import BeautifulSoup + logging.debug("Getting item information...") + item_info = { + 'title': get_text(item.title), + 'pubDate': get_date(item.pubDate), + 'link': get_text(item.link) + } + links = [(item_info['link'], 'link')] + logging.debug('Looking for enclosures') + enclosures = [get_link(enclosure) for enclosure in item('enclosure')] + prefix = "".join(f'[image {n}]' for n, _ in enumerate(enclosures, start=len(links) + 1)) + links.extend(enclosures) + item_info['images'] = {url: recieve_image(url) for url, t in enclosures if t == 'image'} + logging.debug('Looking for medias') + medias = [get_link(media) for media in item('media:content')] + prefix += "".join(f'[image {n}]' for n, _ in enumerate(medias, start=len(links) + 1)) + links.extend(medias) + item_info['images'].update({url: recieve_image(url) for url, t in medias if t == 'image'}) + if item.description is not None: + logging.debug('Parsing item description') + item_info['description_raw'] = item.description.text + description = BeautifulSoup(item.description.text, 'lxml') + logging.debug('Replacing image references and links in description') + for tag in description(['img', 'a']): + if tag.name == 'img': + links.append((tag['src'], 'image')) + num = len(links) + tag.replace_with(f'[image {num}]') + 
item_info['images'][tag['src']] = recieve_image(tag['src']) + else: + links.append((tag['href'], 'link')) + num = len(links) + tag.append(f'[{num}]') + description = description.text + else: + description = '' + item_info['description'] = prefix + description + item_info['links'] = links + return item_info + + +def parse_feed(content): + """ + Parse content as channel acording to RSS 2.0 + """ + from bs4 import BeautifulSoup + from operator import itemgetter + try: + logging.debug('Extracitng channel information...') + feed = BeautifulSoup(content, 'lxml-xml').rss.channel + logging.debug('Examining metadata') + info = { + 'title': feed.find('title', recursive=False).text, + 'link': feed.find('link', recursive=False).text, + 'description': feed.find('description', recursive=False).text, + } + logging.debug('Getting items...') + info['items'] = sorted([parse_item(item) for item in feed('item')], + key=itemgetter('pubDate', 'title', 'description'), reverse=True) + return info + except Exception: + raise ValueError('Failed to parse feed') + + +def limit_feed(feed, limit): + """ + Limit number of items in feed. This function will + replace feed items list by its slice. + """ + feed['items'] = feed['items'][:limit] + + +def format_text(feed): + """ + Make a text representation of feed + """ + from io import StringIO + with StringIO() as fd: + print('Feed:', feed['title'], file=fd) + for item in feed['items']: + print(file=fd) + print('Title:', item['title'], file=fd) + print('Date:', item['pubDate'].strftime('%a, %d %b %Y %H:%M:%S %z'), file=fd) + print('Link:', item['link'], file=fd) + print(file=fd) + print(item['description'], file=fd) + print('\nLinks:', file=fd) + for num, (link, kind) in enumerate(item['links'], start=1): + print(f'[{num}]: {link} ({kind})', file=fd) + return fd.getvalue() + + +class DateTimeEncoder(json.JSONEncoder): + + """ + The DateTimeEncoder class provides marshalling of type datatime.datetime for JSON encoding with module json. 
+ """ + + def __init__(self, *, skipkeys=False, ensure_ascii=True, check_circular=True, + allow_nan=True, sort_keys=False, indent=None, + separators=None, default=None): + """ + The constructor just call parents constructor with the same parameters + """ + super().__init__(skipkeys=skipkeys, ensure_ascii=ensure_ascii, + check_circular=check_circular, allow_nan=allow_nan, + sort_keys=sort_keys, indent=indent, + separators=separators, default=default) + + def default(self, obj): + """ + Objects of type datatime.datetime will be converted to JSON as + string of format strftime('%a, %d %b %Y %H:%M:%S %z'). All other objects will be + converted in usual way. + """ + from datetime import datetime + if type(obj) == datetime: + return obj.strftime('%a, %d %b %Y %H:%M:%S %z') + else: + return super().default(obj) + + +def format_json(news): + """ + Represent feed in JSON format + """ + from json import dumps + from copy import deepcopy + noimg = deepcopy(news) + for item in noimg['items']: + del item['images'] + item['description'] = item['description_raw'] + del item['description_raw'] + return dumps(noimg, ensure_ascii=False, indent=1, cls=DateTimeEncoder) + + +def load_cache(): + """ + Loading all items from local cache + + Cache is represented as dictionary: + - keys are of sources + - values are news collections for the source with of items sorted ascending by pubDate + """ + from pickle import load + try: + with open('rss_reader.cache', 'rb') as f: + return load(f) + except (FileNotFoundError, EOFError): + # Cache is empty + return {} + + +def save_cache(cache): + """ + Save cache in locate storage + """ + from pickle import dump + with open('rss_reader.cache', 'wb') as f: + dump(cache, f) + + +def merge_items(alist, blist): + """ + Merge two lists of feed items. Eleminate duplicates. 
Items from blist has greater prececdnce + """ + from operator import itemgetter + key_getter = itemgetter('pubDate', 'title', 'link', 'description') + adict = {key_getter(a): a for a in alist} + bdict = {key_getter(b): b for b in blist} + adict.update(bdict) + return sorted(adict.values(), key=itemgetter('pubDate'), reverse=True) + + +def update_cache(cache, source, feed): + """ + Update cache with parsed feed + """ + from copy import deepcopy, copy + if source in cache: + items = merge_items(cache[source]['items'], feed['items']) + else: + items = copy(feed['items']) + cache[source] = deepcopy(feed) + cache[source]['items'] = items + + +def lookup_cache(cache, source, date): + """ + Looking for feed items in cache + """ + from itertools import takewhile + logging.debug(f'Looking for news not before {date=}') + assert source in cache, f'No news in cache for {source=}' + news = cache[source] + news['items'] = list(takewhile( + lambda item: item['pubDate'] >= date, + news['items'])) + return news + + +def receive_feed(source, cache): + """ + Request feed content from specified source + """ + logging.debug(f'Trying to get {source}') + content = request_feed(source) + logging.debug('Data is received') + news = parse_feed(content) + logging.debug('Feed is parsed') + try: + update_cache(cache, source, news) + logging.debug('Cache update') + save_cache(cache) + logging.debug('Cache stored') + except Exception as e: + logging.debug(e) + print('WARNING: Cache is disabled. 
No new items were stored.') + return news + + +def _download_font(): + """ + Download archive with fonts for FPDF and extract DejaVuSansCondensed.ttf + + If the file DejaVuSansCondensed.ttf is found in current directory + then no operation is performed + """ + from os.path import exists + from requests import get + from io import BytesIO + from zipfile import ZipFile + from shutil import copyfileobj + if exists('DejaVuSansCondensed.ttf'): + return + try: + r = get('https://github.com/reingart/pyfpdf/releases/download/binary/fpdf_unicode_font_pack.zip', stream=True) + with BytesIO(r.content) as b, ZipFile(b) as z, z.open('font/DejaVuSansCondensed.ttf') as f: + with open('DejaVuSansCondensed.ttf', 'wb') as d: + copyfileobj(f, d) + except Exception as e: + raise IOError(f'Can not get font: {e}') + + +class Formatters: + + """ + Set of interdependant formatters. One formatter may require result of other formatter + """ + + def __init__(self, feed): + """ + Initialize formatter. Feed is parsed representation of feed being formatted + """ + self.feed = feed + self.images = {url: image for item in self.feed['items'] for url, image in item['images'].items()} + + def _get_cached_image(self, url): + """ + Get image from feed as BytesIO + """ + from io import BytesIO + return BytesIO(self.images[url]) + + @cached_property + def to_html(self): + """ + Represent feed as HTML document + """ + from bs4 import BeautifulSoup as bsoup + html = bsoup('', 'html.parser') + new_tag = html.new_tag + head = new_tag('head') + html('html')[0].append(head) + head.append(new_tag('meta', charset='utf-8')) + title = new_tag('title') + title.append(self.feed['title']) + head.append(title) + body = new_tag('body') + html('html')[0].append(body) + tag = new_tag('h1') + body.append(tag) + tag.append(self.feed['title']) + tag = new_tag('div') + body.append(tag) + tag.append(self.feed['description']) + for item in self.feed['items']: + art_title = new_tag('h2') + body.append(art_title) + 
art_title.append(item['title']) + art_time = new_tag('p') + body.append(art_time) + art_time.append(item['pubDate'].strftime('%a, %d %b %Y %H:%M:%S %z')) + if 'description_raw' in item: + description = new_tag('div') + body.append(description) + descr_content = bsoup(item['description_raw'], 'lxml') + for img_tag in descr_content('img'): + if 'width' not in img_tag.attrs or 'height' not in img_tag.attrs: + img_tag['width'] = 160 + img_tag['height'] = 100 + try: + inner = descr_content.html.body.p.text + except Exception: + inner = None + if inner == item['description_raw']: + description.append(inner) + else: + description.append(descr_content) + link_list = new_tag('ol') + body.append(link_list) + for link, mt in item['links']: + links_item = new_tag('li') + link_list.append(links_item) + if mt == 'image': + link_tag = new_tag('img', src=link, width=160, height=100) + links_item.append(link_tag) + else: + link_tag = new_tag('a', href=link) + links_item.append(link_tag) + link_tag.append(mt) + return str(html).encode('utf-8') + + @cached_property + def to_pdf(self): + """ + Represent feed as PDF document. 
+ + Use weasyprint to convert from HTML representation + """ + + from fpdf import FPDF, HTMLMixin + + class PDF(FPDF, HTMLMixin): + pass + + html_str = self.to_html.decode('utf-8') + pdf = PDF() + pdf.set_title(self.feed['title']) + pdf.add_page() + _download_font() + pdf.add_font('DejaVu', fname='DejaVuSansCondensed.ttf') + pdf.set_font('DejaVu', size=14) + pdf.write_html(html_str, image_map=self._get_cached_image) + return pdf.output() + + +def main(): + """ + Preparation and execution organization + """ + try: + # parse arguments + args = parse_args() + # install required modules + install_modules() + # set logging level acording to --verbose flag + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO + ) + cache = load_cache() + logging.debug('Cache loaded') + if args.date is None: + news = receive_feed(args.source, cache) + else: + news = lookup_cache(cache, args.source, args.date) + assert news is not None, 'No news found' + limit_feed(news, args.limit) + logging.debug(f'{len(news["items"])} item(s) extracted') + text_required = args.html_dest is None and args.pdf_dest is None and not args.json + formatter = Formatters(news) + if args.html_dest: + try: + with open(args.html_dest, 'wb') as f: + html = formatter.to_html + f.write(html) + except Exception as e: + print(f'Faild to write html file: {e}') + if args.pdf_dest: + try: + with open(args.pdf_dest, 'wb') as f: + pdf = formatter.to_pdf + f.write(pdf) + except Exception as e: + print(f'Failed to write pdf file: {e}') + if args.json: + content = format_json + sys.stdout.write(content) + if text_required: + content = format_text(news) + logging.debug('Content formatted') + sys.stdout.write(content) + except AssertionError as failed: + print(failed) + except Exception as e: + print(e) + + +if __name__ == '__main__': + main() diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..6abc88bc --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +from setuptools import setup + +setup( 
+ name='rss_reader', + version='2.0', + py_modules=['rss_reader'], + install_requires=[ + 'beautifulsoup4==4.11.1', + 'bs4==0.0.1', + 'certifi==2022.6.15', + 'charset-normalizer==2.1.0', + 'defusedxml==0.7.1', + 'fpdf2==2.5.5', + 'idna==3.3', + 'lxml==4.9.0', + 'Pillow==9.1.1', + 'python-dateutil==2.8.2' + 'requests==2.28.1', + 'soupsieve==2.3.2.post1', + 'urllib3==1.26.9', + ], + entry_points={ + 'console_scripts': [ + 'rss_reader = rss_reader:main' + ] + } +) diff --git a/tests.py b/tests.py new file mode 100644 index 00000000..98faf90b --- /dev/null +++ b/tests.py @@ -0,0 +1,337 @@ +import unittest +import rss_reader +import sys +import datetime +import bs4 + + +class TestParseArgs(unittest.TestCase): + + def setUp(self): + self.fake_exit_status = None + self.orig_sys_exit = sys.exit + sys.exit = self._fake_sys_exit + + def _fake_sys_exit(self, status=0): + self.fake_exit_status = status + + def tearDown(self): + sys.exit = self.orig_sys_exit + + def test_empty(self): + rss_reader.parse_args([]) + self.assertNotEqual(self.fake_exit_status, None) + + def test_only_url(self): + args = rss_reader.parse_args(['mockurl']) + self.assertFalse(args.json) + self.assertFalse(args.verbose) + self.assertEqual(args.source, 'mockurl') + self.assertEqual(args.limit, None) + + def test_version(self): + rss_reader.parse_args(['--version']) + self.assertNotEqual(self.fake_exit_status, None) + + def test_json(self): + args = rss_reader.parse_args(['--json', 'mockurl2']) + self.assertTrue(args.json) + self.assertFalse(args.verbose) + self.assertEqual(args.source, 'mockurl2') + self.assertEqual(args.limit, None) + + def test_verbose(self): + args = rss_reader.parse_args(['--verbose', 'mockurl3']) + self.assertFalse(args.json) + self.assertTrue(args.verbose) + self.assertEqual(args.source, 'mockurl3') + self.assertEqual(args.limit, None) + + def test_limit(self): + args = rss_reader.parse_args(['--limit', '10', 'mockurl4']) + self.assertFalse(args.json) + 
self.assertFalse(args.verbose) + self.assertEqual(args.source, 'mockurl4') + self.assertEqual(args.limit, 10) + + +class TestParseFeed(unittest.TestCase): + + SAMPLE_2_0 = """ + + + Liftoff News + http://liftoff.msfc.nasa.gov/ + Liftoff to Space Exploration. + en-us + Tue, 10 Jun 2003 04:00:00 GMT + Tue, 10 Jun 2003 09:41:01 GMT + http://blogs.law.harvard.edu/tech/rss + Weblog Editor 2.0 + editor@example.com + webmaster@example.com + + Star City + http://liftoff.msfc.nasa.gov/news/2003/news-starcity.asp + <img src="https://example.com/images/logo.png">How do Americans get ready to work with Russians aboard the International Space Station? They take a crash course in culture, language and protocol at Russia's <a href="http://howe.iki.rssi.ru/GCTC/gctc_e.htm">Star City</a>. + Tue, 03 Jun 2003 09:39:21 GMT + http://liftoff.msfc.nasa.gov/2003/06/03.html#item573 + + + Sky watchers in Europe, Asia, and parts of Alaska and Canada will experience a <a href="http://science.nasa.gov/headlines/y2003/30may_solareclipse.htm">partial eclipse of the Sun</a> on Saturday, May 31st. + Fri, 30 May 2003 11:06:42 GMT + http://liftoff.msfc.nasa.gov/2003/05/30.html#item572 + + + The Engine That Does More + http://liftoff.msfc.nasa.gov/news/2003/news-VASIMR.asp + Before man travels to Mars, NASA hopes to design new engines that will let us fly through the Solar System more quickly. The proposed VASIMR engine would do that. + Tue, 27 May 2003 08:37:32 GMT + http://liftoff.msfc.nasa.gov/2003/05/27.html#item571 + + + Astronauts' Dirty Laundry + http://liftoff.msfc.nasa.gov/news/2003/news-laundry.asp + Compared to earlier spacecraft, the International Space Station has many luxuries, but laundry facilities are not one of them. Instead, astronauts have other options. 
+ Tue, 20 May 2003 08:56:02 GMT + http://liftoff.msfc.nasa.gov/2003/05/20.html#item570 + + +""" + + def test_sample20(self): + content = self.SAMPLE_2_0 + feed = rss_reader.parse_feed(content) + self.assertEqual(feed['title'], 'Liftoff News') + self.assertEqual(feed['link'], 'http://liftoff.msfc.nasa.gov/') + self.assertEqual(feed['description'], 'Liftoff to Space Exploration.') + self.assertEqual(len(feed['items']), 4) + self.assertEqual(feed['items'][0]['title'], 'Star City') + self.assertEqual(feed['items'][0]['link'], 'http://liftoff.msfc.nasa.gov/news/2003/news-starcity.asp') + self.assertEqual(feed['items'][0]['pubDate'], datetime.datetime(2003, 6, 3, 9, 39, 21, tzinfo=datetime.timezone.utc)) + self.assertEqual(feed['items'][0]['description'], '[image 2]How do Americans get ready to work with Russians aboard the International Space Station? They take a crash course in culture, language and protocol at Russia\'s Star City[3].') + self.assertEqual(len(feed['items'][0]['links']), 3) + self.assertEqual(feed['items'][0]['links'][0], ('http://liftoff.msfc.nasa.gov/news/2003/news-starcity.asp', 'link')) + self.assertEqual(feed['items'][0]['links'][1], ('https://example.com/images/logo.png', 'image')) + self.assertEqual(feed['items'][0]['links'][2], ('http://howe.iki.rssi.ru/GCTC/gctc_e.htm', 'link')) + + def test_illformed(self): + with self.assertRaises(ValueError): + content = "Invalid RSS" + rss_reader.parse_feed(content) + + def test_limit(self): + content = self.SAMPLE_2_0 + feed = rss_reader.parse_feed(content) + rss_reader.limit_feed(feed, 1) + self.assertEqual(len(feed['items']), 1) + + def test_overlimit(self): + content = self.SAMPLE_2_0 + feed = rss_reader.parse_feed(content) + rss_reader.limit_feed(feed, 5) + self.assertEqual(len(feed['items']), 4) + + def test_text(self): + content = self.SAMPLE_2_0 + feed = rss_reader.parse_feed(content) + rss_reader.limit_feed(feed, 1) + text = rss_reader.format_text(feed) + self.assertEqual(text, + "Feed: Liftoff 
News\n\nTitle: Star City\n" + "Date: Tue, 03 Jun 2003 09:39:21 +0000\n" + "Link: http://liftoff.msfc.nasa.gov/news/2003/news-starcity.asp\n\n" + "[image 2]How do Americans get ready to work with Russians aboard the International Space Station? They take a crash course in culture, language and protocol at Russia\'s Star City[3].\n\n" + "Links:\n[1]: http://liftoff.msfc.nasa.gov/news/2003/news-starcity.asp (link)\n" + "[2]: https://example.com/images/logo.png (image)\n" + "[3]: http://howe.iki.rssi.ru/GCTC/gctc_e.htm (link)\n") + + def test_json(self): + content = self.SAMPLE_2_0 + feed = rss_reader.parse_feed(content) + rss_reader.limit_feed(feed, 1) + json = rss_reader.format_json(feed) + self.maxDiff = None + self.assertEqual(json, + '{\n "title": "Liftoff News",\n "link": "http://liftoff.msfc.nasa.gov/",\n' + ' "description": "Liftoff to Space Exploration.",\n "items": [\n' + ' {\n "title": "Star City",\n "pubDate": "Tue, 03 Jun 2003 09:39:21 +0000",\n' + ' "link": "http://liftoff.msfc.nasa.gov/news/2003/news-starcity.asp",\n' + ' "description": "How do Americans get ready to work with Russians aboard the International Space Station? 
They take a crash course in culture, language and protocol at Russia\'s Star City.",\n' + ' "links": [\n' + ' [\n "http://liftoff.msfc.nasa.gov/news/2003/news-starcity.asp",\n "link"\n ],\n' + ' [\n "https://example.com/images/logo.png",\n "image"\n ],\n' + ' [\n "http://howe.iki.rssi.ru/GCTC/gctc_e.htm",\n "link"\n ]\n' + ' ]\n }\n ]\n}') + + +class TestAuxiliary(unittest.TestCase): + + def test_get_link(self): + elem = bs4.BeautifulSoup('', 'lxml-xml').enclosure + link = rss_reader.get_link(elem) + self.assertEqual(link, ('http://www.scripting.com/mp3s/weatherReportSuite.mp3', 'audio')) + + def test_get_text(self): + self.assertEqual(rss_reader.get_text(None), '') + + +class TextCache(unittest.TestCase): + + def test_merge_items(self): + self.maxDiff = None + alist = [ + { + 'title': 'A', + 'link': 'http://example.com/A', + 'description': 'Describe A', + 'pubDate': datetime.datetime.fromisoformat('2022-03-01T10:11:23+03:00') + }, + { + 'title': 'B', + 'link': 'http://example.com/B', + 'description': 'Describe B', + 'pubDate': datetime.datetime.fromisoformat('2022-02-02T10:11:23+03:00') + }, + ] + blist = [ + { + 'title': 'B', + 'link': 'http://example.com/B', + 'description': 'Describe B', + 'pubDate': datetime.datetime.fromisoformat('2022-02-02T10:11:23+03:00') + }, + { + 'title': 'C', + 'link': 'http://example.com/C', + 'description': 'Describe C', + 'pubDate': datetime.datetime.fromisoformat('2022-01-02T10:11:23+03:00') + }, + ] + rlist = [ + { + 'title': 'A', + 'link': 'http://example.com/A', + 'description': 'Describe A', + 'pubDate': datetime.datetime.fromisoformat('2022-03-01T10:11:23+03:00') + }, + { + 'title': 'B', + 'link': 'http://example.com/B', + 'description': 'Describe B', + 'pubDate': datetime.datetime.fromisoformat('2022-02-02T10:11:23+03:00') + }, + { + 'title': 'C', + 'link': 'http://example.com/C', + 'description': 'Describe C', + 'pubDate': datetime.datetime.fromisoformat('2022-01-02T10:11:23+03:00') + }, + ] + 
self.assertEqual(rss_reader.merge_items(alist, blist), rlist) + + def test_update_cache(self): + cache = { + 'http://example.com/feedA': { + 'title': 'feed A', + 'description': 'Describe A', + 'link': 'http://example.com/feedA', + 'items': [] + } + } + feed = { + 'title': 'feed B', + 'description': 'Describe B', + 'link': 'http://example.com/feedB', + 'items': [] + } + new_cache = { + 'http://example.com/feedA': { + 'title': 'feed A', + 'description': 'Describe A', + 'link': 'http://example.com/feedA', + 'items': [] + }, + 'http://example.com/feedB': { + 'title': 'feed B', + 'description': 'Describe B', + 'link': 'http://example.com/feedB', + 'items': [] + } + } + rss_reader.update_cache(cache, 'http://example.com/feedB', feed) + self.assertEqual(cache, new_cache) + + def test_lookup_cache(self): + cache = { + 'http://example.com/feedA': { + 'title': 'feed A', + 'description': 'Describe A', + 'link': 'http://example.com/feedA', + 'items': [] + }, + 'http://example.com/feedB': { + 'title': 'feed B', + 'description': 'Describe B', + 'link': 'http://example.com/feedB', + 'items': [] + } + } + self.assertEqual( + rss_reader.lookup_cache( + cache, 'http://example.com/feedA', + datetime.datetime.fromisoformat('2022-06-01T01:00+03:00')), + { + 'title': 'feed A', + 'description': 'Describe A', + 'link': 'http://example.com/feedA', + 'items': [] + } + ) + + +class TestFormatters(unittest.TestCase): + + def test_get_cached_image(self): + feed = { + 'title': 'Feed A', + 'description': 'Describe A', + 'link': 'http://example.com/feedA', + 'items': [ + { + 'title': 'A01', + 'link': 'http://example.com/feedA', + 'description': 'Describe A01', + 'pubDate': datetime.datetime.fromisoformat('2022-01-02T10:11:23+03:00'), + 'links': [('http://example.com/feedA', 'link'), ('http://example.com/image01.png', 'image')], + 'images': {'http://example.com/image01.png': b'IMAGEDATA'} + } + ] + } + formatter = rss_reader.Formatters(feed) + 
self.assertEqual(formatter._get_cached_image('http://example.com/image01.png').getvalue(), b'IMAGEDATA') + + def test_to_html(self): + feed = { + 'title': 'Feed A', + 'description': 'Describe A', + 'link': 'http://example.com/feedA', + 'items': [ + { + 'title': 'A01', + 'link': 'http://example.com/feedA', + 'description': 'Describe A01', + 'description_raw': 'Describe A01', + 'pubDate': datetime.datetime.fromisoformat('2022-01-02T10:11:23+03:00'), + 'links': [('http://example.com/feedA', 'link'), ('http://example.com/image01.png', 'image')], + 'images': {'http://example.com/image01.png': b'IMAGEDATA'} + } + ] + } + formatter = rss_reader.Formatters(feed) + self.assertEqual(formatter.to_html, + b'\nFeed A' + b'

Feed A

Describe A

A01

Sun, 02 Jan 2022 10:11:23 +0300

' + b'
Describe A01
  1. link
  2. ' + b'
' + b'') \ No newline at end of file