"""Author: Changbeom Choi (@cbchoi)Copyright (c) 2014-2020 Handong Global UniversityCopyright (c) 2021-2024 Hanbat National UniversityLicense: MIT. The full license text is available at:https://github.com/eventsim/pyjevsim/blob/main/LICENSEThis module contains a StructuralExecutor, an object for executing a StructuralModel."""importcopyfromcollectionsimportdequefrom.definitionimportInfinitefrom.executorimportExecutorfrom.executor_factoryimportExecutorFactoryfrom.message_delivererimportMessageDeliverer
[docs]classStructuralExecutor(Executor):def__init__(self,global_time,itime,dtime,ename,model,parent,factory):"""Structural Exeuctor Args : global_time (float): Global simulation time itime (float): Instance creation time dtime (float): Destruction time ename (str): SysExecutor name model (StructuralModel): StructuralModel to execute factory(ExecutorFactory) : """super().__init__(itime,dtime,ename,model,parent)self.global_time=global_timeself.ex_factory=factoryself.behavior_object=modelself.min_schedule_item=deque([])self.model_executor_map={}self.sm=modelformodel_id,modelinself.behavior_object.get_models().items():executor=factory.create_executor(global_time,itime,dtime,ename,model,self)self.min_schedule_item.append(executor)self.model_executor_map[model]=executorself.request_time=0self._next_event_t=0self.min_schedule_item=deque(sorted(self.min_schedule_item,key=lambdabm:(bm.get_req_time(),bm.get_obj_id()),))self.next_exec_model=self.min_schedule_item.popleft()self.time_advance()def__str__(self):return"[N]:{0}, [S]:{1}".format(self.get_name(),"")
[docs]defint_trans(self):# Perform internal transitionself.next_exec_model.int_trans()#req_t = executor.get_req_time()self.next_exec_model.set_req_time(self.global_time)# Update next event time and reinsert into schedule listself.min_schedule_item.append(self.next_exec_model)self.min_schedule_item=deque(sorted(self.min_schedule_item,key=lambdabm:(bm.get_req_time(),bm.get_obj_id()),))self.next_exec_model=self.min_schedule_item.popleft()
[docs]defoutput(self,msg_deliver):ifnotmsg_deliver.has_contents():# Invoke output function of the first executor in schedule listexecutor=self.next_exec_modelexecutor.output(msg_deliver)whilemsg_deliver.has_contents():msg=msg_deliver.get_contents().pop()cr=(executor.get_core_model(),msg.get_dst())self.route_message(cr,msg)