Add additional celery config settings

Fixes older webm_video backward compatibilty issue.
Add 'default' queue to be used from now.
Add other necessary celery settings for priority.
This commit is contained in:
vijeth-aradhya 2017-06-11 19:07:58 +05:30
parent dd0db38e2c
commit 9a27fa60a4
3 changed files with 15 additions and 3 deletions

View File

@ -154,6 +154,7 @@ CELERY_RESULT_DBURI = string(default="sqlite:///%(here)s/celery.db")
# default kombu stuff # default kombu stuff
BROKER_URL = string(default="amqp://") BROKER_URL = string(default="amqp://")
CELERY_DEFAULT_QUEUE = string(default="default")
# known booleans # known booleans
CELERY_RESULT_PERSISTENT = boolean() CELERY_RESULT_PERSISTENT = boolean()
@ -165,7 +166,7 @@ CELERY_EAGER_PROPAGATES_EXCEPTIONS = boolean()
CELERY_IGNORE_RESULT = boolean() CELERY_IGNORE_RESULT = boolean()
CELERY_TRACK_STARTED = boolean() CELERY_TRACK_STARTED = boolean()
CELERY_DISABLE_RATE_LIMITS = boolean() CELERY_DISABLE_RATE_LIMITS = boolean()
CELERY_ACKS_LATE = boolean() CELERY_ACKS_LATE = boolean(default=True)
CELERY_STORE_ERRORS_EVEN_IF_IGNORED = boolean() CELERY_STORE_ERRORS_EVEN_IF_IGNORED = boolean()
CELERY_SEND_TASK_ERROR_EMAILS = boolean() CELERY_SEND_TASK_ERROR_EMAILS = boolean()
CELERY_SEND_EVENTS = boolean() CELERY_SEND_EVENTS = boolean()
@ -175,7 +176,7 @@ CELERY_REDIRECT_STDOUTS = boolean()
# known ints # known ints
CELERYD_CONCURRENCY = integer() CELERYD_CONCURRENCY = integer()
CELERYD_PREFETCH_MULTIPLIER = integer() CELERYD_PREFETCH_MULTIPLIER = integer(default=1)
CELERY_AMQP_TASK_RESULT_EXPIRES = integer() CELERY_AMQP_TASK_RESULT_EXPIRES = integer()
CELERY_AMQP_TASK_RESULT_CONNECTION_MAX = integer() CELERY_AMQP_TASK_RESULT_CONNECTION_MAX = integer()
REDIS_PORT = integer() REDIS_PORT = integer()

View File

@ -22,6 +22,7 @@ import logging
import six import six
from celery import Celery from celery import Celery
from kombu import Exchange, Queue
from mediagoblin.tools.pluginapi import hook_runall from mediagoblin.tools.pluginapi import hook_runall
@ -32,6 +33,7 @@ MANDATORY_CELERY_IMPORTS = [
'mediagoblin.processing.task', 'mediagoblin.processing.task',
'mediagoblin.notifications.task', 'mediagoblin.notifications.task',
'mediagoblin.submit.task', 'mediagoblin.submit.task',
'mediagoblin.media_types.video.processing',
] ]
DEFAULT_SETTINGS_MODULE = 'mediagoblin.init.celery.dummy_settings_module' DEFAULT_SETTINGS_MODULE = 'mediagoblin.init.celery.dummy_settings_module'
@ -47,6 +49,12 @@ def get_celery_settings_dict(app_config, global_config,
else: else:
celery_conf = {} celery_conf = {}
# Add x-max-priority to config
celery_conf['CELERY_QUEUES'] = (
Queue('default', Exchange('default'), routing_key='default',
queue_arguments={'x-max-priority': 10}),
)
celery_settings = {} celery_settings = {}
# Add all celery settings from config # Add all celery settings from config

View File

@ -18,6 +18,7 @@ import argparse
import os.path import os.path
import logging import logging
import datetime import datetime
import celery
import six import six
@ -163,6 +164,7 @@ def store_metadata(media_entry, metadata):
# ===================== # =====================
@celery.task()
def main_task(**process_info): def main_task(**process_info):
processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
processor.common_setup(process_info['resolution']) processor.common_setup(process_info['resolution'])
@ -172,6 +174,7 @@ def main_task(**process_info):
processor.store_orig_metadata() processor.store_orig_metadata()
@celery.task()
def complimentary_task(**process_info): def complimentary_task(**process_info):
processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
processor.common_setup(process_info['resolution']) processor.common_setup(process_info['resolution'])
@ -179,6 +182,7 @@ def complimentary_task(**process_info):
vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
@celery.task()
def processing_cleanup(**process_info): def processing_cleanup(**process_info):
processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
processor.delete_queue_file() processor.delete_queue_file()
@ -408,7 +412,6 @@ class InitialProcessor(CommonVideoProcessor):
self.transcode(medium_size=medium_size, vp8_quality=vp8_quality, self.transcode(medium_size=medium_size, vp8_quality=vp8_quality,
vp8_threads=vp8_threads, vorbis_quality=vorbis_quality) vp8_threads=vp8_threads, vorbis_quality=vorbis_quality)
self.copy_original()
self.generate_thumb(thumb_size=thumb_size) self.generate_thumb(thumb_size=thumb_size)
self.delete_queue_file() self.delete_queue_file()