Tutorial
Observing changes in directories
# -*- coding: utf-8 -*-
#
# Observing changes in directories
# ================================
#
# This spreadflow configuration script demonstrates how to monitor changes to
# files in directories. Run the following command line and then make changes to
# some rst files while following the output printed to the terminal.
#
# Usage:
# spreadflow-twistd -n -c tutorial/001-observing-directories/spreadflow.conf
#
# Visualization:
# spreadflow-confviz tutorial/001-observing-directories/spreadflow.conf > /tmp/config.svg
# firefox /tmp/config.svg
# Import the configuration script utilities.
from spreadflow_core.script import Process, ChainTemplate
# Import observers and processors.
from spreadflow_core.proc import DebugLog
from spreadflow_observer_fs.source import FilesystemObserverSource
# Setup the environment.
import os
import platform
use_spotlight = (platform.system() == 'Darwin')  # macOS: observe via a Spotlight query
directory = os.getcwd()  # observe the working directory this script is loaded from
@Process()
class RstMonitor(ChainTemplate):
    """
    Watch the current working directory for changes to reStructuredText
    (``.rst``) files and print every change record to the console.

    On macOS (``use_spotlight``) the files are selected with a Spotlight
    metadata query; on every other platform a ``*.rst`` glob pattern is used.
    """
    chain = (
        FilesystemObserverSource(
            "kMDItemFSName endswith[c] '.rst'" if use_spotlight else "*.rst",
            directory,
        ),
        DebugLog(),
    )
Using Duplicate and Filter
# -*- coding: utf-8 -*-
#
# Using Duplicate and Filter
# ==========================
#
# This spreadflow configuration script demonstrates how to use Duplicate and
# Filter in order to branch off chains according to some property of a file.
#
# Usage:
# spreadflow-twistd -n -c tutorial/002-duplicate-and-filter/spreadflow.conf
#
# Visualization:
# spreadflow-confviz tutorial/002-duplicate-and-filter/spreadflow.conf > /tmp/config.svg
# firefox /tmp/config.svg
# Import the configuration script utilities.
from spreadflow_core.script import Process, ChainTemplate, DuplicatorTemplate
# Import observers and processors.
from spreadflow_core.proc import DebugLog
from spreadflow_delta.proc import Filter
from spreadflow_observer_fs.source import FilesystemObserverSource
# Setup the environment.
import os
import platform
use_spotlight = (platform.system() == 'Darwin')  # macOS: observe via a Spotlight query
directory = os.getcwd()  # observe the working directory this script is loaded from
def is_readme(key, doc):
    """
    Tell whether the record ``doc`` refers to a readme file.

    Only the base name of ``doc['path']`` is inspected, case-insensitively,
    for a ``readme`` prefix (so ``README.rst`` and ``readme-dev.rst`` both
    match). ``key`` is part of the filter callback signature and is unused.
    """
    _, basename = os.path.split(doc['path'])
    return basename.lower().startswith('readme')
@Process()
class ReadmeLogger(ChainTemplate):
    """
    Chain that lets through only records whose file name starts with
    ``readme`` (see ``is_readme``) and prints them to the console.
    """
    chain = (
        Filter(is_readme),
        DebugLog(),
    )
@Process()
class RstMonitor(ChainTemplate):
    """
    Watch the current working directory for changes to reStructuredText
    (``.rst``) files and branch every change record off (via
    ``DuplicatorTemplate``) to the ``ReadmeLogger`` chain.

    This chain has no ``DebugLog`` of its own; console output comes from the
    ``ReadmeLogger`` branch, which only logs readme files. On macOS
    (``use_spotlight``) a Spotlight metadata query selects the files,
    elsewhere a ``*.rst`` glob pattern is used.
    """
    chain = (
        FilesystemObserverSource(
            "kMDItemFSName endswith[c] '.rst'" if use_spotlight else "*.rst",
            directory,
        ),
        DuplicatorTemplate("ReadmeLogger"),
    )
Using Loadfile and SetComputedValue
# -*- coding: utf-8 -*-
#
# Using Loadfile and SetComputedValue
# ===================================
#
# This spreadflow configuration script demonstrates how to use Loadfile to act
# on file content and SetComputedValue to add new keys to the document record.
#
# Usage:
# spreadflow-twistd -n -c tutorial/003-loadfile-computed-value/spreadflow.conf
#
# Visualization:
# spreadflow-confviz tutorial/003-loadfile-computed-value/spreadflow.conf > /tmp/config.svg
# firefox /tmp/config.svg
# Import the configuration script utilities (Process, ChainTemplate and DuplicatorTemplate).
from spreadflow_core.script import Process, ChainTemplate, DuplicatorTemplate
# Import observers and processors.
from spreadflow_core.proc import DebugLog
from spreadflow_delta.proc import Filter, Loadfile, SetComputedValue
from spreadflow_observer_fs.source import FilesystemObserverSource
# Setup the environment.
import os
import platform
import re
use_spotlight = (platform.system() == 'Darwin')  # macOS: observe via a Spotlight query
directory = os.getcwd()  # observe the working directory this script is loaded from
def is_readme(key, doc):
    """
    Tell whether the record ``doc`` refers to a readme file.

    Only the base name of ``doc['path']`` is inspected, case-insensitively,
    for a ``readme`` prefix (so ``README.rst`` and ``readme-dev.rst`` both
    match). ``key`` is part of the filter callback signature and is unused.
    """
    _, basename = os.path.split(doc['path'])
    return basename.lower().startswith('readme')
def count_words(key, doc):
    """
    Return the number of words in ``doc['content']``.

    A word is a maximal run of regex word characters. ``key`` is part of the
    computed-value callback signature and is unused.
    """
    word_matches = re.finditer(r'\w+', doc['content'])
    return sum(1 for _match in word_matches)
@Process()
class ReadmeWordCountLogger(ChainTemplate):
    """
    Chain that keeps only readme records, loads their content as UTF-8 text,
    stores the word count under the ``words`` key and prints the resulting
    records to the console.
    """
    chain = (
        Filter(is_readme),
        Loadfile(encoding='utf-8'),
        SetComputedValue('words', count_words),
        DebugLog(),
    )
@Process()
class RstMonitor(ChainTemplate):
    """
    Watch the current working directory for changes to reStructuredText
    (``.rst``) files and branch every change record off (via
    ``DuplicatorTemplate``) to the ``ReadmeWordCountLogger`` chain.

    This chain has no ``DebugLog`` of its own; console output comes from the
    ``ReadmeWordCountLogger`` branch. On macOS (``use_spotlight``) a Spotlight
    metadata query selects the files, elsewhere a ``*.rst`` glob pattern is
    used.
    """
    chain = (
        FilesystemObserverSource(
            "kMDItemFSName endswith[c] '.rst'" if use_spotlight else "*.rst",
            directory,
        ),
        DuplicatorTemplate("ReadmeWordCountLogger"),
    )
Using MapReduce
# -*- coding: utf-8 -*-
#
# Using MapReduce
# ===============
#
# This spreadflow configuration script demonstrates how to use the MapReduce
# component to merge multiple documents.
#
# Usage:
# spreadflow-twistd -n -c tutorial/004-mapreduce/spreadflow.conf
#
# Visualization:
# spreadflow-confviz tutorial/004-mapreduce/spreadflow.conf > /tmp/config.svg
# firefox /tmp/config.svg
# Import the configuration script utilities (Process, ChainTemplate and DuplicatorTemplate).
from spreadflow_core.script import Process, ChainTemplate, DuplicatorTemplate
# Import observers and processors.
from spreadflow_core.proc import DebugLog
from spreadflow_delta.proc import Filter, Loadfile, SetComputedValue, MapReduce
from spreadflow_observer_fs.source import FilesystemObserverSource
# Setup the environment.
import functools
import os
import platform
import re
use_spotlight = (platform.system() == 'Darwin')  # macOS: observe via a Spotlight query
directory = os.getcwd()  # observe the working directory this script is loaded from
def is_readme(key, doc):
    """
    Tell whether the record ``doc`` refers to a readme file.

    Only the base name of ``doc['path']`` is inspected, case-insensitively,
    for a ``readme`` prefix (so ``README.rst`` and ``readme-dev.rst`` both
    match). ``key`` is part of the filter callback signature and is unused.
    """
    _, basename = os.path.split(doc['path'])
    return basename.lower().startswith('readme')
def count_words(key, doc):
    """
    Return the number of words in ``doc['content']``.

    A word is a maximal run of regex word characters. ``key`` is part of the
    computed-value callback signature and is unused.
    """
    word_matches = re.finditer(r'\w+', doc['content'])
    return sum(1 for _match in word_matches)
def map_readme_text(key, doc):
    """
    Map callback for the MapReduce step (a generator).

    Yields exactly one ``(output_key, value)`` pair per input record, always
    under the output key ``'combined-readme.rst'``. The value carries the
    record's word count and a text chunk made of a ``File: <path> (<words>)``
    header line, the file content and a trailing blank line. ``key`` is
    unused.
    """
    word_count = doc['words']
    header = 'File: {:s} ({:d})'.format(doc['path'], word_count)
    chunk = '{:s}\n{:s}\n\n'.format(header, doc['content'])
    yield 'combined-readme.rst', {'words': word_count, 'content': chunk}
def reduce_readme_text(key, docs):
    """
    Reduce callback for the MapReduce step.

    Merges the values emitted by ``map_readme_text`` for one output key into a
    single fresh record: the ``words`` counts are summed and the ``content``
    chunks are concatenated in the order they are given. ``key`` is unused.

    An empty ``docs`` yields ``{'words': 0, 'content': ''}`` rather than the
    ``TypeError`` a bare ``functools.reduce`` would raise.
    """
    # ``docs`` may be a one-shot iterator and is traversed twice below.
    docs = list(docs)
    return {
        'words': sum(doc['words'] for doc in docs),
        # ``str.join`` is linear in the total size; pairwise ``+`` was quadratic.
        'content': ''.join(doc['content'] for doc in docs),
    }
def sort_key_readme_text(key, doc, dockey):
    """
    Sort key callback for the MapReduce step.

    Returns the tuple ``(key, doc['words'])``, so records are ordered by
    ``key`` first and by word count second. ``dockey`` is unused.

    NOTE(review): if ``key`` differs per record, the word count only breaks
    ties. Confirm which key MapReduce passes here, because ReadmeConcat is
    meant to order by the number of words.
    """
    word_count = doc['words']
    return (key, word_count)
@Process()
class ReadmeConcat(ChainTemplate):
    """
    Chain that merges every readme file into one combined record.

    Readme records are loaded as UTF-8 text and annotated with their word
    count under ``words``. MapReduce then concatenates them into a single
    document, which is printed to the console. The intended order is by
    number of words, as defined by ``sort_key_readme_text``.
    """
    chain = (
        Filter(is_readme),
        Loadfile(encoding='utf-8'),
        SetComputedValue('words', count_words),
        MapReduce(
            map=map_readme_text,
            reduce=reduce_readme_text,
            sort_key=sort_key_readme_text,
        ),
        DebugLog(),
    )
@Process()
class RstMonitor(ChainTemplate):
    """
    Watch the current working directory for changes to reStructuredText
    (``.rst``) files and branch every change record off (via
    ``DuplicatorTemplate``) to the ``ReadmeConcat`` chain.

    This chain has no ``DebugLog`` of its own; console output comes from the
    ``ReadmeConcat`` branch. On macOS (``use_spotlight``) a Spotlight metadata
    query selects the files, elsewhere a ``*.rst`` glob pattern is used.
    """
    chain = (
        FilesystemObserverSource(
            "kMDItemFSName endswith[c] '.rst'" if use_spotlight else "*.rst",
            directory,
        ),
        DuplicatorTemplate("ReadmeConcat"),
    )