diff --git a/lib/broadway.ex b/lib/broadway.ex index 9f33d83..4b90314 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -714,7 +714,8 @@ defmodule Broadway do name: atom, index: non_neg_integer, message: Broadway.Message.t, - telemetry_span_context: reference + telemetry_span_context: reference, + producer: {atom, list} } ``` @@ -732,7 +733,8 @@ defmodule Broadway do name: atom, index: non_neg_integer, message: Broadway.Message.t, - telemetry_span_context: reference + telemetry_span_context: reference, + producer: {atom, list} } ``` @@ -753,7 +755,8 @@ defmodule Broadway do kind: kind, reason: reason, stacktrace: stacktrace, - telemetry_span_context: reference + telemetry_span_context: reference, + producer: {atom, list} } ``` diff --git a/lib/broadway/topology/processor_stage.ex b/lib/broadway/topology/processor_stage.ex index 5cdc862..43331df 100644 --- a/lib/broadway/topology/processor_stage.ex +++ b/lib/broadway/topology/processor_stage.ex @@ -164,7 +164,8 @@ defmodule Broadway.Topology.ProcessorStage do index: state.partition, name: state.name, message: message, - context: state.context + context: state.context, + producer: state.producer }, fn -> updated_message = @@ -179,7 +180,8 @@ defmodule Broadway.Topology.ProcessorStage do index: state.partition, name: state.name, message: updated_message, - context: state.context + context: state.context, + producer: state.producer }} end ) diff --git a/test/broadway_test.exs b/test/broadway_test.exs index cd33488..2429a0d 100644 --- a/test/broadway_test.exs +++ b/test/broadway_test.exs @@ -946,10 +946,10 @@ defmodule BroadwayTest do assert_receive {:telemetry_event, [:broadway, :processor, :start], %{system_time: _}, %{}} assert_receive {:telemetry_event, [:broadway, :processor, :message, :start], - %{system_time: _}, %{}} + %{system_time: _}, %{producer: {ManualProducer, []}}} assert_receive {:telemetry_event, [:broadway, :processor, :message, :stop], %{duration: _}, - %{}} + %{producer: {ManualProducer, []}}} assert_receive {:telemetry_event, [:broadway, :processor, :stop], %{duration: _}, metadata} assert [] = metadata.failed_messages @@ -1022,7 +1022,7 @@ defmodule BroadwayTest do %{system_time: _}, %{}} assert_receive {:telemetry_event, [:broadway, :processor, :message, :exception], - %{duration: _}, %{}} + %{duration: _}, %{producer: {ManualProducer, []}}} assert_receive {:telemetry_event, [:broadway, :processor, :stop], %{duration: _}, %{}} assert_receive {:ack, ^ref, [], [%{status: {:error, _, _}}]}