spreadflow_core.jobqueue module¶
Cooperative job queue.
Provides a job queue (and iterator) specifically designed for the cooperative multitasking facilities of Twisted.
-
class spreadflow_core.jobqueue.Entry(deferred, job)¶
Bases: tuple
-
__getnewargs__()¶ Return self as a plain tuple. Used by copy and pickle.
-
__getstate__()¶ Exclude the OrderedDict from pickling
-
__repr__()¶ Return a nicely formatted representation string
-
deferred¶ Alias for field number 0
-
job¶ Alias for field number 1
-
-
class spreadflow_core.jobqueue.Job(channel, func, args, kwds)¶
Bases: tuple
-
__getnewargs__()¶ Return self as a plain tuple. Used by copy and pickle.
-
__getstate__()¶ Exclude the OrderedDict from pickling
-
__repr__()¶ Return a nicely formatted representation string
-
args¶ Alias for field number 2
-
channel¶ Alias for field number 0
-
func¶ Alias for field number 1
-
kwds¶ Alias for field number 3
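The members listed for Entry and Job (__getnewargs__(), "Alias for field number N") are the ones generated by collections.namedtuple. The following is a minimal sketch, assuming both classes are plain named tuples with the documented field order. The definitions are local stand-ins rather than imports from spreadflow_core, and None is only a placeholder for a real deferred:

```python
from collections import namedtuple

# Local stand-ins with the documented field order (not the library's own classes).
Job = namedtuple('Job', ['channel', 'func', 'args', 'kwds'])
Entry = namedtuple('Entry', ['deferred', 'job'])

job = Job(channel='channel one', func=len, args=('hello',), kwds={})
entry = Entry(deferred=None, job=job)  # None is a placeholder for a real deferred

# Fields are reachable by name or by position.
assert job.channel == job[0]
assert entry.job is entry[1]

# Unpacking works as for any tuple.
channel, func, args, kwds = entry.job
result = func(*args, **kwds)  # calls len('hello')
```

Access by name keeps calling code readable, while positional access and unpacking stay available because the objects are still tuples.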
-
-
class spreadflow_core.jobqueue.JobQueue¶
Bases: _abcoll.Iterator
Cooperative job queue.
A job queue (and iterator) specifically designed for the cooperative multitasking facilities of Twisted.
A job is any callable together with positional and keyword arguments. Results returned by a job are passed back to the caller by a deferred. If the job itself returns a deferred, any queued jobs on the same channel are blocked until a result becomes available.
Example
The following example illustrates how spreadflow_core.jobqueue.JobQueue can be used together with twisted.internet.task.cooperate(). Note that this function returns an instance of twisted.internet.task.CooperativeTask, which can be used to pause, resume and stop the queue:

    from __future__ import print_function

    from twisted.internet import defer, task, reactor
    from spreadflow_core.jobqueue import JobQueue

    def say(message):
        ''' Prints a message and returns immediately. '''
        print(message)

    def pause(seconds):
        ''' Returns a deferred which fires after the specified amount of time. '''
        d = defer.Deferred()
        reactor.callLater(seconds, d.callback, None)
        return d

    def stop(result):
        ''' Stops the reactor once the last job has completed. '''
        reactor.stop()

    queue = JobQueue()
    queue_task = task.cooperate(queue)

    # Jobs on the same channel run in FIFO order.
    queue.put('channel one', say, 'hello')
    queue.put('channel one', pause, 2)
    queue.put('channel one', say, 'world!').addCallback(stop)

    # A second channel is not blocked by the pause on channel one.
    queue.put('channel two', pause, 1)
    queue.put('channel two', say, 'what?')

    reactor.run()
This example will generate the following output, pausing for one second between every line:
    hello
    what?
    world!
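The order of the output follows from the rule that a pending deferred blocks only its own channel. The sketch below reproduces the timing with the standard library alone. It is a simplified model, not the JobQueue implementation: timeline() and the 'say'/'pause' job kinds are hypothetical names, and a 'say' job is assumed to take no time.

```python
def timeline(jobs):
    """Return sorted (time, message) pairs for (channel, kind, value) jobs.

    kind is 'pause' (value = seconds) or 'say' (value = message).
    """
    ready_at = {}  # per-channel time at which the next job may start
    events = []
    for channel, kind, value in jobs:
        start = ready_at.get(channel, 0)
        if kind == 'pause':
            # A pause delays only later jobs on the same channel.
            ready_at[channel] = start + value
        else:
            events.append((start, value))
    return sorted(events)


jobs = [
    ('channel one', 'say', 'hello'),
    ('channel one', 'pause', 2),
    ('channel one', 'say', 'world!'),
    ('channel two', 'pause', 1),
    ('channel two', 'say', 'what?'),
]
schedule = timeline(jobs)
```

Here schedule is [(0, 'hello'), (1, 'what?'), (2, 'world!')]: the one-second pause on channel two ends before the two-second pause on channel one, so 'what?' is printed between the other two messages.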
-
__next__()¶ Implements iterator.__next__() (Python >= 3)
-
clear()¶ Clears the backlog.
-
get()¶ Removes and returns the first item in the queue where the channel is ready.
Returns: tuple – A 5-tuple containing the channel, func, args, kwds and deferred.
Raises: spreadflow_core.jobqueue.QueueNoneReady – Raised if there is either no item ready or no channel.
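The selection rule described for get() (take the first queued item whose channel is ready, otherwise raise) can be sketched with a toy model. This is an illustration under stated assumptions, not the library's code: ToyQueue, NoneReady and the blocked set are hypothetical, and the deferred that the real method returns with each item is omitted.

```python
class NoneReady(Exception):
    """Hypothetical stand-in for QueueNoneReady."""


class ToyQueue(object):
    """Toy model: a FIFO backlog plus a set of blocked channels."""

    def __init__(self):
        self.backlog = []      # (channel, func, args, kwds) in arrival order
        self.blocked = set()   # channels still waiting for a pending result

    def put(self, channel, func, *args, **kwds):
        self.backlog.append((channel, func, args, kwds))

    def get(self):
        # Scan in arrival order and skip jobs whose channel is blocked.
        for index, item in enumerate(self.backlog):
            if item[0] not in self.blocked:
                return self.backlog.pop(index)
        raise NoneReady()


queue = ToyQueue()
queue.put('one', str.upper, 'first')
queue.put('two', str.upper, 'second')
queue.blocked.add('one')

ready = queue.get()  # the job on channel 'two'; channel 'one' is blocked
```

Scanning in arrival order preserves FIFO order within a channel while letting a ready channel overtake a blocked one.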
-
next()¶ Implements iterator.next() (Python < 3)
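Providing both __next__() and next() is the usual way to make one iterator class work on Python 2 and Python 3. A minimal sketch of that pattern follows; Countdown is a hypothetical class used only for illustration and is unrelated to JobQueue:

```python
class Countdown(object):
    """Iterator usable on both Python 2 and Python 3."""

    def __init__(self, start):
        self.remaining = start

    def __iter__(self):
        return self

    def __next__(self):
        # Python 3 iterator protocol.
        if self.remaining <= 0:
            raise StopIteration()
        value = self.remaining
        self.remaining -= 1
        return value

    # Python 2 looks up next() instead of __next__().
    next = __next__


values = list(Countdown(3))
```

With the alias in place, for loops and list() behave the same under either interpreter.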
-
put(channel, func, *args, **kwds)¶ Queue up a job for later execution on a specified channel.
Parameters:
- channel – Any hashable value representing a channel. Jobs sent to the same channel are executed in FIFO order.
- func (callable) – The function to call upon job execution.
- *args – Positional parameters passed to the function upon job execution.
- **kwds – Keyword parameters passed to the function upon job execution.
Returns: A deferred which fires when the job has completed.
Return type:
-
stopempty¶ bool: True if the iterator should stop once the backlog is empty and there are no more pending jobs.
-
-
exception spreadflow_core.jobqueue.QueueNoneReady¶
Bases: exceptions.Exception