The Default State

In this chapter we walk through the implementation of the Default state layer. By the end of the chapter you should be able to implement your own state layer.

Behavior

Before diving into the code, let's first describe how the state behaves.

The Store

The Default state layer holds a HashMap that maps String keys to Values. A Value is a structure that holds an optional TTL, a timestamp and the value itself, which is a serde_json::Value.

When the state layer receives a StateValue to set, it deserializes it into a HashMap and merges that map into its own HashMap.

In other words, the Default state layer expects the value to be a JSON object that maps each key to an object holding an optional TTL, a timestamp and a value, which can be anything JSON compatible.
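To make that shape concrete, here is a minimal std-only sketch of the store. It assumes a plain `String` stands in for `serde_json::Value` so it runs without external crates; `Store` and `example_payload` are hypothetical names used only for illustration. The comment shows a JSON payload of the kind described above.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the chapter's `Value` struct.
/// Assumption: `value` is a `String` here instead of a `serde_json::Value`.
#[derive(Clone, Debug, PartialEq)]
pub struct Value {
    pub value: String,    // the payload (any JSON in the real layer)
    pub ts: u64,          // timestamp used for conflict resolution
    pub ttl: Option<u64>, // optional absolute expiry time (epoch ms)
}

/// The store: String keys mapped to boxed values.
pub type Store = HashMap<String, Box<Value>>;

// A set payload such as
//
//   { "greeting": { "value": "hello", "ts": 1700000000000 },
//     "session":  { "value": "abc", "ts": 1700000000500, "ttl": 1700000060000 } }
//
// decodes into one map entry per top-level key, built by hand below.
pub fn example_payload() -> Store {
    let mut payload = Store::new();
    payload.insert(
        "greeting".to_string(),
        Box::new(Value { value: "hello".into(), ts: 1_700_000_000_000, ttl: None }),
    );
    payload.insert(
        "session".to_string(),
        Box::new(Value {
            value: "abc".into(),
            ts: 1_700_000_000_500,
            ttl: Some(1_700_000_060_000),
        }),
    );
    payload
}
```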

Resolving Conflicts

To resolve conflicts when two peers hold different versions of the same key, the state layer uses a timestamp. The timestamp is set when a key is committed to the state, so the version with the newer timestamp always "wins" the conflict.
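The rule can be sketched as a small pure function. `resolve` is a hypothetical helper, not part of the Default state's API; it mirrors the `v.ts < right.ts` comparison used by `set` later in this chapter, which means that on a tie the value already in the store is kept.

```rust
/// Minimal stand-in for a stored value; only the timestamp matters here.
#[derive(Clone, Debug, PartialEq)]
pub struct Value {
    pub value: String,
    pub ts: u64,
}

/// Last-writer-wins: returns the version that should be kept.
/// The incoming value replaces the current one only if its timestamp is
/// strictly newer, so on a tie the value already held is kept.
pub fn resolve<'a>(current: Option<&'a Value>, incoming: &'a Value) -> &'a Value {
    match current {
        Some(cur) if cur.ts >= incoming.ts => cur,
        _ => incoming,
    }
}
```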

The Code

Let's explore the code.

The Default struct

This struct holds the configuration of the state and implements the State trait.


use std::collections::HashMap;
use std::sync::{mpsc, Arc, RwLock};

use serde::{Deserialize, Serialize};

// `DataSeeder` and `Value` are defined elsewhere in the crate.
// Assumes `Box<dyn DataSeeder>` implements Serialize/Deserialize
// (for example through typetag), since that field is not skipped.
#[derive(Clone, Serialize, Deserialize)]
pub struct Default {
    /// The default TTL (in milliseconds) to use if none is specified when setting a new value.
    ttl: Option<u64>,

    /// The interval in milliseconds in which to purge expired values.
    ///
    /// Default value is 1 minute (60000 milliseconds).
    purge_interval: u64,

    /// The DataSeeder to use for seeding the data on initialization.
    data_seeder: Option<Arc<RwLock<Box<dyn DataSeeder>>>>,

    /// The version of the current state.
    ///
    /// It changes whenever the data changes and is recalculated lazily (see `is_dirty`).
    #[serde(skip_serializing, skip_deserializing)]
    version: Arc<RwLock<String>>,

    /// The SyncSender channel to use for async set operations.
    ///
    /// When a set operation is committed to the state, the state
    /// passes the operation to an async handler, which then commits the
    /// changes to the state.
    #[serde(skip_serializing, skip_deserializing)]
    tx: Option<mpsc::SyncSender<Vec<u8>>>,

    /// The data storage in the form of a Key/Value hashmap.
    #[serde(skip_serializing, skip_deserializing)]
    storage: Arc<RwLock<HashMap<String, Box<Value>>>>,

    /// Calculating the version is a bit expensive, so we use
    /// the dirty flag to lazily calculate the version on demand.
    #[serde(skip_serializing, skip_deserializing)]
    is_dirty: Arc<RwLock<bool>>,
}

ttl - The Default state layer offers an option to set a default TTL value for keys that do not specify an explicit TTL.

purge_interval - The state never returns expired keys, but for the sake of performance it keeps them in memory until they are purged. This field sets how often (in milliseconds) the purge runs.

data_seeder - If a data seeder is configured, the state uses it to load the initial data when it first initializes.

version - The state calculates its version by combining (XOR) the hashes of all the keys and their respective timestamps. Note that the version is behind a lock to satisfy the thread-safety requirement.

tx - Setting a large value to the state might be a CPU-intensive operation, so the state performs it in the background on a dedicated thread. This field is the sending half of the channel through which new set operations are submitted.

storage - The HashMap that holds the data. Note that the HashMap is behind a lock because the state must be thread-safe.

is_dirty - For performance reasons, the state uses this flag to perform some operations lazily. Calculating the state version, for example, is costly, so we want to calculate it only on demand and only after the data has changed.
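The version and is_dirty fields work together. Below is a minimal sketch, assuming the version is rendered as a hex string of the XOR of one hash per (key, timestamp) pair; the exact hash function and formatting used by the Default state are not shown in this chapter, so `DefaultHasher` and the hypothetical `VersionedStore` type are illustrative choices.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::RwLock;

/// Hypothetical, trimmed-down state holding only what the version needs.
pub struct VersionedStore {
    storage: RwLock<HashMap<String, u64>>, // key -> timestamp
    version: RwLock<String>,
    is_dirty: RwLock<bool>,
}

impl VersionedStore {
    pub fn new() -> Self {
        VersionedStore {
            storage: RwLock::new(HashMap::new()),
            version: RwLock::new(String::new()),
            is_dirty: RwLock::new(true),
        }
    }

    /// Any write marks the state dirty; the version is not touched here.
    pub fn put(&self, key: &str, ts: u64) {
        self.storage.write().unwrap().insert(key.to_string(), ts);
        *self.is_dirty.write().unwrap() = true;
    }

    /// Recomputes the version only if the data changed since the last call.
    pub fn version(&self) -> String {
        let mut dirty = self.is_dirty.write().unwrap();
        if *dirty {
            let storage = self.storage.read().unwrap();
            // XOR is commutative, so the HashMap's iteration order does not matter.
            let combined = storage.iter().fold(0u64, |acc, (key, ts)| {
                let mut hasher = DefaultHasher::new();
                key.hash(&mut hasher);
                ts.hash(&mut hasher);
                acc ^ hasher.finish()
            });
            *self.version.write().unwrap() = format!("{:016x}", combined);
            *dirty = false;
        }
        self.version.read().unwrap().clone()
    }
}
```

Because each key appears at most once in the map, no two identical hashes can cancel each other out in the XOR.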

As you will learn in a later chapter, some of the fields above are loaded by the configuration component of the C19 agent. Specifically, the ttl, purge_interval and data_seeder fields are configured by the user.

The Value

The Value struct describes the values that we hold in our state.


use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize)]
struct Value {
    /// A serde_json::Value to hold any value that can be serialized into JSON format.
    value: serde_json::Value,

    /// The timestamp when this value was first created.
    #[serde(default = "epoch")]
    ts: u64,

    /// An optional TTL, resolved to an absolute epoch time at which this value expires.
    ttl: Option<u64>,
}

value - A serde_json::Value to hold the value itself.

ts - A timestamp representing the time when the key was first created.

ttl - An optional expiry time for this value, stored as an absolute epoch time rather than a duration.
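The expiry check that `get` and `set` rely on (`is_expired`) is not shown in this chapter. Here is a minimal sketch, assuming `ttl` holds an absolute expiry time in epoch milliseconds as the struct comment says, and that a value counts as expired once that time is reached. The helpers `now_ms`, `is_expired_at`, `resolve_ttl` and `purge_expired` are hypothetical; `purge_expired` shows roughly what a purge pass driven by `purge_interval` could do.

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Clone, Debug)]
pub struct Value {
    pub value: String,    // stand-in for serde_json::Value
    pub ts: u64,
    pub ttl: Option<u64>, // absolute expiry time in epoch milliseconds
}

/// Current wall-clock time in milliseconds since the Unix epoch.
pub fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before 1970")
        .as_millis() as u64
}

impl Value {
    /// Expired once the absolute expiry time is reached; no TTL means never.
    pub fn is_expired_at(&self, now: u64) -> bool {
        matches!(self.ttl, Some(expires_at) if expires_at <= now)
    }

    pub fn is_expired(&self) -> bool {
        self.is_expired_at(now_ms())
    }
}

/// Turns a relative TTL (a duration in ms, like the Default state's `ttl`
/// field) into the absolute form stored in `Value::ttl`.
pub fn resolve_ttl(relative_ms: u64, now: u64) -> u64 {
    now.saturating_add(relative_ms)
}

/// One purge pass: drops every expired entry in place and returns how many were removed.
pub fn purge_expired(storage: &mut HashMap<String, Box<Value>>, now: u64) -> usize {
    let before = storage.len();
    storage.retain(|_, v| !v.is_expired_at(now));
    before - storage.len()
}
```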

Implementing the State Trait

Let's explore a few of the more interesting functions of the State trait.

Init

To conform to the State trait we first have to implement the init function.


fn init(&self) -> SafeState;

The init function is expected to return a SafeState, a type alias for an Arc that wraps the state so that it can be shared across threads.
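The chapter does not show the alias itself. Here is a minimal sketch, assuming `SafeState` is an `Arc` around a `State` trait object whose trait requires `Send + Sync`. The `State` trait here is trimmed to one hypothetical method, `name`, and `Dummy` and `share` exist only for the illustration.

```rust
use std::sync::Arc;
use std::thread;

/// Trimmed-down, hypothetical version of the State trait. The Send + Sync
/// bounds are what allow the Arc to be moved into other threads.
pub trait State: Send + Sync {
    fn name(&self) -> String;
}

/// Assumed shape of the alias: a reference-counted, thread-safe trait object.
pub type SafeState = Arc<dyn State>;

pub struct Dummy;

impl State for Dummy {
    fn name(&self) -> String {
        "default".to_string()
    }
}

/// Each worker gets its own clone of the Arc; all clones point to the same state.
pub fn share(state: SafeState, workers: usize) -> Vec<String> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let s = Arc::clone(&state);
            thread::spawn(move || s.name())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```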


fn init(&self) -> state::SafeState {
    let mut this = self.clone();

    // if we have a data seeder then use it to seed the data
    if let Some(data_seeder) = this.data_seeder.clone() {
        info!("Seeding data...");
        if let Err(e) = this.seed(data_seeder) {
            warn!("Failed to seed data; ({})", e);
        }
    }

    // create the bounded channel used for async set operations
    let (tx, rx) = mpsc::sync_channel(MAX_SET_OPS);
    this.tx = Some(tx);

    let this = Arc::new(this);

    // start the async_set consumer thread
    let t = this.clone();
    tokio::task::spawn_blocking(move || async_set(t, rx));

    // start the purger task
    tokio::spawn(purge(this.clone()));

    this
}

The Default state first seeds the data using the data seeder, if one is provided. It then creates the channel for async set operations and starts two background tasks: one that consumes async set operations and one that purges expired values.

Finally, it returns the configured copy of the state wrapped in an Arc.
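The channel-plus-consumer pattern that init sets up can be sketched with the standard library alone. This sketch makes several simplifying assumptions. It uses `std::thread::spawn` instead of `tokio::task::spawn_blocking`, since tokio's documentation points to `std::thread::spawn` for work that does not finish on its own. It stores plain strings instead of `Value`s, and it assumes a toy `key=value` payload format. `start`, `Storage` and the value chosen for `MAX_SET_OPS` are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};

/// Hypothetical bound on queued operations (stands in for MAX_SET_OPS).
const MAX_SET_OPS: usize = 16;

pub type Storage = Arc<RwLock<HashMap<String, String>>>;

/// Consumer loop: decode each raw payload and apply it to storage.
/// It ends once every SyncSender has been dropped.
fn async_set(storage: Storage, rx: Receiver<Vec<u8>>) {
    for bytes in rx {
        // Assumed wire format for this sketch only: "key=value" in UTF-8.
        let text = match String::from_utf8(bytes) {
            Ok(t) => t,
            Err(_) => continue, // skip undecodable payloads
        };
        if let Some((key, value)) = text.split_once('=') {
            storage.write().unwrap().insert(key.to_string(), value.to_string());
        }
    }
}

/// Creates the bounded channel and starts the consumer on a dedicated thread.
pub fn start(storage: Storage) -> (SyncSender<Vec<u8>>, JoinHandle<()>) {
    let (tx, rx) = mpsc::sync_channel(MAX_SET_OPS);
    let handle = thread::spawn(move || async_set(storage, rx));
    (tx, handle)
}
```

Because `sync_channel` is bounded, a sender blocks once `MAX_SET_OPS` operations are queued, which gives the writer natural back-pressure.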

Get


fn get(&self, key: &dyn StateValue) -> Option<Box<dyn StateValue>> {
    // The caller must pass a key whose bytes form a valid UTF-8 string.
    let key: String = String::from_utf8(key.as_bytes().unwrap_or_default()).unwrap();

    // Clone only the matching entry; the read guard is released at the end of this statement.
    let entry = self.storage.read().unwrap().get(&key).cloned();

    entry
        .filter(|v| !v.is_expired())
        .map(|v| v.into())
}

The state first decodes the key bytes into a String. It is up to the caller to make sure the key is a StateValue whose bytes form a valid UTF-8 string.

Then, the state acquires a read lock on the storage (recall that the storage is behind an RwLock), looks up the key and filters the value out if it has expired.
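Here is a std-only sketch of the same lookup, under simplifying assumptions. The key arrives as raw bytes, values hold a `String` instead of a `serde_json::Value`, and the current time is passed in explicitly so that the expiry filter is easy to test. `Store` is a hypothetical type. This sketch returns `None` for a key that is not valid UTF-8 rather than panicking, and it clones only the matching entry while the read lock is held.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Clone, Debug, PartialEq)]
pub struct Value {
    pub value: String,    // stand-in for serde_json::Value
    pub ttl: Option<u64>, // absolute expiry time in epoch ms
}

pub struct Store {
    pub storage: RwLock<HashMap<String, Box<Value>>>,
}

impl Store {
    /// Decodes the key bytes, takes a shared read lock, and returns a clone
    /// of the entry unless it has expired at `now`.
    pub fn get(&self, key: &[u8], now: u64) -> Option<Value> {
        let key = std::str::from_utf8(key).ok()?; // invalid UTF-8 -> None
        let storage = self.storage.read().unwrap(); // many readers may hold this at once
        storage
            .get(key)
            .filter(|v| v.ttl.map_or(true, |expires_at| expires_at > now))
            .map(|v| (**v).clone())
    }
}
```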

Set


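fn set(&self, map: &HashMap<String, Box<Value>>) {
    use std::collections::hash_map::Entry;

    let mut is_dirty = false;

    // merge the incoming map into the storage
    let mut storage = self.storage.write().unwrap();
    for (key, right) in map {
        if right.is_expired() {
            continue;
        }

        let mut right = right.clone();

        // Value::ttl holds an absolute epoch time, so the relative default TTL
        // is added to the current time (assumes epoch() returns the current time in ms)
        if right.ttl.is_none() {
            right.ttl = self.ttl.map(|ttl| epoch() + ttl);
        }

        match storage.entry(key.clone()) {
            // last writer wins: replace only if the incoming value is newer
            Entry::Occupied(mut entry) => {
                if entry.get().ts < right.ts {
                    entry.insert(right);
                    is_dirty = true;
                }
            }
            Entry::Vacant(entry) => {
                entry.insert(right);
                is_dirty = true;
            }
        }
    }

    // only raise the flag here; clearing it is left to the version calculation
    if is_dirty {
        *self.is_dirty.write().unwrap() = true;
    }
}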

The Default state layer implements set operations asynchronously: a dedicated thread consumes a channel of serialized set operations. After deserializing each StateValue into a HashMap, that thread calls the set function above to apply the change.

Here, the state merges the incoming map into its own storage, keeping whichever version of a key has the newer timestamp, and marks the state as dirty if anything changed. This way, lazy operations such as recalculating the version are performed only when the state has actually changed.
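Below is a std-only sketch of the merge step, written so that the dirty flag reflects real changes. It matches on `Entry` instead of chaining `and_modify(...).or_insert(...)`. The argument to `or_insert` is evaluated eagerly, so a block that sets a flag there runs even when the key already exists. The sketch also resolves the relative default TTL against the current time, on the assumption that `Value::ttl` stores an absolute epoch time as its doc comment says. `merge`, the `String` payload and the explicit `now` parameter are hypothetical simplifications.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
pub struct Value {
    pub value: String,    // stand-in for serde_json::Value
    pub ts: u64,
    pub ttl: Option<u64>, // absolute expiry time in epoch ms
}

/// Merges `incoming` into `storage` and reports whether anything changed.
/// `default_ttl` is a relative TTL in ms; `now` is the current epoch time in ms.
pub fn merge(
    storage: &mut HashMap<String, Value>,
    incoming: HashMap<String, Value>,
    default_ttl: Option<u64>,
    now: u64,
) -> bool {
    let mut changed = false;
    for (key, mut right) in incoming {
        if matches!(right.ttl, Some(expires_at) if expires_at <= now) {
            continue; // already expired: ignore it
        }
        if right.ttl.is_none() {
            right.ttl = default_ttl.map(|ttl| now.saturating_add(ttl));
        }
        match storage.entry(key) {
            Entry::Occupied(mut slot) => {
                // last writer wins; ties keep the existing value
                if slot.get().ts < right.ts {
                    slot.insert(right);
                    changed = true;
                }
            }
            Entry::Vacant(slot) => {
                slot.insert(right);
                changed = true;
            }
        }
    }
    changed
}
```

A caller would set the shared is_dirty flag to true only when `merge` returns true and never reset it to false there, leaving the reset to whoever recomputes the version.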

Implementing the StateValue

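To implement a StateValue we only have to implement one serialization function, as_bytes, which converts our value into a vector of bytes (Vec<u8>). It returns an Option, so an implementation can return None when serialization fails.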

StateValue Trait


pub trait StateValue: Send + Sync {
    fn as_bytes(&self) -> Option<Vec<u8>>;
}

Value implementing the StateValue Trait


impl StateValue for Value {
    fn as_bytes(&self) -> Option<Vec<u8>> {
        serde_json::to_vec(self).ok()
    }
}

This way the connection and agent layers can pass a StateValue around without knowing its concrete type or how it is encoded.
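Here is a minimal std-only sketch of why the trait object helps. Two hypothetical types, `Key` and `Payload`, implement `StateValue`. A hypothetical `forward` function, standing in for a connection or agent layer, handles them only through `Box<dyn StateValue>`.

```rust
/// Same shape as the StateValue trait above.
pub trait StateValue: Send + Sync {
    fn as_bytes(&self) -> Option<Vec<u8>>;
}

/// Hypothetical key type: a plain string encodes as its UTF-8 bytes.
pub struct Key(pub String);

impl StateValue for Key {
    fn as_bytes(&self) -> Option<Vec<u8>> {
        Some(self.0.as_bytes().to_vec())
    }
}

/// Hypothetical opaque payload, e.g. already-serialized JSON.
pub struct Payload(pub Vec<u8>);

impl StateValue for Payload {
    fn as_bytes(&self) -> Option<Vec<u8>> {
        Some(self.0.clone())
    }
}

/// A layer that only forwards values: it never needs the concrete type.
/// Here it just reports the encoded size of each value (0 if encoding failed).
pub fn forward(values: &[Box<dyn StateValue>]) -> Vec<usize> {
    values
        .iter()
        .map(|v| v.as_bytes().map_or(0, |b| b.len()))
        .collect()
}
```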