Subversion Repositories ais_server

Rev

Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | RSS feed

  1. //
  2. // server.c -- a stream socket server demo
  3. // also includes pthreads management
  4. //
  5. #include <stdio.h>
  6. #include <stdlib.h>
  7. #include <unistd.h>
  8. #include <errno.h>
  9. #include <string.h>
  10. #include <sys/types.h>
  11. #include <sys/socket.h>
  12. #include <netinet/in.h>
  13. #include <netdb.h>
  14. #include <arpa/inet.h>
  15. #include <sys/wait.h>
  16. #include <signal.h>
  17. #include <unistd.h>
  18. #include <fcntl.h>
  19.  
  20.  
  21. #include <termios.h>
  22. #include <errno.h>
  23.  
  24. #include <pthread.h>
  25.  
  26. /* Debugging */
  27.  
  28. #include <sys/time.h>
  29. #include <time.h>
  30.  
  31. #include <syslog.h>
  32.  
  33. #include <sys/poll.h>
  34.  
  35. FILE * debug_file;
  36.  
  37. typedef struct Thread
  38. {
  39.         struct    Thread * next;
  40.         int       fd;
  41.         pthread_t thread;
  42.         char threadname[256];
  43. } sThread;
  44.  
  45. volatile sThread * head;
  46. volatile sThread * tail;
  47.  
  48.  
  49. pthread_mutex_t reaper_mutex = PTHREAD_MUTEX_INITIALIZER;
  50. pthread_mutex_t mutex   = PTHREAD_MUTEX_INITIALIZER;
  51. pthread_cond_t cond     = PTHREAD_COND_INITIALIZER; // a condition for all threads to wait on
  52.  
  53.  
  54. #define PORT "2223" // the port users will be connecting to
  55. #define BACKLOG 20 // how many pending connections queue will hold
  56.  
  57.  
  58. #define MAXCHARS 512
  59. #define BUFFCNT 40
  60.  
  61. // serial buffering structure
  62. typedef struct
  63. {
  64.         char buff[MAXCHARS];
  65.         int  len;
  66. } tBuf;
  67.  
  68. volatile tBuf buffers[BUFFCNT];
  69. volatile int  current = 0 ;
  70.  
  71.  
  72.  
  73. static int UART_InitialisePort(const char* port, speed_t speed)
  74. {
  75.         int handle;
  76.         struct termios newtio;
  77.        
  78.  
  79. /* Cygwin may  fail here because you cant open the port multiple times despite opening O_NONBLOCK */
  80.         handle = open(port, O_RDWR | O_NOCTTY);      /* Try user input depending on port */
  81.         if (handle < 0)
  82.         {
  83.                 perror("UART_InitialisePort");
  84.                 return -1;
  85.         }
  86.        
  87.         memset(&newtio, 0, sizeof(newtio));
  88.        
  89.         /*
  90.         * Set the baudrate to speed (init 9600), Parity to NONE (default), Data bits to 8
  91.         */
  92.        
  93.         newtio.c_cflag = speed;
  94.         newtio.c_cflag |= CS8;    /* Eight bits            */
  95.  
  96. /* cygwin looks in c_ispeed and c_ospeed for the speed setting so do that
  97.    as well */
  98.         #ifdef HAVE_C_IOSPEED
  99.         newtio.c_ispeed = speed;
  100.         newtio.c_ospeed = speed;
  101.         #endif
  102.        
  103.         /*
  104.         * Setting Raw Input and Defaults
  105.         */
  106.         newtio.c_cflag |= CREAD|HUPCL|CLOCAL;
  107.         newtio.c_cflag &= ~CRTSCTS;  // no flow control
  108.         newtio.c_iflag |= IGNBRK|IGNPAR;
  109.         newtio.c_lflag  = 0;
  110.         newtio.c_oflag  = 0;
  111.        
  112.         /* Non-blocking serial port operation. Change c_cc[VTIME] to 1 if
  113.         your platform has a broken tty implementation. */
  114.         newtio.c_cc[VMIN]  = 0;
  115.         newtio.c_cc[VTIME] = 0;
  116.        
  117.         if (tcflush(handle, TCIFLUSH) < 0)            /* Flush the serial port */
  118.         {
  119.                 close(handle);
  120.                 return -1;
  121.         }
  122.        
  123.         if (tcsetattr(handle, TCSANOW, &newtio) < 0)  /* Set the parameters */
  124.         {
  125.                 close(handle);
  126.                 return -1;
  127.         }
  128.         return handle;
  129. }
  130.  
  131. static int serial_blocking(int handle, int block)
  132. {
  133.         struct termios tio;
  134.         static int current=0; /* Port is opened non-blocking */
  135.         if (current==block) return 0;
  136.         current=block;
  137.  
  138.        
  139.         tcgetattr(handle,&tio);
  140.  
  141.         tio.c_cc[VMIN] = (block)?1:0;
  142.         tio.c_cc[VTIME] = 0;
  143.        
  144.         tcsetattr(handle, TCSANOW, &tio);
  145.        
  146.         return 0;
  147. }
  148.  
  149.  
  150.  
  151. static int serial_speed(int handle, speed_t speed)
  152. {
  153.         struct termios options;
  154.  
  155.         /* Get the current options for the port... */
  156.         if(tcgetattr(handle, &options) != 0)
  157.                 return -1;
  158.        
  159.         cfsetospeed(&options, speed);
  160.        
  161.         /* flush buffers */
  162.         tcflush(handle, TCIFLUSH);
  163.        
  164.         /* Set the new options for the port... */
  165.         if( tcsetattr(handle, TCSANOW, &options) != 0)
  166.                 return -1;
  167.        
  168.         return 0;
  169. }
  170.  
  171.  
  172.  
  173.  
  174.  
  175.  
  176.  
  177.  
  178. static speed_t translate_baudrate(unsigned long baudrate)
  179. {
  180.         speed_t BaudRate=0;
  181.         switch(baudrate)
  182.         {
  183.                 case    300 : BaudRate=B300; break;
  184.                 case   1200 : BaudRate=B1200; break;
  185.                 case   2400 : BaudRate=B2400; break;
  186.                 case   4800 : BaudRate=B4800; break;
  187.                 case   9600 : BaudRate=B9600; break;
  188.                 case  19200 : BaudRate=B19200; break;
  189.                 case  38400 : BaudRate=B38400; break;
  190.                 case  57600 : BaudRate=B57600; break;
  191.                 case 115200 : BaudRate=B115200; break;
  192.                 case 230400 : BaudRate=B230400; break;
  193. /* MDJ change for cygwin */
  194. #if defined B460800
  195.                 case 460800 : BaudRate=B460800; break;
  196. #endif
  197.                 default :
  198.                         printf("Bad baudrate (%ld)\n",baudrate);
  199.                         BaudRate=B9600;
  200.                         break;
  201.         }
  202.         return BaudRate;
  203. }
  204.  
  205. void  * serial_thread(void* arg)
  206. {
  207.         int speed=38400;
  208.         int handle;
  209.         int hadCR = 1;
  210.         struct timeval tv = { 0,0 };
  211.  
  212.         struct timeval t0 = { 0, 0};
  213.         char  buff1[MAXCHARS];
  214.         gettimeofday(&t0,NULL);
  215. // this uses a UDEV rule e.g.
  216. //
  217. // (PL2303) SUBSYSTEM=="tty", ATTRS{idVendor}=="067b", ATTRS{idProduct}=="2303", SYMLINK+="ttyAIS"
  218. // 340 chinese device is different
  219.         handle = UART_InitialisePort("/dev/ttyAIS",translate_baudrate(speed));
  220.         serial_blocking(handle,1);
  221.         while(handle>=0)
  222.         {
  223.                 int i,j;
  224.                 int chars;
  225.                 int offset;
  226.                 char * where;
  227.                 chars = read(handle, buff1, MAXCHARS);
  228.                 j=0;
  229.                 while(j<chars) {
  230. // tag the buffer being sent
  231.                         if (hadCR) {
  232.                                 gettimeofday(&tv,NULL);
  233.                                 where = &buffers[current].buff[0];
  234.                                 offset=sprintf(where,
  235.                                         "[%d.%02d]\r\n",
  236.                                         tv.tv_sec-t0.tv_sec,tv.tv_usec/10000);                 
  237.                                 hadCR=0;
  238.                         }
  239.                         where[offset]=buff1[j];
  240.                
  241.  
  242.                         if(where[offset]==0x0a)
  243.                                 hadCR=1;
  244.                         j++;offset++;
  245.  
  246.                         if(hadCR || offset == MAXCHARS-1)
  247.                                 {
  248.                                 buffers[current].len = offset;
  249.                                 current = (current + 1)% BUFFCNT;
  250.                                 pthread_cond_broadcast(&cond);
  251.                         }
  252.                 }
  253.  
  254.         }
  255.         printf("done\n");
  256.  
  257.         close(handle);
  258.         return( NULL);
  259.  
  260.  
  261. }
  262.  
  263.  
  264. struct timespec timeWait =
  265. {
  266. .tv_sec = 0,
  267. .tv_nsec= 0,
  268. };
  269.  
  270.  
  271. char conn_msg[] = "AIS server\r\n";
  272.  
  273.  
  274. void * telnet_thread(void * ptr)
  275. {
  276.         sThread * thread = (sThread *) ptr;
  277.  
  278.         int local_count = current;
  279.         int written;
  280.         char temp_buff[1500];
  281.         struct timespec timeNow;
  282.  
  283. //      printf("in thread ptr %p fd %d\n",thread,thread->fd);
  284.         written=send(thread->fd,conn_msg,strlen(conn_msg),
  285.                 MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
  286.         while(1)
  287.         {
  288.  
  289.                 pthread_mutex_lock(&mutex);
  290.  
  291.                 while(local_count == current) {
  292.                         int rc;
  293. // this is wait until , which is why we took the time and incremented it
  294.                         do {
  295.                                 gettimeofday(&timeNow,NULL);
  296.                                 timeNow.tv_sec ++;
  297.                                 rc = pthread_cond_timedwait(&cond,&mutex,&timeNow);
  298.  
  299.                                 if(rc==ETIMEDOUT) {
  300.                                         int chars;
  301.                                         struct pollfd p;
  302.                                        
  303.                                         p.fd = thread->fd;
  304.                                         p.events = POLLIN;
  305.                                         p.revents= 0;
  306.                                         poll(&p,1,0);
  307. // poll any input stuff and discard
  308.                                         if(p.revents & (POLLHUP||POLLERR)) {
  309.                                                 close(thread->fd);
  310.                                                 thread->fd = -1;
  311.                                                 pthread_mutex_unlock(&mutex);
  312.                                                 return(NULL);
  313.                                                 }
  314.                                         if(p.revents & POLLIN) {
  315.                                         chars=recv(thread->fd,temp_buff,1500,0);
  316.                                         if (chars) {
  317.  
  318. // dont log crap recieved
  319. //                                              syslog(LOG_INFO,"read %d from %s\n",chars,thread->threadname); 
  320.                                                 }
  321.                                         else {
  322.                                                 close(thread->fd);
  323.                                                 thread->fd =-1;
  324.                                                 pthread_mutex_unlock(&mutex);
  325.                                                 syslog(LOG_INFO,"Closed from %s\n",thread->threadname);
  326.                                                 return(NULL);
  327.                                                 }
  328.                                         }
  329.                                         }
  330.                                 }
  331.  
  332.                         while (rc==ETIMEDOUT);
  333.                         }
  334.  
  335.                 pthread_mutex_unlock(&mutex);
  336.  
  337.                 while(local_count != current)
  338.                 {      
  339.                 written=send(thread->fd,buffers[local_count].buff,
  340.                         buffers[local_count].len,MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
  341.  
  342.                 if(errno==EAGAIN)
  343.                         break;
  344.                        
  345.  
  346.                 if(written<=0)
  347.                         syslog(LOG_INFO,"error: write %d : err  %s on %s\n",written,strerror(errno),thread->threadname);
  348.  
  349.  
  350.                 // this could be EAGAIN which is 'network congestion' so dont waste time queueing up packets
  351.                 if (written<=0 && errno != EAGAIN)
  352.                 {      
  353.                
  354.                         close(thread->fd);
  355.                         thread->fd = -1;
  356.                         return( NULL);
  357.  
  358.                
  359.                 }
  360.                 if(errno!=EAGAIN && written>0)
  361.                         local_count = (local_count +1) % BUFFCNT;
  362.                 }
  363.                
  364.         }
  365.         return(NULL);
  366.  
  367. }
  368.  
  369. // this allocates storage for a thread and starts it
  370. void * open_thread(int fd,char * name)
  371. {
  372.         sThread * newthread;
  373.  
  374.         newthread = calloc(1,sizeof(sThread));
  375. //      printf("in open_thread %p\n",newthread);
  376.         pthread_mutex_lock(&reaper_mutex);
  377.         if(!head)
  378.         {
  379.                 head=newthread;
  380.         }
  381.         if(tail)
  382.         {
  383.                 tail->next = newthread;
  384.         }
  385.         tail=newthread;
  386.         tail->next = NULL;
  387.         pthread_mutex_unlock(&reaper_mutex);
  388.         strcpy(newthread->threadname,name);
  389.         newthread->fd = fd;
  390.         pthread_create(&newthread->thread,NULL,telnet_thread,newthread);
  391. //      pthread_detach(newthread->thread);
  392. }
  393.  
  394.  
  395.  
  396. void * reaper_thread(void * arg)
  397. {
  398.         sThread * curr;
  399.         sThread * prev;
  400.         sThread * next;
  401.         int threads= 0;
  402.         int reaped = 0;
  403.         while(1)
  404.         {
  405. //              printf("reaper thread checking\n");
  406.                 curr = head;
  407.                 prev = NULL;
  408.                 threads=0;
  409.                 reaped =0;
  410.  
  411.                 while (curr)
  412.                 {
  413.                         pthread_mutex_lock(&reaper_mutex);
  414.                         next = curr->next;
  415.                         if(curr->fd==-1)
  416.                         {      
  417.                                 void * status;
  418.                                 pthread_join(curr->thread,&status);
  419.                                
  420.                                 syslog(LOG_INFO,"Disconnection :%s \n",curr->threadname);
  421.                                 if(prev)
  422.                                         prev->next = next;     
  423.                                 if(curr==head)
  424.                                         head=next;
  425.                                 if(curr==tail && prev)
  426.                                         tail=prev;
  427.                                 free(curr);
  428.                                 curr=next;
  429.                                 reaped ++;
  430.                         }
  431.                         else
  432.                         {
  433.                                 prev=curr;
  434.                                 curr=next;
  435.                         }
  436.                         pthread_mutex_unlock(&reaper_mutex);
  437.                         threads++;
  438.        
  439.                 }
  440.         if(reaped)
  441.                 syslog(LOG_INFO,"Reaper checked %d threads, %d reaped\n",threads,reaped);
  442.         sleep(1);
  443.         }      
  444.         return NULL;           
  445. }
  446.  
  447.  
  448. void sigchld_handler(int s)
  449. {
  450.         while(waitpid(-1, NULL, WNOHANG) > 0);
  451. }
  452.  
  453. // get sockaddr, IPv4 or IPv6:
  454. void *get_in_addr(struct sockaddr *sa)
  455. {
  456.         if (sa->sa_family == AF_INET) {
  457.                 return &(((struct sockaddr_in*)sa)->sin_addr);
  458.         }
  459.         return &(((struct sockaddr_in6*)sa)->sin6_addr);
  460. }
  461.  
  462. int main(int argc,char * argv[])
  463. {
  464.         int sockfd, new_fd; // listen on sock_fd, new connection on new_fd
  465.         struct addrinfo hints, *servinfo, *p;
  466.         struct sockaddr_storage their_addr; // connector's address information
  467.         socklen_t sin_size;
  468.         struct sigaction sa;
  469.         int yes=1;
  470.         char s[INET6_ADDRSTRLEN];
  471.         int rv;
  472.  
  473.         int serial_fd;
  474.         int telnet_fds;
  475.         pthread_t serial_thread_id,reaper_thread_id;
  476.  
  477.  
  478.  
  479.         if((argc>1))
  480.         {
  481. // daemonize code
  482.         pid_t pid;
  483.         printf("daemonizing \n");
  484.  
  485.  
  486.         umask(0);
  487.        
  488.         if((pid = fork()) < 0)
  489.         {
  490.                 printf("fork: %s\n",strerror(errno));
  491.         }
  492.         else if(pid !=0) {
  493.                 exit(0); //this is the parent
  494.         }
  495.        
  496.         setsid();
  497.  
  498.         if(chdir("/")<0)
  499.         {      
  500.                 printf("chdir to log dir: %s\n",strerror(errno));
  501.                 exit (-1);
  502.         }
  503.        
  504.         close(STDIN_FILENO);
  505.         close(STDOUT_FILENO);
  506.         close(STDERR_FILENO);
  507.  
  508.        
  509.         }      
  510.         else
  511.         {
  512.         printf("using interactive\n");
  513.         }
  514.  
  515.  
  516.  
  517. // end daemonize code
  518.  
  519.  
  520.         openlog("ais_server",LOG_CONS|LOG_PID,LOG_USER);
  521.  
  522.  
  523.  
  524.  
  525. // setup the conditional variable and the mutex used for triggering all of the threads
  526.         pthread_mutex_init(&mutex,NULL);
  527.         pthread_cond_init (&cond,NULL); // default operation
  528.  
  529.         pthread_create(&serial_thread_id,NULL,serial_thread,&cond); // pass cond variable to serial thread as argument
  530.  
  531.         pthread_mutex_init(&reaper_mutex,NULL);
  532.         pthread_create(&reaper_thread_id,NULL,reaper_thread,NULL);// reap dead threads
  533.  
  534.         memset(&hints, 0, sizeof hints);
  535.         hints.ai_family = AF_UNSPEC;
  536.         hints.ai_socktype = SOCK_STREAM;
  537.         hints.ai_flags = AI_PASSIVE; // use my IP
  538.         if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) {
  539.                 fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
  540.                 return 1;
  541.         }
  542. // loop through all the results and bind to the first we can
  543.  
  544.         for(p = servinfo; p != NULL; p = p->ai_next) {
  545.                 if ((sockfd = socket(p->ai_family, p->ai_socktype,
  546.                         p->ai_protocol)) == -1) {
  547.                         perror("server: socket");
  548.                         continue;
  549.                 }
  550.                 if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
  551.                         sizeof(int)) == -1) {
  552.                         perror("setsockopt");
  553.                         exit(1);
  554.                 }
  555.                 if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
  556.                         close(sockfd);
  557.                         perror("server: bind");
  558.                         continue;
  559.                 }
  560.                 break;
  561.         }
  562.         if (p == NULL) {
  563.                 fprintf(stderr, "server: failed to bind\n");
  564.                 return 2;
  565.         }
  566.         freeaddrinfo(servinfo); // all done with this structure
  567.         if (listen(sockfd, BACKLOG) == -1) {
  568.                 perror("listen");
  569.                 exit(1);
  570.         }
  571.  
  572.         sa.sa_handler = sigchld_handler; // reap all dead processes
  573.         sigemptyset(&sa.sa_mask);
  574.         sa.sa_flags = SA_RESTART;
  575.         if (sigaction(SIGCHLD, &sa, NULL) == -1) {
  576.                 perror("sigaction");
  577.                 exit(1);
  578.         }
  579.         syslog(LOG_INFO,"server: waiting for connections...\n");
  580.         while(1) { // main accept() loop
  581.                 sin_size = sizeof their_addr;
  582.                 new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
  583. // this code is
  584.                 if (new_fd == -1) {
  585.                         perror("accept");
  586.                         continue;
  587.                 }
  588.                 inet_ntop(their_addr.ss_family,
  589.                 get_in_addr((struct sockaddr *)&their_addr),
  590.                 s, sizeof s);
  591.                 syslog(LOG_INFO,"server: got connection from %s\n", s);
  592.  
  593. //              fcntl(new_fd,F_SETFL,O_NONBLOCK); // non blocking socket
  594.  
  595.                 open_thread(new_fd,s);
  596.  
  597.         }
  598. }
  599.  
  600.