//
// server.c -- a stream socket server demo
// also includes pthreads management
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <termios.h>
#include <errno.h>
#include <pthread.h>
/* Debugging */
#include <sys/time.h>
#include <time.h>
#include <syslog.h>
#include <sys/poll.h>
FILE * debug_file;
typedef struct Thread
{
struct Thread * next;
int fd;
pthread_t thread;
char threadname[256];
} sThread;
volatile sThread * head;
volatile sThread * tail;
pthread_mutex_t reaper_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // a condition for all threads to wait on
#define PORT "2223" // the port users will be connecting to
#define BACKLOG 20 // how many pending connections queue will hold
#define MAXCHARS 512
#define BUFFCNT 40
// serial buffering structure
typedef struct
{
char buff[MAXCHARS];
int len;
} tBuf;
volatile tBuf buffers[BUFFCNT];
volatile int current = 0 ;
static int UART_InitialisePort(const char* port, speed_t speed)
{
int handle;
struct termios newtio;
/* Cygwin may fail here because you cant open the port multiple times despite opening O_NONBLOCK */
handle = open(port, O_RDWR | O_NOCTTY); /* Try user input depending on port */
if (handle < 0)
{
perror("UART_InitialisePort");
return -1;
}
memset(&newtio
, 0, sizeof(newtio
));
/*
* Set the baudrate to speed (init 9600), Parity to NONE (default), Data bits to 8
*/
newtio.c_cflag = speed;
newtio.c_cflag |= CS8; /* Eight bits */
/* cygwin looks in c_ispeed and c_ospeed for the speed setting so do that
as well */
#ifdef HAVE_C_IOSPEED
newtio.c_ispeed = speed;
newtio.c_ospeed = speed;
#endif
/*
* Setting Raw Input and Defaults
*/
newtio.c_cflag |= CREAD|HUPCL|CLOCAL;
newtio.c_cflag &= ~CRTSCTS; // no flow control
newtio.c_iflag |= IGNBRK|IGNPAR;
newtio.c_lflag = 0;
newtio.c_oflag = 0;
/* Non-blocking serial port operation. Change c_cc[VTIME] to 1 if
your platform has a broken tty implementation. */
newtio.c_cc[VMIN] = 0;
newtio.c_cc[VTIME] = 0;
if (tcflush(handle, TCIFLUSH) < 0) /* Flush the serial port */
{
close(handle);
return -1;
}
if (tcsetattr(handle, TCSANOW, &newtio) < 0) /* Set the parameters */
{
close(handle);
return -1;
}
return handle;
}
static int serial_blocking(int handle, int block)
{
struct termios tio;
static int current=0; /* Port is opened non-blocking */
if (current==block) return 0;
current=block;
tcgetattr(handle,&tio);
tio.c_cc[VMIN] = (block)?1:0;
tio.c_cc[VTIME] = 0;
tcsetattr(handle, TCSANOW, &tio);
return 0;
}
static int serial_speed(int handle, speed_t speed)
{
struct termios options;
/* Get the current options for the port... */
if(tcgetattr(handle, &options) != 0)
return -1;
cfsetospeed(&options, speed);
/* flush buffers */
tcflush(handle, TCIFLUSH);
/* Set the new options for the port... */
if( tcsetattr(handle, TCSANOW, &options) != 0)
return -1;
return 0;
}
static speed_t translate_baudrate(unsigned long baudrate)
{
speed_t BaudRate=0;
switch(baudrate)
{
case 300 : BaudRate=B300; break;
case 1200 : BaudRate=B1200; break;
case 2400 : BaudRate=B2400; break;
case 4800 : BaudRate=B4800; break;
case 9600 : BaudRate=B9600; break;
case 19200 : BaudRate=B19200; break;
case 38400 : BaudRate=B38400; break;
case 57600 : BaudRate=B57600; break;
case 115200 : BaudRate=B115200; break;
case 230400 : BaudRate=B230400; break;
/* MDJ change for cygwin */
#if defined B460800
case 460800 : BaudRate=B460800; break;
#endif
default :
printf("Bad baudrate (%ld)\n",baudrate
);
BaudRate=B9600;
break;
}
return BaudRate;
}
void * serial_thread(void* arg)
{
int speed=38400;
int handle;
int hadCR = 1;
struct timeval tv = { 0,0 };
struct timeval t0 = { 0, 0};
char buff1[MAXCHARS];
gettimeofday(&t0,NULL);
// this uses a UDEV rule e.g.
//
// (PL2303) SUBSYSTEM=="tty", ATTRS{idVendor}=="067b", ATTRS{idProduct}=="2303", SYMLINK+="ttyAIS"
// 340 chinese device is different
handle = UART_InitialisePort("/dev/ttyAIS",translate_baudrate(speed));
serial_blocking(handle,1);
while(handle>=0)
{
int i,j;
int chars;
int offset;
char * where;
chars = read(handle, buff1, MAXCHARS);
j=0;
while(j<chars) {
// tag the buffer being sent
if (hadCR) {
gettimeofday(&tv,NULL);
where = &buffers[current].buff[0];
"[%d.%02d]\r\n",
tv.tv_sec-t0.tv_sec,tv.tv_usec/10000);
hadCR=0;
}
where[offset]=buff1[j];
if(where[offset]==0x0a)
hadCR=1;
j++;offset++;
if(hadCR || offset == MAXCHARS-1)
{
buffers[current].len = offset;
current = (current + 1)% BUFFCNT;
pthread_cond_broadcast(&cond);
}
}
}
close(handle);
return( NULL);
}
struct timespec timeWait =
{
.tv_sec = 0,
.tv_nsec= 0,
};
char conn_msg[] = "AIS server\r\n";
void * telnet_thread(void * ptr)
{
sThread * thread = (sThread *) ptr;
int local_count = current;
int written;
char temp_buff[1500];
struct timespec timeNow;
// printf("in thread ptr %p fd %d\n",thread,thread->fd);
written
=send
(thread
->fd
,conn_msg
,strlen(conn_msg
),
MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
while(1)
{
pthread_mutex_lock(&mutex);
while(local_count == current) {
int rc;
// this is wait until , which is why we took the time and incremented it
do {
gettimeofday(&timeNow,NULL);
timeNow.tv_sec ++;
rc = pthread_cond_timedwait(&cond,&mutex,&timeNow);
if(rc==ETIMEDOUT) {
int chars;
struct pollfd p;
p.fd = thread->fd;
p.events = POLLIN;
p.revents= 0;
poll(&p,1,0);
// poll any input stuff and discard
if(p.revents & (POLLHUP||POLLERR)) {
close(thread->fd);
thread->fd = -1;
pthread_mutex_unlock(&mutex);
return(NULL);
}
if(p.revents & POLLIN) {
chars=recv(thread->fd,temp_buff,1500,0);
if (chars) {
// dont log crap recieved
// syslog(LOG_INFO,"read %d from %s\n",chars,thread->threadname);
}
else {
close(thread->fd);
thread->fd =-1;
pthread_mutex_unlock(&mutex);
syslog(LOG_INFO,"Closed from %s\n",thread->threadname);
return(NULL);
}
}
}
}
while (rc==ETIMEDOUT);
}
pthread_mutex_unlock(&mutex);
while(local_count != current)
{
written=send(thread->fd,buffers[local_count].buff,
buffers[local_count].len,MSG_NOSIGNAL| MSG_DONTWAIT); // lose SIGPIPE error
if(errno==EAGAIN)
break;
if(written<=0)
syslog
(LOG_INFO
,"error: write %d : err %s on %s\n",written
,strerror(errno
),thread
->threadname
);
// this could be EAGAIN which is 'network congestion' so dont waste time queueing up packets
if (written<=0 && errno != EAGAIN)
{
close(thread->fd);
thread->fd = -1;
return( NULL);
}
if(errno!=EAGAIN && written>0)
local_count = (local_count +1) % BUFFCNT;
}
}
return(NULL);
}
// this allocates storage for a thread and starts it
void * open_thread(int fd,char * name)
{
sThread * newthread;
newthread
= calloc(1,sizeof(sThread
));
// printf("in open_thread %p\n",newthread);
pthread_mutex_lock(&reaper_mutex);
if(!head)
{
head=newthread;
}
if(tail)
{
tail->next = newthread;
}
tail=newthread;
tail->next = NULL;
pthread_mutex_unlock(&reaper_mutex);
strcpy(newthread
->threadname
,name
);
newthread->fd = fd;
pthread_create(&newthread->thread,NULL,telnet_thread,newthread);
// pthread_detach(newthread->thread);
}
void * reaper_thread(void * arg)
{
sThread * curr;
sThread * prev;
sThread * next;
int threads= 0;
int reaped = 0;
while(1)
{
// printf("reaper thread checking\n");
curr = head;
prev = NULL;
threads=0;
reaped =0;
while (curr)
{
pthread_mutex_lock(&reaper_mutex);
next = curr->next;
if(curr->fd==-1)
{
void * status;
pthread_join(curr->thread,&status);
syslog(LOG_INFO,"Disconnection :%s \n",curr->threadname);
if(prev)
prev->next = next;
if(curr==head)
head=next;
if(curr==tail && prev)
tail=prev;
curr=next;
reaped ++;
}
else
{
prev=curr;
curr=next;
}
pthread_mutex_unlock(&reaper_mutex);
threads++;
}
if(reaped)
syslog(LOG_INFO,"Reaper checked %d threads, %d reaped\n",threads,reaped);
sleep(1);
}
return NULL;
}
void sigchld_handler(int s)
{
while(waitpid(-1, NULL, WNOHANG) > 0);
}
// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa)
{
if (sa->sa_family == AF_INET) {
return &(((struct sockaddr_in*)sa)->sin_addr);
}
return &(((struct sockaddr_in6*)sa)->sin6_addr);
}
int main(int argc,char * argv[])
{
int sockfd, new_fd; // listen on sock_fd, new connection on new_fd
struct addrinfo hints, *servinfo, *p;
struct sockaddr_storage their_addr; // connector's address information
socklen_t sin_size;
struct sigaction sa;
int yes=1;
char s[INET6_ADDRSTRLEN];
int rv;
int serial_fd;
int telnet_fds;
pthread_t serial_thread_id,reaper_thread_id;
if((argc>1))
{
// daemonize code
pid_t pid;
umask(0);
if((pid = fork()) < 0)
{
}
else if(pid !=0) {
exit(0); //this is the parent
}
setsid();
if(chdir("/")<0)
{
}
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
}
else
{
printf("using interactive\n");
}
// end daemonize code
openlog("ais_server",LOG_CONS|LOG_PID,LOG_USER);
// setup the conditional variable and the mutex used for triggering all of the threads
pthread_mutex_init(&mutex,NULL);
pthread_cond_init (&cond,NULL); // default operation
pthread_create(&serial_thread_id,NULL,serial_thread,&cond); // pass cond variable to serial thread as argument
pthread_mutex_init(&reaper_mutex,NULL);
pthread_create(&reaper_thread_id,NULL,reaper_thread,NULL);// reap dead threads
memset(&hints
, 0, sizeof hints
);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; // use my IP
if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) {
fprintf(stderr
, "getaddrinfo: %s\n", gai_strerror
(rv
));
return 1;
}
// loop through all the results and bind to the first we can
for(p = servinfo; p != NULL; p = p->ai_next) {
if ((sockfd = socket(p->ai_family, p->ai_socktype,
p->ai_protocol)) == -1) {
continue;
}
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes,
sizeof(int)) == -1) {
}
if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
close(sockfd);
continue;
}
break;
}
if (p == NULL) {
fprintf(stderr
, "server: failed to bind\n");
return 2;
}
freeaddrinfo(servinfo); // all done with this structure
if (listen(sockfd, BACKLOG) == -1) {
}
sa.sa_handler = sigchld_handler; // reap all dead processes
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if (sigaction(SIGCHLD, &sa, NULL) == -1) {
}
syslog(LOG_INFO,"server: waiting for connections...\n");
while(1) { // main accept() loop
sin_size = sizeof their_addr;
new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
// this code is
if (new_fd == -1) {
continue;
}
inet_ntop(their_addr.ss_family,
get_in_addr((struct sockaddr *)&their_addr),
s, sizeof s);
syslog(LOG_INFO,"server: got connection from %s\n", s);
// fcntl(new_fd,F_SETFL,O_NONBLOCK); // non blocking socket
open_thread(new_fd,s);
}
}