分析

与常见的br跳转不同的是这里使用了2个condition select

jcc跳转

图片描述
先是,根据cmp的结果通过condition select instruction (0x27B78这里是CSEL)得到一个下标,然后是一个存放偏移地址的table(off_1F4340),根据上面得到的下标从table取出地址,再一次根据前面cmp的结果选择一个常量与table里取出的地址相加减运算得到最终的跳转地址。
图片描述

从上面分析可以看到,两个condition select都是使用了同一个cmp结果,也就是说下标和常量是配对的,最终会根据不同结果得到两个不同的地址,这里就是模拟了jcc。

还有一种情况是将上面condition select得到的运算的常量进行了一次或多次加密,在参与运算前会解密这些常量,使用的算法是异或。比如下图是某函数的开始位置,通过memcpy从qword_1980B0复制了一段数据到栈上
图片描述
调用函数完成第一次解密
图片描述
图片描述
间接跳转的时候就会取出第二次解密计算
图片描述
也有一种解密方式是直接把解密函数内联优化了,直接在函数内通过condition branch完成。
图片描述

jmp跳转

另一种br间接跳转是模拟了无条件jmp,但是也是通过table[index] +/- value得到目标地址
图片描述

最后观察其他的br,可以看到每个function都有自己的跳转地址的table,并且在entry block赋值,这一点对后面一些特殊情况需要手动计算时比较有帮助。

反混淆思路

  1. 找出所有的br指令位置,判断br前的几个指令是否符合间接跳转的几个特征。识别br跳转的类型是jcc还是jmp并记录跳转的目标地址,如果是jcc还需要记录br之前第一个set condition的位置、 condition select使用的condition,如EQ、NE。
  2. 从函数entry block开始模拟执行,在set condition也就是cmp后面的位置修改条件命中的状态,修改命中状态之前先保存当前模拟器的完整context(包括寄存器和map的mem),之后分别在两种不同状态继续向下执行,对于条件跳转指令也是保存context后对两个分支分别执行。
  3. 得到所有分支的目标地址后进行patch。大部分跳转部分的代码都是计算好跳转目标后就直接br跳转了,中间的代码不会破坏标志位寄存器,所以可以直接用记录的condition重新patch一个条件跳转。patch实现主要难点在jcc部分,需要处理不同condition跳到不同地址。可以新建一个段专门存放各个分发部分的代码,br那里patch直接跳转到对应分发部分就好,也可以直接操作反编译器的ir完成。剩下的一小部分是上面带有常量解密部分的,先计算好跳转目标,之后去解密了一部分常量,解密过程会破坏标记位寄存器,不过好在只有几个位置是这种情况,直接nop掉解密常量的部分继续使用上面方法就好。

代码实现

提取br信息

def _getBlockByAddr(addr, flowchart):
    bb: BasicBlock
    for bb in flowchart:
        if (addr >= bb.start_ea and addr < bb.end_ea) or addr == bb.start_ea:
            return bb
    return None
 
def process_br(addr):
    function = idaapi.get_func(addr)
    flowchart = list(idaapi.FlowChart(function))
    basic_block = _getBlockByAddr(addr, flowchart)
 
    instruction = idaapi.insn_t()
    idaapi.decode_insn(instruction, addr)
    branch_register = instruction.Op1.reg
 
    instruction_addr = addr
    instruction_count = (basic_block.end_ea - basic_block.start_ea) // 4
    if instruction_count <= 2:  # 一些特殊情况导致basicblock被分开
        while instruction_addr >= basic_block.start_ea and instruction_addr <= basic_block.end_ea:
            instruction_addr = idc.prev_head(instruction_addr, idc.get_inf_attr(idc.INF_MIN_EA))
        basic_block = _getBlockByAddr(instruction_addr, flowchart)
 
    if basic_block is None:
        return None
 
    is_entry_block = False
    for (start_addr, end_addr) in func_range:
        if basic_block.start_ea == start_addr:
            is_entry_block = True
            func_start_br.append((start_addr, end_addr))
            break
 
    instruction_addr = addr
    count = 0
    jcc_phase = 0
    jmp_phase = 0
    is_jcc = False
    is_jmp = False
    condition_instructions = []
    jcc_conditions = []
    jcc_base_register = ""
    jmp_base_register = ""
    calc_used_registers = set()
    cmp_address = 0
 
    '''
    jcc:
        CMP             X8, X9
        MOV             W8, #8
        MOV             W9, #0x90
        CSEL            X8, X9, X8, EQ
        LDR             X8, [X27,X8]
        MOV             W10, #0xB5AF 
        MOV             W11, #0xA4ED
        CSEL            X10, X11, X10, EQ
        SUB             X8, X8, X10 
        BR              X8           
 
    jmp:
        LDR             X8, [X24,#0xe8]
        MOV             X9, #0xFFFFFFFFFFFF7B09
        ADD             X8, X8, X9
        BR              X8
    '''
    while count < 50:
        instruction_addr = idc.prev_head(instruction_addr, idc.get_inf_attr(idc.INF_MIN_EA))
        count += 1
 
        instruction = idaapi.insn_t()
        length = idaapi.decode_insn(instruction, instruction_addr)
 
        mnemonic = idc.print_insn_mnem(instruction_addr).lower()
 
        if mnemonic in ["csel", "csinc", "csinv", "csneg", "cset", "csetm", "cinc", "cinv", "cneg"]:
            condition_instructions.append(GetDisasm(instruction_addr))
            jcc_conditions.append(GetDisasm(instruction_addr).split(", ")[-1])
            if jcc_phase == 1:
                jcc_phase += 1
            elif jcc_phase == 3:
                is_jcc = True
 
        elif mnemonic == "ldr":
            if instruction.Op1.type == o_reg and instruction.Op1.reg in calc_used_registers:
                if jcc_phase == 2:
                    jcc_phase += 1
                    jcc_base_register = reg_name[instruction.Op2.phrase]
                if jmp_phase == 2:
                    is_jmp = True
                    jmp_base_register = reg_name[instruction.Op2.phrase]
 
        elif mnemonic in ["br", "blr", "bl", "b", "ret"]:
            break
 
        elif mnemonic in ["add", "sub"]:
            if instruction.Op1.type == instruction.Op2.type == o_reg and instruction.Op1.reg == branch_register and instruction.Op3.type in [o_reg, o_idpspec0]:
                if jmp_phase == 0:
                    jmp_phase += 1
                    jmp_calc_used_register = instruction.Op3.reg
                    calc_used_registers.add(instruction.Op1.reg)
                    calc_used_registers.add(instruction.Op2.reg)
                if jcc_phase == 0:
                    jcc_phase += 1
                    calc_used_registers.add(instruction.Op1.reg)
                    calc_used_registers.add(instruction.Op2.reg)
 
        elif mnemonic == "mov":
            if jmp_phase == 1 and instruction.Op2.type == o_imm and instruction.Op1.type == o_reg and instruction.Op1.reg == jmp_calc_used_register:
                jmp_phase += 1
 
        elif spoils_flags(instruction):
            if is_jcc and cmp_address == 0:
                cmp_address = instruction_addr
 
    if is_jcc:
        if jcc_conditions[0] == jcc_conditions[1]:
            print(f"jcc br    addr: {hex(addr)}    is_entry_block: {is_entry_block}  base_reg: {jcc_base_register}  cmp addr: {hex(cmp_address)}")
        else: # 特殊情况,两个condtion select使用的条件不同
            print(f"jcc br    addr: {hex(addr)}    is_entry_block: {is_entry_block}  base_reg: {jcc_base_register}  condition not valid!  condition: {jcc_conditions}  condition_instructions: {condition_instructions}")
    elif is_jmp:
        print(f"jmp br    addr: {hex(addr)}    is_entry_block: {is_entry_block}  base_reg: {jmp_base_register}")
    else:
        print(f"Invalid br addr: {hex(addr)}")

模拟执行

class BranchInstruction:
    def __init__(self, address, branch_type, compare_address=None, condition=None, function_index=None):
        self.address = address
        self.branch_type = branch_type
        self.compare_address = compare_address
        self.condition = condition
        self.taken_branch_address = None
        self.not_taken_branch_address = None
        self.function_index = function_index
 
branch_info = dict()
 
branch_data = [
    {"branch_address": 0x27b9c, "compare_address": 0x27b70, "condition": "EQ", "branch_type": "jcc"},
    {"branch_address": 0x27be4, "branch_type": "jmp"},
    # ... 其他数据保持不变
]
 
breakpoint_addresses_branch = []
breakpoint_addresses_compare = []
all_conditions = set()
 
compare_address_lookup = dict()
function_ranges = [(162512, 164428), (182320, 185084), (220224, 220252), (294424, 294440), (300112, 300140),
                   (308092, 309132), (368928, 368976), ..., (0, 0)]
 
                       
def set_condition_flags(uc, condition, condition_hit=True):
    nzcv = uc.reg_read(arm64_const.UC_ARM64_REG_NZCV)
 
    # Helper functions to set or clear specific bits in NZCV
    def set_bit(value, bit):
        return value | (1 << bit)
 
    def clear_bit(value, bit):
        return value & ~(1 << bit)
 
    # N: Negative, Z: Zero, C: Carry, V: Overflow
    N_BIT = 31
    Z_BIT = 30
    C_BIT = 29
    V_BIT = 28
 
    if condition == "EQ":  # Equal (Z set)
        if condition_hit:
            nzcv = set_bit(nzcv, Z_BIT)
        else:
            nzcv = clear_bit(nzcv, Z_BIT)
    elif condition == "NE":  # Not Equal (Z clear)
        if condition_hit:
            nzcv = clear_bit(nzcv, Z_BIT)
        else:
            nzcv = set_bit(nzcv, Z_BIT)
    elif condition == "GT":  # Greater Than (Z clear and N == V)
        if condition_hit:
            nzcv = clear_bit(nzcv, Z_BIT)
            nzcv = set_bit(nzcv, N_BIT) if (nzcv >> V_BIT) & 1 else clear_bit(nzcv, N_BIT)
        else:
            nzcv = set_bit(nzcv, Z_BIT)
    elif condition == "LT":  # Less Than (N != V)
        if condition_hit:
            nzcv = set_bit(nzcv, N_BIT) if not (nzcv >> V_BIT) & 1 else clear_bit(nzcv, N_BIT)
        else:
            nzcv = set_bit(nzcv, N_BIT) if (nzcv >> V_BIT) & 1 else clear_bit(nzcv, N_BIT)
    elif condition == "HI":  # Higher (C set and Z clear)
        if condition_hit:
            nzcv = set_bit(nzcv, C_BIT)
            nzcv = clear_bit(nzcv, Z_BIT)
        else:
            nzcv = clear_bit(nzcv, C_BIT)
            nzcv = set_bit(nzcv, Z_BIT)
    elif condition == "CC":  # Carry Clear (C clear)
        if condition_hit:
            nzcv = clear_bit(nzcv, C_BIT)
        else:
            nzcv = set_bit(nzcv, C_BIT)
    else:
        raise ValueError(f"Unsupported condition: {condition}")
 
    uc.reg_write(arm64_const.UC_ARM64_REG_NZCV, nzcv)
 
 
def is_condition_met(uc, condition):
    nzcv = uc.reg_read(arm64_const.UC_ARM64_REG_NZCV)
 
    # Helper functions to check specific bits in NZCV
    def is_bit_set(value, bit):
        return (value >> bit) & 1
 
    # N: Negative, Z: Zero, C: Carry, V: Overflow
    N_BIT = 31
    Z_BIT = 30
    C_BIT = 29
    V_BIT = 28
 
    if condition == "EQ":  # Equal (Z set)
        return is_bit_set(nzcv, Z_BIT)
    elif condition == "NE":  # Not Equal (Z clear)
        return not is_bit_set(nzcv, Z_BIT)
    elif condition == "GT":  # Greater Than (Z clear and N == V)
        return not is_bit_set(nzcv, Z_BIT) and (is_bit_set(nzcv, N_BIT) == is_bit_set(nzcv, V_BIT))
    elif condition == "LT":  # Less Than (N != V)
        return is_bit_set(nzcv, N_BIT) != is_bit_set(nzcv, V_BIT)
    elif condition == "HI":  # Higher (C set and Z clear)
        return is_bit_set(nzcv, C_BIT) and not is_bit_set(nzcv, Z_BIT)
    elif condition == "CC":  # Carry Clear (C clear)
        return not is_bit_set(nzcv, C_BIT)
    else:
        raise ValueError(f"Unsupported condition: {condition}")
 
 
def is_valid_address(addr, segment_name=".text"):
    seg = get_segm_by_name(segment_name)
    return addr >= seg.start_ea and addr <= seg.end_ea
 
 
def instruction_hook(uc, address, size, userData):
    print(">>> Tracing instruction at 0x%x" % (address))
 
    eh: flare_emu.EmuHelper = userData["EmuHelper"]
    uc: unicorn.Uc
 
    # 特殊处理
    if address == 0xB9624:
        print("memcpy api hook")
        eh._handleApiHooks(address, eh.getArgv(), "memcpy", userData)
        uc.reg_write(arm64_const.UC_ARM64_REG_PC, address + size)
        return
 
    if address == 0xB96A4 or address == 0xBB18C or address in range(0xBA2E8, 0x0BA338):
        print("Consts Decrypt func execute")
        return
 
    if not is_valid_address(address):
        print("Error: Invalid address detected. Stopping emulation.")
        print(traceback.print_stack())
        uc.emu_stop()
        return
 
    if idc.print_insn_mnem(address) in ["BL", "BLR", "BLX"]:
        print("Info: Skipping function call.")
        uc.reg_write(arm64_const.UC_ARM64_REG_X0, 1)
        uc.reg_write(arm64_const.UC_ARM64_REG_PC, address + size)
 
    if idc.print_insn_mnem(address) == "RET":
        print("Info: Function returned. Stopping emulation.")
        uc.emu_stop()
        return
 
    if idc.print_insn_mnem(address)[:2] == "B." and address not in compare_visited:
        print(f"Info: Conditional branch detected at 0x{address:x}")
        compare_visited.append(address)
        current_state: unicorn.UcContext = uc.context_save()
        instruction = idaapi.insn_t()
        idaapi.decode_insn(instruction, address)
        current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
        print(f"    Emulating branch condition: taken (0x{instruction.Op1.addr:x})")
        current_emu_helper.emulateRange(instruction.Op1.addr, user_data["endAddr"], instructionHook=instruction_hook, count=100,
                                        uc_context=current_state)
 
        current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
        print(f"    Emulating branch condition: not taken (0x{address + 4:x})")
        current_emu_helper.emulateRange(address + 4, user_data["endAddr"], instructionHook=instruction_hook, count=100,
                                        uc_context=current_state)
 
        emu_helper.stopEmulation(user_data)
        return
 
    if address in breakpoint_addresses_compare:
        branch_address = compare_address_lookup[address]
        condition = branch_info[branch_address].condition
        print(f"Info: Compare instruction at 0x{address:x}, Condition: {condition}")
        breakpoint_addresses_compare.remove(address)
        start_pc = address + 4
        current_state: unicorn.UcContext = uc.context_save()
 
        if branch_info[branch_address].taken_branch_address is None:
            print(f"Info: Starting emulation with condition taken at 0x{start_pc:x}")
            current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
            set_condition_flags(current_state, condition, True)
            current_emu_helper.emulateRange(start_pc, user_data["endAddr"], instructionHook=instruction_hook, count=100,
                                            uc_context=current_state)
 
        if branch_info[branch_address].not_taken_branch_address is None:
            print(f"Info: Starting emulation with condition not taken at 0x{start_pc:x}")
            current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
            set_condition_flags(current_state, condition, False)
            current_emu_helper.emulateRange(start_pc, user_data["endAddr"], instructionHook=instruction_hook, count=100,
                                            uc_context=current_state)
 
        emu_helper.stopEmulation(user_data)
        return
    elif address in breakpoint_addresses_branch:
        instruction = idaapi.insn_t()
        idaapi.decode_insn(instruction, address)
        register_name = regid_2_name[instruction.Op1.reg]
        register_value = uc.reg_read(unicorn_reg_map[register_name])
        print(f"Info: Executing branch instruction at 0x{address:x}, {register_name} = 0x{register_value:x}")
        if branch_info[address].branch_type == "jcc":
            if is_condition_met(uc, branch_info[address].condition) and branch_info[address].taken_branch_address is None:
                if register_value in range(function_ranges[branch_info[address].function_index][0],
                                           function_ranges[branch_info[address].function_index][1]):
                    print(f"Info: Conditional jump (JCC) condition taken at 0x{address:x}, {register_name} = 0x{register_value:x}")
                    branch_info[address].taken_branch_address = register_value
            elif not is_condition_met(uc, branch_info[address].condition) and branch_info[address].not_taken_branch_address is None:
                if register_value in range(function_ranges[branch_info[address].function_index][0],
                                           function_ranges[branch_info[address].function_index][1]):
                    print(f"Info: Conditional jump (JCC) condition not taken at 0x{address:x}, {register_name} = 0x{register_value:x}")
                    branch_info[address].not_taken_branch_address = register_value
            if branch_info[address].taken_branch_address is not None and branch_info[address].not_taken_branch_address is not None:
                print(
                    f"Info: All conditional jump (JCC) paths processed at 0x{address:x}. "
                    f"Taken path: 0x{branch_info[address].taken_branch_address:x}, "
                    f"Not taken path: 0x{branch_info[address].not_taken_branch_address:x}")
                breakpoint_addresses_branch.remove(address)
 
        elif branch_info[address].branch_type == "jmp" and branch_info[address].taken_branch_address is None:
            if register_value in range(function_ranges[branch_info[address].function_index][0],
                                       function_ranges[branch_info[address].function_index][1]):
                branch_info[address].taken_branch_address = register_value
                print(f"Info: Unconditional jump (JMP) executed at 0x{address:x}, {register_name} = 0x{register_value:x}")
                breakpoint_addresses_branch.remove(address)
 
 
for (function_start, function_end) in function_ranges[:-1]:
    print(f"Info: Emulating function from 0x{function_start:x} to 0x{function_end:x}")
    emu_helper = flare_emu.EmuHelper(verbose=1)
    try:
        emu_helper.emulateRange(function_start, function_end - 4, instructionHook=instruction_hook, count=100)
        print("Info: Function emulation completed successfully.")
    except Exception as e:
        print(f"Error: Exception occurred during emulation: {e}")

需要注意的几个地方:

  1. unicorn执行arm64 atomic instructions会出错,在unicorn中直接hook跳过这条指令
    图片描述
    图片描述
  2. 两个condition select使用不同condition的情况,两个condition刚好是相反的,识别时对不同的swap一下
    图片描述
  3. unicorn的save_context默认只能保存寄存器状态,mem map的保存可以借助flare_emu的emuHelper完成
  4. 部分函数的常量解密部分的代码要正常执行。
  5. 如果br间接跳转会跳回当前basic block,那么模拟执行部分会出现死循环,不过好在没有跳回的情况出现。

反混淆效果

图片描述

https://bbs.kanxue.com/thread-283706.htm