The Default Connection
The connection layer is responsible for exchanging the state with other peers. As mentioned before, this is probably the more involved layer to implement. It is involved because one has to decide how the state should be exchanged and there are usually trade-offs to make. Exchanging to fast will be resource intensive, while exchanging to slow will extend the imbalance of the shared state.
Behavior
The Default
connection layer uses two threads to exchange the state with other peers: The publisher and receiver threads.
The publisher
thread publishes the only the changes made since the last publish time.
The receiver
thread will connect and pull the full state from a peer if they both hold a different version of the state.
Apart from the two threads for exchanging the state, the connection layer runs a HTTP server to allow other peers to connect and exchange the state with it.
It exposes two endpoints: one for retrieving the state if the other for setting the state.
Get
#![allow(unused)] fn main() { fn get_handler(state: state::SafeState, req: &Request<Body>) -> Response<Body> { let versions_match = req.uri().path().split('/').last().and_then(|version| { trace!("get_handler / version {} ({})", version, state.version()); (version.is_empty() || version != state.version()).into() }).unwrap(); if versions_match { Responses::ok(state.get_root().unwrap().into()) } else { Responses::no_content() } } }
Here, the Default
connection layer compares the version it holds against the version reported by the connected peer. If the versions do not match then
it will return the full state. Otherwise nothing will be returned.
Set
#![allow(unused)] fn main() { fn set_handler<'a>( state: state::SafeState, req: Request<Body>, ) -> impl FutureExt<Output = Result<Response<Body>>> + 'a { hyper::body::to_bytes(req.into_body()) .and_then(move |body| async move { let body = &body as &dyn state::StateValue; let result = state.set(body); Ok(match result { Ok(_) => Responses::no_content(), _ => Responses::unprocessable(None), }) }) .map_err(|e| e.into()) } }
The Default
connection layer exposes this function to allow other peers to publish their state. It will accept the state and set it to the State
layer.
The Receiver Thread
The receiver thread connects to other peers and sends a HTTP GET
request while specifying the version it holds.
It uses a peer provider to get the full list of available peers and randomly chooses a few of them.
It them loops through the responses and sets them to the state.
#![allow(unused)] fn main() { async fn receiver(&self, state: state::SafeState) -> Result<()> { loop { // sample r0 peers let peers = self.peer_provider.get(); let peers = peers.into_iter().sample(self.r0); let res = stream::iter(peers) .map(|peer| { let url = format!("http://{}:{}/{}", peer.ip(), peer.port().unwrap_or(self.target_port.unwrap_or(self.port)), state.version()); let timeout = self.timeout; tokio::spawn(async move { trace!("Retreiving from {}", url); let client = Client::builder() .connect_timeout(Duration::from_millis(timeout)) .build().unwrap(); let result = client .get(&url.to_string()) .send() .await? .bytes() .await; result }) }) .buffer_unordered(4); res.collect::<Vec<_>>().await.iter().for_each(|result| { if let Ok(Ok(result)) = result { if let Err(e) = state.set(result as &dyn state::StateValue) { warn!("Failed to set peer response to state; {}", e); } } }); time::delay_for(time::Duration::from_millis(self.pull_interval)).await; } } }
The Publisher Thread
The publisher thread connects to other peers and publishes the changes since last publish time.
#![allow(unused)] fn main() { async fn publisher(&self, state: state::SafeState) -> Result<()> { let mut last_published: Vec<u8> = Vec::<u8>::default(); let mut last_published_version = String::default(); loop { if last_published_version == state.version() { time::delay_for(time::Duration::from_millis(self.push_interval)).await; continue; } let last = last_published.clone(); let state_clone = state.clone(); let res = tokio::task::spawn_blocking(move || { // get the recent state let root = state_clone.get_root().unwrap().as_bytes().unwrap(); if root == last { return None; } let state_to_publish = state_clone.diff(&last).and_then(|diff| { Ok(diff.as_bytes().unwrap()) }).or::<Vec<u8>>(Ok(root.clone())).unwrap(); Some((state_to_publish, root)) }).await?; if res.is_none() { time::delay_for(time::Duration::from_millis(self.push_interval)).await; continue; } let (state_to_publish, last) = res.unwrap(); last_published = last; last_published_version = state.version(); // sample r0 peers let peers = self.peer_provider.get(); let peers = peers.into_iter().sample(self.r0); // start sending to peers in parallel let res = stream::iter(peers) .map(|peer| { let state_to_publish = state_to_publish.clone(); let url = format!("http://{}:{}/", peer.ip(), peer.port().unwrap_or(self.target_port.unwrap_or(self.port))); let timeout = self.timeout; tokio::spawn(async move { trace!("Sending to {}", url); let client = Client::builder() .connect_timeout(Duration::from_millis(timeout)) .build().unwrap(); let result = client .put(&url.to_string()) .body(state_to_publish) .send() .await? .bytes() .await; result }) }) .buffer_unordered(4); res.collect::<Vec<_>>().await.iter().for_each(|result| { if let Ok(Ok(result)) = result { if let Err(e) = state.set(result as &dyn state::StateValue) { warn!("Failed to set peer response to state; {}", e); } } }); time::delay_for(time::Duration::from_millis(self.push_interval)).await; } } }