mptsd reads MPEG-TS streams from UDP/multicast or HTTP and combines them into one multiple program stream that is suitable for outputting to a DVB-C modulator. Tested with Dektec DTE-3114 Quad QAM Modulator and used in production in small DVB-C networks. https://georgi.unixsol.org/programs/mptsd/

network.c 10.0KB


  1. /*
  2. * mptsd network routines
  3. * Copyright (C) 2010-2011 Unix Solutions Ltd.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License version 2
  7. * as published by the Free Software Foundation.
  8. *
  9. * This program is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with this program; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
  17. */
  18. #include <stdio.h>
  19. #include <stdlib.h>
  20. #include <unistd.h>
  21. #include <string.h>
  22. #include <pthread.h>
  23. #include <errno.h>
  24. #include <arpa/inet.h>
  25. #include <netinet/in.h>
  26. #include <regex.h>
  27. #include "libfuncs/io.h"
  28. #include "libfuncs/log.h"
  29. #include "libfuncs/list.h"
  30. #include "libfuncs/asyncdns.h"
  31. #include "libtsfuncs/tsfuncs.h"
  32. #include "data.h"
  33. #include "config.h"
  34. extern char *server_sig;
  35. extern char *server_ver;
  36. extern CONFIG *config;
  37. int connect_udp(struct sockaddr_in send_to) {
  38. int sendsock = socket(AF_INET, SOCK_DGRAM, 0);
  39. if (sendsock < 0) {
  40. LOGf("socket(SOCK_DGRAM): %s\n", strerror(errno));
  41. return -1;
  42. }
  43. int on = 1;
  44. setsockopt(sendsock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
  45. // subscribe to multicast group
  46. // LOGf("Using ttl %d\n", multicast_ttl);
  47. if (IN_MULTICAST(ntohl(send_to.sin_addr.s_addr))) {
  48. if (setsockopt(sendsock, IPPROTO_IP, IP_MULTICAST_TTL, &config->multicast_ttl, sizeof(config->multicast_ttl)) < 0) {
  49. LOGf("setsockopt(IP_MUTICAST_TTL): %s\n", strerror(errno));
  50. close(sendsock);
  51. return -1;
  52. }
  53. if (setsockopt(sendsock, IPPROTO_IP, IP_MULTICAST_IF, &config->output_intf, sizeof(config->output_intf)) < 0) {
  54. LOGf("setsockopt(IP_MUTICAST_IF, %s): %s\n", strerror(errno), inet_ntoa(config->output_intf));
  55. close(sendsock);
  56. return -1;
  57. }
  58. }
  59. int writebuflen = 1316 * 100;
  60. if (setsockopt(sendsock, SOL_SOCKET, SO_SNDBUF, (const char *)&writebuflen, sizeof(writebuflen)) < 0)
  61. log_perror("play(): setsockopt(SO_SNDBUF)", errno);
  62. // call connect to get errors
  63. if (connect(sendsock, (struct sockaddr *)&send_to, sizeof send_to)) {
  64. LOGf("udp_connect() error: %s\n", strerror(errno));
  65. close(sendsock);
  66. return -1;
  67. }
  68. return sendsock;
  69. }
  70. void connect_output(OUTPUT *o) {
  71. struct sockaddr_in sock;
  72. sock.sin_family = AF_INET;
  73. sock.sin_port = htons(o->out_port);
  74. sock.sin_addr = o->out_host;
  75. o->out_sock = connect_udp(sock);
  76. if (o->out_sock > -1) {
  77. //LOGf("OUTPUT: Connected out_fd: %i | Output: udp://%s:%d\n", o->out_sock, inet_ntoa(o->out_host), o->out_port);
  78. } else {
  79. LOGf("ERROR: Can't connect output | Output: udp://%s:%d\n", inet_ntoa(o->out_host), o->out_port);
  80. exit(1);
  81. }
  82. }
  /*
   * Error-exit helpers for connect_source(). Both expand to code that uses
   * that function's variables (src, retries, r) and return from it.
   *
   * DO_RECONNECT: free src, then return -1 when no retries are left.
   *               Otherwise call usleep(PROXY_RETRY_TIMEOUT * 1000) unless
   *               errno is EHOSTUNREACH, and return -1 if r->dienow is set
   *               or 1 (retry) if it is not.
   * FATAL_ERROR : free src and return -1.
   */
  #define DO_RECONNECT do { \
      chansrc_free(&src); \
      if (retries == 0) \
          return -1; \
      /* When host is unreachable there is already a delay of ~4 secs per try so no sleep is needed */ \
      if (errno != EHOSTUNREACH) \
          usleep(PROXY_RETRY_TIMEOUT * 1000); \
      return r->dienow ? -1 : 1; \
  } while (0)

  #define FATAL_ERROR do { \
      chansrc_free(&src); \
      return -1; \
  } while (0)
  105. /*
  106. Returns:
  107. -1 = exit thread
  108. 1 = retry
  109. 0 = connected ok
  110. */
  111. int connect_source(INPUT *r, int retries, int readbuflen, int *http_code) {
  112. CHANSRC *src = chansrc_init(r->channel->source);
  113. if (!src) {
  114. LOGf("ERR : Can't parse channel source | Channel: %s Source: %s\n", r->channel->name, r->channel->source);
  115. FATAL_ERROR;
  116. }
  117. r->connected = 0;
  118. r->reconnect = 0;
  119. int active = 1;
  120. int dret = async_resolve_host(src->host, src->port, &(r->src_sockname), 5000, &active);
  121. if (dret != 0) {
  122. if (dret == 1)
  123. proxy_log(r, "Can't resolve host");
  124. if (dret == 2)
  125. proxy_log(r, "Timeout resolving host");
  126. DO_RECONNECT;
  127. }
  128. proxy_log(r, "Connecting");
  129. char buf[1024];
  130. *http_code = 0;
  131. if (src->sproto == tcp_sock) {
  132. r->sock = socket(PF_INET, SOCK_STREAM, 0);
  133. if (r->sock < 0) {
  134. log_perror("play(): Could not create SOCK_STREAM socket.", errno);
  135. FATAL_ERROR;
  136. }
  137. //proxy_log(r, "Add");
  138. if (do_connect(r->sock, (struct sockaddr *)&(r->src_sockname), sizeof(r->src_sockname), PROXY_CONNECT_TIMEOUT) < 0) {
  139. LOGf("ERR : Error connecting to %s srv_fd: %i err: %s\n", r->channel->source, r->sock, strerror(errno));
  140. DO_RECONNECT;
  141. }
  142. snprintf(buf,sizeof(buf)-1, "GET /%s HTTP/1.0\nHost: %s:%u\nX-Smart-Client: yes\nUser-Agent: %s %s (%s)\n\n",
  143. src->path, src->host, src->port, server_sig, server_ver, config->ident);
  144. buf[sizeof(buf)-1] = 0;
  145. fdwrite(r->sock, buf, strlen(buf));
  146. char xresponse[128];
  147. memset(xresponse, 0, sizeof(xresponse));
  148. memset(buf, 0, sizeof(buf));
  149. regmatch_t res[4];
  150. while (fdgetline(r->sock,buf,sizeof(buf)-1)) {
  151. if (buf[0] == '\n' || buf[0] == '\r')
  152. break;
  153. if (strstr(buf,"HTTP/1.") != NULL) {
  154. regex_t http_response;
  155. regcomp(&http_response, "^HTTP/1.[0-1] (([0-9]{3}) .*)", REG_EXTENDED);
  156. if (regexec(&http_response,buf,3,res,0) != REG_NOMATCH) {
  157. char codestr[4];
  158. if ((unsigned int)res[1].rm_eo-res[1].rm_so < sizeof(xresponse)) {
  159. strncpy(xresponse, &buf[res[1].rm_so], res[1].rm_eo-res[1].rm_so);
  160. xresponse[res[1].rm_eo-res[1].rm_so] = '\0';
  161. chomp(xresponse);
  162. strncpy(codestr, &buf[res[2].rm_so], res[2].rm_eo-res[2].rm_so);
  163. codestr[3] = 0;
  164. *http_code = atoi(codestr);
  165. }
  166. }
  167. regfree(&http_response);
  168. }
  169. if (*http_code == 504) { // Extract extra error code
  170. if (strstr(buf, "X-ErrorCode: ") != NULL) {
  171. *http_code = atoi(buf+13);
  172. break;
  173. }
  174. }
  175. }
  176. if (*http_code == 0) { // No valid HTTP response, retry
  177. LOGf("DEBUG: Server returned not valid HTTP code | srv_fd: %i\n", r->sock);
  178. DO_RECONNECT;
  179. }
  180. if (*http_code == 504) { // No signal, exit
  181. LOGf("ERR : Get no-signal for %s from %s on srv_fd: %i\n", r->channel->name, r->channel->source, r->sock);
  182. FATAL_ERROR;
  183. }
  184. if (*http_code > 300) { // Unhandled or error codes, exit
  185. LOGf("ERR : Get code %i for %s from %s on srv_fd: %i exiting.\n", *http_code, r->channel->name, r->channel->source, r->sock);
  186. FATAL_ERROR;
  187. }
  188. // connected ok, continue
  189. } else {
  190. char multicast = IN_MULTICAST(ntohl(r->src_sockname.sin_addr.s_addr));
  191. //if (!IN_MULTICAST(ntohl(r->src_sockname.sin_addr.s_addr))) {
  192. // LOGf("ERR : %s is not multicast address\n", r->channel->source);
  193. // FATAL_ERROR;
  194. //}
  195. struct ip_mreq mreq;
  196. struct sockaddr_in receiving_from;
  197. r->sock = socket(PF_INET, SOCK_DGRAM, 0);
  198. if (r->sock < 0) {
  199. log_perror("play(): Could not create SOCK_DGRAM socket.", errno);
  200. FATAL_ERROR;
  201. }
  202. // LOGf("CONN : Listening on multicast socket %s srv_fd: %i retries left: %i\n", r->channel->source, r->sock, retries);
  203. int on = 1;
  204. setsockopt(r->sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
  205. if (multicast) {
  206. // subscribe to multicast group
  207. memcpy(&mreq.imr_multiaddr, &(r->src_sockname.sin_addr), sizeof(struct in_addr));
  208. mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  209. if (setsockopt(r->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
  210. LOGf("ERR : Failed to add IP membership on %s srv_fd: %i\n", r->channel->source, r->sock);
  211. FATAL_ERROR;
  212. }
  213. }
  214. // bind to the socket so data can be read
  215. memset(&receiving_from, 0, sizeof(receiving_from));
  216. receiving_from.sin_family = AF_INET;
  217. receiving_from.sin_addr = r->src_sockname.sin_addr;
  218. receiving_from.sin_port = htons(src->port);
  219. if (bind(r->sock, (struct sockaddr *) &receiving_from, sizeof(receiving_from)) < 0) {
  220. LOGf("ERR : Failed to bind to %s srv_fd: %i\n", r->channel->source, r->sock);
  221. FATAL_ERROR;
  222. }
  223. }
  224. if (setsockopt(r->sock, SOL_SOCKET, SO_RCVBUF, (const char *)&readbuflen, sizeof(readbuflen)) < 0)
  225. log_perror("play(): setsockopt(SO_RCVBUF)", errno);
  226. r->connected = 1;
  227. // proxy_log(r, "Connected");
  228. chansrc_free(&src);
  229. return 0;
  230. }
  /*
   * UDP read settings
   *   Start: 3 seconds on connect
   *   In connection: Max UDP timeout == 3 seconds (read) + 2 seconds (connect) == 5 seconds
   */
  #define UDP_READ_RETRIES 3
  #define UDP_READ_TIMEOUT 1000

  /*
   * TCP read settings
   *   Start: 1/4 seconds on connect
   *   In connection: Max TCP timeout == 5 seconds (read) + 2 seconds (connect) == 7 seconds
   *   In connection: Max TCP timeout == 5 seconds (read) + 8 seconds (connect, host unrch) == 13 seconds
   */
  #define TCP_READ_RETRIES 5
  #define TCP_READ_TIMEOUT 1000
  240. /*
  241. Returns:
  242. 0 = synced ok
  243. 1 = not synced, reconnect
  244. */
  245. int mpeg_sync(INPUT *r, channel_source source_proto) {
  246. time_t sync_start = time(NULL);
  247. unsigned int sync_packets = 0;
  248. unsigned int read_bytes = 0;
  249. char syncframe[188];
  250. int _timeout = TCP_READ_TIMEOUT;
  251. int _retries = TCP_READ_RETRIES;
  252. if (source_proto == udp_sock) {
  253. _timeout = UDP_READ_TIMEOUT;
  254. _retries = UDP_READ_RETRIES;
  255. }
  256. do {
  257. if (r->dienow)
  258. return 1;
  259. resync:
  260. if (fdread_ex(r->sock, syncframe, 1, _timeout, _retries, 1) != 1) {
  261. proxy_log(r, "mpeg_sync fdread() timeoutA");
  262. return 1; // reconnect
  263. }
  264. // LOGf("DEBUG: Read 0x%02x Offset %u Sync: %u\n", (uint8_t)syncframe[0], read_bytes, sync_packets);
  265. read_bytes++;
  266. if (syncframe[0] == 0x47) {
  267. ssize_t rdsz = fdread_ex(r->sock, syncframe, 188-1, _timeout, _retries, 1);
  268. if (rdsz != 188-1) {
  269. proxy_log(r, "mpeg_sync fdread() timeoutB");
  270. return 1; // reconnect
  271. }
  272. read_bytes += 188-1;
  273. if (++sync_packets == 7) // sync 7 packets
  274. break;
  275. goto resync;
  276. } else {
  277. sync_packets = 0;
  278. }
  279. if (read_bytes > FRAME_PACKET_SIZE) { // Can't sync in 1316 bytes
  280. proxy_log(r, "mpeg_sync can't sync after 1316 bytes");
  281. return 1; // reconnect
  282. }
  283. if (sync_start+2 <= time(NULL)) { // Do not sync in two seconds
  284. proxy_log(r, "mpeg_sync can't sync in 2 seconds");
  285. return 1; // reconnect
  286. }
  287. } while (1);
  288. if (read_bytes-FRAME_PACKET_SIZE != 0)
  289. LOGf("INPUT : [%-12s] TS synced after %u bytes\n", r->channel->id, read_bytes-FRAME_PACKET_SIZE);
  290. return 0;
  291. }