# Elixir notes on processes: Task, GenServer, Supervisor

```elixir
Mix.install([
  {:kino, "~> 0.8.0"},
  {:httpoison, "~> 2.0"},
  {:jason, "~> 1.4"}
])

Supervisor.start_link(
  [
    {DynamicSupervisor, name: DynSup, strategy: :one_for_one},
    {Registry, keys: :unique, name: MyRegistry},
    {Task.Supervisor, name: MyTaskSupervisor}
  ],
  strategy: :one_for_one,
  name: MySupervisor
)
```

## Introduction

Elixir/Erlang has two main modules, [Task](https://hexdocs.pm/elixir/1.14.3/Task.html) and [GenServer](https://hexdocs.pm/elixir/1.14.3/GenServer.html), to run code concurrently.

When you need to leverage concurrency and run a short-lived _stateless_ single function without interaction, you use the `Task` module.

When you need a long-running _stateful_ process where you need control and interactivity, you run the code with a `GenServer`.

## Task

Suppose you have to send millions of emails. Sending one email is a short-lived process which exits as soon as it completes. You can run this code with a `Task` to leverage concurrency and use [Task.async](https://hexdocs.pm/elixir/1.14.3/Task.html#async/1).

<!-- livebook:{"break_markdown":true} -->

We spawn 2 processes:

```elixir
defmodule Email1 do
  require Logger
  @emails ["a@com", "b@com"]

  def send_email(email) do
    Process.sleep(1_000)
    Logger.info("#{email} received")
  end

  def run do
    Enum.each(@emails, fn email ->
      Task.async(fn -> send_email(email) end)
    end)
  end
end

Time.utc_now() |> IO.inspect(label: :before)
Email1.run()
Time.utc_now() |> IO.inspect(label: :after)
```

> Note that in case of failure, the error will bubble up (see "Supervisor" further down).

<!-- livebook:{"break_markdown":true} -->

##### Get the response

We can use `Enum.each` if we just want to run the tasks. It returns `:ok`. If we want the response of each task, we use `Enum.map` instead because it returns a list of the corresponding Task structs.
This struct is used by the [Task.await](https://hexdocs.pm/elixir/1.14.3/Task.html#await/2) or [Task.yield](https://hexdocs.pm/elixir/1.14.3/Task.html#yield/2) functions to get the result. An "awaited" task returns the result, and the caller exits if the task fails or times out, whilst a "yielded" task returns `{:ok, result}`, `{:exit, reason}`, or `nil` if the task has not replied yet.

Tasks are supposed to run within a certain amount of time. The timeout is given in milliseconds as the second argument and defaults to 5000.

```elixir
t_await = Task.async(fn -> 1 end)
t_yield = Task.async(fn -> 2 end)

%{task: t_await, _await: Task.await(t_await), _yield: Task.yield(t_yield)}
```

We set the optional "timeout" value in ms in the [Task.yield](https://hexdocs.pm/elixir/1.14.3/Task.html#yield/2) function. When the task isn't completed within this period, it returns `nil`.

```elixir
Task.async(fn -> Process.sleep(1000) end)
|> Task.yield(500)
```

We can call `yield` repeatedly on the same task, which is not possible with `await`.

```elixir
defmodule Continue do
  require Logger

  def _yield(task, timeout: time) do
    case Task.yield(task, time) do
      nil ->
        Logger.info("called again")
        _yield(task, timeout: time)

      result ->
        result
    end
  end
end

Task.async(fn -> Process.sleep(3000) end)
|> Continue._yield(timeout: 500)
```

#### `async_stream`

Instead of doing `Enum.map |> Task.async`, we can do it in one go with `Task.async_stream`. It is designed to create task processes from a list of items. The key point is that we can set a _limit on the number of processes_ running at the same time with `:max_concurrency`. This provides _back-pressure_.

The function `Task.async_stream` returns a stream, a lazy enumerable that does nothing until it is consumed. We use `Stream.run/1` to run it when we don't care about the result; it returns `:ok`. If we want the results, we use `Enum.to_list/1`.

```elixir
chars = Enum.map(?a..?l, fn x -> <<x::utf8>> <> "@com" end)
```

The function below runs at most 4 tasks concurrently, so the 12 emails are processed in 3 batches.
```elixir
defmodule MyTasks do
  require Logger
  @emails chars

  def send_email(email) do
    Process.sleep(500)
    Logger.info("#{inspect(self())} processed #{email}")
  end

  def back_pressured do
    Task.Supervisor.async_stream_nolink(
      MyTaskSupervisor,
      @emails,
      &send_email(&1),
      max_concurrency: 4,
      ordered: false
    )
  end
end

Time.utc_now() |> IO.inspect(label: :before)
MyTasks.back_pressured() |> Stream.run()
Time.utc_now() |> IO.inspect(label: :after)

# |> Enum.to_list()
```

Useful parameters are:

* `on_timeout: :kill_task`
* `ordered: false`

The function `System.schedulers_online()` returns the number of schedulers online (usually the number of available cores), which is the default value of the `max_concurrency` parameter.

<!-- livebook:{"break_markdown":true} -->

### Example of task: cron job

We want to mimic a cron job with a Task. The task runs another task and is turned into a long-running process by being called again each time the main process receives a delayed message. We use `Process.send_after` in order [not to use sleep](https://hexdocs.pm/elixir/Process.html#sleep/1). This requires a permanent message receiver. Another implementation is given further down with a genserver, which supports message handling among other benefits.
```elixir
defmodule Period do
  use Task

  def start(args) do
    args = Keyword.merge(args, parent: self())
    Task.start(__MODULE__, :do_run, [args])
  end

  def do_run([every: time, run: f, parent: parent] = args) do
    Task.async(fn -> apply(f, []) end)
    # send to the main process
    Process.send_after(parent, {:loop, args}, time)
  end
end

defmodule Clean do
  require Logger
  def up, do: Logger.info("cleaned!")
end

# continuous listener on the main process
defmodule Listen do
  def loop do
    receive do
      {:loop, args} ->
        # on message, run the task
        Period.do_run(args)
        loop()
    after
      10_000 -> :timeout
    end
  end
end

##### The final API:
pid =
  spawn(fn ->
    Period.start(
      every: 3_000,
      run: fn -> Clean.up() end
    )

    Listen.loop()
  end)

Process.sleep(10_000)
Process.exit(pid, :shutdown)
```

> It is preferable to use the supervised form [Task.Supervisor.async_nolink](https://hexdocs.pm/elixir/1.14.3/Task.Supervisor.html#async_nolink/3). See further down.

## GenServer

A `GenServer` is a _(spawned)_ single-threaded process. It is a mechanism to run a long-lived _stateful_ process that can communicate with other processes.

It works with client-side functions, server-side callbacks and message passing. Messages are processed _sequentially_, "one at a time".

The client functions are accessible from everywhere. The callbacks are functions that you don't call directly: the genserver invokes them, and you have to implement them within the module that implements the genserver behaviour.

Reading about using genservers: [spawn or not](https://www.theerlangelist.com/article/spawn_or_not)

> A process in `Erlang/Elixir` is a memory-isolated and single-threaded code runner; it has a mailbox and an address and communicates with asynchronous message passing.

The main client/server built-in functions in a genserver are: [cf Genserver cheat sheet](https://elixir-lang.org/downloads/cheatsheets/gen-server.pdf).

<!-- livebook:{"break_markdown":true} -->

| client / callback | what it does | replies? | what it returns |
| ---------------------------------------------------------------- | ----------------------------------------------------------- | :------: | --------------------------------- |
| `GenServer.start(module, init_arg)` $\to$ `init(init_arg)` | spawns a genserver and inits the state | yes | `start` returns `{:ok, pid}`; `init` returns `{:ok, state}` or `{:stop, reason}` |
| `GenServer.call(to, match)` $\to$ `handle_call(match, from, state)` | internal sync to perform a task and reply `response` | yes | `{:reply, response, state}` |
| `GenServer.cast(to, match)` $\to$ `handle_cast(match, state)` | internal async to perform a task without a response | no | `{:noreply, state}` |
| `GenServer.call` $\to$ `GenServer.reply` | async delayed response | yes | `:ok` |
| `send(pid, msg)` $\to$ `handle_info(msg, state)` | handle an async message from any process without a response | no | `{:noreply, state}` |

<!-- livebook:{"break_markdown":true} -->

You can use client functions in the callbacks, except:

> ❗ It is [not recommended](https://hexdocs.pm/elixir/1.14.3/Task.html#await/2-compatibility-with-otp-behaviours) to `await` a long-running task inside a genserver. Since such tasks are monitored by the genserver and produce 2 messages, these messages should be captured in a `handle_info`. You can also return a delayed response with `GenServer.reply` from a `GenServer.call` (see the example in "Running Async Tasks").

> ❗ A genserver _cannot_ `GenServer.call` itself from within one of its own callbacks (for example `handle_cast` or `handle_info`): `call` waits for a reply that the genserver can only produce once the current callback has returned. Use `send` and `handle_info` instead.

<!-- livebook:{"break_markdown":true} -->

##### Get info on a process

The module [Process](https://hexdocs.pm/elixir/1.14/Process.html) offers routines such as `Process.alive?(pid)` or `Process.info(pid)`, which gives information on the linked processes with `|> Keyword.get(:links)`, on the message queue with `|> Keyword.get(:message_queue_len)`, and so on.
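
The `Process` routines mentioned above can be tried on a throwaway process. The following is only a minimal sketch: the spawned process and the `:ignored_*` and `:stop` messages are made up for the illustration.

```elixir
# A throwaway process that blocks until it receives :stop
pid =
  spawn_link(fn ->
    receive do
      :stop -> :ok
    end
  end)

# These messages do not match the receive clause, so they stay in the mailbox
send(pid, :ignored_1)
send(pid, :ignored_2)

alive_before = Process.alive?(pid)
links = Process.info(pid) |> Keyword.get(:links)
{:message_queue_len, queue_len} = Process.info(pid, :message_queue_len)

IO.inspect(alive_before, label: :alive?)
IO.inspect(self() in links, label: :linked_to_caller?)
IO.inspect(queue_len, label: :message_queue_len)

# Stop the process and wait for its exit with a monitor
ref = Process.monitor(pid)
send(pid, :stop)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
after
  1_000 -> :timeout
end

IO.inspect(Process.alive?(pid), label: :alive_after_stop?)
```

Because the process exits normally, the link created by `spawn_link` does not affect the caller.
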
The module [sys](https://hexdocs.pm/elixir/1.14.3/GenServer.html#module-debugging-with-the-sys-module) offers primitives for debugging, such as `:sys.trace(pid, true)`, and to retrieve the state, `:sys.get_state(pid)`.

<!-- livebook:{"break_markdown":true} -->

##### Note on the `__MODULE__` placeholder

`__MODULE__` is a placeholder. It is a compilation environment macro which expands to the **current module name as an atom**. It is convenient to use it instead of writing the literal module name.

```elixir
defmodule Test do
  IO.puts("This module is named #{__MODULE__} and is an atom: #{is_atom(__MODULE__)}")
end

is_atom(Test)
```

## How to start a Genserver

A genserver is declared with `use GenServer` and is started with (a function that wraps) `GenServer.start_link` or `GenServer.start`, to which you pass the arguments `(module, init_arg, options)`. This function returns `{:ok, pid}` or `{:error, {:already_started, pid}}`.

This in turn automatically invokes the callback `init/1` to instantiate the state. We are required to define the `init` callback. You add `@impl true` to tell the compiler you are defining a callback.

An example of a minimal (and useless) genserver, where we put $1$ in its state, would be:

```elixir
defmodule MinimalGS do
  use GenServer

  @impl true
  def init(1), do: {:ok, 1}
end

# instantiate
{:ok, pid} = GenServer.start(MinimalGS, 1)
```

We can check its state:

```elixir
:sys.get_state(pid)
```

You write custom client functions that trigger the callbacks. This relies on _pattern matching_ between the message sent by the client function and the clauses of the server callback.
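
To make this pairing concrete, here is a small sketch in which each client function sends a message that one callback clause pattern-matches. The module `CounterSketch` and its functions are hypothetical and not part of the notebook.

```elixir
defmodule CounterSketch do
  use GenServer

  # Client functions: they only send messages to the server process
  def start(initial), do: GenServer.start(__MODULE__, initial)
  def increment(pid, by), do: GenServer.cast(pid, {:increment, by})
  def value(pid), do: GenServer.call(pid, :value)

  # Server callbacks: each clause matches one message shape
  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast({:increment, by}, state), do: {:noreply, state + by}

  @impl true
  def handle_call(:value, _from, state), do: {:reply, state, state}
end

{:ok, counter} = CounterSketch.start(0)
CounterSketch.increment(counter, 2)
CounterSketch.increment(counter, 3)
CounterSketch.value(counter) |> IO.inspect(label: :value)
```

Messages from one sender are handled in the order they were sent, so the `call` sees both increments.
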
In the genserver module below, we have implemented only the callbacks:

```elixir
defmodule FirstGs do
  use GenServer

  @impl true
  def init(init_args), do: {:ok, init_args}

  @impl true
  def handle_call(:double, _from, state), do: {:reply, state * 2, state}
  def handle_call({:triple, n}, _from, state), do: {:reply, n * 3, state}
end
```

You can instantiate genservers "on-the-fly", identified by their PID:

```elixir
for i <- 1..3 do
  {:ok, pid} = GenServer.start(FirstGs, i)

  %{
    pid: pid,
    double: GenServer.call(pid, :double),
    triple: GenServer.call(pid, {:triple, i}),
    state: :sys.get_state(pid)
  }
end
```

## How to stop a genserver

This can be done from a client function by calling `GenServer.stop(pid, reason)`, or equivalently by returning `{:stop, reason, state}` from a callback. The optional callback `terminate` is triggered and can be used to handle cleanups before termination.

Any reason other than `:normal`, `:shutdown` or `{:shutdown, term}` is considered a crash by OTP.

You may set `Process.flag(:trap_exit, true)` in the `init` callback to capture the `:EXIT` signal and let `terminate` run (e.g. save state into a database before exiting).
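
As an illustration of the `terminate` callback, here is a minimal sketch. The module `TerminateSketch` and its state are hypothetical; in place of saving to a database, it sends its last state to an observer process.

```elixir
defmodule TerminateSketch do
  use GenServer

  @impl true
  def init(observer) do
    # Trapping exits lets `terminate` run when a linked parent (e.g. a supervisor) shuts us down
    Process.flag(:trap_exit, true)
    {:ok, %{observer: observer, count: 42}}
  end

  @impl true
  def terminate(reason, state) do
    # Stand-in for "save the state somewhere before exiting"
    send(state.observer, {:terminated, reason, state.count})
    :ok
  end
end

{:ok, pid} = GenServer.start(TerminateSketch, self())
:ok = GenServer.stop(pid, :shutdown)

result =
  receive do
    {:terminated, reason, count} -> {reason, count}
  after
    1_000 -> :no_message
  end

IO.inspect(result, label: :terminate_received)
```

`GenServer.stop` invokes `terminate` in any case; the `:trap_exit` flag matters when the exit signal comes from a linked parent.
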
A callback can also `raise "message"`. The different ways to stop are:

<!-- livebook:{"force_markdown":true} -->

```elixir
GenServer.stop(pid, :code)
# or return a `:stop` based tuple in a callback
{:stop, :reason, state}
# or raise in the callback
raise "Stop me"
# or `Process.exit` in a callback
Process.exit(self(), :reason)
```

More in-depth material:

* this blog about [Stop a genserver](https://alexcastano.com/how-to-stop-a-genserver-in-elixir/)
* a nice `Livebook` about this: [![Run in Livebook](https://livebook.dev/badge/v1/blue.svg)](https://livebook.dev/run?url=https%3A%2F%2Fgithub.com%2Fpaulanthonywilson%2Ffurlough%2Fblob%2Fmaster%2F_posts%2Flivebook%2Fvaried-ways-to-kill.livemd)

```elixir
if Process.alive?(pid) do
  :sys.get_state(pid) |> IO.inspect(label: "the state of the genserver is: ")
  GenServer.stop(pid)
  Process.alive?(pid)
end
```

##### Example of genserver usage

Long-running code is discouraged in the `init` callback. You may want to return the `{:continue, term}` instruction and implement the `handle_continue` callback to guarantee that this operation is finished before the Genserver handles another message/task. This is a blocking operation and avoids potential race conditions.

<!-- livebook:{"break_markdown":true} -->

The following usage of a genserver is an illustration; it is not really appropriate, since the simple Task used above is enough when you don't need to interact with the job.

We implement the cron job again, this time with a genserver. It uses `handle_continue`. Once launched, it runs a recurrent job forever. We then stop the process shortly after.
```elixir
defmodule Periodic do
  use GenServer
  require Logger

  @start Time.utc_now()

  defp time, do: Time.diff(Time.utc_now(), @start, :millisecond)

  def start_link(opts), do: GenServer.start(__MODULE__, opts)

  def init(args) do
    Logger.info("#{inspect(time())}ms: First Init")
    {:ok, args, {:continue, :run}}
  end

  def handle_continue(:run, [every: time, run: _f] = state) do
    Logger.info("#{inspect(time())}ms: Continue")
    Process.send_after(self(), :repeat, time)
    {:noreply, state}
  end

  def handle_info(:repeat, [every: time, run: myfun] = state) do
    Logger.info("#{inspect(time())}ms: repeat")
    Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> apply(myfun, []) end)
    Process.send_after(self(), :repeat, time)
    {:noreply, state}
  end

  def handle_info({_, msg}, state) do
    Logger.info("message from task: #{msg}")
    {:noreply, state}
  end

  def handle_info(_msg, state) do
    # IO.inspect(msg)
    {:noreply, state}
  end
end

defmodule Send do
  def cleaner do
    Process.sleep(500)
    :cleaned!
  end
end

{:ok, pid} =
  Periodic.start_link(
    every: 3_000,
    run: fn -> Send.cleaner() end
  )

Process.sleep(11_000)
GenServer.stop(pid, :shutdown)
```

## Linking Genservers

You can link processes with `Process.link/1`. This has to be called from within a process: the calling process is implicit and the other process's PID is given as an argument.

This is what `GenServer.start_link` does. It starts a **bi-directional** link with the caller. If you instantiate with `GenServer.start`, no link is created.

<!-- livebook:{"break_markdown":true} -->

> ❗ If a process is started with `start_link`, then this process will be linked to the current process, the `Livebook`. This means the livebook fails when the genserver exits with a reason other than `:normal`. One can use `Process.flag(:trap_exit, true)` to capture the message `{:EXIT, from, reason}`. In Elixir, the `Supervisor` behaviour is meant to manage these links and restart policies.

<!-- livebook:{"break_markdown":true} -->

##### Example

Let's give an example with two linked genservers.
The genserver `Parent` is instantiated with `GenServer.start` and it instantiates another genserver, called `Child`, with `start_link`. When we stop one or the other, both are stopped.

```elixir
defmodule Child do
  use GenServer

  # -----
  @impl true
  def init(_), do: {:ok, {}}

  @impl true
  def terminate(reason, _), do: IO.inspect(reason, label: :terminate)
end
```

```elixir
defmodule Parent do
  use GenServer

  def link(pid, module), do: GenServer.call(pid, {:link, module})

  # -------
  @impl true
  def init(_), do: {:ok, {}}

  @impl true
  def handle_call({:link, module}, _, _),
    do: {:reply, GenServer.start_link(module, :ok), {}}

  @impl true
  def terminate(reason, _), do: IO.inspect(reason, label: :terminate)
end
```

The routine `Process.info` returns a keyword list, which includes the linked processes. The routine `Process.alive?` returns a boolean.

We link `Parent` and `Child`:

```elixir
# instantiate a "Parent" genserver
{:ok, pid_parent} =
  GenServer.start(Parent, :ok)
  |> IO.inspect(label: :parent)

# instantiate a linked genserver "Child"
{:ok, pid_child} =
  Parent.link(pid_parent, Child)
  |> IO.inspect(label: :child)

{Process.alive?(pid_parent), Process.alive?(pid_child)} |> IO.inspect(label: :start)

Process.info(pid_parent) |> Keyword.get(:links) |> IO.inspect(label: :check_links_of_parent)
Process.info(pid_child) |> Keyword.get(:links) |> IO.inspect(label: :check_links_of_child)

# check 1
{Process.alive?(pid_parent), Process.alive?(pid_child)} |> IO.inspect(label: :start)
```

We stop the parent: both are stopped.

```elixir
GenServer.stop(pid_parent, :shutdown)
# check 1
{Process.alive?(pid_parent), Process.alive?(pid_child)}
```

We restart and stop the child: both are stopped.
```elixir
{:ok, pid_parent} = GenServer.start(Parent, :ok)
{:ok, pid_child} = Parent.link(pid_parent, Child)

{Process.alive?(pid_parent), Process.alive?(pid_child)} |> IO.inspect(label: :start)

# stop Child
GenServer.stop(pid_child, :shutdown)
# check 2
{Process.alive?(pid_parent), Process.alive?(pid_child)}
```

> ❗ If we instantiate `Parent` with `GenServer.start_link`, this will link `Parent` with the caller, which is the main process, this `Livebook`. If we stop `Parent`, this will stop the `Livebook`, which we don't want.

<!-- livebook:{"break_markdown":true} -->

#### Restart strategies of processes (Genserver, Task)

When you define a genserver, you set its restart strategy by declaring it in `use GenServer, restart: :strategy` where:

* `restart: :temporary`: the process will not be restarted even if it crashes,
* `restart: :transient`: the process is restarted only for non-successful exits (a reason other than `:normal`, `:shutdown` or `{:shutdown, term}`),
* `restart: :permanent`: always restarted. This is the default behaviour.

## Supervisor

A supervisor works declaratively: you declare which processes you want to keep running, and it restarts them when they exit. This relies on linking processes together.

<!-- livebook:{"break_markdown":true} -->

### Test with a Genserver

We start and supervise a genserver with `Supervisor.start_link`, passing a list of `child_spec`s and a strategy.

It is a good idea to follow the convention of naming `MyModule.start_link` the client function that invokes `GenServer.start_link`, because the default child specification calls that function.

❗ Note that you have to use `GenServer.start_link` (not `GenServer.start`) to link the supervisor and the module.

> ❗❗ The supervisor expects the function that starts a process to return `{:ok, pid}`. In practice, this means the last expression of this function should be the `GenServer.start_link` call.
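
The `restart:` option described above ends up in the child specification that `use GenServer` generates for the module. The sketch below inspects it; the module `TransientSketch` and the argument `:some_arg` are hypothetical.

```elixir
defmodule TransientSketch do
  use GenServer, restart: :transient

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

# `use GenServer` defines child_spec/1, which a supervisor calls for `{TransientSketch, arg}`
spec = TransientSketch.child_spec(:some_arg)
IO.inspect(spec, label: :child_spec)

# The restart value can also be overridden where the child is declared
overridden = Supervisor.child_spec({TransientSketch, :some_arg}, restart: :temporary)
IO.inspect(overridden.restart, label: :overridden_restart)
```

The generated spec points at `start_link/1`, which is why the naming convention mentioned above matters.
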
<!-- livebook:{"break_markdown":true} -->

Let's supervise this minimal genserver:

```elixir
defmodule SupMinimalGs do
  use GenServer

  # <- returns {:ok, pid} for the supervisor
  # the optional argument lets both the explicit and the default child spec work
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(_), do: {:ok, nil}
end
```

The supervisor expects `child_specs`. A child spec can be a module name, a `{module, arg}` tuple or a map. If the supervised module is a genserver that defines `start_link/1`, then we have a shortcut: the module name alone, in which case `[]` is passed as the argument. Here we showcase an explicit `child_spec` map.

<!-- livebook:{"force_markdown":true} -->

```elixir
child_spec = %{id: SupMinimalGs, start: {SupMinimalGs, :start_link, []}}
Supervisor.start_link([child_spec], strategy: :one_for_one, name: TestSupervisor)

# or
Supervisor.start_link([SupMinimalGs], strategy: :one_for_one, name: TestSupervisor)
```

<!-- livebook:{"break_markdown":true} -->

In action, we instantiate a supervised module, check it is alive and stop it.

```elixir
child_spec = %{id: SupMinimalGs, start: {SupMinimalGs, :start_link, []}}
Supervisor.start_link([child_spec], strategy: :one_for_one, name: TestSupervisor)
# Supervisor.start_link([SupMinimalGs], strategy: :one_for_one, name: TestSupervisor)

[{SupMinimalGs, pid, _, _}] =
  Supervisor.which_children(TestSupervisor)
  |> Kino.inspect()

IO.puts("the genserver with pid #{inspect(pid)} is supervised")

Process.alive?(pid) |> Kino.inspect(label: :alive?)
GenServer.stop(pid, :shutdown) |> IO.inspect(label: :genserver_stopped)
Process.sleep(100)
Process.alive?(pid) |> Kino.inspect(label: :alive_after_exit?)
```

Then we observe that the supervisor created a new one.

```elixir
Process.sleep(1000)

# we check again which children are supervised
[{SupMinimalGs, pid, _, _}] =
  Supervisor.which_children(TestSupervisor)
  |> Kino.inspect()

IO.puts("a new genserver is created with pid: #{inspect(pid)}")
Process.alive?(pid) |> Kino.inspect(label: :recreated_and_alive?)
```

#### Save the state

You notice that the state of the module is reset to the initial values.
One way to keep it is to save the state in a database (`ETS` for example) in the `terminate` callback (for "normal" shutdowns or when `:EXIT` is trapped), and to read it back from the database in the `init` callback.

<!-- livebook:{"break_markdown":true} -->

### Supervision tree

You normally start it with `Supervisor.start_link` within the module that has the behaviour `use Application`. You can also define a module with the behaviour `use Supervisor`.

##### Strategies of Supervisors

* `strategy: :one_for_all`. If a child process terminates, all other child processes are terminated and then all child processes (including the terminated one) are restarted.
* `strategy: :one_for_one`. Only the failed child process is restarted.
* `strategy: :rest_for_one`. The failed process and any process started after it are restarted.

Look at the supervision tree: we use a Dynamic Supervisor (see below), a Registry (see later) and a Task.Supervisor later, so we start and supervise them here.

```elixir
Process.whereis(MySupervisor)
```

The `Supervisor` module comes with some useful routines:

```elixir
pid = Process.whereis(MySupervisor)
Supervisor.count_children(pid)
```

```elixir
Supervisor.which_children(MySupervisor)
```

## Dynamic Supervisor

A dynamic supervisor is designed to supervise children that are started "on-the-fly", such as genservers with specific attributes. The usage is quite similar to supervisors.
```elixir
defmodule DynSupGs do
  use GenServer, restart: :permanent

  def start_link(v), do: GenServer.start_link(__MODULE__, v)

  @impl true
  def init(v), do: {:ok, v + 1}
end
```

```elixir
child_spec = %{id: DynSupGs, start: {DynSupGs, :start_link, [1]}}
# or simply child_spec = {DynSupGs, 1} since we used the standard "start_link"

!is_nil(Process.whereis(DynSup)) |> IO.inspect()

DynamicSupervisor.start_child(DynSup, child_spec)

# we check which children are supervised, get the pid
list = DynamicSupervisor.which_children(DynSup)
{_, pid, _, _} = List.last(list)

IO.puts("the genserver with pid #{inspect(pid)} is supervised")

Process.alive?(pid) |> Kino.inspect(label: :alive?)
GenServer.stop(pid, :shutdown) |> IO.inspect(label: :genserver_stopped)
Process.sleep(100)
Process.alive?(pid) |> Kino.inspect(label: :alive_after_exit?)

Process.sleep(3000)

# we check again which children are supervised
list = DynamicSupervisor.which_children(DynSup)
{_, pid, _, _} = List.last(list)

IO.puts("a new genserver is created with pid: #{inspect(pid)}")
Process.alive?(pid) |> Kino.inspect()
:sys.get_state(pid)
```

You can review the supervised processes with the supervision tree by running:

```elixir
Process.whereis(MySupervisor)
```

## Task Supervisor

Tasks can also be supervised. You declare a [Task.Supervisor](https://hexdocs.pm/elixir/1.14.3/Task.Supervisor.html) in your supervision tree with the child spec: `{Task.Supervisor, name: MyTaskSupervisor}`.
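
Here is a minimal sketch of what a supervised, unlinked task gives you. It starts its own supervisor under the hypothetical name `SketchTaskSupervisor` so that it does not depend on the notebook's `MyTaskSupervisor`, and it collects the outcome with `Task.yield`.

```elixir
# Hypothetical supervisor name, used only in this sketch
{:ok, _sup} = Task.Supervisor.start_link(name: SketchTaskSupervisor)

ok_task = Task.Supervisor.async_nolink(SketchTaskSupervisor, fn -> 1 + 1 end)
bad_task = Task.Supervisor.async_nolink(SketchTaskSupervisor, fn -> exit(:boom) end)

# The caller is not linked to the tasks, so the failing task does not take it down;
# the failure is reported as an {:exit, reason} tuple instead
ok_result = Task.yield(ok_task, 1_000)
bad_result = Task.yield(bad_task, 1_000)

IO.inspect(ok_result, label: :ok_task)
IO.inspect(bad_result, label: :bad_task)
```

With the linked `Task.Supervisor.async`, the exit of `bad_task` would propagate to the caller.
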
You can use the unlinked version with [async_nolink](https://hexdocs.pm/elixir/1.14.3/Task.Supervisor.html#async_nolink/3) or the linked version with [async](https://hexdocs.pm/elixir/1.14.3/Task.Supervisor.html#async/3):

```elixir
defmodule Email2 do
  require Logger
  @emails ["a@com", "b@com", "c@com"]

  def send_email("b@com"), do: raise("oh no!")
  def send_email(email), do: Logger.info("#{email}")

  def run do
    Enum.each(
      @emails,
      fn email ->
        Task.Supervisor.async_nolink(
          MyTaskSupervisor,
          fn -> send_email(email) end
        )
      end
    )
  end
end

Email2.run()
```

## Genserver Registration

Every process can be given a name (this is called registration) with an **atom**: `Process.register(pid, :name)`.

A Genserver can be referenced by its _PID_ or with the `:name` option (cf [name](https://hexdocs.pm/elixir/1.14.3/GenServer.html#module-name-registration)). The identifier passed to `:name` must be:

* a unique atom such as `name: __MODULE__`,
* or use the `Registry` mechanism: `name: {:via, Registry, {MyRegistry, "name"}}`

> Note: this will uniquely identify a genserver on a node. You can identify it globally with `name: {:global, name}`. The genserver will then be uniquely identified within distributed Erlang nodes. Solutions to distribute registries exist, such as [Horde](https://hexdocs.pm/horde/readme.html).

<!-- livebook:{"force_markdown":true} -->

```elixir
# using the PID
{:ok, pid} = GenServer.start_link(__MODULE__, init_args)
GenServer.call(pid, :action)

# using the atom module name __MODULE__
GenServer.start_link(__MODULE__, init_args, name: __MODULE__)
GenServer.call(__MODULE__, :action)

# using any atom
GenServer.start_link(__MODULE__, init_args, name: :an_atom)
GenServer.call(:an_atom, :action)

# via a Registry. Make sure one has been instantiated
Registry.start_link(keys: :unique, name: MyRegistry)
via_tuple = {:via, Registry, {MyRegistry, "name"}}
GenServer.start_link(__MODULE__, init_args, name: via_tuple)
GenServer.call(via_tuple, :action)
```

<!-- livebook:{"break_markdown":true} -->

❗ A Genserver started with the `:name` option, whether an atom or a `:via` tuple, can only be started **once** under that name. The function `GenServer.start` returns `{:ok, pid}` or `{:error, {:already_started, pid}}`.

<!-- livebook:{"force_markdown":true} -->

```elixir
pid =
  case GenServer.start(..) do
    {:ok, pid} -> pid
    {:error, {:already_started, pid}} -> pid
  end
```

See the following examples.

<!-- livebook:{"break_markdown":true} -->

#### PID as identifier

The generated PID is grabbed by pattern matching on the response. You pass it to the client functions `GenServer.call(PID, message)` and `GenServer.cast(PID, message)`. Each time you run the module below, a new PID is generated.

> Notice below that when the `Kernel.send` function is run in the context of the genserver (for example in the `handle_call`), we can use `self()`, whereas when it is used in the context of the main process (for example in `CallByPid.start_link`), we have to use the PID to reach the `handle_info` handled by the Genserver.
```elixir
defmodule CallByPid do
  require Logger
  use GenServer

  def start_link(state: state) do
    {:ok, pid} = GenServer.start_link(__MODULE__, state)
    # message handled by "handle_info({:msg, pid})"
    send(pid, {:msg, pid})
    # '.call' returns the 2nd element of the reply tuple, which is the spawned pid
    GenServer.call(pid, :ready)
  end

  # callbacks
  @impl true
  def init(state) do
    Logger.info("spawned #{inspect(self())} is instantiated with state: #{state}")
    {:ok, state}
  end

  @impl true
  def handle_call(:ready, {pid, _} = _from, state) do
    send(
      self(),
      {:msg, "process #{inspect(pid)} asks #{inspect(self())} to run :ready and returns its PID"}
    )

    {:reply, self(), state}
  end

  # message printer
  @impl true
  def handle_info({:msg, message}, state) do
    Logger.info("spawned received: #{inspect(message)} from main")
    {:noreply, state}
  end
end

require Logger
Logger.info("main process pid: " <> "#{inspect(self())}")

Kino.Process.render_seq_trace(fn ->
  # sending to main process the pid of the spawned process with state 1
  send(self(), {:spw1, CallByPid.start_link(state: "1")}) |> Kino.inspect()
  # this returns the pid of the spawned process with state 2
  CallByPid.start_link(state: "2")
end)
```

We can check that our spawned process, the Genserver, is still alive and check its state directly:

```elixir
receive do
  {:spw1, pid} ->
    Logger.info(inspect(pid))

    Process.alive?(pid)
    |> Kino.inspect(label: "spawned process #{inspect(pid)} is still alive?")

    :sys.get_state(pid)
end
```

### The atom `__MODULE__` as identifier

Instead of the `pid`, we can use `name: __MODULE__`, which is an atom. You can also use any other atom. Note that we can run `start_link` **only once** when we name the Genserver this way.

We see that the "named" Genserver can only be instantiated once per name, thus we cannot change the initial state.
```elixir
defmodule CallByAtom do
  require Logger
  use GenServer

  def start_link(state: state) do
    case GenServer.start_link(__MODULE__, state, name: __MODULE__) do
      {:ok, pid} ->
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        Logger.info(
          "Genserver #{inspect(pid)} with name #{__MODULE__} and state #{state} was already started"
        )

        {:ok, pid}
    end
  end

  # the process identifier is "__MODULE__"
  def get_state, do: GenServer.call(__MODULE__, :get_state)

  def init(state) do
    Logger.info("#{inspect(self())} is instantiated with state: #{inspect(state)}")
    {:ok, state}
  end

  def handle_call(:get_state, _from, state), do: {:reply, state, state}
end

CallByAtom.start_link(state: "1")
CallByAtom.get_state() |> IO.inspect()
CallByAtom.start_link(state: "2")
CallByAtom.get_state()
```

### The Registry

You can't use a string to register a Genserver. You could convert the string into an atom, but this is bad practice in the Elixir world as atoms are not garbage collected and their number is limited.

You can use the [Registry](https://hexdocs.pm/elixir/1.14.3/Registry.html) mechanism instead. It is a [name -> pid] dictionary, based on [ETS](https://elixir-lang.org/getting-started/mix-otp/ets.html), thus local to each `node`.

You instantiate a Registry and name it (`MyRegistry` here below).

<!-- livebook:{"force_markdown":true} -->

```elixir
Registry.start_link(keys: :unique, name: MyRegistry)
```

You can also instantiate it in the supervision tree of the app:

<!-- livebook:{"force_markdown":true} -->

```elixir
children = [
  ...
  {Registry, keys: :unique, name: MyOTPApp.Registry},
]
```

Then you can identify the Genserver with any string or number when you use the tuple below. It associates a PID with the key "var" and stores it in the registry MyRegistry.
When a Genserver function receives such a tuple with [:via](https://hexdocs.pm/elixir/1.14.3/Registry.html#module-using-in-via), it looks up the PID in the given registry:

<!-- livebook:{"force_markdown":true} -->

```elixir
{:via, Registry, {MyRegistry, "var"}}
```

```elixir
defmodule CallByRegistry do
  require Logger
  use GenServer

  defp via_tuple(name), do: {:via, Registry, {ARegistry, name}}

  def start_link(name: name, state: state) do
    # instantiate a Registry
    Registry.start_link(keys: :unique, name: ARegistry)
    # reference a Genserver with the registry mechanism
    GenServer.start_link(__MODULE__, state, name: via_tuple(name))
    # use the registry mechanism
    GenServer.call(via_tuple(name), :ready)
  end

  def get_state(name), do: via_tuple(name) |> :sys.get_state()

  @impl true
  def init(state) do
    Logger.info("#{inspect(self())} is instantiated with state: #{inspect(state)}")
    {:ok, state}
  end

  @impl true
  def handle_call(:ready, _from, state) do
    Logger.info("#{inspect(state)} ready")
    {:reply, :step_0, state}
  end
end
```

If we re-evaluate the cell below several times, we see that we can instantiate a Genserver with a given name only once, so each "name" keeps the "state" it was first started with. If we want another state, we create another Genserver with this state under another name.
```elixir
CallByRegistry.start_link(name: "fish_1", state: "fish_1")
CallByRegistry.get_state("fish_1") |> Kino.inspect()

CallByRegistry.start_link(name: "fish_1", state: "fish_2")
CallByRegistry.get_state("fish_1") |> Kino.inspect()

CallByRegistry.start_link(name: "fish_2", state: "fish_2")
CallByRegistry.get_state("fish_2")
```

The Registry module comes with useful [routines](https://hexdocs.pm/elixir/1.14.3/Registry.html#functions):

```elixir
Registry.count(ARegistry)
```

```elixir
CallByRegistry.get_state("fish_1")
```

```elixir
CallByRegistry.get_state("fish_2")
```

```elixir
Registry.lookup(ARegistry, "fish_2")
```

```elixir
[{pid, _}] = Registry.lookup(ARegistry, "fish_2")
Registry.keys(ARegistry, pid)
```

#### Useful resources on Registries

Besides the [doc](https://hexdocs.pm/elixir/1.14.3/Registry.html), you might find interesting:

* <https://dashbit.co/blog/homemade-analytics-with-ecto-and-elixir>
* <https://github.com/amokan/registry_sample/blob/master/lib/registry_sample/account.ex>

## Running Async Tasks in a Genserver

You could run a `Task.async |> Task.await` in a genserver process. However, it is [not recommended](https://hexdocs.pm/elixir/1.14.3/Task.html#await/2-compatibility-with-otp-behaviours) to await a long task in a genserver.

If you just want to run the task but don't care about the result, then you can launch an `async` task from a callback. This task is monitored by the Genserver. Once it terminates, [two messages](https://hexdocs.pm/elixir/1.14.3/Task.html#await/2-compatibility-with-otp-behaviours) are generated with the format `{ref, result}` and `{:DOWN, ..., :normal}`. You use a `handle_info` callback to match on the response (see the example below).

Even if you don't use `await`, you can still get a delayed response from a `Task.async` when you use `GenServer.reply` (see the "successful" example below).

The typical use cases can be:

* you may run a long task in a Liveview and don't want it to crash because of this process.
  You will use `Task.Supervisor` (see example below). To supervise tasks, you register a `Task.Supervisor` in the supervision tree. The failure, if any, is collected in the `terminate` callback of the Genserver.
* you may also want the process to stop running when you change Liveview. You will use `Task.Supervisor.async` for this because the processes will be **linked**.
* if you just want to launch a task and don't care about receiving a response, you may use `Task.Supervisor.async_nolink` so the processes are not linked.

<!-- livebook:{"break_markdown":true} -->

We will use this HTTP call:

```elixir
defmodule AsyncTask do
  def request do
    Process.sleep(1_000)

    HTTPoison.get!("http://httpbin.org/json")
    |> Map.get(:body)
    |> Jason.decode!()
    |> get_in(["slideshow", "slides"])
  end

  def run, do: Task.async(&request/0)
end

AsyncTask.run()
```

The Genserver below calls the `AsyncTask` module. It will receive 2 messages: the result `{ref, result}` and a completion message of the finished task: `{:DOWN, ref, :process, pid, reason}`. The "catch all" `handle_info` below receives these two messages.

```elixir
defmodule FirstAsyncTaskGsRunner do
  use GenServer
  require Logger

  def run(pid), do: GenServer.call(pid, :async)

  def init(_), do: {:ok, nil}

  def handle_call(:async, _, _s) do
    %{ref: ref, pid: pid} = AsyncTask.run()
    reply = "task with pid #{inspect(pid)} sent"
    {:reply, reply, ref}
  end

  # catch all
  def handle_info(msg, ref) do
    Logger.debug(inspect(msg))
    {:noreply, ref}
  end

  def terminate(reason, _s), do: Logger.info("Terminate Child with reason: #{inspect(reason)}")
end

GenServer.start(FirstAsyncTaskGsRunner, {}) |> elem(1) |> FirstAsyncTaskGsRunner.run()
```

We can use `Process.demonitor(ref, [:flush])` inside the `handle_info` callback that receives the first message `{ref, result}`. If the task is successful, you will then not receive the `{:DOWN, ref, :process, pid, :normal}` status message.
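The effect of `Process.demonitor(ref, [:flush])` can be observed outside of a Genserver. The following is a minimal sketch, not the notebook's code: the module name `DemonitorSketch` and the trivial task body are assumptions made for illustration. It receives the `{ref, result}` message by hand, demonitors with `:flush`, and then checks whether a `:DOWN` message is still delivered.

```elixir
defmodule DemonitorSketch do
  # Hypothetical helper for illustration only.
  def run do
    # The task is linked to and monitored by the calling process.
    %Task{ref: ref} = Task.async(fn -> :done end)

    result =
      receive do
        {^ref, value} ->
          # Remove the monitor and flush a :DOWN message that may already be queued.
          Process.demonitor(ref, [:flush])
          value
      after
        1_000 -> :timeout
      end

    # After the demonitor, no :DOWN message for this ref should show up.
    down =
      receive do
        {:DOWN, ^ref, :process, _pid, reason} -> {:down, reason}
      after
        100 -> :no_down_message
      end

    {result, down}
  end
end
```

Calling `DemonitorSketch.run()` should return `{:done, :no_down_message}`. Without the `Process.demonitor/2` call, the second `receive` would match the `{:DOWN, ref, :process, pid, :normal}` message instead.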
If the task fails, you will instead receive the failure reason in the `:DOWN` handler.

<!-- livebook:{"break_markdown":true} -->

#### Successful `AsyncTask` and usage of `GenServer.reply` to return

<!-- livebook:{"break_markdown":true} -->

It is recommended to run this `AsyncTask` as a supervised task (see the next example). Recall that you can link the task to the main process or not. If you want to stop the task when the main process stops (e.g. you leave a page), use `Task.Supervisor.async`, and `Task.Supervisor.async_nolink` otherwise.

##### Example with `GenServer.reply`

You can get a response back with [GenServer.reply/2](https://hexdocs.pm/elixir/1.14.3/GenServer.html#reply/2). In the callback `handle_call(:key, from, state)`, you return `{:noreply, state}` instead of replying. You save `from = {pid, tag}` (the caller's PID and an opaque tag) in the state so you can use it later. When the (monitored) async task is finished, a message tuple `{ref, result}` is sent. Inside the `handle_info` callback that matches it, you can call `GenServer.reply(from, result)` to send the delayed reply to the pending `.call`.

```elixir
defmodule AsyncTaskReplier do
  use GenServer
  require Logger

  def start, do: GenServer.start(__MODULE__, nil, name: __MODULE__)
  def init(_), do: {:ok, nil}

  def nolink, do: GenServer.call(__MODULE__, :nolink)

  def handle_call(:nolink, from, _) do
    %{ref: ref} = Task.Supervisor.async_nolink(MyTaskSupervisor, &AsyncTask.request/0)
    # Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> raise "bad" end)
    {:noreply, {ref, from}}
  end

  def handle_info({r, response}, {ref, from}) when ref == r do
    # <-- stops the second :DOWN message if success
    Process.demonitor(ref, [:flush])
    {pid, _} = from
    GenServer.reply(from, {pid, response})
    {:noreply, response}
  end

  def handle_info({:DOWN, _ref, :process, _pid, status}, s) do
    Logger.debug("received status: #{inspect(status)}")
    {:noreply, s}
  end

  # the Genserver will call "terminate" if killed.
  def terminate(reason, _s) do
    Logger.info("Terminate Main with reason: #{inspect(reason)}")
  end
end

AsyncTaskReplier.start()
{pid, response} = AsyncTaskReplier.nolink()
Process.alive?(pid) |> IO.inspect()
response
```

This time we use the "normal" monitoring mechanism of async tasks (when we don't care about the response): we listen to the 2 messages sent. Both forms, linked and unlinked, are coded.

```elixir
defmodule NoFailAsyncTaskRunner do
  use GenServer
  require Logger

  def start, do: GenServer.start(__MODULE__, nil, name: __MODULE__)
  def link, do: GenServer.call(__MODULE__, :async)
  def nolink, do: GenServer.call(__MODULE__, :nolink)
  def add, do: GenServer.call(__MODULE__, :one)

  def init(_), do: {:ok, nil}

  def handle_call(:one, _, s), do: {:reply, 1, s}

  def handle_call(:async, _from, _) do
    %{ref: ref, pid: pid} = Task.Supervisor.async(MyTaskSupervisor, &AsyncTask.request/0)
    {:reply, pid, ref}
  end

  def handle_call(:nolink, _from, _) do
    %{ref: ref, pid: pid} = Task.Supervisor.async_nolink(MyTaskSupervisor, &AsyncTask.request/0)
    {:reply, pid, ref}
  end

  def handle_info({r, response}, ref) when ref == r do
    Process.demonitor(ref, [:flush])
    Logger.debug("response: #{inspect(response)}")
    {:noreply, nil}
  end

  def handle_info({:DOWN, _ref, :process, pid, status}, _rf) do
    Logger.debug("received status: #{inspect(status)} for the task #{inspect(pid)}")
    {:noreply, nil}
  end

  # the Genserver will call "terminate" if killed.
  def terminate(reason, _s) do
    Logger.info("Terminate Main with reason: #{inspect(reason)}")
  end
end
```

We run the task:

```elixir
NoFailAsyncTaskRunner.start()
pid_task = NoFailAsyncTaskRunner.link()
```

We kill the main process. We check that the task is killed.

```elixir
Process.whereis(NoFailAsyncTaskRunner) |> GenServer.stop(:kill)
# use the PID of the task returned by the previous cell
IO.puts("Is task alive? :#{Process.alive?(pid_task)}")
```

We start the main process again, run the task **unlinked**, and kill the main process. We check that the async task is still alive.
```elixir
NoFailAsyncTaskRunner.start()
pid_task = NoFailAsyncTaskRunner.nolink() |> IO.inspect()
Process.whereis(NoFailAsyncTaskRunner) |> GenServer.stop(:kill)
IO.puts("Is task alive? :#{Process.alive?(pid_task)}")
```

#### Supervising an async task

Set up another Genserver below to run the previous `AsyncTask` again, but this time we make it fail.

* when we pass `bool = true`, we run it under the task supervisor with `Task.Supervisor.async`.
* when `bool = false`, we do not supervise it and run `Task.async`

```elixir
defmodule AsyncTaskRunner do
  use GenServer
  require Logger

  def start_link() do
    GenServer.start_link(__MODULE__, {}, name: __MODULE__)
  end

  def run(bool \\ false), do: GenServer.call(__MODULE__, {:async, bool})

  def init(_), do: {:ok, {}}

  def handle_call({:async, bool}, _, _) do
    %{ref: ref} =
      case bool do
        true ->
          Task.Supervisor.async(MyTaskSupervisor, fn ->
            Process.exit(self(), :kill)
            AsyncTask.request()
          end)

        false ->
          Task.async(fn ->
            Process.exit(self(), :kill)
            AsyncTask.request()
          end)
      end

    {:reply, Process.alive?(self()), ref}
  end

  def handle_info({r, response}, ref) when ref == r do
    Logger.debug("response: #{inspect(response)}")
    {:noreply, ref}
  end

  def handle_info({:DOWN, _ref, :process, _pid, status}, s) do
    Logger.debug("received status: #{inspect(status)}")
    {:noreply, s}
  end

  # the Genserver will call "terminate" if killed.
  def terminate(reason, _s) do
    Logger.info("Terminate Child with reason: #{inspect(reason)}")
  end
end
```

We supervise the task runner above, as a failing linked process will also kill the parent. It will run the (failing) `AsyncTask`:

```elixir
DynamicSupervisor.start_child(DynSup, %{id: 1, start: {AsyncTaskRunner, :start_link, []}})
{_, pid1, _, _} = DynamicSupervisor.which_children(DynSup) |> List.last()
```

We run the linked (failing) `AsyncTask` as a supervised task with `Task.Supervisor.async`, and compare the PID of the parent process `AsyncTaskRunner` before and after to see whether it has been restarted. Keep in mind that `Task.Supervisor.async` still links the task to its caller; only `async_nolink` removes the link.
```elixir
AsyncTaskRunner.run(true)
# give the DynamicSupervisor time to restart the child, if it crashed
Process.sleep(1000)

# check pid
{_, pid2, _, _} =
  DynamicSupervisor.which_children(DynSup)
  |> List.last()
  |> IO.inspect(label: :after_supervised_task)

IO.puts("Same process? #{pid1 == pid2}")
```

We now run the linked (failing) `AsyncTask` without a task supervisor, with `Task.async`. We check that a new `AsyncTaskRunner` has been started by the DynamicSupervisor.

```elixir
AsyncTaskRunner.run()
Process.sleep(1000)
{_, pid3, _, _} = DynamicSupervisor.which_children(DynSup) |> List.last() |> IO.inspect()
IO.puts("Same process? #{pid2 == pid3}")
```

## Concurrency

How long does it take to launch plenty of Genservers? Even if using a Genserver is clearly a wrong use case here (no need for state nor communication), let's compute the factorial of a number with it. Recall that the factorial $n!$ of a positive integer $n$ is the product of all the integers from 1 to $n$. We compute this product in three ways:

* with a single process using `Enum.reduce`, since $n! = n \cdot (n-1)!$.
* lazily, by streaming the range `1..n` in chunks of, say, 10 elements, and spawning (with `GenServer.start`) a Genserver for each chunk. Each Genserver is responsible for calculating the subproduct of the integers in the chunk it receives. We finally run an `Enum.reduce` on the list of subproducts. Note that each Genserver is started and called synchronously inside `Stream.map`, so the chunks are processed one after the other.
* running the same "stream chunks" process as above, lazily, but without Genservers.

```elixir
defmodule FactorialGs do
  @moduledoc """
  Genserver to compute the product of all integers in the received chunk
  """
  use GenServer

  # starts a (non-linked) Genserver for the chunk and immediately asks it for the subproduct
  def start_link(chunk),
    do: GenServer.start(__MODULE__, chunk) |> elem(1) |> GenServer.call(:factorial)

  def init(chunk), do: {:ok, chunk}
  def handle_call(:factorial, _, s), do: {:reply, Eval.reduce(s), s}
end

defmodule Eval do
  @moduledoc """
  Expose three functions, `worker/1` and `stream/1` and `reduce/1`.
  The first spawns genservers, the second is a pure stream implementation,
  and the last is a single reduction process.
  """
  @n 10

  @doc """
  Lazy streaming genservers without recursion

      iex> Eval.worker(4)
      24
  """
  def worker(n) do
    1..n
    |> Stream.chunk_every(@n)
    |> Stream.map(&FactorialGs.start_link(&1))
    |> Enum.to_list()
    |> Enum.reduce(1, &(&1 * &2))
  end

  @doc """
  Lazy stream recursion without genserver

      iex> Eval.stream(4)
      24
  """
  def stream(n), do: Enum.to_list(1..n) |> rec_stream()

  def rec_stream(list) when is_list(list) and length(list) > @n do
    list
    |> Stream.chunk_every(@n)
    |> Stream.map(&Eval.reduce/1)
    |> Enum.to_list()
    |> rec_stream()
  end

  def rec_stream(list) when is_list(list), do: Enum.reduce(list, 1, &(&1 * &2))

  @doc """
  Simple single process reduction

      iex> Eval.reduce(1..4)
      24
  """
  def reduce(list), do: Enum.reduce(list, 1, &(&1 * &2))
end

Eval.worker(3)
```

When we measure the time (in ms) to compute the factorial of a big number, say 10_000, we see that the version spawning Genservers is faster than the single reduction, even with the overhead of using Genservers. The fastest is the lazy chunked version without the overhead of Genservers. Since both chunked versions process one chunk at a time, the gain most likely comes from multiplying smaller intermediate products rather than from parallelism. In this measurement, the BEAM is able to spawn around 1000 Genservers per ms.

```elixir
n = 10_000

%{
  reduce: :timer.tc(fn -> Eval.reduce(1..n) end) |> elem(0) |> div(1000),
  genserver: :timer.tc(fn -> Eval.worker(n) end) |> elem(0) |> div(1000),
  stream: :timer.tc(fn -> Eval.stream(n) end) |> elem(0) |> div(1000)
}
```
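In `Eval.worker/1` each Genserver is started and called synchronously inside `Stream.map`, so the chunks are evaluated one at a time. As a point of comparison, here is a minimal sketch of a variant that evaluates the chunks in separate processes with `Task.async_stream/3`. The module name `ParallelFactorial` and the chunk size are assumptions made for illustration; this is not one of the three versions measured above, and no timing is claimed for it.

```elixir
defmodule ParallelFactorial do
  # Hypothetical module for illustration; the chunk size mirrors `@n` in `Eval`.
  @chunk 10

  def run(n) when is_integer(n) and n >= 1 do
    1..n
    |> Stream.chunk_every(@chunk)
    # Each chunk is reduced in its own task; by default the number of tasks
    # running at once is bounded by the number of online schedulers.
    |> Task.async_stream(fn chunk -> Enum.reduce(chunk, 1, &(&1 * &2)) end, ordered: false)
    # Each emitted element is `{:ok, subproduct}`; multiplication is commutative,
    # so the arrival order of the subproducts does not matter.
    |> Enum.reduce(1, fn {:ok, sub}, acc -> sub * acc end)
  end
end
```

`ParallelFactorial.run(4)` returns `24`, like the other versions. It could be added to the timing map with another `:timer.tc` entry to see whether running chunks in parallel pays off for a given `n`.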