tpws: multi thread resolver

This commit is contained in:
bol-van
2024-04-02 18:14:03 +03:00
parent 81357a41bf
commit 1b01b62b34
22 changed files with 451 additions and 80 deletions

View File

@@ -22,6 +22,7 @@
#include "tamper.h"
#include "socks.h"
#include "helpers.h"
#include "hostlist.h"
// keep separate legs counter. counting every time thousands of legs can consume cpu
@@ -333,7 +334,7 @@ static bool proxy_remote_conn_ack(tproxy_conn_t *conn, int sock_err)
//Createas a socket and initiates the connection to the host specified by
//remote_addr.
//Returns -1 if something fails, >0 on success (socket fd).
static int connect_remote(const struct sockaddr *remote_addr)
static int connect_remote(const struct sockaddr *remote_addr, bool bApplyConnectionFooling)
{
int remote_fd = 0, yes = 1, no = 0, v;
@@ -351,7 +352,7 @@ static int connect_remote(const struct sockaddr *remote_addr)
close(remote_fd);
return -1;
}
if(setsockopt(remote_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
if (setsockopt(remote_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
{
perror("setsockopt (SO_REUSEADDR, connect_remote)");
close(remote_fd);
@@ -359,7 +360,7 @@ static int connect_remote(const struct sockaddr *remote_addr)
}
if (!set_socket_buffers(remote_fd, params.remote_rcvbuf, params.remote_sndbuf))
return -1;
if(!set_keepalive(remote_fd))
if (!set_keepalive(remote_fd))
{
perror("set_keepalive");
close(remote_fd);
@@ -371,7 +372,7 @@ static int connect_remote(const struct sockaddr *remote_addr)
close(remote_fd);
return -1;
}
if (params.mss)
if (bApplyConnectionFooling && params.mss)
{
uint16_t port = saport(remote_addr);
if (pf_in_range(port,&params.mss_pf))
@@ -389,7 +390,7 @@ static int connect_remote(const struct sockaddr *remote_addr)
VPRINT("Not setting MSS. Port %u is out of MSS port range.",port)
}
}
if(connect(remote_fd, remote_addr, remote_addr->sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)) < 0)
if (connect(remote_fd, remote_addr, remote_addr->sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)) < 0)
{
if(errno != EINPROGRESS)
{
@@ -417,6 +418,7 @@ static void free_conn(tproxy_conn_t *conn)
conn_free_buffers(conn);
if (conn->partner) conn->partner->partner=NULL;
if (conn->track.hostname) free(conn->track.hostname);
if (conn->socks_ri) conn->socks_ri->ptr = NULL; // detach conn
free(conn);
}
static tproxy_conn_t *new_conn(int fd, bool remote)
@@ -536,14 +538,14 @@ static tproxy_conn_t* add_tcp_connection(int efd, struct tailhead *conn_list,int
if (proxy_type==CONN_TYPE_TRANSPARENT)
{
if ((remote_fd = connect_remote((struct sockaddr *)&orig_dst)) < 0)
if ((remote_fd = connect_remote((struct sockaddr *)&orig_dst, true)) < 0)
{
fprintf(stderr, "Failed to connect\n");
close(local_fd);
return NULL;
}
}
if(!(conn = new_conn(local_fd, false)))
{
if (remote_fd) close(remote_fd);
@@ -700,7 +702,14 @@ bool proxy_mode_connect_remote(const struct sockaddr *sa, tproxy_conn_t *conn, s
return false;
}
if ((remote_fd = connect_remote(sa)) < 0)
bool bConnFooling=true;
if (conn->track.hostname && params.mss)
{
VPRINT("0-phase desync hostlist check")
bConnFooling=HostlistCheck(conn->track.hostname, NULL);
}
if ((remote_fd = connect_remote(sa, bConnFooling)) < 0)
{
fprintf(stderr, "socks failed to connect (1) errno=%d\n", errno);
socks_send_rep_errno(conn->socks_ver, conn->fd, errno);
@@ -726,7 +735,7 @@ bool proxy_mode_connect_remote(const struct sockaddr *sa, tproxy_conn_t *conn, s
TAILQ_INSERT_HEAD(conn_list, conn->partner, conn_ptrs);
legs_remote++;
print_legs();
DBGPRINT("socks connecting")
DBGPRINT("S_WAIT_CONNECTION")
conn->socks_state = S_WAIT_CONNECTION;
return true;
}
@@ -865,10 +874,8 @@ static bool handle_proxy_mode(tproxy_conn_t *conn, struct tailhead *conn_list)
((struct sockaddr_in6*)&ss)->sin6_scope_id = 0;
break;
case S5_ATYP_DOM:
// NOTE : resolving is blocking. do you want it really ?
{
struct addrinfo *ai,hints;
char sdom[256];
int r;
uint16_t port;
char sport[6];
@@ -886,26 +893,18 @@ static bool handle_proxy_mode(tproxy_conn_t *conn, struct tailhead *conn_list)
socks5_send_rep(conn->fd,S5_REP_HOST_UNREACHABLE);
return false;
}
snprintf(sport,sizeof(sport),"%u",port);
memcpy(sdom,m->dd.domport,m->dd.len);
sdom[m->dd.len] = '\0';
DBGPRINT("socks5 resolving hostname '%s' port '%s'",sdom,sport)
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_socktype = SOCK_STREAM;
r=getaddrinfo(sdom,sport,&hints,&ai);
if (r)
m->dd.domport[m->dd.len] = 0;
DBGPRINT("socks5 queue resolve hostname '%s' port '%u'",m->dd.domport,port)
conn->socks_ri = resolver_queue(m->dd.domport,port,conn);
if (!conn->socks_ri)
{
VPRINT("socks5 getaddrinfo error %d",r)
socks5_send_rep(conn->fd,S5_REP_HOST_UNREACHABLE);
VPRINT("socks5 could not queue resolve item")
socks5_send_rep(conn->fd,S5_REP_GENERAL_FAILURE);
return false;
}
if (params.debug>=2)
{
printf("socks5 hostname resolved to :\n");
print_addrinfo(ai);
}
memcpy(&ss,ai->ai_addr,ai->ai_addrlen);
freeaddrinfo(ai);
conn->socks_state=S_WAIT_RESOLVE;
DBGPRINT("S_WAIT_RESOLVE")
return true;
}
break;
default:
@@ -927,6 +926,40 @@ static bool handle_proxy_mode(tproxy_conn_t *conn, struct tailhead *conn_list)
return false;
}
static bool resolve_complete(struct resolve_item *ri, struct tailhead *conn_list)
{
tproxy_conn_t *conn = (tproxy_conn_t *)ri->ptr;
if (conn && (conn->state != CONN_CLOSED))
{
if (conn->socks_state==S_WAIT_RESOLVE)
{
DBGPRINT("resolve_complete %s. getaddrinfo result %d", ri->dom, ri->ga_res);
if (ri->ga_res)
{
socks5_send_rep(conn->fd,S5_REP_HOST_UNREACHABLE);
return false;;
}
else
{
if (!conn->track.hostname)
{
DBGPRINT("resolve_complete put hostname : %s", ri->dom)
conn->track.hostname = strdup(ri->dom);
}
return proxy_mode_connect_remote((struct sockaddr *)&ri->ss,conn,conn_list);
}
}
else
fprintf(stderr, "resolve_complete: conn in wrong socks_state !!! (%s)\n", ri->dom);
}
else
DBGPRINT("resolve_complete: orphaned resolve for %s", ri->dom);
return true;
}
static bool in_tamper_out_range(tproxy_conn_t *conn)
{
return (params.tamper_start_n ? (conn->tnrd+1) : conn->trd) >= params.tamper_start &&
@@ -1226,6 +1259,31 @@ static void conn_close_with_partner_check(struct tailhead *conn_list, struct tai
}
}
static bool handle_resolve_pipe(tproxy_conn_t **conn, struct tailhead *conn_list, int fd)
{
ssize_t rd;
struct resolve_item *ri;
bool b;
rd = read(fd,&ri,sizeof(void*));
if (rd<0)
{
perror("resolve_pipe read");
return false;
}
else if (rd!=sizeof(void*))
{
// partial pointer read is FATAL. in any case it will cause pointer corruption and coredump
fprintf(stderr,"resolve_pipe not full read %zu\n",rd);
exit(1000);
}
b = resolve_complete(ri, conn_list);
*conn = (tproxy_conn_t *)ri->ptr;
if (*conn) (*conn)->socks_ri = NULL;
free(ri);
return b;
}
int event_loop(const int *listen_fd, size_t listen_fd_ct)
{
int retval = 0, num_events = 0;
@@ -1239,8 +1297,11 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
size_t sct;
struct sockaddr_storage accept_sa;
socklen_t accept_salen;
int resolve_pipe[2];
if (!listen_fd_ct) return -1;
resolve_pipe[0]=resolve_pipe[1]=0;
legs_local = legs_remote = 0;
//Initialize queue (remember that TAILQ_HEAD just defines the struct)
@@ -1272,6 +1333,34 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
goto ex;
}
}
if ((params.proxy_type==CONN_TYPE_SOCKS) && !params.no_resolve)
{
if (pipe(resolve_pipe)==-1)
{
perror("pipe (resolve_pipe)");
retval = -1;
goto ex;
}
if (fcntl(resolve_pipe[0], F_SETFL, O_NONBLOCK) < 0)
{
perror("resolve_pipe set O_NONBLOCK");
retval = -1;
goto ex;
}
ev.data.ptr = NULL;
if (epoll_ctl(efd, EPOLL_CTL_ADD, resolve_pipe[0], &ev) == -1) {
perror("epoll_ctl (listen socket)");
retval = -1;
goto ex;
}
if (!resolver_init(params.resolver_threads,resolve_pipe[1]))
{
fprintf(stderr,"could not initialize multithreaded resolver\n");
retval = -1;
goto ex;
}
VPRINT("initialized multi threaded resolver with %d threads",resolver_thread_count());
}
for(;;)
{
@@ -1290,8 +1379,21 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
for (i = 0; i < num_events; i++)
{
conn = (tproxy_conn_t*)events[i].data.ptr;
if (!conn)
{
DBGPRINT("\nEVENT mask %08X resolve_pipe",events[i].events)
if (events[i].events & EPOLLIN)
{
DBGPRINT("EPOLLIN")
if (!handle_resolve_pipe(&conn, &conn_list, resolve_pipe[0]))
{
DBGPRINT("handle_resolve_pipe false")
if (conn) close_tcp_conn(&conn_list,&close_list,conn);
}
}
continue;
}
conn->event_count++;
if (conn->listener)
{
DBGPRINT("\nEVENT mask %08X fd=%d accept",events[i].events,conn->fd)
@@ -1352,7 +1454,7 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
VPRINT("Socket fd=%d (partner_fd=%d, remote=%d) %s so_error=%d (%s)",conn->fd,conn->partner ? conn->partner->fd : 0,conn->remote,se,errn,strerror(errn));
proxy_remote_conn_ack(conn,errn);
read_all_and_buffer(conn,3);
if (errn==ECONNRESET && conn->remote && conn_partner_alive(conn)) rst_in(&conn->partner->track);
if (errn==ECONNRESET && conn->remote && params.tamper && conn_partner_alive(conn)) rst_in(&conn->partner->track);
conn_close_with_partner_check(&conn_list,&close_list,conn);
continue;
@@ -1370,7 +1472,7 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
{
DBGPRINT("EPOLLRDHUP")
read_all_and_buffer(conn,2);
if (!conn->remote) hup_out(&conn->track);
if (!conn->remote && params.tamper) hup_out(&conn->track);
if (conn_has_unsent(conn))
{
@@ -1419,6 +1521,9 @@ int event_loop(const int *listen_fd, size_t listen_fd_ct)
ex:
if (efd) close(efd);
if (listen_conn) free(listen_conn);
resolver_deinit();
if (resolve_pipe[0]) close(resolve_pipe[0]);
if (resolve_pipe[1]) close(resolve_pipe[1]);
return retval;
}