Merge branch 'master' into merge-python3-port
Has some issues, will iteratively fix! Conflicts: mediagoblin/gmg_commands/__init__.py mediagoblin/gmg_commands/deletemedia.py mediagoblin/gmg_commands/users.py mediagoblin/oauth/views.py mediagoblin/plugins/api/views.py mediagoblin/tests/test_api.py mediagoblin/tests/test_edit.py mediagoblin/tests/test_oauth1.py mediagoblin/tests/test_util.py mediagoblin/tools/mail.py mediagoblin/webfinger/views.py setup.py
This commit is contained in:
4
mediagoblin/tests/starttls_config.ini
Executable file
4
mediagoblin/tests/starttls_config.ini
Executable file
@@ -0,0 +1,4 @@
|
||||
[mediagoblin]
|
||||
email_debug_mode = false
|
||||
email_smtp_force_starttls = true
|
||||
email_smtp_host = someplace.com
|
||||
@@ -13,81 +13,476 @@
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
import json
|
||||
|
||||
|
||||
import logging
|
||||
import base64
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
from webtest import AppError
|
||||
|
||||
from .resources import GOOD_JPG
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.tools import template, pluginapi
|
||||
from mediagoblin.db.models import User, MediaEntry, MediaComment
|
||||
from mediagoblin.tests.tools import fixture_add_user
|
||||
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
|
||||
BIG_BLUE
|
||||
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
from mediagoblin.moderation.tools import take_away_privileges
|
||||
|
||||
class TestAPI(object):
|
||||
def setup(self):
|
||||
""" Test mediagoblin's pump.io complient APIs """
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, test_app):
|
||||
self.test_app = test_app
|
||||
self.db = mg_globals.database
|
||||
|
||||
self.user_password = u'4cc355_70k3N'
|
||||
self.user = fixture_add_user(u'joapi', self.user_password,
|
||||
privileges=[u'active',u'uploader'])
|
||||
self.user = fixture_add_user(privileges=[u'active', u'uploader', u'commenter'])
|
||||
self.other_user = fixture_add_user(
|
||||
username="otheruser",
|
||||
privileges=[u'active', u'uploader', u'commenter']
|
||||
)
|
||||
self.active_user = self.user
|
||||
|
||||
def login(self, test_app):
|
||||
test_app.post(
|
||||
'/auth/login/', {
|
||||
'username': self.user.username,
|
||||
'password': self.user_password})
|
||||
def _activity_to_feed(self, test_app, activity, headers=None):
|
||||
""" Posts an activity to the user's feed """
|
||||
if headers:
|
||||
headers.setdefault("Content-Type", "application/json")
|
||||
else:
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
||||
def get_context(self, template_name):
|
||||
return template.TEMPLATE_TEST_CONTEXT[template_name]
|
||||
with self.mock_oauth():
|
||||
response = test_app.post(
|
||||
"/api/user/{0}/feed".format(self.active_user.username),
|
||||
json.dumps(activity),
|
||||
headers=headers
|
||||
)
|
||||
|
||||
def http_auth_headers(self):
|
||||
return {'Authorization': ('Basic {0}'.format(
|
||||
base64.b64encode((':'.join([
|
||||
self.user.username,
|
||||
self.user_password])).encode('ascii')).decode()))}
|
||||
return response, json.loads(response.body)
|
||||
|
||||
def do_post(self, data, test_app, **kwargs):
|
||||
url = kwargs.pop('url', '/api/submit')
|
||||
do_follow = kwargs.pop('do_follow', False)
|
||||
def _upload_image(self, test_app, image):
|
||||
""" Uploads and image to MediaGoblin via pump.io API """
|
||||
data = open(image, "rb").read()
|
||||
headers = {
|
||||
"Content-Type": "image/jpeg",
|
||||
"Content-Length": str(len(data))
|
||||
}
|
||||
|
||||
if 'headers' not in kwargs.keys():
|
||||
kwargs['headers'] = self.http_auth_headers()
|
||||
|
||||
response = test_app.post(url, data, **kwargs)
|
||||
with self.mock_oauth():
|
||||
response = test_app.post(
|
||||
"/api/user/{0}/uploads".format(self.active_user.username),
|
||||
data,
|
||||
headers=headers
|
||||
)
|
||||
image = json.loads(response.body)
|
||||
|
||||
if do_follow:
|
||||
response.follow()
|
||||
return response, image
|
||||
|
||||
return response
|
||||
def _post_image_to_feed(self, test_app, image):
|
||||
""" Posts an already uploaded image to feed """
|
||||
activity = {
|
||||
"verb": "post",
|
||||
"object": image,
|
||||
}
|
||||
|
||||
def upload_data(self, filename):
|
||||
return {'upload_files': [('file', filename)]}
|
||||
return self._activity_to_feed(test_app, activity)
|
||||
|
||||
def test_1_test_test_view(self, test_app):
|
||||
self.login(test_app)
|
||||
def mocked_oauth_required(self, *args, **kwargs):
|
||||
""" Mocks mediagoblin.decorator.oauth_required to always validate """
|
||||
|
||||
response = test_app.get(
|
||||
'/api/test',
|
||||
headers=self.http_auth_headers())
|
||||
def fake_controller(controller, request, *args, **kwargs):
|
||||
request.user = User.query.filter_by(id=self.active_user.id).first()
|
||||
return controller(request, *args, **kwargs)
|
||||
|
||||
assert response.body == \
|
||||
b'{"email": "joapi@example.com", "username": "joapi"}'
|
||||
def oauth_required(c):
|
||||
return lambda *args, **kwargs: fake_controller(c, *args, **kwargs)
|
||||
|
||||
def test_2_test_submission(self, test_app):
|
||||
self.login(test_app)
|
||||
return oauth_required
|
||||
|
||||
response = self.do_post(
|
||||
{'title': 'Great JPG!'},
|
||||
test_app,
|
||||
**self.upload_data(GOOD_JPG))
|
||||
def mock_oauth(self):
|
||||
""" Returns a mock.patch for the oauth_required decorator """
|
||||
return mock.patch(
|
||||
target="mediagoblin.decorators.oauth_required",
|
||||
new_callable=self.mocked_oauth_required
|
||||
)
|
||||
|
||||
assert response.status_int == 200
|
||||
def test_can_post_image(self, test_app):
|
||||
""" Tests that an image can be posted to the API """
|
||||
# First request we need to do is to upload the image
|
||||
response, image = self._upload_image(test_app, GOOD_JPG)
|
||||
|
||||
assert self.db.MediaEntry.query.filter_by(title=u'Great JPG!').first()
|
||||
# I should have got certain things back
|
||||
assert response.status_code == 200
|
||||
|
||||
assert "id" in image
|
||||
assert "fullImage" in image
|
||||
assert "url" in image["fullImage"]
|
||||
assert "url" in image
|
||||
assert "author" in image
|
||||
assert "published" in image
|
||||
assert "updated" in image
|
||||
assert image["objectType"] == "image"
|
||||
|
||||
# Check that we got the response we're expecting
|
||||
response, _ = self._post_image_to_feed(test_app, image)
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_unable_to_upload_as_someone_else(self, test_app):
|
||||
""" Test that can't upload as someoen else """
|
||||
data = open(GOOD_JPG, "rb").read()
|
||||
headers = {
|
||||
"Content-Type": "image/jpeg",
|
||||
"Content-Length": str(len(data))
|
||||
}
|
||||
|
||||
with self.mock_oauth():
|
||||
# Will be self.user trying to upload as self.other_user
|
||||
with pytest.raises(AppError) as excinfo:
|
||||
test_app.post(
|
||||
"/api/user/{0}/uploads".format(self.other_user.username),
|
||||
data,
|
||||
headers=headers
|
||||
)
|
||||
|
||||
assert "403 FORBIDDEN" in excinfo.value.message
|
||||
|
||||
def test_unable_to_post_feed_as_someone_else(self, test_app):
|
||||
""" Tests that can't post an image to someone else's feed """
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
|
||||
activity = {
|
||||
"verb": "post",
|
||||
"object": data
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
with self.mock_oauth():
|
||||
with pytest.raises(AppError) as excinfo:
|
||||
test_app.post(
|
||||
"/api/user/{0}/feed".format(self.other_user.username),
|
||||
json.dumps(activity),
|
||||
headers=headers
|
||||
)
|
||||
|
||||
assert "403 FORBIDDEN" in excinfo.value.message
|
||||
|
||||
def test_only_able_to_update_own_image(self, test_app):
|
||||
""" Test's that the uploader is the only person who can update an image """
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
response, data = self._post_image_to_feed(test_app, data)
|
||||
|
||||
activity = {
|
||||
"verb": "update",
|
||||
"object": data["object"],
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
# Lets change the image uploader to be self.other_user, this is easier
|
||||
# than uploading the image as someone else as the way self.mocked_oauth_required
|
||||
# and self._upload_image.
|
||||
media = MediaEntry.query.filter_by(id=data["object"]["id"]).first()
|
||||
media.uploader = self.other_user.id
|
||||
media.save()
|
||||
|
||||
# Now lets try and edit the image as self.user, this should produce a 403 error.
|
||||
with self.mock_oauth():
|
||||
with pytest.raises(AppError) as excinfo:
|
||||
test_app.post(
|
||||
"/api/user/{0}/feed".format(self.user.username),
|
||||
json.dumps(activity),
|
||||
headers=headers
|
||||
)
|
||||
|
||||
assert "403 FORBIDDEN" in excinfo.value.message
|
||||
|
||||
def test_upload_image_with_filename(self, test_app):
|
||||
""" Tests that you can upload an image with filename and description """
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
response, data = self._post_image_to_feed(test_app, data)
|
||||
|
||||
image = data["object"]
|
||||
|
||||
# Now we need to add a title and description
|
||||
title = "My image ^_^"
|
||||
description = "This is my super awesome image :D"
|
||||
license = "CC-BY-SA"
|
||||
|
||||
image["displayName"] = title
|
||||
image["content"] = description
|
||||
image["license"] = license
|
||||
|
||||
activity = {"verb": "update", "object": image}
|
||||
|
||||
with self.mock_oauth():
|
||||
response = test_app.post(
|
||||
"/api/user/{0}/feed".format(self.user.username),
|
||||
json.dumps(activity),
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
|
||||
image = json.loads(response.body)["object"]
|
||||
|
||||
# Check everything has been set on the media correctly
|
||||
media = MediaEntry.query.filter_by(id=image["id"]).first()
|
||||
assert media.title == title
|
||||
assert media.description == description
|
||||
assert media.license == license
|
||||
|
||||
# Check we're being given back everything we should on an update
|
||||
assert image["id"] == media.id
|
||||
assert image["displayName"] == title
|
||||
assert image["content"] == description
|
||||
assert image["license"] == license
|
||||
|
||||
|
||||
def test_only_uploaders_post_image(self, test_app):
|
||||
""" Test that only uploaders can upload images """
|
||||
# Remove uploader permissions from user
|
||||
take_away_privileges(self.user.username, u"uploader")
|
||||
|
||||
# Now try and upload a image
|
||||
data = open(GOOD_JPG, "rb").read()
|
||||
headers = {
|
||||
"Content-Type": "image/jpeg",
|
||||
"Content-Length": str(len(data)),
|
||||
}
|
||||
|
||||
with self.mock_oauth():
|
||||
with pytest.raises(AppError) as excinfo:
|
||||
test_app.post(
|
||||
"/api/user/{0}/uploads".format(self.user.username),
|
||||
data,
|
||||
headers=headers
|
||||
)
|
||||
|
||||
# Assert that we've got a 403
|
||||
assert "403 FORBIDDEN" in excinfo.value.message
|
||||
|
||||
def test_object_endpoint(self, test_app):
|
||||
""" Tests that object can be looked up at endpoint """
|
||||
# Post an image
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
response, data = self._post_image_to_feed(test_app, data)
|
||||
|
||||
# Now lookup image to check that endpoint works.
|
||||
image = data["object"]
|
||||
|
||||
assert "links" in image
|
||||
assert "self" in image["links"]
|
||||
|
||||
# Get URI and strip testing host off
|
||||
object_uri = image["links"]["self"]["href"]
|
||||
object_uri = object_uri.replace("http://localhost:80", "")
|
||||
|
||||
with self.mock_oauth():
|
||||
request = test_app.get(object_uri)
|
||||
|
||||
image = json.loads(request.body)
|
||||
entry = MediaEntry.query.filter_by(id=image["id"]).first()
|
||||
|
||||
assert request.status_code == 200
|
||||
assert entry.id == image["id"]
|
||||
|
||||
assert "image" in image
|
||||
assert "fullImage" in image
|
||||
assert "pump_io" in image
|
||||
assert "links" in image
|
||||
|
||||
def test_post_comment(self, test_app):
|
||||
""" Tests that I can post an comment media """
|
||||
# Upload some media to comment on
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
response, data = self._post_image_to_feed(test_app, data)
|
||||
|
||||
content = "Hai this is a comment on this lovely picture ^_^"
|
||||
|
||||
activity = {
|
||||
"verb": "post",
|
||||
"object": {
|
||||
"objectType": "comment",
|
||||
"content": content,
|
||||
"inReplyTo": data["object"],
|
||||
}
|
||||
}
|
||||
|
||||
response, comment_data = self._activity_to_feed(test_app, activity)
|
||||
assert response.status_code == 200
|
||||
|
||||
# Find the objects in the database
|
||||
media = MediaEntry.query.filter_by(id=data["object"]["id"]).first()
|
||||
comment = media.get_comments()[0]
|
||||
|
||||
# Tests that it matches in the database
|
||||
assert comment.author == self.user.id
|
||||
assert comment.content == content
|
||||
|
||||
# Test that the response is what we should be given
|
||||
assert comment.id == comment_data["object"]["id"]
|
||||
assert comment.content == comment_data["object"]["content"]
|
||||
|
||||
def test_unable_to_post_comment_as_someone_else(self, test_app):
|
||||
""" Tests that you're unable to post a comment as someone else. """
|
||||
# Upload some media to comment on
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
response, data = self._post_image_to_feed(test_app, data)
|
||||
|
||||
activity = {
|
||||
"verb": "post",
|
||||
"object": {
|
||||
"objectType": "comment",
|
||||
"content": "comment commenty comment ^_^",
|
||||
"inReplyTo": data["object"],
|
||||
}
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
with self.mock_oauth():
|
||||
with pytest.raises(AppError) as excinfo:
|
||||
test_app.post(
|
||||
"/api/user/{0}/feed".format(self.other_user.username),
|
||||
json.dumps(activity),
|
||||
headers=headers
|
||||
)
|
||||
|
||||
assert "403 FORBIDDEN" in excinfo.value.message
|
||||
|
||||
def test_unable_to_update_someone_elses_comment(self, test_app):
|
||||
""" Test that you're able to update someoen elses comment. """
|
||||
# Upload some media to comment on
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
response, data = self._post_image_to_feed(test_app, data)
|
||||
|
||||
activity = {
|
||||
"verb": "post",
|
||||
"object": {
|
||||
"objectType": "comment",
|
||||
"content": "comment commenty comment ^_^",
|
||||
"inReplyTo": data["object"],
|
||||
}
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
# Post the comment.
|
||||
response, comment_data = self._activity_to_feed(test_app, activity)
|
||||
|
||||
# change who uploaded the comment as it's easier than changing
|
||||
comment_id = comment_data["object"]["id"]
|
||||
comment = MediaComment.query.filter_by(id=comment_id).first()
|
||||
comment.author = self.other_user.id
|
||||
comment.save()
|
||||
|
||||
# Update the comment as someone else.
|
||||
comment_data["object"]["content"] = "Yep"
|
||||
activity = {
|
||||
"verb": "update",
|
||||
"object": comment_data["object"]
|
||||
}
|
||||
|
||||
with self.mock_oauth():
|
||||
with pytest.raises(AppError) as excinfo:
|
||||
test_app.post(
|
||||
"/api/user/{0}/feed".format(self.user.username),
|
||||
json.dumps(activity),
|
||||
headers=headers
|
||||
)
|
||||
|
||||
assert "403 FORBIDDEN" in excinfo.value.message
|
||||
|
||||
def test_profile(self, test_app):
|
||||
""" Tests profile endpoint """
|
||||
uri = "/api/user/{0}/profile".format(self.user.username)
|
||||
with self.mock_oauth():
|
||||
response = test_app.get(uri)
|
||||
profile = json.loads(response.body)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
assert profile["preferredUsername"] == self.user.username
|
||||
assert profile["objectType"] == "person"
|
||||
|
||||
assert "links" in profile
|
||||
|
||||
def test_user(self, test_app):
|
||||
""" Test the user endpoint """
|
||||
uri = "/api/user/{0}/".format(self.user.username)
|
||||
with self.mock_oauth():
|
||||
response = test_app.get(uri)
|
||||
user = json.loads(response.body)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
assert user["nickname"] == self.user.username
|
||||
assert user["updated"] == self.user.created.isoformat()
|
||||
assert user["published"] == self.user.created.isoformat()
|
||||
|
||||
# Test profile exists but self.test_profile will test the value
|
||||
assert "profile" in response
|
||||
|
||||
def test_whoami_without_login(self, test_app):
|
||||
""" Test that whoami endpoint returns error when not logged in """
|
||||
with pytest.raises(AppError) as excinfo:
|
||||
response = test_app.get("/api/whoami")
|
||||
|
||||
assert "401 UNAUTHORIZED" in excinfo.value.message
|
||||
|
||||
def test_read_feed(self, test_app):
|
||||
""" Test able to read objects from the feed """
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
response, data = self._post_image_to_feed(test_app, data)
|
||||
|
||||
uri = "/api/user/{0}/feed".format(self.active_user.username)
|
||||
with self.mock_oauth():
|
||||
response = test_app.get(uri)
|
||||
feed = json.loads(response.body)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
# Check it has the attributes it should
|
||||
assert "displayName" in feed
|
||||
assert "objectTypes" in feed
|
||||
assert "url" in feed
|
||||
assert "links" in feed
|
||||
assert "author" in feed
|
||||
assert "items" in feed
|
||||
|
||||
# Check that image i uploaded is there
|
||||
assert feed["items"][0]["verb"] == "post"
|
||||
assert feed["items"][0]["actor"]
|
||||
|
||||
def test_cant_post_to_someone_elses_feed(self, test_app):
|
||||
""" Test that can't post to someone elses feed """
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
self.active_user = self.other_user
|
||||
|
||||
with self.mock_oauth():
|
||||
with pytest.raises(AppError) as excinfo:
|
||||
self._post_image_to_feed(test_app, data)
|
||||
|
||||
assert "403 FORBIDDEN" in excinfo.value.message
|
||||
|
||||
def test_object_endpoint_requestable(self, test_app):
|
||||
""" Test that object endpoint can be requested """
|
||||
response, data = self._upload_image(test_app, GOOD_JPG)
|
||||
response, data = self._post_image_to_feed(test_app, data)
|
||||
object_id = data["object"]["id"]
|
||||
|
||||
with self.mock_oauth():
|
||||
response = test_app.get(data["object"]["links"]["self"]["href"])
|
||||
data = json.loads(response.body)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
assert object_id == data["id"]
|
||||
assert "url" in data
|
||||
assert "links" in data
|
||||
assert data["objectType"] == "image"
|
||||
|
||||
@@ -48,7 +48,8 @@ def test_setup_celery_from_config():
|
||||
assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float)
|
||||
assert fake_celery_module.CELERY_RESULT_PERSISTENT is True
|
||||
assert fake_celery_module.CELERY_IMPORTS == [
|
||||
'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task', 'mediagoblin.notifications.task']
|
||||
'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task', \
|
||||
'mediagoblin.notifications.task', 'mediagoblin.submit.task']
|
||||
assert fake_celery_module.CELERY_RESULT_BACKEND == 'database'
|
||||
assert fake_celery_module.CELERY_RESULT_DBURI == (
|
||||
'sqlite:///' +
|
||||
|
||||
@@ -15,10 +15,11 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import six.moves.urllib.parse as urlparse
|
||||
import pytest
|
||||
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.db.models import User
|
||||
from mediagoblin.tests.tools import fixture_add_user
|
||||
from mediagoblin.db.models import User, MediaEntry
|
||||
from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
|
||||
from mediagoblin import auth
|
||||
from mediagoblin.tools import template, mail
|
||||
|
||||
@@ -173,3 +174,81 @@ class TestUserEdit(object):
|
||||
email = User.query.filter_by(username='chris').first().email
|
||||
assert email == 'new@example.com'
|
||||
# test changing the url inproperly
|
||||
|
||||
class TestMetaDataEdit:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, test_app):
|
||||
# set up new user
|
||||
self.user_password = u'toast'
|
||||
self.user = fixture_add_user(password = self.user_password,
|
||||
privileges=[u'active',u'admin'])
|
||||
self.test_app = test_app
|
||||
|
||||
def login(self, test_app):
|
||||
test_app.post(
|
||||
'/auth/login/', {
|
||||
'username': self.user.username,
|
||||
'password': self.user_password})
|
||||
|
||||
def do_post(self, data, *context_keys, **kwargs):
|
||||
url = kwargs.pop('url', '/submit/')
|
||||
do_follow = kwargs.pop('do_follow', False)
|
||||
template.clear_test_template_context()
|
||||
response = self.test_app.post(url, data, **kwargs)
|
||||
if do_follow:
|
||||
response.follow()
|
||||
context_data = template.TEMPLATE_TEST_CONTEXT
|
||||
for key in context_keys:
|
||||
context_data = context_data[key]
|
||||
return response, context_data
|
||||
|
||||
def test_edit_metadata(self, test_app):
|
||||
media_entry = fixture_media_entry(uploader=self.user.id,
|
||||
state=u'processed')
|
||||
media_slug = "/u/{username}/m/{media_id}/metadata/".format(
|
||||
username = str(self.user.username),
|
||||
media_id = str(media_entry.id))
|
||||
|
||||
self.login(test_app)
|
||||
response = test_app.get(media_slug)
|
||||
assert response.status == '200 OK'
|
||||
assert media_entry.media_metadata == {}
|
||||
# First test adding in metadata
|
||||
################################
|
||||
response, context = self.do_post({
|
||||
"media_metadata-0-identifier":"dc:title",
|
||||
"media_metadata-0-value":"Some title",
|
||||
"media_metadata-1-identifier":"dc:creator",
|
||||
"media_metadata-1-value":"Me"},url=media_slug)
|
||||
|
||||
media_entry = MediaEntry.query.first()
|
||||
new_metadata = media_entry.media_metadata
|
||||
assert new_metadata != {}
|
||||
assert new_metadata.get("dc:title") == "Some title"
|
||||
assert new_metadata.get("dc:creator") == "Me"
|
||||
# Now test removing the metadata
|
||||
################################
|
||||
response, context = self.do_post({
|
||||
"media_metadata-0-identifier":"dc:title",
|
||||
"media_metadata-0-value":"Some title"},url=media_slug)
|
||||
|
||||
media_entry = MediaEntry.query.first()
|
||||
new_metadata = media_entry.media_metadata
|
||||
assert new_metadata.get("dc:title") == "Some title"
|
||||
assert new_metadata.get("dc:creator") is None
|
||||
# Now test adding bad metadata
|
||||
###############################
|
||||
response, context = self.do_post({
|
||||
"media_metadata-0-identifier":"dc:title",
|
||||
"media_metadata-0-value":"Some title",
|
||||
"media_metadata-1-identifier":"dc:creator",
|
||||
"media_metadata-1-value":"Me",
|
||||
"media_metadata-2-identifier":"dc:created",
|
||||
"media_metadata-2-value":"On the worst day"},url=media_slug)
|
||||
|
||||
media_entry = MediaEntry.query.first()
|
||||
old_metadata = new_metadata
|
||||
new_metadata = media_entry.media_metadata
|
||||
assert new_metadata == old_metadata
|
||||
assert ("u'On the worst day' is not a 'date-time'" in
|
||||
response.body)
|
||||
|
||||
@@ -67,7 +67,8 @@ def test_ldap_plugin(ldap_plugin_app):
|
||||
assert form.username.errors == [u'This field is required.']
|
||||
assert form.password.errors == [u'This field is required.']
|
||||
|
||||
@mock.patch('mediagoblin.plugins.ldap.tools.LDAP.login', mock.Mock(return_value=return_value()))
|
||||
@mock.patch('mediagoblin.plugins.ldap.tools.LDAP.login',
|
||||
mock.Mock(return_value=return_value()))
|
||||
def _test_authentication():
|
||||
template.clear_test_template_context()
|
||||
res = ldap_plugin_app.post(
|
||||
@@ -75,7 +76,8 @@ def test_ldap_plugin(ldap_plugin_app):
|
||||
{'username': u'chris',
|
||||
'password': u'toast'})
|
||||
|
||||
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
|
||||
context = template.TEMPLATE_TEST_CONTEXT[
|
||||
'mediagoblin/auth/register.html']
|
||||
register_form = context['register_form']
|
||||
|
||||
assert register_form.username.data == u'chris'
|
||||
@@ -89,7 +91,8 @@ def test_ldap_plugin(ldap_plugin_app):
|
||||
res.follow()
|
||||
|
||||
assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
|
||||
assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT
|
||||
assert 'mediagoblin/user_pages/user_nonactive.html' in \
|
||||
template.TEMPLATE_TEST_CONTEXT
|
||||
|
||||
# Try to register with same email and username
|
||||
template.clear_test_template_context()
|
||||
@@ -98,11 +101,14 @@ def test_ldap_plugin(ldap_plugin_app):
|
||||
{'username': u'chris',
|
||||
'email': u'chris@example.com'})
|
||||
|
||||
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
|
||||
context = template.TEMPLATE_TEST_CONTEXT[
|
||||
'mediagoblin/auth/register.html']
|
||||
register_form = context['register_form']
|
||||
|
||||
assert register_form.email.errors == [u'Sorry, a user with that email address already exists.']
|
||||
assert register_form.username.errors == [u'Sorry, a user with that name already exists.']
|
||||
assert register_form.email.errors == [
|
||||
u'Sorry, a user with that email address already exists.']
|
||||
assert register_form.username.errors == [
|
||||
u'Sorry, a user with that name already exists.']
|
||||
|
||||
# Log out
|
||||
ldap_plugin_app.get('/auth/logout/')
|
||||
|
||||
93
mediagoblin/tests/test_legacy_api.py
Normal file
93
mediagoblin/tests/test_legacy_api.py
Normal file
@@ -0,0 +1,93 @@
|
||||
# GNU MediaGoblin -- federated, autonomous media hosting
|
||||
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import logging
|
||||
import base64
|
||||
|
||||
import pytest
|
||||
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.tools import template, pluginapi
|
||||
from mediagoblin.tests.tools import fixture_add_user
|
||||
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
|
||||
BIG_BLUE
|
||||
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestAPI(object):
|
||||
def setup(self):
|
||||
self.db = mg_globals.database
|
||||
|
||||
self.user_password = u'4cc355_70k3N'
|
||||
self.user = fixture_add_user(u'joapi', self.user_password,
|
||||
privileges=[u'active',u'uploader'])
|
||||
|
||||
def login(self, test_app):
|
||||
test_app.post(
|
||||
'/auth/login/', {
|
||||
'username': self.user.username,
|
||||
'password': self.user_password})
|
||||
|
||||
def get_context(self, template_name):
|
||||
return template.TEMPLATE_TEST_CONTEXT[template_name]
|
||||
|
||||
def http_auth_headers(self):
|
||||
return {'Authorization': 'Basic {0}'.format(
|
||||
base64.b64encode(':'.join([
|
||||
self.user.username,
|
||||
self.user_password])))}
|
||||
|
||||
def do_post(self, data, test_app, **kwargs):
|
||||
url = kwargs.pop('url', '/api/submit')
|
||||
do_follow = kwargs.pop('do_follow', False)
|
||||
|
||||
if not 'headers' in kwargs.keys():
|
||||
kwargs['headers'] = self.http_auth_headers()
|
||||
|
||||
response = test_app.post(url, data, **kwargs)
|
||||
|
||||
if do_follow:
|
||||
response.follow()
|
||||
|
||||
return response
|
||||
|
||||
def upload_data(self, filename):
|
||||
return {'upload_files': [('file', filename)]}
|
||||
|
||||
def test_1_test_test_view(self, test_app):
|
||||
self.login(test_app)
|
||||
|
||||
response = test_app.get(
|
||||
'/api/test',
|
||||
headers=self.http_auth_headers())
|
||||
|
||||
assert response.body == \
|
||||
'{"username": "joapi", "email": "joapi@example.com"}'
|
||||
|
||||
def test_2_test_submission(self, test_app):
|
||||
self.login(test_app)
|
||||
|
||||
response = self.do_post(
|
||||
{'title': 'Great JPG!'},
|
||||
test_app,
|
||||
**self.upload_data(GOOD_JPG))
|
||||
|
||||
assert response.status_int == 200
|
||||
|
||||
assert self.db.MediaEntry.query.filter_by(title=u'Great JPG!').first()
|
||||
78
mediagoblin/tests/test_metadata.py
Normal file
78
mediagoblin/tests/test_metadata.py
Normal file
@@ -0,0 +1,78 @@
|
||||
# GNU MediaGoblin -- federated, autonomous media hosting
|
||||
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytest
|
||||
from mediagoblin.tools.metadata import compact_and_validate
|
||||
from jsonschema import ValidationError
|
||||
|
||||
class TestMetadataFunctionality:
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _setup(self, test_app):
|
||||
self.test_app = test_app
|
||||
|
||||
def testCompactAndValidate(self):
|
||||
# First, test out a well formatted piece of metadata
|
||||
######################################################
|
||||
test_metadata = {
|
||||
'dc:title':'My Pet Bunny',
|
||||
'dc:description':'A picture displaying how cute my pet bunny is.',
|
||||
'location':'/home/goblin/Pictures/bunny.png',
|
||||
'license':'http://www.gnu.org/licenses/gpl.txt'
|
||||
}
|
||||
jsonld_metadata =compact_and_validate(test_metadata)
|
||||
assert jsonld_metadata
|
||||
assert jsonld_metadata.get('dc:title') == 'My Pet Bunny'
|
||||
# Free floating nodes should be removed
|
||||
assert jsonld_metadata.get('location') is None
|
||||
assert jsonld_metadata.get('@context') == \
|
||||
u"http://www.w3.org/2013/json-ld-context/rdfa11"
|
||||
|
||||
# Next, make sure that various badly formatted metadata
|
||||
# will be rejected.
|
||||
#######################################################
|
||||
#,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.
|
||||
# Metadata with a non-URI license should fail :
|
||||
#`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'
|
||||
metadata_fail_1 = {
|
||||
'dc:title':'My Pet Bunny',
|
||||
'dc:description':'A picture displaying how cute my pet bunny is.',
|
||||
'location':'/home/goblin/Pictures/bunny.png',
|
||||
'license':'All Rights Reserved.'
|
||||
}
|
||||
jsonld_fail_1 = None
|
||||
try:
|
||||
jsonld_fail_1 = compact_and_validate(metadata_fail_1)
|
||||
except ValidationError, e:
|
||||
assert e.message == "'All Rights Reserved.' is not a 'uri'"
|
||||
assert jsonld_fail_1 == None
|
||||
#,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,
|
||||
# Metadata with an ivalid date-time dc:created should fail :
|
||||
#`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`'`''
|
||||
metadata_fail_2 = {
|
||||
'dc:title':'My Pet Bunny',
|
||||
'dc:description':'A picture displaying how cute my pet bunny is.',
|
||||
'location':'/home/goblin/Pictures/bunny.png',
|
||||
'license':'http://www.gnu.org/licenses/gpl.txt',
|
||||
'dc:created':'The other day'
|
||||
}
|
||||
jsonld_fail_2 = None
|
||||
try:
|
||||
jsonld_fail_2 = compact_and_validate(metadata_fail_2)
|
||||
except ValidationError, e:
|
||||
assert e.message == "'The other day' is not a 'date-time'"
|
||||
assert jsonld_fail_2 == None
|
||||
|
||||
@@ -14,7 +14,16 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytz
|
||||
import datetime
|
||||
|
||||
from werkzeug.datastructures import FileStorage
|
||||
|
||||
from .resources import GOOD_JPG
|
||||
from mediagoblin.db.base import Session
|
||||
from mediagoblin.media_types import sniff_media
|
||||
from mediagoblin.submit.lib import new_upload_entry
|
||||
from mediagoblin.submit.task import collect_garbage
|
||||
from mediagoblin.db.models import User, MediaEntry, MediaComment
|
||||
from mediagoblin.tests.tools import fixture_add_user, fixture_media_entry
|
||||
|
||||
@@ -91,3 +100,35 @@ def test_media_deletes_broken_attachment(test_app):
|
||||
|
||||
MediaEntry.query.get(media.id).delete()
|
||||
User.query.get(user_a.id).delete()
|
||||
|
||||
def test_garbage_collection_task(test_app):
|
||||
""" Test old media entry are removed by GC task """
|
||||
user = fixture_add_user()
|
||||
|
||||
# Create a media entry that's unprocessed and over an hour old.
|
||||
entry_id = 72
|
||||
now = datetime.datetime.now(pytz.UTC)
|
||||
file_data = FileStorage(
|
||||
stream=open(GOOD_JPG, "rb"),
|
||||
filename="mah_test.jpg",
|
||||
content_type="image/jpeg"
|
||||
)
|
||||
|
||||
# Find media manager
|
||||
media_type, media_manager = sniff_media(file_data, "mah_test.jpg")
|
||||
entry = new_upload_entry(user)
|
||||
entry.id = entry_id
|
||||
entry.title = "Mah Image"
|
||||
entry.slug = "slugy-slug-slug"
|
||||
entry.media_type = 'image'
|
||||
entry.created = now - datetime.timedelta(days=2)
|
||||
entry.save()
|
||||
|
||||
# Validate the model exists
|
||||
assert MediaEntry.query.filter_by(id=entry_id).first() is not None
|
||||
|
||||
# Call the garbage collection task
|
||||
collect_garbage()
|
||||
|
||||
# Now validate the image has been deleted
|
||||
assert MediaEntry.query.filter_by(id=entry_id).first() is None
|
||||
|
||||
@@ -184,20 +184,17 @@ class TestUserHasPrivilege:
|
||||
self._setup()
|
||||
|
||||
# then test out the user.has_privilege method for one privilege
|
||||
assert not self.natalie_user.has_privilege(u'commenter')
|
||||
assert self.aeva_user.has_privilege(u'active')
|
||||
assert not self.aeva_user.has_privilege(u'admin')
|
||||
assert self.natalie_user.has_privilege(u'active')
|
||||
|
||||
|
||||
def test_user_has_privileges_multiple(self, test_app):
|
||||
def test_allow_admin(self, test_app):
|
||||
self._setup()
|
||||
|
||||
# when multiple args are passed to has_privilege, the method returns
|
||||
# True if the user has ANY of the privileges
|
||||
assert self.natalie_user.has_privilege(u'admin',u'commenter')
|
||||
assert self.aeva_user.has_privilege(u'moderator',u'active')
|
||||
assert not self.natalie_user.has_privilege(u'commenter',u'uploader')
|
||||
|
||||
# This should work because she is an admin.
|
||||
assert self.natalie_user.has_privilege(u'commenter')
|
||||
|
||||
# Test that we can look this out ignoring that she's an admin
|
||||
assert not self.natalie_user.has_privilege(u'commenter', allow_admin=False)
|
||||
|
||||
def test_media_data_init(test_app):
|
||||
Session.rollback()
|
||||
|
||||
@@ -162,4 +162,3 @@ class TestOAuth(object):
|
||||
assert request_token.client == client.id
|
||||
assert request_token.used == False
|
||||
assert request_token.callback == request_query["oauth_callback"]
|
||||
|
||||
|
||||
@@ -14,10 +14,15 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import mock
|
||||
import email
|
||||
import pytest
|
||||
import smtplib
|
||||
import pkg_resources
|
||||
|
||||
import six
|
||||
|
||||
from mediagoblin.tests.tools import get_app
|
||||
from mediagoblin.tools import common, url, translate, mail, text, testing
|
||||
|
||||
testing._activate_testing()
|
||||
@@ -71,6 +76,28 @@ I hope you like unit tests JUST AS MUCH AS I DO!"""
|
||||
|
||||
I hope you like unit tests JUST AS MUCH AS I DO!"""
|
||||
|
||||
@pytest.fixture()
|
||||
def starttls_enabled_app(request):
|
||||
return get_app(
|
||||
request,
|
||||
mgoblin_config=pkg_resources.resource_filename(
|
||||
"mediagoblin.tests",
|
||||
"starttls_config.ini"
|
||||
)
|
||||
)
|
||||
|
||||
def test_email_force_starttls(starttls_enabled_app):
|
||||
common.TESTS_ENABLED = False
|
||||
SMTP = lambda *args, **kwargs: mail.FakeMhost()
|
||||
with mock.patch('smtplib.SMTP', SMTP):
|
||||
with pytest.raises(smtplib.SMTPException):
|
||||
mail.send_email(
|
||||
from_addr="notices@my.test.instance.com",
|
||||
to_addrs="someone@someplace.com",
|
||||
subject="Testing is so much fun!",
|
||||
message_body="Ohai ^_^"
|
||||
)
|
||||
|
||||
def test_slugify():
|
||||
assert url.slugify(u'a walk in the park') == u'a-walk-in-the-park'
|
||||
assert url.slugify(u'A Walk in the Park') == u'a-walk-in-the-park'
|
||||
|
||||
@@ -26,13 +26,15 @@ from webtest import TestApp
|
||||
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
|
||||
CommentSubscription, CommentNotification, Privilege, CommentReport
|
||||
CommentSubscription, CommentNotification, Privilege, CommentReport, Client, \
|
||||
RequestToken, AccessToken
|
||||
from mediagoblin.tools import testing
|
||||
from mediagoblin.init.config import read_mediagoblin_config
|
||||
from mediagoblin.db.base import Session
|
||||
from mediagoblin.meddleware import BaseMeddleware
|
||||
from mediagoblin.auth import gen_password_hash
|
||||
from mediagoblin.gmg_commands.dbupdate import run_dbupdate
|
||||
from mediagoblin.tools.crypto import random_string
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
Reference in New Issue
Block a user