Building a Database Yourself (3): BufferPool

Author: chauncy  Date: 2020-04-07

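The importance of the BufferPool in a database is self-evident: all data I/O goes through it. It must also guarantee correctness when multiple threads access pages concurrently, and it is responsible for flushing its pages to disk so that they are persisted.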

Basic fields of BufferPool

```java
private static final int PAGE_SIZE = 4096; // bytes per page
private static int pageSize = PAGE_SIZE;
/** Default number of pages passed to the constructor. This is used by
    other classes. BufferPool should use the numPages argument to the
    constructor instead. */
public static final int DEFAULT_PAGES = 500;
// page buffer: PageId -> Page
private ConcurrentHashMap<PageId, Page> pgBufferPool;
// maximum number of pages held in pgBufferPool
private int capacity;
private LockManager lockMgr;
private static int TRANSATION_FACTOR = 2;
// maximum lock-wait timeout in ms (5 s), used for deadlock detection
private static int DEFAUT_MAXTIMEOUT = 5000;
```

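  • pageSize: the size of each page. The database reads data from disk in pages, its smallest unit of I/O.

  • pgBufferPool: the mapping from PageId to the cached Page.

  • capacity: the maximum number of pages the buffer pool may hold; once it is reached, a page must be evicted before a new one is read.

  • lockMgr: the lock manager, which manages page locks when multiple threads access the pool concurrently.

  • DEFAUT_MAXTIMEOUT: the maximum time, in milliseconds, a transaction waits for a lock (5000 ms).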

Getting a page

```java
public Page getPage(TransactionId tid, PageId pid, Permissions perm)
        throws TransactionAbortedException, DbException {
    // READ_ONLY needs a shared lock; any other permission needs an exclusive lock
    LockManager.LockType lockType = (perm == Permissions.READ_ONLY)
            ? LockManager.LockType.SLock
            : LockManager.LockType.XLock;
    Debug.log(pid.toString() + ": before acquire lock\n");
    // blocks until the lock is granted; may throw TransactionAbortedException (e.g. after a timeout)
    lockMgr.acquireLock(tid, pid, lockType, DEFAUT_MAXTIMEOUT);
    Debug.log(pid.toString() + ": acquired the lock\n");
    Page pg = pgBufferPool.get(pid);
    if (pg == null) {
        // cache miss: make room if the pool is full, then read the page from its DbFile
        if (pgBufferPool.size() >= capacity) {
            evictPage();
        }
        pg = Database
                .getCatalog()
                .getDatabaseFile(pid.getTableId())
                .readPage(pid);
        pgBufferPool.put(pid, pg);
    }
    return pg;
}
```

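getPage takes a transaction id (TransactionId), a pageId, and a permission. The permission decides which kind of lock is acquired: READ_ONLY maps to a shared lock, anything else to an exclusive lock.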

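  • First, acquire the page lock through lockMgr (lockMgr is covered below).

  • Then look up the page in pgBufferPool by its pageId.

  • If pgBufferPool does not contain the pageId, first evict a page when the pool is full, then read the page through its DbFile (covered in an earlier post) and put it into the buffer pool.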
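
The lookup, evict and load steps above can be tried out in isolation with a toy pool. This is a minimal sketch, not the post's code: pages are Strings keyed by an int page number, the hypothetical `loader` function stands in for `DbFile.readPage`, locking is left out, and least-recently-used (LRU) eviction is an assumption, since the post does not show `evictPage`.

```java
import java.util.LinkedHashMap;
import java.util.function.Function;

// Hypothetical toy model of getPage's lookup -> evict -> load steps.
// "loader" stands in for DbFile.readPage; LRU eviction is an assumed policy.
class ToyBufferPool {
    private final int capacity;
    private final Function<Integer, String> loader;
    // accessOrder = true: iteration starts at the least recently accessed entry
    private final LinkedHashMap<Integer, String> pages = new LinkedHashMap<>(16, 0.75f, true);
    int misses = 0;

    ToyBufferPool(int capacity, Function<Integer, String> loader) {
        this.capacity = capacity;
        this.loader = loader;
    }

    String getPage(int pid) {
        String page = pages.get(pid); // a hit also marks the page as most recently used
        if (page == null) {
            misses++;
            if (pages.size() >= capacity) {
                evictPage();
            }
            page = loader.apply(pid); // read the page from "disk"
            pages.put(pid, page);
        }
        return page;
    }

    // evict the least recently used page: the first entry in access order
    private void evictPage() {
        Integer victim = pages.keySet().iterator().next();
        pages.remove(victim);
    }

    boolean isCached(int pid) {
        return pages.containsKey(pid);
    }
}
```

For example, with capacity 2, requesting pages 1, 2, 1, 3 causes three misses and evicts page 2, because the hit on page 1 moved it to the most-recent end of the access-ordered `LinkedHashMap`. The real pool uses a `ConcurrentHashMap`, which keeps no access order, so an LRU policy there would need its own bookkeeping.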

Inserting data

```java
public void insertTuple(TransactionId tid, int tableId, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    DbFile tableFile = Database.getCatalog().getDatabaseFile(tableId);
    // the DbFile inserts the tuple and returns every page it modified
    ArrayList<Page> affected = tableFile.insertTuple(tid, t);
    for (Page newPg : affected) {
        // record which transaction dirtied the page so that a flush writes it back
        newPg.markDirty(true, tid);
        // put() replaces any cached copy with the modified page
        pgBufferPool.put(newPg.getId(), newPg);
    }
}
```

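Data is inserted one tuple at a time, and the logic is fairly simple:

  • Get the table's DbFile from the catalog.
  • Insert the tuple through the DbFile, which returns the pages it modified.
  • Mark each modified page as dirty. Flushing decides whether a page must be written to disk by checking whether it has been modified, so this flag is what gets the change persisted.
  • Replace the page's entry in the buffer pool with the modified page, where it waits for a later flush.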
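
To make the role of the dirty flag concrete, here is a small model of the insert path. It is a sketch under assumptions: `DirtyPage` and `DirtyTracking` are hypothetical classes, transaction ids are plain `Long`s, and `isDirty()` follows the contract visible in flushPage (the id of the transaction that dirtied the page, or null when the page is clean).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical page mirroring the markDirty/isDirty contract used in the post.
class DirtyPage {
    final int id;
    private Long dirtier = null; // transaction that dirtied the page, or null if clean

    DirtyPage(int id) { this.id = id; }

    void markDirty(boolean dirty, Long tid) {
        dirtier = dirty ? tid : null;
    }

    Long isDirty() { return dirtier; }
}

class DirtyTracking {
    // page id -> cached page; LinkedHashMap keeps insertion order so results are deterministic
    final Map<Integer, DirtyPage> pool = new LinkedHashMap<>();

    // mimics insertTuple's loop: mark each affected page dirty, then cache it
    void recordInsert(long tid, List<DirtyPage> affected) {
        for (DirtyPage p : affected) {
            p.markDirty(true, tid);
            pool.put(p.id, p); // replaces any older cached copy
        }
    }

    // ids of the pages a flush would have to write back for transaction tid
    List<Integer> dirtyPagesOf(long tid) {
        List<Integer> result = new ArrayList<>();
        for (DirtyPage p : pool.values()) {
            Long d = p.isDirty();
            if (d != null && d == tid) { // d is unboxed for the comparison
                result.add(p.id);
            }
        }
        return result;
    }
}
```

After `recordInsert(7L, ...)` on pages 1 and 2, `dirtyPagesOf(7L)` returns `[1, 2]`, while a page that was cached but never modified stays clean and would be skipped by a flush.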

Flushing data

```java
private synchronized void flushPage(PageId pid) throws IOException {
    Page p = pgBufferPool.get(pid);
    if (p == null) {
        return; // page is not cached, nothing to flush
    }
    TransactionId dirtier = p.isDirty(); // null if the page is clean
    if (dirtier != null) {
        // write-ahead logging: the log record must reach disk before the page does
        Database.getLogFile().logWrite(dirtier, p.getBeforeImage(), p);
        Database.getLogFile().force();
        DbFile tb = Database.getCatalog().getDatabaseFile(p.getId().getTableId());
        tb.writePage(p);
        // mark the page clean only after the write has succeeded
        p.markDirty(false, null);
    }
}
```

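flushPage writes a single page to disk:

  • Get the page from the buffer pool by its pageId.
  • Check whether the page is dirty. Every modification sets this flag; isDirty() returns the id of the transaction that dirtied the page, or null if the page is clean.
  • Write an update record (the page's before image and its current contents) to the log file from Database.getLogFile(), then call force() so the log reaches disk before the page does (write-ahead logging).
  • Finally, write the page to its DbFile and clear the dirty flag.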
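
The ordering that matters in flushPage can be isolated in a few lines. This sketch (the hypothetical `WalFlushSketch`) records events in a list instead of touching the log file or a DbFile; the method names in its comments refer to the post's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of flushPage's write-ahead-logging order:
// log record -> force the log -> write the page -> mark it clean.
// "events" stands in for the log file and the DbFile; nothing touches a real disk.
class WalFlushSketch {
    final List<String> events = new ArrayList<>();

    static class Page {
        final int id;
        Long dirtier; // id of the transaction that dirtied the page, or null if clean

        Page(int id, Long dirtier) {
            this.id = id;
            this.dirtier = dirtier;
        }
    }

    void flushPage(Page p) {
        if (p.dirtier == null) {
            return; // clean page: nothing to write
        }
        events.add("log " + p.id);   // logWrite(dirtier, beforeImage, page)
        events.add("force");         // the log is durable before the data page changes on disk
        events.add("write " + p.id); // DbFile.writePage(page)
        p.dirtier = null;            // mark clean only after the write
    }
}
```

Flushing a dirty page 1 and then a clean page 2 produces exactly `log 1`, `force`, `write 1`. The log must be forced first because, if a page reached disk without its log record, recovery would have no before image with which to undo an uncommitted change.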

Acquiring locks

```java
public synchronized void acquireLock(TransactionId tid, PageId pid, LockType reqLock, int maxTimeout)
        throws TransactionAbortedException {
    long start = System.currentTimeMillis();
    // random timeout in [0, maxTimeout] ms; randomizing it makes it less likely
    // that every transaction in a deadlock gives up at the same moment
    Random rand = new Random();
    long randomTimeout = rand.nextInt(maxTimeout + 1);
    while (true) {
        ObjLock lock = lockTable.get(pid);
        if (lock == null) {
            // page is unlocked: tid becomes its first holder
            ArrayList<TransactionId> initialHolders = new ArrayList<>();
            initialHolders.add(tid);
            lockTable.put(pid, new ObjLock(reqLock, pid, initialHolders));
            updateTransactionTable(tid, pid);
            return;
        }
        if (lock.getType() == LockType.SLock) {
            if (reqLock == LockType.SLock) {
                // shared locks are compatible: join the holders.
                // addHolder must run outside assert, or it is skipped when assertions are disabled
                updateTransactionTable(tid, pid);
                TransactionId added = lock.addHolder(tid);
                assert added != null;
                return;
            }
            // XLock requested on a shared lock
            if (transactionTable.containsKey(tid) && transactionTable.get(tid).contains(pid)
                    && lock.getHolders().size() == 1) {
                // tid is the only holder, so its shared lock can be upgraded in place
                assert lock.getHolders().get(0).equals(tid);
                lock.tryUpgradeLock(tid);
                return;
            }
            // other transactions share the lock: wait until they release it
            block(pid, start, randomTimeout);
        } else {
            // an XLock has exactly one holder
            assert lock.getHolders().size() == 1;
            if (lock.getHolders().get(0).equals(tid)) {
                // tid already holds the XLock, which covers both reads and writes
                return;
            }
            // another transaction holds the XLock: wait
            block(pid, start, randomTimeout);
        }
    }
}
```

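The LockManager keeps a ConcurrentHashMap, lockTable, that records the lock currently held on each page. A second table, transactionTable, records which pages each transaction has locked.

There are two lock types, SLock and XLock: shared and exclusive locks, i.e. read and write locks. Reads are compatible with each other, while read/write and write/write conflict.

ObjLock keeps a list of the transactions that currently hold its lock.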

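```java
class ObjLock {
    LockType type;
    PageId obj;
    // transactions currently holding this lock (exactly one for an XLock)
    ArrayList<TransactionId> holders;

    public ObjLock(LockType t, PageId obj, ArrayList<TransactionId> holders) {
        this.type = t;
        this.obj = obj;
        this.holders = holders;
    }

    public LockType getType() {
        return type;
    }

    public ArrayList<TransactionId> getHolders() {
        return holders;
    }

    // SLock -> XLock, allowed only when tid is the sole holder
    public boolean tryUpgradeLock(TransactionId tid) {
        if (type == LockType.SLock && holders.size() == 1 && holders.get(0).equals(tid)) {
            type = LockType.XLock;
            return true;
        }
        return false;
    }

    // adds tid as a holder of a shared lock; returns null if the lock is exclusive
    public TransactionId addHolder(TransactionId tid) {
        if (type == LockType.SLock) {
            if (!holders.contains(tid)) {
                holders.add(tid);
            }
            return tid;
        }
        return null;
    }
}
```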

The flow for acquiring a lock:

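  • Look up the page's ObjLock in lockTable by its pageId. If there is none, the page is unlocked: create an ObjLock whose only holder is the current transaction and return.

  • Otherwise check the lock's type: shared (read) or exclusive (write).

  • If the lock is shared and a shared lock is requested, call ObjLock.addHolder to add the current transactionId to the holder list, then return.

  • If the lock is shared but an exclusive lock is requested, compare the holders with the current transactionId. If the current transaction is the only holder, upgrade the lock: it took a read lock earlier and now needs to write.

  • If the lock is exclusive and the current transaction is its holder, return immediately, since an exclusive lock covers both reads and writes.

  • In every other case (another transaction holds the exclusive lock, or other transactions share the lock the current one wants to upgrade), the transaction must block. block is essentially a call to wait(); after waking up, the loop re-checks the lock. block also receives the start time and a random timeout, and acquireLock declares TransactionAbortedException, so a transaction that waits too long is presumably aborted, which is how deadlocks get broken.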
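
The branches of acquireLock can be restated as a pure decision function, which makes the cases easier to check one by one. This is a hypothetical restatement, not code from the post: transaction ids are Strings, waiting is replaced by a `BLOCK` result, and the transactionTable check in the original upgrade branch is assumed to agree with the holder list.

```java
import java.util.List;

// Hypothetical pure-function restatement of acquireLock's branches, with no waiting.
// heldType and holders describe the page's current ObjLock;
// heldType == null means lockTable has no entry for the page.
class LockDecision {
    enum LockType { SLock, XLock }

    enum Outcome { GRANT_NEW, JOIN_SHARED, UPGRADE, ALREADY_HELD, BLOCK }

    static Outcome decide(LockType heldType, List<String> holders, String tid, LockType req) {
        if (heldType == null) {
            return Outcome.GRANT_NEW; // unlocked page: tid becomes the first holder
        }
        if (heldType == LockType.SLock) {
            if (req == LockType.SLock) {
                return Outcome.JOIN_SHARED; // read/read is compatible
            }
            if (holders.size() == 1 && holders.get(0).equals(tid)) {
                return Outcome.UPGRADE; // tid is the only reader, so it may become the writer
            }
            return Outcome.BLOCK; // other readers are present
        }
        // XLock: exactly one holder
        return holders.get(0).equals(tid) ? Outcome.ALREADY_HELD : Outcome.BLOCK;
    }
}
```

For example, `decide(SLock, [t1], t1, XLock)` gives `UPGRADE`, while `decide(SLock, [t1, t2], t1, XLock)` gives `BLOCK`: t1 has to wait until t2 releases its read lock. If t2 also asks for an upgrade at that point, both wait on each other, which is exactly the kind of deadlock the timeout in block breaks.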

Releasing locks

```java
public synchronized void releaseLock(TransactionId tid, PageId pid) {
    // remove pid from the pages locked by tid
    if (transactionTable.containsKey(tid)) {
        transactionTable.get(tid).remove(pid);
        if (transactionTable.get(tid).size() == 0) {
            transactionTable.remove(tid);
        }
    }
    // remove tid from the page's holders
    ObjLock lock = lockTable.get(pid);
    if (lock != null) {
        lock.getHolders().remove(tid);
        if (lock.getHolders().size() == 0) {
            // no holders left: the page is now unlocked
            lockTable.remove(pid);
        }
        // wake every thread blocked in wait() on this LockManager so it re-checks its request.
        // Needed in both cases: waiters are not holders, so an empty holder list
        // does not mean nobody is waiting.
        notifyAll();
    }
}
```

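Releasing is simple: the transaction is removed from the ObjLock's holders (and the page from its entry in transactionTable), and the ObjLock is dropped once no holders remain. If other holders remain, notifyAll() wakes the threads blocked in wait(). Waiters should also be woken when the last holder leaves: they are not holders themselves, so an empty holder list does not mean nobody is waiting, and without a notification they sleep until their timeout expires.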
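
The wait()/notifyAll() handshake between block and releaseLock can be reproduced with a single-page lock. This is a minimal sketch under assumptions: `MonitorLock` is hypothetical, it only models exclusive locks, and its timed wait stands in for block(), whose code the post does not show. The post's acquireLock would report a timeout as TransactionAbortedException; here it throws IllegalStateException.

```java
// Hypothetical single-page exclusive lock showing the wait()/notifyAll() handshake.
// The timeout handling is an assumption about what block() does.
class MonitorLock {
    private String holder = null; // transaction holding the lock, or null if free

    synchronized void acquire(String tid, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (holder != null && !holder.equals(tid)) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                // the post's code would abort the transaction here
                throw new IllegalStateException(tid + " timed out");
            }
            wait(remaining); // releases the monitor while sleeping; the loop re-checks on wake-up
        }
        holder = tid;
    }

    synchronized void release(String tid) {
        if (tid.equals(holder)) {
            holder = null;
            notifyAll(); // wake every waiter; each one re-checks the while condition
        }
    }

    synchronized String holder() {
        return holder;
    }
}
```

notifyAll() rather than notify() is the safer choice when, as in the post's LockManager, one monitor is shared by waiters for many different pages: notify() wakes a single arbitrary thread, which may be waiting for a page that is still locked, while the thread that could proceed keeps sleeping.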