package fasthttp

import (
	"errors"
	"net"
	"runtime"
	"strings"
	"sync"
	"time"
)

// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
type workerPool struct {
	// Function for serving server connections.
	// It must leave c unclosed.
	WorkerFunc ServeHandler

	// MaxWorkersCount is the upper bound on concurrently running workers.
	// getCh refuses to create a worker once workersCount reaches it.
	MaxWorkersCount int

	// LogAllErrors disables the filtering of common connection errors
	// (broken pipe, reset by peer, ...) in workerFunc.
	LogAllErrors bool

	// MaxIdleWorkerDuration is how long a worker may stay idle before clean
	// stops it. Non-positive values fall back to 10 seconds.
	MaxIdleWorkerDuration time.Duration

	// Logger receives the errors returned by WorkerFunc.
	Logger Logger

	// lock guards workersCount, mustStop and ready.
	lock         sync.Mutex
	workersCount int
	mustStop     bool

	// ready holds idle workers. release appends to it and getCh pops from
	// its tail, so the most recently used worker is picked first.
	ready []*workerChan

	// stopCh is non-nil while the pool is started; closing it stops the
	// janitor goroutine launched by Start.
	stopCh chan struct{}

	workerChanPool sync.Pool

	// connState is invoked by workerFunc after every served connection.
	connState func(net.Conn, ConnState)
}

// workerChan is the handle of a single worker goroutine.
type workerChan struct {
	// lastUseTime is set by release when the worker becomes idle.
	lastUseTime time.Time
	// ch delivers connections to the worker; a nil value tells it to exit.
	ch chan net.Conn
}

// Start launches the background goroutine that periodically stops idle
// workers. Calling Start on an already started pool is a no-op.
func (wp *workerPool) Start() {
	if wp.stopCh != nil {
		return
	}
	wp.stopCh = make(chan struct{})
	stopCh := wp.stopCh

	// A previous Stop leaves mustStop set. Clear it, otherwise release keeps
	// returning false after a restart and every worker exits after serving
	// a single connection.
	wp.lock.Lock()
	wp.mustStop = false
	wp.lock.Unlock()

	wp.workerChanPool.New = func() interface{} {
		return &workerChan{
			ch: make(chan net.Conn, workerChanCap),
		}
	}
	go func() {
		var scratch []*workerChan
		for {
			wp.clean(&scratch)

			// Wait on a timer instead of time.Sleep so that Stop terminates
			// this goroutine immediately rather than after a full idle period.
			timer := time.NewTimer(wp.getMaxIdleWorkerDuration())
			select {
			case <-stopCh:
				timer.Stop()
				return
			case <-timer.C:
			}
		}
	}()
}

// Stop shuts the pool down: it stops the janitor goroutine and tells every
// idle worker to exit. Calling Stop on a pool that is not started is a no-op.
func (wp *workerPool) Stop() {
	if wp.stopCh == nil {
		return
	}
	close(wp.stopCh)
	wp.stopCh = nil

	// Stop all the workers waiting for incoming connections.
	// Do not wait for busy workers - they will stop after
	// serving the connection and noticing wp.mustStop = true.
	wp.lock.Lock()
	ready := wp.ready
	for i := range ready {
		ready[i].ch <- nil
		ready[i] = nil
	}
	wp.ready = ready[:0]
	wp.mustStop = true
	wp.lock.Unlock()
}

// getMaxIdleWorkerDuration returns MaxIdleWorkerDuration, or 10 seconds
// when it is not set to a positive value.
func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
	if wp.MaxIdleWorkerDuration <= 0 {
		return 10 * time.Second
	}
	return wp.MaxIdleWorkerDuration
}

// clean stops the workers which have been idle for longer than the maximum
// idle duration. scratch is a caller-owned buffer that is reused between
// calls to collect the workers to stop.
func (wp *workerPool) clean(scratch *[]*workerChan) {
	maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()

	// Clean least recently used workers if they didn't serve connections
	// for more than maxIdleWorkerDuration.
	criticalTime := time.Now().Add(-maxIdleWorkerDuration)

	wp.lock.Lock()
	ready := wp.ready
	n := len(ready)

	// Use binary search to find the index of the most recently used worker
	// which can still be cleaned up. release appends workers as they become
	// idle, so the oldest entries are at the front of ready.
	l, r := 0, n-1
	for l <= r {
		mid := (l + r) / 2
		if criticalTime.After(ready[mid].lastUseTime) {
			l = mid + 1
		} else {
			r = mid - 1
		}
	}
	// r is now the last expired index, or -1 when nothing has expired.
	i := r
	if i == -1 {
		wp.lock.Unlock()
		return
	}

	*scratch = append((*scratch)[:0], ready[:i+1]...)
	m := copy(ready, ready[i+1:])
	// Drop the stale references left in the tail after the shift.
	for i = m; i < n; i++ {
		ready[i] = nil
	}
	wp.ready = ready[:m]
	wp.lock.Unlock()

	// Notify obsolete workers to stop.
	// This notification must be outside the wp.lock, since ch.ch
	// may be blocking and may consume a lot of time if many workers
	// are located on non-local CPUs.
	tmp := *scratch
	for i := range tmp {
		tmp[i].ch <- nil
		tmp[i] = nil
	}
}

// Serve hands c over to a worker. It returns false when no worker is
// available, i.e. there is no idle worker and MaxWorkersCount is reached.
func (wp *workerPool) Serve(c net.Conn) bool {
	ch := wp.getCh()
	if ch == nil {
		return false
	}
	ch.ch <- c
	return true
}

// workerChanCap is the buffer size of workerChan.ch, chosen once at
// package initialization.
var workerChanCap = func() int {
	// Use blocking workerChan if GOMAXPROCS=1.
	// This immediately switches Serve to WorkerFunc, which results
	// in higher performance (under go1.5 at least).
	if runtime.GOMAXPROCS(0) == 1 {
		return 0
	}

	// Use non-blocking workerChan if GOMAXPROCS>1,
	// since otherwise the Serve caller (Acceptor) may lag accepting
	// new connections if WorkerFunc is CPU-bound.
	return 1
}()

// getCh returns the channel of a worker able to serve a connection.
// It pops the most recently released idle worker, or starts a new worker
// if none is idle and MaxWorkersCount is not reached yet. Otherwise it
// returns nil.
func (wp *workerPool) getCh() *workerChan {
	var ch *workerChan
	createWorker := false

	wp.lock.Lock()
	ready := wp.ready
	n := len(ready) - 1
	if n < 0 {
		if wp.workersCount < wp.MaxWorkersCount {
			createWorker = true
			wp.workersCount++
		}
	} else {
		ch = ready[n]
		ready[n] = nil
		wp.ready = ready[:n]
	}
	wp.lock.Unlock()

	if ch == nil {
		if !createWorker {
			return nil
		}
		vch := wp.workerChanPool.Get()
		ch = vch.(*workerChan)
		go func() {
			wp.workerFunc(ch)
			// The worker has exited; its channel may be reused by a new one.
			wp.workerChanPool.Put(vch)
		}()
	}
	return ch
}

// release marks ch as idle and puts it back on the ready list.
// It returns false if the pool is stopping, in which case the worker
// must exit instead of waiting for another connection.
func (wp *workerPool) release(ch *workerChan) bool {
	ch.lastUseTime = time.Now()
	wp.lock.Lock()
	if wp.mustStop {
		wp.lock.Unlock()
		return false
	}
	wp.ready = append(wp.ready, ch)
	wp.lock.Unlock()
	return true
}

// workerFunc is the body of a worker goroutine. It serves the connections
// received on ch.ch until it receives nil or release reports that the pool
// is stopping, then decrements workersCount.
func (wp *workerPool) workerFunc(ch *workerChan) {
	var c net.Conn

	var err error
	for c = range ch.ch {
		if c == nil {
			break
		}

		if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
			errStr := err.Error()
			// Unless LogAllErrors is set, skip the errors matched below.
			if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
				strings.Contains(errStr, "reset by peer") ||
				strings.Contains(errStr, "request headers: small read buffer") ||
				strings.Contains(errStr, "unexpected EOF") ||
				strings.Contains(errStr, "i/o timeout") ||
				errors.Is(err, ErrBadTrailer)) {
				wp.Logger.Printf("error when serving connection %q<->%q: %v", c.LocalAddr(), c.RemoteAddr(), err)
			}
		}
		if err == errHijacked {
			// A hijacked connection is not closed here.
			wp.connState(c, StateHijacked)
		} else {
			_ = c.Close()
			wp.connState(c, StateClosed)
		}
		// Do not keep the connection referenced while the worker is idle.
		c = nil

		if !wp.release(ch) {
			break
		}
	}

	wp.lock.Lock()
	wp.workersCount--
	wp.lock.Unlock()
}