Skip to content

Commit

Permalink
Merge pull request #740 from ironmussa/develop
Browse files Browse the repository at this point in the history
Fix send() output
  • Loading branch information
argenisleon authored Nov 5, 2019
2 parents 2664a4e + 9821ae9 commit 2fe7ffe
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 17 deletions.
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
# The short X.Y version.
version = '2.2'
# The full version, including alpha/beta/rc tags.
release = "2.2.24"
release = "2.2.25"

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
16 changes: 16 additions & 0 deletions optimus/dataframe/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,21 @@ def table(self, limit=None, columns=None, title=None, truncate=True):
self.show()


@add_method(DataFrame)
def random_split(self, weights: list = None, seed: int = 1):
    """
    Randomly split the DataFrame by delegating to its randomSplit method
    :param self: DataFrame to be split
    :param weights: Weights handed to randomSplit. When None, [0.8, 0.2] is used
    :param seed: Seed handed to randomSplit
    :return: The result of randomSplit for the given weights and seed
    """
    # Fall back to an 80/20 split only when the caller gave no weights at all
    split_weights = [0.8, 0.2] if weights is None else weights
    return self.randomSplit(split_weights, seed)


@add_method(DataFrame)
def debug(self):
"""
Expand Down Expand Up @@ -441,6 +456,7 @@ def send(self, name=None, infer=True, mismatch=None, stats=True):
approx_count=True,
sample=10000,
stats=stats,
format="json",
mismatch=mismatch)

if Comm:
Expand Down
12 changes: 12 additions & 0 deletions optimus/dataframe/rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,18 @@ def drop_first():
df = df.preserve_meta(self, Actions.DROP_ROW.value, df.cols.names())
return df

@add_attr(rows)
def limit(count):
    """
    Limit the number of rows of the dataframe
    :param count: Value passed to the dataframe's limit()
    :return: The limited dataframe after preserve_meta(self) has been applied
    """
    # NOTE(review): `self` is not a parameter here; presumably it is the
    # dataframe captured from the enclosing scope - confirm against the full file
    return self.limit(count).preserve_meta(self)

# TODO: Merge with select
@add_attr(rows)
def is_in(input_cols, values):
Expand Down
32 changes: 18 additions & 14 deletions optimus/profiler/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,10 @@ def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELAT
# If not empty the profiler already run.
# So process the dataframe's metadata to be sure which columns need to be profiled
is_cached = len(self.output_columns) > 0
actions = df.get_meta()["transformations"].get("actions")

actions = df.get_meta("transformations.actions")
# print(actions)
# are_actions = None
# actions = actions.get("actions")
are_actions = actions is not None and len(actions) > 0

# Process actions to check if any column must be processed
Expand All @@ -260,13 +262,14 @@ def match_actions_names(_actions):
:param _actions:
:return:
"""
transformations = df.get_meta()["transformations"]["actions"]

_actions = df.get_meta("transformations.actions")

modified = []
for action in _actions:
if transformations.get(action):
if _actions.get(action):
# Check if was renamed
col = transformations.get(action)
col = _actions.get(action)
if len(match_names(col)) == 0:
_result = col
else:
Expand All @@ -282,33 +285,33 @@ def match_names(_col_names):
:return:
"""
_renamed_columns = []
transformations = df.get_meta()["transformations"].get("actions")
_rename = transformations.get("rename")
_actions = df.get_meta("transformations.actions")
_rename = _actions.get("rename")
if _rename:
for col_name in _col_names:
for _col_name in _col_names:
# The column name has been changed. Get the new name
c = transformations["rename"].get(col_name)
c = _actions["rename"].get(_col_name)
# The column has not been rename. Get the actual
if c is None:
c = col_name
c = _col_name
_renamed_columns.append(c)
else:
_renamed_columns = _col_names
return _renamed_columns

# New columns
# New columns

new_columns = []

current_col_names = df.cols.names()
renamed_cols = match_names(df.get_meta()["transformations"]["columns"])
renamed_cols = match_names(df.get_meta("transformations.columns"))
for current_col_name in current_col_names:
if current_col_name not in renamed_cols:
new_columns.append(current_col_name)

# Rename keys to match new names
profiler_columns = self.output_columns["columns"]
actions = df.get_meta()["transformations"]["actions"]
actions = df.get_meta("transformations.actions")
rename = actions.get("rename")
if rename:
for k, v in actions["rename"].items():
Expand Down Expand Up @@ -375,6 +378,7 @@ def match_names(_col_names):
df = df.set_meta(value={})
df = df.columns_meta(df.cols.names())

col_names = output_columns["columns"].keys()
if format == "json":
result = json.dumps(output_columns, ignore_nan=True, default=json_converter)
else:
Expand All @@ -385,7 +389,7 @@ def match_names(_col_names):
# print(result)
df = df.set_meta("transformations.actions", {})

return result["columns"].keys(), result
return col_names, result

def columns_stats(self, df, columns, buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True,
mismatch=None):
Expand Down
2 changes: 1 addition & 1 deletion optimus/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ def _safe_int(string):
return string


__version__ = '2.2.24'
__version__ = '2.2.25'
VERSION = tuple(_safe_int(x) for x in __version__.split('.'))
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def readme():
author='Favio Vazquez and Argenis Leon',
author_email='[email protected]',
url='https://github.com/ironmussa/Optimus/',
download_url='https://github.com/ironmussa/Optimus/archive/2.2.24.tar.gz',
download_url='https://github.com/ironmussa/Optimus/archive/2.2.25.tar.gz',
description=('Optimus is the missing framework for cleaning and pre-processing data in a distributed fashion with '
'pyspark.'),
long_description=readme(),
Expand Down

0 comments on commit 2fe7ffe

Please sign in to comment.