// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import (
	"sync/atomic"
	"time"
)

// connInflow tracks connection-level flow control for data sent by the peer to us.
//
// There are four byte offsets of significance in the stream of data received from the peer,
// each greater than or equal to the previous:
//
//   - bytes read by the user
//   - bytes received from the peer
//   - limit sent to the peer in a MAX_DATA frame
//   - potential new limit to send to the peer
//
// We maintain a flow control window, so as bytes are read by the user
// the potential limit is extended correspondingly.
//
// We keep an atomic counter of bytes read by the user and not yet applied to the
// potential limit (credit). When this count grows large enough, we update the
// new limit to send and mark that we need to send a new MAX_DATA frame.
type connInflow struct {
	sent      sentVal // set when we need to send a MAX_DATA update to the peer
	usedLimit int64   // total bytes sent by the peer, must not exceed sentLimit
	sentLimit int64   // last MAX_DATA sent to the peer
	newLimit  int64   // new MAX_DATA to send

	// credit is atomic because it is added to from user calls (Read, CloseRead)
	// as well as from the conn's loop.
	credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window
}

// inflowInit sets the starting state of connection-level inbound flow control.
//
// Both the last-sent limit and the next limit to send start out equal to
// the configured maximum connection read buffer size.
func (c *Conn) inflowInit() {
	// The peer learns this initial MAX_DATA value from our transport
	// parameters, so no MAX_DATA frame is needed to convey it.
	initial := c.config.maxConnReadBufferSize()
	c.streams.inflow.sentLimit = initial
	c.streams.inflow.newLimit = initial
}

// handleStreamBytesReadOffLoop notes that the user has consumed n bytes of
// stream data, which may allow the peer's flow control window to be extended.
//
// It runs off the conn's loop: the user reaches it indirectly through
// Read or CloseRead.
func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
	if n == 0 {
		return
	}
	total := c.streams.inflow.credit.Add(n)
	if !c.shouldUpdateFlowControl(total) {
		return
	}
	// Enough credit has accumulated to justify a MAX_DATA update.
	// The update itself has to be recorded on the Conn's main loop.
	c.sendMsg(func(_ time.Time, c *Conn) {
		// Another update may have consumed the credit by the time
		// this message runs, so re-evaluate before acting.
		pending := c.streams.inflow.credit.Load()
		if !c.shouldUpdateFlowControl(pending) {
			return
		}
		c.sendMaxDataUpdate()
	})
}

// handleStreamBytesReadOnLoop credits n bytes toward the peer's flow control
// window after stream data was discarded because of a RESET_STREAM frame.
//
// It must be called on the conn's loop.
func (c *Conn) handleStreamBytesReadOnLoop(n int64) {
	total := c.streams.inflow.credit.Add(n)
	if !c.shouldUpdateFlowControl(total) {
		return
	}
	c.sendMaxDataUpdate()
}

// sendMaxDataUpdate marks a MAX_DATA update as needing to be sent
// and folds all accumulated credit into the new limit.
func (c *Conn) sendMaxDataUpdate() {
	c.streams.inflow.sent.setUnsent()
	// appendMaxDataFrame also drains the credit, so draining it here is
	// not strictly required. Doing it now keeps further reads from
	// re-triggering this path while the MAX_DATA frame is still waiting
	// to go out.
	pending := c.streams.inflow.credit.Swap(0)
	c.streams.inflow.newLimit += pending
}

// shouldUpdateFlowControl applies the package-level shouldUpdateFlowControl
// check to the given credit, using the configured maximum connection read
// buffer size as the window size.
func (c *Conn) shouldUpdateFlowControl(credit int64) bool {
	maxWindow := c.config.maxConnReadBufferSize()
	return shouldUpdateFlowControl(maxWindow, credit)
}

// handleStreamBytesReceived accounts for n bytes of stream data sent by the peer.
//
// It returns a flow control transport error if the running total now exceeds
// the last limit we sent; the total is updated even in that case.
func (c *Conn) handleStreamBytesReceived(n int64) error {
	used := c.streams.inflow.usedLimit + n
	c.streams.inflow.usedLimit = used
	if limit := c.streams.inflow.sentLimit; used > limit {
		return localTransportError{
			code:   errFlowControl,
			reason: "stream exceeded flow control limit",
		}
	}
	return nil
}

// appendMaxDataFrame appends a MAX_DATA frame to the current packet
// if one needs to be sent.
//
// w is the writer for the packet under construction, pnum is that packet's
// number, and pto is passed through to the sent-state check that decides
// whether a frame is needed.
//
// It returns true if no more frames need appending,
// false if it could not fit a frame in the current packet.
func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool {
	if !c.streams.inflow.sent.shouldSendPTO(pto) {
		return true
	}
	// Add any unapplied credit to the new limit now.
	c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
	if !w.appendMaxDataFrame(c.streams.inflow.newLimit) {
		return false
	}
	// newLimit is an absolute offset and is exactly what the frame carries,
	// so the limit in effect for the peer is newLimit itself. Accumulating it
	// into sentLimit would overstate the limit (and do so again on every
	// retransmission), letting the peer exceed the advertised window
	// without being detected.
	c.streams.inflow.sentLimit = c.streams.inflow.newLimit
	c.streams.inflow.sent.setSent(pnum)
	return true
}

// ackOrLossMaxData updates the MAX_DATA send state with the fate
// of the packet numbered pnum.
func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
	sent := &c.streams.inflow.sent
	sent.ackLatestOrLoss(pnum, fate)
}

// connOutflow tracks connection-level flow control for data sent by us to the peer.
type connOutflow struct {
	max  int64 // largest MAX_DATA received from peer
	used int64 // total bytes of STREAM data sent to peer
}

// setMaxData records a connection-level flow control limit from the peer,
// either the initial value from transport parameters or the value of a
// MAX_DATA frame. The stored limit never decreases.
func (f *connOutflow) setMaxData(maxData int64) {
	if maxData > f.max {
		f.max = maxData
	}
}

// avail reports how many more bytes connection-level flow control
// currently allows us to send.
func (f *connOutflow) avail() int64 {
	remaining := f.max - f.used
	return remaining
}

// consume accounts for n bytes of flow control having been used.
func (f *connOutflow) consume(n int64) {
	f.used = f.used + n
}
