
作者 chauncy 日期 2020-01-15

自己实现一个简单的DB(2) -底层的存储

数据最终要持久化到我们的磁盘上(机械硬盘、SSD、NVMe 等)。


public interface DbFile {
* 通过pageId,读取磁盘里面的数据,每次读取一个page
public Page readPage(PageId id);
* 将 一个page 的内容写入到磁盘
* @param p
* @throws IOException
public void writePage(Page p) throws IOException;
* 通过事务将一个tuple 插入到 db file 里面,这个是需要获取一个lock,来讲进行报货并发的错误
* @param tid
* @param t
* @return
* @throws DbException
* @throws IOException
* @throws TransactionAbortedException
public ArrayList<Page> insertTuple(TransactionId tid, Tuple t)
throws DbException, IOException, TransactionAbortedException;
* 删除指定tuple,在一个事务里面,同理也需要获取一个lock
* @param tid
* @param t
* @return
* @throws DbException
* @throws TransactionAbortedException
public Page deleteTuple(TransactionId tid, Tuple t)
throws DbException, TransactionAbortedException;
* 获取一个 iterator,在获取数据的db file 里面,都是通过buffer pool,来获取,
* buffer pool 则是通过 iterator 来一个一个tuple 进行的获取,所以返回的是一个iterator
* @param tid
* @return
public DbFileIterator iterator(TransactionId tid);
* 返回 db file 的id,这个id与 catalog 进行,管理这个到后面会进行实现,先明白他是一个唯一表示服务就行
* @return
public int getId();
* 返回这个db file 的一个描述信息,即是数据库的字段描述
* @return
public TupleDesc getTupleDesc();

DbFile 是数据库与磁盘交互的接口,定义了如何读写磁盘中的数据。对 DbFile 的访问都要经过 Buffer Pool,访问的方式是 iterator(迭代器)模式。这一点可以联想到我们使用 SQL 查询时需要 limit 操作:如果数据内容过多,不可能一次返回所有数据,需要分批获取。数据之所以要通过 Buffer Pool 获取,主要是因为数据库需要有自己的缓存淘汰策略和事务控制,这样就不必依赖于操作系统的缓存,因为操作系统的缓存不受数据库控制。

在目前的 RDBMS 中,DbFile 的表现形式有 B+ tree 类型,也有直接按堆(Heap)组织的形式。

简单一点,先从Heap 开始

* heap 实现的Db File
public class HeapFile implements DbFile {
private final File dbFile;
private final TupleDesc tupleDesc;
* construction
* @param f
* @param td
public HeapFile(File f, TupleDesc td) {
// some code goes here
this.dbFile = f;
this.tupleDesc = td;
* 返回 heap file
* @return
public File getFile() {
return dbFile;
* 返回一个id,每次都返回同样的一个 id,这里简单一点直接返回绝对路径的 hash code
* @return
public int getId() {
return dbFile.getAbsoluteFile().hashCode();
* 返回描述符
* @return
public TupleDesc getTupleDesc() {
return tupleDesc;
* 读取db file 里面的一页数据
* pageId,里面获取 读取第几页pageNo
* 通过buffer pool 获得 每一页的大小 pageSize
* 读取 pageNo * pageSize 之后的数据,因为我们是分页读取的,所以需要skip 掉
* pageNo * pageSize
* @param pid
* @return
public Page readPage(PageId pid) {
int tableid = pid.getTableId();
int pgNo = pid.pageNumber();
final int pageSize = Database.getBufferPool().getPageSize();
byte[] rawPgData = HeapPage.createEmptyPageData();
// random access read from disk
FileInputStream in = null;
try {
in = new FileInputStream(dbFile);
try {
in.skip(pgNo * pageSize);
return new HeapPage(new HeapPageId(tableid, pgNo), rawPgData);
} catch (IOException e) {
throw new IllegalArgumentException("HeapFile: readPage:");
} finally {
} catch (IOException e) {
throw new IllegalArgumentException("HeapFile: readPage: file not found");
* 这里和读取类似,需要做一个分页处理,写在特定的位置上面
* @param page
* @throws IOException
public void writePage(Page page) throws IOException {
PageId pid = page.getId();
int pgNo = pid.pageNumber();
final int pageSize = Database.getBufferPool().getPageSize();
byte[] pgData = page.getPageData();
RandomAccessFile dbfile = new RandomAccessFile(dbFile, "rws");
dbfile.skipBytes(pgNo * pageSize);
* 返回该文件的多少页
* @return
public int numPages() {
// some code goes here
int fileSizeinByte = (int) dbFile.length();
return fileSizeinByte / Database.getBufferPool().getPageSize();
* 在一个事务里面,添加一个tuple
* 首先我们去 buffer pool 里面寻找一个page ,如果找不到
* page 我们就新建一个page
* 然后在page里面插入 tuple
* 如果我们的page 还么有溢出那么就需要记录起来,通过list 记录起来,等待后面进行flush 刷新
* 如果超过我们现在buffer 了,则直接进行append,到file 里面
* @return
* @throws DbException
* @throws IOException
* @throws TransactionAbortedException
public ArrayList<Page> insertTuple(TransactionId tid, Tuple t)
throws DbException, IOException, TransactionAbortedException {
ArrayList<Page> affected = new ArrayList(1);
int numPages = numPages();
for (int pgNo = 0; pgNo <= numPages; pgNo++) {
HeapPageId pid = new HeapPageId(getId(), pgNo);
HeapPage pg;
if (pgNo < numPages) {
pg = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
} else {
// pgNo = numpages -> we need add new page
pg = new HeapPage(pid, HeapPage.createEmptyPageData());
if (pg.getNumEmptySlots() > 0) {
// insert will update tuple when inserted
// writePage(pg);
if (pgNo < numPages) {
pg.markDirty(true, tid);
} else {
// should append the dbfile
return affected;
throw new DbException("HeapFile: InsertTuple: Tuple can not be added");
* 跟上插入tuple 有一定的类同
* 只是这里不会新建一个page,同时也需要记录更改过的page,方便后续进行flush 操作
* @param tid
* @param t
* @return
* @throws DbException
* @throws TransactionAbortedException
public ArrayList<Page> deleteTuple(TransactionId tid, Tuple t) throws DbException,
TransactionAbortedException {
// some code goes here
// not necessary for lab1
ArrayList<Page> affected = new ArrayList(1);
RecordId rid = t.getRecordId();
HeapPageId pid = (HeapPageId) rid.getPageId();
if (pid.getTableId() == getId()) {
// int pgNo = pid.pageNumber();
HeapPage pg = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
// writePage(pg);
pg.markDirty(true, tid);
return affected;
throw new DbException("HeapFile: deleteTuple: tuple.tableid != getId");
* DB File iterator 的迭代器,之前说过,db 的获取是通过 iterator 获取的
private class HeapFileIterator implements DbFileIterator {
private Integer pgCursor;
private Iterator<Tuple> tupleIter;
private final TransactionId transactionId;
private final int tableId;
private final int numPages;
public HeapFileIterator(TransactionId tid) {
this.pgCursor = null;
this.tupleIter = null;
this.transactionId = tid;
this.tableId = getId();
this.numPages = numPages();
public void open() throws DbException, TransactionAbortedException {
pgCursor = 0;
tupleIter = getTupleIter(pgCursor);
public boolean hasNext() throws DbException, TransactionAbortedException {
// < numpage - 1
if (pgCursor != null) {
while (pgCursor < numPages - 1) {
if (tupleIter.hasNext()) {
return true;
} else {
pgCursor += 1;
tupleIter = getTupleIter(pgCursor);
return tupleIter.hasNext();
} else {
return false;
public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {
if (hasNext()) {
return tupleIter.next();
throw new NoSuchElementException("HeapFileIterator: error: next: no more elemens");
public void rewind() throws DbException, TransactionAbortedException {
public void close() {
pgCursor = null;
tupleIter = null;
private Iterator<Tuple> getTupleIter(int pgNo)
throws TransactionAbortedException, DbException {
PageId pid = new HeapPageId(tableId, pgNo);
return ((HeapPage)
public DbFileIterator iterator(TransactionId tid) {
return new HeapFileIterator(tid);

DbFile 存储的是数据库的真实数据,读取它的方式是 iterator(迭代器),并且通过 Buffer Pool 获取。每一次读取 DbFile 的最小单位是 page,文件大小是 page 大小的整数倍;page 里面存储的数据是 tuple,tuple 里面存储的是各个字段的值。

Page 也有对应的 HeapPage 实现,它主要用于把 DbFile 里面的数据读入到内存中的 page 里面来。

public class HeapPage implements Page {
final HeapPageId pid; //page Id,一种heap 的形式
final TupleDesc td; //tuple 的desc
final byte header[]; //头信息
final Tuple tuples[]; //tuple 的元素
final int numSlots;
byte[] oldData;
private final Byte oldDataLock = new Byte((byte) 0);
* Create a HeapPage from a set of bytes of data read from disk.
* The format of a HeapPage is a set of header bytes indicating
* the slots of the page that are in use, some number of tuple slots.
* Specifically, the number of tuples is equal to: <p>
* floor((BufferPool.getPageSize()*8) / (tuple size * 8 + 1))
* <p> where tuple size is the size of tuples in this
* database table, which can be determined via {@link Catalog#getTupleDesc}.
* The number of 8-bit header words is equal to:
* <p>
* ceiling(no. tuple slots / 8)
* <p>
* @see Database#getCatalog
* @see Catalog#getTupleDesc
* @see BufferPool#getPageSize()
public HeapPage(HeapPageId id, byte[] data) throws IOException {
this.pid = id;
this.td = Database.getCatalog().getTupleDesc(id.getTableId());
this.numSlots = getNumTuples();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
// allocate and read the header slots of this page
header = new byte[getHeaderSize()];
for (int i = 0; i < header.length; i++)
header[i] = dis.readByte();
tuples = new Tuple[numSlots];
try {
// allocate and read the actual records of this page
for (int i = 0; i < tuples.length; i++)
tuples[i] = readNextTuple(dis, i);
} catch (NoSuchElementException e) {
* Retrieve the number of tuples on this page.
* @return the number of tuples on this page
private int getNumTuples() {
return (BufferPool.getPageSize() * 8) / (td.getSize() * 8 + 1);
* Computes the number of bytes in the header of a page in a HeapFile with each tuple occupying tupleSize bytes
* @return the number of bytes in the header of a page in a HeapFile with each tuple occupying tupleSize bytes
private int getHeaderSize() {
return (getNumTuples() + 7) / 8;
* Return a view of this page before it was modified
* -- used by recovery
public HeapPage getBeforeImage() {
try {
byte[] oldDataRef = null;
synchronized (oldDataLock) {
oldDataRef = oldData;
return new HeapPage(pid, oldDataRef);
} catch (IOException e) {
//should never happen -- we parsed it OK before!
return null;
public void setBeforeImage() {
synchronized (oldDataLock) {
oldData = getPageData().clone();
* @return the PageId associated with this page.
public HeapPageId getId() {
// some code goes here
return this.pid;
* Suck up tuples from the source file.
private Tuple readNextTuple(DataInputStream dis, int slotId) throws NoSuchElementException {
// if associated bit is not set, read forward to the next tuple, and
// return null.
if (!isSlotUsed(slotId)) {
for (int i = 0; i < td.getSize(); i++) {
try {
} catch (IOException e) {
throw new NoSuchElementException("error reading empty tuple");
return null;
// read fields in the tuple
Tuple t = new Tuple(td);
RecordId rid = new RecordId(pid, slotId);
try {
for (int j = 0; j < td.numFields(); j++) {
Field f = td.getFieldType(j).parse(dis);
t.setField(j, f);
} catch (java.text.ParseException e) {
throw new NoSuchElementException("parsing error!");
return t;
* Generates a byte array representing the contents of this page.
* Used to serialize this page to disk.
* <p>
* The invariant here is that it should be possible to pass the byte
* array generated by getPageData to the HeapPage constructor and
* have it produce an identical HeapPage object.
* @return A byte array correspond to the bytes of this page.
* @see #HeapPage
public byte[] getPageData() {
int len = BufferPool.getPageSize();
ByteArrayOutputStream baos = new ByteArrayOutputStream(len);
DataOutputStream dos = new DataOutputStream(baos);
// create the header of the page
for (int i = 0; i < header.length; i++) {
try {
} catch (IOException e) {
// this really shouldn't happen
// create the tuples
for (int i = 0; i < tuples.length; i++) {
// empty slot
if (!isSlotUsed(i)) {
for (int j = 0; j < td.getSize(); j++) {
try {
} catch (IOException e) {
// non-empty slot
for (int j = 0; j < td.numFields(); j++) {
Field f = tuples[i].getField(j);
try {
} catch (IOException e) {
// padding
int zerolen = BufferPool.getPageSize() - (header.length + td.getSize() * tuples.length); //- numSlots * td.getSize();
byte[] zeroes = new byte[zerolen];
try {
dos.write(zeroes, 0, zerolen);
} catch (IOException e) {
try {
} catch (IOException e) {
return baos.toByteArray();
* Static method to generate a byte array corresponding to an empty
* HeapPage.
* Used to add new, empty pages to the file. Passing the results of
* this method to the HeapPage constructor will create a HeapPage with
* no valid tuples in it.
* @return The returned ByteArray.
public static byte[] createEmptyPageData() {
int len = BufferPool.getPageSize();
return new byte[len]; //all 0
* Delete the specified tuple from the page; the tuple should be updated to reflect
* that it is no longer stored on any page.
* @param t The tuple to delete
* @throws DbException if this tuple is not on this page, or tuple slot is
* already empty.
public void deleteTuple(Tuple t) throws DbException {
// some code goes here
// not necessary for lab1
assert t != null;
RecordId recToDelete = t.getRecordId();
if (recToDelete != null && pid.equals(recToDelete.pageId)) {
for (int i = 0; i < numSlots; i++) {
if (isSlotUsed(i) && t.getRecordId().equals(tuples[i].getRecordId())) {
markSlotUsed(i, false);
// t.setRecordId(null);
tuples[i] = null;
throw new DbException("deleteTuple: Error: tuple slot is empty");
throw new DbException("deleteTuple: Error: tuple is not on this page");
* Adds the specified tuple to the page; the tuple should be updated to reflect
* that it is now stored on this page.
* @param t The tuple to add.
* @throws DbException if the page is full (no empty slots) or tupledesc
* is mismatch.
public void insertTuple(Tuple t) throws DbException {
// some code goes here
// not necessary for lab1
assert t != null;
if (td.equals(t.getTupleDesc())) {
for (int i = 0; i < numSlots; i++) {
if (!isSlotUsed(i)) {
// insert and update header
markSlotUsed(i, true);
t.setRecordId(new RecordId(pid, i));
tuples[i] = t;
throw new DbException("insertTuple: ERROR: no tuple is inserted");
throw new DbException("insertTuple: no empty slots or tupledesc is mismatch");
// private boolean dirty;
private TransactionId dirtier;
* Marks this page as dirty/not dirty and record that transaction
* that did the dirtying
public void markDirty(boolean dirty, TransactionId tid) {
// some code goes here
// not necessary for lab1
if (dirty) {
// this.dirty = dirty;
this.dirtier = tid;
} else {
// this.dirty = false;
this.dirtier = null;
* Returns the tid of the transaction that last dirtied this page, or null if the page is not dirty
public TransactionId isDirty() {
// some code goes here
// Not necessary for lab1
return dirtier;
* Returns the number of empty slots on this page.
public int getNumEmptySlots() {
// some code goes here
int numEmptSlot = 0;
for (int i = 0; i < numSlots; i++) {
if (!isSlotUsed(i)) {
numEmptSlot += 1;
return numEmptSlot;
* Returns true if associated slot on this page is filled.
public boolean isSlotUsed(int i) {
// some code goes here
// big endian most significant in lower address
if (i < numSlots) {
int hdNo = i / 8;
int offset = i % 8;
return (header[hdNo] & (0x1 << offset)) != 0;
return false;
* Abstraction to fill or clear a slot on this page.
private void markSlotUsed(int i, boolean value) {
// some code goes here
// not necessary for lab1
if (i < numSlots) {
int hdNo = i / 8;
int offset = i % 8;
byte mask = (byte) (0x1 << offset);
if (value) {
header[hdNo] |= mask;
} else {
header[hdNo] &= ~mask;
protected class HeapPageTupleIterator implements Iterator<Tuple> {
private final Iterator<Tuple> iter;
public HeapPageTupleIterator() {
ArrayList<Tuple> tupleArrayList = new ArrayList<Tuple>(numSlots);
for (int i = 0; i < numSlots; i++) {
if (isSlotUsed(i)) {
tupleArrayList.add(i, tuples[i]);
iter = tupleArrayList.iterator();
public void remove() {
throw new UnsupportedOperationException("TupleIterator: remove not supported");
public boolean hasNext() {
return iter.hasNext();
public Tuple next() {
return iter.next();
* @return an iterator over all tuples on this page (calling remove on this iterator throws an UnsupportedOperationException)
* (note that this iterator shouldn't return tuples in empty slots!)
public Iterator<Tuple> iterator() {
return new HeapPageTupleIterator();

HeapPage 里面分为 header 和 tuples 两部分,numSlots 描述整个 page 可以存放的 tuple 个数,剩下的就是基本的 IO 操作。