More race cleanup, (hc.tuns).Data/ShutdownTun() races A,B,C still remain.

Signed-off-by: Russ Magee <rmagee@gmail.com>
This commit is contained in:
Russ Magee 2019-06-28 22:50:58 -07:00
parent 78b2006af6
commit 3a720cfb8e
2 changed files with 25 additions and 13 deletions

View File

@ -123,11 +123,11 @@ func Init(d bool, c string, f logger.Priority) {
} }
func (hc *Conn) Lock() { func (hc *Conn) Lock() {
hc.m.Lock() hc.m.Lock()
} }
func (hc *Conn) Unlock() { func (hc *Conn) Unlock() {
hc.m.Unlock() hc.m.Unlock()
} }
func (hc Conn) GetStatus() CSOType { func (hc Conn) GetStatus() CSOType {
@ -1136,7 +1136,9 @@ func (hc Conn) Read(b []byte) (n int, err error) {
_ = binary.BigEndian.Uint16(payloadBytes[0:2]) _ = binary.BigEndian.Uint16(payloadBytes[0:2])
//logger.LogDebug(fmt.Sprintf("[Server] Got CSOTunKeepAlive")) //logger.LogDebug(fmt.Sprintf("[Server] Got CSOTunKeepAlive"))
for _, t := range *hc.tuns { for _, t := range *hc.tuns {
hc.Lock()
t.KeepAlive = 0 t.KeepAlive = 0
hc.Unlock()
} }
} else if ctrlStatOp == CSONone { } else if ctrlStatOp == CSONone {
hc.dBuf.Write(payloadBytes) hc.dBuf.Write(payloadBytes)

View File

@ -16,7 +16,6 @@ import (
"net" "net"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"blitter.com/go/hkexsh/logger" "blitter.com/go/hkexsh/logger"
@ -156,7 +155,7 @@ func (hc *Conn) StartClientTunnel(lport, rport uint16) {
if hc.TunIsAlive(rport) { if hc.TunIsAlive(rport) {
hc.WritePacket(tunDst.Bytes(), CSOTunHangup) hc.WritePacket(tunDst.Bytes(), CSOTunHangup)
} }
hc.ShutdownTun(rport) hc.ShutdownTun(rport) // FIXME: race-C
break break
} else if strings.Contains(e.Error(), "i/o timeout") { } else if strings.Contains(e.Error(), "i/o timeout") {
if !hc.TunIsAlive(rport) { if !hc.TunIsAlive(rport) {
@ -200,7 +199,7 @@ func (hc *Conn) StartClientTunnel(lport, rport uint16) {
logger.LogDebug("[ClientTun] worker B: starting") logger.LogDebug("[ClientTun] worker B: starting")
for { for {
bytes, ok := <-(*hc.tuns)[rport].Data bytes, ok := <-(*hc.tuns)[rport].Data // FIXME: race-C w/ShutdownTun calls
if ok { if ok {
c.SetWriteDeadline(time.Now().Add(200 * time.Millisecond)) c.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
_, e := c.Write(bytes) _, e := c.Write(bytes)
@ -231,11 +230,22 @@ func (hc *Conn) StartClientTunnel(lport, rport uint16) {
} }
func (hc *Conn) AgeTunnel(endp uint16) uint32 { func (hc *Conn) AgeTunnel(endp uint16) uint32 {
return atomic.AddUint32(&(*hc.tuns)[endp].KeepAlive, 1) hc.Lock()
defer hc.Unlock()
(*hc.tuns)[endp].KeepAlive += 1
return (*hc.tuns)[endp].KeepAlive
} }
func (hc *Conn) ResetTunnelAge(endp uint16) { func (hc *Conn) ResetTunnelAge(endp uint16) {
atomic.StoreUint32(&(*hc.tuns)[endp].KeepAlive, 0) hc.Lock()
defer hc.Unlock()
(*hc.tuns)[endp].KeepAlive = 0
}
func (hc *Conn) TunIsNil(endp uint16) bool {
hc.Lock()
defer hc.Unlock()
return (*hc.tuns)[endp] == nil
} }
func (hc *Conn) TunIsAlive(endp uint16) bool { func (hc *Conn) TunIsAlive(endp uint16) bool {
@ -279,7 +289,7 @@ func (hc *Conn) StartServerTunnel(lport, rport uint16) {
defer wg.Done() defer wg.Done()
for { for {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
if (*hc.tuns)[rport] == nil { if hc.TunIsNil(rport) {
logger.LogDebug("[ServerTun] worker A: Client endpoint removed.") logger.LogDebug("[ServerTun] worker A: Client endpoint removed.")
break break
} }
@ -297,7 +307,7 @@ func (hc *Conn) StartServerTunnel(lport, rport uint16) {
logger.LogDebug(fmt.Sprintf("[ServerTun] got Ctl '%c'.", cmd)) logger.LogDebug(fmt.Sprintf("[ServerTun] got Ctl '%c'.", cmd))
if cmd == 'd' { if cmd == 'd' {
// if re-using tunnel, re-init it // if re-using tunnel, re-init it
if (*hc.tuns)[rport] == nil { if hc.TunIsNil(rport) {
hc.InitTunEndpoint(lport, "", rport) hc.InitTunEndpoint(lport, "", rport)
} }
logger.LogDebug("[ServerTun] dialling...") logger.LogDebug("[ServerTun] dialling...")
@ -345,12 +355,12 @@ func (hc *Conn) StartServerTunnel(lport, rport uint16) {
if hc.TunIsAlive(rport) { if hc.TunIsAlive(rport) {
hc.WritePacket(tunDst.Bytes(), CSOTunDisconn) hc.WritePacket(tunDst.Bytes(), CSOTunDisconn)
} }
hc.ShutdownTun(rport) hc.ShutdownTun(rport) // FIXME: race-A
break break
} else if strings.Contains(e.Error(), "i/o timeout") { } else if strings.Contains(e.Error(), "i/o timeout") {
if !hc.TunIsAlive(rport) { if !hc.TunIsAlive(rport) {
logger.LogDebug(fmt.Sprintf("[ServerTun] worker A: timeout: Server side died, hanging up %v", (*hc.tuns)[rport])) logger.LogDebug(fmt.Sprintf("[ServerTun] worker A: timeout: Server side died, hanging up %v", (*hc.tuns)[rport]))
hc.ShutdownTun(rport) hc.ShutdownTun(rport) // FIXME: race-B
break break
} }
} else { } else {
@ -358,7 +368,7 @@ func (hc *Conn) StartServerTunnel(lport, rport uint16) {
if hc.TunIsAlive(rport) { if hc.TunIsAlive(rport) {
hc.WritePacket(tunDst.Bytes(), CSOTunDisconn) hc.WritePacket(tunDst.Bytes(), CSOTunDisconn)
} }
hc.ShutdownTun(rport) hc.ShutdownTun(rport) // FIXME: race-C
break break
} }
} }
@ -383,7 +393,7 @@ func (hc *Conn) StartServerTunnel(lport, rport uint16) {
logger.LogDebug("[ServerTun] worker B: starting") logger.LogDebug("[ServerTun] worker B: starting")
for { for {
rData, ok := <-(*hc.tuns)[rport].Data // FIXME: race w/ShutdownTun() calls rData, ok := <-(*hc.tuns)[rport].Data // FIXME: race-A, race-B, race-C (w/ShutdownTun() calls)
if ok { if ok {
c.SetWriteDeadline(time.Now().Add(200 * time.Millisecond)) c.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
_, e := c.Write(rData) _, e := c.Write(rData)