Patch by Strum. Ticket #451 - Convert all mongokit style .find, .find_one, .one calls over to SQLAlchemy queries
This commit is contained in:
parent
3aeca53c85
commit
44082b12d8
@ -24,18 +24,6 @@ Session = scoped_session(sessionmaker())
|
||||
class GMGTableBase(object):
|
||||
query = Session.query_property()
|
||||
|
||||
@classmethod
|
||||
def find(cls, query_dict):
|
||||
return cls.query.filter_by(**query_dict)
|
||||
|
||||
@classmethod
|
||||
def find_one(cls, query_dict):
|
||||
return cls.query.filter_by(**query_dict).first()
|
||||
|
||||
@classmethod
|
||||
def one(cls, query_dict):
|
||||
return cls.find(query_dict).one()
|
||||
|
||||
def get(self, key):
|
||||
return getattr(self, key)
|
||||
|
||||
|
@ -24,7 +24,7 @@ from mediagoblin.db.models import MediaEntry, Tag, MediaTag, Collection
|
||||
|
||||
|
||||
def atomic_update(table, query_dict, update_values):
|
||||
table.find(query_dict).update(update_values,
|
||||
table.query.filter_by(**query_dict).update(update_values,
|
||||
synchronize_session=False)
|
||||
Session.commit()
|
||||
|
||||
|
@ -87,8 +87,8 @@ def user_may_alter_collection(controller):
|
||||
"""
|
||||
@wraps(controller)
|
||||
def wrapper(request, *args, **kwargs):
|
||||
creator_id = request.db.User.find_one(
|
||||
{'username': request.matchdict['user']}).id
|
||||
creator_id = request.db.User.query.filter_by(
|
||||
username=request.matchdict['user']).first().id
|
||||
if not (request.user.is_admin or
|
||||
request.user.id == creator_id):
|
||||
raise Forbidden()
|
||||
@ -162,15 +162,15 @@ def get_user_collection(controller):
|
||||
"""
|
||||
@wraps(controller)
|
||||
def wrapper(request, *args, **kwargs):
|
||||
user = request.db.User.find_one(
|
||||
{'username': request.matchdict['user']})
|
||||
user = request.db.User.query.filter_by(
|
||||
username=request.matchdict['user']).first()
|
||||
|
||||
if not user:
|
||||
return render_404(request)
|
||||
|
||||
collection = request.db.Collection.find_one(
|
||||
{'slug': request.matchdict['collection'],
|
||||
'creator': user.id})
|
||||
collection = request.db.Collection.query.filter_by(
|
||||
slug=request.matchdict['collection'],
|
||||
creator=user.id).first()
|
||||
|
||||
# Still no collection? Okay, 404.
|
||||
if not collection:
|
||||
@ -187,14 +187,14 @@ def get_user_collection_item(controller):
|
||||
"""
|
||||
@wraps(controller)
|
||||
def wrapper(request, *args, **kwargs):
|
||||
user = request.db.User.find_one(
|
||||
{'username': request.matchdict['user']})
|
||||
user = request.db.User.query.filter_by(
|
||||
username=request.matchdict['user']).first()
|
||||
|
||||
if not user:
|
||||
return render_404(request)
|
||||
|
||||
collection_item = request.db.CollectionItem.find_one(
|
||||
{'id': request.matchdict['collection_item'] })
|
||||
collection_item = request.db.CollectionItem.query.filter_by(
|
||||
id=request.matchdict['collection_item']).first()
|
||||
|
||||
# Still no collection item? Okay, 404.
|
||||
if not collection_item:
|
||||
|
@ -305,9 +305,9 @@ def edit_collection(request, collection):
|
||||
form.slug.data, collection.id)
|
||||
|
||||
# Make sure there isn't already a Collection with this title
|
||||
existing_collection = request.db.Collection.find_one({
|
||||
'creator': request.user.id,
|
||||
'title':form.title.data})
|
||||
existing_collection = request.db.Collection.query.filter_by(
|
||||
creator=request.user.id,
|
||||
title=form.title.data).first()
|
||||
|
||||
if existing_collection and existing_collection.id != collection.id:
|
||||
messages.add_message(
|
||||
|
@ -63,7 +63,7 @@ def _import_media(db, args):
|
||||
# TODO: Add import of queue files
|
||||
queue_cache = BasicFileStorage(args._cache_path['queue'])
|
||||
|
||||
for entry in db.MediaEntry.find():
|
||||
for entry in db.MediaEntry.query.filter_by():
|
||||
for name, path in entry.media_files.items():
|
||||
_log.info('Importing: {0} - {1}'.format(
|
||||
entry.title.encode('ascii', 'replace'),
|
||||
@ -204,7 +204,7 @@ def _export_media(db, args):
|
||||
# TODO: Add export of queue files
|
||||
queue_cache = BasicFileStorage(args._cache_path['queue'])
|
||||
|
||||
for entry in db.MediaEntry.find():
|
||||
for entry in db.MediaEntry.query.filter_by():
|
||||
for name, path in entry.media_files.items():
|
||||
_log.info(u'Exporting {0} - {1}'.format(
|
||||
entry.title,
|
||||
|
@ -40,9 +40,9 @@ def adduser(args):
|
||||
|
||||
db = mg_globals.database
|
||||
users_with_username = \
|
||||
db.User.find({
|
||||
'username': args.username.lower(),
|
||||
}).count()
|
||||
db.User.query.filter_by(
|
||||
username=args.username.lower()
|
||||
).count()
|
||||
|
||||
if users_with_username:
|
||||
print u'Sorry, a user with that name already exists.'
|
||||
@ -71,7 +71,8 @@ def makeadmin(args):
|
||||
|
||||
db = mg_globals.database
|
||||
|
||||
user = db.User.one({'username': unicode(args.username.lower())})
|
||||
user = db.User.query.filter_by(
|
||||
username=unicode(args.username.lower())).one()
|
||||
if user:
|
||||
user.is_admin = True
|
||||
user.save()
|
||||
@ -94,7 +95,8 @@ def changepw(args):
|
||||
|
||||
db = mg_globals.database
|
||||
|
||||
user = db.User.one({'username': unicode(args.username.lower())})
|
||||
user = db.User.query.filter_by(
|
||||
username=unicode(args.username.lower())).one()
|
||||
if user:
|
||||
user.pw_hash = auth.gen_password_hash(args.password)
|
||||
user.save()
|
||||
|
@ -138,9 +138,9 @@ def add_collection(request, media=None):
|
||||
collection.generate_slug()
|
||||
|
||||
# Make sure this user isn't duplicating an existing collection
|
||||
existing_collection = request.db.Collection.find_one({
|
||||
'creator': request.user.id,
|
||||
'title':collection.title})
|
||||
existing_collection = request.db.Collection.query.filter_by(
|
||||
creator=request.user.id,
|
||||
title=collection.title).first()
|
||||
|
||||
if existing_collection:
|
||||
add_message(request, messages.ERROR,
|
||||
|
@ -93,8 +93,8 @@ def test_register_views(test_app):
|
||||
assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT
|
||||
|
||||
## Make sure user is in place
|
||||
new_user = mg_globals.database.User.find_one(
|
||||
{'username': u'happygirl'})
|
||||
new_user = mg_globals.database.User.query.filter_by(
|
||||
username=u'happygirl').first()
|
||||
assert new_user
|
||||
assert new_user.status == u'needs_email_verification'
|
||||
assert new_user.email_verified == False
|
||||
@ -128,8 +128,8 @@ def test_register_views(test_app):
|
||||
|
||||
# assert context['verification_successful'] == True
|
||||
# TODO: Would be good to test messages here when we can do so...
|
||||
new_user = mg_globals.database.User.find_one(
|
||||
{'username': u'happygirl'})
|
||||
new_user = mg_globals.database.User.query.filter_by(
|
||||
username=u'happygirl').first()
|
||||
assert new_user
|
||||
assert new_user.status == u'needs_email_verification'
|
||||
assert new_user.email_verified == False
|
||||
@ -142,8 +142,8 @@ def test_register_views(test_app):
|
||||
'mediagoblin/user_pages/user.html']
|
||||
# assert context['verification_successful'] == True
|
||||
# TODO: Would be good to test messages here when we can do so...
|
||||
new_user = mg_globals.database.User.find_one(
|
||||
{'username': u'happygirl'})
|
||||
new_user = mg_globals.database.User.query.filter_by(
|
||||
username=u'happygirl').first()
|
||||
assert new_user
|
||||
assert new_user.status == u'active'
|
||||
assert new_user.email_verified == True
|
||||
|
@ -190,8 +190,8 @@ class TestUserEdit(object):
|
||||
assert urlparse.urlsplit(res.location)[2] == '/'
|
||||
|
||||
# Email shouldn't be saved
|
||||
email_in_db = mg_globals.database.User.find_one(
|
||||
{'email': 'new@example.com'})
|
||||
email_in_db = mg_globals.database.User.query.filter_by(
|
||||
email='new@example.com').first()
|
||||
email = User.query.filter_by(username='chris').first().email
|
||||
assert email_in_db is None
|
||||
assert email == 'chris@example.com'
|
||||
|
@ -186,8 +186,8 @@ class TestOpenIDPlugin(object):
|
||||
openid_plugin_app.get('/auth/logout')
|
||||
|
||||
# Get user and detach from session
|
||||
test_user = mg_globals.database.User.find_one({
|
||||
'username': u'chris'})
|
||||
test_user = mg_globals.database.User.query.filter_by(
|
||||
username=u'chris').first()
|
||||
Session.expunge(test_user)
|
||||
|
||||
# Log back in
|
||||
@ -314,8 +314,8 @@ class TestOpenIDPlugin(object):
|
||||
assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
|
||||
|
||||
# OpenID Added?
|
||||
new_openid = mg_globals.database.OpenIDUserURL.find_one(
|
||||
{'openid_url': u'http://add.myopenid.com'})
|
||||
new_openid = mg_globals.database.OpenIDUserURL.query.filter_by(
|
||||
openid_url=u'http://add.myopenid.com').first()
|
||||
assert new_openid
|
||||
|
||||
_test_add()
|
||||
@ -365,8 +365,8 @@ class TestOpenIDPlugin(object):
|
||||
assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
|
||||
|
||||
# OpenID deleted?
|
||||
new_openid = mg_globals.database.OpenIDUserURL.find_one(
|
||||
{'openid_url': u'http://add.myopenid.com'})
|
||||
new_openid = mg_globals.database.OpenIDUserURL.query.filter_by(
|
||||
openid_url=u'http://add.myopenid.com').first()
|
||||
assert not new_openid
|
||||
|
||||
_test_delete(self, test_user)
|
||||
|
@ -77,7 +77,7 @@ class TestSubmission:
|
||||
return {'upload_files': [('file', filename)]}
|
||||
|
||||
def check_comments(self, request, media_id, count):
|
||||
comments = request.db.MediaComment.find({'media_entry': media_id})
|
||||
comments = request.db.MediaComment.query.filter_by(media_entry=media_id)
|
||||
assert count == len(list(comments))
|
||||
|
||||
def test_missing_fields(self):
|
||||
@ -122,7 +122,7 @@ class TestSubmission:
|
||||
assert 'mediagoblin/user_pages/user.html' in context
|
||||
|
||||
def check_media(self, request, find_data, count=None):
|
||||
media = MediaEntry.find(find_data)
|
||||
media = MediaEntry.query.filter_by(**find_data)
|
||||
if count is not None:
|
||||
assert media.count() == count
|
||||
if count == 0:
|
||||
@ -240,8 +240,8 @@ class TestSubmission:
|
||||
|
||||
request = context['request']
|
||||
|
||||
media = request.db.MediaEntry.find_one({
|
||||
u'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'})
|
||||
media = request.db.MediaEntry.query.filter_by(
|
||||
title=u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE').first()
|
||||
|
||||
assert media.media_type == 'mediagoblin.media_types.image'
|
||||
|
||||
@ -252,7 +252,7 @@ class TestSubmission:
|
||||
response, context = self.do_post({'title': title}, do_follow=True,
|
||||
**self.upload_data(filename))
|
||||
self.check_url(response, '/u/{0}/'.format(self.test_user.username))
|
||||
entry = mg_globals.database.MediaEntry.find_one({'title': title})
|
||||
entry = mg_globals.database.MediaEntry.query.filter_by(title=title).first()
|
||||
assert entry.state == 'failed'
|
||||
assert entry.fail_error == u'mediagoblin.processing:BadMediaFail'
|
||||
|
||||
|
@ -164,7 +164,7 @@ def assert_db_meets_expected(db, expected):
|
||||
for collection_name, collection_data in expected.iteritems():
|
||||
collection = db[collection_name]
|
||||
for expected_document in collection_data:
|
||||
document = collection.find_one({'id': expected_document['id']})
|
||||
document = collection.query.filter_by(id=expected_document['id']).first()
|
||||
assert document is not None # make sure it exists
|
||||
assert document == expected_document # make sure it matches
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user