diff --git a/docs/source/conf.py b/docs/source/conf.py index 6cfbf9388..6cfe94515 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -60,7 +60,7 @@ # The short X.Y version. version = '2.2' # The full version, including alpha/beta/rc tags. -release = "2.2.24" +release = "2.2.25" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/optimus/dataframe/extension.py b/optimus/dataframe/extension.py index f026919e0..57c7249c2 100644 --- a/optimus/dataframe/extension.py +++ b/optimus/dataframe/extension.py @@ -412,6 +412,21 @@ def table(self, limit=None, columns=None, title=None, truncate=True): self.show() +@add_method(DataFrame) +def random_split(self, weights: list = None, seed: int = 1): + """ + Create 2 random splited DataFrames + :param self: + :param weights: + :param seed: + :return: + """ + if weights is None: + weights = [0.8, 0.2] + + return self.randomSplit(weights, seed) + + @add_method(DataFrame) def debug(self): """ @@ -441,6 +456,7 @@ def send(self, name=None, infer=True, mismatch=None, stats=True): approx_count=True, sample=10000, stats=stats, + format="json", mismatch=mismatch) if Comm: diff --git a/optimus/dataframe/rows.py b/optimus/dataframe/rows.py index ec5edd2d9..2abe1de6f 100644 --- a/optimus/dataframe/rows.py +++ b/optimus/dataframe/rows.py @@ -220,6 +220,18 @@ def drop_first(): df = df.preserve_meta(self, Actions.DROP_ROW.value, df.cols.names()) return df + @add_attr(rows) + def limit(count): + """ + Limit the number of rows + :param count: + :return: + """ + df = self + df = df.limit(count) + df = df.preserve_meta(self) + return df + # TODO: Merge with select @add_attr(rows) def is_in(input_cols, values): diff --git a/optimus/profiler/profiler.py b/optimus/profiler/profiler.py index c9a06d782..c7af4f30b 100644 --- a/optimus/profiler/profiler.py +++ b/optimus/profiler/profiler.py @@ -245,8 +245,10 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT # If not empty the profiler already run. # So process the dataframe's metadata to be sure which columns need to be profiled is_cached = len(self.output_columns) > 0 - actions = df.get_meta()["transformations"].get("actions") - + actions = df.get_meta("transformations.actions") + # print(actions) + # are_actions = None + # actions = actions.get("actions") are_actions = actions is not None and len(actions) > 0 # Process actions to check if any column must be processed @@ -260,13 +262,14 @@ def match_actions_names(_actions): :param _actions: :return: """ - transformations = df.get_meta()["transformations"]["actions"] + + _actions = df.get_meta("transformations.actions") modified = [] for action in _actions: - if transformations.get(action): + if _actions.get(action): # Check if was renamed - col = transformations.get(action) + col = _actions.get(action) if len(match_names(col)) == 0: _result = col else: @@ -282,33 +285,33 @@ def match_names(_col_names): :return: """ _renamed_columns = [] - transformations = df.get_meta()["transformations"].get("actions") - _rename = transformations.get("rename") + _actions = df.get_meta("transformations.actions") + _rename = _actions.get("rename") if _rename: - for col_name in _col_names: + for _col_name in _col_names: # The column name has been changed. Get the new name - c = transformations["rename"].get(col_name) + c = _actions["rename"].get(_col_name) # The column has not been rename. Get the actual if c is None: - c = col_name + c = _col_name _renamed_columns.append(c) else: _renamed_columns = _col_names return _renamed_columns - # New columns + # New columns new_columns = [] current_col_names = df.cols.names() - renamed_cols = match_names(df.get_meta()["transformations"]["columns"]) + renamed_cols = match_names(df.get_meta("transformations.columns")) for current_col_name in current_col_names: if current_col_name not in renamed_cols: new_columns.append(current_col_name) # Rename keys to match new names profiler_columns = self.output_columns["columns"] - actions = df.get_meta()["transformations"]["actions"] + actions = df.get_meta("transformations.actions") rename = actions.get("rename") if rename: for k, v in actions["rename"].items(): @@ -375,6 +378,7 @@ def match_names(_col_names): df = df.set_meta(value={}) df = df.columns_meta(df.cols.names()) + col_names = output_columns["columns"].keys() if format == "json": result = json.dumps(output_columns, ignore_nan=True, default=json_converter) else: @@ -385,7 +389,7 @@ def match_names(_col_names): # print(result) df = df.set_meta("transformations.actions", {}) - return result["columns"].keys(), result + return col_names, result def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True, mismatch=None): diff --git a/optimus/version.py b/optimus/version.py index d9078b144..91c3e216e 100644 --- a/optimus/version.py +++ b/optimus/version.py @@ -5,5 +5,5 @@ def _safe_int(string): return string -__version__ = '2.2.24' +__version__ = '2.2.25' VERSION = tuple(_safe_int(x) for x in __version__.split('.')) diff --git a/setup.py b/setup.py index 36f2d76db..0c4076910 100644 --- a/setup.py +++ b/setup.py @@ -59,7 +59,7 @@ def readme(): author='Favio Vazquez and Argenis Leon', author_email='argenisleon@gmail.com', url='https://github.com/ironmussa/Optimus/', - download_url='https://github.com/ironmussa/Optimus/archive/2.2.24.tar.gz', + download_url='https://github.com/ironmussa/Optimus/archive/2.2.25.tar.gz', description=('Optimus is the missing framework for cleaning and pre-processing data in a distributed fashion with ' 'pyspark.'), long_description=readme(),