
There was a recent change in 0f3bf8d where several names which were used to refer the actor where renamed to the standard "actor" property name. There were some places in the OAuth decorator and the OAuth views where the old names were still referred. This fixes that.
343 lines
12 KiB
Python
343 lines
12 KiB
Python
# GNU MediaGoblin -- federated, autonomous media hosting
|
|
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import urllib
|
|
|
|
import six
|
|
|
|
from oauthlib.oauth1.rfc5849.utils import UNICODE_ASCII_CHARACTER_SET
|
|
from oauthlib.oauth1 import (RequestTokenEndpoint, AuthorizationEndpoint,
|
|
AccessTokenEndpoint)
|
|
|
|
from mediagoblin.decorators import require_active_login
|
|
from mediagoblin.tools.translate import pass_to_ugettext
|
|
from mediagoblin.meddleware.csrf import csrf_exempt
|
|
from mediagoblin.tools.request import decode_request
|
|
from mediagoblin.tools.response import (render_to_response, redirect,
|
|
json_response, render_400,
|
|
form_response)
|
|
from mediagoblin.tools.crypto import random_string
|
|
from mediagoblin.tools.validator import validate_email, validate_url
|
|
from mediagoblin.oauth.forms import AuthorizeForm
|
|
from mediagoblin.oauth.oauth import GMGRequestValidator, GMGRequest
|
|
from mediagoblin.oauth.tools.request import decode_authorization_header
|
|
from mediagoblin.oauth.tools.forms import WTFormData
|
|
from mediagoblin.db.models import NonceTimestamp, Client, RequestToken
|
|
|
|
# possible client types
|
|
CLIENT_TYPES = ["web", "native"] # currently what pump supports
|
|
|
|
@csrf_exempt
|
|
def client_register(request):
|
|
""" Endpoint for client registration """
|
|
try:
|
|
data = decode_request(request)
|
|
except ValueError:
|
|
error = "Could not decode data."
|
|
return json_response({"error": error}, status=400)
|
|
|
|
if data is "":
|
|
error = "Unknown Content-Type"
|
|
return json_response({"error": error}, status=400)
|
|
|
|
if "type" not in data:
|
|
error = "No registration type provided."
|
|
return json_response({"error": error}, status=400)
|
|
if data.get("application_type", None) not in CLIENT_TYPES:
|
|
error = "Unknown application_type."
|
|
return json_response({"error": error}, status=400)
|
|
|
|
client_type = data["type"]
|
|
|
|
if client_type == "client_update":
|
|
# updating a client
|
|
if "client_id" not in data:
|
|
error = "client_id is requried to update."
|
|
return json_response({"error": error}, status=400)
|
|
elif "client_secret" not in data:
|
|
error = "client_secret is required to update."
|
|
return json_response({"error": error}, status=400)
|
|
|
|
client = Client.query.filter_by(
|
|
id=data["client_id"],
|
|
secret=data["client_secret"]
|
|
).first()
|
|
|
|
if client is None:
|
|
error = "Unauthorized."
|
|
return json_response({"error": error}, status=403)
|
|
|
|
client.application_name = data.get(
|
|
"application_name",
|
|
client.application_name
|
|
)
|
|
|
|
client.application_type = data.get(
|
|
"application_type",
|
|
client.application_type
|
|
)
|
|
|
|
app_name = ("application_type", client.application_name)
|
|
if app_name in CLIENT_TYPES:
|
|
client.application_name = app_name
|
|
|
|
elif client_type == "client_associate":
|
|
# registering
|
|
if "client_id" in data:
|
|
error = "Only set client_id for update."
|
|
return json_response({"error": error}, status=400)
|
|
elif "access_token" in data:
|
|
error = "access_token not needed for registration."
|
|
return json_response({"error": error}, status=400)
|
|
elif "client_secret" in data:
|
|
error = "Only set client_secret for update."
|
|
return json_response({"error": error}, status=400)
|
|
|
|
# generate the client_id and client_secret
|
|
client_id = random_string(22, UNICODE_ASCII_CHARACTER_SET)
|
|
client_secret = random_string(43, UNICODE_ASCII_CHARACTER_SET)
|
|
expirey = 0 # for now, lets not have it expire
|
|
expirey_db = None if expirey == 0 else expirey
|
|
application_type = data["application_type"]
|
|
|
|
# save it
|
|
client = Client(
|
|
id=client_id,
|
|
secret=client_secret,
|
|
expirey=expirey_db,
|
|
application_type=application_type,
|
|
)
|
|
|
|
else:
|
|
error = "Invalid registration type"
|
|
return json_response({"error": error}, status=400)
|
|
|
|
logo_uri = data.get("logo_uri", client.logo_url)
|
|
if logo_uri is not None and not validate_url(logo_uri):
|
|
error = "Logo URI {0} is not a valid URI.".format(logo_uri)
|
|
return json_response(
|
|
{"error": error},
|
|
status=400
|
|
)
|
|
else:
|
|
client.logo_url = logo_uri
|
|
|
|
client.application_name = data.get("application_name", None)
|
|
|
|
contacts = data.get("contacts", None)
|
|
if contacts is not None:
|
|
if not isinstance(contacts, six.text_type):
|
|
error = "Contacts must be a string of space-seporated email addresses."
|
|
return json_response({"error": error}, status=400)
|
|
|
|
contacts = contacts.split()
|
|
for contact in contacts:
|
|
if not validate_email(contact):
|
|
# not a valid email
|
|
error = "Email {0} is not a valid email.".format(contact)
|
|
return json_response({"error": error}, status=400)
|
|
|
|
|
|
client.contacts = contacts
|
|
|
|
redirect_uris = data.get("redirect_uris", None)
|
|
if redirect_uris is not None:
|
|
if not isinstance(redirect_uris, six.text_type):
|
|
error = "redirect_uris must be space-seporated URLs."
|
|
return json_response({"error": error}, status=400)
|
|
|
|
redirect_uris = redirect_uris.split()
|
|
|
|
for uri in redirect_uris:
|
|
if not validate_url(uri):
|
|
# not a valid uri
|
|
error = "URI {0} is not a valid URI".format(uri)
|
|
return json_response({"error": error}, status=400)
|
|
|
|
client.redirect_uri = redirect_uris
|
|
|
|
|
|
client.save()
|
|
|
|
expirey = 0 if client.expirey is None else client.expirey
|
|
|
|
return json_response(
|
|
{
|
|
"client_id": client.id,
|
|
"client_secret": client.secret,
|
|
"expires_at": expirey,
|
|
})
|
|
|
|
@csrf_exempt
|
|
def request_token(request):
|
|
""" Returns request token """
|
|
try:
|
|
data = decode_request(request)
|
|
except ValueError:
|
|
error = "Could not decode data."
|
|
return json_response({"error": error}, status=400)
|
|
|
|
if not data and request.headers:
|
|
data = request.headers
|
|
|
|
data = dict(data) # mutableifying
|
|
|
|
authorization = decode_authorization_header(data)
|
|
|
|
if authorization == dict() or u"oauth_consumer_key" not in authorization:
|
|
error = "Missing required parameter."
|
|
return json_response({"error": error}, status=400)
|
|
|
|
# check the client_id
|
|
client_id = authorization[u"oauth_consumer_key"]
|
|
client = Client.query.filter_by(id=client_id).first()
|
|
|
|
if client == None:
|
|
# client_id is invalid
|
|
error = "Invalid client_id"
|
|
return json_response({"error": error}, status=400)
|
|
|
|
# make request token and return to client
|
|
request_validator = GMGRequestValidator(authorization)
|
|
rv = RequestTokenEndpoint(request_validator)
|
|
tokens = rv.create_request_token(request, authorization)
|
|
|
|
# store the nonce & timestamp before we return back
|
|
nonce = authorization[u"oauth_nonce"]
|
|
timestamp = authorization[u"oauth_timestamp"]
|
|
timestamp = datetime.datetime.fromtimestamp(float(timestamp))
|
|
|
|
nc = NonceTimestamp(nonce=nonce, timestamp=timestamp)
|
|
nc.save()
|
|
|
|
return form_response(tokens)
|
|
|
|
@require_active_login
|
|
def authorize(request):
|
|
""" Displays a page for user to authorize """
|
|
if request.method == "POST":
|
|
return authorize_finish(request)
|
|
|
|
_ = pass_to_ugettext
|
|
token = request.args.get("oauth_token", None)
|
|
if token is None:
|
|
# no token supplied, display a html 400 this time
|
|
err_msg = _("Must provide an oauth_token.")
|
|
return render_400(request, err_msg=err_msg)
|
|
|
|
oauth_request = RequestToken.query.filter_by(token=token).first()
|
|
if oauth_request is None:
|
|
err_msg = _("No request token found.")
|
|
return render_400(request, err_msg)
|
|
|
|
if oauth_request.used:
|
|
return authorize_finish(request)
|
|
|
|
if oauth_request.verifier is None:
|
|
orequest = GMGRequest(request)
|
|
orequest.resource_owner_key = token
|
|
request_validator = GMGRequestValidator()
|
|
auth_endpoint = AuthorizationEndpoint(request_validator)
|
|
verifier = auth_endpoint.create_verifier(orequest, {})
|
|
oauth_request.verifier = verifier["oauth_verifier"]
|
|
|
|
oauth_request.actor = request.user.id
|
|
oauth_request.save()
|
|
|
|
# find client & build context
|
|
client = Client.query.filter_by(id=oauth_request.client).first()
|
|
|
|
authorize_form = AuthorizeForm(WTFormData({
|
|
"oauth_token": oauth_request.token,
|
|
"oauth_verifier": oauth_request.verifier
|
|
}))
|
|
|
|
context = {
|
|
"user": request.user,
|
|
"oauth_request": oauth_request,
|
|
"client": client,
|
|
"authorize_form": authorize_form,
|
|
}
|
|
|
|
|
|
# AuthorizationEndpoint
|
|
return render_to_response(
|
|
request,
|
|
"mediagoblin/api/authorize.html",
|
|
context
|
|
)
|
|
|
|
|
|
def authorize_finish(request):
|
|
""" Finishes the authorize """
|
|
_ = pass_to_ugettext
|
|
token = request.form["oauth_token"]
|
|
verifier = request.form["oauth_verifier"]
|
|
oauth_request = RequestToken.query.filter_by(token=token, verifier=verifier)
|
|
oauth_request = oauth_request.first()
|
|
|
|
if oauth_request is None:
|
|
# invalid token or verifier
|
|
err_msg = _("No request token found.")
|
|
return render_400(request, err_msg)
|
|
|
|
oauth_request.used = True
|
|
oauth_request.updated = datetime.datetime.now()
|
|
oauth_request.save()
|
|
|
|
if oauth_request.callback == "oob":
|
|
# out of bounds
|
|
context = {"oauth_request": oauth_request}
|
|
return render_to_response(
|
|
request,
|
|
"mediagoblin/api/oob.html",
|
|
context
|
|
)
|
|
|
|
# okay we need to redirect them then!
|
|
querystring = "?oauth_token={0}&oauth_verifier={1}".format(
|
|
oauth_request.token,
|
|
oauth_request.verifier
|
|
)
|
|
|
|
# It's come from the OAuth headers so it'll be encoded.
|
|
redirect_url = urllib.unquote(oauth_request.callback)
|
|
|
|
return redirect(
|
|
request,
|
|
querystring=querystring,
|
|
location=redirect_url
|
|
)
|
|
|
|
@csrf_exempt
|
|
def access_token(request):
|
|
""" Provides an access token based on a valid verifier and request token """
|
|
data = request.headers
|
|
|
|
parsed_tokens = decode_authorization_header(data)
|
|
|
|
if parsed_tokens == dict() or "oauth_token" not in parsed_tokens:
|
|
error = "Missing required parameter."
|
|
return json_response({"error": error}, status=400)
|
|
|
|
request.resource_owner_key = parsed_tokens["oauth_consumer_key"]
|
|
request.oauth_token = parsed_tokens["oauth_token"]
|
|
request_validator = GMGRequestValidator(data)
|
|
av = AccessTokenEndpoint(request_validator)
|
|
tokens = av.create_access_token(request, {})
|
|
return form_response(tokens)
|