Skip to content

Commit 897bc0d

Browse files
committed
Stop threads after packets finish.
1 parent b41be45 commit 897bc0d

11 files changed

+114
-66
lines changed

.gitignore

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
*.a
22
*.dblite
33
lib/*
4-
*.txt
4+
*.txt
5+
*.o
6+
*~

CHANGES

Whitespace-only changes.

COPYING

Whitespace-only changes.

INSTALL

Whitespace-only changes.

TODOS

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
TODO List
2+
------------
3+
4+
1. Add tcpdump filter to packet preprocessing method.
5+
6+
2. Optimize flow reordering methods.

src/SConscript

+20-20
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
SConscript(['./flow/SConscript',
2-
'./http/SConscript',
3-
'./packet/SConscript',
4-
'./tcp/SConscript'])
2+
'./http/SConscript',
3+
'./packet/SConscript',
4+
'./tcp/SConscript'])
55

66
env = Environment(CCFLAGS='-w')
77
# Library paths
@@ -14,24 +14,24 @@ cpp_path=['../include']
1414
# Flag debug to decide if NFM libs are used to compile program
1515
debug = ARGUMENTS.get('debug', 0)
1616
if int(debug):
17-
# NFM library names
18-
nfm_libs = ['nfm', 'nfm_framework', 'nfm_error', 'nfm_packet', 'nfm_rules', 'nfm_platform', 'nfe', 'nfp']
19-
libs += nfm_libs
20-
# NFM library path
21-
lib_path.append('/opt/netronome/lib')
22-
# NFM header file path
23-
cpp_path.append('/opt/netronome/nfm/include')
17+
# NFM library names
18+
nfm_libs = ['nfm', 'nfm_framework', 'nfm_error', 'nfm_packet', 'nfm_rules', 'nfm_platform', 'nfe', 'nfp']
19+
libs += nfm_libs
20+
# NFM library path
21+
lib_path.append('/opt/netronome/lib')
22+
# NFM header file path
23+
cpp_path.append('/opt/netronome/nfm/include')
2424

2525
# Compile the programs
26-
env.Program(target = '../bin/weblogger',
27-
source = Glob('*.c'),
28-
LIBPATH = lib_path,
29-
LIBS = libs,
30-
CPPPATH = cpp_path)
26+
env.Program(target = '../bin/http-sniffer',
27+
source = Glob('*.c'),
28+
LIBPATH = lib_path,
29+
LIBS = libs,
30+
CPPPATH = cpp_path)
3131

3232
env.Program(target = '../bin/tracedump',
33-
source = './tracedump/tracedump.c',
34-
LIBS = libs,
35-
LIBPATH = lib_path,
36-
CPPPATH = cpp_path)
37-
33+
source = './tracedump/tracedump.c',
34+
LIBS = libs,
35+
LIBPATH = lib_path,
36+
CPPPATH = cpp_path)
37+

src/capture.c

+20-1
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ capture_main(const char* interface, void (*pkt_handler)(void*))
154154
struct pcap_pkthdr pkthdr;
155155
packet_t *packet = NULL;
156156

157+
printf("Online mode ...\n");
157158
cap = pcap_open_live(interface, 65535, 0, 1000, errbuf);
158159
if( cap == NULL){
159160
printf("%s\n",errbuf);
@@ -170,6 +171,10 @@ capture_main(const char* interface, void (*pkt_handler)(void*))
170171
}
171172
pkt_handler(packet);
172173
}
174+
175+
if( cap != NULL){
176+
pcap_close(cap);
177+
}
173178
return 0;
174179
}
175180

@@ -183,22 +188,36 @@ capture_offline(const char* filename, void (*pkt_handler)(void*))
183188
pcap_t *cap = NULL;
184189
struct pcap_pkthdr pkthdr;
185190
packet_t *packet = NULL;
191+
extern int GP_CAP_FIN;
186192

193+
// Open handler
194+
printf("Offline mode ...\n");
187195
cap = pcap_open_offline(filename, errbuf);
188196
if( cap == NULL){
189197
printf("%s\n",errbuf);
190198
exit(1);
191199
}
200+
201+
// Run the loop
192202
while(1){
193203
raw = pcap_next(cap, &pkthdr);
194204
if( raw == NULL){
195-
continue;
205+
// No more packets?
206+
GP_CAP_FIN = 1;
207+
break;
196208
}
197209
packet = packet_preprocess(raw, &pkthdr);
198210
if( NULL == packet ){
199211
continue;
200212
}
213+
// Handle the packet
201214
pkt_handler(packet);
202215
}
216+
217+
// Close the handler
218+
if( cap != NULL){
219+
printf("Close handler!\n");
220+
pcap_close(cap);
221+
}
203222
return 0;
204223
}

src/flow/flow.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -834,7 +834,8 @@ flow_dump_file_json(const flow_t *flow, const char* file)
834834
struct tm *timeinfo = NULL;
835835
time( &raw_time );
836836
timeinfo = localtime( &raw_time );
837-
strftime(time_buf, sizeof(time_buf), "%Y%m%d %H:%M:%S", timeinfo);
837+
// ISO 8601 time format
838+
strftime(time_buf, sizeof(time_buf), "%Y-%m-%dT%H:%M:%S", timeinfo);
838839
json_object_object_add(new_flow, "t_r", json_object_new_string(time_buf));
839840
/*Convert IP addr */
840841
char *saddr = malloc(sizeof("aaa.bbb.ccc.ddd"));

src/flow/hash_table.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,12 @@ flow_hash_scnt(void)
244244
}
245245

246246
/*
247-
* Close a flow forcedly if the delta time is lower than timeslot.
247+
* Close a flow forcedly if the delta time is lower than timeout.
248248
* Then add the flow to flow queue.
249249
* Return the number of flows deleted forcedly.
250250
*/
251251
int
252-
flow_scrubber(const int timeslot)
252+
flow_scrubber(const int timeout)
253253
{
254254
int i = 0;
255255
unsigned long delta = 0;
@@ -268,7 +268,7 @@ flow_scrubber(const int timeslot)
268268
gettimeofday(&tv, &tz);
269269
delta = abs(tv.tv_sec - flow->last_action_sec);
270270

271-
if (delta > timeslot){
271+
if (delta > timeout){
272272
num++;
273273
flow->close = FORCED_CLOSE; // Close flow forcedly.
274274
flow_queue_enq(flow_hash_delete(flow));

src/jobs.c

+53-36
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include "packet.h"
99
#include "flow.h"
1010

11+
extern int GP_CAP_FIN;
12+
1113
/* Fetch a packet from packet queue and add it to any flow. */
1214
int
1315
process_packet_queue(void)
@@ -16,45 +18,34 @@ process_packet_queue(void)
1618
while(1)
1719
{
1820
pkt = packet_queue_deq();
19-
if (pkt != NULL)
20-
{
21+
if (pkt != NULL){
2122
flow_hash_add_packet(pkt);
2223
continue;
24+
} else if ( GP_CAP_FIN == 1 ){
25+
// job done!
26+
break;
2327
}
2428
}
2529
pthread_exit("Packet processing finished.\n");
2630
return 0;
2731
}
2832

29-
/* Scrub flow hash table to forcely close dead flows.*/
30-
void
31-
scrubbing_flow_htbl(void)
32-
{
33-
int num = 0;
34-
while(1){
35-
sleep(1);
36-
num = flow_scrubber(60); /* seconds */
37-
}
38-
pthread_exit("Flow processing finished.\n");
39-
return 0;
40-
}
41-
42-
char*
43-
gen_filename(void){
44-
char file_name_buf[64];
45-
memset(file_name_buf, 0, sizeof(file_name_buf));
33+
// char*
34+
// gen_filename(void){
35+
// char file_name_buf[64];
36+
// memset(file_name_buf, 0, sizeof(file_name_buf));
4637

47-
time_t raw_time;
48-
struct tm *timeinfo = NULL;
49-
char time_buf[20];
50-
memset(time_buf, 0, sizeof(time_buf));
51-
time( &raw_time );
52-
timeinfo = localtime( &raw_time );
53-
strftime(time_buf, sizeof(time_buf), "%Y%m%d%H", timeinfo);
54-
strcat(file_name_buf, time_buf);
55-
strcat(file_name_buf, ".txt");
56-
return file_name_buf;
57-
}
38+
// time_t raw_time;
39+
// struct tm *timeinfo = NULL;
40+
// char time_buf[20];
41+
// memset(time_buf, 0, sizeof(time_buf));
42+
// time( &raw_time );
43+
// timeinfo = localtime( &raw_time );
44+
// strftime(time_buf, sizeof(time_buf), "%Y%m%d%H", timeinfo);
45+
// strcat(file_name_buf, time_buf);
46+
// strcat(file_name_buf, ".txt");
47+
// return file_name_buf;
48+
// }
5849

5950
/* Fetch a flow from flow queue and process it */
6051
int
@@ -68,6 +59,7 @@ process_flow_queue(const char* dump_file)
6859
if (dump_file != NULL){
6960
file_name = dump_file;
7061
}
62+
7163
flow = flow_queue_deq();
7264
if(flow != NULL){
7365
flow_extract_http(flow);
@@ -90,24 +82,49 @@ process_flow_queue(const char* dump_file)
9082
flow_print(flow);
9183
flow_free(flow);
9284
continue;
85+
} else if ( GP_CAP_FIN == 1 ) {
86+
// job done!
87+
break;
9388
}
9489
}
9590
pthread_exit("Flow processing finished.\n");
9691
return 0;
9792
}
9893

94+
95+
/* Scrub flow hash table to forcely close dead flows.*/
96+
void
97+
scrubbing_flow_htbl(void)
98+
{
99+
int num = 0;
100+
while(1){
101+
sleep(1);
102+
if ( GP_CAP_FIN == 0 ){
103+
num = flow_scrubber(60*10); // seconds, flow timeout
104+
} else {
105+
num = flow_scrubber(-1); // all flows
106+
break;
107+
}
108+
}
109+
pthread_exit(NULL);
110+
return 0;
111+
}
112+
113+
99114
/* for debugging */
100115
void
101116
debugging_print(void)
102117
{
103118
while(1)
104119
{
105-
sleep(3);
106-
printf("...................................\n");
107-
packet_queue_print();
108-
flow_hash_print();
109-
flow_queue_print();
120+
if ( GP_CAP_FIN == 1 ) break;
121+
else {
122+
//packet_queue_print();
123+
flow_hash_print();
124+
//flow_queue_print();
125+
sleep(1);
126+
}
110127
}
111-
pthread_exit("Flow processing finished.\n");
128+
pthread_exit(NULL);
112129
return 0;
113130
}

src/main.c

+7-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ print_usage(const char* pro_name)
1818
printf(" or %s -f tracefile [-o dumpfile]\n", pro_name);
1919
}
2020

21+
int GP_CAP_FIN = 0;
2122
int main(int argc, char *argv[]){
2223
char* interface = NULL;
2324
char* dumpfile = NULL;
@@ -61,18 +62,20 @@ int main(int argc, char *argv[]){
6162
void *thread_result;
6263
packet_queue_init(); /* Initialize packet queue */
6364
flow_init(); /* Initialize flow queue and hashtable */
65+
// Start backend jobs defined in jobs.c
6466
pthread_create(&j_pkt_q, NULL, (void*)process_packet_queue, NULL);
65-
pthread_create(&j_debug_p, NULL, (void*)debugging_print, NULL);
66-
pthread_create(&j_flow_q, NULL, (void*)process_flow_queue, dumpfile);
6767
pthread_create(&j_scrb_htbl, NULL, (void*)scrubbing_flow_htbl, NULL);
68-
68+
pthread_create(&j_flow_q, NULL, (void*)process_flow_queue, dumpfile);
69+
// For debugging
70+
pthread_create(&j_debug_p, NULL, (void*)debugging_print, NULL);
71+
// Start main capture in live or offline mode
6972
if (interface != NULL){
7073
capture_main(interface, packet_queue_enq);
7174
}else //tracefile != NULL
7275
{
7376
capture_offline(tracefile, packet_queue_enq);
7477
}
75-
78+
// Wait the threads to end
7679
pthread_join(j_pkt_q, &thread_result);
7780
pthread_join(j_debug_p, &thread_result);
7881
pthread_join(j_flow_q, &thread_result);

0 commit comments

Comments
 (0)