"""
Author: Changbeom Choi (@cbchoi)
Copyright (c) 2014-2020 Handong Global University
Copyright (c) 2021-2024 Hanbat National University
License: MIT. The full license text is available at:
https://github.com/eventsim/pyjevsim/blob/main/LICENSE
This module includes SysExecutor, a simulation engine that manages models over time.
"""
import copy, os
import datetime
import heapq
import math
import threading
import time
from collections import deque
from .core_model import CoreModel
from .default_message_catcher import DefaultMessageCatcher
from .definition import ExecutionType, Infinite, ModelType, SimulationMode
from .executor_factory import ExecutorFactory
from .system_message import SysMessage
from .termination_manager import TerminationManager
from .message_deliverer import MessageDeliverer
[docs]
class SysExecutor(CoreModel):
"""SysExecutor managing the execution of models in a simulation.(Simulation Engine)"""
EXTERNAL_SRC = "SRC"
EXTERNAL_DST = "DST"
def __init__(self, _time_resolution, _sim_name="default", ex_mode=ExecutionType.V_TIME, snapshot_manager=None):
"""
Initializes the SysExecutor with time resolution, simulation name, execution mode, and optional snapshot manager.
Args:
_time_resolution (float): The time resolution for the simulation
_sim_name (str, optional): The name of the simulation
ex_mode (R_TIME or VTIME): The execution mode
snapshot_manager (ModelSnapshotManager, optional): Manages SnapshotExecutor
"""
CoreModel.__init__(self, _sim_name, ModelType.UTILITY)
self.lock = threading.Lock()
self.global_time = 0
self.target_time = 0
self.time_resolution = _time_resolution
# dictionary for waiting simulation objects
self.waiting_obj_map = {}
# dictionary for active simulation objects
self.active_obj_map = {}
# dictionary for object to ports
self.product_port_map = {}
self.port_map = {}
self.hierarchical_structure = {}
self.model_map = {}
self.min_schedule_item = deque()
self.sim_init_time = datetime.datetime.now()
self.simulation_mode = SimulationMode.SIMULATION_IDLE
# External Interface
self.input_event_queue = []
self.output_event_queue = deque()
# TIME Handling
self.ex_mode = ex_mode
self.snapshot_manager = snapshot_manager
if snapshot_manager:
self.exec_factory = snapshot_manager.get_snapshot_factory()
else:
self.exec_factory = ExecutorFactory()
#Factory pattern to convert Model to ModelExecutor
self.dmc = DefaultMessageCatcher("dc")
#Model for handling uncaught messages
self.register_entity(self.dmc)
[docs]
def get_global_time(self):
"""
Retrieves the current global time.(simulation time)
Returns:
float: The current global time
"""
return self.global_time
'''
def set_snapshot_manager(self, snapshot_manager):
"""
Sets the snapshot manager.
Args:
snapshot_manager (ModelSnapshotManager): The snapshot manager to set
"""
self.snapshot_manager = snapshot_manager
'''
[docs]
def register_entity(self, entity, inst_t=0, dest_t=Infinite, ename="default"):
"""
Register simulation entity(Model).
Args:
entity (BehaviorModel or StructuralModel): The entity to register
inst_t (float, optional): Instance creation time
dest_t (float, optional): Destruction time
ename (str, optional): SysExecutor name
"""
sim_obj = self.exec_factory.create_executor(
self.global_time, inst_t, self.global_time + dest_t, ename, entity, self
)
self.product_port_map[entity] = sim_obj
if not sim_obj.get_create_time() in self.waiting_obj_map:
self.waiting_obj_map[sim_obj.get_create_time()] = []
self.waiting_obj_map[sim_obj.get_create_time()].append(sim_obj)
if sim_obj.get_name() in self.model_map:
self.model_map[sim_obj.get_name()].append(sim_obj)
else:
self.model_map[sim_obj.get_name()] = [sim_obj]
[docs]
def get_entity(self, model_name):
"""
Retrieves entities by model name.
Args:
model_name (str): The name of the model
Returns:
list: List of entities
"""
if model_name in self.model_map:
return self.model_map[model_name]
return []
[docs]
def get_model(self, name):
"""
Retrieve Model by name.
Args:
name (str): The name of the model
Returns:
CoreModel: Model(BehaviorModel or StructuralModel)
"""
return self.model_map[name][0].get_core_model()
[docs]
def remove_entity(self, model_name):
"""
Removes an entity by model name.
Args:
model_name (str): The name of the model
"""
if model_name in self.model_map:
self.destory_entity(self.model_map[model_name])
del self.model_map[model_name]
[docs]
def create_entity(self):
"""
Creates entities that are scheduled for creation.
"""
if len(self.waiting_obj_map.keys()) != 0:
key = min(self.waiting_obj_map)
if key <= self.global_time:
lst = self.waiting_obj_map[key]
for obj in lst:
self.active_obj_map[obj.get_obj_id()] = obj
obj.set_req_time(self.global_time)
self.min_schedule_item.append(obj)
del self.waiting_obj_map[key]
self.min_schedule_item = sorted(
self.min_schedule_item,
key=lambda bm: (bm.get_req_time(), bm.get_obj_id()),
)
[docs]
def destory_entity(self, delete_lst):
"""
Destroys a list of entities.
Args:
delete_lst (list): List of entities to delete
"""
for agent in delete_lst:
del self.active_obj_map[agent.get_obj_id()]
port_del_map = {}
for key, value in self.port_map.items():
if key[0] == agent:
port_del_map[key] = True
if value:
del_items = []
for src_port in value:
src, _ = src_port
if src == agent:
del_items.append(src_port)
for item in del_items:
value.remove(item)
for key in port_del_map:
del self.port_map[key]
if agent in self.min_schedule_item:
self.min_schedule_item.remove(agent)
[docs]
def destroy_active_entity(self):
"""
Destroys active entities that are scheduled for destruction.
"""
if len(self.active_obj_map.keys()) != 0:
delete_lst = []
for _, agent in self.active_obj_map.items():
if agent.get_destruct_time() <= self.global_time:
delete_lst.append(agent)
self.destory_entity(delete_lst)
[docs]
def coupling_relation(self, src_obj, out_port, dst_obj, in_port):
"""
Related Model's input/output ports to each other.
Related src_obj's output port to dst_obj's input port.
Args:
src_obj (BehaviorMdoel or StructuralModel): Model to relate as output ports
out_port (str): src_obj's output port
dst_obj (CoreModel): Model to relate as input ports
in_port (str): dst_obj's input port
"""
if src_obj and src_obj != self:
src_obj = self.product_port_map[src_obj]
else:
src_obj = self
if dst_obj and dst_obj != self:
dst_obj = self.product_port_map[dst_obj]
else:
dst_obj = self
if (src_obj, out_port) in self.port_map:
self.port_map[(src_obj, out_port)].append((dst_obj, in_port))
else:
self.port_map[(src_obj, out_port)] = [(dst_obj, in_port)]
[docs]
def get_relation(self):
"""
Retrieves the current coupling relations.
Returns:
dict: The relation map
"""
relation_map = {}
for relation in self.port_map.keys():
result_out_list = []
in_tuple = (relation[0].get_core_model(), relation[1])
out_list = self.port_map[relation]
for out in out_list:
result_out_list.append((out[0].get_core_model(), out[1]))
relation_map[in_tuple] = result_out_list
return relation_map
[docs]
def remove_relation(self, src, out_port, dst, in_port):
"""
Removes a coupling relation.
Args:
src_obj (str): BehaviorMdoel or StructuralModel name, Models that remove relationships as output port
out_port (str): src_obj's output port
dst_obj (CoreModel): Models that remove relationships as input port
in_port (str): dst_obj's input port
"""
in_tuple = (self.model_map[src][0], out_port)
found = self.port_map[in_tuple].index((self.model_map[dst][0], in_port))
del self.port_map[in_tuple][found]
if self.port_map[in_tuple] == []:
del self.port_map[in_tuple]
[docs]
def reset_relation(self):
"""Resets all coupling relations."""
self.port_map = {}
[docs]
def single_output_handling(self, obj, msg):
"""
Handles a single output message.
Args:
obj (BehaviorModel or StructuralModel): Model
msg (SysMessage): The message
"""
#print(obj.get_name())
pair = (obj, msg.get_dst())
if pair not in self.port_map:
self.port_map[pair] = [
(self.active_obj_map[self.dmc.get_obj_id()], "uncaught")
]
for port_pair in self.port_map[pair]:
destination = port_pair
if destination is None:
print("Destination Not Found")
raise AssertionError
if destination[0] is self:
self.output_event_queue.append((self.global_time, msg[1].retrieve()))
else:
if destination[0].get_obj_id() in self.active_obj_map:
destination[0].ext_trans(destination[1], msg)
destination[0].set_req_time(self.global_time)
[docs]
def output_handling(self, obj, msg_deliver):
"""
Handles output messages.
Args:
obj (BehaviorModel or StructuralModel): Model
msg (SysMessage): The message
"""
if msg_deliver.has_contents():
for msg in msg_deliver.get_contents():
if isinstance(msg, list):
for ith_msg in msg:
pair = (obj, ith_msg)
self.single_output_handling(obj, copy.deepcopy(pair))
else:
self.single_output_handling(obj, msg)
[docs]
def init_sim(self):
"""Initializes the simulation."""
self.simulation_mode = SimulationMode.SIMULATION_RUNNING
if self.active_obj_map is None:
self.global_time = min(self.waiting_obj_map)
if not self.min_schedule_item:
for obj in self.active_obj_map.items():
if obj[1].time_advance() < 0:
print("You should give positive real number for the deadline")
raise AssertionError
obj[1].set_req_time(self.global_time)
self.min_schedule_item.append(obj[1])
[docs]
def schedule(self):
"""Schedules the next simulation event."""
self.create_entity()
self.handle_external_input_event()
tuple_obj = self.min_schedule_item.popleft()
before = time.perf_counter() # Record time before processing
while tuple_obj.get_req_time() <= self.global_time:
msg_deliver = MessageDeliverer()
#msg = tuple_obj.output(msg_deliver)
tuple_obj.output(msg_deliver)
if msg_deliver.has_contents():
self.output_handling(tuple_obj, msg_deliver)
tuple_obj.int_trans()
req_t = tuple_obj.get_req_time()
tuple_obj.set_req_time(req_t)
self.min_schedule_item.append(tuple_obj)
self.min_schedule_item = deque(
sorted(
self.min_schedule_item,
key=lambda bm: (bm.get_req_time(), bm.get_obj_id()),
)
)
tuple_obj = self.min_schedule_item.popleft()
self.min_schedule_item.appendleft(tuple_obj)
self.global_time += self.time_resolution
self.destroy_active_entity()
if self.ex_mode == ExecutionType.R_TIME:
delta = float(self.time_resolution) - float(before - time.perf_counter())
if delta > 0:
time.sleep(delta)
[docs]
def simulate(self, _time=Infinite, _tm=True):
"""
Runs the simulation for a given amount of time.
Args:
_time (float): The simulation time
_tm (bool): Whether to use the termination manager
"""
if _tm:
self.tm = TerminationManager()
self.target_time = self.global_time + _time
self.init_sim()
while self.global_time < self.target_time:
if not self.waiting_obj_map:
if (
self.min_schedule_item[0].get_req_time() == Infinite
and self.ex_mode == "VIRTUAL_TIME"
):
self.simulation_mode = SimulationMode.SIMULATION_TERMINATED
break
self.schedule()
[docs]
def simulation_stop(self):
"""Stops the simulation and resets SysExecutor."""
self.global_time = 0
self.target_time = 0
self.time_resolution = 1
self.waiting_obj_map = {}
self.active_obj_map = {}
self.port_map = {}
self.min_schedule_item = deque()
self.sim_init_time = datetime.datetime.now()
self.dmc = DefaultMessageCatcher("dc")
self.register_entity(self.dmc)
[docs]
def insert_external_event(self, _port, _msg, scheduled_time=0):
"""
Inserts an external event into the simulation.
Args:
_port (str): port name
_msg (SysMessage or None): Event message
scheduled_time (float, optional): The scheduled time for the event
"""
sys_msg = SysMessage("SRC", _port)
sys_msg.insert(_msg)
if _port in self.external_input_ports:
with self.lock:
heapq.heappush(
self.input_event_queue, (scheduled_time + self.global_time, sys_msg)
)
else:
print("[INSERT_EXTERNAL_EVNT] Port Not Found")
[docs]
def insert_custom_external_event(self, _port, _bodylist, scheduled_time=0):
"""
Inserts a custom external event into the simulation.
Args:
_port (str): The port name / 포트 이름
_bodylist (list): The list of message bodies
scheduled_time (float, optional): The scheduled time for the event
"""
sys_msg = SysMessage("SRC", _port)
sys_msg.extend(_bodylist)
if _port in self.external_input_ports:
with self.lock:
heapq.heappush(
self.input_event_queue, (scheduled_time + self.global_time, sys_msg)
)
else:
print("[INSERT_EXTERNAL_EVNT] Port Not Found")
[docs]
def get_generated_event(self):
"""
Returns the queue of generated events.
Returns:
deque: The queue of generated events
"""
return self.output_event_queue
[docs]
def handle_external_output_event(self):
"""
Handles external output events and clears the output event queue.
Returns:
list: List of output events
"""
event_lists = copy.deepcopy(self.output_event_queue)
self.output_event_queue.clear()
return event_lists
[docs]
def is_terminated(self):
"""
Checks if the simulation is terminated.
Returns:
bool: True if terminated, False otherwise
"""
return self.simulation_mode == SimulationMode.SIMULATION_TERMINATED
[docs]
def snapshot_simulation(self, name = "", directory_path=".") :
"""
Snapshot the model and its releases.
Args :
name(str) : Name of the simulation to be snapshot
directory_path : Where the simulation will be snapshot
Raises:
ValueError: Snapshot manager is not set.
"""
if not self.snapshot_manager :
raise ValueError("Snapshot manager is not set. Cannot take snapshot.")
if name == "" :
name = self.get_name()
self.snapshot_manager.snapshot_simulation(self.port_map, self.model_map, name, directory_path)
[docs]
def terminate_simulation(self):
os._exit(0)