libfuncs is collection of code (list, queue, circular buffer, io, logging, etc.). https://georgi.unixsol.org/programs/libfuncs/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue.c 2.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. /*
  2. * IPTV.bg Media Proxy
  3. * Request queue handling
  4. *
  5. * Copyright (C) 2006 Unix Solutions Ltd.
  6. * Written by Luben Karavelov (luben@unixsol.org)
  7. *
  8. */
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <assert.h>
  12. #include <errno.h>
  13. #include <err.h>
  14. #include <pthread.h>
  15. #include "libfuncs.h"
  16. #include "queue.h"
  17. QUEUE *queue_new() {
  18. pthread_mutex_t *mutex = malloc(sizeof(pthread_mutex_t));
  19. if (pthread_mutex_init(mutex,NULL) != 0) {
  20. perror("queue_new: mutex_init");
  21. return NULL;
  22. }
  23. pthread_cond_t *cond = malloc(sizeof(pthread_cond_t));
  24. if (pthread_cond_init(cond,NULL)!=0){
  25. perror("queue_new: cond_init");
  26. return NULL;
  27. }
  28. QUEUE *q = calloc(1, sizeof(QUEUE));
  29. if (!q)
  30. return NULL;
  31. // initialize queue
  32. q->mutex = mutex;
  33. q->cond = cond;
  34. return q;
  35. }
  36. void queue_free(QUEUE **pq) {
  37. QUEUE *q = *pq;
  38. void *data;
  39. if (!q)
  40. return;
  41. while (q->items > 0) {
  42. data = queue_get(q);
  43. }
  44. pthread_mutex_destroy(q->mutex);
  45. FREE(q->mutex);
  46. pthread_cond_destroy(q->cond);
  47. FREE(q->cond);
  48. FREE(*pq);
  49. }
  50. void queue_add(QUEUE *q, void *data) {
  51. if (!q)
  52. return;
  53. QNODE *a_msg = malloc(sizeof(QNODE));
  54. if (!a_msg) {
  55. perror("queue_enqueue, malloc:");
  56. return;
  57. }
  58. a_msg->data = data;
  59. a_msg->next = NULL;
  60. pthread_mutex_lock(q->mutex);
  61. if (q->items == 0) { // special case - queue is empty
  62. q->head = a_msg;
  63. q->tail = a_msg;
  64. } else {
  65. q->tail->next = a_msg;
  66. q->tail = a_msg;
  67. }
  68. q->items++;
  69. pthread_cond_signal(q->cond);
  70. pthread_mutex_unlock(q->mutex);
  71. }
  72. void *queue_get(QUEUE *q) {
  73. if (!q)
  74. return NULL;
  75. pthread_mutex_lock(q->mutex);
  76. while (q->items==0) {
  77. pthread_cond_wait(q->cond, q->mutex);
  78. if (q->items == 0)
  79. return NULL;
  80. }
  81. if (!q || !q->head || q->items == 0) // Can happen in free
  82. return NULL;
  83. QNODE *a_msg = q->head;
  84. q->head = a_msg->next;
  85. if (q->head == NULL) { // this was last msg
  86. q->tail = NULL;
  87. }
  88. q->items--;
  89. pthread_cond_signal(q->cond);
  90. pthread_mutex_unlock(q->mutex);
  91. void *data = a_msg->data;
  92. FREE(a_msg);
  93. return data;
  94. }
  95. void *queue_get_nowait(QUEUE* q) {
  96. if (!q || q->items == 0)
  97. return NULL;
  98. return queue_get(q);
  99. }
  100. void queue_wakeup(QUEUE *q) {
  101. if (q) {
  102. pthread_cond_signal(q->cond);
  103. }
  104. }