Celery Priority testing with debug statements

Error at this line:
`self.entry.set_file_metadata(self.curr_file, **file_metadata)`
Otherwise, the Celery part should work fine.
This commit is contained in:
vijeth-aradhya 2017-06-13 01:43:43 +05:30
parent bd011c940e
commit d77eb56280
10 changed files with 80 additions and 47 deletions

View File

@@ -55,6 +55,9 @@ def get_celery_settings_dict(app_config, global_config,
queue_arguments={'x-max-priority': 10}), queue_arguments={'x-max-priority': 10}),
) )
print "CELERY_ACKS_LATE", celery_conf['CELERY_ACKS_LATE']
print "CELERYD_PREFETCH_MULTIPLIER", celery_conf['CELERYD_PREFETCH_MULTIPLIER']
celery_settings = {} celery_settings = {}
# Add all celery settings from config # Add all celery settings from config

View File

@@ -274,8 +274,7 @@ class AsciiProcessingManager(ProcessingManager):
self.add_processor(InitialProcessor) self.add_processor(InitialProcessor)
self.add_processor(Resizer) self.add_processor(Resizer)
def workflow(self, entry, manager, feed_url, reprocess_action, def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
reprocess_info=None):
ProcessMedia().apply_async( ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {}, [entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id) task_id=entry.queued_task_id)

View File

@@ -366,8 +366,7 @@ class AudioProcessingManager(ProcessingManager):
self.add_processor(Resizer) self.add_processor(Resizer)
self.add_processor(Transcoder) self.add_processor(Transcoder)
def workflow(self, entry, manager, feed_url, reprocess_action, def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
reprocess_info=None):
ProcessMedia().apply_async( ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {}, [entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id) task_id=entry.queued_task_id)

View File

@@ -431,8 +431,7 @@ class ImageProcessingManager(ProcessingManager):
self.add_processor(Resizer) self.add_processor(Resizer)
self.add_processor(MetadataProcessing) self.add_processor(MetadataProcessing)
def workflow(self, entry, manager, feed_url, reprocess_action, def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
reprocess_info=None):
ProcessMedia().apply_async( ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {}, [entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id) task_id=entry.queued_task_id)

View File

@@ -471,8 +471,7 @@ class PdfProcessingManager(ProcessingManager):
self.add_processor(InitialProcessor) self.add_processor(InitialProcessor)
self.add_processor(Resizer) self.add_processor(Resizer)
def workflow(self, entry, manager, feed_url, reprocess_action, def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
reprocess_info=None):
ProcessMedia().apply_async( ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {}, [entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id) task_id=entry.queued_task_id)

View File

@@ -81,8 +81,7 @@ class RawImageProcessingManager(ProcessingManager):
self.add_processor(InitialRawProcessor) self.add_processor(InitialRawProcessor)
self.add_processor(Resizer) self.add_processor(Resizer)
def workflow(self, entry, manager, feed_url, reprocess_action, def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
reprocess_info=None):
ProcessMedia().apply_async( ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {}, [entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id) task_id=entry.queued_task_id)

View File

@@ -369,8 +369,7 @@ class StlProcessingManager(ProcessingManager):
self.add_processor(InitialProcessor) self.add_processor(InitialProcessor)
self.add_processor(Resizer) self.add_processor(Resizer)
def workflow(self, entry, manager, feed_url, reprocess_action, def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
reprocess_info=None):
ProcessMedia().apply_async( ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {}, [entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id) task_id=entry.queued_task_id)

View File

@@ -29,7 +29,7 @@ from mediagoblin.processing import (
ProgressCallback, MediaProcessor, ProgressCallback, MediaProcessor,
ProcessingManager, request_from_args, ProcessingManager, request_from_args,
get_process_filename, store_public, get_process_filename, store_public,
copy_original) copy_original, get_entry_and_processing_manager)
from mediagoblin.processing.task import ProcessMedia from mediagoblin.processing.task import ProcessMedia
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _ from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
from mediagoblin.media_types import MissingComponents from mediagoblin.media_types import MissingComponents
@@ -166,27 +166,35 @@ def store_metadata(media_entry, metadata):
@celery.task() @celery.task()
def main_task(resolution, medium_size, **process_info): def main_task(entry_id, resolution, medium_size, **process_info):
processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) entry, manager = get_entry_and_processing_manager(entry_id)
print "\nEntered main_task\n"
with CommonVideoProcessor(manager, entry) as processor:
processor.common_setup(resolution) processor.common_setup(resolution)
processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'], processor.transcode(medium_size=tuple(medium_size), vp8_quality=process_info['vp8_quality'],
vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
processor.generate_thumb(thumb_size=process_info['thumb_size']) processor.generate_thumb(thumb_size=process_info['thumb_size'])
processor.store_orig_metadata() processor.store_orig_metadata()
print "\nExited main_task\n"
@celery.task() @celery.task()
def complimentary_task(resolution, medium_size, **process_info): def complimentary_task(entry_id, resolution, medium_size, **process_info):
processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) entry, manager = get_entry_and_processing_manager(entry_id)
print "\nEntered complimentary_task\n"
with CommonVideoProcessor(manager, entry) as processor:
processor.common_setup(resolution) processor.common_setup(resolution)
processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'], processor.transcode(medium_size=tuple(medium_size), vp8_quality=process_info['vp8_quality'],
vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
print "\nExited complimentary_task\n"
@celery.task() @celery.task()
def processing_cleanup(entry, manager): def processing_cleanup(entry_id):
processor = CommonVideoProcessor(manager, entry) entry, manager = get_entry_and_processing_manager(entry_id)
with CommonVideoProcessor(manager, entry) as processor:
processor.delete_queue_file() processor.delete_queue_file()
print "\nDeleted queue_file\n"
# ===================== # =====================
@@ -218,6 +226,8 @@ class CommonVideoProcessor(MediaProcessor):
self.curr_file = 'webm_video' self.curr_file = 'webm_video'
self.part_filename = self.name_builder.fill('{basename}.medium.webm') self.part_filename = self.name_builder.fill('{basename}.medium.webm')
print self.curr_file, ": Done common_setup()"
def copy_original(self): def copy_original(self):
# If we didn't transcode, then we need to keep the original # If we didn't transcode, then we need to keep the original
raise NotImplementedError raise NotImplementedError
@@ -254,6 +264,7 @@ class CommonVideoProcessor(MediaProcessor):
def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None, def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None,
vorbis_quality=None): vorbis_quality=None):
print self.curr_file, ": Enter transcode"
progress_callback = ProgressCallback(self.entry) progress_callback = ProgressCallback(self.entry)
tmp_dst = os.path.join(self.workbench.dir, self.part_filename) tmp_dst = os.path.join(self.workbench.dir, self.part_filename)
@@ -292,25 +303,34 @@ class CommonVideoProcessor(MediaProcessor):
self.entry.media_files[self.curr_file].delete() self.entry.media_files[self.curr_file].delete()
else: else:
print self.curr_file, ": ->1.1"
print type(medium_size)
medium_size = tuple(medium_size)
print type(medium_size)
print self.curr_file, ": ->1.2"
self.transcoder.transcode(self.process_filename, tmp_dst, self.transcoder.transcode(self.process_filename, tmp_dst,
vp8_quality=vp8_quality, vp8_quality=vp8_quality,
vp8_threads=vp8_threads, vp8_threads=vp8_threads,
vorbis_quality=vorbis_quality, vorbis_quality=vorbis_quality,
progress_callback=progress_callback, progress_callback=progress_callback,
dimensions=tuple(medium_size)) dimensions=tuple(medium_size))
print self.curr_file, ": ->2"
if self.transcoder.dst_data: if self.transcoder.dst_data:
print self.curr_file, ": ->3"
# Push transcoded video to public storage # Push transcoded video to public storage
_log.debug('Saving medium...') _log.debug('Saving medium...')
store_public(self.entry, 'webm_video', tmp_dst, store_public(self.entry, 'webm_video', tmp_dst, self.part_filename)
self.name_builder.fill('{basename}.medium.webm'))
_log.debug('Saved medium') _log.debug('Saved medium')
print self.curr_file, ": ->4"
# Is this the file_metadata that paroneayea was talking about? # Is this the file_metadata that paroneayea was talking about?
self.entry.set_file_metadata(self.curr_file, **file_metadata) self.entry.set_file_metadata(self.curr_file, **file_metadata)
self.did_transcode = True self.did_transcode = True
print self.curr_file, ": Done transcode()"
def generate_thumb(self, thumb_size=None): def generate_thumb(self, thumb_size=None):
print self.curr_file, ": Enter generate_thumb()"
# Temporary file for the video thumbnail (cleaned up with workbench) # Temporary file for the video thumbnail (cleaned up with workbench)
tmp_thumb = os.path.join(self.workbench.dir, tmp_thumb = os.path.join(self.workbench.dir,
self.name_builder.fill( self.name_builder.fill(
@@ -339,9 +359,10 @@ class CommonVideoProcessor(MediaProcessor):
self.name_builder.fill('{basename}.thumbnail.jpg')) self.name_builder.fill('{basename}.thumbnail.jpg'))
self.entry.set_file_metadata('thumb', thumb_size=thumb_size) self.entry.set_file_metadata('thumb', thumb_size=thumb_size)
print self.curr_file, ": Done generate_thumb()"
def store_orig_metadata(self): def store_orig_metadata(self):
print self.curr_file, ": 2"
# Extract metadata and keep a record of it # Extract metadata and keep a record of it
metadata = transcoders.discover(self.process_filename) metadata = transcoders.discover(self.process_filename)
@@ -524,25 +545,41 @@ class VideoProcessingManager(ProcessingManager):
self.add_processor(Resizer) self.add_processor(Resizer)
self.add_processor(Transcoder) self.add_processor(Transcoder)
def workflow(self, entry, manager, feed_url, reprocess_action, def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
reprocess_info=None):
reprocess_info['entry'] = entry reprocess_info = reprocess_info or {}
reprocess_info['manager'] = manager if 'vp8_quality' not in reprocess_info:
reprocess_info['vp8_quality'] = None
if 'vorbis_quality' not in reprocess_info:
reprocess_info['vorbis_quality'] = None
if 'vp8_threads' not in reprocess_info:
reprocess_info['vp8_threads'] = None
if 'thumb_size' not in reprocess_info:
reprocess_info['thumb_size'] = None
transcoding_tasks = group( transcoding_tasks = group([
main_task.signature(args=('480p', ACCEPTED_RESOLUTIONS['480p']), main_task.signature(args=(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p']),
kwargs=reprocess_info, queue='default', kwargs=reprocess_info, queue='default',
priority=5, immutable=True), priority=5, immutable=True),
complimentary_task.signature(args=('360p', ACCEPTED_RESOLUTIONS['360p']), ])
kwargs=reprocess_info, queue='default',
priority=4, immutable=True),
complimentary_task.signature(args=('720p', ACCEPTED_RESOLUTIONS['720p']),
kwargs=reprocess_info, queue='default',
priority=3, immutable=True),
)
cleanup_task = processing_cleanup.signature(args=(entry, manager), cleanup_task = processing_cleanup.signature(args=(entry_id,),
queue='default', immutable=True) queue='default', immutable=True)
"""
complimentary_task.signature(args=(entry_id, '360p', ACCEPTED_RESOLUTIONS['360p']),
kwargs=reprocess_info, queue='default',
priority=4, immutable=True),
complimentary_task.signature(args=(entry_id, '720p', ACCEPTED_RESOLUTIONS['720p']),
kwargs=reprocess_info, queue='default',
priority=3, immutable=True),
main_task.apply_async(args=(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p']),
kwargs=reprocess_info, queue='default',
priority=5, immutable=True)
processing_cleanup.apply_async(args=(entry_id,), queue='default', immutable=True)
"""
chord(transcoding_tasks)(cleanup_task) chord(transcoding_tasks)(cleanup_task)
# main_task(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p'], **reprocess_info)
# processing_cleanup(entry_id)

View File

@@ -257,8 +257,7 @@ class ProcessingManager(object):
return processor return processor
def workflow(self, entry, manager, feed_url, reprocess_action, def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
reprocess_info=None):
""" """
Returns the Celery command needed to proceed with media processing Returns the Celery command needed to proceed with media processing
*This method has to be implemented in all media types* *This method has to be implemented in all media types*

View File

@@ -267,7 +267,7 @@ def run_process_media(entry, feed_url=None,
entry, manager = get_entry_and_processing_manager(entry.id) entry, manager = get_entry_and_processing_manager(entry.id)
try: try:
manager.workflow(entry, manager, feed_url, reprocess_action, reprocess_info) manager.workflow(entry.id, feed_url, reprocess_action, reprocess_info)
except BaseException as exc: except BaseException as exc:
# The purpose of this section is because when running in "lazy" # The purpose of this section is because when running in "lazy"
# or always-eager-with-exceptions-propagated celery mode that # or always-eager-with-exceptions-propagated celery mode that