Skip to content

Commit

Permalink
Unpacking CAN frames with JSONCAN in Python (#1271)
Browse files Browse the repository at this point in the history
### Changelist 
<!-- Give a list of the changes covered in this PR. This will help both
you and the reviewer keep this PR within scope. -->

Add the ability to unpack raw CAN frames to signal values in JSONCAN
Python. This can be used with telemetry to decode received packets, or
used to pull logged frames from the SD card.

### Testing Done
<!-- Outline the testing that was done to demonstrate the changes are
solid. This could be unit tests, integration tests, testing on the car,
etc. Include relevant code snippets, screenshots, etc as needed. -->

Was able to successfully decode CAN frames logged to the SD card, and
got expected values.
  • Loading branch information
gtaharaedmonds authored May 22, 2024
1 parent 43fb84a commit 15d1bd5
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 58 deletions.
63 changes: 47 additions & 16 deletions scripts/code_generation/jsoncan/src/can_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@
from .utils import bits_for_uint, bits_to_bytes, is_int


@dataclass(frozen=True)
class CanEnumItem:
"""
Dataclass for describing a single value table item.
"""

name: str
value: int


@dataclass(frozen=True)
class CanEnum:
"""
Expand All @@ -29,13 +19,13 @@ class CanEnum:
"""

name: str
items: List[CanEnumItem]
items: Dict[int, str] # Dict of enum value to enum item name

def max_val(self) -> int:
"""
Maximum value present in this value table's entries.
"""
return max(entry.value for entry in self.items)
return max(self.items.keys())

@staticmethod
def min_val() -> int:
Expand Down Expand Up @@ -208,7 +198,9 @@ class CanDatabase:

nodes: List[str] # List of names of the nodes on the bus
bus_config: CanBusConfig # Various bus params
msgs: List[CanMessage] # All messages being sent to the bus
msgs: Dict[
int, CanMessage
] # All messages being sent to the bus (dict of (ID to message)
shared_enums: List[CanEnum] # Enums used by all nodes
alerts: Dict[
str, Dict[CanAlert, AlertsEntry]
Expand All @@ -218,13 +210,13 @@ def tx_msgs_for_node(self, tx_node: str) -> List[CanMessage]:
"""
Return list of all CAN messages transmitted by a specific node.
"""
return [msg for msg in self.msgs if tx_node == msg.tx_node]
return [msg for msg in self.msgs.values() if tx_node == msg.tx_node]

def rx_msgs_for_node(self, rx_node: str) -> List[CanMessage]:
"""
Return list of all CAN messages received by a specific node.
"""
return [msg for msg in self.msgs if rx_node in msg.rx_nodes]
return [msg for msg in self.msgs.values() if rx_node in msg.rx_nodes]

def msgs_for_node(self, node: str) -> List[CanMessage]:
"""
Expand Down Expand Up @@ -285,7 +277,9 @@ def node_alerts_with_rx_check(
return self.node_alerts(tx_node, alert_type)
else:
alert_msg = next(
msg for msg in self.msgs if msg.name == f"{tx_node}_{alert_type}s"
msg
for msg in self.msgs.values()
if msg.name == f"{tx_node}_{alert_type}s"
)
return [
alert
Expand All @@ -298,3 +292,40 @@ def node_has_alert(self, node: str, alert_type: CanAlertType) -> bool:
Return whether or not a node transmits any alerts.
"""
return len(self.node_alerts(node, alert_type)) > 0

def unpack(self, id: int, data: bytes) -> Dict:
"""
Unpack a CAN dataframe.
Returns a dict with the signal name, value, and unit.
TODO: Also add packing!
"""
signals = []
for signal in self.msgs[id].signals:
# Interpret raw bytes as an int.
data_uint = int.from_bytes(data, byteorder="little", signed=False)

# Extract the bits representing the current signal.
data_shifted = data_uint >> signal.start_bit
bitmask = (1 << signal.bits) - 1
signal_bits = data_shifted & bitmask

# Interpret value as signed number via 2s complement.
if signal.signed:
if signal_bits & (1 << (signal.bits - 1)):
signal_bits = ~signal_bits & ((1 << signal.bits) - 1)
signal_bits += 1

# Decode the signal value using the scale/offset.
signal_value = signal_bits * signal.scale + signal.offset

# If the signal is an enum, set the value to the entry name.
if signal.enum is not None:
signal_value = signal.enum.items[signal_value]

# Append decoded signal's data.
signals.append(
{"name": signal.name, "value": signal_value, "unit": signal.unit}
)

return signals
Original file line number Diff line number Diff line change
Expand Up @@ -113,31 +113,26 @@ def _get_board_alert_code(self, alert_type: CanAlertType, comment: str):
get_alert = CFunc(
GET_BOARD_FAULT_CODE.format(alert_type=alert_type),
"uint8_t",
args=[
CVar("*alert_array",CTypesConfig.CAN_ALERT_INFO)
],
args=[CVar("*alert_array", CTypesConfig.CAN_ALERT_INFO)],
comment=f"Return whether or not a board has set a {comment}.",
)
get_alert.body.add_line("uint8_t element_num = 0;")
get_alert.body.add_line()
nodes_with_alerts = [
node for node in self._db.nodes if self._db.node_has_alert(node, alert_type)
]
for node in nodes_with_alerts:

for node in nodes_with_alerts:
for alert in self._db.node_alerts_with_rx_check(
node, self._node, alert_type
):

item = self._db.node_name_description(node, alert_type = alert_type)


if item[alert] == {} :
item = self._db.node_name_description(node, alert_type=alert_type)

if item[alert] == {}:
id = 0
description = ""

else:
(id,description) = item[alert]
(id, description) = item[alert]

if node == self._node:
get_alert.body.start_if(
Expand All @@ -147,10 +142,12 @@ def _get_board_alert_code(self, alert_type: CanAlertType, comment: str):
get_alert.body.start_if(
f"{CFuncsConfig.APP_RX_GET_SIGNAL.format(signal=alert)}()"
)

get_alert.body.add_line(f'alert_array[element_num].name = "{alert}";')
get_alert.body.add_line(f'alert_array[element_num].description = "{description}";')
get_alert.body.add_line(f'alert_array[element_num].id = {id};')
get_alert.body.add_line(
f'alert_array[element_num].description = "{description}";'
)
get_alert.body.add_line(f"alert_array[element_num].id = {id};")
get_alert.body.add_line("element_num++;")

get_alert.body.end_if()
Expand Down Expand Up @@ -189,7 +186,7 @@ def _public_functions(self) -> List[CFunc]:
# Alert setters
funcs.extend(self._set_alert_funcs(CanAlertType.WARNING))
funcs.extend(self._set_alert_funcs(CanAlertType.FAULT))

# Alert getters
funcs.extend(self._get_alert_funcs(CanAlertType.WARNING))
funcs.extend(self._get_alert_funcs(CanAlertType.FAULT))
Expand All @@ -201,13 +198,12 @@ def _public_functions(self) -> List[CFunc]:
# All board alert set checkers
funcs.append(self._any_alert_set_func(CanAlertType.WARNING, "warning"))
funcs.append(self._any_alert_set_func(CanAlertType.FAULT, "fault"))

# Fault and Warning code getters
funcs.append(self._get_board_alert_code (CanAlertType.WARNING, "warning"))
funcs.append(self._get_board_alert_code(CanAlertType.WARNING, "warning"))
funcs.append(self._get_board_alert_code(CanAlertType.FAULT, "fault"))

return funcs


def header(self):
cw = CWriter()
Expand Down Expand Up @@ -243,17 +239,16 @@ def header(self):
boards_enum.add_value(
CVar(ALERT_BOARD_ENUM_NAME.format(node=node.upper()), value=i)
)
cw.add_enum(boards_enum)
cw.add_enum(boards_enum)
cw.add_line()

fault_warining_struct = CStruct(CTypesConfig.CAN_ALERT_INFO)
fault_warining_struct.add_member(CVar("description","char*"))
fault_warining_struct.add_member(CVar("description", "char*"))
fault_warining_struct.add_member(CVar("name", "char*"))
fault_warining_struct.add_member(CVar("id", "uint16_t"))

cw.add_struct(fault_warining_struct)



# Add function prototypes
cw.add_line()
cw.add_header_comment("Function Prototypes")
Expand All @@ -277,7 +272,7 @@ def source(self):
cw.add_include('"app_canAlerts.h"')
cw.add_include('"app_canTx.h"')
cw.add_include('"app_canRx.h"')

# Add function definitions
cw.add_line()
cw.add_header_comment("Function Definitions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ def header(self):
# Add found enums
for can_enum in can_enums:
enum = CEnum(can_enum.name)
for item in can_enum.items:
enum.add_value(CVar(name=item.name, value=item.value))
for item_value, item_name in can_enum.items.items():
enum.add_value(CVar(name=item_name, value=item_value))
enum.add_value(
CVar(
name=f"NUM_{pascal_to_screaming_snake_case(can_enum.name)}_CHOICES",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ class CModule(ABC):
ABC for a C module (i.e. pair of header .h and source .c files)
"""

def header(self) -> str: ...
def header(self) -> str:
...

def source(self) -> str: ...
def source(self) -> str:
...


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def _functions(self) -> List[CFunc]:
# Prepare header
func.body.add_comment("Prepare msg header")
func.body.add_var_declaration(CVar("tx_msg", CTypesConfig.CAN_MSG_STRUCT))
func.body.add_line(f"memset(&tx_msg, 0, sizeof({CTypesConfig.CAN_MSG_STRUCT}));")
func.body.add_line(
f"memset(&tx_msg, 0, sizeof({CTypesConfig.CAN_MSG_STRUCT}));"
)
func.body.add_line(f"tx_msg.std_id = {CMacrosConfig.id(msg.name)};")
func.body.add_line(f"tx_msg.dlc = {CMacrosConfig.bytes(msg.name)};")
func.body.add_line()
Expand Down Expand Up @@ -212,7 +214,9 @@ def source(self):
cw.add_header_comment("Static Variables")
cw.add_line()
cw.add_line("static uint32_t can_mode;")
cw.add_line(f"static void (*transmit_func)(const {CTypesConfig.CAN_MSG_STRUCT}* tx_msg);")
cw.add_line(
f"static void (*transmit_func)(const {CTypesConfig.CAN_MSG_STRUCT}* tx_msg);"
)
cw.add_line()

# Add static function definitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def source(self) -> str:
value_tables_text = ""
cycle_time_attributes_text = ""
signal_start_values_text = ""
for msg in self._db.msgs:
for msg in self._db.msgs.values():
# Generate text for CAN message
msgs_text += self._dbc_message(msg=msg, tx_node=msg.tx_node)

Expand Down Expand Up @@ -176,7 +176,10 @@ def _dbc_value_table(self, signal: CanSignal, msg_id: int) -> str:
Format and return DBC value table.
"""
entries_text = " ".join(
[f'{entry.value} "{entry.name}"' for entry in signal.enum.items]
[
f'{item_value} "{item_name}"'
for item_value, item_name in signal.enum.items.items()
]
)

return DBC_VALUE_TABLE_TEMPLATE.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def __init__(self, can_data_dir: str):
self._messages: dict[str, CanMessage] = {} # Dict of msg names to msg objects
self._enums: dict[str, CanEnum] = {} # Dict of enum names to enum objects
self._shared_enums: list[CanEnum] = [] # Set of shared enums
self._alerts: dict[str, dict[CanAlert, AlertsEntry]] = (
{}
) # Dict of node names to node's alerts
self._alerts: dict[
str, dict[CanAlert, AlertsEntry]
] = {} # Dict of node names to node's alerts
self._alert_descriptions = {} # TODO this is not used

self._parse_json_data(can_data_dir=can_data_dir)
Expand All @@ -62,7 +62,7 @@ def make_database(self) -> CanDatabase:
return CanDatabase(
nodes=self._nodes,
bus_config=self._bus_cfg,
msgs=list(self._messages.values()),
msgs={msg.id: msg for msg in self._messages.values()},
shared_enums=self._shared_enums,
alerts=self._alerts,
)
Expand Down Expand Up @@ -401,21 +401,21 @@ def _get_parsed_can_enum(enum_name: str, enum_entries: dict[str, int]) -> CanEnu
"""
Parse JSON data dictionary representing a CAN enum.
"""
items = []
items = {}
for name, value in enum_entries.items():
if value < 0:
raise InvalidCanJson(
f"Negative enum value found for enum '{enum_name}', which is not supported. Use only positive integers or zero."
)

if value in {item.value for item in items}:
if value in items:
raise InvalidCanJson(
f"Repeated value {value} for enum '{enum_name}', which is not allowed (values must be unique)."
)

items.append(CanEnumItem(name=name, value=value))
items[value] = name

if 0 not in {item.value for item in items}:
if 0 not in items:
raise InvalidCanJson(f"Enum '{enum_name}' must start at 0.")

return CanEnum(name=enum_name, items=items)
Expand Down

0 comments on commit 15d1bd5

Please sign in to comment.