tsdumper2 reads an incoming MPEG transport stream over UDP/RTP and records it to disk. The file names are generated based on a preconfigured time interval. https://georgi.unixsol.org/programs/tsdumper2/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

process.c 6.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. /*
  2. * Process incoming data and save it into files.
  3. * Copyright (C) 2013 Unix Solutions Ltd.
  4. *
  5. * This program is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License version 2
  7. * as published by the Free Software Foundation.
  8. *
  9. * This program is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. * GNU General Public License (COPYING file) for more details.
  13. *
  14. */
  15. #include <stdlib.h>
  16. #include <string.h>
  17. #include <unistd.h>
  18. #include <sys/types.h>
  19. #include <sys/stat.h>
  20. #include <fcntl.h>
  21. #include <errno.h>
  22. #include "tsdumper2.h"
  /* Values for the `unlink_file` argument of close_output_file(). */
  #define NO_UNLINK 0
  #define UNLINK_OLD 1

  /*
   * Round `src` down to the nearest multiple of `value`.
   * Both arguments are fully parenthesized so that expressions such as
   * ALIGN_DOWN(a + b, c + d) expand correctly. `src` is still evaluated
   * twice, so do not pass expressions with side effects. `value` must be
   * non-zero (it is used as a modulo divisor).
   */
  #define ALIGN_DOWN(src, value) ((src) - ((src) % (value)))

  /* Permissions used for newly created output directories (set in write_thread()). */
  static mode_t dir_perm;
  /*
   * Return 1 when `filename` passes an access() check for write permission,
   * 0 otherwise (missing path, no permission, or any other access() error).
   */
  static int file_exists(char *filename) {
      if (access(filename, W_OK) != 0)
          return 0;
      return 1;
  }
  30. static void format_output_filename(struct ts *ts, time_t file_time) {
  31. struct tm file_tm;
  32. localtime_r(&file_time, &file_tm);
  33. ts->output_startts = file_time;
  34. ts->output_filename[0] = '\0';
  35. strcat(ts->output_filename, ts->prefix);
  36. strcat(ts->output_filename, "-");
  37. strftime(ts->output_filename + strlen(ts->output_filename), OUTFILE_NAME_MAX, "%Y%m%d_%H%M%S-%s.ts", &file_tm);
  38. ts->output_dirname[0] = '\0';
  39. strftime(ts->output_dirname, OUTFILE_NAME_MAX, "%Y/%m/%d/%H", &file_tm);
  40. ts->output_full_filename[0] = '\0';
  41. snprintf(ts->output_full_filename, sizeof(ts->output_full_filename), "%s/%s",
  42. ts->output_dirname, ts->output_filename);
  43. }
  44. static void report_file_creation(struct ts *ts, char *text_prefix, char *filename) {
  45. char qdepth[32];
  46. qdepth[0] = '\0';
  47. if (ts->packet_queue->items)
  48. snprintf(qdepth, sizeof(qdepth), " (depth:%d)", ts->packet_queue->items);
  49. p_info("%s%s%s\n", text_prefix, filename, qdepth);
  50. }
  51. static void create_output_directory(struct ts *ts) {
  52. if (!ts->create_dirs)
  53. return;
  54. if (!file_exists(ts->output_dirname)) {
  55. p_info(" = Create directory %s", ts->output_dirname);
  56. create_dir(ts->output_dirname, dir_perm);
  57. }
  58. }
  59. static int create_output_file(struct ts *ts) {
  60. char *filename = ts->output_filename;
  61. if (ts->create_dirs)
  62. filename = ts->output_full_filename;
  63. create_output_directory(ts);
  64. report_file_creation(ts, " = Create new file ", filename);
  65. int fd = open(ts->output_filename, O_CREAT | O_WRONLY | O_TRUNC, 0644);
  66. if (fd < 0) {
  67. p_err("Can't create output file %s", ts->output_filename);
  68. return -1;
  69. }
  70. if (ts->create_dirs) {
  71. link(ts->output_filename, ts->output_full_filename);
  72. }
  73. return fd;
  74. }
  75. static int append_output_file(struct ts *ts) {
  76. char *filename = ts->output_filename;
  77. if (ts->create_dirs)
  78. filename = ts->output_full_filename;
  79. create_output_directory(ts);
  80. report_file_creation(ts, " + Append to file ", filename);
  81. int fd = open(ts->output_filename, O_APPEND | O_WRONLY);
  82. if (fd < 0) {
  83. p_err("Can't append to output file %s", ts->output_filename);
  84. return -1;
  85. }
  86. if (ts->create_dirs) {
  87. link(ts->output_filename, ts->output_full_filename);
  88. }
  89. return fd;
  90. }
  91. static void close_output_file(struct ts *ts, int unlink_file) {
  92. if (ts->output_fd > -1) {
  93. close(ts->output_fd);
  94. if (unlink_file && ts->create_dirs) {
  95. // The file is hard linked into the subdirectory. There is no need
  96. // to keep it in the main directory.
  97. unlink(ts->output_filename);
  98. }
  99. }
  100. }
  101. static void handle_files(struct ts *ts, struct packet *packet) {
  102. int file_time = ALIGN_DOWN(packet->ts.tv_sec, ts->rotate_secs);
  103. // Is this file already created?
  104. if (file_time <= ts->output_startts)
  105. return;
  106. close_output_file(ts, UNLINK_OLD);
  107. format_output_filename(ts, file_time);
  108. /*
  109. * When tsdumper2 is started, try to continue writing into "current" file.
  110. * This allows the program to be killed/restarted.
  111. *
  112. * If current file does not exist, create new file with the time of the start
  113. * (not aligned to rotate_secs).
  114. */
  115. int append = 0;
  116. if (ts->output_fd < 0) { // First file (or error).
  117. append = file_exists(ts->output_filename);
  118. if (!append) { // Create first file *NOT ALIGNED*
  119. format_output_filename(ts, packet->ts.tv_sec);
  120. }
  121. }
  122. ts->output_fd = append ? append_output_file(ts) : create_output_file(ts);
  123. }
  124. void *write_thread(void *_ts) {
  125. struct ts *ts = _ts;
  126. struct packet *packet;
  127. mode_t umask_val = umask(0);
  128. dir_perm = (0777 & ~umask_val) | (S_IWUSR | S_IXUSR);
  129. set_thread_name("tsdump-write");
  130. while ((packet = queue_get(ts->packet_queue))) {
  131. if (!packet->data_len)
  132. continue;
  133. p_dbg1(" - Got packet %d, size: %u, file_time:%lu packet_time:%lu depth:%d\n",
  134. packet->num, packet->data_len, ALIGN_DOWN(packet->ts.tv_sec, ts->rotate_secs),
  135. packet->ts.tv_sec, ts->packet_queue->items);
  136. handle_files(ts, packet);
  137. if (ts->output_fd > -1) {
  138. p_dbg2(" - Writing into fd:%d size:%d file:%s\n", ts->output_fd, packet->data_len, ts->output_filename);
  139. ssize_t written = write(ts->output_fd, packet->data, packet->data_len);
  140. if (written != packet->data_len) {
  141. p_err("Can not write data (fd:%d written %zd of %d file:%s)",
  142. ts->output_fd, written, packet->data_len, ts->output_filename);
  143. }
  144. }
  145. free_packet(packet);
  146. }
  147. close_output_file(ts, NO_UNLINK);
  148. return NULL;
  149. }
  150. static struct packet *add_to_queue(struct ts *ts) {
  151. queue_add(ts->packet_queue, ts->current_packet);
  152. ts->current_packet = alloc_packet(ts);
  153. return ts->current_packet;
  154. }
  155. void process_packets(struct ts *ts, uint8_t *ts_packet, ssize_t readen) {
  156. struct timeval now;
  157. struct packet *packet = ts->current_packet;
  158. if (packet->data_len + readen < PACKET_MAX_LENGTH) {
  159. // Add data to buffer
  160. memcpy(packet->data + packet->data_len, ts_packet, readen);
  161. packet->data_len += readen;
  162. } else {
  163. // Too much data, add to queue
  164. p_dbg1("*** Reached buffer end (%zd + %zd > %d)\n", packet->data_len + readen, readen, PACKET_MAX_LENGTH);
  165. packet = add_to_queue(ts);
  166. }
  167. if (!packet->ts.tv_sec)
  168. gettimeofday(&packet->ts, NULL);
  169. gettimeofday(&now, NULL);
  170. unsigned long long diff = timeval_diff_msec(&packet->ts, &now);
  171. if (diff > PACKET_MAX_TIME) {
  172. // Too much time have passed, add to queue
  173. p_dbg1("+++ Reached time limit (%llu > %d)\n", diff, PACKET_MAX_TIME);
  174. add_to_queue(ts);
  175. }
  176. }