First PR for new annotations and new model for DFG nodes #583

Merged: 19 commits, Jun 27, 2022
Commits
af308e6
WIP: first changes for new annotations
festutz Apr 14, 2022
a3fd11d
Make parallelization use mapper info from new annotations
festutz Apr 20, 2022
20bc389
Make parallelization use mapper info from new annotations
festutz Apr 20, 2022
c22ec79
Delete com_mapper field in dfg_node and use the info from new annotat…
festutz Apr 21, 2022
65d2580
Use input info from new annotations
festutz Apr 21, 2022
95de306
Use output info from new annotations
festutz Apr 21, 2022
a1d7e88
Make dfg_options use the information from new annotations and complet…
festutz Apr 21, 2022
7627d4d
Make to_ast work for wf.sh with some hacks, e.g. to_ast for eager to …
festutz Apr 28, 2022
296bdc9
Remove com_mapper and com_aggregator from DFGNode
festutz May 10, 2022
cf9438b
WIP: incorporating remodelled command invocations
festutz Jun 20, 2022
2f83af0
Parsing with new dataflow node model works
festutz Jun 21, 2022
00462e6
WIP: parallelization
festutz Jun 21, 2022
a1ec6bd
Rudimentary parallelization with new annotations works
festutz Jun 22, 2022
02b6031
Add a way to specify where to find the annotations repository
festutz Jun 23, 2022
26bab26
Do not require flag `r_split` since we do consecutive chunks for now
festutz Jun 23, 2022
6f8ed01
Merge remote-tracking branch 'origin/future_annotations' into new_ann…
festutz Jun 24, 2022
8d9fbf7
1st part of changes due to comments for PR
festutz Jun 24, 2022
ed2f411
minor fix
festutz Jun 24, 2022
f1dade0
2nd part of changes due to comments for PR
festutz Jun 27, 2022
6 changes: 6 additions & 0 deletions README.md
@@ -1,5 +1,11 @@
## PaSh: Light-touch Data-Parallel Shell Processing

**TODO before testing new annotations (temporary fix):**

Connect the new annotations repository to PaSh on the `future_annotations` branch:
- Clone the `connect_to_pash` branch of the new annotations repository: `git@github.com:binpash/annotations.git`
- Specify the path to that clone in `compiler/config.py` (see the sketch below)

> _A system for parallelizing POSIX shell scripts._
> _Hosted by the [Linux Foundation](https://linuxfoundation.org/press-release/linux-foundation-to-host-the-pash-project-accelerating-shell-scripting-with-automated-parallelization-for-industrial-use-cases/)._
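
For orientation, here is a minimal sketch of the `compiler/config.py` hook that the new `annotations_utils` modules rely on. Only `get_path_annotation_repo()` appears in this PR; the variable name, environment override, and default path below are illustrative assumptions rather than the actual implementation.

```python
# Hypothetical sketch of the compiler/config.py setting; only
# get_path_annotation_repo() is taken from this PR, everything else is assumed.
import os

# Assumed default: a local clone of the `connect_to_pash` branch of
# binpash/annotations, overridable via a (hypothetical) environment variable.
PATH_ANNOTATION_REPO = os.environ.get("PASH_ANNOTATIONS_REPO",
                                      os.path.expanduser("~/annotations"))

def get_path_annotation_repo() -> str:
    """Return the local path of the annotations repository clone."""
    return PATH_ANNOTATION_REPO
```

The new utility modules then make the library importable by prepending that path to the module search path, e.g. `sys.path.insert(1, get_path_annotation_repo())`.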

10 changes: 10 additions & 0 deletions TODO.md
@@ -0,0 +1,10 @@
## TODOs before merging to `future`

- eager
- aggregation trees
- r_split
- cat-split fusion
- working on all tests
- Add proper installation of the annotation library and remove the ad-hoc import of it (the `sys.path.insert` hack)
- Clean up the annotation utils
- Change the PaSh flags (make the default prioritize r-split and then consecutive chunks), i.e. remove the `r_split` flag and make the defaults match those from the OSDI paper (I can do that TODO too)
1 change: 1 addition & 0 deletions compiler/annotations.py
@@ -284,6 +284,7 @@ def get_command_properties_from_annotations(command, options, annotations):
return command_ann['properties']

def get_command_aggregator_from_annotations(command, options, annotations):
log(f'still used')
command_ann = get_command_from_annotations(command, options, annotations)
if(command_ann
and 'aggregator' in command_ann):
59 changes: 59 additions & 0 deletions compiler/annotations_utils/util_aggregator.py
@@ -0,0 +1,59 @@
# TODO: this file can probably be deleted

import sys
from config import get_path_annotation_repo
sys.path.insert(1, get_path_annotation_repo())

from definitions.ir.dfg_node import DFGNode
from definitions.ir.nodes.cat import Cat
from annotations_utils.util_cmd_invocations import get_command_invocation_prefix_from_dfg_node
from util import log
from ir_utils import string_to_argument
from definitions.ir.arg import Arg

def get_aggregator_as_dfg_node_from_node(node, parallelizer, inputs, outputs) -> DFGNode:
assert(False)
cmd_inv_pref = get_command_invocation_prefix_from_dfg_node(node)
log(f'cmdinvpref for agg: {cmd_inv_pref}')
aggregator = parallelizer.get_actual_aggregator(cmd_inv_pref)
log(f'here agg: {aggregator}')
# TODO: this could be simplified once we use the new attributes
if aggregator.cmd_name == 'cat':
return Cat(inputs=inputs,
outputs=outputs,
com_name=Arg(string_to_argument(aggregator.cmd_name)),
com_options=[], # empty and not taking over from other one
com_category="stateless",
com_redirs=node.com_redirs,
com_assignments=node.com_assignments,
flag_option_list=aggregator.flag_option_list,
positional_config_list=aggregator.positional_config_list,
positional_input_list=None, # TODO: somehow from inputs, future shift
positional_output_list=None # TODO: somehow from outputs, future shift
# TODO:
# implicit_use_of_stdin = False,
# implicit_use_of_stdout = False,
# omitted for now since we do not consider nested parallelization
# parallelizer_list = None,
# cmd_related_properties = None,
)
else:
log(f'agg_com_name: {aggregator.cmd_name}')
log(f'agg_flag_option_list: {aggregator.flag_option_list}')
return DFGNode(inputs=inputs,
outputs=outputs,
com_name=Arg(string_to_argument(aggregator.cmd_name)),
com_options=node.com_options,
com_redirs=node.com_redirs,
com_assignments=node.com_assignments,
flag_option_list=aggregator.flag_option_list,
positional_config_list=aggregator.positional_config_list,
positional_input_list=None, # TODO: somehow from inputs, future shift
positional_output_list=None # TODO: somehow from outputs, future shift
# TODO:
# implicit_use_of_stdin = False,
# implicit_use_of_stdout = False,
# omitted for now since we do not consider nested parallelization
# parallelizer_list = None,
# cmd_related_properties = None,
)
96 changes: 96 additions & 0 deletions compiler/annotations_utils/util_cmd_invocations.py
@@ -0,0 +1,96 @@
import sys

# Make the annotations library importable before loading any of its modules.
from config import get_path_annotation_repo
sys.path.insert(1, get_path_annotation_repo())

from datatypes_new.BasicDatatypes import Flag
from datatypes_new.BasicDatatypesWithIO import OptionWithIO
from datatypes_new.CommandInvocationInitial import CommandInvocationInitial
# for typing
from datatypes_new.CommandInvocationPrefix import CommandInvocationPrefix
from annotation_generation_new.datatypes.InputOutputInfo import InputOutputInfo
from annotation_generation_new.datatypes.ParallelizabilityInfo import ParallelizabilityInfo
from annotation_generation_new.datatypes.CommandProperties import CommandProperties
from annotation_generation_new.AnnotationGeneration import get_input_output_info_from_cmd_invocation, \
    get_parallelizability_info_from_cmd_invocation

from util import log
from ir_utils import string_to_argument, redir_stdout_to_file, redir_file_to_stdin, make_command

def get_command_invocation_prefix_from_dfg_node(dfg_node):
return CommandInvocationPrefix(cmd_name = dfg_node.com_name,
flag_option_list = dfg_node.flag_option_list,
positional_config_list = dfg_node.positional_config_list)

# TODO: ideally methods in the respective classes but requires refactoring of parsing infrastructure
def to_node_cmd_inv_with_io_vars(cmd_inv, edges, redirs, assignments):
log("edges", edges)
ast_cmd_name = string_to_argument(cmd_inv.cmd_name)
log("ast_cmd_name", ast_cmd_name)
ast_flagoptions = []
for flagoption in cmd_inv.flag_option_list:
ast_flagoptions += to_ast_flagoption(flagoption, edges)
log("flagoptions", cmd_inv.flag_option_list)
log("ast_flagoptions", ast_flagoptions)
ast_operands = [to_ast_operand(operand, edges) for operand in cmd_inv.operand_list]
log("operands", cmd_inv.operand_list)
log("ast_operands", ast_operands)
# log("type of ast_operands [0]", type(ast_operands[0])) # can only be used if there are operands
cmd_asts = [ast_cmd_name] + ast_flagoptions + ast_operands

# TODO: check for actual stdin
stdin_redir = []
if cmd_inv.implicit_use_of_streaming_input is not None:
fid, _, _ = edges[cmd_inv.implicit_use_of_streaming_input]
if not (fid.has_file_descriptor_resource() and fid.resource.is_stdin()):
stdin_redir = [redir_file_to_stdin(fid.to_ast())]

# TODO: check for actual stdout
stdout_redir = []
if cmd_inv.implicit_use_of_streaming_output is not None:
fid, _, _ = edges[cmd_inv.implicit_use_of_streaming_output]
if not (fid.has_file_descriptor_resource() and fid.resource.is_stdout()):
stdout_redir = [redir_stdout_to_file(fid.to_ast())]

new_redirs = redirs + stdin_redir + stdout_redir
node = make_command(cmd_asts, redirections=new_redirs, assignments=assignments)
log("node", node)
return node

def to_ast_flagoption(flagoption, edges):
    if isinstance(flagoption, Flag):
        return [string_to_argument(flagoption.get_name())]
    elif isinstance(flagoption, OptionWithIO): # retype to IOVar
        opt_name_ast = string_to_argument(flagoption.get_name())
        # pass the edges along so that IO variables in option arguments get dereferenced
        opt_arg_ast = translate_io_var_if_applicable(flagoption.get_arg(), edges)
        return [opt_name_ast, opt_arg_ast]

def to_ast_operand(operand, edges):
return translate_io_var_if_applicable(operand, edges)

def translate_io_var_if_applicable(pot_io_var, edges):
if isinstance(pot_io_var, int):
return dereference_io_var(pot_io_var, edges)
else:
return to_ast_arg_string_type(pot_io_var)

def to_ast_arg_string_type(arg_string_type):
return arg_string_type.get_name().arg_char_list # is of type Arg

# assumes io_var is an edge id
def dereference_io_var(io_var, edges):
fid, _, _ = edges[io_var]
log(fid)
return fid.to_ast()

def get_input_output_info_from_cmd_invocation_util(cmd_invocationInitial : CommandInvocationInitial) -> InputOutputInfo:
return get_input_output_info_from_cmd_invocation(cmd_invocationInitial)

def get_parallelizability_info_from_cmd_invocation_util(cmd_invocationInitial : CommandInvocationInitial) -> ParallelizabilityInfo:
return get_parallelizability_info_from_cmd_invocation(cmd_invocationInitial)

def construct_property_container_from_list_of_properties(list_properties):
return CommandProperties(dict(list_properties))

21 changes: 21 additions & 0 deletions compiler/annotations_utils/util_file_descriptors.py
@@ -0,0 +1,21 @@
from util import log
from definitions.ir.resource import FileResource, Resource, FileDescriptorResource
import sys
from config import get_path_annotation_repo
sys.path.insert(1, get_path_annotation_repo())
from datatypes_new.BasicDatatypesWithIO import FileNameWithIOInfo, StdDescriptorWithIOInfo


def resource_from_file_descriptor(file_descriptor) -> Resource:
if isinstance(file_descriptor, FileNameWithIOInfo):
arg = file_descriptor.get_name()
log(f'filedes name: {file_descriptor.get_name()}')
log(f'filedes name type: {type(file_descriptor.get_name())}')
log(f'arg: {arg}')
return FileResource(file_descriptor.get_name())
elif isinstance(file_descriptor, StdDescriptorWithIOInfo):
resource = ("fd", file_descriptor.get_type().value)
return FileDescriptorResource(resource)
else:
assert(False)
# unreachable
97 changes: 97 additions & 0 deletions compiler/annotations_utils/util_mapper.py
@@ -0,0 +1,97 @@
# TODO: this file can probably be deleted

# imports from annotation framework
import sys
from config import get_path_annotation_repo
sys.path.insert(1, get_path_annotation_repo())
# for typing
# for use
from annotation_generation_new.datatypes.parallelizability.Mapper import Mapper

from definitions.ir.dfg_node import DFGNode
from annotations_utils.util_cmd_invocations import get_command_invocation_prefix_from_dfg_node
from util import log

def get_actual_mapper_from_node(node, parallelizer) -> Mapper:
assert(False)
cmd_inv_pref = get_command_invocation_prefix_from_dfg_node(node)
return parallelizer.get_actual_mapper(cmd_inv_pref)

def get_mapper_as_dfg_node_from_node(node, parallelizer, inputs, outputs) -> DFGNode:
assert(False)
mapper = get_actual_mapper_from_node(node, parallelizer)
log(f'mapper for cmd_name: {node.com_name}')
log(f'here mapper: {mapper}')
return DFGNode(inputs=inputs,
outputs=outputs,
com_name=mapper.cmd_name,
# com_options=node.com_options,
com_redirs=node.com_redirs,
com_assignments=node.com_assignments,
flag_option_list=mapper.flag_option_list,
positional_config_list=mapper.positional_config_list,
positional_input_list=None, # TODO: somehow from inputs, future shift
positional_output_list=None # TODO: somehow from outputs, future shift
# TODO:
# implicit_use_of_stdin = False,
# implicit_use_of_stdout = False,
# omitted for now since we do not consider nested parallelization
# parallelizer_list = None,
# cmd_related_properties = None,
)

## MOVED from dfg_node
## Get the file names of the outputs of the map commands. This
## differs depending on whether the command is stateless, pure and
## expressible as a map and a reduce, or pure and expressible as a
## generalized map and reduce.
# BEGIN ANNO
# OLD
# def get_map_output_files(node, input_edge_ids, fileIdGen):
# NEW
def get_map_output_files(node, input_edge_ids, fileIdGen, parallelizer):
assert(False)
assert (node.is_parallelizable())
# TODO ANNO: How to substitute? @KK
if (node.com_category == "stateless"):
map_output_fids = [fileIdGen.next_ephemeral_file_id() for in_fid in input_edge_ids]
elif (node.is_pure_parallelizable()):
# BEGIN ANNO
# OLD
# map_output_fids = node.pure_get_map_output_files(input_edge_ids, fileIdGen)
# NEW
map_output_fids = pure_get_map_output_files(node, input_edge_ids, fileIdGen, parallelizer)
# END ANNO
else:
log("Unreachable code reached :(")
assert (False)
## This should be unreachable

return map_output_fids

## TODO: Fix this somewhere in the annotations and not in the code
# BEGIN ANNO
# OLD
# def pure_get_map_output_files(node, input_edge_ids, fileIdGen):
# NEW
def pure_get_map_output_files(node, input_edge_ids, fileIdGen, parallelizer):
assert(False)
assert (node.is_pure_parallelizable())
# BEGIN ANNO
# OLD
## The number of the mapper outputs defaults to 1
# if(node.com_mapper is None):
# number_outputs = 1
# else:
# number_outputs = node.com_mapper.num_outputs
# NEW
# TODO: which parallelizer did we choose?
actual_mapper = get_actual_mapper_from_node(node, parallelizer)
number_outputs = actual_mapper.num_outputs # defaults to 1 in class Mapper
# END ANNO

new_output_fids = [[fileIdGen.next_ephemeral_file_id() for i in range(number_outputs)]
for in_fid in input_edge_ids]
return new_output_fids

