Skip to content

Commit 0b2f7c0

Browse files
committed
Update cli
1 parent be752f9 commit 0b2f7c0

5 files changed

Lines changed: 222 additions & 84 deletions

File tree

libby/cli/libby_cli.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
from __future__ import annotations
2+
3+
import argparse, json, os, signal, sys, time
4+
from typing import Any, Dict, List, Optional
5+
from libby.libby import Libby
6+
7+
DEFAULT_SELF_ID = "cli"
8+
DEFAULT_BIND = "tcp://127.0.0.1:56001"
9+
10+
def _parse_addr_kv(kv: str) -> tuple[str, str]:
11+
if "=" not in kv:
12+
raise argparse.ArgumentTypeError("Expected 'peerId=tcp://host:port'")
13+
k, v = kv.split("=", 1)
14+
k, v = k.strip(), v.strip()
15+
if not k or not v:
16+
raise argparse.ArgumentTypeError("Expected 'peerId=tcp://host:port'")
17+
return k, v
18+
19+
def _load_book(path: Optional[str]) -> Dict[str, str]:
20+
if not path:
21+
return {}
22+
with open(path, "r", encoding="utf-8") as f:
23+
data = json.load(f)
24+
if not isinstance(data, dict):
25+
raise SystemExit("--book JSON must be an object mapping peer->endpoint")
26+
return {str(k): str(v) for k, v in data.items()}
27+
28+
def _parse_json(s: Optional[str]) -> Dict[str, Any]:
29+
if not s:
30+
return {}
31+
try:
32+
return json.loads(s)
33+
except Exception as ex:
34+
raise SystemExit(f"--data must be JSON: {ex}")
35+
36+
def _self_id(ns: argparse.Namespace) -> str:
37+
return ns.self_id or os.environ.get("LIBBY_SELF_ID", DEFAULT_SELF_ID)
38+
39+
def _bind(ns: argparse.Namespace) -> str:
40+
return ns.bind or os.environ.get("LIBBY_BIND", DEFAULT_BIND)
41+
42+
def _mk_libby(self_id: str, bind: str, book: Dict[str, str]) -> Libby:
43+
lib = Libby.zmq(
44+
self_id=self_id,
45+
bind=bind,
46+
address_book=book,
47+
keys=[],
48+
callback=None,
49+
discover=True,
50+
discover_interval_s=2.0,
51+
hello_on_start=True,
52+
)
53+
try:
54+
lib.hello()
55+
except Exception:
56+
pass
57+
return lib
58+
59+
def cmd_req(ns: argparse.Namespace) -> int:
60+
book = _load_book(ns.book)
61+
for kv in ns.addr or []:
62+
k, v = _parse_addr_kv(kv); book[k] = v
63+
64+
self_id = _self_id(ns)
65+
bind = _bind(ns)
66+
payload = _parse_json(ns.data)
67+
68+
lib: Optional[Libby] = None
69+
try:
70+
lib = _mk_libby(self_id, bind, book)
71+
ttl_ms = int(ns.ttl_ms) if ns.ttl_ms is not None else int(ns.timeout * 1000.0)
72+
res = lib.rpc(ns.peer, ns.key, payload, ttl_ms=ttl_ms)
73+
print(json.dumps(res, indent=2 if ns.raw_json else 2))
74+
return 0 if res.get("status") == "delivered" else 2
75+
except KeyboardInterrupt:
76+
return 130
77+
except Exception as ex:
78+
print(f"libby-cli req: {ex}", file=sys.stderr)
79+
return 2
80+
finally:
81+
if lib:
82+
try: lib.stop()
83+
except Exception: pass
84+
85+
def cmd_sub(ns: argparse.Namespace) -> int:
86+
topics: List[str] = ns.topics
87+
if not topics:
88+
print("sub: provide at least one topic", file=sys.stderr)
89+
return 2
90+
91+
book = _load_book(ns.book)
92+
for kv in ns.addr or []:
93+
k, v = _parse_addr_kv(kv); book[k] = v
94+
95+
self_id = _self_id(ns)
96+
bind = _bind(ns)
97+
98+
lib: Optional[Libby] = None
99+
stop = False
100+
101+
def on_sig(_s, _f):
102+
nonlocal stop; stop = True
103+
104+
signal.signal(signal.SIGINT, on_sig)
105+
signal.signal(signal.SIGTERM, on_sig)
106+
107+
try:
108+
lib = _mk_libby(self_id, bind, book)
109+
110+
def _printer(msg):
111+
try:
112+
print(json.dumps(
113+
{"source": msg.env.sourceid, "topic": msg.env.key, "payload": msg.env.payload},
114+
indent=2 if ns.raw_json else 2,
115+
))
116+
except Exception as ex:
117+
print(f"[event decode error] {ex}", file=sys.stderr)
118+
119+
for t in topics:
120+
lib.listen(t, _printer)
121+
lib.subscribe(*topics)
122+
123+
print(f"[libby sub] up: id={self_id} bind={bind} topics={topics}")
124+
while not stop:
125+
time.sleep(0.25)
126+
return 0
127+
except KeyboardInterrupt:
128+
return 130
129+
except Exception as ex:
130+
print(f"libby-cli sub: {ex}", file=sys.stderr)
131+
return 2
132+
finally:
133+
if lib:
134+
try: lib.stop()
135+
except Exception: pass
136+
print("[libby sub] stopped")
137+
138+
def build_parser() -> argparse.ArgumentParser:
139+
ap = argparse.ArgumentParser(
140+
prog="libby-cli",
141+
description="Simple Libby CLI: request a key or subscribe to topics."
142+
)
143+
sub = ap.add_subparsers(dest="cmd", required=True)
144+
145+
def common(p):
146+
p.add_argument("--self-id", help=f"Local peer id (default: {DEFAULT_SELF_ID} or $LIBBY_SELF_ID)")
147+
p.add_argument("--bind", help=f"Local ROUTER bind (default: {DEFAULT_BIND} or $LIBBY_BIND)")
148+
p.add_argument("--book", help="Path to JSON {peer_id:'tcp://host:port'}")
149+
p.add_argument("--addr", action="append", metavar="peer=tcp://host:port",
150+
help="Add/override address-book entry (repeatable)")
151+
p.add_argument("--raw-json", action="store_true", help="Pretty-print JSON")
152+
153+
pr = sub.add_parser("req", help="Send a keyed request (RPC) to a peer and print the response")
154+
common(pr)
155+
pr.add_argument("-p", "--peer", required=True, help="Destination peer id")
156+
pr.add_argument("-k", "--key", required=True, help="Key to request (service name)")
157+
pr.add_argument("-d", "--data", help="JSON payload to send (default: {})")
158+
pr.add_argument("--timeout", type=float, default=8.0, help="Timeout seconds (default 8.0)")
159+
pr.add_argument("--ttl-ms", type=int, help="Override TTL ms (default: timeout*1000)")
160+
pr.set_defaults(func=cmd_req)
161+
162+
ps = sub.add_parser("sub", help="Subscribe to one or more topics and print publishes")
163+
common(ps)
164+
ps.add_argument("topics", nargs="+", help="Topic(s) to subscribe to")
165+
ps.set_defaults(func=cmd_sub)
166+
167+
return ap
168+
169+
def main(argv: Optional[List[str]] = None) -> int:
170+
ns = build_parser().parse_args(argv)
171+
return ns.func(ns)
172+
173+
if __name__ == "__main__":
174+
raise SystemExit(main())

libby/cli/lshow.py

Lines changed: 0 additions & 69 deletions
This file was deleted.

libby/zmq_transport.py

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def __init__(self, bind_router: str, address_book: Dict[str, str], my_id: str):
2424
self._router = self._ctx.socket(zmq.ROUTER)
2525
self._router.setsockopt(zmq.LINGER, 0)
2626
self._router.bind(bind_router)
27+
self._router_id_by_peer: Dict[str, bytes] = {}
2728

2829
self._dealers: Dict[str, zmq.Socket] = {}
2930
self._book: Dict[str, str] = dict(address_book)
@@ -72,17 +73,41 @@ def stop(self) -> None:
7273
def on_receive(self, cb: Callable[[SrcStr, bytes], None]) -> None:
7374
self._cb = cb
7475

76+
def reply_to(self, peer_id: str, frame: bytes) -> bool:
77+
"""
78+
Try to send directly back to the peer using the ROUTER routing-id
79+
we observed on the incoming request. Returns True if used, False otherwise.
80+
"""
81+
rid = self._router_id_by_peer.get(peer_id)
82+
if rid is None:
83+
return False
84+
with self._send_lock:
85+
self._router.send_multipart([rid, frame])
86+
return True
87+
7588
def send(self, dest: DestStr, frame: bytes) -> None:
7689
"""
7790
dest:
78-
- "peer:<peer_id>" -> send direct to that peer via DEALER
79-
- "broadcast:*" -> send to all known peers (fire-and-forget)
91+
- "peer:<peer_id>" or "<peer_id>" -> direct to that peer
92+
- "broadcast:*" -> to all known peers
8093
"""
81-
if dest.startswith("peer:"):
82-
peer_id = dest.split(":", 1)[1]
83-
endpoint = self._book.get(peer_id)
84-
if not endpoint:
85-
return # unknown peer_id; drop silently to match bamboo no-NACK
94+
# 1) broadcast
95+
if dest.startswith("broadcast:"):
96+
with self._send_lock:
97+
for _peer_id, _endpoint in self._book.items():
98+
dealer = self._dealers.get(_peer_id)
99+
if dealer is None:
100+
dealer = self._new_dealer(_peer_id, _endpoint)
101+
self._dealers[_peer_id] = dealer
102+
dealer.send(frame)
103+
return
104+
105+
# 2) normalize to plain id
106+
peer_id = dest.split(":", 1)[1] if dest.startswith("peer:") else dest
107+
108+
# 3) preferred path: DEALER via address_book
109+
endpoint = self._book.get(peer_id)
110+
if endpoint:
86111
dealer = self._dealers.get(peer_id)
87112
if dealer is None:
88113
dealer = self._new_dealer(peer_id, endpoint)
@@ -91,14 +116,18 @@ def send(self, dest: DestStr, frame: bytes) -> None:
91116
dealer.send(frame)
92117
return
93118

94-
if dest.startswith("broadcast:"):
95-
with self._send_lock:
96-
for peer_id, endpoint in self._book.items():
97-
dealer = self._dealers.get(peer_id)
98-
if dealer is None:
99-
dealer = self._new_dealer(peer_id, endpoint)
100-
self._dealers[peer_id] = dealer
101-
dealer.send(frame)
119+
# 4) fallback: reply via ROUTER if we cached this peer's routing-id
120+
rid_map = getattr(self, "_router_id_by_peer", None)
121+
if rid_map is not None:
122+
rid = rid_map.get(peer_id)
123+
if rid is not None:
124+
with self._send_lock:
125+
self._router.send_multipart([rid, frame])
126+
return
127+
128+
# 5) unknown route -> drop silently (matches bamboo no-NACK semantics)
129+
return
130+
102131

103132
def add_peer(self, peer_id: str, endpoint: str) -> None:
104133
"""Dynamically add or update an endpoint for a peer."""
@@ -145,6 +174,7 @@ def _rx_loop(self) -> None:
145174
except Exception:
146175
src_peer = "unknown"
147176

177+
self._router_id_by_peer[src_peer] = ident
148178
if self._cb:
149179
# Pass the *remote* peer id as the source
150180
self._cb(f"peer:{src_peer}", payload)

peers/peer_a.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class PeerA(LibbyDaemon):
99
address_book = {
1010
"peer-B": "tcp://127.0.0.1:5556",
1111
"peer-C": "tcp://127.0.0.1:5557",
12+
"cli": "tcp://127.0.0.1:56001",
1213
}
1314

1415
# Transport selection: "zmq" (default) or "rabbitmq"

peers/peer_b.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ class PeerB(LibbyDaemon):
2525
address_book = {
2626
"peer-A": "tcp://127.0.0.1:5555",
2727
"peer-C": "tcp://127.0.0.1:5557",
28+
"peer-D": "tcp://127.0.0.1:5558",
29+
"cli": "tcp://127.0.0.1:56001",
2830
}
2931

3032
# Transport selection: "zmq" (default) or "rabbitmq"

0 commit comments

Comments
 (0)