# Postgres Replication to Logflare
```elixir
Mix.install([
{:ecto, ">= 3.10.2"},
{:postgrex, ">= 0.17.1"},
{:pgoutput_decoder, "~> 0.1.0"},
{:jason, "~> 1.4"},
{:logflare_api_client, "~> 0.3.5"}
])
```
## Connect to our database
```elixir
# Need this for non-local
# :ssl.start()
{:ok, pid} =
Postgrex.start_link(
hostname: "localhost",
port: 54322,
database: "postgres",
password: "postgres",
username: "postgres"
# ssl: true
)
```
```elixir
{:ok, result} = Postgrex.query(pid, "select 1 as one", [])
result
```
## Create Publication
```elixir
Postgrex.query(pid, "ALTER PUBLICATION supabase_realtime ADD TABLE diary_entries;", [])
```
## Setup Logflare Client
```elixir
defmodule Repl.LogflareClient do
def config() do
%{api_key: "blah", url: "https://api.logflare.app"}
end
def new(config) do
LogflareApiClient.new(config)
end
def post(event) do
source_id = "f8b9a438-3650-4420-9ac0-19dbf908acaa"
metadata = Jason.encode!(event)
timestamp = DateTime.now!("Etc/UTC") |> DateTime.to_unix(:millisecond)
event = %{"timestamp" => timestamp, "message" => "New record", "metadata" => metadata}
config()
|> LogflareApiClient.new()
|> LogflareApiClient.post_logs([event], source_id)
end
end
```
## Listen to the WAL
```elixir
defmodule Repl.ReplConn do
use Postgrex.ReplicationConnection
alias Repl.WalHandler
def start_link(opts) do
# Automatically reconnect if we lose connection.
extra_opts = [
auto_reconnect: true
]
Postgrex.ReplicationConnection.start_link(__MODULE__, :ok, extra_opts ++ opts)
end
@impl true
def init(:ok) do
{:ok, %{step: :disconnected}}
end
@impl true
def handle_connect(state) do
query = "CREATE_REPLICATION_SLOT postgrex TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
{:query, query, %{state | step: :create_slot}}
end
@impl true
def handle_result(results, %{step: :create_slot} = state) when is_list(results) do
query =
"START_REPLICATION SLOT postgrex LOGICAL 0/0 (proto_version '1', publication_names 'supabase_realtime')"
{:stream, query, [], %{state | step: :streaming}}
end
@impl true
# https://www.postgresql.org/docs/14/protocol-replication.html
def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, rest::binary>>, state) do
record = PgoutputDecoder.decode_message(rest) |> IO.inspect()
WalHandler.process_message(record)
{:noreply, state}
end
def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
messages =
case reply do
1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
0 -> []
end
{:noreply, messages, state}
end
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time(), do: System.os_time(:microsecond) - @epoch
end
```
## Handle WAL records
```elixir
defmodule Repl.WalHandler do
@moduledoc """
Publishes messages from Replication to PubSub
"""
use GenServer
alias PgoutputDecoder.Messages
alias Repl.LogflareClient
defstruct [:relations]
@spec start_link(any) :: :ignore | {:error, any} | {:ok, pid}
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
@spec process_message(any) :: :ok
def process_message(message) do
GenServer.cast(__MODULE__, {:message, message})
end
@impl true
def init(_args) do
{:ok, %__MODULE__{}}
end
def get_relations() do
GenServer.call(__MODULE__, :get_relations)
end
@impl true
def handle_call(:get_relations, _from, %{relations: nil} = state) do
{:reply, {:error, :no_relations_yet}, state}
end
def handle_call(:get_relations, _from, state) do
{:reply, {:ok, state.relations}, state}
end
@impl true
def handle_cast({:message, %Messages.Relation{} = message}, state) do
relations = [message | state.relations]
{:noreply, %{state | relations: relations}}
end
@impl true
def handle_cast(
{:message, %Messages.Delete{relation_id: rel_id, old_tuple_data: nil} = message},
state
) do
relation = Enum.find(state.relations, &(rel_id == &1.id))
if relation do
record =
for {column, index} <- Enum.with_index(relation.columns),
do: {String.to_atom(column.name), elem(message.changed_key_tuple_data, index)},
into: %{}
LogflareClient.post(record)
{:noreply, state}
else
{:noreply, state}
end
end
@impl true
def handle_cast({:message, %Messages.Delete{relation_id: rel_id} = message}, state) do
relation = Enum.find(state.relations, &(rel_id == &1.id))
if relation do
record =
for {column, index} <- Enum.with_index(relation.columns),
do: {String.to_atom(column.name), elem(message.old_tuple_data, index)},
into: %{}
LogflareClient.post(record)
{:noreply, state}
else
{:noreply, state}
end
end
@impl true
def handle_cast({:message, %{relation_id: rel_id} = message}, state) do
relation = Enum.find(state.relations, &(rel_id == &1.id))
if relation do
record =
for {column, index} <- Enum.with_index(relation.columns),
do: {String.to_atom(column.name), elem(message.tuple_data, index)},
into: %{}
IO.inspect(record, label: "-------RECORD-----")
LogflareClient.post(record)
{:noreply, state}
else
{:noreply, state}
end
end
@impl true
def handle_cast({:message, _message}, state) do
:noop
{:noreply, state}
end
end
```
## Start it!!!
```elixir
# ReplHandler
{:ok, _pid} = Repl.WalHandler.start_link([])
# Replication Connection
{:ok, repl_pid} =
Repl.ReplConn.start_link(
host: "localhost",
port: 54322,
database: "postgres",
username: "postgres",
password: "postgres"
)
```
## Insert some stuff
```elixir
query = """
INSERT INTO diary_entries (body, created_at, updated_at, user_id, tag)
VALUES ($1, NOW(), NOW(), $2, $3);
"""
Postgrex.query(pid, query, ["This is a journal entry.", 1, "work"])
```