# MQTT

```elixir
Mix.install([
  {:vega_lite, "~> 0.1.5"},
  {:kino_vega_lite, "~> 0.1.7"},
  {:kino, "~> 0.8.0"},
  {:tortoise, "~> 0.9"},
  {:jason, "~> 1.2"}
])
```

## Introduction

This notebook implements a simple MQTT producer/consumer pair using the [Tortoise](https://github.com/gausby/tortoise) MQTT client library. The producer publishes samples of a function as JSON once per second; the consumer subscribes to the same topic and plots each sample as it arrives.

## Config

```elixir
kino_broker = Kino.Input.text("MQTT Broker:", default: "broker.hivemq.com")
```

```elixir
# The timestamp suffix keeps the default topic unique on the public broker.
kino_topic =
  Kino.Input.text("MQTT Topic:",
    default: "some/test/topic/#{:os.system_time(:millisecond)}"
  )
```

```elixir
{broker, topic} = {
  Kino.Input.read(kino_broker),
  Kino.Input.read(kino_topic)
}
```

## Generator

Let's define the function that the producer will sample:

```elixir
f = fn x -> :math.sin(x * 2) * :math.cos(x) end
```

## Producer

Server code for the producer:

```elixir
defmodule Producer do
  use GenServer

  # Interval between two published samples, in milliseconds.
  @sleeptime 1000

  # Note: this calls GenServer.start/3, so the process is not linked to the caller.
  def start_link(broker, topic, f) do
    GenServer.start(__MODULE__, {broker, topic, f}, name: __MODULE__)
  end

  # callbacks

  @impl true
  def init({broker, topic, f}) do
    t = :os.system_time(:millisecond)
    client_id = "mqtt_producer_#{t}"
    start_client(broker, client_id)
    register_timeout()
    state = [t0: t, f: f, client: client_id, topic: topic]
    {:ok, state}
  end

  @impl true
  def handle_info(:emit, [t0: t0, f: f, client: client_id, topic: topic] = state) do
    t = :os.system_time(:millisecond)
    # Elapsed milliseconds scaled down, so x advances by about 0.1 per tick.
    tdiff = (t - t0) / 10_000
    # Publish the sample as a JSON object with keys "t" and "v".
    Tortoise.publish(client_id, topic, "{\"t\": #{tdiff}, \"v\": #{f.(tdiff)}}", qos: 0)
    register_timeout()
    {:noreply, state}
  end

  # helpers

  defp start_client(broker, client_id) do
    Tortoise.Supervisor.start_child(
      client_id: client_id,
      handler: {Tortoise.Handler.Default, []},
      server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
      subscriptions: []
    )
  end

  # Schedule the next :emit message to this process.
  defp register_timeout() do
    Process.send_after(self(), :emit, @sleeptime)
  end
end
```

Start the producer:

```elixir
{:ok, producer_pid} = Producer.start_link(broker, topic, f)
```

## Visualization

```elixir
alias VegaLite, as: Vl

interval = 100

kino =
  Vl.new(width: 400, height: 400)
  |> Vl.mark(:line)
  |> Vl.encode_field(:x, "x", type: :quantitative)
  |> Vl.encode_field(:y, "y", type: :quantitative)
  |> Kino.VegaLite.new()
  |> Kino.render()

nil
```

The chart stays empty until the consumer below has been started.

## Consumer

```elixir
defmodule Consumer do
  use Tortoise.Handler

  def start_link(args) do
    GenServer.start(__MODULE__, args)
  end

  # callback functions

  @impl true
  def init([kino]) do
    {:ok, %{kino: kino}}
  end

  @impl true
  def handle_message(_topic, payload, %{kino: kino} = state) do
    case payload |> Jason.decode() do
      {:ok, %{"t" => x, "v" => y}} ->
        # Keep only the 100 most recent points in the chart.
        Kino.VegaLite.push(kino, %{x: x, y: y}, window: 100)

      _ ->
        IO.puts("Rejecting format of payload #{payload}")
    end

    {:ok, state}
  end

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end
end
```

```elixir
client_id = "mqtt_consumer#{:os.system_time(:millisecond)}"

{:ok, pid} =
  Tortoise.Connection.start_link(
    client_id: client_id,
    server: {Tortoise.Transport.Tcp, host: broker, port: 1883},
    handler: {Consumer, [kino]},
    subscriptions: [{topic, 0}]
  )
```
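
To get a feel for the values the producer publishes, the sampling done in `handle_info/2` can be reproduced offline, without a broker. The following is a minimal sketch: it assumes the timer fires exactly every 1000 ms (real timers drift slightly), and the names `sleeptime_ms` and `samples` are introduced here for illustration only.

```elixir
# Same generator as in the notebook.
f = fn x -> :math.sin(x * 2) * :math.cos(x) end

# Assumption: ticks arrive exactly every 1000 ms, matching @sleeptime in Producer.
sleeptime_ms = 1000

# Reproduce the producer's scaling: elapsed milliseconds divided by 10_000.
samples =
  for tick <- 1..5 do
    tdiff = tick * sleeptime_ms / 10_000
    %{t: tdiff, v: f.(tdiff)}
  end

# Print each sample roughly as it would appear on the chart's x and y axes.
Enum.each(samples, fn %{t: t, v: v} ->
  IO.puts("t=#{t} v=#{Float.round(v, 4)}")
end)
```

Under this assumption `t` advances by 0.1 per published message, so with `window: 100` the chart shows roughly the last 10 units of `t`.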