blob: 54b72ace5e45521c6e6c91cc795be5e01e38e1a3 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#///////////////////////////////////////////////////////////////////////////////
#// Copyright (c) 2000-2019 Ericsson Telecom AB //
#// //
#// All rights reserved. This program and the accompanying materials //
#// are made available under the terms of the Eclipse Public License v2.0 //
#// which accompanies this distribution, and is available at //
#// https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.html //
#///////////////////////////////////////////////////////////////////////////////
import sys
import urllib2
import json
class DsRestAPI:
__url = "http://localhost:4001/"
def __init__(self, p_url="", p_useProxy=True):
self.__useProxy = p_useProxy
if p_url is not None and p_url is not "":
if p_url[-1] != '/':
p_url += '/'
self.__url = p_url
self.__url += "api.dsapi"
self._urlOpener = urllib2.build_opener(urllib2.HTTPHandler(), urllib2.ProxyHandler({} if not p_useProxy else None))
def __httpcall(self, postdata, method = 'POST'):
'''
Does a HTTP Request against the specified URL, with the optional POSTDATA piggyback, setting the method to the optional HTTP Verb.
Returns the HTTP Response body.
'''
if not isinstance(postdata, basestring):
postdata = json.dumps(postdata)
req = urllib2.Request(self.__url, postdata, {'Content-Type': 'application/json'})
req.add_header('Connection', 'close')
req.get_method = lambda: method
try:
return self._urlOpener.open(req).read()
except Exception as e:
msg = 'Error connecting to server: ' + str(e)
proxyList = [name[0:-5] for name in dir(urllib2.ProxyHandler()) if name.endswith('open')];
if 'http' in proxyList and self.__useProxy:
msg += '\\nThe error may be caused by wrong proxy settings. Current possible poxys are: \\n' + ' '.join(proxyList)
msg += '\\nPlease try to use -noproxy option.'
return '{"contentList":[{"node":{"val":"' + msg + '", "tp":4}}]}'
def getList(self, request):
'''Returns the response to the request as a json object'''
response = json.loads(self.getContentList(request))
if "contentList" in response:
response = response["contentList"]
return response
def getContentList(self, request):
'''Returns the response to the request as a string.'''
dsCall = {"requests": request, "timeOut": 5.0};
return self.__httpcall(dsCall)
def get(self, request):
'''Returns the first element of the response to the request as a json object.'''
return self.getList(request)[0]
def getData(self, aSource, aElement, aParams = None, aPtcName = None):
'''Creates a getData request from the parameters and returns its response object, i.e. either a {list: [...]} or a {node: {...}} as a json object.'''
return self.get( [ {"getData": {"source": aSource, "element": aElement, "ptcname": aPtcName, "params": aParams}} ] )
def setData(self, aSource, aElement, aContent, aType, aParams = None, aPtcName = None, aIndxs = None):
'''Creates a setData request from the parameters and returns its response object, i.e. either a {list: [...]} or a {node: {...}} as a json object.'''
return self.get( [ {"setData": {"source": aSource, "element": aElement, "ptcname": aPtcName, "params": aParams, "content": aContent, "indxsInList": ([int(aIndxs)] if aIndxs is not None else None), "tp": int(aType or 0)}} ] )
def getHelp(self, aSource = None, aElement = None):
'''Returns the help as a json object.'''
if aSource is not None and aElement is not None:
help = self.getData("DataSource", "help", [ {"paramName": "Format", "paramValue": "JSON"}, {"paramName": "Source", "paramValue": aSource }, {"paramName": "Element", "paramValue": aElement} ] )
elif aSource is not None:
help = self.getData("DataSource", "help", [ {"paramName": "Format", "paramValue": "JSON"}, {"paramName": "Source", "paramValue": aSource } ] )
else:
help = self.getData("DataSource", "help", [ {"paramName": "Format", "paramValue": "JSON"} ] )
txt = help["node"]["val"]
try:
help["node"]["val"] = json.loads(''.join(chr(int(''.join(c), 16)) for c in zip(txt[0::2], txt[1::2]))) # http://stackoverflow.com/a/9641593/357403
return json.dumps(help, indent = 2, check_circular = False)
except Exception as e:
return help
def usage():
print "Usage:"
print sys.argv[0], "-?"
print sys.argv[0], "[-u <URL>] [-noproxy] -h [source [element]]"
print sys.argv[0], "[-u <URL>] [-noproxy] -f file"
print sys.argv[0], "[-u <URL>] [-noproxy] -j jsonString"
print sys.argv[0], "[-u <URL>] [-noproxy] -source <Source> -element <Element> [-ptcname <PTCName>] [-params <ParamName1 ParamValue1 ParamName2 ParamValue2 ...>] [-content <Content> -tp <Tp> [-indxsInList index1 index2]]"
print "All parameters that are sent to the server are case sensitive."
print "Legend:"
print "\t-?\t\tLocal usage"
print "\t-u\t\tThe URL of the remote server, it is http://localhost:4001 by default."
print "\t-noproxy\tSet this option to disable proxy usage."
print "\t-h\t\tHelp from remote server. Accepts 2 optional arguments to filter on Source, or on Source-specific Element."
print "\t-f\t\tLoad request from a file."
print "\t-j\t\tThe json string as the request. WARNING: use single ' to enclose the string."
print ""
print "List of types for the -tp parameter and their meaning:"
print "\t 1: intType"
print "\t 2: floatType"
print "\t 3: boolType"
print "\t 4: charstringType"
print "\t 5: octetstringType"
print "\t 6: hexstringType"
print "\t 7: bitstringType"
print "\t 8: integerlistType"
print "\t 9: floatlistType"
print "\t10: charstringlistType"
print "\t11: statusLEDType"
print ""
print "Examples:"
print "DataSource help:\t", sys.argv[0], ' -h'
print "Source filtered help:\t", sys.argv[0], ' -h ExecCtrl'
print "Help for an element:\t", sys.argv[0], ' -h ExecCtrl ScGrpStart'
print "Simple GetData request:\t", sys.argv[0], ' -source ExecCtrl -element Progressbar'
print "Use different address:\t", sys.argv[0], ' -u http://127.0.0.1:9876 -source ExecCtrl -element Progressbar'
print "Use without proxy:\t", sys.argv[0], ' -u http://127.0.0.1:4001 -noproxy -source ExecCtrl -element Progressbar'
print "GetData with params:\t", sys.argv[0], ' -source ExecCtrl -element ScGrpStart -params ScenarioGroup ScGroup1'
print "SetData example:\t", sys.argv[0], ' -source ExecCtrl -element ScGrpStart -content true -tp 3 -params ScenarioGroup ScGroup1'
print "GetData from JSON:\t", sys.argv[0], ' -j \'[{"getData": {"source": "ExecCtrl", "element": "ScGrpStart", "params": [{"paramName": "ScenarioGroup", "paramValue": "ScGroup1"}]}}]\''
print "SetData from JSON:\t", sys.argv[0], ' -j \'[{"setData": {"source": "ExecCtrl", "element": "ScGrpStart", "params": [{"paramName": "ScenarioGroup", "paramValue": "ScGroup1"}], "tp": 3, "content": "true"}}]\''
print "Request from file:\t", sys.argv[0], ' -f path/to/request/request.json'
def printHumanReadableResponse(response):
if "list" in response:
print [str(listElement["node"]["val"]) for listElement in response["list"]]
else:
print response["node"]["val"]
def DsRestAPIPythonAPIMain(argv):
argv = argv[1:]
url = ''
if '-u' in argv:
i = argv.index('-u')
url = argv[i + 1]
argv.pop(i)
argv.pop(i)
proxies = True
if '-noproxy' in argv:
proxies = False
i = argv.index('-noproxy')
argv.pop(i)
api = DsRestAPI(url, proxies)
if '-?' in argv or len(argv) == 0:
usage()
elif '-g' in argv or '-s' in argv:
print '-g and -s parameters are not supported anymore due to an interface rationalization. Please update your request according to the new behavior.'
elif '-h' in argv:
i = argv.index('-h')
if i + 2 < len(argv):
print api.getHelp(argv[i+1], argv[i+2])
elif i + 1 < len(argv):
print api.getHelp(argv[i+1])
else:
print api.getHelp()
elif '-f' in argv:
fileName = argv[argv.index('-f') + 1]
f = open(fileName, 'r')
request = json.load(f)
f.close()
print api.getContentList(request)
elif '-j' in argv:
jsonString = argv[argv.index('-j') + 1]
request = json.loads(jsonString)
print api.getContentList(request)
else:
requests = []
request = {}
if '-content' in argv and '-tp' in argv:
requests.append({'setData': request})
else:
requests.append({'getData': request})
i = 0
while i < len(argv):
if i + 1 >= len(argv):
print 'Suspicious parameter detected:', argv[i]
i += 1
elif argv[i] == '-source' and not argv[i+1].startswith('-'):
request['source'] = argv[i+1]
i += 2
elif argv[i] == '-element' and not argv[i+1].startswith('-'):
request['element'] = argv[i+1]
i += 2
elif argv[i] == '-ptcname' and not argv[i+1].startswith('-'):
request['ptcname'] = argv[i+1]
i += 2
elif argv[i] == '-content' and not argv[i+1].startswith('-'):
request['content'] = argv[i+1]
i += 2
elif argv[i] == '-tp' and not argv[i+1].startswith('-'):
request['tp'] = int(argv[i+1])
i += 2
elif argv[i] == '-params':
request['params'] = []
i += 1
while i + 1 < len(argv) and not argv[i].startswith('-') and not argv[i+1].startswith('-'):
request['params'].append({'paramName': argv[i], 'paramValue': argv[i+1]})
i += 2
elif argv[i] == 'indxsInList':
request['indxsInList'] = []
i += 1
while i < len(argv) and not argv[i].startswith('-'):
request['indxsInList'].append(int(argv[i]))
i += 1
else:
print 'Suspicious parameter detected:', argv[i]
i += 1
printHumanReadableResponse(api.get(requests))
return 0
if __name__ == "__main__":
errCode = -1;
try:
errCode = DsRestAPIPythonAPIMain(sys.argv)
except Exception as e:
print 'Error while processing your request: ' + str(e)
sys.exit(errCode)