Add more security checks when updating objects and tests

This commit is contained in:
Jessica Tallon 2014-08-01 22:26:12 +01:00
parent bf92ac6d7c
commit 8d75091de2
2 changed files with 129 additions and 37 deletions

View File

@ -20,7 +20,6 @@ import mimetypes
from werkzeug.datastructures import FileStorage from werkzeug.datastructures import FileStorage
from mediagoblin.media_types import sniff_media
from mediagoblin.decorators import oauth_required from mediagoblin.decorators import oauth_required
from mediagoblin.federation.decorators import user_has_privilege from mediagoblin.federation.decorators import user_has_privilege
from mediagoblin.db.models import User, MediaEntry, MediaComment from mediagoblin.db.models import User, MediaEntry, MediaComment
@ -36,18 +35,19 @@ from mediagoblin.media_types.image import MEDIA_TYPE as IMAGE_MEDIA_TYPE
def profile(request, raw=False): def profile(request, raw=False):
""" This is /api/user/<username>/profile - This will give profile info """ """ This is /api/user/<username>/profile - This will give profile info """
user = request.matchdict["username"] user = request.matchdict["username"]
requested_user = User.query.filter_by(username=user) requested_user = User.query.filter_by(username=user).first()
if requested_user is None: if user is None:
return json_error("No such 'user' with id '{0}'".format(user), 404) return json_error(
"No such 'user' with id '{0}'".format(user),
user = requested_user[0] status=404
)
if raw: if raw:
return (user, user.serialize(request)) return (requested_user.username, requested_user.serialize(request))
# user profiles are public so return information # user profiles are public so return information
return json_response(user.serialize(request)) return json_response(requested_user.serialize(request))
@oauth_required @oauth_required
def user(request): def user(request):
@ -68,12 +68,11 @@ def user(request):
def uploads(request): def uploads(request):
""" Endpoint for file uploads """ """ Endpoint for file uploads """
user = request.matchdict["username"] user = request.matchdict["username"]
requested_user = User.query.filter_by(username=user) requested_user = User.query.filter_by(username=user).first()
if requested_user is None: if requested_user is None:
return json_error("No such 'user' with id '{0}'".format(user), 404) return json_error("No such 'user' with id '{0}'".format(user), 404)
requested_user = requested_user[0]
if request.method == "POST": if request.method == "POST":
# Ensure that the user is only able to upload to their own # Ensure that the user is only able to upload to their own
# upload endpoint. # upload endpoint.
@ -82,7 +81,7 @@ def uploads(request):
"Not able to post to another users feed.", "Not able to post to another users feed.",
status=403 status=403
) )
# Wrap the data in the werkzeug file wrapper # Wrap the data in the werkzeug file wrapper
if "Content-Type" not in request.headers: if "Content-Type" not in request.headers:
return json_error( return json_error(
@ -97,7 +96,6 @@ def uploads(request):
) )
# Find media manager # Find media manager
media_type, media_manager = sniff_media(file_data, filename)
entry = new_upload_entry(request.user) entry = new_upload_entry(request.user)
entry.media_type = IMAGE_MEDIA_TYPE entry.media_type = IMAGE_MEDIA_TYPE
return api_upload_request(request, file_data, entry) return api_upload_request(request, file_data, entry)
@ -109,13 +107,12 @@ def uploads(request):
def feed(request): def feed(request):
""" Handles the user's outbox - /api/user/<username>/feed """ """ Handles the user's outbox - /api/user/<username>/feed """
user = request.matchdict["username"] user = request.matchdict["username"]
requested_user = User.query.filter_by(username=user) requested_user = User.query.filter_by(username=user).first()
# check if the user exists # check if the user exists
if requested_user is None: if requested_user is None:
return json_error("No such 'user' with id '{0}'".format(user), 404) return json_error("No such 'user' with id '{0}'".format(user), 404)
requested_user = requested_user[0]
if request.data: if request.data:
data = json.loads(request.data) data = json.loads(request.data)
else: else:
@ -123,7 +120,9 @@ def feed(request):
# We need to check that the user they're posting to is # We need to check that the user they're posting to is
# the person that they are. # the person that they are.
if request.method in ["POST", "PUT"] and requested_user.id != request.user.id: if request.method in ["POST", "PUT"] and \
requested_user.id != request.user.id:
return json_error( return json_error(
"Not able to post to another users feed.", "Not able to post to another users feed.",
status=403 status=403
@ -151,14 +150,13 @@ def feed(request):
elif obj.get("objectType", None) == "image": elif obj.get("objectType", None) == "image":
# Posting an image to the feed # Posting an image to the feed
media_id = int(data["object"]["id"]) media_id = int(data["object"]["id"])
media = MediaEntry.query.filter_by(id=media_id) media = MediaEntry.query.filter_by(id=media_id).first()
if media is None: if media is None:
return json_response( return json_response(
"No such 'image' with id '{0}'".format(id=media_id), "No such 'image' with id '{0}'".format(id=media_id),
status=404 status=404
) )
media = media.first()
if not media.unserialize(data["object"]): if not media.unserialize(data["object"]):
return json_error( return json_error(
"Invalid 'image' with id '{0}'".format(media_id) "Invalid 'image' with id '{0}'".format(media_id)
@ -203,13 +201,20 @@ def feed(request):
status=403 status=403
) )
comment = MediaComment.query.filter_by(id=obj_id) comment = MediaComment.query.filter_by(id=obj_id).first()
if comment is None: if comment is None:
return json_error( return json_error(
"No such 'comment' with id '{0}'.".format(obj_id) "No such 'comment' with id '{0}'.".format(obj_id)
) )
comment = comment[0] # Check that the person trying to update the comment is
# the author of the comment.
if comment.author != request.user.id:
return json_error(
"Only author of comment is able to update comment.",
status=403
)
if not comment.unserialize(data["object"]): if not comment.unserialize(data["object"]):
return json_error( return json_error(
"Invalid 'comment' with id '{0}'".format(obj_id) "Invalid 'comment' with id '{0}'".format(obj_id)
@ -224,13 +229,20 @@ def feed(request):
return json_response(activity) return json_response(activity)
elif obj["objectType"] == "image": elif obj["objectType"] == "image":
image = MediaEntry.query.filter_by(id=obj_id) image = MediaEntry.query.filter_by(id=obj_id).first()
if image is None: if image is None:
return json_error( return json_error(
"No such 'image' with the id '{0}'.".format(obj_id) "No such 'image' with the id '{0}'.".format(obj_id)
) )
image = image[0] # Check that the person trying to update the comment is
# the author of the comment.
if image.uploader != request.user.id:
return json_error(
"Only uploader of image is able to update image.",
status=403
)
if not image.unserialize(obj): if not image.unserialize(obj):
return json_error( return json_error(
"Invalid 'image' with id '{0}'".format(obj_id) "Invalid 'image' with id '{0}'".format(obj_id)
@ -314,7 +326,10 @@ def object(request, raw_obj=False):
if object_type not in ["image"]: if object_type not in ["image"]:
# not sure why this is 404, maybe ask evan. Maybe 400? # not sure why this is 404, maybe ask evan. Maybe 400?
return json_error("Unknown type: {0}".format(object_type), status=404) return json_error(
"Unknown type: {0}".format(object_type),
status=404
)
media = MediaEntry.query.filter_by(id=object_id).first() media = MediaEntry.query.filter_by(id=object_id).first()
if media is None: if media is None:

View File

@ -22,7 +22,7 @@ from webtest import AppError
from .resources import GOOD_JPG from .resources import GOOD_JPG
from mediagoblin import mg_globals from mediagoblin import mg_globals
from mediagoblin.db.models import User, MediaEntry from mediagoblin.db.models import User, MediaEntry, MediaComment
from mediagoblin.tests.tools import fixture_add_user from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.moderation.tools import take_away_privileges from mediagoblin.moderation.tools import take_away_privileges
@ -119,7 +119,7 @@ class TestAPI(object):
# Check that we got the response we're expecting # Check that we got the response we're expecting
response, _ = self._post_image_to_feed(test_app, image) response, _ = self._post_image_to_feed(test_app, image)
assert response.status_code == 200 assert response.status_code == 200
def test_unable_to_upload_as_someone_else(self, test_app): def test_unable_to_upload_as_someone_else(self, test_app):
""" Test that can't upload as someoen else """ """ Test that can't upload as someoen else """
data = open(GOOD_JPG, "rb").read() data = open(GOOD_JPG, "rb").read()
@ -127,10 +127,10 @@ class TestAPI(object):
"Content-Type": "image/jpeg", "Content-Type": "image/jpeg",
"Content-Length": str(len(data)) "Content-Length": str(len(data))
} }
with mock.patch("mediagoblin.decorators.oauth_required", with mock.patch("mediagoblin.decorators.oauth_required",
new_callable=self.mocked_oauth_required): new_callable=self.mocked_oauth_required):
# Will be self.user trying to upload as self.other_user # Will be self.user trying to upload as self.other_user
with pytest.raises(AppError) as excinfo: with pytest.raises(AppError) as excinfo:
test_app.post( test_app.post(
@ -138,22 +138,22 @@ class TestAPI(object):
data, data,
headers=headers headers=headers
) )
assert "403 FORBIDDEN" in excinfo.value.message assert "403 FORBIDDEN" in excinfo.value.message
def test_unable_to_post_feed_as_someone_else(self, test_app): def test_unable_to_post_feed_as_someone_else(self, test_app):
""" Tests that can't post an image to someone else's feed """ """ Tests that can't post an image to someone else's feed """
response, data = self._upload_image(test_app, GOOD_JPG) response, data = self._upload_image(test_app, GOOD_JPG)
activity = { activity = {
"verb": "post", "verb": "post",
"object": data "object": data
} }
headers = { headers = {
"Content-Type": "application/json", "Content-Type": "application/json",
} }
with mock.patch("mediagoblin.decorators.oauth_required", with mock.patch("mediagoblin.decorators.oauth_required",
new_callable=self.mocked_oauth_required): new_callable=self.mocked_oauth_required):
with pytest.raises(AppError) as excinfo: with pytest.raises(AppError) as excinfo:
@ -162,7 +162,40 @@ class TestAPI(object):
json.dumps(activity), json.dumps(activity),
headers=headers headers=headers
) )
assert "403 FORBIDDEN" in excinfo.value.message
def test_only_able_to_update_own_image(self, test_app):
""" Test's that the uploader is the only person who can update an image """
response, data = self._upload_image(test_app, GOOD_JPG)
response, data = self._post_image_to_feed(test_app, data)
activity = {
"verb": "update",
"object": data["object"],
}
headers = {
"Content-Type": "application/json",
}
# Lets change the image uploader to be self.other_user, this is easier
# than uploading the image as someone else as the way self.mocked_oauth_required
# and self._upload_image.
media = MediaEntry.query.filter_by(id=data["object"]["id"]).first()
media.uploader = self.other_user.id
media.save()
# Now lets try and edit the image as self.user, this should produce a 403 error.
with mock.patch("mediagoblin.decorators.oauth_required",
new_callable=self.mocked_oauth_required):
with pytest.raises(AppError) as excinfo:
test_app.post(
"/api/user/{0}/feed".format(self.user.username),
json.dumps(activity),
headers=headers
)
assert "403 FORBIDDEN" in excinfo.value.message assert "403 FORBIDDEN" in excinfo.value.message
def test_upload_image_with_filename(self, test_app): def test_upload_image_with_filename(self, test_app):
@ -292,13 +325,13 @@ class TestAPI(object):
# Test that the response is what we should be given # Test that the response is what we should be given
assert comment.id == comment_data["object"]["id"] assert comment.id == comment_data["object"]["id"]
assert comment.content == comment_data["object"]["content"] assert comment.content == comment_data["object"]["content"]
def test_unable_to_post_comment_as_someone_else(self, test_app): def test_unable_to_post_comment_as_someone_else(self, test_app):
""" Tests that you're unable to post a comment as someone else. """ """ Tests that you're unable to post a comment as someone else. """
# Upload some media to comment on # Upload some media to comment on
response, data = self._upload_image(test_app, GOOD_JPG) response, data = self._upload_image(test_app, GOOD_JPG)
response, data = self._post_image_to_feed(test_app, data) response, data = self._post_image_to_feed(test_app, data)
activity = { activity = {
"verb": "post", "verb": "post",
"object": { "object": {
@ -307,11 +340,11 @@ class TestAPI(object):
"inReplyTo": data["object"], "inReplyTo": data["object"],
} }
} }
headers = { headers = {
"Content-Type": "application/json", "Content-Type": "application/json",
} }
with mock.patch("mediagoblin.decorators.oauth_required", with mock.patch("mediagoblin.decorators.oauth_required",
new_callable=self.mocked_oauth_required): new_callable=self.mocked_oauth_required):
with pytest.raises(AppError) as excinfo: with pytest.raises(AppError) as excinfo:
@ -320,9 +353,53 @@ class TestAPI(object):
json.dumps(activity), json.dumps(activity),
headers=headers headers=headers
) )
assert "403 FORBIDDEN" in excinfo.value.message assert "403 FORBIDDEN" in excinfo.value.message
def test_unable_to_update_someone_elses_comment(self, test_app):
""" Test that you're able to update someoen elses comment. """
# Upload some media to comment on
response, data = self._upload_image(test_app, GOOD_JPG)
response, data = self._post_image_to_feed(test_app, data)
activity = {
"verb": "post",
"object": {
"objectType": "comment",
"content": "comment commenty comment ^_^",
"inReplyTo": data["object"],
}
}
headers = {
"Content-Type": "application/json",
}
# Post the comment.
response, comment_data = self._activity_to_feed(test_app, activity)
# change who uploaded the comment as it's easier than changing
comment_id = comment_data["object"]["id"]
comment = MediaComment.query.filter_by(id=comment_id).first()
comment.author = self.other_user.id
# Update the comment as someone else.
comment_data["object"]["content"] = "Yep"
activity = {
"verb": "update",
"object": comment_data["object"]
}
with mock.patch("mediagoblin.decorators.oauth_required",
new_callable=self.mocked_oauth_required):
with pytest.raises(AppError) as excinfo:
test_app.post(
"/api/user/{0}/feed".format(self.user.username),
json.dumps(activity),
headers=headers
)
assert "403 FORBIDDEN" in excinfo.value.message
def test_profile(self, test_app): def test_profile(self, test_app):
""" Tests profile endpoint """ """ Tests profile endpoint """