-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathMPFProcess.py
More file actions
125 lines (97 loc) · 4.68 KB
/
MPFProcess.py
File metadata and controls
125 lines (97 loc) · 4.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
File name: MPFProcess.py
Author: Matthew Allen
Description:
This is the parent class for a process object. It executes the process loop and handles i/o with the main process.
Note that process termination happens regardless of what any child objects are in the middle of, so it is
important to implement the cleanup function properly as it is the only notification of termination that child
objects will get.
"""
import sys
import traceback
from multiprocessing import Process
class MPFProcess(Process):
STOP_KEYWORD = "STOP MPF PROCESS"
def __init__(self, process_name = "unnamed_mpf_process", loop_wait_period=None):
Process.__init__(self)
self._out = None
self._inp = None
self._loop_wait_period = loop_wait_period
self.name = process_name
self.shared_memory = None
self.task_checker = None
self.results_publisher = None
self._MPFLog = None
self._successful_termination = False
def run(self):
"""
The function to be called when a process is started.
:return: None
"""
try:
#We import everything necessary here to ensure that the libraries we need will be imported into the new
#process memory instead of the main process memory.
from MPFramework import MPFResultPublisher
from MPFramework import MPFTaskChecker
import logging
import time
#This are our i/o objects for interfacing with the main process.
self.task_checker = MPFTaskChecker(self._inp, self.name)
self.results_publisher = MPFResultPublisher(self._out, self.name)
self._MPFLog = logging.getLogger("MPFLogger")
self._MPFLog.debug("MPFProcess initializing...")
#Initialize.
self.init()
self._MPFLog.debug("MPFProcess {} has successfully initialized".format(self.name))
while True:
#Here is the simple loop to be executed by this process until termination.
#Check for new inputs from the main process.
if self.task_checker.check_for_update():
self._MPFLog.debug("MPFProcess {} got update {}".format(self.name, self.task_checker.header))
#If we are told to stop running, do so.
if self.task_checker.header == MPFProcess.STOP_KEYWORD:
self._MPFLog.debug("MPFPROCESS {} RECEIVED STOP SIGNAL!".format(self.name))
self._successful_termination = True
raise sys.exit(0)
#Otherwise, update with the latest main process message.
self._MPFLog.debug("MPFProcess {} sending update to subclass".format(self.name))
self.update(self.task_checker.header, self.task_checker.latest_data)
#Take a step.
self.step()
#Publish any output we might have.
self.publish()
#Wait if requested.
if self._loop_wait_period is not None and self._loop_wait_period > 0:
time.sleep(self._loop_wait_period)
except:
#Catch-all because I'm lazy.
error = traceback.format_exc()
if not self._successful_termination:
self._MPFLog.critical("MPFPROCESS {} HAS CRASHED!\n"
"EXCEPTION TRACEBACK:\n"
"{}".format(self.name, error))
finally:
#Clean everything up and terminate.
if self.task_checker is not None:
self._MPFLog.debug("MPFProcess {} Cleaning task checker...".format(self.name))
self.task_checker.cleanup()
del self.task_checker
self._MPFLog.debug("MPFProcess {} has cleaned its task checker!".format(self.name))
if self.results_publisher is not None:
del self.results_publisher
self._MPFLog.debug("MPFProcess {} Cleaning up...".format(self.name))
self.cleanup()
self._MPFLog.debug("MPFProcess {} Exiting!".format(self.name))
return
def set_shared_memory(self, memory):
self.shared_memory = memory
def init(self):
raise NotImplementedError
def update(self, header, data):
raise NotImplementedError
def step(self):
raise NotImplementedError
def publish(self):
raise NotImplementedError
def cleanup(self):
raise NotImplementedError