API¶
This page covers the ProcessRunner interface.
ProcessRunner Class¶
-
class
processrunner.ProcessRunner(command, cwd=None, autostart=True, stdin=None, log_name=None)¶ Easily execute external processes
Public Data Attributes:
Inherited from
PRTemplateidlog_namenamePublic Methods:
__init__(command[, cwd, autostart, stdin, …])Easily execute external processes
__enter__()Support ‘with’ syntax
__exit__(*exc_details)Support ‘with’ syntax
__getattr__(item)__or__(other)Support ProcessRunner chaining
Retrieve the command list originally passed in
validateCommandFormat(command)Run validation against the command list argument
Use to open the stdin pipe before starting the command
Close the stdin pipe
start()Start the process
started()Whether the command has started
isQueueEmpty(procPipeName, clientId)Check whether the pipe manager queues report empty
Check that all queues are empty
isAlive()Check whether the Popen process reports alive
poll()Invoke the subprocess.Popen.poll() method
join()Deprecated
wait([timeout])Block until the Popen process exits and reader queues are drained
terminate([timeoutMs])Terminate both the target process (the command) and reader queues.
Send SIGTERM to the main process (the command)
Send SIGKILL to the main process (the command)
shutdown()Shutdown the process managers.
registerForClientQueue(procPipeName)Register to get a client queue on a pipe manager
unRegisterClientQueue(procPipeName, clientId)Unregister a client queue from a pipe manager
getLineFromPipe(procPipeName, clientId[, …])Retrieve a line from a pipe manager
Force one line of output each from attached pipes
mapLines(func, procPipeName)Run a function against each line presented by a pipe manager
map(func, procPipeName)Run a function against each line presented by a pipe manager
startMapLines([force])Start
mapLines()child processesgetExpandingList([procPipeName])Get a shared
listand one or more completionEventsfor lines from one or more output pipes (not stdin).getClientCount([procPipeName])Return the number of clients for a given pipe, or all pipes
collectLines([procPipeName, timeout, suppress])Retrieve output lines as a
listfor one or all pipes from the command.readlines([procPipeName, timeout, suppress])Retrieve output lines as a
listfor one or all pipes from the command.write(file_path[, procPipeName, append])Specify a file to direct output from the process.
Inherited from
PRTemplateadd_logging_handler(handler)Add a logging handler to the logger
Private Data Attributes:
Inherited from
PRTemplate_logPrivate Methods:
_linked_watcher(run, downstream_pr, out_queues)Misnomer to better align with the correct section of the codebase.
_join()Join any client processes, waiting for them to exit
_terminate_queues()Clean up straggling processes that might still be running.
_link_client_queues(procPipeName, queue)Connect an existing
registerForClientQueue()queue to another queueInherited from
PRTemplate_initialize_logging(class_name)Basic logging with class name
_initialize_logging_with_id(class_name)Adds self.id to logger name after class name
_initialize_logging_with_log_name(class_name)Use an extended name when recording log lines
-
add_logging_handler(handler)¶ Add a logging handler to the logger
- Parameters
handler (logging.Handler) – A logging handler
-
areAllQueuesEmpty()¶ Check that all queues are empty
A bit dangerous to use, will block if any client has stopped pulling from their queue. Better to use isQueueEmpty() for the dedicated client queue. Sometimes (especially externally) that’s not possible.
- Returns
True if all queues are empty
- Return type
bool
-
closeStdin()¶ Close the stdin pipe
-
collectLines(procPipeName=None, timeout=None, suppress=None)¶ Retrieve output lines as a
listfor one or all pipes from the command.Blocks until the sources are finished and queues drained.
Please note
Will start the command if it is not already running
Removes trailing newlines
If the command is already running, you will get a PotentialDataLoss exception. Catch the exception if the potenital data loss is acceptable in your situation.
- Parameters
procPipeName (string) – One of “stdout” or “stderr”
timeout (float) – Max time to stay in
collectLines()suppress (list or None) – List of exceptions to not raise
- Raises
Timeout – If
timeoutis exceededPotentialDataLoss – Command is started and other readers have already been registered
- Returns
List of strings that are the output lines from selected pipes
- Return type
list
-
destructiveAudit()¶ Force one line of output each from attached pipes
Used for debugging issues that might relate to data stuck in the queues. Triggers the pipes’ destructive_audit function which prints the last line of the queue or an ‘empty’ message.
-
enableStdin()¶ Use to open the stdin pipe before starting the command
- Raises
KeyError – Stdin has already been enabled
ProcessAlreadyStarted – Stdin cannot be enabled at this point because the command has already started
-
getClientCount(procPipeName=None)¶ Return the number of clients for a given pipe, or all pipes
- Parameters
procPipeName (string) – Name of the pipe
- Returns
- Number of clients of
procPipeNameor all pipes if procPipeNameis None
- Number of clients of
- Return type
int
-
getCommand()¶ Retrieve the command list originally passed in
- Returns
The list of strings passed to Popen
- Return type
list
-
getExpandingList(procPipeName=None)¶ Get a shared
listand one or more completionEventsfor lines from one or more output pipes (not stdin). Non-blocking in that it does not wait for the list to be populated.- Parameters
procPipeName (string) – Name of a pipe to read, or
Noneto read all- Returns
List of output lines and
dict(pipename: Event,)- Return type
tuple
Returns a tuple of
(list, dict). List of the output lines that expands asmapLines()adds to it, and adictof[pipeName: Event,]. When anEventis “set”, that means the given pipe has finished. The list is amultiprocessing.Manager.list, shared with themapLines()process populating the content.The
dictwill have multiple values ifprocPipeNameisNone, one for each output pipe.Used as a component of
collectLines().
-
getLineFromPipe(procPipeName, clientId, timeout=- 1)¶ Retrieve a line from a pipe manager
Throws
Emptyif no lines are available.- Parameters
procPipeName (string) – One of “stdout” or “stderr”
clientId (string) – ID of the client queue on this pipe manager
timeout (float) – Less than 0 for get_nowait behavior, otherwise uses
Queue.get(timeout=timeout); in seconds; default -1
- Returns
Line from specified client queue
- Return type
string
- Raises
Empty – No lines are available
-
isAlive()¶ Check whether the Popen process reports alive
- Returns
True if the command is still running
- Return type
bool
-
isQueueEmpty(procPipeName, clientId)¶ Check whether the pipe manager queues report empty
- Parameters
procPipeName (string) – One of “stdout” or “stderr”
clientId (string) – ID of the client queue on this pipe manager
- Returns
True if the queue(s) is/are empty
- Return type
bool
-
join()¶ Deprecated
-
killCommand()¶ Send SIGKILL to the main process (the command)
-
map(func, procPipeName)¶ Run a function against each line presented by a pipe manager
Returns a
multiprocessing.Eventthat can be used to monitor the status of the function. When the process is dead, the queues are empty, and all lines are processed, themultiprocessing.Eventwill be set toTrue. This can be used as a blocking mechanism by functions invokingProcessRunner.map(), by usingwait().Be careful to call startMapLines directly if invoking map after start() has been called.
- Parameters
func (function) – A function that takes one parameter, the line from the pipe
procPipeName (string) – One of “stdout” or “stderr”
- Returns
multiprocessing.Event
-
mapLines(func, procPipeName)¶ Run a function against each line presented by a pipe manager
Returns a
multiprocessing.Eventthat can be used to monitor the status of the function. When the process is dead, the queues are empty, and all lines are processed, themultiprocessing.Eventwill be set toTrue. This can be used as a blocking mechanism by functions invokingmapLines(), by usingmultiprocessing.Event.wait().Be careful to call startMapLines directly if invoking mapLines after start() has been called.
- Parameters
func (function) – A function that takes one parameter, the line from the pipe
procPipeName (string) – One of “stdout” or “stderr”
- Returns
multiprocessing.Event
-
poll()¶ Invoke the subprocess.Popen.poll() method
- Returns
Type depends on the status of the command
None: If the command is still running
int: Exit code of the command once stopped
-
readlines(procPipeName=None, timeout=None, suppress=None)¶ Retrieve output lines as a
listfor one or all pipes from the command.Blocks until the sources are finished and queues drained.
Alias of collectLines.
Similar to IOBase.readlines, without the hint parameter.
Please note
Will start the command if it is not already running
Removes trailing newlines
If the command is already running, you may get a
- Parameters
procPipeName (string) – One of “stdout” or “stderr”
timeout (float) – Max time to stay in
collectLines()
- Raises
Timeout – If
timeoutis exceeded- Returns
List of strings that are the output lines from selected pipes
- Return type
list
-
registerForClientQueue(procPipeName)¶ Register to get a client queue on a pipe manager
The ID for the queue is returned from the method as a string.
- Parameters
procPipeName (string) – One of “stdout” or “stderr”
- Returns
Client’s queue ID (
string) on this pipe and the newmultiprocessing.JoinableQueue- Return type
tuple
-
shutdown()¶ Shutdown the process managers. Run after verifying terminate/kill has destroyed any child processes
Runs
terminate()in case it hasn’t already been run
-
start()¶ Start the process
Use when setting
autostart=Falseto run the target command.DO NOT use when
autostart=True(the default), as this will raise aProcessAlreadyStartedexception.- Raises
ProcessAlreadyStarted – The command has already been started
- Returns
Supports method chaining
- Return type
-
startMapLines(force=False)¶ Start
mapLines()child processesTriggered by
wait(), so almost never needs to be called directly
-
started()¶ Whether the command has started
- Returns
Whether the command has been started
- Return type
bool
-
terminate(timeoutMs=3000)¶ Terminate both the target process (the command) and reader queues.
Use
terminate()to gracefully stop the target process (the command) and readers once you’re done reading. Useshutdown()if you are just trying to clean up, as it will triggerterminate().- Parameters
timeoutMs (int) – Milliseconds
terminate()should wait for main process to exit before raising aTimeoutexception- Raises
Timeout – If the command hasn’t stopped within
timeoutMs
-
terminateCommand()¶ Send SIGTERM to the main process (the command)
-
unRegisterClientQueue(procPipeName, clientId)¶ Unregister a client queue from a pipe manager
Keeps other clients from waiting on other clients that will never be read.
- Parameters
procPipeName (string) – One of “stdout” or “stderr”
clientId (string) – ID of the client queue on this pipe manager
- Returns
Client’s queue ID that was unregistered
- Return type
string
-
validateCommandFormat(command)¶ Run validation against the command list argument
Used for validation by ProcessRunner.__init__
- Parameters
command (list) – A list of strings
- Returns
Whether the command list is valid
- Return type
bool
-
wait(timeout=None)¶ Block until the Popen process exits and reader queues are drained
Does some extra checking to make sure the pipe managers have finished reading, and consumers have read off all consumer queues.
Supports a timeout argument to handle unexpected delays.
- Parameters
timeout (float) – Max time in seconds before raising
Timeout- Raises
Timeout – When the command has run longer than
timeout- Returns
Supports method chaining
- Return type
-
write(file_path, procPipeName=None, append=False)¶ Specify a file to direct output from the process. Does not block.
- Parameters
file_path (string) – Path to a file that should receive output
procPipeName (string) – Outbound pipe to reference (stdout, stderr)
append (bool) –
Trueto append to an existing file. Default (False) is to truncate the file
- Returns
Supports method chaining
- Return type
-