Howard Klein

Howard Klein

Musings on software, discrete event simulation and other pseudo-random topics

Simulating an M/M/1 Queue Using Generators and Greenlets

My last two posts (here and here) introduced three Python-based alternatives for expressing simulation process or agent-based behavior that takes place over (or blocks for) simulated time:

  • Generators, a core feature of the Python language
  • Greenlets, provided by the third-party greenlet Python extension package
  • Tasklets, provided by two standalone Python distributions, Stackless Python and PyPy

While those posts included a few brief code snippets, perhaps now is a good time to add meat to the bones and present the full code for one (albeit simple) simulation model.  We’ll model the classic M/M/1 queue, a system with a single queue and a single server.  Both interarrival and service times are exponentially distributed.  There is no upper bound on the queue length.  We’ll implement a process-oriented simulation in two ways; one using generators, the other, greenlets.  (With small modifications, we could convert the greenlet-based code to tasklets.)

Most of the code is common to both approaches.  First a few basics, starting with a clock:

class SimClock(object):
    current_time = 0
    
    @staticmethod
    def now():
        return SimClock.current_time
    
    @staticmethod
    def advance_to(tm):
        SimClock.current_time = tm

Every simulation needs a clock 🙂 .  For now, we’ll keep it simple and unit-less. We could keep it even simpler and just define current_time as a global variable, but we’ll wrap it inside a class instead, where now() returns the current simulated time and advance_to() advances the simulated time.

eventlist = []
counter = itertools.count()
def initialize():
    eventlist.clear()
    SimClock.current_time = 0

The simulation must maintain a list of the events to process.  That list functions as a priority queue, where priority is the simulated time the event is scheduled to be fired/handled; if two events are scheduled for the same time, they should be handled first-in, first-out.

A single simulated M/M/1 queue is not going to terribly stress the simulator’s event manager, so I generally wouldn’t put a great deal of effort into optimizing this code right now.  Fortunately, the Python standard library provides a heap, which makes for a pretty decent priority queue implementation with O(log n) performance.  With almost no additional effort, we can keep events on a “heapified” global list, and maintain a counter to maintain the secondary FIFO priority.

The initialize() function simply sets (or resets) both the event list and clock prior to starting a simulation run.

We now have an event list, so we need events:

class SimEvent(object):
    def __init__(self, simtime):
        self.time = simtime
        
    def register(self):
        heappush(eventlist, (self.time, next(counter), self))
        
    @abstractmethod
    def handle(self):
        """
        Process the event.  Implemented by subclasses
        """
        pass

SimEvent is an abstract base class for the concrete event classes described below (ArrivalEvent and ResumeEvent).  The base class includes one piece of data: the time at which the event should be handled/processed.  The actual handling/processing is performed by the handle() method, which must be implemented by the subclasses.  The register() method simply pushes the event on to the heapified list (priority queue) as part of a time/count/event tuple; the event time and counter value are be used to prioritize this event within the queue.

And to finish up with preliminaries, our simulator needs an event processor:

def processEvents(until_time):
    while eventlist:
        next_event = heappop(eventlist)[2]
        SimClock.advance_to(next_event.time)
        if SimClock.now() > until_time:
            break
        else:
            next_event.handle()

The processEvents() function executes the simulation run for a specified length of simulated time. A seed ArrivalEvent (see below) must be registered prior to calling processEvents(); otherwise, the initial event list will be empty and the simulation stops.  Otherwise, processEvents runs a loop until either we run out of events or the clock is advanced passed the specified until_time.  The loop body simply pops the next scheduled event, advances the clock to that event’s scheduled execution time, and then processes/executes the event via it’s handle() method.

On to the aforementioned ArrivalEvent:

class ArrivalEvent(SimEvent):
    def __init__(self, interarrival_mean, service_mean, server,                   
                 process_cls):
        super().__init__(0)
        self.interarrival_mean = interarrival_mean
        self.service_mean = service_mean
        self.server = server
        self.process_cls = process_cls
        self.rng = random.Random()
        self.rng.seed(1000420099314159)
        self.time = self.rng.expovariate(1/self.interarrival_mean)
        
    def handle(self):
        service_time = self.rng.expovariate(1/self.service_mean)
        process = self.process_cls(self.server, service_time)
        start_process_event = ResumeEvent(process, SimClock.now())
        start_process_event.register()
        next_ia_time = self.rng.expovariate(1/self.interarrival_mean)
        self.time = SimClock.now() + next_ia_time
        self.register()

The ArrivalEvent looks a bit more complicated.  Its initializer is passed the basic M/M/1 model parameters (mean interarrival and service time) along with a reference to the single server and a process class.  A Generator-based simulation uses the GeneratorProcess class, while a greenlet-based simulation uses GreenletProcess (as discussed below).  The event’s  handle() method does the following:

  1. It instantiates a new process object using the process class passed to the initializer
  2. The service time is sampled from an exponential distribution (with a mean specified in the initializer) via a pseudo-random number generator (attribute rng, created in the initializer with a hard-coded seed.) The service time is also passed to the new process object, along with a reference to the server object.
  3. The new process is started by instantiating and scheduling a ResumeEvent for right now (in simulated time)
  4. Finally, the ArrivalEvent reschedules itself by sampling from another exponential distribution (with mean equal to the initialized interarrival time), setting a new event time, and reregistering itself. Rinse, lather, repeat.

As noted above, a single ArrivalEvent must be created and registered prior to running processEvents().  The ArrivalEvent essentially seeds the simulation, providing an initial event to execute.

class ResumeEvent(SimEvent):
    def __init__(self, process, resumetime):
        super().__init__(resumetime)
        self.process = process
        
    def handle(self):
        self.process.resume()

The ResumeEvent starts execution of a new process, or restarts a process that is currently blocked/not executing while either acquiring the server or waiting for the service time to pass.  This is done through the process object’s resume() method.  (The GeneratorProcess and GreenletProcess classes each have their own implementation of resume().)

Now that we’re through with event classes, it’s on to the server (or resource, in alternative jargon):

class Server(object):
    def __init__(self):
        self.requests = deque()
        self.total_queue_time = 0
        self.process_count = 0
        self.current_process = None
        
    def request(self, process):
        self.process_count += 1
        if self.current_process:
            self.requests.append(process)
            return False
        else:
            self.current_process = process
            return True
    
    def release(self, process):
        if self.requests:
            self.current_process = self.requests.popleft()
            event = ResumeEvent(self.current_process, SimClock.now())
            event.register()
            qtime = SimClock.now() - self.current_process.start_time
            self.total_queue_time += qtime
        else:
            self.current_process = None  
                         
    @property
    def mean_queue_time(self):
        if self.process_count:
            return self.total_queue_time / self.process_count
        else:
            return None 

@property
    def queueSize(self):
        return len(self.requests)

The Server class implements a server resource through request() and release() methods.  Internally, it maintains a reference to the process object that currently holds the server, and a FIFO queue of processes that are waiting to acquire the server.  In another moment of gratuitous optimization, we’ll implement that queue as a deque rather than a standard Python list.  When the server is released, it pulls the next requesting process of that queue, makes it the currently-being-served process, and tells the prcoess to resume (via a ResumeEvent).

The server instance also tracks queue time data (time in queue, waiting for a request to be fulfilled), and returns the mean queue time via a property. If this were a real model we would certainly be tracking any number of other outputs, but this is enough to start.

Since we’re building a process-oriented model, we need a process class:

class SimProcess(object):
    def __init__(self, server, servicetime):
        self.server = server
        self.servicetime = servicetime
        self.start_time = SimClock.now()
        
    @abstractmethod
    def resume(self):
        pass
        
    @abstractmethod
    def acquire(self, server):
        pass
        
    @abstractmethod
    def wait_for(self, delaytime):
        pass
        
    @abstractmethod
    def execute(self):
        pass

SimProcess is the abstract base class defining the interface for the generator and greenlet-based process classes mentioned above:

  • resume(), as noted above, is invoked by the ResumeEvent
  • execute() is invoked immediately after process creation; as its name implies, this method encapsulates the execution of the simulation process (for one simulation entity) over simulated time
  • acquire() and wait_for() are typically invoked by subclass implementations of execute()

Finally, we have the generator and greenlet-specific subclasses of SimProcess, which each also include M/M/1-specific processing logic:

class GeneratorProcess(SimProcess):    
    def __init__(self, server, servicetime):
        super().__init__(server, servicetime)
        self.generator = self.execute()
        
    def resume(self):
        try:
            next(self.generator)
        except StopIteration as e:
            pass
                
    def acquire(self, server):
        acquired = server.request(self)
        if not acquired:
            yield
        
    def wait_for(self, delaytime):
        event = ResumeEvent(self, SimClock.now() + delaytime)
        event.register()
        yield
        
    def execute(self):
        yield from self.acquire(self.server)
        yield from self.wait_for(self.servicetime)
        self.server.release(self)


class GreenletProcess(SimProcess):
    maingreenlet = greenlet.getcurrent() # the root greenlet
    
    def __init__(self, server, servicetime):
        super().__init__(server, servicetime)
        self.greenlet = greenlet(self.execute)
        
    def suspend(self):
        self.maingreenlet.switch()
        
    def resume(self):
        self.greenlet.switch()
        
    def acquire(self, server):
        acquired = server.request(self)
        if not acquired:
            self.suspend()
        
    def wait_for(self, delaytime):
        event = ResumeEvent(self, SimClock.now() + delaytime)
        event.register()
        self.suspend()
        
    def execute(self):
        self.acquire(self.server)
        self.wait_for(self.servicetime)
        self.server.release(self)

A few notes:

  • As noted in my earlier posts, generator-based process methods (execute()) are started and resumed through a next() call on the generator created in the process initializer. Similarly, greenlet-based process execution is started/resumed through a switch() call on the process’s greenlet (also created in the initializer)
  • For a generator-based process, execution is suspended (returning control to the event processor) through a yield statement. For greenlet-based processes, this is done through a switch back to the main (parent) greenlet.
  • A greenlet-based process simply calls acquire() and wait_for() like any other method; a generator-based implementation must remember to use a yield from statement.

Last but not least, a bit of code to actually initialize and execute two simulation runs (one for each SimProcess implementation):

mean_interarrival_time = 10
mean_service_time = 8
classes = (GreenletProcess, GeneratorProcess)
for cls in classes:    
    initialize()
    server = Server()
    arrival_event = ArrivalEvent(mean_interarrival_time, 
                                 mean_service_time, server, cls)
    arrival_event.register()
    processEvents(1000000)
        
    print("Process Class:", cls.__name__)
    print("total entries processed:", server.process_count)
    print("mean queue time:", server.mean_queue_time)

This code simply defines the parameters of the M/M/1 model (mean interarrival and service time) and runs the simulation twice, with each simulation being run for one million simulated time units.  The resulting output is:

Process Class: GreenletProcess
total entries processed: 99803
mean queue time: 31.48626676016153
Process Class: GeneratorProcess
total entries processed: 99803
mean queue time: 31.48626676016153

It’s always reassuring to see that the two implementations generate identical results.

Queuing theory allows us to independently calculate the expected mean queue time analytically:

λ = 1 / mean interarrival time1 = 0.125

µ = 1 / mean service time = 0.1

ρ (utilization) = λ / µ = 0.8

mean queue time = ρ / (µ – λ) = 32

I’ll declare our simulation models valid based on the simulated mean queue time of just under 31.5  🙂 .

 

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes:
<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>