# blob: 47deb816c4cda345c62512e2cfd373e71aca8cf0 [file] [log] [blame]
import yaml
import sys
import os
import pprint
import re
# Data structure used during conversion
# modules - map of the module_name:module_data
# module_data - map of the detailed data of one module:
# * import: list of the imported modules
# * functions: list of the encoder functions to be generated
# * type_tree: list of the type definitions
# * extra_lists: list of the extra record of's and encoder functions
# Global variables; they always refer to the module that is currently being processed.
# modules
# module_data
# type_tree
# module_name
# ident_level
# ident_c
# saved_stdout
# Reserved word list. Used to sanitize the type and field names.
# TTCN-3 reserved words. clean_name() appends an underscore to any identifier
# found in this list so that the generated code never uses a keyword as a name.
ttcn_keywords = [
    # Reserved words and predefined function names
    "action", "activate", "address", "alive", "all", "alt", "altstep", "and", "and4b", "any", "anytype",
    "bitstring", "boolean", "break", "case", "call", "catch", "char", "charstring", "check", "clear", "complement",
    "component", "connect", "const", "continue", "control", "create", "deactivate", "decmatch", "default", "disconnect",
    "display", "do", "done", "else", "encode", "enumerated", "error", "except", "exception", "execute", "extends",
    "extension", "external", "fail", "false", "float", "for", "friend", "from", "function", "getverdict", "getcall",
    "getreply", "goto", "group", "halt", "hexstring", "if", "ifpresent", "import", "in", "inconc", "infinity",
    "inout", "integer", "interleave", "kill", "killed", "label", "language", "length", "log", "map", "match", "message",
    "mixed", "mod", "modifies", "module", "modulepar", "mtc", "noblock", "none", "not", "not_a_number", "not4b", "nowait",
    "null", "octetstring", "of", "omit", "on", "optional", "or", "or4b", "out", "override", "param", "pass", "pattern", "permutation",
    "port", "present", "private", "procedure", "public", "raise", "read", "receive", "record", "recursive", "rem", "repeat",
    "reply", "return", "running", "runs", "select", "self", "send", "sender", "set", "setencode", "setverdict", "signature", "start",
    "stop", "subset", "superset", "system", "template", "testcase", "timeout", "timer", "to", "trigger", "true", "type", "union", "universal",
    "unmap", "value", "valueof", "var", "variant", "verdicttype", "while", "with", "xor", "xor4b", "bit2hex", "bit2int", "bit2oct", "bit2str",
    "char2int", "float2int", "hex2bit", "hex2int", "hex2oct", "hex2str", "int2bit", "int2char", "int2float", "int2hex", "int2oct", "int2str",
    # "sizeof" was previously misspelled as "sixeof"
    "int2unichar", "ischosen", "ispresent", "lengthof", "oct2bit", "oct2hex", "oct2int", "oct2str", "regexp", "rnd", "sizeof", "str2int",
    "str2oct", "substr", "unichar2int", "replace",
    # Reserved by extension packages
    # ("delta" used to be fused with a stray ", " literal and was therefore never matched)
    "apply", "assert", "at", "configuration", "conjunct", "cont", "delta", "disjunct", "duration", "finished", "history", "implies",
    "inv", "mode", "notinv", "now", "onentry", "onexit", "par", "prev", "realtime", "seq", "setstate", "static", "stepsize", "stream",
    "timestamp", "until", "values", "wait",
    # OO reserved
    "class", "finally", "object", "this",
]
# Converts a number to its English word representation.
# Used to replace the leading digits of a name with letters.
# Example: in "5qi" the leading 5 is replaced by "five", giving "fiveqi"
def numToWords(num):
    """Return the English words for the non-negative integer *num*.

    The words are concatenated without any separator (21 -> 'twentyone')
    because the result is used as part of a TTCN-3 identifier.

    The number is processed in groups of three decimal digits; each non-zero
    group is followed by its scale word (thousand, million, ...). Numbers
    beyond the last entry of the scale table raise IndexError.
    """
    units = ['', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine']
    teens = ['', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen', 'sixteen',
             'seventeen', 'eighteen', 'nineteen']
    tens = ['', 'ten', 'twenty', 'thirty', 'forty', 'fifty', 'sixty', 'seventy',
            'eighty', 'ninety']
    # Index g names 10**(3*g). 'quindecillion' (10**48) was missing, which
    # shifted every larger scale word by one position.
    thousands = ['', 'thousand', 'million', 'billion', 'trillion', 'quadrillion',
                 'quintillion', 'sextillion', 'septillion', 'octillion',
                 'nonillion', 'decillion', 'undecillion', 'duodecillion',
                 'tredecillion', 'quattuordecillion', 'quindecillion',
                 'sexdecillion', 'septendecillion', 'octodecillion',
                 'novemdecillion', 'vigintillion']
    if num == 0:
        return 'zero'

    digits = '%d' % num
    groups = (len(digits) + 2) // 3
    # Left-pad so that the digit string splits into complete 3-digit groups.
    digits = digits.zfill(groups * 3)

    words = []
    for i in range(0, groups * 3, 3):
        h, t, u = int(digits[i]), int(digits[i + 1]), int(digits[i + 2])
        g = groups - (i // 3 + 1)  # scale index of this group
        if h >= 1:
            words.append(units[h])
            words.append('hundred')
        if t > 1:
            words.append(tens[t])
            if u >= 1:
                words.append(units[u])
        elif t == 1:
            # 10 is 'ten', 11..19 have their own words
            words.append(teens[u] if u >= 1 else tens[t])
        elif u >= 1:
            words.append(units[u])
        # An all-zero group contributes no scale word.
        if g >= 1 and (h + t + u) > 0:
            words.append(thousands[g])
    return ''.join(words)
# Adds the module 'new_module' to the import list of the module if it is not in the list
def addImport(new_module):
    """Register *new_module* as an import of the module being processed.

    Nothing happens when *new_module* is the current module itself
    (global module_name) or is already present in module_data['import'].
    """
    if new_module == module_name:
        return
    imported = module_data['import']
    if new_module not in imported:
        imported.append(new_module)
# Converts the identifiers to a valid TTCN-3 identifiers
# - Ensure that the identifier starts with letters
# * The whole identifier is a number -> Adds "Num_" prefix
# * Starts with a number -> convert the initial number part to word -> 5qi -> fiveqi
# - Contains only letters, numbers and underscore
# * Replaces every other character with an underscore
# * The caller can add extra characters to the list of the valid characters in the 'extrachr' parameter
# - Postfix the reserved words with underscore
# - Capitalize the first character for type names
def clean_name(instr, typename=False, extrachr=""):
    """Convert *instr* into a valid TTCN-3 identifier.

    instr    -- the original name; non-string values are converted with str()
    typename -- when true the first character is upper-cased; otherwise a
                single leading underscore is removed
    extrachr -- additional characters that are kept as they are (each one is
                taken literally)

    Rules, applied in this order:
      * a reserved word (see ttcn_keywords) is returned with a '_' suffix
      * a name consisting only of digits gets the 'Num_' prefix
      * leading digits of a longer name are spelled out (5qi -> fiveqi)
      * every character outside [a-zA-Z0-9_] and extrachr becomes '_'
    """
    if instr in ttcn_keywords:
        return instr + "_"
    if not isinstance(instr, str):
        instr = str(instr)
    if instr.isdigit():
        instr = "Num_" + instr
    # Raw string: '\d' in a normal literal is an invalid escape sequence.
    leading_digits = re.match(r'\d+', instr)
    if leading_digits:
        # Slice instead of capturing with '(.*)' so that text after an
        # embedded newline is not silently dropped.
        instr = numToWords(int(leading_digits.group(0))) + instr[leading_digits.end():]
    if typename:
        instr = instr[:1].upper() + instr[1:]
    elif instr[:1] == "_":
        instr = instr[1:]
    # Escape the extra characters so that ']', '\\' or '-' cannot change
    # the meaning of the character class.
    invalid_chars = '[^a-zA-Z0-9_' + re.escape(extrachr) + ']'
    return re.sub(invalid_chars, '_', instr)
# Returns the module and the referenced type name as a tuple from the $ref
#
# local reference: '#/components/schemas/SnssaiExtension' -> '', 'SnssaiExtension'
# remote reference: 'TS29571_CommonData.yaml#/components/schemas/Ncgi' -> 'TS29571_CommonData', 'Ncgi'
#
def get_module(ref):
    """Split a $ref string into a (module name, type name) tuple.

    The type name is the text after the last '/'. For a local reference
    (starting with '#') the module name is ''; otherwise it is the part
    before the '#' with '.yaml' removed.
    """
    if ref[0] == '#':
        return '', ref.rpartition('/')[2]
    file_part = ref.partition("#")[0]
    return file_part.replace(".yaml", ""), ref.rpartition('/')[2]
# The process_path, process_schema, and process_used_schem_name functions
# parse the "paths:" parts of the yaml definitions and collect the type references.
# Encoder and decoder functions are needed for the referenced types as they are used
# in API messages directly.
# They also create prototypes for possible arrays.
#
# If the generation of an encoder function or array definition is missed, these functions
# should be updated to find the missing references.
#
def process_path(data):
    """Collect the schemas used by one path item.

    data -- mapping of one path item: its keys are either 'parameters'
            (a list of parameter objects) or operation names whose values
            are operation objects.

    For every operation the parameters, the request body, the responses and
    the callbacks (recursively, as nested path items) are handed over to
    process_used_schem_name().
    """
    for key, item in data.items():
        if key == "parameters":
            for param in item:
                process_used_schem_name(param)
            continue
        if not isinstance(item, dict):
            # Members such as 'summary' or 'description' are plain strings.
            # The membership tests below would do a substring search on them
            # and the following subscription would raise TypeError.
            continue
        if "parameters" in item:
            for param in item["parameters"]:
                process_used_schem_name(param)
        if "requestBody" in item:
            process_used_schem_name(item["requestBody"])
        if "responses" in item:
            for response in item["responses"].values():
                process_used_schem_name(response)
        if "callbacks" in item:
            for callback in item["callbacks"].values():
                # Each callback maps an expression to a nested path item.
                for callback_path in callback.values():
                    process_path(callback_path)
def process_schema(schema):
    """Record the types referenced by a schema used directly in a message.

    * '$ref' schema: the referenced type is added to module_data['functions']
      (prefixed with its module name and imported when it is a remote
      reference).
    * array whose items are a '$ref': the (module, type) pair is added to
      module_data['extra_lists'].
    * object with a 'jsonData' property: that property is processed
      recursively.
    """
    if "$ref" in schema:
        refmodule, refstr = get_module(schema["$ref"])
        if refmodule != '':
            addImport(refmodule)
            refstr = refmodule + "." + refstr
        functions = module_data['functions']
        if refstr not in functions:
            functions.append(refstr)
        return

    if "type" not in schema:
        return

    kind = schema["type"]
    if kind == "array":
        items = schema["items"]
        if "$ref" in items:
            # Only the pair is stored here; the code for it is emitted by
            # generate_one_module().
            list_entry = get_module(items["$ref"])
            extra_lists = module_data['extra_lists']
            if list_entry not in extra_lists:
                extra_lists.append(list_entry)
    if kind == "object":
        if "properties" in schema and "jsonData" in schema["properties"]:
            process_schema(schema["properties"]["jsonData"])
def process_used_schem_name(data):
    """Pass every schema found in *data* to process_schema().

    The schemas are looked up under data['content'][<content type>]['schema']
    and directly under data['schema'].
    """
    if "content" in data:
        for media in data["content"].values():
            if "schema" in media:
                process_schema(media["schema"])
    if "schema" in data:
        process_schema(data["schema"])
# Processes one schema definition and builds the data structure needed for code generation.
# The processed data is appended to the 'tree' list passed by the caller.
# All processing of the schema definition is done here, except the resolution of the allOf.
# The allOf has to be reprocessed after the schema processing, because the fields need to be collected from the processed data.
def type_builder(name,data,tree):
    """Build the type description of one schema and append it to *tree*.

    name -- name of the type or field as written in the yaml file
    data -- the schema definition (mapping)
    tree -- list that receives the resulting description; exactly one
            element is appended per call

    The description is a dict with at least the keys 'name', 'type',
    'variant' and 'nullable'. Depending on the schema it also carries
    'values' (enumerated), 'restriction' (integer), 'pattern' (charstring),
    'inner_type' (record of / set of), 'fields' and 'mandatory' (set),
    'fields' (union) or 'allOf' and 'fields' (unresolved allOf).

    Side effects: addImport() is called for referenced modules and for
    JSON_Generic, and the enum half of a string/enum anyOf pair is appended
    to the global type_tree as a separate type.
    """
    global type_tree
    #print("type:", name)
    element_data={}
    cname=clean_name(name)
    element_data["name"]=cname
    element_data["variant"]=[]
    # Keep the original JSON name when the sanitized name differs from it
    if name != cname:
        element_data["variant"].append("name as '"+name+"'")
    # Only the presence of the key is checked, not its value
    element_data["nullable"]= "nullable" in data
    if "type" in data:
        if data["type"] == "string":
            if "enum" in data:
                element_data["type"]="enumerated"
                element_data["values"]=[]
                for ev in data["enum"]:
                    #print(ev)
                    cename=clean_name(ev,True)
                    element_data["values"].append(cename)
                    # Map the sanitized enum item back to the original text
                    if cename != ev:
                        element_data["variant"].append("text '"+ cename +"' as '"+str(ev)+"'")
            else:
                element_data["type"]="charstring"
                if "pattern" in data:
                    element_data["pattern"]=data["pattern"]
        elif data["type"] == "integer":
            # Build the value range; an open end is written as +/-infinity
            restrition=False
            minstr=" (-infinity.."
            maxstr="infinity)"
            if "minimum" in data:
                restrition=True
                minstr=f' ({data["minimum"]}..'
            if "maximum" in data:
                restrition=True
                maxstr=f'{data["maximum"]})'
            if restrition:
                element_data["restriction"]=minstr+maxstr
            element_data["type"]="integer"
        elif data["type"] == "number":
            element_data["type"]="float"
        elif data["type"] == "boolean":
            element_data["type"]="boolean"
        elif data["type"] == "array":
            # The element type is built with an empty name
            element_data["type"]="record of"
            element_data["inner_type"]={}
            field=[]
            type_builder("",data["items"],field)
            element_data["inner_type"]=field[0]
        elif data["type"] == "object":
            if "properties" in data:
                # Object with named properties -> set; fields that are not
                # listed in 'mandatory' are printed as optional
                element_data["type"]="set"
                if "required" in data:
                    element_data["mandatory"]=[]
                    for r in data["required"]:
                        element_data["mandatory"].append(clean_name(r))
                else:
                    element_data["mandatory"]=[]
                element_data["fields"]=[]
                prop_data=data["properties"]
                for prop in prop_data:
                    field=[]
                    type_builder(prop,prop_data[prop],field)
                    element_data["fields"].append(field[0])
            elif "additionalProperties" in data:
                # Free-form map -> set of record { key, additionalProperties }
                # with the "as map" variant
                element_data["type"]="set of"
                element_data["inner_type"]={}
                element_data["inner_type"]["type"]="record"
                element_data["inner_type"]["name"]=""
                element_data["inner_type"]["nullable"]=False
                element_data["inner_type"]["mandatory"]=["key"]
                element_data["inner_type"]["fields"]=[]
                element_data["inner_type"]["fields"].append({'name':'key', 'type':'universal charstring','nullable':False})
                field=[]
                type_builder("additionalProperties",data["additionalProperties"],field)
                element_data["inner_type"]["fields"].append(field[0])
                element_data["variant"].append("as map")
            else:
                # Object without any structure information -> generic JSON value
                element_data["type"]="JSON_Generic.JSON_generic_val"
                addImport("JSON_Generic")
        else:
            # Unknown type keyword: passed through unchanged
            element_data["type"]=data["type"]
            #print('!!!!!!unsupported' + data["type"])
    elif "$ref" in data:
        refmodule,refstr=get_module(data["$ref"])
        if refmodule!= '':
            addImport(refmodule)
            element_data["type"]=refmodule+"."+clean_name(refstr,True)
        else:
            element_data["type"]=clean_name(refstr,True)
    elif "anyOf" in data:
        element_data["type"]="union"
        element_data["fields"]=[]
        # Count the string alternatives and how many of them are enums
        num_str=0
        num_enum=0
        for e in data["anyOf"]:
            if ("type" in e ) and (e["type"] == "string"):
                num_str+=1
                if "enum" in e:
                    num_enum+=1
        if (num_str==2) and (num_enum==1):
            # One enum plus one free string: the enum becomes a separate
            # type '<name>_enum' in the global type_tree and the union
            # offers the enum value or any other string
            for e in data["anyOf"]:
                if "enum" in e:
                    type_builder(name + "_enum",e,type_tree)
            element_data["fields"].append({'name':'enum_val', 'type':name + "_enum", 'nullable':False})
            element_data["fields"].append({'name':'other_val', 'type':'charstring', 'nullable':False})
            element_data["variant"].append("JSON: as value")
        else:
            # General case: one union field (field1, field2, ...) per alternative
            element_data["type"]="union"
            element_data["variant"].append("JSON: as value")
            element_data["fields"]=[]
            i=1
            for e in data["anyOf"]:
                field=[]
                type_builder(f'field{i}',e,field)
                i+=1
                element_data["fields"].append(field[0])
    elif "oneOf" in data:
        # Same handling as the general anyOf case
        element_data["type"]="union"
        element_data["variant"].append("JSON: as value")
        element_data["fields"]=[]
        i=1
        for e in data["oneOf"]:
            field=[]
            type_builder(f'field{i}',e,field)
            i+=1
            element_data["fields"].append(field[0])
    elif "allOf" in data:
        # Only the members are collected here: references are stored as
        # {"ref", "refmodule", "refstr"} entries, inline schemas as built
        # types. print_type() merges them later.
        element_data["allOf"]=True
        element_data["fields"]=[]
        i=0
        for e in data["allOf"]:
            if "$ref" in e:
                refmodule,refstr=get_module(e["$ref"])
                element_data["fields"].append({"ref":True,"refmodule":refmodule,"refstr":clean_name(refstr,True)})
            else:
                field=[]
                type_builder(f'field{i}',e,field)
                i+=1
                element_data["fields"].append(field[0])
        # Type used by print_type() when the allOf can not be resolved
        element_data["type"]="JSON_Generic.JSON_generic_val"
        addImport("JSON_Generic")
    else:
        # No usable type information -> generic JSON value
        element_data["type"]="JSON_Generic.JSON_generic_val"
        addImport("JSON_Generic")
        #print('!!!!!!unsupported ',name)
        #pprint.pprint(data)
    if "nullable" in data:
        # Wrap the type into a union of a null value and the real value;
        # the union takes over the name, the real value is renamed to 'val'
        addImport("JSON_Generic")
        element_data2={}
        element_data2["type"]="union"
        element_data2["name"]=element_data["name"]
        element_data["name"]="val"
        element_data["nullable"]=False
        element_data2["nullable"]=True
        element_data2["variant"]=[]
        element_data2["variant"].append("JSON: as value")
        element_data2["fields"]=[]
        element_data2["fields"].append({'name':'null_val', 'type':"JSON_generic_val.JSON_null_val", 'nullable':False})
        element_data2["fields"].append(element_data)
        tree.append(element_data2)
    else:
        tree.append(element_data)
# Finds the data of the type, or returns None if there is no such type
def find_type(tname):
    """Return the first entry of the global type_tree named *tname*, or None."""
    return next((entry for entry in type_tree if entry["name"] == tname), None)
# Generates the TTCN-3 type definition from the processed type data
def print_type(t, lend="\n", top=False):
    """Print the TTCN-3 definition of the type description *t* to stdout.

    t    -- type description built by type_builder()
    lend -- text printed after the definition
    top  -- True for a module level type definition: the name is printed
            right after the type keyword (except for 'record of', where it
            always follows the element type). For fields the name is
            printed after the type.

    An allOf type is resolved first: the fields, variants and mandatory
    lists of the locally referenced types and of the inline objects are
    merged into one set. If that is not possible the type is printed as
    the generic JSON type stored by type_builder().
    """
    global ident_level
    global ident_c
    if "allOf" in t:  # The allOf has to be reprocessed to collect the actual fields.
        merged = {
            "name": t["name"],
            "type": "set",
            "fields": [],
            "variant": [],
            "mandatory": [],
        }
        is_ok = True
        for f in t["fields"]:  # each entry is a reference or the data of an object to be merged
            if "ref" in f:
                if f["refmodule"] == "":
                    it = find_type(f["refstr"])
                    # Only a type that carries all three lists can be merged.
                    # Anything else (enumerated, union, another allOf, ...)
                    # used to raise KeyError here.
                    if it is not None and all(k in it for k in ("fields", "variant", "mandatory")):
                        merged["fields"] += it["fields"]
                        merged["variant"] += it["variant"]
                        merged["mandatory"] += it["mandatory"]
                    else:
                        is_ok = False
                else:
                    # Types of other modules are not available here
                    print("// Please add the fields from " + f["refmodule"] + "." + f["refstr"])
            elif f["type"] == "set":  # directly defined object
                merged["fields"] += f["fields"]
                merged["variant"] += f["variant"]
                merged["mandatory"] += f["mandatory"]
            else:
                is_ok = False
        if is_ok:  # The field collection was successful, generate the code
            print_type(merged, lend, top)
            t["fields"] = merged["fields"]
        else:
            # The allOf can't be resolved: fall back to the generic JSON type
            # that type_builder() already stored in t. Both the member list
            # and the allOf marker have to go, otherwise the recursive call
            # enters this branch again and fails on the missing "fields".
            del t["fields"]
            del t["allOf"]
            print_type(t, lend, top)
        return

    print(t["type"], " ", end="", sep="")
    name_first = top and (t["type"] != "record of")
    if name_first:
        print(t["name"], " ", end="", sep="")
    if "fields" in t:
        print(" {", sep="")
        ident_level += 1
        # The first field only gets the indentation, the following ones are
        # preceded by the separating comma and a line break.
        separator = ident_c * ident_level
        for f in t["fields"]:
            print(separator, end="", sep="")
            separator = ",\n" + ident_c * ident_level
            print_type(f, "")
            if "mandatory" in t and f["name"] not in t["mandatory"]:
                print(" optional", end="")
        ident_level -= 1
        print("", sep="")
        print(ident_c * ident_level, "}", sep="", end=lend)
    elif "inner_type" in t:
        print_type(t["inner_type"], "")
    elif "values" in t:
        print("{ ", sep="", end="")
        print(", ".join(str(value) for value in t["values"]), end="", sep="")
        print("}", sep="", end="")
    if not name_first:
        print(" ", t["name"], end="", sep="")
    if "restriction" in t:
        print(" ", t["restriction"], end="", sep="")
    print(lend, end="")
# Gather the variants for the type, builds the variant references
def gather_variants(t, spec, variants):
    """Collect the variants of *t* and of all its nested fields.

    t        -- type description
    spec     -- dotted field path of *t* inside the top level type
                ('' for the top level type itself)
    variants -- output list; a {"spec": path, "var": variant} dict is
                appended for every variant found
    """
    if "variant" in t:
        variants.extend({"spec": spec, "var": v} for v in t["variant"])
    if "fields" in t:
        # Nested fields are addressed as '<parent path>.<field name>'
        prefix = spec + "." if spec != "" else spec
        for field in t["fields"]:
            gather_variants(field, prefix + field["name"], variants)
# By default the yaml loader parses the Yes, No as bool value instead of string
# The bool constructor is overridden to return them as string
from yaml.constructor import Constructor
def add_bool(self, node):
    """Construct a YAML bool node as its plain scalar text.

    Registered below for the YAML bool tag, so values such as Yes/No are
    returned as strings instead of Python bool values.
    """
    scalar_text = self.construct_scalar(node)
    return scalar_text


Constructor.add_constructor('tag:yaml.org,2002:bool', add_bool)
# Processes the content of one yaml file
# Fills the module_data.
# The code is not generated here.
# doc - loaded yaml file
# The global variable should be set
def process_one_module(doc):
    """Process one loaded yaml document and fill the current module data.

    doc -- the loaded yaml file (mapping)

    The schemas under components/schemas are converted to type descriptions
    in the global type_tree. The schemas used by components/responses and by
    the paths are registered through process_used_schem_name() and
    process_path(). No code is generated here. The globals module_name,
    module_data and type_tree have to refer to the module being processed.
    """
    print("Processing " + module_name)
    if "components" in doc:
        components = doc["components"]
        if "schemas" in components:
            schemas = components["schemas"]
            for name in schemas:
                # The type name suffix 'Rm' marks the nullable alias of an
                # existing type: generate the nullable union, but refer to
                # the original type for the value.
                is_nullable_alias = (name[-2:] == "Rm") and (name[:-2] in schemas)
                if is_nullable_alias:
                    null_field = {
                        'name': 'null_val',
                        'nullable': False,
                        'type': 'JSON_generic_val.JSON_null_val',
                    }
                    value_field = {
                        'name': 'val',
                        'nullable': False,
                        'type': clean_name(name[:-2], True),
                        'variant': [],
                    }
                    type_tree.append({
                        'fields': [null_field, value_field],
                        'name': clean_name(name, True),
                        'nullable': True,
                        'type': 'union',
                        'variant': ['JSON: as value'],
                    })
                    addImport("JSON_Generic")
                elif name == "NullValue":
                    # Special type, used for nullable enums
                    type_tree.append({
                        'type': 'JSON_generic_val.JSON_null_val',
                        'name': 'NullValue',
                        'nullable': True,
                    })
                else:
                    # Normal type schema processing
                    type_builder(clean_name(name, True), schemas[name], type_tree)
        if "responses" in components:
            for response in components["responses"].values():
                process_used_schem_name(response)
    if 'paths' in doc:
        for path_item in doc["paths"].values():
            process_path(path_item)
    print("Processing completed")
# Generates the code of one module
# The global variable should be set
def generate_one_module():
    """Write the TTCN-3 module '<module_name>.ttcn' for the current module.

    Reads the globals module_name, module_data, type_tree, ident_c and
    ident_level. The output file is created in the current directory.

    The file is written through a temporary redirection of stdout. The
    redirection is removed and the file is closed even when the generation
    fails (the file was previously never closed because of a missing call
    on fout.close, and an error left stdout pointing to the file).
    """
    global ident_level
    import contextlib

    print("Generating code for " + module_name)
    with open(module_name + ".ttcn", 'wt') as fout, contextlib.redirect_stdout(fout):
        print("module " + module_name + " {")
        print("")

        # Prototypes of the list types used directly in messages; they are
        # emitted as comments only.
        for refmodule, refstr in module_data['extra_lists']:
            print("// " + refmodule + " type record of " + refstr + " " + refstr + "_list")
            print(f'// external function f_enc_{refstr}_list(in {refstr}_list pdu) return octetstring ')
            print('// with { extension "prototype(convert) encode(JSON)" }')
            print("")
            print(f'// external function f_dec_{refstr}_list(in octetstring stream, out {refstr}_list pdu) return integer ')
            print('// with { extension "prototype(backtrack) decode(JSON)" }')
            print("")

        for imported in module_data['import']:
            print(f' import from {imported} all')
        print("")

        # Encoder/decoder functions. Functions of types that belong to
        # another module (name contains '.') are commented out.
        for fs in module_data['functions']:
            func_type = clean_name(fs, True, ".")
            pre = "// " if "." in func_type else ""
            print(pre + f'external function f_enc_{func_type}(in {func_type} pdu) return octetstring ')
            print(pre + 'with { extension "prototype(convert) encode(JSON)" }')
            print("")
            print(pre + f'external function f_dec_{func_type}(in octetstring stream, out {func_type} pdu) return integer ')
            print(pre + 'with { extension "prototype(backtrack) decode(JSON)" }')
            print("")
        print("")

        # Type definitions, each followed by its variant attributes
        for t in type_tree:
            print(ident_c * ident_level, "type ", end="", sep="")
            print_type(t, "", top=True)
            variants = []
            gather_variants(t, "", variants)
            if variants:
                print(" with {")
                ident_level += 1
                for v in variants:
                    print(ident_c * ident_level, "variant ", end="", sep="")
                    if v["spec"] != "":
                        print("(", v["spec"], ") ", end="", sep="")
                    print('"', v["var"], '"', sep="")
                ident_level -= 1
                print(ident_c * ident_level, "}", sep="")
            else:
                print("")
            print("")
        print("")
        print("")
        print('')
        print('} with {')
        print(' encode "JSON"')
        print('}')
    print("Code generation completed")
# Main code starts here:
# global variables
# Global state shared by the functions above
saved_stdout = sys.stdout   # the original stdout
module_data = {}            # data of the module being processed
type_tree = []              # type list of the module being processed
modules = {}                # module_name -> module_data of every module seen
ident_level = 1             # current indentation depth of the generated code
ident_c = " "               # text of one indentation level
module_name = ""            # name of the module being processed

# Pass 1: load and process every yaml file given on the command line
for filename in sys.argv[1:]:
    try:
        f = open(filename)
    except OSError as err:
        print("Can't open the file: ", filename)
        pprint.pprint(err)
        # Terminate with a failure exit status
        sys.exit(1)
    print("File opened: " + filename)
    module_name = os.path.splitext(os.path.basename(filename))[0]
    with f:
        # The Loader is given explicitly: newer PyYAML versions require the
        # argument, and yaml.Loader is the loader built on the Constructor
        # class that carries the bool override registered above.
        # NOTE(review): yaml.Loader can construct arbitrary Python objects;
        # only feed trusted yaml files to this script.
        ydoc = yaml.load(f, Loader=yaml.Loader)
    # A module that was already seen keeps its data and is extended
    if module_name not in modules:
        modules[module_name] = {'import': [], "functions": [], "type_tree": [], "extra_lists": []}
    module_data = modules[module_name]
    type_tree = module_data["type_tree"]
    process_one_module(ydoc)

# Pass 2: generate the code of every processed module
for module_name in modules:
    module_data = modules[module_name]
    type_tree = module_data["type_tree"]
    generate_one_module()
    #pprint.pprint(module_data)