Run this notebook

Use Livebook to open this notebook and explore new ideas.

It is easy to get started, on your machine or the cloud.

Click below to open and run it in your Livebook at .

(or change your Livebook location)

<!-- livebook:{"app_settings":{"auto_shutdown_ms":3600000,"show_source":true,"slug":"es-cqrs-anatomy"},"file_entries":[{"name":"eventstorming.png","type":"attachment"}]} --> # ES/CQRS's Anatomy ```elixir Mix.install([ {:kino_db, "~> 0.2.13"}, {:postgrex, "~> 0.19.1"} ]) ``` ## Introduction docker compose: ![image](https://raw.githubusercontent.com/frnmjn/es-cqrs-anatomy/main/img/containers.png) <!-- livebook:{"break_markdown":true} --> ### What's Event Sourcing? Event sourcing is like a **historical timeline for your data**. Instead of storing a snapshot of the **current state**, event sourcing keeps a record of **every change** that has happened to your data. This allows you to reconstruct the data's past state and track its evolution over time. Think of it as a medical record for your data. Just as a doctor can review a patient's medical history to diagnose an illness, you can use event sourcing to **understand how your data has changed** and identify any anomalies. This approach is especially useful for applications that need strong **audit trails**, complex state tracking, or flexible scalability. However, it also comes with **higher storage requirements** and computational complexity. <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid graph LR; A(Order Created) -->B(Item 1 Added) -->C(Item 2 Added) --> D(Order Paid) --> E(Order Shipped); ``` <!-- livebook:{"break_markdown":true} --> ### What's CQRS? CQRS (**Command Query Responsibility Segregation**) is like having **two separate sets of instructions** for your data: one for changing it (commands) and one for reading it (queries). This separation allows for specializing each set of instructions, making data manipulation and retrieval more efficient, especially for applications with high write or read loads. <!-- livebook:{"break_markdown":true} --> ### Let's put ALL TOGETHER <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid graph LR; C(Command) -->|Invoked on| AGG([Aggregate]) -->|Generates|E(Event) -->|Stored to| ES[(Event Store)]; PR(Projector) -->|Read from| ES; Q(Query) -->|Query to| PN([Projection]) -->|Retrive from| RS[(Read Store)]; PR -->|Write to| RS ``` <!-- livebook:{"break_markdown":true} --> A Command is routed to an Aggregate, that emit an Event. Then the event is stored into the Event Store. The Projector **subscribe** some events and build one or more **table** into the Read Store. The Projection **retrieve data** from Read Store based on a Query <!-- livebook:{"break_markdown":true} --> ### Ok, but why all this complex stuff? <!-- livebook:{"break_markdown":true} --> Elixir for its actor model and the pattern match that fix very We'll this job Event Sourcing for its auditing built in, time traveling and new projections with an historical deep CQRS to scale the read and write using the best database for out need <!-- livebook:{"break_markdown":true} --> ### What's Event Storming Event Storming is a **collaborative workshop technique** for understanding and designing **complex business processes or software systems**. It involves a visual approach where participants **use sticky notes to represent events, commands, aggregates,** and other key elements on a large workspace. The goal is to foster communication, **shared understanding**, and **uncover insights** into the dynamics of the system or process being modeled. Event Storming is commonly used in agile development, domain-driven design, and other contexts where a shared understanding among team members is crucial. **The** picture that explains everything ![image](https://raw.githubusercontent.com/frnmjn/es-cqrs-anatomy/main/img/eventstorming.png) ## Yet Another ERP We put all the Domain Experts, Developers, Product Owners ecc in the same room and start doing a big picture event storming. <!-- livebook:{"break_markdown":true} --> ![images](https://raw.githubusercontent.com/frnmjn/es-cqrs-anatomy/main/img/es-events.png) <!-- livebook:{"break_markdown":true} --> Then at the end we have a **clear idea** of what the system should do (or at least we think so). <!-- livebook:{"break_markdown":true} --> ![images](https://raw.githubusercontent.com/frnmjn/es-cqrs-anatomy/main/img/es-complete.png) ## Let's play with Orders Let's create an Order invoking the relative command and take the aggregate id from the command response, for the next steps. <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiJhZ2dyZWdhdGVfdXVpZCIsImNvZGUiOiJhbGlhcyBFc0NxcnNBbmF0b215Lk9yZGVyLkNvbW1hbmRzLkNyZWF0ZU9yZGVyXG5hbGlhcyBFc0NxcnNBbmF0b215Lk9yZGVyLkNvbW1hbmRzLk9yZGVySXRlbVxuXG4lQ3JlYXRlT3JkZXJ7XG4gIGlkOiBGYWtlci5VVUlELnY0KCksIFxuICBvcmRlcl9udW1iZXI6IEZha2VyLlN0cmluZy5iYXNlNjQoNSksIFxuICBidXNpbmVzc19wYXJ0bmVyOiBGYWtlci5JbnRlcm5ldC5lbWFpbCgpLFxuICBpdGVtczogW1xuICAgICVPcmRlckl0ZW17XG4gICAgICBwcm9kdWN0X2lkOiBGYWtlci5VVUlELnY0KCksXG4gICAgICBxdWFudGl0eTogMSxcbiAgICAgIHVvbTogXCJLR1wiXG4gICAgfVxuICBdXG59IFxufD4gRXNDcXJzQW5hdG9teS5BcHAuZGlzcGF0Y2goaW5jbHVkZV9leGVjdXRpb25fcmVzdWx0OiB0cnVlKVxufD4gZWxlbSgxKVxufD4gTWFwLmdldCg6YWdncmVnYXRlX3V1aWQpIiwiY29va2llIjoic2VjcmV0Y29va2llIiwiY29va2llX3NlY3JldCI6IkNPT0tJRSIsIm5vZGUiOiJlc2NxcnNhbmF0b215QGhvc3QxLmNvbSIsIm5vZGVfc2VjcmV0IjoiIiwidXNlX2Nvb2tpZV9zZWNyZXQiOmZhbHNlLCJ1c2Vfbm9kZV9zZWNyZXQiOmZhbHNlfQ","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) aggregate_uuid = Kino.RPC.eval_string( node, ~S""" alias EsCqrsAnatomy.Order.Commands.CreateOrder alias EsCqrsAnatomy.Order.Commands.OrderItem %CreateOrder{ id: Faker.UUID.v4(), order_number: Faker.String.base64(5), business_partner: Faker.Internet.email(), items: [ %OrderItem{ product_id: Faker.UUID.v4(), quantity: 1, uom: "KG" } ] } |> EsCqrsAnatomy.App.dispatch(include_execution_result: true) |> elem(1) |> Map.get(:aggregate_uuid) """, file: __ENV__.file ) ``` Create a connection to the Event Store <!-- livebook:{"attrs":"eyJkYXRhYmFzZSI6ImV2ZW50X3N0b3JlIiwiaG9zdG5hbWUiOiJwb3N0Z3JlcyIsInBhc3N3b3JkIjoicG9zdGdyZXMiLCJwb3J0Ijo1NDMyLCJ0eXBlIjoicG9zdGdyZXMiLCJ1c2VfaXB2NiI6ZmFsc2UsInVzZV9zc2wiOmZhbHNlLCJ1c2VybmFtZSI6InBvc3RncmVzIiwidmFyaWFibGUiOiJjb25uIn0","chunks":null,"kind":"Elixir.KinoDB.ConnectionCell","livebook_object":"smart_cell"} --> ```elixir opts = [ hostname: "postgres", port: 5432, username: "postgres", password: "postgres", database: "event_store" ] {:ok, conn} = Kino.start_child({Postgrex, opts}) ``` Fetch all the event of the Aggregate. Here we **cheat a bit** using the event payload to identify all the events related to our Aggregate. We see the corect way to doi it soon <!-- livebook:{"attrs":"eyJjYWNoZV9xdWVyeSI6dHJ1ZSwiY29ubmVjdGlvbiI6eyJ0eXBlIjoicG9zdGdyZXMiLCJ2YXJpYWJsZSI6ImNvbm4ifSwiZGF0YV9mcmFtZV9hbGlhcyI6IkVsaXhpci5FeHBsb3Jlci5EYXRhRnJhbWUiLCJxdWVyeSI6IlNFTEVDVCBcbiAgZXZlbnRfaWQ6OnRleHQsXG4gIGV2ZW50X3R5cGUsXG4gIGRhdGFcbkZST00gZXZlbnRzIFxuV0hFUkUgZW5jb2RlKGRhdGEsICdlc2NhcGUnKTo6anNvbiAtPj4gJ2lkJyA9IHt7YWdncmVnYXRlX3V1aWR9fSIsInJlc3VsdF92YXJpYWJsZSI6InJlc3VsdF9ldmVudHMiLCJ0aW1lb3V0IjpudWxsfQ","chunks":null,"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result_events = Postgrex.query!( conn, ~S""" SELECT event_id::text, event_type, data FROM events WHERE encode(data, 'escape')::json ->> 'id' = $1 """, [aggregate_uuid] ) ``` Every Aggregate as it's own **Stream** <!-- livebook:{"attrs":"eyJjYWNoZV9xdWVyeSI6dHJ1ZSwiY29ubmVjdGlvbiI6eyJ0eXBlIjoicG9zdGdyZXMiLCJ2YXJpYWJsZSI6ImNvbm4ifSwiZGF0YV9mcmFtZV9hbGlhcyI6IkVsaXhpci5FeHBsb3Jlci5EYXRhRnJhbWUiLCJxdWVyeSI6IlNFTEVDVCBcbiAgc3RyZWFtX2lkLFxuICBzdHJlYW1fdXVpZCxcbiAgc3RyZWFtX3ZlcnNpb24gXG5GUk9NIHN0cmVhbXMgXG5XSEVSRSBzdHJlYW1fdXVpZCA9IHt7YWdncmVnYXRlX3V1aWR9fSIsInJlc3VsdF92YXJpYWJsZSI6InJlc3VsdCIsInRpbWVvdXQiOm51bGx9","chunks":null,"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result = Postgrex.query!( conn, ~S""" SELECT stream_id, stream_uuid, stream_version FROM streams WHERE stream_uuid = $1 """, [aggregate_uuid] ) ``` Let's invoke another command on the Aggregate <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiIiLCJjb2RlIjoiYWxpYXMgRXNDcXJzQW5hdG9teS5PcmRlci5Db21tYW5kcy5Db21wbGV0ZU9yZGVyXG5cbiVDb21wbGV0ZU9yZGVye1xuICBpZDogYWdncmVnYXRlX3V1aWRcbn0gXG58PiBFc0NxcnNBbmF0b215LkFwcC5kaXNwYXRjaCgpIiwiY29va2llIjoic2VjcmV0Y29va2llIiwiY29va2llX3NlY3JldCI6IkNPT0tJRSIsIm5vZGUiOiJlc2NxcnNhbmF0b215QGhvc3QxLmNvbSIsIm5vZGVfc2VjcmV0IjoiIiwidXNlX2Nvb2tpZV9zZWNyZXQiOmZhbHNlLCJ1c2Vfbm9kZV9zZWNyZXQiOmZhbHNlfQ","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) Kino.RPC.eval_string( node, ~S""" alias EsCqrsAnatomy.Order.Commands.CompleteOrder %CompleteOrder{ id: aggregate_uuid } |> EsCqrsAnatomy.App.dispatch() """, file: __ENV__.file ) ``` Check the Stream now <!-- livebook:{"attrs":"eyJjYWNoZV9xdWVyeSI6dHJ1ZSwiY29ubmVjdGlvbiI6eyJ0eXBlIjoicG9zdGdyZXMiLCJ2YXJpYWJsZSI6ImNvbm4ifSwiZGF0YV9mcmFtZV9hbGlhcyI6IkVsaXhpci5FeHBsb3Jlci5EYXRhRnJhbWUiLCJxdWVyeSI6IlNFTEVDVCBcbiAgc3RyZWFtX2lkLFxuICBzdHJlYW1fdXVpZCxcbiAgc3RyZWFtX3ZlcnNpb24gXG5GUk9NIHN0cmVhbXMgXG5XSEVSRSBzdHJlYW1fdXVpZCA9IHt7YWdncmVnYXRlX3V1aWR9fSIsInJlc3VsdF92YXJpYWJsZSI6InN0cmVhbSIsInRpbWVvdXQiOm51bGx9","chunks":null,"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir stream = Postgrex.query!( conn, ~S""" SELECT stream_id, stream_uuid, stream_version FROM streams WHERE stream_uuid = $1 """, [aggregate_uuid] ) ``` Take the Stream Id of out Aggregate <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiJzdHJlYW1faWQiLCJjb2RlIjoiJXtyb3dzOiBbW3N0cmVhbV9pZCB8IF9dXX0gPSBzdHJlYW1cbnN0cmVhbV9pZCIsImNvb2tpZSI6InNlY3JldGNvb2tpZSIsImNvb2tpZV9zZWNyZXQiOiJDT09LSUUiLCJub2RlIjoiZXNjcXJzYW5hdG9teUBob3N0MS5jb20iLCJub2RlX3NlY3JldCI6IiIsInVzZV9jb29raWVfc2VjcmV0IjpmYWxzZSwidXNlX25vZGVfc2VjcmV0IjpmYWxzZX0","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) stream_id = Kino.RPC.eval_string( node, ~S""" %{rows: [[stream_id | _]]} = stream stream_id """, file: __ENV__.file ) ``` Check the event i that Stream <!-- livebook:{"attrs":"eyJjYWNoZV9xdWVyeSI6dHJ1ZSwiY29ubmVjdGlvbiI6eyJ0eXBlIjoicG9zdGdyZXMiLCJ2YXJpYWJsZSI6ImNvbm4ifSwiZGF0YV9mcmFtZV9hbGlhcyI6IkVsaXhpci5FeHBsb3Jlci5EYXRhRnJhbWUiLCJxdWVyeSI6IlNFTEVDVFxuICBldmVudF9pZDo6dGV4dCxcbiAgc3RyZWFtX2lkLFxuICBzdHJlYW1fdmVyc2lvblxuRlJPTSBzdHJlYW1fZXZlbnRzIHNlXG5XSEVSRSBzdHJlYW1faWQgPSB7e3N0cmVhbV9pZH19XG5PUkRFUiBCWSBzdHJlYW1fdmVyc2lvbiBBU0MiLCJyZXN1bHRfdmFyaWFibGUiOiJyZXN1bHQ0IiwidGltZW91dCI6bnVsbH0","chunks":null,"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result4 = Postgrex.query!( conn, ~S""" SELECT event_id::text, stream_id, stream_version FROM stream_events se WHERE stream_id = $1 ORDER BY stream_version ASC """, [stream_id] ) ``` Put **all together** to see all events for a give Aggregate <!-- livebook:{"attrs":"eyJjYWNoZV9xdWVyeSI6dHJ1ZSwiY29ubmVjdGlvbiI6eyJ0eXBlIjoicG9zdGdyZXMiLCJ2YXJpYWJsZSI6ImNvbm4ifSwiZGF0YV9mcmFtZV9hbGlhcyI6IkVsaXhpci5FeHBsb3Jlci5EYXRhRnJhbWUiLCJxdWVyeSI6IlNFTEVDVCBcbiAgZS5ldmVudF9pZDo6dGV4dCxcbiAgZXZlbnRfdHlwZSxcbiAgZGF0YVxuRlJPTSBldmVudHMgZVxuICBKT0lOIHN0cmVhbV9ldmVudHMgc2VcbiAgICBPTiBlLmV2ZW50X2lkID0gc2UuZXZlbnRfaWRcbiAgSk9JTiBzdHJlYW1zIHNcbiAgICBPTiBzLnN0cmVhbV9pZCA9IHNlLnN0cmVhbV9pZFxuV0hFUkUgcy5zdHJlYW1fdXVpZCA9IHt7YWdncmVnYXRlX3V1aWR9fVxuT1JERVIgQlkgc2Uuc3RyZWFtX3ZlcnNpb24gQVNDIiwicmVzdWx0X3ZhcmlhYmxlIjoicmVzdWx0MyIsInRpbWVvdXQiOm51bGx9","chunks":null,"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result3 = Postgrex.query!( conn, ~S""" SELECT e.event_id::text, event_type, data FROM events e JOIN stream_events se ON e.event_id = se.event_id JOIN streams s ON s.stream_id = se.stream_id WHERE s.stream_uuid = $1 ORDER BY se.stream_version ASC """, [aggregate_uuid] ) ``` ## The C stand for Command How a command is routed to the right Aggregate? With the Router of course! We define which fields are the key of the Aggregate and which Command should be routed to which Aggregate **Note! We have removed CreateOrder intentionally. Let's see what happen...** <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiIiLCJjb2RlIjoiZGVmbW9kdWxlIEVzQ3Fyc0FuYXRvbXkuUm91dGVyIGRvXG4gIHVzZSBDb21tYW5kZWQuQ29tbWFuZHMuUm91dGVyXG5cbiAgYWxpYXMgRXNDcXJzQW5hdG9teS5PcmRlci5BZ2dyZWdhdGUuT3JkZXJcblxuICBhbGlhcyBFc0NxcnNBbmF0b215Lk9yZGVyLkNvbW1hbmRzLntDb21wbGV0ZU9yZGVyfVxuICBcbiAgaWRlbnRpZnkoT3JkZXIsIGJ5OiA6aWQpXG5cbiAgZGlzcGF0Y2goW0NvbXBsZXRlT3JkZXJdLCB0bzogT3JkZXIpXG4gIFxuZW5kIiwiY29va2llIjoic2VjcmV0Y29va2llIiwiY29va2llX3NlY3JldCI6IkNPT0tJRSIsIm5vZGUiOiJlc2NxcnNhbmF0b215QGhvc3QxLmNvbSIsIm5vZGVfc2VjcmV0IjoiIiwidXNlX2Nvb2tpZV9zZWNyZXQiOmZhbHNlLCJ1c2Vfbm9kZV9zZWNyZXQiOmZhbHNlfQ","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) Kino.RPC.eval_string( node, ~S""" defmodule EsCqrsAnatomy.Router do use Commanded.Commands.Router alias EsCqrsAnatomy.Order.Aggregate.Order alias EsCqrsAnatomy.Order.Commands.{CompleteOrder} identify(Order, by: :id) dispatch([CompleteOrder], to: Order) end """, file: __ENV__.file ) ``` <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiJjb21tYW5kX3Jlc3VsdCIsImNvZGUiOiJhbGlhcyBFc0NxcnNBbmF0b215Lk9yZGVyLkNvbW1hbmRzLkNyZWF0ZU9yZGVyXG5hbGlhcyBFc0NxcnNBbmF0b215Lk9yZGVyLkNvbW1hbmRzLk9yZGVySXRlbVxuXG4lQ3JlYXRlT3JkZXJ7XG4gIGlkOiBGYWtlci5VVUlELnY0KCksIFxuICBvcmRlcl9udW1iZXI6IEZha2VyLlN0cmluZy5iYXNlNjQoNSksIFxuICBidXNpbmVzc19wYXJ0bmVyOiBGYWtlci5JbnRlcm5ldC5lbWFpbCgpLFxuICBpdGVtczogW1xuICAgICVPcmRlckl0ZW17XG4gICAgICBwcm9kdWN0X2lkOiBGYWtlci5VVUlELnY0KCksXG4gICAgICBxdWFudGl0eTogMSxcbiAgICAgIHVvbTogXCJLR1wiXG4gICAgfVxuICBdXG59IFxufD4gRXNDcXJzQW5hdG9teS5BcHAuZGlzcGF0Y2goKSIsImNvb2tpZSI6InNlY3JldGNvb2tpZSIsImNvb2tpZV9zZWNyZXQiOiJDT09LSUUiLCJub2RlIjoiZXNjcXJzYW5hdG9teUBob3N0MS5jb20iLCJub2RlX3NlY3JldCI6IiIsInVzZV9jb29raWVfc2VjcmV0IjpmYWxzZSwidXNlX25vZGVfc2VjcmV0IjpmYWxzZX0","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) command_result = Kino.RPC.eval_string( node, ~S""" alias EsCqrsAnatomy.Order.Commands.CreateOrder alias EsCqrsAnatomy.Order.Commands.OrderItem %CreateOrder{ id: Faker.UUID.v4(), order_number: Faker.String.base64(5), business_partner: Faker.Internet.email(), items: [ %OrderItem{ product_id: Faker.UUID.v4(), quantity: 1, uom: "KG" } ] } |> EsCqrsAnatomy.App.dispatch() """, file: __ENV__.file ) ``` Cool! The Command is not registered to any Aggregate ## Finally the Aggregate <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiIiLCJjb2RlIjoiZGVmbW9kdWxlIEVzQ3Fyc0FuYXRvbXkuT3JkZXIuQWdncmVnYXRlLk9yZGVyIGRvXG4gIHVzZSBUeXBlZFN0cnVjdFxuXG4gIGFsaWFzIEVzQ3Fyc0FuYXRvbXkuT3JkZXIuQ29tbWFuZHMue0NyZWF0ZU9yZGVyLCBDb21wbGV0ZU9yZGVyLCBEZWxldGVPcmRlcn1cbiAgYWxpYXMgRXNDcXJzQW5hdG9teS5PcmRlci5FdmVudHMue09yZGVyQ3JlYXRlZCwgT3JkZXJDb21wbGV0ZWQsIE9yZGVyRGVsZXRlZH1cbiAgYWxpYXMgRXNDcXJzQW5hdG9teS5PcmRlci5BZ2dyZWdhdGUuT3JkZXJTdGF0dXNcblxuICBAZGVyaXZlIEphc29uLkVuY29kZXJcbiAgdHlwZWRzdHJ1Y3QgZW5mb3JjZTogdHJ1ZSBkb1xuICAgIGZpZWxkKDppZCwgU3RyaW5nLnQoKSlcbiAgICBmaWVsZCg6c3RhdHVzLCBTdHJpbmcudCgpKVxuICBlbmRcblxuICBkZWYgZXhlY3V0ZSglX19NT0RVTEVfX3tpZDogbmlsfSwgJUNyZWF0ZU9yZGVye30gPSBjb21tYW5kKSBkb1xuICAgICVPcmRlckNyZWF0ZWR7XG4gICAgICBpZDogY29tbWFuZC5pZCxcbiAgICAgIG9yZGVyX251bWJlcjogY29tbWFuZC5vcmRlcl9udW1iZXIsXG4gICAgICBidXNpbmVzc19wYXJ0bmVyOiBjb21tYW5kLmJ1c2luZXNzX3BhcnRuZXIsXG4gICAgICBpdGVtczogY29tbWFuZC5pdGVtc1xuICAgIH1cbiAgZW5kXG5cbiAgZGVmIGV4ZWN1dGUoJV9fTU9EVUxFX197fSwgJUNyZWF0ZU9yZGVye30pLCBkbzogezplcnJvciwgXCJPcmRlciBhbHJlYWR5IGNyZWF0ZWRcIn1cblxuICBkZWYgZXhlY3V0ZSglX19NT0RVTEVfX3tzdGF0dXM6IFwiQ09NUExFVEVEXCJ9LCAlQ29tcGxldGVPcmRlcnt9KSxcbiAgICBkbzogezplcnJvciwgXCJPcmRlciBhbHJlYWR5IGNvbXBsZXRlZFwifVxuXG4gIGRlZiBleGVjdXRlKCVfX01PRFVMRV9fe30sICVDb21wbGV0ZU9yZGVye2Jsb2NrZWRfcHJvZHVjdF9pZHM6IGJsb2NrZWRfcHJvZHVjdF9pZHN9KVxuICAgICAgd2hlbiBsZW5ndGgoYmxvY2tlZF9wcm9kdWN0X2lkcykgPiAwIGRvXG4gICAgezplcnJvciwgXCJPcmRlciBjb250YWlucyBibG9ja2VkIHByb2R1Y3RzOiAje0lPLmluc3BlY3QoYmxvY2tlZF9wcm9kdWN0X2lkcyl9XCJ9XG4gIGVuZFxuXG4gIGRlZiBleGVjdXRlKCVfX01PRFVMRV9fe2lkOiBpZH0sICVDb21wbGV0ZU9yZGVye30pLCBkbzogJU9yZGVyQ29tcGxldGVke2lkOiBpZH1cblxuICBkZWYgZXhlY3V0ZSglX19NT0RVTEVfX3tpZDogaWR9LCAlRGVsZXRlT3JkZXJ7fSksIGRvOiAlT3JkZXJEZWxldGVke2lkOiBpZH1cblxuICBkZWYgYXBwbHkoJV9fTU9EVUxFX197fSA9IG9yZGVyLCAlT3JkZXJDcmVhdGVke30gPSBldmVudCkgZG9cbiAgICAlX19NT0RVTEVfX3tcbiAgICAgIG9yZGVyXG4gICAgICB8IGlkOiBldmVudC5pZCxcbiAgICAgICAgc3RhdHVzOiBPcmRlclN0YXR1cy5vcGVuKClcbiAgICB9XG4gIGVuZFxuXG4gIGRlZiBhcHBseSglX19NT0RVTEVfX3t9ID0gb3JkZXIsICVPcmRlckNvbXBsZXRlZHt9ID0gZXZlbnQpIGRvXG4gICAgJV9fTU9EVUxFX197XG4gICAgICBvcmRlciB8IHN0YXR1czogT3JkZXJTdGF0dXMuY29tcGxldGVkKClcbiAgICB9XG4gIGVuZFxuXG4gIGRlZiBhcHBseSglX19NT0RVTEVfX3t9ID0gb3JkZXIsIF8pLCBkbzogb3JkZXJcbmVuZFxuIiwiY29va2llIjoic2VjcmV0Y29va2llIiwiY29va2llX3NlY3JldCI6IkNPT0tJRSIsIm5vZGUiOiJlc2NxcnNhbmF0b215QGhvc3QxLmNvbSIsIm5vZGVfc2VjcmV0IjoiIiwidXNlX2Nvb2tpZV9zZWNyZXQiOmZhbHNlLCJ1c2Vfbm9kZV9zZWNyZXQiOmZhbHNlfQ","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) Kino.RPC.eval_string( node, ~S""" defmodule EsCqrsAnatomy.Order.Aggregate.Order do use TypedStruct alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder, DeleteOrder} alias EsCqrsAnatomy.Order.Events.{OrderCreated, OrderCompleted, OrderDeleted} alias EsCqrsAnatomy.Order.Aggregate.OrderStatus @derive Jason.Encoder typedstruct enforce: true do field(:id, String.t()) field(:status, String.t()) end def execute(%__MODULE__{id: nil}, %CreateOrder{} = command) do %OrderCreated{ id: command.id, order_number: command.order_number, business_partner: command.business_partner, items: command.items } end def execute(%__MODULE__{}, %CreateOrder{}), do: {:error, "Order already created"} def execute(%__MODULE__{status: "COMPLETED"}, %CompleteOrder{}), do: {:error, "Order already completed"} def execute(%__MODULE__{}, %CompleteOrder{blocked_product_ids: blocked_product_ids}) when length(blocked_product_ids) > 0 do {:error, "Order contains blocked products: #{IO.inspect(blocked_product_ids)}"} end def execute(%__MODULE__{id: id}, %CompleteOrder{}), do: %OrderCompleted{id: id} def execute(%__MODULE__{id: id}, %DeleteOrder{}), do: %OrderDeleted{id: id} def apply(%__MODULE__{} = order, %OrderCreated{} = event) do %__MODULE__{ order | id: event.id, status: OrderStatus.open() } end def apply(%__MODULE__{} = order, %OrderCompleted{} = event) do %__MODULE__{ order | status: OrderStatus.completed() } end def apply(%__MODULE__{} = order, _), do: order end """, file: __ENV__.file ) ``` ## Event Handlers ![images](https://raw.githubusercontent.com/frnmjn/es-cqrs-anatomy/main/img/es-policy.png) <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiIiLCJjb2RlIjoiZGVmbW9kdWxlIEVzQ3Fyc0FuYXRvbXkuUm91dGVyIGRvXG4gIHVzZSBDb21tYW5kZWQuQ29tbWFuZHMuUm91dGVyXG5cbiAgYWxpYXMgRXNDcXJzQW5hdG9teS5NaWRkbGV3YXJlLntWYWxpZGF0ZSwgRW5yaWNobWVudH1cbiAgYWxpYXMgQ29tbWFuZGVkLk1pZGRsZXdhcmUuVW5pcXVlbmVzc1xuICBhbGlhcyBFc0NxcnNBbmF0b215Lk9yZGVyLkFnZ3JlZ2F0ZS5PcmRlclxuICBhbGlhcyBFc0NxcnNBbmF0b215Lk9yZGVyLkNvbW1hbmRzLntDcmVhdGVPcmRlciwgQ29tcGxldGVPcmRlcn1cbiAgYWxpYXMgRXNDcXJzQW5hdG9teS5TaGlwbWVudC5BZ2dyZWdhdGUuU2hpcG1lbnRcbiAgYWxpYXMgRXNDcXJzQW5hdG9teS5TaGlwbWVudC5Db21tYW5kcy57Q3JlYXRlU2hpcG1lbnQsIENvbXBsZXRlU2hpcG1lbnR9XG5cbiAgbWlkZGxld2FyZShFbnJpY2htZW50KVxuICBtaWRkbGV3YXJlKFZhbGlkYXRlKVxuICBtaWRkbGV3YXJlKFVuaXF1ZW5lc3MpXG5cbiAgaWRlbnRpZnkoT3JkZXIsIGJ5OiA6aWQpXG4gIGlkZW50aWZ5KFNoaXBtZW50LCBieTogOmlkKVxuXG4gIGRpc3BhdGNoKFtDcmVhdGVPcmRlciwgQ29tcGxldGVPcmRlcl0sIHRvOiBPcmRlcilcbiAgZGlzcGF0Y2goW0NyZWF0ZVNoaXBtZW50LCBDb21wbGV0ZVNoaXBtZW50XSwgdG86IFNoaXBtZW50KVxuZW5kIiwiY29va2llIjoic2VjcmV0Y29va2llIiwiY29va2llX3NlY3JldCI6IkNPT0tJRSIsIm5vZGUiOiJlc2NxcnNhbmF0b215QGhvc3QxLmNvbSIsIm5vZGVfc2VjcmV0IjoiIiwidXNlX2Nvb2tpZV9zZWNyZXQiOmZhbHNlLCJ1c2Vfbm9kZV9zZWNyZXQiOmZhbHNlfQ","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) Kino.RPC.eval_string( node, ~S""" defmodule EsCqrsAnatomy.Router do use Commanded.Commands.Router alias EsCqrsAnatomy.Middleware.{Validate, Enrichment} alias Commanded.Middleware.Uniqueness alias EsCqrsAnatomy.Order.Aggregate.Order alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder} alias EsCqrsAnatomy.Shipment.Aggregate.Shipment alias EsCqrsAnatomy.Shipment.Commands.{CreateShipment, CompleteShipment} middleware(Enrichment) middleware(Validate) middleware(Uniqueness) identify(Order, by: :id) identify(Shipment, by: :id) dispatch([CreateOrder, CompleteOrder], to: Order) dispatch([CreateShipment, CompleteShipment], to: Shipment) end """, file: __ENV__.file ) ``` <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiIiLCJjb2RlIjoiZGVmbW9kdWxlIEVzQ3Fyc0FuYXRvbXkuU2hpcG1lbnQuUG9saWNpZXMuU2hpcG1lbnQgZG9cbiAgdXNlIENvbW1hbmRlZC5FdmVudC5IYW5kbGVyLFxuICAgIGFwcGxpY2F0aW9uOiBFc0NxcnNBbmF0b215LkFwcCxcbiAgICBuYW1lOiBcInNoaXBtZW50XCIsXG4gICAgc3RhcnRfZnJvbTogOmN1cnJlbnRcbiAgICBcbiAgdXNlIEVzQ3Fyc0FuYXRvbXkuQmFzZS5FdmVudEhhbmRsZXJcbiAgXG4gIGFsaWFzIEVzQ3Fyc0FuYXRvbXkuT3JkZXIuRXZlbnRzLk9yZGVyQ29tcGxldGVkXG4gIGFsaWFzIEVzQ3Fyc0FuYXRvbXkuU2hpcG1lbnQuQ29tbWFuZHMuQ3JlYXRlU2hpcG1lbnRcblxuICBkZWYgaGFuZGxlKCVPcmRlckNvbXBsZXRlZHtpZDogaWR9LCAle1xuICAgICAgICBldmVudF9pZDogY2F1c2F0aW9uX2lkLFxuICAgICAgICBjb3JyZWxhdGlvbl9pZDogY29ycmVsYXRpb25faWRcbiAgICAgIH0pIGRvXG4gICAgJUNyZWF0ZVNoaXBtZW50e1xuICAgICAgaWQ6IFVVSUQudXVpZDQoKSxcbiAgICAgIG9yZGVyX2lkOiBpZFxuICAgIH1cbiAgICB8PiBFc0NxcnNBbmF0b215LkFwcC5kaXNwYXRjaChjYXVzYXRpb25faWQ6IGNhdXNhdGlvbl9pZCwgY29ycmVsYXRpb25faWQ6IGNvcnJlbGF0aW9uX2lkKVxuICBlbmRcblxuZW5kIiwiY29va2llIjoic2VjcmV0Y29va2llIiwiY29va2llX3NlY3JldCI6IkNPT0tJRSIsIm5vZGUiOiJlc2NxcnNhbmF0b215QGhvc3QxLmNvbSIsIm5vZGVfc2VjcmV0IjoiIiwidXNlX2Nvb2tpZV9zZWNyZXQiOmZhbHNlLCJ1c2Vfbm9kZV9zZWNyZXQiOmZhbHNlfQ","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) Kino.RPC.eval_string( node, ~S""" defmodule EsCqrsAnatomy.Shipment.Policies.Shipment do use Commanded.Event.Handler, application: EsCqrsAnatomy.App, name: "shipment", start_from: :current use EsCqrsAnatomy.Base.EventHandler alias EsCqrsAnatomy.Order.Events.OrderCompleted alias EsCqrsAnatomy.Shipment.Commands.CreateShipment def handle(%OrderCompleted{id: id}, %{ event_id: causation_id, correlation_id: correlation_id }) do %CreateShipment{ id: UUID.uuid4(), order_id: id } |> EsCqrsAnatomy.App.dispatch(causation_id: causation_id, correlation_id: correlation_id) end end """, file: __ENV__.file ) ``` <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiIiLCJjb2RlIjoiZGVmbW9kdWxlIEVzQ3Fyc0FuYXRvbXkuRXZlbnRIYW5kbGVyU3VwZXJ2aXNvciBkb1xuICB1c2UgU3VwZXJ2aXNvclxuXG4gIGRlZiBzdGFydF9saW5rKF9hcmdzKSBkb1xuICAgIFN1cGVydmlzb3Iuc3RhcnRfbGluayhfX01PRFVMRV9fLCBbXSwgbmFtZTogX19NT0RVTEVfXylcbiAgZW5kXG5cbiAgZGVmIGluaXQoX2FyZ3MpIGRvXG4gICAgU3VwZXJ2aXNvci5pbml0KFxuICAgICAgW1xuICAgICAgICBFc0NxcnNBbmF0b215Lk9yZGVyLlByb2plY3RvcnMuT3JkZXJzLFxuICAgICAgICBFc0NxcnNBbmF0b215LlNoaXBtZW50LlBvbGljaWVzLlNoaXBtZW50XG4gICAgICBdLFxuICAgICAgc3RyYXRlZ3k6IDpvbmVfZm9yX29uZVxuICAgIClcbiAgZW5kXG5lbmRcbiIsImNvb2tpZSI6InNlY3JldGNvb2tpZSIsImNvb2tpZV9zZWNyZXQiOiJDT09LSUUiLCJub2RlIjoiZXNjcXJzYW5hdG9teUBob3N0MS5jb20iLCJub2RlX3NlY3JldCI6IiIsInVzZV9jb29raWVfc2VjcmV0IjpmYWxzZSwidXNlX25vZGVfc2VjcmV0IjpmYWxzZX0","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) Kino.RPC.eval_string( node, ~S""" defmodule EsCqrsAnatomy.EventHandlerSupervisor do use Supervisor def start_link(_args) do Supervisor.start_link(__MODULE__, [], name: __MODULE__) end def init(_args) do Supervisor.init( [ EsCqrsAnatomy.Order.Projectors.Orders, EsCqrsAnatomy.Shipment.Policies.Shipment ], strategy: :one_for_one ) end end """, file: __ENV__.file ) ``` <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiIiLCJjb2RlIjoiUHJvY2Vzcy53aGVyZWlzKEVzQ3Fyc0FuYXRvbXkuRXZlbnRIYW5kbGVyU3VwZXJ2aXNvcikgfD4gUHJvY2Vzcy5leGl0KDpraWxsKSIsImNvb2tpZSI6InNlY3JldGNvb2tpZSIsImNvb2tpZV9zZWNyZXQiOiJDT09LSUUiLCJub2RlIjoiZXNjcXJzYW5hdG9teUBob3N0MS5jb20iLCJub2RlX3NlY3JldCI6IiIsInVzZV9jb29raWVfc2VjcmV0IjpmYWxzZSwidXNlX25vZGVfc2VjcmV0IjpmYWxzZX0","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) Kino.RPC.eval_string( node, ~S"Process.whereis(EsCqrsAnatomy.EventHandlerSupervisor) |> Process.exit(:kill)", file: __ENV__.file ) ``` <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiJvcmRlcl9pZCIsImNvZGUiOiJhbGlhcyBFc0NxcnNBbmF0b215Lk9yZGVyLkNvbW1hbmRzLntDcmVhdGVPcmRlciwgQ29tcGxldGVPcmRlcn1cbmFsaWFzIEVzQ3Fyc0FuYXRvbXkuT3JkZXIuQ29tbWFuZHMuT3JkZXJJdGVtXG5hbGlhcyBFc0NxcnNBbmF0b215Lk9yZGVyLkNvbnN0YW50c1xuXG5vcmRlcl9pZCA9IFxuICAlQ3JlYXRlT3JkZXJ7XG4gICAgaWQ6IEZha2VyLlVVSUQudjQoKSwgXG4gICAgb3JkZXJfbnVtYmVyOiBGYWtlci5TdHJpbmcuYmFzZTY0KDUpLCBcbiAgICBidXNpbmVzc19wYXJ0bmVyOiBGYWtlci5JbnRlcm5ldC5lbWFpbCgpLFxuICAgIGl0ZW1zOiBbXG4gICAgICAlT3JkZXJJdGVte1xuICAgICAgICBwcm9kdWN0X2lkOiBGYWtlci5VVUlELnY0KCksXG4gICAgICAgIHF1YW50aXR5OiAxLFxuICAgICAgICB1b206IFwiS0dcIlxuICAgICAgfVxuICAgIF1cbiAgfSBcbiAgfD4gRXNDcXJzQW5hdG9teS5BcHAuZGlzcGF0Y2goaW5jbHVkZV9leGVjdXRpb25fcmVzdWx0OiB0cnVlKVxuICB8PiBlbGVtKDEpXG4gIHw+IE1hcC5nZXQoOmFnZ3JlZ2F0ZV91dWlkKVxuXG4lQ29tcGxldGVPcmRlcntcbiAgaWQ6IG9yZGVyX2lkXG59IFxufD4gRXNDcXJzQW5hdG9teS5BcHAuZGlzcGF0Y2goKVxuXG5vcmRlcl9pZCIsImNvb2tpZSI6InNlY3JldGNvb2tpZSIsImNvb2tpZV9zZWNyZXQiOiJDT09LSUUiLCJub2RlIjoiZXNjcXJzYW5hdG9teUBob3N0MS5jb20iLCJub2RlX3NlY3JldCI6IiIsInVzZV9jb29raWVfc2VjcmV0IjpmYWxzZSwidXNlX25vZGVfc2VjcmV0IjpmYWxzZX0","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) order_id = Kino.RPC.eval_string( node, ~S""" alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder} alias EsCqrsAnatomy.Order.Commands.OrderItem alias EsCqrsAnatomy.Order.Constants order_id = %CreateOrder{ id: Faker.UUID.v4(), order_number: Faker.String.base64(5), business_partner: Faker.Internet.email(), items: [ %OrderItem{ product_id: Faker.UUID.v4(), quantity: 1, uom: "KG" } ] } |> EsCqrsAnatomy.App.dispatch(include_execution_result: true) |> elem(1) |> Map.get(:aggregate_uuid) %CompleteOrder{ id: order_id } |> EsCqrsAnatomy.App.dispatch() order_id """, file: __ENV__.file ) ``` <!-- livebook:{"attrs":"eyJjYWNoZV9xdWVyeSI6dHJ1ZSwiY29ubmVjdGlvbiI6eyJ0eXBlIjoicG9zdGdyZXMiLCJ2YXJpYWJsZSI6ImNvbm4ifSwiZGF0YV9mcmFtZV9hbGlhcyI6IkVsaXhpci5FeHBsb3Jlci5EYXRhRnJhbWUiLCJxdWVyeSI6IlNFTEVDVCBcbiAgZXZlbnRfaWQ6OnRleHQsXG4gIGV2ZW50X3R5cGUsXG4gIGRhdGFcbkZST00gZXZlbnRzIFxuV0hFUkUgZXZlbnRfdHlwZSA9ICdTaGlwbWVudC5TaGlwbWVudENyZWF0ZWQnIFxuQU5EIGVuY29kZShkYXRhLCAnZXNjYXBlJyk6Ompzb24gLT4+ICdvcmRlcl9pZCcgPSB7e29yZGVyX2lkfX0iLCJyZXN1bHRfdmFyaWFibGUiOiJyZXN1bHQ1IiwidGltZW91dCI6bnVsbH0","chunks":null,"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result5 = Postgrex.query!( conn, ~S""" SELECT event_id::text, event_type, data FROM events WHERE event_type = 'Shipment.ShipmentCreated' AND encode(data, 'escape')::json ->> 'order_id' = $1 """, [order_id] ) ``` ### Projector <!-- livebook:{"attrs":"eyJhc3NpZ25fdG8iOiIiLCJjb2RlIjoiZGVmbW9kdWxlIEVzQ3Fyc0FuYXRvbXkuT3JkZXIuUHJvamVjdG9ycy5PcmRlcnMgZG9cbiAgdXNlIENvbW1hbmRlZC5Qcm9qZWN0aW9ucy5FY3RvLFxuICAgIGFwcGxpY2F0aW9uOiBFc0NxcnNBbmF0b215LkFwcCxcbiAgICByZXBvOiBFc0NxcnNBbmF0b215LlJlcG8sXG4gICAgbmFtZTogXCJvcmRlcnNcIlxuXG4gIHVzZSBFc0NxcnNBbmF0b215LkJhc2UuRXZlbnRIYW5kbGVyXG5cbiAgYWxpYXMgRXNDcXJzQW5hdG9teS5PcmRlci5FdmVudHMue1xuICAgIE9yZGVyQ3JlYXRlZCxcbiAgICBPcmRlckNvbXBsZXRlZCxcbiAgICBPcmRlckRlbGV0ZWRcbiAgfVxuXG4gIGFsaWFzIEVzQ3Fyc0FuYXRvbXkuT3JkZXIuUHJvamVjdGlvbnMue09yZGVyLCBPcmRlckl0ZW19XG4gIGFsaWFzIEVzQ3Fyc0FuYXRvbXkuT3JkZXIuQWdncmVnYXRlLk9yZGVyU3RhdHVzXG5cbiAgcHJvamVjdChcbiAgICAlT3JkZXJDcmVhdGVke30gPSBldmVudCxcbiAgICBfbWV0YWRhdGEsXG4gICAgZm4gbXVsdGkgLT5cbiAgICAgIG11bHRpXG4gICAgICB8PiBFY3RvLk11bHRpLmluc2VydChcbiAgICAgICAgOm9yZGVycyxcbiAgICAgICAgJU9yZGVye1xuICAgICAgICAgIGlkOiBldmVudC5pZCxcbiAgICAgICAgICBvcmRlcl9udW1iZXI6IGV2ZW50Lm9yZGVyX251bWJlcixcbiAgICAgICAgICBidXNpbmVzc19wYXJ0bmVyOiBldmVudC5idXNpbmVzc19wYXJ0bmVyLFxuICAgICAgICAgIHN0YXR1czogT3JkZXJTdGF0dXMub3BlbigpLFxuICAgICAgICAgIGl0ZW1zOlxuICAgICAgICAgICAgZXZlbnQuaXRlbXNcbiAgICAgICAgICAgIHw+IEVudW0ubWFwKFxuICAgICAgICAgICAgICAmJU9yZGVySXRlbXtcbiAgICAgICAgICAgICAgICBpZDogVVVJRC51dWlkNCgpLFxuICAgICAgICAgICAgICAgIG9yZGVyX2lkOiBldmVudC5pZCxcbiAgICAgICAgICAgICAgICBwcm9kdWN0X2lkOiAmMS5wcm9kdWN0X2lkLFxuICAgICAgICAgICAgICAgIHF1YW50aXR5OiAmMS5xdWFudGl0eSxcbiAgICAgICAgICAgICAgICB1b206ICYxLnVvbVxuICAgICAgICAgICAgICB9XG4gICAgICAgICAgICApXG4gICAgICAgIH1cbiAgICAgIClcbiAgICBlbmRcbiAgKVxuXG4gIHByb2plY3QoXG4gICAgJU9yZGVyQ29tcGxldGVke2lkOiBpZH0sXG4gICAgX21ldGFkYXRhLFxuICAgIGZuIG11bHRpIC0+XG4gICAgICBtdWx0aVxuICAgICAgfD4gRWN0by5NdWx0aS5ydW4oOm9yZGVyX3RvX3VwZGF0ZSwgZm4gcmVwbywgX2NoYW5nZXMgLT5cbiAgICAgICAgezpvaywgcmVwby5nZXQoT3JkZXIsIGlkKX1cbiAgICAgIGVuZClcbiAgICAgIHw+IEVjdG8uTXVsdGkudXBkYXRlKDpvcmRlciwgZm4gJXtvcmRlcl90b191cGRhdGU6IG9yZGVyfSAtPlxuICAgICAgICBFY3RvLkNoYW5nZXNldC5jaGFuZ2Uob3JkZXIsXG4gICAgICAgICAgc3RhdHVzOiBPcmRlclN0YXR1cy5jb21wbGV0ZWQoKVxuICAgICAgICApXG4gICAgICBlbmQpXG4gICAgZW5kXG4gIClcblxuICBwcm9qZWN0KFxuICAgICVPcmRlckRlbGV0ZWR7aWQ6IGlkfSxcbiAgICBfbWV0YWRhdGEsXG4gICAgZm4gbXVsdGkgLT5cbiAgICAgIG11bHRpXG4gICAgICB8PiBFY3RvLk11bHRpLnJ1big6b3JkZXJfdG9fZGVsZXRlLCBmbiByZXBvLCBfY2hhbmdlcyAtPlxuICAgICAgICB7cm93c19kZWxldGVkLCBffSA9IGZyb20obyBpbiBPcmRlciwgd2hlcmU6IG8uaWQgPT0gXmlkKSB8PiByZXBvLmRlbGV0ZV9hbGwoKVxuICAgICAgICB7Om9rLCByb3dzX2RlbGV0ZWR9XG4gICAgICBlbmQpXG4gICAgZW5kXG4gIClcblxuZW5kIiwiY29va2llIjoic2VjcmV0Y29va2llIiwiY29va2llX3NlY3JldCI6IkNPT0tJRSIsIm5vZGUiOiJlc2NxcnNhbmF0b215QGhvc3QxLmNvbSIsIm5vZGVfc2VjcmV0IjoiIiwidXNlX2Nvb2tpZV9zZWNyZXQiOmZhbHNlLCJ1c2Vfbm9kZV9zZWNyZXQiOmZhbHNlfQ","chunks":null,"kind":"Elixir.Kino.RemoteExecutionCell","livebook_object":"smart_cell"} --> ```elixir require Kino.RPC node = :"escqrsanatomy@host1.com" Node.set_cookie(node, :secretcookie) Kino.RPC.eval_string( node, ~S""" defmodule EsCqrsAnatomy.Order.Projectors.Orders do use Commanded.Projections.Ecto, application: EsCqrsAnatomy.App, repo: EsCqrsAnatomy.Repo, name: "orders" use EsCqrsAnatomy.Base.EventHandler alias EsCqrsAnatomy.Order.Events.{ OrderCreated, OrderCompleted, OrderDeleted } alias EsCqrsAnatomy.Order.Projections.{Order, OrderItem} alias EsCqrsAnatomy.Order.Aggregate.OrderStatus project( %OrderCreated{} = event, _metadata, fn multi -> multi |> Ecto.Multi.insert( :orders, %Order{ id: event.id, order_number: event.order_number, business_partner: event.business_partner, status: OrderStatus.open(), items: event.items |> Enum.map( &%OrderItem{ id: UUID.uuid4(), order_id: event.id, product_id: &1.product_id, quantity: &1.quantity, uom: &1.uom } ) } ) end ) project( %OrderCompleted{id: id}, _metadata, fn multi -> multi |> Ecto.Multi.run(:order_to_update, fn repo, _changes -> {:ok, repo.get(Order, id)} end) |> Ecto.Multi.update(:order, fn %{order_to_update: order} -> Ecto.Changeset.change(order, status: OrderStatus.completed() ) end) end ) project( %OrderDeleted{id: id}, _metadata, fn multi -> multi |> Ecto.Multi.run(:order_to_delete, fn repo, _changes -> {rows_deleted, _} = from(o in Order, where: o.id == ^id) |> repo.delete_all() {:ok, rows_deleted} end) end ) end """, file: __ENV__.file ) ``` <!-- livebook:{"attrs":"eyJkYXRhYmFzZSI6InJlYWRfc3RvcmUiLCJob3N0bmFtZSI6InBvc3RncmVzIiwicGFzc3dvcmQiOiJwb3N0Z3JlcyIsInBvcnQiOjU0MzIsInR5cGUiOiJwb3N0Z3JlcyIsInVzZV9pcHY2IjpmYWxzZSwidXNlX3NzbCI6ZmFsc2UsInVzZXJuYW1lIjoicG9zdGdyZXMiLCJ2YXJpYWJsZSI6ImNvbm4ifQ","chunks":null,"kind":"Elixir.KinoDB.ConnectionCell","livebook_object":"smart_cell"} --> ```elixir opts = [ hostname: "postgres", port: 5432, username: "postgres", password: "postgres", database: "read_store" ] {:ok, conn} = Kino.start_child({Postgrex, opts}) ``` <!-- livebook:{"attrs":"eyJjYWNoZV9xdWVyeSI6dHJ1ZSwiY29ubmVjdGlvbiI6eyJ0eXBlIjoicG9zdGdyZXMiLCJ2YXJpYWJsZSI6ImNvbm4ifSwiZGF0YV9mcmFtZV9hbGlhcyI6IkVsaXhpci5FeHBsb3Jlci5EYXRhRnJhbWUiLCJxdWVyeSI6IlNFTEVDVFxuICBvLm9yZGVyX251bWJlciwgXG4gIG8uYnVzaW5lc3NfcGFydG5lciwgXG4gIG8uc3RhdHVzLCBcbiAgb2kucHJvZHVjdF9pZCwgXG4gIG9pLnF1YW50aXR5LCBcbiAgb2kudW9tIFxuRlJPTSBvcmRlcnMgbyBcblx0Sk9JTiBvcmRlcl9pdGVtcyBvaSBcblx0XHRPTiBvLmlkID0gb2kub3JkZXJfaWQgXG5PUkRFUiBCWSBcbiAgby5vcmRlcl9udW1iZXIsIFxuICBvaS5xdWFudGl0eSAiLCJyZXN1bHRfdmFyaWFibGUiOiJyZXN1bHQ5IiwidGltZW91dCI6bnVsbH0","chunks":null,"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result9 = Postgrex.query!( conn, ~S""" SELECT o.order_number, o.business_partner, o.status, oi.product_id, oi.quantity, oi.uom FROM orders o JOIN order_items oi ON o.id = oi.order_id ORDER BY o.order_number, oi.quantity """, [] ) ``` ## Performance ### Aggregate Snapshoot <!-- livebook:{"attrs":"eyJjYWNlcnRmaWxlIjoiIiwiZGF0YWJhc2UiOiJldmVudF9zdG9yZSIsImhvc3RuYW1lIjoicG9zdGdyZXMiLCJwYXNzd29yZCI6InBvc3RncmVzIiwicG9ydCI6NTQzMiwidHlwZSI6InBvc3RncmVzIiwidXNlX2lwdjYiOmZhbHNlLCJ1c2Vfc3NsIjpmYWxzZSwidXNlcm5hbWUiOiJwb3N0Z3JlcyIsInZhcmlhYmxlIjoiY29ubiJ9","chunks":null,"kind":"Elixir.KinoDB.ConnectionCell","livebook_object":"smart_cell"} --> ```elixir opts = [ hostname: "postgres", port: 5432, username: "postgres", password: "postgres", database: "event_store" ] {:ok, conn} = Kino.start_child({Postgrex, opts}) ``` ``` config :es_cqrs_anatomy, EsCqrsAnatomy.App, snapshotting: %{ EsCqrsAnatomy.Order.Aggregate.Order => [ snapshot_every: 1, snapshot_version: 1 ] } ``` <!-- livebook:{"attrs":"eyJjYWNoZV9xdWVyeSI6dHJ1ZSwiY29ubmVjdGlvbiI6eyJ0eXBlIjoicG9zdGdyZXMiLCJ2YXJpYWJsZSI6ImNvbm4ifSwiZGF0YV9mcmFtZV9hbGlhcyI6IkVsaXhpci5FeHBsb3Jlci5EYXRhRnJhbWUiLCJxdWVyeSI6IlNFTEVDVFxuXHRzLnNvdXJjZV92ZXJzaW9uLFxuXHRzLnNvdXJjZV90eXBlLFxuICBzLm1ldGFkYXRhLFxuXHRzLmRhdGFcbkZST00gc25hcHNob3RzIHMgIiwicmVzdWx0X3ZhcmlhYmxlIjoicmVzdWx0MTIiLCJ0aW1lb3V0IjpudWxsfQ","chunks":null,"kind":"Elixir.KinoDB.SQLCell","livebook_object":"smart_cell"} --> ```elixir result12 = Postgrex.query!( conn, ~S""" SELECT s.source_version, s.source_type, s.metadata, s.data FROM snapshots s """, [] ) ``` <!-- livebook:{"offset":36982,"stamp":{"token":"XCP.EAjf2FpITwyMtRdU4EETT5sUaHiz71_8tMNKDKsiToVOGyNrfyFu7qFIv9HyWqGPEA3ZWoI-impUuH5c_43qlgeut4gWDKI-5cm3Im5aMwf6FShTI83j0_YP","version":2}} -->
See source

Have you already installed Livebook?

If you already installed Livebook, you can configure the default Livebook location where you want to open notebooks.
Livebook up Checking status We can't reach this Livebook (but we saved your preference anyway)
Run notebook

Not yet? Install Livebook in just a minute

Livebook is open source, free, and ready to run anywhere.

Run in the cloud

on select platforms

To run on Linux, Docker, embedded devices, or Elixir’s Mix, check our README.

PLATINUM SPONSORS
SPONSORS
Code navigation with go to definition of modules and functions Read More