Skip to content

Commit

Permalink
start adding tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jmaupetit committed Oct 17, 2024
1 parent d38acd0 commit 3ec1c2d
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/api/qualicharge/factories/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ class TimestampedSQLModelFactory(Generic[T], SQLAlchemyFactory[T]):
__is_base_factory__ = True

id = Use(uuid4)
created_by_id = None
updated_by_id = None
created_at = Use(lambda: datetime.now(timezone.utc) - timedelta(hours=1))
updated_at = Use(datetime.now, timezone.utc)
31 changes: 22 additions & 9 deletions src/api/qualicharge/schemas/audit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""QualiCharge auditable schemas."""

import logging
from datetime import datetime, timezone
from enum import StrEnum
from typing import Any, Optional
Expand All @@ -11,6 +12,8 @@
from sqlalchemy.types import JSON, DateTime
from sqlmodel import Field, SQLModel

logger = logging.getLogger(__name__)


class AuditableFieldBlackListEnum(StrEnum):
"""Fields black listed for auditability."""
Expand Down Expand Up @@ -106,6 +109,10 @@ def track_model_changes(mapper, connection, target):
date and the author. For fields with sensitive information (_e.g._ passwords or
tokens), a null value is stored.
"""
if target.updated_by_id is None:
logger.debug("Target updated_by_id is empty, aborting changes tracking.")
return

state = inspect(target)

# Get changes
Expand All @@ -116,15 +123,21 @@ def track_model_changes(mapper, connection, target):
history = attr.load_history()
if not history.has_changes():
continue
changes[attr.key] = [history.deleted, history.added]
changes[attr.key] = [
str(history.deleted[0]) if len(history.deleted) else None,
str(history.added[0]) if len(history.added) else None,
]

logger.debug("Detected changes: %s", str(changes))

# Log changes
connection.execute(
insert(Audit).values(
table=target.__tablename__,
author_id=target.updated_by_id,
target_id=target.id,
updated_at=target.updated_at,
changes=changes,
)
audit = Audit(
table=target.__tablename__,
author_id=target.updated_by_id,
target_id=target.id,
updated_at=target.updated_at,
changes=changes,
)
connection.execute(insert(Audit).values(**audit.model_dump()))
print(f"{dir(target.audits)=}")
# target.audits.append(audit)
5 changes: 4 additions & 1 deletion src/api/qualicharge/schemas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
ImplantationStationEnum,
RaccordementEnum,
)
from .audit import BaseAuditableSQLModel
from .audit import BaseAuditableSQLModel, track_model_changes

if TYPE_CHECKING:
from qualicharge.auth.schemas import Group
Expand Down Expand Up @@ -370,3 +370,6 @@ class Status(BaseAuditableSQLModel, StatusBase, table=True):
def id_pdc_itinerance(self) -> str:
"""Return the PointDeCharge.id_pdc_itinerance (used for serialization only)."""
return self.point_de_charge.id_pdc_itinerance


event.listen(Operateur, "after_update", track_model_changes)
104 changes: 104 additions & 0 deletions src/api/tests/schemas/test_audit.py
Original file line number Diff line number Diff line change
@@ -1 +1,105 @@
"""QualiCharge auditable schemas tests."""

from sqlalchemy import func
from sqlmodel import select

from qualicharge.auth.factories import UserFactory
from qualicharge.factories.static import OperateurFactory
from qualicharge.schemas.audit import Audit
from qualicharge.schemas.core import Operateur


def test_auditable_schema_changes(db_session):
"""Test an updated schema instance creates a new Audit entry."""
OperateurFactory.__session__ = db_session
UserFactory.__session__ = db_session

user = UserFactory.create_sync()

# Check initial database state
assert db_session.exec(select(func.count(Operateur.id))).one() == 0
assert db_session.exec(select(func.count(Audit.id))).one() == 0

# Persist an operateur with not creator or updator
operateur = OperateurFactory.create_sync(
nom_operateur="Doe inc.",
contact_operateur="[email protected]",
telephone_operateur="+33144276350",
)

# Check database state
assert db_session.exec(select(func.count(Operateur.id))).one() == 1
assert db_session.exec(select(func.count(Audit.id))).one() == 0

# Update operateur without updator
operateur.contact_operateur = "[email protected]"
operateur.telephone_operateur = "+33144276351"
db_session.add(operateur)

# Check database state
assert db_session.exec(select(func.count(Operateur.id))).one() == 1
assert db_session.exec(select(func.count(Audit.id))).one() == 0

# Now update operateur with an updator
operateur.updated_by_id = user.id
operateur.contact_operateur = "[email protected]"
operateur.telephone_operateur = "+33144276352"
db_session.add(operateur)

# Check database state
assert db_session.exec(select(func.count(Operateur.id))).one() == 1
assert db_session.exec(select(func.count(Audit.id))).one() == 1
audit = db_session.exec(select(Audit)).first()
assert audit.table == "operateur"
assert audit.author_id == user.id
assert audit.target_id == operateur.id
assert audit.updated_at == operateur.updated_at
assert audit.changes == {
"updated_by_id": ["None", str(user.id)],
"contact_operateur": ["[email protected]", "[email protected]"],
"telephone_operateur": ["tel:+33-1-44-27-63-51", "tel:+33-1-44-27-63-52"],
}

# Perform new updates
operateur.contact_operateur = "[email protected]"
operateur.telephone_operateur = "+33144276353"
db_session.add(operateur)

# Check database state
expected_audits = 2
assert db_session.exec(select(func.count(Operateur.id))).one() == 1
assert db_session.exec(select(func.count(Audit.id))).one() == expected_audits
audit = db_session.exec(select(Audit).order_by(Audit.updated_at.desc())).first()
assert audit.table == "operateur"
assert audit.author_id == user.id
assert audit.target_id == operateur.id
assert audit.updated_at == operateur.updated_at
assert audit.changes == {
"contact_operateur": ["[email protected]", "[email protected]"],
"telephone_operateur": ["tel:+33-1-44-27-63-52", "tel:+33-1-44-27-63-53"],
}


def test_auditable_schema_changes_dynamic_fk(db_session):
"""Test auditable schema dynamic audits foreign key."""
OperateurFactory.__session__ = db_session
UserFactory.__session__ = db_session

user = UserFactory.create_sync()
operateur = OperateurFactory.create_sync(
nom_operateur="Doe inc.",
contact_operateur="[email protected]",
telephone_operateur="+33144276350",
updated_by_id=user.id
)

# Now update operateur twice
operateur.contact_operateur = "[email protected]"
operateur.telephone_operateur = "+33144276352"
db_session.add(operateur)
operateur.contact_operateur = "[email protected]"
operateur.telephone_operateur = "+33144276353"
db_session.add(operateur)

# Test audits dymanic generic FK
assert len(operateur.audits) == 2

0 comments on commit 3ec1c2d

Please sign in to comment.