Subversion Repositories ais_server

Rev

Rev 3 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 mjames 1
//
2
// server.c -- a stream socket server demo
3
// also includes pthreads management
4
//
5
#include <stdio.h>
6
#include <stdlib.h>
7
#include <unistd.h>
8
#include <errno.h>
9
#include <string.h>
10
#include <sys/types.h>
11
#include <sys/socket.h>
12
#include <netinet/in.h>
13
#include <netdb.h>
14
#include <arpa/inet.h>
15
#include <sys/wait.h>
16
#include <signal.h>
17
#include <unistd.h>
18
#include <fcntl.h>
19
 
20
 
21
#include <termios.h>
22
#include <errno.h>
23
 
24
#include <pthread.h>
25
 
26
/* Debugging */
27
 
28
#include <sys/time.h>
29
#include <time.h>
30
 
31
#include <syslog.h>
32
 
33
#include <sys/poll.h>
34
 
35
FILE * debug_file;
36
 
37
typedef struct Thread
38
{
39
        struct    Thread * next;
40
        int       fd;
41
        pthread_t thread;
42
        char threadname[256];
43
} sThread;
44
 
45
volatile sThread * head;
46
volatile sThread * tail;
47
 
48
 
49
pthread_mutex_t reaper_mutex = PTHREAD_MUTEX_INITIALIZER;
50
pthread_mutex_t mutex   = PTHREAD_MUTEX_INITIALIZER;
51
pthread_cond_t cond     = PTHREAD_COND_INITIALIZER; // a condition for all threads to wait on
52
 
53
 
54
#define PORT "2223" // the port users will be connecting to
55
#define BACKLOG 20 // how many pending connections queue will hold
56
 
57
 
58
#define MAXCHARS 512
59
#define BUFFCNT 40
60
 
61
// serial buffering structure
62
typedef struct
63
{
64
        char buff[MAXCHARS];
65
        int  len;
66
} tBuf;
67
 
68
volatile tBuf buffers[BUFFCNT];
69
volatile int  current = 0 ;
70
 
71
 
72
 
73
static int UART_InitialisePort(const char* port, speed_t speed)
74
{
75
        int handle;
76
        struct termios newtio;
77
 
78
 
79
/* Cygwin may  fail here because you cant open the port multiple times despite opening O_NONBLOCK */
80
        handle = open(port, O_RDWR | O_NOCTTY);      /* Try user input depending on port */
81
        if (handle < 0)
82
        {
83
                perror("UART_InitialisePort");
84
                return -1;
85
        }
86
 
87
        memset(&newtio, 0, sizeof(newtio));
88
 
89
        /*
90
        * Set the baudrate to speed (init 9600), Parity to NONE (default), Data bits to 8
91
        */
92
 
93
        newtio.c_cflag = speed;
94
        newtio.c_cflag |= CS8;    /* Eight bits            */
95
 
96
/* cygwin looks in c_ispeed and c_ospeed for the speed setting so do that
97
   as well */
98
        #ifdef HAVE_C_IOSPEED
99
        newtio.c_ispeed = speed;
100
        newtio.c_ospeed = speed;
101
        #endif
102
 
103
        /*
104
        * Setting Raw Input and Defaults
105
        */
106
        newtio.c_cflag |= CREAD|HUPCL|CLOCAL;
107
        newtio.c_cflag &= ~CRTSCTS;  // no flow control 
108
        newtio.c_iflag |= IGNBRK|IGNPAR;
109
        newtio.c_lflag  = 0;
110
        newtio.c_oflag  = 0;
111
 
112
        /* Non-blocking serial port operation. Change c_cc[VTIME] to 1 if
113
        your platform has a broken tty implementation. */
114
        newtio.c_cc[VMIN]  = 0;
115
        newtio.c_cc[VTIME] = 0;
116
 
117
        if (tcflush(handle, TCIFLUSH) < 0)            /* Flush the serial port */
118
        {
119
                close(handle);
120
                return -1;
121
        }
122
 
123
        if (tcsetattr(handle, TCSANOW, &newtio) < 0)  /* Set the parameters */
124
        {
125
                close(handle);
126
                return -1;
127
        }
128
        return handle;
129
}
130
 
131
static int serial_blocking(int handle, int block)
132
{
133
        struct termios tio;
134
        static int current=0; /* Port is opened non-blocking */
135
        if (current==block) return 0;
136
        current=block;
137
 
138
 
139
        tcgetattr(handle,&tio);
140
 
141
        tio.c_cc[VMIN] = (block)?1:0;
142
        tio.c_cc[VTIME] = 0;
143
 
144
        tcsetattr(handle, TCSANOW, &tio);
145
 
146
        return 0;
147
}
148
 
149
 
150
 
151
static int serial_speed(int handle, speed_t speed)
152
{
153
        struct termios options;
154
 
155
        /* Get the current options for the port... */
156
        if(tcgetattr(handle, &options) != 0)
157
                return -1;
158
 
159
        cfsetospeed(&options, speed);
160
 
161
        /* flush buffers */
162
        tcflush(handle, TCIFLUSH);
163
 
164
        /* Set the new options for the port... */
165
        if( tcsetattr(handle, TCSANOW, &options) != 0)
166
                return -1;
167
 
168
        return 0;
169
}
170
 
171
 
172
 
173
 
174
 
175
 
176
 
177
 
178
static speed_t translate_baudrate(unsigned long baudrate)
179
{
180
        speed_t BaudRate=0;
181
        switch(baudrate)
182
        {
183
                case    300 : BaudRate=B300; break;
184
                case   1200 : BaudRate=B1200; break;
185
                case   2400 : BaudRate=B2400; break;
186
                case   4800 : BaudRate=B4800; break;
187
                case   9600 : BaudRate=B9600; break;
188
                case  19200 : BaudRate=B19200; break;
189
                case  38400 : BaudRate=B38400; break;
190
                case  57600 : BaudRate=B57600; break;
191
                case 115200 : BaudRate=B115200; break;
192
                case 230400 : BaudRate=B230400; break;
193
/* MDJ change for cygwin */
194
#if defined B460800
195
                case 460800 : BaudRate=B460800; break;
196
#endif
197
                default :
198
                        printf("Bad baudrate (%ld)\n",baudrate);
199
                        BaudRate=B9600;
200
                        break;
201
        }
202
        return BaudRate;
203
}
204
 
205
void  * serial_thread(void* arg)
206
{
207
        int speed=38400;
208
        int handle;
209
        int hadCR = 1;
210
        struct timeval tv = { 0,0 };
211
 
212
        struct timeval t0 = { 0, 0};
213
        char  buff1[MAXCHARS];
214
        gettimeofday(&t0,NULL);
215
// this uses a UDEV rule e.g. 
216
//
217
// (PL2303) SUBSYSTEM=="tty", ATTRS{idVendor}=="067b", ATTRS{idProduct}=="2303", SYMLINK+="ttyAIS"
218
// 340 chinese device is different 
219
        handle = UART_InitialisePort("/dev/ttyAIS",translate_baudrate(speed));
220
        serial_blocking(handle,1);
221
        while(handle>=0)
222
        {
223
                int i,j;
224
                int chars;
225
                int offset;
226
                char * where;
227
                chars = read(handle, buff1, MAXCHARS);
228
                j=0;
229
                while(j<chars) {
230
// tag the buffer being sent 
231
                        if (hadCR) {
232
                                gettimeofday(&tv,NULL);
233
                                where = &buffers[current].buff[0];
234
                                offset=sprintf(where,
235
                                        "[%d.%02d]\r\n",
236
                                        tv.tv_sec-t0.tv_sec,tv.tv_usec/10000);                 
237
                                hadCR=0;
238
                        }
239
                        where[offset]=buff1[j];
240
 
241
 
242
                        if(where[offset]==0x0a)
243
                                hadCR=1;
244
                        j++;offset++;
245
 
246
                        if(hadCR || offset == MAXCHARS-1)
247
                                {
248
                                buffers[current].len = offset;
249
                                current = (current + 1)% BUFFCNT;
250
                                pthread_cond_broadcast(&cond);
251
                        }
252
                }
253
 
254
        }
255
        printf("done\n");
256
 
257
        close(handle);
258
        return( NULL);
259
 
260
 
261
}
262
 
263
 
264
struct timespec timeWait =
265
{
266
.tv_sec = 0,
267
.tv_nsec= 0,
268
};
269
 
270
 
271
char conn_msg[] = "AIS server\r\n";
272
 
273
 
274
void * telnet_thread(void * ptr)
275
{
276
        sThread * thread = (sThread *) ptr;
277
 
278
        int local_count = current;
279
        int written;
280
        char temp_buff[1500];
281
        struct timespec timeNow;
282
 
283
//      printf("in thread ptr %p fd %d\n",thread,thread->fd);
284
        written=send(thread->fd,conn_msg,strlen(conn_msg),
285
                MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
286
        while(1)
287
        {
288
 
289
                pthread_mutex_lock(&mutex);
290
 
291
                while(local_count == current) {
292
                        int rc;
293
// this is wait until , which is why we took the time and incremented it 
294
                        do {
295
                                gettimeofday(&timeNow,NULL);
296
                                timeNow.tv_sec ++;
297
                                rc = pthread_cond_timedwait(&cond,&mutex,&timeNow);
298
 
299
                                if(rc==ETIMEDOUT) {
300
                                        int chars;
301
                                        struct pollfd p;
302
 
303
                                        p.fd = thread->fd;
304
                                        p.events = POLLIN;
305
                                        p.revents= 0;
306
                                        poll(&p,1,0);
307
// poll any input stuff and discard 
4 mjames 308
                                        if(p.revents & (POLLHUP|POLLERR)) {
309
                                                syslog(LOG_INFO,"HUP/Error shutdown %s\n",thread->threadname);
3 mjames 310
                                                close(thread->fd);
311
                                                thread->fd = -1;
312
                                                pthread_mutex_unlock(&mutex);
313
                                                return(NULL);
314
                                                }
315
                                        if(p.revents & POLLIN) {
4 mjames 316
                                                chars=recv(thread->fd,temp_buff,1500,MSG_DONTWAIT);
317
                                                if (chars) {
3 mjames 318
 
319
// dont log crap recieved
4 mjames 320
                                                        syslog(LOG_INFO,"read %d from %s\n",chars,thread->threadname); 
3 mjames 321
                                                }
4 mjames 322
                                                else {
323
                                                        close(thread->fd);
324
                                                        thread->fd =-1;
325
                                                        pthread_mutex_unlock(&mutex);
326
                                                        syslog(LOG_INFO,"Closed from %s\n",thread->threadname);
327
                                                        return(NULL);
3 mjames 328
                                                }
329
                                        }
330
                                        }
331
                                }
332
 
333
                        while (rc==ETIMEDOUT);
334
                        }
335
 
336
                pthread_mutex_unlock(&mutex);
337
 
338
                while(local_count != current)
339
                {      
340
                written=send(thread->fd,buffers[local_count].buff,
341
                        buffers[local_count].len,MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
342
 
343
                if(errno==EAGAIN)
344
                        break;
345
 
346
 
347
                if(written<=0)
348
                        syslog(LOG_INFO,"error: write %d : err  %s on %s\n",written,strerror(errno),thread->threadname);
349
 
350
 
351
                // this could be EAGAIN which is 'network congestion' so dont waste time queueing up packets 
352
                if (written<=0 && errno != EAGAIN)
353
                {      
354
 
355
                        close(thread->fd);
356
                        thread->fd = -1;
357
                        return( NULL);
358
 
359
 
360
                }
361
                if(errno!=EAGAIN && written>0)
362
                        local_count = (local_count +1) % BUFFCNT;
363
                }
4 mjames 364
 
365
                // read any incoming data and discard
366
                //size_t readCount;
367
                //char buffer[512];
368
                //do {
369
                //      readCount = recv(thread->fd,&buffer[0],sizeof(buffer),MSG_DONTWAIT);
370
                //}
371
                //while (written != 0 && errno!=EAGAIN);
3 mjames 372
        }
373
        return(NULL);
374
 
375
}
376
 
377
// this allocates storage for a thread and starts it
378
void * open_thread(int fd,char * name)
379
{
380
        sThread * newthread;
381
 
382
        newthread = calloc(1,sizeof(sThread));
383
//      printf("in open_thread %p\n",newthread);
384
        pthread_mutex_lock(&reaper_mutex);
385
        if(!head)
386
        {
387
                head=newthread;
388
        }
389
        if(tail)
390
        {
391
                tail->next = newthread;
392
        }
393
        tail=newthread;
394
        tail->next = NULL;
395
        pthread_mutex_unlock(&reaper_mutex);
396
        strcpy(newthread->threadname,name);
397
        newthread->fd = fd;
398
        pthread_create(&newthread->thread,NULL,telnet_thread,newthread);
399
//      pthread_detach(newthread->thread);
400
}
401
 
402
 
403
 
404
void * reaper_thread(void * arg)
405
{
406
        sThread * curr;
407
        sThread * prev;
408
        sThread * next;
409
        int threads= 0;
410
        int reaped = 0;
411
        while(1)
412
        {
413
//              printf("reaper thread checking\n");
414
                curr = head;
415
                prev = NULL;
416
                threads=0;
417
                reaped =0;
418
 
419
                while (curr)
420
                {
421
                        pthread_mutex_lock(&reaper_mutex);
422
                        next = curr->next;
423
                        if(curr->fd==-1)
424
                        {      
425
                                void * status;
426
                                pthread_join(curr->thread,&status);
427
 
428
                                syslog(LOG_INFO,"Disconnection :%s \n",curr->threadname);
429
                                if(prev)
430
                                        prev->next = next;     
431
                                if(curr==head)
432
                                        head=next;
433
                                if(curr==tail && prev)
434
                                        tail=prev;
435
                                free(curr);
436
                                curr=next;
437
                                reaped ++;
438
                        }
439
                        else
440
                        {
441
                                prev=curr;
442
                                curr=next;
443
                        }
444
                        pthread_mutex_unlock(&reaper_mutex);
445
                        threads++;
446
 
447
                }
448
        if(reaped)
449
                syslog(LOG_INFO,"Reaper checked %d threads, %d reaped\n",threads,reaped);
450
        sleep(1);
451
        }      
452
        return NULL;           
453
}
454
 
455
 
456
void sigchld_handler(int s)
457
{
458
        while(waitpid(-1, NULL, WNOHANG) > 0);
459
}
460
 
461
// get sockaddr, IPv4 or IPv6:
462
void *get_in_addr(struct sockaddr *sa)
463
{
464
        if (sa->sa_family == AF_INET) {
465
                return &(((struct sockaddr_in*)sa)->sin_addr);
466
        }
467
        return &(((struct sockaddr_in6*)sa)->sin6_addr);
468
}
469
 
470
int main(int argc,char * argv[])
471
{
472
        int sockfd, new_fd; // listen on sock_fd, new connection on new_fd
473
        struct addrinfo hints, *servinfo, *p;
474
        struct sockaddr_storage their_addr; // connector's address information
475
        socklen_t sin_size;
476
        struct sigaction sa;
477
        int yes=1;
478
        char s[INET6_ADDRSTRLEN];
479
        int rv;
480
 
481
        int serial_fd;
482
        int telnet_fds;
483
        pthread_t serial_thread_id,reaper_thread_id;
484
 
485
 
486
 
487
        if((argc>1))
488
        {
489
// daemonize code
490
        pid_t pid;
491
        printf("daemonizing \n");
492
 
493
 
494
        umask(0);
495
 
496
        if((pid = fork()) < 0)
497
        {
498
                printf("fork: %s\n",strerror(errno));
499
        }
500
        else if(pid !=0) {
501
                exit(0); //this is the parent
502
        }
503
 
504
        setsid();
505
 
506
        if(chdir("/")<0)
507
        {      
508
                printf("chdir to log dir: %s\n",strerror(errno));
509
                exit (-1);
510
        }
511
 
512
        close(STDIN_FILENO);
513
        close(STDOUT_FILENO);
514
        close(STDERR_FILENO);
515
 
516
 
517
        }      
518
        else
519
        {
520
        printf("using interactive\n");
521
        }
522
 
523
 
524
 
525
// end daemonize code
526
 
527
 
528
        openlog("ais_server",LOG_CONS|LOG_PID,LOG_USER);
529
 
530
 
531
 
532
 
533
// setup the conditional variable and the mutex used for triggering all of the threads
534
        pthread_mutex_init(&mutex,NULL);
535
        pthread_cond_init (&cond,NULL); // default operation 
536
 
537
        pthread_create(&serial_thread_id,NULL,serial_thread,&cond); // pass cond variable to serial thread as argument 
538
 
539
        pthread_mutex_init(&reaper_mutex,NULL);
540
        pthread_create(&reaper_thread_id,NULL,reaper_thread,NULL);// reap dead threads
541
 
542
        memset(&hints, 0, sizeof hints);
543
        hints.ai_family = AF_UNSPEC;
544
        hints.ai_socktype = SOCK_STREAM;
545
        hints.ai_flags = AI_PASSIVE; // use my IP
546
        if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) {
547
                fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
548
                return 1;
549
        }
550
// loop through all the results and bind to the first we can
551
 
552
        for(p = servinfo; p != NULL; p = p->ai_next) {
553
                if ((sockfd = socket(p->ai_family, p->ai_socktype,
554
                        p->ai_protocol)) == -1) {
555
                        perror("server: socket");
556
                        continue;
557
                }
558
                if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
559
                        sizeof(int)) == -1) {
560
                        perror("setsockopt");
561
                        exit(1);
562
                }
563
                if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
564
                        close(sockfd);
565
                        perror("server: bind");
566
                        continue;
567
                }
568
                break;
569
        }
570
        if (p == NULL) {
571
                fprintf(stderr, "server: failed to bind\n");
572
                return 2;
573
        }
574
        freeaddrinfo(servinfo); // all done with this structure
575
        if (listen(sockfd, BACKLOG) == -1) {
576
                perror("listen");
577
                exit(1);
578
        }
579
 
580
        sa.sa_handler = sigchld_handler; // reap all dead processes
581
        sigemptyset(&sa.sa_mask);
582
        sa.sa_flags = SA_RESTART;
583
        if (sigaction(SIGCHLD, &sa, NULL) == -1) {
584
                perror("sigaction");
585
                exit(1);
586
        }
587
        syslog(LOG_INFO,"server: waiting for connections...\n");
588
        while(1) { // main accept() loop
589
                sin_size = sizeof their_addr;
590
                new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
591
// this code is 
592
                if (new_fd == -1) {
593
                        perror("accept");
594
                        continue;
595
                }
596
                inet_ntop(their_addr.ss_family,
597
                get_in_addr((struct sockaddr *)&their_addr),
598
                s, sizeof s);
599
                syslog(LOG_INFO,"server: got connection from %s\n", s);
600
 
601
//              fcntl(new_fd,F_SETFL,O_NONBLOCK); // non blocking socket
602
 
603
                open_thread(new_fd,s);
604
 
605
        }
606
}
607