Browse Source

Add threaded decode and output functions that use circular buffers

Georgi Chorbadzhiyski 13 years ago
parent
commit
0a4f2f7a70
5 changed files with 232 additions and 98 deletions
  1. 1
    1
      Makefile
  2. 3
    0
      data.h
  3. 199
    0
      process.c
  4. 8
    0
      process.h
  5. 21
    97
      tsdecrypt.c

+ 1
- 1
Makefile View File

@@ -10,7 +10,7 @@ FUNCS_LIB = $(FUNCS_DIR)/libfuncs.a
10 10
 TS_DIR = libts
11 11
 TS_LIB = $(TS_DIR)/libts.a
12 12
 
13
-tsdecrypt_OBJS = cbuf.o data.o udp.o util.o camd.o tables.o tsdecrypt.o $(FUNCS_LIB) $(TS_LIB)
13
+tsdecrypt_OBJS = cbuf.o data.o udp.o util.o camd.o process.o tables.o tsdecrypt.o $(FUNCS_LIB) $(TS_LIB)
14 14
 tsdecrypt_LIBS = -lcrypto -ldvbcsa -lpthread
15 15
 
16 16
 CLEAN_OBJS = tsdecrypt $(tsdecrypt_OBJS) *~

+ 3
- 0
data.h View File

@@ -13,6 +13,9 @@
13 13
 
14 14
 #include "cbuf.h"
15 15
 
16
+// 7 * 188
17
+#define FRAME_SIZE 1316
18
+
16 19
 struct key {
17 20
 	uint8_t				cw[16];
18 21
 	int					is_valid_cw;

+ 199
- 0
process.c View File

@@ -0,0 +1,199 @@
1
+#include <unistd.h>
2
+
3
+#include "data.h"
4
+#include "tables.h"
5
+
6
+static unsigned long ts_pack;
7
+static int ts_pack_shown;
8
+
9
+void show_ts_pack(struct ts *ts, uint16_t pid, char *wtf, char *extra, uint8_t *ts_packet) {
10
+	char cw1_dump[8 * 6];
11
+	char cw2_dump[8 * 6];
12
+	if (ts->debug_level >= 4) {
13
+		if (ts_pack_shown)
14
+			return;
15
+		int stype = ts_packet_get_scrambled(ts_packet);
16
+		ts_hex_dump_buf(cw1_dump, 8 * 6, ts->key.cw    , 8, 0);
17
+		ts_hex_dump_buf(cw2_dump, 8 * 6, ts->key.cw + 8, 8, 0);
18
+		fprintf(stderr, "@ %s %s %03x %5ld %7ld | %s   %s | %s\n",
19
+			stype == 0 ? "------" :
20
+			stype == 2 ? "even 0" :
21
+			stype == 3 ? "odd  1" : "??????",
22
+			wtf,
23
+			pid,
24
+			ts_pack, ts_pack * 188,
25
+			cw1_dump, cw2_dump, extra ? extra : wtf);
26
+	}
27
+}
28
+
29
+static void dump_ts_pack(struct ts *ts, uint16_t pid, uint8_t *ts_packet) {
30
+	if (pid == 0x010)		show_ts_pack(ts, pid, "nit", NULL, ts_packet);
31
+	else if (pid == 0x11)	show_ts_pack(ts, pid, "sdt", NULL, ts_packet);
32
+	else if (pid == 0x12)	show_ts_pack(ts, pid, "epg", NULL, ts_packet);
33
+	else					show_ts_pack(ts, pid, "---", NULL, ts_packet);
34
+}
35
+
36
+static void decode_packet(struct ts *ts, uint8_t *ts_packet) {
37
+	int scramble_idx = ts_packet_get_scrambled(ts_packet);
38
+	if (scramble_idx > 1) {
39
+		if (ts->key.is_valid_cw) {
40
+			// scramble_idx 2 == even key
41
+			// scramble_idx 3 == odd key
42
+			ts_packet_set_not_scrambled(ts_packet);
43
+			uint8_t payload_ofs = ts_packet_get_payload_offset(ts_packet);
44
+			dvbcsa_decrypt(ts->key.csakey[scramble_idx - 2], ts_packet + payload_ofs, 188 - payload_ofs);
45
+		} else {
46
+			// Can't decrypt the packet just make it NULL packet
47
+			if (ts->pid_filter)
48
+				ts_packet_set_pid(ts_packet, 0x1fff);
49
+		}
50
+	}
51
+}
52
+
53
+static void decode_buffer(struct ts *ts, uint8_t *data, int data_len) {
54
+	int i;
55
+	int batch_sz = dvbcsa_bs_batch_size(); // 32?
56
+	int even_packets = 0;
57
+	int odd_packets  = 0;
58
+	struct dvbcsa_bs_batch_s even_pcks[batch_sz + 1];
59
+	struct dvbcsa_bs_batch_s odd_pcks [batch_sz + 1];
60
+
61
+	// Prepare batch structure
62
+	for (i = 0; i < batch_sz; i++) {
63
+		uint8_t *ts_packet = data + (i * 188);
64
+
65
+		int scramble_idx = ts_packet_get_scrambled(ts_packet);
66
+		if (scramble_idx > 1) {
67
+			if (ts->key.is_valid_cw) {
68
+				uint8_t payload_ofs = ts_packet_get_payload_offset(ts_packet);
69
+				if (scramble_idx == 2) { // scramble_idx 2 == even key
70
+					even_pcks[even_packets].data = ts_packet + payload_ofs;
71
+					even_pcks[even_packets].len  = 188 - payload_ofs;
72
+					even_packets++;
73
+				}
74
+				if (scramble_idx == 3) { // scramble_idx 3 == odd key
75
+					odd_pcks[odd_packets].data = ts_packet + payload_ofs;
76
+					odd_pcks[odd_packets].len  = 188 - payload_ofs;
77
+					odd_packets++;
78
+				}
79
+				ts_packet_set_not_scrambled(ts_packet);
80
+			} else {
81
+				if (ts->pid_filter)
82
+					ts_packet_set_pid(ts_packet, 0x1fff);
83
+			}
84
+		}
85
+	}
86
+
87
+	// Decode packets
88
+	if (even_packets) {
89
+		even_pcks[even_packets].data = NULL; // Last one...
90
+		dvbcsa_bs_decrypt(ts->key.bs_csakey[0], even_pcks, 184);
91
+	}
92
+	if (odd_packets) {
93
+		odd_pcks[odd_packets].data = NULL; // Last one...
94
+		dvbcsa_bs_decrypt(ts->key.bs_csakey[1], odd_pcks, 184);
95
+	}
96
+
97
+	// Fill write buffer
98
+	for (i=0; i<data_len; i += 188) {
99
+		uint8_t *ts_packet = data + i;
100
+
101
+		if (!ts->pid_filter) {
102
+			cbuf_fill(ts->write_buf, ts_packet, 188);
103
+		} else {
104
+			uint16_t pid = ts_packet_get_pid(ts_packet);
105
+			if (pidmap_get(&ts->pidmap, pid)) // PAT or allowed PIDs
106
+				cbuf_fill(ts->write_buf, ts_packet, 188);
107
+		}
108
+	}
109
+}
110
+
111
+void *decode_thread(void *_ts) {
112
+	struct ts *ts = _ts;
113
+	uint8_t *data;
114
+	int data_size;
115
+	int req_size = 188 * dvbcsa_bs_batch_size();
116
+
117
+	while (!ts->decode_stop) {
118
+		data = cbuf_peek(ts->decode_buf, req_size, &data_size);
119
+		if (data_size < req_size) {
120
+			usleep(10000);
121
+			continue;
122
+		}
123
+		data = cbuf_get(ts->decode_buf, req_size, &data_size);
124
+		if (data)
125
+			decode_buffer(ts, data, data_size);
126
+	}
127
+
128
+	do { // Flush data
129
+		data = cbuf_get(ts->decode_buf, req_size, &data_size);
130
+		if (data)
131
+			decode_buffer(ts, data, data_size);
132
+	} while(data);
133
+
134
+	return NULL;
135
+}
136
+
137
+void *write_thread(void *_ts) {
138
+	struct ts *ts = _ts;
139
+	uint8_t *data;
140
+	int data_size;
141
+
142
+	while (!ts->write_stop) {
143
+		data_size = 0;
144
+		data = cbuf_peek(ts->write_buf, FRAME_SIZE, &data_size);
145
+		if (data_size < FRAME_SIZE) {
146
+			usleep(5000);
147
+			continue;
148
+		}
149
+		data = cbuf_get (ts->write_buf, FRAME_SIZE, &data_size);
150
+		if (data)
151
+			write(ts->output.fd, data, data_size);
152
+	}
153
+
154
+	do { // Flush data
155
+		data = cbuf_get(ts->write_buf, FRAME_SIZE, &data_size);
156
+		if (data)
157
+			write(ts->output.fd, data, data_size);
158
+	} while(data);
159
+
160
+	return NULL;
161
+}
162
+
163
+void process_packets(struct ts *ts, uint8_t *data, ssize_t data_len) {
164
+	ssize_t i;
165
+	for (i=0; i<data_len; i += 188) {
166
+		uint8_t *ts_packet = data + i;
167
+		uint16_t pid = ts_packet_get_pid(ts_packet);
168
+
169
+		ts_pack_shown = 0;
170
+
171
+		process_pat(ts, pid, ts_packet);
172
+		process_cat(ts, pid, ts_packet);
173
+		process_pmt(ts, pid, ts_packet);
174
+		process_emm(ts, pid, ts_packet);
175
+		process_ecm(ts, pid, ts_packet);
176
+
177
+		if (!ts_pack_shown)
178
+			dump_ts_pack(ts, pid, ts_packet);
179
+
180
+		if (ts->threaded) {
181
+			// Add to decode buffer. The decoder thread will handle it
182
+			if (cbuf_fill(ts->decode_buf, ts_packet, 188) != 0) {
183
+				ts_LOGf("Decode buffer is full, waiting...\n");
184
+				cbuf_dump(ts->decode_buf);
185
+				usleep(10000);
186
+			}
187
+		} else {
188
+			decode_packet(ts, ts_packet);
189
+			if (ts->pid_filter) {
190
+				if (pidmap_get(&ts->pidmap, pid)) // PAT or allowed PIDs
191
+					write(ts->output.fd, ts_packet, 188);
192
+			} else {
193
+				write(ts->output.fd, ts_packet, 188);
194
+			}
195
+		}
196
+
197
+		ts_pack++;
198
+	}
199
+}

+ 8
- 0
process.h View File

@@ -0,0 +1,8 @@
1
+#ifndef PROCESS_H
2
+#define PROCESS_H
3
+
4
+void *decode_thread(void *_ts);
5
+void *write_thread(void *_ts);
6
+void process_packets(struct ts *ts, uint8_t *data, ssize_t data_len);
7
+
8
+#endif

+ 21
- 97
tsdecrypt.c View File

@@ -1,4 +1,3 @@
1
-#include <stdio.h>
2 1
 #include <stdlib.h>
3 2
 #include <unistd.h>
4 3
 #include <string.h>
@@ -7,18 +6,13 @@
7 6
 #include <fcntl.h>
8 7
 #include <errno.h>
9 8
 
10
-#include <dvbcsa/dvbcsa.h>
11
-
12
-#include "libfuncs/libfuncs.h"
13
-#include "libts/tsfuncs.h"
14
-
15 9
 #include "data.h"
16 10
 #include "util.h"
17 11
 #include "camd.h"
18
-#include "tables.h"
12
+#include "process.h"
19 13
 #include "udp.h"
20 14
 
21
-void LOG_func(const char *msg) {
15
+static void LOG_func(const char *msg) {
22 16
 	char date[64];
23 17
 	struct tm tm;
24 18
 	time_t now;
@@ -28,7 +22,7 @@ void LOG_func(const char *msg) {
28 22
 	fprintf(stderr, "%s | %s", date, msg);
29 23
 }
30 24
 
31
-void show_help(struct ts *ts) {
25
+static void show_help(struct ts *ts) {
32 26
 	printf("tsdecrypt v1.0\n");
33 27
 	printf("Copyright (c) 2011 Unix Solutions Ltd.\n");
34 28
 	printf("\n");
@@ -96,7 +90,7 @@ static int parse_io_param(struct io *io, char *opt, int open_flags, mode_t open_
96 90
 	return 0;
97 91
 }
98 92
 
99
-void parse_options(struct ts *ts, int argc, char **argv) {
93
+static void parse_options(struct ts *ts, int argc, char **argv) {
100 94
 	int j, ca_err = 0, server_err = 1, input_addr_err = 0, output_addr_err = 0, output_intf_err = 0;
101 95
 	while ((j = getopt(argc, argv, "cFs:I:O:i:t:U:P:epD:h")) != -1) {
102 96
 		char *p = NULL;
@@ -200,89 +194,6 @@ void parse_options(struct ts *ts, int argc, char **argv) {
200 194
 	ts_LOGf("PID filter : %s\n", ts->pid_filter ? "enabled" : "disabled");
201 195
 }
202 196
 
203
-
204
-static unsigned long ts_pack;
205
-static int ts_pack_shown;
206
-
207
-void show_ts_pack(struct ts *ts, uint16_t pid, char *wtf, char *extra, uint8_t *ts_packet) {
208
-	char cw1_dump[8 * 6];
209
-	char cw2_dump[8 * 6];
210
-	if (ts->debug_level >= 4) {
211
-		if (ts_pack_shown)
212
-			return;
213
-		int stype = ts_packet_get_scrambled(ts_packet);
214
-		ts_hex_dump_buf(cw1_dump, 8 * 6, ts->key.cw    , 8, 0);
215
-		ts_hex_dump_buf(cw2_dump, 8 * 6, ts->key.cw + 8, 8, 0);
216
-		fprintf(stderr, "@ %s %s %03x %5ld %7ld | %s   %s | %s\n",
217
-			stype == 0 ? "------" :
218
-			stype == 2 ? "even 0" :
219
-			stype == 3 ? "odd  1" : "??????",
220
-			wtf,
221
-			pid,
222
-			ts_pack, ts_pack * 188,
223
-			cw1_dump, cw2_dump, extra ? extra : wtf);
224
-	}
225
-}
226
-
227
-void dump_ts_pack(struct ts *ts, uint16_t pid, uint8_t *ts_packet) {
228
-	if (pid == 0x010)		show_ts_pack(ts, pid, "nit", NULL, ts_packet);
229
-	else if (pid == 0x11)	show_ts_pack(ts, pid, "sdt", NULL, ts_packet);
230
-	else if (pid == 0x12)	show_ts_pack(ts, pid, "epg", NULL, ts_packet);
231
-	else					show_ts_pack(ts, pid, "---", NULL, ts_packet);
232
-}
233
-
234
-void ts_process_packets(struct ts *ts, uint8_t *data, ssize_t data_len) {
235
-	ssize_t i;
236
-	for (i=0; i<data_len; i += 188) {
237
-		uint8_t *ts_packet = data + i;
238
-		uint16_t pid = ts_packet_get_pid(ts_packet);
239
-
240
-		ts_pack_shown = 0;
241
-
242
-		process_pat(ts, pid, ts_packet);
243
-		process_cat(ts, pid, ts_packet);
244
-		process_pmt(ts, pid, ts_packet);
245
-		process_emm(ts, pid, ts_packet);
246
-		process_ecm(ts, pid, ts_packet);
247
-
248
-		if (!ts_pack_shown)
249
-			dump_ts_pack(ts, pid, ts_packet);
250
-
251
-		int scramble_idx = ts_packet_get_scrambled(ts_packet);
252
-		if (scramble_idx > 1) {
253
-			if (ts->key.is_valid_cw) {
254
-				// scramble_idx 2 == even key
255
-				// scramble_idx 3 == odd key
256
-				ts_packet_set_not_scrambled(ts_packet);
257
-				uint8_t payload_ofs = ts_packet_get_payload_offset(ts_packet);
258
-				dvbcsa_decrypt(ts->key.csakey[scramble_idx - 2], ts_packet + payload_ofs, 188 - payload_ofs);
259
-			} else {
260
-				// Can't decrypt the packet just make it NULL packet
261
-				if (ts->pid_filter)
262
-					ts_packet_set_pid(ts_packet, 0x1fff);
263
-			}
264
-		}
265
-
266
-		ts_pack++;
267
-	}
268
-}
269
-
270
-void ts_write_packets(struct ts *ts, uint8_t *data, ssize_t data_len) {
271
-	ssize_t i;
272
-	for (i=0; i<data_len; i += 188) {
273
-		uint8_t *ts_packet = data + i;
274
-		uint16_t pid = ts_packet_get_pid(ts_packet);
275
-		if (ts->pid_filter) {
276
-			if (pidmap_get(&ts->pidmap, pid)) // PAT or allowed PIDs
277
-				write(ts->output.fd, ts_packet, 188);
278
-		} else {
279
-			write(ts->output.fd, ts_packet, 188);
280
-		}
281
-	}
282
-}
283
-
284
-#define FRAME_SIZE (188 * 7)
285
-
286 197
 int main(int argc, char **argv) {
287 198
 	ssize_t readen;
288 199
 	uint8_t ts_packet[FRAME_SIZE];
@@ -299,17 +210,30 @@ int main(int argc, char **argv) {
299 210
 	if (ts.output.type == NET_IO && udp_connect_output(&ts.output) < 1)
300 211
 		goto EXIT;
301 212
 
213
+	ts.threaded = !(ts.input.type == FILE_IO && ts.input.fd != 0);
214
+
215
+	if (&ts.threaded) {
216
+		pthread_create(&ts.decode_thread, NULL, &decode_thread, &ts);
217
+		pthread_create(&ts.write_thread, NULL , &write_thread , &ts);
218
+	}
219
+
302 220
 	camd_start(&ts);
303 221
 	do {
304 222
 		readen = read(ts.input.fd, ts_packet, FRAME_SIZE);
305
-		if (readen > 0) {
306
-			ts_process_packets(&ts, ts_packet, readen);
307
-			ts_write_packets(&ts, ts_packet, readen);
308
-		}
223
+		if (readen > 0)
224
+			process_packets(&ts, ts_packet, readen);
309 225
 	} while (readen > 0);
310 226
 EXIT:
311 227
 	camd_stop(&ts);
312 228
 
229
+	if (ts.threaded) {
230
+		ts.decode_stop = 1;
231
+		ts.write_stop = 1;
232
+
233
+		pthread_join(ts.decode_thread, NULL);
234
+		pthread_join(ts.write_thread, NULL);
235
+	}
236
+
313 237
 	data_free(&ts);
314 238
 
315 239
 	exit(0);

Loading…
Cancel
Save