Rewrite thumbnailer

Previous thumbnailer didn't always work properly. It was also not ready
to be ported to GStreamer 1.0

The rewrite makes it shorter, more pythonic and prepares it for porting.

 - no longer uses playbin2;
 - is tested
 - logs some events
 - previous thumbnailer is removed
This commit is contained in:
Boris Bobrov 2014-05-29 14:50:32 +04:00
parent a02de38f91
commit 7e266d5a37
3 changed files with 143 additions and 330 deletions

View File

@ -74,6 +74,13 @@ def store_metadata(media_entry, metadata):
""" """
Store metadata from this video for this media entry. Store metadata from this video for this media entry.
""" """
stored_metadata = dict()
audio_info_list = metadata.get_audio_streams()
if audio_info_list:
audio_info = audio_info_list[0]
stored_metadata['audiochannels'] = audio_info.get_channels()
# video is always there
video_info = metadata.get_video_streams()[0]
# Let's pull out the easy, not having to be converted ones first # Let's pull out the easy, not having to be converted ones first
stored_metadata = dict( stored_metadata = dict(
[(key, metadata[key]) [(key, metadata[key])
@ -270,7 +277,7 @@ class CommonVideoProcessor(MediaProcessor):
return return
# We will only use the width so that the correct scale is kept # We will only use the width so that the correct scale is kept
transcoders.VideoThumbnailerMarkII( transcoders.capture_thumb(
self.process_filename, self.process_filename,
tmp_thumb, tmp_thumb,
thumb_size[0]) thumb_size[0])

View File

@ -55,336 +55,73 @@ os.putenv('GST_DEBUG_DUMP_DOT_DIR', '/tmp')
def pixbuf_to_pilbuf(buf): def pixbuf_to_pilbuf(buf):
data = list() data = list()
for i in range(0, len(buf), 3): for i in range(0, len(buf)-4, 4):
r, g, b = struct.unpack('BBB', buf[i:i + 3]) r, g, b, x = struct.unpack('BBBB', buf[i:i + 4])
# XXX: can something be done with the 'X' part of RGBX?
data.append((r, g, b)) data.append((r, g, b))
return data return data
def capture_thumb(video_path, dest_path, width=None, height=None, percent=0.5):
class VideoThumbnailerMarkII(object): def pad_added(element, pad, connect_to):
''' caps = pad.get_caps()
Creates a thumbnail from a video file. Rewrite of VideoThumbnailer. name = caps[0].get_name()
_log.debug('on_pad_added: {0}'.format(name))
Large parts of the functionality and overall architectue contained within if name.startswith('video') and not connect_to.is_linked():
this object is taken from Participatory Culture Foundation's pad.link(connect_to)
`gst_extractor.Extractor` object last seen at # construct pipeline: uridecodebin ! ffmpegcolorspace ! videoscale ! \
https://github.com/pculture/miro/blob/master/tv/lib/frontends/widgets/gst/gst_extractor.py # ! CAPS ! appsink
in the `miro` codebase. pipeline = gst.Pipeline()
uridecodebin = gst.element_factory_make('uridecodebin')
The `miro` codebase and the gst_extractor.py are licensed under the GNU uridecodebin.set_property('uri', 'file://{0}'.format(video_path))
General Public License v2 or later. ffmpegcolorspace = gst.element_factory_make('ffmpegcolorspace')
''' uridecodebin.connect('pad-added', pad_added,
STATE_NULL = 0 ffmpegcolorspace.get_pad('sink'))
STATE_HALTING = 1 videoscale = gst.element_factory_make('videoscale')
STATE_PROCESSING = 2 filter = gst.element_factory_make('capsfilter', 'filter')
STATE_PROCESSING_THUMBNAIL = 3 # create caps for video scaling
caps_struct = gst.Structure('video/x-raw-rgb')
def __init__(self, source_path, dest_path, width=None, height=None, caps_struct.set_value('pixel-aspect-ratio', gst.Fraction(1, 1))
position_callback=None): if height:
self.state = self.STATE_NULL caps_struct.set_value('height', height)
if width:
self.has_reached_playbin_pause = False caps_struct.set_value('width', width)
caps = gst.Caps(caps_struct)
self.thumbnail_pipeline = None filter.set_property('caps', caps)
appsink = gst.element_factory_make('appsink')
self.permission_to_take_picture = False pipeline.add(uridecodebin, ffmpegcolorspace, videoscale, filter, appsink)
gst.element_link_many(ffmpegcolorspace, videoscale, filter, appsink)
self.buffer_probes = {} # pipeline constructed, starting playing, but first some preparations
if pipeline.set_state(gst.STATE_PAUSED) == gst.STATE_CHANGE_FAILURE:
self.errors = [] _log.warning('state change failed')
pipeline.get_state()
self.source_path = os.path.abspath(source_path) duration = pipeline.query_duration(gst.FORMAT_TIME, None)[0]
self.dest_path = os.path.abspath(dest_path) if duration == gst.CLOCK_TIME_NONE:
_log.warning('query_duration failed')
self.width = width duration = 0 # XXX
self.height = height seek_to = int(duration * int(percent * 100) / 100)
self.position_callback = position_callback \ _log.debug('Seeking to {0} of {1}'.format(
or self.wadsworth_position_callback seek_to / gst.SECOND, duration / gst.SECOND))
seek = pipeline.seek_simple(gst.FORMAT_TIME, gst.SEEK_FLAG_FLUSH, seek_to)
self.mainloop = gobject.MainLoop() if not seek:
_log.warning('seek failed')
self.playbin = gst.element_factory_make('playbin') # get sample, retrieve it's format and save
sample = appsink.emit("pull-preroll")
self.videosink = gst.element_factory_make('fakesink', 'videosink') if not sample:
self.audiosink = gst.element_factory_make('fakesink', 'audiosink') _log.warning('could not get sample')
return
self.playbin.set_property('video-sink', self.videosink) caps = sample.get_caps()
self.playbin.set_property('audio-sink', self.audiosink) if not caps:
_log.warning('could not get snapshot format')
self.playbin_message_bus = self.playbin.get_bus() structure = caps.get_structure(0)
(success, width) = structure.get_int('width')
self.playbin_message_bus.add_signal_watch() (success, height) = structure.get_int('height')
self.playbin_bus_watch_id = self.playbin_message_bus.connect( buffer = sample.get_buffer()
'message', im = Image.frombytes('RGB', (width, height),
self.on_playbin_message) buffer.extract_dup(0, buffer.get_size()))
im.save(dest_path)
self.playbin.set_property( _log.info('thumbnail saved to {0}'.format(dest_path))
'uri', # cleanup
'file:{0}'.format( pipeline.set_state(gst.STATE_NULL)
urllib.pathname2url(self.source_path)))
self.playbin.set_state(gst.STATE_PAUSED)
try:
self.run()
except Exception as exc:
_log.critical(
'Exception "{0}" caught, shutting down mainloop and re-raising'\
.format(exc))
self.disconnect()
raise
def wadsworth_position_callback(self, duration, gst):
return self.duration / 100 * 30
def run(self):
self.mainloop.run()
def on_playbin_message(self, message_bus, message):
# Silenced to prevent clobbering of output
#_log.debug('playbin message: {0}'.format(message))
if message.type == gst.MESSAGE_ERROR:
_log.error('playbin error: {0}'.format(message))
gobject.idle_add(self.on_playbin_error)
if message.type == gst.MESSAGE_STATE_CHANGED:
prev_state, cur_state, pending_state = \
message.parse_state_changed()
_log.debug('playbin state changed: \nprev: {0}\ncur: {1}\n \
pending: {2}'.format(
prev_state,
cur_state,
pending_state))
if cur_state == gst.STATE_PAUSED:
if message.src == self.playbin:
_log.info('playbin ready')
gobject.idle_add(self.on_playbin_paused)
def on_playbin_paused(self):
if self.has_reached_playbin_pause:
_log.warn('Has already reached on_playbin_paused. Aborting \
without doing anything this time.')
return False
self.has_reached_playbin_pause = True
# XXX: Why is this even needed at this point?
current_video = self.playbin.get_property('current-video')
if not current_video:
_log.critical('Could not get any video data \
from playbin')
else:
_log.info('Got video data from playbin')
self.duration = self.get_duration(self.playbin)
self.permission_to_take_picture = True
self.buffer_probes = {}
pipeline = ''.join([
'filesrc location="%s" ! decodebin2 ! ' % self.source_path,
'ffmpegcolorspace ! videoscale ! ',
'video/x-raw-rgb,depth=24,bpp=24,pixel-aspect-ratio=1/1',
',width={0}'.format(self.width) if self.width else '',
',height={0}'.format(self.height) if self.height else '',
' ! ',
'fakesink signal-handoffs=True'])
_log.debug('thumbnail_pipeline: {0}'.format(pipeline))
self.thumbnail_pipeline = gst.parse_launch(pipeline)
self.thumbnail_message_bus = self.thumbnail_pipeline.get_bus()
self.thumbnail_message_bus.add_signal_watch()
self.thumbnail_bus_watch_id = self.thumbnail_message_bus.connect(
'message',
self.on_thumbnail_message)
self.thumbnail_pipeline.set_state(gst.STATE_PAUSED)
gobject.timeout_add(3000, self.on_gobject_timeout)
return False
def on_thumbnail_message(self, message_bus, message):
# This is silenced to prevent clobbering of the terminal window
#_log.debug('thumbnail message: {0}'.format(message))
if message.type == gst.MESSAGE_ERROR:
_log.error('thumbnail error: {0}'.format(message.parse_error()))
gobject.idle_add(self.on_thumbnail_error, message)
if message.type == gst.MESSAGE_STATE_CHANGED:
prev_state, cur_state, pending_state = \
message.parse_state_changed()
_log.debug('thumbnail state changed: \nprev: {0}\ncur: {1}\n \
pending: {2}'.format(
prev_state,
cur_state,
pending_state))
if cur_state == gst.STATE_PAUSED and \
not self.state == self.STATE_PROCESSING_THUMBNAIL:
# Find the fakesink sink pad and attach the on_buffer_probe
# handler to it.
seek_amount = self.position_callback(self.duration, gst)
seek_result = self.thumbnail_pipeline.seek(
1.0,
gst.FORMAT_TIME,
gst.SEEK_FLAG_FLUSH | gst.SEEK_FLAG_ACCURATE,
gst.SEEK_TYPE_SET,
seek_amount,
gst.SEEK_TYPE_NONE,
0)
if not seek_result:
_log.info('Could not seek.')
else:
_log.info('Seek successful, attaching buffer probe')
self.state = self.STATE_PROCESSING_THUMBNAIL
for sink in self.thumbnail_pipeline.sinks():
sink_name = sink.get_name()
sink_factory_name = sink.get_factory().get_name()
if sink_factory_name == 'fakesink':
sink_pad = sink.get_pad('sink')
self.buffer_probes[sink_name] = sink_pad\
.add_buffer_probe(
self.on_pad_buffer_probe,
sink_name)
_log.info('Attached buffer probes: {0}'.format(
self.buffer_probes))
break
elif self.state == self.STATE_PROCESSING_THUMBNAIL:
_log.info('Already processing thumbnail')
def on_pad_buffer_probe(self, *args):
_log.debug('buffer probe handler: {0}'.format(args))
gobject.idle_add(lambda: self.take_snapshot(*args))
def take_snapshot(self, pad, buff, name):
if self.state == self.STATE_HALTING:
_log.debug('Pipeline is halting, will not take snapshot')
return False
_log.info('Taking snapshot! ({0})'.format(
(pad, buff, name)))
try:
caps = buff.caps
if caps is None:
_log.error('No buffer caps present /take_snapshot')
self.disconnect()
_log.debug('caps: {0}'.format(caps))
filters = caps[0]
width = filters['width']
height = filters['height']
im = Image.new('RGB', (width, height))
data = pixbuf_to_pilbuf(buff.data)
im.putdata(data)
im.save(self.dest_path)
_log.info('Saved snapshot!')
self.disconnect()
except gst.QueryError as exc:
_log.error('take_snapshot - QueryError: {0}'.format(exc))
return False
def on_thumbnail_error(self, message):
scaling_failed = False
if 'Error calculating the output scaled size - integer overflow' \
in message.parse_error()[1]:
# GStreamer videoscale sometimes fails to calculate the dimensions
# given only one of the destination dimensions and the source
# dimensions. This is a workaround in case videoscale returns an
# error that indicates this has happened.
scaling_failed = True
_log.error('Thumbnailing failed because of videoscale integer'
' overflow. Will retry with fallback.')
else:
_log.error('Thumbnailing failed: {0}'.format(message.parse_error()))
# Kill the current mainloop
self.disconnect()
if scaling_failed:
# Manually scale the destination dimensions
_log.info('Retrying with manually set sizes...')
info = VideoTranscoder().discover(self.source_path)
h = info['videoheight']
w = info['videowidth']
ratio = 180 / int(w)
h = int(h * ratio)
self.__init__(self.source_path, self.dest_path, 180, h)
def disconnect(self):
self.state = self.STATE_HALTING
if self.playbin is not None:
self.playbin.set_state(gst.STATE_NULL)
for sink in self.playbin.sinks():
sink_name = sink.get_name()
sink_factory_name = sink.get_factory().get_name()
if sink_factory_name == 'fakesink':
sink_pad = sink.get_pad('sink')
sink_pad.remove_buffer_probe(self.buffer_probes[sink_name])
del self.buffer_probes[sink_name]
self.playbin = None
if self.thumbnail_pipeline is not None:
self.thumbnail_pipeline.set_state(gst.STATE_NULL)
self.thumbnail_pipeline = None
if self.playbin_message_bus is not None:
self.playbin_message_bus.disconnect(self.playbin_bus_watch_id)
self.playbin_message_bus = None
self.halt()
def halt(self):
gobject.idle_add(self.mainloop.quit)
def on_gobject_timeout(self):
_log.critical('Reached gobject timeout')
self.disconnect()
def get_duration(self, pipeline, attempt=1):
if attempt == 5:
_log.critical('Pipeline duration query retry limit reached.')
return 0
try:
return pipeline.query_duration(gst.FORMAT_TIME)[0]
except gst.QueryError as exc:
_log.error('Could not get duration on attempt {0}: {1}'.format(
attempt,
exc))
return self.get_duration(pipeline, attempt + 1)
class VideoTranscoder(object): class VideoTranscoder(object):
@ -451,7 +188,6 @@ class VideoTranscoder(object):
self.discoverer.discover() self.discoverer.discover()
self.loop.run() self.loop.run()
if hasattr(self, '_discovered_data'): if hasattr(self, '_discovered_data'):
return self._discovered_data.__dict__ return self._discovered_data.__dict__
else: else:
@ -729,7 +465,6 @@ class VideoTranscoder(object):
if __name__ == '__main__': if __name__ == '__main__':
os.nice(19) os.nice(19)
logging.basicConfig()
from optparse import OptionParser from optparse import OptionParser
parser = OptionParser( parser = OptionParser(

View File

@ -0,0 +1,71 @@
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import tempfile
import shutil
import os
import pytest
from contextlib import contextmanager
import logging
import imghdr
#TODO: this should be skipped if video plugin is not enabled
import pygst
pygst.require('0.10')
import gst
from mediagoblin.media_types.video.transcoders import capture_thumb
@contextmanager
def create_data(suffix):
video = tempfile.NamedTemporaryFile()
src = gst.element_factory_make('videotestsrc')
src.set_property('num-buffers', 50)
enc = gst.element_factory_make('theoraenc')
mux = gst.element_factory_make('oggmux')
dst = gst.element_factory_make('filesink')
dst.set_property('location', video.name)
pipeline = gst.Pipeline()
pipeline.add(src, enc, mux, dst)
gst.element_link_many(src, enc, mux, dst)
pipeline.set_state(gst.STATE_PLAYING)
# wait for finish
bus = pipeline.get_bus()
message = bus.timed_pop_filtered(gst.CLOCK_TIME_NONE,
gst.MESSAGE_ERROR | gst.MESSAGE_EOS)
thumb = tempfile.NamedTemporaryFile(suffix=suffix)
pipeline.set_state(gst.STATE_NULL)
yield (video.name, thumb.name)
#TODO: this should be skipped if video plugin is not enabled
def test_thumbnails():
'''
Test thumbnails generation.
1. Create a video from gst's videotestsrc
3. Capture thumbnail
4. Remove it
'''
#data create_data() as (video_name, thumbnail_name):
test_formats = [('.png', 'png'), ('.jpg', 'jpeg'), ('.gif', 'gif')]
for suffix, format in test_formats:
with create_data(suffix) as (video_name, thumbnail_name):
capture_thumb(video_name, thumbnail_name, width=40)
# check if png
assert imghdr.what(thumbnail_name) == format
# TODO: check height and width
# FIXME: it doesn't work with small width, say, 10px. This should be
# fixed somehow