Skip to content

GraphBuilder ¤

GraphBuilder ¤

GraphBuilder()
Source code in sema_toolchain/sema_scdg/application/helper/GraphBuilder.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(self):
    """
    Initialize GraphBuilder with configuration settings and logger.

    The configuration is read from the file whose path is given as the first
    command-line argument (``sys.argv[1]``); its ``build_graph_arg`` section
    supplies the graph-building parameters.

    Raises:
        IndexError: If no command-line argument was supplied.
        KeyError: If the ``build_graph_arg`` section or one of the plain
            string options read below is missing from the configuration.
    """
    config = configparser.ConfigParser()
    config.read(sys.argv[1])
    self.config = config

    self.DISCARD = {
        "LoopBreaker",
        "Dummy_call",
    }  # Nodes used for debug purpose but not real syscall
    self.TAKE = {}
    self.id = 0
    self.graph_file = None
    self.existing_nodes = {}
    self.current_trace_nodes = {}
    self.id_map = 0
    self.tabnode = []  # Nodes in gspan format
    self.tablink = []  # Edges in gspan format
    self.nodes = {}  # mapping Node ID --> node name (addr.callname args)
    self.mapping = {}
    self.on_flight = False
    self.dico_addr = {}

    # Metrics about traces which add information in the graph (or not)
    self.uselessTraces = 0
    self.usefullTraces = 0
    self.totTrace = 0

    # Parameters taken from the configuration file. Several options are
    # phrased negatively in the file ("not_...", "disjoint_union"), so they
    # are inverted here to obtain the positive flag used by the builder.
    build_args = self.config['build_graph_arg']
    self.graph_output = build_args['graph_output']
    self.MERGE_CALL = not build_args.getboolean('disjoint_union')
    self.COMP_ARGS = not build_args.getboolean('not_comp_args')
    self.MIN_SIZE = int(build_args['min_size'])
    self.IGNORE_ZERO = not build_args.getboolean('not_ignore_zero')
    self.three_edges = build_args.getboolean('three_edges')

    self.__config_logger()
add_link(graph, dico, call)

Add links between calls in the graph based on call arguments and return value.

Parameters:

  • graph

    Graph representation.

  • dico

    Dictionary used to build links between args.

  • call

    Call information containing args and return value.

Source code in sema_toolchain/sema_scdg/application/helper/GraphBuilder.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def add_link(self, graph, dico, call):
    """
    Add links between calls in the graph based on call arguments and return value.

    Every argument value and the return value of ``call`` is looked up (as a
    string) in ``dico``. If the value was already seen, a link is created
    between the current node (``self.id``) and the earlier occurrences, and
    the current occurrence is recorded. Otherwise the value is registered as
    a first occurrence.

    Blank values (``""``, ``" "``, ``"None"``) are never linked. Values that
    parse as the integer zero are skipped when ``self.IGNORE_ZERO`` is set.
    A return value of ``None`` or ``"symbolic"`` is ignored.

    Args:
        graph: Graph representation.
        dico: Dictionary used to build links between args. Keys are the
            stringified values; each entry is a list of occurrences. The
            first occurrence of an argument is stored as
            ``(node_id, arg_position, raw_value)``; later ones and return
            values as ``(node_id, position)``, position ``0`` denoting the
            return value.
        call: Call information containing args and return value
            (``call["args"]`` and ``call["ret"]``).

    Returns:
        The same ``dico`` object, updated in place.
    """
    blank_values = ("", " ", "None")

    def is_linkable(value):
        # Decide whether a stringified value may take part in a link.
        if value in blank_values:
            return False
        if not self.IGNORE_ZERO:
            return True
        try:
            return int(value) != 0
        except ValueError:
            # Not a plain integer (address string, buffer content, ...):
            # it cannot be the ignored zero, so it is kept.
            return True

    # Argument positions start at 1; position 0 is reserved for the return value.
    for arg_id, arg in enumerate(call["args"] or (), start=1):
        key = str(arg)
        if not is_linkable(key):
            continue
        if key in dico:
            self.__create_link((str(self.id), arg_id), dico[key], graph)
            dico[key].append((self.id, arg_id))
        else:
            dico[key] = [(self.id, arg_id, arg)]

    if call["ret"] is not None:
        ret = str(call["ret"])
        # The check must look at the return value itself, not at the last
        # argument of the loop above.
        if ret != "symbolic" and is_linkable(ret):
            if ret in dico:
                self.__create_link((str(self.id), 0), dico[ret], graph)
                dico[ret].append((self.id, 0))
            else:
                dico[ret] = [(self.id, 0)]
    return dico

build ¤

build(stashes_content, mapping, odir, family)

Build the system call dependency graph using the list representing the syscalls and the mapping.

Parameters:

  • stashes_content

    Content of the stashes.

  • mapping

    Name of the file for the mapping to use.

  • odir

    Output directory for the graph.

  • family

    Family of the graph.

Source code in sema_toolchain/sema_scdg/application/helper/GraphBuilder.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def build(self, stashes_content, mapping, odir, family):
    """
    Build the system call dependency graph from the syscall lists and the mapping.

    When a specific output format is configured in ``self.graph_output`` only
    that format is produced. When it is the empty string, the graph is built
    twice: once as "gs" and once as "json" (the latter with ``gv`` disabled).

    Args:
        stashes_content: Content of the stashes.
        mapping: Name of the file for the mapping to use.
        odir: Output directory for the graph.
        family: Family of the graph.
    """
    self.__set_graph_parameters(mapping, odir, family)

    # An explicit format was requested: build exactly that one.
    if self.graph_output != "":
        self.__build_graph(stashes_content, graph_output=self.graph_output)
        return

    # No format configured: emit both supported representations.
    self.__build_graph(stashes_content, graph_output="gs")
    self.__build_graph(stashes_content, graph_output="json", gv=False)

clear ¤

clear()

Reset all lists and dictionaries of the object.

Metrics about traces which add information in the graph (or not) are reset to zero.

Source code in sema_toolchain/sema_scdg/application/helper/GraphBuilder.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def clear(self):
    """
    Bring the builder back to an empty state.

    Every list and dictionary held by the object is emptied in place, the
    counters and identifiers are set back to zero, the graph file reference
    is dropped and the trace metrics are reset to zero.
    """
    # Containers are emptied in place so existing references stay valid.
    for container in (
        self.TAKE,
        self.existing_nodes,
        self.current_trace_nodes,
        self.tabnode,  # Nodes in gspan format
        self.tablink,  # Edges in gspan format
        self.nodes,  # mapping Node ID --> node name (addr.callname args)
        self.mapping,
        self.dico_addr,
    ):
        container.clear()

    # Identifiers and state flags
    self.id = 0
    self.id_map = 0
    self.graph_file = None
    self.on_flight = False

    # Metrics about traces which add information in the graph (or not)
    self.uselessTraces = 0
    self.usefullTraces = 0
    self.totTrace = 0

reset_attributes ¤

reset_attributes()

Reset attributes used in the graph building process.

Clears various attributes to prepare for building a new graph.

Source code in sema_toolchain/sema_scdg/application/helper/GraphBuilder.py
313
314
315
316
317
318
319
320
321
322
323
324
325
def reset_attributes(self):
    """
    Prepare the builder for the construction of a new graph.

    The node counter goes back to zero, the gspan node and edge tables are
    replaced by fresh empty lists, and the address, node and trace
    dictionaries are emptied in place.
    """
    self.id = 0

    # The tables are rebound to new lists (the previous list objects are
    # left untouched), unlike the dictionaries below.
    self.tabnode, self.tablink = [], []

    for table in (
        self.dico_addr,
        self.existing_nodes,
        self.current_trace_nodes,
        self.nodes,
    ):
        table.clear()

save_result ¤

save_result(graph_output, json_content)

Save the result of the graph building process.

Parameters:

  • graph_output

    Output format for the graph.

  • json_content

    JSON content to be saved.

Source code in sema_toolchain/sema_scdg/application/helper/GraphBuilder.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
def save_result(self, graph_output, json_content):
    """
    Save the result of the graph building process.

    For the "json" output format, ``json_content`` is serialized into the
    open graph file. The graph file is closed in every case, including when
    serialization fails. When ``self.on_flight`` is set, the mapping is then
    written to ``self.mapping_dir``, one ``"<value> <key>"`` pair per line.

    Args:
        graph_output: Output format for the graph.
        json_content: JSON content to be saved.

    Raises:
        TypeError: Propagated from ``json.dump`` when ``json_content`` holds
            a value that is not JSON serializable.
    """
    try:
        if graph_output == "json":
            json.dump(json_content, self.graph_file)
    finally:
        # Release the file handle even if serialization raised.
        self.graph_file.close()

    if self.on_flight:
        with open(self.mapping_dir, "w") as out_map:
            out_map.writelines(
                f"{str(value)} {str(key)}\n" for key, value in self.mapping.items()
            )

scdg_with_disjoint_union ¤

scdg_with_disjoint_union(SCDG, graph_output, json_content)

Build the system call dependency graph with disjoint union.

Parameters:

  • SCDG

    List representing syscalls.

  • graph_output

    Output format for the graph.

  • json_content

    Dictionary to store JSON content.

Returns:

  • None

Source code in sema_toolchain/sema_scdg/application/helper/GraphBuilder.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def scdg_with_disjoint_union(self, SCDG, graph_output, json_content):
    """
    Build the system call dependency graph with disjoint union.

    Each trace of ``SCDG`` holding at least ``self.MIN_SIZE`` calls is turned
    into its own graph. For the "json" format the nodes and links are stored
    under ``json_content["graph_<index>"]``; for any other format the gspan
    lines are written to ``self.graph_file``. A graphviz file is saved per
    trace, and the builder attributes are reset between traces.

    Args:
        SCDG: List representing syscalls (one entry per trace).
        graph_output: Output format for the graph.
        json_content: Dictionary to store JSON content.

    Returns:
        None
    """
    dot = Digraph(comment="SCDG with disjoint union", format="dot")
    as_json = graph_output == "json"

    for i, trace in enumerate(SCDG):
        # Traces that are too small are skipped entirely.
        if len(trace) < self.MIN_SIZE:
            continue

        graph_key = f"graph_{str(i)}"
        json_content[graph_key] = {"nodes": [], "links": []}
        self.__build_links(trace, dot)

        for n in self.tabnode:
            if not as_json:
                self.graph_file.write(n)
                continue
            # The second field of the gspan node line is the node id.
            id_node = n.replace("\n", "").split(" ")[1]
            # self.nodes maps the id to "<addr.callname> <args>".
            node_name = self.nodes[id_node].split(" ")[0]
            arg_node = self.nodes[id_node].split(" ")[1].split("\n")
            content = self.existing_nodes[node_name]
            json_content[graph_key]["nodes"].append(
                {
                    "id": id_node,
                    "name": content["name"],
                    "addr": node_name.split(".")[0],
                    "args": arg_node,
                }
            )

        for l in self.tablink:
            if not as_json:
                self.graph_file.write(l)
                continue
            tab_split = l.split(" ")
            json_content[graph_key]["links"].append(
                {
                    "id1": tab_split[1],
                    "id2": tab_split[2],
                    # Strip the line terminator from the label, as
                    # scdg_with_merge_calls does for its JSON links.
                    "label": tab_split[3].replace("\n", ""),
                }
            )

        dot.save(f"{self.odir}/test-output/disjoint_union{str(i)}.gv")
        self.reset_attributes()
        dot.clear()

    # NOTE(review): dot is cleared after every processed trace, so this last
    # file presumably carries no graph content — confirm it is still wanted.
    dot.save(f"{self.odir}/test-output/disjoint_union.gv")

scdg_with_merge_calls ¤

scdg_with_merge_calls(SCDG, graph_output, gv, json_content)

Build the system call dependency graph with merge calls.

Parameters:

  • SCDG

    List representing syscalls.

  • graph_output

    Output format for the graph.

  • gv

    Boolean to determine if the function should also provide the gv graph.

  • json_content

    Dictionary to store JSON content.

Source code in sema_toolchain/sema_scdg/application/helper/GraphBuilder.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
def scdg_with_merge_calls(self, SCDG, graph_output, gv, json_content):
    """
    Build one global system call dependency graph, merging calls across traces.

    All traces holding at least ``self.MIN_SIZE`` calls are linked into the
    same graph, sharing a single argument dictionary. The resulting nodes and
    links are then either collected in ``json_content`` ("json" format) or
    written as gspan lines to ``self.graph_file`` (any other format).

    Args:
        SCDG: List representing syscalls.
        graph_output: Output format for the graph.
        gv: Boolean to determine if the function should also provide the gv graph.
        json_content: Dictionary to store JSON content.
    """
    json_content["nodes"] = []
    json_content["links"] = []
    as_json = graph_output == "json"

    shared_dico = {}
    dot = Digraph(comment="Global SCDG with merge call", format="dot")

    for i, trace in enumerate(SCDG):
        self.log.info(f"Using SCDG {str(i + 1)} over {len(SCDG)}")

        if len(trace) < self.MIN_SIZE:
            self.log.info(
                f"The SCDG {str(i)} was too small, smaller than {str(self.MIN_SIZE)} calls."
            )
        else:
            self.__build_links(trace, dot, shared_dico)
        # The per-trace node table is emptied whether or not the trace was used.
        self.current_trace_nodes.clear()

    # Save data parts: nodes first, then links
    for node_line in self.tabnode:
        if not as_json:
            self.graph_file.write(node_line)
            continue
        # The second field of the gspan node line is the node id.
        id_node = node_line.replace("\n", "").split(" ")[1]
        pieces = self.nodes[id_node].split(" ")
        node_name = pieces[0]
        arg_node = pieces[1].split("\n")
        content = self.existing_nodes[node_name]
        json_content["nodes"].append(
            {
                "id": id_node,
                "name": content["name"],
                "addr": node_name.split(".")[0],
                "args": arg_node,
            }
        )

    for link_line in self.tablink:
        if not as_json:
            self.graph_file.write(link_line)
            continue
        fields = link_line.split(" ")
        json_content["links"].append(
            {
                "id1": fields[1],
                "id2": fields[2],
                "label": fields[3].replace("\n", ""),
            }
        )

    if gv:
        dot.save(f"{self.odir}/final_SCDG.gv")