Merge remote-tracking branch 'gitorious/master'
This commit is contained in:
@@ -23,8 +23,9 @@ from webob import Request, exc
|
||||
|
||||
from mediagoblin import routing, util, storage, staticdirect
|
||||
from mediagoblin.db.open import setup_connection_and_db_from_config
|
||||
from mediagoblin.globals import setup_globals
|
||||
from mediagoblin.mg_globals import setup_globals
|
||||
from mediagoblin.celery_setup import setup_celery_from_config
|
||||
from mediagoblin.workbench import WorkbenchManager, DEFAULT_WORKBENCH_DIR
|
||||
|
||||
|
||||
class Error(Exception): pass
|
||||
@@ -39,7 +40,8 @@ class MediaGoblinApp(object):
|
||||
public_store, queue_store,
|
||||
staticdirector,
|
||||
email_sender_address, email_debug_mode,
|
||||
user_template_path=None):
|
||||
user_template_path=None,
|
||||
workbench_path=DEFAULT_WORKBENCH_DIR):
|
||||
# Get the template environment
|
||||
self.template_loader = util.get_jinja_loader(user_template_path)
|
||||
|
||||
@@ -66,7 +68,8 @@ class MediaGoblinApp(object):
|
||||
db_connection=connection,
|
||||
database=self.db,
|
||||
public_store=self.public_store,
|
||||
queue_store=self.queue_store)
|
||||
queue_store=self.queue_store,
|
||||
workbench_manager=WorkbenchManager(workbench_path))
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
request = Request(environ)
|
||||
@@ -154,6 +157,7 @@ def paste_app_factory(global_config, **app_config):
|
||||
email_sender_address=app_config.get(
|
||||
'email_sender_address', 'notice@mediagoblin.example.org'),
|
||||
email_debug_mode=asbool(app_config.get('email_debug_mode')),
|
||||
user_template_path=app_config.get('local_templates'))
|
||||
user_template_path=app_config.get('local_templates'),
|
||||
workbench_path=app_config.get('workbench_path', DEFAULT_WORKBENCH_DIR))
|
||||
|
||||
return mgoblin_app
|
||||
|
||||
@@ -20,7 +20,7 @@ import random
|
||||
import bcrypt
|
||||
|
||||
from mediagoblin.util import send_email, render_template
|
||||
from mediagoblin import globals as mgoblin_globals
|
||||
from mediagoblin import mg_globals
|
||||
|
||||
|
||||
def bcrypt_check_password(raw_pass, stored_hash, extra_salt=None):
|
||||
@@ -112,7 +112,7 @@ def send_verification_email(user, request):
|
||||
|
||||
# TODO: There is no error handling in place
|
||||
send_email(
|
||||
mgoblin_globals.email_sender_address,
|
||||
mg_globals.email_sender_address,
|
||||
[user['email']],
|
||||
# TODO
|
||||
# Due to the distributed nature of GNU MediaGoblin, we should
|
||||
|
||||
@@ -22,14 +22,14 @@ from paste.deploy.converters import asbool
|
||||
from mediagoblin import storage
|
||||
from mediagoblin.db.open import setup_connection_and_db_from_config
|
||||
from mediagoblin.celery_setup import setup_celery_from_config
|
||||
from mediagoblin.globals import setup_globals
|
||||
from mediagoblin import globals as mgoblin_globals
|
||||
from mediagoblin.mg_globals import setup_globals
|
||||
from mediagoblin.workbench import WorkbenchManager, DEFAULT_WORKBENCH_DIR
|
||||
|
||||
|
||||
OUR_MODULENAME = 'mediagoblin.celery_setup.from_celery'
|
||||
OUR_MODULENAME = __name__
|
||||
|
||||
|
||||
def setup_self(setup_globals_func=setup_globals):
|
||||
def setup_self():
|
||||
"""
|
||||
Transform this module into a celery config module by reading the
|
||||
mediagoblin config file. Set the environment variable
|
||||
@@ -76,7 +76,11 @@ def setup_self(setup_globals_func=setup_globals):
|
||||
queue_store = storage.storage_system_from_paste_config(
|
||||
mgoblin_section, 'queuestore')
|
||||
|
||||
setup_globals_func(
|
||||
workbench_manager = WorkbenchManager(
|
||||
mgoblin_section.get(
|
||||
'workbench_path', DEFAULT_WORKBENCH_DIR))
|
||||
|
||||
setup_globals(
|
||||
db_connection=connection,
|
||||
database=db,
|
||||
public_store=public_store,
|
||||
@@ -84,7 +88,8 @@ def setup_self(setup_globals_func=setup_globals):
|
||||
email_sender_address=mgoblin_section.get(
|
||||
'email_sender_address',
|
||||
'notice@mediagoblin.example.org'),
|
||||
queue_store=queue_store)
|
||||
queue_store=queue_store,
|
||||
workbench_manager=workbench_manager)
|
||||
|
||||
|
||||
if os.environ['CELERY_CONFIG_MODULE'] == OUR_MODULENAME:
|
||||
|
||||
@@ -19,13 +19,12 @@ import os
|
||||
from mediagoblin.tests.tools import TEST_APP_CONFIG
|
||||
from mediagoblin import util
|
||||
from mediagoblin.celery_setup import setup_celery_from_config
|
||||
from mediagoblin.globals import setup_globals
|
||||
|
||||
|
||||
OUR_MODULENAME = 'mediagoblin.celery_setup.from_tests'
|
||||
OUR_MODULENAME = __name__
|
||||
|
||||
|
||||
def setup_self(setup_globals_func=setup_globals):
|
||||
def setup_self():
|
||||
"""
|
||||
Set up celery for testing's sake, which just needs to set up
|
||||
celery and celery only.
|
||||
|
||||
@@ -16,8 +16,6 @@
|
||||
|
||||
from mongokit import DocumentMigration
|
||||
|
||||
from mediagoblin import globals as mediagoblin_globals
|
||||
|
||||
|
||||
class MediaEntryMigration(DocumentMigration):
|
||||
def allmigration01_uploader_to_reference(self):
|
||||
|
||||
@@ -20,7 +20,7 @@ from mongokit import Document, Set
|
||||
|
||||
from mediagoblin import util
|
||||
from mediagoblin.auth import lib as auth_lib
|
||||
from mediagoblin import globals as mediagoblin_globals
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.db import migrations
|
||||
from mediagoblin.db.util import ObjectId
|
||||
|
||||
@@ -114,7 +114,7 @@ class MediaEntry(Document):
|
||||
def generate_slug(self):
|
||||
self['slug'] = util.slugify(self['title'])
|
||||
|
||||
duplicate = mediagoblin_globals.database.media_entries.find_one(
|
||||
duplicate = mg_globals.database.media_entries.find_one(
|
||||
{'slug': self['slug']})
|
||||
|
||||
if duplicate:
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
|
||||
from mediagoblin.db import migrations
|
||||
from mediagoblin.gmg_commands import util as commands_util
|
||||
from mediagoblin import globals as mgoblin_globals
|
||||
|
||||
|
||||
def migrate_parser_setup(subparser):
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
|
||||
import code
|
||||
|
||||
from mediagoblin import globals as mgoblin_globals
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.gmg_commands import util as commands_util
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ GNU MediaGoblin shell!
|
||||
----------------------
|
||||
Available vars:
|
||||
- mgoblin_app: instantiated mediagoblin application
|
||||
- mgoblin_globals: mediagoblin.globals
|
||||
- mg_globals: mediagoblin.globals
|
||||
- db: database instance
|
||||
"""
|
||||
|
||||
@@ -50,5 +50,5 @@ def shell(args):
|
||||
banner=SHELL_BANNER,
|
||||
local={
|
||||
'mgoblin_app': mgoblin_app,
|
||||
'mgoblin_globals': mgoblin_globals,
|
||||
'db': mgoblin_globals.database})
|
||||
'mg_globals': mg_globals,
|
||||
'db': mg_globals.database})
|
||||
|
||||
@@ -27,7 +27,7 @@ translations = gettext.find(
|
||||
|
||||
|
||||
def setup_globals(**kwargs):
|
||||
from mediagoblin import globals as mg_globals
|
||||
from mediagoblin import mg_globals
|
||||
|
||||
for key, value in kwargs.iteritems():
|
||||
setattr(mg_globals, key, value)
|
||||
@@ -18,48 +18,63 @@ import Image
|
||||
from mediagoblin.db.util import ObjectId
|
||||
from celery.task import task
|
||||
|
||||
from mediagoblin.globals import database, queue_store, public_store
|
||||
from mediagoblin import mg_globals as mgg
|
||||
|
||||
|
||||
THUMB_SIZE = 200, 200
|
||||
|
||||
|
||||
def create_pub_filepath(entry, filename):
|
||||
return mgg.public_store.get_unique_filepath(
|
||||
['media_entries',
|
||||
unicode(entry['_id']),
|
||||
filename])
|
||||
|
||||
|
||||
@task
|
||||
def process_media_initial(media_id):
|
||||
entry = database.MediaEntry.one(
|
||||
workbench = mgg.workbench_manager.create_workbench()
|
||||
|
||||
entry = mgg.database.MediaEntry.one(
|
||||
{'_id': ObjectId(media_id)})
|
||||
|
||||
queued_filepath = entry['queued_media_file']
|
||||
queued_file = queue_store.get_file(queued_filepath, 'r')
|
||||
queued_filename = workbench.localized_file(
|
||||
mgg.queue_store, queued_filepath,
|
||||
'source')
|
||||
|
||||
queued_file = file(queued_filename, 'r')
|
||||
|
||||
with queued_file:
|
||||
thumb = Image.open(queued_file)
|
||||
thumb.thumbnail(THUMB_SIZE, Image.ANTIALIAS)
|
||||
# ensure color mode is compatible with jpg
|
||||
if thumb.mode != "RGB":
|
||||
thumb = thumb.convert("RGB")
|
||||
|
||||
thumb_filepath = public_store.get_unique_filepath(
|
||||
['media_entries',
|
||||
unicode(entry['_id']),
|
||||
'thumbnail.jpg'])
|
||||
thumb_filepath = create_pub_filepath(entry, 'thumbnail.jpg')
|
||||
|
||||
with public_store.get_file(thumb_filepath, 'w') as thumb_file:
|
||||
thumb_file = mgg.public_store.get_file(thumb_filepath, 'w')
|
||||
with thumb_file:
|
||||
thumb.save(thumb_file, "JPEG")
|
||||
|
||||
# we have to re-read because unlike PIL, not everything reads
|
||||
# things in string representation :)
|
||||
queued_file = queue_store.get_file(queued_filepath, 'rb')
|
||||
queued_file = file(queued_filename, 'rb')
|
||||
|
||||
with queued_file:
|
||||
main_filepath = public_store.get_unique_filepath(
|
||||
['media_entries',
|
||||
unicode(entry['_id']),
|
||||
queued_filepath[-1]])
|
||||
main_filepath = create_pub_filepath(entry, queued_filepath[-1])
|
||||
|
||||
with public_store.get_file(main_filepath, 'wb') as main_file:
|
||||
with mgg.public_store.get_file(main_filepath, 'wb') as main_file:
|
||||
main_file.write(queued_file.read())
|
||||
|
||||
queue_store.delete_file(queued_filepath)
|
||||
mgg.queue_store.delete_file(queued_filepath)
|
||||
entry['queued_media_file'] = []
|
||||
media_files_dict = entry.setdefault('media_files', {})
|
||||
media_files_dict['thumb'] = thumb_filepath
|
||||
media_files_dict['main'] = main_filepath
|
||||
entry['state'] = u'processed'
|
||||
entry.save()
|
||||
|
||||
# clean up workbench
|
||||
workbench.destroy_self()
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import urlparse
|
||||
import uuid
|
||||
|
||||
@@ -60,6 +61,9 @@ class StorageInterface(object):
|
||||
StorageInterface.
|
||||
"""
|
||||
|
||||
# Whether this file store is on the local filesystem.
|
||||
local_storage = False
|
||||
|
||||
def __raise_not_implemented(self):
|
||||
"""
|
||||
Raise a warning about some component not implemented by a
|
||||
@@ -127,12 +131,43 @@ class StorageInterface(object):
|
||||
else:
|
||||
return filepath
|
||||
|
||||
def get_local_path(self, filepath):
|
||||
"""
|
||||
If this is a local_storage implementation, give us a link to
|
||||
the local filesystem reference to this file.
|
||||
|
||||
>>> storage_handler.get_local_path(['foo', 'bar', 'baz.jpg'])
|
||||
u'/path/to/mounting/foo/bar/baz.jpg'
|
||||
"""
|
||||
# Subclasses should override this method, if applicable.
|
||||
self.__raise_not_implemented()
|
||||
|
||||
def copy_locally(self, filepath, dest_path):
|
||||
"""
|
||||
Copy this file locally.
|
||||
|
||||
A basic working method for this is provided that should
|
||||
function both for local_storage systems and remote storge
|
||||
systems, but if more efficient systems for copying locally
|
||||
apply to your system, override this method with something more
|
||||
appropriate.
|
||||
"""
|
||||
if self.local_storage:
|
||||
shutil.copy(
|
||||
self.get_local_path(filepath), dest_path)
|
||||
else:
|
||||
with self.get_file(filepath, 'rb') as source_file:
|
||||
with file(dest_path, 'wb') as dest_file:
|
||||
dest_file.write(source_file.read())
|
||||
|
||||
|
||||
class BasicFileStorage(StorageInterface):
|
||||
"""
|
||||
Basic local filesystem implementation of storage API
|
||||
"""
|
||||
|
||||
local_storage = True
|
||||
|
||||
def __init__(self, base_dir, base_url=None, **kwargs):
|
||||
"""
|
||||
Keyword arguments:
|
||||
@@ -177,6 +212,9 @@ class BasicFileStorage(StorageInterface):
|
||||
self.base_url,
|
||||
'/'.join(clean_listy_filepath(filepath)))
|
||||
|
||||
def get_local_path(self, filepath):
|
||||
return self._resolve_filepath(filepath)
|
||||
|
||||
|
||||
###########
|
||||
# Utilities
|
||||
@@ -187,7 +225,7 @@ def clean_listy_filepath(listy_filepath):
|
||||
Take a listy filepath (like ['dir1', 'dir2', 'filename.jpg']) and
|
||||
clean out any nastiness from it.
|
||||
|
||||
For example:
|
||||
|
||||
>>> clean_listy_filepath([u'/dir1/', u'foo/../nasty', u'linooks.jpg'])
|
||||
[u'dir1', u'foo_.._nasty', u'linooks.jpg']
|
||||
|
||||
@@ -253,3 +291,5 @@ def storage_system_from_paste_config(paste_config, storage_prefix):
|
||||
|
||||
storage_class = util.import_component(storage_class)
|
||||
return storage_class(**config_params)
|
||||
|
||||
|
||||
|
||||
@@ -13,3 +13,14 @@
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from mediagoblin import mg_globals
|
||||
|
||||
|
||||
def setup_package():
|
||||
pass
|
||||
|
||||
def teardown_package():
|
||||
print "Killing db ..."
|
||||
mg_globals.db_connection.drop_database(mg_globals.database.name)
|
||||
print "... done"
|
||||
|
||||
@@ -20,7 +20,7 @@ from nose.tools import assert_equal
|
||||
|
||||
from mediagoblin.auth import lib as auth_lib
|
||||
from mediagoblin.tests.tools import setup_fresh_app
|
||||
from mediagoblin import globals as mgoblin_globals
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin import util
|
||||
|
||||
|
||||
@@ -137,7 +137,7 @@ def test_register_views(test_app):
|
||||
u'Passwords must match.']
|
||||
|
||||
## At this point there should be no users in the database ;)
|
||||
assert not mgoblin_globals.database.User.find().count()
|
||||
assert not mg_globals.database.User.find().count()
|
||||
|
||||
# Successful register
|
||||
# -------------------
|
||||
@@ -158,7 +158,7 @@ def test_register_views(test_app):
|
||||
'mediagoblin/auth/register_success.html')
|
||||
|
||||
## Make sure user is in place
|
||||
new_user = mgoblin_globals.database.User.find_one(
|
||||
new_user = mg_globals.database.User.find_one(
|
||||
{'username': 'happygirl'})
|
||||
assert new_user
|
||||
assert new_user['status'] == u'needs_email_verification'
|
||||
@@ -191,7 +191,7 @@ def test_register_views(test_app):
|
||||
context = util.TEMPLATE_TEST_CONTEXT[
|
||||
'mediagoblin/auth/verify_email.html']
|
||||
assert context['verification_successful'] == False
|
||||
new_user = mgoblin_globals.database.User.find_one(
|
||||
new_user = mg_globals.database.User.find_one(
|
||||
{'username': 'happygirl'})
|
||||
assert new_user
|
||||
assert new_user['status'] == u'needs_email_verification'
|
||||
@@ -203,7 +203,7 @@ def test_register_views(test_app):
|
||||
context = util.TEMPLATE_TEST_CONTEXT[
|
||||
'mediagoblin/auth/verify_email.html']
|
||||
assert context['verification_successful'] == True
|
||||
new_user = mgoblin_globals.database.User.find_one(
|
||||
new_user = mg_globals.database.User.find_one(
|
||||
{'username': 'happygirl'})
|
||||
assert new_user
|
||||
assert new_user['status'] == u'active'
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from mediagoblin import globals as mg_globals
|
||||
from mediagoblin import mg_globals
|
||||
|
||||
def test_setup_globals():
|
||||
mg_globals.setup_globals(
|
||||
|
||||
@@ -52,6 +52,11 @@ class FakeStorageSystem():
|
||||
self.foobie = foobie
|
||||
self.blech = blech
|
||||
|
||||
class FakeRemoteStorage(storage.BasicFileStorage):
|
||||
# Theoretically despite this, all the methods should work but it
|
||||
# should force copying to the workbench
|
||||
local_storage = False
|
||||
|
||||
|
||||
def test_storage_system_from_paste_config():
|
||||
this_storage = storage.storage_system_from_paste_config(
|
||||
@@ -81,9 +86,12 @@ def test_storage_system_from_paste_config():
|
||||
# Basic file storage tests
|
||||
##########################
|
||||
|
||||
def get_tmp_filestorage(mount_url=None):
|
||||
def get_tmp_filestorage(mount_url=None, fake_remote=False):
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
this_storage = storage.BasicFileStorage(tmpdir, mount_url)
|
||||
if fake_remote:
|
||||
this_storage = FakeRemoteStorage(tmpdir, mount_url)
|
||||
else:
|
||||
this_storage = storage.BasicFileStorage(tmpdir, mount_url)
|
||||
return tmpdir, this_storage
|
||||
|
||||
|
||||
@@ -214,3 +222,36 @@ def test_basic_storage_url_for_file():
|
||||
['dir1', 'dir2', 'filename.txt'])
|
||||
expected = 'http://media.example.org/ourmedia/dir1/dir2/filename.txt'
|
||||
assert result == expected
|
||||
|
||||
|
||||
def test_basic_storage_get_local_path():
|
||||
tmpdir, this_storage = get_tmp_filestorage()
|
||||
|
||||
result = this_storage.get_local_path(
|
||||
['dir1', 'dir2', 'filename.txt'])
|
||||
|
||||
expected = os.path.join(
|
||||
tmpdir, 'dir1/dir2/filename.txt')
|
||||
|
||||
assert result == expected
|
||||
|
||||
|
||||
def test_basic_storage_is_local():
|
||||
tmpdir, this_storage = get_tmp_filestorage()
|
||||
assert this_storage.local_storage is True
|
||||
|
||||
|
||||
def test_basic_storage_copy_locally():
|
||||
tmpdir, this_storage = get_tmp_filestorage()
|
||||
|
||||
dest_tmpdir = tempfile.mkdtemp()
|
||||
|
||||
filepath = ['dir1', 'dir2', 'ourfile.txt']
|
||||
with this_storage.get_file(filepath, 'w') as our_file:
|
||||
our_file.write('Testing this file')
|
||||
|
||||
new_file_dest = os.path.join(dest_tmpdir, 'file2.txt')
|
||||
|
||||
this_storage.copy_locally(filepath, new_file_dest)
|
||||
|
||||
assert file(new_file_dest).read() == 'Testing this file'
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
from mediagoblin.tests.tools import get_test_app
|
||||
|
||||
from mediagoblin import globals as mgoblin_globals
|
||||
from mediagoblin import mg_globals
|
||||
|
||||
|
||||
def test_get_test_app_wipes_db():
|
||||
@@ -24,15 +24,15 @@ def test_get_test_app_wipes_db():
|
||||
Make sure we get a fresh database on every wipe :)
|
||||
"""
|
||||
get_test_app()
|
||||
assert mgoblin_globals.database.User.find().count() == 0
|
||||
assert mg_globals.database.User.find().count() == 0
|
||||
|
||||
new_user = mgoblin_globals.database.User()
|
||||
new_user = mg_globals.database.User()
|
||||
new_user['username'] = u'lolcat'
|
||||
new_user['email'] = u'lol@cats.example.org'
|
||||
new_user['pw_hash'] = u'pretend_this_is_a_hash'
|
||||
new_user.save()
|
||||
assert mgoblin_globals.database.User.find().count() == 1
|
||||
assert mg_globals.database.User.find().count() == 1
|
||||
|
||||
get_test_app()
|
||||
|
||||
assert mgoblin_globals.database.User.find().count() == 0
|
||||
assert mg_globals.database.User.find().count() == 0
|
||||
|
||||
@@ -103,3 +103,22 @@ def test_locale_to_lower_lower():
|
||||
# crazy renditions. Useful?
|
||||
assert util.locale_to_lower_lower('en-US') == 'en-us'
|
||||
assert util.locale_to_lower_lower('en_us') == 'en-us'
|
||||
|
||||
|
||||
def test_html_cleaner():
|
||||
# Remove images
|
||||
result = util.clean_html(
|
||||
'<p>Hi everybody! '
|
||||
'<img src="http://example.org/huge-purple-barney.png" /></p>\n'
|
||||
'<p>:)</p>')
|
||||
assert result == (
|
||||
'<div>'
|
||||
'<p>Hi everybody! </p>\n'
|
||||
'<p>:)</p>'
|
||||
'</div>')
|
||||
|
||||
# Remove evil javascript
|
||||
result = util.clean_html(
|
||||
'<p><a href="javascript:nasty_surprise">innocent link!</a></p>')
|
||||
assert result == (
|
||||
'<p><a href="">innocent link!</a></p>')
|
||||
|
||||
94
mediagoblin/tests/test_workbench.py
Normal file
94
mediagoblin/tests/test_workbench.py
Normal file
@@ -0,0 +1,94 @@
|
||||
# GNU MediaGoblin -- federated, autonomous media hosting
|
||||
# Copyright (C) 2011 Free Software Foundation, Inc
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from nose.tools import assert_raises
|
||||
|
||||
from mediagoblin import workbench
|
||||
from mediagoblin.tests.test_storage import get_tmp_filestorage
|
||||
|
||||
|
||||
class TestWorkbench(object):
|
||||
def setUp(self):
|
||||
self.workbench_manager = workbench.WorkbenchManager(
|
||||
os.path.join(tempfile.gettempdir(), u'mgoblin_workbench_testing'))
|
||||
|
||||
def test_create_workbench(self):
|
||||
workbench = self.workbench_manager.create_workbench()
|
||||
assert os.path.isdir(workbench.dir)
|
||||
assert workbench.dir.startswith(self.workbench_manager.base_workbench_dir)
|
||||
|
||||
def test_joinpath(self):
|
||||
this_workbench = self.workbench_manager.create_workbench()
|
||||
tmpname = this_workbench.joinpath('temp.txt')
|
||||
assert tmpname == os.path.join(this_workbench.dir, 'temp.txt')
|
||||
this_workbench.destroy_self()
|
||||
|
||||
def test_destroy_workbench(self):
|
||||
# kill a workbench
|
||||
this_workbench = self.workbench_manager.create_workbench()
|
||||
tmpfile_name = this_workbench.joinpath('temp.txt')
|
||||
tmpfile = file(tmpfile_name, 'w')
|
||||
with tmpfile:
|
||||
tmpfile.write('lollerskates')
|
||||
|
||||
assert os.path.exists(tmpfile_name)
|
||||
|
||||
wb_dir = this_workbench.dir
|
||||
this_workbench.destroy_self()
|
||||
assert not os.path.exists(tmpfile_name)
|
||||
assert not os.path.exists(wb_dir)
|
||||
|
||||
def test_localized_file(self):
|
||||
tmpdir, this_storage = get_tmp_filestorage()
|
||||
this_workbench = self.workbench_manager.create_workbench()
|
||||
|
||||
# Write a brand new file
|
||||
filepath = ['dir1', 'dir2', 'ourfile.txt']
|
||||
|
||||
with this_storage.get_file(filepath, 'w') as our_file:
|
||||
our_file.write('Our file')
|
||||
|
||||
# with a local file storage
|
||||
filename = this_workbench.localized_file(this_storage, filepath)
|
||||
assert filename == os.path.join(
|
||||
tmpdir, 'dir1/dir2/ourfile.txt')
|
||||
|
||||
# with a fake remote file storage
|
||||
tmpdir, this_storage = get_tmp_filestorage(fake_remote=True)
|
||||
|
||||
# ... write a brand new file, again ;)
|
||||
with this_storage.get_file(filepath, 'w') as our_file:
|
||||
our_file.write('Our file')
|
||||
|
||||
filename = this_workbench.localized_file(this_storage, filepath)
|
||||
assert filename == os.path.join(
|
||||
this_workbench.dir, 'ourfile.txt')
|
||||
|
||||
# fake remote file storage, filename_if_copying set
|
||||
filename = this_workbench.localized_file(
|
||||
this_storage, filepath, 'thisfile')
|
||||
assert filename == os.path.join(
|
||||
this_workbench.dir, 'thisfile.txt')
|
||||
|
||||
# fake remote file storage, filename_if_copying set,
|
||||
# keep_extension_if_copying set to false
|
||||
filename = this_workbench.localized_file(
|
||||
this_storage, filepath, 'thisfile.text', False)
|
||||
assert filename == os.path.join(
|
||||
this_workbench.dir, 'thisfile.text')
|
||||
@@ -30,8 +30,9 @@ import jinja2
|
||||
import translitcodec
|
||||
from paste.deploy.loadwsgi import NicerConfigParser
|
||||
from webob import Response, exc
|
||||
from lxml.html.clean import Cleaner
|
||||
|
||||
from mediagoblin import globals as mgoblin_globals
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.db.util import ObjectId
|
||||
|
||||
|
||||
@@ -102,8 +103,8 @@ def get_jinja_env(template_loader, locale):
|
||||
extensions=['jinja2.ext.i18n'])
|
||||
|
||||
template_env.install_gettext_callables(
|
||||
mgoblin_globals.translations.gettext,
|
||||
mgoblin_globals.translations.ngettext)
|
||||
mg_globals.translations.gettext,
|
||||
mg_globals.translations.ngettext)
|
||||
|
||||
if exists(locale):
|
||||
SETUP_JINJA_ENVS[locale] = template_env
|
||||
@@ -264,9 +265,9 @@ def send_email(from_addr, to_addrs, subject, message_body):
|
||||
- message_body: email body text
|
||||
"""
|
||||
# TODO: make a mock mhost if testing is enabled
|
||||
if TESTS_ENABLED or mgoblin_globals.email_debug_mode:
|
||||
if TESTS_ENABLED or mg_globals.email_debug_mode:
|
||||
mhost = FakeMhost()
|
||||
elif not mgoblin_globals.email_debug_mode:
|
||||
elif not mg_globals.email_debug_mode:
|
||||
mhost = smtplib.SMTP()
|
||||
|
||||
mhost.connect()
|
||||
@@ -279,7 +280,7 @@ def send_email(from_addr, to_addrs, subject, message_body):
|
||||
if TESTS_ENABLED:
|
||||
EMAIL_TEST_INBOX.append(message)
|
||||
|
||||
if getattr(mgoblin_globals, 'email_debug_mode', False):
|
||||
if getattr(mg_globals, 'email_debug_mode', False):
|
||||
print u"===== Email ====="
|
||||
print u"From address: %s" % message['From']
|
||||
print u"To addresses: %s" % message['To']
|
||||
@@ -373,6 +374,32 @@ def read_config_file(conf_file):
|
||||
return mgoblin_conf
|
||||
|
||||
|
||||
# A super strict version of the lxml.html cleaner class
|
||||
HTML_CLEANER = Cleaner(
|
||||
scripts=True,
|
||||
javascript=True,
|
||||
comments=True,
|
||||
style=True,
|
||||
links=True,
|
||||
page_structure=True,
|
||||
processing_instructions=True,
|
||||
embedded=True,
|
||||
frames=True,
|
||||
forms=True,
|
||||
annoying_tags=True,
|
||||
allow_tags=[
|
||||
'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
|
||||
remove_unknown_tags=False, # can't be used with allow_tags
|
||||
safe_attrs_only=True,
|
||||
add_nofollow=True, # for now
|
||||
host_whitelist=(),
|
||||
whitelist_tags=set([]))
|
||||
|
||||
|
||||
def clean_html(html):
|
||||
return HTML_CLEANER.clean_html(html)
|
||||
|
||||
|
||||
SETUP_GETTEXTS = {}
|
||||
|
||||
def setup_gettext(locale):
|
||||
@@ -393,7 +420,7 @@ def setup_gettext(locale):
|
||||
if exists(locale):
|
||||
SETUP_GETTEXTS[locale] = this_gettext
|
||||
|
||||
mgoblin_globals.setup_globals(
|
||||
mg_globals.setup_globals(
|
||||
translations=this_gettext)
|
||||
|
||||
|
||||
|
||||
148
mediagoblin/workbench.py
Normal file
148
mediagoblin/workbench.py
Normal file
@@ -0,0 +1,148 @@
|
||||
# GNU MediaGoblin -- federated, autonomous media hosting
|
||||
# Copyright (C) 2011 Free Software Foundation, Inc
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
|
||||
DEFAULT_WORKBENCH_DIR = os.path.join(
|
||||
tempfile.gettempdir(), u'mgoblin_workbench')
|
||||
|
||||
|
||||
# Actual workbench stuff
|
||||
# ----------------------
|
||||
|
||||
class Workbench(object):
|
||||
"""
|
||||
Represent the directory for the workbench
|
||||
|
||||
WARNING: DO NOT create Workbench objects on your own,
|
||||
let the WorkbenchManager do that for you!
|
||||
"""
|
||||
def __init__(self, dir):
|
||||
"""
|
||||
WARNING: DO NOT create Workbench objects on your own,
|
||||
let the WorkbenchManager do that for you!
|
||||
"""
|
||||
self.dir = dir
|
||||
|
||||
def __unicode__(self):
|
||||
return unicode(self.dir)
|
||||
def __str__(self):
|
||||
return str(self.dir)
|
||||
def __repr__(self):
|
||||
return repr(self.dir)
|
||||
|
||||
def joinpath(self, *args):
|
||||
return os.path.join(self.dir, *args)
|
||||
|
||||
def localized_file(self, storage, filepath,
|
||||
filename_if_copying=None,
|
||||
keep_extension_if_copying=True):
|
||||
"""
|
||||
Possibly localize the file from this storage system (for read-only
|
||||
purposes, modifications should be written to a new file.).
|
||||
|
||||
If the file is already local, just return the absolute filename of that
|
||||
local file. Otherwise, copy the file locally to the workbench, and
|
||||
return the absolute path of the new file.
|
||||
|
||||
If it is copying locally, we might want to require a filename like
|
||||
"source.jpg" to ensure that we won't conflict with other filenames in
|
||||
our workbench... if that's the case, make sure filename_if_copying is
|
||||
set to something like 'source.jpg'. Relatedly, if you set
|
||||
keep_extension_if_copying, you don't have to set an extension on
|
||||
filename_if_copying yourself, it'll be set for you (assuming such an
|
||||
extension can be extacted from the filename in the filepath).
|
||||
|
||||
Returns:
|
||||
localized_filename
|
||||
|
||||
Examples:
|
||||
>>> wb_manager.localized_file(
|
||||
... '/our/workbench/subdir', local_storage,
|
||||
... ['path', 'to', 'foobar.jpg'])
|
||||
u'/local/storage/path/to/foobar.jpg'
|
||||
|
||||
>>> wb_manager.localized_file(
|
||||
... '/our/workbench/subdir', remote_storage,
|
||||
... ['path', 'to', 'foobar.jpg'])
|
||||
'/our/workbench/subdir/foobar.jpg'
|
||||
|
||||
>>> wb_manager.localized_file(
|
||||
... '/our/workbench/subdir', remote_storage,
|
||||
... ['path', 'to', 'foobar.jpg'], 'source.jpeg', False)
|
||||
'/our/workbench/subdir/foobar.jpeg'
|
||||
|
||||
>>> wb_manager.localized_file(
|
||||
... '/our/workbench/subdir', remote_storage,
|
||||
... ['path', 'to', 'foobar.jpg'], 'source', True)
|
||||
'/our/workbench/subdir/foobar.jpg'
|
||||
"""
|
||||
if storage.local_storage:
|
||||
return storage.get_local_path(filepath)
|
||||
else:
|
||||
if filename_if_copying is None:
|
||||
dest_filename = filepath[-1]
|
||||
else:
|
||||
orig_filename, orig_ext = os.path.splitext(filepath[-1])
|
||||
if keep_extension_if_copying and orig_ext:
|
||||
dest_filename = filename_if_copying + orig_ext
|
||||
else:
|
||||
dest_filename = filename_if_copying
|
||||
|
||||
full_dest_filename = os.path.join(
|
||||
self.dir, dest_filename)
|
||||
|
||||
# copy it over
|
||||
storage.copy_locally(
|
||||
filepath, full_dest_filename)
|
||||
|
||||
return full_dest_filename
|
||||
|
||||
def destroy_self(self):
|
||||
"""
|
||||
Destroy this workbench! Deletes the directory and all its contents!
|
||||
|
||||
WARNING: Does no checks for a sane value in self.dir!
|
||||
"""
|
||||
# just in case
|
||||
workbench = os.path.abspath(self.dir)
|
||||
|
||||
shutil.rmtree(workbench)
|
||||
|
||||
del self.dir
|
||||
|
||||
|
||||
class WorkbenchManager(object):
|
||||
"""
|
||||
A system for generating and destroying workbenches.
|
||||
|
||||
Workbenches are actually just subdirectories of a temporary storage space
|
||||
for during the processing stage.
|
||||
"""
|
||||
|
||||
def __init__(self, base_workbench_dir):
|
||||
self.base_workbench_dir = os.path.abspath(base_workbench_dir)
|
||||
if not os.path.exists(self.base_workbench_dir):
|
||||
os.makedirs(self.base_workbench_dir)
|
||||
|
||||
def create_workbench(self):
|
||||
"""
|
||||
Create and return the path to a new workbench (directory).
|
||||
"""
|
||||
return Workbench(tempfile.mkdtemp(dir=self.base_workbench_dir))
|
||||
Reference in New Issue
Block a user