Updating to the point where we can allllmost run with the new reprocessing code
This commit sponsored by Odin Hørthe Omdal. Thank you!
This commit is contained in:
parent
d1e9913b71
commit
77ea4c9bd1
@ -19,9 +19,12 @@ import os
|
|||||||
from mediagoblin import mg_globals
|
from mediagoblin import mg_globals
|
||||||
from mediagoblin.db.models import MediaEntry
|
from mediagoblin.db.models import MediaEntry
|
||||||
from mediagoblin.gmg_commands import util as commands_util
|
from mediagoblin.gmg_commands import util as commands_util
|
||||||
|
from mediagoblin.submit.lib import run_process_media
|
||||||
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
|
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
|
||||||
from mediagoblin.tools.pluginapi import hook_handle
|
from mediagoblin.tools.pluginapi import hook_handle
|
||||||
from mediagoblin.processing import ProcessorDoesNotExist, ProcessorNotEligible
|
from mediagoblin.processing import (
|
||||||
|
ProcessorDoesNotExist, ProcessorNotEligible,
|
||||||
|
get_entry_and_manager, get_manager_for_type)
|
||||||
|
|
||||||
|
|
||||||
def reprocess_parser_setup(subparser):
|
def reprocess_parser_setup(subparser):
|
||||||
@ -205,31 +208,16 @@ def _set_media_state(args):
|
|||||||
args[0].state = 'processed'
|
args[0].state = 'processed'
|
||||||
|
|
||||||
|
|
||||||
class MediaEntryNotFound(Exception): pass
|
|
||||||
|
|
||||||
def extract_entry_and_type(media_id):
|
|
||||||
"""
|
|
||||||
Fetch a media entry, as well as its media type
|
|
||||||
"""
|
|
||||||
entry = MediaEntry.query.filter_by(id=media_id).first()
|
|
||||||
if entry is None:
|
|
||||||
raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)
|
|
||||||
|
|
||||||
return entry.media_type, entry
|
|
||||||
|
|
||||||
|
|
||||||
def available(args):
|
def available(args):
|
||||||
# Get the media type, either by looking up media id, or by specific type
|
# Get the media type, either by looking up media id, or by specific type
|
||||||
try:
|
try:
|
||||||
media_id = int(args.id_or_type)
|
media_entry, manager = get_entry_and_manager(args.id_or_type)
|
||||||
media_type, media_entry = extract_entry_and_type(media_id)
|
media_type = media_entry.type
|
||||||
except ValueError:
|
except ValueError:
|
||||||
media_type = args.id_or_type
|
media_type = args.id_or_type
|
||||||
media_entry = None
|
media_entry = None
|
||||||
|
manager = get_manager_for_type(media_type)
|
||||||
|
|
||||||
manager_class = hook_handle(('reprocess_manager', media_type))
|
|
||||||
manager = manager_class()
|
|
||||||
|
|
||||||
if media_entry is None:
|
if media_entry is None:
|
||||||
processors = manager.list_all_processors()
|
processors = manager.list_all_processors()
|
||||||
else:
|
else:
|
||||||
@ -257,10 +245,7 @@ def available(args):
|
|||||||
|
|
||||||
|
|
||||||
def run(args):
|
def run(args):
|
||||||
media_type, media_entry = extract_entry_and_type(args.media_id)
|
media_entry, manager = get_entry_and_manager(args.media_id)
|
||||||
|
|
||||||
manager_class = hook_handle(('reprocess_manager', media_type))
|
|
||||||
manager = manager_class()
|
|
||||||
|
|
||||||
# TODO: (maybe?) This could probably be handled entirely by the
|
# TODO: (maybe?) This could probably be handled entirely by the
|
||||||
# processor class...
|
# processor class...
|
||||||
@ -279,8 +264,11 @@ def run(args):
|
|||||||
reprocess_parser = processor_class.generate_parser()
|
reprocess_parser = processor_class.generate_parser()
|
||||||
reprocess_args = reprocess_parser.parse_args(args.reprocess_args)
|
reprocess_args = reprocess_parser.parse_args(args.reprocess_args)
|
||||||
reprocess_request = processor_class.args_to_request(reprocess_args)
|
reprocess_request = processor_class.args_to_request(reprocess_args)
|
||||||
processor = processor_class(manager, media_entry)
|
run_process_media(
|
||||||
processor.process(**reprocess_request)
|
media_entry,
|
||||||
|
reprocess_action=args.reprocess_command,
|
||||||
|
reprocess_info=reprocess_request)
|
||||||
|
manager.process(media_entry, args.reprocess_command, **reprocess_request)
|
||||||
|
|
||||||
|
|
||||||
def reprocess(args):
|
def reprocess(args):
|
||||||
|
@ -18,9 +18,10 @@ from collections import OrderedDict
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from mediagoblin.db.util import atomic_update
|
|
||||||
from mediagoblin import mg_globals as mgg
|
from mediagoblin import mg_globals as mgg
|
||||||
|
from mediagoblin.db.util import atomic_update
|
||||||
|
from mediagoblin.db.models import MediaEntry
|
||||||
|
from mediagoblin.tools.pluginapi import hook_handle
|
||||||
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
|
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
|
||||||
|
|
||||||
_log = logging.getLogger(__name__)
|
_log = logging.getLogger(__name__)
|
||||||
@ -208,7 +209,7 @@ class ProcessingManager(object):
|
|||||||
|
|
||||||
return processor
|
return processor
|
||||||
|
|
||||||
def process(self, entry, directive, request):
|
def process_from_args(self, entry, reprocess_command, request):
|
||||||
"""
|
"""
|
||||||
Process a media entry.
|
Process a media entry.
|
||||||
"""
|
"""
|
||||||
@ -226,6 +227,39 @@ def request_from_args(args, which_args):
|
|||||||
return request
|
return request
|
||||||
|
|
||||||
|
|
||||||
|
class MediaEntryNotFound(Exception): pass
|
||||||
|
|
||||||
|
|
||||||
|
def get_manager_for_type(media_type):
|
||||||
|
"""
|
||||||
|
Get the appropriate media manager for this type
|
||||||
|
"""
|
||||||
|
manager_class = hook_handle(('reprocess_manager', media_type))
|
||||||
|
manager = manager_class()
|
||||||
|
|
||||||
|
return manager
|
||||||
|
|
||||||
|
|
||||||
|
def get_entry_and_manager(media_id):
|
||||||
|
"""
|
||||||
|
Get a MediaEntry, its media type, and its manager all in one go.
|
||||||
|
|
||||||
|
Returns a tuple of: `(entry, media_type, media_manager)`
|
||||||
|
"""
|
||||||
|
entry = MediaEntry.query.filter_by(id=media_id).first()
|
||||||
|
if entry is None:
|
||||||
|
raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)
|
||||||
|
|
||||||
|
manager = get_manager_for_type(entry.media_type)
|
||||||
|
|
||||||
|
return entry, manager
|
||||||
|
|
||||||
|
|
||||||
|
################################################
|
||||||
|
# TODO: This ProcessingState is OUTDATED,
|
||||||
|
# and needs to be refactored into other tools!
|
||||||
|
################################################
|
||||||
|
|
||||||
class ProcessingState(object):
|
class ProcessingState(object):
|
||||||
"""
|
"""
|
||||||
The first and only argument to the "processor" of a media type
|
The first and only argument to the "processor" of a media type
|
||||||
|
@ -21,9 +21,9 @@ import urllib2
|
|||||||
from celery import registry, task
|
from celery import registry, task
|
||||||
|
|
||||||
from mediagoblin import mg_globals as mgg
|
from mediagoblin import mg_globals as mgg
|
||||||
from mediagoblin.db.models import MediaEntry
|
from . import mark_entry_failed, BaseProcessingFail
|
||||||
from . import mark_entry_failed, BaseProcessingFail, ProcessingState
|
|
||||||
from mediagoblin.tools.processing import json_processing_callback
|
from mediagoblin.tools.processing import json_processing_callback
|
||||||
|
from mediagoblin.processing import get_entry_and_manager
|
||||||
|
|
||||||
_log = logging.getLogger(__name__)
|
_log = logging.getLogger(__name__)
|
||||||
logging.basicConfig()
|
logging.basicConfig()
|
||||||
@ -68,7 +68,7 @@ class ProcessMedia(task.Task):
|
|||||||
"""
|
"""
|
||||||
Pass this entry off for processing.
|
Pass this entry off for processing.
|
||||||
"""
|
"""
|
||||||
def run(self, media_id, feed_url, reprocess_info=None):
|
def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
|
||||||
"""
|
"""
|
||||||
Pass the media entry off to the appropriate processing function
|
Pass the media entry off to the appropriate processing function
|
||||||
(for now just process_image...)
|
(for now just process_image...)
|
||||||
@ -78,28 +78,20 @@ class ProcessMedia(task.Task):
|
|||||||
:param reprocess: A dict containing all of the necessary reprocessing
|
:param reprocess: A dict containing all of the necessary reprocessing
|
||||||
info for the media_type.
|
info for the media_type.
|
||||||
"""
|
"""
|
||||||
entry = MediaEntry.query.get(media_id)
|
reprocess_info = reprocess_info or {}
|
||||||
|
entry, manager = get_entry_and_manager(media_id)
|
||||||
|
|
||||||
# Try to process, and handle expected errors.
|
# Try to process, and handle expected errors.
|
||||||
try:
|
try:
|
||||||
|
processor_class = manager.get_processor(reprocess_action, entry)
|
||||||
|
|
||||||
entry.state = u'processing'
|
entry.state = u'processing'
|
||||||
entry.save()
|
entry.save()
|
||||||
|
|
||||||
_log.debug('Processing {0}'.format(entry))
|
_log.debug('Processing {0}'.format(entry))
|
||||||
|
|
||||||
proc_state = ProcessingState(entry)
|
with processor_class(manager, entry) as processor:
|
||||||
with mgg.workbench_manager.create() as workbench:
|
processor.process(**reprocess_info)
|
||||||
|
|
||||||
proc_state.set_workbench(workbench)
|
|
||||||
processor = entry.media_manager.processor(proc_state)
|
|
||||||
|
|
||||||
# If we have reprocess_info, let's reprocess
|
|
||||||
if reprocess_info:
|
|
||||||
processor.reprocess(reprocess_info)
|
|
||||||
|
|
||||||
# Run initial processing
|
|
||||||
else:
|
|
||||||
processor.initial_processing()
|
|
||||||
|
|
||||||
# We set the state to processed and save the entry here so there's
|
# We set the state to processed and save the entry here so there's
|
||||||
# no need to save at the end of the processing stage, probably ;)
|
# no need to save at the end of the processing stage, probably ;)
|
||||||
|
@ -76,7 +76,8 @@ def prepare_queue_task(app, entry, filename):
|
|||||||
return queue_file
|
return queue_file
|
||||||
|
|
||||||
|
|
||||||
def run_process_media(entry, feed_url=None, reprocess_info=None):
|
def run_process_media(entry, feed_url=None,
|
||||||
|
reprocess_action="inital", reprocess_info=None):
|
||||||
"""Process the media asynchronously
|
"""Process the media asynchronously
|
||||||
|
|
||||||
:param entry: MediaEntry() instance to be processed.
|
:param entry: MediaEntry() instance to be processed.
|
||||||
@ -84,11 +85,12 @@ def run_process_media(entry, feed_url=None, reprocess_info=None):
|
|||||||
should be notified of. This will be sth like: `request.urlgen(
|
should be notified of. This will be sth like: `request.urlgen(
|
||||||
'mediagoblin.user_pages.atom_feed',qualified=True,
|
'mediagoblin.user_pages.atom_feed',qualified=True,
|
||||||
user=request.user.username)`
|
user=request.user.username)`
|
||||||
:param reprocess: A dict containing all of the necessary reprocessing
|
:param reprocess_action: What particular action should be run.
|
||||||
|
:param reprocess_info: A dict containing all of the necessary reprocessing
|
||||||
info for the given media_type"""
|
info for the given media_type"""
|
||||||
try:
|
try:
|
||||||
process_media.apply_async(
|
process_media.apply_async(
|
||||||
[entry.id, feed_url, reprocess_info], {},
|
[entry.id, feed_url, reprocess_action, reprocess_info], {},
|
||||||
task_id=entry.queued_task_id)
|
task_id=entry.queued_task_id)
|
||||||
except BaseException as exc:
|
except BaseException as exc:
|
||||||
# The purpose of this section is because when running in "lazy"
|
# The purpose of this section is because when running in "lazy"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user