Add more tests for federation APIs

This commit is contained in:
Jessica Tallon 2014-07-11 15:23:55 +01:00
parent 967df5eff0
commit 51ab51921e
5 changed files with 189 additions and 105 deletions

View File

@ -446,9 +446,8 @@ class MediaEntry(Base, MediaEntryMixin):
)
context = {
"id": self.id,
"id": self.id,
"author": author.serialize(request),
"displayName": self.title,
"objectType": self.objectType,
"url": url,
"image": {
@ -464,6 +463,15 @@ class MediaEntry(Base, MediaEntryMixin):
},
}
if self.title:
context["displayName"] = self.title
if self.description:
context["content"] = self.description
if self.license:
context["license"] = self.license
if show_comments:
comments = [comment.serialize(request) for comment in self.get_comments()]
total = len(comments)
@ -478,7 +486,7 @@ class MediaEntry(Base, MediaEntryMixin):
),
}
return context
return context
class FileKeynames(Base):
"""
@ -630,6 +638,7 @@ class MediaComment(Base, MediaCommentMixin):
media = MediaEntry.query.filter_by(id=self.media_entry).first()
author = self.get_author
context = {
"id": self.id,
"objectType": "comment",
"content": self.content,
"inReplyTo": media.serialize(request, show_comments=False),

View File

@ -116,15 +116,27 @@ def feed(request):
elif obj.get("objectType", None) == "image":
# Posting an image to the feed
# NB: This is currently just handing the image back until we have an
# to send the image to the actual feed
media_id = int(data["object"]["id"])
media = MediaEntry.query.filter_by(id=media_id)
if media is None:
error = "No such 'image' with id '{0}'".format(id=media_id)
return json_response(error, status=404)
media = media[0]
media = media.first()
obj = data["object"]
if "displayName" in obj:
media.title = obj["displayName"]
if "content" in obj:
media.description = obj["content"]
if "license" in obj:
media.license = obj["license"]
media.save()
manager = media.media_manager.api_add_to_feed(request, media)
return json_response({
"verb": "post",
"object": media.serialize(request)
@ -206,6 +218,11 @@ def feed(request):
}
return json_response(activity)
elif request.method != "GET":
# Currently unsupported
error = "Unsupported HTTP method {0}".format(request.method)
return json_response({"error": error}, status=501)
feed_url = request.urlgen(
"mediagoblin.federation.feed",
username=request.user.username,

View File

@ -63,10 +63,7 @@ class ImageMediaManager(MediaManagerBase):
""" This handles a image upload request """
# Use the same kind of method from mediagoblin/submit/views:submit_start
entry.media_type = unicode(MEDIA_TYPE)
entry.title = unicode(request.args.get("title", file_data.filename))
entry.description = unicode(request.args.get("description", ""))
entry.license = request.args.get("license", "") # not part of the standard API
entry.title = file_data.filename
entry.generate_slug()
queue_file = prepare_queue_task(request.app, entry, file_data.filename)
@ -74,6 +71,16 @@ class ImageMediaManager(MediaManagerBase):
queue_file.write(request.data)
entry.save()
return json_response(entry.serialize(request))
@staticmethod
def api_add_to_feed(request, entry):
""" Add media to Feed """
if entry.title:
# Shame we have to do this here but we didn't have the data in
# api_upload_request as no filename is usually specified.
entry.slug = None
entry.generate_slug()
feed_url = request.urlgen(
'mediagoblin.user_pages.atom_feed',

View File

@ -23,11 +23,10 @@ from webtest import AppError
from mediagoblin import mg_globals
from .resources import GOOD_JPG
from mediagoblin.db.models import User
from mediagoblin.db.models import User, MediaEntry
from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.moderation.tools import take_away_privileges
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
BIG_BLUE
from .resources import GOOD_JPG
class TestAPI(object):
@ -35,8 +34,54 @@ class TestAPI(object):
def setup(self, test_app):
self.test_app = test_app
self.db = mg_globals.database
self.user = fixture_add_user(privileges=[u'active', u'uploader'])
def _activity_to_feed(self, test_app, activity, headers=None):
""" Posts an activity to the user's feed """
if headers:
headers.setdefault("Content-Type", "application/json")
else:
headers = {"Content-Type": "application/json"}
with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required):
response = test_app.post(
"/api/user/{0}/feed".format(self.user.username),
json.dumps(activity),
headers=headers
)
return response, json.loads(response.body)
def _upload_image(self, test_app, image):
""" Uploads and image to MediaGoblin via pump.io API """
data = open(image, "rb").read()
headers = {
"Content-Type": "image/jpeg",
"Content-Length": str(len(data))
}
with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required):
response = test_app.post(
"/api/user/{0}/uploads".format(self.user.username),
data,
headers=headers
)
image = json.loads(response.body)
return response, image
def _post_image_to_feed(self, test_app, image):
""" Posts an already uploaded image to feed """
activity = {
"verb": "post",
"object": image,
}
return self._activity_to_feed(test_app, activity)
def mocked_oauth_required(self, *args, **kwargs):
""" Mocks mediagoblin.decorator.oauth_required to always validate """
@ -52,46 +97,63 @@ class TestAPI(object):
def test_can_post_image(self, test_app):
""" Tests that an image can be posted to the API """
# First request we need to do is to upload the image
data = open(GOOD_JPG, "rb").read()
headers = {
"Content-Type": "image/jpeg",
"Content-Length": str(len(data))
}
response, image = self._upload_image(test_app, GOOD_JPG)
# I should have got certain things back
assert response.status_code == 200
assert "id" in image
assert "fullImage" in image
assert "url" in image["fullImage"]
assert "url" in image
assert "author" in image
assert "published" in image
assert "updated" in image
assert image["objectType"] == "image"
# Check that we got the response we're expecting
response, _ = self._post_image_to_feed(test_app, image)
assert response.status_code == 200
def test_upload_image_with_filename(self, test_app):
""" Tests that you can upload an image with filename and description """
response, data = self._upload_image(test_app, GOOD_JPG)
response, data = self._post_image_to_feed(test_app, data)
image = data["object"]
# Now we need to add a title and description
title = "My image ^_^"
description = "This is my super awesome image :D"
license = "CC-BY-SA"
image["displayName"] = title
image["content"] = description
image["license"] = license
activity = {"verb": "update", "object": image}
with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required):
response = test_app.post(
"/api/user/{0}/uploads".format(self.user.username),
data,
headers=headers
)
image = json.loads(response.body)
# I should have got certain things back
assert response.status_code == 200
assert "id" in image
assert "fullImage" in image
assert "url" in image["fullImage"]
assert "url" in image
assert "author" in image
assert "published" in image
assert "updated" in image
assert image["objectType"] == "image"
# Now post this to the feed
activity = {
"verb": "post",
"object": image,
}
response = test_app.post(
"/api/user/{0}/feed".format(self.user.username),
activity
json.dumps(activity),
headers={"Content-Type": "application/json"}
)
# Check that we got the response we're expecting
assert response.status_code == 200
image = json.loads(response.body)["object"]
# Check everything has been set on the media correctly
media = MediaEntry.query.filter_by(id=image["id"]).first()
assert media.title == title
assert media.description == description
assert media.license == license
# Check we're being given back everything we should on an update
assert image["id"] == media.id
assert image["displayName"] == title
assert image["content"] == description
assert image["license"] == license
def test_only_uploaders_post_image(self, test_app):
""" Test that only uploaders can upload images """
@ -115,3 +177,50 @@ class TestAPI(object):
# Assert that we've got a 403
assert "403 FORBIDDEN" in excinfo.value.message
def test_post_comment(self, test_app):
""" Tests that I can post an comment media """
# Upload some media to comment on
response, data = self._upload_image(test_app, GOOD_JPG)
response, data = self._post_image_to_feed(test_app, data)
content = "Hai this is a comment on this lovely picture ^_^"
activity = {
"verb": "post",
"object": {
"objectType": "comment",
"content": content,
"inReplyTo": data["object"],
}
}
response, comment_data = self._activity_to_feed(test_app, activity)
assert response.status_code == 200
# Find the objects in the database
media = MediaEntry.query.filter_by(id=data["object"]["id"]).first()
comment = media.get_comments()[0]
# Tests that it matches in the database
assert comment.author == self.user.id
assert comment.content == content
# Test that the response is what we should be given
assert comment.id == comment_data["object"]["id"]
assert comment.content == comment_data["object"]["content"]
def test_profile(self, test_app):
""" Tests profile endpoint """
uri = "/api/user/{0}/profile".format(self.user.username)
with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required):
response = test_app.get(uri)
profile = json.loads(response.body)
assert response.status_code == 200
assert profile["preferredUsername"] == self.user.username
assert profile["objectType"] == "person"
assert "links" in profile

View File

@ -347,61 +347,3 @@ def fixture_add_comment_report(comment=None, reported_user=None,
return comment_report
def fixture_add_oauth_client(client_name=None, client_type="native",
redirect_uri=None, contacts=None):
client_id = random_string(22, OAUTH_ALPHABET)
client_secret = random_string(43, OAUTH_ALPHABET)
client = Client(
id=client_id,
secret=client_secret,
expirey=None,
application_type=client_type,
application_name=client_name,
contacts=contacts,
redirect_uri=redirect_uri
)
client.save()
return client
def fixture_add_oauth_request_token(user, client=None):
if client is None:
client = fixture_add_oauth_client()
rt_token = random_string(22, OAUTH_ALPHABET)
rt_secret = random_string(43, OAUTH_ALPHABET)
rt_verifier = random_string(22, OAUTH_ALPHABET)
request_token = RequestToken(
token=rt_token,
secret=rt_secret,
user=user.id,
used=True,
authenticated=True,
verifier=rt_verifier,
)
request_token.save()
return request_token
def fixture_add_oauth_access_token(user, client=None, request_token=None):
if client is None:
client = fixture_add_oauth_client()
if request_token is None:
request_token = fixture_add_oauth_request_token(user)
at_token = random_string(22, OAUTH_ALPHABET)
at_secret = random_string(43, OAUTH_ALPHABET)
access_token = AccessToken(
token=at_token,
secret=at_secret,
user=user.id,
request_token=request_token.token
)
access_token.save()
return access_token