basic subscriptions system

This commit is contained in:
James Taylor
2019-02-16 23:41:52 -08:00
parent 24642455d0
commit 3905e7e640
57 changed files with 12440 additions and 23 deletions

12
python/atoma/__init__.py Normal file
View File

@@ -0,0 +1,12 @@
from .atom import parse_atom_file, parse_atom_bytes
from .rss import parse_rss_file, parse_rss_bytes
from .json_feed import (
parse_json_feed, parse_json_feed_file, parse_json_feed_bytes
)
from .opml import parse_opml_file, parse_opml_bytes
from .exceptions import (
FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError
)
from .const import VERSION
__version__ = VERSION

284
python/atoma/atom.py Normal file
View File

@@ -0,0 +1,284 @@
from datetime import datetime
import enum
from io import BytesIO
from typing import Optional, List
from xml.etree.ElementTree import Element
import attr
from .utils import (
parse_xml, get_child, get_text, get_datetime, FeedParseError, ns
)
class AtomTextType(enum.Enum):
    # Allowed values of the "type" attribute on Atom text constructs
    # (RFC 4287 section 3.1.1). Plain text is the default.
    text = "text"
    html = "html"
    xhtml = "xhtml"
@attr.s
class AtomTextConstruct:
    """Atom text construct (RFC 4287 section 3.1): typed, optionally
    language-tagged text content."""
    text_type: str = attr.ib()
    lang: Optional[str] = attr.ib()
    value: str = attr.ib()
@attr.s
class AtomEntry:
    """A single atom:entry of a feed (attribute order defines __init__)."""
    title: AtomTextConstruct = attr.ib()
    id_: str = attr.ib()
    # Should be mandatory but many feeds use published instead
    updated: Optional[datetime] = attr.ib()
    authors: List['AtomPerson'] = attr.ib()
    contributors: List['AtomPerson'] = attr.ib()
    links: List['AtomLink'] = attr.ib()
    categories: List['AtomCategory'] = attr.ib()
    published: Optional[datetime] = attr.ib()
    rights: Optional[AtomTextConstruct] = attr.ib()
    summary: Optional[AtomTextConstruct] = attr.ib()
    content: Optional[AtomTextConstruct] = attr.ib()
    # Feed this entry was copied from, when the entry carries a <source>.
    source: Optional['AtomFeed'] = attr.ib()
@attr.s
class AtomFeed:
    """Top-level Atom feed (attribute order defines __init__)."""
    title: Optional[AtomTextConstruct] = attr.ib()
    id_: str = attr.ib()
    # Should be mandatory but many feeds do not include it
    updated: Optional[datetime] = attr.ib()
    authors: List['AtomPerson'] = attr.ib()
    contributors: List['AtomPerson'] = attr.ib()
    links: List['AtomLink'] = attr.ib()
    categories: List['AtomCategory'] = attr.ib()
    generator: Optional['AtomGenerator'] = attr.ib()
    subtitle: Optional[AtomTextConstruct] = attr.ib()
    rights: Optional[AtomTextConstruct] = attr.ib()
    icon: Optional[str] = attr.ib()
    logo: Optional[str] = attr.ib()
    entries: List[AtomEntry] = attr.ib()
@attr.s
class AtomPerson:
    """Atom person construct (author/contributor); only name is mandatory."""
    name: str = attr.ib()
    uri: Optional[str] = attr.ib()
    email: Optional[str] = attr.ib()
@attr.s
class AtomLink:
    """An atom:link element; href is the only mandatory attribute."""
    href: str = attr.ib()
    rel: Optional[str] = attr.ib()
    type_: Optional[str] = attr.ib()
    hreflang: Optional[str] = attr.ib()
    title: Optional[str] = attr.ib()
    # Advisory length of the linked content, in octets.
    length: Optional[int] = attr.ib()
@attr.s
class AtomCategory:
    """An atom:category element; term is the only mandatory attribute."""
    term: str = attr.ib()
    scheme: Optional[str] = attr.ib()
    label: Optional[str] = attr.ib()
@attr.s
class AtomGenerator:
    """The atom:generator element identifying the producing software."""
    name: str = attr.ib()
    uri: Optional[str] = attr.ib()
    version: Optional[str] = attr.ib()
def _get_generator(element: Element, name,
                   optional: bool=True) -> Optional[AtomGenerator]:
    """Parse an atom:generator child element into an AtomGenerator.

    Returns None when the child is absent and *optional* is true.
    """
    child = get_child(element, name, optional)
    if child is None:
        return None
    return AtomGenerator(
        # Guard against a self-closing <generator/> whose .text is None,
        # which previously raised AttributeError on .strip().
        (child.text or '').strip(),
        child.attrib.get('uri'),
        child.attrib.get('version'),
    )
def _get_text_construct(element: Element, name,
                        optional: bool=True) -> Optional[AtomTextConstruct]:
    """Parse an Atom text construct (title, summary, rights, content...).

    Returns None when the child is absent (or present but empty) and
    *optional* is true; raises FeedParseError when mandatory text is empty.
    """
    child = get_child(element, name, optional)
    if child is None:
        return None
    try:
        text_type = AtomTextType(child.attrib['type'])
    except KeyError:
        # No "type" attribute: the spec default is plain text.
        text_type = AtomTextType.text
    # Bug fix: Element objects have no ".lang" attribute, so the previous
    # try/except AttributeError always produced None. The language of a
    # text construct is carried by the xml:lang attribute.
    lang = child.attrib.get('{http://www.w3.org/XML/1998/namespace}lang')
    if child.text is None:
        if optional:
            return None
        raise FeedParseError(
            'Could not parse atom feed: "{}" text is required but is empty'
            .format(name)
        )
    return AtomTextConstruct(
        text_type,
        lang,
        child.text.strip()
    )
def _get_person(element: Element) -> Optional[AtomPerson]:
    """Build an AtomPerson from an author/contributor element.

    Returns None instead of raising when the mandatory name is missing.
    """
    try:
        name = get_text(element, 'feed:name', optional=False)
    except FeedParseError:
        return None
    uri = get_text(element, 'feed:uri')
    email = get_text(element, 'feed:email')
    return AtomPerson(name, uri, email)
def _get_link(element: Element) -> AtomLink:
    """Parse an atom:link element; href is the only mandatory attribute."""
    try:
        length = int(element.attrib['length'])
    except (KeyError, ValueError):
        # Missing, empty or malformed length attribute — treat as unknown
        # (consistent with how rss._get_enclosure handles bad lengths;
        # previously a non-numeric length raised ValueError).
        length = None
    return AtomLink(
        element.attrib['href'],
        element.attrib.get('rel'),
        element.attrib.get('type'),
        element.attrib.get('hreflang'),
        element.attrib.get('title'),
        length
    )
def _get_category(element: Element) -> AtomCategory:
    """Parse an atom:category element (term is mandatory)."""
    attribs = element.attrib
    return AtomCategory(
        attribs['term'],
        attribs.get('scheme'),
        attribs.get('label'),
    )
def _get_entry(element: Element,
               default_authors: List[AtomPerson]) -> AtomEntry:
    """Parse an atom:entry element into an AtomEntry.

    default_authors: feed-level authors, used as a fallback when the
    entry names no authors of its own.
    """
    root = element
    # Mandatory
    title = _get_text_construct(root, 'feed:title')
    id_ = get_text(root, 'feed:id')
    # Optional
    # An entry may embed the feed it was copied from as <source>;
    # get_child raises when it is absent, which is the common case.
    try:
        source = _parse_atom(get_child(root, 'feed:source', optional=False),
                             parse_entries=False)
    except FeedParseError:
        source = None
        source_authors = []
    else:
        source_authors = source.authors
    # Author resolution order: the entry's own authors, then the feed's
    # default authors, then the source feed's authors.
    authors = [_get_person(e)
               for e in root.findall('feed:author', ns)] or default_authors
    authors = [a for a in authors if a is not None]
    authors = authors or default_authors or source_authors
    contributors = [_get_person(e)
                    for e in root.findall('feed:contributor', ns) if e]
    contributors = [c for c in contributors if c is not None]
    links = [_get_link(e) for e in root.findall('feed:link', ns)]
    categories = [_get_category(e) for e in root.findall('feed:category', ns)]
    updated = get_datetime(root, 'feed:updated')
    published = get_datetime(root, 'feed:published')
    rights = _get_text_construct(root, 'feed:rights')
    summary = _get_text_construct(root, 'feed:summary')
    content = _get_text_construct(root, 'feed:content')
    return AtomEntry(
        title,
        id_,
        updated,
        authors,
        contributors,
        links,
        categories,
        published,
        rights,
        summary,
        content,
        source
    )
def _parse_atom(root: Element, parse_entries: bool=True) -> AtomFeed:
    """Build an AtomFeed from the feed root element.

    parse_entries=False is used when parsing an entry's <source> feed,
    which must not recurse into that feed's entries.
    """
    # Mandatory
    id_ = get_text(root, 'feed:id', optional=False)
    # Optional
    title = _get_text_construct(root, 'feed:title')
    updated = get_datetime(root, 'feed:updated')
    authors = [_get_person(e)
               for e in root.findall('feed:author', ns) if e]
    authors = [a for a in authors if a is not None]
    contributors = [_get_person(e)
                    for e in root.findall('feed:contributor', ns) if e]
    contributors = [c for c in contributors if c is not None]
    links = [_get_link(e)
             for e in root.findall('feed:link', ns)]
    categories = [_get_category(e)
                  for e in root.findall('feed:category', ns)]
    generator = _get_generator(root, 'feed:generator')
    subtitle = _get_text_construct(root, 'feed:subtitle')
    rights = _get_text_construct(root, 'feed:rights')
    icon = get_text(root, 'feed:icon')
    logo = get_text(root, 'feed:logo')
    if parse_entries:
        # Feed-level authors become the default for authorless entries.
        entries = [_get_entry(e, authors)
                   for e in root.findall('feed:entry', ns)]
    else:
        entries = []
    atom_feed = AtomFeed(
        title,
        id_,
        updated,
        authors,
        contributors,
        links,
        categories,
        generator,
        subtitle,
        rights,
        icon,
        logo,
        entries
    )
    return atom_feed
def parse_atom_file(filename: str) -> AtomFeed:
    """Parse an Atom feed from a local XML file."""
    tree = parse_xml(filename)
    return _parse_atom(tree.getroot())
def parse_atom_bytes(data: bytes) -> AtomFeed:
    """Parse an Atom feed from a byte-string containing XML data."""
    tree = parse_xml(BytesIO(data))
    return _parse_atom(tree.getroot())

1
python/atoma/const.py Normal file
View File

@@ -0,0 +1 @@
# Version of the vendored atoma feed-parsing library.
VERSION = '0.0.13'

View File

@@ -0,0 +1,14 @@
class FeedParseError(Exception):
    """Document is an invalid feed."""


class FeedDocumentError(Exception):
    """Document is not a supported file."""


class FeedXMLError(FeedDocumentError):
    """Document is not valid XML."""


class FeedJSONError(FeedDocumentError):
    """Document is not valid JSON."""

223
python/atoma/json_feed.py Normal file
View File

@@ -0,0 +1,223 @@
from datetime import datetime, timedelta
import json
from typing import Optional, List
import attr
from .exceptions import FeedParseError, FeedJSONError
from .utils import try_parse_date
@attr.s
class JSONFeedAuthor:
    """Author object of a JSON feed or item; all fields are optional."""
    name: Optional[str] = attr.ib()
    url: Optional[str] = attr.ib()
    avatar: Optional[str] = attr.ib()
@attr.s
class JSONFeedAttachment:
    """An entry of an item's "attachments" array (podcast enclosure etc.)."""
    url: str = attr.ib()
    mime_type: str = attr.ib()
    title: Optional[str] = attr.ib()
    size_in_bytes: Optional[int] = attr.ib()
    duration: Optional[timedelta] = attr.ib()
@attr.s
class JSONFeedItem:
    """One entry of the feed's "items" array; only the id is mandatory."""
    id_: str = attr.ib()
    url: Optional[str] = attr.ib()
    external_url: Optional[str] = attr.ib()
    title: Optional[str] = attr.ib()
    content_html: Optional[str] = attr.ib()
    content_text: Optional[str] = attr.ib()
    summary: Optional[str] = attr.ib()
    image: Optional[str] = attr.ib()
    banner_image: Optional[str] = attr.ib()
    date_published: Optional[datetime] = attr.ib()
    date_modified: Optional[datetime] = attr.ib()
    author: Optional[JSONFeedAuthor] = attr.ib()
    tags: List[str] = attr.ib()
    attachments: List[JSONFeedAttachment] = attr.ib()
@attr.s
class JSONFeed:
    """Top-level JSON Feed document; version and title are mandatory."""
    version: str = attr.ib()
    title: str = attr.ib()
    home_page_url: Optional[str] = attr.ib()
    feed_url: Optional[str] = attr.ib()
    description: Optional[str] = attr.ib()
    user_comment: Optional[str] = attr.ib()
    next_url: Optional[str] = attr.ib()
    icon: Optional[str] = attr.ib()
    favicon: Optional[str] = attr.ib()
    author: Optional[JSONFeedAuthor] = attr.ib()
    expired: bool = attr.ib()
    items: List[JSONFeedItem] = attr.ib()
def _get_items(root: dict) -> List[JSONFeedItem]:
    """Parse the top-level "items" array; tolerates a missing or null value."""
    return [_get_item(entry) for entry in (root.get('items') or [])]
def _get_item(item_dict: dict) -> JSONFeedItem:
    """Convert one entry of the "items" array into a JSONFeedItem.

    Only "id" is mandatory; every other field becomes None (or an empty
    list) when absent.
    """
    return JSONFeedItem(
        id_=_get_text(item_dict, 'id', optional=False),
        url=_get_text(item_dict, 'url'),
        external_url=_get_text(item_dict, 'external_url'),
        title=_get_text(item_dict, 'title'),
        content_html=_get_text(item_dict, 'content_html'),
        content_text=_get_text(item_dict, 'content_text'),
        summary=_get_text(item_dict, 'summary'),
        image=_get_text(item_dict, 'image'),
        banner_image=_get_text(item_dict, 'banner_image'),
        date_published=_get_datetime(item_dict, 'date_published'),
        date_modified=_get_datetime(item_dict, 'date_modified'),
        author=_get_author(item_dict),
        tags=_get_tags(item_dict, 'tags'),
        attachments=_get_attachments(item_dict, 'attachments')
    )
def _get_attachments(root, name) -> List[JSONFeedAttachment]:
    """Parse an attachments array into JSONFeedAttachment objects."""
    return [
        JSONFeedAttachment(
            _get_text(entry, 'url', optional=False),
            _get_text(entry, 'mime_type', optional=False),
            _get_text(entry, 'title'),
            _get_int(entry, 'size_in_bytes'),
            _get_duration(entry, 'duration_in_seconds')
        )
        for entry in root.get(name, [])
    ]
def _get_tags(root, name) -> List[str]:
tags = root.get(name, [])
return [tag for tag in tags if isinstance(tag, str)]
def _get_datetime(root: dict, name, optional: bool=True) -> Optional[datetime]:
    """Read a string field and parse it leniently as a date; None if absent."""
    text = _get_text(root, name, optional)
    return None if text is None else try_parse_date(text)
def _get_expired(root: dict) -> bool:
if root.get('expired') is True:
return True
return False
def _get_author(root: dict) -> Optional[JSONFeedAuthor]:
    """Parse the optional author object; None when absent or entirely empty."""
    author_dict = root.get('author')
    if not author_dict:
        return None
    name = _get_text(author_dict, 'name')
    url = _get_text(author_dict, 'url')
    avatar = _get_text(author_dict, 'avatar')
    if name is None and url is None and avatar is None:
        # Some feeds ship an empty author object: treat it as no author.
        return None
    return JSONFeedAuthor(name=name, url=url, avatar=avatar)
def _get_int(root: dict, name: str, optional: bool=True) -> Optional[int]:
rv = root.get(name)
if not optional and rv is None:
raise FeedParseError('Could not parse feed: "{}" int is required but '
'is empty'.format(name))
if optional and rv is None:
return None
if not isinstance(rv, int):
raise FeedParseError('Could not parse feed: "{}" is not an int'
.format(name))
return rv
def _get_duration(root: dict, name: str,
                  optional: bool=True) -> Optional[timedelta]:
    """Read an integer number of seconds as a timedelta; None when absent."""
    seconds = _get_int(root, name, optional)
    return None if seconds is None else timedelta(seconds=seconds)
def _get_text(root: dict, name: str, optional: bool=True) -> Optional[str]:
rv = root.get(name)
if not optional and rv is None:
raise FeedParseError('Could not parse feed: "{}" text is required but '
'is empty'.format(name))
if optional and rv is None:
return None
if not isinstance(rv, str):
raise FeedParseError('Could not parse feed: "{}" is not a string'
.format(name))
return rv
def parse_json_feed(root: dict) -> JSONFeed:
    """Build a JSONFeed from an already-parsed JSON document (a dict).

    "version" and "title" are the only mandatory fields; a missing one
    raises FeedParseError.
    """
    return JSONFeed(
        version=_get_text(root, 'version', optional=False),
        title=_get_text(root, 'title', optional=False),
        home_page_url=_get_text(root, 'home_page_url'),
        feed_url=_get_text(root, 'feed_url'),
        description=_get_text(root, 'description'),
        user_comment=_get_text(root, 'user_comment'),
        next_url=_get_text(root, 'next_url'),
        icon=_get_text(root, 'icon'),
        favicon=_get_text(root, 'favicon'),
        author=_get_author(root),
        expired=_get_expired(root),
        items=_get_items(root)
    )
def parse_json_feed_file(filename: str) -> JSONFeed:
    """Parse a JSON feed from a local json file.

    Raises FeedJSONError when the file is not valid JSON.
    """
    with open(filename) as f:
        try:
            root = json.load(f)
        except json.JSONDecodeError as e:
            # Chain the decoder error so line/column diagnostics are kept.
            raise FeedJSONError('Not a valid JSON document') from e
    return parse_json_feed(root)
def parse_json_feed_bytes(data: bytes) -> JSONFeed:
    """Parse a JSON feed from a byte-string containing JSON data.

    Raises FeedJSONError when the data is not valid JSON.
    """
    try:
        root = json.loads(data)
    except json.JSONDecodeError as e:
        # Chain the decoder error so line/column diagnostics are kept.
        raise FeedJSONError('Not a valid JSON document') from e
    return parse_json_feed(root)

107
python/atoma/opml.py Normal file
View File

@@ -0,0 +1,107 @@
from datetime import datetime
from io import BytesIO
from typing import Optional, List
from xml.etree.ElementTree import Element
import attr
from .utils import parse_xml, get_text, get_int, get_datetime
@attr.s
class OPMLOutline:
    """One <outline> node; outlines nest arbitrarily deep via *outlines*."""
    text: Optional[str] = attr.ib()
    type: Optional[str] = attr.ib()
    xml_url: Optional[str] = attr.ib()
    description: Optional[str] = attr.ib()
    html_url: Optional[str] = attr.ib()
    language: Optional[str] = attr.ib()
    title: Optional[str] = attr.ib()
    version: Optional[str] = attr.ib()
    outlines: List['OPMLOutline'] = attr.ib()
@attr.s
class OPML:
    """An OPML document: <head> metadata plus the <body> outline tree."""
    title: Optional[str] = attr.ib()
    owner_name: Optional[str] = attr.ib()
    owner_email: Optional[str] = attr.ib()
    date_created: Optional[datetime] = attr.ib()
    date_modified: Optional[datetime] = attr.ib()
    expansion_state: Optional[str] = attr.ib()
    vertical_scroll_state: Optional[int] = attr.ib()
    window_top: Optional[int] = attr.ib()
    window_left: Optional[int] = attr.ib()
    window_bottom: Optional[int] = attr.ib()
    window_right: Optional[int] = attr.ib()
    outlines: List[OPMLOutline] = attr.ib()
def _get_outlines(element: Element) -> List[OPMLOutline]:
    """Recursively convert <outline> children into OPMLOutline objects."""
    outlines = []
    for node in element.findall('outline'):
        attrib = node.attrib
        outlines.append(OPMLOutline(
            attrib.get('text'),
            attrib.get('type'),
            attrib.get('xmlUrl'),
            attrib.get('description'),
            attrib.get('htmlUrl'),
            attrib.get('language'),
            attrib.get('title'),
            attrib.get('version'),
            _get_outlines(node)
        ))
    return outlines
def _parse_opml(root: Element) -> OPML:
    """Build an OPML object from the document root element.

    NOTE(review): assumes both <head> and <body> exist; a document missing
    either would make the helpers below fail — confirm against callers.
    """
    head = root.find('head')
    body = root.find('body')
    return OPML(
        get_text(head, 'title'),
        get_text(head, 'ownerName'),
        get_text(head, 'ownerEmail'),
        get_datetime(head, 'dateCreated'),
        get_datetime(head, 'dateModified'),
        get_text(head, 'expansionState'),
        get_int(head, 'vertScrollState'),
        get_int(head, 'windowTop'),
        get_int(head, 'windowLeft'),
        get_int(head, 'windowBottom'),
        get_int(head, 'windowRight'),
        outlines=_get_outlines(body)
    )
def parse_opml_file(filename: str) -> OPML:
    """Parse an OPML document from a local XML file."""
    tree = parse_xml(filename)
    return _parse_opml(tree.getroot())
def parse_opml_bytes(data: bytes) -> OPML:
    """Parse an OPML document from a byte-string containing XML data."""
    tree = parse_xml(BytesIO(data))
    return _parse_opml(tree.getroot())
def get_feed_list(opml_obj: OPML) -> List[str]:
    """Walk an OPML document to extract the list of feed it contains."""
    feeds = []
    # Iterative depth-first pre-order walk (same visit order as the
    # recursive formulation): push children reversed so the leftmost
    # outline is popped first.
    pending = list(reversed(opml_obj.outlines))
    while pending:
        outline = pending.pop()
        if outline.type == 'rss' and outline.xml_url:
            feeds.append(outline.xml_url)
        if outline.outlines:
            pending.extend(reversed(outline.outlines))
    return feeds

221
python/atoma/rss.py Normal file
View File

@@ -0,0 +1,221 @@
from datetime import datetime
from io import BytesIO
from typing import Optional, List
from xml.etree.ElementTree import Element
import attr
from .utils import (
parse_xml, get_child, get_text, get_int, get_datetime, FeedParseError
)
@attr.s
class RSSImage:
    """The channel's <image>; url and link are mandatory per RSS 2.0."""
    url: str = attr.ib()
    title: Optional[str] = attr.ib()
    link: str = attr.ib()
    width: int = attr.ib()
    height: int = attr.ib()
    description: Optional[str] = attr.ib()
@attr.s
class RSSEnclosure:
    """An item's <enclosure> (attached media); only url is mandatory."""
    url: str = attr.ib()
    length: Optional[int] = attr.ib()
    type: Optional[str] = attr.ib()
@attr.s
class RSSSource:
    """An item's <source>: the channel the item was republished from."""
    title: str = attr.ib()
    url: Optional[str] = attr.ib()
@attr.s
class RSSItem:
    """A single channel <item>; RSS 2.0 makes every field optional."""
    title: Optional[str] = attr.ib()
    link: Optional[str] = attr.ib()
    description: Optional[str] = attr.ib()
    author: Optional[str] = attr.ib()
    categories: List[str] = attr.ib()
    comments: Optional[str] = attr.ib()
    enclosures: List[RSSEnclosure] = attr.ib()
    guid: Optional[str] = attr.ib()
    pub_date: Optional[datetime] = attr.ib()
    source: Optional[RSSSource] = attr.ib()
    # Extension
    content_encoded: Optional[str] = attr.ib()
@attr.s
class RSSChannel:
    """The <channel> element of an RSS 2.0 document."""
    title: Optional[str] = attr.ib()
    link: Optional[str] = attr.ib()
    description: Optional[str] = attr.ib()
    language: Optional[str] = attr.ib()
    copyright: Optional[str] = attr.ib()
    managing_editor: Optional[str] = attr.ib()
    web_master: Optional[str] = attr.ib()
    pub_date: Optional[datetime] = attr.ib()
    last_build_date: Optional[datetime] = attr.ib()
    categories: List[str] = attr.ib()
    generator: Optional[str] = attr.ib()
    docs: Optional[str] = attr.ib()
    ttl: Optional[int] = attr.ib()
    image: Optional[RSSImage] = attr.ib()
    items: List[RSSItem] = attr.ib()
    # Extension
    content_encoded: Optional[str] = attr.ib()
def _get_image(element: Element, name,
               optional: bool=True) -> Optional[RSSImage]:
    """Parse a channel <image>; width/height default to the spec's 88x31."""
    child = get_child(element, name, optional)
    if child is None:
        return None
    width = get_int(child, 'width')
    height = get_int(child, 'height')
    return RSSImage(
        get_text(child, 'url', optional=False),
        get_text(child, 'title'),
        get_text(child, 'link', optional=False),
        width if width else 88,
        height if height else 31,
        get_text(child, 'description')
    )
def _get_source(element: Element, name,
                optional: bool=True) -> Optional[RSSSource]:
    """Parse an item's <source> element into an RSSSource."""
    child = get_child(element, name, optional)
    if child is None:
        return None
    return RSSSource(
        # Guard against a self-closing <source url="..."/> whose .text is
        # None, which previously raised AttributeError on .strip().
        (child.text or '').strip(),
        child.attrib.get('url'),
    )
def _get_enclosure(element: Element) -> RSSEnclosure:
    """Parse an <enclosure>; a missing or malformed length becomes None."""
    raw_length = element.attrib.get('length')
    try:
        length = int(raw_length)
    except (TypeError, ValueError):
        length = None
    return RSSEnclosure(
        element.attrib['url'],
        length,
        element.attrib.get('type'),
    )
def _get_link(element: Element) -> Optional[str]:
    """Attempt to retrieve item link.

    Use the GUID as a fallback if it is a permalink.
    """
    link = get_text(element, 'link')
    if link is None:
        guid = get_child(element, 'guid')
        if guid is not None and guid.attrib.get('isPermaLink') == 'true':
            link = get_text(element, 'guid')
    return link
def _get_item(element: Element) -> RSSItem:
    """Convert an <item> element into an RSSItem."""
    return RSSItem(
        title=get_text(element, 'title'),
        link=_get_link(element),
        description=get_text(element, 'description'),
        author=get_text(element, 'author'),
        categories=[e.text for e in element.findall('category')],
        comments=get_text(element, 'comments'),
        enclosures=[_get_enclosure(e) for e in element.findall('enclosure')],
        guid=get_text(element, 'guid'),
        pub_date=get_datetime(element, 'pubDate'),
        source=_get_source(element, 'source'),
        content_encoded=get_text(element, 'content:encoded'),
    )
def _parse_rss(root: Element) -> RSSChannel:
    """Build an RSSChannel from the <rss> document root.

    Only RSS version 2.0 is supported; anything else raises FeedParseError.
    """
    rss_version = root.get('version')
    if rss_version != '2.0':
        raise FeedParseError('Cannot process RSS feed version "{}"'
                             .format(rss_version))
    # All channel data lives under the single <channel> element.
    root = root.find('channel')
    title = get_text(root, 'title')
    link = get_text(root, 'link')
    description = get_text(root, 'description')
    language = get_text(root, 'language')
    copyright = get_text(root, 'copyright')
    managing_editor = get_text(root, 'managingEditor')
    web_master = get_text(root, 'webMaster')
    pub_date = get_datetime(root, 'pubDate')
    last_build_date = get_datetime(root, 'lastBuildDate')
    categories = [e.text for e in root.findall('category')]
    generator = get_text(root, 'generator')
    docs = get_text(root, 'docs')
    ttl = get_int(root, 'ttl')
    image = _get_image(root, 'image')
    items = [_get_item(e) for e in root.findall('item')]
    content_encoded = get_text(root, 'content:encoded')
    return RSSChannel(
        title,
        link,
        description,
        language,
        copyright,
        managing_editor,
        web_master,
        pub_date,
        last_build_date,
        categories,
        generator,
        docs,
        ttl,
        image,
        items,
        content_encoded
    )
def parse_rss_file(filename: str) -> RSSChannel:
    """Parse an RSS feed from a local XML file."""
    tree = parse_xml(filename)
    return _parse_rss(tree.getroot())
def parse_rss_bytes(data: bytes) -> RSSChannel:
    """Parse an RSS feed from a byte-string containing XML data."""
    tree = parse_xml(BytesIO(data))
    return _parse_rss(tree.getroot())

224
python/atoma/simple.py Normal file
View File

@@ -0,0 +1,224 @@
"""Simple API that abstracts away the differences between feed types."""
from datetime import datetime, timedelta
import html
import os
from typing import Optional, List, Tuple
import urllib.parse
import attr
from . import atom, rss, json_feed
from .exceptions import (
FeedParseError, FeedDocumentError, FeedXMLError, FeedJSONError
)
@attr.s
class Attachment:
    """Feed-type-agnostic attached media (enclosure)."""
    link: str = attr.ib()
    mime_type: Optional[str] = attr.ib()
    title: Optional[str] = attr.ib()
    size_in_bytes: Optional[int] = attr.ib()
    duration: Optional[timedelta] = attr.ib()
@attr.s
class Article:
    """Feed-type-agnostic article (Atom entry / RSS item / JSON feed item)."""
    id: str = attr.ib()
    title: Optional[str] = attr.ib()
    link: Optional[str] = attr.ib()
    content: str = attr.ib()
    published_at: Optional[datetime] = attr.ib()
    updated_at: Optional[datetime] = attr.ib()
    attachments: List[Attachment] = attr.ib()
@attr.s
class Feed:
    """Feed-type-agnostic feed returned by the simple_parse_* functions."""
    title: str = attr.ib()
    subtitle: Optional[str] = attr.ib()
    link: Optional[str] = attr.ib()
    updated_at: Optional[datetime] = attr.ib()
    articles: List[Article] = attr.ib()
def _adapt_atom_feed(atom_feed: atom.AtomFeed) -> Feed:
    """Convert a parsed AtomFeed into the simplified Feed model."""
    articles = list()
    for entry in atom_feed.entries:
        # Prefer full content, fall back to the summary, then empty.
        if entry.content is not None:
            content = entry.content.value
        elif entry.summary is not None:
            content = entry.summary.value
        else:
            content = ''
        published_at, updated_at = _get_article_dates(entry.published,
                                                      entry.updated)
        # Find article link and attachments
        article_link = None
        attachments = list()
        for candidate_link in entry.links:
            # rel is optional; an absent rel means "alternate" per RFC 4287.
            if candidate_link.rel in ('alternate', None):
                article_link = candidate_link.href
            elif candidate_link.rel == 'enclosure':
                attachments.append(Attachment(
                    title=_get_attachment_title(candidate_link.title,
                                                candidate_link.href),
                    link=candidate_link.href,
                    mime_type=candidate_link.type_,
                    size_in_bytes=candidate_link.length,
                    duration=None
                ))
        # HTML titles are unescaped so the simple model carries plain text.
        if entry.title is None:
            entry_title = None
        elif entry.title.text_type in (atom.AtomTextType.html,
                                       atom.AtomTextType.xhtml):
            entry_title = html.unescape(entry.title.value).strip()
        else:
            entry_title = entry.title.value
        articles.append(Article(
            entry.id_,
            entry_title,
            article_link,
            content,
            published_at,
            updated_at,
            attachments
        ))
    # Find feed link
    link = None
    for candidate_link in atom_feed.links:
        if candidate_link.rel == 'self':
            link = candidate_link.href
            break
    return Feed(
        atom_feed.title.value if atom_feed.title else atom_feed.id_,
        atom_feed.subtitle.value if atom_feed.subtitle else None,
        link,
        atom_feed.updated,
        articles
    )
def _adapt_rss_channel(rss_channel: rss.RSSChannel) -> Feed:
    """Convert a parsed RSSChannel into the simplified Feed model."""
    # A channel with neither title nor link cannot be represented; the
    # article loop below is side-effect free, so validating first is safe.
    if rss_channel.title is None and rss_channel.link is None:
        raise FeedParseError('RSS feed does not have a title nor a link')
    articles = []
    for item in rss_channel.items:
        attachments = [
            Attachment(link=enclosure.url,
                       mime_type=enclosure.type,
                       size_in_bytes=enclosure.length,
                       title=_get_attachment_title(None, enclosure.url),
                       duration=None)
            for enclosure in item.enclosures
        ]
        articles.append(Article(
            item.guid or item.link,
            item.title,
            item.link,
            item.content_encoded or item.description or '',
            item.pub_date,
            None,
            attachments
        ))
    return Feed(
        rss_channel.title if rss_channel.title else rss_channel.link,
        rss_channel.description,
        rss_channel.link,
        rss_channel.pub_date,
        articles
    )
def _adapt_json_feed(json_feed: json_feed.JSONFeed) -> Feed:
    """Convert a parsed JSONFeed into the simplified Feed model.

    NOTE(review): the parameter name shadows the imported json_feed
    module; the annotation still resolves because defaults/annotations
    are evaluated before the parameter binds.
    """
    articles = list()
    for item in json_feed.items:
        attachments = [
            Attachment(a.url, a.mime_type,
                       _get_attachment_title(a.title, a.url),
                       a.size_in_bytes, a.duration)
            for a in item.attachments
        ]
        articles.append(Article(
            item.id_,
            item.title,
            item.url,
            # Prefer HTML content, fall back to plain text, then empty.
            item.content_html or item.content_text or '',
            item.date_published,
            item.date_modified,
            attachments
        ))
    return Feed(
        json_feed.title,
        json_feed.description,
        json_feed.feed_url,
        None,
        articles
    )
def _get_article_dates(published_at: Optional[datetime],
updated_at: Optional[datetime]
) -> Tuple[Optional[datetime], Optional[datetime]]:
if published_at and updated_at:
return published_at, updated_at
if updated_at:
return updated_at, None
if published_at:
return published_at, None
raise FeedParseError('Article does not have proper dates')
def _get_attachment_title(attachment_title: Optional[str], link: str) -> str:
if attachment_title:
return attachment_title
parsed_link = urllib.parse.urlparse(link)
return os.path.basename(parsed_link.path)
def _simple_parse(pairs, content) -> Feed:
    """Try each (parser, adapter) pair in order until one yields a Feed.

    pairs: sequence of (parser, adapter) callables.
    content: filename or bytes — whatever the parsers in *pairs* accept.

    Raises FeedDocumentError when the content is neither XML nor JSON,
    FeedParseError when it is well-formed but not a valid feed.
    """
    is_xml = True
    is_json = True
    for parser, adapter in pairs:
        try:
            return adapter(parser(content))
        except FeedXMLError:
            # Not well-formed XML: no XML-based parser can succeed.
            is_xml = False
        except FeedJSONError:
            # Not well-formed JSON: the JSON parser cannot succeed.
            is_json = False
        except FeedParseError:
            # Well-formed document of this type, but not a valid feed of
            # this kind — keep trying the remaining parsers.
            continue
    if not is_xml and not is_json:
        raise FeedDocumentError('File is not a supported feed type')
    raise FeedParseError('File is not a valid supported feed')
def simple_parse_file(filename: str) -> Feed:
    """Parse an Atom, RSS or JSON feed from a local file."""
    parser_adapters = (
        (rss.parse_rss_file, _adapt_rss_channel),
        (atom.parse_atom_file, _adapt_atom_feed),
        (json_feed.parse_json_feed_file, _adapt_json_feed)
    )
    return _simple_parse(parser_adapters, filename)
def simple_parse_bytes(data: bytes) -> Feed:
    """Parse an Atom, RSS or JSON feed from a byte-string containing data."""
    parser_adapters = (
        (rss.parse_rss_bytes, _adapt_rss_channel),
        (atom.parse_atom_bytes, _adapt_atom_feed),
        (json_feed.parse_json_feed_bytes, _adapt_json_feed)
    )
    return _simple_parse(parser_adapters, data)

84
python/atoma/utils.py Normal file
View File

@@ -0,0 +1,84 @@
from datetime import datetime, timezone
from xml.etree.ElementTree import Element
from typing import Optional
import dateutil.parser
from defusedxml.ElementTree import parse as defused_xml_parse, ParseError
from .exceptions import FeedXMLError, FeedParseError
# XML namespace map shared by every namespaced find()/findall() lookup
# in this package ("feed" = Atom, "content" = the RSS content extension).
ns = {
    'content': 'http://purl.org/rss/1.0/modules/content/',
    'feed': 'http://www.w3.org/2005/Atom'
}
def parse_xml(xml_content):
    """Parse XML via defusedxml (safe against entity-expansion attacks).

    Raises FeedXMLError for malformed documents.
    """
    try:
        return defused_xml_parse(xml_content)
    except ParseError as e:
        # Chain the parse error so line/column diagnostics are preserved.
        raise FeedXMLError('Not a valid XML document') from e
def get_child(element: Element, name,
              optional: bool=True) -> Optional[Element]:
    """Find a (possibly namespaced) child element.

    Returns None when absent and *optional* is true, otherwise raises
    FeedParseError.
    """
    child = element.find(name, namespaces=ns)
    if child is not None:
        return child
    if optional:
        return None
    raise FeedParseError(
        'Could not parse feed: "{}" does not have a "{}"'
        .format(element.tag, name)
    )
def get_text(element: Element, name, optional: bool=True) -> Optional[str]:
    """Return the stripped text of a child element.

    Returns None when the child is absent or empty and *optional* is
    true; raises FeedParseError when mandatory text is empty.
    """
    child = get_child(element, name, optional)
    if child is None:
        return None
    text = child.text
    if text is None:
        if not optional:
            raise FeedParseError(
                'Could not parse feed: "{}" text is required but is empty'
                .format(name)
            )
        return None
    return text.strip()
def get_int(element: Element, name, optional: bool=True) -> Optional[int]:
    """Return a child element's text parsed as an int, or None when absent."""
    text = get_text(element, name, optional)
    return None if text is None else int(text)
def get_datetime(element: Element, name,
                 optional: bool=True) -> Optional[datetime]:
    """Return a child element's text parsed as a date, or None when absent."""
    text = get_text(element, name, optional)
    return None if text is None else try_parse_date(text)
def try_parse_date(date_str: str) -> Optional[datetime]:
    """Parse a date string leniently; returns None instead of raising.

    Every returned datetime is timezone-aware: naive values are assumed
    to denote UTC.
    """
    try:
        parsed = dateutil.parser.parse(date_str, fuzzy=True)
    except (ValueError, OverflowError):
        return None
    if parsed.tzinfo is None:
        # TZ naive datetime, make it a TZ aware datetime by assuming it
        # contains UTC time
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed