Class for managing the SemaSCDG application, including setting up configurations, creating angr projects, running exploration, building SCDG graphs, and handling various analysis tasks.
This class encapsulates the functionality for initializing the application, setting up configurations, creating angr projects, running exploration, building SCDG graphs, and managing analysis tasks.
This method sets up the application environment, including configurations, logging, plugins, and directories for storing results.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def __init__(self):
    """
    Initialize the SemaSCDG application from the configuration file given on the command line.

    Reads the configuration file whose path is ``sys.argv[1]`` into the
    module-level ``config`` object, copies its parameters onto the instance
    (``get_config_param``), instantiates the managers and helpers used during
    a run, creates the output directory and writes the configuration into it
    (``save_conf``).

    Raises:
        OSError: if the output directory cannot be created.
    """
    self.config = config
    config.read(sys.argv[1])
    self.get_config_param(self.config)

    # Logger and log levels are taken from module-level names.
    self.log = logger
    self.log_level_sema = log_level_sema
    self.log_level_angr = log_level_angr
    self.log_level_claripy = log_level_claripy

    # Data is only stored when a CSV file name is configured.
    self.store_data = self.csv_file != ""

    # Per-run state.
    self.scdg_graph = []
    self.new = {}
    self.nameFileShort = ""
    self.content = ""
    self.nb_exps = 0
    self.current_exps = 0
    self.current_exp_dir = 0

    # Managers and helpers.
    self.plugins = PluginManager()
    self.packing_manager = self.plugins.get_plugin_packing()
    self.data_manager = DataManager()
    self.explorer_manager = SemaExplorerManager()
    self.windows_simproc = WindowsSimProcedure(verbose=True)
    self.linux_simproc = LinuxSimProcedure(verbose=True)
    # The builder receives the same list object as self.scdg_graph.
    self.syscall_to_scdg_builder = SyscallToSCDG(self.scdg_graph)
    self.graph_builder = GraphBuilder()

    # Setup the output directory
    self.log.info(f"Results will be saved into : {self.mapping_dir}")
    # exist_ok tolerates an already existing directory while still reporting
    # real failures (e.g. permissions) here instead of later in save_conf().
    os.makedirs(self.mapping_dir, exist_ok=True)
    self.save_conf()
|
collect_data(exp_dir, proj, state, simgr, execution_time)
Collects and processes data after the binary analysis.
This function handles the collection of execution time, printing block information, logging syscall details, loading plugin data, tracking commands, and building an IOC (Indicator of Compromise) report.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
def collect_data(self, exp_dir, proj, state, simgr, execution_time):
    """
    Record and log post-analysis data, then run the optional reporting plugins.

    Args:
        exp_dir: experiment directory forwarded to the command/IOC plugins.
        proj: project whose ``loader.requested_names`` is logged.
        state: state forwarded to ``data_manager.get_plugin_data``.
        simgr: simulation manager forwarded to the plugin helpers.
        execution_time: value stored under ``data["execution_time"]`` and logged.
    """
    manager = self.data_manager
    manager.data["execution_time"] = execution_time
    self.log.info(f"Total execution time: {execution_time}")

    if self.count_block_enable:
        manager.print_block_info()

    self.log.debug(f"Syscalls Found:{self.call_sim.syscall_found}")
    self.log.debug(f"Loaded libraries:{proj.loader.requested_names}")

    # Each optional step is gated by its own configuration flag.
    if self.plugin_enable:
        manager.get_plugin_data(state, simgr, to_store=self.store_data)

    if self.track_command:
        self.plugins.enable_plugin_commands(self, simgr, self.scdg_graph, exp_dir)

    if self.ioc_report:
        self.plugins.enable_plugin_ioc(self, self.scdg_graph, exp_dir)
|
create_binary_init_state(proj)
Creates the initial state for binary analysis with specified arguments, entry address, and options.
This function constructs the initial state for binary analysis, incorporating binary arguments, entry address, angr state options, simulation file handling, heap setup, plugin loading, and constraint enforcement for ASCII characters.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def create_binary_init_state(self, proj):
    """
    Build the entry state used to start the analysis.

    The state is created from the binary arguments (``get_binary_args``), the
    entry address (``get_entry_addr``) and the configured state options
    (``get_angr_state_options``). The simulated files, a ptmalloc heap plugin,
    the configured plugins and the heap structure are then set up, and every
    byte of each argument after the first is constrained to ``' '``..``'~'``.

    Returns:
        tuple: ``(state, args_binary)``.
    """
    args_binary = self.get_binary_args()
    state = proj.factory.entry_state(
        addr=self.get_entry_addr(proj),
        args=args_binary,
        add_options=self.get_angr_state_options(),
    )

    self.handle_simfile(state)
    state.options.discard("LAZY_SOLVES")
    heap_plugin = angr.state_plugins.heap.heap_ptmalloc.SimHeapPTMalloc()
    state.register_plugin("heap", heap_plugin)

    # Enable plugins set to true in config file
    if self.plugin_enable:
        self.plugins.load_plugin(state, self.config)

    self.setup_heap(state, proj)

    # Constrain arguments to printable ASCII; index 0 is skipped.
    lowest, highest = b" ", b"~"  # '\x20' and '\x7e'
    for argument in args_binary[1:]:
        for byte in argument.chop(8):
            state.add_constraints(byte >= lowest)
            state.add_constraints(byte <= highest)

    return state, args_binary
|
Handles different packing scenarios for the binary analysis process.
This function determines the appropriate actions based on the packing type and binary path, setting up the analysis environment accordingly.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def deal_with_packing(self):
    """
    Create the angr project for ``self.binary_path``, handling packed binaries.

    * packed with ``packing_type == "symbion"``: a first project is created,
      ``packing_manager.setup_symbion`` provides the preload libraries and the
      concrete target, and a second project is created with them;
    * packed with ``packing_type == "unipacker"``: the project is created from
      the file returned by ``packing_manager.setup_unipacker``;
    * not packed: the project is created directly from ``self.binary_path``.

    Returns:
        The project returned by ``init_angr_project``.

    Raises:
        ValueError: if the binary is flagged as packed but ``packing_type`` is
            neither ``"symbion"`` nor ``"unipacker"``.
        NotImplementedError: for unpacked ``.bin`` / ``.dmp`` inputs, for which
            no project is created yet (previously an ``UnboundLocalError``).
    """
    if self.is_packed:
        if self.packing_type == "symbion":
            proj_init = self.init_angr_project(
                self.binary_path,
                auto_load_libs=True,
                support_selfmodifying_code=True,
            )
            preload, avatar_gdb = self.packing_manager.setup_symbion(
                self.binary_path,
                proj_init,
                self.concrete_target_is_local,
                self.call_sim,
                self.log,
            )
            proj = self.init_angr_project(
                self.binary_path,
                auto_load_libs=False,
                load_debug_info=True,
                preload_libs=preload,
                support_selfmodifying_code=True,
                concrete_target=avatar_gdb,
            )
            for lib in self.call_sim.system_call_table:
                print(proj.loader.find_all_symbols(lib))
            return proj

        if self.packing_type == "unipacker":
            nameFile_unpacked = self.packing_manager.setup_unipacker(
                self.binary_path, self.nameFileShort, self.log
            )
            return self.init_angr_project(
                nameFile_unpacked,
                auto_load_libs=True,
                support_selfmodifying_code=True,
            )

        # Fail with a clear message instead of an UnboundLocalError on return.
        raise ValueError(f"Unsupported packing type: {self.packing_type!r}")

    if self.binary_path.endswith((".bin", ".dmp")):
        # TODO : implement function -> see PluginPacking.py
        self.packing_manager.setup_bin_dmp()
        raise NotImplementedError(
            "Creating a project from .bin/.dmp files is not implemented yet"
        )

    # default behaviour
    return self.init_angr_project(
        self.binary_path,
        support_selfmodifying_code=True,
        auto_load_libs=True,
        load_debug_info=True,
        simos=None,
    )
|
Finalizes the binary analysis process by clearing resources and data structures.
This function removes handlers, clears simulation data, and resets various components to conclude the analysis.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
621
622
623
624
625
626
627
628
629
630
631
def end_run(self):
    """
    Release the resources of a finished run.

    Detaches the run's file handler from the root logger and closes it, then
    clears the syscall simulation data, the SCDG graph, the graph builder and
    the data manager.
    """
    logging.getLogger().removeHandler(self.fileHandler)
    # Detaching alone leaves the log file open; close it to release the descriptor.
    self.fileHandler.close()

    # NOTE(review): the extracted source lost its indentation; only the
    # call_sim cleanup is assumed to be guarded (call_sim is not set in
    # __init__, so it may be missing or fail to clear) - confirm.
    with contextlib.suppress(Exception):
        self.call_sim.clear()

    self.scdg_graph.clear()
    self.graph_builder.clear()
    self.data_manager.clear()
|
Retrieves and returns a set of angr state options based on the configuration settings.
This function reads the ANGR state options from the configuration, converts them to uppercase strings, and returns them as a set.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
232
233
234
235
236
237
238
239
240
241
def get_angr_state_options(self):
    """
    Return the state options enabled in the configuration.

    Every key of the ``ANGR_State_options_to_add`` section whose value parses
    as a true boolean is upper-cased and added to the result.

    Returns:
        set: upper-cased names of the enabled options.
    """
    section = self.config["ANGR_State_options_to_add"]
    return {str.upper(name) for name in section if section.getboolean(name)}
|
Generates symbolic arguments for the binary analysis.
This function creates a list of binary arguments, including the binary name and symbolic arguments based on the number of arguments specified.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
338
339
340
341
342
343
344
345
346
347
def get_binary_args(self):
    """
    Build the argument list passed to the analysed binary.

    The first element is ``self.nameFileShort``; it is followed by
    ``self.n_args`` symbolic bit-vectors named ``arg0``, ``arg1``, ... each
    ``8 * 16`` bits wide.

    Returns:
        list: the binary name followed by the symbolic arguments.
    """
    args_binary = [self.nameFileShort]
    if self.n_args:
        args_binary.extend(
            claripy.BVS("arg" + str(index), 8 * 16)
            for index in range(self.n_args)
        )
    return args_binary
|
Extracts configuration parameters from the provided config object.
This function retrieves various configuration parameters needed for the SemaSCDG application from the config object.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def get_config_param(self, config):
    """
    Copy the parameters of the configuration onto the instance.

    Values are read from the ``SCDG_arg`` and ``Plugins_to_load`` sections.
    Output paths are built under ``database/SCDG/runs/<exp_dir>/``.

    Args:
        config: parsed configuration providing both sections.
    """
    output_dir = "database/SCDG/runs/"
    scdg = config['SCDG_arg']

    self.fast_main = scdg.getboolean('fast_main')
    self.concrete_target_is_local = scdg.getboolean('concrete_target_is_local')
    self.is_packed = scdg.getboolean('is_packed')
    self.packing_type = scdg['packing_type']
    self.keep_inter_scdg = scdg.getboolean('keep_inter_scdg')
    self.approximate = scdg.getboolean('approximate')

    # Plugin switches live in their own section.
    plugin_flags = config['Plugins_to_load']
    self.track_command = plugin_flags.getboolean('plugin_track_command')
    self.ioc_report = plugin_flags.getboolean('plugin_ioc_report')
    self.hooks_enable = plugin_flags.getboolean('plugin_hooks')

    self.sim_file = scdg.getboolean('sim_file')
    self.count_block_enable = scdg.getboolean('count_block_enable')
    self.plugin_enable = scdg.getboolean('plugin_enable')
    self.expl_method = scdg["expl_method"]
    self.family = scdg['family']
    self.exp_dir_name = scdg['exp_dir']

    # Every output path shares the "<output_dir><exp_dir>/" prefix.
    run_root = output_dir + self.exp_dir_name + "/"
    self.exp_dir = run_root + self.family
    self.mapping_dir = run_root

    self.binary_path = scdg['binary_path']
    self.n_args = int(scdg['n_args'])
    self.csv_file = scdg['csv_file']
    self.csv_path = run_root + self.csv_file
    self.conf_path = run_root + "scdg_conf.json"

    self.pre_run_thread = scdg.getboolean('pre_run_thread')
    self.runtime_run_thread = scdg.getboolean('runtime_run_thread')
    self.post_run_thread = scdg.getboolean('post_run_thread')
|
get_entry_addr
Retrieves the entry address for the analysis from the provided project.
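This function looks for the analysis entry address in three places, in order: the loader's `main` symbol (only when the `fast_main` flag is set), the `sym._main` flag reported by radare2, and finally the `entry_addr` value of the configuration. The radare2 lookup yields an integer, the configuration value is returned as a hexadecimal string, and `None` is returned when no address is found.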
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def get_entry_addr(self, proj):
    """
    Determine the address at which the analysis starts.

    Lookup order:
      1. the ``main`` symbol of the loader, when it exists and
         ``self.fast_main`` is set;
      2. the ``sym._main`` flag reported by radare2 for ``self.binary_path``;
      3. the ``entry_addr`` value of the ``SCDG_arg`` configuration section.

    Args:
        proj: project whose loader is searched for the ``main`` symbol.

    Returns:
        The symbol's rebased address (1), an ``int`` parsed from the radare2
        output or ``None`` when no line matches (2), or a hexadecimal string /
        ``None`` when the configured value is ``"None"`` (3).
    """
    # TODO : Maybe useless : Try to directly go into main (optimize some binary in windows)
    r2 = r2pipe.open(self.binary_path)
    try:
        # One query is enough; the original issued the same command twice.
        out_r2 = r2.cmd('f ~sym._main')
    finally:
        # Always end the radare2 session so it is not left open.
        r2.quit()

    addr_main = proj.loader.find_symbol("main")
    if addr_main and self.fast_main:
        addr = addr_main.rebased_addr
    elif out_r2:
        addr = None
        # Best-effort parsing: any failure leaves addr as found so far.
        with contextlib.suppress(Exception):
            for flag_line in out_r2.split("\n"):
                if flag_line.endswith("._main"):
                    addr = int(flag_line.split(" ")[0], 16)
    else:
        # Take the entry point specify in config file
        addr = self.config["SCDG_arg"]["entry_addr"]
        if addr != "None":
            # NOTE(review): hex() yields a string while the other branches
            # yield integers - confirm the consumer accepts a string address.
            addr = hex(int(addr, 16))
        else:
            addr = None
    self.log.info(f"Entry_state address = {str(addr)}")
    return addr
|
get_stashes_content
get_stashes_content(main_obj, state, simgr, exp_dir)
Constructs System Call Dependency Graph (SCDG) content from simulation stashes.
Processes simulation stashes to extract relevant traces for graph construction, ensuring uniqueness based on hash values.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
def get_stashes_content(self, main_obj, state, simgr, exp_dir):
    """
    Collect the distinct SCDG traces reachable from the simulation manager stashes.

    Each state's trace is ``self.scdg_graph[state.globals["id"]]``; for the
    ``errored`` stash the state is taken from the entry's ``.state``
    attribute. Traces are de-duplicated on ``hash(str(trace))``. The memory
    sections are added to the dump, which is appended to the intermediate
    JSON file when ``self.keep_inter_scdg`` is set.

    Args:
        main_obj: object forwarded to ``print_memory_info``.
        state: unused by this method.
        simgr: simulation manager providing the stashes.
        exp_dir: directory forwarded to ``keep_inter_scdg_meth``.

    Returns:
        list: the distinct traces, in stash order.
    """
    # Add all traces with relevant content to graph construction
    stashes = {
        "deadended": simgr.deadended,
        "active": simgr.active,
        "errored": simgr.errored,
        "pause": simgr.pause,
        "ExcessLoop": simgr.stashes["ExcessLoop"],
        "ExcessStep": simgr.stashes["ExcessStep"],
        "unconstrained": simgr.unconstrained,
        "new_addr": simgr.stashes["new_addr"],
        "deadbeef": simgr.stashes["deadbeef"],
        "lost": simgr.stashes["lost"],
    }

    dump_file = {}
    seen_hashes = set()
    scdg_fin = []
    for stash_name, entries in stashes.items():
        unwrap = stash_name == "errored"
        for entry in entries:
            current = entry.state if unwrap else entry
            trace_id = current.globals["id"]
            fingerprint = hash(str(self.scdg_graph[trace_id]))
            if fingerprint in seen_hashes:
                continue
            seen_hashes.add(fingerprint)
            # Dump entries are numbered in insertion order, matching scdg_fin.
            dump_file[len(scdg_fin)] = {
                "status": stash_name,
                "trace": self.scdg_graph[trace_id],
            }
            scdg_fin.append(self.scdg_graph[trace_id])

    self.print_memory_info(main_obj, dump_file)
    if self.keep_inter_scdg:
        self.keep_inter_scdg_meth(exp_dir, dump_file)
    return scdg_fin
|
Handles the simulation file by inserting it into the state file system.
This function reads the simulation file content, creates SimFile objects, and inserts them into the state file system if the simulation file flag is set.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
350
351
352
353
354
355
356
357
358
359
360
361
def handle_simfile(self, state):
    """
    Expose the analysed binary inside the simulated file system.

    Does nothing unless ``self.sim_file`` is set. Otherwise the bytes of
    ``self.binary_path`` are stored in ``self.content`` and inserted into
    ``state.fs`` twice: under ``self.nameFileShort`` and under
    ``"pagefile.sys"``.

    Args:
        state: state whose ``fs`` receives the simulated files.
    """
    if not self.sim_file:
        return

    with open_file(self.binary_path, "rb") as binary:
        self.content = binary.read()

    # Both simulated files carry the same content.
    for virtual_name in (self.nameFileShort, "pagefile.sys"):
        state.fs.insert(virtual_name, angr.SimFile(virtual_name, content=self.content))
|
init_angr_project(
namefile,
preload_libs=[],
concrete_target=None,
support_selfmodifying_code=None,
simos=None,
arch=None,
auto_load_libs=False,
load_debug_info=False,
)
Initializes and returns an angr Project object with specified parameters.
This function creates an angr Project object with the provided parameters for analysis and symbolic execution.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def init_angr_project(self, namefile, preload_libs=None, concrete_target=None, support_selfmodifying_code=None, simos=None, arch=None, auto_load_libs=False, load_debug_info=False):
    """
    Create an ``angr.Project`` for ``namefile``.

    Args:
        namefile: path of the binary to load.
        preload_libs: libraries passed as the ``preload_libs`` load option;
            ``None`` (the default) means an empty list.
        concrete_target: forwarded to ``angr.Project``.
        support_selfmodifying_code: forwarded to ``angr.Project``.
        simos: forwarded to ``angr.Project``.
        arch: forwarded to ``angr.Project``.
        auto_load_libs: value of the ``auto_load_libs`` load option.
        load_debug_info: value of the ``load_debug_info`` load option.

    Returns:
        The created project. Its default analysis mode is
        ``"symbolic_approximating"`` when ``self.approximate`` is set,
        ``"symbolic"`` otherwise.
    """
    # A fresh list per call avoids sharing one mutable default between projects.
    if preload_libs is None:
        preload_libs = []
    return angr.Project(
        namefile,
        use_sim_procedures=True,
        load_options={
            "auto_load_libs": auto_load_libs,
            "load_debug_info": load_debug_info,
            "preload_libs": preload_libs,
        },
        support_selfmodifying_code=support_selfmodifying_code,
        simos=simos,
        arch=arch,
        concrete_target=concrete_target,
        default_analysis_mode=(
            "symbolic_approximating" if self.approximate else "symbolic"
        ),
    )
|
keep_inter_scdg_meth(exp_dir, dump_file)
Keeps an intermediate System Call Dependency Graph (SCDG) by updating a JSON file with new data.
Appends the provided data to the existing JSON file or creates a new one if it does not exist.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
678
679
680
681
682
683
684
685
686
687
688
689
690
691
def keep_inter_scdg_meth(self, exp_dir, dump_file):
    """
    Append ``dump_file`` to the intermediate SCDG JSON file.

    The file is ``<exp_dir>inter_SCDG.json`` (``exp_dir`` is concatenated
    as-is). Its current list is loaded when the file exists, an empty list is
    used otherwise; the extended list is then written back.

    Args:
        exp_dir: directory prefix of the JSON file.
        dump_file: entry appended to the stored list.
    """
    target = f"{exp_dir}inter_SCDG.json"
    self.log.debug(target)

    if os.path.isfile(target):
        with open(target) as existing:
            history = json_dumper.load(existing)
    else:
        history = []

    history.append(dump_file)
    with open(target, "w") as save_SCDG:
        json_dumper.dump(history, save_SCDG)
|
perform_exploration(exp_dir, proj, simgr)
Performs the exploration process for the binary analysis.
This function sets up the exploration technique, handles runtime thread settings, logs loader information, runs the simulation manager, and records the exploration time.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
def perform_exploration(self, exp_dir, proj, simgr):
    """
    Run the simulation manager with the configured exploration technique.

    The technique comes from ``explorer_manager.get_exploration_tech``. When
    ``self.runtime_run_thread`` is set, the first active state is flagged with
    ``globals["is_thread"] = True``. The duration of ``simgr.run()`` is stored
    under ``data_manager.data["exploration_time"]``.

    Args:
        exp_dir: experiment directory handed to the exploration technique.
        proj: project handed to the exploration technique; loader details are logged.
        simgr: simulation manager to run.
    """
    technique = self.explorer_manager.get_exploration_tech(
        self.nameFileShort,
        simgr,
        exp_dir,
        proj,
        self.expl_method,
        self.scdg_graph,
        self.call_sim,
    )
    if self.runtime_run_thread:
        simgr.active[0].globals["is_thread"] = True

    loader = proj.loader
    self.log.info(loader.all_pe_objects)
    self.log.info(loader.extern_object)
    self.log.info(loader.symbols)

    simgr.use_technique(technique)
    self.log.info("".join((
        "\n------------------------------\nStart -State of simulation manager :\n ",
        str(simgr),
        "\n------------------------------",
    )))

    started_at = time.time()
    simgr.run()
    self.data_manager.data["exploration_time"] = time.time() - started_at

    self.log.info("".join((
        "\n------------------------------\nEnd - State of simulation manager :\n ",
        str(simgr),
        "\n------------------------------",
    )))
|
print_memory_info(main_obj, dump_file)
Prints memory section information for the main object.
This function extracts and logs details about memory sections, including virtual address, memory size, and permissions.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
def print_memory_info(self, main_obj, dump_file):
    """
    Record and log the memory sections of ``main_obj``.

    ``dump_file["sections"]`` is reset to a dict mapping each section name
    (NUL characters removed) to its virtual address, memory size and
    read/write/execute flags. The name and the record are both logged.

    Args:
        main_obj: object exposing a ``sections`` iterable.
        dump_file: dict updated in place.
    """
    sections = {}
    dump_file["sections"] = sections
    for section in main_obj.sections:
        label = section.name.replace("\x00", "")
        sections[label] = {
            "vaddr": section.vaddr,
            "memsize": section.memsize,
            "is_readable": section.is_readable,
            "is_writable": section.is_writable,
            "is_executable": section.is_executable,
        }
        self.log.info(label)
        self.log.info(sections[label])
|
print_program_info(proj, main_obj, os_obj)
Prints information about the program, including libraries used, OS recognition, CPU architecture, entry point, memory addresses, stack executability, binary position independence, and exploration method.
This function logs various details about the program, such as libraries, OS, CPU architecture, entry point, memory addresses, stack properties, binary position independence, and exploration method.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
217
218
219
220
221
222
223
224
225
226
227
228
229
def print_program_info(self, proj, main_obj, os_obj):
    """
    Log a summary of the loaded program.

    The summary covers the requested libraries, the OS, the CPU architecture,
    the entry point, the loader, the ``execstack`` and ``pic`` attributes of
    the main object, and the exploration method.

    Args:
        proj: project providing ``loader``, ``arch`` and ``entry``.
        main_obj: main binary object providing ``execstack`` and ``pic``.
        os_obj: OS description to log.
    """
    emit = self.log.info
    loader = proj.loader
    emit(f"Libraries used are :\n {loader.requested_names!s}")
    emit(f"OS recognized as : {os_obj!s}")
    emit(f"CPU architecture recognized as : {proj.arch!s}")
    emit(f"Entry point of the binary recognized as : {hex(proj.entry)}")
    emit(f"Min/Max addresses of the binary recognized as : {loader!s}")
    emit(f"Stack executable ? {main_obj.execstack!s}")
    emit(f"Binary position-independent ? {main_obj.pic!s}")
    emit(f"Exploration method: {self.expl_method!s}")
|
Handles project creation and initial analysis setup.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
478
479
480
481
482
483
484
485
486
487
def project_creation(self):
    """
    Create the project and everything needed before hooking.

    Builds the project (``deal_with_packing``), optionally counts its blocks,
    logs the program information, selects the syscall procedures for the
    detected OS and creates the initial state.

    Returns:
        tuple: ``(proj, main_obj, os_obj, state, args_binary)``.
    """
    project = self.deal_with_packing()
    binary = project.loader.main_object
    target_os = binary.os

    if self.count_block_enable:
        self.data_manager.count_block(project, binary)

    self.print_program_info(project, binary, target_os)
    self.setup_simproc_scdg_builder(project, target_os)

    initial_state, binary_args = self.create_binary_init_state(project)
    return project, binary, target_os, initial_state, binary_args
|
Runs the complete analysis process for the binary.
This function orchestrates the entire analysis process, including setting up the environment, creating the initial state, configuring hooks, exploration, data collection, SCDG (System Call Dependency Graph) construction, and finalization steps.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
def run(self, exp_dir):
    """
    Run the complete analysis of the configured binary.

    Steps: run setup (directories and log handler), project creation, hook
    setup, simulation manager creation, breakpoints, exploration, data
    collection, SCDG construction and optional CSV export. ``end_run`` is
    always called once the setup succeeded, even when a later step raises,
    so the run's log handler never stays attached to the root logger.

    Args:
        exp_dir: directory prefix under which the sample's directory is created.
    """
    start_execution_time = time.time()
    exp_dir, self.fileHandler = self.run_setup(exp_dir)
    try:
        title = f"--- Building SCDG of {self.family}/{self.nameFileShort} ---"
        self.log.info("\n" + "-" * len(title) + "\n" + title + "\n" + "-" * len(title))

        # Project creation
        proj, main_obj, os_obj, state, args_binary = self.project_creation()

        # Custom Hooking
        start_hooking_time = time.time()
        self.setup_hooks(proj, state, os_obj)
        self.data_manager.data["hooking_time"] = time.time() - start_hooking_time

        # Creation of the simulation manager, primary interface in angr for performing execution
        simgr = proj.factory.simulation_manager(state)

        # Only used to log the memory sections at this point.
        dump_file = {}
        self.print_memory_info(main_obj, dump_file)

        # Exploration
        if self.pre_run_thread:
            # NOTE(review): self.inputs is not assigned in the code shown here - confirm where it is set.
            state.plugin_thread.pre_run_thread(self.content, self.inputs)

        self.set_breakpoints(state)

        # (3) TODO: move that but as serena purposes
        for sec in main_obj.sections:
            name = sec.name.replace("\x00", "")
            if name == ".rsrc":
                simgr.active[0].globals["rsrc"] = sec.vaddr

        # First trace of the graph: the call to main with the binary arguments.
        self.scdg_graph.append(
            [
                {
                    "name": "main",
                    "args": [str(args) for args in args_binary],
                    "addr": state.addr,
                    "ret": "symbolic",
                    "addr_func": state.addr,
                }
            ]
        )

        self.perform_exploration(exp_dir, proj, simgr)

        if self.post_run_thread:
            state.plugin_thread.post_run_thread(simgr)

        # Data collection
        execution_time = time.time() - start_execution_time
        self.collect_data(exp_dir, proj, state, simgr, execution_time)

        # SCDG build
        stashes_content = self.get_stashes_content(main_obj, state, simgr, exp_dir)
        self.graph_builder.build(
            stashes_content,
            f"{self.mapping_dir}mapping_{self.exp_dir_name}.txt",
            f"{self.exp_dir}/{self.nameFileShort}",
            self.family,
        )

        if self.store_data:
            self.data_manager.save_to_csv(proj, self.family, self.call_sim, self.csv_path)
    finally:
        # Release the log handler and per-run data whether or not the run succeeded.
        self.end_run()
|
Runs the setup process for the experiment directory.
This function prepares the experiment directory by setting up a CSV file, extracting the sample name, creating directories, configuring log handlers, and returning the updated experiment directory path and file handler.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
def run_setup(self, exp_dir):
    """
    Prepare the output directory and the log file of one sample.

    Sets up the CSV file when data storage is enabled, derives
    ``self.nameFileShort`` from ``self.binary_path`` (the part after the last
    ``/``), creates ``<exp_dir><nameFileShort>/`` if needed and attaches a
    file handler writing ``scdg.ans`` in that directory to the root logger.

    Args:
        exp_dir: directory prefix, concatenated as-is with the sample name.

    Returns:
        tuple: ``(sample directory ending with "/", file handler)``.

    Raises:
        OSError: if the sample directory or the log file cannot be created.
    """
    # TODO check if PE file get /GUARD option (VS code) with leaf
    # Create a Dataframe for future data if a csv file is specified
    if self.store_data:
        self.data_manager.setup_csv(self.csv_path)

    # Take name of the sample without full path; split() returns the whole
    # string as its only element when there is no "/".
    self.nameFileShort = self.binary_path.split("/")[-1]
    self.data_manager.data["nameFileShort"] = self.nameFileShort

    sample_dir = exp_dir + self.nameFileShort
    # exist_ok replaces the former os.stat()/except probe.
    os.makedirs(sample_dir, exist_ok=True)

    # Set log handler. The handler is brand new, so there is nothing to
    # detach from the root logger before attaching it.
    fileHandler = logging.FileHandler(sample_dir + "/" + "scdg.ans")
    fileHandler.setFormatter(CustomFormatter())
    logging.getLogger().addHandler(fileHandler)

    return sample_dir + "/", fileHandler
|
Saves the configuration of the experiment in a JSON file.
This function converts the configuration parameters into a dictionary and writes them to a JSON file for future reference.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
180
181
182
183
184
185
186
187
188
189
190
191
def save_conf(self):
    """
    Write the experiment configuration to ``self.conf_path`` as JSON.

    The output maps each configuration section name to a dict of its items
    and is indented by four spaces.
    """
    snapshot = {
        section: dict(self.config.items(section))
        for section in self.config.sections()
    }
    with open(self.conf_path, "w") as conf_file:
        json.dump(snapshot, conf_file, indent=4)
|
Sets breakpoints for various inspection actions in the given state.
This function sets breakpoints for different inspection actions based on the state provided, such as adding calls, debugging calls, printing state addresses, adding instruction addresses, and adding block addresses.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def set_breakpoints(self, state):
    """
    Register the inspection breakpoints used to build the SCDG.

    Four breakpoints on ``simprocedure`` and ``call`` events dispatch to the
    syscall-to-SCDG builder. When ``self.count_block_enable`` is set, three
    more on ``instruction`` and ``irsb`` events dispatch to the data manager.

    Args:
        state: state whose ``inspect`` plugin receives the breakpoints.
    """
    builder = self.syscall_to_scdg_builder
    # (event, when, action) - registered in this order.
    breakpoints = [
        ("simprocedure", angr.BP_AFTER, builder.add_call),
        ("simprocedure", angr.BP_BEFORE, builder.add_call_debug),
        ("call", angr.BP_BEFORE, builder.add_addr_call),
        ("call", angr.BP_AFTER, builder.rm_addr_call),
    ]
    if self.count_block_enable:
        stats = self.data_manager
        breakpoints += [
            ("instruction", angr.BP_BEFORE, stats.print_state_address),
            ("instruction", angr.BP_AFTER, stats.add_instr_addr),
            ("irsb", angr.BP_BEFORE, stats.add_block_addr),
        ]

    for event, moment, callback in breakpoints:
        state.inspect.b(event, when=moment, action=callback)
|
Sets up the heap memory structure in the state based on the architecture.
This function configures the heap memory structure in the state based on the architecture of the project, adjusting memory addresses and values accordingly.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
def setup_heap(self, state, proj):
    """
    Write a process-heap pointer and zeroed heap flags into the state's memory.

    A base address is built from the ``fs`` register concatenated with 16 zero
    bits. A pointer is read at an architecture-dependent offset from it, the
    heap is placed ``0x500`` past that pointer, the heap address is stored
    back and two heap-flag words are zeroed. ``AMD64`` uses qword pointers;
    every other architecture takes the dword layout.

    Args:
        state: state whose registers and memory are used.
        proj: project providing ``arch.name``.
    """
    tib = state.regs.fs.concat(state.solver.BVV(0, 16))

    if proj.arch.name != "AMD64":
        peb = state.mem[tib + 0x30].dword.resolved
        heap = peb + 0x500
        state.mem[peb + 0x18].dword = heap
        state.mem[heap + 0xc].dword = 0x0  # heapflags windowsvistaorgreater
        state.mem[heap + 0x40].dword = 0x0  # heapflags else
        return

    peb = state.mem[tib + 0x60].qword.resolved
    heap = peb + 0x500  # 0x18
    state.mem[peb + 0x10].qword = heap
    state.mem[heap + 0x18].dword = 0x0  # heapflags windowsvistaorgreater
    state.mem[heap + 0x70].dword = 0x0  # heapflags else
|
setup_hooks(proj, state, os_obj)
Sets up hooks for the binary analysis based on the operating system.
This function configures hooks for the binary analysis, including loading libraries, setting custom hooks, and initializing hooks based on the operating system.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
def setup_hooks(self, proj, state, os_obj):
    """
    Install the simulated-procedure hooks for the detected operating system.

    On ``"windows"`` the libraries of the syscall table are loaded first and
    the Windows symbol hooks are installed; on any other OS the Linux symbol
    hooks and the no-symbol hooks are installed. The plugin hooks are enabled
    last when ``self.hooks_enable`` is set.

    Args:
        proj: project to hook.
        state: state forwarded to the hook plugin.
        os_obj: OS description compared against ``"windows"``.
    """
    simproc = self.call_sim
    on_windows = os_obj == "windows"

    if on_windows:
        simproc.loadlibs_proc(simproc.system_call_table, proj)  # TODO mbs=symbs,dll=dll)

    # NOTE(review): the extracted source lost its indentation; the static
    # hooks are assumed to be installed for every OS - confirm.
    simproc.custom_hook_static(proj)

    if on_windows:
        simproc.custom_hook_windows_symbols(proj)  # TODO ue if (self.is_packed and False) else False,symbs)
    else:
        simproc.custom_hook_linux_symbols(proj)
        simproc.custom_hook_no_symbols(proj)

    if self.hooks_enable:
        self.plugins.enable_plugin_hooks(self, self.content, state, proj, simproc)
|
setup_simproc_scdg_builder(proj, os_obj)
Sets up the system call procedure and builder based on the operating system.
This function initializes the appropriate system call procedure and builder based on the operating system, loads the syscall table, and logs the system call table information.
Source code in sema_toolchain/sema_scdg/application/SemaSCDG.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def setup_simproc_scdg_builder(self, proj, os_obj):
    """
    Select the syscall procedures matching the OS and connect them to the SCDG builder.

    ``self.call_sim`` becomes the Windows procedures when ``os_obj`` equals
    ``"windows"`` and the Linux procedures otherwise. It is set up, its
    syscall table is loaded for ``proj`` and it is handed to the
    syscall-to-SCDG builder.

    Args:
        proj: project used to load the syscall table.
        os_obj: OS description compared against ``"windows"``.
    """
    # Load pre-defined syscall table
    if os_obj == "windows":
        simproc, platform = self.windows_simproc, "windows"
    else:
        simproc, platform = self.linux_simproc, "linux"

    self.call_sim = simproc
    simproc.setup(platform)
    simproc.load_syscall_table(proj)
    self.syscall_to_scdg_builder.set_call_sim(simproc)

    self.log.info("System call table loaded")
    self.log.debug(f"System call table size : {len(simproc.system_call_table)}")
|