A Discrete-Event Network Simulator
API
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
28 #include "mpi-interface.h"
29 
30 #include "ns3/simulator.h"
31 #include "ns3/scheduler.h"
32 #include "ns3/event-impl.h"
33 #include "ns3/channel.h"
34 #include "ns3/node-container.h"
35 #include "ns3/ptr.h"
36 #include "ns3/pointer.h"
37 #include "ns3/assert.h"
38 #include "ns3/log.h"
39 
40 #include <mpi.h>
41 #include <cmath>
42 
43 namespace ns3 {
44 
45 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
46 
47 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl);
48 
// NOTE(review): the signature for this empty body is elided in this listing;
// it presumably belongs to LbtsMessage (e.g. its destructor) — confirm.
50 {
51 }
52 
// Returns the earliest next-event timestamp carried in this LBTS message.
53 Time
55 {
56  return m_smallestTime;
57 }
58 
// Returns the number of messages the sending rank reported as transmitted.
59 uint32_t
61 {
62  return m_txCount;
63 }
64 
// Returns the number of messages the sending rank reported as received.
65 uint32_t
67 {
68  return m_rxCount;
69 }
// Returns the system (rank) id of the rank that produced this LBTS message.
70 uint32_t
72 {
73  return m_myId;
74 }
75 
// Returns true when the sending rank reported that it has no more events.
76 bool
78 {
79  return m_isFinished;
80 }
81 
88 
// Registers the "ns3::DistributedSimulatorImpl" TypeId in group "Mpi" and
// registers a default constructor for it. The TypeId is built once, on first
// call, and cached in a function-local static.
// NOTE(review): source line 93 is elided here (presumably the SetParent call).
89 TypeId
91 {
92  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
94  .SetGroupName ("Mpi")
95  .AddConstructor<DistributedSimulatorImpl> ()
96  ;
97  return tid;
98 }
99 
// Constructor: resets the granted time window, stop/finish flags, current
// timestamp and event counter. No scheduler is installed yet (m_events = 0);
// SetScheduler() must be called before events are inserted.
// NOTE(review): source lines 104-105, 108, 113-114 and 116-117 are elided in
// this listing, including the actual LBTS buffer allocation on line 108.
101 {
102  NS_LOG_FUNCTION (this);
103 
106 
107  // Allocate the LBTS message buffer
109  m_grantedTime = Seconds (0);
110 
111  m_stop = false;
112  m_globalFinished = false;
115  m_currentTs = 0;
118  m_eventCount = 0;
119  m_events = 0;
120 }
121 
// Presumably the destructor (signature elided). It only logs; the event queue
// and the LBTS buffer are released in DoDispose().
123 {
124  NS_LOG_FUNCTION (this);
125 }
126 
// Drains the pending event queue, dropping the reference held on each event,
// then releases the scheduler and the LBTS message buffer.
// NOTE(review): m_events is dereferenced without a null check; it is 0 after
// construction until SetScheduler() installs a scheduler.
127 void
129 {
130  NS_LOG_FUNCTION (this);
131 
132  while (!m_events->IsEmpty ())
133  {
134  Scheduler::Event next = m_events->RemoveNext ();
// Events removed from the list must be unref'd (the same rule Remove() follows).
135  next.impl->Unref ();
136  }
137  m_events = 0;
// Array delete: m_pLBTS holds one LbtsMessage per rank.
138  delete [] m_pLBTS;
// NOTE(review): source line 139 is elided (presumably chains to the base DoDispose).
140 }
141 
// Runs the events registered with ScheduleDestroy(), in insertion order.
// Cancelled destroy events are skipped.
142 void
144 {
145  NS_LOG_FUNCTION (this);
146 
147  while (!m_destroyEvents.empty ())
148  {
// Pop the event before invoking it, so the handler never sees itself in the list.
149  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
150  m_destroyEvents.pop_front ();
151  NS_LOG_LOGIC ("handle destroy " << ev);
152  if (!ev->IsCancelled ())
153  {
154  ev->Invoke ();
155  }
156  }
157 
// NOTE(review): source line 158 is elided (presumably tears down the MPI environment).
159 }
160 
161 
// Computes the lookahead (window size) for this rank.
// - With one rank the lookahead is 0.
// - Otherwise it is the minimum "Delay" attribute over point-to-point
//   channels that connect a local node to a node owned by another rank.
// - Ranks with no inter-rank links then adopt the maximum lookahead found by
//   any rank, via an MPI_Allreduce over all ranks.
162 void
164 {
165  NS_LOG_FUNCTION (this);
166 
167  /* If running a sequential simulation, lookahead can be ignored */
168  if (MpiInterface::GetSize () <= 1)
169  {
170  m_lookAhead = Seconds (0);
171  }
172  else
173  {
// NOTE(review): line 174 (the declaration of `c`) is elided; presumably NodeContainer::GetGlobal ().
175  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
176  {
177  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
178  {
179  continue;
180  }
181 
182  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
183  {
184  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
185  // only works for p2p links currently
186  if (!localNetDevice->IsPointToPoint ())
187  {
188  continue;
189  }
190  Ptr<Channel> channel = localNetDevice->GetChannel ();
191  if (channel == 0)
192  {
193  continue;
194  }
195 
196  // grab the adjacent node
// Assumes the p2p channel holds exactly two devices, at indices 0 and 1.
197  Ptr<Node> remoteNode;
198  if (channel->GetDevice (0) == localNetDevice)
199  {
200  remoteNode = (channel->GetDevice (1))->GetNode ();
201  }
202  else
203  {
204  remoteNode = (channel->GetDevice (0))->GetNode ();
205  }
206 
207  // if it's not remote, don't consider it
208  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
209  {
210  continue;
211  }
212 
213  // compare delay on the channel with current value of
214  // m_lookAhead. if delay on channel is smaller, make
215  // it the new lookAhead.
216  TimeValue delay;
217  channel->GetAttribute ("Delay", delay);
218 
219  if (delay.Get () < m_lookAhead)
220  {
221  m_lookAhead = delay.Get ();
222  }
223  }
224  }
225  }
226 
227  // m_lookAhead is now set
229 
230  /*
231  * Compute the maximum inter-task latency and use that value
232  * for tasks with no inter-task links.
233  *
234  * Special processing for edge cases. For tasks that have no
235  * nodes, we need to determine a reasonable lookAhead value. Infinity
236  * would work correctly but introduces a performance issue; tasks
237  * with an infinite lookAhead would execute all their events
238  * before doing an AllGather resulting in very bad load balance
239  * during the first time window. Since all tasks participate in
240  * the AllGather it is desirable to have all the tasks advance in
241  * simulation time at a similar rate assuming roughly equal events
242  * per unit of simulation time in order to equalize the amount of
243  * work per time window.
244  */
// NOTE(review): GetInteger() returns int64_t, but the buffers are `long` and
// are reduced as MPI_LONG. On platforms where long is 32-bit (e.g. LLP64
// Windows) the timestep could be truncated; int64_t with MPI_INT64_T would be
// portable.
245  long sendbuf;
246  long recvbuf;
247 
248  /* Tasks with no inter-task links do not contribute to max */
// NOTE(review): line 249 (this `if` condition) is elided; presumably it tests
// whether m_lookAhead was left at its "no links" value.
250  {
251  sendbuf = 0;
252  }
253  else
254  {
255  sendbuf = m_lookAhead.GetInteger ();
256  }
257 
258  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MpiInterface::GetCommunicator ());
259 
260  /* For nodes that did not compute a lookahead use max from ranks
261  * that did compute a value. An edge case occurs if all nodes have
262  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
263  * will proceed without synchronization until a single AllGather
264  * occurs when all tasks have finished.
265  */
266  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
267  {
// Time(long) is built from the raw timestep value produced by GetInteger() above.
268  m_lookAhead = Time (recvbuf);
270  }
271 }
272 
// Adds an extra upper bound on the lookahead: m_lookAhead can only shrink
// (Min), never grow.
// NOTE(review): the guard is `> Time (0)`, so a zero bound is also rejected,
// yet the warning text says "negative". Function-entry logging happens only on
// the accepted path.
273 void
275 {
276  if (lookAhead > Time (0))
277  {
278  NS_LOG_FUNCTION (this << lookAhead);
279  m_lookAhead = Min(m_lookAhead, lookAhead);
280  }
281  else
282  {
283  NS_LOG_WARN ("attempted to set lookahead to a negative time: " << lookAhead);
284  }
285 }
286 
// Replaces the event queue with a new Scheduler created from the factory.
// Any events already pending in the old queue are moved into the new one,
// so no scheduled event is lost when the scheduler is swapped.
287 void
289 {
290  NS_LOG_FUNCTION (this << schedulerFactory);
291 
292  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
293 
// m_events is 0 on first use (set by the constructor), so there is nothing to migrate then.
294  if (m_events != 0)
295  {
296  while (!m_events->IsEmpty ())
297  {
298  Scheduler::Event next = m_events->RemoveNext ();
299  scheduler->Insert (next);
300  }
301  }
302  m_events = scheduler;
303 }
304 
// Dequeues and executes the earliest pending event.
// Assumes the queue is non-empty; Run() calls this only when
// IsLocalFinished() is false.
305 void
307 {
308  NS_LOG_FUNCTION (this);
309 
310  Scheduler::Event next = m_events->RemoveNext ();
311 
312  PreEventHook (EventId (next.impl, next.key.m_ts,
313  next.key.m_context, next.key.m_uid));
314 
// Time must never run backwards.
315  NS_ASSERT (next.key.m_ts >= m_currentTs);
// NOTE(review): source lines 316 and 321 are elided (presumably bookkeeping of
// the unscheduled-event count and the current context).
317  m_eventCount++;
318 
319  NS_LOG_LOGIC ("handle " << next.key.m_ts);
// Advance the clock before invoking, so Now() inside the handler reports the event's time.
320  m_currentTs = next.key.m_ts;
322  m_currentUid = next.key.m_uid;
323  next.impl->Invoke ();
// Drop the queue's reference now that the event has been removed and run.
324  next.impl->Unref ();
325 }
326 
// True once all ranks have finished and no messages remain in flight
// (m_globalFinished is computed in Run()).
327 bool
329 {
330  return m_globalFinished;
331 }
332 
// True when this rank has no local events left or Stop() was requested.
333 bool
335 {
336  return m_events->IsEmpty () || m_stop;
337 }
338 
// Returns the raw timestep of the next local event.
339 uint64_t
341 {
342  // If the local MPI task has no more events or stop was called,
343  // the next event time is infinity.
344  if (IsLocalFinished ())
345  {
// NOTE(review): line 346 (this branch's return) is elided; presumably the maximum representable timestep.
347  }
348  else
349  {
350  Scheduler::Event ev = m_events->PeekNext ();
351  return ev.key.m_ts;
352  }
353 }
354 
// Same as NextTs(), wrapped as a Time.
355 Time
357 {
358  return TimeStep (NextTs ());
359 }
360 
// Main conservative-synchronization loop.
// - Events are executed while their timestamp is within the granted window
//   (m_grantedTime).
// - When the next local event lies beyond the window, or this rank is
//   finished, all ranks exchange LBTS messages with MPI_Allgather.
// - If the global tx/rx counts match (no messages in flight), the window is
//   advanced to the smallest next-event time plus the lookahead.
// - The loop ends once every rank reports completion.
361 void
363 {
364  NS_LOG_FUNCTION (this);
365 
// NOTE(review): line 366 is elided (presumably the lookahead computation).
367  m_stop = false;
368  m_globalFinished = false;
369  while (!m_globalFinished)
370  {
371  Time nextTime = Next ();
372 
373  // If local event is beyond grantedTime then need to synchronize
374  // with other tasks to determine new time window. If local task
375  // is finished then continue to participate in allgather
376  // synchronizations with other tasks until all tasks have
377  // completed.
378  if (nextTime > m_grantedTime || IsLocalFinished () )
379  {
380  // Can't process next event, calculate a new LBTS
381  // First receive any pending messages
383  // reset next time
384  nextTime = Next ();
385  // And check for send completes
387  // Finally calculate the lbts
// NOTE(review): lines 382, 386 and 388 are elided. The first line of the lMsg
// construction is on 388.
389  m_myId, IsLocalFinished (), nextTime);
390  m_pLBTS[m_myId] = lMsg;
// LbtsMessage is sent as raw bytes; this assumes every rank shares the same
// object layout and endianness.
391  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
392  sizeof (LbtsMessage), MPI_BYTE, MpiInterface::GetCommunicator ());
393  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
394  // The totRx and totTx counts ensure there are no transient
395  // messages; If totRx != totTx, there are transients,
396  // so we don't update the granted time.
397  uint32_t totRx = m_pLBTS[0].GetRxCount ();
398  uint32_t totTx = m_pLBTS[0].GetTxCount ();
// NOTE(review): lines 399 and 409 are elided. The `&=` below implies
// m_globalFinished was seeded there from the per-rank IsFinished() flags.
400 
401  for (uint32_t i = 1; i < m_systemCount; ++i)
402  {
403  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
404  {
405  smallestTime = m_pLBTS[i].GetSmallestTime ();
406  }
407  totRx += m_pLBTS[i].GetRxCount ();
408  totTx += m_pLBTS[i].GetTxCount ();
410  }
411 
412  // Global halting condition is all nodes have empty queues and
413  // no messages are in-flight.
414  m_globalFinished &= totRx == totTx;
415 
416  if (totRx == totTx)
417  {
418  // If lookahead is infinite then granted time should be as well.
419  // Covers the edge case if all the tasks have no inter tasks
420  // links, prevents overflow of granted time.
// NOTE(review): lines 421 and 423 (the condition and body of this branch) are elided.
422  {
424  }
425  else
426  {
427  // Overflow is possible here if near end of representable time.
428  m_grantedTime = smallestTime + m_lookAhead;
429  }
430  }
431  }
432 
433  // Execute next event if it is within the current time window.
434  // Local task may be completed.
435  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
436  { // Safe to process
437  ProcessOneEvent ();
438  }
439  }
440 
441  // If the simulator stopped naturally by lack of events, make a
442  // consistency test to check that we didn't lose any events along the way.
443  NS_ASSERT (!m_events->IsEmpty () || m_unscheduledEvents == 0);
444 }
445 
// Presumably GetSystemId() (signature elided): returns this rank's MPI id.
447 {
448  return m_myId;
449 }
450 
// Requests that the simulation stop. It sets m_stop, which makes
// IsLocalFinished() true so Run() stops executing local events.
451 void
453 {
454  NS_LOG_FUNCTION (this);
455 
456  m_stop = true;
457 }
458 
// Stop after a relative delay.
// NOTE(review): the body (line 464) is elided; presumably it schedules the stop.
459 void
461 {
462  NS_LOG_FUNCTION (this << delay.GetTimeStep ());
463 
465 }
466 
467 //
468 // Schedule an event for a _relative_ time in the future.
469 //
// The event runs in the caller's current context (GetContext ()) and gets the
// next unique id. The returned EventId can be used with Cancel/Remove/IsExpired.
// The NS_ASSERT checks run only in debugging builds.
470 EventId
472 {
473  NS_LOG_FUNCTION (this << delay.GetTimeStep () << event);
474 
475  Time tAbsolute = delay + TimeStep (m_currentTs);
476 
477  NS_ASSERT (tAbsolute.IsPositive ());
478  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
479  Scheduler::Event ev;
480  ev.impl = event;
481  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
482  ev.key.m_context = GetContext ();
483  ev.key.m_uid = m_uid;
484  m_uid++;
// NOTE(review): line 485 is elided (presumably increments m_unscheduledEvents).
486  m_events->Insert (ev);
487  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
488 }
489 
// Schedules an event after a relative delay in an explicit context.
// No EventId is returned.
// NOTE(review): unlike Schedule(), this does not assert that the delay is
// non-negative. A negative delay gives a timestamp earlier than m_currentTs,
// which later trips ProcessOneEvent's monotonic-time assertion.
490 void
491 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &delay, EventImpl *event)
492 {
493  NS_LOG_FUNCTION (this << context << delay.GetTimeStep () << m_currentTs << event);
494 
495  Scheduler::Event ev;
496  ev.impl = event;
497  ev.key.m_ts = m_currentTs + delay.GetTimeStep ();
498  ev.key.m_context = context;
499  ev.key.m_uid = m_uid;
500  m_uid++;
502  m_events->Insert (ev);
503 }
504 
// Schedules an event at the current time (zero delay) in the current context.
505 EventId
507 {
508  NS_LOG_FUNCTION (this << event);
509  return Schedule (Time (0), event);
510 }
511 
// Registers an event to be run by Destroy() at the end of the simulation.
// The event is kept in m_destroyEvents, not in the time-ordered queue.
// NOTE(review): uid 2 must equal EventId::UID::DESTROY, which IsExpired/Remove
// test for. 0xffffffff is presumably the "no context" value. Ptr(event, false)
// presumably adopts the caller's reference without adding one — confirm.
// m_uid is advanced even though this id does not use it.
512 EventId
514 {
515  NS_LOG_FUNCTION (this << event);
516 
517  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
518  m_destroyEvents.push_back (id);
519  m_uid++;
520  return id;
521 }
522 
// Returns the current simulation time (timestamp of the event being processed).
523 Time
525 {
526  return TimeStep (m_currentTs);
527 }
528 
// Returns the time remaining until the event fires, or 0 if it has already
// expired or been cancelled.
// NOTE(review): a pending destroy event is not expired, but its timestamp is
// the time at which it was scheduled (<= now). The unsigned subtraction below
// then wraps once time has advanced.
529 Time
531 {
532  if (IsExpired (id))
533  {
534  return TimeStep (0);
535  }
536  else
537  {
538  return TimeStep (id.GetTs () - m_currentTs);
539  }
540 }
541 
// Removes an event eagerly from the pending set.
// - Destroy events are erased from m_destroyEvents.
// - Regular events are taken out of the scheduler, marked cancelled, and
//   unref'd.
// - Already-expired events are ignored.
542 void
544 {
545  if (id.GetUid () == EventId::UID::DESTROY)
546  {
547  // destroy events.
548  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
549  {
550  if (*i == id)
551  {
// erase() invalidates i, so stop iterating immediately.
552  m_destroyEvents.erase (i);
553  break;
554  }
555  }
556  return;
557  }
558  if (IsExpired (id))
559  {
560  return;
561  }
// Rebuild the scheduler key from the EventId so the scheduler can locate the entry.
562  Scheduler::Event event;
563  event.impl = id.PeekEventImpl ();
564  event.key.m_ts = id.GetTs ();
565  event.key.m_context = id.GetContext ();
566  event.key.m_uid = id.GetUid ();
567  m_events->Remove (event);
568  event.impl->Cancel ();
569  // whenever we remove an event from the event list, we have to unref it.
570  event.impl->Unref ();
571 
// NOTE(review): line 572 is elided (presumably decrements m_unscheduledEvents).
573 }
574 
// Lazily cancels an event by setting its cancel bit; the event stays in the
// queue, but its function will not be invoked when it expires. Expired events
// are left untouched.
575 void
577 {
578  if (!IsExpired (id))
579  {
580  id.PeekEventImpl ()->Cancel ();
581  }
582 }
583 
// Reports whether an event has already run, been cancelled, or is no longer
// pending.
584 bool
586 {
587  if (id.GetUid () == EventId::UID::DESTROY)
588  {
589  if (id.PeekEventImpl () == 0
590  || id.PeekEventImpl ()->IsCancelled ())
591  {
592  return true;
593  }
594  // destroy events.
// A destroy event is pending only while it is still in m_destroyEvents (linear scan).
595  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
596  {
597  if (*i == id)
598  {
599  return false;
600  }
601  }
602  return true;
603  }
// A regular event is expired if it has no impl, lies in the past, or shares
// the current timestamp with a uid <= the uid of the event being processed
// (so the currently executing event counts as expired), or is cancelled.
604  if (id.PeekEventImpl () == 0
605  || id.GetTs () < m_currentTs
606  || (id.GetTs () == m_currentTs
607  && id.GetUid () <= m_currentUid)
608  || id.PeekEventImpl ()->IsCancelled ())
609  {
610  return true;
611  }
612  else
613  {
614  return false;
615  }
616 }
617 
// Returns the largest representable simulation time: INT64_MAX timesteps.
// NOTE(review): lines 621-622 are elided in this listing.
618 Time
620 {
623  return TimeStep (0x7fffffffffffffffLL);
624 }
625 
// Returns the execution context of the event currently being processed.
626 uint32_t
628 {
629  return m_currentContext;
630 }
631 
// Returns the number of events executed so far (incremented in ProcessOneEvent).
632 uint64_t
634 {
635  return m_eventCount;
636 }
637 
638 } // namespace ns3
Distributed simulator implementation using lookahead.
virtual void Cancel(const EventId &id)
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
DestroyEvents m_destroyEvents
The container of events to run at Destroy()
Time Next(void) const
Get the time of the next event, as returned by NextTs().
virtual EventId Schedule(Time const &delay, EventImpl *event)
Schedule a future event execution (in the same context).
Time m_grantedTime
End of current window.
virtual Time GetMaximumSimulationTime(void) const
Get the maximum representable simulation time.
uint32_t m_currentContext
Execution context of the current event.
LbtsMessage * m_pLBTS
Container for Lbts messages, one per rank.
uint64_t m_currentTs
Timestamp of the current event.
virtual void ScheduleWithContext(uint32_t context, Time const &delay, EventImpl *event)
Schedule a future event execution (in a different context).
virtual void Remove(const EventId &id)
Remove an event from the event list.
virtual bool IsFinished(void) const
Check if the simulation should finish.
void ProcessOneEvent(void)
Process the next event.
virtual void Run(void)
Run the simulation.
Ptr< Scheduler > m_events
The event priority queue.
virtual void Destroy()
Execute the events scheduled with ScheduleDestroy().
virtual Time GetDelayLeft(const EventId &id) const
Get the remaining time until this event will execute.
bool m_globalFinished
Are all parallel instances completed.
uint32_t m_uid
Next event unique id.
virtual uint64_t GetEventCount(void) const
Get the number of events executed.
virtual bool IsExpired(const EventId &id) const
Check if an event has already run or been cancelled.
void CalculateLookAhead(void)
Calculate lookahead constraint based on network latency.
virtual Time Now(void) const
Return the current simulation virtual time.
int m_unscheduledEvents
Number of events that have been inserted but not yet scheduled, not counting the "destroy" events; this is used for validation.
virtual void DoDispose(void)
Destructor implementation.
virtual void SetScheduler(ObjectFactory schedulerFactory)
Set the Scheduler to be used to manage the event list.
virtual EventId ScheduleNow(EventImpl *event)
Schedule an event to run at the current virtual time.
bool IsLocalFinished(void) const
Check if this rank is finished.
uint32_t m_systemCount
MPI communicator size.
virtual void BoundLookAhead(const Time lookAhead)
Add additional bound to lookahead constraints.
virtual EventId ScheduleDestroy(EventImpl *event)
Schedule an event to run at the end of the simulation, after the Stop() time or condition has been reached.
static TypeId GetTypeId(void)
Register this type.
virtual uint32_t GetContext(void) const
Get the current simulation context.
uint64_t NextTs(void) const
Get the timestep of the next event.
virtual uint32_t GetSystemId(void) const
Get the system id of this simulator.
bool m_stop
Flag calling for the end of the simulation.
static Time m_lookAhead
Current window size.
uint32_t m_currentUid
Unique id of the current event.
virtual void Stop(void)
Tell the Simulator the calling event should be the last one executed.
An identifier for simulation events.
Definition: event-id.h:54
A simulation event.
Definition: event-impl.h:45
void Invoke(void)
Called by the simulation engine to notify the event that it is time to execute.
Definition: event-impl.cc:46
static void ReceiveMessages()
Check for received messages complete.
static void TestSendComplete()
Check for completed sends.
Structure used for all-reduce LBTS computation.
uint32_t m_txCount
Count of transmitted messages.
uint32_t m_rxCount
Count of received messages.
uint32_t m_myId
System Id of the rank sending this LBTS.
Time m_smallestTime
Earliest next event timestamp.
bool m_isFinished
true when this rank has no more events.
static MPI_Comm GetCommunicator()
Return the communicator used to run ns-3.
static void Destroy()
Deletes storage used by the parallel environment.
static uint32_t GetSystemId()
Get the id number of this rank.
static uint32_t GetSize()
Get the number of ranks used by ns-3.
virtual Ptr< Channel > GetChannel(void) const =0
virtual bool IsPointToPoint(void) const =0
Return true if the net device is on a point-to-point link.
keep track of a set of node pointers.
Iterator Begin(void) const
Get an iterator which refers to the first Node in the container.
Iterator End(void) const
Get an iterator which indicates past-the-last Node in the container.
static NodeContainer GetGlobal(void)
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
std::vector< Ptr< Node > >::const_iterator Iterator
Node container iterator.
uint32_t GetSystemId(void) const
Definition: node.cc:123
Instantiate subclasses of ns3::Object.
Ptr< Object > Create(void) const
Create an Object instance of the configured TypeId.
virtual void DoDispose(void)
Destructor implementation.
Definition: object.cc:346
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:74
Maintain the event list.
Definition: scheduler.h:156
void Unref(void) const
Decrement the reference count.
static void Stop(void)
Tell the Simulator the calling event should be the last one executed.
Definition: simulator.cc:180
static EventId Schedule(Time const &delay, FUNC f, Ts &&... args)
Schedule an event to expire after delay.
Definition: simulator.h:556
@ NO_CONTEXT
Flag for events not associated with any particular context.
Definition: simulator.h:199
The SimulatorImpl base class.
virtual void PreEventHook(const EventId &id)
Hook called before processing each event.
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:103
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:415
bool IsPositive(void) const
Exactly equivalent to t >= 0.
Definition: nstime.h:316
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:423
static Time Max()
Maximum representable Time Not to be confused with Max(Time,Time).
Definition: nstime.h:282
AttributeValue implementation for Time.
Definition: nstime.h:1308
Time Get(void) const
Definition: time.cc:519
a unique identifier for an interface.
Definition: type-id.h:59
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:922
Declaration of classes ns3::LbtsMessage and ns3::DistributedSimulatorImpl.
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
@ INVALID
INVALID.
Definition: aodv-rtable.h:51
@ VALID
VALID.
Definition: aodv-rtable.h:50
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file,...
Definition: assert.h:67
int64x64_t Min(const int64x64_t &a, const int64x64_t &b)
Minimum.
Definition: int64x64.h:218
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:205
#define NS_LOG_LOGIC(msg)
Use NS_LOG to output a message of level LOG_LOGIC.
Definition: log.h:289
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by ",...
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition: log.h:265
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:45
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition: nstime.h:1244
Declaration of class ns3::MpiInterface.
void(* Time)(Time oldValue, Time newValue)
TracedValue callback signature for Time.
Definition: nstime.h:793
Every class exported by the ns3 library is enclosed in the ns3 namespace.
channel
Definition: third.py:92
Scheduler event.
Definition: scheduler.h:182
EventKey key
Key for sorting and ordering Events.
Definition: scheduler.h:184
EventImpl * impl
Pointer to the event implementation.
Definition: scheduler.h:183
uint32_t m_context
Event context.
Definition: scheduler.h:172
uint64_t m_ts
Event time stamp.
Definition: scheduler.h:170
uint32_t m_uid
Event unique id.
Definition: scheduler.h:171