Implementing a Key-value Store, Part 2
In the last post, I introduced the idea of linear hashing. This post will describe a Rust implementation of the algorithm. I won’t go through every last line of code, but hopefully enough to give you a good understanding of how the whole thing works. I should also mention that even though this is a post about implementing linear hashing, it spends quite some time talking about how storing data to disk works. This is intentional: articles about hashtable implementations are aplenty; articles about how external-storage data structures work, in my opinion, are not.
You can check out the full source code on GitHub.
The interface
Before diving into how the hashtable will work, let’s first discuss how a client using the hashtable will be expected to use it.
Our hashtable implementation will map byte arrays to byte arrays. That is, we’ll let the client program figure out how to serialize/deserialize keys and values.
This is how a linear hashing table will be created:
```rust
let mut h = LinHash::open("book_ratings", 32, 4);
```
The parameters to `open` are the filename, the length of the key and the length of the value. That means that our key-value pairs (or records) will be stored in the file “book_ratings”; keys will be 32 bytes long (in the example above, it’s probably a string) and values will be 4 bytes long (probably an integer). Keeping key and value sizes fixed is somewhat restrictive (RocksDB, for example, allows you to just pass in bytearrays of any length), but this will simplify how records are stored in our file.
Inserting and looking up work like so (I’ve put an assert for `get` just to give an idea of what the return value looks like):
```rust
h.put(b"Spin", &i32_to_bytearray(9));
h.put(b"Axis", &i32_to_bytearray(6));
// Assumes little-endian architecture
assert_eq!(h.get(b"Spin"), Some(vec![9, 0, 0, 0]));
```
I know this feels clunky, but not having to worry about serialization/deserialization will simplify things quite a bit for us. I should mention that `i32_to_bytearray` (and other similarly named functions in the `util` module) use `std::mem::transmute` under the hood and so are not portable across architectures (hence the comment above about endianness).
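If portability matters, one option is to fix the byte order explicitly instead of relying on `transmute`. The sketch below uses the standard library’s `to_le_bytes`/`from_le_bytes`; the names `i32_to_bytes_le` and `bytes_le_to_i32` are hypothetical and not part of the post’s `util` module.

```rust
/// Encode an i32 as 4 little-endian bytes, independent of the host CPU.
fn i32_to_bytes_le(x: i32) -> [u8; 4] {
    x.to_le_bytes()
}

/// Decode 4 little-endian bytes back into an i32.
/// Panics if `b` is not exactly 4 bytes long (copy_from_slice checks lengths).
fn bytes_le_to_i32(b: &[u8]) -> i32 {
    let mut arr = [0u8; 4];
    arr.copy_from_slice(b);
    i32::from_le_bytes(arr)
}

fn main() {
    let encoded = i32_to_bytes_le(9);
    // Same bytes on big- and little-endian machines.
    assert_eq!(encoded, [9, 0, 0, 0]);
    assert_eq!(bytes_le_to_i32(&encoded), 9);
    assert_eq!(bytes_le_to_i32(&i32_to_bytes_le(-6)), -6);
}
```

With an encoding like this, the `assert_eq!` in the example above would hold regardless of the machine’s endianness.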
Implementation overview
Here’s a high-level description of what happens when you call `put` as in the example above (the rest of the post goes into it in depth, of course). First we hash the key and take however many bits the hashtable is currently set to take (see last post). This tells us which bucket to place the record in. A bucket is a linked list of pages, which are chunks of bytes on disk. Pages in the linked list may be full, so we now need to figure out which page the record should go in. Once we figure that out, the page is fetched from disk. We then make the necessary changes to the page in memory (e.g. write the record, increment the number of records in the page’s header), then save it out to disk.
`get` is very similar and uses the same method (`search_bucket`, described below) that we use to figure out which page in the bucket the record should be placed in.
Pages and buffer pool
A file is divided into pages of size 4KB. When we want to read or write some chunk of bytes in a page, we have to read the page into memory, make whatever changes we want to make on the in-memory copy, then save the updated page out to disk.
Now, we will usually have a little more than 4KB of memory at our disposal, so what we can do is buffer the pages we read and write. So, instead of flushing the page out to disk once we do one operation, we keep the page in memory as long as possible. In this post, we will use a rather simplistic (and inefficient!) way to decide which pages get to stay (or rather which page gets evicted): a FIFO queue[1]. If we have a buffer pool of size 2 and we read pages in the order 1, 1, 2, 3, this is what the reads/writes to disk will look like:
```
read 1 [fetch page 1]
read 1 [no fetch necessary, 1 is already in pool]
read 2 [fetch page 2]
read 3 [fetch page 3; place in slot occupied by 1]
```
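To make the trace concrete, here is a toy FIFO pool that only tracks which page ids are resident (no actual disk I/O). It is a sketch for illustration, not the post’s buffer pool; `FifoPool` and `access` are made-up names.

```rust
use std::collections::VecDeque;

/// Toy FIFO buffer pool: tracks resident page ids only.
struct FifoPool {
    capacity: usize,
    resident: VecDeque<usize>, // front = page that arrived first
}

impl FifoPool {
    fn new(capacity: usize) -> Self {
        FifoPool { capacity, resident: VecDeque::new() }
    }

    /// Returns true if the page had to be fetched from disk.
    fn access(&mut self, page_id: usize) -> bool {
        if self.resident.contains(&page_id) {
            return false; // hit: FIFO does not reorder pages on access
        }
        if self.resident.len() == self.capacity {
            self.resident.pop_front(); // evict the oldest arrival
        }
        self.resident.push_back(page_id);
        true
    }
}

fn main() {
    let mut pool = FifoPool::new(2);
    let mut fetched = Vec::new();
    for &page in &[1, 1, 2, 3] {
        if pool.access(page) {
            fetched.push(page);
        }
    }
    // Matches the trace: pages 1, 2 and 3 are fetched; 1 is evicted for 3.
    assert_eq!(fetched, vec![1, 2, 3]);
    assert_eq!(pool.resident, VecDeque::from(vec![2, 3]));
}
```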
A page resident in our buffer pool is represented by a `Page`:
```rust
pub struct Page {
    pub id: usize,
    pub storage: [u8; PAGE_SIZE],
    pub num_records: usize,
    // page_id of overflow bucket
    pub next: Option<usize>,
    pub dirty: bool,

    keysize: usize,
    valsize: usize,
}
```
Notice that besides the bytearray (`storage`) we have some other metadata about the page here too:
- `id` specifies which page in the file this page is
- `storage` is all the bytes in the page copied out to a byte array
- `num_records` specifies how many records this page has
- `next` is what strings together the overflow pages we talked about in the last post. It is also used to keep track of pages that used to be overflow pages but are no longer in use.
- `dirty` specifies whether the page here is out of sync with its corresponding page on file.
- the next two fields, `keysize` and `valsize`, specify what length the key and value bytearrays are in the records the page stores.
The metadata is stored in the page itself. To read and write this metadata we have the following methods:
```rust
// in an `impl Page` block
pub fn read_header(&mut self) {
    let num_records: usize = bytearray_to_usize(self.storage[0..8].to_vec());
    let next: usize = bytearray_to_usize(self.storage[8..16].to_vec());
    self.num_records = num_records;
    self.next = if next != 0 { Some(next) } else { None };
}

pub fn write_header(&mut self) {
    mem_move(&mut self.storage[0..8], &usize_to_bytearray(self.num_records));
    mem_move(&mut self.storage[8..16], &usize_to_bytearray(self.next.unwrap_or(0)));
}
```
`mem_move` is a function similar to C’s `memcpy` library function.
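Here is a self-contained sketch of the same 16-byte header layout (`num_records` in bytes 0..8, `next` in bytes 8..16, with 0 standing for “no next page”). It assumes a `mem_move` that simply copies one slice into the start of another, and it encodes the fields as little-endian `u64`s rather than going through `transmute`; both are assumptions, not the post’s code.

```rust
const PAGE_SIZE: usize = 4096;

/// Hypothetical stand-in for `mem_move`: copy `src` into the start of `dst`.
/// Panics if `dst` is shorter than `src`.
fn mem_move(dst: &mut [u8], src: &[u8]) {
    dst[..src.len()].copy_from_slice(src);
}

/// Write num_records to bytes 0..8 and next (0 = none) to bytes 8..16.
fn write_header(storage: &mut [u8; PAGE_SIZE], num_records: usize, next: Option<usize>) {
    mem_move(&mut storage[0..8], &(num_records as u64).to_le_bytes());
    mem_move(&mut storage[8..16], &(next.unwrap_or(0) as u64).to_le_bytes());
}

/// Read the two header fields back, mapping a stored 0 to `None`.
fn read_header(storage: &[u8; PAGE_SIZE]) -> (usize, Option<usize>) {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&storage[0..8]);
    let num_records = u64::from_le_bytes(buf) as usize;
    buf.copy_from_slice(&storage[8..16]);
    let next = u64::from_le_bytes(buf) as usize;
    (num_records, if next != 0 { Some(next) } else { None })
}

fn main() {
    let mut storage = [0u8; PAGE_SIZE];
    write_header(&mut storage, 3, Some(7));
    assert_eq!(read_header(&storage), (3, Some(7)));
    write_header(&mut storage, 0, None);
    assert_eq!(read_header(&storage), (0, None));
}
```

Using 0 as the “no next page” sentinel only works if page 0 can never be an overflow page.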
Of course, the main content of a page is the records it stores. These are read and written using the following methods:
```rust
pub fn read_record(&mut self, row_num: usize) -> (&[u8], &[u8]) {
    let offsets = self.compute_offsets(row_num);
    let key = &self.storage[offsets.key_offset..offsets.val_offset];
    let val = &self.storage[offsets.val_offset..offsets.row_end];
    (key, val)
}

pub fn write_record(&mut self, row_num: usize, key: &[u8], val: &[u8]) {
    let offsets = self.compute_offsets(row_num);
    mem_move(&mut self.storage[offsets.key_offset..offsets.val_offset], key);
    mem_move(&mut self.storage[offsets.val_offset..offsets.row_end], val);
}
```
I think both of the above are fairly straightforward. Because records are of fixed size and because we’re only dealing with bytearrays, it’s just a matter of moving stuff around.
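`compute_offsets` isn’t shown, but with fixed-size records the arithmetic is simple. The sketch below assumes records are packed back to back right after the 16-byte header; `HEADER_SIZE`, `RowOffsets` and `records_per_page` are illustrative names, and the real layout may differ.

```rust
const PAGE_SIZE: usize = 4096;
const HEADER_SIZE: usize = 16; // num_records (8 bytes) + next (8 bytes)

/// Byte range of one record inside a page: key is key_offset..val_offset,
/// value is val_offset..row_end.
#[derive(Debug, PartialEq)]
struct RowOffsets {
    key_offset: usize,
    val_offset: usize,
    row_end: usize,
}

fn compute_offsets(row_num: usize, keysize: usize, valsize: usize) -> RowOffsets {
    let key_offset = HEADER_SIZE + row_num * (keysize + valsize);
    let val_offset = key_offset + keysize;
    RowOffsets { key_offset, val_offset, row_end: val_offset + valsize }
}

/// How many whole records fit after the header.
fn records_per_page(keysize: usize, valsize: usize) -> usize {
    (PAGE_SIZE - HEADER_SIZE) / (keysize + valsize)
}

fn main() {
    // 32-byte keys and 4-byte values, as in the book_ratings example.
    assert_eq!(records_per_page(32, 4), 113);
    assert_eq!(
        compute_offsets(2, 32, 4),
        RowOffsets { key_offset: 88, val_offset: 120, row_end: 124 }
    );
}
```

With the 32-byte keys and 4-byte values from the `book_ratings` example, that comes to 113 records per 4KB page.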
Buckets
Being able to read and write records to pages is great, but we need to be able to write to buckets without having to know in advance which page, and which row in it, the record will go to:
```rust
pub struct SearchResult {
    pub page_id: Option<usize>,
    pub row_num: Option<usize>,
    pub val: Option<Vec<u8>>,
}

// impl DbFile { ...
pub fn search_bucket(&mut self, bucket_id: usize, key: &[u8]) -> SearchResult {
    let mut page_id = self.bucket_to_page(bucket_id);
    let mut buffer_index;
    let mut first_free_row = SearchResult {
        page_id: None,
        row_num: None,
        val: None,
    };
    loop {
        buffer_index = self.fetch_page(page_id);
        let next_page = self.buffers[buffer_index].next;
        let page_records = self.all_records_in_page(page_id);

        let len = page_records.len();
        for (row_num, (k, v)) in page_records.into_iter().enumerate() {
            if slices_eq(&k, key) {
                return SearchResult {
                    page_id: Some(page_id),
                    row_num: Some(row_num),
                    val: Some(v),
                }
            }
        }

        let row_num = if len < self.records_per_page {
            Some(len)
        } else {
            None
        };

        match (first_free_row.page_id, first_free_row.row_num) {
            // this is the first free space for a row found, so
            // keep track of it.
            (Some(_), None) |
            (None, _) => {
                first_free_row = SearchResult {
                    page_id: Some(page_id),
                    row_num: row_num,
                    val: None,
                }
            },
            _ => (),
        }

        if let Some(p) = next_page {
            page_id = p;
        } else {
            break;
        }
    }

    first_free_row
}
```
This may look like a lot, but what it’s doing is not too complicated. It keeps fetching the next page in the bucket until it finds the record with the key it’s looking for. What is perhaps interesting is what it returns when it doesn’t find the record (because the record hasn’t been inserted yet): if there’s space, the `SearchResult` indicates which page and row to insert the record in; if there isn’t any space, it returns the last page it looked in, which is meant to be used when creating an overflow page.
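The same search logic, stripped of disk access, can be sketched over an in-memory bucket where each page is a `Vec` of records. The `Slot` enum is a made-up replacement for `SearchResult` that spells out the three outcomes; the sketch assumes a bucket always has at least one page.

```rust
type Record = (Vec<u8>, Vec<u8>);

#[derive(Debug, PartialEq)]
enum Slot {
    /// The key is already stored at this page/row.
    Found { page: usize, row: usize },
    /// Key not found: first page with room, and the row to write into.
    Free { page: usize, row: usize },
    /// Key not found and every page is full: chain an overflow page here.
    Full { last_page: usize },
}

fn search_bucket(pages: &[Vec<Record>], records_per_page: usize, key: &[u8]) -> Slot {
    let mut first_free: Option<(usize, usize)> = None;
    for (page, records) in pages.iter().enumerate() {
        if let Some(row) = records.iter().position(|(k, _)| k.as_slice() == key) {
            return Slot::Found { page, row };
        }
        // Remember only the first page that still has room.
        if first_free.is_none() && records.len() < records_per_page {
            first_free = Some((page, records.len()));
        }
    }
    match first_free {
        Some((page, row)) => Slot::Free { page, row },
        // Panics on an empty bucket; buckets are assumed to have >= 1 page.
        None => Slot::Full { last_page: pages.len() - 1 },
    }
}

fn main() {
    let pages: Vec<Vec<Record>> = vec![
        vec![(b"a".to_vec(), vec![1]), (b"b".to_vec(), vec![2])],
        vec![(b"c".to_vec(), vec![3])],
    ];
    assert_eq!(search_bucket(&pages, 2, b"c"), Slot::Found { page: 1, row: 0 });
    assert_eq!(search_bucket(&pages, 2, b"z"), Slot::Free { page: 1, row: 1 });
    assert_eq!(search_bucket(&pages[..1], 2, b"z"), Slot::Full { last_page: 0 });
}
```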
Methods in LinHash
We now know how to figure out where a record should be placed in a bucket (`search_bucket`) and how to place a record in said location (`write_record`). An implementation of our hashtable’s `put` operation arises quite naturally:
```rust
// impl LinHash { ...
pub fn put(&mut self, key: &[u8], val: &[u8]) {
    let bucket_index = self.bucket(&key);
    match self.buckets.search_bucket(bucket_index, key.clone()) {
        SearchResult { page_id, row_num, val: old_val } => {
            match (page_id, row_num, old_val) {
                // new insert
                (Some(page_id), Some(pos), None) => {
                    self.buckets.write_record_incr(page_id, pos, key, val);
                    self.nitems += 1;
                },
                // case for update
                (Some(_page_id), Some(pos), Some(_old_val)) => {
                    panic!("can't use put to reinsert old item: {:?}", (key, val));
                },
                // new insert, in overflow page
                (Some(last_page_id), None, None) => { // overflow
                    self.buckets.allocate_overflow(bucket_index, last_page_id);
                    self.put(key, val);
                },
                _ => panic!("impossible case"),
            }
        },
    }

    self.maybe_split();
    self.buckets.write_ctrlpage((self.nbits, self.nitems, self.nbuckets));
}
```
The `self.bucket` call at the very top hashes the key and computes which bucket the item should go in. Remember that which bucket an item goes in depends on how many bits we’re looking at and how many buckets we have. We covered how this works in part 1, so we won’t go into that now.
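For reference, the usual linear-hashing rule looks like the sketch below: take the low `nbits` bits of the hash, and if that bucket doesn’t exist yet, clear the top bit. This is an assumption about what `self.bucket` does (its body isn’t shown here); `bucket_for` is a hypothetical name, and the sketch assumes `nbits >= 1`.

```rust
/// Map a key's 64-bit hash to a bucket index (sketch; assumes nbits >= 1
/// and that buckets 0..nbuckets exist, with nbuckets <= 2^nbits).
fn bucket_for(hash: u64, nbits: u32, nbuckets: u64) -> u64 {
    // Keep only the low `nbits` bits of the hash.
    let b = hash & ((1u64 << nbits) - 1);
    if b < nbuckets {
        b
    } else {
        // Bucket `b` hasn't been created yet, so its records still live in
        // the bucket it will be split from: the same index minus the top bit.
        b ^ (1u64 << (nbits - 1))
    }
}

fn main() {
    // 3 buckets (00, 01, 10) addressed with 2 bits.
    assert_eq!(bucket_for(0b0110, 2, 3), 2); // low bits 10: bucket exists
    assert_eq!(bucket_for(0b1011, 2, 3), 1); // low bits 11: falls back to 01
    assert_eq!(bucket_for(0b0100, 2, 3), 0);
}
```

The fallback mirrors the XOR in `maybe_split`: when bucket 11 is eventually created, it is bucket 01 that gets split.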
This is how `maybe_split` is implemented:
```rust
fn maybe_split(&mut self) -> bool {
    if self.split_needed() {
        self.nbuckets += 1;

        self.buckets.allocate_new_bucket();
        if self.nbuckets > (1 << self.nbits) {
            self.nbits += 1;
        }

        // Take index of last item added and subtract the 1 at the
        // MSB position. eg: after bucket 11 is added, bucket 01
        // needs to be split
        let bucket_to_split =
            (self.nbuckets-1) ^ (1 << (self.nbits-1));

        // Replace the bucket to split with a fresh, empty
        // page. And get a list of all records stored in the bucket
        let old_bucket_records =
            self.buckets.clear_bucket(bucket_to_split);

        // Re-hash all records in old_bucket. Ideally, about half
        // of the records will go into the new bucket.
        for (k, v) in old_bucket_records.into_iter() {
            self.reinsert(&k, &v);
        }
        return true
    }

    false
}
```
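To see the order in which buckets get split, the index arithmetic at the top of `maybe_split` can be replayed on its own. This sketch leaves out the actual record movement and assumes the table starts with 2 buckets and `nbits = 1`; `split_order` is a hypothetical helper.

```rust
/// Replays maybe_split's bookkeeping and returns which bucket each split empties.
fn split_order(start_nbuckets: usize, start_nbits: u32, splits: usize) -> Vec<usize> {
    let mut nbuckets = start_nbuckets;
    let mut nbits = start_nbits;
    let mut order = Vec::new();
    for _ in 0..splits {
        // Add a bucket, and use one more hash bit once we outgrow 2^nbits.
        nbuckets += 1;
        if nbuckets > (1usize << nbits) {
            nbits += 1;
        }
        // The bucket to split is the new bucket's index with its top bit cleared.
        order.push((nbuckets - 1) ^ (1usize << (nbits - 1)));
    }
    order
}

fn main() {
    // Starting from 2 buckets addressed with 1 bit.
    assert_eq!(split_order(2, 1, 6), vec![0, 1, 0, 1, 2, 3]);
}
```

Buckets are split in round-robin order: 0 and 1 on the way to 4 buckets, then 0 through 3 on the way to 8.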
Limitations
Let me close by stating some of the limitations of this implementation. I’ve already hinted at the limitations of the cache eviction policy we use; most real systems use LRU or an approximation of it. The error-handling story here is also not great: we should not be panicking when the client uses the data structure incorrectly.
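To illustrate the difference, here is a toy comparison of FIFO and LRU eviction on an access pattern where one page stays hot. Like the earlier trace, it only tracks page ids; `fetches` is a hypothetical helper, not part of the post’s code.

```rust
use std::collections::VecDeque;

/// Returns the pages fetched from disk for a sequence of accesses.
/// Both policies evict from the front of the queue; LRU additionally moves a
/// page to the back on every hit, so the front is the least-recently used page.
fn fetches(accesses: &[usize], capacity: usize, lru: bool) -> Vec<usize> {
    let mut pool: VecDeque<usize> = VecDeque::new();
    let mut fetched = Vec::new();
    for &page in accesses {
        if let Some(pos) = pool.iter().position(|&p| p == page) {
            if lru {
                // Hit: mark the page as most recently used.
                let p = pool.remove(pos).unwrap();
                pool.push_back(p);
            }
            continue;
        }
        if pool.len() == capacity {
            pool.pop_front();
        }
        pool.push_back(page);
        fetched.push(page);
    }
    fetched
}

fn main() {
    // Page 1 is hot, but FIFO evicts it anyway because it arrived first.
    let trace: [usize; 5] = [1, 2, 1, 3, 1];
    assert_eq!(fetches(&trace, 2, false), vec![1, 2, 3, 1]);
    assert_eq!(fetches(&trace, 2, true), vec![1, 2, 3]);
}
```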
More importantly, our hashtable cannot handle concurrent accesses. We’re not using locks anywhere, so if multiple threads are inserting records, we will almost certainly see the data go out of whack.
Thanks to Kevin Butler for sending in corrections and suggestions.
[1] A much better way of doing cache eviction is to kick out the least-recently used page. I chose a FIFO queue just because it’s much simpler to implement.