Tweak Celery Task

- Make sure Exceptions are pickleable (not sure if this was not the
  case but this is the pattern as documented in the celery docs.
- Don't create a task_id in the GMG code, but save the one
  implicitely created by celery.
- Don't create a task-id directory per upload. Just store queued uploads
  in a single directory (this is the most controversial change and might
  need discussion!!!)

Signed-off-by: Sebastian Spaeth <Sebastian@SSpaeth.de>
This commit is contained in:
Sebastian Spaeth 2012-12-20 13:42:37 +01:00 committed by Rodney Ewing
parent bd0b5daa44
commit bf2dafd1a0
4 changed files with 17 additions and 17 deletions

View File

@ -109,7 +109,7 @@ class MediaEntry(Base_v0):
queued_media_file = Column(PathTupleWithSlashes) queued_media_file = Column(PathTupleWithSlashes)
queued_task_id = Column(Unicode) queued_task_id = Column(Unicode, default=None)
__table_args__ = ( __table_args__ = (
UniqueConstraint('uploader', 'slug'), UniqueConstraint('uploader', 'slug'),

View File

@ -181,9 +181,10 @@ class BaseProcessingFail(Exception):
return u"%s:%s" % ( return u"%s:%s" % (
self.__class__.__module__, self.__class__.__name__) self.__class__.__module__, self.__class__.__name__)
def __init__(self, **metadata): def __init__(self, *args, **kwargs):
self.metadata = metadata or {} # next line is REQUIRED to have pickable exceptions if you want
# to be able to pass in custom arguments (see celery docs)
Exception.__init__(self, *args, **metadata)
class BadMediaFail(BaseProcessingFail): class BadMediaFail(BaseProcessingFail):
""" """

View File

@ -18,11 +18,13 @@ import logging
import urllib import urllib
import urllib2 import urllib2
from celery import registry, task #TODO: newer celeries use from celery import Task. Change when we upgrade
from celery.task import Task
from celery.registry import tasks
from mediagoblin import mg_globals as mgg from mediagoblin import mg_globals as mgg
from mediagoblin.db.models import MediaEntry from mediagoblin.db.sql.models import MediaEntry
from . import mark_entry_failed, BaseProcessingFail, ProcessingState from mediagoblin.processing import mark_entry_failed, BaseProcessingFail
from mediagoblin.tools.processing import json_processing_callback from mediagoblin.tools.processing import json_processing_callback
_log = logging.getLogger(__name__) _log = logging.getLogger(__name__)
@ -63,12 +65,10 @@ def handle_push_urls(feed_url):
################################ ################################
# Media processing initial steps # Media processing initial steps
################################ ################################
class ProcessMedia(Task):
track_started=True
class ProcessMedia(task.Task): def run(self, media_id):
"""
Pass this entry off for processing.
"""
def run(self, media_id, feed_url):
""" """
Pass the media entry off to the appropriate processing function Pass the media entry off to the appropriate processing function
(for now just process_image...) (for now just process_image...)
@ -81,8 +81,8 @@ class ProcessMedia(task.Task):
# Try to process, and handle expected errors. # Try to process, and handle expected errors.
try: try:
entry.state = u'processing' entry.state = u'processing'
entry.queued_task_id = self.request.id
entry.save() entry.save()
_log.debug('Processing {0}'.format(entry)) _log.debug('Processing {0}'.format(entry))
proc_state = ProcessingState(entry) proc_state = ProcessingState(entry)
@ -140,6 +140,4 @@ class ProcessMedia(task.Task):
entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first() entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
json_processing_callback(entry) json_processing_callback(entry)
# Register the task tasks.register(ProcessMedia)
process_media = registry.tasks[ProcessMedia.name]

View File

@ -89,7 +89,7 @@ def submit_start(request):
# Save now so we have this data before kicking off processing # Save now so we have this data before kicking off processing
entry.save() entry.save()
# Pass off to processing # Pass off to async processing
# #
# (... don't change entry after this point to avoid race # (... don't change entry after this point to avoid race
# conditions with changes to the document via processing code) # conditions with changes to the document via processing code)
@ -97,6 +97,7 @@ def submit_start(request):
'mediagoblin.user_pages.atom_feed', 'mediagoblin.user_pages.atom_feed',
qualified=True, user=request.user.username) qualified=True, user=request.user.username)
run_process_media(entry, feed_url) run_process_media(entry, feed_url)
add_message(request, SUCCESS, _('Woohoo! Submitted!')) add_message(request, SUCCESS, _('Woohoo! Submitted!'))
add_comment_subscription(request.user, entry) add_comment_subscription(request.user, entry)