init publish
							parent
							
								
									2faca77f11
								
							
						
					
					
						commit
						c4bde9632a
					
				| 
						 | 
				
			
			@ -0,0 +1,13 @@
 | 
			
		|||
FROM ubuntu:22.04
 | 
			
		||||
ENV DEBIAN_FRONTEND noninteractive
 | 
			
		||||
 | 
			
		||||
RUN apt-get update && apt-get install --yes \
 | 
			
		||||
    python3-capstone \
 | 
			
		||||
    python3-setuptools \
 | 
			
		||||
    python3-sqlalchemy \
 | 
			
		||||
    && apt-get clean
 | 
			
		||||
 | 
			
		||||
COPY setup.py /app/
 | 
			
		||||
COPY subdisassem /app/subdisassem/
 | 
			
		||||
WORKDIR /app/
 | 
			
		||||
RUN python3 setup.py install
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,3 @@
 | 
			
		|||
# subdisassem
 | 
			
		||||
 | 
			
		||||
- [capstone python bindings](https://github.com/capstone-engine/capstone/tree/master/bindings/python)
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,11 @@
 | 
			
		|||
version: "3"
 | 
			
		||||
 | 
			
		||||
services:
 | 
			
		||||
  subdisassem:
 | 
			
		||||
    image: subdisassem
 | 
			
		||||
    build:
 | 
			
		||||
      context: .
 | 
			
		||||
    volumes:
 | 
			
		||||
      - ./firmware:/firmware
 | 
			
		||||
    working_dir: /firmware
 | 
			
		||||
    command: subdisassem -b firmware.bin
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,17 @@
 | 
			
		|||
from setuptools import setup
 | 
			
		||||
 | 
			
		||||
setup(
 | 
			
		||||
    name="subdisassem",
 | 
			
		||||
    version="0.0.1",
 | 
			
		||||
    packages=["subdisassem"],
 | 
			
		||||
    entry_points={
 | 
			
		||||
        "console_scripts": [
 | 
			
		||||
            "subdisassem = subdisassem:subdisassem_script",
 | 
			
		||||
        ],
 | 
			
		||||
    },
 | 
			
		||||
    python_requires=">3",
 | 
			
		||||
    install_requires=[
 | 
			
		||||
        "capstone",
 | 
			
		||||
        "SQLAlchemy",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1 @@
 | 
			
		|||
from .scripts import subdisassem_script
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,133 @@
 | 
			
		|||
from capstone import Cs
 | 
			
		||||
from capstone import (
 | 
			
		||||
    CS_ARCH_ARM,
 | 
			
		||||
    CS_ARCH_ARM64,
 | 
			
		||||
    CS_ARCH_MIPS,
 | 
			
		||||
    CS_ARCH_PPC,
 | 
			
		||||
    CS_ARCH_SPARC,
 | 
			
		||||
    CS_ARCH_SYSZ,
 | 
			
		||||
    CS_ARCH_X86,
 | 
			
		||||
    CS_ARCH_XCORE,
 | 
			
		||||
)
 | 
			
		||||
from capstone import (
 | 
			
		||||
    CS_MODE_16,
 | 
			
		||||
    CS_MODE_32,
 | 
			
		||||
    CS_MODE_64,
 | 
			
		||||
    CS_MODE_ARM,
 | 
			
		||||
    CS_MODE_BIG_ENDIAN,
 | 
			
		||||
    CS_MODE_LITTLE_ENDIAN,
 | 
			
		||||
    CS_MODE_MCLASS,
 | 
			
		||||
    CS_MODE_MICRO,
 | 
			
		||||
    CS_MODE_MIPS3,
 | 
			
		||||
    CS_MODE_MIPS32,
 | 
			
		||||
    CS_MODE_MIPS32R6,
 | 
			
		||||
    CS_MODE_MIPS64,
 | 
			
		||||
    CS_MODE_THUMB,
 | 
			
		||||
    CS_MODE_V8,
 | 
			
		||||
    CS_MODE_V9,
 | 
			
		||||
)
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class _CapstoneBase:
 | 
			
		||||
    def __init__(self, payload: bytes, offset: int = 0):
 | 
			
		||||
        self.disassembly = list()
 | 
			
		||||
 | 
			
		||||
        for opcode in self.capstone.disasm(payload, offset):
 | 
			
		||||
            self.disassembly.append(opcode)
 | 
			
		||||
 | 
			
		||||
    def __repr__(self) -> str:
 | 
			
		||||
        return self.objdump
 | 
			
		||||
 | 
			
		||||
    def __len__(self) -> int:
 | 
			
		||||
        return len(self.disassembly)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def objdump(self) -> str:
 | 
			
		||||
        opcodes = str()
 | 
			
		||||
 | 
			
		||||
        for opcode in self.disassembly:
 | 
			
		||||
            opcodes += f"{opcode.address:#02x}:\t{opcode.mnemonic}\t{opcode.op_str}\n"
 | 
			
		||||
 | 
			
		||||
        return opcodes
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def disasm(self) -> list:
 | 
			
		||||
        opcodes = list()
 | 
			
		||||
 | 
			
		||||
        for opcode in self.disassembly:
 | 
			
		||||
            opcodes.append(
 | 
			
		||||
                [
 | 
			
		||||
                    opcode.address,
 | 
			
		||||
                    opcode.mnemonic,
 | 
			
		||||
                    opcode.op_str,
 | 
			
		||||
                    opcode.size,
 | 
			
		||||
                ]
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        return opcodes
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class X86_intel(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_X86, CS_MODE_16)
 | 
			
		||||
    arch = "x86-16"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class X86(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_X86, CS_MODE_32)
 | 
			
		||||
    arch = "x86-32"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class X86_64(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_X86, CS_MODE_64)
 | 
			
		||||
    arch = "x86-64"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ARM(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_ARM, CS_MODE_ARM)
 | 
			
		||||
    arch = "ARM"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Thumb(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_ARM, CS_MODE_THUMB)
 | 
			
		||||
    arch = "Thumb"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ARM_64(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_ARM64, CS_MODE_ARM)
 | 
			
		||||
    arch = "ARM 64"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MIPS_32_eb(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_MIPS, CS_MODE_MIPS32 + CS_MODE_BIG_ENDIAN)
 | 
			
		||||
    arch = "MIPS-32 (Big-endian)"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MIPS_64_el(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_MIPS, CS_MODE_MIPS64 + CS_MODE_LITTLE_ENDIAN)
 | 
			
		||||
    arch = "MIPS-64-EL (Little-endian)"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PPC_64(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_PPC, CS_MODE_BIG_ENDIAN)
 | 
			
		||||
    arch = "PPC-64"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Sparc(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_SPARC, CS_MODE_BIG_ENDIAN)
 | 
			
		||||
    arch = "Sparc"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SparcV9(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_SPARC, CS_MODE_BIG_ENDIAN + CS_MODE_V9)
 | 
			
		||||
    arch = "SparcV9"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SystemZ(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_SYSZ, 0)
 | 
			
		||||
    arch = "SystemZ"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class XCore(_CapstoneBase):
 | 
			
		||||
    capstone = Cs(CS_ARCH_XCORE, 0)
 | 
			
		||||
    arch = "XCore"
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,44 @@
 | 
			
		|||
from sqlalchemy import create_engine, Column, Integer, String, LargeBinary
 | 
			
		||||
from sqlalchemy.ext.declarative import declarative_base
 | 
			
		||||
from sqlalchemy.orm import Session
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
import json
 | 
			
		||||
 | 
			
		||||
from .disassemble import _CapstoneBase
 | 
			
		||||
 | 
			
		||||
Base = declarative_base()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def db_config(path: Path) -> Session:
 | 
			
		||||
    engine = create_engine(f"sqlite:///{path.resolve()}", native_datetime=True)
 | 
			
		||||
    Base.metadata.create_all(engine)
 | 
			
		||||
    session = Session(engine)
 | 
			
		||||
    return session
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Disassembly(Base):
 | 
			
		||||
    __tablename__ = "Disassembly"
 | 
			
		||||
    id = Column(Integer, primary_key=True)
 | 
			
		||||
    arch = Column(String, nullable=False)
 | 
			
		||||
    checksum = Column(String, nullable=False)
 | 
			
		||||
    count = Column(Integer, nullable=False)
 | 
			
		||||
    size = Column(Integer, nullable=False)
 | 
			
		||||
    offset = Column(Integer, nullable=False)
 | 
			
		||||
    opcodes = Column(String, nullable=False)
 | 
			
		||||
    path = Column(String, nullable=False)
 | 
			
		||||
 | 
			
		||||
    def __repr__(self):
 | 
			
		||||
        return f"<Disassembly {json.dumps(self.values, indent=1)}>"
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def values(self) -> dict:
 | 
			
		||||
        values_dict = {
 | 
			
		||||
            "arch": self.arch,
 | 
			
		||||
            "checksum": self.checksum,
 | 
			
		||||
            "count": self.count,
 | 
			
		||||
            "size": self.size,
 | 
			
		||||
            "offset": self.offset,
 | 
			
		||||
            "path": self.path,
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return values_dict
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,124 @@
 | 
			
		|||
from argparse import ArgumentParser
 | 
			
		||||
from hashlib import sha1
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from sqlalchemy import desc
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
from .disassemble import (
 | 
			
		||||
    X86_intel,
 | 
			
		||||
    X86,
 | 
			
		||||
    X86_64,
 | 
			
		||||
    ARM,
 | 
			
		||||
    Thumb,
 | 
			
		||||
    ARM_64,
 | 
			
		||||
    MIPS_32_eb,
 | 
			
		||||
    MIPS_64_el,
 | 
			
		||||
    PPC_64,
 | 
			
		||||
    Sparc,
 | 
			
		||||
    SparcV9,
 | 
			
		||||
    SystemZ,
 | 
			
		||||
    XCore,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from .schema import db_config, Disassembly
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def subdisassem_script():
 | 
			
		||||
    parser = ArgumentParser(description="")
 | 
			
		||||
    parser.add_argument("-v", "--verbose", action="count", help="verbose logging")
 | 
			
		||||
    parser.add_argument("-b", "--bin-path", required=True)
 | 
			
		||||
    parser.add_argument("-l", "--log", action="store_true", help="log to file")
 | 
			
		||||
    parser.add_argument("-f", "--fuzz", default=64, help="offset bruteforce max")
 | 
			
		||||
    args = parser.parse_args()
 | 
			
		||||
 | 
			
		||||
    args.bin_path = Path(args.bin_path)
 | 
			
		||||
 | 
			
		||||
    if args.verbose:
 | 
			
		||||
        level = logging.DEBUG
 | 
			
		||||
        format = "%(asctime)s %(filename)s:%(lineno)d %(message)s"
 | 
			
		||||
    else:
 | 
			
		||||
        level = logging.INFO
 | 
			
		||||
        format = "%(asctime)s %(message)s"
 | 
			
		||||
 | 
			
		||||
    if args.log:
 | 
			
		||||
        filename = args.bin_path.parent.joinpath(f"{args.bin_path.name}.log")
 | 
			
		||||
 | 
			
		||||
        logging.basicConfig(
 | 
			
		||||
            level=level,
 | 
			
		||||
            format=format,
 | 
			
		||||
            filename=filename,
 | 
			
		||||
        )
 | 
			
		||||
    else:
 | 
			
		||||
        logging.basicConfig(
 | 
			
		||||
            level=level,
 | 
			
		||||
            format=format,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    logging.info(args)
 | 
			
		||||
 | 
			
		||||
    db_path = args.bin_path.parent.joinpath(f"{args.bin_path.name}.sqlite").absolute()
 | 
			
		||||
    session = db_config(db_path)
 | 
			
		||||
    logging.info(f"results sqlite database created at {db_path}")
 | 
			
		||||
 | 
			
		||||
    # reading the whole file into memory until I get an idea for pagnating
 | 
			
		||||
 | 
			
		||||
    with args.bin_path.open("rb") as file_open:
 | 
			
		||||
        raw_bytes = file_open.read()
 | 
			
		||||
 | 
			
		||||
    sha1sum = sha1()
 | 
			
		||||
    sha1sum.update(raw_bytes)
 | 
			
		||||
    checksum = sha1sum.hexdigest()
 | 
			
		||||
 | 
			
		||||
    logging.info(f"sha1sum: {checksum}")
 | 
			
		||||
 | 
			
		||||
    archs = [
 | 
			
		||||
        X86_intel,
 | 
			
		||||
        X86,
 | 
			
		||||
        X86_64,
 | 
			
		||||
        ARM,
 | 
			
		||||
        Thumb,
 | 
			
		||||
        ARM_64,
 | 
			
		||||
        MIPS_32_eb,
 | 
			
		||||
        MIPS_64_el,
 | 
			
		||||
        PPC_64,
 | 
			
		||||
        Sparc,
 | 
			
		||||
        SparcV9,
 | 
			
		||||
        SystemZ,
 | 
			
		||||
        XCore,
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    for arch in archs:
 | 
			
		||||
        for offset in range(args.fuzz):
 | 
			
		||||
            disasembler = arch(payload=raw_bytes, offset=offset)
 | 
			
		||||
            row = Disassembly()
 | 
			
		||||
            row.arch = disasembler.arch
 | 
			
		||||
            row.checksum = checksum
 | 
			
		||||
            row.count = len(disasembler)
 | 
			
		||||
            row.size = len(raw_bytes) - offset
 | 
			
		||||
            row.offset = offset
 | 
			
		||||
            row.opcodes = disasembler.objdump
 | 
			
		||||
            row.path = str(args.bin_path.absolute())
 | 
			
		||||
 | 
			
		||||
            exists = (
 | 
			
		||||
                session.query(Disassembly)
 | 
			
		||||
                .filter(Disassembly.checksum == row.checksum)
 | 
			
		||||
                .filter(Disassembly.offset == row.offset)
 | 
			
		||||
                .filter(Disassembly.arch == row.arch)
 | 
			
		||||
                .first()
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if not exists:
 | 
			
		||||
                session.add(row)
 | 
			
		||||
 | 
			
		||||
    session.commit()
 | 
			
		||||
 | 
			
		||||
    count = session.query(Disassembly).order_by(desc("count")).first()
 | 
			
		||||
    tops = (
 | 
			
		||||
        session.query(Disassembly)
 | 
			
		||||
        .filter(Disassembly.count == count.count)
 | 
			
		||||
        .order_by(desc("size"))
 | 
			
		||||
        .all()
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    for top in tops[:3]:
 | 
			
		||||
        logging.info(top)
 | 
			
		||||
		Loading…
	
		Reference in New Issue