Skip to content

Commit

Permalink
Simplify the code (#12)
Browse files Browse the repository at this point in the history
* Simplify the code. Based on feedback from @markrwilliams I consolidated a bunch of the files into a single file (elf.py) for readability.

* added mypy support
* more typings: everything is typed now!
* remove duplicate assignment of columns in elf.py
* mypy passes
* PR feedback
* typo fix
  • Loading branch information
fzakaria authored Sep 22, 2023
1 parent c251cf9 commit fde5e41
Show file tree
Hide file tree
Showing 16 changed files with 380 additions and 398 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ lint: ## Run pep8, black, mypy linters.
flake8 sqlelf/
black --check sqlelf/
pyright
mypy --strict --install-types --non-interactive sqlelf tests

.PHONY: test
test: ## Run pytest primarily.
Expand Down
16 changes: 15 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,35 @@ readme = "README.md"
description = "Explore ELF objects through the power of SQL"
license = { file = "LICENSE" }
requires-python = ">=3.10,<4.0"

keywords = []
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"capstone >= 5.0.1",
"lief >=0.13.2",
"apsw >= 3.43.1.0",
"sh >= 2.0.6",
]

[project.urls]
Documentation = "https://github.com/fzakaria/sqlelf#readme"
Issues = "https://github.com/fzakaria/sqlelf/issues"
Source = "https://github.com/fzakaria/sqlelf"

[project.optional-dependencies]
dev = [
"black >= 23.7.0",
"isort >= 5.12.0",
"flake8 >= 6.1.0",
"pyright >= 1.1.325",
"pytest >= 7.4.0",
"mypy >= 1.0.0",
]

[tool.setuptools]
Expand Down
27 changes: 19 additions & 8 deletions sqlelf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,23 @@
import os
import os.path
import sys
from dataclasses import dataclass, field
from functools import reduce
from typing import TextIO

import lief

from sqlelf import sql as api_sql


def start(args=sys.argv[1:], stdin=sys.stdin):
@dataclass
class ProgramArguments:
    # Typed namespace that parser.parse_args() populates in start().
    # Paths given on the command line; start() expands any directory into
    # the files it contains.
    filenames: list[str] = field(default_factory=list)
    # SQL statements handed to the shell one at a time; presumably supplied
    # via a command-line option (the parser definition is not fully visible).
    sql: list[str] = field(default_factory=list)
    # Forwarded to make_sql_engine(); the option's help text says it loads
    # the shared libraries needed by each file using ldd.
    recursive: bool = False


def start(args: list[str] = sys.argv[1:], stdin: TextIO = sys.stdin) -> None:
"""
Start the main CLI
Expand Down Expand Up @@ -37,7 +46,9 @@ def start(args=sys.argv[1:], stdin=sys.stdin):
help="Load all shared libraries needed by each file using ldd",
)

args = parser.parse_args(args)
program_args: ProgramArguments = parser.parse_args(
args, namespace=ProgramArguments()
)

# Iterate through our arguments and if one of them is a directory explode it out
filenames: list[str] = reduce(
Expand All @@ -46,7 +57,7 @@ def start(args=sys.argv[1:], stdin=sys.stdin):
lambda dir: [os.path.join(dir, f) for f in os.listdir(dir)]
if os.path.isdir(dir)
else [dir],
args.filenames,
program_args.filenames,
),
)
# Filter the list of filenames to those that are ELF files only
Expand All @@ -58,11 +69,11 @@ def start(args=sys.argv[1:], stdin=sys.stdin):

binaries: list[lief.Binary] = [lief.parse(filename) for filename in filenames]

sql_engine = api_sql.make_sql_engine(binaries, recursive=args.recursive)
sql_engine = api_sql.make_sql_engine(binaries, recursive=program_args.recursive)
shell = sql_engine.shell(stdin=stdin)

if args.sql:
for sql in args.sql:
shell.process_complete_line(sql)
if program_args.sql and len(program_args.filenames) > 0:
for sql in program_args.sql:
shell.process_complete_line(sql) # type: ignore[no-untyped-call]
else:
shell.cmdloop()
shell.cmdloop() # type: ignore[no-untyped-call]
265 changes: 265 additions & 0 deletions sqlelf/elf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Iterator, Sequence, cast

import apsw
import apsw.ext
import capstone # type: ignore
import lief


@dataclass
class Generator:
    """A row source for an apsw virtual table module.

    This wrapper exists because apsw wants ``columns`` and ``column_access``
    to be attributes of the callable that produces the rows.
    """

    # Column names of the virtual table.
    columns: Sequence[str]
    # How apsw reads a column out of each yielded row.
    column_access: apsw.ext.VTColumnAccess
    # Zero-argument callable returning a fresh iterator of row dictionaries.
    callable: Callable[[], Iterator[dict[str, Any]]]

    def __call__(self) -> Iterator[dict[str, Any]]:
        """Return an iterator of row dictionaries.

        The dictionaries should have keys that match the column names.
        """
        return self.callable()

    @staticmethod
    def make_generator(generator: Callable[[], Iterator[dict[str, Any]]]) -> Generator:
        """Create a Generator from a callable returning an iterator of rows.

        The column names are inferred from the first row the callable
        produces, so the callable must yield at least one row.

        Raises:
            ValueError: if the callable yields no rows at all.
        """
        # Use a default so an empty source does not leak a bare StopIteration,
        # which is hard to diagnose and is silently swallowed by any
        # enclosing iteration protocol.
        sample_row = next(generator(), None)
        if sample_row is None:
            raise ValueError(
                "Cannot infer virtual table columns: the generator yielded no rows"
            )
        columns, column_access = apsw.ext.get_column_names(sample_row)
        return Generator(columns, column_access, generator)


def make_dynamic_entries_generator(binaries: list[lief.Binary]) -> Generator:
    """Build the virtual table over each binary's dynamic entries.

    Every row carries the binary's name plus the tag name and value of one
    dynamic entry.
    """

    def rows() -> Iterator[dict[str, Any]]:
        for binary in binaries:
            # Read the name once per binary: lief accessors can be costly
            # and must stay out of the per-entry loop.
            path = binary.name
            yield from (
                {"path": path, "tag": entry.tag.name, "value": entry.value}
                for entry in binary.dynamic_entries  # type: ignore
            )

    return Generator.make_generator(rows)


def make_headers_generator(binaries: list[lief.Binary]) -> Generator:
    """Build the ELF headers virtual table: one row per binary."""

    def rows() -> Iterator[dict[str, Any]]:
        for binary in binaries:
            path = binary.name
            header = binary.header
            yield {
                "path": path,
                "type": header.file_type.name,
                "machine": header.machine_type.name,
                "version": header.identity_version.name,
                "entry": header.entrypoint,
            }

    return Generator.make_generator(rows)


def make_instructions_generator(binaries: list[lief.Binary]) -> Generator:
    """Build the instructions virtual table.

    Rows are the disassembled instructions of every section flagged as
    executable, one row per instruction.
    """

    def rows() -> Iterator[dict[str, Any]]:
        for binary in binaries:
            # Accessors are read outside the tight loops because they can
            # be costly.
            path = binary.name

            for section in binary.sections:
                if not section.has(lief.ELF.SECTION_FLAGS.EXECINSTR):
                    continue

                code = bytes(section.content)
                disassembler = capstone.Cs(arch(binary), mode(binary))
                # Detail mode costs more memory and slows the engine down;
                # only the lightweight tuple output is needed here.
                disassembler.detail = False

                section_name = section.name
                instructions = disassembler.disasm_lite(
                    code, section.virtual_address
                )
                for address, _size, mnemonic, op_str in instructions:
                    yield {
                        "path": path,
                        "section": section_name,
                        "mnemonic": mnemonic,
                        "address": address,
                        "operands": op_str,
                    }

    return Generator.make_generator(rows)


def mode(binary: lief.Binary) -> int:
    """Return the capstone mode constant matching the binary's ELF class.

    Only 64-bit ELF objects are recognised.

    Raises:
        RuntimeError: if the binary's identity class is not CLASS64.
    """
    is_64_bit = binary.header.identity_class == lief.ELF.ELF_CLASS.CLASS64
    if not is_64_bit:
        raise RuntimeError(f"Unknown mode for {binary.name}")
    return cast(int, capstone.CS_MODE_64)


def arch(binary: lief.Binary) -> int:
    """Return the capstone architecture constant for the binary's machine type.

    Only x86_64 is recognised.

    Raises:
        RuntimeError: if the binary's machine type is not x86_64.
    """
    is_x86_64 = binary.header.machine_type == lief.ELF.ARCH.x86_64
    if not is_x86_64:
        raise RuntimeError(f"Unknown machine type for {binary.name}")
    return cast(int, capstone.CS_ARCH_X86)


def make_sections_generator(binaries: list[lief.Binary]) -> Generator:
    """Build the ELF sections virtual table: one row per section."""

    def rows() -> Iterator[dict[str, Any]]:
        for binary in binaries:
            # Read the name once per binary; lief accessors can be costly
            # and must stay out of the per-section loop.
            path = binary.name
            yield from (
                {
                    "path": path,
                    "name": section.name,
                    "offset": section.offset,
                    "size": section.size,
                    "type": section.type.name,
                    "content": bytes(section.content),
                }
                for section in binary.sections
            )

    return Generator.make_generator(rows)


def coerce_section_name(name: str | None) -> str | None:
    """Map an empty section name to ``"undefined"``.

    Any other value, including ``None``, is returned unchanged.
    """
    return "undefined" if name == "" else name


def make_strings_generator(binaries: list[lief.Binary]) -> Generator:
    """Create the ELF strings virtual table.

    This goes through all string tables (sections of type STRTAB) in each
    binary and splits their content on null bytes, yielding one row per
    string.
    """

    def strings_generator() -> Iterator[dict[str, Any]]:
        for binary in binaries:
            strtabs = [
                section
                for section in binary.sections
                if section.type == lief.ELF.SECTION_TYPES.STRTAB
            ]
            # super important that these accessors are pulled out of the tight loop
            # as they can be costly
            binary_name = binary.name
            for strtab in strtabs:
                # Same reasoning: read the section name once per table, not
                # once per string.
                section_name = strtab.name
                # The first byte is always the null byte in the STRTAB
                # Python also treats the final null in the string by creating
                # an empty item so we chop it off.
                # https://stackoverflow.com/a/18970869
                for string in str(strtab.content[1:-1], "utf-8").split("\x00"):
                    yield {
                        "path": binary_name,
                        "section": section_name,
                        "value": string,
                    }

    return Generator.make_generator(strings_generator)


def make_symbols_generator(binaries: list[lief.Binary]) -> Generator:
    """Create the ELF symbols virtual table: one row per symbol.

    The symbols of each binary come from ``symbols(binary)``.
    """

    def symbols_generator() -> Iterator[dict[str, Any]]:
        for binary in binaries:
            # super important that these accessors are pulled out of the tight loop
            # as they can be costly
            binary_name = binary.name
            # Resolve section names once per binary so each symbol does an
            # index lookup instead of rescanning every section.
            section_names = [section.name for section in binary.sections]
            section_count = len(section_names)

            for symbol in symbols(binary):
                # The section index can be special numbers like 65521 or 65522
                # that refer to special sections so they can't be indexed
                shndx = symbol.shndx
                section_name: str | None = (
                    section_names[shndx] if 0 <= shndx < section_count else None
                )

                symbol_version = symbol.symbol_version
                auxiliary = (
                    symbol_version.symbol_version_auxiliary
                    if symbol_version
                    else None
                )

                yield {
                    "path": binary_name,
                    "name": symbol.name,
                    "demangled_name": symbol.demangled_name,
                    # A bit of detailed explanation here to explain these values.
                    # A symbol that points to the SHN_UNDEF section is an
                    # "imported symbol" -- meaning it needs to be linked in.
                    # If the section is != SHN_UNDEF then it is "exported" as
                    # its logic resides within this shared object file.
                    # refs:
                    # https://github.com/lief-project/LIEF/blob/0875ee2467d5ae6628d8bf3f4f0b82ca5854c401/src/ELF/Symbol.cpp#L90
                    # https://stackoverflow.com/questions/12666253/elf-imports-and-exports
                    # https://www.m4b.io/elf/export/binary/analysis/2015/05/25/what-is-an-elf-export.html
                    "imported": symbol.imported,
                    "exported": symbol.exported,
                    "section": coerce_section_name(section_name),
                    "size": symbol.size,
                    # TODO(fzakaria): Better understand why is it auxiliary?
                    # this returns versions like GLIBC_2.2.5
                    "version": auxiliary.name if auxiliary else None,
                    "type": symbol.type.name,
                    "value": symbol.value,
                }

    return Generator.make_generator(symbols_generator)


def symbols(binary: lief.Binary) -> Sequence[lief.ELF.Symbol]:
    """Return the static symbol table if present, else the dynamic one.

    The static symbol table is a superset of the dynamic symbol table, but
    it is often stripped from binaries because it is only needed for
    debugging. The heuristic is deliberately simple: if the static table
    has any entries it is returned, otherwise the dynamic table is.

    A bad actor is free to strip arbitrary entries from the static symbol
    table, and that would affect the result of this function.
    """
    static_symbols: Sequence[lief.ELF.Symbol] = binary.static_symbols  # type: ignore
    if len(static_symbols) == 0:
        return binary.dynamic_symbols  # type: ignore
    return static_symbols


def register_virtual_tables(
    connection: apsw.Connection, binaries: list[lief.Binary]
) -> None:
    """Register every ELF virtual table module on the connection.

    After the modules are registered, the instructions and symbols tables
    are copied into temporary tables (``elf_instructions``, ``elf_symbols``)
    from their ``raw_`` virtual counterparts, and ``elf_symbols`` is indexed
    on ``path`` and ``name``.
    """
    # Module name -> factory; dict insertion order fixes registration order.
    module_factories: dict[str, Callable[[list[lief.Binary]], Generator]] = {
        "elf_dynamic_entries": make_dynamic_entries_generator,
        "elf_headers": make_headers_generator,
        "raw_elf_instructions": make_instructions_generator,
        "elf_sections": make_sections_generator,
        "elf_strings": make_strings_generator,
        "raw_elf_symbols": make_symbols_generator,
    }
    for module_name, make_rows in module_factories.items():
        apsw.ext.make_virtual_module(connection, module_name, make_rows(binaries))

    # The SQL text is kept flush-left so the statement string is unchanged.
    connection.execute(
        """
CREATE TEMP TABLE elf_instructions
AS SELECT * FROM raw_elf_instructions;
CREATE TEMP TABLE elf_symbols
AS SELECT * FROM raw_elf_symbols;
CREATE INDEX elf_symbols_path_idx ON elf_symbols (path);
CREATE INDEX elf_symbols_name_idx ON elf_symbols (name);
"""
    )
Empty file removed sqlelf/elf/__init__.py
Empty file.
Loading

0 comments on commit fde5e41

Please sign in to comment.