Add VideoProcessingManager.workflow() tests

Testing workflow method including its return data.
This commit is contained in:
vijeth-aradhya 2017-06-24 22:45:14 +05:30
parent 602cfcb789
commit 2771f4678a

View File

@ -47,9 +47,11 @@ import os
import pytest
import webtest.forms
import pkg_resources
import mock
import six.moves.urllib.parse as urlparse
from celery import Signature
from mediagoblin.tests.tools import (
fixture_add_user, fixture_add_collection, get_app)
from mediagoblin import mg_globals
@ -58,6 +60,10 @@ from mediagoblin.db.base import Session
from mediagoblin.tools import template
from mediagoblin.media_types.image import ImageMediaManager
from mediagoblin.media_types.pdf.processing import check_prerequisites as pdf_check_prerequisites
from mediagoblin.media_types.video.processing import (
VideoProcessingManager, main_task, complimentary_task, group)
from mediagoblin.media_types.video.util import ACCEPTED_RESOLUTIONS
from mediagoblin.submit.lib import new_upload_entry
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
BIG_BLUE, GOOD_PDF, GPS_JPG, MED_PNG, BIG_PNG
@ -101,6 +107,16 @@ def pdf_plugin_app(request):
'mediagoblin.tests',
'test_mgoblin_app_pdf.ini'))
def get_sample_entry(user):
entry = new_upload_entry(user)
entry.media_type = 'mediagoblin.media_types.video'
entry.title = 'testentry'
entry.description = u""
entry.license = None
entry.media_metadata = {}
entry.save()
return entry
class BaseTestSubmission:
@pytest.fixture(autouse=True)
@ -536,6 +552,59 @@ class TestSubmissionVideo(BaseTestSubmission):
with create_av(make_video=True) as path:
self.check_normal_upload('Video', path)
@mock.patch('mediagoblin.media_types.video.processing.complimentary_task.signature')
@mock.patch('mediagoblin.media_types.video.processing.main_task.signature')
def test_workflow(self, mock_main_task, mock_comp_task):
# create a new entry and get video manager
entry = get_sample_entry(self.our_user())
manager = VideoProcessingManager()
# prepare things for testing
video_config = mg_globals.global_config['plugins'][entry.media_type]
def_res = video_config['default_resolution']
priority_num = len(video_config['available_resolutions']) + 1
main_priority = priority_num
calls = []
reprocess_info = {
'vorbis_quality': None,
'vp8_threads': None,
'thumb_size': None,
'vp8_quality': None
}
for comp_res in video_config['available_resolutions']:
if comp_res != def_res:
priority_num += -1
calls.append(
mock.call(args=(entry.id, comp_res, ACCEPTED_RESOLUTIONS[comp_res]),
kwargs=reprocess_info, queue='default',
priority=priority_num, immutable=True)
)
# call workflow method
manager.workflow(entry, feed_url=None, reprocess_action='initial')
# test section
mock_main_task.assert_called_once_with(args=(entry.id, def_res,
ACCEPTED_RESOLUTIONS[def_res]),
kwargs=reprocess_info, queue='default',
priority=main_priority, immutable=True)
mock_comp_task.assert_has_calls(calls)
assert entry.state == u'processing'
# delete the entry
entry.delete()
def test_workflow_return(self):
entry = get_sample_entry(self.our_user())
manager = VideoProcessingManager()
wf = manager.workflow(entry, feed_url=None, reprocess_action='initial')
assert type(wf) == tuple
assert len(wf) == 2
assert isinstance(wf[0], group)
assert isinstance(wf[1], Signature)
entry.delete()
class TestSubmissionAudio(BaseTestSubmission):
@pytest.fixture(autouse=True)