| #// Copyright (c) 2000-2017 Ericsson Telecom AB // |
| #// All rights reserved. This program and the accompanying materials are made available under the terms // |
| #// of the Eclipse Public License v2.0 which accompanies this distribution, and is available at // |
| #// https://www.eclipse.org/org/documents/epl-2.0/EPL-2.0.html // |
| #///////////////////////////////////////////////////////////////////////////////////////////////////////// |
| import random, json, os, time, subprocess, signal, socket, logging, re, string |
| from Common.DsRestAPI import DsRestAPI |
| from Common.WorkspaceHandler import * |
| |
| class DummyProcess: |
| '''A small class that acts like a stopped process.''' |
| def __init__(self, exitcode=0): |
| self._exitcode=exitcode |
| |
| def poll(self): |
| return self._exitcode |
| |
| class PerformanceMonitor: |
| |
| SOURCE_ID = 'PerformanceMonitor' |
| |
| def __init__(self, directory): |
| self._logger = logging.getLogger(__name__) |
| self._directory = directory |
| # tp and state |
| # 0: off -> not started yet |
| # 1: started -> stop button can be pressed |
| # 2: stopped -> start button can be pressed |
| self._config = {} |
| with open(os.path.join(self._directory, 'config.json'), 'r') as f: |
| self._config = json.load(f) |
| if 'Name' not in self._config: |
| self._config['Name'] = socket.gethostname() |
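        # Illustrative config.json content (assumed from the keys read elsewhere in
        # this class; the real file may carry additional fields):
        # {
        #     "Name": "myhost",
        #     "PerfMonitors": {
        #         "default": {
        #             "cmd": "pidstat -h 1 > <workspace>/console.out",
        #             "options": "{}"
        #         }
        #     }
        # }
        # 'Name' is optional and defaults to the hostname; '<workspace>' in 'cmd' is
        # replaced with the per-process workspace directory by _substituteWorkspace().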
| self._dsRestAPI = DsRestAPI(self._getDataHandler, self._setDataHandler) |
| self._userData = {} |
| self._createNewMonitor(None) |
| self._processes = {} |
| self._loadProcessesOnStart() |
| self._count = 1000 |
| self._offset = -1 |
| self._maxpoints = 1000 |
| self._aggregate = False |
| self._aggregate_method = 'avg' # 'avg' or 'minmax' |
| |
| def _getDataHandler(self, request, userCredentials): |
| if userCredentials['username'] in self._userData: |
| perfMonitor = self._userData[userCredentials['username']] |
| else: |
| perfMonitor = self._createNewMonitor(userCredentials['username']) |
| element = request['element'] |
| params = request['params'] |
| |
| if element == 'help': |
| with open(os.path.join(self._directory, 'help.json'), 'r') as f: |
| dataElements = json.load(f) |
            helpData = {'sources': [{'source': self.SOURCE_ID, 'dataElements': dataElements}]}
            return {'node': {'val': json.dumps(helpData).encode('hex'), 'tp': 5}}
| elif element == 'Name': |
| return {'node': {'val': self._config['Name'], 'tp': 4}} |
| elif element == 'PerformanceMonitors': |
            nodes = [{'node': {'val': performanceMonitor, 'tp': 10}} for performanceMonitor in self._config['PerfMonitors']]
            return {'list': nodes}
| elif element == 'Options' and len(params) == 1 and params[0]['paramName'] == 'PerfMonitor' and params[0]['paramValue'] in self._config['PerfMonitors']: |
| if 'options' in self._config['PerfMonitors'][params[0]['paramValue']]: |
| return {'node': {'val': self._config['PerfMonitors'][params[0]['paramValue']]['options'], 'tp': 4}} |
| else: |
| return {'node': {'val': '{}', 'tp': 4}} |
| elif element == 'Workspace' and len(params) == 0: |
| return {'node': {'val': getPathOfDaemonProcesses(os.path.join(self._config['Name'], self.SOURCE_ID)), 'tp': 4}} |
| elif element == 'PMProcesses': |
            nodes = [{'node': {'val': process, 'tp': 10}} for process in self._processes]
            nodes.sort(key = lambda node: self._processes[node['node']['val']]['StartTime'])
            return {'list': nodes}
| elif element == 'ProcessStat' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| return {'node': {'val': self._getProcStatOutput(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])).encode('hex'), 'tp': 5}} |
| elif element == 'ProcessStats' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| headerNames = self._getHeaderNames('ProcStat', params[0]['paramValue']) |
            #nodes = [{'node': {'val': column, 'tp': 10}} for column in headerNames]
            nodes = [{'node': {'val': str(column), 'tp': 10}} for column in range(0, len(headerNames))]
            return {'list': nodes}
| elif element == 'ProcessStat' and len(params) > 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| column=0 |
| for param in params: |
| if param['paramName'] == 'PMProcess': |
| processName=param['paramValue'] |
| elif param['paramName'] == 'Column': |
| column=int(param['paramValue']) |
| |
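            # The timeline branch below accepts an optional 'rangeFilter' in the
            # request with 'count', 'offset', 'resample' and 'resampleMethod'
            # ('avg' or 'minmax'), and an optional 'maxpoints' inside 'timeline'.
            # The value returned for a timeline request is a JSON string of the
            # form {"tp": 2, "x": [...], "y": [...]}.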
| if 'timeline' in request: |
| count = -1 |
| offset = -1 |
| maxpoints = -1 |
| aggregate_method = self._aggregate_method |
| if 'rangeFilter' in request: |
| if 'count' in request['rangeFilter']: |
| count = int(request['rangeFilter']['count']) |
| if 'offset' in request['rangeFilter']: |
| offset = int(request['rangeFilter']['offset']) |
| if 'resample' in request['rangeFilter']: |
| maxpoints = int(request['rangeFilter']['resample']) |
| if 'resampleMethod' in request['rangeFilter']: |
| if request['rangeFilter']['resampleMethod'] in ['avg', 'minmax']: |
| aggregate_method = request['rangeFilter']['resampleMethod'] |
| |
| if 'maxpoints' in request['timeline']: |
| maxpoints = int(request['timeline']['maxpoints']) |
| |
| if count == -1: |
| count = self._count |
| if offset == -1: |
| offset = self._offset |
| if maxpoints == -1: |
| maxpoints = self._maxpoints |
| x, y = self._generateTimeline(column, 'ProcStat', processName, count, offset, maxpoints, aggregate_method) |
| return {'node': {'val': json.dumps({"tp":2,"x":x,"y":y}), 'tp': 4}} |
| |
| columnData = self._getColumnData(column, 'ProcStat', processName) |
| return {'node': {'val': columnData, 'tp': 4}} |
| elif element == 'ProcessStatName' and len(params) == 2 and params[1]['paramName'] == 'Column' \ |
| and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| columnData = self._getHeaderNames('ProcStat', params[0]['paramValue']) |
| if len(columnData)>int(params[1]['paramValue']): |
| columnData=columnData[int(params[1]['paramValue'])] |
| else: |
| columnData = 'N/A' |
| return {'node': {'val': columnData, 'tp': 4}} |
        elif element == 'SystemStat' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
            return {'node': {'val': self._getSysStatOutput(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])).encode('hex'), 'tp': 5}}
| elif element == 'SystemStats' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| headerNames = self._getHeaderNames('SysStat', params[0]['paramValue']) |
            #nodes = [{'node': {'val': column, 'tp': 10}} for column in headerNames]
            nodes = [{'node': {'val': str(column), 'tp': 10}} for column in range(0, len(headerNames))]
            return {'list': nodes}
| elif element == 'SystemStat' and len(params) > 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| column=0 |
| for param in params: |
| if param['paramName'] == 'PMProcess': |
| processName=param['paramValue'] |
| elif param['paramName'] == 'Column': |
| column=int(param['paramValue']) |
| |
| if 'timeline' in request: |
| count = -1 |
| offset = -1 |
| maxpoints = -1 |
| aggregate_method = self._aggregate_method |
                if 'rangeFilter' in request:
| if 'count' in request['rangeFilter']: |
| count = int(request['rangeFilter']['count']) |
| if 'offset' in request['rangeFilter']: |
| offset = int(request['rangeFilter']['offset']) |
| if 'resample' in request['rangeFilter']: |
| maxpoints = int(request['rangeFilter']['resample']) |
| if 'resampleMethod' in request['rangeFilter']: |
| if request['rangeFilter']['resampleMethod'] in ['avg', 'minmax']: |
| aggregate_method = request['rangeFilter']['resampleMethod'] |
| |
| if 'maxpoints' in request['timeline']: |
| maxpoints = int(request['timeline']['maxpoints']) |
| |
| |
| if count == -1: |
| count = self._count |
| if offset == -1: |
| offset = self._offset |
| if maxpoints == -1: |
| maxpoints = self._maxpoints |
| x, y = self._generateTimeline(column, 'SysStat', processName, count, offset, maxpoints, aggregate_method) |
| return {'node': {'val': json.dumps({"tp":2,"x":x,"y":y}), 'tp': 4}} |
| |
            columnData = self._getColumnData(column, 'SysStat', processName)
| return {'node': {'val': columnData, 'tp': 4}} |
| elif element == 'SystemStatName' and len(params) == 2 and params[1]['paramName'] == 'Column' \ |
| and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| columnData = self._getHeaderNames('SysStat', params[0]['paramValue']) |
| if len(columnData)>int(params[1]['paramValue']): |
| columnData=columnData[int(params[1]['paramValue'])] |
| else: |
| columnData = 'N/A' |
| return {'node': {'val': columnData, 'tp': 4}} |
| elif element == 'ProcStatLen' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| procId=params[0]['paramValue'] |
| return {'node': {'val': len(self._processes[procId]['ProcStat']['lines'][1:-1]) if 'ProcStat' in self._processes[procId] else 0, 'tp': 4}} |
| elif element == 'SysStatLen' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| procId=params[0]['paramValue'] |
| return {'node': {'val': len(self._processes[procId]['SysStat']['lines'][1:-1]) if 'SysStat' in self._processes[procId] else 0, 'tp': 4}} |
| elif element == 'TimelineLen': |
| return {'node': {'val': self._count, 'tp': 4}} |
| elif element == 'MaxPoints': |
| if 'rangeFilter' in request: |
| try: |
| if 'count' in request['rangeFilter']: |
| self._count = int(request['rangeFilter']['count']) |
| if 'offset' in request['rangeFilter']: |
| self._offset = int(request['rangeFilter']['offset']) |
| if 'resample' in request['rangeFilter']: |
| self._maxpoints = int(request['rangeFilter']['resample']) |
| if 'resampleMethod' in request['rangeFilter']: |
| if request['rangeFilter']['resampleMethod'] in ['avg', 'minmax']: |
| self._aggregate_method = request['rangeFilter']['resampleMethod'] |
| except: |
| pass |
| return {'node': {'val': self._maxpoints, 'tp': 4}} |
| elif element == 'Aggregate': |
| return {'node': {'val': 'true' if self._aggregate else 'false', 'tp': 3}} |
| elif element == 'Errors' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| return {'node': {'val': self._getErrorOutput(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])).encode('hex'), 'tp': 5}} |
| elif element == 'Config' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| return {'node': {'val': self._processes[params[0]['paramValue']][element], 'tp': 4}} |
| elif element == 'StartTime' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| return {'node': {'val': time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(self._processes[params[0]['paramValue']][element])), 'tp': -4}} |
| elif element == 'Stop' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| if self._processes[params[0]['paramValue']]['Process'].poll() is None: |
| return {'node': {'val': '0', 'tp': 1}} |
| else: |
| return {'node': {'val': '0', 'tp': -1}} |
| elif element == 'Restart' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| if self._processes[params[0]['paramValue']]['Process'].poll() is None: |
| return {'node': {'val': '0', 'tp': -1}} |
| else: |
| return {'node': {'val': '0', 'tp': 1}} |
| elif element in ['Start', 'ClearWorkspace']: |
| return {'node': {'val': '0', 'tp': 1}} |
| elif element in ['Delete'] and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| if self._processes[params[0]['paramValue']]['Process'].poll() is None: |
| return {'node': {'val': '0', 'tp': -1}} |
| else: |
| return {'node': {'val': '0', 'tp': 1}} |
| elif element == 'Workspace' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| return {'node': {'val': getWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])), 'tp': 4}} |
| elif element == 'DownloadWorkspace' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| return {'node': {'val': getNameOfDownloadableZip(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])), 'tp': 4}} |
| elif element == 'Status' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| statusCode = self._processes[params[0]['paramValue']]['Process'].poll() |
| if statusCode is None: |
| return {'node': {'val': '[led:green]running', 'tp': 11}} |
| elif statusCode == 0: |
| return {'node': {'val': '[led:blue]stopped - 0', 'tp': 11}} |
| else: |
| return {'node': {'val': '[led:red]stopped - ' + str(statusCode), 'tp': 11}} |
| elif element == 'GuiAddress' and len(params) == 3 and params[0]['paramName'] == 'IPAddress' and params[1]['paramName'] == 'Port' and params[2]['paramName'] == 'Status': |
| guiAddress = 'http://' + params[0]['paramValue'] + ':' + str(params[1]['paramValue']) |
| guiAddress = 'proxy/' + guiAddress.encode('hex') + '/' |
| guiAddress += '?hideRibbon=true' |
| |
| if 'online' in params[2]['paramValue']: |
| return {'node': {'val': guiAddress, 'tp': 4}} |
| else: |
| return {'node': {'val': guiAddress, 'tp': -4}} |
| |
| def _setDataHandler(self, request, userCredentials): |
| #if userCredentials['username'] is not None: |
| if userCredentials['username'] in self._userData: |
| perfMonitor = self._userData[userCredentials['username']] |
| else: |
| perfMonitor = self._createNewMonitor(userCredentials['username']) |
| element = request['element'] |
| params = request['params'] |
| content = request['content'] |
| |
| if element == 'Start': |
| options = {} |
| for param in params: |
| if param['paramName'] == 'PerfMonitor': |
| perfMonitor = param['paramValue'] |
| elif param['paramName'] == 'Options': |
| options = json.loads(param['paramValue']) |
| else: |
| return {'node': {'val': 'Wrong parameter for: ' + element, 'tp': 4}} |
| self._start(perfMonitor, options) |
| return {'node': {'val': '0', 'tp': 1}} |
| |
| elif element == 'Options' and len(params) == 1 and params[0]['paramName'] == 'PerfMonitor' and params[0]['paramValue'] in self._config['PerfMonitors']: |
| options = {} |
| try: |
| options = json.dumps(json.loads(content)) |
| self._config['PerfMonitors'][params[0]['paramValue']]['options'] = options |
| except Exception as e: |
| self._logger.error('Wrong format: '+ content+':'+str(e)) |
| |
| return {'node': {'val': content, 'tp': 4}} |
| |
| elif element == 'Stop' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| self._stop(perfMonitor, params[0]['paramValue']) |
| return {'node': {'val': '0', 'tp': 1}} |
| |
| elif element == 'Restart' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| self._restart(params[0]['paramValue']) |
| return {'node': {'val': '0', 'tp': 1}} |
| |
| elif element == 'Delete' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
| if self._processes[params[0]['paramValue']]['Process'].poll() is None: |
| return {'node': {'val': '0', 'tp': -1}} |
| else: |
| self._processes.pop(params[0]['paramValue']) |
| deleteWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])) |
| return {'node': {'val': '0', 'tp': 1}} |
| |
| elif element == 'DownloadWorkspace' and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes: |
            zipName = createDownloadableZip(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue']))
            return {'node': {'val': zipName, 'tp': 4}}
| |
| elif element == 'ClearWorkspace': |
| clearWorkspace(os.path.join(self._config['Name'], self.SOURCE_ID), self._processes) |
| return {'node': {'val': '0', 'tp': 1}} |
| |
| elif element == 'TimelineLen': |
| self._count = int(content) |
| return {'node': {'val': self._count, 'tp': 4}} |
| |
| elif element == 'MaxPoints': |
| self._maxpoints = int(content) |
| return {'node': {'val': self._maxpoints, 'tp': 4}} |
| |
| elif element == 'Aggregate': |
| self._aggregate = ( content.lower() == 'true' ) |
| return {'node': {'val': content, 'tp': 3}} |
| |
    def _createNewMonitor(self, username):
        self._userData[username] = {}
        return self._userData[username]
| |
| def _start(self, perfMonitor, options): |
| #start performance monitoring |
| self._logger.info('Starting performance measurement for perfMonitor: '+perfMonitor+' options: '+json.dumps(options)) |
| self._startProcess(perfMonitor, options) |
| |
| def _stop(self, perfMonitor, processId): |
| #stop performance monitoring |
| self._logger.info('Performance measurement stopped') |
| self._killProcess(processId) |
| self._processes[processId]['EndTime'] = time.time() |
| |
| def _restart(self, processId): |
| #restart performance monitoring |
| perfMonitor=self._processes[processId]['PerformanceMonitor'] |
| options=self._processes[processId]['Options'] |
| self._logger.info('Restarting performance measurement for perfMonitor: '+perfMonitor+' options: '+json.dumps(options)) |
| self._processes.pop(processId) |
| self._startProcess(perfMonitor, options) |
| |
| def _loadProcessesOnStart(self): |
| path = getPathOfDaemonProcesses(self._config['Name']) |
| path = os.path.join(path,self.SOURCE_ID) |
| if os.path.exists(path): |
| for item in os.listdir(path): |
| try: |
| with open(os.path.join(path, item, 'processDescriptor.json'), 'r') as f: |
| descriptor = json.load(f) |
| self._processes[item] = { |
| 'Process': DummyProcess(0 if 'exitcode' not in descriptor else descriptor['exitcode']), |
| 'StartTime': os.stat(os.path.join(path, item)).st_ctime, |
| 'PerformanceMonitor': descriptor['performanceMonitor'], |
| 'Options': descriptor['options'] |
| } |
| except: |
| pass |
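    # The directory scanned above is assumed to look like (based on what
    # _saveConfig() and _saveExitcode() write):
    #   <path of daemon processes>/PerformanceMonitor/<processId>/processDescriptor.json
    # where the descriptor carries 'performanceMonitor', 'options' and, once the
    # process has stopped, an 'exitcode' field.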
| |
| def _getProcessId(self, performanceMonitor): |
| i = 0 |
| processId = performanceMonitor + '_' + str(i) |
| while processId in self._processes: |
| i += 1 |
| processId = performanceMonitor + '_' + str(i) |
| return processId |
| |
| def _startProcess(self, performanceMonitor, options): |
| processId = self._getProcessId(performanceMonitor) |
| workspace = self._saveConfig(processId, performanceMonitor, options) |
| |
| process = {} |
| process['Process'] = self._startSubProcess(processId, self._startCommand(processId, performanceMonitor, workspace, options)) |
| process['StartTime'] = time.time() |
| process['PerformanceMonitor'] = performanceMonitor |
| process['Options'] = options |
| self._processes[processId] = process |
| |
| def _startSubProcess(self, processId, command): |
| f_stdout=open(getConsolePath(self._config['Name'], os.path.join(self.SOURCE_ID, processId)), 'a') |
| f_stderr=open(self._getErrorPath(self._config['Name'], os.path.join(self.SOURCE_ID, processId)), 'a') |
| f_stderr.write('Starting command `'+command+'`\n\n') |
| f_stderr.flush() |
| return subprocess.Popen(command, shell = True, preexec_fn=os.setsid, stdout=f_stdout, stderr=f_stderr) |
| |
| def _killProcess(self, processId): |
| process=self._processes[processId]['Process'] |
| if process and process.poll() is None: |
| os.killpg(os.getpgid(self._processes[processId]['Process'].pid), signal.SIGTERM) |
| self._logger.info('Killed PerfMon instance: ' + processId) |
| |
| def _startCommand(self, processId, performanceMonitor, workspace, options): |
| if performanceMonitor not in self._config['PerfMonitors']: |
| performanceMonitor = 'default' |
| |
| command = self._config['PerfMonitors'][performanceMonitor]['cmd'] |
| |
| for option in options: |
            if isinstance(options[option], bool):
| command += ' ' + option.split(':', 1)[0] |
| else: |
| command += ' ' + option.split(':', 1)[0] + ' ' + options[option] |
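        # Illustrative example (the option names are made up, not from a real config):
        # options == {'-i:interval': '5', '--verbose:flag': True} would append
        # ' -i 5' and ' --verbose' to the command (in dictionary order); everything
        # after the first ':' in an option key is dropped by split(':', 1)[0], boolean
        # options add only the flag, other options add the flag followed by the value.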
| |
| #replace <workspace> with the actual directory: |
| command = self._substituteWorkspace(command, workspace) |
| self._logger.info('Starting performance monitor instance: ' + command) |
| return command |
| |
| def _substituteWorkspace(self, command, workspace): |
| workspaceRepl=re.compile(r'<workspace>') |
| return workspaceRepl.sub(workspace, command) |
| |
| def _saveConfig(self, processId, performanceMonitor, options): |
| try: |
| workspace = getWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, processId)) |
| deleteWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, processId)) |
| os.makedirs(workspace) |
| descriptor = { |
| 'performanceMonitor': performanceMonitor, |
| 'options': options |
| } |
| with open(os.path.join(workspace, 'processDescriptor.json'), 'w') as f: |
| json.dump(descriptor, f, indent=4) |
| return workspace |
| except Exception as e: |
| self._logger.exception('Failed to save config files: ' + self._config['Name'] + ' ' + processId) |
| return '' |
| |
| def _saveExitcode(self, processId): |
| exitcode=self._processes[processId]['Process'].poll() |
| if exitcode: |
| workspace = getWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, processId)) |
| configFile = os.path.join(workspace, 'processDescriptor.json') |
| try: |
| with open(configFile, 'r') as f: |
| descriptor = json.load(f) |
| if 'exitcode' not in descriptor: |
| descriptor['exitcode'] = exitcode |
| with open(configFile, 'w') as f: |
| json.dump(descriptor, f, indent=4) |
| except: |
| pass |
| |
| def _getOutput(self, fileName, numberOfLinesToRead=50, fromTail=True, currentPos=0): |
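        # Reads fileName starting at file offset currentPos (0 = beginning).
        # With fromTail=True everything up to EOF is read and the last
        # numberOfLinesToRead lines are kept (all lines when numberOfLinesToRead <= 0);
        # with fromTail=False at most numberOfLinesToRead lines are read from the
        # current position. Returns the selected text and the file offset after reading.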
| try: |
| with open(fileName, 'r') as f: |
| if currentPos != 0: |
| f.seek(currentPos) |
| lines = f.readlines() if fromTail else [f.readline() for i in range(numberOfLinesToRead)] |
| if len(lines) > numberOfLinesToRead and numberOfLinesToRead > 0: |
| val = ''.join(lines[-numberOfLinesToRead:]) if fromTail else ''.join(lines[:numberOfLinesToRead]) |
| else: |
| val = ''.join(lines) |
| pos = f.tell() |
| except Exception as e: |
| raise e |
| return val, pos |
| |
| def _getErrorPath(self, daemonName, processId): |
| return os.path.join(getWorkspace(daemonName, processId), 'stderr.out') |
| |
| def _getErrorOutput(self, daemonName, processId, numberOfLinesToRead = 50): |
| val = '' |
| try: |
| val,pos = self._getOutput(self._getErrorPath(daemonName, processId), numberOfLinesToRead) |
| except Exception as e: |
| val = '\n'.join(['Failed to open error.log', daemonName, processId, str(e)]) |
| #remove the process if stderr output not found (it should be there always) |
| #remove path from processID: |
| self._processes.pop(os.path.basename(processId)) |
| return val |
| |
| def _getSysStatPath(self, daemonName, processId): |
| return os.path.join(getWorkspace(daemonName, processId), 'sysstat.out') |
| |
| def _getStatOutput(self, statType, daemonName, processId, numberOfLinesToRead = 50, fromTail=True): |
| val = '' |
| if statType not in ['SysStat', 'ProcStat']: |
            raise ValueError('Invalid statType: ' + statType)
| |
| getPath = self._getSysStatPath if statType == 'SysStat' else getConsolePath |
| procId=os.path.basename(processId) |
        currentPos = 0 if statType not in self._processes[procId] \
            else 0 if 'currentPos' not in self._processes[procId][statType] \
            else self._processes[procId][statType]['currentPos']
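        # Cached stat data per process (layout as stored below in this method):
        #   self._processes[procId][statType] == {
        #       'currentPos': <file offset already consumed>,
        #       'lines': [[col0, col1, ...], ...]   # tab-separated rows, row 0 is the header
        #   }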
| if fromTail: |
| linesToReadFromFile = -1 # all until EOF |
| else: |
| currentPos = 0 # read from beginning |
| linesToReadFromFile = numberOfLinesToRead |
| try: |
| if not fromTail and statType in self._processes[procId] and len(self._processes[procId][statType]['lines'])>=linesToReadFromFile: |
| val = '\n'.join([ '\t'.join(line) for line in self._processes[procId][statType]['lines'][:numberOfLinesToRead] ]) |
| else: |
| val,pos = self._getOutput(getPath(daemonName, processId), linesToReadFromFile, fromTail, currentPos) |
| if not fromTail and statType not in self._processes[procId] and pos > currentPos: |
| self._processes[procId][statType] = {'currentPos': pos, 'lines': [ string.split(line, '\t') for line in string.split(val, '\n') ]} |
| |
| if (fromTail and pos>currentPos): |
| if statType not in self._processes[procId]: |
| self._processes[procId][statType] = {'currentPos': pos, 'lines': [ string.split(line, '\t') for line in string.split(val, '\n') ]} |
| else: |
| buffer='' if len( self._processes[procId][statType]['lines']) == 0 else '\t'.join(self._processes[procId][statType]['lines'].pop()) |
| self._processes[procId][statType]['currentPos'] = pos |
| self._processes[procId][statType]['lines'] += [ string.split(line, '\t') for line in string.split(buffer+val, '\n') ] |
| val = '' if pos == 0 \ |
| else '\n'.join([ '\t'.join(line) for line in self._processes[procId][statType]['lines'][-numberOfLinesToRead:] ]) if fromTail \ |
| else '\n'.join([ '\t'.join(line) for line in self._processes[procId][statType]['lines'][:numberOfLinesToRead] ]) |
| except Exception as e: |
| raise e |
| return val |
| |
| def _getSysStatOutput(self, daemonName, processId, numberOfLinesToRead = 50, fromTail=True): |
| val = '' |
| try: |
| val = self._getStatOutput('SysStat', daemonName, processId, numberOfLinesToRead, fromTail) |
| |
| except Exception as e: |
            val = '\n'.join(['System Statistics is not available for ' + processId + ' on ' + daemonName, str(e),
                             'To enable System Statistics, the process should write to the file <workspace>/sysstat.out',
                             'Note that <workspace> in the command will be replaced with the actual value'])
| return val |
| |
| def _getProcStatOutput(self, daemonName, processId, numberOfLinesToRead = 50, fromTail=True): |
| val = '' |
| try: |
| val = self._getStatOutput('ProcStat', daemonName, processId, numberOfLinesToRead, fromTail) |
| |
| except Exception as e: |
            val = '\n'.join(['Process Statistics is not available for ' + processId + ' on ' + daemonName, str(e),
                             'To enable Process Statistics, the process should write to the file <workspace>/console.out',
                             'Note that <workspace> in the command will be replaced with the actual value'])
| return val |
| |
| def _getHeaderNames(self, statType, procName): |
| try: |
| consoleOutput = self._getStatOutput(statType, self._config['Name'], |
| os.path.join(self.SOURCE_ID, procName), numberOfLinesToRead=1, |
| fromTail=False) |
| except: pass |
| procId = procName |
| headerNames = [] if statType not in self._processes[procId] or len(self._processes[procId][statType]['lines']) == 0\ |
| else self._processes[procId][statType]['lines'][0] |
| return headerNames |
| |
| def _getColumnData(self, column, statType, procName): |
| try: |
| consoleOutput = self._getStatOutput(statType, self._config['Name'], |
| os.path.join(self.SOURCE_ID, procName), numberOfLinesToRead=1) |
| except: pass |
| |
| procId = procName |
| lastRow = [] if procId not in self._processes or statType not in self._processes[procId] or len(self._processes[procId][statType]['lines']) < 2\ |
| else self._processes[procId][statType]['lines'][len(self._processes[procId][statType]['lines'])-2] |
| return lastRow[column] if len(lastRow) > column else 'N/A' |
| |
| def _generateTimeline(self, column, statType, processName, count, offset, maxpoints, aggregate_method): |
| ''' |
| if count > 0 it returns the data in timeline format (x,y) with |
| 1) the last count number of elements (offset == -1) |
| 2) count number of elements from offset |
| when aggregate is True the whole data list is aggregated into timelineLen long list by averaging the elements |
| if count == 0 it returns the whole list |
| if count < 0 |
| 1) it returns all elements from the -count-th element (offset == -1) |
| 2) it returns count elements from the back (offset is also measured from the back) |
| |
| if maxpoints > 0 and count>maxpoints: data is resampled to have at most maxpoints elements |
| if maxpoints <= 0 : no resampling is done |
| ''' |
| aggregate = self._aggregate |
| try: |
| consoleOutput = self._getStatOutput(statType, self._config['Name'], os.path.join(self.SOURCE_ID, processName), |
| numberOfLinesToRead=1) |
| except: pass |
| |
        procId = processName
        if statType not in self._processes[procId]:
            # no cached data for this statistic yet -> empty timeline
            return [], []
        lines = self._processes[procId][statType]['lines'][1:-1] # cut the header and buffer lines

        lines = lines[-count:] if offset == -1 or count == 0\
            else lines[count:] if offset == 0 and count < 0\
            else lines[-offset+count:-offset] if count < 0\
            else lines[offset:offset+count]
        nofLines = len(lines)
| |
| if aggregate and maxpoints > 0 and ((count>0 and count>maxpoints and nofLines > maxpoints) or (count <= 0 and nofLines>maxpoints)): |
| aggregate = True |
| else: |
| aggregate = False |
| |
| if not aggregate or statType not in self._processes[procId]: |
| rows = lines |
| else: |
| if aggregate_method == 'minmax': |
                maxpoints = max(maxpoints // 2, 1) # keep at least one point to avoid dividing by zero below
| aggregationStep = float(nofLines) / maxpoints |
| rows = [] |
| currentLineIdx=0 |
| aggregationLimit = 0 |
| for j in range(maxpoints): |
| if aggregate_method == 'avg': |
| sum = 0 |
| elif aggregate_method == 'minmax': |
| min = None |
| max = None |
| maxIdx = None |
| minIdx = None |
| else: |
| # unsupported method |
| pass |
| nofAggregatedLines = 0 |
| aggregationLimit += aggregationStep |
| while currentLineIdx<aggregationLimit and currentLineIdx < nofLines: |
| currentLine=lines[currentLineIdx] |
| try: |
| if aggregate_method == 'avg': |
| sum += float(currentLine[column]) if column < len(currentLine) \ |
| else 0 |
| elif aggregate_method == 'minmax': |
| if column < len(currentLine): |
                                currentValue = float(currentLine[column])
                                if max is None or currentValue > max:
                                    max, maxIdx = currentValue, currentLineIdx
                                if min is None or currentValue < min:
                                    min, minIdx = currentValue, currentLineIdx
| else: |
| # unsupported method |
| pass |
| except: |
| if aggregate_method == 'avg': |
| if currentLineIdx >= nofLines: |
| self._logger.info('len: '+str(nofLines)+' index: ' + str(currentLineIdx)) |
| if column >= len(currentLine): |
| self._logger.info('len: '+str(len(currentLine))+' column: ' + str(column)) |
| sum = currentLine[column] if sum == 0 else 0 |
| elif aggregate_method == 'minmax': |
| max, maxIdx = 0, currentLineIdx |
| min, minIdx = 0, currentLineIdx |
| else: |
| # unsupported method |
| pass |
| nofAggregatedLines += 1 |
| currentLineIdx += 1 |
| |
| if aggregate_method == 'avg': |
| if type(sum) == type(1.0): |
| sum = "%.4f" % (sum/nofAggregatedLines) |
                    rows += [[sum]]
| elif aggregate_method == 'minmax': |
| if max is not None and min is not None: |
| max = "%.4f" % max |
| min = "%.4f" % min |
| rows += [[max], [min]] if maxIdx < minIdx else [[min], [max]] |
| else: |
| # unsupported method |
| pass |
| |
        if aggregate:
            column = 0 # aggregated rows contain a single value in the 0-th column
        y = []
        for row in rows:
            if column < len(row):
                y.append(row[column])
        x = range(1, len(y) + 1)
        return x, y
| |
| def handleMessage(self, method, path, headers, body, userCredentials, response): |
| response['body'] = json.dumps(self._dsRestAPI.parseRequest(json.loads(body), userCredentials)) |
| response['headers']['Content-Type'] = 'application/json' |
| |
| def getDataSourceHandlers(self): |
| return { |
| self.SOURCE_ID: { |
| 'getDataHandler': self._getDataHandler, |
| 'setDataHandler': self._setDataHandler |
| } |
| } |
| |
| def close(self): |
| #self._runThread = False |
| for processId in self._processes: |
| self._killProcess(processId) |
| self._saveExitcode(processId) |