123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- /* tsiproxy mpeg transport stream proxy */
- #include <stdio.h>
- #include <stdlib.h>
- #include <stdarg.h>
- #include <unistd.h>
- #include <string.h>
- #include <signal.h>
- #include <netdb.h>
- #include <pthread.h>
- #include <arpa/inet.h>
- #include <netinet/in.h>
- #include <sys/socket.h>
- #include <sys/time.h>
- #include <sys/types.h>
-
- #include <fcntl.h>
- #include <errno.h>
-
- #include "libfuncs/libfuncs.h"
-
- #include "data.h"
- #include "conf.h"
- #include "proxy_common.h"
-
- extern LIST *restreamer;
- extern STATS allstats;
- extern CONFIG *config;
-
- extern char TS_NULL_FRAME[FRAME_PACKET_SIZE];
- extern int keep_running;
-
- #define MAX_ZERO_READS 3
-
- #define CONNECT_RETRIES 2
-
- /* Start: 3 seconds on connect */
- /* In connection: Max UDP timeout == 3 seconds (read) + 2 seconds (connect) == 5 seconds */
- #define UDP_READ_RETRIES 3
- #define UDP_READ_TIMEOUT 1000
-
- /* Start: 1/4 seconds on connect */
- /* In connection: Max TCP timeout == 5 seconds (read) + 2 seconds (connect) == 7 seconds */
- /* In connection: Max TCP timeout == 5 seconds (read) + 8 seconds (connect, host unrch) == 13 seconds */
- #define TCP_READ_RETRIES 5
- #define TCP_READ_TIMEOUT 1000
-
- /*
- Returns:
- 0 = synced ok
- 1 = not synced, reconnect
- */
- int mpeg_sync(int proxysock, char *channel, channel_source source_proto) {
- time_t sync_start = time(NULL);
- unsigned int sync_packets = 0;
- unsigned int read_bytes = 0;
- char syncframe[188];
-
- if (!config->ts_sync)
- return 0;
-
- int _timeout = TCP_READ_TIMEOUT;
- int _retries = TCP_READ_RETRIES;
- if (source_proto == udp_sock) {
- _timeout = UDP_READ_TIMEOUT;
- _retries = UDP_READ_RETRIES;
- }
- do {
- resync:
- if (!keep_running)
- return 1;
- if (fdread_ex(proxysock, syncframe, 1, _timeout, _retries, 1) != 1) {
- LOGf("DEBUG: mpeg_sync fdread() timeout | Channel: %s\n", channel);
- return 1; // reconnect
- }
- // LOGf("DEBUG: Read 0x%02x Offset %u Sync: %u\n", (uint8_t)syncframe[0], read_bytes, sync_packets);
- read_bytes++;
- if (syncframe[0] == 0x47) {
- ssize_t rdsz = fdread_ex(proxysock, syncframe, 188-1, _timeout, _retries, 1);
- if (rdsz != 188-1) {
- LOGf("DEBUG: mpeg_sync fdread() timeout | Channel: %s\n", channel);
- return 1; // reconnect
- }
- read_bytes += 188-1;
- if (++sync_packets == 7) // sync 7 packets
- break;
- goto resync;
- } else {
- sync_packets = 0;
- }
- if (read_bytes > FRAME_PACKET_SIZE) { // Can't sync in 1316 bytes
- LOGf("DEBUG: Can't sync after %d bytes | Channel: %s\n", FRAME_PACKET_SIZE, channel);
- return 1; // reconnect
- }
- if (sync_start+2 <= time(NULL)) { // Do not sync in two seconds
- LOGf("DEBUG: Timeout while syncing (read %u bytes) | Channel: %s\n", read_bytes, channel);
- return 1; // reconnect
- }
- } while (1);
- LOGf("SYNC : TS synced after %u bytes | Channel: %s\n", read_bytes-FRAME_PACKET_SIZE, channel);
- return 0;
- }
-
- void * proxy_ts_stream(void *self) {
- RESTREAMER *r = self;
- int retries = CONNECT_RETRIES;
- unsigned int pos = 0;
- int first_connect = 1;
-
- signal(SIGPIPE, SIG_IGN);
-
- #define BUFFER_SIZE (FRAME_PACKET_SIZE * BUF)
-
- char buff[BUFFER_SIZE];
- char *buff_end = buff + BUFFER_SIZE;
-
- int http_code = 0;
- while (retries--) {
- int result = connect_source(self, retries, BUFFER_SIZE, &http_code);
- if (result > 0) {
- if (!first_connect) {
- goto RECONNECT;
- } else {
- goto EXIT_THREAD;
- }
- }
- if (result < 0)
- goto EXIT_THREAD;
-
- switch (check_restreamer_state(r)) {
- case 1: goto RECONNECT; // r->reconnect is on
- case 2: goto QUIT; // r->dienow is on
- }
-
- channel_source sproto = get_sproto(r->channel->source);
-
- int mpgsync = mpeg_sync(r->sock, r->channel->name, sproto);
- if (mpgsync == 1 && first_connect) { // Timeout
- if (sproto == udp_sock) {
- goto RECONNECT;
- } else {
- goto EXIT_THREAD;
- }
- }
-
-
- char *p = buff;
- pos = 0;
- ssize_t readen = 0;
- int max_zero_reads = MAX_ZERO_READS;
- for (;;) {
- switch (check_restreamer_state(r)) {
- case 1: goto RECONNECT; // r->reconnect is on
- case 2: goto QUIT; // r->dienow is on
- }
-
- if (sproto == tcp_sock) {
- readen = fdread_ex(r->sock, p, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1);
- } else {
- int read_so_far = 0;
- int buffer_left;
- AGAIN:
- buffer_left = buff_end - (p + read_so_far);
- if (buffer_left >= FRAME_PACKET_SIZE) { // There is enough size for one frame in the buffer
- readen = fdread_ex(r->sock, p+read_so_far, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
- if (readen > 0) {
- read_so_far += readen;
- if (read_so_far < FRAME_PACKET_SIZE)
- goto AGAIN;
- readen = read_so_far;
- buffer_left = buff_end - (p + read_so_far);
- if (buffer_left < FRAME_PACKET_SIZE) {
- memcpy(p+read_so_far, TS_NULL_FRAME, buffer_left);
- }
- }
- } else { // Not enough buffer space, wait a bit so the buffer can wrap around
- if (buffer_left > 0) {
- //LOGf("no read : only %d bytes left to the end of buffer, pos:%d/%d\n", buffer_left, pos, BUF);
- readen = FRAME_PACKET_SIZE; // Oh, yeah we have that many bytes already read
- if (pos+1 == BUF) { // Buffer will wrap now
- //LOGf("Reached buffer end correcting last frame\n");
- p -= FRAME_PACKET_SIZE - buffer_left; // That last packet must be sended now
- } else {
- p -= readen; // Prevent p from moving further (it will be added to bellow)
- }
- }
- }
- }
- if (readen == -1) { // timeout
- if (first_connect || sproto == udp_sock)
- goto EXIT_THREAD;
- break;
- }
- if (readen == 0) { // ho, hum, wtf is going on here?
- if (--max_zero_reads == 0) {
- LOGf("RESTR: %d zero reads on srv_fd: %i | Channel: %s Source: %s\n", MAX_ZERO_READS, r->sock, r->channel->name, r->channel->source);
- break;
- }
- continue;
- }
-
- max_zero_reads = MAX_ZERO_READS;
- allstats.traffic_in += readen;
- r->traffic_in += readen;
-
- // Fill short frame with NULL packets
- if (readen < FRAME_PACKET_SIZE) {
- LOGf("DEBUG: Short read (%d) on retreamer srv_fd: %i | Channel: %s\n", (int)readen, r->sock, r->channel->name);
- memcpy(p+readen, TS_NULL_FRAME+readen, FRAME_PACKET_SIZE - readen);
- readen = FRAME_PACKET_SIZE;
- }
-
- process_new_clients(r, BUFFER_SIZE, 0, 0);
- pos++;
- p += readen;
- if (pos == BUF) {
- pos = 0;
- p=buff;
- }
-
- int num_clients = shout(r, buff, BUF, FRAME_PACKET_SIZE, pos*FRAME_PACKET_SIZE, "Content-Type: video/mpeg\n\n", NULL, 0);
- if (!num_clients)
- goto QUIT;
- first_connect = 0;
- retries = CONNECT_RETRIES;
- }
- LOGf("DEBUG: fdread timeout restreamer srv_fd: %i | Channel: %s\n", r->sock, r->channel->name);
- RECONNECT:
- if (!keep_running)
- goto QUIT;
- proxy_begin_reconnect(r);
- if (r->clients && r->clients->items)
- continue;
- QUIT:
- break;
- }
- EXIT_THREAD:
- proxy_close(r, http_code);
- return 0;
- }
|