Run this notebook

Use Livebook to open this notebook and explore new ideas.

It is easy to get started, on your machine or the cloud.

Click below to open and run it in your Livebook at .

(or change your Livebook location)

# Postgres Replication to Logflare ```elixir Mix.install([ {:ecto, ">= 3.10.2"}, {:postgrex, ">= 0.17.1"}, {:pgoutput_decoder, "~> 0.1.0"}, {:jason, "~> 1.4"}, {:logflare_api_client, "~> 0.3.5"} ]) ``` ## Connect to our database ```elixir # Need this for non-local # :ssl.start() {:ok, pid} = Postgrex.start_link( hostname: "localhost", port: 54322, database: "postgres", password: "postgres", username: "postgres" # ssl: true ) ``` ```elixir {:ok, result} = Postgrex.query(pid, "select 1 as one", []) result ``` ## Create Publication ```elixir Postgrex.query(pid, "ALTER PUBLICATION supabase_realtime ADD TABLE diary_entries;", []) ``` ## Setup Logflare Client ```elixir defmodule Repl.LogflareClient do def config() do %{api_key: "blah", url: "https://api.logflare.app"} end def new(config) do LogflareApiClient.new(config) end def post(event) do source_id = "f8b9a438-3650-4420-9ac0-19dbf908acaa" metadata = Jason.encode!(event) timestamp = DateTime.now!("Etc/UTC") |> DateTime.to_unix(:millisecond) event = %{"timestamp" => timestamp, "message" => "New record", "metadata" => metadata} config() |> LogflareApiClient.new() |> LogflareApiClient.post_logs([event], source_id) end end ``` ## Listen to the WAL ```elixir defmodule Repl.ReplConn do use Postgrex.ReplicationConnection alias Repl.WalHandler def start_link(opts) do # Automatically reconnect if we lose connection. extra_opts = [ auto_reconnect: true ] Postgrex.ReplicationConnection.start_link(__MODULE__, :ok, extra_opts ++ opts) end @impl true def init(:ok) do {:ok, %{step: :disconnected}} end @impl true def handle_connect(state) do query = "CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT" {:query, query, %{state | step: :create_slot}} end @impl true def handle_result(results, %{step: :create_slot} = state) when is_list(results) do query = "START_REPLICATION SLOT postgrex LOGICAL 0/0 (proto_version '1', publication_names 'supabase_realtime')" {:stream, query, [], %{state | step: :streaming}} end @impl true # https://www.postgresql.org/docs/14/protocol-replication.html def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do record = PgoutputDecoder.decode_message(rest) |> IO.inspect() WalHandler.process_message(record) {:noreply, state} end def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do messages = case reply do 1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>] 0 -> [] end {:noreply, messages, state} end @epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond) defp current_time(), do: System.os_time(:microsecond) - @epoch end ``` ## Handle WAL records ```elixir defmodule Repl.WalHandler do @moduledoc """ Publishes messages from Replication to PubSub """ use GenServer alias PgoutputDecoder.Messages alias Repl.LogflareClient defstruct [:relations] @spec start_link(any) :: :ignore | {:error, any} | {:ok, pid} def start_link(args) do GenServer.start_link(__MODULE__, args, name: __MODULE__) end @spec process_message(any) :: :ok def process_message(message) do GenServer.cast(__MODULE__, {:message, message}) end @impl true def init(_args) do {:ok, %__MODULE__{}} end def get_relations() do GenServer.call(__MODULE__, :get_relations) end @impl true def handle_call(:get_relations, _from, %{relations: nil} = state) do {:reply, {:error, :no_relations_yet}, state} end def handle_call(:get_relations, _from, state) do {:reply, {:ok, state.relations}, state} end @impl true def handle_cast({:message, %Messages.Relation{} = message}, state) do relations = [message | state.relations] {:noreply, %{state | relations: relations}} end @impl true def handle_cast( {:message, %Messages.Delete{relation_id: rel_id, old_tuple_data: nil} = message}, state ) do relation = Enum.find(state.relations, &(rel_id == &1.id)) if relation do record = for {column, index} <- Enum.with_index(relation.columns), do: {String.to_atom(column.name), elem(message.changed_key_tuple_data, index)}, into: %{} LogflareClient.post(record) {:noreply, state} else {:noreply, state} end end @impl true def handle_cast({:message, %Messages.Delete{relation_id: rel_id} = message}, state) do relation = Enum.find(state.relations, &(rel_id == &1.id)) if relation do record = for {column, index} <- Enum.with_index(relation.columns), do: {String.to_atom(column.name), elem(message.old_tuple_data, index)}, into: %{} LogflareClient.post(record) {:noreply, state} else {:noreply, state} end end @impl true def handle_cast({:message, %{relation_id: rel_id} = message}, state) do relation = Enum.find(state.relations, &(rel_id == &1.id)) if relation do record = for {column, index} <- Enum.with_index(relation.columns), do: {String.to_atom(column.name), elem(message.tuple_data, index)}, into: %{} IO.inspect(record, label: "-------RECORD-----") LogflareClient.post(record) {:noreply, state} else {:noreply, state} end end @impl true def handle_cast({:message, _message}, state) do :noop {:noreply, state} end end ``` ## Start it!!! ```elixir # ReplHandler {:ok, _pid} = Repl.WalHandler.start_link([]) # Replication Connection {:ok, repl_pid} = Repl.ReplConn.start_link( host: "localhost", port: 54322, database: "postgres", username: "postgres", password: "postgres" ) ``` ## Insert some stuff ```elixir query = """ INSERT INTO diary_entries (body, created_at, updated_at, user_id, tag) VALUES ($1, NOW(), NOW(), $2, $3); """ Postgrex.query(pid, query, ["This is a journal entry.", 1, "work"]) ```
See source

Have you already installed Livebook?

If you already installed Livebook, you can configure the default Livebook location where you want to open notebooks.
Livebook up Checking status We can't reach this Livebook (but we saved your preference anyway)
Run notebook

Not yet? Install Livebook in just a minute

Livebook is open source, free, and ready to run anywhere.

Run on your machine

with Livebook Desktop

Run in the cloud

on select platforms

To run on Linux, Docker, embedded devices, or Elixir’s Mix, check our README.

PLATINUM SPONSORS
SPONSORS
Code navigation with go to definition of modules and functions Read More ×