Stride Reference Manual  1.0
Runner.cpp
Go to the documentation of this file.
1 
2 #include "Runner.h"
3 
4 #include <iostream>
5 #include <exception>
6 #include <thread>
7 #include <cstddef>
8 #include <spdlog/spdlog.h>
9 #include "util/InstallDirs.h"
12 #include "sim/SimulatorSetup.h"
13 #include "sim/SimulatorBuilder.h"
14 #include "util/StringUtils.h"
15 #include "util/Stopwatch.h"
16 #include "output/CasesFile.h"
17 #include "output/PersonFile.h"
18 
19 using namespace stride;
20 using namespace util;
21 using namespace run;
22 using namespace std;
23 
24 namespace pt = boost::property_tree;
25 namespace fs = boost::filesystem;
26 
27 void Runner::setup() {
28  spdlog::set_async_mode(1048576);
29 }
30 
31 Runner::Runner(const vector<string>& overrides_list, const string& config_file,
32  const RunMode& mode, int timestep)
33  : m_config_file(config_file), m_mode(mode), m_uses_mpi(false), m_timestep(timestep), m_world_rank(0) {
34  for (const string& kv: overrides_list) {
35  vector<string> parts = StringUtils::split(kv, "=");
36  if (parts.size() != 2) {
37  throw runtime_error(string("Couldn't parse the override ") + kv);
38  }
39 
40  string key = string("run.") + StringUtils::trim(parts[0]);
41  key = StringUtils::replace(key, "@", "<xmlattr>.");
42  m_overrides[key] = StringUtils::trim(parts[1]);
43  }
44  fs::path base_dir = InstallDirs::getOutputDir();
45  parseConfig();
46  m_output_dir = base_dir / m_name;
47 }
48 
// NOTE(review): the line that opens this definition (listing line 49) is missing from this
// listing; the cross-reference index further down gives it as `void parseConfig()`,
// defined at Runner.cpp:49.
//
// Reads the XML file m_config_file into m_config, applies the stored overrides, moves every
// <region> child of run.regions into m_region_configs (keyed by its name attribute), and
// caches the travel schedule and the run name.
// Throws runtime_error when there is no region, or when there are several regions and at
// least one of them has a raw_population child.
50  pt::read_xml(m_config_file, m_config, pt::xml_parser::trim_whitespace);
// Overrides win over whatever the file contained.
51  for (auto& override: m_overrides) {
52  m_config.put(override.first, override.second);
53  }
54  bool has_raw_population = false;
55  for (auto& it: m_config.get_child("run.regions")) {
// Children other than <region> (for example the <xmlattr> node) are skipped.
56  if (it.first == "region") {
57  pt::ptree& region_config = it.second;
58  string name = region_config.get<string>("<xmlattr>.name");
// NOTE(review): a repeated region name overwrites the stored config but is still appended
// to m_region_order a second time.
59  m_region_configs[name] = region_config;
60  m_region_order.push_back(name);
61 
62  if (region_config.count("raw_population") != 0) {
63  has_raw_population = true;
64  }
65  }
66  }
67  if (m_region_configs.size() == 0) {
68  throw runtime_error("You need at least one region");
69  } else if (m_region_configs.size() > 1 && has_raw_population) {
70  throw runtime_error(
71  "One of the regions does not contain the necessary information to work in a multi region environment (districts, cities, ...)");
72  }
// The travel_schedule attribute is optional and defaults to an empty string.
73  m_travel_schedule = m_config.get<string>("run.regions.<xmlattr>.travel_schedule", "");
// From here on m_config no longer contains the regions; they live in m_region_configs only.
74  m_config.get_child("run").erase("regions");
75 
76  m_name = m_config.get<string>("run.<xmlattr>.name");
77 }
78 
// NOTE(review): the line that opens this definition (listing line 79) is missing from this
// listing, so the function's name and signature are not visible here.
//
// Prints the run name and, for each region, whether it has a "remote" child or runs locally.
80  cout << "Configuration info:" << endl;
81  cout << " - name: " << m_name << endl;
82  // TODO: I don't know what kind of info would be really useful to print
83  cout << " - regions:" << endl;
84  for (auto& it: m_region_configs) {
85  boost::optional<string> remote = it.second.get_optional<string>("remote");
86  if (remote)
// NOTE(review): this streams the boost::optional object itself, not the string it holds;
// presumably `remote.get()` was intended — confirm what is actually printed.
87  cout << " - '" << it.first << "' running at " << remote << endl;
88  else
89  cout << " - '" << it.first << "' running locally" << endl;
90  }
91  cout << endl;
92 }
93 
// NOTE(review): the line that opens this definition (listing line 94) is missing from this
// listing; the cross-reference index further down gives it as `void initSimulators()`,
// defined at Runner.cpp:94.
//
// Creates the output directory, initialises MPI when any region has a "remote" child,
// registers one simulator per region (local or remote), hands every local simulator the map
// of all registered simulators, and finally creates the Coordinator on world rank 0.
// Throws runtime_error when a remote region exists in a build without MPI_USED, or when MPI
// is in use and more than one local simulator was created.
95  int i = 1;
96 
97  // Create output dir
98  auto base_dir = InstallDirs::getOutputDir();
99  auto output_dir = base_dir / m_name;
100  bool fresh = fs::create_directory(output_dir);
101  if (fresh) {
102  cout << "--> Created new output directory at " << output_dir << "." << endl;
103  } else {
104  cout << "--> Using existing output directory at " << output_dir << ", will overwrite." << endl;
105  }
106 
// One remote region is enough to require MPI; the loop stops at the first one found.
107  for (auto& it: m_region_configs) {
108  boost::optional<string> remote = it.second.get_optional<string>("remote");
109  if (remote) {
110  #ifdef MPI_USED
111  initMpi();
112  #else
113  throw runtime_error("MPI support is not enabled in this build");
114  #endif
115  break;
116  }
117  }
118 
119  for (auto& it: m_region_configs) {
// "\r" rewrites the same console line, giving a progress counter.
120  cout << "\r--> Initializing simulators [" << i << "/" << m_region_configs.size() << "]";
121  i++;
122  cout.flush();
123 
124  boost::optional<string> remote = it.second.get_optional<string>("remote");
// Per-simulator config: the shared run config plus this single region.
125  pt::ptree sim_config = getRegionsConfig({it.first});
126  string sim_name = sim_config.get<string>("run.regions.region.<xmlattr>.name");
// NOTE(review): within this file m_is_master is only assigned in the MPI initialisation
// code; presumably it has a default in the class declaration for non-MPI runs — confirm.
127  if (m_is_master) {
128  if (not remote) {
129  addLocalSimulator(sim_name, sim_config);
130  } else {
131  addRemoteSimulator(sim_name, sim_config);
132  }
133  } else {
134  if (not remote) {
135  // This is a simulator running at the master
136  // TODO: get Master's contact info?
137  // Then, do MPI stuff
// NOTE(review): this config has no "remote" child, yet addRemoteSimulator reads that child
// to obtain the target — confirm how this path is meant to work.
138  addRemoteSimulator(sim_name, sim_config);
139  } else {
// Placeholder comparison: it only matches a simulator literally named "TODO get process ID".
140  if (sim_name == "TODO get process ID") {
141  // This is our (unique) local simulator
142  addLocalSimulator(sim_name, sim_config);
143  } else {
144  // This is just another remote simulator
145  addRemoteSimulator(sim_name, sim_config);
146  }
147  }
148  }
149  }
150 
// Non-owning view of every registered simulator, handed to each local simulator.
151  std::map<string, AsyncSimulator*> comm_map;
152  for (auto& it: m_async_simulators) comm_map[it.first] = it.second.get();
153  for (auto& it: m_local_simulators) {
154  it.second->setCommunicationMap(comm_map);
155  }
156 
157  cout << endl;
158 
159  if (m_uses_mpi and m_local_simulators.size() > 1) {
160  throw runtime_error("You can't have multiple simulators in one system when working with MPI");
161  }
162 
163  // Also set up the Coordinator
164  // TODO allow a single simulator without schedule
165  if (m_world_rank == 0) m_coord = make_shared<Coordinator>(m_async_simulators, m_travel_schedule, m_config);
166 }
167 
// Builds a simulator from `config`, names it after the region in that config, optionally
// restores its state from the HDF5 checkpoint for `name`, attaches the outputs, and registers
// it both as a local simulator and (wrapped in a LocalSimulatorAdapter) as an async simulator.
// Returns the newly built simulator.
168 shared_ptr<Simulator> Runner::addLocalSimulator(const string& name, const boost::property_tree::ptree& config) {
169  auto sim = SimulatorBuilder::build(config);
170  sim->setName(config.get<string>("run.regions.region.<xmlattr>.name"));
171 
// NOTE(review): listing line 172 is missing from this listing. The closing brace on listing
// line 181 ends a block whose opening statement is therefore not visible here — presumably a
// condition on the run mode; confirm against the real file.
173  // adjust the state of the simulator
174  std::string pathStr = hdf5Path(name).string();
175  Hdf5Loader loader = Hdf5Loader(pathStr.c_str());
176 
// In Extend mode the last saved timestep of the checkpoint is used; otherwise m_timestep.
177  int timestep = m_mode == RunMode::Extend ?
178  loader.getLastSavedTimestep() : m_timestep;
179 
180  loader.loadFromTimestep(timestep, sim);
181  }
182 
183  initOutputs(*sim.get());
184  m_local_simulators[name] = sim;
185  m_async_simulators[name] = make_shared<LocalSimulatorAdapter>(sim);
186  return sim;
187 }
188 
// NOTE(review): the line that opens this definition (listing line 189) is missing from this
// listing; the body is invoked elsewhere in this file as `initMpi();`.
//
// When built with MPI_USED and MPI has not been set up yet: initialises MPI with serialized
// thread support, records world rank/size and the processor name, and exchanges
// (simulator name, world rank) pairs between the ranks using messages tagged 30 and 31.
// Throws runtime_error when serialized thread support is not provided, or when the number of
// collected pairs differs from the number of regions.
190 #ifdef MPI_USED
191  if (not m_uses_mpi) {
192  int provided = 0;
193  MPI_Init_thread(NULL, NULL, MPI_THREAD_SERIALIZED, &provided);
194  if (provided != MPI_THREAD_SERIALIZED) throw runtime_error("We need serialized thread support in MPI");
195  MPI_Comm_rank(MPI_COMM_WORLD, &m_world_rank);
196  MPI_Comm_size(MPI_COMM_WORLD, &m_world_size);
197  m_uses_mpi = true;
// Commits the MPI datatype m_setup_message used by the messages below.
198  makeSetupStruct();
199  m_is_master = (m_world_rank == 0);
200 
201  char processor_name[MPI_MAX_PROCESSOR_NAME];
202  int name_len;
203  MPI_Get_processor_name(processor_name, &name_len);
204  m_processor_name = std::string(processor_name);
205 
206  vector<SimulatorWorldrank> worldranks;
207  for (auto& it: m_region_configs) {
208  boost::optional<string> remote = it.second.get_optional<string>("remote");
209  if (remote) {
// A region whose "remote" value equals this processor's name is announced to rank 0.
210  if (remote.get() == m_processor_name) {
211  SimulatorWorldrank setup {it.second.get<string>("<xmlattr>.name"), m_world_rank};
// NOTE(review): this sends one MPI_INT taken from the start of the struct, while the
// matching receive below uses m_setup_message — the datatypes look inconsistent; confirm.
212  MPI_Send(&setup, 1, MPI_INT, 0, 30, MPI_COMM_WORLD); // Tag 30 = setup related MPI message
213  }
214  } else if (m_is_master) {
// Regions without "remote" are recorded on the master with rank 0.
215  worldranks.emplace_back(it.second.get<string>("<xmlattr>.name"), 0);
216  }
217  }
218 
219  if (m_is_master) {
220  // TODO Anthony: collect all SimulatorWorldranks into worldranks
// NOTE(review): exactly one tag-30 message is awaited from every other rank, but a rank only
// sends one per region whose "remote" matches its processor name (possibly none or several).
221  for (int i = 1; i < m_world_size; i++) {
222  SimulatorWorldrank data {"", 0};
// NOTE(review): the count is 21 elements of m_setup_message although `data` is a single
// SimulatorWorldrank; 21 matches the 20 chars + 1 int inside one element, so a count of 1
// was presumably intended — confirm. The same applies to the tag-31 send and receive below.
223  MPI_Recv(&data, 21, m_setup_message, i, 30, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
224  worldranks.push_back(data);
225  }
// Keep only pairs whose name and rank are both not yet present in the bimap.
226  for (SimulatorWorldrank& swr: worldranks) {
227  char* name = swr.simulator_name;
228  int rank = swr.world_rank;
229  bool unique_name = m_worldranks.left.find(name) == m_worldranks.left.end();
230  bool unique_rank = m_worldranks.right.find(rank) == m_worldranks.right.end();
231  if (unique_name and unique_rank) {
232  m_worldranks.insert(decltype(m_worldranks)
233  ::value_type(std::string(name), rank));
234  }
235  }
236 
237  if (m_worldranks.size() != m_region_configs.size()) {
238  throw runtime_error(to_string(m_worldranks.size()) + " simulators but "
239  + to_string(m_region_configs.size()) + " regions.");
240  }
241 
// NOTE(review): this loop starts at rank 0, so the master also sends every pair to itself,
// and no matching receive on the master is visible in this block — confirm.
242  for (int i = 0; i < m_world_size; i++) {
243  for (auto iter = m_worldranks.begin(), iend = m_worldranks.end(); iter != iend; ++iter) {
244  SimulatorWorldrank data {iter->left, iter->right};
245  MPI_Send(&data, 21, m_setup_message, i, 31,
246  MPI_COMM_WORLD); // Tag 31 = send worldranks to each node
247  }
248  }
249  } else {
250  for (int i = 0; i < m_world_size; i++) {
// NOTE(review): `i == m_world_size` can never hold inside this loop; presumably
// `i == m_world_rank` was meant. Only the master branch above sends tag 31, yet a message is
// awaited from every rank, and the received data is discarded — confirm.
251  if (i == m_world_size) continue;
252  SimulatorWorldrank data {"", 0};
253  MPI_Recv(&data, 21, m_setup_message, i, 31, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
254  }
255  }
256  }
257 #endif
258 }
259 
// NOTE(review): the line that opens this definition (listing line 260) is missing from this
// listing; the cross-reference index further down gives it as `void makeSetupStruct()`,
// defined at Runner.cpp:260. Listing lines 263, 265 and 269 are also absent here.
//
// When built with MPI_USED: creates and commits the MPI struct datatype m_setup_message
// describing a SimulatorWorldrank as 20 chars (simulator_name) followed by 1 int (world_rank).
261 #ifdef MPI_USED
262  MPI_Datatype type[2] = {MPI_CHAR, MPI_INT};
// Block lengths: 20 elements for the name field, 1 for the rank field.
264  int blocklen[2] = {20, 1};
// Byte offsets of both fields inside SimulatorWorldrank.
266  MPI_Aint disp[2];
267  disp[0] = offsetof(SimulatorWorldrank, simulator_name);
268  disp[1] = offsetof(SimulatorWorldrank, world_rank);
270  MPI_Type_create_struct(2, blocklen, disp, type, &m_setup_message);
271  MPI_Type_commit(&m_setup_message);
272 #endif
273 }
274 
275 shared_ptr<AsyncSimulator> Runner::addRemoteSimulator(const string& name, const boost::property_tree::ptree& config) {
276  // TODO: DO MPI STUFF
277  boost::optional<string> remote = config.get_optional<string>("run.regions.region.remote");
278  m_async_simulators[name] = make_shared<RemoteSimulatorSender>(name, stoi(remote.get()));
279 }
280 
// NOTE(review): the line that opens this definition (listing line 281) is missing from this
// listing; the cross-reference index further down gives it as
// `void initOutputs(Simulator &sim)`, defined at Runner.cpp:281.
//
// Attaches the per-simulator outputs to `sim`: a log file in m_output_dir, an Hdf5Saver when
// run.outputs.checkpointing is configured, and a ClusterSaver when run.outputs.visualization
// is configured. Both savers are registered as observers of `sim` and kept in member maps.
282  // There is a difference between the outputs per Simulator and the ones for everything
283  // So far, we only have output per Simulator
284 
285  // Logs (we had to refactor some stuff for this)
// Both numeric arguments of the rotating logger are set to the maximum size_t value.
286  sim.m_logger = spdlog::rotating_logger_mt(sim.getName() + "_logger",
287  (m_output_dir / (sim.getName() + "_log.txt")).string(),
288  std::numeric_limits<size_t>::max(),
289  std::numeric_limits<size_t>::max());
290 
291  sim.m_logger->set_pattern("%v"); // Remove meta data from log => time-stamp of logging
292  // Log level already set
293 
294  auto checkpointing = m_config.get_child_optional("run.outputs.checkpointing");
295  if (checkpointing) {
// The frequency attribute is required once the checkpointing node exists.
296  int freq = checkpointing.get().get<int>("<xmlattr>.frequency");
297  auto saver = make_shared<Hdf5Saver>(hdf5Path(sim.getName()).string().c_str(), sim.m_config_pt,
298  freq, m_mode, m_timestep);
299  auto fn = std::bind(&Hdf5Saver::update, saver, std::placeholders::_1);
300  sim.registerObserver(saver, fn);
301  m_hdf5_savers[sim.getName()] = saver;
302 
303  // initial save
// Skipped only when extending a run from a non-zero timestep.
304  if (!(m_mode == RunMode::Extend && m_timestep != 0)) {
305  saver->forceSave(sim);
306  }
307  }
308 
309  auto visualization = m_config.get_child_optional("run.outputs.visualization");
310  if (visualization) {
311  // TODO Vis output, how does it work?
312  // We need to save to m_output_dir / ...<something that makes sense in the context of visualisation>...
313  // See hdf5Path for inspiration, but since we only consider output (whereas hdf5 is also input) there's
314  // no need to write a separate method for it.
315  std::string vis_output_dir = fs::system_complete(m_output_dir / (string("vis_") + sim.getName())).string();
316  auto vis_saver = make_shared<ClusterSaver>("vis_output", "vis_pop_output", "vis_facility_output",
317  vis_output_dir);
318  auto fn = bind(&ClusterSaver::update, vis_saver, std::placeholders::_1);
319  sim.registerObserver(vis_saver, fn);
// The saver is also invoked once right away, in addition to being registered as an observer.
320  vis_saver->update(sim);
321  m_vis_savers[sim.getName()] = vis_saver;
322  }
323 }
324 
325 void Runner::run() {
326  map<string, vector<unsigned int>> cases;
327  if (m_is_master) {
328  Stopwatch<> run_clock("run_clock");
329 
330  if (m_uses_mpi)
331  cout << "--> " << m_processor_name << " is running,";
332  else
333  cout << "--> We are running locally,";
334  cout << " printing infected/adopted." << endl;
335  int num_days = m_config.get<int>("run.num_days");
336  cout << endl << " day | ";
337  int total = 0;
338  for (auto& it: m_region_configs) {
339  cout << it.first;
340  for (int i = 15 - it.first.size(); i > 0; i--) cout << ' ';
341  cout << " | ";
342  }
343 
344  cout << endl << "-----+";
345  for (auto& it: m_region_configs) {
346  cout << "-----------------+";
347  }
348  cout << endl;
349 
350  int start_day = 0;
351  if (m_mode == RunMode::Replay) {
352  start_day += m_timestep;
353  }
354 
355  for (int day = start_day; day < m_timestep + num_days; day++) {
356  cout << setw(4) << day << " | ";
357  // Assumes same order!
358  vector<SimulatorStatus> results = m_coord->timeStep();
359  int i = 0;
360 
361  for (auto& it: m_async_simulators) {
362  cout << setw(7) << results[i].infected << " " << setw(7) << results[i].adopted << " | ";
363  cases[it.first].push_back(results[i].infected);
364  i++;
365  }
366  cout << endl;
367  }
368  } else {
369  cout << m_processor_name << " awaits messages." << endl;
370  }
371 
372 #ifdef MPI_USED
373  // Close the MPI environment properly
374  if (m_uses_mpi) {
375  if (m_world_rank == 0) {
376  // Send message from system 0 (coordinator) to all other systems to terminate their listening thread
377  for (int i = 0; i < m_world_size; i++) MPI_Send(nullptr, 0, MPI_INT, i, 10, MPI_COMM_WORLD);
378  }
379  m_listen_thread.join(); // Join and terminate listening thread
380  MPI_Finalize();
381  }
382 #endif
383 
384  // More output!
385  // TODO only save at last timestep if freq == 0
386  // for (auto& it: m_hdf5_savers) {
387  // Simulator& sim = *m_local_simulators[it.first];
388  // it.second->forceSave(sim, m_timestep + num_days);
389  // }
390 
391  auto cases_conf = m_config.get_child_optional("run.outputs.cases");
392  auto person_conf = m_config.get_child_optional("run.outputs.persons");
393 
394  for (auto& it: m_local_simulators) {
395  if (cases_conf) {
396  output::CasesFile((m_output_dir / (string("cases_") + it.first)).string())
397  .print(cases[it.first]);
398  }
399 
400  if (person_conf) {
401  output::PersonFile((m_output_dir / (string("persons_") + it.first)).string())
402  .print(it.second->getPopulation());
403  }
404  }
405 }
406 
// Returns a property tree describing the run configuration.
// NOTE(review): listing line 408 — the only statement of this function — is missing from this
// listing, so which tree is returned (m_config as stored, or one with the regions added back)
// cannot be determined here. As shown, the function has no return statement.
407 pt::ptree Runner::getConfig() {
409 }
410 
411 pt::ptree Runner::getRegionsConfig(const std::vector<string>& names) {
412  pt::ptree regions;
413  for (const string& name: names) {
414  regions.add_child("region", m_region_configs[name]); // includes the name
415  }
416  pt::ptree new_config = m_config;
417  new_config.add_child("run.regions", regions);
418  return new_config;
419 }
420 
421 fs::path Runner::hdf5Path(const string& name) {
422  return fs::system_complete(m_output_dir / (string("cp_") + name + ".h5"));
423 }
std::string m_name
Definition: Runner.h:101
string m_processor_name
Definition: Runner.h:85
static std::string replace(std::string source, std::string from, std::string to)
Replace all occurrences of a string with another.
Definition: StringUtils.h:129
static boost::filesystem::path getOutputDir()
Utility method: get the path to the output directory.
Interface for install directory queries.
std::shared_ptr< spdlog::logger > m_logger
Definition: Simulator.h:177
std::map< std::string, std::string > m_overrides
Definition: Runner.h:73
Produces a file with daily cases count.
Definition: CasesFile.h:32
boost::filesystem::path m_output_dir
Definition: Runner.h:102
std::map< std::string, std::shared_ptr< Hdf5Saver > > m_hdf5_savers
Definition: Runner.h:105
std::map< std::string, shared_ptr< AsyncSimulator > > m_async_simulators
Definition: Runner.h:97
void loadFromTimestep(unsigned int timestep, shared_ptr< Simulator > sim) const
Load from timestep, if the specified timestep is present in the hdf5 file.
Definition: Hdf5Loader.h:93
void initOutputs(Simulator &sim)
Definition: Runner.cpp:281
void registerObserver(const std::shared_ptr< U > &u, CallbackType f)
Definition: Subject.h:44
Time Dependent Person DataType.
Definition: NoBehaviour.h:17
std::shared_ptr< Coordinator > m_coord
Definition: Runner.h:98
Conversion from or to string.
virtual void update(const Simulator &sim)
Definition: ClusterSaver.h:30
boost::filesystem::path hdf5Path(const string &name)
Definition: Runner.cpp:421
virtual void update(const Simulator &sim)
Update function which is called by the subject.
Definition: Hdf5Saver.h:87
Setup for the simulator and configuration tree.
std::map< std::string, std::shared_ptr< ClusterSaver > > m_vis_savers
Definition: Runner.h:106
void parseConfig()
Definition: Runner.cpp:49
Provides a stopwatch interface to time: it accumulates time between start/stop pairs.
Definition: Stopwatch.h:33
static void setup()
Definition: Runner.cpp:27
std::map< std::string, boost::property_tree::ptree > m_region_configs
Definition: Runner.h:92
Header for the CasesFile class.
std::map< std::string, shared_ptr< Simulator > > m_local_simulators
Definition: Runner.h:95
Header for the SimulatorBuilder class.
std::string m_travel_schedule
Definition: Runner.h:103
std::vector< std::string > m_region_order
Definition: Runner.h:93
void initSimulators()
Definition: Runner.cpp:94
static std::string trim(std::string const &source, std::string const &t=" ")
Trim characters at both ends of string.
Definition: StringUtils.h:123
Definition of Stopwatch.
void makeSetupStruct()
Definition: Runner.cpp:260
thread m_listen_thread
Definition: Runner.h:84
STL namespace.
boost::property_tree::ptree getConfig()
Definition: Runner.cpp:407
Main class that contains and directs the virtual world.
Definition: Simulator.h:64
std::string m_config_file
Definition: Runner.h:74
Produces a file with daily cases count.
Definition: PersonFile.h:35
std::shared_ptr< Simulator > addLocalSimulator(const string &name, const boost::property_tree::ptree &config)
Definition: Runner.cpp:168
Header for the PersonFile class.
static std::shared_ptr< Simulator > build(const boost::property_tree::ptree &pt_config)
Build simulator.
boost::property_tree::ptree getRegionsConfig(const std::vector< string > &names)
Definition: Runner.cpp:411
boost::property_tree::ptree m_config
Definition: Runner.h:91
string getName() const
Definition: Simulator.h:87
std::shared_ptr< AsyncSimulator > addRemoteSimulator(const string &name, const boost::property_tree::ptree &config)
Definition: Runner.cpp:275
static std::vector< std::string > split(const std::string &str, const std::string &delimiters)
Split a string (in order of occurrence) by splitting it on the given delimiters.
Definition: StringUtils.h:49
boost::bimap< string, int > m_worldranks
Definition: Runner.h:86
unsigned int getLastSavedTimestep() const
Retrieves the last saved timestep index in the hdf5 file.
Definition: Hdf5Loader.h:105
RunMode m_mode
Definition: Runner.h:75
boost::property_tree::ptree m_config_pt
Configuration property tree.
Definition: Simulator.h:175