# blob: 88ef38f4a6fc6455543e8c94daa4bf188d02fe14 [file] [log] [blame]
import sys
import os
import _paths
def handle_options(argv=None):
    """Parse and validate the command-line options of this script.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default) argparse reads the arguments from ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` built by the parser, carrying the
        attributes ``tfpg``, ``filter_action``, ``focus_nodes`` and
        ``outdir``.

    Any validation failure is reported through ``parser.error``, which
    prints the usage message and terminates the process (``SystemExit``).
    """
    import argparse

    parser = argparse.ArgumentParser(description="")
    parser.add_argument("--tfpg-file", "-t",
                        dest="tfpg",
                        action="store",
                        default=None,
                        help="The TFPG file (can be in XML or human readable format)")
    # NOTE: the trailing space after "output TFPG." keeps the two action
    # descriptions from running together in the rendered help text.
    parser.add_argument("--filter-action", "-F",
                        dest="filter_action",
                        choices=['simplify', 'focus'],
                        default=None,
                        help="'simplify': Basic simplification routines as executed in "
                             "TFPG synthesis are performed. Edges are assumed to be maximally "
                             "permissive (tmin=0, tmax=inf, modes=all), otherwise soundness "
                             "is not guaranteed. Note that all nodes in the input TFPG are "
                             "preserved in the output TFPG. "
                             "'focus': All nodes and respective incoming edges from which the "
                             "nodes specified with '--focus-nodes' cannot be reached are "
                             "dropped from the TFPG.")
    parser.add_argument("--focus-nodes",
                        dest="focus_nodes",
                        action="store",
                        default=None,
                        help="When selecting filter action 'focus', this option "
                             "is used to indicate the focus nodes to consider. "
                             "Specify as colon-separated list of node names.")
    parser.add_argument("--outdir", "-O",
                        dest="outdir",
                        default="out",
                        metavar="PATH",
                        help="Output directory, where all generated file should be "
                             "put into (default: %(default)s)")
    # TODO add option to script and backend to filter also generated nodes; disabled by default.

    p_args = parser.parse_args(argv)

    # Validation: each failing check aborts via parser.error().
    if p_args.tfpg is None:
        parser.error("The TFPG file was not specified.")
    if not os.path.exists(p_args.tfpg):
        parser.error("The specified TFPG file does not exist.")
    if p_args.filter_action is None:
        parser.error("Need to specify a filter action.")
    if p_args.filter_action == "focus" and p_args.focus_nodes is None:
        parser.error("For 'focus' filtering, need to specify focus nodes.")
    # A missing output directory is acceptable here; only an existing
    # non-directory path is rejected.
    if os.path.exists(p_args.outdir) and not os.path.isdir(p_args.outdir):
        parser.error("Not a directory: " + p_args.outdir)
    return p_args
# Script entry point: parse the options, load the TFPG, apply the selected
# filter ('focus' or 'simplify') and write the result into the output
# directory. Every failure is reported on stdout and exits with status 1.
if __name__ == "__main__":
    # set up paths (must happen before the xsap imports below)
    _paths.setup_path()

    # set up logging
    import xsap.utils.misc.log
    xsap.utils.misc.log.init()

    # other imports
    from xsap.data_structure.tfpg.tfpg import Tfpg
    from xsap.features.tfpg_synthesis.tfpg_synthesizer import TfpgSynthesizer

    # handle arguments
    args = handle_options()
    if not os.path.exists(args.outdir):
        os.makedirs(args.outdir)

    # parse the TFPG into internal struct
    tfpg_struct = Tfpg.parse_tfpg(args.tfpg)
    if tfpg_struct is None:
        print("Error: Failure during TFPG parsing.")
        sys.exit(1)

    # Filter Type: FOCUS
    if args.filter_action == "focus":
        focus_node_names = [n.strip() for n in args.focus_nodes.split(':')]
        tfpg_node_names = [n.get_name() for n in tfpg_struct.get_nodes_list()]

        # every requested focus node has to exist in the TFPG
        known_node_names = set(tfpg_node_names)
        for name in focus_node_names:
            if name not in known_node_names:
                print("Error: '%s' is not a valid TFPG node name." % name)
                sys.exit(1)

        print("")
        print("TFPG Filtering> computing reachability set")
        focus_reachable_set = tfpg_struct.compute_reachable_from_set(focus_node_names)

        # drop all nodes (and respective edges) that are not in focus_reachable_set
        print("TFPG Filtering> removing nodes from which focus nodes cannot be reached")
        for name in tfpg_node_names:
            if name not in focus_reachable_set:
                tfpg_struct.remove_node(name)

        # write new TFPG to XML file "TFPG_NAME_focused.txml"
        print("TFPG Filtering> dumping result")
        filtered_tfpg_file_name = \
            os.path.splitext(os.path.basename(args.tfpg))[0] + "_focused.txml"
        filtered_tfpg_path = os.path.join(args.outdir, filtered_tfpg_file_name)
        tfpg_struct.dump_to_xml(filtered_tfpg_path)

        res_msg = "The filtered TFPG file has been saved at '%s'" % filtered_tfpg_path
        print("")
        print('-' * len(res_msg))
        print(res_msg)
        print('-' * len(res_msg))

    # Filter Type: SIMPLIFY
    else:
        if tfpg_struct.has_non_maximally_permissive_edges():
            print("Error: TFPG has some edges that are not maximally permissive; "
                  "cannot perform simplification.")
            sys.exit(1)

        system_modes = tfpg_struct.get_modes_list()
        failure_modes = [n.get_name() for n in tfpg_struct.get_fm_nodes()]
        failure_modes_renamed = []
        discrepancies = [n.get_name() for n in
                         tfpg_struct.get_or_nodes() + tfpg_struct.get_and_nodes()]
        discrepancies_renamed = []
        minimal_cutsets = {}

        # create node name map where generated node names are replaced, since
        # these are reserved node names for the synthesizer.
        # NOTE(review): generate_random_id() is only given the original names;
        # presumably those are the names the new id must not clash with. Ids
        # generated earlier in these loops are not passed in -- confirm that
        # two generated ids cannot collide.
        map_to_new = {}
        map_to_old = {}
        for fm_orig in failure_modes:
            if TfpgSynthesizer.is_generated_node_name(fm_orig):
                map_to_new[fm_orig] = \
                    TfpgSynthesizer.generate_random_id(failure_modes + discrepancies)
            else:
                map_to_new[fm_orig] = fm_orig
            map_to_old[map_to_new[fm_orig]] = fm_orig
            failure_modes_renamed.append(map_to_new[fm_orig])
        for d_orig in discrepancies:
            if TfpgSynthesizer.is_generated_node_name(d_orig):
                map_to_new[d_orig] = \
                    TfpgSynthesizer.generate_random_id(failure_modes + discrepancies)
            else:
                map_to_new[d_orig] = d_orig
            map_to_old[map_to_new[d_orig]] = d_orig
            discrepancies_renamed.append(map_to_new[d_orig])
        # internal sanity checks: renaming must keep the lists aligned
        assert len(failure_modes) == len(failure_modes_renamed)
        assert len(discrepancies) == len(discrepancies_renamed)

        # for all FM nodes: empty set is the only mcs.
        for fm in tfpg_struct.get_fm_nodes():
            minimal_cutsets[map_to_new[fm.get_name()]] = [[]]
        # for all AND nodes: only one mcs is given, ie the set of predecessors.
        for n in tfpg_struct.get_and_nodes():
            n_predecessors = tfpg_struct.get_predecessors(n)
            minimal_cutsets[map_to_new[n.get_name()]] = \
                [[map_to_new[pred.get_name()] for pred in n_predecessors]]
        # for all OR nodes: set of mcs are predecessors as singleton sets.
        for n in tfpg_struct.get_or_nodes():
            n_predecessors = tfpg_struct.get_predecessors(n)
            minimal_cutsets[map_to_new[n.get_name()]] = \
                [[map_to_new[pred.get_name()]] for pred in n_predecessors]

        # call simplification routines from synthesis package
        discrepancy_tuples = [(n, "not_monitored") for n in discrepancies_renamed]
        tfpg_synthesizer = TfpgSynthesizer(system_modes, failure_modes_renamed,
                                           discrepancy_tuples, minimal_cutsets)

        # write new TFPG to XML file "TFPG_NAME_simplified.txml"
        tfpg_file_name = os.path.splitext(os.path.basename(args.tfpg))[0]
        out_tfpg_fname = os.path.join(args.outdir, tfpg_file_name + '_simplified.txml')
        res = tfpg_synthesizer.synthesize_tfpg(out_tfpg_fname, tfpg_struct.get_name(), True)
        if res != "ok":
            print("Error: An error has occurred during simplification (%s)." % res)
            sys.exit(1)

        # translate back generated node names that have been modified
        if set(failure_modes) != set(failure_modes_renamed) or \
                set(discrepancies) != set(discrepancies_renamed):
            # Re-read the synthesized file so the names can be restored in it.
            # An explicit check is used instead of an assert, which would be
            # stripped when running with -O.
            tfpg_struct = Tfpg.parse_tfpg(out_tfpg_fname)
            if tfpg_struct is None:
                print("Error: Failure during TFPG parsing.")
                sys.exit(1)
            for n_name in failure_modes_renamed:
                if n_name not in failure_modes:
                    tfpg_node = tfpg_struct.get_node_by_name(n_name)
                    tfpg_node.set_name(map_to_old[tfpg_node.get_name()])
            for n_name in discrepancies_renamed:
                if n_name not in discrepancies:
                    tfpg_node = tfpg_struct.get_node_by_name(n_name)
                    tfpg_node.set_name(map_to_old[tfpg_node.get_name()])
            tfpg_struct.dump_to_xml(out_tfpg_fname)

        res_msg = "Simplified TFPG has been generated at %s." % out_tfpg_fname
        print("-" * len(res_msg))
        print(res_msg)
        print("-" * len(res_msg))