分析
与常见的br跳转不同的是这里使用了2个condition select
jcc跳转
先是,根据cmp的结果通过condition select instruction (0x27B78这里是CSEL)得到一个下标,然后是一个存放偏移地址的table(off_1F4340),根据上面得到的下标从table取出地址,再一次根据前面cmp的结果选择一个常量与table里取出的地址相加减运算得到最终的跳转地址。
从上面分析可以看到,两个condition select都是使用了同一个cmp结果,也就是说下标和常量是配对的,最终会根据不同结果得到两个不同的地址,这里就是模拟了jcc。
还有一种情况是将上面condition select得到的运算的常量进行了一次或多次加密,在参与运算前会解密这些常量,使用的算法是异或。比如下图是某函数的开始位置,通过memcpy从qword_1980B0复制了一段数据到栈上
调用函数完成第一次解密
间接跳转的时候就会取出第二次解密计算
也有一种解密方式是直接把解密函数内联优化了,直接在函数内通过condition branch完成。
jmp跳转
另一种br间接跳转是模拟了无条件jmp,但是也是通过table[index] +/- value得到目标地址
最后观察其他的br,可以看到每个function都有自己的跳转地址的table,并且在entry block赋值,这一点对后面一些特殊情况需要手动计算时比较有帮助。
反混淆思路
- 找出所有的br指令位置,判断br前的几个指令是否符合间接跳转的几个特征。识别br跳转的类型是jcc还是jmp并记录跳转的目标地址,如果是jcc还需要记录br之前第一个set condition的位置、 condition select使用的condition,如EQ、NE。
- 从函数entry block开始模拟执行,在set condition也就是cmp后面的位置修改条件命中的状态,修改命中状态之前先保存当前模拟器的完整context(包括寄存器和map的mem),之后分别在两种不同状态继续向下执行,对于条件跳转指令也是保存context后对两个分支分别执行。
- 得到所有分支的目标地址后进行patch。大部分跳转部分的代码都是计算好跳转目标后就直接br跳转了,中间的代码不会破坏标志位寄存器,所以可以直接用记录的condition重新patch一个条件跳转。patch实现主要难点在jcc部分,需要处理不同condition跳到不同地址。可以新建一个段专门存放各个分发部分的代码,br那里patch直接跳转到对应分发部分就好,也可以直接操作反编译器的ir完成。剩下的一小部分是上面带有常量解密部分的,先计算好跳转目标,之后去解密了一部分常量,解密过程会破坏标记位寄存器,不过好在只有几个位置是这种情况,直接nop掉解密常量的部分继续使用上面方法就好。
代码实现
提取br信息
def _getBlockByAddr(addr, flowchart):
bb: BasicBlock
for bb in flowchart:
if (addr >= bb.start_ea and addr < bb.end_ea) or addr == bb.start_ea:
return bb
return None
def process_br(addr):
function = idaapi.get_func(addr)
flowchart = list(idaapi.FlowChart(function))
basic_block = _getBlockByAddr(addr, flowchart)
instruction = idaapi.insn_t()
idaapi.decode_insn(instruction, addr)
branch_register = instruction.Op1.reg
instruction_addr = addr
instruction_count = (basic_block.end_ea - basic_block.start_ea) // 4
if instruction_count <= 2: # 一些特殊情况导致basicblock被分开
while instruction_addr >= basic_block.start_ea and instruction_addr <= basic_block.end_ea:
instruction_addr = idc.prev_head(instruction_addr, idc.get_inf_attr(idc.INF_MIN_EA))
basic_block = _getBlockByAddr(instruction_addr, flowchart)
if basic_block is None:
return None
is_entry_block = False
for (start_addr, end_addr) in func_range:
if basic_block.start_ea == start_addr:
is_entry_block = True
func_start_br.append((start_addr, end_addr))
break
instruction_addr = addr
count = 0
jcc_phase = 0
jmp_phase = 0
is_jcc = False
is_jmp = False
condition_instructions = []
jcc_conditions = []
jcc_base_register = ""
jmp_base_register = ""
calc_used_registers = set()
cmp_address = 0
'''
jcc:
CMP X8, X9
MOV W8, #8
MOV W9, #0x90
CSEL X8, X9, X8, EQ
LDR X8, [X27,X8]
MOV W10, #0xB5AF
MOV W11, #0xA4ED
CSEL X10, X11, X10, EQ
SUB X8, X8, X10
BR X8
jmp:
LDR X8, [X24,#0xe8]
MOV X9, #0xFFFFFFFFFFFF7B09
ADD X8, X8, X9
BR X8
'''
while count < 50:
instruction_addr = idc.prev_head(instruction_addr, idc.get_inf_attr(idc.INF_MIN_EA))
count += 1
instruction = idaapi.insn_t()
length = idaapi.decode_insn(instruction, instruction_addr)
mnemonic = idc.print_insn_mnem(instruction_addr).lower()
if mnemonic in ["csel", "csinc", "csinv", "csneg", "cset", "csetm", "cinc", "cinv", "cneg"]:
condition_instructions.append(GetDisasm(instruction_addr))
jcc_conditions.append(GetDisasm(instruction_addr).split(", ")[-1])
if jcc_phase == 1:
jcc_phase += 1
elif jcc_phase == 3:
is_jcc = True
elif mnemonic == "ldr":
if instruction.Op1.type == o_reg and instruction.Op1.reg in calc_used_registers:
if jcc_phase == 2:
jcc_phase += 1
jcc_base_register = reg_name[instruction.Op2.phrase]
if jmp_phase == 2:
is_jmp = True
jmp_base_register = reg_name[instruction.Op2.phrase]
elif mnemonic in ["br", "blr", "bl", "b", "ret"]:
break
elif mnemonic in ["add", "sub"]:
if instruction.Op1.type == instruction.Op2.type == o_reg and instruction.Op1.reg == branch_register and instruction.Op3.type in [o_reg, o_idpspec0]:
if jmp_phase == 0:
jmp_phase += 1
jmp_calc_used_register = instruction.Op3.reg
calc_used_registers.add(instruction.Op1.reg)
calc_used_registers.add(instruction.Op2.reg)
if jcc_phase == 0:
jcc_phase += 1
calc_used_registers.add(instruction.Op1.reg)
calc_used_registers.add(instruction.Op2.reg)
elif mnemonic == "mov":
if jmp_phase == 1 and instruction.Op2.type == o_imm and instruction.Op1.type == o_reg and instruction.Op1.reg == jmp_calc_used_register:
jmp_phase += 1
elif spoils_flags(instruction):
if is_jcc and cmp_address == 0:
cmp_address = instruction_addr
if is_jcc:
if jcc_conditions[0] == jcc_conditions[1]:
print(f"jcc br addr: {hex(addr)} is_entry_block: {is_entry_block} base_reg: {jcc_base_register} cmp addr: {hex(cmp_address)}")
else: # 特殊情况,两个condtion select使用的条件不同
print(f"jcc br addr: {hex(addr)} is_entry_block: {is_entry_block} base_reg: {jcc_base_register} condition not valid! condition: {jcc_conditions} condition_instructions: {condition_instructions}")
elif is_jmp:
print(f"jmp br addr: {hex(addr)} is_entry_block: {is_entry_block} base_reg: {jmp_base_register}")
else:
print(f"Invalid br addr: {hex(addr)}")
模拟执行
class BranchInstruction:
def __init__(self, address, branch_type, compare_address=None, condition=None, function_index=None):
self.address = address
self.branch_type = branch_type
self.compare_address = compare_address
self.condition = condition
self.taken_branch_address = None
self.not_taken_branch_address = None
self.function_index = function_index
branch_info = dict()
branch_data = [
{"branch_address": 0x27b9c, "compare_address": 0x27b70, "condition": "EQ", "branch_type": "jcc"},
{"branch_address": 0x27be4, "branch_type": "jmp"},
# ... 其他数据保持不变
]
breakpoint_addresses_branch = []
breakpoint_addresses_compare = []
all_conditions = set()
compare_address_lookup = dict()
function_ranges = [(162512, 164428), (182320, 185084), (220224, 220252), (294424, 294440), (300112, 300140),
(308092, 309132), (368928, 368976), ..., (0, 0)]
def set_condition_flags(uc, condition, condition_hit=True):
nzcv = uc.reg_read(arm64_const.UC_ARM64_REG_NZCV)
# Helper functions to set or clear specific bits in NZCV
def set_bit(value, bit):
return value | (1 << bit)
def clear_bit(value, bit):
return value & ~(1 << bit)
# N: Negative, Z: Zero, C: Carry, V: Overflow
N_BIT = 31
Z_BIT = 30
C_BIT = 29
V_BIT = 28
if condition == "EQ": # Equal (Z set)
if condition_hit:
nzcv = set_bit(nzcv, Z_BIT)
else:
nzcv = clear_bit(nzcv, Z_BIT)
elif condition == "NE": # Not Equal (Z clear)
if condition_hit:
nzcv = clear_bit(nzcv, Z_BIT)
else:
nzcv = set_bit(nzcv, Z_BIT)
elif condition == "GT": # Greater Than (Z clear and N == V)
if condition_hit:
nzcv = clear_bit(nzcv, Z_BIT)
nzcv = set_bit(nzcv, N_BIT) if (nzcv >> V_BIT) & 1 else clear_bit(nzcv, N_BIT)
else:
nzcv = set_bit(nzcv, Z_BIT)
elif condition == "LT": # Less Than (N != V)
if condition_hit:
nzcv = set_bit(nzcv, N_BIT) if not (nzcv >> V_BIT) & 1 else clear_bit(nzcv, N_BIT)
else:
nzcv = set_bit(nzcv, N_BIT) if (nzcv >> V_BIT) & 1 else clear_bit(nzcv, N_BIT)
elif condition == "HI": # Higher (C set and Z clear)
if condition_hit:
nzcv = set_bit(nzcv, C_BIT)
nzcv = clear_bit(nzcv, Z_BIT)
else:
nzcv = clear_bit(nzcv, C_BIT)
nzcv = set_bit(nzcv, Z_BIT)
elif condition == "CC": # Carry Clear (C clear)
if condition_hit:
nzcv = clear_bit(nzcv, C_BIT)
else:
nzcv = set_bit(nzcv, C_BIT)
else:
raise ValueError(f"Unsupported condition: {condition}")
uc.reg_write(arm64_const.UC_ARM64_REG_NZCV, nzcv)
def is_condition_met(uc, condition):
nzcv = uc.reg_read(arm64_const.UC_ARM64_REG_NZCV)
# Helper functions to check specific bits in NZCV
def is_bit_set(value, bit):
return (value >> bit) & 1
# N: Negative, Z: Zero, C: Carry, V: Overflow
N_BIT = 31
Z_BIT = 30
C_BIT = 29
V_BIT = 28
if condition == "EQ": # Equal (Z set)
return is_bit_set(nzcv, Z_BIT)
elif condition == "NE": # Not Equal (Z clear)
return not is_bit_set(nzcv, Z_BIT)
elif condition == "GT": # Greater Than (Z clear and N == V)
return not is_bit_set(nzcv, Z_BIT) and (is_bit_set(nzcv, N_BIT) == is_bit_set(nzcv, V_BIT))
elif condition == "LT": # Less Than (N != V)
return is_bit_set(nzcv, N_BIT) != is_bit_set(nzcv, V_BIT)
elif condition == "HI": # Higher (C set and Z clear)
return is_bit_set(nzcv, C_BIT) and not is_bit_set(nzcv, Z_BIT)
elif condition == "CC": # Carry Clear (C clear)
return not is_bit_set(nzcv, C_BIT)
else:
raise ValueError(f"Unsupported condition: {condition}")
def is_valid_address(addr, segment_name=".text"):
seg = get_segm_by_name(segment_name)
return addr >= seg.start_ea and addr <= seg.end_ea
def instruction_hook(uc, address, size, userData):
print(">>> Tracing instruction at 0x%x" % (address))
eh: flare_emu.EmuHelper = userData["EmuHelper"]
uc: unicorn.Uc
# 特殊处理
if address == 0xB9624:
print("memcpy api hook")
eh._handleApiHooks(address, eh.getArgv(), "memcpy", userData)
uc.reg_write(arm64_const.UC_ARM64_REG_PC, address + size)
return
if address == 0xB96A4 or address == 0xBB18C or address in range(0xBA2E8, 0x0BA338):
print("Consts Decrypt func execute")
return
if not is_valid_address(address):
print("Error: Invalid address detected. Stopping emulation.")
print(traceback.print_stack())
uc.emu_stop()
return
if idc.print_insn_mnem(address) in ["BL", "BLR", "BLX"]:
print("Info: Skipping function call.")
uc.reg_write(arm64_const.UC_ARM64_REG_X0, 1)
uc.reg_write(arm64_const.UC_ARM64_REG_PC, address + size)
if idc.print_insn_mnem(address) == "RET":
print("Info: Function returned. Stopping emulation.")
uc.emu_stop()
return
if idc.print_insn_mnem(address)[:2] == "B." and address not in compare_visited:
print(f"Info: Conditional branch detected at 0x{address:x}")
compare_visited.append(address)
current_state: unicorn.UcContext = uc.context_save()
instruction = idaapi.insn_t()
idaapi.decode_insn(instruction, address)
current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
print(f" Emulating branch condition: taken (0x{instruction.Op1.addr:x})")
current_emu_helper.emulateRange(instruction.Op1.addr, user_data["endAddr"], instructionHook=instruction_hook, count=100,
uc_context=current_state)
current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
print(f" Emulating branch condition: not taken (0x{address + 4:x})")
current_emu_helper.emulateRange(address + 4, user_data["endAddr"], instructionHook=instruction_hook, count=100,
uc_context=current_state)
emu_helper.stopEmulation(user_data)
return
if address in breakpoint_addresses_compare:
branch_address = compare_address_lookup[address]
condition = branch_info[branch_address].condition
print(f"Info: Compare instruction at 0x{address:x}, Condition: {condition}")
breakpoint_addresses_compare.remove(address)
start_pc = address + 4
current_state: unicorn.UcContext = uc.context_save()
if branch_info[branch_address].taken_branch_address is None:
print(f"Info: Starting emulation with condition taken at 0x{start_pc:x}")
current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
set_condition_flags(current_state, condition, True)
current_emu_helper.emulateRange(start_pc, user_data["endAddr"], instructionHook=instruction_hook, count=100,
uc_context=current_state)
if branch_info[branch_address].not_taken_branch_address is None:
print(f"Info: Starting emulation with condition not taken at 0x{start_pc:x}")
current_emu_helper = flare_emu.EmuHelper(verbose=1, emuHelper=emu_helper)
set_condition_flags(current_state, condition, False)
current_emu_helper.emulateRange(start_pc, user_data["endAddr"], instructionHook=instruction_hook, count=100,
uc_context=current_state)
emu_helper.stopEmulation(user_data)
return
elif address in breakpoint_addresses_branch:
instruction = idaapi.insn_t()
idaapi.decode_insn(instruction, address)
register_name = regid_2_name[instruction.Op1.reg]
register_value = uc.reg_read(unicorn_reg_map[register_name])
print(f"Info: Executing branch instruction at 0x{address:x}, {register_name} = 0x{register_value:x}")
if branch_info[address].branch_type == "jcc":
if is_condition_met(uc, branch_info[address].condition) and branch_info[address].taken_branch_address is None:
if register_value in range(function_ranges[branch_info[address].function_index][0],
function_ranges[branch_info[address].function_index][1]):
print(f"Info: Conditional jump (JCC) condition taken at 0x{address:x}, {register_name} = 0x{register_value:x}")
branch_info[address].taken_branch_address = register_value
elif not is_condition_met(uc, branch_info[address].condition) and branch_info[address].not_taken_branch_address is None:
if register_value in range(function_ranges[branch_info[address].function_index][0],
function_ranges[branch_info[address].function_index][1]):
print(f"Info: Conditional jump (JCC) condition not taken at 0x{address:x}, {register_name} = 0x{register_value:x}")
branch_info[address].not_taken_branch_address = register_value
if branch_info[address].taken_branch_address is not None and branch_info[address].not_taken_branch_address is not None:
print(
f"Info: All conditional jump (JCC) paths processed at 0x{address:x}. "
f"Taken path: 0x{branch_info[address].taken_branch_address:x}, "
f"Not taken path: 0x{branch_info[address].not_taken_branch_address:x}")
breakpoint_addresses_branch.remove(address)
elif branch_info[address].branch_type == "jmp" and branch_info[address].taken_branch_address is None:
if register_value in range(function_ranges[branch_info[address].function_index][0],
function_ranges[branch_info[address].function_index][1]):
branch_info[address].taken_branch_address = register_value
print(f"Info: Unconditional jump (JMP) executed at 0x{address:x}, {register_name} = 0x{register_value:x}")
breakpoint_addresses_branch.remove(address)
for (function_start, function_end) in function_ranges[:-1]:
print(f"Info: Emulating function from 0x{function_start:x} to 0x{function_end:x}")
emu_helper = flare_emu.EmuHelper(verbose=1)
try:
emu_helper.emulateRange(function_start, function_end - 4, instructionHook=instruction_hook, count=100)
print("Info: Function emulation completed successfully.")
except Exception as e:
print(f"Error: Exception occurred during emulation: {e}")
需要注意的几个地方:
- unicorn执行arm64 atomic instructions会出错,在unicorn中直接hook跳过这条指令
- 两个condition select使用不同condition的情况,两个condition刚好是相反的,识别时对不同的swap一下
- unicorn的save_context默认只能保存寄存器状态,mem map的保存可以借助flare_emu的emuHelper完成
- 部分函数的常量解密部分的代码要正常执行。
- 如果br间接跳转会跳回当前basic block,那么模拟执行部分会出现死循环,不过好在没有跳回的情况出现。
反混淆效果
https://bbs.kanxue.com/thread-283706.htm