Source code for examples.banksim.model.model_queue
"""Author: Changbeom Choi (@cbchoi)Copyright (c) 2014-2020 Handong Global UniversityCopyright (c) 2021-2024 Hanbat National UniversityLicense: MIT. The full license text is available at:https://github.com/eventsim/pyjevsim/blob/main/LICENSEThis module contains Banksim Queue Model """frompyjevsim.behavior_modelimportBehaviorModelfrompyjevsim.definitionimport*frompyjevsim.system_messageimportSysMessage
[docs]classBankQueue(BehaviorModel):"""Class representing a bank queue managing users and processors."""def__init__(self,name,queue_size,proc_num):""" Args: name (str): Name of Model queue_size (int): Maximum size of the queue proc_num (int): Number of processors """BehaviorModel.__init__(self,name)self.init_state("WAIT")# Initialize initial stateself.insert_state("WAIT",Infinite)# Add "WAIT" stateself.insert_state("SEND",0)# Add "SEND" state with duration 0self.insert_state("DROP",0)self.insert_input_port("user_in")# Add input port "user_in"self.insert_input_port("proc_checked")# Add input port "proc_checked"self.usable_proc=[]# List of usable processorsforiinrange(proc_num):self.insert_output_port(f"proc{i}")# Add output port for each processorself.usable_proc.append(f"proc{i}")# Add processor to usable listself.insert_output_port("result")self.proc_num=proc_num# Number of processorsself.queue_size=queue_size# Maximum queue sizeself.user=[]# List of users in the queueself.dropped_user=[]
[docs]defset_queue_size(self,queue_size):""" Sets the maximum size of the queue. Args: queue_size (int): Maximum size of the queue """self.queue_size=queue_size
[docs]defext_trans(self,port,msg):""" Handles external transitions based on the input port. Args: port (str): The port that received the message msg (SysMessage): The received message """_time=self.global_timeifport=="user_in":iflen(self.user)<self.queue_size:user=msg.retrieve()[0]self.user.append(user)# Add user to the queue#print(f"[Q][in] ID:{user.get_id()} Time:{_time}")self._cur_state="SEND"else:user=msg.retrieve()[0]#print(f"User Dropped: {user}")self._cur_state="DROP"user.set_drop_time(self.global_time)self.dropped_user.append(user)elifport=="proc_checked":#print("proc_checked")self.usable_proc.append(msg.retrieve()[0])# Add processor to usable listself._cur_state="SEND"ifnotself.usable_procornotself.user:self._cur_state="WAIT"# Transition to "WAIT" state if no users or processors
[docs]defoutput(self,msg_deliver):""" Generates the output message when in the "SEND" state. Returns: SysMessage: The output message """msg=None_time=self.global_timeifself._cur_state=="SEND":user=self.user.pop(0)# Get the first user in the queue#print(f"[Q][out] ID:{user.get_id()} Time:{_time}")msg=SysMessage(self.get_name(),self.usable_proc.pop(0))msg.insert(user)# Insert user into messagemsg2=SysMessage(self.get_name(),"result")msg2.insert(self.dropped_user)self.dropped_user=[]msg_deliver.insert_message(msg)msg_deliver.insert_message(msg2)ifself._cur_state=="DROP":#user = self.user.pop(0) # Get the first user in the queue#print(f"[Q][out] Dropped")msg=SysMessage(self.get_name(),"result")#msg.insert(user) # Insert user into messagedrop_user=self.dropped_usermsg.insert(drop_user)self.dropped_user=[]msg_deliver.insert_message(msg)returnmsg_deliver
[docs]defint_trans(self):"""Handles internal transitions based on the current state."""ifself._cur_state=="SEND":self._cur_state="SEND"# Remain in "SEND" stateifself._cur_state=="DROP":self._cur_state="SEND"ifnotself.usable_procornotself.user:self._cur_state="WAIT"# Transition to "WAIT" state if no users or processors
[docs]defset_proc_num(self,proc_num):""" Sets the number of processors and adjusts the usable processor list. Args: proc_num (int): Number of processors """ifproc_num>self.proc_num:foriinrange(self.proc_num,proc_num):self.insert_output_port(f"proc{i}")self.usable_proc.append(f"proc{i}")elifproc_num<self.proc_num:foriinrange(proc_num,self.proc_num):self.usable_proc.remove(f"proc{i}")self.proc_num=proc_numwhilelen(self.user)>self.queue_size:print(f"User Dropped: {self.user.pop()}")
def__str__(self):"""Returns a string representation of the BankQueue. Returns: str: String representation """returnf">> {self.get_name()}, State:{self._cur_state}, {self.user}"