Skip to content

Commit

Permalink
Merge pull request #41 from jampp/DI-2029_Spark
Browse files Browse the repository at this point in the history
DI-2029: Add SQLAlchemy DB type
  • Loading branch information
gonzalezzfelipe authored Nov 10, 2021
2 parents a3e146f + 6a4693f commit 22e36a0
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 12 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ USER root

# Install OS dependencies
RUN apt-get update && \
apt-get install -y curl gcc python3-dev libpq-dev openjdk-11-jre postgresql-contrib
apt-get install -y curl gcc python3-dev libpq-dev openjdk-11-jre postgresql-contrib jq

# Download all the Jars required to run Beeline
RUN cd /opt && \
Expand All @@ -34,14 +34,14 @@ RUN cd /opt && \
RUN cd /opt && \
curl https://archive.apache.org/dist/hadoop/core/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz -o hadoop-$HADOOP_VERSION.tar.gz && \
tar -xzf hadoop-$HADOOP_VERSION.tar.gz && \
rm -rf apache-hive-bin.tar.gz
rm -rf hadoop-$HADOOP_VERSION.tar.gz

# Download PrestoCli
RUN mkdir /opt/presto-cli && \
cd /opt/presto-cli && \
curl https://repo1.maven.org/maven2/io/prestosql/presto-cli/$PRESTO_CLI_VERSION/presto-cli-$PRESTO_CLI_VERSION-executable.jar -o presto-cli && \
chmod +x presto-cli


# Install migratron
RUN pip install migratron==$MIGRATRON_VERSION
ADD . /opt/migratron
RUN pip install -e /opt/migratron
5 changes: 5 additions & 0 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,8 @@ or locally, but a **very** basic guide is:
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/
export PATH=$PATH:$HIVE_HOME/bin
SQLAlchemy
==========

For the SQLAlchemy option you only need to install the drivers needed for you particular URI.
2 changes: 1 addition & 1 deletion migratron/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.0
2.2.0
23 changes: 18 additions & 5 deletions migratron/command/run_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from logging import getLogger
from datetime import datetime

from sqlalchemy import create_engine
from sqlalchemy.exc import ResourceClosedError

from migratron.command.base import (
BaseCommand,
INSERT_MIGRATION_DATA,
Expand Down Expand Up @@ -236,6 +239,15 @@ def _execute_filename(self, file_content, complete_filename):
"-f",
complete_filename
]
elif self.db_type == "sqlalchemy":
engine = create_engine(self.db_uri)
with engine.connect() as conn:
cursor = conn.execute(file_content)
try:
cursor.fetchall() # Force non async execution
except ResourceClosedError:
pass
command = None
else:
raise ValueError("Invalid database type")

Expand All @@ -248,8 +260,9 @@ def _execute_filename(self, file_content, complete_filename):
else:
command += [option, value]

try:
subprocess.check_call(command)
except Exception:
logger.exception("Error while running the migration: %s", complete_filename)
raise
if command is not None:
try:
subprocess.check_call(command)
except Exception:
logger.exception("Error while running the migration: %s", complete_filename)
raise
2 changes: 1 addition & 1 deletion migratron/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def create_run_migration_parser(subparser):
)
migrate_parser.add_argument(
"--db-type",
choices=("postgresql", "hive", "presto"),
choices=("postgresql", "hive", "presto", "sqlalchemy"),
required=False,
default="postgresql",
help=(
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pygments
psycopg2
psycopg2-binary
six
sqlalchemy
36 changes: 36 additions & 0 deletions tests/commands/test_run_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import os
import subprocess
import unittest
from tempfile import NamedTemporaryFile

import sqlalchemy

from .helper import BaseHelper, invalid_dbname
from migratron.command.initialize import CREATE_TABLE_SQL
Expand Down Expand Up @@ -131,3 +134,36 @@ def _drop_tables(self):
"DROP TABLE IF EXISTS %s" % table_name
]
subprocess.check_call(command)


class SQLAlchemyRunMigrationTest(BaseRunMigration):

@classmethod
def setUpClass(cls):
cls.dbfile = NamedTemporaryFile()
cls.database_uri = "sqlite:///" + cls.dbfile.name
cls.engine = sqlalchemy.create_engine(cls.database_uri)

super(BaseRunMigration, cls).setUpClass()

@classmethod
def tearDownClass(cls):
cls.dbfile.close()

def create_command(self):
command = RunMigrationCommand(
migration_type=ALL_MIGRATION_TYPES,
just_list_files=False,
additional_options=None,
db_type="sqlalchemy",
db_uri=self.database_uri,
**self.BASE_ARGS
)
return command

def _check_table_exist(self, table_name):
return sqlalchemy.inspect(self.engine).has_table(table_name)

def _drop_tables(self):
for table_name in ['t0', 't1']:
self.engine.execute(sqlalchemy.text("DROP TABLE IF EXISTS %s" % table_name))

0 comments on commit 22e36a0

Please sign in to comment.