Задача постачальника-споживача
Задача постачальника-споживача (англ. producer-consumer problem), також відома як задача обмеженого буфера (англ. bounded-buffer problem) - це класичний приклад задачі синхронізації кількох процесів. Задача описує два процеси, постачальник і споживач, які спільно використовують буфер встановленого розміру. Завданням постачальника є створення фрагменту даних, запис його до буфера і повторення цих дій раз за разом. Одночасно з цим споживач споживає дані (тобто, видаляє їх з буфера) по одному фрагменту за раз. Задача полягає в тому, щоб не дати постачальнику записати дані, коли буфер повний, а споживачу не дати видалити дані з порожнього буфера.
Розв'язком може бути перехід у стан очікування, якщо буфер повний, або ігнорування даних в такому випадку (сценарій повільного споживача). Коли споживач видалить наступні дані з буфера, він сповіщає про це постачальника, який починає заповнювати буфер знов. Таким самим чином споживач може перейти в режим очікування, якщо буфер виявиться порожнім. Щойно постачальник запише наступні дані він сповіщує про це споживача. Розв'язання задачі можна досягнути за допомогою взаємодії між процесами, зазвичай використовуються семафори. Невідповідний підхід може спричинити взаємне блокування, коли обидва процеси опиняться в стані очікування. Задача може бути узагальнена на випадок багатьох постачальників і споживачів.
Реалізація
Непридатна реалізація
Наведений розв’язок потенційно спричиняє стан гонитви. Використовуються дві бібліотечні функції, sleep і wakeup. Процес, що викликав sleep, блокується допоки інший процес не розблокує його через використання wakeup. Глобальна зміння itemCount містить кількість записів у буфері.
int itemCount;
procedure producer() {
while (true) {
item = produceItem();
if (itemCount == BUFFER_SIZE) {
sleep();
}
putItemIntoBuffer(item);
itemCount = itemCount + 1;
if (itemCount == 1) {
wakeup(consumer);
}
}
}
procedure consumer() {
while (true) {
if (itemCount == 0) {
sleep();
}
item = removeItemFromBuffer();
itemCount = itemCount - 1;
if (itemCount == BUFFER_SIZE - 1) {
wakeup(producer);
}
consumeItem(item);
}
}
Недолік цього розв'язку в тому, що він робить можливим стан гонитви, який призводить до взаємного блокування. Уявімо наступний сценарій:
- Споживач щойно прочитав змінну itemCount, встановив, що вона дорівнює нулю, отже має увійти в if-блок.
- Просто перед викликом sleep споживач переривається і запускається постачальник.
- Постачальник створює запис, записує його до буфера і збільшує itemCount на одиницю.
- Оскільки буфер перед цим був порожнім, постачальник пробує розбудити споживача.
- Споживач не перебуває у стані сну, тож виклик wakeup не має наслідків. Споживач переходить до інструкції sleep, засинає і вже ніколи не буде пробуджений, оскільки постачальник будить споживача лише тоді, коли itemCount дорівнює 1.
- Постачальник працюватиме до заповнення буфера, після чого також засне.
Обидва процеси спатимуть довічно, що є взаємним блокуванням.
Альтернативне доведення непридатності розв’язку полягяє в тому, що якщо мова програмування не визначає способів одночасного доступу до спільних змінних (наразі itemCount) без використання механізмів синхронізації, рішення є незадовільним вже з самої цієї причини, навіть без необхідності явного доказу можливості настання стану гонитви.
Використання семафорів
Семафори розв'язують питання неспрацювання виклику wakeup. В рішенні нижче використовуються два семафори, fillCount і emptyCount. fillCount — це кількість доступних записів у буфері, emptyCount – кількість вільних місць в буфері. fillCount збільшується, а emptyCount зменшується коли створюється новий запис. Якщо постачальник намагається зменшити emptyCount, коли його значення нуль, він засинає. Коли споживач зчитує наступний запис, emptyCount збільшується і постачальник прокидається. Споживач працює подібним чином.
semaphore fillCount = 0; // фрагментів записано
semaphore emptyCount = BUFFER_SIZE; // залишилось місця
procedure producer() {
while (true) {
item = produceItem();
down(emptyCount);
putItemIntoBuffer(item);
up(fillCount);
}
}
procedure consumer() {
while (true) {
down(fillCount);
item = removeItemFromBuffer();
up(emptyCount);
consumeItem(item);
}
}
Наведений розв'язок працює добре в разі, коли присутні один постачальник та один споживач. Однак у випадку з багатьма постачальниками або багатьма споживачами, що використовують спільну пам’ять для буферу, цей розв'язок призводить до стану гонитви, що може призвести до читання з однієї або писання до однієї комірки кількома процесами одночасно. Для розуміння, як це можливо, уявімо як може бути реалізована підпрограма putItemIntoBuffer(). Вона може містити дві дії, перша це визначення доступного слоту і друга це запис в нього. ця процедура може виконуватись одночасно декількома процесами, що уможливлює такий перебіг подій:
- Два постачальники зменшують emptyCount
- Один з них визначає наступну порожню комірку в буфері
- Другий постачальник визначає наступну порожню комірку і отримує однаковий з першим результат
- Обидва пишуть в одну комірку
Для подалання цієї проблеми, ми маємо переконатись, що лиш один постачальник виконує putItemIntoBuffer() в цей час. Інакше кажучи, ми потребуємо спосіб для виконання критичної секції із взаємним виключенням. Для успішного виконання цієї задачі ми використаємо двійковий семафор, відомий як м'ютекс. Через те, що значення м'ютекса може бути або нуль, або одиниця, тільки один процес може виконуватись між down(mutex) і up(mutex). Розв'язок для багатьох постачальників і споживачів наведений вище.
semaphore mutex = 1;
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;
procedure producer() {
while (true) {
item = produceItem();
down(emptyCount);
down(mutex);
putItemIntoBuffer(item);
up(mutex);
up(fillCount);
}
}
procedure consumer() {
while (true) {
down(fillCount);
down(mutex);
item = removeItemFromBuffer();
up(mutex);
up(emptyCount);
consumeItem(item);
}
}
Зауважте, що черговість збільшення та зменшення різних семафорів важлива: зміна черговості може спричинити взаємне блокування.
Використання моніторів
Наступний псевдокод наводить розв'язок задачі постачальника-споживача із використанням моніторів. Через прихованість взаємного виключення в випадку використання моніторів, додаткові зусилля для захисту критичної секції не потрібні. Інакше кажучи, розв'язок наведений нижче працює з будь-якою кількістю постачальників і споживачів без змін. Варто також зазначити, що використання моніторів робить можливість гонитви значно меншою порівняно з використанням семафорів.
monitor ProducerConsumer {
int itemCount
condition full;
condition empty;
procedure add(item) {
while (itemCount == BUFFER_SIZE) {
wait(full);
}
putItemIntoBuffer(item);
itemCount = itemCount + 1;
if (itemCount == 1) {
notify(empty);
}
}
procedure remove() {
while (itemCount == 0) {
wait(empty);
}
item = removeItemFromBuffer();
itemCount = itemCount - 1;
if (itemCount == BUFFER_SIZE - 1) {
notify(full);
}
return item;
}
}
procedure producer() {
while (true) {
item = produceItem()
ProducerConsumer.add(item)
}
}
procedure consumer() {
while (true) {
item = ProducerConsumer.remove()
consumeItem()
}
}
Зауважте використання while в коді згори у випадку перевірки на порожність або заповнення буфера. Якщо while замінити на if, забагато записів може бути записано в буфер або може відбутися спроба видалення з порожнього буфера.
Без семафорів або моніторів
Задача постачальника-споживача, особливо у випадку одного постачальника та одного споживача, має значну подібність до FIFO або каналу зв'язку. Шаблон постачальник споживач може забезпечити високо дієву передачу даних без покладання на семафори, м'ютекси або монітори для передачі даних. Використання цих примітивів може вплинути на швидкодію через високу вартість здійснення. Базовий код на C подано нижче. Зауважте, що:
- Атомарний читати-змінювати-записувати доступ до спільних змінних не використовується: кожна з двох змінних лічильників оновлюється лише одним потоком.
- Цей приклад не присипляє потоки, хоча це може бути вдалим рішенням в деяких системах.
volatile unsigned int produceCount, consumeCount;
TokenType buffer[BUFFER_SIZE];
void producer(void) {
while (1) {
while (produceCount - consumeCount == BUFFER_SIZE)
sched_yield(); // буфер повний
buffer[produceCount % BUFFER_SIZE] = produceToken();
produceCount += 1;
}
}
void consumer(void) {
while (1) {
while (produceCount - consumeCount == 0)
sched_yield(); // буфер порожній
consumeToken( buffer[consumeCount % BUFFER_SIZE]);
consumeCount += 1;
}
}