# State synchronization with state reducers

## State reducer

State is an inherent part of most applications and we have to deal with it on a daily basis. Whether it's some local information or a description of the whole running system, it all comes down to data. Let's consider a simple example:

```elixir
state = %{stack: []}
```

As a result of program evaluation, state may get transformed a number of times. Most of the time we operate on the state directly, that is, we modify the data structure to reflect the new situation. In functional languages like Elixir, we keep the original data intact and produce new data instead, but that's more of an implementation detail really. The important point is that we get to a new state.

```elixir
push = fn state, item ->
  update_in(state.stack, &[item | &1])
end

state |> push.(1) |> push.(2)
```

That's how we generally deal with data, by defining operations as functions. In other words, **functions are the API** used to transform the data. This approach does the job if we know all operations upfront, but what if the operations are determined by events occurring over the system lifetime? Events are just another kind of data, so ideally our API would be data-driven. That's where the **state reducer** comes in!

```elixir
state = %{stack: []}

# Reducer takes the current state and an operation
# to produce the new transformed state
reducer = fn
  state, {:push, item} -> update_in(state.stack, &[item | &1])
  state, :pop -> update_in(state.stack, &tl/1)
  state, :reverse -> update_in(state.stack, &Enum.reverse/1)
end

state
|> IO.inspect()
|> reducer.({:push, 1})
|> IO.inspect()
|> reducer.({:push, 2})
|> IO.inspect()
|> reducer.({:push, 3})
|> IO.inspect()
|> reducer.(:pop)
|> IO.inspect()
|> reducer.(:reverse)
|> IO.inspect()
|> reducer.(:pop)
```

In the above approach, each operation is described as a data structure (either `{:push, term}`, `:pop` or `:reverse`), so this time **data is the API** used to transform the data.
In some sense, we simply added a tiny layer of abstraction translating data (operations) into function calls. However, the difference lies mainly in the way we use and think about the new API. One important detail about a state reducer is that it should be a pure function. Since it's only supposed to transform the state data structure and we are within a functional language, this is the case more or less automatically.

### Implications

By modeling state operations as data structures, we can easily send them between processes or systems. That's exactly what we will explore later in the notebook. Additionally, with this approach we could keep track of the operations to implement an undo stack.

### State reducers in practice

It's likely that you already use the state reducer pattern quite often! If you're familiar with [Redux](https://redux.js.org/) for React, that's exactly the core concept behind it.

The name of `Enum.reduce/3` isn't accidental either! The accumulator is your state, while the enumerable is a series of operations we use to transform the data.

Finally, `GenServer` encapsulates state and transforms it based on events it receives, which makes it a kind of reducer too. This analogy is a bit loose though, because `GenServer` implementations generally have side effects.

## State synchronization

Now let's move to the actual problem. Imagine a number of processes (clients/users) that operate together on the same state. Initially every client gets a local copy of the current state, so they are aligned with each other. Then they start applying modifications to the state. The question is, how do we ensure all the local states stay in sync? Let's have a quick look at two models addressing this problem.

### Eventual consistency

In this model the processes modify their local state and communicate changes to all other processes.
The local states may differ at a specific point in time, but the goal is to ensure **eventual consistency**, meaning that eventually all local states get back in sync. The tricky part is that each process may get the peer changes in a different order, so we need an algorithm that figures out how to integrate these changes such that the final state ends up the same for everyone. So far, research has produced two categories of algorithms: **OT** (Operational Transformation) and **CRDT** (Conflict-free Replicated Data Type). You can find a brief overview of these in [this article](https://www.tiny.cloud/blog/real-time-collaboration-ot-vs-crdt). This approach surely sounds appealing, however these algorithms don't easily generalize to arbitrary data structures and operations, and can be quite complex on top of that.

### Serialized changes

As outlined above, the main challenge comes from the fact that processes may receive peer changes in a different order, so an obvious solution would be to enforce a specific order of changes (serialize them). In terms of implementation, all we need to do is pass all changes through an intermediate process that forwards each change to all client processes. The intermediate process handles only a single message at a time, so the changes are implicitly ordered and consequently every client process receives exactly the same stream of changes.

Naturally, the simplicity of this approach has its own pitfalls. All changes need to go through the intermediate process before their result is reflected in the local state, which is not viable in cases like text collaboration, where the UI must update immediately on every keystroke. Also, enforcing the order doesn't eliminate race conditions. Imagine two people removing an item from a one-item stack at the same time: we get two `:pop` changes and no matter which comes first, the second is invalid. More generally, applying one change may invalidate the intent of other changes applied afterwards.
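
To see why the order of changes matters at all, here is a minimal sketch that applies the same two operations to the same starting stack in both possible orders. It reuses the stack operations from the first section; the names `reduce`, `order_a` and `order_b` are introduced here only for illustration.

```elixir
# The stack reducer from the first section, redefined so this cell is self-contained
reduce = fn
  state, {:push, item} -> update_in(state.stack, &[item | &1])
  state, :pop -> update_in(state.stack, &tl/1)
  state, :reverse -> update_in(state.stack, &Enum.reverse/1)
end

initial = %{stack: [2, 1]}

# Two peers see the same pair of operations, but in a different order
order_a = [{:push, 3}, :reverse]
order_b = [:reverse, {:push, 3}]

state_a = Enum.reduce(order_a, initial, fn operation, state -> reduce.(state, operation) end)
state_b = Enum.reduce(order_b, initial, fn operation, state -> reduce.(state, operation) end)

IO.inspect(state_a, label: "order A")
IO.inspect(state_b, label: "order B")
```

The two results differ, which is why either a merging algorithm or a single agreed order is needed.
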

With all that said, these limitations may not be relevant to a given use case. When they aren't, this model is a perfect fit, because it's simple, not limited to specific data structures and ensures the local states change in exactly the same way over time.

## Serialized changes and state reducer

This whole discussion on changes/operations applied to state sounds like a **state reducer**, doesn't it? In the **serialized changes** model, each process starts with a state and then keeps applying operations to it as they come. In other words, each process keeps reducing the state upon subsequent operations. Now back to the code!

### Synchronized state

Let's define a data structure representing the piece of state we need to keep in sync, along with the `reduce` function handling a well-defined set of operations. That's essentially what we did in the first section, just a bit more organized.

**Note:** we are going to use the term "data" instead of "state" to avoid confusion with `GenServer` state later on.

```elixir
defmodule Data do
  # @derive must come before defstruct to take effect
  @derive Inspect
  defstruct stack: []

  @type t :: %__MODULE__{stack: list()}

  @type operation :: {:push, term()} | :pop | :reverse

  @doc """
  Returns initial data.
  """
  @spec new() :: t()
  def new(), do: %__MODULE__{}

  @doc """
  Applies the change specified by `operation` to `data`.
  """
  @spec reduce(t(), operation()) :: t()
  def reduce(data, operation)

  def reduce(data, {:push, item}) do
    update_in(data.stack, &[item | &1])
  end

  def reduce(data, :pop) do
    update_in(data.stack, &tl/1)
  end

  def reduce(data, :reverse) do
    update_in(data.stack, &Enum.reverse/1)
  end
end
```

### Server

Now we need a process for serializing the operations. It's gonna be the source of truth for our data and all operations will go through this process to implicitly impose an order of the operations. Since it serves as a communication proxy for all the client processes, we are going to just call it a **server**.
In our example the server will do the minimal job of tracking the clients and broadcasting operations. In a real use case it could additionally coordinate some work, given its central role.

```elixir
defmodule Server do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  @doc """
  Returns the current data and subscribes the caller to change operations.
  """
  def join(server) do
    GenServer.call(server, :join)
  end

  @doc """
  Notifies the server about an intended change, which gets broadcast to all the clients.
  """
  def submit_operation(server, operation) do
    GenServer.cast(server, {:submit_operation, operation})
  end

  @impl true
  def init(_opts) do
    {:ok, %{clients: [], data: Data.new()}}
  end

  @impl true
  def handle_call(:join, {pid, _}, state) do
    Process.monitor(pid)
    {:reply, state.data, %{state | clients: [pid | state.clients]}}
  end

  @impl true
  def handle_cast({:submit_operation, operation}, state) do
    # Send the operation to all the subscribed clients
    broadcast_operation(operation, state.clients)

    # Keep the updated data on the server, so that every
    # new client can get the latest data
    state = update_in(state.data, &Data.reduce(&1, operation))

    [
      :blue,
      "Server #{inspect(self())} received #{inspect(operation)}, " <>
        "new data: #{inspect(state.data)}"
    ]
    |> IO.ANSI.format()
    |> IO.puts()

    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    {:noreply, %{state | clients: List.delete(state.clients, pid)}}
  end

  defp broadcast_operation(operation, clients) do
    # In a production app we would use a pubsub instead
    for pid <- clients do
      send(pid, {:operation, operation})
    end
  end
end
```

### Client

Finally, we need a number of concurrent clients with a local copy of the data that we want to keep in sync. In our example we imitate a client process by exposing a `submit_operation` function to simulate the client making a change to the data.
The client processes are specific to a given use case. For instance, these could be LiveView processes that need to keep some state in sync, with the operations corresponding to user interactions.

```elixir
defmodule Client do
  use GenServer

  def start_link(server, opts \\ []) do
    opts = put_in(opts[:server], server)
    GenServer.start_link(__MODULE__, opts)
  end

  @doc """
  Originates a change operation from the given client.
  """
  def submit_operation(client, operation) do
    GenServer.cast(client, {:submit_operation, operation})
  end

  @impl true
  def init(opts) do
    server = opts[:server]
    color = opts[:color] || :light_black
    current_data = Server.join(server)
    {:ok, %{server: server, color: color, data: current_data}}
  end

  @impl true
  def handle_cast({:submit_operation, operation}, state) do
    Server.submit_operation(state.server, operation)
    {:noreply, state}
  end

  @impl true
  def handle_info({:operation, operation}, state) do
    # Upon receiving an operation, we apply it to the local data
    state = update_in(state.data, &Data.reduce(&1, operation))

    [
      state.color,
      "Client #{inspect(self())} received #{inspect(operation)}, " <>
        "new data: #{inspect(state.data)}"
    ]
    |> IO.ANSI.format()
    |> IO.puts()

    {:noreply, state}
  end
end
```

With these three pieces ready, let's start a server with some clients:

```elixir
{:ok, server} = Server.start_link()

{:ok, client_g} = Client.start_link(server, color: :green)
{:ok, client_y} = Client.start_link(server, color: :yellow)
```

And we can simulate change operations being submitted by the clients.

```elixir
client_g |> Client.submit_operation({:push, 1})
client_y |> Client.submit_operation({:push, 2})
client_y |> Client.submit_operation({:push, 3})
client_y |> Client.submit_operation(:reverse)
client_g |> Client.submit_operation(:pop)
client_y |> Client.submit_operation(:pop)
```

If we look closely at the output, we may notice that the server receives operations in a different order than the function calls above. That's because they are sent by concurrent clients.
However, the server, and consequently all the clients, get and process operations in the exact same order, ending up with the same data!

## Race conditions

As mentioned earlier, with this approach we are still open to race conditions. In our example, imagine the following scenario:

```elixir
Data.new()
# The stack has just a single element
|> Data.reduce({:push, 1})
# Two clients send a :pop change at the same time,
# it seems valid for both of them at that point
|> Data.reduce(:pop)
|> Data.reduce(:pop)
```

The kinds of race conditions and whether they are acceptable depend on the specific use case. The simplest solution is to validate incoming operations and ignore those that no longer make sense for the latest data. Implementation-wise we can modify `Data.reduce/2` to return either `{:ok, data}` or `:error`.

```elixir
defmodule DataWithValidation do
  # @derive must come before defstruct to take effect
  @derive Inspect
  defstruct stack: []

  @type t :: %__MODULE__{stack: list()}

  @type operation :: {:push, term()} | :pop | :reverse

  @doc """
  Returns initial data.
  """
  @spec new() :: t()
  def new(), do: %__MODULE__{}

  @doc """
  Applies the change specified by `operation` to `data`.
  """
  @spec reduce(t(), operation()) :: {:ok, t()} | :error
  def reduce(data, operation)

  def reduce(data, {:push, item}) do
    {:ok, update_in(data.stack, &[item | &1])}
  end

  def reduce(data, :pop) do
    # In this case using "with" is an overkill,
    # but this generally makes for a neat pattern
    # if the validation involves multiple checks
    with true <- data.stack != [] do
      {:ok, update_in(data.stack, &tl/1)}
    else
      _ -> :error
    end
  end

  def reduce(data, :reverse) do
    {:ok, update_in(data.stack, &Enum.reverse/1)}
  end
end
```

```elixir
alias DataWithValidation, as: Data

{:ok, data} = Data.new() |> Data.reduce({:push, 1})

# The first :pop operation is valid and updates the data
{:ok, data} = Data.reduce(data, :pop)

# The second :pop operation is invalid because the stack
# is empty, so we get an error instead
:error = Data.reduce(data, :pop)
```

The server and clients would then try applying the operation to the current data and ignore it if the result is `:error`.

## Livebook

Livebook is a collaborative web application: multiple users can edit the same notebook at the same time. You can verify this yourself by opening the same session in two browser tabs and applying changes in either of them.

When it comes to the implementation, each session (notebook page) has a server process coordinating the work and multiple LiveView processes, one per browser tab. All of these processes keep local data with the notebook content and additional evaluation information. To keep the data in sync, Livebook uses the synchronization pattern outlined above for all of the operations. The only exception is editing cell content, which is solved using Operational Transformation to provide proper conflict resolution in simultaneous text editing. However, the information and events necessary for OT are also modeled as part of the data with corresponding operations.

Feel free to have a look at [the source code](https://github.com/elixir-nx/livebook/blob/main/lib/livebook/session/data.ex) for more insights.

## Final notes

In this notebook we covered the **state reducer** pattern, which provides a data-centric API for state transformation. Then, we discussed the problem of **state synchronization** along with some approaches we may take to tackle it. Finally, we implemented synchronized state by **serializing** concurrent operations and using a **state reducer** for applying the updates.
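
As a closing sketch, the ideas above can be condensed into a few lines: a validated reducer folded over an ordered stream of operations with `Enum.reduce/3`, skipping operations that are no longer valid. This is only an illustration under simplifying assumptions. The module name `StackData` and the helper `apply_all/2` are hypothetical, and the data is a bare list rather than the struct used earlier.

```elixir
defmodule StackData do
  # Hypothetical, trimmed-down variant of the validated reducer,
  # operating on a plain list instead of a struct
  def reduce(stack, {:push, item}), do: {:ok, [item | stack]}
  def reduce([], :pop), do: :error
  def reduce([_ | rest], :pop), do: {:ok, rest}
  def reduce(stack, :reverse), do: {:ok, Enum.reverse(stack)}

  # Folds an ordered list of operations over the data,
  # keeping the previous data whenever an operation is invalid
  def apply_all(stack, operations) do
    Enum.reduce(operations, stack, fn operation, acc ->
      case reduce(acc, operation) do
        {:ok, new_acc} -> new_acc
        :error -> acc
      end
    end)
  end
end

# The serialized stream every client would receive,
# including a second :pop that lost the race
operations = [{:push, 1}, :pop, :pop, {:push, 2}, {:push, 3}, :reverse]

final_stack = StackData.apply_all([], operations)
IO.inspect(final_stack, label: "final stack")
```

Because every process folds the same stream with the same pure function, every process that starts from the same data arrives at the same result, including the decision to drop the invalid `:pop`.
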