merge --squash openid branch to take care of a false merge commit in the

basic_auth branch that openid is forked from

Commits squashed together (in reverse chronological order):
 - do the label thing only for boolean fields
 - made edit_account to autofocus on the first field
 - added feature to render_divs where if field.label == '' then it
   will render form.description the same a render_label
 - added allow_registration check
 - refactored create_user
 - removed verification_key from create_user
 - removed get_user from openid
 - cleanup after removing openid from template_env.globals
 - fix for werkzueg 0.9.1
 - cleanup after merge
 - more tests
 - restored openid extra_validation just for safety
 - tests for openid
 - deleted openid extra_validation
 - passed next parameter in session for openid
 - fixed a bug that was deleting the messages
 - implemented openid store using sqlalchemy
 - ask openid provider for 'nickname' to prefill username in registration form
 - refactored delete openid url to work with generic urls such as
   google and to not allow a user to delete a url if it is there only
   one and they don't have a pw
 - refactored login to register user workflow, which fixed a problem
   where the 'or register with a password link' wasn't showing up when
   the finish_login view called the register view because there wasn't
   any redirect.
 - added the ability to remove openid's
 - added the ability to add openids to an existing account
 - refactored start_login and finish_login views
 - modified edit_account.html to use render_divs
 - modified gmg/edit/views to behave appropriatly if no password
   authentication is enabled. moved the update email stuff to it's own
   funtion to make edit_account view cleaner. edit_account now
   modifies the form depending on the plugins.
 - minor typos
 - added retrieving email from openid provider
 - moved allow_registration check to a decorator
 - moved check if auth is enabled to a decorator
 - changed openid user registration to go through login first
 - cleanup after merge
 - modified verification emails to use itsdangerous tokens
 - added error handling on bad token, fixed route, and added tests
 - added support for user to change email address
 - added link to login view openid/password in login template
 - updated openid get_user function
 - modified get_user function to take kwargs instead of username
 - no need for user might be email kwarg in check_login_simple
 - added gen_password_hash and check_password functions to auth/__init__
 - added focus to form input
 - made imports fully qualified
 - modified basic_auth.check_login to check that the user has a pw_hash first
 - changed occurances of form.data['whatever'] to form.whatever.data
 - convert tabs to spaces in register template, remove unsed
   templates, and fixed trans tags in templates
 - in process of openid login. it works, but needs major imporvements
 - make password field required in basic_auth form
 - check if password field present in basic_auth create_user
 - modified openid create_user function
 - modified models based on Elronds suggestions
 - changed register form action to a variable to be passed in by the
   view using the template
 - openid plugin v0, still need to authenticate via openid.
 - added a register_user function to be able to use in a plugin's
   register view, and modified auth/views.register to redirect to
   openid/register if appropriate.
 - Modified basic_auth plugin to work with modified auth plugin
   hooks. Added context variables. Removed basic_auth/tools which was
   previously renamed to basic_auth/lib.
 - modified auth/__init__ hooks to work better with multiple
   plugins. Removed auth/lib.py. And added a basic_extra_verification
   function that all plugins will use.
 - added models and migrations for openid plugin
This commit is contained in:
Rodney Ewing 2013-06-26 11:20:50 -07:00 committed by Christopher Allan Webber
parent ac0bc6a1e1
commit 5adb906a0a
24 changed files with 1506 additions and 60 deletions

View File

@ -116,6 +116,7 @@ def send_fp_verification_email(user, request):
""" """
fp_verification_key = get_timed_signer_url('mail_verification_token') \ fp_verification_key = get_timed_signer_url('mail_verification_token') \
.dumps(user.id) .dumps(user.id)
rendered_email = render_template( rendered_email = render_template(
request, 'mediagoblin/auth/fp_verification_email.txt', request, 'mediagoblin/auth/fp_verification_email.txt',
{'username': user.username, {'username': user.username,
@ -199,3 +200,11 @@ def no_auth_logout(request):
if not mg_globals.app.auth and 'user_id' in request.session: if not mg_globals.app.auth and 'user_id' in request.session:
del request.session['user_id'] del request.session['user_id']
request.session.save() request.session.save()
def create_basic_user(form):
user = User()
user.username = form.username.data
user.email = form.email.data
user.save()
return user

View File

@ -14,12 +14,12 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import uuid
from itsdangerous import BadSignature from itsdangerous import BadSignature
from mediagoblin import messages, mg_globals from mediagoblin import messages, mg_globals
from mediagoblin.db.models import User from mediagoblin.db.models import User
from mediagoblin.tools.crypto import get_timed_signer_url from mediagoblin.tools.crypto import get_timed_signer_url
from mediagoblin.decorators import auth_enabled, allow_registration
from mediagoblin.tools.response import render_to_response, redirect, render_404 from mediagoblin.tools.response import render_to_response, redirect, render_404
from mediagoblin.tools.translate import pass_to_ugettext as _ from mediagoblin.tools.translate import pass_to_ugettext as _
from mediagoblin.tools.mail import email_debug_message from mediagoblin.tools.mail import email_debug_message
@ -31,21 +31,14 @@ from mediagoblin.auth.tools import (send_verification_email, register_user,
from mediagoblin import auth from mediagoblin import auth
@allow_registration
@auth_enabled
def register(request): def register(request):
"""The registration view. """The registration view.
Note that usernames will always be lowercased. Email domains are lowercased while Note that usernames will always be lowercased. Email domains are lowercased while
the first part remains case-sensitive. the first part remains case-sensitive.
""" """
# Redirects to indexpage if registrations are disabled or no authentication
# is enabled
if not mg_globals.app_config["allow_registration"] or not mg_globals.app.auth:
messages.add_message(
request,
messages.WARNING,
_('Sorry, registration is disabled on this instance.'))
return redirect(request, "index")
if 'pass_auth' not in request.template_env.globals: if 'pass_auth' not in request.template_env.globals:
redirect_name = hook_handle('auth_no_pass_redirect') redirect_name = hook_handle('auth_no_pass_redirect')
return redirect(request, 'mediagoblin.plugins.{0}.register'.format( return redirect(request, 'mediagoblin.plugins.{0}.register'.format(
@ -71,20 +64,13 @@ def register(request):
'post_url': request.urlgen('mediagoblin.auth.register')}) 'post_url': request.urlgen('mediagoblin.auth.register')})
@auth_enabled
def login(request): def login(request):
""" """
MediaGoblin login view. MediaGoblin login view.
If you provide the POST with 'next', it'll redirect to that view. If you provide the POST with 'next', it'll redirect to that view.
""" """
# Redirects to index page if no authentication is enabled
if not mg_globals.app.auth:
messages.add_message(
request,
messages.WARNING,
_('Sorry, authentication is disabled on this instance.'))
return redirect(request, 'index')
if 'pass_auth' not in request.template_env.globals: if 'pass_auth' not in request.template_env.globals:
redirect_name = hook_handle('auth_no_pass_redirect') redirect_name = hook_handle('auth_no_pass_redirect')
return redirect(request, 'mediagoblin.plugins.{0}.login'.format( return redirect(request, 'mediagoblin.plugins.{0}.login'.format(

View File

@ -307,6 +307,7 @@ def drop_token_related_User_columns(db):
db.commit() db.commit()
class CommentSubscription_v0(declarative_base()): class CommentSubscription_v0(declarative_base()):
__tablename__ = 'core__comment_subscriptions' __tablename__ = 'core__comment_subscriptions'
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
@ -378,4 +379,3 @@ def pw_hash_nullable(db):
constraint.create() constraint.create()
db.commit() db.commit()

View File

@ -18,11 +18,12 @@ from functools import wraps
from urlparse import urljoin from urlparse import urljoin
from werkzeug.exceptions import Forbidden, NotFound from werkzeug.exceptions import Forbidden, NotFound
from werkzeug.urls import url_quote
from mediagoblin import mg_globals as mgg from mediagoblin import mg_globals as mgg
from mediagoblin import messages
from mediagoblin.db.models import MediaEntry, User from mediagoblin.db.models import MediaEntry, User
from mediagoblin.tools.response import redirect, render_404 from mediagoblin.tools.response import redirect, render_404
from mediagoblin.tools.translate import pass_to_ugettext as _
def require_active_login(controller): def require_active_login(controller):
@ -235,3 +236,35 @@ def get_workbench(func):
return func(*args, workbench=workbench, **kwargs) return func(*args, workbench=workbench, **kwargs)
return new_func return new_func
def allow_registration(controller):
""" Decorator for if registration is enabled"""
@wraps(controller)
def wrapper(request, *args, **kwargs):
if not mgg.app_config["allow_registration"]:
messages.add_message(
request,
messages.WARNING,
_('Sorry, registration is disabled on this instance.'))
return redirect(request, "index")
return controller(request, *args, **kwargs)
return wrapper
def auth_enabled(controller):
"""Decorator for if an auth plugin is enabled"""
@wraps(controller)
def wrapper(request, *args, **kwargs):
if not mgg.app.auth:
messages.add_message(
request,
messages.WARNING,
_('Sorry, authentication is disabled on this instance.'))
return redirect(request, 'index')
return controller(request, *args, **kwargs)
return wrapper

View File

@ -236,30 +236,7 @@ def edit_account(request):
user.license_preference = form.license_preference.data user.license_preference = form.license_preference.data
if form.new_email.data: if form.new_email.data:
new_email = form.new_email.data _update_email(request, form, user)
users_with_email = User.query.filter_by(
email=new_email).count()
if users_with_email:
form.new_email.errors.append(
_('Sorry, a user with that email address'
' already exists.'))
else:
verification_key = get_timed_signer_url(
'mail_verification_token').dumps({
'user': user.id,
'email': new_email})
rendered_email = render_template(
request, 'mediagoblin/edit/verification.txt',
{'username': user.username,
'verification_url': EMAIL_VERIFICATION_TEMPLATE.format(
uri=request.urlgen('mediagoblin.edit.verify_email',
qualified=True),
verification_key=verification_key)})
email_debug_message(request)
auth_tools.send_verification_email(user, request, new_email,
rendered_email)
if not form.errors: if not form.errors:
user.save() user.save()
@ -365,6 +342,10 @@ def edit_collection(request, collection):
@require_active_login @require_active_login
def change_pass(request): def change_pass(request):
# If no password authentication, no need to change your password
if 'pass_auth' not in request.template_env.globals:
return redirect(request, 'index')
form = forms.ChangePassForm(request.form) form = forms.ChangePassForm(request.form)
user = request.user user = request.user
@ -442,3 +423,32 @@ def verify_email(request):
return redirect( return redirect(
request, 'mediagoblin.user_pages.user_home', request, 'mediagoblin.user_pages.user_home',
user=user.username) user=user.username)
def _update_email(request, form, user):
new_email = form.new_email.data
users_with_email = User.query.filter_by(
email=new_email).count()
if users_with_email:
form.new_email.errors.append(
_('Sorry, a user with that email address'
' already exists.'))
elif not users_with_email:
verification_key = get_timed_signer_url(
'mail_verification_token').dumps({
'user': user.id,
'email': new_email})
rendered_email = render_template(
request, 'mediagoblin/edit/verification.txt',
{'username': user.username,
'verification_url': EMAIL_VERIFICATION_TEMPLATE.format(
uri=request.urlgen('mediagoblin.edit.verify_email',
qualified=True),
verification_key=verification_key)})
email_debug_message(request)
auth_tools.send_verification_email(user, request, new_email,
rendered_email)

View File

@ -111,7 +111,7 @@ class CsrfMeddleware(BaseMeddleware):
httponly=True) httponly=True)
# update the Vary header # update the Vary header
response.vary = (getattr(response, 'vary', None) or []) + ['Cookie'] response.vary = list(getattr(response, 'vary', None) or []) + ['Cookie']
def _make_token(self, request): def _make_token(self, request):
"""Generate a new token to use for CSRF protection.""" """Generate a new token to use for CSRF protection."""

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from mediagoblin.plugins.basic_auth import forms as auth_forms from mediagoblin.plugins.basic_auth import forms as auth_forms
from mediagoblin.plugins.basic_auth import tools as auth_tools from mediagoblin.plugins.basic_auth import tools as auth_tools
from mediagoblin.auth.tools import create_basic_user
from mediagoblin.db.models import User from mediagoblin.db.models import User
from mediagoblin.tools import pluginapi from mediagoblin.tools import pluginapi
from sqlalchemy import or_ from sqlalchemy import or_
@ -38,9 +39,7 @@ def get_user(**kwargs):
def create_user(registration_form): def create_user(registration_form):
user = get_user(username=registration_form.username.data) user = get_user(username=registration_form.username.data)
if not user and 'password' in registration_form: if not user and 'password' in registration_form:
user = User() user = create_basic_user(registration_form)
user.username = registration_form.username.data
user.email = registration_form.email.data
user.pw_hash = gen_password_hash( user.pw_hash = gen_password_hash(
registration_form.password.data) registration_form.password.data)
user.save() user.save()
@ -89,7 +88,7 @@ hooks = {
'auth_fake_login_attempt': auth_tools.fake_login_attempt, 'auth_fake_login_attempt': auth_tools.fake_login_attempt,
'template_global_context': append_to_global_context, 'template_global_context': append_to_global_context,
('mediagoblin.plugins.openid.register', ('mediagoblin.plugins.openid.register',
'mediagoblin/auth/register.html'): add_to_form_context, 'mediagoblin/auth/register.html'): add_to_form_context,
('mediagoblin.plugins.openid.login', ('mediagoblin.plugins.openid.finish_login',
'mediagoblin/auth/login.html'): add_to_form_context, 'mediagoblin/auth/register.html'): add_to_form_context,
} }

View File

@ -0,0 +1,122 @@
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import uuid
from sqlalchemy import or_
from mediagoblin.auth.tools import create_basic_user
from mediagoblin.db.models import User
from mediagoblin.plugins.openid.models import OpenIDUserURL
from mediagoblin.tools import pluginapi
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
PLUGIN_DIR = os.path.dirname(__file__)
def setup_plugin():
config = pluginapi.get_config('mediagoblin.plugins.openid')
routes = [
('mediagoblin.plugins.openid.register',
'/auth/openid/register/',
'mediagoblin.plugins.openid.views:register'),
('mediagoblin.plugins.openid.login',
'/auth/openid/login/',
'mediagoblin.plugins.openid.views:login'),
('mediagoblin.plugins.openid.finish_login',
'/auth/openid/login/finish/',
'mediagoblin.plugins.openid.views:finish_login'),
('mediagoblin.plugins.openid.edit',
'/edit/openid/',
'mediagoblin.plugins.openid.views:start_edit'),
('mediagoblin.plugins.openid.finish_edit',
'/edit/openid/finish/',
'mediagoblin.plugins.openid.views:finish_edit'),
('mediagoblin.plugins.openid.delete',
'/edit/openid/delete/',
'mediagoblin.plugins.openid.views:delete_openid'),
('mediagoblin.plugins.openid.finish_delete',
'/edit/openid/delete/finish/',
'mediagoblin.plugins.openid.views:finish_delete')]
pluginapi.register_routes(routes)
pluginapi.register_template_path(os.path.join(PLUGIN_DIR, 'templates'))
def create_user(register_form):
if 'openid' in register_form:
username = register_form.username.data
user = User.query.filter(
or_(
User.username == username,
User.email == username,
)).first()
if not user:
user = create_basic_user(register_form)
new_entry = OpenIDUserURL()
new_entry.openid_url = register_form.openid.data
new_entry.user_id = user.id
new_entry.save()
return user
def extra_validation(register_form):
openid = register_form.openid.data if 'openid' in \
register_form else None
if openid:
openid_url_exists = OpenIDUserURL.query.filter_by(
openid_url=openid
).count()
extra_validation_passes = True
if openid_url_exists:
register_form.openid.errors.append(
_('Sorry, an account is already registered to that OpenID.'))
extra_validation_passes = False
return extra_validation_passes
def no_pass_redirect():
return 'openid'
def add_to_form_context(context):
context['openid_link'] = True
return context
def Auth():
return True
hooks = {
'setup': setup_plugin,
'authentication': Auth,
'auth_extra_validation': extra_validation,
'auth_create_user': create_user,
'auth_no_pass_redirect': no_pass_redirect,
('mediagoblin.auth.register',
'mediagoblin/auth/register.html'): add_to_form_context,
('mediagoblin.auth.login',
'mediagoblin/auth/login.html'): add_to_form_context,
('mediagoblin.edit.account',
'mediagoblin/edit/edit_account.html'): add_to_form_context,
}

View File

@ -0,0 +1,41 @@
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import wtforms
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
from mediagoblin.auth.tools import normalize_user_or_email_field
class RegistrationForm(wtforms.Form):
openid = wtforms.HiddenField(
'',
[wtforms.validators.Required()])
username = wtforms.TextField(
_('Username'),
[wtforms.validators.Required(),
normalize_user_or_email_field(allow_email=False)])
email = wtforms.TextField(
_('Email address'),
[wtforms.validators.Required(),
normalize_user_or_email_field(allow_user=False)])
class LoginForm(wtforms.Form):
openid = wtforms.TextField(
_('OpenID'),
[wtforms.validators.Required(),
# Can openid's only be urls?
wtforms.validators.URL(message='Please enter a valid url.')])

View File

@ -0,0 +1,28 @@
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import mediagoblin.plugins.openid.forms as auth_forms
def get_register_form(request):
# This function will check to see if persona plugin is enabled. If so,
# this function will call hook_transform? and return a modified form
# containing both openid & persona info.
return auth_forms.RegistrationForm(request.form)
def get_login_form(request):
# See register_form comment above
return auth_forms.LoginForm(request.form)

View File

@ -0,0 +1,65 @@
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from sqlalchemy import Column, Integer, Unicode, ForeignKey
from sqlalchemy.orm import relationship, backref
from mediagoblin.db.models import User
from mediagoblin.db.base import Base
class OpenIDUserURL(Base):
__tablename__ = "openid__user_urls"
id = Column(Integer, primary_key=True)
openid_url = Column(Unicode, nullable=False)
user_id = Column(Integer, ForeignKey(User.id), nullable=False)
# OpenID's are owned by their user, so do the full thing.
user = relationship(User, backref=backref('openid_urls',
cascade='all, delete-orphan'))
# OpenID Store Models
class Nonce(Base):
__tablename__ = "openid__nonce"
server_url = Column(Unicode, primary_key=True)
timestamp = Column(Integer, primary_key=True)
salt = Column(Unicode, primary_key=True)
def __unicode__(self):
return u'Nonce: %r, %r' % (self.server_url, self.salt)
class Association(Base):
__tablename__ = "openid__association"
server_url = Column(Unicode, primary_key=True)
handle = Column(Unicode, primary_key=True)
secret = Column(Unicode)
issued = Column(Integer)
lifetime = Column(Integer)
assoc_type = Column(Unicode)
def __unicode__(self):
return u'Association: %r, %r' % (self.server_url, self.handle)
MODELS = [
OpenIDUserURL,
Nonce,
Association
]

View File

@ -0,0 +1,128 @@
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import time
from openid.association import Association as OIDAssociation
from openid.store.interface import OpenIDStore
from openid.store import nonce
from mediagoblin.plugins.openid.models import Association, Nonce
class SQLAlchemyOpenIDStore(OpenIDStore):
def __init__(self):
self.max_nonce_age = 6 * 60 * 60
def storeAssociation(self, server_url, association):
assoc = Association.query.filter_by(
server_url=server_url, handle=association.handle
).first()
if not assoc:
assoc = Association()
assoc.server_url = unicode(server_url)
assoc.handle = association.handle
# django uses base64 encoding, python-openid uses a blob field for
# secret
assoc.secret = unicode(base64.encodestring(association.secret))
assoc.issued = association.issued
assoc.lifetime = association.lifetime
assoc.assoc_type = association.assoc_type
assoc.save()
def getAssociation(self, server_url, handle=None):
assocs = []
if handle is not None:
assocs = Association.query.filter_by(
server_url=server_url, handle=handle
)
else:
assocs = Association.query.filter_by(
server_url=server_url
)
if assocs.count() == 0:
return None
else:
associations = []
for assoc in assocs:
association = OIDAssociation(
assoc.handle, base64.decodestring(assoc.secret),
assoc.issued, assoc.lifetime, assoc.assoc_type
)
if association.getExpiresIn() == 0:
assoc.delete()
else:
associations.append((association.issued, association))
if not associations:
return None
associations.sort()
return associations[-1][1]
def removeAssociation(self, server_url, handle):
assocs = Association.query.filter_by(
server_url=server_url, handle=handle
).first()
assoc_exists = True if assocs else False
for assoc in assocs:
assoc.delete()
return assoc_exists
def useNonce(self, server_url, timestamp, salt):
if abs(timestamp - time.time()) > nonce.SKEW:
return False
ononce = Nonce.query.filter_by(
server_url=server_url,
timestamp=timestamp,
salt=salt
).first()
if ononce:
return False
else:
ononce = Nonce()
ononce.server_url = server_url
ononce.timestamp = timestamp
ononce.salt = salt
ononce.save()
return True
# Need to test these cleanups, not sure if the expired Association query
# will work
def cleanupNonces(self, _now=None):
if _now is None:
_now = int(time.time())
expired = Nonce.query.filter(
Nonce.timestamp < (_now - nonce.SKEW)
)
count = expired.count()
for each in expired:
each.delete()
return count
def cleanupAssociations(self):
now = int(time.time())
expired = Association.query.filter(
'issued + lifetime' < now)
count = expired.count()
for each in expired:
each.delete()
return count

View File

@ -0,0 +1,44 @@
{#
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% extends "mediagoblin/base.html" %}
{% import "/mediagoblin/utils/wtforms.html" as wtforms_util %}
{% block title -%}
{% trans %}Add an OpenID{% endtrans %} &mdash; {{ super() }}
{%- endblock %}
{% block mediagoblin_content %}
<form action="{{ request.urlgen('mediagoblin.plugins.openid.edit') }}"
method="POST" enctype="multipart/form-data">
{{ csrf_token }}
<div class="form_box">
<h1>{% trans %}Add an OpenID{% endtrans %}</h1>
<p>
<a href="{{ request.urlgen('mediagoblin.plugins.openid.delete') }}">
{% trans %}Delete an OpenID{% endtrans %}
</a>
</p>
{{ wtforms_util.render_divs(form, True) }}
<div class="form_submit_buttons">
<input type="submit" value="{% trans %}Add{% endtrans %}" class="button_form"/>
</div>
</div>
</form>
{% endblock %}

View File

@ -0,0 +1,43 @@
{#
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% extends "mediagoblin/base.html" %}
{% import "/mediagoblin/utils/wtforms.html" as wtforms_util %}
{% block title -%}
{% trans %}Delete an OpenID{% endtrans %} &mdash; {{ super() }}
{%- endblock %}
{% block mediagoblin_content %}
<form action="{{ request.urlgen('mediagoblin.plugins.openid.delete') }}"
method="POST" enctype="multipart/form-data">
{{ csrf_token }}
<div class="form_box">
<h1>{% trans %}Delete an OpenID{% endtrans %}</h1>
<p>
<a href="{{ request.urlgen('mediagoblin.plugins.openid.edit') }}">
{% trans %}Add an OpenID{% endtrans %}
</a>
</p>
{{ wtforms_util.render_divs(form, True) }}
<div class="form_submit_buttons">
<input type="submit" value="{% trans %}Delete{% endtrans %}" class="button_form"/>
</div>
</div>
</form>
{% endblock %}

View File

@ -0,0 +1,65 @@
{#
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% extends "mediagoblin/base.html" %}
{% import "/mediagoblin/utils/wtforms.html" as wtforms_util %}
{% block mediagoblin_head %}
<script type="text/javascript"
src="{{ request.staticdirect('/js/autofilledin_password.js') }}"></script>
{% endblock %}
{% block title -%}
{% trans %}Log in{% endtrans %} &mdash; {{ super() }}
{%- endblock %}
{% block mediagoblin_content %}
<form action="{{ post_url }}"
method="POST" enctype="multipart/form-data">
{{ csrf_token }}
<div class="form_box">
<h1>{% trans %}Log in{% endtrans %}</h1>
{% if login_failed %}
<div class="form_field_error">
{% trans %}Logging in failed!{% endtrans %}
</div>
{% endif %}
{% if allow_registration %}
<p>
{% trans %}Log in to create an account!{% endtrans %}
</p>
{% endif %}
{% if pass_auth is defined %}
<p>
<a href="{{ request.urlgen('mediagoblin.auth.login') }}?{{ request.query_string }}">
{%- trans %}Or login with a password!{% endtrans %}
</a>
</p>
{% endif %}
{{ wtforms_util.render_divs(login_form, True) }}
<div class="form_submit_buttons">
<input type="submit" value="{% trans %}Log in{% endtrans %}" class="button_form"/>
</div>
{% if next %}
<input type="hidden" name="next" value="{{ next }}" class="button_form"
style="display: none;"/>
{% endif %}
</div>
</form>
{% endblock %}

View File

@ -0,0 +1,24 @@
{#
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% extends "mediagoblin/base.html" %}
{% block mediagoblin_content %}
<div onload="document.getElementById('openid_message').submit()">
{{ html|safe }}
{% endblock %}

View File

@ -0,0 +1,405 @@
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from openid.consumer import consumer
from openid.consumer.discover import DiscoveryFailure
from openid.extensions.sreg import SRegRequest, SRegResponse
from mediagoblin import mg_globals, messages
from mediagoblin.db.models import User
from mediagoblin.decorators import (auth_enabled, allow_registration,
require_active_login)
from mediagoblin.tools.response import redirect, render_to_response
from mediagoblin.tools.translate import pass_to_ugettext as _
from mediagoblin.plugins.openid import lib as auth_lib
from mediagoblin.plugins.openid import forms as auth_forms
from mediagoblin.plugins.openid.models import OpenIDUserURL
from mediagoblin.plugins.openid.store import SQLAlchemyOpenIDStore
from mediagoblin.auth.tools import register_user
def _start_verification(request, form, return_to, sreg=True):
"""
Start OpenID Verification.
Returns False if verification fails, otherwise, will return either a
redirect or render_to_response object
"""
openid_url = form.openid.data
c = consumer.Consumer(request.session, SQLAlchemyOpenIDStore())
# Try to discover provider
try:
auth_request = c.begin(openid_url)
except DiscoveryFailure:
# Discovery failed, return to login page
form.openid.errors.append(
_('Sorry, the OpenID server could not be found'))
return False
host = 'http://' + request.host
if sreg:
# Ask provider for email and nickname
auth_request.addExtension(SRegRequest(required=['email', 'nickname']))
# Do we even need this?
if auth_request is None:
form.openid.errors.append(
_('No OpenID service was found for %s' % openid_url))
elif auth_request.shouldSendRedirect():
# Begin the authentication process as a HTTP redirect
redirect_url = auth_request.redirectURL(
host, return_to)
return redirect(
request, location=redirect_url)
else:
# Send request as POST
form_html = auth_request.htmlMarkup(
host, host + return_to,
# Is this necessary?
form_tag_attrs={'id': 'openid_message'})
# Beware: this renders a template whose content is a form
# and some javascript to submit it upon page load. Non-JS
# users will have to click the form submit button to
# initiate OpenID authentication.
return render_to_response(
request,
'mediagoblin/plugins/openid/request_form.html',
{'html': form_html})
return False
def _finish_verification(request):
"""
Complete OpenID Verification Process.
If the verification failed, will return false, otherwise, will return
the response
"""
c = consumer.Consumer(request.session, SQLAlchemyOpenIDStore())
# Check the response from the provider
response = c.complete(request.args, request.base_url)
if response.status == consumer.FAILURE:
messages.add_message(
request,
messages.WARNING,
_('Verification of %s failed: %s' %
(response.getDisplayIdentifier(), response.message)))
elif response.status == consumer.SUCCESS:
# Verification was successfull
return response
elif response.status == consumer.CANCEL:
# Verification canceled
messages.add_message(
request,
messages.WARNING,
_('Verification cancelled'))
return False
def _response_email(response):
""" Gets the email from the OpenID providers response"""
sreg_response = SRegResponse.fromSuccessResponse(response)
if sreg_response and 'email' in sreg_response:
return sreg_response.data['email']
return None
def _response_nickname(response):
""" Gets the nickname from the OpenID providers response"""
sreg_response = SRegResponse.fromSuccessResponse(response)
if sreg_response and 'nickname' in sreg_response:
return sreg_response.data['nickname']
return None
@auth_enabled
def login(request):
"""OpenID Login View"""
login_form = auth_lib.get_login_form(request)
allow_registration = mg_globals.app_config["allow_registration"]
# Can't store next in request.GET because of redirects to OpenID provider
# Store it in the session
next = request.GET.get('next')
request.session['next'] = next
login_failed = False
if request.method == 'POST' and login_form.validate():
return_to = request.urlgen(
'mediagoblin.plugins.openid.finish_login')
success = _start_verification(request, login_form, return_to)
if success:
return success
login_failed = True
return render_to_response(
request,
'mediagoblin/plugins/openid/login.html',
{'login_form': login_form,
'next': request.session.get('next'),
'login_failed': login_failed,
'post_url': request.urlgen('mediagoblin.plugins.openid.login'),
'allow_registration': allow_registration})
@auth_enabled
def finish_login(request):
"""Complete OpenID Login Process"""
response = _finish_verification(request)
if not response:
# Verification failed, redirect to login page.
return redirect(request, 'mediagoblin.plugins.openid.login')
# Verification was successfull
query = OpenIDUserURL.query.filter_by(
openid_url=response.identity_url,
).first()
user = query.user if query else None
if user:
# Set up login in session
request.session['user_id'] = unicode(user.id)
request.session.save()
if request.session.get('next'):
return redirect(request, location=request.session.pop('next'))
else:
return redirect(request, "index")
else:
# No user, need to register
if not mg_globals.app.auth:
messages.add_message(
request,
messages.WARNING,
_('Sorry, authentication is disabled on this instance.'))
return redirect(request, 'index')
# Get email and nickname from response
email = _response_email(response)
username = _response_nickname(response)
register_form = auth_forms.RegistrationForm(request.form,
openid=response.identity_url,
email=email,
username=username)
return render_to_response(
request,
'mediagoblin/auth/register.html',
{'register_form': register_form,
'post_url': request.urlgen('mediagoblin.plugins.openid.register')})
@allow_registration
@auth_enabled
def register(request):
"""OpenID Registration View"""
if request.method == 'GET':
# Need to connect to openid provider before registering a user to
# get the users openid url. If method is 'GET', then this page was
# acessed without logging in first.
return redirect(request, 'mediagoblin.plugins.openid.login')
register_form = auth_forms.RegistrationForm(request.form)
if register_form.validate():
user = register_user(request, register_form)
if user:
# redirect the user to their homepage... there will be a
# message waiting for them to verify their email
return redirect(
request, 'mediagoblin.user_pages.user_home',
user=user.username)
return render_to_response(
request,
'mediagoblin/auth/register.html',
{'register_form': register_form,
'post_url': request.urlgen('mediagoblin.plugins.openid.register')})
@require_active_login
def start_edit(request):
"""Starts the process of adding an openid url to a users account"""
form = auth_forms.LoginForm(request.form)
if request.method == 'POST' and form.validate():
query = OpenIDUserURL.query.filter_by(
openid_url=form.openid.data
).first()
user = query.user if query else None
if not user:
return_to = request.urlgen('mediagoblin.plugins.openid.finish_edit')
success = _start_verification(request, form, return_to, False)
if success:
return success
else:
form.openid.errors.append(
_('Sorry, an account is already registered to that OpenID.'))
return render_to_response(
request,
'mediagoblin/plugins/openid/add.html',
{'form': form,
'post_url': request.urlgen('mediagoblin.plugins.openid.edit')})
@require_active_login
def finish_edit(request):
"""Finishes the process of adding an openid url to a user"""
response = _finish_verification(request)
if not response:
# Verification failed, redirect to add openid page.
return redirect(request, 'mediagoblin.plugins.openid.edit')
# Verification was successfull
query = OpenIDUserURL.query.filter_by(
openid_url=response.identity_url,
).first()
user_exists = query.user if query else None
if user_exists:
# user exists with that openid url, redirect back to edit page
messages.add_message(
request,
messages.WARNING,
_('Sorry, an account is already registered to that OpenID.'))
return redirect(request, 'mediagoblin.plugins.openid.edit')
else:
# Save openid to user
user = User.query.filter_by(
id=request.session['user_id']
).first()
new_entry = OpenIDUserURL()
new_entry.openid_url = response.identity_url
new_entry.user_id = user.id
new_entry.save()
messages.add_message(
request,
messages.SUCCESS,
_('Your OpenID url was saved successfully.'))
return redirect(request, 'mediagoblin.edit.account')
@require_active_login
def delete_openid(request):
"""View to remove an openid from a users account"""
form = auth_forms.LoginForm(request.form)
if request.method == 'POST' and form.validate():
# Check if a user has this openid
query = OpenIDUserURL.query.filter_by(
openid_url=form.openid.data
)
user = query.first().user if query.first() else None
if user and user.id == int(request.session['user_id']):
count = len(user.openid_urls)
if not count > 1 and not user.pw_hash:
# Make sure the user has a pw or another OpenID
messages.add_message(
request,
messages.WARNING,
_("You can't delete your only OpenID URL unless you"
" have a password set"))
elif user:
# There is a user, but not the same user who is logged in
form.openid.errors.append(
_('That OpenID is not registered to this account.'))
if not form.errors and not request.session['messages']:
# Okay to continue with deleting openid
return_to = request.urlgen(
'mediagoblin.plugins.openid.finish_delete')
success = _start_verification(request, form, return_to, False)
if success:
return success
return render_to_response(
request,
'mediagoblin/plugins/openid/delete.html',
{'form': form,
'post_url': request.urlgen('mediagoblin.plugins.openid.delete')})
@require_active_login
def finish_delete(request):
"""Finishes the deletion of an OpenID from an user's account"""
response = _finish_verification(request)
if not response:
# Verification failed, redirect to delete openid page.
return redirect(request, 'mediagoblin.plugins.openid.delete')
query = OpenIDUserURL.query.filter_by(
openid_url=response.identity_url
)
user = query.first().user if query.first() else None
# Need to check this again because of generic openid urls such as google's
if user and user.id == int(request.session['user_id']):
count = len(user.openid_urls)
if count > 1 or user.pw_hash:
# User has more then one openid or also has a password.
query.first().delete()
messages.add_message(
request,
messages.SUCCESS,
_('OpenID was successfully removed.'))
return redirect(request, 'mediagoblin.edit.account')
elif not count > 1:
messages.add_message(
request,
messages.WARNING,
_("You can't delete your only OpenID URL unless you have a "
"password set"))
return redirect(request, 'mediagoblin.plugins.openid.delete')
else:
messages.add_message(
request,
messages.WARNING,
_('That OpenID is not registered to this account.'))
return redirect(request, 'mediagoblin.plugins.openid.delete')

View File

@ -29,7 +29,7 @@
{%- endblock %} {%- endblock %}
{% block mediagoblin_content %} {% block mediagoblin_content %}
<form action="{{ request.urlgen('mediagoblin.auth.login') }}" <form action="{{ post_url }}"
method="POST" enctype="multipart/form-data"> method="POST" enctype="multipart/form-data">
{{ csrf_token }} {{ csrf_token }}
<div class="form_box"> <div class="form_box">
@ -41,16 +41,24 @@
{% endif %} {% endif %}
{% if allow_registration %} {% if allow_registration %}
<p> <p>
{% trans %}Don't have an account yet?{% endtrans %} <a href="{{ request.urlgen('mediagoblin.auth.register') }}"> {% trans %}Don't have an account yet?{% endtrans %}
<a href="{{ request.urlgen('mediagoblin.auth.register') }}">
{%- trans %}Create one here!{% endtrans %}</a> {%- trans %}Create one here!{% endtrans %}</a>
</p> </p>
{% endif %} {% endif %}
{% if openid_link is defined %}
<p>
<a href="{{ request.urlgen('mediagoblin.plugins.openid.login') }}?{{ request.query_string }}">
{%- trans %}Or login with OpenID!{% endtrans %}
</a>
</p>
{% endif %}
{{ wtforms_util.render_divs(login_form, True) }} {{ wtforms_util.render_divs(login_form, True) }}
{% if pass_auth %} {% if pass_auth %}
<p> <p>
<a href="{{ request.urlgen('mediagoblin.auth.forgot_password') }}" id="forgot_password"> <a href="{{ request.urlgen('mediagoblin.auth.forgot_password') }}" id="forgot_password">
{% trans %}Forgot your password?{% endtrans %}</a> {% trans %}Forgot your password?{% endtrans %}</a>
</p> </p>
{% endif %} {% endif %}
<div class="form_submit_buttons"> <div class="form_submit_buttons">
<input type="submit" value="{% trans %}Log in{% endtrans %}" class="button_form"/> <input type="submit" value="{% trans %}Log in{% endtrans %}" class="button_form"/>

View File

@ -30,10 +30,23 @@
{% block mediagoblin_content %} {% block mediagoblin_content %}
<form action="{{ request.urlgen('mediagoblin.auth.register') }}" <form action="{{ post_url }}"
method="POST" enctype="multipart/form-data"> method="POST" enctype="multipart/form-data">
<div class="form_box"> <div class="form_box">
<h1>{% trans %}Create an account!{% endtrans %}</h1> <h1>{% trans %}Create an account!{% endtrans %}</h1>
{% if openid_link is defined %}
<p>
<a href="{{ request.urlgen('mediagoblin.plugins.openid.register') }}">
{%- trans %}Or register with OpenID!{% endtrans %}
</a>
</p>
{% elif pass_auth_link is defined %}
<p>
<a href="{{ request.urlgen('mediagoblin.auth.register') }}">
{%- trans %}Or register with a password!{% endtrans %}
</a>
</p>
{% endif %}
{{ wtforms_util.render_divs(register_form, True) }} {{ wtforms_util.render_divs(register_form, True) }}
{{ csrf_token }} {{ csrf_token }}
<div class="form_submit_buttons"> <div class="form_submit_buttons">

View File

@ -41,13 +41,22 @@
Changing {{ username }}'s account settings Changing {{ username }}'s account settings
{%- endtrans -%} {%- endtrans -%}
</h1> </h1>
{% if pass_auth is defined %}
<p> <p>
<a href="{{ request.urlgen('mediagoblin.edit.pass') }}"> <a href="{{ request.urlgen('mediagoblin.edit.pass') }}">
{% trans %}Change your password.{% endtrans %} {% trans %}Change your password.{% endtrans %}
</a> </a>
</p> </p>
{% endif %}
{% if openid_link is defined %}
<p>
<a href="{{ request.urlgen('mediagoblin.plugins.openid.edit') }}">
{% trans %}Edit your OpenID's{% endtrans %}
</a>
</p>
{% endif %}
{{ wtforms_util.render_divs(form, True) }} {{ wtforms_util.render_divs(form, True) }}
<div class="form_submit_buttons"> <div class="form_submit_buttons">
<input type="submit" value="{% trans %}Save changes{% endtrans %}" class="button_form" /> <input type="submit" value="{% trans %}Save changes{% endtrans %}" class="button_form" />
{{ csrf_token }} {{ csrf_token }}
</div> </div>

View File

@ -0,0 +1,41 @@
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
[mediagoblin]
direct_remote_path = /test_static/
email_sender_address = "notice@mediagoblin.example.org"
email_debug_mode = true
# TODO: Switch to using an in-memory database
sql_engine = "sqlite:///%(here)s/user_dev/mediagoblin.db"
# Celery shouldn't be set up by the application as it's setup via
# mediagoblin.init.celery.from_celery
celery_setup_elsewhere = true
[storage:publicstore]
base_dir = %(here)s/user_dev/media/public
base_url = /mgoblin_media/
[storage:queuestore]
base_dir = %(here)s/user_dev/media/queue
[celery]
CELERY_ALWAYS_EAGER = true
CELERY_RESULT_DBURI = "sqlite:///%(here)s/user_dev/celery.db"
BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
[plugins]
[[mediagoblin.plugins.openid]]

View File

@ -14,7 +14,6 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import urlparse import urlparse
import datetime
import pkg_resources import pkg_resources
import pytest import pytest
@ -236,6 +235,7 @@ def test_authentication_views(test_app):
# Make a new user # Make a new user
test_user = fixture_add_user(active_user=False) test_user = fixture_add_user(active_user=False)
# Get login # Get login
# --------- # ---------
test_app.get('/auth/login/') test_app.get('/auth/login/')

View File

@ -32,3 +32,4 @@ BROKER_HOST = "sqlite:///%(here)s/user_dev/kombu.db"
[[mediagoblin.plugins.httpapiauth]] [[mediagoblin.plugins.httpapiauth]]
[[mediagoblin.plugins.piwigo]] [[mediagoblin.plugins.piwigo]]
[[mediagoblin.plugins.basic_auth]] [[mediagoblin.plugins.basic_auth]]
[[mediagoblin.plugins.openid]]

View File

@ -0,0 +1,372 @@
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urlparse
import pkg_resources
import pytest
import mock
from openid.consumer.consumer import SuccessResponse
from mediagoblin import mg_globals
from mediagoblin.db.base import Session
from mediagoblin.db.models import User
from mediagoblin.plugins.openid.models import OpenIDUserURL
from mediagoblin.tests.tools import get_app, fixture_add_user
from mediagoblin.tools import template
# App with plugin enabled
@pytest.fixture()
def openid_plugin_app(request):
return get_app(
request,
mgoblin_config=pkg_resources.resource_filename(
'mediagoblin.tests.auth_configs',
'openid_appconfig.ini'))
class TestOpenIDPlugin(object):
def _setup(self, openid_plugin_app, value=True, edit=False, delete=False):
if value:
response = SuccessResponse(mock.Mock(), mock.Mock())
if edit or delete:
response.identity_url = u'http://add.myopenid.com'
else:
response.identity_url = u'http://real.myopenid.com'
self._finish_verification = mock.Mock(return_value=response)
else:
self._finish_verification = mock.Mock(return_value=False)
@mock.patch('mediagoblin.plugins.openid.views._response_email', mock.Mock(return_value=None))
@mock.patch('mediagoblin.plugins.openid.views._response_nickname', mock.Mock(return_value=None))
@mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
def _setup_start(self, openid_plugin_app, edit, delete):
if edit:
self._start_verification = mock.Mock(return_value=openid_plugin_app.post(
'/edit/openid/finish/'))
elif delete:
self._start_verification = mock.Mock(return_value=openid_plugin_app.post(
'/edit/openid/delete/finish/'))
else:
self._start_verification = mock.Mock(return_value=openid_plugin_app.post(
'/auth/openid/login/finish/'))
_setup_start(self, openid_plugin_app, edit, delete)
def test_bad_login(self, openid_plugin_app):
""" Test that attempts to login with invalid paramaters"""
# Test GET request for auth/register page
res = openid_plugin_app.get('/auth/register/').follow()
# Make sure it redirected to the correct place
assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
# Test GET request for auth/login page
res = openid_plugin_app.get('/auth/login/')
res.follow()
# Correct redirect?
assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
# Test GET request for auth/openid/register page
res = openid_plugin_app.get('/auth/openid/register/')
res.follow()
# Correct redirect?
assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
# Test GET request for auth/openid/login/finish page
res = openid_plugin_app.get('/auth/openid/login/finish/')
res.follow()
# Correct redirect?
assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
# Test GET request for auth/openid/login page
res = openid_plugin_app.get('/auth/openid/login/')
# Correct place?
assert 'mediagoblin/plugins/openid/login.html' in template.TEMPLATE_TEST_CONTEXT
# Try to login with an empty form
template.clear_test_template_context()
openid_plugin_app.post(
'/auth/openid/login/', {})
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html']
form = context['login_form']
assert form.openid.errors == [u'This field is required.']
# Try to login with wrong form values
template.clear_test_template_context()
openid_plugin_app.post(
'/auth/openid/login/', {
'openid': 'not_a_url.com'})
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html']
form = context['login_form']
assert form.openid.errors == [u'Please enter a valid url.']
# Should be no users in the db
assert User.query.count() == 0
# Phony OpenID URl
template.clear_test_template_context()
openid_plugin_app.post(
'/auth/openid/login/', {
'openid': 'http://phoney.myopenid.com/'})
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/login.html']
form = context['login_form']
assert form.openid.errors == [u'Sorry, the OpenID server could not be found']
def test_login(self, openid_plugin_app):
"""Tests that test login and registion with openid"""
# Test finish_login redirects correctly when response = False
self._setup(openid_plugin_app, False)
@mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
@mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
def _test_non_response():
template.clear_test_template_context()
res = openid_plugin_app.post(
'/auth/openid/login/', {
'openid': 'http://phoney.myopenid.com/'})
res.follow()
# Correct Place?
assert urlparse.urlsplit(res.location)[2] == '/auth/openid/login/'
assert 'mediagoblin/plugins/openid/login.html' in template.TEMPLATE_TEST_CONTEXT
_test_non_response()
# Test login with new openid
# Need to clear_test_template_context before calling _setup
template.clear_test_template_context()
self._setup(openid_plugin_app)
@mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
@mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
def _test_new_user():
openid_plugin_app.post(
'/auth/openid/login/', {
'openid': u'http://real.myopenid.com'})
# Right place?
assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
register_form = context['register_form']
# Register User
res = openid_plugin_app.post(
'/auth/openid/register/', {
'openid': register_form.openid.data,
'username': u'chris',
'email': u'chris@example.com'})
res.follow()
# Correct place?
assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT
# No need to test if user is in logged in and verification email
# awaits, since openid uses the register_user function which is
# tested in test_auth
# Logout User
openid_plugin_app.get('/auth/logout')
# Get user and detach from session
test_user = mg_globals.database.User.find_one({
'username': u'chris'})
Session.expunge(test_user)
# Log back in
# Could not get it to work by 'POST'ing to /auth/openid/login/
template.clear_test_template_context()
res = openid_plugin_app.post(
'/auth/openid/login/finish/', {
'openid': u'http://real.myopenid.com'})
res.follow()
assert urlparse.urlsplit(res.location)[2] == '/'
assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
# Make sure user is in the session
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
session = context['request'].session
assert session['user_id'] == unicode(test_user.id)
_test_new_user()
# Test register with empty form
template.clear_test_template_context()
openid_plugin_app.post(
'/auth/openid/register/', {})
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
register_form = context['register_form']
assert register_form.openid.errors == [u'This field is required.']
assert register_form.email.errors == [u'This field is required.']
assert register_form.username.errors == [u'This field is required.']
# Try to register with existing username and email
template.clear_test_template_context()
openid_plugin_app.post(
'/auth/openid/register/', {
'openid': 'http://real.myopenid.com',
'email': 'chris@example.com',
'username': 'chris'})
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
register_form = context['register_form']
assert register_form.username.errors == [u'Sorry, a user with that name already exists.']
assert register_form.email.errors == [u'Sorry, a user with that email address already exists.']
assert register_form.openid.errors == [u'Sorry, an account is already registered to that OpenID.']
def test_add_delete(self, openid_plugin_app):
"""Test adding and deleting openids"""
# Add user
test_user = fixture_add_user(password='')
openid = OpenIDUserURL()
openid.openid_url = 'http://real.myopenid.com'
openid.user_id = test_user.id
openid.save()
# Log user in
template.clear_test_template_context()
self._setup(openid_plugin_app)
@mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
@mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
def _login_user():
openid_plugin_app.post(
'/auth/openid/login/finish/', {
'openid': u'http://real.myopenid.com'})
_login_user()
# Try and delete only OpenID url
template.clear_test_template_context()
res = openid_plugin_app.post(
'/edit/openid/delete/', {
'openid': 'http://real.myopenid.com'})
assert 'mediagoblin/plugins/openid/delete.html' in template.TEMPLATE_TEST_CONTEXT
# Add OpenID to user
# Empty form
template.clear_test_template_context()
res = openid_plugin_app.post(
'/edit/openid/', {})
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html']
form = context['form']
assert form.openid.errors == [u'This field is required.']
# Try with a bad url
template.clear_test_template_context()
openid_plugin_app.post(
'/edit/openid/', {
'openid': u'not_a_url.com'})
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html']
form = context['form']
assert form.openid.errors == [u'Please enter a valid url.']
# Try with a url that's already registered
template.clear_test_template_context()
openid_plugin_app.post(
'/edit/openid/', {
'openid': 'http://real.myopenid.com'})
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/add.html']
form = context['form']
assert form.openid.errors == [u'Sorry, an account is already registered to that OpenID.']
# Test adding openid to account
# Need to clear_test_template_context before calling _setup
template.clear_test_template_context()
self._setup(openid_plugin_app, edit=True)
# Need to remove openid_url from db because it was added at setup
openid = OpenIDUserURL.query.filter_by(
openid_url=u'http://add.myopenid.com')
openid.delete()
@mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
@mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
def _test_add():
# Successful add
template.clear_test_template_context()
res = openid_plugin_app.post(
'/edit/openid/', {
'openid': u'http://add.myopenid.com'})
res.follow()
# Correct place?
assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
# OpenID Added?
new_openid = mg_globals.database.OpenIDUserURL.find_one(
{'openid_url': u'http://add.myopenid.com'})
assert new_openid
_test_add()
# Test deleting openid from account
# Need to clear_test_template_context before calling _setup
template.clear_test_template_context()
self._setup(openid_plugin_app, delete=True)
# Need to add OpenID back to user because it was deleted during
# patch
openid = OpenIDUserURL()
openid.openid_url = 'http://add.myopenid.com'
openid.user_id = test_user.id
openid.save()
@mock.patch('mediagoblin.plugins.openid.views._finish_verification', self._finish_verification)
@mock.patch('mediagoblin.plugins.openid.views._start_verification', self._start_verification)
def _test_delete(self, test_user):
# Delete openid from user
# Create another user to test deleting OpenID that doesn't belong to them
new_user = fixture_add_user(username='newman')
openid = OpenIDUserURL()
openid.openid_url = 'http://realfake.myopenid.com/'
openid.user_id = new_user.id
openid.save()
# Try and delete OpenID url that isn't the users
template.clear_test_template_context()
res = openid_plugin_app.post(
'/edit/openid/delete/', {
'openid': 'http://realfake.myopenid.com/'})
context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/plugins/openid/delete.html']
form = context['form']
assert form.openid.errors == [u'That OpenID is not registered to this account.']
# Delete OpenID
# Kind of weird to POST to delete/finish
template.clear_test_template_context()
res = openid_plugin_app.post(
'/edit/openid/delete/finish/', {
'openid': u'http://add.myopenid.com'})
res.follow()
# Correct place?
assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
# OpenID deleted?
new_openid = mg_globals.database.OpenIDUserURL.find_one(
{'openid_url': u'http://add.myopenid.com'})
assert not new_openid
_test_delete(self, test_user)