server/tags: merge also tag relations

This commit is contained in:
rr-
2016-10-22 17:57:25 +02:00
parent 995cd4610d
commit 141c9fcdc9
5 changed files with 177 additions and 95 deletions

View File

@ -449,18 +449,18 @@ def merge_posts(source_post, target_post, replace_content):
raise InvalidPostRelationError('Cannot merge post with itself.')
def merge_tables(table, anti_dup_func, source_post_id, target_post_id):
table1 = table
table2 = sqlalchemy.orm.util.aliased(table)
update_stmt = (sqlalchemy.sql.expression.update(table1)
.where(table1.post_id == source_post_id))
alias1 = table
alias2 = sqlalchemy.orm.util.aliased(table)
update_stmt = (sqlalchemy.sql.expression.update(alias1)
.where(alias1.post_id == source_post_id))
if anti_dup_func is not None:
update_stmt = (update_stmt
.where(~sqlalchemy.exists()
.where(anti_dup_func(table1, table2))
.where(table2.post_id == target_post_id)))
.where(anti_dup_func(alias1, alias2))
.where(alias2.post_id == target_post_id)))
update_stmt = (update_stmt.values(post_id=target_post_id))
update_stmt = update_stmt.values(post_id=target_post_id)
db.session.execute(update_stmt)
def merge_tags(source_post_id, target_post_id):
@ -488,23 +488,23 @@ def merge_posts(source_post, target_post, replace_content):
merge_tables(db.Comment, None, source_post_id, target_post_id)
def merge_relations(source_post_id, target_post_id):
table1 = db.PostRelation
table2 = sqlalchemy.orm.util.aliased(db.PostRelation)
update_stmt = (sqlalchemy.sql.expression.update(table1)
.where(table1.parent_id == source_post_id)
.where(table1.child_id != target_post_id)
alias1 = db.PostRelation
alias2 = sqlalchemy.orm.util.aliased(db.PostRelation)
update_stmt = (sqlalchemy.sql.expression.update(alias1)
.where(alias1.parent_id == source_post_id)
.where(alias1.child_id != target_post_id)
.where(~sqlalchemy.exists()
.where(table2.child_id == table1.child_id)
.where(table2.parent_id == target_post_id))
.where(alias2.child_id == alias1.child_id)
.where(alias2.parent_id == target_post_id))
.values(parent_id=target_post_id))
db.session.execute(update_stmt)
update_stmt = (sqlalchemy.sql.expression.update(table1)
.where(table1.child_id == source_post_id)
.where(table1.parent_id != target_post_id)
update_stmt = (sqlalchemy.sql.expression.update(alias1)
.where(alias1.child_id == source_post_id)
.where(alias1.parent_id != target_post_id)
.where(~sqlalchemy.exists()
.where(table2.parent_id == table1.parent_id)
.where(table2.child_id == target_post_id))
.where(alias2.parent_id == alias1.parent_id)
.where(alias2.child_id == target_post_id))
.values(child_id=target_post_id))
db.session.execute(update_stmt)

View File

@ -223,16 +223,49 @@ def merge_tags(source_tag, target_tag):
assert target_tag
if source_tag.tag_id == target_tag.tag_id:
raise InvalidTagRelationError('Cannot merge tag with itself.')
pt1 = db.PostTag
pt2 = sqlalchemy.orm.util.aliased(db.PostTag)
update_stmt = (sqlalchemy.sql.expression.update(pt1)
.where(db.PostTag.tag_id == source_tag.tag_id)
.where(~sqlalchemy.exists()
.where(pt2.post_id == pt1.post_id)
.where(pt2.tag_id == target_tag.tag_id))
.values(tag_id=target_tag.tag_id))
db.session.execute(update_stmt)
def merge_posts(source_tag_id, target_tag_id):
alias1 = db.PostTag
alias2 = sqlalchemy.orm.util.aliased(db.PostTag)
update_stmt = (sqlalchemy.sql.expression.update(alias1)
.where(alias1.tag_id == source_tag_id))
update_stmt = (update_stmt
.where(~sqlalchemy.exists()
.where(alias1.post_id == alias2.post_id)
.where(alias2.tag_id == target_tag_id)))
update_stmt = update_stmt.values(tag_id=target_tag_id)
db.session.execute(update_stmt)
def merge_relations(table, source_tag_id, target_tag_id):
alias1 = table
alias2 = sqlalchemy.orm.util.aliased(table)
update_stmt = (sqlalchemy.sql.expression.update(alias1)
.where(alias1.parent_id == source_tag_id)
.where(alias1.child_id != target_tag_id)
.where(~sqlalchemy.exists()
.where(alias2.child_id == alias1.child_id)
.where(alias2.parent_id == target_tag_id))
.values(parent_id=target_tag_id))
db.session.execute(update_stmt)
update_stmt = (sqlalchemy.sql.expression.update(alias1)
.where(alias1.child_id == source_tag_id)
.where(alias1.parent_id != target_tag_id)
.where(~sqlalchemy.exists()
.where(alias2.parent_id == alias1.parent_id)
.where(alias2.child_id == target_tag_id))
.values(child_id=target_tag_id))
db.session.execute(update_stmt)
def merge_suggestions(source_tag_id, target_tag_id):
merge_relations(db.TagSuggestion, source_tag_id, target_tag_id)
def merge_implications(source_tag_id, target_tag_id):
merge_relations(db.TagImplication, source_tag_id, target_tag_id)
merge_posts(source_tag.tag_id, target_tag.tag_id)
merge_suggestions(source_tag.tag_id, target_tag.tag_id)
merge_implications(source_tag.tag_id, target_tag.tag_id)
delete(source_tag)

View File

@ -310,7 +310,7 @@ def test_delete(tag_factory):
assert db.session.query(db.Tag).count() == 2
def test_merge_tags_without_usages(tag_factory):
def test_merge_tags_deletes_source_tag(tag_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
db.session.add_all([source_tag, target_tag])
@ -322,7 +322,15 @@ def test_merge_tags_without_usages(tag_factory):
assert tag is not None
def test_merge_tags_with_usages(tag_factory, post_factory):
def test_merge_tags_with_itself(tag_factory):
source_tag = tag_factory(names=['source'])
db.session.add(source_tag)
db.session.flush()
with pytest.raises(tags.InvalidTagRelationError):
tags.merge_tags(source_tag, source_tag)
def test_merge_tags_moves_usages(tag_factory, post_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
post = post_factory()
@ -337,62 +345,7 @@ def test_merge_tags_with_usages(tag_factory, post_factory):
assert tags.get_tag_by_name('target').post_count == 1
def test_merge_tags_with_itself(tag_factory):
source_tag = tag_factory(names=['source'])
db.session.add(source_tag)
db.session.flush()
with pytest.raises(tags.InvalidTagRelationError):
tags.merge_tags(source_tag, source_tag)
def test_merge_tags_with_its_child_relation(tag_factory, post_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
source_tag.suggestions = [target_tag]
source_tag.implications = [target_tag]
post = post_factory()
post.tags = [source_tag, target_tag]
db.session.add_all([source_tag, post])
db.session.flush()
tags.merge_tags(source_tag, target_tag)
db.session.flush()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('target').post_count == 1
def test_merge_tags_with_its_parent_relation(tag_factory, post_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
target_tag.suggestions = [source_tag]
target_tag.implications = [source_tag]
post = post_factory()
post.tags = [source_tag, target_tag]
db.session.add_all([source_tag, target_tag, post])
db.session.flush()
tags.merge_tags(source_tag, target_tag)
db.session.flush()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('target').post_count == 1
def test_merge_tags_clears_relations(tag_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
referring_tag = tag_factory(names=['parent'])
referring_tag.suggestions = [source_tag]
referring_tag.implications = [source_tag]
db.session.add_all([source_tag, target_tag, referring_tag])
db.session.flush()
assert tags.try_get_tag_by_name('parent').implications != []
assert tags.try_get_tag_by_name('parent').suggestions != []
tags.merge_tags(source_tag, target_tag)
db.session.commit()
assert tags.try_get_tag_by_name('source') is None
assert tags.try_get_tag_by_name('parent').implications == []
assert tags.try_get_tag_by_name('parent').suggestions == []
def test_merge_tags_when_target_exists(tag_factory, post_factory):
def test_merge_tags_doesnt_duplicate_usages(tag_factory, post_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
post = post_factory()
@ -407,6 +360,103 @@ def test_merge_tags_when_target_exists(tag_factory, post_factory):
assert tags.get_tag_by_name('target').post_count == 1
def test_merge_tags_moves_child_relations(tag_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
related_tag = tag_factory()
source_tag.suggestions = [related_tag]
source_tag.implications = [related_tag]
db.session.add_all([source_tag, target_tag, related_tag])
db.session.commit()
assert source_tag.suggestion_count == 1
assert source_tag.implication_count == 1
assert target_tag.suggestion_count == 0
assert target_tag.implication_count == 0
tags.merge_tags(source_tag, target_tag)
db.session.commit()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('target').suggestion_count == 1
assert tags.get_tag_by_name('target').implication_count == 1
def test_merge_tags_doesnt_duplicate_child_relations(tag_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
related_tag = tag_factory()
source_tag.suggestions = [related_tag]
source_tag.implications = [related_tag]
target_tag.suggestions = [related_tag]
target_tag.implications = [related_tag]
db.session.add_all([source_tag, target_tag, related_tag])
db.session.commit()
assert source_tag.suggestion_count == 1
assert source_tag.implication_count == 1
assert target_tag.suggestion_count == 1
assert target_tag.implication_count == 1
tags.merge_tags(source_tag, target_tag)
db.session.commit()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('target').suggestion_count == 1
assert tags.get_tag_by_name('target').implication_count == 1
def test_merge_tags_moves_parent_relations(tag_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
related_tag = tag_factory(names=['related'])
related_tag.suggestions = [related_tag]
related_tag.implications = [related_tag]
db.session.add_all([source_tag, target_tag, related_tag])
db.session.commit()
assert source_tag.suggestion_count == 0
assert source_tag.implication_count == 0
assert target_tag.suggestion_count == 0
assert target_tag.implication_count == 0
tags.merge_tags(source_tag, target_tag)
db.session.commit()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('related').suggestion_count == 1
assert tags.get_tag_by_name('related').suggestion_count == 1
assert tags.get_tag_by_name('target').suggestion_count == 0
assert tags.get_tag_by_name('target').implication_count == 0
def test_merge_tags_doesnt_create_relation_loop_for_children(tag_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
source_tag.suggestions = [target_tag]
source_tag.implications = [target_tag]
db.session.add_all([source_tag, target_tag])
db.session.commit()
assert source_tag.suggestion_count == 1
assert source_tag.implication_count == 1
assert target_tag.suggestion_count == 0
assert target_tag.implication_count == 0
tags.merge_tags(source_tag, target_tag)
db.session.commit()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('target').suggestion_count == 0
assert tags.get_tag_by_name('target').implication_count == 0
def test_merge_tags_doesnt_create_relation_loop_for_parents(tag_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
target_tag.suggestions = [source_tag]
target_tag.implications = [source_tag]
db.session.add_all([source_tag, target_tag])
db.session.commit()
assert source_tag.suggestion_count == 0
assert source_tag.implication_count == 0
assert target_tag.suggestion_count == 1
assert target_tag.implication_count == 1
tags.merge_tags(source_tag, target_tag)
db.session.commit()
assert tags.try_get_tag_by_name('source') is None
assert tags.get_tag_by_name('target').suggestion_count == 0
assert tags.get_tag_by_name('target').implication_count == 0
def test_create_tag(fake_datetime):
with patch('szurubooru.func.tags.update_tag_names'), \
patch('szurubooru.func.tags.update_tag_category_name'), \