-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathrpc_example.py
More file actions
87 lines (63 loc) · 2.63 KB
/
rpc_example.py
File metadata and controls
87 lines (63 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#!/usr/bin/env python3
"""SlimeRPC loopback test — two PeerAgents in one process.

Prerequisites:
    1. Start NanoCtrl: cd NanoCtrl && cargo run --release
    2. Redis must be reachable (NanoCtrl returns its address automatically).

Usage:
    python rpc_example.py                          # default NanoCtrl at http://127.0.0.1:3000
    python rpc_example.py --ctrl http://host:3000  # explicit NanoCtrl URL
"""
import argparse
import threading
from dlslime import PeerAgent
from dlslime.rpc import method, proxy, serve
# ═══ Service definition ═════════════════════════════
class CalcService:
    """Small example service; every method is marked with ``@method``."""

    @method
    def add(self, a: int, b: int) -> int:
        """Return ``a + b``."""
        total = a + b
        return total

    @method
    def mul(self, a: float, b: float) -> float:
        """Return ``a * b``."""
        product = a * b
        return product

    @method
    def echo(self, msg: str) -> str:
        """Return ``msg`` prefixed with ``"echo: "``."""
        reply = f"echo: {msg}"
        return reply
# ═══ Loopback test ══════════════════════════════════
def main(ctrl_url: str):
    """Run the SlimeRPC loopback test using the NanoCtrl at ``ctrl_url``.

    Creates two ``PeerAgent`` instances in this process (aliases
    ``"worker:0"`` and ``"driver:0"``), connects each to the other, runs
    ``serve`` for a ``CalcService`` on the worker in a daemon thread, and
    checks a few calls made through a driver-side ``proxy``.

    Both agents are shut down on exit, whether the test passes, an
    assertion fails, or setup (agent creation / connect / wait) raises.

    Args:
        ctrl_url: NanoCtrl URL handed to both agents.

    Raises:
        AssertionError: If an RPC result differs from the expected value.
    """
    worker = PeerAgent(nanoctrl_url=ctrl_url, alias="worker:0")
    driver = None
    try:
        # Setup lives inside the try so that a failure while creating the
        # driver, connecting, or waiting still reaches the shutdown calls.

        # --- driver agent ---
        driver = PeerAgent(nanoctrl_url=ctrl_url, alias="driver:0")

        driver_conn = driver.connect_to("worker:0", ib_port=1, qp_num=1)
        worker_conn = worker.connect_to("driver:0", ib_port=1, qp_num=1)

        # wait for both sides to connect
        driver_conn.wait()
        worker_conn.wait()
        print("Connected.")

        # serve() blocks — run it in a daemon thread
        t = threading.Thread(
            target=serve,
            args=(worker, CalcService(), "driver:0"),
            daemon=True,
        )
        t.start()

        w = proxy(driver, "worker:0", CalcService)

        # ── synchronous ──────────────────────────────────
        assert w.add(1, 2).wait() == 3
        print("add(1, 2) = 3 ✓")

        assert abs(w.mul(3.14, 2.0).wait() - 6.28) < 1e-6
        print("mul(3.14, 2.0) = 6.28 ✓")

        assert w.echo("hello").wait() == "echo: hello"
        print("echo('hello') ✓")

        # ── batch (sequential request-reply) ─────────────
        results = [w.add(i, i * 10).wait() for i in range(5)]
        assert results == [0, 11, 22, 33, 44]
        print(f"batch add = {results} ✓")

        print("\nAll tests passed!")
    finally:
        # Same order as before (worker, then driver); the inner finally makes
        # sure an error from the first shutdown does not skip the second.
        try:
            worker.shutdown()
        finally:
            if driver is not None:
                driver.shutdown()
if __name__ == "__main__":
    # Command-line entry point: one optional flag selecting the NanoCtrl URL.
    cli = argparse.ArgumentParser(description="SlimeRPC loopback test")
    cli.add_argument(
        "--ctrl",
        default="http://127.0.0.1:3000",
        help="NanoCtrl URL",
    )
    options = cli.parse_args()
    main(options.ctrl)