Subversion Repositories ais_server

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 mjames 1
//
2
// server.c -- a stream socket server demo
3
// also includes pthreads management
4
//
5
#include <stdio.h>
6
#include <stdlib.h>
7
#include <unistd.h>
8
#include <errno.h>
9
#include <string.h>
10
#include <sys/types.h>
11
#include <sys/socket.h>
12
#include <netinet/in.h>
13
#include <netdb.h>
14
#include <arpa/inet.h>
15
#include <sys/wait.h>
16
#include <signal.h>
17
#include <unistd.h>
18
#include <fcntl.h>
19
 
20
 
21
#include <termios.h>
22
#include <errno.h>
23
 
24
#include <pthread.h>
25
 
26
/* Debugging */
27
 
28
#include <sys/time.h>
29
#include <time.h>
30
 
31
#include <syslog.h>
32
 
33
#include <sys/poll.h>
34
 
35
FILE * debug_file;
36
 
37
typedef struct Thread
38
{
39
        struct    Thread * next;
40
        int       fd;
41
        pthread_t thread;
42
        char threadname[256];
43
} sThread;
44
 
45
volatile sThread * head;
46
volatile sThread * tail;
47
 
48
 
49
pthread_mutex_t reaper_mutex = PTHREAD_MUTEX_INITIALIZER;
50
pthread_mutex_t mutex   = PTHREAD_MUTEX_INITIALIZER;
51
pthread_cond_t cond     = PTHREAD_COND_INITIALIZER; // a condition for all threads to wait on
52
 
53
 
54
#define PORT "2223" // the port users will be connecting to
55
#define BACKLOG 20 // how many pending connections queue will hold
56
 
57
 
58
#define MAXCHARS 512
59
#define BUFFCNT 40
60
 
61
// serial buffering structure
62
typedef struct
63
{
64
        char buff[MAXCHARS];
65
        int  len;
66
} tBuf;
67
 
68
volatile tBuf buffers[BUFFCNT];
69
volatile int  current = 0 ;
70
 
71
 
72
 
73
static int UART_InitialisePort(const char* port, speed_t speed)
74
{
75
        int handle;
76
        struct termios newtio;
77
 
78
 
79
/* Cygwin may  fail here because you cant open the port multiple times despite opening O_NONBLOCK */
80
        handle = open(port, O_RDWR | O_NOCTTY);      /* Try user input depending on port */
81
        if (handle < 0)
82
        {
83
                perror("UART_InitialisePort");
84
                return -1;
85
        }
86
 
87
        memset(&newtio, 0, sizeof(newtio));
88
 
89
        /*
90
        * Set the baudrate to speed (init 9600), Parity to NONE (default), Data bits to 8
91
        */
92
 
93
        newtio.c_cflag = speed;
94
        newtio.c_cflag |= CS8;    /* Eight bits            */
95
 
96
/* cygwin looks in c_ispeed and c_ospeed for the speed setting so do that
97
   as well */
98
        #ifdef HAVE_C_IOSPEED
99
        newtio.c_ispeed = speed;
100
        newtio.c_ospeed = speed;
101
        #endif
102
 
103
        /*
104
        * Setting Raw Input and Defaults
105
        */
106
        newtio.c_cflag |= CREAD|HUPCL|CLOCAL;
107
        newtio.c_cflag &= ~CRTSCTS;  // no flow control 
108
        newtio.c_iflag |= IGNBRK|IGNPAR;
109
        newtio.c_lflag  = 0;
110
        newtio.c_oflag  = 0;
111
 
112
        /* Non-blocking serial port operation. Change c_cc[VTIME] to 1 if
113
        your platform has a broken tty implementation. */
114
        newtio.c_cc[VMIN]  = 0;
115
        newtio.c_cc[VTIME] = 0;
116
 
117
        if (tcflush(handle, TCIFLUSH) < 0)            /* Flush the serial port */
118
        {
119
                close(handle);
120
                return -1;
121
        }
122
 
123
        if (tcsetattr(handle, TCSANOW, &newtio) < 0)  /* Set the parameters */
124
        {
125
                close(handle);
126
                return -1;
127
        }
128
        return handle;
129
}
130
 
131
static int serial_blocking(int handle, int block)
132
{
133
        struct termios tio;
134
        static int current=0; /* Port is opened non-blocking */
135
        if (current==block) return 0;
136
        current=block;
137
 
138
 
139
        tcgetattr(handle,&tio);
140
 
141
        tio.c_cc[VMIN] = (block)?1:0;
142
        tio.c_cc[VTIME] = 0;
143
 
144
        tcsetattr(handle, TCSANOW, &tio);
145
 
146
        return 0;
147
}
148
 
149
 
150
 
151
static int serial_speed(int handle, speed_t speed)
152
{
153
        struct termios options;
154
 
155
        /* Get the current options for the port... */
156
        if(tcgetattr(handle, &options) != 0)
157
                return -1;
158
 
159
        cfsetospeed(&options, speed);
160
 
161
        /* flush buffers */
162
        tcflush(handle, TCIFLUSH);
163
 
164
        /* Set the new options for the port... */
165
        if( tcsetattr(handle, TCSANOW, &options) != 0)
166
                return -1;
167
 
168
        return 0;
169
}
170
 
171
 
172
 
173
 
174
 
175
 
176
 
177
 
178
static speed_t translate_baudrate(unsigned long baudrate)
179
{
180
        speed_t BaudRate=0;
181
        switch(baudrate)
182
        {
183
                case    300 : BaudRate=B300; break;
184
                case   1200 : BaudRate=B1200; break;
185
                case   2400 : BaudRate=B2400; break;
186
                case   4800 : BaudRate=B4800; break;
187
                case   9600 : BaudRate=B9600; break;
188
                case  19200 : BaudRate=B19200; break;
189
                case  38400 : BaudRate=B38400; break;
190
                case  57600 : BaudRate=B57600; break;
191
                case 115200 : BaudRate=B115200; break;
192
                case 230400 : BaudRate=B230400; break;
193
/* MDJ change for cygwin */
194
#if defined B460800
195
                case 460800 : BaudRate=B460800; break;
196
#endif
197
                default :
198
                        printf("Bad baudrate (%ld)\n",baudrate);
199
                        BaudRate=B9600;
200
                        break;
201
        }
202
        return BaudRate;
203
}
204
 
205
void  * serial_thread(void* arg)
206
{
207
        int speed=38400;
208
        int handle;
209
        int hadCR = 1;
210
        struct timeval tv = { 0,0 };
211
 
212
        struct timeval t0 = { 0, 0};
213
        char  buff1[MAXCHARS];
214
        gettimeofday(&t0,NULL);
215
// this uses a UDEV rule e.g. 
216
//
217
// (PL2303) SUBSYSTEM=="tty", ATTRS{idVendor}=="067b", ATTRS{idProduct}=="2303", SYMLINK+="ttyAIS"
218
// 340 chinese device is different 
219
        handle = UART_InitialisePort("/dev/ttyAIS",translate_baudrate(speed));
220
        serial_blocking(handle,1);
221
        while(handle>=0)
222
        {
223
                int i,j;
224
                int chars;
225
                int offset;
226
                char * where;
227
                chars = read(handle, buff1, MAXCHARS);
228
                j=0;
229
                while(j<chars) {
230
// tag the buffer being sent 
231
                        if (hadCR) {
232
                                gettimeofday(&tv,NULL);
233
                                where = &buffers[current].buff[0];
234
                                offset=sprintf(where,
235
                                        "[%d.%02d]\r\n",
236
                                        tv.tv_sec-t0.tv_sec,tv.tv_usec/10000);                 
237
                                hadCR=0;
238
                        }
239
                        where[offset]=buff1[j];
240
 
241
 
242
                        if(where[offset]==0x0a)
243
                                hadCR=1;
244
                        j++;offset++;
245
 
246
                        if(hadCR || offset == MAXCHARS-1)
247
                                {
248
                                buffers[current].len = offset;
249
                                current = (current + 1)% BUFFCNT;
250
                                pthread_cond_broadcast(&cond);
251
                        }
252
                }
253
 
254
        }
255
        printf("done\n");
256
 
257
        close(handle);
258
        return( NULL);
259
 
260
 
261
}
262
 
263
 
264
struct timespec timeWait =
265
{
266
.tv_sec = 0,
267
.tv_nsec= 0,
268
};
269
 
270
 
271
char conn_msg[] = "AIS server\r\n";
272
 
273
 
274
void * telnet_thread(void * ptr)
275
{
276
        sThread * thread = (sThread *) ptr;
277
 
278
        int local_count = current;
279
        int written;
280
        char temp_buff[1500];
281
        struct timespec timeNow;
282
 
283
//      printf("in thread ptr %p fd %d\n",thread,thread->fd);
284
        written=send(thread->fd,conn_msg,strlen(conn_msg),
285
                MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
286
        while(1)
287
        {
288
 
289
                pthread_mutex_lock(&mutex);
290
 
291
                while(local_count == current) {
292
                        int rc;
293
// this is wait until , which is why we took the time and incremented it 
294
                        do {
295
                                gettimeofday(&timeNow,NULL);
296
                                timeNow.tv_sec ++;
297
                                rc = pthread_cond_timedwait(&cond,&mutex,&timeNow);
298
 
299
                                if(rc==ETIMEDOUT) {
300
                                        int chars;
301
                                        struct pollfd p;
302
 
303
                                        p.fd = thread->fd;
304
                                        p.events = POLLIN;
305
                                        p.revents= 0;
306
                                        poll(&p,1,0);
307
// poll any input stuff and discard 
308
                                        if(p.revents & (POLLHUP||POLLERR)) {
309
                                                close(thread->fd);
310
                                                thread->fd = -1;
311
                                                pthread_mutex_unlock(&mutex);
312
                                                return(NULL);
313
                                                }
314
                                        if(p.revents & POLLIN) {
315
                                        chars=recv(thread->fd,temp_buff,1500,0);
316
                                        if (chars) {
317
 
318
// dont log crap recieved
319
//                                              syslog(LOG_INFO,"read %d from %s\n",chars,thread->threadname);  
320
                                                }
321
                                        else {
322
                                                close(thread->fd);
323
                                                thread->fd =-1;
324
                                                pthread_mutex_unlock(&mutex);
325
                                                syslog(LOG_INFO,"Closed from %s\n",thread->threadname);
326
                                                return(NULL);
327
                                                }
328
                                        }
329
                                        }
330
                                }
331
 
332
                        while (rc==ETIMEDOUT);
333
                        }
334
 
335
                pthread_mutex_unlock(&mutex);
336
 
337
                while(local_count != current)
338
                {      
339
                written=send(thread->fd,buffers[local_count].buff,
340
                        buffers[local_count].len,MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
341
 
342
                if(errno==EAGAIN)
343
                        break;
344
 
345
 
346
                if(written<=0)
347
                        syslog(LOG_INFO,"error: write %d : err  %s on %s\n",written,strerror(errno),thread->threadname);
348
 
349
 
350
                // this could be EAGAIN which is 'network congestion' so dont waste time queueing up packets 
351
                if (written<=0 && errno != EAGAIN)
352
                {      
353
 
354
                        close(thread->fd);
355
                        thread->fd = -1;
356
                        return( NULL);
357
 
358
 
359
                }
360
                if(errno!=EAGAIN && written>0)
361
                        local_count = (local_count +1) % BUFFCNT;
362
                }
363
 
364
        }
365
        return(NULL);
366
 
367
}
368
 
369
// this allocates storage for a thread and starts it
370
void * open_thread(int fd,char * name)
371
{
372
        sThread * newthread;
373
 
374
        newthread = calloc(1,sizeof(sThread));
375
//      printf("in open_thread %p\n",newthread);
376
        pthread_mutex_lock(&reaper_mutex);
377
        if(!head)
378
        {
379
                head=newthread;
380
        }
381
        if(tail)
382
        {
383
                tail->next = newthread;
384
        }
385
        tail=newthread;
386
        tail->next = NULL;
387
        pthread_mutex_unlock(&reaper_mutex);
388
        strcpy(newthread->threadname,name);
389
        newthread->fd = fd;
390
        pthread_create(&newthread->thread,NULL,telnet_thread,newthread);
391
//      pthread_detach(newthread->thread);
392
}
393
 
394
 
395
 
396
void * reaper_thread(void * arg)
397
{
398
        sThread * curr;
399
        sThread * prev;
400
        sThread * next;
401
        int threads= 0;
402
        int reaped = 0;
403
        while(1)
404
        {
405
//              printf("reaper thread checking\n");
406
                curr = head;
407
                prev = NULL;
408
                threads=0;
409
                reaped =0;
410
 
411
                while (curr)
412
                {
413
                        pthread_mutex_lock(&reaper_mutex);
414
                        next = curr->next;
415
                        if(curr->fd==-1)
416
                        {      
417
                                void * status;
418
                                pthread_join(curr->thread,&status);
419
 
420
                                syslog(LOG_INFO,"Disconnection :%s \n",curr->threadname);
421
                                if(prev)
422
                                        prev->next = next;     
423
                                if(curr==head)
424
                                        head=next;
425
                                if(curr==tail && prev)
426
                                        tail=prev;
427
                                free(curr);
428
                                curr=next;
429
                                reaped ++;
430
                        }
431
                        else
432
                        {
433
                                prev=curr;
434
                                curr=next;
435
                        }
436
                        pthread_mutex_unlock(&reaper_mutex);
437
                        threads++;
438
 
439
                }
440
        if(reaped)
441
                syslog(LOG_INFO,"Reaper checked %d threads, %d reaped\n",threads,reaped);
442
        sleep(1);
443
        }      
444
        return NULL;           
445
}
446
 
447
 
448
void sigchld_handler(int s)
449
{
450
        while(waitpid(-1, NULL, WNOHANG) > 0);
451
}
452
 
453
// get sockaddr, IPv4 or IPv6:
454
void *get_in_addr(struct sockaddr *sa)
455
{
456
        if (sa->sa_family == AF_INET) {
457
                return &(((struct sockaddr_in*)sa)->sin_addr);
458
        }
459
        return &(((struct sockaddr_in6*)sa)->sin6_addr);
460
}
461
 
462
int main(int argc,char * argv[])
463
{
464
        int sockfd, new_fd; // listen on sock_fd, new connection on new_fd
465
        struct addrinfo hints, *servinfo, *p;
466
        struct sockaddr_storage their_addr; // connector's address information
467
        socklen_t sin_size;
468
        struct sigaction sa;
469
        int yes=1;
470
        char s[INET6_ADDRSTRLEN];
471
        int rv;
472
 
473
        int serial_fd;
474
        int telnet_fds;
475
        pthread_t serial_thread_id,reaper_thread_id;
476
 
477
 
478
 
479
        if((argc>1))
480
        {
481
// daemonize code
482
        pid_t pid;
483
        printf("daemonizing \n");
484
 
485
 
486
        umask(0);
487
 
488
        if((pid = fork()) < 0)
489
        {
490
                printf("fork: %s\n",strerror(errno));
491
        }
492
        else if(pid !=0) {
493
                exit(0); //this is the parent
494
        }
495
 
496
        setsid();
497
 
498
        if(chdir("/")<0)
499
        {      
500
                printf("chdir to log dir: %s\n",strerror(errno));
501
                exit (-1);
502
        }
503
 
504
        close(STDIN_FILENO);
505
        close(STDOUT_FILENO);
506
        close(STDERR_FILENO);
507
 
508
 
509
        }      
510
        else
511
        {
512
        printf("using interactive\n");
513
        }
514
 
515
 
516
 
517
// end daemonize code
518
 
519
 
520
        openlog("ais_server",LOG_CONS|LOG_PID,LOG_USER);
521
 
522
 
523
 
524
 
525
// setup the conditional variable and the mutex used for triggering all of the threads
526
        pthread_mutex_init(&mutex,NULL);
527
        pthread_cond_init (&cond,NULL); // default operation 
528
 
529
        pthread_create(&serial_thread_id,NULL,serial_thread,&cond); // pass cond variable to serial thread as argument 
530
 
531
        pthread_mutex_init(&reaper_mutex,NULL);
532
        pthread_create(&reaper_thread_id,NULL,reaper_thread,NULL);// reap dead threads
533
 
534
        memset(&hints, 0, sizeof hints);
535
        hints.ai_family = AF_UNSPEC;
536
        hints.ai_socktype = SOCK_STREAM;
537
        hints.ai_flags = AI_PASSIVE; // use my IP
538
        if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) {
539
                fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
540
                return 1;
541
        }
542
// loop through all the results and bind to the first we can
543
 
544
        for(p = servinfo; p != NULL; p = p->ai_next) {
545
                if ((sockfd = socket(p->ai_family, p->ai_socktype,
546
                        p->ai_protocol)) == -1) {
547
                        perror("server: socket");
548
                        continue;
549
                }
550
                if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
551
                        sizeof(int)) == -1) {
552
                        perror("setsockopt");
553
                        exit(1);
554
                }
555
                if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
556
                        close(sockfd);
557
                        perror("server: bind");
558
                        continue;
559
                }
560
                break;
561
        }
562
        if (p == NULL) {
563
                fprintf(stderr, "server: failed to bind\n");
564
                return 2;
565
        }
566
        freeaddrinfo(servinfo); // all done with this structure
567
        if (listen(sockfd, BACKLOG) == -1) {
568
                perror("listen");
569
                exit(1);
570
        }
571
 
572
        sa.sa_handler = sigchld_handler; // reap all dead processes
573
        sigemptyset(&sa.sa_mask);
574
        sa.sa_flags = SA_RESTART;
575
        if (sigaction(SIGCHLD, &sa, NULL) == -1) {
576
                perror("sigaction");
577
                exit(1);
578
        }
579
        syslog(LOG_INFO,"server: waiting for connections...\n");
580
        while(1) { // main accept() loop
581
                sin_size = sizeof their_addr;
582
                new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
583
// this code is 
584
                if (new_fd == -1) {
585
                        perror("accept");
586
                        continue;
587
                }
588
                inet_ntop(their_addr.ss_family,
589
                get_in_addr((struct sockaddr *)&their_addr),
590
                s, sizeof s);
591
                syslog(LOG_INFO,"server: got connection from %s\n", s);
592
 
593
//              fcntl(new_fd,F_SETFL,O_NONBLOCK); // non blocking socket
594
 
595
                open_thread(new_fd,s);
596
 
597
        }
598
}
599