Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/simple_echo/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ defmodule SimpleEcho.Application do
# Starts a worker by calling: SimpleEcho.Worker.start_link(arg)
# {SimpleEcho.Worker, arg}
{Plug.Cowboy, plug: SimpleEcho.MyPlug, scheme: :http, port: 4444},
{SimpleEcho.ZenohexPingPong, []}
{SimpleEcho.ZenohexPingPong, []},
{SimpleEcho.MqttPingPong, []}
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
34 changes: 34 additions & 0 deletions lib/simple_echo/mqtt_pingpong.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule SimpleEcho.MqttPingPong do
use GenServer

@ping_topic "demo/mcam/ping/"
@pong_topic "demo/mcam/pong/"

@client_id Mecho

def start_link(_args) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_args) do
# Set topic names
default_server_type = System.get_env("SERVER_TYPE", "cloud")
ping_topic = @ping_topic <> default_server_type
pong_topic = @pong_topic <> default_server_type

{:ok, pid} =
Tortoise311.Connection.start_link(
client_id: @client_id,
server: {Tortoise311.Transport.Tcp, host: "localhost", port: 1883},
handler:
{SimpleEcho.MqttPingPong.Handler,
[ping_topic: ping_topic, pong_topic: pong_topic, client_id: @client_id]},
subscriptions: [
{ping_topic, 0}
]
)

state = %{pid: pid}
{:ok, state}
end
end
73 changes: 73 additions & 0 deletions lib/simple_echo/mqtt_pingpong/handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
defmodule SimpleEcho.MqttPingPong.Handler do
use Tortoise311.Handler

require Logger

def init(args) do
ping_topic = Keyword.fetch!(args, :ping_topic)
pong_topic = Keyword.fetch!(args, :pong_topic)
client_id = Keyword.fetch!(args, :client_id)
Logger.info("Initializing handler")

{:ok,
%{
ping_topic: ping_topic,
pong_topic: pong_topic,
client_id: client_id
}}
end

def connection(:up, state) do
Logger.info("Connection has been established")
{:ok, state}
end

def connection(:down, state) do
Logger.warning("Connection has been dropped")
{:ok, state}
end

def connection(:terminating, state) do
Logger.warning("Connection is terminating")
{:ok, state}
end

def subscription(:up, topic, state) do
Logger.info("Subscribed to #{topic}")
{:ok, state}
end

def subscription({:warn, [requested: req, accepted: qos]}, topic, state) do
Logger.warning("Subscribed to #{topic}; requested #{req} but got accepted with QoS #{qos}")
{:ok, state}
end

def subscription({:error, reason}, topic, state) do
Logger.error("Error subscribing to #{topic}; #{inspect(reason)}")
{:ok, state}
end

def subscription(:down, topic, state) do
Logger.info("Unsubscribed from #{topic}")
{:ok, state}
end

def handle_message(topic, publish, state) do
ping_topic = state.ping_topic
pong_topic = state.pong_topic
client_id = state.client_id

topic_path = Enum.join(topic, "/")

case topic_path do
^ping_topic ->
Tortoise311.publish(client_id, pong_topic, publish, qos: 0)
Logger.debug(inspect(publish))

_ ->
Logger.warning("Subscribed to wrong #{topic_path}")
end

{:ok, state}
end
end
1 change: 1 addition & 0 deletions lib/simple_echo/my_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule SimpleEcho.MyPlug do

defp handle_request({:ok, body, _conn}, conn) do
Logger.debug(inspect(body))

conn
|> put_resp_content_type("application/json")
|> send_resp(200, body)
Expand Down
11 changes: 7 additions & 4 deletions lib/simple_echo/zenohex_pingpong.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@ defmodule SimpleEcho.ZenohexPingPong do
use GenServer
require Logger

@default_ping_key "demo/zcam/ping"
@default_pong_key "demo/zcam/pong"
@ping_key "demo/zcam/ping/"
@pong_key "demo/zcam/pong/"

def start_link(_args) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_args) do
# Set key names
default_server_type = System.get_env("SERVER_TYPE", "cloud")
ping_key = @ping_key <> default_server_type
pong_key = @pong_key <> default_server_type

# Open Zenoh session and declare subscriber
{:ok, session} = Zenohex.open()
ping_key = System.get_env("PING_KEY", @default_ping_key)
{:ok, subscriber} = Zenohex.Session.declare_subscriber(session, ping_key)

# Declare publisher with created Zenoh session
pong_key = System.get_env("PONG_KEY", @default_pong_key)
{:ok, publisher} = Zenohex.Session.declare_publisher(session, pong_key)

state = %{subscriber: subscriber, publisher: publisher}
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ defmodule SimpleEcho.MixProject do
[
{:plug_cowboy, "~> 2.0"},
{:jason, "~> 1.4"},
{:zenohex, "~> 0.3.0"}
{:zenohex, "~> 0.3.0"},
{:tortoise311, "~> 0.11"}
]
end

Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"},
"gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"plug": {:hex, :plug, "1.16.0", "1d07d50cb9bb05097fdf187b31cf087c7297aafc3fed8299aac79c128a707e47", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cbf53aa1f5c4d758a7559c0bd6d59e286c2be0c6a1fac8cc3eee2f638243b93e"},
Expand All @@ -13,5 +14,6 @@
"rustler_precompiled": {:hex, :rustler_precompiled, "0.7.1", "ecadf02cc59a0eccbaed6c1937303a5827fbcf60010c541595e6d3747d3d0f9f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "b9e4657b99a1483ea31502e1d58c464bedebe9028808eda45c3a429af4550c66"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
"tortoise311": {:hex, :tortoise311, "0.11.7", "29e78e6ffa153a2c0728344277e587342cf7035ee62a38dc5656d970007920f0", [:mix], [{:gen_state_machine, "~> 2.0 or ~> 3.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "0ef7cf940b50ab47c433323294c9a3bb2883a03e0c7cb37de3e860c0b9ba5bef"},
"zenohex": {:hex, :zenohex, "0.3.0", "373dbda06f86e0244611261db180212220eefd990f40c846863621c1f6fbdb7f", [:mix], [{:rustler, ">= 0.32.1", [hex: :rustler, repo: "hexpm", optional: true]}, {:rustler_precompiled, "~> 0.7.1", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}, {:toml, "~> 0.7", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "877328532c37f356fd3499239d3af3fb8c6fafc2ef28733295551a6ee2759b79"},
}
2 changes: 2 additions & 0 deletions mosquitto.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
allow_anonymous true
listener 1883 0.0.0.0