Merge remote-tracking branch 'joar-github/oauth/refresh_tokens'
This merges the patch for Issue #548.
This commit is contained in:
@@ -71,7 +71,7 @@ class TestOAuth(object):
|
||||
assert response.status_int == 200
|
||||
|
||||
# Should display an error
|
||||
assert ctx['form'].redirect_uri.errors
|
||||
assert len(ctx['form'].redirect_uri.errors)
|
||||
|
||||
# Should not pass through
|
||||
assert not client
|
||||
@@ -79,12 +79,16 @@ class TestOAuth(object):
|
||||
def test_2_successful_public_client_registration(self, test_app):
|
||||
''' Successfully register a public client '''
|
||||
self._setup(test_app)
|
||||
uri = 'http://foo.example'
|
||||
self.register_client(test_app, u'OMGOMG', 'public', 'OMG!',
|
||||
'http://foo.example')
|
||||
uri)
|
||||
|
||||
client = self.db.OAuthClient.query.filter(
|
||||
self.db.OAuthClient.name == u'OMGOMG').first()
|
||||
|
||||
# redirect_uri should be set
|
||||
assert client.redirect_uri == uri
|
||||
|
||||
# Client should have been registered
|
||||
assert client
|
||||
|
||||
@@ -116,7 +120,7 @@ class TestOAuth(object):
|
||||
redirect_uri = 'https://foo.example'
|
||||
response = test_app.get('/oauth/authorize', {
|
||||
'client_id': client.identifier,
|
||||
'scope': 'admin',
|
||||
'scope': 'all',
|
||||
'redirect_uri': redirect_uri})
|
||||
|
||||
# User-agent should NOT be redirected
|
||||
@@ -142,6 +146,7 @@ class TestOAuth(object):
|
||||
return authorization_response, client_identifier
|
||||
|
||||
def get_code_from_redirect_uri(self, uri):
|
||||
''' Get the value of ?code= from an URI '''
|
||||
return parse_qs(urlparse(uri).query)['code'][0]
|
||||
|
||||
def test_token_endpoint_successful_confidential_request(self, test_app):
|
||||
@@ -170,6 +175,11 @@ code={1}&client_secret={2}'.format(client_id, code, client.secret))
|
||||
assert type(token_data['expires_in']) == int
|
||||
assert token_data['expires_in'] > 0
|
||||
|
||||
# There should be a refresh token provided in the token data
|
||||
assert len(token_data['refresh_token'])
|
||||
|
||||
return client_id, token_data
|
||||
|
||||
def test_token_endpont_missing_id_confidential_request(self, test_app):
|
||||
''' Unsuccessful request against token endpoint, missing client_id '''
|
||||
self._setup(test_app)
|
||||
@@ -192,4 +202,30 @@ code={0}&client_secret={1}'.format(code, client.secret))
|
||||
assert 'error' in token_data
|
||||
assert not 'access_token' in token_data
|
||||
assert token_data['error'] == 'invalid_request'
|
||||
assert token_data['error_description'] == 'Missing client_id in request'
|
||||
assert len(token_data['error_description'])
|
||||
|
||||
def test_refresh_token(self, test_app):
|
||||
''' Try to get a new access token using the refresh token '''
|
||||
# Get an access token and a refresh token
|
||||
client_id, token_data =\
|
||||
self.test_token_endpoint_successful_confidential_request(test_app)
|
||||
|
||||
client = self.db.OAuthClient.query.filter(
|
||||
self.db.OAuthClient.identifier == client_id).first()
|
||||
|
||||
token_res = test_app.get('/oauth/access_token',
|
||||
{'refresh_token': token_data['refresh_token'],
|
||||
'client_id': client_id,
|
||||
'client_secret': client.secret
|
||||
})
|
||||
|
||||
assert token_res.status_int == 200
|
||||
|
||||
new_token_data = json.loads(token_res.body)
|
||||
|
||||
assert not 'error' in new_token_data
|
||||
assert 'access_token' in new_token_data
|
||||
assert 'token_type' in new_token_data
|
||||
assert 'expires_in' in new_token_data
|
||||
assert type(new_token_data['expires_in']) == int
|
||||
assert new_token_data['expires_in'] > 0
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import sys
|
||||
import os
|
||||
import pkg_resources
|
||||
import shutil
|
||||
@@ -28,7 +29,6 @@ from mediagoblin import mg_globals
|
||||
from mediagoblin.db.models import User, MediaEntry, Collection
|
||||
from mediagoblin.tools import testing
|
||||
from mediagoblin.init.config import read_mediagoblin_config
|
||||
from mediagoblin.db.open import setup_connection_and_db_from_config
|
||||
from mediagoblin.db.base import Session
|
||||
from mediagoblin.meddleware import BaseMeddleware
|
||||
from mediagoblin.auth.lib import bcrypt_gen_password_hash
|
||||
@@ -50,7 +50,9 @@ USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
|
||||
BAD_CELERY_MESSAGE = """\
|
||||
Sorry, you *absolutely* must run tests with the
|
||||
mediagoblin.init.celery.from_tests module. Like so:
|
||||
$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/py.test"""
|
||||
|
||||
$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests {0}
|
||||
""".format(sys.argv[0])
|
||||
|
||||
|
||||
class BadCeleryEnviron(Exception): pass
|
||||
@@ -230,7 +232,7 @@ def fixture_media_entry(title=u"Some title", slug=None,
|
||||
entry.slug = slug
|
||||
entry.uploader = uploader or fixture_add_user().id
|
||||
entry.media_type = u'image'
|
||||
|
||||
|
||||
if gen_slug:
|
||||
entry.generate_slug()
|
||||
if save:
|
||||
|
||||
Reference in New Issue
Block a user