信号量及管程
10. 信号量及管程¶
并发问题: 存在竞争条件,除过互斥还需要同步等操作,或者临界区里有多个线程执行,锁无法满足
10.1 信号量¶
信号量(sem)用一个整形来表示,有两个原子操作
- P操作: sem减一,如果sem<0,等待,否则继续
- V操作: sem加一,如果sem<=0(当前有些进程在等待这个信号量),唤醒一个等待的P
用一个铁路的例子来说明,铁路有一段是并行的,同时可以走两辆,那么当这段有两辆时,信号量就要求后面再来的火车等待,直到唤醒为止
P和V是荷兰语,荷兰科学家Dijkstra在20世纪60年代提出
- V: Verhoog (荷兰语增加)
- P: Prolaag (荷兰语减少)
信号量机制在早期操作系统里是主要的同步原语,现在则少用了
10.2 信号量的使用¶
信号量是整数,是被保护的变量
初始化完成后,改变信号量的唯一方法是通过P和V
P会阻塞,V不会
V后唤醒哪一个线程,实践中FIFO经常被使用
10.2.1 两种类型信号量¶
- 二进制信号量: 0/1两种取值
- 一般/计数信号量: 可取任何非负值
用二进制信号量实现互斥
在临界区前进行P操作,临界区后进行V操作,即可实现对临界区的互斥访问
muetx = new Semaphore(0);
mutex->P();
// Critical Section
mutex->V();
用二进制信号量实现调度约束
初始化为0,线程A需要在线程B执行到某一步后才能继续执行,那么线程A这步前调用P操作,线程B这步后调用V操作,则线程A只有在B执行完后才会继续执行
更复杂的互斥操作
例如,目前有一个buffer,一个生产者线程会不停向里面写数据,一个消费者线程会从里面取数据,正确性要求
- 同一时间只能有一个线程操作缓冲区
- 缓冲区为空,消费者必须等待生产者
- 缓存区满,生产者必须等待消费者
如何用信号量解决:每个约束用一个单独信号量完成
mutex = new Semaphore(1);
fullBuffers = new Semaphore(0);
emptyBuffers = new Semaphore(n);
- 二进制信号量负责互斥,初始化为1
- 一般信号量fullBuffers,初始化为0
- 一般信号量emptyBuffers,初始化为n
生产者deposit函数
emptyBuffers->P(); // 一直小于0后才会阻塞
mutex->P();
Add c to the buffer
mutex->V();
fullBuffers->V();
消费者remove函数
fullBuffers->P();
mutex->P();
Remove c from buffer
mutex->V();
emptyBuffers->V();
两者的P和V都不能交换顺序
10.3 信号量的实现¶
信号量本身是一个整形
进程的等要通过等待队列实现
class Semaphore {
int sem;
WaitQueue q;
};
Semaphore::P() {
sem --;
if (sem < 0) {
Add this thread t to q;
block(p);
}
}
Semaphore::V() {
sem ++;
if (sem <= 0) {
Remove a thread t from q; // FIFO
wakeup(t);
}
}
信号量不能够处理死锁问题
10.4 管程¶
能不能把信号量更简化一些,管程就是抽象度更高的概念
目的: 分离互斥和条件同步,最开始出现在编程语言的设计里,针对语言并发机制完成
10.4.1 什么是管程(Monitor)¶
- 一个锁: 指定临界区
- 0或者多个条件变量: 等待/通知信号量用于管理并发访问共享数据,将需要等待的资源挂上去
- 如图,右上角是进入管程的队列,进入管程是互斥的
- 进入管程之后,线程要执行管程维护的一系列的函数,即圆柱体
- 执行函数过程中,会对管程维护的一系列共享变量进行操作,即图中x和y,得不到满足时需要等待在条件变量对应队列上,同时释放对管程的lock
- 条件变量有两个操作,wait和signal
具体各部分操作描述如下
Lock
即一个信号量,一个时刻只有一个线程可以进入管程中
Lock::Acquire();
Lock::Release();
Condition Variable
Wait(); // 释放锁,睡眠
Signal(); // 唤醒等待者
Class Condition {
int numWaiting = 0;
WaitQueue = q;
};
Condition::Wait(lock) {
numWaiting ++; // 多了一个睡眠线程
Add this thread t to q; // 加入等待队列
release(lock);
schedule(); // 选择下一个线程来执行,need mutex
require(lock);
}
Condition::Signal() {
if (numWaiting > 0) { // 有线程等待条件变量
Remove a thread t from q;
wakeup(t); // need mutex
numWaiting --;
}
}
用管程来解决buffer问题
classBoundedBuffer {
Lock lock;
int count = 0;
Condition notFull, notEmpty;
};
// 一进入函数就互斥,保证进入管程线程唯一
BoundedBuffer::Deposit(c) {
lock->Acquire();
while (count == n) notFull.Wait(&lock); // 这里即前一步的wait,里面会release掉lock,让其它线程进入执行
Add c to the buffer;
count ++;
notEmpty.Signal();
lock->Release();
}
BoundedBuffer::Remove(c) {
lock->Acquire();
while (count == 0) notEmpty.Wait(&lock);
Remove c from buffer;
count --;
notFull.Signal();
lock->Release();
}
执行顺序
在一个线程signal令一个线程后,因为管程同时只能有一个线程在执行,让哪个线程继续执行,是一个问题
- Hoare-style,让等待的线程执行,再执行发出signal的线程
- Hansen-style,发signal线程执行完后再执行等待线程 (Most real OSes, or Java)
这两种其实会有影响,如上面代码,如果用hoare机制,用if代替while也行,因为这时只唤醒一个等待线程,不用再次用while确认
用Hansen,会有多个线程来抢资源,要用while再次确认
10.5 经典同步问题¶
10.5.1 读者-写者问题¶
读者优先¶
问题描述: 访问共享数据,读者优先,即读者可以跳过正在等待的写者去读数据
- 允许同一时间有多个读者,但任何时候都只有一个写者
- 没有写者时读者才能访问数据
- 没有读者和写者时写者才能访问数据
- 任何时候只有一个线程可以操作共享变量
对读者有一个计数RCount,初始化为0,因为可以有多个读者,同时用二元信号量CountMutex来保护,初始化为1
只会有一个写者,所以不用计数,只用一个二元信号量WriteMutex来保护,初始化为1
因为是对变量的保护,所以用二元信号量
Writer
sem_wait(WriteMutex); // P,减
write;
sem_post(WriteMutex); // V,加
Reader
因为CountMutex是对Rcount的信号量,所以每次操作Rcount前都要先减后增
sem_wait(CountMutex);
if (Rcount == 0)
sem_wait(WriteMutex); // 没有读者,确保没有写者即可
Rcount ++;
sem_post(CountMutex);
read;
sem_wait(CountMutex);
Rcount --;
if (Rcount == 0)
sem_post(WriteMutex);
sem_post(CountMutex)
写者优先¶
一旦写者就绪,后面的写者可以跳过读者来写
管程实现
Monitor's State Variables
AR = 0; // active readers
AW = 0; // active writers
WR = 0; // waiting readers
WW = 0; // waiting writers
Condition okToRead;
Condition okToWrite;
Lock lock;
Reader
Database::Read() {
// Wait until no writers
StartRead();
read database;
// checkout - wake up
DoneRead();
}
Database::StartRead() {
lock.Acquire();
while (AW + WW > 0) {
WR ++;
okToRead.wait(&lock); // 被唤醒后继续执行
WR --;
}
AR ++;
lock.Release();
}
Database::DoneRead() {
lock.Acquire();
AR --;
if (AR == 0 && WW > 0)
okToWrite.signal();
lock.Release();
}
Writer
Database::Write() {
// Wait until no readers/writers 只包含正在读的读者
StartWrite();
write database;
// check out - wake up waiting readers/writers
DoneWrite();
}
Database::StartWrite() {
lock.Acquire();
while (AW + AR > 0) {
WW ++;
okToWrite.wait(&lock);
WW --;
}
AW ++;
lock.Release();
}
Database::DoneWrite() {
lock.Acquire();
AW --;
if (WW > 0)
okToWrite.signal(); // 只唤醒一个线程
else if (WR > 0)
okToRead.broadcast(); // 唤醒所有线程,因为所有读线程都可以读,并且多个Reader可以同时读
lock.Release();
}
10.5.2 哲学家就餐问题¶
问题描述: 1965年Dijkstra提出,5个哲学家围绕着一张圆桌而坐,桌上放着5把叉子,每两个哲学家之间放一支,哲学家动作包括思考和就餐,就餐时需要同时拿起左右两支叉子,思考则同时放下。如何保证哲学家们动作有序进行?(不会有人永远拿不到叉子)
死锁出现¶
#define N 5
void philosopher (int i) {
while (true) {
think();
take_fork(i); // 拿左边叉子
take_fork((i+1)%N); // 拿右边叉子
eat();
put_fork(i);
put_fork((i+1)%N);
}
}
以上方法会出现死锁,所有人先拿左边叉子,再拿右边叉子,会每人拿到一把,死锁
改进¶
#define N 5
void philosopher (int i) {
while (true) {
think();
take_fork(i); // 拿左边叉子
if (fork((i+1)%N)) {
take_fork((i+1)%N);
eat();
break;
} else {
put_fork(i);
wait_random_time();
}
}
}
如果拿不到右边叉子,放下左边叉子,随机等待一段时间,可行,但非万全之策
互斥访问
把拿叉子用锁保护,正确,每次只允许一个人进餐
理论上说,应该允许两个不相邻的哲学家同时进餐
正确算法¶
因为相邻哲学家不能同时进餐,所以检查左右邻居就可以判断出自己能不能进餐
哲学家思考->饥饿->检查最优邻居->拿两把叉子吃东西->放下
拿一把叉子没有意义,这里通过状态来记录是否两把叉子都拿了起来
- 有数据结构来描述哲学家状态
- 有一个数据结构来描述每个哲学家当前状态,该状态是临接资源,读和写互斥
- 唤醒左邻右舍,存在着同步
#define N 5 // 哲学家个数
#define LEFT (i-1+N)%N // 左邻
#define RIGHT (i+1)%N // 右舍
#define THINKING 0 // 思考状态
#define HUNGRY 1 // 饥饿状态
#define EATING 2 // 进餐状态
int state[N] // 记录每个人状态
由于该数组是临界值,需要互斥访问该资源。这里把整个数组都保护起来,因为一个人尝试拿的时候,其他人状态不能改变
semaphore mutex; // 互斥信号量,初值1
一个哲学家吃饱后,可能要唤醒邻居,存在着同步关系
semaphore s[N]; // 同步信号量,初值0
算法主体
// 第i个哲学家执行的函数
void philosopher(int i) {
while (true) {
think();
take_forks(i); // 拿到两把叉子或者被阻塞
eat();
put_forks(i);
}
}
void take_forks(int i) {
P(mutex);
state[i] = HUNGRY;
test_take_left_right_forks(i);
V(mutex);
P(s[i]); // 没有叉子便阻塞
}
void test_take_left_right_forks(int i) {
if (state[i] == HUNGRY &&
state[LEFT] != EATING &&
state[RIGHT] != EATING) {
state[i] = EATING; // 置为EATING即两把叉子已经拿起来
V(s[i]); // 通知自己吃饭,因为之后会有P操作
}
}
void put_forks(int i) {
P(mutex);
state[i] = THINKING; // 只要不是EATING,就是两把叉子都放下
test_take_left_right_forks(LEFT);
test_take_left_right_forks(RIGHT);
V(mutex);
}
put_forks里,左右邻居会被阻塞在take_forks里P(s[i])
这句上,在test成功后,会V(s[i])
,接着会执行eat
剩下think和eat相对简单,eat不需要做任何操作,think只需要在最开始将状态置为THINKING
以上五个进程并发执行即可解决哲学家就餐问题