mptsd reads MPEG-TS streams from UDP/multicast or HTTP and combines them into one multiple-program transport stream that is suitable for output to a DVB-C modulator. Tested with the Dektec DTE-3114 Quad QAM Modulator and used in production in small DVB-C networks. https://georgi.unixsol.org/programs/mptsd/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

input.c 12KB


  1. /*
  2. * mptsd input handling
  3. * Copyright (C) 2010-2011 Unix Solutions Ltd.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License version 2
  7. * as published by the Free Software Foundation.
  8. *
  9. * This program is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with this program; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
  17. */
  18. #include <stdio.h>
  19. #include <math.h>
  20. #include <stdlib.h>
  21. #include <unistd.h>
  22. #include <signal.h>
  23. #include <string.h>
  24. #include "libfuncs/io.h"
  25. #include "libfuncs/log.h"
  26. #include "libtsfuncs/tsfuncs.h"
  27. #include "data.h"
  28. #include "config.h"
  29. #include "network.h"
  30. extern int keep_going;
  /* Define dump_tables to compile in the "#if dump_tables" sections of
   * process_pat()/process_pmt(), which log the original and rewritten tables. */
  31. // #define dump_tables 1
  /* Number of consecutive zero-length reads input_stream() accepts before it
   * gives up on the current connection and reconnects. */
  32. #define MAX_ZERO_READS 3
  /* Retry count and timeout passed to fdread_ex() for non-TCP sources.
   * NOTE(review): the arithmetic below suggests the timeout is in milliseconds
   * (3 retries * 1000 == 3 seconds) -- confirm against fdread_ex(). */
  33. /* Start: 3 seconds on connect */
  34. /* In connection: Max UDP timeout == 3 seconds (read) + 2 seconds (connect) == 5 seconds */
  35. #define UDP_READ_RETRIES 3
  36. #define UDP_READ_TIMEOUT 1000
  /* Retry count and timeout passed to fdread_ex() for TCP sources
   * (same unit assumption as for the UDP values above). */
  37. /* Start: 1/4 seconds on connect */
  38. /* In connection: Max TCP timeout == 5 seconds (read) + 2 seconds (connect) == 7 seconds */
  39. /* In connection: Max TCP timeout == 5 seconds (read) + 8 seconds (connect, host unrch) == 13 seconds */
  40. #define TCP_READ_RETRIES 5
  41. #define TCP_READ_TIMEOUT 1000
  42. // Init pmt_pid and nit_pid
  43. // Return 0 on error, 1 on success
  44. int input_process_pat(INPUT *r) {
  45. int i;
  46. int num_programs = 0;
  47. INPUT_STREAM *s = &r->stream;
  48. struct ts_pat *pat = s->pat;
  49. s->nit_pid = 0x10; // Default NIT pid
  50. for (i=0;i<pat->programs_num;i++) {
  51. struct ts_pat_program *prg = pat->programs[i];
  52. if (prg->pid) {
  53. if (prg->program == 0) { // NIT
  54. s->nit_pid = prg->pid;
  55. } else { // PAT
  56. s->pmt_pid = prg->pid;
  57. num_programs++;
  58. break; // Get only the first program
  59. }
  60. }
  61. }
  62. // MPTS is not supported as input stream in the moment
  63. if (num_programs > 1) {
  64. LOGf("INPUT : %-10s | Can't handle MPTS (%d programs) as input stream\n", r->channel->id, num_programs);
  65. return 0;
  66. }
  67. return 1;
  68. }
  69. void input_rewrite_pat(INPUT *r) {
  70. int i;
  71. INPUT_STREAM *s = &r->stream;
  72. struct ts_pat *new_pat = ts_pat_copy(s->pat);
  73. if (!new_pat)
  74. return;
  75. // Rewrite PAT pids
  76. for (i=0;i<new_pat->programs_num;i++) {
  77. struct ts_pat_program *prg = new_pat->programs[i];
  78. if (prg->program != 0) { // Skip NIT
  79. // Add pid to rewriter
  80. pidref_add(s->pidref, prg->pid, s->pidref->base_pid);
  81. // Rewrite PAT
  82. prg->program = r->channel->service_id;
  83. prg->pid = s->pidref->base_pid;
  84. s->pidref->base_pid++;
  85. }
  86. }
  87. // Save rewritten packet
  88. ts_pat_regenerate_packets(new_pat);
  89. s->pat_rewritten = new_pat;
  90. }
  91. void input_rewrite_pmt(INPUT *r) {
  92. INPUT_STREAM *s = &r->stream;
  93. struct ts_pmt *new_pmt = ts_pmt_copy(s->pmt);
  94. if (!new_pmt)
  95. return;
  96. // Rewrite PMT pids
  97. new_pmt->ts_header.pid = pidref_get_new_pid(s->pidref, s->pmt_pid);
  98. new_pmt->section_header->ts_id_number = r->channel->service_id;
  99. uint16_t org_pcr_pid = new_pmt->PCR_pid;
  100. s->pcr_pid = new_pmt->PCR_pid;
  101. pidref_add(s->pidref, org_pcr_pid, s->pidref->base_pid);
  102. new_pmt->PCR_pid = s->pidref->base_pid;
  103. r->output_pcr_pid = new_pmt->PCR_pid;
  104. s->pidref->base_pid++;
  105. int i;
  106. for (i=0;i<new_pmt->streams_num;i++) {
  107. struct ts_pmt_stream *stream = new_pmt->streams[i];
  108. if (stream->pid == org_pcr_pid) { // Already rewritten and added to pidref
  109. stream->pid = new_pmt->PCR_pid;
  110. continue;
  111. }
  112. pidref_add(s->pidref, stream->pid, s->pidref->base_pid);
  113. stream->pid = s->pidref->base_pid;
  114. s->pidref->base_pid++;
  115. }
  116. ts_pmt_regenerate_packets(new_pmt);
  117. s->pmt_rewritten = new_pmt;
  118. }
  119. extern CONFIG *config;
  120. void input_buffer_add(INPUT *r, uint8_t *data, int datasize) {
  121. if (r->dienow)
  122. return;
  123. if (r->ifd)
  124. write(r->ifd, data, datasize);
  125. if (r->disabled) {
  126. unsigned long bufsize = r->buf->input - r->buf->output;
  127. double buffull = ((double)bufsize / r->buf->size) * 100;
  128. if (buffull <= 50) {
  129. proxy_log(r, "Enable input");
  130. r->disabled = 0;
  131. } else {
  132. return;
  133. }
  134. }
  135. if (cbuf_fill(r->buf, data, datasize) != 0) {
  136. proxy_log(r, "Disable input, buffer is full.");
  137. r->disabled = 1;
  138. }
  139. }
  140. int input_check_state(INPUT *r) {
  141. if (r->dienow) {
  142. // proxy_log(r, "Forced disconnect.");
  143. return 2;
  144. }
  145. if (r->reconnect) {
  146. proxy_log(r, "Forced reconnect.");
  147. return 1;
  148. }
  149. return 0;
  150. }
  151. int process_pat(INPUT *r, uint16_t pid, uint8_t *ts_packet) {
  152. INPUT_STREAM *s = &r->stream;
  153. if (pid != 0)
  154. return 0;
  155. // Process PAT
  156. s->pat = ts_pat_push_packet(s->pat, ts_packet);
  157. s->last_pat = ts_pat_push_packet(s->last_pat, ts_packet);
  158. if (s->last_pat->initialized) {
  159. if (!ts_pat_is_same(s->pat, s->last_pat)) {
  160. proxy_log(r, "PAT changed.");
  161. return -1; // Reconnect
  162. }
  163. ts_pat_free(&s->last_pat);
  164. s->last_pat = ts_pat_alloc();
  165. }
  166. if (s->pat->initialized) {
  167. // PMT pid is still unknown
  168. if (!s->pmt_pid) {
  169. if (!input_process_pat(r)) {
  170. proxy_log(r, "Can't parse PAT to find PMT pid.");
  171. return -1;
  172. }
  173. }
  174. // Rewritten PAT is not yet initialized
  175. if (!s->pat_rewritten || !s->pat_rewritten->initialized) {
  176. input_rewrite_pat(r);
  177. #if dump_tables
  178. proxy_log(r, "PAT found!");
  179. proxy_log(r, "*** Original PAT ***");
  180. ts_pat_dump(s->pat);
  181. proxy_log(r, "*** Rewritten PAT ***");
  182. ts_pat_dump(s->pat_rewritten);
  183. pidref_dump(s->pidref);
  184. #endif
  185. }
  186. // Only if output file is written
  187. if (r->ifd && s->pat_rewritten && s->pat_rewritten->initialized) {
  188. int j;
  189. struct ts_pat *P = s->pat_rewritten;
  190. for (j=0;j<P->section_header->num_packets;j++) {
  191. ts_packet_set_cont(P->section_header->packet_data + (j * TS_PACKET_SIZE), j + s->pid_pat_cont);
  192. }
  193. P->ts_header.continuity = s->pid_pat_cont;
  194. s->pid_pat_cont += P->section_header->num_packets;
  195. write(r->ifd, P->section_header->packet_data, P->section_header->num_packets * TS_PACKET_SIZE);
  196. }
  197. }
  198. // Stuff packet with NULL data
  199. memset(ts_packet, 0xff, TS_PACKET_SIZE);
  200. ts_packet[0] = 0x47;
  201. ts_packet[1] = 0x1F;
  202. ts_packet[2] = 0xFF;
  203. ts_packet[3] = 0x10;
  204. return 1;
  205. }
  206. int process_pmt(INPUT *r, uint16_t pid, uint8_t *ts_packet) {
  207. INPUT_STREAM *s = &r->stream;
  208. if (!pid || pid != s->pmt_pid)
  209. return 0;
  210. s->pmt = ts_pmt_push_packet(s->pmt, ts_packet);
  211. s->last_pmt = ts_pmt_push_packet(s->last_pmt, ts_packet);
  212. if (s->last_pmt->initialized) {
  213. if (!ts_pmt_is_same(s->pmt, s->last_pmt)) {
  214. proxy_log(r, "PMT changed.");
  215. return -2; // Reconnect
  216. }
  217. ts_pmt_free(&s->last_pmt);
  218. s->last_pmt = ts_pmt_alloc();
  219. }
  220. if (s->pmt->initialized) {
  221. if (!s->pmt_rewritten || !s->pmt_rewritten->initialized) {
  222. input_rewrite_pmt(r);
  223. #if dump_tables
  224. proxy_log(r, "PMT found!");
  225. proxy_log(r, "*** Original PMT ***");
  226. ts_pmt_dump(s->pmt);
  227. proxy_log(r, "*** Rewritten PMT ***");
  228. ts_pmt_dump(s->pmt_rewritten);
  229. // pidref_dump(s->pidref);
  230. #endif
  231. }
  232. if (s->pmt_rewritten && s->pmt_rewritten->initialized) {
  233. int j;
  234. struct ts_pmt *P = s->pmt_rewritten;
  235. for (j=0;j<P->section_header->num_packets;j++) {
  236. ts_packet_set_cont(P->section_header->packet_data + (j * TS_PACKET_SIZE), j + s->pid_pmt_cont);
  237. }
  238. P->ts_header.continuity = s->pid_pmt_cont;
  239. s->pid_pmt_cont += P->section_header->num_packets;
  240. input_buffer_add(r, P->section_header->packet_data, P->section_header->num_packets * TS_PACKET_SIZE);
  241. }
  242. return -1;
  243. }
  244. return 1;
  245. }
  /*
   * Tell whether the current local time of day lies inside the work window.
   *
   * start, end: seconds since local midnight. The window is half-open,
   *             [start, end): start belongs to it, end does not.
   *
   * - start == 0 && end == 0 means "no window configured": always returns 1.
   * - start > end describes a window that wraps past midnight, i.e.
   *   [start, 24h) together with [0, end).
   * - start == end (non-zero) is an empty window: always returns 0.
   *
   * Both kinds of window use the same boundary rule. The wrapping case
   * used to treat start as outside and end as inside, the opposite of the
   * non-wrapping case.
   *
   * Returns 1 inside the window, 0 outside.
   */
  int in_worktime(int start, int end) {
      if (!start && !end)
          return 1;

      struct tm ltime;
      time_t timep = time(NULL);
      localtime_r(&timep, &ltime);
      int seconds = ltime.tm_sec + ltime.tm_min * 60 + ltime.tm_hour * 3600;

      if (start > end) // Window wraps past midnight
          return seconds >= start || seconds < end;

      return seconds >= start && seconds < end;
  }
  266. void * input_stream(void *self) {
  267. INPUT *r = self;
  268. INPUT_STREAM *s = &r->stream;
  269. char buffer[RTP_HEADER_SIZE + FRAME_PACKET_SIZE];
  270. char *buf = buffer + RTP_HEADER_SIZE;
  271. signal(SIGPIPE, SIG_IGN);
  272. proxy_log(r, "Start");
  273. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  274. if (!r->working)
  275. proxy_log(r, "Worktime has not yet begin, sleeping.");
  276. int http_code = 0;
  277. while (keep_going) {
  278. if (input_check_state(r) == 2) // r->dienow is on
  279. goto QUIT;
  280. while (!r->working) {
  281. usleep(250000);
  282. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  283. if (r->working)
  284. proxy_log(r, "Worktime started.");
  285. if (!keep_going)
  286. goto QUIT;
  287. }
  288. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  289. int result = connect_source(self, 1, FRAME_PACKET_SIZE * 1000, &http_code);
  290. if (result != 0)
  291. goto RECONNECT;
  292. channel_source sproto = get_sproto(r->channel->source);
  293. int rtp = is_rtp(r->channel->source);
  294. if (!rtp && mpeg_sync(r, sproto) != 0) {
  295. proxy_log(r, "Can't sync input MPEG TS");
  296. sleep(2);
  297. goto RECONNECT;
  298. }
  299. ssize_t readen;
  300. int max_zero_reads = MAX_ZERO_READS;
  301. // Reset all stream parameters on reconnect.
  302. input_stream_reset(r);
  303. for (;;) {
  304. r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
  305. if (!r->working) {
  306. proxy_log(r, "Worktime ended.");
  307. goto STOP;
  308. }
  309. switch (input_check_state(r)) {
  310. case 1: goto RECONNECT; // r->reconnect is on
  311. case 2: goto QUIT; // r->dienow is on
  312. }
  313. if (sproto == tcp_sock) {
  314. readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1);
  315. } else {
  316. if (!rtp) {
  317. readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
  318. } else {
  319. readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE + RTP_HEADER_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
  320. if (readen > RTP_HEADER_SIZE)
  321. readen -= RTP_HEADER_SIZE;
  322. }
  323. }
  324. if (readen < 0)
  325. goto RECONNECT;
  326. if (readen == 0) { // ho, hum, wtf is going on here?
  327. proxy_log(r, "Zero read, continuing...");
  328. if (--max_zero_reads == 0) {
  329. proxy_log(r, "Max zero reads reached, reconnecting.");
  330. break;
  331. }
  332. continue;
  333. }
  334. int i;
  335. for (i=0; i<readen; i+=188) {
  336. if (r->dienow)
  337. goto QUIT;
  338. uint8_t *ts_packet = (uint8_t *)buf + i;
  339. uint16_t pid = ts_packet_get_pid(ts_packet);
  340. if (process_pat(r, pid, ts_packet) < 0)
  341. goto RECONNECT;
  342. int pmt_result = process_pmt(r, pid, ts_packet);
  343. if (pmt_result == -2)
  344. goto RECONNECT;
  345. if (pmt_result < 0) // PMT rewritten
  346. continue;
  347. pid = ts_packet_get_pid(ts_packet);
  348. // Kill incomming NIT, SDT, EIT, RST, TDT/TOT
  349. if (pid == s->nit_pid || pid == 0x10 || pid == 0x11 || pid == 0x12 || pid == 0x13 || pid == 0x14 || pid == 0x1fff) {
  350. // LOGf("INPUT: %-10s: Remove PID %03x\n", r->channel->id, pid);
  351. continue;
  352. }
  353. // Do we have PAT and PMT? (if we have pmt we have PAT, so check only for PMT)
  354. if (s->pmt_rewritten && pid == s->pcr_pid && ts_packet_has_pcr(ts_packet)) {
  355. s->input_pcr = ts_packet_get_pcr(ts_packet);
  356. // LOGf("INPUT : [%-12s] PCR: %llu\n", r->channel->id, s->input_pcr);
  357. }
  358. // Yes, we have enough data to start outputing
  359. if (s->input_pcr) {
  360. pidref_change_packet_pid(ts_packet, pid, s->pidref);
  361. input_buffer_add(r, ts_packet, TS_PACKET_SIZE);
  362. if (!r->input_ready)
  363. r->input_ready = 1;
  364. }
  365. }
  366. max_zero_reads = MAX_ZERO_READS;
  367. }
  368. proxy_log(r, "fdread timeout");
  369. RECONNECT:
  370. proxy_log(r, "Reconnect");
  371. shutdown_fd(&(r->sock));
  372. chansrc_next(r->channel);
  373. continue;
  374. STOP:
  375. proxy_log(r, "Stop");
  376. shutdown_fd(&(r->sock));
  377. continue;
  378. QUIT:
  379. break;
  380. }
  381. proxy_close(config->inputs, &r);
  382. return 0;
  383. }