Control flow graph extractors
CFG Extractor with ANTLR listener (version 1)
ANTLR listener to build the control flow graph (CFG) for C++ functions. If a C++ file contains multiple functions, a CFG is created for each function. For each function in the source file, the CFG is stored in a dot file which can be visualized with Graphviz.
Changelog:
Version 0.2.0
- Add writing the extracted CFG in dot file.
- Add visualization with pydot and graphviz
Version 0.1.0
CFGInstListener
__init__(self, common_token_stream, number_of_tokens, directory_name)
special
:param common_token_stream:
Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def __init__(self, common_token_stream: CommonTokenStream, number_of_tokens, directory_name):
    """
    Initialise the listener state and wrap the token stream in a rewriter.

    :param common_token_stream: token stream of the parsed source; it is handed to
        ``TokenStreamRewriter`` so that code can be inserted into it.
    :param number_of_tokens: length of the ``afterInsert`` buffer (one slot per token index).
    :param directory_name: name appended to the ``CFGS/`` and ``Instrument/`` output roots.
    :raises TypeError: if ``common_token_stream`` is None.
    """
    # Output directories derived from directory_name.
    self.cfg_path = 'CFGS/' + directory_name + '/'
    self.instrument_path = 'Instrument/' + directory_name + '/'

    # Block counters and dictionaries; all counters start at zero.
    self.block_dict = {}
    self.block_number = self.block_start = self.block_stop = self.domain_name = 0
    self.function_dict = {}

    # One independent stack per kind of construct (each must be its own list object).
    self.select_junction_stack = []
    self.select_decision_stack = []
    self.iterate_junction_stack = []
    self.iterate_stack = []
    self.switch_junction_stack = []
    self.switch_stack = []
    self.switch_for_stack = []
    self.has_jump_stack = []
    self.has_default_stack = []
    self.has_case_stack = []
    self.try_stack = []
    self.try_junction_stack = []
    self.is_catch = False

    # Text queued for insertion after a token, indexed by token index.
    self.afterInsert = [''] * number_of_tokens

    self.initial_nodes = set()
    self.final_nodes = set()
    self.label_dict = {}
    self.goto_dict = {}

    # Move all the tokens of the source code into a buffer, token_stream_rewriter.
    if common_token_stream is None:
        raise TypeError('common_token_stream is None')
    self.token_stream_rewriter = TokenStreamRewriter.TokenStreamRewriter(common_token_stream)
enterStatement(self, ctx)
DFS traversal of a statement subtree, rooted at ctx. If the statement is the body of a branching condition, insert a probe. :param ctx: :return:
Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def enterStatement(self, ctx: CPP14_v2Parser.StatementContext):
    """
    Instrument a statement that is the body of a loop or of a selection statement.

    What happens depends on the type of ``ctx.parentCtx``. For loop bodies and for the
    bodies under Selectionstatement1/2 a probe is inserted: a compound body is passed to
    ``self.insertAfter``, while a single statement gets ``'{'``, a newline, the text
    returned by ``self.logLine()`` and ``';'`` inserted in front of it. Under
    Selectionstatement3 a single statement only gets an opening ``'{'``.
    Statements with any other parent are left untouched.

    :param ctx: the statement context being entered.
    :return: None
    """
    parent = ctx.parentCtx

    # do-while and range-for need no block bookkeeping before the probe is inserted.
    if not isinstance(parent,
                      (CPP14_v2Parser.Iterationstatement4Context,
                       CPP14_v2Parser.Iterationstatement2Context)):
        if isinstance(parent, CPP14_v2Parser.IterationstatementContext):
            # one line while and for
            self.block_number += 1
            self.block_start = ctx.start.line
        elif isinstance(parent,
                        (CPP14_v2Parser.Selectionstatement1Context,
                         CPP14_v2Parser.Selectionstatement2Context)):
            self.block_number += 1
            self.addDecisionEdge()
            self.block_start = ctx.start.line
            self.has_jump_stack.append(False)
        else:
            # Selectionstatement3: a single-statement body only gets an opening brace.
            if (isinstance(parent, CPP14_v2Parser.Selectionstatement3Context)
                    and ctx.compoundstatement() is None):
                self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, '{\n')
            return

    # Shared tail of the three instrumented cases.
    compound = ctx.compoundstatement()
    if compound is None:
        # Only one statement follows the branching condition: open a block and log first.
        opening = '{' + '\n' + self.logLine() + ';\n'
        self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, opening)
    else:
        # A compound statement follows the branching condition.
        self.insertAfter(compound)
enterTranslationunit(self, ctx)
Create and open the output file for the instrumented source, and insert the log-file declaration at the start of the translation unit. :param ctx: :return:
Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def enterTranslationunit(self, ctx: CPP14_v2Parser.TranslationunitContext):
    """
    Open the instrumented-source output file and inject the log-file prelude.

    An ``#include <fstream>`` line and a global ``std::ofstream logFile("log_file.txt")``
    definition are inserted before the first token of the translation unit, and
    ``domain_name`` is reset to 0.

    :param ctx: the translation unit context being entered.
    :return: None
    """
    # Deliberately left open: the handle is written and closed in exitTranslationunit.
    self.instrumented_source = open(self.instrument_path + 'instrumented_source.cpp', 'w')
    new_code = '\n#include <fstream>\nstd::ofstream logFile("log_file.txt");\n\n'
    self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, new_code)
    self.domain_name = 0
exitFunctionbody1(self, ctx)
Insert a probe at the end of the function only if the function is void. :param ctx: :return:
Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def exitFunctionbody1(self, ctx: CPP14_v2Parser.Functionbody1Context):
    """
    Finish the last block of a function body when no jump was recorded for it.

    Pops the top of ``has_jump_stack``. If the popped flag is falsy, the stop line of
    ``ctx`` becomes ``block_stop``, ``addNode()`` is called and the current block number
    is added to ``final_nodes``.

    :param ctx: the function body context being exited.
    :return: None
    """
    # NOTE(review): the listing lost its indentation; all three statements below are
    # assumed to be guarded by the jump flag - confirm against the original file.
    jump_recorded = self.has_jump_stack.pop()
    if jump_recorded:
        return
    self.block_stop = ctx.stop.line
    self.addNode()
    self.final_nodes.add(self.block_number)
exitTranslationunit(self, ctx)
Apply the pending insertions, write the instrumented source to disk and close it, then dump the function dictionary to functions.json. :param ctx: :return:
Source code in coda\analysis\cfg\cfg_extractor_listener1.py
def exitTranslationunit(self, ctx: CPP14_v2Parser.TranslationunitContext):
    """
    Flush the instrumentation results to disk.

    Every non-empty entry of ``afterInsert`` is inserted after the token with the same
    index, the rewritten text is written to ``instrumented_source`` and that file is
    closed. ``function_dict`` is then dumped as JSON to ``functions.json`` under
    ``cfg_path``.

    :param ctx: the translation unit context being exited.
    :return: None
    """
    try:
        for token_index, pending_code in enumerate(self.afterInsert):
            if pending_code != '':
                self.token_stream_rewriter.insertAfter(token_index, pending_code)
        self.instrumented_source.write(self.token_stream_rewriter.getDefaultText())
    finally:
        # Close the handle even when rewriting or writing fails.
        self.instrumented_source.close()

    # The context manager closes functions.json, which was previously left open.
    with open(self.cfg_path + 'functions.json', 'w') as functions_json:
        json.dump(self.function_dict, functions_json)
CFG Extractor with ANTLR listener (version 2)
ANTLR listener to build the control flow graph (CFG) for C++ functions.
This is an improved version of the cfg_extractor_listener1.py module.
Changelog:
version 2.0.1
- Refactor the module
version 2.0.0
CFGInstListener
__init__(self, common_token_stream, number_of_tokens, directory_name)
special
:param common_token_stream:
Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def __init__(self, common_token_stream: CommonTokenStream, number_of_tokens, directory_name):
    """
    Initialise the listener state, wrap the token stream in a rewriter and create the graph.

    :param common_token_stream: token stream of the parsed source; it is handed to
        ``TokenStreamRewriter`` so that code can be inserted into it.
    :param number_of_tokens: length of the ``afterInsert`` buffer (one slot per token index).
    :param directory_name: name appended to the ``extracted_cfgs/`` and
        ``instrumented_programs/`` output roots.
    :raises TypeError: if ``common_token_stream`` is None.
    """
    # Output directories derived from directory_name.
    self.cfg_path = 'extracted_cfgs/' + directory_name + '/'
    self.instrument_path = 'instrumented_programs/' + directory_name + '/'

    # Block counters and dictionaries; all counters start at zero.
    self.block_dict = {}
    self.block_number = self.block_start = self.block_stop = self.domain_name = 0
    self.function_dict = {}

    # One independent list per kind of construct (each must be its own list object).
    self.select_junction_stack = []
    self.select_decision_stack = []
    self.iterate_junction_stack = []
    self.iterate_stack = []
    self.switch_junction_stack = []
    self.temp = []
    self.switch_stack = []
    self.switch_for_stack = []
    self.has_jump_stack = []
    self.is_for = []
    self.is_while = []
    self.is_doWhile = []
    self.has_default_stack = []
    self.has_case_stack = []
    self.try_stack = []
    self.try_junction_stack = []
    self.is_catch = False
    self.throw_stack = []

    # Text queued for insertion after a token, indexed by token index.
    self.afterInsert = [''] * number_of_tokens

    self.initial_nodes = set()
    self.final_nodes = set()
    self.label_dict = {}
    self.goto_dict = {}

    # Move all the tokens of the source code into a buffer, token_stream_rewriter.
    if common_token_stream is None:
        raise TypeError('common_token_stream is None')
    self.token_stream_rewriter = TokenStreamRewriter.TokenStreamRewriter(common_token_stream)

    # create graph
    # NOTE(review): nx is presumably networkx, whose Graph is undirected; a control
    # flow graph is directed, so nx.DiGraph may be what is intended - confirm.
    self.CFG_graph = nx.Graph()
enterStatement(self, ctx)
DFS traversal of a statement subtree, rooted at ctx. If the statement is the body of a branching condition, insert a probe. :param ctx: :return:
Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def enterStatement(self, ctx: CPP14_v2Parser.StatementContext):
    """
    Instrument a statement that is the body of a loop or of a selection statement.

    What happens depends on the type of ``ctx.parentCtx``. For loop bodies and for the
    bodies under Selectionstatement1/2 a probe is inserted (see ``_instrument_body``).
    Under Selectionstatement3 a single statement only gets an opening ``'{'``.
    Statements with any other parent are left untouched.

    :param ctx: the statement context being entered.
    :return: None
    """
    parent = ctx.parentCtx

    # NOTE(review): the original comment here said "do-while and range-for", but only
    # Iterationstatement2Context is matched; Iterationstatement4Context was dropped from
    # this check - confirm which branch is meant to handle it.
    if isinstance(parent, CPP14_v2Parser.Iterationstatement2Context):
        self._instrument_body(ctx)
    # one line while and for
    elif isinstance(parent, CPP14_v2Parser.IterationstatementContext):
        self.block_number += 1
        self.block_start = ctx.start.line
        self._instrument_body(ctx)
    elif isinstance(parent,
                    (CPP14_v2Parser.Selectionstatement1Context,
                     CPP14_v2Parser.Selectionstatement2Context)):
        self.block_number += 1
        self.addDecisionEdge()
        self.block_start = ctx.start.line
        self.has_jump_stack.append(False)
        self._instrument_body(ctx)
    elif isinstance(parent, CPP14_v2Parser.Selectionstatement3Context):
        # A single-statement body only gets an opening brace here.
        if ctx.compoundstatement() is None:
            new_code = '{\n'
            self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, new_code)


def _instrument_body(self, ctx):
    """
    Insert a probe at the start of the body statement ``ctx``.

    A compound body is passed to ``self.insertAfter``. A single statement gets ``'{'``,
    a newline, the text returned by ``self.logLine()`` and ``';'`` inserted in front of it.

    :param ctx: the statement context forming the body.
    :return: None
    """
    body = ctx.compoundstatement()
    if body is not None:
        # A compound statement follows the branching condition.
        self.insertAfter(body)
    else:
        # Only one statement follows the branching condition: open a block and log first.
        new_code = '{'
        new_code += '\n' + self.logLine() + ';\n'
        self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, new_code)
enterTranslationunit(self, ctx)
Create and open the output file for the instrumented source, and insert the log-file declaration at the start of the translation unit. :param ctx: :return:
Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def enterTranslationunit(self, ctx: CPP14_v2Parser.TranslationunitContext):
    """
    Open the instrumented-source output file and inject the log-file prelude.

    A header comment, an ``#include <fstream>`` line and a global
    ``std::ofstream logFile("log_file.txt")`` definition are inserted before the first
    token of the translation unit, and ``domain_name`` is reset to 0.

    :param ctx: the translation unit context being entered.
    :return: None
    """
    # Deliberately left open: the handle is written and closed in exitTranslationunit.
    self.instrumented_source = open(self.instrument_path + 'instrumented_source.cpp', 'w')
    new_code = '\n//in the name of allah\n#include <fstream>\nstd::ofstream logFile("log_file.txt");\n\n'
    self.token_stream_rewriter.insertBeforeIndex(ctx.start.tokenIndex, new_code)
    self.domain_name = 0
exitFunctionbody1(self, ctx)
Insert a probe at the end of the function only if the function is void. :param ctx: :return:
Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def exitFunctionbody1(self, ctx: CPP14_v2Parser.Functionbody1Context):
    """
    Finish the last block of a function body when no jump was recorded for it.

    The top of ``has_jump_stack`` is popped. Only when that flag is falsy is the stop
    line of ``ctx`` stored in ``block_stop``, ``addNode()`` called, and the current
    block number added to ``final_nodes``.

    :param ctx: the function body context being exited.
    :return: None
    """
    # NOTE(review): the listing lost its indentation; all three statements below are
    # assumed to be guarded by the jump flag - confirm against the original file.
    if self.has_jump_stack.pop():
        return
    last_line = ctx.stop.line
    self.block_stop = last_line
    self.addNode()
    self.final_nodes.add(self.block_number)
exitTranslationunit(self, ctx)
Apply the pending insertions, write the instrumented source to disk and close it, then dump the function dictionary to functions.json. :param ctx: :return:
Source code in coda\analysis\cfg\cfg_extractor_listener2.py
def exitTranslationunit(self, ctx: CPP14_v2Parser.TranslationunitContext):
    """
    Flush the instrumentation results to disk.

    Every non-empty entry of ``afterInsert`` is inserted after the token with the same
    index, the rewritten text is written to ``instrumented_source`` and that file is
    closed. ``function_dict`` is then dumped as JSON to ``functions.json`` under
    ``cfg_path``.

    :param ctx: the translation unit context being exited.
    :return: None
    """
    try:
        for token_index, pending_code in enumerate(self.afterInsert):
            if pending_code != '':
                self.token_stream_rewriter.insertAfter(token_index, pending_code)
        self.instrumented_source.write(self.token_stream_rewriter.getDefaultText())
    finally:
        # Close the handle even when rewriting or writing fails.
        self.instrumented_source.close()

    # The context manager closes functions.json, which was previously left open.
    with open(self.cfg_path + 'functions.json', 'w') as functions_json:
        json.dump(self.function_dict, functions_json)