自己实现一个简单的DB(二)

作者 chauncy 日期 2020-01-15
db
自己实现一个简单的DB(二)

自己实现一个简单的DB(2) -底层的存储

数据库既要保证高效的吞吐,也要能够持久化数据;而持久化数据,就是把我们的数据写入磁盘(机械硬盘、SSD、NVMe 等)。

为了适应不同的存储和读取方式,我们先用一个接口对存取操作进行定义,再由各个实现类去完成具体的存储逻辑:

/**
 * Contract between the database and its on-disk storage.
 *
 * <p>A {@code DbFile} is read and written one page at a time, supports inserting and
 * deleting tuples inside a transaction, and exposes its tuples through a
 * {@link DbFileIterator}.
 */
public interface DbFile {
    /**
     * Reads the page identified by {@code id} from disk. Exactly one page is read per call.
     *
     * @param id identifier of the page to read
     * @return the page that was read
     */
    Page readPage(PageId id);

    /**
     * Writes the contents of one page to disk.
     *
     * @param p the page to write
     * @throws IOException if the write fails
     */
    void writePage(Page p) throws IOException;

    /**
     * Inserts a tuple into this file on behalf of a transaction. The operation is expected
     * to acquire a lock so that concurrent access does not corrupt the file.
     *
     * @param tid the transaction performing the insert
     * @param t   the tuple to insert
     * @return the pages that were modified by the insert
     * @throws DbException                 if the tuple cannot be added
     * @throws IOException                 if the underlying file cannot be read or written
     * @throws TransactionAbortedException if the transaction is aborted
     */
    ArrayList<Page> insertTuple(TransactionId tid, Tuple t)
            throws DbException, IOException, TransactionAbortedException;

    /**
     * Deletes the given tuple on behalf of a transaction. As with insertion, a lock is
     * expected to be acquired.
     *
     * <p>The return type mirrors {@link #insertTuple}: the list of modified pages, which is
     * what the heap-file implementation in this article returns.
     *
     * @param tid the transaction performing the delete
     * @param t   the tuple to delete
     * @return the pages that were modified by the delete
     * @throws DbException                 if the tuple cannot be deleted
     * @throws TransactionAbortedException if the transaction is aborted
     */
    ArrayList<Page> deleteTuple(TransactionId tid, Tuple t)
            throws DbException, TransactionAbortedException;

    /**
     * Returns an iterator over the tuples stored in this file. Data in a db file is
     * obtained through the buffer pool, one tuple at a time, which is why an iterator is
     * returned rather than a collection.
     *
     * @param tid the transaction on whose behalf the tuples are read
     * @return an iterator over the tuples of this file
     */
    DbFileIterator iterator(TransactionId tid);

    /**
     * Returns the id of this db file. The id is what the catalog uses to manage the file
     * (the catalog is implemented later); for now it is enough to treat it as a unique
     * identifier.
     *
     * @return the id of this file
     */
    int getId();

    /**
     * Returns the description of the tuples stored in this file, i.e. the schema of the
     * table's fields.
     *
     * @return the tuple descriptor of this file
     */
    TupleDesc getTupleDesc();
}

Db File 是数据库与磁盘交互的定义,规定了如何读写磁盘中的数据。对 Db File 的访问都要经过 Buffer Pool,访问方式是 iterator(迭代器)模式。这可以类比我们在 SQL 查询中使用 limit:如果数据内容过多,不可能一次返回所有数据,需要分批获取。数据之所以要通过 Buffer Pool 获取,主要是因为 DB 需要有自己的缓存淘汰策略和事务控制,这样就不必依赖操作系统的缓存管理,因为后者不受数据库控制。

在目前的 RDBMS 中,Db File 的组织形式既有 B+ tree 类型,也有直接按堆(Heap)组织的形式。

简单一点,先从Heap 开始

/**
* heap 实现的Db File
*/
public class HeapFile implements DbFile {
private final File dbFile;
private final TupleDesc tupleDesc;
/**
* construction
* @param f
* @param td
*/
public HeapFile(File f, TupleDesc td) {
// some code goes here
this.dbFile = f;
this.tupleDesc = td;
}
/**
* 返回 heap file
* @return
*/
public File getFile() {
return dbFile;
}
/**
* 返回一个id,每次都返回同样的一个 id,这里简单一点直接返回绝对路径的 hash code
* @return
*/
public int getId() {
return dbFile.getAbsoluteFile().hashCode();
}
/**
* 返回描述符
* @return
*/
public TupleDesc getTupleDesc() {
return tupleDesc;
}
/**
* 读取db file 里面的一页数据
* pageId,里面获取 读取第几页pageNo
* 通过buffer pool 获得 每一页的大小 pageSize
* 读取 pageNo * pageSize 之后的数据,因为我们是分页读取的,所以需要skip 掉
* pageNo * pageSize
* @param pid
* @return
*/
public Page readPage(PageId pid) {
int tableid = pid.getTableId();
int pgNo = pid.pageNumber();
final int pageSize = Database.getBufferPool().getPageSize();
byte[] rawPgData = HeapPage.createEmptyPageData();
// random access read from disk
FileInputStream in = null;
try {
in = new FileInputStream(dbFile);
try {
in.skip(pgNo * pageSize);
in.read(rawPgData);
return new HeapPage(new HeapPageId(tableid, pgNo), rawPgData);
} catch (IOException e) {
throw new IllegalArgumentException("HeapFile: readPage:");
} finally {
in.close();
}
} catch (IOException e) {
throw new IllegalArgumentException("HeapFile: readPage: file not found");
}
}
/**
* 这里和读取类似,需要做一个分页处理,写在特定的位置上面
* @param page
* @throws IOException
*/
public void writePage(Page page) throws IOException {
PageId pid = page.getId();
int pgNo = pid.pageNumber();
final int pageSize = Database.getBufferPool().getPageSize();
byte[] pgData = page.getPageData();
RandomAccessFile dbfile = new RandomAccessFile(dbFile, "rws");
dbfile.skipBytes(pgNo * pageSize);
dbfile.write(pgData);
}
/**
* 返回该文件的多少页
* @return
*/
public int numPages() {
// some code goes here
int fileSizeinByte = (int) dbFile.length();
return fileSizeinByte / Database.getBufferPool().getPageSize();
}
/**
*
* 在一个事务里面,添加一个tuple
* 首先我们去 buffer pool 里面寻找一个page ,如果找不到
* page 我们就新建一个page
* 然后在page里面插入 tuple
* 如果我们的page 还么有溢出那么就需要记录起来,通过list 记录起来,等待后面进行flush 刷新
* 如果超过我们现在buffer 了,则直接进行append,到file 里面
*
* @return
* @throws DbException
* @throws IOException
* @throws TransactionAbortedException
*/
public ArrayList<Page> insertTuple(TransactionId tid, Tuple t)
throws DbException, IOException, TransactionAbortedException {
ArrayList<Page> affected = new ArrayList(1);
int numPages = numPages();
for (int pgNo = 0; pgNo <= numPages; pgNo++) {
HeapPageId pid = new HeapPageId(getId(), pgNo);
HeapPage pg;
if (pgNo < numPages) {
pg = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
} else {
// pgNo = numpages -> we need add new page
pg = new HeapPage(pid, HeapPage.createEmptyPageData());
}
if (pg.getNumEmptySlots() > 0) {
// insert will update tuple when inserted
pg.insertTuple(t);
// writePage(pg);
if (pgNo < numPages) {
pg.markDirty(true, tid);
affected.add(pg);
} else {
// should append the dbfile
writePage(pg);
}
return affected;
}
}
throw new DbException("HeapFile: InsertTuple: Tuple can not be added");
}
/**
* 跟上插入tuple 有一定的类同
* 只是这里不会新建一个page,同时也需要记录更改过的page,方便后续进行flush 操作
* @param tid
* @param t
* @return
* @throws DbException
* @throws TransactionAbortedException
*/
public ArrayList<Page> deleteTuple(TransactionId tid, Tuple t) throws DbException,
TransactionAbortedException {
// some code goes here
// not necessary for lab1
ArrayList<Page> affected = new ArrayList(1);
RecordId rid = t.getRecordId();
HeapPageId pid = (HeapPageId) rid.getPageId();
if (pid.getTableId() == getId()) {
// int pgNo = pid.pageNumber();
HeapPage pg = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
pg.deleteTuple(t);
// writePage(pg);
pg.markDirty(true, tid);
affected.add(pg);
return affected;
}
throw new DbException("HeapFile: deleteTuple: tuple.tableid != getId");
}
/**
* DB File iterator 的迭代器,之前说过,db 的获取是通过 iterator 获取的
*/
private class HeapFileIterator implements DbFileIterator {
private Integer pgCursor;
private Iterator<Tuple> tupleIter;
private final TransactionId transactionId;
private final int tableId;
private final int numPages;
public HeapFileIterator(TransactionId tid) {
this.pgCursor = null;
this.tupleIter = null;
this.transactionId = tid;
this.tableId = getId();
this.numPages = numPages();
}
@Override
public void open() throws DbException, TransactionAbortedException {
pgCursor = 0;
tupleIter = getTupleIter(pgCursor);
}
@Override
public boolean hasNext() throws DbException, TransactionAbortedException {
// < numpage - 1
if (pgCursor != null) {
while (pgCursor < numPages - 1) {
if (tupleIter.hasNext()) {
return true;
} else {
pgCursor += 1;
tupleIter = getTupleIter(pgCursor);
}
}
return tupleIter.hasNext();
} else {
return false;
}
}
@Override
public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {
if (hasNext()) {
return tupleIter.next();
}
throw new NoSuchElementException("HeapFileIterator: error: next: no more elemens");
}
@Override
public void rewind() throws DbException, TransactionAbortedException {
close();
open();
}
@Override
public void close() {
pgCursor = null;
tupleIter = null;
}
private Iterator<Tuple> getTupleIter(int pgNo)
throws TransactionAbortedException, DbException {
PageId pid = new HeapPageId(tableId, pgNo);
return ((HeapPage)
Database
.getBufferPool()
.getPage(transactionId,Permissions.READ_ONLY))
.iterator();
}
}
//
public DbFileIterator iterator(TransactionId tid) {
return new HeapFileIterator(tid);
}
}

Db File 存储的是数据库的真实数据,读取它的方式是通过 Buffer Pool、以 iterator 迭代器逐条获取。每次读写 Db File 都以 page 为最小单位,文件大小是 page 大小的整数倍;page 里面存储的是 tuple,tuple 里面存储的是各个字段的值。

Page 也有对应的 HeapPage 实现,它主要用于承载从 Db File 读入内存的一页数据:

/**
 * One page of a heap file held in memory.
 *
 * <p>A page consists of a header bitmap followed by fixed-size tuple slots. Bit
 * {@code i % 8} (counting from the least significant bit) of header byte {@code i / 8}
 * tells whether slot {@code i} is in use.
 */
public class HeapPage implements Page {
    final HeapPageId pid;   // id of this page (heap flavour of a page id)
    final TupleDesc td;     // descriptor of the tuples stored on this page
    final byte[] header;    // slot-usage bitmap
    final Tuple[] tuples;   // tuple slots; entries of unused slots are null
    final int numSlots;     // number of tuple slots on this page
    byte[] oldData;         // serialized before-image, guarded by oldDataLock
    // Dedicated monitor object; locking on a boxed value is fragile and the Byte
    // constructor used previously is deprecated.
    private final Object oldDataLock = new Object();
    // Transaction that last dirtied this page, or null when the page is clean.
    private TransactionId dirtier;

    /**
     * Create a HeapPage from a set of bytes of data read from disk.
     * The format of a HeapPage is a set of header bytes indicating
     * the slots of the page that are in use, some number of tuple slots.
     * Specifically, the number of tuples is equal to: <p>
     * floor((BufferPool.getPageSize()*8) / (tuple size * 8 + 1))
     * <p> where tuple size is the size of tuples in this
     * database table, which can be determined via {@link Catalog#getTupleDesc}.
     * The number of 8-bit header words is equal to:
     * <p>
     * ceiling(no. tuple slots / 8)
     * <p>
     *
     * @param id   id of the page being created
     * @param data serialized page contents
     * @throws IOException if the header cannot be read from {@code data}
     * @see Database#getCatalog
     * @see Catalog#getTupleDesc
     * @see BufferPool#getPageSize()
     */
    public HeapPage(HeapPageId id, byte[] data) throws IOException {
        this.pid = id;
        this.td = Database.getCatalog().getTupleDesc(id.getTableId());
        this.numSlots = getNumTuples();
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));

        // allocate and read the header slots of this page
        header = new byte[getHeaderSize()];
        for (int i = 0; i < header.length; i++) {
            header[i] = dis.readByte();
        }

        tuples = new Tuple[numSlots];
        try {
            // allocate and read the actual records of this page
            for (int i = 0; i < tuples.length; i++) {
                tuples[i] = readNextTuple(dis, i);
            }
        } catch (NoSuchElementException e) {
            // Kept as best-effort: a malformed slot is reported but does not abort
            // construction; the remaining slots stay null.
            e.printStackTrace();
        }
        dis.close();

        setBeforeImage();
    }

    /**
     * Retrieve the number of tuples on this page.
     *
     * @return the number of tuple slots that fit on a page: each slot needs
     *         {@code tuple size * 8} data bits plus one header bit
     */
    private int getNumTuples() {
        return (BufferPool.getPageSize() * 8) / (td.getSize() * 8 + 1);
    }

    /**
     * Computes the number of bytes in the header of a page in a HeapFile with each tuple occupying tupleSize bytes
     *
     * @return the number of header bytes, i.e. one bit per slot rounded up to whole bytes
     */
    private int getHeaderSize() {
        return (getNumTuples() + 7) / 8;
    }

    /**
     * Return a view of this page before it was modified
     * -- used by recovery
     *
     * @return a new page built from the stored before-image
     */
    public HeapPage getBeforeImage() {
        try {
            byte[] oldDataRef = null;
            synchronized (oldDataLock) {
                oldDataRef = oldData;
            }
            return new HeapPage(pid, oldDataRef);
        } catch (IOException e) {
            e.printStackTrace();
            //should never happen -- we parsed it OK before!
            System.exit(1);
        }
        return null;
    }

    /**
     * Stores a copy of the current serialized page contents as the before-image.
     */
    public void setBeforeImage() {
        synchronized (oldDataLock) {
            oldData = getPageData().clone();
        }
    }

    /**
     * @return the PageId associated with this page.
     */
    public HeapPageId getId() {
        return this.pid;
    }

    /**
     * Suck up tuples from the source file.
     *
     * @param dis    stream positioned at the start of slot {@code slotId}
     * @param slotId index of the slot being read
     * @return the tuple stored in the slot, or {@code null} if the slot is unused
     * @throws NoSuchElementException if the slot cannot be read or parsed
     */
    private Tuple readNextTuple(DataInputStream dis, int slotId) throws NoSuchElementException {
        // if associated bit is not set, read forward to the next tuple, and
        // return null.
        if (!isSlotUsed(slotId)) {
            for (int i = 0; i < td.getSize(); i++) {
                try {
                    dis.readByte();
                } catch (IOException e) {
                    throw new NoSuchElementException("error reading empty tuple");
                }
            }
            return null;
        }

        // read fields in the tuple
        Tuple t = new Tuple(td);
        RecordId rid = new RecordId(pid, slotId);
        t.setRecordId(rid);
        try {
            for (int j = 0; j < td.numFields(); j++) {
                Field f = td.getFieldType(j).parse(dis);
                t.setField(j, f);
            }
        } catch (java.text.ParseException e) {
            e.printStackTrace();
            throw new NoSuchElementException("parsing error!");
        }
        return t;
    }

    /**
     * Generates a byte array representing the contents of this page.
     * Used to serialize this page to disk.
     * <p>
     * The invariant here is that it should be possible to pass the byte
     * array generated by getPageData to the HeapPage constructor and
     * have it produce an identical HeapPage object.
     *
     * @return A byte array correspond to the bytes of this page.
     * @see #HeapPage
     */
    public byte[] getPageData() {
        int len = BufferPool.getPageSize();
        ByteArrayOutputStream baos = new ByteArrayOutputStream(len);
        DataOutputStream dos = new DataOutputStream(baos);

        // create the header of the page
        for (int i = 0; i < header.length; i++) {
            try {
                dos.writeByte(header[i]);
            } catch (IOException e) {
                // this really shouldn't happen
                e.printStackTrace();
            }
        }

        // create the tuples
        for (int i = 0; i < tuples.length; i++) {
            // empty slot: emit a tuple's worth of zero bytes
            if (!isSlotUsed(i)) {
                for (int j = 0; j < td.getSize(); j++) {
                    try {
                        dos.writeByte(0);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                continue;
            }

            // non-empty slot: serialize every field in order
            for (int j = 0; j < td.numFields(); j++) {
                Field f = tuples[i].getField(j);
                try {
                    f.serialize(dos);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        // padding: fill whatever is left of the page after header and slots with zeroes
        int zerolen = BufferPool.getPageSize() - (header.length + td.getSize() * tuples.length);
        byte[] zeroes = new byte[zerolen];
        try {
            dos.write(zeroes, 0, zerolen);
        } catch (IOException e) {
            e.printStackTrace();
        }

        try {
            dos.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return baos.toByteArray();
    }

    /**
     * Static method to generate a byte array corresponding to an empty
     * HeapPage.
     * Used to add new, empty pages to the file. Passing the results of
     * this method to the HeapPage constructor will create a HeapPage with
     * no valid tuples in it.
     *
     * @return The returned ByteArray.
     */
    public static byte[] createEmptyPageData() {
        int len = BufferPool.getPageSize();
        return new byte[len]; //all 0
    }

    /**
     * Delete the specified tuple from the page; the tuple should be updated to reflect
     * that it is no longer stored on any page.
     *
     * @param t The tuple to delete
     * @throws DbException if this tuple is not on this page, or tuple slot is
     *                     already empty.
     */
    public void deleteTuple(Tuple t) throws DbException {
        assert t != null;
        RecordId recToDelete = t.getRecordId();
        // Use the accessor, as the rest of the code base does, rather than reaching into
        // the RecordId field directly.
        if (recToDelete == null || !pid.equals(recToDelete.getPageId())) {
            throw new DbException("deleteTuple: Error: tuple is not on this page");
        }
        for (int i = 0; i < numSlots; i++) {
            if (isSlotUsed(i) && recToDelete.equals(tuples[i].getRecordId())) {
                markSlotUsed(i, false);
                tuples[i] = null;
                return;
            }
        }
        throw new DbException("deleteTuple: Error: tuple slot is empty");
    }

    /**
     * Adds the specified tuple to the page; the tuple should be updated to reflect
     * that it is now stored on this page.
     *
     * @param t The tuple to add.
     * @throws DbException if the page is full (no empty slots) or tupledesc
     *                     is mismatch.
     */
    public void insertTuple(Tuple t) throws DbException {
        assert t != null;
        if (!td.equals(t.getTupleDesc())) {
            throw new DbException("insertTuple: tupledesc is mismatch");
        }
        for (int i = 0; i < numSlots; i++) {
            if (!isSlotUsed(i)) {
                // insert and update header
                markSlotUsed(i, true);
                t.setRecordId(new RecordId(pid, i));
                tuples[i] = t;
                return;
            }
        }
        throw new DbException("insertTuple: page is full, no empty slots");
    }

    /**
     * Marks this page as dirty/not dirty and record that transaction
     * that did the dirtying
     *
     * @param dirty {@code true} to mark the page dirty, {@code false} to mark it clean
     * @param tid   the transaction that dirtied the page; ignored when {@code dirty} is false
     */
    public void markDirty(boolean dirty, TransactionId tid) {
        this.dirtier = dirty ? tid : null;
    }

    /**
     * Returns the tid of the transaction that last dirtied this page, or null if the page is not dirty
     */
    public TransactionId isDirty() {
        return dirtier;
    }

    /**
     * Returns the number of empty slots on this page.
     */
    public int getNumEmptySlots() {
        int emptySlots = 0;
        for (int i = 0; i < numSlots; i++) {
            if (!isSlotUsed(i)) {
                emptySlots += 1;
            }
        }
        return emptySlots;
    }

    /**
     * Returns true if associated slot on this page is filled.
     *
     * @param i slot index; indices outside {@code [0, numSlots)} are reported as unused
     */
    public boolean isSlotUsed(int i) {
        if (i < 0 || i >= numSlots) {
            return false;
        }
        // Slot i lives in header byte i / 8 at bit i % 8, counted from the least
        // significant bit.
        int hdNo = i / 8;
        int offset = i % 8;
        return (header[hdNo] & (0x1 << offset)) != 0;
    }

    /**
     * Abstraction to fill or clear a slot on this page.
     *
     * @param i     slot index; indices outside {@code [0, numSlots)} are ignored
     * @param value {@code true} to mark the slot used, {@code false} to mark it free
     */
    private void markSlotUsed(int i, boolean value) {
        if (i < 0 || i >= numSlots) {
            return;
        }
        int hdNo = i / 8;
        int offset = i % 8;
        byte mask = (byte) (0x1 << offset);
        if (value) {
            header[hdNo] |= mask;
        } else {
            header[hdNo] &= ~mask;
        }
    }

    /**
     * Iterator over the tuples in the used slots of this page. The set of tuples is
     * captured when the iterator is created.
     */
    protected class HeapPageTupleIterator implements Iterator<Tuple> {
        private final Iterator<Tuple> iter;

        public HeapPageTupleIterator() {
            ArrayList<Tuple> occupied = new ArrayList<Tuple>(numSlots);
            for (int i = 0; i < numSlots; i++) {
                if (isSlotUsed(i)) {
                    // Append rather than insert at index i: with an empty slot before i
                    // the list is shorter than i and add(i, ...) would throw
                    // IndexOutOfBoundsException.
                    occupied.add(tuples[i]);
                }
            }
            iter = occupied.iterator();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("TupleIterator: remove not supported");
        }

        @Override
        public boolean hasNext() {
            return iter.hasNext();
        }

        @Override
        public Tuple next() {
            return iter.next();
        }
    }

    /**
     * @return an iterator over all tuples on this page (calling remove on this iterator throws an UnsupportedOperationException)
     * (note that this iterator shouldn't return tuples in empty slots!)
     */
    public Iterator<Tuple> iterator() {
        return new HeapPageTupleIterator();
    }
}

HeapPage 由 header 和 tuples 两部分组成:header 是一个位图,每一位标记对应的 slot 是否已被占用;numSlots 表示该页最多可以容纳的 tuple 个数。剩下的就是基本的 IO 操作。