Browse Source

Add circullar (cbuf) buffer code.

Georgi Chorbadzhiyski 12 years ago
parent
commit
47cde0cdca
4 changed files with 325 additions and 1 deletions
  1. 1
    1
      Makefile
  2. 289
    0
      cbuf.c
  3. 34
    0
      cbuf.h
  4. 1
    0
      libfuncs.h

+ 1
- 1
Makefile View File

@@ -5,7 +5,7 @@ CFLAGS = -ggdb -Wall -Wextra -Wshadow -Wformat-security -O2
5 5
 RM = /bin/rm -f
6 6
 Q=@
7 7
 
8
-OBJS = queue.o list.o io.o log.o http_response.o asyncdns.o server.o misc.o
8
+OBJS = queue.o list.o cbuf.o io.o log.o http_response.o asyncdns.o server.o misc.o
9 9
 PROG = libfuncs.a
10 10
 
11 11
 all: $(PROG)

+ 289
- 0
cbuf.c View File

@@ -0,0 +1,289 @@
1
+#include <stdio.h>
2
+#include <stdlib.h>
3
+#include <string.h>
4
+#include <netdb.h>
5
+#include <pthread.h>
6
+#include <assert.h>
7
+
8
+#include "libfuncs.h"
9
+#include "cbuf.h"
10
+
11
+static void cbuf_lock(CBUF *b) {
12
+	pthread_mutex_lock(b->lock);
13
+}
14
+
15
+static void cbuf_unlock(CBUF *b) {
16
+	pthread_mutex_unlock(b->lock);
17
+}
18
+
19
+/* Returns how much data is filled in the buffer */
20
+int cbuf_free_data_size(CBUF *b) {
21
+	int ret = b->size - (b->input - b->output);
22
+	assert(ret >= 0);
23
+	return ret;
24
+}
25
+
26
+void cbuf_dump(CBUF *b) {
27
+	LOGf("CBUF  [%10s]: size:%d pos:%d writepos:%d input:%llu output:%llu free_data:%d buffered:%lld\n",
28
+		b->name,
29
+		b->size,
30
+		b->pos,
31
+		b->writepos,
32
+		b->input,
33
+		b->output,
34
+		cbuf_free_data_size(b),
35
+		b->input - b->output
36
+	);
37
+/*
38
+	char *z = b->buffer;
39
+	printf("cbuf(%s), dump:", b->name);
40
+	int i;
41
+	for (i=0;i<b->size;i++) {
42
+		printf("%c", z[i]);
43
+	}
44
+	printf("\n\n");
45
+*/
46
+}
47
+
48
+CBUF *cbuf_init(int buffer_size, char *name) {
49
+	CBUF *b = calloc(1, sizeof(CBUF));
50
+	if (!b)
51
+		return NULL;
52
+	if (!buffer_size)
53
+		return 0;
54
+	pthread_mutex_t *mutex = malloc(sizeof(pthread_mutex_t));
55
+	if (pthread_mutex_init(mutex, NULL) != 0) {
56
+		perror("cbuf_new: mutex_init");
57
+		return NULL;
58
+	}
59
+	b->lock     = mutex;
60
+	b->name     = strdup(name);
61
+	b->size     = buffer_size;
62
+	b->pos      = 0;
63
+	b->writepos = 0;
64
+	b->buffer   = calloc(1, buffer_size);
65
+	if (!b->buffer) {
66
+		free(b);
67
+		LOGf("CBUF  [%10s]: Can't allocate buffer size: %d\n", name, buffer_size);
68
+		return NULL;
69
+	}
70
+	return b;
71
+}
72
+
73
+void cbuf_free(CBUF **pb) {
74
+	CBUF *b = *pb;
75
+	if (!b)
76
+		return;
77
+	pthread_mutex_destroy(b->lock);
78
+	FREE(b->lock);
79
+	FREE(b->buffer);
80
+	FREE(b->name);
81
+	FREE(*pb);
82
+}
83
+
84
+// Returns -1 on buffer wrap around
85
+int cbuf_fill(CBUF *b, uint8_t *data, int datasize) {
86
+	int ret = 0;
87
+	cbuf_lock(b);
88
+//	LOGf("  cbuf_fill(%s, '%s', %d)\n", b->name, data, datasize);
89
+//	cbuf_dump(b);
90
+	assert(datasize <= b->size);
91
+	int to_copy = min(datasize, (b->size - b->pos));
92
+	if (!to_copy || !data) {
93
+		LOGf("CBUF [%10s]: Nothing to fill.\n", b->name);
94
+		ret = -2;
95
+		goto OUT;
96
+	}
97
+	if (cbuf_free_data_size(b)-to_copy <= 0) {
98
+//		LOGf("CBUF [%10s]: Buffer will wrap by (%d bytes). Data not filled, consume more!\n", b->name, -(cbuf_free_data_size(b)-to_copy));
99
+//		cbuf_dump(b);
100
+//		b->debug_get = 1;
101
+		ret = -1;
102
+		goto OUT;
103
+	}
104
+	memcpy(b->buffer + b->pos, data, to_copy);
105
+	int copied = to_copy;
106
+	b->pos   += copied; // Move current buffer position
107
+	b->input += copied;
108
+	assert(b->pos <= b->size);
109
+	if (b->pos == b->size) { // Buffer wrap around
110
+		b->pos = 0;
111
+	}
112
+	if (copied < datasize) { // Move the rest
113
+//		Logs when wrapping
114
+//		LOGf("cbuf(%10s) copied < datasize, copied:%d datasize:%d datasize-copied: %d pos:%d\n",
115
+//			b->name, copied, datasize, datasize - copied, b->pos);
116
+//		cbuf_dump(b);
117
+		cbuf_unlock(b);
118
+		ret = cbuf_fill(b, data + copied, datasize - copied);
119
+		goto OUT;
120
+	}
121
+
122
+OUT:
123
+	cbuf_unlock(b);
124
+	return ret;
125
+}
126
+
127
+
128
+
129
+
130
+
131
+/* Returns how much space is left to the end of the buffer */
132
+static int cbuf_size_to_end(CBUF *b) {
133
+	int ret = b->input - b->output;
134
+	if (b->writepos + ret > b->size) {
135
+		ret = b->size - b->writepos;
136
+	}
137
+	return ret;
138
+}
139
+
140
+int cbuf_data_size(CBUF *b) {
141
+	return cbuf_size_to_end(b);
142
+}
143
+
144
+void *cbuf_get(CBUF *b, int size, int *ret_datasize) {
145
+	cbuf_lock(b);
146
+	void *ret = NULL;
147
+	int new_size = min(size, cbuf_size_to_end(b));
148
+	if (b->debug_get) {
149
+		LOGf("1 cbuf_get(%s, %d) new_size: %d size_to_end: %d\n",
150
+				b->name, size, new_size, cbuf_size_to_end(b));
151
+		cbuf_dump(b);
152
+	}
153
+	if (new_size <= 0) { // No data
154
+		*ret_datasize = 0;
155
+		ret = NULL;
156
+		goto OUT;
157
+	}
158
+	*ret_datasize = new_size;
159
+	ret = b->buffer + b->writepos;
160
+	b->writepos += new_size; // Move writepos
161
+	b->output   += new_size;
162
+	if (b->writepos > b->size) {
163
+		LOGf("!!! b->writepos > b->size !!! size:%d new_size:%d\n", size, new_size);
164
+		cbuf_dump(b);
165
+		assert(b->writepos <= b->size);
166
+	}
167
+	if (b->writepos == b->size) // Buffer wraparound
168
+		b->writepos = 0;
169
+
170
+OUT:
171
+	if (b->debug_get) {
172
+		LOGf("2 cbuf_get(%s, %d) new_size: %d size_to_end: %d ret_sz:%d\n",
173
+				b->name, size, new_size, cbuf_size_to_end(b), *ret_datasize);
174
+		cbuf_dump(b);
175
+		b->debug_get = 0;
176
+	}
177
+	cbuf_unlock(b);
178
+	return ret;
179
+}
180
+
181
+void *cbuf_peek(CBUF *b, int size, int *ret_datasize) {
182
+	cbuf_lock(b);
183
+	void *ret = NULL;
184
+	int new_size = min(size, cbuf_size_to_end(b));
185
+
186
+	if (new_size <= 0) { // No data
187
+		*ret_datasize = 0;
188
+		ret = NULL;
189
+		goto OUT;
190
+	}
191
+	*ret_datasize = new_size;
192
+	ret = b->buffer + b->writepos;
193
+
194
+OUT:
195
+	cbuf_unlock(b);
196
+	return ret;
197
+}
198
+
199
+void cbuf_copy(CBUF *from, CBUF *to) {
200
+//	LOGf("cbuf_copy(%s, %s)\n", from->name, to->name);
201
+	int data_size;
202
+	void *data;
203
+	do {
204
+		data = cbuf_get(from, from->input - from->output, &data_size);
205
+		if (from->debug_get)
206
+			LOGf("copied from %s to %s size=%d\n", from->name, to->name, data_size);
207
+		if (!data || data_size <= 0)
208
+			break;
209
+		cbuf_fill(to, data, data_size);
210
+	} while (1);
211
+}
212
+
213
+void cbuf_poison(CBUF *b, char poison_byte) {
214
+	memset(b->buffer, poison_byte, b->size);
215
+}
216
+
217
+
218
+/*
219
+void consume(CBUF *b, int size) {
220
+	int data_size, i;
221
+	char *data = cbuf_get(b, size, &data_size);
222
+	if (data && data_size > 0) {
223
+		printf("Consumed %d Data: \"", data_size);
224
+		for (i=0;i<data_size;i++) {
225
+			printf("%c", data[i]);
226
+		}
227
+		printf("\"\n");
228
+	} else {
229
+		printf("%s", "There is nothing to consume!\n");
230
+	}
231
+}
232
+
233
+void cbuf_test() {
234
+	CBUF *in;
235
+
236
+	CBUF *out;
237
+	out = cbuf_init(64, "out");
238
+	cbuf_poison(out, 'O');
239
+	cbuf_dump(out);
240
+
241
+	in = cbuf_init(4, "in");
242
+	cbuf_poison(in, '*');
243
+
244
+	cbuf_fill(in, "12", 2);
245
+	cbuf_fill(in, "34", 2);
246
+	cbuf_fill(in, "z" , 1);
247
+	cbuf_dump(in);
248
+
249
+	cbuf_copy(in, out);
250
+	cbuf_dump(out);
251
+	consume(in, 16);
252
+	cbuf_dump(in);
253
+
254
+	cbuf_fill(in, "a", 1);
255
+	cbuf_fill(in, "b", 1);
256
+	cbuf_fill(in, "c", 1);
257
+	cbuf_fill(in, "d", 1);
258
+	cbuf_dump(in);
259
+
260
+	cbuf_copy(in, out);
261
+	cbuf_dump(out);
262
+
263
+	consume(in, 4);
264
+	cbuf_dump(in);
265
+
266
+	cbuf_fill(in, "gfgf", 4);
267
+	cbuf_dump(in);
268
+
269
+	consume(in, 4);
270
+	cbuf_dump(in);
271
+
272
+	cbuf_fill(in, "r", 1);
273
+	cbuf_fill(in, "r", 1);
274
+	cbuf_fill(in, "r", 1);
275
+	cbuf_fill(in, "r", 1);
276
+	cbuf_dump(in);
277
+
278
+	consume(out, 6);
279
+	cbuf_copy(in, out);
280
+	consume(out, 6);
281
+	consume(out, 6);
282
+	consume(out, 6);
283
+	consume(out, 6);
284
+
285
+	cbuf_free(in);
286
+	cbuf_free(out);
287
+
288
+}
289
+*/

+ 34
- 0
cbuf.h View File

@@ -0,0 +1,34 @@
1
+#ifndef CBUF_H
2
+#define CBUF_H
3
+
4
+#include <netdb.h>
5
+
6
+// Circular buffer
7
+typedef struct {
8
+	pthread_mutex_t *lock;
9
+	char *name;
10
+	int size;			/* Buffer size, must be (bufsize % 1316) == 0 */
11
+	int pos;			/* Up to where the buffer is filled */
12
+	int writepos;		/* Up to where the buffer is get */
13
+	void *buffer;		/* The buffer data */
14
+	uint64_t input;
15
+	uint64_t output;
16
+	int pos_wrapped;
17
+	int debug_get;
18
+} CBUF;
19
+
20
+CBUF *cbuf_init(int buffer_size, char *name);
21
+void cbuf_free(CBUF **buffer);
22
+
23
+int  cbuf_fill(CBUF *b, uint8_t *data, int datasize);
24
+void *cbuf_get(CBUF *b, int size, int *ret_datasize);
25
+void *cbuf_peek(CBUF *b, int size, int *ret_datasize);
26
+void cbuf_copy(CBUF *src, CBUF *dest);
27
+
28
+int cbuf_data_size(CBUF *b);
29
+
30
+void cbuf_poison(CBUF *b, char poison_byte);
31
+
32
+void cbuf_dump(CBUF *b);
33
+
34
+#endif

+ 1
- 0
libfuncs.h View File

@@ -37,6 +37,7 @@
37 37
 #include "list.h"
38 38
 #include "log.h"
39 39
 #include "queue.h"
40
+#include "cbuf.h"
40 41
 #include "server.h"
41 42
 #include "misc.h"
42 43
 

Loading…
Cancel
Save