server/snapshots: rewrite

This commit is contained in:
rr-
2016-08-14 20:06:49 +02:00
parent 03a7bd0d5c
commit 80af79779d
36 changed files with 931 additions and 629 deletions

View File

@ -355,50 +355,6 @@ def import_scores(v1_session, v2_session):
v2_session.add(score)
v2_session.commit()
def import_snapshots(v1_session, v2_session):
logger.info('Importing snapshots...')
for row in exec_query(v1_session, 'SELECT * FROM snapshots ORDER BY time ASC'):
snapshot = db.Snapshot()
snapshot.creation_time = row['time']
snapshot.user_id = row['userId']
snapshot.operation = {
0: db.Snapshot.OPERATION_CREATED,
1: db.Snapshot.OPERATION_MODIFIED,
2: db.Snapshot.OPERATION_DELETED,
}[row['operation']]
snapshot.resource_type = {
0: 'post',
1: 'tag',
}[row['type']]
snapshot.resource_id = row['primaryKey']
data = json.loads(zlib.decompress(row['data'], -15).decode('utf-8'))
if snapshot.resource_type == 'post':
if 'contentChecksum' in data:
data['checksum'] = data['contentChecksum']
del data['contentChecksum']
if 'tags' in data and isinstance(data['tags'], dict):
data['tags'] = list(data['tags'].values())
if 'notes' in data:
notes = []
for note in data['notes']:
notes.append({
'polygon': translate_note_polygon(note),
'text': note['text'],
})
data['notes'] = notes
snapshot.resource_repr = row['primaryKey']
elif snapshot.resource_type == 'tag':
if 'banned' in data:
del data['banned']
if 'name' in data:
data['names'] = [data['name']]
del data['name']
snapshot.resource_repr = data['names'][0]
snapshot.data = data
v2_session.add(snapshot)
v2_session.commit()
def main():
args = parse_args()
@ -426,7 +382,6 @@ def main():
import_post_favorites(unused_post_ids, v1_session, v2_session)
import_comments(unused_post_ids, v1_session, v2_session)
import_scores(v1_session, v2_session)
import_snapshots(v1_session, v2_session)
if __name__ == '__main__':
main()

View File

@ -1,5 +1,5 @@
import datetime
from szurubooru import search
from szurubooru import search, db
from szurubooru.rest import routes
from szurubooru.func import (
auth, tags, posts, snapshots, favorites, scores, util, versions)
@ -44,6 +44,9 @@ def create_post(ctx, _params=None):
content, tag_names, None if anonymous else ctx.user)
if len(new_tags):
auth.verify_privilege(ctx.user, 'tags:create')
db.session.flush()
for tag in new_tags:
snapshots.create(tag, ctx.user)
posts.update_post_safety(post, safety)
posts.update_post_source(post, source)
posts.update_post_relations(post, relations)
@ -52,7 +55,7 @@ def create_post(ctx, _params=None):
if ctx.has_file('thumbnail'):
posts.update_post_thumbnail(post, ctx.get_file('thumbnail'))
ctx.session.add(post)
snapshots.save_entity_creation(post, ctx.user)
snapshots.create(post, ctx.user)
ctx.session.commit()
tags.export_to_json()
return _serialize_post(ctx, post)
@ -78,6 +81,9 @@ def update_post(ctx, params):
new_tags = posts.update_post_tags(post, ctx.get_param_as_list('tags'))
if len(new_tags):
auth.verify_privilege(ctx.user, 'tags:create')
db.session.flush()
for tag in new_tags:
snapshots.create(tag, ctx.user)
if ctx.has_param('safety'):
auth.verify_privilege(ctx.user, 'posts:edit:safety')
posts.update_post_safety(post, ctx.get_param_as_string('safety'))
@ -100,7 +106,7 @@ def update_post(ctx, params):
posts.update_post_thumbnail(post, ctx.get_file('thumbnail'))
post.last_edit_time = datetime.datetime.utcnow()
ctx.session.flush()
snapshots.save_entity_modification(post, ctx.user)
snapshots.modify(post, ctx.user)
ctx.session.commit()
tags.export_to_json()
return _serialize_post(ctx, post)
@ -111,7 +117,7 @@ def delete_post(ctx, params):
auth.verify_privilege(ctx.user, 'posts:delete')
post = posts.get_post_by_id(params['post_id'])
versions.verify_version(post, ctx)
snapshots.save_entity_deletion(post, ctx.user)
snapshots.delete(post, ctx.user)
posts.delete(post)
ctx.session.commit()
tags.export_to_json()
@ -134,9 +140,7 @@ def set_featured_post(ctx, _params=None):
raise posts.PostAlreadyFeaturedError(
'Post %r is already featured.' % post_id)
posts.feature_post(post, ctx.user)
if featured_post:
snapshots.save_entity_modification(featured_post, ctx.user)
snapshots.save_entity_modification(post, ctx.user)
snapshots.modify(post, ctx.user)
ctx.session.commit()
return _serialize_post(ctx, post)

View File

@ -10,4 +10,4 @@ _search_executor = search.Executor(search.configs.SnapshotSearchConfig())
def get_snapshots(ctx, _params=None):
auth.verify_privilege(ctx.user, 'snapshots:list')
return _search_executor.execute_and_serialize(
ctx, snapshots.serialize_snapshot)
ctx, lambda snapshot: snapshots.serialize_snapshot(snapshot, ctx.user))

View File

@ -20,7 +20,7 @@ def _create_if_needed(tag_names, user):
auth.verify_privilege(user, 'tags:create')
db.session.flush()
for tag in new_tags:
snapshots.save_entity_creation(tag, user)
snapshots.create(tag, user)
@routes.get('/tags/?')
@ -50,7 +50,7 @@ def create_tag(ctx, _params=None):
tags.update_tag_description(tag, description)
ctx.session.add(tag)
ctx.session.flush()
snapshots.save_entity_creation(tag, ctx.user)
snapshots.create(tag, ctx.user)
ctx.session.commit()
tags.export_to_json()
return _serialize(ctx, tag)
@ -91,7 +91,7 @@ def update_tag(ctx, params):
tags.update_tag_implications(tag, implications)
tag.last_edit_time = datetime.datetime.utcnow()
ctx.session.flush()
snapshots.save_entity_modification(tag, ctx.user)
snapshots.modify(tag, ctx.user)
ctx.session.commit()
tags.export_to_json()
return _serialize(ctx, tag)
@ -102,7 +102,7 @@ def delete_tag(ctx, params):
tag = tags.get_tag_by_name(params['tag_name'])
versions.verify_version(tag, ctx)
auth.verify_privilege(ctx.user, 'tags:delete')
snapshots.save_entity_deletion(tag, ctx.user)
snapshots.delete(tag, ctx.user)
tags.delete(tag)
ctx.session.commit()
tags.export_to_json()
@ -120,7 +120,7 @@ def merge_tags(ctx, _params=None):
versions.bump_version(target_tag)
auth.verify_privilege(ctx.user, 'tags:merge')
tags.merge_tags(source_tag, target_tag)
snapshots.save_entity_deletion(source_tag, ctx.user)
snapshots.merge(source_tag, target_tag, ctx.user)
ctx.session.commit()
tags.export_to_json()
return _serialize(ctx, target_tag)

View File

@ -25,7 +25,7 @@ def create_tag_category(ctx, _params=None):
category = tag_categories.create_category(name, color)
ctx.session.add(category)
ctx.session.flush()
snapshots.save_entity_creation(category, ctx.user)
snapshots.create(category, ctx.user)
ctx.session.commit()
tags.export_to_json()
return _serialize(ctx, category)
@ -52,7 +52,7 @@ def update_tag_category(ctx, params):
tag_categories.update_category_color(
category, ctx.get_param_as_string('color'))
ctx.session.flush()
snapshots.save_entity_modification(category, ctx.user)
snapshots.modify(category, ctx.user)
ctx.session.commit()
tags.export_to_json()
return _serialize(ctx, category)
@ -64,7 +64,7 @@ def delete_tag_category(ctx, params):
versions.verify_version(category, ctx)
auth.verify_privilege(ctx.user, 'tag_categories:delete')
tag_categories.delete_category(category)
snapshots.save_entity_deletion(category, ctx.user)
snapshots.delete(category, ctx.user)
ctx.session.commit()
tags.export_to_json()
return {}
@ -75,7 +75,7 @@ def set_tag_category_as_default(ctx, params):
auth.verify_privilege(ctx.user, 'tag_categories:set_default')
category = tag_categories.get_category_by_name(params['category_name'])
tag_categories.set_default_category(category)
snapshots.save_entity_modification(category, ctx.user)
snapshots.modify(category, ctx.user)
ctx.session.commit()
tags.export_to_json()
return _serialize(ctx, category)

View File

@ -1,11 +1,7 @@
from szurubooru.db.base import Base
from szurubooru.db.user import User
from szurubooru.db.tag_category import TagCategory
from szurubooru.db.tag import (
Tag,
TagName,
TagSuggestion,
TagImplication)
from szurubooru.db.tag import (Tag, TagName, TagSuggestion, TagImplication)
from szurubooru.db.post import (
Post,
PostTag,
@ -14,12 +10,8 @@ from szurubooru.db.post import (
PostScore,
PostNote,
PostFeature)
from szurubooru.db.comment import (
Comment,
CommentScore)
from szurubooru.db.comment import (Comment, CommentScore)
from szurubooru.db.snapshot import Snapshot
from szurubooru.db.session import (
session,
reset_query_count,
get_query_count)
session, sessionmaker, reset_query_count, get_query_count)
import szurubooru.db.util

View File

@ -18,15 +18,12 @@ class QueryCounter(object):
return QueryCounter._query_count
def create_session():
_engine = sqlalchemy.create_engine(config.config['database'])
sqlalchemy.event.listen(
_engine, 'after_execute', lambda *args: QueryCounter.bump())
_session_maker = sqlalchemy.orm.sessionmaker(bind=_engine)
return sqlalchemy.orm.scoped_session(_session_maker)
# pylint: disable=invalid-name
session = create_session()
_engine = sqlalchemy.create_engine(config.config['database'])
sessionmaker = sqlalchemy.orm.sessionmaker(bind=_engine)
session = sqlalchemy.orm.scoped_session(sessionmaker)
reset_query_count = QueryCounter.reset
get_query_count = QueryCounter.get
sqlalchemy.event.listen(
_engine, 'after_execute', lambda *args: QueryCounter.bump())

View File

@ -10,14 +10,17 @@ class Snapshot(Base):
OPERATION_CREATED = 'created'
OPERATION_MODIFIED = 'modified'
OPERATION_DELETED = 'deleted'
OPERATION_MERGED = 'merged'
snapshot_id = Column('id', Integer, primary_key=True)
creation_time = Column('creation_time', DateTime, nullable=False)
operation = Column('operation', Unicode(16), nullable=False)
resource_type = Column(
'resource_type', Unicode(32), nullable=False, index=True)
resource_id = Column('resource_id', Integer, nullable=False, index=True)
resource_repr = Column('resource_repr', Unicode(64), nullable=False)
operation = Column('operation', Unicode(16), nullable=False)
resource_pkey = Column(
'resource_pkey', Integer, nullable=False, index=True)
resource_name = Column(
'resource_name', Unicode(64), nullable=False)
user_id = Column(
'user_id',
Integer,

View File

@ -16,13 +16,13 @@ def get_resource_info(entity):
assert primary_key is not None
assert len(primary_key) == 1
resource_repr = serializers[resource_type](entity)
assert resource_repr
resource_name = serializers[resource_type](entity)
assert resource_name
resource_id = primary_key[0]
assert resource_id
resource_pkey = primary_key[0]
assert resource_pkey
return (resource_type, resource_id, resource_repr)
return (resource_type, resource_pkey, resource_name)
def get_aux_entity(session, get_table_info, entity, user):

View File

@ -0,0 +1,57 @@
def get_list_diff(old, new):
value = {'type': 'list change', 'added': [], 'removed': []}
equal = True
for item in old:
if item not in new:
equal = False
value['removed'].append(item)
for item in new:
if item not in old:
equal = False
value['added'].append(item)
return None if equal else value
def get_dict_diff(old, new):
value = {}
equal = True
for key in old.keys():
if key in new:
if old[key] != new[key]:
if isinstance(old[key], dict) and isinstance(new[key], dict):
value_diff = get_dict_diff(old[key], new[key])
if value_diff:
equal = False
value[key] = value_diff
elif isinstance(old[key], list) and isinstance(new[key], list):
value_diff = get_list_diff(old[key], new[key])
if value_diff:
equal = False
value[key] = value_diff
else:
equal = False
value[key] = {
'type': 'primitive change',
'old-value': old[key],
'new-value': new[key],
}
else:
equal = False
value[key] = {
'type': 'deleted property',
'value': old[key]
}
for key in new.keys():
if key not in old:
equal = False
value[key] = {
'type': 'added property',
'value': new[key],
}
return None if equal else {'type': 'object change', 'value': value}

View File

@ -2,7 +2,7 @@ import datetime
import sqlalchemy
from szurubooru import config, db, errors
from szurubooru.func import (
users, snapshots, scores, comments, tags, util, mime, images, files)
users, scores, comments, tags, util, mime, images, files)
EMPTY_PIXEL = \
@ -165,7 +165,6 @@ def serialize_post(post, auth_user, options=None):
for comment in sorted(
post.comments,
key=lambda comment: comment.creation_time)],
'snapshots': lambda: snapshots.get_serialized_history(post),
},
options)

View File

@ -1,8 +1,19 @@
import datetime
from datetime import datetime
from szurubooru import db
from szurubooru.func import diff, users
def get_tag_category_snapshot(category):
assert category
return {
'name': category.name,
'color': category.color,
'default': True if category.default else False,
}
def get_tag_snapshot(tag):
assert tag
return {
'names': [tag_name.name for tag_name in tag.names],
'category': tag.category.name,
@ -12,132 +23,106 @@ def get_tag_snapshot(tag):
def get_post_snapshot(post):
assert post
return {
'source': post.source,
'safety': post.safety,
'checksum': post.checksum,
'tags': sorted([tag.first_name for tag in post.tags]),
'relations': sorted([
rel.post_id for rel in post.relations]),
'notes': sorted([{
'polygon': note.polygon,
'text': note.text,
} for note in post.notes], key=lambda x: x['polygon']),
'flags': post.flags,
'featured': post.is_featured,
'tags': sorted([tag.first_name for tag in post.tags]),
'relations': sorted([rel.post_id for rel in post.relations]),
'notes': sorted([{
'polygon': [[point[0], point[1]] for point in note.polygon],
'text': note.text,
} for note in post.notes], key=lambda x: x['polygon']),
}
def get_tag_category_snapshot(category):
return {
'name': category.name,
'color': category.color,
'default': True if category.default else False,
}
_snapshot_factories = {
# lambdas allow mocking target functions in the tests
# pylint: disable=unnecessary-lambda
'tag_category': lambda entity: get_tag_category_snapshot(entity),
'tag': lambda entity: get_tag_snapshot(entity),
'post': lambda entity: get_post_snapshot(entity),
}
def get_previous_snapshot(snapshot):
def serialize_snapshot(snapshot, auth_user):
assert snapshot
return db.session \
.query(db.Snapshot) \
.filter(db.Snapshot.resource_type == snapshot.resource_type) \
.filter(db.Snapshot.resource_id == snapshot.resource_id) \
.filter(db.Snapshot.creation_time < snapshot.creation_time) \
.order_by(db.Snapshot.creation_time.desc()) \
.limit(1) \
.first()
def get_snapshots(entity):
assert entity
resource_type, resource_id, _ = db.util.get_resource_info(entity)
return db.session \
.query(db.Snapshot) \
.filter(db.Snapshot.resource_type == resource_type) \
.filter(db.Snapshot.resource_id == resource_id) \
.order_by(db.Snapshot.creation_time.desc()) \
.all()
def serialize_snapshot(snapshot, earlier_snapshot=()):
assert snapshot
if earlier_snapshot is ():
earlier_snapshot = get_previous_snapshot(snapshot)
return {
'operation': snapshot.operation,
'type': snapshot.resource_type,
'id': snapshot.resource_repr,
'user': snapshot.user.name if snapshot.user else None,
'id': snapshot.resource_name,
'user': users.serialize_micro_user(snapshot.user, auth_user),
'data': snapshot.data,
'earlier-data': earlier_snapshot.data if earlier_snapshot else None,
'time': snapshot.creation_time,
}
def get_serialized_history(entity):
if not entity:
return []
ret = []
earlier_snapshot = None
for snapshot in reversed(get_snapshots(entity)):
ret.insert(0, serialize_snapshot(snapshot, earlier_snapshot))
earlier_snapshot = snapshot
return ret
def _save(operation, entity, auth_user):
assert operation
assert entity
serializers = {
'tag': get_tag_snapshot,
'tag_category': get_tag_category_snapshot,
'post': get_post_snapshot,
}
resource_type, resource_id, resource_repr = (
def _create(operation, entity, auth_user):
resource_type, resource_pkey, resource_name = (
db.util.get_resource_info(entity))
now = datetime.datetime.utcnow()
snapshot = db.Snapshot()
snapshot.creation_time = now
snapshot.creation_time = datetime.utcnow()
snapshot.operation = operation
snapshot.resource_type = resource_type
snapshot.resource_id = resource_id
snapshot.resource_repr = resource_repr
snapshot.data = serializers[resource_type](entity)
snapshot.resource_pkey = resource_pkey
snapshot.resource_name = resource_name
snapshot.user = auth_user
earlier_snapshots = get_snapshots(entity)
delta = datetime.timedelta(minutes=10)
snapshots_left = len(earlier_snapshots)
while earlier_snapshots:
last_snapshot = earlier_snapshots.pop(0)
is_fresh = now - last_snapshot.creation_time <= delta
if snapshot.data != last_snapshot.data:
if not is_fresh or last_snapshot.user != auth_user:
break
db.session.delete(last_snapshot)
if snapshot.operation != db.Snapshot.OPERATION_DELETED:
snapshot.operation = last_snapshot.operation
snapshots_left -= 1
if not snapshots_left and operation == db.Snapshot.OPERATION_DELETED:
pass
else:
db.session.add(snapshot)
return snapshot
def save_entity_creation(entity, auth_user):
def create(entity, auth_user):
assert entity
_save(db.Snapshot.OPERATION_CREATED, entity, auth_user)
snapshot = _create(db.Snapshot.OPERATION_CREATED, entity, auth_user)
snapshot_factory = _snapshot_factories[snapshot.resource_type]
snapshot.data = snapshot_factory(entity)
db.session.add(snapshot)
def save_entity_modification(entity, auth_user):
# pylint: disable=protected-access
def modify(entity, auth_user):
assert entity
_save(db.Snapshot.OPERATION_MODIFIED, entity, auth_user)
model = next((model
for model in db.Base._decl_class_registry.values()
if hasattr(model, '__table__')
and model.__table__.fullname == entity.__table__.fullname),
None)
assert model
snapshot = _create(db.Snapshot.OPERATION_MODIFIED, entity, auth_user)
snapshot_factory = _snapshot_factories[snapshot.resource_type]
detached_session = db.sessionmaker()
detached_entity = detached_session.query(model).get(snapshot.resource_pkey)
assert detached_entity, 'Entity not found in DB, have you committed it?'
detached_snapshot = snapshot_factory(detached_entity)
detached_session.close()
active_snapshot = snapshot_factory(entity)
snapshot.data = diff.get_dict_diff(detached_snapshot, active_snapshot)
if not snapshot.data:
return
db.session.add(snapshot)
def save_entity_deletion(entity, auth_user):
def delete(entity, auth_user):
assert entity
_save(db.Snapshot.OPERATION_DELETED, entity, auth_user)
snapshot = _create(db.Snapshot.OPERATION_DELETED, entity, auth_user)
snapshot_factory = _snapshot_factories[snapshot.resource_type]
snapshot.data = snapshot_factory(entity)
db.session.add(snapshot)
def merge(source_entity, target_entity, auth_user):
assert source_entity
assert target_entity
snapshot = _create(db.Snapshot.OPERATION_MERGED, source_entity, auth_user)
resource_type, _resource_pkey, resource_name = (
db.util.get_resource_info(target_entity))
snapshot.data = [resource_type, resource_name]
db.session.add(snapshot)

View File

@ -1,7 +1,7 @@
import re
import sqlalchemy
from szurubooru import config, db, errors
from szurubooru.func import util, snapshots, cache
from szurubooru.func import util, cache
class TagCategoryNotFoundError(errors.NotFoundError):
@ -40,7 +40,6 @@ def serialize_category(category, options=None):
'color': lambda: category.color,
'usages': lambda: category.tag_count,
'default': lambda: category.default,
'snapshots': lambda: snapshots.get_serialized_history(category),
},
options)

View File

@ -4,7 +4,7 @@ import os
import re
import sqlalchemy
from szurubooru import config, db, errors
from szurubooru.func import util, tag_categories, snapshots
from szurubooru.func import util, tag_categories
class TagNotFoundError(errors.NotFoundError):
@ -86,7 +86,6 @@ def serialize_tag(tag, options=None):
'implications': lambda: [
relation.names[0].name
for relation in sort_tags(tag.implications)],
'snapshots': lambda: snapshots.get_serialized_history(tag),
},
options)

View File

@ -0,0 +1,54 @@
'''
Rename snapshot columns
Revision ID: 4a020f1d271a
Created at: 2016-08-16 09:25:38.350861
'''
import sqlalchemy as sa
from alembic import op
revision = '4a020f1d271a'
down_revision = '840b460c5613'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'snapshot',
sa.Column('resource_name', sa.Unicode(length=64), nullable=False))
op.add_column(
'snapshot',
sa.Column('resource_pkey', sa.Integer(), nullable=False))
op.create_index(
op.f('ix_snapshot_resource_pkey'),
'snapshot',
['resource_pkey'],
unique=False)
op.drop_index('ix_snapshot_resource_id', table_name='snapshot')
op.drop_column('snapshot', 'resource_id')
op.drop_column('snapshot', 'resource_repr')
def downgrade():
op.add_column(
'snapshot',
sa.Column(
'resource_repr',
sa.VARCHAR(length=64),
autoincrement=False,
nullable=False))
op.add_column(
'snapshot',
sa.Column(
'resource_id',
sa.INTEGER(),
autoincrement=False,
nullable=False))
op.create_index(
'ix_snapshot_resource_id', 'snapshot', ['resource_id'], unique=False)
op.drop_index(op.f('ix_snapshot_resource_pkey'), table_name='snapshot')
op.drop_column('snapshot', 'resource_pkey')
op.drop_column('snapshot', 'resource_name')

View File

@ -14,7 +14,7 @@ class SnapshotSearchConfig(BaseSearchConfig):
def named_filters(self):
return {
'type': search_util.create_str_filter(db.Snapshot.resource_type),
'id': search_util.create_str_filter(db.Snapshot.resource_repr),
'id': search_util.create_str_filter(db.Snapshot.resource_name),
'date': search_util.create_date_filter(db.Snapshot.creation_time),
'time': search_util.create_date_filter(db.Snapshot.creation_time),
'operation': search_util.create_str_filter(db.Snapshot.operation),

View File

@ -31,7 +31,7 @@ def test_creating_minimal_posts(
patch('szurubooru.func.posts.update_post_thumbnail'), \
patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.save_entity_creation'):
patch('szurubooru.func.snapshots.create'):
posts.create_post.return_value = (post, [])
posts.serialize_post.return_value = 'serialized post'
@ -61,8 +61,8 @@ def test_creating_minimal_posts(
post, 'post-thumbnail')
posts.serialize_post.assert_called_once_with(
post, auth_user, options=None)
snapshots.create.assert_called_once_with(post, auth_user)
tags.export_to_json.assert_called_once_with()
snapshots.save_entity_creation.assert_called_once_with(post, auth_user)
def test_creating_full_posts(context_factory, post_factory, user_factory):
@ -79,7 +79,7 @@ def test_creating_full_posts(context_factory, post_factory, user_factory):
patch('szurubooru.func.posts.update_post_flags'), \
patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.save_entity_creation'):
patch('szurubooru.func.snapshots.create'):
posts.create_post.return_value = (post, [])
posts.serialize_post.return_value = 'serialized post'
@ -110,8 +110,8 @@ def test_creating_full_posts(context_factory, post_factory, user_factory):
post, ['flag1', 'flag2'])
posts.serialize_post.assert_called_once_with(
post, auth_user, options=None)
snapshots.create.assert_called_once_with(post, auth_user)
tags.export_to_json.assert_called_once_with()
snapshots.save_entity_creation.assert_called_once_with(post, auth_user)
def test_anonymous_uploads(
@ -122,7 +122,6 @@ def test_anonymous_uploads(
db.session.flush()
with patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.save_entity_creation'), \
patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.posts.create_post'), \
patch('szurubooru.func.posts.update_post_source'):
@ -154,7 +153,6 @@ def test_creating_from_url_saves_source(
with patch('szurubooru.func.net.download'), \
patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.save_entity_creation'), \
patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.posts.create_post'), \
patch('szurubooru.func.posts.update_post_source'):
@ -186,7 +184,6 @@ def test_creating_from_url_with_source_specified(
with patch('szurubooru.func.net.download'), \
patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.save_entity_creation'), \
patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.posts.create_post'), \
patch('szurubooru.func.posts.update_post_source'):

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db, errors
from szurubooru.func import posts, tags
from szurubooru.func import posts, tags, snapshots
@pytest.fixture(autouse=True)
@ -10,16 +10,17 @@ def inject_config(config_injector):
def test_deleting(user_factory, post_factory, context_factory):
db.session.add(post_factory(id=1))
db.session.commit()
with patch('szurubooru.func.tags.export_to_json'):
auth_user = user_factory(rank=db.User.RANK_REGULAR)
post = post_factory(id=1)
db.session.add(post)
with patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.delete'):
result = api.post_api.delete_post(
context_factory(
params={'version': 1},
user=user_factory(rank=db.User.RANK_REGULAR)),
context_factory(params={'version': 1}, user=auth_user),
{'post_id': 1})
assert result == {}
assert db.session.query(db.Post).count() == 0
snapshots.delete.assert_called_once_with(post, auth_user)
tags.export_to_json.assert_called_once_with()

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db, errors
from szurubooru.func import posts
from szurubooru.func import posts, snapshots
@pytest.fixture(autouse=True)
@ -15,15 +15,15 @@ def inject_config(config_injector):
def test_featuring(user_factory, post_factory, context_factory):
db.session.add(post_factory(id=1))
db.session.commit()
auth_user = user_factory(rank=db.User.RANK_REGULAR)
post = post_factory(id=1)
db.session.add(post)
assert not posts.get_post_by_id(1).is_featured
with patch('szurubooru.func.posts.serialize_post'):
with patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.snapshots.modify'):
posts.serialize_post.return_value = 'serialized post'
result = api.post_api.set_featured_post(
context_factory(
params={'id': 1},
user=user_factory(rank=db.User.RANK_REGULAR)))
context_factory(params={'id': 1}, user=auth_user))
assert result == 'serialized post'
assert posts.try_get_featured_post() is not None
assert posts.try_get_featured_post().post_id == 1
@ -32,6 +32,7 @@ def test_featuring(user_factory, post_factory, context_factory):
context_factory(
user=user_factory(rank=db.User.RANK_REGULAR)))
assert result == 'serialized post'
snapshots.modify.assert_called_once_with(post, auth_user)
def test_trying_to_omit_required_parameter(user_factory, context_factory):

View File

@ -40,7 +40,7 @@ def test_post_updating(
patch('szurubooru.func.posts.update_post_flags'), \
patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.save_entity_modification'), \
patch('szurubooru.func.snapshots.modify'), \
fake_datetime('1997-01-01'):
posts.serialize_post.return_value = 'serialized post'
@ -77,9 +77,8 @@ def test_post_updating(
post, ['flag1', 'flag2'])
posts.serialize_post.assert_called_once_with(
post, auth_user, options=None)
snapshots.modify.assert_called_once_with(post, auth_user)
tags.export_to_json.assert_called_once_with()
snapshots.save_entity_modification.assert_called_once_with(
post, auth_user)
assert post.last_edit_time == datetime(1997, 1, 1)
@ -90,10 +89,10 @@ def test_uploading_from_url_saves_source(
db.session.flush()
with patch('szurubooru.func.net.download'), \
patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.save_entity_modification'), \
patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.posts.update_post_content'), \
patch('szurubooru.func.posts.update_post_source'):
patch('szurubooru.func.posts.update_post_source'), \
patch('szurubooru.func.snapshots.modify'):
net.download.return_value = b'content'
api.post_api.update_post(
context_factory(
@ -112,10 +111,10 @@ def test_uploading_from_url_with_source_specified(
db.session.flush()
with patch('szurubooru.func.net.download'), \
patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.save_entity_modification'), \
patch('szurubooru.func.posts.serialize_post'), \
patch('szurubooru.func.posts.update_post_content'), \
patch('szurubooru.func.posts.update_post_source'):
patch('szurubooru.func.posts.update_post_source'), \
patch('szurubooru.func.snapshots.modify'):
net.download.return_value = b'content'
api.post_api.update_post(
context_factory(

View File

@ -7,8 +7,8 @@ def snapshot_factory():
snapshot = db.Snapshot()
snapshot.creation_time = datetime(1999, 1, 1)
snapshot.resource_type = 'dummy'
snapshot.resource_id = 1
snapshot.resource_repr = 'dummy'
snapshot.resource_pkey = 1
snapshot.resource_name = 'dummy'
snapshot.operation = 'added'
snapshot.data = '{}'
return snapshot

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db, errors
from szurubooru.func import tag_categories, tags
from szurubooru.func import tag_categories, tags, snapshots
def _update_category_name(category, name):
@ -15,21 +15,26 @@ def inject_config(config_injector):
})
def test_creating_category(user_factory, context_factory):
with patch('szurubooru.func.tag_categories.serialize_category'), \
def test_creating_category(
tag_category_factory, user_factory, context_factory):
auth_user = user_factory(rank=db.User.RANK_REGULAR)
category = tag_category_factory(name='meta')
db.session.add(category)
with patch('szurubooru.func.tag_categories.create_category'), \
patch('szurubooru.func.tag_categories.serialize_category'), \
patch('szurubooru.func.tag_categories.update_category_name'), \
patch('szurubooru.func.snapshots.create'), \
patch('szurubooru.func.tags.export_to_json'):
tag_categories.create_category.return_value = category
tag_categories.update_category_name.side_effect = _update_category_name
tag_categories.serialize_category.return_value = 'serialized category'
result = api.tag_category_api.create_tag_category(
context_factory(
params={'name': 'meta', 'color': 'black'},
user=user_factory(rank=db.User.RANK_REGULAR)))
params={'name': 'meta', 'color': 'black'}, user=auth_user))
assert result == 'serialized category'
category = db.session.query(db.TagCategory).one()
assert category.name == 'meta'
assert category.color == 'black'
assert category.tag_count == 0
tag_categories.create_category.assert_called_once_with('meta', 'black')
snapshots.create.assert_called_once_with(category, auth_user)
tags.export_to_json.assert_called_once_with()

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db, errors
from szurubooru.func import tag_categories, tags
from szurubooru.func import tag_categories, tags, snapshots
@pytest.fixture(autouse=True)
@ -12,18 +12,19 @@ def inject_config(config_injector):
def test_deleting(user_factory, tag_category_factory, context_factory):
auth_user = user_factory(rank=db.User.RANK_REGULAR)
category = tag_category_factory(name='category')
db.session.add(tag_category_factory(name='root'))
db.session.add(tag_category_factory(name='category'))
db.session.commit()
with patch('szurubooru.func.tags.export_to_json'):
db.session.add(category)
with patch('szurubooru.func.snapshots.delete'), \
patch('szurubooru.func.tags.export_to_json'):
result = api.tag_category_api.delete_tag_category(
context_factory(
params={'version': 1},
user=user_factory(rank=db.User.RANK_REGULAR)),
context_factory(params={'version': 1}, user=auth_user),
{'category_name': 'category'})
assert result == {}
assert db.session.query(db.TagCategory).count() == 1
assert db.session.query(db.TagCategory).one().name == 'root'
snapshots.delete.assert_called_once_with(category, auth_user)
tags.export_to_json.assert_called_once_with()

View File

@ -35,7 +35,6 @@ def test_retrieving_single(
'color': 'dummy',
'usages': 0,
'default': False,
'snapshots': [],
'version': 1,
}

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db, errors
from szurubooru.func import tag_categories, tags
from szurubooru.func import tag_categories, tags, snapshots
def _update_category_name(category, name):
@ -20,29 +20,27 @@ def inject_config(config_injector):
def test_simple_updating(user_factory, tag_category_factory, context_factory):
auth_user = user_factory(rank=db.User.RANK_REGULAR)
category = tag_category_factory(name='name', color='black')
db.session.add(category)
db.session.commit()
with patch('szurubooru.func.tag_categories.serialize_category'), \
patch('szurubooru.func.tag_categories.update_category_name'), \
patch('szurubooru.func.tag_categories.update_category_color'), \
patch('szurubooru.func.snapshots.modify'), \
patch('szurubooru.func.tags.export_to_json'):
tag_categories.update_category_name.side_effect = _update_category_name
tag_categories.serialize_category.return_value = 'serialized category'
result = api.tag_category_api.update_tag_category(
context_factory(
params={
'name': 'changed',
'color': 'white',
'version': 1,
},
user=user_factory(rank=db.User.RANK_REGULAR)),
params={'name': 'changed', 'color': 'white', 'version': 1},
user=auth_user),
{'category_name': 'name'})
assert result == 'serialized category'
tag_categories.update_category_name.assert_called_once_with(
category, 'changed')
tag_categories.update_category_color.assert_called_once_with(
category, 'white')
snapshots.modify.assert_called_once_with(category, auth_user)
tags.export_to_json.assert_called_once_with()

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db, errors
from szurubooru.func import tags
from szurubooru.func import tags, snapshots
@pytest.fixture(autouse=True)
@ -10,12 +10,15 @@ def inject_config(config_injector):
def test_creating_simple_tags(tag_factory, user_factory, context_factory):
auth_user = user_factory(rank=db.User.RANK_REGULAR)
tag = tag_factory()
with patch('szurubooru.func.tags.create_tag'), \
patch('szurubooru.func.tags.get_or_create_tags_by_names'), \
patch('szurubooru.func.tags.serialize_tag'), \
patch('szurubooru.func.snapshots.create'), \
patch('szurubooru.func.tags.export_to_json'):
tags.get_or_create_tags_by_names.return_value = ([], [])
tags.create_tag.return_value = tag_factory()
tags.create_tag.return_value = tag
tags.serialize_tag.return_value = 'serialized tag'
result = api.tag_api.create_tag(
context_factory(
@ -26,10 +29,11 @@ def test_creating_simple_tags(tag_factory, user_factory, context_factory):
'suggestions': ['sug1', 'sug2'],
'implications': ['imp1', 'imp2'],
},
user=user_factory(rank=db.User.RANK_REGULAR)))
user=auth_user))
assert result == 'serialized tag'
tags.create_tag.assert_called_once_with(
['tag1', 'tag2'], 'meta', ['sug1', 'sug2'], ['imp1', 'imp2'])
snapshots.create.assert_called_once_with(tag, auth_user)
tags.export_to_json.assert_called_once_with()

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db, errors
from szurubooru.func import tags
from szurubooru.func import tags, snapshots
@pytest.fixture(autouse=True)
@ -10,16 +10,18 @@ def inject_config(config_injector):
def test_deleting(user_factory, tag_factory, context_factory):
db.session.add(tag_factory(names=['tag']))
auth_user = user_factory(rank=db.User.RANK_REGULAR)
tag = tag_factory(names=['tag'])
db.session.add(tag)
db.session.commit()
with patch('szurubooru.func.tags.export_to_json'):
with patch('szurubooru.func.tags.export_to_json'), \
patch('szurubooru.func.snapshots.delete'):
result = api.tag_api.delete_tag(
context_factory(
params={'version': 1},
user=user_factory(rank=db.User.RANK_REGULAR)),
context_factory(params={'version': 1}, user=auth_user),
{'tag_name': 'tag'})
assert result == {}
assert db.session.query(db.Tag).count() == 0
snapshots.delete.assert_called_once_with(tag, auth_user)
tags.export_to_json.assert_called_once_with()

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db, errors
from szurubooru.func import tags
from szurubooru.func import tags, snapshots
@pytest.fixture(autouse=True)
@ -10,6 +10,7 @@ def inject_config(config_injector):
def test_merging(user_factory, tag_factory, context_factory, post_factory):
auth_user = user_factory(rank=db.User.RANK_REGULAR)
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
db.session.add_all([source_tag, target_tag])
@ -24,6 +25,7 @@ def test_merging(user_factory, tag_factory, context_factory, post_factory):
assert target_tag.post_count == 0
with patch('szurubooru.func.tags.serialize_tag'), \
patch('szurubooru.func.tags.merge_tags'), \
patch('szurubooru.func.snapshots.merge'), \
patch('szurubooru.func.tags.export_to_json'):
api.tag_api.merge_tags(
context_factory(
@ -33,8 +35,10 @@ def test_merging(user_factory, tag_factory, context_factory, post_factory):
'remove': 'source',
'mergeTo': 'target',
},
user=user_factory(rank=db.User.RANK_REGULAR)))
user=auth_user))
tags.merge_tags.called_once_with(source_tag, target_tag)
snapshots.merge.assert_called_once_with(
source_tag, target_tag, auth_user)
tags.export_to_json.assert_called_once_with()

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import api, db, errors
from szurubooru.func import tags
from szurubooru.func import tags, snapshots
@pytest.fixture(autouse=True)
@ -31,6 +31,7 @@ def test_simple_updating(user_factory, tag_factory, context_factory):
patch('szurubooru.func.tags.update_tag_suggestions'), \
patch('szurubooru.func.tags.update_tag_implications'), \
patch('szurubooru.func.tags.serialize_tag'), \
patch('szurubooru.func.snapshots.modify'), \
patch('szurubooru.func.tags.export_to_json'):
tags.get_or_create_tags_by_names.return_value = ([], [])
tags.serialize_tag.return_value = 'serialized tag'
@ -57,6 +58,8 @@ def test_simple_updating(user_factory, tag_factory, context_factory):
tag, ['imp1', 'imp2'])
tags.serialize_tag.assert_called_once_with(
tag, options=None)
snapshots.modify.assert_called_once_with(tag, auth_user)
tags.export_to_json.assert_called_once_with()
@pytest.mark.parametrize(

View File

@ -145,8 +145,8 @@ def test_cascade_deletions(post_factory, user_factory, comment_factory):
snapshot.user = user
snapshot.creation_time = datetime(1997, 1, 1)
snapshot.resource_type = '-'
snapshot.resource_id = 1
snapshot.resource_repr = '-'
snapshot.resource_pkey = 1
snapshot.resource_name = '-'
snapshot.operation = '-'
db.session.add_all([user, post, comment, snapshot])

View File

@ -0,0 +1,275 @@
import pytest
from szurubooru.func import diff
@pytest.mark.parametrize('old,new,expected', [
(
[], [], None,
),
(
[],
['added'],
{'type': 'list change', 'added': ['added'], 'removed': []},
),
(
['removed'],
[],
{'type': 'list change', 'added': [], 'removed': ['removed']},
),
(
['untouched'],
['untouched'],
None,
),
(
['untouched'],
['untouched', 'added'],
{'type': 'list change', 'added': ['added'], 'removed': []},
),
(
['untouched', 'removed'],
['untouched'],
{'type': 'list change', 'added': [], 'removed': ['removed']},
),
])
def test_get_list_diff(old, new, expected):
assert diff.get_list_diff(old, new) == expected
@pytest.mark.parametrize('old,new,expected', [
(
{}, {}, None,
),
(
{'removed key': 'removed value'},
{},
{
'type': 'object change',
'value':
{
'removed key':
{
'type': 'deleted property',
'value': 'removed value',
},
},
},
),
(
{},
{'added key': 'added value'},
{
'type': 'object change',
'value':
{
'added key':
{
'type': 'added property',
'value': 'added value',
},
},
},
),
(
{'key': 'old value'},
{'key': 'new value'},
{
'type': 'object change',
'value':
{
'key':
{
'type': 'primitive change',
'old-value': 'old value',
'new-value': 'new value',
},
},
},
),
(
{'key': 'untouched'},
{'key': 'untouched'},
None,
),
(
{'key': 'untouched', 'removed key': 'removed value'},
{'key': 'untouched'},
{
'type': 'object change',
'value':
{
'removed key':
{
'type': 'deleted property',
'value': 'removed value',
},
},
},
),
(
{'key': 'untouched'},
{'key': 'untouched', 'added key': 'added value'},
{
'type': 'object change',
'value':
{
'added key':
{
'type': 'added property',
'value': 'added value',
},
},
},
),
(
{'key': 'untouched', 'changed key': 'old value'},
{'key': 'untouched', 'changed key': 'new value'},
{
'type': 'object change',
'value':
{
'changed key':
{
'type': 'primitive change',
'old-value': 'old value',
'new-value': 'new value',
},
},
},
),
(
{'key': {'subkey': 'old value'}},
{'key': {'subkey': 'new value'}},
{
'type': 'object change',
'value':
{
'key':
{
'type': 'object change',
'value':
{
'subkey':
{
'type': 'primitive change',
'old-value': 'old value',
'new-value': 'new value',
},
},
},
},
},
),
(
{'key': {}},
{'key': {'subkey': 'removed value'}},
{
'type': 'object change',
'value':
{
'key':
{
'type': 'object change',
'value':
{
'subkey':
{
'type': 'added property',
'value': 'removed value',
},
},
},
},
},
),
(
{'key': {'subkey': 'removed value'}},
{'key': {}},
{
'type': 'object change',
'value':
{
'key':
{
'type': 'object change',
'value':
{
'subkey':
{
'type': 'deleted property',
'value': 'removed value',
},
},
},
},
},
),
(
{'key': ['old value']},
{'key': ['new value']},
{
'type': 'object change',
'value':
{
'key':
{
'type': 'list change',
'added': ['new value'],
'removed': ['old value'],
},
},
},
),
(
{'key': []},
{'key': ['new value']},
{
'type': 'object change',
'value':
{
'key':
{
'type': 'list change',
'added': ['new value'],
'removed': [],
},
},
},
),
(
{'key': ['removed value']},
{'key': []},
{
'type': 'object change',
'value':
{
'key':
{
'type': 'list change',
'added': [],
'removed': ['removed value'],
},
},
},
),
])
def test_get_dict_diff(old, new, expected):
assert diff.get_dict_diff(old, new) == expected

View File

@ -3,8 +3,7 @@ from unittest.mock import patch
from datetime import datetime
import pytest
from szurubooru import db
from szurubooru.func import (
posts, users, comments, snapshots, tags, images, files, util)
from szurubooru.func import (posts, users, comments, tags, images, files, util)
@pytest.mark.parametrize('input_mime_type,expected_url', [
@ -78,14 +77,12 @@ def test_serialize_post(
config_injector({'data_url': 'http://example.com/'})
with patch('szurubooru.func.comments.serialize_comment'), \
patch('szurubooru.func.users.serialize_micro_user'), \
patch('szurubooru.func.posts.files.has'), \
patch('szurubooru.func.snapshots.get_serialized_history'):
patch('szurubooru.func.posts.files.has'):
files.has.return_value = True
users.serialize_micro_user.side_effect \
= lambda user, auth_user: user.name
comments.serialize_comment.side_effect \
= lambda comment, auth_user: comment.user.name
snapshots.get_serialized_history.return_value = 'snapshot history'
auth_user = user_factory(name='auth user')
post = db.Post()
@ -178,7 +175,6 @@ def test_serialize_post(
'favoritedBy': ['fav1'],
'hasCustomThumbnail': True,
'mimeType': 'image/jpeg',
'snapshots': 'snapshot history',
'comments': ['commenter1', 'commenter2'],
}

View File

@ -1,10 +1,52 @@
from unittest.mock import patch
from datetime import datetime
import pytest
from szurubooru import db
from szurubooru.func import snapshots
from szurubooru.func import snapshots, users
def test_serializing_post(post_factory, user_factory, tag_factory):
def test_get_tag_category_snapshot(tag_category_factory):
category = tag_category_factory(name='name', color='color')
assert snapshots.get_tag_category_snapshot(category) == {
'name': 'name',
'color': 'color',
'default': False,
}
category.default = True
assert snapshots.get_tag_category_snapshot(category) == {
'name': 'name',
'color': 'color',
'default': True,
}
def test_get_tag_snapshot(tag_factory, tag_category_factory):
category = tag_category_factory(name='dummy')
tag = tag_factory(names=['main_name', 'alias'], category=category)
assert snapshots.get_tag_snapshot(tag) == {
'names': ['main_name', 'alias'],
'category': 'dummy',
'suggestions': [],
'implications': [],
}
tag = tag_factory(names=['main_name', 'alias'], category=category)
imp1 = tag_factory(names=['imp1_main_name', 'imp1_alias'])
imp2 = tag_factory(names=['imp2_main_name', 'imp2_alias'])
sug1 = tag_factory(names=['sug1_main_name', 'sug1_alias'])
sug2 = tag_factory(names=['sug2_main_name', 'sug2_alias'])
db.session.add_all([imp1, imp2, sug1, sug2])
tag.implications = [imp1, imp2]
tag.suggestions = [sug1, sug2]
db.session.flush()
assert snapshots.get_tag_snapshot(tag) == {
'names': ['main_name', 'alias'],
'category': 'dummy',
'implications': ['imp1_main_name', 'imp2_main_name'],
'suggestions': ['sug1_main_name', 'sug2_main_name'],
}
def test_get_post_snapshot(post_factory, user_factory, tag_factory):
user = user_factory(name='dummy-user')
tag1 = tag_factory(names=['dummy-tag1'])
tag2 = tag_factory(names=['dummy-tag2'])
@ -50,12 +92,10 @@ def test_serializing_post(post_factory, user_factory, tag_factory):
'checksum': 'deadbeef',
'featured': True,
'flags': [],
'notes': [
{
'polygon': [(1, 1), (200, 1), (200, 200), (1, 200)],
'text': 'some text',
}
],
'notes': [{
'polygon': [[1, 1], [200, 1], [200, 200], [1, 200]],
'text': 'some text',
}],
'relations': [2, 3],
'safety': 'safe',
'source': 'example.com',
@ -63,243 +103,103 @@ def test_serializing_post(post_factory, user_factory, tag_factory):
}
def test_serializing_tag(tag_factory, tag_category_factory):
category = tag_category_factory(name='dummy')
tag = tag_factory(names=['main_name', 'alias'], category=category)
assert snapshots.get_tag_snapshot(tag) == {
'names': ['main_name', 'alias'],
'category': 'dummy',
'suggestions': [],
'implications': [],
}
tag = tag_factory(names=['main_name', 'alias'], category=category)
imp1 = tag_factory(names=['imp1_main_name', 'imp1_alias'])
imp2 = tag_factory(names=['imp2_main_name', 'imp2_alias'])
sug1 = tag_factory(names=['sug1_main_name', 'sug1_alias'])
sug2 = tag_factory(names=['sug2_main_name', 'sug2_alias'])
db.session.add_all([imp1, imp2, sug1, sug2])
tag.implications = [imp1, imp2]
tag.suggestions = [sug1, sug2]
db.session.flush()
assert snapshots.get_tag_snapshot(tag) == {
'names': ['main_name', 'alias'],
'category': 'dummy',
'implications': ['imp1_main_name', 'imp2_main_name'],
'suggestions': ['sug1_main_name', 'sug2_main_name'],
}
def test_serialize_snapshot(user_factory):
auth_user = user_factory()
snapshot = db.Snapshot()
snapshot.operation = snapshot.OPERATION_CREATED
snapshot.resource_type = 'type'
snapshot.resource_name = 'id'
snapshot.user = user_factory(name='issuer')
snapshot.data = {'complex': list('object')}
snapshot.creation_time = datetime(1997, 1, 1)
with patch('szurubooru.func.users.serialize_micro_user'):
users.serialize_micro_user.return_value = 'mocked'
assert snapshots.serialize_snapshot(snapshot, auth_user) == {
'operation': 'created',
'type': 'type',
'id': 'id',
'user': 'mocked',
'data': {'complex': list('object')},
'time': datetime(1997, 1, 1),
}
def test_serializing_tag_category(tag_category_factory):
category = tag_category_factory(name='name', color='color')
assert snapshots.get_tag_category_snapshot(category) == {
'name': 'name',
'color': 'color',
'default': False,
}
category.default = True
assert snapshots.get_tag_category_snapshot(category) == {
'name': 'name',
'color': 'color',
'default': True,
}
def test_merging_modification_to_creation(tag_factory, user_factory):
def test_create(tag_factory, user_factory):
tag = tag_factory(names=['dummy'])
user = user_factory()
db.session.add_all([tag, user])
db.session.add(tag)
db.session.flush()
snapshots.save_entity_creation(tag, user)
tag.names = [db.TagName('changed')]
snapshots.save_entity_modification(tag, user)
with patch('szurubooru.func.snapshots.get_tag_snapshot'):
snapshots.get_tag_snapshot.return_value = 'mocked'
snapshots.create(tag, user_factory())
results = db.session.query(db.Snapshot).all()
assert len(results) == 1
assert results[0].operation == db.Snapshot.OPERATION_CREATED
assert results[0].data['names'] == ['changed']
assert results[0].data == 'mocked'
def test_merging_modifications(fake_datetime, tag_factory, user_factory):
tag = tag_factory(names=['dummy'])
def test_modify_saves_non_empty_diffs(post_factory, user_factory):
if 'sqlite' in db.sessionmaker.kw['bind'].driver:
pytest.xfail(
'SQLite doesn\'t support transaction isolation, '
'which is required to retrieve original entity')
post = post_factory()
post.notes = [db.PostNote(polygon=[(0, 0), (0, 1), (1, 1)], text='old')]
user = user_factory()
db.session.add_all([tag, user])
db.session.add_all([post, user])
db.session.commit()
post.source = 'new source'
post.notes = [db.PostNote(polygon=[(0, 0), (0, 1), (1, 1)], text='new')]
db.session.flush()
with fake_datetime('13:00:00'):
snapshots.save_entity_creation(tag, user)
tag.names = [db.TagName('changed')]
with fake_datetime('14:00:00'):
snapshots.save_entity_modification(tag, user)
tag.names = [db.TagName('changed again')]
with fake_datetime('14:00:01'):
snapshots.save_entity_modification(tag, user)
results = db.session.query(db.Snapshot).all()
assert len(results) == 2
assert results[0].operation == db.Snapshot.OPERATION_CREATED
assert results[1].operation == db.Snapshot.OPERATION_MODIFIED
assert results[0].data['names'] == ['dummy']
assert results[1].data['names'] == ['changed again']
def test_not_adding_snapshot_if_data_doesnt_change(
fake_datetime, tag_factory, user_factory):
tag = tag_factory(names=['dummy'])
user = user_factory()
db.session.add_all([tag, user])
db.session.flush()
with fake_datetime('13:00:00'):
snapshots.save_entity_creation(tag, user)
with fake_datetime('14:00:00'):
snapshots.save_entity_modification(tag, user)
snapshots.modify(post, user)
results = db.session.query(db.Snapshot).all()
assert len(results) == 1
assert results[0].operation == db.Snapshot.OPERATION_CREATED
assert results[0].data['names'] == ['dummy']
def test_not_merging_due_to_time_difference(
fake_datetime, tag_factory, user_factory):
tag = tag_factory(names=['dummy'])
user = user_factory()
db.session.add_all([tag, user])
db.session.flush()
with fake_datetime('13:00:00'):
snapshots.save_entity_creation(tag, user)
tag.names = [db.TagName('changed')]
with fake_datetime('13:10:01'):
snapshots.save_entity_modification(tag, user)
assert db.session.query(db.Snapshot).count() == 2
def test_not_merging_operations_by_different_users(
fake_datetime, tag_factory, user_factory):
tag = tag_factory(names=['dummy'])
user1, user2 = [user_factory(), user_factory()]
db.session.add_all([tag, user1, user2])
db.session.flush()
with fake_datetime('13:00:00'):
snapshots.save_entity_creation(tag, user1)
tag.names = [db.TagName('changed')]
snapshots.save_entity_modification(tag, user2)
assert db.session.query(db.Snapshot).count() == 2
def test_merging_resets_merging_time_window(
fake_datetime, tag_factory, user_factory):
tag = tag_factory(names=['dummy'])
user = user_factory()
db.session.add_all([tag, user])
db.session.flush()
with fake_datetime('13:00:00'):
snapshots.save_entity_creation(tag, user)
tag.names = [db.TagName('changed')]
with fake_datetime('13:09:59'):
snapshots.save_entity_modification(tag, user)
tag.names = [db.TagName('changed again')]
with fake_datetime('13:19:59'):
snapshots.save_entity_modification(tag, user)
results = db.session.query(db.Snapshot).all()
assert len(results) == 1
assert results[0].data['names'] == ['changed again']
@pytest.mark.parametrize(
'initial_operation',
[snapshots.save_entity_creation, snapshots.save_entity_modification])
def test_merging_deletion_to_modification_or_creation(
fake_datetime,
tag_factory,
tag_category_factory,
user_factory,
initial_operation):
category = tag_category_factory(name='dummy')
tag = tag_factory(names=['dummy'], category=category)
user = user_factory()
db.session.add_all([tag, user])
db.session.flush()
with fake_datetime('13:00:00'):
initial_operation(tag, user)
tag.names = [db.TagName('changed')]
with fake_datetime('14:00:00'):
snapshots.save_entity_modification(tag, user)
tag.names = [db.TagName('changed again')]
with fake_datetime('14:00:01'):
snapshots.save_entity_deletion(tag, user)
assert db.session.query(db.Snapshot).count() == 2
results = db.session \
.query(db.Snapshot) \
.order_by(db.Snapshot.snapshot_id.asc()) \
.all()
assert results[1].operation == db.Snapshot.OPERATION_DELETED
assert results[1].data == {
'names': ['changed again'],
'category': 'dummy',
'suggestions': [],
'implications': [],
assert results[0].data == {
'type': 'object change',
'value': {
'source': {
'type': 'primitive change',
'old-value': None,
'new-value': 'new source',
},
'notes': {
'type': 'list change',
'removed': [
{'polygon': [[0, 0], [0, 1], [1, 1]], 'text': 'old'}],
'added': [
{'polygon': [[0, 0], [0, 1], [1, 1]], 'text': 'new'}],
},
},
}
def test_merging_deletion_all_the_way_deletes_all_snapshots(
fake_datetime, tag_factory, user_factory):
def test_modify_doesnt_save_empty_diffs(tag_factory, user_factory):
tag = tag_factory(names=['dummy'])
user = user_factory()
db.session.add_all([tag, user])
db.session.flush()
with fake_datetime('13:00:00'):
snapshots.save_entity_creation(tag, user)
tag.names = [db.TagName('changed')]
with fake_datetime('13:00:01'):
snapshots.save_entity_modification(tag, user)
tag.names = [db.TagName('changed again')]
with fake_datetime('13:00:02'):
snapshots.save_entity_deletion(tag, user)
db.session.commit()
snapshots.modify(tag, user)
assert db.session.query(db.Snapshot).count() == 0
def test_get_serialized_history(
fake_datetime, tag_factory, tag_category_factory, user_factory):
category = tag_category_factory(name='dummy')
tag = tag_factory(names=['dummy'], category=category)
user = user_factory(name='the-user')
db.session.add_all([tag, user])
def test_delete(tag_factory, user_factory):
tag = tag_factory(names=['dummy'])
db.session.add(tag)
db.session.flush()
with fake_datetime('2016-04-19 13:00:00'):
snapshots.save_entity_creation(tag, user)
tag.names = [db.TagName('changed')]
with patch('szurubooru.func.snapshots.get_tag_snapshot'):
snapshots.get_tag_snapshot.return_value = 'mocked'
snapshots.delete(tag, user_factory())
results = db.session.query(db.Snapshot).all()
assert len(results) == 1
assert results[0].operation == db.Snapshot.OPERATION_DELETED
assert results[0].data == 'mocked'
def test_merge(tag_factory, user_factory):
source_tag = tag_factory(names=['source'])
target_tag = tag_factory(names=['target'])
db.session.add_all([source_tag, target_tag])
db.session.flush()
with fake_datetime('2016-04-19 13:10:01'):
snapshots.save_entity_modification(tag, user)
assert snapshots.get_serialized_history(tag) == [
{
'operation': 'modified',
'time': datetime(2016, 4, 19, 13, 10, 1),
'type': 'tag',
'id': 'changed',
'user': 'the-user',
'data': {
'names': ['changed'],
'category': 'dummy',
'suggestions': [],
'implications': [],
},
'earlier-data': {
'names': ['dummy'],
'category': 'dummy',
'suggestions': [],
'implications': [],
},
},
{
'operation': 'created',
'time': datetime(2016, 4, 19, 13, 0, 0),
'type': 'tag',
'id': 'dummy',
'user': 'the-user',
'data': {
'names': ['dummy'],
'category': 'dummy',
'suggestions': [],
'implications': [],
},
'earlier-data': None,
},
]
snapshots.merge(source_tag, target_tag, user_factory())
result = db.session.query(db.Snapshot).one()
assert result.operation == db.Snapshot.OPERATION_MERGED
assert result.data == ['tag', 'target']

View File

@ -1,7 +1,7 @@
from unittest.mock import patch
import pytest
from szurubooru import db
from szurubooru.func import tag_categories, cache, snapshots
from szurubooru.func import tag_categories, cache
@pytest.fixture(autouse=True)
@ -14,24 +14,21 @@ def test_serialize_category_when_empty():
def test_serialize_category(tag_category_factory, tag_factory):
with patch('szurubooru.func.snapshots.get_serialized_history'):
snapshots.get_serialized_history.return_value = 'snapshot history'
category = tag_category_factory(name='name', color='color')
category.category_id = 1
category.default = True
tag1 = tag_factory(category=category)
tag2 = tag_factory(category=category)
db.session.add_all([category, tag1, tag2])
db.session.flush()
result = tag_categories.serialize_category(category)
assert result == {
'name': 'name',
'color': 'color',
'default': True,
'version': 1,
'snapshots': 'snapshot history',
'usages': 2,
}
category = tag_category_factory(name='name', color='color')
category.category_id = 1
category.default = True
tag1 = tag_factory(category=category)
tag2 = tag_factory(category=category)
db.session.add_all([category, tag1, tag2])
db.session.flush()
result = tag_categories.serialize_category(category)
assert result == {
'name': 'name',
'color': 'color',
'default': True,
'version': 1,
'usages': 2,
}
def test_create_category_when_first():

View File

@ -4,7 +4,7 @@ from unittest.mock import patch
from datetime import datetime
import pytest
from szurubooru import db
from szurubooru.func import tags, tag_categories, cache, snapshots
from szurubooru.func import tags, tag_categories, cache
@pytest.fixture(autouse=True)
@ -44,39 +44,36 @@ def test_serialize_tag_when_empty():
def test_serialize_tag(post_factory, tag_factory, tag_category_factory):
with patch('szurubooru.func.snapshots.get_serialized_history'):
snapshots.get_serialized_history.return_value = 'snapshot history'
tag = tag_factory(
names=['tag1', 'tag2'],
category=tag_category_factory(name='cat'))
tag.tag_id = 1
tag.description = 'description'
tag.suggestions = [
tag_factory(names=['sug1']), tag_factory(names=['sug2'])]
tag.implications = [
tag_factory(names=['impl1']), tag_factory(names=['impl2'])]
tag.last_edit_time = datetime(1998, 1, 1)
post1 = post_factory()
post2 = post_factory()
post1.tags = [tag]
post2.tags = [tag]
db.session.add_all([tag, post1, post2])
db.session.flush()
result = tags.serialize_tag(tag)
result['suggestions'].sort()
result['implications'].sort()
assert result == {
'names': ['tag1', 'tag2'],
'version': 1,
'category': 'cat',
'creationTime': datetime(1996, 1, 1, 0, 0),
'lastEditTime': datetime(1998, 1, 1, 0, 0),
'description': 'description',
'suggestions': ['sug1', 'sug2'],
'implications': ['impl1', 'impl2'],
'usages': 2,
'snapshots': 'snapshot history',
}
tag = tag_factory(
names=['tag1', 'tag2'],
category=tag_category_factory(name='cat'))
tag.tag_id = 1
tag.description = 'description'
tag.suggestions = [
tag_factory(names=['sug1']), tag_factory(names=['sug2'])]
tag.implications = [
tag_factory(names=['impl1']), tag_factory(names=['impl2'])]
tag.last_edit_time = datetime(1998, 1, 1)
post1 = post_factory()
post2 = post_factory()
post1.tags = [tag]
post2.tags = [tag]
db.session.add_all([tag, post1, post2])
db.session.flush()
result = tags.serialize_tag(tag)
result['suggestions'].sort()
result['implications'].sort()
assert result == {
'names': ['tag1', 'tag2'],
'version': 1,
'category': 'cat',
'creationTime': datetime(1996, 1, 1, 0, 0),
'lastEditTime': datetime(1998, 1, 1, 0, 0),
'description': 'description',
'suggestions': ['sug1', 'sug2'],
'implications': ['impl1', 'impl2'],
'usages': 2,
}
def test_export_to_json(