操作系统 - jyy

有什么不懂的直接问gpt

什么是程序

image-20230512154440672

00 01 10 00 01 10

源代码角度

同时我们也可以用C语言实现

image-20230512155633763

状态:内存中的所有东西,全部栈帧

使用栈模拟递归

image-20230512161116475

二进制角度

  • 状态:内存 + 寄存器
  • 初始状态:
  • 迁移:一条01指令

任何的程序都需要退出,也就是结束。因此需要特别的指令 syscall 把现在的状态交给操作系统

程序 = 普通计算 + syscall

实现与操作系统中别的对象交互

  • 读写文件 如果有权限,操作系统把状态写入程序的M, R
  • 改变进程 杀死程序

如何构造一个printf("hello world")

image-20230512171448873

操作系统

  • 收编了所有对象(包含程序状态机和文件),实现霸主地位
  • 管理多个状态机,根据权限访问。打开文件,屏幕显示

在程序眼里,操作系统就是syscall,程序 = (普通计算 + syscall) 如何只展示syscall

  • strace a.out 去掉计算,只展示用到的所有的系统调用

c语言的第一条程序是什么,谁定义的,能不能修改

计算机没有玄学,一切都建立在确定的机制上。bug只要能复现,就能解决

Python操作系统

思路

  • 应用程序 = 纯粹计算(Python 代码) + syscall; 状态机
  • 操作系统 = Python syscall实现,有 “假想” 的 I/O 设备; 管理状态机

​ 操作系统为方框,程序为圆圈,操作系统管理全部程序,并且会提供红色的syscall指令。蓝色为当前运行程序
spawn创建程序后,操作系统有了选择,到底执行哪一个程序呢?

image-20230527124547932

四个 “系统调用” API

  • choose(xs): 返回 xs 中的一个随机选项,纯粹的计算不能实现随机
  • write(s): 输出字符串 s
  • spawn(fn): 创建一个可运行的状态机 fn
  • sched(): 随机切换到任意状态机执行(这里是主动切换,实际也存在被OS强制切换)
demo-code

我进行状态机切换,肯定需要保存状态(变量值是多少、pc在哪) : yield (实际OS由一段汇编代码将当前状态机 (执行流) 的寄存器保存到内存中)
每一个进程在操作系统里被视为一个生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python3

import sys
import random
from pathlib import Path

class OperatingSystem():
"""A minimal executable operating system model."""

SYSCALLS = ['choose', 'write', 'spawn', 'sched']

class Thread:
"""A "freezed" thread state."""

def __init__(self, func, *args):
self._func = func(*args)
self.retval = None

def step(self):
"""Proceed with the thread until its next trap."""
syscall, args, *_ = self._func.send(self.retval)
self.retval = None
return syscall, args

def __init__(self, src):
variables = {}
exec(src, variables)
self._main = variables['main']

def run(self):
threads = [OperatingSystem.Thread(self._main)]
while threads: # Any thread lives
try:
match (t := threads[0]).step():
case 'choose', xs: # Return a random choice
t.retval = random.choice(xs)
case 'write', xs: # Write to debug console
print(xs, end='')
case 'spawn', (fn, args): # Spawn a new thread
threads += [OperatingSystem.Thread(fn, *args)]
case 'sched', _: # Non-deterministic schedule
random.shuffle(threads)
except StopIteration: # A thread terminates
threads.remove(t)
random.shuffle(threads) # sys_sched()

if __name__ == '__main__':
if len(sys.argv) < 2:
print(f'Usage: {sys.argv[0]} file')
exit(1)

src = Path(sys.argv[1]).read_text()
for syscall in OperatingSystem.SYSCALLS:
src = src.replace(f'sys_{syscall}', # sys_write(...)
f'yield "{syscall}", ') # -> yield 'write', (...)

OperatingSystem(src).run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
count = 0

def Tprint(name):
global count
for i in range(3):
count += 1
sys_write(f'#{count:02} Hello from {name}{i+1}\n')
sys_sched()

def main():
n = sys_choose([3, 4, 5])
sys_write(f'#Thread = {n}\n')
for name in 'ABCDE'[:n]:
sys_spawn(Tprint, name)
sys_sched()
more

进程 + 线程 + 终端 + 存储 (崩溃一致性)

系统调用/Linux 对应 行为
sys_spawn(fn)/pthread_create 创建从 fn 开始执行的线程
sys_fork()/fork 创建当前状态机的完整复制
sys_sched()/定时被动调用 切换到随机的线程/进程执行
sys_choose(xs)/rand 返回一个 xs 中的随机的选择
sys_write(s)/printf 向调试终端输出字符串 s
sys_bread(k)/read 读取虚拟设磁盘块 �k 的数据
sys_bwrite(k, v)/write 向虚拟磁盘块 �k 写入数据 �v
sys_sync()/sync 将所有向虚拟磁盘的数据写入落盘
sys_crash()/长按电源按键 模拟系统崩溃

mosaic.py:500行操作系统。还实现了打印每一步的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
T = 3
n = 3
def Tsum( ):
for _ in range(n):
tmp = heap.x
tmp += 1
sys_sched()
heap.x = tmp
sys_sched() # 没有这一步, 最小为 n=3 添加后为2
heap.done += 1
def main():
heap.x = 0
heap.done = 0
for _ in range(T):
sys_spawn(Tsum)
while heap.done != T:
sys_sched()
sys_write(f'SUM = {heap.x}')

不确定性发生在sys_sched()--check 遍历所有答案

1
2
3
4
5
6
7
8
9
10
void Tsum() {
for (int i = 0; i < n; i++) {
int tmp = sum;
tmp++;
// 假设此时可能发生进程/线程切换
sum = tmp;
// 假设此时可能发生进程/线程切换 没有这个最小值为n
}
}
T个程序执行n次,[2~n*T]

并发

多处理器编程

1.放弃原子性

1
2
3
4
5
6
7
8
9
unsigned int balance = 100;
int Alipay_withdraw(int amt) {
if (balance >= amt) {
balance -= amt;
return SUCCESS;
} else {
return FAIL;
}
}
1
for (int i = 0; i < N; i++) sum++;

printf("a") 为什么不会报错?带了锁

互斥和原子性是本学期的重要主题

2.顺序丧失

1
2
3
4
5
6
7
8
9
10
-O1: R[eax] = sum; R[eax] += N; sum = R[eax]
O1保证最终一致,如果要写入多次,直接一次性写入
最终读出100000000

-O2: sum += N; 200000000

另一个例子: 系统默认done不会改变了
while (!done);
// would be optimized to
if (!done) while (1);

3.丧失可见性

理论上输出 01 10 11, 但其实00也有输出

处理器也是一个编译器,一条指令拆分多个uops

如果想写入x时未命中,print就可以先执行

1
2
3
4
5
6
7
8
9
10
11
12
13
int x = 0, y = 0;

void T1() {
x = 1;
asm volatile("" : : "memory"); // compiler barrier
printf("y = %d\n", y);
}

void T2() {
y = 1;
asm volatile("" : : "memory"); // compiler barrier
printf("x = %d\n", x);
}

理解并发程序执行

共享内存实现并发时,一个反例 get set不是原子操作

1
2
3
4
5
6
7
8
9
10
11
12
13
int locked = UNLOCK;

void critical_section() {
retry:
if (locked != UNLOCK) {
goto retry;
}
locked = LOCK;

// critical section

locked = UNLOCK;
}

Peterson

棋子代表:我想上厕所;门上贴的人代表着:谁能上厕所

image-20230514163743735
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int x = 0, y = 0, turn = A;
void TA() {
while (1) {
/* PC=1 */ x = 1;
/* PC=2 */ turn = B;
/* PC=3 */ while (y && turn == B) ;
critical_section();
/* PC=4 */ x = 0; } }
void TB() {
while (1) {
/* PC=1 */ y = 1;
/* PC=2 */ turn = A;
/* PC=3 */ while (x && turn == A) ;
critical_section();
/* PC=4 */ y = 0; } }

证明正确性:直接画出状态机表达出全部状态。

Model Checker

并发程序 = 状态机,画出状态机就可以知道并发程序有没有错

1
2
3
4
5
6
7
8
9
10
11
12
13
class Mutex:
locked = ''

def T1(self):
yield checkpoint()
while True:
yield checkpoint()
while self.locked == '🔒':
yield checkpoint()
pass
yield checkpoint()
self.locked = '🔒'
...

使用程序去遍历出全部的状态 Model Checker

没有工具(编程、测试、调试),不做系统

互斥

1
2
3
4
5
typedef struct {
...
} lock_t;
void lock(lock_t *lk);
void unlock(lock_t *lk);

还是原来难点:get set 非原子

自旋锁 spin lock

硬件能为我们提供一条 “瞬间完成” 的读 + 写指令

xchg dest, src 原子的实现交换数据,并返回原来的值。 这样就可以实现两个线程之间的锁

1
2
3
4
5
6
7
8
9
10
int xchg(volatile int *addr, int newval) {
int result;
asm volatile ("lock xchg %0, %1"
: "+m"(*addr), "=a"(result) : "1"(newval));
return result;
}

int locked = 0;
void lock() { while (xchg(&locked, 1)) ; }
void unlock() { xchg(&locked, 0); }
  • 处理器保证,带lock的指令可以锁定总线,xchg默认带lock
  • 两个cpu共享内存时,带lock指令会锁住memory ,硬件实现,一个bit实现bus lock
  • cpu有缓存L1,如何保证缓存一致。当一个cpu锁定memory时,需要把别的cpu的缓存都剔除

Load-Reserved/Store-Conditional,硬件里会实现

Compare-and-Swap:乐观锁

1
2
3
4
5
6
int cas(int *addr, int cmp_val, int new_val) {   
int old_val = *addr;
if (old_val == cmp_val) {
*addr = new_val; return 0;
} else { return 1; }
}

缺陷:未获得锁的线程在空转,甚至获取锁的线程被OS切换了,所以需要不拥堵时使用

操作系统内部自己使用:操作系统内核的并发数据结构 (短临界区);关中断

互斥锁 Mutex Lock

与其干等,不如把cpu让给别的线程执行,阻塞

把锁的实现放到操作系统里就好!

  • syscall(SYSCALL_lock, &lk); // 试图获得 `lk`,但如果失败,就切换到其他线程
    
    1
    2
    3

    - ```c
    syscall(SYSCALL_unlock, &lk); // 释放 `lk`,如果有等待锁的线程就唤醒

未得到锁会进入等待队列,释放锁时OS会取出等待队列中一个线程,OS使用自旋锁确保自己处理过程是原子的

上锁失败会睡眠,不占用CPU,但不管有没有竞争都需要进出内核系统调用,带来一定的开销

Futex = Spin + Mutex

Fast Userspace muTexes,无竞争情况下,能够避免系统调用的开销

1.自旋锁 (线程直接共享 locked)

  • 更快的 fast path
    • xchg 成功 → 立即进入临界区,开销很小
  • 更慢的 slow path
    • xchg 失败 → 浪费 CPU 自旋等待

2.睡眠锁 (通过系统调用访问 locked)

  • 更快的 slow path
    • 上锁失败线程不再占用 CPU
  • 更慢的 fast path
    • 即便上锁成功也需要进出内核 (syscall)

3.融合:先原子指令上锁,失败后系统调用睡眠

线程库的锁就是这样的锁,但还有很多的优化以减少系统调用

code:Kernel为操作系统需要做的,这里使用while模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Futex:
locked, waits = '', ''
# Model Checker会按行执行,所以这会被视为一个原子操作
def tryacquire(self):
if not self.locked:
# Test-and-set (cmpxchg)
# Same effect, but more efficient than xchg
self.locked = '🔒'
return ''
else:
return '🔒'

def release(self):
if self.waits:
self.waits = self.waits[1:]
else:
self.locked = ''

@thread
def t1(self):
while True:
if self.tryacquire() == '🔒': # User
self.waits = self.waits + '1' # Kernel
while '1' in self.waits: # Kernel 实际上这里已经被os剥夺了
pass
cs = True # User
del cs # User
self.release() # Kernel

@thread
def t2(self):
while True:
if self.tryacquire() == '🔒':
self.waits = self.waits + '2'
while '2' in self.waits:
pass
cs = True
del cs
self.release()

Fast/slow paths:性能优化的途径

同步

有了基本的互斥锁,现在需要通过他实现一些线程间的同步机制。有锁时默认使用Futex

线程同步:共同达到互相已知的状态

生产者消费者(解决并发的万能钥匙):等价于打印左右括号,左括号往队列加资源,右括号消费资源

自旋锁、互斥锁、条件变量、信号量、管道(通信)

互斥锁

count计数左括号,是共享资源需要互斥,使用spin lock实现互斥访问count,并且如果不符合条件就反复询问
使用管道输入到python程序进行检查

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int n, count = 0;
mutex_t lk = MUTEX_INIT();

void Tproduce() {
while (1) {
retry:
mutex_lock(&lk);
if (count == n) {
mutex_unlock(&lk);
goto retry;
}
count++;
printf("(");
mutex_unlock(&lk);
}
}

void Tconsume() {
while (1) {
retry:
mutex_lock(&lk);
if (count == 0) {
mutex_unlock(&lk);
goto retry;
}
count--;
printf(")");
mutex_unlock(&lk);
}
}

万能的条件变量

我希望不要循环,在不满足时进行sleep

条件变量 API:在不符合某条件时,等待别人通过cv唤醒我 cv是程序间的暗号 需要访问共享条件当然要加锁

  • wait(cv, mutex) 💤 release(mutex)、sleep
    • 调用时必须保证已经获得 mutex
    • 醒来时需要获取mutex
  • signal/notify(cv) 💬 私信:走起
    • 如果有线程正在等待 cv,则唤醒其中一个线程
  • broadcast/notifyAll(cv) 📣 所有人:走起
    • 唤醒全部正在等待 cv 的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void Tproduce() {
mutex_lock(&lk);
if (count == n) cond_wait(&cv, &lk); // 睡眠前释放锁;在唤醒后,会重新获取锁
printf("("); count++; cond_signal(&cv);
mutex_unlock(&lk);
}

void Tconsume() {
mutex_lock(&lk);
if (count == 0) cond_wait(&cv, &lk);
printf(")"); count--; cond_signal(&cv);
mutex_unlock(&lk);
}

代码有问题,因为消费者cond_signal(&cv)时会唤醒消费者,因此需要两个条件变量分别用来唤醒P、C
或者我直接boradcast唤醒全部,但if检测改成while循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class ProducerConsumer:
locked, count, log, waits = '', 0, '', ''

def tryacquire(self):
self.locked, seen = '🔒', self.locked
return seen == ''

def release(self):
self.locked = ''

@thread
def tp(self):
for _ in range(2):
while not self.tryacquire(): pass # mutex_lock()

if self.count == 1:
# cond_wait
_, self.waits = self.release(), self.waits + '1'
while '1' in self.waits: pass
while not self.tryacquire(): pass

self.log, self.count = self.log + '(', self.count + 1
self.waits = self.waits[1:] # cond_signal
self.release() # mutex_unlock()

@thread
def tc1(self):
while not self.tryacquire(): pass

if self.count == 0:
_, self.waits = self.release(), self.waits + '2'
while '2' in self.waits: pass
while not self.tryacquire(): pass

self.log, self.count = self.log + ')', self.count - 1

self.waits = self.waits[1:]
self.release()

@thread
def tc2(self):
while not self.tryacquire(): pass

if self.count == 0:
_, self.waits = self.release(), self.waits + '3'
while '3' in self.waits: pass
while not self.tryacquire(): pass

self.log, self.count = self.log + ')', self.count - 1

self.waits = self.waits[1:]
self.release()

两个条件变量实现

1
2
3
4
5
6
7
8
9
10
11
12
void Tproduce() {
mutex_lock(&lk);
if (count == n) cond_wait(&c, &lk);
printf("("); count++; cond_signal(&p);
mutex_unlock(&lk);
}
void Tconsume() {
mutex_lock(&lk);
if (count == 0) cond_wait(&p, &lk);
printf(")"); count--; cond_signal(&c);
mutex_unlock(&lk);
}
while循环+broadcast

通用模板

1
2
3
4
5
6
7
8
9
10
11
12
13
mutex_lock(&mutex);
while (!cond) { cond可以多个条件
wait(&cv, &mutex);
}
assert(cond); // 互斥锁保证了在此期间条件 cond 总是成立

其他线程条件可能被满足时,就算不满足进入while还是会睡眠
broadcast(&cv);

mutex_unlock(&mutex);


/// ...计算任务,可能需要更长时间 也就是T(job) >> T(同步互斥)

作业:打印指定的形状 <><_><>_ http://jyywiki.cn/pages/OS/2022/demos/fish.c

处理器可以乱序执行,先执行第二条,但13需要顺序执行,硬件实现顺序执行

1
2
3
x = 0   write
t = y # 可以先执行
z = x read

信号量PV

在执行前需要某些东西,没有就睡眠等到有。happens before优雅但不全

最好解决单一资源问题,但上面的打印🐟难以实现

token为资源数量,当token=1、0时就代表互斥锁Mutex,没得到就睡眠(但相当于有多把钥匙)

P: token– if token < 0,线程加入等待队列

V:token++ if token<=0, 唤醒等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Semaphore:
token, waits = 1, ''

def P(self, tid):
if self.token > 0:
self.token -= 1
return True
else:
self.waits = self.waits + tid
return False

def V(self):
if self.waits:
self.waits = self.waits[1:]
else:
self.token += 1
1
2
3
4
5
6
7
8
9
10
void producer() {
P(&empty); // P()返回 -> 得到手环
printf("("); // 假设线程安全
V(&fill);
}
void consumer() {
P(&fill);
printf(")");
V(&empty);
}
线程同步
1
2
3
4
5
6
7
8
9
A -> B      如果使用条件变量,可能出现A已经执行,但B还没进入
s = 0
A: V(s)
B: P(S)

join:
s = 0
A、B、C: V(s)
main: P(S) * |T|
计算图

如果想要并行,就需要画出计算图,并让程序按计算图执行:PV很方便

每条边PV各一次

image-20230526171022426
打印🐟
1
2
3
当前线程想打符号'<' 那就P('<')

当前线程结束后,根据规则决定谁可以执行 V('>') 多个就随机
信号量实现条件变量
1
2
3
4
5
6
失败:我不能带者锁睡眠,必须要先释放锁,但释放之后不能保证原子性了
wait(mutex){
release(mutex)
// 可能被broadcast
sleep
}

管道

之前实现通信需要共享内存并且锁定,会引发竞争和死锁;因此我们反过来,通过通信来实现共享内存

Do not communicate by sharing memory; instead, share memory by communicating. ——*Effective Go*

管道:不但能同步,还能通信

cat a.txt | wc -l Linux管道就是一种同步机制。 后一个会一边接收一边处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import "fmt"

var stream = make(chan int, 10)
const n = 4

func produce() {
for i := 0; ; i++ {
fmt.Println("produce", i)
stream <- i
}
}

func consume() {
for {
x := <-stream
fmt.Println("consume", x)
}
}

func main() {
for i := 0; i < n; i++ {
go produce()
}
consume()
}

哲学家吃饭

条件变量

直接上吃饭的条件,并用互斥锁保护起来

1
2
3
4
5
6
7
8
9
10
11
12
13
mutex_lock(&mutex);
while (!(avail[lhs] && avail[rhs])) {
wait(&cv, &mutex);
}
avail[lhs] = avail[rhs] = false;
mutex_unlock(&mutex);

// ...

mutex_lock(&mutex);
avail[lhs] = avail[rhs] = true;
broadcast(&cv);
mutex_unlock(&mutex);
信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 失败的尝试,这里会死锁
void Tphilosopher(int id) {
int lhs = (N + id - 1) % N;
int rhs = id % N;
while (1) {
P(&locks[lhs]);
printf("T%d Got %d\n", id, lhs + 1);
P(&locks[rhs]);
printf("T%d Got %d\n", id, rhs + 1);

// ...

V(&locks[lhs]);
V(&locks[rhs]);
}
}

// 1.直接锁起来
mutex_lock(&mutex);
P(&locks[lhs]);
P(&locks[rhs]);
mutex_unlock(&mutex);

// 2. 只允许四个人上座 numperson=4 需求变了怎么办?
P(&numperson);
P(&locks[lhs]);
P(&locks[rhs]);


V(&numperson);

需求变了:如果一个人要左边两把右边一把,如何设计? 还是条件变量方便,直接改cond就行

分布与集中

集中控制而不是各自协调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Tphilosopher(int id) {
send_request(id, EAT);
P(allowed[id]); // waiter 会把叉子递给哲学家
philosopher_eat();
send_request(id, DONE);
}

void Twaiter() {
while (1) {
(id, status) = receive_request();
if (status == EAT) { ... }
if (status == DONE) { ... }
}
}
  • 集中的人压力太大?再分级

信号量可以被操作系统高效实现,避免了broadcast开销

真实世界的并发编程

背景回顾:我们已经掌握了多种并发控制技术:自旋锁、互斥锁、条件变量、信号量。我们已经可以实现共享内存系统上的任意并发/并行计算。然而,大家也在使用这些 “底层” 并发控制时发现使用的困难。那么,真实世界的程序员是怎么实现并发程序的?

  • 高性能计算 (注重任务分解)中的并行编程 (embarrassingly parallel 的数值计算)
  • 数据中心(注重系统调用): (协程、Goroutine 和 channel)
  • 人工智能时代的分布式机器学习 (GPU 和 Parameter Server)
  • 用户身边的并发编程 (Web 和异步编程) (注重易用性)

高性能计算

massive computation 源自数值密集型科学计算任务。通常有固定的计算图

  • 物理系统模拟
    • 天气预报、航天、制造、能源、制药、……
    • 大到宇宙小到量子,有模型就能模拟
  • 矿厂 (现在不那么热了)
    • 纯粹的 hash 计算
  • HPC-China 100

embarrassingly parallel :这类问题可以被分解成多个独立的子问题,每个子问题可以在不同的处理器上并行计算,而不需要进行任何进一步的同步或通信。这种问题的并行化非常简单,因为每个子问题都是相互独立的,不需要进行任何协调或同步。

通常出现在科学计算、数据分析、图像处理等领域。例如,在图像处理中,可以将一张大图像分成多个小块,每个小块可以在不同的处理器上并行处理,最后将结果合并成一张完整的图像。在科学计算中,可以将一个大型计算任务分成多个小任务,每个小任务可以在不同的处理器上并行计算,最后将结果合并成一个完整的计算结果。

image-20230529102943340两个线程画图

数据中心

“A network of computing and storage resources that enable the delivery of *shared* applications and data.” (CISCO)

数据 (存储) 为中心

  • 互联网索引与搜索
    • Google
  • 社交网络
    • Facebook/Twitter
  • 支撑各类互联网应用
    • 通信 (微信/QQ 群人数为什么有上限?)、支付 (支付宝)、游戏/网盘/……
关键问题

我希望高可靠、低延时、多副本的分布式 存储 计算系统

image-20230529103310529

举个例子:微信先拉黑,再发朋友圈,如果没有一致性,那么朋友圈可能被拉黑的人看到。亚马逊没一致性可能发两个快递

单机程序

image-20230529103721001

假设有数千/数万个请求同时到达服务器……

  • 线程能够实现并行处理
  • 但远多于处理器数量的线程导致性能问题
    • 切换开销
    • 维护开销
协程(协作的线程)

和线程概念相同 (独立堆栈、共享内存) ,用户态的线程,由程序员主动控制

  • 但 “一直执行”,直到 yield() 可以视为函数调用, 主动放弃处理器
    • 有编译器辅助,切换开销低
      • yield() 是函数调用,只需保存/恢复 “callee saved” 寄存器(函数调用保存的寄存器) RBP
      • 线程切换需要保存/恢复全部寄存器
    • 但等待 I/O 时,其他协程就不能运行了……
      • 失去了并行 go优化
1
2
3
// 只可能是 1122 或 2211
void T1() { send("1"); send("1"); yield(); }
void T2() { send("2"); send("2"); yield(); }

Goroutine:概念上是线程,实现上是协程:在遇到IO且可能等待时,yield切换

如果是协程,线程sleep后计算机就停止了,但go优化成yield切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"time"
)

func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n) // slow
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}

func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}

func fib(x int) int {
if x < 2 { return x }
return fib(x - 1) + fib(x - 2)
}

go之前,java已经形成了大数据处理系统的生态

人工智能时代的分布式机器学习

特点

image-20230529110833871

并行

数据并行:一部分数据这台机器,一部分那台 model = nn.DataParallel(model)
模型并行:切割计算图

image-20230529111116782

SIMP

Single Instruction, Multiple Threads

CPU:多个cpu,但各自运行各自的,都有pc指针

image-20230529112121783

GPU:一个pc控制多个执行流,独立寄存器标记线程号

image-20230529112321136
SIMD

image-20230529112558826

分布式机器学习

image-20230529112936378

用户身边并发编程

web2.0:HTML(DOM Tree) + CSS + JS

特点:不太复杂

  • 既没有太多计算
    • DOM Tree 也不至于太大 (大了人也看不过来)
    • DOM Tree 怎么画浏览器全帮我们搞定了
  • 也没有太多 I/O
    • 就是一些网络请求
JS

单线程 + 事件模型

  • 一个线程、按序执行 (run-to-complete)。无并行减少了bug 主线程执行栈 + 微任务队列

  • 耗时的 API (Timer, Ajax, …) 调用会立即返回 + Callback

    • 当Promise被创建时,它处于未完成的状态(pending)。当异步操作完成并且Promise成功解析(resolved)时,或者发生错误导致Promise被拒绝(rejected)时,回调函数会被添加到微任务队列中。
  • 坏处$.ajax 嵌套 5 层,可维护性已经接近于零了

Promise

Callback没有很好表示流程图 -> Promise

Chaining

1
2
3
4
5
6
7
loadScript("/article/promise-chaining/one.js")
.then( script => loadScript("/article/promise-chaining/two.js") )
.then( script => loadScript("/article/promise-chaining/three.js") )
.then( script => {
// scripts are loaded, we can use functions declared there
})
.catch(err => { ... } );

Fork-join

1
2
3
4
a = new Promise( (resolve, reject) => { resolve('A') } )
b = new Promise( (resolve, reject) => { resolve('B') } )
c = new Promise( (resolve, reject) => { resolve('C') } )
Promise.all([a, b, c]).then( res => { console.log(res) } )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 封装一个函数,用于加载一张图片
function loadImage(url) {
return new Promise((resolve, reject) => {
const image = new Image();
image.onload = () => {
resolve(image); // 图片加载成功,将 Promise 标记为成功
};
image.onerror = () => {
reject(new Error('Failed to load image')); // 图片加载失败,将 Promise 标记为失败
};
image.src = url;
});
}

// 图片加载任务列表
const imageUrls = [
'image1.jpg',
'image2.jpg',
'image3.jpg'
];

// 使用 Promise.all() 来处理多个图片加载任务
Promise.all(imageUrls.map(url => loadImage(url)))
.then(images => {
// 所有图片加载完成,可以进行展示
images.forEach(image => {
document.body.appendChild(image); // 假设将图片添加到页面中
});
})
.catch(error => {
// 图片加载过程中出现错误
console.error(error);
});

Async-Await

一种计算图的描述语言.

通过使用 async 关键字声明一个函数为异步函数(函数会返回一个Promise对象),我们可以在函数内部使用 await 关键字来等待一个 Promise 对象的解决(resolve)

更现代、更优雅的方式来处理异步代码,与 .then 方法相比更易于理解和编写(避免嵌套)。

1
2
3
4
5
6
7
A = async () => await $.ajax('/hello/a')
B = async () => await $.ajax('/hello/b')
C = async () => await $.ajax('/hello/c')
hello = async () => await Promise.all([A(), B(), C()])
hello()
.then(window.alert)
.catch(res => { console.log('fetch failed!') } )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 异步函数示例
async function fetchData() {
try {
const response = await axios.get('https://api.example.com/data');
const data = response.data;
return data;
} catch (error) {
console.error(error);
throw new Error('Failed to fetch data');
}
}


// 调用异步函数
console.log('Before fetchData()');

fetchData()
.then(data => {
console.log('Data:', data);
})
.catch(error => {
console.error('Error:', error);
});

console.log('After fetchData()');

// res:
// Before fetchData()
// After fetchData()
// Data: [data from API]

并发bug

  • 死锁
  • 数据竞争
  • 原子性和顺序违反
1
2
3
assert mode in [XRay, Electron]
assert mirror in [On, Off]
assert not (mode == XRay and mirror == Off)

死锁

表现明显:程序停止了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
lock(&lk);
// lk = LOCKED;
lock(&lk);
// while (xchg(&lk, LOCKED) == LOCKED) ;
多层函数调用、隐藏的控制流(中断)


void Tphilosopher() {
P(&avail[lhs]);
P(&avail[rhs]);
// ...
V(&avail[lhs]);
V(&avail[rhs]);
}
  • Mutual-exclusion - 一张校园卡只能被一个人拥有
  • Wait-for - 一个人等其他校园卡时,不会释放已有的校园卡
  • No-preemption - 不能抢夺他人的校园卡
  • Circular-chain - 形成校园卡的循环等待关系

处理方法

  • 预防死锁(一次性分配资源; 允许抢占; Locker order规定获取锁的顺序,必须从小到大(获得编号最大的进程能运行))
  • 避免死锁银行家:在一次分配完后检查是否有安全序列能实现全部运行)
  • 监测死锁资源分配图,分配完有没有环)
  • 解除死锁(资源剥夺、回滚进程、终止进程)

数据竞争

多个线程对同一内存进行读写,先写还是先读产生的结果不同。山寨支付宝判断并修改余额

image-20230530154901555

先来先写,多个memory,无法定义谁先来,所以需要避免data race。上锁

不同的线程同时访问同一内存,且至少有一个是写

  • “内存” 可以是地址空间中的任何内存
    • 可以是全部变量
    • 可以是堆区分配的变量
    • 可以是栈
  • “访问” 可以是任何代码
    • 可能发生在你的代码里
    • 可以发生在框架代码里
    • 可能是一行你没有读到过的汇编代码
    • 可能时一条 ret 指令

原子性违反

调查100个BUGs,97% 的非死锁并发 bug 都是原子性(山寨支付宝)或顺序错误

image-20230530161346187

应对并发bug

假设程序员会花式犯错

编译器:只管翻译代码,不管需求含义

怎么才能编写出 “正确” (符合 specification) 的程序?

  • 证明:Annotation verifier (Dafny), Refinement types
  • 推测:Specification mining (Daikon)
  • 构造:Program sketching
  • 编程语言的历史和未来
    • 机器语言 → 汇编语言 → 高级语言 → 自然编程语言

防御性编程

机器永远是对的,代码始终是错的

防御性编程:把可能不对的情况都检查一遍。assert 大型项目很需要

  • Peterson 算法中的临界区计数器

    • assert(nest == 1);
  • 二叉树的旋转

    • assert(p->parent->left == p || p->parent->right == p);
  • AA-Deadlock 的检查

    • if (holding(&lk)) panic();
    • xv6 spinlock 实现示例

自动运行时检查

Lockdep死锁检测

Lockdep :死锁的检查。linux内核

​ 每一个锁有一个唯一的site(线程文件行号),上锁解锁日志记录下该site。通过查看所有日志有没有环

1
2
3
void lock(lock_t *lk) {
printf("LOCK %s\n", lk->site);
}
data race检测

ThreadSanitizer:运行时的数据竞争检查.编译时实现

两个进程同时读写同一内存,有没有happens-before关系,没有就存在data race (T1-5 T2-5)

image-20230531112908484

rule of thumb

  • 不实现 “完整” 的检查
  • 允许存在误报/漏报
  • 但实现简单、非常有用
canary

“牺牲” 一些内存单元,来预警 memory error 的发生。栈空间多分配一些canary空间作为保护,值被修改了就异常了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define MAGIC 0x55555555
#define BOTTOM (STK_SZ / sizeof(u32) - 1)
struct stack { char data[STK_SZ]; };

void canary_init(struct stack *s) {
u32 *ptr = (u32 *)s;
for (int i = 0; i < CANARY_SZ; i++)
ptr[BOTTOM - i] = ptr[i] = MAGIC;
}

void canary_check(struct stack *s) {
u32 *ptr = (u32 *)s;
for (int i = 0; i < CANARY_SZ; i++) {
panic_on(ptr[BOTTOM - i] != MAGIC, "underflow");
panic_on(ptr[i] != MAGIC, "overflow");
}
}

msvc 中 debug mode 的 guard/fence/canary

  • 未初始化栈: 0xcccccccc
  • 未初始化堆: 0xcdcdcdcd
  • 对象头尾: 0xfdfdfdfd
低配lockdep

统计自旋的次数,超过某个值肯定不正常

1
2
3
4
5
6
int count = 0;
while (xchg(&lk, LOCKED) == LOCKED) {
if (count++ > SPIN_LIMIT) {
panic("Spin limit exceeded @ %s:%d\n", __FILE__, __LINE__);
}
}
低配AddressSanitizer

并发分配内存时,分配完标记一个颜色

1
2
3
4
5
6
7
8
9
10
11
// allocation
for (int i = 0; (i + 1) * sizeof(u32) <= size; i++) {
panic_on(((u32 *)ptr)[i] == MAGIC, "double-allocation");
arr[i] = MAGIC;
}

// free
for (int i = 0; (i + 1) * sizeof(u32) <= alloc_size(ptr); i++) {
panic_on(((u32 *)ptr)[i] == 0, "double-free");
arr[i] = 0;
}

多处理器与中断

处理器如何实现多线程?中断

本讲内容:操作系统内核实现。

  • 多处理器和中断
  • AbstractMachine API
  • 50 行实现嵌入式操作系统

多处理器的状态机模型:状态为内存和每一个状态。执行为任选一个cpu

image-20230601105219177

如果死循环某个CPU就会卡死,而为什么我们写的死循环程序不会卡死电脑?

中断

下降沿触发的一根线。IF寄存器(interrupter flag) 决定是否响应中断(操作系统可以修改该值)

image-20230601111009780
  • x86 Family
    • 询问中断控制器获得中断号 n
    • 保存 CS, RIP, RFLAGS, SS, RSP 到堆栈
    • 跳转到 IDT[n] 指定的地址,并设置处理器状态 (例如关闭中断)

关中断实现了 “stop the world” ,尝试asm volatile ("cli")被操作系统检测到,会直接报错

上下文保存

所有线程都有一块内存用来保存自己的现场

1
2
3
4
5
6
7
8
struct Context {
uint64_ t rax, rbx, rcx, rdx,
rbp, rsi, rdi,
r8,r9,r10, r11,
r12,r13, r14, r15,
rip, cs, rflags,
rsp, ss, rsp0;
};

中断处理:保存当前的ctx,并返回运行在该cpu上的另一个线程的上下文

  • tasks[n]是内存中所有线程的状态。next指针把所有状态串联
  • currents[MAX_CPU] 是每一个cpu当前的状态,都指向tasks中的某一个
  • current=currents[cpu_current()]是当前cpu状态指针,current->context = ctx保存现场到内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Task *currents[MAX_CPU];
#define current currents[cpu_current()]

Context *on_interrupt(Event ev, Context *ctx) {
if (!current) {
current = &tasks[0]; // First trap for this CPU
} else {
current->context = ctx; // Keep the stack-saved context
}

// Schedule
do {
current = current->next;
} while (!on_this_cpu(current)); // ((current - tasks) % cpu_count() != cpu_current());

return current->context;
}

50行操作系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <am.h>
#include <klib.h>
#include <klib-macros.h>

#define MAX_CPU 8

typedef union task {
struct {
const char *name;
union task *next;
void (*entry)(void *);
Context *context;
};
uint8_t stack[8192];
} Task; // A "state machine"

Task *currents[MAX_CPU];
#define current currents[cpu_current()]

int locked = 0; // A spin lock
void lock() { while (atomic_xchg(&locked, 1)); }
void unlock() { atomic_xchg(&locked, 0); }

#include "tasks.h"

Context *on_interrupt(Event ev, Context *ctx) {
if (!current) current = &tasks[0]; // First interrupt
else current->context = ctx; // Save pointer to stack-saved context
do {
current = current->next;
} while ((current - tasks) % cpu_count() != cpu_current());
return current->context; // Restore a new context
}

void mp_entry() {
yield(); // Self-trap; never returns
}

int main() {
cte_init(on_interrupt);

for (int i = 0; i < LENGTH(tasks); i++) {
Task *task = &tasks[i];
Area stack = (Area) { &task->context + 1, task + 1 }; // 栈起始结束地址
task->context = kcontext(stack, task->entry, (void *)task->name);
task->next = &tasks[(i + 1) % LENGTH(tasks)];
}
mpe_init(mp_entry);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// User-defined tasks

void func(void *arg) {
while (1) {
lock();
printf("Thread-%s on CPU #%d\n", arg, cpu_current());
unlock();
for (int volatile i = 0; i < 100000; i++) ;
}
}

Task tasks[] = {
{ .name = "A", .entry = func },
{ .name = "B", .entry = func },
{ .name = "C", .entry = func },
{ .name = "D", .entry = func },
{ .name = "E", .entry = func },
};

内核

fork execv exit

  • 应用程序 = 纯粹计算(Python 代码) + syscall; 状态机
  • 操作系统 = syscall实现:重要的三个系统调用
    • fork: 对当前状态机状态进行完整复制
    • execve: 将当前状态机状态重置为某个可执行文件描述的状态机
    • exit: 销毁当前状态机

fork

fork: 完全复制一份当前的状态 (内存、寄存器现场),相当于创建了一个新的进程,返回子进程号。unix唯一方法

因为状态机是复制的,因此总能找到 “父子关系”

  • 因此有了进程树 (pstree)
1
2
3
4
5
6
7
8
9
10
systemd-+-ModemManager---2*[{ModemManager}]
|-NetworkManager---2*[{NetworkManager}]
|-accounts-daemon---2*[{accounts-daemon}]
|-at-spi-bus-laun-+-dbus-daemon
| `-3*[{at-spi-bus-laun}]
|-at-spi2-registr---2*[{at-spi2-registr}]
|-atd
|-avahi-daemon---avahi-daemon
|-colord---2*[{colord}]
...

1个变成了4个。fork出来的子进程也会执行下面的fork(), 除了x不一样其他都一样

1
2
3
pid_t x = fork();
pid_t y = fork();
printf("%d %d\n", x, y);

打印了多少个?

1
2
3
4
5
6
7
for (int i = 0; i < 2; i++) {
fork();
printf("Hello\n");
}

./a.out 6
./a.out | cat 8个 因为如果输入到管道,print的信息会放到缓存中,被同时复制

execv

execv

  • 将当前进程重置成一个可执行文件描述状态机的初始状态
  • 唯一能够执行程序的系统调用,一切程序的起点(fork是父进程调用的)
  • 传入参数环境变量
1
2
3
4
5
int execve(const char *filename,
char * const argv[], char * const envp[]);

ls -al
execve(" /usr/bin/ls", ["ls""-al"], 0x7ffeaabcda88 /* 54 vars */) = 0

环境变量,默认继承父进程

  • 使用env命令查看
    • PATH: 可执行文件搜索路径
    • PWD: 当前路径
    • HOME: home 目录
    • DISPLAY: 图形输出
    • PS1: shell 的提示符
  • export: 告诉 shell 在创建子进程时设置环境变量
    • 小技巧:export ARCH=x86_64-qemuexport ARCH=native

strace gcc gcc会先去找可执行的as汇编器程序,去path里找,所以会输出:

1
2
3
4
[pid 28369] execve("/usr/local/sbin/as", ["as", "--64", ...
[pid 28369] execve("/usr/local/bin/as", ["as", "--64", ...
[pid 28369] execve("/usr/sbin/as", ["as", "--64", ...
[pid 28369] execve("/usr/bin/as", ["as", "--64", ..

exit

1
2
3
void _exit(int status)
销毁当前状态机,并允许有一个返回值
子进程终止会通知父进程 (后续课程解释)

exit 的几种写法 (它们是不同)

  • exit(0) stdlib.h中声明的 libc 函数
    • 会调用 atexit
  • _exit(0) glibc 的 syscall wrapper
    • 执行 “exit_group” 系统调用终止整个进程 (所有线程)
      • 细心的同学已经在 strace 中发现了
    • 不会调用 atexit
  • syscall(SYS_exit, 0)
    • 执行 “exit” 系统调用终止当前线程
    • 不会调用 atexit

shell中运行./a.out:

  1. Shell解析命令行输入,发现./a.out是一个可执行文件。
  2. Shell调用fork()系统调用,创建一个新的子进程。子进程复制父进程的所有资源和代码段。
  3. 子进程调用execve()系统调用,加载并执行a.out可执行文件。
    1. 如果可执行文件需要进行系统调用,例如读取文件或写入文件,它会使用相应的系统调用,如open()read()write()等。
    2. 当可执行文件执行完毕或调用了exit()系统调用来终止进程时,进程会返回到Shell。
  4. shell wail()等待子进程(如果后台运行&就不会等待)

Shell本身也是一个进程/bin/bash,初始状态读取配置文件~/.bashrc。输入:

  • 执行内部命令(例如cdecho
  • 执行系统命令(例如lsgrep)(本质上也是可执行文件)
  • 可执行文件

Linux中的init

  • Linux 操作系统
  • Linux 系统启动和 initramfs
  • Linux 应用世界的构建

just for fun

1
2
Hello, everybody out there using minix – I’m doing a (free) operating system (just a hobby, won’t be big and professional like gnu) for 386(486) AT clones. This has been brewing since April, and is starting to get ready.
—— Linus Torvalds (时年 21 岁)

Minix: 完全用于教学的真实操作系统 1987。Andrew Tanenbaum。 Linux 的起点

image-20230604102002909

Kernel

  • 加载第一个进程
    • 相当于在操作系统中 “放置一个位于初始状态的状态机” **init ** systemd
    • Single user model (高权限)
  • 包含一些进程可操纵的操作系统对象
  • 除此之外 “什么也没有”
    • Linux 变为一个中断 (系统调用) 处理程序

Linux Kernel 系统调用上的发行版和应用生态

  • 系统工具 coreutils, binutils, systemd, …
  • 桌面系统 Gnome, xfce, Android
  • 应用程序 file manager, vscode, …

第一个状态机

1.我们可以控制init程序是谁,比如最简单的helloword。当helloword 退出后,也就是杀死最后一个进程,Kernel panic

1
2
3
4
5
6
7
8
9
10
11
INIT := /init
# INIT := /minimal

run:
# Run QEMU with the installed kernel and generated initramfs
qemu-system-x86_64 \
-serial mon:stdio \
-kernel build/vmlinuz \
-initrd build/initramfs.cpio.gz \
-machine accel=kvm:tcg \
-append "console=ttyS0 quiet rdinit=$(INIT)"

2.改回启动Init程序,其中busybox 是一个程序,但包含很多文件 5000行 busybox vi busybox sh,因此我们的init程序就可以使用命令了,再通过软连接就可以直接执行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/bin/busybox sh

# initrd, only busybox and /init
BB=/bin/busybox

# (1) Print something and exit
$BB echo -e "\033[31mHello, OS World\033[0m"
$BB poweroff -f

# (2) Run a shell on the init console
$BB sh

# (3) Rock'n Roll!
for cmd in $($BB --list); do
$BB ln -s $BB /bin/$cmd
done
# /bin/ls -> /bin/busybox
# /bin/cat -> /bin/busybox
# /bin/grep -> /bin/busybox
mkdir -p /tmp
mkdir -p /proc && mount -t proc none /proc
mkdir -p /sys && mount -t sysfs none /sys
mknod /dev/tty c 4 1
setsid /bin/sh </dev/tty >/dev/tty 2>&1

至此我们通过init启动了一个独立sh,sh中可以执行我们的指令

1
2
3
4
5
6
如果我们放一个vi & 在后台
# pstree
init---sh---pstree
-vi

指令都是指向busybox,busybox通过系统调用的参数(执行的名称)实现不同的命令

其他工作

只是一个内存里的小文件系统

  • 我们 “看到” 的都是被 init 创造出来的
    • 加载剩余必要的驱动程序,例如网卡
    • 根据 fstab 中的信息挂载文件系统,例如网络驱动器
    • 将根文件系统和控制权移交pivot_root给另一个程序,例如 systemd

进程的地址空间

进程的状态由 (M,R) 组成,R为体系结构中决定的,M的具体实现?

地址空间内容

pmap pid 读取/proc/pid/maps strace跟踪可以证明

1
2
3
4
5
6
7
8
9
10
11
12
13
# 一大堆内存,有的rx为代码pc访问  有的r(字符串常量)  有的rw(变量 栈)
12345: executable_program
Address Kbytes RSS Dirty Mode Mapping
0000000000400000 2048 976 976 r-x-- executable_program
0000000000600000 1024 512 512 rw--- executable_program
0000000000700000 4096 2048 2048 rw--- [ anon ]
00007f9a28345000 4096 2048 2048 r-x-- libc.so.6
00007f9a283d4000 2048 0 0 ----- libc.so.6
00007f9a285d4000 16 12 0 r---- libc.so.6
00007f9a285d8000 4 4 4 rw--- libc.so.6
00007f9a285d9000 16 4 4 rw--- [ stack ]

# 猜测对不对的上? 加一些变量代码,然后调试

如果是动态链接呢?有一个加载的过程,在main之前把动态链接库(如libc.so.6)加载到地址空间中

系统调用需要陷入内核,但有些简单的(只读的)可以不用陷入内核执行

  • vvar :Virtual Variable 存储内核和用户空间之间共享的变量。这些变量包括与时间相关的信息、线程特定的数据等。
  • vdso:取这些信息的代码,不进入内核的系统调用

空间的管理

程序空间是变化的:操作系统应该提供一个修改进程地址空间的系统调用

1
2
3
4
5
6
7
8
// 映射  fd文件描述符 offset偏移量,提供时把文件内容加载到内存
void *mmap(void *addr, size_t length, int prot, int flags,
int fd, off_t offset);
int munmap(void *addr, size_t length);
// 为 malloc/free 提供了机制

// 修改映射权限
int mprotect(void *addr, size_t length, int prot);

分配8G(可以超过内存)会直接分配,使用时才加入内存发生缺页中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>

#define GiB * (1024LL * 1024 * 1024)

int main() {
volatile uint8_t *p = mmap(NULL, 8 GiB, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
printf("mmap: %lx\n", (uintptr_t)p);
if ((intptr_t)p == -1) {
perror("cannot map");
exit(1);
}
*(p + 2 GiB) = 1;
*(p + 4 GiB) = 2;
*(p + 7 GiB) = 3;
printf("Read get: %d\n", *(p + 4 GiB));
printf("Read get: %d\n", *(p + 6 GiB));
printf("Read get: %d\n", *(p + 7 GiB));
}

Memory-Mapped File: 一致性:什么时候将修改写入磁盘?多进程如何共享?

入侵地址空间

入侵地址空间就是可以任意操控其他进程

  • 调试器 (gdb)
    • gdb 可以任意观测和修改程序的状态
  • Profiler (perf)
    • 合理的需求,操作系统就必须支持
金手指

物理劫持

image-20230605105022909

金山游侠

入侵地址空间,修改金钱生命属性

任何操作系统肯定提供了gdb

原理为pmap读取内存的内容并修改

代码扫描所有地址,找出金钱为3000的,消耗一些后找出价钱为2700的,这些地址值都修改就可以修改金钱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int main(int argc, char *argv[]) {
long val;
char buf[64];
struct game game;

if (load_game(&game, argv[1]) < 0) {
goto release;
}

while (!feof(stdin)) {
printf("(%s %d) ", game.name, game.pid);
if (scanf("%s", buf) <= 0) goto release;

switch (buf[0]) {
case 'q': goto release; break;
case 'b': scanf("%ld", &val); game.bits = val; reset(&game); break;
case 's': scanf("%ld", &val); scan(&game, val); break;
case 'w': scanf("%ld", &val); overwrite(&game, val); break;
case 'r': reset(&game); break;
}
}

release:
close_game(&game);
return 0;
}
按键精灵

大量重复固定任务。这个简单,就是给进程发送键盘/鼠标事件

  • 做个驱动 (可编程键盘/鼠标)
  • 利用操作系统/窗口管理器提供的 API
    • xdotool (我们用这玩意测试 vscode 的插件)
    • evdev (按键显示脚本;主播常用)
变速齿轮

跳转游戏逻辑更新速度

程序只是状态机,除了syscall无法感知时间。修改syscall返回值就可以欺骗程序。

代码注入
1
2
3
4
5
6
// 修改 API 调用的值
set_alarm(1000 / FPS); // 希望改成 100 / FPS

// 锁定生命值
hp -= damage; // 希望 “消除” 此次修改
if (hp < 0) game_over();