XV6操作系统代码阅读心得(三):锁
Published:
锁是操作系统中实现进程同步的重要机制。
基本概念
临界区(Critical Section)是指对共享数据进行访问与操作的代码区域。所谓共享数据,就是可能有多个代码执行流并发地执行,并在执行中可能会同时访问的数据。
同步(Synchronization)是指让两个或多个进程/线程能够按照程序员期望的方式来协调执行的顺序。比如,让A进程必须完成某个操作后,B进程才能执行。互斥(Mutual Exclusion)则是指让多个线程不能够同时访问某些数据,必须要一个进程访问完后,另一个进程才能访问。
当多个进程/线程并发地执行并且访问一块数据,并且进程/线程的执行结果依赖于它们的执行顺序,我们就称这种情况为竞争状态(Race Condition)。
Xv6操作系统要求在内核临界区操作时中断必须关闭。如果此时中断开启,那么可能会出现以下死锁情况:A进程在内核态运行并拿下了p锁时,触发中断进入中断处理程序,中断处理程序也在内核态中请求p锁,由于锁在A进程手里,且只有A进程执行时才能释放p锁,因此中断处理程序必须返回,p锁才能被释放。那么此时中断处理程序会永远拿不到锁,陷入无限循环,进入死锁。
Xv6中实现了自旋锁(Spinlock)用于内核临界区访问的同步和互斥。自旋锁最大的特征是当进程拿不到锁时会进入无限循环,直到拿到锁退出循环。Xv6使用100ms一次的时钟中断和Round-Robin调度算法来避免陷入自旋锁的进程一直无限循环下去。显然,自旋锁看上去效率很低,我们很容易想到更加高效的基于等待队列的方法,让等待进程陷入阻塞而不是无限循环。然而,Xv6允许同时运行多个CPU核,多核CPU上的等待队列实现相当复杂,因此使用自旋锁是相对比较简单且能正确执行的实现方案。
Xv6的Spinlock
Xv6中锁的定义如下
// Mutual exclusion lock.
struct spinlock {
uint locked; // Is the lock held?
// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
uint pcs[10]; // The call stack (an array of program counters)
// that locked the lock.
};
核心的变量只有一个locked
,当locked
为1时代表锁已被占用,反之未被占用,初始值为0。
在调用锁之前,必须对锁进行初始化。
void initlock(struct spinlock *lk, char *name) {
lk->name = name;
lk->locked = 0;
lk->cpu = 0;
}
最困难的地方是如何对locked
变量进行原子操作占用锁和释放锁。这两步具体被实现为acquire()
和release()
函数。(注意v7版本和v11版本的实现略有不同,本文使用的是v11版本)
acquire()
函数
// Acquire the lock.
// Loops (spins) until the lock is acquired.
// Holding a lock for a long time may cause
// other CPUs to waste time spinning to acquire it.
void acquire(struct spinlock *lk) {
pushcli(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
// The xchg is atomic.
while(xchg(&lk->locked, 1) != 0)
;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen after the lock is acquired.
__sync_synchronize();
// Record info about lock acquisition for debugging.
lk->cpu = mycpu();
getcallerpcs(&lk, lk->pcs);
}
acquire()
函数首先禁止了中断,并且使用专门的pushcli()
函数,这个函数保证了如果有两个acquire()
禁止了中断,那么也必须调用两次release()
中的popcli()
后中断才会被允许。然后,acquire()
函数采用xchg
指令来实现在设置locked
为1的同时获得其原来的值的操作。这里的C代码中封装了一个xchg()
函数,在xchg()
函数中采用GCC的内联汇编特性,实现如下
static inline uint xchg(volatile uint *addr, uint newval) {
uint result;
// The + in "+m" denotes a read-modify-write operand.
asm volatile("lock; xchgl %0, %1" :
"+m" (*addr), "=a" (result) :
"1" (newval) :
"cc");
return result;
}
其中,volatile
标志用于避免gcc对其进行一些优化;第一个冒号后的"+m" (*addr), "=a" (result)
是这个汇编指令的两个输出值;newval
是这个汇编指令的输入值。假设newval
位于eax
寄存器中,addr
位于rax
寄存器中,那么gcc会翻译得到如下汇编指令
lock; xchgl (%rdx), %eax
由于xchg
函数是inline
的,它会被直接嵌入调用xchg
函数的代码中,使用的寄存器可能会有所不同。
下面我们来分析一下上面的指令的语义。·lock
是一个指令前缀,它保证了这条指令对总线和缓存的独占权,也就是这条指令的执行过程中不会有其他CPU或同CPU内的指令访问缓存和内存。由于现代CPU一般是多发射流水线+乱序执行的,因此一般情况下并不能保证这一点。xchgl
指令是一条古老的x86指令,作用是交换两个寄存器或者内存地址里的4字节值,两个值不能都是内存地址,他不会设置条件码。
那么,仔细思考一下就能发现,以上一条xchg
指令就同时做到了交换locked和1的值,并且在之后通过检查eax
寄存器就能知道locked的值是否为0。并且,以上操作是原子的,这就保证了有且只有一个进程能够拿到locked的0值并且进入临界区。
最后,acquire()
函数使用__sync_synchronize
为了避免编译器对这段代码进行指令顺序调整的话和避免CPU在这块代码采用乱序执行的优化。
release()
函数
// Release the lock.
void release(struct spinlock *lk) {
if(!holding(lk))
panic("release");
lk->pcs[0] = 0;
lk->cpu = 0;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other cores before the lock is released.
// Both the C compiler and the hardware may re-order loads and
// stores; __sync_synchronize() tells them both not to.
__sync_synchronize();
// Release the lock, equivalent to lk->locked = 0.
// This code can't use a C assignment, since it might
// not be atomic. A real OS would use C atomics here.
asm volatile("movl $0, %0" : "+m" (lk->locked) : );
popcli();
}
release
函数为了保证设置locked为0的操作的原子性,同样使用了内联汇编。最后,使用popcli()来允许中断(或者弹出一个cli,但因为其他锁未释放使得中断依然被禁止)。
在Xv6中实现信号量
struct semaphore {
int value;
struct spinlock lock;
struct proc *queue[NPROC];
int end;
int start;
};
void sem_init(struct semaphore *s, int value) {
s->value = value;
initlock(&s->lock, "semaphore_lock");
end = start = 0;
}
void sem_wait(struct semaphore *s) {
acquire(&s->lock);
s->value--;
if (s->value < 0) {
s->queue[s->end] = myproc();
s->end = (s->end + 1) % NPROC;
sleep(myproc(), &s->lock)
}
release(&s->lock);
}
void sem_signal(struct semaphore *s) {
acquire(&s->lock);
s->value++;
if (s->value <= 0) {
wakeup(s->queue[s->start]);
s->queue[s->start] = 0;
s->start = (s->start + 1) % NPROC;
}
release(&s->lock);
}
上面的代码使用Xv6提供的接口实现了信号量,格式和命名与POSIX标准类似。这个信号量的实现采用等待队列的方式。当一个进程因信号量陷入阻塞时,会将自己放进等待队列并睡眠(18-22行)。当一个进程释放信号量时,会从等待队列中取出一个进程继续执行(29-33行)。
Leave a Comment