跳转至

信号量及管程

10. 信号量及管程

并发问题: 存在竞争条件,除过互斥还需要同步等操作,或者临界区里有多个线程执行,锁无法满足

10.1 信号量

信号量(sem)用一个整形来表示,有两个原子操作

  • P操作: sem减一,如果sem<0,等待,否则继续
  • V操作: sem加一,如果sem<=0(当前有些进程在等待这个信号量),唤醒一个等待的P

用一个铁路的例子来说明,铁路有一段是并行的,同时可以走两辆,那么当这段有两辆时,信号量就要求后面再来的火车等待,直到唤醒为止

P和V是荷兰语,荷兰科学家Dijkstra在20世纪60年代提出

  • V: Verhoog (荷兰语增加)
  • P: Prolaag (荷兰语减少)

信号量机制在早期操作系统里是主要的同步原语,现在则少用了


10.2 信号量的使用

信号量是整数,是被保护的变量

初始化完成后,改变信号量的唯一方法是通过PV

P会阻塞,V不会

V后唤醒哪一个线程,实践中FIFO经常被使用

10.2.1 两种类型信号量

  • 二进制信号量: 0/1两种取值
  • 一般/计数信号量: 可取任何非负值

用二进制信号量实现互斥

在临界区前进行P操作,临界区后进行V操作,即可实现对临界区的互斥访问

muetx = new Semaphore(0);
mutex->P();
// Critical Section
mutex->V();

用二进制信号量实现调度约束

初始化为0,线程A需要在线程B执行到某一步后才能继续执行,那么线程A这步前调用P操作,线程B这步后调用V操作,则线程A只有在B执行完后才会继续执行

更复杂的互斥操作

例如,目前有一个buffer,一个生产者线程会不停向里面写数据,一个消费者线程会从里面取数据,正确性要求

  • 同一时间只能有一个线程操作缓冲区
  • 缓冲区为空,消费者必须等待生产者
  • 缓存区满,生产者必须等待消费者

如何用信号量解决:每个约束用一个单独信号量完成

mutex = new Semaphore(1);
fullBuffers = new Semaphore(0);
emptyBuffers = new Semaphore(n);
  • 二进制信号量负责互斥,初始化为1
  • 一般信号量fullBuffers,初始化为0
  • 一般信号量emptyBuffers,初始化为n

生产者deposit函数

emptyBuffers->P(); // 一直小于0后才会阻塞
mutex->P();
Add c to the buffer
mutex->V();
fullBuffers->V();

消费者remove函数

fullBuffers->P();
mutex->P();
Remove c from buffer
mutex->V();
emptyBuffers->V();

两者的P和V都不能交换顺序


10.3 信号量的实现

信号量本身是一个整形

进程的等要通过等待队列实现

class Semaphore {
    int sem;
    WaitQueue q;
};

Semaphore::P() {
    sem --;
    if (sem < 0) {
        Add this thread t to q;
        block(p);
    }
}

Semaphore::V() {
    sem ++;
    if (sem <= 0) {
        Remove a thread t from q; // FIFO
        wakeup(t);
    }
}

信号量不能够处理死锁问题


10.4 管程

能不能把信号量更简化一些,管程就是抽象度更高的概念

目的: 分离互斥和条件同步,最开始出现在编程语言的设计里,针对语言并发机制完成

10.4.1 什么是管程(Monitor)

  • 一个锁: 指定临界区
  • 0或者多个条件变量: 等待/通知信号量用于管理并发访问共享数据,将需要等待的资源挂上去

  • 如图,右上角是进入管程的队列,进入管程是互斥的
  • 进入管程之后,线程要执行管程维护的一系列的函数,即圆柱体
  • 执行函数过程中,会对管程维护的一系列共享变量进行操作,即图中x和y,得不到满足时需要等待在条件变量对应队列上,同时释放对管程的lock
  • 条件变量有两个操作,wait和signal

具体各部分操作描述如下

Lock

即一个信号量,一个时刻只有一个线程可以进入管程中

Lock::Acquire();
Lock::Release();

Condition Variable

Wait(); // 释放锁,睡眠
Signal(); // 唤醒等待者

Class Condition {
    int numWaiting = 0;
    WaitQueue = q;
};

Condition::Wait(lock) {
    numWaiting ++; // 多了一个睡眠线程
    Add this thread t to q; // 加入等待队列
    release(lock);
    schedule(); // 选择下一个线程来执行,need mutex
    require(lock);
}

Condition::Signal() {
    if (numWaiting > 0) { // 有线程等待条件变量
        Remove a thread t from q;
        wakeup(t); // need mutex
        numWaiting --;
    }
}

用管程来解决buffer问题

classBoundedBuffer {
    Lock lock;
    int count = 0;
    Condition notFull, notEmpty;
};

// 一进入函数就互斥,保证进入管程线程唯一
BoundedBuffer::Deposit(c) {
    lock->Acquire();
    while (count == n) notFull.Wait(&lock); // 这里即前一步的wait,里面会release掉lock,让其它线程进入执行
    Add c to the buffer;
    count ++;
    notEmpty.Signal();
    lock->Release();
}

BoundedBuffer::Remove(c) {
    lock->Acquire();
    while (count == 0) notEmpty.Wait(&lock);
    Remove c from buffer;
    count --;
    notFull.Signal();
    lock->Release();
}

执行顺序

在一个线程signal令一个线程后,因为管程同时只能有一个线程在执行,让哪个线程继续执行,是一个问题

  • Hoare-style,让等待的线程执行,再执行发出signal的线程
  • Hansen-style,发signal线程执行完后再执行等待线程 (Most real OSes, or Java)

这两种其实会有影响,如上面代码,如果用hoare机制,用if代替while也行,因为这时只唤醒一个等待线程,不用再次用while确认

用Hansen,会有多个线程来抢资源,要用while再次确认


10.5 经典同步问题

10.5.1 读者-写者问题

读者优先

问题描述: 访问共享数据,读者优先,即读者可以跳过正在等待的写者去读数据

  • 允许同一时间有多个读者,但任何时候都只有一个写者
  • 没有写者时读者才能访问数据
  • 没有读者和写者时写者才能访问数据
  • 任何时候只有一个线程可以操作共享变量

对读者有一个计数RCount,初始化为0,因为可以有多个读者,同时用二元信号量CountMutex来保护,初始化为1

只会有一个写者,所以不用计数,只用一个二元信号量WriteMutex来保护,初始化为1

因为是对变量的保护,所以用二元信号量

Writer

sem_wait(WriteMutex); // P,减
write;
sem_post(WriteMutex); // V,加

Reader

因为CountMutex是对Rcount的信号量,所以每次操作Rcount前都要先减后增

sem_wait(CountMutex);
if (Rcount == 0)
    sem_wait(WriteMutex); // 没有读者,确保没有写者即可
Rcount ++;
sem_post(CountMutex);
read;
sem_wait(CountMutex);
Rcount --;
if (Rcount == 0)
    sem_post(WriteMutex);
sem_post(CountMutex)
写者优先

一旦写者就绪,后面的写者可以跳过读者来写

管程实现

Monitor's State Variables

AR = 0; // active readers
AW = 0; // active writers
WR = 0; // waiting readers
WW = 0; // waiting writers
Condition okToRead;
Condition okToWrite;
Lock lock;

Reader

Database::Read() {
    // Wait until no writers
    StartRead();
    read database;
    // checkout - wake up
    DoneRead();
}

Database::StartRead() {
    lock.Acquire();
    while (AW + WW > 0) {
        WR ++;
        okToRead.wait(&lock); // 被唤醒后继续执行
        WR --;
    }
    AR ++;
    lock.Release();
}

Database::DoneRead() {
    lock.Acquire();
    AR --;
    if (AR == 0 && WW > 0)
        okToWrite.signal();
    lock.Release();
}

Writer

Database::Write() {
    // Wait until no readers/writers 只包含正在读的读者
    StartWrite();
    write database;
    // check out - wake up waiting readers/writers
    DoneWrite();
}

Database::StartWrite() {
    lock.Acquire();
    while (AW + AR > 0) {
        WW ++;
        okToWrite.wait(&lock);
        WW --;
    }
    AW ++;
    lock.Release();
}

Database::DoneWrite() {
    lock.Acquire();
    AW --;
    if (WW > 0)
        okToWrite.signal();  // 只唤醒一个线程
    else if (WR > 0)
        okToRead.broadcast(); // 唤醒所有线程,因为所有读线程都可以读,并且多个Reader可以同时读
    lock.Release();
}

10.5.2 哲学家就餐问题

问题描述: 1965年Dijkstra提出,5个哲学家围绕着一张圆桌而坐,桌上放着5把叉子,每两个哲学家之间放一支,哲学家动作包括思考和就餐,就餐时需要同时拿起左右两支叉子,思考则同时放下。如何保证哲学家们动作有序进行?(不会有人永远拿不到叉子)

死锁出现
#define N 5
void philosopher (int i) {
    while (true) {
        think();
        take_fork(i); // 拿左边叉子
        take_fork((i+1)%N); // 拿右边叉子
        eat();
        put_fork(i);
        put_fork((i+1)%N);
    }
}

以上方法会出现死锁,所有人先拿左边叉子,再拿右边叉子,会每人拿到一把,死锁

改进
#define N 5
void philosopher (int i) {
    while (true) {
        think();
        take_fork(i); // 拿左边叉子
        if (fork((i+1)%N)) {
            take_fork((i+1)%N);
            eat();
            break;
        } else {
            put_fork(i);
            wait_random_time();
        }
    }
}

如果拿不到右边叉子,放下左边叉子,随机等待一段时间,可行,但非万全之策

互斥访问

把拿叉子用锁保护,正确,每次只允许一个人进餐

理论上说,应该允许两个不相邻的哲学家同时进餐

正确算法

因为相邻哲学家不能同时进餐,所以检查左右邻居就可以判断出自己能不能进餐

哲学家思考->饥饿->检查最优邻居->拿两把叉子吃东西->放下

拿一把叉子没有意义,这里通过状态来记录是否两把叉子都拿了起来

  • 有数据结构来描述哲学家状态
  • 有一个数据结构来描述每个哲学家当前状态,该状态是临接资源,读和写互斥
  • 唤醒左邻右舍,存在着同步
#define N 5  // 哲学家个数
#define LEFT (i-1+N)%N // 左邻
#define RIGHT (i+1)%N  // 右舍
#define THINKING 0   // 思考状态
#define HUNGRY   1   // 饥饿状态
#define EATING   2   // 进餐状态
int state[N]         // 记录每个人状态

由于该数组是临界值,需要互斥访问该资源。这里把整个数组都保护起来,因为一个人尝试拿的时候,其他人状态不能改变

semaphore mutex;  // 互斥信号量,初值1

一个哲学家吃饱后,可能要唤醒邻居,存在着同步关系

semaphore s[N];   // 同步信号量,初值0

算法主体

// 第i个哲学家执行的函数
void philosopher(int i) {
    while (true) {
        think();
        take_forks(i); // 拿到两把叉子或者被阻塞
        eat();
        put_forks(i);
    }
}

void take_forks(int i) {
    P(mutex);
    state[i] = HUNGRY;
    test_take_left_right_forks(i);
    V(mutex);
    P(s[i]);    // 没有叉子便阻塞
}

void test_take_left_right_forks(int i) {
    if (state[i] == HUNGRY &&
        state[LEFT] != EATING &&
        state[RIGHT] != EATING) {
        state[i] = EATING;   // 置为EATING即两把叉子已经拿起来
        V(s[i]);  // 通知自己吃饭,因为之后会有P操作
    }
}

void put_forks(int i) {
    P(mutex);
    state[i] = THINKING; // 只要不是EATING,就是两把叉子都放下
    test_take_left_right_forks(LEFT);
    test_take_left_right_forks(RIGHT);
    V(mutex);
}

put_forks里,左右邻居会被阻塞在take_forks里P(s[i])这句上,在test成功后,会V(s[i]),接着会执行eat

剩下think和eat相对简单,eat不需要做任何操作,think只需要在最开始将状态置为THINKING

以上五个进程并发执行即可解决哲学家就餐问题