OpenApi converter script added
Signed-off-by: Gabor Szalai <gabor.szalai@ericsson.com>
diff --git a/tools/OpenApi.py b/tools/OpenApi.py
new file mode 100755
index 0000000..10583a0
--- /dev/null
+++ b/tools/OpenApi.py
@@ -0,0 +1,588 @@
+import yaml
+import sys
+import os
+import pprint
+import re
+
+ttcn_keywords=["action","activate","address","alive","all","alt","altstep","and","and4b","any","anytype","bitstring","boolean","break","case","call","catch","char","charstring","check","clear","complement","component","connect","const","continue","control","create","deactivate","decmatch","default","disconnect","display","do","done","else","encode","enumerated","error","except","exception","execute","extends","extension","external","fail","false","float","for","friend","from","function","getverdict","getcall","getreply","goto","group","halt","hexstring","if","ifpresent","import","in","inconc","infinity","inout","integer","interleave","kill","killed","label","language","length","log","map","match","message","mixed","mod","modifies","module","modulepar","mtc","noblock","none","not","not_a_number","not4b","nowait","null","octetstring","of","omit","on","optional","or","or4b","out","override","param","pass","pattern","permutation","port","present","private","procedure","public","raise","read","receive","record","recursive","rem","repeat","reply","return","running","runs","select","self","send","sender","set","setencode","setverdict","signature","start","stop","subset","superset","system","template","testcase","timeout","timer","to","trigger","true","type","union","universal","unmap","value","valueof","var","variant","verdicttype","while","with","xor","xor4b","bit2hex","bit2int","bit2oct","bit2str","char2int","float2int","hex2bit","hex2int","hex2oct","hex2str","int2bit","int2char","int2float","int2hex","int2oct","int2str","int2unichar","ischosen","ispresent","lengthof","oct2bit","oct2hex","oct2int","oct2str","regexp","rnd","sixeof","str2int","str2oct","substr","unichar2int","replace"]
+
+def numToWords(num):
+ '''words = {} convert an integer number into words'''
+ units = ['','one','two','three','four','five','six','seven','eight','nine']
+ teens = ['','eleven','twelve','thirteen','fourteen','fifteen','sixteen', \
+ 'seventeen','eighteen','nineteen']
+ tens = ['','ten','twenty','thirty','forty','fifty','sixty','seventy', \
+ 'eighty','ninety']
+ thousands = ['','thousand','million','billion','trillion','quadrillion', \
+ 'quintillion','sextillion','septillion','octillion', \
+ 'nonillion','decillion','undecillion','duodecillion', \
+ 'tredecillion','quattuordecillion','sexdecillion', \
+ 'septendecillion','octodecillion','novemdecillion', \
+ 'vigintillion']
+ words = []
+ if num==0: words.append('zero')
+ else:
+ numStr = '%d'%num
+ numStrLen = len(numStr)
+ groups = (numStrLen+2)//3
+ numStr = numStr.zfill(groups*3)
+ for i in range(0,groups*3,3):
+ h,t,u = int(numStr[i]),int(numStr[i+1]),int(numStr[i+2])
+ g = groups-(i//3+1)
+ if h>=1:
+ words.append(units[h])
+ words.append('hundred')
+ if t>1:
+ words.append(tens[t])
+ if u>=1: words.append(units[u])
+ elif t==1:
+ if u>=1: words.append(teens[u])
+ else: words.append(tens[t])
+ else:
+ if u>=1: words.append(units[u])
+ if (g>=1) and ((h+t+u)>0): words.append(thousands[g]+',')
+ return ''.join(words)
+
+
+def clean_name(instr, typename=False):
+ #print(instr)
+ if instr in ttcn_keywords:
+ return instr + "_"
+
+ m = re.search('(^\d+)(.*)',instr)
+ if m:
+ instr = numToWords(int(m.group(1))) + m.group(2)
+ # if m.group(2).isupper():
+ # a = instr.upper()
+ # instr = a
+ if typename:
+ a = instr[:1].upper()+instr[1:]
+ instr = a
+ elif instr[:1] =="_":
+ a=instr[1:]
+ instr =a
+ return instr.replace("-","_")
+
+def get_module(ref):
+ if ref[0]=='#':
+ return '', ref.split('/')[-1]
+ else:
+ return ref.split("#")[0].replace(".yaml",""),ref.split('/')[-1]
+
+
+def process_path(data,module_data):
+ for m in data:
+ #print("process_path ", m, " " , data[m])
+ if m == "parameters":
+ for p in data[m]:
+ process_used_schem_name(p,module_data)
+ else:
+ if "parameters" in data[m]:
+ for p in data[m]["parameters"]:
+ process_used_schem_name(p,module_data)
+
+ if "requestBody" in data[m]:
+ process_used_schem_name(data[m]["requestBody"],module_data)
+ if "responses" in data[m]:
+ for r in data[m]["responses"]:
+ process_used_schem_name(data[m]["responses"][r],module_data)
+ if "callbacks" in data[m]:
+ for c in data[m]["callbacks"]:
+ for p in data[m]["callbacks"][c]:
+ process_path(data[m]["callbacks"][c][p],module_data)
+
+def process_schema(schema,module_data):
+ if "$ref" in schema:
+ refmodule,refstr=get_module(schema["$ref"])
+ if refmodule!= '':
+ if not refmodule in module_data['import']:
+ module_data['import'].append(refmodule)
+ refstr=refmodule+"."+refstr
+ if not refstr in module_data['functions']:
+ module_data['functions'].append(refstr)
+ elif "type" in schema:
+ if schema["type"] == "array" :
+ if "$ref" in schema["items"] :
+ refmodule,refstr=get_module(schema["items"]["$ref"])
+ print("// " + refmodule + " type record of " + refstr + " " +refstr + "_list")
+ print(f'// external function f_enc_{refstr}_list(in {refstr}_list pdu) return octetstring ')
+ print('// with { extension "prototype(convert) encode(JSON)" }')
+ print("")
+ print(f'// external function f_dec_{refstr}_list(in octetstring stream, out {refstr}_list pdu) return integer ')
+ print('// with { extension "prototype(backtrack) decode(JSON)" }')
+ print("")
+ if schema["type"] == "object" :
+ if "properties" in schema:
+ if "jsonData" in schema["properties"]:
+ process_schema(schema["properties"]["jsonData"],module_data)
+
+def process_used_schem_name(data,module_data):
+ #print("process_used_schem_name", data)
+ if "content" in data:
+ for ct in data["content"]:
+ #print("ct ", ct)
+ if "schema" in data["content"][ct]:
+ #print("schema ", data["content"][ct]["schema"])
+ process_schema(data["content"][ct]["schema"],module_data)
+
+ if "schema" in data:
+ process_schema(data["schema"],module_data)
+
+def add_buff(buff, txt, end='\n'):
+ return buff+txt+end
+
+def typewriter(buff, indent, pr_type ,name, data ,module_data):
+ #print("typewriter ", indent, pr_type ,name, data)
+ if "type" in data:
+ if data["type"] == "string":
+ if "enum" in data:
+ enum_data=data["enum"]
+ outstr=f'{indent}{pr_type}enumerated {name} {{'
+ sep=" "
+ for i in enum_data:
+ outstr+=sep+i
+ sep=", "
+ outstr+='}'
+ buff=add_buff(buff,outstr)
+
+ else:
+ buff=add_buff(buff,f'{indent}{pr_type}charstring {name}',end='')
+ if "pattern" in data:
+ buff=add_buff(buff,f' // (pattern \"{data["pattern"]}\")',end='')
+ elif data["type"] == "integer":
+ restrition=False
+ restritionstr=""
+ minstr=" (-infinity.."
+ maxstr="infinity)"
+ if "minimum" in data:
+ restrition=True
+ minstr=f' ({data["minimum"]}..'
+ if "maximum" in data:
+ restrition=True
+ maxstr=f'{data["maximum"]})'
+ if restrition:
+ restritionstr=minstr+maxstr
+ buff=add_buff(buff,f'{indent}{pr_type}integer {name}{restritionstr}',end='')
+ elif data["type"] == "number":
+ buff=add_buff(buff,f'{indent}{pr_type}float {name}',end='')
+ elif data["type"] == "boolean":
+ buff=add_buff(buff,f'{indent}{pr_type}boolean {name}',end='')
+ elif data["type"] == "array":
+ buff=add_buff(buff,f'{indent}{pr_type}set of ',end='')
+ buff=typewriter(buff,indent+" ","","",data["items"],module_data)
+ buff=add_buff(buff,name,end='')
+ elif data["type"] == "object":
+ if "required" in data:
+ required_data=data["required"]
+ else:
+ required_data=[]
+ if "properties" in data:
+ if pr_type == "":
+ buff=add_buff(buff,f'{indent}{pr_type}set {{',end='')
+ else:
+ buff=add_buff(buff,f'{indent}{pr_type}set {name} {{',end='')
+ prop_data=data["properties"]
+ sep=""
+ for prop in prop_data:
+ buff=add_buff(buff,sep)
+ sep=","
+ opt_str=""
+ if not prop in required_data:
+ opt_str=" optional"
+ buff=typewriter(buff,indent+" ","",prop,prop_data[prop],module_data)
+ buff=add_buff(buff,opt_str,end='')
+ buff=add_buff(buff,"")
+ buff=add_buff(buff," }",end='')
+ if pr_type == "":
+ buff=add_buff(buff,name,end='')
+ elif "additionalProperties" in data:
+ buff=add_buff(buff,f'{indent}{pr_type}set of record {{')
+ buff=add_buff(buff,f'{indent} universal charstring key,')
+ buff=typewriter(buff,indent+" ","","additionalProperties",data["additionalProperties"],module_data)
+ buff=add_buff(buff,"")
+ buff=add_buff(buff,f'{indent}}} {name}',end='')
+ else:
+ buff=add_buff(buff,f'{indent}{pr_type}object2 {name}',end='')
+
+ else:
+ print (f' // skiped {name}',end='')
+ elif "$ref" in data:
+ refmodule,refstr=get_module(data["$ref"])
+ if refmodule!= '':
+ if not refmodule in module_data['import']:
+ module_data['import'].append(refmodule)
+ refstr=refmodule+"."+refstr
+ buff=add_buff(buff,f'{indent}{pr_type}{refstr} {name}',end='')
+ elif "allOf" in data:
+ #print(name,data["allOf"])
+ buff=typewriter(buff,indent+" ","type /* allOf */",name,data["allOf"][1],module_data)
+ elif "anyOf" in data:
+ enum_found=False
+ enum_data=''
+ for e in data["anyOf"]:
+ if "enum" in e:
+ enum_data=e["enum"]
+ enum_found=True
+ break
+
+ if enum_found:
+ outstr=f' type enumerated {name}_enum {{'
+ sep=" "
+ for i in enum_data:
+ outstr+=sep+i
+ sep=", "
+ outstr+='}'
+ buff=add_buff(buff,outstr)
+ buff=add_buff(buff,'')
+ buff=add_buff(buff,f' type union {name} {{')
+ buff=add_buff(buff,f' {name}_enum enum_val,')
+ buff=add_buff(buff,' charstring other_val')
+ buff=add_buff(buff,' } with {')
+ buff=add_buff(buff,' variant "JSON: as value"')
+ buff=add_buff(buff,' }')
+
+
+ elif "oneOf" in data:
+ if pr_type == "":
+ buff=add_buff(buff,f'{indent}{pr_type}union {{',end='')
+ else:
+ buff=add_buff(buff,f'{indent}{pr_type}union {name} {{',end='')
+ prop_data=data["oneOf"]
+ sep=""
+ i=1
+ for prop in prop_data:
+ buff=add_buff(buff,sep)
+ sep=","
+ opt_str=""
+ buff=typewriter(buff,indent+" ","",f'field{i}',prop,module_data)
+ i+=1
+ buff=add_buff(buff,"")
+ buff=add_buff(buff,' } with {')
+ buff=add_buff(buff,' variant "JSON: as value"')
+ buff=add_buff(buff,' }')
+ if pr_type == "":
+ buff=add_buff(buff,name,end='')
+
+ else:
+ print (f' // skiped {name}')
+ return buff
+
+def type_builder(name,data,tree):
+ global type_tree
+ #print("type:", name)
+ element_data={}
+ cname=clean_name(name)
+ element_data["name"]=cname
+ element_data["variant"]=[]
+ if name != cname:
+ element_data["variant"].append("name as '"+name+"'")
+
+ element_data["nullable"]= "nullable" in data
+
+ if "type" in data:
+ if data["type"] == "string":
+ if "enum" in data:
+ element_data["type"]="enumerated"
+ element_data["values"]=[]
+ for ev in data["enum"]:
+ #print(ev)
+ cename=clean_name(ev,True)
+ element_data["values"].append(cename)
+ if cename != ev:
+ element_data["variant"].append("text '"+ cename +"' as '"+ev+"'")
+ else:
+ element_data["type"]="charstring"
+ if "pattern" in data:
+ element_data["pattern"]=data["pattern"]
+
+ elif data["type"] == "integer":
+ restrition=False
+ minstr=" (-infinity.."
+ maxstr="infinity)"
+ if "minimum" in data:
+ restrition=True
+ minstr=f' ({data["minimum"]}..'
+ if "maximum" in data:
+ restrition=True
+ maxstr=f'{data["maximum"]})'
+ if restrition:
+ element_data["restriction"]=minstr+maxstr
+ element_data["type"]="integer"
+
+ elif data["type"] == "number":
+ element_data["type"]="float"
+
+ elif data["type"] == "boolean":
+ element_data["type"]="boolean"
+
+ elif data["type"] == "array":
+ element_data["type"]="record of"
+ element_data["inner_type"]={}
+ field=[]
+ type_builder("",data["items"],field)
+ element_data["inner_type"]=field[0]
+
+ elif data["type"] == "object":
+ if "properties" in data:
+ element_data["type"]="set"
+ if "required" in data:
+ element_data["mandatory"]=[]
+ for r in data["required"]:
+ element_data["mandatory"].append(clean_name(r))
+ else:
+ element_data["mandatory"]=[]
+ element_data["fields"]=[]
+ prop_data=data["properties"]
+ for prop in prop_data:
+ field=[]
+ type_builder(prop,prop_data[prop],field)
+ element_data["fields"].append(field[0])
+ elif "additionalProperties" in data:
+ element_data["type"]="set of"
+ element_data["inner_type"]={}
+ element_data["inner_type"]["type"]="record"
+ element_data["inner_type"]["name"]=""
+ element_data["inner_type"]["nullable"]=False
+ element_data["inner_type"]["mandatory"]=["key"]
+ element_data["inner_type"]["fields"]=[]
+ element_data["inner_type"]["fields"].append({'name':'key', 'type':'universal charstring','nullable':False})
+ field=[]
+ type_builder("additionalProperties",data["additionalProperties"],field)
+ element_data["inner_type"]["fields"].append(field[0])
+ element_data["variant"].append("as map")
+ else:
+ element_data["type"]="JSON_Generic.JSON_generic_val"
+ if not "JSON_Generic" in module_data['import']:
+ module_data['import'].append("JSON_Generic")
+ else:
+ element_data["type"]=data["type"]
+ print('!!!!!!unsupported' + data["type"])
+
+ elif "$ref" in data:
+ refmodule,refstr=get_module(data["$ref"])
+ if refmodule!= '':
+ if not refmodule in module_data['import']:
+ module_data['import'].append(refmodule)
+ element_data["type"]=refmodule+"."+clean_name(refstr,True)
+ else:
+ element_data["type"]=clean_name(refstr,True)
+
+ elif "anyOf" in data:
+ element_data["type"]="union"
+ element_data["fields"]=[]
+ num_str=0
+ num_enum=0
+ for e in data["anyOf"]:
+ if ("type" in e ) and (e["type"] == "string"):
+ num_str+=1
+ if "enum" in e:
+ num_enum+=1
+
+ if (num_str==2) and (num_enum==1):
+ for e in data["anyOf"]:
+ if "enum" in e:
+ type_builder(name + "_enum",e,type_tree)
+ element_data["fields"].append({'name':'enum_val', 'type':name + "_enum", 'nullable':False})
+ element_data["fields"].append({'name':'other_val', 'type':'charstring', 'nullable':False})
+ element_data["variant"].append("JSON: as value")
+ else:
+ element_data["type"]="union"
+ element_data["variant"].append("JSON: as value")
+ element_data["fields"]=[]
+ i=1
+ for e in data["anyOf"]:
+ field=[]
+ type_builder(f'field{i}',e,field)
+ i+=1
+ element_data["fields"].append(field[0])
+
+ elif "oneOf" in data:
+ element_data["type"]="union"
+ element_data["variant"].append("JSON: as value")
+ element_data["fields"]=[]
+ i=1
+ for e in data["oneOf"]:
+ field=[]
+ type_builder(f'field{i}',e,field)
+ i+=1
+ element_data["fields"].append(field[0])
+
+ elif "allOf" in data:
+ element_data["type"]="JSON_Generic.JSON_generic_val"
+ if not "JSON_Generic" in module_data['import']:
+ module_data['import'].append("JSON_Generic")
+
+
+ else:
+ element_data["type"]="JSON_Generic.JSON_generic_val"
+ if not "JSON_Generic" in module_data['import']:
+ module_data['import'].append("JSON_Generic")
+ print('!!!!!!unsupported ',name)
+ pprint.pprint(data)
+
+ if "nullable" in data:
+ if not "JSON_Generic" in module_data['import']:
+ module_data['import'].append("JSON_Generic")
+ element_data2={}
+ element_data2["type"]="union"
+ element_data2["name"]=element_data["name"]
+ element_data["name"]="val"
+ element_data["nullable"]=False
+ element_data2["nullable"]=True
+ element_data2["variant"]=[]
+ element_data2["variant"].append("JSON: as value")
+ element_data2["fields"]=[]
+ element_data2["fields"].append({'name':'null_val', 'type':"JSON_generic_val.JSON_null_val", 'nullable':False})
+ element_data2["fields"].append(element_data)
+ tree.append(element_data2)
+ else:
+ tree.append(element_data)
+
+def print_type(t, lend="\n", top=False):
+ global ident_level
+ global ident_c
+ print(t["type"], " ",end="",sep="")
+ if top and (t["type"] !="record of"):
+ print(t["name"]," ", end="",sep="")
+ if "fields" in t:
+ print(" {",sep="")
+ ident_level+=1
+ separatot=ident_c*ident_level
+ for f in t["fields"]:
+ print(separatot, end="",sep="")
+ separatot=",\n" + ident_c*ident_level
+ print_type(f,"")
+ if "mandatory" in t:
+ if f["name"] not in t["mandatory"]:
+ print(" optional", end="")
+ if f["nullable"]:
+ print("/* nullable */", end="",sep="")
+ ident_level-=1
+ print("",sep="")
+ print(ident_c*ident_level,"}",sep="",end=lend)
+ elif "inner_type" in t:
+ print_type(t["inner_type"],"")
+ elif "values" in t:
+ print("{ ",sep="",end="")
+ separatot=""
+ for e in t["values"]:
+ print(separatot, end="",sep="")
+ separatot=", "
+ print(e, end="",sep="")
+ print("}",sep="",end="")
+
+ if (not top) or (t["type"] =="record of"):
+ print(" ", t["name"],end="",sep="")
+ if "restriction" in t:
+ print(" ",t["restriction"],end="",sep="")
+ print(lend,end="")
+
+def gather_variants(t,spec,variants):
+ if "variant" in t:
+ for v in t["variant"]:
+ variants.append({"spec":spec, "var":v})
+ if "fields" in t:
+ for f in t["fields"]:
+ spec2=spec
+ if spec!="":
+ spec2+="."
+ gather_variants(f,spec2+f["name"],variants)
+
+from yaml.constructor import Constructor
+
+def add_bool(self, node):
+ return self.construct_scalar(node)
+
+Constructor.add_constructor(u'tag:yaml.org,2002:bool', add_bool)
+
+f=open(sys.argv[1])
+
+module_name = os.path.splitext(os.path.basename(sys.argv[1]))[0]
+
+sys.stdout = open(module_name + ".ttcn",'wt')
+
+print("module " + module_name + " {")
+print("")
+
+doc=yaml.load(f)
+
+schemas=[]
+
+type_tree=[]
+
+module_data={'import' : [], "functions" : []}
+buff=""
+
+ident_level=1
+ident_c=" "
+
+if "components" in doc:
+ if "schemas" in doc["components"]:
+ schemas=doc["components"]["schemas"]
+ for name in schemas:
+ if (name[-2:] == "Rm" ) and (name[:-2] in schemas ):
+ type_tree.append({'name':clean_name(name,True),'type':clean_name(name[:-2],True)})
+ else:
+ data=schemas[name]
+ type_builder(clean_name(name,True),data,type_tree)
+ #buff=typewriter(buff," ", "type ",name,data,module_data)
+ #buff=add_buff(buff,'')
+ #buff=add_buff(buff,'')
+ if "responses" in doc["components"]:
+ #print(doc["components"]["responses"])
+ for r in doc["components"]["responses"]:
+ #print(r)
+ process_used_schem_name(doc["components"]["responses"][r],module_data)
+
+if 'paths' in doc:
+ for p in doc["paths"]:
+ #print(p)
+ process_path(doc["paths"][p],module_data)
+
+
+for i in module_data['import']:
+ print(f' import from {i} all')
+print("")
+
+for fs in module_data['functions']:
+ f=clean_name(fs,True)
+ print(f'external function f_enc_{f}(in {f} pdu) return octetstring ')
+ print('with { extension "prototype(convert) encode(JSON)" }')
+ print("")
+ print(f'external function f_dec_{f}(in octetstring stream, out {f} pdu) return integer ')
+ print('with { extension "prototype(backtrack) decode(JSON)" }')
+ print("")
+
+print("")
+#pprint.pprint(type_tree)
+for t in type_tree:
+ print(ident_c*ident_level, "type ",end="",sep="")
+# ident_level+=1
+ print_type(t,"", top=True)
+ variants=[]
+ gather_variants(t,"",variants)
+ if variants != []:
+ print(" with {")
+ ident_level+=1
+ for v in variants:
+ print(ident_c*ident_level, "variant ",end="",sep="")
+ if v["spec"] != "":
+ print("(",v["spec"],") ",end="",sep="")
+ print('"',v["var"],'"',sep="")
+ ident_level-=1
+ print(ident_c*ident_level, "}",sep="")
+ else:
+ print("")
+# ident_level-=1
+ print("")
+
+#print(buff)
+print("")
+
+print("")
+print('')
+print('} with {')
+print(' encode "JSON"')
+print('}')
+
+pprint.pprint(type_tree)