Adds test for request_tokens

This commit is contained in:
xray7224 2013-07-18 19:15:05 +01:00
parent 86ba416883
commit 89d5b44e0a
4 changed files with 71 additions and 20 deletions

View File

@ -26,8 +26,9 @@ class GMGRequestValidator(RequestValidator):
enforce_ssl = False
def __init__(self, data=None):
def __init__(self, data=None, *args, **kwargs):
self.POST = data
super(GMGRequestValidator, self).__init__(*args, **kwargs)
def save_request_token(self, token, request):
""" Saves request token in db """
@ -38,7 +39,8 @@ class GMGRequestValidator(RequestValidator):
secret=token["oauth_token_secret"],
)
request_token.client = client_id
request_token.callback = token.get("oauth_callback", None)
if u"oauth_callback" in self.POST:
request_token.callback = self.POST[u"oauth_callback"]
request_token.save()
def save_verifier(self, token, verifier, request):

View File

@ -14,14 +14,19 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
# Regex for parsing Authorization string
auth_header_re = re.compile('(\w+)[:=] ?"?(\w+)"?')
def decode_authorization_header(header):
""" Decodes a HTTP Authorization Header to python dictionary """
authorization = header.get("Authorization", "")
tokens = dict(auth_header_re.findall(authorization))
authorization = header.get("Authorization", "").lstrip(" ").lstrip("OAuth")
tokens = {}
for param in authorization.split(","):
key, value = param.split("=")
key = key.lstrip(" ")
value = value.lstrip(" ").lstrip('"')
value = value.rstrip(" ").rstrip('"')
tokens[key] = value
return tokens

View File

@ -198,7 +198,6 @@ def request_token(request):
authorization = decode_authorization_header(data)
if authorization == dict() or u"oauth_consumer_key" not in authorization:
error = "Missing required parameter."
return json_response({"error": error}, status=400)
@ -206,12 +205,13 @@ def request_token(request):
# check the client_id
client_id = authorization[u"oauth_consumer_key"]
client = Client.query.filter_by(id=client_id).first()
if client is None:
if client == None:
# client_id is invalid
error = "Invalid client_id"
return json_response({"error": error}, status=400)
# make request token and return to client
# make request token and return to client
request_validator = GMGRequestValidator(authorization)
rv = RequestTokenEndpoint(request_validator)
tokens = rv.create_request_token(request, authorization)
@ -219,7 +219,7 @@ def request_token(request):
# store the nonce & timestamp before we return back
nonce = authorization[u"oauth_nonce"]
timestamp = authorization[u"oauth_timestamp"]
timestamp = datetime.datetime.fromtimestamp(int(timestamp))
timestamp = datetime.datetime.fromtimestamp(float(timestamp))
nc = NonceTimestamp(nonce=nonce, timestamp=timestamp)
nc.save()

View File

@ -14,17 +14,23 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import cgi
import pytest
from urlparse import parse_qs, urlparse
from oauthlib.oauth1 import Client
from mediagoblin import mg_globals
from mediagoblin.tools import template, pluginapi
from mediagoblin.tests.tools import fixture_add_user
class TestOAuth(object):
MIME_FORM = "application/x-www-form-urlencoded"
MIME_JSON = "application/json"
@pytest.fixture(autouse=True)
def setup(self, test_app):
self.test_app = test_app
@ -54,7 +60,7 @@ class TestOAuth(object):
def test_client_client_register_limited_info(self):
""" Tests that a client can be registered with limited information """
response = self.register_client()
client_info = json.loads(response.body)
client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
@ -72,7 +78,7 @@ class TestOAuth(object):
}
response = self.register_client(**query)
client_info = json.loads(response.body)
client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
@ -89,7 +95,7 @@ class TestOAuth(object):
# first we need to register a client
response = self.register_client()
client_info = json.loads(response.body)
client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
# Now update
@ -105,8 +111,8 @@ class TestOAuth(object):
update_response = self.register_client(**update_query)
assert update_response.status_int == 200
client_info = json.loads(update_response.body)
client = self.Client.query.filter_by(id=client_info["client_id"]).first()
client_info = update_response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
assert client.secret == client_info["client_secret"]
assert client.application_type == update_query["application_type"]
@ -115,8 +121,46 @@ class TestOAuth(object):
assert client.logo_url == update_query["logo_url"]
assert client.redirect_uri == update_query["redirect_uris"].split()
def request_token(self):
def to_authorize_headers(self, data):
headers = ""
for key, value in data.items():
headers += '{0}="{1}",'.format(key, value)
return {"Authorization": "OAuth " + headers[:-1]}
def test_request_token(self):
""" Test a request for a request token """
response = self.register_client()
client_id = response.json["client_id"]
endpoint = "/oauth/request_token"
request_query = {
"oauth_consumer_key": client_id,
"oauth_nonce": "abcdefghij",
"oauth_timestamp": 123456789.0,
"oauth_callback": "https://some.url/callback",
}
headers = self.to_authorize_headers(request_query)
headers["Content-Type"] = self.MIME_FORM
response = self.test_app.post(endpoint, headers=headers)
response = cgi.parse_qs(response.body)
# each element is a list, reduce it to a string
for key, value in response.items():
response[key] = value[0]
request_token = self.db.RequestToken.query.filter_by(
token=response["oauth_token"]
).first()
client = self.db.Client.query.filter_by(id=client_id).first()
assert request_token is not None
assert request_token.secret == response["oauth_token_secret"]
assert request_token.client == client.id
assert request_token.used == False
assert request_token.callback == request_query["oauth_callback"]