首页 > 编程笔记 > C语言笔记 阅读:19308

C语言线程间通信

C11 标准为线程间通信提供了条件变量(condition variable)。线程可以使用条件变量,以等待来自另一个线程的通知,通知告知了指定的条件已被满足。例如,这类通知可能代表某些数据已经准备好进行处理。

条件变量由类型为 cnd_t 的对象表示,并配合互斥一起使用。一般过程如下:线程获得互斥,然后测试条件。如果条件不满足,则线程继续等待条件变量(释放互斥),直到另一个线程再次唤醒它,然后该线程再次获得互斥,并再次测试条件,重复上述过程,直到条件满足。

头文件 threads.h 定义了使用条件变量的函数,它们如下所示:

  1. int cnd_init(cnd_t*cond);
初始化 cond 引用的条件变量。

  1. void cnd_destroy(cnd_t*cond);
释放指定条件变量使用的所有资源。

  1. int cnd_signal(cnd_t*cond);
在等待指定条件变量的任意数量的线程中,唤醒其中一个线程。

  1. int cnd_broadcast(cnd_t*cond);
唤醒所有等待指定条件变量的线程。

  1. int cnd_wait(cnd_t*cond,mtx_t*mtx);
阻塞正在调用的线程,并释放指定的互斥。在调用 cnd_wait()之前,线程必须持有互斥。如果另一线程通过发送一个信号解除当前线程的阻塞(也就是说,通过指定同样的条件变量作为参数调用 cond_signal()或 cnd_broadcast()),那么调用 cnd_wait()的线程在 cnd_wait()返回之前会再次获得互斥。

  1. int cnd_timedwait(cnd_t*restrict cond,mtx_t*restrict mtx,const struct timespec*restrict ts);
与 cnd_wait()类似,cnd_timedwait()阻塞调用它们的线程,但仅维持由参数 ts 指定的时间。可以通过调用函数 timespec_get()获得一个 struct timespec 对象,它表示当前时间。

除 cnd_destroy()以外的所有条件变量函数,如果它们引发错误,则返回值 thrd_error,否则返回值 thrd_success。当时间达到限定值时,函数 cnd_timedwait()也会返回值 thrd_timedout。

例 1 与例 2 中的程序展示了在常见的“生产者-消费者”模型中使用条件变量。程序为每个生产者和消费者开启一个新线程。生产者将一个新产品(在我们的示例中,新产品为一个 int 变量)放入一个环形缓冲区中,假设这个缓冲区没有满,然后通知等待的消费者:产品已经准备好。每个消费者从该缓冲区中取出产品,然后将实际情况通知给正在等待的生产者。

在任一特定时间,只有一个线程可以修改环形缓冲器。因此,在函数 bufPut()和 bufGet()间将存在线程同步问题,函数 bufPut()将一个元素插入到缓冲区,函数 buf-Get()将一个元素从缓冲区移除。

有两个条件变量:生产者等待其中一个条件变量,以判断缓冲器是否满了;消费者等待另一个条件变量,以判断缓冲器是否空了。缓冲区的所有必需元素都包括在结构 Buffer 中。函数 bufInit()初始化具有指定大小的 Buffer 对象,而函数 bufDestroy()销毁 Buffer 对象。

【例1】用于“生产者-消费者”模型的环形缓冲区
  1. /* buffer.h
  2. * 用于线程安全缓冲区的所有声明
  3. */
  4. #include <stdbool.h>
  5. #include <threads.h>
  6.  
  7. typedef struct Buffer
  8. {
  9. int *data; // 指向数据数组的指针
  10. size_t size, count; // 元素数量的最大值和当前值
  11. size_t tip, tail; // tip = 下一个空点的索引
  12. mtx_t mtx; // 一个互斥
  13. cnd_t cndPut, cndGet; // 两个条件变量
  14. } Buffer;
  15.  
  16. bool bufInit( Buffer *bufPtr, size_t size );
  17. void bufDestroy(Buffer *bufPtr);
  18.  
  19. bool bufPut(Buffer *bufPtr, int data);
  20. bool bufGet(Buffer *bufPtr, int *dataPtr, int sec);
  21. /* -------------------------------------------------------------
  22. * buffer.c
  23. * 定义用于处理Buffer的函数
  24. */
  25. #include "buffer.h"
  26. #include <stdlib.h> // 为了使用malloc()和free()
  27. bool bufInit( Buffer *bufPtr, size_t size)
  28. {
  29. if ((bufPtr->data = malloc( size * sizeof(int))) == NULL)
  30. return false;
  31. bufPtr->size = size;
  32. bufPtr->count = 0;
  33. bufPtr->tip = bufPtr->tail = 0;
  34. return mtx_init( &bufPtr->mtx, mtx_plain) == thrd_success
  35. && cnd_init( &bufPtr->cndPut) == thrd_success
  36. && cnd_init( &bufPtr->cndGet) == thrd_success;
  37. }
  38.  
  39. void bufDestroy(Buffer *bufPtr)
  40. {
  41. cnd_destroy( &bufPtr->cndGet );
  42. cnd_destroy( &bufPtr->cndPut );
  43. mtx_destroy( &bufPtr->mtx );
  44. free( bufPtr->data );
  45. }
  46.  
  47. // 在缓冲区中插入一个新元素
  48. bool bufPut(Buffer *bufPtr, int data)
  49. {
  50. mtx_lock( &bufPtr->mtx );
  51.  
  52. while (bufPtr->count == bufPtr->size)
  53. if (cnd_wait( &bufPtr->cndPut, &bufPtr->mtx ) != thrd_success)
  54. return false;
  55.  
  56. bufPtr->data[bufPtr->tip] = data;
  57. bufPtr->tip = (bufPtr->tip + 1) % bufPtr->size;
  58. ++bufPtr->count;
  59.  
  60. mtx_unlock( &bufPtr->mtx );
  61. cnd_signal( &bufPtr->cndGet );
  62.  
  63. return true;
  64. }
  65.  
  66. // 从缓冲区中移除一个元素
  67. // 如果缓冲区是空的,则等待不超过sec秒
  68. bool bufGet(Buffer *bufPtr, int *dataPtr, int sec)
  69. {
  70. struct timespec ts;
  71. timespec_get( &ts, TIME_UTC ); // 当前时间
  72. ts.tv_sec += sec; // + sec秒延时
  73.  
  74. mtx_lock( &bufPtr->mtx );
  75. while ( bufPtr->count == 0 )
  76. if (cnd_timedwait(&bufPtr->cndGet,
  77. &bufPtr->mtx, &ts) != thrd_success)
  78. return false;
  79.  
  80. *dataPtr = bufPtr->data[bufPtr->tail];
  81. bufPtr->tail = (bufPtr->tail + 1) % bufPtr->size;
  82. --bufPtr->count;
  83.  
  84. mtx_unlock( &bufPtr->mtx );
  85. cnd_signal( &bufPtr->cndPut );
  86.  
  87. return true;
  88. }

例 2 中的 main()函数创建了一个缓冲区,并启动了若干个生产者和消费者线程,给予每个线程一个识别号码和一个指向缓冲区的指针。每个生产者线程创建一定数量的“产品”,然后用一个 return 语句退出。一个消费者线程如果在给定延时期间无法获得产品以进行消费,则直接返回。

【例2】启动生产者和消费者线程
  1. // producer_consumer.c
  2. #include "buffer.h"
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5.  
  6. #define NP 2 // 生产者的数量
  7. #define NC 3 // 消费者的数量
  8.  
  9. int producer(void *); // 线程函数
  10. int consumer(void *);
  11.  
  12. struct Arg { int id; Buffer *bufPtr; }; // 线程函数的参数
  13. _Noreturn void errorExit(const char* msg)
  14. {
  15. fprintf(stderr, "%s\n", msg); exit(0xff);
  16. }
  17.  
  18. int main(void)
  19. {
  20. printf("Producer-Consumer Demo\n\n");
  21. Buffer buf; // 为5个产品创建一个缓冲区
  22. bufInit( &buf, 5 );
  23.  
  24. thrd_t prod[NP], cons[NC]; // 线程
  25. struct Arg prodArg[NP], consArg[NC]; // 线程的参数
  26. int i = 0, res = 0;
  27.  
  28. for ( i = 0; i < NP; ++i ) // 启动生产者
  29. {
  30. prodArg[i].id = i+1, prodArg[i].bufPtr = &buf;
  31. if (thrd_create( &prod[i], producer, &prodArg[i] ) != thrd_success)
  32. errorExit("Thread error.");
  33. }
  34.  
  35. for ( i = 0; i < NC; ++i ) // 启动消费者
  36. {
  37. consArg[i].id = i+1, consArg[i].bufPtr = &buf;
  38. if ( thrd_create( &cons[i], consumer, &consArg[i] ) != thrd_success)
  39. errorExit("Thread error.");
  40. }
  41.  
  42. for ( i = 0; i < NP; ++i ) // 等待线程结束
  43. thrd_join(prod[i], &res),
  44. printf("\nProducer %d ended with result %d.\n", prodArg[i].id, res);
  45.  
  46. for ( i = 0; i < NC; ++i )
  47. thrd_join(cons[i], &res),
  48. printf("Consumer %d ended with result %d.\n", consArg[i].id, res);
  49. bufDestroy( &buf );
  50. return 0;
  51. }
  52.  
  53. int producer(void *arg) // 生产者线程函数
  54. {
  55. struct Arg *argPtr = (struct Arg *)arg;
  56. int id = argPtr->id;
  57. Buffer *bufPtr = argPtr->bufPtr;
  58. int count = 0;
  59. for (int i = 0; i < 10; ++i)
  60. {
  61. int data = 10*id + i;
  62. if (bufPut( bufPtr, data ))
  63. printf("Producer %d produced %d\n", id, data), ++count;
  64. else
  65. { fprintf( stderr,
  66. "Producer %d: error storing %d\n", id, data);
  67. return -id;
  68. }
  69. }
  70. return count;
  71. }
  72.  
  73. int consumer(void *arg) // 消费者线程函数
  74. {
  75. struct Arg *argPtr = (struct Arg *)arg;
  76. int id = argPtr->id;
  77. Buffer *bufPtr = argPtr->bufPtr;
  78. int count = 0;
  79. int data = 0;
  80. while (bufGet( bufPtr, &data, 2 ))
  81. {
  82. ++count;
  83. printf("Consumer %d consumed %d\n", id, data);
  84. }
  85. return count;
  86. }

相关文章