Skip to content

Commit

Permalink
Simplify the code
Browse files Browse the repository at this point in the history
  • Loading branch information
fzakaria committed Sep 22, 2023
1 parent c251cf9 commit 985d4a5
Show file tree
Hide file tree
Showing 16 changed files with 364 additions and 385 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ lint: ## Run pep8, black, mypy linters.
flake8 sqlelf/
black --check sqlelf/
pyright
mypy --strict --install-types --non-interactive sqlelf tests

.PHONY: test
test: ## Run pytest primarily.
Expand Down
16 changes: 15 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,35 @@ readme = "README.md"
description = "Explore ELF objects through the power of SQL"
license = { file = "LICENSE" }
requires-python = ">=3.10,<4.0"

keywords = []
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"capstone >= 5.0.1",
"lief >=0.13.2",
"apsw >= 3.43.1.0",
"sh >= 2.0.6",
]

[project.urls]
Documentation = "https://github.com/fzakaria/sqlelf#readme"
Issues = "https://github.com/fzakaria/sqlelf/issues"
Source = "https://github.com/fzakaria/sqlelf"

[project.optional-dependencies]
dev = [
"black >= 23.7.0",
"isort >= 5.12.0",
"flake8 >= 6.1.0",
"pyright >= 1.1.325",
"pytest >= 7.4.0",
"mypy >= 1.0.0",
]

[tool.setuptools]
Expand Down
11 changes: 10 additions & 1 deletion sqlelf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@
import lief

from sqlelf import sql as api_sql
from typing import TextIO
from dataclasses import dataclass


def start(args=sys.argv[1:], stdin=sys.stdin):
@dataclass
class ProgramArguments:
filenames: list[str]
sql: list[str]
recursive: bool = False


def start(args: list[str] = sys.argv[1:], stdin: TextIO = sys.stdin):
"""
Start the main CLI
Expand Down
264 changes: 264 additions & 0 deletions sqlelf/elf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
from dataclasses import dataclass
from typing import Any, Callable, Iterator, Sequence, cast

import apsw
import apsw.ext
import capstone # type: ignore
import lief


@dataclass
class Generator:
"""A generator for the virtual table SQLite module."""

columns: Sequence[str]
column_access: apsw.ext.VTColumnAccess
callable: Callable[[], Iterator[dict[str, Any]]]

def __call__(self) -> Iterator[dict[str, Any]]:
"""Call the generator should return an iterator of dictionaries.
The dictionaries should have keys that match the column names."""
return self.callable()


def make_dynamic_entries_generator(binaries: list[lief.Binary]) -> Generator:
"""Create the .dynamic section virtual table."""

def _generator() -> Iterator[dict[str, Any]]:
for binary in binaries:
# super important that these accessors are pulled out of the tight loop
# as they can be costly
binary_name = binary.name
for entry in binary.dynamic_entries: # type: ignore
yield {"path": binary_name, "tag": entry.tag.name, "value": entry.value}

columns, column_access = apsw.ext.get_column_names(next(_generator()))
return Generator(columns, column_access, _generator)


def make_headers_generator(binaries: list[lief.Binary]) -> Generator:
"""Create the ELF headers virtual table,"""

def _generator() -> Iterator[dict[str, Any]]:
for binary in binaries:
yield {
"path": binary.name,
"type": binary.header.file_type.name,
"machine": binary.header.machine_type.name,
"version": binary.header.identity_version.name,
"entry": binary.header.entrypoint,
}

columns, column_access = apsw.ext.get_column_names(next(_generator()))
return Generator(columns, column_access, _generator)


def make_instructions_generator(binaries: list[lief.Binary]) -> Generator:
"""Create the instructions virtual table.
This table includes dissasembled instructions from the executable sections"""

def _generator() -> Iterator[dict[str, Any]]:
for binary in binaries:
# super important that these accessors are pulled out of the tight loop
# as they can be costly
binary_name = binary.name

for section in binary.sections:
if section.has(lief.ELF.SECTION_FLAGS.EXECINSTR):
data = bytes(section.content)
md = capstone.Cs(arch(binary), mode(binary))
# keep in mind that producing details costs more memory,
# complicates the internal operations and slows down
# the engine a bit, so only do that if needed.
md.detail = False

# super important that these accessors are pulled out
# of the tight loop as they can be costly
section_name = section.name
for address, size, mnemonic, op_str in md.disasm_lite(
data, section.virtual_address
):
yield {
"path": binary_name,
"section": section_name,
"mnemonic": mnemonic,
"address": address,
"operands": op_str,
}

columns, column_access = apsw.ext.get_column_names(next(_generator()))
return Generator(columns, column_access, _generator)


def mode(binary: lief.Binary) -> int:
if binary.header.identity_class == lief.ELF.ELF_CLASS.CLASS64:
return cast(int, capstone.CS_MODE_64)
raise Exception(f"Unknown mode for {binary.name}")


def arch(binary: lief.Binary) -> int:
if binary.header.machine_type == lief.ELF.ARCH.x86_64:
return cast(int, capstone.CS_ARCH_X86)
raise Exception(f"Unknown machine type for {binary.name}")


def make_sections_generator(binaries: list[lief.Binary]) -> Generator:
"""Create the ELF sections virtual table."""

def _generator() -> Iterator[dict[str, Any]]:
for binary in binaries:
# super important that these accessors are pulled out of the tight loop
# as they can be costly
binary_name = binary.name
for section in binary.sections:
yield {
"path": binary_name,
"name": section.name,
"offset": section.offset,
"size": section.size,
"type": section.type.name,
"content": bytes(section.content),
}

columns, column_access = apsw.ext.get_column_names(next(_generator()))
return Generator(columns, column_access, _generator)


def coerce_section_name(name: str | None) -> str | None:
"""Return a section name or undefined if the name is empty."""
if name == "":
return "undefined"
return name


def make_strings_generator(binaries: list[lief.Binary]) -> Generator:
"""Create the ELF strings virtual table.
This goes through all string tables in the ELF binary and splits them on null bytes.
"""

def _generator() -> Iterator[dict[str, Any]]:
for binary in binaries:
strtabs = [
section
for section in binary.sections
if section.type == lief.ELF.SECTION_TYPES.STRTAB
]
# super important that these accessors are pulled out of the tight loop
# as they can be costly
binary_name = binary.name
for strtab in strtabs:
# The first byte is always the null byte in the STRTAB
# Python also treats the final null in the string by creating
# an empty item so we chop it off.
# https://stackoverflow.com/a/18970869
for string in str(strtab.content[1:-1], "utf-8").split("\x00"):
yield {"path": binary_name, "section": strtab.name, "value": string}

columns, column_access = apsw.ext.get_column_names(next(_generator()))
return Generator(columns, column_access, _generator)


def make_symbols_generator(binaries: list[lief.Binary]) -> Generator:
"""Create the ELF symbols virtual table."""

def _generator() -> Iterator[dict[str, Any]]:
for binary in binaries:
# super important that these accessors are pulled out of the tight loop
# as they can be costly
binary_name = binary.name
for symbol in symbols(binary):
# The section index can be special numbers like 65521 or 65522
# that refer to special sections so they can't be indexed
section_name: str | None = next(
(
section.name
for shndx, section in enumerate(binary.sections)
if shndx == symbol.shndx
),
None,
)

yield {
"path": binary_name,
"name": symbol.name,
"demangled_name": symbol.demangled_name,
# A bit of detailed explanation here to explain these values.
# A symbol may point to the SHN_UNDEF section which is a good it's
# an "imported symbol" -- meaning it needs to be linked in.
# If the section is != SH_UNDEF then it is "exported" as it's
# logic resides within this shared object file.
# refs:
# https://github.com/lief-project/LIEF/blob/0875ee2467d5ae6628d8bf3f4f0b82ca5854c401/src/ELF/Symbol.cpp#L90
# https://stackoverflow.com/questions/12666253/elf-imports-and-exports
# https://www.m4b.io/elf/export/binary/analysis/2015/05/25/what-is-an-elf-export.html
"imported": symbol.imported,
"exported": symbol.exported,
"section": coerce_section_name(section_name),
"size": symbol.size,
# TODO(fzakaria): Better understand why is it auxiliary?
# this returns versions like GLIBC_2.2.5
"version": symbol.symbol_version.symbol_version_auxiliary.name
if symbol.symbol_version
and symbol.symbol_version.symbol_version_auxiliary
else None,
"type": symbol.type.name,
"value": symbol.value,
}

columns, column_access = apsw.ext.get_column_names(next(_generator()))
return Generator(columns, column_access, _generator)


def symbols(binary: lief.Binary) -> Sequence[lief.ELF.Symbol]:
"""Use heuristic to either get static symbols or dynamic symbol table
The static symbol table is a superset of the dynamic symbol table.
However it is often stripped from binaries as it's not needed beyond
debugging.
This method uses the simplest heuristic of checking for it's existence
to return the static symbol table.
A bad actor is free to strip arbitrarily from the static symbol table
and it would affect this method.
"""
static_symbols: Sequence[lief.ELF.Symbol] = binary.static_symbols # type: ignore
if len(static_symbols) > 0:
return static_symbols
return binary.dynamic_symbols # type: ignore


def register_virtual_tables(
connection: apsw.Connection, binaries: list[lief.Binary]
) -> None:
"""Register the virtual table modules."""
factory_and_names = [
(make_dynamic_entries_generator, "elf_dynamic_entries"),
(make_headers_generator, "elf_headers"),
(make_instructions_generator, "raw_elf_instructions"),
(make_sections_generator, "elf_sections"),
(make_strings_generator, "elf_strings"),
(make_symbols_generator, "raw_elf_symbols"),
]
for factory, name in factory_and_names:
print(name)
generator = factory(binaries)
# setup columns and access by providing an example of the first entry returned
generator.columns, generator.column_access = apsw.ext.get_column_names(
next(generator())
)
apsw.ext.make_virtual_module(connection, name, generator)
connection.execute(
"""
CREATE TEMP TABLE elf_instructions
AS SELECT * FROM raw_elf_instructions;
CREATE TEMP TABLE elf_symbols
AS SELECT * FROM raw_elf_symbols;
CREATE INDEX elf_symbols_path_idx ON elf_symbols (path);
CREATE INDEX elf_symbols_name_idx ON elf_symbols (name);
"""
)
Empty file removed sqlelf/elf/__init__.py
Empty file.
31 changes: 0 additions & 31 deletions sqlelf/elf/dynamic.py

This file was deleted.

28 changes: 0 additions & 28 deletions sqlelf/elf/header.py

This file was deleted.

Loading

0 comments on commit 985d4a5

Please sign in to comment.