Require uploader privileges to upload media to API
This commit is contained in:
parent
ee9956c3de
commit
967df5eff0
@ -1,4 +1,3 @@
|
||||
|
||||
import json
|
||||
import io
|
||||
import mimetypes
|
||||
@ -7,6 +6,7 @@ from werkzeug.datastructures import FileStorage
|
||||
|
||||
from mediagoblin.media_types import sniff_media
|
||||
from mediagoblin.decorators import oauth_required
|
||||
from mediagoblin.federation.decorators import user_has_privilege
|
||||
from mediagoblin.db.models import User, MediaEntry, MediaComment
|
||||
from mediagoblin.tools.response import redirect, json_response
|
||||
from mediagoblin.meddleware.csrf import csrf_exempt
|
||||
@ -46,6 +46,7 @@ def user(request):
|
||||
|
||||
@oauth_required
|
||||
@csrf_exempt
|
||||
@user_has_privilege(u'uploader')
|
||||
def uploads(request):
|
||||
""" Endpoint for file uploads """
|
||||
user = request.matchdict["username"]
|
||||
|
@ -19,21 +19,16 @@ import json
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
from webtest import AppError
|
||||
|
||||
from mediagoblin import mg_globals
|
||||
from .resources import GOOD_JPG
|
||||
from mediagoblin.db.models import User
|
||||
from mediagoblin.tests.tools import fixture_add_user
|
||||
from mediagoblin.moderation.tools import take_away_privileges
|
||||
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
|
||||
BIG_BLUE
|
||||
|
||||
def mocked_oauth_required(*args, **kwargs):
|
||||
""" Mocks mediagoblin.decorator.oauth_required to always validate """
|
||||
|
||||
def oauth_required(controller):
|
||||
return controller
|
||||
|
||||
return oauth_required
|
||||
|
||||
class TestAPI(object):
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@ -42,6 +37,18 @@ class TestAPI(object):
|
||||
self.db = mg_globals.database
|
||||
self.user = fixture_add_user(privileges=[u'active', u'uploader'])
|
||||
|
||||
def mocked_oauth_required(self, *args, **kwargs):
|
||||
""" Mocks mediagoblin.decorator.oauth_required to always validate """
|
||||
|
||||
def fake_controller(controller, request, *args, **kwargs):
|
||||
request.user = User.query.filter_by(id=self.user.id).first()
|
||||
return controller(request, *args, **kwargs)
|
||||
|
||||
def oauth_required(c):
|
||||
return lambda *args, **kwargs: fake_controller(c, *args, **kwargs)
|
||||
|
||||
return oauth_required
|
||||
|
||||
def test_can_post_image(self, test_app):
|
||||
""" Tests that an image can be posted to the API """
|
||||
# First request we need to do is to upload the image
|
||||
@ -52,7 +59,7 @@ class TestAPI(object):
|
||||
}
|
||||
|
||||
|
||||
with mock.patch("mediagoblin.decorators.oauth_required", new_callable=mocked_oauth_required):
|
||||
with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required):
|
||||
response = test_app.post(
|
||||
"/api/user/{0}/uploads".format(self.user.username),
|
||||
data,
|
||||
@ -98,15 +105,13 @@ class TestAPI(object):
|
||||
"Content-Length": str(len(data)),
|
||||
}
|
||||
|
||||
with mock.patch("mediagoblin.decorators.oauth_required", new_callable=mocked_oauth_required):
|
||||
response = test_app.post(
|
||||
"/api/user/{0}/uploads".format(self.user.username),
|
||||
data,
|
||||
headers=headers
|
||||
)
|
||||
|
||||
error = json.loads(response.body)
|
||||
with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required):
|
||||
with pytest.raises(AppError) as excinfo:
|
||||
response = test_app.post(
|
||||
"/api/user/{0}/uploads".format(self.user.username),
|
||||
data,
|
||||
headers=headers
|
||||
)
|
||||
|
||||
# Assert that we've got a 403
|
||||
assert response.status_code == 403
|
||||
assert "error" in error
|
||||
assert "403 FORBIDDEN" in excinfo.value.message
|
||||
|
@ -52,8 +52,8 @@ class TestOAuth(object):
|
||||
|
||||
def register_client(self, **kwargs):
|
||||
""" Regiters a client with the API """
|
||||
|
||||
kwargs["type"] = "client_associate"
|
||||
|
||||
kwargs["type"] = "client_associate"
|
||||
kwargs["application_type"] = kwargs.get("application_type", "native")
|
||||
return self.test_app.post("/api/client/register", kwargs)
|
||||
|
||||
@ -63,7 +63,7 @@ class TestOAuth(object):
|
||||
client_info = response.json
|
||||
|
||||
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
|
||||
|
||||
|
||||
assert response.status_int == 200
|
||||
assert client is not None
|
||||
|
||||
@ -81,7 +81,7 @@ class TestOAuth(object):
|
||||
client_info = response.json
|
||||
|
||||
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
|
||||
|
||||
|
||||
assert client is not None
|
||||
assert client.secret == client_info["client_secret"]
|
||||
assert client.application_type == query["application_type"]
|
||||
@ -163,4 +163,3 @@ class TestOAuth(object):
|
||||
assert request_token.client == client.id
|
||||
assert request_token.used == False
|
||||
assert request_token.callback == request_query["oauth_callback"]
|
||||
|
||||
|
@ -16,7 +16,9 @@
|
||||
|
||||
import json
|
||||
import logging
|
||||
from mediagoblin.db.models import User
|
||||
|
||||
from mediagoblin.db.models import User, AccessToken
|
||||
from mediagoblin.oauth.tools.request import decode_authorization_header
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
@ -31,6 +33,18 @@ def setup_user_in_request(request):
|
||||
Examine a request and tack on a request.user parameter if that's
|
||||
appropriate.
|
||||
"""
|
||||
# If API request the user will be associated with the access token
|
||||
authorization = decode_authorization_header(request.headers)
|
||||
|
||||
if authorization.get(u"access_token"):
|
||||
# Check authorization header.
|
||||
token = authorization[u"oauth_token"]
|
||||
token = AccessToken.query.filter_by(token=token).first()
|
||||
if token is not None:
|
||||
request.user = token.user
|
||||
return
|
||||
|
||||
|
||||
if 'user_id' not in request.session:
|
||||
request.user = None
|
||||
return
|
||||
@ -46,7 +60,7 @@ def setup_user_in_request(request):
|
||||
def decode_request(request):
|
||||
""" Decodes a request based on MIME-Type """
|
||||
data = request.data
|
||||
|
||||
|
||||
if request.content_type == json_encoded:
|
||||
data = json.loads(data)
|
||||
elif request.content_type == form_encoded or request.content_type == "":
|
||||
|
Loading…
x
Reference in New Issue
Block a user