Run this notebook

Use Livebook to open this notebook and explore new ideas.

It is easy to get started, on your machine or the cloud.

Click below to open and run it in your Livebook at .

(or change your Livebook location)

# Flow ```elixir Mix.install([:req, :flow]) ``` ## What Is Flow? Flow is an abstraction built on top of GenStage. Its job is *processing collections asynchronously* through series of stages. > But that's kind of what we did with `Task.async_stream/1`! — you, now Indeed, the similarities are a few: * They both work on *bounded* or *unbounded* collections * They both process collection elements in parallel However, Flow is more powerful, versatile, and customizable than `Task.async_stream/1`. Because of this, it's also more complex and has a steeper learning curve, so make sure it solves your problem better than `async_stream` before using it! Flow is built on top of GenStage itself. It's the next layer of abstraction when thinking about parallel data processing and map/reduce algorithms. ## Counting Words Let's implement the classic map/reduce toy example: counting the occurrence of every word in a text. We'll fetch the words from the [baconipsum.com](https://baconipsum.com) API 🥓 #### The `Enum` Version Let's start with how we'd do this with `Enum`. ```elixir start_time = System.monotonic_time(:millisecond) result = 1..10 |> Enum.map(fn _ -> Req.get!("https://baconipsum.com/api/?type=meat-and-filler&paras=50&format=text").body end) |> Enum.flat_map(fn paragraph -> String.split(paragraph, "\n", trim: true) end) |> Enum.flat_map(&String.split(&1, " ", trim: true)) |> Enum.reduce(%{}, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end) |> Enum.sort_by(fn {_word, count} -> count end, :desc) end_time = System.monotonic_time(:millisecond) IO.puts("Took us #{end_time - start_time}ms") result ``` Cool. Cool cool cool. This solution works. However, it is **sequential**. It also loads the whole text into memory for every request and then it keeps eagerly evaluating expressions. Each of those can be expensive: first we split the whole text into a full list of lines, then each line into a full list of words, and so on. We fully compute each step before moving to the next. I know what you're thinking: this is what **streams** are for. You're kind of right. #### The `Stream` Version It's easy enough to change the `Enum`-based version to use streams instead. ```elixir start_time = System.monotonic_time(:millisecond) result = 1..10 |> Stream.map(fn _ -> Req.get!("https://baconipsum.com/api/?type=meat-and-filler&paras=50&format=text").body end) |> Stream.flat_map(fn paragraph -> String.splitter(paragraph, "\n", trim: true) end) |> Stream.flat_map(&String.splitter(&1, " ", trim: true)) |> Enum.reduce(%{}, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end) |> Enum.sort_by(fn {_word, count} -> count end, :desc) end_time = System.monotonic_time(:millisecond) IO.puts("Took us #{end_time - start_time}ms") result ``` The differences are: * We use [`String.splitter/3`](https://hexdocs.pm/elixir/String.html#splitter/2) instead of `String.split/3`, since it returns a lazy stream * We change `Enum.flat_map/2` into `Stream.flat_map/2` Just like that, our solution is now *lazy* and won't compute every step and load it fully into memory. That's fantastic, but our execution time is kind of exactly the same! Splitting strings and stuff like that can take memory, but it's blazing-fast compared to performing HTTP requests. ### Enter Flow Flow tries its hardest to provide an API that's almost identical to `Enum` and `Stream`. This is how we'd rewrite our example. ```elixir start_time = System.monotonic_time(:millisecond) result = 1..10 |> Flow.from_enumerable(max_demand: 1, min_demand: 0) |> Flow.map(fn _ -> Req.get!("https://baconipsum.com/api/?type=meat-and-filler&paras=50&format=text").body end) |> Flow.flat_map(fn paragraph -> String.splitter(paragraph, "\n", trim: true) end) |> Flow.flat_map(&String.splitter(&1, " ", trim: true)) |> Flow.partition() |> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end) |> Enum.sort_by(fn {_word, count} -> count end, :desc) end_time = System.monotonic_time(:millisecond) IO.puts("Took us #{end_time - start_time}ms") result ``` ## Partitioning The example above works thanks to a little function we snuck in there: [`Flow.partition/1`](https://hexdocs.pm/flow/Flow.html#partition/1). Flow executes computations in different processes, including the `Flow.reduce/3` step. This means that if we don't have a way to make sure words are deterministically processed in specific processes, we're not going to be able to perform the reduce step in parallel. This is a known issue when working with map/reduce: if you want the reduce step to happen in parallel, you need to divide the output of the map step in subsets that you can reduce on their own. `Flow.partition/1` does exactly that: it introduces a new set of *stages* and makes sure the same word is always mapped to the same stage. It does this through a hash function. ![](images/04-flow_partitioning.png) ## Windows and Triggers We won't go into too much detail on windows and triggers, but they're a powerful feature of Flow. The reason for having these features is *unbounded collections*. When working with unbounded collections, we can't simply `Flow.reduce/3` like we did in the word-counting example. We'll never arrive at a reduceable result if our collection has infinite elements. Windows and triggers address this problem. Windows let us split the collection based on time, and triggers let us tell the pipeline when to "flush" the results we have computed so far. Then, Flow can run its operations on *windows* of data instead of on the unbounded collection. Once we specify a window strategy, then we can use triggers to *materialize* the data in the window. The parallel computation now happens on the window itself, not on all the data like in the examples above. All events belong to the **global window** by default, which is returned by [`Flow.Window.global/0`](https://hexdocs.pm/flow/1.2.0/Flow.Window.html#global/0). ```elixir Flow.Window.global() ``` ### Triggers Let's see a trigger in action. We'll use the [`Flow.Window.trigger_every/2` trigger](https://hexdocs.pm/flow/Flow.Window.html#trigger_every/2), which triggers every `n` elements in the window. ```elixir window = Flow.Window.global() |> Flow.Window.trigger_every(10) Flow.from_enumerable(1..100) |> Flow.partition(window: window, stages: 1) |> Flow.reduce(fn -> 0 end, &(&1 + &2)) |> Flow.emit(:state) |> Enum.to_list() ``` As you can see in the output, every element in the returned list is the accumulated sum of the ten elements before it. #### What to Emit? You might notice a little call to [`Flow.emit/2`](https://hexdocs.pm/flow/Flow.html#emit/2) in the pipeline in our previous example. You can use `Flow.emit/2` to tell Flow what is the value you want to emit to the next stage of the computation. Often, you'd use `Flow.emit(flow, :events)`. In our case, what we want to emit to the next stage is the *state* calculated by `Flow.reduce/3`, so we use `Flow.emit(:state)`. ### Other Window Types We can have different window types. The [`Flow.Window.count/1` window](https://hexdocs.pm/flow/Flow.Window.html#count/1) considers the given `n` elements. We can see how this plays out with the `trigger_every` trigger we used above. In the example below, we consider a window of 15 elements at a time, but emit the trigger every 6 elements. ```elixir window = Flow.Window.count(15) |> Flow.Window.trigger_every(6) Stream.repeatedly(fn -> 5 end) |> Flow.from_enumerable() |> Flow.partition(window: window, stages: 1) |> Flow.reduce(fn -> 0 end, &(&1 + &2)) |> Flow.emit(:state) |> Enum.take(10) ``` ## Stages As mentioned at the beginning of this livebook, Flow is built on top of GenStage. In many cases, you won't even notice it. However, it's important because it means all the GenStage features we know and love power Flow as well. For example, Flow pipelines have built-in backpressure through GenStage's demand. You can also use GenStage producers or producer-consumers to feed a Flow pipeline through [`Flow.from_stages/2`](https://hexdocs.pm/flow/Flow.html#from_stages/1). To control parallelism and batching in Flow, you'll see GenStage concepts pop up. For example, the `Flow.partition/2` function we used above uses a `:stages` option to control how many processes to use in the partitioning. You can also use `:min_demand` and `:max_demand` to control batching.
See source

Have you already installed Livebook?

If you already installed Livebook, you can configure the default Livebook location where you want to open notebooks.
Livebook up Checking status We can't reach this Livebook (but we saved your preference anyway)
Run notebook

Not yet? Install Livebook in just a minute

Livebook is open source, free, and ready to run anywhere.

Run on your machine

with Livebook Desktop

Run in the cloud

on select platforms

To run on Linux, Docker, embedded devices, or Elixir’s Mix, check our README.

PLATINUM SPONSORS
SPONSORS
Code navigation with go to definition of modules and functions Read More ×