simple-db

Code: Goinggoinggoing/simple-db-hw-2021 (github.com)

A simple relational database with transaction support, implemented in Java.

Difficulty: lab4 = lab5 > lab6 >>>>>> lab3 > lab2 > lab1

  • lab1: implement the basic data structures
    tuple, page, tupleDesc, iterator, and so on; not difficult

  • lab2: implement the scan iterator

    Build the other operators on top of it: aggregate functions such as avg, count, sum, plus join, etc.

  • lab3: join optimization

    Build an optimizer model: estimate costs from primary-key vs. non-primary-key joins, table scan cost, histograms, and so on, and use the estimates to decide the order of multi-table joins.

  • lab 4: transactions and locks

    This one is relatively hard: you implement a simple read/write lock yourself. 6.830 simplifies things to page-level locks, which are fairly coarse-grained, and there are several possible deadlock scenarios. The tests are thorough, so be very clear about which transaction holds which locks on which pages; this code also affects labs 5 and 6. The approach is strict two-phase locking with a NO STEAL / FORCE policy.

    The code implements deadlock detection based on Timeout, Wait-for Graph, and Global Orderings (wait-die).

  • lab 5: B+ tree index (TODO)

    Implement a B+ tree index with insert, delete, and update. The difficulty is keeping the tree structure and the three operations straight: parent and child pointers, leaf sibling pointers, non-leaf pointers, and the boundary conditions.

  • lab 6: log-based rollback and recovery

    The lab has no real undo or redo log; the log format is simple and only needs offset-based processing, but it can be read as a logical undo/redo log. It uses the undo information to implement a STEAL / NO FORCE policy for more flexible buffer management, and a basic WAL (Write-Ahead Logging) scheme for transaction rollback and recovery.

lab1

Database -> Catalog -> Table (DbFile, multiple pages) -> HeapPage ([]Tuple) -> Tuple ([]Field) -> Field

Tuple: the contents of one row
private TupleDesc tupleDesc;
private Field[] fields;
private RecordId recordId; // table - page - slot

TupleDesc: the type information of every column of a table (or row)
private List<TDItem> tdItems; // TDItem(fieldType, fieldName)

Catalog: information about all tables, loaded into memory at startup
public HashMap<Integer, Table> tables; // key is DbFile.getId(), i.e. the tableid, hash(path)
Table: DbFile file, String name, String pkeyField (the file, the table name, the primary key)
DbFile (HeapFile): tableid, TupleDesc, File (the file path)
HeapFile: a single table
    contains multiple pages, fetched via readPage(pid); when reading a single page, seek past the preceding ones



BufferPool: the cached pages
private Map<PageId, Page> pageCache;
PageId: tableId, pgNo (which page of which table)
if the page is not in the map, call dbFile.readPage(pid); the dbFile is stored in the Catalog



HeapPageId: tableId, pgNo (which page of which table)

RecordId: PageId, tupleno (which slot of which page)

HeapPage: (HeapPageId, data[]) parses data[] into a page; data holds the header and the tuples and comes from HeapFile.read
    HeapPageId: which page of which table
    header[getHeaderSize()]: whether each slot holds data
    Tuple[numSlots]: the contents of each row; every row carries its rid, i.e. (pid, slotId)
    td: the row schema

Formulas:
getNumTuples(): floor((page size * 8) / (tuple size * 8 + 1))
getHeaderSize(): Math.ceil(getNumTuples() * 1.0 / 8);

iterator(): returns an iterator over the page's tuples, backed by a List iterator
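
A minimal sketch of the two formulas above in Java (the class and method names here are illustrative, not the lab skeleton):

// Hypothetical helper that reproduces the slot/header arithmetic above.
public class HeapPageLayout {
    // floor((page size * 8) / (tuple size * 8 + 1)): each slot costs tupleSize bytes plus one header bit
    public static int numTuples(int pageSizeBytes, int tupleSizeBytes) {
        return (pageSizeBytes * 8) / (tupleSizeBytes * 8 + 1);
    }

    // one header bit per slot, rounded up to whole bytes
    public static int headerSizeBytes(int numTuples) {
        return (int) Math.ceil(numTuples / 8.0);
    }

    public static void main(String[] args) {
        int n = numTuples(4096, 8);   // two-int tuples on a 4096-byte page -> 504
        System.out.println(n + " tuples, header " + headerSizeBytes(n) + " bytes");  // 504 tuples, header 63 bytes
    }
}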

ex5
HeapFile: HeapFile(File f, TupleDesc td);
readPage(PageId pid)  // called by the BufferPool
    randomAccess.seek(pageSize * pageNumber);
    randomAccess.read(buffer);
    return new HeapPage((HeapPageId) pid, buffer);

writePage(page): writes the page to disk; if its page number is past the current end, this effectively appends a page
iterator(tid): returns an iterator over the whole table, reading page by page from the BufferPool; each page in turn uses heappage.iterator
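
A minimal sketch of such a table iterator, assuming the DbFileIterator interface and the HeapFile/BufferPool methods from the skeleton (numPages, getId, getPage); details simplified:

// Hypothetical HeapFile iterator: walk the pages through the BufferPool, and within each page
// use HeapPage.iterator() to walk its tuples.
class HeapFileIterator implements DbFileIterator {
    private final HeapFile file;
    private final TransactionId tid;
    private int pgNo;
    private Iterator<Tuple> tuples;   // tuples of the current page, null when closed

    HeapFileIterator(HeapFile file, TransactionId tid) {
        this.file = file;
        this.tid = tid;
    }

    public void open() throws DbException, TransactionAbortedException {
        pgNo = 0;
        tuples = pageTuples(pgNo);
    }

    private Iterator<Tuple> pageTuples(int no) throws DbException, TransactionAbortedException {
        HeapPageId pid = new HeapPageId(file.getId(), no);
        HeapPage page = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_ONLY);
        return page.iterator();
    }

    public boolean hasNext() throws DbException, TransactionAbortedException {
        if (tuples == null) return false;
        while (!tuples.hasNext()) {
            if (++pgNo >= file.numPages()) return false;   // ran off the last page
            tuples = pageTuples(pgNo);
        }
        return true;
    }

    public Tuple next() throws DbException, TransactionAbortedException {
        if (!hasNext()) throw new NoSuchElementException();
        return tuples.next();
    }

    public void rewind() throws DbException, TransactionAbortedException {
        open();
    }

    public void close() {
        tuples = null;
    }
}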



ex6
SeqScan implements OpIterator: a wrapper around HeapFile; the most basic select *
TransactionId tid, int tableid, String tableAlias; it also carries an alias
All later operators follow the same pattern: next/hasNext, handing out one tuple at a time.

Operators are iterator-based; every operator implements the DbIterator interface.
Lower-level operators are passed into the constructors of higher-level operators, chaining them together; the leaves are the access methods.
Operators are connected together into a plan by passing lower-level operators into the constructors of higher-level operators, i.e., by 'chaining them together.' Special access method operators at the leaves of the plan are responsible for reading data from the disk (and hence do not have any operators below them).

At the top of the plan, the program interacting with SimpleDB simply calls getNext on the root operator; this operator then calls getNext on its children, and so on, until these leaf operators are called. They fetch tuples from disk and pass them up the tree (as return arguments to getNext); tuples propagate up the plan in this way until they are output at the root or combined or rejected by another operator in the plan.

For this lab, you will only need to implement one SimpleDB operator.

HeapPage:

For a table with just two ints, a single page holds 4096 B of data, so the number of tuples is floor(4096 * 8 / (8 * 8 + 1)) = 504 rows and the header size is ceil(504 / 8) = 63 B.

So the first 63 B are the header (484 empty slots); the data is 00007cbd, 7fffffff. The construction is shown in HeapPageReadTest.java.


Summary

Data preparation: convert a txt file into the on-disk format that HeapPage can read.

4096 * 8 / (3 * 4 * 8 + 1) = 337 tuples,  ceil(337 / 8) = 43 B header
Raw data: java -jar dist/simpledb.jar convert some_data_file.txt 3
1,1,1
2,2,2
3,4,4

.dat: 0000 0111, the low three bits of the first header byte mark the three occupied slots
0700 0000 0000 0000 0000 0000 0000 0000
0000 0000 0000 0000 0000 0000 0000 0000
0000 0000 0000 0000 0000 00|00 0000 0100
0000 0100 0000 0100 0000 0200 0000 0200
0000 0200 0000 0300 0000 0400 0000 0400
0000 0000 0000 0000 0000 0000 0000 0000
public static void main(String[] argv) {
    // construct a 3-column table schema
    Type types[] = new Type[]{ Type.INT_TYPE, Type.INT_TYPE, Type.INT_TYPE };
    String names[] = new String[]{ "field0", "field1", "field2" };
    TupleDesc descriptor = new TupleDesc(types, names);

    // create the table, associate it with some_data_file.dat
    // and tell the catalog about the schema of this table.
    HeapFile table1 = new HeapFile(new File("some_data_file.dat"), descriptor);
    Database.getCatalog().addTable(table1, "test");

    // construct the query: we use a simple SeqScan, which spoonfeeds
    // tuples via its iterator.
    TransactionId tid = new TransactionId();
    SeqScan f = new SeqScan(tid, table1.getId());

    try {
        // and run it
        f.open();
        while (f.hasNext()) {
            Tuple tup = f.next();
            System.out.println(tup);
        }
        f.close();
        Database.getBufferPool().transactionComplete(tid);
    } catch (Exception e) {
        System.out.println("Exception : " + e);
    }
}

lab2

Basic operators extend Operator and implement fetchNext; Operator implements OpIterator.


Based on the decorator pattern.

Filter

Compare a given field against a constant and filter on the result.

Predicate(int field, Op op, Field operand)
public boolean filter(Tuple t)  // compares the given tuple against the value from the constructor

Filter(Predicate p, OpIterator child)  // filters the child (e.g. a SeqScan) through the predicate, keeping only matches
// next() fetches the next tuple via fetchNext(), which must be overridden
fetchNext:
    while (child.hasNext()) {
        Tuple next = child.next();
        if (predicate.filter(next)) {
            return next;
        }
    }
    return null;

@Test public void filterSomeLessThan() throws Exception {
    this.scan = new TestUtil.MockScan(-5, 5, testWidth); // -5~5
    Predicate pred;
    pred = new Predicate(0, Predicate.Op.LESS_THAN, TestUtil.getField(2)); // keeps -5~2
    Filter op = new Filter(pred, scan);
    TestUtil.MockScan expectedOut = new TestUtil.MockScan(-5, 2, testWidth);
    op.open();
    TestUtil.compareDbIterators(op, expectedOut);
    op.close();
}

Join

Keep the tuple pairs whose columns, as specified by the JoinPredicate, satisfy the op.

JoinPredicate(int field1, Predicate.Op op, int field2)
filter(Tuple t1, Tuple t2);  // checks whether the given fields of the two tuples satisfy op; used by Join

Join(JoinPredicate p, OpIterator child1, OpIterator child2)
fetchNext:
// nested loop over child1 and child2, keeping the pairs that match. To make this work as an
// iterator, the current outer tuple1 has to be remembered across calls.
    while (child1.hasNext() || tuple1 != null) {
        if (tuple1 == null) {
            tuple1 = child1.next();
        }
        while (child2.hasNext()) {
            Tuple t2 = child2.next();
            if (joinPredicate.filter(tuple1, t2)) {
                Tuple tuple = new Tuple(getTupleDesc());
                Iterator<Field> fields = tuple1.fields();
                int i = 0;
                while (fields.hasNext()) {
                    tuple.setField(i++, fields.next());
                }
                fields = t2.fields();
                while (fields.hasNext()) {
                    tuple.setField(i++, fields.next());
                }
                return tuple;
            }
        }
        child2.rewind();
        tuple1 = null;
    }
    return null;

scancost(t1) + ntups(t1) x scancost(t2) //IO cost
+ ntups(t1) x ntups(t2) //CPU cost

Aggregate

min max sum count avg

IntegerAggregator(int gbfield, Type gbfieldtype, int afield, Op what)  // group-by field index, group-by type, aggregate field index, op
HashMap<Field, Tuple> aggregate;  // maps each group-by Field to its result tuple
    'male' -> ('male', 10)
    'female' -> ('female', 8)
// min, max, sum, and count are updated directly in the map above
// avg additionally needs a running sum and count:
HashMap<Field, Integer> countsMap;
HashMap<Field, Integer> sumMap;

// tuples are fed in one at a time and folded into the aggregate; iterator() returns the results
IntegerAggregator agg = new IntegerAggregator(0, Type.INT_TYPE, 1, Aggregator.Op.SUM);
agg.mergeTupleIntoGroup(scan1.next());
agg.iterator();
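
A minimal sketch of how mergeTupleIntoGroup could maintain the state described above, shown for SUM, COUNT, and AVG only (resultTd is an assumed field holding the output TupleDesc; the other fields are the ones listed above):

// Fold one incoming tuple into its group and refresh that group's result tuple.
public void mergeTupleIntoGroup(Tuple tup) {
    Field groupKey = (gbfield == Aggregator.NO_GROUPING) ? null : tup.getField(gbfield);
    int value = ((IntField) tup.getField(afield)).getValue();

    int count = countsMap.getOrDefault(groupKey, 0) + 1;
    int sum = sumMap.getOrDefault(groupKey, 0) + value;
    countsMap.put(groupKey, count);
    sumMap.put(groupKey, sum);

    int result;
    switch (what) {
        case SUM:   result = sum;          break;
        case COUNT: result = count;        break;
        case AVG:   result = sum / count;  break;   // integer average, recomputed from sum and count
        default: throw new UnsupportedOperationException("MIN/MAX omitted from this sketch");
    }

    Tuple out = new Tuple(resultTd);                 // (groupKey, result) or just (result)
    if (groupKey == null) {
        out.setField(0, new IntField(result));
    } else {
        out.setField(0, groupKey);
        out.setField(1, new IntField(result));
    }
    aggregate.put(groupKey, out);
}
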
Aggregate(OpIterator child, int afield, int gfield, Aggregator.Op aop);
// delegates the aggregation to an Aggregator and implements the basic Operator interface (strategy pattern)
Aggregate op = new Aggregate(scan1, afield=0, gfield=0,
        Aggregator.Op.MIN);

insert delete

Three layers are involved: HeapPage, HeapFile, and BufferPool.

HeapPage: operates on a Tuple
    insert Tuple t into this page, so its pid must be set to this HeapPage's and the slot to wherever it landed

HeapFile: takes tid and Tuple, and returns the dirtied pages as an ArrayList<Page> (note: pages are fetched from the pool, so the pool is modified but the file is not)
    delete: simply delegates to HeapPage
    insert: pick a page with a free slot; if all are full, add a page (grow the file by writing empty data, then fetch the new page from the pool). In the experiment a 2-int tuple gives 504 rows per page; exceeding that automatically adds a page.

BufferPool: takes tid and Tuple. Gets the table via Database.getCatalog().getDatabaseFile(tableid); must mark the returned pages dirty. This is the transaction-facing layer.
    delete: delegate directly
    insert: needs an extra tableId saying which table to insert into


Database.getBufferPool().insertTuple(tid, empty.getId(), t);

HeapWriteTest: with 2-int tuples, a page holds 504 rows.
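
A rough sketch of the HeapFile insert path described above (getNumEmptySlots and createEmptyPageData are the helpers from the skeleton; treat the exact signatures as assumptions):

// Find a page with a free slot, or grow the file by one empty page and insert there.
public List<Page> insertTuple(TransactionId tid, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    for (int i = 0; i < numPages(); i++) {
        HeapPageId pid = new HeapPageId(getId(), i);
        HeapPage page = (HeapPage) Database.getBufferPool().getPage(tid, pid, Permissions.READ_WRITE);
        if (page.getNumEmptySlots() > 0) {
            page.insertTuple(t);                      // sets the tuple's RecordId to this page/slot
            List<Page> dirty = new ArrayList<>();
            dirty.add(page);
            return dirty;
        }
    }
    // every page is full: append an empty page to the file, then insert through the pool
    try (FileOutputStream out = new FileOutputStream(getFile(), true)) {
        out.write(HeapPage.createEmptyPageData());
    }
    HeapPageId newPid = new HeapPageId(getId(), numPages() - 1);
    HeapPage newPage = (HeapPage) Database.getBufferPool().getPage(tid, newPid, Permissions.READ_WRITE);
    newPage.insertTuple(t);
    List<Page> dirty = new ArrayList<>();
    dirty.add(newPage);
    return dirty;
}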

Page eviction

For a table of two ints with 1024 * 504 rows, i.e. 1024 pages: loading everything without eviction takes about 63 MB, while capping the pool at 50 pages takes about 3 MB.

evictPage: flush the page before throwing it out

Random
private synchronized void evictPage() throws DbException {
    Iterator<Map.Entry<PageId, Page>> iterator = pageCache.entrySet().iterator();
    if (iterator.hasNext()) {
        Map.Entry<PageId, Page> next = iterator.next();
        PageId pid = next.getKey();
        Page page = next.getValue();
        if (page.isDirty() != null) {
            flushPage(pid);
        }
        iterator.remove();
    }
}
FIFO
@Override
public void modifyData(PageId pageId) {
    if (set.contains(pageId)) return;
    queue.offer(pageId);
    set.add(pageId);
}

@Override
public PageId getEvictPageId() {
    PageId peek = queue.poll();
    set.remove(peek);
    return peek;
}
LRU:

Doubly linked list + hash map

public class LRUEvict implements EvictStrategy {

    Node tail;
    Node head;
    HashMap<PageId, Node> hashMap;

    public LRUEvict() {
        hashMap = new HashMap<>();
        tail = new Node(null);
        head = new Node(null);
        head.next = tail;
        tail.prev = head;
    }

    @Override
    public void modifyData(PageId pageId) {
        if (hashMap.containsKey(pageId)) {
            Node node = hashMap.get(pageId);
            moveToHead(node);
        } else {
            Node node = new Node(pageId);
            addHead(node);
            hashMap.put(pageId, node);
        }
    }

    @Override
    public PageId getEvictPageId() {
        Node prev = tail.prev;
        hashMap.remove(prev.pageId);
        removeNode(prev);

        return prev.pageId;
    }

    public void addHead(Node node) {
        node.next = head.next;
        head.next.prev = node;
        node.prev = head;
        head.next = node;
    }

    public void moveToHead(Node node) {
        removeNode(node);
        addHead(node);
    }

    private void removeNode(Node node) {
        node.prev.next = node.next;
        node.next.prev = node.prev;
    }

    class Node {
        PageId pageId;
        Node prev;
        Node next;

        public Node(PageId pageId) {
            this.pageId = pageId;
        }
    }
}
  • flushPage(pid): if the page is dirty, write it back to disk
  • discardPage(pid): simply drop a page from the pool

Summary

SELECT *
FROM some_data_file1,
some_data_file2
WHERE some_data_file1.field1 = some_data_file2.field1
AND some_data_file1.id > 1
public static void main(String[] args) {
    // construct a 3-column table schema
    Type types[] = new Type[]{Type.INT_TYPE, Type.INT_TYPE, Type.INT_TYPE};
    String names[] = new String[]{"field0", "field1", "field2"};

    TupleDesc td = new TupleDesc(types, names);

    // create the tables, associate them with the data files
    // and tell the catalog about the schema of the tables.
    HeapFile table1 = new HeapFile(new File("lab2_file1.dat"), td);
    Database.getCatalog().addTable(table1, "t1");

    HeapFile table2 = new HeapFile(new File("lab2_file2.dat"), td);
    Database.getCatalog().addTable(table2, "t2");

    // the code above loads the table metadata into the Catalog; it runs when the database starts
    // ------------------------------------------------------------------------------
    // the code below is the user's query

    // construct the query: we use two SeqScans, which spoonfeed
    // tuples via iterators into join
    TransactionId tid = new TransactionId();

    SeqScan ss1 = new SeqScan(tid, table1.getId(), "t1");
    SeqScan ss2 = new SeqScan(tid, table2.getId(), "t2");

    // create a filter for the where condition
    Filter sf1 = new Filter(
            new Predicate(0,
                    Predicate.Op.GREATER_THAN, new IntField(1)), ss1);

    JoinPredicate p = new JoinPredicate(1, Predicate.Op.EQUALS, 1);
    Join j = new Join(p, sf1, ss2);

    // and run it
    try {
        j.open();
        while (j.hasNext()) {
            Tuple tup = j.next();
            System.out.println(tup);
        }
        j.close();
        Database.getBufferPool().transactionComplete(tid);

    } catch (Exception e) {
        e.printStackTrace();
    }
}
int table1Id = Database.getCatalog().getTableId("t1");

// insert into table t1
Database.getBufferPool().insertTuple(tid, table1Id, tuple);

// delete the data at slot TupleNumber of the PageId recorded in the tuple
Database.getBufferPool().deleteTuple(tid, tuple);

// first scan all the data
SeqScan ss1 = new SeqScan(tid, table1Id, "t1");

// keep only rows with field0 > 1
Filter sf1 = new Filter(
        new Predicate(0,
                Predicate.Op.GREATER_THAN, new IntField(1)), ss1);

// aggregate: afield = 1, gfield = 2
Aggregate ag = new Aggregate(sf1, 1, 2, Aggregator.Op.SUM);

// join on field1 = field1
JoinPredicate p = new JoinPredicate(1, Predicate.Op.EQUALS, 1);
Join j = new Join(p, sf1, ss2);

Query Parser

Load the table definitions and data, then run queries.

java -jar dist/simpledb.jar parser catalog.txt
lab2_file1 (f1 int, f2 int, f3 int)
lab2_file2 (f1 int, f2 int, f3 int)


select d.f1, d.f2 from lab2_file1 d;

Queries with a where clause throw an error?

lab3

  • By analyzing table statistics, we can estimate the cost of different query plans. A plan's cost depends on the number of tuples produced by intermediate joins and selections and on the selectivity of the filter and join predicates.
  • Use these statistics to order joins and selections in an optimal way.

selectivity: the fraction of tuples that pass a predicate


In this lab we only care about the cost of joins and base-table access sequences. We do not have to worry about access-method selection (there is only one access method, the table scan), nor about the cost of other operators such as aggregates.

Filter Selectivity

Based on histograms, estimate ntups under one or more predicates.

  • f = const: selectivity = (h / w) / ntups
  • f > const: selectivity = (h_b / ntups) * (b_right - const) / w_b + the full fractions of buckets b+1 ... NumB-1


private int[] buckets;

public IntHistogram(int buckets, int min, int max)
public void addValue(int v)
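
A minimal sketch of estimateSelectivity for the two formulas above (EQUALS and GREATER_THAN); min, max, ntups and width are assumed fields set up by the constructor:

// Selectivity from the histogram: bucket height h, bucket width w, total tuple count ntups.
public double estimateSelectivity(Predicate.Op op, int v) {
    if (op == Predicate.Op.EQUALS) {
        if (v < min || v > max) return 0.0;
        return (buckets[bucketOf(v)] / width) / ntups;                        // (h / w) / ntups
    }
    if (op == Predicate.Op.GREATER_THAN) {
        if (v < min) return 1.0;
        if (v >= max) return 0.0;
        int b = bucketOf(v);
        double bRight = min + (b + 1) * width;
        double sel = (buckets[b] / (double) ntups) * ((bRight - v) / width);  // partial bucket b
        for (int i = b + 1; i < buckets.length; i++) {
            sel += buckets[i] / (double) ntups;                               // whole buckets to the right
        }
        return sel;
    }
    throw new UnsupportedOperationException("other ops can be built from these two");
}

private int bucketOf(int v) {
    return Math.min((int) ((v - min) / width), buckets.length - 1);
}
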
// for a given tableid, build a map from column index to histogram
// two scans: the first finds each column's min and max, the second adds the values
private Map<Integer, IntHistogram> intHistogramMap;
private Map<Integer, StringHistogram> stringHistogramMap;

public TableStats(int tableid, int ioCostPerPage)

// selectivity of comparing a given field against a constant
public double estimateSelectivity(int field, Predicate.Op op, Field constant)
    intHistogramMap.get(field).estimateSelectivity(op, ((IntField) constant).getValue())

// full table scan cost: no seek time, and assume nothing is cached in the pool; IO_COST = 71
public double estimateScanCost() {
    return numPage * ioCostPerPage;
}
// in the end, every table gets its own TableStats
private static final ConcurrentMap<String, TableStats> statsMap = new ConcurrentHashMap<>();

Join cardinality

estimate the size (ntups) of t1 join t2

  • for equality joins, a primary-key side bounds the estimate: the result can be no larger than the other side's cardinality
  • for range scans, estimate roughly 0.3 times the cross product
public int estimateJoinCardinality(LogicalJoinNode j, int card1, int card2,
                                   boolean t1pkey, boolean t2pkey, Map<String, TableStats> stats) {
    int card;
    switch (joinOp) {
        case EQUALS:
            if (t1pkey && !t2pkey) {
                card = card2;
            } else if (!t1pkey && t2pkey) {
                card = card1;
            } else if (t1pkey && t2pkey) {
                card = Math.min(card1, card2);
            } else {
                card = Math.max(card1, card2);
            }
            break;
        case NOT_EQUALS:
            // total number of pairs minus the equal pairs
            if (t1pkey && !t2pkey) {
                card = card1 * card2 - card2;
            } else if (!t1pkey && t2pkey) {
                card = card1 * card2 - card1;
            } else if (t1pkey && t2pkey) {
                card = card1 * card2 - Math.min(card1, card2);
            } else {
                card = card1 * card2 - Math.max(card1, card2);
            }
            break;
        default:
            // other operators are treated as range queries
            card = (int) (0.3 * card1 * card2);
    }
    return card;
}

Join cost

p=t1 join t2 join ... tn

scancost(t1) + scancost(t2) + joincost(t1 join t2) +
scancost(t3) + joincost((t1 join t2) join t3) +
...

scancost(t1) : the number of pages in t1 x SCALING_FACTOR

joincost(t1 join t2) = scancost(t1) + ntups(t1) x scancost(t2)   // IO cost
                     + ntups(t1) x ntups(t2)                      // CPU cost

public double estimateJoinCost(LogicalJoinNode j, int card1, int card2,
double cost1, double cost2)
cost1 + card1 * cost2 + card1 * card2;

Join order

public class CostCard {
    /** The cost of the optimal subplan */
    public double cost;
    /** The cardinality of the optimal subplan */
    public int card;
    /** The optimal subplan */
    public List<LogicalJoinNode> plan;
}

public List<LogicalJoinNode> orderJoins(
        Map<String, TableStats> stats,
        Map<String, Double> filterSelectivities, boolean explain)
        throws ParsingException {

    PlanCache planCache = new PlanCache();
    CostCard bestCostCard = new CostCard();
    int size = joins.size();
    for (int i = 1; i <= size; i++) {
        // enumerate all subsets of the given size
        Set<Set<LogicalJoinNode>> subsets = enumerateSubsets(joins, i);
        for (Set<LogicalJoinNode> subset : subsets) {
            double bestCostSoFar = Double.MAX_VALUE;
            for (LogicalJoinNode joinNode : subset) {
                CostCard costCard =
                        computeCostAndCardOfSubplan(stats, filterSelectivities, joinNode, subset, bestCostSoFar, planCache);
                if (costCard == null) {
                    continue;
                }
                bestCostSoFar = costCard.cost;
                bestCostCard = costCard;
            }
            if (bestCostSoFar != Double.MAX_VALUE) {
                planCache.addPlan(subset, bestCostCard.cost, bestCostCard.card, bestCostCard.plan);
            }
        }
    }
    if (explain) {
        printJoins(bestCostCard.plan, planCache, stats, filterSelectivities);
    }
    return bestCostCard.plan;
}

"SELECT * FROM emp,dept,hobbies,hobby WHERE emp.c1 = dept.c0 AND hobbies.c0 = emp.c2 AND hobbies.c1 = hobby.c0 AND emp.c3 < 1000;"


After the filter, emp has about 1500 tuples: the random values go up to 65535, so 1500 ≈ (1000 / 65535) * 100,000.

lab4

A transaction is a group of database actions (e.g., inserts, deletes, and reads) that are executed atomically;

  • Atomicity: Strict two-phase locking and careful buffer management ensure atomicity.
  • Consistency: The database is transaction consistent by virtue of atomicity. Other consistency issues (e.g., key constraints) are not addressed in SimpleDB.
  • Isolation: Strict two-phase locking provides isolation.
  • Durability: A FORCE buffer management policy ensures durability (see Section 2.3 below).

Core idea: NO STEAL / FORCE

To simplify your job, we recommend that you implement a NO STEAL/FORCE buffer management policy.

  • You shouldn’t evict dirty (updated) pages from the buffer pool if they are locked by an uncommitted transaction (this is NO STEAL).
    • Since pages are only flushed at the very end, no undo is needed; on failure, just reload the page from disk.
  • On transaction commit, you should force dirty pages to disk (e.g., write the pages out) (this is FORCE).
    • Assuming transactionComplete itself never fails, no redo log is needed.

lock acquire

acquire and release locks in BufferPool

  • Modify getPage() to block and acquire the desired lock before returning a page. This is the core method: it blocks until the lock is granted.
  • Implement unsafeReleasePage(tid, pid). This method is primarily used for testing, and at the end of transactions. It releases tid's lock on pid.
  • Implement holdsLock(tid, pid) so that logic in Exercise 2 can determine whether a page is already locked by a transaction, i.e. whether tid holds a lock on pid.

create data structures that keep track of which locks each transaction holds and check to see if a lock should be granted to a transaction when it is requested: a hashmap from pageid to the locks held on that page (a sketch follows the list below)

  • insert, delete, and scan all go through the pool's getPage, which must be called with the correct permission
  • pages that get modified must be marked dirty
  • when an insert creates a new page, make sure the new page can still be locked correctly
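
A simplified sketch of such a lock table (the notes' actual implementation uses LockInfo/LockType as shown later; here Permissions is reused as the lock type and all names are illustrative):

// Page-level lock table: pageid -> (tid -> lock type held on that page).
public class LockManager {
    private final Map<PageId, Map<TransactionId, Permissions>> locks = new ConcurrentHashMap<>();

    // returns true if the lock was granted, false if the caller must retry/block
    public synchronized boolean acquire(PageId pid, TransactionId tid, Permissions perm) {
        Map<TransactionId, Permissions> holders = locks.computeIfAbsent(pid, k -> new HashMap<>());

        // re-entrant: already hold a sufficient lock, or upgrade when we are the only holder
        Permissions held = holders.get(tid);
        if (held != null) {
            if (held == Permissions.READ_WRITE || perm == Permissions.READ_ONLY) return true;
            if (holders.size() == 1) { holders.put(tid, Permissions.READ_WRITE); return true; }  // upgrade
            return false;
        }

        // shared lock: ok as long as nobody holds an exclusive lock on this page
        if (perm == Permissions.READ_ONLY) {
            if (!holders.containsValue(Permissions.READ_WRITE)) { holders.put(tid, perm); return true; }
            return false;
        }

        // exclusive lock: only if nobody else holds anything
        if (holders.isEmpty()) { holders.put(tid, perm); return true; }
        return false;
    }

    public synchronized void release(PageId pid, TransactionId tid) {
        Map<TransactionId, Permissions> holders = locks.get(pid);
        if (holders != null) holders.remove(tid);
    }

    public synchronized boolean holdsLock(TransactionId tid, PageId pid) {
        Map<TransactionId, Permissions> holders = locks.get(pid);
        return holders != null && holders.containsKey(tid);
    }
}

getPage would then loop on acquire, combining it with one of the deadlock-handling strategies shown further down (timeout, cycle check, or wait-die).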

release

strict two-phase locking:This means that transactions should acquire the appropriate type of lock on any object before accessing that object and shouldn’t release any locks until after the transaction commits.

release a shared lock on a page after scanning it to find empty slots

NO STEAL

Only evict non-dirty pages; if every cached page is dirty, throw an exception. Tested by the testAllDirtyFails() case of AbortEvictionTest. A sketch follows below.
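
A minimal NO STEAL variant of the earlier evictPage, ignoring the FIFO/LRU ordering for brevity:

// Evict only a clean page; if every cached page is dirty, give up (NO STEAL).
private synchronized void evictPage() throws DbException {
    for (Map.Entry<PageId, Page> entry : pageCache.entrySet()) {
        if (entry.getValue().isDirty() == null) {      // clean: safe to drop without flushing
            pageCache.remove(entry.getKey());
            return;
        }
    }
    throw new DbException("all pages in the buffer pool are dirty (NO STEAL)");
}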

transactionComplete

commits or aborts

A TransactionAbortedException may be thrown along the way (lock request timeout or deadlock), e.g. inside readPage.

  • commit: write the pages back to disk, then release the locks
  • abort: reload the pages from disk, then release the locks
public void transactionComplete(TransactionId tid, boolean commit) {
    if (commit) {
        flushPages(tid);
    } else {
        recoverPages(tid);
    }
    lockManager.completeTransaction(tid);
}

AbortEvictionTest: after an abort, check whether the inserted row can still be found.

Deadlocks and Aborts

Detect deadlock and throw a TransactionAbortedException; the caller catches it and calls transactionComplete.

  1. a simple timeout policy (or too many retries)
  2. check for cycles in a dependency graph
  3. global ordering: give every transaction a global sequence number. WAIT-DIE (if the current transaction has higher priority it waits, otherwise it rolls back; this guarantees the lowest-priority transaction in a cycle dies) or WOUND-WAIT
    • alternatively, order the resources themselves (the dining-philosophers trick); here the resources are page ids, which is hard to apply because the page access order depends on the workload
    • may abort transactions that would otherwise have succeeded

DeadlockTest.java

t1 reads p1 and t2 reads p2; then t1 writes p2 (waiting on t2) while t2 writes p1 (waiting on t1): deadlock.

TransactionTest.java

n threads each first take read locks, then perform one delete and one insert.

Deadlock waits show up during the lock upgrade.

timeout
long start = System.currentTimeMillis();
long timeout = new Random().nextInt(2000) + 1000;

while (true) {
    if (lockManager.acquire(pid, tid, perm)) {
        break;
    }
    long now = System.currentTimeMillis();
    if (now - start > timeout) {
        throw new TransactionAbortedException();
    }
    Thread.sleep(100); // sleeping reduces contention; without it the tests do not pass, but even with it this is slow
}
cycle check
while (!lockManager.acquire(pid, tid, perm)) {
    Thread.sleep(new Random().nextInt(10)); // much faster with the short sleep than without it
}


// acquire:
// when the lock cannot be granted, check for a cycle in the wait-for graph
if (next.getValue().getLockType() == LockType.EXCLUSIVE || perm == Permissions.READ_WRITE) {
    TidNode tidNow = tidMap.getOrDefault(tid, new TidNode(tid));
    tidNow.next = tidMap.get(next.getValue().getTid());
    checkCycle(new HashSet<TidNode>(), tidNow);

    return false;
}

private void checkCycle(HashSet<TidNode> visited, TidNode tidNode) throws TransactionAbortedException {
    if (visited.contains(tidNode)) {
        throw new TransactionAbortedException();
    }
    visited.add(tidNode);
    if (tidNode.next != null) {
        checkCycle(visited, tidNode.next);
    }
}
wait-die

This guarantees that, within a cycle, the lowest-priority transaction always dies.

All conflicting lock holders must be checked (otherwise the scheme breaks); if any of them has higher priority, the current transaction dies.

boolean unlock = true;

while (iterator.hasNext()) {
    Map.Entry<TransactionId, LockInfo> next = iterator.next();
    if (next.getKey().equals(tid)) {
        continue;
    }

    if (next.getValue().getLockType() == LockType.EXCLUSIVE || perm == Permissions.READ_WRITE) {

        if (tid.getId() > next.getValue().getTid().getId()) {
            throw new TransactionAbortedException();
        }

        unlock = false;
    }
}

if (!unlock) {
    return false;
}

Going further: after a failed acquisition, could the thread wait() instead of spinning? It is hard to pick an object to wait on; an attempt using a map<PageId, PageId> did not work.

lab5

lab6

  • Previously, dirty pages in the pool could not be written back to disk (NO STEAL); undo information now lifts that restriction.
  • What if the system crashes after commit? Write the log first, and use the log to recover.

You will implement rollback and recovery using the contents of the log file.

  • undo: record the modifications, used for abort
  • redo: re-apply the work of committed transactions

STEAL and NO-FORCE: more flexible buffer management; dirty pages may be written back before commit.

On abort, first restore the on-disk data from the log, then discard that page from the pool, and finally recover the remaining dirty pages in the pool.

Basic setup

Because a page may be written back before commit (the buffer runs out of space; the tests simulate this with flushAllPages), the page's contents before and after the change must be logged before the write-back. After commit, the stored before-image must be refreshed.

  1. Insert the following lines into BufferPool.flushPage() before your call to writePage(p), where p is a reference to the page being written. Before the page goes to disk, the log is written first, containing the original contents (before-image) and the current contents:

    // append an update record to the log, with
    // a before-image and after-image.
    TransactionId dirtier = p.isDirty();
    if (dirtier != null) {
        Database.getLogFile().logWrite(dirtier, p.getBeforeImage(), p);
        Database.getLogFile().force();
    }
    if (page.isDirty() != null) {
        table.writePage(page);
        page.markDirty(false, null);
    }
  2. After commit and the flush, update the before-image:

    flushPages(tid)
    flushPage(pageId);
    // use current page contents as the before-image
    // for the next transaction that modifies this page.
    page.setBeforeImage();

Debug first to see in which situations the log gets written.

static final int ABORT_RECORD = 1;
static final int COMMIT_RECORD = 2;
static final int UPDATE_RECORD = 3;
static final int BEGIN_RECORD = 4;
static final int CHECKPOINT_RECORD = 5;
static final long NO_CHECKPOINT_ID = -1; // stored at the start of the log file while there is no checkpoint

preAppend(): called before every record is appended; bumps the record count, or resets the log

logXactionBegin(tid): transaction start

logWrite(TransactionId tid, Page before, Page after)
    before a page is written to disk, record its before- and after-image

logCommit(tid): commit the transaction

logAbort(tid): abort the transaction
    requires rollback()

shutdown(): shut the system down cleanly and record its state
logCheckpoint(): before shutdown, record tidToFirstLogRecord and write startCpOffset at the head of the file;
    flush all dirty pages; the log only needs the records of still-running tids (minLogRecord), which saves space
logTruncate(): drop the log records of already-committed transactions, keep only the running ones, and adjust where raf points

Log record format: type and tid, then the payload, and finally the record's start offset.

raf.writeInt(ABORT_RECORD);
raf.writeLong(tid.getId());

...
raf.writeLong(currentOffset);
currentOffset = raf.getFilePointer();

RollBack

logFile.rollback(): after a transaction aborts, restore the pages it modified to their previous state (a sketch follows the list below).

  1. Modified but already written to disk: the page is in the pool but no longer dirty (flushPage writes it back and clears the dirty flag).

    • From the log, find the pageIds this transaction modified, read the before-image, and write it back (tidToFirstLogRecord maps the tid to its first log record).
    • Discard the page from the pool (the page is dropped here; note that evict can hit a null pointer afterwards, so evict needs one extra check).
    • The same pageId may have several log records; only the oldest before-image is correct (once the page has been flushed out, the later before-images are wrong).
  2. Modified and still in the pool (with NO STEAL this is the only case): just call recoverPages(tid), which re-reads the dirty pages from disk and overwrites them.
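
A rough sketch of rollback along those lines; the record layout is the one described above (type, tid, before/after images for updates, trailing offset), readPageData is the LogFile helper that deserializes a page, and checkpoint records are ignored for brevity:

// Scan the log from the transaction's first record and push the OLDEST before-image
// of each page it touched back to disk, then drop any cached copy.
// Assumes no CHECKPOINT record appears in the scanned range.
private void rollback(TransactionId tid) throws IOException {
    Long first = tidToFirstLogRecord.get(tid.getId());
    raf.seek(first);
    Set<PageId> restored = new HashSet<>();

    while (raf.getFilePointer() < raf.length()) {
        int type = raf.readInt();
        long recordTid = raf.readLong();
        if (type == UPDATE_RECORD) {
            Page before = readPageData(raf);                  // before-image
            readPageData(raf);                                // after-image, not needed here
            if (recordTid == tid.getId() && !restored.contains(before.getId())) {
                DbFile file = Database.getCatalog().getDatabaseFile(before.getId().getTableId());
                file.writePage(before);                       // restore the on-disk contents
                Database.getBufferPool().discardPage(before.getId());
                restored.add(before.getId());
            }
        }
        raf.readLong();                                       // offset field that closes every record
    }
}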

TestAbort and TestAbortCommitInterleaved sub-tests of the LogTest system test.

Recovery

  1. Roll back the transactions that did not commit.
  2. For transactions that committed but whose pages may not have reached disk yet, write them again to guarantee consistency. (A sketch of recover() follows the notes below.)

Issue: flushAllPages clears the dirty flag via flushPage; after that, flushPages(tid) at commit no longer flushes the page again, so the before-image held in the pool never gets updated. Found through TestCommitAbortCommitCrash. (On a second pass, the simplest fix is to move setBeforeImage to the end of flushPage.)

  • Some open-source solutions refresh every before-image by calling flushPages(tid) at commit; that approach is likely wrong, since it interferes with other transactions still in progress.
  • When does a page leave the pool? When space runs out: in the normal path, evict calls flushPage and then always removes the page, so the next read gets a fresh oldPage (for the still-running transaction that oldPage is wrong, but the log keeps the correct one). So the normal path is fine, but flushAllPages is not; the fix is to also force a removePage after flushAllPages calls flushPage.
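
Under the same assumptions, a rough sketch of recover() implementing the two steps above (redo committed transactions with after-images, undo the rest with before-images); checkpoint handling is again left out:

// Pass over the whole log once, remembering per transaction the images in log order,
// then write after-images for committed tids and before-images (in reverse) for the rest.
public void recover() throws IOException {
    synchronized (this) {
        raf.seek(0);
        raf.readLong();                                        // checkpoint offset stored at the head
        Set<Long> committed = new HashSet<>();
        Map<Long, List<Page>> befores = new HashMap<>();
        Map<Long, List<Page>> afters = new HashMap<>();

        while (raf.getFilePointer() < raf.length()) {
            int type = raf.readInt();
            long tid = raf.readLong();
            if (type == COMMIT_RECORD) committed.add(tid);
            if (type == UPDATE_RECORD) {
                befores.computeIfAbsent(tid, k -> new ArrayList<>()).add(readPageData(raf));
                afters.computeIfAbsent(tid, k -> new ArrayList<>()).add(readPageData(raf));
            }
            raf.readLong();                                    // trailing offset of this record
        }

        for (Long tid : afters.keySet()) {
            List<Page> pages;
            if (committed.contains(tid)) {
                pages = afters.get(tid);                       // redo: last after-image wins
            } else {
                pages = new ArrayList<>(befores.get(tid));     // undo: oldest before-image wins,
                Collections.reverse(pages);                    // so write them in reverse order
            }
            for (Page p : pages) {
                Database.getCatalog().getDatabaseFile(p.getId().getTableId()).writePage(p);
            }
        }
    }
}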

Summary

A simple relational database with transaction support, implemented in Java.

  • Implements the basic operators: scan, join, aggregate, insert and delete, plus histogram-based query optimization
  • Implements a BufferPool that caches Pages, with an LRU-based page eviction policy
  • Implements page-level shared and exclusive locks with lock upgrades, giving a serializable concurrency strategy
  • Implements several deadlock-detection schemes: Timeout, Wait-for Graph, and Global Orderings (wait-die)
  • Uses UNDO information to implement a STEAL / NO FORCE policy for more flexible buffer management
  • Implements a basic WAL (Write-Ahead Logging) scheme for transaction rollback and recovery

References

https://blog.csdn.net/Cscprx/article/details/123418692

wind/SimpleDB (gitee.com); 风在哪's personal blog (wygandwdn.cn)

SimpleDb 实验报告 (CSDN blog)

MIT6.830 SimpleDB Lab1