// Package fulfiller provides a type that implements capnp.Answer that // resolves by calling setter methods. package fulfiller import ( "errors" "sync" "zombiezen.com/go/capnproto2" "zombiezen.com/go/capnproto2/internal/queue" ) // callQueueSize is the maximum number of pending calls. const callQueueSize = 64 // Fulfiller is a promise for a Struct. The zero value is an unresolved // answer. A Fulfiller is considered to be resolved once Fulfill or // Reject is called. Calls to the Fulfiller will queue up until it is // resolved. A Fulfiller is safe to use from multiple goroutines. type Fulfiller struct { once sync.Once resolved chan struct{} // initialized by init() // Protected by mu mu sync.RWMutex answer capnp.Answer queue []pcall // initialized by init() } // init initializes the Fulfiller. It is idempotent. // Should be called for each method on Fulfiller. func (f *Fulfiller) init() { f.once.Do(func() { f.resolved = make(chan struct{}) f.queue = make([]pcall, 0, callQueueSize) }) } // Fulfill sets the fulfiller's answer to s. If there are queued // pipeline calls, the capabilities on the struct will be embargoed // until the queued calls finish. Fulfill will panic if the fulfiller // has already been resolved. func (f *Fulfiller) Fulfill(s capnp.Struct) { f.init() f.mu.Lock() if f.answer != nil { f.mu.Unlock() panic("Fulfiller.Fulfill called more than once") } f.answer = capnp.ImmediateAnswer(s) queues := f.emptyQueue(s) ctab := s.Segment().Message().CapTable for capIdx, q := range queues { ctab[capIdx] = newEmbargoClient(ctab[capIdx], q) } close(f.resolved) f.mu.Unlock() } // emptyQueue splits the queue by which capability it targets and // drops any invalid calls. Once this function returns, f.queue will // be nil. func (f *Fulfiller) emptyQueue(s capnp.Struct) map[capnp.CapabilityID][]ecall { qs := make(map[capnp.CapabilityID][]ecall, len(f.queue)) for i, pc := range f.queue { c, err := capnp.TransformPtr(s.ToPtr(), pc.transform) if err != nil { pc.f.Reject(err) continue } in := c.Interface() if !in.IsValid() { pc.f.Reject(capnp.ErrNullClient) continue } cn := in.Capability() if qs[cn] == nil { qs[cn] = make([]ecall, 0, len(f.queue)-i) } qs[cn] = append(qs[cn], pc.ecall) } f.queue = nil return qs } // Reject sets the fulfiller's answer to err. If there are queued // pipeline calls, they will all return errors. Reject will panic if // the error is nil or the fulfiller has already been resolved. func (f *Fulfiller) Reject(err error) { if err == nil { panic("Fulfiller.Reject called with nil") } f.init() f.mu.Lock() if f.answer != nil { f.mu.Unlock() panic("Fulfiller.Reject called more than once") } f.answer = capnp.ErrorAnswer(err) for i := range f.queue { f.queue[i].f.Reject(err) f.queue[i] = pcall{} } close(f.resolved) f.mu.Unlock() } // Done returns a channel that is closed once f is resolved. func (f *Fulfiller) Done() <-chan struct{} { f.init() return f.resolved } // Peek returns f's resolved answer or nil if f has not been resolved. // The Struct method of an answer returned from Peek returns immediately. func (f *Fulfiller) Peek() capnp.Answer { f.init() f.mu.RLock() a := f.answer f.mu.RUnlock() return a } // Struct waits until f is resolved and returns its struct if fulfilled // or an error if rejected. func (f *Fulfiller) Struct() (capnp.Struct, error) { <-f.Done() return f.Peek().Struct() } // PipelineCall calls PipelineCall on the fulfilled answer or queues the // call if f has not been fulfilled. func (f *Fulfiller) PipelineCall(transform []capnp.PipelineOp, call *capnp.Call) capnp.Answer { f.init() // Fast path: pass-through after fulfilled. if a := f.Peek(); a != nil { return a.PipelineCall(transform, call) } f.mu.Lock() // Make sure that f wasn't fulfilled. if a := f.answer; a != nil { f.mu.Unlock() return a.PipelineCall(transform, call) } if len(f.queue) == cap(f.queue) { f.mu.Unlock() return capnp.ErrorAnswer(errCallQueueFull) } cc, err := call.Copy(nil) if err != nil { f.mu.Unlock() return capnp.ErrorAnswer(err) } g := new(Fulfiller) f.queue = append(f.queue, pcall{ transform: transform, ecall: ecall{ call: cc, f: g, }, }) f.mu.Unlock() return g } // PipelineClose waits until f is resolved and then calls PipelineClose // on the fulfilled answer. func (f *Fulfiller) PipelineClose(transform []capnp.PipelineOp) error { <-f.Done() return f.Peek().PipelineClose(transform) } // pcall is a queued pipeline call. type pcall struct { transform []capnp.PipelineOp ecall } // EmbargoClient is a client that flushes a queue of calls. // Fulfiller will create these automatically when pipelined calls are // made on unresolved answers. EmbargoClient is exported so that rpc // can avoid making calls on its own Conn. type EmbargoClient struct { client capnp.Client mu sync.RWMutex q queue.Queue calls ecallList } func newEmbargoClient(client capnp.Client, queue []ecall) capnp.Client { ec := &EmbargoClient{ client: client, calls: make(ecallList, callQueueSize), } ec.q.Init(ec.calls, copy(ec.calls, queue)) go ec.flushQueue() return ec } func (ec *EmbargoClient) push(cl *capnp.Call) capnp.Answer { f := new(Fulfiller) cl, err := cl.Copy(nil) if err != nil { return capnp.ErrorAnswer(err) } i := ec.q.Push() if i == -1 { return capnp.ErrorAnswer(errCallQueueFull) } ec.calls[i] = ecall{cl, f} return f } // flushQueue is run in its own goroutine. func (ec *EmbargoClient) flushQueue() { var c ecall ec.mu.Lock() if i := ec.q.Front(); i != -1 { c = ec.calls[i] } ec.mu.Unlock() for c.call != nil { ans := ec.client.Call(c.call) go func(f *Fulfiller, ans capnp.Answer) { s, err := ans.Struct() if err == nil { f.Fulfill(s) } else { f.Reject(err) } }(c.f, ans) ec.mu.Lock() ec.q.Pop() if i := ec.q.Front(); i != -1 { c = ec.calls[i] } else { c = ecall{} } ec.mu.Unlock() } } // Client returns the underlying client if the embargo has been lifted // and nil otherwise. func (ec *EmbargoClient) Client() capnp.Client { ec.mu.RLock() ok := ec.isPassthrough() ec.mu.RUnlock() if !ok { return nil } return ec.client } func (ec *EmbargoClient) isPassthrough() bool { return ec.q.Len() == 0 } // Call either queues a call to the underlying client or starts a call // if the embargo has been lifted. func (ec *EmbargoClient) Call(cl *capnp.Call) capnp.Answer { // Fast path: queue is flushed. ec.mu.RLock() ok := ec.isPassthrough() ec.mu.RUnlock() if ok { return ec.client.Call(cl) } // Add to queue. ec.mu.Lock() // Since we released the lock, check that the queue hasn't been flushed. if ec.isPassthrough() { ec.mu.Unlock() return ec.client.Call(cl) } ans := ec.push(cl) ec.mu.Unlock() return ans } // TryQueue will attempt to queue a call or return nil if the embargo // has been lifted. func (ec *EmbargoClient) TryQueue(cl *capnp.Call) capnp.Answer { ec.mu.Lock() if ec.isPassthrough() { ec.mu.Unlock() return nil } ans := ec.push(cl) ec.mu.Unlock() return ans } // Close closes the underlying client, rejecting any queued calls. func (ec *EmbargoClient) Close() error { ec.mu.Lock() // reject all queued calls for ec.q.Len() > 0 { ec.calls[ec.q.Front()].f.Reject(errQueueCallCancel) ec.q.Pop() } ec.mu.Unlock() return ec.client.Close() } // ecall is an queued embargoed call. type ecall struct { call *capnp.Call f *Fulfiller } type ecallList []ecall func (el ecallList) Len() int { return len(el) } func (el ecallList) Clear(i int) { el[i] = ecall{} } var ( errCallQueueFull = errors.New("capnp: promised answer call queue full") errQueueCallCancel = errors.New("capnp: queued call canceled") )