|
|
| import sys
|
| import os
|
| import _paths
|
|
|
|
|
| def handle_options():
|
| """Returns options values as map."""
|
|
|
| import argparse
|
| parser = argparse.ArgumentParser(description="")
|
|
|
| parser.add_argument("--tfpg-file", "-t",
|
| dest="tfpg",
|
| action="store",
|
| default=None,
|
| help="The TFPG file (can be in XML or human readable format)")
|
|
|
| parser.add_argument("--filter-action", "-F",
|
| dest="filter_action",
|
| choices=['simplify', 'focus'],
|
| default=None,
|
| help="'simplify': Basic simplification routines as executed in "
|
| "TFPG synthesis are performed. Edges are assumed to be maximally "
|
| "permissive (tmin=0, tmax=inf, modes=all), otherwise soundness "
|
| "is not guaranteed. Note that all nodes in the input TFPG are "
|
| "preserved in the output TFPG."
|
| "'focus': All nodes and respective incoming edges from which the "
|
| "nodes specified with '--focus-nodes' cannot be reached are "
|
| "dropped from the TFPG.")
|
|
|
| parser.add_argument("--focus-nodes",
|
| dest="focus_nodes",
|
| action="store",
|
| default=None,
|
| help="When selecting filter action 'focus', this option "
|
| "is used to indicate the focus nodes to consider. "
|
| "Specify as colon-separated list of node names.")
|
|
|
| parser.add_argument("--outdir", "-O",
|
| dest="outdir",
|
| default="out",
|
| metavar="PATH",
|
| help="Output directory, where all generated file should be "
|
| "put into (default: %(default)s)")
|
|
|
| # TODO add option to script and backend to filter also generated nodes; disabled by default.
|
|
|
| p_args = parser.parse_args()
|
|
|
| if p_args.tfpg is None:
|
| parser.error("The TFPG file was not specified.")
|
|
|
| if not os.path.exists(p_args.tfpg):
|
| parser.error("The specified TFPG file does not exist.")
|
|
|
| if p_args.filter_action is None:
|
| parser.error("Need to specify a filter action.")
|
|
|
| if p_args.filter_action == "focus" and p_args.focus_nodes is None:
|
| parser.error("For 'focus' filtering, need to specify focus nodes.")
|
|
|
| if os.path.exists(p_args.outdir) and not os.path.isdir(p_args.outdir):
|
| parser.error("Not a directory: " + p_args.outdir)
|
|
|
| return p_args
|
|
|
|
|
| if __name__ == "__main__":
|
|
|
| # set up paths
|
| _paths.setup_path()
|
|
|
| # set up logging
|
| import xsap.utils.misc.log
|
| xsap.utils.misc.log.init()
|
|
|
| # other imports
|
| from xsap.data_structure.tfpg.tfpg import Tfpg
|
| from xsap.features.tfpg_synthesis.tfpg_synthesizer import TfpgSynthesizer
|
|
|
| # handle arguments
|
| args = handle_options()
|
|
|
| if not os.path.exists(args.outdir):
|
| os.makedirs(args.outdir)
|
|
|
| # parse the TFPG into internal struct
|
| tfpg_struct = Tfpg.parse_tfpg(args.tfpg)
|
|
|
| if tfpg_struct is None:
|
| print "Error: Failure during TFPG parsing."
|
| sys.exit(1)
|
|
|
| # Filter Type: FOCUS
|
|
|
| if args.filter_action == "focus":
|
| focus_node_names = [n.strip() for n in args.focus_nodes.split(':')]
|
| tfpg_nodes = tfpg_struct.get_nodes_list()
|
| tfpg_node_names = [n.get_name() for n in tfpg_nodes]
|
|
|
| for name in focus_node_names:
|
| if name not in tfpg_node_names:
|
| print "Error: '%s' is not a valid TFPG node name." % name
|
| sys.exit(1)
|
|
|
| print
|
| print "TFPG Filtering> computing reachability set"
|
| focus_reachable_set = tfpg_struct.compute_reachable_from_set(focus_node_names)
|
|
|
| # drop all nodes (and respective edges) that are not in focus_reachable_set
|
| print "TFPG Filtering> removing nodes from which focus nodes cannot be reached"
|
| for name in tfpg_node_names:
|
| if name not in focus_reachable_set:
|
| tfpg_struct.remove_node(name)
|
|
|
| # write new TFPG to XML file "TFPG_NAME_focused.txml"
|
| print "TFPG Filtering> dumping result"
|
| filtered_tfpg_file_name = os.path.splitext(os.path.basename(args.tfpg))[0] + "_focused.txml"
|
| tfpg_struct.dump_to_xml(os.path.join(args.outdir, filtered_tfpg_file_name))
|
|
|
| res_msg = "The filtered TPFG file has been saved at '%s'" % os.path.join(args.outdir, filtered_tfpg_file_name)
|
|
|
| print
|
| print '-' * len(res_msg)
|
| print res_msg
|
| print '-' * len(res_msg)
|
|
|
| # Filter Type: SIMPLIFY
|
|
|
| else:
|
|
|
| if tfpg_struct.has_non_maximally_permissive_edges():
|
| print "Error: TFPG has some edges that are not maximally permissive; cannot perform simplification."
|
| sys.exit(1)
|
|
|
| system_modes = tfpg_struct.get_modes_list()
|
| failure_modes = [n.get_name() for n in tfpg_struct.get_fm_nodes()]
|
| failure_modes_renamed = []
|
| discrepancies = [n.get_name() for n in tfpg_struct.get_or_nodes() + tfpg_struct.get_and_nodes()]
|
| discrepancies_renamed = []
|
|
|
| minimal_cutsets = {}
|
|
|
| # create node name map where generated node names are replaced, since
|
| # these are reserved node names for the synthesizer.
|
| map_to_new = {}
|
| map_to_old = {}
|
|
|
| for fm_orig in failure_modes:
|
| if TfpgSynthesizer.is_generated_node_name(fm_orig):
|
| map_to_new[fm_orig] = \
|
| TfpgSynthesizer.generate_random_id(failure_modes + discrepancies)
|
| else:
|
| map_to_new[fm_orig] = fm_orig
|
| map_to_old[map_to_new[fm_orig]] = fm_orig
|
| failure_modes_renamed.append(map_to_new[fm_orig])
|
|
|
| for d_orig in discrepancies:
|
| if TfpgSynthesizer.is_generated_node_name(d_orig):
|
| map_to_new[d_orig] = \
|
| TfpgSynthesizer.generate_random_id(failure_modes + discrepancies)
|
| else:
|
| map_to_new[d_orig] = d_orig
|
| map_to_old[map_to_new[d_orig]] = d_orig
|
| discrepancies_renamed.append(map_to_new[d_orig])
|
|
|
| assert len(failure_modes) == len(failure_modes_renamed)
|
| assert len(discrepancies) == len(discrepancies_renamed)
|
|
|
| # for all FM nodes: empty set is the only mcs.
|
| for fm in tfpg_struct.get_fm_nodes():
|
| minimal_cutsets[map_to_new[fm.get_name()]] = [[]]
|
|
|
| # for all AND nodes: only one mcs is given, ie the set of predecessors.
|
| for n in tfpg_struct.get_and_nodes():
|
| n_predecessors = tfpg_struct.get_predecessors(n)
|
| minimal_cutsets[map_to_new[n.get_name()]] = \
|
| [[map_to_new[pred.get_name()] for pred in n_predecessors]]
|
|
|
| # for all OR nodes: set of mcs are predecessors as singleton sets.
|
| for n in tfpg_struct.get_or_nodes():
|
| n_predecessors = tfpg_struct.get_predecessors(n)
|
| minimal_cutsets[map_to_new[n.get_name()]] = \
|
| [[map_to_new[pred.get_name()]] for pred in n_predecessors]
|
|
|
| # call simplification routines from synthesis package
|
| discrepancy_tuples = [(n, "not_monitored") for n in discrepancies_renamed]
|
| tfpg_synthesizer = TfpgSynthesizer(system_modes, failure_modes_renamed, discrepancy_tuples,
|
| minimal_cutsets)
|
|
|
| # write new TFPG to XML file "TFPG_NAME_focused.txml"
|
| tfpg_file_name = os.path.splitext(os.path.basename(args.tfpg))[0]
|
| out_tfpg_fname = os.path.join(args.outdir, tfpg_file_name + '_simplified.txml')
|
|
|
| res = tfpg_synthesizer.synthesize_tfpg(out_tfpg_fname, tfpg_struct.get_name(), True)
|
|
|
| if res != "ok":
|
| print "Error: An error has occurred during simplification (%s)." % res
|
| sys.exit(1)
|
|
|
| # translate back generated node names that have been modified
|
| if set(failure_modes) != set(failure_modes_renamed) or \
|
| set(discrepancies) != set(discrepancies_renamed):
|
|
|
| tfpg_struct = Tfpg.parse_tfpg(out_tfpg_fname)
|
| assert tfpg_struct is not None
|
|
|
| for n_name in failure_modes_renamed:
|
| if n_name not in failure_modes:
|
| tfpg_node = tfpg_struct.get_node_by_name(n_name)
|
| tfpg_node.set_name(map_to_old[tfpg_node.get_name()])
|
|
|
| for n_name in discrepancies_renamed:
|
| if n_name not in discrepancies:
|
| tfpg_node = tfpg_struct.get_node_by_name(n_name)
|
| tfpg_node.set_name(map_to_old[tfpg_node.get_name()])
|
|
|
| tfpg_struct.dump_to_xml(out_tfpg_fname)
|
|
|
| res_msg = "Simplified TFPG has been generated at %s." % out_tfpg_fname
|
|
|
| print "-" * len(res_msg)
|
| print res_msg
|
| print "-" * len(res_msg)
|