diff --git a/src/api/qualicharge/factories/__init__.py b/src/api/qualicharge/factories/__init__.py index d85f10ce..bb82920d 100644 --- a/src/api/qualicharge/factories/__init__.py +++ b/src/api/qualicharge/factories/__init__.py @@ -32,5 +32,7 @@ class TimestampedSQLModelFactory(Generic[T], SQLAlchemyFactory[T]): __is_base_factory__ = True id = Use(uuid4) + created_by_id = None + updated_by_id = None created_at = Use(lambda: datetime.now(timezone.utc) - timedelta(hours=1)) updated_at = Use(datetime.now, timezone.utc) diff --git a/src/api/qualicharge/schemas/audit.py b/src/api/qualicharge/schemas/audit.py index f7e8f40b..f5991cfd 100644 --- a/src/api/qualicharge/schemas/audit.py +++ b/src/api/qualicharge/schemas/audit.py @@ -1,5 +1,6 @@ """QualiCharge auditable schemas.""" +import logging from datetime import datetime, timezone from enum import StrEnum from typing import Any, Optional @@ -11,6 +12,8 @@ from sqlalchemy.types import JSON, DateTime from sqlmodel import Field, SQLModel +logger = logging.getLogger(__name__) + class AuditableFieldBlackListEnum(StrEnum): """Fields black listed for auditability.""" @@ -106,6 +109,10 @@ def track_model_changes(mapper, connection, target): date and the author. For fields with sensitive information (_e.g._ passwords or tokens), a null value is stored. """ + if target.updated_by_id is None: + logger.debug("Target updated_by_id is empty, aborting changes tracking.") + return + state = inspect(target) # Get changes @@ -116,15 +123,21 @@ def track_model_changes(mapper, connection, target): history = attr.load_history() if not history.has_changes(): continue - changes[attr.key] = [history.deleted, history.added] + changes[attr.key] = [ + str(history.deleted[0]) if len(history.deleted) else None, + str(history.added[0]) if len(history.added) else None, + ] + + logger.debug("Detected changes: %s", str(changes)) # Log changes - connection.execute( - insert(Audit).values( - table=target.__tablename__, - author_id=target.updated_by_id, - target_id=target.id, - updated_at=target.updated_at, - changes=changes, - ) + audit = Audit( + table=target.__tablename__, + author_id=target.updated_by_id, + target_id=target.id, + updated_at=target.updated_at, + changes=changes, ) + connection.execute(insert(Audit).values(**audit.model_dump())) + print(f"{dir(target.audits)=}") + # target.audits.append(audit) diff --git a/src/api/qualicharge/schemas/core.py b/src/api/qualicharge/schemas/core.py index 10815384..d3b8b5b4 100644 --- a/src/api/qualicharge/schemas/core.py +++ b/src/api/qualicharge/schemas/core.py @@ -37,7 +37,7 @@ ImplantationStationEnum, RaccordementEnum, ) -from .audit import BaseAuditableSQLModel +from .audit import BaseAuditableSQLModel, track_model_changes if TYPE_CHECKING: from qualicharge.auth.schemas import Group @@ -370,3 +370,6 @@ class Status(BaseAuditableSQLModel, StatusBase, table=True): def id_pdc_itinerance(self) -> str: """Return the PointDeCharge.id_pdc_itinerance (used for serialization only).""" return self.point_de_charge.id_pdc_itinerance + + +event.listen(Operateur, "after_update", track_model_changes) diff --git a/src/api/tests/schemas/test_audit.py b/src/api/tests/schemas/test_audit.py index 6f917c6e..b8941282 100644 --- a/src/api/tests/schemas/test_audit.py +++ b/src/api/tests/schemas/test_audit.py @@ -1 +1,105 @@ """QualiCharge auditable schemas tests.""" + +from sqlalchemy import func +from sqlmodel import select + +from qualicharge.auth.factories import UserFactory +from qualicharge.factories.static import OperateurFactory +from qualicharge.schemas.audit import Audit +from qualicharge.schemas.core import Operateur + + +def test_auditable_schema_changes(db_session): + """Test an updated schema instance creates a new Audit entry.""" + OperateurFactory.__session__ = db_session + UserFactory.__session__ = db_session + + user = UserFactory.create_sync() + + # Check initial database state + assert db_session.exec(select(func.count(Operateur.id))).one() == 0 + assert db_session.exec(select(func.count(Audit.id))).one() == 0 + + # Persist an operateur with not creator or updator + operateur = OperateurFactory.create_sync( + nom_operateur="Doe inc.", + contact_operateur="john@doe.com", + telephone_operateur="+33144276350", + ) + + # Check database state + assert db_session.exec(select(func.count(Operateur.id))).one() == 1 + assert db_session.exec(select(func.count(Audit.id))).one() == 0 + + # Update operateur without updator + operateur.contact_operateur = "jane@doe.com" + operateur.telephone_operateur = "+33144276351" + db_session.add(operateur) + + # Check database state + assert db_session.exec(select(func.count(Operateur.id))).one() == 1 + assert db_session.exec(select(func.count(Audit.id))).one() == 0 + + # Now update operateur with an updator + operateur.updated_by_id = user.id + operateur.contact_operateur = "janine@doe.com" + operateur.telephone_operateur = "+33144276352" + db_session.add(operateur) + + # Check database state + assert db_session.exec(select(func.count(Operateur.id))).one() == 1 + assert db_session.exec(select(func.count(Audit.id))).one() == 1 + audit = db_session.exec(select(Audit)).first() + assert audit.table == "operateur" + assert audit.author_id == user.id + assert audit.target_id == operateur.id + assert audit.updated_at == operateur.updated_at + assert audit.changes == { + "updated_by_id": ["None", str(user.id)], + "contact_operateur": ["jane@doe.com", "janine@doe.com"], + "telephone_operateur": ["tel:+33-1-44-27-63-51", "tel:+33-1-44-27-63-52"], + } + + # Perform new updates + operateur.contact_operateur = "janot@doe.com" + operateur.telephone_operateur = "+33144276353" + db_session.add(operateur) + + # Check database state + expected_audits = 2 + assert db_session.exec(select(func.count(Operateur.id))).one() == 1 + assert db_session.exec(select(func.count(Audit.id))).one() == expected_audits + audit = db_session.exec(select(Audit).order_by(Audit.updated_at.desc())).first() + assert audit.table == "operateur" + assert audit.author_id == user.id + assert audit.target_id == operateur.id + assert audit.updated_at == operateur.updated_at + assert audit.changes == { + "contact_operateur": ["janine@doe.com", "janot@doe.com"], + "telephone_operateur": ["tel:+33-1-44-27-63-52", "tel:+33-1-44-27-63-53"], + } + + +def test_auditable_schema_changes_dynamic_fk(db_session): + """Test auditable schema dynamic audits foreign key.""" + OperateurFactory.__session__ = db_session + UserFactory.__session__ = db_session + + user = UserFactory.create_sync() + operateur = OperateurFactory.create_sync( + nom_operateur="Doe inc.", + contact_operateur="john@doe.com", + telephone_operateur="+33144276350", + updated_by_id=user.id + ) + + # Now update operateur twice + operateur.contact_operateur = "janine@doe.com" + operateur.telephone_operateur = "+33144276352" + db_session.add(operateur) + operateur.contact_operateur = "janot@doe.com" + operateur.telephone_operateur = "+33144276353" + db_session.add(operateur) + + # Test audits dymanic generic FK + assert len(operateur.audits) == 2