新建 interface
wasmcloud app 开发使用的是 CDD 模式,也就是基于契约开发的模式。其中契约的定义,使用 wash 指令即可完成:
1 | wash new interface |
执行完成后,会根据契约的相关属性信息(如 contractId)等,生成 smithy IDL 的定义文件。smithy 文件专注于契约的定义,而成功将契约和具体的语言实现解耦出来。
简单来说,同一套 smithy 定义文件,可以根据编程语言的不同,生成不同的代码,方便不同语言进行集成开发。
比如当前 wasmcloud 支持生成 rust 和 go 两种语言的代码。这样在基于 rust 语言开发 actor 时,可以直接在 cargo.toml 文件中引入 interface 的包,进而调用 interface 相关的 operation,也可以使用 interface 中的数据结构。
而这些 operation 的具体实现,则由 provider 来实现。
通过 smithy IDL 定义的契约,成功将 actor, provider 解耦出来。
actor-to-actor VS capability provider
在新建 interface 的时候,有两种模板可选,分别是 actor-to-actor 和 provider。其区别主要在 wasmbus 的相关处理上。本文主要讨论的是 capability provider。
1 | // factorial.smithy |
1 | // converter.smithy |
文件结构
这里以 keyvalue interface 为例进行分析,具体的定义可参阅下面的链接:
https://github.com/wasmCloud/interfaces/tree/main/keyvalue
我们在 smithy IDL 中主要定义了下面三块内容:
- metadata,包括:contractId, version 等
- operations,主要是该契约允许操作的方法,比如:increment, contains, del, get, set 等
- structure,主要是用于各个 operation 的传参和入参,除了标量类型外的符合类型,包括:GetReponse, GetRequest 等
部分归纳如下
metadata | contractId, version, namespace |
---|---|
operations | get, set, del, increment, contains |
structure | GetResponse, GetRequest, SetResponse, SetRequest |
我们重点看生成的 rust 文件 keyvalue/rust/src/keyvalue.rs
文件脉络如下:
根据 smithy 定义的结构体信息,使用 rust 语言语法格式,定义各种 struct。除此之外,为了能够在 wasmcloud 运行时中传递该结构,需要对结构进行必要的序列化和反序列化(CBOR)。我们以 GetReponse 结构体为例。
GetResponse
Smithy IDL,契约定义1
2
3
4
5
6
7
8
9
10
11/// Response to get request
structure GetResponse {
/// the value, if it existed
@required
@n(0)
value: String,
/// whether or not the value existed
@required
@n(1)
exists: Boolean,
}
- `GetResponse` rust struct,rust语言的结构体定义
1
2
3
4
5
6
7
8
9
10
/// Response to get request
pub struct GetResponse {
/// the value, if it existed
pub value: String,
/// whether or not the value existed
pub exists: bool,
}
- `GetResponse` rust encode,结构体序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Encode GetResponse as CBOR and append to output stream
pub fn encode_get_response<W: wasmbus_rpc::cbor::Write>(
mut e: &mut wasmbus_rpc::cbor::Encoder<W>,
val: &GetResponse,
) -> RpcResult<()>
where
<W as wasmbus_rpc::cbor::Write>::Error: std::fmt::Display,
{
e.array(2)?; // 结构体成员数量 2
e.str(&val.value)?; // 第一个成员的类型 string
e.bool(val.exists)?; // 第二个成员的类型 bool
Ok(())
}
- `GetResponse` rust decode,结构体反序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Decode GetResponse from cbor input stream
pub fn decode_get_response(
d: &mut wasmbus_rpc::cbor::Decoder<'_>,
) -> Result<GetResponse, RpcError> {
let __result = {
let mut value: Option<String> = None;
let mut exists: Option<bool> = None;
let is_array = match d.datatype()? {
wasmbus_rpc::cbor::Type::Array => true,
wasmbus_rpc::cbor::Type::Map => false,
_ => {
return Err(RpcError::Deser(
"decoding struct GetResponse, expected array or map".to_string(),
))
}
};
if is_array {
let len = d.fixed_array()?;
for __i in 0..(len as usize) {
match __i {
0 => value = Some(d.str()?.to_string()),
1 => exists = Some(d.bool()?),
_ => d.skip()?,
}
}
} else {
let len = d.fixed_map()?;
for __i in 0..(len as usize) {
match d.str()? {
"value" => value = Some(d.str()?.to_string()),
"exists" => exists = Some(d.bool()?),
_ => d.skip()?,
}
}
}
GetResponse {
value: if let Some(__x) = value {
__x
} else {
return Err(RpcError::Deser(
"missing field GetResponse.value (#0)".to_string(),
));
},
exists: if let Some(__x) = exists {
__x
} else {
return Err(RpcError::Deser(
"missing field GetResponse.exists (#1)".to_string(),
));
},
}
};
Ok(__result)
}
contract traIt 定义。这个定义比较简单,就是把 smithy IDL 的 operations 用 rust 语言翻译一遍。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80/// wasmbus.contractId: wasmcloud:keyvalue
/// wasmbus.providerReceive
pub trait KeyValue {
/// returns the capability contract id for this interface
fn contract_id() -> &'static str {
"wasmcloud:keyvalue"
}
/// Increments a numeric value, returning the new value
async fn increment(&self, ctx: &Context, arg: &IncrementRequest) -> RpcResult<i32>;
/// returns whether the store contains the key
async fn contains<TS: ToString + ?Sized + std::marker::Sync>(
&self,
ctx: &Context,
arg: &TS,
) -> RpcResult<bool>;
/// Deletes a key, returning true if the key was deleted
async fn del<TS: ToString + ?Sized + std::marker::Sync>(
&self,
ctx: &Context,
arg: &TS,
) -> RpcResult<bool>;
/// Gets a value for a specified key. If the key exists,
/// the return structure contains exists: true and the value,
/// otherwise the return structure contains exists == false.
async fn get<TS: ToString + ?Sized + std::marker::Sync>(
&self,
ctx: &Context,
arg: &TS,
) -> RpcResult<GetResponse>;
/// Append a value onto the end of a list. Returns the new list size
async fn list_add(&self, ctx: &Context, arg: &ListAddRequest) -> RpcResult<u32>;
/// Deletes a list and its contents
/// input: list name
/// output: true if the list existed and was deleted
async fn list_clear<TS: ToString + ?Sized + std::marker::Sync>(
&self,
ctx: &Context,
arg: &TS,
) -> RpcResult<bool>;
/// Deletes a value from a list. Returns true if the item was removed.
async fn list_del(&self, ctx: &Context, arg: &ListDelRequest) -> RpcResult<bool>;
/// Retrieves a range of values from a list using 0-based indices.
/// Start and end values are inclusive, for example, (0,10) returns
/// 11 items if the list contains at least 11 items. If the stop value
/// is beyond the end of the list, it is treated as the end of the list.
async fn list_range(&self, ctx: &Context, arg: &ListRangeRequest) -> RpcResult<StringList>;
/// Sets the value of a key.
/// expires is an optional number of seconds before the value should be automatically deleted,
/// or 0 for no expiration.
async fn set(&self, ctx: &Context, arg: &SetRequest) -> RpcResult<()>;
/// Add an item into a set. Returns number of items added (1 or 0)
async fn set_add(&self, ctx: &Context, arg: &SetAddRequest) -> RpcResult<u32>;
/// Deletes an item from the set. Returns number of items removed from the set (1 or 0)
async fn set_del(&self, ctx: &Context, arg: &SetDelRequest) -> RpcResult<u32>;
/// perform intersection of sets and returns values from the intersection.
/// input: list of sets for performing intersection (at least two)
/// output: values
async fn set_intersection(&self, ctx: &Context, arg: &StringList) -> RpcResult<StringList>;
/// Retrieves all items from a set
/// input: String
/// output: set members
async fn set_query<TS: ToString + ?Sized + std::marker::Sync>(
&self,
ctx: &Context,
arg: &TS,
) -> RpcResult<StringList>;
/// perform union of sets and returns values from the union
/// input: list of sets for performing union (at least two)
/// output: union of values
async fn set_union(&self, ctx: &Context, arg: &StringList) -> RpcResult<StringList>;
/// clears all values from the set and removes it
/// input: set name
/// output: true if the set existed and was deleted
async fn set_clear<TS: ToString + ?Sized + std::marker::Sync>(
&self,
ctx: &Context,
arg: &TS,
) -> RpcResult<bool>;
}
Receiver trait 定义。receiver 的 trait 定义如下,默认实现了 dispatch 方法,根据 message.method 来进行不同的数据转换。Message 的结构体定义如下:
wasmbus-rpc/src/common.rs
, 用于封装 wasmcloud message,主要是 trait 的 method 和 parameters1
2
3
4
5
6
7
8
9/// A wasmcloud message
pub struct Message<'m> {
/// Message name, usually in the form 'Trait.method'
pub method: &'m str,
/// parameter serialized as a byte array. If the method takes no args, the array will be
/// zero length.
pub arg: Cow<'m, [u8]>,
}
`keyvalue/rust/src/keyvalue.rs`
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/// KeyValueReceiver receives messages defined in the KeyValue service trait
**pub trait KeyValueReceiver: MessageDispatch + KeyValue {**
async fn dispatch(&self, ctx: &Context, message: Message<'_>) -> Result<Vec<u8>, RpcError> {
match message.method { // method 主要由 method 和 args 组成,用于传递消息
"**Increment**" => {
let value: IncrementRequest = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'IncrementRequest': {}", e)))?;
let resp = **KeyValue::increment**(self, ctx, &value).await?; // 根据不同的 method 调用 trait 的不同方法
let buf = wasmbus_rpc::common::serialize(&resp)?; // 根据该方法的返回值,进行不同的反序列化
Ok(buf)
}
// ================= 其他更多 方法 ========== //
"Contains" => {
let value: String = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'String': {}", e)))?;
let resp = KeyValue::contains(self, ctx, &value).await?;
let buf = wasmbus_rpc::common::serialize(&resp)?;
Ok(buf)
}
"Del" => {
let value: String = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'String': {}", e)))?;
let resp = KeyValue::del(self, ctx, &value).await?;
let buf = wasmbus_rpc::common::serialize(&resp)?;
Ok(buf)
}
"Get" => {
let value: String = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'String': {}", e)))?;
let resp = KeyValue::get(self, ctx, &value).await?;
let buf = wasmbus_rpc::common::serialize(&resp)?;
Ok(buf)
}
"Set" => {
let value: SetRequest = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'SetRequest': {}", e)))?;
let _resp = KeyValue::set(self, ctx, &value).await?;
let buf = Vec::new();
Ok(buf)
}
_ => Err(RpcError::MethodNotHandled(format!(
"KeyValue::{}",
message.method
))),
}
}
}
Sender trait 定义。Sender 用来传输调用的 method 和 args。
keyvalue/rust/src/keyvalue.rs
,根据调用的 operation 不同,组装不同的请求参数,进行不同的数据序列化1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42impl KeyValueSender<wasmbus_rpc::actor::prelude::WasmHost> {
**/// Constructs a client for sending to a KeyValue provider
/// implementing the 'wasmcloud:keyvalue' capability contract, with the "default" link**
**pub fn new() -> Self** {
let transport =
**wasmbus_rpc::actor::prelude::WasmHost::to_provider("wasmcloud:keyvalue", "default")**
.unwrap();
Self { transport }
}
/// Constructs a client for sending to a KeyValue provider
/// implementing the 'wasmcloud:keyvalue' capability contract, with the specified link name
pub fn new_with_link(link_name: &str) -> wasmbus_rpc::error::RpcResult<Self> {
let transport =
wasmbus_rpc::actor::prelude::WasmHost::to_provider("wasmcloud:keyvalue", link_name)?;
Ok(Self { transport })
}
}
impl<T: Transport + std::marker::Sync + std::marker::Send> KeyValue for KeyValueSender<T> {
/// Increments a numeric value, returning the new value
async fn **increment**(&self, ctx: &Context, arg: &**IncrementRequest**) -> RpcResult<i32> {
let buf = wasmbus_rpc::common::serialize(arg)?;
let resp = self
.transport
.send(
ctx,
Message {
**method: "KeyValue.Increment",**
arg: Cow::Borrowed(&buf),
},
None,
)
.await?;
let value: i32 = wasmbus_rpc::common::deserialize(&resp)
.map_err(|e| RpcError::Deser(format!("'{}': I32", e)))?;
Ok(value)
}
actor 调用 interface
我们以 kvcounter 为例,首先需要在 makefile 中声明(claims)自己需要使用的契约。
1 | # examples/actor/kvcounter |
其次在 cargo.toml 中引入需要的 interface。
1 | [package] |
最后在 actor 中使用
1 | async fn increment_counter(ctx: &Context, key: String, value: i32) -> RpcResult<i32> { |
这里的 Sender::new().increment 就是我们之前在 Interface sender 中默认生成的方法,其核心就是把 actor 的调用包装成消息体发送出去:
1 | async fn increment(&self, ctx: &Context, arg: &IncrementRequest) -> RpcResult<i32> { |
总结
根据上面的分析,我们初探了 interface 的具体代码生成规则:
- 根据契约定义的不同,生成对应的结构体。包括序列化和反序列化
- 包装调用方法,抽象出 Sender 和 Receiver 两个 trait
- receiver 默认实现了 dispatch。里面根据具体调用的 method 进行数据的序列化和反序列化
- sender 则根据不同的方法,生成了不同的 sender method,这样调用 Sender 的 method 时,就能直接发送对应的请求,请求体包含了 method 和 params
- actor 实际调用 interface 时,就是向 runtime 发送了具体的 rpc 请求。再由 runtime 去调度对应的 provider 实现,进而返回。