5.7. マスタ・ワーカ型ジョブ

マスタ・ワーカ型ジョブとは、以下の特徴を持つジョブモデルの1つです。

  • ジョブスクリプトプロセス、マスタプロセス、および、ワーカプロセスから構成されます。
    マスタプロセスとワーカプロセスが協調することで、計算タスク(並列プログラムの処理単位)を実行します。
  • マスタプロセスは、計算タスク全体を統括し、ワーカプロセスの生成や管理、計算結果を取りまとめます。

  • ワーカプロセスは、マスタプロセスから依頼された計算タスクを実行し、結果をマスタプロセスに返します。

  • マスタ・ワーカ型ジョブに割り当てられた計算ノードのダウンや、プロセスの異常終了が発生しても、ジョブスクリプトプロセスが動作している限り、マスタ・ワーカ型ジョブは継続します。

この特徴を利用して、計算ノードダウンやワーカプロセスの異常終了に対し、ワーカプロセスを別のノードで再実行する仕組みをユーザが作ることで計算タスクを継続できます。

../_images/MasterWorkerJob_01.png

参考

  • ジョブスクリプトプロセスが動作するノードを「ジョブマスタノード」、それ以外のノードを「ジョブスレーブノード」と呼びます。マスタプロセスは、ジョブスレーブノードで動作するワーカプロセスを管理するのが目的であるため、ジョブマスタノードで動作させる必要があります。

  • 複数プロセスを生成するという点で「バルクジョブ」は類似していますが、バルクジョブは同一のジョブスクリプトを複数のサブジョブとして投入する方式であり、それぞれのサブジョブは独立して動作します。また、バルクジョブでは、サブジョブの数はジョブ投入時に指定され、実行中に変化しません。 一方、マスタ・ワーカ型ジョブでは、マスタプロセスと各ワーカプロセスが、1つのジョブとして、プロセス間通信を行いながら動作します。また、ワーカプロセスはマスタプロセスによって動的に生成するため、ジョブの実行中に数が変化します。

  • マスタ・ワーカ型ジョブの詳細は、マニュアル「ジョブ運用ソフトウェア エンドユーザ向けガイド マスタ・ワーカ型ジョブ編」を参照してください。

ワーカプロセスの生成方法の観点で、以下の3つの方式をサポートします。

  • この方式は、マスタプロセス、ワーカプロセスが共にMPIプログラムの場合に利用します。
    ワーカプロセスを生成するノードの選択はジョブ運用ソフトウェアが行います。
  • この方式は、ワーカプロセスがMPIプログラムでない場合に利用します。
    ワーカプロセスの生成やそれを生成するノードの選択はユーザーが制御・管理します。
  • この方式は、ワーカプロセスがMPIプログラムでない場合に利用します。
    ワーカプロセスを生成するノードの選択もユーザーが制御・管理します。ただし、ワーカプロセスの生成は、ジョブ運用ソフトウェアが提供するpjaexeコマンドを利用します。

5.7.1. ジョブの投入

ジョブの投入はpjsubコマンドに--mswkオプションとジョブスクリプトを指定します。

[書式]

[_LNlogin]$ pjsub --mswk ジョブスクリプト

注意

  • マスタ・ワーカ型ジョブはノードの割り当て方法にtorusを使用する必要があります。 torus指定ができる全てのリソースグループでマスタ・ワーカ型ジョブを実行できます。

  • 資源の指定(-L)やノード形状などは、必要に応じてpjsubコマンドの引数、または、ジョブスクリプト内に記述してください。

  • pjsubコマンドの--mswkオプションは、--stepオプション、--bulkオプション、--interact オプションと同時には指定できません。

  • マスタ・ワーカ型ジョブでは、ワーカプロセスを生成するノードはジョブ運用ソフトウェアの並列実行環境が決定、または、ユーザがプロセス生成時に指定します。このため、pjsubコマンドの--mpi rank-map-hostfileオプションの指定は意味がありません。このオプションを指定しても無視されます。

5.7.2. ワーカプロセスの動的生成

マスタプロセスからワーカプロセスを動的に生成する方式では、ワーカプロセスの生成や通信はMPIの仕組みを利用します。 ユーザは以下の機能を実装する必要があります。

  1. マスタプログラム(マスタプロセス)

  1. ワーカプロセスの生成

  2. ワーカプロセスへの演算実行依頼

  3. ワーカプロセスの生存確認

  1. ワーカプログラム(ワーカプロセス)

  1. マスタプロセスからの演算実行依頼受信と演算結果の送信

  2. マスタプロセスへの演算終了通知の送信

マスタ・ワーカ型ジョブで実行するMPIプログラムでは、一部のMPI関数やMPIサブルーチンをマスタ・ワーカ型ジョブ向けのものに置き換える必要があります。 これはジョブ運用ソフトウェアの内部でマスタ・ワーカ型ジョブ固有の処理をする必要があるためです。

以下に、これらのMPI関数名およびMPIサブルーチン名を示します。

マスタ・ワーカ型ジョブ用のMPI関数(C言語)

通常ジョブでのMPI関数名

マスタ・ワーカ型ジョブ用MPI関数名

MPI_Comm_connect()

FJMPI_Mswk_connect()

MPI_Comm_disconnect()

FJMPI_Mswk_disconnect()

MPI_Comm_accept()

FJMPI_Mswk_accept()

注釈

上記のマスタ・ワーカ型ジョブ用MPI関数は、ヘッダファイルmpi-ext.hで宣言されています。

マスタ・ワーカ型ジョブ用のMPIサブルーチン(Fortran言語)

通常ジョブでのMPIサブルーチン名

マスタ・ワーカ型ジョブ用MPIサブルーチン名

MPI_COMM_CONNECT()

FJMPI_MSWK_CONNECT()

MPI_COMM_DISCONNECT()

FJMPI_MSWK_DISCONNECT()

MPI_COMM_ACCEPT()

FJMPI_MSWK_ACCEPT()

注釈

上記のマスタ・ワーカ型ジョブ用MPIサブルーチンは、モジュールmpi_f08_extmpi_extで宣言されています。 モジュールmpi_f08_extmpi_extは、それぞれMPIのモジュールmpi_f08mpiに対応しますので、どちらかをUSE文で引用できます。

注意

マスタ・ワーカ型ジョブを使用する場合は、言語環境'4.5.0 tcsds-1.2.31'版以降を使用してください。


ここでは、以下の図の構成のプログラムの例を示します。
MPIプログラムの作成に関する詳細は「MPI使用手引書」等を参照してください。
../_images/MasterWorkerDynamic_01.png

[マスタプログラム master_spawn.c]

#include <mpi.h>
#include <mpi-ext.h>
#include <stdio.h>
#include <string.h>

int main(int argc, char **argv) {
  int world_size, universe_size;
  int *universe_size_p;
  int flag;
  MPI_Status status;

  MPI_Comm worker_comm;
  char master_port[MPI_MAX_PORT_NAME] = "";
  char worker_port[MPI_MAX_PORT_NAME] = "";
  // 各コミュニケーターのルートランク
  const int self_root = 0; // SELF (MPI_COMM_SELF)
  const int master_root = 0; // MASTER (MPI_COMM_WORLD)
  const int worker_root = 0; // WORKER (worker_comm)

  const int tag = 0;
  char *message = "Hello";

  // 初期化処理
  MPI_Init(&argc, &argv);

  // world_size, universe_size を取得する。
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);
  if (world_size != 1) {
    // マスタプロセスが複数個存在する場合
    fprintf(stderr, "Error! world_size=%d (expected 1)", world_size);
    MPI_Abort(MPI_COMM_WORLD, 1);
  }
  MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_UNIVERSE_SIZE, &universe_size_p, &flag);
  if (flag == 0) {
    // universe_size の取得に失敗した場合
    fprintf(stderr, "Error! cannot get universe_size");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }
  universe_size = *universe_size_p;
  printf("universe_size=%d\n", universe_size);
  if (universe_size == 1) {
    // universe_size が 1 の場合
    fprintf(stderr, "Error! universe_size=%d (expected > 1)", universe_size);
    MPI_Abort(MPI_COMM_WORLD, 1);
  }
  // ワーカプロセスとの通信用ポートをオープンする。
  MPI_Open_port(MPI_INFO_NULL, master_port);
  printf("master_port=%s\n", master_port);

  // ワーカプロセスを生成する。
  MPI_Comm_spawn("./worker_spawn.out", MPI_ARGV_NULL, universe_size - 1,
  MPI_INFO_NULL, self_root, MPI_COMM_SELF, &worker_comm, MPI_ERRCODES_IGNORE);

  // ワーカプロセスへポート名を送信する。
  MPI_Send(master_port, MPI_MAX_PORT_NAME, MPI_CHAR, worker_root, tag, worker_comm);

  // ワーカプロセスとの接続を切断する。
  FJMPI_Mswk_disconnect(&worker_comm);

  // ワーカプロセスからのデータを受信する。(ワーカプロセスのポート名が送信されてくる)
  FJMPI_Mswk_accept(master_port, MPI_INFO_NULL, self_root, MPI_COMM_SELF, &worker_comm);
  MPI_Recv(worker_port, MPI_MAX_PORT_NAME, MPI_CHAR, worker_root, tag, worker_comm, &status);
  printf("worker_port=%s\n", worker_port);
  FJMPI_Mswk_disconnect(&worker_comm);

  // ワーカプロセスへデータを送信する。
  FJMPI_Mswk_connect(worker_port, MPI_INFO_NULL, self_root, MPI_COMM_SELF, &worker_comm);
  MPI_Send(message, strlen(message) + 1, MPI_CHAR, worker_root, tag, worker_comm);
  FJMPI_Mswk_disconnect(&worker_comm);

  // 終了処理
  MPI_Close_port(master_port);
  MPI_Finalize();
}

[ワーカプログラム worker_spawn.c]

#include <mpi.h>
#include <mpi-ext.h>
#include <stdio.h>
int main(int argc, char **argv) {

  int rank;
  MPI_Status status;
  MPI_Comm master_comm;
  char master_port[MPI_MAX_PORT_NAME] = "";
  char worker_port[MPI_MAX_PORT_NAME] = "";

  // 各コミュニケーターのルートランク
  const int self_root = 0; // SELF (MPI_COMM_SELF)
  const int master_root = 0; // MASTER (master_comm)
  const int worker_root = 0; // WORKER (MPI_COMM_WORLD)
  const int tag = 0;
  char message[100] = "";

  // 初期化処理
  MPI_Init(&argc, &argv);
  MPI_Comm_get_parent(&master_comm);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  printf("Hello! rank=%d\n", rank);
  if (rank == worker_root) {
    MPI_Open_port(MPI_INFO_NULL, worker_port);
    printf("worker_port=%s\n", worker_port); fflush(stdout);
  }
  if (rank == worker_root) {

    // マスタプロセスからのデータを受信する。
    MPI_Recv(master_port, MPI_MAX_PORT_NAME, MPI_CHAR, master_root, tag, master_comm, &status);
    printf("master_port=%s\n", master_port); fflush(stdout);
  }

  // マスタプロセスとの通信を切断する。
  FJMPI_Mswk_disconnect(&master_comm);

  // マスタプロセスへポート名を送信する。
  if (rank == worker_root) {
    FJMPI_Mswk_connect(master_port, MPI_INFO_NULL, self_root, MPI_COMM_SELF, &master_comm);
    MPI_Send(worker_port, MPI_MAX_PORT_NAME, MPI_CHAR, master_root, tag, master_comm);
    FJMPI_Mswk_disconnect(&master_comm);
  }

  // マスタプロセスからのデータを受信する。
  if (rank == worker_root) {
    FJMPI_Mswk_accept(worker_port, MPI_INFO_NULL, self_root, MPI_COMM_SELF, &master_comm);
    MPI_Recv(message, MPI_MAX_PORT_NAME, MPI_CHAR, master_root, tag, master_comm, &status);
    printf("message=%s\n", message);
    FJMPI_Mswk_disconnect(&master_comm);
  }

  // 終了処理
  if (rank == worker_root) {
    MPI_Close_port(worker_port);
  }
  MPI_Finalize();
}

ワーカプロセスを動的に生成する方式では、MPIプログラムの開始時に生成されるマスタプロセスがジョブマスタノードだけで起動するように、pjsubコマンドの--mpi "shape=1" --mpi "proc=1"オプションを指定する必要があります。

以下の例では、ジョブに対して385ノードを割り当て、MPIプログラム起動時に生成されるマスタプロセスに1ノードを割り当てます。 残りの384ノードが、ワーカプロセスを動的に生成するためのノードになります。

[_LNlogin]$ mpifccpx -o worker_spawn.out woker-spawn.c
[_LNlogin]$ mpifccpx -o master-spawn.out master-spawn.c
[_LNlogin]$ cat job_dynamic.sh
#!/bin/bash -x
#PJM -L "node=385"
#PJM -L "rscgrp=large"
#PJM -L "elapse=10:00"
#PJM --mpi "shape=1"
#PJM --mpi "proc=1"
#PJM -g groupname
#PJM -x PJM_LLIO_GFSCACHE=/vol000N
#PJM -s

export PLE_MPI_STD_EMPTYFILE=off
mpiexec -stdout-proc ./output.%j/%/1000r/stdout -stderr-proc ./output.%j/%/1000r/stderr ./master-spawn.out
[_LNlogin]$ pjsub --mswk job_dynamic.sh

5.7.3. Agentプロセスによるワーカプロセス生成

Agentプロセスからワーカプロセスを生成する方式(以降、Agentプロセス方式)は、ワーカプロセスが非MPIプログラムの場合に利用します。 Agentプロセス方式では、各ノードでAgentプロセスが1つだけ生成されるようにするため、ジョブに割り当てるノード数とmpiexecコマンドで生成するプロセス数を同じにしてください。

この方式では、ユーザはジョブに対し、以下の機能を実装する必要があります。

  1. ジョブスクリプト

  1. マスタプロセスとなるマスタプログラムの実行

  2. Agentプロセスの生成

  3. マスタプロセスの終了待ち合わせ

  1. マスタプログラム(マスタプロセス)

  1. Agentプロセスの生存確認

  2. Agentプロセスへのワーカプロセス生成依頼

  1. Agentプログラム(Agentプロセス)

  1. マスタプロセスとの通信確立

  2. ワーカプロセスの生成

  3. ワーカプロセスの処理結果のマスタプロセスへの送信

ここでは、以下の図の構成のプログラムの例を示します。

../_images/MasterWorkerAgent_01.png

[ジョブスクリプト job_agent.sh]

#!/bin/bash
#PJM -L "node=385"
#PJM -L "rscgrp=large"
#PJM -L "elapse=10:00"
#PJM --mpi "proc=385"
#PJM -g groupname
#PJM -x PJM_LLIO_GFSCACHE=/vol000N

. utility.sh

# マスタプロセスを生成する。
./master_agent.out port_num.txt &
MASTER_PID=$!
PORT_NUM=$(cat port_num.txt)

# ジョブマスタノード(このノード)のIPアドレスを取得する。
IP_ADDR=$(print_ipaddr "tofu1")

# mpiexec コマンドで Agent プロセスを生成する。
mpiexec start_agent.sh "${IP_ADDR}" "${PORT_NUM}" &
wait ${MASTER_PID}

[マスタプログラム master_agent.c]

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
int main(int argc, char **argv)
{
  char *port_file = argv[1];

  int sockfd = socket(...); // ソケットを作成する。
  bind(sockfd, ...);        // ソケットを特定のポートにバインドする。
  listen(sockfd, ...);      // ワーカプロセスからの接続を待つ。

  FILE *fp = fopen(port_file, "w");
  fprintf(fp, "%d", port_number); // ポート番号をファイルに書き出す。
  fclose(fp);

  while (1) {
    accept(sockfd, ...);    // ワーカプロセスからの接続を受け付ける。
    ...                     // ワーカプロセスに対し、処理を要求する。
  }
}

[Agent プロセス起動スクリプト start_agent.sh]

#!/bin/bash
./agent.out $@

[Agent プログラム agent.c]

#include <string.h>
#include <stdlib.h>
int main(int argc, char **argv)
{
  // コマンドライン引数で与えられたジョブマスタノードのIPアドレスとポート番号を代入する。
  char *ip_addr = argv[1];
  int port_num = atoi(argv[2]);
  char *my_ip_addr;
  get_ipaddr("tofu1", &my_ip_addr);

  // 自ノードがジョブマスタノードの場合 (ジョブマスタノードと同一IPアドレスを持つ場合)、
  // Agent プロセスを終了させる。
  if (strcmp(my_ip_addr, ip_addr) == 0) {
    exit(0);
  }

  // ジョブマスタノードに接続する。(ソケット接続などを用いる)
  connect_to(ip_addr, port_num);

  // ジョブマスタノードからの要求に応じて処理する。
  ...
}

[get_ipaddr()関数]

get_ipaddr()関数は、自ノードのIPアドレスを文字列として返す関数です。

#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <net/if.h>
#include <arpa/inet.h>
int get_ipaddr(const char *device_name, const char **ip_addr)
{
  int fd;
  struct ifreq ifr;

  fd = socket(AF_INET, SOCK_DGRAM, 0);
  ifr.ifr_addr.sa_family = AF_INET;

  strncpy(ifr.ifr_name, device_name, IFNAMSIZ - 1);
  int rc = ioctl(fd, SIOCGIFADDR, &ifr);

  if (rc == 0) {
    *ip_addr = inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr);
  }
  close(fd);
  return rc;
}

[utility.sh]

# 指定したネットワークインターフェースのIPアドレスを出力する。
# Usage: print_ipaddr <interface>
print_ipaddr() {
  local INTERFACE=$1
  LANG=C ip addr show dev "${INTERFACE}" | sed -n '/.*inet \([0-9.]*\).*/{s//\1/;p}'
}

# 指定したコマンドをタイムアウト付きで実行する。
# Usage: timeout <timeout_sec> <command> <arg1> <arg2> ...
timeout() {
  local TIMEOUT=$1
  shift 1

  # コマンド実行およびプロセスID を記録する。
  eval "$@" &
  local PID=$!
  echo ${PID}
  while true; do

    # コマンドプロセスの生存をチェックする。
    if ! ps -p "${PID}" >/dev/null 2>&1; then
      # プロセスが終了したため、ループを抜ける。
      break
    fi

    if [ "${TIMEOUT}" -le 0 ]; then
      # タイムアウトした場合、プロセスを異常終了させる。
      kill -KILL "${PID}"
      break
    fi

    # 1秒間スリープした後、ループの先頭に戻る。
    sleep 1
    TIMEOUT=$((TIMEOUT - 1))

  done

  # プロセスの終了コードを返す。
  wait "${PID}"
  return $?
}

5.7.4. pjaexeコマンドによるワーカプロセス生成

ワーカプロセスの生成は、Agentプロセスによるワーカプロセス生成以外に、ジョブ運用ソフトウェアが提供するpjaexeコマンドを利用する方式もあります。この方式は、非MPIプログラムの場合に利用します。

この方式では、ユーザは以下の機能を実装する必要があります。

  1. ジョブスクリプト

  1. pjaexeコマンドによるワーカプロセスの起動

  1. マスタプログラム (マスタプロセス)

  1. ワーカプロセスの生存確認 (ワーカプロセスからのコネクションの有無で判断)

  2. ワーカプロセスへのタスク実行依頼

  1. ワーカプログラム (ワーカプロセス)

  1. マスタプロセスとの接続確立

  2. マスタプロセスへの計算結果の送信

ここでは、以下の図の構成のプログラムの例を示します。

../_images/MasterWorkerPjaexe_01.png

[ジョブスクリプト job_pjaexe.sh]

#!/bin/bash
#PJM -L "node=385"
#PJM -L "rscgrp=large"
#PJM -L "elapse=10:00"
#PJM -g groupname
#PJM -x PJM_LLIO_GFSCACHE=/vol000N
# マスタプロセスを実行
./master_pjaexe.sh

[マスタプログラム master_pjaexe.sh]

#!/bin/bash
. utility.sh

# ジョブマスタノード(このノード)のIPアドレスを取得する。
IP_ADDR=$(print_ipaddr "tofu1")

# ワーカプロセスからの接続を受け付けられるようにソケットを初期化する。
PORT_NUM=port番号
...
# すべてのジョブスレーブノードでワーカプロセス worker.out を起動する。
for X in $(seq 0 11); do
  for Y in $(seq 0 7); do
    VCOORD="(${X},${Y})"

    # 60 秒以内に pjaexe コマンドが復帰しない場合は、タイムアウトとする。
    timeout 60 pjaexe --vcoord \"${VCOORD}\" ./worker.out "${IP_ADDR}" "${PORT_NUM}"
    RC=$?
    if [ "${RC}" -eq 1 ]; then
      # ユーザの指定ミスの場合、マスタプロセスを異常終了させる。
      exit 1
    fi

    if [ "${RC}" -ne 0 ]; then
      # pjaexe コマンドが異常終了した場合、ノード故障したと判断し、故障ノードリストに追加する。
      echo "${VCOORD}" >> broken_node_list.txt
    fi
  done
done

# ワーカプロセス worker.out との通信や計算結果の取りまとめ処理
...<省略> ...
# マスタプロセスの終了
exit

参考

通常、マスタプロセスとなるプログラムは C 言語や Fortran 言語などのプログラミング言語で記述しますが、ここでは処理論理を説明するために、マスタプログラムをシェルスクリプトで実装した例を示しています。シェルスクリプトでは実装が難しい処理(ソケットの初期化処理やワーカプロセスとの通信処理)は省略しています。これらの処理については一般的なプロセス間通信の手段を参考にしてください。

[utility.sh]

# 指定したネットワークインターフェースのIPアドレスを出力する。
# Usage: print_ipaddr <interface>
print_ipaddr() {
  local INTERFACE=$1
  LANG=C ip addr show dev "${INTERFACE}" | sed -n '/.*inet \([0-9.]*\).*/{s//\1/;p}'
}

# 指定したコマンドをタイムアウト付きで実行する。
# Usage: timeout <timeout_sec> <command> <arg1> <arg2> ...
timeout() {
  local TIMEOUT=$1
  shift 1

  # コマンド実行およびプロセスID を記録する。
  eval "$@" &
  local PID=$!
  echo ${PID}
  while true; do

    # コマンドプロセスの生存をチェックする。
    if ! ps -p "${PID}" >/dev/null 2>&1; then
      # プロセスが終了したため、ループを抜ける。
      break
    fi

    if [ "${TIMEOUT}" -le 0 ]; then
      # タイムアウトした場合、プロセスを異常終了させる。
      kill -KILL "${PID}"
      break
    fi

    # 1秒間スリープした後、ループの先頭に戻る。
    sleep 1
    TIMEOUT=$((TIMEOUT - 1))

  done

  # プロセスの終了コードを返す。
  wait "${PID}"
  return $?
}

5.7.5. ジョブ作成における注意事項

  • 1ジョブで同時に実行可能なpjaexeコマンドの個数は128個までです。

    • 128を越えて実行しようとした場合は次のメッセージを出力してpjaexeコマンドが異常終了します。

      [ERR.] PLE 0050 plexec cannot be executed any further.
      
    • pjaexeコマンドは--vcoordfileオプションを利用することで1回の実行で複数のノードにプロセスを同時生成できるため、pjaexeコマンドの実行回数削減に利用してください。
      [vcoordfile]
      (0)
      (1)
      (2)
      (3)
      (4)
      

      [コマンドライン]

      pjaexe --vcoordfile vcoordfile ./worker.out "${IP_ADDR}" "${PORT_NUM}"
      
  • ワーカプロセスの生成や異常の検出、およびその対処はジョブ作成者が考える必要があります。
  • ワーカプロセスを動的に生成する方式では、ワーカプロセスが実行されているノードがダウンした場合、そのノード上のワーカプロセスと同じ MPI_COMM_WORLD に属するすべてのワーカプロセスは動作ができなくなります。これらのワーカプロセスは、ユーザが終了させるか、またはジョブが終了するまで残ります。
    また、これらのワーカプロセスが動作していたノードはmpiexecコマンドが終了するまでは、ワーカプロセスの再生成先として選択されません。ただし、mpiexecコマンドを再実行すると、ダウンしたノードが再びワーカプロセスの生成先として選択される可能性があることに注意してください。
  • マスタ・ワーカ型ジョブ内で、MPI通信関数を利用する場合は、以下に注意してください。

    • MPI規格によると、MPI通信処理に失敗した場合、デフォルトでは、通信関数の呼び出し元プロセスも異常終了します。(例えば、処理中に通信先プロセスが異常終了した場合や通信先ノードがダウンした場合)

    • この通信処理は、ユーザが明示的に MPI_Send()、MPI_Recv()、MPI_Bcast() などの MPI通信関数を呼び出した場合だけではなく、MPIライブラリの内部処理で実行される場合もあります。このような場合、プログラムにとっては、マスタプロセスが通信とは関係ない処理を実行している最中に、突然、異常終了してしまうように見えます。

    • マスタ・ワーカ型ジョブで MPI通信関数を用いる場合は、ワーカプロセスの異常終了に伴ってマスタプロセスが異常終了しないように、以下に示す対処をしてください。これにより、マスタプロセスが異常終了する確率が低くなります。

      1. MPI_Comm_spawn() 関数の呼び出し後、FJMPI_Mswk_disconnect() 関数を呼び出します。
        MPI_Comm_spawn() 関数によって動的にプロセスを生成した場合、生成されたプロセスと MPI_Comm_spawn() 関数の呼び出し元プロセスの間は、通信が接続した状態になります。
        MPIの内部通信処理は、この通信が接続状態で発生し、切断された状態だと発生しません。このため、MPI_Comm_spawn() 呼び出し後に、FJMPI_Mswk_disconnect() を呼び出す必要があります。
      2. MPI通信関数の呼び出し前に FJMPI_Mswk_connect() または FJMPI_Mswk_accept() を呼び出し、MPI通信関数呼び出し後に FJMPI_Mswk_disconnect() を呼び出します。
        接続先プロセスが異常終了していた場合、FJMPI_Mswk_connect() 関数、FJMPI_Mswk_accept() 関数、およびFJMPI_Mswk_disconnect() 関数は、異常復帰するだけで、呼び出し元プロセスが異常終了することはありません。
        このため、以下の例のように MPI通信関数を呼び出す前に毎回 FJMPI_Mswk_connect() 関数を呼び出す必要があります。これにより、ワーカプロセスの異常がマスタプロセスに及ぼす影響を低減できます。
        //[例] マスタプロセスからワーカプロセスに接続する場合
        
        // マスタプロセス
        FJMPI_Mswk_connect(worker_port, …, &worker_comm);
        MPI_Send(…, worker_comm, …);
        FJMPI_Mswk_disconnect(&worker_comm);
        
        // ワーカプロセス
        FJMPI_Mswk_accept(worker_port, …, &master_comm);
        MPI_Recv(…, master_comm, …);
        FJMPI_Mswk_disconnect(&master_comm);
        

        上記手順に従わなかった場合、ワーカプロセスの異常終了後、またはワーカプロセスが動作しているノードのダウン後、任意のタイミングでマスタプロセスが異常終了する可能性があります。マスタプロセスが異常終了すると、mpiexecコマンドも異常終了します。ただし、ジョブスクリプトは実行を継続します。

5.7.6. システム異常時の影響

マスタ・ワーカ型ジョブの実行中にノードダウンなどシステムに起因する異常が起こった場合の影響について説明します。

5.7.6.1. ジョブの動作への影響

異常の内容によって、マスタ・ワーカ型ジョブは終了する場合と継続する場合があります。

  • マスタ・ワーカ型ジョブが終了するケース

    以下の場合は、ほかのジョブモデルと同様に、マスタ・ワーカ型ジョブは終了し、再度キューイングされます。

    • ジョブマスタノードがダウンした場合

    • マスタ・ワーカ型ジョブに割り当てた計算ノードで ICC または Port 故障が発生した場合

    • 管理者(クラスタ管理者)がマスタ・ワーカ型ジョブに割り当てたノードを運用から切り離す際に、ジョブを即時に終了させるように指定した場合

    • 共有テンポラリ領域や第2層ストレージのキャッシュ領域を使用するジョブに割り当てられたBIO/SIO/GIOがダウンした場合

  • マスタ・ワーカ型ジョブが継続するケース

    以下の場合は、マスタ・ワーカ型ジョブは継続します。ただし、これらに起因してワーカプロセスが異常になった場合は、ほかのノードでワーカプロセスを実行するなどの対処を、ユーザプログラム内で考慮する必要があります。

    • ジョブスレーブノードのジョブ運用ソフトウェアのサービス異常

    • ジョブスレーブノードのダウン

    • ジョブスレーブノードのハードウェア(CPU またはメモリ)の異常

参考

ジョブの終了原因がユーザ側にある場合(例:CPU時間などの資源制限値超過)、ジョブが再キューイングされるかどうかは、ほかのジョブモデルと同様に、ジョブ投入時の指定やジョブACL機能の設定によります。

5.7.6.2. ジョブ統計情報への影響

マスタ・ワーカ型ジョブの実行中にノードダウンが発生した場合、pjsub -s/-S や、pjstat -vで出力されるジョブ統計情報は以下のようになります。

項目

説明

PC、PJM CODE
(ジョブ終了コード)

ジョブマスタノードのダウンや、「富岳」の故障(ICC 異常)によってマスタ・ワーカ型ジョブが異常終了した場合、ジョブ統計情報のジョブ終了コードは通常ジョブの場合と同様になります。ジョブスレーブノードだけのダウンのように、マスタ・ワーカ型ジョブの継続に影響がない異常の場合は、マスタ・ワーカ型ジョブは最後まで実行されます。正常に終了した場合は、ジョブ終了コードは 0 になります。

REASON
(終了原因)

前項のジョブ終了コードと同様ですが、ジョブが正常終了した場合は "-" になります。

name(REQUIRE)
(要求資源量)

"NODE NUM (REQUIRE)" などの "(REQUIRE)" が付く項目は、ノードのダウンに関係なく、ジョブ投入時にユーザが指定した値になります。

name(ALLOC)
(割り当て資源量)

"NODE NUM (ALLOC)" などの "(ALLOC)" が付く項目は、ノードのダウンに関係なく、ジョブ投入時に決定した値になります。

name(USE))
(使用資源量))

"NODE NUM (USE)" などの "(USE)" が付く項目は、故障したノードの分は除外された値になります。

5.7.6.3. コマンドの表示への影響

ジョブ運用ソフトウェアが提供するpjshowrscコマンドは、計算機資源としてのノード数を表示できます。

マスタ・ワーカ型ジョブに割り当てられたノードがジョブ実行中にダウンした場合、ダウンしたノードは利用可能資源から除外されます。

  • 引数なしの場合

TOTAL と ALLOC の値が、ダウンしたノードの数だけ減ります。

[ジョブスレーブノードがダウンする前]

[_LNlogin]$ pjshowrsc
[ CLST: fugaku-comp ]
RSCUNIT          NODE
                 TOTAL   FREE  ALLOC
rscunit_ft01    158976  94269  64707

[ジョブスレーブノードのうち、1ノードがダウンした後]

[_LNlogin]$ pjshowrsc
[ CLST: fugaku-comp ]
RSCUNIT          NODE
                 TOTAL   FREE  ALLOC
rscunit_ft01    158975  94269  64706
  • -lオプションを指定した場合

各資源の TOTAL と ALLOC の値が、ダウンしたノードの分だけ減ります。

[ジョブスレーブノードがダウンする前]

[_LNlogin]$ pjshowrsc -l
[ CLST: fugaku-comp ]
[ RSCUNIT: rscunit_ft01 ]
     RSC    TOTAL    FREE   ALLOC
    node   158976   75231   83745
     cpu  7630848 3611088 4019760
     mem    4.4Pi   4.4Pi   672Gi

[ジョブスレーブノードのうち、1ノードがダウンした後]

[_LNlogin]$ pjshowrsc -l
[ CLST: fugaku-comp ]
[ RSCUNIT: rscunit_ft01 ]
     RSC    TOTAL    FREE   ALLOC
    node   158975   75231   83744
     cpu  7630800 3611088 4019712
     mem    4.4Pi   4.4Pi   672Gi