ipyparallel¶
-
ipyparallel.
version_info
= (5, 0, 1)¶
The IPython parallel version as a tuple of integers. There will always be 3 integers. Development releases will have ‘dev’ as a fourth element.
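A version check against this tuple can be sketched as follows. The helper name `at_least` is hypothetical and not part of ipyparallel; it relies only on the statement above that the first three elements are always integers, and it ignores any fourth 'dev' element.

```python
def at_least(version_info, minimum):
    """Return True if the first three integers of version_info are >= minimum."""
    # Slice off a possible fourth 'dev' element before comparing the tuples.
    return tuple(version_info[:3]) >= tuple(minimum)


# The value documented above, and a hypothetical development release.
release = (5, 0, 1)
dev_release = (5, 1, 0, 'dev')

print(at_least(release, (5, 0, 0)))      # True
print(at_least(dev_release, (5, 2, 0)))  # False
```

With ipyparallel installed, `ipyparallel.version_info` would be passed as the first argument.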
Classes¶
-
class
ipyparallel.
Client
(url_file=None, profile=None, profile_dir=None, ipython_dir=None, context=None, debug=False, sshserver=None, sshkey=None, password=None, paramiko=None, timeout=10, cluster_id=None, **extra_args)¶ A semi-synchronous client to an IPython parallel cluster
Parameters: - url_file (str) – The path to ipcontroller-client.json. This JSON file contains the connection information for the Hub’s registration and is likely the only argument needed; if it is given, no further configuration should be necessary. [Default: use profile]
- profile (bytes) – The name of the Cluster profile to be used to find connector information. If run from an IPython application, the default profile will be the same as the running application, otherwise it will be ‘default’.
- cluster_id (str) – String id to be added to runtime files, to prevent name collisions when using multiple clusters with a single profile simultaneously. When set, will look for files named like ‘ipcontroller-<cluster_id>-client.json’. Since this is text inserted into filenames, typical recommendations apply: simple character strings are ideal, and spaces are not recommended (but should generally work).
- context (zmq.Context) – Pass an existing zmq.Context instance, otherwise the client will create its own.
- debug (bool) – flag for lots of message printing for debug purposes
- timeout (float) – time (in seconds) to wait for connection replies from the Hub [Default: 10]
Other Parameters: - sshserver (str) – A string of the form passed to ssh, i.e. ‘server.tld’ or ‘user@server.tld:port’. If keyfile or password is specified, and this is not, it will default to the ip given in addr.
- sshkey (str; path to ssh private key file) – This specifies a key to be used in ssh login, default None. Regular default ssh keys will be used without specifying this argument.
- password (str) – Your ssh password to sshserver. Note that if this is left None, you will be prompted for it if passwordless key based login is unavailable.
- paramiko (bool) – flag for whether to use paramiko instead of shell ssh for tunneling. [default: True on win32, False else]
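The connection-file naming described for cluster_id, together with a basic connection call, might look like the sketch below. The helper names `connection_file_name` and `connect` are hypothetical; only the `Client` keyword arguments come from the signature above. The import is deferred so that the naming helper can be used without ipyparallel or a running cluster.

```python
def connection_file_name(cluster_id=None):
    """Return the client connection file name the Client is documented to look for."""
    if cluster_id:
        return "ipcontroller-{}-client.json".format(cluster_id)
    return "ipcontroller-client.json"


def connect(profile=None, cluster_id=None, timeout=10):
    """Create a Client; requires ipyparallel and a running controller."""
    import ipyparallel as ipp  # deferred import: only needed when connecting

    return ipp.Client(profile=profile, cluster_id=cluster_id, timeout=timeout)


print(connection_file_name("mycluster"))  # ipcontroller-mycluster-client.json
```

Calling `connect()` with no arguments corresponds to the documented default of looking up the connection file through the profile.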
-
ids
¶ list of int engine IDs
Requesting the ids attribute always synchronizes the registration state. To request ids without synchronization, use the semi-private _ids attribute.
-
history
¶ list of msg_ids
a list of msg_ids, keeping track of all the execution messages you have submitted in order.
-
outstanding
¶ set of msg_ids
a set of msg_ids that have been submitted, but whose results have not yet been received.
-
results
¶ dict
a dict of all our results, keyed by msg_id
-
block
¶ bool
determines default behavior when block not specified in execution methods
-
__del__
()¶ cleanup sockets, but _not_ context.
-
__getitem__
(key)¶ index access returns DirectView multiplexer objects
Must be int, slice, or list/tuple/xrange of ints
-
__iter__
()¶ Since we define getitem, Client is iterable
but unless we also define __iter__, it won’t work correctly unless engine IDs start at zero and are continuous.
-
__len__
()¶ len(client) returns # of engines.
-
abort
(jobs=None, targets=None, block=None)¶ Abort specific jobs from the execution queues of target(s).
This is a mechanism to prevent jobs that have already been submitted from executing.
Parameters: jobs (msg_id, list of msg_ids, or AsyncResult) – The jobs to be aborted
If unspecified/None: abort all outstanding jobs.
-
activate
(targets='all', suffix='')¶ Create a DirectView and register it with IPython magics
Defines the magics %px, %autopx, %pxresult, %%px
Parameters: - targets (int, list of ints, or ‘all’) – The engines on which the view’s magics will run
- suffix (str [default: ‘’]) –
The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.
e.g.
rc.activate(targets=0, suffix='0')
will give you the magics %px0, %pxresult0, etc. for running magics just on engine 0.
-
clear
(targets=None, block=None)¶ Clear the namespace in target(s).
-
close
(linger=None)¶ Close my zmq Sockets
If linger, set the zmq LINGER socket option, which allows discarding of messages.
-
db_query
(query, keys=None)¶ Query the Hub’s TaskRecord database
This will return a list of task record dicts that match query
Parameters: - query (mongodb query dict) – The search dict. See mongodb query docs for details.
- keys (list of strs [optional]) – The subset of keys to be returned. The default is to fetch everything but buffers. ‘msg_id’ will always be included.
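A query dict in the MongoDB style could be built as in this sketch. The record field name 'completed' and its datetime type are assumptions about the task record layout, and the helper names are hypothetical; only `db_query(query, keys=...)` is taken from the signature above.

```python
from datetime import datetime, timedelta


def completed_since(cutoff):
    """Build a MongoDB-style query dict for tasks completed after cutoff."""
    # Assumption: task records carry a datetime field named 'completed'.
    return {'completed': {'$gt': cutoff}}


def recent_records(client, hours=1):
    """Fetch msg_id and completion time of tasks finished in the last hours."""
    cutoff = datetime.now() - timedelta(hours=hours)
    # 'msg_id' is always included, per the keys description above.
    return client.db_query(completed_since(cutoff), keys=['msg_id', 'completed'])
```

Whether naive or timezone-aware datetimes are expected depends on the installed version, so the cutoff construction may need adjusting.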
-
direct_view
(targets='all')¶ construct a DirectView object.
If no targets are specified, create a DirectView using all engines.
rc.direct_view(‘all’) is distinguished from rc[:] in that ‘all’ will evaluate the target engines at each execution, whereas rc[:] will connect to all current engines, and that list will not change.
That is, ‘all’ will always use all engines, whereas rc[:] will not use engines added after the DirectView is constructed.
Parameters: targets (list,slice,int,etc. [default: use all engines]) – The engines to use for the View
-
executor
(targets=None)¶ Construct a PEP-3148 Executor with a LoadBalancedView
Parameters: targets (list,slice,int,etc. [default: use all engines]) – The subset of engines across which to load-balance execution Returns: executor – The Executor object Return type: Executor
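Code written against the PEP-3148 interface does not need to know which executor it receives. The sketch below uses `concurrent.futures.ThreadPoolExecutor` as a local stand-in so it runs without a cluster; passing the object returned by `client.executor()` instead is an assumption based on the description above. The names `square` and `sum_of_squares` are illustrative only.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


def sum_of_squares(executor, values):
    """Submit one task per value and add up the results."""
    # Executor.submit returns a Future; result() blocks until it is done.
    futures = [executor.submit(square, v) for v in values]
    return sum(f.result() for f in futures)


# Local stand-in for a cluster-backed executor.
with ThreadPoolExecutor(max_workers=2) as pool:
    total = sum_of_squares(pool, [1, 2, 3])

print(total)  # 14
```

Functions sent to real engines must be serializable, which a thread pool does not require, so this only illustrates the calling convention.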
-
get_result
(indices_or_msg_ids=None, block=None, owner=True)¶ Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
If the client already has the results, no request to the Hub will be made.
This is a convenient way to construct AsyncResult objects, which are wrappers that include metadata about execution, and allow for awaiting results that were not submitted by this Client.
It can also be a convenient way to retrieve the metadata associated with blocking execution, since it always retrieves
Examples
In [10]: r = client.apply()
Parameters: - indices_or_msg_ids (integer history index, str msg_id, AsyncResult, or a list of same) – The indices or msg_ids of the results to be retrieved
- block (bool) – Whether to wait for the result to be done
- owner (bool [default: True]) – Whether this AsyncResult should own the result. If so, calling ar.get() will remove data from the client’s result and metadata cache. There should only be one owner of any given msg_id.
Returns: - AsyncResult – A single AsyncResult object will always be returned.
- AsyncHubResult – A subclass of AsyncResult that retrieves results from the Hub
-
hub_history
()¶ Get the Hub’s history
Just like the Client, the Hub has a history, which is a list of msg_ids. This will contain the history of all clients, and, depending on configuration, may contain history across multiple cluster sessions.
Any msg_id returned here is a valid argument to get_result.
Returns: msg_ids – list of all msg_ids, ordered by task submission time. Return type: list of strs
-
ids
Always up-to-date ids property.
-
load_balanced_view
(targets=None)¶ construct a LoadBalancedView object.
If no arguments are specified, create a LoadBalancedView using all engines.
Parameters: targets (list,slice,int,etc. [default: use all engines]) – The subset of engines across which to load-balance execution
-
purge_everything
()¶ Clears all content from previous Tasks from both the hub and the local client
In addition to calling purge_results(“all”) it also deletes the history and other bookkeeping lists.
-
purge_hub_results
(jobs=[], targets=[])¶ Tell the Hub to forget results.
Individual results can be purged by msg_id, or the entire history of specific targets can be purged.
Use purge_hub_results(‘all’) to scrub everything from the Hub’s db.
Parameters: - jobs (str or list of str or AsyncResult objects) – the msg_ids whose results should be forgotten.
- targets (int/str/list of ints/strs) –
The targets, by int_id, whose entire history is to be purged.
default : None
-
purge_local_results
(jobs=[], targets=[])¶ Clears the client caches of results and their metadata.
Individual results can be purged by msg_id, or the entire history of specific targets can be purged.
Use purge_local_results(‘all’) to scrub everything from the Client’s results and metadata caches.
After this call all AsyncResults are invalid and should be discarded.
If you must “reget” the results, you can still do so by using client.get_result(msg_id) or client.get_result(asyncresult). This will redownload the results from the hub if they are still available (i.e. client.purge_hub_results(...) has not been called).
Parameters: - jobs (str or list of str or AsyncResult objects) – the msg_ids whose results should be purged.
- targets (int/list of ints) – The engines, by integer ID, whose entire result histories are to be purged.
Raises: RuntimeError – if any of the tasks to be purged are still outstanding.
-
purge_results
(jobs=[], targets=[])¶ Clears the cached results from both the hub and the local client
Individual results can be purged by msg_id, or the entire history of specific targets can be purged.
Use purge_results(‘all’) to scrub every cached result from both the Hub’s and the Client’s db.
Equivalent to calling both purge_hub_results() and purge_local_results() with the same arguments.
Parameters: - jobs (str or list of str or AsyncResult objects) – the msg_ids whose results should be forgotten.
- targets (int/str/list of ints/strs) –
The targets, by int_id, whose entire history is to be purged.
default : None
-
queue_status
(targets='all', verbose=False)¶ Fetch the status of engine queues.
Parameters: - targets (int/str/list of ints/strs) – the engines whose states are to be queried. default : all
- verbose (bool) – Whether to return lengths only, or lists of ids for each element
-
resubmit
(indices_or_msg_ids=None, metadata=None, block=None)¶ Resubmit one or more tasks.
in-flight tasks may not be resubmitted.
Parameters: - indices_or_msg_ids (integer history index, str msg_id, or list of either) – The indices or msg_ids of the tasks to be resubmitted
- block (bool) – Whether to wait for the result to be done
Returns: A subclass of AsyncResult that retrieves results from the Hub
Return type: AsyncHubResult
-
result_status
(msg_ids, status_only=True)¶ Check on the status of the result(s) of the apply request with msg_ids.
If status_only is False, then the actual results will be retrieved, else only the status of the results will be checked.
Parameters: - msg_ids (list of msg_ids) – if int: Passed as index to self.history for convenience.
- status_only (bool (default: True)) – if False: Retrieve the actual results of completed tasks.
Returns: results – There will always be the keys ‘pending’ and ‘completed’, which will be lists of msg_ids that are incomplete or complete. If status_only is False, then completed results will be keyed by their msg_id.
Return type: dict
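The returned mapping can be reduced to a short summary, as in this sketch. It relies only on the ‘pending’ and ‘completed’ keys described above; the function name `summarize_status` and the example msg_ids are made up for illustration.

```python
def summarize_status(status):
    """Count pending and completed msg_ids in a result_status reply."""
    pending = list(status['pending'])
    completed = list(status['completed'])
    return {
        'n_pending': len(pending),
        'n_completed': len(completed),
        'done': not pending,  # True once nothing is left pending
    }


# Hand-written example reply; real msg_ids are opaque strings.
example = {'pending': ['id-3'], 'completed': ['id-1', 'id-2']}
print(summarize_status(example))
```

With a connected client, `summarize_status(client.result_status(msg_ids))` would apply the same summary to a live reply.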
-
send_apply_request
(socket, f, args=None, kwargs=None, metadata=None, track=False, ident=None)¶ construct and send an apply message via a socket.
This is the principal method with which all engine execution is performed by views.
-
send_execute_request
(socket, code, silent=True, metadata=None, ident=None)¶ construct and send an execute request via a socket.
-
shutdown
(targets='all', restart=False, hub=False, block=None)¶ Terminates one or more engine processes, optionally including the hub.
Parameters: - targets (list of ints or ‘all’ [default: all]) – Which engines to shutdown.
- hub (bool [default: False]) – Whether to include the Hub. hub=True implies targets=’all’.
- block (bool [default: self.block]) – Whether to wait for clean shutdown replies or not.
- restart (bool [default: False]) – NOT IMPLEMENTED whether to restart engines after shutting them down.
-
spin
()¶ DEPRECATED, DOES NOTHING
-
spin_thread
(interval=1)¶ DEPRECATED, DOES NOTHING
-
stop_spin_thread
()¶ DEPRECATED, DOES NOTHING
-
wait
(jobs=None, timeout=-1)¶ waits on one or more jobs, for up to timeout seconds.
Parameters: - jobs (int, str, or list of ints and/or strs, or one or more AsyncResult objects) – ints are indices to self.history strs are msg_ids default: wait on all outstanding messages
- timeout (float) – a time in seconds, after which to give up. default is -1, which means no timeout
Returns: - True (when all msg_ids are done)
- False (timeout reached, some msg_ids still outstanding)
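Because wait returns False when the timeout is reached, it can be called in short steps so the caller can report progress or give up. The sketch below assumes only the `wait(jobs, timeout)` signature and its True/False return value; `wait_in_steps` and `FakeClient` are hypothetical names, and the fake exists only so the example runs without a cluster.

```python
def wait_in_steps(client, jobs=None, step=1.0, max_steps=10):
    """Call client.wait in steps; return the step count used, or None on giving up."""
    for attempt in range(1, max_steps + 1):
        if client.wait(jobs, timeout=step):
            return attempt
    return None


class FakeClient:
    """Hypothetical stand-in that reports completion on its n-th wait call."""

    def __init__(self, ready_after):
        self.calls = 0
        self.ready_after = ready_after

    def wait(self, jobs=None, timeout=-1):
        self.calls += 1
        return self.calls >= self.ready_after


print(wait_in_steps(FakeClient(ready_after=3), step=0.01))  # 3
```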
-
wait_interactive
(jobs=None, interval=1.0, timeout=-1.0)¶ Wait interactively for jobs
If no job is specified, will wait for all outstanding jobs to complete.
-
class
ipyparallel.
DirectView
(client=None, socket=None, targets=None)¶ Direct Multiplexer View of one or more engines.
These are created via indexed access to a client:
>>> dv_1 = client[1]
>>> dv_all = client[:]
>>> dv_even = client[::2]
>>> dv_some = client[1:3]
This object provides dictionary access to engine namespaces:
# push a=5:
>>> dv['a'] = 5
# pull 'foo':
>>> dv['foo']
-
activate
(suffix='')¶ Activate IPython magics associated with this View
Defines the magics %px, %autopx, %pxresult, %%px, %pxconfig
Parameters: suffix (str [default: ‘’]) – The suffix, if any, for the magics. This allows you to have multiple views associated with parallel magics at the same time.
e.g.
rc[::2].activate(suffix='_even')
will give you the magics %px_even, %pxresult_even, etc. for running magics on the even engines.
-
clear
(targets=None, block=None)¶ Clear the remote namespaces on my engines.
-
execute
(code, silent=True, targets=None, block=None)¶ Executes code on targets in blocking or nonblocking manner.
execute is always bound (affects engine namespace).
-
gather
(key, dist='b', targets=None, block=None)¶ Gather a partitioned sequence on a set of engines as a single local seq.
-
get
(key_s)¶ get object(s) by key_s from remote namespace
see pull for details.
-
importer
¶ sync_imports(local=True) as a property.
See sync_imports for details.
-
map
(f, *sequences, **kwargs)¶ view.map(f, *sequences, block=self.block) => list|AsyncMapResult
Parallel version of builtin map, using this View’s targets.
There will be one task per target, so work will be chunked if the sequences are longer than targets.
Results can be iterated as they are ready, but will become available in chunks.
Returns: - If block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.
- else – A list, the result of map(f,*sequences)
-
pull
(names, targets=None, block=None)¶ get object(s) by name from remote namespace
Returns one object if names is a single key. Can also take a list of keys, in which case it returns a list of objects.
-
push
(ns, targets=None, block=None, track=None)¶ update remote namespace with dict ns
Parameters: - ns (dict) – dict of keys with which to update engine namespace(s)
- block (bool [default : self.block]) – whether to wait to be notified of engine receipt
-
run
(filename, targets=None, block=None)¶ Execute contents of filename on my engine(s).
This simply reads the contents of the file and calls execute.
-
scatter
(key, seq, dist='b', flatten=False, targets=None, block=None, track=None)¶ Partition a Python sequence and send the partitions to a set of engines.
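To make the idea of partitioning concrete, the sketch below splits a sequence into contiguous chunks, one per engine, and joins them again. It assumes that dist='b' denotes a block (contiguous) distribution; the function names are hypothetical and the exact chunk boundaries chosen by the library may differ.

```python
def block_partition(seq, n_engines):
    """Split seq into n_engines contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        # The first `extra` chunks each take one additional element.
        size = base + (1 if i < extra else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks


def gather_chunks(chunks):
    """Concatenate per-engine chunks back into a single list."""
    out = []
    for chunk in chunks:
        out.extend(chunk)
    return out


parts = block_partition(list(range(10)), 4)
print(parts)  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
```

Gathering the chunks in engine order restores the original sequence, which is the round trip that scatter followed by gather is described as providing.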
-
sync_imports
(local=True, quiet=False)¶ Context Manager for performing simultaneous local and remote imports.
‘import x as y’ will not work. The ‘as y’ part will simply be ignored.
If local=True, then the package will also be imported locally.
If quiet=True, no output will be produced when attempting remote imports.
Note that remote-only (local=False) imports have not been implemented.
>>> with view.sync_imports():
...     from numpy import recarray
importing recarray from numpy on engine(s)
-
update
(ns)¶ update remote namespace with dict ns
See push for details.
-
use_cloudpickle
()¶ Expand serialization support with cloudpickle.
This calls ipyparallel.serialize.use_cloudpickle() here and on each engine.
-
use_dill
()¶ Expand serialization support with dill
adds support for closures, etc.
This calls ipyparallel.serialize.use_dill() here and on each engine.
-
-
class
ipyparallel.
LoadBalancedView
(client=None, socket=None, **flags)¶ A load-balancing View that only executes via the Task scheduler.
Load-balanced views can be created with the client’s view method:
>>> v = client.load_balanced_view()
or targets can be specified, to restrict the potential destinations:
>>> v = client.load_balanced_view([1,3])
which would restrict load-balancing to engines 1 and 3.
-
map
(f, *sequences, **kwargs)¶ view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
Parallel version of builtin map, load-balanced by this View.
block and chunksize can be specified by keyword only.
Each chunksize elements will be a separate task, and will be load-balanced. This lets individual elements be available for iteration as soon as they arrive.
Parameters: - f (callable) – function to be mapped
- *sequences –
the sequences to be distributed and passed to f
- block (bool [default self.block]) – whether to wait for the result or not
- track (bool) – whether to create a MessageTracker to allow the user to safely edit arrays and buffers after non-copying sends.
- chunksize (int [default 1]) – how many elements should be in each task.
- ordered (bool [default True]) –
Whether the results should be gathered as they arrive, or enforce the order of submission.
Only applies when iterating through AsyncMapResult as results arrive. Has no effect when block=True.
Returns: - if block=False – An AsyncMapResult instance. An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete.
- else – A list, the result of map(f,*sequences)
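The effect of chunksize on the number of tasks can be illustrated without a cluster. This sketch groups the input into tasks of chunksize elements each, following the description above; `chunk_tasks` is a hypothetical helper and not how the library itself builds tasks.

```python
def chunk_tasks(seq, chunksize=1):
    """Group seq into consecutive tasks of at most chunksize elements."""
    if chunksize < 1:
        raise ValueError("chunksize must be at least 1")
    return [seq[i:i + chunksize] for i in range(0, len(seq), chunksize)]


items = list(range(5))
print(len(chunk_tasks(items)))  # 5 tasks with the default chunksize=1
print(chunk_tasks(items, 2))    # [[0, 1], [2, 3], [4]]
```

A larger chunksize means fewer, larger tasks, which reduces scheduling overhead but delays the point at which individual results become available for iteration.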
-
set_flags
(**kwargs)¶ set my attribute flags by keyword.
A View is a wrapper for the Client’s apply method, but with attributes that specify keyword arguments. Those attributes can be set by keyword argument with this method.
Parameters: - block (bool) – whether to wait for results
- track (bool) – whether to create a MessageTracker to allow the user to safely edit arrays and buffers after non-copying sends.
- after (Dependency or collection of msg_ids) – Only for load-balanced execution (targets=None) Specify a list of msg_ids as a time-based dependency. This job will only be run after the dependencies have been met.
- follow (Dependency or collection of msg_ids) – Only for load-balanced execution (targets=None) Specify a list of msg_ids as a location-based dependency. This job will only be run on an engine where this dependency is met.
- timeout (float/int or None) – Only for load-balanced execution (targets=None) Specify an amount of time (in seconds) for the scheduler to wait for dependencies to be met before failing with a DependencyTimeout.
- retries (int) – Number of times a task will be retried on failure.
-
-
class
ipyparallel.
ViewExecutor
(view)¶ A PEP-3148 Executor API for Views
Access as view.executor
-
map
(func, *iterables, **kwargs)¶ Return generator for View.map_async
-
shutdown
(wait=True)¶ ViewExecutor does not shutdown engines
results are awaited if wait=True, but engines are not shutdown.
-
submit
(fn, *args, **kwargs)¶ Same as View.apply_async
-
Decorators¶
IPython parallel provides some decorators to assist in using your functions as tasks.
-
ipyparallel.
interactive
(f)¶ decorator for making functions appear as interactively defined. This results in the function being linked to the user_ns as globals() instead of the module globals().
-
ipyparallel.
require
(*objects, **mapping)¶ Simple decorator for requiring local objects and modules to be available when the decorated function is called on the engine.
Modules specified by name or passed directly will be imported prior to calling the decorated function.
Objects other than modules will be pushed as a part of the task. Functions can be passed positionally, and will be pushed to the engine with their __name__. Other objects can be passed by keyword arg.
Examples
In [1]: @require('numpy')
   ...: def norm(a):
   ...:     return numpy.linalg.norm(a,2)
In [2]: foo = lambda x: x*x
In [3]: @require(foo)
   ...: def bar(a):
   ...:     return foo(1-a)
-
ipyparallel.
depend
(_wrapped_f, *args, **kwargs)¶ Dependency decorator, for use with tasks.
@depend lets you define a function for engine dependencies just like you use apply for tasks.
Examples
@depend(df, a,b, c=5)
def f(m,n,p):
    ...

view.apply(f, 1,2,3)
will call df(a,b,c=5) on the engine, and if it returns False or raises an UnmetDependency error, then the task will not be run and another engine will be tried.
-
ipyparallel.
remote
(view, block=None, **flags)¶ Turn a function into a remote function.
This method can be used for map:
In [1]: @remote(view,block=True)
   ...: def func(a):
   ...:     pass
-
ipyparallel.
parallel
(view, dist='b', block=None, ordered=True, **flags)¶ Turn a function into a parallel remote function.
This method can be used for map:
In [1]: @parallel(view, block=True)
   ...: def func(a):
   ...:     pass
Exceptions¶
-
exception
ipyparallel.
RemoteError
(ename, evalue, traceback, engine_info=None)¶ Error raised elsewhere
-
exception
ipyparallel.
CompositeError
(message, elist)¶ Error for representing possibly multiple errors on engines
-
exception
ipyparallel.
NoEnginesRegistered
¶
-
exception
ipyparallel.
ImpossibleDependency
¶
-
exception
ipyparallel.
InvalidDependency
¶