/* EchoLink multi-proxy/relay server for Linux */
/* (c)2015-2021 by Rob Janssen, PE1CHL */
/* Protocol and constants from Java version 1.2.3 by Jonathan Taylor, K1RFD */
/* EchoLink is a registered trademark of Synergenics, LLC */

/* NOTE(review): the header names of the twenty include directives below */
/* are missing from this copy of the file. They are kept exactly as found */
/* and must be restored from the upstream source before compiling. */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

/* well-known port numbers */
#define RTP_PORT            5198
#define RTCP_PORT           5199
#define ADDR_SERVER_PORT    5200
#define CLIENT_PORT         7000
#define PROXY_PORT          8100

/* access a struct sockaddr as an IPv4 socket address */
#define IN_SA(SA)           ((struct sockaddr_in *)&(SA))
#define IN_PORT(SA)         IN_SA(SA)->sin_port
#define IN_ADDR(SA)         IN_SA(SA)->sin_addr.s_addr
#define IN_EQUAL(SA1,SA2)   (IN_ADDR(SA1) == IN_ADDR(SA2) && \
                             IN_PORT(SA1) == IN_PORT(SA2))

/* hash table size and hash function (address in network byte order) */
#define NHASH               251
#define HASHIP(addr)        (ntohl(addr) % NHASH)

/* status poster target */
#define POSTER_HOST         "www.echolink.org"
#define POSTER_PORT         80
#define POSTER_URL          "/proxypost.jsp"
#define MD5_SALT            "#5A!zu"

/* all times are kept in microseconds; the expansions are parenthesised */
/* so that the macros stay correct inside any larger expression */
#define USEC                1000000LL       /* microseconds in 1s */
#define AUTH_SOCKET_TIMEOUT     (30*USEC)
#define CLIENT_SOCKET_TIMEOUT   (900*USEC)
#define ADDR_SOCKET_TIMEOUT     (60*USEC)
#define CONNECT_TIMEOUT         (10*USEC)
#define POSTER_INTERVAL         (USEC/2)
#define STATP_INTERVAL          (600*USEC)
#define STATS_INTERVAL          (15*USEC)   /* must be >10s */
#define REG_TIMEOUT             (600*USEC)
#define FLOW_TIMEOUT            (60*USEC)
#define RELAY_INTERVAL          (10*USEC)

#define PROGRAM_VERSION     "1.2.5c"
#define PUBLIC_PASSWORD     "PUBLIC"
#define EXAMPLE_PASSWORD    "notset"

#define HEADER_SIZE         9       /* proxy header size */
#define MAX_DATA_SIZE       5000    /* proxy message size */
#define TCP_DATA_SIZE       4096    /* <= MAX_DATA_SIZE */

#define REQ_HLEN            7       /* relay req header */
#define RES_HLEN            9       /* relay rsp header */
#define MAX_PACKET_DATA     1500    /* relay max data */

#ifndef max
# define max(a,b)           (((a) > (b)) ? (a) : (b))
# define min(a,b)           (((a) < (b)) ? (a) : (b))
#endif

/* datatypes */

/* proxy message type on TCP socket */
enum msgtype {
    PROXY_MSG_TCP_OPEN = 1,         /* open TCP connection to ADDR srvr */
    PROXY_MSG_TCP_DATA = 2,         /* data for TCP connection */
    PROXY_MSG_TCP_CLOSE = 3,        /* close TCP connection */
    PROXY_MSG_TCP_STATUS = 4,       /* status of TCP connect */
    PROXY_MSG_UDP_DATA = 5,         /* data for RTP socket */
    PROXY_MSG_UDP_CONTROL = 6,      /* data for RTCP socket */
    PROXY_MSG_SYSTEM = 7            /* system message */
};

/* proxy reason codes for auth failure */
enum reason {
    REASON_CODE_BAD_PW = 1,         /* bad password for proxy server */
    REASON_CODE_ACCESS_DENIED = 2   /* client rejected due to pattern */
};

/* relay request types on UDP socket */
enum request {
    REQUEST_TYPE_KEEPALIVE = 0,     /* keep flow alive */
    REQUEST_TYPE_DATA = 1,          /* relay data */
    REQUEST_TYPE_CLOSE = 2,         /* unused */
    REQUEST_TYPE_REGISTER = 3,      /* register callsign */
    REQUEST_TYPE_UNREGISTER = 4,    /* unregister callsign */
    REQUEST_TYPE_DROP_FLOW = 5,     /* delete flow record */
    REQUEST_TYPE_PING = 6           /* ping request */
};

/* relay response types on UDP socket */
enum response {
    RESPONSE_TYPE_DATA = 1,         /* relay data */
    RESPONSE_TYPE_ERROR = 2,        /* error message */
    RESPONSE_TYPE_PING = 3,         /* ping reply */
    RESPONSE_TYPE_ACK = 4           /* acknowledgment of client request */
};

/* relay error codes */
enum relayerror {
    ERR_ALLOCATION = 1,             /* cannot allocate flow */
};

/* definition of an upcall function for opentcp */
typedef void (*upcall_t)(int fd,void *ptr,int err);

/* items from the config file */
struct confvar {
    struct confvar *next;           /* linked list */
    char *name;                     /* variable name */
    char *value;                    /* variable value */
};

/* a descriptor for each open fd holding all state and session info */
struct descriptor {
    enum fdtype {
        UNUSED = 0,
        RTP_FD,
        RTCP_FD,
        CLIENT_FD,
        PROXY_FD,
        SESSION_FD,
        ADDR_TCP_FD,
        POSTER_FD,
        CONNECT_FD,
    } fdtype;                       /* type of fd */
    uint32_t events;                /* epoll events */
    struct proxy *proxy;            /* related proxy session */
    int64_t timeout;                /* microseconds of next timer event */
    upcall_t upcall;                /* upcall for nonblocking connect */
    void *ptr;                      /* pointer parameter for upcall */
};

/* definitions for a single proxy instance */
struct proxy {
    struct proxy *next;             /* linked list */
    enum state {
        UNUSED_P = 0,
        READY,
        OFF,
        NONCE,
        AUTH1,
        AUTH2,
        BUSY,
    } state;                        /* connection state */
    int fd;                         /* client file descriptor */
    int tcp_fd;                     /* ADDR TCP file descriptor */
    uint32_t nonce;                 /* nonce for client auth */
    int64_t bytesout;               /* bytes sent to this client */
    int64_t nextstatuspost;         /* time for next status post */
    int64_t connectiontimeout;      /* timeout for this connection */
    in_addr_t qaddr;                /* address of most recent qso peer */
    char *regaddr;                  /* registration address */
    char *regname;                  /* registration name */
    char *regcomment;               /* registration comment */
    char *password;                 /* password for this proxy */
    char digest[32];                /* MD5 digest of name/comment */
    char clientcall[16];            /* client callsign */
    struct sockaddr laddr;          /* local address */
    char laddr_s[16];               /* same as a string */
    struct sockaddr raddr;          /* remote address */
    char raddr_s[16];               /* same as a string */
    int hdrbytes;                   /* header bytes that have been read */
    int msgbytes;                   /* message bytes that have been read */
    int heldbytes;                  /* message bytes held for throttle */
    char header[HEADER_SIZE];       /* buffer for received TCP header */
    char message[MAX_DATA_SIZE];    /* buffer for received TCP message */
};

/* definitions for a single relay instance */
struct relay {
    struct relay *next;             /* linked list */
    enum rlstate {
        UNUSED_R = 0,
        RELAY,
    } state;                        /* relay state */
    int64_t bytesout;               /* bytes sent by this relay */
    int64_t bytesin;                /* bytes received by this relay */
    struct sockaddr laddr;          /* local address */
    char laddr_s[16];               /* same as a string */
    struct sockaddr lastclient;     /* client for which last error logged */
    struct registration *registration; /* callsign registration */
    struct flowrecord *flowrecord[NHASH]; /* flow records by peeraddress */
};

/* callsign registration */
struct registration {
    struct registration *next;      /* linked list */
    struct registration *prev;
    char clientcall[16];            /* client callsign */
    struct sockaddr clientaddr;     /* client address */
    int64_t lastactivity;           /* timestamp of last activity */
};

/* flow record */
struct flowrecord {
    struct flowrecord *next;        /* linked list */
    struct flowrecord *prev;
    in_addr_t peeraddr;             /* peer address */
    struct sockaddr clientaddr;     /* client address */
    int64_t lastactivity;           /* timestamp of last activity */
    int64_t bytesout;               /* bytes sent to this client */
    int64_t bytesin;                /* bytes received from this client */
};

/* variables */

int daemonize = 1;                  /* automatically become daemon? */
int debug = 0;                      /* print debug output? */
int quit = 0;                       /* quit program? */
FILE *logfile = NULL;               /* logging/debugging file */
FILE *statfile = NULL;              /* file to put operative status */
struct confvar *configvars = NULL;  /* list of config variables */
char *denied_pattern = NULL;        /* denied callsigns pattern */
char *allowed_pattern = NULL;       /* allowed callsigns pattern */
int proxy_port = PROXY_PORT;        /* proxy portnumber */
int64_t connectiontimeout = 0;      /* connection timeout */
int64_t now;                        /* current time in microseconds */
int64_t next = 0;                   /* time of earliest next time event */
int epoll_fd;                       /* fd for epoll instance */
int rtp_fd;                         /* fd for RTP port */
int rtcp_fd;                        /* fd for RTCP port */
int client_fd;                      /* fd for CLIENT port */
int proxy_fd;                       /* fd for PROXY port */
int minfd = FD_SETSIZE-1;           /* lowest fd currently registered */
int maxfd = -1;                     /* highest fd currently registered */
int numevents;                      /* number of epoll events received */
int nproxies = 0;                   /* number of proxies defined */
int nrelays = 0;                    /* number of relays defined */
struct proxy *proxy;                /* data for each proxy */
struct proxy *proxy_by_addr[NHASH]; /* hashed index of proxy by address */
struct relay *relay;                /* data for each relay */
struct relay *relay_by_addr[NHASH]; /* hashed index of relay by address */
struct descriptor desc[FD_SETSIZE]; /* descriptor for each fd */
struct sockaddr poster_addr; /* status poster address */ char buf[MAX_DATA_SIZE]; /* general-purpose buffer */ char *error403 = "403 This is an EchoLink Proxy, not a Web Proxy! Go away!\r\n\r\nThis is an EchoLink Proxy, not a Web Proxy! Go away!\r\n"; int len403; /* prototypes */ static void logmsg(char *fmt,...) __attribute__ ((format(printf,1,2))); static void sighndlr(int sig); static int programloop(void); static void status(); static void proxy_accept(); static void proxy_session(int fd); static void proxy_tcp(int fd,void *ptr,int err); static void proxy_timer(int fd); static int proxy_message(struct proxy *pr,enum msgtype type,in_addr_t addr,char *data,int size); static void proxy_resume(int fd); static void proxy_close(struct proxy *pr); static void tcp_session(int fd); static void tcp_timer(int fd); static void udp_input(int fd,in_port_t port,enum msgtype msgtype); static void statusposter(); static int poststatus(struct proxy *pr,int nonblock); static void post_upcall(int fd,void *ptr,int err); static void client_input(int fd); static struct flowrecord *findpeerflowrecord(struct relay *rl,in_addr_t saddr); static struct flowrecord *findflowrecord(struct relay *rl,struct sockaddr *caddr,in_addr_t paddr); static void deleteflowrecord(struct relay *rl,struct flowrecord *fr,char *reason); static void fixupflowrecord(struct relay *rl,struct sockaddr *raddr_old,struct sockaddr *raddr_new); static void registercallsign(struct relay *rl,struct sockaddr *raddr,char *callsign); static struct registration *findcallsign(struct relay *rl,struct sockaddr *raddr); static void unregistercallsign(struct relay *rl,struct registration *reg,char *reason); static void removeobsolete(void); static int client_error(struct relay *rl,struct sockaddr *addr,struct sockaddr *saddr,enum relayerror errcode); static int client_message(struct relay *rl,enum response resp,struct sockaddr *raddr,struct sockaddr *saddr,in_port_t dport,char *data,int size); static char 
*getcallfrommessage(char *data,int len); static int checkaccesscontrols(char *call); static int isbyepacket(char *data,int len); static int buildbyepacket(void); static int readconfig(char *config); static char *confvar(char *name); static struct descriptor *registerfd(int fd,enum fdtype fdtype,uint32_t events); static int modifyfd(int fd,uint32_t events); static int unregisterfd(int fd); static int hostlookup(char *name,struct sockaddr *addr,char *text,size_t len); static int bindport(int port,int socktype); static int opentcp(struct sockaddr *from,struct sockaddr *to,int nonblock,upcall_t upcall,void *ptr); static int opentcp_upcall(int fd,upcall_t upcall,void *ptr); static int recvudp(int fd,in_addr_t *laddr,struct sockaddr *raddr,char *buf1,int len1); static int sendudp(int fd,in_addr_t laddr,struct sockaddr *raddr,char *buf1,int len1,char *buf2,int len2); static int64_t timestamp(void); static char *urlencode(char *s); /* main program */ int main (int argc,char *argv[]) { int ex = 0; int n,i; char *p; struct proxy *pr,*pr2; struct relay *rl,*rl2; MD5_CTX md5ctx; unsigned char digest[MD5_DIGEST_SIZE]; char hex[3]; setlocale(LC_ALL,"C"); /* make sure plain ASCII is used */ if (argc < 2) { fprintf(stderr,"Usage: elproxy configfile [debuglevel]\n"); exit(99); } if (argc > 2) { debug = atoi(argv[2]); daemonize = 0; } if (readconfig(argv[1]) < 0) /* read configuration file */ exit(1); now = timestamp(); /* in case it is used while init */ if ((p = confvar("LogFile")) != NULL && (logfile = fopen(p,"a")) == NULL) fprintf(stderr,"Warning: cannot open logfile %s\n",p); if ((p = confvar("StatusFile")) != NULL) { unlink(p); if ((statfile = fopen(p,"w")) == NULL) fprintf(stderr,"Warning: cannot open statusfile %s\n",p); } /* process some of the global parameters */ if ((p = confvar("Daemonize")) != NULL) daemonize = atoi(p); if ((p = confvar("ConnectionTimeout")) != NULL) connectiontimeout = atoi(p) * 60 * USEC; denied_pattern = confvar("CallsignsDenied"); allowed_pattern 
= confvar("CallsignsAllowed"); /* allocate and initialize all of the proxy instances */ if ((p = confvar("NumberOfProxies")) != NULL) nproxies = atoi(p); if (nproxies != 0) proxy = calloc(sizeof(struct proxy),nproxies); for (n = 0; n < nproxies; n++) { pr = &proxy[n]; snprintf(buf,sizeof(buf),"Proxy_%d",n); /* init this proxy when an address is defined in the configuration */ if ((p = confvar(buf)) != NULL) { if (hostlookup(p,&pr->laddr,pr->laddr_s,sizeof(pr->laddr_s)) < 0) { fprintf(stderr,"Bad address %s=%s (%s) pr=%d\n", buf,p,hstrerror(h_errno),n); exit(2); } /* make sure all proxy addresses are unique */ i = HASHIP(IN_ADDR(pr->laddr)); for (pr2 = proxy_by_addr[i]; pr2 != NULL; pr2 = pr2->next) if (IN_ADDR(pr->laddr) == IN_ADDR(pr2->laddr)) { fprintf(stderr,"Duplicate address %s=%s\n",buf,pr->laddr_s); exit(2); } pr->next = proxy_by_addr[i]; proxy_by_addr[i] = pr; pr->regaddr = p; /* check if specific PublicAddress for this proxy defined */ /* if not, use the Proxy address */ snprintf(buf,sizeof(buf),"PublicAddress_%d",n); if ((p = confvar(buf)) != NULL) pr->regaddr = p; /* the RegistrationAddr, Name and Comment are hashed */ /* to be sent with the registration */ MD5Init(&md5ctx); MD5Update(&md5ctx,(unsigned char *)pr->regaddr,strlen(pr->regaddr)); /* check if a specific RegistrationName for this proxy is defined */ /* if not, use the global RegistrationName */ snprintf(buf,sizeof(buf),"RegistrationName_%d",n); if ((p = confvar(buf)) == NULL) if ((p = confvar("RegistrationName")) == NULL) { fprintf(stderr,"RegistrationName not set!\n"); exit(2); } snprintf(buf,sizeof(buf),p,n,n,n); pr->regname = urlencode(buf); MD5Update(&md5ctx,(unsigned char *)buf,strlen(buf)); /* check if specific RegistrationComment for this proxy defined */ /* if not, use the global RegistrationComment */ snprintf(buf,sizeof(buf),"RegistrationComment_%d",n); if ((p = confvar(buf)) == NULL) if ((p = confvar("RegistrationComment")) == NULL) { fprintf(stderr,"RegistrationComment not 
set!\n"); exit(2); } snprintf(buf,sizeof(buf),p,n,n,n); pr->regcomment = urlencode(buf); strcpy(buf,MD5_SALT); MD5Update(&md5ctx,(unsigned char *)buf,strlen(buf)); MD5Final(digest,&md5ctx); for (i = 0; i < MD5_DIGEST_SIZE; i++) { snprintf(hex,sizeof(hex),"%02X",digest[i]); memcpy(pr->digest + 2*i,hex,2); } /* check if a specific password for this proxy is defined */ /* if not, use the global password */ snprintf(buf,sizeof(buf),"Password_%d",n); if ((p = confvar(buf)) == NULL) p = confvar("Password"); if (p == NULL || !strcmp(p,EXAMPLE_PASSWORD)) { fprintf(stderr, "Password not set, please edit configfile first!\n"); exit(2); } pr->password = p; while (*p != '\0') { *p = toupper(*p); p++; } pr->state = READY; } } /* allocate and initialize all of the relay instances */ if ((p = confvar("NumberOfRelays")) != NULL) nrelays = atoi(p); if (nrelays != 0) relay = calloc(sizeof(struct relay),nrelays); for (n = 0; n < nrelays; n++) { rl = &relay[n]; snprintf(buf,sizeof(buf),"Relay_%d",n); /* init this relay when an address is defined in the configuration */ if ((p = confvar(buf)) != NULL) { if (hostlookup(p,&rl->laddr,rl->laddr_s,sizeof(rl->laddr_s)) < 0) { fprintf(stderr,"Bad address %s=%s (%s)\n", buf,p,hstrerror(h_errno)); exit(2); } /* make sure all relay addresses are unique */ i = HASHIP(IN_ADDR(rl->laddr)); for (pr2 = proxy_by_addr[i]; pr2 != NULL; pr2 = pr2->next) if (IN_ADDR(rl->laddr) == IN_ADDR(pr2->laddr)) { fprintf(stderr,"Duplicate address %s=%s\n",buf,rl->laddr_s); exit(2); } for (rl2 = relay_by_addr[i]; rl2 != NULL; rl2 = rl2->next) if (IN_ADDR(rl->laddr) == IN_ADDR(rl2->laddr)) { fprintf(stderr,"Duplicate address %s=%s\n",buf,rl->laddr_s); exit(2); } rl->next = relay_by_addr[i]; relay_by_addr[i] = rl; rl->state = RELAY; } } /* lookup poster host address once for entire run */ if (hostlookup(p = POSTER_HOST,&poster_addr,NULL,0) < 0) { fprintf(stderr,"Lookup poster host %s failed (%s)\n", p,hstrerror(h_errno)); exit(2); } IN_PORT(poster_addr) = 
htons(POSTER_PORT); /* prepare the epoll instance */ if ((epoll_fd = epoll_create1(0)) < 0) { fprintf(stderr,"Cannot create epoll instance (%s)\n",strerror(errno)); exit(2); } /* open all listen sockets */ if ((rtp_fd = bindport(RTP_PORT,SOCK_DGRAM)) < 0) { fprintf(stderr,"Cannot bind UDP port %d\n",RTP_PORT); exit(3); } registerfd(rtp_fd,RTP_FD,EPOLLIN); if ((rtcp_fd = bindport(RTCP_PORT,SOCK_DGRAM)) < 0) { fprintf(stderr,"Cannot bind UDP port %d\n",RTCP_PORT); exit(3); } registerfd(rtcp_fd,RTCP_FD,EPOLLIN); if (nrelays != 0) { if ((client_fd = bindport(CLIENT_PORT,SOCK_DGRAM)) < 0) { fprintf(stderr,"Cannot bind UDP port %d\n",CLIENT_PORT); exit(3); } registerfd(client_fd,CLIENT_FD,EPOLLIN); } if (nproxies != 0) { if ((p = confvar("Port")) != NULL) proxy_port = atoi(p); if ((proxy_fd = bindport(proxy_port,SOCK_STREAM)) < 0) { fprintf(stderr,"Cannot bind TCP port %d\n",proxy_port); exit(3); } registerfd(proxy_fd,PROXY_FD,EPOLLIN); } /* catch some signals */ signal(SIGHUP,sighndlr); signal(SIGINT,sighndlr); signal(SIGQUIT,sighndlr); signal(SIGTERM,sighndlr); /* ready to start the service */ if (daemonize) daemon(1,0); len403 = strlen(error403); now = timestamp(); srand((unsigned int)now); logmsg("start"); status(); while (!quit && programloop() == 0) ; logmsg("quit %d",quit); for (n = 0; n < nproxies; n++) { pr = &proxy[n]; if (pr->state != UNUSED_P) { pr->state = OFF; poststatus(pr,0); proxy_close(pr); } } logmsg("exit %d",ex); if (statfile != NULL) fclose(statfile); if (logfile != NULL) fclose(logfile); exit(ex); } /* log a message */ static void logmsg (char *fmt,...) 
{ time_t t; char *tim,*p; va_list ap; if (logfile != NULL || !daemonize) { t = (now = timestamp()) / USEC; tim = ctime(&t); if ((p = strchr(tim,'\n')) != NULL) *p = '\0'; } if (logfile != NULL) { fprintf(logfile,"%s: ",tim); va_start(ap,fmt); vfprintf(logfile,fmt,ap); va_end(ap); fprintf(logfile,"\n"); } if (!daemonize) { fprintf(stderr,"%.15s.%06d: ",tim + 4,(int)(now % USEC)); va_start(ap,fmt); vfprintf(stderr,fmt,ap); va_end(ap); fprintf(stderr,"\n"); } } /* signal handler */ static void sighndlr (int sig) { char *p; signal(sig,sighndlr); if (sig == SIGHUP) { if (logfile != NULL) fclose(logfile); if ((p = confvar("LogFile")) != NULL) logfile = fopen(p,"a"); } else { quit++; } logmsg("signal %d errno %d quit %d",sig,errno,quit); if (quit > 3) exit(5); } /* epoll loop to wait for next event */ static int programloop () { int e,fd,p; int timeout; int maxevents; struct epoll_event *events; /* allocate buffer to receive epoll_wait events (up to 10) */ maxevents = min((maxfd - minfd) + nproxies + 2,10); events = calloc(maxevents,sizeof(struct epoll_event)); next = 0; while (!quit) { if (logfile != NULL) fflush(logfile); /* find the next time to process timers */ if (next < (now = timestamp())) { next = now + STATS_INTERVAL; for (fd = minfd; fd <= maxfd; fd++) if (desc[fd].timeout && desc[fd].timeout < next) next = desc[fd].timeout; for (p = 0; p < nproxies; p++) if (proxy[p].state != UNUSED_P) if (proxy[p].nextstatuspost < next) { if (proxy[p].nextstatuspost == 0) next = min(next,now + POSTER_INTERVAL); else next = proxy[p].nextstatuspost; } } /* epoll_wait uses milliseconds but our times are in microseconds */ if ((timeout = (next - now) / 1000) < 10) timeout = 10; /* find the ready file descriptors for input and output */ if ((numevents = epoll_wait(epoll_fd,events,maxevents,timeout)) < 0) { if (errno == EINTR) continue; logmsg("error %d on epoll_wait",errno); free(events); return EOF; } /* update global current time */ now = timestamp(); /* process the received 
epoll events */ for (e = 0; e < numevents; e++) { fd = events[e].data.fd; if (events[e].events & EPOLLIN) { switch (desc[fd].fdtype) { case RTP_FD: udp_input(fd,htons(RTP_PORT),PROXY_MSG_UDP_DATA); break; case RTCP_FD: udp_input(fd,htons(RTCP_PORT),PROXY_MSG_UDP_CONTROL); break; case CLIENT_FD: client_input(fd); break; case PROXY_FD: proxy_accept(); break; case SESSION_FD: proxy_session(fd); break; case ADDR_TCP_FD: tcp_session(fd); break; case POSTER_FD: recv(fd,buf,sizeof(buf),0); close(fd); unregisterfd(fd); break; default: logmsg("epoll_wait returns EPOLLIN for fd=%d type=%d", fd,desc[fd].fdtype); unregisterfd(fd); break; } } if (events[e].events & EPOLLOUT) { switch (desc[fd].fdtype) { case SESSION_FD: proxy_resume(fd); break; case CONNECT_FD: opentcp_upcall(fd,desc[fd].upcall,desc[fd].ptr); break; default: logmsg("epoll_wait returns EPOLLOUT for fd=%d type=%d", fd,desc[fd].fdtype); unregisterfd(fd); break; } } } /* process timers */ if (now >= next) { for (fd = minfd; fd <= maxfd; fd++) if (desc[fd].timeout && now >= desc[fd].timeout) switch (desc[fd].fdtype) { case SESSION_FD: proxy_timer(fd); break; case ADDR_TCP_FD: tcp_timer(fd); break; case POSTER_FD: close(fd); unregisterfd(fd); break; case CONNECT_FD: if (desc[fd].upcall != NULL) (*desc[fd].upcall)(fd,desc[fd].ptr,1); close(fd); unregisterfd(fd); break; default: logmsg("timer event for fd=%d type=%d", fd,desc[fd].fdtype); desc[fd].timeout = 0; break; } next = 0; } status(); /* update statusfile */ if (nproxies != 0) statusposter(); /* post proxy status */ if (nrelays != 0) removeobsolete(); /* remove obsolete flows/reg */ } free(events); return 0; } /* write status into statfile */ static void status () { static int64_t nextstat = 0; int n,h,npr,nbs,nrl,nuse,nreg,nflow; struct relay *rl; struct registration *reg; struct flowrecord *fr; if (statfile == NULL) return; if (now >= nextstat) { npr = nbs = nrl = nuse = nreg = nflow = 0; for (n = 0; n < nproxies; n++) { if (proxy[n].state != UNUSED_P) { npr++; 
if (proxy[n].state != READY) nbs++; } } for (n = 0; n < nrelays; n++) { rl = &relay[n]; if (rl->state != UNUSED_R) { nrl++; if (rl->registration != NULL) { nuse++; for (reg = rl->registration; reg != NULL; reg = reg->next) nreg++; for (h = 0; h < NHASH; h++) for (fr = rl->flowrecord[h]; fr != NULL; fr = fr->next) nflow++; } } } rewind(statfile); fprintf(statfile,"OK\nOK: %d Proxies, %d Busy, %d Relays, %d Used, %d Regs, %d Flows | proxies=%d busy=%d relays=%d used=%d regs=%d flows=%d\n", npr,nbs,nrl,nuse,nreg,nflow,npr,nbs,nrl,nuse,nreg,nflow); if (nbs != 0) fprintf(statfile,"Pr# Local address Remote address Callsign QSO Bytes TX\n"); for (n = 0; n < nproxies; n++) { struct proxy *pr = &proxy[n]; if (pr->state != UNUSED_P && pr->state != READY) { char tmp[16]; if (pr->state == BUSY) { if (pr->qaddr) { struct sockaddr addr; memset(&addr,0,sizeof(addr)); addr.sa_family = AF_INET; IN_ADDR(addr) = pr->qaddr; getnameinfo(&addr,sizeof(struct sockaddr),tmp, sizeof(tmp),NULL,0,NI_NUMERICHOST); pr->qaddr = 0; } else { tmp[0] = '\0'; } } else { snprintf(tmp,sizeof(tmp),"[st%d fd%d t%d]", pr->state,pr->fd,desc[pr->fd].fdtype); } fprintf(statfile,"%3d %-15s %-15s %-12s %-15s %11lld\n", n,pr->laddr_s,pr->raddr_s,pr->clientcall,tmp, (long long)pr->bytesout); } } if (nbs != 0 && nreg != 0) fprintf(statfile,"\n"); if (nreg != 0) fprintf(statfile,"Rl# Local address Registr Flows Latest callsign Bytes RX Bytes TX\n"); for (n = 0; n < nrelays; n++) { rl = &relay[n]; if (rl->state != UNUSED_R && rl->registration != NULL) { char *callsign; nreg = nflow = 0; callsign = ""; for (reg = rl->registration; reg != NULL; reg = reg->next) nreg++; callsign = rl->registration->clientcall; for (h = 0; h < NHASH; h++) for (fr = rl->flowrecord[h]; fr != NULL; fr = fr->next) nflow++; fprintf(statfile,"%3d %-15s %7d %6d %-15s %11lld %11lld\n", n,rl->laddr_s,nreg,nflow,callsign, (long long)rl->bytesin,(long long)rl->bytesout); } } fflush(statfile); ftruncate(fileno(statfile),ftell(statfile)); nextstat 
= now + STATS_INTERVAL; } } /* accept a connection to the proxy port */ static void proxy_accept () { int fd,v; struct proxy *pr; struct descriptor *ds; socklen_t sklen; struct sockaddr laddr,raddr; char raddr_s[16]; memset(&raddr,0,sklen = sizeof(struct sockaddr)); if ((fd = accept(proxy_fd,&raddr,&sklen)) < 0) { logmsg("accept failed (%d %s)",fd,strerror(errno)); return; } getnameinfo(&raddr,sizeof(struct sockaddr),raddr_s,sizeof(raddr_s), NULL,0,NI_NUMERICHOST); if (debug) logmsg("accept from %s fd=%d",raddr_s,fd); v = fcntl(fd,F_GETFL,0); /* set nonblocking mode */ fcntl(fd,F_SETFL,v|O_NONBLOCK); v = 1; /* set keepalive mode */ setsockopt(fd,SOL_SOCKET,SO_KEEPALIVE,&v,sizeof(v)); memset(&laddr,0,sklen = sizeof(struct sockaddr)); getsockname(fd,&laddr,&sklen); /* find the proxy entry with this local address */ for (pr = proxy_by_addr[HASHIP(IN_ADDR(laddr))]; pr != NULL; pr = pr->next) if (IN_ADDR(laddr) == IN_ADDR(pr->laddr)) break; /* when not found or busy, just close the connection */ if (pr == NULL || pr->state != READY) { close(fd); if (debug && pr != NULL) logmsg("pr=%ld busy pr->fd=%d",(long)(pr - proxy),pr->fd); return; } /* check if already some data sent -> probably a proxy scanner */ errno = 0; if (recv(fd,buf,sizeof(buf),0) > 0) { send(fd,error403,len403,MSG_NOSIGNAL); close(fd); if (debug) logmsg("pr=%ld fd=%d unsolicited data",(long)(pr - proxy),fd); return; } if (errno == EBADF) { close(fd); logmsg("pr=%ld fd=%d bad fd",(long)(pr - proxy),fd); return; } /* register the fd and link everything together */ if ((ds = registerfd(fd,SESSION_FD,EPOLLIN)) == NULL) { close(fd); logmsg("register failed for fd=%d",fd); return; } ds->proxy = pr; /* initialize the proxy instance */ pr->fd = fd; pr->raddr = raddr; strncpy(pr->raddr_s,raddr_s,sizeof(pr->raddr_s)); memset(pr->clientcall,0,sizeof(pr->clientcall)); pr->tcp_fd = pr->hdrbytes = pr->msgbytes = pr->heldbytes = 0; pr->qaddr = pr->bytesout = 0; pr->state = NONCE; /* set a .5 second timer to wait for 
unsolicited data (proxy scanner) */ ds->timeout = now + USEC/2; next = min(next,ds->timeout); } /* receive data on a proxy session */ static void proxy_session (int fd) { struct proxy *pr; struct descriptor *ds; int nread,n,letter,digit; char *call; uint32_t msglen; struct sockaddr saddr; MD5_CTX md5ctx; unsigned char digest[MD5_DIGEST_SIZE]; char tmp[32]; if ((pr = (ds = &desc[fd])->proxy) == NULL) return; switch (pr->state) { case AUTH1: /* get call (up to \n) and check for validity */ letter = digit = nread = 0; for (n = 0; n < 12; n++) { if ((nread = recv(fd,buf,1,0)) != 1) break; if (buf[0] == '\n') break; if (isupper((unsigned char)buf[0])) letter++; else if (isdigit((unsigned char)buf[0])) digit++; else if (buf[0] != '-') break; pr->clientcall[n] = buf[0]; } /* when it does not look like a callsign, it probably is one of */ /* those webproxy scanners sending a CONNECT, GET, POST etc */ if (nread != 1 || letter < 2 || !digit || buf[0] != '\n') { if (debug) logmsg("pr=%ld fd=%d bad callsign %.15s", (long)(pr - proxy),fd,pr->clientcall); recv(fd,buf,sizeof(buf),0); send(fd,error403,len403,MSG_NOSIGNAL); proxy_close(pr); break; } pr->clientcall[n] = '\0'; pr->state = AUTH2; break; case AUTH2: /* check password (MD5 of password and nonce) */ MD5Init(&md5ctx); MD5Update(&md5ctx,(unsigned char *)pr->password,strlen(pr->password)); snprintf(tmp,sizeof(tmp),"%x",pr->nonce); MD5Update(&md5ctx,(unsigned char *)tmp,8); MD5Final(digest,&md5ctx); if (recv(fd,buf,MD5_DIGEST_SIZE,0) != MD5_DIGEST_SIZE) { if (debug) logmsg("pr=%ld fd=%d bad md5 digest",(long)(pr - proxy),fd); send(fd,error403,len403,MSG_NOSIGNAL); proxy_close(pr); break; } if (memcmp(digest,buf,MD5_DIGEST_SIZE)) { buf[0] = REASON_CODE_BAD_PW; proxy_message(pr,PROXY_MSG_SYSTEM,0,buf,1); logmsg("pr=%ld Bad password call=%s ip=%s", (long)(pr - proxy),pr->clientcall,pr->raddr_s); proxy_close(pr); break; } /* check for allowed and denied callsigns */ if (!checkaccesscontrols(pr->clientcall)) { buf[0] = 
REASON_CODE_ACCESS_DENIED; proxy_message(pr,PROXY_MSG_SYSTEM,0,buf,1); logmsg("pr=%ld Access denied call=%s ip=%s", (long)(pr - proxy),pr->clientcall,pr->raddr_s); proxy_close(pr); break; } /* check if this same callsign is connected to another proxy */ for (n = 0; n < nproxies; n++) if (proxy[n].state == BUSY && !strcmp(pr->clientcall,proxy[n].clientcall)) { logmsg("pr=%ld Duplicate call=%s ip=%s other=%d fd=%d", (long)(pr - proxy),pr->clientcall,pr->raddr_s,n,fd); proxy_close(&proxy[n]); break; } /* access OK, log and setup connection */ logmsg("pr=%ld Auth call=%s ip=%s fd=%d", (long)(pr - proxy),pr->clientcall,pr->raddr_s,fd); ds->timeout = now + CLIENT_SOCKET_TIMEOUT; next = 0; if (connectiontimeout) pr->connectiontimeout = now + connectiontimeout; else pr->connectiontimeout = 0; pr->state = BUSY; poststatus(pr,1); break; case BUSY: /* active proxy, collect header */ if (pr->hdrbytes != HEADER_SIZE) { if ((nread = recv(fd,&pr->header[pr->hdrbytes], HEADER_SIZE - pr->hdrbytes,0)) <= 0) { if (debug) logmsg("pr=%ld EOF on fd=%d",(long)(pr - proxy),fd); proxy_close(pr); break; } if ((pr->hdrbytes += nread) != HEADER_SIZE) break; } /* extract address and message length from header */ memset(&saddr,0,sizeof(saddr)); saddr.sa_family = AF_INET; memcpy(&IN_ADDR(saddr),&pr->header[1],4); memcpy(&msglen,&pr->header[5],4); if (msglen > MAX_DATA_SIZE) { for (n = 0; n < HEADER_SIZE; n++) snprintf(&tmp[2*n],3,"%02x",(unsigned char)pr->header[n]); logmsg("invalid packet msglen=%u %s",msglen,tmp); proxy_close(pr); break; } /* collect message part by part */ if (pr->msgbytes != msglen) { pr->heldbytes = 0; if ((nread = recv(fd,&pr->message[pr->msgbytes], msglen - pr->msgbytes,0)) <= 0) { if (nread < 0 && errno == EAGAIN) break; if (debug) logmsg("pr=%ld EOF on fd=%d",(long)(pr - proxy),fd); proxy_close(pr); break; } if ((pr->msgbytes += nread) != msglen) break; } /* be ready for next message after handling this one */ pr->hdrbytes = pr->msgbytes = 0; ds->timeout = now + 
CLIENT_SOCKET_TIMEOUT; next = min(next,ds->timeout); if (pr->connectiontimeout && now >= pr->connectiontimeout) { logmsg("pr=%ld Conn timeout call=%s", (long)(pr - proxy),pr->clientcall); proxy_close(pr); break; } /* process the message */ switch ((enum msgtype)pr->header[0]) { case PROXY_MSG_TCP_OPEN: if (pr->tcp_fd) { close(pr->tcp_fd); unregisterfd(pr->tcp_fd); } IN_PORT(saddr) = htons(ADDR_SERVER_PORT); if ((pr->tcp_fd = opentcp(&pr->laddr,&saddr,1,proxy_tcp,pr)) < 0) { buf[0] = errno? errno : 1; buf[1] = buf[2] = buf[3] = 0; proxy_message(pr,PROXY_MSG_TCP_STATUS,0,buf,4); break; } modifyfd(fd,0); /* hold off further messages */ /* until connection established */ break; case PROXY_MSG_TCP_DATA: if ((call = getcallfrommessage(pr->message,msglen)) != NULL && !checkaccesscontrols(call)) { buf[0] = REASON_CODE_ACCESS_DENIED; proxy_message(pr,PROXY_MSG_SYSTEM,0,buf,1); close(pr->tcp_fd); unregisterfd(pr->tcp_fd); pr->tcp_fd = 0; logmsg("pr=%ld Access denied call=%s (msg)", (long)(pr - proxy),call); break; } if (send(pr->tcp_fd,pr->message,msglen,MSG_NOSIGNAL) != msglen) { buf[0] = errno; buf[1] = buf[2] = buf[3] = 0; proxy_message(pr,PROXY_MSG_TCP_STATUS,0,buf,4); break; } break; case PROXY_MSG_TCP_CLOSE: if (pr->tcp_fd) { close(pr->tcp_fd); unregisterfd(pr->tcp_fd); pr->tcp_fd = 0; } if (debug) logmsg("pr=%ld %s TCP server closed sent=%lld", (long)(pr - proxy),pr->clientcall,(long long)pr->bytesout); break; case PROXY_MSG_UDP_DATA: IN_PORT(saddr) = htons(RTP_PORT); sendudp(rtp_fd,IN_ADDR(pr->laddr),&saddr,pr->message,msglen,NULL,0); pr->qaddr = IN_ADDR(saddr); break; case PROXY_MSG_UDP_CONTROL: IN_PORT(saddr) = htons(RTCP_PORT); sendudp(rtcp_fd,IN_ADDR(pr->laddr),&saddr,pr->message,msglen,NULL,0); pr->qaddr = IN_ADDR(saddr); break; default: break; } break; default: recv(fd,buf,sizeof(buf),0); send(fd,error403,len403,MSG_NOSIGNAL); proxy_close(pr); break; } } /* upcall for ADDR TCP open from proxy session */ static void proxy_tcp (int fd,void *ptr,int err) { struct 
proxy *pr = ptr; struct descriptor *ds; memset(buf,0,4); if (err || pr->state != BUSY || (ds = registerfd(fd,ADDR_TCP_FD,EPOLLIN)) == NULL) { if (errno) memcpy(buf,&errno,4); else buf[0] = 1; close(fd); pr->tcp_fd = 0; } else { ds->proxy = pr; ds->timeout = now + ADDR_SOCKET_TIMEOUT; next = 0; } if (debug) logmsg("pr=%ld %s TCP server opened (%d)", (long)(pr - proxy),pr->clientcall,(unsigned char)buf[0]); if (pr->state == BUSY) { proxy_message(pr,PROXY_MSG_TCP_STATUS,0,buf,4); modifyfd(pr->fd,EPOLLIN); /* enable client messages */ } } /* timer event on a proxy session */ static void proxy_timer (int fd) { struct proxy *pr; struct descriptor *ds; pr = (ds = &desc[fd])->proxy; switch (pr->state) { case NONCE: /* end of wait for unsol data */ while ((pr->nonce = rand()) < 0x10000000) ; /* avoid short values */ snprintf(buf,sizeof(buf),"%x",pr->nonce); if (send(fd,buf,8,MSG_NOSIGNAL) != 8) { proxy_close(pr); } else { pr->state = AUTH1; ds->timeout = now + AUTH_SOCKET_TIMEOUT; next = 0; } break; case BUSY: /* conn idle for too long */ proxy_close(pr); break; default: send(fd,error403,len403,MSG_NOSIGNAL); proxy_close(pr); break; } } /* send a proxy message back to the user */ /* return 1 when successful, 0 when send error */ /* keep left-over data in message buffer so it can be tried again later */ static int proxy_message (struct proxy *pr,enum msgtype type,in_addr_t addr, char *data,int size) { int bytesout; struct msghdr msg; struct iovec iov[2]; char hdr[HEADER_SIZE]; /* construct the header */ hdr[0] = type; memcpy(&hdr[1],&addr,4); memcpy(&hdr[5],&size,4); /* construct a descriptor for sendmsg */ memset(&msg,0,sizeof(msg)); msg.msg_iov = iov; msg.msg_iovlen = 1; iov[0].iov_base = hdr; iov[0].iov_len = HEADER_SIZE; if (size) { iov[1].iov_base = data; iov[1].iov_len = size; msg.msg_iovlen = 2; } size += HEADER_SIZE; /* send the message */ if ((bytesout = sendmsg(pr->fd,&msg,MSG_NOSIGNAL)) != size) { if (bytesout < 0) return bytesout; /* part of message was sent, 
copy remainder to pr->message */
        if ((size - bytesout) > MAX_DATA_SIZE)
            return -2;                  /* should never happen! */

        /* the unsent part starts either inside the header (copy the rest */
        /* of the header followed by all data) or inside the data */
        if (bytesout < HEADER_SIZE) {
            memcpy(pr->message,hdr + bytesout,HEADER_SIZE - bytesout);
            memcpy(pr->message + HEADER_SIZE - bytesout,data,size-HEADER_SIZE);
        } else {
            memcpy(pr->message,data + bytesout - HEADER_SIZE,size - bytesout);
        }

        pr->heldbytes = size - bytesout; /* #bytes to be re-sent */
        logmsg("pr=%ld proxy_message msg=%d wr=%d held=%d",
               (long)(pr - proxy),size,bytesout,pr->heldbytes);
        modifyfd(pr->fd,EPOLLOUT);      /* hold off input data */
                                        /* and wait for space */
    }

    /* account for whatever went out, complete or partial */
    if (bytesout > 0)
        pr->bytesout += bytesout;

    return (bytesout == size);
}

/* resume sending on a blocked proxy connection */
/* pr->message holds unsent data from proxy_message */
/* does nothing when the descriptor has no proxy attached */

static void
proxy_resume (int fd)
{
    struct proxy *pr;
    int bytesout;

    if ((pr = desc[fd].proxy) == NULL)
        return;

    if (pr->heldbytes) {
        /* try sending the remainder of the data, when error close the proxy */
        if ((bytesout = send(fd,pr->message,pr->heldbytes,MSG_NOSIGNAL)) < 0) {
            proxy_close(pr);
            return;
        }

        pr->bytesout += bytesout;

        /* still not completely sent?
*/ if (bytesout != pr->heldbytes) { logmsg("pr=%ld proxy_resume msg=%d wr=%d sent=%lld", (long)(pr - proxy),pr->heldbytes,bytesout, (long long)pr->bytesout); if (bytesout != 0) { pr->heldbytes -= bytesout; memmove(pr->message,pr->message + bytesout,pr->heldbytes); } return; } if (debug) logmsg("pr=%ld proxy_resume msg=%d wr=%d sent=%lld", (long)(pr - proxy),pr->heldbytes,bytesout, (long long)pr->bytesout); } /* now all is sent, resume normal operation */ pr->heldbytes = 0; modifyfd(fd,EPOLLIN); if (pr->tcp_fd != 0 && desc[pr->tcp_fd].fdtype == ADDR_TCP_FD) modifyfd(pr->tcp_fd,EPOLLIN); } /* forcibly close a proxy connection */ static void proxy_close (struct proxy *pr) { if (pr->tcp_fd) { close(pr->tcp_fd); unregisterfd(pr->tcp_fd); pr->tcp_fd = 0; } if (pr->fd) { close(pr->fd); unregisterfd(pr->fd); pr->fd = 0; } pr->heldbytes = 0; if (pr->state == BUSY) { logmsg("pr=%ld Disc call=%s sent=%lld", (long)(pr - proxy),pr->clientcall,(long long)pr->bytesout); pr->nextstatuspost = now + POSTER_INTERVAL; next = min(next,now + POSTER_INTERVAL); } pr->state = READY; } /* data received on ADDR TCP connection */ static void tcp_session (int fd) { struct proxy *pr; int nread,rv; if ((pr = desc[fd].proxy) == NULL) return; if ((nread = recv(fd,buf,TCP_DATA_SIZE,0)) <= 0) { close(fd); unregisterfd(fd); pr->tcp_fd = 0; if (debug) logmsg("pr=%ld %s TCP server EOF %d sent=%lld", (long)(pr - proxy),pr->clientcall,nread, (long long)pr->bytesout); if (pr->state == BUSY) proxy_message(pr,PROXY_MSG_TCP_CLOSE,0,NULL,0); return; } if (pr->state == BUSY) { if ((rv = proxy_message(pr,PROXY_MSG_TCP_DATA,0,buf,nread)) < 0) { proxy_close(pr); return; } /* proxy_message blocked? 
*/ if (rv == 0) modifyfd(fd,0); /* hold off further data */ } } /* timeout on ADDR TCP connection */ static void tcp_timer (int fd) { struct proxy *pr; pr = desc[fd].proxy; close(fd); unregisterfd(fd); if (pr == NULL) return; pr->tcp_fd = 0; if (pr->state == BUSY) { if (!(desc[pr->fd].events & EPOLLIN)) modifyfd(pr->fd,EPOLLIN); proxy_message(pr,PROXY_MSG_TCP_CLOSE,0,NULL,0); } } /* UDP received on RTP or RTCP port */ /* this can be for Proxy or for Relay */ static void udp_input (int fd,in_port_t port,enum msgtype msgtype) { struct proxy *pr; struct relay *rl; int nread,pkt,wr; struct flowrecord *fr; in_addr_t laddr; struct sockaddr raddr; memset(&raddr,0,sizeof(raddr)); laddr = pkt = 0; while ((nread = recvudp(fd,&laddr,&raddr,buf,sizeof(buf))) >= 0 && ++pkt < 10) { /* find the proxy entry with this local address */ for (pr = proxy_by_addr[HASHIP(laddr)]; pr != NULL; pr = pr->next) if (laddr == IN_ADDR(pr->laddr)) { if (pr->state == BUSY) { /* remember address of qso peer for status list */ pr->qaddr = IN_ADDR(raddr); /* relay the message to the user */ if (!proxy_message(pr,msgtype,IN_ADDR(raddr),buf,nread)) proxy_close(pr); } goto nxtmsg; } /* find the relay entry with this local address */ for (rl = relay_by_addr[HASHIP(laddr)]; rl != NULL; rl = rl->next) if (laddr == IN_ADDR(rl->laddr)) { if ((fr = findpeerflowrecord(rl,IN_ADDR(raddr))) != NULL && (wr = client_message(rl,RESPONSE_TYPE_DATA,&fr->clientaddr, &raddr,port,buf,nread)) > 0) { rl->bytesin += wr; fr->bytesin += wr; fr->lastactivity = now; goto nxtmsg; } /* locating the flow or sending the packet failed */ /* if this is NOT a BYE packet, send back a BYE */ if (port == htons(RTCP_PORT) && !isbyepacket(buf,nread)) { nread = buildbyepacket(); sendudp(fd,IN_ADDR(rl->laddr),&raddr,buf,nread,NULL,0); } goto nxtmsg; } nxtmsg: ; } } /* post regular status updates for all proxies */ static void statusposter () { static int64_t nextpost = 0; static int p = 0; if (now >= nextpost) { nextpost = now + 
POSTER_INTERVAL; next = 0; while (proxy[p].state == UNUSED_P || now < proxy[p].nextstatuspost) if (++p >= nproxies) { p = 0; return; } poststatus(&proxy[p],1); if (++p >= nproxies) p = 0; } } /* post proxy status for specified proxy */ static int poststatus (struct proxy *pr,int nonblock) { if (pr->state == UNUSED_P) return -1; pr->nextstatuspost = now + STATP_INTERVAL; next = 0; return opentcp(&pr->laddr,&poster_addr,nonblock,post_upcall,pr); } /* upcall for TCP open from poststatus */ static void post_upcall (int fd,void *ptr,int err) { struct proxy *pr = ptr; int len; struct descriptor *ds; char *public,*status; static char post[512]; if (err || (ds = registerfd(fd,POSTER_FD,EPOLLIN)) == NULL) { close(fd); return; } ds->timeout = now + CONNECT_TIMEOUT; next = 0; /* post a status message to the http server */ public = !strcmp(pr->password,PUBLIC_PASSWORD)? "Y" : "N"; switch (pr->state) { case READY: status = "Ready"; break; case OFF: status = "Off"; break; default: status = "Busy"; break; } len = snprintf(post,sizeof(post), "name=%s&comment=%s&public=%s&status=%s&a=%s&d=%.32s&p=%d&v=%s", pr->regname,pr->regcomment,public,status, pr->regaddr,pr->digest,proxy_port,PROGRAM_VERSION); if (pr->state == BUSY) len += snprintf(post + len,sizeof(post) - len, "&cc=%s&ca=%s",pr->clientcall,pr->raddr_s); len = snprintf(buf,sizeof(buf), "POST %s HTTP/1.0\r\nHost: %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s\r\n", POSTER_URL,POSTER_HOST,len,post); if (send(fd,buf,len,MSG_NOSIGNAL) != len && debug) logmsg("poststatus send error (%s)",strerror(errno)); } /* UDP received on Relay client port */ static void client_input (int fd) { struct relay *rl; int nread,pkt,h,dfd; unsigned int len; struct registration *reg; struct flowrecord *fr; in_addr_t laddr; struct sockaddr raddr,daddr; char tmp1[16],tmp2[16]; laddr = pkt = 0; memset(&raddr,0,sizeof(raddr)); memset(&daddr,0,sizeof(daddr)); daddr.sa_family = AF_INET; while ((nread = 
recvudp(fd,&laddr,&raddr,buf,sizeof(buf))) >= 0 && ++pkt < 10) { if ((nread -= REQ_HLEN) < 0 || nread > MAX_PACKET_DATA) continue; /* too short or too long */ /* extract dest address and port from header */ memcpy(&IN_ADDR(daddr),&buf[1],4); memcpy(&IN_PORT(daddr),&buf[5],2); /* find the relay entry with this local address */ for (rl = relay_by_addr[HASHIP(laddr)]; rl != NULL; rl = rl->next) if (laddr == IN_ADDR(rl->laddr)) break; if (rl == NULL) continue; /* process the message */ switch (buf[0]) { case REQUEST_TYPE_KEEPALIVE: if ((fr = findflowrecord(rl,&raddr,IN_ADDR(daddr))) != NULL) fr->lastactivity = now; client_message(rl,RESPONSE_TYPE_ACK,&raddr,&daddr,0,NULL,0); break; case REQUEST_TYPE_DATA: /* data record, locate existing flow record */ if ((fr = findflowrecord(rl,&raddr,IN_ADDR(daddr))) == NULL) { /* no flow record, check if this user is registered */ if ((reg = findcallsign(rl,&raddr)) == NULL) { if (!IN_EQUAL(raddr,rl->lastclient)) { getnameinfo(&raddr,sizeof(struct sockaddr), tmp1,sizeof(tmp1),tmp2,sizeof(tmp2), NI_NUMERICHOST|NI_NUMERICSERV); logmsg("rl=%ld flow from nonreg %s:%s", (long)(rl - relay),tmp1,tmp2); } rl->lastclient = raddr; client_error(rl,&raddr,&daddr,ERR_ALLOCATION); break; } reg->lastactivity = now; /* make sure there is no competing flow to this peer */ if (findpeerflowrecord(rl,IN_ADDR(daddr)) != NULL) { if (!IN_EQUAL(raddr,rl->lastclient)) { getnameinfo(&daddr,sizeof(struct sockaddr),tmp1, sizeof(tmp1),NULL,0,NI_NUMERICHOST); logmsg("rl=%ld competing flow %s to %s", (long)(rl - relay),reg->clientcall,tmp1); } rl->lastclient = raddr; client_error(rl,&raddr,&daddr,ERR_ALLOCATION); break; } /* be sure we're not trying to send a packet to */ /* ourselves, or to some other client of this relay */ if (IN_ADDR(daddr) == IN_ADDR(rl->laddr)) { if (!IN_EQUAL(raddr,rl->lastclient)) { getnameinfo(&daddr,sizeof(struct sockaddr),tmp1, sizeof(tmp1),NULL,0,NI_NUMERICHOST); logmsg("rl=%ld conflicting flow %s to %s", (long)(rl - 
relay),reg->clientcall,tmp1);
                    }

                    rl->lastclient = raddr;
                    client_error(rl,&raddr,&daddr,ERR_ALLOCATION);
                    break;
                }

                /* create new flow record */
                if ((fr = calloc(1,sizeof(struct flowrecord))) == NULL) {
                    client_error(rl,&raddr,&daddr,ERR_ALLOCATION);
                    break;
                }

                getnameinfo(&daddr,sizeof(struct sockaddr),tmp1,
                            sizeof(tmp1),NULL,0,NI_NUMERICHOST);

                if (debug)
                    logmsg("rl=%ld New flow %s to %s",
                           (long)(rl - relay),reg->clientcall,tmp1);

                fr->clientaddr = raddr;
                fr->peeraddr = IN_ADDR(daddr);

                /* link it to the correct hashlist depending on peer */
                h = HASHIP(IN_ADDR(daddr));

                if ((fr->next = rl->flowrecord[h]) != NULL)
                    fr->next->prev = fr;

                rl->flowrecord[h] = fr;
            }

            fr->lastactivity = now;

            /* the destination port selects the outgoing socket; data for */
            /* any port other than RTP or RTCP is not forwarded */
            if (IN_PORT(daddr) == htons(RTP_PORT))
                dfd = rtp_fd;
            else if (IN_PORT(daddr) == htons(RTCP_PORT))
                dfd = rtcp_fd;
            else
                break;

            /* forward the payload (without request header) to the peer */
            if (sendudp(dfd,IN_ADDR(rl->laddr),&daddr,&buf[REQ_HLEN],nread,
                        NULL,0) == nread) {
                rl->bytesout += nread;
                fr->bytesout += nread;
            }

            break;

        case REQUEST_TYPE_REGISTER:
            /* payload is a length byte followed by the callsign; the */
            /* length is limited to what was actually received */
            if (nread >= 2) {
                len = min((unsigned int)buf[REQ_HLEN],nread - 1);
                buf[REQ_HLEN+1 + len] = '\0';
                registercallsign(rl,&raddr,&buf[REQ_HLEN+1]);
                client_message(rl,RESPONSE_TYPE_ACK,&raddr,&daddr,0,NULL,0);
            }

            break;

        case REQUEST_TYPE_UNREGISTER:
            /* same payload layout as REGISTER; only honoured (and */
            /* acknowledged) when the callsign matches the registration */
            /* found for the sender's address */
            if (nread >= 2) {
                len = min((unsigned int)buf[REQ_HLEN],nread - 1);
                buf[REQ_HLEN+1 + len] = '\0';

                if ((reg = findcallsign(rl,&raddr)) != NULL &&
                    !strcmp(reg->clientcall,&buf[REQ_HLEN+1])) {
                    unregistercallsign(rl,reg,"Unregister");
                    client_message(rl,RESPONSE_TYPE_ACK,&raddr,&daddr,0,NULL,0);
                }
            }

            break;

        case REQUEST_TYPE_DROP_FLOW:
            if ((fr = findflowrecord(rl,&raddr,IN_ADDR(daddr))) != NULL)
                deleteflowrecord(rl,fr,"Drop");

            break;

        case REQUEST_TYPE_PING:
            /* the reply header carries the sender's own address, port 0 */
            IN_ADDR(daddr) = IN_ADDR(raddr);
            IN_PORT(daddr) = 0;
            len = snprintf(buf,sizeof(buf),"ver: %s\r\nipv4: %s\r\n",
                           PROGRAM_VERSION,rl->laddr_s);
            client_message(rl,RESPONSE_TYPE_PING,&raddr,&daddr,0,buf,len);
            break;

        default:
            break;
        }
    }
}

/* find flowrecord for source address */
/* (looks up the flow by peer address only, see findflowrecord for a */
/* lookup that also matches the client address) */

static struct flowrecord *
findpeerflowrecord
(struct relay *rl,in_addr_t saddr) { struct flowrecord *fr; for (fr = rl->flowrecord[HASHIP(saddr)]; fr != NULL; fr = fr->next) if (fr->peeraddr == saddr) break; return fr; } /* find flowrecord for client socket and address */ static struct flowrecord * findflowrecord (struct relay *rl,struct sockaddr *caddr,in_addr_t paddr) { struct flowrecord *fr; for (fr = rl->flowrecord[HASHIP(paddr)]; fr != NULL; fr = fr->next) if (IN_EQUAL(fr->clientaddr,*caddr) && fr->peeraddr == paddr) break; return fr; } /* delete flowrecord for client socket and address */ static void deleteflowrecord (struct relay *rl,struct flowrecord *fr,char *reason) { struct sockaddr addr; char tmp1[16],tmp2[16]; /* log */ getnameinfo(&fr->clientaddr,sizeof(struct sockaddr), tmp1,sizeof(tmp1),NULL,0,NI_NUMERICHOST); memset(&addr,0,sizeof(addr)); addr.sa_family = AF_INET; IN_ADDR(addr) = fr->peeraddr; getnameinfo(&addr,sizeof(struct sockaddr), tmp2,sizeof(tmp2),NULL,0,NI_NUMERICHOST); if (debug) logmsg("rl=%ld %s flow %s to %s in=%lld out=%lld", (long)(rl - relay),reason,tmp1,tmp2, (long long)fr->bytesin,(long long)fr->bytesout); /* unlink this flow record */ if (fr->next != NULL) fr->next->prev = fr->prev; if (fr->prev != NULL) fr->prev->next = fr->next; else rl->flowrecord[HASHIP(IN_ADDR(addr))] = fr->next; free(fr); } /* update flow records for a client IP change */ static void fixupflowrecord (struct relay *rl,struct sockaddr *raddr_old,struct sockaddr *raddr_new) { struct flowrecord *fr; int h; /* peer address remains the same, so records can be updated in-place */ for (h = 0; h < NHASH; h++) { for (fr = rl->flowrecord[h]; fr != NULL; fr = fr->next) { if (IN_EQUAL(fr->clientaddr,*raddr_old)) { fr->clientaddr = *raddr_new; } } } } /* register a relay client callsign */ static void registercallsign (struct relay *rl,struct sockaddr *raddr,char *callsign) { char *p,letter,digit; struct registration *reg; char tmp1[16],tmp2[16]; /* validate callsign */ letter = digit = 0; for (p = callsign; *p != 
'\0'; p++) if (isupper((unsigned char)*p)) letter++; else if (isdigit((unsigned char)*p)) digit++; else if (*p != '-') return; if (!letter || !digit || (p - callsign) > 15) return; /* find existing registration */ for (reg = rl->registration; reg != NULL; reg = reg->next) if (!strcmp(callsign,reg->clientcall)) break; /* when not found, add a new one at head of list */ if (reg == NULL) { if ((reg = calloc(1,sizeof(struct registration))) == NULL) return; if ((reg->next = rl->registration) != NULL) reg->next->prev = reg; rl->registration = reg; strcpy(reg->clientcall,callsign); getnameinfo(raddr,sizeof(struct sockaddr),tmp1,sizeof(tmp1), tmp2,sizeof(tmp2),NI_NUMERICHOST|NI_NUMERICSERV); logmsg("rl=%ld Register %s at %s:%s", (long)(rl - relay),reg->clientcall,tmp1,tmp2); } else { /* If client IP address has changed for an existing */ /* callsign registration, fix up this address in any */ /* flow-table records also. */ if (!IN_EQUAL(reg->clientaddr,*raddr)) { getnameinfo(raddr,sizeof(struct sockaddr),tmp1,sizeof(tmp1), tmp2,sizeof(tmp2),NI_NUMERICHOST|NI_NUMERICSERV); logmsg("rl=%ld client address change for %s to %s:%s", (long)(rl - relay),reg->clientcall,tmp1,tmp2); fixupflowrecord(rl,®->clientaddr,raddr); } } /* update registration */ reg->clientaddr = *raddr; reg->lastactivity = now; } /* find a client callsign registration */ static struct registration * findcallsign (struct relay *rl,struct sockaddr *raddr) { struct registration *reg; /* find registration by addr */ for (reg = rl->registration; reg != NULL; reg = reg->next) if (IN_EQUAL(*raddr,reg->clientaddr)) break; return reg; } /* unregister a relay client callsign */ static void unregistercallsign (struct relay *rl,struct registration *reg,char *reason) { int h; struct flowrecord *fr,*frn; char tmp1[16],tmp2[16]; getnameinfo(®->clientaddr,sizeof(struct sockaddr),tmp1,sizeof(tmp1), tmp2,sizeof(tmp2),NI_NUMERICHOST|NI_NUMERICSERV); logmsg("rl=%ld %s %s at %s:%s", (long)(rl - 
relay),reason,reg->clientcall,tmp1,tmp2); /* remove any flows for this client */ for (h = 0; h < NHASH; h++) for (fr = rl->flowrecord[h]; fr != NULL; fr = frn) { frn = fr->next; if (IN_EQUAL(fr->clientaddr,reg->clientaddr)) deleteflowrecord(rl,fr,reason); } /* unlink this registration */ if (reg->next != NULL) reg->next->prev = reg->prev; if (reg->prev != NULL) reg->prev->next = reg->next; else rl->registration = reg->next; free(reg); } /* remove obsolete flows and relay callsign registrations */ static void removeobsolete (void) { static int64_t nextremove = 0; struct relay *rl; struct registration *reg,*regn; struct flowrecord *fr,*frn; int n,h; if (now >= nextremove) { for (n = 0; n < nrelays; n++) { rl = &relay[n]; if (rl->state != RELAY) continue; for (h = 0; h < NHASH; h++) for (fr = rl->flowrecord[h]; fr != NULL; fr = frn) { frn = fr->next; if (fr->lastactivity < (now - FLOW_TIMEOUT)) deleteflowrecord(rl,fr,"Timeout"); } for (reg = rl->registration; reg != NULL; reg = regn) { regn = reg->next; if (reg->lastactivity < (now - REG_TIMEOUT)) unregistercallsign(rl,reg,"Timeout"); } } nextremove = now + RELAY_INTERVAL; } } /* send an error code to the relay client */ static int client_error (struct relay *rl,struct sockaddr *addr,struct sockaddr *saddr, enum relayerror errcode) { char err[1]; err[0] = errcode; return client_message(rl,RESPONSE_TYPE_ERROR,addr,saddr,0,err,1); } /* send a relay response message to the client */ static int client_message (struct relay *rl,enum response resp,struct sockaddr *raddr, struct sockaddr *saddr,in_port_t dport,char *data,int size) { int rv; char hdr[RES_HLEN]; /* construct the header */ hdr[0] = resp; memcpy(&hdr[1],&IN_ADDR(*saddr),4); memcpy(&hdr[5],&dport,2); memcpy(&hdr[7],&dport,2); /* send the message */ errno = 0; if ((rv = sendudp(client_fd,IN_ADDR(rl->laddr),raddr, hdr,RES_HLEN,data,size)) != (RES_HLEN + size)) { logmsg("rl=%ld client_message msg=%d wr=%d (%s)", (long)(rl - relay),RES_HLEN + 
size,rv,strerror(errno)); rv = -1; } return rv; } /* check if a directory server message contains a callsign and extract it */ /* return a pointer to the \0-terminated callsign (overwritten by each call) */ static char * getcallfrommessage (char *data,int len) { int i; static char call[32]; if (len < 20) return NULL; for (i = 1; i < 19; i++) if ((uint8_t)data[i] == 0xAC && (uint8_t)data[i+1] == 0xAC) { memset(call,0,sizeof(call)); if (data[0] == 'l') memcpy(call,data + 1,i - 1); else memcpy(call,data,i); return call; } return NULL; } /* check access controls (allowed and denied callsigns) */ static int checkaccesscontrols (char *call) { regex_t reg; int result; if (denied_pattern != NULL && denied_pattern[0] != '\0' && regcomp(®,denied_pattern,REG_EXTENDED|REG_ICASE|REG_NOSUB) == 0) { result = regexec(®,call,0,NULL,0); regfree(®); if (result == 0) return 0; } if (allowed_pattern != NULL && allowed_pattern[0] != '\0' && regcomp(®,allowed_pattern,REG_EXTENDED|REG_ICASE|REG_NOSUB) == 0) { result = regexec(®,call,0,NULL,0); regfree(®); if (result != 0) return 0; } return 1; } /* check if an RTCP message is a BYE message */ /* look for these two two-byte sequences: 0xC0 0xC9 and 0xE1 0xCB */ static int isbyepacket (char *data,int len) { int i; for (i = 0; i < (len - 1); i++) if ((uint8_t)data[i] == 0xC0 && (uint8_t)data[i+1] == 0xC9) { while (i < (len - 1)) { if ((uint8_t)data[i] == 0xE1 && (uint8_t)data[i+1] == 0xCB) return 1; i++; } break; } return 0; } /* build an RTCP BYE message in the global buffer 000000: c0 c9 00 01 00 00 00 00 e1 cb 00 09 00 00 00 00 ................ 000010: 13 4e 6f 20 72 6f 75 74 65 20 61 76 61 69 6c 61 .No route availa 000020: 62 6c 65 00 00 00 00 04 ble..... 
*/ static int buildbyepacket () { char *p = buf; memset(buf,0,40); /* empty RR */ *p++ = 0xC0; /* ver 3, no padding, RC=0 */ *p++ = 0xC9; /* payload=201 */ p++; *p++ = 1; /* len=1 */ p += 4; /* SSRC of packet sender (0) */ /* BYE (count=1) */ *p++ = 0xE1; /* ver 3, padding, SC=1 */ *p++ = 0xCB; /* payload=203 */ p++; *p++ = 9; /* len=9 */ p += 4; /* SSRC/CSRC_1 (0) */ *p++ = 19; /* padded length */ strcpy(p,"No route available"); /* reason for leaving */ p += 19; p += 3; *p++ = 4; /* 4 more bytes, last being 4 */ return p - buf; } /* read the configuration file */ /* syntax: lines with name=value (no quotes), # in 1st column for comments */ static int readconfig (char *config) { FILE *cfile; char *p,*q; struct confvar *cv; if ((cfile = fopen(config,"r")) == 0) { fprintf(stderr,"Cannot open config file %s\n",config); return -1; } while (fgets(buf,sizeof(buf),cfile) != NULL) { if ((p = strchr(buf,'\r')) != NULL || (p = strchr(buf,'\n')) != NULL) *p = '\0'; p = buf; while (*p == ' ' || *p == '\t') p++; if (*p == '\0' || *p == '#') continue; if ((q = strchr(p,'=')) == NULL) { fprintf(stderr,"Invalid config line: %s\n",p); continue; } *q++ = '\0'; cv = calloc(1,sizeof(struct confvar)); cv->name = strdup(p); cv->value = strdup(q); cv->next = configvars; configvars = cv; } fclose(cfile); return 0; } /* get the value of a configuration variable */ static char * confvar (char *name) { struct confvar *cv; for (cv = configvars; cv != NULL; cv = cv->next) if (!strcmp(name,cv->name)) return cv->value; return NULL; } /* register a filedescriptor for epoll */ struct descriptor * registerfd (int fd,enum fdtype fdtype,uint32_t events) { struct descriptor *ds; static struct epoll_event epoll_event = {0}; if (fd < 0 || fd >= FD_SETSIZE) return NULL; /* too big to register */ epoll_event.events = events; epoll_event.data.fd = fd; if (epoll_ctl(epoll_fd,EPOLL_CTL_ADD,fd,&epoll_event) < 0) { logmsg("epoll_ctl add failed (%d %s)",fd,strerror(errno)); return NULL; } minfd = min(minfd,fd); 
/* keep track of min fd */
    maxfd = max(maxfd,fd);              /* keep track of max fd */

    ds = &desc[fd];
    ds->fdtype = fdtype;
    ds->events = events;
    return ds;
}

/* modify a filedescriptor's events for epoll */
/* returns 0 when done, -1 when the fd is out of range or epoll refuses */

static int
modifyfd (int fd,uint32_t events)
{
    static struct epoll_event epoll_event = {0};

    if (fd < 0 || fd >= FD_SETSIZE)
        return -1;

    epoll_event.events = events;
    epoll_event.data.fd = fd;

    if (epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&epoll_event) < 0) {
        logmsg("epoll_ctl mod failed (%d %s)",fd,strerror(errno));
        return -1;
    }

    desc[fd].events = events;
    numevents = 0;
    return 0;
}

/* remove a filedescriptor registration */
/* the descriptor is cleared and the fd is removed from the epoll set */
/* returns 0 when done, -1 when the fd is out of range */

static int
unregisterfd (int fd)
{
    if (fd < 0 || fd >= FD_SETSIZE)
        return -1;

    memset(&desc[fd],0,sizeof(struct descriptor));
    epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL); /* ignore errors */

    /* when the highest fd goes away, search down for the next one in use */
    if (fd == maxfd) {
        while (--fd >= 0 && desc[fd].fdtype == UNUSED)
            ;

        maxfd = fd;
    }

    numevents = 0;
    return 0;
}

/* lookup host (DNS or literal), convert to sockaddr and return plain IP */
/* addr receives the first IPv4 address found; when text is not NULL and */
/* len is not 0, text receives the numeric form of that address */
/* returns 0 when found, -1 when the lookup failed */

static int
hostlookup (char *name,struct sockaddr *addr,char *text,size_t len)
{
    struct hostent *hp;

    if ((hp = gethostbyname2(name,AF_INET)) == NULL)
        return -1;

    memset(addr,0,sizeof(struct sockaddr));
    addr->sa_family = hp->h_addrtype;
    memcpy(&IN_ADDR(*addr),hp->h_addr_list[0],hp->h_length);

    if (text != NULL && len != 0)
        getnameinfo(addr,sizeof(struct sockaddr),text,len,
                    NULL,0,NI_NUMERICHOST);

    return 0;
}

/* bind server port and listen */
/* returns the fd of the bound socket, or a negative value on failure */

static int
bindport (int port,int socktype)
{
    int v,fd;
    struct addrinfo *result,*rp;
    struct addrinfo hints;
    char port_s[16];

    snprintf(port_s,sizeof(port_s),"%d",port);

    memset(&hints,0,sizeof(struct addrinfo));
    hints.ai_family = AF_INET;          /* allow IPv4 only */
    hints.ai_socktype = socktype;
    hints.ai_flags = AI_PASSIVE;        /* for wildcard IP address */

    if ((v = getaddrinfo(NULL,port_s,&hints,&result)) != 0) {
        fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(v));
        return -1;                      /* negative like all other failures: */
                                        /* a positive value looks like an fd */
    }

    /* getaddrinfo() returns a list of address structures.
Try each address until we successfully bind(2). If socket(2) (or bind(2)) fails, we (close the socket and) try the next address. */ for (rp = result; rp != NULL; rp = rp->ai_next) { fd = socket(rp->ai_family,rp->ai_socktype,rp->ai_protocol); if (fd == -1) continue; v = 1; setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&v,sizeof(v)); if (bind(fd,rp->ai_addr,rp->ai_addrlen) == 0) break; /* success */ close(fd); } freeaddrinfo(result); /* no longer needed */ if (rp == NULL) { /* no address succeeded */ fprintf(stderr,"Could not bind\n"); return -2; } v = fcntl(fd,F_GETFL,0); /* set nonblocking mode */ fcntl(fd,F_SETFL,v|O_NONBLOCK); if (socktype == SOCK_STREAM && listen(fd,3)) { fprintf(stderr,"Could not listen\n"); return -3; } if (socktype == SOCK_DGRAM) { v = 1; if (setsockopt(fd,IPPROTO_IP,IP_PKTINFO,&v,sizeof(v)) < 0) return -4; } return fd; } /* open TCP connection */ /* the passed upcall function is called after the connection has been */ /* established. this function can send the first data to the connection. */ /* when nonblock is true, the connect is done in nonblocking mode and the */ /* upcall function is called from the main event loop. 
*/
/* from is the local address to bind to, to the address to connect to */
/* returns the fd of the socket, or -1 on failure */

static int
opentcp (struct sockaddr *from,struct sockaddr *to,int nonblock,
         upcall_t upcall,void *ptr)
{
    int fd,v,err;
    struct descriptor *ds;

    if ((fd = socket(AF_INET,SOCK_STREAM,0)) < 0)
        return -1;

    v = 1;
    setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&v,sizeof(v));
    bind(fd,from,sizeof(struct sockaddr)); /* result is not checked */

    if (nonblock) {
        v = fcntl(fd,F_GETFL,0);        /* set nonblocking mode */
        fcntl(fd,F_SETFL,v|O_NONBLOCK);
    }

    if (connect(fd,to,sizeof(struct sockaddr)) < 0) {
        if (nonblock && errno == EINPROGRESS) {
            /* register the fd for later upcall, but not for input yet */
            if ((ds = registerfd(fd,CONNECT_FD,EPOLLOUT)) != NULL) {
                ds->timeout = now + CONNECT_TIMEOUT;
                next = 0;
                ds->upcall = upcall;
                ds->ptr = ptr;
                return fd;
            }
        }

        /* real connect failure, or the fd could not be registered */
        if (debug)
            logmsg("opentcp connect failed nb=%d",nonblock);

        close(fd);
        return -1;
    }

    /* continue here when the connect could be completed synchronously */
    v = fcntl(fd,F_GETFL,0);            /* set nonblocking mode */
    err = fcntl(fd,F_SETFL,v|O_NONBLOCK);

    /* call the upcall function now, it should close the fd when err */
    err |= opentcp_upcall(fd,upcall,ptr);
    return err?
-1 : fd; } /* set the final modes of the TCP socket and perform the upcall function */ static int opentcp_upcall (int fd,upcall_t upcall,void *ptr) { int v,err; socklen_t len; if (desc[fd].fdtype == CONNECT_FD) unregisterfd(fd); /* unregister when async mode */ err = errno = 0; len = sizeof(err); getsockopt(fd,SOL_SOCKET,SO_ERROR,&err,&len); if (!errno && !err) { v = 256*1024; /* set big buffer */ err |= setsockopt(fd,SOL_SOCKET,SO_SNDBUF,&v,sizeof(v)); v = 1; /* set keepalive mode */ err |= setsockopt(fd,SOL_SOCKET,SO_KEEPALIVE,&v,sizeof(v)); } if (upcall != NULL) (*upcall)(fd,ptr,err); /* call the upcall function */ /* it should close the fd when err */ return err; } /* receive datagram from UDP socket including addresses */ static int recvudp (int fd,in_addr_t *laddr,struct sockaddr *raddr,char *buf1,int len1) { int nread; struct msghdr msg; struct cmsghdr *cp; struct iovec iov[1]; struct in_pktinfo *pktinfo; char cmsg[128]; msg.msg_name = raddr; msg.msg_namelen = sizeof(struct sockaddr); msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_control = cmsg; msg.msg_controllen = sizeof(cmsg); msg.msg_flags = 0; iov[0].iov_base = buf1; iov[0].iov_len = len1; if ((nread = recvmsg(fd,&msg,0)) < 0) return nread; for (cp = CMSG_FIRSTHDR(&msg); cp != NULL; cp = CMSG_NXTHDR(&msg,cp)) if (cp->cmsg_level == IPPROTO_IP && cp->cmsg_type == IP_PKTINFO) { pktinfo = (struct in_pktinfo *)CMSG_DATA(cp); *laddr = pktinfo->ipi_addr.s_addr; break; } return nread; } /* send datagram to UDP socket specifying addresses */ static int sendudp (int fd,in_addr_t laddr,struct sockaddr *raddr, char *buf1,int len1,char *buf2,int len2) { struct msghdr msg; struct cmsghdr *cp; struct iovec iov[2]; struct in_pktinfo *pktinfo; char cmsg[128]; msg.msg_name = raddr; msg.msg_namelen = sizeof(struct sockaddr); msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_control = cmsg; msg.msg_controllen = sizeof(cmsg); msg.msg_flags = 0; iov[0].iov_base = buf1; iov[0].iov_len = len1; if (len2) { iov[1].iov_base = buf2; 
iov[1].iov_len = len2; msg.msg_iovlen = 2; } cp = CMSG_FIRSTHDR(&msg); cp->cmsg_level = IPPROTO_IP; cp->cmsg_type = IP_PKTINFO; cp->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo)); pktinfo = (struct in_pktinfo *)CMSG_DATA(cp); memset(pktinfo,0,sizeof(struct in_pktinfo)); pktinfo->ipi_spec_dst.s_addr = laddr; msg.msg_controllen = cp->cmsg_len; return sendmsg(fd,&msg,0); } /* return timestamp in microseconds */ static int64_t timestamp () { struct timeval tv; gettimeofday(&tv,NULL); return ((int64_t)tv.tv_sec * USEC) + tv.tv_usec; } /* URL encode of a string to a malloc'ed buffer */ static char *urlencode(char *s) { char *tbuf = malloc(strlen(s) * 3 + 1), *pbuf = tbuf; unsigned char c; while ((c = *s++) != '\0') { if (isalnum(c) || strchr("-_.~",c) != NULL) *pbuf++ = c; else if (c == ' ') *pbuf++ = '+'; else *pbuf++ = '%', snprintf(pbuf,3,"%02x",c), pbuf += 2; } *pbuf++ = '\0'; return realloc(tbuf,pbuf - tbuf); }