From ee16aef17f8a5528e32a6485b4ab84b95c53db5b Mon Sep 17 00:00:00 2001 From: Rama Date: Fri, 5 Nov 2021 13:02:08 -0300 Subject: [PATCH 1/7] DI-2029: DRAFT The dockerfile assumes you got the data-common in the folder, so you can build the last version Also there are some dirty logs all over the place, to have visibility --- Dockerfile | 19 ++++++++++++++++--- migratron/VERSION | 2 +- migratron/command/run_migration.py | 26 ++++++++++++++++++++++++++ migratron/parsers.py | 2 +- requirements.txt | 7 ++++--- 5 files changed, 48 insertions(+), 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index 4268f3d..9941e53 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,14 +34,27 @@ RUN cd /opt && \ RUN cd /opt && \ curl https://archive.apache.org/dist/hadoop/core/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz -o hadoop-$HADOOP_VERSION.tar.gz && \ tar -xzf hadoop-$HADOOP_VERSION.tar.gz && \ - rm -rf apache-hive-bin.tar.gz + rm -rf hadoop-$HADOOP_VERSION.tar.gz # Download PrestoCli RUN mkdir /opt/presto-cli && \ cd /opt/presto-cli && \ curl https://repo1.maven.org/maven2/io/prestosql/presto-cli/$PRESTO_CLI_VERSION/presto-cli-$PRESTO_CLI_VERSION-executable.jar -o presto-cli && \ chmod +x presto-cli - # Install migratron -RUN pip install migratron==$MIGRATRON_VERSION +# RUN pip install migratron==$MIGRATRON_VERSION +# RUN pip install datacommon[livy]==5.0.0 --extra-index-url http://pypi.jampp.com/pypi/ --trusted-host pypi.jampp.com + +ADD ./data-common /opt/data-common +RUN cd /opt/data-common && \ + pip install -e . && \ + pip install -e .[livy] && \ + python setup.py install + + +# Install local migratron +ADD . 
/opt/migratron +RUN cd /opt/migratron && \ + python setup.py install + diff --git a/migratron/VERSION b/migratron/VERSION index 7ec1d6d..3e3c2f1 100644 --- a/migratron/VERSION +++ b/migratron/VERSION @@ -1 +1 @@ -2.1.0 +2.1.1 diff --git a/migratron/command/run_migration.py b/migratron/command/run_migration.py index 91cfa0b..358c1ed 100644 --- a/migratron/command/run_migration.py +++ b/migratron/command/run_migration.py @@ -236,6 +236,32 @@ def _execute_filename(self, file_content, complete_filename): "-f", complete_filename ] + elif self.db_type == "livy": + from datacommon.livy import dbapi + from sqlalchemy.sql import text + + with dbapi.connect( + # "emr-stg-etl.jampp.com", + # self.db_uri, + "livy", + queue= "etl", + spark_conf={ + "spark.sql.catalog.hive_prod": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.hive_prod.type": "hive", + "spark.jars.packages" : "org.apache.iceberg:iceberg-spark3-runtime:0.12.0", + "spark.sql.catalog.hive_prod.uri": "thrift://hive-metastore:9083" + } + ) as conn: + logger.warning("Created connnection") + f = open(complete_filename) + query = f.read() + logger.info(query) + logger.info(f"query: {query} ") + logger.info(f"filename: {complete_filename} ") + conn.execute(query).fetchall() + logger.info("query executed") + command = ["true"] + else: raise ValueError("Invalid database type") diff --git a/migratron/parsers.py b/migratron/parsers.py index 10eea49..7f8d9c0 100644 --- a/migratron/parsers.py +++ b/migratron/parsers.py @@ -86,7 +86,7 @@ def create_run_migration_parser(subparser): ) migrate_parser.add_argument( "--db-type", - choices=("postgresql", "hive", "presto"), + choices=("postgresql", "hive", "presto", "livy"), required=False, default="postgresql", help=( diff --git a/requirements.txt b/requirements.txt index a432f15..67e2d58 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ -pygments -psycopg2 -six +pygments==2.10.0 +psycopg2-binary==2.9.1 +datacommon[livy]==5.0.0 +six==1.16.0 From 
b42024f93c31fed10cf5e47e23cba095bc786924 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Mon, 8 Nov 2021 11:22:17 -0300 Subject: [PATCH 2/7] DI-2029: Cleanup --- Dockerfile | 15 +-------------- migratron/command/run_migration.py | 8 ++++---- requirements.txt | 9 +++++---- 3 files changed, 10 insertions(+), 22 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9941e53..8d5a857 100644 --- a/Dockerfile +++ b/Dockerfile @@ -43,18 +43,5 @@ RUN mkdir /opt/presto-cli && \ chmod +x presto-cli # Install migratron -# RUN pip install migratron==$MIGRATRON_VERSION -# RUN pip install datacommon[livy]==5.0.0 --extra-index-url http://pypi.jampp.com/pypi/ --trusted-host pypi.jampp.com - -ADD ./data-common /opt/data-common -RUN cd /opt/data-common && \ - pip install -e . && \ - pip install -e .[livy] && \ - python setup.py install - - -# Install local migratron ADD . /opt/migratron -RUN cd /opt/migratron && \ - python setup.py install - +RUN pip install -e /opt/migratron --extra-index-url https://pypi.jampp.com/pypi --trusted-host pypi.jampp.com diff --git a/migratron/command/run_migration.py b/migratron/command/run_migration.py index 358c1ed..688e145 100644 --- a/migratron/command/run_migration.py +++ b/migratron/command/run_migration.py @@ -246,10 +246,10 @@ def _execute_filename(self, file_content, complete_filename): "livy", queue= "etl", spark_conf={ - "spark.sql.catalog.hive_prod": "org.apache.iceberg.spark.SparkCatalog", - "spark.sql.catalog.hive_prod.type": "hive", - "spark.jars.packages" : "org.apache.iceberg:iceberg-spark3-runtime:0.12.0", - "spark.sql.catalog.hive_prod.uri": "thrift://hive-metastore:9083" + "spark.sql.catalog.hive_prod": "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.hive_prod.type": "hive", + "spark.jars.packages" : "org.apache.iceberg:iceberg-spark3-runtime:0.12.0", + "spark.sql.catalog.hive_prod.uri": "thrift://hive-metastore:9083" } ) as conn: logger.warning("Created connnection") diff --git a/requirements.txt 
b/requirements.txt index 67e2d58..bd6da5d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ -pygments==2.10.0 -psycopg2-binary==2.9.1 -datacommon[livy]==5.0.0 -six==1.16.0 +pygments +psycopg2-binary +six + +datacommon[livy]==5.0.0rc0 From 5c0b6c1d312e0b5de57b747942ad309771e4a809 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Tue, 9 Nov 2021 12:08:39 -0300 Subject: [PATCH 3/7] DI-2029: Use SQLAlchemy instead of data-common --- docs/installation.rst | 5 +++ migratron/VERSION | 2 +- migratron/command/run_migration.py | 49 ++++++++++------------------ migratron/parsers.py | 2 +- requirements.txt | 3 +- tests/commands/test_run_migration.py | 36 ++++++++++++++++++++ 6 files changed, 62 insertions(+), 35 deletions(-) diff --git a/docs/installation.rst b/docs/installation.rst index 4b292a5..ea8fcd7 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -97,3 +97,8 @@ or locally, but a **very** basic guide is: export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/ export PATH=$PATH:$HIVE_HOME/bin + +SQLAlchemy +========== + +For the SQLAlchemy option you only need to install the drivers needed for your particular URI. 
diff --git a/migratron/VERSION b/migratron/VERSION index 3e3c2f1..ccbccc3 100644 --- a/migratron/VERSION +++ b/migratron/VERSION @@ -1 +1 @@ -2.1.1 +2.2.0 diff --git a/migratron/command/run_migration.py b/migratron/command/run_migration.py index 688e145..e02337e 100644 --- a/migratron/command/run_migration.py +++ b/migratron/command/run_migration.py @@ -9,6 +9,9 @@ from logging import getLogger from datetime import datetime +from sqlalchemy import create_engine +from sqlalchemy.exc import ResourceClosedError + from migratron.command.base import ( BaseCommand, INSERT_MIGRATION_DATA, @@ -236,32 +239,15 @@ def _execute_filename(self, file_content, complete_filename): "-f", complete_filename ] - elif self.db_type == "livy": - from datacommon.livy import dbapi - from sqlalchemy.sql import text - - with dbapi.connect( - # "emr-stg-etl.jampp.com", - # self.db_uri, - "livy", - queue= "etl", - spark_conf={ - "spark.sql.catalog.hive_prod": "org.apache.iceberg.spark.SparkCatalog", - "spark.sql.catalog.hive_prod.type": "hive", - "spark.jars.packages" : "org.apache.iceberg:iceberg-spark3-runtime:0.12.0", - "spark.sql.catalog.hive_prod.uri": "thrift://hive-metastore:9083" - } - ) as conn: - logger.warning("Created connnection") - f = open(complete_filename) - query = f.read() - logger.info(query) - logger.info(f"query: {query} ") - logger.info(f"filename: {complete_filename} ") - conn.execute(query).fetchall() - logger.info("query executed") - command = ["true"] - + elif self.db_type == "sqlalchemy": + engine = create_engine(self.db_uri) + with engine.connect() as conn: + cursor = conn.execute(file_content) + try: + cursor.fetchall() # Force non async execution + except ResourceClosedError: + pass + command = None else: raise ValueError("Invalid database type") @@ -274,8 +260,9 @@ def _execute_filename(self, file_content, complete_filename): else: command += [option, value] - try: - subprocess.check_call(command) - except Exception: - logger.exception("Error while running the 
migration: %s", complete_filename) - raise + if command is not None: + try: + subprocess.check_call(command) + except Exception: + logger.exception("Error while running the migration: %s", complete_filename) + raise diff --git a/migratron/parsers.py b/migratron/parsers.py index 7f8d9c0..5d0210f 100644 --- a/migratron/parsers.py +++ b/migratron/parsers.py @@ -86,7 +86,7 @@ def create_run_migration_parser(subparser): ) migrate_parser.add_argument( "--db-type", - choices=("postgresql", "hive", "presto", "livy"), + choices=("postgresql", "hive", "presto", "sqlalchemy"), required=False, default="postgresql", help=( diff --git a/requirements.txt b/requirements.txt index bd6da5d..5d06718 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ pygments psycopg2-binary six - -datacommon[livy]==5.0.0rc0 +sqlalchemy diff --git a/tests/commands/test_run_migration.py b/tests/commands/test_run_migration.py index 8c1c801..8f58591 100644 --- a/tests/commands/test_run_migration.py +++ b/tests/commands/test_run_migration.py @@ -3,6 +3,9 @@ import os import subprocess import unittest +from tempfile import NamedTemporaryFile + +import sqlalchemy from .helper import BaseHelper, invalid_dbname from migratron.command.initialize import CREATE_TABLE_SQL @@ -131,3 +134,36 @@ def _drop_tables(self): "DROP TABLE IF EXISTS %s" % table_name ] subprocess.check_call(command) + + +class SQLAlchemyRunMigrationTest(BaseRunMigration): + + @classmethod + def setUpClass(cls): + cls.dbfile = NamedTemporaryFile() + cls.database_uri = "sqlite:///" + cls.dbfile.name + cls.engine = sqlalchemy.create_engine(cls.database_uri) + + super(BaseRunMigration, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + cls.dbfile.close() + + def create_command(self): + command = RunMigrationCommand( + migration_type=ALL_MIGRATION_TYPES, + just_list_files=False, + additional_options=None, + db_type="sqlalchemy", + db_uri=self.database_uri, + **self.BASE_ARGS + ) + return command + + def 
_check_table_exist(self, table_name): + return sqlalchemy.inspect(self.engine).has_table(table_name) + + def _drop_tables(self): + for table_name in ['t0', 't1']: + self.engine.execute(sqlalchemy.text("DROP TABLE IF EXISTS %s" % table_name)) From bd65343a3db8a1cd8b27e614e6e3f3d86c42c7b4 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Tue, 9 Nov 2021 12:27:21 -0300 Subject: [PATCH 4/7] DI-2029: Add jq --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 8d5a857..da6e364 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,7 +22,7 @@ USER root # Install OS dependencies RUN apt-get update && \ - apt-get install -y curl gcc python3-dev libpq-dev openjdk-11-jre postgresql-contrib + apt-get install -y curl gcc python3-dev libpq-dev openjdk-11-jre postgresql-contrib jq # Download all the Jars required to run Beeline RUN cd /opt && \ From dca395eda11ccb3e0ec874db2e8516e264da9d0f Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Tue, 9 Nov 2021 12:34:28 -0300 Subject: [PATCH 5/7] DI-2029: Remove extra index url --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index da6e364..d5dbd9f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -44,4 +44,4 @@ RUN mkdir /opt/presto-cli && \ # Install migratron ADD . 
/opt/migratron -RUN pip install -e /opt/migratron --extra-index-url https://pypi.jampp.com/pypi --trusted-host pypi.jampp.com +RUN pip install -e /opt/migratron From d640ae42cd3596326d03e7f98b6b514a5e0114ce Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Tue, 9 Nov 2021 13:29:12 -0300 Subject: [PATCH 6/7] DI-2029: Explicitly close connection --- migratron/command/run_migration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/migratron/command/run_migration.py b/migratron/command/run_migration.py index e02337e..adf7d6a 100644 --- a/migratron/command/run_migration.py +++ b/migratron/command/run_migration.py @@ -247,6 +247,7 @@ def _execute_filename(self, file_content, complete_filename): cursor.fetchall() # Force non async execution except ResourceClosedError: pass + conn.close() command = None else: raise ValueError("Invalid database type") From 6a4693f9476a268205a414065451615c00fcc822 Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Tue, 9 Nov 2021 14:05:57 -0300 Subject: [PATCH 7/7] DI-2029: Don't explicitly close connection --- migratron/command/run_migration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/migratron/command/run_migration.py b/migratron/command/run_migration.py index adf7d6a..e02337e 100644 --- a/migratron/command/run_migration.py +++ b/migratron/command/run_migration.py @@ -247,7 +247,6 @@ def _execute_filename(self, file_content, complete_filename): cursor.fetchall() # Force non async execution except ResourceClosedError: pass - conn.close() command = None else: raise ValueError("Invalid database type")