Much improved tunnel state management: server-side mostly working, client-side re-listen -> svr re-Dial() still required

This commit is contained in:
Russ Magee 2018-11-01 22:14:01 -07:00
parent a425afe9b6
commit d2322af34b
3 changed files with 35 additions and 63 deletions

View File

@ -816,6 +816,7 @@ func (hc Conn) Read(b []byte) (n int, err error) {
rport := binary.BigEndian.Uint16(payloadBytes[2:4]) rport := binary.BigEndian.Uint16(payloadBytes[2:4])
logger.LogDebug(fmt.Sprintf("Read(): Tunnel setup [%d:%d]", lport, rport)) logger.LogDebug(fmt.Sprintf("Read(): Tunnel setup [%d:%d]", lport, rport))
hc.StartServerTunnel(lport, rport) hc.StartServerTunnel(lport, rport)
hc.tuns[rport].Ctl <- 'a' // Dial() rport
} else if ctrlStatOp == CSOTunSetupAck { } else if ctrlStatOp == CSOTunSetupAck {
lport := binary.BigEndian.Uint16(payloadBytes[0:2]) lport := binary.BigEndian.Uint16(payloadBytes[0:2])
rport := binary.BigEndian.Uint16(payloadBytes[2:4]) rport := binary.BigEndian.Uint16(payloadBytes[2:4])
@ -826,22 +827,29 @@ func (hc Conn) Read(b []byte) (n int, err error) {
lport := binary.BigEndian.Uint16(payloadBytes[0:2]) lport := binary.BigEndian.Uint16(payloadBytes[0:2])
rport := binary.BigEndian.Uint16(payloadBytes[2:4]) rport := binary.BigEndian.Uint16(payloadBytes[2:4])
logger.LogDebug(fmt.Sprintf("Read(): Tunnel refused [%d:%d]", lport, rport)) logger.LogDebug(fmt.Sprintf("Read(): Tunnel refused [%d:%d]", lport, rport))
hc.dBuf.Write(payloadBytes)
} else if ctrlStatOp == CSOTunDisconn { } else if ctrlStatOp == CSOTunDisconn {
// server side's rport has disconnected (server lost) // server side's rport has disconnected (server lost)
lport := binary.BigEndian.Uint16(payloadBytes[0:2]) lport := binary.BigEndian.Uint16(payloadBytes[0:2])
rport := binary.BigEndian.Uint16(payloadBytes[2:4]) rport := binary.BigEndian.Uint16(payloadBytes[2:4])
logger.LogDebug(fmt.Sprintf("Read(): Tunnel server disconnected [%d:%d]", lport, rport)) logger.LogDebug(fmt.Sprintf("Read(): Tunnel server disconnected [%d:%d]", lport, rport))
hc.dBuf.Write(payloadBytes)
} else if ctrlStatOp == CSOTunHangup { } else if ctrlStatOp == CSOTunHangup {
// client side's lport has hung up // client side's lport has hung up
lport := binary.BigEndian.Uint16(payloadBytes[0:2]) lport := binary.BigEndian.Uint16(payloadBytes[0:2])
rport := binary.BigEndian.Uint16(payloadBytes[2:4]) rport := binary.BigEndian.Uint16(payloadBytes[2:4])
logger.LogDebug(fmt.Sprintf("Read(): Tunnel client hung up [%d:%d]", lport, rport)) logger.LogDebug(fmt.Sprintf("Read(): Tunnel client hung up [%d:%d]", lport, rport))
hc.dBuf.Write(payloadBytes)
} else if ctrlStatOp == CSOTunData { } else if ctrlStatOp == CSOTunData {
lport := binary.BigEndian.Uint16(payloadBytes[0:2]) lport := binary.BigEndian.Uint16(payloadBytes[0:2])
rport := binary.BigEndian.Uint16(payloadBytes[2:4]) rport := binary.BigEndian.Uint16(payloadBytes[2:4])
//fmt.Printf("[Got CSOTunData: [lport %d:rport %d] data:%v\n", lport, rport, payloadBytes[4:]) //fmt.Printf("[Got CSOTunData: [lport %d:rport %d] data:%v\n", lport, rport, payloadBytes[4:])
logger.LogDebug(fmt.Sprintf("[Writing data to rport [%d:%d] %v", lport, rport, payloadBytes[4:])) if hc.tuns[rport] != nil {
logger.LogDebug(fmt.Sprintf("[Writing data to rport [%d:%d]", lport, rport))
hc.tuns[rport].Data <- payloadBytes[4:] hc.tuns[rport].Data <- payloadBytes[4:]
} else {
logger.LogDebug(fmt.Sprintf("[Attempt to write data to closed tun [%d:%d]", lport, rport))
}
} else if ctrlStatOp == CSONone { } else if ctrlStatOp == CSONone {
hc.dBuf.Write(payloadBytes) hc.dBuf.Write(payloadBytes)
} else { } else {

View File

@ -42,15 +42,9 @@ type (
Rport uint16 // Names are from client's perspective Rport uint16 // Names are from client's perspective
Lport uint16 // ... ie., RPort is on server, LPort is on client Lport uint16 // ... ie., RPort is on server, LPort is on client
Peer string //net.Addr Peer string //net.Addr
//Status byte //Last status of tunnel (eg., CSOTunSetupAck) Ctl chan rune //See TunCtl_* consts
Ctl chan<- rune //See TunCtl_* consts
Data chan []byte Data chan []byte
} }
//TunPacket struct {
// n uint32
// data []byte
//}
) )
func (hc *Conn) InitTunEndpoint(lp uint16, p string /* net.Addr */, rp uint16) { func (hc *Conn) InitTunEndpoint(lp uint16, p string /* net.Addr */, rp uint16) {
@ -64,22 +58,18 @@ func (hc *Conn) InitTunEndpoint(lp uint16, p string /* net.Addr */, rp uint16) {
p = addrs[0].String() p = addrs[0].String()
} }
hc.tuns[rp] = &TunEndpoint{ /*Status: CSOTunSetup,*/ Peer: p, hc.tuns[rp] = &TunEndpoint{ /*Status: CSOTunSetup,*/ Peer: p,
Lport: lp, Rport: rp, Data: make(chan[]byte, 32), Ctl: make(chan<- rune)} Lport: lp, Rport: rp, Data: make(chan []byte, 1),
Ctl: make(chan rune, 1)}
logger.LogDebug(fmt.Sprintf("InitTunEndpoint [%d:%s:%d]\n", lp, p, rp)) logger.LogDebug(fmt.Sprintf("InitTunEndpoint [%d:%s:%d]\n", lp, p, rp))
} }
return return
} }
//func (hc *Conn) GetTunStatus(rp uint16) byte {
// return hc.tuns[rp].Status
//}
func (hc *Conn) StartClientTunnel(lport, rport uint16) { func (hc *Conn) StartClientTunnel(lport, rport uint16) {
hc.InitTunEndpoint(lport, "", rport) hc.InitTunEndpoint(lport, "", rport)
t := hc.tuns[rport] // for convenience t := hc.tuns[rport] // for convenience
go func() { go func() {
logger.LogDebug(fmt.Sprintf("Listening for client tunnel port %d", lport)) logger.LogDebug(fmt.Sprintf("Listening for client tunnel port %d", lport))
l, e := net.Listen("tcp", fmt.Sprintf(":%d", lport)) l, e := net.Listen("tcp", fmt.Sprintf(":%d", lport))
if e != nil { if e != nil {
@ -90,17 +80,12 @@ func (hc *Conn) StartClientTunnel(lport, rport uint16) {
c, e := l.Accept() c, e := l.Accept()
defer func() { defer func() {
//if hc.tuns[rport] != nil {
// close(hc.tuns[rport])
// hc.tuns[rport] = nil
//}
c.Close() c.Close()
}() }()
if e != nil { if e != nil {
logger.LogDebug(fmt.Sprintf("Accept() got error(%v), hanging up.", e)) logger.LogDebug(fmt.Sprintf("Accept() got error(%v), hanging up.", e))
break break
//log.Fatal(err)
} else { } else {
logger.LogDebug(fmt.Sprintln("Accepted tunnel client")) logger.LogDebug(fmt.Sprintln("Accepted tunnel client"))
@ -124,7 +109,6 @@ func (hc *Conn) StartClientTunnel(lport, rport uint16) {
} }
if n > 0 { if n > 0 {
rBuf = append(tunDst.Bytes(), rBuf[:n]...) rBuf = append(tunDst.Bytes(), rBuf[:n]...)
//logger.LogDebug(fmt.Sprintf("Got lport data:%v", tunDst.Bytes()))
hc.WritePacket(rBuf[:n+4], CSOTunData) hc.WritePacket(rBuf[:n+4], CSOTunData)
} }
} }
@ -133,22 +117,16 @@ func (hc *Conn) StartClientTunnel(lport, rport uint16) {
// tunnel lport -> outside client (c) // tunnel lport -> outside client (c)
go func() { go func() {
defer func() { defer func() {
//if hc.tuns[rport] != nil {
// close(hc.tuns[rport])
// hc.tuns[rport] = nil
//}
c.Close() c.Close()
}() }()
for { for {
//fmt.Printf("Reading from client hc.tuns[%d]\n", lport)
bytes, ok := <-t.Data bytes, ok := <-t.Data
if ok { if ok {
//fmt.Printf("[Got this through tunnel:%v]\n", bytes)
c.Write(bytes) c.Write(bytes)
} else { } else {
logger.LogDebug(fmt.Sprintf("[Channel closed?]\n")) logger.LogDebug(fmt.Sprintf("[Channel closed?]\n"))
//break break
} }
} }
}() }()
@ -163,12 +141,16 @@ func (hc *Conn) StartServerTunnel(lport, rport uint16) {
hc.InitTunEndpoint(lport, "", rport) hc.InitTunEndpoint(lport, "", rport)
t := hc.tuns[rport] // for convenience t := hc.tuns[rport] // for convenience
//go func() {
// for cmd := range t.Ctl {
// var c net.Conn
// if cmd == 'a' {
logger.LogDebug("Server dialling...") logger.LogDebug("Server dialling...")
c, err := net.Dial("tcp", fmt.Sprintf(":%d", rport)) c, err := net.Dial("tcp", fmt.Sprintf(":%d", rport))
if err != nil { if err != nil {
logger.LogDebug(fmt.Sprintf("Nothing is serving at rport :%d!", rport)) logger.LogDebug(fmt.Sprintf("Nothing is serving at rport :%d!", rport))
var resp bytes.Buffer var resp bytes.Buffer
binary.Write(&resp, binary.BigEndian, lport) binary.Write(&resp, binary.BigEndian, /*lport*/uint16(0))
binary.Write(&resp, binary.BigEndian, rport) binary.Write(&resp, binary.BigEndian, rport)
hc.WritePacket(resp.Bytes(), CSOTunRefused) hc.WritePacket(resp.Bytes(), CSOTunRefused)
} else { } else {
@ -184,10 +166,6 @@ func (hc *Conn) StartServerTunnel(lport, rport uint16) {
// //
go func() { go func() {
defer func() { defer func() {
//if hc.tuns[rport] != nil {
// close(hc.tuns[rport])
// hc.tuns[rport] = nil
//}
c.Close() c.Close()
}() }()
@ -221,23 +199,27 @@ func (hc *Conn) StartServerTunnel(lport, rport uint16) {
// worker to read data from client (already decrypted) & fwd to rport // worker to read data from client (already decrypted) & fwd to rport
go func() { go func() {
defer func() { defer func() {
//if hc.tuns[rport] != nil {
//close(hc.tuns[rport])
//hc.tuns[rport] = nil
//}
c.Close() c.Close()
}() }()
for { for {
rData, ok := <-t.Data rData, ok := <-t.Data
if ok { if ok {
//logger.LogDebug(fmt.Sprintf("Got client data:%v", rData))
c.Write(rData) c.Write(rData)
} else { } else {
logger.LogDebug("!!! ERROR reading from hc.tuns[] channel !!!") logger.LogDebug("[ERROR reading from hc.tuns[] channel - closed?]")
break break
} }
} }
}() }()
} }
// } else if cmd == 'h' {
// logger.LogDebug("[Server hanging up on rport on behalf of client]")
// c.Close()
// } else {
// logger.LogDebug("[ERR: this should be unreachable]")
// }
// } // t.Ctl read loop
// logger.LogDebug("[ServerTunnel() exiting t.Ctl read loop - channel closed??]")
//}()
} }

View File

@ -349,18 +349,9 @@ func reqTunnel(hc *hkexnet.Conn, lp uint16, p string /*net.Addr*/, rp uint16) {
fmt.Printf("bTmp:%x\n", bTmp.Bytes()) fmt.Printf("bTmp:%x\n", bTmp.Bytes())
logger.LogDebug(fmt.Sprintln("[Client sending CSOTunSetup]")) logger.LogDebug(fmt.Sprintln("[Client sending CSOTunSetup]"))
hc.WritePacket(bTmp.Bytes(), hkexnet.CSOTunSetup) hc.WritePacket(bTmp.Bytes(), hkexnet.CSOTunSetup)
// hkexnet.WritePacket() handles processing of client side tun setup,
// calling hkexnet.StartClientTunnel()
// Server should reply immediately with CSOTunSetupAck[lport:rport] // Server should reply immediately with CSOTunSetupAck[lport:rport]
// hkexnet.Read() on server side handles server side tun setup. // hkexnet.Read() on server side handles server side tun setup.
// CSOTun packets don't reply with acks/naks in the datastream; they
// record the last status (other than CSOTunData) in the TunEndpoint
// .Status field. We can check this here to determine how the request
// completed.
// TODO: Should be a timeout check here of course to avoid hangs...
hc.InitTunEndpoint(lp, p, rp)
resp := make([]byte, 4) resp := make([]byte, 4)
var lpResp, rpResp uint16 var lpResp, rpResp uint16
n, e := io.ReadFull(hc, resp) n, e := io.ReadFull(hc, resp)
@ -369,15 +360,6 @@ func reqTunnel(hc *hkexnet.Conn, lp uint16, p string /*net.Addr*/, rp uint16) {
} else { } else {
lpResp = binary.BigEndian.Uint16(resp[0:2]) lpResp = binary.BigEndian.Uint16(resp[0:2])
rpResp = binary.BigEndian.Uint16(resp[2:4]) rpResp = binary.BigEndian.Uint16(resp[2:4])
//var s byte
//for timeout := 0; timeout < 5; timeout++ {
// s = hc.GetTunStatus(rp)
// if s != hkexnet.CSOTunSetup {
// logger.LogDebug(fmt.Sprintf("[Client tun setup result:%d\n]", s))
// break
// }
// time.Sleep(1 * time.Second)
//}
} }
if lpResp == lp && rpResp == rp { if lpResp == lp && rpResp == rp {