API

This page covers the ProcessRunner interface.

ProcessRunner Class

class processrunner.ProcessRunner(command, cwd=None, autostart=True, stdin=None, log_name=None)

Easily execute external processes

Public Data Attributes:

Inherited from PRTemplate

id

log_name

name

Public Methods:

__init__(command[, cwd, autostart, stdin, …])

Easily execute external processes

__enter__()

Support ‘with’ syntax

__exit__(*exc_details)

Support ‘with’ syntax

__getattr__(item)

__or__(other)

Support ProcessRunner chaining

getCommand()

Retrieve the command list originally passed in

validateCommandFormat(command)

Run validation against the command list argument

enableStdin()

Use to open the stdin pipe before starting the command

closeStdin()

Close the stdin pipe

start()

Start the process

started()

Whether the command has started

isQueueEmpty(procPipeName, clientId)

Check whether the pipe manager queues report empty

areAllQueuesEmpty()

Check that all queues are empty

isAlive()

Check whether the Popen process reports alive

poll()

Invoke the subprocess.Popen.poll() method

join()

Deprecated

wait([timeout])

Block until the Popen process exits and reader queues are drained

terminate([timeoutMs])

Terminate both the target process (the command) and reader queues.

terminateCommand()

Send SIGTERM to the main process (the command)

killCommand()

Send SIGKILL to the main process (the command)

shutdown()

Shutdown the process managers.

registerForClientQueue(procPipeName)

Register to get a client queue on a pipe manager

unRegisterClientQueue(procPipeName, clientId)

Unregister a client queue from a pipe manager

getLineFromPipe(procPipeName, clientId[, …])

Retrieve a line from a pipe manager

destructiveAudit()

Force one line of output each from attached pipes

mapLines(func, procPipeName)

Run a function against each line presented by a pipe manager

map(func, procPipeName)

Run a function against each line presented by a pipe manager

startMapLines([force])

Start mapLines() child processes

getExpandingList([procPipeName])

Get a shared list and one or more completion Events for lines from one or more output pipes (not stdin).

getClientCount([procPipeName])

Return the number of clients for a given pipe, or all pipes

collectLines([procPipeName, timeout, suppress])

Retrieve output lines as a list for one or all pipes from the command.

readlines([procPipeName, timeout, suppress])

Retrieve output lines as a list for one or all pipes from the command.

write(file_path[, procPipeName, append])

Specify a file to direct output from the process.

Inherited from PRTemplate

add_logging_handler(handler)

Add a logging handler to the logger

Private Data Attributes:

Inherited from PRTemplate

_log

Private Methods:

_linked_watcher(run, downstream_pr, out_queues)

Misnomer to better align with the correct section of the codebase.

_join()

Join any client processes, waiting for them to exit

_terminate_queues()

Clean up straggling processes that might still be running.

_link_client_queues(procPipeName, queue)

Connect an existing registerForClientQueue() queue to another queue

Inherited from PRTemplate

_initialize_logging(class_name)

Basic logging with class name

_initialize_logging_with_id(class_name)

Adds self.id to logger name after class name

_initialize_logging_with_log_name(class_name)

Use an extended name when recording log lines


add_logging_handler(handler)

Add a logging handler to the logger

Parameters

handler (logging.Handler) – A logging handler

areAllQueuesEmpty()

Check that all queues are empty

Somewhat dangerous to use: it will block if any client has stopped pulling from its queue. Prefer isQueueEmpty() for the dedicated client queue, although that is sometimes not possible (especially externally).

Returns

True if all queues are empty

Return type

bool

closeStdin()

Close the stdin pipe

collectLines(procPipeName=None, timeout=None, suppress=None)

Retrieve output lines as a list for one or all pipes from the command.

Blocks until the sources are finished and queues drained.

Please note

  • Will start the command if it is not already running

  • Removes trailing newlines

If the command is already running, you will get a PotentialDataLoss exception. Catch the exception if the potential data loss is acceptable in your situation.

Parameters
  • procPipeName (string) – One of “stdout” or “stderr”

  • timeout (float) – Max time to stay in collectLines()

  • suppress (list or None) – List of exceptions to not raise

Raises
  • Timeout – If timeout is exceeded

  • PotentialDataLoss – Command is started and other readers have already been registered

Returns

List of strings that are the output lines from selected pipes

Return type

list
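The behaviour described above (block until the source finishes, return the lines with trailing newlines removed) can be approximated with the standard library alone. This is only a sketch of the idea using subprocess directly; it is not how ProcessRunner implements collectLines(), and it involves no pipe managers or client queues.

```python
import subprocess
import sys

# Run a child that prints two lines, capturing stdout as text.
proc = subprocess.Popen(
    [sys.executable, "-c", "print('a'); print('b')"],
    stdout=subprocess.PIPE,
    text=True,
)

# Iterating over the pipe blocks until the child closes it (EOF).
# Each line arrives with its trailing newline, which is stripped here.
lines = [line.rstrip("\n") for line in proc.stdout]
proc.wait()
```

After this runs, lines holds the child's output as a plain list of strings, one entry per line.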

destructiveAudit()

Force one line of output each from attached pipes

Used for debugging issues that might relate to data stuck in the queues. Triggers the pipes’ destructive_audit function which prints the last line of the queue or an ‘empty’ message.

enableStdin()

Use to open the stdin pipe before starting the command

Raises
  • KeyError – Stdin has already been enabled

  • ProcessAlreadyStarted – Stdin cannot be enabled at this point because the command has already started

getClientCount(procPipeName=None)

Return the number of clients for a given pipe, or all pipes

Parameters

procPipeName (string) – Name of the pipe

Returns

Number of clients of procPipeName, or of all pipes if procPipeName is None

Return type

int

getCommand()

Retrieve the command list originally passed in

Returns

The list of strings passed to Popen

Return type

list

getExpandingList(procPipeName=None)

Get a shared list and one or more completion Events for lines from one or more output pipes (not stdin). Non-blocking in that it does not wait for the list to be populated.

Parameters

procPipeName (string) – Name of a pipe to read, or None to read all

Returns

List of output lines and dict(pipename: Event,)

Return type

tuple

Returns a tuple of (list, dict): a list of output lines that expands as mapLines() adds to it, and a dict of {pipeName: Event}. When an Event is set, the given pipe has finished. The list is a multiprocessing.Manager.list, shared with the mapLines() process that populates it.

The dict will have multiple values if procPipeName is None, one for each output pipe.

Used as a component of collectLines().
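The (list, dict of Events) return shape can be illustrated with a small stand-in. Everything below is hypothetical: expanding_list() and its sources argument are invented for the example, threads replace the mapLines() child processes, and a plain list replaces the multiprocessing.Manager list the method documents. Only the consumption pattern (wait on every Event, then read the list) is the point.

```python
import multiprocessing
import threading

def expanding_list(sources):
    """Hypothetical stand-in: sources maps a pipe name to an iterable of lines."""
    lines = []  # plain list here; the documented method shares a Manager list
    events = {name: multiprocessing.Event() for name in sources}

    def feed(name):
        for line in sources[name]:
            lines.append(line)
        events[name].set()  # signal that this pipe has finished

    for name in sources:
        threading.Thread(target=feed, args=(name,), daemon=True).start()
    return lines, events  # returns immediately, before the list is populated

lines, events = expanding_list({"stdout": ["out 1", "out 2"], "stderr": ["err 1"]})

# One Event per output pipe; the list is complete once every Event is set.
all_done = all(event.wait(timeout=5) for event in events.values())
```

Lines from different pipes may interleave in any order, so a consumer should not rely on the relative ordering across pipes.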

getLineFromPipe(procPipeName, clientId, timeout=-1)

Retrieve a line from a pipe manager

Raises Empty if no lines are available.

Parameters
  • procPipeName (string) – One of “stdout” or “stderr”

  • clientId (string) – ID of the client queue on this pipe manager

  • timeout (float) – Less than 0 for get_nowait behavior, otherwise uses Queue.get(timeout=timeout); in seconds; default -1

Returns

Line from specified client queue

Return type

string

Raises

Empty – No lines are available
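The timeout rule for this method maps onto two standard queue calls. The helper below is a hypothetical illustration of that rule, assuming a plain queue.Queue in place of the client queue; get_line() is not part of ProcessRunner.

```python
import queue

def get_line(q, timeout=-1):
    """Hypothetical helper mirroring the documented timeout rule."""
    if timeout < 0:
        return q.get_nowait()      # negative timeout: do not block at all
    return q.get(timeout=timeout)  # otherwise wait up to `timeout` seconds

q = queue.Queue()
q.put("first line")

line = get_line(q)  # a line is available, so this returns immediately

try:
    get_line(q, timeout=0.05)  # queue is now empty: waits briefly, then raises
    drained = False
except queue.Empty:
    drained = True
```

Callers that poll in a loop would typically catch Empty and then check isAlive() or isQueueEmpty() before deciding whether to keep waiting.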

isAlive()

Check whether the Popen process reports alive

Returns

True if the command is still running

Return type

bool

isQueueEmpty(procPipeName, clientId)

Check whether the pipe manager queues report empty

Parameters
  • procPipeName (string) – One of “stdout” or “stderr”

  • clientId (string) – ID of the client queue on this pipe manager

Returns

True if the queue(s) is/are empty

Return type

bool

join()

Deprecated

killCommand()

Send SIGKILL to the main process (the command)

map(func, procPipeName)

Run a function against each line presented by a pipe manager

Returns a multiprocessing.Event that can be used to monitor the status of the function. When the process is dead, the queues are empty, and all lines are processed, the multiprocessing.Event will be set. This can be used as a blocking mechanism by functions invoking ProcessRunner.map(), by using multiprocessing.Event.wait().

Be careful to call startMapLines directly if invoking map after start() has been called.

Parameters
  • func (function) – A function that takes one parameter, the line from the pipe

  • procPipeName (string) – One of “stdout” or “stderr”

Returns

multiprocessing.Event

mapLines(func, procPipeName)

Run a function against each line presented by a pipe manager

Returns a multiprocessing.Event that can be used to monitor the status of the function. When the process is dead, the queues are empty, and all lines are processed, the multiprocessing.Event will be set. This can be used as a blocking mechanism by functions invoking mapLines(), by using multiprocessing.Event.wait().

Be careful to call startMapLines directly if invoking mapLines after start() has been called.

Parameters
  • func (function) – A function that takes one parameter, the line from the pipe

  • procPipeName (string) – One of “stdout” or “stderr”

Returns

multiprocessing.Event
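Using the returned Event as a blocking mechanism looks roughly like the following. The map_lines() function here is a hypothetical stand-in that feeds lines from a list on a background thread; the real mapLines() and map() read from a pipe manager in a child process.

```python
import multiprocessing
import threading

def map_lines(func, lines):
    """Hypothetical stand-in: apply func to each line, then set the Event."""
    done = multiprocessing.Event()

    def worker():
        for line in lines:
            func(line)
        done.set()  # every line has been processed

    threading.Thread(target=worker, daemon=True).start()
    return done

seen = []
done = map_lines(seen.append, ["alpha", "beta"])

# Block until the Event is set; wait() returns False if the timeout expires.
finished = done.wait(timeout=5)
```

Passing a timeout to wait() keeps a stalled reader from blocking the caller indefinitely.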

poll()

Invoke the subprocess.Popen.poll() method

Returns

Type depends on the status of the command

  • None: If the command is still running

  • int: Exit code of the command once stopped
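Because this method delegates to subprocess.Popen.poll(), the two return types can be seen with the standard library directly. The child command below is arbitrary and chosen only so that the exit code is recognisable.

```python
import subprocess
import sys

# Launch a short-lived child that exits with status 3.
proc = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(3)"])

first = proc.poll()      # None while still running, or 3 if it already exited
exit_code = proc.wait()  # block until the child exits
after = proc.poll()      # the exit code, now that the child has stopped
```

Since None and 0 are both falsy, compare the result with `is None` rather than testing its truthiness.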

readlines(procPipeName=None, timeout=None, suppress=None)

Retrieve output lines as a list for one or all pipes from the command.

Blocks until the sources are finished and queues drained.

Alias of collectLines.

Similar to IOBase.readlines, without the hint parameter.

Please note

  • Will start the command if it is not already running

  • Removes trailing newlines

If the command is already running, you may get a PotentialDataLoss exception, as described for collectLines().

Parameters
  • procPipeName (string) – One of “stdout” or “stderr”

  • timeout (float) – Max time to stay in collectLines()

Raises

Timeout – If timeout is exceeded

Returns

List of strings that are the output lines from selected pipes

Return type

list

registerForClientQueue(procPipeName)

Register to get a client queue on a pipe manager

The ID for the queue is returned from the method as a string, together with the queue itself.

Parameters

procPipeName (string) – One of “stdout” or “stderr”

Returns

Client’s queue ID (string) on this pipe and the new multiprocessing.JoinableQueue

Return type

tuple

shutdown()

Shut down the process managers. Run after verifying that terminate/kill has destroyed any child processes.

Runs terminate() in case it hasn’t already been run.

start()

Start the process

Use when setting autostart=False to run the target command.

DO NOT use when autostart=True (the default), as this will raise a ProcessAlreadyStarted exception.

Raises

ProcessAlreadyStarted – The command has already been started

Returns

Supports method chaining

Return type

ProcessRunner

startMapLines(force=False)

Start mapLines() child processes

Triggered by wait(), so almost never needs to be called directly

started()

Whether the command has started

Returns

Whether the command has been started

Return type

bool

terminate(timeoutMs=3000)

Terminate both the target process (the command) and reader queues.

Use terminate() to gracefully stop the target process (the command) and readers once you’re done reading. Use shutdown() if you are just trying to clean up, as it will trigger terminate().

Parameters

timeoutMs (int) – Milliseconds terminate() should wait for main process to exit before raising a Timeout exception

Raises

Timeout – If the command hasn’t stopped within timeoutMs
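The command half of this behaviour (request termination, then wait a bounded time for exit) can be sketched with subprocess. This is an analogue only: it does not touch reader queues, it uses seconds rather than milliseconds, and the subprocess.TimeoutExpired exception stands in for the library's Timeout.

```python
import signal
import subprocess
import sys

# A child that would otherwise run for 30 seconds.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])

proc.terminate()  # SIGTERM on POSIX, TerminateProcess() on Windows
timed_out = False
try:
    code = proc.wait(timeout=3)  # 3 s, matching the 3000 ms default above
except subprocess.TimeoutExpired:
    timed_out = True
    proc.kill()  # cleanup for this sketch only, so no child is left behind
    code = proc.wait()
```

On POSIX a process ended by a signal reports a negative return code, so code is -signal.SIGTERM here.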

terminateCommand()

Send SIGTERM to the main process (the command)

unRegisterClientQueue(procPipeName, clientId)

Unregister a client queue from a pipe manager

Prevents other clients from waiting on a client queue that will never be read.

Parameters
  • procPipeName (string) – One of “stdout” or “stderr”

  • clientId (string) – ID of the client queue on this pipe manager

Returns

Client’s queue ID that was unregistered

Return type

string

validateCommandFormat(command)

Run validation against the command list argument

Used for validation by ProcessRunner.__init__

Parameters

command (list) – A list of strings

Returns

Whether the command list is valid

Return type

bool

wait(timeout=None)

Block until the Popen process exits and reader queues are drained

Does some extra checking to make sure the pipe managers have finished reading, and consumers have read off all consumer queues.

Supports a timeout argument to handle unexpected delays.

Parameters

timeout (float) – Max time in seconds before raising Timeout

Raises

Timeout – When the command has run longer than timeout

Returns

Supports method chaining

Return type

ProcessRunner

write(file_path, procPipeName=None, append=False)

Specify a file to direct output from the process. Does not block.

Parameters
  • file_path (string) – Path to a file that should receive output

  • procPipeName (string) – Outbound pipe to reference (stdout, stderr)

  • append (bool) – True to append to an existing file. Default (False) is to truncate the file

Returns

Supports method chaining

Return type

ProcessRunner
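The append flag follows the usual file-mode distinction between appending and truncating. The helper below is a hypothetical illustration of that distinction with plain file I/O; write_lines() is not part of ProcessRunner, and unlike write() it blocks while writing.

```python
import os
import tempfile

def write_lines(file_path, lines, append=False):
    """Hypothetical helper: 'a' keeps existing content, 'w' truncates first."""
    mode = "a" if append else "w"
    with open(file_path, mode) as fh:
        for line in lines:
            fh.write(line + "\n")

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "out.log")

    write_lines(path, ["one"])
    write_lines(path, ["two"], append=True)  # added after the existing line
    with open(path) as fh:
        appended = fh.read().splitlines()

    write_lines(path, ["three"])  # default append=False discards old content
    with open(path) as fh:
        truncated = fh.read().splitlines()
```

With the default append=False, pointing two runs at the same path leaves only the output of the later run.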