PyFunctional makes creating data pipelines easy by using chained functional operators. Here are a
few examples of what it can do:
- Chained operators: seq(1, 2, 3).map(lambda x: x * 2).reduce(lambda x, y: x + y)
- Expressive and feature complete API
- Read and write text, csv, json, jsonl, sqlite, gzip, bz2, and lzma/xz files
- Parallelize "embarrassingly parallel" operations like map easily
- Complete documentation, rigorous unit test suite, 100% test coverage, and CI, which together provide robustness
PyFunctional's API takes inspiration from Scala collections, Apache Spark RDDs, and Microsoft
LINQ.
- Installation
- Examples
- Writing to Files
- Parallel Execution
- Github Shortform Documentation
- Contributing and Bug Fixes
- Changelog
PyFunctional is available on PyPI and can be
installed by running:
# Install from command line
$ pip install pyfunctional
Then in python run: from functional import seq
PyFunctional is useful for many tasks, and can natively open several common file types. Here
are a few examples of what you can do.
from functional import seq
seq(1, 2, 3, 4)\
    .map(lambda x: x * 2)\
    .filter(lambda x: x > 4)\
    .reduce(lambda x, y: x + y)
# 14
# or if you don't like backslash continuation
(seq(1, 2, 3, 4)
    .map(lambda x: x * 2)
    .filter(lambda x: x > 4)
    .reduce(lambda x, y: x + y)
)
# 14
PyFunctional has three types of functions:
- Streams: read data for use by the collections API.
- Transformations: transform data from streams with functions such as map, flat_map, and filter
- Actions: These cause a series of transformations to evaluate to a concrete value. to_list, reduce, and to_dict are examples of actions.
In the expression seq(1, 2, 3).map(lambda x: x * 2).reduce(lambda x, y: x + y), seq is the
stream, map is the transformation, and reduce is the action.
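To make the three roles concrete, here is a rough plain-Python analogue of that expression using only the standard library. It is not how PyFunctional is implemented; it only mirrors the roles of an iterable source, a lazy transformation, and an action that forces evaluation.

```python
from functools import reduce

# Stream: any iterable source of data (stands in for seq(1, 2, 3)).
stream = iter([1, 2, 3])

# Transformation: map() is lazy and does no work until it is consumed.
doubled = map(lambda x: x * 2, stream)

# Action: reduce() consumes the iterator and yields a concrete value.
total = reduce(lambda x, y: x + y, doubled)

print(total)  # 12
```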
from functional import seq
from collections import namedtuple
Transaction = namedtuple('Transaction', 'reason amount')
transactions = [
    Transaction('github', 7),
    Transaction('food', 10),
    Transaction('coffee', 5),
    Transaction('digitalocean', 5),
    Transaction('food', 5),
    Transaction('riotgames', 25),
    Transaction('food', 10),
    Transaction('amazon', 200),
    Transaction('paycheck', -1000)
]
# Using the Scala/Spark inspired APIs
food_cost = seq(transactions)\
    .filter(lambda x: x.reason == 'food')\
    .map(lambda x: x.amount).sum()
# Using the LINQ inspired APIs
food_cost = seq(transactions)\
    .where(lambda x: x.reason == 'food')\
    .select(lambda x: x.amount).sum()
# Using PyFunctional with fn
from fn import _
food_cost = seq(transactions).filter(_.reason == 'food').map(_.amount).sum()
The account transactions example could be done easily in pure python using list comprehensions. To
show some of the things PyFunctional excels at, take a look at a couple of word count examples.
words = 'I dont want to believe I want to know'.split(' ')
seq(words).map(lambda word: (word, 1)).reduce_by_key(lambda x, y: x + y)
# [('dont', 1), ('I', 2), ('to', 2), ('know', 1), ('want', 2), ('believe', 1)]
In the next example we have chat logs formatted in json lines (jsonl) which contain messages and metadata. A typical jsonl file has one valid json object on each line. Below are a few lines out of examples/chat_logs.jsonl.
{"message":"hello anyone there?","date":"10/09","user":"bob"}
{"message":"need some help with a program","date":"10/09","user":"bob"}
{"message":"sure thing. What do you need help with?","date":"10/09","user":"dave"}
from operator import add
import re
messages = seq.jsonl('examples/chat_logs.jsonl')
# Split words on space and normalize before doing word count
def extract_words(message):
    return re.sub('[^0-9a-z ]+', '', message.lower()).split(' ')
word_counts = messages\
    .map(lambda log: extract_words(log['message']))\
    .flatten().map(lambda word: (word, 1))\
    .reduce_by_key(add).order_by(lambda x: x[1])
Next, let's continue that example but introduce a json database of users from examples/users.json.
The previous example showed how PyFunctional can do word counts; the next one
shows how PyFunctional can join different data sources.
# First read the json file
users = seq.json('examples/users.json')
#[('sarah',{'date_created':'08/08','news_email':True,'email':'[email protected]'}),...]
email_domains = users.map(lambda u: u[1]['email'].split('@')[1]).distinct()
# ['yahoo.com', 'python.org', 'gmail.com']
# Join users with their messages
message_tuples = messages.group_by(lambda m: m['user'])
data = users.inner_join(message_tuples)
# [('sarah',
#    (
#      {'date_created':'08/08','news_email':True,'email':'[email protected]'},
#      [{'date':'10/10','message':'what is a...','user':'sarah'}...]
#    )
#  ),...]
# From here you can imagine doing more complex analysis
In examples/camping_purchases.csv there is a list of camping purchases. Let's do some cost analysis and compare it to the required camping gear list stored in examples/gear_list.txt.
purchases = seq.csv('examples/camping_purchases.csv')
total_cost = purchases.select(lambda row: int(row[2])).sum()
# 1275
most_expensive_item = purchases.max_by(lambda row: int(row[2]))
# ['4', 'sleeping bag', ' 350']
purchased_list = purchases.select(lambda row: row[1])
gear_list = seq.open('examples/gear_list.txt').map(lambda row: row.strip())
missing_gear = gear_list.difference(purchased_list)
# ['water bottle','gas','toilet paper','lighter','spoons','sleeping pad',...]
In addition to the aggregate functions shown above (sum and max_by) there are many more.
Similarly, there are several more set like functions in addition to difference.
PyFunctional can read and write to SQLite3 database files. In the example below, users are read
from examples/users.db which stores them as rows with columns id:Int and name:String.
db_path = 'examples/users.db'
users = seq.sqlite3(db_path, 'select * from user').to_list()
# [(1, 'Tom'), (2, 'Jack'), (3, 'Jane'), (4, 'Stephan')]
sorted_users = seq.sqlite3(db_path, 'select * from user order by name').to_list()
# [(2, 'Jack'), (3, 'Jane'), (4, 'Stephan'), (1, 'Tom')]
Writing to a SQLite3 database is similarly easy:
import sqlite3
from collections import namedtuple
with sqlite3.connect(':memory:') as conn:
    conn.execute('CREATE TABLE user (id INT, name TEXT)')
    conn.commit()
    User = namedtuple('User', 'id name')
    # Write using a specific query
    seq([(1, 'pedro'), (2, 'fritz')]).to_sqlite3(conn, 'INSERT INTO user (id, name) VALUES (?, ?)')
    # Write by inserting values positionally from a tuple/list into named table
    seq([(3, 'sam'), (4, 'stan')]).to_sqlite3(conn, 'user')
    # Write by inferring schema from namedtuple
    seq([User(name='tom', id=5), User(name='keiga', id=6)]).to_sqlite3(conn, 'user')
    # Write by inferring schema from dict
    seq([dict(name='david', id=7), dict(name='jordan', id=8)]).to_sqlite3(conn, 'user')
    # Read everything back to make sure it wrote correctly
    print(list(conn.execute('SELECT * FROM user')))
    # [(1, 'pedro'), (2, 'fritz'), (3, 'sam'), (4, 'stan'), (5, 'tom'), (6, 'keiga'), (7, 'david'), (8, 'jordan')]
Just as PyFunctional can read from csv, json, jsonl, sqlite3, and text files, it can
also write them. For complete API documentation see the collections API table or the official docs.
PyFunctional will auto-detect files compressed with gzip, lzma/xz, and bz2. This is done
by examining the first several bytes of the file to determine whether it is compressed, so
reading compressed files requires no code changes.
To write compressed files, every to_ function has a parameter compression which can be set to
the default None for no compression, gzip or gz for gzip compression, lzma or xz for lzma
compression, and bz2 for bz2 compression.
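The idea of detecting compression from a file's leading bytes can be sketched with the standard library alone. This is an illustration under stated assumptions, not PyFunctional's own code: detect_compression and MAGIC_BYTES are hypothetical names, and the table covers only the gzip, bz2, and xz container signatures (the legacy .lzma format has no fixed signature and is not handled here).

```python
import bz2
import gzip
import lzma

# Well-known leading "magic" bytes for each container format.
MAGIC_BYTES = {
    b"\x1f\x8b": "gzip",
    b"BZh": "bz2",
    b"\xfd7zXZ\x00": "lzma/xz",
}


def detect_compression(data: bytes):
    """Return the compression name for raw bytes, or None if unrecognised."""
    for magic, name in MAGIC_BYTES.items():
        if data.startswith(magic):
            return name
    return None


payload = b"hello pyfunctional\n"
print(detect_compression(gzip.compress(payload)))  # gzip
print(detect_compression(bz2.compress(payload)))   # bz2
print(detect_compression(lzma.compress(payload)))  # lzma/xz
print(detect_compression(payload))                 # None
```

Because the check only needs the first few bytes, a reader can pick the right decompressor before handing the stream to the rest of the pipeline.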
The only change required to enable parallelism is to use from functional import pseq instead of
from functional import seq, and to call pseq where you would call seq. The following
operations are run in parallel, with more to be implemented in a future release:
- map/select
- filter/filter_not/where
- flat_map
Parallelization uses python multiprocessing and squashes chains of embarrassingly parallel
operations to reduce overhead costs. For example, a sequence of maps and filters would be executed
all at once rather than in multiple loops using multiprocessing.
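The squashing idea can be sketched as follows: a chain of map and filter stages is fused into one function, and each worker runs the whole chain over its chunk in a single pass. This is a simplified illustration, not the library's implementation. The helpers fuse and split are hypothetical, and a thread pool stands in for the process pool here only so the lambdas need not be picklable.

```python
from concurrent.futures import ThreadPoolExecutor


def fuse(stages):
    """Combine ("map", f) / ("filter", f) stages into one per-chunk function."""
    def run(chunk):
        out = []
        for item in chunk:
            keep = True
            for kind, func in stages:
                if kind == "map":
                    item = func(item)
                elif not func(item):  # a "filter" stage rejected the item
                    keep = False
                    break
            if keep:
                out.append(item)
        return out
    return run


def split(data, parts):
    """Split data into at most `parts` contiguous chunks, preserving order."""
    size = max(1, -(-len(data) // parts))  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


stages = [
    ("map", lambda x: x * 2),
    ("filter", lambda x: x > 4),
    ("map", lambda x: x + 1),
]
data = list(range(10))

with ThreadPoolExecutor(max_workers=4) as pool:
    # One task per chunk runs the whole fused chain; no per-stage round trips.
    chunks = pool.map(fuse(stages), split(data, 4))
    result = [item for chunk in chunks for item in chunk]

print(result)  # [7, 9, 11, 13, 15, 17, 19]
```

Without fusion, each of the three stages would need its own dispatch to the workers and its own intermediate list, which is the overhead that squashing avoids.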
Shortform documentation is below and full documentation is at docs.pyfunctional.pedro.ai.
All of PyFunctional's streams can be accessed through the seq object. The primary way to create
a stream is by calling seq with an iterable. The seq callable is smart and is able to accept
multiple types of parameters as shown in the examples below.
# Passing a list
seq([1, 1, 2, 3]).to_set()
# {1, 2, 3}
# Passing direct arguments
seq(1, 1, 2, 3).map(lambda x: x).to_list()
# [1, 1, 2, 3]
# Passing a single value
seq(1).map(lambda x: -x).to_list()
# [-1]
seq also provides entry to other streams as attribute functions as shown below.
# number range
seq.range(10)
# text file
seq.open('filepath')
# json file
seq.json('filepath')
# jsonl file
seq.jsonl('filepath')
# csv file
seq.csv('filepath')
seq.csv_dict_reader('filepath')
# sqlite3 db and sql query
seq.sqlite3('filepath', 'select * from data')
For more information on the parameters that these functions can take, reference the streams documentation.
Below is the complete list of functions which can be called on a stream object from seq. For
complete documentation, see the transformation and actions API.
| Function | Description | Type | 
|---|---|---|
| map(func)/select(func) | Maps func onto elements of sequence | transformation |
| starmap(func)/smap(func) | Applies func to sequence with itertools.starmap | transformation |
| filter(func)/where(func) | Filters elements of sequence to only those where func(element) is True | transformation |
| filter_not(func) | Filters elements of sequence to only those where func(element) is False | transformation |
| flatten() | Flattens sequence of lists to a single sequence | transformation | 
| flat_map(func) | Maps func to each element, then merges the result to one flat sequence. func must return an iterable | transformation |
| group_by(func) | Groups sequence into (key, value) pairs where key=func(element) and value is from the original sequence | transformation |
| group_by_key() | Groups sequence of (key, value) pairs by key | transformation |
| reduce_by_key(func) | Reduces list of (key, value) pairs using func | transformation |
| count_by_key() | Counts occurrence of each key in sequence of (key, value) pairs | transformation |
| count_by_value() | Counts occurrence of each value in the sequence | transformation | 
| union(other) | Union of unique elements in sequence and other | transformation | 
| intersection(other) | Intersection of unique elements in sequence and other | transformation | 
| difference(other) | New sequence with unique elements present in sequence but not in other | transformation | 
| symmetric_difference(other) | New sequence with unique elements present in sequence or other, but not both | transformation | 
| distinct() | Returns distinct elements of sequence. Elements must be hashable | transformation | 
| distinct_by(func) | Returns distinct elements of sequence using func as a key | transformation |
| drop(n) | Drops the first n elements of the sequence | transformation |
| drop_right(n) | Drops the last n elements of the sequence | transformation |
| drop_while(func) | Drops elements while func evaluates to True, returning the rest | transformation |
| take(n) | Returns sequence of first n elements | transformation |
| take_while(func) | Takes elements while func evaluates to True, dropping the rest | transformation |
| init() | Returns sequence without the last element | transformation | 
| tail() | Returns sequence without the first element | transformation | 
| inits() | Returns consecutive inits of sequence | transformation | 
| tails() | Returns consecutive tails of sequence | transformation | 
| zip(other) | Zips the sequence with other | transformation | 
| zip_with_index(start=0) | Zips the sequence with the index starting at start on the right side | transformation |
| zip_with_next() | Zips the sequence minus last element with itself minus first element, resulting in adjacent elements paired with each other | transformation |
| enumerate(start=0) | Zips the sequence with the index starting at start on the left side | transformation |
| cartesian(*iterables, repeat=1) | Returns cartesian product from itertools.product | transformation | 
| inner_join(other) | Returns inner join of sequence with other. Must be a sequence of (key, value) pairs | transformation |
| outer_join(other) | Returns outer join of sequence with other. Must be a sequence of (key, value) pairs | transformation |
| left_join(other) | Returns left join of sequence with other. Must be a sequence of (key, value) pairs | transformation |
| right_join(other) | Returns right join of sequence with other. Must be a sequence of (key, value) pairs | transformation |
| join(other, join_type='inner') | Returns join of sequence with other as specified by join_type. Must be a sequence of (key, value) pairs | transformation |
| partition(func) | Partitions the sequence into elements that satisfy func(element) and those that don't | transformation |
| grouped(size) | Partitions the elements into groups of size size | transformation | 
| sorted(key=None, reverse=False)/order_by(func) | Returns elements sorted according to python sorted | transformation | 
| reverse() | Returns the reversed sequence | transformation | 
| slice(start, until) | Sequence starting at start and including elements up to until | transformation |
| head(no_wrap=None)/first(no_wrap=None) | Returns first element in sequence (if no_wrap=True, the result will never be wrapped with Sequence) | action |
| head_option(no_wrap=None) | Returns first element in sequence or None if it is empty (if no_wrap=True, the result will never be wrapped with Sequence) | action |
| last(no_wrap=None) | Returns last element in sequence (if no_wrap=True, the result will never be wrapped with Sequence) | action |
| last_option(no_wrap=None) | Returns last element in sequence or None if it is empty (if no_wrap=True, the result will never be wrapped with Sequence) | action |
| len()/size() | Returns length of sequence | action | 
| count(func) | Returns count of elements in sequence where func(element) is True | action |
| empty() | Returns True if the sequence has zero length | action |
| non_empty() | Returns True if sequence has non-zero length | action |
| all() | Returns True if all elements in sequence are truthy | action |
| exists(func) | Returns True if func(element) for any element in the sequence is True | action |
| for_all(func) | Returns True if func(element) is True for all elements in the sequence | action |
| find(func) | Returns the first element for which func(element) evaluates to True | action |
| any() | Returns True if any element in sequence is truthy | action |
| max() | Returns maximal element in sequence | action | 
| min() | Returns minimal element in sequence | action | 
| max_by(func) | Returns element with maximal value func(element) | action | 
| min_by(func) | Returns element with minimal value func(element) | action | 
| sum()/sum(projection) | Returns the sum of elements possibly using a projection | action | 
| product()/product(projection) | Returns the product of elements possibly using a projection | action | 
| average()/average(projection) | Returns the average of elements possibly using a projection | action | 
| aggregate(func)/aggregate(seed, func)/aggregate(seed, func, result_map) | Aggregates using func starting with seed or first element of list then applies result_map to the result | action |
| fold_left(zero_value, func) | Reduces element from left to right using func and initial value zero_value | action |
| fold_right(zero_value, func) | Reduces element from right to left using func and initial value zero_value | action |
| make_string(separator) | Returns string with separator between each str(element) | action |
| dict(default=None)/to_dict(default=None) | Converts a sequence of (Key, Value) pairs to a dictionary. If default is not None, it must be a value or zero argument callable which will be used to create a collections.defaultdict | action |
| list()/to_list() | Converts sequence to a list | action | 
| set() / to_set() | Converts sequence to a set | action | 
| to_file(path) | Saves the sequence to a file at path with each element on a newline | action |
| to_csv(path) | Saves the sequence to a csv file at path with each element representing a row | action |
| to_jsonl(path) | Saves the sequence to a jsonl file with each element being transformed to json and printed to a new line | action | 
| to_json(path) | Saves the sequence to a json file. The contents depend on if the json root is an array or dictionary | action | 
| to_sqlite3(conn, tablename_or_query, *args, **kwargs) | Saves the sequence to a SQLite3 db. The target table must be created in advance | action | 
| to_pandas(columns=None) | Converts the sequence to a pandas DataFrame | action | 
| cache() | Forces evaluation of sequence immediately and caches the result | action | 
| for_each(func) | Executes func on each element of the sequence | action |
| peek(func) | Executes func on each element of the sequence and returns it | transformation |
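
For readers unfamiliar with the key-based operations in the table, the sketch below gives one plain-Python reading of reduce_by_key and group_by_key based on their descriptions. These are hypothetical stand-ins written for illustration only; the library's own implementations and the ordering of their results may differ.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Fold values that share a key with func, keeping first-seen key order."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return list(result.items())


def group_by_key(pairs):
    """Collect the values of each key into a list, keeping first-seen key order."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return list(groups.items())


words = 'I dont want to believe I want to know'.split(' ')
counts = reduce_by_key([(word, 1) for word in words], add)
print(counts)
# [('I', 2), ('dont', 1), ('want', 2), ('to', 2), ('believe', 1), ('know', 1)]

print(group_by_key([('a', 1), ('b', 2), ('a', 3)]))
# [('a', [1, 3]), ('b', [2])]
```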
Whenever possible, PyFunctional will compute lazily. This is accomplished by tracking the list
of transformations that have been applied to the sequence and only evaluating them when an action is
called. In PyFunctional this is called tracking lineage. This is also responsible for PyFunctional's
ability to cache the results of computations to prevent expensive re-computation.
Caching is done predominantly to preserve sensible behavior and is used sparingly. For example, calling
size() will cache the underlying sequence. If this was not done and the input was an iterator,
then further calls would operate on an expired iterator since it was used to compute the length.
Similarly, repr also caches since it is most often used during interactive sessions where it's
undesirable to keep recomputing the same value. Below are some examples of inspecting lineage.
def times_2(x):
    return 2 * x
elements = (
   seq(1, 1, 2, 3, 4)
      .map(times_2)
      .peek(print)
      .distinct()
)
elements._lineage
# Lineage: sequence -> map(times_2) -> peek(print) -> distinct
l_elements = elements.to_list()
# Prints: 2
# Prints: 2
# Prints: 4
# Prints: 6
# Prints: 8
elements._lineage
# Lineage: sequence -> map(times_2) -> peek(print) -> distinct -> cache
l_elements = elements.to_list()
# The cached result is returned so times_2 is not called and nothing is printed
Files are given special treatment if opened through the seq.open and related APIs.
functional.util.ReusableFile implements a wrapper around the standard python file to support
multiple iterations over a single file object while correctly handling iteration termination and
file closing.
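The lineage tracking and caching described in this section can be illustrated with a toy class. LazySeq below is a hypothetical, heavily simplified stand-in and not PyFunctional's Sequence: it records each transformation instead of running it, evaluates the recorded steps only when the to_list action is called, and then reuses the cached result.

```python
class LazySeq:
    """Toy lazy sequence: records transformations, evaluates on an action."""

    def __init__(self, source, lineage=()):
        self._source = source
        self._lineage = list(lineage)  # [(name, function over an iterator)]
        self._cache = None

    def _extend(self, name, step):
        return LazySeq(self._source, self._lineage + [(name, step)])

    def map(self, func):
        return self._extend(f"map({func.__name__})", lambda it: map(func, it))

    def filter(self, func):
        return self._extend(f"filter({func.__name__})", lambda it: filter(func, it))

    def lineage(self):
        names = ["sequence"] + [name for name, _ in self._lineage]
        if self._cache is not None:
            names.append("cache")
        return " -> ".join(names)

    def to_list(self):
        # Action: run the recorded steps once, then reuse the cached result.
        if self._cache is None:
            iterator = iter(self._source)
            for _, step in self._lineage:
                iterator = step(iterator)
            self._cache = list(iterator)
        return list(self._cache)


calls = []


def times_2(x):
    calls.append(x)  # record each invocation to observe laziness
    return 2 * x


elements = LazySeq([1, 1, 2, 3, 4]).map(times_2)
print(elements.lineage())  # sequence -> map(times_2)
print(len(calls))          # 0, nothing has been evaluated yet
print(elements.to_list())  # [2, 2, 4, 6, 8]
print(elements.lineage())  # sequence -> map(times_2) -> cache
elements.to_list()
print(len(calls))          # 5, the second to_list() reused the cache
```

Caching the evaluated list is what makes a second action safe even when the source is a one-shot iterator.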
Even though functions like first() are supposed to return a single element, if the element is an iterable,
then it is wrapped into a Sequence. For instance:
>>> s = seq(list(), list())
>>> type(s.first())
<class 'functional.pipeline.Sequence'>
That behaviour can be changed with the no_wrap option:
>>> type(s.first(no_wrap=True))
<class 'list'>
The option is also accepted by seq()/pseq() as well as the Sequence() constructor, for example:
>>> type(seq([list(), list()], no_wrap=True).last())
<class 'list'>
- SQL based query planner and interpreter
- _lambda operator
Any contributions or bug reports are welcome. Thus far, there is a 100% acceptance rate for pull requests and contributors have offered valuable feedback and critique on code. It is great to hear from users of the package, especially what it is used for, what works well, and what could be improved.
To contribute, create a fork of PyFunctional, make your changes, then make sure that they pass the checks below.
In order to be merged, all pull requests must:
- Pass all the unit tests
- Pass all the pylint tests, or ignore warnings with explanation of why it's correct to do so
- Not significantly reduce coverage without a good reason (coveralls.io)
- Edit the CHANGELOG.md file in the Next Release heading with changes
- PyFunctional 1.6 is tested against Python 3.12 and Python 3.13.
- PyFunctional 1.5 is tested against Python 3.8 to 3.11. PyPy3 is not tested, but bugs are fixed on a best-effort basis.
- PyFunctional 1.4 supports and is tested against Python 3.6, Python 3.7, and PyPy3
- PyFunctional 1.4 and above do not support Python 2.7
- PyFunctional 1.4 works in Python 3.5, but is not tested against it
- PyFunctional 1.4 and above partially work in Python 3.8; parallel processing currently has issues, but other features work fine
- PyFunctional 1.3 and below support and were tested against Python 2.7, Python 3.5, Python 3.6, PyPy2, and PyPy3
To learn more about me (the author) visit my webpage at pedro.ai.
I created PyFunctional while using Python extensively, and finding that I missed the
ease of use for manipulating data that Spark RDDs and Scala collections have. The project takes the
best ideas from these APIs as well as LINQ to provide an easy way to manipulate data when using
Scala is not an option or PySpark is overkill.
These people have generously contributed their time to improving PyFunctional