# Copyright (c) 2011, 2012 Free Software Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This project incorporates work covered by the following copyright and
# permission notice:
#
# Copyright (c) 2009, Julien Fache
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
#   notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
#   notice, this list of conditions and the following disclaimer in
#   the documentation and/or other materials provided with the
#   distribution.
# * Neither the name of the author nor the names of other
#   contributors may be used to endorse or promote products derived
#   from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
# OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Copyright (c) 2011, 2012 Free Software Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""WordPress to Gstudio command module"""
import sys
from datetime import datetime
from optparse import make_option
from xml.etree import ElementTree as ET

from django.utils.html import strip_tags
from django.db.utils import IntegrityError
from django.utils.encoding import smart_str
from django.contrib.auth.models import User
from django.contrib.sites.models import Site
from django.utils.text import truncate_words
from django.template.defaultfilters import slugify
from django.contrib import comments
from django.core.management.base import CommandError
from django.core.management.base import LabelCommand

from tagging.models import Tag

from gstudio import __version__
from gstudio.models import Nodetype
from gstudio.models import Metatype
from gstudio.signals import disconnect_gstudio_signals
from gstudio.managers import DRAFT, HIDDEN, PUBLISHED

# Namespace pattern of the WordPress export format; the placeholder is
# the WXR version. The pattern is kept in its own constant so that it is
# never overwritten by a formatted value.
WP_NS_TEMPLATE = 'http://wordpress.org/export/%s/'
# Effective namespace; ``Command.handle_label`` rebinds it to the
# formatted value for the requested WXR version before parsing.
WP_NS = WP_NS_TEMPLATE


class Command(LabelCommand):
    """Command object for importing a WordPress blog
    into Gstudio via a WordPress eXtended RSS (WXR) file."""
    help = 'Import a Wordpress blog into Gstudio.'
    label = 'WXR file'
    args = 'wordpress.xml'

    option_list = LabelCommand.option_list + (
        make_option('--noautoexcerpt', action='store_false',
                    dest='auto_excerpt', default=True,
                    help='Do NOT generate an excerpt if not present.'),
        make_option('--author', dest='author', default='',
                    help='All imported nodetypes belong to specified author'),
        make_option('--wxr_version', dest='wxr_version', default='1.0',
                    help='Wordpress XML export version'),
        )

    # Site every imported nodetype and comment is attached to.
    SITE = Site.objects.get_current()
    # Mapping from the WordPress post status to the Gstudio status.
    REVERSE_STATUS = {'pending': DRAFT,
                      'draft': DRAFT,
                      'auto-draft': DRAFT,
                      'inherit': DRAFT,
                      'publish': PUBLISHED,
                      'future': PUBLISHED,
                      'trash': HIDDEN,
                      'private': PUBLISHED}
    # Format of the post and comment dates parsed from the export.
    WP_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self):
        """Init the Command, add custom styles and disconnect
        the Gstudio signals for the duration of the import."""
        super(Command, self).__init__()
        self.style.TITLE = self.style.SQL_FIELD
        self.style.STEP = self.style.SQL_COLTYPE
        self.style.ITEM = self.style.HTTP_INFO
        disconnect_gstudio_signals()

    def write_out(self, message, verbosity_level=1):
        """Write ``message`` on stdout and flush it, but only when
        the command verbosity is at least ``verbosity_level``."""
        if self.verbosity and self.verbosity >= verbosity_level:
            sys.stdout.write(smart_str(message))
            sys.stdout.flush()

    def _wp_text(self, node, tag):
        """Return the text of the ``tag`` child of ``node`` in the
        WordPress namespace, or None if the child is missing or empty."""
        child = node.find('{%s}%s' % (WP_NS, tag))
        if child is None:
            return None
        return child.text

    def handle_label(self, wxr_file, **options):
        """Import the WXR file located at ``wxr_file``.

        Reads the 'verbosity', 'auto_excerpt', 'wxr_version' and
        'author' options, then imports authors, metatypes, tags and
        nodetypes in that order.

        Raises CommandError if the 'author' option does not match
        an existing username."""
        global WP_NS
        self.verbosity = int(options.get('verbosity', 1))
        self.auto_excerpt = options.get('auto_excerpt', True)
        # Always format from the pristine template: formatting WP_NS
        # in place broke the second label of a multi-file run, because
        # the placeholder had already been consumed by the first one.
        WP_NS = WP_NS_TEMPLATE % options.get('wxr_version', '1.0')
        self.default_author = options.get('author')
        if self.default_author:
            try:
                self.default_author = User.objects.get(
                    username=self.default_author)
            except User.DoesNotExist:
                raise CommandError('Invalid username for default author')

        self.write_out(self.style.TITLE(
            'Starting migration from Wordpress to Gstudio %s:\n' %
            __version__))

        tree = ET.parse(wxr_file)

        self.authors = self.import_authors(tree)

        # NOTE(review): the 'metatype' element names used by this
        # command look like a rename artefact; confirm against a real
        # WordPress export that such elements exist.
        self.metatypes = self.import_metatypes(
            tree.findall('channel/{%s}metatype' % WP_NS))

        self.import_tags(tree.findall('channel/{%s}tag' % WP_NS))

        self.import_nodetypes(tree.findall('channel/item'))

    def import_authors(self, tree):
        """Retrieve all the authors used in posts and convert them
        to new or existing users.

        Returns a dict mapping each WordPress author name to a User.
        When a default author was given, every name maps to it."""
        self.write_out(self.style.STEP('- Importing authors\n'))

        post_authors = set()
        for item in tree.findall('channel/item'):
            if self._wp_text(item, 'post_type') == 'post':
                post_authors.add(item.find(
                    '{http://purl.org/dc/elements/1.1/}creator').text)

        self.write_out('%i authors found.\n' % len(post_authors))

        authors = {}
        for post_author in post_authors:
            if self.default_author:
                authors[post_author] = self.default_author
            else:
                authors[post_author] = self.migrate_author(post_author)
        return authors

    def migrate_author(self, author_name):
        """Interactively map the WordPress author ``author_name`` to
        an existing User or to a newly created one, and return it."""
        action_text = "The author '%s' needs to be migrated to an User:\n"\
                      "1. Use an existing user ?\n"\
                      "2. Create a new user ?\n"\
                      "Please select a choice: " % author_name
        while True:
            selection = raw_input(smart_str(action_text))
            # Compare against whole answers: a substring test such as
            # "in '12'" also accepts an empty answer and '12'.
            if selection in ('1', '2'):
                break

        if selection == '1':
            users = User.objects.all()
            usernames = [user.username for user in users]
            while True:
                user_text = "1. Select your user, by typing " \
                            "one of theses usernames:\n"\
                            "[%s]\n"\
                            "Please select a choice: " % ', '.join(usernames)
                user_selected = raw_input(user_text)
                if user_selected in usernames:
                    break
            return users.get(username=user_selected)

        create_text = "2. Please type the email of the '%s' user: " % \
                      author_name
        author_mail = raw_input(create_text)
        try:
            return User.objects.create_user(author_name, author_mail)
        except IntegrityError:
            # The username is already taken: reuse that account.
            return User.objects.get(username=author_name)

    def import_metatypes(self, metatype_nodes):
        """Import all the metatypes from 'wp:metatype' nodes,
        because metatypes in 'item' nodes are not necessarily
        all the metatypes.

        Returns a dict mapping each metatype title to its Metatype,
        which avoids further database lookups."""
        self.write_out(self.style.STEP('- Importing metatypes\n'))

        metatypes = {}
        for metatype_node in metatype_nodes:
            # NOTE(review): child names below mix 'cat_name' with
            # 'metatype_*'; confirm them against a real export.
            title = self._wp_text(metatype_node, 'cat_name')[:255]
            slug = self._wp_text(metatype_node, 'metatype_nicename')[:255]
            # A top level metatype has no parent text.
            parent = self._wp_text(metatype_node, 'metatype_parent')
            parent = parent[:255] if parent else None
            self.write_out('> %s... ' % title)
            metatype, created = Metatype.objects.get_or_create(
                title=title, slug=slug, parent=metatypes.get(parent))
            metatypes[title] = metatype
            self.write_out(self.style.ITEM('OK\n'))
        return metatypes

    def import_tags(self, tag_nodes):
        """Import all the tags from 'wp:tag' nodes,
        because tags in 'item' nodes are not necessarily
        all the tags. Only the slug is used, because the true
        tag name may not be valid for url usage."""
        self.write_out(self.style.STEP('- Importing tags\n'))
        for tag_node in tag_nodes:
            tag_name = self._wp_text(tag_node, 'tag_slug')[:50]
            self.write_out('> %s... ' % tag_name)
            Tag.objects.get_or_create(name=tag_name)
            self.write_out(self.style.ITEM('OK\n'))

    def get_nodetype_tags(self, metatypes):
        """Return the list of a nodetype's tags, using the
        'nicename' attribute for url compatibility."""
        tags = []
        for metatype in metatypes:
            domain = metatype.attrib.get('domain', 'metatype')
            if domain == 'tag' and metatype.attrib.get('nicename'):
                tags.append(metatype.attrib.get('nicename'))
        return tags

    def get_nodetype_metatypes(self, metatype_nodes):
        """Return the list of a nodetype's metatypes, looked up
        by node text in the previously imported metatypes."""
        metatypes = []
        for metatype_node in metatype_nodes:
            domain = metatype_node.attrib.get('domain')
            if domain == 'metatype':
                metatypes.append(self.metatypes[metatype_node.text])
        return metatypes

    def import_nodetype(self, title, content, item_node):
        """Import one nodetype from ``item_node`` and return it.

        Some data are not imported: the image, the related nodetypes
        and end_publication. start_publication and creation_date
        both use the 'post_date' value of the item.

        The nodetype is fetched or created by ``title``; metatypes,
        author and site are then attached to it."""
        creation_date = datetime.strptime(
            self._wp_text(item_node, 'post_date'), self.WP_DATE_FORMAT)

        # The excerpt lives in the '<WP_NS>excerpt/' namespace; the
        # element may be absent from the item.
        excerpt_node = item_node.find('{%sexcerpt/}encoded' % WP_NS)
        excerpt = excerpt_node.text if excerpt_node is not None else None
        if not excerpt:
            if self.auto_excerpt:
                excerpt = truncate_words(strip_tags(content), 50)
            else:
                excerpt = ''

        status = self._wp_text(item_node, 'status')

        nodetype_dict = {
            'content': content,
            'excerpt': excerpt,
            # The slug is rebuilt from the title rather than read from
            # 'post_name', because the exported slug can be malformed.
            'slug': slugify(title)[:255] or 'post-%s' % self._wp_text(
                item_node, 'post_id'),
            'tags': ', '.join(self.get_nodetype_tags(item_node.findall(
                'metatype'))),
            'status': self.REVERSE_STATUS[status],
            'comment_enabled': self._wp_text(
                item_node, 'comment_status') == 'open',
            'pingback_enabled': self._wp_text(
                item_node, 'ping_status') == 'open',
            'featured': self._wp_text(item_node, 'is_sticky') == '1',
            'password': self._wp_text(item_node, 'post_password') or '',
            'login_required': status == 'private',
            'creation_date': creation_date,
            'last_update': datetime.now(),
            'start_publication': creation_date}

        nodetype, created = Nodetype.objects.get_or_create(
            title=title, defaults=nodetype_dict)

        nodetype.metatypes.add(*self.get_nodetype_metatypes(
            item_node.findall('metatype')))
        nodetype.authors.add(self.authors[item_node.find(
            '{http://purl.org/dc/elements/1.1/}creator').text])
        nodetype.sites.add(self.SITE)

        return nodetype

    def import_nodetypes(self, items):
        """Loop over ``items`` and import the nodetypes. An item is
        imported when its 'post_type' is 'post' and it has both
        a title and some content; its comments are imported too."""
        self.write_out(self.style.STEP('- Importing nodetypes\n'))

        for item_node in items:
            title = (item_node.find('title').text or '')[:255]
            post_type = self._wp_text(item_node, 'post_type')
            content = item_node.find(
                '{http://purl.org/rss/1.0/modules/content/}encoded').text

            if post_type == 'post' and content and title:
                self.write_out('> %s... ' % title)
                nodetype = self.import_nodetype(title, content, item_node)
                self.write_out(self.style.ITEM('OK\n'))
                # No trailing slash in the path: ElementPath expands
                # 'tag/' to 'tag/*', i.e. the children of the comments.
                self.import_comments(nodetype, item_node.findall(
                    '{%s}comment' % WP_NS))
            else:
                self.write_out('> %s... ' % title, 2)
                self.write_out(self.style.NOTICE('SKIPPED (not a post)\n'), 2)

    def import_comments(self, nodetype, comment_nodes):
        """Loop over ``comment_nodes`` and import them as comments
        of ``nodetype`` in django.contrib.comments.

        Comments without content are skipped. Spam, pingback and
        trackback comments are flagged with the first author of
        the nodetype."""
        for comment_node in comment_nodes:
            comment_type = self._wp_text(comment_node, 'comment_type')
            is_pingback = comment_type == 'pingback'
            is_trackback = comment_type == 'trackback'

            title = 'Comment #%s' % self._wp_text(comment_node, 'comment_id')
            self.write_out(' > %s... ' % title)

            content = self._wp_text(comment_node, 'comment_content')
            if not content:
                self.write_out(self.style.NOTICE('SKIPPED (unfilled)\n'))
                # Skip only this comment; the following ones must
                # still be imported.
                continue

            submit_date = datetime.strptime(
                self._wp_text(comment_node, 'comment_date'),
                self.WP_DATE_FORMAT)

            approval = self._wp_text(comment_node, 'comment_approved')
            is_public = True
            is_removed = False
            if approval != '1':
                is_removed = True
            if approval == 'spam':
                is_public = False

            comment_dict = {
                'content_object': nodetype,
                'site': self.SITE,
                'user_name': (self._wp_text(
                    comment_node, 'comment_author') or '')[:50],
                'user_email': self._wp_text(
                    comment_node, 'comment_author_email') or '',
                'user_url': self._wp_text(
                    comment_node, 'comment_author_url') or '',
                'comment': content,
                'submit_date': submit_date,
                'ip_address': self._wp_text(
                    comment_node, 'comment_author_IP') or '',
                'is_public': is_public,
                'is_removed': is_removed,
            }
            comment = comments.get_model()(**comment_dict)
            comment.save()

            flags = []
            if approval == 'spam':
                flags.append('spam')
            if is_pingback:
                flags.append('pingback')
            if is_trackback:
                flags.append('trackback')
            for flag in flags:
                comment.flags.create(
                    user=nodetype.authors.all()[0], flag=flag)

            self.write_out(self.style.ITEM('OK\n'))