Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


c - pthread_cond_wait and pthread_mutex_lock priority?

I have multiple read threads and one write thread. If I lock the mutex in one of the read threads and send a broadcast from it, is it guaranteed that the mutex will next be locked by the write thread waiting in pthread_cond_wait(), or could another read thread that is waiting in pthread_mutex_lock() lock the mutex first? The main question is: does pthread_cond_wait() have priority over pthread_mutex_lock()?

If not, how can I make sure that the mutex is always acquired by the write thread after pthread_cond_broadcast()?

Example

Read thread:

pthread_mutex_lock(&mutex);
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex);

Write thread:

pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);


1 Reply


Let's assume both threads, read and write, reach pthread_mutex_lock at the same moment. Then either the write thread or the read thread acquires the mutex in its pthread_mutex_lock call.

If the write thread gets it, the read thread waits in pthread_mutex_lock. The write thread, by calling pthread_cond_wait, releases the mutex and blocks on cond. This is done atomically. So, when the read thread is granted the mutex, we can be sure that the write thread is waiting on cond. The broadcast on cond therefore reaches the write thread: it no longer waits on cond but, still inside pthread_cond_wait, tries to lock the mutex (held by the read thread). After broadcasting, the read thread releases the mutex and, in this two-thread scenario, it goes to the write thread. The write thread finally returns from pthread_cond_wait with the mutex locked. Remember to unlock it later.

If the read thread gets it, the write thread waits in pthread_mutex_lock; the read thread broadcasts on cond and then releases the mutex. The write thread then acquires the mutex in pthread_mutex_lock and immediately releases it in pthread_cond_wait, waiting for cond (please note that the earlier broadcast has no effect on this pthread_cond_wait). In its next iteration the read thread locks the mutex, broadcasts on cond and unlocks the mutex. Only then does the write thread wake up on cond and acquire the mutex.
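
The second case is why a bare pthread_cond_wait is fragile: a broadcast sent before the wait starts is simply lost. A minimal sketch of the usual remedy follows, assuming you can add a shared flag protected by the same mutex. The names data_ready, reader, writer and run_late_writer_demo are my own and do not come from the question. The demo deliberately runs the read thread to completion before the write thread starts, which is the situation where the broadcast would otherwise be missed.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical names for this sketch; none of them come from the question. */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static bool data_ready = false; /* the predicate, protected by mutex */

/* Read thread: record the event in the predicate, then broadcast. */
static void *reader(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mutex);
    data_ready = true;
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/* Write thread: wait until the predicate holds, however late it starts. */
static void *writer(void *arg) {
    bool *saw_data = arg;
    pthread_mutex_lock(&mutex);
    while (!data_ready) { /* the loop also guards against spurious wakeups */
        pthread_cond_wait(&cond, &mutex);
    }
    *saw_data = data_ready;
    data_ready = false; /* consume the event */
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/* Runs the reader to completion BEFORE the writer is started.
   Returns 1 if the writer still observed the event. */
int run_late_writer_demo(void) {
    pthread_t r, w;
    bool saw_data = false;
    int rc = pthread_create(&r, NULL, reader, NULL);
    assert(rc == 0);
    pthread_join(r, NULL);
    rc = pthread_create(&w, NULL, writer, &saw_data);
    assert(rc == 0);
    pthread_join(w, NULL);
    return saw_data ? 1 : 0;
}
```

Call run_late_writer_demo() from your own main and build with -pthread. Because the write thread tests data_ready before it ever waits, it does not matter which of the two threads wins the initial race for the mutex.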

Does it answer your question about priority?


Update after comment.

Let's assume we have one thread (call it A for future reference) holding the lock on the mutex and a few others trying to acquire the same lock. When A releases the lock, it is not predictable which thread will acquire it. Moreover, if A runs in a loop and tries to reacquire the mutex, there is a chance that it gets the lock again while the other threads keep waiting. Adding pthread_cond_wait doesn't change anything about how the lock is granted.

Let me quote fragments of POSIX specification (see https://stackoverflow.com/a/9625267/2989411 for reference):

These functions atomically release mutex and cause the calling thread to block on the condition variable cond; atomically here means "atomically with respect to access by another thread to the mutex and then the condition variable". That is, if another thread is able to acquire the mutex after the about-to-block thread has released it, then a subsequent call to pthread_cond_broadcast() or pthread_cond_signal() in that thread shall behave as if it were issued after the about-to-block thread has blocked.

This is the only guarantee the standard gives regarding the order of operations. The order in which the lock is granted to other threads is unpredictable and changes with very subtle fluctuations in timing.
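
Since the standard does not order the threads for you, one way to get "the write thread always goes next" is to stop relying on who wins the mutex and to put the turn into a predicate instead. Below is a minimal sketch of that idea, not something taken from the question: writer_turn, reader_cond, writer_cond, order and run_handoff_demo are all names I made up for illustration. A read thread may still win the mutex right after the signal, but it sees writer_turn set and goes back to waiting, so the work itself happens in the order reader, writer, reader, writer.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define READERS 3
#define ROUNDS 4
#define TOTAL (READERS * ROUNDS)

/* All names below are hypothetical, chosen for this sketch only. */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t writer_cond = PTHREAD_COND_INITIALIZER; /* wakes the writer */
static pthread_cond_t reader_cond = PTHREAD_COND_INITIALIZER; /* wakes readers */
static bool writer_turn = false; /* predicate, protected by mutex */
static char order[2 * TOTAL];    /* who did its work, in order */
static int order_len = 0;

static void *reader(void *arg) {
    (void)arg;
    for (int i = 0; i < ROUNDS; ++i) {
        pthread_mutex_lock(&mutex);
        while (writer_turn) { /* a reader that wins the mutex early backs off */
            pthread_cond_wait(&reader_cond, &mutex);
        }
        order[order_len++] = 'R';
        writer_turn = true; /* hand the turn to the writer */
        pthread_cond_signal(&writer_cond);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

static void *writer(void *arg) {
    (void)arg;
    for (int i = 0; i < TOTAL; ++i) {
        pthread_mutex_lock(&mutex);
        while (!writer_turn) {
            pthread_cond_wait(&writer_cond, &mutex);
        }
        order[order_len++] = 'W';
        writer_turn = false; /* readers may go again */
        pthread_cond_broadcast(&reader_cond);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

/* Returns 1 if the recorded order is strictly R, W, R, W, ... */
int run_handoff_demo(void) {
    pthread_t w, r[READERS];
    int rc, i;
    order_len = 0;
    writer_turn = false;
    rc = pthread_create(&w, NULL, writer, NULL);
    assert(rc == 0);
    for (i = 0; i < READERS; ++i) {
        rc = pthread_create(&r[i], NULL, reader, NULL);
        assert(rc == 0);
    }
    for (i = 0; i < READERS; ++i) {
        pthread_join(r[i], NULL);
    }
    pthread_join(w, NULL);
    if (order_len != 2 * TOTAL) {
        return 0;
    }
    for (i = 0; i < order_len; ++i) {
        if (order[i] != (i % 2 == 0 ? 'R' : 'W')) {
            return 0;
        }
    }
    return 1;
}
```

Using two condition variables is a design choice of this sketch: it lets a read thread wake only the write thread with pthread_cond_signal. A single condition variable with pthread_cond_broadcast would also work, since every waiter re-checks its own predicate in a loop.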

For the mutex-only case, please play a little with the following code:

#define _GNU_SOURCE
#include <pthread.h>

#include <stdio.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *th(void *arg) {
    int i;
    char *s = arg;
    for (i = 0; i < 10; ++i) {
        pthread_mutex_lock(&mutex);
        printf("%s %d\n", s, i);
        //sleep(1);
        pthread_mutex_unlock(&mutex);
#if 0
        pthread_yield();
#endif
    }
    return NULL;
}

int main() {
    int i;
    for (i = 0; i < 10; ++i) {
        pthread_t t1, t2, t3;
        printf("================================\n");
        pthread_create(&t1, NULL, th, "t1");
        pthread_create(&t2, NULL, th, "     t2");
        pthread_create(&t3, NULL, th, "            t3");
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        pthread_join(t3, NULL);
    }
    return 0;
}

On one machine (single CPU) it always shows the whole loop from t3, then t2 and finally t1. On another (2 cores) the order of threads is more random, but almost always it shows the whole loop for one thread before the mutex is granted to another. Rarely there is a situation like:

t1 8
t1 9
            t3 0
     t2 0
     t2 1
     [removed other t2 output]
     t2 8
     t2 9
            t3 1
            t3 2

Enable pthread_yield by replacing #if 0 with #if 1 and check the output. For me, two threads then display their output interleaved, and only afterwards does the third thread get a chance to work. Add one or more threads, play with sleep, etc. It confirms the random behaviour. (pthread_yield is a non-standard GNU extension; sched_yield() from <sched.h> is the POSIX equivalent.)
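
If you would rather see a number than read interleaved output, here is a rough sketch that counts how often the mutex moves from one thread to a different one. It is only an illustration under my own assumptions: last_owner, handoffs, worker and count_handoffs are hypothetical names, and the count will differ from run to run and from machine to machine, which is exactly the point.

```c
#include <assert.h>
#include <pthread.h>

#define THREADS 3
#define ITERATIONS 1000

/* Hypothetical names for this sketch only. */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static int last_owner = -1; /* id of the thread that held the mutex last */
static long handoffs = 0;   /* times the mutex moved to a different thread */
static long total = 0;      /* total number of acquisitions */

static void *worker(void *arg) {
    int id = *(int *)arg;
    for (int i = 0; i < ITERATIONS; ++i) {
        pthread_mutex_lock(&mutex);
        if (last_owner != -1 && last_owner != id) {
            ++handoffs;
        }
        last_owner = id;
        ++total;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

/* Returns the number of handoffs seen in one run and stores the total
   number of acquisitions in *total_out. */
long count_handoffs(long *total_out) {
    pthread_t t[THREADS];
    int ids[THREADS];
    int rc, i;
    last_owner = -1;
    handoffs = 0;
    total = 0;
    for (i = 0; i < THREADS; ++i) {
        ids[i] = i;
        rc = pthread_create(&t[i], NULL, worker, &ids[i]);
        assert(rc == 0);
    }
    for (i = 0; i < THREADS; ++i) {
        pthread_join(t[i], NULL);
    }
    *total_out = total;
    return handoffs;
}
```

A result close to THREADS - 1 means each thread ran its whole loop in one go; a result close to the total means the threads alternated almost every time. The only bounds you can rely on are those two extremes.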

If you wish to experiment a little more, compile and run the following piece of code. It is an example of the single producer, multiple consumers model. It can be run with two parameters: the first is the number of consumer threads, the second is the length of the produced data series. If no parameters are given, there is one consumer thread and 120 items to be processed. I also recommend experimenting with sleep/usleep in the places marked /* play here */: change the argument values, remove the sleep altogether, move it (where appropriate) into the critical section, or replace it with pthread_yield, and observe the changes in behaviour.

#define _GNU_SOURCE
#include <assert.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

struct data_t {
    int seq;
    int payload;
    struct data_t *next;
};

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct data_t *first = NULL, *last = NULL;
int in_progress = 1;
int num_data = 120;

void push(int seq, int payload) {
    struct data_t *e;
    e = malloc(sizeof(struct data_t));
    e->seq = seq;
    e->payload = payload;
    e->next = NULL;
    if (last == NULL) {
        assert(first == NULL);
        first = last = e;
    } else {
        last->next = e;
        last = e;
    }
}

struct data_t pop(void) {
    struct data_t res = {0};
    if (first == NULL) {
        res.seq = -1; /* queue is empty */
    } else {
        struct data_t *e = first;
        res.seq = e->seq;
        res.payload = e->payload;
        first = e->next;
        if (first == NULL) {
            last = NULL;
        }
        free(e); /* release the node allocated in push() */
    }
    return res;
}

void *producer(void *arg __attribute__((unused))) {
    int i;
    printf("producer created\n");
    for (i = 0; i < num_data; ++i) {
        int val;
        sleep(1); /* play here */
        pthread_mutex_lock(&mutex);
        val = rand() / (INT_MAX / 1000);
        push(i, val);
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond);
        printf("prod %3d %3d signaled\n", i, val);
    }
    /* Change the flag under the mutex so that a consumer cannot test it
       and then block in pthread_cond_wait after the final broadcast. */
    pthread_mutex_lock(&mutex);
    in_progress = 0;
    pthread_mutex_unlock(&mutex);
    printf("prod end\n");
    pthread_cond_broadcast(&cond);
    printf("prod end signaled\n");
    return NULL;
}

void *consumer(void *arg) {
    char c_id[1024];
    int t_id = *(int *)arg;
    snprintf(c_id, sizeof c_id, "%*s c %02d", t_id % 10, "", t_id);
    printf("%s created\n", c_id);
    while (1) {
        struct data_t item;
        pthread_mutex_lock(&mutex);
        item = pop();
        /* Re-check the queue after every wakeup. */
        while (item.seq == -1 && in_progress) {
            printf("%s waits for data\n", c_id);
            pthread_cond_wait(&cond, &mutex);
            printf("%s got signal\n", c_id);
            item = pop();
        }
        if (!in_progress && item.seq == -1) {
            printf("%s detected end of data.\n", c_id);
            pthread_mutex_unlock(&mutex);
            break;
        }
        pthread_mutex_unlock(&mutex);
        printf("%s processing %3d %3d\n", c_id, item.seq, item.payload);
        sleep(item.payload % 10); /* play here */
        printf("%s processed  %3d %3d\n", c_id, item.seq, item.payload);
    }
    printf("%s end\n", c_id);
    return NULL;
}

int main(int argc, char *argv[]) {
    int num_cons = 1;
    pthread_t t_prod;
    pthread_t *t_cons;
    int i;
    int *nums;
    if (argc > 1) {
        num_cons = atoi(argv[1]);
        if (num_cons <= 0) { /* also rejects negative input */
            num_cons = 1;
        }
        if (num_cons > 99) {
            num_cons = 99;
        }
    }
    if (argc > 2) {
        num_data = atoi(argv[2]);
        if (num_data < 10) {
            num_data = 10;
        }
        if (num_data > 600) {
            num_data = 600;
        }
    }

    printf("Spawning %d consumer%s for %d items.\n", num_cons, num_cons == 1 ? "" : "s", num_data);
    t_cons = malloc(sizeof(pthread_t) * num_cons);
    nums = malloc(sizeof(int) * num_cons);
    if (!t_cons || !nums) {
        printf("Out of memory!\n");
        exit(1);
    }
    srand(time(NULL));
    pthread_create(&t_prod, NULL, producer, NULL);

    for (i = 0; i < num_cons; ++i) {
        nums[i] = i + 1;
        usleep(100000); /* play here */
        pthread_create(t_cons + i, NULL, consumer, nums + i);
    }

    pthread_join(t_prod, NULL);

    for (i = 0; i < num_cons; ++i) {
        pthread_join(t_cons[i], NULL);
    }
    free(nums);
    free(t_cons);

    return 0;
}

I hope this clears up your doubts and gives you some code to experiment with and gain some confidence about pthread behaviour.

