// Copyright 2016 Google Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "runtime" ) type RateLimit struct { requests chan struct{} finished chan int released chan int stop chan struct{} } // NewRateLimit starts a new rate limiter with maxExecs number of executions // allowed to happen at a time. If maxExecs is <= 0, it will default to the // number of logical CPUs on the system. // // With Finish and Release, we'll keep track of outstanding buffer sizes to be // written. If that size goes above maxMem, we'll prevent starting new // executions. // // The total memory use may be higher due to current executions. This just // prevents runaway memory use due to slower writes. func NewRateLimit(maxExecs int, maxMem int64) *RateLimit { if maxExecs <= 0 { maxExecs = runtime.NumCPU() } if maxMem <= 0 { // Default to 512MB maxMem = 512 * 1024 * 1024 } ret := &RateLimit{ requests: make(chan struct{}), // Let all of the pending executions to mark themselves as finished, // even if our goroutine isn't processing input. finished: make(chan int, maxExecs), released: make(chan int), stop: make(chan struct{}), } go ret.goFunc(maxExecs, maxMem) return ret } // RequestExecution blocks until another execution can be allowed to run. func (r *RateLimit) RequestExecution() Execution { <-r.requests return r.finished } type Execution chan<- int // Finish will mark your execution as finished, and allow another request to be // approved. // // bufferSize may be specified to count memory buffer sizes, and must be // matched with calls to RateLimit.Release to mark the buffers as released. func (e Execution) Finish(bufferSize int) { e <- bufferSize } // Call Release when finished with a buffer recorded with Finish. func (r *RateLimit) Release(bufferSize int) { r.released <- bufferSize } // Stop the background goroutine func (r *RateLimit) Stop() { close(r.stop) } func (r *RateLimit) goFunc(maxExecs int, maxMem int64) { var curExecs int var curMemory int64 for { var requests chan struct{} if curExecs < maxExecs && curMemory < maxMem { requests = r.requests } select { case requests <- struct{}{}: curExecs++ case amount := <-r.finished: curExecs-- curMemory += int64(amount) if curExecs < 0 { panic("curExecs < 0") } case amount := <-r.released: curMemory -= int64(amount) case <-r.stop: return } } }