diff --git a/app/commands.py b/app/commands.py index 0c098fdd..3d1f105d 100644 --- a/app/commands.py +++ b/app/commands.py @@ -357,14 +357,19 @@ def sync_brevo_command(pipeline_names, verbose): app.logger.info("Process sync companies with Brevo done") -@app.cli.command("migrate_anonymize_mission", with_appcontext=True) +@app.cli.command("migrate_anonymize_data", with_appcontext=True) @click.argument("time_interval") -def migrate_anonymize_mission(time_interval): +@click.option( + "--verbose", + is_flag=True, + help="Enable verbose mode for more detailed output", +) +def migrate_anonymize_mission(time_interval, verbose): """ Migrate data to anonymized tables. You can specify time interval as arguments. """ - from app.services.anonymize_tables import migrate_anonymize_mission + from app.services.anonymize_tables import migrate_anonymized_data if not time_interval: print( @@ -374,6 +379,6 @@ def migrate_anonymize_mission(time_interval): app.logger.info("Process with data migration and anonymization began") - migrate_anonymize_mission(time_interval) + migrate_anonymized_data(time_interval, verbose=verbose) app.logger.info("Process with data migration and anonymization done") diff --git a/app/models/__init__.py b/app/models/__init__.py index c982f813..7144ff50 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -31,3 +31,5 @@ from .user_survey_actions import UserSurveyActions from .user_agreement import UserAgreement from .anonymized.mission import MissionAnonymized +from .anonymized.activity import ActivityAnonymized +from .anonymized.activity_version import ActivityVersionAnonymized diff --git a/app/models/anonymized/activity.py b/app/models/anonymized/activity.py new file mode 100644 index 00000000..c630fee6 --- /dev/null +++ b/app/models/anonymized/activity.py @@ -0,0 +1,25 @@ +from app import db +from app.models.activity import Activity + + +class ActivityAnonymized(Activity): + backref_base_name = "activity_anonymized" + __mapper_args__ = {"concrete": True} + + id = db.Column(db.Integer, primary_key=True) + type = db.Column(db.String(8), nullable=True) + user_id = db.Column(db.Integer, nullable=True) + submitter_id = db.Column(db.Integer, nullable=True) + mission_id = db.Column(db.Integer, nullable=True) + dismiss_author_id = db.Column(db.Integer, nullable=True) + dismissed_at = db.Column(db.DateTime, nullable=True) + creation_time = db.Column(db.DateTime, nullable=True) + reception_time = db.Column(db.DateTime, nullable=True) + start_time = db.Column(db.DateTime, nullable=True) + end_time = db.Column(db.DateTime, nullable=True) + last_update_time = db.Column(db.DateTime, nullable=True) + last_submitter_id = db.Column(db.Integer, nullable=True) + dismiss_context = db.Column(db.JSON, nullable=True) + + def __repr__(self): + return f"" diff --git a/app/models/anonymized/activity_version.py b/app/models/anonymized/activity_version.py new file mode 100644 index 00000000..663d6e5e --- /dev/null +++ b/app/models/anonymized/activity_version.py @@ -0,0 +1,20 @@ +from app import db +from app.models.activity_version import ActivityVersion + + +class ActivityVersionAnonymized(ActivityVersion): + backref_base_name = "activity_version_anonymized" + __mapper_args__ = {"concrete": True} + + id = db.Column(db.Integer, primary_key=True) + activity_id = db.Column(db.Integer, nullable=True) + version_number = db.Column(db.Integer, nullable=True) + submitter_id = db.Column(db.Integer, nullable=True) + creation_time = db.Column(db.DateTime, nullable=True) + reception_time = db.Column(db.DateTime, nullable=True) + start_time = db.Column(db.DateTime, nullable=True) + end_time = db.Column(db.DateTime, nullable=True) + context = db.Column(db.JSON, nullable=True) + + def __repr__(self): + return f"" diff --git a/app/services/anonymize_tables.py b/app/services/anonymize_tables.py index d6d0138c..bdbb8fa6 100644 --- a/app/services/anonymize_tables.py +++ b/app/services/anonymize_tables.py @@ -1,11 +1,53 @@ +import logging import csv from io import StringIO from sqlalchemy import text from app import db import json +logger = logging.getLogger(__name__) -def migrate_anonymize_mission(interval: str): + +def migrate_anonymized_data(interval: str, verbose=False): + if verbose: + logger.setLevel(logging.DEBUG) + + connection = db.get_engine().raw_connection() + cursor = connection.cursor() + + try: + logger.debug(f"Migrate mission for interval: {interval}") + migrated_mission_ids = migrate_anonymized_mission( + interval, connection, cursor + ) + + logger.debug("Migrate activity for migrated missions") + migrated_activity_ids = migrate_anonymized_activity( + migrated_mission_ids, connection, cursor + ) + + logger.debug("Migrate activity version for migrated activities") + migrate_anonymized_activity_version( + migrated_activity_ids, connection, cursor + ) + + logger.debug("Delete original data") + delete_original_data( + migrated_mission_ids, migrated_activity_ids, connection, cursor + ) + + logger.debug(f"Migration complete for interval: {interval}") + connection.commit() + + except ValueError as e: + logger.error(f"Error during migration for interval '{interval}': {e}") + connection.rollback() + finally: + cursor.close() + connection.close() + + +def migrate_anonymized_mission(interval: str, connection, cursor): select_query = f""" SELECT id, @@ -15,67 +57,253 @@ def migrate_anonymize_mission(interval: str): vehicle_id, date_trunc('month', creation_time) AS creation_time, date_trunc('month', reception_time) AS reception_time, - context::jsonb AS context -- Convertir context en JSON valide + context::jsonb AS context FROM mission WHERE creation_time {interval}; """ + result = db.session.execute(text(select_query)) + rows = result.fetchall() + + if not rows: + print("No mission data to migrate.") + return [] + + csv_buffer = StringIO() + csv_writer = csv.writer(csv_buffer) + + migrated_mission_ids = [] + for row in rows: + row_as_list = list(row) + migrated_mission_ids.append(row_as_list[0]) + + if isinstance(row_as_list[-1], dict): + row_as_list[-1] = json.dumps(row_as_list[-1]) + + csv_writer.writerow(row_as_list) + + csv_buffer.seek(0) + try: - with db.session.begin_nested(): + cursor.copy_expert( + """ + COPY mission_anonymized ( + id, + name, + submitter_id, + company_id, + vehicle_id, + creation_time, + reception_time, + context + ) + FROM STDIN WITH (FORMAT CSV) + """, + csv_buffer, + ) - result = db.session.execute(text(select_query)) - rows = result.fetchall() + print("Anonymized mission migration successful.") + return migrated_mission_ids - if not rows: - print("No data to migrate.") - return + except Exception as e: + connection.rollback() + print(f"Error when copying mission data: {e}") + raise - csv_buffer = StringIO() - csv_writer = csv.writer(csv_buffer) + finally: + csv_buffer.close() - for row in rows: - row_as_list = list(row) - if isinstance(row_as_list[-1], dict): - row_as_list[-1] = json.dumps(row_as_list[-1]) +def migrate_anonymized_activity( + migrated_mission_ids: list, connection, cursor +): + if not migrated_mission_ids: + print("No mission data to migrate activities for.") + return [] - csv_writer.writerow(row_as_list) + select_query = """ + SELECT + id, + type, + NULL AS user_id, + NULL AS submitter_id, + mission_id, + NULL AS dismiss_author_id, + date_trunc('month', dismissed_at) AS dismissed_at, + date_trunc('month', creation_time) AS creation_time, + date_trunc('month', reception_time) AS reception_time, + date_trunc('month', start_time) AS start_time, + date_trunc('month', end_time) AS end_time, + date_trunc('month', last_update_time) AS last_update_time, + NULL AS last_submitter_id, + dismiss_context::jsonb AS dismiss_context + FROM activity + WHERE mission_id = ANY(:mission_ids); + """ - csv_buffer.seek(0) + result = db.session.execute( + text(select_query), {"mission_ids": migrated_mission_ids} + ) + rows = result.fetchall() - engine = db.get_engine() - connection = engine.raw_connection() + if not rows: + print("No activity data to migrate.") + return [] - try: - cursor = connection.cursor() - cursor.copy_expert( - """ - COPY mission_anonymized (id, name, submitter_id, company_id, vehicle_id, creation_time, reception_time, context) - FROM STDIN WITH (FORMAT CSV) - """, - csv_buffer, - ) + csv_buffer = StringIO() + csv_writer = csv.writer(csv_buffer) - delete_query = f""" - DELETE FROM mission WHERE creation_time {interval}; - """ - db.session.execute(text(delete_query)) + migrated_activity_ids = [] + for row in rows: + row_as_list = list(row) + migrated_activity_ids.append(row_as_list[0]) - print("Anonymized data migration successful.") + if isinstance(row_as_list[-1], dict): + row_as_list[-1] = json.dumps(row_as_list[-1]) - except Exception as e: - connection.rollback() - print(f"Error when copying mass data: {e}") - raise + csv_writer.writerow(row_as_list) - finally: - cursor.close() - connection.close() - csv_buffer.close() + csv_buffer.seek(0) + + try: + cursor.copy_expert( + """ + COPY activity_anonymized ( + id, + type, + user_id, + submitter_id, + mission_id, + dismiss_author_id, + dismissed_at, + creation_time, + reception_time, + start_time, + end_time, + last_update_time, + last_submitter_id, + dismiss_context + ) + FROM STDIN WITH (FORMAT CSV) + """, + csv_buffer, + ) + + print("Anonymized activity migration successful.") + return migrated_activity_ids except Exception as e: - db.session.rollback() - print(f"Transaction failed, rolling back changes: {e}") + connection.rollback() + print(f"Error when copying activity data: {e}") + raise finally: - db.session.close() + csv_buffer.close() + + +def migrate_anonymized_activity_version( + migrated_activity_ids: list, connection, cursor +): + if not migrated_activity_ids: + print("No activity data to migrate versions for.") + return + + select_query = """ + SELECT + id, + activity_id, + version_number, + NULL AS submitter_id, + date_trunc('month', creation_time) AS creation_time, + date_trunc('month', reception_time) AS reception_time, + date_trunc('month', start_time) AS start_time, + date_trunc('month', end_time) AS end_time, + context::jsonb AS context + FROM activity_version + WHERE activity_id = ANY(:activity_ids); + """ + + result = db.session.execute( + text(select_query), {"activity_ids": migrated_activity_ids} + ) + rows = result.fetchall() + + if not rows: + print("No activity version data to migrate.") + return + + csv_buffer = StringIO() + csv_writer = csv.writer(csv_buffer) + + for row in rows: + row_as_list = list(row) + + if isinstance(row_as_list[-1], dict): + row_as_list[-1] = json.dumps(row_as_list[-1]) + + csv_writer.writerow(row_as_list) + + csv_buffer.seek(0) + + try: + cursor.copy_expert( + """ + COPY activity_version_anonymized ( + id, + activity_id, + version_number, + submitter_id, + creation_time, + reception_time, + start_time, + end_time, + context + ) + FROM STDIN WITH (FORMAT CSV) + """, + csv_buffer, + ) + + print("Anonymized activity version migration successful.") + + except Exception as e: + connection.rollback() + print(f"Error when copying activity version data: {e}") + raise + + finally: + csv_buffer.close() + + +def delete_original_data( + migrated_mission_ids, migrated_activity_ids, connection, cursor +): + try: + delete_activity_version_query = """ + DELETE FROM activity_version WHERE activity_id = ANY(:activity_ids); + """ + db.session.execute( + text(delete_activity_version_query), + {"activity_ids": migrated_activity_ids}, + ) + + delete_activity_query = """ + DELETE FROM activity WHERE mission_id = ANY(:mission_ids); + """ + db.session.execute( + text(delete_activity_query), {"mission_ids": migrated_mission_ids} + ) + + delete_mission_query = """ + DELETE FROM mission WHERE id = ANY(:ids); + """ + db.session.execute( + text(delete_mission_query), {"ids": migrated_mission_ids} + ) + + print("Anonymized data deletion successful.") + + except Exception as e: + connection.rollback() + print(f"Error during data deletion: {e}") + raise diff --git a/migrations/versions/b016c0aa50e0_.py b/migrations/versions/b016c0aa50e0_.py new file mode 100644 index 00000000..d05f416f --- /dev/null +++ b/migrations/versions/b016c0aa50e0_.py @@ -0,0 +1,66 @@ +"""Add activity_anonymized and activity_version_anonymized table + +Revision ID: b016c0aa50e0 +Revises: c8870f7b9399 +Create Date: 2024-11-13 12:20:44.070879 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "b016c0aa50e0" +down_revision = "c8870f7b9399" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "activity_anonymized", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("type", sa.String(length=8), nullable=True), + sa.Column("user_id", sa.Integer(), nullable=True), + sa.Column("submitter_id", sa.Integer(), nullable=True), + sa.Column("mission_id", sa.Integer(), nullable=True), + sa.Column("dismiss_author_id", sa.Integer(), nullable=True), + sa.Column("dismissed_at", sa.DateTime(), nullable=True), + sa.Column("creation_time", sa.DateTime(), nullable=True), + sa.Column("reception_time", sa.DateTime(), nullable=True), + sa.Column("start_time", sa.DateTime(), nullable=True), + sa.Column("end_time", sa.DateTime(), nullable=True), + sa.Column("last_update_time", sa.DateTime(), nullable=True), + sa.Column("last_submitter_id", sa.Integer(), nullable=True), + sa.Column("dismiss_context", sa.JSON(), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + + op.create_table( + "activity_version_anonymized", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("activity_id", sa.Integer(), nullable=True), + sa.Column("version_number", sa.Integer(), nullable=True), + sa.Column("submitter_id", sa.Integer(), nullable=True), + sa.Column("creation_time", sa.DateTime(), nullable=True), + sa.Column("reception_time", sa.DateTime(), nullable=True), + sa.Column("start_time", sa.DateTime(), nullable=True), + sa.Column("end_time", sa.DateTime(), nullable=True), + sa.Column("context", sa.JSON(), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + op.drop_constraint( + "activity_last_submitter_id_fkey", "activity", type_="foreignkey" + ) + op.create_foreign_key( + None, "activity", "user", ["last_submitter_id"], ["id"] + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("activity_version_anonymized") + op.drop_table("activity_anonymized") + # ### end Alembic commands ###