Add priority to the celery tasks

A few more changes need to be made before executing the tasks.
Also, #1 should be handled soon after this.
vijeth-aradhya 2017-06-12 20:53:23 +05:30
parent 9a27fa60a4
commit 25ecdec997
9 changed files with 42 additions and 21 deletions
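
For reference, Celery lets a priority be attached to each call, either directly on apply_async or baked into a signature, which is the mechanism this commit leans on. A minimal sketch of the pattern outside MediaGoblin (names are illustrative; it assumes a broker that honours message priorities, e.g. RabbitMQ):

    import celery

    app = celery.Celery('demo', broker='amqp://localhost')

    @app.task
    def transcode(resolution):
        # stand-in for the real transcoding work
        return resolution

    # Priority can be passed straight to apply_async ...
    transcode.apply_async(['480p'], {}, queue='default', priority=5)

    # ... or stored on a signature, as the new workflow() below does.
    sig = transcode.signature(['360p'], {}, queue='default', priority=1, immutable=True)
    sig.apply_async()

With RabbitMQ a larger number is served first, which matches giving main_task priority 5 and the complimentary_task calls 4 down to 1; other brokers (e.g. Redis) emulate priorities differently, so the effective ordering should be checked against the deployment.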

View File

@@ -274,7 +274,8 @@ class AsciiProcessingManager(ProcessingManager):
         self.add_processor(InitialProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)

View File

@@ -366,7 +366,8 @@ class AudioProcessingManager(ProcessingManager):
         self.add_processor(Resizer)
         self.add_processor(Transcoder)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)

View File

@@ -431,7 +431,8 @@ class ImageProcessingManager(ProcessingManager):
         self.add_processor(Resizer)
         self.add_processor(MetadataProcessing)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)

View File

@@ -471,7 +471,8 @@ class PdfProcessingManager(ProcessingManager):
         self.add_processor(InitialProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)

View File

@@ -81,7 +81,8 @@ class RawImageProcessingManager(ProcessingManager):
         self.add_processor(InitialRawProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)

View File

@@ -369,7 +369,8 @@ class StlProcessingManager(ProcessingManager):
         self.add_processor(InitialProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)

View File

@@ -22,6 +22,7 @@ import celery
 import six
 
+from celery import group, chord
 from mediagoblin import mg_globals as mgg
 from mediagoblin.processing import (
     FilenameBuilder, BaseProcessingFail,
@@ -34,7 +35,7 @@ from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
 from mediagoblin.media_types import MissingComponents
 from . import transcoders
-from .util import skip_transcode
+from .util import skip_transcode, ACCEPTED_RESOLUTIONS
 
 _log = logging.getLogger(__name__)
 _log.setLevel(logging.DEBUG)
@@ -165,26 +166,26 @@ def store_metadata(media_entry, metadata):
 
 
 @celery.task()
-def main_task(**process_info):
+def main_task(resolution, medium_size, **process_info):
     processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
-    processor.common_setup(process_info['resolution'])
-    processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'],
+    processor.common_setup(resolution)
+    processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
                         vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
     processor.generate_thumb(thumb_size=process_info['thumb_size'])
     processor.store_orig_metadata()
 
 
 @celery.task()
-def complimentary_task(**process_info):
+def complimentary_task(resolution, medium_size, **process_info):
     processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
-    processor.common_setup(process_info['resolution'])
-    processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'],
+    processor.common_setup(resolution)
+    processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
                         vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
 
 
 @celery.task()
-def processing_cleanup(**process_info):
-    processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
+def processing_cleanup(entry, manager):
+    processor = CommonVideoProcessor(manager, entry)  # is it manager, entry or entry, manager?
     processor.delete_queue_file()
 
 
 # =====================
@@ -523,7 +524,20 @@ class VideoProcessingManager(ProcessingManager):
         self.add_processor(Resizer)
         self.add_processor(Transcoder)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
-        ProcessMedia().apply_async(
-            [entry.id, feed_url, reprocess_action, reprocess_info], {},
-            task_id=entry.queued_task_id)
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
+
+        reprocess_info['entry'] = entry.id  # ?
+        reprocess_info['manager'] = manager  # can celery serialize this?
+
+        # Add args
+        transcoding_tasks = group(
+            main_task.signature(queue='default', priority=5, immutable=True),
+            complimentary_task.signature(queue='default', priority=4, immutable=True),
+            complimentary_task.signature(queue='default', priority=3, immutable=True),
+            complimentary_task.signature(queue='default', priority=2, immutable=True),
+            complimentary_task.signature(queue='default', priority=1, immutable=True)
+        )
+
+        chord(transcoding_tasks)(processing_cleanup.signature(queue='default', immutable=True))
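
Read as a whole, the new workflow() queues one main_task at the highest priority plus a complimentary_task per remaining resolution at decreasing priorities, and the chord fires processing_cleanup only once every member of the group has finished; immutable=True keeps the group's return values out of the callback's arguments. The signatures above still carry no arguments (hence the "# Add args" note), so the following is only a hedged sketch of how a prioritized group/chord looks once arguments are attached, with stand-in tasks and an invented ACCEPTED_RESOLUTIONS table rather than MediaGoblin's real ones:

    from celery import Celery, group, chord

    app = Celery('demo', broker='amqp://localhost')

    # Stand-in values; the real table is imported from the video media type's util module.
    ACCEPTED_RESOLUTIONS = {'480p': (854, 480), '360p': (640, 360), '240p': (426, 240)}

    @app.task
    def main_task(resolution, medium_size, **process_info):
        return ('main', resolution)

    @app.task
    def complimentary_task(resolution, medium_size, **process_info):
        return ('extra', resolution)

    @app.task
    def processing_cleanup(entry_id):
        return ('cleanup', entry_id)

    def build_workflow(entry_id, process_info):
        items = list(ACCEPTED_RESOLUTIONS.items())
        # Best resolution first, at the highest priority.
        header = group(
            [main_task.signature(args=items[0], kwargs=process_info,
                                 queue='default', priority=5, immutable=True)] +
            [complimentary_task.signature(args=item, kwargs=process_info,
                                          queue='default', priority=5 - n,
                                          immutable=True)
             for n, item in enumerate(items[1:], start=1)]
        )
        # The callback runs only after the whole group has completed.
        chord(header)(processing_cleanup.signature(
            args=(entry_id,), queue='default', immutable=True))

Whether a ProcessingManager instance can be stuffed into reprocess_info and serialized, as the "# can celery serialize this?" comment asks, is left open here; passing only primitive values such as entry.id through the task arguments would sidestep that question.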

View File

@@ -257,7 +257,8 @@ class ProcessingManager(object):
 
         return processor
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         """
         Returns the Celery command needed to proceed with media processing
 
         *This method has to be implemented in all media types*

View File

@@ -267,7 +267,7 @@ def run_process_media(entry, feed_url=None,
     entry, manager = get_entry_and_processing_manager(entry.id)
 
     try:
-        manager.workflow(entry, feed_url, reprocess_action, reprocess_info)
+        manager.workflow(entry, manager, feed_url, reprocess_action, reprocess_info)
     except BaseException as exc:
         # The purpose of this section is because when running in "lazy"
         # or always-eager-with-exceptions-propagated celery mode that