Run this notebook

Use Livebook to open this notebook and explore new ideas.

It is easy to get started, on your machine or the cloud.

Click below to open and run it in your Livebook at .

(or change your Livebook location)

<!-- livebook:{"persist_outputs":true} --> # Beams Guide ```elixir Mix.install( [ {:lux, ">= 0.5.0"} {:kino, "~> 0.14.2"} ], start_applications: false ) Mix.Task.run("setup", install_deps: false) Application.put_env(:venomous, :snake_manager, %{ python_opts: [ module_paths: [ Lux.Python.module_path(), Lux.Python.module_path(:deps) ], python_executable: "python3" ] }) Application.ensure_all_started([:lux, :kino]) ``` ## Overview <a href="https://livebook.dev/run?url=https%3A%2F%2Fgithub.com%2FSpectral-Finance%2Flux%2Fblob%2Fmain%2Flux%2Fguides%2Fbeams.livemd" style="display: none"> <img src="https://livebook.dev/badge/v1/blue.svg" alt="Run in Livebook" /> </a> Beams are the orchestration layer of Lux, allowing you to compose Prisms, Lenses, and other components into complex workflows. They support sequential, parallel, and conditional execution with rich error handling and logging. A Beam consists of: * A sequence of steps * Input and output schemas * Execution configuration * Error handling and logging * Parameter passing between steps ## Creating a Beam Here's a basic example of a Beam: ```elixir defmodule MyApp.Beams.ContentProcessor do use Lux.Beam, name: "Content Processor", description: "Processes and enriches content", input_schema: %{ type: :object, properties: %{ text: %{type: :string}, language: %{type: :string}, enrich: %{type: :boolean, default: true} }, required: ["text"] }, output_schema: %{ type: :object, properties: %{ sentiment: %{type: :string}, entities: %{type: :array, items: %{type: :string}}, summary: %{type: :string} } }, generate_execution_log: true sequence do step(:sentiment, Lux.Prisms.SentimentAnalysisPrism, [:input]) branch {__MODULE__, :should_enrich?} do true -> parallel do step(:entities, MyApp.Prisms.EntityExtraction, [:input], retries: 2) step(:summary, MyApp.Prisms.TextSummarization, [:input], timeout: :timer.seconds(30) ) end false -> step(:skip, MyApp.Prisms.NoOp, %{}) end end def should_enrich?(ctx) do Map.get(ctx.input, :enrich, true) end end Kino.nothing() ``` Before run defined Beam, we need to define some Prisms that it will use. ```elixir defmodule MyApp.Prisms.EntityExtraction do use Lux.Prism def handler(_input, _ctx) do {:ok, %{result: [""]}} end end defmodule MyApp.Prisms.TextSummarization do use Lux.Prism def handler(_input, _ctx) do {:ok, %{result: "A short summary"}} end end defmodule MyApp.Prisms.NoOp do use Lux.Prism def handler(input, _ctx) do {:ok, input} end end Kino.nothing() ``` You can run defined Beam with `run/2` function. You can inspect entire steps and their outputs in `log`. ```elixir frame = Kino.Frame.new() |> Kino.render() {:ok, result, log} = MyApp.Beams.ContentProcessor.run(%{ enrich: true, text: "hello world", language: "en" }) Kino.Frame.append(frame, result) Kino.Frame.append(frame, log) Kino.nothing() ``` ## Step Types ### Sequential Steps Execute steps one after another: ```elixir import Lux.Beam sequence do step(:first, FirstPrism, %{param: :value}) step(:second, SecondPrism, [:steps, :first, :result]) step(:third, ThirdPrism, [:steps, :third, :result]) end ``` <!-- livebook:{"output":true} --> ``` {:sequence, [ %{ id: :first, module: FirstPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{param: :value} }, %{ id: :second, module: SecondPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: [:steps, :first, :result] }, %{ id: :third, module: ThirdPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: [:steps, :third, :result] } ]} ``` ### Parallel Steps Execute steps concurrently: ```elixir parallel do step(:analysis, AnalysisPrism, %{data: :input}) step(:validation, ValidationPrism, %{data: :input}) step(:enrichment, EnrichmentPrism, %{data: :input}) end ``` <!-- livebook:{"output":true} --> ``` {:parallel, [ %{ id: :analysis, module: AnalysisPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{data: :input} }, %{ id: :validation, module: ValidationPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{data: :input} }, %{ id: :enrichment, module: EnrichmentPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{data: :input} } ]} ``` ### Conditional Steps Branch based on conditions: ```elixir branch {__MODULE__, :check_condition} do :path_a -> sequence do step(:a1, PathAPrism, %{}) step(:a2, PathAPrism2, %{}) end :path_b -> sequence do step(:b1, PathBPrism, %{}) step(:b2, PathBPrism2, %{}) end _ -> step(:default, DefaultPrism, %{}) end ``` <!-- livebook:{"output":true} --> ``` {:branch, {nil, :check_condition}, [ path_a: {:sequence, [ %{ id: :a1, module: PathAPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{} }, %{ id: :a2, module: PathAPrism2, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{} } ]}, path_b: {:sequence, [ %{ id: :b1, module: PathBPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{} }, %{ id: :b2, module: PathBPrism2, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{} } ]}, _: %{ id: :default, module: DefaultPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{} } ]} ``` ## Parameter References ### Basic References Reference previous step outputs: ```elixir step(:data, DataPrism, %{value: :input_value}) step(:process, ProcessPrism, %{data: [:steps, :data, :result]}) ``` <!-- livebook:{"output":true} --> ``` %{ id: :process, module: ProcessPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{data: [:steps, :data, :result]} } ``` ### Nested References Access nested values: ```elixir step(:complex, ComplexPrism, %{ value: [:steps, :data, :result, :nested, :value], config: [:steps, :settings, :result] }) ``` <!-- livebook:{"output":true} --> ``` %{ id: :complex, module: ComplexPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{value: [:steps, :data, :result, :nested, :value], config: [:steps, :settings, :result]} } ``` ### Multiple References Combine multiple references: ```elixir step(:combine, CombinePrism, %{ first: [:steps, :step1, :result], second: [:steps, :step2, :result], third: [:steps, :step3, :result] }) ``` <!-- livebook:{"output":true} --> ``` %{ id: :combine, module: CombinePrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{ first: [:steps, :step1, :result], second: [:steps, :step2, :result], third: [:steps, :step3, :result] } } ``` ## Step Configuration ### Timeouts ```elixir step(:long_running, LongPrism, %{}, timeout: :timer.minutes(10)) ``` <!-- livebook:{"output":true} --> ``` %{ id: :long_running, module: LongPrism, opts: %{ timeout: 600000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{} } ``` ### Retries ```elixir step(:flaky, FlakyPrism, %{}, retries: 3, retry_backoff: 1000 ) ``` <!-- livebook:{"output":true} --> ``` %{ id: :flaky, module: FlakyPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 3, retry_backoff: 1000, track: false, store_io: false }, params: %{} } ``` ### Dependencies ```elixir step(:dependent, DependentPrism, %{}, dependencies: ["step1", "step2"]) ``` <!-- livebook:{"output":true} --> ``` %{ id: :dependent, module: DependentPrism, opts: %{ timeout: 300000, dependencies: ["step1", "step2"], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: false }, params: %{} } ``` ### Execution Logging ```elixir step(:important, ImportantPrism, %{}, store_io: true) ``` <!-- livebook:{"output":true} --> ``` %{ id: :important, module: ImportantPrism, opts: %{ timeout: 300000, dependencies: [], fallback: nil, retries: 0, retry_backoff: 1000, track: false, store_io: true }, params: %{} } ``` ## Error Handling ### Basic Error Handling ```elixir defmodule MyApp.Beams.RobustBeam do use Lux.Beam sequence do step(:risky, MyApp.Prisms.RiskyPrism, %{}, retries: 3, retry_backoff: 1000, fallback: MyApp.Fallbacks.RiskyFallback ) end end defmodule MyApp.Prisms.RiskyPrism do use Lux.Prism def handler(_input, _ctx) do {:error, :too_risky} end end defmodule MyApp.Fallbacks.RiskyFallback do def handle_error(%{error: error, context: ctx}) do case error do %{recoverable: true} -> {:continue, %{status: :degraded, result: compute_fallback(ctx)}} _ -> {:stop, "Unrecoverable error: #{inspect(error)}"} end end defp compute_fallback(_ctx) do # Compute fallback result %{value: 0} end end MyApp.Beams.RobustBeam.run(%{}) ``` <!-- livebook:{"output":true} --> ``` {:error, "Unrecoverable error: :too_risky", nil} ``` ### Inline Fallbacks You can also define fallbacks inline using anonymous functions: ```elixir defmodule MyApp.Beams.InlineFallbackBeam do use Lux.Beam sequence do step(:operation, MyApp.Prisms.OperationPrism, %{}, fallback: fn %{error: error, context: _ctx} -> if recoverable?(error) do {:continue, %{status: :degraded}} else {:stop, "Cannot proceed: #{inspect(error)}"} end end) end defp recoverable?(%{type: :temporary}), do: true defp recoverable?(_), do: false end defmodule MyApp.Prisms.OperationPrism do use Lux.Prism def handler(:ok, _ctx) do {:ok, %{type: :temporary}} end def handler(_input, _ctx) do {:error, %{type: :temporary}} end end MyApp.Beams.InlineFallbackBeam.run(%{}) ``` <!-- livebook:{"output":true} --> ``` {:ok, %{status: :degraded}, nil} ``` ### Fallback Behavior Fallbacks can: * Access the error and context * Return `{:continue, result}` to continue execution * Return `{:stop, reason}` to halt the beam * Transform errors into valid results * Implement recovery strategies ### Custom Error Handling ```elixir defmodule MyApp.Beams.ErrorHandlingBeam do use Lux.Beam sequence do step(:operation, MyApp.Prisms.OperationPrism, [:input]) branch {__MODULE__, :handle_error} do :retry -> step(:retry, MyApp.Prisms.RetryPrism, %{ original_input: [:steps, :operatoin, :input], error: [:steps, :operatoin, :error] }) :fallback -> step(:fallback, MyApp.Prisms.FallbackPrism, %{ error: [:steps, :operatoin, :error] }) :fail -> step(:error, MyApp.Prisms.ErrorPrism, %{ error: [:steps, :operatoin, :error], context: :context }) end end def handle_error(ctx) do case ctx.steps.operation.result do %{type: :temporary} -> :retry %{type: :permanent} -> :fallback _ -> :fail end end end defmodule MyApp.Prisms.RetryPrism do use Lux.Prism def handler(_input, _ctx) do {:ok, %{type: :retried}} end end MyApp.Beams.ErrorHandlingBeam.run(:ok) ``` <!-- livebook:{"output":true} --> ``` {:ok, %{type: :retried}, nil} ``` ## Best Practices 1. **Step Organization** * Group related steps together * Use meaningful step IDs * Keep step configurations clear * Document complex workflows 2. **Error Handling** * Use appropriate retry strategies * Implement fallback paths * Log errors with context * Handle all error cases 3. **Performance** * Use parallel execution when possible * Set appropriate timeouts * Monitor execution times 4. **Testing** * Test happy paths * Test error scenarios * Test parallel execution * Test timeouts and retries Example test: <!-- livebook:{"force_markdown":true} --> ```elixir defmodule MyApp.Beams.ContentProcessorTest do use UnitCase, async: true describe "run/2" do test "processes content successfully" do {:ok, _result, log} = MyApp.Beams.ContentProcessor.run(%{ text: "Great product!", language: "en", enrich: true }) steps_by_id = Map.new(log.steps, &{&1.id, &1}) assert steps_by_id[:sentiment].output["sentiment"] == "positive" assert length(steps_by_id[:entities].output.result) > 0 assert is_binary(steps_by_id[:summary].output.result) end test "respects enrich flag" do {:ok, _result, log} = MyApp.Beams.ContentProcessor.run(%{ text: "Simple text", enrich: false }) steps_by_id = Map.new(log.steps, &{&1.id, &1}) assert steps_by_id[:sentiment] refute steps_by_id[:entities] refute steps_by_id[:summary] end end end ``` ## Advanced Topics ### Complex Workflows ```elixir defmodule MyApp.Beams.ComplexWorkflow do use Lux.Beam, generate_execution_log: true sequence do parallel do step(:data1, DataSource1, %{}) step(:data2, DataSource2, %{}) step(:data3, DataSource3, %{}) end step(:validate, DataValidator, %{ data1: [:steps, :data1, :result], data2: [:steps, :data2, :result], data3: [:steps, :data3, :result] }) branch {__MODULE__, :process_path} do :fast -> step(:quick, QuickProcessor, %{ data: [:steps, :validate, :result] }) :thorough -> parallel do step(:analysis, DeepAnalysis, %{ data: [:steps, :validate, :result] }) step(:enrichment, DataEnrichment, %{ data: [:steps, :validate, :result] }) step(:verification, DataVerification, %{ data: [:steps, :validate, :result] }) end end step(:finalize, Finalizer, %{ result: [:steps, :process_path, :result] }) end def process_path(ctx) do cond do ctx.steps.validate.result.size > 1000 -> :thorough true -> :fast end end end Kino.nothing() ``` ```elixir defmodule DataSource1 do use Lux.Prism def handler(_input, _ctx) do {:ok, :source1} end end defmodule DataSource2 do use Lux.Prism def handler(_input, _ctx) do {:ok, :source2} end end defmodule DataSource3 do use Lux.Prism def handler(_input, _ctx) do {:ok, :source3} end end defmodule DataValidator do use Lux.Prism def handler(input, _ctx) do {:ok, Map.merge(input, %{size: 2000})} end end defmodule QuickProcessor do use Lux.Prism def handler(_input, _ctx) do {:ok, %{result: "quick result"}} end end defmodule DeepAnalysis do use Lux.Prism def handler(_input, _ctx) do {:ok, %{result: "analized result"}} end end defmodule DataEnrichment do use Lux.Prism def handler(_input, _ctx) do {:ok, %{result: "enriched result"}} end end defmodule DataVerification do use Lux.Prism def handler(_input, _ctx) do {:ok, %{result: "verified result"}} end end defmodule Finalizer do use Lux.Prism def handler(_input, _ctx) do {:ok, %{result: "finalized result"}} end end ``` <!-- livebook:{"output":true} --> ``` {:module, Finalizer, <<70, 79, 82, 49, 0, 0, 10, ...>>, {:handler, 2}} ``` ```elixir MyApp.Beams.ComplexWorkflow.run(%{}) ``` <!-- livebook:{"output":true} --> ``` {:ok, %{result: "finalized result"}, %{ input: %{}, output: %{result: "finalized result"}, status: :completed, started_at: ~U[2025-02-12 06:30:32.034005Z], steps: [ %{ error: nil, id: :data1, input: %{}, output: :source1, status: :completed, started_at: ~U[2025-02-12 06:30:32.034005Z], step_index: 0, completed_at: ~U[2025-02-12 06:30:32.034027Z] }, %{ error: nil, id: :data2, input: %{}, output: :source2, status: :completed, started_at: ~U[2025-02-12 06:30:32.034137Z], step_index: 1, completed_at: ~U[2025-02-12 06:30:32.034142Z] }, %{ error: nil, id: :data3, input: %{}, output: :source3, status: :completed, started_at: ~U[2025-02-12 06:30:32.034240Z], step_index: 2, completed_at: ~U[2025-02-12 06:30:32.034247Z] }, %{ error: nil, id: :validate, input: %{data1: :source1, data2: :source2, data3: :source3}, output: %{size: 2000, data1: :source1, data2: :source2, data3: :source3}, status: :completed, started_at: ~U[2025-02-12 06:30:32.034338Z], step_index: 3, completed_at: ~U[2025-02-12 06:30:32.034343Z] }, %{ error: nil, id: :analysis, input: %{data: %{size: 2000, data1: :source1, data2: :source2, data3: :source3}}, output: %{result: "analized result"}, status: :completed, started_at: ~U[2025-02-12 06:30:32.034426Z], step_index: 4, completed_at: ~U[2025-02-12 06:30:32.034446Z] }, %{ error: nil, id: :enrichment, input: %{data: %{size: 2000, data1: :source1, data2: :source2, data3: :source3}}, output: %{result: "enriched result"}, status: :completed, started_at: ~U[2025-02-12 06:30:32.034522Z], step_index: 5, completed_at: ~U[2025-02-12 06:30:32.034535Z] }, %{ error: nil, id: :verification, input: %{data: %{size: 2000, data1: :source1, data2: :source2, data3: :source3}}, output: %{result: "verified result"}, status: :completed, started_at: ~U[2025-02-12 06:30:32.034549Z], step_index: 6, completed_at: ~U[2025-02-12 06:30:32.034557Z] }, %{ error: nil, id: :finalize, input: %{result: nil}, output: %{result: "finalized result"}, status: :completed, started_at: ~U[2025-02-12 06:30:32.034601Z], step_index: 7, completed_at: ~U[2025-02-12 06:30:32.034607Z] } ], completed_at: ~U[2025-02-12 06:30:32.034607Z], beam_id: "b54a67b8-7da6-4e53-a90c-4363c721a2c3", started_by: "system" }} ``` ### Execution Monitoring ```elixir defmodule MyApp.Beams.MonitoredBeam do use Lux.Beam, generate_execution_log: true, monitoring: %{ metrics: [:duration, :memory, :errors], alerts: [ %{ condition: &__MODULE__.alert?/1, action: &__MODULE__.notify/1 } ] } sequence do step(:operation, MonitoredPrism, %{}, track: true) end def alert?(metrics) do metrics.duration > :timer.seconds(30) || metrics.memory > 1_000_000_000 end def notify(_metrics) do # Send alert end end Kino.nothing() ```
See source

Have you already installed Livebook?

If you already installed Livebook, you can configure the default Livebook location where you want to open notebooks.
Livebook up Checking status We can't reach this Livebook (but we saved your preference anyway)
Run notebook

Not yet? Install Livebook in just a minute

Livebook is open source, free, and ready to run anywhere.

Run on your machine

with Livebook Desktop

Run in the cloud

on select platforms

To run on Linux, Docker, embedded devices, or Elixir’s Mix, check our README.

PLATINUM SPONSORS
SPONSORS
Code navigation with go to definition of modules and functions Read More ×