Skip to content

Commit

Permalink
Make the tables that are registered configurable
Browse files Browse the repository at this point in the history
Make the list of tables configurable in the API.
Some of the tables, such as instructions or symbols, are quite expensive
to generate.

Allow the users to configure which tables to generate when they create a
sql_engine.
  • Loading branch information
fzakaria committed Sep 26, 2023
1 parent 5a1264d commit 1a09e70
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 70 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ lint: ## Run pep8, black, mypy linters.
.PHONY: test
test: ## Run pytest primarily.
pytest
pytest -m "slow"

.PHONY: coverage
coverage:
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ skip = [".git", "result"]
profile = "black"

[tool.pytest.ini_options]
addopts = ""
addopts = "-m 'not slow' --strict-markers"
markers = ["slow: marks tests as slow (deselect with '-m \"not slow\"')"]

[tool.pyright]
exclude = ["**/__pycache__", "sqlelf/_version.py"]
Expand Down
216 changes: 149 additions & 67 deletions sqlelf/elf.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Iterator, Sequence, cast
from enum import Flag, auto
from typing import Any, Callable, Iterator, Optional, Sequence, cast

import apsw
import apsw.ext
Expand Down Expand Up @@ -35,7 +36,40 @@ def make_generator(
return Generator(columns, apsw.ext.VTColumnAccess.By_Name, generator)


def make_dynamic_entries_generator(binaries: list[lief.Binary]) -> Generator:
class GeneratorFlag(Flag):
    """Bit flags naming the virtual tables that can be registered.

    Each member other than ``NONE`` corresponds to one generator factory;
    members can be combined with ``|`` and tested with ``in`` to select
    which tables ``register_virtual_tables`` enables.
    """

    # The empty selection: no table is enabled.
    NONE = 0
    DYNAMIC_ENTRIES = auto()
    HEADERS = auto()
    INSTRUCTIONS = auto()
    SECTIONS = auto()
    SYMBOLS = auto()
    STRINGS = auto()
    VERSION_REQUIREMENTS = auto()
    VERSION_DEFINITIONS = auto()

    @classmethod
    def ALL(cls: type[GeneratorFlag]) -> GeneratorFlag:
        """Return the union of every declared flag (all tables enabled)."""
        retval = cls.NONE
        # OR-ing in NONE is a no-op, so folding every member is safe.
        for member in cls.__members__.values():
            retval |= member
        return retval


@dataclass
class MakeGeneratorResponse:
    """A response from a generator factory.

    Contains everything needed to register the virtual table.
    """

    # The generator object handed to apsw.ext.make_virtual_module.
    generator: Generator
    # Name under which the virtual table module is registered.
    table_name: str
    # The flag that must be present in the caller's selection for this
    # table to be registered.
    flag: GeneratorFlag
    # Optional SQL executed on the connection right after registration;
    # None (the default) means nothing extra is run.
    sql: Optional[str] = None


def make_dynamic_entries_generator(
binaries: list[lief.Binary],
) -> MakeGeneratorResponse:
"""Create the .dynamic section virtual table."""

def dynamic_entries_generator() -> Iterator[dict[str, Any]]:
Expand All @@ -46,13 +80,17 @@ def dynamic_entries_generator() -> Iterator[dict[str, Any]]:
for entry in binary.dynamic_entries: # type: ignore
yield {"path": binary_name, "tag": entry.tag.name, "value": entry.value}

return Generator.make_generator(
["path", "tag", "value"],
dynamic_entries_generator,
return MakeGeneratorResponse(
Generator.make_generator(
["path", "tag", "value"],
dynamic_entries_generator,
),
"elf_dynamic_entries",
GeneratorFlag.DYNAMIC_ENTRIES,
)


def make_headers_generator(binaries: list[lief.Binary]) -> Generator:
def make_headers_generator(binaries: list[lief.Binary]) -> MakeGeneratorResponse:
"""Create the ELF headers virtual table."""

def headers_generator() -> Iterator[dict[str, Any]]:
Expand All @@ -65,13 +103,17 @@ def headers_generator() -> Iterator[dict[str, Any]]:
"entry": binary.header.entrypoint,
}

return Generator.make_generator(
["path", "type", "machine", "version", "entry"],
headers_generator,
return MakeGeneratorResponse(
Generator.make_generator(
["path", "type", "machine", "version", "entry"],
headers_generator,
),
"elf_headers",
GeneratorFlag.HEADERS,
)


def make_instructions_generator(binaries: list[lief.Binary]) -> Generator:
def make_instructions_generator(binaries: list[lief.Binary]) -> MakeGeneratorResponse:
"""Create the instructions virtual table.
This table includes disassembled instructions from the executable sections"""
Expand Down Expand Up @@ -105,9 +147,15 @@ def instructions_generator() -> Iterator[dict[str, Any]]:
"operands": op_str,
}

return Generator.make_generator(
["path", "section", "mnemonic", "address", "operands"],
instructions_generator,
return MakeGeneratorResponse(
Generator.make_generator(
["path", "section", "mnemonic", "address", "operands"],
instructions_generator,
),
"raw_elf_instructions",
GeneratorFlag.INSTRUCTIONS,
"""CREATE TEMP TABLE elf_instructions
AS SELECT * FROM raw_elf_instructions;""",
)


Expand All @@ -123,7 +171,7 @@ def arch(binary: lief.Binary) -> int:
raise RuntimeError(f"Unknown machine type for {binary.name}")


def make_sections_generator(binaries: list[lief.Binary]) -> Generator:
def make_sections_generator(binaries: list[lief.Binary]) -> MakeGeneratorResponse:
"""Create the ELF sections virtual table."""

def sections_generator() -> Iterator[dict[str, Any]]:
Expand All @@ -141,9 +189,13 @@ def sections_generator() -> Iterator[dict[str, Any]]:
"content": bytes(section.content),
}

return Generator.make_generator(
["path", "name", "offset", "size", "type", "content"],
sections_generator,
return MakeGeneratorResponse(
Generator.make_generator(
["path", "name", "offset", "size", "type", "content"],
sections_generator,
),
"elf_sections",
GeneratorFlag.SECTIONS,
)


Expand All @@ -154,7 +206,7 @@ def coerce_section_name(name: str | None) -> str | None:
return name


def make_strings_generator(binaries: list[lief.Binary]) -> Generator:
def make_strings_generator(binaries: list[lief.Binary]) -> MakeGeneratorResponse:
"""Create the ELF strings virtual table.
This goes through all string tables in the ELF binary and splits them on null bytes.
Expand Down Expand Up @@ -188,9 +240,13 @@ def strings_generator() -> Iterator[dict[str, Any]]:
"offset": offset + 1,
}

return Generator.make_generator(
["path", "section", "value", "offset"],
strings_generator,
return MakeGeneratorResponse(
Generator.make_generator(
["path", "section", "value", "offset"],
strings_generator,
),
"elf_strings",
GeneratorFlag.STRINGS,
)


Expand All @@ -207,7 +263,7 @@ def split_with_index(str: str, delimiter: str) -> list[tuple[int, str]]:
return result


def make_symbols_generator(binaries: list[lief.Binary]) -> Generator:
def make_symbols_generator(binaries: list[lief.Binary]) -> MakeGeneratorResponse:
"""Create the ELF symbols virtual table."""

def symbols_generator() -> Iterator[dict[str, Any]]:
Expand Down Expand Up @@ -254,24 +310,32 @@ def symbols_generator() -> Iterator[dict[str, Any]]:
"value": symbol.value,
}

return Generator.make_generator(
[
"path",
"name",
"demangled_name",
"imported",
"exported",
"section",
"size",
"version",
"type",
"value",
],
symbols_generator,
return MakeGeneratorResponse(
Generator.make_generator(
[
"path",
"name",
"demangled_name",
"imported",
"exported",
"section",
"size",
"version",
"type",
"value",
],
symbols_generator,
),
"raw_elf_symbols",
GeneratorFlag.SYMBOLS,
"""CREATE TEMP TABLE elf_symbols
AS SELECT * FROM raw_elf_symbols;
CREATE INDEX elf_symbols_path_idx ON elf_symbols (path);
CREATE INDEX elf_symbols_name_idx ON elf_symbols (name);""",
)


def make_version_requirements(binaries: list[lief.Binary]) -> Generator:
def make_version_requirements(binaries: list[lief.Binary]) -> MakeGeneratorResponse:
"""Create the ELF version requirements virtual table.
This should match the values found in .gnu.version_r section.
Expand All @@ -292,12 +356,17 @@ def version_requirements_generator() -> Iterator[dict[str, Any]]:
"name": aux_requirement.name,
}

return Generator.make_generator(
["path", "file", "name"], version_requirements_generator
return MakeGeneratorResponse(
Generator.make_generator(
["path", "file", "name"],
version_requirements_generator,
),
"elf_version_requirements",
GeneratorFlag.VERSION_REQUIREMENTS,
)


def make_version_definitions(binaries: list[lief.Binary]) -> Generator:
def make_version_definitions(binaries: list[lief.Binary]) -> MakeGeneratorResponse:
"""Create the ELF version definitions virtual table.
This should match the values found in .gnu.version_d section.
Expand All @@ -318,8 +387,13 @@ def version_definitions_generator() -> Iterator[dict[str, Any]]:
"flags": flags,
}

return Generator.make_generator(
["path", "name", "flags"], version_definitions_generator
return MakeGeneratorResponse(
Generator.make_generator(
["path", "name", "flags"],
version_definitions_generator,
),
"elf_version_definitions",
GeneratorFlag.VERSION_DEFINITIONS,
)


Expand All @@ -343,30 +417,38 @@ def symbols(binary: lief.Binary) -> Sequence[lief.ELF.Symbol]:


def register_virtual_tables(
connection: apsw.Connection, binaries: list[lief.Binary]
connection: apsw.Connection,
binaries: list[lief.Binary],
flags: GeneratorFlag = GeneratorFlag.ALL(),
) -> None:
"""Register the virtual table modules."""
factory_and_names = [
(make_dynamic_entries_generator, "elf_dynamic_entries"),
(make_headers_generator, "elf_headers"),
(make_instructions_generator, "raw_elf_instructions"),
(make_sections_generator, "elf_sections"),
(make_strings_generator, "elf_strings"),
(make_symbols_generator, "raw_elf_symbols"),
(make_version_requirements, "elf_version_requirements"),
(make_version_definitions, "elf_version_definitions"),
"""Register the virtual table modules.
You can make the SQL engine more speedy by only specifying the
Generators (virtual tables) that you care about via the flags argument.
Args:
connection: the connection to register the virtual tables on
binaries: the list of binaries to analyze
flags: the bitwise flags which controls which virtual table to enable"""
generator_factories = [
make_dynamic_entries_generator,
make_headers_generator,
make_instructions_generator,
make_sections_generator,
make_strings_generator,
make_symbols_generator,
make_version_requirements,
make_version_definitions,
]
for factory, name in factory_and_names:
generator = factory(binaries)
apsw.ext.make_virtual_module(connection, name, generator)
connection.execute(
"""
CREATE TEMP TABLE elf_instructions
AS SELECT * FROM raw_elf_instructions;
CREATE TEMP TABLE elf_symbols
AS SELECT * FROM raw_elf_symbols;
CREATE INDEX elf_symbols_path_idx ON elf_symbols (path);
CREATE INDEX elf_symbols_name_idx ON elf_symbols (name);
"""
)
for factory in generator_factories:
generator_response = factory(binaries)

if generator_response.flag not in flags:
continue

apsw.ext.make_virtual_module(
connection, generator_response.table_name, generator_response.generator
)

if generator_response.sql:
connection.execute(generator_response.sql)
14 changes: 12 additions & 2 deletions sqlelf/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,23 @@ def find_libraries(binary: lief.Binary) -> Dict[str, str]:
return result


def make_sql_engine(filenames: list[str], recursive: bool = False) -> SQLEngine:
def make_sql_engine(
filenames: list[str],
recursive: bool = False,
flags: elf.GeneratorFlag = elf.GeneratorFlag.ALL(),
) -> SQLEngine:
"""Create a SQL engine from a list of binaries
You can make the SQL engine more speedy by only specifying the
Generators (virtual tables) that you care about via the flags argument.
The INSTRUCTIONS and SYMBOLS tables are typically quite expensive to generate,
so omit them if they are not needed.
Args:
filenames: the list of binaries to analyze -- should be absolute path
recursive: whether to recursively load all shared
libraries needed by each binary
flags: the flags to use when generating the virtual tables
"""
binaries: list[lief.Binary] = [
lief.parse(filename) for filename in filenames if lief.is_elf(filename)
Expand All @@ -88,5 +98,5 @@ def make_sql_engine(filenames: list[str], recursive: bool = False) -> SQLEngine:
)
binaries = binaries + [lief.parse(library) for library in shared_libraries_set]

elf.register_virtual_tables(connection, binaries)
elf.register_virtual_tables(connection, binaries, flags)
return SQLEngine(connection)
Loading

0 comments on commit 1a09e70

Please sign in to comment.