wasmcloud app interface 源码阅读

新建 interface

wasmcloud app 开发使用的是 CDD 模式,也就是基于契约开发的模式。其中契约的定义,使用 wash 指令即可完成:

1
wash new interface

执行完成后,会根据契约的相关属性信息(如 contractId)等,生成 smithy IDL 的定义文件。smithy 文件专注于契约的定义,而成功将契约和具体的语言实现解耦出来。

简单来说,同一套 smithy 定义文件,可以根据编程语言的不同,生成不同的代码,方便不同语言进行集成开发。

比如当前 wasmcloud 支持生成 rust 和 go 两种语言的代码。这样在基于 rust 语言开发 actor 时,可以直接在 cargo.toml 文件中引入 interface 的包,进而调用 interface 相关的 operation,也可以使用 interface 中的数据结构。

而这些 operation 的具体实现,则由 provider 来实现。

通过 smithy IDL 定义的契约,成功将 actor, provider 解耦出来。

actor-to-actor VS capability provider

在新建 interface 的时候,有两种模板可选,分别是 actor-to-actor 和 provider。其区别主要在 wasmbus 的相关处理上。本文主要讨论的是 capability provider。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// factorial.smithy
// A simple service that calculates the factorial of a whole number

// Tell the code generator how to reference symbols defined in this namespace
metadata package = [ { namespace: "org.example.interfaces.factorial", crate: "factorial_interface" } ]

namespace org.example.interfaces.factorial

use org.wasmcloud.model#wasmbus
use org.wasmcloud.model#U32
use org.wasmcloud.model#U64

/// The Factorial service has a single method, calculate, which
/// calculates the factorial of its whole number parameter.
**@wasmbus(
contractId: "example:interfaces:factorial",
actorReceive: true,
providerReceive: true )**
service Factorial {
version: "0.1",
operations: [ Calculate ]
}

/// Calculates the factorial (n!) of the input parameter
operation Calculate {
input: U32,
output: U64
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// converter.smithy
//

// Tell the code generator how to reference symbols defined in this namespace
metadata package = [ { namespace: "org.example.interfaces.converter", crate: "converter_interface" } ]

namespace org.example.interfaces.converter

use org.wasmcloud.model#wasmbus

/// Description of Converter service
@wasmbus( actorReceive: true )
service Converter {
version: "0.1",
operations: [ Convert ]
}

/// Converts the input string to a result
operation Convert {
input: String,
output: String
}

文件结构

这里以 keyvalue interface 为例进行分析,具体的定义可参阅下面的链接:

https://github.com/wasmCloud/interfaces/tree/main/keyvalue

我们在 smithy IDL 中主要定义了下面三块内容:

  1. metadata,包括:contractId, version 等
  2. operations,主要是该契约允许操作的方法,比如:increment, contains, del, get, set 等
  3. structure,主要是用于各个 operation 的传参和入参,除了标量类型外的符合类型,包括:GetReponse, GetRequest 等

部分归纳如下

metadata contractId, version, namespace
operations get, set, del, increment, contains
structure GetResponse, GetRequest, SetResponse, SetRequest

我们重点看生成的 rust 文件 keyvalue/rust/src/keyvalue.rs

文件脉络如下:

  1. 根据 smithy 定义的结构体信息,使用 rust 语言语法格式,定义各种 struct。除此之外,为了能够在 wasmcloud 运行时中传递该结构,需要对结构进行必要的序列化和反序列化(CBOR)。我们以 GetReponse 结构体为例。

    • GetResponse Smithy IDL,契约定义

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      /// Response to get request
      structure GetResponse {
      /// the value, if it existed
      @required
      @n(0)
      value: String,
      /// whether or not the value existed
      @required
      @n(1)
      exists: Boolean,
      }
- `GetResponse` rust struct,rust语言的结构体定义

    
1
2
3
4
5
6
7
8
9
10
/// Response to get request
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct GetResponse {
/// the value, if it existed
#[serde(default)]
pub value: String,
/// whether or not the value existed
#[serde(default)]
pub exists: bool,
}
- `GetResponse` rust encode,结构体序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Encode GetResponse as CBOR and append to output stream
#[doc(hidden)]
#[allow(unused_mut)]
pub fn encode_get_response<W: wasmbus_rpc::cbor::Write>(
mut e: &mut wasmbus_rpc::cbor::Encoder<W>,
val: &GetResponse,
) -> RpcResult<()>
where
<W as wasmbus_rpc::cbor::Write>::Error: std::fmt::Display,
{
e.array(2)?; // 结构体成员数量 2
e.str(&val.value)?; // 第一个成员的类型 string
e.bool(val.exists)?; // 第二个成员的类型 bool
Ok(())
}
- `GetResponse` rust decode,结构体反序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Decode GetResponse from cbor input stream
#[doc(hidden)]
pub fn decode_get_response(
d: &mut wasmbus_rpc::cbor::Decoder<'_>,
) -> Result<GetResponse, RpcError> {
let __result = {
let mut value: Option<String> = None;
let mut exists: Option<bool> = None;

let is_array = match d.datatype()? {
wasmbus_rpc::cbor::Type::Array => true,
wasmbus_rpc::cbor::Type::Map => false,
_ => {
return Err(RpcError::Deser(
"decoding struct GetResponse, expected array or map".to_string(),
))
}
};
if is_array {
let len = d.fixed_array()?;
for __i in 0..(len as usize) {
match __i {
0 => value = Some(d.str()?.to_string()),
1 => exists = Some(d.bool()?),
_ => d.skip()?,
}
}
} else {
let len = d.fixed_map()?;
for __i in 0..(len as usize) {
match d.str()? {
"value" => value = Some(d.str()?.to_string()),
"exists" => exists = Some(d.bool()?),
_ => d.skip()?,
}
}
}
GetResponse {
value: if let Some(__x) = value {
__x
} else {
return Err(RpcError::Deser(
"missing field GetResponse.value (#0)".to_string(),
));
},

exists: if let Some(__x) = exists {
__x
} else {
return Err(RpcError::Deser(
"missing field GetResponse.exists (#1)".to_string(),
));
},
}
};
Ok(__result)
}
  1. contract traIt 定义。这个定义比较简单,就是把 smithy IDL 的 operations 用 rust 语言翻译一遍。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    /// wasmbus.contractId: wasmcloud:keyvalue
    /// wasmbus.providerReceive
    #[async_trait]
    pub trait KeyValue {
    /// returns the capability contract id for this interface
    fn contract_id() -> &'static str {
    "wasmcloud:keyvalue"
    }
    /// Increments a numeric value, returning the new value
    async fn increment(&self, ctx: &Context, arg: &IncrementRequest) -> RpcResult<i32>;
    /// returns whether the store contains the key
    async fn contains<TS: ToString + ?Sized + std::marker::Sync>(
    &self,
    ctx: &Context,
    arg: &TS,
    ) -> RpcResult<bool>;
    /// Deletes a key, returning true if the key was deleted
    async fn del<TS: ToString + ?Sized + std::marker::Sync>(
    &self,
    ctx: &Context,
    arg: &TS,
    ) -> RpcResult<bool>;
    /// Gets a value for a specified key. If the key exists,
    /// the return structure contains exists: true and the value,
    /// otherwise the return structure contains exists == false.
    async fn get<TS: ToString + ?Sized + std::marker::Sync>(
    &self,
    ctx: &Context,
    arg: &TS,
    ) -> RpcResult<GetResponse>;
    /// Append a value onto the end of a list. Returns the new list size
    async fn list_add(&self, ctx: &Context, arg: &ListAddRequest) -> RpcResult<u32>;
    /// Deletes a list and its contents
    /// input: list name
    /// output: true if the list existed and was deleted
    async fn list_clear<TS: ToString + ?Sized + std::marker::Sync>(
    &self,
    ctx: &Context,
    arg: &TS,
    ) -> RpcResult<bool>;
    /// Deletes a value from a list. Returns true if the item was removed.
    async fn list_del(&self, ctx: &Context, arg: &ListDelRequest) -> RpcResult<bool>;
    /// Retrieves a range of values from a list using 0-based indices.
    /// Start and end values are inclusive, for example, (0,10) returns
    /// 11 items if the list contains at least 11 items. If the stop value
    /// is beyond the end of the list, it is treated as the end of the list.
    async fn list_range(&self, ctx: &Context, arg: &ListRangeRequest) -> RpcResult<StringList>;
    /// Sets the value of a key.
    /// expires is an optional number of seconds before the value should be automatically deleted,
    /// or 0 for no expiration.
    async fn set(&self, ctx: &Context, arg: &SetRequest) -> RpcResult<()>;
    /// Add an item into a set. Returns number of items added (1 or 0)
    async fn set_add(&self, ctx: &Context, arg: &SetAddRequest) -> RpcResult<u32>;
    /// Deletes an item from the set. Returns number of items removed from the set (1 or 0)
    async fn set_del(&self, ctx: &Context, arg: &SetDelRequest) -> RpcResult<u32>;
    /// perform intersection of sets and returns values from the intersection.
    /// input: list of sets for performing intersection (at least two)
    /// output: values
    async fn set_intersection(&self, ctx: &Context, arg: &StringList) -> RpcResult<StringList>;
    /// Retrieves all items from a set
    /// input: String
    /// output: set members
    async fn set_query<TS: ToString + ?Sized + std::marker::Sync>(
    &self,
    ctx: &Context,
    arg: &TS,
    ) -> RpcResult<StringList>;
    /// perform union of sets and returns values from the union
    /// input: list of sets for performing union (at least two)
    /// output: union of values
    async fn set_union(&self, ctx: &Context, arg: &StringList) -> RpcResult<StringList>;
    /// clears all values from the set and removes it
    /// input: set name
    /// output: true if the set existed and was deleted
    async fn set_clear<TS: ToString + ?Sized + std::marker::Sync>(
    &self,
    ctx: &Context,
    arg: &TS,
    ) -> RpcResult<bool>;
    }
  1. Receiver trait 定义。receiver 的 trait 定义如下,默认实现了 dispatch 方法,根据 message.method 来进行不同的数据转换。Message 的结构体定义如下:

    wasmbus-rpc/src/common.rs, 用于封装 wasmcloud message,主要是 trait 的 method 和 parameters

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /// A wasmcloud message
    #[derive(Debug)]
    pub struct Message<'m> {
    /// Message name, usually in the form 'Trait.method'
    pub method: &'m str,
    /// parameter serialized as a byte array. If the method takes no args, the array will be
    /// zero length.
    pub arg: Cow<'m, [u8]>,
    }
`keyvalue/rust/src/keyvalue.rs`

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/// KeyValueReceiver receives messages defined in the KeyValue service trait
#[doc(hidden)]
#[async_trait]
**pub trait KeyValueReceiver: MessageDispatch + KeyValue {**
async fn dispatch(&self, ctx: &Context, message: Message<'_>) -> Result<Vec<u8>, RpcError> {
match message.method { // method 主要由 method 和 args 组成,用于传递消息
"**Increment**" => {
let value: IncrementRequest = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'IncrementRequest': {}", e)))?;

let resp = **KeyValue::increment**(self, ctx, &value).await?; // 根据不同的 method 调用 trait 的不同方法
let buf = wasmbus_rpc::common::serialize(&resp)?; // 根据该方法的返回值,进行不同的反序列化

Ok(buf)
}
// ================= 其他更多 方法 ========== //
"Contains" => {
let value: String = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'String': {}", e)))?;

let resp = KeyValue::contains(self, ctx, &value).await?;
let buf = wasmbus_rpc::common::serialize(&resp)?;

Ok(buf)
}
"Del" => {
let value: String = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'String': {}", e)))?;

let resp = KeyValue::del(self, ctx, &value).await?;
let buf = wasmbus_rpc::common::serialize(&resp)?;

Ok(buf)
}
"Get" => {
let value: String = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'String': {}", e)))?;

let resp = KeyValue::get(self, ctx, &value).await?;
let buf = wasmbus_rpc::common::serialize(&resp)?;

Ok(buf)
}
"Set" => {
let value: SetRequest = wasmbus_rpc::common::deserialize(&message.arg)
.map_err(|e| RpcError::Deser(format!("'SetRequest': {}", e)))?;

let _resp = KeyValue::set(self, ctx, &value).await?;
let buf = Vec::new();
Ok(buf)
}
_ => Err(RpcError::MethodNotHandled(format!(
"KeyValue::{}",
message.method
))),
}
}
}
  1. Sender trait 定义。Sender 用来传输调用的 method 和 args。

    keyvalue/rust/src/keyvalue.rs,根据调用的 operation 不同,组装不同的请求参数,进行不同的数据序列化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    impl KeyValueSender<wasmbus_rpc::actor::prelude::WasmHost> {
    **/// Constructs a client for sending to a KeyValue provider
    /// implementing the 'wasmcloud:keyvalue' capability contract, with the "default" link**
    **pub fn new() -> Self** {
    let transport =
    **wasmbus_rpc::actor::prelude::WasmHost::to_provider("wasmcloud:keyvalue", "default")**
    .unwrap();
    Self { transport }
    }

    /// Constructs a client for sending to a KeyValue provider
    /// implementing the 'wasmcloud:keyvalue' capability contract, with the specified link name
    pub fn new_with_link(link_name: &str) -> wasmbus_rpc::error::RpcResult<Self> {
    let transport =
    wasmbus_rpc::actor::prelude::WasmHost::to_provider("wasmcloud:keyvalue", link_name)?;
    Ok(Self { transport })
    }
    }

    #[async_trait]
    impl<T: Transport + std::marker::Sync + std::marker::Send> KeyValue for KeyValueSender<T> {
    #[allow(unused)]
    /// Increments a numeric value, returning the new value
    async fn **increment**(&self, ctx: &Context, arg: &**IncrementRequest**) -> RpcResult<i32> {
    let buf = wasmbus_rpc::common::serialize(arg)?;

    let resp = self
    .transport
    .send(
    ctx,
    Message {
    **method: "KeyValue.Increment",**
    arg: Cow::Borrowed(&buf),
    },
    None,
    )
    .await?;

    let value: i32 = wasmbus_rpc::common::deserialize(&resp)
    .map_err(|e| RpcError::Deser(format!("'{}': I32", e)))?;
    Ok(value)
    }

actor 调用 interface

我们以 kvcounter 为例,首先需要在 makefile 中声明(claims)自己需要使用的契约。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# examples/actor/kvcounter

PROJECT = kvcounter
VERSION = $(shell cargo metadata --no-deps --format-version 1 | jq -r '.packages[] .version' | head -1)
REVISION = 0
# list of all contract claims for actor signing (space-separated)
**CLAIMS = wasmcloud:httpserver wasmcloud:keyvalue**
# registry url for our actor
REG_URL = localhost:5000/v2/$(PROJECT):$(VERSION)
# command to upload to registry (without last wasm parameter)
PUSH_REG_CMD = wash reg push --insecure $(REG_URL)
ACTOR_NAME = "KVCounter"

include ../../build/makefiles/actor.mk

其次在 cargo.toml 中引入需要的 interface。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[package]
name = "kvcounter"
version = "0.3.4"
authors = [ "wasmcloud Team" ]
edition = "2021"

[lib]
crate-type = ["cdylib", "rlib"]

[dependencies]
async-trait = "0.1"
form_urlencoded = "1.0"
futures = "0.3"
serde_bytes = "0.11"
serde_json ="1.0"
serde = {version = "1.0", features = ["derive"]}

wasmbus-rpc = "0.9"
**wasmcloud-interface-keyvalue = "0.7.0"
wasmcloud-interface-httpserver = "0.6.0"**

[profile.release]
# Optimize for small code size
lto = true
opt-level = "s"

最后在 actor 中使用

1
2
3
4
5
6
async fn increment_counter(ctx: &Context, key: String, value: i32) -> RpcResult<i32> {
let new_val = **KeyValueSender::new()**
**.increment(ctx, &IncrementRequest { key, value })**
.await?;
Ok(new_val)
}

这里的 Sender::new().increment 就是我们之前在 Interface sender 中默认生成的方法,其核心就是把 actor 的调用包装成消息体发送出去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async fn increment(&self, ctx: &Context, arg: &IncrementRequest) -> RpcResult<i32> {
let buf = wasmbus_rpc::common::serialize(arg)?;

let resp = self
.transport
**.send( // wasmbus-rpc
ctx,
Message {
method: "KeyValue.Increment",
arg: Cow::Borrowed(&buf),
},
None,
)**
.await?;

let value: i32 = wasmbus_rpc::common::deserialize(&resp)
.map_err(|e| RpcError::Deser(format!("'{}': I32", e)))?;
Ok(value)
}

总结

根据上面的分析,我们初探了 interface 的具体代码生成规则:

  • 根据契约定义的不同,生成对应的结构体。包括序列化和反序列化
  • 包装调用方法,抽象出 Sender 和 Receiver 两个 trait
    • receiver 默认实现了 dispatch。里面根据具体调用的 method 进行数据的序列化和反序列化
    • sender 则根据不同的方法,生成了不同的 sender method,这样调用 Sender 的 method 时,就能直接发送对应的请求,请求体包含了 method 和 params
  • actor 实际调用 interface 时,就是向 runtime 发送了具体的 rpc 请求。再由 runtime 去调度对应的 provider 实现,进而返回。