/* tsiproxy mpeg transport stream proxy */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "libfuncs/libfuncs.h" #include "data.h" #include "conf.h" #include "proxy_common.h" extern LIST *restreamer; extern STATS allstats; extern CONFIG *config; extern char TS_NULL_FRAME[FRAME_PACKET_SIZE]; extern int keep_running; #define MAX_ZERO_READS 3 #define CONNECT_RETRIES 2 /* Start: 3 seconds on connect */ /* In connection: Max UDP timeout == 3 seconds (read) + 2 seconds (connect) == 5 seconds */ #define UDP_READ_RETRIES 3 #define UDP_READ_TIMEOUT 1000 /* Start: 1/4 seconds on connect */ /* In connection: Max TCP timeout == 5 seconds (read) + 2 seconds (connect) == 7 seconds */ /* In connection: Max TCP timeout == 5 seconds (read) + 8 seconds (connect, host unrch) == 13 seconds */ #define TCP_READ_RETRIES 5 #define TCP_READ_TIMEOUT 1000 /* Returns: 0 = synced ok 1 = not synced, reconnect */ int mpeg_sync(int proxysock, char *channel, channel_source source_proto) { time_t sync_start = time(NULL); unsigned int sync_packets = 0; unsigned int read_bytes = 0; char syncframe[188]; if (!config->ts_sync) return 0; int _timeout = TCP_READ_TIMEOUT; int _retries = TCP_READ_RETRIES; if (source_proto == udp_sock) { _timeout = UDP_READ_TIMEOUT; _retries = UDP_READ_RETRIES; } do { resync: if (!keep_running) return 1; if (fdread_ex(proxysock, syncframe, 1, _timeout, _retries, 1) != 1) { LOGf("DEBUG: mpeg_sync fdread() timeout | Channel: %s\n", channel); return 1; // reconnect } // LOGf("DEBUG: Read 0x%02x Offset %u Sync: %u\n", (uint8_t)syncframe[0], read_bytes, sync_packets); read_bytes++; if (syncframe[0] == 0x47) { ssize_t rdsz = fdread_ex(proxysock, syncframe, 188-1, _timeout, _retries, 1); if (rdsz != 188-1) { LOGf("DEBUG: mpeg_sync fdread() timeout | Channel: %s\n", channel); return 1; // reconnect } read_bytes += 188-1; if (++sync_packets == 7) // sync 7 packets break; goto resync; } else { sync_packets = 0; } if (read_bytes > FRAME_PACKET_SIZE) { // Can't sync in 1316 bytes LOGf("DEBUG: Can't sync after %d bytes | Channel: %s\n", FRAME_PACKET_SIZE, channel); return 1; // reconnect } if (sync_start+2 <= time(NULL)) { // Do not sync in two seconds LOGf("DEBUG: Timeout while syncing (read %u bytes) | Channel: %s\n", read_bytes, channel); return 1; // reconnect } } while (1); LOGf("SYNC : TS synced after %u bytes | Channel: %s\n", read_bytes-FRAME_PACKET_SIZE, channel); return 0; } void * proxy_ts_stream(void *self) { RESTREAMER *r = self; int retries = CONNECT_RETRIES; unsigned int pos = 0; int first_connect = 1; signal(SIGPIPE, SIG_IGN); #define BUFFER_SIZE (FRAME_PACKET_SIZE * BUF) char buff[BUFFER_SIZE]; char *buff_end = buff + BUFFER_SIZE; int http_code = 0; while (retries--) { int result = connect_source(self, retries, BUFFER_SIZE, &http_code); if (result > 0) { if (!first_connect) { goto RECONNECT; } else { goto EXIT_THREAD; } } if (result < 0) goto EXIT_THREAD; switch (check_restreamer_state(r)) { case 1: goto RECONNECT; // r->reconnect is on case 2: goto QUIT; // r->dienow is on } channel_source sproto = get_sproto(r->channel->source); int mpgsync = mpeg_sync(r->sock, r->channel->name, sproto); if (mpgsync == 1 && first_connect) { // Timeout if (sproto == udp_sock) { goto RECONNECT; } else { goto EXIT_THREAD; } } char *p = buff; pos = 0; ssize_t readen = 0; int max_zero_reads = MAX_ZERO_READS; for (;;) { switch (check_restreamer_state(r)) { case 1: goto RECONNECT; // r->reconnect is on case 2: goto QUIT; // r->dienow is on } if (sproto == tcp_sock) { readen = fdread_ex(r->sock, p, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1); } else { int read_so_far = 0; int buffer_left; AGAIN: buffer_left = buff_end - (p + read_so_far); if (buffer_left >= FRAME_PACKET_SIZE) { // There is enough size for one frame in the buffer readen = fdread_ex(r->sock, p+read_so_far, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0); if (readen > 0) { read_so_far += readen; if (read_so_far < FRAME_PACKET_SIZE) goto AGAIN; readen = read_so_far; buffer_left = buff_end - (p + read_so_far); if (buffer_left < FRAME_PACKET_SIZE) { memcpy(p+read_so_far, TS_NULL_FRAME, buffer_left); } } } else { // Not enough buffer space, wait a bit so the buffer can wrap around if (buffer_left > 0) { //LOGf("no read : only %d bytes left to the end of buffer, pos:%d/%d\n", buffer_left, pos, BUF); readen = FRAME_PACKET_SIZE; // Oh, yeah we have that many bytes already read if (pos+1 == BUF) { // Buffer will wrap now //LOGf("Reached buffer end correcting last frame\n"); p -= FRAME_PACKET_SIZE - buffer_left; // That last packet must be sended now } else { p -= readen; // Prevent p from moving further (it will be added to bellow) } } } } if (readen == -1) { // timeout if (first_connect || sproto == udp_sock) goto EXIT_THREAD; break; } if (readen == 0) { // ho, hum, wtf is going on here? if (--max_zero_reads == 0) { LOGf("RESTR: %d zero reads on srv_fd: %i | Channel: %s Source: %s\n", MAX_ZERO_READS, r->sock, r->channel->name, r->channel->source); break; } continue; } max_zero_reads = MAX_ZERO_READS; allstats.traffic_in += readen; r->traffic_in += readen; // Fill short frame with NULL packets if (readen < FRAME_PACKET_SIZE) { LOGf("DEBUG: Short read (%d) on retreamer srv_fd: %i | Channel: %s\n", (int)readen, r->sock, r->channel->name); memcpy(p+readen, TS_NULL_FRAME+readen, FRAME_PACKET_SIZE - readen); readen = FRAME_PACKET_SIZE; } process_new_clients(r, BUFFER_SIZE, 0, 0); pos++; p += readen; if (pos == BUF) { pos = 0; p=buff; } int num_clients = shout(r, buff, BUF, FRAME_PACKET_SIZE, pos*FRAME_PACKET_SIZE, "Content-Type: video/mpeg\n\n", NULL, 0); if (!num_clients) goto QUIT; first_connect = 0; retries = CONNECT_RETRIES; } LOGf("DEBUG: fdread timeout restreamer srv_fd: %i | Channel: %s\n", r->sock, r->channel->name); RECONNECT: if (!keep_running) goto QUIT; proxy_begin_reconnect(r); if (r->clients && r->clients->items) continue; QUIT: break; } EXIT_THREAD: proxy_close(r, http_code); return 0; }