mediagoblin/mediagoblin/tests/test_storage.py
Sebastian Spaeth 7e55bcb898 Fix up tests
empty find() queries would not work anymore with the simplified .find
compatability code, so remove these and use proper sqlalchemy in the
tests.

The storage test failed because my virtualenv environment ran
mediagoblin/local/mediagoblin/tests/test_storage.py and somehow decided
the 2 classes are different objects. Just test against the full class name.

Signed-off-by: Sebastian Spaeth <Sebastian@SSpaeth.de>
2012-12-21 00:30:48 +01:00

283 lines
9.4 KiB
Python

# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import tempfile
from nose.tools import assert_raises, assert_equal, assert_true
from werkzeug.utils import secure_filename
from mediagoblin import storage
################
# Test utilities
################
def test_clean_listy_filepath():
expected = [u'dir1', u'dir2', u'linooks.jpg']
assert storage.clean_listy_filepath(
['dir1', 'dir2', 'linooks.jpg']) == expected
expected = [u'dir1', u'foo_.._nasty', u'linooks.jpg']
assert storage.clean_listy_filepath(
['/dir1/', 'foo/../nasty', 'linooks.jpg']) == expected
expected = [u'etc', u'passwd']
assert storage.clean_listy_filepath(
['../../../etc/', 'passwd']) == expected
assert_raises(
storage.InvalidFilepath,
storage.clean_listy_filepath,
['../../', 'linooks.jpg'])
class FakeStorageSystem():
def __init__(self, foobie, blech, **kwargs):
self.foobie = foobie
self.blech = blech
class FakeRemoteStorage(storage.filestorage.BasicFileStorage):
# Theoretically despite this, all the methods should work but it
# should force copying to the workbench
local_storage = False
def copy_local_to_storage(self, *args, **kwargs):
return storage.StorageInterface.copy_local_to_storage(
self, *args, **kwargs)
def test_storage_system_from_config():
this_storage = storage.storage_system_from_config(
{'base_url': 'http://example.org/moodia/',
'base_dir': '/tmp/',
'garbage_arg': 'garbage_arg',
'garbage_arg': 'trash'})
assert this_storage.base_url == 'http://example.org/moodia/'
assert this_storage.base_dir == '/tmp/'
assert this_storage.__class__ is storage.filestorage.BasicFileStorage
this_storage = storage.storage_system_from_config(
{'foobie': 'eiboof',
'blech': 'hcelb',
'garbage_arg': 'garbage_arg',
'storage_class':
'mediagoblin.tests.test_storage:FakeStorageSystem'})
assert_equal(this_storage.foobie, 'eiboof')
assert_equal(this_storage.blech, 'hcelb')
assert_equal(unicode(this_storage.__class__),
u'mediagoblin.tests.test_storage.FakeStorageSystem')
##########################
# Basic file storage tests
##########################
def get_tmp_filestorage(mount_url=None, fake_remote=False):
tmpdir = tempfile.mkdtemp()
if fake_remote:
this_storage = FakeRemoteStorage(tmpdir, mount_url)
else:
this_storage = storage.filestorage.BasicFileStorage(tmpdir, mount_url)
return tmpdir, this_storage
def test_basic_storage__resolve_filepath():
tmpdir, this_storage = get_tmp_filestorage()
result = this_storage._resolve_filepath(['dir1', 'dir2', 'filename.jpg'])
assert result == os.path.join(
tmpdir, 'dir1/dir2/filename.jpg')
result = this_storage._resolve_filepath(['../../etc/', 'passwd'])
assert result == os.path.join(
tmpdir, 'etc/passwd')
assert_raises(
storage.InvalidFilepath,
this_storage._resolve_filepath,
['../../', 'etc', 'passwd'])
def test_basic_storage_file_exists():
tmpdir, this_storage = get_tmp_filestorage()
os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
with open(filename, 'w') as ourfile:
ourfile.write("I'm having a lovely day!")
assert this_storage.file_exists(['dir1', 'dir2', 'filename.txt'])
assert not this_storage.file_exists(['dir1', 'dir2', 'thisfile.lol'])
assert not this_storage.file_exists(['dnedir1', 'dnedir2', 'somefile.lol'])
def test_basic_storage_get_unique_filepath():
tmpdir, this_storage = get_tmp_filestorage()
# write something that exists
os.makedirs(os.path.join(tmpdir, 'dir1', 'dir2'))
filename = os.path.join(tmpdir, 'dir1', 'dir2', 'filename.txt')
with open(filename, 'w') as ourfile:
ourfile.write("I'm having a lovely day!")
# now we want something new, with the same name!
new_filepath = this_storage.get_unique_filepath(
['dir1', 'dir2', 'filename.txt'])
assert new_filepath[:-1] == [u'dir1', u'dir2']
new_filename = new_filepath[-1]
assert new_filename.endswith('filename.txt')
assert len(new_filename) > len('filename.txt')
assert new_filename == secure_filename(new_filename)
def test_basic_storage_get_file():
tmpdir, this_storage = get_tmp_filestorage()
# Write a brand new file
filepath = ['dir1', 'dir2', 'ourfile.txt']
with this_storage.get_file(filepath, 'w') as our_file:
our_file.write('First file')
with this_storage.get_file(filepath, 'r') as our_file:
assert our_file.read() == 'First file'
assert os.path.exists(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
with file(os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'), 'r') as our_file:
assert our_file.read() == 'First file'
# Write to the same path but try to get a unique file.
new_filepath = this_storage.get_unique_filepath(filepath)
assert not os.path.exists(os.path.join(tmpdir, *new_filepath))
with this_storage.get_file(new_filepath, 'w') as our_file:
our_file.write('Second file')
with this_storage.get_file(new_filepath, 'r') as our_file:
assert our_file.read() == 'Second file'
assert os.path.exists(os.path.join(tmpdir, *new_filepath))
with file(os.path.join(tmpdir, *new_filepath), 'r') as our_file:
assert our_file.read() == 'Second file'
# Read from an existing file
manually_written_file = os.makedirs(
os.path.join(tmpdir, 'testydir'))
with file(os.path.join(tmpdir, 'testydir/testyfile.txt'), 'w') as testyfile:
testyfile.write('testy file! so testy.')
with this_storage.get_file(['testydir', 'testyfile.txt']) as testyfile:
assert testyfile.read() == 'testy file! so testy.'
def test_basic_storage_delete_file():
tmpdir, this_storage = get_tmp_filestorage()
assert not os.path.exists(
os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
filepath = ['dir1', 'dir2', 'ourfile.txt']
with this_storage.get_file(filepath, 'w') as our_file:
our_file.write('Testing this file')
assert os.path.exists(
os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
this_storage.delete_file(filepath)
assert not os.path.exists(
os.path.join(tmpdir, 'dir1/dir2/ourfile.txt'))
def test_basic_storage_url_for_file():
# Not supplying a base_url should actually just bork.
tmpdir, this_storage = get_tmp_filestorage()
assert_raises(
storage.NoWebServing,
this_storage.file_url,
['dir1', 'dir2', 'filename.txt'])
# base_url without domain
tmpdir, this_storage = get_tmp_filestorage('/media/')
result = this_storage.file_url(
['dir1', 'dir2', 'filename.txt'])
expected = '/media/dir1/dir2/filename.txt'
assert result == expected
# base_url with domain
tmpdir, this_storage = get_tmp_filestorage(
'http://media.example.org/ourmedia/')
result = this_storage.file_url(
['dir1', 'dir2', 'filename.txt'])
expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
assert result == expected
def test_basic_storage_get_local_path():
tmpdir, this_storage = get_tmp_filestorage()
result = this_storage.get_local_path(
['dir1', 'dir2', 'filename.txt'])
expected = os.path.join(
tmpdir, 'dir1/dir2/filename.txt')
assert result == expected
def test_basic_storage_is_local():
tmpdir, this_storage = get_tmp_filestorage()
assert this_storage.local_storage is True
def test_basic_storage_copy_locally():
tmpdir, this_storage = get_tmp_filestorage()
dest_tmpdir = tempfile.mkdtemp()
filepath = ['dir1', 'dir2', 'ourfile.txt']
with this_storage.get_file(filepath, 'w') as our_file:
our_file.write('Testing this file')
new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
this_storage.copy_locally(filepath, new_file_dest)
assert file(new_file_dest).read() == 'Testing this file'
def _test_copy_local_to_storage_works(tmpdir, this_storage):
local_filename = tempfile.mktemp()
with file(local_filename, 'w') as tmpfile:
tmpfile.write('haha')
this_storage.copy_local_to_storage(
local_filename, ['dir1', 'dir2', 'copiedto.txt'])
assert file(
os.path.join(tmpdir, 'dir1/dir2/copiedto.txt'),
'r').read() == 'haha'
def test_basic_storage_copy_local_to_storage():
tmpdir, this_storage = get_tmp_filestorage()
_test_copy_local_to_storage_works(tmpdir, this_storage)
def test_general_storage_copy_local_to_storage():
tmpdir, this_storage = get_tmp_filestorage(fake_remote=True)
_test_copy_local_to_storage_works(tmpdir, this_storage)