Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[binexport] Refactor ISA guessing heuristic #43

Merged
merged 4 commits into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 80 additions & 48 deletions src/qbindiff/loader/backend/binexport.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from __future__ import annotations
import logging
import weakref
from typing import Any, TypeAlias
from typing import Any, TypeAlias, TYPE_CHECKING
from collections.abc import Iterator
from functools import cached_property

Expand All @@ -39,8 +39,11 @@
)
from qbindiff.loader.backend.utils import convert_operand_type
from qbindiff.loader import Structure
from qbindiff.utils import log_once
from qbindiff.loader.types import FunctionType, ReferenceType, ReferenceTarget, OperandType
from qbindiff.types import Addr

if TYPE_CHECKING:
from qbindiff.types import Addr

# Type aliases
beFunction: TypeAlias = binexport.function.FunctionBinExport
Expand All @@ -66,7 +69,7 @@ def capstone_context(arch, mode):
elif binexport_arch == "MIPS-32":
return capstone_context(capstone.CS_ARCH_MIPS, capstone.CS_MODE_32 | mode)
elif binexport_arch == "MIPS-64":
return capstone_context(capstone.CS_ARCH_MIPS, capstone.CS_MODE_32 | mode)
return capstone_context(capstone.CS_ARCH_MIPS, capstone.CS_MODE_64 | mode)

raise NotImplementedError(f"Architecture {binexport_arch} has not be implemented")

Expand Down Expand Up @@ -234,6 +237,56 @@ def __len__(self) -> int:
"""
return len(self.be_block)

def _guess_thumb_context(self, instr_bytes: bytes, mnemonic: str) -> int:
    """
    Guess whether an ARM instruction has to be decoded in thumb mode or not.

    A 2-byte instruction is always reported as thumb. For longer instructions each
    candidate mode (regular ARM first, then thumb) is tried in turn: the bytes are
    disassembled with capstone and the mnemonic of the first decoded instruction is
    compared against the expected one.

    :param instr_bytes: The raw bytes of the instruction
    :param mnemonic: The expected mnemonic (the one found in BinExport)
    :return: The capstone mode (``CS_MODE_ARM`` or ``CS_MODE_THUMB``) whose decoding
             matches ``mnemonic``
    :raises ValueError: If the instruction is shorter than 2 bytes
    :raises Exception: If none of the candidate modes produces a matching mnemonic
    """

    if len(instr_bytes) < 2:  # Must be an error
        raise ValueError(f"Instruction malformed of size {len(instr_bytes)} bytes.")

    if len(instr_bytes) == 2:  # Must be thumb
        return capstone.CS_MODE_THUMB

    # Might be either thumb or normal arm.
    # There is no easy way of knowing whether a 4/8 bytes instruction is thumb or not
    # and IDA sometimes likes to merge two instructions together (so two 4bytes thumb
    # instructions might become a single 8bytes thumb instruction from IDA perspective).
    # The only way of checking if the context is correct is by comparing the mnemonic
    # of the capstone instruction with the one in BinExport.
    # Of course we have to rely on heuristics to know whether the two mnemonics are the
    # same or not.
    log_once(
        logging.WARNING,
        "Relying on heuristics to guess the context mode of the binary (thumb or not)",
    )

    # Architecture name used to build a disassembler for each candidate mode
    arch = self.program.architecture_name

    # Bruteforce-guessing the context: regular arm first, then thumb
    for capstone_mode in (capstone.CS_MODE_ARM, capstone.CS_MODE_THUMB):
        disassembler = _get_capstone_disassembler(arch, capstone_mode)
        # Only the first decoded instruction matters; None means that the bytes
        # cannot be decoded at all in this mode.
        instr = next(disassembler.disasm(instr_bytes, self.addr), None)
        if instr is not None and is_same_mnemonic(instr.mnemonic, mnemonic):
            return capstone_mode

    # We have not been lucky
    log_once(
        logging.ERROR,
        f"Cannot guess ISA of the program {self.program.name}." " Consider setting it manually",
    )
    raise Exception(f"Cannot guess ISA of the instruction at address {self.addr:#x}")

def _disassemble(
self, bb_asm: bytes, correct_mnemonic: str, correct_size: int
) -> list[capstone.CsInsn]:
Expand All @@ -248,51 +301,28 @@ def _disassemble(
"""

# Check if we already have a capstone context, if so use it
if self.program._cs:
return list(self.program._cs.disasm(bb_asm, self.addr))
if self.program.cs:
return list(self.program.cs.disasm(bb_asm, self.addr))

# Continue with the old method
instructions = []
mnemonic = None
size = None
arch = self.program.architecture_name
capstone_mode = 0
arm_mode = 0
capstone_mode = None

# No need to guess the context for these arch
if arch in ("x86", "x86-64"):
md = _get_capstone_disassembler(arch)
return list(md.disasm(bb_asm, self.addr))
if arch in ("x86", "x86-64", "MIPS-32", "MIPS-64", "ARM-64"):
pass

# Bruteforce-guessing the context
while size != correct_size or not is_same_mnemonic(mnemonic, correct_mnemonic):
# change mode
if arch == "ARM-32":
capstone_mode = 0
if arm_mode & 0b1:
capstone_mode |= capstone.CS_MODE_ARM
if arm_mode & 0b10:
capstone_mode |= capstone.CS_MODE_THUMB
if arm_mode > 0b11:
raise Exception(
"Cannot guess the instruction set of the instruction "
f"at address 0x{self.addr:x}"
)
arm_mode += 1

md = _get_capstone_disassembler(arch, capstone_mode)
disasm = md.disasm(bb_asm, self.addr)
try:
instr = next(disasm)
mnemonic = instr.mnemonic
size = instr.size
except StopIteration:
mnemonic = None
size = None
# For arm thumb use appropriate context guessing heuristics
elif arch == "ARM-32":
capstone_mode = self._guess_thumb_context(bb_asm[:correct_size], correct_mnemonic)

# Everything else not yet supported
else:
raise NotImplementedError(f"The architecture {arch} is not yet supported in QBinDiff")

instructions.append(instr)
instructions.extend(disasm)
return instructions
# Set the program wide disassembler
self.program.cs = _get_capstone_disassembler(arch, capstone_mode)
return list(self.program.cs.disasm(bb_asm, self.addr))

@property
def program(self) -> ProgramBackendBinExport:
Expand All @@ -315,7 +345,7 @@ def instructions(self) -> Iterator[InstructionBackendBinExport]:

# Then iterate over the instructions
return (
InstructionBackendBinExport(self.program._cs, instr) for instr in capstone_instructions
InstructionBackendBinExport(self.program.cs, instr) for instr in capstone_instructions
)

@property
Expand Down Expand Up @@ -397,19 +427,21 @@ def __init__(self, file: str, *, arch: str | None = None):
self.be_prog = binexport.ProgramBinExport(file)
self.architecture_name = self.be_prog.architecture
self._fun_names = {} # {fun_name : fun_address}
self._cs = None
self.cs = None

# Check if the architecture is set by the user
if arch:
# Parse the architecture
self._cs = parse_architecture_flag(arch)
if not self._cs:
self.cs = parse_architecture_flag(arch)
if not self.cs:
raise Exception("Unable to instantiate capstone context from given arch: %s" % arch)
else:
logging.warning(
"No architecture set but BinExport backend is used, falling back to guessing method"
logging.info(
"No architecture set but BinExport backend is used. If invalid instructions"
" are found consider setting manually the architecture"
)
self._cs = _get_capstone_disassembler(self.be_prog.architecture)
# self.cs will be set at basic block level
# self.cs = _get_capstone_disassembler(self.be_prog.architecture)

def __repr__(self) -> str:
return f"<{type(self).__name__}:{self.name}>"
Expand Down
2 changes: 1 addition & 1 deletion src/qbindiff/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
Collection of utilities used internally.
"""

from .utils import is_debug, iter_csr_matrix
from .utils import is_debug, iter_csr_matrix, log_once
13 changes: 13 additions & 0 deletions src/qbindiff/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""

from __future__ import annotations
from functools import cache
import logging
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -45,3 +46,15 @@ def iter_csr_matrix(matrix: SparseMatrix) -> Generator[tuple[int, int, Any]]:
coo_matrix = matrix.tocoo()
for x, y, v in zip(coo_matrix.row, coo_matrix.col, coo_matrix.data):
yield (x, y, v)


@cache
def log_once(level: int, message: str) -> None:
    """
    Emit a log message at the given severity, but only the first time it is requested.

    The function is memoised with :func:`functools.cache` on its arguments, so any
    further call with the same ``(level, message)`` pair does nothing. The same
    message logged at a different level counts as a distinct call.

    :param level: The severity level of the logging (e.g. ``logging.WARNING``)
    :param message: The message to log
    """

    logging.log(level, message)