// Package server provides runtime support for implementing Cap'n Proto
// interfaces locally.
package server // import "zombiezen.com/go/capnproto2/server"

import (
	"errors"
	"sort"
	"sync"

	"golang.org/x/net/context"
	"zombiezen.com/go/capnproto2"
	"zombiezen.com/go/capnproto2/internal/fulfiller"
)

// A Method describes a single method on a server object.
type Method struct {
	capnp.Method
	Impl        Func
	ResultsSize capnp.ObjectSize
}

// A Func is a function that implements a single method.
type Func func(ctx context.Context, options capnp.CallOptions, params, results capnp.Struct) error

// Closer is the interface that wraps the Close method.
type Closer interface {
	Close() error
}

// A server is a locally implemented interface.
type server struct {
	methods sortedMethods
	closer  Closer
	queue   chan *call
	stop    chan struct{}
	done    chan struct{}
}

// New returns a client that makes calls to a set of methods.
// If closer is nil then the client's Close is a no-op.  The server
// guarantees message delivery order by blocking each call on the
// return or acknowledgment of the previous call.  See the Ack function
// for more details.
func New(methods []Method, closer Closer) capnp.Client {
	s := &server{
		methods: make(sortedMethods, len(methods)),
		closer:  closer,
		queue:   make(chan *call),
		stop:    make(chan struct{}),
		done:    make(chan struct{}),
	}
	copy(s.methods, methods)
	sort.Sort(s.methods)
	go s.dispatch()
	return s
}

// dispatch runs in its own goroutine.
func (s *server) dispatch() {
	defer close(s.done)
	for {
		select {
		case cl := <-s.queue:
			err := s.startCall(cl)
			if err != nil {
				cl.ans.Reject(err)
			}
		case <-s.stop:
			return
		}
	}
}

// startCall runs in the dispatch goroutine to start a call.
func (s *server) startCall(cl *call) error {
	_, out, err := capnp.NewMessage(capnp.SingleSegment(nil))
	if err != nil {
		return err
	}
	results, err := capnp.NewRootStruct(out, cl.method.ResultsSize)
	if err != nil {
		return err
	}
	acksig := newAckSignal()
	opts := cl.Options.With([]capnp.CallOption{capnp.SetOptionValue(ackSignalKey, acksig)})
	go func() {
		err := cl.method.Impl(cl.Ctx, opts, cl.Params, results)
		if err == nil {
			cl.ans.Fulfill(results)
		} else {
			cl.ans.Reject(err)
		}
	}()
	select {
	case <-acksig.c:
	case <-cl.ans.Done():
		// Implementation functions may not call Ack, which is fine for
		// smaller functions.
	case <-cl.Ctx.Done():
		// Ideally, this would reject the answer immediately, but then you
		// would race with the implementation function.
	}
	return nil
}

func (s *server) Call(cl *capnp.Call) capnp.Answer {
	sm := s.methods.find(&cl.Method)
	if sm == nil {
		return capnp.ErrorAnswer(&capnp.MethodError{
			Method: &cl.Method,
			Err:    capnp.ErrUnimplemented,
		})
	}
	cl, err := cl.Copy(nil)
	if err != nil {
		return capnp.ErrorAnswer(err)
	}
	scall := newCall(cl, sm)
	select {
	case s.queue <- scall:
		return &scall.ans
	case <-s.stop:
		return capnp.ErrorAnswer(errClosed)
	case <-cl.Ctx.Done():
		return capnp.ErrorAnswer(cl.Ctx.Err())
	}
}

func (s *server) Close() error {
	close(s.stop)
	<-s.done
	if s.closer == nil {
		return nil
	}
	return s.closer.Close()
}

// Ack acknowledges delivery of a server call, allowing other methods
// to be called on the server.  It is intended to be used inside the
// implementation of a server function.  Calling Ack on options that
// aren't from a server method implementation is a no-op.
//
// Example:
//
//	func (my *myServer) MyMethod(call schema.MyServer_myMethod) error {
//		server.Ack(call.Options)
//		// ... do long-running operation ...
//		return nil
//	}
//
// Ack need not be the first call in a function nor is it required.
// Since the function's return is also an acknowledgment of delivery,
// short functions can return without calling Ack.  However, since
// clients will not return an Answer until the delivery is acknowledged,
// it is advisable to ack early.
func Ack(opts capnp.CallOptions) {
	if ack, _ := opts.Value(ackSignalKey).(*ackSignal); ack != nil {
		ack.signal()
	}
}

type call struct {
	*capnp.Call
	ans    fulfiller.Fulfiller
	method *Method
}

func newCall(cl *capnp.Call, sm *Method) *call {
	return &call{Call: cl, method: sm}
}

type sortedMethods []Method

// find returns the method with the given ID or nil.
func (sm sortedMethods) find(id *capnp.Method) *Method {
	i := sort.Search(len(sm), func(i int) bool {
		m := &sm[i]
		if m.InterfaceID != id.InterfaceID {
			return m.InterfaceID >= id.InterfaceID
		}
		return m.MethodID >= id.MethodID
	})
	if i == len(sm) {
		return nil
	}
	m := &sm[i]
	if m.InterfaceID != id.InterfaceID || m.MethodID != id.MethodID {
		return nil
	}
	return m
}

func (sm sortedMethods) Len() int {
	return len(sm)
}

func (sm sortedMethods) Less(i, j int) bool {
	if id1, id2 := sm[i].InterfaceID, sm[j].InterfaceID; id1 != id2 {
		return id1 < id2
	}
	return sm[i].MethodID < sm[j].MethodID
}

func (sm sortedMethods) Swap(i, j int) {
	sm[i], sm[j] = sm[j], sm[i]
}

type ackSignal struct {
	c    chan struct{}
	once sync.Once
}

func newAckSignal() *ackSignal {
	return &ackSignal{c: make(chan struct{})}
}

func (ack *ackSignal) signal() {
	ack.once.Do(func() {
		close(ack.c)
	})
}

// callOptionKey is the unexported key type for predefined options.
type callOptionKey int

// Predefined call options
const (
	ackSignalKey callOptionKey = iota + 1
)

var errClosed = errors.New("capnp: server closed")