SimGrid  3.7
Scalable simulation of distributed systems
Asynchronous communications
MSG Examples

Simulation of asynchronous communications between a sender and a receiver using a realistic platform and an external description of the deployment.

Table of contents:


Code of the application

Preliminary declarations

#include <stdio.h>
#include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
#include "xbt/sysdep.h"         /* calloc, printf */

/* Create a log channel to have nice outputs. */
#include "xbt/log.h"
#include "xbt/asserts.h"
XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test,
                             "Messages specific for this msg example");

/* Entry points of the simulated processes; test_all() registers them under
 * the names "sender" and "receiver". */
int sender(int argc, char *argv[]);
int receiver(int argc, char *argv[]);

/* Loads the platform, deploys the application and runs the simulation. */
MSG_error_t test_all(const char *platform_file,
                     const char *application_file);

Sender function

The sender sends an asynchronous message to a receiver with the function "MSG_task_isend()". Because this function is non-blocking, we have to call "MSG_comm_test()" to know whether the communication has finished, and finally destroy it with the function "MSG_comm_destroy()". It is also possible to call "MSG_comm_wait()", which blocks until the communication has completed instead of polling for it.

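C style arguments (argc/argv) are interpreted as:

  - argv[1]: the number of tasks to send
  - argv[2]: the computation size of each task
  - argv[3]: the communication size of each task
  - argv[4]: the number of receivers
  - argv[5]: the time to sleep before sending the first task
  - argv[6]: the time to sleep between two tests of the send completion (0 means wait with MSG_comm_wait() instead of polling)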

/**
 * Waits for an asynchronous send to complete, then releases its handle.
 *
 * comm            communication returned by MSG_task_isend()
 * sleep_test_time 0 to block in MSG_comm_wait(); otherwise the time to sleep
 *                 between two MSG_comm_test() polls
 */
static void sender_complete_comm(msg_comm_t comm, double sleep_test_time)
{
  if (sleep_test_time == 0) {
    MSG_comm_wait(comm, -1);
  } else {
    while (MSG_comm_test(comm) == 0) {
      MSG_process_sleep(sleep_test_time);
    }
  }
  /* Release the handle on both paths.
   * NOTE(review): assumes MSG_comm_wait() leaves the handle allocated, as the
   * wait-then-destroy pairs elsewhere in this file imply - confirm against
   * the MSG version in use. */
  MSG_comm_destroy(comm);
}

/**
 * Sender process: sends number_of_tasks tasks round-robin to the mailboxes
 * "receiver-0" .. "receiver-<receivers_count - 1>", then one "finalize" task
 * to each of these mailboxes.
 *
 * Arguments (argv):
 *   argv[1] number of tasks to send
 *   argv[2] computation size of each task
 *   argv[3] communication size of each task
 *   argv[4] number of receivers
 *   argv[5] time to sleep before sending the first task
 *   argv[6] time to sleep between two completion tests; 0 means block in
 *           MSG_comm_wait() instead of polling with MSG_comm_test()
 *
 * Returns 0.
 */
int sender(int argc, char *argv[])
{
  /* argv[1] .. argv[6] are read below. */
  xbt_assert(argc >= 7, "sender expects 6 arguments, got %d", argc - 1);

  long number_of_tasks = atol(argv[1]);
  double task_comp_size = atof(argv[2]);
  double task_comm_size = atof(argv[3]);
  long receivers_count = atol(argv[4]);
  double sleep_start_time = atof(argv[5]);
  double sleep_test_time = atof(argv[6]);

  /* receivers_count is used as a modulus below: it must not be zero. */
  xbt_assert(receivers_count > 0, "Invalid receivers count %s", argv[4]);

  XBT_INFO("sleep_start_time : %f , sleep_test_time : %f", sleep_start_time,
        sleep_test_time);

  msg_comm_t comm = NULL;
  long i;                       /* same width as the loop bounds */
  m_task_t task = NULL;
  char mailbox[256];

  MSG_process_sleep(sleep_start_time);

  /* Regular tasks, distributed round-robin over the receivers. */
  for (i = 0; i < number_of_tasks; i++) {
    char task_name[256];

    snprintf(mailbox, sizeof mailbox, "receiver-%ld", i % receivers_count);
    snprintf(task_name, sizeof task_name, "Task_%ld", i);

    task = MSG_task_create(task_name, task_comp_size, task_comm_size, NULL);
    comm = MSG_task_isend(task, mailbox);
    XBT_INFO("Send to receiver-%ld Task_%ld", i % receivers_count, i);

    sender_complete_comm(comm, sleep_test_time);
  }

  /* One "finalize" task per receiver; the receivers stop when they get it. */
  for (i = 0; i < receivers_count; i++) {
    snprintf(mailbox, sizeof mailbox, "receiver-%ld", i % receivers_count);
    task = MSG_task_create("finalize", 0, 0, 0);
    comm = MSG_task_isend(task, mailbox);
    XBT_INFO("Send to receiver-%ld finalize", i % receivers_count);

    sender_complete_comm(comm, sleep_test_time);
  }

  XBT_INFO("Goodbye now!");
  return 0;
}                               /* end_of_sender */

Receiver function

This function executes tasks when it receives them. As the reception is asynchronous, we have to either test the communication with "MSG_comm_test()" to know whether it has completed, or wait for its completion with "MSG_comm_wait()".

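C style arguments (argc/argv) are interpreted as:

  - argv[1]: the id of the receiver, used to build its mailbox name "receiver-<id>"
  - argv[2]: the time to sleep before posting the first reception
  - argv[3]: the time to sleep between two tests of the reception completion (0 means wait with MSG_comm_wait() instead of polling)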

int receiver(int argc, char *argv[])
{
  m_task_t task = NULL;
  _XBT_GNUC_UNUSED MSG_error_t res;
  int id = -1;
  char mailbox[80];
  msg_comm_t res_irecv;
  double sleep_start_time = atof(argv[2]);
  double sleep_test_time = atof(argv[3]);
  XBT_INFO("sleep_start_time : %f , sleep_test_time : %f", sleep_start_time,
        sleep_test_time);

  _XBT_GNUC_UNUSED int read;
  read = sscanf(argv[1], "%d", &id);
  xbt_assert(read,
              "Invalid argument %s\n", argv[1]);

  MSG_process_sleep(sleep_start_time);

  sprintf(mailbox, "receiver-%d", id);
  while (1) {
    res_irecv = MSG_task_irecv(&(task), mailbox);
    XBT_INFO("Wait to receive a task");

    if (sleep_test_time == 0) {
      res = MSG_comm_wait(res_irecv, -1);
      xbt_assert(res == MSG_OK, "MSG_task_get failed");
    } else {
      while (MSG_comm_test(res_irecv) == 0) {
        MSG_process_sleep(sleep_test_time);
      };
      MSG_comm_destroy(res_irecv);
    }

    XBT_INFO("Received \"%s\"", MSG_task_get_name(task));
    if (!strcmp(MSG_task_get_name(task), "finalize")) {
      MSG_task_destroy(task);
      break;
    }

    XBT_INFO("Processing \"%s\"", MSG_task_get_name(task));
    MSG_task_execute(task);
    XBT_INFO("\"%s\" done", MSG_task_get_name(task));
    MSG_task_destroy(task);
    task = NULL;
  }
  XBT_INFO("I'm done. See you!");
  return 0;
}                               /* end_of_receiver */

Simulation core

This function is the core of the simulation and is divided only into 3 parts thanks to MSG_create_environment() and MSG_launch_application().

  1. Simulation settings : MSG_create_environment() creates a realistic environment
  2. Application deployment : create the processes on the right locations with MSG_launch_application()
  3. The simulation is run with MSG_main()

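Its arguments are:

  - platform_file: the platform description, passed to MSG_create_environment()
  - application_file: the deployment description, passed to MSG_launch_application()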

/**
 * Core of the simulation: loads the platform, registers and deploys the
 * "sender" and "receiver" processes, then runs the simulation.
 *
 * platform_file     passed to MSG_create_environment()
 * application_file  passed to MSG_launch_application()
 *
 * Returns the value returned by MSG_main().
 */
MSG_error_t test_all(const char *platform_file,
                     const char *application_file)
{
  MSG_error_t status;

  /* MSG_config("workstation/model","KCCFLN05"); */

  /* Step 1 - simulation setting. */
  MSG_create_environment(platform_file);

  /* Step 2 - application deployment. */
  MSG_function_register("sender", sender);
  MSG_function_register("receiver", receiver);
  MSG_launch_application(application_file);

  /* Step 3 - run the simulation. */
  status = MSG_main();

  XBT_INFO("Simulation time %g", MSG_get_clock());
  return status;
}                               /* end_of_test_all */


Main function

This initializes MSG, runs a simulation, and frees all data structures created by MSG.

/**
 * Program entry point: initializes MSG, checks that a platform file and a
 * deployment file were given, runs test_all() on them and cleans MSG up.
 *
 * Returns 0 when test_all() returns MSG_OK, 1 otherwise. Exits with status 1
 * after printing a usage message when fewer than two file arguments remain
 * once MSG_global_init() has processed the command line.
 */
int main(int argc, char *argv[])
{
  MSG_error_t status;

  /* MSG_global_init() receives &argc, so argc is checked only afterwards. */
  MSG_global_init(&argc, argv);
  if (argc < 3) {
    printf("Usage: %s platform_file deployment_file\n", argv[0]);
    printf("example: %s msg_platform.xml msg_deployment.xml\n", argv[0]);
    exit(1);
  }

  status = test_all(argv[1], argv[2]);
  MSG_clean();

  return (status == MSG_OK) ? 0 : 1;
}                               /* end_of_main */

Waitall function for sender

Using MSG_comm_waitall() makes it possible to send all the messages first and then wait for the completion of all of them at once.

/**
 * Sender process (waitall variant): posts every send asynchronously, then
 * waits for all of them at once with MSG_comm_waitall().
 *
 * Arguments (argv):
 *   argv[1] number of tasks to send
 *   argv[2] computation size of each task
 *   argv[3] communication size of each task
 *   argv[4] number of receivers
 *
 * Returns 0.
 */
int sender(int argc, char *argv[])
{
  /* argv[1] .. argv[4] are read below. */
  xbt_assert(argc >= 5, "sender expects 4 arguments, got %d", argc - 1);

  long number_of_tasks = atol(argv[1]);
  double task_comp_size = atof(argv[2]);
  double task_comm_size = atof(argv[3]);
  long receivers_count = atol(argv[4]);

  /* Both values size and index the comm array; receivers_count is also used
   * as a modulus. */
  xbt_assert(number_of_tasks >= 0, "Invalid number of tasks %s", argv[1]);
  xbt_assert(receivers_count > 0, "Invalid receivers count %s", argv[4]);

  long total_comms = number_of_tasks + receivers_count;
  msg_comm_t *comm = xbt_new(msg_comm_t, total_comms);
  long i;                       /* same width as the loop bounds */
  m_task_t task = NULL;
  char mailbox[256];

  /* Regular tasks, distributed round-robin over the receivers. */
  for (i = 0; i < number_of_tasks; i++) {
    char task_name[256];

    snprintf(mailbox, sizeof mailbox, "receiver-%ld", i % receivers_count);
    snprintf(task_name, sizeof task_name, "Task_%ld", i);
    task = MSG_task_create(task_name, task_comp_size, task_comm_size, NULL);
    comm[i] = MSG_task_isend(task, mailbox);
    XBT_INFO("Send to receiver-%ld Task_%ld", i % receivers_count, i);
  }

  /* One "finalize" task per receiver, stored after the regular sends. */
  for (i = 0; i < receivers_count; i++) {
    snprintf(mailbox, sizeof mailbox, "receiver-%ld", i % receivers_count);
    task = MSG_task_create("finalize", 0, 0, 0);
    comm[i + number_of_tasks] = MSG_task_isend(task, mailbox);
    XBT_INFO("Send to receiver-%ld finalize", i % receivers_count);
  }

  /* Here we are waiting for the completion of all communications */
  MSG_comm_waitall(comm, total_comms, -1);

  /* Release every handle.
   * NOTE(review): assumes MSG_comm_waitall() leaves the handles allocated,
   * as the wait-then-destroy pairs elsewhere in this file imply - confirm
   * against the MSG version in use. */
  for (i = 0; i < total_comms; i++) {
    MSG_comm_destroy(comm[i]);
  }

  XBT_INFO("Goodbye now!");
  xbt_free(comm);
  return 0;
}                               /* end_of_sender */

Waitany function

The MSG_comm_waitany() function returns the position, in an xbt_dynar_t of communications, of the first send or receive to complete.

From a sender

We can use this function to wait for all sent messages.

/**
 * Sender process (waitany variant): posts every send asynchronously, stores
 * the communications in a dynar and removes them one by one as
 * MSG_comm_waitany() reports them. It then receives one message per receiver
 * on the "finalize" mailbox before leaving.
 *
 * Arguments (argv):
 *   argv[1] number of tasks to send
 *   argv[2] computation size of each task
 *   argv[3] communication size of the first task
 *   argv[4] number of receivers
 *   argv[5] 0 to give every task the same communication size; any other
 *           value divides the communication size of task i by (i + 1)
 *
 * Returns 0.
 */
int sender(int argc, char *argv[])
{
  /* argv[1] .. argv[5] are read below. */
  xbt_assert(argc >= 6, "sender expects 5 arguments, got %d", argc - 1);

  long number_of_tasks = atol(argv[1]);
  double task_comp_size = atof(argv[2]);
  double task_comm_size = atof(argv[3]);
  long receivers_count = atol(argv[4]);
  long diff_com = atol(argv[5]);        /* kept as long: no narrowing */

  /* receivers_count is used as a modulus below: it must not be zero. */
  xbt_assert(receivers_count > 0, "Invalid receivers count %s", argv[4]);

  xbt_dynar_t d = xbt_dynar_new(sizeof(msg_comm_t), NULL);
  long i;                       /* same width as the loop bounds */
  m_task_t task;
  char mailbox[256];
  char task_name[256];
  msg_comm_t comm;

  for (i = 0; i < number_of_tasks; i++) {
    /* With diff_com set, task i carries task_comm_size / (i + 1). */
    double coef = (diff_com == 0) ? 1 : (i + 1);

    snprintf(mailbox, sizeof mailbox, "receiver-%ld", i % receivers_count);
    snprintf(task_name, sizeof task_name, "Task_%ld", i);
    task = MSG_task_create(task_name, task_comp_size, task_comm_size / coef,
                           NULL);
    comm = MSG_task_isend(task, mailbox);
    xbt_dynar_push_as(d, msg_comm_t, comm);
    XBT_INFO("Send to receiver-%ld %s comm_size %f", i % receivers_count,
          task_name, task_comm_size / coef);
  }

  /* Here we are waiting for the completion of all communications */
  while (!xbt_dynar_is_empty(d)) {
    int finished = MSG_comm_waitany(d);

    xbt_dynar_remove_at(d, finished, &comm);
    MSG_comm_destroy(comm);
  }
  xbt_dynar_free(&d);

  /* Here we are waiting for the completion of all tasks */
  snprintf(mailbox, sizeof mailbox, "finalize");

  for (i = 0; i < receivers_count; i++) {
    msg_comm_t res_irecv;
    _XBT_GNUC_UNUSED MSG_error_t res_wait;

    task = NULL;
    res_irecv = MSG_task_irecv(&(task), mailbox);
    res_wait = MSG_comm_wait(res_irecv, -1);
    xbt_assert(res_wait == MSG_OK, "MSG_comm_wait failed");
    MSG_comm_destroy(res_irecv);
    MSG_task_destroy(task);
  }

  XBT_INFO("Goodbye now!");
  return 0;
}                               /* end_of_sender */

From a receiver

We can also wait for the arrival of all messages.

int receiver(int argc, char *argv[])
{
  int id = -1;
  int i;
  char mailbox[80];
  xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
  int tasks = atof(argv[2]);
  m_task_t *task = xbt_new(m_task_t, tasks);

  _XBT_GNUC_UNUSED int read;
  read = sscanf(argv[1], "%d", &id);
  xbt_assert(read, "Invalid argument %s\n", argv[1]);
  sprintf(mailbox, "receiver-%d", id);
  MSG_process_sleep(10);
  msg_comm_t res_irecv;
  for (i = 0; i < tasks; i++) {
    XBT_INFO("Wait to receive task %d", i);
    task[i] = NULL;
    res_irecv = MSG_task_irecv(&task[i], mailbox);
    xbt_dynar_push_as(comms, msg_comm_t, res_irecv);
  }

  /* Here we are waiting for the receiving of all communications */
  m_task_t task_com;
  while (!xbt_dynar_is_empty(comms)) {
    _XBT_GNUC_UNUSED MSG_error_t err;
    xbt_dynar_remove_at(comms, MSG_comm_waitany(comms), &res_irecv);
    task_com = MSG_comm_get_task(res_irecv);
    MSG_comm_destroy(res_irecv);
    XBT_INFO("Processing \"%s\"", MSG_task_get_name(task_com));
    MSG_task_execute(task_com);
    XBT_INFO("\"%s\" done", MSG_task_get_name(task_com));
    err = MSG_task_destroy(task_com);
    xbt_assert(err == MSG_OK, "MSG_task_destroy failed");
  }
  xbt_dynar_free(&comms);
  xbt_free(task);

  /* Here we tell to sender that all tasks are done */
  sprintf(mailbox, "finalize");
  res_irecv = MSG_task_isend(MSG_task_create(NULL, 0, 0, NULL), mailbox);
  MSG_comm_wait(res_irecv, -1);
  MSG_comm_destroy(res_irecv);
  XBT_INFO("I'm done. See you!");
  return 0;
}                               /* end_of_receiver */


Back to the main Simgrid Documentation page The version of SimGrid documented here is v3.7.
Documentation of other versions can be found in their respective archive files (directory doc/html).
Generated by doxygen