From 05d54b30702ecfc8a0d92704a1da222f06e5937f Mon Sep 17 00:00:00 2001 From: SunZhaonan Date: Wed, 16 Dec 2015 16:58:32 -0800 Subject: [PATCH 1/2] Add Hive metadata ETL process --- backend-service/app/actors/EtlJobFactory.java | 3 + backend-service/app/models/EtlJobName.java | 1 + build.gradle | 4 +- metadata-etl/build.gradle | 1 + .../etl/dataset/hive/HiveMetadataEtl.java | 50 +++ .../etl/dataset/hive/HiveViewDependency.java | 30 ++ .../resources/application.properties.template | 10 + .../src/main/resources/jython/HiveExtract.py | 310 ++++++++++++++++++ .../src/main/resources/jython/HiveLoad.py | 163 +++++++++ .../main/resources/jython/HiveTransform.py | 145 ++++++++ .../metadata/etl/dataset/hive/HiveTest.java | 48 +++ .../hive/HiveViewDependencyParserTest.java | 22 ++ 12 files changed, 786 insertions(+), 1 deletion(-) create mode 100644 metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveMetadataEtl.java create mode 100644 metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveViewDependency.java create mode 100644 metadata-etl/src/main/resources/jython/HiveExtract.py create mode 100644 metadata-etl/src/main/resources/jython/HiveLoad.py create mode 100644 metadata-etl/src/main/resources/jython/HiveTransform.py create mode 100644 metadata-etl/src/test/java/metadata/etl/dataset/hive/HiveTest.java create mode 100644 metadata-etl/src/test/java/metadata/etl/dataset/hive/HiveViewDependencyParserTest.java diff --git a/backend-service/app/actors/EtlJobFactory.java b/backend-service/app/actors/EtlJobFactory.java index 8319428a37832d..b30622f614ed9e 100644 --- a/backend-service/app/actors/EtlJobFactory.java +++ b/backend-service/app/actors/EtlJobFactory.java @@ -16,6 +16,7 @@ import java.util.Properties; import metadata.etl.EtlJob; import metadata.etl.dataset.hdfs.HdfsMetadataEtl; +import metadata.etl.dataset.hive.HiveMetadataEtl; import metadata.etl.dataset.teradata.TeradataMetadataEtl; import metadata.etl.git.GitMetadataEtl; import metadata.etl.lineage.AzLineageMetadataEtl; @@ -49,6 +50,8 @@ public static EtlJob getEtlJob(EtlJobName etlJobName, Integer refId, Long whExec return new LdapEtl(refId, whExecId, properties); case GIT_MEDATA_ETL: return new GitMetadataEtl(refId, whExecId, properties); + case HIVE_DATASET_METADATA_ETL: + return new HiveMetadataEtl(refId, whExecId, properties); default: throw new UnsupportedOperationException("Unsupported job type: " + etlJobName); } diff --git a/backend-service/app/models/EtlJobName.java b/backend-service/app/models/EtlJobName.java index 54a0b2d4e7f415..489bf4a3efda7b 100644 --- a/backend-service/app/models/EtlJobName.java +++ b/backend-service/app/models/EtlJobName.java @@ -25,6 +25,7 @@ public enum EtlJobName { HADOOP_DATASET_OWNER_ETL(EtlType.OWNER, RefIdType.DB), LDAP_USER_ETL(EtlType.LDAP, RefIdType.APP), GIT_MEDATA_ETL(EtlType.VCS, RefIdType.APP), + HIVE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB), ; EtlType etlType; diff --git a/build.gradle b/build.gradle index f174a196399e15..ff22125e666067 100644 --- a/build.gradle +++ b/build.gradle @@ -69,5 +69,7 @@ subprojects { "play" : "com.typesafe.play:play_2.10:2.2.4", "play_ebean" : "com.typesafe.play:play-java-ebean_2.10:2.2.4", "play_java_jdbc" : "com.typesafe.play:play-java-jdbc_2.10:2.2.4", - "play_cache" : "com.typesafe.play:play-cache_2.10:2.2.4"] + "play_cache" : "com.typesafe.play:play-cache_2.10:2.2.4", + "hive_exec" : "org.apache.hive:hive-exec:0.8.0" + ] } diff --git a/metadata-etl/build.gradle b/metadata-etl/build.gradle index 883aeecc77f462..16028250125460 100644 --- 
a/metadata-etl/build.gradle +++ b/metadata-etl/build.gradle @@ -22,6 +22,7 @@ dependencies { compile externalDependency.akka compile externalDependency.slf4j_api compile externalDependency.slf4j_log4j + compile externalDependency.hive_exec compile files("extralibs/terajdbc4-15.00.00.20.jar") compile files("extralibs/tdgssconfig-15.00.00.20.jar") // compile externalDependency.jython diff --git a/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveMetadataEtl.java b/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveMetadataEtl.java new file mode 100644 index 00000000000000..c7511849276efd --- /dev/null +++ b/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveMetadataEtl.java @@ -0,0 +1,50 @@ +package metadata.etl.dataset.hive; + +import java.io.InputStream; +import java.util.Properties; +import metadata.etl.EtlJob; + + +/** + * Created by zsun on 11/16/15. + */ +public class HiveMetadataEtl extends EtlJob { + + @Deprecated + public HiveMetadataEtl(int dbId, long whExecId) { + super(null, dbId, whExecId); + } + + public HiveMetadataEtl(int dbId, long whExecId, Properties prop) { + super(null, dbId, whExecId, prop); + } + + + @Override + public void extract() + throws Exception { + logger.info("In Hive metadata ETL, launch extract jython scripts"); + InputStream inputStream = classLoader.getResourceAsStream("jython/HiveExtract.py"); + //logger.info("before call scripts " + interpreter.getSystemState().argv); + super.interpreter.execfile(inputStream); + inputStream.close(); + } + + @Override + public void transform() + throws Exception { + logger.info("In Hive metadata ETL, launch transform jython scripts"); + InputStream inputStream = classLoader.getResourceAsStream("jython/HiveTransform.py"); + interpreter.execfile(inputStream); + inputStream.close(); + } + + @Override + public void load() + throws Exception { + logger.info("In Hive metadata ETL, launch load jython scripts"); + InputStream inputStream = classLoader.getResourceAsStream("jython/HiveLoad.py"); + interpreter.execfile(inputStream); + inputStream.close(); + } +} diff --git a/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveViewDependency.java b/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveViewDependency.java new file mode 100644 index 00000000000000..91b850e937efa8 --- /dev/null +++ b/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveViewDependency.java @@ -0,0 +1,30 @@ +package metadata.etl.dataset.hive; + +import java.util.TreeSet; +import org.apache.hadoop.hive.ql.tools.LineageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Created by zsun on 12/14/15. 
+ */ +public class HiveViewDependency { + final Logger logger = LoggerFactory.getLogger(getClass()); + LineageInfo lineageInfoTool; + + public HiveViewDependency() { + lineageInfoTool = new LineageInfo(); + } + public String[] getViewDependency(String hiveQl) { + try { + lineageInfoTool.getLineageInfo(hiveQl); + TreeSet inputs = lineageInfoTool.getInputTableList(); + return inputs.toArray(new String[inputs.size()]); + } catch (Exception e) { + logger.error("Sql statements : \n" + hiveQl + "\n parse ERROR, will return an empty String array"); + logger.error(String.valueOf(e.getCause())); + return new String[]{}; + } + } +} diff --git a/metadata-etl/src/main/resources/application.properties.template b/metadata-etl/src/main/resources/application.properties.template index 7e3504bc9fb5bf..5946f3d92effb5 100644 --- a/metadata-etl/src/main/resources/application.properties.template +++ b/metadata-etl/src/main/resources/application.properties.template @@ -106,3 +106,13 @@ ldap.group.search.return.attributes= git.host= git.project.whitelist= +# hive metastore +hive.metastore.jdbc.url= +hive.metastore.jdbc.driver= +hive.metstore.username= +hive.metastore.password= + +hive.schema_json_file= +#hive.sample_csv= +hive.schema_csv_file= +hive.field_metadata= diff --git a/metadata-etl/src/main/resources/jython/HiveExtract.py b/metadata-etl/src/main/resources/jython/HiveExtract.py new file mode 100644 index 00000000000000..9c4ed5c07ebe0b --- /dev/null +++ b/metadata-etl/src/main/resources/jython/HiveExtract.py @@ -0,0 +1,310 @@ +# +# Copyright 2015 LinkedIn Corp. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# + +from com.ziclix.python.sql import zxJDBC +import sys, os, re, json +import datetime +from wherehows.common import Constant + +class TableInfo: + """ Class to define the variable name """ + table_name = 'name' + type = 'type' + serialization_format = 'serialization_format' + create_time = 'create_time' + schema_url = 'schema_url' + field_delimiter = 'field_delimiter' + db_id = 'DB_ID' + table_id = 'TBL_ID' + serde_id = 'SD_ID' + table_type = 'tbl_type' + location = 'location' + view_expended_text = 'view_expanded_text' + input_format = 'input_format' + output_format = 'output_format' + is_compressed = 'is_compressed' + is_storedassubdirectories = 'is_storedassubdirectories' + etl_source = 'etl_source' + + field_list = 'field_list' + schema_literal = 'schema_literal' + + optional_prop = [create_time, serialization_format, field_delimiter, schema_url, db_id, table_id, serde_id, + table_type, location, view_expended_text, input_format, output_format, is_compressed, + is_storedassubdirectories, etl_source] + +class HiveExtract: + """ + Extract hive metadata from hive metastore. 
store it in a json file + """ + conn_hms = None + db_dict = {} # name : index + table_dict = {} # fullname : index + serde_param_columns = [] + + def get_table_info_from_v2(self, database_name): + """ + get table, column info from table columns_v2 + :param database_name: + :return: (DB_NAME, TBL_NAME, SERDE_FORMAT, TBL_CREATE_TIME, INTEGER_IDX, COLUMN_NAME, TYPE_NAME, COMMENT + DB_ID, TBL_ID, SD_ID, LOCATION, VIEW_EXPANDED_TEXT, TBL_TYPE, INPUT_FORMAT) + """ + curs = self.conn_hms.cursor() + tbl_info_sql = """select d.NAME DB_NAME, t.TBL_NAME TBL_NAME, + case when s.INPUT_FORMAT like '%.TextInput%' then 'Text' + when s.INPUT_FORMAT like '%.Avro%' then 'Avro' + when s.INPUT_FORMAT like '%.RCFile%' then 'RC' + when s.INPUT_FORMAT like '%.Orc%' then 'ORC' + when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence' + when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet' + else s.INPUT_FORMAT + end SerializationFormat, + t.CREATE_TIME TableCreateTime, + t.DB_ID, t.TBL_ID, s.SD_ID, + substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location, + t.TBL_TYPE, t.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES, + c.INTEGER_IDX, c.COLUMN_NAME, c.TYPE_NAME, c.COMMENT + from TBLS t join DBS d on t.DB_ID=d.DB_ID + join SDS s on t.SD_ID = s.SD_ID + join COLUMNS_V2 c on s.CD_ID = c.CD_ID + where + d.NAME in ('{db_name}') + order by 1,2 + """.format(db_name=database_name) + curs.execute(tbl_info_sql) + rows = curs.fetchall() + curs.close() + return rows + + def get_table_info_from_serde_params(self, database_name): + """ + get table, column info {MANAGED and EXTERNAL} from avro schema parameter + :param database_name: + :return: (DatabaseName, TableName, SerializationFormat, Create_Time, SchemaLiteral, SchemaUrl, FieldDelimiter, DB_ID, TBL_ID, SD_ID + TBL_TYPE, INPUT_FORMAT, OUTPUT_FORMAT, IS_COMPRESSED, IS_STOREDASSUBDIRECTORIES, LOCATION, VIEW_EXPANDED_TEXT) + """ + curs_et = self.conn_hms.cursor() + tbl_info_sql = """select d.NAME DatabaseName, et.TBL_NAME TableName, + case when s.INPUT_FORMAT like '%.TextInput%' then 'Text' + when s.INPUT_FORMAT like '%.Avro%' then 'Avro' + when s.INPUT_FORMAT like '%.RCFile%' then 'RC' + when s.INPUT_FORMAT like '%.Orc%' then 'ORC' + when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence' + when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet' + else s.INPUT_FORMAT + end SerializationFormat, + et.CREATE_TIME TableCreateTime, et.DB_ID, et.TBL_ID, s.SD_ID, + substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location, + et.TBL_TYPE, et.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES, + et.schema_literal SchemaLiteral, et.schema_url SchemaUrl, et.field_delim FieldDelimiter + from ( + select t.DB_ID, t.TBL_ID, sp.SERDE_ID, + t.TBL_NAME, t.CREATE_TIME, t.TBL_TYPE, t.VIEW_EXPANDED_TEXT, + replace(max( case when param_key in ('avro.schema.literal', 'schema.literal') + then param_value + end), '\\n', ' ') schema_literal, + max( case when param_key in ('avro.schema.url', 'schema.url') + then param_value + end) schema_url, + max( case when param_key in ('field.delim') + then param_value + end) field_delim + from SERDE_PARAMS sp join TBLS t on sp.SERDE_ID = t.SD_ID + where sp.PARAM_KEY regexp 'schema.literal|schema.url|field.delim' + and sp.PARAM_VALUE regexp """ + r" '^(,|{|\\\\|\\|)' " + """ + group by 1,2,3,4,5 ) et + JOIN DBS d on et.DB_ID = d.DB_ID + JOIN SDS s on et.SERDE_ID = s.SD_ID + where d.NAME = '{db_name}' + order by 1,2 
""".format(db_name=database_name) + curs_et.execute(tbl_info_sql) + rows = curs_et.fetchall() + curs_et.close() + return rows + + def format_table_metadata_v2(self, rows, schema): + """ + process info get from COLUMN_V2 into final table, several lines form one table info + :param rows: the info get from COLUMN_V2 table + :param schema: {database : _, type : _, tables : [{}, {} ...] } + :return: + """ + db_idx = len(schema) - 1 + table_idx = -1 + + previous_db_name = '' + previous_tb_name = '' + field_list = [] + for row_index, row_value in enumerate(rows): + if row_index != len(rows) - 1 and (row_index == 0 or ( + row_index != len(rows) - 1 and row_value[0] == previous_db_name and row_value[1] == previous_tb_name)): + field_list.append( + {'IntegerIndex': row_value[14], 'ColumnName': row_value[15], 'TypeName': row_value[16], 'Comment': row_value[17]}) + else: # add new record into result + if row_index == len(rows) - 1: # edge case for last row + field_list.append({'IntegerIndex': row_value[14], 'ColumnName': row_value[15], 'TypeName': row_value[16], + 'Comment': row_value[17]}) + + table_record = {TableInfo.table_name: row_value[1], TableInfo.type: 'Table', TableInfo.serialization_format: row_value[2], + TableInfo.create_time: row_value[3], TableInfo.db_id: row_value[4], TableInfo.table_id: row_value[5], + TableInfo.serde_id: row_value[6], TableInfo.location: row_value[7], TableInfo.table_type: row_value[8], + TableInfo.view_expended_text: row_value[9], TableInfo.input_format: row_value[10], TableInfo.output_format: row_value[11], + TableInfo.is_compressed: row_value[12], TableInfo.is_storedassubdirectories: row_value[13], + TableInfo.etl_source: 'COLUMN_V2', + TableInfo.field_list: field_list} + field_list = [] + + if row_value[0] not in self.db_dict: + schema.append({'database': row_value[0], 'type': 'Hive', 'tables': []}) + db_idx += 1 + self.db_dict[row_value[0]] = db_idx + full_name = '' + if row_value[0]: + full_name = row_value[0] + if row_value[1]: + full_name += '.' + row_value[1] + elif row_value[1]: + full_name = row_value[1] + + # put in schema result + if full_name not in self.table_dict: + schema[db_idx]['tables'].append(table_record) + table_idx += 1 + self.table_dict[full_name] = table_idx + # print "%6d: %s: %s" % (table_idx, full_name, str(schema[db_idx]['tables'][table_idx])) + previous_db_name = row_value[0] + previous_tb_name = row_value[1] + + print "%s %6d tables processed for database %12s from COLUMN_V2" % ( + datetime.datetime.now(), table_idx + 1, row_value[0]) + + def format_table_metadata_serde(self, rows, schema): + """ + add table info from rows into schema + also add extra info. + :param rows: DatabaseName, TableName, SerializationFormat, Create_Time, SchemaLiteral, SchemaUrl, FieldDelimiter, DB_ID, TBL_ID, SD_ID + :param schema: {database : _, type : _, tables : ['name' : _, ... '' : _] } + :return: + """ + + db_idx = len(schema) - 1 + table_idx = -1 + for row_value in rows: + if row_value[0] not in self.db_dict: + schema.append({'database': row_value[0], 'type': 'Hive', 'tables': []}) + db_idx += 1 + self.db_dict[row_value[0]] = db_idx + else: + db_idx = self.db_dict[row_value[0]] + full_name = '' + if row_value[0]: + full_name = row_value[0] + if row_value[1]: + full_name += '.' 
+ row_value[1] + elif row_value[1]: + full_name = row_value[1] + + # put in schema result + if full_name not in self.table_dict: + schema[db_idx]['tables'].append( + {TableInfo.table_name: row_value[1], TableInfo.type: 'Table', TableInfo.serialization_format: row_value[2], + TableInfo.create_time: row_value[3], TableInfo.db_id: row_value[4], TableInfo.table_id: row_value[5], + TableInfo.serde_id: row_value[6], TableInfo.location: row_value[7], TableInfo.table_type: row_value[8], + TableInfo.view_expended_text: row_value[9], TableInfo.input_format: row_value[10], + TableInfo.output_format: row_value[11], TableInfo.is_compressed: row_value[12], + TableInfo.is_storedassubdirectories: row_value[13], + TableInfo.etl_source: 'SERDE_PARAMS', + TableInfo.schema_literal: row_value[14], TableInfo.schema_url: row_value[15], + TableInfo.field_delimiter: row_value[16]} + ) + table_idx += 1 + self.table_dict[full_name] = table_idx + + print "%s %6d tables processed for database %12s from SERDE_PARAM" % ( + datetime.datetime.now(), table_idx + 1, row_value[0]) + + def run(self, schema_output_file, sample_output_file): + """ + The entrance of the class, extract schema. + One database per time + :param schema_output_file: output file + :return: + """ + cur = self.conn_hms.cursor() + schema = [] + + schema_json_file = open(schema_output_file, 'wb') + os.chmod(schema_output_file, 0666) + + # open(sample_output_file, 'wb') + # os.chmod(sample_output_file, 0666) + # sample_file_writer = FileWriter(sample_output_file) + + for database_name in self.databases: + print "Collecting hive tables in database : " + database_name + # tables from schemaLiteral + rows = [] + begin = datetime.datetime.now().strftime("%H:%M:%S") + rows.extend(self.get_table_info_from_serde_params(database_name)) + if len(rows) > 0: + self.format_table_metadata_serde(rows, schema) + end = datetime.datetime.now().strftime("%H:%M:%S") + print "Get table info from Serde %12s [%s -> %s]\n" % (database_name, str(begin), str(end)) + + # tables from Column V2 + rows = [] + begin = datetime.datetime.now().strftime("%H:%M:%S") + rows.extend(self.get_table_info_from_v2(database_name)) + if len(rows) > 0: + self.format_table_metadata_v2(rows, schema) + end = datetime.datetime.now().strftime("%H:%M:%S") + print "Get table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end)) + + schema_json_file.write(json.dumps(schema, indent=None) + '\n') + + cur.close() + schema_json_file.close() + + def get_all_databases(self): + """ + Fetch all databases name from DBS table + :return: + """ + fetch_all_database_names = "SELECT `NAME` FROM DBS WHERE NAME NOT LIKE 'u_%'" + curs = self.conn_hms.cursor() + curs.execute(fetch_all_database_names) + rows = [item[0] for item in curs.fetchall()] + curs.close() + return rows + + +if __name__ == "__main__": + args = sys.argv[1] + + # connection + username = args[Constant.HIVE_METASTORE_USERNAME] + password = args[Constant.HIVE_METASTORE_PASSWORD] + jdbc_driver = args[Constant.HIVE_METASTORE_JDBC_DRIVER] + jdbc_url = args[Constant.HIVE_METASTORE_JDBC_URL] + + e = HiveExtract() + e.conn_hms = zxJDBC.connect(jdbc_url, username, password, jdbc_driver) + + e.databases = e.get_all_databases() + print 'Process databases : ' + print e.databases + + + e.run(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], None) + e.conn_hms.close() diff --git a/metadata-etl/src/main/resources/jython/HiveLoad.py b/metadata-etl/src/main/resources/jython/HiveLoad.py new file mode 100644 index 00000000000000..38c6c91d9c76c7 --- /dev/null 
+++ b/metadata-etl/src/main/resources/jython/HiveLoad.py @@ -0,0 +1,163 @@ +# +# Copyright 2015 LinkedIn Corp. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# + +import sys +from com.ziclix.python.sql import zxJDBC +from wherehows.common import Constant + + +class HiveLoad: + def load_metadata(self): + cursor = self.conn_mysql.cursor() + load_cmd = """ + DELETE FROM stg_dict_dataset WHERE db_id = {db_id}; + + LOAD DATA LOCAL INFILE '{source_file}' + INTO TABLE stg_dict_dataset + FIELDS TERMINATED BY '\Z' ESCAPED BY '\0' + (`name`, `schema`, properties, fields, urn, source, @sample_partition_full_path, source_created_time, @source_modified_time) + SET db_id = {db_id}, + storage_type = 'Table', dataset_type = 'hive', + source_modified_time=nullif(@source_modified_time,''), + sample_partition_full_path=nullif(@sample_partition_full_path,''), + wh_etl_exec_id = {wh_etl_exec_id}; + + -- SELECT COUNT(*) FROM stg_dict_dataset; + -- clear + DELETE FROM stg_dict_dataset where db_id = {db_id} + AND (length(`name`)) = 0 + OR `name` like 'tmp\_%' + OR `name` like 't\_%' + ; + + update stg_dict_dataset + set location_prefix = substring_index(substring_index(urn, '/', 4), '/', -2) /* hive location_prefix is it's schema name*/ + WHERE db_id = {db_id} and location_prefix is null; + + update stg_dict_dataset + set parent_name = substring_index(substring_index(urn, '/', 4), '/', -1) /* hive parent_name is it's schema name*/ + where db_id = {db_id} and parent_name is null; + + -- insert into final table + INSERT INTO dict_dataset + ( `name`, + `schema`, + schema_type, + fields, + properties, + urn, + source, + location_prefix, + parent_name, + storage_type, + ref_dataset_id, + status_id, + dataset_type, + hive_serdes_class, + is_partitioned, + partition_layout_pattern_id, + sample_partition_full_path, + source_created_time, + source_modified_time, + created_time, + wh_etl_exec_id + ) + select s.name, s.schema, s.schema_type, s.fields, + s.properties, s.urn, + s.source, s.location_prefix, s.parent_name, + s.storage_type, s.ref_dataset_id, s.status_id, + s.dataset_type, s.hive_serdes_class, s.is_partitioned, + s.partition_layout_pattern_id, s.sample_partition_full_path, + s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()), + s.wh_etl_exec_id + from stg_dict_dataset s + where s.db_id = {db_id} + on duplicate key update + `name`=s.name, `schema`=s.schema, schema_type=s.schema_type, fields=s.fields, + properties=s.properties, source=s.source, location_prefix=s.location_prefix, parent_name=s.parent_name, + storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, status_id=s.status_id, + dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned, + partition_layout_pattern_id=s.partition_layout_pattern_id, sample_partition_full_path=s.sample_partition_full_path, + source_created_time=s.source_created_time, source_modified_time=s.source_modified_time, + modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id + ; + analyze table dict_dataset; + """.format(source_file=self.input_schema_file, db_id=self.db_id, 
wh_etl_exec_id=self.wh_etl_exec_id) + + for state in load_cmd.split(";"): + print state + cursor.execute(state) + self.conn_mysql.commit() + cursor.close() + + def load_field(self): + """ + TODO: Load field is not used for now, as we need to open the nested structure type + :return: + """ + cursor = self.conn_mysql.cursor() + load_cmd = """ + DELETE FROM stg_dict_field_detail where db_id = {db_id}; + + LOAD DATA LOCAL INFILE '{source_file}' + INTO TABLE stg_dict_field_detail + FIELDS TERMINATED BY '\Z' + (urn, sort_id, parent_sort_id, @parent_path, field_name, field_label, data_type, + @data_size, @precision, @scale, @is_nullable, @is_indexed, @is_partitioned, @default_value, @namespace, description, + @dummy + ) + set + parent_path=nullif(@parent_path,'null') + , data_size=nullif(@data_size,'null') + , data_precision=nullif(@precision,'null') + , data_scale=nullif(@scale,'null') + , is_nullable=nullif(@is_nullable,'null') + , is_indexed=nullif(@is_indexed,'null') + , is_partitioned=nullif(@is_partitioned,'null') + , default_value=nullif(@default_value,'null') + , namespace=nullif(@namespace,'null') + , db_id = {db_id} + ; + + analyze table stg_dict_field_detail; + + """.format(source_file=self.input_field_file, db_id=self.db_id) + + # didn't load into final table for now + + for state in load_cmd.split(";"): + print state + cursor.execute(state) + self.conn_mysql.commit() + cursor.close() + +if __name__ == "__main__": + args = sys.argv[1] + + l = HiveLoad() + + # set up connection + username = args[Constant.WH_DB_USERNAME_KEY] + password = args[Constant.WH_DB_PASSWORD_KEY] + JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY] + JDBC_URL = args[Constant.WH_DB_URL_KEY] + + l.input_schema_file = args[Constant.HIVE_SCHEMA_CSV_FILE_KEY] + l.input_field_file = args[Constant.HIVE_FIELD_METADATA_KEY] + l.db_id = args[Constant.DB_ID_KEY] + l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] + l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER) + l.load_metadata() + # l.load_field() + l.conn_mysql.close() diff --git a/metadata-etl/src/main/resources/jython/HiveTransform.py b/metadata-etl/src/main/resources/jython/HiveTransform.py new file mode 100644 index 00000000000000..58068189a98920 --- /dev/null +++ b/metadata-etl/src/main/resources/jython/HiveTransform.py @@ -0,0 +1,145 @@ +# +# Copyright 2015 LinkedIn Corp. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# + +import json +import pprint, datetime +import sys, os +import time +from wherehows.common.writers import FileWriter +from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord +from wherehows.common import Constant +from HiveExtract import TableInfo +from org.apache.hadoop.hive.ql.tools import LineageInfo +from metadata.etl.dataset.hive import HiveViewDependency + +class HiveTransform: + def transform(self, input, hive_metadata, hive_field_metadata): + """ + convert from json to csv + :param input: input json file + :param hive_metadata: output data file for hive table metadata + :param hive_field_metadata: output data file for hive field metadata + :return: + """ + pp = pprint.PrettyPrinter(indent=1) + + f_json = open(input) + all_data = json.load(f_json) + f_json.close() + + schema_file_writer = FileWriter(hive_metadata) + field_file_writer = FileWriter(hive_field_metadata) + + lineageInfo = LineageInfo() + + # one db info : 'type', 'database', 'tables' + # one table info : required : 'name' , 'type', 'serializationFormat' ,'createTime', 'DB_ID', 'TBL_ID', 'SD_ID' + # optional : 'schemaLiteral', 'schemaUrl', 'fieldDelimiter', 'fieldList' + for one_db_info in all_data: + i = 0 + for table in one_db_info['tables']: + i += 1 + schema_json = {} + prop_json = {} # set the prop json + + for prop_name in TableInfo.optional_prop: + if prop_name in table and table[prop_name] is not None: + prop_json[prop_name] = table[prop_name] + + if TableInfo.view_expended_text in prop_json: + text = prop_json[TableInfo.view_expended_text].replace('`', '') + array = HiveViewDependency.getViewDependency(text) + l = [] + for a in array: + l.append(a) + prop_json['view_depends_on'] = l + + # process either schema + flds = {} + field_detail_list = [] + if TableInfo.schema_literal in table and table[TableInfo.schema_literal] is not None: + sort_id = 0 + try: + schema_data = json.loads(table[TableInfo.schema_literal]) + except ValueError: + print "Schema json error for table : " + print table + schema_json = schema_data + + # process each field + for field in schema_data['fields']: + field_name = field['name'] + type = field['type'] # could be a list + default_value = field['default'] if 'default' in field else None + doc = field['doc'] if 'doc' in field else None + + attributes_json = json.loads(field['attributes_json']) if 'attributes_json' in field else None + pk = delta = is_nullable = is_indexed = is_partitioned = inside_type = format = data_size = None + if attributes_json: + pk = attributes_json['pk'] if 'pk' in attributes_json else None + delta = attributes_json['delta'] if 'delta' in attributes_json else None + is_nullable = attributes_json['nullable'] if 'nullable' in attributes_json else None + inside_type = attributes_json['type'] if 'type' in attributes_json else None + format = attributes_json['format'] if 'format' in attributes_json else None + + flds[field_name] = {'type': type} + # String urn, Integer sortId, Integer parentSortId, String parentPath, String fieldName, + #String dataType, String isNullable, String defaultValue, Integer dataSize, String namespace, String description + sort_id += 1 + field_detail_list.append( + ["hive:///%s/%s" % (one_db_info['database'], table['name']), str(sort_id), '0', None, field_name, '', + type, data_size, None, None, is_nullable, is_indexed, is_partitioned, default_value, None, + json.dumps(attributes_json)]) + elif TableInfo.field_list in table: + schema_json = {'type': 'record', 'name': table['name'], + 'fields': 
table[TableInfo.field_list]} # construct a schema for data came from COLUMN_V2 + for field in table[TableInfo.field_list]: + field_name = field['ColumnName'] + type = field['TypeName'] + # ColumnName, IntegerIndex, TypeName, Comment + flds[field_name] = {'type': type} + pk = delta = is_nullable = is_indexed = is_partitioned = inside_type = format = data_size = default_value = None # TODO ingest + field_detail_list.append( + ["hive:///%s/%s" % (one_db_info['database'], table['name']), field['IntegerIndex'], '0', None, field_name, + '', field['TypeName'], None, None, None, is_nullable, is_indexed, is_partitioned, default_value, None, + None]) + + dataset_scehma_record = DatasetSchemaRecord(table['name'], json.dumps(schema_json), json.dumps(prop_json), + json.dumps(flds), + "hive:///%s/%s" % (one_db_info['database'], table['name']), 'Hive', + '', (table[TableInfo.create_time] if table.has_key( + TableInfo.create_time) else None), (table["lastAlterTime"]) if table.has_key("lastAlterTime") else None) + schema_file_writer.append(dataset_scehma_record) + + for fields in field_detail_list: + field_record = DatasetFieldRecord(fields) + field_file_writer.append(field_record) + + schema_file_writer.flush() + field_file_writer.flush() + print "%20s contains %6d tables" % (one_db_info['database'], i) + + schema_file_writer.close() + field_file_writer.close() + + def convert_timestamp(self, time_string): + return int(time.mktime(time.strptime(time_string, "%Y-%m-%d %H:%M:%S"))) + + +if __name__ == "__main__": + args = sys.argv[1] + t = HiveTransform() + + t.transform(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], args[Constant.HIVE_SCHEMA_CSV_FILE_KEY], args[Constant.HIVE_FIELD_METADATA_KEY]) + diff --git a/metadata-etl/src/test/java/metadata/etl/dataset/hive/HiveTest.java b/metadata-etl/src/test/java/metadata/etl/dataset/hive/HiveTest.java new file mode 100644 index 00000000000000..615205080985a6 --- /dev/null +++ b/metadata-etl/src/test/java/metadata/etl/dataset/hive/HiveTest.java @@ -0,0 +1,48 @@ +package metadata.etl.dataset.hive; + +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + + +/** + * Created by zsun on 11/13/15. + */ +public class HiveTest { + HiveMetadataEtl hm; + + @BeforeTest + public void setUp() + throws Exception { + hm = new HiveMetadataEtl(0, 0L); + } + + @Test + public void extractTest() + throws Exception { + hm.extract(); + // check the json file + } + + @Test + public void transformTest() + throws Exception { + hm.transform(); + // check the csv file + } + + @Test + public void loadTest() + throws Exception { + hm.load(); + // check in database + } + + @Test + public void runTest() + throws Exception { + extractTest(); + transformTest(); + loadTest(); + } + +} diff --git a/metadata-etl/src/test/java/metadata/etl/dataset/hive/HiveViewDependencyParserTest.java b/metadata-etl/src/test/java/metadata/etl/dataset/hive/HiveViewDependencyParserTest.java new file mode 100644 index 00000000000000..0d9c26aa6ffdf0 --- /dev/null +++ b/metadata-etl/src/test/java/metadata/etl/dataset/hive/HiveViewDependencyParserTest.java @@ -0,0 +1,22 @@ +package metadata.etl.dataset.hive; + +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.parse.ParseException; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Created by zsun on 12/9/15. 
+ */ +public class HiveViewDependencyParserTest { + @Test + public void parseTest() + throws CommandNeedRetryException, SemanticException, ParseException { + String hiveQl = "select t1.c2 from (select t2.column2 c2, t3.column3 from db1.table2 t2 join db2.table3 t3 on t2.x = t3.y) t1"; + HiveViewDependency hiveViewDependency = new HiveViewDependency(); + String[] result = hiveViewDependency.getViewDependency(hiveQl); + String[] expectedResult = new String[]{"db1.table2", "db2.table3"}; + Assert.assertEquals(result, expectedResult); + } +} From 0eddd8accf0395cf215c54a7d9fc3243dd697845 Mon Sep 17 00:00:00 2001 From: SunZhaonan Date: Mon, 21 Dec 2015 12:06:26 -0800 Subject: [PATCH 2/2] Close connection appropriately, minor fix --- build.gradle | 2 +- .../metadata/etl/dataset/hive/HiveMetadataEtl.java | 2 +- metadata-etl/src/main/resources/jython/HdfsLoad.py | 2 +- .../src/main/resources/jython/HiveExtract.py | 14 +++++++------- metadata-etl/src/main/resources/jython/HiveLoad.py | 8 +++++--- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/build.gradle b/build.gradle index ff22125e666067..dfb4b88ba695ec 100644 --- a/build.gradle +++ b/build.gradle @@ -70,6 +70,6 @@ subprojects { "play_ebean" : "com.typesafe.play:play-java-ebean_2.10:2.2.4", "play_java_jdbc" : "com.typesafe.play:play-java-jdbc_2.10:2.2.4", "play_cache" : "com.typesafe.play:play-cache_2.10:2.2.4", - "hive_exec" : "org.apache.hive:hive-exec:0.8.0" + "hive_exec" : "org.apache.hive:hive-exec:1.2.1" ] } diff --git a/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveMetadataEtl.java b/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveMetadataEtl.java index c7511849276efd..f0d4e8512370a9 100644 --- a/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveMetadataEtl.java +++ b/metadata-etl/src/main/java/metadata/etl/dataset/hive/HiveMetadataEtl.java @@ -26,7 +26,7 @@ public void extract() logger.info("In Hive metadata ETL, launch extract jython scripts"); InputStream inputStream = classLoader.getResourceAsStream("jython/HiveExtract.py"); //logger.info("before call scripts " + interpreter.getSystemState().argv); - super.interpreter.execfile(inputStream); + interpreter.execfile(inputStream); inputStream.close(); } diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py index f6599445bb4ba7..4f81e54864611e 100644 --- a/metadata-etl/src/main/resources/jython/HdfsLoad.py +++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py @@ -30,7 +30,7 @@ def load_metadata(self): LOAD DATA LOCAL INFILE '{source_file}' INTO TABLE stg_dict_dataset - FIELDS TERMINATED BY '\Z' ESCAPED BY '\0' + FIELDS TERMINATED BY '\Z' ESCAPED BY '\\' (`name`, `schema`, properties, fields, urn, source, sample_partition_full_path, source_created_time, source_modified_time) SET db_id = {db_id}, -- TODO storage_type = 'Avro', diff --git a/metadata-etl/src/main/resources/jython/HiveExtract.py b/metadata-etl/src/main/resources/jython/HiveExtract.py index 9c4ed5c07ebe0b..1011e14621ba40 100644 --- a/metadata-etl/src/main/resources/jython/HiveExtract.py +++ b/metadata-etl/src/main/resources/jython/HiveExtract.py @@ -301,10 +301,10 @@ def get_all_databases(self): e = HiveExtract() e.conn_hms = zxJDBC.connect(jdbc_url, username, password, jdbc_driver) - e.databases = e.get_all_databases() - print 'Process databases : ' - print e.databases - - - e.run(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], None) - e.conn_hms.close() + try: + e.databases = e.get_all_databases() + print 'Process databases : '
+ print e.databases + e.run(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], None) + finally: + e.conn_hms.close() diff --git a/metadata-etl/src/main/resources/jython/HiveLoad.py b/metadata-etl/src/main/resources/jython/HiveLoad.py index 38c6c91d9c76c7..fd618c8adef836 100644 --- a/metadata-etl/src/main/resources/jython/HiveLoad.py +++ b/metadata-etl/src/main/resources/jython/HiveLoad.py @@ -158,6 +158,8 @@ def load_field(self): l.db_id = args[Constant.DB_ID_KEY] l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER) - l.load_metadata() - # l.load_field() - l.conn_mysql.close() + try: + l.load_metadata() + # l.load_field() + finally: + l.conn_mysql.close()
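
Usage sketch: with the two patches above applied, the new Hive job is constructed through the same factory path as the other ETL jobs, and its three Jython stages run in extract/transform/load order (the same sequence HiveTest exercises). The driver below is only an illustration, not part of the patch: the class name HiveEtlSmokeTest is hypothetical, the actors/models package names are assumed from the backend-service app layout (package declarations are not shown in the diff), the property keys are copied from the application.properties.template additions (the wherehows.common.Constant mappings read by the scripts are not shown here), and all connection values are placeholders. The load stage additionally expects the WhereHows MySQL settings (Constant.WH_DB_* keys) plus db_id and wh_etl_exec_id in the same Properties.

import java.util.Properties;
import actors.EtlJobFactory;                      // assumed package, per backend-service/app/actors
import models.EtlJobName;                         // assumed package, per backend-service/app/models
import metadata.etl.dataset.hive.HiveMetadataEtl;

public class HiveEtlSmokeTest {                   // hypothetical driver class, not in the patch
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Keys as added to application.properties.template in this patch; values are placeholders.
    props.setProperty("hive.metastore.jdbc.url", "jdbc:mysql://metastore-host:3306/hive");
    props.setProperty("hive.metastore.jdbc.driver", "com.mysql.jdbc.Driver");
    props.setProperty("hive.metstore.username", "hive_ro");   // key spelled exactly as in the template
    props.setProperty("hive.metastore.password", "********");
    props.setProperty("hive.schema_json_file", "/tmp/hive_schema.json");
    props.setProperty("hive.schema_csv_file", "/tmp/hive_schema.csv");
    props.setProperty("hive.field_metadata", "/tmp/hive_field_metadata.csv");
    // HiveLoad.py also reads the WhereHows DB settings (Constant.WH_DB_*_KEY) and the
    // db_id / wh_etl_exec_id values from the same Properties; add them as configured
    // for the target WhereHows instance.

    int dbRefId = 1;     // placeholder ref id of the registered Hive metastore source
    long whExecId = 1L;  // placeholder WhereHows execution id

    HiveMetadataEtl job = (HiveMetadataEtl) EtlJobFactory.getEtlJob(
        EtlJobName.HIVE_DATASET_METADATA_ETL, dbRefId, whExecId, props);

    // Stage order used by HiveMetadataEtl: metastore -> JSON, JSON -> CSV, CSV -> MySQL.
    job.extract();
    job.transform();
    job.load();
  }
}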