#!/usr/bin/env python | |
import sys | |
import os.path | |
from argparse import ArgumentParser, RawTextHelpFormatter | |
import _paths | |
_paths.setup_path() | |
from xsap.data_structure.tfpg.tfpg_errors import UnsupportedInfinitySemantics | |
from xsap.data_structure.tfpg.tfpg import OPEN_INFINITY | |
from xsap.data_structure.tfpg.tfpg import Tfpg | |
from xsap.features.tfpg_smt.utils import parse_scenario_file, print_model | |
from xsap.features.tfpg_smt.reasoner import TFPGReasoner | |
from xsap.features.tfpg_smt.encoding import node_state_symbol | |
def _build_parser():
    """Create the argument parser for the TFPG diagnosis command line."""
    parser = ArgumentParser(formatter_class=RawTextHelpFormatter,
                            description="""
TFPG Diagnosis (SMT): Diagnose the scenario for the given TFPG.
""")
    parser.add_argument('inputTFPG', metavar='inputTFPG',
                        type=str,
                        help='The TFPG model to use')
    parser.add_argument('--diagnosability', action="store_true",
                        help='Checks if all failure modes are diagnosable')
    parser.add_argument('--diagnose', '-d', metavar='scenario',
                        type=str,
                        help='Enumerate the possible diagnoses for the '
                             'given scenario')
    parser.add_argument('--all-diag', '-a', metavar='node', type=str,
                        help='Checks if the given node appears in all '
                             'the diagnoses')
    parser.add_argument('--open-infinity', action='store_true',
                        help='Force the usage of Open Infinity semantics')
    return parser


def _report_diagnoses(reasoner, scenario_file, condition_node):
    """Diagnose the scenario stored in ``scenario_file`` and print the result.

    When ``condition_node`` is not None, check whether that node appears in
    every diagnosis; otherwise enumerate and print all diagnoses. Exits with
    status 1 when ``parse_scenario_file`` returns None.
    """
    scenario = parse_scenario_file(scenario_file)
    if scenario is None:
        print("The given scenario is invalid")
        sys.exit(1)

    # Kept ahead of the branch to preserve the original call order; only the
    # enumeration path below uses the result.
    fscenario = reasoner.scenario_to_formula(scenario)

    if condition_node is not None:
        print("Checking if all diagnoses imply node...")
        beta = node_state_symbol(condition_node)
        res, model = reasoner.certain_diagnosis(scenario, beta)
        if res:
            print("Node appears in all diagnoses")
        else:
            print("Node does NOT appear in all diagnoses!")
            print("A diagnosis for which node is not present is:")
            print_model(model)
        return

    print("Getting all the diagnoses...")
    diagnoses = reasoner.get_all_diagnosis(fscenario)
    count = len(diagnoses)
    if count == 0:
        print("No diagnoses found.")
    elif count == 1:
        print("One diagnosis found.")
    else:
        print("%d diagnoses found." % count)
    for diagnosis in diagnoses:
        print("------")
        print_model(diagnosis)


def _report_diagnosability(reasoner):
    """Run the diagnosability check and print the failure modes that fail it."""
    print("Checking diagnosability of the TFPG...")
    nd_list, _ = reasoner.check_diagnosability()
    if len(nd_list) == 0:
        print("All Failure Modes can be diagnosed in all modes!")
        return

    print("The following Failure Modes CANNOT be diagnosed in the given modes!")
    # Group the (mode, failure mode) pairs by mode for a compact listing.
    summary = {}
    for mode, fm in nd_list:
        summary.setdefault(mode, []).append(fm)
    print("\n".join(["%s : %s" % item for item in summary.items()]))


def main():
    """Command-line entry point for the TFPG diagnosis tool.

    Parses the command line, loads the TFPG named by ``inputTFPG`` and runs
    either the scenario diagnosis (``--diagnose``, optionally restricted with
    ``--all-diag``) or the diagnosability check (``--diagnosability``).

    Exits with status 1 when the TFPG cannot be parsed, when the reasoner
    rejects the infinity semantics, when the scenario is invalid, or when
    neither ``--diagnose`` nor ``--diagnosability`` was given.
    """
    args = _build_parser().parse_args()

    # Command line handling
    tfpg = Tfpg.parse_tfpg(args.inputTFPG)
    if tfpg is None:
        sys.exit(1)
    if args.open_infinity:
        tfpg.set_infinity_semantics(OPEN_INFINITY)

    try:
        reasoner = TFPGReasoner(tfpg, include_health=False)
    except UnsupportedInfinitySemantics:
        print("The current version of the algorithm only supports Open Infinity Semantics.")
        print("To force the implicit conversion, use the option --open-infinity")
        sys.exit(1)

    if args.diagnose:
        _report_diagnoses(reasoner, args.diagnose, args.all_diag)
    elif args.diagnosability:
        _report_diagnosability(reasoner)
    else:
        # print() calls (not Python 2 print statements) so the module also
        # compiles on Python 3, consistent with the rest of this function.
        print("Invalid command line parameters.")
        print("See help for further details.")
        sys.exit(1)
if __name__ == "__main__":
    # Run the tool only when executed as a script, not when imported.
    main()