From e34ab62096abe95c3cc9c1e9c5c631486d1b7ca4 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Mon, 4 Nov 2019 15:22:20 -0600 Subject: [PATCH 1/5] Added random split function --- optimus/dataframe/extension.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index f026919e0..470871dbd 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -412,6 +412,21 @@ def table(self, limit=None, columns=None, title=None, truncate=True): self.show() +@add_method(DataFrame) +def random_split(self, weights: list = None, seed: int = 1): + """ + Create 2 random splited DataFrames + :param self: + :param weights: + :param seed: + :return: + """ + if weights is None: + weights = [0.8, 0.2] + + return self.randomSplit(weights, seed) + + @add_method(DataFrame) def debug(self): """ From 376e17b7fb9fad9cc95a002cda66ad3cf0717a69 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Mon, 4 Nov 2019 15:41:30 -0600 Subject: [PATCH 2/5] Add limit to row --- optimus/dataframe/rows.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/optimus/dataframe/rows.py b/optimus/dataframe/rows.py index ec5edd2d9..2abe1de6f 100644 --- a/optimus/dataframe/rows.py +++ b/optimus/dataframe/rows.py @@ -220,6 +220,18 @@ def drop_first(): df = df.preserve_meta(self, Actions.DROP_ROW.value, df.cols.names()) return df + @add_attr(rows) + def limit(count): + """ + Limit the number of rows + :param count: + :return: + """ + df = self + df = df.limit(count) + df = df.preserve_meta(self) + return df + # TODO: Merge with select @add_attr(rows) def is_in(input_cols, values): From 75c09025eaf2380ad2d8de492bca22b924407d37 Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Mon, 4 Nov 2019 15:42:54 -0600 Subject: [PATCH 3/5] Improve code quality --- optimus/profiler/profiler.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index c9a06d782..bc0501024 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -245,8 +245,10 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT # If not empty the profiler already run. # So process the dataframe's metadata to be sure which columns need to be profiled is_cached = len(self.output_columns) > 0 - actions = df.get_meta()["transformations"].get("actions") - + actions = df.get_meta("transformations.actions") + # print(actions) + # are_actions = None + # actions = actions.get("actions") are_actions = actions is not None and len(actions) > 0 # Process actions to check if any column must be processed @@ -260,13 +262,14 @@ def match_actions_names(_actions): :param _actions: :return: """ - transformations = df.get_meta()["transformations"]["actions"] + + _actions = df.get_meta("transformations.actions") modified = [] for action in _actions: - if transformations.get(action): + if _actions.get(action): # Check if was renamed - col = transformations.get(action) + col = _actions.get(action) if len(match_names(col)) == 0: _result = col else: @@ -282,33 +285,33 @@ def match_names(_col_names): :return: """ _renamed_columns = [] - transformations = df.get_meta()["transformations"].get("actions") - _rename = transformations.get("rename") + _actions = df.get_meta("transformations.actions") + _rename = _actions.get("rename") if _rename: - for col_name in _col_names: + for _col_name in _col_names: # The column name has been changed. Get the new name - c = transformations["rename"].get(col_name) + c = _actions["rename"].get(_col_name) # The column has not been rename. Get the actual if c is None: - c = col_name + c = _col_name _renamed_columns.append(c) else: _renamed_columns = _col_names return _renamed_columns - # New columns + # New columns new_columns = [] current_col_names = df.cols.names() - renamed_cols = match_names(df.get_meta()["transformations"]["columns"]) + renamed_cols = match_names(df.get_meta("transformations.columns")) for current_col_name in current_col_names: if current_col_name not in renamed_cols: new_columns.append(current_col_name) # Rename keys to match new names profiler_columns = self.output_columns["columns"] - actions = df.get_meta()["transformations"]["actions"] + actions = df.get_meta("transformations.actions") rename = actions.get("rename") if rename: for k, v in actions["rename"].items(): From 8eb6dff39529a794f1fce6ed7245ffee7f1f8f1d Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Mon, 4 Nov 2019 15:43:35 -0600 Subject: [PATCH 4/5] Fix profiler send ouput --- optimus/dataframe/extension.py | 1 + optimus/profiler/profiler.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index 470871dbd..57c7249c2 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -456,6 +456,7 @@ def send(self, name=None, infer=True, mismatch=None, stats=True): approx_count=True, sample=10000, stats=stats, + format="json", mismatch=mismatch) if Comm: diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index bc0501024..c7af4f30b 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -378,6 +378,7 @@ def match_names(_col_names): df = df.set_meta(value={}) df = df.columns_meta(df.cols.names()) + col_names = output_columns["columns"].keys() if format == "json": result = json.dumps(output_columns, ignore_nan=True, default=json_converter) else: @@ -388,7 +389,7 @@ def match_names(_col_names): # print(result) df = df.set_meta("transformations.actions", {}) - return result["columns"].keys(), result + return col_names, result def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True, mismatch=None): From 9821ae9896d2f4d3d1bbb59b41c0791eb932388f Mon Sep 17 00:00:00 2001 From: Argenis Leon Date: Mon, 4 Nov 2019 16:12:18 -0600 Subject: [PATCH 5/5] Bump version --- docs/source/conf.py | 2 +- optimus/version.py | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index 6cfbf9388..6cfe94515 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -60,7 +60,7 @@ # The short X.Y version. version = '2.2' # The full version, including alpha/beta/rc tags. -release = "2.2.24" +release = "2.2.25" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/optimus/version.py b/optimus/version.py index d9078b144..91c3e216e 100644 --- a/optimus/version.py +++ b/optimus/version.py @@ -5,5 +5,5 @@ def _safe_int(string): return string -__version__ = '2.2.24' +__version__ = '2.2.25' VERSION = tuple(_safe_int(x) for x in __version__.split('.')) diff --git a/setup.py b/setup.py index 36f2d76db..0c4076910 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ def readme(): author='Favio Vazquez and Argenis Leon', author_email='argenisleon@gmail.com', url='https://github.com/ironmussa/Optimus/', - download_url='https://github.com/ironmussa/Optimus/archive/2.2.24.tar.gz', + download_url='https://github.com/ironmussa/Optimus/archive/2.2.25.tar.gz', description=('Optimus is the missing framework for cleaning and pre-processing data in a distributed fashion with ' 'pyspark.'), long_description=readme(),