224 lines
6.7 KiB
Go
224 lines
6.7 KiB
Go
|
/*
|
||
|
|
||
|
The remote package provides the pieces to allow Ginkgo test suites to report to remote listeners.
|
||
|
This is used, primarily, to enable streaming parallel test output but has, in principal, broader applications (e.g. streaming test output to a browser).
|
||
|
|
||
|
*/
|
||
|
|
||
|
package parallel_support
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"io"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
|
||
|
"github.com/onsi/ginkgo/v2/reporters"
|
||
|
"github.com/onsi/ginkgo/v2/types"
|
||
|
)
|
||
|
|
||
|
/*
|
||
|
httpServer spins up on an automatically selected port and listens for communication from the forwarding reporter.
|
||
|
It then forwards that communication to attached reporters.
|
||
|
*/
|
||
|
type httpServer struct {
|
||
|
listener net.Listener
|
||
|
handler *ServerHandler
|
||
|
}
|
||
|
|
||
|
//Create a new server, automatically selecting a port
|
||
|
func newHttpServer(parallelTotal int, reporter reporters.Reporter) (*httpServer, error) {
|
||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return &httpServer{
|
||
|
listener: listener,
|
||
|
handler: newServerHandler(parallelTotal, reporter),
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
//Start the server. You don't need to `go s.Start()`, just `s.Start()`
|
||
|
func (server *httpServer) Start() {
|
||
|
httpServer := &http.Server{}
|
||
|
mux := http.NewServeMux()
|
||
|
httpServer.Handler = mux
|
||
|
|
||
|
//streaming endpoints
|
||
|
mux.HandleFunc("/suite-will-begin", server.specSuiteWillBegin)
|
||
|
mux.HandleFunc("/did-run", server.didRun)
|
||
|
mux.HandleFunc("/suite-did-end", server.specSuiteDidEnd)
|
||
|
mux.HandleFunc("/emit-output", server.emitOutput)
|
||
|
mux.HandleFunc("/progress-report", server.emitProgressReport)
|
||
|
|
||
|
//synchronization endpoints
|
||
|
mux.HandleFunc("/before-suite-completed", server.handleBeforeSuiteCompleted)
|
||
|
mux.HandleFunc("/before-suite-state", server.handleBeforeSuiteState)
|
||
|
mux.HandleFunc("/have-nonprimary-procs-finished", server.handleHaveNonprimaryProcsFinished)
|
||
|
mux.HandleFunc("/aggregated-nonprimary-procs-report", server.handleAggregatedNonprimaryProcsReport)
|
||
|
mux.HandleFunc("/counter", server.handleCounter)
|
||
|
mux.HandleFunc("/up", server.handleUp)
|
||
|
mux.HandleFunc("/abort", server.handleAbort)
|
||
|
|
||
|
go httpServer.Serve(server.listener)
|
||
|
}
|
||
|
|
||
|
//Stop the server
|
||
|
func (server *httpServer) Close() {
|
||
|
server.listener.Close()
|
||
|
}
|
||
|
|
||
|
//The address the server can be reached it. Pass this into the `ForwardingReporter`.
|
||
|
func (server *httpServer) Address() string {
|
||
|
return "http://" + server.listener.Addr().String()
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) GetSuiteDone() chan interface{} {
|
||
|
return server.handler.done
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) GetOutputDestination() io.Writer {
|
||
|
return server.handler.outputDestination
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) SetOutputDestination(w io.Writer) {
|
||
|
server.handler.outputDestination = w
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) RegisterAlive(node int, alive func() bool) {
|
||
|
server.handler.registerAlive(node, alive)
|
||
|
}
|
||
|
|
||
|
//
|
||
|
// Streaming Endpoints
|
||
|
//
|
||
|
|
||
|
//The server will forward all received messages to Ginkgo reporters registered with `RegisterReporters`
|
||
|
func (server *httpServer) decode(writer http.ResponseWriter, request *http.Request, object interface{}) bool {
|
||
|
defer request.Body.Close()
|
||
|
if json.NewDecoder(request.Body).Decode(object) != nil {
|
||
|
writer.WriteHeader(http.StatusBadRequest)
|
||
|
return false
|
||
|
}
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) handleError(err error, writer http.ResponseWriter) bool {
|
||
|
if err == nil {
|
||
|
return false
|
||
|
}
|
||
|
switch err {
|
||
|
case ErrorEarly:
|
||
|
writer.WriteHeader(http.StatusTooEarly)
|
||
|
case ErrorGone:
|
||
|
writer.WriteHeader(http.StatusGone)
|
||
|
case ErrorFailed:
|
||
|
writer.WriteHeader(http.StatusFailedDependency)
|
||
|
default:
|
||
|
writer.WriteHeader(http.StatusInternalServerError)
|
||
|
}
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) specSuiteWillBegin(writer http.ResponseWriter, request *http.Request) {
|
||
|
var report types.Report
|
||
|
if !server.decode(writer, request, &report) {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
server.handleError(server.handler.SpecSuiteWillBegin(report, voidReceiver), writer)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) didRun(writer http.ResponseWriter, request *http.Request) {
|
||
|
var report types.SpecReport
|
||
|
if !server.decode(writer, request, &report) {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
server.handleError(server.handler.DidRun(report, voidReceiver), writer)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) specSuiteDidEnd(writer http.ResponseWriter, request *http.Request) {
|
||
|
var report types.Report
|
||
|
if !server.decode(writer, request, &report) {
|
||
|
return
|
||
|
}
|
||
|
server.handleError(server.handler.SpecSuiteDidEnd(report, voidReceiver), writer)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) emitOutput(writer http.ResponseWriter, request *http.Request) {
|
||
|
output, err := io.ReadAll(request.Body)
|
||
|
if err != nil {
|
||
|
writer.WriteHeader(http.StatusInternalServerError)
|
||
|
return
|
||
|
}
|
||
|
var n int
|
||
|
server.handleError(server.handler.EmitOutput(output, &n), writer)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) emitProgressReport(writer http.ResponseWriter, request *http.Request) {
|
||
|
var report types.ProgressReport
|
||
|
if !server.decode(writer, request, &report) {
|
||
|
return
|
||
|
}
|
||
|
server.handleError(server.handler.EmitProgressReport(report, voidReceiver), writer)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) handleBeforeSuiteCompleted(writer http.ResponseWriter, request *http.Request) {
|
||
|
var beforeSuiteState BeforeSuiteState
|
||
|
if !server.decode(writer, request, &beforeSuiteState) {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
server.handleError(server.handler.BeforeSuiteCompleted(beforeSuiteState, voidReceiver), writer)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) handleBeforeSuiteState(writer http.ResponseWriter, request *http.Request) {
|
||
|
var beforeSuiteState BeforeSuiteState
|
||
|
if server.handleError(server.handler.BeforeSuiteState(voidSender, &beforeSuiteState), writer) {
|
||
|
return
|
||
|
}
|
||
|
json.NewEncoder(writer).Encode(beforeSuiteState)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) handleHaveNonprimaryProcsFinished(writer http.ResponseWriter, request *http.Request) {
|
||
|
if server.handleError(server.handler.HaveNonprimaryProcsFinished(voidSender, voidReceiver), writer) {
|
||
|
return
|
||
|
}
|
||
|
writer.WriteHeader(http.StatusOK)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) handleAggregatedNonprimaryProcsReport(writer http.ResponseWriter, request *http.Request) {
|
||
|
var aggregatedReport types.Report
|
||
|
if server.handleError(server.handler.AggregatedNonprimaryProcsReport(voidSender, &aggregatedReport), writer) {
|
||
|
return
|
||
|
}
|
||
|
json.NewEncoder(writer).Encode(aggregatedReport)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) handleCounter(writer http.ResponseWriter, request *http.Request) {
|
||
|
var n int
|
||
|
if server.handleError(server.handler.Counter(voidSender, &n), writer) {
|
||
|
return
|
||
|
}
|
||
|
json.NewEncoder(writer).Encode(ParallelIndexCounter{Index: n})
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) handleUp(writer http.ResponseWriter, request *http.Request) {
|
||
|
writer.WriteHeader(http.StatusOK)
|
||
|
}
|
||
|
|
||
|
func (server *httpServer) handleAbort(writer http.ResponseWriter, request *http.Request) {
|
||
|
if request.Method == "GET" {
|
||
|
var shouldAbort bool
|
||
|
server.handler.ShouldAbort(voidSender, &shouldAbort)
|
||
|
if shouldAbort {
|
||
|
writer.WriteHeader(http.StatusGone)
|
||
|
} else {
|
||
|
writer.WriteHeader(http.StatusOK)
|
||
|
}
|
||
|
} else {
|
||
|
server.handler.Abort(voidSender, voidReceiver)
|
||
|
}
|
||
|
}
|