# blob: 10583a077e7e2e102d39e42abe2c99bcc7c0915c [file] [log] [blame]
import yaml
import sys
import os
import pprint
import re
# TTCN-3 reserved words and predefined function names.  clean_name() appends
# an underscore to any identifier found in this list so that the generated
# module does not use a reserved word as a type or field name.
ttcn_keywords = [
    "action", "activate", "address", "alive", "all", "alt", "altstep", "and",
    "and4b", "any", "anytype", "bitstring", "boolean", "break", "case",
    "call", "catch", "char", "charstring", "check", "clear", "complement",
    "component", "connect", "const", "continue", "control", "create",
    "deactivate", "decmatch", "default", "disconnect", "display", "do",
    "done", "else", "encode", "enumerated", "error", "except", "exception",
    "execute", "extends", "extension", "external", "fail", "false", "float",
    "for", "friend", "from", "function", "getverdict", "getcall", "getreply",
    "goto", "group", "halt", "hexstring", "if", "ifpresent", "import", "in",
    "inconc", "infinity", "inout", "integer", "interleave", "kill", "killed",
    "label", "language", "length", "log", "map", "match", "message", "mixed",
    "mod", "modifies", "module", "modulepar", "mtc", "noblock", "none", "not",
    "not_a_number", "not4b", "nowait", "null", "octetstring", "of", "omit",
    "on", "optional", "or", "or4b", "out", "override", "param", "pass",
    "pattern", "permutation", "port", "present", "private", "procedure",
    "public", "raise", "read", "receive", "record", "recursive", "rem",
    "repeat", "reply", "return", "running", "runs", "select", "self", "send",
    "sender", "set", "setencode", "setverdict", "signature", "start", "stop",
    "subset", "superset", "system", "template", "testcase", "timeout",
    "timer", "to", "trigger", "true", "type", "union", "universal", "unmap",
    "value", "valueof", "var", "variant", "verdicttype", "while", "with",
    "xor", "xor4b",
    # predefined functions
    "bit2hex", "bit2int", "bit2oct", "bit2str", "char2int", "float2int",
    "hex2bit", "hex2int", "hex2oct", "hex2str", "int2bit", "int2char",
    "int2float", "int2hex", "int2oct", "int2str", "int2unichar", "ischosen",
    "ispresent", "lengthof", "oct2bit", "oct2hex", "oct2int", "oct2str",
    "regexp", "rnd",
    "sizeof",  # was misspelled "sixeof", so "sizeof" was never escaped
    "str2int", "str2oct", "substr", "unichar2int", "replace",
]
def numToWords(num):
    """Spell an integer in English words, concatenated without separators.

    The result is meant to be embedded in an identifier, so it contains
    letters only: ``numToWords(1234)`` gives
    ``'onethousandtwohundredthirtyfour'``.

    num -- the integer to spell; it is formatted with ``'%d'``.  Negative
           values are spelled with a leading ``'minus'``.

    Returns the spelled-out number as a single string (``'zero'`` for 0).
    Raises IndexError when the number needs a scale word beyond
    'vigintillion'.
    """
    units = ['', 'one', 'two', 'three', 'four', 'five', 'six', 'seven',
             'eight', 'nine']
    teens = ['', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen',
             'sixteen', 'seventeen', 'eighteen', 'nineteen']
    tens = ['', 'ten', 'twenty', 'thirty', 'forty', 'fifty', 'sixty',
            'seventy', 'eighty', 'ninety']
    # Index k names 10**(3*k).  'quindecillion' (10**48) was missing, which
    # shifted every larger scale word down by one position.
    thousands = ['', 'thousand', 'million', 'billion', 'trillion',
                 'quadrillion', 'quintillion', 'sextillion', 'septillion',
                 'octillion', 'nonillion', 'decillion', 'undecillion',
                 'duodecillion', 'tredecillion', 'quattuordecillion',
                 'quindecillion', 'sexdecillion', 'septendecillion',
                 'octodecillion', 'novemdecillion', 'vigintillion']
    if num == 0:
        return 'zero'
    words = []
    if num < 0:
        # The digit loop below cannot parse a '-' sign.
        words.append('minus')
        num = -num
    numStr = '%d' % num
    groups = (len(numStr) + 2) // 3
    # Left-pad so the digits split evenly into hundreds/tens/units triples.
    numStr = numStr.zfill(groups * 3)
    for i in range(0, groups * 3, 3):
        h, t, u = int(numStr[i]), int(numStr[i + 1]), int(numStr[i + 2])
        g = groups - (i // 3 + 1)  # scale index of this triple
        if h >= 1:
            words.append(units[h])
            words.append('hundred')
        if t > 1:
            words.append(tens[t])
            if u >= 1:
                words.append(units[u])
        elif t == 1:
            words.append(teens[u] if u >= 1 else tens[t])
        elif u >= 1:
            words.append(units[u])
        if g >= 1 and (h + t + u) > 0:
            # No trailing ',' here: the words are joined without spaces, so
            # a comma would end up inside the resulting name.
            words.append(thousands[g])
    return ''.join(words)
def clean_name(instr, typename=False):
    """Turn an OpenAPI name into an identifier usable in the TTCN-3 output.

    instr    -- the original name (schema, property or enum value).
    typename -- when true the first character is upper-cased; otherwise a
                single leading underscore is removed.

    Processing order:
      1. a name listed in ttcn_keywords is returned at once with '_'
         appended;
      2. a leading run of digits is replaced by its spelling from
         numToWords();
      3. the typename / leading-underscore rule is applied;
      4. every '-' is replaced by '_'.

    Returns the cleaned name.
    """
    if instr in ttcn_keywords:
        return instr + "_"
    # Raw string: '\d' inside a normal string literal is an invalid escape
    # sequence and triggers a warning on current Python versions.
    m = re.search(r'(^\d+)(.*)', instr)
    if m:
        instr = numToWords(int(m.group(1))) + m.group(2)
    if typename:
        instr = instr[:1].upper() + instr[1:]
    elif instr[:1] == "_":
        instr = instr[1:]
    return instr.replace("-", "_")
def get_module(ref):
    """Split a ``$ref`` string into ``(module, type_name)``.

    The type name is the text after the last '/'.  For a reference that
    starts with '#' the module is the empty string; otherwise it is the part
    before the first '#' with every ".yaml" removed.
    """
    type_name = ref.rsplit('/', 1)[-1]
    if ref[0] == '#':
        return '', type_name
    file_part = ref.partition("#")[0]
    return file_part.replace(".yaml", ""), type_name
def process_path(data,module_data):
    """Scan one path item and record every schema its operations reference.

    data        -- mapping of path-item members; the "parameters" member is a
                   list of parameter objects, every other dict-valued member
                   is scanned for "parameters", "requestBody", "responses"
                   and "callbacks".
    module_data -- accumulator passed through to process_used_schem_name().

    Callback entries are path items themselves and are scanned recursively.
    Members that are not mappings (plain strings or lists) are skipped: the
    previous code probed them with ``in`` and then indexed them, which raised
    TypeError, e.g. for a summary text containing the word "parameters" or
    for a callback given as a ``$ref``.
    """
    if not isinstance(data, dict):
        # Reached through a "$ref"-style callback entry: nothing to scan.
        return
    for m in data:
        item = data[m]
        if m == "parameters":
            for p in item:
                process_used_schem_name(p, module_data)
            continue
        if not isinstance(item, dict):
            continue
        for p in item.get("parameters", ()):
            process_used_schem_name(p, module_data)
        if "requestBody" in item:
            process_used_schem_name(item["requestBody"], module_data)
        responses = item.get("responses", {})
        for r in responses:
            process_used_schem_name(responses[r], module_data)
        callbacks = item.get("callbacks", {})
        for c in callbacks:
            for p in callbacks[c]:
                process_path(callbacks[c][p], module_data)
def process_schema(schema,module_data):
    """Record what the generated module needs for one used schema.

    * ``$ref`` schema: the referenced module (if any) is added to
      module_data['import'] and the, possibly module-qualified, type name to
      module_data['functions']; both lists are kept free of duplicates.
    * array whose items are a ``$ref``: commented-out TTCN-3 list type and
      encoder/decoder declarations are printed.
    * object with a "jsonData" property: that property's schema is processed
      recursively.
    Anything else is ignored.
    """
    if "$ref" in schema:
        module, type_name = get_module(schema["$ref"])
        if module != '':
            imports = module_data['import']
            if module not in imports:
                imports.append(module)
            type_name = module + "." + type_name
        functions = module_data['functions']
        if type_name not in functions:
            functions.append(type_name)
        return
    if "type" not in schema:
        return
    if schema["type"] == "array" and "$ref" in schema["items"]:
        refmodule, refstr = get_module(schema["items"]["$ref"])
        print(f"// {refmodule} type record of {refstr} {refstr}_list")
        print(f'// external function f_enc_{refstr}_list(in {refstr}_list pdu) return octetstring ')
        print('// with { extension "prototype(convert) encode(JSON)" }')
        print("")
        print(f'// external function f_dec_{refstr}_list(in octetstring stream, out {refstr}_list pdu) return integer ')
        print('// with { extension "prototype(backtrack) decode(JSON)" }')
        print("")
    if schema["type"] == "object" and "properties" in schema:
        properties = schema["properties"]
        if "jsonData" in properties:
            process_schema(properties["jsonData"], module_data)
def process_used_schem_name(data,module_data):
    """Hand every schema attached to ``data`` to process_schema().

    Looks at the "schema" entry of each item under ``data["content"]`` and
    then at a "schema" entry directly on ``data``.
    """
    if "content" in data:
        content = data["content"]
        for media_type in content:
            entry = content[media_type]
            if "schema" in entry:
                process_schema(entry["schema"], module_data)
    if "schema" in data:
        process_schema(data["schema"], module_data)
def add_buff(buff, txt, end='\n'):
    """Return ``buff`` with ``txt`` and then ``end`` appended."""
    return "".join((buff, txt, end))
def typewriter(buff, indent, pr_type ,name, data ,module_data):
    """Append TTCN-3 text for the schema node ``data`` to ``buff``.

    buff        -- text accumulated so far; the extended text is returned.
    indent      -- string placed in front of the emitted declaration.
    pr_type     -- prefix written before the type keyword; an empty string
                   makes the object/union branches write the name after the
                   closing brace instead of before the opening one.
    name        -- name of the type or field being written.
    data        -- the schema node (a mapping).
    module_data -- accumulator; modules referenced through ``$ref`` are added
                   to module_data['import'].

    Node kinds without handling only print a "skiped" note to stdout.
    The only call to this function visible in this file is commented out.
    """
    #print("typewriter ", indent, pr_type ,name, data)
    if "type" in data:
        if data["type"] == "string":
            if "enum" in data:
                # Enumerated type: the enum values are written verbatim.
                enum_data=data["enum"]
                outstr=f'{indent}{pr_type}enumerated {name} {{'
                sep=" "
                for i in enum_data:
                    outstr+=sep+i
                    sep=", "
                outstr+='}'
                buff=add_buff(buff,outstr)
            else:
                buff=add_buff(buff,f'{indent}{pr_type}charstring {name}',end='')
                if "pattern" in data:
                    # The pattern is only recorded as a trailing comment.
                    buff=add_buff(buff,f' // (pattern \"{data["pattern"]}\")',end='')
        elif data["type"] == "integer":
            # Build a "(min..max)" range; a missing bound becomes
            # -infinity / infinity, and no range is written when neither
            # bound is given.
            restrition=False
            restritionstr=""
            minstr=" (-infinity.."
            maxstr="infinity)"
            if "minimum" in data:
                restrition=True
                minstr=f' ({data["minimum"]}..'
            if "maximum" in data:
                restrition=True
                maxstr=f'{data["maximum"]})'
            if restrition:
                restritionstr=minstr+maxstr
            buff=add_buff(buff,f'{indent}{pr_type}integer {name}{restritionstr}',end='')
        elif data["type"] == "number":
            buff=add_buff(buff,f'{indent}{pr_type}float {name}',end='')
        elif data["type"] == "boolean":
            buff=add_buff(buff,f'{indent}{pr_type}boolean {name}',end='')
        elif data["type"] == "array":
            # "set of <item type> <name>": the item type is written by a
            # recursive call with an empty name.
            buff=add_buff(buff,f'{indent}{pr_type}set of ',end='')
            buff=typewriter(buff,indent+" ","","",data["items"],module_data)
            buff=add_buff(buff,name,end='')
        elif data["type"] == "object":
            if "required" in data:
                required_data=data["required"]
            else:
                required_data=[]
            if "properties" in data:
                if pr_type == "":
                    buff=add_buff(buff,f'{indent}{pr_type}set {{',end='')
                else:
                    buff=add_buff(buff,f'{indent}{pr_type}set {name} {{',end='')
                prop_data=data["properties"]
                sep=""
                for prop in prop_data:
                    # The separator is written before each field so that no
                    # comma follows the last one.
                    buff=add_buff(buff,sep)
                    sep=","
                    opt_str=""
                    if not prop in required_data:
                        opt_str=" optional"
                    buff=typewriter(buff,indent+" ","",prop,prop_data[prop],module_data)
                    buff=add_buff(buff,opt_str,end='')
                buff=add_buff(buff,"")
                buff=add_buff(buff," }",end='')
                if pr_type == "":
                    buff=add_buff(buff,name,end='')
            elif "additionalProperties" in data:
                # Free-form map: written as a set of key/value records.
                buff=add_buff(buff,f'{indent}{pr_type}set of record {{')
                buff=add_buff(buff,f'{indent} universal charstring key,')
                buff=typewriter(buff,indent+" ","","additionalProperties",data["additionalProperties"],module_data)
                buff=add_buff(buff,"")
                buff=add_buff(buff,f'{indent}}} {name}',end='')
            else:
                buff=add_buff(buff,f'{indent}{pr_type}object2 {name}',end='')
        else:
            print (f' // skiped {name}',end='')
    elif "$ref" in data:
        # Reference to another type; a non-empty module part is recorded as
        # an import and used to qualify the type name.
        refmodule,refstr=get_module(data["$ref"])
        if refmodule!= '':
            if not refmodule in module_data['import']:
                module_data['import'].append(refmodule)
            refstr=refmodule+"."+refstr
        buff=add_buff(buff,f'{indent}{pr_type}{refstr} {name}',end='')
    elif "allOf" in data:
        #print(name,data["allOf"])
        # Only the second allOf entry (index 1) is written.
        buff=typewriter(buff,indent+" ","type /* allOf */",name,data["allOf"][1],module_data)
    elif "anyOf" in data:
        # Look for the first alternative that carries an enum; when there is
        # none, nothing is appended for this node.
        enum_found=False
        enum_data=''
        for e in data["anyOf"]:
            if "enum" in e:
                enum_data=e["enum"]
                enum_found=True
                break
        if enum_found:
            # Written as "<name>_enum" plus a union of that enum and a
            # charstring for any other value.
            outstr=f' type enumerated {name}_enum {{'
            sep=" "
            for i in enum_data:
                outstr+=sep+i
                sep=", "
            outstr+='}'
            buff=add_buff(buff,outstr)
            buff=add_buff(buff,'')
            buff=add_buff(buff,f' type union {name} {{')
            buff=add_buff(buff,f' {name}_enum enum_val,')
            buff=add_buff(buff,' charstring other_val')
            buff=add_buff(buff,' } with {')
            buff=add_buff(buff,' variant "JSON: as value"')
            buff=add_buff(buff,' }')
    elif "oneOf" in data:
        # Union with one generated field (field1, field2, ...) per
        # alternative.
        if pr_type == "":
            buff=add_buff(buff,f'{indent}{pr_type}union {{',end='')
        else:
            buff=add_buff(buff,f'{indent}{pr_type}union {name} {{',end='')
        prop_data=data["oneOf"]
        sep=""
        i=1
        for prop in prop_data:
            buff=add_buff(buff,sep)
            sep=","
            opt_str=""
            buff=typewriter(buff,indent+" ","",f'field{i}',prop,module_data)
            i+=1
        buff=add_buff(buff,"")
        buff=add_buff(buff,' } with {')
        buff=add_buff(buff,' variant "JSON: as value"')
        buff=add_buff(buff,' }')
        if pr_type == "":
            buff=add_buff(buff,name,end='')
    else:
        print (f' // skiped {name}')
    return buff
def type_builder(name,data,tree):
    """Build a description dict for one schema node and append it to ``tree``.

    name -- name of the schema or property; it is passed through
            clean_name(), and a "name as '<original>'" variant is recorded
            when that changes it.
    data -- the schema node (a mapping).
    tree -- list that receives exactly one new element per call.

    The appended dict always has "name", "type", "variant" and "nullable";
    depending on the node it also carries "values" (enumerated), "pattern",
    "restriction" (integer range), "inner_type" (record of / set of),
    "fields" and "mandatory" (set / union).

    Uses the module-level ``module_data`` (to record imports) and
    ``type_tree`` (helper enum types for anyOf are appended there).
    """
    global type_tree
    #print("type:", name)
    element_data={}
    cname=clean_name(name)
    element_data["name"]=cname
    element_data["variant"]=[]
    if name != cname:
        element_data["variant"].append("name as '"+name+"'")
    # Presence of the key is what counts here; its value is not inspected.
    element_data["nullable"]= "nullable" in data
    if "type" in data:
        if data["type"] == "string":
            if "enum" in data:
                element_data["type"]="enumerated"
                element_data["values"]=[]
                for ev in data["enum"]:
                    #print(ev)
                    cename=clean_name(ev,True)
                    element_data["values"].append(cename)
                    if cename != ev:
                        # Keep the original spelling for the JSON encoding.
                        element_data["variant"].append("text '"+ cename +"' as '"+ev+"'")
            else:
                element_data["type"]="charstring"
                if "pattern" in data:
                    element_data["pattern"]=data["pattern"]
        elif data["type"] == "integer":
            # "(min..max)" range; a missing bound becomes -infinity/infinity
            # and no restriction is stored when neither bound is given.
            restrition=False
            minstr=" (-infinity.."
            maxstr="infinity)"
            if "minimum" in data:
                restrition=True
                minstr=f' ({data["minimum"]}..'
            if "maximum" in data:
                restrition=True
                maxstr=f'{data["maximum"]})'
            if restrition:
                element_data["restriction"]=minstr+maxstr
            element_data["type"]="integer"
        elif data["type"] == "number":
            element_data["type"]="float"
        elif data["type"] == "boolean":
            element_data["type"]="boolean"
        elif data["type"] == "array":
            # The item type is built by a recursive call with an empty name.
            element_data["type"]="record of"
            element_data["inner_type"]={}
            field=[]
            type_builder("",data["items"],field)
            element_data["inner_type"]=field[0]
        elif data["type"] == "object":
            if "properties" in data:
                element_data["type"]="set"
                if "required" in data:
                    element_data["mandatory"]=[]
                    for r in data["required"]:
                        element_data["mandatory"].append(clean_name(r))
                else:
                    element_data["mandatory"]=[]
                element_data["fields"]=[]
                prop_data=data["properties"]
                for prop in prop_data:
                    field=[]
                    type_builder(prop,prop_data[prop],field)
                    element_data["fields"].append(field[0])
            elif "additionalProperties" in data:
                # Free-form map: a set of {key, value} records tagged with
                # the "as map" variant.
                element_data["type"]="set of"
                element_data["inner_type"]={}
                element_data["inner_type"]["type"]="record"
                element_data["inner_type"]["name"]=""
                element_data["inner_type"]["nullable"]=False
                element_data["inner_type"]["mandatory"]=["key"]
                element_data["inner_type"]["fields"]=[]
                element_data["inner_type"]["fields"].append({'name':'key', 'type':'universal charstring','nullable':False})
                field=[]
                type_builder("additionalProperties",data["additionalProperties"],field)
                element_data["inner_type"]["fields"].append(field[0])
                element_data["variant"].append("as map")
            else:
                # Object without a declared shape: generic JSON value.
                element_data["type"]="JSON_Generic.JSON_generic_val"
                if not "JSON_Generic" in module_data['import']:
                    module_data['import'].append("JSON_Generic")
        else:
            element_data["type"]=data["type"]
            print('!!!!!!unsupported' + data["type"])
    elif "$ref" in data:
        refmodule,refstr=get_module(data["$ref"])
        if refmodule!= '':
            if not refmodule in module_data['import']:
                module_data['import'].append(refmodule)
            element_data["type"]=refmodule+"."+clean_name(refstr,True)
        else:
            element_data["type"]=clean_name(refstr,True)
    elif "anyOf" in data:
        element_data["type"]="union"
        element_data["fields"]=[]
        # Count string alternatives and alternatives that carry an enum.
        num_str=0
        num_enum=0
        for e in data["anyOf"]:
            if ("type" in e ) and (e["type"] == "string"):
                num_str+=1
            if "enum" in e:
                num_enum+=1
        if (num_str==2) and (num_enum==1):
            # Two strings, one of them an enum: emit "<name>_enum" as its
            # own type in type_tree and a union of that enum and a plain
            # charstring.
            for e in data["anyOf"]:
                if "enum" in e:
                    type_builder(name + "_enum",e,type_tree)
            element_data["fields"].append({'name':'enum_val', 'type':name + "_enum", 'nullable':False})
            element_data["fields"].append({'name':'other_val', 'type':'charstring', 'nullable':False})
            element_data["variant"].append("JSON: as value")
        else:
            # General case: one generated field per alternative.
            element_data["type"]="union"
            element_data["variant"].append("JSON: as value")
            element_data["fields"]=[]
            i=1
            for e in data["anyOf"]:
                field=[]
                type_builder(f'field{i}',e,field)
                i+=1
                element_data["fields"].append(field[0])
    elif "oneOf" in data:
        element_data["type"]="union"
        element_data["variant"].append("JSON: as value")
        element_data["fields"]=[]
        i=1
        for e in data["oneOf"]:
            field=[]
            type_builder(f'field{i}',e,field)
            i+=1
            element_data["fields"].append(field[0])
    elif "allOf" in data:
        # allOf is not modelled; a generic JSON value is used instead.
        element_data["type"]="JSON_Generic.JSON_generic_val"
        if not "JSON_Generic" in module_data['import']:
            module_data['import'].append("JSON_Generic")
    else:
        element_data["type"]="JSON_Generic.JSON_generic_val"
        if not "JSON_Generic" in module_data['import']:
            module_data['import'].append("JSON_Generic")
        print('!!!!!!unsupported ',name)
        pprint.pprint(data)
    if "nullable" in data:
        # Wrap the element in a union of a null value and the element
        # itself; the element is renamed "val" and the union takes over its
        # name.
        if not "JSON_Generic" in module_data['import']:
            module_data['import'].append("JSON_Generic")
        element_data2={}
        element_data2["type"]="union"
        element_data2["name"]=element_data["name"]
        element_data["name"]="val"
        element_data["nullable"]=False
        element_data2["nullable"]=True
        element_data2["variant"]=[]
        element_data2["variant"].append("JSON: as value")
        element_data2["fields"]=[]
        # NOTE(review): every other reference in this function uses the
        # "JSON_Generic." prefix; confirm "JSON_generic_val." is intended.
        element_data2["fields"].append({'name':'null_val', 'type':"JSON_generic_val.JSON_null_val", 'nullable':False})
        element_data2["fields"].append(element_data)
        tree.append(element_data2)
    else:
        tree.append(element_data)
def print_type(t, lend="\n", top=False):
    """Print the TTCN-3 text of one type description dict to stdout.

    t    -- dict with at least "type" and "name"; "fields", "inner_type",
            "values", "mandatory" and "restriction" are honoured when
            present.
    lend -- text printed after the type.
    top  -- true for a module-level definition, where the name normally
            directly follows the type keyword; nested fields print the name
            after the type.

    For list types the name has to follow the element type
    ("record of X Name", "set of X Name").  This used to be done for
    "record of" only, so a top-level "set of" was printed with its name in
    front of the element type; both are now handled the same way.

    Reads and temporarily changes the module-level ``ident_level`` and reads
    ``ident_c`` while printing fields.
    """
    global ident_level
    global ident_c
    # True when the name is written after the type text instead of before.
    name_last = (not top) or (t["type"] in ("record of", "set of"))
    print(t["type"], " ", end="", sep="")
    if not name_last:
        print(t["name"], " ", end="", sep="")
    if "fields" in t:
        print(" {", sep="")
        ident_level += 1
        separatot = ident_c * ident_level
        for f in t["fields"]:
            # The separator goes before each field so that no comma follows
            # the last one.
            print(separatot, end="", sep="")
            separatot = ",\n" + ident_c * ident_level
            print_type(f, "")
            if "mandatory" in t:
                if f["name"] not in t["mandatory"]:
                    print(" optional", end="")
            if f["nullable"]:
                print("/* nullable */", end="", sep="")
        ident_level -= 1
        print("", sep="")
        print(ident_c * ident_level, "}", sep="", end=lend)
    elif "inner_type" in t:
        print_type(t["inner_type"], "")
    elif "values" in t:
        print("{ ", sep="", end="")
        separatot = ""
        for e in t["values"]:
            print(separatot, end="", sep="")
            separatot = ", "
            print(e, end="", sep="")
        print("}", sep="", end="")
    if name_last:
        print(" ", t["name"], end="", sep="")
    if "restriction" in t:
        print(" ", t["restriction"], end="", sep="")
    print(lend, end="")
def gather_variants(t,spec,variants):
    """Collect the variant attributes of ``t`` and of its nested fields.

    Each variant is appended to ``variants`` as ``{"spec": path, "var":
    text}``, where ``path`` is the dotted chain of field names leading to the
    element (``spec`` for ``t`` itself).  Only "fields" are descended into.
    """
    if "variant" in t:
        variants.extend({"spec": spec, "var": entry} for entry in t["variant"])
    if "fields" not in t:
        return
    prefix = spec if spec == "" else spec + "."
    for child in t["fields"]:
        gather_variants(child, prefix + child["name"], variants)
from yaml.constructor import Constructor
def add_bool(self, node):
    """YAML constructor callback that returns the node's scalar value as is."""
    scalar = self.construct_scalar(node)
    return scalar


# Use add_bool for every node tagged as a YAML bool.
Constructor.add_constructor(u'tag:yaml.org,2002:bool', add_bool)
# ---------------------------------------------------------------------------
# Script body: read the OpenAPI YAML file named by sys.argv[1] and write the
# generated TTCN-3 module to "<input base name>.ttcn" in the current
# directory.  Everything printed from here on goes to that file because
# sys.stdout is redirected to it.
# ---------------------------------------------------------------------------
module_name = os.path.splitext(os.path.basename(sys.argv[1]))[0]

# The loader is named explicitly:
#  * PyYAML 6 refuses yaml.load() without a Loader argument;
#  * yaml.Loader is the loader built on yaml.constructor.Constructor, the
#    class the bool constructor above is registered on.
# NOTE: yaml.Loader can construct arbitrary Python objects -- only run this
# script on specification files from a trusted source.
# The input is parsed (and closed) before stdout is redirected, so a missing
# or malformed input no longer leaves a truncated .ttcn file behind.
with open(sys.argv[1]) as spec_file:
    doc = yaml.load(spec_file, Loader=yaml.Loader)

sys.stdout = open(module_name + ".ttcn", 'wt')
print("module " + module_name + " {")
print("")

# Module-level state shared with type_builder() and print_type().
schemas=[]
type_tree=[]
module_data={'import' : [], "functions" : []}
buff=""
ident_level=1
ident_c=" "

if "components" in doc:
    if "schemas" in doc["components"]:
        schemas=doc["components"]["schemas"]
        for name in schemas:
            if (name[-2:] == "Rm" ) and (name[:-2] in schemas ):
                # "<X>Rm" next to an existing "<X>" becomes an alias of <X>.
                type_tree.append({'name':clean_name(name,True),'type':clean_name(name[:-2],True)})
            else:
                data=schemas[name]
                type_builder(clean_name(name,True),data,type_tree)
    if "responses" in doc["components"]:
        for r in doc["components"]["responses"]:
            process_used_schem_name(doc["components"]["responses"][r],module_data)

if 'paths' in doc:
    for p in doc["paths"]:
        process_path(doc["paths"][p],module_data)

# Imports collected while building the types and scanning the paths.
for i in module_data['import']:
    print(f' import from {i} all')
print("")

# Encoder/decoder declarations for every schema used in a message body.
for fs in module_data['functions']:
    func_type=clean_name(fs,True)
    print(f'external function f_enc_{func_type}(in {func_type} pdu) return octetstring ')
    print('with { extension "prototype(convert) encode(JSON)" }')
    print("")
    print(f'external function f_dec_{func_type}(in octetstring stream, out {func_type} pdu) return integer ')
    print('with { extension "prototype(backtrack) decode(JSON)" }')
    print("")
print("")

# Type definitions, each followed by its "with { variant ... }" block when
# any variant was recorded for the type or one of its fields.
for t in type_tree:
    print(ident_c*ident_level, "type ",end="",sep="")
    print_type(t,"", top=True)
    variants=[]
    gather_variants(t,"",variants)
    if variants != []:
        print(" with {")
        ident_level+=1
        for v in variants:
            print(ident_c*ident_level, "variant ",end="",sep="")
            if v["spec"] != "":
                print("(",v["spec"],") ",end="",sep="")
            print('"',v["var"],'"',sep="")
        ident_level-=1
        print(ident_c*ident_level, "}",sep="")
    else:
        print("")
    print("")

print("")
print("")
print('')
print('} with {')
print(' encode "JSON"')
print('}')
# NOTE(review): stdout is still the .ttcn file here, so this dump of the
# internal type tree lands after the module's closing brace -- confirm that
# this is wanted in the generated file.
pprint.pprint(type_tree)