No Description
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

proxy_ts.c 6.6KB


  1. /* tsiproxy mpeg transport stream proxy */
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <stdarg.h>
  5. #include <unistd.h>
  6. #include <string.h>
  7. #include <signal.h>
  8. #include <netdb.h>
  9. #include <pthread.h>
  10. #include <arpa/inet.h>
  11. #include <netinet/in.h>
  12. #include <sys/socket.h>
  13. #include <sys/time.h>
  14. #include <sys/types.h>
  15. #include <fcntl.h>
  16. #include <errno.h>
  17. #include "libfuncs/libfuncs.h"
  18. #include "data.h"
  19. #include "conf.h"
  20. #include "proxy_common.h"
  21. extern LIST *restreamer;
  22. extern STATS allstats;
  23. extern CONFIG *config;
  24. extern char TS_NULL_FRAME[FRAME_PACKET_SIZE];
  25. extern int keep_running;
  26. #define MAX_ZERO_READS 3
  27. #define CONNECT_RETRIES 2
  28. /* Start: 3 seconds on connect */
  29. /* In connection: Max UDP timeout == 3 seconds (read) + 2 seconds (connect) == 5 seconds */
  30. #define UDP_READ_RETRIES 3
  31. #define UDP_READ_TIMEOUT 1000
  32. /* Start: 1/4 seconds on connect */
  33. /* In connection: Max TCP timeout == 5 seconds (read) + 2 seconds (connect) == 7 seconds */
  34. /* In connection: Max TCP timeout == 5 seconds (read) + 8 seconds (connect, host unrch) == 13 seconds */
  35. #define TCP_READ_RETRIES 5
  36. #define TCP_READ_TIMEOUT 1000
  37. /*
  38. Returns:
  39. 0 = synced ok
  40. 1 = not synced, reconnect
  41. */
  42. int mpeg_sync(int proxysock, char *channel, channel_source source_proto) {
  43. time_t sync_start = time(NULL);
  44. unsigned int sync_packets = 0;
  45. unsigned int read_bytes = 0;
  46. char syncframe[188];
  47. if (!config->ts_sync)
  48. return 0;
  49. int _timeout = TCP_READ_TIMEOUT;
  50. int _retries = TCP_READ_RETRIES;
  51. if (source_proto == udp_sock) {
  52. _timeout = UDP_READ_TIMEOUT;
  53. _retries = UDP_READ_RETRIES;
  54. }
  55. do {
  56. resync:
  57. if (!keep_running)
  58. return 1;
  59. if (fdread_ex(proxysock, syncframe, 1, _timeout, _retries, 1) != 1) {
  60. LOGf("DEBUG: mpeg_sync fdread() timeout | Channel: %s\n", channel);
  61. return 1; // reconnect
  62. }
  63. // LOGf("DEBUG: Read 0x%02x Offset %u Sync: %u\n", (uint8_t)syncframe[0], read_bytes, sync_packets);
  64. read_bytes++;
  65. if (syncframe[0] == 0x47) {
  66. ssize_t rdsz = fdread_ex(proxysock, syncframe, 188-1, _timeout, _retries, 1);
  67. if (rdsz != 188-1) {
  68. LOGf("DEBUG: mpeg_sync fdread() timeout | Channel: %s\n", channel);
  69. return 1; // reconnect
  70. }
  71. read_bytes += 188-1;
  72. if (++sync_packets == 7) // sync 7 packets
  73. break;
  74. goto resync;
  75. } else {
  76. sync_packets = 0;
  77. }
  78. if (read_bytes > FRAME_PACKET_SIZE) { // Can't sync in 1316 bytes
  79. LOGf("DEBUG: Can't sync after %d bytes | Channel: %s\n", FRAME_PACKET_SIZE, channel);
  80. return 1; // reconnect
  81. }
  82. if (sync_start+2 <= time(NULL)) { // Do not sync in two seconds
  83. LOGf("DEBUG: Timeout while syncing (read %u bytes) | Channel: %s\n", read_bytes, channel);
  84. return 1; // reconnect
  85. }
  86. } while (1);
  87. LOGf("SYNC : TS synced after %u bytes | Channel: %s\n", read_bytes-FRAME_PACKET_SIZE, channel);
  88. return 0;
  89. }
  90. void * proxy_ts_stream(void *self) {
  91. RESTREAMER *r = self;
  92. int retries = CONNECT_RETRIES;
  93. unsigned int pos = 0;
  94. int first_connect = 1;
  95. signal(SIGPIPE, SIG_IGN);
  96. #define BUFFER_SIZE (FRAME_PACKET_SIZE * BUF)
  97. char buff[BUFFER_SIZE];
  98. char *buff_end = buff + BUFFER_SIZE;
  99. int http_code = 0;
  100. while (retries--) {
  101. int result = connect_source(self, retries, BUFFER_SIZE, &http_code);
  102. if (result > 0) {
  103. if (!first_connect) {
  104. goto RECONNECT;
  105. } else {
  106. goto EXIT_THREAD;
  107. }
  108. }
  109. if (result < 0)
  110. goto EXIT_THREAD;
  111. switch (check_restreamer_state(r)) {
  112. case 1: goto RECONNECT; // r->reconnect is on
  113. case 2: goto QUIT; // r->dienow is on
  114. }
  115. channel_source sproto = get_sproto(r->channel->source);
  116. int mpgsync = mpeg_sync(r->sock, r->channel->name, sproto);
  117. if (mpgsync == 1 && first_connect) { // Timeout
  118. if (sproto == udp_sock) {
  119. goto RECONNECT;
  120. } else {
  121. goto EXIT_THREAD;
  122. }
  123. }
  124. char *p = buff;
  125. pos = 0;
  126. ssize_t readen = 0;
  127. int max_zero_reads = MAX_ZERO_READS;
  128. for (;;) {
  129. switch (check_restreamer_state(r)) {
  130. case 1: goto RECONNECT; // r->reconnect is on
  131. case 2: goto QUIT; // r->dienow is on
  132. }
  133. if (sproto == tcp_sock) {
  134. readen = fdread_ex(r->sock, p, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1);
  135. } else {
  136. int read_so_far = 0;
  137. int buffer_left;
  138. AGAIN:
  139. buffer_left = buff_end - (p + read_so_far);
  140. if (buffer_left >= FRAME_PACKET_SIZE) { // There is enough size for one frame in the buffer
  141. readen = fdread_ex(r->sock, p+read_so_far, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
  142. if (readen > 0) {
  143. read_so_far += readen;
  144. if (read_so_far < FRAME_PACKET_SIZE)
  145. goto AGAIN;
  146. readen = read_so_far;
  147. buffer_left = buff_end - (p + read_so_far);
  148. if (buffer_left < FRAME_PACKET_SIZE) {
  149. memcpy(p+read_so_far, TS_NULL_FRAME, buffer_left);
  150. }
  151. }
  152. } else { // Not enough buffer space, wait a bit so the buffer can wrap around
  153. if (buffer_left > 0) {
  154. //LOGf("no read : only %d bytes left to the end of buffer, pos:%d/%d\n", buffer_left, pos, BUF);
  155. readen = FRAME_PACKET_SIZE; // Oh, yeah we have that many bytes already read
  156. if (pos+1 == BUF) { // Buffer will wrap now
  157. //LOGf("Reached buffer end correcting last frame\n");
  158. p -= FRAME_PACKET_SIZE - buffer_left; // That last packet must be sended now
  159. } else {
  160. p -= readen; // Prevent p from moving further (it will be added to bellow)
  161. }
  162. }
  163. }
  164. }
  165. if (readen == -1) { // timeout
  166. if (first_connect || sproto == udp_sock)
  167. goto EXIT_THREAD;
  168. break;
  169. }
  170. if (readen == 0) { // ho, hum, wtf is going on here?
  171. if (--max_zero_reads == 0) {
  172. LOGf("RESTR: %d zero reads on srv_fd: %i | Channel: %s Source: %s\n", MAX_ZERO_READS, r->sock, r->channel->name, r->channel->source);
  173. break;
  174. }
  175. continue;
  176. }
  177. max_zero_reads = MAX_ZERO_READS;
  178. allstats.traffic_in += readen;
  179. r->traffic_in += readen;
  180. // Fill short frame with NULL packets
  181. if (readen < FRAME_PACKET_SIZE) {
  182. LOGf("DEBUG: Short read (%d) on retreamer srv_fd: %i | Channel: %s\n", (int)readen, r->sock, r->channel->name);
  183. memcpy(p+readen, TS_NULL_FRAME+readen, FRAME_PACKET_SIZE - readen);
  184. readen = FRAME_PACKET_SIZE;
  185. }
  186. process_new_clients(r, BUFFER_SIZE, 0, 0);
  187. pos++;
  188. p += readen;
  189. if (pos == BUF) {
  190. pos = 0;
  191. p=buff;
  192. }
  193. int num_clients = shout(r, buff, BUF, FRAME_PACKET_SIZE, pos*FRAME_PACKET_SIZE, "Content-Type: video/mpeg\n\n", NULL, 0);
  194. if (!num_clients)
  195. goto QUIT;
  196. first_connect = 0;
  197. retries = CONNECT_RETRIES;
  198. }
  199. LOGf("DEBUG: fdread timeout restreamer srv_fd: %i | Channel: %s\n", r->sock, r->channel->name);
  200. RECONNECT:
  201. if (!keep_running)
  202. goto QUIT;
  203. proxy_begin_reconnect(r);
  204. if (r->clients && r->clients->items)
  205. continue;
  206. QUIT:
  207. break;
  208. }
  209. EXIT_THREAD:
  210. proxy_close(r, http_code);
  211. return 0;
  212. }