My last two posts (here and here) introduced three Python-based alternatives for expressing simulation process or agent-based behavior that takes place over (or blocks for) simulated time:
- Generators, a core feature of the Python language
- Greenlets, provided by the third-party greenlet Python extension package
- Tasklets, provided by two standalone Python distributions, Stackless Python and PyPy
While those posts included a few brief code snippets, perhaps now is a good time to add meat to the bones and present the full code for one (albeit simple) simulation model. We'll model the classic M/M/1 queue, a system with a single queue and a single server. Both interarrival and service times are exponentially distributed, and there is no upper bound on the queue length. We'll implement a process-oriented simulation in two ways: one using generators, the other using greenlets. (With small modifications, we could convert the greenlet-based code to tasklets.)
Most of the code is common to both approaches. First a few basics, starting with a clock:
```python
# Imports used throughout the code in this post
import itertools
import random
from abc import abstractmethod
from collections import deque
from heapq import heappush, heappop

from greenlet import greenlet, getcurrent

class SimClock(object):
    current_time = 0

    @staticmethod
    def now():
        return SimClock.current_time

    @staticmethod
    def advance_to(tm):
        SimClock.current_time = tm
```
Every simulation needs a clock 🙂 . For now, we'll keep it simple and unit-less. We could keep it even simpler and just define `current_time` as a global variable, but we'll wrap it inside a class instead, where `now()` returns the current simulated time and `advance_to()` advances the simulated time.
```python
eventlist = []
counter = itertools.count()

def initialize():
    eventlist.clear()
    SimClock.current_time = 0
```
The simulation must maintain a list of the events to process. That list functions as a priority queue, where priority is the simulated time the event is scheduled to be fired/handled; if two events are scheduled for the same time, they should be handled first-in, first-out.
A single simulated M/M/1 queue is not going to terribly stress the simulator's event manager, so I generally wouldn't put a great deal of effort into optimizing this code right now. Fortunately, the Python standard library provides a heap, which makes for a pretty decent priority queue implementation with O(log n) performance. With almost no additional effort, we can keep events on a "heapified" global list and use a counter to maintain the secondary FIFO priority.
The `initialize()` function simply sets (or resets) both the event list and clock prior to starting a simulation run.
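To see how the time/count tuples keep simultaneous events in FIFO order, here's a quick standalone sketch (using plain string labels in place of real event objects):

```python
import itertools
from heapq import heappush, heappop

eventlist = []
counter = itertools.count()

# Two "events" scheduled for t=5 and one for t=3; the counter value
# breaks the tie between the two t=5 entries in insertion (FIFO) order
for time, label in [(5, "first-at-5"), (3, "at-3"), (5, "second-at-5")]:
    heappush(eventlist, (time, next(counter), label))

order = [heappop(eventlist)[2] for _ in range(3)]
print(order)  # ['at-3', 'first-at-5', 'second-at-5']
```

The counter never resets, but that's fine: it only needs to be monotonically increasing to preserve insertion order among same-time events.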
We now have an event list, so we need events:
```python
class SimEvent(object):
    def __init__(self, simtime):
        self.time = simtime

    def register(self):
        heappush(eventlist, (self.time, next(counter), self))

    @abstractmethod
    def handle(self):
        """Process the event. Implemented by subclasses."""
        pass
```
`SimEvent` is an abstract base class for the concrete event classes described below (`ArrivalEvent` and `ResumeEvent`). The base class includes one piece of data: the time at which the event should be handled/processed. The actual handling/processing is performed by the `handle()` method, which must be implemented by the subclasses. The `register()` method simply pushes the event onto the heapified list (priority queue) as part of a time/count/event tuple; the event time and counter value are used to prioritize the event within the queue.
And to finish up with preliminaries, our simulator needs an event processor:
```python
def processEvents(until_time):
    while eventlist:
        next_event = heappop(eventlist)[2]
        SimClock.advance_to(next_event.time)
        if SimClock.now() > until_time:
            break
        else:
            next_event.handle()
```
The `processEvents()` function executes the simulation run for a specified length of simulated time. A seed `ArrivalEvent` (see below) must be registered prior to calling `processEvents()`; otherwise, the initial event list will be empty and the simulation will end immediately. From there, `processEvents()` runs a loop until either we run out of events or the clock is advanced past the specified `until_time`. The loop body simply pops the next scheduled event, advances the clock to that event's scheduled execution time, and then processes/executes the event via its `handle()` method.
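Here's a toy run of that loop, self-contained for illustration, using a hypothetical `EchoEvent` (not part of the model) that just records when it fires:

```python
import itertools
from heapq import heappush, heappop

class SimClock:
    current_time = 0
    @staticmethod
    def now():
        return SimClock.current_time
    @staticmethod
    def advance_to(tm):
        SimClock.current_time = tm

eventlist = []
counter = itertools.count()
fired = []  # records (time, label) for every event actually handled

class EchoEvent:
    """Hypothetical event that just records its firing time."""
    def __init__(self, simtime, label):
        self.time = simtime
        self.label = label
    def register(self):
        heappush(eventlist, (self.time, next(counter), self))
    def handle(self):
        fired.append((SimClock.now(), self.label))

def processEvents(until_time):
    while eventlist:
        next_event = heappop(eventlist)[2]
        SimClock.advance_to(next_event.time)
        if SimClock.now() > until_time:
            break
        next_event.handle()

EchoEvent(7, "b").register()
EchoEvent(3, "a").register()
EchoEvent(12, "too late").register()
processEvents(10)
print(fired)  # [(3, 'a'), (7, 'b')] -- the t=12 event falls past until_time
```

Note that events are handled in time order regardless of registration order, and the loop exits as soon as it pops an event scheduled beyond `until_time`.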
On to the aforementioned `ArrivalEvent`:
```python
class ArrivalEvent(SimEvent):
    def __init__(self, interarrival_mean, service_mean, server, process_cls):
        super().__init__(0)
        self.interarrival_mean = interarrival_mean
        self.service_mean = service_mean
        self.server = server
        self.process_cls = process_cls
        self.rng = random.Random()
        self.rng.seed(1000420099314159)
        self.time = self.rng.expovariate(1 / self.interarrival_mean)

    def handle(self):
        service_time = self.rng.expovariate(1 / self.service_mean)
        process = self.process_cls(self.server, service_time)
        start_process_event = ResumeEvent(process, SimClock.now())
        start_process_event.register()
        next_ia_time = self.rng.expovariate(1 / self.interarrival_mean)
        self.time = SimClock.now() + next_ia_time
        self.register()
```
The `ArrivalEvent` looks a bit more complicated. Its initializer is passed the basic M/M/1 model parameters (mean interarrival and service times) along with a reference to the single server and a process class. A generator-based simulation uses the `GeneratorProcess` class, while a greenlet-based simulation uses `GreenletProcess` (as discussed below). The event's `handle()` method does the following:
- It instantiates a new process object using the process class passed to the initializer. The service time is sampled from an exponential distribution (with the mean specified in the initializer) via a pseudo-random number generator (attribute `rng`, created in the initializer with a hard-coded seed); that service time is passed to the new process object, along with a reference to the server object.
- The new process is started by instantiating and registering a `ResumeEvent` for right now (in simulated time).
- Finally, the `ArrivalEvent` reschedules itself by sampling from another exponential distribution (with mean equal to the initialized interarrival time), setting a new event time, and re-registering itself. Rinse, lather, repeat.
As noted above, a single `ArrivalEvent` must be created and registered prior to running `processEvents()`. The `ArrivalEvent` essentially seeds the simulation, providing an initial event to execute.
```python
class ResumeEvent(SimEvent):
    def __init__(self, process, resumetime):
        super().__init__(resumetime)
        self.process = process

    def handle(self):
        self.process.resume()
```
The `ResumeEvent` starts execution of a new process, or restarts a process that is currently blocked, either waiting to acquire the server or waiting for its service time to elapse. It does so through the process object's `resume()` method. (The `GeneratorProcess` and `GreenletProcess` classes each have their own implementation of `resume()`.)
Now that we’re through with event classes, it’s on to the server (or resource, in alternative jargon):
```python
class Server(object):
    def __init__(self):
        self.requests = deque()
        self.total_queue_time = 0
        self.process_count = 0
        self.current_process = None

    def request(self, process):
        self.process_count += 1
        if self.current_process:
            self.requests.append(process)
            return False
        else:
            self.current_process = process
            return True

    def release(self, process):
        if self.requests:
            self.current_process = self.requests.popleft()
            event = ResumeEvent(self.current_process, SimClock.now())
            event.register()
            qtime = SimClock.now() - self.current_process.start_time
            self.total_queue_time += qtime
        else:
            self.current_process = None

    @property
    def mean_queue_time(self):
        if self.process_count:
            return self.total_queue_time / self.process_count
        else:
            return None

    @property
    def queueSize(self):
        return len(self.requests)
```
The `Server` class implements a server resource through `request()` and `release()` methods. Internally, it maintains a reference to the process object that currently holds the server, and a FIFO queue of processes waiting to acquire it. In another moment of gratuitous optimization, we'll implement that queue as a `deque` rather than a standard Python list. When the server is released, it pulls the next requesting process off that queue, makes it the currently-being-served process, and tells the process to resume (via a `ResumeEvent`).
The server instance also tracks queue time data (time in queue, waiting for a request to be fulfilled), and returns the mean queue time via a property. If this were a real model we would certainly be tracking any number of other outputs, but this is enough to start.
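The bookkeeping can be exercised in isolation. This standalone sketch uses stub clock, event, and process classes (stand-ins, not the real classes from the post) so that the `Server` logic runs on its own:

```python
import itertools
from collections import deque
from heapq import heappush

# --- stubs standing in for the real classes, so this snippet runs alone ---
class SimClock:
    current_time = 0
    @staticmethod
    def now():
        return SimClock.current_time
    @staticmethod
    def advance_to(tm):
        SimClock.current_time = tm

eventlist = []
counter = itertools.count()

class ResumeEvent:
    def __init__(self, process, resumetime):
        self.time = resumetime
        self.process = process
    def register(self):
        heappush(eventlist, (self.time, next(counter), self))

class StubProcess:
    def __init__(self):
        self.start_time = SimClock.now()

# --- the Server logic from the post (minus the queueSize property) ---
class Server:
    def __init__(self):
        self.requests = deque()
        self.total_queue_time = 0
        self.process_count = 0
        self.current_process = None

    def request(self, process):
        self.process_count += 1
        if self.current_process:
            self.requests.append(process)   # busy: queue the process
            return False
        self.current_process = process      # idle: grant immediately
        return True

    def release(self, process):
        if self.requests:
            self.current_process = self.requests.popleft()
            ResumeEvent(self.current_process, SimClock.now()).register()
            self.total_queue_time += SimClock.now() - self.current_process.start_time
        else:
            self.current_process = None

    @property
    def mean_queue_time(self):
        return self.total_queue_time / self.process_count if self.process_count else None

server = Server()
p1, p2 = StubProcess(), StubProcess()  # both "arrive" at t=0
print(server.request(p1))      # True: p1 acquires the idle server
print(server.request(p2))      # False: p2 has to queue
SimClock.advance_to(5)
server.release(p1)             # p2 gets the server after waiting 5 units
print(server.mean_queue_time)  # 5 / 2 = 2.5 (mean over both processes)
```

Note that the mean is taken over all requesting processes, including the ones that acquired the server without waiting.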
Since we’re building a process-oriented model, we need a process class:
```python
class SimProcess(object):
    def __init__(self, server, servicetime):
        self.server = server
        self.servicetime = servicetime
        self.start_time = SimClock.now()

    @abstractmethod
    def resume(self):
        pass

    @abstractmethod
    def acquire(self, server):
        pass

    @abstractmethod
    def wait_for(self, delaytime):
        pass

    @abstractmethod
    def execute(self):
        pass
```
`SimProcess` is the abstract base class defining the interface for the generator- and greenlet-based process classes mentioned above:
- `resume()`, as noted above, is invoked by the `ResumeEvent`
- `execute()` is invoked immediately after process creation; as its name implies, this method encapsulates the execution of the simulation process (for one simulation entity) over simulated time
- `acquire()` and `wait_for()` are typically invoked by subclass implementations of `execute()`
Finally, we have the generator- and greenlet-specific subclasses of `SimProcess`, each of which also includes the M/M/1-specific processing logic:
```python
class GeneratorProcess(SimProcess):
    def __init__(self, server, servicetime):
        super().__init__(server, servicetime)
        self.generator = self.execute()

    def resume(self):
        try:
            next(self.generator)
        except StopIteration:
            pass

    def acquire(self, server):
        acquired = server.request(self)
        if not acquired:
            yield

    def wait_for(self, delaytime):
        event = ResumeEvent(self, SimClock.now() + delaytime)
        event.register()
        yield

    def execute(self):
        yield from self.acquire(self.server)
        yield from self.wait_for(self.servicetime)
        self.server.release(self)


class GreenletProcess(SimProcess):
    maingreenlet = getcurrent()  # the root greenlet

    def __init__(self, server, servicetime):
        super().__init__(server, servicetime)
        self.greenlet = greenlet(self.execute)

    def suspend(self):
        self.maingreenlet.switch()

    def resume(self):
        self.greenlet.switch()

    def acquire(self, server):
        acquired = server.request(self)
        if not acquired:
            self.suspend()

    def wait_for(self, delaytime):
        event = ResumeEvent(self, SimClock.now() + delaytime)
        event.register()
        self.suspend()

    def execute(self):
        self.acquire(self.server)
        self.wait_for(self.servicetime)
        self.server.release(self)
```
A few notes:
- As noted in my earlier posts, generator-based process methods (`execute()`) are started and resumed through a `next()` call on the generator created in the process initializer. Similarly, greenlet-based process execution is started/resumed through a `switch()` call on the process's `greenlet` (also created in the initializer).
- For a generator-based process, execution is suspended (returning control to the event processor) through a `yield` statement. For greenlet-based processes, this is done through a switch back to the main (parent) greenlet.
- A greenlet-based process simply calls `acquire()` and `wait_for()` like any other method; a generator-based implementation must remember to use a `yield from` statement.
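The generator side of that contrast can be demonstrated in isolation. This sketch (with hypothetical `label`/`log` parameters, not part of the model above) shows a `yield from` chain suspending and resuming:

```python
def wait_for(label, log):
    log.append(label + ": suspending")
    yield                       # control returns to the caller of next()
    log.append(label + ": resumed")

def execute(label, log):
    log.append(label + ": start")
    yield from wait_for(label, log)  # must delegate with `yield from`
    log.append(label + ": done")

log = []
gen = execute("p1", log)
next(gen)                # start: runs until the yield inside wait_for
log.append("driver: between events")
try:
    next(gen)            # resume: runs to completion
except StopIteration:    # raised when execute() returns
    pass
print(log)
```

Without `yield from`, calling `wait_for()` inside `execute()` would merely create an inner generator and discard it; the suspension point would never be reached.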
Last but not least, a bit of code to actually initialize and execute two simulation runs (one for each `SimProcess` implementation):
```python
mean_interarrival_time = 10
mean_service_time = 8

classes = (GreenletProcess, GeneratorProcess)
for cls in classes:
    initialize()
    server = Server()
    arrival_event = ArrivalEvent(mean_interarrival_time, mean_service_time,
                                 server, cls)
    arrival_event.register()
    processEvents(1000000)
    print("Process Class:", cls.__name__)
    print("total entries processed:", server.process_count)
    print("mean queue time:", server.mean_queue_time)
```
This code simply defines the parameters of the M/M/1 model (mean interarrival and service time) and runs the simulation twice, with each simulation being run for one million simulated time units. The resulting output is:
```
Process Class: GreenletProcess
total entries processed: 99803
mean queue time: 31.48626676016153
Process Class: GeneratorProcess
total entries processed: 99803
mean queue time: 31.48626676016153
```
It’s always reassuring to see that the two implementations generate identical results.
Queuing theory allows us to independently calculate the expected mean queue time analytically:
λ (arrival rate) = 1 / mean interarrival time = 1/10 = 0.1
µ (service rate) = 1 / mean service time = 1/8 = 0.125
ρ (utilization) = λ / µ = 0.8
mean queue time = ρ / (µ – λ) = 32
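That arithmetic is easy to double-check in Python (a standalone sketch, independent of the simulation code):

```python
mean_interarrival_time = 10
mean_service_time = 8

lam = 1 / mean_interarrival_time   # arrival rate λ = 0.1
mu = 1 / mean_service_time         # service rate µ = 0.125
rho = lam / mu                     # utilization ρ
mean_queue_time = rho / (mu - lam)

print(rho)              # 0.8
print(mean_queue_time)  # ≈ 32, modulo floating-point rounding
```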
I’ll declare our simulation models valid based on the simulated mean queue time of just under 31.5 🙂 .