Merge remote-tracking branch 'upstream/master' into auth
Conflicts: mediagoblin/app.py mediagoblin/auth/forms.py mediagoblin/auth/tools.py mediagoblin/db/migrations.py mediagoblin/db/models.py mediagoblin/edit/views.py mediagoblin/plugins/basic_auth/tools.py mediagoblin/tests/test_edit.py
This commit is contained in:
@@ -118,20 +118,15 @@ def test_register_views(test_app):
|
||||
assert path == u'/auth/verify_email/'
|
||||
parsed_get_params = urlparse.parse_qs(get_params)
|
||||
|
||||
### user should have these same parameters
|
||||
assert parsed_get_params['userid'] == [
|
||||
unicode(new_user.id)]
|
||||
assert parsed_get_params['token'] == [
|
||||
new_user.verification_key]
|
||||
|
||||
## Try verifying with bs verification key, shouldn't work
|
||||
template.clear_test_template_context()
|
||||
response = test_app.get(
|
||||
"/auth/verify_email/?userid=%s&token=total_bs" % unicode(
|
||||
new_user.id))
|
||||
"/auth/verify_email/?token=total_bs")
|
||||
response.follow()
|
||||
context = template.TEMPLATE_TEST_CONTEXT[
|
||||
'mediagoblin/user_pages/user.html']
|
||||
|
||||
# Correct redirect?
|
||||
assert urlparse.urlsplit(response.location)[2] == '/'
|
||||
|
||||
# assert context['verification_successful'] == True
|
||||
# TODO: Would be good to test messages here when we can do so...
|
||||
new_user = mg_globals.database.User.find_one(
|
||||
@@ -195,35 +190,17 @@ def test_register_views(test_app):
|
||||
|
||||
path = urlparse.urlsplit(email_context['verification_url'])[2]
|
||||
get_params = urlparse.urlsplit(email_context['verification_url'])[3]
|
||||
assert path == u'/auth/forgot_password/verify/'
|
||||
parsed_get_params = urlparse.parse_qs(get_params)
|
||||
|
||||
# user should have matching parameters
|
||||
new_user = mg_globals.database.User.find_one({'username': u'happygirl'})
|
||||
assert parsed_get_params['userid'] == [unicode(new_user.id)]
|
||||
assert parsed_get_params['token'] == [new_user.fp_verification_key]
|
||||
|
||||
### The forgotten password token should be set to expire in ~ 10 days
|
||||
# A few ticks have expired so there are only 9 full days left...
|
||||
assert (new_user.fp_token_expire - datetime.datetime.now()).days == 9
|
||||
assert path == u'/auth/forgot_password/verify/'
|
||||
|
||||
## Try using a bs password-changing verification key, shouldn't work
|
||||
template.clear_test_template_context()
|
||||
response = test_app.get(
|
||||
"/auth/forgot_password/verify/?userid=%s&token=total_bs" % unicode(
|
||||
new_user.id), status=404)
|
||||
assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
|
||||
"/auth/forgot_password/verify/?token=total_bs")
|
||||
response.follow()
|
||||
|
||||
## Try using an expired token to change password, shouldn't work
|
||||
template.clear_test_template_context()
|
||||
new_user = mg_globals.database.User.find_one({'username': u'happygirl'})
|
||||
real_token_expiration = new_user.fp_token_expire
|
||||
new_user.fp_token_expire = datetime.datetime.now()
|
||||
new_user.save()
|
||||
response = test_app.get("%s?%s" % (path, get_params), status=404)
|
||||
assert response.status.split()[0] == u'404' # status="404 NOT FOUND"
|
||||
new_user.fp_token_expire = real_token_expiration
|
||||
new_user.save()
|
||||
# Correct redirect?
|
||||
assert urlparse.urlsplit(response.location)[2] == '/'
|
||||
|
||||
## Verify step 1 of password-change works -- can see form to change password
|
||||
template.clear_test_template_context()
|
||||
@@ -234,7 +211,6 @@ def test_register_views(test_app):
|
||||
template.clear_test_template_context()
|
||||
response = test_app.post(
|
||||
'/auth/forgot_password/verify/', {
|
||||
'userid': parsed_get_params['userid'],
|
||||
'password': 'iamveryveryhappy',
|
||||
'token': parsed_get_params['token']})
|
||||
response.follow()
|
||||
|
||||
@@ -48,7 +48,7 @@ def test_setup_celery_from_config():
|
||||
assert isinstance(fake_celery_module.CELERYD_ETA_SCHEDULER_PRECISION, float)
|
||||
assert fake_celery_module.CELERY_RESULT_PERSISTENT is True
|
||||
assert fake_celery_module.CELERY_IMPORTS == [
|
||||
'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task']
|
||||
'foo.bar.baz', 'this.is.an.import', 'mediagoblin.processing.task', 'mediagoblin.notifications.task']
|
||||
assert fake_celery_module.CELERY_RESULT_BACKEND == 'database'
|
||||
assert fake_celery_module.CELERY_RESULT_DBURI == (
|
||||
'sqlite:///' +
|
||||
|
||||
@@ -15,13 +15,12 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import urlparse
|
||||
import pytest
|
||||
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.db.models import User
|
||||
from mediagoblin.tests.tools import fixture_add_user
|
||||
from mediagoblin.tools import template
|
||||
from mediagoblin import auth
|
||||
from mediagoblin.tools import template, mail
|
||||
|
||||
|
||||
class TestUserEdit(object):
|
||||
@@ -142,4 +141,68 @@ class TestUserEdit(object):
|
||||
assert form.url.errors == [
|
||||
u'This address contains errors']
|
||||
|
||||
def test_email_change(self, test_app):
|
||||
self.login(test_app)
|
||||
|
||||
# Test email already in db
|
||||
template.clear_test_template_context()
|
||||
test_app.post(
|
||||
'/edit/account/', {
|
||||
'new_email': 'chris@example.com',
|
||||
'password': 'toast'})
|
||||
|
||||
# Check form errors
|
||||
context = template.TEMPLATE_TEST_CONTEXT[
|
||||
'mediagoblin/edit/edit_account.html']
|
||||
assert context['form'].new_email.errors == [
|
||||
u'Sorry, a user with that email address already exists.']
|
||||
|
||||
# Test successful email change
|
||||
template.clear_test_template_context()
|
||||
res = test_app.post(
|
||||
'/edit/account/', {
|
||||
'new_email': 'new@example.com',
|
||||
'password': 'toast'})
|
||||
res.follow()
|
||||
|
||||
# Correct redirect?
|
||||
assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
|
||||
|
||||
# Make sure we get email verification and try verifying
|
||||
assert len(mail.EMAIL_TEST_INBOX) == 1
|
||||
message = mail.EMAIL_TEST_INBOX.pop()
|
||||
assert message['To'] == 'new@example.com'
|
||||
email_context = template.TEMPLATE_TEST_CONTEXT[
|
||||
'mediagoblin/edit/verification.txt']
|
||||
assert email_context['verification_url'] in \
|
||||
message.get_payload(decode=True)
|
||||
|
||||
path = urlparse.urlsplit(email_context['verification_url'])[2]
|
||||
assert path == u'/edit/verify_email/'
|
||||
|
||||
## Try verifying with bs verification key, shouldn't work
|
||||
template.clear_test_template_context()
|
||||
res = test_app.get(
|
||||
"/edit/verify_email/?token=total_bs")
|
||||
res.follow()
|
||||
|
||||
# Correct redirect?
|
||||
assert urlparse.urlsplit(res.location)[2] == '/'
|
||||
|
||||
# Email shouldn't be saved
|
||||
email_in_db = mg_globals.database.User.find_one(
|
||||
{'email': 'new@example.com'})
|
||||
email = User.query.filter_by(username='chris').first().email
|
||||
assert email_in_db is None
|
||||
assert email == 'chris@example.com'
|
||||
|
||||
# Verify email activation works
|
||||
template.clear_test_template_context()
|
||||
get_params = urlparse.urlsplit(email_context['verification_url'])[3]
|
||||
res = test_app.get('%s?%s' % (path, get_params))
|
||||
res.follow()
|
||||
|
||||
# New email saved?
|
||||
email = User.query.filter_by(username='chris').first().email
|
||||
assert email == 'new@example.com'
|
||||
# test changing the url inproperly
|
||||
|
||||
@@ -28,8 +28,10 @@ def test_user_deletes_other_comments(test_app):
|
||||
user_a = fixture_add_user(u"chris_a")
|
||||
user_b = fixture_add_user(u"chris_b")
|
||||
|
||||
media_a = fixture_media_entry(uploader=user_a.id, save=False)
|
||||
media_b = fixture_media_entry(uploader=user_b.id, save=False)
|
||||
media_a = fixture_media_entry(uploader=user_a.id, save=False,
|
||||
expunge=False, fake_upload=False)
|
||||
media_b = fixture_media_entry(uploader=user_b.id, save=False,
|
||||
expunge=False, fake_upload=False)
|
||||
Session.add(media_a)
|
||||
Session.add(media_b)
|
||||
Session.flush()
|
||||
@@ -79,7 +81,7 @@ def test_user_deletes_other_comments(test_app):
|
||||
def test_media_deletes_broken_attachment(test_app):
|
||||
user_a = fixture_add_user(u"chris_a")
|
||||
|
||||
media = fixture_media_entry(uploader=user_a.id, save=False)
|
||||
media = fixture_media_entry(uploader=user_a.id, save=False, expunge=False)
|
||||
media.attachment_files.append(dict(
|
||||
name=u"some name",
|
||||
filepath=[u"does", u"not", u"exist"],
|
||||
|
||||
151
mediagoblin/tests/test_notifications.py
Normal file
151
mediagoblin/tests/test_notifications.py
Normal file
@@ -0,0 +1,151 @@
|
||||
# GNU MediaGoblin -- federated, autonomous media hosting
|
||||
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytest
|
||||
|
||||
import urlparse
|
||||
|
||||
from mediagoblin.tools import template, mail
|
||||
|
||||
from mediagoblin.db.models import Notification, CommentNotification, \
|
||||
CommentSubscription
|
||||
from mediagoblin.db.base import Session
|
||||
|
||||
from mediagoblin.notifications import mark_comment_notification_seen
|
||||
|
||||
from mediagoblin.tests.tools import fixture_add_comment, \
|
||||
fixture_media_entry, fixture_add_user, \
|
||||
fixture_comment_subscription
|
||||
|
||||
|
||||
class TestNotifications:
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, test_app):
|
||||
self.test_app = test_app
|
||||
|
||||
# TODO: Possibly abstract into a decorator like:
|
||||
# @as_authenticated_user('chris')
|
||||
self.test_user = fixture_add_user()
|
||||
|
||||
self.current_user = None
|
||||
|
||||
self.login()
|
||||
|
||||
def login(self, username=u'chris', password=u'toast'):
|
||||
response = self.test_app.post(
|
||||
'/auth/login/', {
|
||||
'username': username,
|
||||
'password': password})
|
||||
|
||||
response.follow()
|
||||
|
||||
assert urlparse.urlsplit(response.location)[2] == '/'
|
||||
assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT
|
||||
|
||||
ctx = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
|
||||
|
||||
assert Session.merge(ctx['request'].user).username == username
|
||||
|
||||
self.current_user = ctx['request'].user
|
||||
|
||||
def logout(self):
|
||||
self.test_app.get('/auth/logout/')
|
||||
self.current_user = None
|
||||
|
||||
@pytest.mark.parametrize('wants_email', [True, False])
|
||||
def test_comment_notification(self, wants_email):
|
||||
'''
|
||||
Test
|
||||
- if a notification is created when posting a comment on
|
||||
another users media entry.
|
||||
- that the comment data is consistent and exists.
|
||||
|
||||
'''
|
||||
user = fixture_add_user('otherperson', password='nosreprehto',
|
||||
wants_comment_notification=wants_email)
|
||||
|
||||
user_id = user.id
|
||||
|
||||
media_entry = fixture_media_entry(uploader=user.id, state=u'processed')
|
||||
|
||||
media_entry_id = media_entry.id
|
||||
|
||||
subscription = fixture_comment_subscription(media_entry)
|
||||
|
||||
subscription_id = subscription.id
|
||||
|
||||
media_uri_id = '/u/{0}/m/{1}/'.format(user.username,
|
||||
media_entry.id)
|
||||
media_uri_slug = '/u/{0}/m/{1}/'.format(user.username,
|
||||
media_entry.slug)
|
||||
|
||||
self.test_app.post(
|
||||
media_uri_id + 'comment/add/',
|
||||
{
|
||||
'comment_content': u'Test comment #42'
|
||||
}
|
||||
)
|
||||
|
||||
notifications = Notification.query.filter_by(
|
||||
user_id=user.id).all()
|
||||
|
||||
assert len(notifications) == 1
|
||||
|
||||
notification = notifications[0]
|
||||
|
||||
assert type(notification) == CommentNotification
|
||||
assert notification.seen == False
|
||||
assert notification.user_id == user.id
|
||||
assert notification.subject.get_author.id == self.test_user.id
|
||||
assert notification.subject.content == u'Test comment #42'
|
||||
|
||||
if wants_email == True:
|
||||
assert mail.EMAIL_TEST_MBOX_INBOX == [
|
||||
{'from': 'notice@mediagoblin.example.org',
|
||||
'message': 'Content-Type: text/plain; \
|
||||
charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: \
|
||||
base64\nSubject: GNU MediaGoblin - chris commented on your \
|
||||
post\nFrom: notice@mediagoblin.example.org\nTo: \
|
||||
otherperson@example.com\n\nSGkgb3RoZXJwZXJzb24sCmNocmlzIGNvbW1lbnRlZCBvbiB5b3VyIHBvc3QgKGh0dHA6Ly9sb2Nh\nbGhvc3Q6ODAvdS9vdGhlcnBlcnNvbi9tL3NvbWUtdGl0bGUvYy8xLyNjb21tZW50KSBhdCBHTlUg\nTWVkaWFHb2JsaW4KClRlc3QgY29tbWVudCAjNDIKCkdOVSBNZWRpYUdvYmxpbg==\n',
|
||||
'to': [u'otherperson@example.com']}]
|
||||
else:
|
||||
assert mail.EMAIL_TEST_MBOX_INBOX == []
|
||||
|
||||
# Save the ids temporarily because of DetachedInstanceError
|
||||
notification_id = notification.id
|
||||
comment_id = notification.subject.id
|
||||
|
||||
self.logout()
|
||||
self.login('otherperson', 'nosreprehto')
|
||||
|
||||
self.test_app.get(media_uri_slug + '/c/{0}/'.format(comment_id))
|
||||
|
||||
notification = Notification.query.filter_by(id=notification_id).first()
|
||||
|
||||
assert notification.seen == True
|
||||
|
||||
self.test_app.get(media_uri_slug + '/notifications/silence/')
|
||||
|
||||
subscription = CommentSubscription.query.filter_by(id=subscription_id)\
|
||||
.first()
|
||||
|
||||
assert subscription.notify == False
|
||||
|
||||
notifications = Notification.query.filter_by(
|
||||
user_id=user_id).all()
|
||||
|
||||
# User should not have been notified
|
||||
assert len(notifications) == 1
|
||||
@@ -15,18 +15,17 @@
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import sys
|
||||
import os
|
||||
import pkg_resources
|
||||
import shutil
|
||||
|
||||
from functools import wraps
|
||||
|
||||
from paste.deploy import loadapp
|
||||
from webtest import TestApp
|
||||
|
||||
from mediagoblin import mg_globals
|
||||
from mediagoblin.db.models import User, MediaEntry, Collection
|
||||
from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
|
||||
CommentSubscription, CommentNotification
|
||||
from mediagoblin.tools import testing
|
||||
from mediagoblin.init.config import read_mediagoblin_config
|
||||
from mediagoblin.db.base import Session
|
||||
@@ -171,7 +170,7 @@ def assert_db_meets_expected(db, expected):
|
||||
|
||||
|
||||
def fixture_add_user(username=u'chris', password=u'toast',
|
||||
active_user=True):
|
||||
active_user=True, wants_comment_notification=True):
|
||||
# Reuse existing user or create a new one
|
||||
test_user = User.query.filter_by(username=username).first()
|
||||
if test_user is None:
|
||||
@@ -184,6 +183,8 @@ def fixture_add_user(username=u'chris', password=u'toast',
|
||||
test_user.email_verified = True
|
||||
test_user.status = u'active'
|
||||
|
||||
test_user.wants_comment_notification = wants_comment_notification
|
||||
|
||||
test_user.save()
|
||||
|
||||
# Reload
|
||||
@@ -195,19 +196,79 @@ def fixture_add_user(username=u'chris', password=u'toast',
|
||||
return test_user
|
||||
|
||||
|
||||
def fixture_comment_subscription(entry, notify=True, send_email=None):
|
||||
if send_email is None:
|
||||
uploader = User.query.filter_by(id=entry.uploader).first()
|
||||
send_email = uploader.wants_comment_notification
|
||||
|
||||
cs = CommentSubscription(
|
||||
media_entry_id=entry.id,
|
||||
user_id=entry.uploader,
|
||||
notify=notify,
|
||||
send_email=send_email)
|
||||
|
||||
cs.save()
|
||||
|
||||
cs = CommentSubscription.query.filter_by(id=cs.id).first()
|
||||
|
||||
Session.expunge(cs)
|
||||
|
||||
return cs
|
||||
|
||||
|
||||
def fixture_add_comment_notification(entry_id, subject_id, user_id,
|
||||
seen=False):
|
||||
cn = CommentNotification(user_id=user_id,
|
||||
seen=seen,
|
||||
subject_id=subject_id)
|
||||
cn.save()
|
||||
|
||||
cn = CommentNotification.query.filter_by(id=cn.id).first()
|
||||
|
||||
Session.expunge(cn)
|
||||
|
||||
return cn
|
||||
|
||||
|
||||
def fixture_media_entry(title=u"Some title", slug=None,
|
||||
uploader=None, save=True, gen_slug=True):
|
||||
uploader=None, save=True, gen_slug=True,
|
||||
state=u'unprocessed', fake_upload=True,
|
||||
expunge=True):
|
||||
"""
|
||||
Add a media entry for testing purposes.
|
||||
|
||||
Caution: if you're adding multiple entries with fake_upload=True,
|
||||
make sure you save between them... otherwise you'll hit an
|
||||
IntegrityError from multiple newly-added-MediaEntries adding
|
||||
FileKeynames at once. :)
|
||||
"""
|
||||
if uploader is None:
|
||||
uploader = fixture_add_user().id
|
||||
|
||||
entry = MediaEntry()
|
||||
entry.title = title
|
||||
entry.slug = slug
|
||||
entry.uploader = uploader or fixture_add_user().id
|
||||
entry.uploader = uploader
|
||||
entry.media_type = u'image'
|
||||
entry.state = state
|
||||
|
||||
if fake_upload:
|
||||
entry.media_files = {'thumb': ['a', 'b', 'c.jpg'],
|
||||
'medium': ['d', 'e', 'f.png'],
|
||||
'original': ['g', 'h', 'i.png']}
|
||||
entry.media_type = u'mediagoblin.media_types.image'
|
||||
|
||||
if gen_slug:
|
||||
entry.generate_slug()
|
||||
|
||||
if save:
|
||||
entry.save()
|
||||
|
||||
if expunge:
|
||||
entry = MediaEntry.query.filter_by(id=entry.id).first()
|
||||
|
||||
Session.expunge(entry)
|
||||
|
||||
return entry
|
||||
|
||||
|
||||
@@ -231,3 +292,25 @@ def fixture_add_collection(name=u"My first Collection", user=None):
|
||||
|
||||
return coll
|
||||
|
||||
def fixture_add_comment(author=None, media_entry=None, comment=None):
|
||||
if author is None:
|
||||
author = fixture_add_user().id
|
||||
|
||||
if media_entry is None:
|
||||
media_entry = fixture_media_entry().id
|
||||
|
||||
if comment is None:
|
||||
comment = \
|
||||
'Auto-generated test comment by user #{0} on media #{0}'.format(
|
||||
author, media_entry)
|
||||
|
||||
comment = MediaComment(author=author,
|
||||
media_entry=media_entry,
|
||||
content=comment)
|
||||
|
||||
comment.save()
|
||||
|
||||
Session.expunge(comment)
|
||||
|
||||
return comment
|
||||
|
||||
|
||||
Reference in New Issue
Block a user