kvredis 和 http-server-rs 两个 provider 的 main 函数入口如下:
kvredis/src/main.rs
1 2 3 4 5 6 7 8 9 10 11 12
fnmain() -> Result<(), Box<dyn std::error::Error>> { // handle lattice control messages and forward rpc to the provider dispatch // returns when provider receives a shutdown control message // 执行后,常驻 runtime,和 rpc 消息进行交互,直到接收到关闭指令后,才会退出。 provider_main( KvRedisProvider::default(), Some("KeyValue Redis Provider".to_string()), )?;
eprintln!("KVRedis provider exiting"); Ok(()) }
httpserver-rs/bin/main.rs
1 2 3 4 5 6 7 8 9 10 11
fnmain() -> Result<(), Box<dyn std::error::Error>> { // handle lattice control messages and forward rpc to the provider dispatch // returns when provider receives a shutdown control message provider_main( HttpServerProvider::default(), Some("HttpServer Provider".to_string()), )?;
/// Start provider services: tokio runtime, logger, nats, and rpc subscriptions, pubfn **provider_start**<P>( provider_dispatch: P, host_data: HostData, friendly_name: Option<String>, ) -> Result<(), Box<dyn std::error::Error>> where P: ProviderDispatch + Send + Sync + Clone + 'static, { // 这里的 runtime 是一个 tokio runtime 的库。 // 简单来说,就是提供了诸如:并发、异步等编程所用的运行时,用用户态的调度方式来管理并发编程,类似 Go 的协程管理,Erlang 的进程管理等 let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() //.enable_io() .build()?;
// block_on 就是执行一个异步操作,等待其结果 **runtime.block_on(async { provider_run(provider_dispatch, host_data, friendly_name).await })?;** // in the unlikely case there are any stuck threads, // close them so the process has a clean exit runtime.shutdown_timeout(core::time::Duration::from_secs(10)); Ok(()) }
// 初始化 nats-server,建立链接 let nats_addr = if !host_data.lattice_rpc_url.is_empty() { host_data.lattice_rpc_url.as_str() } else { crate::provider::DEFAULT_NATS_ADDR }; let nats_server = async_nats::ServerAddr::from_str(nats_addr).map_err(|e| { RpcError::InvalidParameter(format!("Invalid nats server url '{}': {}", nats_addr, e)) })?;
let nc = crate::rpc_client::with_connection_event_logging( match ( host_data.lattice_rpc_user_jwt.trim(), host_data.lattice_rpc_user_seed.trim(), ) { ("", "") => async_nats::ConnectOptions::default(), (rpc_jwt, rpc_seed) => { let key_pair = std::sync::Arc::new(nkeys::KeyPair::from_seed(rpc_seed).unwrap()); let jwt = rpc_jwt.to_owned(); async_nats::ConnectOptions::with_jwt(jwt, move |nonce| { let key_pair = key_pair.clone(); async move { key_pair.sign(&nonce).map_err(async_nats::AuthError::new) } }) } }, ) .connect(nats_server) .await?;
// initialize HostBridge // HostBriage 管理 nats 和 host 之间的通信 let bridge = HostBridge::new_client(nc, &host_data)?; set_host_bridge(bridge).ok(); let bridge = get_host_bridge();
// pre-populate provider and bridge with initial set of link definitions // initialization of any link is fatal for provider startup // 如果已经有相关的 link 定义,那么在启动 provider 时候,需要恢复这些 link 信息 let initial_links = host_data.link_definitions.clone(); for ld in initial_links.into_iter() { ifletErr(e) = provider_dispatch.put_link(&ld).await { eprintln!( "Failed to initialize link during provider startup - ({:?}): {:?}", &ld, e ); } else { bridge.put_link(ld).await; } }
// run until we receive a shutdown request from host // 直到接收到 shutdown 请求,不然一直在这里监听 **let _ = shutdown_rx.recv().await;**
// close chunkifiers let _ = tokio::task::spawn_blocking(crate::chunkify::shutdown).await;
// flush async_nats client bridge.flush().await;
Ok(()) }
在我们添加完 provider 后,wasmcloud host 输出的日志如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
[debug] StateMonitor handle info wasmbus.evt.default [debug] Handling host heartbeat **[debug] Received control interface request on wasmbus.ctl.default.cmd.NBTXBYKGMTTLY6ZJHUMW4MBO2R6CJBPE5VC3VQ6XON3TMO4DASQ4AG2H.lp** [debug] StateMonitor handle info wasmbus.evt.default [debug] Handling host heartbeat [info] Starting executable capability provider from '/var/folders/wj/68rwh9s94w17h9z0frb4h6w00000gn/T/wasmcloudcache/VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB/1660779368/wasmcloud_keyvalue_default' **[debug] Publishing OCI ref map for"wasmcloud.azurecr.io/kvredis:0.16.3"** [debug] StateMonitor handle info wasmbus.evt.default span_id=e2b0fb00ba96b407 trace_id=8bdcf587b24a5221c1ea0a9bf721d630 [**debug] Successfully started provider wasmcloud.azurecr.io/kvredis:0.16.3 (default)** [debug] StateMonitor handle info wasmbus.evt.default [debug] Cached claims for VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB [debug] Cached OCI map reference from wasmcloud.azurecr.io/kvredis:0.16.3 to VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB RUST_LOG was not set or the given directive was invalid: FromEnvError { kind: Env(NotPresent) } Defaulting logger to `info` level **Starting capability provider VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB instance 8efc5842-5b6e-4400-afeb-95c6207deb29 with nats url 127.0.0.1:4222** [debug] StateMonitor handle info wasmbus.evt.default [debug] Handling successful health check for VAZVC4RX54J2NVCMCW7BPCAHGGG5XZXDBXFUMDUXGESTMQEJLC3YVZWB
/// Handle provider control commands /// put_link (new actor link command), del_link (remove link command), and shutdown #[async_trait] impl ProviderHandler for KvRedisProvider { /// Provider should perform any operations needed for a new link, /// including setting up per-actor resources, and checking authorization. /// If the link is allowed, return true, otherwise return false to deny the link. **// put_link 的时候建立链接** #[instrument(level = "debug", skip(self, ld), fields(actor_id = %ld.actor_id))] async fn **put_link**(&self, ld: &LinkDefinition) -> RpcResult<bool> { let redis_url = match ld.values.get(REDIS_URL_KEY) { Some(v) => v.as_str(), None => DEFAULT_CONNECT_URL, }; let client = redis::Client::open(redis_url).map_err(|e| { RpcError::ProviderInit(format!("redis connection to {}: {}", redis_url, e)) })?; let connection = client.get_async_connection().await.map_err(|e| { RpcError::ProviderInit(format!("redis connection to {}: {}", redis_url, e)) })?;
/// Handle notification that a link is dropped - close the connection **// delete_link 的时候,移除链接** #[instrument(level = "info", skip(self))] async fn **delete_link**(&self, actor_id: &str) { letmut aw = self.actors.write().await; ifletSome(conn) = aw.remove(actor_id) { info!("redis closing connection for actor {}", actor_id); drop(conn) } }
/// Handle shutdown request by closing all connections // shutdown 时关闭所有链接 async fn **shutdown**(&self) -> Result<(), Infallible> { letmut aw = self.actors.write().await; // empty the actor link data and stop all servers for (_, conn) in aw.drain() { drop(conn) } Ok(()) } }
我们看到在 http-server-rs(一个实现 http server 的 provider) 中,有类似处理
#[async_trait] impl ProviderHandler for HttpServerProvider { /// Provider should perform any operations needed for a new link, /// including setting up per-actor resources, and checking authorization. /// If the link is allowed, return true, otherwise return false to deny the link. async fn **put_link**(&self, ld: &LinkDefinition) -> Result<bool, RpcError> { let settings = load_settings(&ld.values).map_err(|e| RpcError::ProviderInit(e.to_string()))?;
**let http_server = HttpServerCore::new(settings.clone(), get_host_bridge());** http_server.start(ld.clone()).await.map_err(|e| { RpcError::ProviderInit(format!( "starting httpserver for {} {:?}: {}", &ld.actor_id, &settings.address, e )) })?;
/// Handle notification that a link is dropped - stop the http listener async fn **delete_link**(&self, actor_id: &str) { letmut aw = self.actors.write().await; ifletSome(server) = aw.remove(actor_id) { tracing::info!(%actor_id, "httpserver stopping listener for actor"); **server.begin_shutdown().await;** } }
/// Handle shutdown request by shutting down all the http server threads async fn **shutdown**(&self) -> Result<(), Infallible> { letmut aw = self.actors.write().await; // empty the actor link data and stop all servers **for (_, server) in aw.drain() { server.begin_shutdown().await; }** Ok(()) } }