# Elixir notes on processes: Task, GenServer, Supervisor
```elixir
Mix.install([
{:kino, "~> 0.8.0"},
{:httpoison, "~> 2.0"},
{:jason, "~> 1.4"}
])
Supervisor.start_link(
[
{DynamicSupervisor, name: DynSup, strategy: :one_for_one},
{Registry, keys: :unique, name: MyRegistry},
{Task.Supervisor, name: MyTaskSupervisor}
],
strategy: :one_for_one,
name: MySupervisor
)
```
## Introduction
Elixir/Erlang offers two main abstractions to run code concurrently: [Task](https://hexdocs.pm/elixir/1.14.3/Task.html) and [GenServer](https://hexdocs.pm/elixir/1.14.3/GenServer.html). When you need concurrency to run a short-lived, _stateless_ function that you don't interact with, you use the `Task` module. When you need a long-running, _stateful_ process that you control and interact with, you run the code in a `GenServer`.
## Task
Suppose you have to send millions of emails. Sending one email is a short-lived job that exits as soon as it completes, so you can run each send in a `Task` to leverage concurrency, with [Task.async](https://hexdocs.pm/elixir/1.14.3/Task.html#async/1).
<!-- livebook:{"break_markdown":true} -->
We spawn 2 processes:
```elixir
defmodule Email1 do
require Logger
@emails ["a@com", "b@com"]
def send_email(email) do
Process.sleep(1_000)
Logger.info("#{email} received")
end
def run do
Enum.each(@emails, fn email -> Task.async(fn -> send_email(email) end) end)
end
end
Time.utc_now() |> IO.inspect(label: :before)
Email1.run()
Time.utc_now() |> IO.inspect(label: :after)
```
> Because `Task.async` links the task to the caller, a crash in the task propagates to the caller (see "Supervisor" further down).
<!-- livebook:{"break_markdown":true} -->
##### Get the response
We can use `Enum.each` if we just want to run the tasks; it returns `:ok`. If we want the responses of the tasks, we use `Enum.map` instead, because it returns the list of corresponding `Task` structs. These structs are passed to the [Task.await](https://hexdocs.pm/elixir/1.14.3/Task.html#await/2) or [Task.yield](https://hexdocs.pm/elixir/1.14.3/Task.html#yield/2) functions to get the results.
`Task.await` returns the result directly and exits the caller if the task fails or does not reply in time, whereas `Task.yield` returns `{:ok, result}`, `{:exit, reason}`, or `nil` when the timeout is reached. Both take an optional timeout in milliseconds as second argument, which defaults to 5000.
```elixir
t_await = Task.async(fn -> 1 end)
t_yield = Task.async(fn -> 2 end)
%{task: t_await, _await: Task.await(t_await), _yield: Task.yield(t_yield)}
```
We set the optional "timeout" value in ms in the [Task.yield](https://hexdocs.pm/elixir/1.14.3/Task.html#yield/2) function. When the task isn't completed within this period, it returns `nil`.
```elixir
Task.async(fn -> Process.sleep(1000) end)
|> Task.yield(500)
```
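To see the three possible outcomes of `Task.yield` side by side, here is a small sketch. It starts its own unnamed `Task.Supervisor` (an assumption, so the cell does not depend on the setup cell) and uses `async_nolink`, because a crash in a task started with plain `Task.async` would also crash the caller.

```elixir
# A private, unnamed Task.Supervisor: this cell does not rely on MyTaskSupervisor.
{:ok, sup} = Task.Supervisor.start_link()

# 1. The task finished in time: {:ok, result}
ok_task = Task.Supervisor.async_nolink(sup, fn -> 1 + 1 end)
{:ok, 2} = Task.yield(ok_task, 1_000)

# 2. The task crashed. async_nolink does not link it to the caller,
#    so the caller survives and yield returns {:exit, reason}.
crash_task = Task.Supervisor.async_nolink(sup, fn -> raise "boom" end)
{:exit, {%RuntimeError{message: "boom"}, _stacktrace}} = Task.yield(crash_task, 1_000)

# 3. The task is still running when the timeout expires: nil.
#    It keeps running afterwards, so we stop it explicitly.
slow_task = Task.Supervisor.async_nolink(sup, fn -> Process.sleep(5_000) end)
nil = Task.yield(slow_task, 100)
nil = Task.shutdown(slow_task, :brutal_kill)
```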
Unlike `await`, which exits the caller when the timeout is reached, `yield` can be called repeatedly until the task completes:
```elixir
defmodule Continue do
require Logger
def _yield(task, timeout: time) do
case Task.yield(task, time) do
nil ->
Logger.info("called again")
_yield(task, timeout: time)
result ->
result
end
end
end
Task.async(fn -> Process.sleep(3000) end)
|> Continue._yield(timeout: 500)
```
#### `Task.async_stream`
Instead of combining `Enum.map` with `Task.async` and `Task.await`, we can do it in one go with `Task.async_stream`, which creates one task per item of an enumerable. The key point is that we can set a _limit on the number of tasks_ running at the same time with `:max_concurrency`. This provides _back-pressure_.
`Task.async_stream` returns a lazy stream: no task is started until the stream is enumerated. We use `Stream.run/1` to run it when we don't care about the results (it returns `:ok`). If we want the results, we use `Enum.to_list/1`, which returns one `{:ok, result}` tuple per item.
```elixir
chars = Enum.map(?a..?l, fn x -> <<x::utf8>> <> "@com" end)
```
With `max_concurrency: 4`, at most 4 tasks run at the same time, so the 12 emails (500 ms each) are processed in 3 rounds, in about 1.5 s.
```elixir
defmodule MyTasks do
require Logger
@emails chars
def send_email(email) do
Process.sleep(500)
Logger.info("#{inspect(self())} processed #{email}")
end
def back_pressured do
Task.Supervisor.async_stream_nolink(
MyTaskSupervisor,
@emails,
&send_email(&1),
max_concurrency: 4,
ordered: false
)
end
end
Time.utc_now() |> IO.inspect(label: :before)
MyTasks.back_pressured() |> Stream.run()
Time.utc_now() |> IO.inspect(label: :after)
# |> Enum.to_list()
```
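To collect the results instead of discarding them with `Stream.run`, we enumerate the stream. The minimal sketch below (plain `Task.async_stream` on a toy squaring function) shows that building the stream runs nothing, and that `Enum.to_list/1` returns one `{:ok, result}` per input:

```elixir
# Building the stream does not start any task yet.
stream = Task.async_stream(1..5, fn n -> n * n end, max_concurrency: 2)

# Enumerating it runs the tasks (at most 2 at a time) and yields one
# {:ok, result} tuple per input, in input order (ordered: true is the default).
results = Enum.to_list(stream)
# results == [ok: 1, ok: 4, ok: 9, ok: 16, ok: 25]

# Stream.run/1 runs the tasks again but discards the results.
:ok = Stream.run(stream)
```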
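Useful options are:
* `timeout` (5000 ms by default): the maximum time each task may run; combined with `on_timeout: :kill_task`, a task that exceeds it is killed and emits `{:exit, :timeout}` instead of exiting the caller,
* `ordered: false`: emit the results as soon as they are ready instead of in input order.
`max_concurrency` defaults to `System.schedulers_online()`, the number of online schedulers (by default, the number of available cores).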
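A sketch of `timeout` combined with `on_timeout: :kill_task`, assuming toy inputs that are sleep durations in milliseconds: the last one is too slow, and it shows up as `{:exit, :timeout}` instead of crashing the caller.

```elixir
results =
  [50, 100, 2_000]
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    max_concurrency: 3,
    timeout: 500,
    # kill a task that runs longer than 500 ms and emit {:exit, :timeout}
    # for it, instead of exiting the caller (the default, on_timeout: :exit)
    on_timeout: :kill_task
  )
  |> Enum.to_list()

# results == [ok: 50, ok: 100, exit: :timeout]
```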
<!-- livebook:{"break_markdown":true} -->
### Example of task: cron job
We want to mimic a cron job with a Task. The task runs another task, and the whole thing becomes a long-running process because the main process runs it again each time it receives a delayed message.
We use `Process.send_after` instead of [sleep](https://hexdocs.pm/elixir/Process.html#sleep/1). This requires a permanent message receiver, the `Listen.loop` below. Another implementation is done further down with a GenServer, which handles messages natively, among other benefits.
```elixir
defmodule Period do
use Task
def start(args) do
args = Keyword.merge(args, parent: self())
Task.start(__MODULE__, :do_run, [args])
end
def do_run([every: time, run: f, parent: parent] = args) do
Task.async(fn -> apply(f, []) end)
# send to the main process
Process.send_after(parent, {:loop, args}, time)
end
end
defmodule Clean do
require Logger
def up, do: Logger.info("cleaned!")
end
# continuous listener on the main process
defmodule Listen do
def loop do
receive do
{:loop, args} ->
# on message, run the task
Period.do_run(args)
loop()
after
10_000 -> :timeout
end
end
end
##### The final API:
pid =
spawn(fn ->
Period.start(
every: 3_000,
run: fn -> Clean.up() end
)
Listen.loop()
end)
Process.sleep(10_000)
Process.exit(pid, :shutdown)
```
> It is preferable to use the supervised form [Task.Supervisor.async_nolink](https://hexdocs.pm/elixir/1.14.3/Task.Supervisor.html#async_nolink/3). See further down.
## GenServer
A `GenServer` is a _(spawned)_ process; like every process, it is single-threaded. It is a mechanism to run a long-lived _stateful_ process that can communicate with other processes. It works with client-side functions, server-side callbacks and message passing. Messages are processed _sequentially_, one at a time.
Client functions are ordinary public functions that can be called from any process. Callbacks are invoked by the `GenServer` machinery inside the server process: you implement them in the module that uses the `GenServer` behaviour, but you don't call them directly.
Further reading on when (not) to use a GenServer: [To spawn, or not to spawn?](https://www.theerlangelist.com/article/spawn_or_not)
> A process in `Erlang/Elixir` is a memory isolated and single threaded code runner; it has a mailbox and an address and communicates with asynchronous message passing.
The main client functions and the server callbacks they trigger are listed below (cf the [GenServer cheat sheet](https://elixir-lang.org/downloads/cheatsheets/gen-server.pdf)).
<!-- livebook:{"break_markdown":true} -->
| client / callback | what it does | replies? | what the callback returns |
| --- | --- | :------: | --- |
| `GenServer.start(module, init_arg)` $\to$ `init(init_arg)` | spawns a GenServer and initializes its state; the client gets `{:ok, pid}` or `{:error, reason}` | yes | `{:ok, state}` or `{:stop, reason}` |
| `GenServer.call(server, request)` $\to$ `handle_call(request, from, state)` | synchronous request: the caller waits for `response` | yes | `{:reply, response, state}` |
| `GenServer.cast(server, request)` $\to$ `handle_cast(request, state)` | asynchronous request without a response | no | `{:noreply, state}` |
| `GenServer.call` $\to$ `handle_call` + `GenServer.reply(from, response)` | delayed response: the callback returns without replying and answers later | yes | `{:noreply, state}` (`GenServer.reply/2` returns `:ok`) |
| `send(pid, msg)` $\to$ `handle_info(msg, state)` | handles a message from any process without a response | no | `{:noreply, state}` |
<!-- livebook:{"break_markdown":true} -->
You can use client functions in the callbacks, except:
> ❗ It is [not recommended](https://hexdocs.pm/elixir/1.14.3/Task.html#await/2-compatibility-with-otp-behaviours) to `await` a long-running task inside a GenServer. Such a task is monitored by the GenServer and sends it two messages, `{ref, result}` and `{:DOWN, ref, :process, pid, reason}`, which should be captured in `handle_info`. You can also return a delayed response with `GenServer.reply` from a `GenServer.call` (see the example in "Running Async Tasks").
> ❗ A GenServer _cannot_ `GenServer.call` itself from one of its callbacks (`handle_call`, `handle_cast`, `handle_info`...): it is busy running the callback, so it could never answer, and `GenServer.call` exits with `:calling_self`. Use `send(self(), msg)` and `handle_info` instead.
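The rows of the table can be put together in one small GenServer. `CounterSketch` is a hypothetical module written for this note: `get` uses `call`, `incr` uses `cast`, `:reset` is a plain message handled by `handle_info`, and `slow_get` shows a delayed reply, where `handle_call` returns `{:noreply, state}` and answers later with `GenServer.reply/2`.

```elixir
defmodule CounterSketch do
  use GenServer

  # client functions: they run in the caller's process
  def start(initial), do: GenServer.start(__MODULE__, initial)
  def get(pid), do: GenServer.call(pid, :get)
  def incr(pid), do: GenServer.cast(pid, :incr)
  def slow_get(pid), do: GenServer.call(pid, :slow_get)

  # callbacks: they run in the GenServer process
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_call(:get, _from, state), do: {:reply, state, state}

  def handle_call(:slow_get, from, state) do
    # don't reply now: the caller stays blocked in GenServer.call until
    # GenServer.reply/2 is called from handle_info 100 ms later
    Process.send_after(self(), {:reply_later, from}, 100)
    {:noreply, state}
  end

  @impl true
  def handle_cast(:incr, state), do: {:noreply, state + 1}

  @impl true
  def handle_info({:reply_later, from}, state) do
    :ok = GenServer.reply(from, state)
    {:noreply, state}
  end

  def handle_info(:reset, _state), do: {:noreply, 0}
end

{:ok, pid} = CounterSketch.start(10)
:ok = CounterSketch.incr(pid)
11 = CounterSketch.get(pid)
11 = CounterSketch.slow_get(pid)
send(pid, :reset)
0 = CounterSketch.get(pid)
```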
<!-- livebook:{"break_markdown":true} -->
##### Get info on a process
The module [Process](https://hexdocs.pm/elixir/1.14/Process.html) offers routines such as `Process.alive?(pid)` or `Process.info(pid)`, which returns information such as the linked processes (`|> Keyword.get(:links)`) or the length of the message queue (`|> Keyword.get(:message_queue_len)`).
The Erlang module [:sys](https://hexdocs.pm/elixir/1.14.3/GenServer.html#module-debugging-with-the-sys-module) offers debugging primitives, for example `:sys.trace(pid, true)` to trace events and `:sys.get_state(pid)` to retrieve the state.
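A quick sketch of these routines on a throwaway GenServer (`InspectMe` is a hypothetical name). `Process.info/2` also accepts a list of keys and then returns only those entries; `:sys.replace_state/2` is shown only for experiments in a notebook.

```elixir
defmodule InspectMe do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}
end

# GenServer.start: no link with the caller
{:ok, pid} = GenServer.start(InspectMe, %{count: 0})
true = Process.alive?(pid)

# only the requested entries, as a keyword list
[links: [], message_queue_len: 0] = Process.info(pid, [:links, :message_queue_len])

%{count: 0} = :sys.get_state(pid)
# returns the new state
%{count: 1} = :sys.replace_state(pid, fn state -> %{state | count: 1} end)

# GenServer.stop/1 waits until the process has exited
:ok = GenServer.stop(pid)
false = Process.alive?(pid)
```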
<!-- livebook:{"break_markdown":true} -->
##### Note on the `__MODULE__` placeholder
`__MODULE__` is a special form, evaluated at compile time, that returns the **current module name as an atom**. It is convenient to use it instead of writing the literal module name.
```elixir
defmodule Test do
IO.puts("This module is named #{__MODULE__} and is an atom: #{is_atom(__MODULE__)}")
end
is_atom(Test)
```
## How to start a GenServer
A GenServer is declared with `use GenServer` and started with `GenServer.start_link/3` or `GenServer.start/3` (usually wrapped in a `start_link` function of the module), to which you pass the module name, the initial argument and options: `GenServer.start_link(MyModule, init_arg, options)`. This invokes the `init/1` callback in the new process to build the initial state, and returns `{:ok, pid}`, or `{:error, reason}` (for example `{:error, {:already_started, pid}}` when the requested name is already taken). We are required to define the `init` callback. Add `@impl true` above a callback so the compiler checks that it matches a callback of the behaviour.
An example of a minimal (and useless) genserver - where we put $1$ in its state - would be:
```elixir
defmodule MinimalGS do
use GenServer
@impl true
def init(1), do: {:ok, 1}
end
# instantiate
{:ok, pid} = GenServer.start(MinimalGS, 1)
```
We can check its state:
```elixir
:sys.get_state(pid)
```
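Besides `{:ok, state}`, `init/1` may also return `{:stop, reason}` or `:ignore`, and `GenServer.start` reports it to the caller. A minimal sketch with a hypothetical `InitResults` module:

```elixir
defmodule InitResults do
  use GenServer

  @impl true
  def init(:ok), do: {:ok, :ready}
  # the process exits and the caller gets {:error, reason}
  def init(:fail), do: {:stop, :bad_config}
  # the process exits normally and the caller gets :ignore
  def init(:skip), do: :ignore
end

{:ok, _pid} = GenServer.start(InitResults, :ok)
{:error, :bad_config} = GenServer.start(InitResults, :fail)
:ignore = GenServer.start(InitResults, :skip)
```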
You usually write client functions that wrap `GenServer.call` or `GenServer.cast`; the message they send is _pattern matched_ by the server callbacks. In the module below, we have implemented only the callbacks, and we call `GenServer.call` directly:
```elixir
defmodule FirstGs do
use GenServer
@impl true
def init(init_args), do: {:ok, init_args}
@impl true
def handle_call(:double, _from, state), do: {:reply, state * 2, state}
def handle_call({:triple, n}, _from, state), do: {:reply, n * 3, state}
end
```
You can instantiate genservers "on-the-fly", identified by their PID:
```elixir
for i <- 1..3 do
{:ok, pid} = GenServer.start(FirstGs, i)
%{
pid: pid,
double: GenServer.call(pid, :double),
triple: GenServer.call(pid, {:triple, i}),
state: :sys.get_state(pid)
}
end
```
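The same server with client functions added, as one would usually write it. `FirstGsWithClient` is a hypothetical variant of `FirstGs`: the client functions run in the caller's process, the callbacks in the GenServer process.

```elixir
defmodule FirstGsWithClient do
  use GenServer

  # client functions
  def start(n), do: GenServer.start(__MODULE__, n)
  def double(pid), do: GenServer.call(pid, :double)
  def triple(pid, n), do: GenServer.call(pid, {:triple, n})

  # callbacks
  @impl true
  def init(init_args), do: {:ok, init_args}

  @impl true
  def handle_call(:double, _from, state), do: {:reply, state * 2, state}
  def handle_call({:triple, n}, _from, state), do: {:reply, n * 3, state}
end

{:ok, pid} = FirstGsWithClient.start(5)
10 = FirstGsWithClient.double(pid)
12 = FirstGsWithClient.triple(pid, 4)
```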
## How to stop a GenServer
From the outside, call `GenServer.stop(pid, reason)` (the reason defaults to `:normal`); from the inside, return `{:stop, reason, state}` from a callback. In both cases the optional callback `terminate/2` is invoked and can be used to handle cleanups before termination.
A reason other than `:normal`, `:shutdown` or `{:shutdown, term}` is treated as a crash by OTP (an error is logged). You may set `Process.flag(:trap_exit, true)` in the `init` callback so that an exit signal from the parent (e.g. a supervisor) is captured and `terminate` runs (e.g. to save the state into a database before exiting).
A callback can also `raise "message"`, which crashes the process.
<!-- livebook:{"force_markdown":true} -->
```elixir
GenServer.stop(pid, :reason)
# or return a `:stop` tuple from a callback
{:stop, :reason, state}
# or raise in the callback
raise "Stop me"
# or `Process.exit` in a callback
Process.exit(self(), :reason)
```
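A sketch of the two most common ways to stop, with a `terminate/2` that reports the reason back to the caller instead of doing real cleanup (`StopSketch` is a hypothetical module). We use `GenServer.start`, so the GenServer is not linked to the notebook.

```elixir
defmodule StopSketch do
  use GenServer

  @impl true
  def init(notify_pid) do
    # trap exits so that an exit signal from the parent (e.g. a supervisor)
    # also runs terminate/2 instead of killing the process abruptly
    Process.flag(:trap_exit, true)
    {:ok, notify_pid}
  end

  @impl true
  def handle_cast(:stop_me, notify_pid), do: {:stop, :normal, notify_pid}

  @impl true
  def terminate(reason, notify_pid) do
    # cleanup would go here; we only report the reason
    send(notify_pid, {:terminated, reason})
  end
end

# 1. stopped from the outside with GenServer.stop/2
{:ok, pid1} = GenServer.start(StopSketch, self())
:ok = GenServer.stop(pid1, :shutdown)

reason1 =
  receive do
    {:terminated, reason} -> reason
  after
    1_000 -> :no_terminate
  end

# 2. stopped from the inside by returning {:stop, reason, state}
{:ok, pid2} = GenServer.start(StopSketch, self())
GenServer.cast(pid2, :stop_me)

reason2 =
  receive do
    {:terminated, reason} -> reason
  after
    1_000 -> :no_terminate
  end

# reason1 == :shutdown, reason2 == :normal
```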
More in-depth material:
* this blog post on [how to stop a GenServer](https://alexcastano.com/how-to-stop-a-genserver-in-elixir/)
* a nice `Livebook` about this: [Run in Livebook](https://livebook.dev/run?url=https%3A%2F%2Fgithub.com%2Fpaulanthonywilson%2Ffurlough%2Fblob%2Fmaster%2F_posts%2Flivebook%2Fvaried-ways-to-kill.livemd)
```elixir
if Process.alive?(pid) do
:sys.get_state(pid) |> IO.inspect(label: "the state of the genserver is")
GenServer.stop(pid)
Process.alive?(pid)
end
```
##### Example of genserver usage
Long-running code is discouraged in the `init` callback, because the caller of `start_link` is blocked until `init` returns. You can instead return `{:ok, state, {:continue, term}}` from `init` and implement the `handle_continue/2` callback. `start_link` then returns right away, and the continue step is guaranteed to finish before the GenServer handles any other message, which avoids potential race conditions.
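A minimal sketch of this pattern, where a `Process.sleep/1` stands in for the slow initialization (`ContinueSketch` is a hypothetical module). The `GenServer.call` is sent right after `start` returns, yet it is only processed once `handle_continue/2` is done:

```elixir
defmodule ContinueSketch do
  use GenServer

  def start(arg), do: GenServer.start(__MODULE__, arg)

  @impl true
  def init(arg) do
    # return quickly: the caller of start/1 is unblocked now
    {:ok, %{arg: arg, loaded: nil}, {:continue, :load}}
  end

  @impl true
  def handle_continue(:load, state) do
    # simulated slow work; no message is handled before this returns
    Process.sleep(200)
    {:noreply, %{state | loaded: :done}}
  end

  @impl true
  def handle_call(:loaded, _from, state), do: {:reply, state.loaded, state}
end

{:ok, pid} = ContinueSketch.start(:some_arg)
# queued while handle_continue runs, answered afterwards
:done = GenServer.call(pid, :loaded)
```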
<!-- livebook:{"break_markdown":true} -->
The following usage of a GenServer is only an illustration: a simple Task, as used above, is enough since we don't need to interact with the job. We implement the cron job again, this time with a GenServer that uses `handle_continue`. Once launched, it runs a recurrent job forever; we stop the process after about 11 seconds.
```elixir
defmodule Periodic do
use GenServer
require Logger
@start Time.utc_now()
defp time, do: Time.diff(Time.utc_now(), @start, :millisecond)
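# named start_link by convention, but uses GenServer.start so that stopping it does not take down the Livebook
def start_link(opts), do: GenServer.start(__MODULE__, opts)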
def init(args) do
Logger.info("#{inspect(time())}ms: First Init")
{:ok, args, {:continue, :run}}
end
def handle_continue(:run, [every: time, run: _f] = state) do
Logger.info("#{inspect(time())}ms: Continue")
Process.send_after(self(), :repeat, time)
{:noreply, state}
end
def handle_info(:repeat, [every: time, run: myfun] = state) do
Logger.info("#{inspect(time())}ms: repeat")
Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> apply(myfun, []) end)
Process.send_after(self(), :repeat, time)
{:noreply, state}
end
def handle_info({_, msg}, state) do
Logger.info("message from task: #{msg}")
{:noreply, state}
end
def handle_info(_msg, state) do
# IO.inspect(msg)
{:noreply, state}
end
end
defmodule Send do
def cleaner do
Process.sleep(500)
:cleaned!
end
end
{:ok, pid} =
Periodic.start_link(
every: 3_000,
run: fn -> Send.cleaner() end
)
Process.sleep(11_000)
GenServer.stop(pid, :shutdown)
```
## Linking GenServers
`Process.link/1` links the calling process with the process whose PID is given as argument. Links are **bi-directional**. This is what `GenServer.start_link` does: it links the new GenServer with the caller. If you instantiate with `GenServer.start`, no link is created.
<!-- livebook:{"break_markdown":true} -->
> ❗ If a process is started with `start_link` from a Livebook cell, it is linked to the current process, the `Livebook` evaluator. This means the Livebook fails when the GenServer exits with any reason other than `:normal` (including `:shutdown`). One can use `Process.flag(:trap_exit, true)` to receive the exit signal as a message `{:EXIT, from, reason}` instead. In Elixir, the `Supervisor` behaviour is meant to manage these links and restart policies.
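A small sketch of a link and of `:trap_exit`, using bare processes. The experiment runs inside a throwaway spawned process so that the notebook itself is never linked; the result is sent back as a message.

```elixir
test_pid = self()

spawn(fn ->
  # exit signals from linked processes now arrive as {:EXIT, pid, reason} messages
  Process.flag(:trap_exit, true)

  child =
    spawn(fn ->
      receive do
        :crash -> exit(:boom)
      end
    end)

  # link this throwaway process with `child`; the link is bi-directional
  true = Process.link(child)
  send(child, :crash)

  receive do
    {:EXIT, ^child, reason} -> send(test_pid, {:child_exited, reason})
  end
end)

exit_reason =
  receive do
    {:child_exited, reason} -> reason
  after
    1_000 -> :no_exit_message
  end

# exit_reason == :boom
```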
<!-- livebook:{"break_markdown":true} -->
##### Example
Let's give an example with two linked GenServers. The GenServer `Parent` is instantiated with `GenServer.start`, and it instantiates another GenServer, called `Child`, with `start_link`, so the two are linked. When we stop either of them with `:shutdown`, the other one exits too.
```elixir
defmodule Child do
use GenServer
# -----
@impl true
def init(_), do: {:ok, {}}
@impl true
def terminate(reason, _), do: IO.inspect(reason, label: :terminate)
end
```
```elixir
defmodule Parent do
use GenServer
def link(pid, module), do: GenServer.call(pid, {:link, module})
# -------
@impl true
def init(_), do: {:ok, {}}
@impl true
def handle_call({:link, module}, _, _), do: {:reply, GenServer.start_link(module, :ok), {}}
@impl true
def terminate(reason, _), do: IO.inspect(reason, label: :terminate)
end
```
The routine `Process.info` returns a keyword list including, in particular, the linked processes. The routine `Process.alive?` returns a boolean.
We link `Parent` and `Child`:
```elixir
# instantiate a "Parent" genserver
{:ok, pid_parent} = GenServer.start(Parent, :ok) |> IO.inspect(label: :parent)
# instantiate a "Child" GenServer linked to "Parent"
{:ok, pid_child} = Parent.link(pid_parent, Child) |> IO.inspect(label: :child)
{Process.alive?(pid_parent), Process.alive?(pid_child)}
|> IO.inspect(label: :start)
Process.info(pid_parent) |> Keyword.get(:links) |> IO.inspect(label: :check_links_of_parent)
Process.info(pid_child) |> Keyword.get(:links) |> IO.inspect(label: :check_links_of_child)
```
We stop the parent: both are stopped.
```elixir
GenServer.stop(pid_parent, :shutdown)
# the exit signal reaches the child asynchronously, so give it a moment
Process.sleep(100)
# check 1
{Process.alive?(pid_parent), Process.alive?(pid_child)}
```
We start both again and stop the child: both are stopped.
```elixir
{:ok, pid_parent} = GenServer.start(Parent, :ok)
{:ok, pid_child} = Parent.link(pid_parent, Child)
{Process.alive?(pid_parent), Process.alive?(pid_child)}
|> IO.inspect(label: :start)
# stop Child
GenServer.stop(pid_child, :shutdown)
# the exit signal reaches the parent asynchronously, so give it a moment
Process.sleep(100)
# check 2
{Process.alive?(pid_parent), Process.alive?(pid_child)}
```
> ❗ If we instantiated `Parent` with `GenServer.start_link`, it would be linked with the caller, the main process of this `Livebook`. Stopping `Parent` with `:shutdown` would then also stop the `Livebook` evaluator, which we don't want.
<!-- livebook:{"break_markdown":true} -->
#### Restart values of processes (GenServer, Task)
The restart value is declared when you define the module, with `use GenServer, restart: value` (or `use Task, restart: value`). It goes into the generated `child_spec/1` and applies when the process is supervised:
* `restart: :temporary`: the process is never restarted, even if it crashes,
* `restart: :transient`: the process is restarted only if it terminates abnormally, i.e. with a reason other than `:normal`, `:shutdown` or `{:shutdown, term}`,
* `restart: :permanent`: the process is always restarted. This is the default for a GenServer (`use Task` defaults to `:temporary`).
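A sketch showing where the `restart` option ends up and what `:transient` means in practice, with a hypothetical `TempWorker` under a throwaway supervisor: after a `:normal` exit the child is not restarted, and the supervisor keeps its spec with the pid `:undefined`.

```elixir
defmodule TempWorker do
  # the option is stored in the child_spec/1 generated by `use GenServer`
  use GenServer, restart: :transient

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

%{restart: :transient, start: {TempWorker, :start_link, [:x]}} = TempWorker.child_spec(:x)

{:ok, sup} = Supervisor.start_link([{TempWorker, :x}], strategy: :one_for_one)
[{TempWorker, pid, :worker, [TempWorker]}] = Supervisor.which_children(sup)

# a :transient child that exits with :normal is not restarted
:ok = GenServer.stop(pid, :normal)
Process.sleep(100)
[{TempWorker, :undefined, :worker, [TempWorker]}] = Supervisor.which_children(sup)
```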
## Supervisor
A supervisor is a process whose job is to start child processes and keep them running: you declare which children to start and how to restart them when they terminate. This relies on linking the supervisor with its children.
<!-- livebook:{"break_markdown":true} -->
### Test with a Genserver
We start and supervise a GenServer with `Supervisor.start_link`, passing a list of child specs and a strategy. It is a good idea to follow the convention of naming `MyModule.start_link` the client function that invokes `GenServer.start_link`, because the `child_spec/1` generated by `use GenServer` calls `start_link/1` by default. ❗ Note that you have to use `GenServer.start_link` so that the supervisor is linked with the started process.
> ❗❗ The supervisor expects the start function to return `{:ok, pid}` (or `:ignore`, or `{:error, reason}`), which is what `GenServer.start_link` returns. The function may contain other instructions, but `GenServer.start_link` should be its last expression.
<!-- livebook:{"break_markdown":true} -->
Let's supervise this minimal genserver:
```elixir
defmodule SupMinimalGs do
use GenServer
# <- returns {:ok,pid} for the supervisor
def start_link, do: GenServer.start_link(__MODULE__, nil)
@impl true
def init(_), do: {:ok, nil}
end
```
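The supervisor expects child specs. A child spec can be a map, a module name, or a `{module, arg}` tuple. The module name alone is a shortcut for `{module, []}`: the `child_spec/1` generated by `use GenServer` then starts the child with `module.start_link([])`, which requires a `start_link/1` function. Since `SupMinimalGs` only defines `start_link/0`, we showcase an explicit `child_spec` map.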
<!-- livebook:{"force_markdown":true} -->
```elixir
child_spec = %{id: SupMinimalGs, start: {SupMinimalGs, :start_link, []}}
Supervisor.start_link([child_spec], strategy: :one_for_one, name: TestSupervisor)
# or, only if SupMinimalGs defined start_link/1 (the shortcut calls SupMinimalGs.start_link([])):
Supervisor.start_link([SupMinimalGs], strategy: :one_for_one, name: TestSupervisor)
```
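The shortcut does work for a module that defines `start_link/1`. A sketch with a hypothetical `ShorthandGs`, whose `start_link/1` does some extra work before calling `GenServer.start_link` as its last expression:

```elixir
defmodule ShorthandGs do
  use GenServer

  # start_link/1 is what the generated child_spec/1 calls
  def start_link(arg) do
    # other work is fine here, as long as the last expression returns {:ok, pid}
    doubled = arg * 2
    GenServer.start_link(__MODULE__, doubled)
  end

  @impl true
  def init(state), do: {:ok, state}
end

# {ShorthandGs, 21} is turned into ShorthandGs.child_spec(21)
%{id: ShorthandGs, start: {ShorthandGs, :start_link, [21]}} = ShorthandGs.child_spec(21)

{:ok, sup} = Supervisor.start_link([{ShorthandGs, 21}], strategy: :one_for_one)
[{ShorthandGs, pid, :worker, [ShorthandGs]}] = Supervisor.which_children(sup)
42 = :sys.get_state(pid)
```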
<!-- livebook:{"break_markdown":true} -->
In action: we start a supervisor with one child, check that the child is alive and stop it.
```elixir
child_spec = %{id: SupMinimalGs, start: {SupMinimalGs, :start_link, []}}
Supervisor.start_link([child_spec], strategy: :one_for_one, name: TestSupervisor)
# Supervisor.start_link([SupMinimalGs], strategy: :one_for_one, name: TestSupervisor)
[{SupMinimalGs, pid, _, _}] =
Supervisor.which_children(TestSupervisor)
|> Kino.inspect()
IO.puts("the genserver with pid #{inspect(pid)} is supervised")
Process.alive?(pid) |> Kino.inspect(label: :alive?)
GenServer.stop(pid, :shutdown) |> IO.inspect(label: :genserver_stopped)
Process.sleep(100)
Process.alive?(pid) |> Kino.inspect(label: :alive_after_exit?)
```
Then we observe that the supervisor created a new one.
```elixir
Process.sleep(1000)
# we check again which children are supervised
[{SupMinimalGs, pid, _, _}] =
Supervisor.which_children(TestSupervisor)
|> Kino.inspect()
IO.puts("a new genserver is created with pid: #{inspect(pid)}")
Process.alive?(pid) |> Kino.inspect(label: :recreated_and_alive?)
```
#### Save the state
You notice that the new process starts again from the initial state set by `init`. One way to keep the state is to save it in a storage (`ETS` for example) in the `terminate` callback (for "normal" shutdowns, or when `:EXIT` is trapped), and to read it back from this storage in the `init` callback.
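A minimal sketch of this idea with ETS, under two assumptions: the table (hypothetical name `:saved_state`) is created and owned by the calling process, so it outlives the GenServer (a table owned by the GenServer itself would be deleted with it), and the process stops cleanly so that `terminate/2` runs. In an application, the table would typically be owned by some other long-lived process.

```elixir
# create the table once; it belongs to the process evaluating this code
if :ets.whereis(:saved_state) == :undefined do
  :ets.new(:saved_state, [:named_table, :public, :set])
end

# start from a clean slate when the cell is re-evaluated
:ets.delete(:saved_state, :state)

defmodule SavedStateGs do
  use GenServer

  def start(default), do: GenServer.start(__MODULE__, default)

  @impl true
  def init(default) do
    # trap exits so terminate/2 also runs when a supervisor shuts the process down
    Process.flag(:trap_exit, true)

    state =
      case :ets.lookup(:saved_state, :state) do
        [{:state, saved}] -> saved
        [] -> default
      end

    {:ok, state}
  end

  @impl true
  def handle_cast(:incr, state), do: {:noreply, state + 1}

  @impl true
  def terminate(_reason, state) do
    :ets.insert(:saved_state, {:state, state})
  end
end

{:ok, pid} = SavedStateGs.start(0)
GenServer.cast(pid, :incr)
GenServer.cast(pid, :incr)
# a synchronous request guarantees both casts have been processed
2 = :sys.get_state(pid)
:ok = GenServer.stop(pid)

# a new process restores the saved value instead of the default
{:ok, pid2} = SavedStateGs.start(0)
2 = :sys.get_state(pid2)
```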
<!-- livebook:{"break_markdown":true} -->
### Supervision tree
In a Mix project, you normally start the top-level supervisor with `Supervisor.start_link` in the `start/2` callback of the module that implements `use Application`. You can also define your own supervisor module with `use Supervisor`.
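A minimal module-based supervisor, as a sketch (`SketchSupervisor` and `SketchWorker` are hypothetical). In an application, the `start/2` callback of the `use Application` module would start it, typically with `name: __MODULE__`; we leave the name out so the cell can be re-evaluated.

```elixir
defmodule SketchWorker do
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

defmodule SketchSupervisor do
  use Supervisor

  def start_link(arg), do: Supervisor.start_link(__MODULE__, arg)

  @impl true
  def init(_arg) do
    children = [
      {SketchWorker, :some_state}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

{:ok, sup} = SketchSupervisor.start_link([])
%{active: 1, workers: 1} = Supervisor.count_children(sup)
```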
##### Strategies of Supervisors
* `strategy: :one_for_all`: if a child process terminates, all other child processes are terminated, then all child processes (including the terminated one) are restarted.
* `strategy: :one_for_one`: only the failed child process is restarted.
* `strategy: :rest_for_one`: the failed child process and the processes started after it are restarted.
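A sketch of `:one_for_all` with two copies of a hypothetical `StrategyWorker`, given distinct ids with `Supervisor.child_spec/2`: stopping one child makes the supervisor restart both, so both PIDs change.

```elixir
defmodule StrategyWorker do
  use GenServer

  def start_link(id), do: GenServer.start_link(__MODULE__, id)

  @impl true
  def init(id), do: {:ok, id}
end

children = [
  Supervisor.child_spec({StrategyWorker, :a}, id: :a),
  Supervisor.child_spec({StrategyWorker, :b}, id: :b)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_all)

# map of child id => current pid
pids_by_id = fn ->
  sup
  |> Supervisor.which_children()
  |> Map.new(fn {id, pid, _type, _modules} -> {id, pid} end)
end

before = pids_by_id.()
# stop only :a; as a :permanent child it is restarted, and with
# :one_for_all the supervisor restarts :b as well
:ok = GenServer.stop(before.a, :shutdown)
Process.sleep(100)
restarted = pids_by_id.()
```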
Let's look at the supervision tree started at the top of this notebook: `MySupervisor` supervises a `DynamicSupervisor` (see below), a `Registry` (see later) and a `Task.Supervisor`, all of them used later on.
```elixir
Process.whereis(MySupervisor)
```
The `Supervisor` module comes with some useful routines:
```elixir
pid = Process.whereis(MySupervisor)
Supervisor.count_children(pid)
```
```elixir
Supervisor.which_children(MySupervisor)
```
## Dynamic Supervisor
A `DynamicSupervisor` starts without children: children are started on demand with `DynamicSupervisor.start_child/2`, for example one GenServer per user or per game. It only supports the `:one_for_one` strategy.
The usage is quite similar to supervisors.
```elixir
defmodule DynSupGs do
use GenServer, restart: :permanent
def start_link(v), do: GenServer.start_link(__MODULE__, v)
@impl true
def init(v), do: {:ok, v + 1}
end
```
```elixir
child_spec = %{id: DynSupGs, start: {DynSupGs, :start_link, [1]}}
# or simply child_spec = {DynSupGs, 1}, which calls DynSupGs.start_link(1)
!is_nil(Process.whereis(DynSup))
|> IO.inspect()
DynamicSupervisor.start_child(DynSup, child_spec)
# we check which children are supervised, get the pid
list = DynamicSupervisor.which_children(DynSup)
{_, pid, _, _} = List.last(list)
IO.puts("the genserver with pid #{inspect(pid)} is supervised")
Process.alive?(pid) |> Kino.inspect(label: :alive?)
GenServer.stop(pid, :shutdown) |> IO.inspect(label: :genserver_stopped)
Process.sleep(100)
Process.alive?(pid) |> Kino.inspect(label: :alive_after_exit?)
Process.sleep(3000)
# we check again which children are supervised
list = DynamicSupervisor.which_children(DynSup)
{_, pid, _, _} = List.last(list)
IO.puts("a new genserver is created with pid: #{inspect(pid)}")
Process.alive?(pid) |> Kino.inspect()
:sys.get_state(pid)
```
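Children of a dynamic supervisor can also be removed at runtime with `DynamicSupervisor.terminate_child/2`. A self-contained sketch with its own unnamed `DynamicSupervisor` and a hypothetical `DynSketchGs`:

```elixir
defmodule DynSketchGs do
  use GenServer

  def start_link(n), do: GenServer.start_link(__MODULE__, n)

  @impl true
  def init(n), do: {:ok, n}
end

{:ok, dsup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# children are added at runtime, one call per child
{:ok, pid1} = DynamicSupervisor.start_child(dsup, {DynSketchGs, 1})
{:ok, _pid2} = DynamicSupervisor.start_child(dsup, {DynSketchGs, 2})
%{active: 2} = DynamicSupervisor.count_children(dsup)

# ... and removed at runtime; a child terminated this way is not restarted
:ok = DynamicSupervisor.terminate_child(dsup, pid1)
%{active: 1} = DynamicSupervisor.count_children(dsup)
```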
You can review the supervised processes with the supervision tree by running:
```elixir
Process.whereis(MySupervisor)
```
## Task Supervisor
Tasks can also be supervised. You declare a [Task.Supervisor](https://hexdocs.pm/elixir/1.14.3/Task.Supervisor.html) in your supervision tree with the child spec `{Task.Supervisor, name: MyTaskSupervisor}`. You can then start tasks with the unlinked version [async_nolink](https://hexdocs.pm/elixir/1.14.3/Task.Supervisor.html#async_nolink/3) (a crash of the task does not crash the caller) or with the linked version [async](https://hexdocs.pm/elixir/1.14.3/Task.Supervisor.html#async/3):
```elixir
defmodule Email2 do
require Logger
@emails ["a@com", "b@com", "c@com"]
def send_email("b@com"), do: raise("oh no!")
def send_email(email), do: Logger.info("#{email}")
def run do
Enum.each(
@emails,
fn email ->
Task.Supervisor.async_nolink(
MyTaskSupervisor,
fn -> send_email(email) end
)
end
)
end
end
Email2.run()
```
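With `async_nolink`, a failing task does not take the caller down: the caller monitors the task and receives a `:DOWN` message carrying the exception. A sketch with its own unnamed `Task.Supervisor`:

```elixir
{:ok, tsup} = Task.Supervisor.start_link()

task = Task.Supervisor.async_nolink(tsup, fn -> raise "oh no!" end)
ref = task.ref

# the caller keeps running; the failure arrives through the monitor
down_reason =
  receive do
    {:DOWN, ^ref, :process, _pid, reason} -> reason
  after
    1_000 -> :no_down_message
  end

# down_reason is {%RuntimeError{message: "oh no!"}, stacktrace}
```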
## GenServer Registration
Every process can be given a name (an **atom**), which is called registration: `Process.register(pid, :name)`. A GenServer can be referenced by its _PID_ or by the name given with the `:name` option (cf [name registration](https://hexdocs.pm/elixir/1.14.3/GenServer.html#module-name-registration)).
The identifier passed to `:name` must be:
* a unique atom such as `name: __MODULE__`.
* or use the `Registry` mechanism: `name: {:via, Registry, {MyRegistry, "name"}}`
> Note: such a name uniquely identifies a GenServer on a node. You can register it globally with `name: {:global, name}`: the GenServer is then uniquely identified across connected Erlang nodes. Solutions for distributed registries exist, such as [Horde](https://hexdocs.pm/horde/readme.html).
<!-- livebook:{"force_markdown":true} -->
```elixir
# using the PID
{:ok, pid} = GenServer.start_link(__MODULE__, init_args)
GenServer.call(pid, :action)
# using the atom module name __MODULE__
GenServer.start_link(__MODULE__, init_args, name: __MODULE__)
GenServer.call(__MODULE__, :action)
# using any atom
GenServer.start_link(__MODULE__, init_args, name: :an_atom)
GenServer.call(:an_atom, :action)
# via a Registry. Make sure one has been instantiated
Registry.start_link(keys: :unique, name: MyRegistry)
via_tuple = {:via, Registry, {MyRegistry, "name"}}
GenServer.start_link(__MODULE__, init_args, name: via_tuple)
GenServer.call(via_tuple, :action)
```
<!-- livebook:{"break_markdown":true} -->
❗ A GenServer started with the `:name` option or with a `:via` tuple can only be started **once** per name. Starting it again returns `{:error, {:already_started, pid}}` instead of `{:ok, pid}`.
<!-- livebook:{"force_markdown":true} -->
```elixir
pid =
case GenServer.start(..) do
{:ok, pid} -> pid
{:error, {:already_started, pid}} -> pid
end
```
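A sketch of atom registration and of the `{:error, {:already_started, pid}}` case, with a hypothetical `NamedSketch` module and the hypothetical name `:named_sketch` (any process left over from a previous evaluation is stopped first):

```elixir
defmodule NamedSketch do
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:get, _from, state), do: {:reply, state, state}
end

case Process.whereis(:named_sketch) do
  nil -> :ok
  old_pid -> GenServer.stop(old_pid)
end

{:ok, pid} = GenServer.start(NamedSketch, :first, name: :named_sketch)
# the atom resolves to the same pid
^pid = Process.whereis(:named_sketch)
:first = GenServer.call(:named_sketch, :get)

# a second start under the same name returns the running process instead
{:error, {:already_started, ^pid}} = GenServer.start(NamedSketch, :second, name: :named_sketch)
:first = GenServer.call(:named_sketch, :get)
```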
See the following examples.
<!-- livebook:{"break_markdown":true} -->
#### PID as identifier
The generated PID is obtained by pattern matching on the response of `GenServer.start_link`. You pass it to the client functions `GenServer.call(pid, message)` and `GenServer.cast(pid, message)`.
Each time you run the module below, a new PID is generated.
> Notice below that when `Kernel.send` runs inside the GenServer process (for example in `handle_call`), we can use `self()`, whereas in the caller process (for example in `CallByPid.start_link`) we have to use the PID of the GenServer to reach its `handle_info`.
```elixir
defmodule CallByPid do
require Logger
use GenServer
def start_link(state: state) do
{:ok, pid} = GenServer.start_link(__MODULE__, state)
# message handled by "handle_info({:msg, pid})"
send(pid, {:msg, pid})
# GenServer.call returns the reply, here the PID of the spawned GenServer
GenServer.call(pid, :ready)
end
# callbacks
@impl true
def init(state) do
Logger.info("spawned #{inspect(self())} is instantiated with state: #{state}")
{:ok, state}
end
@impl true
def handle_call(:ready, {pid, _} = _from, state) do
send(
self(),
{:msg, "process #{inspect(pid)} asks #{inspect(self())} to run :ready and returns its PID"}
)
{:reply, self(), state}
end
# message printer
@impl true
def handle_info({:msg, message}, state) do
Logger.info("spawned received: #{inspect(message)} from main")
{:noreply, state}
end
end
require Logger
Logger.info("main process pid: " <> "#{inspect(self())}")
Kino.Process.render_seq_trace(fn ->
# sending to main process the pid of the spawned process with state 1
send(self(), {:spw1, CallByPid.start_link(state: "1")}) |> Kino.inspect()
# this returns the pid of the spawned process with state 2
CallByPid.start_link(state: "2")
end)
```
We can check that our spawned process - the Genserver - is still alive and check its state directly:
```elixir
receive do
{:spw1, pid} ->
Logger.info(inspect(pid))
Process.alive?(pid) |> Kino.inspect(label: "spawned process #{inspect(pid)} is still alive?")
:sys.get_state(pid)
end
```
### The atom `__MODULE__` as identifier
Instead of the `pid`, we can use `name: __MODULE__`, which is an atom. You can also use any atom.
Only one process can be registered under a given name, so `start_link` succeeds **only once**: a second call returns `{:error, {:already_started, pid}}`, and the state of the running GenServer is not changed.
```elixir
defmodule CallByAtom do
require Logger
use GenServer
def start_link(state: state) do
case GenServer.start_link(__MODULE__, state, name: __MODULE__) do
{:ok, pid} ->
{:ok, pid}
{:error, {:already_started, pid}} ->
Logger.info(
"Genserver #{inspect(pid)} with name #{__MODULE__} and state #{state} was already started"
)
{:ok, pid}
end
end
# the process identifier is "__MODULE__"
def get_state, do: GenServer.call(__MODULE__, :get_state)
def init(state) do
Logger.info("#{inspect(self())} is instantiated with state: #{inspect(state)}")
{:ok, state}
end
def handle_call(:get_state, _from, state), do: {:reply, state, state}
end
CallByAtom.start_link(state: "1")
CallByAtom.get_state() |> IO.inspect()
CallByAtom.start_link(state: "2")
CallByAtom.get_state()
```
### The Registry
You can't register a GenServer under a string name. You could convert the string into an atom, but this is bad practice in Elixir: atoms are not garbage collected and the number of atoms is limited (1,048,576 by default).
You can use the [Registry](https://hexdocs.pm/elixir/1.14.3/Registry.html) mechanism instead. It is a [key -> pid] dictionary, based on [ETS](https://elixir-lang.org/getting-started/mix-otp/ets.html), and thus local to each `node`.
Start a Registry and give it a name (`MyRegistry` below):
<!-- livebook:{"force_markdown":true} -->
```elixir
Registry.start_link(keys: :unique, name: MyRegistry)
```
You can also start it in the supervision tree of the app:
<!-- livebook:{"force_markdown":true} -->
```elixir
children = [
...
{Registry, keys: :unique, name: MyOTPApp.Registry},
]
```
Then you can identify the GenServer with any term (a string, a number...) by using the tuple below. It associates the PID with the key `"var"` in the registry `MyRegistry`. When a `GenServer` function receives such a [:via](https://hexdocs.pm/elixir/1.14.3/Registry.html#module-using-in-via) tuple, it looks up the PID in the given registry:
<!-- livebook:{"force_markdown":true} -->
```elixir
{:via, Registry, {MyRegistry, "var"}}
```
```elixir
defmodule CallByRegistry do
require Logger
use GenServer
defp via_tuple(name), do: {:via, Registry, {ARegistry, name}}
def start_link(name: name, state: state) do
# start a Registry (later calls return {:error, {:already_started, pid}}, ignored here)
Registry.start_link(keys: :unique, name: ARegistry)
# reference a Genserver with the registry mechanism
GenServer.start_link(__MODULE__, state, name: via_tuple(name))
# use the registry mechanism
GenServer.call(via_tuple(name), :ready)
end
def get_state(name), do: via_tuple(name) |> :sys.get_state()
@impl true
def init(state) do
Logger.info("#{inspect(self())} is instantiated with state: #{inspect(state)}")
{:ok, state}
end
@impl true
def handle_call(:ready, _from, state) do
Logger.info("#{inspect(state)} ready")
{:reply, :step_0, state}
end
end
```
If we evaluate the cell below several times, we see that a GenServer can be started only once per name: the second `start_link` with the name `"fish_1"` does not change its state. To hold another state, we start another GenServer under another name.
```elixir
CallByRegistry.start_link(name: "fish_1", state: "fish_1")
CallByRegistry.get_state("fish_1") |> Kino.inspect()
CallByRegistry.start_link(name: "fish_1", state: "fish_2")
CallByRegistry.get_state("fish_1") |> Kino.inspect()
CallByRegistry.start_link(name: "fish_2", state: "fish_2")
CallByRegistry.get_state("fish_2")
```
The Registry module comes with useful [routines](https://hexdocs.pm/elixir/1.14.3/Registry.html#functions):
```elixir
Registry.count(ARegistry)
```
```elixir
CallByRegistry.get_state("fish_1")
```
```elixir
CallByRegistry.get_state("fish_2")
```
```elixir
Registry.lookup(ARegistry, "fish_2")
```
```elixir
[{pid, _}] = Registry.lookup(ARegistry, "fish_2")
Registry.keys(ARegistry, pid)
```
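A self-contained sketch of the `:via` mechanism with its own registry (the names `SketchRegistry` and `RegSketch` are hypothetical). It also shows that the registry forgets a key once its process exits; that cleanup happens asynchronously, hence the short sleep.

```elixir
case Registry.start_link(keys: :unique, name: SketchRegistry) do
  {:ok, _} -> :ok
  {:error, {:already_started, _}} -> :ok
end

defmodule RegSketch do
  use GenServer

  # any term can be the key; here a string
  def via(key), do: {:via, Registry, {SketchRegistry, key}}
  def start(key, state), do: GenServer.start(__MODULE__, state, name: via(key))
  def get(key), do: GenServer.call(via(key), :get)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:get, _from, state), do: {:reply, state, state}
end

{:ok, pid} = RegSketch.start("user:42", %{score: 0})
%{score: 0} = RegSketch.get("user:42")
# the registry maps the key to the pid (the value is nil when registered via :via)
[{^pid, nil}] = Registry.lookup(SketchRegistry, "user:42")

# the entry disappears once the process exits
:ok = GenServer.stop(pid)
Process.sleep(100)
[] = Registry.lookup(SketchRegistry, "user:42")
```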
#### Useful resources on Registries
Besides the [doc](https://hexdocs.pm/elixir/1.14.3/Registry.html), you might find interesting:
* <https://dashbit.co/blog/homemade-analytics-with-ecto-and-elixir>
* <https://github.com/amokan/registry_sample/blob/master/lib/registry_sample/account.ex>
## Running Async Tasks in a Genserver
You could run `Task.async |> Task.await` in a Genserver process. However, it is [not recommended](https://hexdocs.pm/elixir/1.14.3/Task.html#await/2-compatibility-with-otp-behaviours) to await a long task in a Genserver: while it awaits, the Genserver is blocked and cannot handle any other message.
If you just want to run the task without blocking on its result, you can launch an `async` task from a callback. This task is monitored by the Genserver. Once it terminates, [two messages](https://hexdocs.pm/elixir/1.14.3/Task.html#await/2-compatibility-with-otp-behaviours) are sent to the Genserver: `{ref, result}`, then `{:DOWN, ref, :process, pid, :normal}`. You match on them with `GenServer.handle_info` (see the example below).
Even though you should not use `await`, you can still get a delayed response from a `Task.async` when you use `GenServer.reply` (see the "successful" example below).
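The two messages can be observed without a Genserver. In this sketch the current process plays the Genserver's role and receives them directly; `21 * 2` stands in for real work.

```elixir
# The current process plays the role of the Genserver here
task = Task.async(fn -> 21 * 2 end)
ref = task.ref

# First message: {ref, result}, the return value of the task
result =
  receive do
    {^ref, value} -> value
  after
    5_000 -> :timeout
  end

# Second message: the monitor notification that the task exited normally
reason =
  receive do
    {:DOWN, ^ref, :process, _pid, why} -> why
  after
    5_000 -> :timeout
  end

{result, reason}
```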
The typical use cases are:
* you run a long task from a LiveView and don't want the LiveView to crash if the task fails. You use a `Task.Supervisor` for this: you register a `Task.Supervisor` in the supervision tree and start the task from it. With `Task.Supervisor.async_nolink`, a failure is reported to the Genserver as a `{:DOWN, ref, :process, pid, reason}` message.
* you want the task to stop when the caller stops, for example when you leave the LiveView page. You use `Task.Supervisor.async` for this because the processes are **linked**.
* you just want to launch a task that is not linked to the caller: you use `Task.Supervisor.async_nolink`. If you don't need the result at all, `Task.Supervisor.start_child` also starts a supervised task, which sends no reply.
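A sketch of the first use case: a task started with `Task.Supervisor.async_nolink` crashes, and the caller (here the current process rather than a Genserver) survives and only receives a `:DOWN` message. `SketchTaskSup` is a hypothetical supervisor started for this example; the error report logged by the crashing task is expected.

```elixir
# Hypothetical Task.Supervisor, started only for this sketch
{:ok, sup} = Task.Supervisor.start_link(name: SketchTaskSup)

# async_nolink: the task is monitored by the caller but not linked to it
task = Task.Supervisor.async_nolink(SketchTaskSup, fn -> raise "boom" end)
ref = task.ref

# The crash does not take the caller down: it arrives as a :DOWN message
reason =
  receive do
    {:DOWN, ^ref, :process, _pid, reason} -> reason
  after
    5_000 -> :timeout
  end

Supervisor.stop(sup)

# A task that raises exits with {exception, stacktrace}
crashed_with_boom? = match?({%RuntimeError{message: "boom"}, _stacktrace}, reason)
```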
<!-- livebook:{"break_markdown":true} -->
We will use this HTTP call:
```elixir
defmodule AsyncTask do
def request do
Process.sleep(1_000)
HTTPoison.get!("http://httpbin.org/json")
|> Map.get(:body)
|> Jason.decode!()
|> get_in(["slideshow", "slides"])
end
def run, do: Task.async(&request/0)
end
AsyncTask.run()
```
The Genserver below calls the `AsyncTask` module. It will receive 2 messages: the result `{ref, return value of the async task}`, then a completion message from the finished task: `{:DOWN, ref, :process, pid, :normal}`. The "catch all" `handle_info` below receives these two messages.
```elixir
defmodule FirstAsyncTaskGsRunner do
use GenServer
require Logger
def run(pid), do: GenServer.call(pid, :async)
def init(_), do: {:ok, nil}
def handle_call(:async, _, _s) do
%{ref: ref, pid: pid} = AsyncTask.run()
reply = "task with pid #{inspect(pid)} sent"
{:reply, reply, ref}
end
# catch all
def handle_info(msg, ref) do
Logger.debug(inspect(msg))
{:noreply, ref}
end
def terminate(reason, _s), do: Logger.info("Terminate Child with reason: #{inspect(reason)}")
end
GenServer.start(FirstAsyncTaskGsRunner, {})
|> elem(1)
|> FirstAsyncTaskGsRunner.run()
```
We can call `Process.demonitor(ref, [:flush])` inside the `handle_info` callback that receives the first message `{ref, result}`. If the task succeeds, the `{:DOWN, ref, :process, pid, :normal}` status message is then never delivered. If the task fails, no `{ref, result}` message is sent, so the failure still reaches the `:DOWN` handler.
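A minimal sketch of this flush, again with the current process standing in for the Genserver: after `Process.demonitor(ref, [:flush])`, no `:DOWN` message for the task is left in the mailbox.

```elixir
task = Task.async(fn -> :done end)
ref = task.ref

result =
  receive do
    {^ref, value} ->
      # Remove the monitor and drop any :DOWN message already delivered
      Process.demonitor(ref, [:flush])
      value
  after
    5_000 -> :timeout
  end

# Check the mailbox: no :DOWN message is left for this task
down_left? =
  receive do
    {:DOWN, ^ref, :process, _pid, _reason} -> true
  after
    100 -> false
  end

{result, down_left?}
```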
<!-- livebook:{"break_markdown":true} -->
#### Successful `AsyncTask` and usage of `GenServer.reply` to return
<!-- livebook:{"break_markdown":true} -->
It is recommended to run this `AsyncTask` as a supervised task (see the next example). Recall that you can link the task to the main process or not. If you want to stop the task when the main process stops (e.g. when you leave a page), use `Task.Supervisor.async`; otherwise use `Task.Supervisor.async_nolink`.
##### Example with `GenServer.reply`
You can get a response back with [GenServer.reply/2](https://hexdocs.pm/elixir/1.14.3/GenServer.html#reply/2). In the callback `handle_call(:key, from, state)`, you do not reply right away: you return `{:noreply, state}` and save `from` in the state so you can use it later (`from` is a `{pid, tag}` tuple whose tag should be treated as opaque). When the (monitored) async task finishes, it sends the message `{ref, result}`. Inside the matching `handle_info` callback, you call `GenServer.reply(from, result)` to return a delayed answer to the `GenServer.call`.
```elixir
defmodule AsyncTaskReplier do
use GenServer
require Logger
def start, do: GenServer.start(__MODULE__, nil, name: __MODULE__)
def init(_), do: {:ok, nil}
def nolink, do: GenServer.call(__MODULE__, :nolink)
def handle_call(:nolink, from, _) do
%{ref: ref} = Task.Supervisor.async_nolink(MyTaskSupervisor, &AsyncTask.request/0)
# Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> raise "bad" end)
{:noreply, {ref, from}}
end
def handle_info({r, response}, {ref, from}) when ref == r do
# <-- stops the second :DOWN message if success
Process.demonitor(ref, [:flush])
{pid, _} = from
GenServer.reply(from, {pid, response})
{:noreply, response}
end
def handle_info({:DOWN, _ref, :process, _pid, status}, s) do
Logger.debug("received status: #{inspect(status)}")
{:noreply, s}
end
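# terminate/2 runs on GenServer.stop/2 or a callback crash; it is skipped when a
# linked exit signal kills this Genserver, since it does not trap exits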
def terminate(reason, _s) do
Logger.info("Terminate Main with reason: #{inspect(reason)}")
end
end
AsyncTaskReplier.start()
{caller_pid, response} = AsyncTaskReplier.nolink()
# `caller_pid` comes from `from`: it is the pid of the calling process (this cell)
Process.alive?(caller_pid) |> IO.inspect()
response
```
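The same deferred-reply pattern without the HTTP request: a minimal sketch in which a hypothetical `DeferredReply` Genserver keeps `from` and answers only when a timer message arrives. It stores a single `from`, so it assumes one pending call at a time.

```elixir
defmodule DeferredReply do
  # Hypothetical module: replies to a call only after a timer fires
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, nil)
  def slow_call(pid), do: GenServer.call(pid, :slow)

  @impl true
  def init(_), do: {:ok, nil}

  @impl true
  def handle_call(:slow, from, _state) do
    # Do not reply now: keep `from` and schedule the "work"
    Process.send_after(self(), :work_done, 100)
    {:noreply, from}
  end

  @impl true
  def handle_info(:work_done, from) do
    # Deliver the delayed reply to the caller still waiting in GenServer.call/2
    GenServer.reply(from, :finished)
    {:noreply, nil}
  end
end

{:ok, pid} = DeferredReply.start_link()
answer = DeferredReply.slow_call(pid)
```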
This time we use the "normal" monitoring mechanism of async tasks, for when we don't need to return the response to the caller: we listen to the 2 messages sent. Both forms, linked and unlinked, are coded.
```elixir
defmodule NoFailAsyncTaskRunner do
use GenServer
require Logger
def start, do: GenServer.start(__MODULE__, nil, name: __MODULE__)
def link, do: GenServer.call(__MODULE__, :async)
def nolink, do: GenServer.call(__MODULE__, :nolink)
def add, do: GenServer.call(__MODULE__, :one)
def init(_), do: {:ok, nil}
def handle_call(:one, _, s), do: {:reply, 1, s}
def handle_call(:async, _from, _) do
%{ref: ref, pid: pid} = Task.Supervisor.async(MyTaskSupervisor, &AsyncTask.request/0)
{:reply, pid, ref}
end
def handle_call(:nolink, _from, _) do
%{ref: ref, pid: pid} = Task.Supervisor.async_nolink(MyTaskSupervisor, &AsyncTask.request/0)
{:reply, pid, ref}
end
def handle_info({r, response}, ref) when ref == r do
Process.demonitor(ref, [:flush])
Logger.debug("response: #{inspect(response)}")
{:noreply, nil}
end
def handle_info({:DOWN, _ref, :process, pid, status}, _rf) do
Logger.debug("received status: #{inspect(status)} for the task #{inspect(pid)}")
{:noreply, nil}
end
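# terminate/2 runs on GenServer.stop/2 (used below) or a callback crash; it is skipped
# when a linked exit signal kills this Genserver, since it does not trap exits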
def terminate(reason, _s) do
Logger.info("Terminate Main with reason: #{inspect(reason)}")
end
end
```
We run the task:
```elixir
NoFailAsyncTaskRunner.start()
pid_task = NoFailAsyncTaskRunner.link()
```
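We stop the main process (with the exit reason `:kill`) and check that the linked task is stopped too.
```elixir
Process.whereis(NoFailAsyncTaskRunner) |> GenServer.stop(:kill)
# the exit signal reaches the linked task asynchronously, so we wait a little
Process.sleep(100)
IO.puts("Is task alive? :#{Process.alive?(pid_task)}")
```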
We run the task **unlinked**, then kill the main process. We check that the task is still alive: it keeps running until the async request completes.
```elixir
NoFailAsyncTaskRunner.start()
pid_task =
NoFailAsyncTaskRunner.nolink()
|> IO.inspect()
Process.whereis(NoFailAsyncTaskRunner) |> GenServer.stop(:kill)
IO.puts("Is task alive? :#{Process.alive?(pid_task)}")
```
#### Supervising an async task
We set up another Genserver below to run the previous `AsyncTask` again, but this time we make it fail (the task kills itself).
* when we pass `bool = true`, we supervise it with `Task.Supervisor.async_nolink`, so the task is not linked to the Genserver. Note that `Task.Supervisor.async` would still link the task to the caller.
* when `bool = false`, we do not supervise it and run `Task.async`, which links the task to the Genserver.
```elixir
defmodule AsyncTaskRunner do
use GenServer
require Logger
def start_link() do
GenServer.start_link(__MODULE__, {}, name: __MODULE__)
end
def run(bool \\ false), do: GenServer.call(__MODULE__, {:async, bool})
def init(_), do: {:ok, {}}
def handle_call({:async, bool}, _, _) do
# the failing job: it waits a little so that the reply below is sent first,
# then kills itself before doing the request
failing_job = fn ->
Process.sleep(100)
Process.exit(self(), :kill)
AsyncTask.request()
end
%{ref: ref} =
case bool do
# supervised and NOT linked: the failure arrives as a :DOWN message
true -> Task.Supervisor.async_nolink(MyTaskSupervisor, failing_job)
# linked: the task's exit signal also kills this Genserver
false -> Task.async(failing_job)
end
{:reply, Process.alive?(self()), ref}
end
def handle_info({r, response}, ref) when ref == r do
Logger.debug("response: #{inspect(response)}")
{:noreply, ref}
end
def handle_info({:DOWN, _ref, :process, _pid, status}, s) do
Logger.debug("received status: #{inspect(status)}")
{:noreply, s}
end
# terminate/2 is NOT called when the linked task kills this Genserver, because it
# does not trap exits; it only runs on GenServer.stop/2 or a callback crash
def terminate(reason, _s) do
Logger.info("Terminate Child with reason: #{inspect(reason)}")
end
end
```
We start the task runner above under the DynamicSupervisor, because a failing linked task also kills its parent; the supervisor then restarts it. It will run the (failing) `AsyncTask`:
```elixir
DynamicSupervisor.start_child(DynSup, %{id: 1, start: {AsyncTaskRunner, :start_link, []}})
{_, pid1, _, _} =
DynamicSupervisor.which_children(DynSup)
|> List.last()
```
We run the failing `AsyncTask` as a supervised, **unlinked** task with `Task.Supervisor.async_nolink`. We check that the parent process `AsyncTaskRunner` has not been restarted (same PID).
```elixir
AsyncTaskRunner.run(true)
# leave time for the task to kill itself
Process.sleep(1000)
# check pid
{_, pid2, _, _} =
DynamicSupervisor.which_children(DynSup)
|> List.last()
|> IO.inspect(label: :after_supervised_task)
IO.puts("Same process? #{pid1 == pid2}")
```
The failing `AsyncTask` is now not supervised and is linked: `Task.async`. When the task is killed, `AsyncTaskRunner` dies with it, and we check that a new `AsyncTaskRunner` has been started by the DynamicSupervisor (different PID).
```elixir
AsyncTaskRunner.run(false)
Process.sleep(1000)
{_, pid3, _, _} =
DynamicSupervisor.which_children(DynSup)
|> List.last()
|> IO.inspect()
IO.puts("Same process? #{pid2 == pid3}")
```
## Concurrency
How long does it take to launch many genservers?
Even though a genserver is clearly the wrong tool here (we need neither state nor communication), let's compute the factorial of a number with it. Recall that the factorial $n!$ of a positive integer $n$ is the product of all the integers from 1 to $n$.
We compute this product in three ways:
* with a single process using `Enum.reduce`, since $n! = n \cdot (n-1)!$.
* lazily, by streaming slices of the list `[1,2,..,n]` into chunks of, say, 10 elements, and starting a genserver (with `GenServer.start`) for each chunk. Each genserver is responsible for calculating the subproduct of the integers in the chunk it receives. We finally run an `Enum.reduce` on this list of subproducts. Note that each genserver is started and then called synchronously, so the chunks are processed one after the other, not in parallel.
* running the same "stream chunks" process lazily, recursively on the list of subproducts, but without genservers.
```elixir
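defmodule FactorialGs do
@moduledoc """
Genserver to compute the product of all integers in the received chunk
"""
use GenServer
# starts an unlinked genserver, then waits for its answer (synchronous call)
def start_link(chunk),
do: GenServer.start(__MODULE__, chunk) |> elem(1) |> GenServer.call(:factorial)
@impl true
def init(chunk), do: {:ok, chunk}
# reply with the subproduct and stop, so that no idle genserver is left behind
@impl true
def handle_call(:factorial, _, s), do: {:stop, :normal, Eval.reduce(s), s}
end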
defmodule Eval do
@moduledoc """
Expose three functions, `worker/1`, `stream/1` and `reduce/1`.
The first spawns genservers, the second is a pure stream implementation,
and the last is a single reduction process.
"""
# size of the chunks of integers multiplied together
@n 10
@doc """
Lazy streaming genservers without recursion
iex> Eval.worker(4)
24
"""
def worker(n) do
1..n
|> Stream.chunk_every(@n)
|> Stream.map(&FactorialGs.start_link(&1))
|> Enum.to_list()
|> Enum.reduce(1, &(&1 * &2))
end
@doc """
Lazy stream recursion without genserver
iex> Eval.stream(4)
24
"""
def stream(n), do: Enum.to_list(1..n) |> rec_stream()
def rec_stream(list) when is_list(list) and length(list) > @n do
list
|> Stream.chunk_every(@n)
|> Stream.map(&Eval.reduce/1)
|> Enum.to_list()
|> rec_stream()
end
def rec_stream(list) when is_list(list), do: Enum.reduce(list, 1, &(&1 * &2))
@doc """
Simple single process reduction
iex> Eval.reduce(1..4)
24
"""
def reduce(list), do: Enum.reduce(list, 1, &(&1 * &2))
end
Eval.worker(3)
```
When we measure the time (in ms) to compute the factorial of a big number, say 10_000, we see that the chunked version spawning genservers is faster than the single reduction, even with the overhead of using genservers. The fastest is the recursive chunked stream without genservers. Since every genserver is called synchronously, nothing runs in parallel here: the gain most likely comes from how the multiplications are grouped. We see that the BEAM is able to spawn around 1000 genservers per ms.
```elixir
n = 10_000
%{
reduce: :timer.tc(fn -> Eval.reduce(1..n) end) |> elem(0) |> div(1000),
genserver: :timer.tc(fn -> Eval.worker(n) end) |> elem(0) |> div(1000),
stream: :timer.tc(fn -> Eval.stream(n) end) |> elem(0) |> div(1000)
}
```
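None of the three versions above computes chunks in parallel. For comparison, here is a sketch (not part of the measurements above) that computes the chunk products in parallel with `Task.async_stream/3`, using about one chunk per online scheduler; the module name `ParallelFactorial` is hypothetical. Whether it beats `Eval.stream/1` depends on the machine, so time it next to the results above.

```elixir
defmodule ParallelFactorial do
  @moduledoc """
  Sketch: n! with Task.async_stream, about one chunk per online scheduler,
  so the chunk products are computed in parallel.
  """

  def run(n) when is_integer(n) and n >= 1 do
    schedulers = System.schedulers_online()
    # ceiling division: about one chunk of integers per scheduler
    chunk_size = div(n + schedulers - 1, schedulers)

    1..n
    |> Enum.chunk_every(chunk_size)
    # each chunk product runs in its own task; order does not matter for a product
    |> Task.async_stream(fn chunk -> Enum.reduce(chunk, 1, &(&1 * &2)) end,
      ordered: false,
      timeout: :infinity
    )
    |> Enum.reduce(1, fn {:ok, subproduct}, acc -> subproduct * acc end)
  end
end

n = 10_000
:timer.tc(fn -> ParallelFactorial.run(n) end) |> elem(0) |> div(1000)
```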