Broadway es una herramienta construida sobre GenStage, concurrente de varias etapas para crear pipelines de ingesta y procesamiento de datos.
Es decir, esta biblioteca nos permite consumir y procesar información de fuentes como un Message Broker (Kafka, RabbitMQ, etc.).
Por si fuera poco, viene con una serie de características integradas que nos permiten diseñar nuestros pipelines con una gran flexibilidad:
- Back-pressure
- Concurrencia y procesamiento por lotes
- Rate limiting
- Varios producers out of the box
- La posibilidad de crear custom producers
- Entre otras.
Creando un custom producer
Si bien consumir información desde un message broker es, por decirlo de alguna manera, la opción por defecto de Broadway, esto no significa que esté limitado a estos como fuente de información.
Como ya se mencionó, nos permite crear y conectar nuestros propios producers y estos pueden consumir información de cualquier fuente que queramos. Una base de datos, por ejemplo.
Lo primero es crear un módulo que utilice el behaviour de Broadway:
defmodule MyBroadway do
use Broadway
require Logger
@chunk_size 10
def start_link(_opts) do
options = [
name: __MODULE__,
producer: [
module: {DBDummyProducer, []},
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
processors: [
default: [
max_demand: @chunk_size,
concurrency: 1
]
]
]
Broadway.start_link(__MODULE__, options)
end
end
En las opciones le estamos diciendo a la biblioteca que nuestro producer es el módulo DBDummyProducer
, ya que vamos a “realizar” consultas a la base de datos, usamos la opción max_demand
para limitar el número de registros que vamos a cargar en memoria. Vamos a utilizar este valor en DBDummyProducer
.
Además, le estamos diciendo que vamos a utilizar como transformer la función transform
en el mismo módulo MyBroadway
. Ya que los datos llegaran como maps o structs y deben incluirse en la estructura Broadway.Message.t()
.
def transform(event, _opts) do
Logger.debug("[MyBroadway] transforming event: #{inspect(event)}")
%Message{
data: event,
acknowledger: {__MODULE__, :trackings, :ack_data}
}
end
Otra función que debemos implementar es ack/3
. Esta función sirve para notificar que los mensajes recibidos se procesaron correctamente o fallaron.
def ack(:ack_id, successful, failed) do
Logger.debug(
"[MyBroadway] ack successful: #{Enum.count(successful)}, failed: #{Enum.count(failed)}"
)
:ok
end
Ahora creamos nuestro módulo DBDummyProducer
, para efectos de simplificación vamos a generar datos en memoria, solo habría que sustituir esto por la llamada a nuestro contexto o módulo que genere la consulta.
defmodule DBDummyProducer do
@moduledoc """
This module emulates DBProducer behavior producing in memory rows
"""
use GenStage
require Logger
@impl GenStage
def init(opts) do
{:producer, opts}
end
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
@impl true
@spec handle_demand(integer, any) :: {:noreply, list, any}
def handle_demand(demand, state) do
Logger.debug(fn ->
"[DBProducer] handling demand: #{demand}"
end)
# events = Users.list(limit: demand)
events =
Enum.map(1..demand, fn _number ->
%{
id: Enum.random(1..9_999_999),
}
end)
{:noreply, events, state}
end
end
Ahora nuestro pipeline puede recibir datos desde nuestro producer. Seguramente necesitamos procesar esta información de alguna manera. Para esto tenemos dos callback handle_message/3
para procesar mensajes individuales y handle_batch/4
para procesar lotes.
def handle_message(_processor, message, _context) do
Logger.debug(fn -> "[MyBroadway] Incoming Message: #{inspect(message)}" end)
# perform business logic
message
end
def handle_batch(_batcher, messages, _batch_info, _context) do
Logger.debug("[MyBroadway] in default batcher")
Logger.debug(fn -> "[MyBroadway] size: #{length(messages)}" end)
messages
end
Sí queremos hacer uso de handle_batch/4
debemos hacer dos cosas:
- Agregar la llave
batchers
a nuestrasoptions
- Al transformar el mensaje debemos especificar en cual
batcher
se va a procesar
def start_link(_opts) do
options = [
# ... previous options
batchers: [default: []]
]
Broadway.start_link(__MODULE__, options)
end
def transform(event, _state) do
Logger.debug("[MyBroadway] transforming event: #{inspect(event)}")
%Broadway.Message{
data: event,
acknowledger: {__MODULE__, :trackings, :ack_data}
}
|> Broadway.Message.put_batcher(:default)
end
Por último, solo resta agregar MyBroadway
al supervisor, normalmente en el archivo application.ex
children = [
{MyBroadway, []}
]
Supervisor.start_link(children, strategy: :one_for_one)
Si intentamos probar el comportamiento del código, no seremos capaces de apreciar los eventos que están sucediendo. Para configurar un poco este comportamiento podemos usar la configuración de rate_limiting en las opciones del producer:
def start_link(_opts) do
options = [
# ... previous options
producer: [
module: {DBDummyProducer, []},
transformer: {__MODULE__, :transform, []},
rate_limiting: [
interval: 10_000,
allowed_messages: 10
],
concurrency: 1
],
# .. next options
]
Broadway.start_link(__MODULE__, options)
end
Puedes ver el resultado final en el repositorio broadway-custom-producer. Pero nuestro código queda de la siguiente manera:
defmodule MyBroadway do
use Broadway
require Logger
@chunk_size 10
@doc """
Uses rate_limiting options to be able to see log messages in the console
"""
def start_link(_opts) do
options = [
name: __MODULE__,
producer: [
module: {DBDummyProducer, []},
transformer: {__MODULE__, :transform, []},
rate_limiting: [
interval: 10_000,
allowed_messages: 10
],
concurrency: 1
],
processors: [
default: [
max_demand: @chunk_size,
concurrency: 1
]
],
batchers: [default: []]
]
Broadway.start_link(__MODULE__, options)
end
def transform(event, _state) do
Logger.debug("[MyBroadway] transforming event: #{inspect(event)}")
%Broadway.Message{
data: event,
acknowledger: {__MODULE__, :trackings, :ack_data}
}
|> Broadway.Message.put_batcher(:default)
end
def ack(:trackings, successful, failed) do
Logger.debug(
"[MyBroadway] ack successful: #{length(successful)}, failed: #{length(failed)}"
)
:ok
end
def handle_message(_processor, message, _context) do
Logger.debug(fn -> "[MyBroadway] Incoming Message: #{inspect(message)}" end)
message
end
def handle_batch(_batcher, messages, _batch_info, _context) do
Logger.debug("[MyBroadway] in default batcher")
Logger.debug(fn -> "[MyBroadway] size: #{length(messages)}" end)
messages
end
end
defmodule DBDummyProducer do
@moduledoc """
This module emulates database behavior by producing in-memory rows
"""
use GenStage
require Logger
@impl GenStage
def init(opts) do
{:producer, opts}
end
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
@impl true
def handle_demand(demand, state) do
Logger.debug(fn ->
"[DBDummyProducer] handling demand: #{demand}"
end)
# events = Users.list_users(limit: demand)
events =
Enum.map(1..demand, fn _number ->
%{
id: Enum.random(1..9_999_999_999)
}
end)
{:noreply, events, state}
end
end
Ahora, si ingresamos a la consola interactiva de nuestra aplicación, veremos algo como los siguientes mensajes que mostramos mediante el Logger
:
10:10:39.341 [debug] [DBDummyProducer] handling demand: 10
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 2730249896}
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 8277765695}
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 917212511}
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 2264493728}
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 4033891075}
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 2065156038}
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 4625753942}
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 2587137776}
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 6481119527}
10:10:39.349 [debug] [MyBroadway] transforming event: %{id: 6661902836}
10:10:39.353 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 2730249896}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:39.353 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 8277765695}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:39.353 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 917212511}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:39.353 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 2264493728}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:39.353 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 4033891075}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:39.354 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 2065156038}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:39.354 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 4625753942}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:39.354 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 2587137776}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:39.354 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 6481119527}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:39.354 [debug] [MyBroadway] Incoming Message: %Broadway.Message{data: %{id: 6661902836}, metadata: %{}, acknowledger: {MyBroadway, :trackings, :ack_data}, batcher: :default, batch_key: :default, batch_mode: :bulk, status: :ok}
10:10:40.355 [debug] [MyBroadway] in default batcher
10:10:40.355 [debug] [MyBroadway] size: 10
10:10:40.355 [debug] [MyBroadway] ack successful: 10, failed: 0
Como pudimos ver, Broadway es una opción muy poderosa para consumir y procesar información, podemos diseñar nuestros pipelines incluyendo distintas fases en el proceso y contamos con múltiples opciones que nos permiten adecuar el comportamiento de acuerdo a nuestro caso de uso.
En este post apenas abarcamos lo básico, algunos recursos muy útiles para consultar: