Remove deprecated oauth 2 plugin
This commit is contained in:
@@ -1,85 +0,0 @@
|
||||
# GNU MediaGoblin -- federated, autonomous media hosting
|
||||
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
import six
|
||||
|
||||
from six.moves.urllib.parse import parse_qs, urlparse
|
||||
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.tools import processing
|
||||
from mediagoblin.tests.tools import fixture_add_user
|
||||
from mediagoblin.tests.test_submission import GOOD_PNG
|
||||
from mediagoblin.tests import test_oauth2 as oauth
|
||||
|
||||
|
||||
class TestHTTPCallback(object):
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, test_app):
|
||||
self.test_app = test_app
|
||||
|
||||
self.db = mg_globals.database
|
||||
|
||||
self.user_password = u'secret'
|
||||
self.user = fixture_add_user(u'call_back', self.user_password)
|
||||
|
||||
self.login()
|
||||
|
||||
def login(self):
|
||||
self.test_app.post('/auth/login/', {
|
||||
'username': self.user.username,
|
||||
'password': self.user_password})
|
||||
|
||||
def get_access_token(self, client_id, client_secret, code):
|
||||
response = self.test_app.get('/oauth-2/access_token', {
|
||||
'code': code,
|
||||
'client_id': client_id,
|
||||
'client_secret': client_secret})
|
||||
|
||||
response_data = json.loads(response.body.decode())
|
||||
|
||||
return response_data['access_token']
|
||||
|
||||
def test_callback(self):
|
||||
''' Test processing HTTP callback '''
|
||||
self.oauth = oauth.TestOAuth()
|
||||
self.oauth.setup(self.test_app)
|
||||
|
||||
redirect, client_id = self.oauth.test_4_authorize_confidential_client()
|
||||
|
||||
code = parse_qs(urlparse(redirect.location).query)['code'][0]
|
||||
|
||||
client = self.db.OAuthClient.query.filter(
|
||||
self.db.OAuthClient.identifier == six.text_type(client_id)).first()
|
||||
|
||||
client_secret = client.secret
|
||||
|
||||
access_token = self.get_access_token(client_id, client_secret, code)
|
||||
|
||||
callback_url = 'https://foo.example?secrettestmediagoblinparam'
|
||||
|
||||
self.test_app.post('/api/submit?client_id={0}&access_token={1}\
|
||||
&client_secret={2}'.format(
|
||||
client_id,
|
||||
access_token,
|
||||
client_secret), {
|
||||
'title': 'Test',
|
||||
'callback_url': callback_url},
|
||||
upload_files=[('file', GOOD_PNG)])
|
||||
|
||||
assert processing.TESTS_CALLBACKS[callback_url]['state'] == u'processed'
|
||||
@@ -31,7 +31,6 @@ BROKER_URL = "sqlite:///%(here)s/test_user_dev/kombu.db"
|
||||
|
||||
[plugins]
|
||||
[[mediagoblin.plugins.api]]
|
||||
[[mediagoblin.plugins.oauth]]
|
||||
[[mediagoblin.plugins.httpapiauth]]
|
||||
[[mediagoblin.plugins.piwigo]]
|
||||
[[mediagoblin.plugins.basic_auth]]
|
||||
|
||||
@@ -1,225 +0,0 @@
|
||||
# GNU MediaGoblin -- federated, autonomous media hosting
|
||||
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
import six
|
||||
|
||||
from six.moves.urllib.parse import parse_qs, urlparse
|
||||
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.tools import template, pluginapi
|
||||
from mediagoblin.tests.tools import fixture_add_user
|
||||
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestOAuth(object):
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, test_app):
|
||||
self.test_app = test_app
|
||||
|
||||
self.db = mg_globals.database
|
||||
|
||||
self.pman = pluginapi.PluginManager()
|
||||
|
||||
self.user_password = u'4cc355_70k3N'
|
||||
self.user = fixture_add_user(u'joauth', self.user_password,
|
||||
privileges=[u'active'])
|
||||
|
||||
self.login()
|
||||
|
||||
def login(self):
|
||||
self.test_app.post(
|
||||
'/auth/login/', {
|
||||
'username': self.user.username,
|
||||
'password': self.user_password})
|
||||
|
||||
def register_client(self, name, client_type, description=None,
|
||||
redirect_uri=''):
|
||||
return self.test_app.post(
|
||||
'/oauth-2/client/register', {
|
||||
'name': name,
|
||||
'description': description,
|
||||
'type': client_type,
|
||||
'redirect_uri': redirect_uri})
|
||||
|
||||
def get_context(self, template_name):
|
||||
return template.TEMPLATE_TEST_CONTEXT[template_name]
|
||||
|
||||
def test_1_public_client_registration_without_redirect_uri(self):
|
||||
''' Test 'public' OAuth client registration without any redirect uri '''
|
||||
response = self.register_client(
|
||||
u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')
|
||||
|
||||
ctx = self.get_context('oauth/client/register.html')
|
||||
|
||||
client = self.db.OAuthClient.query.filter(
|
||||
self.db.OAuthClient.name == u'OMGOMGOMG').first()
|
||||
|
||||
assert response.status_int == 200
|
||||
|
||||
# Should display an error
|
||||
assert len(ctx['form'].redirect_uri.errors)
|
||||
|
||||
# Should not pass through
|
||||
assert not client
|
||||
|
||||
def test_2_successful_public_client_registration(self):
|
||||
''' Successfully register a public client '''
|
||||
uri = 'http://foo.example'
|
||||
self.register_client(
|
||||
u'OMGOMG', 'public', 'OMG!', uri)
|
||||
|
||||
client = self.db.OAuthClient.query.filter(
|
||||
self.db.OAuthClient.name == u'OMGOMG').first()
|
||||
|
||||
# redirect_uri should be set
|
||||
assert client.redirect_uri == uri
|
||||
|
||||
# Client should have been registered
|
||||
assert client
|
||||
|
||||
def test_3_successful_confidential_client_reg(self):
|
||||
''' Register a confidential OAuth client '''
|
||||
response = self.register_client(
|
||||
u'GMOGMO', 'confidential', 'NO GMO!')
|
||||
|
||||
assert response.status_int == 302
|
||||
|
||||
client = self.db.OAuthClient.query.filter(
|
||||
self.db.OAuthClient.name == u'GMOGMO').first()
|
||||
|
||||
# Client should have been registered
|
||||
assert client
|
||||
|
||||
return client
|
||||
|
||||
def test_4_authorize_confidential_client(self):
|
||||
''' Authorize a confidential client as a logged in user '''
|
||||
client = self.test_3_successful_confidential_client_reg()
|
||||
|
||||
client_identifier = client.identifier
|
||||
|
||||
redirect_uri = 'https://foo.example'
|
||||
response = self.test_app.get('/oauth-2/authorize', {
|
||||
'client_id': client.identifier,
|
||||
'scope': 'all',
|
||||
'redirect_uri': redirect_uri})
|
||||
|
||||
# User-agent should NOT be redirected
|
||||
assert response.status_int == 200
|
||||
|
||||
ctx = self.get_context('oauth/authorize.html')
|
||||
|
||||
form = ctx['form']
|
||||
|
||||
# Short for client authorization post reponse
|
||||
capr = self.test_app.post(
|
||||
'/oauth-2/client/authorize', {
|
||||
'client_id': form.client_id.data,
|
||||
'allow': 'Allow',
|
||||
'next': form.next.data})
|
||||
|
||||
assert capr.status_int == 302
|
||||
|
||||
authorization_response = capr.follow()
|
||||
|
||||
assert authorization_response.location.startswith(redirect_uri)
|
||||
|
||||
return authorization_response, client_identifier
|
||||
|
||||
def get_code_from_redirect_uri(self, uri):
|
||||
''' Get the value of ?code= from an URI '''
|
||||
return parse_qs(urlparse(uri).query)['code'][0]
|
||||
|
||||
def test_token_endpoint_successful_confidential_request(self):
|
||||
''' Successful request against token endpoint '''
|
||||
code_redirect, client_id = self.test_4_authorize_confidential_client()
|
||||
|
||||
code = self.get_code_from_redirect_uri(code_redirect.location)
|
||||
|
||||
client = self.db.OAuthClient.query.filter(
|
||||
self.db.OAuthClient.identifier == six.text_type(client_id)).first()
|
||||
|
||||
token_res = self.test_app.get('/oauth-2/access_token?client_id={0}&\
|
||||
code={1}&client_secret={2}'.format(client_id, code, client.secret))
|
||||
|
||||
assert token_res.status_int == 200
|
||||
|
||||
token_data = json.loads(token_res.body.decode())
|
||||
|
||||
assert not 'error' in token_data
|
||||
assert 'access_token' in token_data
|
||||
assert 'token_type' in token_data
|
||||
assert 'expires_in' in token_data
|
||||
assert type(token_data['expires_in']) == int
|
||||
assert token_data['expires_in'] > 0
|
||||
|
||||
# There should be a refresh token provided in the token data
|
||||
assert len(token_data['refresh_token'])
|
||||
|
||||
return client_id, token_data
|
||||
|
||||
def test_token_endpont_missing_id_confidential_request(self):
|
||||
''' Unsuccessful request against token endpoint, missing client_id '''
|
||||
code_redirect, client_id = self.test_4_authorize_confidential_client()
|
||||
|
||||
code = self.get_code_from_redirect_uri(code_redirect.location)
|
||||
|
||||
client = self.db.OAuthClient.query.filter(
|
||||
self.db.OAuthClient.identifier == six.text_type(client_id)).first()
|
||||
|
||||
token_res = self.test_app.get('/oauth-2/access_token?\
|
||||
code={0}&client_secret={1}'.format(code, client.secret))
|
||||
|
||||
assert token_res.status_int == 200
|
||||
|
||||
token_data = json.loads(token_res.body.decode())
|
||||
|
||||
assert 'error' in token_data
|
||||
assert not 'access_token' in token_data
|
||||
assert token_data['error'] == 'invalid_request'
|
||||
assert len(token_data['error_description'])
|
||||
|
||||
def test_refresh_token(self):
|
||||
''' Try to get a new access token using the refresh token '''
|
||||
# Get an access token and a refresh token
|
||||
client_id, token_data =\
|
||||
self.test_token_endpoint_successful_confidential_request()
|
||||
|
||||
client = self.db.OAuthClient.query.filter(
|
||||
self.db.OAuthClient.identifier == client_id).first()
|
||||
|
||||
token_res = self.test_app.get('/oauth-2/access_token',
|
||||
{'refresh_token': token_data['refresh_token'],
|
||||
'client_id': client_id,
|
||||
'client_secret': client.secret
|
||||
})
|
||||
|
||||
assert token_res.status_int == 200
|
||||
|
||||
new_token_data = json.loads(token_res.body.decode())
|
||||
|
||||
assert not 'error' in new_token_data
|
||||
assert 'access_token' in new_token_data
|
||||
assert 'token_type' in new_token_data
|
||||
assert 'expires_in' in new_token_data
|
||||
assert type(new_token_data['expires_in']) == int
|
||||
assert new_token_data['expires_in'] > 0
|
||||
Reference in New Issue
Block a user