Fix some unit tests and bugs

This fixes a lot of the issues with the LocalUser changes that were
merged recently. There was a problem where the attributes of LocalUser
were not being eagerly loaded and because the Session was detached an
exception was being raised when they were accessed.

This also fixes some typo's which were introduced.

Finally this adds a temporary fix for a potential SQLAlchemy bug, this
is a bug where doing:

    User.query.filter(LocalUser.username == "some_username").first()

does NOT yeild a user with the username "some_username" but all users
on the site. The temp fix is to just query the LocalUser, this should
be resolved when bug is confirmed and fixed upstream.
This commit is contained in:
Jessica Tallon 2015-08-24 18:28:41 +02:00
parent e9bb5879f7
commit b4997540dc
25 changed files with 87 additions and 82 deletions

View File

@ -45,7 +45,7 @@ def get_profile(request):
can be found then this function returns a (None, None).
"""
username = request.matchdict["username"]
user = User.query.filter(LocalUser.username==username).first()
user = LocalUser.query.filter(LocalUser.username==username).first()
if user is None:
return None, None
@ -94,7 +94,7 @@ def user_endpoint(request):
def uploads_endpoint(request):
""" Endpoint for file uploads """
username = request.matchdict["username"]
requested_user = User.query.filter(LocalUser.username==username).first()
requested_user = LocalUser.query.filter(LocalUser.username==username).first()
if requested_user is None:
return json_error("No such 'user' with id '{0}'".format(username), 404)
@ -142,7 +142,7 @@ def inbox_endpoint(request, inbox=None):
inbox: allows you to pass a query in to limit inbox scope
"""
username = request.matchdict["username"]
user = User.query.filter(LocalUser.username==username).first()
user = LocalUser.query.filter(LocalUser.username==username).first()
if user is None:
return json_error("No such 'user' with id '{0}'".format(username), 404)
@ -225,7 +225,7 @@ def inbox_major_endpoint(request):
def feed_endpoint(request, outbox=None):
""" Handles the user's outbox - /api/user/<username>/feed """
username = request.matchdict["username"]
requested_user = User.query.filter(LocalUser.username==username).first()
requested_user = LocalUser.query.filter(LocalUser.username==username).first()
# check if the user exists
if requested_user is None:
@ -747,7 +747,7 @@ def lrdd_lookup(request):
username, host = resource.split("@", 1)
# Now lookup the user
user = User.query.filter(LocalUser.username==username).first()
user = LocalUser.query.filter(LocalUser.username==username).first()
if user is None:
return json_error(

View File

@ -38,7 +38,8 @@ from mediagoblin.db.extratypes import JSONEncoded, MutationDict
from mediagoblin.db.migration_tools import (
RegisterMigration, inspect_table, replace_table_hack)
from mediagoblin.db.models import (MediaEntry, Collection, MediaComment, User,
Privilege, Generator, LocalUser, Location)
Privilege, Generator, LocalUser, Location,
Client, RequestToken, AccessToken)
from mediagoblin.db.extratypes import JSONEncoded, MutationDict

View File

@ -451,8 +451,8 @@ class ActivityMixin(object):
"audio": _("audio"),
"person": _("a person"),
}
obj = self.object
target = None if self.target is None else self.target
obj = self.object()
target = None if self.target_id is None else self.target()
actor = self.get_actor
content = verb_to_content.get(self.verb, None)
@ -470,7 +470,7 @@ class ActivityMixin(object):
# Do we want to add a target (indirect object) to content?
if target is not None and "targetted" in content:
if hasattr(target, "title") and target.title.strip(" "):
target_value = terget.title
target_value = target.title
elif target.object_type in object_map:
target_value = object_map[target.object_type]
else:

View File

@ -250,9 +250,29 @@ class User(Base, UserMixin):
__mapper_args__ = {
'polymorphic_identity': 'user',
'polymorphic_on': type
'polymorphic_on': type,
}
def delete(self, **kwargs):
"""Deletes a User and all related entries/comments/files/..."""
# Collections get deleted by relationships.
media_entries = MediaEntry.query.filter(MediaEntry.uploader == self.id)
for media in media_entries:
# TODO: Make sure that "MediaEntry.delete()" also deletes
# all related files/Comments
media.delete(del_orphan_tags=False, commit=False)
# Delete now unused tags
# TODO: import here due to cyclic imports!!! This cries for refactoring
from mediagoblin.db.util import clean_orphan_tags
clean_orphan_tags(commit=False)
# Delete user, pass through commit=False/True in kwargs
username = self.username
super(User, self).delete(**kwargs)
_log.info('Deleted user "{0}" account'.format(username))
def has_privilege(self, privilege, allow_admin=True):
"""
This method checks to make sure a user has all the correct privileges
@ -335,7 +355,7 @@ class LocalUser(User):
upload_limit = Column(Integer)
__mapper_args__ = {
'polymorphic_identity': 'user_local'
"polymorphic_identity": "user_local",
}
## TODO
@ -349,25 +369,6 @@ class LocalUser(User):
'admin' if self.has_privilege(u'admin') else 'user',
self.username)
def delete(self, **kwargs):
"""Deletes a User and all related entries/comments/files/..."""
# Collections get deleted by relationships.
media_entries = MediaEntry.query.filter(MediaEntry.uploader == self.id)
for media in media_entries:
# TODO: Make sure that "MediaEntry.delete()" also deletes
# all related files/Comments
media.delete(del_orphan_tags=False, commit=False)
# Delete now unused tags
# TODO: import here due to cyclic imports!!! This cries for refactoring
from mediagoblin.db.util import clean_orphan_tags
clean_orphan_tags(commit=False)
# Delete user, pass through commit=False/True in kwargs
super(User, self).delete(**kwargs)
_log.info('Deleted user "{0}" account'.format(self.username))
def serialize(self, request):
user = {
"id": "acct:{0}@{1}".format(self.username, request.host),

View File

@ -71,7 +71,7 @@ def addmedia(args):
app = commands_util.setup_app(args)
# get the user
user = app.db.User.query.filter(
user = app.db.LocalUser.query.filter(
LocalUser.username==args.username.lower()
).first()
if user is None:

View File

@ -65,7 +65,7 @@ def batchaddmedia(args):
files_uploaded, files_attempted = 0, 0
# get the user
user = app.db.User.query.filter(
user = app.db.LocalUser.query.filter(
LocalUser.username==args.username.lower()
).first()
if user is None:

View File

@ -47,7 +47,7 @@ def adduser(args):
db = mg_globals.database
users_with_username = \
db.User.query.filter(
db.LocalUser.query.filter(
LocalUser.username==args.username.lower()
).count()
@ -88,7 +88,7 @@ def makeadmin(args):
db = mg_globals.database
user = db.User.query.filter(
user = db.LocalUser.query.filter(
LocalUser.username==six.text_type(args.username.lower())
).one()
if user:
@ -117,7 +117,7 @@ def changepw(args):
db = mg_globals.database
user = db.User.query.filter(
user = db.LocalUser.query.filter(
LocalUser.username==six.text_type(args.username.lower())
).one()
if user:
@ -141,7 +141,7 @@ def deleteuser(args):
db = mg_globals.database
user = db.User.query.filter(
user = db.LocalUser.query.filter(
LocalUser.username==args.username.lower()
).first()
if user:

View File

@ -306,7 +306,7 @@ def blog_delete(request, **kwargs):
Deletes a blog and media entries, tags associated with it.
"""
url_user = request.matchdict.get('user')
owner_user = request.db.User.query.filter(
owner_user = request.db.LocalUser.query.filter(
LocalUser.username==url_user
).first()

View File

@ -122,7 +122,7 @@ def take_away_privileges(user,*privileges):
if len(privileges) == 1:
privilege = Privilege.query.filter(
Privilege.privilege_name==privileges[0]).first()
user = User.query.filter(
user = LocalUser.query.filter(
LocalUser.username==user
).first()
if privilege in user.all_privileges:
@ -155,8 +155,8 @@ def give_privileges(user,*privileges):
if len(privileges) == 1:
privilege = Privilege.query.filter(
Privilege.privilege_name==privileges[0]).first()
user = User.query.filter(
LoclUser.username==user
user = LocalUser.query.filter(
LocalUser.username==user
).first()
if privilege not in user.all_privileges:
user.all_privileges.append(privilege)

View File

@ -79,7 +79,7 @@ def moderation_users_detail(request):
'''
Shows details about a particular user.
'''
user = User.query.filter(
user = LocalUser.query.filter(
LocalUser.username==request.matchdict['user']
).first()
active_reports = user.reports_filed_on.filter(

View File

@ -34,7 +34,7 @@ def get_media_entry_from_uploader_slug(uploader_username, slug):
:returns media A MediaEntry object or None if no entry
matches the specifications.
"""
uploader = User.query.filter(
uploader = LocalUser.query.filter(
LocalUser.username==uploader_username
).first()
media = MediaEntry.query.filter(

View File

@ -38,7 +38,7 @@ def login(request):
login_form.password.data)
if username:
user = User.query.filter(
user = LocalUser.query.filter(
LocalUser.username==username
).first()

View File

@ -44,7 +44,7 @@ class MGClientTestCase:
fixture_add_user(username, **options)
def user(self, username):
return User.query.filter(LocalUser.username==username).first()
return LocalUser.query.filter(LocalUser.username==username).first()
def _do_request(self, url, *context_keys, **kwargs):
template.clear_test_template_context()

View File

@ -98,8 +98,8 @@ def test_register_views(test_app):
assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT
## Make sure user is in place
new_user = mg_globals.database.User.query.filter(
LocalUser.usrname==u'angrygirl'
new_user = mg_globals.database.LocalUser.query.filter(
LocalUser.username==u'angrygirl'
).first()
assert new_user
@ -138,7 +138,7 @@ def test_register_views(test_app):
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
new_user = mg_globals.database.User.query.filter(
new_user = mg_globals.database.LocalUser.query.filter(
LocalUser.username==u'angrygirl'
).first()
assert new_user
@ -151,7 +151,7 @@ def test_register_views(test_app):
'mediagoblin/user_pages/user.html']
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
new_user = mg_globals.database.User.query.filter(
new_user = mg_globals.database.LocalUser.query.filter(
LocalUser.username==u'angrygirl'
).first()
assert new_user

View File

@ -88,7 +88,7 @@ def test_change_password(test_app):
assert urlparse.urlsplit(res.location)[2] == '/edit/account/'
# test_user has to be fetched again in order to have the current values
test_user = User.query.filter(LocalUser.username==u'chris').first()
test_user = LocalUser.query.filter(LocalUser.username==u'chris').first()
assert auth_tools.bcrypt_check_password('123456', test_user.pw_hash)
# test that the password cannot be changed if the given
@ -100,5 +100,5 @@ def test_change_password(test_app):
'new_password': '098765',
})
test_user = User.query.filter(LocalUser.username==u'chris').first()
test_user = LocalUser.query.filter(LocalUser.username==u'chris').first()
assert not auth_tools.bcrypt_check_password('098765', test_user.pw_hash)

View File

@ -44,12 +44,12 @@ class TestUserEdit(object):
self.login(test_app)
# Make sure user exists
assert User.query.filter(LocalUser.username=u'chris').first()
assert LocalUser.query.filter(LocalUser.username==u'chris').first()
res = test_app.post('/edit/account/delete/', {'confirmed': 'y'})
# Make sure user has been deleted
assert User.query.filter(LocalUser.username==u'chris').first() == None
assert LocalUser.query.filter(LocalUser.username==u'chris').first() == None
#TODO: make sure all corresponding items comments etc have been
# deleted too. Perhaps in submission test?
@ -79,7 +79,7 @@ class TestUserEdit(object):
'bio': u'I love toast!',
'url': u'http://dustycloud.org/'})
test_user = User.query.filter(LocalUser.username==u'chris').first()
test_user = LocalUser.query.filter(LocalUser.username==u'chris').first()
assert test_user.bio == u'I love toast!'
assert test_user.url == u'http://dustycloud.org/'
@ -159,10 +159,10 @@ class TestUserEdit(object):
assert urlparse.urlsplit(res.location)[2] == '/'
# Email shouldn't be saved
email_in_db = mg_globals.database.User.query.filter(
email_in_db = mg_globals.database.LocalUser.query.filter(
LocalUser.email=='new@example.com'
).first()
email = User.query.filter(LocalUser.username=='chris').first().email
email = LocalUser.query.filter(LocalUser.username=='chris').first().email
assert email_in_db is None
assert email == 'chris@example.com'
@ -173,7 +173,7 @@ class TestUserEdit(object):
res.follow()
# New email saved?
email = User.query.filter(LocalUser.username=='chris').first().email
email = LocalUser.query.filter(LocalUser.username=='chris').first().email
assert email == 'new@example.com'
# test changing the url inproperly

View File

@ -115,7 +115,7 @@ def test_ldap_plugin(ldap_plugin_app):
ldap_plugin_app.get('/auth/logout/')
# Get user and detach from session
test_user = mg_globals.database.User.query.filter(
test_user = mg_globals.database.LocalUser.query.filter(
LocalUser.username==u'chris'
).first()
Session.expunge(test_user)

View File

@ -168,9 +168,9 @@ class TestUserHasPrivilege:
privileges=[u'admin',u'moderator',u'active'])
fixture_add_user(u'aeva',
privileges=[u'moderator',u'active'])
self.natalie_user = User.query.filter(
self.natalie_user = LocalUser.query.filter(
LocalUser.username==u'natalie').first()
self.aeva_user = User.query.filter(
self.aeva_user = LocalUser.query.filter(
LocalUser.username==u'aeva').first()
def test_privilege_added_correctly(self, test_app):

View File

@ -47,9 +47,9 @@ class TestModerationViews:
self.query_for_users()
def query_for_users(self):
self.admin_user = User.query.filter(LocalUser.username==u'admin').first()
self.mod_user = User.query.filter(LocalUser.username==u'moderator').first()
self.user = User.query.filter(LocalUser.username==u'regular').first()
self.admin_user = LocalUser.query.filter(LocalUser.username==u'admin').first()
self.mod_user = LocalUser.query.filter(LocalUser.username==u'moderator').first()
self.user = LocalUser.query.filter(LocalUser.username==u'regular').first()
def do_post(self, data, *context_keys, **kwargs):
url = kwargs.pop('url', '/submit/')

View File

@ -192,7 +192,7 @@ class TestOpenIDPlugin(object):
openid_plugin_app.get('/auth/logout')
# Get user and detach from session
test_user = mg_globals.database.User.query.filter(
test_user = mg_globals.database.LocalUser.query.filter(
LocalUser.username==u'chris'
).first()
Session.expunge(test_user)

View File

@ -27,8 +27,8 @@ import six.moves.urllib.parse as urlparse
pytest.importorskip("requests")
from mediagoblin import mg_globals
from mediagoblin.db.base import Session, LocalUser
from mediagoblin.db.models import Privilege
from mediagoblin.db.base import Session
from mediagoblin.db.models import Privilege, LocalUser
from mediagoblin.tests.tools import get_app
from mediagoblin.tools import template
@ -117,14 +117,14 @@ class TestPersonaPlugin(object):
persona_plugin_app.get('/auth/logout/')
# Get user and detach from session
test_user = mg_globals.database.User.query.filter(
test_user = mg_globals.database.LocalUser.query.filter(
LocalUser.username==u'chris'
).first()
active_privilege = Privilege.query.filter(
Privilege.privilege_name==u'active').first()
test_user.all_privileges.append(active_privilege)
test_user.save()
test_user = mg_globals.database.User.query.filter(
test_user = mg_globals.database.LocalUser.query.filter(
LocalUser.username==u'chris'
).first()
Session.expunge(test_user)

View File

@ -64,9 +64,9 @@ class TestPrivilegeFunctionality:
return response, context_data
def query_for_users(self):
self.admin_user = User.query.filter(LocalUser.username==u'alex').first()
self.mod_user = User.query.filter(LocalUser.username==u'meow').first()
self.user = User.query.filter(LocalUser.username==u'natalie').first()
self.admin_user = LocalUser.query.filter(LocalUser.username==u'alex').first()
self.mod_user = LocalUser.query.filter(LocalUser.username==u'meow').first()
self.user = LocalUser.query.filter(LocalUser.username==u'natalie').first()
def testUserBanned(self):
self.login(u'natalie')

View File

@ -20,7 +20,7 @@ import six
from mediagoblin.tools import template
from mediagoblin.tests.tools import (fixture_add_user, fixture_media_entry,
fixture_add_comment, fixture_add_comment_report)
from mediagoblin.db.models import (MediaReport, CommentReport, User, LocalUser
from mediagoblin.db.models import (MediaReport, CommentReport, User, LocalUser,
MediaComment)
@ -56,8 +56,8 @@ class TestReportFiling:
return response, context_data
def query_for_users(self):
return (User.query.filter(LocalUser.username==u'allie').first(),
User.query.filter(LocalUser.username==u'natalie').first())
return (LocalUser.query.filter(LocalUser.username==u'allie').first(),
LocalUser.query.filter(LocalUser.username==u'natalie').first())
def testMediaReports(self):
self.login(u'allie')

View File

@ -72,7 +72,7 @@ class TestSubmission:
#### totally stupid.
#### Also if we found a way to make this run it should be a
#### property.
return User.query.filter(LocalUser.username==u'chris').first()
return LocalUser.query.filter(LocalUser.username==u'chris').first()
def login(self):
self.test_app.post(

View File

@ -176,7 +176,7 @@ def assert_db_meets_expected(db, expected):
def fixture_add_user(username=u'chris', password=u'toast',
privileges=[], wants_comment_notification=True):
# Reuse existing user or create a new one
test_user = User.query.filter(LocalUser.username==username).first()
test_user = LocalUser.query.filter(LocalUser.username==username).first()
if test_user is None:
test_user = LocalUser()
test_user.username = username
@ -190,8 +190,11 @@ def fixture_add_user(username=u'chris', password=u'toast',
test_user.all_privileges.append(query.one())
test_user.save()
# Reload
test_user = User.query.filter(LocalUser.username==username).first()
# Reload - The `with_polymorphic` needs to be there to eagerly load
# the attributes on the LocalUser as this can't be done post detachment.
user_query = LocalUser.query.with_polymorphic(LocalUser)
test_user = user_query.filter(LocalUser.username==username).first()
# ... and detach from session:
Session.expunge(test_user)
@ -201,7 +204,7 @@ def fixture_add_user(username=u'chris', password=u'toast',
def fixture_comment_subscription(entry, notify=True, send_email=None):
if send_email is None:
uploader = User.query.filter_by(id=entry.uploader).first()
uploader = LocalUser.query.filter_by(id=entry.uploader).first()
send_email = uploader.wants_comment_notification
cs = CommentSubscription(