Subversion Repositories ais_server

Rev

Rev 3 | Blame | Compare with Previous | Last modification | View Log | RSS feed

  1. //
  2. // server.c -- a stream socket server demo
  3. // also includes pthreads management
  4. //
  5. #include <stdio.h>
  6. #include <stdlib.h>
  7. #include <unistd.h>
  8. #include <errno.h>
  9. #include <string.h>
  10. #include <sys/types.h>
  11. #include <sys/socket.h>
  12. #include <netinet/in.h>
  13. #include <netdb.h>
  14. #include <arpa/inet.h>
  15. #include <sys/wait.h>
  16. #include <signal.h>
  17. #include <unistd.h>
  18. #include <fcntl.h>
  19.  
  20.  
  21. #include <termios.h>
  22. #include <errno.h>
  23.  
  24. #include <pthread.h>
  25.  
  26. /* Debugging */
  27.  
  28. #include <sys/time.h>
  29. #include <time.h>
  30.  
  31. #include <syslog.h>
  32.  
  33. #include <sys/poll.h>
  34.  
  35. FILE * debug_file;
  36.  
  37. typedef struct Thread
  38. {
  39.         struct    Thread * next;
  40.         int       fd;
  41.         pthread_t thread;
  42.         char threadname[256];
  43. } sThread;
  44.  
  45. volatile sThread * head;
  46. volatile sThread * tail;
  47.  
  48.  
  49. pthread_mutex_t reaper_mutex = PTHREAD_MUTEX_INITIALIZER;
  50. pthread_mutex_t mutex   = PTHREAD_MUTEX_INITIALIZER;
  51. pthread_cond_t cond     = PTHREAD_COND_INITIALIZER; // a condition for all threads to wait on
  52.  
  53.  
  54. #define PORT "2223" // the port users will be connecting to
  55. #define BACKLOG 20 // how many pending connections queue will hold
  56.  
  57.  
  58. #define MAXCHARS 512
  59. #define BUFFCNT 40
  60.  
  61. // serial buffering structure
  62. typedef struct
  63. {
  64.         char buff[MAXCHARS];
  65.         int  len;
  66. } tBuf;
  67.  
  68. volatile tBuf buffers[BUFFCNT];
  69. volatile int  current = 0 ;
  70.  
  71.  
  72.  
  73. static int UART_InitialisePort(const char* port, speed_t speed)
  74. {
  75.         int handle;
  76.         struct termios newtio;
  77.        
  78.  
  79. /* Cygwin may  fail here because you cant open the port multiple times despite opening O_NONBLOCK */
  80.         handle = open(port, O_RDWR | O_NOCTTY);      /* Try user input depending on port */
  81.         if (handle < 0)
  82.         {
  83.                 perror("UART_InitialisePort");
  84.                 return -1;
  85.         }
  86.        
  87.         memset(&newtio, 0, sizeof(newtio));
  88.        
  89.         /*
  90.         * Set the baudrate to speed (init 9600), Parity to NONE (default), Data bits to 8
  91.         */
  92.        
  93.         newtio.c_cflag = speed;
  94.         newtio.c_cflag |= CS8;    /* Eight bits            */
  95.  
  96. /* cygwin looks in c_ispeed and c_ospeed for the speed setting so do that
  97.    as well */
  98.         #ifdef HAVE_C_IOSPEED
  99.         newtio.c_ispeed = speed;
  100.         newtio.c_ospeed = speed;
  101.         #endif
  102.        
  103.         /*
  104.         * Setting Raw Input and Defaults
  105.         */
  106.         newtio.c_cflag |= CREAD|HUPCL|CLOCAL;
  107.         newtio.c_cflag &= ~CRTSCTS;  // no flow control
  108.         newtio.c_iflag |= IGNBRK|IGNPAR;
  109.         newtio.c_lflag  = 0;
  110.         newtio.c_oflag  = 0;
  111.        
  112.         /* Non-blocking serial port operation. Change c_cc[VTIME] to 1 if
  113.         your platform has a broken tty implementation. */
  114.         newtio.c_cc[VMIN]  = 0;
  115.         newtio.c_cc[VTIME] = 0;
  116.        
  117.         if (tcflush(handle, TCIFLUSH) < 0)            /* Flush the serial port */
  118.         {
  119.                 close(handle);
  120.                 return -1;
  121.         }
  122.        
  123.         if (tcsetattr(handle, TCSANOW, &newtio) < 0)  /* Set the parameters */
  124.         {
  125.                 close(handle);
  126.                 return -1;
  127.         }
  128.         return handle;
  129. }
  130.  
  131. static int serial_blocking(int handle, int block)
  132. {
  133.         struct termios tio;
  134.         static int current=0; /* Port is opened non-blocking */
  135.         if (current==block) return 0;
  136.         current=block;
  137.  
  138.        
  139.         tcgetattr(handle,&tio);
  140.  
  141.         tio.c_cc[VMIN] = (block)?1:0;
  142.         tio.c_cc[VTIME] = 0;
  143.        
  144.         tcsetattr(handle, TCSANOW, &tio);
  145.        
  146.         return 0;
  147. }
  148.  
  149.  
  150.  
  151. static int serial_speed(int handle, speed_t speed)
  152. {
  153.         struct termios options;
  154.  
  155.         /* Get the current options for the port... */
  156.         if(tcgetattr(handle, &options) != 0)
  157.                 return -1;
  158.        
  159.         cfsetospeed(&options, speed);
  160.        
  161.         /* flush buffers */
  162.         tcflush(handle, TCIFLUSH);
  163.        
  164.         /* Set the new options for the port... */
  165.         if( tcsetattr(handle, TCSANOW, &options) != 0)
  166.                 return -1;
  167.        
  168.         return 0;
  169. }
  170.  
  171.  
  172.  
  173.  
  174.  
  175.  
  176.  
  177.  
  178. static speed_t translate_baudrate(unsigned long baudrate)
  179. {
  180.         speed_t BaudRate=0;
  181.         switch(baudrate)
  182.         {
  183.                 case    300 : BaudRate=B300; break;
  184.                 case   1200 : BaudRate=B1200; break;
  185.                 case   2400 : BaudRate=B2400; break;
  186.                 case   4800 : BaudRate=B4800; break;
  187.                 case   9600 : BaudRate=B9600; break;
  188.                 case  19200 : BaudRate=B19200; break;
  189.                 case  38400 : BaudRate=B38400; break;
  190.                 case  57600 : BaudRate=B57600; break;
  191.                 case 115200 : BaudRate=B115200; break;
  192.                 case 230400 : BaudRate=B230400; break;
  193. /* MDJ change for cygwin */
  194. #if defined B460800
  195.                 case 460800 : BaudRate=B460800; break;
  196. #endif
  197.                 default :
  198.                         printf("Bad baudrate (%ld)\n",baudrate);
  199.                         BaudRate=B9600;
  200.                         break;
  201.         }
  202.         return BaudRate;
  203. }
  204.  
  205. void  * serial_thread(void* arg)
  206. {
  207.         int speed=38400;
  208.         int handle;
  209.         int hadCR = 1;
  210.         struct timeval tv = { 0,0 };
  211.  
  212.         struct timeval t0 = { 0, 0};
  213.         char  buff1[MAXCHARS];
  214.         gettimeofday(&t0,NULL);
  215. // this uses a UDEV rule e.g.
  216. //
  217. // (PL2303) SUBSYSTEM=="tty", ATTRS{idVendor}=="067b", ATTRS{idProduct}=="2303", SYMLINK+="ttyAIS"
  218. // 340 chinese device is different
  219.         handle = UART_InitialisePort("/dev/ttyAIS",translate_baudrate(speed));
  220.         serial_blocking(handle,1);
  221.         while(handle>=0)
  222.         {
  223.                 int i,j;
  224.                 int chars;
  225.                 int offset;
  226.                 char * where;
  227.                 chars = read(handle, buff1, MAXCHARS);
  228.                 j=0;
  229.                 while(j<chars) {
  230. // tag the buffer being sent
  231.                         if (hadCR) {
  232.                                 gettimeofday(&tv,NULL);
  233.                                 where = &buffers[current].buff[0];
  234.                                 offset=sprintf(where,
  235.                                         "[%d.%02d]\r\n",
  236.                                         tv.tv_sec-t0.tv_sec,tv.tv_usec/10000);                 
  237.                                 hadCR=0;
  238.                         }
  239.                         where[offset]=buff1[j];
  240.                
  241.  
  242.                         if(where[offset]==0x0a)
  243.                                 hadCR=1;
  244.                         j++;offset++;
  245.  
  246.                         if(hadCR || offset == MAXCHARS-1)
  247.                                 {
  248.                                 buffers[current].len = offset;
  249.                                 current = (current + 1)% BUFFCNT;
  250.                                 pthread_cond_broadcast(&cond);
  251.                         }
  252.                 }
  253.  
  254.         }
  255.         printf("done\n");
  256.  
  257.         close(handle);
  258.         return( NULL);
  259.  
  260.  
  261. }
  262.  
  263.  
  264. struct timespec timeWait =
  265. {
  266. .tv_sec = 0,
  267. .tv_nsec= 0,
  268. };
  269.  
  270.  
  271. char conn_msg[] = "AIS server\r\n";
  272.  
  273.  
  274. void * telnet_thread(void * ptr)
  275. {
  276.         sThread * thread = (sThread *) ptr;
  277.  
  278.         int local_count = current;
  279.         int written;
  280.         char temp_buff[1500];
  281.         struct timespec timeNow;
  282.  
  283. //      printf("in thread ptr %p fd %d\n",thread,thread->fd);
  284.         written=send(thread->fd,conn_msg,strlen(conn_msg),
  285.                 MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
  286.         while(1)
  287.         {
  288.  
  289.                 pthread_mutex_lock(&mutex);
  290.  
  291.                 while(local_count == current) {
  292.                         int rc;
  293. // this is wait until , which is why we took the time and incremented it
  294.                         do {
  295.                                 gettimeofday(&timeNow,NULL);
  296.                                 timeNow.tv_sec ++;
  297.                                 rc = pthread_cond_timedwait(&cond,&mutex,&timeNow);
  298.  
  299.                                 if(rc==ETIMEDOUT) {
  300.                                         int chars;
  301.                                         struct pollfd p;
  302.                                        
  303.                                         p.fd = thread->fd;
  304.                                         p.events = POLLIN;
  305.                                         p.revents= 0;
  306.                                         poll(&p,1,0);
  307. // poll any input stuff and discard
  308.                                         if(p.revents & (POLLHUP|POLLERR)) {
  309.                                                 syslog(LOG_INFO,"HUP/Error shutdown %s\n",thread->threadname);
  310.                                                 close(thread->fd);
  311.                                                 thread->fd = -1;
  312.                                                 pthread_mutex_unlock(&mutex);
  313.                                                 return(NULL);
  314.                                                 }
  315.                                         if(p.revents & POLLIN) {
  316.                                                 chars=recv(thread->fd,temp_buff,1500,MSG_DONTWAIT);
  317.                                                 if (chars) {
  318.  
  319. // dont log crap recieved
  320.                                                         syslog(LOG_INFO,"read %d from %s\n",chars,thread->threadname); 
  321.                                                 }
  322.                                                 else {
  323.                                                         close(thread->fd);
  324.                                                         thread->fd =-1;
  325.                                                         pthread_mutex_unlock(&mutex);
  326.                                                         syslog(LOG_INFO,"Closed from %s\n",thread->threadname);
  327.                                                         return(NULL);
  328.                                                 }
  329.                                         }
  330.                                         }
  331.                                 }
  332.  
  333.                         while (rc==ETIMEDOUT);
  334.                         }
  335.  
  336.                 pthread_mutex_unlock(&mutex);
  337.  
  338.                 while(local_count != current)
  339.                 {      
  340.                 written=send(thread->fd,buffers[local_count].buff,
  341.                         buffers[local_count].len,MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
  342.  
  343.                 if(errno==EAGAIN)
  344.                         break;
  345.                        
  346.  
  347.                 if(written<=0)
  348.                         syslog(LOG_INFO,"error: write %d : err  %s on %s\n",written,strerror(errno),thread->threadname);
  349.  
  350.  
  351.                 // this could be EAGAIN which is 'network congestion' so dont waste time queueing up packets
  352.                 if (written<=0 && errno != EAGAIN)
  353.                 {      
  354.                
  355.                         close(thread->fd);
  356.                         thread->fd = -1;
  357.                         return( NULL);
  358.  
  359.                
  360.                 }
  361.                 if(errno!=EAGAIN && written>0)
  362.                         local_count = (local_count +1) % BUFFCNT;
  363.                 }
  364.  
  365.                 // read any incoming data and discard
  366.                 //size_t readCount;
  367.                 //char buffer[512];
  368.                 //do {
  369.                 //      readCount = recv(thread->fd,&buffer[0],sizeof(buffer),MSG_DONTWAIT);
  370.                 //}
  371.                 //while (written != 0 && errno!=EAGAIN);
  372.         }
  373.         return(NULL);
  374.  
  375. }
  376.  
  377. // this allocates storage for a thread and starts it
  378. void * open_thread(int fd,char * name)
  379. {
  380.         sThread * newthread;
  381.  
  382.         newthread = calloc(1,sizeof(sThread));
  383. //      printf("in open_thread %p\n",newthread);
  384.         pthread_mutex_lock(&reaper_mutex);
  385.         if(!head)
  386.         {
  387.                 head=newthread;
  388.         }
  389.         if(tail)
  390.         {
  391.                 tail->next = newthread;
  392.         }
  393.         tail=newthread;
  394.         tail->next = NULL;
  395.         pthread_mutex_unlock(&reaper_mutex);
  396.         strcpy(newthread->threadname,name);
  397.         newthread->fd = fd;
  398.         pthread_create(&newthread->thread,NULL,telnet_thread,newthread);
  399. //      pthread_detach(newthread->thread);
  400. }
  401.  
  402.  
  403.  
  404. void * reaper_thread(void * arg)
  405. {
  406.         sThread * curr;
  407.         sThread * prev;
  408.         sThread * next;
  409.         int threads= 0;
  410.         int reaped = 0;
  411.         while(1)
  412.         {
  413. //              printf("reaper thread checking\n");
  414.                 curr = head;
  415.                 prev = NULL;
  416.                 threads=0;
  417.                 reaped =0;
  418.  
  419.                 while (curr)
  420.                 {
  421.                         pthread_mutex_lock(&reaper_mutex);
  422.                         next = curr->next;
  423.                         if(curr->fd==-1)
  424.                         {      
  425.                                 void * status;
  426.                                 pthread_join(curr->thread,&status);
  427.                                
  428.                                 syslog(LOG_INFO,"Disconnection :%s \n",curr->threadname);
  429.                                 if(prev)
  430.                                         prev->next = next;     
  431.                                 if(curr==head)
  432.                                         head=next;
  433.                                 if(curr==tail && prev)
  434.                                         tail=prev;
  435.                                 free(curr);
  436.                                 curr=next;
  437.                                 reaped ++;
  438.                         }
  439.                         else
  440.                         {
  441.                                 prev=curr;
  442.                                 curr=next;
  443.                         }
  444.                         pthread_mutex_unlock(&reaper_mutex);
  445.                         threads++;
  446.        
  447.                 }
  448.         if(reaped)
  449.                 syslog(LOG_INFO,"Reaper checked %d threads, %d reaped\n",threads,reaped);
  450.         sleep(1);
  451.         }      
  452.         return NULL;           
  453. }
  454.  
  455.  
  456. void sigchld_handler(int s)
  457. {
  458.         while(waitpid(-1, NULL, WNOHANG) > 0);
  459. }
  460.  
  461. // get sockaddr, IPv4 or IPv6:
  462. void *get_in_addr(struct sockaddr *sa)
  463. {
  464.         if (sa->sa_family == AF_INET) {
  465.                 return &(((struct sockaddr_in*)sa)->sin_addr);
  466.         }
  467.         return &(((struct sockaddr_in6*)sa)->sin6_addr);
  468. }
  469.  
  470. int main(int argc,char * argv[])
  471. {
  472.         int sockfd, new_fd; // listen on sock_fd, new connection on new_fd
  473.         struct addrinfo hints, *servinfo, *p;
  474.         struct sockaddr_storage their_addr; // connector's address information
  475.         socklen_t sin_size;
  476.         struct sigaction sa;
  477.         int yes=1;
  478.         char s[INET6_ADDRSTRLEN];
  479.         int rv;
  480.  
  481.         int serial_fd;
  482.         int telnet_fds;
  483.         pthread_t serial_thread_id,reaper_thread_id;
  484.  
  485.  
  486.  
  487.         if((argc>1))
  488.         {
  489. // daemonize code
  490.         pid_t pid;
  491.         printf("daemonizing \n");
  492.  
  493.  
  494.         umask(0);
  495.        
  496.         if((pid = fork()) < 0)
  497.         {
  498.                 printf("fork: %s\n",strerror(errno));
  499.         }
  500.         else if(pid !=0) {
  501.                 exit(0); //this is the parent
  502.         }
  503.        
  504.         setsid();
  505.  
  506.         if(chdir("/")<0)
  507.         {      
  508.                 printf("chdir to log dir: %s\n",strerror(errno));
  509.                 exit (-1);
  510.         }
  511.        
  512.         close(STDIN_FILENO);
  513.         close(STDOUT_FILENO);
  514.         close(STDERR_FILENO);
  515.  
  516.        
  517.         }      
  518.         else
  519.         {
  520.         printf("using interactive\n");
  521.         }
  522.  
  523.  
  524.  
  525. // end daemonize code
  526.  
  527.  
  528.         openlog("ais_server",LOG_CONS|LOG_PID,LOG_USER);
  529.  
  530.  
  531.  
  532.  
  533. // setup the conditional variable and the mutex used for triggering all of the threads
  534.         pthread_mutex_init(&mutex,NULL);
  535.         pthread_cond_init (&cond,NULL); // default operation
  536.  
  537.         pthread_create(&serial_thread_id,NULL,serial_thread,&cond); // pass cond variable to serial thread as argument
  538.  
  539.         pthread_mutex_init(&reaper_mutex,NULL);
  540.         pthread_create(&reaper_thread_id,NULL,reaper_thread,NULL);// reap dead threads
  541.  
  542.         memset(&hints, 0, sizeof hints);
  543.         hints.ai_family = AF_UNSPEC;
  544.         hints.ai_socktype = SOCK_STREAM;
  545.         hints.ai_flags = AI_PASSIVE; // use my IP
  546.         if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) {
  547.                 fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
  548.                 return 1;
  549.         }
  550. // loop through all the results and bind to the first we can
  551.  
  552.         for(p = servinfo; p != NULL; p = p->ai_next) {
  553.                 if ((sockfd = socket(p->ai_family, p->ai_socktype,
  554.                         p->ai_protocol)) == -1) {
  555.                         perror("server: socket");
  556.                         continue;
  557.                 }
  558.                 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
  559.                         sizeof(int)) == -1) {
  560.                         perror("setsockopt");
  561.                         exit(1);
  562.                 }
  563.                 if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
  564.                         close(sockfd);
  565.                         perror("server: bind");
  566.                         continue;
  567.                 }
  568.                 break;
  569.         }
  570.         if (p == NULL) {
  571.                 fprintf(stderr, "server: failed to bind\n");
  572.                 return 2;
  573.         }
  574.         freeaddrinfo(servinfo); // all done with this structure
  575.         if (listen(sockfd, BACKLOG) == -1) {
  576.                 perror("listen");
  577.                 exit(1);
  578.         }
  579.  
  580.         sa.sa_handler = sigchld_handler; // reap all dead processes
  581.         sigemptyset(&sa.sa_mask);
  582.         sa.sa_flags = SA_RESTART;
  583.         if (sigaction(SIGCHLD, &sa, NULL) == -1) {
  584.                 perror("sigaction");
  585.                 exit(1);
  586.         }
  587.         syslog(LOG_INFO,"server: waiting for connections...\n");
  588.         while(1) { // main accept() loop
  589.                 sin_size = sizeof their_addr;
  590.                 new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
  591. // this code is
  592.                 if (new_fd == -1) {
  593.                         perror("accept");
  594.                         continue;
  595.                 }
  596.                 inet_ntop(their_addr.ss_family,
  597.                 get_in_addr((struct sockaddr *)&their_addr),
  598.                 s, sizeof s);
  599.                 syslog(LOG_INFO,"server: got connection from %s\n", s);
  600.  
  601. //              fcntl(new_fd,F_SETFL,O_NONBLOCK); // non blocking socket
  602.  
  603.                 open_thread(new_fd,s);
  604.  
  605.         }
  606. }
  607.  
  608.