pe1chl-elproxy/elproxy.c
2022-05-29 21:12:14 +10:00

2875 lines
66 KiB
C

/* EchoLink multi-proxy/relay server for Linux */
/* (c)2015-2021 by Rob Janssen, PE1CHL */
/* Protocol and constants from Java version 1.2.3 by Jonathan Taylor, K1RFD */
/* EchoLink is a registered trademark of Synergenics, LLC */
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <regex.h>
#include <signal.h>
#include <time.h>
#include <unistd.h>
#include <locale.h>
#include <sys/file.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <linux/sockios.h>
#include <nettle/md5-compat.h>
#define RTP_PORT 5198
#define RTCP_PORT 5199
#define ADDR_SERVER_PORT 5200
#define CLIENT_PORT 7000
#define PROXY_PORT 8100
#define IN_SA(SA) ((struct sockaddr_in *)&(SA))
#define IN_PORT(SA) IN_SA(SA)->sin_port
#define IN_ADDR(SA) IN_SA(SA)->sin_addr.s_addr
#define IN_EQUAL(SA1,SA2) (IN_ADDR(SA1) == IN_ADDR(SA2) && \
IN_PORT(SA1) == IN_PORT(SA2))
#define NHASH 251
#define HASHIP(addr) (ntohl(addr) % NHASH)
#define POSTER_HOST "www.echolink.org"
#define POSTER_PORT 80
#define POSTER_URL "/proxypost.jsp"
#define MD5_SALT "#5A!zu"
#define USEC 1000000LL /* microseconds in 1s */
#define AUTH_SOCKET_TIMEOUT 30*USEC
#define CLIENT_SOCKET_TIMEOUT 900*USEC
#define ADDR_SOCKET_TIMEOUT 60*USEC
#define CONNECT_TIMEOUT 10*USEC
#define POSTER_INTERVAL USEC/2
#define STATP_INTERVAL 600*USEC
#define STATS_INTERVAL 15*USEC /* must be >10s */
#define REG_TIMEOUT 600*USEC
#define FLOW_TIMEOUT 60*USEC
#define RELAY_INTERVAL 10*USEC
#define PROGRAM_VERSION "1.2.5c"
#define PUBLIC_PASSWORD "PUBLIC"
#define EXAMPLE_PASSWORD "notset"
#define HEADER_SIZE 9 /* proxy header size */
#define MAX_DATA_SIZE 5000 /* proxy message size */
#define TCP_DATA_SIZE 4096 /* <= MAX_DATA_SIZE */
#define REQ_HLEN 7 /* relay req header */
#define RES_HLEN 9 /* relay rsp header */
#define MAX_PACKET_DATA 1500 /* relay max data */
#ifndef max
# define max(a,b) (((a) > (b)) ? (a) : (b))
# define min(a,b) (((a) < (b)) ? (a) : (b))
#endif
/* datatypes */
/* proxy message type on TCP socket */
enum msgtype {
PROXY_MSG_TCP_OPEN = 1, /* open TCP connection to ADDR srvr */
PROXY_MSG_TCP_DATA = 2, /* data for TCP connection */
PROXY_MSG_TCP_CLOSE = 3, /* close TCP connection */
PROXY_MSG_TCP_STATUS = 4, /* status of TCP connect */
PROXY_MSG_UDP_DATA = 5, /* data for RTP socket */
PROXY_MSG_UDP_CONTROL = 6, /* data for RTCP socket */
PROXY_MSG_SYSTEM = 7 /* system message */
};
/* proxy reason codes for auth failure */
enum reason {
REASON_CODE_BAD_PW = 1, /* bad password for proxy server */
REASON_CODE_ACCESS_DENIED = 2 /* client rejected due to pattern */
};
/* relay request types on UDP socket */
enum request {
REQUEST_TYPE_KEEPALIVE = 0, /* keep flow alive */
REQUEST_TYPE_DATA = 1, /* relay data */
REQUEST_TYPE_CLOSE = 2, /* unused */
REQUEST_TYPE_REGISTER = 3, /* register callsign */
REQUEST_TYPE_UNREGISTER = 4, /* unregister callsign */
REQUEST_TYPE_DROP_FLOW = 5, /* delete flow record */
REQUEST_TYPE_PING = 6 /* ping request */
};
/* relay response types on UDP socket */
enum response {
RESPONSE_TYPE_DATA = 1, /* relay data */
RESPONSE_TYPE_ERROR = 2, /* error message */
RESPONSE_TYPE_PING = 3, /* ping reply */
RESPONSE_TYPE_ACK = 4 /* acknowledgment of client request */
};
/* relay error codes */
enum relayerror {
ERR_ALLOCATION = 1, /* cannot allocate flow */
};
/* definition of an upcall function for opentcp */
typedef void (*upcall_t)(int fd,void *ptr,int err);
/* items from the config file */
struct confvar
{
struct confvar *next; /* linked list */
char *name; /* variable name */
char *value; /* variable value */
};
/* a descriptor for each open fd holding all state and session info */
struct descriptor
{
enum fdtype {
UNUSED = 0,
RTP_FD,
RTCP_FD,
CLIENT_FD,
PROXY_FD,
SESSION_FD,
ADDR_TCP_FD,
POSTER_FD,
CONNECT_FD,
} fdtype; /* type of fd */
uint32_t events; /* epoll events */
struct proxy *proxy; /* related proxy session */
int64_t timeout; /* microseconds of next timer event */
upcall_t upcall; /* upcall for nonblocking connect */
void *ptr; /* pointer parameter for upcall */
};
/* definitions for a single proxy instance */
struct proxy
{
struct proxy *next; /* linked list */
enum state {
UNUSED_P = 0,
READY,
OFF,
NONCE,
AUTH1,
AUTH2,
BUSY,
} state; /* connection state */
int fd; /* client file descriptor */
int tcp_fd; /* ADDR TCP file descriptor */
uint32_t nonce; /* nonce for client auth */
int64_t bytesout; /* bytes sent to this client */
int64_t nextstatuspost; /* time for next status post */
int64_t connectiontimeout; /* timeout for this connection */
in_addr_t qaddr; /* address of most recent qso peer */
char *regaddr; /* registration address */
char *regname; /* registration name */
char *regcomment; /* registration comment */
char *password; /* password for this proxy */
char digest[32]; /* MD5 digest of name/comment */
char clientcall[16]; /* client callsign */
struct sockaddr laddr; /* local address */
char laddr_s[16]; /* same as a string */
struct sockaddr raddr; /* remote address */
char raddr_s[16]; /* same as a string */
int hdrbytes; /* header bytes that have been read */
int msgbytes; /* message bytes that have been read */
int heldbytes; /* message bytes held for throttle */
char header[HEADER_SIZE]; /* buffer for received TCP header */
char message[MAX_DATA_SIZE]; /* buffer for received TCP message */
};
/* definitions for a single relay instance */
struct relay
{
struct relay *next; /* linked list */
enum rlstate {
UNUSED_R = 0,
RELAY,
} state; /* relay state */
int64_t bytesout; /* bytes sent by this relay */
int64_t bytesin; /* bytes received by this relay */
struct sockaddr laddr; /* local address */
char laddr_s[16]; /* same as a string */
struct sockaddr lastclient; /* client for which last error logged */
struct registration *registration; /* callsign registration */
struct flowrecord *flowrecord[NHASH]; /* flow records by peeraddress */
};
/* callsign registration */
struct registration
{
struct registration *next; /* linked list */
struct registration *prev;
char clientcall[16]; /* client callsign */
struct sockaddr clientaddr; /* client address */
int64_t lastactivity; /* timestamp of last activity */
};
/* flow record */
struct flowrecord
{
struct flowrecord *next; /* linked list */
struct flowrecord *prev;
in_addr_t peeraddr; /* peer address */
struct sockaddr clientaddr; /* client address */
int64_t lastactivity; /* timestamp of last activity */
int64_t bytesout; /* bytes sent to this client */
int64_t bytesin; /* bytes received from this client */
};
/* variables */
int daemonize = 1; /* automatically become daemon? */
int debug = 0; /* print debug output? */
int quit = 0; /* quit program? */
FILE *logfile = NULL; /* logging/debugging file */
FILE *statfile = NULL; /* file to put operative status */
struct confvar *configvars = NULL; /* list of config variables */
char *denied_pattern = NULL; /* denied callsigns pattern */
char *allowed_pattern = NULL; /* allowed callsigns pattern */
int proxy_port = PROXY_PORT; /* proxy portnumber */
int64_t connectiontimeout = 0; /* connection timeout */
int64_t now; /* current time in microseconds */
int64_t next = 0; /* time of earliest next time event */
int epoll_fd; /* fd for epoll instance */
int rtp_fd; /* fd for RTP port */
int rtcp_fd; /* fd for RTCP port */
int client_fd; /* fd for CLIENT port */
int proxy_fd; /* fd for PROXY port */
int minfd = FD_SETSIZE-1; /* lowest fd currently registered */
int maxfd = -1; /* highest fd currently registered */
int numevents; /* number of epoll events received */
int nproxies = 0; /* number of proxies defined */
int nrelays = 0; /* number of relays defined */
struct proxy *proxy; /* data for each proxy */
struct proxy *proxy_by_addr[NHASH]; /* hashed index of proxy by address */
struct relay *relay; /* data for each relay */
struct relay *relay_by_addr[NHASH]; /* hashed index of relay by address */
struct descriptor desc[FD_SETSIZE]; /* descriptor for each fd */
struct sockaddr poster_addr; /* status poster address */
char buf[MAX_DATA_SIZE]; /* general-purpose buffer */
char *error403 = "403 This is an EchoLink Proxy, not a Web Proxy! Go away!\r\n\r\nThis is an EchoLink Proxy, not a Web Proxy! Go away!\r\n";
int len403;
/* prototypes */
static void logmsg(char *fmt,...) __attribute__ ((format(printf,1,2)));
static void sighndlr(int sig);
static int programloop(void);
static void status();
static void proxy_accept();
static void proxy_session(int fd);
static void proxy_tcp(int fd,void *ptr,int err);
static void proxy_timer(int fd);
static int proxy_message(struct proxy *pr,enum msgtype type,in_addr_t addr,char *data,int size);
static void proxy_resume(int fd);
static void proxy_close(struct proxy *pr);
static void tcp_session(int fd);
static void tcp_timer(int fd);
static void udp_input(int fd,in_port_t port,enum msgtype msgtype);
static void statusposter();
static int poststatus(struct proxy *pr,int nonblock);
static void post_upcall(int fd,void *ptr,int err);
static void client_input(int fd);
static struct flowrecord *findpeerflowrecord(struct relay *rl,in_addr_t saddr);
static struct flowrecord *findflowrecord(struct relay *rl,struct sockaddr *caddr,in_addr_t paddr);
static void deleteflowrecord(struct relay *rl,struct flowrecord *fr,char *reason);
static void fixupflowrecord(struct relay *rl,struct sockaddr *raddr_old,struct sockaddr *raddr_new);
static void registercallsign(struct relay *rl,struct sockaddr *raddr,char *callsign);
static struct registration *findcallsign(struct relay *rl,struct sockaddr *raddr);
static void unregistercallsign(struct relay *rl,struct registration *reg,char *reason);
static void removeobsolete(void);
static int client_error(struct relay *rl,struct sockaddr *addr,struct sockaddr *saddr,enum relayerror errcode);
static int client_message(struct relay *rl,enum response resp,struct sockaddr *raddr,struct sockaddr *saddr,in_port_t dport,char *data,int size);
static char *getcallfrommessage(char *data,int len);
static int checkaccesscontrols(char *call);
static int isbyepacket(char *data,int len);
static int buildbyepacket(void);
static int readconfig(char *config);
static char *confvar(char *name);
static struct descriptor *registerfd(int fd,enum fdtype fdtype,uint32_t events);
static int modifyfd(int fd,uint32_t events);
static int unregisterfd(int fd);
static int hostlookup(char *name,struct sockaddr *addr,char *text,size_t len);
static int bindport(int port,int socktype);
static int opentcp(struct sockaddr *from,struct sockaddr *to,int nonblock,upcall_t upcall,void *ptr);
static int opentcp_upcall(int fd,upcall_t upcall,void *ptr);
static int recvudp(int fd,in_addr_t *laddr,struct sockaddr *raddr,char *buf1,int len1);
static int sendudp(int fd,in_addr_t laddr,struct sockaddr *raddr,char *buf1,int len1,char *buf2,int len2);
static int64_t timestamp(void);
static char *urlencode(char *s);
/* main program */
int
main (int argc,char *argv[])
{
int ex = 0;
int n,i;
char *p;
struct proxy *pr,*pr2;
struct relay *rl,*rl2;
MD5_CTX md5ctx;
unsigned char digest[MD5_DIGEST_SIZE];
char hex[3];
setlocale(LC_ALL,"C"); /* make sure plain ASCII is used */
if (argc < 2) {
fprintf(stderr,"Usage: elproxy configfile [debuglevel]\n");
exit(99);
}
if (argc > 2) {
debug = atoi(argv[2]);
daemonize = 0;
}
if (readconfig(argv[1]) < 0) /* read configuration file */
exit(1);
now = timestamp(); /* in case it is used while init */
if ((p = confvar("LogFile")) != NULL && (logfile = fopen(p,"a")) == NULL)
fprintf(stderr,"Warning: cannot open logfile %s\n",p);
if ((p = confvar("StatusFile")) != NULL) {
unlink(p);
if ((statfile = fopen(p,"w")) == NULL)
fprintf(stderr,"Warning: cannot open statusfile %s\n",p);
}
/* process some of the global parameters */
if ((p = confvar("Daemonize")) != NULL)
daemonize = atoi(p);
if ((p = confvar("ConnectionTimeout")) != NULL)
connectiontimeout = atoi(p) * 60 * USEC;
denied_pattern = confvar("CallsignsDenied");
allowed_pattern = confvar("CallsignsAllowed");
/* allocate and initialize all of the proxy instances */
if ((p = confvar("NumberOfProxies")) != NULL)
nproxies = atoi(p);
if (nproxies != 0)
proxy = calloc(sizeof(struct proxy),nproxies);
for (n = 0; n < nproxies; n++) {
pr = &proxy[n];
snprintf(buf,sizeof(buf),"Proxy_%d",n);
/* init this proxy when an address is defined in the configuration */
if ((p = confvar(buf)) != NULL) {
if (hostlookup(p,&pr->laddr,pr->laddr_s,sizeof(pr->laddr_s)) < 0) {
fprintf(stderr,"Bad address %s=%s (%s) pr=%d\n",
buf,p,hstrerror(h_errno),n);
exit(2);
}
/* make sure all proxy addresses are unique */
i = HASHIP(IN_ADDR(pr->laddr));
for (pr2 = proxy_by_addr[i]; pr2 != NULL; pr2 = pr2->next)
if (IN_ADDR(pr->laddr) == IN_ADDR(pr2->laddr)) {
fprintf(stderr,"Duplicate address %s=%s\n",buf,pr->laddr_s);
exit(2);
}
pr->next = proxy_by_addr[i];
proxy_by_addr[i] = pr;
pr->regaddr = p;
/* check if specific PublicAddress for this proxy defined */
/* if not, use the Proxy address */
snprintf(buf,sizeof(buf),"PublicAddress_%d",n);
if ((p = confvar(buf)) != NULL)
pr->regaddr = p;
/* the RegistrationAddr, Name and Comment are hashed */
/* to be sent with the registration */
MD5Init(&md5ctx);
MD5Update(&md5ctx,(unsigned char *)pr->regaddr,strlen(pr->regaddr));
/* check if a specific RegistrationName for this proxy is defined */
/* if not, use the global RegistrationName */
snprintf(buf,sizeof(buf),"RegistrationName_%d",n);
if ((p = confvar(buf)) == NULL)
if ((p = confvar("RegistrationName")) == NULL) {
fprintf(stderr,"RegistrationName not set!\n");
exit(2);
}
snprintf(buf,sizeof(buf),p,n,n,n);
pr->regname = urlencode(buf);
MD5Update(&md5ctx,(unsigned char *)buf,strlen(buf));
/* check if specific RegistrationComment for this proxy defined */
/* if not, use the global RegistrationComment */
snprintf(buf,sizeof(buf),"RegistrationComment_%d",n);
if ((p = confvar(buf)) == NULL)
if ((p = confvar("RegistrationComment")) == NULL) {
fprintf(stderr,"RegistrationComment not set!\n");
exit(2);
}
snprintf(buf,sizeof(buf),p,n,n,n);
pr->regcomment = urlencode(buf);
strcpy(buf,MD5_SALT);
MD5Update(&md5ctx,(unsigned char *)buf,strlen(buf));
MD5Final(digest,&md5ctx);
for (i = 0; i < MD5_DIGEST_SIZE; i++) {
snprintf(hex,sizeof(hex),"%02X",digest[i]);
memcpy(pr->digest + 2*i,hex,2);
}
/* check if a specific password for this proxy is defined */
/* if not, use the global password */
snprintf(buf,sizeof(buf),"Password_%d",n);
if ((p = confvar(buf)) == NULL)
p = confvar("Password");
if (p == NULL || !strcmp(p,EXAMPLE_PASSWORD)) {
fprintf(stderr,
"Password not set, please edit configfile first!\n");
exit(2);
}
pr->password = p;
while (*p != '\0') {
*p = toupper(*p);
p++;
}
pr->state = READY;
}
}
/* allocate and initialize all of the relay instances */
if ((p = confvar("NumberOfRelays")) != NULL)
nrelays = atoi(p);
if (nrelays != 0)
relay = calloc(sizeof(struct relay),nrelays);
for (n = 0; n < nrelays; n++) {
rl = &relay[n];
snprintf(buf,sizeof(buf),"Relay_%d",n);
/* init this relay when an address is defined in the configuration */
if ((p = confvar(buf)) != NULL) {
if (hostlookup(p,&rl->laddr,rl->laddr_s,sizeof(rl->laddr_s)) < 0) {
fprintf(stderr,"Bad address %s=%s (%s)\n",
buf,p,hstrerror(h_errno));
exit(2);
}
/* make sure all relay addresses are unique */
i = HASHIP(IN_ADDR(rl->laddr));
for (pr2 = proxy_by_addr[i]; pr2 != NULL; pr2 = pr2->next)
if (IN_ADDR(rl->laddr) == IN_ADDR(pr2->laddr)) {
fprintf(stderr,"Duplicate address %s=%s\n",buf,rl->laddr_s);
exit(2);
}
for (rl2 = relay_by_addr[i]; rl2 != NULL; rl2 = rl2->next)
if (IN_ADDR(rl->laddr) == IN_ADDR(rl2->laddr)) {
fprintf(stderr,"Duplicate address %s=%s\n",buf,rl->laddr_s);
exit(2);
}
rl->next = relay_by_addr[i];
relay_by_addr[i] = rl;
rl->state = RELAY;
}
}
/* lookup poster host address once for entire run */
if (hostlookup(p = POSTER_HOST,&poster_addr,NULL,0) < 0) {
fprintf(stderr,"Lookup poster host %s failed (%s)\n",
p,hstrerror(h_errno));
exit(2);
}
IN_PORT(poster_addr) = htons(POSTER_PORT);
/* prepare the epoll instance */
if ((epoll_fd = epoll_create1(0)) < 0) {
fprintf(stderr,"Cannot create epoll instance (%s)\n",strerror(errno));
exit(2);
}
/* open all listen sockets */
if ((rtp_fd = bindport(RTP_PORT,SOCK_DGRAM)) < 0) {
fprintf(stderr,"Cannot bind UDP port %d\n",RTP_PORT);
exit(3);
}
registerfd(rtp_fd,RTP_FD,EPOLLIN);
if ((rtcp_fd = bindport(RTCP_PORT,SOCK_DGRAM)) < 0) {
fprintf(stderr,"Cannot bind UDP port %d\n",RTCP_PORT);
exit(3);
}
registerfd(rtcp_fd,RTCP_FD,EPOLLIN);
if (nrelays != 0) {
if ((client_fd = bindport(CLIENT_PORT,SOCK_DGRAM)) < 0) {
fprintf(stderr,"Cannot bind UDP port %d\n",CLIENT_PORT);
exit(3);
}
registerfd(client_fd,CLIENT_FD,EPOLLIN);
}
if (nproxies != 0) {
if ((p = confvar("Port")) != NULL)
proxy_port = atoi(p);
if ((proxy_fd = bindport(proxy_port,SOCK_STREAM)) < 0) {
fprintf(stderr,"Cannot bind TCP port %d\n",proxy_port);
exit(3);
}
registerfd(proxy_fd,PROXY_FD,EPOLLIN);
}
/* catch some signals */
signal(SIGHUP,sighndlr);
signal(SIGINT,sighndlr);
signal(SIGQUIT,sighndlr);
signal(SIGTERM,sighndlr);
/* ready to start the service */
if (daemonize)
daemon(1,0);
len403 = strlen(error403);
now = timestamp();
srand((unsigned int)now);
logmsg("start");
status();
while (!quit && programloop() == 0)
;
logmsg("quit %d",quit);
for (n = 0; n < nproxies; n++) {
pr = &proxy[n];
if (pr->state != UNUSED_P) {
pr->state = OFF;
poststatus(pr,0);
proxy_close(pr);
}
}
logmsg("exit %d",ex);
if (statfile != NULL)
fclose(statfile);
if (logfile != NULL)
fclose(logfile);
exit(ex);
}
/* log a message */
static void
logmsg (char *fmt,...)
{
time_t t;
char *tim,*p;
va_list ap;
if (logfile != NULL || !daemonize) {
t = (now = timestamp()) / USEC;
tim = ctime(&t);
if ((p = strchr(tim,'\n')) != NULL)
*p = '\0';
}
if (logfile != NULL) {
fprintf(logfile,"%s: ",tim);
va_start(ap,fmt);
vfprintf(logfile,fmt,ap);
va_end(ap);
fprintf(logfile,"\n");
}
if (!daemonize) {
fprintf(stderr,"%.15s.%06d: ",tim + 4,(int)(now % USEC));
va_start(ap,fmt);
vfprintf(stderr,fmt,ap);
va_end(ap);
fprintf(stderr,"\n");
}
}
/* signal handler */
static void
sighndlr (int sig)
{
char *p;
signal(sig,sighndlr);
if (sig == SIGHUP) {
if (logfile != NULL)
fclose(logfile);
if ((p = confvar("LogFile")) != NULL)
logfile = fopen(p,"a");
} else {
quit++;
}
logmsg("signal %d errno %d quit %d",sig,errno,quit);
if (quit > 3)
exit(5);
}
/* epoll loop to wait for next event */
static int
programloop ()
{
int e,fd,p;
int timeout;
int maxevents;
struct epoll_event *events;
/* allocate buffer to receive epoll_wait events (up to 10) */
maxevents = min((maxfd - minfd) + nproxies + 2,10);
events = calloc(maxevents,sizeof(struct epoll_event));
next = 0;
while (!quit) {
if (logfile != NULL)
fflush(logfile);
/* find the next time to process timers */
if (next < (now = timestamp())) {
next = now + STATS_INTERVAL;
for (fd = minfd; fd <= maxfd; fd++)
if (desc[fd].timeout && desc[fd].timeout < next)
next = desc[fd].timeout;
for (p = 0; p < nproxies; p++)
if (proxy[p].state != UNUSED_P)
if (proxy[p].nextstatuspost < next) {
if (proxy[p].nextstatuspost == 0)
next = min(next,now + POSTER_INTERVAL);
else
next = proxy[p].nextstatuspost;
}
}
/* epoll_wait uses milliseconds but our times are in microseconds */
if ((timeout = (next - now) / 1000) < 10)
timeout = 10;
/* find the ready file descriptors for input and output */
if ((numevents = epoll_wait(epoll_fd,events,maxevents,timeout)) < 0) {
if (errno == EINTR)
continue;
logmsg("error %d on epoll_wait",errno);
free(events);
return EOF;
}
/* update global current time */
now = timestamp();
/* process the received epoll events */
for (e = 0; e < numevents; e++) {
fd = events[e].data.fd;
if (events[e].events & EPOLLIN) {
switch (desc[fd].fdtype)
{
case RTP_FD:
udp_input(fd,htons(RTP_PORT),PROXY_MSG_UDP_DATA);
break;
case RTCP_FD:
udp_input(fd,htons(RTCP_PORT),PROXY_MSG_UDP_CONTROL);
break;
case CLIENT_FD:
client_input(fd);
break;
case PROXY_FD:
proxy_accept();
break;
case SESSION_FD:
proxy_session(fd);
break;
case ADDR_TCP_FD:
tcp_session(fd);
break;
case POSTER_FD:
recv(fd,buf,sizeof(buf),0);
close(fd);
unregisterfd(fd);
break;
default:
logmsg("epoll_wait returns EPOLLIN for fd=%d type=%d",
fd,desc[fd].fdtype);
unregisterfd(fd);
break;
}
}
if (events[e].events & EPOLLOUT) {
switch (desc[fd].fdtype)
{
case SESSION_FD:
proxy_resume(fd);
break;
case CONNECT_FD:
opentcp_upcall(fd,desc[fd].upcall,desc[fd].ptr);
break;
default:
logmsg("epoll_wait returns EPOLLOUT for fd=%d type=%d",
fd,desc[fd].fdtype);
unregisterfd(fd);
break;
}
}
}
/* process timers */
if (now >= next) {
for (fd = minfd; fd <= maxfd; fd++)
if (desc[fd].timeout && now >= desc[fd].timeout)
switch (desc[fd].fdtype)
{
case SESSION_FD:
proxy_timer(fd);
break;
case ADDR_TCP_FD:
tcp_timer(fd);
break;
case POSTER_FD:
close(fd);
unregisterfd(fd);
break;
case CONNECT_FD:
if (desc[fd].upcall != NULL)
(*desc[fd].upcall)(fd,desc[fd].ptr,1);
close(fd);
unregisterfd(fd);
break;
default:
logmsg("timer event for fd=%d type=%d",
fd,desc[fd].fdtype);
desc[fd].timeout = 0;
break;
}
next = 0;
}
status(); /* update statusfile */
if (nproxies != 0)
statusposter(); /* post proxy status */
if (nrelays != 0)
removeobsolete(); /* remove obsolete flows/reg */
}
free(events);
return 0;
}
/* write status into statfile */
static void
status ()
{
static int64_t nextstat = 0;
int n,h,npr,nbs,nrl,nuse,nreg,nflow;
struct relay *rl;
struct registration *reg;
struct flowrecord *fr;
if (statfile == NULL)
return;
if (now >= nextstat) {
npr = nbs = nrl = nuse = nreg = nflow = 0;
for (n = 0; n < nproxies; n++) {
if (proxy[n].state != UNUSED_P) {
npr++;
if (proxy[n].state != READY)
nbs++;
}
}
for (n = 0; n < nrelays; n++) {
rl = &relay[n];
if (rl->state != UNUSED_R) {
nrl++;
if (rl->registration != NULL) {
nuse++;
for (reg = rl->registration; reg != NULL; reg = reg->next)
nreg++;
for (h = 0; h < NHASH; h++)
for (fr = rl->flowrecord[h]; fr != NULL; fr = fr->next)
nflow++;
}
}
}
rewind(statfile);
fprintf(statfile,"OK\nOK: %d Proxies, %d Busy, %d Relays, %d Used, %d Regs, %d Flows | proxies=%d busy=%d relays=%d used=%d regs=%d flows=%d\n",
npr,nbs,nrl,nuse,nreg,nflow,npr,nbs,nrl,nuse,nreg,nflow);
if (nbs != 0)
fprintf(statfile,"Pr# Local address Remote address Callsign QSO Bytes TX\n");
for (n = 0; n < nproxies; n++) {
struct proxy *pr = &proxy[n];
if (pr->state != UNUSED_P && pr->state != READY) {
char tmp[16];
if (pr->state == BUSY) {
if (pr->qaddr) {
struct sockaddr addr;
memset(&addr,0,sizeof(addr));
addr.sa_family = AF_INET;
IN_ADDR(addr) = pr->qaddr;
getnameinfo(&addr,sizeof(struct sockaddr),tmp,
sizeof(tmp),NULL,0,NI_NUMERICHOST);
pr->qaddr = 0;
} else {
tmp[0] = '\0';
}
} else {
snprintf(tmp,sizeof(tmp),"[st%d fd%d t%d]",
pr->state,pr->fd,desc[pr->fd].fdtype);
}
fprintf(statfile,"%3d %-15s %-15s %-12s %-15s %11lld\n",
n,pr->laddr_s,pr->raddr_s,pr->clientcall,tmp,
(long long)pr->bytesout);
}
}
if (nbs != 0 && nreg != 0)
fprintf(statfile,"\n");
if (nreg != 0)
fprintf(statfile,"Rl# Local address Registr Flows Latest callsign Bytes RX Bytes TX\n");
for (n = 0; n < nrelays; n++) {
rl = &relay[n];
if (rl->state != UNUSED_R && rl->registration != NULL) {
char *callsign;
nreg = nflow = 0;
callsign = "";
for (reg = rl->registration; reg != NULL; reg = reg->next)
nreg++;
callsign = rl->registration->clientcall;
for (h = 0; h < NHASH; h++)
for (fr = rl->flowrecord[h]; fr != NULL; fr = fr->next)
nflow++;
fprintf(statfile,"%3d %-15s %7d %6d %-15s %11lld %11lld\n",
n,rl->laddr_s,nreg,nflow,callsign,
(long long)rl->bytesin,(long long)rl->bytesout);
}
}
fflush(statfile);
ftruncate(fileno(statfile),ftell(statfile));
nextstat = now + STATS_INTERVAL;
}
}
/* accept a connection to the proxy port */
static void
proxy_accept ()
{
int fd,v;
struct proxy *pr;
struct descriptor *ds;
socklen_t sklen;
struct sockaddr laddr,raddr;
char raddr_s[16];
memset(&raddr,0,sklen = sizeof(struct sockaddr));
if ((fd = accept(proxy_fd,&raddr,&sklen)) < 0) {
logmsg("accept failed (%d %s)",fd,strerror(errno));
return;
}
getnameinfo(&raddr,sizeof(struct sockaddr),raddr_s,sizeof(raddr_s),
NULL,0,NI_NUMERICHOST);
if (debug)
logmsg("accept from %s fd=%d",raddr_s,fd);
v = fcntl(fd,F_GETFL,0); /* set nonblocking mode */
fcntl(fd,F_SETFL,v|O_NONBLOCK);
v = 1; /* set keepalive mode */
setsockopt(fd,SOL_SOCKET,SO_KEEPALIVE,&v,sizeof(v));
memset(&laddr,0,sklen = sizeof(struct sockaddr));
getsockname(fd,&laddr,&sklen);
/* find the proxy entry with this local address */
for (pr = proxy_by_addr[HASHIP(IN_ADDR(laddr))]; pr != NULL; pr = pr->next)
if (IN_ADDR(laddr) == IN_ADDR(pr->laddr))
break;
/* when not found or busy, just close the connection */
if (pr == NULL || pr->state != READY) {
close(fd);
if (debug && pr != NULL)
logmsg("pr=%ld busy pr->fd=%d",(long)(pr - proxy),pr->fd);
return;
}
/* check if already some data sent -> probably a proxy scanner */
errno = 0;
if (recv(fd,buf,sizeof(buf),0) > 0) {
send(fd,error403,len403,MSG_NOSIGNAL);
close(fd);
if (debug)
logmsg("pr=%ld fd=%d unsolicited data",(long)(pr - proxy),fd);
return;
}
if (errno == EBADF) {
close(fd);
logmsg("pr=%ld fd=%d bad fd",(long)(pr - proxy),fd);
return;
}
/* register the fd and link everything together */
if ((ds = registerfd(fd,SESSION_FD,EPOLLIN)) == NULL) {
close(fd);
logmsg("register failed for fd=%d",fd);
return;
}
ds->proxy = pr;
/* initialize the proxy instance */
pr->fd = fd;
pr->raddr = raddr;
strncpy(pr->raddr_s,raddr_s,sizeof(pr->raddr_s));
memset(pr->clientcall,0,sizeof(pr->clientcall));
pr->tcp_fd = pr->hdrbytes = pr->msgbytes = pr->heldbytes = 0;
pr->qaddr = pr->bytesout = 0;
pr->state = NONCE;
/* set a .5 second timer to wait for unsolicited data (proxy scanner) */
ds->timeout = now + USEC/2;
next = min(next,ds->timeout);
}
/* receive data on a proxy session */
static void
proxy_session (int fd)
{
struct proxy *pr;
struct descriptor *ds;
int nread,n,letter,digit;
char *call;
uint32_t msglen;
struct sockaddr saddr;
MD5_CTX md5ctx;
unsigned char digest[MD5_DIGEST_SIZE];
char tmp[32];
if ((pr = (ds = &desc[fd])->proxy) == NULL)
return;
switch (pr->state)
{
case AUTH1:
/* get call (up to \n) and check for validity */
letter = digit = nread = 0;
for (n = 0; n < 12; n++) {
if ((nread = recv(fd,buf,1,0)) != 1)
break;
if (buf[0] == '\n')
break;
if (isupper((unsigned char)buf[0]))
letter++;
else
if (isdigit((unsigned char)buf[0]))
digit++;
else
if (buf[0] != '-')
break;
pr->clientcall[n] = buf[0];
}
/* when it does not look like a callsign, it probably is one of */
/* those webproxy scanners sending a CONNECT, GET, POST etc */
if (nread != 1 || letter < 2 || !digit || buf[0] != '\n') {
if (debug)
logmsg("pr=%ld fd=%d bad callsign %.15s",
(long)(pr - proxy),fd,pr->clientcall);
recv(fd,buf,sizeof(buf),0);
send(fd,error403,len403,MSG_NOSIGNAL);
proxy_close(pr);
break;
}
pr->clientcall[n] = '\0';
pr->state = AUTH2;
break;
case AUTH2:
/* check password (MD5 of password and nonce) */
MD5Init(&md5ctx);
MD5Update(&md5ctx,(unsigned char *)pr->password,strlen(pr->password));
snprintf(tmp,sizeof(tmp),"%x",pr->nonce);
MD5Update(&md5ctx,(unsigned char *)tmp,8);
MD5Final(digest,&md5ctx);
if (recv(fd,buf,MD5_DIGEST_SIZE,0) != MD5_DIGEST_SIZE) {
if (debug)
logmsg("pr=%ld fd=%d bad md5 digest",(long)(pr - proxy),fd);
send(fd,error403,len403,MSG_NOSIGNAL);
proxy_close(pr);
break;
}
if (memcmp(digest,buf,MD5_DIGEST_SIZE)) {
buf[0] = REASON_CODE_BAD_PW;
proxy_message(pr,PROXY_MSG_SYSTEM,0,buf,1);
logmsg("pr=%ld Bad password call=%s ip=%s",
(long)(pr - proxy),pr->clientcall,pr->raddr_s);
proxy_close(pr);
break;
}
/* check for allowed and denied callsigns */
if (!checkaccesscontrols(pr->clientcall)) {
buf[0] = REASON_CODE_ACCESS_DENIED;
proxy_message(pr,PROXY_MSG_SYSTEM,0,buf,1);
logmsg("pr=%ld Access denied call=%s ip=%s",
(long)(pr - proxy),pr->clientcall,pr->raddr_s);
proxy_close(pr);
break;
}
/* check if this same callsign is connected to another proxy */
for (n = 0; n < nproxies; n++)
if (proxy[n].state == BUSY &&
!strcmp(pr->clientcall,proxy[n].clientcall))
{
logmsg("pr=%ld Duplicate call=%s ip=%s other=%d fd=%d",
(long)(pr - proxy),pr->clientcall,pr->raddr_s,n,fd);
proxy_close(&proxy[n]);
break;
}
/* access OK, log and setup connection */
logmsg("pr=%ld Auth call=%s ip=%s fd=%d",
(long)(pr - proxy),pr->clientcall,pr->raddr_s,fd);
ds->timeout = now + CLIENT_SOCKET_TIMEOUT;
next = 0;
if (connectiontimeout)
pr->connectiontimeout = now + connectiontimeout;
else
pr->connectiontimeout = 0;
pr->state = BUSY;
poststatus(pr,1);
break;
case BUSY:
/* active proxy, collect header */
if (pr->hdrbytes != HEADER_SIZE) {
if ((nread = recv(fd,&pr->header[pr->hdrbytes],
HEADER_SIZE - pr->hdrbytes,0)) <= 0) {
if (debug)
logmsg("pr=%ld EOF on fd=%d",(long)(pr - proxy),fd);
proxy_close(pr);
break;
}
if ((pr->hdrbytes += nread) != HEADER_SIZE)
break;
}
/* extract address and message length from header */
memset(&saddr,0,sizeof(saddr));
saddr.sa_family = AF_INET;
memcpy(&IN_ADDR(saddr),&pr->header[1],4);
memcpy(&msglen,&pr->header[5],4);
if (msglen > MAX_DATA_SIZE) {
for (n = 0; n < HEADER_SIZE; n++)
snprintf(&tmp[2*n],3,"%02x",(unsigned char)pr->header[n]);
logmsg("invalid packet msglen=%u %s",msglen,tmp);
proxy_close(pr);
break;
}
/* collect message part by part */
if (pr->msgbytes != msglen) {
pr->heldbytes = 0;
if ((nread = recv(fd,&pr->message[pr->msgbytes],
msglen - pr->msgbytes,0)) <= 0) {
if (nread < 0 && errno == EAGAIN)
break;
if (debug)
logmsg("pr=%ld EOF on fd=%d",(long)(pr - proxy),fd);
proxy_close(pr);
break;
}
if ((pr->msgbytes += nread) != msglen)
break;
}
/* be ready for next message after handling this one */
pr->hdrbytes = pr->msgbytes = 0;
ds->timeout = now + CLIENT_SOCKET_TIMEOUT;
next = min(next,ds->timeout);
if (pr->connectiontimeout && now >= pr->connectiontimeout) {
logmsg("pr=%ld Conn timeout call=%s",
(long)(pr - proxy),pr->clientcall);
proxy_close(pr);
break;
}
/* process the message */
switch ((enum msgtype)pr->header[0])
{
case PROXY_MSG_TCP_OPEN:
if (pr->tcp_fd) {
close(pr->tcp_fd);
unregisterfd(pr->tcp_fd);
}
IN_PORT(saddr) = htons(ADDR_SERVER_PORT);
if ((pr->tcp_fd = opentcp(&pr->laddr,&saddr,1,proxy_tcp,pr)) < 0) {
buf[0] = errno? errno : 1;
buf[1] = buf[2] = buf[3] = 0;
proxy_message(pr,PROXY_MSG_TCP_STATUS,0,buf,4);
break;
}
modifyfd(fd,0); /* hold off further messages */
/* until connection established */
break;
case PROXY_MSG_TCP_DATA:
if ((call = getcallfrommessage(pr->message,msglen)) != NULL &&
!checkaccesscontrols(call))
{
buf[0] = REASON_CODE_ACCESS_DENIED;
proxy_message(pr,PROXY_MSG_SYSTEM,0,buf,1);
close(pr->tcp_fd);
unregisterfd(pr->tcp_fd);
pr->tcp_fd = 0;
logmsg("pr=%ld Access denied call=%s (msg)",
(long)(pr - proxy),call);
break;
}
if (send(pr->tcp_fd,pr->message,msglen,MSG_NOSIGNAL) != msglen) {
buf[0] = errno;
buf[1] = buf[2] = buf[3] = 0;
proxy_message(pr,PROXY_MSG_TCP_STATUS,0,buf,4);
break;
}
break;
case PROXY_MSG_TCP_CLOSE:
if (pr->tcp_fd) {
close(pr->tcp_fd);
unregisterfd(pr->tcp_fd);
pr->tcp_fd = 0;
}
if (debug)
logmsg("pr=%ld %s TCP server closed sent=%lld",
(long)(pr - proxy),pr->clientcall,(long long)pr->bytesout);
break;
case PROXY_MSG_UDP_DATA:
IN_PORT(saddr) = htons(RTP_PORT);
sendudp(rtp_fd,IN_ADDR(pr->laddr),&saddr,pr->message,msglen,NULL,0);
pr->qaddr = IN_ADDR(saddr);
break;
case PROXY_MSG_UDP_CONTROL:
IN_PORT(saddr) = htons(RTCP_PORT);
sendudp(rtcp_fd,IN_ADDR(pr->laddr),&saddr,pr->message,msglen,NULL,0);
pr->qaddr = IN_ADDR(saddr);
break;
default:
break;
}
break;
default:
recv(fd,buf,sizeof(buf),0);
send(fd,error403,len403,MSG_NOSIGNAL);
proxy_close(pr);
break;
}
}
/* upcall for ADDR TCP open from proxy session */
static void
proxy_tcp (int fd,void *ptr,int err)
{
struct proxy *pr = ptr;
struct descriptor *ds;
memset(buf,0,4);
if (err || pr->state != BUSY ||
(ds = registerfd(fd,ADDR_TCP_FD,EPOLLIN)) == NULL) {
if (errno)
memcpy(buf,&errno,4);
else
buf[0] = 1;
close(fd);
pr->tcp_fd = 0;
} else {
ds->proxy = pr;
ds->timeout = now + ADDR_SOCKET_TIMEOUT;
next = 0;
}
if (debug)
logmsg("pr=%ld %s TCP server opened (%d)",
(long)(pr - proxy),pr->clientcall,(unsigned char)buf[0]);
if (pr->state == BUSY) {
proxy_message(pr,PROXY_MSG_TCP_STATUS,0,buf,4);
modifyfd(pr->fd,EPOLLIN); /* enable client messages */
}
}
/* timer event on a proxy session */
static void
proxy_timer (int fd)
{
struct proxy *pr;
struct descriptor *ds;
pr = (ds = &desc[fd])->proxy;
switch (pr->state)
{
case NONCE: /* end of wait for unsol data */
while ((pr->nonce = rand()) < 0x10000000)
; /* avoid short values */
snprintf(buf,sizeof(buf),"%x",pr->nonce);
if (send(fd,buf,8,MSG_NOSIGNAL) != 8) {
proxy_close(pr);
} else {
pr->state = AUTH1;
ds->timeout = now + AUTH_SOCKET_TIMEOUT;
next = 0;
}
break;
case BUSY: /* conn idle for too long */
proxy_close(pr);
break;
default:
send(fd,error403,len403,MSG_NOSIGNAL);
proxy_close(pr);
break;
}
}
/* send a proxy message back to the user */
/* return 1 when successful, 0 when send error */
/* keep left-over data in message buffer so it can be tried again later */
static int
proxy_message (struct proxy *pr,enum msgtype type,in_addr_t addr,
char *data,int size)
{
int bytesout;
struct msghdr msg;
struct iovec iov[2];
char hdr[HEADER_SIZE];
/* construct the header */
hdr[0] = type;
memcpy(&hdr[1],&addr,4);
memcpy(&hdr[5],&size,4);
/* construct a descriptor for sendmsg */
memset(&msg,0,sizeof(msg));
msg.msg_iov = iov;
msg.msg_iovlen = 1;
iov[0].iov_base = hdr;
iov[0].iov_len = HEADER_SIZE;
if (size) {
iov[1].iov_base = data;
iov[1].iov_len = size;
msg.msg_iovlen = 2;
}
size += HEADER_SIZE;
/* send the message */
if ((bytesout = sendmsg(pr->fd,&msg,MSG_NOSIGNAL)) != size) {
if (bytesout < 0)
return bytesout;
/* part of message was sent, copy remainder to pr->message */
if ((size - bytesout) > MAX_DATA_SIZE)
return -2; /* should never happen! */
if (bytesout < HEADER_SIZE) {
memcpy(pr->message,hdr + bytesout,HEADER_SIZE - bytesout);
memcpy(pr->message + HEADER_SIZE - bytesout,data,size-HEADER_SIZE);
} else {
memcpy(pr->message,data + bytesout - HEADER_SIZE,size - bytesout);
}
pr->heldbytes = size - bytesout; /* #bytes to be re-sent */
logmsg("pr=%ld proxy_message msg=%d wr=%d held=%d",
(long)(pr - proxy),size,bytesout,pr->heldbytes);
modifyfd(pr->fd,EPOLLOUT); /* hold off input data */
/* and wait for space */
}
if (bytesout > 0)
pr->bytesout += bytesout;
return (bytesout == size);
}
/* resume sending on a blocked proxy connection */
/* pr->message holds unsent data from proxy_message */
static void
proxy_resume (int fd)
{
struct proxy *pr;
int bytesout;
if ((pr = desc[fd].proxy) == NULL)
return;
if (pr->heldbytes) {
/* try sending the remainder of the data, when error close the proxy */
if ((bytesout = send(fd,pr->message,pr->heldbytes,MSG_NOSIGNAL)) < 0) {
proxy_close(pr);
return;
}
pr->bytesout += bytesout;
/* still not completely sent? */
if (bytesout != pr->heldbytes) {
logmsg("pr=%ld proxy_resume msg=%d wr=%d sent=%lld",
(long)(pr - proxy),pr->heldbytes,bytesout,
(long long)pr->bytesout);
if (bytesout != 0) {
pr->heldbytes -= bytesout;
memmove(pr->message,pr->message + bytesout,pr->heldbytes);
}
return;
}
if (debug)
logmsg("pr=%ld proxy_resume msg=%d wr=%d sent=%lld",
(long)(pr - proxy),pr->heldbytes,bytesout,
(long long)pr->bytesout);
}
/* now all is sent, resume normal operation */
pr->heldbytes = 0;
modifyfd(fd,EPOLLIN);
if (pr->tcp_fd != 0 && desc[pr->tcp_fd].fdtype == ADDR_TCP_FD)
modifyfd(pr->tcp_fd,EPOLLIN);
}
/* forcibly close a proxy connection */
static void
proxy_close (struct proxy *pr)
{
if (pr->tcp_fd) {
close(pr->tcp_fd);
unregisterfd(pr->tcp_fd);
pr->tcp_fd = 0;
}
if (pr->fd) {
close(pr->fd);
unregisterfd(pr->fd);
pr->fd = 0;
}
pr->heldbytes = 0;
if (pr->state == BUSY) {
logmsg("pr=%ld Disc call=%s sent=%lld",
(long)(pr - proxy),pr->clientcall,(long long)pr->bytesout);
pr->nextstatuspost = now + POSTER_INTERVAL;
next = min(next,now + POSTER_INTERVAL);
}
pr->state = READY;
}
/* data received on ADDR TCP connection */
static void
tcp_session (int fd)
{
struct proxy *pr;
int nread,rv;
if ((pr = desc[fd].proxy) == NULL)
return;
if ((nread = recv(fd,buf,TCP_DATA_SIZE,0)) <= 0) {
close(fd);
unregisterfd(fd);
pr->tcp_fd = 0;
if (debug)
logmsg("pr=%ld %s TCP server EOF %d sent=%lld",
(long)(pr - proxy),pr->clientcall,nread,
(long long)pr->bytesout);
if (pr->state == BUSY)
proxy_message(pr,PROXY_MSG_TCP_CLOSE,0,NULL,0);
return;
}
if (pr->state == BUSY) {
if ((rv = proxy_message(pr,PROXY_MSG_TCP_DATA,0,buf,nread)) < 0) {
proxy_close(pr);
return;
}
/* proxy_message blocked? */
if (rv == 0)
modifyfd(fd,0); /* hold off further data */
}
}
/* timeout on ADDR TCP connection */
static void
tcp_timer (int fd)
{
struct proxy *pr;
pr = desc[fd].proxy;
close(fd);
unregisterfd(fd);
if (pr == NULL)
return;
pr->tcp_fd = 0;
if (pr->state == BUSY) {
if (!(desc[pr->fd].events & EPOLLIN))
modifyfd(pr->fd,EPOLLIN);
proxy_message(pr,PROXY_MSG_TCP_CLOSE,0,NULL,0);
}
}
/* UDP received on RTP or RTCP port */
/* this can be for Proxy or for Relay */
static void
udp_input (int fd,in_port_t port,enum msgtype msgtype)
{
struct proxy *pr;
struct relay *rl;
int nread,pkt,wr;
struct flowrecord *fr;
in_addr_t laddr;
struct sockaddr raddr;
memset(&raddr,0,sizeof(raddr));
laddr = pkt = 0;
while ((nread = recvudp(fd,&laddr,&raddr,buf,sizeof(buf))) >= 0 &&
++pkt < 10)
{
/* find the proxy entry with this local address */
for (pr = proxy_by_addr[HASHIP(laddr)]; pr != NULL; pr = pr->next)
if (laddr == IN_ADDR(pr->laddr))
{
if (pr->state == BUSY)
{
/* remember address of qso peer for status list */
pr->qaddr = IN_ADDR(raddr);
/* relay the message to the user */
if (!proxy_message(pr,msgtype,IN_ADDR(raddr),buf,nread))
proxy_close(pr);
}
goto nxtmsg;
}
/* find the relay entry with this local address */
for (rl = relay_by_addr[HASHIP(laddr)]; rl != NULL; rl = rl->next)
if (laddr == IN_ADDR(rl->laddr))
{
if ((fr = findpeerflowrecord(rl,IN_ADDR(raddr))) != NULL &&
(wr = client_message(rl,RESPONSE_TYPE_DATA,&fr->clientaddr,
&raddr,port,buf,nread)) > 0)
{
rl->bytesin += wr;
fr->bytesin += wr;
fr->lastactivity = now;
goto nxtmsg;
}
/* locating the flow or sending the packet failed */
/* if this is NOT a BYE packet, send back a BYE */
if (port == htons(RTCP_PORT) && !isbyepacket(buf,nread)) {
nread = buildbyepacket();
sendudp(fd,IN_ADDR(rl->laddr),&raddr,buf,nread,NULL,0);
}
goto nxtmsg;
}
nxtmsg: ;
}
}
/* post regular status updates for all proxies */
static void
statusposter ()
{
static int64_t nextpost = 0;
static int p = 0;
if (now >= nextpost) {
nextpost = now + POSTER_INTERVAL;
next = 0;
while (proxy[p].state == UNUSED_P || now < proxy[p].nextstatuspost)
if (++p >= nproxies) {
p = 0;
return;
}
poststatus(&proxy[p],1);
if (++p >= nproxies)
p = 0;
}
}
/* post proxy status for specified proxy */
static int
poststatus (struct proxy *pr,int nonblock)
{
if (pr->state == UNUSED_P)
return -1;
pr->nextstatuspost = now + STATP_INTERVAL;
next = 0;
return opentcp(&pr->laddr,&poster_addr,nonblock,post_upcall,pr);
}
/* upcall for TCP open from poststatus */
static void
post_upcall (int fd,void *ptr,int err)
{
struct proxy *pr = ptr;
int len;
struct descriptor *ds;
char *public,*status;
static char post[512];
if (err || (ds = registerfd(fd,POSTER_FD,EPOLLIN)) == NULL) {
close(fd);
return;
}
ds->timeout = now + CONNECT_TIMEOUT;
next = 0;
/* post a status message to the http server */
public = !strcmp(pr->password,PUBLIC_PASSWORD)? "Y" : "N";
switch (pr->state)
{
case READY:
status = "Ready";
break;
case OFF:
status = "Off";
break;
default:
status = "Busy";
break;
}
len = snprintf(post,sizeof(post),
"name=%s&comment=%s&public=%s&status=%s&a=%s&d=%.32s&p=%d&v=%s",
pr->regname,pr->regcomment,public,status,
pr->regaddr,pr->digest,proxy_port,PROGRAM_VERSION);
if (pr->state == BUSY)
len += snprintf(post + len,sizeof(post) - len,
"&cc=%s&ca=%s",pr->clientcall,pr->raddr_s);
len = snprintf(buf,sizeof(buf),
"POST %s HTTP/1.0\r\nHost: %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s\r\n",
POSTER_URL,POSTER_HOST,len,post);
if (send(fd,buf,len,MSG_NOSIGNAL) != len && debug)
logmsg("poststatus send error (%s)",strerror(errno));
}
/* UDP received on Relay client port */
static void
client_input (int fd)
{
struct relay *rl;
int nread,pkt,h,dfd;
unsigned int len;
struct registration *reg;
struct flowrecord *fr;
in_addr_t laddr;
struct sockaddr raddr,daddr;
char tmp1[16],tmp2[16];
laddr = pkt = 0;
memset(&raddr,0,sizeof(raddr));
memset(&daddr,0,sizeof(daddr));
daddr.sa_family = AF_INET;
while ((nread = recvudp(fd,&laddr,&raddr,buf,sizeof(buf))) >= 0 &&
++pkt < 10)
{
if ((nread -= REQ_HLEN) < 0 || nread > MAX_PACKET_DATA)
continue; /* too short or too long */
/* extract dest address and port from header */
memcpy(&IN_ADDR(daddr),&buf[1],4);
memcpy(&IN_PORT(daddr),&buf[5],2);
/* find the relay entry with this local address */
for (rl = relay_by_addr[HASHIP(laddr)]; rl != NULL; rl = rl->next)
if (laddr == IN_ADDR(rl->laddr))
break;
if (rl == NULL)
continue;
/* process the message */
switch (buf[0])
{
case REQUEST_TYPE_KEEPALIVE:
if ((fr = findflowrecord(rl,&raddr,IN_ADDR(daddr))) != NULL)
fr->lastactivity = now;
client_message(rl,RESPONSE_TYPE_ACK,&raddr,&daddr,0,NULL,0);
break;
case REQUEST_TYPE_DATA:
/* data record, locate existing flow record */
if ((fr = findflowrecord(rl,&raddr,IN_ADDR(daddr))) == NULL) {
/* no flow record, check if this user is registered */
if ((reg = findcallsign(rl,&raddr)) == NULL) {
if (!IN_EQUAL(raddr,rl->lastclient)) {
getnameinfo(&raddr,sizeof(struct sockaddr),
tmp1,sizeof(tmp1),tmp2,sizeof(tmp2),
NI_NUMERICHOST|NI_NUMERICSERV);
logmsg("rl=%ld flow from nonreg %s:%s",
(long)(rl - relay),tmp1,tmp2);
}
rl->lastclient = raddr;
client_error(rl,&raddr,&daddr,ERR_ALLOCATION);
break;
}
reg->lastactivity = now;
/* make sure there is no competing flow to this peer */
if (findpeerflowrecord(rl,IN_ADDR(daddr)) != NULL) {
if (!IN_EQUAL(raddr,rl->lastclient)) {
getnameinfo(&daddr,sizeof(struct sockaddr),tmp1,
sizeof(tmp1),NULL,0,NI_NUMERICHOST);
logmsg("rl=%ld competing flow %s to %s",
(long)(rl - relay),reg->clientcall,tmp1);
}
rl->lastclient = raddr;
client_error(rl,&raddr,&daddr,ERR_ALLOCATION);
break;
}
/* be sure we're not trying to send a packet to */
/* ourselves, or to some other client of this relay */
if (IN_ADDR(daddr) == IN_ADDR(rl->laddr)) {
if (!IN_EQUAL(raddr,rl->lastclient)) {
getnameinfo(&daddr,sizeof(struct sockaddr),tmp1,
sizeof(tmp1),NULL,0,NI_NUMERICHOST);
logmsg("rl=%ld conflicting flow %s to %s",
(long)(rl - relay),reg->clientcall,tmp1);
}
rl->lastclient = raddr;
client_error(rl,&raddr,&daddr,ERR_ALLOCATION);
break;
}
/* create new flow record */
if ((fr = calloc(1,sizeof(struct flowrecord))) == NULL) {
client_error(rl,&raddr,&daddr,ERR_ALLOCATION);
break;
}
getnameinfo(&daddr,sizeof(struct sockaddr),tmp1,
sizeof(tmp1),NULL,0,NI_NUMERICHOST);
if (debug)
logmsg("rl=%ld New flow %s to %s",
(long)(rl - relay),reg->clientcall,tmp1);
fr->clientaddr = raddr;
fr->peeraddr = IN_ADDR(daddr);
/* link it to the correct hashlist depending on peer */
h = HASHIP(IN_ADDR(daddr));
if ((fr->next = rl->flowrecord[h]) != NULL)
fr->next->prev = fr;
rl->flowrecord[h] = fr;
}
fr->lastactivity = now;
if (IN_PORT(daddr) == htons(RTP_PORT))
dfd = rtp_fd;
else
if (IN_PORT(daddr) == htons(RTCP_PORT))
dfd = rtcp_fd;
else
break;
if (sendudp(dfd,IN_ADDR(rl->laddr),&daddr,&buf[REQ_HLEN],nread,
NULL,0) == nread) {
rl->bytesout += nread;
fr->bytesout += nread;
}
break;
case REQUEST_TYPE_REGISTER:
if (nread >= 2) {
len = min((unsigned int)buf[REQ_HLEN],nread - 1);
buf[REQ_HLEN+1 + len] = '\0';
registercallsign(rl,&raddr,&buf[REQ_HLEN+1]);
client_message(rl,RESPONSE_TYPE_ACK,&raddr,&daddr,0,NULL,0);
}
break;
case REQUEST_TYPE_UNREGISTER:
if (nread >= 2) {
len = min((unsigned int)buf[REQ_HLEN],nread - 1);
buf[REQ_HLEN+1 + len] = '\0';
if ((reg = findcallsign(rl,&raddr)) != NULL &&
!strcmp(reg->clientcall,&buf[REQ_HLEN+1]))
{
unregistercallsign(rl,reg,"Unregister");
client_message(rl,RESPONSE_TYPE_ACK,&raddr,&daddr,0,NULL,0);
}
}
break;
case REQUEST_TYPE_DROP_FLOW:
if ((fr = findflowrecord(rl,&raddr,IN_ADDR(daddr))) != NULL)
deleteflowrecord(rl,fr,"Drop");
break;
case REQUEST_TYPE_PING:
IN_ADDR(daddr) = IN_ADDR(raddr);
IN_PORT(daddr) = 0;
len = snprintf(buf,sizeof(buf),"ver: %s\r\nipv4: %s\r\n",
PROGRAM_VERSION,rl->laddr_s);
client_message(rl,RESPONSE_TYPE_PING,&raddr,&daddr,0,buf,len);
break;
default:
break;
}
}
}
/* find flowrecord for source address */
static struct flowrecord *
findpeerflowrecord (struct relay *rl,in_addr_t saddr)
{
struct flowrecord *fr;
for (fr = rl->flowrecord[HASHIP(saddr)]; fr != NULL; fr = fr->next)
if (fr->peeraddr == saddr)
break;
return fr;
}
/* find flowrecord for client socket and address */
static struct flowrecord *
findflowrecord (struct relay *rl,struct sockaddr *caddr,in_addr_t paddr)
{
struct flowrecord *fr;
for (fr = rl->flowrecord[HASHIP(paddr)]; fr != NULL; fr = fr->next)
if (IN_EQUAL(fr->clientaddr,*caddr) && fr->peeraddr == paddr)
break;
return fr;
}
/* delete flowrecord for client socket and address */
static void
deleteflowrecord (struct relay *rl,struct flowrecord *fr,char *reason)
{
struct sockaddr addr;
char tmp1[16],tmp2[16];
/* log */
getnameinfo(&fr->clientaddr,sizeof(struct sockaddr),
tmp1,sizeof(tmp1),NULL,0,NI_NUMERICHOST);
memset(&addr,0,sizeof(addr));
addr.sa_family = AF_INET;
IN_ADDR(addr) = fr->peeraddr;
getnameinfo(&addr,sizeof(struct sockaddr),
tmp2,sizeof(tmp2),NULL,0,NI_NUMERICHOST);
if (debug)
logmsg("rl=%ld %s flow %s to %s in=%lld out=%lld",
(long)(rl - relay),reason,tmp1,tmp2,
(long long)fr->bytesin,(long long)fr->bytesout);
/* unlink this flow record */
if (fr->next != NULL)
fr->next->prev = fr->prev;
if (fr->prev != NULL)
fr->prev->next = fr->next;
else
rl->flowrecord[HASHIP(IN_ADDR(addr))] = fr->next;
free(fr);
}
/* update flow records for a client IP change */
static void
fixupflowrecord (struct relay *rl,struct sockaddr *raddr_old,struct sockaddr *raddr_new)
{
struct flowrecord *fr;
int h;
/* peer address remains the same, so records can be updated in-place */
for (h = 0; h < NHASH; h++) {
for (fr = rl->flowrecord[h]; fr != NULL; fr = fr->next) {
if (IN_EQUAL(fr->clientaddr,*raddr_old)) {
fr->clientaddr = *raddr_new;
}
}
}
}
/* register a relay client callsign */
static void
registercallsign (struct relay *rl,struct sockaddr *raddr,char *callsign)
{
char *p,letter,digit;
struct registration *reg;
char tmp1[16],tmp2[16];
/* validate callsign */
letter = digit = 0;
for (p = callsign; *p != '\0'; p++)
if (isupper((unsigned char)*p))
letter++;
else
if (isdigit((unsigned char)*p))
digit++;
else
if (*p != '-')
return;
if (!letter || !digit || (p - callsign) > 15)
return;
/* find existing registration */
for (reg = rl->registration; reg != NULL; reg = reg->next)
if (!strcmp(callsign,reg->clientcall))
break;
/* when not found, add a new one at head of list */
if (reg == NULL) {
if ((reg = calloc(1,sizeof(struct registration))) == NULL)
return;
if ((reg->next = rl->registration) != NULL)
reg->next->prev = reg;
rl->registration = reg;
strcpy(reg->clientcall,callsign);
getnameinfo(raddr,sizeof(struct sockaddr),tmp1,sizeof(tmp1),
tmp2,sizeof(tmp2),NI_NUMERICHOST|NI_NUMERICSERV);
logmsg("rl=%ld Register %s at %s:%s",
(long)(rl - relay),reg->clientcall,tmp1,tmp2);
} else {
/* If client IP address has changed for an existing */
/* callsign registration, fix up this address in any */
/* flow-table records also. */
if (!IN_EQUAL(reg->clientaddr,*raddr)) {
getnameinfo(raddr,sizeof(struct sockaddr),tmp1,sizeof(tmp1),
tmp2,sizeof(tmp2),NI_NUMERICHOST|NI_NUMERICSERV);
logmsg("rl=%ld client address change for %s to %s:%s",
(long)(rl - relay),reg->clientcall,tmp1,tmp2);
fixupflowrecord(rl,&reg->clientaddr,raddr);
}
}
/* update registration */
reg->clientaddr = *raddr;
reg->lastactivity = now;
}
/* find a client callsign registration */
static struct registration *
findcallsign (struct relay *rl,struct sockaddr *raddr)
{
struct registration *reg;
/* find registration by addr */
for (reg = rl->registration; reg != NULL; reg = reg->next)
if (IN_EQUAL(*raddr,reg->clientaddr))
break;
return reg;
}
/* unregister a relay client callsign */
static void
unregistercallsign (struct relay *rl,struct registration *reg,char *reason)
{
int h;
struct flowrecord *fr,*frn;
char tmp1[16],tmp2[16];
getnameinfo(&reg->clientaddr,sizeof(struct sockaddr),tmp1,sizeof(tmp1),
tmp2,sizeof(tmp2),NI_NUMERICHOST|NI_NUMERICSERV);
logmsg("rl=%ld %s %s at %s:%s",
(long)(rl - relay),reason,reg->clientcall,tmp1,tmp2);
/* remove any flows for this client */
for (h = 0; h < NHASH; h++)
for (fr = rl->flowrecord[h]; fr != NULL; fr = frn) {
frn = fr->next;
if (IN_EQUAL(fr->clientaddr,reg->clientaddr))
deleteflowrecord(rl,fr,reason);
}
/* unlink this registration */
if (reg->next != NULL)
reg->next->prev = reg->prev;
if (reg->prev != NULL)
reg->prev->next = reg->next;
else
rl->registration = reg->next;
free(reg);
}
/* remove obsolete flows and relay callsign registrations */
static void
removeobsolete (void)
{
static int64_t nextremove = 0;
struct relay *rl;
struct registration *reg,*regn;
struct flowrecord *fr,*frn;
int n,h;
if (now >= nextremove) {
for (n = 0; n < nrelays; n++) {
rl = &relay[n];
if (rl->state != RELAY)
continue;
for (h = 0; h < NHASH; h++)
for (fr = rl->flowrecord[h]; fr != NULL; fr = frn) {
frn = fr->next;
if (fr->lastactivity < (now - FLOW_TIMEOUT))
deleteflowrecord(rl,fr,"Timeout");
}
for (reg = rl->registration; reg != NULL; reg = regn) {
regn = reg->next;
if (reg->lastactivity < (now - REG_TIMEOUT))
unregistercallsign(rl,reg,"Timeout");
}
}
nextremove = now + RELAY_INTERVAL;
}
}
/* send an error code to the relay client */
static int
client_error (struct relay *rl,struct sockaddr *addr,struct sockaddr *saddr,
enum relayerror errcode)
{
char err[1];
err[0] = errcode;
return client_message(rl,RESPONSE_TYPE_ERROR,addr,saddr,0,err,1);
}
/* send a relay response message to the client */
static int
client_message (struct relay *rl,enum response resp,struct sockaddr *raddr,
struct sockaddr *saddr,in_port_t dport,char *data,int size)
{
int rv;
char hdr[RES_HLEN];
/* construct the header */
hdr[0] = resp;
memcpy(&hdr[1],&IN_ADDR(*saddr),4);
memcpy(&hdr[5],&dport,2);
memcpy(&hdr[7],&dport,2);
/* send the message */
errno = 0;
if ((rv = sendudp(client_fd,IN_ADDR(rl->laddr),raddr,
hdr,RES_HLEN,data,size)) != (RES_HLEN + size))
{
logmsg("rl=%ld client_message msg=%d wr=%d (%s)",
(long)(rl - relay),RES_HLEN + size,rv,strerror(errno));
rv = -1;
}
return rv;
}
/* check if a directory server message contains a callsign and extract it */
/* return a pointer to the \0-terminated callsign (overwritten by each call) */
static char *
getcallfrommessage (char *data,int len)
{
int i;
static char call[32];
if (len < 20)
return NULL;
for (i = 1; i < 19; i++)
if ((uint8_t)data[i] == 0xAC && (uint8_t)data[i+1] == 0xAC) {
memset(call,0,sizeof(call));
if (data[0] == 'l')
memcpy(call,data + 1,i - 1);
else
memcpy(call,data,i);
return call;
}
return NULL;
}
/* check access controls (allowed and denied callsigns) */
static int
checkaccesscontrols (char *call)
{
regex_t reg;
int result;
if (denied_pattern != NULL && denied_pattern[0] != '\0' &&
regcomp(&reg,denied_pattern,REG_EXTENDED|REG_ICASE|REG_NOSUB) == 0)
{
result = regexec(&reg,call,0,NULL,0);
regfree(&reg);
if (result == 0)
return 0;
}
if (allowed_pattern != NULL && allowed_pattern[0] != '\0' &&
regcomp(&reg,allowed_pattern,REG_EXTENDED|REG_ICASE|REG_NOSUB) == 0)
{
result = regexec(&reg,call,0,NULL,0);
regfree(&reg);
if (result != 0)
return 0;
}
return 1;
}
/* check if an RTCP message is a BYE message */
/* look for these two two-byte sequences: 0xC0 0xC9 and 0xE1 0xCB */
static int
isbyepacket (char *data,int len)
{
int i;
for (i = 0; i < (len - 1); i++)
if ((uint8_t)data[i] == 0xC0 && (uint8_t)data[i+1] == 0xC9) {
while (i < (len - 1)) {
if ((uint8_t)data[i] == 0xE1 && (uint8_t)data[i+1] == 0xCB)
return 1;
i++;
}
break;
}
return 0;
}
/* build an RTCP BYE message in the global buffer
000000: c0 c9 00 01 00 00 00 00 e1 cb 00 09 00 00 00 00 ................
000010: 13 4e 6f 20 72 6f 75 74 65 20 61 76 61 69 6c 61 .No route availa
000020: 62 6c 65 00 00 00 00 04 ble.....
*/
static int
buildbyepacket ()
{
char *p = buf;
memset(buf,0,40);
/* empty RR */
*p++ = 0xC0; /* ver 3, no padding, RC=0 */
*p++ = 0xC9; /* payload=201 */
p++; *p++ = 1; /* len=1 */
p += 4; /* SSRC of packet sender (0) */
/* BYE (count=1) */
*p++ = 0xE1; /* ver 3, padding, SC=1 */
*p++ = 0xCB; /* payload=203 */
p++; *p++ = 9; /* len=9 */
p += 4; /* SSRC/CSRC_1 (0) */
*p++ = 19; /* padded length */
strcpy(p,"No route available"); /* reason for leaving */
p += 19;
p += 3; *p++ = 4; /* 4 more bytes, last being 4 */
return p - buf;
}
/* read the configuration file */
/* syntax: lines with name=value (no quotes), # in 1st column for comments */
static int
readconfig (char *config)
{
FILE *cfile;
char *p,*q;
struct confvar *cv;
if ((cfile = fopen(config,"r")) == 0) {
fprintf(stderr,"Cannot open config file %s\n",config);
return -1;
}
while (fgets(buf,sizeof(buf),cfile) != NULL) {
if ((p = strchr(buf,'\r')) != NULL || (p = strchr(buf,'\n')) != NULL)
*p = '\0';
p = buf;
while (*p == ' ' || *p == '\t')
p++;
if (*p == '\0' || *p == '#')
continue;
if ((q = strchr(p,'=')) == NULL) {
fprintf(stderr,"Invalid config line: %s\n",p);
continue;
}
*q++ = '\0';
cv = calloc(1,sizeof(struct confvar));
cv->name = strdup(p);
cv->value = strdup(q);
cv->next = configvars;
configvars = cv;
}
fclose(cfile);
return 0;
}
/* get the value of a configuration variable */
static char *
confvar (char *name)
{
struct confvar *cv;
for (cv = configvars; cv != NULL; cv = cv->next)
if (!strcmp(name,cv->name))
return cv->value;
return NULL;
}
/* register a filedescriptor for epoll */
struct descriptor *
registerfd (int fd,enum fdtype fdtype,uint32_t events)
{
struct descriptor *ds;
static struct epoll_event epoll_event = {0};
if (fd < 0 || fd >= FD_SETSIZE)
return NULL; /* too big to register */
epoll_event.events = events;
epoll_event.data.fd = fd;
if (epoll_ctl(epoll_fd,EPOLL_CTL_ADD,fd,&epoll_event) < 0) {
logmsg("epoll_ctl add failed (%d %s)",fd,strerror(errno));
return NULL;
}
minfd = min(minfd,fd); /* keep track of min fd */
maxfd = max(maxfd,fd); /* keep track of max fd */
ds = &desc[fd];
ds->fdtype = fdtype;
ds->events = events;
return ds;
}
/* modify a filedescriptor's events for epoll */
static int
modifyfd (int fd,uint32_t events)
{
static struct epoll_event epoll_event = {0};
if (fd < 0 || fd >= FD_SETSIZE)
return -1;
epoll_event.events = events;
epoll_event.data.fd = fd;
if (epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&epoll_event) < 0) {
logmsg("epoll_ctl mod failed (%d %s)",fd,strerror(errno));
return -1;
}
desc[fd].events = events;
numevents = 0;
return 0;
}
/* remove a filedescriptor registration */
static int
unregisterfd (int fd)
{
if (fd < 0 || fd >= FD_SETSIZE)
return -1;
memset(&desc[fd],0,sizeof(struct descriptor));
epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL); /* ignore errors */
if (fd == maxfd) {
while (--fd >= 0 && desc[fd].fdtype == UNUSED)
;
maxfd = fd;
}
numevents = 0;
return 0;
}
/* lookup host (DNS or literal), convert to sockaddr and return plain IP */
static int
hostlookup (char *name,struct sockaddr *addr,char *text,size_t len)
{
struct hostent *hp;
if ((hp = gethostbyname2(name,AF_INET)) == NULL)
return -1;
memset(addr,0,sizeof(struct sockaddr));
addr->sa_family = hp->h_addrtype;
memcpy(&IN_ADDR(*addr),hp->h_addr_list[0],hp->h_length);
if (text != NULL && len != 0)
getnameinfo(addr,sizeof(struct sockaddr),text,len,
NULL,0,NI_NUMERICHOST);
return 0;
}
/* bind server port and listen */
static int
bindport (int port,int socktype)
{
int v,fd;
struct addrinfo *result,*rp;
struct addrinfo hints;
char port_s[16];
snprintf(port_s,sizeof(port_s),"%d",port);
memset(&hints,0,sizeof(struct addrinfo));
hints.ai_family = AF_INET; /* allow IPv4 only */
hints.ai_socktype = socktype;
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
if ((v = getaddrinfo(NULL,port_s,&hints,&result)) != 0) {
fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(v));
return 1;
}
/* getaddrinfo() returns a list of address structures.
Try each address until we successfully bind(2).
If socket(2) (or bind(2)) fails, we (close the socket
and) try the next address. */
for (rp = result; rp != NULL; rp = rp->ai_next) {
fd = socket(rp->ai_family,rp->ai_socktype,rp->ai_protocol);
if (fd == -1)
continue;
v = 1;
setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&v,sizeof(v));
if (bind(fd,rp->ai_addr,rp->ai_addrlen) == 0)
break; /* success */
close(fd);
}
freeaddrinfo(result); /* no longer needed */
if (rp == NULL) { /* no address succeeded */
fprintf(stderr,"Could not bind\n");
return -2;
}
v = fcntl(fd,F_GETFL,0); /* set nonblocking mode */
fcntl(fd,F_SETFL,v|O_NONBLOCK);
if (socktype == SOCK_STREAM && listen(fd,3)) {
fprintf(stderr,"Could not listen\n");
return -3;
}
if (socktype == SOCK_DGRAM) {
v = 1;
if (setsockopt(fd,IPPROTO_IP,IP_PKTINFO,&v,sizeof(v)) < 0)
return -4;
}
return fd;
}
/* open TCP connection */
/* the passed upcall function is called after the connection has been */
/* established. this function can send the first data to the connection. */
/* when nonblock is true, the connect is done in nonblocking mode and the */
/* upcall function is called from the main event loop. */
static int
opentcp (struct sockaddr *from,struct sockaddr *to,int nonblock,
upcall_t upcall,void *ptr)
{
int fd,v,err;
struct descriptor *ds;
if ((fd = socket(AF_INET,SOCK_STREAM,0)) < 0)
return -1;
v = 1;
setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&v,sizeof(v));
bind(fd,from,sizeof(struct sockaddr));
if (nonblock) {
v = fcntl(fd,F_GETFL,0); /* set nonblocking mode */
fcntl(fd,F_SETFL,v|O_NONBLOCK);
}
if (connect(fd,to,sizeof(struct sockaddr)) < 0) {
if (nonblock && errno == EINPROGRESS) {
/* register the fd for later upcall, but not for input yet */
if ((ds = registerfd(fd,CONNECT_FD,EPOLLOUT)) != NULL) {
ds->timeout = now + CONNECT_TIMEOUT;
next = 0;
ds->upcall = upcall;
ds->ptr = ptr;
return fd;
}
}
if (debug)
logmsg("opentcp connect failed nb=%d",nonblock);
close(fd);
return -1;
}
/* continue here when the connect could be completed synchronously */
v = fcntl(fd,F_GETFL,0); /* set nonblocking mode */
err = fcntl(fd,F_SETFL,v|O_NONBLOCK);
/* call the upcall function now, it should close the fd when err */
err |= opentcp_upcall(fd,upcall,ptr);
return err? -1 : fd;
}
/* set the final modes of the TCP socket and perform the upcall function */
static int
opentcp_upcall (int fd,upcall_t upcall,void *ptr)
{
int v,err;
socklen_t len;
if (desc[fd].fdtype == CONNECT_FD)
unregisterfd(fd); /* unregister when async mode */
err = errno = 0;
len = sizeof(err);
getsockopt(fd,SOL_SOCKET,SO_ERROR,&err,&len);
if (!errno && !err) {
v = 256*1024; /* set big buffer */
err |= setsockopt(fd,SOL_SOCKET,SO_SNDBUF,&v,sizeof(v));
v = 1; /* set keepalive mode */
err |= setsockopt(fd,SOL_SOCKET,SO_KEEPALIVE,&v,sizeof(v));
}
if (upcall != NULL)
(*upcall)(fd,ptr,err); /* call the upcall function */
/* it should close the fd when err */
return err;
}
/* receive datagram from UDP socket including addresses */
static int
recvudp (int fd,in_addr_t *laddr,struct sockaddr *raddr,char *buf1,int len1)
{
int nread;
struct msghdr msg;
struct cmsghdr *cp;
struct iovec iov[1];
struct in_pktinfo *pktinfo;
char cmsg[128];
msg.msg_name = raddr;
msg.msg_namelen = sizeof(struct sockaddr);
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = cmsg;
msg.msg_controllen = sizeof(cmsg);
msg.msg_flags = 0;
iov[0].iov_base = buf1;
iov[0].iov_len = len1;
if ((nread = recvmsg(fd,&msg,0)) < 0)
return nread;
for (cp = CMSG_FIRSTHDR(&msg); cp != NULL; cp = CMSG_NXTHDR(&msg,cp))
if (cp->cmsg_level == IPPROTO_IP && cp->cmsg_type == IP_PKTINFO) {
pktinfo = (struct in_pktinfo *)CMSG_DATA(cp);
*laddr = pktinfo->ipi_addr.s_addr;
break;
}
return nread;
}
/* send datagram to UDP socket specifying addresses */
static int
sendudp (int fd,in_addr_t laddr,struct sockaddr *raddr,
char *buf1,int len1,char *buf2,int len2)
{
struct msghdr msg;
struct cmsghdr *cp;
struct iovec iov[2];
struct in_pktinfo *pktinfo;
char cmsg[128];
msg.msg_name = raddr;
msg.msg_namelen = sizeof(struct sockaddr);
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = cmsg;
msg.msg_controllen = sizeof(cmsg);
msg.msg_flags = 0;
iov[0].iov_base = buf1;
iov[0].iov_len = len1;
if (len2) {
iov[1].iov_base = buf2;
iov[1].iov_len = len2;
msg.msg_iovlen = 2;
}
cp = CMSG_FIRSTHDR(&msg);
cp->cmsg_level = IPPROTO_IP;
cp->cmsg_type = IP_PKTINFO;
cp->cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
pktinfo = (struct in_pktinfo *)CMSG_DATA(cp);
memset(pktinfo,0,sizeof(struct in_pktinfo));
pktinfo->ipi_spec_dst.s_addr = laddr;
msg.msg_controllen = cp->cmsg_len;
return sendmsg(fd,&msg,0);
}
/* return timestamp in microseconds */
static int64_t
timestamp ()
{
struct timeval tv;
gettimeofday(&tv,NULL);
return ((int64_t)tv.tv_sec * USEC) + tv.tv_usec;
}
/* URL encode of a string to a malloc'ed buffer */
static char *urlencode(char *s) {
char *tbuf = malloc(strlen(s) * 3 + 1), *pbuf = tbuf;
unsigned char c;
while ((c = *s++) != '\0') {
if (isalnum(c) || strchr("-_.~",c) != NULL)
*pbuf++ = c;
else if (c == ' ')
*pbuf++ = '+';
else
*pbuf++ = '%', snprintf(pbuf,3,"%02x",c), pbuf += 2;
}
*pbuf++ = '\0';
return realloc(tbuf,pbuf - tbuf);
}