# blob: d4baac001f902c9196a9ef440f487ca084e37ec3 [file] [log] [blame]
#// Copyright (c) 2000-2019 Ericsson Telecom AB //
#// All rights reserved. This program and the accompanying materials are made available under the terms //
#// of the Eclipse Public License v2.0 which accompanies this distribution, and is available at //
#// https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.html //
#/////////////////////////////////////////////////////////////////////////////////////////////////////////
import sys, json, threading, urllib2, time, logging, re, cookielib, ssl
class ScheduledPlaylist:
    """Executes a descriptor of request playlists against one or more API URLs.

    The descriptor is a list of objects, each with an 'apiUrl' and a 'playlist'.
    A playlist is a list of actions; every action runs in its own daemon thread
    and may contain:

    * 'requests'   - list of requests POSTed to the API URL (required)
    * 'id'         - identifier other actions can refer to
    * 'relativeTo' - list of action ids that must finish before this one starts
    * 'startTime'  - delay in seconds before the requests are sent
    * 'condition'  - dict with an 'expression' that is polled until it holds,
                     plus optional 'cancelingTimeout' (seconds),
                     'numberOfExecutions' and 'evaluatingPeriod' (seconds)

    The current state is exposed through getStatus() as a
    {'node': {'val': <text>, 'tp': 11}} dictionary.
    """

    def __init__(self, descriptor, customCallbackForResponse = None):
        """Store the descriptor and prepare the synchronisation primitives.

        descriptor: the playlist descriptor described in the class docstring.
        customCallbackForResponse: optional callable invoked with the response
            of every action that completed.
        """
        self._logger = logging.getLogger(__name__)
        self._status = {'node': {'val': '', 'tp': 11}}
        self._finished = 0
        self._descriptor = descriptor
        self._callback = customCallbackForResponse
        self._stopEvent = threading.Event()
        # Set means "not paused"; pause() clears it to hold the workers back.
        self._continueEvent = threading.Event()
        self._continueEvent.set()
        self._urlOpeners = []
        self._threads = []
        # Guards _finished, _threads and the per-action dependency lists, which
        # are updated from several worker threads.
        self._lock = threading.Lock()

    def _waitOrExit(self):
        """Block while paused; return True when the worker has to terminate."""
        if self._stopEvent.is_set():
            return True
        self._continueEvent.wait()
        # stop() also sets the continue event, so re-check after waking up.
        return self._stopEvent.is_set()

    def _wait(self, amount):
        """Wait `amount` seconds, returning early as soon as stop() is called."""
        # float() still rejects a missing (None) delay, which Event.wait would
        # otherwise interpret as "block until stopped".
        self._stopEvent.wait(float(amount))

    def _checkCondition(self, url, opener, expression):
        """Evaluate `expression` remotely and return whether it holds.

        The expression is sent as the filter of a DataSource/Sources getData
        request; it counts as unmet only when the first content item is a
        node whose 'tp' is 0.
        """
        requests = [
            {
                'getData': {
                    'source': 'DataSource',
                    'element': 'Sources',
                    'filter': expression
                }
            }
        ]
        response = self._sendRequestsToUrl(opener, url, requests)
        first = response['contentList'][0]
        return not ('node' in first and first['node']['tp'] == 0)

    def _sendRequestsToUrl(self, opener, url, requests):
        """POST `requests` as JSON to `url` and return the decoded JSON reply."""
        response = opener.open(urllib2.Request(url, json.dumps({'requests': requests})))
        try:
            return json.loads(response.read())
        finally:
            # Release the connection instead of waiting for garbage collection.
            response.close()

    def _sendRequests(self, url, opener, requests):
        """Send the requests of one action and record it in the status text.

        Returns the decoded response, or None when `requests` is empty.
        """
        if len(requests) == 0:
            return None
        first = requests[0]
        for kind in ('getData', 'setData'):
            if kind in first:
                self._status['node']['val'] = '[led:green]Running - Last action: ' + first[kind]['element']
                break
        return self._sendRequestsToUrl(opener, url, requests)

    def _awaitCondition(self, url, opener, condition):
        """Poll the condition of an action until it holds.

        Returns True when the condition is met. Returns False when the playlist
        was stopped, or when the timeout / execution limit was exceeded (in
        which case the whole playlist is stopped and marked as timed out).
        """
        tries = 0
        startTime = time.time()
        while True:
            if self._waitOrExit():
                return False
            tries += 1
            if self._checkCondition(url, opener, condition['expression']):
                return True
            # NOTE(review): "tries > numberOfExecutions" allows one evaluation
            # more than the configured number - confirm this is intended.
            if (('cancelingTimeout' in condition and time.time() - startTime > condition['cancelingTimeout'])
                    or ('numberOfExecutions' in condition and tries > condition['numberOfExecutions'])):
                self.stop()
                self._status['node']['val'] = '[led:red]Timeout'
                return False
            if 'evaluatingPeriod' in condition:
                self._wait(condition['evaluatingPeriod'])

    def _processPlaylist(self, url, opener, action):
        """Thread body: run a single action.

        Waits for the optional condition and start delay, sends the requests,
        reports completion and finally hands the response to the callback.
        Any failure stops the whole playlist and sets the status to Error.
        """
        try:
            if 'condition' in action and 'expression' in action['condition']:
                if not self._awaitCondition(url, opener, action['condition']):
                    return
            if 'startTime' in action:
                self._wait(action['startTime'])
            if self._waitOrExit():
                return
            response = self._sendRequests(url, opener, action['requests'])
            if self._waitOrExit():
                return
            self._playlistFinished(action.get('id'))
            if self._callback is not None:
                self._callback(response)
        except Exception:
            self._logger.exception('Error during playlist execution')
            self.stop()
            self._status['node']['val'] = '[led:red]Error'

    def _playlistFinished(self, id):
        """Account for a completed action and release the actions waiting on it.

        id: identifier of the completed action, or None when it has none.
        When the last action completes the status becomes Finished and all
        sessions are logged out.
        """
        released = []
        with self._lock:
            self._finished += 1
            allDone = self._finished == len(self._threads)
            if allDone:
                self._status['node']['val'] = self._status['node']['val'].replace('[led:green]Running', '[led:blue]Finished')
            if id is not None:
                for record in self._threads:
                    pending = record.get('relativeTo')
                    if pending is not None and id in pending:
                        pending.remove(id)
                        if len(pending) == 0:
                            released.append(record['thread'])
        # Network I/O and thread start-up are kept outside the lock.
        if allDone:
            self._logout()
        for thread in released:
            thread.start()

    def _getBaseUrl(self, url):
        """Return `url` cut before the first '/' that follows a word character.

        For 'http://host:8000/api' this yields 'http://host:8000'; a URL
        without such a slash is returned unchanged.
        """
        match = re.search(r'\w/', url)
        if match:
            return url[:match.start() + 1]
        return url

    def _logout(self):
        """Log out every session opened by _login (best effort)."""
        for url, opener in list(self._urlOpeners):
            try:
                opener.open(urllib2.Request(self._getBaseUrl(url) + '/logout/api.authenticate')).close()
            except Exception:
                # A failed logout must not prevent stop() or the error handling
                # of a worker from completing.
                self._logger.warning('Logout from %s failed', url, exc_info = True)

    def _login(self, opener, url, userCredentials):
        """Authenticate `opener` against the server of `url` (best effort).

        On success the opener is remembered so that _logout can end the
        session. A failed login is logged and otherwise ignored; the requests
        are then sent without an authenticated session.
        """
        try:
            # json.dumps escapes quotes and backslashes in the credentials.
            body = json.dumps({'username': userCredentials['username'], 'password': userCredentials['password']})
            opener.open(urllib2.Request(self._getBaseUrl(url) + '/login/api.authenticate', body)).close()
            self._urlOpeners.append((url, opener))
        except Exception:
            self._logger.warning('Login to %s failed', url, exc_info = True)

    def _buildOpener(self):
        """Create a cookie-aware URL opener that bypasses proxies."""
        try:
            # NOTE(security): TLS certificate verification is switched off
            # here - confirm that this is acceptable for the deployment.
            httpsHandler = urllib2.HTTPSHandler(context = ssl._create_unverified_context())
        except (AttributeError, TypeError):
            # Python versions without SSL contexts.
            httpsHandler = urllib2.HTTPSHandler()
        return urllib2.build_opener(urllib2.HTTPHandler(), httpsHandler, urllib2.HTTPCookieProcessor(cookielib.CookieJar()), urllib2.ProxyHandler({}))

    def start(self, userCredentials):
        """Log in, create one thread per action and start the runnable ones.

        userCredentials: dict with 'username' and 'password'; when either is
            None no login is attempted.
        Raises KeyError when an entry lacks 'apiUrl' or 'playlist', or an
        action lacks 'requests'.
        """
        created = []
        for playlistObject in self._descriptor:
            url = playlistObject['apiUrl']
            opener = self._buildOpener()
            if userCredentials['username'] is not None and userCredentials['password'] is not None:
                self._login(opener, url, userCredentials)
            for action in playlistObject['playlist']:
                if 'requests' not in action:
                    raise KeyError('requests')
                thread = threading.Thread(target = self._processPlaylist, args = (url, opener, action))
                thread.daemon = True
                record = {'thread': thread}
                if 'relativeTo' in action and len(action['relativeTo']) != 0:
                    # Copy: the list is consumed as dependencies finish and the
                    # descriptor must stay intact.
                    record['relativeTo'] = list(action['relativeTo'])
                created.append(record)
        with self._lock:
            self._threads.extend(created)
        # The status is set before any thread runs so that it cannot overwrite
        # what a fast worker reports.
        if len(self._threads) == 0:
            self._status['node']['val'] = '[led:blue]Finished'
        else:
            self._status['node']['val'] = '[led:green]Running'
        # Threads are started only after all of them are registered; otherwise
        # an early finisher could miss its dependents or see a too small total.
        for record in created:
            if 'relativeTo' not in record:
                record['thread'].start()

    def stop(self):
        """Ask every worker to terminate, mark the status Stopped and log out."""
        self._stopEvent.set()
        # Wake up paused workers so that they can notice the stop request.
        self._continueEvent.set()
        self._status['node']['val'] = '[led:black]Stopped'
        self._logout()

    def pause(self):
        """Toggle between paused and running."""
        if self._continueEvent.is_set():
            self._continueEvent.clear()
            self._status['node']['val'] = self._status['node']['val'].replace('[led:green]Running', '[led:yellow]Paused')
        else:
            self._continueEvent.set()
            self._status['node']['val'] = self._status['node']['val'].replace('[led:yellow]Paused', '[led:green]Running')

    def isRunning(self):
        """Return True while at least one action thread is alive."""
        return any(record['thread'].is_alive() for record in self._threads)

    def getStatus(self):
        """Return the status dictionary (the live object, not a copy)."""
        return self._status
if __name__ == '__main__':
    # Command-line entry point: run the playlist descriptor stored in the JSON
    # file named by the first argument and print every response as JSON.
    def printResponse(response):
        # Callback handed to ScheduledPlaylist; receives the response of each action.
        print(json.dumps(response))

    with open(sys.argv[1], 'r') as descriptorFile:
        loadedDescriptor = json.load(descriptorFile)

    credentials = {'username': 'admin', 'password': 'admin', 'groups': set(['admin'])}
    runner = ScheduledPlaylist(loadedDescriptor, printResponse)
    runner.start(credentials)

    # Give the worker threads a moment to start, then poll once per second
    # until none of them is alive any more.
    time.sleep(3)
    while True:
        if not runner.isRunning():
            break
        time.sleep(1)