mptsd reads mpegts streams from udp/multicast or http and combines them into one multiple program stream that is suitable for outputting to DVB-C modulator. Tested with Dektec DTE-3114 Quad QAM Modulator and used in production in small DVB-C networks. https://georgi.unixsol.org/programs/mptsd/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

output_write.c 6.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. /*
  2. * mptsd output writing
  3. * Copyright (C) 2010-2011 Unix Solutions Ltd.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License version 2
  7. * as published by the Free Software Foundation.
  8. *
  9. * This program is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU General Public License
  15. * along with this program; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
  17. */
  18. #include <unistd.h>
  19. #include <string.h>
  20. #include <signal.h>
  21. #include <sys/time.h>
  22. #include <errno.h>
  23. #include <math.h>
  24. #include <inttypes.h>
  25. #include "libfuncs/io.h"
  26. #include "libfuncs/log.h"
  27. #include "libfuncs/list.h"
  28. #include "libtsfuncs/tsfuncs.h"
  29. #include "sleep.h"
  30. #include "data.h"
  31. #include "config.h"
  32. #include "network.h"
  33. void increase_process_priority() {
  34. return;
  35. #ifdef __linux__
  36. struct sched_param param;
  37. param.sched_priority = 99;
  38. if (sched_setscheduler(0, SCHED_FIFO, &param)==-1) {
  39. log_perror("sched_setscheduler() failed!", errno);
  40. } else {
  41. LOGf("PRIO : sched_setschedule() succeded.\n");
  42. }
  43. #endif
  44. }
  45. void ts_frame_process(CONFIG *conf, OUTPUT *o, uint8_t *data) {
  46. int i;
  47. uint16_t pid;
  48. uint8_t *ts_packet;
  49. for (i=0; i<FRAME_PACKET_SIZE; i+=TS_PACKET_SIZE) {
  50. ts_packet = data + i;
  51. pid = ts_packet_get_pid(ts_packet);
  52. if (pid == 0x1fff) // NULL packet
  53. o->padding_period += TS_PACKET_SIZE;
  54. if (ts_packet_has_pcr(ts_packet)) {
  55. uint64_t pcr = ts_packet_get_pcr(ts_packet); // Current PCR
  56. uint64_t new_pcr = pcr;
  57. uint64_t bytes = o->traffic + i;
  58. if (o->last_pcr[pid]) {
  59. uint64_t old_pcr = o->last_pcr[pid];
  60. uint64_t old_org_pcr = o->last_org_pcr[pid];
  61. uint64_t old_bytes = o->last_traffic[pid];
  62. if (old_org_pcr < pcr) { // Detect PCR wraparound
  63. new_pcr = old_pcr + (double)((bytes - old_bytes) * 8 * 27000000) / o->output_bitrate;
  64. // Rewrite pcrs || Move pcrs & rewrite prcs
  65. if (conf->pcr_mode == 2 || conf->pcr_mode == 3) {
  66. ts_packet_set_pcr(ts_packet, new_pcr);
  67. }
  68. if (conf->debug) {
  69. uint64_t ts_rate = (double)(((bytes - old_bytes) * 8) * 27000000) / (pcr - old_org_pcr);
  70. uint64_t ts_rate_new = (double)(((bytes - old_bytes) * 8) * 27000000) / (new_pcr - old_pcr);
  71. LOGf("PCR[%03x]: old:%14" PRIu64 " new:%14" PRIu64 " pcr_diff:%8" PRId64 " ts_rate:%9" PRIu64 " ts_rate_new:%9" PRIu64 " diff:%9" PRId64 " | passed:%" PRIu64 "\n",
  72. pid,
  73. pcr,
  74. new_pcr,
  75. pcr - new_pcr,
  76. ts_rate,
  77. ts_rate_new,
  78. ts_rate - ts_rate_new,
  79. bytes - old_bytes
  80. );
  81. }
  82. }
  83. } else {
  84. // if (config->debug) {
  85. // LOGf("PCR[%03x]: %10llu init\n", pid, pcr);
  86. // }
  87. }
  88. o->last_pcr[pid] = new_pcr;
  89. o->last_org_pcr[pid] = pcr;
  90. o->last_traffic[pid] = bytes;
  91. }
  92. }
  93. }
  94. ssize_t ts_frame_write(OUTPUT *o, uint8_t *data) {
  95. ssize_t written;
  96. written = fdwrite(o->out_sock, (char *)data, FRAME_PACKET_SIZE);
  97. if (written >= 0) {
  98. o->traffic += written;
  99. o->traffic_period += written;
  100. }
  101. if (o->ofd)
  102. write(o->ofd, data, FRAME_PACKET_SIZE);
  103. return written;
  104. }
  105. void * output_handle_write(void *_config) {
  106. CONFIG *conf = _config;
  107. OUTPUT *o = conf->output;
  108. int buf_in_use = 0;
  109. struct timeval stats_ts, now;
  110. struct timeval start_write_ts, end_write_ts, used_ts;
  111. unsigned long long stats_interval;
  112. signal(SIGPIPE, SIG_IGN);
  113. increase_process_priority();
  114. gettimeofday(&stats_ts, NULL);
  115. while (!o->dienow) {
  116. gettimeofday(&now, NULL);
  117. OBUF *curbuf = &o->obuf[buf_in_use];
  118. while (curbuf->status != obuf_full) { // Wait untill the buffer is ready ot it is already emptying
  119. if (o->dienow)
  120. goto OUT;
  121. //LOGf("MIX: Waiting for obuf %d\n", buf_in_use);
  122. usleep(1);
  123. }
  124. curbuf->status = obuf_emptying; // Mark buffer as being filled
  125. // Show stats
  126. stats_interval = timeval_diff_msec(&stats_ts, &now);
  127. if (stats_interval > conf->timeouts.stats) {
  128. stats_ts = now;
  129. double out_kbps = (double)(o->traffic_period * 8) / 1000;
  130. double out_mbps = (double)out_kbps / 1000;
  131. double opadding = ((double)o->padding_period / o->traffic_period) * 100;
  132. if (!conf->quiet) {
  133. LOGf("STAT : Pad:%6.2f%% Traf:%5.2f Mbps | %8.2f | %7" PRIu64 "\n",
  134. opadding,
  135. out_mbps,
  136. out_kbps,
  137. o->traffic_period
  138. );
  139. }
  140. o->traffic_period = 0;
  141. o->padding_period = 0;
  142. }
  143. gettimeofday(&start_write_ts, NULL);
  144. int packets_written = 0, real_sleep_time = conf->output_tmout - conf->usleep_overhead;
  145. long time_taken, time_diff, real_time, overhead = 0, overhead_total = 0;
  146. ssize_t written = 0;
  147. while (curbuf->written < curbuf->size) {
  148. if (o->dienow)
  149. goto OUT;
  150. long sleep_interval = conf->output_tmout;
  151. uint8_t *ts_frame = curbuf->buf + curbuf->written;
  152. ts_frame_process(conf, o, ts_frame); // Fix PCR and count NULL packets
  153. written += ts_frame_write(o, ts_frame); // Write packet to network/file
  154. curbuf->written += FRAME_PACKET_SIZE;
  155. if (packets_written) {
  156. time_taken = timeval_diff_usec(&start_write_ts, &used_ts);
  157. real_time = packets_written * (conf->output_tmout + conf->usleep_overhead);
  158. time_diff = real_time - time_taken;
  159. overhead = (time_taken / packets_written) - sleep_interval;
  160. overhead_total += overhead;
  161. /*
  162. LOGf("[%5d] time_taken:%5ld real_time:%5ld time_diff:%ld | overhead:%5ld overhead_total:%5ld\n",
  163. packets_written,
  164. time_taken,
  165. real_time,
  166. time_diff,
  167. overhead,
  168. overhead_total
  169. );
  170. */
  171. if (time_diff > real_sleep_time) {
  172. sleep_interval = time_diff - conf->usleep_overhead;
  173. if (sleep_interval < 0)
  174. sleep_interval = 1;
  175. // LOGf("Add sleep. time_diff: %ld sleep_interval: %ld\n", time_diff, sleep_interval);
  176. } else {
  177. //LOGf("Skip sleep %ld\n", time_diff);
  178. sleep_interval = 0;
  179. }
  180. }
  181. if (sleep_interval > 0)
  182. usleep(sleep_interval);
  183. gettimeofday(&used_ts, NULL);
  184. packets_written++;
  185. }
  186. gettimeofday(&end_write_ts, NULL);
  187. unsigned long long write_time = timeval_diff_usec(&start_write_ts, &end_write_ts);
  188. if (write_time < o->obuf_ms * 1000) {
  189. //LOGf("Writen for -%llu us less\n", o->obuf_ms*1000 - write_time);
  190. usleep(o->obuf_ms*1000 - write_time);
  191. } else {
  192. //LOGf("Writen for +%llu us more\n", write_time - o->obuf_ms*1000);
  193. }
  194. obuf_reset(curbuf); // Buffer us all used up
  195. buf_in_use = buf_in_use ? 0 : 1; // Switch buffer
  196. if (written < 0) {
  197. LOG("OUTPUT: Error writing into output socket.\n");
  198. shutdown_fd(&o->out_sock);
  199. connect_output(o);
  200. }
  201. }
  202. OUT:
  203. LOG("OUTPUT: WRITE thread stopped.\n");
  204. o->dienow++;
  205. return 0;
  206. }