diff --git a/server/mergin/auth/commands.py b/server/mergin/auth/commands.py index f9ac636e..af6ed6af 100644 --- a/server/mergin/auth/commands.py +++ b/server/mergin/auth/commands.py @@ -8,7 +8,7 @@ from sqlalchemy import or_, func from ..app import db -from .models import User, UserProfile +from .models import User from ..commands import normalize_input @@ -36,7 +36,6 @@ def create(username, password, is_admin, email): # pylint: disable=W0612 sys.exit(1) user = User(username=username, passwd=password, is_admin=is_admin, email=email) - user.profile = UserProfile() user.active = True db.session.add(user) db.session.commit() diff --git a/server/mergin/auth/controller.py b/server/mergin/auth/controller.py index a4d584c8..06859255 100644 --- a/server/mergin/auth/controller.py +++ b/server/mergin/auth/controller.py @@ -24,7 +24,7 @@ CANNOT_EDIT_PROFILE_MSG, ) from .bearer import encode_token -from .models import User, LoginHistory, UserProfile +from .models import User, LoginHistory from .schemas import UserSchema, UserSearchSchema, UserProfileSchema, UserInfoSchema from .forms import ( LoginForm, @@ -65,7 +65,7 @@ def user_profile(user, return_all=True): { "email": user.email, "storage_limit": data["storage"], # duplicate - we should remove it - "receive_notifications": user.profile.receive_notifications, + "receive_notifications": user.receive_notifications, "verified_email": user.verified_email, "tier": "free", "registration_date": user.registration_date, @@ -369,7 +369,6 @@ def update_user_profile(): # pylint: disable=W0613,W0612 return jsonify(form.errors), 400 current_user.verified_email = False - form.update_obj(current_user.profile) form.update_obj(current_user) db.session.add(current_user) db.session.commit() @@ -483,7 +482,7 @@ def get_paginated_users( :rtype: Dict[str: List[User], str: Integer] """ - users = User.query.join(UserProfile).filter( + users = User.query.filter( is_(User.username.ilike("deleted_%"), False) | is_(User.active, True) ) @@ -491,14 +490,16 @@ def get_paginated_users( users = users.filter( User.username.ilike(f"%{like}%") | User.email.ilike(f"%{like}%") - | UserProfile.first_name.ilike(f"%{like}%") - | UserProfile.last_name.ilike(f"%{like}%") + | User.first_name.ilike(f"%{like}%") + | User.last_name.ilike(f"%{like}%") ) if descending and order_by: users = users.order_by(desc(User.__table__.c[order_by])) elif not descending and order_by: users = users.order_by(asc(User.__table__.c[order_by])) + else: + users = users.order_by(asc(User.id)) paginate = users.paginate(page=page, per_page=per_page) result = paginate.items @@ -561,7 +562,7 @@ def create_user(): workspace_role=request.json["role"], ) - if user.profile.receive_notifications: + if user.receive_notifications: send_confirmation_email( current_app, user, diff --git a/server/mergin/auth/models.py b/server/mergin/auth/models.py index 470b934b..760ab740 100644 --- a/server/mergin/auth/models.py +++ b/server/mergin/auth/models.py @@ -19,12 +19,9 @@ class User(db.Model): id = db.Column(db.Integer, primary_key=True) - username = db.Column(db.String(80), info={"label": "Username"}) email = db.Column(db.String(120)) - passwd = db.Column(db.String(80), info={"label": "Password"}) # salted + hashed - active = db.Column(db.Boolean, default=True) is_admin = db.Column(db.Boolean) verified_email = db.Column(db.Boolean, default=False) @@ -35,8 +32,12 @@ class User(db.Model): info={"label": "Date of creation of user account"}, default=datetime.datetime.utcnow, ) - last_signed_in = db.Column(db.DateTime(), nullable=True) + receive_notifications = db.Column( + db.Boolean, default=True, nullable=False, index=True + ) + first_name = db.Column(db.String(256), nullable=True) + last_name = db.Column(db.String(256), nullable=True) __table_args__ = ( db.Index("ix_user_username", func.lower(username), unique=True), @@ -187,8 +188,8 @@ def anonymize(self): self.username = del_str self.email = None self.passwd = None - self.profile.first_name = None - self.profile.last_name = None + self.first_name = None + self.last_name = None db.session.commit() @classmethod @@ -240,11 +241,19 @@ def create( cls, username: str, email: str, password: str, notifications: bool = True ) -> User: user = cls(username.strip(), email.strip(), password, False) - user.profile = UserProfile(receive_notifications=notifications) + user.receive_notifications = notifications db.session.add(user) db.session.commit() return user + @property + def profile(self) -> "User": + """Compatibility shim: profile fields are now on User directly.""" + return self + + def name(self) -> Optional[str]: + return f'{self.first_name if self.first_name else ""} {self.last_name if self.last_name else ""}'.strip() + @property def can_edit_profile(self) -> bool: """Flag if we allow user to edit their email and name""" @@ -252,26 +261,6 @@ def can_edit_profile(self) -> bool: return self.passwd is not None and self.active -class UserProfile(db.Model): - user_id = db.Column( - db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), primary_key=True - ) - receive_notifications = db.Column(db.Boolean, default=True, index=True) - first_name = db.Column(db.String(256), nullable=True, info={"label": "First name"}) - last_name = db.Column(db.String(256), nullable=True, info={"label": "Last name"}) - - user = db.relationship( - "User", - uselist=False, - backref=db.backref( - "profile", single_parent=True, uselist=False, cascade="all,delete" - ), - ) - - def name(self) -> Optional[str]: - return f'{self.first_name if self.first_name else ""} {self.last_name if self.last_name else ""}'.strip() - - class LoginHistory(db.Model): id = db.Column(db.Integer, primary_key=True) timestamp = db.Column(db.DateTime(), default=datetime.datetime.utcnow, index=True) diff --git a/server/mergin/auth/schemas.py b/server/mergin/auth/schemas.py index 52ed01f6..0bb45c3e 100644 --- a/server/mergin/auth/schemas.py +++ b/server/mergin/auth/schemas.py @@ -5,7 +5,7 @@ from flask import current_app from marshmallow import fields -from .models import User, UserProfile +from .models import User from ..app import DateTimeWithZ, ma @@ -20,13 +20,13 @@ class UserProfileSchema(ma.SQLAlchemyAutoSchema): def get_storage(self, obj): # DEPRECATED functionality - kept for the backward-compatibility - ws = current_app.ws_handler.get_by_name(obj.user.username) + ws = current_app.ws_handler.get_by_name(obj.username) if ws: return ws.storage def get_disk_usage(self, obj): # DEPRECATED functionality - kept for the backward-compatibility - ws = current_app.ws_handler.get_by_name(obj.user.username) + ws = current_app.ws_handler.get_by_name(obj.username) if ws: return ws.disk_usage() @@ -34,21 +34,30 @@ def _has_project(self, obj): # DEPRECATED functionality - kept for the backward-compatibility from ..sync.models import ProjectUser, Project - ws = current_app.ws_handler.get_by_name(obj.user.username) + ws = current_app.ws_handler.get_by_name(obj.username) if ws: projects_count = ( Project.query.join(ProjectUser) - .filter(Project.creator_id == obj.user.id) + .filter(Project.creator_id == obj.id) .filter(Project.removed_at.is_(None)) .filter(Project.workspace_id == ws.id) - .filter(ProjectUser.user_id == obj.user.id) + .filter(ProjectUser.user_id == obj.id) .count() ) return projects_count > 0 return False class Meta: - model = UserProfile + model = User + fields = ( + "receive_notifications", + "first_name", + "last_name", + "name", + "storage", + "disk_usage", + "has_project", + ) load_instance = True @@ -81,7 +90,7 @@ class UserSearchSchema(ma.SQLAlchemyAutoSchema): name = fields.Method("_name", dump_only=True) def _name(self, obj): - return obj.profile.name() + return obj.name() class Meta: model = User @@ -97,11 +106,11 @@ class Meta: class UserInfoSchema(ma.SQLAlchemyAutoSchema): """User schema with full information""" - first_name = fields.String(attribute="profile.first_name") - last_name = fields.String(attribute="profile.last_name") - receive_notifications = fields.Boolean(attribute="profile.receive_notifications") + first_name = fields.String() + last_name = fields.String() + receive_notifications = fields.Boolean() registration_date = DateTimeWithZ(attribute="registration_date") - name = fields.Function(lambda obj: obj.profile.name()) + name = fields.Function(lambda obj: obj.name()) can_edit_profile = fields.Boolean(attribute="can_edit_profile") class Meta: diff --git a/server/mergin/commands.py b/server/mergin/commands.py index dbde7b7f..464890bf 100644 --- a/server/mergin/commands.py +++ b/server/mergin/commands.py @@ -202,7 +202,7 @@ def init_db(): ) def init(email: str, recreate: bool): """Initialize database if does not exist or -r is provided. Perform check of server configuration. Send statistics, respecting your setup.""" - from .auth.models import User, UserProfile + from .auth.models import User inspect_engine = inspect(db.engine) tables = inspect_engine.get_table_names() @@ -221,7 +221,6 @@ def init(email: str, recreate: bool): password_chars = string.ascii_letters + string.digits password = "".join(random.choice(password_chars) for i in range(12)) user = User(username=username, passwd=password, email=email, is_admin=True) - user.profile = UserProfile() user.active = True db.session.add(user) db.session.commit() diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index f1c6cbfd..5f4aa967 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -2,12 +2,14 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial from __future__ import annotations +from contextlib import contextmanager import json import logging import os +import threading import time import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from enum import Enum from typing import Optional, List, Dict, Set, Tuple from dataclasses import dataclass, asdict @@ -17,11 +19,11 @@ from flask_login import current_user from pygeodiff import GeoDiff from sqlalchemy import text, null, desc, nullslast, tuple_ -from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM +from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM, insert from sqlalchemy.types import String from sqlalchemy.ext.hybrid import hybrid_property from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError -from flask import current_app +from flask import Flask, current_app from .files import ( DeltaChangeMerged, @@ -44,7 +46,6 @@ LOG_BASE, Checkpoint, generate_checksum, - Toucher, get_chunk_location, get_project_path, is_supported_type, @@ -1805,6 +1806,11 @@ class Upload(db.Model): db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True ) created = db.Column(db.DateTime, default=datetime.utcnow) + # last ping time to determine if upload is still active + last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + transaction_id = db.Column( + UUID(as_uuid=True), unique=True, nullable=False, index=True + ) user = db.relationship("User") project = db.relationship( @@ -1822,28 +1828,173 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int): self.version = version self.changes = ChangesSchema().dump(changes) self.user_id = user_id + self.transaction_id = str(uuid.uuid4()) + + @classmethod + def create_upload( + cls, project_id: str, version: int, changes: dict, user_id: int + ) -> Upload | None: + """Create upload session, it can either create a new record or handover an existing one but with new transaction id + Old transaction folder is removed and new one is created. + """ + now = datetime.now(timezone.utc).replace(tzinfo=None) + expiration = current_app.config["LOCKFILE_EXPIRATION"] + new_tx_id = str(uuid.uuid4()) + + # CTE captures the existing row's transaction_id BEFORE the upsert (pre-statement snapshot) + # NULL in RETURNING means fresh INSERT, non-NULL means we took over a stale upload + existing_cte = ( + db.select(Upload.transaction_id) + .where( + Upload.project_id == project_id, + Upload.version == version, + ) + .cte("existing") + ) + + stmt = ( + insert(Upload) + .values( + id=str(uuid.uuid4()), + transaction_id=new_tx_id, + project_id=project_id, + version=version, + user_id=user_id, + last_ping=now, + changes=ChangesSchema().dump(changes), + ) + .add_cte(existing_cte) + ) + + upsert_stmt = stmt.on_conflict_do_update( + constraint="uq_upload_project_id", + set_={ + "transaction_id": new_tx_id, + "user_id": user_id, + "last_ping": now, + "changes": ChangesSchema().dump(changes), + }, + # ONLY update if the existing row is stale + where=(Upload.last_ping < (now - timedelta(seconds=expiration))), + ) + + upsert_stmt = upsert_stmt.returning( + Upload, + db.select(existing_cte.c.transaction_id) + .scalar_subquery() + .label("old_transaction_id"), + ) + + result = db.session.execute(upsert_stmt).fetchone() + db.session.commit() + + # if nothing returned, it means the WHERE clause failed (active upload) + if not result: + return + + upload = result.Upload + old_transaction_id = result.old_transaction_id + + try: + os.makedirs(upload.upload_dir) + + # old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload + if old_transaction_id: + upload.project.sync_failed( + "", "push_lost", "Push artefact removed by subsequent push", user_id + ) + if os.path.exists( + os.path.join( + upload.project.storage.project_dir, + "tmp", + str(old_transaction_id), + ) + ): + move_to_tmp( + os.path.join( + upload.project.storage.project_dir, + "tmp", + str(old_transaction_id), + ), + str(old_transaction_id), + ) + except OSError as err: + # filesystem setup failed after the DB row was already committed. + # delete the row immediately so the next attempt isn't blocked until expiration. + db.session.delete(upload) + db.session.commit() + logging.error(f"Failed to create upload directory: {err}") + return + + return upload @property def upload_dir(self): - return os.path.join(self.project.storage.project_dir, "tmp", self.id) + return os.path.join( + self.project.storage.project_dir, "tmp", str(self.transaction_id) + ) - @property - def lockfile(self): - return os.path.join(self.upload_dir, "lockfile") - - def is_active(self): - """Check if upload is still active because there was a ping (lockfile update) from underlying process""" - return os.path.exists(self.lockfile) and ( - time.time() - os.path.getmtime(self.lockfile) - < current_app.config["LOCKFILE_EXPIRATION"] + def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int): + """ + Background task: Runs as a Thread, it is compatible with Sync (direct) or Gevent (monkey-patch) worker type. + Uses a fresh engine connection to stay pool-efficient. + """ + # manual context push is required for background execution + with app.app_context(): + while not stop_event.is_set(): + try: + # db.engine.begin() is efficient and isolated, it immediately returns a connection to the pool + with db.engine.begin() as conn: + conn.execute( + db.text( + "UPDATE upload SET last_ping = NOW() WHERE id = :id" + ), + {"id": self.id}, + ) + except Exception as e: + logging.exception( + f"Upload heartbeat failed for ID {self.project_id} and version {self.version}: {e}" + ) + + # wait for x seconds, but wake up immediately if stop_event is set + stop_event.wait(timeout) + + @contextmanager + def heartbeat(self, timeout: int = 5): + """ + Context manager to be used inside a Flask route. + + Example of usage: + ----------------- + with upload.heartbeat(interval): + do_something_slow + """ + # we need to pass a real Flask app object to the thread + app = current_app._get_current_object() + stop_event = threading.Event() + + bg = threading.Thread( + target=self._heartbeat_task, args=(app, stop_event, timeout), daemon=True ) + bg.start() + try: + yield + finally: + # signal the loop to stop + stop_event.set() + + # wait for the task to finish its last SQL call. + # in Gevent, this yields to other requests (non-blocking), while in Sync, this blocks the current thread for up to 2s + # this is to protect main thread / greenlet from zombie bg processes + bg.join(timeout=2) + def clear(self): """Clean up pending upload. Uploaded files and table records are removed, and another upload can start. """ try: - move_to_tmp(self.upload_dir, self.id) + move_to_tmp(self.upload_dir, str(self.transaction_id)) db.session.delete(self) db.session.commit() except Exception: @@ -1864,7 +2015,7 @@ def process_chunks( to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE] current_files = [f for f in self.project.files if f.path not in to_remove] - with Toucher(self.lockfile, 5): + with self.heartbeat(5): for f in file_changes: if f.change == PushChangeType.DELETE: continue diff --git a/server/mergin/sync/permissions.py b/server/mergin/sync/permissions.py index e155020a..4fe3ad54 100644 --- a/server/mergin/sync/permissions.py +++ b/server/mergin/sync/permissions.py @@ -271,8 +271,10 @@ def check_project_permissions( return None -def get_upload(transaction_id): - upload = Upload.query.get_or_404(transaction_id) +def get_upload_or_fail(transaction_id: str) -> Upload: + if not is_valid_uuid(transaction_id): + abort(404) + upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404() # upload to 'removed' projects is forbidden if upload.project.removed_at: abort(404) @@ -280,8 +282,7 @@ def get_upload(transaction_id): if upload.user_id != current_user.id: abort(403, "You do not have permissions for ongoing upload") - upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", transaction_id) - return upload, upload_dir + return upload def projects_query(permission, as_admin=True, public=True): diff --git a/server/mergin/sync/project_handler.py b/server/mergin/sync/project_handler.py index 7949dc20..e4d7e189 100644 --- a/server/mergin/sync/project_handler.py +++ b/server/mergin/sync/project_handler.py @@ -3,7 +3,7 @@ from .permissions import ProjectPermissions from sqlalchemy import or_, and_ from typing import List -from ..auth.models import User, UserProfile +from ..auth.models import User class ProjectHandler(AbstractProjectHandler): @@ -12,8 +12,7 @@ def get_push_permission(self, changes: dict): def get_email_receivers(self, project: Project) -> List[User]: return ( - User.query.join(UserProfile) - .outerjoin(ProjectUser, ProjectUser.user_id == User.id) + User.query.outerjoin(ProjectUser, ProjectUser.user_id == User.id) .filter( or_( and_( @@ -24,7 +23,7 @@ def get_email_receivers(self, project: Project) -> List[User]: ), User.active, User.verified_email, - UserProfile.receive_notifications, + User.receive_notifications, ) .all() ) diff --git a/server/mergin/sync/public_api.yaml b/server/mergin/sync/public_api.yaml index 5227b562..157e8262 100644 --- a/server/mergin/sync/public_api.yaml +++ b/server/mergin/sync/public_api.yaml @@ -699,7 +699,7 @@ paths: - do integrity check comparing uploaded file sizes with what was expected - move uploaded files to new version dir and applying sync changes (e.g. geodiff apply_changeset) - bump up version in database - - remove artifacts (chunks, lockfile) by moving them to tmp directory" + - remove artifacts (chunks) by moving them to tmp directory" operationId: push_finish parameters: - name: transaction_id diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 8e2b0ea8..8f142e71 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -25,7 +25,7 @@ from pygeodiff import GeoDiffLibError from flask_login import current_user from sqlalchemy import and_, desc, asc -from sqlalchemy.exc import IntegrityError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from gevent import sleep import base64 from werkzeug.exceptions import HTTPException, Conflict @@ -70,12 +70,11 @@ require_project, projects_query, ProjectPermissions, - get_upload, + get_upload_or_fail, require_project_by_uuid, ) from .utils import ( generate_checksum, - Toucher, get_ip, get_user_agent, generate_location, @@ -775,13 +774,6 @@ def project_push(namespace, project_name): if all(len(changes[key]) == 0 for key in changes.keys()): abort(400, "No changes") - # reject upload early if there is another one already running - pending_upload = Upload.query.filter_by( - project_id=project.id, version=version - ).first() - if pending_upload and pending_upload.is_active(): - abort(400, "Another process is running. Please try later.") - try: ChangesSchema().validate(changes) upload_changes = ChangesSchema().dump(changes) @@ -813,47 +805,22 @@ def project_push(namespace, project_name): if requested_storage > ws.storage: return StorageLimitHit(current_usage, ws.storage).response(422) - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) try: - # Creating upload transaction with different project's version is possible. - db.session.commit() + upload = Upload.create_upload( + project.id, version, upload_changes, current_user.id + ) + if not upload: + abort(400, "Another process is running. Please try later.") + logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" + f"Upload transaction {upload.transaction_id} created for project: {project.id}, version: {version}" ) - except IntegrityError: + except (IntegrityError, SQLAlchemyError) as err: db.session.rollback() - # check and clean dangling uploads or abort - for current_upload in project.uploads.all(): - if current_upload.is_active(): - abort(400, "Another process is running. Please try later.") - db.session.delete(current_upload) - db.session.commit() - # previous push attempt is definitely lost - project.sync_failed( - "", - "push_lost", - "Push artefact removed by subsequent push", - current_user.id, - ) - - # Try again after cleanup - db.session.add(upload) - try: - db.session.commit() - logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" - ) - move_to_tmp(upload.upload_dir) - except IntegrityError as err: - logging.error(f"Failed to create upload session: {str(err)}") - abort(422, "Failed to create upload session. Please try later.") + logging.exception(f"Failed to create upload: {str(err)}") + abort(422, "Failed to create upload session. Please try later.") - # Create transaction folder and lockfile - os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() - - # Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit + # Update immediately without uploading of new/modified files and remove transaction after successful commit if not (changes["added"] or changes["updated"]): next_version = version + 1 file_changes = files_changes_from_upload( @@ -876,7 +843,7 @@ def project_push(namespace, project_name): db.session.commit() logging.info( f"A project version {ProjectVersion.to_v_name(next_version)} for project: {project.id} created. " - f"Transaction id: {upload.id}. No upload." + f"Transaction id: {upload.transaction_id}. No upload." ) project_version_created.send(pv) push_finished.send(pv) @@ -884,7 +851,7 @@ def project_push(namespace, project_name): except IntegrityError as err: db.session.rollback() logging.exception( - f"Failed to upload a new project version using transaction id: {upload.id}: {str(err)}" + f"Failed to upload a new project version using transaction id: {upload.transaction_id}: {str(err)}" ) abort(422, "Failed to upload a new project version. Please try later.") except gevent.timeout.Timeout: @@ -893,7 +860,7 @@ def project_push(namespace, project_name): finally: upload.clear() - return {"transaction": upload.id}, 200 + return {"transaction": upload.transaction_id}, 200 @auth_required @@ -910,7 +877,7 @@ def chunk_upload(transaction_id, chunk_id): :rtype: Dict """ - upload, upload_dir = get_upload(transaction_id) + upload = get_upload_or_fail(transaction_id) request.view_args["project"] = upload.project chunks = [] for file in upload.changes["added"] + upload.changes["updated"]: @@ -919,8 +886,8 @@ def chunk_upload(transaction_id, chunk_id): if chunk_id not in chunks: abort(404) - dest = os.path.join(upload_dir, "chunks", chunk_id) - with Toucher(upload.lockfile, 30): + dest = os.path.join(upload.upload_dir, "chunks", chunk_id) + with upload.heartbeat(30): try: # we could have used request.data here, but it could eventually cause OOM issue save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"]) @@ -945,14 +912,14 @@ def push_finish(transaction_id): - do integrity check comparing uploaded file sizes with what was expected - move uploaded files to new version dir and applying sync changes (e.g. geodiff apply_changeset) - bump up version in database - - remove artifacts (chunks, lockfile) by moving them to tmp directory + - remove artifacts (chunks) by moving them to tmp directory :param transaction_id: Transaction id. :type transaction_id: str :rtype: None """ - upload, upload_dir = get_upload(transaction_id) + upload = get_upload_or_fail(transaction_id) request.view_args["project"] = upload.project project = upload.project next_version = project.next_version() @@ -991,7 +958,7 @@ def push_finish(transaction_id): abort(422, f"Failed to create new version: {msg}") - files_dir = os.path.join(upload_dir, "files", v_next_version) + files_dir = os.path.join(upload.upload_dir, "files", v_next_version) target_dir = os.path.join(project.storage.project_dir, v_next_version) if os.path.exists(target_dir): pv = ProjectVersion.query.filter_by( @@ -1009,39 +976,57 @@ def push_finish(transaction_id): move_to_tmp(target_dir) try: - user_agent = get_user_agent(request) - device_id = get_device_id(request) - pv = ProjectVersion( - project, - next_version, - current_user.id, - file_changes, - get_ip(request), - user_agent, - device_id, - ) - db.session.add(pv) - db.session.add(project) - db.session.commit() + # let's keep upload alive until all work is done so no one else can claim it + with upload.heartbeat(5): + user_agent = get_user_agent(request) + device_id = get_device_id(request) + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + user_agent, + device_id, + ) + db.session.add(pv) + db.session.add(project) - # let's move uploaded files where they are expected to be - os.renames(files_dir, version_dir) + # move files before committing so a filesystem failure leaves the DB clean + if os.path.exists(files_dir): + os.renames(files_dir, version_dir) - logging.info( - f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." - ) - project_version_created.send(pv) - push_finished.send(pv) - except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: + db.session.commit() + + logging.info( + f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." + ) + project_version_created.send(pv) + push_finished.send(pv) + except (psycopg2.Error, OSError, IntegrityError) as err: db.session.rollback() logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " f"transaction id: {transaction_id}.: {str(err)}" ) + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) abort(422, "Failed to create new version: {}".format(str(err))) # catch exception during pg transaction so we can rollback and prevent PendingRollbackError during upload clean up except gevent.timeout.Timeout: db.session.rollback() + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) raise finally: # remove artifacts @@ -1061,10 +1046,8 @@ def push_cancel(transaction_id): :rtype: None """ - upload, upload_dir = get_upload(transaction_id) - db.session.delete(upload) - db.session.commit() - move_to_tmp(upload_dir) + upload = get_upload_or_fail(transaction_id) + upload.clear() return NoContent, 200 diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 9a82a211..ebd909ad 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -15,8 +15,7 @@ from flask import abort, jsonify, current_app from flask_login import current_user from marshmallow import ValidationError -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm.exc import ObjectDeletedError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from .schemas_v2 import BatchErrorSchema, ProjectSchema as ProjectSchemaV2 from ..app import db @@ -241,11 +240,6 @@ def create_project_version(id): if pv and pv.name != version: return ProjectVersionExists(version, pv.name).response(409) - # reject push if there is another one already running - pending_upload = Upload.query.filter_by(project_id=project.id).first() - if pending_upload and pending_upload.is_active(): - return AnotherUploadRunning().response(409) - try: ChangesSchema().validate(changes) upload_changes = ChangesSchema().dump(changes) @@ -296,88 +290,78 @@ def create_project_version(id): return NoContent, 204 try: - # while processing data, block other uploads - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) - # Creating blocking upload can fail, e.g. in case of racing condition - db.session.commit() - except IntegrityError: - db.session.rollback() - # check and clean dangling blocking uploads or abort - for current_upload in project.uploads.all(): - if current_upload.is_active(): - return AnotherUploadRunning().response(409) - db.session.delete(current_upload) - db.session.commit() - # previous push attempt is definitely lost - project.sync_failed( - "", - "push_lost", - "Push artefact removed by subsequent push", - current_user.id, - ) - - try: - # Try again after cleanup - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) - db.session.commit() - move_to_tmp(upload.upload_dir) - except IntegrityError as err: - logging.error(f"Failed to create upload session: {str(err)}") + upload = Upload.create_upload( + project.id, version, upload_changes, current_user.id + ) + if not upload: return AnotherUploadRunning().response(409) + except (IntegrityError, SQLAlchemyError) as err: + db.session.rollback() + logging.exception(f"Failed to create upload: {str(err)}") + return UploadError().response(422) + except OSError as err: + logging.exception(f"Failed to create upload directory: {str(err)}") + return UploadError().response(422) - # Create transaction folder and lockfile - os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() - + # this is the heavy work of processing upload data file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted if errors: upload.clear() return DataSyncError(failed_files=errors).response(422) - upload_deleted = False + if os.path.exists(version_dir): + if ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count(): + return UploadError( + error=f"Version {v_next_version} already exists" + ).response(409) + move_to_tmp(version_dir) + try: - pv = ProjectVersion( - project, - next_version, - current_user.id, - file_changes, - get_ip(request), - get_user_agent(request), - get_device_id(request), - ) - db.session.add(pv) - db.session.add(project) - db.session.commit() - - # let's move uploaded files where they are expected to be - if to_be_added_files or to_be_updated_files: - temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) - os.renames(temp_files_dir, version_dir) - - # remove used chunks - # get chunks from added and updated files - chunks_ids = [] - for file in to_be_added_files + to_be_updated_files: - file_chunks = file.get("chunks", []) - chunks_ids.extend(file_chunks) - remove_transaction_chunks.delay(chunks_ids) - - logging.info( - f"Push finished for project: {project.id}, project version: {v_next_version}." - ) - project_version_created.send(pv) - push_finished.send(pv) + # let's keep upload alive until all work is done so no one else can claim it + with upload.heartbeat(5): + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + get_user_agent(request), + get_device_id(request), + ) + db.session.add(pv) + db.session.add(project) + + # move files before committing so a filesystem failure leaves the DB clean + if to_be_added_files or to_be_updated_files: + temp_files_dir = os.path.join( + upload.upload_dir, "files", v_next_version + ) + os.renames(temp_files_dir, version_dir) + + db.session.commit() + + # remove used chunks only after commit — chunks belong to the now-committed version + if to_be_added_files or to_be_updated_files: + chunks_ids = [] + for file in to_be_added_files + to_be_updated_files: + file_chunks = file.get("chunks", []) + chunks_ids.extend(file_chunks) + remove_transaction_chunks.delay(chunks_ids) + + logging.info( + f"Push finished for project: {project.id}, project version: {v_next_version}." + ) + project_version_created.send(pv) + push_finished.send(pv) except ( psycopg2.Error, - FileNotFoundError, + OSError, IntegrityError, - ObjectDeletedError, ) as err: db.session.rollback() - upload_deleted = isinstance(err, ObjectDeletedError) logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}: {str(err)}" ) @@ -401,9 +385,8 @@ def create_project_version(id): move_to_tmp(version_dir) raise finally: - # remove artifacts only if upload object is still valid - if not upload_deleted: - upload.clear() + # remove upload artifacts + upload.clear() result = ProjectSchemaV2().dump(project) result["files"] = ProjectFileSchema( diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 4eecca0c..da18f7db 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -132,7 +132,7 @@ def _role(self, obj): return role.value def _uploads(self, obj): - return [u.id for u in obj.project.uploads.all()] + return [u.transaction_id for u in obj.project.uploads.all()] def _permissions(self, obj): return project_user_permissions(obj.project) @@ -180,7 +180,7 @@ def _role(self, obj): return role.value def _uploads(self, obj): - return [u.id for u in obj.uploads.all()] + return [u.transaction_id for u in obj.uploads.all()] class Meta: model = Project diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py index f4cb34fc..80715ed6 100644 --- a/server/mergin/sync/storages/disk.py +++ b/server/mergin/sync/storages/disk.py @@ -114,7 +114,11 @@ def move_to_tmp(src, dest=None): else: root = tempfile.gettempdir() temp_path = os.path.join(root, "delete-me-" + dest, os.path.basename(src)) - os.renames(src, temp_path) + try: + os.renames(src, temp_path) + except OSError as rename_err: + logging.error(f"Failed to move {src} to tmp: {rename_err}") + return None else: raise return temp_path diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index 9e89eb7c..48966457 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -57,53 +57,6 @@ def generate_checksum(file, chunk_size=4096): checksum.update(chunk) -class Toucher: - """ - Helper class to periodically update modification time of file during - execution of longer lasting task. - - Example of usage: - ----------------- - with Toucher(file, interval): - do_something_slow - - """ - - def __init__(self, lockfile, interval): - self.lockfile = lockfile - self.interval = interval - self.running = False - self.timer = None - - def __enter__(self): - self.acquire() - - def __exit__(self, type, value, tb): # pylint: disable=W0612,W0622 - self.release() - - def release(self): - self.running = False - if self.timer: - self.timer.cancel() - self.timer = None - - def acquire(self): - self.running = True - self.touch_lockfile() - - def touch_lockfile(self): - # do an NFS ACCESS procedure request to clear the attribute cache (for various pods to actually see the file) - # https://docs.aws.amazon.com/efs/latest/ug/troubleshooting-efs-general.html#custom-nfs-settings-write-delays - os.access(self.lockfile, os.W_OK) - with open(self.lockfile, "a"): - os.utime(self.lockfile, None) - - sleep(0) # to unblock greenlet - if self.running: - self.timer = Timer(self.interval, self.touch_lockfile) - self.timer.start() - - def is_qgis(path: str) -> bool: """ Check if file is a QGIS project file. diff --git a/server/mergin/tests/test_auth.py b/server/mergin/tests/test_auth.py index b130c109..ba7730c3 100644 --- a/server/mergin/tests/test_auth.py +++ b/server/mergin/tests/test_auth.py @@ -14,7 +14,7 @@ from ..auth.bearer import decode_token, encode_token from ..auth.forms import ResetPasswordForm from ..auth.app import generate_confirmation_token, confirm_token -from ..auth.models import User, UserProfile, LoginHistory +from ..auth.models import User, LoginHistory from ..auth.tasks import anonymize_removed_users from ..app import db from ..sync.models import Project, ProjectRole @@ -286,7 +286,6 @@ def test_change_password(client): email="user_test@mergin.com", ) user.active = True - user.profile = UserProfile() db.session.add(user) db.session.commit() diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index 044294c5..27aadb5b 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -114,8 +114,7 @@ def test_remove_project(client, diff_project): # set up mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) - upload = Upload(diff_project, 10, [], mergin_user.id) - db.session.add(upload) + upload = Upload.create_upload(diff_project.id, 10, [], mergin_user.id) project_id = diff_project.id user = add_user("user", "user") access_request = AccessRequest(diff_project, user.id) diff --git a/server/mergin/tests/test_disk_utils.py b/server/mergin/tests/test_disk_utils.py index dd485078..c9abd66f 100644 --- a/server/mergin/tests/test_disk_utils.py +++ b/server/mergin/tests/test_disk_utils.py @@ -2,10 +2,13 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import errno +import logging import os import tempfile import shutil import pytest +from unittest.mock import patch from ..sync.storages.disk import copy_file, copy_dir, move_to_tmp from ..sync.utils import generate_checksum from . import test_project_dir @@ -85,3 +88,22 @@ def test_failures(): os.path.join(test_project_dir, "not_found"), os.path.join(tempfile.gettempdir(), "new_dir"), ) + + +def test_move_to_tmp_full_disk_on_fallback(app, tmp_path, caplog): + """Fallback rename on cross-device error logs error and returns None when disk is full.""" + cross_device_err = OSError(errno.EXDEV, "Invalid cross-device link") + no_space_err = OSError(errno.ENOSPC, "No space left on device") + + src = tmp_path / "test_file.gpkg" + src.touch() + + with caplog.at_level(logging.ERROR), patch( + "mergin.sync.storages.disk.os.renames", + side_effect=[cross_device_err, no_space_err], + ): + result = move_to_tmp(str(src)) + + assert result is None + assert "Failed to move" in caplog.text + assert str(src) in caplog.text diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 9318eff1..60c36ee2 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -42,7 +42,7 @@ from ..sync.files import files_changes_from_upload from ..sync.schemas import ProjectListSchema from ..sync.utils import Checkpoint, generate_checksum, is_versioned_file -from ..auth.models import User, UserProfile +from ..auth.models import User from . import ( test_project, @@ -406,7 +406,7 @@ def test_add_project(client, app, data, expected): "mergin", _get_changes_with_diff(test_project_dir) ) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # add TEMPLATES user and make him creator of test_project (to become template) @@ -510,7 +510,7 @@ def test_delete_project(client): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # try force delete for active project @@ -668,7 +668,6 @@ def test_update_project(client): username="tester", passwd="tester", is_admin=False, email="tester@mergin.com" ) test_user.active = True - test_user.profile = UserProfile() db.session.add(test_user) db.session.commit() @@ -1122,7 +1121,7 @@ def test_push_to_new_project(client): assert resp.status_code == 200 upload_id = resp.json["transaction"] - upload = Upload.query.filter_by(id=upload_id).first() + upload = Upload.query.filter_by(transaction_id=upload_id).first() blacklisted_file = all( added["path"] != "test_dir/test4.txt" for added in upload.changes["added"] ) @@ -1213,6 +1212,52 @@ def test_push_integrity_error(client, app): assert failure.error_details == "No changes" +def test_stale_upload_takeover(client, app): + """Stale upload (last_ping expired) is atomically replaced by a new one. + + Verifies that: + - the new upload gets a fresh transaction_id + - the old upload directory is cleaned up + - a push_lost failure is recorded for the abandoned upload + """ + project = Project.query.filter_by( + name=test_project, workspace_id=test_workspace_id + ).first() + user = User.query.filter_by(username="mergin").first() + changes = _get_changes(test_project_dir) + changes["added"] = changes["removed"] = [] + + # create initial upload and record its identity + upload = Upload.create_upload(project.id, 1, changes, user.id) + old_tx_id = upload.transaction_id + old_upload_dir = upload.upload_dir + assert os.path.exists(old_upload_dir) + + # backdate last_ping to make the upload appear stale + db.session.execute( + db.text( + "UPDATE upload SET last_ping = NOW() - :expiry * INTERVAL '1 second' WHERE id = :id" + ), + { + "id": upload.id, + "expiry": client.application.config["LOCKFILE_EXPIRATION"] + 1, + }, + ) + db.session.commit() + + # takeover — should succeed and replace the stale upload + new_upload = Upload.create_upload(project.id, 1, changes, user.id) + assert new_upload is not None + assert new_upload.transaction_id != old_tx_id + assert os.path.exists(new_upload.upload_dir) + # old directory was moved away + assert not os.path.exists(old_upload_dir) + # push_lost was recorded for the abandoned upload + failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() + assert failure.error_type == "push_lost" + assert failure.error_details == "Push artefact removed by subsequent push" + + def test_exceed_data_limit(client): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id @@ -1291,13 +1336,8 @@ def create_transaction(username, changes, version=1): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - upload = Upload(project, version, changes, user.id) - db.session.add(upload) - db.session.commit() - upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) - os.makedirs(upload_dir) - open(os.path.join(upload_dir, "lockfile"), "w").close() - return upload, upload_dir + upload = Upload.create_upload(project.id, version, changes, user.id) + return upload, upload.upload_dir def remove_transaction(transaction_id): @@ -1313,7 +1353,7 @@ def test_chunk_upload(client, app): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) chunk_id = upload.changes["added"][0]["chunks"][0] - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(test_project_dir, "test_dir", "test4.txt"), "rb") as file: data = file.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -1322,6 +1362,7 @@ def test_chunk_upload(client, app): resp = client.post(url, data=data, headers=headers) assert resp.status_code == 200 assert resp.json["checksum"] == checksum.hexdigest() + assert os.path.exists(os.path.join(upload_dir, "chunks", chunk_id)) # tests to send bigger chunk than allowed app.config["MAX_CHUNK_SIZE"] = 10 * CHUNK_SIZE @@ -1334,6 +1375,8 @@ def test_chunk_upload(client, app): failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first() assert failure.error_type == "chunk_upload" assert failure.error_details == "Too big chunk" + # residual after upload was removed + assert not os.path.exists(os.path.join(upload_dir, "chunks", chunk_id)) # tests with transaction with no uploads expected changes = _get_changes(test_project_dir) @@ -1344,9 +1387,8 @@ def test_chunk_upload(client, app): resp2 = client.post(url, data=data, headers=headers) assert resp2.status_code == 404 assert SyncFailuresHistory.query.count() == 1 - - # cleanup - shutil.rmtree(upload_dir) + # we do not have any chunks, so parent dir was removed as well + assert not os.path.exists(os.path.join(upload_dir)) def upload_chunks(upload_dir, changes, src_dir=test_project_dir): @@ -1368,7 +1410,9 @@ def test_push_finish(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - resp = client.post(f"/v1/project/push/finish/{upload.id}", headers=json_headers) + resp = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", headers=json_headers + ) assert resp.status_code == 422 assert "corrupted_files" in resp.json["detail"].keys() assert not os.path.exists(os.path.join(upload_dir, "files", "test.txt")) @@ -1389,7 +1433,7 @@ def test_push_finish(client): chunks.append(chunk) resp2 = client.post( - f"/v1/project/push/finish/{upload.id}", + f"/v1/project/push/finish/{upload.transaction_id}", headers={**json_headers, "User-Agent": "Werkzeug"}, ) assert resp2.status_code == 200 @@ -1416,7 +1460,7 @@ def test_push_finish(client): db.session.commit() upload, upload_dir = create_transaction(user.username, changes) - url = "/v1/project/push/finish/{}".format(upload.id) + url = "/v1/project/push/finish/{}".format(upload.transaction_id) db.session.add(upload) db.session.commit() # still log in as mergin user @@ -1430,7 +1474,7 @@ def test_push_finish(client): def test_push_close(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/cancel/{}".format(upload.id) + url = "/v1/project/push/cancel/{}".format(upload.transaction_id) resp = client.post(url) assert resp.status_code == 200 @@ -1473,12 +1517,12 @@ def test_whole_push_process(client): assert resp.status_code == 200 assert "transaction" in resp.json.keys() - upload = Upload.query.get(resp.json["transaction"]) + upload = Upload.query.filter_by(transaction_id=resp.json["transaction"]).first() assert upload # assert we can get project info with active upload resp = client.get(f"/v1/project/{test_workspace_name}/{upload.project.name}") assert resp.status_code == 200 - assert upload.id in resp.json["uploads"] + assert str(upload.transaction_id) in resp.json["uploads"] assert ( client.get( f"/v1/project/{test_workspace_name}/{upload.project.name}?version=v1" @@ -1489,7 +1533,7 @@ def test_whole_push_process(client): # push upload: upload file chunks for file in changes["added"]: for chunk_id in file["chunks"]: - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(test_dir, file["path"]), "rb") as f: data = f.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -1501,7 +1545,7 @@ def test_whole_push_process(client): assert resp.json["checksum"] == checksum.hexdigest() # push finish: call server to concatenate chunks and finish upload - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 200 project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id @@ -1529,7 +1573,7 @@ def test_push_diff_finish(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # check there are not any changes between local modified file and server patched file (using geodiff) geodiff = GeoDiff() @@ -1553,7 +1597,7 @@ def test_push_diff_finish(client): upload, upload_dir = create_transaction("mergin", changes, 2) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert ( "GEODIFF ERROR: Nothing inserted (this should never happen)" @@ -1562,10 +1606,10 @@ def test_push_diff_finish(client): error = resp.json["detail"] # try again to make sure geodiff logs are related only to recent event - client.post("/v1/project/push/cancel/{}".format(upload.id)) + client.post("/v1/project/push/cancel/{}".format(upload.transaction_id)) upload, upload_dir = create_transaction("mergin", changes, 2) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert resp.json["detail"] == error @@ -1573,7 +1617,7 @@ def test_push_diff_finish(client): changes = _get_changes_with_diff_0_size(test_project_dir) upload, upload_dir = create_transaction("mergin", changes, 3) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 @@ -1599,7 +1643,7 @@ def test_push_no_diff_finish(client): } upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes, src_dir=working_dir) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # check diff file was generated by server, and it is in file history latest_version = upload.project.get_latest_version() @@ -1646,7 +1690,7 @@ def copy_file_failing_for_geodiff(src, dest): "mergin.sync.storages.disk.copy_file", side_effect=copy_file_failing_for_geodiff, ): - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 latest_version = upload.project.get_latest_version() file_meta = latest_version.changes.filter( @@ -1677,7 +1721,7 @@ def copy_file_failing_for_geodiff(src, dest): } upload, upload_dir = create_transaction("mergin", changes, version=3) upload_chunks(upload_dir, upload.changes, src_dir=working_dir) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 latest_version = upload.project.get_latest_version() assert all( @@ -1749,7 +1793,7 @@ def test_clone_project(client, data, username, expected): "mergin", _get_changes_with_diff(test_project_dir) ) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 endpoint = "/v1/project/clone/{}/{}".format(test_workspace_name, test_project) @@ -1891,7 +1935,7 @@ def test_optimize_storage(app, client, diff_project): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 200 assert os.path.exists(optimize_v4) @@ -2253,16 +2297,16 @@ def test_inactive_project(client, diff_project): upload, upload_dir = create_transaction("mergin", _get_changes(test_project_dir)) chunk_id = upload.changes["added"][0]["chunks"][0] resp = client.post( - f"/v1/project/push/chunk/{upload.id}/{chunk_id}", + f"/v1/project/push/chunk/{upload.transaction_id}/{chunk_id}", data=data, headers={"Content-Type": "application/octet-stream"}, ) assert resp.status_code == 404 - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 404 - resp = client.post(f"/v1/project/push/cancel/{upload.id}") + resp = client.post(f"/v1/project/push/cancel/{upload.transaction_id}") assert resp.status_code == 404 # delete project again @@ -2363,7 +2407,7 @@ def test_project_version_integrity(client): "__init__", side_effect=IntegrityError("Project version already exists", None, None), ): - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert "Failed to create new version" in resp.json["detail"] failure = SyncFailuresHistory.query.filter_by( @@ -2422,7 +2466,7 @@ def _get_user_agent(): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 @@ -2458,12 +2502,12 @@ def test_delete_diff_file(client): } upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - client.post(f"/v1/project/push/finish/{upload.id}") + client.post(f"/v1/project/push/finish/{upload.transaction_id}") changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes, version=2) upload_chunks(upload_dir, upload.changes) - client.post(f"/v1/project/push/finish/{upload.id}") + client.post(f"/v1/project/push/finish/{upload.transaction_id}") fh = FileHistory.query.filter_by( project_version_name=upload.project.latest_version, @@ -2609,12 +2653,12 @@ def test_supported_file_upload(client): headers=json_headers, ) assert resp.status_code == 200 - upload = Upload.query.get(resp.json["transaction"]) + upload = Upload.query.filter_by(transaction_id=resp.json["transaction"]).first() assert upload # Even chunks are correctly uploaded for file in changes["added"]: for chunk_id in file["chunks"]: - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(TMP_DIR, file["path"]), "rb") as f: data = f.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -2625,7 +2669,7 @@ def test_supported_file_upload(client): assert resp.status_code == 200 assert resp.json["checksum"] == checksum.hexdigest() # Unsupported file type is revealed when reconstructed from chunks - based on the mime type - and upload is refused - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 400 assert ( resp.json["detail"] @@ -2658,8 +2702,8 @@ def test_locked_project(client, diff_project): assert resp.headers["Content-Type"] == "application/problem+json" assert resp.json["code"] == "ProjectLocked" # to play safe push finish is also blocked - upload, upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/finish/{}".format(upload.id) + upload, _ = create_transaction("mergin", changes) + url = "/v1/project/push/finish/{}".format(upload.transaction_id) resp = client.post(url, headers=json_headers) assert resp.status_code == 422 diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 7a87b1d0..56caa7ff 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1031,12 +1031,7 @@ def test_create_version_failures(client): data = {"version": "v1", "changes": _get_changes_without_added(test_project_dir)} # somebody else is syncing - upload = Upload(project, 1, _get_changes(test_project_dir), 1) - db.session.add(upload) - db.session.commit() - os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() - + upload = Upload.create_upload(project.id, 1, _get_changes(test_project_dir), 1) response = client.post(f"v2/projects/{project.id}/versions", json=data) assert response.status_code == 409 assert response.json["code"] == AnotherUploadRunning.code @@ -1073,16 +1068,6 @@ def test_create_version_failures(client): assert response.status_code == 422 assert response.json["code"] == UploadError.code - # try to finish the transaction which would fail on existing Upload integrity error, e.g. race conditions - with patch.object( - Upload, - "__init__", - side_effect=IntegrityError("Cannot insert upload", None, None), - ): - response = client.post(f"v2/projects/{project.id}/versions", json=data) - assert response.status_code == 409 - assert response.json["code"] == AnotherUploadRunning.code - # try to finish the transaction which would fail on unexpected integrity error # patch of ChangesSchema is just a workaround to trigger and error with patch.object( @@ -1094,46 +1079,6 @@ def test_create_version_failures(client): assert response.status_code == 409 -def test_create_version_object_deleted_error(client): - """Test that ObjectDeletedError during push returns 422 without secondary exception""" - project = Project.query.filter_by( - workspace_id=test_workspace_id, name=test_project - ).first() - - data = { - "version": "v1", - "changes": { - "added": [], - "removed": [ - file_info(test_project_dir, "base.gpkg"), - ], - "updated": [], - }, - } - - # Create a real ObjectDeletedError by using internal SQLAlchemy state - def raise_object_deleted(*args, **kwargs): - # Create a minimal state-like object that ObjectDeletedError can use - class FakeState: - class_ = Upload - - def obj(self): - return None - - raise ObjectDeletedError(FakeState()) - - with patch.object( - ProjectVersion, - "__init__", - side_effect=raise_object_deleted, - ): - response = client.post(f"v2/projects/{project.id}/versions", json=data) - - # Should return 422 UploadError, not 500 from secondary exception - assert response.status_code == 422 - assert response.json["code"] == UploadError.code - - def test_upload_chunk(client): """Test pushing a chunk to a project""" project = Project.query.filter_by( diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index 89ead403..57f67e80 100644 --- a/server/mergin/tests/utils.py +++ b/server/mergin/tests/utils.py @@ -15,7 +15,7 @@ from dateutil.tz import tzlocal from pygeodiff import GeoDiff -from ..auth.models import User, UserProfile +from ..auth.models import User from ..sync.utils import generate_location, generate_checksum from ..sync.models import ( Project, @@ -52,7 +52,6 @@ def add_user(username="random", password="random", is_admin=False) -> User: ) user.active = True user.verified_email = True - user.profile = UserProfile() db.session.add(user) db.session.commit() return user diff --git a/server/mergin/version.py b/server/mergin/version.py index 89b89699..2e64bc98 100644 --- a/server/mergin/version.py +++ b/server/mergin/version.py @@ -4,4 +4,4 @@ def get_version(): - return "2026.3.2" + return "2026.4.0" diff --git a/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py b/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py new file mode 100644 index 00000000..88898460 --- /dev/null +++ b/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py @@ -0,0 +1,82 @@ +# Copyright (C) Lutra Consulting Limited +# +# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial + +"""Merge user_profile table into user table + +Revision ID: e3f1a9b2c4d6 +Revises: 4b4648483770 +Create Date: 2026-04-14 00:00:00.000000 +""" +from alembic import op +import sqlalchemy as sa + +revision = "e3f1a9b2c4d6" +down_revision = "4b4648483770" +branch_labels = None +depends_on = None + + +def upgrade(): + # Add profile columns to user table (nullable initially to allow data copy) + op.add_column( + "user", sa.Column("receive_notifications", sa.Boolean(), nullable=True) + ) + op.add_column("user", sa.Column("first_name", sa.String(256), nullable=True)) + op.add_column("user", sa.Column("last_name", sa.String(256), nullable=True)) + + # Copy data from user_profile + op.execute( + """ + UPDATE "user" u + SET + receive_notifications = up.receive_notifications, + first_name = up.first_name, + last_name = up.last_name + FROM user_profile up + WHERE up.user_id = u.id; + """ + ) + + # Fill in default for any users without a profile row (should not exist, but be safe) + op.execute( + 'UPDATE "user" SET receive_notifications = FALSE WHERE receive_notifications IS NULL;' + ) + + op.alter_column("user", "receive_notifications", nullable=False) + op.create_index("ix_user_receive_notifications", "user", ["receive_notifications"]) + op.drop_table("user_profile") + + +def downgrade(): + # Recreate user_profile table + op.create_table( + "user_profile", + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("receive_notifications", sa.Boolean(), nullable=True), + sa.Column("first_name", sa.String(256), nullable=True), + sa.Column("last_name", sa.String(256), nullable=True), + sa.ForeignKeyConstraint(["user_id"], ["user.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("user_id"), + ) + + # Copy data back + op.execute( + """ + INSERT INTO user_profile (user_id, receive_notifications, first_name, last_name) + SELECT id, receive_notifications, first_name, last_name + FROM "user"; + """ + ) + + op.create_index( + "ix_user_profile_receive_notifications", + "user_profile", + ["receive_notifications"], + ) + + # Remove columns from user table + op.drop_index("ix_user_receive_notifications", table_name="user") + op.drop_column("user", "receive_notifications") + op.drop_column("user", "first_name") + op.drop_column("user", "last_name") diff --git a/server/migrations/community/f1d9e4a7b823_update_upload_table_for_concurrency.py b/server/migrations/community/f1d9e4a7b823_update_upload_table_for_concurrency.py new file mode 100644 index 00000000..c99f26df --- /dev/null +++ b/server/migrations/community/f1d9e4a7b823_update_upload_table_for_concurrency.py @@ -0,0 +1,45 @@ +"""Add transaction_id and last_ping columns to upload + +Revision ID: f1d9e4a7b823 +Revises: e3f1a9b2c4d6 +Create Date: 2026-04-14 00:00:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID + + +# revision identifiers, used by Alembic. +revision = "f1d9e4a7b823" +down_revision = "e3f1a9b2c4d6" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "upload", sa.Column("transaction_id", UUID(as_uuid=True), nullable=True) + ) + op.add_column("upload", sa.Column("last_ping", sa.DateTime(), nullable=True)) + + # backfill existing rows before adding NOT NULL constraint + op.execute( + "UPDATE upload SET transaction_id = id::uuid WHERE transaction_id IS NULL;" + ) + op.execute("UPDATE upload SET last_ping = NOW() WHERE last_ping IS NULL;") + + op.alter_column("upload", "transaction_id", nullable=False) + op.alter_column("upload", "last_ping", nullable=False) + + op.create_index( + op.f("ix_upload_transaction_id"), "upload", ["transaction_id"], unique=True + ) + + +def downgrade(): + op.drop_index(op.f("ix_upload_transaction_id"), table_name="upload") + # column is dropped but there could be orphan transaction folders and required lockfiles will be missing, make sure upload table is empty + op.drop_column("upload", "transaction_id") + op.drop_column("upload", "last_ping") diff --git a/server/setup.py b/server/setup.py index c8c9dfb4..bc6aa257 100644 --- a/server/setup.py +++ b/server/setup.py @@ -6,7 +6,7 @@ setup( name="mergin", - version="2026.3.2", + version="2026.4.0", url="https://github.com/MerginMaps/mergin", license="AGPL-3.0-only", author="Lutra Consulting Limited", diff --git a/web-app/packages/lib/src/modules/user/store.ts b/web-app/packages/lib/src/modules/user/store.ts index 83f2fb9b..34da4d98 100644 --- a/web-app/packages/lib/src/modules/user/store.ts +++ b/web-app/packages/lib/src/modules/user/store.ts @@ -498,15 +498,10 @@ export const useUserStore = defineStore('userModule', { */ async getAuthNotProjectUserSearch(params: UserSearchParams) { const projectStore = useProjectStore() - const access = projectStore.project.access - const projectUsers = [ - ...access.readers, - ...access.writers, - ...access.owners - ] + const projectUsers = projectStore.access.map((item) => item.id) const response = await UserApi.getAuthUserSearch(params) - if (access) { + if (projectUsers.length) { response.data = response.data.filter( (item) => !projectUsers.find((id) => id === item.id) )