Tutorial

Observing changes in directories

# -*- coding: utf-8 -*-
#
# Observing changes in directories
# ================================
#
# This spredaflow configuration script demonstrates how to monitor changes to
# files in directories. Run the following command line and then make changes to
# some rst files while following the output printed to the terminal.
#
# Usage:
# spreadflow-twistd -n  -c tutorial/001-observing-directories/spreadflow.conf
#
# Visualization:
# spreadflow-confviz tutorial/001-observing-directories/spreadflow.conf > /tmp/config.svg
# firefox /tmp/config.svg

# Import the configuration script utilities.
from spreadflow_core.script import Process, ChainTemplate

# Import observers and processors.
from spreadflow_core.proc import DebugLog
from spreadflow_observer_fs.source import FilesystemObserverSource

# Setup the environment.
import os
import platform

directory = os.getcwd()
use_spotlight = platform.system() == 'Darwin'

@Process()
class RstMonitor(ChainTemplate):
    """
    A chain which monitors changes to all reStructured text files in the
    current directory. Change records are printed to the console.
    """
    chain = (
        FilesystemObserverSource("kMDItemFSName endswith[c] '.rst'" if use_spotlight else "*.rst", directory),
        DebugLog(),
    )

Using Duplicate and Filter

# -*- coding: utf-8 -*-
#
# Using Duplicate and Filter
# ==========================
#
# This spredaflow configuration script demonstrates how to use Duplicate and
# Filter in order to branch of chains according to some property of a file.
#
# Usage:
# spreadflow-twistd -n  -c tutorial/002-duplicate-and-filter/spreadflow.conf
#
# Visualization:
# spreadflow-confviz tutorial/002-duplicate-and-filter/spreadflow.conf > /tmp/config.svg
# firefox /tmp/config.svg

# Import the configuration script utilities.
from spreadflow_core.script import Process, ChainTemplate, DuplicatorTemplate

# Import observers and processors.
from spreadflow_core.proc import DebugLog
from spreadflow_delta.proc import Filter
from spreadflow_observer_fs.source import FilesystemObserverSource

# Setup the environment.
import os
import platform

directory = os.getcwd()
use_spotlight = platform.system() == 'Darwin'

def is_readme(key, doc):
    """
    Returns True if the given record points to a readme file.
    """
    filename = os.path.basename(doc['path'])
    return filename.lower().startswith('readme')

@Process()
class ReadmeLogger(ChainTemplate):
    """
    A chain which only acts on readme files.
    """
    chain = (
        Filter(is_readme),
        DebugLog()
    )

@Process()
class RstMonitor(ChainTemplate):
    """
    A chain which monitors changes to all reStructured text files in the
    current directory. Change records are printed to the console.
    """
    chain = (
        FilesystemObserverSource("kMDItemFSName endswith[c] '.rst'" if use_spotlight else "*.rst", directory),
        DuplicatorTemplate("ReadmeLogger"),
    )

Using Loadfile and SetComputedValue

# -*- coding: utf-8 -*-
#
# Using Loadfile and SetComputedValue
# ===================================
#
# This spredaflow configuration script demonstrates how to use Loadfile to act
# on file content and SetComputedValue to add new keys to the document record.
#
# Usage:
# spreadflow-twistd -n  -c tutorial/003-loadfile-computed-value/spreadflow.conf
#
# Visualization:
# spreadflow-confviz tutorial/003-loadfile-computed-value/spreadflow.conf > /tmp/config.svg
# firefox /tmp/config.svg

# Import the configuration script utilities (Chain and Duplicate).
from spreadflow_core.script import Process, ChainTemplate, DuplicatorTemplate

# Import observers and processors.
from spreadflow_core.proc import DebugLog
from spreadflow_delta.proc import Filter, Loadfile, SetComputedValue
from spreadflow_observer_fs.source import FilesystemObserverSource

# Setup the environment.
import os
import platform
import re

directory = os.getcwd()
use_spotlight = platform.system() == 'Darwin'

def is_readme(key, doc):
    """
    Returns True if the given record points to a readme file.
    """
    filename = os.path.basename(doc['path'])
    return filename.lower().startswith('readme')

def count_words(key, doc):
    """
    Counts the number of words in a given document.
    """
    return len(re.findall(r'\w+', doc['content']))

@Process()
class ReadmeWordCountLogger(ChainTemplate):
    """
    A chain which counts the words in readme files.
    """
    chain = (
        Filter(is_readme),
        Loadfile(encoding='utf-8'),
        SetComputedValue('words', count_words),
        DebugLog()
    )

@Process()
class RstMonitor(ChainTemplate):
    """
    A chain which monitors changes to all reStructured text files in the
    current directory. Change records are printed to the console.
    """
    chain = (
        FilesystemObserverSource("kMDItemFSName endswith[c] '.rst'" if use_spotlight else "*.rst", directory),
        DuplicatorTemplate("ReadmeWordCountLogger"),
    )

Using MapReduce

# -*- coding: utf-8 -*-
#
# Using Loadfile and SetComputedValue
# ===================================
#
# This spredaflow configuration script demonstrates how to use the MapReduce
# component to merge multiple documents.
#
# Usage:
# spreadflow-twistd -n  -c tutorial/004-mapreduce/spreadflow.conf
#
# Visualization:
# spreadflow-confviz tutorial/004-mapreduce/spreadflow.conf > /tmp/config.svg
# firefox /tmp/config.svg

# Import the configuration script utilities (Chain and Duplicate).
from spreadflow_core.script import Process, ChainTemplate, DuplicatorTemplate

# Import observers and processors.
from spreadflow_core.proc import DebugLog
from spreadflow_delta.proc import Filter, Loadfile, SetComputedValue, MapReduce
from spreadflow_observer_fs.source import FilesystemObserverSource

# Setup the environment.
import functools
import os
import platform
import re

directory = os.getcwd()
use_spotlight = platform.system() == 'Darwin'

def is_readme(key, doc):
    """
    Returns True if the given record points to a readme file.
    """
    filename = os.path.basename(doc['path'])
    return filename.lower().startswith('readme')

def count_words(key, doc):
    """
    Counts the number of words in a given document.
    """
    return len(re.findall(r'\w+', doc['content']))

def map_readme_text(key, doc):
    """
    Map callback for mapreduce operation.
    """
    yield 'combined-readme.rst', {
        'words': doc['words'],
        'content': 'File: {:s} ({:d})\n{:s}\n\n'.format(doc['path'], doc['words'], doc['content'])
    }

def reduce_readme_text(key, docs):
    """
    Reduce callback for mapreduce operation.
    """
    def _combine_text(first, second):
        return {
            'words': first['words'] + second['words'],
            'content': first['content'] + second['content']
        }

    return functools.reduce(_combine_text, docs)

def sort_key_readme_text(key, doc, dockey):
    """
    Sort key callback for mapreduce operation.
    """
    return key, doc['words']


@Process()
class ReadmeConcat(ChainTemplate):
    """
    A chain which concatenates all readme files into one ordered by the number of
    words.
    """
    chain = (
        Filter(is_readme),
        Loadfile(encoding='utf-8'),
        SetComputedValue('words', count_words),
        MapReduce(map=map_readme_text, reduce=reduce_readme_text, sort_key=sort_key_readme_text),
        DebugLog()
    )

@Process()
class RstMonitor(ChainTemplate):
    """
    A chain which monitors changes to all reStructured text files in the
    current directory. Change records are printed to the console.
    """
    chain = (
        FilesystemObserverSource("kMDItemFSName endswith[c] '.rst'" if use_spotlight else "*.rst", directory),
        DuplicatorTemplate("ReadmeConcat"),
    )