mptsd reads mpegts streams from udp/multicast or http and combines them into one multiple program stream that is suitable for outputting to DVB-C modulator. Tested with Dektec DTE-3114 Quad QAM Modulator and used in production in small DVB-C networks. https://georgi.unixsol.org/programs/mptsd/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

input.c 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. #include <stdio.h>
  2. #include <math.h>
  3. #include <stdlib.h>
  4. #include <unistd.h>
  5. #include <signal.h>
  6. #include <string.h>
  7. #include "libfuncs/io.h"
  8. #include "libfuncs/log.h"
  9. #include "libtsfuncs/tsfuncs.h"
  10. #include "data.h"
  11. #include "config.h"
  12. #include "network.h"
  13. extern int keep_going;
  14. // #define dump_tables 1
  15. #define MAX_ZERO_READS 3
  16. /* Start: 3 seconds on connect */
  17. /* In connection: Max UDP timeout == 3 seconds (read) + 2 seconds (connect) == 5 seconds */
  18. #define UDP_READ_RETRIES 3
  19. #define UDP_READ_TIMEOUT 1000
  20. /* Start: 1/4 seconds on connect */
  21. /* In connection: Max TCP timeout == 5 seconds (read) + 2 seconds (connect) == 7 seconds */
  22. /* In connection: Max TCP timeout == 5 seconds (read) + 8 seconds (connect, host unrch) == 13 seconds */
  23. #define TCP_READ_RETRIES 5
  24. #define TCP_READ_TIMEOUT 1000
  25. // Init pmt_pid and nit_pid
  26. // Return 0 on error, 1 on success
  27. int input_process_pat(INPUT *r) {
  28. int i;
  29. int num_programs = 0;
  30. INPUT_STREAM *s = &r->stream;
  31. struct ts_pat *pat = s->pat;
  32. s->nit_pid = 0x10; // Default NIT pid
  33. for (i=0;i<pat->programs_num;i++) {
  34. struct ts_pat_program *prg = pat->programs[i];
  35. if (prg->pid) {
  36. if (prg->program == 0) { // NIT
  37. s->nit_pid = prg->pid;
  38. } else { // PAT
  39. s->pmt_pid = prg->pid;
  40. num_programs++;
  41. break; // Get only the first program
  42. }
  43. }
  44. }
  45. // MPTS is not supported as input stream in the moment
  46. if (num_programs > 1) {
  47. LOGf("INPUT : %-10s | Can't handle MPTS (%d programs) as input stream\n", r->channel->id, num_programs);
  48. return 0;
  49. }
  50. return 1;
  51. }
  52. void input_rewrite_pat(INPUT *r) {
  53. int i;
  54. INPUT_STREAM *s = &r->stream;
  55. struct ts_pat *new_pat = ts_pat_copy(s->pat);
  56. if (!new_pat)
  57. return;
  58. // Rewrite PAT pids
  59. for (i=0;i<new_pat->programs_num;i++) {
  60. struct ts_pat_program *prg = new_pat->programs[i];
  61. if (prg->program != 0) { // Skip NIT
  62. // Add pid to rewriter
  63. pidref_add(s->pidref, prg->pid, s->pidref->base_pid);
  64. // Rewrite PAT
  65. prg->program = r->channel->service_id;
  66. prg->pid = s->pidref->base_pid;
  67. s->pidref->base_pid++;
  68. }
  69. }
  70. // Save rewritten packet
  71. ts_pat_regenerate_packets(new_pat);
  72. s->pat_rewritten = new_pat;
  73. }
  74. void input_rewrite_pmt(INPUT *r) {
  75. INPUT_STREAM *s = &r->stream;
  76. struct ts_pmt *new_pmt = ts_pmt_copy(s->pmt);
  77. if (!new_pmt)
  78. return;
  79. // Rewrite PMT pids
  80. new_pmt->ts_header.pid = pidref_get_new_pid(s->pidref, s->pmt_pid);
  81. new_pmt->section_header->ts_id_number = r->channel->service_id;
  82. uint16_t org_pcr_pid = new_pmt->PCR_pid;
  83. s->pcr_pid = new_pmt->PCR_pid;
  84. pidref_add(s->pidref, org_pcr_pid, s->pidref->base_pid);
  85. new_pmt->PCR_pid = s->pidref->base_pid;
  86. r->output_pcr_pid = new_pmt->PCR_pid;
  87. s->pidref->base_pid++;
  88. int i;
  89. for (i=0;i<new_pmt->streams_num;i++) {
  90. struct ts_pmt_stream *stream = new_pmt->streams[i];
  91. if (stream->pid == org_pcr_pid) { // Already rewritten and added to pidref
  92. stream->pid = new_pmt->PCR_pid;
  93. continue;
  94. }
  95. pidref_add(s->pidref, stream->pid, s->pidref->base_pid);
  96. stream->pid = s->pidref->base_pid;
  97. s->pidref->base_pid++;
  98. }
  99. ts_pmt_regenerate_packets(new_pmt);
  100. s->pmt_rewritten = new_pmt;
  101. }
  102. extern CONFIG *config;
  103. void input_buffer_add(INPUT *r, uint8_t *data, int datasize) {
  104. if (r->dienow)
  105. return;
  106. if (r->ifd)
  107. write(r->ifd, data, datasize);
  108. if (r->disabled) {
  109. unsigned long bufsize = r->buf->input - r->buf->output;
  110. double buffull = ((double)bufsize / r->buf->size) * 100;
  111. if (buffull <= 50) {
  112. proxy_log(r, "Enable input");
  113. r->disabled = 0;
  114. } else {
  115. return;
  116. }
  117. }
  118. if (cbuf_fill(r->buf, data, datasize) != 0) {
  119. proxy_log(r, "Disable input, buffer is full.");
  120. r->disabled = 1;
  121. }
  122. }
  123. int input_check_state(INPUT *r) {
  124. if (r->dienow) {
  125. // proxy_log(r, "Forced disconnect.");
  126. return 2;
  127. }
  128. if (r->reconnect) {
  129. proxy_log(r, "Forced reconnect.");
  130. return 1;
  131. }
  132. return 0;
  133. }
  134. int process_pat(INPUT *r, uint16_t pid, uint8_t *ts_packet) {
  135. INPUT_STREAM *s = &r->stream;
  136. if (pid != 0)
  137. return 0;
  138. // Process PAT
  139. s->pat = ts_pat_push_packet(s->pat, ts_packet);
  140. s->last_pat = ts_pat_push_packet(s->last_pat, ts_packet);
  141. if (s->last_pat->initialized) {
  142. if (!ts_pat_is_same(s->pat, s->last_pat)) {
  143. proxy_log(r, "PAT changed.");
  144. return -1; // Reconnect
  145. }
  146. ts_pat_free(&s->last_pat);
  147. s->last_pat = ts_pat_alloc();
  148. }
  149. if (s->pat->initialized) {
  150. // PMT pid is still unknown
  151. if (!s->pmt_pid) {
  152. if (!input_process_pat(r)) {
  153. proxy_log(r, "Can't parse PAT to find PMT pid.");
  154. return -1;
  155. }
  156. }
  157. // Rewritten PAT is not yet initialized
  158. if (!s->pat_rewritten || !s->pat_rewritten->initialized) {
  159. input_rewrite_pat(r);
  160. #if dump_tables
  161. proxy_log(r, "PAT found!");
  162. proxy_log(r, "*** Original PAT ***");
  163. ts_pat_dump(s->pat);
  164. proxy_log(r, "*** Rewritten PAT ***");
  165. ts_pat_dump(s->pat_rewritten);
  166. pidref_dump(s->pidref);
  167. #endif
  168. }
  169. // Only if output file is written
  170. if (r->ifd && s->pat_rewritten && s->pat_rewritten->initialized) {
  171. int j;
  172. struct ts_pat *P = s->pat_rewritten;
  173. for (j=0;j<P->section_header->num_packets;j++) {
  174. ts_packet_set_cont(P->section_header->packet_data + (j * TS_PACKET_SIZE), j + s->pid_pat_cont);
  175. }
  176. P->ts_header.continuity = s->pid_pat_cont;
  177. s->pid_pat_cont += P->section_header->num_packets;
  178. write(r->ifd, P->section_header->packet_data, P->section_header->num_packets * TS_PACKET_SIZE);
  179. }
  180. }
  181. // Stuff packet with NULL data
  182. memset(ts_packet, 0xff, TS_PACKET_SIZE);
  183. ts_packet[0] = 0x47;
  184. ts_packet[1] = 0x1F;
  185. ts_packet[2] = 0xFF;
  186. ts_packet[3] = 0x10;
  187. return 1;
  188. }
  189. int process_pmt(INPUT *r, uint16_t pid, uint8_t *ts_packet) {
  190. INPUT_STREAM *s = &r->stream;
  191. if (!pid || pid != s->pmt_pid)
  192. return 0;
  193. s->pmt = ts_pmt_push_packet(s->pmt, ts_packet);
  194. s->last_pmt = ts_pmt_push_packet(s->last_pmt, ts_packet);
  195. if (s->last_pmt->initialized) {
  196. if (!ts_pmt_is_same(s->pmt, s->last_pmt)) {
  197. proxy_log(r, "PMT changed.");
  198. return -2; // Reconnect
  199. }
  200. ts_pmt_free(&s->last_pmt);
  201. s->last_pmt = ts_pmt_alloc();
  202. }
  203. if (s->pmt->initialized) {
  204. if (!s->pmt_rewritten || !s->pmt_rewritten->initialized) {
  205. input_rewrite_pmt(r);
  206. #if dump_tables
  207. proxy_log(r, "PMT found!");
  208. proxy_log(r, "*** Original PMT ***");
  209. ts_pmt_dump(s->pmt);
  210. proxy_log(r, "*** Rewritten PMT ***");
  211. ts_pmt_dump(s->pmt_rewritten);
  212. // pidref_dump(s->pidref);
  213. #endif
  214. }
  215. if (s->pmt_rewritten && s->pmt_rewritten->initialized) {
  216. int j;
  217. struct ts_pmt *P = s->pmt_rewritten;
  218. for (j=0;j<P->section_header->num_packets;j++) {
  219. ts_packet_set_cont(P->section_header->packet_data + (j * TS_PACKET_SIZE), j + s->pid_pmt_cont);
  220. }
  221. P->ts_header.continuity = s->pid_pmt_cont;
  222. s->pid_pmt_cont += P->section_header->num_packets;
  223. input_buffer_add(r, P->section_header->packet_data, P->section_header->num_packets * TS_PACKET_SIZE);
  224. }
  225. return -1;
  226. }
  227. return 1;
  228. }
  229. int in_worktime(int start, int end) {
  230. if (!start && !end)
  231. return 1;
  232. struct tm ltime;
  233. struct tm *ltimep = &ltime;
  234. time_t timep = time(NULL);
  235. ltimep = localtime_r(&timep, ltimep);
  236. int seconds = ltime.tm_sec + ltime.tm_min * 60 + ltime.tm_hour * 3600;
  237. if (start > end) {
  238. if (start >= seconds && end < seconds)
  239. return 0;
  240. else
  241. return 1;
  242. } else {
  243. if (start <= seconds && end > seconds)
  244. return 1;
  245. else
  246. return 0;
  247. }
  248. return 1;
  249. }
  250. void * input_stream(void *self) {
  251. INPUT *r = self;
  252. INPUT_STREAM *s = &r->stream;
  253. char buf[FRAME_PACKET_SIZE];
  254. signal(SIGPIPE, SIG_IGN);
  255. proxy_log(r, "Start");
  256. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  257. if (!r->working)
  258. proxy_log(r, "Worktime has not yet begin, sleeping.");
  259. int http_code = 0;
  260. while (keep_going) {
  261. if (input_check_state(r) == 2) // r->dienow is on
  262. goto QUIT;
  263. while (!r->working) {
  264. usleep(250000);
  265. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  266. if (r->working)
  267. proxy_log(r, "Worktime started.");
  268. if (!keep_going)
  269. goto QUIT;
  270. }
  271. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  272. int result = connect_source(self, 1, FRAME_PACKET_SIZE * 1000, &http_code);
  273. if (result != 0)
  274. goto RECONNECT;
  275. channel_source sproto = get_sproto(r->channel->source);
  276. if (mpeg_sync(r, sproto) != 0) {
  277. proxy_log(r, "Can't sync input MPEG TS");
  278. sleep(2);
  279. goto RECONNECT;
  280. }
  281. ssize_t readen;
  282. int max_zero_reads = MAX_ZERO_READS;
  283. // Reset all stream parameters on reconnect.
  284. input_stream_reset(r);
  285. for (;;) {
  286. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  287. if (!r->working) {
  288. proxy_log(r, "Worktime ended.");
  289. goto STOP;
  290. }
  291. switch (input_check_state(r)) {
  292. case 1: goto RECONNECT; // r->reconnect is on
  293. case 2: goto QUIT; // r->dienow is on
  294. }
  295. if (sproto == tcp_sock) {
  296. readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1);
  297. } else {
  298. readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
  299. }
  300. if (readen < 0)
  301. goto RECONNECT;
  302. if (readen == 0) { // ho, hum, wtf is going on here?
  303. proxy_log(r, "Zero read, continuing...");
  304. if (--max_zero_reads == 0) {
  305. proxy_log(r, "Max zero reads reached, reconnecting.");
  306. break;
  307. }
  308. continue;
  309. }
  310. int i;
  311. for (i=0; i<readen; i+=188) {
  312. if (r->dienow)
  313. goto QUIT;
  314. uint8_t *ts_packet = (uint8_t *)buf + i;
  315. uint16_t pid = ts_packet_get_pid(ts_packet);
  316. if (process_pat(r, pid, ts_packet) < 0)
  317. goto RECONNECT;
  318. int pmt_result = process_pmt(r, pid, ts_packet);
  319. if (pmt_result == -2)
  320. goto RECONNECT;
  321. if (pmt_result < 0) // PMT rewritten
  322. continue;
  323. pid = ts_packet_get_pid(ts_packet);
  324. // Kill incomming NIT, SDT, EIT, RST, TDT/TOT
  325. if (pid == s->nit_pid || pid == 0x10 || pid == 0x11 || pid == 0x12 || pid == 0x13 || pid == 0x14 || pid == 0x1fff) {
  326. // LOGf("INPUT: %-10s: Remove PID %03x\n", r->channel->id, pid);
  327. continue;
  328. }
  329. // Do we have PAT and PMT? (if we have pmt we have PAT, so check only for PMT)
  330. if (s->pmt_rewritten && pid == s->pcr_pid && ts_packet_has_pcr(ts_packet)) {
  331. s->input_pcr = ts_packet_get_pcr(ts_packet);
  332. // LOGf("INPUT : [%-12s] PCR: %llu\n", r->channel->id, s->input_pcr);
  333. }
  334. // Yes, we have enough data to start outputing
  335. if (s->input_pcr) {
  336. pidref_change_packet_pid(ts_packet, pid, s->pidref);
  337. input_buffer_add(r, ts_packet, TS_PACKET_SIZE);
  338. if (!r->input_ready)
  339. r->input_ready = 1;
  340. }
  341. }
  342. max_zero_reads = MAX_ZERO_READS;
  343. }
  344. proxy_log(r, "fdread timeout");
  345. RECONNECT:
  346. proxy_log(r, "Reconnect");
  347. shutdown_fd(&(r->sock));
  348. chansrc_next(r->channel);
  349. continue;
  350. STOP:
  351. proxy_log(r, "Stop");
  352. shutdown_fd(&(r->sock));
  353. continue;
  354. QUIT:
  355. break;
  356. }
  357. proxy_close(config->inputs, &r);
  358. return 0;
  359. }