| import yaml |
| import sys |
| import os |
| import pprint |
| import re |
| |
# Reserved words and predefined function names of TTCN-3.  clean_name() appends
# an underscore to any identifier found in this list so that generated code
# never uses a reserved word as a type or field name.
ttcn_keywords = [
    "action", "activate", "address", "alive", "all", "alt", "altstep", "and",
    "and4b", "any", "anytype", "bitstring", "boolean", "break", "case", "call",
    "catch", "char", "charstring", "check", "clear", "complement", "component",
    "connect", "const", "continue", "control", "create", "deactivate",
    "decmatch", "default", "disconnect", "display", "do", "done", "else",
    "encode", "enumerated", "error", "except", "exception", "execute",
    "extends", "extension", "external", "fail", "false", "float", "for",
    "friend", "from", "function", "getverdict", "getcall", "getreply", "goto",
    "group", "halt", "hexstring", "if", "ifpresent", "import", "in", "inconc",
    "infinity", "inout", "integer", "interleave", "kill", "killed", "label",
    "language", "length", "log", "map", "match", "message", "mixed", "mod",
    "modifies", "module", "modulepar", "mtc", "noblock", "none", "not",
    "not_a_number", "not4b", "nowait", "null", "octetstring", "of", "omit",
    "on", "optional", "or", "or4b", "out", "override", "param", "pass",
    "pattern", "permutation", "port", "present", "private", "procedure",
    "public", "raise", "read", "receive", "record", "recursive", "rem",
    "repeat", "reply", "return", "running", "runs", "select", "self", "send",
    "sender", "set", "setencode", "setverdict", "signature", "start", "stop",
    "subset", "superset", "system", "template", "testcase", "timeout", "timer",
    "to", "trigger", "true", "type", "union", "universal", "unmap", "value",
    "valueof", "var", "variant", "verdicttype", "while", "with", "xor",
    "xor4b",
    # Predefined conversion / helper functions.
    "bit2hex", "bit2int", "bit2oct", "bit2str", "char2int", "float2int",
    "hex2bit", "hex2int", "hex2oct", "hex2str", "int2bit", "int2char",
    "int2float", "int2hex", "int2oct", "int2str", "int2unichar", "ischosen",
    "ispresent", "lengthof", "oct2bit", "oct2hex", "oct2int", "oct2str",
    "regexp", "rnd",
    "sizeof",  # was misspelled "sixeof", so "sizeof" was never escaped
    "str2int", "str2oct", "substr", "unichar2int", "replace",
]
| |
def numToWords(num):
    """Spell a non-negative integer as English words glued together.

    The words are concatenated without any separator so the result can be
    used as the leading part of an identifier, e.g. ``21`` -> ``"twentyone"``
    and ``1234`` -> ``"onethousandtwohundredthirtyfour"``.

    Args:
        num: the number to spell; it is formatted with ``'%d'``.

    Returns:
        The concatenated words; ``"zero"`` when *num* equals 0.

    Raises:
        ValueError: if *num* is negative.
        IndexError: if *num* needs a scale word beyond ``vigintillion``.
    """
    units = ['', 'one', 'two', 'three', 'four', 'five', 'six', 'seven',
             'eight', 'nine']
    teens = ['', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen',
             'sixteen', 'seventeen', 'eighteen', 'nineteen']
    tens = ['', 'ten', 'twenty', 'thirty', 'forty', 'fifty', 'sixty',
            'seventy', 'eighty', 'ninety']
    # Short-scale names, one per group of three digits ('quindecillion' was
    # missing, which shifted every larger scale by one position).
    thousands = ['', 'thousand', 'million', 'billion', 'trillion',
                 'quadrillion', 'quintillion', 'sextillion', 'septillion',
                 'octillion', 'nonillion', 'decillion', 'undecillion',
                 'duodecillion', 'tredecillion', 'quattuordecillion',
                 'quindecillion', 'sexdecillion', 'septendecillion',
                 'octodecillion', 'novemdecillion', 'vigintillion']

    if num == 0:
        return 'zero'

    numStr = '%d' % num
    if numStr.startswith('-'):
        raise ValueError('numToWords() needs a non-negative number, got %s'
                         % numStr)

    # Left-pad so the digits split evenly into hundreds/tens/units triples.
    groups = (len(numStr) + 2) // 3
    numStr = numStr.zfill(groups * 3)

    words = []
    for i in range(0, groups * 3, 3):
        h, t, u = int(numStr[i]), int(numStr[i + 1]), int(numStr[i + 2])
        g = groups - (i // 3 + 1)  # index of this triple's scale word
        if h >= 1:
            words.append(units[h])
            words.append('hundred')
        if t > 1:
            words.append(tens[t])
            if u >= 1:
                words.append(units[u])
        elif t == 1:
            # 10 is spelled from `tens`, 11..19 from `teens`.
            words.append(teens[u] if u >= 1 else tens[t])
        elif u >= 1:
            words.append(units[u])
        if g >= 1 and (h + t + u) > 0:
            # No trailing comma: the result ends up inside an identifier.
            words.append(thousands[g])
    return ''.join(words)
| |
| |
def clean_name(instr, typename=False):
    """Turn an OpenAPI name into an identifier usable in TTCN-3.

    Steps, in order:
      * a name that is a TTCN-3 reserved word gets a trailing underscore;
      * leading decimal digits are spelled out via numToWords();
      * with ``typename=True`` the first character is upper-cased, otherwise
        a single leading underscore is dropped;
      * every ``-`` is replaced by ``_``;
      * if the rewritten name has become a reserved word (for example
        ``_type`` -> ``type`` or ``not-a-number`` -> ``not_a_number``) it
        gets a trailing underscore as well.

    Args:
        instr: the original name.
        typename: True when the result is used as a type or enumeration
            value name, False for field names.

    Returns:
        The cleaned identifier.
    """
    #print(instr)
    if instr in ttcn_keywords:
        return instr + "_"

    # Raw string: '\d' in a normal literal is an invalid escape sequence.
    m = re.search(r'(^\d+)(.*)', instr)
    if m:
        instr = numToWords(int(m.group(1))) + m.group(2)

    if typename:
        instr = instr[:1].upper() + instr[1:]
    elif instr[:1] == "_":
        instr = instr[1:]

    instr = instr.replace("-", "_")

    # The rewrites above can themselves produce a reserved word.
    if instr in ttcn_keywords:
        instr += "_"
    return instr
| |
def get_module(ref):
    """Split a ``$ref`` string into ``(module, referenced_name)``.

    A reference starting with ``#`` points into the current document and
    yields an empty module name.  Otherwise the module is the text in front
    of the first ``#`` with every ``.yaml`` removed.  The referenced name is
    always the text after the last ``/``.
    """
    is_local = ref[0] == '#'
    referenced_name = ref.rsplit('/', 1)[-1]
    if is_local:
        return '', referenced_name
    file_part = ref.split("#")[0]
    return file_part.replace(".yaml", ""), referenced_name
| |
| |
def process_path(data, module_data):
    """Collect the schemas referenced by one OpenAPI path item.

    Every parameter, request body and response found in the path item and
    in its operations is handed to process_used_schem_name(); callbacks are
    walked recursively because each callback expression maps to another
    path item.

    Args:
        data: the path item mapping.  Anything that is not a dict (for
            example the string of a ``$ref``-style callback) is ignored.
        module_data: the shared bookkeeping dict passed through to
            process_used_schem_name().
    """
    if not isinstance(data, dict):
        return

    for key, item in data.items():
        #print("process_path ", key, " " , item)
        if key == "parameters":
            # Path-level parameters shared by all operations.
            for param in item:
                process_used_schem_name(param, module_data)
            continue

        if not isinstance(item, dict):
            # 'summary', 'description', '$ref', 'servers', ...: not an
            # operation.  Using `in` on such a string would be a substring
            # test and the following lookup would raise TypeError.
            continue

        if "parameters" in item:
            for param in item["parameters"]:
                process_used_schem_name(param, module_data)

        if "requestBody" in item:
            process_used_schem_name(item["requestBody"], module_data)

        if "responses" in item:
            for response in item["responses"].values():
                process_used_schem_name(response, module_data)

        if "callbacks" in item:
            for callback in item["callbacks"].values():
                if not isinstance(callback, dict):
                    continue
                for path_item in callback.values():
                    process_path(path_item, module_data)
| |
def process_schema(schema,module_data):
    """Record what a schema used by a message body or parameter requires.

    * ``$ref``: the referenced name (prefixed with ``<module>.`` for
      references into another file) is added once to
      ``module_data['functions']``; a foreign module is added once to
      ``module_data['import']``.
    * ``type: array`` whose items are a ``$ref``: a commented-out list type
      and encoder/decoder declaration is printed.
    * ``type: object`` with a ``jsonData`` property: that property's schema
      is processed recursively.
    """
    if "$ref" in schema:
        module_name, type_name = get_module(schema["$ref"])
        if module_name != '':
            imports = module_data['import']
            if module_name not in imports:
                imports.append(module_name)
            type_name = module_name + "." + type_name
        functions = module_data['functions']
        if type_name not in functions:
            functions.append(type_name)
        return

    if "type" not in schema:
        return

    if schema["type"] == "array" and "$ref" in schema["items"]:
        refmodule, refstr = get_module(schema["items"]["$ref"])
        for line in (
            "// " + refmodule + " type record of " + refstr + " " +refstr + "_list",
            f'// external function f_enc_{refstr}_list(in {refstr}_list pdu) return octetstring ',
            '// with { extension "prototype(convert) encode(JSON)" }',
            "",
            f'// external function f_dec_{refstr}_list(in octetstring stream, out {refstr}_list pdu) return integer ',
            '// with { extension "prototype(backtrack) decode(JSON)" }',
            "",
        ):
            print(line)

    if (schema["type"] == "object"
            and "properties" in schema
            and "jsonData" in schema["properties"]):
        process_schema(schema["properties"]["jsonData"], module_data)
| |
def process_used_schem_name(data,module_data):
    """Feed every schema attached to *data* into process_schema().

    *data* is a parameter, request body or response object.  Schemas are
    looked up under ``content.<media type>.schema`` first and then under a
    direct ``schema`` key.
    """
    #print("process_used_schem_name", data)
    if "content" in data:
        for media_type in data["content"].values():
            #print("media type ", media_type)
            if "schema" in media_type:
                process_schema(media_type["schema"], module_data)

    if "schema" in data:
        process_schema(data["schema"], module_data)
| |
def add_buff(buff, txt, end='\n'):
    """Return *buff* with *txt* and then *end* (a newline by default) appended."""
    pieces = (buff, txt, end)
    return "".join(pieces)
| |
def typewriter(buff, indent, pr_type ,name, data ,module_data):
    """Append the TTCN-3 text for one schema node to *buff* and return it.

    NOTE(review): the only call to this function visible in this file from
    outside the function itself is commented out in the script body; the
    nesting below was reconstructed from a paste that lost its indentation,
    so confirm it against the original file.

    Args:
        buff: text accumulated so far; every add_buff() call extends it.
        indent: string placed in front of the emitted definition.
        pr_type: text placed before the TTCN-3 type keyword.  When it is
            empty the node is written as a field, with *name* after the
            type instead of before the body.
        name: identifier of the type or field.
        data: the schema node (a mapping).
        module_data: dict whose ``'import'`` list receives the modules of
            foreign ``$ref`` targets.

    Returns:
        The extended buffer.  Nodes that are not understood leave *buff*
        unchanged and print a "skiped" remark to stdout instead.
    """
    #print("typewriter ", indent, pr_type ,name, data)
    if "type" in data:
        if data["type"] == "string":
            if "enum" in data:
                # String enumeration: the enum values are written verbatim.
                enum_data=data["enum"]
                outstr=f'{indent}{pr_type}enumerated {name} {{'
                sep=" "
                for i in enum_data:
                    outstr+=sep+i
                    sep=", "
                outstr+='}'
                buff=add_buff(buff,outstr)

            else:
                buff=add_buff(buff,f'{indent}{pr_type}charstring {name}',end='')
                if "pattern" in data:
                    # The pattern is only emitted as a trailing comment.
                    buff=add_buff(buff,f' // (pattern \"{data["pattern"]}\")',end='')
        elif data["type"] == "integer":
            # Build a "(min..max)" range; a missing bound becomes infinity.
            restrition=False
            restritionstr=""
            minstr=" (-infinity.."
            maxstr="infinity)"
            if "minimum" in data:
                restrition=True
                minstr=f' ({data["minimum"]}..'
            if "maximum" in data:
                restrition=True
                maxstr=f'{data["maximum"]})'
            if restrition:
                restritionstr=minstr+maxstr
            buff=add_buff(buff,f'{indent}{pr_type}integer {name}{restritionstr}',end='')
        elif data["type"] == "number":
            buff=add_buff(buff,f'{indent}{pr_type}float {name}',end='')
        elif data["type"] == "boolean":
            buff=add_buff(buff,f'{indent}{pr_type}boolean {name}',end='')
        elif data["type"] == "array":
            # "set of <element type> <name>": the element is rendered with an
            # empty name and the list's own name is appended afterwards.
            buff=add_buff(buff,f'{indent}{pr_type}set of ',end='')
            buff=typewriter(buff,indent+" ","","",data["items"],module_data)
            buff=add_buff(buff,name,end='')
        elif data["type"] == "object":
            if "required" in data:
                required_data=data["required"]
            else:
                required_data=[]
            if "properties" in data:
                if pr_type == "":
                    buff=add_buff(buff,f'{indent}{pr_type}set {{',end='')
                else:
                    buff=add_buff(buff,f'{indent}{pr_type}set {name} {{',end='')
                prop_data=data["properties"]
                sep=""
                for prop in prop_data:
                    # The separator ends the previous field's line.
                    buff=add_buff(buff,sep)
                    sep=","
                    opt_str=""
                    if not prop in required_data:
                        opt_str=" optional"
                    buff=typewriter(buff,indent+" ","",prop,prop_data[prop],module_data)
                    buff=add_buff(buff,opt_str,end='')
                buff=add_buff(buff,"")
                buff=add_buff(buff," }",end='')
                if pr_type == "":
                    buff=add_buff(buff,name,end='')
            elif "additionalProperties" in data:
                # Free-form map: a set of key/value records.
                buff=add_buff(buff,f'{indent}{pr_type}set of record {{')
                buff=add_buff(buff,f'{indent} universal charstring key,')
                buff=typewriter(buff,indent+" ","","additionalProperties",data["additionalProperties"],module_data)
                buff=add_buff(buff,"")
                buff=add_buff(buff,f'{indent}}} {name}',end='')
            else:
                buff=add_buff(buff,f'{indent}{pr_type}object2 {name}',end='')

        else:
            print (f' // skiped {name}',end='')
    elif "$ref" in data:
        refmodule,refstr=get_module(data["$ref"])
        if refmodule!= '':
            if not refmodule in module_data['import']:
                module_data['import'].append(refmodule)
            refstr=refmodule+"."+refstr
        buff=add_buff(buff,f'{indent}{pr_type}{refstr} {name}',end='')
    elif "allOf" in data:
        #print(name,data["allOf"])
        # Only the second allOf member is rendered; an allOf list with fewer
        # than two members raises IndexError here.
        buff=typewriter(buff,indent+" ","type /* allOf */",name,data["allOf"][1],module_data)
    elif "anyOf" in data:
        # Look for the first alternative that carries an enum list.
        enum_found=False
        enum_data=''
        for e in data["anyOf"]:
            if "enum" in e:
                enum_data=e["enum"]
                enum_found=True
                break

        if enum_found:
            # Emit the enumeration plus a union of it and a free charstring.
            # An anyOf without any enum alternative emits nothing.
            outstr=f' type enumerated {name}_enum {{'
            sep=" "
            for i in enum_data:
                outstr+=sep+i
                sep=", "
            outstr+='}'
            buff=add_buff(buff,outstr)
            buff=add_buff(buff,'')
            buff=add_buff(buff,f' type union {name} {{')
            buff=add_buff(buff,f' {name}_enum enum_val,')
            buff=add_buff(buff,' charstring other_val')
            buff=add_buff(buff,' } with {')
            buff=add_buff(buff,' variant "JSON: as value"')
            buff=add_buff(buff,' }')


    elif "oneOf" in data:
        if pr_type == "":
            buff=add_buff(buff,f'{indent}{pr_type}union {{',end='')
        else:
            buff=add_buff(buff,f'{indent}{pr_type}union {name} {{',end='')
        prop_data=data["oneOf"]
        sep=""
        i=1
        for prop in prop_data:
            # Alternatives have no names of their own: field1, field2, ...
            buff=add_buff(buff,sep)
            sep=","
            opt_str=""
            buff=typewriter(buff,indent+" ","",f'field{i}',prop,module_data)
            i+=1
        buff=add_buff(buff,"")
        buff=add_buff(buff,' } with {')
        buff=add_buff(buff,' variant "JSON: as value"')
        buff=add_buff(buff,' }')
        if pr_type == "":
            buff=add_buff(buff,name,end='')

    else:
        print (f' // skiped {name}')
    return buff
| |
def _is_nullable(data):
    """Return True only when schema *data* sets ``nullable`` to a true value.

    The YAML bool constructor is overridden in this file to return the
    scalar text, so the value may arrive as ``"true"`` / ``"false"`` text as
    well as a real bool.  The mere presence of the key is not enough:
    ``nullable: false`` must not make the type nullable.
    """
    if not isinstance(data, dict) or "nullable" not in data:
        return False
    value = data["nullable"]
    if isinstance(value, str):
        return value.strip().lower() in ("true", "yes", "on")
    return bool(value)


def type_builder(name,data,tree):
    """Translate one OpenAPI schema node into a type description dict.

    The description is appended to *tree*.  Its keys, depending on the kind
    of schema, are:

      * ``name``: cleaned identifier (``clean_name(name)``);
      * ``variant``: list of TTCN-3 variant attribute texts;
      * ``nullable``: whether the schema declares ``nullable`` as true;
      * ``type``: TTCN-3 type keyword or referenced type name;
      * ``values``: enumeration items;
      * ``pattern``: string pattern copied from the schema;
      * ``restriction``: integer range text such as `` (0..infinity)``;
      * ``inner_type``: element description of ``record of`` / ``set of``;
      * ``fields`` / ``mandatory``: members of ``set`` / ``union`` / ``record``
        and the names of the required ones.

    Side effects: modules needed by the generated type are appended to the
    module-level ``module_data['import']``; the enumeration split out of a
    string ``anyOf`` is appended to the module-level ``type_tree``;
    unsupported schemas are reported with print().

    Args:
        name: name of the type or field as written in the OpenAPI document.
        data: the schema node.
        tree: list that receives the resulting description.
    """
    global type_tree
    #print("type:", name)
    element_data={}
    cname=clean_name(name)
    element_data["name"]=cname
    element_data["variant"]=[]
    if name != cname:
        # Keep the JSON name when the TTCN-3 identifier had to be changed.
        element_data["variant"].append("name as '"+name+"'")

    nullable=_is_nullable(data)
    element_data["nullable"]=nullable

    if "type" in data:
        if data["type"] == "string":
            if "enum" in data:
                element_data["type"]="enumerated"
                element_data["values"]=[]
                for ev in data["enum"]:
                    #print(ev)
                    cename=clean_name(ev,True)
                    element_data["values"].append(cename)
                    if cename != ev:
                        element_data["variant"].append("text '"+ cename +"' as '"+ev+"'")
            else:
                element_data["type"]="charstring"
                if "pattern" in data:
                    element_data["pattern"]=data["pattern"]

        elif data["type"] == "integer":
            # Build a "(min..max)" range; a missing bound becomes infinity.
            restrition=False
            minstr=" (-infinity.."
            maxstr="infinity)"
            if "minimum" in data:
                restrition=True
                minstr=f' ({data["minimum"]}..'
            if "maximum" in data:
                restrition=True
                maxstr=f'{data["maximum"]})'
            if restrition:
                element_data["restriction"]=minstr+maxstr
            element_data["type"]="integer"

        elif data["type"] == "number":
            element_data["type"]="float"

        elif data["type"] == "boolean":
            element_data["type"]="boolean"

        elif data["type"] == "array":
            element_data["type"]="record of"
            # The element type is built as an unnamed description.
            field=[]
            type_builder("",data["items"],field)
            element_data["inner_type"]=field[0]

        elif data["type"] == "object":
            if "properties" in data:
                element_data["type"]="set"
                element_data["mandatory"]=[]
                if "required" in data:
                    for r in data["required"]:
                        element_data["mandatory"].append(clean_name(r))
                element_data["fields"]=[]
                prop_data=data["properties"]
                for prop in prop_data:
                    field=[]
                    type_builder(prop,prop_data[prop],field)
                    element_data["fields"].append(field[0])
            elif "additionalProperties" in data:
                # Free-form map: a "set of record { key, value }" carrying
                # the "as map" variant.
                element_data["type"]="set of"
                element_data["inner_type"]={}
                element_data["inner_type"]["type"]="record"
                element_data["inner_type"]["name"]=""
                element_data["inner_type"]["nullable"]=False
                element_data["inner_type"]["mandatory"]=["key"]
                element_data["inner_type"]["fields"]=[]
                element_data["inner_type"]["fields"].append({'name':'key', 'type':'universal charstring','nullable':False})
                field=[]
                type_builder("additionalProperties",data["additionalProperties"],field)
                element_data["inner_type"]["fields"].append(field[0])
                element_data["variant"].append("as map")
            else:
                # Object without any structure information: generic JSON.
                element_data["type"]="JSON_Generic.JSON_generic_val"
                if "JSON_Generic" not in module_data['import']:
                    module_data['import'].append("JSON_Generic")
        else:
            element_data["type"]=data["type"]
            print('!!!!!!unsupported' + data["type"])

    elif "$ref" in data:
        refmodule,refstr=get_module(data["$ref"])
        if refmodule!= '':
            if refmodule not in module_data['import']:
                module_data['import'].append(refmodule)
            element_data["type"]=refmodule+"."+clean_name(refstr,True)
        else:
            element_data["type"]=clean_name(refstr,True)

    elif "anyOf" in data:
        element_data["type"]="union"
        element_data["fields"]=[]
        num_str=0
        num_enum=0
        for e in data["anyOf"]:
            if ("type" in e ) and (e["type"] == "string"):
                num_str+=1
            if "enum" in e:
                num_enum+=1

        if (num_str==2) and (num_enum==1):
            # "Extensible enumeration": one string enum plus one free
            # string.  The enum becomes its own top-level type.
            for e in data["anyOf"]:
                if "enum" in e:
                    type_builder(name + "_enum",e,type_tree)
            element_data["fields"].append({'name':'enum_val', 'type':name + "_enum", 'nullable':False})
            element_data["fields"].append({'name':'other_val', 'type':'charstring', 'nullable':False})
            element_data["variant"].append("JSON: as value")
        else:
            element_data["type"]="union"
            element_data["variant"].append("JSON: as value")
            element_data["fields"]=[]
            i=1
            for e in data["anyOf"]:
                field=[]
                type_builder(f'field{i}',e,field)
                i+=1
                element_data["fields"].append(field[0])

    elif "oneOf" in data:
        element_data["type"]="union"
        element_data["variant"].append("JSON: as value")
        element_data["fields"]=[]
        i=1
        for e in data["oneOf"]:
            field=[]
            type_builder(f'field{i}',e,field)
            i+=1
            element_data["fields"].append(field[0])

    elif "allOf" in data:
        # allOf is not decomposed; it is mapped to generic JSON.
        element_data["type"]="JSON_Generic.JSON_generic_val"
        if "JSON_Generic" not in module_data['import']:
            module_data['import'].append("JSON_Generic")

    else:
        element_data["type"]="JSON_Generic.JSON_generic_val"
        if "JSON_Generic" not in module_data['import']:
            module_data['import'].append("JSON_Generic")
        print('!!!!!!unsupported ',name)
        pprint.pprint(data)

    if nullable:
        # Wrap the type in a union of "null" and the actual value.
        if "JSON_Generic" not in module_data['import']:
            module_data['import'].append("JSON_Generic")
        element_data2={}
        element_data2["type"]="union"
        element_data2["name"]=element_data["name"]
        element_data["name"]="val"
        element_data["nullable"]=False
        element_data2["nullable"]=True
        element_data2["variant"]=[]
        element_data2["variant"].append("JSON: as value")
        element_data2["fields"]=[]
        # NOTE(review): every other reference uses the module prefix
        # "JSON_Generic."; confirm that "JSON_generic_val." is intended here.
        element_data2["fields"].append({'name':'null_val', 'type':"JSON_generic_val.JSON_null_val", 'nullable':False})
        element_data2["fields"].append(element_data)
        tree.append(element_data2)
    else:
        tree.append(element_data)
| |
| def print_type(t, lend="\n", top=False): |
| global ident_level |
| global ident_c |
| print(t["type"], " ",end="",sep="") |
| if top and (t["type"] !="record of"): |
| print(t["name"]," ", end="",sep="") |
| if "fields" in t: |
| print(" {",sep="") |
| ident_level+=1 |
| separatot=ident_c*ident_level |
| for f in t["fields"]: |
| print(separatot, end="",sep="") |
| separatot=",\n" + ident_c*ident_level |
| print_type(f,"") |
| if "mandatory" in t: |
| if f["name"] not in t["mandatory"]: |
| print(" optional", end="") |
| if f["nullable"]: |
| print("/* nullable */", end="",sep="") |
| ident_level-=1 |
| print("",sep="") |
| print(ident_c*ident_level,"}",sep="",end=lend) |
| elif "inner_type" in t: |
| print_type(t["inner_type"],"") |
| elif "values" in t: |
| print("{ ",sep="",end="") |
| separatot="" |
| for e in t["values"]: |
| print(separatot, end="",sep="") |
| separatot=", " |
| print(e, end="",sep="") |
| print("}",sep="",end="") |
| |
| if (not top) or (t["type"] =="record of"): |
| print(" ", t["name"],end="",sep="") |
| if "restriction" in t: |
| print(" ",t["restriction"],end="",sep="") |
| print(lend,end="") |
| |
def gather_variants(t,spec,variants):
    """Collect the variant attributes of *t* and of all its nested fields.

    One ``{"spec": <path>, "var": <text>}`` dict is appended to *variants*
    per attribute.  ``spec`` is the dotted field path from the top-level
    type down to the field the attribute belongs to; it is ``""`` for the
    top-level type itself.
    """
    if "variant" in t:
        variants.extend({"spec": spec, "var": text} for text in t["variant"])

    if "fields" not in t:
        return

    for child in t["fields"]:
        # Join path components with "." except in front of the first one.
        prefix = spec if spec == "" else spec + "."
        gather_variants(child, prefix + child["name"], variants)
| |
| from yaml.constructor import Constructor |
| |
def add_bool(self, node):
    """Construct a YAML bool node as whatever ``construct_scalar`` returns.

    Registered below for the ``tag:yaml.org,2002:bool`` tag, so values that
    YAML resolves as booleans are not turned into Python ``bool`` objects by
    this constructor.
    NOTE(review): presumably this keeps enum literals such as ON/OFF or
    yes/no as text -- confirm.
    """
    return self.construct_scalar(node)

# Override the bool constructor on yaml's `Constructor` class; only loaders
# that are built on `Constructor` pick this up.
Constructor.add_constructor(u'tag:yaml.org,2002:bool', add_bool)
| |
# ---------------------------------------------------------------------------
# Script body: convert the OpenAPI YAML file named on the command line into
# "<basename>.ttcn" in the current directory.
# ---------------------------------------------------------------------------
if len(sys.argv) < 2:
    sys.exit("usage: " + os.path.basename(sys.argv[0]) + " <openapi-file.yaml>")

module_name = os.path.splitext(os.path.basename(sys.argv[1]))[0]

# Parse the input before the output file is created, so a load error does not
# leave a truncated .ttcn file behind.
# The Loader is given explicitly: PyYAML >= 6 requires the argument, and
# yaml.Loader is the loader built on `Constructor`, the class the bool
# override in this file is registered on.
# NOTE: yaml.Loader can instantiate arbitrary Python objects; only run this
# tool on trusted specification files.
with open(sys.argv[1]) as f:
    doc=yaml.load(f, Loader=yaml.Loader)
if doc is None:
    # Empty input document.
    doc={}

schemas=[]

# Type descriptions of all generated top-level types, filled by type_builder().
type_tree=[]

# Modules to import and types that need encoder/decoder functions.
module_data={'import' : [], "functions" : []}
buff=""

# Indentation state shared with print_type().
ident_level=1
ident_c=" "

# All print() calls below (including those inside the helper functions) write
# into the generated file; the console stream is restored afterwards.
console_stdout = sys.stdout
sys.stdout = open(module_name + ".ttcn",'wt')
try:
    print("module " + module_name + " {")
    print("")

    if "components" in doc:
        if "schemas" in doc["components"]:
            schemas=doc["components"]["schemas"]
            for name in schemas:
                if (name[-2:] == "Rm" ) and (name[:-2] in schemas ):
                    # "<X>Rm" next to an existing "<X>": emit an alias.
                    type_tree.append({'name':clean_name(name,True),'type':clean_name(name[:-2],True)})
                else:
                    data=schemas[name]
                    type_builder(clean_name(name,True),data,type_tree)
                    #buff=typewriter(buff," ", "type ",name,data,module_data)
                    #buff=add_buff(buff,'')
                    #buff=add_buff(buff,'')
        if "responses" in doc["components"]:
            #print(doc["components"]["responses"])
            for r in doc["components"]["responses"]:
                #print(r)
                process_used_schem_name(doc["components"]["responses"][r],module_data)

    if 'paths' in doc:
        for p in doc["paths"]:
            #print(p)
            process_path(doc["paths"][p],module_data)

    for i in module_data['import']:
        print(f' import from {i} all')
    print("")

    # NOTE(review): entries referring to another module look like
    # "Module.Type", which puts a dot into the generated function name --
    # confirm that this is what the TTCN-3 side expects.
    for fs in module_data['functions']:
        pdu_type=clean_name(fs,True)
        print(f'external function f_enc_{pdu_type}(in {pdu_type} pdu) return octetstring ')
        print('with { extension "prototype(convert) encode(JSON)" }')
        print("")
        print(f'external function f_dec_{pdu_type}(in octetstring stream, out {pdu_type} pdu) return integer ')
        print('with { extension "prototype(backtrack) decode(JSON)" }')
        print("")

    print("")
    #pprint.pprint(type_tree)
    for t in type_tree:
        print(ident_c*ident_level, "type ",end="",sep="")
        print_type(t,"", top=True)
        variants=[]
        gather_variants(t,"",variants)
        if variants != []:
            print(" with {")
            ident_level+=1
            for v in variants:
                print(ident_c*ident_level, "variant ",end="",sep="")
                if v["spec"] != "":
                    print("(",v["spec"],") ",end="",sep="")
                print('"',v["var"],'"',sep="")
            ident_level-=1
            print(ident_c*ident_level, "}",sep="")
        else:
            print("")
        print("")

    #print(buff)
    print("")

    print("")
    print('')
    print('} with {')
    print(' encode "JSON"')
    print('}')
finally:
    # Flush and close the generated file and give the console back.
    sys.stdout.close()
    sys.stdout = console_stdout

# Diagnostic dump of the collected types.  It used to be written into the
# .ttcn file after the module's closing brace, which is not TTCN-3 text.
pprint.pprint(type_tree, stream=sys.stderr)