XV6操作系统代码阅读心得(三):锁

2 minute read

Published:

锁是操作系统中实现进程同步的重要机制。

基本概念

临界区(Critical Section)是指对共享数据进行访问与操作的代码区域。所谓共享数据,就是可能有多个代码执行流并发地执行,并在执行中可能会同时访问的数据。

同步(Synchronization)是指让两个或多个进程/线程能够按照程序员期望的方式来协调执行的顺序。比如,让A进程必须完成某个操作后,B进程才能执行。互斥(Mutual Exclusion)则是指让多个线程不能够同时访问某些数据,必须要一个进程访问完后,另一个进程才能访问。

当多个进程/线程并发地执行并且访问一块数据,并且进程/线程的执行结果依赖于它们的执行顺序,我们就称这种情况为竞争状态(Race Condition)。

Xv6操作系统要求在内核临界区操作时中断必须关闭。如果此时中断开启,那么可能会出现以下死锁情况:A进程在内核态运行并拿下了p锁时,触发中断进入中断处理程序,中断处理程序也在内核态中请求p锁,由于锁在A进程手里,且只有A进程执行时才能释放p锁,因此中断处理程序必须返回,p锁才能被释放。那么此时中断处理程序会永远拿不到锁,陷入无限循环,进入死锁。

Xv6中实现了自旋锁(Spinlock)用于内核临界区访问的同步和互斥。自旋锁最大的特征是当进程拿不到锁时会进入无限循环,直到拿到锁退出循环。Xv6使用100ms一次的时钟中断和Round-Robin调度算法来避免陷入自旋锁的进程一直无限循环下去。显然,自旋锁看上去效率很低,我们很容易想到更加高效的基于等待队列的方法,让等待进程陷入阻塞而不是无限循环。然而,Xv6允许同时运行多个CPU核,多核CPU上的等待队列实现相当复杂,因此使用自旋锁是相对比较简单且能正确执行的实现方案。

Xv6的Spinlock

Xv6中锁的定义如下

// Mutual exclusion lock.
struct spinlock {
  uint locked;       // Is the lock held?

  // For debugging:
  char *name;        // Name of lock.
  struct cpu *cpu;   // The cpu holding the lock.
  uint pcs[10];      // The call stack (an array of program counters)
                     // that locked the lock.
};

核心的变量只有一个locked,当locked为1时代表锁已被占用,反之未被占用,初始值为0。

在调用锁之前,必须对锁进行初始化。

void initlock(struct spinlock *lk, char *name) {
  lk->name = name;
  lk->locked = 0;
  lk->cpu = 0;
}

最困难的地方是如何对locked变量进行原子操作占用锁和释放锁。这两步具体被实现为acquire()release()函数。(注意v7版本和v11版本的实现略有不同,本文使用的是v11版本)

acquire()函数

// Acquire the lock.
// Loops (spins) until the lock is acquired.
// Holding a lock for a long time may cause
// other CPUs to waste time spinning to acquire it.
void acquire(struct spinlock *lk) {
  pushcli(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");

  // The xchg is atomic.
  while(xchg(&lk->locked, 1) != 0)
    ;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen after the lock is acquired.
  __sync_synchronize();

  // Record info about lock acquisition for debugging.
  lk->cpu = mycpu();
  getcallerpcs(&lk, lk->pcs);
}

acquire()函数首先禁止了中断,并且使用专门的pushcli()函数,这个函数保证了如果有两个acquire()禁止了中断,那么也必须调用两次release()中的popcli()后中断才会被允许。然后,acquire()函数采用xchg指令来实现在设置locked为1的同时获得其原来的值的操作。这里的C代码中封装了一个xchg()函数,在xchg()函数中采用GCC的内联汇编特性,实现如下

static inline uint xchg(volatile uint *addr, uint newval) {
  uint result;
  // The + in "+m" denotes a read-modify-write operand.
  asm volatile("lock; xchgl %0, %1" :
               "+m" (*addr), "=a" (result) :
               "1" (newval) :
               "cc");
  return result;
}

其中,volatile标志用于避免gcc对其进行一些优化;第一个冒号后的"+m" (*addr), "=a" (result)是这个汇编指令的两个输出值;newval是这个汇编指令的输入值。假设newval位于eax寄存器中,addr位于rax寄存器中,那么gcc会翻译得到如下汇编指令

 lock; xchgl (%rdx), %eax

由于xchg函数是inline的,它会被直接嵌入调用xchg函数的代码中,使用的寄存器可能会有所不同。

下面我们来分析一下上面的指令的语义。·lock是一个指令前缀,它保证了这条指令对总线和缓存的独占权,也就是这条指令的执行过程中不会有其他CPU或同CPU内的指令访问缓存和内存。由于现代CPU一般是多发射流水线+乱序执行的,因此一般情况下并不能保证这一点。xchgl指令是一条古老的x86指令,作用是交换两个寄存器或者内存地址里的4字节值,两个值不能都是内存地址,他不会设置条件码。

那么,仔细思考一下就能发现,以上一条xchg指令就同时做到了交换locked和1的值,并且在之后通过检查eax寄存器就能知道locked的值是否为0。并且,以上操作是原子的,这就保证了有且只有一个进程能够拿到locked的0值并且进入临界区。

最后,acquire()函数使用__sync_synchronize为了避免编译器对这段代码进行指令顺序调整的话和避免CPU在这块代码采用乱序执行的优化。

release()函数

// Release the lock.
void release(struct spinlock *lk) {
  if(!holding(lk))
    panic("release");

  lk->pcs[0] = 0;
  lk->cpu = 0;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that all the stores in the critical
  // section are visible to other cores before the lock is released.
  // Both the C compiler and the hardware may re-order loads and
  // stores; __sync_synchronize() tells them both not to.
  __sync_synchronize();

  // Release the lock, equivalent to lk->locked = 0.
  // This code can't use a C assignment, since it might
  // not be atomic. A real OS would use C atomics here.
  asm volatile("movl $0, %0" : "+m" (lk->locked) : );

  popcli();
}

release函数为了保证设置locked为0的操作的原子性,同样使用了内联汇编。最后,使用popcli()来允许中断(或者弹出一个cli,但因为其他锁未释放使得中断依然被禁止)。

在Xv6中实现信号量

struct semaphore {
  int value;
  struct spinlock lock;
  struct proc *queue[NPROC];
  int end;
  int start;
};

void sem_init(struct semaphore *s, int value) {
  s->value = value;
  initlock(&s->lock, "semaphore_lock");
  end = start = 0;
}

void sem_wait(struct semaphore *s) {
  acquire(&s->lock);
  s->value--;
  if (s->value < 0) {
    s->queue[s->end] = myproc();
    s->end = (s->end + 1) % NPROC;
    sleep(myproc(), &s->lock)
  }
  release(&s->lock);
}

void sem_signal(struct semaphore *s) {
  acquire(&s->lock);
  s->value++;
  if (s->value <= 0) {
    wakeup(s->queue[s->start]);
    s->queue[s->start] = 0;
    s->start = (s->start + 1) % NPROC;
  }
  release(&s->lock);
}

上面的代码使用Xv6提供的接口实现了信号量,格式和命名与POSIX标准类似。这个信号量的实现采用等待队列的方式。当一个进程因信号量陷入阻塞时,会将自己放进等待队列并睡眠(18-22行)。当一个进程释放信号量时,会从等待队列中取出一个进程继续执行(29-33行)。