Skip to content

Commit

Permalink
Restrict ICaseString to input validation
Browse files Browse the repository at this point in the history
  • Loading branch information
MSK61 committed May 5, 2024
1 parent 0e8424a commit d4fdf46
Show file tree
Hide file tree
Showing 24 changed files with 203 additions and 423 deletions.
15 changes: 5 additions & 10 deletions examples/unified memory.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
"#\n",
"# author: Mohammed El-Afifi (ME)\n",
"#\n",
"# environment: Visual Studio Code 1.88.1, python 3.11.9, Fedora release\n",
"# 39 (Thirty Nine)\n",
"# environment: Visual Studio Code 1.89.0, python 3.11.9, Fedora release\n",
"# 40 (Forty)\n",
"#\n",
"# notes: This is a private program.\n",
"#\n",
Expand Down Expand Up @@ -69,8 +69,7 @@
"from processor_utils import units\n",
"from processor_utils.units import FuncUnit\n",
"import program_utils\n",
"import sim_services\n",
"from str_utils import ICaseString"
"import sim_services"
]
},
{
Expand All @@ -88,14 +87,10 @@
"metadata": {},
"outputs": [],
"source": [
"capabilities = [ICaseString(cap) for cap in [\"ALU\", \"MEM\"]]\n",
"capabilities = [\"ALU\", \"MEM\"]\n",
"fetch_unit, decode_unit, execute_unit, memory_unit, writeback_unit = (\n",
" units.UnitModel(\n",
" ICaseString(name),\n",
" 1,\n",
" capabilities,\n",
" units.LockInfo(rd_lock, wr_lock),\n",
" mem_acl,\n",
" name, 1, capabilities, units.LockInfo(rd_lock, wr_lock), mem_acl\n",
" )\n",
" for name, rd_lock, wr_lock, mem_acl in [\n",
" (\"F\", False, False, capabilities),\n",
Expand Down
90 changes: 46 additions & 44 deletions src/processor_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#
# author: Mohammed El-Afifi (ME)
#
# environment: Visual Studio Code 1.88.1, python 3.11.9, Fedora release
# environment: Visual Studio Code 1.89.0, python 3.11.9, Fedora release
# 40 (Forty)
#
# notes: This is a private program.
Expand All @@ -52,7 +52,7 @@
import os
import sys
import typing
from typing import Any, TypeVar
from typing import Any

from attr import field, frozen
from fastcore import foundation
Expand All @@ -78,15 +78,12 @@
UNIT_WLOCK_KEY,
)

_CapT = TypeVar("_CapT")
_InstrT = TypeVar("_InstrT")
_T = TypeVar("_T")
_UNIT_KEY: typing.Final = "unit"


def load_isa(
raw_isa: Iterable[Iterable[str]], capabilities: Iterable[ICaseString]
) -> dict[str, ICaseString]:
) -> dict[str, str]:
"""Transform the given raw description into an instruction set.
`raw_isa` is the raw description to extract an instruction set from.
Expand All @@ -110,7 +107,7 @@ class _CapabilityInfo:
def _add_capability(
unit: object,
cap: ICaseString,
cap_list: MutableSequence[ICaseString],
cap_list: MutableSequence[str],
unit_cap_reg: SelfIndexSet[ICaseString],
global_cap_reg: IndexedSet[_CapabilityInfo],
) -> None:
Expand Down Expand Up @@ -176,11 +173,11 @@ def _add_edge(


def _add_instr(
instr_registry: SelfIndexSet[_InstrT],
cap_registry: SelfIndexSet[_CapT],
instr: _InstrT,
cap: _CapT,
) -> _CapT:
instr_registry: SelfIndexSet[ICaseString],
cap_registry: SelfIndexSet[ICaseString],
instr: str,
cap: str,
) -> str:
"""Add an instruction to the instruction set.
`instr_registry` is the store of previously added instructions.
Expand All @@ -191,13 +188,12 @@ def _add_instr(
"""
_chk_instr(instr, instr_registry)
instr_registry.add(instr)
return _get_cap_name(cap, cap_registry)


def _add_new_cap(
cap: _CapabilityInfo,
cap_list: MutableSequence[ICaseString],
cap_list: MutableSequence[str],
unit_cap_reg: SelfIndexSet[ICaseString],
global_cap_reg: IndexedSet[_CapabilityInfo],
) -> None:
Expand All @@ -223,7 +219,7 @@ def _add_new_cap(
std_cap.unit,
)

cap_list.append(std_cap.name)
cap_list.append(std_cap.name.raw_str)
unit_cap_reg.add(cap.name)


Expand All @@ -250,7 +246,7 @@ def _add_unit(
_chk_unit_name(unit_name, unit_registry)
_chk_unit_width(unit)
processor.add_node(
unit_name,
unit[UNIT_NAME_KEY],
**{
UNIT_WIDTH_KEY: unit[UNIT_WIDTH_KEY],
UNIT_CAPS_KEY: _load_caps(unit, cap_registry),
Expand All @@ -264,25 +260,29 @@ def _add_unit(
unit_registry.add(unit_name)


def _chk_instr(instr: _T, instr_registry: SelfIndexSet[_T]) -> None:
def _chk_instr(instr: str, instr_registry: SelfIndexSet[ICaseString]) -> None:
"""Check the given instruction.
`instr` is the instruction.
`instr_registry` is the store of previously added instructions.
The function raises a DupElemError if the same instruction was
previously added to the instruction store.
previously added to the instruction store, otherwise it adds the new
instruction to the store.
"""
old_instr = instr_registry.get(instr)
new_instr = ICaseString(instr)
old_instr = instr_registry.get(new_instr)

if old_instr:
raise DupElemError(
f"Instruction ${DupElemError.NEW_ELEM_KEY} previously added as "
f"${DupElemError.OLD_ELEM_KEY}",
old_instr,
old_instr.raw_str,
instr,
)

instr_registry.add(new_instr)


def _add_rev_edges(graph: Graph) -> None:
"""Add reverse edges to the given graph.
Expand All @@ -301,7 +301,9 @@ def _add_rev_edges(graph: Graph) -> None:
graph.add_edges_from(chain.from_iterable(edges))


def _chk_unit_name(name: _T, name_registry: SelfIndexSet[_T]) -> None:
def _chk_unit_name(
name: ICaseString, name_registry: SelfIndexSet[ICaseString]
) -> None:
"""Check the given unit name.
`name` is the unit name.
Expand All @@ -316,8 +318,8 @@ def _chk_unit_name(name: _T, name_registry: SelfIndexSet[_T]) -> None:
raise DupElemError(
f"Functional unit ${DupElemError.NEW_ELEM_KEY} previously added as"
f" ${DupElemError.OLD_ELEM_KEY}",
old_name,
name,
old_name.raw_str,
name.raw_str,
)


Expand Down Expand Up @@ -369,7 +371,7 @@ def _create_graph(

def _create_isa(
isa_spec: Iterable[Iterable[str]], cap_registry: SelfIndexSet[ICaseString]
) -> dict[str, ICaseString]:
) -> dict[str, str]:
"""Create an instruction set of the given ISA dictionary.
`isa_spec` is the ISA specification to normalize.
Expand All @@ -380,18 +382,14 @@ def _create_isa(
"""
instr_registry = SelfIndexSet[ICaseString]()
return {
instr.upper(): _add_instr(
instr_registry,
cap_registry,
*(ICaseString(prop) for prop in [instr, cap]),
)
instr.upper(): _add_instr(instr_registry, cap_registry, instr, cap)
for instr, cap in isa_spec
}


def _get_acl_cap(
unit: object, cap: str, global_cap_reg: IndexedSet[_CapabilityInfo]
) -> ICaseString:
) -> str:
"""Return a supported ACL capability name.
`unit` is the unit to get a capability from whose memory ACL.
Expand All @@ -410,10 +408,12 @@ def _get_acl_cap(
"definition..."
)

return std_cap.name
return std_cap.name.raw_str


def _get_cap_name(capability: _T, cap_registry: SelfIndexSet[_T]) -> _T:
def _get_cap_name(
capability: str, cap_registry: SelfIndexSet[ICaseString]
) -> str:
"""Return a supported capability name.
`capability` is the name of the capability to validate.
Expand All @@ -422,14 +422,14 @@ def _get_cap_name(capability: _T, cap_registry: SelfIndexSet[_T]) -> _T:
name is supported, otherwise returns the supported capability name.
"""
std_cap = cap_registry.get(capability)
std_cap = cap_registry.get(ICaseString(capability))

if not std_cap:
raise UndefElemError(
f"Unsupported capability ${UndefElemError.ELEM_KEY}", capability
)

return std_cap
return std_cap.raw_str


def _get_preds(
Expand Down Expand Up @@ -465,7 +465,7 @@ def _get_proc_units(graph: DiGraph) -> Generator[FuncUnit, None, None]:

def _get_std_edge(
edge: Iterable[str], unit_registry: SelfIndexSet[ICaseString]
) -> Generator[ICaseString, None, None]:
) -> Generator[str, None, None]:
"""Return a validated edge.
`edge` is the edge to validate.
Expand All @@ -474,10 +474,10 @@ def _get_std_edge(
encountered.
"""
return (_get_unit_name(ICaseString(unit), unit_registry) for unit in edge)
return (_get_unit_name(unit, unit_registry) for unit in edge)


def _get_unit_entry(name: ICaseString, attrs: Mapping[str, Any]) -> UnitModel:
def _get_unit_entry(name: str, attrs: Mapping[str, Any]) -> UnitModel:
"""Create a unit map entry from the given attributes.
`name` is the unit name.
Expand Down Expand Up @@ -509,7 +509,7 @@ def _get_unit_graph(internal_units: Iterable[FuncUnit]) -> DiGraph:
return rev_graph


def _get_unit_name(unit: _T, unit_registry: SelfIndexSet[_T]) -> _T:
def _get_unit_name(unit: str, unit_registry: SelfIndexSet[ICaseString]) -> str:
"""Return a validated unit name.
`unit` is the name of the unit to validate.
Expand All @@ -518,28 +518,28 @@ def _get_unit_name(unit: _T, unit_registry: SelfIndexSet[_T]) -> _T:
name, otherwise returns the validated unit name.
"""
std_name = unit_registry.get(unit)
std_name = unit_registry.get(ICaseString(unit))

if not std_name:
raise UndefElemError(
f"Undefined functional unit ${UndefElemError.ELEM_KEY}", unit
)

return std_name
return std_name.raw_str


def _load_caps(
unit: Mapping[object, Iterable[str]],
cap_registry: IndexedSet[_CapabilityInfo],
) -> list[ICaseString]:
) -> list[str]:
"""Load the given unit capabilities.
`unit` is the unit to load whose capabilities.
`cap_registry` is the store of previously added capabilities.
The function returns a list of loaded capabilities.
"""
cap_list: list[ICaseString] = []
cap_list: list[str] = []
unit_cap_reg = SelfIndexSet[ICaseString]()

for cur_cap in unit[UNIT_CAPS_KEY]:
Expand All @@ -557,7 +557,7 @@ def _load_caps(
def _load_mem_acl(
unit: Mapping[object, Iterable[str]],
cap_registry: IndexedSet[_CapabilityInfo],
) -> Generator[ICaseString, None, None]:
) -> Generator[str, None, None]:
"""Load the given unit memory ACL.
`unit` is the unit to load whose memory ACL.
Expand Down Expand Up @@ -644,7 +644,9 @@ def get_abilities(processor: ProcessorDesc) -> frozenset[ICaseString]:
"""
in_units = chain(processor.in_out_ports, processor.in_ports)
return frozenset(
chain.from_iterable(port.capabilities for port in in_units)
chain.from_iterable(
map(ICaseString, port.capabilities) for port in in_units
)
)


Expand Down
Loading

0 comments on commit d4fdf46

Please sign in to comment.