Browse Source

Add web status, reconnect, reload and getconfig commands

Georgi Chorbadzhiyski 7 years ago
parent
commit
621a0a9a62
6 changed files with 195 additions and 47 deletions
  1. 4
    1
      Makefile
  2. 46
    0
      config.h
  3. 55
    44
      tomcast.c
  4. 81
    2
      web_pages.c
  5. 3
    0
      web_pages.h
  6. 6
    0
      web_server.c

+ 4
- 1
Makefile View File

@@ -1,7 +1,10 @@
1 1
 CC = cc
2 2
 STRIP = strip
3 3
 CROSS := $(TARGET)
4
-CFLAGS = -ggdb -Wall -Wextra -Wshadow -Wformat-security -Wno-strict-aliasing -O2
4
+CFLAGS := -O2 -ggdb -pipe \
5
+ -W -Wall -Wextra \
6
+ -Wshadow -Wformat-security -Wstrict-prototypes \
7
+ -Wredundant-decls -Wold-style-definition
5 8
 RM = /bin/rm -f
6 9
 Q = @
7 10
 

+ 46
- 0
config.h View File

@@ -24,6 +24,48 @@
24 24
 
25 25
 #include "libfuncs/libfuncs.h"
26 26
 
27
+typedef enum { udp_sock, tcp_sock } channel_source;
28
+
29
+typedef struct {
30
+	channel_source sproto;
31
+	char *proto;
32
+	char *host;
33
+	char *path;
34
+	unsigned int port;
35
+} CHANSRC;
36
+
37
+#define MAX_CHANNEL_SOURCES 8
38
+
39
+typedef struct {
40
+	char *name;
41
+	char *source; /* Full source url */
42
+	char *sources[MAX_CHANNEL_SOURCES];
43
+	uint8_t num_src;
44
+	uint8_t curr_src;
45
+	char *dest_host;
46
+	int dest_port;
47
+} CHANNEL;
48
+
49
+typedef struct {
50
+	char  *name;
51
+	CHANNEL *channel;
52
+	int sock;				/* Server socket */
53
+	struct sockaddr_in src_sockname;
54
+	int clientsock;			/* The udp socket */
55
+	struct sockaddr_in dst_sockname;
56
+	int reconnect:1,		/* Set to 1 to force proxy reconnect */
57
+	    connected:1,		/* It's set to 1 when proxy is connected and serving clients */
58
+	    dienow:1,			/* Stop serving clients and exit now */
59
+	    freechannel:1;		/* Free channel data on object free (this is used in chanconf) */
60
+	int cookie;				/* Used in chanconf to determine if the restreamer is alrady checked */
61
+	pthread_t thread;
62
+	pthread_rwlock_t lock;
63
+	time_t conn_ts;
64
+	uint64_t read_bytes;
65
+	char status[64];
66
+} RESTREAMER;
67
+
68
+
27 69
 struct config {
28 70
 	char				*ident;
29 71
 	char				*pidfile;
@@ -47,4 +89,8 @@ struct config {
47 89
 	pthread_mutex_t		channels_lock;
48 90
 };
49 91
 
92
+extern void do_reconnect();
93
+extern void do_reconf();
94
+extern struct config *get_config(void);
95
+
50 96
 #endif

+ 55
- 44
tomcast.c View File

@@ -71,43 +71,6 @@ char *server_sig = "tomcast";
71 71
 char *server_ver = "1.15";
72 72
 char *copyright  = "Copyright (C) 2010-2013 Unix Solutions Ltd.";
73 73
 
74
-typedef enum { udp_sock, tcp_sock } channel_source;
75
-
76
-typedef struct {
77
-	channel_source sproto;
78
-	char *proto;
79
-	char *host;
80
-	char *path;
81
-	unsigned int port;
82
-} CHANSRC;
83
-
84
-#define MAX_CHANNEL_SOURCES 8
85
-
86
-typedef struct {
87
-	char *name;
88
-	char *source; /* Full source url */
89
-	char *sources[MAX_CHANNEL_SOURCES];
90
-	uint8_t num_src;
91
-	uint8_t curr_src;
92
-	char *dest_host;
93
-	int dest_port;
94
-} CHANNEL;
95
-
96
-typedef struct {
97
-	char  *name;
98
-	CHANNEL *channel;
99
-	int sock;				/* Server socket */
100
-	struct sockaddr_in src_sockname;
101
-	int clientsock;			/* The udp socket */
102
-	struct sockaddr_in dst_sockname;
103
-	int reconnect:1,		/* Set to 1 to force proxy reconnect */
104
-	    connected:1,		/* It's set to 1 when proxy is connected and serving clients */
105
-	    dienow:1,			/* Stop serving clients and exit now */
106
-	    freechannel:1;		/* Free channel data on object free (this is used in chanconf) */
107
-	int cookie;				/* Used in chanconf to determine if the restreamer is alrady checked */
108
-	pthread_t thread;
109
-} RESTREAMER;
110
-
111 74
 static struct config config;
112 75
 
113 76
 channel_source get_sproto(char *url) {
@@ -254,6 +217,11 @@ int connect_multicast(struct sockaddr_in send_to) {
254 217
 	return sendsock;
255 218
 }
256 219
 
220
+void proxy_set_status(RESTREAMER *r, const char *proxy_status) {
221
+	pthread_rwlock_wrlock(&r->lock);
222
+	snprintf(r->status, sizeof(r->status), "%s", proxy_status);
223
+	pthread_rwlock_unlock(&r->lock);
224
+}
257 225
 
258 226
 void connect_destination(RESTREAMER *r) {
259 227
 	CHANNEL *c = r->channel;
@@ -261,6 +229,7 @@ void connect_destination(RESTREAMER *r) {
261 229
 		shutdown_fd(&(r->clientsock));
262 230
 	r->clientsock = connect_multicast(r->dst_sockname);
263 231
 	LOGf("CONN : Connected dst_fd: %i | Chan: %s Dest: udp://%s:%d\n", r->clientsock, c->name, c->dest_host, c->dest_port);
232
+	proxy_set_status(r, "Connected udp");
264 233
 }
265 234
 
266 235
 RESTREAMER * new_restreamer(const char *name, CHANNEL *channel) {
@@ -280,6 +249,7 @@ RESTREAMER * new_restreamer(const char *name, CHANNEL *channel) {
280 249
 	r->channel = channel;
281 250
 	r->clientsock = -1;
282 251
 	r->dst_sockname = sockname;
252
+	pthread_rwlock_init(&r->lock, NULL);
283 253
 	connect_destination(r);
284 254
 	return r;
285 255
 }
@@ -489,10 +459,14 @@ int connect_source(RESTREAMER *r, int retries, int readbuflen, int *http_code) {
489 459
 	int active = 1;
490 460
 	int dret = async_resolve_host(src->host, src->port, &(r->src_sockname), DNS_RESOLVER_TIMEOUT, &active);
491 461
 	if (dret != 0) {
492
-		if (dret == 1)
462
+		if (dret == 1) {
493 463
 			proxy_log(r, "ERR  ","Can't resolve src host");
494
-		if (dret == 2)
464
+			proxy_set_status(r, "ERROR: Can not resolve source host");
465
+		}
466
+		if (dret == 2) {
495 467
 			proxy_log(r, "ERR  ","Timeout resolving src host");
468
+			proxy_set_status(r, "ERROR: Dns resolve timeout");
469
+		}
496 470
 		DO_RECONNECT;
497 471
 	}
498 472
 
@@ -507,6 +481,7 @@ int connect_source(RESTREAMER *r, int retries, int readbuflen, int *http_code) {
507 481
 		proxy_log(r, "NEW  ","");
508 482
 		if (do_connect(r->sock, (struct sockaddr *)&(r->src_sockname), sizeof(r->src_sockname), PROXY_CONNECT_TIMEOUT) < 0) {
509 483
 			LOGf("ERR  : Error connecting to %s srv_fd: %i err: %s\n", r->channel->source, r->sock, strerror(errno));
484
+			proxy_set_status(r, "ERROR: Can not connect to source");
510 485
 			DO_RECONNECT;
511 486
 		}
512 487
 
@@ -544,14 +519,17 @@ int connect_source(RESTREAMER *r, int retries, int readbuflen, int *http_code) {
544 519
 		}
545 520
 		if (*http_code == 0) { // No valid HTTP response, retry
546 521
 			LOGf("DEBUG: Server returned not valid HTTP code | srv_fd: %i\n", r->sock);
522
+			proxy_set_status(r, "ERROR: Source returned invalid HTTP code");
547 523
 			DO_RECONNECT;
548 524
 		}
549 525
 		if (*http_code == 504) { // No signal, exit
550 526
 			LOGf("ERR  : Get no-signal for %s from %s on srv_fd: %i\n", r->channel->name, r->channel->source, r->sock);
527
+			proxy_set_status(r, "ERROR: Source returned no-signal");
551 528
 			FATAL_ERROR;
552 529
 		}
553 530
 		if (*http_code > 300) { // Unhandled or error codes, exit
554 531
 			LOGf("ERR  : Get code %i for %s from %s on srv_fd: %i exiting.\n", *http_code, r->channel->name, r->channel->source, r->sock);
532
+			proxy_set_status(r, "ERROR: Source returned unhandled error code");
555 533
 			FATAL_ERROR;
556 534
 		}
557 535
 		// connected ok, continue
@@ -592,6 +570,7 @@ int connect_source(RESTREAMER *r, int retries, int readbuflen, int *http_code) {
592 570
 	if (setsockopt(r->sock, SOL_SOCKET, SO_RCVBUF, (const char *)&readbuflen, sizeof(readbuflen)) < 0)
593 571
 		log_perror("play(): setsockopt(SO_RCVBUF)", errno);
594 572
 
573
+	proxy_set_status(r, "Connected");
595 574
 	r->connected = 1;
596 575
 
597 576
 	free_chansrc(src);
@@ -601,10 +580,12 @@ int connect_source(RESTREAMER *r, int retries, int readbuflen, int *http_code) {
601 580
 int check_restreamer_state(RESTREAMER *r) {
602 581
 	if (r->dienow) {
603 582
 		// LOGf("PROXY: Forced disconnect on srv_fd: %i | Channel: %s Source: %s\n", r->sock, r->channel->name, r->channel->source);
583
+		proxy_set_status(r, "Dying");
604 584
 		return 2;
605 585
 	}
606 586
 	if (r->reconnect) {
607 587
 		LOGf("PROXY: Forced reconnect on srv_fd: %i | Channel: %s Source: %s\n", r->sock, r->channel->name, r->channel->source);
588
+		proxy_set_status(r, "Forced reconnect");
608 589
 		return 1;
609 590
 	}
610 591
 	return 0;
@@ -628,7 +609,7 @@ int check_restreamer_state(RESTREAMER *r) {
628 609
 		0 = synced ok
629 610
 		1 = not synced, reconnect
630 611
 */
631
-int mpeg_sync(int proxysock, char *channel, channel_source source_proto) {
612
+int mpeg_sync(RESTREAMER *r, int proxysock, char *channel, channel_source source_proto) {
632 613
 	time_t sync_start = time(NULL);
633 614
 	unsigned int sync_packets = 0;
634 615
 	unsigned int read_bytes = 0;
@@ -644,6 +625,7 @@ int mpeg_sync(int proxysock, char *channel, channel_source source_proto) {
644 625
 resync:
645 626
 		if (fdread_ex(proxysock, syncframe, 1, _timeout, _retries, 1) != 1) {
646 627
 			LOGf("DEBUG: mpeg_sync fdread() timeout | Channel: %s\n", channel);
628
+			proxy_set_status(r, "ERROR: fdread() timeout while syncing mpeg");
647 629
 			return 1; // reconnect
648 630
 		}
649 631
 		// LOGf("DEBUG:     Read 0x%02x Offset %u Sync: %u\n", (uint8_t)syncframe[0], read_bytes, sync_packets);
@@ -652,6 +634,7 @@ resync:
652 634
 			ssize_t rdsz = fdread_ex(proxysock, syncframe, 188-1, _timeout, _retries, 1);
653 635
 			if (rdsz != 188-1) {
654 636
 				LOGf("DEBUG: mpeg_sync fdread() timeout | Channel: %s\n", channel);
637
+				proxy_set_status(r, "ERROR: fdread() timeout while syncing mpeg");
655 638
 				return 1; // reconnect
656 639
 			}
657 640
 			read_bytes += 188-1;
@@ -663,14 +646,21 @@ resync:
663 646
 		}
664 647
 		if (read_bytes > FRAME_PACKET_SIZE) { // Can't sync in 1316 bytes
665 648
 			LOGf("DEBUG: Can't sync after %d bytes | Channel: %s\n", FRAME_PACKET_SIZE, channel);
649
+			proxy_set_status(r, "ERROR: Can not sync mpeg");
666 650
 			return 1; // reconnect
667 651
 		}
668 652
 		if (sync_start+2 <= time(NULL)) { // Do not sync in two seconds
669 653
 			LOGf("DEBUG: Timeout while syncing (read %u bytes) | Channel: %s\n", read_bytes, channel);
654
+			proxy_set_status(r, "ERROR: Timeout while syncing mpeg");
670 655
 			return 1; // reconnect
671 656
 		}
672 657
 	} while (1);
658
+	pthread_rwlock_wrlock(&r->lock);
659
+	r->conn_ts = time(NULL);
660
+	r->read_bytes = read_bytes;
661
+	pthread_rwlock_unlock(&r->lock);
673 662
 	LOGf("SYNC : TS synced after %u bytes | Channel: %s\n", read_bytes-FRAME_PACKET_SIZE, channel);
663
+	proxy_set_status(r, "Synced");
674 664
 	return 0;
675 665
 }
676 666
 
@@ -795,13 +785,16 @@ void * proxy_ts_stream(void *self) {
795 785
 
796 786
 	int http_code = 0;
797 787
 	while (1) {
788
+		r->conn_ts = 0;
789
+		r->read_bytes = 0;
790
+
798 791
 		int result = connect_source(self, 1, FRAME_PACKET_SIZE * 1000, &http_code);
799 792
 		if (result > 0)
800 793
 			goto RECONNECT;
801 794
 
802 795
 		channel_source sproto = get_sproto(r->channel->source);
803 796
 
804
-		int mpgsync = mpeg_sync(r->sock, r->channel->name, sproto);
797
+		int mpgsync = mpeg_sync(r, r->sock, r->channel->name, sproto);
805 798
 		if (mpgsync == 1) // Timeout
806 799
 			goto RECONNECT;
807 800
 
@@ -824,6 +817,7 @@ void * proxy_ts_stream(void *self) {
824 817
 				LOGf("PROXY: zero read on srv_fd: %i | Channel: %s Source: %s\n", r->sock, r->channel->name, r->channel->source);
825 818
 				if (--max_zero_reads == 0) {
826 819
 					LOGf("PROXY: %d zero reads on srv_fd: %i | Channel: %s Source: %s\n", MAX_ZERO_READS, r->sock, r->channel->name, r->channel->source);
820
+					proxy_set_status(r, "ERROR: Too many zero reads");
827 821
 					break;
828 822
 				}
829 823
 				continue;
@@ -836,6 +830,9 @@ void * proxy_ts_stream(void *self) {
836 830
 				//LOGf("DEBUG: Short read (%d) on retreamer srv_fd: %i | Channel: %s\n", readen, sock, chan->name);
837 831
 				memcpy(buf+readen, TS_NULL_FRAME+readen, FRAME_PACKET_SIZE - readen);
838 832
 			}
833
+			pthread_rwlock_wrlock(&r->lock);
834
+			r->read_bytes += readen;
835
+			pthread_rwlock_unlock(&r->lock);
839 836
 
840 837
 			if (send_reset) {
841 838
 				send_reset = 0;
@@ -848,8 +845,13 @@ void * proxy_ts_stream(void *self) {
848 845
 			}
849 846
 		}
850 847
 		LOGf("DEBUG: fdread timeout restreamer srv_fd: %i | Channel: %s\n", r->sock, r->channel->name);
848
+		proxy_set_status(r, "ERROR: Read timeout");
851 849
 RECONNECT:
850
+		pthread_rwlock_wrlock(&r->lock);
851
+		r->conn_ts = 0;
852
+		pthread_rwlock_unlock(&r->lock);
852 853
 		LOGf("DEBUG: reconnect srv_fd: %i | Channel: %s\n", r->sock, r->channel->name);
854
+		proxy_set_status(r, "Reconnecting");
853 855
 		shutdown_fd(&(r->sock));
854 856
 		next_channel_source(r->channel);
855 857
 		continue;
@@ -900,7 +902,9 @@ void set_ident(char *new_ident, struct config *cfg) {
900 902
 
901 903
 void parse_options(int argc, char **argv, struct config *cfg) {
902 904
 	int j, ttl;
903
-	while ((j = getopt(argc, argv, "i:c:d:t:o:l:L:RHh")) != -1) {
905
+	cfg->server_socket = -1;
906
+	pthread_mutex_init(&cfg->channels_lock, NULL);
907
+	while ((j = getopt(argc, argv, "i:b:p:c:d:t:o:l:L:RHh")) != -1) {
904 908
 		switch (j) {
905 909
 			case 'b':
906 910
 				cfg->server_addr = optarg;
@@ -973,9 +977,12 @@ void parse_options(int argc, char **argv, struct config *cfg) {
973 977
 	} else {
974 978
 		printf("\tDo not daemonize.\n");
975 979
 	}
976
-
977
-	if (cfg->server_port)
980
+	if (cfg->server_port) {
978 981
 		init_server_socket(cfg->server_addr, cfg->server_port, &cfg->server, &cfg->server_socket);
982
+		printf("\tStarting web srv  : http://%s:%d/status (sock: %d)\n", cfg->server_addr, cfg->server_port, cfg->server_socket);
983
+	} else {
984
+		printf("\tNo web server\n");
985
+	}
979 986
 }
980 987
 
981 988
 void init_vars(struct config *cfg) {
@@ -1054,6 +1061,10 @@ void signal_quit(int sig) {
1054 1061
 	raise(sig);
1055 1062
 }
1056 1063
 
1064
+struct config *get_config(void) {
1065
+	return &config;
1066
+}
1067
+
1057 1068
 void do_reconnect() {
1058 1069
 	LNODE *l, *tmp;
1059 1070
 	list_lock(config.restreamer);

+ 81
- 2
web_pages.c View File

@@ -20,6 +20,7 @@
20 20
 #include <sys/types.h>
21 21
 #include <sys/stat.h>
22 22
 #include <fcntl.h>
23
+#include <ctype.h>
23 24
 #include <unistd.h>
24 25
 
25 26
 #include "libfuncs/io.h"
@@ -37,9 +38,87 @@ void cmd_index(int clientsock) {
37 38
 	fdputs(clientsock, "\nHi from tomcast.\n");
38 39
 }
39 40
 
41
+void cmd_status(int clientsock) {
42
+	send_200_ok(clientsock);
43
+	send_header_textplain(clientsock);
44
+	fdputs(clientsock, "\n");
45
+
46
+	LNODE *l, *tmp;
47
+	struct config *cfg = get_config();
48
+
49
+	time_t now = time(NULL);
50
+	fdputsf(clientsock, "%-10s %-20s %8s %10s %-18s %-64s %s\n",
51
+		"# Status",
52
+		"DestAddr",
53
+		"ConnTime",
54
+		"Bytes",
55
+		"Name",
56
+		"Source",
57
+		"Proxy status"
58
+	);
59
+	pthread_mutex_lock(&cfg->channels_lock);
60
+	list_lock(cfg->restreamer);
61
+	list_for_each(cfg->restreamer, l, tmp) {
62
+		char dest[32];
63
+		RESTREAMER *r = l->data;
64
+		pthread_rwlock_rdlock(&r->lock);
65
+		snprintf(dest, sizeof(dest), "%s:%d", r->channel->dest_host, r->channel->dest_port);
66
+		fdputsf(clientsock, "%-10s %-20s %8lu %10llu %-18s %-64s %s\n",
67
+			r->connected ? "CONN_OK" : "CONN_ERROR",
68
+			dest,
69
+			r->conn_ts ? now - r->conn_ts : 0,
70
+			r->read_bytes,
71
+			r->channel->name,
72
+			r->channel->source,
73
+			r->status
74
+		);
75
+		pthread_rwlock_unlock(&r->lock);
76
+	}
77
+	list_unlock(cfg->restreamer);
78
+	pthread_mutex_unlock(&cfg->channels_lock);
79
+}
80
+
81
+void cmd_getconfig(int clientsock) {
82
+	send_200_ok(clientsock);
83
+	send_header_textplain(clientsock);
84
+	fdputs(clientsock, "\n");
85
+
86
+	LNODE *l, *tmp;
87
+	struct config *cfg = get_config();
88
+
89
+	pthread_mutex_lock(&cfg->channels_lock);
90
+	list_lock(cfg->restreamer);
91
+	list_for_each(cfg->restreamer, l, tmp) {
92
+		RESTREAMER *r = l->data;
93
+		pthread_rwlock_rdlock(&r->lock);
94
+		int i;
95
+		for (i = 0; i < r->channel->num_src; i++) {
96
+			fdputsf(clientsock, "%s\t%s:%d\t%s\n",
97
+				r->channel->name,
98
+				r->channel->dest_host,
99
+				r->channel->dest_port,
100
+				r->channel->sources[i]
101
+			);
102
+		}
103
+		pthread_rwlock_unlock(&r->lock);
104
+	}
105
+	list_unlock(cfg->restreamer);
106
+	pthread_mutex_unlock(&cfg->channels_lock);
107
+}
108
+
40 109
 void cmd_reconnect(int clientsock) {
41 110
 	send_200_ok(clientsock);
42 111
 	send_header_textplain(clientsock);
43
-	fdputsf(clientsock, "\nReconnecting %d inputs.\n", 123);
44
-//	fdputsf(clientsock, "\nReconnecting %d inputs.\n", config->inputs->items);
112
+	struct config *cfg = get_config();
113
+	pthread_mutex_lock(&cfg->channels_lock);
114
+	fdputsf(clientsock, "\nReconnecting %d inputs.\n", cfg->chanconf->items);
115
+	pthread_mutex_unlock(&cfg->channels_lock);
116
+	do_reconnect();
117
+}
118
+
119
+void cmd_reload(int clientsock) {
120
+	send_200_ok(clientsock);
121
+	send_header_textplain(clientsock);
122
+	fdputs(clientsock, "\nReloading config\n");
123
+	do_reconf();
45 124
 }

+ 3
- 0
web_pages.h View File

@@ -19,6 +19,9 @@
19 19
 #define WEB_PAGES_H
20 20
 
21 21
 void cmd_index(int clientsock);
22
+void cmd_status(int clientsock);
23
+void cmd_getconfig(int clientsock);
22 24
 void cmd_reconnect(int clientsock);
25
+void cmd_reload(int clientsock);
23 26
 
24 27
 #endif

+ 6
- 0
web_server.c View File

@@ -128,8 +128,14 @@ void *process_web_request(void *in_req) {
128 128
 
129 129
 	if (strlen(path) == 0) {
130 130
 		cmd_index(clientsock);
131
+	} else if (strstr(path,"getconfig")==path) {
132
+		cmd_getconfig(clientsock);
131 133
 	} else if (strstr(path,"reconnect")==path) {
132 134
 		cmd_reconnect(clientsock);
135
+	} else if (strstr(path,"reload")==path) {
136
+		cmd_reload(clientsock);
137
+	} else if (strstr(path,"status")==path) {
138
+		cmd_status(clientsock);
133 139
 	} else {
134 140
 		send_404_not_found(clientsock);
135 141
 	}

Loading…
Cancel
Save