diff --git a/mediagoblin/process_media/__init__.py b/mediagoblin/process_media/__init__.py index 00402d7e..d6cdd747 100644 --- a/mediagoblin/process_media/__init__.py +++ b/mediagoblin/process_media/__init__.py @@ -15,13 +15,14 @@ # along with this program. If not, see . import Image -from mediagoblin.db.util import ObjectId -from celery.task import task -from mediagoblin import mg_globals as mgg from contextlib import contextmanager +from celery.task import task, Task +from celery import registry -from mediagoblin.process_media.errors import BadMediaFail +from mediagoblin.db.util import ObjectId +from mediagoblin import mg_globals as mgg +from mediagoblin.process_media.errors import BaseProcessingFail, BadMediaFail THUMB_SIZE = 180, 180 @@ -34,6 +35,7 @@ def create_pub_filepath(entry, filename): unicode(entry['_id']), filename]) + @contextmanager def closing(callback): try: @@ -41,12 +43,66 @@ def closing(callback): finally: pass -@task -def process_media_initial(media_id): - workbench = mgg.workbench_manager.create_workbench() - entry = mgg.database.MediaEntry.one( - {'_id': ObjectId(media_id)}) +################################ +# Media processing initial steps +################################ + +class ProcessMedia(Task): + """ + Pass this entry off for processing. + """ + def run(self, media_id): + """ + Pass the media entry off to the appropriate processing function + (for now just process_image...) + """ + entry = mgg.database.MediaEntry.one( + {'_id': ObjectId(media_id)}) + process_image(entry) + entry['state'] = u'processed' + entry.save() + + def on_failure(self, exc, task_id, args, kwargs, einfo): + """ + If the processing failed we should mark that in the database. + + Assuming that the exception raised is a subclass of BaseProcessingFail, + we can use that to get more information about the failure and store that + for conveying information to users about the failure, etc. + """ + media_id = args[0] + entry = mgg.database.MediaEntry.one( + {'_id': ObjectId(media_id)}) + + entry[u'state'] = u'failed' + + # Was this a BaseProcessingFail? In other words, was this a + # type of error that we know how to handle? + if isinstance(exc, BaseProcessingFail): + # Looks like yes, so record information about that failure and any + # metadata the user might have supplied. + entry[u'fail_error'] = exc.exception_path + entry[u'fail_metadata'] = exc.metadata + else: + # Looks like no, so just mark it as failed and don't record a + # failure_error (we'll assume it wasn't handled) and don't record + # metadata (in fact overwrite it if somehow it had previous info + # here) + entry[u'fail_error'] = None + entry[u'fail_metadata'] = {} + + entry.save() + + +process_media = registry.tasks[ProcessMedia.name] + + +def process_image(entry): + """ + Code to process an image + """ + workbench = mgg.workbench_manager.create_workbench() queued_filepath = entry['queued_media_file'] queued_filename = workbench.localized_file( @@ -107,8 +163,6 @@ def process_media_initial(media_id): media_files_dict['original'] = original_filepath if medium_processed: media_files_dict['medium'] = medium_filepath - entry['state'] = u'processed' - entry.save() # clean up workbench workbench.destroy_self() diff --git a/mediagoblin/submit/views.py b/mediagoblin/submit/views.py index 1e8c6a68..25b3664b 100644 --- a/mediagoblin/submit/views.py +++ b/mediagoblin/submit/views.py @@ -14,9 +14,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import uuid + from os.path import splitext from cgi import FieldStorage -from string import split from werkzeug.utils import secure_filename @@ -27,7 +28,7 @@ from mediagoblin.util import ( from mediagoblin.util import pass_to_ugettext as _ from mediagoblin.decorators import require_active_login from mediagoblin.submit import forms as submit_forms, security -from mediagoblin.process_media import process_media_initial +from mediagoblin.process_media import process_media from mediagoblin.messages import add_message, SUCCESS @@ -87,15 +88,24 @@ def submit_start(request): # Add queued filename to the entry entry['queued_media_file'] = queue_filepath + # We generate this ourselves so we know what the taks id is for + # retrieval later. + # (If we got it off the task's auto-generation, there'd be a risk of + # a race condition when we'd save after sending off the task) + task_id = unicode(uuid.uuid4()) + entry['queued_task_id'] = task_id + # Save now so we have this data before kicking off processing - entry.save(validate=False) - - result = process_media_initial.delay(unicode(entry['_id'])) - - # Save the task id - entry['queued_task_id'] = unicode(result.task_id) entry.save(validate=True) + # Pass off to processing + # + # (... don't change entry after this point to avoid race + # conditions with changes to the document via processing code) + process_media.apply_async( + [unicode(entry['_id'])], {}, + task_id=task_id) + add_message(request, SUCCESS, _('Woohoo! Submitted!')) return redirect(request, "mediagoblin.user_pages.user_home",