| #// Copyright (c) 2000-2018 Ericsson Telecom AB // |
| #// All rights reserved. This program and the accompanying materials are made available under the terms // |
| #// of the Eclipse Public License v2.0 which accompanies this distribution, and is available at // |
| #// https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.html // |
| #///////////////////////////////////////////////////////////////////////////////////////////////////////// |
| import sys, json, threading, urllib2, time, logging, re, cookielib, ssl |
| |
class ScheduledPlaylist:
    """Execute the actions of a playlist descriptor, one worker thread per action.

    The descriptor is an iterable of playlist objects.  Each of them carries an
    'apiUrl' and a 'playlist' list of actions.  An action has a 'requests' list
    and may additionally have:

    * 'id'         - name that other actions can list in their 'relativeTo';
    * 'relativeTo' - ids that must all finish before this action is started;
    * 'startTime'  - delay to wait before the requests are sent;
    * 'condition'  - dict with an 'expression' that is polled until it holds,
      optionally bounded by 'cancelingTimeout' and 'numberOfExecutions' and
      paced by 'evaluatingPeriod'.

    Progress is published through getStatus() as a '[led:<colour>]<text>'
    string stored in the status node.
    """

    def __init__(self, descriptor, customCallbackForResponse = None):
        """Store the descriptor and prepare the (not yet started) run state.

        descriptor: the playlist descriptor described in the class docstring.
        customCallbackForResponse: optional callable invoked with the decoded
            response of every action that completes.
        """
        self._logger = logging.getLogger(__name__)
        self._status = {'node': {'val': '', 'tp': 11}}
        self._finished = 0
        self._descriptor = descriptor
        self._callback = customCallbackForResponse
        # Set once by stop(); every worker checks it between steps.
        self._stopEvent = threading.Event()
        # Cleared while paused; workers block on it until it is set again.
        self._continueEvent = threading.Event()
        self._continueEvent.set()
        # (url, opener) pairs that logged in successfully and need a logout.
        self._urlOpeners = []
        # One record per action: {'thread': Thread[, 'relativeTo': [ids]]}.
        self._threads = []
        # Guards _finished and the 'relativeTo' lists, which several workers
        # update when they finish at the same time.
        self._lock = threading.Lock()

    def _waitOrExit(self):
        """Block while paused; return True when the playlist was stopped."""
        if self._stopEvent.is_set():
            return True
        self._continueEvent.wait()
        return self._stopEvent.is_set()

    def _wait(self, amount):
        """Wait `amount` seconds, returning early as soon as stop() is called."""
        if amount is None:
            # Event.wait(None) would block forever; keep rejecting a missing
            # amount the way time.sleep(None) did.
            raise TypeError('wait amount must be a number, not None')
        self._stopEvent.wait(amount)

    def _checkCondition(self, url, opener, expression):
        """Evaluate `expression` remotely and return whether it holds.

        The expression is sent as the filter of a getData request.  It counts
        as not fulfilled only when the first content item of the response has
        a 'node' whose 'tp' is 0.
        """
        requests = [
            {
                'getData': {
                    'source': 'DataSource',
                    'element': 'Sources',
                    'filter': expression
                }
            }
        ]
        response = self._sendRequestsToUrl(opener, url, requests)
        first = response['contentList'][0]
        return not ('node' in first and first['node']['tp'] == 0)

    def _sendRequestsToUrl(self, opener, url, requests):
        """POST `requests` as JSON to `url` and return the decoded JSON reply."""
        reply = opener.open(urllib2.Request(url, json.dumps({'requests': requests})))
        try:
            return json.loads(reply.read())
        finally:
            # Release the connection even when the body is not valid JSON.
            reply.close()

    def _sendRequests(self, url, opener, requests):
        """Record the first request's element in the status, then send them all."""
        if len(requests) > 0:
            first = requests[0]
            if 'getData' in first:
                self._status['node']['val'] = '[led:green]Running - Last action: ' + first['getData']['element']
            elif 'setData' in first:
                self._status['node']['val'] = '[led:green]Running - Last action: ' + first['setData']['element']
        return self._sendRequestsToUrl(opener, url, requests)

    def _waitForCondition(self, url, opener, condition):
        """Poll condition['expression'] until it holds.

        Returns True when the condition is fulfilled.  Returns False when the
        playlist was stopped, or when 'cancelingTimeout' seconds have elapsed /
        more than 'numberOfExecutions' evaluations were made; in the latter
        case the whole playlist is stopped and the status becomes Timeout.
        """
        tries = 0
        startTime = time.time()
        while True:
            if self._waitOrExit():
                return False
            tries += 1
            if self._checkCondition(url, opener, condition['expression']):
                return True
            timedOut = 'cancelingTimeout' in condition and time.time() - startTime > condition['cancelingTimeout']
            exhausted = 'numberOfExecutions' in condition and tries > condition['numberOfExecutions']
            if timedOut or exhausted:
                self.stop()
                self._status['node']['val'] = '[led:red]Timeout'
                return False
            if 'evaluatingPeriod' in condition:
                self._wait(condition['evaluatingPeriod'])

    def _processPlaylist(self, url, opener, action):
        """Thread body: run one action from condition check to callback.

        Any exception (including one raised by the callback) stops the whole
        playlist and sets the status to Error.
        """
        try:
            if 'condition' in action and 'expression' in action['condition']:
                if not self._waitForCondition(url, opener, action['condition']):
                    return
            if 'startTime' in action:
                self._wait(action['startTime'])
            if self._waitOrExit():
                return
            response = self._sendRequests(url, opener, action['requests'])
            if self._waitOrExit():
                return
            self._playlistFinished(action.get('id'))
            if self._callback is not None:
                self._callback(response)
        except Exception:
            self._logger.exception('Error during playlist execution')
            self.stop()
            self._status['node']['val'] = '[led:red]Error'

    def _playlistFinished(self, id):
        """Account for a finished action and release the actions waiting on it.

        id: the finished action's id, or None when it has none.
        """
        ready = []
        with self._lock:
            self._finished += 1
            allDone = self._finished == len(self._threads)
            if id is not None:
                for record in self._threads:
                    pending = record.get('relativeTo')
                    if pending is not None and id in pending:
                        pending.remove(id)
                        if len(pending) == 0:
                            ready.append(record['thread'])
        # Network traffic and thread start-up happen outside the lock so a
        # slow logout cannot hold up other finishing workers.
        if allDone:
            self._status['node']['val'] = self._status['node']['val'].replace('[led:green]Running', '[led:blue]Finished')
            self._logout()
        for worker in ready:
            worker.start()

    def _getBaseUrl(self, url):
        """Cut `url` right after the first word character that precedes a '/'.

        For 'http://host:8080/api' this yields 'http://host:8080'.  The url is
        returned unchanged when no such position exists.
        """
        match = re.search(r'\w/', url)
        if match:
            return url[:match.start() + 1]
        return url

    def _logout(self):
        """Best-effort logout of every session opened by _login().

        Failures are logged and skipped so that stop() and the error handling
        in the workers can always complete.
        """
        for url, opener in self._urlOpeners:
            try:
                opener.open(urllib2.Request(self._getBaseUrl(url) + '/logout/api.authenticate')).close()
            except Exception:
                self._logger.warning('Logout from %s failed', url, exc_info = True)

    def _login(self, opener, url, userCredentials):
        """Best-effort login; remembers the opener for logout on success.

        The body is produced with json.dumps so that quotes or backslashes in
        the credentials cannot corrupt the JSON document.
        """
        try:
            payload = json.dumps({'username': userCredentials['username'], 'password': userCredentials['password']})
            reply = opener.open(urllib2.Request(self._getBaseUrl(url) + '/login/api.authenticate', payload))
        except Exception:
            self._logger.warning('Login to %s failed', url, exc_info = True)
        else:
            self._urlOpeners.append((url, opener))
            reply.close()

    def _buildOpener(self):
        """Create a cookie-keeping, proxy-less opener for one playlist object."""
        def build(httpsHandler):
            return urllib2.build_opener(urllib2.HTTPHandler(), httpsHandler, urllib2.HTTPCookieProcessor(cookielib.CookieJar()), urllib2.ProxyHandler({}))
        try:
            # NOTE(review): certificate verification is switched off here;
            # confirm this is intended for the target deployments.
            return build(urllib2.HTTPSHandler(context = ssl._create_unverified_context()))
        except Exception:
            # Fallback for runtimes where the unverified context (or the
            # `context` argument) is not available.
            return build(urllib2.HTTPSHandler())

    def start(self, userCredentials):
        """Log in where credentials are given and launch the playlist.

        userCredentials: dict with 'username' and 'password'; login is skipped
            when either is None.

        All worker threads are created before the first one is started, so
        the finished-count comparison and the status string cannot be
        disturbed by a worker that completes while start() is still running.
        The descriptor itself is left unmodified.
        """
        created = []
        for playlistObject in self._descriptor:
            url = playlistObject['apiUrl']
            opener = self._buildOpener()
            if userCredentials['username'] is not None and userCredentials['password'] is not None:
                self._login(opener, url, userCredentials)
            for action in playlistObject['playlist']:
                if 'requests' not in action:
                    # Fail fast in the caller instead of inside a worker.
                    raise KeyError('requests')
                worker = threading.Thread(target = self._processPlaylist, args = (url, opener, action))
                worker.daemon = True
                record = {'thread': worker}
                if 'relativeTo' in action and len(action['relativeTo']) > 0:
                    # Private copy: ids are removed from it as dependencies finish.
                    record['relativeTo'] = list(action['relativeTo'])
                self._threads.append(record)
                created.append(record)
        if len(self._threads) == 0:
            self._status['node']['val'] = '[led:blue]Finished'
            # Nothing will ever finish, so close any session opened above.
            self._logout()
            return
        self._status['node']['val'] = '[led:green]Running'
        for record in created:
            if 'relativeTo' not in record:
                record['thread'].start()

    def stop(self):
        """Stop the playlist: wake paused workers, mark Stopped and log out."""
        self._stopEvent.set()
        self._continueEvent.set()
        self._status['node']['val'] = '[led:black]Stopped'
        self._logout()

    def pause(self):
        """Toggle between paused and running, updating the status text."""
        if self._continueEvent.is_set():
            self._continueEvent.clear()
            self._status['node']['val'] = self._status['node']['val'].replace('[led:green]Running', '[led:yellow]Paused')
        else:
            self._continueEvent.set()
            self._status['node']['val'] = self._status['node']['val'].replace('[led:yellow]Paused', '[led:green]Running')

    def isRunning(self):
        """Return True while at least one worker thread is alive."""
        return any(record['thread'].is_alive() for record in self._threads)

    def getStatus(self):
        """Return the status dict ({'node': {'val': <text>, 'tp': 11}})."""
        return self._status
| |
if __name__ == '__main__':
    # Command-line entry point: run the playlist descriptor stored in the
    # JSON file given as the first argument and print every response.
    def printResponse(response):
        """Write one action's decoded response to stdout as a JSON line."""
        # Single-argument call form behaves the same with the Python 2
        # print statement and the Python 3 print function.
        print(json.dumps(response))

    if len(sys.argv) < 2:
        # Explain the expected argument instead of dying with an IndexError.
        sys.stderr.write('Usage: %s <playlist-descriptor.json>\n' % sys.argv[0])
        sys.exit(2)

    with open(sys.argv[1], 'r') as f:
        playlistDescriptor = json.load(f)
    scheduledPlaylist = ScheduledPlaylist(playlistDescriptor, printResponse)
    # NOTE(review): credentials are hard-coded; confirm they are only meant
    # for local/manual runs of this script.
    scheduledPlaylist.start({'username': 'admin', 'password': 'admin', 'groups': set(['admin'])})
    # Initial grace period before polling (presumably to let the workers get
    # going - TODO confirm), then wait until no worker thread is alive.
    time.sleep(3)
    while scheduledPlaylist.isRunning():
        time.sleep(1)