diff --git a/contrib/metadata-ingestion/haskell/README.md b/contrib/metadata-ingestion/haskell/README.md deleted file mode 100644 index 0342d69302fee..0000000000000 --- a/contrib/metadata-ingestion/haskell/README.md +++ /dev/null @@ -1,72 +0,0 @@ -# DataHub Ingestion Tool - - -## Introduction - -A tool to ingest [jdbc-database-schema] and [etl-lineage] metadata. - -The ingestion procedure is split into two parts: the [datahub-producer] and the various [metadata-generator]s - - -## Roadmap - -- [X] datahub-producer loads JSON Avro data. -- [X] add lineage-hive generator -- [X] add dataset-jdbc generator (includes [mysql, mssql, postgresql, oracle] drivers) -- [X] add dataset-hive generator -- [ ] add lineage-oracle generator -- [ ] enhance lineage-jdbc generator to use a lazy iterator mode. -- [ ] enhance the avro parser to show error information - - - -## Quickstart -1. Install nix and add the channel - -``` - sudo install -d -m755 -o $(id -u) -g $(id -g) /nix - curl https://nixos.org/nix/install | sh - - nix-channel --add https://nixos.org/channels/nixos-20.03 nixpkgs - nix-channel --update nixpkgs -``` - -2. [optional] You can download the required dependencies in advance; otherwise they are downloaded automatically at run time. - -``` - nix-shell bin/[datahub-producer].hs.nix - nix-shell bin/[datahub-producer].py.nix - ... -``` - -3. Load JSON data into datahub - -``` - cat sample/mce.json.dat | bin/datahub-producer.hs config -``` - -4. Parse Hive SQL and load the lineage into datahub -``` - ls sample/hive_*.sql | bin/lineage_hive_generator.hs | bin/datahub-producer.hs config -``` - -5. Load JDBC schemas (mysql, mssql, postgresql, oracle) into datahub -``` - bin/dataset-jdbc-generator.hs | bin/datahub-producer.hs config -``` - -6. Load Hive schemas into datahub -``` - bin/dataset-hive-generator.py | bin/datahub-producer.hs config -``` - -## Reference - -- hive/presto/vertica SQL Parser - uber/queryparser [https://github.com/uber/queryparser.git] - -- oracle procedure syntax - https://docs.oracle.com/cd/E11882_01/server.112/e41085/sqlqr01001.htm#SQLQR110 - -- postgresql procedure parser - JakeWheat/hssqlppp [https://github.com/JakeWheat/hssqlppp.git] diff --git a/contrib/metadata-ingestion/haskell/bin/datahub-producer.hs b/contrib/metadata-ingestion/haskell/bin/datahub-producer.hs deleted file mode 100755 index ae81d52902ff9..0000000000000 --- a/contrib/metadata-ingestion/haskell/bin/datahub-producer.hs +++ /dev/null @@ -1,174 +0,0 @@ -#! /usr/bin/env nix-shell -#! 
nix-shell datahub-producer.hs.nix -i runghc - -{-# LANGUAGE OverloadedStrings, FlexibleInstances, FlexibleContexts, ScopedTypeVariables #-} -{-# LANGUAGE TemplateHaskell #-} - -import System.Environment (getArgs) -import System.Directory (canonicalizePath) -import Data.Typeable (Typeable) -import Data.Functor ((<&>)) -import Control.Arrow (left, right) -import Control.Monad ((>=>), when) -import Control.Monad.IO.Class (MonadIO(..)) - -import Control.Monad.Catch (Exception, MonadThrow(..)) - -import qualified Data.ByteString.Lazy as B -import qualified Data.ByteString.Lazy.Char8 as BC -import qualified Data.Text as T -import qualified Data.Aeson as J -import Data.String.Conversions (cs) -import qualified Data.Binary as BIN - -import Data.HashMap.Strict ((!)) -import qualified Data.HashMap.Strict as MS -import qualified Data.Vector as V - - -import Control.Lens ((^?), (^..), folded, _Just) -import Data.Aeson.Lens (key, _Array, _String) - -import qualified Data.Avro.Types as A (Value(..)) -import qualified Data.Avro as A (Schema, Result(..)) -import qualified Data.Avro.Schema as AS ( - Schema(..), resultToEither, buildTypeEnvironment - , renderFullname, parseFullname, typeName, parseAvroJSON - ) --- import Data.Avro.JSON (decodeAvroJSON) -import Data.Avro.Encode (encodeAvro) -import Data.Avro.Decode (decodeAvro) --- import Data.Avro.Deriving (makeSchema) - -import Kafka.Avro ( - SchemaRegistry(..), Subject(..), SchemaId(..) - , schemaRegistry, sendSchema - , extractSchemaId, loadSchema - ) - -import Data.Conduit (ConduitT, ZipSink(..), getZipSink, runConduitRes, runConduit, bracketP, (.|), yield) -import qualified Data.Conduit.Combinators as C -import Kafka.Conduit.Sink (ProducerRecord(..), TopicName(..), ProducePartition(..), BrokerAddress(..), kafkaSink, brokersList) - -import Network.URI (parseURI) -import Network.URI.Lens (uriAuthorityLens, uriRegNameLens, uriPortLens) - -import System.Process (readProcess) - -data StringException = StringException String deriving (Typeable, Show) -instance Exception StringException - -decodeAvroJSON :: A.Schema -> J.Value -> A.Result (A.Value A.Schema) -decodeAvroJSON schema json = - AS.parseAvroJSON union env schema json - where - env = - AS.buildTypeEnvironment missing schema - missing name = - fail ("Type " <> show name <> " not in schema") - - union (AS.Union schemas) J.Null - | AS.Null `elem` schemas = - pure $ A.Union schemas AS.Null A.Null - | otherwise = - fail "Null not in union." - union (AS.Union schemas) (J.Object obj) - | null obj = - fail "Invalid encoding of union: empty object ({})." - | length obj > 1 = - fail ("Invalid encoding of union: object with too many fields:" ++ show obj) - | otherwise = - let - canonicalize name - | isBuiltIn name = name - | otherwise = AS.renderFullname $ AS.parseFullname name - branch = - head $ MS.keys obj - names = - MS.fromList [(AS.typeName t, t) | t <- V.toList schemas] - in case MS.lookup (canonicalize branch) names of - Just t -> do - nested <- AS.parseAvroJSON union env t (obj ! branch) - return (A.Union schemas t nested) - Nothing -> fail ("Type '" <> T.unpack branch <> "' not in union: " <> show schemas) - union AS.Union{} _ = - A.Error "Invalid JSON representation for union: has to be a JSON object with exactly one field." - union _ _ = - error "Impossible: function given non-union schema." 
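- -- Avro JSON represents a union value either as null or as a single-field object keyed by the branch's type name; the clauses above canonicalize that key (unless it names a built-in primitive, see isBuiltIn below) and decode the payload against the matching branch schema.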
- - isBuiltIn name = name `elem` [ "null", "boolean", "int", "long", "float" - , "double", "bytes", "string", "array", "map" ] - - -fromRight :: (MonadThrow m, Show a) => String -> Either a b -> m b -fromRight label = either (throwM . StringException . (label ++) . show) return - -fromJust :: (MonadThrow m, Show a) => String -> Maybe a -> m a -fromJust label = maybe (throwM . StringException $ (label ++ "value is missing") ) return - -encodeJsonWithSchema :: (MonadIO m, MonadThrow m) - => SchemaRegistry - -> Subject - -> A.Schema - -> J.Value - -> m B.ByteString -encodeJsonWithSchema sr subj schema json = do - v <- fromRight "[decodeAvroJSON]" $ AS.resultToEither $ decodeAvroJSON schema json - mbSid <- fromRight "[SchemaRegistry.sendSchema]"=<< sendSchema sr subj schema - return $ appendSchemaId v mbSid - where appendSchemaId v (SchemaId sid)= B.cons (toEnum 0) (BIN.encode sid) <> (encodeAvro v) - -decodeJsonWithSchema :: (MonadIO m, MonadThrow m) - => SchemaRegistry - -> B.ByteString - -> m J.Value -decodeJsonWithSchema sr bs = do - (sid, payload) <- maybe (throwM . StringException $ "BadPayloadNoSchemaId") return $ extractSchemaId bs - schema <- fromRight "[SchemaRegistry.loadSchema]" =<< loadSchema sr sid - J.toJSON <$> (fromRight "[Avro.decodeAvro]" $ decodeAvro schema payload) - - -parseNixJson :: FilePath -> IO J.Value -parseNixJson f = do - stdout :: String <- read <$> readProcess "nix-instantiate" ["--eval", "--expr", "builtins.toJSON (import " ++ f ++ ")"] "" - fromRight "[Aeson.eitherDecode] parse nix json" (J.eitherDecode (cs stdout)) - -main :: IO () -main = do - args <- getArgs - when (length args /= 1) $ - error " datahub-producer.hs [config-dir]" - - confDir <- canonicalizePath (head args) - putStrLn ("confDir:" <> confDir) - confJson <- parseNixJson (confDir <> "/" <> "datahub-config.nix") - -- putStrLn ("confJson: " ++ show confJson) - schema <- fromRight "[Aeson.eitherDecode] parse asvc file:" =<< - J.eitherDecode <$> B.readFile (confDir <> "/" <> "MetadataChangeEvent.avsc") - -- putStrLn ("schema: " ++ show schema) - - let - topic = "MetadataChangeEvent" - -- schema = $(makeSchema "../MetadataChangeEvent.avsc") - sandboxL = key "services".key "linkedin-datahub-pipeline".key "sandbox" - urisL = key "uris". _Array.folded._String - brokers = confJson ^.. sandboxL.key "kafka".urisL - srs = confJson ^.. sandboxL.key "schema-registry".urisL - brokers' = map (\uriText -> BrokerAddress . cs . concat $ parseURI (cs uriText) ^.. _Just.uriAuthorityLens._Just.(uriRegNameLens <> uriPortLens)) brokers - - contents <- B.getContents <&> BC.lines - sr <- schemaRegistry (cs (head srs)) - - putStrLn " ==> beginning to send data..." - runConduitRes $ C.yieldMany contents - .| C.mapM (fromRight "[JSON.eitherDecode] read json record:". J.eitherDecode) - -- .| C.iterM (liftIO . putStrLn. cs . J.encode) - .| C.mapM (encodeJsonWithSchema sr (Subject (topic <> "-value")) schema) - -- .| C.iterM (decodeJsonWithSchema sr >=> liftIO . print . 
J.encode) - .| C.map (mkRecord (TopicName topic)) - .| getZipSink ( ZipSink (kafkaSink (brokersList brokers')) *> - ZipSink ((C.length >>= yield) .| C.iterM (\n -> liftIO $ putStrLn ("total table num:" <> show n)) .| C.sinkNull)) - return () - where - mkRecord :: TopicName -> B.ByteString -> ProducerRecord - mkRecord topic bs = ProducerRecord topic UnassignedPartition Nothing (Just (cs bs)) diff --git a/contrib/metadata-ingestion/haskell/bin/datahub-producer.hs.nix b/contrib/metadata-ingestion/haskell/bin/datahub-producer.hs.nix deleted file mode 100644 index 57e131e055454..0000000000000 --- a/contrib/metadata-ingestion/haskell/bin/datahub-producer.hs.nix +++ /dev/null @@ -1,16 +0,0 @@ -with import {} ; -let -in -mkShell { - buildInputs = [ - (haskellPackages.ghcWithPackages ( p: - [ p.bytestring p.string-conversions - p.exceptions - p.network-uri p.directory - p.lens p.aeson p.lens-aeson p.avro p.hw-kafka-avro - p.hw-kafka-client - p.conduit p.hw-kafka-conduit - ] - )) - ]; -} diff --git a/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py b/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py deleted file mode 100755 index 9da5ae198017a..0000000000000 --- a/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py +++ /dev/null @@ -1,66 +0,0 @@ -#! /usr/bin/env nix-shell -#! nix-shell dataset-hive-generator.py.nix -i python - -import sys -import time -from pyhive import hive -from TCLIService.ttypes import TOperationState - -import simplejson as json - -HIVESTORE='localhost' - -AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc' -KAFKATOPIC = 'MetadataChangeEvent_v4' -BOOTSTRAP = 'localhost:9092' -SCHEMAREGISTRY = 'http://localhost:8081' - -def hive_query(query): - """ - Execute the query to the HiveStore. - """ - cursor = hive.connect(HIVESTORE).cursor() - cursor.execute(query, async_=True) - status = cursor.poll().operationState - while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE): - logs = cursor.fetch_logs() - for message in logs: - sys.stdout.write(message) - status = cursor.poll().operationState - results = cursor.fetchall() - return results - -def build_hive_dataset_mce(dataset_name, schema, metadata): - """ - Create the MetadataChangeEvent via dataset_name and schema. 
- """ - actor, type, created_time, upstreams_dataset, sys_time = "urn:li:corpuser:" + metadata[2][7:], str(metadata[-1][11:-1]), int(metadata[3][12:]), metadata[-28][10:], int(time.time()) - owners = {"owners":[{"owner":actor,"type":"DATAOWNER"}],"lastModified":{"time":sys_time,"actor":actor}} - upstreams = {"upstreams":[{"auditStamp":{"time":sys_time,"actor":actor},"dataset":"urn:li:dataset:(urn:li:dataPlatform:hive," + upstreams_dataset + ",PROD)","type":"TRANSFORMED"}]} - elements = {"elements":[{"url":HIVESTORE,"description":"sample doc to describe upstreams","createStamp":{"time":sys_time,"actor":actor}}]} - schema_name = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:hive","version":0,"created":{"time":created_time,"actor":actor}, - "lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.OtherSchema": {"rawSchema": schema}}, - "fields":[{"fieldPath":"","description":{"string":""},"nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]} - - mce = {"auditHeader": None, - "proposedSnapshot":{"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": - {"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,"+ dataset_name +",PROD)" - ,"aspects": [ - {"com.linkedin.pegasus2avro.common.Ownership": owners} - , {"com.linkedin.pegasus2avro.dataset.UpstreamLineage": upstreams} - , {"com.linkedin.pegasus2avro.common.InstitutionalMemory": elements} - , {"com.linkedin.pegasus2avro.schema.SchemaMetadata": schema_name} - ]}}, - "proposedDelta": None} - - print(json.dumps(mce)) - -databases = hive_query('show databases') -for database in databases: - tables = hive_query('show tables in ' + database[0]) - for table in tables: - dataset_name = database[0] + '.' 
+ table[0] - description = hive_query('describe extended ' + dataset_name) - build_hive_dataset_mce(dataset_name, str(description[:-1][:-1]), description[-1][1].split(',')) - -sys.exit(0) diff --git a/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py.nix b/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py.nix deleted file mode 100644 index 9bfce06fdd775..0000000000000 --- a/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py.nix +++ /dev/null @@ -1,61 +0,0 @@ -with import {} ; -let - avro-python3-1_8 = python3Packages.buildPythonPackage rec { - pname = "avro-python3" ; - version = "1.8.2" ; - - src = python3Packages.fetchPypi { - inherit pname version ; - sha256 = "f82cf0d66189600b1e6b442f650ad5aca6c189576723dcbf6f9ce096eab81bd6" ; - } ; - doCheck = false; - } ; - - sasl = python3Packages.buildPythonPackage rec { - pname = "sasl" ; - version = "0.2.1" ; - - src = python3Packages.fetchPypi { - inherit pname version ; - sha256 = "04f22e17bbebe0cd42471757a48c2c07126773c38741b1dad8d9fe724c16289d" ; - } ; - doCheck = false; - propagatedBuildInputs = [ cyrus_sasl ] ++ (with python3Packages ; [six]) ; - } ; - - thrift_sasl = python3Packages.buildPythonPackage rec { - pname = "thrift_sasl" ; - version = "0.4.2" ; - - src = python3Packages.fetchPypi { - inherit pname version ; - sha256 = "6a1c54731cb3ce61bdc041d9dc36e21f0f56db0163bb7b57be84de3fda70922f" ; - } ; - doCheck = false; - propagatedBuildInputs = with python3Packages; [ thrift sasl ] ; - } ; - - PyHive = python3Packages.buildPythonPackage rec { - pname = "PyHive" ; - version = "0.6.1" ; - - src = python3Packages.fetchPypi { - inherit pname version ; - sha256 = "a5f2b2f8bcd85a8cd80ab64ff8fbfe1c09515d266650a56f789a8d89ad66d7f4" ; - } ; - doCheck = false; - propagatedBuildInputs = with python3Packages ; [ dateutil future thrift sasl thrift_sasl ]; - } ; - -in -mkShell { - buildInputs = (with python3Packages ;[ - python - requests - PyHive - - simplejson - # avro-python3-1_8 - # confluent-kafka - ]) ; -} diff --git a/contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs b/contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs deleted file mode 100755 index 68748f38ddbbb..0000000000000 --- a/contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs +++ /dev/null @@ -1,213 +0,0 @@ -#! /usr/bin/env nix-shell -#! 
nix-shell dataset-jdbc-generator.hs.nix -i "runghc --ghc-arg=-fobject-code" - -{-# LANGUAGE OverloadedStrings, FlexibleInstances, FlexibleContexts, ScopedTypeVariables #-} -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE TemplateHaskell, QuasiQuotes #-} -{-# LANGUAGE TypeOperators #-} - - -{-# OPTIONS_GHC -fplugin=Language.Java.Inline.Plugin #-} - - -import System.Environment (lookupEnv) -import System.IO (hPrint, stderr, hSetEncoding, stdout, utf8) - -import qualified Language.Haskell.TH.Syntax as TH - -import Control.Concurrent (runInBoundThread) -import Language.Java (J, withJVM, reify, reflect, JType(..)) -import Language.Java.Inline (imports, java) - -import qualified Data.Text as T -import qualified Data.Text.IO as T -import Data.String.Conversions (cs) -import Text.InterpolatedString.Perl6 (q) - -import Prelude hiding ((>>=), (>>)) - -import Data.Conduit (ConduitT, ZipSink(..), getZipSink, runConduitRes, runConduit, bracketP, (.|), yield) -import qualified Data.Conduit.Combinators as C -import qualified Data.Conduit.List as C (groupBy) - -import qualified Data.Aeson as J -import Control.Arrow ((>>>)) -import Data.Aeson.QQ (aesonQQ) - -import Text.Printf (printf) - - -imports "java.util.*" -imports "java.sql.*" - - -datasetOracleSql :: T.Text -datasetOracleSql = [q| - select - c.OWNER || '.' || c.TABLE_NAME as schema_name - , t.COMMENTS as schema_description - , c.COLUMN_NAME as field_path - , c.DATA_TYPE as native_data_type - , m.COMMENTS as description - from ALL_TAB_COLUMNS c - left join ALL_TAB_COMMENTS t - on c.OWNER = t.OWNER - and c.TABLE_NAME = t.TABLE_NAME - left join ALL_COL_COMMENTS m - on c.OWNER = m.OWNER - and c.TABLE_NAME = m.TABLE_NAME - and c.COLUMN_NAME = m.COLUMN_NAME - where NOT REGEXP_LIKE(c.OWNER, 'ANONYMOUS|PUBLIC|SYS|SYSTEM|DBSNMP|MDSYS|CTXSYS|XDB|TSMSYS|ORACLE.*|APEX.*|TEST?*|GG_.*|\\$') - order by schema_name, c.COLUMN_ID -|] - -datasetMysqlSql :: T.Text -datasetMysqlSql = [q| - select - concat(c.TABLE_SCHEMA, '.', c.TABLE_NAME) as schema_name - , NULL as schema_description - , c.COLUMN_NAME as field_path - , c.DATA_TYPE as native_data_type - , c.COLUMN_COMMENT as description - from information_schema.columns c - where table_schema not in ('information_schema') - order by schema_name, c.ORDINAL_POSITION -|] - -datasetPostgresqlSql :: T.Text -datasetPostgresqlSql = [q| - SELECT - c.table_schema || '.' 
|| c.table_name as schema_name - , pgtd.description as schema_description - , c.column_name as field_path - , c.data_type as native_data_type - , pgcd.description as description - FROM INFORMATION_SCHEMA.COLUMNS c - INNER JOIN - pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname - LEFT JOIN - pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position - LEFT JOIN - pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0 - WHERE c.table_schema NOT IN ('information_schema', 'pg_catalog') - ORDER by schema_name, ordinal_position ; -|] - - -mkMCE :: Int -> T.Text -> [[T.Text]] -> J.Value -mkMCE ts platform fields@((schemaName:schemaDescription:_):_) = [aesonQQ| - { "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": #{urn} - , "aspects": [ - { "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [{"owner": "urn:li:corpuser:datahub", "type":"DATAOWNER"}] - , "lastModified": {"time": #{ts}, "actor": "urn:li:corpuser:datahub"} - } - } - , { "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "description": {"string": #{schemaDescription}} - } - } - , { "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": #{schemaName} - , "platform": "urn:li:dataPlatform:" - , "version": 0 - , "created": {"time": #{ts}, "actor": "urn:li:corpuser:datahub"} - , "lastModified": {"time": #{ts}, "actor": "urn:li:corpuser:datahub"} - , "hash": "" - , "platformSchema": { - "com.linkedin.pegasus2avro.schema.MySqlDDL": { - "documentSchema": "{}" - , "tableSchema": "{}" - } - } - , "fields": #{mkFields fields} - } - } - ] - } - } - } - |] - where - urn :: String = printf "urn:li:dataset:(urn:li:dataPlatform:%s,%s,%s)" - platform schemaName ("PROD"::String) - mkField (_:_:fieldPath:nativeDataType:description:[]) = [aesonQQ| - { - "fieldPath": #{fieldPath} - , "description": {"string": #{description}} - , "type": {"type": {"com.linkedin.pegasus2avro.schema.StringType": {}}} - , "nativeDataType": #{nativeDataType} - } - |] - mkFields = map mkField - -main :: IO () -main = do - hSetEncoding stdout utf8 - let - jvmArgs = case $(TH.lift =<< TH.runIO (lookupEnv "CLASSPATH")) of - Nothing -> [] - Just cp -> [ cs ("-Djava.class.path=" ++ cp) ] - platform :: T.Text = "localhost_postgresql" - -- dbUrl :: T.Text = "jdbc:mysql://localhost:3306/datahub?useSSL=false" - -- dbUrl :: T.Text = "jdbc:oracle:thin@localhost:1521:EDWDB" - dbUrl :: T.Text = "jdbc:postgresql://localhost:5432/datahub" - - dbUser :: T.Text = "datahub" - dbPassword :: T.Text = "datahub" - - -- dbDriver:: T.Text = "oracle.jdbc.OracleDriver" ; - -- dbDriver:: T.Text = "com.mysql.jdbc.Driver" ; - dbDriver:: T.Text = "org.postgresql.Driver" ; - -- dbDriver:: T.Text = "com.microsoft.sqlserver.jdbc.SQLServerDriver" ; - - -- dbSQL :: T.Text = datasetMysqlSql - -- dbSQL :: T.Text = datasetOracleSql - dbSQL :: T.Text = datasetPostgresqlSql - runInBoundThread $ withJVM jvmArgs $ do - [jDbUrl, jDbUser, jDbPassword, jDbDriver, jDbSQL ] <- - mapM reflect [dbUrl, dbUser, dbPassword, dbDriver, dbSQL] - - result <- [java| { - try { - Class.forName($jDbDriver) ; - } catch (ClassNotFoundException e) { - e.printStackTrace() ; - System.exit(1) ; - } - - List result = new ArrayList() ; - try (Connection con = DriverManager.getConnection($jDbUrl, $jDbUser, $jDbPassword) ; - Statement st = con.createStatement(); ) { - try (ResultSet rs = st.executeQuery($jDbSQL)) { - while(rs.next()) { - String[] row = new String[] { - 
Optional.ofNullable(rs.getString("schema_name")).orElse("") - , Optional.ofNullable(rs.getString("schema_description")).orElse("") - , Optional.ofNullable(rs.getString("field_path")).orElse("") - , Optional.ofNullable(rs.getString("native_data_type")).orElse("") - , Optional.ofNullable(rs.getString("description")).orElse("") - } ; - result.add(row) ; - } - } - return result.toArray(new String[0][0]) ; - } catch (SQLException e) { - e.printStackTrace() ; - return null ; - } - } |] - - rows :: [[T.Text]] <- reify result - runConduit $ C.yieldMany rows - -- .| C.iterM (hPrint stderr) - .| C.groupBy sameSchemaName - -- .| C.iterM (hPrint stderr) - .| C.map (mkMCE 0 platform) - .| C.mapM_ (J.encode >>> cs >>> putStrLn) - .| C.sinkNull - return () - where - sameSchemaName (schemaNameL:_) (schemaNameR:_) = schemaNameL == schemaNameR diff --git a/contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs.nix b/contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs.nix deleted file mode 100644 index c7f48e0163be1..0000000000000 --- a/contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs.nix +++ /dev/null @@ -1,54 +0,0 @@ -with import {} ; -let - inline_java_git = fetchFromGitHub { - owner = "tweag" ; - repo = "inline-java" ; - rev = "a897d32df99e4ed19314d2a7e245785152e9099d" ; - sha256 = "00pk19j9g0mm9sknj3aklz01zv1dy234s3vnzg6daq1dmwd4hb68" ; - } ; - haskellPackages = pkgs.haskellPackages.override { - overrides = self: super: with pkgs.haskell.lib; { - jni = overrideCabal (self.callCabal2nix "jni" (inline_java_git + /jni) {}) (drv: { - preConfigure = '' - local libdir=( "${pkgs.jdk}/lib/openjdk/jre/lib/"*"/server" ) - configureFlags+=" --extra-lib-dir=''${libdir[0]}" - '' ; - }) ; - - jvm = overrideCabal (self.callCabal2nix "jvm" (inline_java_git + /jvm) {}) (drv: { - doCheck = false ; - }) ; - inline-java = overrideCabal (self.callCabal2nix "inline-java" inline_java_git {}) (drv: { - doCheck = false ; - }) ; - jvm-batching = overrideCabal (self.callCabal2nix "jvm-batching" (inline_java_git + /jvm-batching) {}) (drv: { - doCheck = false ; - }) ; - jvm-streaming = overrideCabal (self.callCabal2nix "jvm-streaming" (inline_java_git + /jvm-streaming) {}) (drv: { - doCheck = false ; - }) ; - - } ; - }; - -in -mkShell { - buildInputs = [ - pkgs.jdk - pkgs.postgresql_jdbc - pkgs.mysql_jdbc - pkgs.mssql_jdbc - pkgs.oracle-instantclient - - (haskellPackages.ghcWithPackages ( p: - [ p.bytestring p.string-conversions - p.interpolatedstring-perl6 - p.aeson p.aeson-qq - p.exceptions - p.inline-java - - p.conduit - ] - )) - ]; -} diff --git a/contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs b/contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs deleted file mode 100755 index a8549f39da0a9..0000000000000 --- a/contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs +++ /dev/null @@ -1,113 +0,0 @@ -#! /usr/bin/env nix-shell -#! 
nix-shell ./lineage_hive_generator.hs.nix -i runghc - -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE TemplateHaskell #-} -{-# LANGUAGE QuasiQuotes #-} - -import Data.Functor ((<&>)) -import Control.Monad (when) -import Control.Arrow ((>>>)) -import Data.Proxy (Proxy(..)) -import Data.Either (isLeft, fromLeft, fromRight) - -import Text.Printf (formatString) - -import System.IO (hPrint, stderr) - -import Data.String.Conversions (cs) -import qualified Data.Text.Lazy as T -import qualified Data.Text.Lazy.IO as T - -import qualified Data.Map as M -import qualified Data.Set as S -import qualified Data.HashMap.Strict as HM -import qualified Data.Aeson as J - -import Data.Conduit (ConduitT, runConduitRes, runConduit, bracketP, (.|)) -import qualified Data.Conduit.Combinators as C - -import qualified Database.Sql.Hive.Parser as HIVE -import qualified Database.Sql.Hive.Type as HIVE - -import Database.Sql.Type ( - Catalog(..), DatabaseName(..), FullyQualifiedTableName(..), FQTN(..) - , makeDefaultingCatalog, mkNormalSchema - ) - -import Database.Sql.Util.Scope (runResolverWarn) -import Database.Sql.Util.Lineage.Table (getTableLineage) - -import Data.Aeson.QQ (aesonQQ) -import Data.Time.Clock.POSIX (getPOSIXTime) - - -instance J.ToJSON FullyQualifiedTableName -instance J.ToJSONKey FullyQualifiedTableName - -nowts :: IO Int -nowts = floor . (* 1000) <$> getPOSIXTime - -catalog :: Catalog -catalog = makeDefaultingCatalog HM.empty - [mkNormalSchema "public" ()] - (DatabaseName () "defaultDatabase") - -tableName :: FullyQualifiedTableName -> T.Text -tableName (FullyQualifiedTableName database schema name) = T.intercalate "." [database, schema, name] - -mkMCE :: Int -> (FQTN, S.Set FQTN) -> J.Value -mkMCE ts (output, inputs) = [aesonQQ| - { "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": #{uriName output} - , "aspects": [ - { "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { - "upstreams": #{upstreams ts inputs} - } - } - ] - } - } - } - |] - where - upstream :: Int -> T.Text -> J.Value - upstream ts dataset = [aesonQQ| - { "auditStamp": {"time":#{ts}, "actor":"urn:li:corpuser:jdoe"} - , "dataset": #{dataset} - , "type":"TRANSFORMED" - } - |] - upstreams ts = map (upstream ts . uriName) . S.toList - uriName :: FQTN -> T.Text - uriName fqtn = "urn:li:dataset:(urn:li:dataPlatform:hive," - <> tableName fqtn - <> ",PROD)" - - -main = do - contents <- T.getContents <&> T.lines - ts <- nowts - - runConduit $ C.yieldMany contents - .| C.iterM (hPrint stderr) - .| C.mapM (cs >>> T.readFile) - .| C.concatMap parseSQL - .| C.mapM resolveStatement - .| C.concatMap (getTableLineage >>> M.toList) - .| C.map (mkMCE ts) - .| C.mapM_ (J.encode >>> cs >>> putStrLn) - where - parseSQL sql = do - let stOrErr = HIVE.parseManyAll sql - when (isLeft stOrErr) $ - error $ show (fromLeft undefined stOrErr) - fromRight undefined stOrErr - resolveStatement st = do - let resolvedStOrErr = runResolverWarn (HIVE.resolveHiveStatement st) HIVE.dialectProxy catalog - when (isLeft . 
fst $ resolvedStOrErr) $ - error $ show (fromLeft undefined (fst resolvedStOrErr)) - let (Right queryResolved, resolutions) = resolvedStOrErr - return queryResolved - diff --git a/contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs.nix b/contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs.nix deleted file mode 100644 index 17e5883a46185..0000000000000 --- a/contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs.nix +++ /dev/null @@ -1,31 +0,0 @@ -with import {} ; -let - queryparser_git = fetchFromGitHub { - owner = "uber" ; - repo = "queryparser" ; - rev = "6015e8f273f4498326fec0315ac5580d7036f8a4" ; - sha256 = "05pnifm5awyqxi6330v791b1cvw26xbcn2r20pqakvl8d3xyaxa4" ; - } ; - haskellPackages = pkgs.haskellPackages.override { - overrides = self: super: with pkgs.haskell.lib; { - queryparser = appendConfigureFlag - (dontHaddock (doJailbreak (self.callCabal2nix "queryparser" queryparser_git {}))) - "--ghc-options=-XNoMonadFailDesugaring" ; - queryparser-hive = dontHaddock (doJailbreak (self.callCabal2nix "queryparser-hive" (queryparser_git + /dialects/hive) {})) ; - } ; - }; - -in -mkShell { - buildInputs = [ - (haskellPackages.ghcWithPackages ( p: - [ p.bytestring p.text p.string-conversions - p.exceptions p.time - p.aeson p.aeson-qq - p.conduit - p.queryparser p.queryparser-hive - ] - )) - ]; -} - diff --git a/contrib/metadata-ingestion/haskell/config/MetadataChangeEvent.avsc b/contrib/metadata-ingestion/haskell/config/MetadataChangeEvent.avsc deleted file mode 120000 index d5576bd9d8c72..0000000000000 --- a/contrib/metadata-ingestion/haskell/config/MetadataChangeEvent.avsc +++ /dev/null @@ -1 +0,0 @@ -../../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc \ No newline at end of file diff --git a/contrib/metadata-ingestion/haskell/config/datahub-config.nix b/contrib/metadata-ingestion/haskell/config/datahub-config.nix deleted file mode 120000 index 80fbf896760fc..0000000000000 --- a/contrib/metadata-ingestion/haskell/config/datahub-config.nix +++ /dev/null @@ -1 +0,0 @@ -../../nix/datahub-config.nix \ No newline at end of file diff --git a/contrib/metadata-ingestion/haskell/sample/hive_1.sql b/contrib/metadata-ingestion/haskell/sample/hive_1.sql deleted file mode 100644 index 8d9eb94bcb834..0000000000000 --- a/contrib/metadata-ingestion/haskell/sample/hive_1.sql +++ /dev/null @@ -1,144 +0,0 @@ -WITH TL AS -( - SELECT B1.CPC, - B1.CSC, - B1.CSN, - B1.CSC_P, - B1.CCC, - B1.NSD, - CASE - WHEN B1.CSL = 4 THEN - B5.CSN - ELSE - B1.CSN - END AS GL_ATTR, - B1.CSL, - CASE WHEN B1.CSL = 4 THEN B4.CSN - WHEN B1.CSL = 3 THEN B3.CSN - WHEN B1.CSL = 2 THEN B2.CSN - WHEN B1.CSL = 1 THEN B1.CSN - END AS FACCTNAME_LV1, - CASE WHEN B1.CSL = 4 THEN B3.CSN - WHEN B1.CSL = 3 THEN B2.CSN - WHEN B1.CSL = 2 THEN B1.CSN - END AS FACCTNAME_LV2, - CASE WHEN B1.CSL = 4 THEN B2.CSN - WHEN B1.CSL = 3 THEN B1.CSN - END AS FACCTNAME_LV3, - CASE WHEN B1.CSL = 4 THEN B1.CSN - END AS FACCTNAME_LV4 - FROM (SELECT CPC, CSC, CSN, CSC_P, CCC - , NSD, CSL - FROM ODS.FVS - WHERE HDATASRC = 'A' - ) B1 - LEFT JOIN - (SELECT CPC, CSC, CSN, CSC_P, CCC - , NSD, CSL - FROM ODS.FVS - WHERE HDATASRC = 'A' - ) B2 - ON B1.CPC = B2.CPC - AND B1.CSC_P = B2.CSC - LEFT JOIN - (SELECT CPC, CSC, CSN, CSC_P, CCC - , NSD, CSL - FROM ODS.FVS - WHERE HDATASRC = 'A' - ) B3 - ON B2.CPC = B3.CPC - AND B2.CSC_P = B3.CSC - LEFT JOIN - (SELECT CPC, CSC, CSN, CSC_P, CCC - , NSD, CSL - FROM ODS.FVS - WHERE HDATASRC = 'A' - ) B4 - ON B3.CPC = B4.CPC - AND B3.CSC_P = B4.CSC - LEFT 
JOIN - (SELECT CPC, CSC, CSN, CSC_P, CCC - , NSD, CSL - FROM ODS.FVS - WHERE HDATASRC = 'A' - ) B5 - ON B1.CPC = B5.CPC - AND B1.CSC_P = B5.CSC -) -INSERT OVERWRITE TABLE TMP.TFVDM1 PARTITION (HDATASRC = 'A') -SELECT qt_sequence("UUID", A.CAC) AS UUID, - C.PH AS PH, - A.CAC AS PC, - A.CPC AS ASS, - A.D_BIZ AS BD, - E.CH AS CH, - F.EH AS EH, - A.CSC AS GL_CODE, - CASE - WHEN A.CSN = ' ' THEN - A.C_KEY_NAME - ELSE - NVL(A.CSN,A.C_KEY_NAME) - END AS GL_NAME, - A.N_VALPRICE AS ATPRICE, - A.N_HLDAMT AS ATQTY, - A.N_HLDCST_LOCL AS ATCOST, - A.N_HLDCST AS ATCOST_ORICUR, - A.N_HLDMKV_LOCL AS ATMKTVAL, - A.N_HLDMKV AS ATMKTVAL_ORICUR, - A.N_HLDVVA_L AS ATVAL_ADDED, - A.N_HLDVVA AS ATVAL_ADDED_ORICUR, - A.N_VALRATE AS ATEXRATE, - NULL AS COST_TO_AT_RIO, - NULL AS MKTVAL_TO_AT_RIO, - B.NSD AS IS_DETAIL_GL, - A.C_PA_CODE AS ATITEM, - A.C_IVT_CLSS AS INVEST_CLASS, - A.C_ML_ATTR AS ISSUE_MODE, - A.C_FEE_CODE AS FEE_CODE, - A.C_SEC_VAR_MX AS SEC_KIND, - A.C_TD_ATTR AS TRADE_ATTR, - H.C_CA_ATTR AS CASH_ACCOUNT, - A.GL_LV1 AS GL_LV1, - B.FACCTNAME_LV1 AS GL_NAME_LV1, - B.FACCTNAME_LV2 AS GL_NAME_LV2, - B.FACCTNAME_LV3 AS GL_NAME_LV3, - B.FACCTNAME_LV4 AS GL_NAME_LV4, - NULL AS GL_NAME_LV5, - NULL AS GL_NAME_LV6, - A.CSC_T AS GL_ATTR_CODE, - CASE WHEN B.GL_ATTR = '' THEN A.CSN - ELSE B.GL_ATTR END AS GL_ATTR, - NVL(B.CSN, A.C_KEY_NAME) AS GL_FNAME, - A.C_SEC_CODE AS SEC_CODE_FA, - NULL AS SYMBOL_ORI, - NULL AS SYMBOL, - NULL AS SEC_TYPE , - FROM_UNIXTIME(UNIX_TIMESTAMP(CURRENT_TIMESTAMP()),'yyyy-MM-dd HH:mm:ss') AS HLOADTIME, - '20190101' AS HBATCHDATE - FROM (SELECT SUBSTR(T.CSC, 1, 4) AS GL_LV1, - T.* - FROM ODS.FVRV T - WHERE T.D_BIZ IN (SELECT BD - FROM CTL.CFD - WHERE HDATASRC = 'A') - AND T.HDATASRC = 'A' - ) A - LEFT JOIN TL B - ON NVL(A.CSC_T, A.CSC) = B.CSC - AND A.CPC = B.CPC - LEFT JOIN DW.PPCM C - ON A.CPC = C.ORI_SYS_PC - AND C.ORI_SYS_HCODE = 'A' - AND A.D_BIZ BETWEEN C.STDATE AND C.ENDDATE - LEFT JOIN DW.RCM E - ON A.CCC = E.ORI_SYS_CR_CODE - AND E.ORI_SYS_HCODE = 'A' - LEFT JOIN DW.REM F - ON A.C_MKT_CODE = F.ORI_SYS_EXCH_CODE - AND F.ORI_SYS_HCODE = 'A' - LEFT JOIN (SELECT C_CA_CODE, MAX(C_CA_ATTR) AS C_CA_ATTR - FROM ODS.FVC - GROUP BY C_CA_CODE) H - ON A.C_CA_CODE = H.C_CA_CODE - diff --git a/contrib/metadata-ingestion/haskell/sample/mce.json.dat b/contrib/metadata-ingestion/haskell/sample/mce.json.dat deleted file mode 100644 index da70cd52418d5..0000000000000 --- a/contrib/metadata-ingestion/haskell/sample/mce.json.dat +++ /dev/null @@ -1,5 +0,0 @@ -{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {"urn": "urn:li:corpuser:datahub", "aspects": [{"com.linkedin.pegasus2avro.identity.CorpUserInfo":{"active": true, "displayName": {"string": "Data Hub"}, "email": "datahub@linkedin.com", "title": {"string": "CEO"}, "fullName": {"string": "Data Hub"}}}]}}} -{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {"urn": "urn:li:corpuser:jdoe", "aspects": [{"com.linkedin.pegasus2avro.identity.CorpUserInfo":{"active": true, "displayName": {"string": "John Doe"}, "email": "jdoe@linkedin.com", "title": {"string": "Software Engineer"}, "fullName": {"string": "John Doe"}}}]}}} -{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {"urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)", "aspects": [{"com.linkedin.pegasus2avro.common.Ownership": {"owners":[{"owner":"urn:li:corpuser:jdoe","type":"DATAOWNER"}, 
{"owner":"urn:li:corpuser:datahub","type":"DATAOWNER"}],"lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}},{"com.linkedin.pegasus2avro.dataset.UpstreamLineage":{"upstreams":[]}}, {"com.linkedin.pegasus2avro.common.InstitutionalMemory":{"elements":[{"url":"https://www.linkedin.com","description":"Sample doc","createStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}]}},{"com.linkedin.pegasus2avro.schema.SchemaMetadata":{"schemaName":"SampleKafkaSchema","platform":"urn:li:dataPlatform:kafka","version":0, "created":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.KafkaSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"SampleKafkaSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Kafka dataset.\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"}}, "fields":[{"fieldPath":"field_foo", "description":{"string": "Foo field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}},"nativeDataType":"string"}, {"fieldPath":"field_bar", "description":{"string": "Bar field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.BooleanType":{}}},"nativeDataType":"boolean"}]}}] } } } -{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {"urn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)", "aspects": [{"com.linkedin.pegasus2avro.common.Ownership": {"owners":[{"owner":"urn:li:corpuser:jdoe","type":"DATAOWNER"}, {"owner":"urn:li:corpuser:datahub","type":"DATAOWNER"}],"lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}},{"com.linkedin.pegasus2avro.dataset.UpstreamLineage":{"upstreams":[{"auditStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"},"dataset":"urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)","type":"TRANSFORMED"}]}}, {"com.linkedin.pegasus2avro.common.InstitutionalMemory":{"elements":[{"url":"https://www.linkedin.com","description":"Sample doc","createStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}]}},{"com.linkedin.pegasus2avro.schema.SchemaMetadata":{"schemaName":"SampleHdfsSchema","platform":"urn:li:dataPlatform:hdfs","version":0, "created":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.KafkaSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"SampleHdfsSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample HDFS dataset.\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"}}, "fields":[{"fieldPath":"field_foo", "description":{"string": "Foo field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}},"nativeDataType":"string"}, {"fieldPath":"field_bar", "description":{"string": "Bar field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.BooleanType":{}}},"nativeDataType":"boolean"}]}}] } } } -{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", "aspects": [{"com.linkedin.pegasus2avro.common.Ownership": {"owners":[{"owner":"urn:li:corpuser:jdoe","type":"DATAOWNER"}, 
{"owner":"urn:li:corpuser:datahub","type":"DATAOWNER"}],"lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}},{"com.linkedin.pegasus2avro.dataset.UpstreamLineage":{"upstreams":[{"auditStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"},"dataset":"urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)","type":"TRANSFORMED"}]}}, {"com.linkedin.pegasus2avro.common.InstitutionalMemory":{"elements":[{"url":"https://www.linkedin.com","description":"Sample doc","createStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}]}},{"com.linkedin.pegasus2avro.schema.SchemaMetadata":{"schemaName":"SampleHiveSchema","platform":"urn:li:dataPlatform:hive","version":0, "created":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.KafkaSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"SampleHiveSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Hive dataset.\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"}}, "fields":[{"fieldPath":"field_foo", "description":{"string": "Foo field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}},"nativeDataType":"string"}, {"fieldPath":"field_bar", "description":{"string": "Bar field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.BooleanType":{}}},"nativeDataType":"boolean"}]}}] } } } \ No newline at end of file diff --git a/contrib/metadata-ingestion/python/.gitkeep b/contrib/metadata-ingestion/python/.gitkeep deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/contrib/metadata-ingestion/python/looker/dashboard_ingestion/README.md b/contrib/metadata-ingestion/python/looker/dashboard_ingestion/README.md deleted file mode 100644 index dd70429597e6b..0000000000000 --- a/contrib/metadata-ingestion/python/looker/dashboard_ingestion/README.md +++ /dev/null @@ -1,15 +0,0 @@ -## looker_dashboard_ingestion.py -This tool helps ingest Looker dashboard and chart metadata into datahub. -Currently it creates a separate platform named "looker" and loads all dashboard and chart information into that platform as virtual datasets. This was to workaround datahub's lack of support for dashboard entities, however datahub recently started supporting proper dashboard entities. - -The script assumes you already have run lookml_ingestion.py to scrape view definitions into datahub, this is important because we assign lineage between looker views and looker dashboards/charts where possible. - - -## Steps: -- Use a version of python >= 3.7 -- Make a virtual environment -- pip install -r requirements.txt -- Set env vars: LOOKERSDK_CLIENT_ID, LOOKERSDK_CLIENT_SECRET, LOOKERSDK_BASE_URL -- Configure extra kafka conf in looker_dashboard_ingestion.py - -python looker_dashboard_ingestion.py \ No newline at end of file diff --git a/contrib/metadata-ingestion/python/looker/dashboard_ingestion/looker_dashboard_ingestion.py b/contrib/metadata-ingestion/python/looker/dashboard_ingestion/looker_dashboard_ingestion.py deleted file mode 100644 index 22fc9d8718ea9..0000000000000 --- a/contrib/metadata-ingestion/python/looker/dashboard_ingestion/looker_dashboard_ingestion.py +++ /dev/null @@ -1,426 +0,0 @@ -#! 
/usr/bin/python -import time -import os -import json -import typing -from pprint import pprint -import looker_sdk -from looker_sdk.sdk.api31.models import Query, DashboardElement, LookWithQuery, Dashboard -from looker_sdk.error import SDKError - -from dataclasses import dataclass - -from confluent_kafka import avro -from confluent_kafka.avro import AvroProducer - -# Configuration -AVSC_PATH = "../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc" -KAFKA_TOPIC = 'MetadataChangeEvent_v4' - -# Set the following environmental variables to hit Looker's API -# LOOKERSDK_CLIENT_ID=YourClientID -# LOOKERSDK_CLIENT_SECRET=YourClientSecret -# LOOKERSDK_BASE_URL=https://company.looker.com:19999 -LOOKERSDK_BASE_URL = os.environ["LOOKERSDK_BASE_URL"] - -EXTRA_KAFKA_CONF = { - 'bootstrap.servers': 'localhost:9092', - 'schema.registry.url': 'http://localhost:8081' - # 'security.protocol': 'SSL', - # 'ssl.ca.location': '', - # 'ssl.key.location': '', - # 'ssl.certificate.location': '' -} - -# The datahub platform where looker views are stored, must be the same as VIEW_DATAHUB_PLATFORM in lookml_ingestion.py -VIEW_DATAHUB_PLATFORM = "looker_views" -# The datahub platform where looker dashboards will be stored -VISUALIZATION_DATAHUB_PLATFORM = "looker" - - -@dataclass -class LookerDashboardElement: - id: str - title: str - query_slug: str - looker_views: typing.List[str] - look_id: typing.Optional[str] - - @property - def url(self) -> str: - base_url = get_looker_base_url() - - # A dashboard element can use a look or just a raw query against an explore - if self.look_id is not None: - return base_url + "/looks/" + self.look_id - else: - return base_url + "/x/" + self.query_slug - - def get_urn_element_id(self): - # A dashboard element can use a look or just a raw query against an explore - return f"dashboard_elements.{self.id}" - - def get_view_urns(self) -> typing.List[str]: - return [f"urn:li:dataset:(urn:li:dataPlatform:{VIEW_DATAHUB_PLATFORM},{v},PROD)" for v in self.looker_views] - - -@dataclass -class LookerDashboard: - id: str - title: str - description: str - dashboard_elements: typing.List[LookerDashboardElement] - - @property - def url(self): - return get_looker_base_url() + "/dashboards/" + self.id - - def get_urn_dashboard_id(self): - return f"dashboards.{self.id}" - - -@dataclass -class DashboardKafkaEvents: - dashboard_mce: typing.Dict - chart_mces: typing.List[typing.Dict] - - def all_mces(self) -> typing.List[typing.Dict]: - return self.chart_mces + [self.dashboard_mce] - - -def get_looker_base_url(): - base_url = LOOKERSDK_BASE_URL.split("looker.com")[0] + "looker.com" - return base_url - - -def get_actor_and_sys_time(): - actor, sys_time = "urn:li:corpuser:analysts", int(time.time()) * 1000 - return actor, sys_time - - -class ProperDatahubEvents: - """ - This class generates events for "proper" datahub charts and dashboards - These events will not be visualized anywhere as of 12/11/2020 - """ - @staticmethod - def make_chart_mce(dashboard_element: LookerDashboardElement) -> typing.Dict: - actor, sys_time = get_actor_and_sys_time() - - owners = [{ - "owner": actor, - "type": "DEVELOPER" - }] - - return { - "auditHeader": None, - "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot", { - "urn": f"urn:li:chart:(looker,{dashboard_element.get_urn_element_id()})", - "aspects": [ - ("com.linkedin.pegasus2avro.dataset.ChartInfo", { - "title": dashboard_element.title, - "description": "", - "inputs": 
dashboard_element.get_view_urns(), - "url": f"", - "lastModified": {"created": {"time": sys_time, "actor": actor}} - }), - ("com.linkedin.pegasus2avro.common.Ownership", { - "owners": owners, - "lastModified": { - "time": sys_time, - "actor": actor - } - }) - ] - }), - "proposedDelta": None - } - - @staticmethod - def make_dashboard_mce(looker_dashboard: LookerDashboard) -> DashboardKafkaEvents: - actor, sys_time = get_actor_and_sys_time() - - owners = [{ - "owner": actor, - "type": "DEVELOPER" - }] - - chart_mces = [ProperDatahubEvents.make_chart_mce(element) for element in looker_dashboard.dashboard_elements] - - dashboard_mce = { - "auditHeader": None, - "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot", { - "urn": f"urn:li:dashboard:(looker,{looker_dashboard.get_urn_dashboard_id()},PROD)", - "aspects": [ - ("com.linkedin.pegasus2avro.dataset.DashboardInfo", { - "title": looker_dashboard.title, - "description": looker_dashboard.description, - "charts": [mce["proposedSnapshot"][1]["urn"] for mce in chart_mces], - "url": looker_dashboard.url, - "lastModified": {"created": {"time": sys_time, "actor": actor}} - }), - ("com.linkedin.pegasus2avro.common.Ownership", { - "owners": owners, - "lastModified": { - "time": sys_time, - "actor": actor - } - }) - ] - }), - "proposedDelta": None - } - - return DashboardKafkaEvents(dashboard_mce=dashboard_mce, chart_mces=chart_mces) - - -class WorkaroundDatahubEvents: - """ - This class generates events for "workaround" datahub charts and dashboards - This is so we can display end to end lineage without being blocked on datahub's support for dashboards and charts - - The approach is we generate "charts" and "dashboards" as just "datasets" in datahub under a new platform - We then link them together using "UpstreamLineage" just like any other dataset - """ - @staticmethod - def make_chart_mce(dashboard_element: LookerDashboardElement) -> typing.Dict: - actor, sys_time = get_actor_and_sys_time() - - owners = [{ - "owner": actor, - "type": "DEVELOPER" - }] - - upstreams = [{ - "auditStamp":{ - "time": sys_time, - "actor": actor - }, - "dataset": view_urn, - "type":"TRANSFORMED" - } for view_urn in dashboard_element.get_view_urns()] - - doc_elements = [{ - "url": dashboard_element.url, - "description": "Looker chart url", - "createStamp": { - "time": sys_time, - "actor": actor - } - }] - - return { - "auditHeader": None, - "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", { - "urn": f"urn:li:dataset:(urn:li:dataPlatform:{VISUALIZATION_DATAHUB_PLATFORM},{dashboard_element.get_urn_element_id()},PROD)", - "aspects": [ - ("com.linkedin.pegasus2avro.dataset.UpstreamLineage", {"upstreams": upstreams}), - ("com.linkedin.pegasus2avro.common.InstitutionalMemory", {"elements": doc_elements}), - ("com.linkedin.pegasus2avro.dataset.DatasetProperties", {"description": dashboard_element.title, "customProperties": {}}), - ("com.linkedin.pegasus2avro.common.Ownership", { - "owners": owners, - "lastModified": { - "time": sys_time, - "actor": actor - } - }) - ] - }), - "proposedDelta": None - } - - @staticmethod - def make_dashboard_mce(looker_dashboard: LookerDashboard) -> DashboardKafkaEvents: - actor, sys_time = get_actor_and_sys_time() - - chart_mces = [WorkaroundDatahubEvents.make_chart_mce(element) for element in looker_dashboard.dashboard_elements] - - owners = [{ - "owner": actor, - "type": "DEVELOPER" - }] - - upstreams = [{ - "auditStamp":{ - "time": sys_time, - "actor": actor - }, - "dataset": 
chart_urn, - "type":"TRANSFORMED" - } for chart_urn in [mce["proposedSnapshot"][1]["urn"] for mce in chart_mces]] - - doc_elements = [{ - "url": looker_dashboard.url, - "description": "Looker dashboard url", - "createStamp": { - "time": sys_time, - "actor": actor - } - }] - - dashboard_mce = { - "auditHeader": None, - "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", { - "urn": f"urn:li:dataset:(urn:li:dataPlatform:{VISUALIZATION_DATAHUB_PLATFORM},{looker_dashboard.get_urn_dashboard_id()},PROD)", - "aspects": [ - ("com.linkedin.pegasus2avro.dataset.UpstreamLineage", {"upstreams": upstreams}), - ("com.linkedin.pegasus2avro.common.InstitutionalMemory", {"elements": doc_elements}), - ("com.linkedin.pegasus2avro.dataset.DatasetProperties", {"description": looker_dashboard.title, "customProperties": {}}), - ("com.linkedin.pegasus2avro.common.Ownership", { - "owners": owners, - "lastModified": { - "time": sys_time, - "actor": actor - } - }) - ] - }), - "proposedDelta": None - } - - return DashboardKafkaEvents(dashboard_mce=dashboard_mce, chart_mces=chart_mces) - - -def delivery_report(err, msg): - """ Called once for each message produced to indicate delivery result. - Triggered by poll() or flush(). """ - if err is not None: - print('Message delivery failed: {}'.format(err)) - else: - print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) - - -def make_kafka_producer(extra_kafka_conf): - conf = { - "on_delivery": delivery_report, - **extra_kafka_conf - } - - key_schema = avro.loads('{"type": "string"}') - record_schema = avro.load(AVSC_PATH) - producer = AvroProducer(conf, default_key_schema=key_schema, default_value_schema=record_schema) - return producer - - -def _extract_view_from_field(field: str) -> str: - assert field.count(".") == 1, f"Error: A field must be prefixed by a view name, field is: {field}" - view_name = field.split(".")[0] - return view_name - - -def get_views_from_query(query: Query) -> typing.List[str]: - all_views = set() - - # query.dynamic_fields can contain: - # - looker table calculations: https://docs.looker.com/exploring-data/using-table-calculations - # - looker custom measures: https://docs.looker.com/de/exploring-data/adding-fields/custom-measure - # - looker custom dimensions: https://docs.looker.com/exploring-data/adding-fields/custom-measure#creating_a_custom_dimension_using_a_looker_expression - dynamic_fields = json.loads(query.dynamic_fields if query.dynamic_fields is not None else '[]') - custom_field_to_underlying_field = {} - for field in dynamic_fields: - # Table calculations can only reference fields used in the fields section, so this will always be a subset of of the query.fields - if "table_calculation" in field: - continue - # Looker custom measures can reference fields in arbitrary views, so this needs to be parsed to find the underlying view field the custom measure is based on - if "measure" in field: - measure = field["measure"] - based_on = field["based_on"] - custom_field_to_underlying_field[measure] = based_on - - # Looker custom dimensions can reference fields in arbitrary views, so this needs to be parsed to find the underlying view field the custom measure is based on - # However, unlike custom measures custom dimensions can be defined using an arbitrary expression - # We are not going to support parsing arbitrary Looker expressions here, so going to ignore these fields for now - # TODO: support parsing arbitrary looker expressions - if "dimension" in field: - dimension = 
field["dimension"] - expression = field["expression"] - custom_field_to_underlying_field[dimension] = None - - # A query uses fields defined in views, find the views those fields use - fields: typing.Sequence[str] = query.fields if query.fields is not None else [] - for field in fields: - # If the field is a custom field, look up the field it is based on - field_name = custom_field_to_underlying_field[field] if field in custom_field_to_underlying_field else field - if field_name is None: - continue - view_name = _extract_view_from_field(field_name) - all_views.add(view_name) - - # A query uses fields for filtering and those fields are defined in views, find the views those fields use - filters: typing.MutableMapping[str, typing.Any] = query.filters if query.filters is not None else {} - for field in filters.keys(): - # If the field is a custom field, look up the field it is based on - field_name = custom_field_to_underlying_field[field] if field in custom_field_to_underlying_field else field - if field_name is None: - continue - view_name = _extract_view_from_field(field_name) - all_views.add(view_name) - - return list(all_views) - - -def get_views_from_look(look: LookWithQuery): - return get_views_from_query(look.query) - - -def get_looker_dashboard_element(element: DashboardElement)-> typing.Optional[LookerDashboardElement]: - # Dashboard elements can use raw queries against explores - if element.query is not None: - views = get_views_from_query(element.query) - return LookerDashboardElement(id=element.id, title=element.title, look_id=None, query_slug=element.query.slug, looker_views=views) - - # Dashboard elements can *alternatively* link to an existing look - if element.look is not None: - views = get_views_from_look(element.look) - return LookerDashboardElement(id=element.id, title=element.title, look_id=element.look_id, query_slug=element.look.query.slug, looker_views=views) - - # This occurs for "text" dashboard elements that just contain static text (ie: no queries) - # There is not much meaningful info to extract from these elements, so ignore them - return None - - -def get_looker_dashboard(dashboard: Dashboard) -> LookerDashboard: - dashboard_elements: typing.List[LookerDashboardElement] = [] - for element in dashboard.dashboard_elements: - looker_dashboard_element = get_looker_dashboard_element(element) - if looker_dashboard_element is not None: - dashboard_elements.append(looker_dashboard_element) - - looker_dashboard = LookerDashboard(id=dashboard.id, title=dashboard.title, description=dashboard.description, dashboard_elements=dashboard_elements) - return looker_dashboard - - -# Perform IO in main -def main(): - kafka_producer = make_kafka_producer(EXTRA_KAFKA_CONF) - sdk = looker_sdk.init31() - dashboard_ids = [dashboard_base.id for dashboard_base in sdk.all_dashboards(fields="id")] - - looker_dashboards = [] - for dashboard_id in dashboard_ids: - try: - fields = ["id", "title", "dashboard_elements", "dashboard_filters"] - dashboard_object = sdk.dashboard(dashboard_id=dashboard_id, fields=",".join(fields)) - except SDKError as e: - # A looker dashboard could be deleted in between the list and the get - print(f"Skipping dashboard with dashboard_id: {dashboard_id}") - print(e) - continue - - looker_dashboard = get_looker_dashboard(dashboard_object) - looker_dashboards.append(looker_dashboard) - pprint(looker_dashboard) - - for looker_dashboard in looker_dashboards: - workaround_dashboard_kafka_events = WorkaroundDatahubEvents.make_dashboard_mce(looker_dashboard) - # Hard to 
test these events since datahub does not have a UI, for now disable sending them - # proper_dashboard_kafka_events = ProperDatahubEvents.make_dashboard_mce(looker_dashboard) - - for mce in workaround_dashboard_kafka_events.all_mces(): - print(mce) - kafka_producer.produce(topic=KAFKA_TOPIC, key=mce['proposedSnapshot'][1]['urn'], value=mce) - kafka_producer.flush() - - -if __name__ == "__main__": - main() diff --git a/contrib/metadata-ingestion/python/looker/dashboard_ingestion/requirements.txt b/contrib/metadata-ingestion/python/looker/dashboard_ingestion/requirements.txt deleted file mode 100644 index 14c3c7828458b..0000000000000 --- a/contrib/metadata-ingestion/python/looker/dashboard_ingestion/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -avro-python3==1.8.2 -confluent-kafka[avro]==1.4.0 -PyYAML==5.4.1 -looker-sdk==0.1.3b20 \ No newline at end of file diff --git a/contrib/metadata-ingestion/python/looker/lookml_ingestion/README.md b/contrib/metadata-ingestion/python/looker/lookml_ingestion/README.md deleted file mode 100644 index 994c123f5f121..0000000000000 --- a/contrib/metadata-ingestion/python/looker/lookml_ingestion/README.md +++ /dev/null @@ -1,20 +0,0 @@ -## lookml_ingestion.py -This script ingests Looker view metadata from lookml into datahub. Looker views are essentially like database views that can be either materialized or ephemeral, so we treat them as you would any other dataset in datahub. - -Underneath the hood, this script uses the `lkml` python parsing library to parse lkml files and so it comes with all the limitations of that underlying parser. - -Roughly how the script works: -- Point the script at a directory on the filesystem, finds all files named `*.model.lkml` in any level of nesting -- Finds the viewfile includes in the model file, this indicates that the viewfile is a part of that model (and a model has a single SQL connection associated with it). Does not handle a model importing a view file but *not* using the view in the model since that would require parsing explore blocks and adds complexity. -- For each viewfile in the model, parses the view files. For each view in the viewfile, resolve the sql table name for the view: - - We do not support parsing derived tables using a `sql:` block, this would require parsing SQL to understand dependencies. We only support views using `sql_table_name`. In the future, could support limited SQL parsing for limited SQL dialects. - - We support views using the `extends` keyword: https://docs.looker.com/reference/view-params/extends-for-view This is surprisingly painful because views can extend other views in other files. We do this inefficiently right now. - - We do not support views using `refinements`. 
SpotHero does not use refinements right now, so we had no need to implement it: https://docs.looker.com/data-modeling/learning-lookml/refinements -- After binding views to models and finding the sql table name associated with the views, we generate the MCE events into a separate looker platform in datahub since they are not "real" tables but "virtual" looker tables - -## Steps -- Use a version of python >= 3.7 -- Make a virtual environment -- pip install -r requirements.txt -- Set env var: LOOKER_DIRECTORY to the root path of lkml on your filesystem -- Modify EXTRA_KAFKA_CONF section of script to point to datahub \ No newline at end of file diff --git a/contrib/metadata-ingestion/python/looker/lookml_ingestion/lookml_ingestion.py b/contrib/metadata-ingestion/python/looker/lookml_ingestion/lookml_ingestion.py deleted file mode 100644 index a73583e1eb0da..0000000000000 --- a/contrib/metadata-ingestion/python/looker/lookml_ingestion/lookml_ingestion.py +++ /dev/null @@ -1,343 +0,0 @@ -import lkml -import glob -import time -import typing -import os -import re - -from confluent_kafka import avro -from confluent_kafka.avro import AvroProducer - -from dataclasses import dataclass, replace - -from sql_metadata import get_query_tables - -# Configuration -AVSC_PATH = "../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc" -KAFKA_TOPIC = 'MetadataChangeEvent_v4' - -# LOOKER_DIRECTORY = "./test_lookml" -LOOKER_DIRECTORY = os.environ["LOOKER_DIRECTORY"] -LOOKER_DIRECTORY = os.path.abspath(LOOKER_DIRECTORY) - -EXTRA_KAFKA_CONF = { - 'bootstrap.servers': 'localhost:9092', - 'schema.registry.url': 'http://localhost:8081' - # 'security.protocol': 'SSL', - # 'ssl.ca.location': '', - # 'ssl.key.location': '', - # 'ssl.certificate.location': '' -} - -# The datahub platform where looker views are stored -LOOKER_VIEW_PLATFORM = "looker_views" - -class LookerViewFileLoader: - """ - Loads the looker viewfile at a :path and caches the LookerViewFile in memory - This is to avoid reloading the same file off of disk many times during the recursive include resolution process - """ - - def __init__(self): - self.viewfile_cache = {} - - def _load_viewfile(self, path: str) -> typing.Optional["LookerViewFile"]: - if path in self.viewfile_cache: - return self.viewfile_cache[path] - - try: - with open(path, "r") as file: - parsed = lkml.load(file) - looker_viewfile = LookerViewFile.from_looker_dict(path, parsed) - self.viewfile_cache[path] = looker_viewfile - return looker_viewfile - except Exception as e: - print(e) - print(f"Error processing view file {path}. 
Skipping it") - - def load_viewfile(self, path: str, connection: str): - viewfile = self._load_viewfile(path) - if viewfile is None: - return None - - return replace(viewfile, connection=connection) - - - -@dataclass -class LookerModel: - connection: str - includes: typing.List[str] - resolved_includes: typing.List[str] - - @staticmethod - def from_looker_dict(looker_model_dict): - connection = looker_model_dict["connection"] - includes = looker_model_dict["includes"] - resolved_includes = LookerModel.resolve_includes(includes) - - return LookerModel(connection=connection, includes=includes, resolved_includes=resolved_includes) - - @staticmethod - def resolve_includes(includes) -> typing.List[str]: - resolved = [] - for inc in includes: - # Massage the looker include into a valid glob wildcard expression - glob_expr = f"{LOOKER_DIRECTORY}/{inc}" - outputs = glob.glob(glob_expr) - resolved.extend(outputs) - return resolved - - -@dataclass -class LookerViewFile: - absolute_file_path: str - connection: typing.Optional[str] - includes: typing.List[str] - resolved_includes: typing.List[str] - views: typing.List[typing.Dict] - - @staticmethod - def from_looker_dict(absolute_file_path, looker_view_file_dict): - includes = looker_view_file_dict.get("includes", []) - resolved_includes = LookerModel.resolve_includes(includes) - views = looker_view_file_dict.get("views", []) - - return LookerViewFile(absolute_file_path=absolute_file_path, connection=None, includes=includes, resolved_includes=resolved_includes, views=views) - -@dataclass -class LookerView: - absolute_file_path: str - connection: str - view_name: str - sql_table_names: typing.List[str] - - def get_relative_file_path(self): - if LOOKER_DIRECTORY in self.absolute_file_path: - return self.absolute_file_path.replace(LOOKER_DIRECTORY, '').lstrip('/') - else: - raise Exception(f"Found a looker view with name: {view_name} at path: {absolute_file_path} not underneath the base LOOKER_DIRECTORY: {LOOKER_DIRECTORY}. 
This should not happen") - - @staticmethod - def from_looker_dict(looker_view, connection: str, looker_viewfile: LookerViewFile, looker_viewfile_loader: LookerViewFileLoader) -> typing.Optional["LookerView"]: - view_name = looker_view["name"] - sql_table_name = looker_view.get("sql_table_name", None) - # Some sql_table_name fields contain quotes like: optimizely."group", just remove the quotes - sql_table_name = sql_table_name.replace('"', '') if sql_table_name is not None else None - derived_table = looker_view.get("derived_table", None) - - # Parse SQL from derived tables to extract dependencies - if derived_table is not None and 'sql' in derived_table: - # Get the list of tables in the query - sql_tables: typing.List[str] = get_query_tables(derived_table['sql']) - - # Remove temporary tables from WITH statements - sql_table_names = [t for t in sql_tables if not re.search(f'WITH(.*,)?\s+{t}(\s*\([\w\s,]+\))?\s+AS\s+\(', derived_table['sql'], re.IGNORECASE|re.DOTALL)] - - # Remove quotes from tables - sql_table_names = [t.replace('"', '') for t in sql_table_names] - - return LookerView(absolute_file_path=looker_viewfile.absolute_file_path, connection=connection, view_name=view_name, sql_table_names=sql_table_names) - - # There is a single dependency in the view, on the sql_table_name - if sql_table_name is not None: - return LookerView(absolute_file_path=looker_viewfile.absolute_file_path, connection=connection, view_name=view_name, sql_table_names=[sql_table_name]) - - # The sql_table_name might be defined in another view and this view is extending that view, try to find it - else: - extends = looker_view.get("extends", []) - if len(extends) == 0: - # The view is malformed, the view is not a derived table, does not contain a sql_table_name or an extends - print(f"Skipping malformed with view_name: {view_name}. View should have a sql_table_name if it is not a derived table") - return None - - extends_to_looker_view = [] - - # The base view could live in the same file - for raw_view in looker_viewfile.views: - raw_view_name = raw_view["name"] - # Make sure to skip loading view we are currently trying to resolve - if raw_view_name != view_name: - maybe_looker_view = LookerView.from_looker_dict(raw_view, connection, looker_viewfile, looker_viewfile_loader) - if maybe_looker_view is not None and maybe_looker_view.view_name in extends: - extends_to_looker_view.append(maybe_looker_view) - - # Or it could live in one of the included files, we do not know which file the base view lives in, try them all! - for include in looker_viewfile.resolved_includes: - looker_viewfile = looker_viewfile_loader.load_viewfile(include, connection) - if looker_viewfile is not None: - for view in looker_viewfile.views: - maybe_looker_view = LookerView.from_looker_dict(view, connection, looker_viewfile, looker_viewfile_loader) - if maybe_looker_view is None: - continue - - if maybe_looker_view is not None and maybe_looker_view.view_name in extends: - extends_to_looker_view.append(maybe_looker_view) - - if len(extends_to_looker_view) != 1: - print(f"Skipping malformed view with view_name: {view_name}. 
View should have a single view in a view inheritance chain with a sql_table_name") - return None - - output_looker_view = LookerView(absolute_file_path=looker_viewfile.absolute_file_path, connection=connection, view_name=view_name, sql_table_names=extends_to_looker_view[0].sql_table_names) - return output_looker_view - - - - -def get_platform_and_table(view_name: str, connection: str, sql_table_name: str): - """ - This will depend on what database connections you use in Looker - For SpotHero, we had two database connections in Looker: "redshift_test" (a redshift database) and "presto" (a presto database) - Presto supports querying across multiple catalogs, so we infer which underlying database presto is using based on the presto catalog name - For SpotHero, we have 3 catalogs in presto: "redshift", "hive", and "hive_emr" - """ - if connection == "redshift_test": - platform = "redshift" - table_name = sql_table_name - return platform, table_name - - elif connection == "presto": - parts = sql_table_name.split(".") - catalog = parts[0] - - if catalog == "hive": - platform = "hive" - elif catalog == "hive_emr": - platform = "hive_emr" - elif catalog == "redshift": - platform = "redshift" - else: - # Looker lets you exclude a catalog and use a configured default, the default we have configured is to use hive_emr - if sql_table_name.count(".") != 1: - raise Exception("Unknown catalog for sql_table_name: {sql_table_name} for view_name: {view_name}") - - platform = "hive_emr" - return platform, sql_table_name - - table_name = ".".join(parts[1::]) - return platform, table_name - else: - raise Exception(f"Could not find a platform for looker view with connection: {connection}") - - -def construct_datalineage_urn(view_name: str, connection: str, sql_table_name: str): - platform, table_name = get_platform_and_table(view_name, connection, sql_table_name) - return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{table_name},PROD)" - -def construct_data_urn(looker_view: LookerView): - return f"urn:li:dataset:(urn:li:dataPlatform:{LOOKER_VIEW_PLATFORM},{looker_view.view_name},PROD)" - - - -def build_dataset_mce(looker_view: LookerView): - """ - Creates MetadataChangeEvent for the dataset, creating upstream lineage links - """ - actor, sys_time = "urn:li:corpuser:etl", int(time.time()) * 1000 - - upstreams = [{ - "auditStamp":{ - "time": sys_time, - "actor":actor - }, - "dataset": construct_datalineage_urn(looker_view.view_name, looker_view.connection, sql_table_name), - "type":"TRANSFORMED" - } for sql_table_name in looker_view.sql_table_names] - - - doc_elements = [{ - "url":f"https://github.com/spothero/internal-looker-repo/blob/master/{looker_view.get_relative_file_path()}", - "description":"Github looker view definition", - "createStamp":{ - "time": sys_time, - "actor": actor - } - }] - - owners = [{ - "owner": f"urn:li:corpuser:analysts", - "type": "DEVELOPER" - }] - - return { - "auditHeader": None, - "proposedSnapshot":("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", { - "urn": construct_data_urn(looker_view), - "aspects": [ - ("com.linkedin.pegasus2avro.dataset.UpstreamLineage", {"upstreams": upstreams}), - ("com.linkedin.pegasus2avro.common.InstitutionalMemory", {"elements": doc_elements}), - ("com.linkedin.pegasus2avro.common.Ownership", { - "owners": owners, - "lastModified":{ - "time": sys_time, - "actor": actor - } - }) - ] - }), - "proposedDelta": None - } - - -def delivery_report(err, msg): - """ Called once for each message produced to indicate delivery result. 
- Triggered by poll() or flush(). """ - if err is not None: - print('Message delivery failed: {}'.format(err)) - else: - print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) - - -def make_kafka_producer(extra_kafka_conf): - conf = { - "on_delivery": delivery_report, - **extra_kafka_conf - } - - key_schema = avro.loads('{"type": "string"}') - record_schema = avro.load(AVSC_PATH) - producer = AvroProducer(conf, default_key_schema=key_schema, default_value_schema=record_schema) - return producer - - -def main(): - kafka_producer = make_kafka_producer(EXTRA_KAFKA_CONF) - viewfile_loader = LookerViewFileLoader() - - looker_models = [] - all_views = [] - - model_files = sorted(f for f in glob.glob(f"{LOOKER_DIRECTORY}/**/*.model.lkml", recursive=True)) - for f in model_files: - try: - with open(f, 'r') as file: - parsed = lkml.load(file) - looker_model = LookerModel.from_looker_dict(parsed) - looker_models.append(looker_model) - except Exception as e: - print(e) - print(f"Error processing model file {f}. Skipping it") - - - - for model in looker_models: - for include in model.resolved_includes: - looker_viewfile = viewfile_loader.load_viewfile(include, model.connection) - if looker_viewfile is not None: - for raw_view in looker_viewfile.views: - maybe_looker_view = LookerView.from_looker_dict(raw_view, model.connection, looker_viewfile, viewfile_loader) - if maybe_looker_view: - all_views.append(maybe_looker_view) - - - for view in all_views: - MCE = build_dataset_mce(view) - print(view) - print(MCE) - kafka_producer.produce(topic=KAFKA_TOPIC, key=MCE['proposedSnapshot'][1]['urn'], value=MCE) - kafka_producer.flush() - - - -if __name__ == "__main__": - main() diff --git a/contrib/metadata-ingestion/python/looker/lookml_ingestion/requirements.txt b/contrib/metadata-ingestion/python/looker/lookml_ingestion/requirements.txt deleted file mode 100644 index 8f9fd5d999990..0000000000000 --- a/contrib/metadata-ingestion/python/looker/lookml_ingestion/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -lkml==1.1.0 -avro-python3==1.8.2 -confluent-kafka[avro]==1.4.0 -sql-metadata==1.12.0 diff --git a/contrib/nix/README.md b/contrib/nix/README.md deleted file mode 100644 index d768f08edb0bd..0000000000000 --- a/contrib/nix/README.md +++ /dev/null @@ -1,67 +0,0 @@ -# Nix sandbox for datahub - - -## Introduction -database is not suitable for virtualization for it's io performance. - -so we use simple nix package tool to install package and setup service on physical machine. - -we declare it, then it works. see [sandbox.nix] file for details. - -it install software on /nix directory, and run service on launchpad(darwin) and systemd(linux). - -NOTE: for linux, ensure 'systemd --user' process running. - - -## Roadmap - -- [X] support mac and linux -- [ ] add environment check script -- [ ] add datahub nix package -- [ ] add datahub[gms, frontend, pipeline] service module -- [ ] add nixops distributed deploy - - -## Quickstart -1. install nix and channel - -``` - sudo install -d -m755 -o $(id -u) -g $(id -g) /nix - curl https://nixos.org/nix/install | sh - - nix-channel --add https://nixos.org/channels/nixos-20.03 nixpkgs - nix-channel --update nixpkgs -``` - -2. install home-manager - -``` - nix-channel --add https://github.com/clojurians-org/home-manager/archive/v1.0.0.tar.gz home-manager - nix-channel --update home-manager - NIX_PATH=~/.nix-defexpr/channels nix-shell '' -A install -``` - -3. setup environment, and well done! 
-``` - NIX_PATH=~/.nix-defexpr/channels home-manager -f sandbox.nix switch -``` - -## Client connect -``` -mysql => mysql -u root -S /nix/var/run/mysqld.sock -postgresql => psql -h /nix/var/run postgres -elasticsearch => curl http://localhost:9200 -neo4j => cypher-shell -uneo4j -pneo4j -zookeeper => zkCli.sh -kafka => kafka-topics.sh --bootstrap-server localhost:9092 --list -confluent schema-registry => curl http://localhost:8081 - -``` - -## Environemnt Check - -you only need install nix to run it! - -``` -nix-shell datahub-check.nix -A gms -``` diff --git a/contrib/nix/datahub-check.nix b/contrib/nix/datahub-check.nix deleted file mode 100644 index aee544283f8e1..0000000000000 --- a/contrib/nix/datahub-check.nix +++ /dev/null @@ -1,99 +0,0 @@ -{ pkgs ? import {} }: - -with pkgs ; -let - datahub = import ./datahub-config.nix ; - build-prompt = '' - echo This derivation is not buildable, instead run it using nix-shell. - exit 1 - '' ; - parse-uri = uri : - let - uriSchemaSplit = builtins.split "://" uri ; - schema = builtins.head uriSchemaSplit ; - uriNoSchema = lib.last uriSchemaSplit ; - - uriPathSplit = builtins.split "/" uriNoSchema ; - hostPort = builtins.head uriPathSplit ; - path = lib.optionalString (builtins.length uriPathSplit > 1) (lib.last uriPathSplit) ; - - hostPortSplit = builtins.split ":" hostPort ; - host = builtins.head hostPortSplit ; - port = lib.last hostPortSplit ; - - in { inherit schema host port path ; } ; - gms = - let - gms-conf = datahub.services.linkedin-datahub-gms ; - jdbc-uri = parse-uri gms-conf.sandbox.jdbc.uri ; - elasticsearch-uri = parse-uri (builtins.head gms-conf.sandbox.elasticsearch.uris) ; - neo4j-uri = parse-uri gms-conf.sandbox.neo4j.uri ; - kafka-uri = parse-uri (builtins.head gms-conf.sandbox.kafka.uris) ; - schema-registry-uri = parse-uri (builtins.head gms-conf.sandbox.schema-registry.uris) ; - gms-uri = parse-uri gms-conf.listener ; - - check-port = name : uri : '' - echo " [${name}] checking port..." - ${netcat-gnu}/bin/nc -z ${uri.host} ${uri.port} - if [ $? != 0 ]; then echo " [${name}] !ERROR: can not connec to ${uri.host}:${uri.port}" && exit 1; fi - '' ; - check-jdbc-user = '' - # echo " [jdbc] checking username and password..." - '' ; - check-jdbc-table = '' - # echo " [jdbc] checking [metadata_aspect] table..." - '' ; - check-elasticsearch-index = '' - # echo " [elasticsearch] checking [corpuserinfodocument, datasetdocument] indices ..." - '' ; - check-neo4j-user = '' - # echo " [neo4j] checking user and password..." - '' ; - check-kafka-topic = '' - # echo " [kafka] checking [MetadataChangeEvent, MetadataAuditEvent] indices..." 
- '' ; - in - stdenv.mkDerivation { - name = "gms-check" ; - - buildInputs = [ netcat-gnu ] ; - - preferLocalBuild = true ; - buildCommand = build-prompt ; - - shellHookOnly = true; - shellHook = '' - echo "******** checking sandbox.jdbc " - ${check-port "jdbc" jdbc-uri} - ${check-jdbc-user } - ${check-jdbc-table } - - echo "******** checking sandbox.elasticsearch " - ${check-port "elasticsearch" elasticsearch-uri} - ${check-elasticsearch-index} - - echo "******** checking sandbox.neo4j " - ${check-port "neo4j" neo4j-uri} - ${check-neo4j-user } - - echo "******** checking sandbox.kafka " - ${check-port "kafka" kafka-uri} - ${check-kafka-topic } - - echo "******** checking sandbox.schema-registry " - ${check-port "schema-registry" schema-registry-uri} - - echo "******** checking gms " - ${check-port "gms" gms-uri} - exit 0 - '' ; - } ; - frontend = - let - frontend-conf = ddatahub.services.linkedin-datahub-frontend ; - in {} ; - pipeline = - let - pipeline-conf = ddatahub.services.linkedin-datahub-pipeline ; - in {} ; -in { inherit gms frontend pipeline;} diff --git a/contrib/nix/datahub-config.nix b/contrib/nix/datahub-config.nix deleted file mode 100644 index 25a212b807aa7..0000000000000 --- a/contrib/nix/datahub-config.nix +++ /dev/null @@ -1,31 +0,0 @@ -{ - services.linkedin-datahub-gms = { - enable = true; - sandbox = { - jdbc.uri = "jdbc:postgresql://localhost:5432/datahub" ; - jdbc.username = "datahub" ; - jdbc.password = "datahub" ; - elasticsearch.uris = [ "http://localhost:9200" ] ; - neo4j.uri = "bolt://localhost:7687" ; - neo4j.username = "neo4j" ; - neo4j.password = "datahub" ; - kafka.uris = [ "PLAINTEXT://localhost:9092" ] ; - schema-registry.uris = [ "http://localhost:8081" ] ; - } ; - listener = "http://localhost:8080" ; - } ; - - services.linkedin-datahub-frontend = { - enable = true ; - listener = "http://localhost:9001" ; - linkedin-datahub-gms.uri = "http://localhost:8080" ; - } ; - services.linkedin-datahub-pipeline = { - enable = true ; - linkedin-datahub-gms.uri = "http://localhost:8080" ; - sandbox = { - kafka.uris = [ "PLAINTEXT://localhost:9092" ] ; - schema-registry.uris = [ "http://localhost:8081" ] ; - } ; - } ; -} diff --git a/contrib/nix/sandbox.nix b/contrib/nix/sandbox.nix deleted file mode 100644 index a6f2b95529c8b..0000000000000 --- a/contrib/nix/sandbox.nix +++ /dev/null @@ -1,80 +0,0 @@ -{ config, pkgs, ... }: - -{ - # Let Home Manager install and manage itself. - programs.home-manager.enable = true; - - # This value determines the Home Manager release that your - # configuration is compatible with. This helps avoid breakage - # when a new Home Manager release introduces backwards - # incompatible changes. - # - # You can update Home Manager without changing this value. See - # the Home Manager release notes for a list of state version - # changes in each release. 
- home.stateVersion = "19.09"; - - - environment.systemPackages = [ - pkgs.gradle - - pkgs.postgresql_11 - pkgs.mysql57 - pkgs.elasticsearch - pkgs.neo4j - pkgs.zookeeper - pkgs.apacheKafka - pkgs.confluent-platform - pkgs.kafkacat - pkgs.neo4j - ]; - - services.postgresql = { - enable = true ; - package = pkgs.postgresql_11 ; - dataDir = "/opt/nix-module/data/postgresql" ; - } ; - - services.mysql = { - enable = true ; - # package = pkgs.mysql80 ; - package = pkgs.mysql57 ; - dataDir = "/opt/nix-module/data/mysql" ; - } ; - - services.elasticsearch = { - enable = true ; - # package = pkgs.elasticsearch7 ; - package = pkgs.elasticsearch ; - dataDir = "/opt/nix-module/data/elasticsearch" ; - } ; - - services.neo4j = { - enable = true ; - package = pkgs.neo4j ; - directories.home = "/opt/nix-module/data/neo4j" ; - } ; - - services.zookeeper = { - enable = true ; - package = pkgs.zookeeper ; - dataDir = "/opt/nix-module/data/zookeeper" ; - } ; - - services.apache-kafka = { - enable = true ; - package = pkgs.apacheKafka ; - logDirs = [ "/opt/nix-module/data/kafka" ] ; - zookeeper = "localhost:2181" ; - extraProperties = '' - offsets.topic.replication.factor = 1 - zookeeper.session.timeout.ms = 600000 - '' ; - } ; - - services.confluent-schema-registry = { - enable = true ; - package = pkgs.confluent-platform ; - kafkas = [ "PLAINTEXT://localhost:9092" ] ; - } ; -} diff --git a/metadata-ingestion-examples/README.md b/metadata-ingestion-examples/README.md deleted file mode 100644 index d19438481f562..0000000000000 --- a/metadata-ingestion-examples/README.md +++ /dev/null @@ -1,40 +0,0 @@ -# Metadata Ingestion - -**LEGACY** -This is a legacy module. The examples here are not actively maintained and may not work as described. Please see the `metadata-ingestion` module for more up-to-date uses. - -This directory contains example apps for ingesting data into DataHub. - -You are more than welcome to use these examples directly, or use them as a reference for you own jobs. - -See the READMEs of each example for more information on each. - -### Common themes - -All these examples ingest by firing MetadataChangeEvent Kafka events. They do not ingest directly into DataHub, though -this is possible. Instead, the mce-consumer-job should be running, listening for these events, and perform the ingestion -for us. - -### A note on languages - -We initially wrote these examples in Python (they still exist in `contrib`). The idea was that these were very small -example scripts, that should've been easy to use. However, upon reflection, not all developers are familiar with Python, -and the lack of types can hinder development. So the decision was made to port the examples to Java. - -You're more than welcome to extrapolate these examples into whatever languages you like. At LinkedIn, we primarily use -Java. - -### Ingestion at LinkedIn - -It is worth noting that we do not use any of these examples directly (in Java, Python, or anything else) at LinkedIn. We -have several different pipelines for ingesting data; it all depends on the source. - -- Some pipelines are based off other Kafka events, where we'll transform some existing Kafka event to a metadata event. - - For example, we get Kafka events hive changes. We make MCEs out of those hive events to ingest hive data. -- For others, we've directly instrumented existing pipelines / apps / jobs to also emit metadata events. -- For others still, we've created a series offline jobs to ingest data. - - For example, we have an Azkaban job to process our HDFS datasets. 
- -For some sources of data one of these example scripts may work fine. For others, it may make more sense to have some -custom logic, like the above list. Namely, all these examples today are one-off (they run, fire events, and then stop), -you may wish to build continuous ingestion pipelines instead. diff --git a/metadata-ingestion-examples/common/build.gradle b/metadata-ingestion-examples/common/build.gradle deleted file mode 100644 index d2d3637f6892c..0000000000000 --- a/metadata-ingestion-examples/common/build.gradle +++ /dev/null @@ -1,29 +0,0 @@ -plugins { - id 'java' -} - -dependencies { - compile project(':metadata-dao-impl:kafka-producer') - - compile externalDependency.javaxInject - compile externalDependency.kafkaAvroSerde - compile externalDependency.lombok - compile externalDependency.springBeans - compile externalDependency.springBootAutoconfigure - compile externalDependency.springCore - compile externalDependency.springKafka - compile externalDependency.zookeeper - - annotationProcessor externalDependency.lombok - - runtime externalDependency.logbackClassic - - constraints { - implementation("org.apache.logging.log4j:log4j-core:2.17.0") { - because("previous versions are vulnerable to CVE-2021-45105") - } - implementation("org.apache.logging.log4j:log4j-api:2.17.0") { - because("previous versions are vulnerable to CVE-2021-45105") - } - } -} diff --git a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/KafkaConfig.java b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/KafkaConfig.java deleted file mode 100644 index c5a1452978779..0000000000000 --- a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/KafkaConfig.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.linkedin.metadata.examples.configs; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.util.Arrays; -import java.util.Map; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - -@Configuration -public class KafkaConfig { - @Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}") - private String kafkaBootstrapServers; - - @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") - private String kafkaSchemaRegistryUrl; - - @Bean(name = "kafkaProducer") - public Producer kafkaProducerFactory(KafkaProperties properties) { - KafkaProperties.Producer producerProps = properties.getProducer(); - - producerProps.setKeySerializer(StringSerializer.class); - producerProps.setValueSerializer(KafkaAvroSerializer.class); - - // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS - if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { - producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); - } // else we rely on KafkaProperties which defaults to localhost:9092 - - Map props = properties.buildProducerProperties(); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); - - return new KafkaProducer<>(props); - } -} 
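Note: `KafkaConfig` above exposes an Avro-serializing producer as the named Spring bean `kafkaProducer`. The sketch below shows roughly how a caller might inject and use that bean; the class `ExampleMceEmitter`, its package placement, and the `topic` parameter are illustrative assumptions rather than part of the original module.

```
package com.linkedin.metadata.examples.configs;

import javax.inject.Inject;
import javax.inject.Named;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

// Hypothetical caller of the "kafkaProducer" bean defined above. The class
// name and the topic argument are illustrative only; the real example apps
// send to the MetadataChangeEvent topic resolved via TopicConvention/Topics.
@Component
public class ExampleMceEmitter {

  @Inject
  @Named("kafkaProducer")
  private Producer<String, GenericRecord> _producer;

  // Sends one Avro-encoded MCE and blocks until the broker acknowledges it.
  public void emit(String topic, GenericRecord mce) throws Exception {
    _producer.send(new ProducerRecord<>(topic, mce)).get();
    _producer.flush();
  }
}
```

The example apps in this directory (kafka-etl, mce-cli) follow the same inject-by-name pattern rather than constructing a `KafkaProducer` themselves.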
diff --git a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/SchemaRegistryConfig.java b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/SchemaRegistryConfig.java deleted file mode 100644 index ceb1d05d6a7d9..0000000000000 --- a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/SchemaRegistryConfig.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.linkedin.metadata.examples.configs; - -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - -@Configuration -public class SchemaRegistryConfig { - @Value("${SCHEMAREGISTRY_URL:http://localhost:8081}") - private String schemaRegistryUrl; - - @Bean(name = "schemaRegistryClient") - public SchemaRegistryClient schemaRegistryFactory() { - return new CachedSchemaRegistryClient(schemaRegistryUrl, 512); - } -} diff --git a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/TopicConventionFactory.java b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/TopicConventionFactory.java deleted file mode 100644 index eeb375355888e..0000000000000 --- a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/TopicConventionFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.linkedin.metadata.examples.configs; - -import com.linkedin.mxe.TopicConvention; -import com.linkedin.mxe.TopicConventionImpl; -import com.linkedin.mxe.Topics; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - -/** - * Creates a {@link TopicConvention} to generate kafka metadata event topic names. - * - *
This allows you to easily override Kafka topic names within your organization. - */ -@Configuration -public class TopicConventionFactory { - public static final String TOPIC_CONVENTION_BEAN = "metadataKafkaTopicConvention"; - - @Value("${METADATA_CHANGE_EVENT_NAME:" + Topics.METADATA_CHANGE_EVENT + "}") - private String metadataChangeEventName; - - @Value("${METADATA_AUDIT_EVENT_NAME:" + Topics.METADATA_AUDIT_EVENT + "}") - private String metadataAuditEventName; - - @Value("${FAILED_METADATA_CHANGE_EVENT_NAME:" + Topics.FAILED_METADATA_CHANGE_EVENT + "}") - private String failedMetadataChangeEventName; - - @Value("${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}") - private String metadataChangeProposalName; - - @Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}") - private String metadataChangeLogVersionedTopicName; - - @Value("${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}") - private String metadataChangeLogTimeseriesTopicName; - - @Value("${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_PROPOSAL + "}") - private String failedMetadataChangeProposalName; - - @Value("${PLATFORM_EVENT_TOPIC_NAME:" + Topics.PLATFORM_EVENT + "}") - private String platformEventTopicName; - - @Bean(name = TOPIC_CONVENTION_BEAN) - protected TopicConvention createInstance() { - return new TopicConventionImpl(metadataChangeEventName, metadataAuditEventName, failedMetadataChangeEventName, - metadataChangeProposalName, metadataChangeLogVersionedTopicName, metadataChangeLogTimeseriesTopicName, - failedMetadataChangeProposalName, platformEventTopicName, - // TODO once we start rolling out v5 add support for changing the new event names. - TopicConventionImpl.DEFAULT_EVENT_PATTERN); - } -} diff --git a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/ZooKeeperConfig.java b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/ZooKeeperConfig.java deleted file mode 100644 index 2923cd1a7ee64..0000000000000 --- a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/ZooKeeperConfig.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.linkedin.metadata.examples.configs; - -import java.io.IOException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - -@Configuration -public class ZooKeeperConfig { - @Value("${ZOOKEEPER:localhost:2181}") - private String zookeeper; - - @Value("${ZOOKEEPER_TIMEOUT_MILLIS:3000}") - private int timeoutMillis; - - @Bean(name = "zooKeeper") - public ZooKeeper zooKeeperFactory() throws IOException { - Watcher noopWatcher = event -> { - }; - - return new ZooKeeper(zookeeper, timeoutMillis, noopWatcher); - } -} diff --git a/metadata-ingestion-examples/kafka-etl/README.md b/metadata-ingestion-examples/kafka-etl/README.md deleted file mode 100644 index 2d5aa3d07bd25..0000000000000 --- a/metadata-ingestion-examples/kafka-etl/README.md +++ /dev/null @@ -1,40 +0,0 @@ -# Kafka ETL - -A small application which reads existing Kafka topics from ZooKeeper, retrieves their schema from the schema registry, -and then fires an MCE for each schema. 
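In other words, the job walks the topic list registered in ZooKeeper and asks the schema registry for each topic's latest value schema before emitting an MCE. A rough, standalone sketch of just that discovery step is shown below; the class name `TopicSchemaDump` and the hard-coded `localhost` addresses are assumptions for illustration, whereas the actual application (see `KafkaEtl.java` further down) wires the same clients up through Spring.

```
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

// Rough sketch of the discovery step only, assuming ZooKeeper on
// localhost:2181 and a Confluent schema registry on localhost:8081.
public final class TopicSchemaDump {
  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> { });
    CachedSchemaRegistryClient registry =
        new CachedSchemaRegistryClient("http://localhost:8081", 512);

    // Kafka keeps one child znode per topic under /brokers/topics.
    List<String> topics = zk.getChildren("/brokers/topics", false);
    for (String topic : topics) {
      if (topic.startsWith("_")) {
        continue; // skip internal topics
      }
      // Value schemas are registered under the "<topic>-value" subject.
      SchemaMetadata latest = registry.getLatestSchemaMetadata(topic + "-value");
      System.out.println(topic + " -> schema version " + latest.getVersion());
    }
    zk.close();
  }
}
```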
- -## Running the Application - -First, ensure that services this depends on, like schema registry / zookeeper / mce-consumer-job / gms / etc, are all -running. - -This application can be run via gradle: - -``` -./gradlew :metadata-ingestion-examples:kafka-etl:bootRun -``` - -Or by building and running the jar: - -``` -./gradlew :metadata-ingestion-examples:kafka-etl:build - -java -jar metadata-ingestion-examples/kafka-etl/build/libs/kafka-etl.jar -``` - -### Environment Variables - -See the files under `src/main/java/com/linkedin/metadata/examples/kafka/config` for a list of customizable spring -environment variables. - -### Common pitfalls - -For events to be fired correctly, schemas must exist in the schema registry. If a topic was newly created, but no schema -has been registered for it yet, this application will fail to retrieve the schema for that topic. Check the output of -the application to see if this happens. If you see a message like - -``` -io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401 -``` - -Then the odds are good that you need to register the schema for this topic. \ No newline at end of file diff --git a/metadata-ingestion-examples/kafka-etl/build.gradle b/metadata-ingestion-examples/kafka-etl/build.gradle deleted file mode 100644 index 0ad4da77888a1..0000000000000 --- a/metadata-ingestion-examples/kafka-etl/build.gradle +++ /dev/null @@ -1,36 +0,0 @@ -plugins { - id 'org.springframework.boot' - id 'java' -} - -dependencies { - compile project(':metadata-utils') - compile project(':metadata-dao-impl:kafka-producer') - compile project(':metadata-events:mxe-schemas') - compile project(':metadata-ingestion-examples:common') - - compile externalDependency.javaxInject - compile externalDependency.kafkaAvroSerde - compile externalDependency.lombok - compile externalDependency.springBeans - compile externalDependency.springBootAutoconfigure - compile externalDependency.springCore - compile externalDependency.springKafka - - annotationProcessor externalDependency.lombok - - runtime externalDependency.logbackClassic - - constraints { - implementation("org.apache.logging.log4j:log4j-core:2.17.0") { - because("previous versions are vulnerable to CVE-2021-45105") - } - implementation("org.apache.logging.log4j:log4j-api:2.17.0") { - because("previous versions are vulnerable to CVE-2021-45105") - } - } -} - -bootJar { - mainClassName = 'com.linkedin.metadata.examples.kafka.KafkaEtlApplication' -} diff --git a/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtl.java b/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtl.java deleted file mode 100644 index eb5752052c53a..0000000000000 --- a/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtl.java +++ /dev/null @@ -1,121 +0,0 @@ -package com.linkedin.metadata.examples.kafka; - -import com.linkedin.common.AuditStamp; -import com.linkedin.common.FabricType; -import com.linkedin.common.urn.CorpuserUrn; -import com.linkedin.common.urn.DataPlatformUrn; -import com.linkedin.common.urn.DatasetUrn; -import com.linkedin.metadata.aspect.DatasetAspect; -import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer; -import com.linkedin.metadata.examples.configs.TopicConventionFactory; -import com.linkedin.metadata.snapshot.DatasetSnapshot; -import com.linkedin.mxe.MetadataChangeEvent; -import com.linkedin.mxe.TopicConvention; -import 
com.linkedin.schema.KafkaSchema; -import com.linkedin.schema.SchemaField; -import com.linkedin.schema.SchemaFieldArray; -import com.linkedin.schema.SchemaFieldDataType; -import com.linkedin.schema.SchemaMetadata; -import com.linkedin.schema.StringType; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import java.util.List; -import javax.inject.Inject; -import javax.inject.Named; -import lombok.extern.slf4j.Slf4j; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.producer.Producer; -import org.apache.zookeeper.ZooKeeper; -import org.springframework.boot.CommandLineRunner; -import org.springframework.stereotype.Component; - -/** - * Gathers Kafka topics from the local zookeeper instance and schemas from the schema registry, and then fires - * MetadataChangeEvents for their schemas. - * - *
This should cause DataHub to be populated with this information, assuming it and the mce-consumer-job are running - * locally. - * - *
Can be run with {@code ./gradlew :metadata-ingestion-examples:java:kafka-etl:bootRun}. - */ -@Slf4j -@Component -public final class KafkaEtl implements CommandLineRunner { - private static final DataPlatformUrn KAFKA_URN = new DataPlatformUrn("kafka"); - - @Inject - @Named("kafkaProducer") - private Producer _producer; - - @Inject - @Named(TopicConventionFactory.TOPIC_CONVENTION_BEAN) - private TopicConvention _topicConvention; - - @Inject - @Named("zooKeeper") - private ZooKeeper _zooKeeper; - - @Inject - @Named("schemaRegistryClient") - private SchemaRegistryClient _schemaRegistryClient; - - private SchemaMetadata buildDatasetSchema(String datasetName, String schema, int schemaVersion) { - final AuditStamp auditStamp = new AuditStamp(); - auditStamp.setTime(System.currentTimeMillis()); - auditStamp.setActor(new CorpuserUrn(System.getenv("USER"))); - final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema(); - platformSchema.setKafkaSchema(new KafkaSchema().setDocumentSchema(schema)); - return new SchemaMetadata().setSchemaName(datasetName) - .setPlatform(KAFKA_URN) - .setCreated(auditStamp) - .setLastModified(auditStamp) - .setVersion(schemaVersion) - .setHash("") - .setPlatformSchema(platformSchema) - .setFields(new SchemaFieldArray(new SchemaField().setFieldPath("") - .setDescription("") - .setNativeDataType("string") - .setType(new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()))))); - } - - private void produceKafkaDatasetMce(SchemaMetadata schemaMetadata) { - MetadataChangeEvent.class.getClassLoader().getResource("avro/com/linkedin/mxe/MetadataChangeEvent.avsc"); - - // Kafka topics are considered datasets in the current DataHub metadata ecosystem. - final KafkaMetadataEventProducer eventProducer = - new KafkaMetadataEventProducer<>(DatasetSnapshot.class, DatasetAspect.class, _producer, _topicConvention); - eventProducer.produceSnapshotBasedMetadataChangeEvent( - new DatasetUrn(KAFKA_URN, schemaMetadata.getSchemaName(), FabricType.PROD), schemaMetadata); - _producer.flush(); - } - - @Override - public void run(String... 
args) throws Exception { - log.info("Starting up"); - - final List topics = _zooKeeper.getChildren("/brokers/topics", false); - for (String datasetName : topics) { - if (datasetName.startsWith("_")) { - continue; - } - - final String topic = datasetName + "-value"; - io.confluent.kafka.schemaregistry.client.SchemaMetadata schemaMetadata; - try { - schemaMetadata = _schemaRegistryClient.getLatestSchemaMetadata(topic); - } catch (Throwable t) { - log.error("Failed to get schema for topic " + datasetName, t); - log.error("Common failure: does this event schema exist in the schema registry?"); - continue; - } - - if (schemaMetadata == null) { - log.warn(String.format("Skipping topic without schema: %s", topic)); - continue; - } - log.trace(topic); - - produceKafkaDatasetMce(buildDatasetSchema(datasetName, schemaMetadata.getSchema(), schemaMetadata.getVersion())); - log.info("Successfully fired MCE for " + datasetName); - } - } -} diff --git a/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtlApplication.java b/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtlApplication.java deleted file mode 100644 index 4eec1a5afb8e7..0000000000000 --- a/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtlApplication.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.linkedin.metadata.examples.kafka; - -import org.springframework.boot.WebApplicationType; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration; -import org.springframework.boot.builder.SpringApplicationBuilder; - - -@SuppressWarnings("checkstyle:HideUtilityClassConstructor") -@SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class}, scanBasePackages = { - "com.linkedin.metadata.examples.configs", "com.linkedin.metadata.examples.kafka"}) -public class KafkaEtlApplication { - public static void main(String[] args) { - new SpringApplicationBuilder(KafkaEtlApplication.class).web(WebApplicationType.NONE).run(args); - } -} diff --git a/metadata-ingestion-examples/kafka-etl/src/main/resources/logback.xml b/metadata-ingestion-examples/kafka-etl/src/main/resources/logback.xml deleted file mode 100644 index 2c389931ec525..0000000000000 --- a/metadata-ingestion-examples/kafka-etl/src/main/resources/logback.xml +++ /dev/null @@ -1,40 +0,0 @@ - - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - ${LOG_DIR}/kafka-etl-java.log - true - - %d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n - - - ${LOG_DIR}/kafka-etl.%i.log - 1 - 3 - - - 100MB - - - - - - - - - - - - - - - - - - diff --git a/metadata-ingestion-examples/mce-cli/README.md b/metadata-ingestion-examples/mce-cli/README.md deleted file mode 100644 index de524ad19f967..0000000000000 --- a/metadata-ingestion-examples/mce-cli/README.md +++ /dev/null @@ -1,55 +0,0 @@ -# MCE CLI - -A small application which can produce or consume [MetadataChangeEvents](../../docs/what/mxe.md). - -## Running the Application - -First, ensure that services this depends on, like schema registry / zookeeper / mce-consumer-job / gms / etc, are all -running. 
- -This application can be run via gradle: - -``` -./gradlew :metadata-ingestion-examples:mce-cli:bootRun -``` - -Or by building and running the jar: - -``` -./gradlew :metadata-ingestion-examples:mce-cli:build - -java -jar metadata-ingestion-examples/mce-cli/build/libs/mce-cli.jar -``` - -### Consuming Events - -Consuming MCEs may be useful to help debug other applications that are meant to produce them. You can easily see what -MCEs are being produced (or not) at a glance. - -``` -./gradlew :metadata-ingestion-examples:mce-cli:bootRun - -# Alternatives -./gradlew :metadata-ingestion-examples:mce-cli:bootRun --args='consume' -java -jar metadata-ingestion-examples/mce-cli/build/libs/mce-cli.jar -java -jar metadata-ingestion-examples/mce-cli/build/libs/mce-cli.jar consume -``` - -### Producing Events - -Producing events can be useful to help debug the MCE pipeline, or just to help make some fake data (ideally, don't do -this on your production stack!). - -``` -./gradlew :metadata-ingestion-examples:mce-cli:bootRun --args='-m produce my-file.json' - -# Alternatively -java -jar metadata-ingestion-examples/mce-cli/build/libs/mce-cli.jar -m produce my-file.json -``` - -Where `my-file.json` is some file that contains a -[MetadataChangEvents](./src/main/pegasus/com/linkedin/metadata/examples/cli/MetadataChangeEvents.pdl) JSON object. - -### Producing the Example Events with Docker - -We have some example events in the `example-bootstrap.json` file, which can be invoked via the above example. \ No newline at end of file diff --git a/metadata-ingestion-examples/mce-cli/build.gradle b/metadata-ingestion-examples/mce-cli/build.gradle deleted file mode 100644 index 5d887ae799775..0000000000000 --- a/metadata-ingestion-examples/mce-cli/build.gradle +++ /dev/null @@ -1,43 +0,0 @@ -plugins { - id 'org.springframework.boot' - id 'java' - id 'pegasus' -} - -dependencies { - compile project(':metadata-utils') - compile project(':metadata-dao-impl:kafka-producer') - compile project(':metadata-events:mxe-schemas') - compile project(':metadata-ingestion-examples:common') - - dataModel project(':metadata-models') - - compile spec.product.pegasus.restliServer - compile externalDependency.javaxInject - compile externalDependency.kafkaAvroSerde - compile externalDependency.lombok - compile externalDependency.picocli - compile externalDependency.springBeans - compile externalDependency.springBootAutoconfigure - compile externalDependency.springCore - compile externalDependency.springKafka - - runtime externalDependency.logbackClassic - - annotationProcessor externalDependency.lombok - annotationProcessor externalDependency.picocli - - constraints { - implementation("org.apache.logging.log4j:log4j-core:2.17.0") { - because("previous versions are vulnerable to CVE-2021-45105") - } - implementation("org.apache.logging.log4j:log4j-api:2.17.0") { - because("previous versions are vulnerable to CVE-2021-45105") - } - } - -} - -bootJar { - mainClassName = 'com.linkedin.metadata.examples.cli.MceCliApplication' -} \ No newline at end of file diff --git a/metadata-ingestion-examples/mce-cli/example-bootstrap.json b/metadata-ingestion-examples/mce-cli/example-bootstrap.json deleted file mode 100644 index c2d86f88bcabf..0000000000000 --- a/metadata-ingestion-examples/mce-cli/example-bootstrap.json +++ /dev/null @@ -1,602 +0,0 @@ -{ - "events": [ - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.CorpUserSnapshot": { - "urn": "urn:li:corpuser:datahub", - "aspects": [ - { - "com.linkedin.identity.CorpUserInfo": { - 
"active": true, - "displayName": "Data Hub", - "fullName": "Data Hub", - "email": "datahub@linkedin.com", - "title": "CEO" - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.CorpUserSnapshot": { - "urn": "urn:li:corpuser:jdoe", - "aspects": [ - { - "com.linkedin.identity.CorpUserInfo": { - "active": true, - "displayName": "John Doe", - "fullName": "John Doe", - "email": "jdoe@linkedin.com", - "title": "Software Engineer" - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)", - "aspects": [ - { - "com.linkedin.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:jdoe", - "type": "DATAOWNER" - }, - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - }, - { - "com.linkedin.common.InstitutionalMemory": { - "elements": [ - { - "url": "https://www.linkedin.com", - "description": "Sample doc", - "createStamp": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - ] - } - }, - { - "com.linkedin.schema.SchemaMetadata": { - "schemaName": "SampleKafkaSchema", - "platform": "urn:li:dataPlatform:kafka", - "version": 0, - "created": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - }, - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - }, - "hash": "", - "platformSchema": { - "com.linkedin.schema.KafkaSchema": { - "documentSchema": "{\"type\":\"record\",\"name\":\"SampleKafkaSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Kafka dataset\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}" - } - }, - "fields": [ - { - "fieldPath": "field_foo", - "description": "Foo field description", - "nativeDataType": "string", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - }, - "glossaryTerms": { - "terms": [{ - "urn": "urn:li:glossaryTerm:instruments.FinancialInstrument_v1" - }], - "auditStamp": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - }, - { - "fieldPath": "field_bar", - "description": "Bar field description", - "nativeDataType": "boolean", - "type": { - "type": { - "com.linkedin.schema.BooleanType": {} - } - } - } - ] - } - }, - { - "com.linkedin.common.GlossaryTerms": { - "terms": [{ - "urn": "urn:li:glossaryTerm:instruments.FinancialInstrument_v1" - }], - "auditStamp": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)", - "aspects": [ - { - "com.linkedin.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:jdoe", - "type": "DATAOWNER" - }, - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - }, - { - "com.linkedin.dataset.UpstreamLineage": { - "upstreams": [ - { - "auditStamp": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)", - "type": "TRANSFORMED" - } - ] - } - }, - { - "com.linkedin.common.InstitutionalMemory": { - "elements": [ - { - "url": "https://www.linkedin.com", - "description": "Sample doc", - "createStamp": { - "time": 
1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - ] - } - }, - { - "com.linkedin.schema.SchemaMetadata": { - "schemaName": "SampleHdfsSchema", - "platform": "urn:li:dataPlatform:hdfs", - "version": 0, - "created": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - }, - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - }, - "hash": "", - "platformSchema": { - "com.linkedin.schema.KafkaSchema": { - "documentSchema": "{\"type\":\"record\",\"name\":\"SampleHdfsSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample HDFS dataset\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}" - } - }, - "fields": [ - { - "fieldPath": "field_foo", - "description": "Foo field description", - "nativeDataType": "string", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - } - }, - { - "fieldPath": "field_bar", - "description": "Bar field description", - "nativeDataType": "boolean", - "type": { - "type": { - "com.linkedin.schema.BooleanType": {} - } - } - } - ] - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", - "aspects": [ - { - "com.linkedin.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:jdoe", - "type": "DATAOWNER" - }, - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - }, - { - "com.linkedin.dataset.UpstreamLineage": { - "upstreams": [ - { - "auditStamp": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)", - "type": "TRANSFORMED" - } - ] - } - }, - { - "com.linkedin.common.InstitutionalMemory": { - "elements": [ - { - "url": "https://www.linkedin.com", - "description": "Sample doc", - "createStamp": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - ] - } - }, - { - "com.linkedin.schema.SchemaMetadata": { - "schemaName": "SampleHiveSchema", - "platform": "urn:li:dataPlatform:hive", - "version": 0, - "created": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - }, - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - }, - "hash": "", - "platformSchema": { - "com.linkedin.schema.KafkaSchema": { - "documentSchema": "{\"type\":\"record\",\"name\":\"SampleHiveSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Hive dataset\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}" - } - }, - "fields": [ - { - "fieldPath": "field_foo", - "description": "Foo field description", - "nativeDataType": "string", - "type": { - "type": { - "com.linkedin.schema.StringType": {} - } - } - }, - { - "fieldPath": "field_bar", - "description": "Bar field description", - "nativeDataType": "boolean", - "type": { - "type": { - "com.linkedin.schema.BooleanType": {} - } - } - } - ] - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.DataProcessSnapshot": { - "urn": "urn:li:dataProcess:(sqoop,DEMO,PROD)", - "aspects": [ - { - "com.linkedin.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:datahub" - } - } - }, - { - "com.linkedin.dataprocess.DataProcessInfo": { 
- "inputs": [ - "urn:li:dataset:(urn:li:dataPlatform:cassandra,barEarth,DEV)", - "urn:li:dataset:(urn:li:dataPlatform:cassandra,barMars,DEV)" - ], - "outputs": [ - "urn:li:dataset:(urn:li:dataPlatform:hbase,barSky,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:hbase,barOcean,PROD)" - ] - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.MLModelSnapshot": { - "urn": "urn:li:mlmodel:(urn:li:dataPlatform:science,scienceModel,PROD)", - "aspects": [ - { - "com.linkedin.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:jdoe", - "type": "DATAOWNER" - }, - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:jdoe" - } - } - }, - { - "com.linkedin.ml.metadata.MLModelProperties": { - "description": "A sample model for predicting some outcome.", - "date": 0, - "version": { - "versionTag": "1.5.3" - }, - "tags": [ - "science" - ] - } - }, - { - "com.linkedin.ml.metadata.TrainingData": { - "trainingData": [ - { - "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,pageViewsHive,PROD)", - "motivation": "For science!", - "preProcessing": [ - "Aggregation" - ] - } - ] - } - }, - { - "com.linkedin.ml.metadata.EvaluationData": { - "evaluationData": [ - { - "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,pageViewsHive,PROD)" - } - ] - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.ChartSnapshot": { - "urn": "urn:li:chart:(Looker,1)", - "aspects": [ - { - "com.linkedin.chart.ChartInfo": { - "title": "Sample Looker Chart", - "description": "This chart contains sample data from Kafka", - "lastModified": { - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:datahub" - }, - "created": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - }, - "inputs": [ - { - "string": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)" - } - ], - "chartUrl": "https://www.looker.com", - "type": "BAR", - "access": "PUBLIC" - } - }, - { - "com.linkedin.chart.ChartQuery": { - "rawQuery": "SELECT * FROM SampleTable", - "type": "SQL" - } - }, - { - "com.linkedin.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "STAKEHOLDER" - }, - { - "owner": "urn:li:corpuser:jdoe", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 1581407589000, - "actor": "urn:li:corpuser:datahub" - } - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.DashboardSnapshot": { - "urn": "urn:li:dashboard:(Looker,0)", - "aspects": [ - { - "com.linkedin.dashboard.DashboardInfo": { - "title": "Sample Looker Dashboard", - "description": "This dashboard shows charts about user retention.", - "lastModified": { - "lastModified": { - "time": 1581407139000, - "actor": "urn:li:corpuser:datahub" - }, - "created": { - "time": 1581404189000, - "actor": "urn:li:corpuser:jdoe" - } - }, - "charts": [ "urn:li:chart:(Looker,1)" ], - "dashboardUrl": "https://www.looker.com", - "access": "PUBLIC" - } - }, - { - "com.linkedin.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:datahub", - "type": "DATAOWNER" - }, - { - "owner": "urn:li:corpuser:jdoe", - "type": "STAKEHOLDER" - } - ], - "lastModified": { - "time": 1581407389000, - "actor": "urn:li:corpuser:jdoe" - } - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.GlossaryTermSnapshot": { - "urn": "urn:li:glossaryTerm:instruments.FinancialInstrument_v1", - "aspects": [ - { - 
"com.linkedin.glossary.GlossaryTermInfo": { - "definition": "written contract that gives rise to both a financial asset of one entity and a financial liability of another entity", - "parentNode": "urn:li:glossaryNode:instruments", - "sourceRef": "FIBO", - "termSource": "EXTERNAL", - "sourceUrl": "https://spec.edmcouncil.org/fibo/ontology/FBC/FinancialInstruments/FinancialInstruments/FinancialInstrument", - "customProperties": { - "FQDN": "common.instruments.FinancialInstrument" - } - } - }, - { - "com.linkedin.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:jdoe", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - } - ] - } - } - }, - { - "proposedSnapshot": { - "com.linkedin.metadata.snapshot.GlossaryNodeSnapshot": { - "urn": "urn:li:glossaryNode:instruments", - "aspects": [ - { - "com.linkedin.glossary.GlossaryNodeInfo": { - "definition": "Financial Instruments" - } - }, - { - "com.linkedin.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:jdoe", - "type": "DATAOWNER" - } - ], - "lastModified": { - "time": 1581407189000, - "actor": "urn:li:corpuser:jdoe" - } - } - } - ] - } - } - } - ] -} \ No newline at end of file diff --git a/metadata-ingestion-examples/mce-cli/src/main/java/com/linkedin/metadata/examples/cli/MceCli.java b/metadata-ingestion-examples/mce-cli/src/main/java/com/linkedin/metadata/examples/cli/MceCli.java deleted file mode 100644 index 01c83c1fb5334..0000000000000 --- a/metadata-ingestion-examples/mce-cli/src/main/java/com/linkedin/metadata/examples/cli/MceCli.java +++ /dev/null @@ -1,127 +0,0 @@ -package com.linkedin.metadata.examples.cli; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; -import com.linkedin.data.DataMap; -import com.linkedin.data.schema.validation.RequiredMode; -import com.linkedin.data.schema.validation.UnrecognizedFieldMode; -import com.linkedin.data.schema.validation.ValidateDataAgainstSchema; -import com.linkedin.data.schema.validation.ValidationOptions; -import com.linkedin.data.schema.validation.ValidationResult; -import com.linkedin.data.template.DataTemplateUtil; -import com.linkedin.metadata.EventUtils; -import com.linkedin.mxe.MetadataChangeEvent; -import com.linkedin.mxe.Topics; -import com.linkedin.restli.common.ContentType; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.time.Duration; -import java.util.concurrent.ExecutionException; -import javax.annotation.Nonnull; -import javax.inject.Inject; -import javax.inject.Named; -import lombok.extern.slf4j.Slf4j; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.boot.CommandLineRunner; -import org.springframework.stereotype.Component; -import picocli.CommandLine; - - -@Slf4j -@Component -public class MceCli implements CommandLineRunner { - private enum Mode { - PRODUCE, CONSUME - } - - private static final class Args { - @CommandLine.Option(names = {"-m", "--mode"}, defaultValue = "CONSUME") - Mode mode; - - @CommandLine.Parameters( - paramLabel = "EVENT_FILE", - description = "MCE file; required if running 'producer' mode. 
See MetadataChangeEvents.pdl for schema.", - arity = "0..1" - ) - File eventFile; - } - - @Inject - @Named("kafkaProducer") - private Producer _producer; - - @Inject - @Named("kafkaEventConsumer") - private Consumer _consumer; - - private void consume() { - log.info("Consuming records."); - - _consumer.subscribe(ImmutableList.of(Topics.METADATA_CHANGE_EVENT)); - - while (true) { - final ConsumerRecords records = _consumer.poll(Duration.ofSeconds(1)); - - for (ConsumerRecord record : records) { - log.info(record.value().toString()); - } - } - } - - @VisibleForTesting - static MetadataChangeEvents readEventsFile(@Nonnull File eventsFile) throws IOException { - final DataMap dataMap = ContentType.JSON.getCodec().readMap(new FileInputStream(eventsFile)); - - final ValidationOptions options = new ValidationOptions(); - options.setRequiredMode(RequiredMode.CAN_BE_ABSENT_IF_HAS_DEFAULT); - options.setUnrecognizedFieldMode(UnrecognizedFieldMode.DISALLOW); - - final ValidationResult result = - ValidateDataAgainstSchema.validate(dataMap, DataTemplateUtil.getSchema(MetadataChangeEvents.class), options); - - if (!result.isValid()) { - throw new IllegalArgumentException( - String.format("Error parsing metadata events file %s: \n%s", eventsFile.toString(), - Joiner.on('\n').join(result.getMessages()))); - } - - return DataTemplateUtil.wrap(dataMap, MetadataChangeEvents.class); - } - - private void produce(@Nonnull File eventsFile) throws IOException, ExecutionException, InterruptedException { - final MetadataChangeEvents events = readEventsFile(eventsFile); - int record = 1; - for (MetadataChangeEvent mce : events.getEvents()) { - log.info("Producing record {} of {}", record++, events.getEvents().size()); - _producer.send(new ProducerRecord(Topics.METADATA_CHANGE_EVENT, EventUtils.pegasusToAvroMCE(mce))).get(); - log.info("Produced record."); - } - } - - @Override - public void run(String... 
cmdLineArgs) throws Exception { - final Args args = new Args(); - new CommandLine(args).setCaseInsensitiveEnumValuesAllowed(true).parseArgs(cmdLineArgs); - - switch (args.mode) { - case CONSUME: - consume(); - break; - case PRODUCE: - if (args.eventFile == null) { - throw new IllegalArgumentException("Event file is required when producing."); - } - produce(args.eventFile); - break; - default: - break; - } - } -} diff --git a/metadata-ingestion-examples/mce-cli/src/main/java/com/linkedin/metadata/examples/cli/MceCliApplication.java b/metadata-ingestion-examples/mce-cli/src/main/java/com/linkedin/metadata/examples/cli/MceCliApplication.java deleted file mode 100644 index 4a8eac0f40d74..0000000000000 --- a/metadata-ingestion-examples/mce-cli/src/main/java/com/linkedin/metadata/examples/cli/MceCliApplication.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.linkedin.metadata.examples.cli; - -import org.springframework.boot.WebApplicationType; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration; -import org.springframework.boot.builder.SpringApplicationBuilder; - - -@SuppressWarnings("checkstyle:HideUtilityClassConstructor") -@SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class}, scanBasePackages = { - "com.linkedin.metadata.examples.configs", "com.linkedin.metadata.examples.cli"}) -public class MceCliApplication { - public static void main(String[] args) { - new SpringApplicationBuilder(MceCliApplication.class).web(WebApplicationType.NONE).run(args); - } -} diff --git a/metadata-ingestion-examples/mce-cli/src/main/java/com/linkedin/metadata/examples/cli/config/KafkaConsumerConfig.java b/metadata-ingestion-examples/mce-cli/src/main/java/com/linkedin/metadata/examples/cli/config/KafkaConsumerConfig.java deleted file mode 100644 index 072b40dc9b7ad..0000000000000 --- a/metadata-ingestion-examples/mce-cli/src/main/java/com/linkedin/metadata/examples/cli/config/KafkaConsumerConfig.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.linkedin.metadata.examples.cli.config; - -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Arrays; -import java.util.Map; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - - -@Configuration -public class KafkaConsumerConfig { - @Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}") - private String kafkaBootstrapServers; - - @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") - private String kafkaSchemaRegistryUrl; - - @Bean(name = "kafkaEventConsumer") - public Consumer kafkaConsumerFactory(KafkaProperties properties) { - KafkaProperties.Consumer consumerProps = properties.getConsumer(); - - consumerProps.setKeyDeserializer(StringDeserializer.class); - consumerProps.setValueDeserializer(KafkaAvroDeserializer.class); - consumerProps.setGroupId("mce-cli"); - - // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS - if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { - 
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); - } // else we rely on KafkaProperties which defaults to localhost:9092 - - Map props = properties.buildConsumerProperties(); - props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); - - return new KafkaConsumer<>(props); - } -} diff --git a/metadata-ingestion-examples/mce-cli/src/main/pegasus/com/linkedin/metadata/examples/cli/MetadataChangeEvents.pdl b/metadata-ingestion-examples/mce-cli/src/main/pegasus/com/linkedin/metadata/examples/cli/MetadataChangeEvents.pdl deleted file mode 100644 index 5ee3aa5bad737..0000000000000 --- a/metadata-ingestion-examples/mce-cli/src/main/pegasus/com/linkedin/metadata/examples/cli/MetadataChangeEvents.pdl +++ /dev/null @@ -1,13 +0,0 @@ -namespace com.linkedin.metadata.examples.cli - -import com.linkedin.mxe.MetadataChangeEvent - -/** - * Schema definition for the format of the input file to the CLI for producing events. - */ -record MetadataChangeEvents { - /** - * Events to produce. - */ - events: array[MetadataChangeEvent] -} \ No newline at end of file diff --git a/metadata-ingestion-examples/mce-cli/src/main/resources/logback.xml b/metadata-ingestion-examples/mce-cli/src/main/resources/logback.xml deleted file mode 100644 index 164930daf0028..0000000000000 --- a/metadata-ingestion-examples/mce-cli/src/main/resources/logback.xml +++ /dev/null @@ -1,40 +0,0 @@ - - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - ${LOG_DIR}/kafka-etl-java.log - true - - %d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n - - - ${LOG_DIR}/kafka-etl.%i.log - 1 - 3 - - - 100MB - - - - - - - - - - - - - - - - - - diff --git a/metadata-ingestion-examples/mce-cli/src/test/java/com/linkedin/metadata/examples/cli/TestExamples.java b/metadata-ingestion-examples/mce-cli/src/test/java/com/linkedin/metadata/examples/cli/TestExamples.java deleted file mode 100644 index 8d993aa8227d5..0000000000000 --- a/metadata-ingestion-examples/mce-cli/src/test/java/com/linkedin/metadata/examples/cli/TestExamples.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.linkedin.metadata.examples.cli; - -import com.linkedin.restli.common.ContentType; -import java.io.File; -import java.io.FileInputStream; -import org.testng.annotations.Test; - -import static org.testng.Assert.*; - - -/** - * Simple test to help us keep our example file up to date with the MCE schema definition, in the event we change - * schemas, or change the file without manually testing it (which we shouldn't do, but can happen by mistake). 
- */ -public class TestExamples { - private static final File EXAMPLE_FILE = new File("example-bootstrap.json"); - - @Test - public void examplesAreValidJson() throws Exception { - assertTrue(EXAMPLE_FILE.exists()); - // no exception = test passes - ContentType.JSON.getCodec().readMap(new FileInputStream(EXAMPLE_FILE)); - } - - @Test - public void examplesMatchSchemas() throws Exception { - // no exception = test passes - MceCli.readEventsFile(EXAMPLE_FILE); - } -} diff --git a/settings.gradle b/settings.gradle index 4f5b5dc44cd88..7b864c773e1fd 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,9 +19,6 @@ include 'metadata-events:mxe-registration' include 'metadata-events:mxe-schemas' include 'metadata-events:mxe-utils-avro-1.7' include 'metadata-ingestion' -include 'metadata-ingestion-examples:common' -include 'metadata-ingestion-examples:kafka-etl' -include 'metadata-ingestion-examples:mce-cli' include 'metadata-jobs:mae-consumer' include 'metadata-jobs:mce-consumer' include 'metadata-jobs:pe-consumer' @@ -46,4 +43,4 @@ include 'metadata-integration:java:datahub-client' include 'metadata-integration:java:datahub-protobuf' include 'metadata-ingestion-modules:airflow-plugin' include 'ingestion-scheduler' -include 'smoke-test' \ No newline at end of file +include 'smoke-test' diff --git a/temp_time_series/create_mock_index.py b/temp_time_series/create_mock_index.py deleted file mode 100755 index 65997f7011f35..0000000000000 --- a/temp_time_series/create_mock_index.py +++ /dev/null @@ -1,174 +0,0 @@ -#!/usr/bin/env python3 -import os -from datetime import datetime -from typing import Optional, Generator, Tuple - -# import hashlib - -HOUR_IN_MS = 3600000 -DAY_IN_MS = 86400000 -START_DAY_IN_MS = int(datetime.now().timestamp() * 1000) - 5 * DAY_IN_MS - -CounterType = Optional[int] -NameType = Optional[str] -IndexRowType = Tuple[ - NameType, - CounterType, - CounterType, - NameType, - CounterType, - CounterType, - CounterType, - CounterType, - CounterType, - CounterType, -] - - -def day(n: int) -> int: - return START_DAY_IN_MS + n * DAY_IN_MS - - -class MockIndexGenerator: - INDEX_NAME = "mock_dataset_stats_aspect_v1" - - INDEX_FIELD_NAMES = [ - "urn", - "rowCount", - "columnCount", - "columnStats.key", - "columnStats.numNull", - "eventTimestampMillis", - "eventGranularity", - "partitionSpec.parition", - "partitionSpec.timeWindow.startTimeMillis", - "partitionSpec.timeWindow.granulatiry", - ] - - INDEX_FIELD_TYPES = [ - "keyword", - "long", - "long", - "keyword", - "long", - "date", - "long", - "keyword", - "date", - "long", - ] - - def __init__(self, start_days_in_ms, num_recs, num_cols): - self._start_days_in_ms = start_days_in_ms - self._num_recs = num_recs - self._num_cols = num_cols - self._stat_num_rows_start = 10000 - self._stat_num_cols_start = 50 - self._stat_num_nulls = 100 - - def _get_num_rows(self, i: int): - return self._stat_num_rows_start + (100 * i) - - def _get_num_cols(self, i: int): - return self._stat_num_cols_start + i - - def _get_num_nulls(self, i: int, c: int): - return self._stat_num_nulls + c + (10 * i) - - def _get_event_time_ms(self, i: int): - return self._start_days_in_ms + (i * HOUR_IN_MS) - - @staticmethod - def _get_index_row_json(row: IndexRowType) -> str: - return ",".join( - [ - f'"{field}" : "{value}"' - for field, value in zip(MockIndexGenerator.INDEX_FIELD_NAMES, row) - if value is not None - ] - ) - - def get_records(self) -> Generator[IndexRowType, None, None]: - for i in range(self._num_recs): - # emit one table record - yield 
self._get_index_row_json(( - "table_1", - self._get_num_rows(i), - self._get_num_cols(i), - None, - None, - self._get_event_time_ms(i), - HOUR_IN_MS, - None, - None, - None) - ) - # emit one record per column - for c in range(self._num_cols): - yield self._get_index_row_json(( - f"table_1", - None, - None, - f"col_{c}", - self._get_num_nulls(i, c), - self._get_event_time_ms(i), - HOUR_IN_MS, - None, - None, - None) - ) - - @staticmethod - def get_props_json() -> str: - return ",".join( - [ - f'"{field}" : {{ "type" : "{type}" }}' - for field, type in zip( - MockIndexGenerator.INDEX_FIELD_NAMES, - MockIndexGenerator.INDEX_FIELD_TYPES, - ) - ] - ) - - -def gen_index_schema() -> None: - properties_json = MockIndexGenerator.get_props_json() - index_schema_gen_cmd = ( - f"curl -v -XPUT http://localhost:9200/{MockIndexGenerator.INDEX_NAME} -H 'Content-Type: application/json' -d '" - + """ - { - "settings":{}, - "mappings":{ - "properties":{ """ - + f"{properties_json}" - + """ - } - } - }'""" - ) - print(index_schema_gen_cmd) - os.system(index_schema_gen_cmd) - - -def populate_index_data() -> None: - for id, row in enumerate( - MockIndexGenerator(START_DAY_IN_MS, 100, 20).get_records() - ): - # id = hashlib.md5(row.encode("utf-8")).hexdigest() - index_row_gen_command = ( - f"curl -v -XPUT http://localhost:9200/{MockIndexGenerator.INDEX_NAME}/_doc/{id} " - + "-H 'Content-Type: application/json' -d '{ " - + f"{row}" - + " }'" - ) - print(index_row_gen_command) - os.system(index_row_gen_command) - - -def generate() -> None: - #gen_index_schema() - populate_index_data() - - -if __name__ == "__main__": - generate()
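
Note: the mock-index loader above builds a `curl` command string per document and runs it through `os.system`. As an illustrative sketch only (not part of the deleted script), the same PUT-per-document flow can be expressed with the `requests` library; the endpoint `http://localhost:9200`, the index name `mock_dataset_stats_aspect_v1`, and the field names are taken from the script above, while the helper name and the sample values are assumptions for the example.

```python
#!/usr/bin/env python3
# Sketch: index one mock document over HTTP instead of shelling out to curl.
# Assumes Elasticsearch is reachable at localhost:9200 and the index exists
# (or dynamic mapping is acceptable); field names mirror the script above.
import requests

ES_URL = "http://localhost:9200"
INDEX = "mock_dataset_stats_aspect_v1"


def put_doc(doc_id: int, doc: dict) -> None:
    # PUT a single document; `json=` serializes the dict and sets the
    # Content-Type header, replacing the hand-built curl command string.
    resp = requests.put(f"{ES_URL}/{INDEX}/_doc/{doc_id}", json=doc)
    resp.raise_for_status()


if __name__ == "__main__":
    put_doc(0, {
        "urn": "table_1",
        "rowCount": 10000,
        "columnCount": 50,
        "eventTimestampMillis": 1581407189000,
        "eventGranularity": 3600000,  # one hour in milliseconds
    })
```

Using an HTTP client keeps the document body out of the shell, which avoids the quoting pitfalls of interpolating JSON into a `curl ... -d '{...}'` string.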