# Amoc

```elixir
Mix.install([
  {:ranch, "~> 2.1"},
  {:jiffy, "~> 1.1"},
  {:gen_state_machine, "~> 2.0"},
  {:uuid, "~> 1.1"},
  {:amoc, "~> 3.0"},
  {:kino, "~> 0.11.3"},
  {:vega_lite, "~> 0.1.6"},
  {:kino_vega_lite, "~> 0.1.10"},
  {:kino_explorer, "~> 0.1.11"}
])
```

## Getting Started

In this book, we're going to experiment with some barebones load-testing cases. We will define a server, a scenario, and some instrumentation; then we will load-test the server and look at the results. Let's get started!

## The Server

For this demo, we need _something_ to load-test to begin with, so let's create a very basic, super-private messaging server. The server is a barebones message router and nothing more, and it works with JSON payloads. We define the protocol as follows:

* On connect, the user provides `%{<<"connect">> => user_id}`, where `user_id` is _any_ binary, for example a UUIDv4 or something even harder to guess:
  * The server **checks only that no other user is connected** with this `user_id`:
    * If the `user_id` is free, it accepts the connection;
    * If the `user_id` is taken, it closes the connection.
* On a connected session, the server takes _any_ JSON payload, provided it has two mandatory fields, `from` and `to`:
  * The server **checks only that the specified `from` matches** the one the connection was established with:
    * If it matches, it routes the entire payload to `to`, transparently;
    * If it doesn't, it does not route the message, treating it as a phishing attempt.
  * Note that in neither case does the server tell the client whether the peer was connected or whether a phishing attempt was detected, to avoid leaking any knowledge about what happened to the payload. It is the responsibility of the peer, if it wants to, to react to the message.

How does the routing work? Simply: the server delivers the payload to the recipient if a recipient with that `user_id` is connected, and does nothing otherwise. As the recipient's `user_id` is arbitrarily chosen by the recipient, the sender can only write to the recipient if it already knows its `user_id`.
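To make the protocol concrete before looking at the implementation, here is a small sketch of the payloads exchanged on the wire, encoded with `:jiffy` just like the server does. The `alice` and `bob` ids are made up for illustration; any hard-to-guess binary works.

```elixir
# Hypothetical user ids; the protocol only requires hard-to-guess binaries.
alice = UUID.uuid4()
bob = UUID.uuid4()

# 1. The client introduces itself...
connect = :jiffy.encode(%{"connect" => alice})
# ...and, if the user_id is free, the server answers with an acknowledgement:
ack = :jiffy.encode(%{"connected" => alice})

# 2. Once connected, any payload whose "from" matches the session is routed to "to" verbatim:
message =
  :jiffy.encode(%{
    "from" => alice,
    "to" => bob,
    "msg" => "hello",
    "msg_id" => UUID.uuid4()
  })

{connect, ack, message}
```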
""" use GenServer @table __MODULE__ @type user_id() :: term() @doc "Puts a _new_ user in the session table" def put_new(user_id, pid) do :ets.insert_new(@table, {user_id, pid}) end @doc "Gets the pid of a user by its ID, if the ID is registered" def get(user_id) do case :ets.lookup_element(@table, user_id, 2, {:error, :not_found}) do {:error, :not_found} -> {:error, :not_found} pid -> pid end end @doc "Removes a user from the session table" def delete(user_id) do :ets.delete(@table, user_id) end def start_link() do GenServer.start_link(__MODULE__, :no_args, []) end def init(_) do :ets.new(@table, [ :named_table, :public, :set, write_concurrency: true, read_concurrency: true ]) {:ok, :no_state} end end ``` ### The C2S code This is what implements the protocol, as a very simple state machine ```elixir defmodule MessagingServer.C2S do @moduledoc """ Implements the server-side protocol """ use GenStateMachine, callback_mode: :handle_event_function require Record require Logger Record.defrecord(:data, ref: nil, transport: nil, socket: nil) @type state() :: :connecting | {:connected, term()} def start_link(ref, transport, opts) do GenStateMachine.start_link(__MODULE__, {ref, transport, opts}, []) end def init({ref, transport, _}) do do_connect = {:next_event, :internal, {:do_connect, ref}} {:ok, :connecting, data(ref: ref, transport: transport), do_connect} end ## get ranch handshake and socket def handle_event(:internal, {:do_connect, ref}, :connecting, data) do {:ok, socket} = :ranch.handshake(ref) full_data = data(data, socket: socket) active_once(full_data) {:next_state, :connecting, full_data} end ## From device to server ## Connecting a session def handle_event(:internal, %{"connect" => user_id}, :connecting, data) do case MessagingServer.SM.put_new(user_id, self()) do true -> payload = %{"connected" => user_id} transport_send(data, payload) {:next_state, {:connected, user_id}, data} false -> payload = %{"rejected" => "duplicate"} transport_send(data, payload) {:stop, :rejected_connection_because_of_duplicate} end end ## Sending messages def handle_event(:internal, payload, {:connected, user_id}, _) do case payload do ## user_id =:= from -> no pishing %{"from" => ^user_id, "to" => to} -> case MessagingServer.SM.get(to) do {:error, :not_found} -> :keep_state_and_data pid -> send(pid, {:route, payload}) :keep_state_and_data end _ -> :keep_state_and_data end end ## From peer to peer ## Receiving messages def handle_event(:info, {:route, payload}, {:connected, user_id}, data) do case payload do ## user_id =:= to %{"to" => ^user_id} -> transport_send(data, payload) :keep_state_and_data ## message wasnt to me, ignore without revealing who wrote to me _ -> :keep_state_and_data end end ## TCP related handlers def handle_event(:info, {:tcp_closed, socket}, _, data(socket: socket)) do {:stop, :normal} end def handle_event(:info, {:tcp_error, socket, reason}, _, data(socket: socket)) do {:stop, reason} end def handle_event(:info, {:tcp, socket, packet}, _, data = data(socket: socket)) do case decode_all(packet) do :bad_json_received -> {:stop, :bad_json_received} events -> active_once(data) {:keep_state_and_data, events} end end ## Match-all ignore-rest def handle_event(event_type, event_content, _state, _data) do Process.sleep(500) Logger.alert("we got something") Logger.alert(%{type: event_type, content: event_content, where: 105}) :keep_state_and_data end def terminate(reason, state, d = data(socket: socket, transport: transport)) when socket != nil and transport != nil do try do apply(transport, 
:close, [socket]) catch _ -> :ok end terminate(reason, state, data(d, socket: nil, transport: nil)) end def terminate(reason, {:connected, user_id}, data) do MessagingServer.SM.delete(user_id) terminate(reason, :connecting, data) end def terminate(_, _, _) do :ok end defp active_once(data(transport: transport, socket: socket)) do :ok = transport.setopts(socket, active: :once) end defp transport_send(data(transport: transport, socket: socket), payload) do packet = :jiffy.encode(payload) transport.send(socket, packet) end defp decode_all(packet, events \\ []) do try do case :jiffy.decode(packet, [:return_maps, :return_trailer]) do {:has_trailer, payload, trailer} -> payload_event = {:next_event, :internal, payload} decode_all(trailer, [payload_event | events]) payload -> payload_event = {:next_event, :internal, payload} Enum.reverse([payload_event | events]) end catch _, _ -> :bad_json_received end end end ``` ### The Supervisor and Application modules ```elixir defmodule MessagingServer.Sup do use Supervisor def start_link(_) do Supervisor.start_link(__MODULE__, [], name: __MODULE__) end def init([]) do ranch = :ranch.child_spec( MessagingServer, :ranch_tcp, %{socket_opts: [port: 8888]}, MessagingServer.C2S, [] ) sm = %{id: MessagingServer.SM, start: {MessagingServer.SM, :start_link, []}} supFlags = %{strategy: :one_for_all, intensity: 0, period: 1} childSpecs = [sm, ranch] {:ok, {supFlags, childSpecs}} end end ``` ```elixir defmodule MessagingServer do use Application def start(_type, _args) do children = [MessagingServer.Sup] Supervisor.start_link(children, strategy: :one_for_one) end end ``` ```elixir MessagingServer.start(:normal, []) ``` ## The Scenario ### Developing a scenario TL;DR: https://hexdocs.pm/amoc/scenario.html Now, say you've read the official guide, now you know we need two special callbacks, `init/0` and `start/1,2`. Let's separate them into different modules, for readability. <!-- livebook:{"break_markdown":true} --> ### The user process We first define a simple struct that will hold the permanent data for the connection, like socket and config state. Then, we define a state machine that knows how to send messages. 
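Before writing the load-test scenario, we can poke the server by hand. This is a minimal smoke-test sketch, assuming the server cells above have been evaluated and the server is listening on port 8888; the `"alice"` and `"bob"` ids are arbitrary.

```elixir
# Connect two clients in passive mode and route one message between them by hand.
{:ok, alice} = :gen_tcp.connect(~c"localhost", 8888, [:binary, active: false])
{:ok, bob} = :gen_tcp.connect(~c"localhost", 8888, [:binary, active: false])

:ok = :gen_tcp.send(alice, :jiffy.encode(%{"connect" => "alice"}))
:ok = :gen_tcp.send(bob, :jiffy.encode(%{"connect" => "bob"}))

# The server acknowledges each connection with %{"connected" => user_id}
{:ok, _ack_alice} = :gen_tcp.recv(alice, 0, 1000)
{:ok, _ack_bob} = :gen_tcp.recv(bob, 0, 1000)

# alice writes to bob; the payload is routed to bob's connection verbatim
:ok = :gen_tcp.send(alice, :jiffy.encode(%{"from" => "alice", "to" => "bob", "msg" => "hi"}))
{:ok, routed} = :gen_tcp.recv(bob, 0, 1000)
IO.inspect(:jiffy.decode(routed, [:return_maps]), label: "bob received")

:gen_tcp.close(alice)
:gen_tcp.close(bob)
```

Reconnecting with a `user_id` that is still registered gets a `%{"rejected" => "duplicate"}` answer and a closed connection, as described above.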
## The Scenario

### Developing a scenario

TL;DR: https://hexdocs.pm/amoc/scenario.html

Now that you've read the official guide, you know we need two special callbacks, `init/0` and `start/1,2`. Let's separate them into different modules, for readability.

<!-- livebook:{"break_markdown":true} -->

### The user process

We first define a simple struct that will hold the permanent data for the connection, like the socket and config state. Then, we define a state machine that knows how to send messages. After a connection is established:

* Every ~1 second it will send a message of 16-24 characters to some randomly selected neighbour
* If it receives a message, it adds a reply to the list of pending messages to send and continues going through its ~1 second loop
* The user caps the maximum number of messages it will send at ~20..30
* When it can't send any more messages and hasn't received any for long enough (~3 seconds), it will die

```elixir
defmodule LoadTest.User.Data do
  defstruct([:id, :neighbours, :socket])
end
```

```elixir
defmodule LoadTest.User do
  alias LoadTest.User.Data, as: Data

  use GenStateMachine, callback_mode: :handle_event_function

  defstruct([:id, :neighbours, :socket])

  def init(_) do
    raise "This should be unreachable"
  end

  def handle_event(:info, {:tcp_error, socket, reason}, _, %Data{socket: socket}) do
    {:stop, reason}
  end

  def handle_event(:info, {:tcp, socket, packet}, _, %Data{socket: socket}) do
    case decode_all(packet) do
      :bad_json_received -> {:stop, :bad_json_received}
      events -> {:keep_state_and_data, events}
    end
  end

  def handle_event(:info, {:EXIT, _pid, reason}, _, _) do
    {:stop, reason}
  end

  ## Connecting a session
  def handle_event(:internal, :do_connect, :connecting, data = %Data{id: my_id}) do
    payload = %{"connect" => my_id}
    transport_send(data, payload)
    {:next_state, :waiting_for_connection_ack, data}
  end

  def handle_event(
        :internal,
        %{"connected" => my_id},
        :waiting_for_connection_ack,
        %Data{id: my_id, neighbours: neighbours} = data
      ) do
    left = num_of_messages()
    pending = build_pendings(my_id, neighbours)
    schedule = [{:next_event, {:timeout, :time_to_send_one}, :time_to_send_one}]
    {:next_state, {:connected, left, pending}, data, schedule}
  end

  ## This is it: it really can't send any more messages, nor has it received any. DIE!
  def handle_event(:timeout, :inactive_long_enough, {:connected, 0, _}, _) do
    :stop
  end

  ## It is still receiving messages: stay a bit longer to read them, then die
  def handle_event(_, _, {:connected, 0, _}, _) do
    schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
    {:keep_state_and_data, schedule}
  end

  ## Nothing has really happened in a while, disconnect the user
  def handle_event(:timeout, :inactive_long_enough, _, _) do
    :stop
  end

  ## Time to send a message
  def handle_event(
        {:timeout, :time_to_send_one},
        :time_to_send_one,
        {:connected, left, pending},
        data
      ) do
    send_now = Enum.random(pending)
    new_pending = List.delete(pending, send_now)
    transport_send(data, send_now)

    case {left, Enum.count(new_pending)} do
      {1, _} ->
        schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
        {:next_state, {:connected, 0, []}, data, schedule}

      {_, 0} ->
        schedule = [{:timeout, time_to_die_if_inactive(), :inactive_long_enough}]
        {:next_state, {:connected, left - 1, new_pending}, data, schedule}

      _ ->
        schedule = [{{:timeout, :time_to_send_one}, time_to_respond(), :time_to_send_one}]
        {:next_state, {:connected, left - 1, new_pending}, data, schedule}
    end
  end

  ## I received a message; note that we match the Data.id with the "to" field
  def handle_event(
        :internal,
        %{"to" => my_id} = payload,
        {:connected, left, pending},
        %Data{id: my_id} = data
      ) do
    new_pending = [build_answer(my_id, payload) | pending]

    case Enum.count(pending) do
      ## Then we know we didn't have anything scheduled, so do it now
      0 ->
        schedule = [{{:timeout, :time_to_send_one}, time_to_respond(), :time_to_send_one}]
        {:next_state, {:connected, left, new_pending}, data, schedule}

      ## Something else is already coming, so this only appends to pending
      _ ->
        {:next_state, {:connected, left, new_pending}, data}
    end
  end

  def handle_event(_, _, _, _) do
    :keep_state_and_data
  end

  def terminate(_, _, %Data{socket: socket}) do
    :gen_tcp.close(socket)
  end

  defp transport_send(%Data{socket: socket}, payload) do
    packet = :jiffy.encode(payload)
    :gen_tcp.send(socket, packet)
  end

  defp decode_all(packet, events \\ []) do
    try do
      case :jiffy.decode(packet, [:return_maps, :return_trailer]) do
        {:has_trailer, payload, trailer} ->
          payload_event = {:next_event, :internal, payload}
          decode_all(trailer, [payload_event | events])

        payload ->
          payload_event = {:next_event, :internal, payload}
          Enum.reverse([payload_event | events])
      end
    catch
      _, _ -> :bad_json_received
    end
  end

  defp build_pendings(my_id, range) do
    Enum.map(range, payload_builder(my_id))
  end

  defp build_answer(my_id, %{"from" => from_id, "msg_id" => msg_id}) do
    %{
      "from" => my_id,
      "to" => from_id,
      "msg" => generate_msg(),
      "msg_id" => UUID.uuid4(),
      "resp_to" => msg_id
    }
  end

  defp payload_builder(my_id) do
    fn i ->
      id = Integer.to_string(i)
      %{"from" => my_id, "to" => id, "msg" => generate_msg(), "msg_id" => UUID.uuid4()}
    end
  end

  ## Approximately 20..30 messages
  defp num_of_messages() do
    :rand.uniform(10) + 20
  end

  ## Approximately ~1s
  defp time_to_respond() do
    :rand.uniform(200) + 900
  end

  ## Approximately ~3s
  defp time_to_die_if_inactive() do
    :rand.uniform(200) + 2900
  end

  ## Generate a message between 16 and 24 characters long
  defp generate_msg() do
    (10 + :rand.uniform(6))
    |> :crypto.strong_rand_bytes()
    |> Base.encode64(case: :lower)
  end
end
```

### Initialisation

We first implement what is going to be the initialisation module.
```elixir
defmodule LoadTest.Init do
  def init do
    {:ok, build_state()}
  end

  defp build_state() do
    host = :amoc_config.get(:host)
    port = :amoc_config.get(:port)
    %{host: host, port: port}
  end
end
```

### User runs

And here, we define a function that each user will execute:

```elixir
defmodule LoadTest.Start do
  def start(my_id, %{host: host, port: port}) do
    {:ok, socket} = :gen_tcp.connect(host, port, [:binary, active: true])

    data = %LoadTest.User.Data{
      id: Integer.to_string(my_id),
      neighbours: build_neighbours(my_id),
      socket: socket
    }

    try do
      schedule = [{:next_event, :internal, :do_connect}]
      :gen_statem.enter_loop(LoadTest.User, [], :connecting, data, schedule)
    catch
      :exit, :normal -> :ok
    end
  end

  defp build_neighbours(my_id) do
    n = :amoc_config.get(:number_of_neighbours)
    a = my_id - n
    b = my_id + n

    a..b
    |> Enum.to_list()
    |> List.delete(my_id)
  end
end
```

### The load testing module

At last, this module implements the `:amoc_scenario` behaviour, and its two main callbacks simply call the two previously defined modules.

```elixir
defmodule LoadTest do
  @behaviour :amoc_scenario

  Module.register_attribute(__MODULE__, :required_variable, persist: true)

  @required_variable [
    %{:name => :host, :default_value => ~c"localhost", :description => ~c"host to connect to"},
    %{:name => :port, :default_value => 8888, :description => ~c"port to connect to"},
    %{
      :name => :number_of_neighbours,
      :default_value => 10,
      :description => ~c"number of neighbours a user will write to"
    }
  ]

  def init do
    LoadTest.Init.init()
  end

  def start(user_id, opts) do
    LoadTest.Start.start(user_id, opts)
  end
end
```

### Running the scenario!

Now we're ready to run this scenario. We just make sure `amoc` has been cleanly (re-)started, as this also ensures the modules we created have been loaded, and we're good to go!

```elixir
Application.stop(:amoc)
Application.ensure_all_started(:amoc)

:amoc.do(LoadTest, 6,
  host: ~c"localhost",
  port: 8888,
  number_of_neighbours: 2
)
```

Don't forget to check the status of the run every once in a while:

```elixir
:amoc_controller.get_status()
```

```elixir
:amoc.stop()
```

## The Instrumentation

### Collect metrics

We have already seen that we can run a scenario; now let's collect some basic metrics thanks to `amoc`'s telemetry instrumentation. We will first collect timestamps of the scenario start, so that timestamps of user starts can be related to it, and then we also define collectors for the users. We store everything in `ets` tables, for simplicity, to analyse later.
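Before wiring up the collectors below, it can help to sanity-check that the events are actually firing. Here is a throwaway sketch that just prints every scenario event to the output; the handler id `"amoc-debug-logger"` is arbitrary, and the event names are the same ones the collectors below attach to.

```elixir
# Print every amoc scenario event as it arrives.
# An anonymous handler is fine for a quick check; the collectors below use function captures.
:telemetry.attach_many(
  "amoc-debug-logger",
  [
    [:amoc, :scenario, :init, :start],
    [:amoc, :scenario, :init, :stop],
    [:amoc, :scenario, :start, :start],
    [:amoc, :scenario, :start, :stop]
  ],
  fn event, measurements, metadata, _config ->
    IO.inspect({event, measurements, metadata}, label: "amoc telemetry")
  end,
  nil
)

# Later, once you have seen enough:
# :telemetry.detach("amoc-debug-logger")
```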
```elixir
defmodule AmocHandlers.ScenarioStats do
  @scenario_stats :amoc_html_reporter_scenario_stats

  def attach_handlers do
    @scenario_stats =
      :ets.new(@scenario_stats, [:named_table, :ordered_set, :public, write_concurrency: true])

    :ok =
      :telemetry.attach_many(
        "amoc_html_reporter-scenario-init",
        [
          [:amoc, :scenario, :init, :start],
          [:amoc, :scenario, :init, :stop],
          [:amoc, :scenario, :init, :exception]
        ],
        &__MODULE__.handle_event/4,
        %{}
      )
  end

  def handle_event(
        [:amoc, :scenario, :init, :start],
        %{monotonic_time: monotonic_time},
        %{scenario: scenario},
        _config
      ) do
    milliseconds = System.convert_time_unit(monotonic_time, :native, :millisecond)

    record = %{
      scenario: :erlang.atom_to_binary(scenario),
      status: "started",
      reason: nil,
      started_at: milliseconds,
      duration: nil
    }

    :ets.insert(@scenario_stats, {scenario, record})
  end

  def handle_event(
        [:amoc, :scenario, :init, :stop],
        %{duration: duration},
        %{scenario: scenario},
        _config
      ) do
    [{_, record}] = :ets.lookup(@scenario_stats, scenario)
    milliseconds = System.convert_time_unit(duration, :native, :millisecond)
    new_record = %{record | status: "finished", reason: :normal, duration: milliseconds}
    :ets.insert(@scenario_stats, {scenario, new_record})
  end

  def handle_event(
        [:amoc, :scenario, :init, :exception],
        %{duration: duration},
        %{scenario: scenario, kind: class, reason: reason},
        _config
      ) do
    [{_, record}] = :ets.lookup(@scenario_stats, scenario)
    milliseconds = System.convert_time_unit(duration, :native, :millisecond)
    new_record = %{record | status: "exited", reason: {class, reason}, duration: milliseconds}
    :ets.insert(@scenario_stats, {scenario, new_record})
  end
end
```

```elixir
defmodule AmocHandlers.UserStats do
  @scenario_stats :amoc_html_reporter_scenario_stats
  @users_plot :amoc_html_reporter_users_plot

  def attach_handlers do
    @users_plot =
      :ets.new(@users_plot, [
        :named_table,
        :ordered_set,
        :public,
        read_concurrency: true,
        write_concurrency: true
      ])

    :ok =
      :telemetry.attach_many(
        "amoc_html_reporter-scenario-user",
        [
          [:amoc, :scenario, :start, :start],
          [:amoc, :scenario, :start, :stop],
          [:amoc, :scenario, :start, :exception]
        ],
        &AmocHandlers.UserStats.handle_event/4,
        %{}
      )
  end

  def handle_event(
        [:amoc, :scenario, :start, :start],
        %{monotonic_time: monotonic_time},
        %{scenario: scenario, user_id: user_id},
        _config
      ) do
    milliseconds = System.convert_time_unit(monotonic_time, :native, :millisecond)

    record = %{
      id: user_id,
      scenario: :erlang.atom_to_binary(scenario),
      status: "started",
      reason: nil,
      started_at: milliseconds,
      duration: nil
    }

    :ets.insert(@users_plot, {user_id, record})
  end

  def handle_event(
        [:amoc, :scenario, :start, :stop],
        %{duration: duration},
        %{scenario: scenario, user_id: user_id},
        _config
      ) do
    status = "finished"
    event_reason = :normal
    handle_end(user_id, scenario, duration, status, event_reason)
  end

  def handle_event(
        [:amoc, :scenario, :start, :exception],
        %{duration: duration},
        %{scenario: scenario, user_id: user_id, kind: class, reason: reason, stacktrace: s},
        _config
      ) do
    status = "exited"
    event_reason = {class, reason, s}
    handle_end(user_id, scenario, duration, status, event_reason)
  end

  defp handle_end(user_id, scenario, duration, status, reason) do
    duration_ms = System.convert_time_unit(duration, :native, :millisecond)

    [{_, scenario_record}] = :ets.lookup(@scenario_stats, scenario)
    scenario_started_at = scenario_record.started_at

    [{_, user_record}] = :ets.lookup(@users_plot, user_id)
    started_at = user_record.started_at

    new_record = %{
      user_record
      | status: status,
        reason: reason,
        started_at: scenario_started_at - started_at,
        duration: duration_ms
    }

    :ets.insert(@users_plot, {user_id, new_record})
  end
end
```

```elixir
defmodule AmocHandlers do
  use GenServer

  # Initialization

  def start_link() do
    case :erlang.whereis(__MODULE__) do
      :undefined -> :ok
      _pid -> GenServer.stop(__MODULE__)
    end

    GenServer.start_link(__MODULE__, :no_opts, name: __MODULE__)
  end

  def init(:no_opts) do
    attach_handlers()
    {:ok, :no_state}
  end

  # Callbacks

  def handle_call(_msg, _from, state) do
    {:reply, :ok, state}
  end

  def handle_cast(_msg, state) do
    {:noreply, state}
  end

  def handle_info(_msg, state) do
    {:noreply, state}
  end

  defp attach_handlers() do
    AmocHandlers.UserStats.attach_handlers()
    AmocHandlers.ScenarioStats.attach_handlers()
  end
end
```

## The Results

Don't forget to restart `amoc`:

```elixir
Application.stop(:amoc)
Application.stop(:telemetry)

{:ok, apps} = Application.ensure_all_started([:telemetry, :amoc])
{:ok, server_handler} = AmocHandlers.start_link()
{apps, server_handler}
```

```elixir
:amoc.do(LoadTest, 5000,
  host: ~c"localhost",
  port: 8888,
  number_of_neighbours: 10
)
```

And we can check the scenario status from time to time:

```elixir
:amoc_controller.get_status()
```

And then finally stop the scenario:

```elixir
# :amoc.stop()
```

```elixir
mapper = fn {_id, value} -> Map.delete(value, :reason) end

tab =
  :ets.tab2list(:amoc_html_reporter_users_plot)
  |> Enum.map(mapper)
  |> Explorer.DataFrame.new()
```

<!-- livebook:{"attrs":{"assign_to":"done_so_far","collect":true,"data_frame":"tab","data_frame_alias":"Elixir.Explorer.DataFrame","is_data_frame":true,"missing_require":"Elixir.Explorer.DataFrame","operations":[{"active":true,"column":"duration","data_options":{"duration":"integer","id":"integer","scenario":"string","started_at":"integer","status":"string"},"datalist":[],"filter":"greater equal","message":null,"operation_type":"filters","type":"integer","value":"0"}]},"chunks":null,"kind":"Elixir.KinoExplorer.DataTransformCell","livebook_object":"smart_cell"} -->

```elixir
require Explorer.DataFrame

done_so_far =
  tab
  |> Explorer.DataFrame.lazy()
  |> Explorer.DataFrame.filter(duration >= 0)
  |> Explorer.DataFrame.collect()
```

```elixir
require Explorer.DataFrame

tab
|> Explorer.DataFrame.lazy()
|> Explorer.DataFrame.group_by("status")
|> Explorer.DataFrame.summarise(status_count: count(status))
```

<!-- livebook:{"attrs":{"chart_title":null,"height":null,"layers":[{"active":true,"chart_type":"line","color_field":null,"color_field_aggregate":null,"color_field_bin":null,"color_field_scale_scheme":null,"color_field_type":null,"data_variable":"done_so_far","geodata_color":"blue","latitude_field":null,"longitude_field":null,"x_field":"started_at","x_field_aggregate":null,"x_field_bin":null,"x_field_scale_type":"linear","x_field_type":"quantitative","y_field":"duration","y_field_aggregate":null,"y_field_bin":null,"y_field_scale_type":"linear","y_field_type":"quantitative"}],"vl_alias":"Elixir.VegaLite","width":800},"chunks":null,"kind":"Elixir.KinoVegaLite.ChartCell","livebook_object":"smart_cell"} -->

```elixir
VegaLite.new(width: 800)
|> VegaLite.data_from_values(done_so_far, only: ["started_at", "duration"])
|> VegaLite.mark(:line)
|> VegaLite.encode_field(:x, "started_at", type: :quantitative, scale: [type: :linear])
|> VegaLite.encode_field(:y, "duration", type: :quantitative, scale: [type: :linear])
```
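Alongside the chart, a quick aggregate helps put numbers on the picture. A small sketch, assuming the `done_so_far` data frame built above; durations are in milliseconds.

```elixir
# Summarise how long users took to run the scenario, in milliseconds.
durations = Explorer.DataFrame.pull(done_so_far, "duration")

%{
  users: Explorer.Series.size(durations),
  mean_ms: Explorer.Series.mean(durations),
  max_ms: Explorer.Series.max(durations)
}
```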
### Interpretation

We can see from this simple result that, on average, users took around 25-30 seconds to carry out their tasks, with no long spikes above that, meaning the users didn't have to wait long for the server to accept their messages. Note, however, that we don't know whether the messages were delivered, nor how long delivery took, because the server hides this information from us.

But we have written a server and a scenario, and we have run the scenario, purely in Elixir and Livebook. May this guide have served you well!