summaryrefslogtreecommitdiff
path: root/notification/atomformat.py
diff options
context:
space:
mode:
Diffstat (limited to 'notification/atomformat.py')
-rw-r--r--notification/atomformat.py547
1 files changed, 547 insertions, 0 deletions
diff --git a/notification/atomformat.py b/notification/atomformat.py
new file mode 100644
index 0000000..a307edd
--- /dev/null
+++ b/notification/atomformat.py
@@ -0,0 +1,547 @@
+#
+# django-atompub by James Tauber <http://jtauber.com/>
+# http://code.google.com/p/django-atompub/
+# An implementation of the Atom format and protocol for Django
+#
+# For instructions on how to use this module to generate Atom feeds,
+# see http://code.google.com/p/django-atompub/wiki/UserGuide
+#
+#
+# Copyright (c) 2007, James Tauber
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+
+from xml.sax.saxutils import XMLGenerator
+from datetime import datetime
+
+
# Text content of the feed-level <generator> element.
GENERATOR_TEXT = 'django-atompub'

# Attributes written on that <generator> element.
GENERATOR_ATTR = dict(
    uri='http://code.google.com/p/django-atompub/',
    version='r33',
)
+
+
+
+## based on django.utils.xmlutils.SimplerXMLGenerator
class SimplerXMLGenerator(XMLGenerator):
    """XMLGenerator with a shortcut for writing elements that have no children."""

    def addQuickElement(self, name, contents=None, attrs=None):
        """
        Convenience method for adding an element with no children.

        Writes the start tag for ``name`` (using ``attrs``, or no attributes
        when ``attrs`` is None), then ``contents`` as character data unless
        it is None, then the matching end tag.
        """
        attributes = {} if attrs is None else attrs
        self.startElement(name, attributes)
        has_text = contents is not None
        if has_text:
            self.characters(contents)
        self.endElement(name)
+
+
+
+## based on django.utils.feedgenerator.rfc3339_date
def rfc3339_date(date):
    """
    Format ``date`` as an RFC 3339 timestamp with a "Z" (UTC) suffix.

    Timezone-aware datetimes with a non-zero offset are shifted to UTC first
    so that the "Z" designator is truthful. Naive values, and objects that
    have no ``utcoffset`` method at all, are formatted unchanged, i.e. they
    are taken to already be expressed in UTC.
    """
    get_offset = getattr(date, 'utcoffset', None)
    offset = get_offset() if get_offset is not None else None
    if offset:
        # Subtracting the offset moves the wall-clock fields onto UTC.
        date = date - offset
    return date.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+
+
+## based on django.utils.feedgenerator.get_tag_uri
def get_tag_uri(url, date):
    """
    Creates a TagURI. See http://diveintomark.org/archives/2004/05/28/howto-atom-id

    ``url`` is split into its hostname, path and fragment. When ``date`` is
    not None it must provide ``strftime``; it contributes a ",YYYY-MM-DD:"
    part directly after the hostname. The result has the shape
    ``tag:<hostname><date part><path>/<fragment>``.
    """
    # Imported locally because the module never imported urlparse at file
    # level; the fallback keeps this working on both Python 3 and Python 2.
    try:
        from urllib.parse import urlparse
    except ImportError:
        from urlparse import urlparse

    parts = urlparse(url)
    date_part = ""
    if date is not None:
        date_part = ",%s:" % date.strftime("%Y-%m-%d")
    return "tag:%s%s%s/%s" % (
        parts.hostname,
        date_part,
        parts.path,
        parts.fragment,
    )
+
+
+
+## based on django.contrib.syndication.feeds.Feed
class Feed(object):
    """
    Declarative description of an Atom feed.

    Subclasses provide ``feed_*`` and ``item_*`` attributes or methods, an
    ``items`` attribute or method and, for feeds that take URL parameters, a
    ``get_object`` method. ``get_feed`` collects them into an AtomFeed.

    Each ``feed_*`` / ``item_*`` name may be a plain value, a method taking
    no argument besides ``self``, or a method taking one extra argument (the
    feed object or the current item respectively).
    """

    # When true, get_feed() calls validate() on the feed before returning it.
    VALIDATE = True

    def __init__(self, slug, feed_url):
        """Accept ``slug`` and ``feed_url``; neither is used yet."""
        # @@@ slug and feed_url are not used yet
        pass

    def __get_dynamic_attr(self, attname, obj, default=None):
        """
        Resolve ``attname`` on this feed.

        Returns ``default`` when the attribute does not exist, the attribute
        itself when it is not callable, and otherwise the result of calling
        it -- with ``obj`` if its code object declares exactly two arguments
        (``self`` plus one), with no argument in every other case.

        Raises AttributeError for a callable that exposes no code object
        either directly or through its ``__call__``.
        """
        try:
            attr = getattr(self, attname)
        except AttributeError:
            return default
        if not callable(attr):
            return attr

        # Check the code object's co_argcount rather than try/excepting the
        # function and catching the TypeError, because something inside
        # the function may raise the TypeError. This technique is more
        # accurate.
        code = None
        for candidate in (attr, attr.__call__):
            # __code__ is the current spelling; func_code is the old
            # Python 2 one, kept as a fallback.
            code = getattr(candidate, '__code__', None) or getattr(candidate, 'func_code', None)
            if code is not None:
                break
        if code is None:
            raise AttributeError('cannot inspect the arguments of %r' % (attr,))

        if code.co_argcount == 2:  # one argument is 'self'
            return attr(obj)
        return attr()

    def get_feed(self, extra_params=None):
        """
        Build the AtomFeed described by this object.

        ``extra_params`` is an optional '/'-separated string; when given, its
        pieces are handed to ``self.get_object`` and the result is passed to
        the ``feed_*`` callables that accept an argument.

        Raises LookupError when ``get_object`` is missing or fails with a
        LookupError, and when there is no ``items`` attribute. With
        ``VALIDATE`` set, the feed's ``validate()`` is called before the feed
        is returned.
        """
        if extra_params:
            try:
                obj = self.get_object(extra_params.split('/'))
            except (AttributeError, LookupError):
                raise LookupError('Feed does not exist')
        else:
            obj = None

        feed = AtomFeed(
            atom_id=self.__get_dynamic_attr('feed_id', obj),
            title=self.__get_dynamic_attr('feed_title', obj),
            updated=self.__get_dynamic_attr('feed_updated', obj),
            icon=self.__get_dynamic_attr('feed_icon', obj),
            logo=self.__get_dynamic_attr('feed_logo', obj),
            rights=self.__get_dynamic_attr('feed_rights', obj),
            subtitle=self.__get_dynamic_attr('feed_subtitle', obj),
            authors=self.__get_dynamic_attr('feed_authors', obj, default=[]),
            categories=self.__get_dynamic_attr('feed_categories', obj, default=[]),
            contributors=self.__get_dynamic_attr('feed_contributors', obj, default=[]),
            links=self.__get_dynamic_attr('feed_links', obj, default=[]),
            extra_attrs=self.__get_dynamic_attr('feed_extra_attrs', obj),
            hide_generator=self.__get_dynamic_attr('hide_generator', obj, default=False),
        )

        items = self.__get_dynamic_attr('items', obj)
        if items is None:
            raise LookupError('Feed has no items field')

        for item in items:
            feed.add_item(
                atom_id=self.__get_dynamic_attr('item_id', item),
                title=self.__get_dynamic_attr('item_title', item),
                updated=self.__get_dynamic_attr('item_updated', item),
                content=self.__get_dynamic_attr('item_content', item),
                published=self.__get_dynamic_attr('item_published', item),
                rights=self.__get_dynamic_attr('item_rights', item),
                source=self.__get_dynamic_attr('item_source', item),
                summary=self.__get_dynamic_attr('item_summary', item),
                authors=self.__get_dynamic_attr('item_authors', item, default=[]),
                categories=self.__get_dynamic_attr('item_categories', item, default=[]),
                contributors=self.__get_dynamic_attr('item_contributors', item, default=[]),
                links=self.__get_dynamic_attr('item_links', item, default=[]),
                # Resolved against the item like every other item_* hook, so
                # a one-argument item_extra_attrs receives the entry.
                extra_attrs=self.__get_dynamic_attr('item_extra_attrs', item, default={}),
            )

        if self.VALIDATE:
            feed.validate()
        return feed
+
+
+
class ValidationError(Exception):
    """Raised by AtomFeed.validate() when a feed fails one of its checks."""
+
+
+
+## based on django.utils.feedgenerator.SyndicationFeed and django.utils.feedgenerator.Atom1Feed
class AtomFeed(object):
    """
    In-memory Atom feed that can validate itself and write itself as XML.

    Feed-level metadata is kept in the ``feed`` dict and entries in the
    ``items`` list of dicts. Text constructs (title, subtitle, rights,
    summary) are either a plain string or a ``(type, text)`` tuple; content
    is either a plain string or an ``(attribute dict, text)`` tuple. Persons,
    links and categories are dicts.
    """

    mime_type = 'application/atom+xml'
    ns = u'http://www.w3.org/2005/Atom'

    def __init__(self, atom_id, title, updated=None, icon=None, logo=None, rights=None, subtitle=None,
            authors=None, categories=None, contributors=None, links=None, extra_attrs=None, hide_generator=False):
        """
        Store the feed-level metadata and start with no entries.

        ``atom_id`` and ``title`` are mandatory; LookupError is raised when
        either is None. ``updated`` may be None, in which case ``write``
        derives it from ``latest_updated()``. Collection arguments left as
        None become fresh empty containers, so separate feeds never share
        state; containers that are passed in are stored as they are.
        """
        if atom_id is None:
            raise LookupError('Feed has no feed_id field')
        if title is None:
            raise LookupError('Feed has no feed_title field')
        # if updated == None, we'll calculate it
        self.feed = {
            'id': atom_id,
            'title': title,
            'updated': updated,
            'icon': icon,
            'logo': logo,
            'rights': rights,
            'subtitle': subtitle,
            'authors': [] if authors is None else authors,
            'categories': [] if categories is None else categories,
            'contributors': [] if contributors is None else contributors,
            'links': [] if links is None else links,
            'extra_attrs': {} if extra_attrs is None else extra_attrs,
            'hide_generator': hide_generator,
        }
        self.items = []

    def add_item(self, atom_id, title, updated, content=None, published=None, rights=None, source=None, summary=None,
            authors=None, categories=None, contributors=None, links=None, extra_attrs=None):
        """
        Append one entry to the feed.

        ``atom_id``, ``title`` and ``updated`` are mandatory; LookupError is
        raised when any of them is None. Collection arguments left as None
        become fresh empty containers for this entry.
        """
        if atom_id is None:
            raise LookupError('Feed has no item_id method')
        if title is None:
            raise LookupError('Feed has no item_title method')
        if updated is None:
            raise LookupError('Feed has no item_updated method')
        self.items.append({
            'id': atom_id,
            'title': title,
            'updated': updated,
            'content': content,
            'published': published,
            'rights': rights,
            'source': source,
            'summary': summary,
            'authors': [] if authors is None else authors,
            'categories': [] if categories is None else categories,
            'contributors': [] if contributors is None else contributors,
            'links': [] if links is None else links,
            'extra_attrs': {} if extra_attrs is None else extra_attrs,
        })

    def latest_updated(self):
        """
        Returns the latest item's updated or the current time if there are no items.

        The fallback is the current time in UTC (as a naive datetime with
        whole seconds), because the value is serialised with a "Z" suffix.
        """
        updates = [item['updated'] for item in self.items]
        if updates:
            return max(updates)
        # @@@ really we should allow a feed to define its "start" for this case
        import time
        return datetime(*time.gmtime()[:6])

    def write_text_construct(self, handler, element_name, data):
        """
        Write a text construct as the element ``element_name``.

        ``data`` is either the text itself or a ``(type, text)`` tuple. For
        type 'xhtml' the text is written unescaped; every other type is
        written as escaped character data with a ``type`` attribute.
        """
        if isinstance(data, tuple):
            text_type, text = data
            if text_type == 'xhtml':
                handler.startElement(element_name, {'type': text_type})
                handler._write(text)  # write unescaped -- it had better be well-formed XML
                handler.endElement(element_name)
            else:
                handler.addQuickElement(element_name, text, {'type': text_type})
        else:
            handler.addQuickElement(element_name, data)

    def write_person_construct(self, handler, element_name, person):
        """
        Write the ``person`` dict as ``element_name``.

        'name' is required; 'uri' and 'email' are written when the keys are
        present.
        """
        handler.startElement(element_name, {})
        handler.addQuickElement(u'name', person['name'])
        if 'uri' in person:
            handler.addQuickElement(u'uri', person['uri'])
        if 'email' in person:
            handler.addQuickElement(u'email', person['email'])
        handler.endElement(element_name)

    def write_link_construct(self, handler, link):
        """
        Write ``link`` as an empty <link> element whose attributes are the
        dict's items. A 'length' value is converted to a string on a copy, so
        the caller's dict is left untouched.
        """
        attrs = dict(link)
        if 'length' in attrs:
            attrs['length'] = str(attrs['length'])
        handler.addQuickElement(u'link', None, attrs)

    def write_category_construct(self, handler, category):
        """Write ``category`` as an empty <category> element with the dict as attributes."""
        handler.addQuickElement(u'category', None, category)

    def write_source(self, handler, data):
        """
        Write an entry's <source> element from the ``data`` dict.

        Only the keys that are present with a true value are written.
        """
        handler.startElement(u'source', {})
        if data.get('id'):
            handler.addQuickElement(u'id', data['id'])
        if data.get('title'):
            self.write_text_construct(handler, u'title', data['title'])
        if data.get('subtitle'):
            self.write_text_construct(handler, u'subtitle', data['subtitle'])
        if data.get('icon'):
            handler.addQuickElement(u'icon', data['icon'])
        if data.get('logo'):
            handler.addQuickElement(u'logo', data['logo'])
        if data.get('updated'):
            handler.addQuickElement(u'updated', rfc3339_date(data['updated']))
        for category in data.get('categories', []):
            self.write_category_construct(handler, category)
        for link in data.get('links', []):
            self.write_link_construct(handler, link)
        for author in data.get('authors', []):
            self.write_person_construct(handler, u'author', author)
        for contributor in data.get('contributors', []):
            self.write_person_construct(handler, u'contributor', contributor)
        if data.get('rights'):
            self.write_text_construct(handler, u'rights', data['rights'])
        handler.endElement(u'source')

    def write_content(self, handler, data):
        """
        Write an entry's <content> element.

        ``data`` is either the text itself or an ``(attribute dict, text)``
        tuple. When the dict's 'type' is 'xhtml' the text is written
        unescaped; otherwise it is written as escaped character data.
        """
        if isinstance(data, tuple):
            content_dict, text = data
            if content_dict.get('type') == 'xhtml':
                handler.startElement(u'content', content_dict)
                handler._write(text)  # write unescaped -- it had better be well-formed XML
                handler.endElement(u'content')
            else:
                handler.addQuickElement(u'content', text, content_dict)
        else:
            handler.addQuickElement(u'content', data)

    def write(self, outfile, encoding):
        """
        Serialise the whole feed to ``outfile`` using ``encoding``.

        The feed's ``updated`` value is used when set; otherwise
        ``latest_updated()`` supplies it. The <generator> element is written
        unless ``hide_generator`` is true.
        """
        handler = SimplerXMLGenerator(outfile, encoding)
        handler.startDocument()
        feed_attrs = {u'xmlns': self.ns}
        if self.feed.get('extra_attrs'):
            feed_attrs.update(self.feed['extra_attrs'])
        handler.startElement(u'feed', feed_attrs)
        handler.addQuickElement(u'id', self.feed['id'])
        self.write_text_construct(handler, u'title', self.feed['title'])
        if self.feed.get('subtitle'):
            self.write_text_construct(handler, u'subtitle', self.feed['subtitle'])
        if self.feed.get('icon'):
            handler.addQuickElement(u'icon', self.feed['icon'])
        if self.feed.get('logo'):
            handler.addQuickElement(u'logo', self.feed['logo'])
        if self.feed['updated']:
            handler.addQuickElement(u'updated', rfc3339_date(self.feed['updated']))
        else:
            handler.addQuickElement(u'updated', rfc3339_date(self.latest_updated()))
        for category in self.feed['categories']:
            self.write_category_construct(handler, category)
        for link in self.feed['links']:
            self.write_link_construct(handler, link)
        for author in self.feed['authors']:
            self.write_person_construct(handler, u'author', author)
        for contributor in self.feed['contributors']:
            self.write_person_construct(handler, u'contributor', contributor)
        if self.feed.get('rights'):
            self.write_text_construct(handler, u'rights', self.feed['rights'])
        if not self.feed.get('hide_generator'):
            handler.addQuickElement(u'generator', GENERATOR_TEXT, GENERATOR_ATTR)

        self.write_items(handler)

        handler.endElement(u'feed')
        # Pairs with startDocument() above.
        handler.endDocument()

    def write_items(self, handler):
        """Write one <entry> element per stored item to ``handler``."""
        for item in self.items:
            # An item stored with extra_attrs=None still needs a mapping here.
            entry_attrs = item.get('extra_attrs') or {}
            handler.startElement(u'entry', entry_attrs)

            handler.addQuickElement(u'id', item['id'])
            self.write_text_construct(handler, u'title', item['title'])
            handler.addQuickElement(u'updated', rfc3339_date(item['updated']))
            if item.get('published'):
                handler.addQuickElement(u'published', rfc3339_date(item['published']))
            if item.get('rights'):
                self.write_text_construct(handler, u'rights', item['rights'])
            if item.get('source'):
                self.write_source(handler, item['source'])

            for author in item['authors']:
                self.write_person_construct(handler, u'author', author)
            for contributor in item['contributors']:
                self.write_person_construct(handler, u'contributor', contributor)
            for category in item['categories']:
                self.write_category_construct(handler, category)
            for link in item['links']:
                self.write_link_construct(handler, link)
            if item.get('summary'):
                self.write_text_construct(handler, u'summary', item['summary'])
            if item.get('content'):
                self.write_content(handler, item['content'])

            handler.endElement(u'entry')

    @staticmethod
    def _is_valid_text_construct(obj):
        """Return False only for a tuple whose first element is not 'text', 'html' or 'xhtml'."""
        # @@@ no validation is done that 'html' text constructs are valid HTML
        # @@@ no validation is done that 'xhtml' text constructs are well-formed XML or valid XHTML
        return not isinstance(obj, tuple) or obj[0] in ('text', 'html', 'xhtml')

    def _check_text_constructs(self, holder, label, required, optional):
        """
        Raise ValidationError for the first invalid text construct in ``holder``.

        ``required`` fields are always checked; ``optional`` ones only when
        they hold a true value. ``label`` ('feed', 'entry' or 'source') is
        used in the error message.
        """
        for field in required:
            if not self._is_valid_text_construct(holder[field]):
                raise ValidationError('%s %s has invalid type' % (label, field))
        for field in optional:
            value = holder.get(field)
            if value and not self._is_valid_text_construct(value):
                raise ValidationError('%s %s has invalid type' % (label, field))

    @staticmethod
    def _collect_alternate_links(links):
        """
        Return the alternate links of ``links`` keyed by (type, hreflang).

        A link counts as alternate when its 'rel' is 'alternate' or absent.
        Raises ValidationError when two of them share the same key.
        """
        found = {}
        for link in links or []:
            if link.get('rel') in ('alternate', None):
                key = (link.get('type'), link.get('hreflang'))
                if key in found:
                    raise ValidationError('alternate links must have unique type/hreflang')
                found[key] = link
        return found

    @staticmethod
    def _check_content(item):
        """Check the rules for an entry whose content is an (attribute dict, text) tuple."""
        content = item.get('content')
        if not (content and isinstance(content, tuple)):
            return

        content_type = content[0].get('type')
        if content[0].get('src'):
            if content[1]:
                raise ValidationError('content with src should be empty')
            if not item.get('summary'):
                raise ValidationError('content with src requires a summary too')
            if content_type in ['text', 'html', 'xhtml']:
                raise ValidationError('content with src cannot have type of text, html or xhtml')
        if content_type:
            is_binary_media_type = (
                '/' in content_type
                and not content_type.startswith('text/')
                and not content_type.endswith('/xml')
                and not content_type.endswith('+xml')
                and content_type not in ['application/xml-external-parsed-entity', 'application/xml-dtd']
            )
            if is_binary_media_type:
                # @@@ check content is Base64
                if not item.get('summary'):
                    raise ValidationError('content in Base64 requires a summary too')
            if content_type not in ['text', 'html', 'xhtml'] and '/' not in content_type:
                raise ValidationError('content type does not appear to be valid')

    def validate(self):
        """
        Check the feed and every entry; raise ValidationError on the first problem.

        Checked are: the type of each text construct, uniqueness of alternate
        links by (type, hreflang), presence of an author on the feed, the
        entry or the entry's source, presence of content or an alternate link
        on each entry, and the content rules for 'src' and media types.
        All entries are examined. Returns None.
        """
        self._check_text_constructs(self.feed, 'feed', ['title'], ['subtitle', 'rights'])
        self._collect_alternate_links(self.feed.get('links'))

        feed_author = bool(self.feed.get('authors'))

        for item in self.items:
            source = item.get('source')

            if not feed_author and not item.get('authors'):
                if not (source and source.get('authors')):
                    raise ValidationError('if no feed author, all entries must have author (possibly in source)')

            self._check_text_constructs(item, 'entry', ['title'], ['rights', 'summary'])
            if source:
                self._check_text_constructs(source, 'source', [], ['title', 'subtitle', 'rights'])

            alternate_links = self._collect_alternate_links(item.get('links'))
            if not item.get('content') and not alternate_links:
                raise ValidationError('if no content, entry must have alternate link')

            self._check_content(item)
+
+
+
class LegacySyndicationFeed(AtomFeed):
    """
    Provides an SyndicationFeed-compatible interface in its __init__ and
    add_item but is really a new AtomFeed object.
    """

    @staticmethod
    def _author_dict(author_name, author_link, author_email):
        """Build a person dict with 'name', plus 'uri' and 'email' when those are given."""
        person = {'name': author_name}
        if author_link:
            person['uri'] = author_link
        if author_email:
            person['email'] = author_email
        return person

    def __init__(self, title, link, description, language=None, author_email=None,
            author_name=None, author_link=None, subtitle=None, categories=None,
            feed_url=None, feed_copyright=None):
        """
        Map SyndicationFeed-style arguments onto AtomFeed.__init__.

        ``link`` serves both as the feed id and as the alternate link,
        ``feed_url`` (when given) becomes the 'self' link, ``language`` is
        written as an ``xml:lang`` attribute, ``feed_copyright`` becomes the
        rights, and each entry of ``categories`` becomes a category term.
        The updated time is left as None so that it is calculated later.
        """
        links = [{'rel': 'alternate', 'href': link}]
        if feed_url:
            links.append({'rel': 'self', 'href': feed_url})
        if language:
            extra_attrs = {'xml:lang': language}
        else:
            extra_attrs = {}

        # description ignored (as with Atom1Feed)

        AtomFeed.__init__(
            self, link, title, None,  # updated=None: will be calculated
            rights=feed_copyright,
            subtitle=subtitle,
            authors=[self._author_dict(author_name, author_link, author_email)],
            categories=[{'term': term} for term in categories or []],
            links=links,
            extra_attrs=extra_attrs,
        )

    def add_item(self, title, link, description, author_email=None,
            author_name=None, author_link=None, pubdate=None, comments=None,
            unique_id=None, enclosure=None, categories=None, item_copyright=None):
        """
        Map SyndicationFeed-style item arguments onto AtomFeed.add_item.

        The entry id is ``unique_id`` when given, otherwise a tag URI built
        from ``link`` and ``pubdate``. ``pubdate`` becomes the updated time,
        ``description`` an 'html' summary, ``item_copyright`` the rights, and
        ``enclosure`` (an object with ``url``, ``length`` and ``mime_type``
        attributes) an enclosure link. ``comments`` is accepted but not used.
        """
        if unique_id:
            atom_id = unique_id
        else:
            atom_id = get_tag_uri(link, pubdate)

        rights = item_copyright or None
        summary = ('html', description) if description else None

        links = [{'rel': 'alternate', 'href': link}]
        if enclosure:
            links.append({
                'rel': 'enclosure',
                'href': enclosure.url,
                'length': enclosure.length,
                'type': enclosure.mime_type,
            })

        AtomFeed.add_item(
            self, atom_id, title, pubdate,
            rights=rights,
            summary=summary,
            authors=[self._author_dict(author_name, author_link, author_email)],
            categories=[{'term': term} for term in categories or []],
            links=links,
        )