Dynamo KVindexer


A source-code walkthrough of the Dynamo KV indexer, with some help from a Dynamo code analysis published on Zhihu.

The indexer is mainly written in Rust.

1 KV Router

The KV router initializes the KV indexer and calls its find_matches_for_request method.

kv_listener = self.runtime.namespace("dynamo").component("VllmWorker")
await kv_listener.create_service()
self.indexer = KvIndexer(kv_listener, self.args.block_size)

scores = await self.indexer.find_matches_for_request(
    request.tokens, lora_id
)

2 KV Indexer

First of all, the KV Indexer returns an OverlapScores object, which contains two fields:

  1. scores is a HashMap that maps each worker_id to its number of matched blocks
  2. frequencies holds the recent-use frequency of each matched block

Here is the logic of find_matches_for_request:
    find_matches_for_request(tokens)
     let sequence = compute_block_hash_for_seq(tokens, self.kv_block_size)
         tokens.chunks_exact(kv_block_size)  // split the tokens into full blocks
         bytes = num.to_le_bytes()  // convert each i32 to its little-endian bytes
         compute_block_hash(&Bytes::from(bytes))  // convert the byte Vec to Bytes and hash it
     self.find_matches(sequence).await
         let req = MatchRequest {
             sequence,
             early_exit: false,
             resp: resp_tx,
         }
         self.match_tx.send(req)  // send out the MatchRequest
         resp_rx.await  // asynchronously wait for the match response
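
The flow above can be modelled in a few lines of Python. This is only a sketch under stated assumptions, not the Dynamo implementation: BLAKE2b stands in for whatever hash compute_block_hash really uses, an asyncio.Queue and asyncio.Future stand in for match_tx and the resp_tx/resp_rx pair, and indexer_task and demo are hypothetical helpers added so the example runs on its own.

```python
import asyncio
import hashlib
import struct
from dataclasses import dataclass


def compute_block_hash(data: bytes) -> int:
    # Stand-in hash (assumption): BLAKE2b truncated to 64 bits.
    digest = hashlib.blake2b(data, digest_size=8).digest()
    return int.from_bytes(digest, "little")


def compute_block_hash_for_seq(tokens, kv_block_size):
    hashes = []
    # Mirror chunks_exact: a trailing partial block is dropped.
    full = len(tokens) - len(tokens) % kv_block_size
    for start in range(0, full, kv_block_size):
        chunk = tokens[start:start + kv_block_size]
        # Pack each token as 4 little-endian bytes (non-negative ids assumed).
        data = b"".join(struct.pack("<I", t) for t in chunk)
        hashes.append(compute_block_hash(data))
    return hashes


@dataclass
class MatchRequest:
    sequence: list
    early_exit: bool
    resp: asyncio.Future  # plays the role of resp_tx


async def indexer_task(match_queue):
    # Hypothetical consumer: replies with a placeholder instead of real scores.
    while True:
        req = await match_queue.get()
        req.resp.set_result({"num_blocks": len(req.sequence)})


async def find_matches_for_request(match_queue, tokens, kv_block_size):
    sequence = compute_block_hash_for_seq(tokens, kv_block_size)
    resp = asyncio.get_running_loop().create_future()
    # Send out the request, then wait for the reply on the future.
    await match_queue.put(MatchRequest(sequence, early_exit=False, resp=resp))
    return await resp


async def demo():
    queue = asyncio.Queue()
    worker = asyncio.create_task(indexer_task(queue))
    result = await find_matches_for_request(queue, [1, 2, 3, 4, 5], 2)
    worker.cancel()
    return result
```

Running asyncio.run(demo()) hashes two full blocks from the five tokens; the fifth token does not fill a block and is ignored, which is the behaviour chunks_exact implies.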
    

3 KV RadixTree

The find_matches method of the KV RadixTree performs the actual matching.

pub fn find_matches(&self, sequence: Vec<LocalBlockHash>, early_exit: bool)
    let mut current = self.root.clone()
    for block_hash in sequence
        next_block = current.children.get(&block_hash).cloned()
        if let Some(block) = next_block
        scores.update_scores(&block.borrow().workers)
            let score = self.scores.entry(*worker).or_insert(0);  // initialize the worker's score to 0
            *score += 1;  // increment the worker's counter
        scores.add_frequency(block_mut.recent_uses.len())
            // block_mut.recent_uses holds the uses of this block that are newer than the expiration duration
            // its length is the frequency of recent uses
            self.frequencies.push(frequency);
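
The traversal can be sketched in Python as follows. The class and field names follow the outline above, but the insert helper is hypothetical (it only exists to build a tree for the example), the sketch leaves out early_exit because the outline does not show how it is used, and it does not expire old entries in recent_uses.

```python
from dataclasses import dataclass, field


@dataclass
class RadixBlock:
    children: dict = field(default_factory=dict)     # block hash -> RadixBlock
    workers: set = field(default_factory=set)        # workers caching this block
    recent_uses: list = field(default_factory=list)  # timestamps of recent uses


@dataclass
class OverlapScores:
    scores: dict = field(default_factory=dict)       # worker_id -> matched blocks
    frequencies: list = field(default_factory=list)  # one entry per matched block

    def update_scores(self, workers):
        for worker in workers:
            # Same effect as entry(worker).or_insert(0) followed by += 1.
            self.scores[worker] = self.scores.get(worker, 0) + 1

    def add_frequency(self, frequency):
        self.frequencies.append(frequency)


def insert(root, sequence, worker):
    # Hypothetical helper: record that `worker` caches this block sequence.
    current = root
    for block_hash in sequence:
        current = current.children.setdefault(block_hash, RadixBlock())
        current.workers.add(worker)


def find_matches(root, sequence):
    scores = OverlapScores()
    current = root
    for block_hash in sequence:
        block = current.children.get(block_hash)
        if block is None:
            break  # no worker caches a longer prefix
        scores.update_scores(block.workers)
        scores.add_frequency(len(block.recent_uses))
        current = block
    return scores
```

Because each tree level corresponds to one block of the prefix, a worker's score is the number of leading blocks of the request that it already caches.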

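The following example (translated from the Chinese analysis) shows how the returned scores are built up:

  1. Suppose the input sequence is ["我爱北京城门"] ("I love Beijing's city gate") and kv_block_size=2. Suppose that after the tokenizing, chunking, byte conversion, and hashing described above it becomes the sequence [[432, 265],[251,234],[673,654]]. Matching then proceeds as follows:
  2. For the first block [432, 265], workers = {1, 2, 3}, meaning workers 1, 2, and 3 have all cached this block. After the update, scores = { 1:1, 2:1, 3:1 }
  3. For the second block [251,234], workers = {1, 2}, meaning only workers 1 and 2 have cached this block. After the update, scores = { 1:2, 2:2, 3:1 }
  4. For the third block [673,654], workers = {1}, meaning only worker 1 has cached this block. After the update, scores = { 1:3, 2:2, 3:1 }
  5. Once matching is complete, these scores are returned and used in the _cost_function calculation.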
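
The arithmetic in this example can be checked with a short Python snippet. The cached_by mapping is an illustrative stand-in for the workers stored on each tree node; it is not a Dynamo data structure.

```python
# Blocks of the example sequence, in prefix order.
blocks = [(432, 265), (251, 234), (673, 654)]

# Illustrative stand-in: which workers cache each block.
cached_by = {
    (432, 265): {1, 2, 3},
    (251, 234): {1, 2},
    (673, 654): {1},
}

scores = {}
history = []  # snapshot of scores after each block
for block in blocks:
    for worker in sorted(cached_by[block]):
        scores[worker] = scores.get(worker, 0) + 1
    history.append(dict(scores))

for step, snapshot in enumerate(history, start=1):
    print(f"after block {step}: {snapshot}")
```

The three snapshots match steps 2 to 4 of the example, ending with scores = {1: 3, 2: 2, 3: 1}.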
