spreadflow_core.jobqueue module

Cooperative job queue.

Provides a job queue (and iterator) specifically designed for the twisted cooperative multitasking facilities.

class spreadflow_core.jobqueue.Entry(deferred, job)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__getstate__()

Exclude the OrderedDict from pickling

__repr__()

Return a nicely formatted representation string

deferred

Alias for field number 0

job

Alias for field number 1

class spreadflow_core.jobqueue.Job(channel, func, args, kwds)

Bases: tuple

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__getstate__()

Exclude the OrderedDict from pickling

__repr__()

Return a nicely formatted representation string

args

Alias for field number 2

channel

Alias for field number 0

func

Alias for field number 1

kwds

Alias for field number 3

class spreadflow_core.jobqueue.JobQueue

Bases: _abcoll.Iterator

Cooperative job queue.

A job queue (and iterator) specifically designed for the twisted cooperative multitasking facilities.

A job is any callable together with positional and keyword arguments. Results returned by a job are passed back to the caller by a deferred. If the job itself returns a deferred, any queued jobs on the same channel are blocked until a result becomes available.

Example

The following example illustrates how the spreadflow_core.jobqueue.JobQueue can be used together with twisted.internet.task.cooperate(). Note that this function returns an instance of twisted.internet.task.CooperativeTask. It can be used to pause, resume and stop the queue:

from __future__ import print_function
from twisted.internet import defer, task, reactor
from spreadflow_core.jobqueue import JobQueue

def say(message):
    '''
    Prints a message and returns immediately.
    '''
    print(message)

def pause(seconds):
    '''
    Returns a deferred which fires after the specified amount of
    time.
    '''
    d = defer.Defered()
    reactor.callLater(seconds, d.success, None)
    return d

def stop(result):
    reactor.stop()

queue = JobQueue()
queue_task = task.cooperate(queue)

queue.put('channel one', say, 'hello')
queue.put('channel one', pause, 2)
queue.put('channel one', say, 'world!').addCallback(done)

queue.put('channel two', pause, 1)
queue.put('channel two', say, 'what?')

reactor.run()

This example will generate the following output, pausing for one second between every line:

hello
what?
world!
__next__()

Implements iterator.__next__() (Python >= 3)

clear()

Clears the backlog.

get()

Removes and returns the first item in the queue where the channel is ready.

Returns
tuple: A 5-tuple containing the channel, func, args, kwds, deferred
Raises:spreadflow_core.jobqueue.QueueNoneReady – Raised if there is either no item ready or no channel.
next()

Implements iterator.__next__() (Python < 3)

put(channel, func, *args, **kwds)

Queue up a job for later execution on a specified channel.

Parameters:
  • channel – Any hashable value representing a channel. Jobs sent to the same channel are executed in FIFO sequence.
  • func (callable) – The function to call upon job execution.
  • *args – Positional parameters passed to the function upon job execution.
  • **kwds – Keyword parameters passed to the function upon job execution.
Returns:

A deferred firing when the job completed.

Return type:

twisted.internet.defer.Deferred

stopempty
bool: True if iterator should stop if both, the backlog gets empty and
there are no more pending jobs.
exception spreadflow_core.jobqueue.QueueNoneReady

Bases: exceptions.Exception