Capture and properly handle errors.

Handled in several places:

 - In the run() of the ProcessMedia itself for
   handled (BaseProcessingFail derived) errors (best to do these not
   in on_failure because the errors are highlighted in celeryd in a
   way that looks inappropriate for when the errors are well handled)
 - In ProcessMedia.on_failure() for all other errors
 - In the submit view where all exceptions are caught, media is marked
   at having failed, then the error is re-raised.  (The reason for
   this is that users running in "lazy" mode will get errors
   propagated by celery and so on_failure won't run for them.)
This commit is contained in:
Christopher Allan Webber 2011-08-13 12:21:06 -05:00
parent 4a477e246d
commit 6788b4123e
2 changed files with 50 additions and 27 deletions

View File

@ -59,7 +59,14 @@ class ProcessMedia(Task):
""" """
entry = mgg.database.MediaEntry.one( entry = mgg.database.MediaEntry.one(
{'_id': ObjectId(media_id)}) {'_id': ObjectId(media_id)})
# Try to process, and handle expected errors.
try:
process_image(entry) process_image(entry)
except BaseProcessingFail, exc:
mark_entry_failed(entry[u'_id'], exc)
return
entry['state'] = u'processed' entry['state'] = u'processed'
entry.save() entry.save()
@ -71,31 +78,34 @@ class ProcessMedia(Task):
we can use that to get more information about the failure and store that we can use that to get more information about the failure and store that
for conveying information to users about the failure, etc. for conveying information to users about the failure, etc.
""" """
media_id = args[0] entry_id = args[0]
entry = mgg.database.MediaEntry.one( mark_entry_failed(entry_id, exc)
{'_id': ObjectId(media_id)})
entry[u'state'] = u'failed'
process_media = registry.tasks[ProcessMedia.name]
def mark_entry_failed(entry_id, exc):
# Was this a BaseProcessingFail? In other words, was this a # Was this a BaseProcessingFail? In other words, was this a
# type of error that we know how to handle? # type of error that we know how to handle?
if isinstance(exc, BaseProcessingFail): if isinstance(exc, BaseProcessingFail):
# Looks like yes, so record information about that failure and any # Looks like yes, so record information about that failure and any
# metadata the user might have supplied. # metadata the user might have supplied.
entry[u'fail_error'] = exc.exception_path mgg.database['media_entries'].update(
entry[u'fail_metadata'] = exc.metadata {'_id': entry_id},
{'$set': {u'state': u'failed',
u'fail_error': exc.exception_path,
u'fail_metadata': exc.metadata}})
else: else:
# Looks like no, so just mark it as failed and don't record a # Looks like no, so just mark it as failed and don't record a
# failure_error (we'll assume it wasn't handled) and don't record # failure_error (we'll assume it wasn't handled) and don't record
# metadata (in fact overwrite it if somehow it had previous info # metadata (in fact overwrite it if somehow it had previous info
# here) # here)
entry[u'fail_error'] = None mgg.database['media_entries'].update(
entry[u'fail_metadata'] = {} {'_id': entry_id},
{'$set': {u'state': u'failed',
entry.save() u'fail_error': None,
u'fail_metadata': {}}})
process_media = registry.tasks[ProcessMedia.name]
def process_image(entry): def process_image(entry):

View File

@ -28,7 +28,7 @@ from mediagoblin.util import (
from mediagoblin.util import pass_to_ugettext as _ from mediagoblin.util import pass_to_ugettext as _
from mediagoblin.decorators import require_active_login from mediagoblin.decorators import require_active_login
from mediagoblin.submit import forms as submit_forms, security from mediagoblin.submit import forms as submit_forms, security
from mediagoblin.process_media import process_media from mediagoblin.process_media import process_media, mark_entry_failed
from mediagoblin.messages import add_message, SUCCESS from mediagoblin.messages import add_message, SUCCESS
@ -102,9 +102,22 @@ def submit_start(request):
# #
# (... don't change entry after this point to avoid race # (... don't change entry after this point to avoid race
# conditions with changes to the document via processing code) # conditions with changes to the document via processing code)
try:
process_media.apply_async( process_media.apply_async(
[unicode(entry['_id'])], {}, [unicode(entry['_id'])], {},
task_id=task_id) task_id=task_id)
except BaseException as exc:
# The purpose of this section is because when running in "lazy"
# or always-eager-with-exceptions-propagated celery mode that
# the failure handling won't happen on Celery end. Since we
# expect a lot of users to run things in this way we have to
# capture stuff here.
#
# ... not completely the diaper pattern because the exception is
# re-raised :)
mark_entry_failed(entry[u'_id'], exc)
# re-raise the exception
raise
add_message(request, SUCCESS, _('Woohoo! Submitted!')) add_message(request, SUCCESS, _('Woohoo! Submitted!'))