wasmCloud host heartbeat(心跳)事件处理

host_core 定时发布(pub)心跳

每隔 30 秒,`host_core/lib/host_core/heartbeat_emitter.ex` 中的 HeartbeatEmitter 进程会向主题 `wasmbus.evt.{lattice_prefix}` 发布(pub)一条消息,消息内容是符合 CloudEvents 规范的事件体,构造代码如下:

# Heartbeat payload assembled by the emitter and wrapped into a CloudEvent.
%{
actors: actors, # all actors supervised on this host, i.e. the children of the actor supervisor
providers: providers, # likewise: the children of the provider supervisor
labels: HostCore.Host.host_labels(),
friendly_name: HostCore.Host.friendly_name(),
version: Application.spec(:host_core, :vsn) |> to_string(),
uptime_seconds: ut_seconds,
uptime_human: ut_human
}
|> CloudEvent.new("host_heartbeat", state[:host_key]) # event type host_heartbeat; the NATS capture shows it as "com.wasmcloud.lattice.host_heartbeat"

抓取 NATS 的调试日志,可以看到实际发送的消息(JSON)为:

{
"data": {
"actors": [],
"friendly_name": "wandering-moon-6866",
"labels": {
"hostcore.arch": "x86_64",
"hostcore.os": "macos",
"hostcore.osfamily": "unix"
},
"providers": [],
"uptime_human": "1 hour, 27 minutes, 33 seconds",
"uptime_seconds": 5253,
"version": "0.56.0"
},
"datacontenttype": "application/json",
"id": "fc7b1193-1e79-4ec1-9bd5-26c57feea037",
"source": "NCSWT2MNKTGQC7FZE3NEW5LN44PIRUJUK7ZIN2LNNN4DIUJVTFGLOI7I",
"specversion": "1.0",
"time": "2022-09-30T07:37:36.766320Z",
"type": "com.wasmcloud.lattice.host_heartbeat"
}

在添加了 actor、provider 和 link 之后,心跳消息变为:

{
"data": {
"actors": [
{
"instance_id": "3268bc07-bfe2-47f5-b5a7-06626555d953",
"public_key": "MBCFOPM6JW2APJLXJD3Z5O4CN7CPYJ2B4FTKLJUR5YR5MITIU7HD3WD5"
},
{
"instance_id": "e8cceadb-b8f7-466f-9373-2060b28d8394",
"public_key": "MBCFOPM6JW2APJLXJD3Z5O4CN7CPYJ2B4FTKLJUR5YR5MITIU7HD3WD5"
}
],
"friendly_name": "wandering-moon-6866",
"labels": {
"hostcore.arch": "x86_64",
"hostcore.os": "macos",
"hostcore.osfamily": "unix"
},
"providers": [
{
"contract_id": "wasmcloud:httpserver",
"instance_id": "afb01c6a-b716-4608-bf33-7742654136d5",
"link_name": "default",
"public_key": "VAG3QITQQ2ODAOWB5TTQSDJ53XK3SHBEIFNK4AYJ5RKAX2UNSCAPHA5M"
},
{
"contract_id": "wasmcloud:keyvalue",
"instance_id": "ba82fa4b-b7c5-4a30-9e09-e3c904e2df33",
"link_name": "default",
"public_key": "VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB"
}
],
"uptime_human": "1 hour, 41 minutes, 3 seconds",
"uptime_seconds": 6063,
"version": "0.56.0"
},
"datacontenttype": "application/json",
"id": "35173a21-32d2-4a41-af7a-aa20f5ef6d8f",
"source": "NCSWT2MNKTGQC7FZE3NEW5LN44PIRUJUK7ZIN2LNNN4DIUJVTFGLOI7I",
"specversion": "1.0",
"time": "2022-09-30T07:50:58.990859Z",
"type": "com.wasmcloud.lattice.host_heartbeat"
}

代码实现细节

启动 heartbeat emitter 进程

`host_core/lib/host_core.ex` 在 `start` 时会启动相关的子进程,其中包括 HeartbeatEmitter(以下只截取了 child spec 的开头部分):

Supervisor.child_spec(
{HostCore.HeartbeatEmitter, config}, # 这里

host_core/lib/host_core/heartbeat_emitter.ex

@impl true
def init(opts) do
  # Schedule a recurring :publish_heartbeat message every 30 seconds...
  :timer.send_interval(@thirty_seconds, self(), :publish_heartbeat)

  # ...and enqueue one immediately, so the first heartbeat does not have to
  # wait for the first interval tick. Sending to our own local pid needs no
  # distribution options, so plain send/2 is enough.
  send(self(), :publish_heartbeat)

  {:ok, opts}
end

进程收到 `:publish_heartbeat` 消息后的处理:

@impl true
def handle_info(:publish_heartbeat, state) do
  # Fired by the interval timer (and the initial self-send in init/1):
  # publish one heartbeat built from the current state, leaving state as is.
  _ = publish_heartbeat(state)
  {:noreply, state}
end

实际执行的函数如下(`safe_pub` 定义在 `HostCore.Nats` 模块中):

defp publish_heartbeat(state) do
  # Heartbeats are published on the lattice-wide event topic over the
  # control connection.
  HostCore.Nats.safe_pub(
    :control_nats,
    "wasmbus.evt.#{state[:lattice_prefix]}",
    generate_heartbeat(state)
  )
end

def safe_pub(process_name, topic, msg) do
  case Process.whereis(process_name) do
    nil ->
      # No process is registered under this connection name: skip the
      # publish and log a warning instead.
      Logger.warn("Publication on #{topic} aborted - connection #{process_name} is down",
        nats_topic: topic
      )

    _pid ->
      # Carry the current OpenTelemetry trace context along as NATS headers.
      headers = :otel_propagator_text_map.inject([])
      Gnat.pub(process_name, topic, msg, headers: headers)
  end
end

订阅事件(sub event)

wasmcloud_host/lib/wasmcloud_host/lattice/state_monitor.ex

def init(_opts) do
  # The host cache starts out containing only the local host, with no
  # actors or providers yet.
  local_key = HostCore.Host.host_key()

  local_entry = %{
    actors: %{},
    providers: %{},
    labels: HostCore.Host.host_labels()
  }

  state = %State{
    linkdefs: %{},
    refmaps: %{},
    claims: %{},
    hosts: %{local_key => local_entry}
  }

  # Subscribe this very process to lattice events; Gnat delivers them as
  # {:msg, ...} messages handled in handle_info/2.
  lattice = HostCore.Host.lattice_prefix()
  {:ok, _sub} = Gnat.sub(:control_nats, self(), "wasmbus.evt.#{lattice}")

  Registry.register(Registry.EventMonitorRegistry, "cache_loader_events", [])

  # Cache retrieval is deferred to handle_continue/2.
  {:ok, state, {:continue, :retrieve_cache}}
end

订阅收到消息后,Gnat 会向订阅进程发送 `{:msg, %{body: ..., topic: ...}}` 消息,由匹配的 `handle_info/2` 子句处理:

@impl true
def handle_info(
      {:msg, %{body: body, topic: topic}},
      state
    ) do
  Logger.debug("StateMonitor handle info #{topic}")

  # Only lattice event topics carry CloudEvents; decode and dispatch those.
  # For any other topic keep the current state. The previous `if` had no
  # `else`, so a non-event topic evaluated to nil and replaced the whole
  # GenServer state with nil.
  state =
    if String.starts_with?(topic, "wasmbus.evt.") do
      handle_event(state, body)
    else
      state
    end

  {:noreply, state}
end

defp handle_event(state, body) do
  # Decode the raw NATS payload into a CloudEvent struct, then let the
  # process_event/2 clauses pattern-match on its type.
  event = Cloudevents.from_json!(body)
  process_event(state, event)
end

`process_event` 根据 lattice 事件的 `type` 字段(即 CloudEvent 的 type 字段)分派处理,目前已知的事件类型有:

  • com.wasmcloud.lattice.actor_started
  • com.wasmcloud.lattice.actor_stopped
  • com.wasmcloud.lattice.provider_started
  • com.wasmcloud.lattice.provider_stopped
  • com.wasmcloud.lattice.host_heartbeat
  • com.wasmcloud.lattice.health_check_passed
  • com.wasmcloud.lattice.health_check_failed
  • com.wasmcloud.lattice.refmap_set
  • com.wasmcloud.lattice.linkdef_set
  • com.wasmcloud.lattice.linkdef_deleted

这里我们关注的是 `host_heartbeat` 事件的处理:

defp process_event(
       state,
       %Cloudevents.Format.V_1_0.Event{
         # Bind the heartbeat's actor and provider lists
         data: %{"actors" => actors, "providers" => providers},
         datacontenttype: "application/json",
         source: source_host,
         type: "com.wasmcloud.lattice.host_heartbeat"
       }
     ) do
  Logger.debug("Handling host heartbeat")

  # Rebuild this host's actors/providers from the heartbeat, keeping any
  # statuses already cached, then broadcast the new host map to Phoenix.
  current_host = Map.get(state.hosts, source_host, %{})

  # These lookups do not depend on the loop variables; do them once.
  current_actors = Map.get(current_host, :actors, %{})
  current_providers = Map.get(current_host, :providers, %{})

  # TODO: Also ensure that actors don't exist in the dashboard that aren't in the health check
  # Actor map is keyed by public key. The heartbeat lists one entry per
  # running instance, so the reducer counts instances per actor.
  actor_map =
    Enum.reduce(actors, %{}, fn actor, actor_map ->
      actor_id = Map.get(actor, "public_key")
      count = actor_map |> Map.get(actor_id, %{}) |> Map.get(:count, 0)

      status =
        current_actors
        |> Map.get(actor_id, %{})
        |> Map.get(:status, "Awaiting")

      Map.put(actor_map, actor_id, %{count: count + 1, status: status})
    end)

  # TODO: Also ensure that providers don't exist in the dashboard that aren't in the health check
  # Provider map is keyed by a tuple of the form {public_key, link_name}.
  # Known providers keep their cached entry; new ones start as "Awaiting".
  # (Duplicate keys: the last entry wins, as with the previous Map.put reduce.)
  provider_map =
    Map.new(providers, fn provider ->
      key = {Map.get(provider, "public_key"), Map.get(provider, "link_name")}

      entry =
        case Map.get(current_providers, key) do
          nil -> %{contract_id: Map.get(provider, "contract_id"), status: "Awaiting"}
          existing_provider -> existing_provider
        end

      {key, entry}
    end)

  labels =
    if current_host == %{} do
      # Host not cached yet: ask it for its inventory to learn its labels.
      # This is a synchronous request (2s receive timeout) made from
      # inside this process.
      case Gnat.request(
             :control_nats,
             "wasmbus.ctl.#{HostCore.Host.lattice_prefix()}.get.#{source_host}.inv",
             "",
             [{:receive_timeout, 2_000}]
           ) do
        {:ok, msg} ->
          # A malformed reply should not crash the state monitor.
          case Jason.decode(msg.body) do
            {:ok, %{} = inventory} -> Map.get(inventory, "labels", %{})
            _ -> %{}
          end

        # Timeouts and any other request error (e.g. no responders) fall
        # back to empty labels instead of raising a CaseClauseError.
        {:error, _reason} ->
          %{}
      end
    else
      # current_host is non-empty here, so it is exactly state.hosts[source_host].
      Map.get(current_host, :labels)
    end

  host = %{actors: actor_map, providers: provider_map, labels: labels}

  hosts = Map.put(state.hosts, source_host, host)
  PubSub.broadcast(WasmcloudHost.PubSub, "lattice:state", {:hosts, hosts})
  %State{state | hosts: hosts}
end

总结

host_core 作为事件发送方(emitter),负责定时产生心跳事件,其中携带了 host、actors 和 providers 的数据。

wasmcloud_host 订阅并处理该事件,更新其维护的 lattice 状态,并通过 Phoenix PubSub 广播出去,用于在 Web UI 上展示最新的 actors 和 providers 状态。