# State synchronization with state reducers
## State reducer
State is an inherent piece to most applications and we have to
deal with it on daily basis. Whether it's some local information
or description of the whole running system, it all comes down to
data. Let's consider a simple example:
```elixir
state = %{stack: []}
```
As a result of program evaluation, state may get transformed
a number of times. Most of the time we operate on the state directly,
that is, we modify the data structure to reflect the new situation.
In functional languages like Elixir, we keep the original data
intact and produce new data instead, but that's more of an
implementation detail really. The important point is that we
get to a new state.
```elixir
push = fn state, item ->
update_in(state.stack, &[item | &1])
end
state
|> push.(1)
|> push.(2)
```
That's how we generally deal with data, by defining operations
as functions. In other words, **functions are the API** used
to transform the data.
This approach does the job if we know all operations upfront,
but what if the operations are determined by events occurring
over the system lifetime? Events are just another kind of data,
so ideally our API would be data-driven. That's where **state
reducer** comes in!
```elixir
state = %{stack: []}
# Reducer takes the current state and an operation
# to produce the new transformed state
reducer = fn
state, {:push, item} ->
update_in(state.stack, &[item | &1])
state, :pop ->
update_in(state.stack, &tl/1)
state, :reverse ->
update_in(state.stack, &Enum.reverse/1)
end
state
|> IO.inspect()
|> reducer.({:push, 1})
|> IO.inspect()
|> reducer.({:push, 2})
|> IO.inspect()
|> reducer.({:push, 3})
|> IO.inspect()
|> reducer.(:pop)
|> IO.inspect()
|> reducer.(:reverse)
|> IO.inspect()
|> reducer.(:pop)
```
In the above approach, each operation is described as a data
structure (either `{:push, term}`, `:pop` or `:reverse`), so
this time **data is the API** used to transform the data.
In some sense, we simply added a tiny layer of abstraction
translating data (operations) into function calls. However,
the difference lies mainly in the way we use and think about
the new API.
One important detail about state reducer is that it should be
a pure function, but since it's only supposed to transform the
state data structure and we are within a functional language
this is the case sort of automatically.
### Implications
By modeling state operations as data structures, we can easily
send them between processes or systems. That's exactly what
we will explore further into the notebook.
Additionally, with this approach we could keep track of the
operations to implement an undo stack.
### State reducers in practice
It's likely that you already use the state reducer pattern
quite often! If you're familiar with [Redux](https://redux.js.org/)
for React, that's exactly the core concept behind it.
The name of `Enum.reduce/3` isn't accidental either! The accumulator
is your state, while the enumerable is a series of operations
we use to transform the data.
Finally, `GenServer` encapsulates state and transforms it
based on events it receives, which makes it a kind of reducer
too. This analogy is a bit loose though, because `GenServer`
implementations generally have side effects.
## State synchronization
Now let's move to the actual problem. Imagine a number of processes
(clients/users) that operate together on the same state. Initially
every client gets a local copy of the current state, so they are
aligned with each other, then they intend to apply modifications to
the sate. The question is, how do we ensure all the local states
stay in sync?
Let's have a quick look at two models addressing this problem.
### Eventual consistency
In this model the processes modify their local state and communicate changes
to all other processes. The local states may differ at a specific point in
time, but the goal is to ensure **eventual consistency**, meaning that eventually
all local states get back in sync. The tricky part is that each process may
get the peer changes in different order, so we need an algorithm that figures
out how to integrate these changes such that the final state ends up the same
for everyone.
So far the research produced two categories of algorithms -
**OT** (Operational Transformation) and **CRDT** (Conflict-free Replicated Data Type).
You can find a brief overview of these in [this article](https://www.tiny.cloud/blog/real-time-collaboration-ot-vs-crdt).
This approach surely sounds appealing, however these algorithms don't easily
generalize to arbitrary data structures and operations, and can be quite complex
on top of that.
### Serialized changes
As outlined above, the main challenge comes from the fact that processes
may receive peer changes in different order, so an obvious solution
would be to enforce a specific order of changes (serialize them). In terms
of implementation all we need to do is pass all changes through an
intermediate process that forwards each change to all client processes.
The intermediate process handles only a single message at a time, so the
changes are implicitly ordered and consequently every client process receives
exactly the same stream of changes.
Naturally, the simplicity of this approach has its own pitfalls. All changes
need to go through the intermediate process before their result is reflected
in the local state, which is not viable in cases like text collaboration,
where the UI must update immediately on every keystroke. Also, enforcing the
order doesn't eliminate race conditions, imagine two people removing an item
from one-item stack at the same time - we get two `:pop` changes and no matter
which comes first, the second is invalid. Sometimes applying one change may
invalidate the intent of other changes applied afterwards.
With all that said, these limitations may not necessarily be relevant,
in which case this model is a perfect fit, because it's simple, not limited
to specific data structures and ensures the local states change in exactly
the same way over time.
## Serialized changes and state reducer
This whole discussion on changes/operations applied to state sounds
like **state reducer**, doesn't it? In the **serialized changes** model,
each process starts with a state and then keeps applying operations to
it as they come. In other words, each process keeps reducing the state
upon subsequent operations.
Now back to the code!
### Synchronized state
Let's define a data structure representing the piece of state we need
to keep in sync, along with the `reduce` method handling a well defined
set of operations. That's essentially what we did in the first section,
just a bit more organized.
**Note:** we are going to use the term "data" instead of "state" to avoid
confusion with `GenServer` state later on.
```elixir
defmodule Data do
defstruct stack: []
@derive Inspect
@type t :: %__MODULE__{stack: list()}
@type operation :: {:push, term()} | :pop | :reverse
@doc """
Returns initial data.
"""
@spec new() :: t()
def new(), do: %__MODULE__{}
@doc """
Applies the change specified by `operation` to `data`.
"""
@spec reduce(t(), operation()) :: t()
def reduce(data, operation)
def reduce(data, {:push, item}) do
update_in(data.stack, &[item | &1])
end
def reduce(data, :pop) do
update_in(data.stack, &tl/1)
end
def reduce(data, :reverse) do
update_in(data.stack, &Enum.reverse/1)
end
end
```
### Server
Now we need a process for serializing the operations.
It's gonna be the source of truth for our data and
all operations will go through this process to implicitly
impose an order of the operations. Since it serves as
a communication proxy for all the client processes,
we are going to just call it a **server**.
In our example the server will do the minimal job
of tracking the clients and broadcasting operations.
In a real use case it could additionally coordinate
some work, given its central role.
```elixir
defmodule Server do
use GenServer
def start_link() do
GenServer.start_link(__MODULE__, [])
end
@doc """
Returns the current data and subscribes the caller
to change operations.
"""
def join(server) do
GenServer.call(server, :join)
end
@doc """
Notifies the server about intended change, which
gets broadcasted to all the clients.
"""
def submit_operation(server, operation) do
GenServer.cast(server, {:submit_operation, operation})
end
@impl true
def init(_opts) do
{:ok, %{clients: [], data: Data.new()}}
end
@impl true
def handle_call(:join, {pid, _}, state) do
Process.monitor(pid)
{:reply, state.data, %{state | clients: [pid | state.clients]}}
end
@impl true
def handle_cast({:submit_operation, operation}, state) do
# Send the operation to all the subscribed clients
broadcast_operation(operation, state.clients)
# Keep the updated data on the server, so that every
# new client can get the latest data
state = update_in(state.data, &Data.reduce(&1, operation))
[
:blue,
"Server #{inspect(self())} received #{inspect(operation)}, " <>
"new data: #{inspect(state.data)}"
]
|> IO.ANSI.format()
|> IO.puts()
{:noreply, state}
end
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
{:noreply, %{state | clients: List.delete(state.clients, pid)}}
end
defp broadcast_operation(operation, clients) do
# In a production app we would use a pubsub instead
for pid <- clients do
send(pid, {:operation, operation})
end
end
end
```
### Client
Finally, we need a number of concurrent clients with
a local copy of the data that we want to keep in sync.
In our example we imitate a client process by exposing
a `submit_operation` function to simulate the client
making a change to the data. The client processes are
specific to given use case, for instance these could be
LiveView processes willing to keep some state in sync
and the operations would correspond to user interactions.
```elixir
defmodule Client do
use GenServer
def start_link(server, opts \\ []) do
opts = put_in(opts[:server], server)
GenServer.start_link(__MODULE__, opts)
end
@doc """
Originates a change operation from the given client.
"""
def submit_operation(client, operation) do
GenServer.cast(client, {:submit_operation, operation})
end
@impl true
def init(opts) do
server = opts[:server]
color = opts[:color] || :light_black
current_data = Server.join(server)
{:ok, %{server: server, color: color, data: current_data}}
end
@impl true
def handle_cast({:submit_operation, operation}, state) do
Server.submit_operation(state.server, operation)
{:noreply, state}
end
@impl true
def handle_info({:operation, operation}, state) do
# Upon receiving an operation, we apply it to the local data
state = update_in(state.data, &Data.reduce(&1, operation))
[
state.color,
"Client #{inspect(self())} received #{inspect(operation)}, " <>
"new data: #{inspect(state.data)}"
]
|> IO.ANSI.format()
|> IO.puts()
{:noreply, state}
end
end
```
With these three pieces ready, let's start a server
with some clients:
```elixir
{:ok, server} = Server.start_link()
{:ok, client_g} = Client.start_link(server, color: :green)
{:ok, client_y} = Client.start_link(server, color: :yellow)
```
And we can simulate change operations being submitted by
the clients.
```elixir
client_g |> Client.submit_operation({:push, 1})
client_y |> Client.submit_operation({:push, 2})
client_y |> Client.submit_operation({:push, 3})
client_y |> Client.submit_operation(:reverse)
client_g |> Client.submit_operation(:pop)
client_y |> Client.submit_operation(:pop)
```
If we look closely at the output, we may notice that the server
receives operations in different order than the function calls above,
that's because they are sent by concurrent clients. However, the server,
and consequently all the clients, get and process operations in the
exact same order, ending up with the same data!
## Race conditions
As mentioned eralier, with this approach we are still open
to race conditions. In our example, imagine the following
scenario:
```elixir
Data.new()
# The stack has just a single element
|> Data.reduce({:push, 1})
# Two clients send a :pop change at the same time,
# it seems valid for both of them at that point
|> Data.reduce(:pop)
|> Data.reduce(:pop)
```
The kinds of race conditions and whether they are acceptable
depend on the specific use case. The simplest solution is
to validate incoming operations and ignore those that no longer
make sense for the latest data.
Implementation-wise we can modify `Data.reduce/2` to return
either `{:ok, data}` or `:error`.
```elixir
defmodule DataWithValidation do
defstruct stack: []
@derive Inspect
@type t :: %__MODULE__{stack: list()}
@type operation :: {:push, term()} | :pop | :reverse
@doc """
Returns initial data.
"""
@spec new() :: t()
def new(), do: %__MODULE__{}
@doc """
Applies the change specified by `operation` to `data`.
"""
@spec reduce(t(), operation()) :: {:ok, t()} | :error
def reduce(data, operation)
def reduce(data, {:push, item}) do
{:ok, update_in(data.stack, &[item | &1])}
end
def reduce(data, :pop) do
# In this case using "with" is an overkill,
# but this generally makes for a neat pattern
# if the validation involves multiple checks
with true <- data.stack != [] do
{:ok, update_in(data.stack, &tl/1)}
else
_ -> :error
end
end
def reduce(data, :reverse) do
{:ok, update_in(data.stack, &Enum.reverse/1)}
end
end
```
```elixir
alias DataWithValidation, as: Data
{:ok, data} = Data.new() |> Data.reduce({:push, 1})
# The first :pop operation is valid and updates the data
{:ok, data} = Data.reduce(data, :pop)
# The second :pop operation is invalid becuase the stack
# is empty, so we get an error instead
:error = Data.reduce(data, :pop)
```
The server/clients would then try applying the operation
to current data and ignore it if the result is an `:error`.
## Livebook
Livebook is a collaborative web application - multiple users can edit
the same notebook at the same time. You can verify this yourself by
opening the same session in two browser tabs and applying changes in
either of them.
When it comes to the implementation, each session (notebook page) has
a server process coordinating the work and multiple LiveView
processes - one per each browser tab. All of these processes keep
a local data with the notebook content and additional evaluation information.
To keep the data in sync Livebook uses the synchronization pattern outlined
above for all of the operations. The only exception is editing cell content,
which is solved using Operational Transformation to provide proper conflict
resolution in simultaneous text editing. However, the information and events
necessary for OT are also modeled as part of the data with corresponding
operations.
Feel free to have a look at [the source code](https://github.com/elixir-nx/livebook/blob/main/lib/livebook/session/data.ex)
for more insights.
## Final notes
In this notebook we covered the **state reducer** pattern,
which provides a data-centric API to state transformation.
Then, we discussed the problem of **state synchronization**
along with some approaches we may take to takle it. Finally,
we implemented synchronized state by **serializing** concurrent
operations and using **state reducer** for applying the updates.