/* tsiproxy stream proxy */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "libfuncs/libfuncs.h" #include "data.h" #include "conf.h" #include "proxy_common.h" extern char *server_sig; extern char *server_ver; extern regex_t http_response; extern LIST *restreamer, *chanconf; extern STATS allstats; extern CONFIG *config; extern int keep_running; extern void * proxy_ts_stream(void *channel_id); void process_client(int clientsock, CLIENT *client) { list_lock(chanconf); LNODE *l, *tmp; CHANNEL *chan = NULL; list_for_each(chanconf, l, tmp) { if (strcmp(client->chan, ((CHANNEL *)l->data)->name)==0) { chan = l->data; break; } } /* Check if requested channel exists? */ if (!chan) { LOGf("ERROR: No such channel | IP: %s Path: /%s Agent: %s\n", client->IP, client->chan, client->agent); send_404_not_found(client->fno); stop_client_shutdown(client); list_unlock(chanconf); return; } /* Is there an active restreamer for the requested channel */ list_lock(restreamer); RESTREAMER *r = NULL; list_for_each(restreamer, l, tmp) { if (strcmp(client->chan, ((RESTREAMER *)l->data)->name)==0) { r = l->data; break; } } if (r) { /* Active restreamer found. Add client to it's queue. Do it here while restreamer is locked otherwise there is a race! */ queue_add(r->queue, client); list_unlock(restreamer); list_unlock(chanconf); return; } list_unlock(restreamer); /* No active restreamer found, start new one */ RESTREAMER *nr = new_restreamer(client->chan, chan); queue_add(nr->queue, client); list_add(restreamer, nr); list_unlock(chanconf); pthread_t newthread; LOGf("SPAWN: Restreamer | Channel: %s Source: %s client_fd: %i\n", chan->name, chan->source, clientsock); if (pthread_create(&newthread, NULL, &proxy_ts_stream, nr)) { log_perror("Error TS proxy creating thread.", errno); exit(1); } pthread_detach(newthread); } static void remove_clients(RESTREAMER *r, const char *text, int error_code) { CLIENT *c; while ((c=queue_get_nowait(r->queue))) { LOGf("NOSIG: fd: %i srv_fd: %i {Op:%s} {Err:%d} | Client: %lu IP: %s Channel: %s Agent: %s\n", c->fno, r->sock, text, error_code, c->clientid, c->IP, c->chan, c->agent); // Headers are already sent or the client is using udp, just yank it if (c->headers_sent || c->client_port) { stop_client_shutdown(c); } else { char *code = "504 Gateway Timeout"; char *msg = "no-signal"; if (error_code >= 300) { send_http_response(c->fno, code); send_header_textplain(c->fno); fdputsf(c->fno, "X-ErrorCode: %d\nX-Error: %s\n\n%s\n", error_code, msg, msg); } else { send_http_error(c->fno, code, msg); } stop_client_shutdown(c); } } } static void remove_smart_clients(RESTREAMER *r) { CLIENT *c; LNODE *l, *tmp; list_for_each(r->clients, l, tmp) { c = l->data; if (!c->smart_client) continue; list_del(r->clients, &l); if (c->headers_sent || c->client_port) { stop_client_shutdown(c); } else { send_http_error(c->fno, "504 Gateway Timeout", "no-signal"); stop_client_shutdown(c); } } } void proxy_close(RESTREAMER *r, int error_code) { LOGf("CLOSE: Restreamer | Channel: %s Source: %s IP: %s srv_fd: %i Conn: %ld Serv: %ld TrafIn: %lld TrafOut: %lld\n", r->channel->name, r->channel->source, inet_ntoa(r->sockname.sin_addr), r->sock, r->connects, r->served, r->traffic_in, r->traffic_out); // If there are no clients left, no "Timeout" messages will be logged remove_clients(r, "Timeout", error_code); list_del_entry(restreamer, r); free_restreamer(r); } void proxy_begin_reconnect(RESTREAMER *r) { shutdown_fd(&(r->sock)); next_channel_source(r->channel); remove_smart_clients(r); } #define FATAL_ERROR(__no_signal_msg) do \ { \ remove_clients(r, __no_signal_msg, *http_code); \ free_chansrc(src); \ return -1; \ } while (0) /* On the last try, send no-signal to clients and exit otherwise wait a little bit before trying again */ #define DO_RECONNECT do \ { \ if (retries == 0) { \ FATAL_ERROR("Connect"); \ } else { \ free_chansrc(src); \ if (errno != EHOSTUNREACH) /* When host is unreachable there is already a delay of ~4 secs per try so no sleep is needed */ \ usleep(PROXY_RETRY_TIMEOUT * 1000); \ return 1; \ } \ } while(0) /* Returns: -1 = exit thread 1 = retry 0 = connected ok */ int connect_source(RESTREAMER *r, int retries, int readbuflen, int *http_code) { CHANSRC *src = init_chansrc(r->channel->source); if (!src) { LOGf("ERROR: Can't parse channel source | Channel: %s Source: %s\n", r->channel->name, r->channel->source); FATAL_ERROR("cant-parse-channel-source"); } r->connected = 0; r->reconnect = 0; int active = 1; int dret = async_resolve_host(src->host, src->port, &(r->sockname), DNS_RESOLVER_TIMEOUT, &active); if (dret != 0) { if (dret == 1) LOGf("ERROR: Can't resolve host | Channel: %s Source: %s Host: %s\n", r->channel->name, r->channel->source, src->host); if (dret == 2) LOGf("ERROR: DNS timeout | Channel: %s Source: %s Host: %s\n", r->channel->name, r->channel->source, src->host); DO_RECONNECT; } char buf[1024]; *http_code = 0; if (src->sproto == tcp_sock) { r->sock = socket(PF_INET, SOCK_STREAM, 0); if (r->sock < 0) { log_perror("connect_source(): Could not create SOCK_STREAM socket.", errno); FATAL_ERROR("cant-create-sock-stream-socket"); } LOGf("RCONN: Connecting | Channel: %s Source: %s IP: %s src_fd: %i TriesLeft: %i\n", r->channel->name, r->channel->source, inet_ntoa(r->sockname.sin_addr), r->sock, retries ); if (do_connect(r->sock, (struct sockaddr *)&(r->sockname), sizeof(r->sockname), PROXY_CONNECT_TIMEOUT) < 0) { LOGf("ERROR: Error connecting to %s srv_fd: %i err: %s\n", r->channel->source, r->sock, strerror(errno)); DO_RECONNECT; } snprintf(buf,sizeof(buf)-1, "GET /%s HTTP/1.0\nHost: %s:%u\nUser-Agent: %s %s (%s)\n\n", src->path, src->host, src->port, server_sig, server_ver, config->ident); buf[sizeof(buf)-1] = 0; allstats.traffic_out += safe_write(r->sock, buf, strlen(buf)); char xresponse[128]; memset(xresponse, 0, sizeof(xresponse)); memset(buf, 0, sizeof(buf)); regmatch_t res[4]; while (fdgetline(r->sock,buf,sizeof(buf)-1)) { if (buf[0] == '\n' || buf[0] == '\r') break; if (strstr(buf,"HTTP/1.") != NULL) { if (regexec(&http_response,buf,3,res,0) != REG_NOMATCH) { char codestr[4]; if ((unsigned int)res[1].rm_eo-res[1].rm_so < (unsigned)sizeof(xresponse)) { strncpy(xresponse, &buf[res[1].rm_so], res[1].rm_eo-res[1].rm_so); xresponse[res[1].rm_eo-res[1].rm_so] = '\0'; chomp(xresponse); strncpy(codestr, &buf[res[2].rm_so], res[2].rm_eo-res[2].rm_so); codestr[3] = 0; *http_code = atoi(codestr); } } } if (*http_code == 504) { // Extract extra error code if (strstr(buf, "X-ErrorCode: ") != NULL) { *http_code = atoi(buf+13); break; } } } if (*http_code == 0) { // No valid HTTP response, retry LOGf("DEBUG: Server returned not valid HTTP code | srv_fd: %i\n", r->sock); DO_RECONNECT; } if (*http_code == 504) { // No signal, exit LOGf("ERROR: Get no-signal for %s from %s on srv_fd: %i\n", r->channel->name, r->channel->source, r->sock); FATAL_ERROR("no-signal"); } if (*http_code > 300) { // Unhandled or error codes, exit LOGf("ERROR: Get code %i for %s from %s on srv_fd: %i exiting.\n", *http_code, r->channel->name, r->channel->source, r->sock); FATAL_ERROR("unhandled-http-code"); } // connected ok, continue } else { if (!IN_MULTICAST(ntohl(r->sockname.sin_addr.s_addr))) { LOGf("ERROR: %s is not multicast address\n", r->channel->source); FATAL_ERROR("source-is-not-multicast-address"); } struct ip_mreq mreq; struct sockaddr_in receiving_from; r->sock = socket(PF_INET, SOCK_DGRAM, 0); if (r->sock < 0) { log_perror("play(): Could not create SOCK_DGRAM socket.", errno); FATAL_ERROR("cant-create-sock-dgram-socket"); } LOGf("RCONN: Listening on multicast socket %s srv_fd: %i retries left: %i\n", r->channel->source, r->sock, retries); int on = 1; setsockopt(r->sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); // subscribe to multicast group memcpy(&mreq.imr_multiaddr, &(r->sockname.sin_addr), sizeof(struct in_addr)); mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(r->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { LOGf("ERROR: Failed to add IP membership on %s srv_fd: %i\n", r->channel->source, r->sock); FATAL_ERROR("failed-to-add-membership"); } // bind to the socket so data can be read memset(&receiving_from, 0, sizeof(receiving_from)); receiving_from.sin_family = AF_INET; receiving_from.sin_addr = r->sockname.sin_addr; receiving_from.sin_port = htons(src->port); if (bind(r->sock, (struct sockaddr *) &receiving_from, sizeof(receiving_from)) < 0) { LOGf("ERROR: Failed to bind to %s srv_fd: %i\n", r->channel->source, r->sock); FATAL_ERROR("failed-to-bind"); } } if (setsockopt(r->sock, SOL_SOCKET, SO_RCVBUF, (const char *)&readbuflen, sizeof(readbuflen)) < 0) log_perror("play(): setsockopt(SO_RCVBUF)", errno); r->connects++; r->connected = 1; free_chansrc(src); return 0; } int connect_udp_client(CLIENT *c) { int newsock = -1; int client_socket = c->fno; struct sockaddr_in sockname; struct in_addr client_ip; inet_aton(c->IP, &client_ip); sockname.sin_family = AF_INET; sockname.sin_port = htons(c->client_port); sockname.sin_addr = client_ip; // Create new udp socket to the client newsock = socket(PF_INET, SOCK_DGRAM, 0); if (newsock < 0) { log_perror("process_new_clients(): Could not create SOCK_DGRAM socket.", errno); stop_client_noshutdown(c); // ?? return -1; } // Connect to the client // Connect on udp socket never returns an error // The error will be returned on the first send/write do_connect(newsock, (struct sockaddr *)&sockname, sizeof(sockname), 1000); // Return pls playlist (vlc supports it) send_200_ok(c->fno); if (c->agent && strstr(c->agent,"VLC")==c->agent) { // VLC support pls playlists fdputsf(c->fno, "Content-type: application/pls+xml\n\n[playlist]\nFile1=%s://@:%u\nTitle1=LiveTV\n", "udp", c->client_port); // sleep a little bit so VLC can start listening to the udp port shutdown_fd(&client_socket); usleep(500000); } else { fdputsf(c->fno, "Content-type: text/plain\n\n%s://@:%u\n", "udp", c->client_port); shutdown_fd(&client_socket); } // Change client socket to the new udp socket c->fno = newsock; return newsock; } void process_new_clients(RESTREAMER *r, int bufsz, uint32_t start_pts, unsigned int pos) { CLIENT *c; while ((c=queue_get_nowait(r->queue))) { if (c->clientid) { int client_exists = 0; { LNODE *l, *tmp; CLIENT *n = NULL; list_lock(r->clients); list_for_each_reverse(r->clients, l, tmp) { if (c->clientid == ((CLIENT *)l->data)->clientid) { n = l->data; break; } } // if the same client is already watching the channel but from different IP // or the same client is already watching the channel on the same port if (n && (strcmp(n->IP,c->IP)!=0 || (n->client_port && n->client_port==c->client_port))) client_exists = 1; list_unlock(r->clients); } if (client_exists) { LOGf("2ND : fd: %i | Client: %lu IP: %s Channel: %s Agent: %s\n", c->fno, c->clientid, c->IP, c->chan, c->agent); send_400_bad_request(c->fno, "already-watching-the-channel"); stop_client_shutdown(c); continue; } } // Connect with udp to the client, closing incomming http req socket if (c->client_port && connect_udp_client(c) == -1) { continue; } if (setsockopt(c->fno, SOL_SOCKET, SO_SNDBUF, (const char *) &bufsz,sizeof(bufsz))<0) log_perror("play(): setsockopt(SO_SNDBUF)", errno); c->pos = pos; c->start_pts = start_pts; list_add(r->clients, c); } } int shout(RESTREAMER *r, char *buf, const size_t bufsz, const size_t framelen, unsigned int pos, const char *content_type, char *header, unsigned int header_len) { CLIENT *n; int num_clients=0, nfds=0; struct timeval tv; fd_set wfds,efds,rfds; time_t now = time(NULL); if (r->clients==NULL || r->clients->items == 0) return 0; FD_ZERO(&wfds); FD_ZERO(&rfds); FD_ZERO(&efds); LNODE *l, *tmp; list_for_each_reverse(r->clients, l, tmp) { n = l->data; /* EXPR: No traffic in XX seconds. Kill the client. X seconds have passed and 0 traffic send - kill */ if ((now-n->ts >= NO_TRAFFIC_SECONDS) || (n->traffic_out == 0 && now-n->start >= NO_TRAFFIC_SECONDS)) { n->start += NO_TRAFFIC_SECONDS; stop_client_shutdown_mark(n, 'x'); list_del(r->clients, &l); continue; } FD_SET(n->fno,&wfds); FD_SET(n->fno,&efds); FD_SET(n->fno,&rfds); if (n->fno>nfds) nfds=n->fno; num_clients++; if (!n->logged) { client_log_connect(n); r->served++; } } tv.tv_sec = 0; tv.tv_usec= 0; if (select(nfds+1,&rfds,&wfds,&efds,&tv)>0) { list_for_each_reverse(r->clients, l, tmp) { n = l->data; if ((FD_ISSET(n->fno,&rfds)) || (FD_ISSET(n->fno,&efds)) || // closed connection ((n->expire > 0) && (now > n->expire)) || // expired session n->dienow) // Forcefully stopped client { stop_client_shutdown(n); list_del(r->clients, &l); num_clients--; continue; } // OK, send him the stream if is ready if ((buf!=NULL)&&(FD_ISSET(n->fno,&wfds))){ // Sent headers to the client (http client) if (!n->client_port && !n->headers_sent) { n->headers_sent = 1; send_200_ok(n->fno); // Standart http client response fdputsf(n->fno, "%s", content_type); if (header != NULL && header_len > 0) { allstats.traffic_out += safe_write(n->fno, header, header_len); } } int i,j; // When in mpeg mode the first packet is always 1316 also the client is at current buffer position if (!n->data_send && framelen == FRAME_PACKET_SIZE) { i = framelen; n->data_send = 1; if (config->disable_burst) { if (pos >= FRAME_PACKET_SIZE) n->pos = pos - FRAME_PACKET_SIZE; else n->pos = pos; } } if (pos > n->pos) { i = pos - n->pos; } else { i = bufsz * framelen - n->pos; } // Sent it j = safe_write(n->fno, buf+n->pos, i); if (j >= 0) { /* 0 bytes written is not an error per se, but here we threat it special, because there are buggy clients that connect and start reading 0 bytes. If we do not set n->ts these clients will be disconnected when NO_TRAFFIC_SECONDS pass */ if (j > 0) { allstats.traffic_out += j; r->traffic_out += j; n->traffic_out += j; n->pos += j; n->ts = time(NULL); } if (n->pos == bufsz * framelen) n->pos = 0; // Send netmsg if (framelen == FRAME_PACKET_SIZE && n->netmsg[0]) { char *netmsg = get_netmsg_packet(n->netmsg); if (netmsg) { safe_write(n->fno, netmsg, FRAME_PACKET_SIZE); n->netmsg[0] = 0; FREE(netmsg); } } } else { stop_client_shutdown(n); list_del(r->clients, &l); num_clients--; } } } } return num_clients; } int check_restreamer_state(RESTREAMER *r) { if (!keep_running) return 2; if (r->dienow) { LOGf("RESTR: Forced disconnect on srv_fd: %i | Channel: %s Source: %s\n", r->sock, r->channel->name, r->channel->source); return 2; } if (r->reconnect) { LOGf("RESTR: Forced reconnect on srv_fd: %i | Channel: %s Source: %s\n", r->sock, r->channel->name, r->channel->source); return 1; } return 0; }