mptsd reads mpegts streams from udp/multicast or http and combines them into one multiple program stream that is suitable for outputting to DVB-C modulator. Tested with Dektec DTE-3114 Quad QAM Modulator and used in production in small DVB-C networks. https://georgi.unixsol.org/programs/mptsd/

input.c 11KB


  1. /*
  2. * mptsd input handling
  3. * Copyright (C) 2010-2011 Unix Solutions Ltd.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License version 2
  7. * as published by the Free Software Foundation.
  8. *
  9. * This program is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with this program; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
  17. */
  18. #include <stdio.h>
  19. #include <math.h>
  20. #include <stdlib.h>
  21. #include <unistd.h>
  22. #include <signal.h>
  23. #include <string.h>
  24. #include "libfuncs/io.h"
  25. #include "libfuncs/log.h"
  26. #include "libtsfuncs/tsfuncs.h"
  27. #include "data.h"
  28. #include "config.h"
  29. #include "network.h"
  30. extern int keep_going;
  31. // #define dump_tables 1
  32. #define MAX_ZERO_READS 3
  33. /* Start: 3 seconds on connect */
  34. /* In connection: Max UDP timeout == 3 seconds (read) + 2 seconds (connect) == 5 seconds */
  35. #define UDP_READ_RETRIES 3
  36. #define UDP_READ_TIMEOUT 1000
  37. /* Start: 1/4 seconds on connect */
  38. /* In connection: Max TCP timeout == 5 seconds (read) + 2 seconds (connect) == 7 seconds */
  39. /* In connection: Max TCP timeout == 5 seconds (read) + 8 seconds (connect, host unrch) == 13 seconds */
  40. #define TCP_READ_RETRIES 5
  41. #define TCP_READ_TIMEOUT 1000
  42. // Init pmt_pid and nit_pid
  43. // Return 0 on error, 1 on success
  44. int input_process_pat(INPUT *r) {
  45. int i;
  46. int num_programs = 0;
  47. INPUT_STREAM *s = &r->stream;
  48. struct ts_pat *pat = s->pat;
  49. s->nit_pid = 0x10; // Default NIT pid
  50. for (i=0;i<pat->programs_num;i++) {
  51. struct ts_pat_program *prg = pat->programs[i];
  52. if (prg->pid) {
  53. if (prg->program == 0) { // NIT
  54. s->nit_pid = prg->pid;
  55. } else { // PAT
  56. s->pmt_pid = prg->pid;
  57. num_programs++;
  58. break; // Get only the first program
  59. }
  60. }
  61. }
  62. // MPTS is not supported as input stream in the moment
  63. if (num_programs > 1) {
  64. LOGf("INPUT : %-10s | Can't handle MPTS (%d programs) as input stream\n", r->channel->id, num_programs);
  65. return 0;
  66. }
  67. return 1;
  68. }
  69. void input_rewrite_pat(INPUT *r) {
  70. int i;
  71. INPUT_STREAM *s = &r->stream;
  72. struct ts_pat *new_pat = ts_pat_copy(s->pat);
  73. if (!new_pat)
  74. return;
  75. // Rewrite PAT pids
  76. for (i=0;i<new_pat->programs_num;i++) {
  77. struct ts_pat_program *prg = new_pat->programs[i];
  78. if (prg->program != 0) { // Skip NIT
  79. // Add pid to rewriter
  80. pidref_add(s->pidref, prg->pid, s->pidref->base_pid);
  81. // Rewrite PAT
  82. prg->program = r->channel->service_id;
  83. prg->pid = s->pidref->base_pid;
  84. s->pidref->base_pid++;
  85. }
  86. }
  87. // Save rewritten packet
  88. ts_pat_regenerate_packets(new_pat);
  89. s->pat_rewritten = new_pat;
  90. }
  91. void input_rewrite_pmt(INPUT *r) {
  92. INPUT_STREAM *s = &r->stream;
  93. struct ts_pmt *new_pmt = ts_pmt_copy(s->pmt);
  94. if (!new_pmt)
  95. return;
  96. // Rewrite PMT pids
  97. new_pmt->ts_header.pid = pidref_get_new_pid(s->pidref, s->pmt_pid);
  98. new_pmt->section_header->ts_id_number = r->channel->service_id;
  99. uint16_t org_pcr_pid = new_pmt->PCR_pid;
  100. s->pcr_pid = new_pmt->PCR_pid;
  101. pidref_add(s->pidref, org_pcr_pid, s->pidref->base_pid);
  102. new_pmt->PCR_pid = s->pidref->base_pid;
  103. r->output_pcr_pid = new_pmt->PCR_pid;
  104. s->pidref->base_pid++;
  105. int i;
  106. for (i=0;i<new_pmt->streams_num;i++) {
  107. struct ts_pmt_stream *stream = new_pmt->streams[i];
  108. if (stream->pid == org_pcr_pid) { // Already rewritten and added to pidref
  109. stream->pid = new_pmt->PCR_pid;
  110. continue;
  111. }
  112. pidref_add(s->pidref, stream->pid, s->pidref->base_pid);
  113. stream->pid = s->pidref->base_pid;
  114. s->pidref->base_pid++;
  115. }
  116. ts_pmt_regenerate_packets(new_pmt);
  117. s->pmt_rewritten = new_pmt;
  118. }
  119. extern CONFIG *config;
  120. void input_buffer_add(INPUT *r, uint8_t *data, int datasize) {
  121. if (r->dienow)
  122. return;
  123. if (r->ifd)
  124. write(r->ifd, data, datasize);
  125. if (r->disabled) {
  126. unsigned long bufsize = r->buf->input - r->buf->output;
  127. double buffull = ((double)bufsize / r->buf->size) * 100;
  128. if (buffull <= 50) {
  129. proxy_log(r, "Enable input");
  130. r->disabled = 0;
  131. } else {
  132. return;
  133. }
  134. }
  135. if (cbuf_fill(r->buf, data, datasize) != 0) {
  136. proxy_log(r, "Disable input, buffer is full.");
  137. r->disabled = 1;
  138. }
  139. }
  140. int input_check_state(INPUT *r) {
  141. if (r->dienow) {
  142. // proxy_log(r, "Forced disconnect.");
  143. return 2;
  144. }
  145. if (r->reconnect) {
  146. proxy_log(r, "Forced reconnect.");
  147. return 1;
  148. }
  149. return 0;
  150. }
  151. int process_pat(INPUT *r, uint16_t pid, uint8_t *ts_packet) {
  152. INPUT_STREAM *s = &r->stream;
  153. if (pid != 0)
  154. return 0;
  155. // Process PAT
  156. s->pat = ts_pat_push_packet(s->pat, ts_packet);
  157. s->last_pat = ts_pat_push_packet(s->last_pat, ts_packet);
  158. if (s->last_pat->initialized) {
  159. if (!ts_pat_is_same(s->pat, s->last_pat)) {
  160. proxy_log(r, "PAT changed.");
  161. return -1; // Reconnect
  162. }
  163. ts_pat_free(&s->last_pat);
  164. s->last_pat = ts_pat_alloc();
  165. }
  166. if (s->pat->initialized) {
  167. // PMT pid is still unknown
  168. if (!s->pmt_pid) {
  169. if (!input_process_pat(r)) {
  170. proxy_log(r, "Can't parse PAT to find PMT pid.");
  171. return -1;
  172. }
  173. }
  174. // Rewritten PAT is not yet initialized
  175. if (!s->pat_rewritten || !s->pat_rewritten->initialized) {
  176. input_rewrite_pat(r);
  177. #if dump_tables
  178. proxy_log(r, "PAT found!");
  179. proxy_log(r, "*** Original PAT ***");
  180. ts_pat_dump(s->pat);
  181. proxy_log(r, "*** Rewritten PAT ***");
  182. ts_pat_dump(s->pat_rewritten);
  183. pidref_dump(s->pidref);
  184. #endif
  185. }
  186. // Only if output file is written
  187. if (r->ifd && s->pat_rewritten && s->pat_rewritten->initialized) {
  188. int j;
  189. struct ts_pat *P = s->pat_rewritten;
  190. for (j=0;j<P->section_header->num_packets;j++) {
  191. ts_packet_set_cont(P->section_header->packet_data + (j * TS_PACKET_SIZE), j + s->pid_pat_cont);
  192. }
  193. P->ts_header.continuity = s->pid_pat_cont;
  194. s->pid_pat_cont += P->section_header->num_packets;
  195. write(r->ifd, P->section_header->packet_data, P->section_header->num_packets * TS_PACKET_SIZE);
  196. }
  197. }
  198. // Stuff packet with NULL data
  199. memset(ts_packet, 0xff, TS_PACKET_SIZE);
  200. ts_packet[0] = 0x47;
  201. ts_packet[1] = 0x1F;
  202. ts_packet[2] = 0xFF;
  203. ts_packet[3] = 0x10;
  204. return 1;
  205. }
  206. int process_pmt(INPUT *r, uint16_t pid, uint8_t *ts_packet) {
  207. INPUT_STREAM *s = &r->stream;
  208. if (!pid || pid != s->pmt_pid)
  209. return 0;
  210. s->pmt = ts_pmt_push_packet(s->pmt, ts_packet);
  211. s->last_pmt = ts_pmt_push_packet(s->last_pmt, ts_packet);
  212. if (s->last_pmt->initialized) {
  213. if (!ts_pmt_is_same(s->pmt, s->last_pmt)) {
  214. proxy_log(r, "PMT changed.");
  215. return -2; // Reconnect
  216. }
  217. ts_pmt_free(&s->last_pmt);
  218. s->last_pmt = ts_pmt_alloc();
  219. }
  220. if (s->pmt->initialized) {
  221. if (!s->pmt_rewritten || !s->pmt_rewritten->initialized) {
  222. input_rewrite_pmt(r);
  223. #if dump_tables
  224. proxy_log(r, "PMT found!");
  225. proxy_log(r, "*** Original PMT ***");
  226. ts_pmt_dump(s->pmt);
  227. proxy_log(r, "*** Rewritten PMT ***");
  228. ts_pmt_dump(s->pmt_rewritten);
  229. // pidref_dump(s->pidref);
  230. #endif
  231. }
  232. if (s->pmt_rewritten && s->pmt_rewritten->initialized) {
  233. int j;
  234. struct ts_pmt *P = s->pmt_rewritten;
  235. for (j=0;j<P->section_header->num_packets;j++) {
  236. ts_packet_set_cont(P->section_header->packet_data + (j * TS_PACKET_SIZE), j + s->pid_pmt_cont);
  237. }
  238. P->ts_header.continuity = s->pid_pmt_cont;
  239. s->pid_pmt_cont += P->section_header->num_packets;
  240. input_buffer_add(r, P->section_header->packet_data, P->section_header->num_packets * TS_PACKET_SIZE);
  241. }
  242. return -1;
  243. }
  244. return 1;
  245. }
  246. int in_worktime(int start, int end) {
  247. if (!start && !end)
  248. return 1;
  249. struct tm ltime;
  250. struct tm *ltimep = &ltime;
  251. time_t timep = time(NULL);
  252. ltimep = localtime_r(&timep, ltimep);
  253. int seconds = ltime.tm_sec + ltime.tm_min * 60 + ltime.tm_hour * 3600;
  254. if (start > end) {
  255. if (start >= seconds && end < seconds)
  256. return 0;
  257. else
  258. return 1;
  259. } else {
  260. if (start <= seconds && end > seconds)
  261. return 1;
  262. else
  263. return 0;
  264. }
  265. return 1;
  266. }
  267. void * input_stream(void *self) {
  268. INPUT *r = self;
  269. INPUT_STREAM *s = &r->stream;
  270. char buf[FRAME_PACKET_SIZE];
  271. signal(SIGPIPE, SIG_IGN);
  272. proxy_log(r, "Start");
  273. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  274. if (!r->working)
  275. proxy_log(r, "Worktime has not yet begin, sleeping.");
  276. int http_code = 0;
  277. while (keep_going) {
  278. if (input_check_state(r) == 2) // r->dienow is on
  279. goto QUIT;
  280. while (!r->working) {
  281. usleep(250000);
  282. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  283. if (r->working)
  284. proxy_log(r, "Worktime started.");
  285. if (!keep_going)
  286. goto QUIT;
  287. }
  288. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  289. int result = connect_source(self, 1, FRAME_PACKET_SIZE * 1000, &http_code);
  290. if (result != 0)
  291. goto RECONNECT;
  292. channel_source sproto = get_sproto(r->channel->source);
  293. if (mpeg_sync(r, sproto) != 0) {
  294. proxy_log(r, "Can't sync input MPEG TS");
  295. sleep(2);
  296. goto RECONNECT;
  297. }
  298. ssize_t readen;
  299. int max_zero_reads = MAX_ZERO_READS;
  300. // Reset all stream parameters on reconnect.
  301. input_stream_reset(r);
  302. for (;;) {
  303. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  304. if (!r->working) {
  305. proxy_log(r, "Worktime ended.");
  306. goto STOP;
  307. }
  308. switch (input_check_state(r)) {
  309. case 1: goto RECONNECT; // r->reconnect is on
  310. case 2: goto QUIT; // r->dienow is on
  311. }
  312. if (sproto == tcp_sock) {
  313. readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1);
  314. } else {
  315. readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
  316. }
  317. if (readen < 0)
  318. goto RECONNECT;
  319. if (readen == 0) { // ho, hum, wtf is going on here?
  320. proxy_log(r, "Zero read, continuing...");
  321. if (--max_zero_reads == 0) {
  322. proxy_log(r, "Max zero reads reached, reconnecting.");
  323. break;
  324. }
  325. continue;
  326. }
  327. int i;
  328. for (i=0; i<readen; i+=188) {
  329. if (r->dienow)
  330. goto QUIT;
  331. uint8_t *ts_packet = (uint8_t *)buf + i;
  332. uint16_t pid = ts_packet_get_pid(ts_packet);
  333. if (process_pat(r, pid, ts_packet) < 0)
  334. goto RECONNECT;
  335. int pmt_result = process_pmt(r, pid, ts_packet);
  336. if (pmt_result == -2)
  337. goto RECONNECT;
  338. if (pmt_result < 0) // PMT rewritten
  339. continue;
  340. pid = ts_packet_get_pid(ts_packet);
  341. // Kill incomming NIT, SDT, EIT, RST, TDT/TOT
  342. if (pid == s->nit_pid || pid == 0x10 || pid == 0x11 || pid == 0x12 || pid == 0x13 || pid == 0x14 || pid == 0x1fff) {
  343. // LOGf("INPUT: %-10s: Remove PID %03x\n", r->channel->id, pid);
  344. continue;
  345. }
  346. // Do we have PAT and PMT? (if we have pmt we have PAT, so check only for PMT)
  347. if (s->pmt_rewritten && pid == s->pcr_pid && ts_packet_has_pcr(ts_packet)) {
  348. s->input_pcr = ts_packet_get_pcr(ts_packet);
  349. // LOGf("INPUT : [%-12s] PCR: %llu\n", r->channel->id, s->input_pcr);
  350. }
  351. // Yes, we have enough data to start outputing
  352. if (s->input_pcr) {
  353. pidref_change_packet_pid(ts_packet, pid, s->pidref);
  354. input_buffer_add(r, ts_packet, TS_PACKET_SIZE);
  355. if (!r->input_ready)
  356. r->input_ready = 1;
  357. }
  358. }
  359. max_zero_reads = MAX_ZERO_READS;
  360. }
  361. proxy_log(r, "fdread timeout");
  362. RECONNECT:
  363. proxy_log(r, "Reconnect");
  364. shutdown_fd(&(r->sock));
  365. chansrc_next(r->channel);
  366. continue;
  367. STOP:
  368. proxy_log(r, "Stop");
  369. shutdown_fd(&(r->sock));
  370. continue;
  371. QUIT:
  372. break;
  373. }
  374. proxy_close(config->inputs, &r);
  375. return 0;
  376. }