blob: 4a152d7246a32dcf1423723ba80565f3c3995cd1 [file] [log] [blame]
import random, json, os, time, subprocess, signal, socket, logging, re, string
from Common.DsRestAPI import DsRestAPI
from Common.WorkspaceHandler import *
class DummyProcess:
'''A small class that acts like a stopped process.'''
def __init__(self, exitcode=0):
self._exitcode=exitcode
def poll(self):
return self._exitcode
class PerformanceMonitor:
SOURCE_ID = 'PerformanceMonitor'
def __init__(self, directory):
self._logger = logging.getLogger(__name__)
self._directory = directory
# tp and state
# 0: off -> not started yet
# 1: started -> stop button can be pressed
# 2: stopped -> start button can be pressed
self._config = {}
with open(os.path.join(self._directory, 'config.json'), 'r') as f:
self._config = json.load(f)
if 'Name' not in self._config:
self._config['Name'] = socket.gethostname()
self._dsRestAPI = DsRestAPI(self._getDataHandler, self._setDataHandler)
self._userData = {}
self._createNewMonitor(None)
self._processes = {}
self._loadProcessesOnStart()
self._count = 1000
self._offset = -1
self._maxpoints = 1000
self._aggregate = False
self._aggregate_method = 'avg' # 'avg' or 'minmax'
def _getDataHandler(self, request, userCredentials):
if userCredentials['username'] in self._userData:
perfMonitor = self._userData[userCredentials['username']]
else:
perfMonitor = self._createNewMonitor(userCredentials['username'])
element = request['element']
params = request['params']
if element == 'help':
with open(os.path.join(self._directory, 'help.json'), 'r') as f:
dataElements = json.load(f)
help = {'sources': [{'source': self.SOURCE_ID, 'dataElements': dataElements}]}
return {'node': {'val': json.dumps(help).encode('hex'), 'tp': 5}}
elif element == 'Name':
return {'node': {'val': self._config['Name'], 'tp': 4}}
elif element == 'PerformanceMonitors':
list = [{'node': {'val': performanceMonitor, 'tp': 10}} for performanceMonitor in self._config['PerfMonitors']]
return {'list': list}
elif element == 'Options' and len(params) == 1 and params[0]['paramName'] == 'PerfMonitor' and params[0]['paramValue'] in self._config['PerfMonitors']:
if 'options' in self._config['PerfMonitors'][params[0]['paramValue']]:
return {'node': {'val': self._config['PerfMonitors'][params[0]['paramValue']]['options'], 'tp': 4}}
else:
return {'node': {'val': '{}', 'tp': 4}}
elif element == 'Workspace' and len(params) == 0:
return {'node': {'val': getPathOfDaemonProcesses(os.path.join(self._config['Name'], self.SOURCE_ID)), 'tp': 4}}
elif element == 'PMProcesses':
list = [{'node': {'val': process, 'tp': 10}} for process in self._processes]
list.sort(key = lambda element: self._processes[element['node']['val']]['StartTime'])
return {'list': list}
elif element == 'ProcessStat' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
return {'node': {'val': self._getProcStatOutput(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])).encode('hex'), 'tp': 5}}
elif element == 'ProcessStats' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
headerNames = self._getHeaderNames('ProcStat', params[0]['paramValue'])
#list = [{'node': {'val': column, 'tp': 10}} for column in headerNames]
list = [{'node': {'val': str(column), 'tp': 10}} for column in range(0,len(headerNames))]
return {'list': list}
elif element == 'ProcessStat' and len(params) > 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
column=0
for param in params:
if param['paramName'] == 'PMProcess':
processName=param['paramValue']
elif param['paramName'] == 'Column':
column=int(param['paramValue'])
if 'timeline' in request:
count = -1
offset = -1
maxpoints = -1
aggregate_method = self._aggregate_method
if 'rangeFilter' in request:
if 'count' in request['rangeFilter']:
count = int(request['rangeFilter']['count'])
if 'offset' in request['rangeFilter']:
offset = int(request['rangeFilter']['offset'])
if 'resample' in request['rangeFilter']:
maxpoints = int(request['rangeFilter']['resample'])
if 'resampleMethod' in request['rangeFilter']:
if request['rangeFilter']['resampleMethod'] in ['avg', 'minmax']:
aggregate_method = request['rangeFilter']['resampleMethod']
if 'maxpoints' in request['timeline']:
maxpoints = int(request['timeline']['maxpoints'])
if count == -1:
count = self._count
if offset == -1:
offset = self._offset
if maxpoints == -1:
maxpoints = self._maxpoints
x, y = self._generateTimeline(column, 'ProcStat', processName, count, offset, maxpoints, aggregate_method)
return {'node': {'val': json.dumps({"tp":2,"x":x,"y":y}), 'tp': 4}}
columnData = self._getColumnData(column, 'ProcStat', processName)
return {'node': {'val': columnData, 'tp': 4}}
elif element == 'ProcessStatName' and len(params) == 2 and params[1]['paramName'] == 'Column' \
and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
columnData = self._getHeaderNames('ProcStat', params[0]['paramValue'])
if len(columnData)>int(params[1]['paramValue']):
columnData=columnData[int(params[1]['paramValue'])]
else:
columnData = 'N/A'
return {'node': {'val': columnData, 'tp': 4}}
elif element == 'SystemStat' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0][
'paramValue'] in self._processes:
return {'node': {'val': self._getSysStatOutput(self._config['Name'], os.path.join(self.SOURCE_ID, params[0][
'paramValue'])).encode('hex'), 'tp': 5}}
elif element == 'SystemStats' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
headerNames = self._getHeaderNames('SysStat', params[0]['paramValue'])
#list = [{'node': {'val': column, 'tp': 10}} for column in headerNames]
list = [{'node': {'val': str(column), 'tp': 10}} for column in range(0,len(headerNames))]
return {'list': list}
elif element == 'SystemStat' and len(params) > 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
column=0
for param in params:
if param['paramName'] == 'PMProcess':
processName=param['paramValue']
elif param['paramName'] == 'Column':
column=int(param['paramValue'])
if 'timeline' in request:
count = -1
offset = -1
maxpoints = -1
aggregate_method = self._aggregate_method
if 'rangeFilter' in request:
if 'count' in request['rangeFilter']:
count = int(request['rangeFilter']['count'])
if 'offset' in request['rangeFilter']:
offset = int(request['rangeFilter']['offset'])
if 'resample' in request['rangeFilter']:
maxpoints = int(request['rangeFilter']['resample'])
if 'resampleMethod' in request['rangeFilter']:
if request['rangeFilter']['resampleMethod'] in ['avg', 'minmax']:
aggregate_method = request['rangeFilter']['resampleMethod']
if 'maxpoints' in request['timeline']:
maxpoints = int(request['timeline']['maxpoints'])
if count == -1:
count = self._count
if offset == -1:
offset = self._offset
if maxpoints == -1:
maxpoints = self._maxpoints
x, y = self._generateTimeline(column, 'SysStat', processName, count, offset, maxpoints, aggregate_method)
return {'node': {'val': json.dumps({"tp":2,"x":x,"y":y}), 'tp': 4}}
columnData = self._getColumnData(int(params[1]['paramValue']), 'SysStat', params[0]['paramValue'])
return {'node': {'val': columnData, 'tp': 4}}
elif element == 'SystemStatName' and len(params) == 2 and params[1]['paramName'] == 'Column' \
and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
columnData = self._getHeaderNames('SysStat', params[0]['paramValue'])
if len(columnData)>int(params[1]['paramValue']):
columnData=columnData[int(params[1]['paramValue'])]
else:
columnData = 'N/A'
return {'node': {'val': columnData, 'tp': 4}}
elif element == 'ProcStatLen' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
procId=params[0]['paramValue']
return {'node': {'val': len(self._processes[procId]['ProcStat']['lines'][1:-1]) if 'ProcStat' in self._processes[procId] else 0, 'tp': 4}}
elif element == 'SysStatLen' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
procId=params[0]['paramValue']
return {'node': {'val': len(self._processes[procId]['SysStat']['lines'][1:-1]) if 'SysStat' in self._processes[procId] else 0, 'tp': 4}}
elif element == 'TimelineLen':
return {'node': {'val': self._count, 'tp': 4}}
elif element == 'MaxPoints':
if 'rangeFilter' in request:
try:
if 'count' in request['rangeFilter']:
self._count = int(request['rangeFilter']['count'])
if 'offset' in request['rangeFilter']:
self._offset = int(request['rangeFilter']['offset'])
if 'resample' in request['rangeFilter']:
self._maxpoints = int(request['rangeFilter']['resample'])
if 'resampleMethod' in request['rangeFilter']:
if request['rangeFilter']['resampleMethod'] in ['avg', 'minmax']:
self._aggregate_method = request['rangeFilter']['resampleMethod']
except:
pass
return {'node': {'val': self._maxpoints, 'tp': 4}}
elif element == 'Aggregate':
return {'node': {'val': 'true' if self._aggregate else 'false', 'tp': 3}}
elif element == 'Errors' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
return {'node': {'val': self._getErrorOutput(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])).encode('hex'), 'tp': 5}}
elif element == 'Config' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
return {'node': {'val': self._processes[params[0]['paramValue']][element], 'tp': 4}}
elif element == 'StartTime' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
return {'node': {'val': time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(self._processes[params[0]['paramValue']][element])), 'tp': -4}}
elif element == 'Stop' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
if self._processes[params[0]['paramValue']]['Process'].poll() is None:
return {'node': {'val': '0', 'tp': 1}}
else:
return {'node': {'val': '0', 'tp': -1}}
elif element == 'Restart' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
if self._processes[params[0]['paramValue']]['Process'].poll() is None:
return {'node': {'val': '0', 'tp': -1}}
else:
return {'node': {'val': '0', 'tp': 1}}
elif element in ['Start', 'ClearWorkspace']:
return {'node': {'val': '0', 'tp': 1}}
elif element in ['Delete'] and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
if self._processes[params[0]['paramValue']]['Process'].poll() is None:
return {'node': {'val': '0', 'tp': -1}}
else:
return {'node': {'val': '0', 'tp': 1}}
elif element == 'Workspace' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
return {'node': {'val': getWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])), 'tp': 4}}
elif element == 'DownloadWorkspace' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
return {'node': {'val': getNameOfDownloadableZip(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue'])), 'tp': 4}}
elif element == 'Status' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
statusCode = self._processes[params[0]['paramValue']]['Process'].poll()
val = ''
if statusCode is None:
return {'node': {'val': '[led:green]running', 'tp': 11}}
elif statusCode == 0:
return {'node': {'val': '[led:blue]stopped - 0', 'tp': 11}}
else:
return {'node': {'val': '[led:red]stopped - ' + str(statusCode), 'tp': 11}}
elif element == 'GuiAddress' and len(params) == 3 and params[0]['paramName'] == 'IPAddress' and params[1]['paramName'] == 'Port' and params[2]['paramName'] == 'Status':
guiAddress = 'http://' + params[0]['paramValue'] + ':' + str(params[1]['paramValue'])
guiAddress = 'proxy/' + guiAddress.encode('hex') + '/'
guiAddress += '?hideRibbon=true'
if 'online' in params[2]['paramValue']:
return {'node': {'val': guiAddress, 'tp': 4}}
else:
return {'node': {'val': guiAddress, 'tp': -4}}
def _setDataHandler(self, request, userCredentials):
#if userCredentials['username'] is not None:
if userCredentials['username'] in self._userData:
perfMonitor = self._userData[userCredentials['username']]
else:
perfMonitor = self._createNewMonitor(userCredentials['username'])
element = request['element']
params = request['params']
content = request['content']
if element == 'Start':
configs = {}
options = {}
for param in params:
if param['paramName'] == 'PerfMonitor':
perfMonitor = param['paramValue']
elif param['paramName'] == 'Options':
options = json.loads(param['paramValue'])
else:
return {'node': {'val': 'Wrong parameter for: ' + element, 'tp': 4}}
self._start(perfMonitor, options)
return {'node': {'val': '0', 'tp': 1}}
elif element == 'Options' and len(params) == 1 and params[0]['paramName'] == 'PerfMonitor' and params[0]['paramValue'] in self._config['PerfMonitors']:
options = {}
try:
options = json.dumps(json.loads(content))
self._config['PerfMonitors'][params[0]['paramValue']]['options'] = options
except Exception as e:
self._logger.error('Wrong format: '+ content+':'+str(e))
return {'node': {'val': content, 'tp': 4}}
elif element == 'Stop' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
self._stop(perfMonitor, params[0]['paramValue'])
return {'node': {'val': '0', 'tp': 1}}
elif element == 'Restart' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
self._restart(params[0]['paramValue'])
return {'node': {'val': '0', 'tp': 1}}
elif element == 'Delete' and len(params) == 1 and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
if self._processes[params[0]['paramValue']]['Process'].poll() is None:
return {'node': {'val': '0', 'tp': -1}}
else:
self._processes.pop(params[0]['paramValue'])
deleteWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue']))
return {'node': {'val': '0', 'tp': 1}}
elif element == 'DownloadWorkspace' and params[0]['paramName'] == 'PMProcess' and params[0]['paramValue'] in self._processes:
zip = createDownloadableZip(self._config['Name'], os.path.join(self.SOURCE_ID, params[0]['paramValue']))
return {'node': {'val': zip, 'tp': 4}}
elif element == 'ClearWorkspace':
clearWorkspace(os.path.join(self._config['Name'], self.SOURCE_ID), self._processes)
return {'node': {'val': '0', 'tp': 1}}
elif element == 'TimelineLen':
self._count = int(content)
return {'node': {'val': self._count, 'tp': 4}}
elif element == 'MaxPoints':
self._maxpoints = int(content)
return {'node': {'val': self._maxpoints, 'tp': 4}}
elif element == 'Aggregate':
self._aggregate = ( content.lower() == 'true' )
return {'node': {'val': content, 'tp': 3}}
def _createNewMonitor(self, username):
self._userData[username] = {
}
def _start(self, perfMonitor, options):
#start performance monitoring
self._logger.info('Starting performance measurement for perfMonitor: '+perfMonitor+' options: '+json.dumps(options))
self._startProcess(perfMonitor, options)
def _stop(self, perfMonitor, processId):
#stop performance monitoring
self._logger.info('Performance measurement stopped')
self._killProcess(processId)
self._processes[processId]['EndTime'] = time.time()
def _restart(self, processId):
#restart performance monitoring
perfMonitor=self._processes[processId]['PerformanceMonitor']
options=self._processes[processId]['Options']
self._logger.info('Restarting performance measurement for perfMonitor: '+perfMonitor+' options: '+json.dumps(options))
self._processes.pop(processId)
self._startProcess(perfMonitor, options)
def _loadProcessesOnStart(self):
path = getPathOfDaemonProcesses(self._config['Name'])
path = os.path.join(path,self.SOURCE_ID)
if os.path.exists(path):
for item in os.listdir(path):
try:
with open(os.path.join(path, item, 'processDescriptor.json'), 'r') as f:
descriptor = json.load(f)
self._processes[item] = {
'Process': DummyProcess(0 if 'exitcode' not in descriptor else descriptor['exitcode']),
'StartTime': os.stat(os.path.join(path, item)).st_ctime,
'PerformanceMonitor': descriptor['performanceMonitor'],
'Options': descriptor['options']
}
except:
pass
def _getProcessId(self, performanceMonitor):
i = 0
processId = performanceMonitor + '_' + str(i)
while processId in self._processes:
i += 1
processId = performanceMonitor + '_' + str(i)
return processId
def _startProcess(self, performanceMonitor, options):
processId = self._getProcessId(performanceMonitor)
workspace = self._saveConfig(processId, performanceMonitor, options)
process = {}
process['Process'] = self._startSubProcess(processId, self._startCommand(processId, performanceMonitor, workspace, options))
process['StartTime'] = time.time()
process['PerformanceMonitor'] = performanceMonitor
process['Options'] = options
self._processes[processId] = process
def _startSubProcess(self, processId, command):
#pass
f_stdout=open(getConsolePath(self._config['Name'], os.path.join(self.SOURCE_ID, processId)), 'a')
f_stderr=open(self._getErrorPath(self._config['Name'], os.path.join(self.SOURCE_ID, processId)), 'a')
f_stderr.write('Starting command `'+command+'`\n\n')
f_stderr.flush()
return subprocess.Popen(command, shell = True, preexec_fn=os.setsid, stdout=f_stdout, stderr=f_stderr)
def _killProcess(self, processId):
process=self._processes[processId]['Process']
if process and process.poll() is None:
os.killpg(os.getpgid(self._processes[processId]['Process'].pid), signal.SIGTERM)
self._logger.info('Killed PerfMon instance: ' + processId)
def _startCommand(self, processId, performanceMonitor, workspace, options):
if performanceMonitor not in self._config['PerfMonitors']:
performanceMonitor = 'default'
command = self._config['PerfMonitors'][performanceMonitor]['cmd']
for option in options:
if type(options[option]) == type(True):
command += ' ' + option.split(':', 1)[0]
else:
command += ' ' + option.split(':', 1)[0] + ' ' + options[option]
#replace <workspace> with the actual directory:
command = self._substituteWorkspace(command, workspace)
self._logger.info('Starting performance monitor instance: ' + command)
return command
def _substituteWorkspace(self, command, workspace):
workspaceRepl=re.compile(r'<workspace>')
return workspaceRepl.sub(workspace, command)
def _saveConfig(self, processId, performanceMonitor, options):
try:
workspace = getWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, processId))
deleteWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, processId))
os.makedirs(workspace)
descriptor = {
'performanceMonitor': performanceMonitor,
'options': options
}
with open(os.path.join(workspace, 'processDescriptor.json'), 'w') as f:
json.dump(descriptor, f, indent=4)
return workspace
except Exception as e:
self._logger.exception('Failed to save config files: ' + self._config['Name'] + ' ' + processId)
return ''
def _saveExitcode(self, processId):
exitcode=self._processes[processId]['Process'].poll()
if exitcode:
workspace = getWorkspace(self._config['Name'], os.path.join(self.SOURCE_ID, processId))
configFile = os.path.join(workspace, 'processDescriptor.json')
try:
with open(configFile, 'r') as f:
descriptor = json.load(f)
if 'exitcode' not in descriptor:
descriptor['exitcode'] = exitcode
f.close()
with open(configFile, 'w') as f:
json.dump(descriptor, f, indent=4)
except:
pass
def _getOutput(self, fileName, numberOfLinesToRead=50, fromTail=True, currentPos=0):
try:
with open(fileName, 'r') as f:
if currentPos != 0:
f.seek(currentPos)
lines = f.readlines() if fromTail else [f.readline() for i in range(numberOfLinesToRead)]
if len(lines) > numberOfLinesToRead and numberOfLinesToRead > 0:
val = ''.join(lines[-numberOfLinesToRead:]) if fromTail else ''.join(lines[:numberOfLinesToRead])
else:
val = ''.join(lines)
pos = f.tell()
except Exception as e:
raise e
return val, pos
def _getErrorPath(self, daemonName, processId):
return os.path.join(getWorkspace(daemonName, processId), 'stderr.out')
def _getErrorOutput(self, daemonName, processId, numberOfLinesToRead = 50):
val = ''
try:
val,pos = self._getOutput(self._getErrorPath(daemonName, processId), numberOfLinesToRead)
except Exception as e:
val = '\n'.join(['Failed to open error.log', daemonName, processId, str(e)])
#remove the process if stderr output not found (it should be there always)
#remove path from processID:
self._processes.pop(os.path.basename(processId))
return val
def _getSysStatPath(self, daemonName, processId):
return os.path.join(getWorkspace(daemonName, processId), 'sysstat.out')
def _getStatOutput(self, statType, daemonName, processId, numberOfLinesToRead = 50, fromTail=True):
val = ''
if statType not in ['SysStat', 'ProcStat']:
raise 'Invalid statType: ' + statType
getPath = self._getSysStatPath if statType == 'SysStat' else getConsolePath
procId=os.path.basename(processId)
currentPos = 0 if statType not in self._processes[procId
] else 0 if 'currentPos' not in self._processes[procId][statType
] else self._processes[procId][statType]['currentPos']
if fromTail:
linesToReadFromFile = -1 # all until EOF
else:
currentPos = 0 # read from beginning
linesToReadFromFile = numberOfLinesToRead
try:
if not fromTail and statType in self._processes[procId] and len(self._processes[procId][statType]['lines'])>=linesToReadFromFile:
val = '\n'.join([ '\t'.join(line) for line in self._processes[procId][statType]['lines'][:numberOfLinesToRead] ])
else:
val,pos = self._getOutput(getPath(daemonName, processId), linesToReadFromFile, fromTail, currentPos)
if not fromTail and statType not in self._processes[procId] and pos > currentPos:
self._processes[procId][statType] = {'currentPos': pos, 'lines': [ string.split(line, '\t') for line in string.split(val, '\n') ]}
if (fromTail and pos>currentPos):
if statType not in self._processes[procId]:
self._processes[procId][statType] = {'currentPos': pos, 'lines': [ string.split(line, '\t') for line in string.split(val, '\n') ]}
else:
buffer='' if len( self._processes[procId][statType]['lines']) == 0 else '\t'.join(self._processes[procId][statType]['lines'].pop())
self._processes[procId][statType]['currentPos'] = pos
self._processes[procId][statType]['lines'] += [ string.split(line, '\t') for line in string.split(buffer+val, '\n') ]
val = '' if pos == 0 \
else '\n'.join([ '\t'.join(line) for line in self._processes[procId][statType]['lines'][-numberOfLinesToRead:] ]) if fromTail \
else '\n'.join([ '\t'.join(line) for line in self._processes[procId][statType]['lines'][:numberOfLinesToRead] ])
except Exception as e:
raise e
return val
def _getSysStatOutput(self, daemonName, processId, numberOfLinesToRead = 50, fromTail=True):
val = ''
try:
val = self._getStatOutput('SysStat', daemonName, processId, numberOfLinesToRead, fromTail)
except Exception as e:
val = '\n'.join(['System Statistics is not available for ' + processId + ' on ' + daemonName, str(e),
'To enable System Statistics, the process should write to the file <workspace>/sysstat.out',
'Note, that <workspace> in the command will be replaced with the actual value'])
return val
def _getProcStatOutput(self, daemonName, processId, numberOfLinesToRead = 50, fromTail=True):
val = ''
try:
val = self._getStatOutput('ProcStat', daemonName, processId, numberOfLinesToRead, fromTail)
except Exception as e:
val = '\n'.join(['Process Statistics is not available for ' + processId + ' on ' + daemonName, str(e),
'To enable System Statistics, the process should write to the file <workspace>/console.out',
'Note, that <workspace> in the command will be replaced with the actual value'])
return val
def _getHeaderNames(self, statType, procName):
try:
consoleOutput = self._getStatOutput(statType, self._config['Name'],
os.path.join(self.SOURCE_ID, procName), numberOfLinesToRead=1,
fromTail=False)
except: pass
procId = procName
headerNames = [] if statType not in self._processes[procId] or len(self._processes[procId][statType]['lines']) == 0\
else self._processes[procId][statType]['lines'][0]
return headerNames
def _getColumnData(self, column, statType, procName):
try:
consoleOutput = self._getStatOutput(statType, self._config['Name'],
os.path.join(self.SOURCE_ID, procName), numberOfLinesToRead=1)
except: pass
procId = procName
lastRow = [] if procId not in self._processes or statType not in self._processes[procId] or len(self._processes[procId][statType]['lines']) < 2\
else self._processes[procId][statType]['lines'][len(self._processes[procId][statType]['lines'])-2]
return lastRow[column] if len(lastRow) > column else 'N/A'
def _generateTimeline(self, column, statType, processName, count, offset, maxpoints, aggregate_method):
'''
if count > 0 it returns the data in timeline format (x,y) with
1) the last count number of elements (offset == -1)
2) count number of elements from offset
when aggregate is True the whole data list is aggregated into timelineLen long list by averaging the elements
if count == 0 it returns the whole list
if count < 0
1) it returns all elements from the -count-th element (offset == -1)
2) it returns count elements from the back (offset is also measured from the back)
if maxpoints > 0 and count>maxpoints: data is resampled to have at most maxpoints elements
if maxpoints <= 0 : no resampling is done
'''
aggregate = self._aggregate
try:
consoleOutput = self._getStatOutput(statType, self._config['Name'], os.path.join(self.SOURCE_ID, processName),
numberOfLinesToRead=1)
except: pass
procId = processName
lines = self._processes[procId][statType]['lines'][1:-1] # cut the header and buffer lines
nofLines = len(lines)
lines = [] if statType not in self._processes[procId]\
else lines[-count:] if offset == -1 or count == 0\
else lines[count:] if offset == 0 and count < 0\
else lines[-offset+count:-offset] if count < 0\
else lines[offset:offset+count]
nofLines = len(lines)
if aggregate and maxpoints > 0 and ((count>0 and count>maxpoints and nofLines > maxpoints) or (count <= 0 and nofLines>maxpoints)):
aggregate = True
else:
aggregate = False
if not aggregate or statType not in self._processes[procId]:
rows = lines
else:
if aggregate_method == 'minmax':
maxpoints //= 2
aggregationStep = float(nofLines) / maxpoints
#self._logger.info(statType+' noflines: '+str(nofLines)+ ' timelineLen: '+str(timelineLen) + ' aggregation: '+ str(aggregation) +
# ' last index: '+str( (nofLines // aggregation)*aggregation-1 ) + ' nofLines // aggregation: '+str(nofLines // aggregation))
rows = []
currentLineIdx=0
aggregationLimit = 0
for j in range(maxpoints):
if aggregate_method == 'avg':
sum = 0
elif aggregate_method == 'minmax':
min = None
max = None
maxIdx = None
minIdx = None
else:
# unsupported method
pass
nofAggregatedLines = 0
aggregationLimit += aggregationStep
while currentLineIdx<aggregationLimit and currentLineIdx < nofLines:
currentLine=lines[currentLineIdx]
try:
if aggregate_method == 'avg':
sum += float(currentLine[column]) if column < len(currentLine) \
else 0
elif aggregate_method == 'minmax':
if column < len(currentLine):
currenValue = float(currentLine[column])
if max is None or currenValue > max:
max, maxIdx = currenValue, currentLineIdx
if min is None or currenValue < min:
min, minIdx = currenValue, currentLineIdx
else:
# unsupported method
pass
except:
if aggregate_method == 'avg':
if currentLineIdx >= nofLines:
self._logger.info('len: '+str(nofLines)+' index: ' + str(currentLineIdx))
if column >= len(currentLine):
self._logger.info('len: '+str(len(currentLine))+' column: ' + str(column))
sum = currentLine[column] if sum == 0 else 0
elif aggregate_method == 'minmax':
max, maxIdx = 0, currentLineIdx
min, minIdx = 0, currentLineIdx
else:
# unsupported method
pass
nofAggregatedLines += 1
currentLineIdx += 1
if aggregate_method == 'avg':
if type(sum) == type(1.0):
sum = "%.4f" % (sum/nofAggregatedLines)
rows += [[sum]] if type(sum) == type(1.0) else [[sum]]
elif aggregate_method == 'minmax':
if max is not None and min is not None:
max = "%.4f" % max
min = "%.4f" % min
rows += [[max], [min]] if maxIdx < minIdx else [[min], [max]]
else:
# unsupported method
pass
column = 0 # data is written into the 0-th column
#x = []
y = []
for row in rows:
# x.append(row[0])
if column < len(row):
y.append(row[column])
x = range(1, len(y) + 1)
return x, y
def handleMessage(self, method, path, headers, body, userCredentials, response):
response['body'] = json.dumps(self._dsRestAPI.parseRequest(json.loads(body), userCredentials))
response['headers']['Content-Type'] = 'application/json'
def getDataSourceHandlers(self):
return {
self.SOURCE_ID: {
'getDataHandler': self._getDataHandler,
'setDataHandler': self._setDataHandler
}
}
def close(self):
#self._runThread = False
for processId in self._processes:
self._killProcess(processId)
self._saveExitcode(processId)