Browse Source

Add decode and write circular buffers

Georgi Chorbadzhiyski 13 years ago
parent
commit
988fda9eeb
5 changed files with 343 additions and 1 deletions
  1. 1
    1
      Makefile
  2. 290
    0
      cbuf.c
  3. 34
    0
      cbuf.h
  4. 6
    0
      data.c
  5. 12
    0
      data.h

+ 1
- 1
Makefile View File

@@ -10,7 +10,7 @@ FUNCS_LIB = $(FUNCS_DIR)/libfuncs.a
10 10
 TS_DIR = libts
11 11
 TS_LIB = $(TS_DIR)/libts.a
12 12
 
13
-tsdecrypt_OBJS = data.o udp.o util.o camd.o tables.o tsdecrypt.o $(FUNCS_LIB) $(TS_LIB)
13
+tsdecrypt_OBJS = cbuf.o data.o udp.o util.o camd.o tables.o tsdecrypt.o $(FUNCS_LIB) $(TS_LIB)
14 14
 tsdecrypt_LIBS = -lcrypto -ldvbcsa -lpthread
15 15
 
16 16
 CLEAN_OBJS = tsdecrypt $(tsdecrypt_OBJS) *~

+ 290
- 0
cbuf.c View File

@@ -0,0 +1,290 @@
1
+#include <stdio.h>
2
+#include <stdlib.h>
3
+#include <string.h>
4
+#include <netdb.h>
5
+#include <pthread.h>
6
+#include <assert.h>
7
+
8
+#include "libfuncs/libfuncs.h"
9
+
10
+#include "cbuf.h"
11
+
12
+static void cbuf_lock(CBUF *b) {
13
+	pthread_mutex_lock(b->lock);
14
+}
15
+
16
+static void cbuf_unlock(CBUF *b) {
17
+	pthread_mutex_unlock(b->lock);
18
+}
19
+
20
+/* Returns how much data is filled in the buffer */
21
+int cbuf_free_data_size(CBUF *b) {
22
+	int ret = b->size - (b->input - b->output);
23
+	assert(ret >= 0);
24
+	return ret;
25
+}
26
+
27
+void cbuf_dump(CBUF *b) {
28
+	LOGf("CBUF  [%10s]: size:%d pos:%d writepos:%d input:%llu output:%llu free_data:%d buffered:%lld\n",
29
+		b->name,
30
+		b->size,
31
+		b->pos,
32
+		b->writepos,
33
+		b->input,
34
+		b->output,
35
+		cbuf_free_data_size(b),
36
+		b->input - b->output
37
+	);
38
+/*
39
+	char *z = b->buffer;
40
+	printf("cbuf(%s), dump:", b->name);
41
+	int i;
42
+	for (i=0;i<b->size;i++) {
43
+		printf("%c", z[i]);
44
+	}
45
+	printf("\n\n");
46
+*/
47
+}
48
+
49
+CBUF *cbuf_init(int buffer_size, char *name) {
50
+	CBUF *b = calloc(1, sizeof(CBUF));
51
+	if (!b)
52
+		return NULL;
53
+	if (!buffer_size)
54
+		return 0;
55
+	pthread_mutex_t *mutex = malloc(sizeof(pthread_mutex_t));
56
+	if (pthread_mutex_init(mutex, NULL) != 0) {
57
+		perror("cbuf_new: mutex_init");
58
+		return NULL;
59
+	}
60
+	b->lock     = mutex;
61
+	b->name     = strdup(name);
62
+	b->size     = buffer_size;
63
+	b->pos      = 0;
64
+	b->writepos = 0;
65
+	b->buffer   = calloc(1, buffer_size);
66
+	if (!b->buffer) {
67
+		free(b);
68
+		LOGf("CBUF  [%10s]: Can't allocate buffer size: %d\n", name, buffer_size);
69
+		return NULL;
70
+	}
71
+	return b;
72
+}
73
+
74
+void cbuf_free(CBUF **pb) {
75
+	CBUF *b = *pb;
76
+	if (!b)
77
+		return;
78
+	pthread_mutex_destroy(b->lock);
79
+	FREE(b->lock);
80
+	FREE(b->buffer);
81
+	FREE(b->name);
82
+	FREE(*pb);
83
+}
84
+
85
+// Returns -1 on buffer wrap around
86
+int cbuf_fill(CBUF *b, uint8_t *data, int datasize) {
87
+	int ret = 0;
88
+	cbuf_lock(b);
89
+//	LOGf("  cbuf_fill(%s, '%s', %d)\n", b->name, data, datasize);
90
+//	cbuf_dump(b);
91
+	assert(datasize <= b->size);
92
+	int to_copy = min(datasize, (b->size - b->pos));
93
+	if (!to_copy || !data) {
94
+		LOGf("CBUF [%10s]: Nothing to fill.\n", b->name);
95
+		ret = -2;
96
+		goto OUT;
97
+	}
98
+	if (cbuf_free_data_size(b)-to_copy <= 0) {
99
+//		LOGf("CBUF [%10s]: Buffer will wrap by (%d bytes). Data not filled, consume more!\n", b->name, -(cbuf_free_data_size(b)-to_copy));
100
+//		cbuf_dump(b);
101
+//		b->debug_get = 1;
102
+		ret = -1;
103
+		goto OUT;
104
+	}
105
+	memcpy(b->buffer + b->pos, data, to_copy);
106
+	int copied = to_copy;
107
+	b->pos   += copied; // Move current buffer position
108
+	b->input += copied;
109
+	assert(b->pos <= b->size);
110
+	if (b->pos == b->size) { // Buffer wrap around
111
+		b->pos = 0;
112
+	}
113
+	if (copied < datasize) { // Move the rest
114
+//		Logs when wrapping
115
+//		LOGf("cbuf(%10s) copied < datasize, copied:%d datasize:%d datasize-copied: %d pos:%d\n",
116
+//			b->name, copied, datasize, datasize - copied, b->pos);
117
+//		cbuf_dump(b);
118
+		cbuf_unlock(b);
119
+		ret = cbuf_fill(b, data + copied, datasize - copied);
120
+		goto OUT;
121
+	}
122
+
123
+OUT:
124
+	cbuf_unlock(b);
125
+	return ret;
126
+}
127
+
128
+
129
+
130
+
131
+
132
+/* Returns how much space is left to the end of the buffer */
133
+static int cbuf_size_to_end(CBUF *b) {
134
+	int ret = b->input - b->output;
135
+	if (b->writepos + ret > b->size) {
136
+		ret = b->size - b->writepos;
137
+	}
138
+	return ret;
139
+}
140
+
141
+int cbuf_data_size(CBUF *b) {
142
+	return cbuf_size_to_end(b);
143
+}
144
+
145
+void *cbuf_get(CBUF *b, int size, int *ret_datasize) {
146
+	cbuf_lock(b);
147
+	void *ret = NULL;
148
+	int new_size = min(size, cbuf_size_to_end(b));
149
+	if (b->debug_get) {
150
+		LOGf("1 cbuf_get(%s, %d) new_size: %d size_to_end: %d\n",
151
+				b->name, size, new_size, cbuf_size_to_end(b));
152
+		cbuf_dump(b);
153
+	}
154
+	if (new_size <= 0) { // No data
155
+		*ret_datasize = 0;
156
+		ret = NULL;
157
+		goto OUT;
158
+	}
159
+	*ret_datasize = new_size;
160
+	ret = b->buffer + b->writepos;
161
+	b->writepos += new_size; // Move writepos
162
+	b->output   += new_size;
163
+	if (b->writepos > b->size) {
164
+		LOGf("!!! b->writepos > b->size !!! size:%d new_size:%d\n", size, new_size);
165
+		cbuf_dump(b);
166
+		assert(b->writepos <= b->size);
167
+	}
168
+	if (b->writepos == b->size) // Buffer wraparound
169
+		b->writepos = 0;
170
+
171
+OUT:
172
+	if (b->debug_get) {
173
+		LOGf("2 cbuf_get(%s, %d) new_size: %d size_to_end: %d ret_sz:%d\n",
174
+				b->name, size, new_size, cbuf_size_to_end(b), *ret_datasize);
175
+		cbuf_dump(b);
176
+		b->debug_get = 0;
177
+	}
178
+	cbuf_unlock(b);
179
+	return ret;
180
+}
181
+
182
+void *cbuf_peek(CBUF *b, int size, int *ret_datasize) {
183
+	cbuf_lock(b);
184
+	void *ret = NULL;
185
+	int new_size = min(size, cbuf_size_to_end(b));
186
+
187
+	if (new_size <= 0) { // No data
188
+		*ret_datasize = 0;
189
+		ret = NULL;
190
+		goto OUT;
191
+	}
192
+	*ret_datasize = new_size;
193
+	ret = b->buffer + b->writepos;
194
+
195
+OUT:
196
+	cbuf_unlock(b);
197
+	return ret;
198
+}
199
+
200
+void cbuf_copy(CBUF *from, CBUF *to) {
201
+//	LOGf("cbuf_copy(%s, %s)\n", from->name, to->name);
202
+	int data_size;
203
+	void *data;
204
+	do {
205
+		data = cbuf_get(from, from->input - from->output, &data_size);
206
+		if (from->debug_get)
207
+			LOGf("copied from %s to %s size=%d\n", from->name, to->name, data_size);
208
+		if (!data || data_size <= 0)
209
+			break;
210
+		cbuf_fill(to, data, data_size);
211
+	} while (1);
212
+}
213
+
214
+void cbuf_poison(CBUF *b, char poison_byte) {
215
+	memset(b->buffer, poison_byte, b->size);
216
+}
217
+
218
+
219
+/*
220
+void consume(CBUF *b, int size) {
221
+	int data_size, i;
222
+	char *data = cbuf_get(b, size, &data_size);
223
+	if (data && data_size > 0) {
224
+		printf("Consumed %d Data: \"", data_size);
225
+		for (i=0;i<data_size;i++) {
226
+			printf("%c", data[i]);
227
+		}
228
+		printf("\"\n");
229
+	} else {
230
+		printf("%s", "There is nothing to consume!\n");
231
+	}
232
+}
233
+
234
+void cbuf_test() {
235
+	CBUF *in;
236
+
237
+	CBUF *out;
238
+	out = cbuf_init(64, "out");
239
+	cbuf_poison(out, 'O');
240
+	cbuf_dump(out);
241
+
242
+	in = cbuf_init(4, "in");
243
+	cbuf_poison(in, '*');
244
+
245
+	cbuf_fill(in, "12", 2);
246
+	cbuf_fill(in, "34", 2);
247
+	cbuf_fill(in, "z" , 1);
248
+	cbuf_dump(in);
249
+
250
+	cbuf_copy(in, out);
251
+	cbuf_dump(out);
252
+	consume(in, 16);
253
+	cbuf_dump(in);
254
+
255
+	cbuf_fill(in, "a", 1);
256
+	cbuf_fill(in, "b", 1);
257
+	cbuf_fill(in, "c", 1);
258
+	cbuf_fill(in, "d", 1);
259
+	cbuf_dump(in);
260
+
261
+	cbuf_copy(in, out);
262
+	cbuf_dump(out);
263
+
264
+	consume(in, 4);
265
+	cbuf_dump(in);
266
+
267
+	cbuf_fill(in, "gfgf", 4);
268
+	cbuf_dump(in);
269
+
270
+	consume(in, 4);
271
+	cbuf_dump(in);
272
+
273
+	cbuf_fill(in, "r", 1);
274
+	cbuf_fill(in, "r", 1);
275
+	cbuf_fill(in, "r", 1);
276
+	cbuf_fill(in, "r", 1);
277
+	cbuf_dump(in);
278
+
279
+	consume(out, 6);
280
+	cbuf_copy(in, out);
281
+	consume(out, 6);
282
+	consume(out, 6);
283
+	consume(out, 6);
284
+	consume(out, 6);
285
+
286
+	cbuf_free(in);
287
+	cbuf_free(out);
288
+
289
+}
290
+*/

+ 34
- 0
cbuf.h View File

@@ -0,0 +1,34 @@
1
+#ifndef CBUF_H
2
+#define CBUF_H
3
+
4
+#include <netdb.h>
5
+
6
+// Circular buffer
7
+typedef struct {
8
+	pthread_mutex_t *lock;
9
+	char *name;
10
+	int size;			/* Buffer size, must be (bufsize % 1316) == 0 */
11
+	int pos;			/* Up to where the buffer is filled */
12
+	int writepos;		/* Up to where the buffer is get */
13
+	void *buffer;		/* The buffer data */
14
+	uint64_t input;
15
+	uint64_t output;
16
+	int pos_wrapped;
17
+	int debug_get;
18
+} CBUF;
19
+
20
+CBUF *cbuf_init(int buffer_size, char *name);
21
+void cbuf_free(CBUF **buffer);
22
+
23
+int  cbuf_fill(CBUF *b, uint8_t *data, int datasize);
24
+void *cbuf_get(CBUF *b, int size, int *ret_datasize);
25
+void *cbuf_peek(CBUF *b, int size, int *ret_datasize);
26
+void cbuf_copy(CBUF *src, CBUF *dest);
27
+
28
+int cbuf_data_size(CBUF *b);
29
+
30
+void cbuf_poison(CBUF *b, char poison_byte);
31
+
32
+void cbuf_dump(CBUF *b);
33
+
34
+#endif

+ 6
- 0
data.c View File

@@ -52,6 +52,9 @@ void data_init(struct ts *ts) {
52 52
 	ts->output.fd   = 1; // STDOUT
53 53
 	ts->output.type = FILE_IO;
54 54
 	ts->output.ttl  = 1;
55
+
56
+	ts->decode_buf  = cbuf_init((7 * dvbcsa_bs_batch_size() * 188) * 2, "decode");
57
+	ts->write_buf   = cbuf_init((7 * dvbcsa_bs_batch_size() * 188) * 2, "write");
55 58
 }
56 59
 
57 60
 void data_free(struct ts *ts) {
@@ -72,6 +75,9 @@ void data_free(struct ts *ts) {
72 75
 	dvbcsa_bs_key_free(ts->key.bs_csakey[0]);
73 76
 	dvbcsa_bs_key_free(ts->key.bs_csakey[1]);
74 77
 
78
+	cbuf_free(&ts->decode_buf);
79
+	cbuf_free(&ts->write_buf);
80
+
75 81
 	FREE(ts->input.fname);
76 82
 	FREE(ts->output.fname);
77 83
 }

+ 12
- 0
data.h View File

@@ -11,6 +11,8 @@
11 11
 #include "libfuncs/libfuncs.h"
12 12
 #include "libts/tsfuncs.h"
13 13
 
14
+#include "cbuf.h"
15
+
14 16
 struct key {
15 17
 	uint8_t				cw[16];
16 18
 	int					is_valid_cw;
@@ -90,6 +92,16 @@ struct ts {
90 92
 
91 93
 	int					camd_stop;
92 94
 	int					is_cw_error;
95
+
96
+	int					threaded;
97
+
98
+	int					decode_stop;
99
+	pthread_t			decode_thread;
100
+	CBUF				*decode_buf;
101
+
102
+	int					write_stop;
103
+	pthread_t			write_thread;
104
+	CBUF				*write_buf;
93 105
 };
94 106
 
95 107
 enum msg_type { EMM_MSG, ECM_MSG };

Loading…
Cancel
Save