/******************************************************************************
* Copyright (c) 2005 The Regents of the University of California.
* This material was produced under U.S. Government contract W-7405-ENG-36
* for Los Alamos National Laboratory, which is operated by the University
* of California for the U.S. Department of Energy. The U.S. Government has
* rights to use, reproduce, and distribute this software. NEITHER THE
* GOVERNMENT NOR THE UNIVERSITY MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
* ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is modified
* to produce derivative works, such modified software should be clearly
* marked, so as not to confuse it with the version available from LANL.
*
* Additionally, this program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* LA-CC 04-115
*
* Copyright (c) 2005, 2008 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
******************************************************************************/
/*
* Questions/problems
* 22) Should probably serialize around process and node object generation to
* simplify generation of node and process state change messages affecting
* multiple nodes or processes with consecutive object id ranges
* 23) For PE/LL case, machine_id and queue_id need to be set to the machine id and
* queue id for the cluster that contains the node where this proxy is running.
* (LoadLeveler restricts interactive PE applications to running only on the
* cluster where the poe process runs, which in our case is the node where the
* proxy runs)
*/
#include <pthread.h>
#include "config.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pwd.h>
#include <grp.h>
#include <dirent.h>
#include <proxy_cmd.h>
#include <proxy.h>
#include <proxy_tcp.h>
#include <proxy_event.h>
#include <proxy_msg.h>
#include <proxy_attr.h>
#include <handler.h>
#include <signal.h>
#include <list.h>
#include <hash.h>
#include <limits.h>
#include <dlfcn.h>
#ifdef __linux__
#include <getopt.h>
#endif
#ifdef _AIX
#include <procinfo.h>
#endif
#ifdef HAVE_LLAPI_H
#include <llapi.h>
#endif
#define DEFAULT_PROXY "tcp"
#define DEFAULT_QUEUE_NAME "default"
#define SKIPPING_SPACES 1
#define PARSING_UNQUOTED_ARG 2
#define PARSING_QUOTED_ARG 3
#define SKIPPING_CHARS 4
#define READ_BUFFER_SIZE 1024
#define STDIO_WRITE_BUFSIZE 1024
#ifndef POE
#define POE "/usr/bin/poe"
#endif /* POE */
#define INFO_MESSAGE 0
#define TRACE_MESSAGE 1
#define TRACE_DETAIL_MESSAGE 2
#define WARNING_MESSAGE 3
#define ERROR_MESSAGE 4
#define FATAL_MESSAGE 5
#define ARGS_MESSAGE 6
#define ATTR_FOR_LINUX 0x00000001
#define ATTR_FOR_AIX 0x00000002
#define ATTR_FOR_PE_STANDALONE 0x00000100
#define ATTR_FOR_PE_WITH_LL 0x00000200
#define ATTR_FOR_ALL_OS 0x000000ff
#define ATTR_FOR_ALL_PROXY 0x0000ff00
#define ATTR_ALWAYS_ALLOWED 0xffffffff
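/*
 * The ATTR_FOR_* values are bit flags: the low byte selects an operating
 * system (Linux/AIX) and the second byte selects a proxy mode (stand-alone
 * PE or PE with LoadLeveler). define_model() ORs one bit from each group
 * into a flags word (for example ATTR_FOR_LINUX | ATTR_FOR_PE_WITH_LL) and
 * the send_*_attrs() functions use that word to decide which attribute
 * definitions are sent to the GUI (summary only; the actual filtering is
 * done in the send_*_attrs() functions defined elsewhere in this file).
 */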
#define TRACE_ENTRY print_message(TRACE_MESSAGE, ">>> %s entered. (Line %d)\n", __FUNCTION__, __LINE__);
#define TRACE_EXIT print_message(TRACE_MESSAGE, "<<< %s exited. (Line %d)\n", __FUNCTION__, __LINE__);
#define TRACE_DETAIL(format, ...) print_message(TRACE_DETAIL_MESSAGE, format)
#define TRACE_DETAIL_V(format, ...) print_message(TRACE_DETAIL_MESSAGE, format, __VA_ARGS__)
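/*
 * Note: TRACE_DETAIL logs a fixed format string only (its variable
 * arguments are ignored); TRACE_DETAIL_V is the variant that forwards its
 * variable argument list to print_message().
 */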
typedef struct jobinfo *jobinfoptr; /* Forward reference to jobinfo */
typedef struct {
char *read_buf; /* Read buffer for stdout/stderr */
char *write_buf; /* Write buffer for stdout/stderr */
char *cp; /* Current character in write buffer */
int allocated; /* Allocated size of write buffer */
int remaining; /* Bytes left in write buffer */
void (*write_func) (jobinfoptr job, char *buf); /* Function writing data */
} ioinfo;
typedef struct {
int proxy_taskid; /* Process id assigned by proxy */
pid_t parent_pid; /* PID for parent (poe) process */
pid_t task_pid; /* PID for this process */
char *ipaddr; /* IP address of node where task is */
char *hostname; /* Hostname of node where task is */
} taskinfo;
typedef struct {
int proxy_jobid; /* Job id assigned by proxy */
char *submit_jobid; /* Jobid used when submitted by GUI */
pid_t poe_pid; /* Process id for main poe process */
pid_t task0_pid; /* Process id for app. task 0 */
int stdin_fd; /* STDIN pipe/file descriptor */
int stdout_fd; /* STDOUT pipe/file descriptor */
int stderr_fd; /* STDERR pipe/file descriptor */
ioinfo stdout_info; /* Stdout file buffer info */
ioinfo stderr_info; /* Stderr file buffer info */
int numtasks; /* Number of tasks in application */
taskinfo *tasks; /* Tasks in this application */
pthread_t startup_thread; /* Startup monitor thread for app */
time_t submit_time; /* Time job was submitted */
int label_io:1; /* User set MP_LABELIO */
int split_io:1; /* STDOUT is split by task */
int stdin_redirect:1; /* Stdin redirected to file */
int stdout_redirect:1; /* Stdout redirected to file */
int stderr_redirect:1; /* Stderr redirected to file */
int discovered_job:1; /* Job already running at PTP startup */
} jobinfo;
typedef struct NODE_REFCOUNT {
struct NODE_REFCOUNT *next; /* -> Next node in hash chain */
char *key; /* Hash key for this structure */
int proxy_nodeid; /* Proxy assigned node id for node */
int node_number; /* This node's node number */
int task_count; /* Number of tasks running on node */
} node_refcount;
typedef struct {
char *id; /* Attribute identifier */
int type; /* Cases where attribute is allowed */
char *short_name; /* Description used as label in GUI */
char *long_name; /* Text used for tooltip text in GUI */
char *default_value; /* Attribute's default value */
} string_launch_attr;
typedef struct {
char *id; /* Attribute identifier */
int type; /* Cases where attribute is allowed */
char *short_name; /* Description used as label in GUI */
char *long_name; /* Text used for tooltip text in GUI */
char *default_value; /* Attribute's default value */
char *enums; /* Enumeration values '|' delimited */
} enum_launch_attr;
typedef struct {
char *id; /* Attribute identifier */
int type; /* Cases where attribute is allowed */
char *short_name; /* Description used as label in GUI */
char *long_name; /* Text used for tooltip text in GUI */
int default_value; /* Attribute's default value */
int llimit; /* Attribute's lower limit */
int ulimit; /* Attribute's upper limit */
} int_launch_attr;
typedef struct {
char *id; /* Attribute identifier */
int type; /* Cases where attribute is allowed */
char *short_name; /* Description used as label in GUI */
char *long_name; /* Text used for tooltip text in GUI */
long long default_value; /* Attribute's default value */
long long llimit; /* Attribute's lower limit */
long long ulimit; /* Attribute's upper limit */
} long_int_launch_attr;
static RETSIGTYPE ptp_signal_handler(int sig);
static int server(char *name, char *host, int port);
static int start_daemon(int trans_id, int nargs, char *args[]);
static int define_model(int trans_id, int nargs, char *args[]);
static int run(int trans_id, int nargs, char *args[]);
static int terminate_job(int trans_id, int nargs, char *args[]);
static int quit(int trans_id, int nargs, char *args[]);
static int shutdown_proxy(void);
static int start_events(int trans_id, int nargs, char *args[]);
static int halt_events(int trans_id, int nargs, char *args[]);
static void post_error(int trans_id, int type, char *msg);
static void post_submitjob_error(int trans_id, char *subid, char *msg);
static char
**create_exec_parmlist(char *execname, char *targetname, char *args);
static char **create_env_array(char *args[], int split_io, char *mp_buffer_mem,
char *mp_rdma_count);
static void add_environment_variable(char *env_var);
static int setup_stdio_fd(int run_trans_id, char *subid, int pipe_fds[], char *path, char *stdio_name,
int *fd, int *redirect);
static int setup_child_stdio(int run_trans_id, char *subid, int stdio_fd, int redirect,
int *file_fd, int pipe_fd[]);
static int stdout_handler(int fd, void *job);
static int stderr_handler(int fd, void *job);
static void check_bufsize(ioinfo * file_info);
static void send_stdout(jobinfo * job, char *buf);
static void send_stderr(jobinfo * job, char *buf);
static int write_output(int fd, jobinfo * job, ioinfo * file_info);
static void *zombie_reaper(void *args);
static void update_node_refcounts(int numtasks, taskinfo * tasks);
static void delete_noderef(char *hostname);
static void *startup_monitor(void *pid);
static void delete_task_list(int numtasks, taskinfo * tasks);
static void *kill_process(void *pid);
static void update_nodes(int trans_id, FILE * hostlist);
static void malloc_check(void *p, const char *function, int line);
static node_refcount *add_node(char *key);
static node_refcount *find_node(char *key);
static void hash_cleanup(void *hash_list);
static void send_ok_event(int trans_id);
static void discover_jobs(void);
static void add_discovered_job(char *pid);
static void redirect_io(void);
static proxy_msg *proxy_attr_def_enum_event(int trans_id, char *id, char *name,
char *desc, int display, char *def, int count);
static proxy_msg *proxy_attr_def_int_limits_event(int trans_id, char *id,
char *name, char *desc, int display, int def,
int llimit, int ulimit);
static proxy_msg *proxy_attr_def_long_int_limits_event(int trans_id, char *id, char *name,
char *desc, int display, long long def,
long long llimit, long long ulimit);
static void send_string_attrs(int trans_id, int flags);
static void send_int_attrs(int trans_id, int flags);
static void send_long_int_attrs(int trans_id, int flags);
static void send_enum_attrs(int trans_id, int flags);
static void send_local_default_attrs(int trans_id);
static void send_new_node_list(int trans_id, int machine_id, List * new_nodes);
static void send_job_state_change_event(int trans_id, int jobid, char *state);
static void send_process_state_change_event(int trans_id, jobinfo * job, char *state);
static void send_process_state_output_event(int trans_id, int procid, char *output);
static int generate_id(void);
static void enqueue_event(proxy_msg * event);
static void print_message(int type, const char *format, ...);
static void print_message_args(int argc, char *optional_args[]);
static int find_load_leveler_library(void);
static int load_load_leveler_library(int trans_id);
int main(int argc, char *argv[]);
extern char **environ;
static int events_enabled = 0;
static int shutdown_requested;
static int ptp_signal_exit;
static List *jobs; /* Jobs run by this proxy */
static Hash *nodes; /* Nodes currently in use */
static int node_count; /* Number of active nodes */
static int global_node_index; /* Sequentially assigned node number */
static RETSIGTYPE(*saved_signals[NSIG]) (int);
static proxy_svr *pe_proxy; /* Handle for proxy message link */
static int base_id; /* Base id for proxy objects */
static int last_id = 1; /* Last assigned object id */
static int queue_id; /* Object id for our queue */
static int machine_id; /* Object id for our machine */
static int start_events_transid; /* start_events command id */
static int run_miniproxy; /* Run miniproxy at proxy shutdown */
static char emsg_buffer[_POSIX_PATH_MAX + 50]; /* Buffer for building error msg */
static int use_load_leveler = 0; /* Use LL resource management/tracking */
static char *user_libpath; /* Alternate libdir for LoadLeveler */
static int multicluster_status; /* LoadLeveler multicluster status */
static int state_template; /* Rewrite template file at startup */
static int min_node_sleep_seconds = 30; /* Min. LL node status interval */
static int max_node_sleep_seconds = 300; /* Max. LL node status interval */
static int job_sleep_seconds = 30; /* LL job status interval */
static char ibmll_libpath_name[_POSIX_PATH_MAX]; /* LoadLeveler lib path */
static char miniproxy_path[_POSIX_PATH_MAX];
/*
* List functions are safe for adding or removing list elements since they
* have appropriate locks for updating the list. However, since the list
* current location pointer is part of the list object, any time the list
* is traversed, starting with a SetList call, the list must be locked
* since a SetList call for the same list on a different thread can
* invalidate the list positioning for the first thread.
*/
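/*
 * Typical traversal pattern used in this file (see terminate_job() and
 * zombie_reaper()):
 *
 *   pthread_mutex_lock(&job_lock);
 *   SetList(jobs);
 *   job = GetListElement(jobs);
 *   while (job != NULL) {
 *       ... process job ...
 *       job = GetListElement(jobs);
 *   }
 *   pthread_mutex_unlock(&job_lock);
 */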
static pthread_mutex_t job_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t node_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t print_message_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_attr_t thread_attrs; /* Thread creation attributes */
static pthread_t termination_thread; /* Thread to monitor process exit */
static int state_trace = 0;
static int state_trace_detail = 0;
static int state_info = 0;
static int state_warning = 0;
static int state_error = 1;
static int state_args = 0;
static int state_message_timedate = 1;
static int state_message_threadid = 1;
static int state_events_active = 0;
static pthread_t thread_map_table[256];
static char *my_username;
static int next_env_entry;
static int env_array_size;
static char **env_array;
#ifdef __linux__
static struct option longopts[] = {
{"proxy", required_argument, NULL, 'P'},
{"port", required_argument, NULL, 'p'},
{"host", required_argument, NULL, 'h'},
{"useloadleveler", required_argument, NULL, 'L'},
{"trace", required_argument, NULL, 't'},
{"lib_override", required_argument, NULL, 'l'},
{"multicluster", required_argument, NULL, 'm'},
{"template_override", required_argument, NULL, 'o'},
{"template_write", required_argument, NULL, 'r'},
{"node_polling_min", required_argument, NULL, 'x'},
{"node_polling_max", required_argument, NULL, 'y'},
{"job_polling", required_argument, NULL, 'z'},
{"suspend_at_startup", no_argument, NULL, 'S'},
{"debug", required_argument, NULL, 'D'},
{"runMiniproxy", no_argument, NULL, 'M'},
{NULL, 0, NULL, 0}
};
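/*
 * Candidate directories searched for the LoadLeveler runtime library
 * (libname below). The leading NULL slot is presumably reserved for the
 * --lib_override path (user_libpath) when one is supplied, and the
 * (char *) -1 entry terminates the list; see find_load_leveler_library().
 */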
static char *libpath[] = { NULL, "/opt/ibmll/LoadL/full/lib/",
"/opt/ibmll/LoadL/so/lib/", (char *) -1
};
static char *libname = "libllapi.so";
#else
static char *libpath[] = {
NULL, "/usr/lpp/LoadL/full/lib", "/usr/lpp/LoadL/so/lib", "/opt/ibmll/LoadL/full/lib/",
"/opt/ibmll/LoadL/so/lib/", (char *) -1
};
static char *libname = "libllapi.a";
#endif
#ifdef HAVE_LLAPI_H
#define MY_STATE_UNKNOWN 0
#define MY_STATE_UP 1
#define MY_STATE_DOWN 2
#define MY_STATE_STOPPED 3
#define MY_STATE_RUNNING 4
#define MY_STATE_IDLE 5
#define MY_STATE_TERMINATED 6
struct ClusterObject { /* a LoadLeveler cluster (same as a ptp machine) */
int proxy_generated_cluster_id;
char *cluster_name;
Hash *node_hash;
int proxy_generated_queue_id;
int cluster_state;
int queue_state;
int cluster_is_local;
int node_cleanup_required;
int job_cleanup_required;
};
typedef struct ClusterObject ClusterObject;
struct NodeObject { /* a LoadLeveler or ptp node in a cluster (machine) */
int proxy_generated_node_id;
int node_found; /* node found indicator */
int node_state;
char *node_name; /* use the name as the key to the node hash table in the cluster object */
};
typedef struct NodeObject NodeObject;
struct JobObject { /* a LoadLeveler or ptp job in a cluster */
int proxy_generated_job_id;
char *gui_assigned_job_id;
int job_found; /* job found indicator */
int job_state; /* 1=submitted, 2=in queue */
time_t job_submit_time; /* time when submitted */
List *task_list; /* processes running for this job */
LL_STEP_ID ll_step_id;
char *cluster_name;
};
typedef struct JobObject JobObject;
struct TaskObject { /* a LoadLeveler or ptp task for job */
int proxy_generated_task_id;
int task_found; /* task found indicator */
int ll_task_id;
int task_state;
char *node_name;
char *node_address;
};
typedef struct TaskObject TaskObject;
static void *ibmll_libpath_handle = NULL;
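/*
 * Table of pointers to the LoadLeveler API functions used by this proxy.
 * The pointers are filled in at runtime by load_load_leveler_library(),
 * presumably via dlopen()/dlsym() on the library located through libpath
 * above, so the proxy can start on systems where LoadLeveler is not
 * installed; ibmll_libpath_handle is the handle that quit() later releases
 * with dlclose().
 */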
static struct {
LL_element *(*ll_query) (enum QueryType);
int (*ll_set_request) (LL_element *, enum QueryFlags, char **, enum DataFilter);
LL_element *(*ll_get_objs) (LL_element *, enum LL_Daemon, char *, int *, int *);
int (*ll_get_data) (LL_element *, enum LLAPI_Specification, void *);
int (*ll_deallocate) (LL_element *);
LL_element *(*ll_next_obj) (LL_element *);
int (*ll_free_objs) (LL_element *);
int (*ll_cluster) (int, LL_element **, LL_cluster_param *);
int (*ll_submit_job) (char *job_cmd_file, char *monitor_program, char *monitor_arg,
LL_job * job_info, int job_version);
int (*ll_terminate_job) (LL_terminate_job_info * ptr);
void (*ll_free_job_info) (LL_job * job_info, int job_version);
char *(*ll_error) (LL_element ** errObj, int print_to);
} LL_SYMS;
static int state_shutdown_requested = 0; /* shutdown not in progress */
TaskObject *task_object = NULL;
static pthread_t monitor_LoadLeveler_jobs_thread = 0;
static pthread_attr_t monitor_LoadLeveler_jobs_thread_attr;
static pthread_t monitor_LoadLeveler_nodes_thread = 0;
static pthread_attr_t monitor_LoadLeveler_nodes_thread_attr;
static pthread_mutex_t master_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t access_LoadLeveler_lock = PTHREAD_MUTEX_INITIALIZER;
static List *cluster_list = NULL; /* list of clusters if multicluster (we'll set to single local if none) */
static List *job_list = NULL; /* job list for all clusters (since jobs can move) */
static pthread_mutex_t job_notify_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t job_notify_condvar = PTHREAD_COND_INITIALIZER;
static char ibmll_proxy_base_id_string[256];
static ClusterObject *my_cluster;
static void *monitor_LoadLeveler_jobs(void *args);
static void *monitor_LoadLeveler_nodes(void *args);
static int get_multicluster_status();
static int my_ll_get_data(LL_element * request, enum LLAPI_Specification spec, void *result);
static int my_ll_cluster(int version, LL_element ** errObj, LL_cluster_param * cp);
static int my_ll_deallocate(LL_element * query_elem);
static LL_element *my_ll_query(enum QueryType type);
static int my_ll_free_objs(LL_element * query_elem);
static LL_element *my_ll_next_obj(LL_element * query_elem);
static LL_element *my_ll_get_objs(LL_element * query_elem, enum LL_Daemon daemon, char *ignore,
int *value, int *rc);
static int my_ll_set_request(LL_element * query_elem, enum QueryFlags qflags, char **ignore,
enum DataFilter dfilter);
static int sendNodeAddEvent(int gui_transmission_id, ClusterObject * cluster_object,
NodeObject * node_object);
static int sendNodeChangeEvent(int gui_transmission_id, ClusterObject * cluster_object,
NodeObject * node_object);
static int sendJobAddEvent(int gui_transmission_id, ClusterObject * cluster_object,
JobObject * job_object);
static int sendJobChangeEvent(int gui_transmission_id, JobObject * job_object);
static int sendJobRemoveEvent(int gui_transmission_id, JobObject * job_object);
static int sendTaskAddEvent(int gui_transmission_id, ClusterObject * cluster_object,
JobObject * job_object, TaskObject * task_object);
static int sendTaskChangeEvent(int gui_transmission_id, JobObject * job_object,
TaskObject * task_object);
static int sendTaskRemoveEvent(int gui_transmission_id, JobObject * job_object,
TaskObject * task_object);
static int sendQueueAddEvent(int gui_transmission_id, ClusterObject * cluster_object);
static int sendMachineAddEvent(int gui_transmission_id, ClusterObject * cluster_object);
static void add_job_to_list(List * job_list, JobObject * job_object);
static void add_task_to_list(List * task_list, TaskObject * task_object);
static void add_node_to_hash(Hash * node_hash, NodeObject * node_object);
static void delete_task_from_list(List * task_list, TaskObject * task_object);
static JobObject *get_job_in_list(List * job_list, LL_STEP_ID ll_step_id);
static NodeObject *get_node_in_hash(Hash * node_hash, char *node_name);
static TaskObject *get_task_in_list(List * task_list, char *task_instance_machine_name,
int ll_task_id);
static void refresh_cluster_list();
#endif
static proxy_svr_helper_funcs helper_funcs = { NULL, NULL };
/*
* Proxy infrastructure expects commands in exactly this sequence.
* Be careful when adding or deleting commands
*/
static proxy_cmd cmds[] = { quit, start_daemon, define_model, start_events,
halt_events, run, terminate_job
};
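/*
 * Presumably the fields of command_tab are the base command id (0), the
 * number of handlers, and the handler table itself; the index of a handler
 * in cmds must therefore match the command id used by the front end.
 */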
static proxy_commands command_tab = { 0, sizeof cmds / sizeof(proxy_cmd), cmds };
static char *mp_infolevel_labels[] = {"Error", "Warning", "Informational",
"Diagnostic", "Diagnostic level 4", "Diagnostic level 5",
"Diagnostic level 6"
};
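/*
 * The index of a label in mp_infolevel_labels is the numeric MP_INFOLEVEL
 * value it represents; run() uses this array to map a label selected in
 * the GUI back to the numeric setting poe expects (see the MP_INFOLEVEL
 * handling in run()).
 */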
/*
* This table defines the launch attributes corresponding to PE environment
* variables for the stand-alone PE case for Linux.
*/
static string_launch_attr string_launch_attrs[] = {
/*
* Attributes needed in both basic and advanced mode
*/
{"PE_STDIN_PATH", ATTR_ALWAYS_ALLOWED, "Stdin Path", "Specify path for stdin input file", ""},
{"PE_STDOUT_PATH", ATTR_ALWAYS_ALLOWED, "Stdout Path",
"Specify path for stdout output file", ""},
{"PE_STDERR_PATH", ATTR_ALWAYS_ALLOWED, "Stderr Path",
"Specify path for stderr output file", ""},
/*
* I/O Related attributes
*/
{"MP_IONODEFILE", ATTR_ALWAYS_ALLOWED, "MPI I/O Node List",
"Specify file listing nodes performing parallel I/O (MP_IONODEFILE)",
""},
/*
* Diagnostic related attributes
*/
{"MP_PMDLOG_DIR", ATTR_ALWAYS_ALLOWED, "PMD Log Directory",
"Specify directory where PMD log is generated (MP_PMDLOG_DIR)", ""},
{"MP_PRIORITY_LOG_DIR", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Priority Log Directory",
"Specify directory containing co-scheduler log (MP_PRIORITY_LOG_DIR)", "/tmp"},
{"MP_PRIORITY_LOG_NAME", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Priority Log Name",
"Specify name of co-scheduler log (MP_PRIORITY_LOG_NAME)", "pmadjpri.log"},
/*
* Debug related attributes
*/
{"MP_COREDIR", ATTR_ALWAYS_ALLOWED, "Corefile Directory",
"Specify directory for application's core files (MP_COREDIR)", ""},
{"MP_DEBUG_INITIAL_STOP", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Initial Breakpoint",
"Initial breakpoint when debugging an application (MP_DEBUG_INITIAL_STOP)", ""},
{"MP_PROFDIR", ATTR_FOR_LINUX | ATTR_FOR_ALL_PROXY, "GMON Directory",
"Directory containing GMON profiling data files (GMON_PROFDIR)", ""},
/*
* System resource related attributes
*/
{"MP_PRIORITY", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Dispatch Priority Class",
"Specify priority class or high/low priority values (MP_PRIORITY)", ""},
/*
* Node allocation related attributes
*/
{"MP_CMDFILE", ATTR_ALWAYS_ALLOWED, "Command File",
"Specify script to load nodes in partition (MP_CMDFILE)", ""},
{"MP_HOSTFILE", ATTR_FOR_ALL_OS | ATTR_FOR_ALL_PROXY, "Host List File",
"Specify name of host list file for node allocation (MP_HOSTFILE)", ""},
{"MP_REMOTEDIR", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY,
"Specify name of script echoing current directory (MP_REMOTEDIR)", ""},
{"MP_LLFILE", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL, "Job Command File",
"Specify path of a LoadLeveler job command file used for node allocation (MP_LLFILE)", ""},
{"MP_RMPOOL", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL, "Resource Pool",
"Name or number of resource pool to use for node allocation (MP_RMPOOL)", ""},
/*
* Performance related attributes
*/
/*
* Miscellaneous attributes
*/
{"MP_EUILIBPATH", ATTR_ALWAYS_ALLOWED, "Library Path",
"Specify path to message passing and communications libraries (MP_EUILIBPATH)", ""},
{"MP_CKPTFILE", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Checkpoint File Name",
"Base name of the chcekpoint file (MP_CKPTFILE)", ""},
{"MP_CKPTDIR", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Checkpoint Directory",
"Directory where the checkpoint file will reside (MP_CHPTDIR)", ""},
/*
* Alternate resource manager attributes
*/
{"MP_RMLIB", ATTR_FOR_ALL_OS | ATTR_FOR_PE_STANDALONE, "Resource Manager Library",
"Specify alternate resource manager library", ""}, /*
* Other attributes
*/
{"PE_ENV_SCRIPT", ATTR_ALWAYS_ALLOWED, "Environment Setup Script",
"Specify if using basic or advanced mode", ""},
{"PE_ADVANCED_MODE", ATTR_ALWAYS_ALLOWED, "Advanced Mode",
"Specify name of PE environment variable setup script", "no"},
{"MP_SAVE_LLFILE", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL, "Save Job Command File",
"Specify path of generated LoadLeveler job command file (MP_SAVE_LLFILE)", ""},
{"MP_SAVEHOSTFILE", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL, "Save Hostlist File",
"Specify path of generated host list file (MP_SAVEHOSTFILE)", ""}
};
static enum_launch_attr enum_attrs[] = {
/*
* I/O tab related attributes
*/
{"MP_DEVTYPE", ATTR_ALWAYS_ALLOWED, "Device Type Class",
"Specify that Infiniband interconnect is to be used (MP_DEVTYPE)", "", "|ib"},
{"MP_LABELIO", ATTR_ALWAYS_ALLOWED, "Label I/O",
"Specify if application output is labeled by task id (MP_LABELIO)", "no", "yes|no"},
{"MP_IO_ERRLOG", ATTR_ALWAYS_ALLOWED, "Create I/O Error Log",
"Specify if I/O error logging is enabled (MP_IO_ERRLOG)", "no", "yes|no"},
{"PE_SPLIT_STDOUT", ATTR_ALWAYS_ALLOWED, "Split STDOUT by Task",
"Specify if stdio output is split by task", "no", "yes|no"},
{"MP_STDINMODE", ATTR_ALWAYS_ALLOWED, "STDIN Mode",
"Specify how application's stdin is managed (MP_STDINMODE)", "all", "all|none"},
{"MP_STDOUTMODE", ATTR_ALWAYS_ALLOWED, "STDOUT Mode",
"Specify how application's stdio output is handled (MP_STDOUTMODE)",
"unordered", "ordered|unordered"},
/*
* Diagnostic tab related attributes
*/
{"MP_PMDLOG", ATTR_ALWAYS_ALLOWED, "Create PMD Log",
"Specify if PE diagnostic messages are logged", "no", "yes|no"},
{"MP_PRINTENV", ATTR_ALWAYS_ALLOWED, "Print Environment",
"Specify if PE environment variables are printed (MP_PRINTENV)", "no", "yes|no"},
{"MP_PRIORITY_LOG", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Log Co-scheduler Messages",
"Specify if messages are logged to co-scheduler log (MP_PRIORITY_LOG)", "yes", "yes|no"},
{"MP_INFOLEVEL", ATTR_ALWAYS_ALLOWED, "Message Level",
"Specify level of PE message reporting (MP_INFOLEVEL)", "Warning",
"Error|Warning|Informational|Diagnostic|Diagnostic level 4|Diagnostic level 5|Diagnostic level 6"},
{"MP_LAPI_TRACE_LEVEL", ATTR_ALWAYS_ALLOWED, "LAPI Trace Level",
"Specify level of LAPI trace (MP_LAPI_TRACE_LEVEL)", "0", "0|1|2|3|4|5"},
{"MP_STATISTICS", ATTR_ALWAYS_ALLOWED, "MPI Statistics",
"Obtain communication statistics for user space jobs (MP_STATISTICS)", "no", "yes|no|print"},
/*
* Debug tab related attributes
*/
{"MP_DEBUG_NOTIMEOUT", ATTR_ALWAYS_ALLOWED, "Suppress Timeout",
"Specify if debugger can attach without causing application timeout (MP_DEBUG_NOTIMEOUT)",
"no", "yes|no"},
{"MP_COREFILE_SIGTERM", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Corefile on SIGTERM",
"Specify if corefile generated on SIGTERM (MP_COREFILE_SIGTERM)", "no", "yes|no"},
{"MP_EUIDEVELOP", ATTR_ALWAYS_ALLOWED, "MPI Parameter Checking",
"Specify level of MPI parameter checking (MP_EUIDEVELOP)", "no", "yes|no|debug|minimum"},
{"MP_COREFILE_FORMAT", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY,
"Specify core file format or name of lightweight core file (MP_COREFILE_FORMAT)",
"standard", "standard|STDERR"},
/*
* System tab related attributes
*/
{"MP_ADAPTER_USE", ATTR_ALWAYS_ALLOWED, "Exclusive Adapter Use",
"Specify how node's adapter should be used (MP_ADAPTER_USE)", "shared", "dedicated|shared"},
{"MP_CPU_USE", ATTR_ALWAYS_ALLOWED, "Exclusive CPU Use",
"Specify how node's CPU should be used (MP_CPU_USE)", "multiple", "multiple|unique"},
{"MP_EUIDEVICE", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Adapter",
"Specify adapter to use for message passing (MP_EUIDEVICE)",
"", "en0|fi0|tr0|sn_all|sn_single|ml0"},
{"MP_EUIDEVICE", ATTR_FOR_LINUX | ATTR_FOR_ALL_PROXY, "Adapter",
"Specify adapter to use for message passing (MP_EUIDEVICE)", "", "ethx|sn_all|sn_single"},
{"MP_EUILIB", ATTR_FOR_ALL_OS | ATTR_FOR_ALL_PROXY, "Communications Subsystem",
"Communications susbsystem to be used (MP_EUILIB)", "ip", "ip|us"},
{"MP_INSTANCES", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL, "Window or IP Instances",
"Number of user space windows or IP addresses to assign (MP_INSTANCES)", "", "|max"},
{"MP_RETRY", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL, "Node Retry Interval",
"Period in seconds between node allocation retries (MP_RETRY)", "0", "|,wait"},
/*
* Node tab related attributes
*/
{"MP_PGMMODEL", ATTR_ALWAYS_ALLOWED, "Programming Model",
"Specify programming model (MP_PGMMODEL)", "spmd", "spmd|mpmd"},
/*
* Performance tab related attributes
*/
{"MP_CC_SCRATCH_BUF", ATTR_ALWAYS_ALLOWED, "Fastest Collectives",
"Specify if fastest collective algorithm is used (MP_CC_SCRATCH_BUF)", "yes", "yes|no"},
{"MP_CSS_INTERRUPT", ATTR_ALWAYS_ALLOWED, "Packets Generate Interrupts",
"Specify if arriving packets generate interrupts (MP_CSS_INTERRUPT)", "no", "yes|no"},
{"MP_PRIORITY_NTP", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Coscheduler disables NTP",
"Specify if PE co-scheduler turns NTP off (MP_PRIORITY_NTP)", "no", "yes|no"},
{"MP_SHARED_MEMORY", ATTR_ALWAYS_ALLOWED, "Use Shared Memory",
"Specify is shared memory used for communication (MP_SHARED_MEMORY)", "yes", "yes|no"},
{"MP_SINGLE_THREAD", ATTR_ALWAYS_ALLOWED, "Single MPI Thread",
"Specify if application has single thread with MPI calls (MP_SINGLE_THREAD)",
"no", "yes|no"},
{"MP_WAIT_MODE", ATTR_ALWAYS_ALLOWED, "Wait Mode",
"Specify thread behavior when waiting for messages (MP_WAIT_MODE)",
"poll", "nopoll|poll|sleep|yield"},
{"MP_TASK_AFFINITY", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Task Affinity",
"Specify task affinity constraints (MP_TASK_AFFINITY)", "", "|SNI|-1"},
{"MP_NEWJOB", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL, "Maintain Partition",
"Maintain LoadLeveler parttition for multiple job steps (MP_NEWJOB)", "no", "yes|no"},
{"MP_USE_BULK_XFER", ATTR_FOR_AIX | ATTR_FOR_PE_WITH_LL, "Use Bulk Transfer",
"Exploit high performance switch data transfer (MP_USE_BULK_XFER)", "no", "yes|no"},
/*
* Miscellaneous tab related attributes
*/
{"MP_HINTS_FILTERED", ATTR_ALWAYS_ALLOWED, "Hints Filtered",
"Specify if MPI info objects reject hints (MP_HINTS_FILTERED)", "yes", "yes|no"},
{"MP_CLOCK_SOURCE", ATTR_ALWAYS_ALLOWED, "Clock Source",
"Specify if high performance switch clock is time source (MP_CLOCK_SOURCE)", "", "|OS"},
{"MP_MSG_API", ATTR_ALWAYS_ALLOWED, "Message Passing API",
"Specify message passing API used by application (MP_MSG_API)",
"MPI", "MPI|LAPI|MPI_LAPI|MPI,LAPI"},
{"MP_TLP_REQUIRED", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Large Page Check",
"Specify if PE checks if application was compiled with large pages (MP_TLP_REQUIRED)",
"none", "none|warn|kill"},
{"MP_CKPTDIR_PER_TASK", ATTR_FOR_AIX | ATTR_FOR_ALL_PROXY, "Checkpoint Directory Per Task",
"Specify if separate checkpoint directory per task (MP_CKPTDIR_PER_TASK)", "no"}
};
int_launch_attr int_attrs[] = {
/*
* I/O tab related attributes
*/
{"MP_IO_BUFFER_SIZE", ATTR_ALWAYS_ALLOWED, "MPI I/O Buffer Size",
"Specify default buffer size for MPI-IO (MPI_IO_BUFFER_SIZE)", 8192, 1, 0x8000000},
/*
* Node tab related attributes
*/
{"MP_PROCS", ATTR_ALWAYS_ALLOWED,
"Number of Tasks", "Specify number of program tasks (MP_PROCS)", 1, 1, INT_MAX},
{"MP_NODES", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL,
"Number of Nodes", "Number of nodes to allocate (MP_NODES)", 1, 1, INT_MAX},
{"MP_TASKS_PER_NODE", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL, "Tasks per Node",
"Number of tasks to assign to a node (MP_TASKS_PER_NODE)", 1, 1, INT_MAX},
{"MP_RETRYCOUNT", ATTR_FOR_ALL_OS | ATTR_FOR_PE_WITH_LL, "Node Retry Count",
"Number of times to retry node allocation (MP_RETRYCOUNT)", 1, 1, INT_MAX},
/*
* Performance tab related attributes
*/
{"MP_ACK_THRESH", ATTR_ALWAYS_ALLOWED, "Acknowledgment Threshold",
"Specify packet acknowledgment threshhold (MP_ACK_THRESH)", 30, 1, 31},
{"MP_EAGER_LIMIT", ATTR_ALWAYS_ALLOWED, "Eager Limit",
"Specify rondezvous protocol message size threshhold (MP_EAGER_LIMIT)",
0x10000, 1, INT_MAX},
{"MP_MSG_ENVELOPE_BUF", ATTR_ALWAYS_ALLOWED, "Envelope Buffer Size",
"Specify size of message envelope buffer (MP_MSG_ENVELOPE_BUF)",
0x800000, 1000001, INT_MAX},
{"MP_POLLING_INTERVAL", ATTR_ALWAYS_ALLOWED, "MPI Polling Interval",
"Specify PE polling interval (MP_POLLING_INTERVAL)", 400000, 1, INT_MAX},
{"PE_RDMA_COUNT", ATTR_ALWAYS_ALLOWED, "Number of rCtx Blocks",
"Specify number of rCxt blocks (MP_RDMA_COUNT)", 0, 0, INT_MAX},
{"PE_RDMA_COUNT_2", ATTR_ALWAYS_ALLOWED, "Number of rCtx Blocks",
"Specify number of rCxt blocks (MP_RDMA_COUNT)", 0, 0, INT_MAX},
{"MP_RETRANSMIT_INTERVAL", ATTR_ALWAYS_ALLOWED, "MPI Retransmit Interval",
"Specify interval to check if retransmit neeeded (MP_RETRANSMIT_INTERVAL)",
10000, 1000, INT_MAX},
{"MP_REXMIT_BUF_CNT", ATTR_ALWAYS_ALLOWED, "Retransmit Buffers",
"Specify number of retransmit buffers per task (MP_REXMIT_BUF_CNT)", 128, 1, INT_MAX},
{"MP_REXMIT_BUF_SIZE", ATTR_ALWAYS_ALLOWED, "Maximum LAPI Buffered Message",
"Specify maximum LAPI buffer size (MP_REXMIT_BUF_SIZE)", 65568, 1, INT_MAX},
{"MP_UDP_PACKET_SIZE", ATTR_ALWAYS_ALLOWED,
"UDP Packet Size", "Specify UDP packet size (MP_UDP_PACKET_SIZE)", 1, 1, INT_MAX},
{"PE_BUFFER_MEM", ATTR_ALWAYS_ALLOWED, "Buffer Memory",
"Specify size of preallocated early arrival buffer (MP_BUFFER_MEM)",
0x4000000, 0, INT_MAX},
{"MP_BULK_MIN_MSG_SIZE", ATTR_FOR_AIX | ATTR_FOR_PE_WITH_LL, "Minimum Bulk Message Size",
"Minimum message size to use bulk transfer path (MP_BULK_MIN_MSG_SIZE)",
153600, 0, INT_MAX},
/*
* Miscellaneous tab related attributes
*/
{"MP_THREAD_STACKSIZE", ATTR_ALWAYS_ALLOWED, "Additional MPI Thread Stack Size",
"Specify additional stack size for MPI service thread (MP_THREAD_STACKSIZE)",
0, 0, INT_MAX},
{"MP_PULSE", ATTR_ALWAYS_ALLOWED, "Pulse Interval",
"Specify interval PE checks remote modes (MP_PULSE)", 600, 0, INT_MAX},
{"MP_TIMEOUT", ATTR_ALWAYS_ALLOWED, "Connection Timeout",
"Specify timeout limit for connecting to remote nodes (MP_TIMEOUT)", 150, 1, INT_MAX},
/*
* Additional integer attributes used only for validation of attribute
* field contents. These are used in cases where a single integer,
* string or enumerated attribute is insufficient for validating
* an attribute field, for instance where an allowable value is an
* enumeration or integer value, or where a field contains
* multiple sub-fields, such as mmm,nnn. For these attributes,
* short name, long name and default value are irrelevant.
*/
{"MP_INSTANCES_INT", ATTR_ALWAYS_ALLOWED, "???", "???", 0, 0, INT_MAX},
};
long_int_launch_attr long_int_attrs[] = {
{"PE_BUFFER_MEM_MAX", ATTR_ALWAYS_ALLOWED, "???",
"Specify maximum size of early arrival buffer (MP_BUFFER_MEM)", 0, 0, 0x7fffffffffffffffLL}
};
/*************************************************************************/
/* Proxy command handlers */
/*************************************************************************/
int
start_daemon(int trans_id, int nargs, char *args[])
{
/*
* Proxy startup. Allocate a list to contain machine definitions, where
* each unique hostfile defines a new machine. Create a thread that
* will monitor started poe processes for termination and will
* notify the front end that the poe process has terminated.
*/
pthread_attr_t term_thread_attrs;
struct passwd *userinfo;
TRACE_ENTRY;
print_message_args(nargs, args);
userinfo = getpwuid(getuid());
my_username = strdup(userinfo->pw_name);
base_id = strtol(args[1], NULL, 10);
nodes = HashCreate(1024);
pthread_attr_init(&thread_attrs);
pthread_attr_setdetachstate(&thread_attrs, PTHREAD_CREATE_DETACHED);
pthread_attr_init(&term_thread_attrs);
pthread_attr_setdetachstate(&term_thread_attrs, PTHREAD_CREATE_JOINABLE);
pthread_create(&termination_thread, &term_thread_attrs, zombie_reaper, NULL);
#ifdef HAVE_LLAPI_H
strcpy(ibmll_proxy_base_id_string, args[1]);
if (use_load_leveler) {
load_load_leveler_library(trans_id);
}
#endif
send_ok_event(trans_id);
TRACE_EXIT;
return PROXY_RES_OK;
}
int
define_model(int trans_id, int nargs, char *args[])
{
/*
* Send the attribute definitions, launch attribute definitions,
* and element definitions known by this proxy to the GUI.
*/
int flags;
TRACE_ENTRY;
flags = 0;
#ifdef __linux__
flags = flags | ATTR_FOR_LINUX;
#endif
#ifdef _AIX
flags = flags | ATTR_FOR_AIX;
#endif
if (use_load_leveler) {
flags = flags | ATTR_FOR_PE_WITH_LL;
}
else {
flags = flags | ATTR_FOR_PE_STANDALONE;
}
print_message_args(nargs, args);
send_string_attrs(trans_id, flags);
send_int_attrs(trans_id, flags);
send_long_int_attrs(trans_id, flags);
send_enum_attrs(trans_id, flags);
send_local_default_attrs(trans_id);
send_ok_event(trans_id);
TRACE_EXIT;
return PROXY_RES_OK;
}
int
start_events(int trans_id, int nargs, char *args[])
{
/*
* Send the complete machine state to the GUI. In PE standalone case,
* there is only a machine and a queue, since nodes are not known
* until an application is invoked and a hostlist is provided. In the
* PE/LoadLeveler case, query LoadLeveler to get the set of nodes
* that are part of the cluster (machine) and send new node events
* to the GUI for each node.
*/
TRACE_ENTRY;
print_message_args(nargs, args);
start_events_transid = trans_id;
state_events_active = 1;
if (use_load_leveler) {
#ifdef HAVE_LLAPI_H
/*
* If LoadLeveler is used, then PE jobs will use the machine and queue that the node where this
* proxy is running is a member of, so machine_id and queue_id won't be generated here
*/
/* Create thread to monitor LoadLeveler clusters of machines (these are nodes in a machine in ptp lingo). */
pthread_attr_init(&monitor_LoadLeveler_nodes_thread_attr);
pthread_attr_setdetachstate(&monitor_LoadLeveler_nodes_thread_attr,
PTHREAD_CREATE_DETACHED);
pthread_create(&monitor_LoadLeveler_nodes_thread, &monitor_LoadLeveler_nodes_thread_attr,
monitor_LoadLeveler_nodes, NULL);
/* Create thread to monitor LoadLeveler jobs in clusters. */
pthread_attr_init(&monitor_LoadLeveler_jobs_thread_attr);
pthread_attr_setdetachstate(&monitor_LoadLeveler_jobs_thread_attr, PTHREAD_CREATE_DETACHED);
pthread_create(&monitor_LoadLeveler_jobs_thread, &monitor_LoadLeveler_jobs_thread_attr,
monitor_LoadLeveler_jobs, NULL);
#endif
}
else {
char id_str[12];
char base_id_str[12];
machine_id = generate_id();
queue_id = generate_id();
sprintf(base_id_str, "%d", base_id);
sprintf(id_str, "%d", machine_id);
enqueue_event(proxy_new_machine_event(trans_id, base_id_str, id_str,
"default", MACHINE_STATE_UP));
sprintf(id_str, "%d", queue_id);
enqueue_event(proxy_new_queue_event(trans_id, base_id_str, id_str,
"default", QUEUE_STATE_NORMAL));
/*
* Look for poe jobs already running on system
*/
discover_jobs();
}
/*
* Do not send an acknowledgment for the start_events command here
* since asynchronous event notifications for new machine, node,
* process, job, as well as state changes in those objects use the
* start_events transaction id. If an ack is sent here, that
* transaction id is invalid and any events using that id will cause
* an exception in the front end state machine loop, terminating the
* state machine.
*/
TRACE_EXIT;
return PROXY_RES_OK;
}
int
run(int trans_id, int nargs, char *args[])
{
/*
* Submit a Parallel Environment application for execution.
* This function:
* 1) parses the passed argument list
* 2) sets the current working directory
* 3) sets all environment variables passed in the argument list
* 4) creates pipes to handle stdio to/from the application
* 5) sets up I/O handlers for stdio
* 6) sets up the argument list for the application
* 7) starts the application by fork/exec
* 8) starts a monitoring thread whose purpose is to wait for the
* attach.cfg file to be created, then update job state to running.
* 9) notifies the front end that a new job has been submitted
*/
char *execname;
char *execdir;
char *argp;
char *jobid;
char *cp;
char *cwd;
jobinfo *job;
int i;
int label_io;
int split_io;
int status;
int stdout_pipe[2];
int stderr_pipe[2];
char *stdin_path;
char *stdout_path;
char *stderr_path;
pid_t pid;
char *mp_buffer_mem;
char *mp_buffer_mem_max;
int mp_buffer_mem_set;
char mp_buffer_mem_value[50];
char *mp_rdma_count;
char *mp_rdma_count_2;
int mp_rdma_count_set;
char mp_rdma_count_value[50];
int redirect;
TRACE_ENTRY;
print_message_args(nargs, args);
jobid = NULL;
execdir = NULL;
execname = NULL;
cwd = NULL;
argp = NULL;
label_io = 0;
split_io = 0;
stdin_path = NULL;
stdout_path = NULL;
stderr_path = NULL;
mp_buffer_mem = "";
mp_buffer_mem_max = "";
mp_buffer_mem_set = 0;
mp_buffer_mem_value[0] = '\0';
mp_rdma_count = "";
mp_rdma_count_2 = "";
mp_rdma_count_set = 0;
mp_rdma_count_value[0] = '\0';
/*
* Process arguments passed to this function
*/
TRACE_DETAIL("+++ Parsing arguments\n");
for (i = 0; args[i] != NULL; i++) {
/*
* Check if this is a PE environment variable, for instance
* MP_PROCS=2
*/
if (strncmp(args[i], "MP_", 3) == 0) {
cp = strchr(args[i], '=');
if (cp != NULL) {
/*
* The MP_LABELIO and MP_HOSTFILE environment variables are
* used by the proxy so process them here.
*/
*cp = '\0';
if (strcmp(args[i], "MP_LABELIO") == 0) {
if (strcmp((cp + 1), "yes") == 0) {
label_io = 1;
}
}
else if ((!use_load_leveler) && (strcmp(args[i], "MP_HOSTFILE")) == 0) {
FILE *hostlist;
/*
* Process host file, building new machine
* configuration if this is a unique hostfile.
* If LoadLeveler is used, then don't process hostfile since node status
* is handled by tracking Loadleveler's view of node status.
*/
hostlist = fopen((cp + 1), "r");
if (hostlist != NULL) {
update_nodes(trans_id, hostlist);
fclose(hostlist);
}
}
/*
* If MP_INFOLEVEL is > 1 char, then convert from label
* name to real setting value. Note that the original
* setting is overwritten, which is ok since the
* new value is guaranteed to be shorter than the
* original value.
*/
else if ((strcmp(args[i], "MP_INFOLEVEL") == 0) && (strlen(cp + 1) != 1)) {
int n;
for (n = 0; n < sizeof mp_infolevel_labels / sizeof(char *); n++) {
if (strcmp(cp + 1, mp_infolevel_labels[n]) == 0) {
break;
}
}
if (n < (sizeof mp_infolevel_labels / sizeof(char *))) {
sprintf(cp + 1, "%d", n);
}
}
/*
* Restore the '=' in the environment variable setting
*/
*cp = '=';
}
}
/*
* Check if this is a variable set by the PE front end and handle it
* if so.
*/
else if (strncmp(args[i], "PE_", 3) == 0) {
cp = strchr(args[i], '=');
if (cp != NULL) {
*cp = '\0';
if (strcmp(args[i], "PE_STDIN_PATH") == 0) {
stdin_path = cp + 1;
}
else if (strcmp(args[i], "PE_STDOUT_PATH") == 0) {
stdout_path = cp + 1;
}
else if (strcmp(args[i], "PE_STDERR_PATH") == 0) {
stderr_path = cp + 1;
}
else if ((strcmp(args[i], "PE_SPLIT_STDOUT") == 0)
&& (strcmp(cp + 1, "yes") == 0)) {
split_io = 1;
}
/*
* The PE environment variable MP_BUFFER_MEM gets
* special handling since it is the only environment
* variable which has up to 2 parameters. These parameters
* are treated separately in the GUI to simplify
* validation. The GUI sends the attributes PE_BUFFER_MEM
* and PE_BUFFER_MEM_MAX representing these two values.
* This code must re-assemble the parameters into a proper
* MP_BUFFER_MEM setting and ensure that the environment
* variable gets set. Since the values passed by the
* GUI must not be passed directly to the application, the
* front end identifies them with a PE_ prefix instead of
* the usual MP_ prefix.
* MP_RDMA_COUNT gets similar treatment, using GUI attributes
* PE_RDMA_COUNT and PE_RDMA_COUNT_2
*/
else if (strcmp(args[i], "PE_BUFFER_MEM") == 0) {
mp_buffer_mem = cp + 1;
mp_buffer_mem_set = 1;
}
else if (strcmp(args[i], "PE_BUFFER_MEM_MAX") == 0) {
mp_buffer_mem_max = cp + 1;
mp_buffer_mem_set = 1;
}
else if (strcmp(args[i], "PE_RDMA_COUNT") == 0) {
mp_rdma_count = cp + 1;
mp_rdma_count_set = 1;
}
else if (strcmp(args[i], "PE_RDMA_COUNT_2") == 0) {
mp_rdma_count_2 = cp + 1;
mp_rdma_count_set = 1;
}
}
}
else {
/*
* Look for general launch configuration variables and handle
* appropriately
*/
cp = strchr(args[i], '=');
if (cp != NULL) {
*cp = '\0';
cp = cp + 1;
if (strcmp(args[i], JOB_SUB_ID_ATTR) == 0) {
jobid = strdup(cp);
}
else if (strcmp(args[i], JOB_EXEC_NAME_ATTR) == 0) {
execname = strdup(cp);
}
else if (strcmp(args[i], JOB_WORKING_DIR_ATTR) == 0) {
cwd = strdup(cp);
}
else if (strcmp(args[i], JOB_PROG_ARGS_ATTR) == 0) {
argp = strdup(cp);
}
else if (strcmp(args[i], JOB_EXEC_PATH_ATTR) == 0) {
execdir = strdup(cp);
}
else if (strcmp(args[i], JOB_ENV_ATTR) == 0) {
}
*(cp - 1) = '=';
}
}
}
if (jobid == NULL) {
post_error(trans_id, PROXY_EV_RT_SUBMITJOB_ERROR, "Missing ID on job submission");
TRACE_EXIT;
return PROXY_RES_OK;
}
if (cwd != NULL) {
status = chdir(cwd);
if (status == -1) {
post_submitjob_error(trans_id, jobid,
"Invalid working directory");
TRACE_EXIT;
return PROXY_RES_OK;
}
}
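/*
 * Reassemble the GUI attributes PE_BUFFER_MEM/PE_BUFFER_MEM_MAX and
 * PE_RDMA_COUNT/PE_RDMA_COUNT_2 into the single environment variable
 * settings poe expects, e.g. "MP_BUFFER_MEM=2M,64M" (values shown are
 * illustrative only); the comma and second value are appended only when
 * the second value is non-empty.
 */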
if (mp_buffer_mem_set) {
snprintf(mp_buffer_mem_value, sizeof mp_buffer_mem_value,
"MP_BUFFER_MEM=%s%s%s", mp_buffer_mem, (mp_buffer_mem_max[0]
== '\0') ? "" : ",", mp_buffer_mem_max);
}
if (mp_rdma_count_set) {
snprintf(mp_rdma_count_value, sizeof mp_rdma_count_value,
"MP_RDMA_COUNT=%s%s%s", mp_rdma_count, (mp_rdma_count_2[0]
== '\0') ? "" : ",", mp_rdma_count_2);
}
if (execdir == NULL) {
post_submitjob_error(trans_id, jobid, "No executable directory specified");
TRACE_EXIT;
return PROXY_RES_OK;
}
if (execname == NULL) {
post_submitjob_error(trans_id, jobid, "No executable specified");
TRACE_EXIT;
return PROXY_RES_OK;
}
job = (jobinfo *) malloc(sizeof(jobinfo));
malloc_check(job, __FUNCTION__, __LINE__);
TRACE_DETAIL("+++ Setting up stdio pipe descriptors\n");
TRACE_DETAIL_V("+++ stdout path: %s\n", stdout_path == NULL ? "NULL" : stdout_path);
TRACE_DETAIL_V("+++ stderr path: %s\n", stderr_path == NULL ? "NULL" : stderr_path);
/*
* Set up pipes or files to handle stdio for application. If the path
* for a file is null, then that file descriptor will be redirected to
* a pipe.
* Handle file descriptor setup for stdout first
*/
status =
setup_stdio_fd(trans_id, jobid, stdout_pipe, stdout_path, "stdout", &(job->stdout_fd), &redirect);
if (status == -1) {
TRACE_EXIT;
return PROXY_RES_OK;
}
job->stdout_redirect = redirect;
TRACE_DETAIL_V("stdout FD %d %d\n", stdout_pipe[0], stdout_pipe[1]);
status =
setup_stdio_fd(trans_id, jobid, stderr_pipe, stderr_path, "stderr", &(job->stderr_fd), &redirect);
if (status == -1) {
TRACE_EXIT;
return PROXY_RES_OK;
}
job->stderr_redirect = redirect;
TRACE_DETAIL_V("stderr FD %d %d\n", stderr_pipe[0], stderr_pipe[1]);
job->submit_jobid = jobid;
job->label_io = label_io;
job->split_io = split_io;
job->stdout_info.read_buf = (char *) malloc(READ_BUFFER_SIZE);
malloc_check(job->stdout_info.read_buf, __FUNCTION__, __LINE__);
job->stdout_info.write_buf = (char *) malloc(STDIO_WRITE_BUFSIZE);
malloc_check(job->stdout_info.write_buf, __FUNCTION__, __LINE__);
job->stdout_info.allocated = STDIO_WRITE_BUFSIZE;
job->stdout_info.remaining = STDIO_WRITE_BUFSIZE - 1;
job->stdout_info.cp = job->stdout_info.write_buf;
job->stdout_info.write_func = send_stdout;
job->stderr_info.read_buf = (char *) malloc(READ_BUFFER_SIZE);
malloc_check(job->stderr_info.read_buf, __FUNCTION__, __LINE__);
job->stderr_info.write_buf = (char *) malloc(STDIO_WRITE_BUFSIZE);
malloc_check(job->stderr_info.write_buf, __FUNCTION__, __LINE__);
job->stderr_info.allocated = STDIO_WRITE_BUFSIZE;
job->stderr_info.remaining = STDIO_WRITE_BUFSIZE - 1;
job->stderr_info.cp = job->stderr_info.write_buf;
job->stderr_info.write_func = send_stderr;
job->discovered_job = 0;
TRACE_DETAIL("+++ Forking child process\n");
pid = fork();
if (pid == 0) {
char **argv;
char **envp;
char poe_target[_POSIX_PATH_MAX * 2 + 2];
int max_fd;
/*
* Set up executable argument list and environment variables first
* since there is a small timing window where a second run command
* could be processed while this process is still setting up parameters
* for the first run, resulting in modification of the first program's
* parameters and environment variables.
*/
TRACE_DETAIL("+++ Creating poe exec() parameter list\n");
argv = create_exec_parmlist(POE, poe_target, argp);
envp = create_env_array(args, split_io, mp_buffer_mem_value, mp_rdma_count_value);
/*
* Connect stdio to pipes or files owned by parent process (the
* proxy)
*/
TRACE_DETAIL("+++ Setting up poe stdio file descriptors\n");
status =
setup_child_stdio(trans_id, jobid, STDOUT_FILENO, job->stdout_redirect, &(job->stdout_fd),
stdout_pipe);
if (status == -1) {
TRACE_EXIT;
exit(1);
}
status =
setup_child_stdio(trans_id, jobid, STDERR_FILENO, job->stderr_redirect, &(job->stderr_fd),
stderr_pipe);
if (status == -1) {
TRACE_EXIT;
exit(1);
}
/*
* Close all open file descriptors above stderr.
*/
max_fd = sysconf(_SC_OPEN_MAX);
for (i = STDERR_FILENO + 1; i < max_fd; i++) {
close(i);
}
/*
* Invoke the application as a target of 'poe'
*/
snprintf(poe_target, sizeof poe_target, "%s/%s", execdir, execname);
poe_target[sizeof poe_target - 1] = '\0';
TRACE_DETAIL_V("+++ Ready to invoke %s\n", poe_target);
i = 0;
while (envp[i] != NULL) {
TRACE_DETAIL_V("Target env[%d]: %s\n", i, envp[i]);
i = i + 1;
}
i = 0;
while (argv[i] != NULL) {
TRACE_DETAIL_V("Target arg[%d]: %s\n", i, argv[i]);
i = i + 1;
}
status = execve(POE, argv, envp);
print_message(ERROR_MESSAGE, "%s failed to execute: %s\n", argv[0], strerror(errno));
post_submitjob_error(trans_id, jobid, "Exec failed");
TRACE_EXIT;
exit(1);
}
else {
if (pid == -1) {
post_submitjob_error(trans_id, jobid, "Fork failed");
return PROXY_RES_OK;
}
else {
char jobname[40];
char queue_id_str[12];
char jobid_str[12];
if (!job->stdout_redirect) {
close(stdout_pipe[1]);
}
if (!job->stderr_redirect) {
close(stderr_pipe[1]);
}
/*
* Update job information for application and notify front end
* that job is started.
*/
job->tasks = NULL;
job->poe_pid = pid;
job->task0_pid = -1;
job->submit_time = time(NULL);
job->proxy_jobid = generate_id();
TRACE_DETAIL_V("+++ Created poe process pid %d for jobid %d\n", job->poe_pid,
job->proxy_jobid);
/*
* Create thread to watch for application's attach.cfg file to
* be created
*/
pthread_create(&job->startup_thread, &thread_attrs, startup_monitor, job);
AddToList(jobs, job);
snprintf(jobname, sizeof jobname, "%s.%s", my_username, job->submit_jobid);
sprintf(queue_id_str, "%d", queue_id);
sprintf(jobid_str, "%d", job->proxy_jobid);
jobname[sizeof jobname - 1] = '\0';
enqueue_event(proxy_new_job_event(start_events_transid,
queue_id_str, jobid_str, jobname, JOB_STATE_INIT,
job->submit_jobid));
}
}
send_ok_event(trans_id);
TRACE_EXIT;
return PROXY_RES_OK;
}
int
halt_events(int trans_id, int nargs, char *args[])
{
/*
* Set flag indicating events are shut down, and send OK acks for
* both the start_events and stop_events commands now. Once the
* ack for the start_events command is sent, the start_events
* transaction id is no longer valid.
*/
TRACE_ENTRY;
print_message_args(nargs, args);
events_enabled = 0;
send_ok_event(start_events_transid);
send_ok_event(trans_id);
TRACE_EXIT;
return PROXY_RES_OK;
}
int
terminate_job(int trans_id, int nargs, char *args[])
{
/*
* Terminate the application. The initial kill is SIGTERM. If that
* doesn't work, then a separate thread issues a kill -9 one minute
* later.
*/
pthread_t kill_tid;
jobinfo *job;
int job_ident = -1;
int i;
TRACE_ENTRY;
print_message_args(nargs, args);
pthread_mutex_lock(&job_lock);
SetList(jobs);
job = GetListElement(jobs);
for (i = 0; i < nargs; i++) {
if (proxy_test_attribute(JOB_ID_ATTR, args[i])) {
job_ident = proxy_get_attribute_value_int(args[i]);
}
}
while (job != NULL) {
if (job_ident == job->proxy_jobid) {
break;
}
job = GetListElement(jobs);
}
if (job != NULL) {
kill(job->poe_pid, SIGTERM);
/*
* Create a thread to kill the process with kill(9) if the
* target process is still around after 1 minute.
*/
pthread_create(&kill_tid, &thread_attrs, kill_process, (void *) job->poe_pid);
}
pthread_mutex_unlock(&job_lock);
send_ok_event(trans_id);
TRACE_EXIT;
return PROXY_RES_OK;
}
int
quit(int trans_id, int nargs, char *args[])
{
void *thread_status;
TRACE_ENTRY;
print_message_args(nargs, args);
#ifdef HAVE_LLAPI_H
if (use_load_leveler) {
dlclose(ibmll_libpath_handle);
state_shutdown_requested = 1;
}
#endif
pthread_cancel(termination_thread);
pthread_join(termination_thread, &thread_status);
enqueue_event(proxy_shutdown_event(trans_id));
if (run_miniproxy) {
redirect_io();
}
shutdown_requested = 1;
TRACE_EXIT;
return PROXY_RES_OK;
}
/*************************************************************************/
/* Service threads */
/*************************************************************************/
void *
startup_monitor(void *job_ident)
{
/*
* Wait for the attach.cfg file for this task to be completely filled
* in, then read and parse it to build the map of tasks to nodes for
* this application
*/
char tasklist_path[_POSIX_PATH_MAX + 1];
char *cfginfo;
FILE *cfgfile;
int numtasks;
taskinfo *tasks;
taskinfo *taskp;
jobinfo *job;
proxy_msg *msg;
int status;
int done;
time_t last_mtime;
struct stat fileinfo;
int i;
List *new_nodes;
char jobid_str[30];
char procid_str[12];
char procname[20];
TRACE_ENTRY;
job = (jobinfo *) job_ident;
if (job->discovered_job) {
new_nodes = NewList();
}
snprintf(tasklist_path, sizeof tasklist_path, "/tmp/.ppe.%d.attach.cfg", job->poe_pid);
tasklist_path[sizeof tasklist_path - 1] = '\0';
print_message(TRACE_DETAIL_MESSAGE, "Waiting for task config file %s\n", tasklist_path);
last_mtime = -1;
/*
* Wait for the attach.cfg file to be created and to not be modified
* within the last second before trying to read and parse it
*/
for (;;) {
sleep(1);
status = stat(tasklist_path, &fileinfo);
if (status == 0) {
if ((last_mtime >= job->submit_time) && (last_mtime == fileinfo.st_mtime)) {
break;
}
else {
last_mtime = fileinfo.st_mtime;
}
}
}
TRACE_DETAIL("+++ Have task config file\n");
done = 0;
while (!done) {
char *lineptr;
char *tokenptr;
char *linep;
char *p;
int tasknum;
int numlines;
stat(tasklist_path, &fileinfo);
cfgfile = fopen(tasklist_path, "r");
if (cfgfile != NULL) {
cfginfo = (char *) malloc(fileinfo.st_size + 1);
malloc_check(cfginfo, __FUNCTION__, __LINE__);
status = fread(cfginfo, 1, fileinfo.st_size, cfgfile);
fclose(cfgfile);
cfginfo[status] = '\0';
if (status == fileinfo.st_size) {
/*
* First line contains version info which we don't care
* about, and so it is ignored
*/
p = strtok_r(cfginfo, ";", &lineptr);
if (p == NULL) {
break;
}
/*
* Second line contains number of tasks in job
*/
p = strtok_r(NULL, ";", &lineptr);
if (p == NULL) {
break;
}
linep = p;
p = strtok_r(linep, " ", &tokenptr);
numtasks = atoi(p);
numlines = 0;
TRACE_DETAIL_V("+++ Application has %d tasks\n", numtasks);
tasks = (taskinfo *) calloc(numtasks, sizeof(taskinfo));
malloc_check(tasks, __FUNCTION__, __LINE__);
/*
* Now read one line per task and build task list
*/
p = strtok_r(NULL, ";", &lineptr);
if (p == NULL) {
break;
}
while (!done) {
char *cp;
linep = p;
TRACE_DETAIL_V("Processing %s", linep);
/*
* get task index
*/
p = strtok_r(linep, " ", &tokenptr);
if (p == NULL) {
break;
}
tasknum = atoi(p);
taskp = &tasks[tasknum];
taskp->proxy_taskid = generate_id();
#ifdef __linux__
/*
* skip ignored token
*/
p = strtok_r(NULL, " ", &tokenptr);
if (p == NULL) {
break;
}
#endif
/*
* get node IP address
*/
p = strtok_r(NULL, " ", &tokenptr);
if (p == NULL) {
break;
}
taskp->ipaddr = strdup(p);
/*
* get task hostname. hostname will be truncated to
* short form if LoadLeveler is not being used. If LoadLeveler
* is used, its node list uses full host names so truncation is
* not allowed.
*/
p = strtok_r(NULL, " ", &tokenptr);
if (p == NULL) {
break;
}
if (!use_load_leveler) {
cp = strchr(p, '.');
if (cp != NULL) {
*cp = '\0';
}
}
taskp->hostname = strdup(p);
/*
* get task pid
*/
p = strtok_r(NULL, " ", &tokenptr);
if (p == NULL) {
break;
}
taskp->task_pid = atoi(p);
/*
* get task parent pid
*/
p = strtok_r(NULL, " ", &tokenptr);
if (p == NULL) {
break;
}
taskp->parent_pid = atoi(p);
numlines = numlines + 1;
if (numlines >= numtasks) {
done = 1;
break;
}
/*
* Read next line of task info
*/
p = strtok_r(NULL, ";", &lineptr);
if (p == NULL) {
break;
}
}
/*
* If numlines == numtasks then the file was complete and
* we are done with parsing the file. Otherwise clean up
* the list and try again
*/
if (!done) {
free(cfginfo);
delete_task_list(tasknum + 1, tasks);
}
}
else {
/*
 * File was not completely read (it has already been closed
 * above). Discard the partial buffer, then try again.
 */
free(cfginfo);
sleep(1);
}
}
}
/*
* attach.cfg file is complete and has been parsed. Notify the
* front end that the job is running
*/
job->tasks = tasks;
job->numtasks = numtasks;
send_job_state_change_event(start_events_transid, job->proxy_jobid, JOB_STATE_RUNNING);
/*
* For each task in the application, send a new process event to the
* GUI.
*/
taskp = tasks;
sprintf(jobid_str, "%d", ((jobinfo *) job_ident)->proxy_jobid);
msg = proxy_new_process_event(start_events_transid, jobid_str, numtasks);
if (job->discovered_job) {
/*
* If this is a job running before the proxy started, then
* there will be no hostfile. In this case, just add new
* nodes for each unique nodename found in the attach.cfg file.
*/
for (i = 0; i < numtasks; i++) {
if (find_node(taskp[i].hostname) == NULL) {
node_refcount *node;
node = add_node(taskp[i].hostname);
AddToList(new_nodes, node);
node_count = node_count + 1;
}
}
if (SizeOfList(new_nodes) > 0) {
send_new_node_list(start_events_transid, machine_id, new_nodes);
}
DestroyList(new_nodes, NULL);
}
if (use_load_leveler) {
#ifdef HAVE_LLAPI_H
NodeObject *node;
for (i = 0; i < numtasks; i++) {
if (i == 0) {
job->task0_pid = taskp->task_pid;
}
node = get_node_in_hash(my_cluster->node_hash, taskp[i].hostname);
if (node == NULL) {
print_message(ERROR_MESSAGE, "Node %s not found in node list\n", taskp->hostname);
}
else {
sprintf(procid_str, "%d", taskp[i].proxy_taskid);
sprintf(procname, "task_%d", i);
proxy_add_process(msg, procid_str, procname, PROC_STATE_RUNNING, 3);
proxy_add_int_attribute(msg, PROC_NODEID_ATTR, node->proxy_generated_node_id);
proxy_add_int_attribute(msg, PROC_INDEX_ATTR, i);
proxy_add_int_attribute(msg, PROC_PID_ATTR, taskp[i].task_pid);
}
}
#endif
}
else {
for (i = 0; i < numtasks; i++) {
node_refcount *node;
if (i == 0) {
job->task0_pid = taskp->task_pid;
}
/*
* Increment the number of tasks running on the node and send the
* new process event
*/
node = find_node(taskp[i].hostname);
if (node == NULL) {
print_message(ERROR_MESSAGE, "Node %s not found in node list\n", taskp->hostname);
}
else {
node->task_count = node->task_count + 1;
sprintf(procid_str, "%d", taskp[i].proxy_taskid);
sprintf(procname, "task_%d", i);
proxy_add_process(msg, procid_str, procname, PROC_STATE_RUNNING, 3);
proxy_add_int_attribute(msg, PROC_NODEID_ATTR, node->proxy_nodeid);
proxy_add_int_attribute(msg, PROC_INDEX_ATTR, i);
proxy_add_int_attribute(msg, PROC_PID_ATTR, taskp[i].task_pid);
}
}
}
enqueue_event(msg);
/*
* Now that all task pids are known, I/O from the application can be
* enabled since we can now map I/O to a specific pid. This is done
* here only for stdout since task mapping information is required to
* handle splitting stdout by task. Output to stderr is not split by
* task, so the stderr file handler is registered at application startup.
* If the job was a job discovered at proxy startup, there is no connection
* to stdio file descriptors, so don't register file handlers.
*/
if (!job->stdout_redirect && !job->discovered_job) {
RegisterFileHandler(job->stdout_fd, READ_FILE_HANDLER, stdout_handler, job);
}
/*
* The startup thread exits at this point, so clear the reference in
* the job info
*/
job->startup_thread = 0;
TRACE_EXIT;
return NULL;
}
void *
zombie_reaper(void *arg)
{
/*
* Watch for poe tasks started by this proxy to terminate. When a task
* terminates, clean up resources and notify the front end that a job
* has completed. Post completion status to front end. This function is
* invoked by a thread started at proxy startup, and runs until the
* proxy is shut down
*/
int status;
pid_t terminated_pmd;
struct rusage rusage_info;
jobinfo *job;
TRACE_ENTRY;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
for (;;) {
pthread_mutex_lock(&job_lock);
/*
* Scan the job list looking for jobs running before the proxy
* started. For each of those jobs, check if the poe process
* still exists. If the process has disappeared, then mark the
* job terminated.
*/
SetList(jobs);
job = GetListElement(jobs);
while (job != NULL) {
if ((job->discovered_job) && (kill(job->poe_pid, 0) != 0)) {
TRACE_DETAIL_V("+++ poe process %d Exited with unknown status\n", job->poe_pid);
send_job_state_change_event(start_events_transid,
job->proxy_jobid, JOB_STATE_TERMINATED);
send_process_state_change_event(start_events_transid, job, PROC_STATE_EXITED);
if (job->tasks != NULL) {
if (!use_load_leveler) {
update_node_refcounts(job->numtasks, job->tasks);
}
delete_task_list(job->numtasks, job->tasks);
}
RemoveFromList(jobs, job);
}
job = GetListElement(jobs);
}
/*
		 * Poll (WNOHANG, non-blocking) for any poe child process that has terminated
*/
terminated_pmd = wait3(&status, WNOHANG, &rusage_info);
if (terminated_pmd > 0) {
TRACE_DETAIL_V("+++ Pid %d exited\n", terminated_pmd);
/*
* Look for job with matching poe process pid. If found, then
* post process status to front end and clean up resources
*/
SetList(jobs);
job = GetListElement(jobs);
while (job != NULL) {
if (terminated_pmd == job->poe_pid) {
break;
}
job = GetListElement(jobs);
}
if (job != NULL) {
/*
* POE process status used to be reported to the GUI.
* Apparently, now, only job status is posted.
*/
if (status <= 128) {
TRACE_DETAIL_V("+++ %d Exited with status %08x\n", terminated_pmd, status);
send_job_state_change_event(start_events_transid,
job->proxy_jobid, JOB_STATE_TERMINATED);
send_process_state_change_event(start_events_transid, job, PROC_STATE_EXITED);
}
else {
TRACE_DETAIL_V("+++ %d signalled with status %08x\n", terminated_pmd,
status - 128);
send_job_state_change_event(start_events_transid, job->proxy_jobid,
JOB_STATE_ERROR);
send_process_state_change_event(start_events_transid, job,
PROC_STATE_EXITED_SIGNALLED);
}
/*
* Since the job is terminated, the stdio file descriptor
* pipes should be unregistered from the main polling loop
* and the file descriptors closed. However, if there
* is still data in the buffers, this data would be lost.
* So, leave the pipes registered. The polling loop
* will eventually process the data left in the pipe. If the
* last line of output does not end with a newline, that
* line may be lost
*/
if (job->tasks != NULL) {
if (!use_load_leveler) {
update_node_refcounts(job->numtasks, job->tasks);
}
delete_task_list(job->numtasks, job->tasks);
}
free(job->submit_jobid);
free(job->stdout_info.read_buf);
free(job->stderr_info.read_buf);
free(job->stdout_info.write_buf);
free(job->stderr_info.write_buf);
RemoveFromList(jobs, job);
free(job);
}
}
pthread_mutex_unlock(&job_lock);
/*
* Poll for status once per second
*/
sleep(1);
}
TRACE_EXIT;
}
static
void
redirect_io(void)
{
/*
* STDIO for running applications needs to be redirected in order to
* prevent the application from terminating with SIGPIPE for stdin
* or from blocking on stdout or stderr pipes when they fill.
* To do this, build a command line invocation for a miniproxy
* process consisting of the path prefixes for stdout and stderr
* file descriptors followed by lists of file descriptors for stdin,
* stdout and stderr, then invoke the miniproxy.
*/
jobinfo *job;
int stdin_count;
int stdout_count;
int stderr_count;
int redirected_fds;
char *miniproxy_args[3];
char *miniproxy_parmlist;
char *stdin_fds;
char *stdout_fds;
char *stderr_fds;
char num[10];
static char *miniproxy_env[] = { NULL };
/*
* Determine how many non-redirected file descriptors there are for
* stdin, stdout and stderr.
*/
TRACE_ENTRY;
stdin_count = 0;
stdout_count = 0;
stderr_count = 0;
SetList(jobs);
job = GetListElement(jobs);
redirected_fds = 0;
while (job != NULL) {
if (!job->stdin_redirect) {
stdin_count = stdin_count + 1;
redirected_fds = 1;
}
if (!job->stdout_redirect) {
stdout_count = stdout_count + 1;
redirected_fds = 1;
}
if (!job->stderr_redirect) {
stderr_count = stderr_count + 1;
redirected_fds = 1;
}
job = GetListElement(jobs);
}
if (redirected_fds) {
/*
* Allocate a string long enough to hold three pathnames, and lists of file
* descriptors for stdin, stdout and stderr, each prefixed with a file
* descriptor count. Then allocate work strings for each file descriptor list
*/
miniproxy_parmlist = (char *) malloc(PATH_MAX * 3 + stdin_count * 11 +
stdout_count * 11 + stderr_count * 11 + 30);
if (miniproxy_parmlist == NULL) {
exit(1);
}
stdin_fds = (char *) malloc(stdin_count * 11);
malloc_check(stdin_fds, __FUNCTION__, __LINE__);
stdout_fds = (char *) malloc(stdout_count * 11);
malloc_check(stdout_fds, __FUNCTION__, __LINE__);
stderr_fds = (char *) malloc(stderr_count * 11);
malloc_check(stderr_fds, __FUNCTION__, __LINE__);
/*
* Initialize the file descriptor lists, then concatenate each
* fd number for a non-redirected file to its list.
*/
stdin_fds[0] = '\0';
stdout_fds[0] = '\0';
stderr_fds[0] = '\0';
SetList(jobs);
job = GetListElement(jobs);
while (job != NULL) {
if (!job->stdin_redirect) {
sprintf(num, "%d ", job->stdin_fd);
strcat(stdin_fds, num);
}
if (!job->stdout_redirect) {
sprintf(num, "%d ", job->stdout_fd);
strcat(stdout_fds, num);
}
if (!job->stderr_redirect) {
sprintf(num, "%d ", job->stderr_fd);
strcat(stderr_fds, num);
}
job = GetListElement(jobs);
}
/*
* Build the miniproxy parameter list. All data is contained in a
* string that is tokenized into individual parameters by miniproxy
*/
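		/*
		 * For illustration only (the fd numbers are hypothetical), the
		 * resulting parameter string has the form:
		 *   "<miniproxy_path> y /tmp/mp_stdout /tmp/mp_stderr 2 5 9 2 6 10 2 7 11 "
		 * i.e. the miniproxy path, a trace flag, the stdout and stderr path
		 * prefixes, then a count followed by the fd list for stdin, stdout
		 * and stderr in turn.
		 */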
		strcpy(miniproxy_parmlist, miniproxy_path);
if (state_trace == 0) {
strcat(miniproxy_parmlist, " n");
}
else {
strcat(miniproxy_parmlist, " y");
}
strcat(miniproxy_parmlist, " /tmp/mp_stdout /tmp/mp_stderr ");
sprintf(num, "%d ", stdin_count);
strcat(miniproxy_parmlist, num);
strcat(miniproxy_parmlist, stdin_fds);
sprintf(num, "%d ", stdout_count);
strcat(miniproxy_parmlist, num);
strcat(miniproxy_parmlist, stdout_fds);
sprintf(num, "%d ", stderr_count);
strcat(miniproxy_parmlist, num);
strcat(miniproxy_parmlist, stderr_fds);
miniproxy_args[0] = miniproxy_path;
miniproxy_args[1] = miniproxy_parmlist;
miniproxy_args[2] = NULL;
print_message(TRACE_DETAIL_MESSAGE, "Invoking miniproxy with args %s\n",
miniproxy_parmlist);
if (fork() == 0) {
execve(miniproxy_args[0], miniproxy_args, miniproxy_env);
print_message(ERROR_MESSAGE, "Failed to invke miniproxy %s: %s\n",
miniproxy_args[0], strerror(errno));
TRACE_EXIT;
exit(1);
}
}
}
#ifdef HAVE_LLAPI_H
void *
monitor_LoadLeveler_nodes(void *job_ident)
{
int rc = 0;
int i = 0;
char *node_name = NULL;
LL_element *node = NULL;
LL_element *query_elem = NULL;
int node_count = 0;
int sleep_seconds = 30;
	int sleep_time_reset = 0;	/* set when anything changed on this pass */
LL_element *errObj = NULL;
ListElement *cluster_list_element = NULL;
ClusterObject *cluster_object = NULL;
NodeObject *node_object = NULL;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
/*-----------------------------------------------------------------------*
* loop forever until we are told we are shutting down. *
*-----------------------------------------------------------------------*/
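	/*
	 * The node query below follows the usual LoadLeveler access pattern
	 * (all calls go through the my_ll_* wrappers defined later in this file):
	 *
	 *   query_elem = my_ll_query(MACHINES);
	 *   my_ll_set_request(query_elem, QUERY_ALL, NULL, ALL_DATA);
	 *   node = my_ll_get_objs(query_elem, LL_CM, NULL, &node_count, &rc);
	 *   while (node != NULL) {
	 *       my_ll_get_data(node, LL_MachineName, &node_name);
	 *       node = my_ll_next_obj(query_elem);
	 *   }
	 *   my_ll_free_objs(query_elem);
	 *   my_ll_deallocate(query_elem);
	 */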
while (state_shutdown_requested == 0) {
pthread_mutex_lock(&master_lock);
if (state_shutdown_requested == 1) {
pthread_mutex_unlock(&master_lock);
break;
}
query_elem = NULL;
node_name = NULL;
node_count = 0;
node = NULL;
LL_cluster_param cluster_parm;
char *remote_cluster[2];
Hash *node_hash = NULL;
HashEntry *hash_element = NULL;
List *node_list = NULL;
ListElement *node_list_element = NULL;
sleep_time_reset = 0;
print_message(TRACE_MESSAGE, ">>> %s thread running. line=%d.\n", __FUNCTION__, __LINE__);
if (cluster_list == NULL) {
refresh_cluster_list();
}
if (cluster_list != NULL) {
/*-----------------------------------------------------------------------*
* loop on the cluster list we obtained earlier from LoadLeveler. *
*-----------------------------------------------------------------------*/
cluster_list_element = cluster_list->l_head;
while (cluster_list_element != NULL) {
cluster_object = cluster_list_element->l_value;
cluster_list_element = cluster_list_element->l_next;
if (cluster_object->node_hash->count <= 0) {
sleep_time_reset = 1;
}
if (multicluster_status == 1) {
/*-----------------------------------------------------------------------*
* we are running multicluster - set cluster name into environment *
* to influence where LoadLeveler searches for data (what cluster) *
*-----------------------------------------------------------------------*/
remote_cluster[0] = cluster_object->cluster_name;
remote_cluster[1] = NULL;
print_message(INFO_MESSAGE, "Setting access for LoadLeveler cluster=%s.\n",
cluster_object->cluster_name);
cluster_parm.action = CLUSTER_SET;
cluster_parm.cluster_list = remote_cluster;
rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm);
}
else {
/*-----------------------------------------------------------------------*
* not running multicluster *
*-----------------------------------------------------------------------*/
print_message(INFO_MESSAGE,
"Setting access for LoadLeveler local cluster (single cluster).\n");
cluster_parm.action = CLUSTER_UNSET;
cluster_parm.cluster_list = NULL;
rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm);
}
/*-----------------------------------------------------------------------*
* build a LoadLeveler query object (for nodes) *
*-----------------------------------------------------------------------*/
query_elem = my_ll_query(MACHINES);
if (query_elem == NULL) {
print_message(ERROR_MESSAGE,
"Unable to obtain query element. LoadLeveler may not be active or is not responding.\n");
continue;
}
/*-----------------------------------------------------------------------*
* set the request type for LoadLeveler (we want nodes) *
*-----------------------------------------------------------------------*/
print_message(INFO_MESSAGE,
"Call LoadLeveler (ll_set_request) for nodes in cluster=%s.\n",
cluster_object->cluster_name);
rc = my_ll_set_request(query_elem, QUERY_ALL, NULL, ALL_DATA);
if (rc != 0) {
rc = my_ll_deallocate(query_elem);
query_elem = NULL;
continue;
}
/*-----------------------------------------------------------------------*
* get nodes from LoadLeveler for current or local cluster. *
*-----------------------------------------------------------------------*/
print_message(INFO_MESSAGE,
"Call LoadLeveler (ll_get_objs) for nodes in cluster=%s.\n",
cluster_object->cluster_name);
node = my_ll_get_objs(query_elem, LL_CM, NULL, &node_count, &rc);
if (rc != 0) {
rc = my_ll_deallocate(query_elem);
query_elem = NULL;
continue;
}
print_message(INFO_MESSAGE, "Number of LoadLeveler Nodes=%d in cluster=%s.\n",
node_count, cluster_object->cluster_name);
/*-----------------------------------------------------------------------*
* loop on the nodes returned by LoadLeveler *
*-----------------------------------------------------------------------*/
i = 0;
while (node != NULL) {
print_message(INFO_MESSAGE, "LoadLeveler Node %d:\n", i);
rc = my_ll_get_data(node, LL_MachineName, &node_name);
if (rc == 0) {
print_message(INFO_MESSAGE, "Node name=%s\n", node_name);
if ((node_object = get_node_in_hash(cluster_object->node_hash, node_name)) != NULL) {
/*-----------------------------------------------------------------------*
* node returned by LoadLeveler was found in our ptp node list. *
* flag it as found. *
*-----------------------------------------------------------------------*/
node_object->node_found = 1;
if (node_object->node_state != MY_STATE_UP) {
node_object->node_state = MY_STATE_UP;
sleep_time_reset = 1;
print_message(INFO_MESSAGE,
"Schedule event notification: node=%s changed for LoadLeveler Cluster=%s.\n",
node_name, cluster_object->cluster_name);
sendNodeChangeEvent(start_events_transid, cluster_object,
node_object);
}
}
else { /* new node (not yet in list) */
/*-----------------------------------------------------------------------*
* node returned by LoadLeveler was not found in our ptp node list *
* add it and generate an event to the gui. flag it as added. *
*-----------------------------------------------------------------------*/
node_object = (NodeObject *) malloc(sizeof(NodeObject));
malloc_check(node_object, __FUNCTION__, __LINE__);
						memset(node_object, '\0', sizeof(NodeObject));
node_object->proxy_generated_node_id = generate_id();
node_object->node_name = strdup(node_name);
node_object->node_found = 2;
node_object->node_state = MY_STATE_UP;
sleep_time_reset = 1;
add_node_to_hash(cluster_object->node_hash, (void *) node_object);
print_message(INFO_MESSAGE,
"Schedule event notification: node=%s added for LoadLeveler Cluster=%s.\n",
node_name, cluster_object->cluster_name);
sendNodeAddEvent(start_events_transid, cluster_object, node_object);
}
}
i++;
node = my_ll_next_obj(query_elem);
}
/*-----------------------------------------------------------------------*
* loop on the ptp node list to see if any nodes were not returned *
* by LoadLeveler on this pass (maybe they went down). *
* generate an event (changed/gone) to the gui. *
*-----------------------------------------------------------------------*/
node_hash = cluster_object->node_hash;
if (node_hash != NULL) {
HashSet(node_hash);
hash_element = HashGet(node_hash);
while (hash_element != NULL) {
node_list = (List *) hash_element->h_data;
hash_element = HashGet(node_hash);
node_list_element = node_list->l_head;
while (node_list_element != NULL) {
node_object = node_list_element->l_value;
node_list_element = node_list_element->l_next;
if (node_object->node_found == 0) {
if (node_object->node_state != MY_STATE_UNKNOWN) {
node_object->node_state = MY_STATE_UNKNOWN;
print_message(INFO_MESSAGE,
"Schedule event notification: node=%s changed for LoadLeveler Cluster=%s.\n",
								              node_object->node_name, cluster_object->cluster_name);
sendNodeChangeEvent(start_events_transid, cluster_object,
node_object);
sleep_time_reset = 1;
}
}
else {
node_object->node_found = 0;
}
}
}
}
if (query_elem != NULL) {
rc = my_ll_free_objs(query_elem);
rc = my_ll_deallocate(query_elem);
query_elem = NULL;
}
}
}
else {
sleep_time_reset = 1;
}
pthread_mutex_unlock(&master_lock);
/*-----------------------------------------------------------------------*
* adjust sleep interval based on changes this pass. *
*-----------------------------------------------------------------------*/
if (sleep_time_reset == 1) {
sleep_seconds = min_node_sleep_seconds;
}
else {
sleep_seconds = sleep_seconds + min_node_sleep_seconds;
if (sleep_seconds > max_node_sleep_seconds) {
sleep_seconds = max_node_sleep_seconds;
}
}
/*-----------------------------------------------------------------------*
* sleep and loop again on the LoadLeveler machines. *
*-----------------------------------------------------------------------*/
if (state_shutdown_requested == 0) {
int sleep_interval = 0;
int mini_sleep_interval = (sleep_seconds + 4) / 5;
print_message(INFO_MESSAGE, "%s Sleeping for (%d seconds) %d intervals of 5 seconds\n",
__FUNCTION__, mini_sleep_interval * 5, mini_sleep_interval);
for (sleep_interval = 0; sleep_interval < mini_sleep_interval; sleep_interval++) {
if (state_shutdown_requested == 0) {
sleep(5);
}
}
}
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return NULL;
}
/*************************************************************************
* Service thread - Loop while allowed to monitor LoadLeveler for jobs *
*************************************************************************/
void *
monitor_LoadLeveler_jobs(void *job_ident)
{
int rc = 0;
int i = 0;
char *job_name = NULL;
char *job_submit_host = NULL;
char *step_ID = NULL;
LL_STEP_ID ll_step_id;
int step_machine_count = 0;
LL_element *job = NULL;
LL_element *step = NULL;
LL_element *query_elem = NULL;
LL_element *node = NULL;
LL_element *task = NULL;
LL_element *task_instance = NULL;
int job_count = 0;
LL_element *errObj = NULL;
LL_cluster_param cluster_parm;
char *remote_cluster[2];
ListElement *job_list_element = NULL;
List *task_list = NULL;
ListElement *task_list_element = NULL;
char *task_instance_machine_name = NULL;
char *task_instance_machine_address = NULL;
int task_instance_task_ID = 0;
int step_node_count = 0;
int node_task_count = 0;
int task_instance_count = 0;
time_t my_clock;
ListElement *cluster_list_element = NULL;
ClusterObject *cluster_object = NULL;
JobObject *job_object = NULL;
TaskObject *task_object = NULL;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
/*-----------------------------------------------------------------------*
* loop forever until we are told we are shutting down. *
*-----------------------------------------------------------------------*/
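	/*
	 * The job query below walks the LoadLeveler object hierarchy
	 *   Job -> Step -> Node -> Task -> TaskInstance
	 * using LL_JobGetFirstStep / LL_JobGetNextStep, LL_StepGetFirstNode /
	 * LL_StepGetNextNode, LL_NodeGetFirstTask / LL_NodeGetNextTask and
	 * LL_TaskGetFirstTaskInstance / LL_TaskGetNextTaskInstance, and mirrors
	 * what it finds into the proxy's job, task and node objects.
	 */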
while (state_shutdown_requested == 0) {
pthread_mutex_lock(&master_lock);
if (state_shutdown_requested == 1) {
pthread_mutex_unlock(&master_lock);
break;
}
query_elem = NULL;
job_name = NULL;
step_ID = NULL;
job_count = 0;
step_machine_count = 0;
job = NULL;
char *pChar = NULL;
print_message(TRACE_MESSAGE, ">>> %s thread running. line=%d.\n", __FUNCTION__, __LINE__);
if (cluster_list != NULL) {
/*-----------------------------------------------------------------------*
* loop on the cluster list we obtained earlier from LoadLeveler. *
*-----------------------------------------------------------------------*/
cluster_list_element = cluster_list->l_head;
while (cluster_list_element != NULL) {
cluster_object = cluster_list_element->l_value;
cluster_list_element = cluster_list_element->l_next;
if (cluster_object != NULL) {
if (cluster_object->node_hash != NULL) {
if (cluster_object->node_hash->count > 0) {
if (multicluster_status == 1) {
/*-----------------------------------------------------------------------*
* we are running multicluster - set cluster name into environment *
* to influence where LoadLeveler searches for data (what cluster) *
*-----------------------------------------------------------------------*/
remote_cluster[0] = cluster_object->cluster_name;
remote_cluster[1] = NULL;
print_message(INFO_MESSAGE,
"Setting access for LoadLeveler cluster=%s.\n",
cluster_object->cluster_name);
cluster_parm.action = CLUSTER_SET;
cluster_parm.cluster_list = remote_cluster;
rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm);
}
else {
/*-----------------------------------------------------------------------*
* not running multicluster *
*-----------------------------------------------------------------------*/
print_message(INFO_MESSAGE,
"Setting access for LoadLeveler local cluster (single cluster).\n");
cluster_parm.action = CLUSTER_UNSET;
cluster_parm.cluster_list = NULL;
rc = my_ll_cluster(LL_API_VERSION, &errObj, &cluster_parm);
}
/*-----------------------------------------------------------------------*
* build a LoadLeveler query object (for jobs) *
*-----------------------------------------------------------------------*/
query_elem = my_ll_query(JOBS);
if (query_elem == NULL) {
print_message(ERROR_MESSAGE,
"Unable to obtain query element. LoadLeveler may not be active or is not responding.\n");
continue;
}
/*-----------------------------------------------------------------------*
						 * set the request type for LoadLeveler (we want jobs)                     *
*-----------------------------------------------------------------------*/
print_message(INFO_MESSAGE,
"Call LoadLeveler (ll_set_request) for jobs in cluster=%s.\n",
cluster_object->cluster_name);
rc = my_ll_set_request(query_elem, QUERY_ALL, NULL, ALL_DATA);
if (rc != 0) {
rc = my_ll_deallocate(query_elem);
query_elem = NULL;
continue;
}
/*-----------------------------------------------------------------------*
* get jobs from LoadLeveler for current or local cluster. *
*-----------------------------------------------------------------------*/
print_message(INFO_MESSAGE,
"Call LoadLeveler (ll_get_objs) for jobs in cluster=%s.\n",
cluster_object->cluster_name);
job_count = 0;
job = my_ll_get_objs(query_elem, LL_CM, NULL, &job_count, &rc);
						if (rc != 0) {
							rc = my_ll_deallocate(query_elem);
							query_elem = NULL;
							continue;
						}
print_message(INFO_MESSAGE,
"Number of LoadLeveler Jobs=%d in cluster=%s.\n",
job_count, cluster_object->cluster_name);
/*-----------------------------------------------------------------------*
* loop on the jobs returned by LoadLeveler *
*-----------------------------------------------------------------------*/
i = 0;
while (job != NULL) {
print_message(INFO_MESSAGE, "LoadLeveler Job %d:\n", i);
rc = my_ll_get_data(job, LL_JobSubmitHost, &job_submit_host);
rc = my_ll_get_data(job, LL_JobName, &job_name);
if (rc == 0) { /* do something here with the job object */
/*
							 * For interactive PE jobs submitted through LoadLeveler, there is no way to match
							 * the invocation of the job through PE with the appearance of the job in the LoadLeveler
* job queue since when the job is submitted, the only thing known is the pid of the poe
* process, and the pid is not available in the responses to LoadLeveler queries. If the pid was
* available, then we could match up based on reading the attach.cfg file generated by PE.
* The alternative is to attempt to detect interactive PE jobs in the LoadLeveler job queue
* and ignore them. The proxy threads created to monitor interactive PE status by watching for the
* attach.cfg file and process termination will be responsible for creating the new job events
* and associated events for the interactive PE job. This isn't 100% perfect, but with the available
* information is probably the best that can be done.
*/
int job_step_type;
my_ll_get_data(job, LL_JobStepType, &job_step_type);
if (job_step_type == INTERACTIVE_JOB) {
int job_is_remote;
char *submit_user_name;
my_ll_get_data(job, LL_JobIsRemote, &job_is_remote);
if (job_is_remote) {
my_ll_get_data(job, LL_JobSubmittingUser,
										       &submit_user_name);
}
else {
LL_element *job_credentials;
my_ll_get_data(job, LL_JobCredential, &job_credentials);
my_ll_get_data(job_credentials, LL_CredentialUserName,
&submit_user_name);
}
if (strcmp(my_username, submit_user_name) == 0) {
print_message(INFO_MESSAGE,
"Job %s is an interactive job for this user, and is ignored\n",
job_name);
free(submit_user_name);
job = my_ll_next_obj(query_elem);
continue;
}
else {
free(submit_user_name);
}
}
print_message(INFO_MESSAGE, "Job name=%s\n", job_name);
rc = my_ll_get_data(job, LL_JobGetFirstStep, &step);
while (step != NULL) {
step_machine_count = 0;
rc = my_ll_get_data(step, LL_StepID, &step_ID);
								if (rc != 0) {
									/*
									 * Could not get the step ID; stop processing the
									 * remaining steps of this job rather than looping
									 * on the same step. The query element is freed
									 * after the job loop completes.
									 */
									break;
								}
else {
/*-----------------------------------------------------------------------*
* break the job step name apart into a LoadLeveler LL_STEP_ID *
*-----------------------------------------------------------------------*/
ll_step_id.from_host = strdup(job_submit_host);
pChar = step_ID + strlen(job_submit_host) + 1;
pChar = strtok(pChar, ".");
ll_step_id.cluster = atoi(pChar);
pChar = strtok(NULL, ".");
ll_step_id.proc = atoi(pChar);
print_message(INFO_MESSAGE, "Job step ID=%s.%d.%d\n",
ll_step_id.from_host, ll_step_id.cluster,
ll_step_id.proc);
if ((job_object = get_job_in_list(job_list, ll_step_id)) != NULL) {
/*-----------------------------------------------------------------------*
* step returned by LoadLeveler was found in our ptp job list. *
* flag it as found. *
*-----------------------------------------------------------------------*/
job_object->job_found = 1;
if (job_object->job_state == MY_STATE_UNKNOWN) {
job_object->job_state = MY_STATE_IDLE;
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendJobChangeEvent(start_events_transid,
job_object);
}
if (multicluster_status == 1) {
if (strcmp(job_object->cluster_name, cluster_object->cluster_name) != 0) {
sendJobRemoveEvent(start_events_transid,
job_object);
job_object->cluster_name = cluster_object->cluster_name;
sendJobAddEvent(start_events_transid,
cluster_object, job_object);
}
}
}
else {
/*-----------------------------------------------------------------------*
* job returned by LoadLeveler was not found in our ptp job list *
* add it and generate an event to the gui. flag it as added. *
*-----------------------------------------------------------------------*/
job_object =
(JobObject *) malloc(sizeof(JobObject));
malloc_check(job_object, __FUNCTION__, __LINE__);
										memset(job_object, '\0', sizeof(JobObject));
job_object->proxy_generated_job_id = generate_id();
job_object->gui_assigned_job_id = "-1";
job_object->ll_step_id.from_host =
strdup(ll_step_id.from_host);
job_object->ll_step_id.cluster = ll_step_id.cluster;
job_object->ll_step_id.proc = ll_step_id.proc;
job_object->job_found = 2;
job_object->job_state = MY_STATE_IDLE;
job_object->task_list = NewList();
job_object->cluster_name =
strdup(cluster_object->cluster_name);
add_job_to_list(job_list, (void *) job_object);
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d added for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendJobAddEvent(start_events_transid,
cluster_object, job_object);
}
rc = my_ll_get_data(step, LL_StepNodeCount, &step_node_count);
print_message(INFO_MESSAGE,
"Step=%s.%d.%d. StepNodeCount=%d.\n",
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster,
job_object->ll_step_id.proc,
step_node_count);
/*-----------------------------------------------------------------------*
* if this job from LoadLeveler has nodes (is running-like) then loop on *
* the nodes to see task status for new or existing tasks. *
*-----------------------------------------------------------------------*/
if (step_node_count > 0) {
rc = my_ll_get_data(step, LL_StepGetFirstNode, &node); /* node */
/*-----------------------------------------------------------------------*
* loop on the nodes in the job returned by LoadLeveler *
*-----------------------------------------------------------------------*/
while (node != NULL) {
rc = my_ll_get_data(node, LL_NodeTaskCount, &node_task_count);
print_message(INFO_MESSAGE,
"NodeTaskCount=%d.\n",
node_task_count);
rc = my_ll_get_data(node, LL_NodeGetFirstTask, &task);
/*-----------------------------------------------------------------------*
* loop on the tasks in the job returned by LoadLeveler *
*-----------------------------------------------------------------------*/
while (task != NULL) {
rc = my_ll_get_data(task, LL_TaskTaskInstanceCount, &task_instance_count);
print_message(INFO_MESSAGE,
"TaskInstanceCount=%d.\n",
task_instance_count);
rc = my_ll_get_data(task, LL_TaskGetFirstTaskInstance, &task_instance);
/*-----------------------------------------------------------------------*
* loop on the task_instances in the job returned by LoadLeveler *
*-----------------------------------------------------------------------*/
while (task_instance != NULL) {
rc = my_ll_get_data(task_instance, LL_TaskInstanceMachineName, &task_instance_machine_name);
rc = my_ll_get_data(task_instance, LL_TaskInstanceMachineAddress, &task_instance_machine_address);
rc = my_ll_get_data(task_instance, LL_TaskInstanceTaskID, &task_instance_task_ID);
print_message(INFO_MESSAGE,
"TaskInstanceMachineName=%s. TaskInstanceMachineAddress=%s. TaskInstanceTaskID=%d.\n",
task_instance_machine_name,
task_instance_machine_address,
task_instance_task_ID);
if ((task_object = get_task_in_list(job_object->task_list, task_instance_machine_name, task_instance_task_ID)) != NULL) {
/*-----------------------------------------------------------------------*
* task returned by LoadLeveler was found in our ptp job task list. *
* flag it as found. *
*-----------------------------------------------------------------------*/
task_object->ll_task_id =
task_instance_task_ID;
task_object->task_found = 1;
if (task_object->task_state !=
MY_STATE_RUNNING) {
task_object->task_state =
MY_STATE_RUNNING;
print_message(INFO_MESSAGE,
"Schedule event notification: Task_ID=%d running on node_name=%s added for LoadLeveler Job=%s.%d.%d for LoadLeveler Cluster=%s.\n",
task_object->ll_task_id,
task_object->node_name,
job_object->ll_step_id.
from_host,
job_object->ll_step_id.
cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendTaskChangeEvent
(start_events_transid,
job_object, task_object);
}
if (job_object->job_state !=
MY_STATE_RUNNING) {
job_object->job_state =
MY_STATE_RUNNING;
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.
from_host,
job_object->ll_step_id.
cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendJobChangeEvent
(start_events_transid,
job_object);
}
}
else {
/*-----------------------------------------------------------------------*
* task returned by LoadLeveler was not found in our ptp job task list *
* add it and generate an event to the gui. flag it as added. *
*-----------------------------------------------------------------------*/
task_object =
(TaskObject *)
malloc(sizeof(TaskObject));
malloc_check(task_object,
__FUNCTION__,
__LINE__);
													memset(task_object, '\0', sizeof(TaskObject));
task_object->proxy_generated_task_id = generate_id();
task_object->ll_task_id =
task_instance_task_ID;
task_object->node_name =
strdup
(task_instance_machine_name);
task_object->node_address =
strdup
(task_instance_machine_address);
task_object->task_found = 2; /* flag it as added */
task_object->task_state =
MY_STATE_RUNNING;
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
if (job_object->job_state !=
MY_STATE_RUNNING) {
job_object->job_state =
MY_STATE_RUNNING;
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.
from_host,
job_object->ll_step_id.
cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendJobChangeEvent
(start_events_transid,
job_object);
}
add_task_to_list(job_object->task_list, (void *) task_object);
print_message(INFO_MESSAGE,
"Schedule event notification: Task_ID=%d running on node_name=%s added for LoadLeveler Job=%s.%d.%d for LoadLeveler Cluster=%s.\n",
task_object->ll_task_id,
task_object->node_name,
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendTaskAddEvent
(start_events_transid,
cluster_object, job_object,
task_object);
}
rc = my_ll_get_data(task, LL_TaskGetNextTaskInstance, &task_instance);
}
rc = my_ll_get_data(node, LL_NodeGetNextTask, &task);
}
rc = my_ll_get_data(step, LL_StepGetNextNode, &node);
}
}
/*-----------------------------------------------------------------------*
* loop on the tasks in the job object - if any not found that were *
* present before then generate deleted task events. *
*-----------------------------------------------------------------------*/
task_list = (List *) job_object->task_list;
task_list_element = task_list->l_head;
while (task_list_element != NULL) {
task_object = task_list_element->l_value;
task_list_element = task_list_element->l_next;
if (task_object != 0) {
if (task_object->task_found == 0) {
task_object->task_state =
MY_STATE_TERMINATED;
task_object->task_found = 0;
print_message(INFO_MESSAGE,
"Schedule event notification: Task_ID=%d running on node_name=%s deleted for LoadLeveler Job=%s.%d.%d for LoadLeveler Cluster=%s.\n",
task_object->ll_task_id,
task_object->node_name,
job_object->ll_step_id.
from_host,
job_object->ll_step_id.
cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendTaskRemoveEvent(start_events_transid,
job_object,
task_object);
delete_task_from_list(task_list, task_object);
if (SizeOfList(task_list) == 0) {
if (job_object->job_state ==
MY_STATE_RUNNING) {
job_object->job_state = MY_STATE_TERMINATED;
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d terminated for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendJobChangeEvent
(start_events_transid,
job_object);
}
else {
job_object->job_state =
MY_STATE_IDLE;
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendJobChangeEvent
(start_events_transid,
job_object);
}
}
}
else {
task_object->task_found = 0;
}
}
}
rc = my_ll_get_data(job, LL_JobGetNextStep, &step);
}
}
}
i++;
job = my_ll_next_obj(query_elem);
}
}
}
}
if (query_elem != NULL) {
rc = my_ll_free_objs(query_elem);
rc = my_ll_deallocate(query_elem);
query_elem = NULL;
}
}
if (query_elem != NULL) {
rc = my_ll_free_objs(query_elem);
rc = my_ll_deallocate(query_elem);
query_elem = NULL;
}
/*-----------------------------------------------------------------------*
* get the time and see if job has been sitting in submitted state too *
* long. *
*-----------------------------------------------------------------------*/
time(&my_clock);
/*-----------------------------------------------------------------------*
* loop on the ptp job list to see if any jobs were not returned *
		 * by LoadLeveler on this pass (maybe they completed or were removed).  *
* generate an event (changed/gone) to the gui. *
*-----------------------------------------------------------------------*/
if (job_list != NULL) {
job_list_element = job_list->l_head;
while (job_list_element != NULL) {
job_object = job_list_element->l_value;
job_list_element = job_list_element->l_next;
if ((job_object->job_found == 0) &&
((job_object->job_state != MY_STATE_UNKNOWN) ||
((my_clock - job_object->job_submit_time) > 300))) {
job_object->job_found = 0;
/*-----------------------------------------------------------------------*
* loop on the tasks in the job object - send deleted event and mark *
* all deleted. *
*-----------------------------------------------------------------------*/
task_list = (List *) job_object->task_list;
task_list_element = task_list->l_head;
while (task_list_element != NULL) {
task_object = task_list_element->l_value;
task_list_element = task_list_element->l_next;
if (task_object != 0) {
print_message(INFO_MESSAGE,
"Schedule event notification: Task_ID=%d deleted on node_name=%s deleted for LoadLeveler Job=%s.%d.%d for LoadLeveler Cluster=%s.\n",
task_object->ll_task_id, task_object->node_name,
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendTaskRemoveEvent(start_events_transid, job_object, task_object);
delete_task_from_list(task_list, task_object);
if (SizeOfList(task_list) == 0) {
job_object->job_state = MY_STATE_IDLE;
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d changed for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster,
job_object->ll_step_id.proc,
job_object->cluster_name);
sendJobChangeEvent(start_events_transid, job_object);
}
}
}
job_object->job_state = MY_STATE_TERMINATED;
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d terminated for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster, job_object->ll_step_id.proc,
job_object->cluster_name);
sendJobChangeEvent(start_events_transid, job_object);
print_message(INFO_MESSAGE,
"Schedule event notification: Job=%s.%d.%d deleted for LoadLeveler Cluster=%s.\n",
job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster, job_object->ll_step_id.proc,
job_object->cluster_name);
sendJobRemoveEvent(start_events_transid, job_object);
}
else {
job_object->job_found = 0;
}
}
}
}
pthread_mutex_unlock(&master_lock);
/*-----------------------------------------------------------------------*
* sleep and loop again on the LoadLeveler machines. *
*-----------------------------------------------------------------------*/
if (state_shutdown_requested == 0) {
int sleep_interval = 0;
int mini_sleep_interval = (job_sleep_seconds + 4) / 5;
print_message(INFO_MESSAGE, "%s Sleeping for (%d seconds) %d intervals of 5 seconds\n",
__FUNCTION__, mini_sleep_interval * 5, mini_sleep_interval);
for (sleep_interval = 0; sleep_interval < mini_sleep_interval; sleep_interval++) {
if (state_shutdown_requested == 0) {
					struct timeval now;
					struct timespec wakeup_time;
					int status;
					gettimeofday(&now, NULL);
					wakeup_time.tv_sec = now.tv_sec + 5;
					wakeup_time.tv_nsec = now.tv_usec * 1000;
pthread_mutex_lock(&job_notify_lock);
status =
pthread_cond_timedwait(&job_notify_condvar, &job_notify_lock, &wakeup_time);
pthread_mutex_unlock(&job_notify_lock);
if (status == 0) {
print_message(INFO_MESSAGE, "Main thread requests job query\n");
break;
}
else {
if (status != ETIMEDOUT) {
print_message(INFO_MESSAGE,
"Error in condition wait in job query thread: %s(%d)\n",
strerror(status), status);
}
else {
print_message(INFO_MESSAGE,
"Job query thread woke up after 5 second wait\n");
}
}
}
}
}
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return NULL;
}
#endif
/**************************************************************************/
/* Support functions */
/**************************************************************************/
char **
create_exec_parmlist(char *execname, char *targetname, char *args)
{
char *tokenized_args;
char *cp;
char **argv;
int i;
int arg_count;
int state;
char quote;
/*
* Process argument list to 'poe' command. This is done in two passes
* The first pass determines how many arguments there are so that argv
* can be allocated, and terminates each arg with a '\0'. The first
* pass ignores whitespace outside of quoted strings when counting
* and tokenizing args. It must preserve spaces within quoted strings
* and also deal with quoted strings within an arg, for instance
* 'This is an arg with a "quoted string" inside'
*/
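	/*
	 * For example (illustrative values only), with
	 *   execname   = "/usr/bin/poe"
	 *   targetname = "/home/user/a.out"
	 *   args       = "-procs 4 'my quoted arg'"
	 * the resulting argv is
	 *   { "/usr/bin/poe", "/home/user/a.out", "-procs", "4", "my quoted arg", NULL }
	 */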
TRACE_ENTRY;
arg_count = 0;
if (args == NULL) {
argv = malloc(3 * sizeof(char *));
malloc_check(argv, __FUNCTION__, __LINE__);
argv[0] = execname;
argv[1] = targetname;
argv[2] = NULL;
}
else {
quote = '\0';
tokenized_args = strdup(args);
cp = tokenized_args;
state = SKIPPING_SPACES;
while (*cp != '\0') {
switch (*cp) {
case ' ':
if (state == PARSING_UNQUOTED_ARG) {
arg_count = arg_count + 1;
*cp = '\0';
state = SKIPPING_SPACES;
}
break;
case '"':
case '\'':
				if (state == PARSING_QUOTED_ARG) {
					if (*cp == quote) {
						/* matching close quote ends the arg */
						arg_count = arg_count + 1;
						quote = '\0';
						*cp = '\0';
						state = SKIPPING_SPACES;
					}
					/* a different quote character inside a quoted arg is left as-is */
				}
				else {
					/* opening quote starts a quoted arg */
					quote = *cp;
					state = PARSING_QUOTED_ARG;
				}
break;
default:
if (state == SKIPPING_SPACES) {
state = PARSING_UNQUOTED_ARG;
}
}
cp = cp + 1;
}
if (state != SKIPPING_SPACES) {
/*
* Last arg is terminated by ending '\0' so needs to be counted
* here
*/
arg_count = arg_count + 1;
}
/*
* The second pass allocates argv, with one extra slot for the
* poe executable, one for the name of the executable invoked by
* poe and one for the terminating null pointer. It then builds the
* argv array by scanning for the start of each arg, skipping
* over an initial quote, if present. The trailing quote in a
* quoted arg was removed by the first pass tokenizing step.
*/
argv = malloc(sizeof(char *) * (arg_count + 3));
malloc_check(argv, __FUNCTION__, __LINE__);
argv[0] = execname;
argv[1] = targetname;
i = 2;
cp = tokenized_args;
state = SKIPPING_SPACES;
while (i < (arg_count + 2)) {
if (state == SKIPPING_SPACES) {
if (*cp != ' ') {
state = SKIPPING_CHARS;
if ((*cp == '"') || (*cp == '\'')) {
argv[i] = cp + 1;
}
else {
argv[i] = cp;
}
i = i + 1;
}
}
else {
if (*cp == '\0') {
state = SKIPPING_SPACES;
}
}
cp = cp + 1;
}
argv[i] = NULL;
}
TRACE_EXIT;
return argv;
}
char **
create_env_array(char *args[], int split_io, char *mp_buffer_mem, char *mp_rdma_count)
{
/*
* Set up the environment variable array for the target application.
* Environment variables have two forms. Environment variables set on
* the environment tab of the launch configuration have the form
* env=HOME=/home/ptpuser. PE environment variables set on the
* parallel tab of the launch configuration have the form MP_PROCS=2
* Both types of environment variables are added to the application
* environment array in the form HOME=/home/ptpuser.
* The user may have requested stdout output to be split by task. If so,
* then MP_LABELIO must be set to 'yes' as the last environment variable
* setting in the array.
* No other environment variables are passed to the application.
*/
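	/*
	 * For example (illustrative values only, assuming no buffer memory or
	 * RDMA settings), the launch arguments
	 *   { "MP_PROCS=2", "env=HOME=/home/ptpuser", NULL }
	 * with split_io == 1 and LoadLeveler in use produce
	 *   { "MP_PROCS=2", "HOME=/home/ptpuser", "MP_LABELIO=yes", "MP_RESD=yes", NULL }
	 */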
int i;
TRACE_ENTRY;
env_array_size = 100;
next_env_entry = 0;
	env_array = (char **) malloc(sizeof(char *) * env_array_size);
	malloc_check(env_array, __FUNCTION__, __LINE__);
for (i = 0; args[i] != NULL; i++) {
if (strncmp(args[i], "MP_", 3) == 0) {
add_environment_variable(strdup(args[i]));
}
else {
if (strncmp(args[i], "env=", 4) == 0) {
				add_environment_variable(strdup(args[i] + 4));
}
}
}
if (split_io == 1) {
add_environment_variable("MP_LABELIO=yes");
}
if (mp_buffer_mem[0] != '\0') {
add_environment_variable(mp_buffer_mem);
}
if (mp_rdma_count[0] != '\0') {
add_environment_variable(mp_rdma_count);
}
if (use_load_leveler) {
add_environment_variable("MP_RESD=yes");
print_message(TRACE_DETAIL_MESSAGE, "PE Job uses LoadLeveler resource management\n");
}
else {
add_environment_variable("MP_RESD=no");
}
add_environment_variable(NULL);
TRACE_EXIT;
return env_array;
}
/*
* Add the specified environment variable to the poe process environment variable set
*/
void
add_environment_variable(char *env_var)
{
if (next_env_entry >= env_array_size) {
env_array_size = env_array_size + 10;
env_array = (char **) realloc(env_array, sizeof(char *) * env_array_size);
malloc_check(env_array, __FUNCTION__, __LINE__);
}
env_array[next_env_entry++] = env_var;
}
/*
* Set up the file descriptor for stdio output files
*/
int
setup_stdio_fd(int run_trans_id, char *subid, int pipe_fds[], char *path, char *stdio_name, int *fd,
int *redirect)
{
int status;
TRACE_ENTRY;
if (path == NULL) {
status = pipe(pipe_fds);
if (status == -1) {
snprintf(emsg_buffer, sizeof emsg_buffer,
"Error creating %s pipe: %s", stdio_name, strerror(errno));
emsg_buffer[sizeof emsg_buffer - 1] = '\0';
post_submitjob_error(run_trans_id, subid, emsg_buffer);
TRACE_EXIT;
return -1;
}
status = fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK);
if (status == -1) {
snprintf(emsg_buffer, sizeof emsg_buffer,
"Error initializing %s pipe: %s", stdio_name, strerror(errno));
emsg_buffer[sizeof emsg_buffer - 1] = '\0';
post_submitjob_error(run_trans_id, subid, emsg_buffer);
TRACE_EXIT;
return -1;
}
*fd = pipe_fds[0];
*redirect = 0;
}
else {
*fd = open(path, O_RDWR | O_CREAT | O_TRUNC, 0644);
if (*fd == -1) {
snprintf(emsg_buffer, sizeof emsg_buffer,
"Error redirecting %s to %s: %s", stdio_name, path, strerror(errno));
emsg_buffer[sizeof emsg_buffer - 1] = '\0';
post_submitjob_error(run_trans_id, subid, emsg_buffer);
TRACE_EXIT;
return -1;
}
*redirect = 1;
}
TRACE_EXIT;
return 0;
}
/*
* Set up a stdio file descriptor for child process so it is redirected to a file or pipe
*/
int
setup_child_stdio(int run_trans_id, char *subid, int stdio_fd, int redirect, int *file_fd, int pipe_fd[])
{
int status;
TRACE_ENTRY;
if (redirect) {
status = dup2(*file_fd, stdio_fd);
}
else {
close(pipe_fd[0]);
status = dup2(pipe_fd[1], stdio_fd);
}
if (status == -1) {
snprintf(emsg_buffer, sizeof emsg_buffer,
"Error setting stdio file descriptor (%d) for application: %s",
stdio_fd, strerror(errno));
emsg_buffer[sizeof emsg_buffer - 1] = '\0';
post_submitjob_error(run_trans_id, subid, emsg_buffer);
}
TRACE_EXIT;
return status;
}
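/*
 * Typical usage sketch (fd names are illustrative): the parent calls
 * setup_stdio_fd() once per stream to create either a pipe or a
 * redirection file before fork(); after fork(), the child calls
 * setup_child_stdio() for each stream to dup2() the file or the pipe's
 * write end onto fd 0, 1 or 2 before exec'ing poe.
 */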
void
update_nodes(int trans_id, FILE * hostlist)
{
/*
* Create a node list, containing unique nodes, from the hostlist file.
* Each time this function is called, new nodes may be referenced in the
* hostlist since there is no restriction on modifying the hostlist. Any
* new nodes will be added to the machine configuration.
*
* Message format for nodes message is event-id (RTEV_NATTR) followed by
* encoded key-value pairs where each token is preceded with one space.
* Multiple consecutive spaces in the message will cause parsing errors
* in the Java code handling the response.
*/
char *res;
char *valstr;
struct group *grp;
struct passwd *pwd;
char hostname[256];
List *new_nodes;
TRACE_ENTRY;
valstr = NULL;
pwd = getpwuid(geteuid());
grp = getgrgid(getgid());
res = fgets(hostname, sizeof(hostname), hostlist);
new_nodes = NewList();
while (res != NULL) {
char *cp;
/*
* Truncate node name to short form name
*/
cp = strpbrk(hostname, ".\n\r");
if (cp != NULL) {
*cp = '\0';
}
if (find_node(hostname) == NULL) {
node_refcount *node;
node = add_node(hostname);
AddToList(new_nodes, node);
node_count = node_count + 1;
}
res = fgets(hostname, sizeof(hostname), hostlist);
}
send_new_node_list(start_events_transid, machine_id, new_nodes);
DestroyList(new_nodes, NULL);
TRACE_EXIT;
}
node_refcount *
find_node(char *key)
{
/*
* Look for node with matching key in the node list. The HashSearch
* function only looks for a matching hash value. Since multiple keys
* may hash to the same hash value, an additional check needs to be
* made of keys that hash to the same value before determining a true
* match.
*/
int hash;
List *node_list;
TRACE_ENTRY;
TRACE_DETAIL_V("+++ Looking for node '%s'\n", key);
hash = HashCompute(key, strlen(key));
node_list = HashSearch(nodes, hash);
if (node_list == NULL) {
TRACE_EXIT;
return NULL;
}
else {
node_refcount *node;
/*
* Optimally, this code would obtain a lock on the hash node rather
* than locking all accesses to nodes, but currently the hash entry
* only contains a pointer to the list. If this becomes a problem
* then the hash object could be replaced with a structure
* containing the node lock and the list pointer.
*/
pthread_mutex_lock(&node_lock);
SetList(node_list);
node = GetListElement(node_list);
while (node != NULL) {
if (strcmp(key, node->key) == 0) {
break;
}
node = GetListElement(node_list);
}
pthread_mutex_unlock(&node_lock);
TRACE_EXIT;
return node;
}
}
node_refcount *
add_node(char *key)
{
/*
* Add a node to the node list. Since the hash functions in hash.c only
* detect duplicate hash values, and not keys that map to the same hash
* value, the data at each node in the hash table is a simple linked
* list of keys that map to the same hash and the associated data.
*/
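/*
 * Illustration: if hostnames "nodeA" and "nodeB" happened to hash to the
 * same value, the hash entry for that value would hold the list
 *   [ node_refcount{key="nodeA", ...}, node_refcount{key="nodeB", ...} ]
 * and lookups compare keys with strcmp() to pick the right entry.
 */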
int hash;
node_refcount *node;
List *node_list;
/*
* Create the node for the hostname. The node key is the node's hostname
*/
TRACE_ENTRY;
node = malloc(sizeof(node_refcount));
malloc_check(node, __FUNCTION__, __LINE__);
node->key = strdup(key);
node->node_number = global_node_index;
global_node_index = global_node_index + 1;
node->proxy_nodeid = generate_id();
node->task_count = 0;
/*
* Look for the hash key. If the hash key already exists, then
* just add the hostname to the list for the hash key. Otherwise,
* create a new list, add the hostname to the list and then add the
* node list as the 'data' for the hash node.
*/
hash = HashCompute(key, strlen(key));
node_list = HashSearch(nodes, hash);
if (node_list == NULL) {
node_list = NewList();
AddToList(node_list, node);
HashInsert(nodes, hash, node_list);
}
else {
AddToList(node_list, node);
}
TRACE_EXIT;
return node;
}
void
update_node_refcounts(int numtasks, taskinfo * tasks)
{
/*
* Update node reference counts (number of tasks running on node),
* subtracting 1 for each appearance of a node name in the task
* list. If the count for a node is zero, then that node needs to
* be removed from the node list and the front end notified.
*/
node_refcount *noderef;
int i;
TRACE_ENTRY;
for (i = 0; i < numtasks; i++) {
noderef = find_node(tasks[i].hostname);
if (noderef == NULL) {
print_message(TRACE_DETAIL_MESSAGE,
"Failed to find expected node %s\n", tasks[i].hostname);
}
else {
noderef->task_count = noderef->task_count - 1;
if (noderef->task_count <= 0) {
print_message(TRACE_DETAIL_MESSAGE,
"No tasks left on %s, deleting\n", tasks[i].hostname);
delete_noderef(tasks[i].hostname);
}
else {
print_message(TRACE_DETAIL_MESSAGE,
"Refcount for %s is %d\n", tasks[i].hostname, noderef->task_count);
}
}
}
TRACE_EXIT;
}
void
delete_noderef(char *hostname)
{
/*
* delete node from the node list and notify the front end
*/
int hash;
List *node_list;
node_refcount *node;
TRACE_ENTRY;
hash = HashCompute(hostname, strlen(hostname));
node_list = HashSearch(nodes, hash);
if (node_list == NULL) {
TRACE_EXIT;
return;
}
pthread_mutex_lock(&node_lock);
SetList(node_list);
node = GetListElement(node_list);
while (node != NULL) {
if (strcmp(node->key, hostname) == 0) {
char node_number[11];
free(node->key);
RemoveFromList(node_list, node);
sprintf(node_number, "%d", node->proxy_nodeid);
print_message(TRACE_DETAIL_MESSAGE, "Sending delete event for node %s\n", node_number);
enqueue_event(proxy_remove_node_event(start_events_transid, node_number));
free(node);
break;
}
node = GetListElement(node_list);
}
pthread_mutex_unlock(&node_lock);
TRACE_EXIT;
}
void
delete_task_list(int numtasks, taskinfo * tasks)
{
int i;
TRACE_ENTRY;
for (i = 0; i < numtasks; i++) {
if (tasks[i].hostname != NULL) {
free(tasks[i].hostname);
}
if (tasks[i].ipaddr != NULL) {
free(tasks[i].ipaddr);
}
}
free(tasks);
TRACE_EXIT;
}
void
hash_cleanup(void *hash_list)
{
TRACE_ENTRY;
DestroyList(hash_list, NULL);
TRACE_EXIT;
}
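/*
 * Thread entry point: give the target process 60 seconds to exit on its
 * own, then force-terminate it with SIGKILL.
 */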
void *
kill_process(void *pid)
{
TRACE_ENTRY;
sleep(60);
kill((pid_t) pid, 9);
TRACE_EXIT;
return NULL;
}
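/*
 * Abort the proxy with a fatal message if a memory allocation failed.
 */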
void
malloc_check(void *p, const char *function, int line)
{
if (p == NULL) {
print_message(FATAL_MESSAGE,
"Memory allocation error in resource manager in %s (line %d)\n",
function, line);
exit(1);
}
}
#ifdef HAVE_LLAPI_H
int
load_load_leveler_library(int trans_id)
{
int dlopen_mode;
memset(&LL_SYMS, '\0', sizeof(LL_SYMS)); /* zero the LoadLeveler dlsym symbol table */
dlopen_mode = 0;
print_message(INFO_MESSAGE, "dlopen LoadLeveler shared library %s.\n", ibmll_libpath_name);
#ifdef _AIX
dlopen_mode = RTLD_LOCAL | RTLD_NOW | RTLD_MEMBER;
#else
dlopen_mode = RTLD_LOCAL | RTLD_NOW;
#endif
	ibmll_libpath_handle = dlopen(ibmll_libpath_name, dlopen_mode);
	if (ibmll_libpath_handle == NULL) {
		/* dlerror() gives the real reason; errno is not reliable after dlopen() */
		print_message(ERROR_MESSAGE, "dlopen of %s failed: %s\n", ibmll_libpath_name,
			      dlerror());
#if 0
sendErrorEvent(trans_id, RTEV_ERROR_LL_INIT,
"dlopen failed for LoadLeveler shared library");
#endif
return PROXY_RES_ERR;
}
else {
print_message(INFO_MESSAGE, "dlopen %s successful.\n", ibmll_libpath_name);
}
print_message(INFO_MESSAGE, "Locating LoadLeveler functions via dlsym.\n");
*(void **) (&(LL_SYMS.ll_query)) = dlsym(ibmll_libpath_handle, "ll_query");
*(void **) (&LL_SYMS.ll_set_request) = dlsym(ibmll_libpath_handle, "ll_set_request");
*(void **) (&LL_SYMS.ll_get_objs) = dlsym(ibmll_libpath_handle, "ll_get_objs");
*(void **) (&LL_SYMS.ll_get_data) = dlsym(ibmll_libpath_handle, "ll_get_data");
*(void **) (&LL_SYMS.ll_free_objs) = dlsym(ibmll_libpath_handle, "ll_free_objs");
*(void **) (&LL_SYMS.ll_deallocate) = dlsym(ibmll_libpath_handle, "ll_deallocate");
*(void **) (&LL_SYMS.ll_next_obj) = dlsym(ibmll_libpath_handle, "ll_next_obj");
*(void **) (&LL_SYMS.ll_cluster)
= dlsym(ibmll_libpath_handle, "ll_cluster");
*(void **) (&LL_SYMS.ll_submit_job) = dlsym(ibmll_libpath_handle, "llsubmit");
*(void **) (&LL_SYMS.ll_terminate_job) = dlsym(ibmll_libpath_handle, "ll_terminate_job");
*(void **) (&LL_SYMS.ll_free_job_info) = dlsym(ibmll_libpath_handle, "llfree_job_info");
*(void **) (&LL_SYMS.ll_error) = dlsym(ibmll_libpath_handle, "ll_error");
if ((LL_SYMS.ll_query == NULL) || (LL_SYMS.ll_set_request == NULL)
|| (LL_SYMS.ll_get_objs == NULL) || (LL_SYMS.ll_get_data == NULL)
|| (LL_SYMS.ll_free_objs == NULL)
|| (LL_SYMS.ll_deallocate == NULL) || (LL_SYMS.ll_cluster == NULL)
|| (LL_SYMS.ll_next_obj == NULL) || (LL_SYMS.ll_free_job_info
== NULL) || (LL_SYMS.ll_terminate_job == NULL)
|| (LL_SYMS.ll_error == NULL) || (LL_SYMS.ll_submit_job == NULL)) {
print_message(ERROR_MESSAGE,
"One or more LoadLeveler symbols could not be located in %s.\n",
ibmll_libpath_name);
#if 0
sendErrorEvent(trans_id, RTEV_ERROR_LL_INIT, "LoadLeveler symbols not located");
#endif
return PROXY_RES_ERR;
}
else {
print_message(INFO_MESSAGE,
"Successfully located all of the required LoadLeveler functions via dlsym.\n");
}
return PROXY_RES_OK;
}
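/*
 * Note: every LoadLeveler API call below goes through one of the my_ll_*
 * wrappers, which serialize access with access_LoadLeveler_lock and log
 * entry, exit and return codes.
 */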
/*************************************************************************
* Call LoadLeveler to get data *
*************************************************************************/
int
my_ll_get_data(LL_element * request, enum LLAPI_Specification spec, void *result)
{
int rc = 0;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. LLAPI_Specification=%d.\n", __FUNCTION__,
__LINE__, spec);
pthread_mutex_lock(&access_LoadLeveler_lock);
rc = (*LL_SYMS.ll_get_data) (request, spec, result);
pthread_mutex_unlock(&access_LoadLeveler_lock);
if (rc != 0) {
print_message(INFO_MESSAGE, "LoadLeveler ll_get_data rc=%d.\n", rc);
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, rc);
return rc;
}
/*************************************************************************
* Call LoadLeveler to retrieve the cluster element *
*************************************************************************/
int
my_ll_cluster(int version, LL_element ** errObj, LL_cluster_param * cp)
{
int rc = 0;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. version=%d.\n", __FUNCTION__, __LINE__,
version);
pthread_mutex_lock(&access_LoadLeveler_lock);
rc = (*LL_SYMS.ll_cluster) (version, errObj, cp); /* set the cluster name */
pthread_mutex_unlock(&access_LoadLeveler_lock);
if (rc != 0) {
print_message(INFO_MESSAGE, "LoadLeveler ll_cluster rc=%d.\n", rc);
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, rc);
return rc;
}
/*************************************************************************
* Call LoadLeveler to deallocate the query element *
*************************************************************************/
int
my_ll_deallocate(LL_element * query_elem)
{
int rc = 0;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
pthread_mutex_lock(&access_LoadLeveler_lock);
rc = (*LL_SYMS.ll_deallocate) (query_elem);
pthread_mutex_unlock(&access_LoadLeveler_lock);
if (rc != 0) {
print_message(ERROR_MESSAGE, "LoadLeveler ll_deallocate rc=%d.\n", rc);
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, rc);
return rc;
}
/*************************************************************************
* Call LoadLeveler to perform a query *
*************************************************************************/
LL_element *
my_ll_query(enum QueryType type)
{
LL_element *query_elem = NULL;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. QueryType=%d.\n", __FUNCTION__, __LINE__,
type);
pthread_mutex_lock(&access_LoadLeveler_lock);
query_elem = (*LL_SYMS.ll_query) (type);
pthread_mutex_unlock(&access_LoadLeveler_lock);
if (query_elem == NULL) {
print_message(INFO_MESSAGE,
"LoadLeveler ll_query element=NULL. End of list was probably reached.\n");
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return query_elem;
}
/*************************************************************************
* Call LoadLeveler to free the objects in the query element *
*************************************************************************/
int
my_ll_free_objs(LL_element * query_elem)
{
int rc = 0;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
pthread_mutex_lock(&access_LoadLeveler_lock);
rc = (*LL_SYMS.ll_free_objs) (query_elem);
pthread_mutex_unlock(&access_LoadLeveler_lock);
if (rc != 0) {
print_message(ERROR_MESSAGE, "LoadLeveler ll_free_objs rc=%d.\n", rc);
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, rc);
return rc;
}
/*************************************************************************
* Call LoadLeveler to get the next object from the returned list *
*************************************************************************/
LL_element *
my_ll_next_obj(LL_element * query_elem)
{
LL_element *next_elem = NULL;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
pthread_mutex_lock(&access_LoadLeveler_lock);
next_elem = (*LL_SYMS.ll_next_obj) (query_elem);
pthread_mutex_unlock(&access_LoadLeveler_lock);
if (next_elem == NULL) {
print_message(INFO_MESSAGE,
"LoadLeveler ll_next_obj element=NULL. End of list was probably reached.\n");
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return next_elem;
}
/*************************************************************************
* Call LoadLeveler to get objects from the previously returned element *
*************************************************************************/
LL_element *
my_ll_get_objs(LL_element * query_elem, enum LL_Daemon daemon, char *ignore, int *value, int *rc)
{
LL_element *ret_object = NULL;
*rc = 0;	/* preset rc to 0 */
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. LL_Daemon=%d.\n", __FUNCTION__, __LINE__,
daemon);
pthread_mutex_lock(&access_LoadLeveler_lock);
ret_object = LL_SYMS.ll_get_objs(query_elem, daemon, ignore, value, rc);
pthread_mutex_unlock(&access_LoadLeveler_lock);
if (ret_object == NULL) {
print_message(INFO_MESSAGE,
"LoadLeveler ll_get_objs element=NULL. End of list was probably reached.\n");
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__,
*rc);
return ret_object;
}
/*************************************************************************
* Call LoadLeveler to build a request object for a subsequent call *
*************************************************************************/
int
my_ll_set_request(LL_element * query_elem, enum QueryFlags qflags, char **ignore,
enum DataFilter dfilter)
{
int rc = 0;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. QueryFlags=%d.\n", __FUNCTION__,
__LINE__, qflags);
pthread_mutex_lock(&access_LoadLeveler_lock);
rc = LL_SYMS.ll_set_request(query_elem, qflags, ignore, dfilter);
pthread_mutex_unlock(&access_LoadLeveler_lock);
if (rc != 0) {
print_message(ERROR_MESSAGE, "LoadLeveler ll_set_request rc=%i.\n", rc);
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. rc=%d.\n", __FUNCTION__, __LINE__, rc);
return rc;
}
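/*
 * The send*Event functions below translate the proxy's internal model
 * objects (clusters, nodes, jobs, tasks) into PTP proxy events. Each one
 * maps the internal MY_STATE_* value to the corresponding PTP state
 * string, builds a proxy_msg, and queues it for the front end via
 * enqueue_event().
 */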
/*************************************************************************
* send node added event to gui *
*************************************************************************/
static int
sendNodeAddEvent(int gui_transmission_id, ClusterObject * cluster_object, NodeObject * node_object)
{
proxy_msg *msg;
char proxy_generated_cluster_id_string[256];
char proxy_generated_node_id_string[256];
char *node_state_to_report = NODE_STATE_UNKNOWN;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. cluster=%s. node=%s. state=%d.\n",
__FUNCTION__, __LINE__, cluster_object->cluster_name, node_object->node_name,
node_object->node_state);
memset(proxy_generated_cluster_id_string, '\0', sizeof(proxy_generated_cluster_id_string));
memset(proxy_generated_node_id_string, '\0', sizeof(proxy_generated_node_id_string));
sprintf(proxy_generated_cluster_id_string, "%d", cluster_object->proxy_generated_cluster_id);
sprintf(proxy_generated_node_id_string, "%d", node_object->proxy_generated_node_id);
switch (node_object->node_state) {
case MY_STATE_UP:
node_state_to_report = NODE_STATE_UP;
break;
case MY_STATE_DOWN:
node_state_to_report = NODE_STATE_DOWN;
break;
default:
node_state_to_report = NODE_STATE_UNKNOWN;
break;
}
msg = proxy_new_node_event(gui_transmission_id, proxy_generated_cluster_id_string, 1);
proxy_add_node(msg, proxy_generated_node_id_string, node_object->node_name, node_state_to_report, 0);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* send node changed event to gui *
*************************************************************************/
static int
sendNodeChangeEvent(int gui_transmission_id, ClusterObject * cluster_object,
NodeObject * node_object)
{
proxy_msg *msg;
char proxy_generated_node_id_string[256];
char *node_state_to_report = NODE_STATE_UNKNOWN;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. cluster=%s. node=%s. state=%d.\n",
__FUNCTION__, __LINE__, cluster_object->cluster_name, node_object->node_name,
node_object->node_state);
memset(proxy_generated_node_id_string, '\0', sizeof(proxy_generated_node_id_string));
sprintf(proxy_generated_node_id_string, "%d", node_object->proxy_generated_node_id);
switch (node_object->node_state) {
case MY_STATE_UP:
node_state_to_report = NODE_STATE_UP;
break;
case MY_STATE_DOWN:
node_state_to_report = NODE_STATE_DOWN;
break;
default:
node_state_to_report = NODE_STATE_UNKNOWN;
break;
}
msg = proxy_node_change_event(gui_transmission_id, proxy_generated_node_id_string, 1);
proxy_add_node(msg, proxy_generated_node_id_string, node_object->node_name, node_state_to_report, 0);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* send job added event to gui *
*************************************************************************/
static int
sendJobAddEvent(int gui_transmission_id, ClusterObject * cluster_object, JobObject * job_object)
{
proxy_msg *msg;
char proxy_generated_job_id_string[256];
char proxy_generated_queue_id_string[256];
char job_name_string[256];
char *job_state_to_report = JOB_STATE_INIT;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job=%s.%d.%d. state=%d.\n", __FUNCTION__,
__LINE__, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster,
job_object->ll_step_id.proc, job_object->job_state);
memset(proxy_generated_job_id_string, '\0', sizeof(proxy_generated_job_id_string));
memset(proxy_generated_queue_id_string, '\0', sizeof(proxy_generated_queue_id_string));
memset(job_name_string, '\0', sizeof(job_name_string));
sprintf(proxy_generated_job_id_string, "%d", job_object->proxy_generated_job_id);
sprintf(proxy_generated_queue_id_string, "%d", cluster_object->proxy_generated_queue_id);
switch (job_object->job_state) {
case MY_STATE_IDLE:
job_state_to_report = JOB_STATE_INIT;
break;
case MY_STATE_RUNNING:
job_state_to_report = JOB_STATE_RUNNING;
break;
case MY_STATE_STOPPED:
job_state_to_report = JOB_STATE_INIT;
break;
case MY_STATE_TERMINATED:
job_state_to_report = JOB_STATE_TERMINATED;
break;
default:
job_state_to_report = JOB_STATE_INIT;
break;
}
sprintf(job_name_string, "%s.%d.%d", job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster, job_object->ll_step_id.proc);
msg =
proxy_new_job_event(gui_transmission_id, proxy_generated_queue_id_string,
proxy_generated_job_id_string, job_name_string, job_state_to_report,
job_object->gui_assigned_job_id);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* send job changed event to gui *
*************************************************************************/
static int
sendJobChangeEvent(int gui_transmission_id, JobObject * job_object)
{
proxy_msg *msg;
char proxy_generated_job_id_string[256];
char job_state_string[256];
char *job_state_to_report = JOB_STATE_INIT;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job=%s.%d.%d. state=%d.\n", __FUNCTION__,
__LINE__, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster,
job_object->ll_step_id.proc, job_object->job_state);
memset(proxy_generated_job_id_string, '\0', sizeof(proxy_generated_job_id_string));
memset(job_state_string, '\0', sizeof(job_state_string));
sprintf(proxy_generated_job_id_string, "%d", job_object->proxy_generated_job_id);
switch (job_object->job_state) {
case MY_STATE_IDLE:
job_state_to_report = JOB_STATE_INIT;
break;
case MY_STATE_RUNNING:
job_state_to_report = JOB_STATE_RUNNING;
break;
case MY_STATE_STOPPED:
job_state_to_report = JOB_STATE_INIT;
break;
case MY_STATE_TERMINATED:
job_state_to_report = JOB_STATE_TERMINATED;
break;
default:
job_state_to_report = JOB_STATE_INIT;
break;
}
msg = proxy_job_change_event(gui_transmission_id, proxy_generated_job_id_string, 1);
sprintf(job_state_string, "%d", job_object->job_state);
proxy_add_string_attribute(msg, JOB_STATE_ATTR, job_state_to_report);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* send job removed event to gui *
*************************************************************************/
static int
sendJobRemoveEvent(int gui_transmission_id, JobObject * job_object)
{
proxy_msg *msg;
char proxy_generated_job_id_string[256];
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job=%s.%d.%d. state=%d.\n", __FUNCTION__,
__LINE__, job_object->ll_step_id.from_host, job_object->ll_step_id.cluster,
job_object->ll_step_id.proc, job_object->job_state);
memset(proxy_generated_job_id_string, '\0', sizeof(proxy_generated_job_id_string));
sprintf(proxy_generated_job_id_string, "%d", job_object->proxy_generated_job_id);
msg = proxy_remove_job_event(gui_transmission_id, proxy_generated_job_id_string);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* send task added event to gui *
*************************************************************************/
static int
sendTaskAddEvent(int gui_transmission_id, ClusterObject * cluster_object, JobObject * job_object,
TaskObject * task_object)
{
proxy_msg *msg;
char proxy_generated_job_id_string[256];
char proxy_generated_task_id_string[256];
char ll_task_id_string[256];
char *task_state_to_report = PROC_STATE_STOPPED;
NodeObject *node_object = NULL;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job=%s.%d.%d. node=%s. task=%d.\n",
__FUNCTION__, __LINE__, job_object->ll_step_id.from_host,
job_object->ll_step_id.cluster, job_object->ll_step_id.proc,
task_object->node_name, task_object->ll_task_id);
memset(proxy_generated_job_id_string, '\0', sizeof(proxy_generated_job_id_string));
memset(proxy_generated_task_id_string, '\0', sizeof(proxy_generated_task_id_string));
memset(ll_task_id_string, '\0', sizeof(ll_task_id_string));
sprintf(proxy_generated_job_id_string, "%d", job_object->proxy_generated_job_id);
sprintf(proxy_generated_task_id_string, "%d", task_object->proxy_generated_task_id);
sprintf(ll_task_id_string, "%d", task_object->ll_task_id);
msg = proxy_new_process_event(gui_transmission_id, proxy_generated_job_id_string, 1);
switch (task_object->task_state) {
case MY_STATE_IDLE:
task_state_to_report = PROC_STATE_STARTING;
break;
case MY_STATE_RUNNING:
task_state_to_report = PROC_STATE_RUNNING;
break;
case MY_STATE_STOPPED:
task_state_to_report = PROC_STATE_STOPPED;
break;
case MY_STATE_TERMINATED:
task_state_to_report = PROC_STATE_EXITED;
break;
default:
task_state_to_report = PROC_STATE_STARTING;
break;
}
proxy_add_process(msg, proxy_generated_task_id_string, ll_task_id_string, task_state_to_report, 2);
node_object = get_node_in_hash(cluster_object->node_hash, task_object->node_name);
proxy_add_int_attribute(msg, PROC_NODEID_ATTR, node_object->proxy_generated_node_id);
proxy_add_int_attribute(msg, PROC_INDEX_ATTR, task_object->ll_task_id);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* send task changed event to gui *
*************************************************************************/
static int
sendTaskChangeEvent(int gui_transmission_id, JobObject * job_object, TaskObject * task_object)
{
proxy_msg *msg;
char proxy_generated_task_id_string[256];
char *task_state_to_report = PROC_STATE_STOPPED;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
memset(proxy_generated_task_id_string, '\0', sizeof(proxy_generated_task_id_string));
sprintf(proxy_generated_task_id_string, "%d", task_object->proxy_generated_task_id);
msg = proxy_process_change_event(gui_transmission_id, proxy_generated_task_id_string, 1);
switch (task_object->task_state) {
case MY_STATE_IDLE:
task_state_to_report = PROC_STATE_STARTING;
break;
case MY_STATE_RUNNING:
task_state_to_report = PROC_STATE_RUNNING;
break;
case MY_STATE_STOPPED:
task_state_to_report = PROC_STATE_STOPPED;
break;
case MY_STATE_TERMINATED:
task_state_to_report = PROC_STATE_EXITED;
break;
default:
task_state_to_report = PROC_STATE_STARTING;
break;
}
proxy_add_string_attribute(msg, PROC_STATE_ATTR, task_state_to_report);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* send task removed event to gui *
*************************************************************************/
static int
sendTaskRemoveEvent(int gui_transmission_id, JobObject * job_object, TaskObject * task_object)
{
proxy_msg *msg;
char proxy_generated_task_id_string[256];
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
memset(proxy_generated_task_id_string, '\0', sizeof(proxy_generated_task_id_string));
sprintf(proxy_generated_task_id_string, "%d", task_object->proxy_generated_task_id);
msg = proxy_remove_process_event(gui_transmission_id, proxy_generated_task_id_string);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* send cluster added event to gui (a cluster to LoadLeveler is a *
* PTP machine to the gui) *
*************************************************************************/
static int
sendMachineAddEvent(int gui_transmission_id, ClusterObject * cluster_object)
{
proxy_msg *msg;
char proxy_generated_cluster_id_string[256];
char *machine_state_to_report = MACHINE_STATE_UNKNOWN;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. cluster=%s. state=%d.\n", __FUNCTION__,
__LINE__, cluster_object->cluster_name, cluster_object->cluster_state);
memset(proxy_generated_cluster_id_string, '\0', sizeof(proxy_generated_cluster_id_string));
sprintf(proxy_generated_cluster_id_string, "%d", cluster_object->proxy_generated_cluster_id);
// TODO - Need to ensure that machine_id gets set to the machine for the cluster owning the proxy node
my_cluster = cluster_object;
machine_id = cluster_object->proxy_generated_cluster_id;
switch (cluster_object->cluster_state) {
case MY_STATE_UP:
machine_state_to_report = MACHINE_STATE_UP;
break;
case MY_STATE_DOWN:
machine_state_to_report = MACHINE_STATE_DOWN;
break;
default:
machine_state_to_report = MACHINE_STATE_UNKNOWN;
break;
}
msg =
proxy_new_machine_event(gui_transmission_id, ibmll_proxy_base_id_string,
proxy_generated_cluster_id_string, cluster_object->cluster_name,
machine_state_to_report);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* send queue added event to gui (one per cluster) *
*************************************************************************/
static int
sendQueueAddEvent(int gui_transmission_id, ClusterObject * cluster_object)
{
proxy_msg *msg;
char proxy_generated_queue_id_string[256];
char *queue_state_to_report = QUEUE_STATE_STOPPED;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. queue=%s. state=%d.\n", __FUNCTION__,
__LINE__, cluster_object->cluster_name, cluster_object->queue_state);
memset(proxy_generated_queue_id_string, '\0', sizeof(proxy_generated_queue_id_string));
sprintf(proxy_generated_queue_id_string, "%d", cluster_object->proxy_generated_queue_id);
// TODO - Need to ensure that queue_id gets set to the queue belonging to the cluster where the proxy node resides
queue_id = cluster_object->proxy_generated_queue_id;
switch (cluster_object->cluster_state) {
case MY_STATE_UP:
queue_state_to_report = QUEUE_STATE_NORMAL;
break;
default:
queue_state_to_report = QUEUE_STATE_STOPPED;
break;
}
msg =
proxy_new_queue_event(gui_transmission_id, ibmll_proxy_base_id_string,
proxy_generated_queue_id_string, cluster_object->cluster_name,
queue_state_to_report);
enqueue_event(msg);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return 0;
}
/*************************************************************************
* add job to my list *
*************************************************************************/
void
add_job_to_list(List * job_list, JobObject * job_object)
{
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. job_object=x\'%08x\'.\n", __FUNCTION__,
__LINE__, job_object);
AddToList(job_list, job_object);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* add task to my list *
*************************************************************************/
void
add_task_to_list(List * task_list, TaskObject * task_object)
{
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. task_object=x\'%08x\'.\n", __FUNCTION__,
__LINE__, task_object);
AddToList(task_list, task_object);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* add node to my hash table *
*************************************************************************/
void
add_node_to_hash(Hash * node_hash, NodeObject * node_object)
{
int hash_key;
List *node_list;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. object=x\'%08x\'.\n", __FUNCTION__,
__LINE__, node_object);
hash_key = HashCompute(node_object->node_name, strlen(node_object->node_name));
node_list = HashSearch(node_hash, hash_key);
if (node_list == NULL) {
node_list = NewList();
AddToList(node_list, node_object);
HashInsert(node_hash, hash_key, node_list);
}
else {
AddToList(node_list, node_object);
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* delete task from my list *
*************************************************************************/
void
delete_task_from_list(List * task_list, TaskObject * task_object)
{
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
if (task_object->node_name != NULL) {
free(task_object->node_name);
task_object->node_name = NULL;
}
if (task_object->node_address != NULL) {
free(task_object->node_address);
task_object->node_address = NULL;
}
RemoveFromList(task_list, task_object);
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
}
/*************************************************************************
* find job in my list *
*************************************************************************/
JobObject *
get_job_in_list(List * job_list, LL_STEP_ID ll_step_id)
{
ListElement *job_list_element = NULL;
JobObject *job_object = NULL;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. name=%s.%d.%d.\n", __FUNCTION__,
__LINE__, ll_step_id.from_host, ll_step_id.cluster, ll_step_id.proc);
job_list_element = job_list->l_head;
while (job_list_element != NULL) {
job_object = job_list_element->l_value;
job_list_element = job_list_element->l_next;
if ((strcmp(ll_step_id.from_host, job_object->ll_step_id.from_host) == 0)
&& (ll_step_id.cluster == job_object->ll_step_id.cluster)
&& (ll_step_id.proc == job_object->ll_step_id.proc)) {
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. job_object=x\'%08x\'.\n",
__FUNCTION__, __LINE__, job_object);
return job_object;
}
}
job_object = NULL;
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. job_object=x\'%08x\'.\n", __FUNCTION__,
__LINE__, job_object);
return job_object;
}
/*************************************************************************
* find node in my hash table *
*************************************************************************/
NodeObject *
get_node_in_hash(Hash * node_hash, char *node_name)
{
int hash_key = 0;
List *node_list = NULL;
ListElement *node_list_element = NULL;
NodeObject *node_object = NULL;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. name=%s.\n", __FUNCTION__, __LINE__,
node_name);
hash_key = HashCompute(node_name, strlen(node_name));
node_list = HashSearch(node_hash, hash_key);
if (node_list != NULL) {
node_list_element = node_list->l_head;
while (node_list_element != NULL) {
node_object = node_list_element->l_value;
node_list_element = node_list_element->l_next;
if (strcmp(node_name, node_object->node_name) == 0) {
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. node_object=x\'%08x\'.\n",
__FUNCTION__, __LINE__, node_object);
return node_object;
}
}
}
node_object = NULL;
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. node_object=x\'%08x\'.\n",
__FUNCTION__, __LINE__, node_object);
return node_object;
}
/*************************************************************************
* find task in my list *
*************************************************************************/
TaskObject *
get_task_in_list(List * task_list, char *task_instance_machine_name, int ll_task_id)
{
TaskObject *task_object = NULL;
ListElement *task_list_element = NULL;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d. id=%d.\n", __FUNCTION__, __LINE__,
ll_task_id);
task_list_element = task_list->l_head;
while (task_list_element != NULL) {
task_object = task_list_element->l_value;
task_list_element = task_list_element->l_next;
if ((ll_task_id == task_object->ll_task_id)
&& (strcmp(task_object->node_name, task_instance_machine_name) == 0)) {
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. task_object=x\'%08x\'.\n",
__FUNCTION__, __LINE__, task_object);
return task_object;
}
}
task_object = NULL;
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d. task_object=x\'%08x\'.\n",
__FUNCTION__, __LINE__, task_object);
return task_object;
}
/*************************************************************************
* Determine if LoadLeveler is configured multicluster. *
* If any errors then default to local. This code will not be called *
* if the user has forced us to run local only or multicluster mode. *
*************************************************************************/
int
get_multicluster_status()
{
int rc = 0;
int i = 0;
LL_element *query_elem = NULL;
LL_element *cluster = NULL;
int cluster_count = 0;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
if ((state_shutdown_requested == 0) && (state_events_active == 1)) {
if (multicluster_status == -1) {
query_elem = NULL;
query_elem = my_ll_query(CLUSTERS);
if (query_elem == NULL) {
print_message(ERROR_MESSAGE,
"Unable to obtain query element. LoadLeveler may not be active.\n");
return -1;
}
/* Get information relating to LoadLeveler clusters.
* QUERY_ALL: we are querying for local cluster data
* NULL: no filter needed
* ALL_DATA: we want all the information available about the cluster */
print_message(INFO_MESSAGE, "Call LoadLeveler (ll_set_request) for cluster.\n");
rc = my_ll_set_request(query_elem, QUERY_ALL, NULL, ALL_DATA);
print_message(INFO_MESSAGE, "Call LoadLeveler (ll_get_objs) for cluster.\n");
cluster = my_ll_get_objs(query_elem, LL_CM, NULL, &cluster_count, &rc);
if (rc < 0) {
rc = my_ll_deallocate(query_elem);
return -1;
}
print_message(INFO_MESSAGE, "Number of LoadLeveler Clusters=%d.\n", cluster_count);
i = 0;
if (cluster != NULL) {
print_message(INFO_MESSAGE, "Cluster %d:\n", i);
rc = my_ll_get_data(cluster, LL_ClusterMusterEnvironment, &multicluster_status);
if (rc != 0) {
print_message(ERROR_MESSAGE,
"Error rc=%d trying to determine if LoadLeveler is running local or multicluster configuration. Defaulting to local cluster only (no multicluster).\n",
rc);
multicluster_status = 0;
}
else {
print_message(INFO_MESSAGE, "Multicluster returned is = %d.\n",
multicluster_status);
}
}
/* First we need to release the individual objects that were */
/* obtained by the query */
rc = my_ll_free_objs(query_elem);
rc = my_ll_deallocate(query_elem);
}
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return multicluster_status;
}
/*************************************************************************
* Retrieve a list of LoadLeveler clusters. *
* If no multicluster environment then return a list of 1 cluster. *
*************************************************************************/
void
refresh_cluster_list()
{
int rc = 0;
int i = 0;
LL_element *query_elem = NULL;
LL_element *cluster = NULL;
int cluster_count = 0;
char *cluster_name = NULL;
int cluster_local = 0;
ClusterObject *cluster_object = NULL;
print_message(TRACE_MESSAGE, ">>> %s entered. line=%d.\n", __FUNCTION__, __LINE__);
if ((state_shutdown_requested == 0) && (state_events_active == 1)) {
if (multicluster_status == -1) {
multicluster_status = get_multicluster_status();
}
switch (multicluster_status) {
/*-----------------------------------------------------------------------*
* no contact with loadleveler yet *
*-----------------------------------------------------------------------*/
case -1:
break;
/*-----------------------------------------------------------------------*
* single cluster (LoadLeveler is not running multicluster) *
*-----------------------------------------------------------------------*/
case 0:
if (cluster_list == NULL) {
cluster_list = NewList();
job_list = NewList();
}
/* end if no list obtained yet */
print_message(INFO_MESSAGE,
"Number of LoadLeveler Clusters=0 (not running multicluster).\n");
i = 0;
cluster_object = (ClusterObject *) malloc(sizeof(ClusterObject));
malloc_check(cluster_object, __FUNCTION__, __LINE__);
memset(cluster_object, '\0', sizeof(ClusterObject));
cluster_object->proxy_generated_cluster_id = generate_id();
cluster_object->proxy_generated_queue_id = generate_id();
cluster_object->cluster_state = MY_STATE_UP;
cluster_object->queue_state = MY_STATE_UP;
cluster_object->cluster_name = strdup("Local (not multicluster)");
cluster_object->cluster_is_local = 1;
cluster_object->node_hash = HashCreate(1024);
print_message(INFO_MESSAGE, "Cluster name=%s\n", cluster_name);
AddToList(cluster_list, (void *) cluster_object);
print_message(INFO_MESSAGE,
"Send event notification: PTP Machine added for LoadLeveler Cluster=%s.\n",
cluster_name);
sendMachineAddEvent(start_events_transid, cluster_object);
print_message(INFO_MESSAGE,
"Send event notification: PTP Queue added for LoadLeveler Cluster=%s.\n",
cluster_name);
sendQueueAddEvent(start_events_transid, cluster_object);
break;
/*-----------------------------------------------------------------------*
* multicluster *
*-----------------------------------------------------------------------*/
case 1:
if (cluster_list == NULL) {
cluster_list = NewList();
job_list = NewList();
}
/* end if no list obtained yet */
query_elem = NULL;
query_elem = my_ll_query(MCLUSTERS);
if (query_elem == NULL) {
print_message(ERROR_MESSAGE,
"Unable to obtain query element. LoadLeveler may not be active.\n");
multicluster_status = -1;
return;
}
print_message(INFO_MESSAGE, "Call LoadLeveler (ll_set_request) for clusters.\n");
rc = my_ll_set_request(query_elem, QUERY_ALL, NULL, ALL_DATA);
if (rc != 0) {
rc = my_ll_deallocate(query_elem);
query_elem = NULL;
multicluster_status = -1;
return;
}
print_message(INFO_MESSAGE, "Call LoadLeveler (ll_get_objs) for clusters.\n");
cluster = my_ll_get_objs(query_elem, LL_SCHEDD, NULL, &cluster_count, &rc);
if (rc != 0) {
rc = my_ll_deallocate(query_elem);
query_elem = NULL;
multicluster_status = -1;
return;
}
print_message(INFO_MESSAGE, "Number of LoadLeveler Clusters=%d.\n", cluster_count);
i = 0;
while ((cluster != NULL) && (query_elem != NULL)) {
print_message(INFO_MESSAGE, "Cluster %d:\n", i);
rc = my_ll_get_data(cluster, LL_MClusterName, &cluster_name);
if (rc != 0) {
rc = my_ll_free_objs(query_elem);
rc = my_ll_deallocate(query_elem);
query_elem = NULL;
multicluster_status = -1;
return;
}
else {
cluster_object = (ClusterObject *) malloc(sizeof(ClusterObject));
malloc_check(cluster_object, __FUNCTION__, __LINE__);
memset(cluster_object, '\0', sizeof(ClusterObject));
cluster_object->proxy_generated_cluster_id = generate_id();
cluster_object->proxy_generated_queue_id = generate_id();
cluster_object->cluster_state = MY_STATE_UP;
cluster_object->queue_state = MY_STATE_UP;
rc = my_ll_get_data(cluster, LL_MClusterLocal, &cluster_local);
cluster_object->cluster_name = strdup(cluster_name);
if (cluster_local == 1) {
cluster_object->cluster_is_local = 1;
}
cluster_object->node_hash = HashCreate(1024);
print_message(INFO_MESSAGE, "Cluster name=%s\n", cluster_name);
AddToList(cluster_list, (void *) cluster_object);
print_message(INFO_MESSAGE,
"Send event notification: PTP Machine added for LoadLeveler Cluster=%s.\n",
cluster_name);
sendMachineAddEvent(start_events_transid, cluster_object);
print_message(INFO_MESSAGE,
"Send event notification: PTP Queue added for LoadLeveler Cluster=%s.\n",
cluster_name);
sendQueueAddEvent(start_events_transid, cluster_object);
}
i++;
cluster = my_ll_next_obj(query_elem);
}
if (query_elem != NULL) {
rc = my_ll_free_objs(query_elem);
rc = my_ll_deallocate(query_elem);
}
query_elem = NULL;
break;
}
}
print_message(TRACE_MESSAGE, "<<< %s returning. line=%d.\n", __FUNCTION__, __LINE__);
return;
}
#endif
void
send_string_attrs(int trans_id, int flags)
{
/*
* Send string attributes, including default values, to front end
*/
TRACE_ENTRY;
int i;
for (i = 0; i < (sizeof string_launch_attrs / sizeof(string_launch_attr)); i++) {
if ((string_launch_attrs[i].type & flags) == flags) {
enqueue_event(proxy_attr_def_string_event(trans_id,
string_launch_attrs[i].id,
string_launch_attrs[i].short_name,
string_launch_attrs[i].long_name, 0,
string_launch_attrs[i].default_value));
}
}
TRACE_EXIT;
}
void
send_int_attrs(int trans_id, int flags)
{
/*
* Send integer attributes, including default value, lower and upper
* bounds, to front end
*/
int i;
TRACE_ENTRY;
for (i = 0; i < (sizeof int_attrs / sizeof(int_launch_attr)); i++) {
if ((int_attrs[i].type & flags) == flags) {
enqueue_event(proxy_attr_def_int_limits_event(trans_id,
int_attrs[i].id, int_attrs[i].short_name,
int_attrs[i].long_name, 0,
int_attrs[i].default_value,
int_attrs[i].llimit,
int_attrs[i].ulimit));
}
}
TRACE_EXIT;
}
void
send_long_int_attrs(int trans_id, int flags)
{
/*
* Send long (64-bit) integer attributes, including default value and
* lower and upper bounds, to front end
*/
int i;
TRACE_ENTRY;
for (i = 0; i < (sizeof long_int_attrs / sizeof(long_int_launch_attr)); i++) {
if ((long_int_attrs[i].type & flags) == flags) {
enqueue_event(proxy_attr_def_long_int_limits_event(trans_id,
long_int_attrs[i].id,
long_int_attrs[i].short_name,
long_int_attrs[i].long_name, 0,
long_int_attrs[i].default_value,
long_int_attrs[i].llimit,
long_int_attrs[i].ulimit));
}
}
TRACE_EXIT;
}
void
send_enum_attrs(int trans_id, int flags)
{
/*
* Send enumerated attributes, including default and allowable values, to
* front end.
*/
int i;
TRACE_ENTRY;
for (i = 0; i < (sizeof enum_attrs / sizeof(enum_launch_attr)); i++) {
char *cp;
char *end_cp;
char *cp_save;
int n;
proxy_msg *msg;
if ((enum_attrs[i].type & flags) == flags) {
/*
* Count the number of enumerations in the enum list. Enumerations are
* delimited by '|' so number of enumerations is number of '|' + 1
*/
cp = enum_attrs[i].enums;
n = 1;
while (*cp != '\0') {
if ((*cp) == '|') {
n = n + 1;
}
cp = cp + 1;
}
/*
* Create the enumeration attribute definition header
*/
msg = proxy_attr_def_enum_event(trans_id, enum_attrs[i].id,
enum_attrs[i].short_name, enum_attrs[i].long_name, 0,
enum_attrs[i].default_value, n);
/*
* Append enumerations to message. Since enumerations string is
* a string literal that is illegal to modify, create a working copy
*/
cp = strdup(enum_attrs[i].enums);
cp_save = cp;
for (;;) {
end_cp = strchr(cp, '|');
if (end_cp == NULL) {
proxy_msg_add_string(msg, cp);
break;
}
else {
*end_cp = '\0';
proxy_msg_add_string(msg, cp);
cp = end_cp + 1;
}
}
free(cp_save);
/*
* Send the attribute definition
*/
enqueue_event(msg);
}
}
TRACE_EXIT;
}
void
send_local_default_attrs(int trans_id)
{
/*
* Send values of any PE environment variables (MP_*) to front end. These
* are sent as string attributes with the MP_ prefix replaced with
* EN_. These are used by the front end to set local (user) default values
* for PE environment values, overriding IBM defaults.
*/
char **env;
char *cp;
char *value;
TRACE_ENTRY;
env = environ;
while (*env != NULL) {
/*
* Check if the environment variable is a PE environment variable.
*/
if (memcmp(*env, "MP_", 3) == 0) {
/*
* Make a duplicate of the environment then split into name/value
* at '=', change the first two characters of the name to 'EN' and
* send it to the front end as a string attribute. These attributes
* do not have attribute name or description since they do not
* display in the GUI.
*/
cp = strdup(*env);
value = strchr(cp, '=');
if (value != NULL) {
*value = '\0';
value = value + 1;
memcpy(cp, "EN", 2);
enqueue_event(proxy_attr_def_string_event(trans_id, cp, "", "", 0, value));
}
free(cp);
}
env = env + 1;
}
TRACE_EXIT;
}
#ifdef __linux__
void
discover_jobs()
{
/*
* Look for already running poe jobs started by this user, and inform
* the front end of each new job. For Linux, new jobs are found by
* reading /proc looking for directories corresponding to processes.
* For each process which is owned by the user running the proxy, and
* where the executable named in the command line is /usr/bin/poe,
* generate the new job.
*/
DIR *procdir;
struct dirent *proc_entry;
uid_t my_uid;
struct stat stat_info;
int status;
char cmd_path[PATH_MAX];
TRACE_ENTRY;
my_uid = getuid();
procdir = opendir("/proc");
if (procdir == NULL) {
TRACE_EXIT;
return;
}
status = chdir("/proc");
if (status != 0) {
closedir(procdir);
TRACE_EXIT;
return;
}
proc_entry = readdir(procdir);
/*
* For each filename in /proc, determine if it is a directory owned by
* the user running the proxy. If it is, then attempt to read
* /proc/<pid>/cmdline and verify the executable's pathname. If it is
* /usr/bin/poe, then this is a new job to be added to the job list.
* Note that since a process may exit at any time, this function must
* anticipate errors attempting to open or read files in /proc.
*/
while (proc_entry != NULL) {
status = stat(proc_entry->d_name, &stat_info);
if (status == 0) {
if ((stat_info.st_uid == my_uid) && (S_ISDIR(stat_info.st_mode))) {
FILE *cmdline_file;
snprintf(cmd_path, sizeof cmd_path, "/proc/%s/cmdline", proc_entry->d_name);
cmdline_file = fopen(cmd_path, "r");
if (cmdline_file != NULL) {
char cmdline[PATH_MAX];
char *cp;
cp = fgets(cmdline, sizeof cmdline, cmdline_file);
if (cp != NULL) {
int len;
/*
* The command line is an array of '\0' terminated
* tokens representing arg[0] thru arg[n]. Look
* at the last 3 characters of arg[0] to see if
* it is a poe process.
*/
len = strlen(cmdline);
if (len >= 3) {
cp = cp + len - 3;
}
if (strcmp(cp, "poe") == 0) {
add_discovered_job(proc_entry->d_name);
}
}
fclose(cmdline_file);
}
}
}
proc_entry = readdir(procdir);
}
closedir(procdir);
TRACE_EXIT;
}
#endif
#ifdef _AIX
void
discover_jobs()
{
/*
* Look for already running poe jobs started by this user, and inform
* the front end of each new job. For AIX, do this by querying the
* process table and notifying the front end for each job detected.
*/
struct procsinfo *procinfo;
int num_procs;
uid_t uid;
pid_t start_pid;
/*
* Just get the number of entries in the process table so that we know
* how big of an array to allocate. Note that processes may be added or
* deleted from the process table before the next call to getprocs, so it
* is remotely possible that a newly started poe process is missed or
* a poe process just terminating is found. The chances of either happening
* are small and not worth the trouble of iterating on getprocs() to ensure
* all processes are detected
*/
TRACE_ENTRY;
start_pid = 0;
num_procs = getprocs(NULL, sizeof(struct procsinfo), NULL,
sizeof(struct fdsinfo), &start_pid, INT_MAX);
if (num_procs > 0) {
procinfo = (struct procsinfo *) malloc(num_procs * sizeof(struct procsinfo));
malloc_check(procinfo, __FUNCTION__, __LINE__);
/*
* Now get the actual process table.
* start_pid must be reset in order for getprocs to get process
* table information.
*/
start_pid = 0;
num_procs = getprocs(procinfo, sizeof(struct procsinfo), NULL,
sizeof(struct fdsinfo), &start_pid, num_procs);
if (num_procs >= 0) {
int i;
uid = getuid();
/*
* Notify the front end for each poe process owned by the user
*/
for (i = 0; i < num_procs; i++) {
if (procinfo[i].pi_uid == uid) {
char *cp;
cp = strrchr(procinfo[i].pi_comm, '/');
if (cp == NULL) {
cp = procinfo[i].pi_comm;
}
else {
cp = cp + 1;
}
if (strcmp(cp, "poe") == 0) {
char poe_pid[20];
snprintf(poe_pid, sizeof poe_pid, "%d", procinfo[i].pi_pid);
add_discovered_job(poe_pid);
}
}
}
}
free(procinfo);
}
TRACE_EXIT;
}
#endif
void
add_discovered_job(char *pid)
{
/*
* Create a new job object for the discovered job, add it to the job
* list, and notify the proxy of the new job.
*/
jobinfo *job;
struct passwd *userinfo;
char jobid_str[12];
char queueid_str[12];
char jobname[40];
TRACE_ENTRY;
job = calloc(1, sizeof(jobinfo));
malloc_check(job, __FUNCTION__, __LINE__);
job->proxy_jobid = generate_id();
job->submit_jobid = "";
job->poe_pid = atoi(pid);
job->discovered_job = 1;
job->submit_time = 0;
sprintf(jobid_str, "%d", job->proxy_jobid);
sprintf(queueid_str, "%d", queue_id);
userinfo = getpwuid(getuid());
snprintf(jobname, sizeof jobname, "%s.run_%s", userinfo->pw_name, pid);
jobname[sizeof jobname - 1] = '\0';
AddToList(jobs, job);
enqueue_event(proxy_new_job_event(start_events_transid, queueid_str,
jobid_str, jobname, JOB_STATE_INIT, job->submit_jobid));
/*
* Start a thread to watch for the attach.cfg file for this job
* as if it was invoked by the front end.
*/
pthread_create(&job->startup_thread, &thread_attrs, startup_monitor, job);
TRACE_EXIT;
}
/*************************************************************************/
/* STDOUT/STDERR handling */
/*************************************************************************/
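/*
 * stdout_handler and stderr_handler are the file handlers registered for
 * the pipes carrying the application's stdout and stderr. Each simply
 * forwards the ready file descriptor to write_output() along with the
 * per-stream buffering state kept in the job's ioinfo structure.
 */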
int
stdout_handler(int fd, void *job)
{
return write_output(fd, (jobinfo *) job, &((jobinfo *) job)->stdout_info);
}
int
stderr_handler(int fd, void *job)
{
return write_output(fd, (jobinfo *) job, &((jobinfo *) job)->stderr_info);
}
int
write_output(int fd, jobinfo * job, ioinfo * file_info)
{
/*
* Read available data from the file descriptor and send it to
* front end. Data is read from a non-blocking pipe to avoid hanging
* on a read of an incomplete line of data. As a result, the data read
* may be an incomplete line. This requires that as data is read, it is
* appended to a second buffer until a newline character is read.
* Once the newline character is read, the second buffer is sent to the
* front end and the buffer reset so the next line of output can be
* built.
* FIX: May need to flush an incomplete line of output if the
* application process exits (and closes the pipe) before the ending
* newline is seen.
*/
int byte_count;
char *cp;
byte_count = read(fd, file_info->read_buf, READ_BUFFER_SIZE - 1);
while (byte_count > 0) {
file_info->read_buf[byte_count] = '\0';
cp = file_info->read_buf;
while (*cp != '\0') {
check_bufsize(file_info);
*file_info->cp = *cp;
if (*cp == '\n') {
check_bufsize(file_info);
file_info->cp = file_info->cp + 1;
*file_info->cp = '\0';
file_info->write_func(job, file_info->write_buf);
file_info->cp = file_info->write_buf;
file_info->remaining = file_info->allocated;
}
else {
file_info->cp = file_info->cp + 1;
file_info->remaining = file_info->remaining - 1;
}
cp = cp + 1;
}
byte_count = read(fd, file_info->read_buf, READ_BUFFER_SIZE - 1);
}
if (byte_count == -1) {
if ((errno == EINTR) || (errno == EAGAIN)) {
return 0;
}
else {
return -1;
}
}
else {
print_message(TRACE_DETAIL_MESSAGE, "EOF for fd %d. Unregistering handler\n", fd);
UnregisterFileHandler(fd);
close(fd);
return 0;
}
}
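/*
 * Ensure there is room in the line assembly buffer for at least one more
 * character, doubling the buffer when it fills up.
 */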
void
check_bufsize(ioinfo * file_info)
{
if (file_info->remaining == 0) {
int used;
used = file_info->cp - file_info->write_buf;
file_info->allocated = file_info->allocated * 2;
file_info->write_buf = (char *) realloc(file_info->write_buf, file_info->allocated);
malloc_check(file_info->write_buf, __FILE__, __LINE__);
/* realloc may move the buffer: re-anchor the write pointer and recompute the remaining space */
file_info->cp = file_info->write_buf + used;
file_info->remaining = file_info->allocated - used;
}
}
void
send_stdout(jobinfo * job, char *buf)
{
/*
* Send the data written to stdio file descriptors to the front end.
* If the user requested I/O to be split by task, then tag the message
* with the task id of the originating task. Otherwise, tag it for task
* 0. In order to properly tag stdio, the proxy internally sets
* MP_LABELIO=yes If the user had also set MP_LABELIO=yes, then send
* the output with the PE-generated task id to the front end. Otherwise
* strip off the task id before sending the message.
*/
char *cp;
char *outp;
int task;
cp = strchr(buf, ':');
if (cp != NULL) {
if (job->split_io) {
*cp = '\0';
task = atoi(buf);
*cp = ':';
}
else {
task = 0;
}
if (job->label_io) {
outp = buf;
}
else {
outp = cp + 1;
}
}
else {
task = 0;
outp = buf;
}
send_process_state_output_event(start_events_transid, job->tasks[task].proxy_taskid, outp);
}
void
send_stderr(jobinfo * job, char *buf)
{
/*
* Send the data written to stderr file descriptors to the front end.
*/
fprintf(stderr, "%s", buf);
}
/*************************************************************************/
/* Functions to send events to front end GUI */
/*************************************************************************/
static void
send_ok_event(int trans_id)
{
proxy_msg *msg;
msg = proxy_ok_event(trans_id);
enqueue_event(msg);
}
void
post_error(int trans_id, int type, char *msgtext)
{
/*
* Send an error message to the front end
*/
proxy_msg *msg;
fprintf(stderr, "post_error: %s\n", msgtext);
fflush(stderr);
msg = proxy_error_event(trans_id, type, msgtext);
enqueue_event(msg);
return;
}
void
post_submitjob_error(int trans_id, char *subid, char *msgtext)
{
/*
* Send an error message to the front end
*/
proxy_msg *msg;
fprintf(stderr, "post_submitjob_error: %s\n", msgtext);
fflush(stderr);
msg = proxy_submitjob_error_event(trans_id, subid, 0, msgtext);
enqueue_event(msg);
return;
}
/*
* Functions to send event notifications to front end. The attributes
* and attribute values allowed for each event are defined in proxy_event.h
*/
static proxy_msg *
proxy_attr_def_int_limits_event(int trans_id, char *id,
char *name, char *desc, int display, int def, int llimit,
int ulimit)
{
proxy_msg *msg;
msg = new_proxy_msg(PROXY_EV_RT_ATTR_DEF, trans_id);
proxy_msg_add_int(msg, 1);
proxy_msg_add_int(msg, 8);
proxy_msg_add_string(msg, id);
proxy_msg_add_string(msg, "INTEGER");
proxy_msg_add_string(msg, name);
proxy_msg_add_string(msg, desc);
proxy_msg_add_int(msg, display);
proxy_msg_add_int(msg, def);
proxy_msg_add_int(msg, llimit);
proxy_msg_add_int(msg, ulimit);
return msg;
}
static proxy_msg *
proxy_attr_def_long_int_limits_event(int trans_id, char *id,
char *name, char *desc, int display, long long def,
long long llimit, long long ulimit)
{
proxy_msg *msg;
char num_str[22];
msg = new_proxy_msg(PROXY_EV_RT_ATTR_DEF, trans_id);
proxy_msg_add_int(msg, 1);
proxy_msg_add_int(msg, 8);
proxy_msg_add_string(msg, id);
proxy_msg_add_string(msg, "BIGINTEGER");
proxy_msg_add_string(msg, name);
proxy_msg_add_string(msg, desc);
proxy_msg_add_int(msg, display);
sprintf(num_str, "%lld", def);
proxy_msg_add_string(msg, num_str);
sprintf(num_str, "%lld", llimit);
proxy_msg_add_string(msg, num_str);
sprintf(num_str, "%lld", ulimit);
proxy_msg_add_string(msg, num_str);
return msg;
}
static proxy_msg *
proxy_attr_def_enum_event(int trans_id, char *id,
char *name, char *desc, int display, char *def, int count)
{
proxy_msg *msg;
msg = new_proxy_msg(PROXY_EV_RT_ATTR_DEF, trans_id);
proxy_msg_add_int(msg, 1);
proxy_msg_add_int(msg, count + 6);
proxy_msg_add_string(msg, id);
proxy_msg_add_string(msg, "ENUMERATED");
proxy_msg_add_string(msg, name);
proxy_msg_add_string(msg, desc);
proxy_msg_add_int(msg, display);
proxy_msg_add_string(msg, def);
return msg;
}
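/*
 * Send a single new-node event to the front end listing every node in
 * new_nodes. The node list is traversed under node_lock so it cannot
 * change while the message is being built.
 */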
static void
send_new_node_list(int trans_id, int machine_id, List * new_nodes)
{
proxy_msg *msg;
node_refcount *noderef;
char id_string[12];
pthread_mutex_lock(&node_lock);
SetList(new_nodes);
sprintf(id_string, "%d", machine_id);
msg = proxy_new_node_event(trans_id, id_string, SizeOfList(new_nodes));
noderef = GetListElement(new_nodes);
while (noderef != NULL) {
sprintf(id_string, "%d", noderef->proxy_nodeid);
proxy_add_node(msg, id_string, noderef->key, NODE_STATE_UP, 0);
noderef = GetListElement(new_nodes);
}
pthread_mutex_unlock(&node_lock);
enqueue_event(msg);
}
static void
send_job_state_change_event(int trans_id, int proxy_jobid, char *state)
{
proxy_msg *msg;
char jobid_str[12];
sprintf(jobid_str, "%d", proxy_jobid);
msg = proxy_job_change_event(trans_id, jobid_str, 1);
proxy_msg_add_keyval_string(msg, JOB_STATE_ATTR, state);
enqueue_event(msg);
}
static void
send_process_state_change_event(int trans_id, jobinfo * job, char *state)
{
/*
* Send state change event for all processes associated with the job
*
* Find each range of consecutive process object ids and send a
* notification to the front end for that range of processes. This
* usually reduces the number of process state change messages sent
* to the front end. In the most common case, where a single
* application was started before the next application was started,
* there will only be a single message. If two application's
* startup overlaps, then there is no guarantee of consecutive
* ids for processes, then more than one message may be generated.
*/
taskinfo *tasks;
taskinfo *info;
int task_index;
int start_task;
int next_task;
proxy_msg *msg;
char range[25];
int i;
tasks = job->tasks;
if (tasks == NULL) {
return;
}
task_index = -1;
info = tasks;
i = 0;
while (i < job->numtasks) {
task_index = info->proxy_taskid;
start_task = task_index;
next_task = task_index;
/*
* Loop as long as the process object ids are consecutive and
* not end of list.
*/
while (task_index == next_task) {
i = i + 1;
info = &tasks[i];
next_task = next_task + 1;
if (i >= job->numtasks) {
break;
}
task_index = info->proxy_taskid;
}
if (task_index != -1) {
/*
* Generate a process state change event for a consecutive
* range of process id objects. next_task will have a value
* 1 more than the last consecutive object id.
*/
snprintf(range, sizeof range, "%d-%d", start_task, next_task - 1);
range[sizeof range - 1] = '\0';
msg = proxy_process_change_event(trans_id, range, 1);
proxy_msg_add_keyval_string(msg, PROC_STATE_ATTR, state);
enqueue_event(msg);
task_index = -1;
}
}
}
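/*
 * Forward one line of application output to the front end as a
 * PROC_STDOUT_ATTR attribute on the originating process object.
 */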
static void
send_process_state_output_event(int trans_id, int procid, char *output)
{
proxy_msg *msg;
char procid_str[12];
sprintf(procid_str, "%d", procid);
msg = proxy_process_change_event(trans_id, procid_str, 1);
proxy_msg_add_keyval_string(msg, PROC_STDOUT_ATTR, output);
print_message(TRACE_DETAIL_MESSAGE, "Sent stdout: %s\n", output);
enqueue_event(msg);
}
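/*
 * Generate a unique model object id from the proxy's base id plus a
 * monotonically increasing counter.
 */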
static int
generate_id()
{
return base_id + last_id++;
}
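/*
 * Queue an event message for delivery to the front end by the proxy
 * server.
 */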
static void
enqueue_event(proxy_msg * msg)
{
proxy_svr_queue_msg(pe_proxy, msg);
}
/*************************************************************************/
/* Proxy startup and command loop */
/*************************************************************************/
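/*
 * main: parse the command line options (getopt_long on Linux, a
 * hand-rolled equivalent on AIX), locate the LoadLeveler shared library
 * if --useloadleveler was specified, install the signal handlers, and
 * run the proxy server loop until shutdown.
 */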
int
main(int argc, char *argv[])
{
char *host = "localhost";
char *proxy_str = DEFAULT_PROXY;
int ch;
int port = PROXY_TCP_PORT;
int rc;
int debug;
char *cp;
int n;
strcpy(miniproxy_path, argv[0]);
cp = strrchr(miniproxy_path, '/');
if (cp != NULL) {
strcpy(cp + 1, "ptp_ibmpe_miniproxy");
}
else {
strcpy(miniproxy_path, "./ptp_ibmpe_miniproxy");
}
#ifdef __linux__
while ((ch = getopt_long(argc, argv, "D:P:p:h:t:l:m:o:r:x:y:z:LSM", longopts, NULL)) != -1) {
switch (ch) {
case 'P':
proxy_str = optarg;
break;
case 'p':
port = atoi(optarg);
break;
case 'h':
host = optarg;
break;
case 'L':
use_load_leveler = 1;
break;
case 't':
if (strcmp(optarg, "Function") == 0) {
state_trace = 1;
state_trace_detail = 0;
}
else if (strcmp(optarg, "Detail") == 0) {
state_trace = 1;
state_trace_detail = 1;
state_info = 1;
}
else if (strcmp(optarg, "None") == 0) {
state_trace = 0;
state_trace_detail = 0;
}
else {
print_message(FATAL_MESSAGE, "Incorrect trace level '%s'\n", optarg);
return 1;
}
break;
case 'D':
break;
case 'l':
user_libpath = strdup(optarg); /* user has specified override full path to LoadLeveler shared library */
break;
case 'm':
if (strncmp(optarg, "y", 1) == 0) { /* y - multicluster forced on */
multicluster_status = 1; /* force multicluster */
}
else if (strncmp(optarg, "n", 1) == 0) { /* n - multicluster forced off */
multicluster_status = 0; /* force local */
}
else { /* d - default to LoadLeveler determine multicluster state */
multicluster_status = -1; /* allow LoadLeveler to determine mode */
}
break;
case 'x': /* min node polling */
min_node_sleep_seconds = atoi(optarg);
break;
case 'y': /* max node polling */
max_node_sleep_seconds = atoi(optarg);
break;
case 'z': /* job polling */
job_sleep_seconds = atoi(optarg);
break;
case 'S':
debug = 1;
while (debug) {
sleep(1);
}
break;
case 'M':
run_miniproxy = 1;
break;
default:
print_message(FATAL_MESSAGE,
"%s [--proxy=proxy] [--host=host_name] [--port=port] [--useloadleveler] [--trace=level] [--lib_override=directory] [--multicluster=d|n|y] [--template_override=file] [--template_write=y|n] --node_polling_min=value --node_polling_max=value --job_polling=value\n",
argv[0]);
exit(1);
}
}
#else
/*
* AIX does not have the getopt_long function. Since the proxy is
* invoked only by the PTP front end, the following simplified options
* parsing is sufficient.
* Make sure that this is maintained in sync with the above
* getopt_long loop.
*/
n = 1;
while (n < argc) {
cp = strchr(argv[n], '=');
if (cp == NULL) {
if (strcmp(argv[n], "--suspend_at_startup") == 0) {
debug = 1;
while (debug) {
sleep(1);
}
}
else if (strcmp(argv[n], "--runMiniproxy") == 0) {
run_miniproxy = 1;
}
else if (strcmp(argv[n], "--useloadleveler") == 0) {
use_load_leveler = 1;
}
else {
print_message(FATAL_MESSAGE, "Invalid argument %s (%d)\n", argv[n], n);
print_message(FATAL_MESSAGE,
"%s [--proxy=proxy] [--host=host_name] [--port=port] [--useloadleveler=y/n] [--trace=level] [--lib_override=directory] [--multicluster=d|n|y] --node_polling_min=value --node_polling_max=value --job_polling=value\n",
argv[0]);
exit(1);
}
}
else {
*cp = '\0';
if (strcmp(argv[n], "--proxy") == 0) {
proxy_str = cp + 1;
}
else if (strcmp(argv[n], "--port") == 0) {
port = atoi(cp + 1);
}
else if (strcmp(argv[n], "--host") == 0) {
host = cp + 1;
}
else if (strcmp(argv[n], "--trace") == 0) {
if (strcmp(cp + 1, "Function") == 0) {
state_trace = 1;
state_trace_detail = 0;
}
else if (strcmp(cp + 1, "Detail") == 0) {
state_trace = 1;
state_trace_detail = 1;
state_info = 1;
}
else if (strcmp(cp + 1, "None") == 0) {
state_trace = 0;
state_trace_detail = 0;
}
else {
print_message(FATAL_MESSAGE, "Incorrect trace level '%s'\n", argv[n + 1]);
return 1;
}
}
else if (strcmp(argv[n], "--lib-override") == 0) {
user_libpath = strdup(cp + 1);
}
else if (strcmp(argv[n], "--multicluster") == 0) {
if (strncmp(cp + 1, "y", 1) == 0) {
multicluster_status = 1;
}
else if (strncmp(cp + 1, "n", 1) == 0) {
multicluster_status = 0;
}
else {
multicluster_status = -1;
}
}
else if (strcmp(argv[n], "--node_polling_min") == 0) {
min_node_sleep_seconds = atoi(cp + 1);
}
else if (strcmp(argv[n], "--node_polling_max") == 0) {
max_node_sleep_seconds = atoi(cp + 1);
}
else if (strcmp(argv[n], "--job_polling") == 0) {
job_sleep_seconds = atoi(cp + 1);
}
else if (strcmp(argv[n], "--debug") == 0) {
/* Do nothing */
}
else {
print_message(FATAL_MESSAGE, "Invalid argument %s (%d)\n", argv[n], n);
print_message(FATAL_MESSAGE,
"%s [--proxy=proxy] [--host=host_name] [--port=port] [--useloadleveler] [--trace=level] [--lib_override=directory] [--multicluster=d|n|y] --node_polling_min=value --node_polling_max=value --job_polling=value\n",
argv[0]);
exit(1);
}
}
n = n + 1;
}
#endif
if (use_load_leveler) {
if (find_load_leveler_library() != 0) {
exit(1);
}
}
ptp_signal_exit = 0;
signal(SIGINT, ptp_signal_handler);
signal(SIGHUP, SIG_IGN);
signal(SIGILL, ptp_signal_handler);
signal(SIGSEGV, ptp_signal_handler);
signal(SIGTERM, ptp_signal_handler);
signal(SIGQUIT, ptp_signal_handler);
signal(SIGABRT, ptp_signal_handler);
rc = server(proxy_str, host, port);
HashDestroy(nodes, hash_cleanup);
exit(rc);
}
int
find_load_leveler_library()
{
/*-----------------------------------------------------------------------*
* *
* Find the LoadLeveler shared library we are using for AIX or Linux. *
* If we cannot find it then LoadLeveler is not installed or we have *
* been directed to use the wrong path. If we find it OK then we will *
* try to dynamic open the library and compare versions later in the *
* command_initialize. *
*-----------------------------------------------------------------------*/
int lib_found;
int i;
int status;
int save_errno;
struct stat statinfo;
if (user_libpath != NULL) { /* if user specified lib */
if (strlen(user_libpath) > 0) { /* if not a null string */
libpath[0] = user_libpath; /* pick up user specified libpath
* as only one to check */
libpath[1] = (char *) -1; /* new end of list */
}
}
lib_found = 0; /* preset to not found */
i = 0;
print_message(INFO_MESSAGE, "Searching for LoadLeveler shared library.\n");
while ((libpath[i] != (char *) -1) && (lib_found == 0)) { /* if not end of list and not yet found */
if (libpath[i] != NULL) { /* if valid entry */
strcpy(ibmll_libpath_name, libpath[i]);
strcat(ibmll_libpath_name, "/");
strcat(ibmll_libpath_name, libname);
/* see if this is a valid LoadLeveler shared library */
print_message(INFO_MESSAGE, "Trying: %s\n", ibmll_libpath_name);
status = stat(ibmll_libpath_name, &statinfo);
save_errno = errno;	/* capture errno from stat before any later call can change it */
if (status == 0) {
#ifdef _AIX
strcat(ibmll_libpath_name, "(shr.o)");
#endif
print_message(INFO_MESSAGE,
"Successful search: Found LoadLeveler shared library %s\n",
ibmll_libpath_name);
lib_found = 1; /* we found it */
break;
}
else {
print_message(ERROR_MESSAGE,
"Search failure: \"stat\" of LoadLeveler shared library %s returned errno=%d.\n",
ibmll_libpath_name, save_errno);
}
}
i++;
}
if (lib_found == 0) {
print_message(FATAL_MESSAGE, "No LoadLeveler shared library found - quitting...\n");
return -1;
}
return 0;
}
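/*
 * Sketch only (kept out of the build): one way the library located above
 * could be opened and its version queried, as the comment in
 * find_load_leveler_library() says happens later in command_initialize.
 * The symbol name "ll_version" and the error handling here are assumptions
 * for illustration, not the proxy's actual code path.
 */
#if 0
static int
open_load_leveler_library_sketch(char *path)
{
void *handle;
char *(*version_fn)(void);
/* dlopen/dlsym/dlclose come from <dlfcn.h>, which is already included */
handle = dlopen(path, RTLD_NOW | RTLD_GLOBAL);
if (handle == NULL) {
print_message(ERROR_MESSAGE, "dlopen of %s failed: %s\n", path, dlerror());
return -1;
}
version_fn = (char *(*)(void)) dlsym(handle, "ll_version");
if (version_fn != NULL) {
print_message(INFO_MESSAGE, "LoadLeveler reports version %s\n", version_fn());
}
dlclose(handle);
return 0;
}
#endif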
int
shutdown_proxy(void)
{
return 0;
}
RETSIGTYPE
ptp_signal_handler(int sig)
{
ptp_signal_exit = sig;
if (sig >= 0 && sig < NSIG) {
RETSIGTYPE(*saved_signal) (int) = saved_signals[sig];
if (saved_signal != SIG_ERR && saved_signal != SIG_IGN && saved_signal != SIG_DFL) {
saved_signal(sig);
}
}
}
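/*
 * ptp_signal_handler() chains to whatever handler was installed before the
 * proxy's own, using the saved_signals[] table it reads above. A sketch of
 * how that table could be filled at registration time (whether main()
 * records the values exactly this way is an assumption):
 *
 *	saved_signals[SIGSEGV] = signal(SIGSEGV, ptp_signal_handler);
 *	saved_signals[SIGTERM] = signal(SIGTERM, ptp_signal_handler);
 */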
int
server(char *name, char *host, int port)
{
/*
* Initialize the proxy, connect to front end, then run the main loop
* until the proxy is requested to shut down.
*/
char *msg1;
char *msg2;
int rc;
struct timeval timeout = { 0, 20000 };
jobs = NewList();
if (proxy_svr_init(name, &timeout, &helper_funcs, &command_tab, &pe_proxy)
!= PROXY_RES_OK)
return 0;
rc = proxy_svr_connect(pe_proxy, host, port);
if (rc != PROXY_RES_OK) {
print_message(INFO_MESSAGE, "proxy_connect fails with %d status.\n", rc);
return 0;
}
print_message(INFO_MESSAGE, "Running IBMPE proxy on port %d\n", port);
while (ptp_signal_exit == 0 && !shutdown_requested) {
if ((proxy_svr_progress(pe_proxy) != PROXY_RES_OK)) {
print_message(TRACE_DETAIL_MESSAGE, "Loop ending\n");
break;
}
}
if (ptp_signal_exit != 0) {
switch (ptp_signal_exit) {
case SIGINT:
msg1 = "INT";
msg2 = "Interrupt";
break;
case SIGHUP:
msg1 = "HUP";
msg2 = "Hangup";
break;
case SIGILL:
msg1 = "ILL";
msg2 = "Illegal Instruction";
break;
case SIGSEGV:
msg1 = "SEGV";
msg2 = "Segmentation Violation";
break;
case SIGTERM:
msg1 = "TERM";
msg2 = "Termination";
break;
case SIGQUIT:
msg1 = "QUIT";
msg2 = "Quit";
break;
case SIGABRT:
msg1 = "ABRT";
msg2 = "Process Aborted";
break;
default:
msg1 = "***UNKNOWN SIGNAL***";
msg2 = "ERROR - UNKNOWN SIGNAL";
break;
}
print_message(FATAL_MESSAGE,
"ptp_pe_proxy received signal %s (%s) and is exiting.\n", msg1, msg2);
shutdown_proxy();
/* our return code = the signal that fired */
rc = ptp_signal_exit;
}
DestroyList(jobs, NULL);
proxy_svr_finish(pe_proxy);
print_message(TRACE_DETAIL_MESSAGE, "proxy_svr_finish returned.\n");
return rc;
}
/*************************************************************************
* Print message (with time and date and thread id) *
* Info, Trace, Arg and Warning messages go to stdout. *
* Error and Fatal messages go to stderr. *
*************************************************************************/
void
print_message(int type, const char *format, ...)
{
va_list ap;
char timebuf[20];
time_t clock;
struct tm a_tm;
int thread_id = 0;
pthread_mutex_lock(&print_message_lock);
pthread_t tid = pthread_self(); /* what thread am I ? */
for (thread_id = 0; thread_id < (sizeof(thread_map_table)
/ sizeof(pthread_t)); thread_id++) {
if (tid == thread_map_table[thread_id]) {
break;
}
}
time(&clock); /* what time is it ? */
localtime_r(&clock, &a_tm);
strftime(&timebuf[0], 15, "%m/%d %02H:%02M:%02S", &a_tm);
va_start(ap, format);
switch (type) {
case INFO_MESSAGE:
if (state_info == 1) { /* if info messages allowed */
if (state_message_timedate != 0) {
fprintf(stdout, "%s ", timebuf);
}
if (state_message_threadid != 0) {
fprintf(stdout, "T(%d) ", thread_id);
}
fprintf(stdout, "Info: ");
vfprintf(stdout, format, ap);
fflush(stdout);
}
break;
case TRACE_MESSAGE:
if (state_trace == 1) { /* if trace messages allowed */
if (state_message_timedate != 0) {
fprintf(stdout, "%s ", timebuf);
}
if (state_message_threadid != 0) {
fprintf(stdout, "T(%d) ", thread_id);
}
fprintf(stdout, "Trace: ");
vfprintf(stdout, format, ap);
fflush(stdout);
}
break;
case TRACE_DETAIL_MESSAGE:
if (state_trace_detail == 1) { /* if trace messages allowed */
if (state_message_timedate != 0) {
fprintf(stdout, "%s ", timebuf);
}
if (state_message_threadid != 0) {
fprintf(stdout, "T(%d) ", thread_id);
}
fprintf(stdout, "Trace: ");
vfprintf(stdout, format, ap);
fflush(stdout);
}
break;
case WARNING_MESSAGE:
if (state_warning == 1) { /* if warning messages allowed */
if (state_message_timedate != 0) {
fprintf(stdout, "%s ", timebuf);
}
if (state_message_threadid != 0) {
fprintf(stdout, "T(%d) ", thread_id);
}
fprintf(stdout, "Warning: ");
vfprintf(stdout, format, ap);
fflush(stdout);
}
break;
case ERROR_MESSAGE:
if (state_error == 1) { /* if error messages allowed */
if (state_message_timedate != 0) {
fprintf(stderr, "%s ", timebuf);
}
if (state_message_threadid != 0) {
fprintf(stderr, "T(%d) ", thread_id);
}
fprintf(stderr, "Error: ");
vfprintf(stderr, format, ap);
fflush(stderr);
}
break;
case FATAL_MESSAGE: /* fatal messages are never suppressed */
if (state_message_timedate != 0) {
fprintf(stderr, "%s ", timebuf);
}
if (state_message_threadid != 0) {
fprintf(stderr, "T(%d) ", thread_id);
}
fprintf(stderr, "Fatal: ");
vfprintf(stderr, format, ap);
fflush(stderr);
break;
case ARGS_MESSAGE:
if (state_args == 1) { /* if formatted arg messages are allowed */
if (state_message_timedate != 0) {
fprintf(stdout, "%s ", timebuf);
}
if (state_message_threadid != 0) {
fprintf(stdout, "T(%d) ", thread_id);
}
fprintf(stdout, "Args: ");
vfprintf(stdout, format, ap);
fprintf(stdout, "\n");
fflush(stdout);
}
break;
default: /* unknown message type - allow it */
if (state_message_timedate != 0) {
fprintf(stdout, "%s ", timebuf);
}
if (state_message_threadid != 0) {
fprintf(stdout, "T(%d) ", thread_id);
}
fprintf(stdout, ": ");
vfprintf(stdout, format, ap);
fflush(stdout);
break;
}
va_end(ap);
pthread_mutex_unlock(&print_message_lock);
}
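/*
 * Usage sketch for print_message(): the format string and arguments follow
 * ordinary printf conventions; the values shown are illustrative only.
 *
 *	print_message(INFO_MESSAGE, "Connected to %s on port %d\n", host, port);
 *	print_message(ERROR_MESSAGE, "stat of %s failed, errno=%d\n", path, errno);
 *	print_message(FATAL_MESSAGE, "No LoadLeveler shared library found - quitting...\n");
 */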
void
print_message_args(int argc, char *optional_args[])
{
int i;
if (optional_args != NULL) {
for (i = 0; i < argc; i++) {
print_message(TRACE_MESSAGE, " '%s'", optional_args[i]);
}
}
}
/*
* This source file formatted using following indent options
* -bap
* -bbb
* -br
* -brs
* -cli4
* -i4
* -l80
* -lp
* -ts8
* -bli4
* -npcs
*/