1. ホーム
  2. スクリプト・コラム
  3. ゴラン

Goトークンバケットベースのフローリミッターの実装

2022-02-13 05:07:39

はじめに

一般的な流量が多すぎて下流のシステムが対応できない場合は、このように流量を制限することで、実は地下鉄に行くのと同じで、上流から下流へのアクセスを遅くすることができるのです。
サービスへのアクセス頻度や回数を制限して、過負荷やレンガ化などを防ぐこと。

Golangの公式拡張パッケージtime (golang.org/x/time/rate) は、トークンバケツなどに基づくフローリミッターの実装を提供します。

原則の概要

  • トークン トークンを取得するたびに、その都度アクセスできるのは
  • バケット 、バケットの最大容量は決まっており、満タンになるまで一定の頻度でトークンが追加される
  • 1リクエストにつき1トークンを消費します。
  • トークン・バケットは、フロー・リミッターが初期化された時点で満杯になっているのが普通です。

の具体的な使用方法

package limiter

import (
 "fmt"
 "testing"
 "time"

 "golang.org/x/time/rate"
)

func TestLimter(t *testing.T) {
 limiter := rate.NewLimiter(rate.Every(time.Millisecond*31), 2)
 //Sleep(time.Second)
 for i := 0; i < 10; i++ {
  var ok bool
  if limiter.Allow() {
   ok = true
  }
  time.Sleep(time.Millisecond * 20)
  fmt.Println(ok, limiter.Burst())
 }
}

実施結果
=== RUN TestLimter
真 2
真2
真2
false 2
真2
真2
false 2
真2
真2
false 2
--- PASS: TestLimter (0.21s)

実行結果からわかるように、トークンのバケツは2つ満タンで始まり、トークン間隔がリクエスト間隔(31-20)より11ms長いので、2リクエストに1回失敗していることがわかります。

具体的な実施原則

まず、下位リミッターの作成について見てみましょう。NewLimiter

func NewLimiter(r Limit, b int) *Limiter {
 return &Limiter{
  limit: r,
  burst: b,
 }
}


View Limiter のデータ構造 Limiter

// The methods AllowN, ReserveN, and WaitN consume n tokens.
type Limiter struct {
 mu sync.Mutex
 limit Limit
 burst int
 tokens float64
 // last is the last time the limiter's tokens field was updated
 last time.Time
 // lastEvent is the latest time of a rate-limited event (past or future)
 lastEvent time.Time
Time }


  • バーストはバケツの大きさを表します
  • limitはバケツを入れる頻度を示します
  • トークンは残りのトークンの数を示す
  • last トークンが取得された直近の時刻
  • lastEvent 直近の流量制限イベントの発生時刻

トークン・バケットを発行すると、次のように定義されたReservationオブジェクトに予約され、timeToAct時間に達した後に取得できるトークンの数が記述される。

type Reservation struct {
  ok bool // whether the condition is met to assign tokens
  lim *Limiter // Limiter to send the token
  tokens int // number of tokens
  timeToAct time.Time // the time to meet the token issuance
  limit Limit // speed of token issuance
}


電流リミッターによる流量制限の仕組み

公式のフローリミッターには、ブロッキング待機、直接判定、メンテナンス予約などがあります。
フローリミッターを実装する方法のコードはreserveNにあります。

使用すると、Allow() メソッドが毎回呼び出されます。

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
 return lim.AllowN(time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
 return lim.reserveN(now, n, 0).ok
}



reserverN アルゴリズムを続行する

メソッドの説明です。

  • 3つのパラメータ:now, n, maxFutureReserve
  • n トークンは現在時刻に、maxFutureReserve は maxFutureReserve になります。
  • 結果は、予約されたトークンReservationを持つオブジェクトを返します。
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
 lim.mu.Lock()
 // first determine if the frequency is infinite, if infinite, it means no limit for now
 if lim.limit == Inf {
  lim.mu.Unlock()
  return Reservation{
   ok: true,
   lim: lim,
   tokens: n,
   timeToAct: now,
  }
 }
   // The number of tokens that can be obtained as of now time, the last time the token was taken is last
 now, last, tokens := lim.advance(now)

 // Calculate the remaining number of tokens resulting from the request.
 // Update the number of tokens and remove the ones that need to be taken away
 tokens -= float64(n)

 // Calculate the wait duration
 // If the number of tokens is negative, it means it needs to wait, calculate the wait duration WaitDuration
 var waitDuration time.
 if tokens < 0 {
  waitDuration = lim.limit.durationFromTokens(-tokens)
 }

 // Decide result
 // Calculate if the allocation requirements are met
 // 1. the size to be allocated does not exceed the bucket capacity
 // 2. wait no longer than the set wait time
 ok := n <= lim.burst && waitDuration <= maxFutureReserve

 // Prepare reservation
 // finally construct a Resvervation object
 r := Reservation{
  ok: ok,
  lim: lim,
  limit: lim.limit,
 limit, }
 if ok {
  r.tokens = n
  r.timeToAct = now.Add(waitDuration)
 }

 // Update state
   // need to update the value of the current limit 
 if ok {
  lim.last = now
  lim.tokens = tokens
  lim.lastEvent = r.timeToAct
 } else {
  lim.last = last
 }

 lim.mu.Unlock()
 return r
}

実装を見る限り、現在のバケツの数を逐一更新するのではなく、リミッターは前回訪問時のバケツと現在のバケツのトークン数を記録し、再度訪問した際に前回訪問時までの現在のトークン数を計算し、トークン発行の可否を判断しているようです。

Goのトークンバケットベースのフローリミッターについては、この記事がすべてです。Goのトークンバケットフローリミッターの詳細については、スクリプトハウスの過去記事を検索するか、以下の関連記事を引き続き閲覧してください。