import yaml
import sys
import os
import pprint
import re

# Data structures used during conversion:
# modules - map of module_name:module_data
# module_data - map of the detailed data of one module:
#  * import: list of the imported modules
#  * functions: list of the encoder functions to be generated
#  * type_tree: list of the type definitions
#  * extra_lists: list of the extra 'record of' types and encoder functions

# Global variables; they always reference the module currently being processed:
# modules
# module_data
# type_tree
# module_name
# ident_level
# ident_c
# saved_stdout


# Reserved word list. Used to sanitize the type and field names.
ttcn_keywords=[ # Reserved words
    "action","activate","address","alive","all","alt","altstep","and","and4b","any","anytype",
    "bitstring","boolean","break","case","call","catch","char","charstring","check","clear","complement",
    "component","connect","const","continue","control","create","deactivate","decmatch","default","disconnect",
    "display","do","done","else","encode","enumerated","error","except","exception","execute","extends",
    "extension","external","fail","false","float","for","friend","from","function","getverdict","getcall",
    "getreply","goto","group","halt","hexstring","if","ifpresent","import","in","inconc","infinity",
    "inout","integer","interleave","kill","killed","label","language","length","log","map","match","message",
    "mixed","mod","modifies","module","modulepar","mtc","noblock","none","not","not_a_number","not4b","nowait",
    "null","octetstring","of","omit","on","optional","or","or4b","out","override","param","pass","pattern","permutation",
    "port","present","private","procedure","public","raise","read","receive","record","recursive","rem","repeat",
    "reply","return","running","runs","select","self","send","sender","set","setencode","setverdict","signature","start",
    "stop","subset","superset","system","template","testcase","timeout","timer","to","trigger","true","type","union","universal",
    "unmap","value","valueof","var","variant","verdicttype","while","with","xor","xor4b","bit2hex","bit2int","bit2oct","bit2str",
    "char2int","float2int","hex2bit","hex2int","hex2oct","hex2str","int2bit","int2char","int2float","int2hex","int2oct","int2str",
    "int2unichar","ischosen","ispresent","lengthof","oct2bit","oct2hex","oct2int","oct2str","regexp","rnd","sizeof","str2int",
    "str2oct","substr","unichar2int","replace",
    # Reserved by extension packages
    "apply", "assert", "at", "configuration", "conjunct", "cont", "delta", "disjunct", "duration", "finished", "history", "implies",
    "inv", "mode", "notinv", "now", "onentry", "onexit", "par", "prev", "realtime", "seq", "setstate", "static", "stepsize", "stream",
    "timestamp", "until", "values", "wait",
    # OO reserved
    "class", "finally", "object","this"
]

# Converts a number to its word representation.
# Used to replace the leading number with letters in names.
# Example: 5qi -> this function is used to replace the 5 with 'five'
def numToWords(num):
    units = ['','one','two','three','four','five','six','seven','eight','nine']
    teens = ['','eleven','twelve','thirteen','fourteen','fifteen','sixteen', \
             'seventeen','eighteen','nineteen']
    tens = ['','ten','twenty','thirty','forty','fifty','sixty','seventy', \
            'eighty','ninety']
    thousands = ['','thousand','million','billion','trillion','quadrillion', \
                 'quintillion','sextillion','septillion','octillion', \
                 'nonillion','decillion','undecillion','duodecillion', \
                 'tredecillion','quattuordecillion','sexdecillion', \
                 'septendecillion','octodecillion','novemdecillion', \
                 'vigintillion']
    words = []
    if num==0: words.append('zero')
    else:
        numStr = '%d'%num
        numStrLen = len(numStr)
        groups = (numStrLen+2)//3
        numStr = numStr.zfill(groups*3)
        for i in range(0,groups*3,3):
            h,t,u = int(numStr[i]),int(numStr[i+1]),int(numStr[i+2])
            g = groups-(i//3+1)
            if h>=1:
                words.append(units[h])
                words.append('hundred')
            if t>1:
                words.append(tens[t])
                if u>=1: words.append(units[u])
            elif t==1:
                if u>=1: words.append(teens[u])
                else: words.append(tens[t])
            else:
                if u>=1: words.append(units[u])
            if (g>=1) and ((h+t+u)>0): words.append(thousands[g])
    return ''.join(words)
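
# Illustrative results (hand-derived from the tables above):
#   numToWords(0)   -> 'zero'
#   numToWords(12)  -> 'twelve'
#   numToWords(21)  -> 'twentyone'
#   numToWords(105) -> 'onehundredfive'
# No separators are inserted, which is exactly what the identifier cleanup needs.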

# Adds the module 'new_module' to the import list of the current module if it is not already in the list
def addImport(new_module):
    if (new_module != module_name) and (not new_module in module_data['import']):
        module_data['import'].append(new_module)

# Converts an identifier to a valid TTCN-3 identifier
# - Ensures that the identifier starts with a letter
#  * The whole identifier is a number -> adds the "Num_" prefix
#  * Starts with a number -> converts the initial number part to a word: 5qi -> fiveqi
# - Contains only letters, numbers and underscores
#  * Replaces every other character with an underscore
#  * The caller can add extra characters to the list of valid characters in the 'extrachr' parameter
# - Postfixes the reserved words with an underscore
# - Capitalizes the first character of type names
def clean_name(instr, typename=False, extrachr=""):
    #print(instr)
    if instr in ttcn_keywords:
        return instr + "_"

    if not isinstance(instr, str):
        instr=str(instr)

    if instr.isdigit():
        instr = "Num_" + instr

    m = re.search(r'(^\d+)(.*)',instr)
    if m:
        instr = numToWords(int(m.group(1))) + m.group(2)
    if typename:
        instr = instr[:1].upper()+instr[1:]
    elif instr[:1] =="_":
        instr = instr[1:]
    rx='[^a-zA-Z0-9_' + extrachr + ']'
    return re.sub(rx,'_',instr)
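
# Illustrative results (hand-derived from the rules above):
#   clean_name("5qi")          -> 'fiveqi'      (leading number converted to a word)
#   clean_name("123")          -> 'Num_123'     (whole identifier is a number)
#   clean_name("import")       -> 'import_'     (reserved word postfixed)
#   clean_name("dnn-list")     -> 'dnn_list'    (invalid character replaced)
#   clean_name("snssai", True) -> 'Snssai'      (type name capitalized)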


# Returns the module and the referenced type name as a tuple from the $ref
#
# local reference: '#/components/schemas/SnssaiExtension' -> '', 'SnssaiExtension'
# remote reference: 'TS29571_CommonData.yaml#/components/schemas/Ncgi' -> 'TS29571_CommonData', 'Ncgi'
#
def get_module(ref):
    if ref[0]=='#':
        return '', ref.split('/')[-1]
    else:
        return ref.split("#")[0].replace(".yaml",""),ref.split('/')[-1]


# The process_path, process_schema, and process_used_schem_name functions
# parse the 'paths:' parts of the yaml definitions and collect the type references.
# Encoder and decoder functions are needed for the referenced types, as they are used
# in API messages directly.
# They also create prototypes for possible array types.
#
# If the generation of an encoder function or an array definition is missing, these
# functions should be updated to find the missing references.
#
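# An assumed OpenAPI path item fragment that these functions walk:
#   get:
#     parameters:
#       - name: ue-id
#         in: query
#         schema:
#           $ref: 'TS29571_CommonData.yaml#/components/schemas/Supi'
#     responses:
#       '200':
#         content:
#           application/json:
#             schema:
#               $ref: '#/components/schemas/SessionData'
# Both $ref-s above end up in module_data['functions'], and the remote one also
# adds 'TS29571_CommonData' to module_data['import'].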
def process_path(data):
    for m in data:
        #print("process_path ", m, " " , data[m])
        if m == "parameters":
            for p in data[m]:
                process_used_schem_name(p)
        else:
            if "parameters" in data[m]:
                for p in data[m]["parameters"]:
                    process_used_schem_name(p)

            if "requestBody" in data[m]:
                process_used_schem_name(data[m]["requestBody"])
            if "responses" in data[m]:
                for r in data[m]["responses"]:
                    process_used_schem_name(data[m]["responses"][r])
            if "callbacks" in data[m]:
                for c in data[m]["callbacks"]:
                    for p in data[m]["callbacks"][c]:
                        process_path(data[m]["callbacks"][c][p])

def process_schema(schema):
    global module_data
    if "$ref" in schema:
        refmodule,refstr=get_module(schema["$ref"])
        if refmodule!= '':
            addImport(refmodule)
            refstr=refmodule+"."+refstr
        if not refstr in module_data['functions']:
            module_data['functions'].append(refstr)
    elif "type" in schema:
        if schema["type"] == "array" :
            if "$ref" in schema["items"] :
                refmodule,refstr=get_module(schema["items"]["$ref"])  # This direct code generation should be moved out from here.
                if not (refmodule,refstr) in module_data['extra_lists']:
                    module_data['extra_lists'].append((refmodule,refstr))
                    # print("// " + refmodule + " type record of " + refstr + " " +refstr + "_list")
                    # print(f'// external function f_enc_{refstr}_list(in {refstr}_list pdu) return octetstring ')
                    # print('// with { extension "prototype(convert) encode(JSON)" }')
                    # print("")
                    # print(f'// external function f_dec_{refstr}_list(in octetstring stream, out {refstr}_list pdu) return integer ')
                    # print('// with { extension "prototype(backtrack) decode(JSON)" }')
                    # print("")
        if schema["type"] == "object" :
            if "properties" in schema:
                if "jsonData" in schema["properties"]:
                    process_schema(schema["properties"]["jsonData"])

def process_used_schem_name(data):
    #print("process_used_schem_name", data)
    if "content" in data:
        for ct in data["content"]:
            #print("ct ", ct)
            if "schema" in data["content"][ct]:
                #print("schema ", data["content"][ct]["schema"])
                process_schema(data["content"][ct]["schema"])

    if "schema" in data:
        process_schema(data["schema"])

# Processes one schema definition and builds the data structure needed for the code generation.
# The processed data is appended to the type_tree list.
# All processing of the schema definition is done here, except the resolution of allOf.
# allOf has to be reprocessed after the schema processing, because its fields need to be collected from the already processed data.
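# An illustrative input/output pair (assumed schema fragment for a type named 'Foo'):
#   {'type': 'object', 'required': ['id'],
#    'properties': {'id': {'type': 'integer'}, 'dnn-list': {'type': 'string'}}}
# is turned into roughly:
#   {'name': 'Foo', 'type': 'set', 'mandatory': ['id'], 'nullable': False,
#    'fields': [{'name': 'id', 'type': 'integer', ...},
#               {'name': 'dnn_list', 'type': 'charstring',
#                'variant': ["name as 'dnn-list'"], ...}], ...}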
def type_builder(name,data,tree):
    global type_tree
    #print("type:", name)
    element_data={}
    cname=clean_name(name)
    element_data["name"]=cname
    element_data["variant"]=[]
    if name != cname:
        element_data["variant"].append("name as '"+name+"'")

    element_data["nullable"]= "nullable" in data

    if "type" in data:
        if data["type"] == "string":
            if "enum" in data:
                element_data["type"]="enumerated"
                element_data["values"]=[]
                for ev in data["enum"]:
                    #print(ev)
                    cename=clean_name(ev,True)
                    element_data["values"].append(cename)
                    if cename != ev:
                        element_data["variant"].append("text '"+ cename +"' as '"+str(ev)+"'")
            else:
                element_data["type"]="charstring"
                if "pattern" in data:
                    element_data["pattern"]=data["pattern"]

        elif data["type"] == "integer":
            restriction=False
            minstr=" (-infinity.."
            maxstr="infinity)"
            if "minimum" in data:
                restriction=True
                minstr=f' ({data["minimum"]}..'
            if "maximum" in data:
                restriction=True
                maxstr=f'{data["maximum"]})'
            if restriction:
                element_data["restriction"]=minstr+maxstr
            element_data["type"]="integer"

        elif data["type"] == "number":
            element_data["type"]="float"

        elif data["type"] == "boolean":
            element_data["type"]="boolean"

        elif data["type"] == "array":
            element_data["type"]="record of"
            element_data["inner_type"]={}
            field=[]
            type_builder("",data["items"],field)
            element_data["inner_type"]=field[0]

        elif data["type"] == "object":
            if "properties" in data:
                element_data["type"]="set"
                if "required" in data:
                    element_data["mandatory"]=[]
                    for r in data["required"]:
                        element_data["mandatory"].append(clean_name(r))
                else:
                    element_data["mandatory"]=[]
                element_data["fields"]=[]
                prop_data=data["properties"]
                for prop in prop_data:
                    field=[]
                    type_builder(prop,prop_data[prop],field)
                    element_data["fields"].append(field[0])
            elif "additionalProperties" in data:
                element_data["type"]="set of"
                element_data["inner_type"]={}
                element_data["inner_type"]["type"]="record"
                element_data["inner_type"]["name"]=""
                element_data["inner_type"]["nullable"]=False
                element_data["inner_type"]["mandatory"]=["key"]
                element_data["inner_type"]["fields"]=[]
                element_data["inner_type"]["fields"].append({'name':'key', 'type':'universal charstring','nullable':False})
                field=[]
                type_builder("additionalProperties",data["additionalProperties"],field)
                element_data["inner_type"]["fields"].append(field[0])
                element_data["variant"].append("as map")
            else:
                element_data["type"]="JSON_Generic.JSON_generic_val"
                addImport("JSON_Generic")
        else:
            element_data["type"]=data["type"]
            #print('!!!!!!unsupported' + data["type"])

| elif "$ref" in data: |
| refmodule,refstr=get_module(data["$ref"]) |
| if refmodule!= '': |
| addImport(refmodule) |
| element_data["type"]=refmodule+"."+clean_name(refstr,True) |
| else: |
| element_data["type"]=clean_name(refstr,True) |
| |
| elif "anyOf" in data: |
| element_data["type"]="union" |
| element_data["fields"]=[] |
| num_str=0 |
| num_enum=0 |
| for e in data["anyOf"]: |
| if ("type" in e ) and (e["type"] == "string"): |
| num_str+=1 |
| if "enum" in e: |
| num_enum+=1 |
| |
| if (num_str==2) and (num_enum==1): |
| for e in data["anyOf"]: |
| if "enum" in e: |
| type_builder(name + "_enum",e,type_tree) |
| element_data["fields"].append({'name':'enum_val', 'type':name + "_enum", 'nullable':False}) |
| element_data["fields"].append({'name':'other_val', 'type':'charstring', 'nullable':False}) |
| element_data["variant"].append("JSON: as value") |
| else: |
| element_data["type"]="union" |
| element_data["variant"].append("JSON: as value") |
| element_data["fields"]=[] |
| i=1 |
| for e in data["anyOf"]: |
| field=[] |
| type_builder(f'field{i}',e,field) |
| i+=1 |
| element_data["fields"].append(field[0]) |
| |
| elif "oneOf" in data: |
| element_data["type"]="union" |
| element_data["variant"].append("JSON: as value") |
| element_data["fields"]=[] |
| i=1 |
| for e in data["oneOf"]: |
| field=[] |
| type_builder(f'field{i}',e,field) |
| i+=1 |
| element_data["fields"].append(field[0]) |
| |
| elif "allOf" in data: |
| element_data["allOf"]=True |
| element_data["fields"]=[] |
| i=0 |
| for e in data["allOf"]: |
| if "$ref" in e: |
| refmodule,refstr=get_module(e["$ref"]) |
| element_data["fields"].append({"ref":True,"refmodule":refmodule,"refstr":clean_name(refstr,True)}) |
| else: |
| field=[] |
| type_builder(f'field{i}',e,field) |
| i+=1 |
| element_data["fields"].append(field[0]) |
| element_data["type"]="JSON_Generic.JSON_generic_val" |
| addImport("JSON_Generic") |
| |

    else:
        element_data["type"]="JSON_Generic.JSON_generic_val"
        addImport("JSON_Generic")
        #print('!!!!!!unsupported ',name)
        #pprint.pprint(data)

    if "nullable" in data:
        addImport("JSON_Generic")
        element_data2={}
        element_data2["type"]="union"
        element_data2["name"]=element_data["name"]
        element_data["name"]="val"
        element_data["nullable"]=False
        element_data2["nullable"]=True
        element_data2["variant"]=[]
        element_data2["variant"].append("JSON: as value")
        element_data2["fields"]=[]
        element_data2["fields"].append({'name':'null_val', 'type':"JSON_generic_val.JSON_null_val", 'nullable':False})
        element_data2["fields"].append(element_data)
        tree.append(element_data2)
    else:
        tree.append(element_data)
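
# Illustrative nullable handling: for an assumed {'type': 'string', 'nullable': True}
# schema the charstring is wrapped into
#   union { JSON_generic_val.JSON_null_val null_val, charstring val }
# with the "JSON: as value" variant, so a JSON null decodes into the null_val field.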

# Finds the data of the type or returns None
def find_type(tname):
    for t in type_tree:
        if t["name"] == tname:
            return t
    return None

# Generates the TTCN-3 type definition from the processed type data
def print_type(t, lend="\n", top=False):
    global ident_level
    global ident_c
    if "allOf" in t:  # allOf has to be reprocessed to collect the actual fields.
        element_data = {}
        element_data["name"]=t["name"]
        element_data["type"]="set"
        element_data["fields"]=[]
        element_data["variant"]=[]
        element_data["mandatory"]=[]
        is_ok=True
        for f in t["fields"]:  # "fields" contains the references or the data of the objects to be merged
            if "ref" in f:  # Collect the fields from the referenced type
                if f["refmodule"] == "" :
                    it=find_type(f["refstr"])
                    if it != None:
                        element_data["fields"]+= it["fields"]
                        element_data["variant"]+= it["variant"]
                        element_data["mandatory"]+= it["mandatory"]
                    else:
                        is_ok=False
                else:
                    print("// Please add the fields from " + f["refmodule"] + "." + f["refstr"])
            elif f["type"] == "set":  # Collect the fields from the directly defined object
                element_data["fields"]+= f["fields"]
                element_data["variant"]+= f["variant"]
                element_data["mandatory"]+= f["mandatory"]
            else:
                is_ok=False

        if is_ok:  # The field collection was successful, generate the code
            print_type(element_data,lend,top)
            t["fields"]=element_data["fields"]

        else:  # The allOf can't be resolved, use the generic JSON type
            del t["fields"]  # Remove the "fields" list; type_builder already filled in the type data for the generic JSON type, just use it
            print_type(t,lend,top)
    else:
        print(t["type"], " ",end="",sep="")
        if top and (t["type"] !="record of"):
            print(t["name"]," ", end="",sep="")
        if "fields" in t:
            print(" {",sep="")
            ident_level+=1
            separator=ident_c*ident_level
            for f in t["fields"]:
                print(separator, end="",sep="")
                separator=",\n" + ident_c*ident_level
                print_type(f,"")
                if "mandatory" in t:
                    if f["name"] not in t["mandatory"]:
                        print(" optional", end="")
            ident_level-=1
            print("",sep="")
            print(ident_c*ident_level,"}",sep="",end=lend)
        elif "inner_type" in t:
            print_type(t["inner_type"],"")
        elif "values" in t:
            print("{ ",sep="",end="")
            separator=""
            for e in t["values"]:
                print(separator, end="",sep="")
                separator=", "
                print(e, end="",sep="")
            print("}",sep="",end="")

        if (not top) or (t["type"] =="record of"):
            print(" ", t["name"],end="",sep="")
        if "restriction" in t:
            print(" ",t["restriction"],end="",sep="")
        print(lend,end="")
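
# For the 'Foo' example above print_type() emits roughly this TTCN-3 (illustrative,
# whitespace approximated):
#   type set Foo {
#     integer id,
#     charstring dnn_list optional
#   }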

# Gathers the variant attributes of the type and builds the variant field references
def gather_variants(t,spec,variants):
    if "variant" in t:
        for v in t["variant"]:
            variants.append({"spec":spec, "var":v})
    if "fields" in t:
        for f in t["fields"]:
            spec2=spec
            if spec!="":
                spec2+="."
            gather_variants(f,spec2+f["name"],variants)
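
# For the 'Foo' example above the collected list would be roughly
#   [{'spec': 'dnn_list', 'var': "name as 'dnn-list'"}]
# which generate_one_module() renders as: variant (dnn_list) "name as 'dnn-list'"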

# By default the yaml loader parses Yes and No as bool values instead of strings.
# The bool constructor is overridden to return them as strings.
from yaml.constructor import Constructor

def add_bool(self, node):
    return self.construct_scalar(node)

Constructor.add_constructor(u'tag:yaml.org,2002:bool', add_bool)
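
# Example: an assumed enum value list such as ['Yes', 'No', 'Unknown'] would otherwise
# be loaded as [True, False, 'Unknown'] and the original Yes/No spelling would be lost
# in the generated enumeration.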


# Processes the content of one yaml file.
# Fills the module_data; no code is generated here.
# doc - the loaded yaml file
# The global variables should be set before the call.
def process_one_module(doc):
    print("Processing " + module_name)
    if "components" in doc:
        if "schemas" in doc["components"]:
            schemas=doc["components"]["schemas"]
            for name in schemas:
                # The type name suffix 'Rm' is used for a nullable type alias.
                # Generate the nullable structure but use the reference to the original type.
                if (name[-2:] == "Rm" ) and (name[:-2] in schemas ):
                    type_tree.append({'fields': [{'name': 'null_val',
                                                  'nullable': False,
                                                  'type': 'JSON_generic_val.JSON_null_val'},
                                                 {'name': 'val',
                                                  'nullable': False,
                                                  'type': clean_name(name[:-2],True),
                                                  'variant': []}],
                                      'name': clean_name(name,True),
                                      'nullable': True,
                                      'type': 'union',
                                      'variant': ['JSON: as value']})
                    addImport("JSON_Generic")
                elif name == "NullValue":  # Special type, used for nullable enums
                    type_tree.append({'type': 'JSON_generic_val.JSON_null_val','name':'NullValue','nullable': True})
                else:
                    # Normal type schema processing.
                    data=schemas[name]
                    type_builder(clean_name(name,True),data,type_tree)
        if "responses" in doc["components"]:
            #print(doc["components"]["responses"])
            for r in doc["components"]["responses"]:
                #print(r)
                process_used_schem_name(doc["components"]["responses"][r])

    if 'paths' in doc:
        for p in doc["paths"]:
            #print(p)
            process_path(doc["paths"][p])
    print("Processing completed")

# Generates the code of one module.
# The global variables should be set before the call.
def generate_one_module():
    global ident_level
    print("Generating code for " + module_name)
    fout = open(module_name + ".ttcn",'wt')
    sys.stdout = fout

    print("module " + module_name + " {")
    print("")

    for i in module_data['extra_lists']:
        refmodule, refstr = i
        print("// " + refmodule + " type record of " + refstr + " " +refstr + "_list")
        print(f'// external function f_enc_{refstr}_list(in {refstr}_list pdu) return octetstring ')
        print('// with { extension "prototype(convert) encode(JSON)" }')
        print("")
        print(f'// external function f_dec_{refstr}_list(in octetstring stream, out {refstr}_list pdu) return integer ')
        print('// with { extension "prototype(backtrack) decode(JSON)" }')
        print("")

    for i in module_data['import']:
        print(f' import from {i} all')
    print("")

    for fs in module_data['functions']:
        f=clean_name(fs,True,".")
        if "." in f:
            pre="// "
        else:
            pre=""
        print(pre+f'external function f_enc_{f}(in {f} pdu) return octetstring ')
        print(pre+'with { extension "prototype(convert) encode(JSON)" }')
        print("")
        print(pre+f'external function f_dec_{f}(in octetstring stream, out {f} pdu) return integer ')
        print(pre+'with { extension "prototype(backtrack) decode(JSON)" }')
        print("")

    print("")
    #pprint.pprint(type_tree)
    for t in type_tree:
        print(ident_c*ident_level, "type ",end="",sep="")
#        ident_level+=1
        print_type(t,"", top=True)
        variants=[]
        gather_variants(t,"",variants)
        if variants != []:
            print(" with {")
            ident_level+=1
            for v in variants:
                print(ident_c*ident_level, "variant ",end="",sep="")
                if v["spec"] != "":
                    print("(",v["spec"],") ",end="",sep="")
                print('"',v["var"],'"',sep="")
            ident_level-=1
            print(ident_c*ident_level, "}",sep="")
        else:
            print("")
#        ident_level-=1
        print("")

    print("")

    print("")
    print('')
    print('} with {')
    print(' encode "JSON"')
    print('}')


    sys.stdout = saved_stdout
    fout.close()
    print("Code generation completed")


# Main code starts here:
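# Usage (the script file name is assumed here):
#   python3 openapi_2_ttcn.py TS29571_CommonData.yaml TS29518_Namf_Communication.yaml
# One <module_name>.ttcn file is generated for each input yaml file.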

# global variables

saved_stdout = sys.stdout
module_data = {}
type_tree= []
modules = {}
ident_level=1
ident_c=" "
module_name = ""



for filename in sys.argv[1:]:

    try:
        f=open(filename)
    except OSError:
        print("Can't open the file: ", filename)
        pprint.pprint(sys.exc_info()[1])
        quit()

    print("File opened: " + filename)
    module_name = os.path.splitext(os.path.basename(filename))[0]

    ydoc=yaml.load(f, Loader=yaml.Loader)  # yaml.Loader is built on the Constructor patched above

    # Already processed modules can add functions and types to the module
    if module_name not in modules:
        modules[module_name]={'import' : [], "functions" : [], "type_tree": [], "extra_lists":[]}

    module_data=modules[module_name]
    type_tree=module_data["type_tree"]

    process_one_module(ydoc)

for module_name in modules:
    module_data=modules[module_name]
    type_tree=module_data["type_tree"]

    generate_one_module()

    #pprint.pprint(module_data)