<!-- livebook:{"persist_outputs":true} -->

# Distribute

## Section

A small toy showing how, given a stream, you might "fan out" and process different elements in separate streams, with one linked worker process per key. It is built from simple primitives like `Stream.resource` and `spawn_link`.

```elixir
defmodule Distribute do
  @moduledoc """
  Documentation for `Distribute`.
  """

  def distribute(enum, key_fun, stream_funs, opts \\ []) do
    stream_ref = make_ref()
    collect? = Keyword.get(opts, :collect?, true)

    Stream.transform(
      enum,
      # Accumulator: map of stream_id => {worker pid, worker ref}.
      fn -> %{} end,
      fn i, pids ->
        stream_id = key_fun.(i)

        case Map.fetch(pids, stream_id) do
          {:ok, {pid, _ref}} ->
            send(pid, {:stream_distribute_push, stream_ref, i})
            {[], pids}

          :error ->
            # First item for this key: start a worker, defaulting to the identity function.
            worker_ref = make_ref()

            pid =
              distributed_worker(
                self(),
                worker_ref,
                stream_ref,
                stream_funs[stream_id] || (& &1),
                collect?
              )

            send(pid, {:stream_distribute_push, stream_ref, i})
            {[], Map.put(pids, stream_id, {pid, worker_ref})}
        end
      end,
      fn pids ->
        if collect? do
          # Input exhausted: stop each worker in turn and emit its results.
          {pids
           |> Stream.flat_map(fn {_, {pid, worker_ref}} ->
             send(pid, {:stream_distribute_stop, stream_ref})

             Stream.resource(
               fn -> nil end,
               fn
                 :stop ->
                   {:halt, nil}

                 nil ->
                   receive do
                     {:stream_distribute_pull, ^worker_ref, ^stream_ref, :item, item} ->
                       {[item], nil}

                     {:stream_distribute_pull, ^worker_ref, ^stream_ref, :done} ->
                       {receive_all_pulls(worker_ref, stream_ref), :stop}
                   after
                     0 -> {[], nil}
                   end
               end,
               # Accept any state so an early halt while in :stop does not crash.
               fn _ -> :ok end
             )
           end), nil}
        else
          # Workers still need a stop message, otherwise they poll forever.
          Enum.each(pids, fn {_, {pid, _worker_ref}} ->
            send(pid, {:stream_distribute_stop, stream_ref})
          end)

          {[], nil}
        end
      end,
      fn _ -> :ok end
    )
  end

  defp distributed_worker(parent, worker_ref, stream_ref, stream_fun, collect?) do
    spawn_link(fn ->
      # Turn this worker's mailbox into a stream of pushed items.
      stream =
        Stream.resource(
          fn -> nil end,
          fn
            :stop ->
              {:halt, nil}

            nil ->
              receive do
                {:stream_distribute_push, ^stream_ref, item} ->
                  {[item], nil}

                {:stream_distribute_stop, ^stream_ref} ->
                  {receive_all_pushes(stream_ref), :stop}
              after
                0 -> {[], nil}
              end
          end,
          fn _ -> :ok end
        )

      stream = stream_fun.(stream)

      if collect? do
        Enum.each(
          stream,
          &send(parent, {:stream_distribute_pull, worker_ref, stream_ref, :item, &1})
        )

        send(parent, {:stream_distribute_pull, worker_ref, stream_ref, :done})
      else
        Stream.run(stream)
        send(parent, {:stream_distribute_pull, worker_ref, stream_ref, :done})
      end
    end)
  end

  # Drains every pending pull message for this worker, preserving arrival order.
  defp receive_all_pulls(worker_ref, stream_ref, acc \\ []) do
    receive do
      {:stream_distribute_pull, ^worker_ref, ^stream_ref, :item, item} ->
        receive_all_pulls(worker_ref, stream_ref, [item | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end

  # Drains every pending push message, preserving arrival order.
  defp receive_all_pushes(stream_ref, acc \\ []) do
    receive do
      {:stream_distribute_push, ^stream_ref, item} ->
        receive_all_pushes(stream_ref, [item | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end
```

<!-- livebook:{"output":true} -->

```
{:module, Distribute, <<70, 79, 82, 49, 0, 0, 26, ...>>, {:receive_all_pushes, 2}}
```

```elixir
processed =
  [{:a, 1}, {:a, 2}, {:b, 1}, {:b, 2}]
  |> Distribute.distribute(&elem(&1, 0), %{
    a: fn stream -> Stream.map(stream, &{:a, elem(&1, 1), self()}) end,
    b: fn stream -> Stream.map(stream, &{:b, elem(&1, 1), self()}) end
  })
  |> Enum.to_list()
  |> IO.inspect()

{as, bs} = Enum.split_with(processed, &(elem(&1, 0) == :a))
a_pids = as |> Enum.map(&elem(&1, 2)) |> Enum.uniq() |> IO.inspect()
b_pids = bs |> Enum.map(&elem(&1, 2)) |> Enum.uniq() |> IO.inspect()

a_pids != b_pids
```

<!-- livebook:{"output":true} -->

```
[
  {:a, 1, #PID<0.201.0>},
  {:a, 2, #PID<0.201.0>},
  {:b, 1, #PID<0.202.0>},
  {:b, 2, #PID<0.202.0>}
]
[#PID<0.201.0>]
[#PID<0.202.0>]
```

<!-- livebook:{"output":true} -->

```
true
```
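
The core trick in `Distribute` is treating a process mailbox as a stream. The cell below is a minimal sketch of that one idea in isolation, not part of the original notebook: the module name `MailboxStreamSketch`, its functions, and the `:push`, `:stop`, and `:results` message tags are all hypothetical names chosen for illustration. It also assumes a blocking `receive` is acceptable, whereas the notebook polls with `after 0`.

```elixir
defmodule MailboxStreamSketch do
  # Hypothetical helper: builds a stream that yields every {:push, ref, item}
  # message sent to the consuming process and halts on {:stop, ref}.
  def stream(ref) do
    Stream.resource(
      fn -> nil end,
      fn state ->
        # Blocking receive: waits for the next message instead of polling.
        receive do
          {:push, ^ref, item} -> {[item], state}
          {:stop, ^ref} -> {:halt, state}
        end
      end,
      fn _state -> :ok end
    )
  end

  # Spawns one linked worker that doubles each item it is sent, then
  # reports the collected results back to the caller.
  def run(items) do
    parent = self()
    ref = make_ref()

    worker =
      spawn_link(fn ->
        results = ref |> stream() |> Enum.map(&(&1 * 2))
        send(parent, {:results, ref, results})
      end)

    # Messages between two processes arrive in send order,
    # so every push is seen before the stop.
    Enum.each(items, &send(worker, {:push, ref, &1}))
    send(worker, {:stop, ref})

    receive do
      {:results, ^ref, results} -> results
    after
      1_000 -> raise "worker did not reply in time"
    end
  end
end

MailboxStreamSketch.run([1, 2, 3])
```

`Distribute` layers two things on top of this: a map from key to worker, so each key gets its own mailbox-backed stream, and a second mailbox-backed stream on the parent side that collects results from each worker.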