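Reading the wasmcloud app provider source code

Background

Providers play an important role in the wasmcloud ecosystem. A provider implements an Interface; once a link has been established between an actor and a provider, the provider supplies concrete capabilities to that actor. A single Interface can have several Provider implementations, and the link definition (link def) stage decides which Provider serves a given actor.

When an actor calls a method of an Interface, the wasmcloud host runtime acts as the coordinator: it receives the call information (function name and arguments), finds the matching provider to execute it, and returns the result to the actor.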
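To make the coordinator role concrete, here is a minimal sketch of the idea in plain Rust. It is not wasmcloud's implementation: the Provider trait, the Host struct, the "demo:echo" contract id and the "Echo.Upper" operation are all hypothetical names invented for illustration, and the real host exchanges these messages over NATS rather than through in-process calls.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a capability provider: it receives an operation
/// name plus raw argument bytes and returns raw result bytes.
trait Provider {
    fn handle(&self, operation: &str, args: &[u8]) -> Result<Vec<u8>, String>;
}

/// Toy provider that upper-cases its input for the "Echo.Upper" operation.
struct UpperProvider;

impl Provider for UpperProvider {
    fn handle(&self, operation: &str, args: &[u8]) -> Result<Vec<u8>, String> {
        match operation {
            "Echo.Upper" => Ok(args.to_ascii_uppercase()),
            other => Err(format!("unknown operation: {other}")),
        }
    }
}

/// Toy host: maps (actor id, contract id) to the provider chosen at link time.
#[derive(Default)]
struct Host {
    links: HashMap<(String, String), Box<dyn Provider>>,
}

impl Host {
    /// Record which provider serves this actor for this contract.
    fn put_link(&mut self, actor: &str, contract: &str, provider: Box<dyn Provider>) {
        self.links
            .insert((actor.to_string(), contract.to_string()), provider);
    }

    /// Route an actor's call to the linked provider and hand back the result.
    fn call(
        &self,
        actor: &str,
        contract: &str,
        operation: &str,
        args: &[u8],
    ) -> Result<Vec<u8>, String> {
        let provider = self
            .links
            .get(&(actor.to_string(), contract.to_string()))
            .ok_or_else(|| format!("actor {actor} has no link for {contract}"))?;
        provider.handle(operation, args)
    }
}
```

After host.put_link("actor-1", "demo:echo", Box::new(UpperProvider)), a call from "actor-1" reaches the provider, while a call from an actor without a link is rejected before any provider code runs.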

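Main flow

A Provider's operation breaks down into three parts:

  1. Start the provider and establish the communication link with the wasmcloud runtime. This happens as soon as we start the provider.
  2. Implement the operations defined in the interface.
  3. Handle some of the events that control the provider (optional).

Starting the provider

We can start a provider from the platform (or with the wasmcloud shell). After startup, it runs a series of initialization steps and registers itself with the wasmcloud runtime.

The main entry points of the kvredis and http-server-rs providers are as follows: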

kvredis/src/main.rs

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // handle lattice control messages and forward rpc to the provider dispatch
    // returns when provider receives a shutdown control message
    // once called, it stays resident and exchanges rpc messages, exiting only after a shutdown command arrives
    provider_main(
        KvRedisProvider::default(),
        Some("KeyValue Redis Provider".to_string()),
    )?;

    eprintln!("KVRedis provider exiting");
    Ok(())
}

httpserver-rs/bin/main.rs

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // handle lattice control messages and forward rpc to the provider dispatch
    // returns when provider receives a shutdown control message
    provider_main(
        HttpServerProvider::default(),
        Some("HttpServer Provider".to_string()),
    )?;

    eprintln!("HttpServer provider exiting");
    Ok(())
}

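We can see that both providers' main functions do the same thing: they call provider_main to start the provider server. provider_main is supplied by the wasmbus-rpc library, so a provider developer simply calls it. Let's briefly walk through the relevant wasmbus-rpc implementation.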

wasmbus-rpc-src/provider_main.rs

/// Start provider services: tokio runtime, logger, nats, and rpc subscriptions
pub fn provider_main<P>(
    provider_dispatch: P,
    friendly_name: Option<String>,
) -> Result<(), Box<dyn std::error::Error>>
where
    P: ProviderDispatch + Send + Sync + Clone + 'static,
{
    // get lattice configuration from host
    // the host configuration is read from stdin
    let host_data = load_host_data().map_err(|e| {
        eprintln!("error loading host data: {}", &e.to_string());
        Box::new(e)
    })?;
    provider_start(provider_dispatch, host_data, friendly_name)
}
/// Start provider services: tokio runtime, logger, nats, and rpc subscriptions,
pub fn provider_start<P>(
    provider_dispatch: P,
    host_data: HostData,
    friendly_name: Option<String>,
) -> Result<(), Box<dyn std::error::Error>>
where
    P: ProviderDispatch + Send + Sync + Clone + 'static,
{
    // the runtime here comes from the tokio library.
    // put simply, it provides the runtime for concurrent and asynchronous code and schedules tasks in user space,
    // similar to goroutine management in Go or process management in Erlang
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        //.enable_io()
        .build()?;

    // block_on runs an async operation and waits for its result
    runtime.block_on(async { provider_run(provider_dispatch, host_data, friendly_name).await })?;
    // in the unlikely case there are any stuck threads,
    // close them so the process has a clean exit
    runtime.shutdown_timeout(core::time::Duration::from_secs(10));
    Ok(())
}
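To build some intuition for what "run an async operation and wait for its result" means, here is a minimal single-threaded block_on written with only the standard library. This is a sketch of the concept, not tokio's implementation (tokio's multi-thread runtime also drives I/O, timers and worker threads); the ThreadWaker and CountDown types are hypothetical helpers introduced for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drive a future to completion on the current thread and return its output.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Not ready yet: sleep until the waker unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical future that reports Pending `remaining` times before finishing.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Ask to be polled again right away.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```

Calling block_on(CountDown { remaining: 3 }) polls the future four times and only then returns, which is the same contract provider_start relies on: the calling thread does not continue until the async work has finished.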

/// Async provider initialization
// provider initialization
pub async fn provider_run<P>(
    provider_dispatch: P,
    host_data: HostData,
    friendly_name: Option<String>,
) -> Result<(), Box<dyn std::error::Error>>
where
    P: ProviderDispatch + Send + Sync + Clone + 'static,
{
    configure_tracing(
        friendly_name.unwrap_or_else(|| host_data.provider_key.clone()),
        host_data.structured_logging,
    );

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel::<bool>(1);

    // this prints a log line that we can see when starting the provider
    eprintln!(
        "Starting capability provider {} instance {} with nats url {}",
        &host_data.provider_key, &host_data.instance_id, &host_data.lattice_rpc_url,
    );

    // resolve the nats server address and establish the connection
    let nats_addr = if !host_data.lattice_rpc_url.is_empty() {
        host_data.lattice_rpc_url.as_str()
    } else {
        crate::provider::DEFAULT_NATS_ADDR
    };
    let nats_server = async_nats::ServerAddr::from_str(nats_addr).map_err(|e| {
        RpcError::InvalidParameter(format!("Invalid nats server url '{}': {}", nats_addr, e))
    })?;

    let nc = crate::rpc_client::with_connection_event_logging(
        match (
            host_data.lattice_rpc_user_jwt.trim(),
            host_data.lattice_rpc_user_seed.trim(),
        ) {
            ("", "") => async_nats::ConnectOptions::default(),
            (rpc_jwt, rpc_seed) => {
                let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(rpc_seed).unwrap());
                let jwt = rpc_jwt.to_owned();
                async_nats::ConnectOptions::with_jwt(jwt, move |nonce| {
                    let key_pair = key_pair.clone();
                    async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) }
                })
            }
        },
    )
    .connect(nats_server)
    .await?;

    // initialize HostBridge
    // HostBridge manages the communication between nats and the host
    let bridge = HostBridge::new_client(nc, &host_data)?;
    set_host_bridge(bridge).ok();
    let bridge = get_host_bridge();

    // pre-populate provider and bridge with initial set of link definitions
    // initialization of any link is fatal for provider startup
    // if link definitions already exist, they have to be restored when the provider starts
    let initial_links = host_data.link_definitions.clone();
    for ld in initial_links.into_iter() {
        if let Err(e) = provider_dispatch.put_link(&ld).await {
            eprintln!(
                "Failed to initialize link during provider startup - ({:?}): {:?}",
                &ld, e
            );
        } else {
            bridge.put_link(ld).await;
        }
    }

    // subscribe to nats topics
    // these topics carry the messages exchanged with the wasmcloud host runtime that concern this provider
    let _join = bridge
        .connect(
            provider_dispatch,
            &shutdown_tx,
            &host_data.lattice_rpc_prefix,
        )
        .await;

    // run until we receive a shutdown request from host
    // keep listening here until a shutdown request arrives
    let _ = shutdown_rx.recv().await;

    // close chunkifiers
    let _ = tokio::task::spawn_blocking(crate::chunkify::shutdown).await;

    // flush async_nats client
    bridge.flush().await;

    Ok(())
}
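The overall shape of provider_run (hand a shutdown sender to the message handlers, then park on the receiver) can be sketched with standard-library threads and channels. This is only an analogy under stated assumptions: the real code uses a tokio broadcast channel and NATS subscriptions, and the Control enum and run_provider function below are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical control messages a host might send to a provider.
enum Control {
    PutLink(String),
    Shutdown,
}

/// Runs a toy provider: a background thread handles control messages and
/// signals shutdown, while the calling thread blocks until that signal arrives.
/// Returns the actor ids that were linked before shutdown.
fn run_provider(messages: Vec<Control>) -> Vec<String> {
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<bool>();

    let handler = thread::spawn(move || {
        let mut linked = Vec::new();
        for msg in messages {
            match msg {
                Control::PutLink(actor) => linked.push(actor),
                Control::Shutdown => {
                    // Tell the main thread it may stop waiting.
                    let _ = shutdown_tx.send(true);
                    break;
                }
            }
        }
        linked
    });

    // Block here until shutdown is requested (or the handler goes away).
    let _ = shutdown_rx.recv();

    // Cleanup would go here, mirroring the flush at the end of provider_run.
    handler.join().expect("handler thread panicked")
}
```

Any message queued after Shutdown is never processed, which matches the behavior we would expect from a provider that stops listening once the host asks it to exit.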

After we add the provider, the wasmcloud host prints the following log:

[debug] StateMonitor handle info wasmbus.evt.default
[debug] Handling host heartbeat
[debug] Received control interface request on wasmbus.ctl.default.cmd.NBTXBYKGMTTLY6ZJHUMW4MBO2R6CJBPE5VC3VQ6XON3TMO4DASQ4AG2H.lp
[debug] StateMonitor handle info wasmbus.evt.default
[debug] Handling host heartbeat
[info] Starting executable capability provider from '/var/folders/wj/68rwh9s94w17h9z0frb4h6w00000gn/T/wasmcloudcache/VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB/1660779368/wasmcloud_keyvalue_default'
[debug] Publishing OCI ref map for "wasmcloud.azurecr.io/kvredis:0.16.3"
[debug] StateMonitor handle info wasmbus.evt.default
span_id=e2b0fb00ba96b407 trace_id=8bdcf587b24a5221c1ea0a9bf721d630 [debug] Successfully started provider wasmcloud.azurecr.io/kvredis:0.16.3 (default)
[debug] StateMonitor handle info wasmbus.evt.default
[debug] Cached claims for VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB
[debug] Cached OCI map reference from wasmcloud.azurecr.io/kvredis:0.16.3 to VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB
RUST_LOG was not set or the given directive was invalid: FromEnvError { kind: Env(NotPresent) }
Defaulting logger to `info` level
Starting capability provider VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB instance 8efc5842-5b6e-4400-afeb-95c6207deb29 with nats url 127.0.0.1:4222
[debug] StateMonitor handle info wasmbus.evt.default
[debug] Handling successful health check for VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB

The third line from the end, the one beginning with "Starting capability provider", is the log line printed by the eprintln! call in provider_run.

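Implementing the interface

For the keyvalue interface, we could implement the kv functionality with redis, with vault, or with any other storage backend you consider suitable. Once an implementation exists, an actor can link to whichever provider fits its actual needs.

For the provider, this means implementing every method defined in the interface trait.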

use wasmcloud_interface_keyvalue::{
    GetResponse, IncrementRequest, KeyValue, KeyValueReceiver, ListAddRequest, ListDelRequest,
    ListRangeRequest, SetAddRequest, SetDelRequest, SetRequest, StringList,
};

/// Redis keyValue provider implementation.
// here the services procedural macro attaches the KeyValue trait to KvRedisProvider,
// which in turn requires it to implement all of the trait's methods
#[derive(Default, Clone, Provider)]
#[services(KeyValue)]
struct KvRedisProvider {
    // store redis connections per actor
    actors: Arc<RwLock<HashMap<String, RwLock<Connection>>>>,
}

/// Handle KeyValue methods that interact with redis
#[async_trait]
impl KeyValue for KvRedisProvider {
    /// Increments a numeric value, returning the new value
    #[instrument(level = "debug", skip(self, ctx, arg), fields(actor_id = ?ctx.actor, key = %arg.key))]
    async fn increment(&self, ctx: &Context, arg: &IncrementRequest) -> RpcResult<i32> {
        let mut cmd = redis::Cmd::incr(&arg.key, &arg.value); // implements the increment method
        let val: i32 = self.exec(ctx, &mut cmd).await?;
        Ok(val)
    }

    // (the remaining KeyValue methods are omitted from this excerpt)
}
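The "one interface, several providers" idea can be sketched without any wasmcloud dependencies. The trait below is a simplified, synchronous stand-in and not the real wasmcloud_interface_keyvalue::KeyValue trait; MemoryStore, ListStore, bump and count_visit are hypothetical names used only for this illustration.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a key-value interface.
trait KeyValue {
    fn get(&self, key: &str) -> Option<String>;
    fn set(&mut self, key: &str, value: &str);
    /// Adds `delta` to the number stored at `key`, returning the new value.
    fn increment(&mut self, key: &str, delta: i32) -> Result<i32, String>;
}

/// Shared arithmetic: parse the current value (missing counts as 0) and add `delta`.
fn bump(current: Option<String>, delta: i32) -> Result<i32, String> {
    let base = match current {
        Some(text) => text
            .parse::<i32>()
            .map_err(|e| format!("not a number: {e}"))?,
        None => 0,
    };
    base.checked_add(delta).ok_or_else(|| "overflow".to_string())
}

/// First backend: a hash map.
#[derive(Default)]
struct MemoryStore {
    entries: HashMap<String, String>,
}

impl KeyValue for MemoryStore {
    fn get(&self, key: &str) -> Option<String> {
        self.entries.get(key).cloned()
    }
    fn set(&mut self, key: &str, value: &str) {
        self.entries.insert(key.to_string(), value.to_string());
    }
    fn increment(&mut self, key: &str, delta: i32) -> Result<i32, String> {
        let next = bump(self.get(key), delta)?;
        self.set(key, &next.to_string());
        Ok(next)
    }
}

/// Second backend: a plain list of pairs, searched linearly.
#[derive(Default)]
struct ListStore {
    entries: Vec<(String, String)>,
}

impl KeyValue for ListStore {
    fn get(&self, key: &str) -> Option<String> {
        self.entries
            .iter()
            .find(|(k, _)| k.as_str() == key)
            .map(|(_, v)| v.clone())
    }
    fn set(&mut self, key: &str, value: &str) {
        match self.entries.iter().position(|(k, _)| k.as_str() == key) {
            Some(i) => self.entries[i].1 = value.to_string(),
            None => self.entries.push((key.to_string(), value.to_string())),
        }
    }
    fn increment(&mut self, key: &str, delta: i32) -> Result<i32, String> {
        let next = bump(self.get(key), delta)?;
        self.set(key, &next.to_string());
        Ok(next)
    }
}

/// "Actor-side" code: written against the interface, unaware of the backend.
fn count_visit(store: &mut dyn KeyValue) -> Result<i32, String> {
    store.increment("visits", 1)
}
```

count_visit works unchanged with either store, which is the property the link definition relies on: the actor only knows the interface, and the choice of backend is made elsewhere.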

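Handling events

The wasmcloud host runtime emits certain events, and a Provider can choose to handle some of them. Taking the kvredis provider as an example, we need to establish the connection to redis when the user puts a link, so that all later operations can reuse that connection.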

/// Handle provider control commands
/// put_link (new actor link command), del_link (remove link command), and shutdown
#[async_trait]
impl ProviderHandler for KvRedisProvider {
    /// Provider should perform any operations needed for a new link,
    /// including setting up per-actor resources, and checking authorization.
    /// If the link is allowed, return true, otherwise return false to deny the link.
    // the connection is established on put_link
    #[instrument(level = "debug", skip(self, ld), fields(actor_id = %ld.actor_id))]
    async fn put_link(&self, ld: &LinkDefinition) -> RpcResult<bool> {
        let redis_url = match ld.values.get(REDIS_URL_KEY) {
            Some(v) => v.as_str(),
            None => DEFAULT_CONNECT_URL,
        };
        let client = redis::Client::open(redis_url).map_err(|e| {
            RpcError::ProviderInit(format!("redis connection to {}: {}", redis_url, e))
        })?;
        let connection = client.get_async_connection().await.map_err(|e| {
            RpcError::ProviderInit(format!("redis connection to {}: {}", redis_url, e))
        })?;

        let mut update_map = self.actors.write().await;
        update_map.insert(ld.actor_id.to_string(), RwLock::new(connection));

        Ok(true)
    }

    /// Handle notification that a link is dropped - close the connection
    // the connection is removed on delete_link
    #[instrument(level = "info", skip(self))]
    async fn delete_link(&self, actor_id: &str) {
        let mut aw = self.actors.write().await;
        if let Some(conn) = aw.remove(actor_id) {
            info!("redis closing connection for actor {}", actor_id);
            drop(conn)
        }
    }

    /// Handle shutdown request by closing all connections
    // all connections are closed on shutdown
    async fn shutdown(&self) -> Result<(), Infallible> {
        let mut aw = self.actors.write().await;
        // empty the actor link data and stop all servers
        for (_, conn) in aw.drain() {
            drop(conn)
        }
        Ok(())
    }
}
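The per-actor bookkeeping in the listing above boils down to a shared map that is filled on put_link, pruned on delete_link and emptied on shutdown. Below is a synchronous sketch of that pattern using only the standard library. The Connection struct is a hypothetical placeholder (no real redis connection is opened), and the URL_KEY and DEFAULT_URL constants are assumed values standing in for REDIS_URL_KEY and DEFAULT_CONNECT_URL, whose definitions are not shown in the excerpt.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Assumed values, for illustration only.
const URL_KEY: &str = "URL";
const DEFAULT_URL: &str = "redis://127.0.0.1:6379/";

/// Hypothetical stand-in for a backend connection.
struct Connection {
    url: String,
}

/// Per-actor resources, shared between clones of the provider.
#[derive(Clone, Default)]
struct LinkTable {
    actors: Arc<RwLock<HashMap<String, Connection>>>,
}

impl LinkTable {
    /// Create the actor's resource from the link values, falling back to a default URL.
    fn put_link(&self, actor_id: &str, values: &HashMap<String, String>) -> bool {
        let url = values.get(URL_KEY).map(String::as_str).unwrap_or(DEFAULT_URL);
        let conn = Connection { url: url.to_string() };
        self.actors
            .write()
            .unwrap()
            .insert(actor_id.to_string(), conn);
        true
    }

    /// Later operations look up the resource created at link time.
    fn url_for(&self, actor_id: &str) -> Option<String> {
        self.actors
            .read()
            .unwrap()
            .get(actor_id)
            .map(|conn| conn.url.clone())
    }

    /// Drop the resource of a single actor.
    fn delete_link(&self, actor_id: &str) {
        self.actors.write().unwrap().remove(actor_id);
    }

    /// Drop every remaining resource and report how many were released.
    fn shutdown(&self) -> usize {
        let mut map = self.actors.write().unwrap();
        let released = map.len();
        map.clear();
        released
    }
}
```

Because the map sits behind an Arc, every clone of the table sees the same links, which is presumably why the real provider struct derives Clone while keeping its state in Arc<RwLock<...>>.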

We can see similar handling in http-server-rs (a provider that implements an http server):

#[async_trait]
impl ProviderHandler for HttpServerProvider {
    /// Provider should perform any operations needed for a new link,
    /// including setting up per-actor resources, and checking authorization.
    /// If the link is allowed, return true, otherwise return false to deny the link.
    async fn put_link(&self, ld: &LinkDefinition) -> Result<bool, RpcError> {
        let settings =
            load_settings(&ld.values).map_err(|e| RpcError::ProviderInit(e.to_string()))?;

        let http_server = HttpServerCore::new(settings.clone(), get_host_bridge());
        http_server.start(ld.clone()).await.map_err(|e| {
            RpcError::ProviderInit(format!(
                "starting httpserver for {} {:?}: {}",
                &ld.actor_id, &settings.address, e
            ))
        })?;

        let mut update_map = self.actors.write().await;
        update_map.insert(ld.actor_id.to_string(), http_server);

        Ok(true)
    }

    /// Handle notification that a link is dropped - stop the http listener
    async fn delete_link(&self, actor_id: &str) {
        let mut aw = self.actors.write().await;
        if let Some(server) = aw.remove(actor_id) {
            tracing::info!(%actor_id, "httpserver stopping listener for actor");
            server.begin_shutdown().await;
        }
    }

    /// Handle shutdown request by shutting down all the http server threads
    async fn shutdown(&self) -> Result<(), Infallible> {
        let mut aw = self.actors.write().await;
        // empty the actor link data and stop all servers
        for (_, server) in aw.drain() {
            server.begin_shutdown().await;
        }
        Ok(())
    }
}

Summary

To recap, the main responsibilities of a provider are: