/********************************************************************************
* Copyright (c) 2008,2009
* School of Computer, National University of Defense Technology, P.R.China
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Jie Jiang <jiangjie@nudt.edu.cn>
*******************************************************************************/
#ifndef SLURM_PROXY
#define SLURM_PROXY
#endif
#ifdef __gnu_linux__
#define _GNU_SOURCE
#endif /* __gnu_linux__ */
#include "config.h"
#include <fcntl.h>
#include <getopt.h>
#include <unistd.h>
#include <grp.h>
#include <pwd.h>
#include <stdbool.h>
#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <assert.h>
#include <pthread.h>
#include <libgen.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/select.h>
#include <sys/param.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/socket.h>
#include "proxy.h"
#include "proxy_tcp.h"
#include "handler.h"
#include "list.h"
#include "args.h"
#include "rangeset.h"
#include "slurm/slurm.h"
#include "srun_opt.h"
#define PTP_JOBID 0
#define SLURM_JOBID 1
#define DEBUG_JOBID 2
#define JOBID_INIT 0
#define JOBID_FAIL -1
#define MAX_THREADS 60
#define CORE_INVALID -1
#define CORE_DEFAULT 0
#define MAX_BUF_SIZE 8192
#define MAX_SRUN_ARG_NUM 256
#define ALL_JOBSTATE -1
/*
* Need to undef these if we include
* two config.h files
*/
#undef PACKAGE_BUGREPORT
#undef PACKAGE_NAME
#undef PACKAGE_STRING
#undef PACKAGE_TARNAME
#undef PACKAGE_VERSION
#define WIRE_PROTOCOL_VERSION "2.0"
#define DEFAULT_HOST "localhost"
#define DEFAULT_PROXY "tcp"
/*
* Proxy server states. The SHUTTING_DOWN state is used to
* give the proxy a chance to send any pending events once
* a QUIT command has been received.
*/
#define STATE_INIT 0
#define STATE_RUNNING 1
#define STATE_SHUTTING_DOWN 2
#define STATE_SHUTDOWN 3
/*
* RTEV codes must EXACTLY match org.eclipse.ptp.rtsystem.proxy.event.IProxyRuntimeEvent
*/
#define RTEV_OFFSET 200
/* parenthesized so the macros behave as single values in any expression */
#define RTEV_ERROR_SLURM_INIT (RTEV_OFFSET + 1000)
#define RTEV_ERROR_SLURM_FINALIZE (RTEV_OFFSET + 1001)
#define RTEV_ERROR_SLURM_SUBMIT (RTEV_OFFSET + 1002)
#define RTEV_ERROR_JOB (RTEV_OFFSET + 1003)
#define RTEV_ERROR_NATTR (RTEV_OFFSET + 1007)
//#define RTEV_ERROR_ORTE_BPROC_SUBSCRIBE (RTEV_OFFSET + 1008)
#define RTEV_ERROR_SIGNAL (RTEV_OFFSET + 1009)
/*
* Queue attributes
*/
#define DEFAULT_QUEUE_NAME "default"
#define SLURM_JOB_NAME_FMT "job%02d"
#define MAX_RETRIES 100
#define JOB_UPDATE_TIMER 1
#define NODE_UPDATE_TIMER 2
#define JOB_UPDATE_TIMEOUT 500000 /*usec*/
#define NODE_UPDATE_TIMEOUT 5000000 /*usec*/
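/*
 * The proxy polls SLURM rather than receiving push notifications:
 * job state is refreshed every JOB_UPDATE_TIMEOUT (0.5 s) and node state
 * every NODE_UPDATE_TIMEOUT (5 s), presumably driven by job_update_timeout()
 * and node_update_timeout() below.
 */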
/* SLURM job state and attributes */
#define SLURM_JOB_STATE_PENDING "PENDING"
#define SLURM_JOB_STATE_RUNNING "RUNNING"
#define SLURM_JOB_STATE_SUSPENDED "SUSPENDED"
#define SLURM_JOB_STATE_TERMINATED "TERMINATED"
#define SLURM_JOB_STATE_COMPLETE SLURM_JOB_STATE_TERMINATED
#define SLURM_JOB_STATE_CANCELLED "CANCELLED"
#define SLURM_JOB_STATE_FAILED "FAILED"
#define SLURM_JOB_STATE_TIMEOUT "TIMEOUT"
#define SLURM_JOB_STATE_NODEFAIL "NODEFAIL"
#define SLURM_JOB_SUB_ID_ATTR "jobSubId"
#define SLURM_JOB_NUM_PROCS_ATTR "jobNumProcs" //-n
#define SLURM_JOB_NUM_NODES_ATTR "jobNumNodes" //-N
#define SLURM_JOB_TIME_LIMIT_ATTR "jobTimeLimit" //-t
#define SLURM_JOB_PARTITION_ATTR "jobPartition" //-p
#define SLURM_JOB_ID_ATTR "jobId" //--jobid
#define SLURM_JOB_NODELIST_ATTR "jobNodeList" //-w
#define SLURM_JOB_TYPE_ATTR "jobType" //--jobtype
#define SLURM_JOB_IOLABEL_ATTR "jobIoLabel" //-l
#define SLURM_JOB_VERBOSE_ATTR "jobVerbose" //-v
#define SLURM_JOB_EXEC_NAME_ATTR "execName"
#define SLURM_JOB_EXEC_PATH_ATTR "execPath"
#define SLURM_JOB_WORKING_DIR_ATTR "workingDir"
#define SLURM_JOB_PROG_ARGS_ATTR "progArgs"
#define SLURM_JOB_ENV_ATTR "env"
#define SLURM_JOB_DEBUG_EXEC_NAME_ATTR "debugExecName"
#define SLURM_JOB_DEBUG_EXEC_PATH_ATTR "debugExecPath"
#define SLURM_JOB_DEBUG_ARGS_ATTR "debugArgs"
#define SLURM_JOB_DEBUG_FLAG_ATTR "debug"
/* SLURM node state and attributes */
#define SLURM_NODE_STATE_UNKNOWN "UNKNOWN"
#define SLURM_NODE_STATE_DOWN "DOWN"
#define SLURM_NODE_STATE_IDLE "UP"
#define SLURM_NODE_STATE_ALLOCATED "ALLOCATED"
#define SLURM_NODE_EXTRA_STATE_ATTR "nodeExtraState"
#define SLURM_NODE_NUMBER_ATTR "nodeNumber"
#define SLURM_NODE_SOCKETS_ATTR "sockNumber"
#define SLURM_NODE_CORES_ATTR "coreNumber"
#define SLURM_NODE_THREADS_ATTR "threadNumber"
#define SLURM_NODE_ARCH_ATTR "cpuArch"
#define SLURM_NODE_OS_ATTR "OS"
#define EXIT_JOB_ALLOC_FAIL -1
#define EXIT_JOB_IOTHREAD_FAIL -2
#define EXIT_EXEC_FAIL -3
struct sync_msg {
int slurm_jobid;
bool jobid_set;
bool io_ready;
};
typedef struct sync_msg sync_msg;
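/*
 * sync_msg lives in a SysV shared memory segment shared between the proxy
 * and the forked srun child: the child publishes the SLURM jobid through
 * slurm_jobid/jobid_set, and the parent signals io_ready once its iothread
 * is up. See allocate_and_launch_job() for the full handshake.
 */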
struct ptp_machine {
int id;
List * nodes;
};
typedef struct ptp_machine ptp_machine;
struct ptp_slurm_node {
int id; /* model element id, generated by proxy agent */
int number; /* node number, assigned by SLURM */
char * name;
uint16_t state; /* (uint16_t)node_info_t.state, converted to (char *) */
uint16_t sockets;
uint16_t cores;
uint16_t threads;
char * arch;
char * os;
};
typedef struct ptp_slurm_node ptp_node;
struct ptp_slurm_process {
int id;
int node_id;
int task_id; /* MPI rank */
int pid;
};
typedef struct ptp_slurm_process ptp_process;
struct ptp_slurm_job {
int ptp_jobid; /* job ID as known by PTP */
int slurm_jobid; /* job ID that will be used by program when it starts */
bool need_alloc; /* need to allocate new resource */
int debug_jobid;
int num_procs; /* number of procs requested for program (debugger: num_procs+1) */
int state; /* job state(slurm definition) */
pid_t srun_pid; /* pid of the srun process */
bool debug; /* job is debug job */
bool attach; /* attach debug */
int fd_err; /* fd of pipe for srun's stderr */
int fd_out; /* fd of pipe for srun's stdout */
int iothread_id; /* id of thread forwarding srun's stdio */
bool iothread_exit_req; /* request iothread to exit */
bool iothread_exit; /* flag indicating iothread has exited */
bool removable;
ptp_process ** procs; /* procs of this job */
rangeset * set; /* range set of proc ID */
};
typedef struct ptp_slurm_job ptp_job;
typedef struct slurmctld_comm_addr {
char * hostname;
uint16_t port;
}slurmctld_comm_addr_t;
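/*
 * Hostname/port on which this proxy listens for messages from slurmctld;
 * filled in by slurmctld_msg_init() and copied into the job request by
 * create_job_desc_msg_from_opts().
 */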
typedef void SigFunc(int);
typedef int32_t slurm_fd;
static int SLURM_Initialize(int, int, char **);
static int SLURM_ModelDef(int, int, char **);
static int SLURM_StartEvents(int, int, char **);
static int SLURM_StopEvents(int, int, char **);
static int SLURM_SubmitJob(int, int, char **);
static int SLURM_TerminateJob(int, int, char **);
static int SLURM_Quit(int, int, char **);
static FILE * init_logfp();
static void debug_log(FILE * fp, char * fmt,...);
static void * srun_output_forwarding(void * arg);
static bool job_update_timeout();
static void update_job_state(int slurm_jobid);
static bool node_update_timeout();
static void update_node_state();
static int create_node_list(ptp_machine *mach);
static void init_job_timer();
static void init_node_timer();
static struct timeval job_update_timer;
static struct timeval node_update_timer;
static sync_msg * sync_msg_addr;
static FILE * logfp;
static int destroy_job = 0; /* job allocation cancelled by signal */
static bool enable_state_update = false;
static srun_opt_t opt;
static allocation_msg_thread_t* msg_thr = NULL;
static slurmctld_comm_addr_t slurmctld_comm_addr;
static int gTransID = 0; /* transaction id for start of event stream, is 0 when events are off */
static int gBaseID = -1; /* base ID for event generation */
static int gLastID = 1; /* ID generator */
static int gQueueID; /* ID of default queue */
static int proxy_state = STATE_INIT;
static proxy_svr * slurm_proxy;
static List * gJobList;
static List * gMachineList;
static int ptp_signal_exit = 0;
static proxy_svr_helper_funcs helper_funcs = {
NULL, // newconn() - can be used to reject connections
NULL // numservers() - if there are multiple servers, return the number
};
#define CMD_BASE 0
static proxy_cmd cmds[] = {
SLURM_Quit,
SLURM_Initialize,
SLURM_ModelDef,
SLURM_StartEvents,
SLURM_StopEvents,
SLURM_SubmitJob,
SLURM_TerminateJob
};
static proxy_commands command_tab = {
CMD_BASE,
sizeof(cmds)/sizeof(proxy_cmd),
cmds
};
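/*
 * Command dispatch: a command with code CMD_BASE + i is handled by cmds[i],
 * so the order of cmds[] above must stay in sync with the command numbering
 * used by the PTP client (QUIT first, then INITIALIZE, MODEL_DEF, ...).
 */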
static struct option longopts[] = {
{"proxy", required_argument, NULL, 'P'},
{"port", required_argument, NULL, 'p'},
{"host", required_argument, NULL, 'h'},
{NULL, 0, NULL, 0}
};
/*
 * If PTP_SLURM_PROXY_LOGDIR is set, open a log file there
 * and return its FILE pointer. Otherwise, default to stderr.
 */
static FILE *
init_logfp()
{
FILE * fp = stderr;
char * logdir;
char * logfile;
int rc;
if ((logdir = getenv("PTP_SLURM_PROXY_LOGDIR")) == NULL)
return fp;
rc = access(logdir, R_OK|W_OK);
if (rc < 0) {
fprintf(stderr, "Please ensure the DIR %s exists and you can READ/WRITE it!\n", logdir);
return fp;
}
asprintf(&logfile,"%s/ptp_proxy.log",logdir);
fp = fopen(logfile,"w+");
if(fp == NULL)
fp = stderr;
free(logfile);
return fp;
}
/*
* Write debug info into logfile.
*/
static void
debug_log(FILE * fp, char * fmt,...)
{
va_list va;
if (fp == NULL)
return;
va_start(va, fmt);
vfprintf(fp, fmt, va);
fflush(fp);
va_end(va);
return;
}
/*
* Generate a model element ID.
*/
static int
generate_id(void)
{
return gBaseID + gLastID++;
}
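/*
 * Model element IDs start at gBaseID, which the UI supplies via the
 * BASE_ID attribute in SLURM_Initialize(), so IDs generated by this
 * proxy do not collide with those of other proxies.
 */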
/*
* Create a new machine.
*/
static ptp_machine *
new_machine()
{
ptp_machine * m = (ptp_machine *)malloc(sizeof(ptp_machine));
m->id = generate_id();
m->nodes = NewList();
AddToList(gMachineList, (void *)m);
return m;
}
/*
* Convert SLURM node state code (uint16_t) to STRING.
*/
static char *
nodestate_to_string(uint16_t slurm_node_state)
{
char * str = NULL;
switch (slurm_node_state & NODE_STATE_BASE) {
case NODE_STATE_UNKNOWN:
str = SLURM_NODE_STATE_UNKNOWN;
break;
case NODE_STATE_DOWN:
str = SLURM_NODE_STATE_DOWN;
break;
case NODE_STATE_IDLE:
str = SLURM_NODE_STATE_IDLE;
break;
case NODE_STATE_ALLOCATED:
str = SLURM_NODE_STATE_ALLOCATED;
break;
default:
str = SLURM_NODE_STATE_UNKNOWN;
break;
}
return str;
}
/*
 * Convert SLURM job state code (uint16_t) to string.
 * The JOB_COMPLETING flag is masked off, so e.g. RUNNING|COMPLETING maps to "RUNNING".
 */
static char *
jobstate_to_string(uint16_t slurm_job_state)
{
char * str = NULL;
switch (slurm_job_state & (~JOB_COMPLETING)) {
case JOB_PENDING:
str = SLURM_JOB_STATE_PENDING;
break;
case JOB_RUNNING:
str = SLURM_JOB_STATE_RUNNING;
break;
case JOB_SUSPENDED:
str = SLURM_JOB_STATE_SUSPENDED;
break;
case JOB_COMPLETE:
str = SLURM_JOB_STATE_COMPLETE;
break;
case JOB_CANCELLED:
str = SLURM_JOB_STATE_CANCELLED;
break;
case JOB_FAILED:
str = SLURM_JOB_STATE_FAILED;
break;
case JOB_TIMEOUT:
str = SLURM_JOB_STATE_TIMEOUT;
break;
case JOB_NODE_FAIL:
str = SLURM_JOB_STATE_NODEFAIL;
break;
default:
str = "Unknown job state";
break;
}
return str;
}
/*
* Create a new node and insert it into machine node list.
*/
static ptp_node *
new_node(ptp_machine *mach, node_info_t *ni)
{
static int node_number = 0;
ptp_node * n = (ptp_node *)malloc(sizeof(ptp_node));
memset((char *)n, 0, sizeof(ptp_node));
n->id = generate_id();
n->number = node_number++;
n->sockets = ni->sockets;
n->cores = ni->cores;
n->threads = ni->threads;
if (ni->name != NULL)
n->name = strdup(ni->name);
n->state = ni->node_state;
if (ni->arch != NULL)
n->arch = strdup(ni->arch);
if (ni->os != NULL)
n->os = strdup(ni->os);
AddToList(mach->nodes, (void *)n);
return n;
}
/*
* Get node pointer from node name.
*/
static ptp_node *
find_node_by_name(char *name)
{
ptp_machine * m;
ptp_node * n;
for (SetList(gMachineList); (m = (ptp_machine *)GetListElement(gMachineList)) != NULL; ) {
for (SetList(m->nodes); (n = (ptp_node *)GetListElement(m->nodes)) != NULL; ) {
if (strcmp(name, n->name) == 0)
return n;
}
}
return NULL;
}
/*
* Create a new process and insert it into job structure.
*/
static ptp_process *
new_process(ptp_job *job, int node_id, int task_id, int pid)
{
ptp_process * p = (ptp_process *)malloc(sizeof(ptp_process));
if (p == NULL)
return NULL;
p->id = generate_id();
p->task_id = task_id;
p->pid = pid;
p->node_id = node_id;
job->procs[task_id] = p;
insert_in_rangeset(job->set, p->id);
return p;
}
/*
* Free the space allocated to ptp_process.
*/
static void
free_process(ptp_process *p)
{
if (p)
free(p);
return;
}
/*
* Get process pointer given job and task_id.
*/
static ptp_process *
find_process(ptp_job *job, int task_id)
{
if (task_id < 0 || task_id >= job->num_procs)
return NULL;
return job->procs[task_id];
}
/*
* Get jobid of 'which' type
*/
static int
get_jobid(ptp_job *j, int which)
{
int id = -1;
if (j != NULL) {
switch (which) {
case PTP_JOBID:
id = j->ptp_jobid;
break;
case SLURM_JOBID:
id = j->slurm_jobid;
break;
case DEBUG_JOBID:
id = j->debug_jobid;
break;
default:
id = -1;
break;
}
}
return id;
}
/*
* Create a new job,set job attributes and add to gJobList.
*/
static ptp_job *
new_job(int num_procs, bool debug, int ptp_jobid, int slurm_jobid, int debug_jobid, bool need_alloc)
{
ptp_job * j = (ptp_job *)malloc(sizeof(ptp_job));
if (j == NULL) {
debug_log(logfp, "Allcoate space for job struct fail. \n");
return NULL;
}
j->ptp_jobid = ptp_jobid;
j->slurm_jobid = slurm_jobid;
j->need_alloc = need_alloc;
j->num_procs = num_procs;
j->debug = debug;
j->debug_jobid = debug_jobid;
j->attach = false;
j->srun_pid = -1;
j->state = -1;
j->removable = false;
j->fd_err = -1;
j->fd_out = -1;
j->iothread_id = -1;
j->iothread_exit_req = false;
j->iothread_exit = false;
j->set = new_rangeset();
j->procs = (ptp_process **)malloc(sizeof(ptp_process *) * num_procs);
memset(j->procs, 0, sizeof(ptp_process *) * num_procs);
AddToList(gJobList, (void *)j);
return j;
}
/*
* Free job space allocated by new_job().
*/
static void
free_job(ptp_job *j)
{
int i;
if (j) {
if (j->procs) {
for (i = 0; i < j->num_procs; i++) {
if (j->procs[i] != NULL)
free_process(j->procs[i]);
}
free(j->procs);
}
if (j->set)
free_rangeset(j->set);
free(j);
}
}
/*
 * Get job pointer given the jobid of 'which' type
 * (PTP_JOBID, SLURM_JOBID or DEBUG_JOBID).
 */
static ptp_job *
find_job(int jobid, int which)
{
ptp_job * j = NULL;
for (SetList(gJobList); (j = (ptp_job *)GetListElement(gJobList)) != NULL; ) {
if (get_jobid(j, which) == jobid)
return j;
}
return NULL;
}
/*
 * Determine if the given job is ACTIVE.
 */
static bool
slurm_job_active(ptp_job * job)
{
uint16_t state;
bool active;
state = job->state;
if (state == JOB_COMPLETE || state == JOB_CANCELLED
|| state == JOB_FAILED || state == JOB_TIMEOUT || state == JOB_NODE_FAIL) {
active = false;
} else
active = true; /* JOB_COMPLETING is ACTIVE */
return active;
}
static void
sendOKEvent(int trans_id)
{
proxy_svr_queue_msg(slurm_proxy, proxy_ok_event(trans_id));
}
static void
sendShutdownEvent(int trans_id)
{
proxy_svr_queue_msg(slurm_proxy, proxy_shutdown_event(trans_id));
}
static void
sendMessageEvent(int trans_id, char *level, int code, char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
proxy_svr_queue_msg(slurm_proxy, proxy_message_event(trans_id, level, code, fmt, ap));
va_end(ap);
}
static void
sendErrorEvent(int trans_id, int code, char *fmt, ...)
{
va_list ap;
va_list ap2;
va_start(ap, fmt);
debug_log(logfp, "sendErrorEvent(%d,%d),", trans_id, code);
/* a va_list must not be passed to the variadic debug_log(); log via vfprintf */
if (logfp != NULL) {
va_copy(ap2, ap);
vfprintf(logfp, fmt, ap2);
va_end(ap2);
fflush(logfp);
}
proxy_svr_queue_msg(slurm_proxy, proxy_error_event(trans_id, code, fmt, ap));
va_end(ap);
}
static void
sendJobSubErrorEvent(int trans_id, char *jobSubId, char *msg)
{
proxy_svr_queue_msg(slurm_proxy, proxy_submitjob_error_event(trans_id, jobSubId, RTEV_ERROR_SLURM_SUBMIT, msg));
}
static void
sendJobTerminateErrorEvent(int trans_id, int id, char *msg)
{
char * job_id;
asprintf(&job_id, "%d", id);
proxy_svr_queue_msg(slurm_proxy, proxy_terminatejob_error_event(trans_id, job_id, RTEV_ERROR_JOB, msg));
free(job_id);
}
static void
sendNewMachineEvent(int trans_id, int id, char *name)
{
char * rm_id;
char * machine_id;
asprintf(&rm_id, "%d", gBaseID);
asprintf(&machine_id, "%d", id);
proxy_svr_queue_msg(slurm_proxy, proxy_new_machine_event(trans_id, rm_id, machine_id, name, MACHINE_STATE_UP));
free(machine_id);
free(rm_id);
}
/*
* Get the number of node attributes setup in ptp_node.
*/
static int
num_node_attrs(ptp_node *node)
{
int cnt = 0;
if (node->number >= 0)
cnt++;
if (node->sockets > 0)
cnt++;
if (node->cores > 0)
cnt++;
if (node->threads > 0)
cnt++;
if (node->arch != NULL)
cnt++;
if (node->os != NULL)
cnt++;
return cnt;
}
/*
* Add node attributes to proxy_msg.
*/
static void
add_node_attrs(proxy_msg *m, ptp_node *node)
{
/* NODE_NUMBER_ATTR enables the node number ruler in Machine View */
if (node->number >= 0)
proxy_add_int_attribute(m, SLURM_NODE_NUMBER_ATTR, node->number);
if (node->sockets > 0)
proxy_add_int_attribute(m, SLURM_NODE_SOCKETS_ATTR, (int)node->sockets);
if (node->cores > 0)
proxy_add_int_attribute(m, SLURM_NODE_CORES_ATTR, (int)node->cores);
if (node->threads > 0)
proxy_add_int_attribute(m, SLURM_NODE_THREADS_ATTR, (int)node->threads);
if (node->arch != NULL)
proxy_add_string_attribute(m, SLURM_NODE_ARCH_ATTR, node->arch);
if (node->os != NULL)
proxy_add_string_attribute(m, SLURM_NODE_OS_ATTR, node->os);
}
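/*
 * Note: num_node_attrs() and add_node_attrs() must stay in sync; the count
 * passed to proxy_add_node() has to equal the number of attributes actually
 * added, otherwise the event payload would be inconsistent.
 */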
static void
sendNewJobEvent(int trans_id, int jobid, char *name, char *jobSubId, char *state)
{
char * queue_id;
char * job_id;
asprintf(&queue_id, "%d", gQueueID);
asprintf(&job_id, "%d", jobid);
proxy_svr_queue_msg(slurm_proxy, proxy_new_job_event(trans_id, queue_id, job_id, name, state, jobSubId));
free(queue_id);
free(job_id);
}
static void
sendNewNodeEvent(int trans_id, int machid, ptp_machine *mach)
{
ptp_node * n;
proxy_msg * m;
char * machine_id;
char * node_id;
asprintf(&machine_id, "%d", machid);
m = proxy_new_node_event(trans_id, machine_id, SizeOfList(mach->nodes));
for (SetList(mach->nodes); (n = (ptp_node *)GetListElement(mach->nodes)) != NULL; ) {
asprintf(&node_id, "%d", n->id);
proxy_add_node(m, node_id, n->name, nodestate_to_string(n->state), num_node_attrs(n));
add_node_attrs(m, n);
free(node_id);
}
proxy_svr_queue_msg(slurm_proxy, m);
free(machine_id);
}
static void
sendNodeChangeEvent(int trans_id, char * id_range, char * state)
{
proxy_msg * m;
m = proxy_node_change_event(trans_id, id_range, 1);
proxy_add_string_attribute(m, NODE_STATE_ATTR, state);
proxy_svr_queue_msg(slurm_proxy, m);
}
/*
* FIXME:
* Add extra attributes when task topology information is available.
* This can be done via slurm_job_step_layout_get() API when updating job state.
*/
static void
sendNewProcessEvent(int trans_id, int jobid, ptp_process *p, char *state)
{
proxy_msg * m;
char * job_id;
char * proc_id;
char * name;
if (p == NULL)
return;
asprintf(&job_id, "%d", jobid);
asprintf(&proc_id, "%d", p->id);
asprintf(&name, "%d", p->task_id);
m = proxy_new_process_event(trans_id, job_id, 1);
/*
 * For now, p->node_id, p->task_id and p->pid cannot be obtained,
 * so set extra_attrs = 0.
 */
proxy_add_process(m, proc_id, name, state, 0);
/*
proxy_add_process(m, proc_id, name, state, 3);
proxy_add_int_attribute(m, PROC_NODEID_ATTR, p->node_id);
proxy_add_int_attribute(m, PROC_INDEX_ATTR, p->task_id);
proxy_add_int_attribute(m, PROC_PID_ATTR, p->pid);
*/
proxy_svr_queue_msg(slurm_proxy, m);
free(job_id);
free(proc_id);
free(name);
}
static void
sendNewQueueEvent(int trans_id)
{
char * rm_id;
char * queue_id;
gQueueID = generate_id();
asprintf(&rm_id, "%d", gBaseID);
asprintf(&queue_id, "%d", gQueueID);
proxy_svr_queue_msg(slurm_proxy, proxy_new_queue_event(trans_id, rm_id, queue_id, DEFAULT_QUEUE_NAME, QUEUE_STATE_NORMAL));
free(rm_id);
free(queue_id);
}
static void
sendProcessStateChangeEvent(int trans_id, ptp_job *j, char *state)
{
proxy_msg * m;
if (j == NULL || j->num_procs == 0)
return;
m = proxy_process_change_event(trans_id, rangeset_to_string(j->set), 1);
proxy_add_string_attribute(m, PROC_STATE_ATTR, state);
proxy_svr_queue_msg(slurm_proxy, m);
}
static void
sendJobStateChangeEvent(int trans_id, int jobid, char *state)
{
char * job_id;
proxy_msg * m;
asprintf(&job_id, "%d", jobid);
m = proxy_job_change_event(trans_id, job_id, 1);
proxy_add_string_attribute(m, JOB_STATE_ATTR, state);
proxy_svr_queue_msg(slurm_proxy, m);
free(job_id);
}
/*
 * SLURM provides no process state, only job state,
 * so the process state simply mirrors the job state.
 */
static void
sendProcessChangeEvent(int trans_id, ptp_process *p, int node_id, int task_id, int pid)
{
return;
}
static void
sendProcessOutputEvent(int trans_id, int procid, char *output)
{
char * proc_id;
proxy_msg * m;
asprintf(&proc_id, "%d", procid);
m = proxy_process_change_event(trans_id, proc_id, 1);
proxy_add_string_attribute(m, PROC_STDOUT_ATTR, output);
proxy_svr_queue_msg(slurm_proxy, m);
free(proc_id);
}
/*
* Get the number of compute nodes managed by SLURM.
*/
static int
get_num_nodes(int machid)
{
uint32_t cnt = 0;
node_info_msg_t * ninfo;
if (slurm_load_node((time_t)NULL, &ninfo, SHOW_ALL))
return 0; /* query failed; report no nodes */
cnt = ninfo->record_count;
slurm_free_node_info_msg(ninfo);
return cnt;
}
/*
* Currently only ONE machine supported.
*/
static int
get_num_machines()
{
return 1;
}
/*
 * Get the hostname of the node running this proxy
 * (assumed to be the server node where slurmctld runs).
 */
static char *
get_machine_name(int num)
{
static char hostname[512];
gethostname(hostname, 512);
return hostname;
}
/*
* Create the node list for a machine.
*/
static int
create_node_list(ptp_machine *mach)
{
uint32_t cnt;
node_info_msg_t *nmsg;
ptp_node * node;
int i;
if (slurm_load_node((time_t)NULL,&nmsg, SHOW_ALL))
return -1;
cnt = nmsg->record_count;
for (i = 0; i < cnt; i++)
node = new_node(mach, nmsg->node_array + i);
return 0;
}
/*
* If we're under debug control, let the debugger handle process state update.
*
* Note: this will only be called if the debugger allows the program to
* reach MPI_Init(), which may never happen (e.g. if it's not an MPI program).
* Don't rely on this to do anything for arbitrary jobs.
*
* Note also: the debugger manages process state updates so we don't need
* to send events back to the runtime.
*/
//static void
//debug_app_job_state_callback(orte_jobid_t jobid, orte_proc_state_t state)
//{
/* this is what it was before, untouched */
// switch(state) {
// case ORTE_PROC_STATE_TERMINATED:
// case ORTE_PROC_STATE_ABORTED:
// break;
// }
//}
/*
* job_state_callback for the debugger. Detects debugger start and exit and notifies the
* UI. Cleans up job id map.
*/
/*
static void
debug_job_state_callback(orte_jobid_t jobid, orte_proc_state_t state)
{
ptp_job * j;
if ((j = find_job(jobid, JOBID_DEBUG)) == NULL)
return;
do_state_callback(j, state);
}
*/
/*
* Tell the daemon to exit.
* A no-op for SLURM.
*/
static int
do_slurm_shutdown(void)
{
debug_log(logfp, "do_slurm_shutdown() called.\n");
return 0;
}
/*
* gethostname_short - equivalent to gethostname, but return only the first
* component of the fully qualified name
* (e.g. "linux123.foo.bar" becomes "linux123")
*/
static int
gethostname_short (char *name, size_t len)
{
int error_code, name_len;
char *dot_ptr, path_name[1024];
error_code = gethostname (path_name, sizeof(path_name));
if (error_code)
return error_code;
dot_ptr = strchr (path_name, '.');
if (dot_ptr == NULL)
dot_ptr = path_name + strlen(path_name);
else
dot_ptr[0] = '\0';
name_len = dot_ptr - path_name;
if (name_len + 1 > len) /* reserve room for the terminating NUL */
return ENAMETOOLONG;
strcpy (name, path_name);
return 0;
}
/*
* Set default srun options.
*/
static int
opt_default(srun_opt_t * opt)
{
char buf[MAXPATHLEN + 1];
struct passwd *pw;
int i;
char hostname[64];
if (opt == NULL)
return -1;
if ((pw = getpwuid(getuid())) != NULL) {
strncpy(opt->ps_user, pw->pw_name, MAX_USERNAME);
opt->ps_user[MAX_USERNAME - 1] = '\0'; /* strncpy does not guarantee NUL-termination */
opt->ps_uid = pw->pw_uid;
} else {
debug_log(logfp, "opt_default:who are you?");
return -1;
}
opt->ps_gid = getgid();
if ((getcwd(buf, MAXPATHLEN)) == NULL) {
debug_log(logfp,"getcwd failed");
return -1;
}
opt->ps_cwd = strdup(buf);
opt->ps_cwd_set = false;
opt->ps_progname = NULL;
opt->ps_nprocs = 1;
opt->ps_nprocs_set = false;
opt->ps_cpus_per_task = 1;
opt->ps_cpus_set = false;
opt->ps_min_nodes = 1;
opt->ps_max_nodes = 0;
opt->ps_min_sockets_per_node = NO_VAL; /* requested min/maxsockets */
opt->ps_max_sockets_per_node = NO_VAL;
opt->ps_min_cores_per_socket = NO_VAL; /* requested min/maxcores */
opt->ps_max_cores_per_socket = NO_VAL;
opt->ps_min_threads_per_core = NO_VAL; /* requested min/maxthreads */
opt->ps_max_threads_per_core = NO_VAL;
opt->ps_ntasks_per_node = NO_VAL; /* ntask max limits */
opt->ps_ntasks_per_socket = NO_VAL;
opt->ps_ntasks_per_core = NO_VAL;
opt->ps_nodes_set = false;
opt->ps_cpu_bind_type = 0;
opt->ps_cpu_bind = NULL;
opt->ps_mem_bind_type = 0;
opt->ps_mem_bind = NULL;
opt->ps_time_limit = NO_VAL;
opt->ps_time_limit_str = NULL;
opt->ps_ckpt_interval = 0;
opt->ps_ckpt_interval_str = NULL;
opt->ps_ckpt_path = NULL;
opt->ps_partition = NULL;
/*
 * use the default value (32) instead:
 * opt->ps_max_threads = MAX_THREADS;
 * pmi_server_max_threads(opt->ps_max_threads);
 */
opt->ps_relative = NO_VAL;
opt->ps_relative_set = false;
opt->ps_job_name = NULL;
opt->ps_job_name_set = false;
opt->ps_jobid = NO_VAL;
opt->ps_jobid_set = false;
opt->ps_dependency = NULL;
opt->ps_account = NULL;
opt->ps_comment = NULL;
opt->ps_distribution = SLURM_DIST_UNKNOWN;
opt->ps_plane_size = NO_VAL;
opt->ps_ofname = NULL;
opt->ps_ifname = NULL;
opt->ps_efname = NULL;
opt->ps_core_type = CORE_DEFAULT;
opt->ps_labelio = false;
opt->ps_unbuffered = false;
opt->ps_overcommit = false;
opt->ps_shared = (uint16_t)NO_VAL;
opt->ps_exclusive = false;
opt->ps_no_kill = false;
opt->ps_kill_bad_exit = false;
opt->ps_immediate = false;
opt->ps_join = false;
slurm_ctl_conf_t * slurm_ctl_conf_ptr = NULL;
if (slurm_load_ctl_conf((time_t)NULL, &slurm_ctl_conf_ptr)) {
debug_log(logfp, "slurm_load_ctl_conf failed\n");
return -1;
}
opt->ps_max_wait = slurm_ctl_conf_ptr->wait_time;
opt->ps_quit_on_intr = false;
opt->ps_disable_status = false;
opt->ps_test_only = false;
opt->ps_quiet = 0;
opt->ps_job_min_cpus = NO_VAL;
opt->ps_job_min_sockets = NO_VAL;
opt->ps_job_min_cores = NO_VAL;
opt->ps_job_min_threads = NO_VAL;
opt->ps_job_min_memory = NO_VAL;
opt->ps_task_mem = NO_VAL;
opt->ps_job_min_tmp_disk= NO_VAL;
opt->ps_hold = false;
opt->ps_constraints = NULL;
opt->ps_contiguous = false;
opt->ps_nodelist = NULL;
opt->ps_exc_nodes = NULL;
opt->ps_max_launch_time = 120;/* 120 seconds to launch job */
opt->ps_max_exit_timeout= 60; /* Warn user 60 seconds after task exit */
/* Default launch msg timeout */
opt->ps_msg_timeout = slurm_ctl_conf_ptr->msg_timeout;
for (i=0; i<SYSTEM_DIMENSIONS; i++)
opt->ps_geometry[i] = (uint16_t) NO_VAL;
opt->ps_reboot = false;
opt->ps_no_rotate = false;
opt->ps_conn_type = (uint16_t) NO_VAL;
opt->ps_blrtsimage = NULL;
opt->ps_linuximage = NULL;
opt->ps_mloaderimage = NULL;
opt->ps_ramdiskimage = NULL;
opt->ps_euid = (uid_t) -1;
opt->ps_egid = (gid_t) -1;
opt->ps_propagate = NULL; /* propagate specific rlimits */
/* strdup: the ctl_conf structure is freed below */
opt->ps_prolog = slurm_ctl_conf_ptr->srun_prolog ? strdup(slurm_ctl_conf_ptr->srun_prolog) : NULL;
opt->ps_epilog = slurm_ctl_conf_ptr->srun_epilog ? strdup(slurm_ctl_conf_ptr->srun_epilog) : NULL;
opt->ps_task_prolog = NULL;
opt->ps_task_epilog = NULL;
gethostname_short(hostname, sizeof(hostname));
opt->ps_ctrl_comm_ifhn = strdup(hostname);
opt->ps_pty = false;
opt->ps_open_mode = 0;
opt->ps_acctg_freq = -1;
if (slurm_ctl_conf_ptr)
slurm_free_ctl_conf(slurm_ctl_conf_ptr);
return 0;
}
/*
* Initialize option defaults.
*/
static int
set_srun_options_defaults(srun_opt_t * opt)
{
return opt_default(opt);
}
/*
* Free space allocated for job "opt".
*/
static void
free_opt(srun_opt_t * opt)
{
if (opt == NULL)
return;
if (opt->ps_cwd_set == true && opt->ps_cwd != NULL)
free(opt->ps_cwd);
if (opt->ps_nodes_set == true && opt->ps_nodelist != NULL)
free(opt->ps_nodelist);
if (opt->ps_ctrl_comm_ifhn) /* allocated in opt_default() */
free(opt->ps_ctrl_comm_ifhn);
}
/*
* Free space allocated to save srun args in SLURM_SubmitJob().
*/
static void
free_srun_argv(int srun_argc,char ** srun_argv)
{
int i;
for (i = 0; i< srun_argc; i++) {
if (srun_argv[i] != NULL)
free(srun_argv[i]);
}
free(srun_argv);
}
/*
* Create job desc. msg from opts for job allocation.
* For now, the -w/-x options are not supported.
*/
static job_desc_msg_t *
create_job_desc_msg_from_opts(srun_opt_t *opt)
{
job_desc_msg_t * j = NULL;
assert(opt != NULL);
if ((j = (job_desc_msg_t *)malloc(sizeof(job_desc_msg_t))) == NULL) {
debug_log(logfp, "Allocate job_desg_msg fail");
return NULL;
}
slurm_init_job_desc_msg(j);
if (opt->ps_account)
j->account = strdup(opt->ps_account);
j->contiguous = opt->ps_contiguous;
j->features = opt->ps_constraints;
j->immediate = (uint16_t)opt->ps_immediate;
j->name = opt->ps_job_name;
if (opt->ps_nodelist)
j->req_nodes = strdup(opt->ps_nodelist);
/*
* FIXME: handle -w nodelist request
*/
/*
if(j->req_nodes) {
hl = hostlist_create(j->req_nodes);
hostlist_ranged_string(hl, sizeof(buf), buf);
xfree(opt.nodelist);
opt.nodelist = xstrdup(buf);
hostlist_uniq(hl);
hostlist_ranged_string(hl, sizeof(buf), buf);
hostlist_destroy(hl);
xfree(j->req_nodes);
j->req_nodes = xstrdup(buf);
}
*/
if (opt->ps_distribution == SLURM_DIST_ARBITRARY
&& !j->req_nodes) {
debug_log(logfp, "With Arbitrary distribution you need to "
"specify a nodelist or hostfile with the -w option");
return NULL;
}
j->exc_nodes = opt->ps_exc_nodes;
j->partition = opt->ps_partition;
j->min_nodes = opt->ps_min_nodes;
if (opt->ps_min_sockets_per_node != NO_VAL)
j->min_sockets = (uint16_t)opt->ps_min_sockets_per_node;
if (opt->ps_min_cores_per_socket != NO_VAL)
j->min_cores = (uint16_t)opt->ps_min_cores_per_socket;
if (opt->ps_min_threads_per_core != NO_VAL)
j->min_threads = (uint16_t)opt->ps_min_threads_per_core;
j->user_id = opt->ps_uid;
j->dependency = opt->ps_dependency;
if (opt->ps_nice)
j->nice = (uint16_t)(NICE_OFFSET + opt->ps_nice);
j->task_dist = (uint16_t)opt->ps_distribution;
if (opt->ps_plane_size != NO_VAL)
j->plane_size = (uint16_t)opt->ps_plane_size;
j->group_id = opt->ps_gid;
j->mail_type = opt->ps_mail_type;
if (opt->ps_ntasks_per_node != NO_VAL)
j->ntasks_per_node = (uint16_t)opt->ps_ntasks_per_node;
if (opt->ps_ntasks_per_socket != NO_VAL)
j->ntasks_per_socket = (uint16_t)opt->ps_ntasks_per_socket;
if (opt->ps_ntasks_per_core != NO_VAL)
j->ntasks_per_core =(uint16_t)opt->ps_ntasks_per_core;
if (opt->ps_mail_user)
j->mail_user = strdup(opt->ps_mail_user);
if (opt->ps_begin)
j->begin_time = opt->ps_begin;
if (opt->ps_licenses)
j->licenses = strdup(opt->ps_licenses);
if (opt->ps_network)
j->network = strdup(opt->ps_network);
if (opt->ps_comment)
j->comment = strdup(opt->ps_comment);
if (opt->ps_hold)
j->priority = 0;
if (opt->ps_jobid != NO_VAL)
j->job_id = opt->ps_jobid;
#if SYSTEM_DIMENSIONS
if (opt->ps_geometry[0] > 0) {
int i;
for (i=0; i<SYSTEM_DIMENSIONS; i++)
j->geometry[i] = opt->ps_geometry[i];
}
#endif
if (opt->ps_conn_type != (uint16_t) NO_VAL)
j->conn_type = opt->ps_conn_type;
if (opt->ps_reboot)
j->reboot = 1;
if (opt->ps_no_rotate)
j->rotate = 0;
if (opt->ps_blrtsimage)
j->blrtsimage = strdup(opt->ps_blrtsimage);
if (opt->ps_linuximage)
j->linuximage = strdup(opt->ps_linuximage);
if (opt->ps_mloaderimage)
j->mloaderimage = strdup(opt->ps_mloaderimage);
if (opt->ps_ramdiskimage)
j->ramdiskimage = strdup(opt->ps_ramdiskimage);
if (opt->ps_max_nodes)
j->max_nodes = opt->ps_max_nodes;
if (opt->ps_max_sockets_per_node)
j->max_sockets = opt->ps_max_sockets_per_node;
if (opt->ps_max_cores_per_socket)
j->max_cores = opt->ps_max_cores_per_socket;
if (opt->ps_max_threads_per_core)
j->max_threads = opt->ps_max_threads_per_core;
if (opt->ps_job_min_cpus != NO_VAL)
j->job_min_procs = opt->ps_job_min_cpus;
if (opt->ps_job_min_sockets != NO_VAL)
j->job_min_sockets = opt->ps_job_min_sockets;
if (opt->ps_job_min_cores != NO_VAL)
j->job_min_cores = opt->ps_job_min_cores;
if (opt->ps_job_min_threads != NO_VAL)
j->job_min_threads = opt->ps_job_min_threads;
if (opt->ps_job_min_memory != NO_VAL)
j->job_min_memory = opt->ps_job_min_memory;
if (opt->ps_job_min_tmp_disk != NO_VAL)
j->job_min_tmp_disk = opt->ps_job_min_tmp_disk;
if (opt->ps_overcommit) {
j->num_procs = opt->ps_min_nodes;
j->overcommit = opt->ps_overcommit;
} else
j->num_procs = opt->ps_nprocs * opt->ps_cpus_per_task;
if (opt->ps_nprocs_set)
j->num_tasks = opt->ps_nprocs;
if (opt->ps_cpus_set)
j->cpus_per_task = opt->ps_cpus_per_task;
if (opt->ps_no_kill)
j->kill_on_node_fail = 0;
if (opt->ps_time_limit != NO_VAL)
j->time_limit = opt->ps_time_limit;
j->shared = opt->ps_shared;
/*
 * srun uses the same listening port for the allocation response
 * message as for all other messages.
 * The slurmctld_comm_addr structure is initialized by slurmctld_msg_init().
 */
j->alloc_resp_port = slurmctld_comm_addr.port;
j->other_port = slurmctld_comm_addr.port;
return j;
}
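/*
 * Ownership note: only the strdup()'ed fields above (account, req_nodes,
 * mail_user, licenses, network, comment and the image names) are released
 * by destroy_job_desc_msg(); the remaining pointers alias srun_opt_t fields
 * and must not be freed with the message.
 */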
/*
* Callback handlers for job allocation.
*/
static void
timeout_handler(srun_timeout_msg_t *msg)
{
static time_t last_timeout = 0;
if (msg->timeout != last_timeout) {
last_timeout = msg->timeout;
debug_log(logfp,"callback--->timeout_handler:");
debug_log(logfp,"job time limit to be reached at %s",
ctime(&msg->timeout));
}
}
static void
user_msg_handler(srun_user_msg_t *msg)
{
debug_log(logfp,"callback--->usr_msg_handler: %s", msg->msg);
}
static void
ping_handler(srun_ping_msg_t *msg)
{
debug_log(logfp,"callback--->pingt_handler:");
}
static void
node_fail_handler(srun_node_fail_msg_t *msg)
{
debug_log(logfp,"callback--->node_fail_handler: Node failure on %s", msg->nodelist);
}
static void
job_complete_handler(srun_job_complete_msg_t * msg)
{
debug_log(logfp,"callback--->job_complete_handler: Force Terminate job\n");
}
static bool
wait_retry()
{
static int count = 0; /* persists across calls: one retry budget per process */
if (count < MAX_RETRIES) {
sleep(++count);
return true;
} else
return false;
}
/*
* Install a handler for signal signo.
*/
static SigFunc *
xsignal(int signo, SigFunc *f)
{
struct sigaction sa, old_sa;
sa.sa_handler = f;
sigemptyset(&sa.sa_mask);
sigaddset(&sa.sa_mask, signo);
sa.sa_flags = 0;
if (sigaction(signo, &sa, &old_sa) < 0)
debug_log(logfp,"xsignal(%d) failed: %m", signo);
return old_sa.sa_handler;
}
/*
* Signal handler during allocating jobs.
*/
static void
signal_while_allocating(int signo)
{
destroy_job = 1;
if (sync_msg_addr && (sync_msg_addr->slurm_jobid > 0))
slurm_complete_job(sync_msg_addr->slurm_jobid, 0);
}
/*
* Pending-jobid callback used during blocking job allocation.
*/
static void
set_pending_jobid(uint32_t id)
{
/*
 * This callback sets the jobid even in the case of a blocking allocation,
 * so the proxy agent can obtain the SLURM jobid without blocking.
 */
if (sync_msg_addr != NULL) {
sync_msg_addr->slurm_jobid = (int) id;
sync_msg_addr->jobid_set = true;
}
}
static void
ignore_signal(int signo)
{
/*do nothing*/
}
/*
* Release allocated memory in job_desc_msg.
*/
static void
destroy_job_desc_msg(job_desc_msg_t * j)
{
if (j != NULL) {
if (j->account != NULL)
free(j->account);
if (j->req_nodes != NULL)
free(j->req_nodes);
if (j->mail_user != NULL)
free(j->mail_user);
if (j->licenses != NULL)
free(j->licenses);
if (j->network != NULL)
free(j->network);
if (j->comment != NULL)
free(j->comment);
if (j->blrtsimage != NULL)
free(j->blrtsimage);
if (j->linuximage != NULL)
free(j->linuximage);
if (j->mloaderimage != NULL)
free(j->mloaderimage);
if (j->ramdiskimage != NULL)
free(j->ramdiskimage);
free(j);
}
}
/*
* Create a socket to communicate with slurmctld
* during job allocation. This fd will be closed
* on executing srun after job allocation.
*/
slurm_fd
slurm_init_msg_engine_port(uint16_t port)
{
slurm_addr addr;
int fd;
int rc;
const int one = 1;
const size_t sz1 = sizeof(one);
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
debug_log(logfp,"create slurmctld socket error\n");
return fd;
}
rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,&one,sz1);
if (rc < 0) {
goto error;
}
rc = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
if (rc < 0) {
goto error;
}
rc = listen(fd, 128);
if(rc < 0) {
goto error;
}
return fd;
error:
if ((close(fd) < 0) && (errno == EINTR))
close(fd); /*try again*/
return rc;
}
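/*
 * slurmctld_msg_init() calls slurm_init_msg_engine_port(0), so the kernel
 * picks an ephemeral port; the actual port number is then recovered with
 * getsockname() and published in slurmctld_comm_addr.
 */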
/*
* Init socket fd to handle messages from slurmctld.
*/
slurm_fd
slurmctld_msg_init(srun_opt_t * opt)
{
slurm_addr slurm_address;
uint16_t port;
static slurm_fd slurmctld_fd = 0;
socklen_t name_len;
int fval;
int rc;
if (slurmctld_fd)
return slurmctld_fd;
slurmctld_fd = -1;
slurmctld_comm_addr.hostname = NULL;
slurmctld_comm_addr.port = 0;
/* open socket */
if ((slurmctld_fd = slurm_init_msg_engine_port(0)) < 0) {
debug_log(logfp, "slurm_init_msg_engine_port error\n");
return -1;
}
/* get socket port number */
name_len = sizeof(slurm_address);
if ((rc = getsockname(slurmctld_fd, (struct sockaddr *)&slurm_address, &name_len)) < 0) {
debug_log(logfp, "getsockname error\n");
return -1;
}
/* set non-blocking */
fval = fcntl(slurmctld_fd, F_GETFL, 0);
fcntl(slurmctld_fd, F_SETFL, fval|O_NONBLOCK);
/* set FD_CLOEXEC: close slurmctld_fd when exec(srun) after allocation */
fval = fcntl(slurmctld_fd, F_GETFD, 0);
fcntl(slurmctld_fd, F_SETFD, fval|FD_CLOEXEC);
/* set global var.: slurmctld_comm_addr */
slurmctld_comm_addr.hostname = strdup(opt->ps_ctrl_comm_ifhn);
port = ntohs(slurm_address.sin_port);
slurmctld_comm_addr.port = port;
return slurmctld_fd;
}
/*
* Allocate job resource in response to SubmitJob cmd.
* Called in the child process.
*/
static resource_allocation_response_msg_t *
allocate_nodes(srun_opt_t *opt)
{
resource_allocation_response_msg_t *resp = NULL;
slurm_allocation_callbacks_t callbacks;
slurmctld_msg_init(opt);
job_desc_msg_t *j = create_job_desc_msg_from_opts(opt);
if (!j)
return NULL;
/*
 * Do not re-use an existing job id when submitting a new job
 * from within a running job; let slurmctld assign the jobid.
 */
if ((j->job_id != NO_VAL) && !opt->ps_jobid_set)
j->job_id = NO_VAL;
callbacks.ping = ping_handler;
callbacks.timeout = timeout_handler;
callbacks.job_complete = job_complete_handler;
callbacks.user_msg = user_msg_handler;
callbacks.node_fail = node_fail_handler;
/* create message thread to handle pings and such from slurmctld */
msg_thr = slurm_allocation_msg_thr_create(&j->other_port, &callbacks);
xsignal(SIGHUP, signal_while_allocating);
xsignal(SIGINT, signal_while_allocating);
xsignal(SIGQUIT, signal_while_allocating);
xsignal(SIGPIPE, signal_while_allocating);
xsignal(SIGTERM, signal_while_allocating);
xsignal(SIGUSR1, signal_while_allocating);
xsignal(SIGUSR2, signal_while_allocating);
// while (!resp)
{
/*
 * BLOCK until the allocation is granted or interrupted by a signal.
 * If the allocation is blocked/pending, the jobid is returned
 * to the parent via the 'set_pending_jobid' callback.
 */
resp = slurm_allocate_resources_blocking(j, 0, set_pending_jobid);
if (resp == NULL)
debug_log(logfp, "blocking job allocate error!\n");
// if (destroy_job) /* interrupt by signal */
// break;
// else if(!resp && !wait_retry()) /* time out */
// break;
}
xsignal(SIGHUP, ignore_signal);
xsignal(SIGINT, ignore_signal);
xsignal(SIGQUIT, ignore_signal);
xsignal(SIGPIPE, ignore_signal);
xsignal(SIGTERM, ignore_signal);
xsignal(SIGUSR1, ignore_signal);
xsignal(SIGUSR2, ignore_signal);
destroy_job_desc_msg(j);
return resp;
}
static int
handle_attach_debug()
{
return 0;
}
static int
handle_debug()
{
return 0;
}
/*
* Create a thread to retrieve srun's stdout/stderr,
* and send to ptp ui.
*/
static int
create_iothread(ptp_job * job)
{
pthread_attr_t attr;
pthread_t iothread_id;
int rc;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (pthread_create(&iothread_id, &attr, srun_output_forwarding, (void *)job) == 0 ) {
job->iothread_id = iothread_id;
debug_log(logfp, "iothread created for job[%d].\n", job->slurm_jobid);
rc = 0;
} else {
debug_log(logfp,"error on creating iothread for job[%d].\n", job->slurm_jobid);
rc = -1;
}
return rc;
}
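/*
 * Parent/child handshake over the shared sync_msg used below:
 *   child : allocates the job (or reuses an existing allocation), then
 *           sets slurm_jobid and jobid_set
 *   parent: waits for jobid_set, creates the ptp_job and the iothread,
 *           then sets io_ready
 *   child : waits for io_ready, redirects stdout/stderr into the pipes
 *           and exec()s srun
 * Each wait gives up after MAX_RETRIES rounds of wait_retry().
 */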
/*
* Allocate job resource if necessary, launch job
* and forward job stdout/stderr to ptp ui.
*/
static int
allocate_and_launch_job(int trans_id, char * jobsubid, ptp_job * in, srun_opt_t * opt, int srun_argc, char * srun_argv[])
{
int i;
int shmid;
int fd_out[2];
int fd_err[2];
bool need_alloc;
pid_t pid;
int rc;
char * ptr;
sync_msg * pp, *pc;
char * msg;
ptp_job * job;
char * name;
ptp_process * p;
int node_id = -1;
int task_id = -1;
int proc_pid = -1;
ptp_machine * m;
ptp_node * n;
int kill_jobid = -1;
int allocated_jobid = -1;
resource_allocation_response_msg_t * resp = NULL;
/* create a shared memory region to communicate/synchronize with the child process */
if ((shmid = shmget(IPC_PRIVATE, sizeof(sync_msg), SHM_R|SHM_W)) < 0) {
debug_log(logfp,"error on creating shared memory\n");
return -1;
}
/* init shm */
pp = (sync_msg *)shmat(shmid, 0, 0);
pp->slurm_jobid = JOBID_INIT;
pp->jobid_set = false;
pp->io_ready = false;
pipe(fd_out);
pipe(fd_err);
if (opt->ps_jobid == NO_VAL && opt->ps_jobid_set == false)
need_alloc = true;
else {
need_alloc = false;
allocated_jobid = opt->ps_jobid;
}
switch (pid = fork()) {
case 0: /* child: allocate job if necessary and launch job via srun */
/* attach shm */
pc = (sync_msg *)shmat(shmid, 0, 0);
/*
 * Save the shm addr in a global variable.
 * If the job allocation blocks, the SLURM callback stores the pending
 * jobid at this addr, so the parent can get the slurm jobid without blocking.
 */
sync_msg_addr = pc;
if (need_alloc) { /* require new job allocation */
resp = allocate_nodes(opt);
if (resp == NULL || resp->node_list == NULL) { /* allocation fail, rarely happen */
pc->slurm_jobid = JOBID_FAIL;
pc->jobid_set = true;
shmdt(pc);
exit(EXIT_JOB_ALLOC_FAIL);
}
/* allocation granted */
if (pc->jobid_set == false) {
pc->slurm_jobid = resp->job_id;
pc->jobid_set = true;
}
kill_jobid = resp->job_id;
asprintf(&ptr,"%d",(int)resp->job_id);
/* srun will not allocate new resources if SLURM_JOBID is set. */
setenv("SLURM_JOBID", ptr, 1);
free(ptr);
if (resp)
slurm_free_resource_allocation_response_msg(resp);
}
else {
/*
* run in allocated job (required by ATTACH debug)
*/
pc->slurm_jobid = allocated_jobid;
pc->jobid_set = true;
asprintf(&ptr,"%d",allocated_jobid);
setenv("SLURM_JOBID", ptr, 1);
free(ptr);
}
/*
 * BLOCK until the parent sets io_ready to true,
 * which means the io thread is ready.
 * The parent sets this flag after getting the jobid and creating the iothread.
 */
while (!pc->io_ready && wait_retry()) {
continue;
}
if (!pc->io_ready) {
if (need_alloc){
//slurm_complete_job(kill_jobid, 0);
slurm_kill_job(kill_jobid, SIGKILL, 0);
}
shmdt(pc);
debug_log(logfp, "child exits due to iothread fail\n");
exit(EXIT_JOB_IOTHREAD_FAIL);
} else { /* io thread ready */
shmdt(pc);
/* redirect srun's stdout and stderr */
close(fd_out[0]);
close(fd_err[0]);
/* job stdout+stderr ==> srun's stdout */
dup2(fd_out[1],1);
/* srun outputs ==> srun's stderr */
dup2(fd_err[1],2);
/* spawn job with srun cmd */
rc = execvp(srun_argv[0], srun_argv);
if (rc < 0) {/* rarely happens */
if (need_alloc) {
//slurm_complete_job(kill_jobid, 0);
slurm_kill_job(kill_jobid, SIGKILL, 0);
}
debug_log(logfp,"srun exec fail\n");
exit(EXIT_EXEC_FAIL);
}
}
break;
case -1:/* error */
debug_log(logfp,"child fork error\n");
close(fd_out[0]);
close(fd_out[1]);
close(fd_err[0]);
close(fd_err[1]);
if (pp)
shmdt(pp);
shmctl(shmid,IPC_RMID, 0);
return -1;
default:/* parent */
/*
* BLOCK until child set the slurm jobid
*/
while (!pp->jobid_set && wait_retry()) {
continue;
}
if (!pp->jobid_set || pp->slurm_jobid == JOBID_FAIL) {
msg = "Job allocation fail!\nPlease check RMS and job config parameters!";
debug_log(logfp, msg);
sendJobSubErrorEvent(trans_id, jobsubid, msg);
close(fd_out[0]);
close(fd_out[1]);
close(fd_err[0]);
close(fd_err[1]);
shmdt(pp);
kill(pid, SIGKILL);
shmctl(shmid, IPC_RMID, 0);
return -1;
}
/* create job structure after getting slurm jobid */
in->slurm_jobid = pp->slurm_jobid;
job = new_job(in->num_procs, in->debug, in->ptp_jobid, in->slurm_jobid, in->debug_jobid, need_alloc);
job->srun_pid = pid;
/* send OK event for SubmitJob cmd */
sendOKEvent(trans_id);
/* send NewJob event */
asprintf(&name, SLURM_JOB_NAME_FMT, job->slurm_jobid);
sendNewJobEvent(gTransID, job->ptp_jobid, name, jobsubid, JOB_STATE_INIT);
free(name);
/*
* As required by ptp ui,
* one NewProcess Event MUST be sent for each process of this new job.
*/
task_id = 0;
node_id = 0;
proc_pid = 0;
SetList(gMachineList);
/* By now, only 1 machine is supported */
m = (ptp_machine *)GetFirstElement(gMachineList);
SetList(m->nodes);
n = (ptp_node *)GetListElement(m->nodes);
node_id = n->id; /* node_id calculated by generateid() */
for (i = 0; i < job->num_procs; i++) {
/*
 * SLURM provides no API to get the pid, node_id and task_id,
 * and this information can be obtained ONLY after the job launches.
 * So FAKE them for now; sendNewProcessEvent() must not rely on these fields.
 * FIXME:
 * call slurm_job_step_layout_get() to obtain this information
 * and send it to the ui via sendProcessChangeEvent
 */
p = new_process(job, node_id, task_id, proc_pid);
sendNewProcessEvent(gTransID, job->ptp_jobid, p, PROC_STATE_STARTING);
task_id += 1;
proc_pid += 1;
}
close(fd_out[1]);
close(fd_err[1]);
job->fd_out = fd_out[0];
job->fd_err = fd_err[0];
/*
* Create io thread to manage srun's stderr and stdout
* Don't start it before sending the NewProcess events!
*/
if (create_iothread(job) == 0) {
pp->io_ready = true;
} else {
pp->io_ready = false;
job->iothread_exit = true;
}
shmdt(pp);
shmctl(shmid, IPC_RMID, 0);
return 0;
}
return 0;
}
/*
 * Search $CWD and the $PATH directories and return the absolute path of cmd.
 */
static char *
get_path(char * cmd, char * cwd, int mode)
{
char * path = NULL;
char * fullpath = NULL;
char * c;
char * ptr;
if ( (cmd[0] == '.' || cmd[0] == '/')
&& access(cmd, mode) == 0) {
if (cmd[0] == '.')
asprintf(&fullpath, "%s/%s", cwd, cmd);
else
asprintf(&fullpath, "%s", cmd); /* never pass cmd as a format string */
} else {
/* search $PATH; work on a copy so the environment is not modified */
path = getenv("PATH");
if (path != NULL && (path = strdup(path)) != NULL) {
c = ptr = path;
while ((c = strchr(ptr, ':')) != NULL) {
*c = '\0';
asprintf(&fullpath, "%s/%s", ptr, cmd);
if (access(fullpath, mode) == 0)
break;
free(fullpath);
fullpath = NULL;
ptr = c + 1;
}
/* handle the last element, unless a match was already found */
if (fullpath == NULL && *ptr != '\0') {
asprintf(&fullpath, "%s/%s", ptr, cmd);
if (access(fullpath, mode) != 0) {
free(fullpath);
fullpath = NULL;
}
}
free(path);
}
}
return fullpath;
}
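/*
 * Example (hypothetical values):
 *   get_path("./a.out", "/home/user", X_OK) -> "/home/user/./a.out"
 *     (if "./a.out" is executable)
 *   get_path("srun", "/home/user", X_OK)    -> e.g. "/usr/bin/srun",
 *     the first $PATH entry holding an executable "srun"
 * The caller owns the returned string and must free() it.
 */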
ssize_t fd_read_line(int fd, void *buf, size_t maxlen)
{
ssize_t n, rc;
unsigned char c, *p;
n = 0;
p = buf;
while (n < maxlen - 1) { /* reserve space for NUL-termination */
if ((rc = read(fd, &c, 1)) == 1) {
n++;
*p++ = c;
if (c == '\n')
break; /* store newline, like fgets() */
}else if (rc == 0) {
if (n == 0) /* EOF, no data read */
return(0);
else /* EOF, some data read */
break;
}else {
if (errno == EINTR)
continue;
return(-1);
}
}
*p = '\0'; /* NUL-terminate, like fgets() */
return(n);
}
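/*
 * Note: fd_read_line() is an unbuffered alternative to the fgets() path in
 * srun_output_forwarding(); its call site there is currently commented out.
 */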
/*
 * Forward srun's stdout to the ptp ui.
 * FIXME:
 * What about srun's stderr? For now, it is ignored.
 */
void *
srun_output_forwarding(void * arg)
{
int fd = -1;
char * ptr;
ptp_job * job;
int task_id;
fd_set rfds;
struct timeval tv;
char buf[MAX_BUF_SIZE];
FILE * fp = NULL;
int cancel_state;
int cancel_type;
int ret;
char * p;
pid_t cpid;
int status;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,&cancel_state);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &cancel_type);
job = (ptp_job *)arg;
job->iothread_exit = false;
/* By now only focus on job stdout */
fd = job->fd_out;
while (1)
{
if (job->iothread_exit_req) {
debug_log(logfp, "job[%d] iothread exit on exit_request.\n", job->slurm_jobid);
job->iothread_exit = true;
if (fp)
fclose(fp);
pthread_exit(NULL);
}
tv.tv_sec = 0;
tv.tv_usec = 5000;
/*
* if timeout, the rfds will be cleared
* rfds must be set in each iteration
*/
FD_ZERO(&rfds);
FD_SET(fd, &rfds);
ret = select(fd+1,&rfds,NULL,NULL,&tv);
switch(ret)
{
case -1: /* error */
debug_log(logfp,"job[%d] iothread exit on select error\n",job->slurm_jobid);
if (fp)
fclose(fp);
job->iothread_exit = true;
pthread_exit(NULL);
break;
case 0: /* timeout */
//debug_log(logfp,"select timeout\n");
break;
default: /* fd ready */
if (fp == NULL)
fp = fdopen(fd,"r");
if (fgets(buf, sizeof(buf), fp) != NULL) {
//if ((rc = fd_read_line(fd, buf, sizeof(buf)))> 0 ) {
p = buf;
/* get task id from the srun --label prefix, e.g. "3: output line" */
task_id = atoi(p);
ptr = strchr(p, ':');
if (ptr != NULL) {
ptr++; /* check for NULL before advancing past the ':' */
//debug_log(logfp,"send task[%d] output to ptp ui\n", task_id);
/*
* no synchronization needed
* since the event list is internally protected by pthread_mutex
*/
ptp_process * proc;
proc = find_process(job, task_id);
if (proc != NULL)
sendProcessOutputEvent(gTransID, proc->id, ptr);
} else
debug_log(logfp,"process label not found\n");
} else { /* error or EOF of pipe(write end closed) */
debug_log(logfp,"EOF of pipe\n");
if (fp)
fclose(fp);
job->iothread_exit = true;
cpid = waitpid(job->srun_pid, &status, 0);
if (cpid == job->srun_pid) {
if (WIFEXITED(status)) {
if (job->need_alloc)
slurm_complete_job(job->slurm_jobid, 0);
debug_log(logfp, "srun exit code:%d\n", WEXITSTATUS(status));
}
else if (WIFSIGNALED(status)) {
if (job->need_alloc)
slurm_kill_job(job->slurm_jobid, SIGKILL, 0);
debug_log(logfp, "srun terminated by signal[%d]\n", WTERMSIG(status));
}
pthread_exit(NULL);
}
}
break;
}
}
return NULL;
}
/*
 * Delete removable jobs from gJobList.
 */
static void
purge_global_joblist()
{
ptp_job * j;
for (SetList(gJobList); (j = (ptp_job *)GetListElement(gJobList)) != NULL; ) {
if (j->iothread_exit && !slurm_job_active(j))
j->removable = true;
if (j->removable) {
RemoveFromList(gJobList, j);
free_job(j);
}
}
return;
}
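/*
 * A job becomes removable only after its iothread has exited and SLURM
 * reports a terminal state; freeing it earlier could leave the iothread
 * reading from a freed ptp_job.
 */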
/******************************
* START OF DISPATCH ROUTINES *
******************************/
int
SLURM_Initialize(int trans_id, int nargs, char **args)
{
int i;
int primary = 1;
int secondary = 2;
long version;
debug_log(logfp, "SLURM_Initialize (%d):\n", trans_id);
if (proxy_state != STATE_INIT) {
sendErrorEvent(trans_id, RTEV_ERROR_SLURM_INIT, "Already initialized");
return PROXY_RES_OK;
}
for (i = 0; i < nargs; i++) {
if (proxy_test_attribute(PROTOCOL_VERSION_ATTR, args[i])) {
if (strcmp(proxy_get_attribute_value_str(args[i]), WIRE_PROTOCOL_VERSION) != 0) {
sendErrorEvent(trans_id, RTEV_ERROR_SLURM_INIT, "Wire protocol version \"%s\" not supported", args[0]);
return PROXY_RES_OK;
}
} else if (proxy_test_attribute(BASE_ID_ATTR, args[i]))
gBaseID = proxy_get_attribute_value_int(args[i]);
}
if (gBaseID < 0) {
sendErrorEvent(trans_id, RTEV_ERROR_SLURM_INIT, "No base ID supplied");
return PROXY_RES_OK;
}
/* confirm slurmctld works well via slurm_ping */
if (slurm_ping(primary) && slurm_ping(secondary)) {
sendErrorEvent(trans_id, RTEV_ERROR_SLURM_INIT, "No response from slurmctld. Check SLURM RMS!");
return PROXY_RES_OK;
}
/*
 * SLURM version verification.
 * Should work on any version supporting the APIs used.
 */
version = slurm_api_version();
if (version < SLURM_VERSION_NUM(1,3,4)) {
sendErrorEvent(trans_id, RTEV_ERROR_SLURM_INIT, "SLURM version number too low");
return PROXY_RES_OK;
}
proxy_state = STATE_RUNNING;
sendOKEvent(trans_id);
return PROXY_RES_OK;
}
/*
* Init the model definition phase.
*/
int
SLURM_ModelDef(int trans_id, int nargs, char **args)
{
debug_log(logfp, "SLURM_ModelDef (%d):\n", trans_id);
sendOKEvent(trans_id);
return PROXY_RES_OK;
}
int
SLURM_StopEvents(int trans_id, int nargs, char **args)
{
debug_log(logfp, "SLURM_StopEvents (%d):\n", trans_id);
/* Notify that StartEvents is complete */
sendOKEvent(gTransID);
gTransID = 0;
sendOKEvent(trans_id);
return PROXY_RES_OK;
}
/*
* Submit a job with the given executable path and arguments.
* (1)Process cmd arguments;
* (2)Distinguish between debug job and non-debug job;
* (3)Allocate resource and spawn job step.
*/
int
SLURM_SubmitJob(int trans_id, int nargs, char **args)
{
int i,k;
int num_args = 0;
int num_env = 0;
char * jobsubid = NULL; /* jobid assigned by RMS(ui) */
/* srun options: -n, -N, -t, -p, -l, -v, --jobid, -w, -x, --job_type */
int num_procs = 0; /* -n: number of tasks to run*/
int num_nodes = 0; /* -N: number of nodes on which to run (N=min[-max]) */
int tlimit = -1; /* -t: time limit */
bool tlimit_set = false;
char * partition = NULL; /* -p: partition requested */
bool partition_set = false;
bool io_label = false; /* -l: prepend task number to lines of stdout/err */
bool io_label_set = false;
bool verbose = false; /* -v:verbose mode */
bool verbose_set = false;
char * node_list=NULL; /* -w: request a specific list of hosts */
bool nodelist_set = false;
int allocated_jobid; /* --jobid: run under already allocated job */
bool jobid_set = false;
char * jobtype = NULL; /* --job_type:mpi,omp,serial */
bool jobtype_set = false;
char * full_path = NULL;
char * cwd = NULL;
char * exec_path = NULL; /* PATH of executable */
char * pgm_name = NULL;
/* debug job support */
bool debug = false;
int debug_argc = 0;
int attach_mode = 0; /* attach debugging not yet supported; initialize to avoid reading garbage */
char * debug_exec_name = NULL;
char * debug_exec_path = NULL;
char * debug_full_path;
char ** debug_args = NULL;
int srun_argc = 0;
char ** srun_argv = NULL;
int ret;
int ptpid = generate_id();
debug_log(logfp, "SLURM_SubmitJob (%d):\n", trans_id);
/* Process job submit args */
debug_log(logfp, "job submit commands:\n");
for (i = 0; i < nargs; i++) {
debug_log(logfp, "\t%s\n", args[i]);
if (proxy_test_attribute(SLURM_JOB_SUB_ID_ATTR, args[i])) {
jobsubid = proxy_get_attribute_value_str(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_NUM_PROCS_ATTR, args[i])) {
num_procs = proxy_get_attribute_value_int(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_NUM_NODES_ATTR, args[i])) {
num_nodes = proxy_get_attribute_value_int(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_TIME_LIMIT_ATTR, args[i])) {
tlimit_set = true;
tlimit = proxy_get_attribute_value_int(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_PARTITION_ATTR, args[i])) {
partition_set = true;
partition = proxy_get_attribute_value_str(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_IOLABEL_ATTR, args[i])) {
io_label_set = true;
io_label = proxy_get_attribute_value_bool(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_VERBOSE_ATTR, args[i])) {
verbose_set = true;
verbose = proxy_get_attribute_value_bool(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_ID_ATTR, args[i])) {
jobid_set = true;
allocated_jobid = proxy_get_attribute_value_int(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_TYPE_ATTR, args[i])) {
jobtype_set = true;
jobtype = proxy_get_attribute_value_str(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_NODELIST_ATTR, args[i])) {
nodelist_set = true;
node_list = proxy_get_attribute_value_str(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_EXEC_NAME_ATTR, args[i])) {
pgm_name = proxy_get_attribute_value_str(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_EXEC_PATH_ATTR, args[i])) {
exec_path = proxy_get_attribute_value_str(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_WORKING_DIR_ATTR, args[i])) {
cwd = proxy_get_attribute_value_str(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_PROG_ARGS_ATTR, args[i])) {
num_args++;
} else if (proxy_test_attribute(SLURM_JOB_ENV_ATTR, args[i])) {
num_env++;
} else if (proxy_test_attribute(SLURM_JOB_DEBUG_ARGS_ATTR, args[i])) {
debug_argc++;
} else if (proxy_test_attribute(SLURM_JOB_DEBUG_FLAG_ATTR, args[i])) {
debug = proxy_get_attribute_value_bool(args[i]);
}
}
/* Do some checking first */
if (jobsubid == NULL) {
sendErrorEvent(trans_id, RTEV_ERROR_SLURM_SUBMIT, "missing ID on job submission");
return PROXY_RES_OK;
}
if (proxy_state != STATE_RUNNING) {
sendJobSubErrorEvent(trans_id, jobsubid, "must call INIT first");
return PROXY_RES_OK;
}
if (nargs < 1) {
sendJobSubErrorEvent(trans_id, jobsubid, "incorrect arg count");
return PROXY_RES_OK;
}
if (pgm_name == NULL) {
sendJobSubErrorEvent(trans_id, jobsubid, "must specify a program name");
return PROXY_RES_OK;
}
if (num_procs <= 0) {
sendJobSubErrorEvent(trans_id, jobsubid, "must specify number of task to launch");
return PROXY_RES_OK;
}
/*
 * Process environment variables.
 * They are propagated to the compute nodes by SLURM
 * before the job is launched.
 */
if (num_env > 0) {
for (i = 0; i < nargs; i++) {
if (proxy_test_attribute(SLURM_JOB_ENV_ATTR, args[i]))
putenv(proxy_get_attribute_value_str(args[i]));
}
}
/* locate executable */
if (exec_path == NULL) {
full_path = get_path(pgm_name, cwd, X_OK);
if (full_path == NULL) {
sendJobSubErrorEvent(trans_id, jobsubid, "executable not found");
return PROXY_RES_OK;
}
} else
asprintf(&full_path, "%s/%s", exec_path, pgm_name);
/* check access right */
if (access(full_path, X_OK) < 0) {
sendJobSubErrorEvent(trans_id, jobsubid, strerror(errno));
if (full_path != NULL)
free(full_path);
return PROXY_RES_OK;
}
/* allocate space for srun args */
srun_argv = (char **)malloc(sizeof(char *)*MAX_SRUN_ARG_NUM);
if (srun_argv == NULL) {
debug_log(logfp, "memory allocation for srun_argv fail");
sendJobSubErrorEvent(trans_id, jobsubid, "memory allocation for srun_args fail");
return PROXY_RES_OK;
}
/*
*FIXME: more debug job support will be added soon.
*/
/****************handle debug job*****************/
if (debug) {
debug_argc++;
debug_args = (char **)malloc((debug_argc+1) * sizeof(char *));
for (i = 0, k = 1; i < nargs; i++) {
if (proxy_test_attribute(SLURM_JOB_DEBUG_ARGS_ATTR, args[i])) {
debug_args[k++] = proxy_get_attribute_value_str(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_DEBUG_EXEC_NAME_ATTR, args[i])) {
debug_exec_name = proxy_get_attribute_value_str(args[i]);
} else if (proxy_test_attribute(SLURM_JOB_DEBUG_EXEC_PATH_ATTR, args[i])) {
debug_exec_path = proxy_get_attribute_value_str(args[i]);
}
}
debug_args[k] = NULL;
/*
* If no path is specified, try to locate the executable.
*/
if (debug_exec_path == NULL) {
debug_full_path = get_path(debug_exec_name, cwd, X_OK);
if (debug_full_path == NULL) {
sendJobSubErrorEvent(trans_id, jobsubid, "Debugger executuable not found");
return PROXY_RES_OK;
}
} else {
asprintf(&debug_full_path, "%s/%s", debug_exec_path, debug_exec_name);
}
if (access(debug_full_path, X_OK) < 0) {
sendJobSubErrorEvent(trans_id, jobsubid, strerror(errno));
if (debug_full_path != NULL)
free(debug_full_path);
return PROXY_RES_OK;
}
debug_args[0] = strdup(debug_full_path);
free(debug_full_path);	/* copied by strdup(); no longer needed */
}
/*******************************************************/
/* set default srun options */
set_srun_options_defaults(&opt);
/*
* change srun options based on SubmitJob cmd args
* and prepare srun_argc,srun_argv
*/
int index = 0;
srun_argv[index++] = strdup("srun");
if (num_procs > 0) {
opt.ps_nprocs = num_procs;
opt.ps_nprocs_set = true;
asprintf(&(srun_argv[index]), "--ntasks=%d", opt.ps_nprocs);
index += 1;
}
if (num_nodes > 0) {
opt.ps_min_nodes = num_nodes;
opt.ps_nodes_set = true;
asprintf(&(srun_argv[index]), "--nodes=%d", opt.ps_min_nodes);
index += 1;
}
if (tlimit_set) {
opt.ps_time_limit = tlimit;
asprintf(&(srun_argv[index]), "--time=%d", opt.ps_time_limit);
index += 1;
}
if (partition_set) {
opt.ps_partition = partition;
asprintf(&(srun_argv[index]), "--partition=%s", opt.ps_partition);
index += 1;
}
/* To distinguish per-task stdout, this option MUST always be set */
io_label_set = true;
opt.ps_labelio = io_label;
asprintf(&(srun_argv[index]), "--label");
index += 1;
if (verbose_set) {
asprintf(&(srun_argv[index]), "--verbose");
index += 1;
}
if (jobid_set) {
opt.ps_jobid = allocated_jobid;
opt.ps_jobid_set = true;
asprintf(&(srun_argv[index]), "--jobid=%d", opt.ps_jobid);
index += 1;
}
/*if (jobtype_set) {
opt.ps_jobtype = jobtype;
asprintf(&(srun_argv[index]), "--jobtype=%s", opt.ps_jobtype);
index += 1;
}
*/
if (nodelist_set) {
opt.ps_nodelist = strdup(node_list);
opt.ps_nodes_set = true;
asprintf(&(srun_argv[index]), "--nodelist=%s", opt.ps_nodelist);
index += 1;
}
if (cwd) {
if (opt.ps_cwd != NULL)
free(opt.ps_cwd);
opt.ps_cwd = strdup(cwd);
opt.ps_cwd_set = true;
}
opt.ps_progname = full_path;
/* set job name, otherwise it would be NULL */
opt.ps_job_name = basename(full_path);
asprintf(&(srun_argv[index]), "%s", opt.ps_progname);
index += 1;
/* program name followed by args */
if (num_args > 0) {
for (i = 0; i < nargs; i++) {
if (proxy_test_attribute(SLURM_JOB_PROG_ARGS_ATTR, args[i])) {
asprintf(&(srun_argv[index]), "%s", proxy_get_attribute_value_str(args[i]));
index += 1;
}
}
}
srun_argv[index] = NULL; /* mark the end of srun_argv[] */
srun_argc = index;
debug_log(logfp, "srun cmd:");
for (i = 0; i < srun_argc; i++)
debug_log(logfp,"%s ",srun_argv[i]);
debug_log(logfp, "\n");
/*
* FIXME: For now, debug jobs are not supported.
*/
if (debug) {
if (attach_mode) {
ret = handle_attach_debug();
} else {
ret = handle_debug();
}
} else {
ptp_job j;
j.debug = debug;
j.ptp_jobid = ptpid; /* model element id generated by proxy agent */
j.num_procs = num_procs;
allocate_and_launch_job(trans_id, jobsubid, &j, &opt, srun_argc, srun_argv);
free_opt(&opt);
free_srun_argv(srun_argc,srun_argv);
}
return PROXY_RES_OK;
}
/*
* Cancel job, given ptp jobid (not slurm jobid).
*/
int
SLURM_TerminateJob(int trans_id, int nargs, char **args)
{
int i;
int ptp_jobid = -1;
ptp_job * j;
if (proxy_state != STATE_RUNNING) {
sendErrorEvent(trans_id, RTEV_ERROR_JOB, "must call INIT first");
return PROXY_RES_OK;
}
for (i = 0; i < nargs; i++) {
if (proxy_test_attribute(JOB_ID_ATTR, args[i])) {
ptp_jobid = proxy_get_attribute_value_int(args[i]);
break;
}
}
if (ptp_jobid < 0) {
sendJobTerminateErrorEvent(trans_id, ptp_jobid, "invalid jobid ");
return PROXY_RES_OK;
}
/* convert ptp jobid to slurm jobid */
if ((j = find_job(ptp_jobid, PTP_JOBID)) != NULL) {
/*
* Kill all job steps and request iothread to exit.
* Removing job structure from the global job list
* is left to purge_global_joblist().
*/
kill(j->srun_pid, SIGKILL);
j->iothread_exit_req = true;
if (j->need_alloc) {
slurm_kill_job(j->slurm_jobid, SIGKILL, 0);
}
}
sendOKEvent(trans_id);
return PROXY_RES_OK;
}
/*
* Enable sending of events.
* The first msg that must be sent is a description of the model.
* This comprises NEW model element events
* (NewMachine, NewNode, NewQueue) for each element in the model.
* Once the model description has been sent, model change events will be sent as detected.
*/
int
SLURM_StartEvents(int trans_id, int nargs, char **args)
{
int num_machines;
int m;
ptp_machine * mach;
int num_nodes;
debug_log(logfp, "SLURM_StartEvents (%d):\n", trans_id);
if (proxy_state != STATE_RUNNING) {
sendErrorEvent(trans_id, RTEV_ERROR_SLURM_INIT, "Must call INIT first");
return PROXY_RES_OK;
}
/* NodeChange and JobChange events use gTransID as TID to match the START_EVENTS cmd */
gTransID = trans_id;
/*
* FIXME: how to handle partition information in SLURM?
*/
num_machines = get_num_machines();
for(m = 0; m < num_machines; m++) {
mach = new_machine();
/* NewMachine element */
sendNewMachineEvent(trans_id, mach->id, get_machine_name(m));
num_nodes = get_num_nodes(mach->id);
if(create_node_list(mach)) {
sendErrorEvent(trans_id, RTEV_ERROR_NATTR, "Fail to create nodelist");
return PROXY_RES_OK;
}
/* NewNode element */
sendNewNodeEvent(trans_id, mach->id, mach);
}
/* NewQueue element */
sendNewQueueEvent(trans_id);
/* From now on, job state and node state update msg can be sent */
enable_state_update = true;
return PROXY_RES_OK;
}
/*
* Compatibility interface.
* The proxy is not allowed to stop the SLURM RMS.
*/
int
SLURM_Quit(int trans_id, int nargs, char **args)
{
int old_state = proxy_state;
debug_log(logfp, "SLURM_Quit called\n");
proxy_state = STATE_SHUTTING_DOWN;
if (old_state == STATE_RUNNING)
do_slurm_shutdown();
sendShutdownEvent(trans_id);
return PROXY_RES_OK;
}
/******************************
* END OF DISPATCH ROUTINES *
******************************/
/*
* Init jobstate_update_timer.
*/
static void
init_job_timer()
{
gettimeofday(&job_update_timer, NULL);
return;
}
/*
* Init nodestate_update_timer.
*/
static void
init_node_timer()
{
gettimeofday(&node_update_timer, NULL);
return;
}
/*
* Check whether the given timer has expired for the given timeout (usec);
* on expiry the timer is rearmed.
*/
static bool
update_timeout(int timer_id, const int timeout)
{
struct timeval * timer;
struct timeval now;
long val;	/* elapsed usec minus timeout; long avoids int truncation */
bool rc = false;
switch (timer_id) {
case JOB_UPDATE_TIMER:
timer = &job_update_timer;
break;
case NODE_UPDATE_TIMER:
timer = &node_update_timer;
break;
default:
return false;
}
gettimeofday(&now, NULL);
val = (now.tv_sec - timer->tv_sec) * 1000000 + (now.tv_usec - timer->tv_usec) - timeout;
if (val >= 0) {
/* update timer */
gettimeofday(timer, NULL);
rc = true;
}
return rc;
}
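/*
 * Usage sketch: the timer rearms itself on expiry, so the main server
 * loop can poll it with no extra bookkeeping, e.g.:
 *
 *     if (update_timeout(JOB_UPDATE_TIMER, JOB_UPDATE_TIMEOUT))
 *         update_job_state(ALL_JOBSTATE);  // at most every 500 ms
 */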
/*
* Wrapper routine to check job_update_timer.
*/
static bool
job_update_timeout()
{
return update_timeout(JOB_UPDATE_TIMER, JOB_UPDATE_TIMEOUT);
}
/*
* Wrapper routine to check node_update_timer.
*/
static bool
node_update_timeout()
{
return update_timeout(NODE_UPDATE_TIMER, NODE_UPDATE_TIMEOUT);
}
/*
* Update job/process state and send state CHANGE to ui.
*/
static void
update_job_state(int slurm_jobid)
{
int i;
int errcode;
bool job_find;
ptp_job * j;
job_info_msg_t * msg = NULL;
errcode = slurm_load_jobs((time_t)NULL, &msg, SHOW_ALL);
if (errcode) {
debug_log(logfp,"slurm_load_jobs error");
return;
}
for (SetList(gJobList); (j = (ptp_job *)GetListElement(gJobList)) != NULL;) {
if (slurm_jobid > -1) {
if (j->slurm_jobid != slurm_jobid)
continue;
}
job_find = false;
for (i = 0; i < msg->record_count; i++) {
if (j->slurm_jobid == (msg->job_array[i]).job_id) {
job_find = true;
if (j->state != (msg->job_array[i]).job_state) { /*state change*/
j->state = (msg->job_array[i]).job_state;
/*
* SLURM doesn't provide process state.
* Force process state changes to follow the job state.
*/
sendProcessStateChangeEvent(gTransID, j, jobstate_to_string(j->state));
sendJobStateChangeEvent(gTransID, j->ptp_jobid, jobstate_to_string(j->state));
}
break;
}
}
if (!job_find) {
/*
* Job not found (rarely happens).
* In this case, simply mark this job removable.
* SLURM keeps the information of completed/failed jobs for MinJobAge
* (default 300) seconds; MinJobAge can be set in slurm/etc/slurm.conf.
*/
j->removable = true;
}
if (slurm_jobid > -1)
break;
}
slurm_free_job_info_msg(msg);
return;
}
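/*
 * Note (descriptive): update_job_state() is a polling diff.
 * slurm_load_jobs() returns a snapshot of all jobs; each tracked ptp_job
 * is matched by slurm_jobid, and CHANGE events are emitted only when the
 * recorded state differs from the snapshot.
 */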
/*
* Update ALL nodes state and send state CHANGE to ui.
*/
void
update_node_state()
{
int i;
ptp_node * node;
int errcode;
node_info_msg_t * msg;
rangeset * unknown_set = new_rangeset();
rangeset * idle_set = new_rangeset();
rangeset * down_set = new_rangeset();
rangeset * allocated_set = new_rangeset();
if (unknown_set == NULL || idle_set == NULL || down_set == NULL || allocated_set == NULL)
goto cleanup;
errcode = slurm_load_node((time_t)NULL, &msg, SHOW_ALL);
if (errcode) {
debug_log(logfp, "slurm_load_node error.\n");
goto cleanup;	/* free the rangesets allocated above */
} else {
for (i = 0; i < msg->record_count; i++) {
node = find_node_by_name(msg->node_array[i].name);
if (node == NULL)	/* node unknown to the proxy; skip it */
continue;
if (node->state == msg->node_array[i].node_state)
continue;
else { /* node state change */
node->state = msg->node_array[i].node_state;
switch (msg->node_array[i].node_state & NODE_STATE_BASE) {
case NODE_STATE_UNKNOWN:
insert_in_rangeset(unknown_set,node->id);
break;
case NODE_STATE_DOWN:
insert_in_rangeset(down_set,node->id);
break;
case NODE_STATE_IDLE:
insert_in_rangeset(idle_set,node->id);
break;
case NODE_STATE_ALLOCATED:
insert_in_rangeset(allocated_set,node->id);
break;
default:
debug_log(logfp, "unrecognized node state\n");
break;
}
}
}
if (!EmptyList(unknown_set->elements)) {
sendNodeChangeEvent(gTransID,rangeset_to_string(unknown_set),nodestate_to_string(NODE_STATE_UNKNOWN));
}
if (!EmptyList(down_set->elements)) {
sendNodeChangeEvent(gTransID,rangeset_to_string(down_set),nodestate_to_string(NODE_STATE_DOWN));
}
if (!EmptyList(idle_set->elements)) {
sendNodeChangeEvent(gTransID,rangeset_to_string(idle_set),nodestate_to_string(NODE_STATE_IDLE));
}
if (!EmptyList(allocated_set->elements)) {
sendNodeChangeEvent(gTransID,rangeset_to_string(allocated_set),nodestate_to_string(NODE_STATE_ALLOCATED));
}
slurm_free_node_info_msg(msg);
}
cleanup:
if (unknown_set)
free_rangeset(unknown_set);
if (down_set)
free_rangeset(down_set);
if (idle_set)
free_rangeset(idle_set);
if (allocated_set)
free_rangeset(allocated_set);
return;
}
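/*
 * Illustrative note: collecting node ids in a rangeset batches state
 * changes into one NodeChange event per state. If, say, nodes 0-3 and 7
 * went IDLE in a single poll, one event carrying a compact range string
 * (something like "0-3,7"; exact format is defined by rangeset.c) is
 * sent instead of five separate events.
 */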
/*
* Signal handler of the SLURM proxy.
*/
RETSIGTYPE
ptp_signal_handler(int sig)
{
if (sig != SIGCHLD) /* proxy doesn't exit on SIGCHLD */
ptp_signal_exit = sig;
}
/*
* Cleanup work on proxy exit:
* kill srun processes, release job resources,
* terminate I/O threads, and free memory.
*/
static void
destroy_global_joblist()
{
ptp_job * j;
for (SetList(gJobList); (j = (ptp_job *)GetListElement(gJobList)) != NULL; ) {
if (slurm_job_active(j)) {
kill(j->srun_pid, SIGKILL);
slurm_kill_job(j->slurm_jobid, SIGKILL, 0);
}
if (j->iothread_exit == false)
j->iothread_exit_req = true;
RemoveFromList(gJobList, j);
free_job(j);
}
return;
}
static int
server(char *name, char *host, int port)
{
int rc = 0;
struct timeval timeout = {0, 20000};
gJobList = NewList();
gMachineList = NewList();
init_job_timer();
init_node_timer();
if (proxy_svr_init(name, &timeout, &helper_funcs, &command_tab, &slurm_proxy) != PROXY_RES_OK) {
debug_log(logfp, "proxy failed to initialized\n");
return 0;
}
if (proxy_svr_connect(slurm_proxy, host, port) == PROXY_RES_OK) {
debug_log(logfp, "proxy connected\n");
while (ptp_signal_exit == 0 && proxy_state != STATE_SHUTDOWN) {
if (proxy_state == STATE_SHUTTING_DOWN) {
proxy_state = STATE_SHUTDOWN;
}
if (proxy_svr_progress(slurm_proxy) != PROXY_RES_OK)
break;
/* update job and node state */
if (enable_state_update) {
if (job_update_timeout())
update_job_state(ALL_JOBSTATE);
if (node_update_timeout())
update_node_state();
}
/* delete removable job */
purge_global_joblist();
}
if (ptp_signal_exit != 0) {
if (proxy_state != STATE_SHUTTING_DOWN
&& proxy_state != STATE_SHUTDOWN) {
do_slurm_shutdown();
}
destroy_global_joblist();
/* our return code = the signal that fired */
rc = ptp_signal_exit;
debug_log(logfp, "ptp_slurm_proxy terminated by signal [%d]\n", ptp_signal_exit);
}
} else
debug_log(logfp, "proxy connection failed\n");
proxy_svr_finish(slurm_proxy);
return rc;
}
/*
* Entry routine
*/
int
main(int argc, char *argv[])
{
int ch;
int port = PROXY_TCP_PORT;
char * host = DEFAULT_HOST;
char * proxy_str = DEFAULT_PROXY;
int rc;
while ((ch = getopt_long(argc, argv, "P:p:h:", longopts, NULL)) != -1){
switch (ch) {
case 'P':
proxy_str = optarg;
break;
case 'p':
port = (int)strtol(optarg, NULL, 10);
break;
case 'h':
host = optarg;
break;
default:
fprintf(stderr, "%s [--proxy=proxy] [--host=host_name] [--port=port] \n", argv[0]);
return 1;
}
}
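/*
 * Example invocation (host and port hypothetical):
 *
 *     ./ptp_slurm_proxy --proxy=tcp --host=localhost --port=4713
 *
 * Omitted options fall back to DEFAULT_PROXY, DEFAULT_HOST and
 * PROXY_TCP_PORT.
 */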
//putenv("PTP_SLURM_PROXY_LOGDIR=$HOME/log");
logfp = init_logfp();
/*
* A signal can arrive at any time once the handlers are installed,
* so initialize ptp_signal_exit before installing them.
*/
ptp_signal_exit = 0;
/* setup signal handlers */
xsignal(SIGINT, ptp_signal_handler);
xsignal(SIGHUP, ptp_signal_handler);
xsignal(SIGILL, ptp_signal_handler);
xsignal(SIGSEGV, ptp_signal_handler);
xsignal(SIGTERM, ptp_signal_handler);
xsignal(SIGQUIT, ptp_signal_handler);
xsignal(SIGABRT, ptp_signal_handler);
xsignal(SIGCHLD, ptp_signal_handler);
rc = server(proxy_str, host, port);
return rc;
}