Add test_chord for TestSubmissionVideo
Testing the arguments passed to celery.chord when a video is submitted.
This commit is contained in:
parent
2771f4678a
commit
ee2b53dea1
@ -61,9 +61,9 @@ from mediagoblin.tools import template
|
||||
from mediagoblin.media_types.image import ImageMediaManager
|
||||
from mediagoblin.media_types.pdf.processing import check_prerequisites as pdf_check_prerequisites
|
||||
from mediagoblin.media_types.video.processing import (
|
||||
VideoProcessingManager, main_task, complimentary_task, group)
|
||||
VideoProcessingManager, main_task, complimentary_task, group, processing_cleanup)
|
||||
from mediagoblin.media_types.video.util import ACCEPTED_RESOLUTIONS
|
||||
from mediagoblin.submit.lib import new_upload_entry
|
||||
from mediagoblin.submit.lib import new_upload_entry, run_process_media
|
||||
|
||||
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
|
||||
BIG_BLUE, GOOD_PDF, GPS_JPG, MED_PNG, BIG_PNG
|
||||
@ -107,9 +107,9 @@ def pdf_plugin_app(request):
|
||||
'mediagoblin.tests',
|
||||
'test_mgoblin_app_pdf.ini'))
|
||||
|
||||
def get_sample_entry(user):
|
||||
def get_sample_entry(user, media_type):
|
||||
entry = new_upload_entry(user)
|
||||
entry.media_type = 'mediagoblin.media_types.video'
|
||||
entry.media_type = media_type
|
||||
entry.title = 'testentry'
|
||||
entry.description = u""
|
||||
entry.license = None
|
||||
@ -552,12 +552,13 @@ class TestSubmissionVideo(BaseTestSubmission):
|
||||
with create_av(make_video=True) as path:
|
||||
self.check_normal_upload('Video', path)
|
||||
|
||||
@mock.patch('mediagoblin.media_types.video.processing.processing_cleanup.signature')
|
||||
@mock.patch('mediagoblin.media_types.video.processing.complimentary_task.signature')
|
||||
@mock.patch('mediagoblin.media_types.video.processing.main_task.signature')
|
||||
def test_workflow(self, mock_main_task, mock_comp_task):
|
||||
def test_celery_tasks(self, mock_main_task, mock_comp_task, mock_cleanup):
|
||||
|
||||
# create a new entry and get video manager
|
||||
entry = get_sample_entry(self.our_user())
|
||||
entry = get_sample_entry(self.our_user(), 'mediagoblin.media_types.video')
|
||||
manager = VideoProcessingManager()
|
||||
|
||||
# prepare things for testing
|
||||
@ -590,19 +591,83 @@ class TestSubmissionVideo(BaseTestSubmission):
|
||||
kwargs=reprocess_info, queue='default',
|
||||
priority=main_priority, immutable=True)
|
||||
mock_comp_task.assert_has_calls(calls)
|
||||
mock_cleanup.assert_called_once_with(args=(entry.id,), queue='default',
|
||||
immutable=True)
|
||||
assert entry.state == u'processing'
|
||||
|
||||
# delete the entry
|
||||
entry.delete()
|
||||
|
||||
def test_workflow_return(self):
|
||||
entry = get_sample_entry(self.our_user())
|
||||
def test_workflow(self):
|
||||
entry = get_sample_entry(self.our_user(), 'mediagoblin.media_types.video')
|
||||
manager = VideoProcessingManager()
|
||||
wf = manager.workflow(entry, feed_url=None, reprocess_action='initial')
|
||||
assert type(wf) == tuple
|
||||
assert len(wf) == 2
|
||||
assert isinstance(wf[0], group)
|
||||
assert isinstance(wf[1], Signature)
|
||||
|
||||
# more precise testing
|
||||
video_config = mg_globals.global_config['plugins'][entry.media_type]
|
||||
def_res = video_config['default_resolution']
|
||||
priority_num = len(video_config['available_resolutions']) + 1
|
||||
reprocess_info = {
|
||||
'vorbis_quality': None,
|
||||
'vp8_threads': None,
|
||||
'thumb_size': None,
|
||||
'vp8_quality': None
|
||||
}
|
||||
tasks_list = [main_task.signature(args=(entry.id, def_res,
|
||||
ACCEPTED_RESOLUTIONS[def_res]),
|
||||
kwargs=reprocess_info, queue='default',
|
||||
priority=priority_num, immutable=True)]
|
||||
for comp_res in video_config['available_resolutions']:
|
||||
if comp_res != def_res:
|
||||
priority_num += -1
|
||||
tasks_list.append(
|
||||
complimentary_task.signature(args=(entry.id, comp_res,
|
||||
ACCEPTED_RESOLUTIONS[comp_res]),
|
||||
kwargs=reprocess_info, queue='default',
|
||||
priority=priority_num, immutable=True)
|
||||
)
|
||||
transcoding_tasks = group(tasks_list)
|
||||
cleanup_task = processing_cleanup.signature(args=(entry.id,),
|
||||
queue='default', immutable=True)
|
||||
assert wf[0] == transcoding_tasks
|
||||
assert wf[1] == cleanup_task
|
||||
entry.delete()
|
||||
|
||||
@mock.patch('mediagoblin.submit.lib.ProcessMedia.apply_async')
|
||||
@mock.patch('mediagoblin.submit.lib.chord')
|
||||
def test_celery_chord(self, mock_chord, mock_process_media):
|
||||
entry = get_sample_entry(self.our_user(), 'mediagoblin.media_types.video')
|
||||
|
||||
# prepare things for testing
|
||||
video_config = mg_globals.global_config['plugins'][entry.media_type]
|
||||
def_res = video_config['default_resolution']
|
||||
priority_num = len(video_config['available_resolutions']) + 1
|
||||
reprocess_info = {
|
||||
'vorbis_quality': None,
|
||||
'vp8_threads': None,
|
||||
'thumb_size': None,
|
||||
'vp8_quality': None
|
||||
}
|
||||
tasks_list = [main_task.signature(args=(entry.id, def_res,
|
||||
ACCEPTED_RESOLUTIONS[def_res]),
|
||||
kwargs=reprocess_info, queue='default',
|
||||
priority=priority_num, immutable=True)]
|
||||
for comp_res in video_config['available_resolutions']:
|
||||
if comp_res != def_res:
|
||||
priority_num += -1
|
||||
tasks_list.append(
|
||||
complimentary_task.signature(args=(entry.id, comp_res,
|
||||
ACCEPTED_RESOLUTIONS[comp_res]),
|
||||
kwargs=reprocess_info, queue='default',
|
||||
priority=priority_num, immutable=True)
|
||||
)
|
||||
transcoding_tasks = group(tasks_list)
|
||||
run_process_media(entry)
|
||||
mock_chord.assert_called_once_with(transcoding_tasks)
|
||||
entry.delete()
|
||||
|
||||
|
||||
@ -660,3 +725,4 @@ class TestSubmissionPDF(BaseTestSubmission):
|
||||
**self.upload_data(GOOD_PDF))
|
||||
self.check_url(response, '/u/{0}/'.format(self.our_user().username))
|
||||
assert 'mediagoblin/user_pages/user.html' in context
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user