Skip to content

Control flow graph extractors

CFG Extractor with ANTLR listener (version 1)

ANTLR listener to build the control flow graph (CFG) for C++ functions. If a C++ file contains multiple functions, a CFG is created for each function. For each function in the source file, the CFG is stored in a dot file, which can be visualized with Graphviz.

Changelog:

Version 0.2.0

  • Add writing the extracted CFG in dot file.
  • Add visualization with pydot and graphviz

Version 0.1.0

CFGInstListener

__init__(self, common_token_stream, number_of_tokens, directory_name) special

:param common_token_stream:

Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def __init__(self, common_token_stream: CommonTokenStream, number_of_tokens, directory_name):
    """
    Initialise the listener's bookkeeping state and wrap the token stream in a rewriter.

    :param common_token_stream: token stream of the parsed source; must not be None.
    :param number_of_tokens: size of the ``afterInsert`` buffer, which is indexed by token index.
    :param directory_name: sub-directory name used to build the CFG and instrumentation paths.
    :raises TypeError: if ``common_token_stream`` is None (raised after the state below is set).
    """
    # Output locations (relative paths, each ending with a slash).
    self.cfg_path = 'CFGS/' + directory_name + '/'
    self.instrument_path = 'Instrument/' + directory_name + '/'

    # Counters and line range of the block currently being built.
    self.block_number = self.block_start = self.block_stop = 0
    self.domain_name = 0

    # Per-block and per-function results.
    self.block_dict = {}
    self.function_dict = {}
    self.label_dict = {}
    self.goto_dict = {}
    self.initial_nodes = set()
    self.final_nodes = set()

    # Stacks for nested constructs; they are consumed by handlers defined elsewhere in the class.
    self.select_junction_stack = []
    self.select_decision_stack = []
    self.iterate_junction_stack = []
    self.iterate_stack = []
    self.switch_junction_stack = []
    self.switch_stack = []
    self.switch_for_stack = []
    self.has_jump_stack = []
    self.has_default_stack = []
    self.has_case_stack = []
    self.try_stack = []
    self.try_junction_stack = []
    self.is_catch = False

    # One pending "insert after" text per token; flushed in exitTranslationunit.
    self.afterInsert = [''] * number_of_tokens

    # Move all the tokens of the source code into a buffer, token_stream_rewriter.
    if common_token_stream is None:
        raise TypeError('common_token_stream is None')
    self.token_stream_rewriter = TokenStreamRewriter.TokenStreamRewriter(common_token_stream)

enterStatement(self, ctx)

DFS traversal of a statement subtree, rooted at ctx. If the statement is the body of a branching construct, insert a probe. :param ctx: :return:

Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def enterStatement(self, ctx: CPP14_v2Parser.StatementContext):
    """
    Instrument a statement that is the body of a loop or of an if/else branch.

    What happens depends on the type of ``ctx.parentCtx``; statements with any
    other parent are left untouched.
    :param ctx: the statement node being entered.
    :return: None
    """
    parent = ctx.parentCtx

    # The order of these checks matters: the numbered iteration contexts are
    # tested before the generic IterationstatementContext.
    if isinstance(parent,
                  (CPP14_v2Parser.Iterationstatement4Context, CPP14_v2Parser.Iterationstatement2Context)):
        # do-while and range-for: no block bookkeeping, only the probe below.
        pass
    elif isinstance(parent, CPP14_v2Parser.IterationstatementContext):
        # while and for: the loop body starts a new block.
        self.block_number += 1
        self.block_start = ctx.start.line
    elif isinstance(parent,
                    (CPP14_v2Parser.Selectionstatement1Context, CPP14_v2Parser.Selectionstatement2Context)):
        # Branch of a selection statement: new block, decision edge, fresh jump flag.
        self.block_number += 1
        self.addDecisionEdge()
        self.block_start = ctx.start.line
        self.has_jump_stack.append(False)
    elif isinstance(parent, CPP14_v2Parser.Selectionstatement3Context):
        # Only an opening brace is inserted here, and only for a single-statement body.
        if ctx.compoundstatement() is None:
            self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, '{\n')
        return
    else:
        return

    # Shared tail of the first three cases: put the probe into the body.
    compound = ctx.compoundstatement()
    if compound is not None:
        # The body is already a compound statement.
        self.insertAfter(compound)
        return

    # Single-statement body: open a brace and log before the statement.
    probe = '{\n' + self.logLine() + ';\n'
    self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, probe)

enterTranslationunit(self, ctx)

Creates and opens the output file for the instrumentation result. :param ctx: :return:

Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def enterTranslationunit(self, ctx: CPP14_v2Parser.TranslationunitContext):
    """
    Open the instrumented-source output file and prepend the C++ logging prologue.

    The file handle is stored in ``self.instrumented_source`` and stays open;
    ``exitTranslationunit`` writes the rewritten source to it and closes it.
    :param ctx: the translation-unit node; the prologue is inserted before its first token.
    :return: None
    """
    self.instrumented_source = open(self.instrument_path + 'instrumented_source.cpp', 'w')
    # The prologue declares the ``logFile`` stream in the instrumented program.
    new_code = '\n#include <fstream>\nstd::ofstream logFile("log_file.txt");\n\n'
    self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, new_code)
    self.domain_name = 0

exitFunctionbody1(self, ctx)

Insert a probe at the end of the function only if the function is void. :param ctx: :return:

Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def exitFunctionbody1(self, ctx: CPP14_v2Parser.Functionbody1Context):
    """
    Close the last block of a function body unless the top jump flag is set.

    Pops ``has_jump_stack``; when the popped flag is falsy, the body's last
    line becomes ``block_stop``, the block is stored via ``addNode()`` and the
    current block number is recorded as a final node.
    :param ctx: the function-body node being exited.
    :return: None
    """
    ended_with_jump = self.has_jump_stack.pop()
    if ended_with_jump:
        # NOTE(review): presumably the body already ended in a jump such as ``return`` - confirm.
        return

    self.block_stop = ctx.stop.line
    self.addNode()
    self.final_nodes.add(self.block_number)

exitTranslationunit(self, ctx)

Applies the pending insertions, writes the instrumented source to its file and closes it, then saves the function dictionary to functions.json. :param ctx: :return:

Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def exitTranslationunit(self, ctx: CPP14_v2Parser.TranslationunitContext):
    """
    Write out the results once the whole translation unit has been walked.

    Every non-empty entry of ``self.afterInsert`` is inserted after the token
    with the same index. The rewritten text is then written to
    ``self.instrumented_source`` (opened in ``enterTranslationunit``), which is
    closed, and ``self.function_dict`` is dumped as JSON to
    ``<cfg_path>functions.json``.
    :param ctx: the translation-unit node (not used).
    :return: None
    """
    for token_index, pending_code in enumerate(self.afterInsert):
        if pending_code != '':
            self.token_stream_rewriter.insertAfter(token_index, pending_code)

    # Close the output file even if rendering or writing the text fails.
    try:
        self.instrumented_source.write(self.token_stream_rewriter.getDefaultText())
    finally:
        self.instrumented_source.close()

    # The context manager flushes and closes the JSON file deterministically.
    with open(self.cfg_path + 'functions.json', 'w') as functions_json:
        json.dump(self.function_dict, functions_json)

CFG Extractor with ANTLR listener (version 2)

ANTLR listener to build control flow graph (CFG) for C++ functions

The improved version of the cfg_extractor_listener1.py module

Changelog:

version 2.0.1

  • Refactor the module

version 2.0.0

CFGInstListener

__init__(self, common_token_stream, number_of_tokens, directory_name) special

:param common_token_stream:

Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def __init__(self, common_token_stream: CommonTokenStream, number_of_tokens, directory_name):
    """
    Initialise the listener's bookkeeping state, the token rewriter and the graph object.

    :param common_token_stream: token stream of the parsed source; must not be None.
    :param number_of_tokens: size of the ``afterInsert`` buffer, which is indexed by token index.
    :param directory_name: sub-directory name used to build the CFG and instrumentation paths.
    :raises TypeError: if ``common_token_stream`` is None (raised after the state below is set,
        before the graph is created).
    """
    # Output locations (relative paths, each ending with a slash).
    self.cfg_path = 'extracted_cfgs/' + directory_name + '/'
    self.instrument_path = 'instrumented_programs/' + directory_name + '/'

    # Counters and line range of the block currently being built.
    self.block_number = self.block_start = self.block_stop = 0
    self.domain_name = 0

    # Per-block and per-function results.
    self.block_dict = {}
    self.function_dict = {}
    self.label_dict = {}
    self.goto_dict = {}
    self.initial_nodes = set()
    self.final_nodes = set()

    # Stacks and flags for nested constructs; consumed by handlers defined elsewhere in the class.
    self.select_junction_stack = []
    self.select_decision_stack = []
    self.iterate_junction_stack = []
    self.iterate_stack = []
    self.switch_junction_stack = []
    self.temp = []
    self.switch_stack = []
    self.switch_for_stack = []
    self.has_jump_stack = []
    self.is_for = []
    self.is_while = []
    self.is_doWhile = []
    self.has_default_stack = []
    self.has_case_stack = []
    self.try_stack = []
    self.try_junction_stack = []
    self.is_catch = False
    self.throw_stack = []

    # One pending "insert after" text per token; flushed in exitTranslationunit.
    self.afterInsert = [''] * number_of_tokens

    # Move all the tokens of the source code into a buffer, token_stream_rewriter.
    if common_token_stream is None:
        raise TypeError('common_token_stream is None')
    self.token_stream_rewriter = TokenStreamRewriter.TokenStreamRewriter(common_token_stream)

    # NOTE(review): nx.Graph is undirected, yet a CFG is normally directed - confirm
    # whether nx.DiGraph was intended.
    self.CFG_graph = nx.Graph()

enterStatement(self, ctx)

DFS traversal of a statement subtree, rooted at ctx. If the statement is the body of a branching construct, insert a probe. :param ctx: :return:

Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def enterStatement(self, ctx: CPP14_v2Parser.StatementContext):
    """
    Instrument a statement that is the body of a loop or of an if/else branch.

    What happens depends on the type of ``ctx.parentCtx``; statements with any
    other parent are left untouched.
    :param ctx: the statement node being entered.
    :return: None
    """
    parent = ctx.parentCtx

    # The order of these checks matters: the numbered iteration context is
    # tested before the generic IterationstatementContext.
    # NOTE(review): version 1 also matched Iterationstatement4Context here; in this
    # version it falls through to the generic iteration branch - confirm this is intended.
    if isinstance(parent, CPP14_v2Parser.Iterationstatement2Context):
        # No block bookkeeping for this loop form, only the probe below.
        pass
    elif isinstance(parent, CPP14_v2Parser.IterationstatementContext):
        # Remaining loop forms: the body starts a new block.
        self.block_number += 1
        self.block_start = ctx.start.line
    elif isinstance(parent,
                    (CPP14_v2Parser.Selectionstatement1Context, CPP14_v2Parser.Selectionstatement2Context)):
        # Branch of a selection statement: new block, decision edge, fresh jump flag.
        self.block_number += 1
        self.addDecisionEdge()
        self.block_start = ctx.start.line
        self.has_jump_stack.append(False)
    elif isinstance(parent, CPP14_v2Parser.Selectionstatement3Context):
        # Only an opening brace is inserted here, and only for a single-statement body.
        if ctx.compoundstatement() is None:
            self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, '{\n')
        return
    else:
        return

    # Shared tail of the first three cases: put the probe into the body.
    body = ctx.compoundstatement()
    if body is not None:
        # The body is already a compound statement.
        self.insertAfter(body)
        return

    # Single-statement body: open a brace and log before the statement.
    new_code = '{\n' + self.logLine() + ';\n'
    self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, new_code)

enterTranslationunit(self, ctx)

Creates and opens the output file for the instrumentation result. :param ctx: :return:

Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def enterTranslationunit(self, ctx: CPP14_v2Parser.TranslationunitContext):
    """
    Open the instrumented-source output file and prepend the C++ logging prologue.

    The file handle is stored in ``self.instrumented_source`` and stays open;
    ``exitTranslationunit`` writes the rewritten source to it and closes it.
    :param ctx: the translation-unit node; the prologue is inserted before its first token.
    :return: None
    """
    self.instrumented_source = open(self.instrument_path + 'instrumented_source.cpp', 'w')
    # The prologue declares the ``logFile`` stream in the instrumented program.
    new_code = '\n//in the name of allah\n#include <fstream>\nstd::ofstream logFile("log_file.txt");\n\n'
    self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, new_code)
    self.domain_name = 0

exitFunctionbody1(self, ctx)

Insert a probe at the end of the function only if the function is void. :param ctx: :return:

Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def exitFunctionbody1(self, ctx: CPP14_v2Parser.Functionbody1Context):
    """
    Close the last block of a function body unless the top jump flag is set.

    Pops ``has_jump_stack``; when the popped flag is falsy, the body's last
    line becomes ``block_stop``, the block is stored via ``addNode()`` and the
    current block number is recorded as a final node.
    :param ctx: the function-body node being exited.
    :return: None
    """
    jump_seen = self.has_jump_stack.pop()
    if jump_seen:
        # NOTE(review): presumably the body already ended in a jump such as ``return`` - confirm.
        return

    self.block_stop = ctx.stop.line
    self.addNode()
    self.final_nodes.add(self.block_number)

exitTranslationunit(self, ctx)

Applies the pending insertions, writes the instrumented source to its file and closes it, then saves the function dictionary to functions.json. :param ctx: :return:

Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def exitTranslationunit(self, ctx: CPP14_v2Parser.TranslationunitContext):
    """
    Write out the results once the whole translation unit has been walked.

    Every non-empty entry of ``self.afterInsert`` is inserted after the token
    with the same index. The rewritten text is then written to
    ``self.instrumented_source`` (opened in ``enterTranslationunit``), which is
    closed, and ``self.function_dict`` is dumped as JSON to
    ``<cfg_path>functions.json``.
    :param ctx: the translation-unit node (not used).
    :return: None
    """
    for token_index, pending_code in enumerate(self.afterInsert):
        if pending_code != '':
            self.token_stream_rewriter.insertAfter(token_index, pending_code)

    # Close the output file even if rendering or writing the text fails.
    try:
        self.instrumented_source.write(self.token_stream_rewriter.getDefaultText())
    finally:
        self.instrumented_source.close()

    # The context manager flushes and closes the JSON file deterministically.
    with open(self.cfg_path + 'functions.json', 'w') as functions_json:
        json.dump(self.function_dict, functions_json)

CFG Extractor with ANTLR visitor

ANTLR visitor to build control flow graph (CFG) for C++ functions

The improved version of the cfg_extractor_listener1.py module

Changelog:

version 1.0.1

  • Refactor the module

version 1.0.0