// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rtc

import (
	"fmt"
	"io"
	"maps"
	"math/rand"
	"net"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/pion/dtls/v3/pkg/crypto/elliptic"
	"github.com/pion/ice/v4"
	"github.com/pion/interceptor"
	"github.com/pion/interceptor/pkg/cc"
	"github.com/pion/interceptor/pkg/gcc"
	"github.com/pion/interceptor/pkg/twcc"
	"github.com/pion/rtcp"
	"github.com/pion/sdp/v3"
	"github.com/pion/webrtc/v4"
	"github.com/pkg/errors"
	"go.uber.org/atomic"
	"go.uber.org/zap/zapcore"
	"google.golang.org/protobuf/proto"

	"github.com/livekit/livekit-server/pkg/config"
	"github.com/livekit/livekit-server/pkg/rtc/transport"
	"github.com/livekit/livekit-server/pkg/rtc/types"
	"github.com/livekit/livekit-server/pkg/sfu/buffer"
	"github.com/livekit/livekit-server/pkg/sfu/bwe"
	"github.com/livekit/livekit-server/pkg/sfu/bwe/remotebwe"
	"github.com/livekit/livekit-server/pkg/sfu/bwe/sendsidebwe"
	"github.com/livekit/livekit-server/pkg/sfu/datachannel"
	sfuinterceptor "github.com/livekit/livekit-server/pkg/sfu/interceptor"
	"github.com/livekit/livekit-server/pkg/sfu/pacer"
	pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
	"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
	sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils"
	"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
	"github.com/livekit/livekit-server/pkg/utils"
	lkinterceptor "github.com/livekit/mediatransportutil/pkg/interceptor"
	lktwcc "github.com/livekit/mediatransportutil/pkg/twcc"
	"github.com/livekit/protocol/codecs/mime"
	"github.com/livekit/protocol/livekit"
	"github.com/livekit/protocol/logger"
	"github.com/livekit/protocol/logger/pionlogger"
	lksdp "github.com/livekit/protocol/sdp"
	"github.com/livekit/protocol/utils/mono"
)

const (
	LossyDataChannel     = "_lossy"
	ReliableDataChannel  = "_reliable"
	DataTrackDataChannel = "_data_track"

	fastNegotiationFrequency   = 10 * time.Millisecond
	negotiationFrequency       = 150 * time.Millisecond
	negotiationFailedTimeout   = 15 * time.Second
	dtlsRetransmissionInterval = 100 * time.Millisecond

	iceDisconnectedTimeout = 10 * time.Second                          // compatible for ice-lite with firefox client
	iceFailedTimeout       = 5 * time.Second                           // time between disconnected and failed
	iceFailedTimeoutTotal  = iceFailedTimeout + iceDisconnectedTimeout // total time between connecting and failure
	iceKeepaliveInterval   = 2 * time.Second                           // pion's default

	minTcpICEConnectTimeout = 5 * time.Second
	maxTcpICEConnectTimeout = 12 * time.Second // js-sdk has a default 15s timeout for first connection, let server detect failure earlier before that

	minConnectTimeoutAfterICE = 10 * time.Second
	maxConnectTimeoutAfterICE = 20 * time.Second // max duration for waiting pc to connect after ICE is connected

	shortConnectionThreshold = 90 * time.Second

	dataChannelBufferSize             = 65535
	lossyDataChannelMinBufferedAmount = 8 * 1024
)

var (
	ErrNoICETransport                    = errors.New("no ICE transport")
	ErrIceRestartWithoutLocalSDP         = errors.New("ICE restart without local SDP settled")
	ErrIceRestartOnClosedPeerConnection  = errors.New("ICE restart on closed peer connection")
	ErrNoTransceiver                     = errors.New("no transceiver")
	ErrNoSender                          = errors.New("no sender")
	ErrMidNotFound                       = errors.New("mid not found")
	ErrNotSynchronousLocalCandidatesMode = errors.New("not using synchronous local candidates mode")
	ErrNoRemoteDescription               = errors.New("no remote description")
	ErrNoLocalDescription                = errors.New("no local description")
	ErrInvalidSDPFragment                = errors.New("invalid sdp fragment")
	ErrNoBundleMid                       = errors.New("could not get bundle mid")
	ErrMidMismatch                       = errors.New("media mid does not match bundle mid")
	ErrICECredentialMismatch             = errors.New("ice credential mismatch")
)

// -------------------------------------------------------------------------

type signal int

const (
	signalICEGatheringComplete signal = iota
	signalLocalICECandidate
	signalRemoteICECandidate
	signalSendOffer
	signalRemoteDescriptionReceived
	signalICERestart
)

func (s signal) String() string {
	switch s {
	case signalICEGatheringComplete:
		return "ICE_GATHERING_COMPLETE"
	case signalLocalICECandidate:
		return "LOCAL_ICE_CANDIDATE"
	case signalRemoteICECandidate:
		return "REMOTE_ICE_CANDIDATE"
	case signalSendOffer:
		return "SEND_OFFER"
	case signalRemoteDescriptionReceived:
		return "REMOTE_DESCRIPTION_RECEIVED"
	case signalICERestart:
		return "ICE_RESTART"
	default:
		return fmt.Sprintf("%d", int(s))
	}
}

// -------------------------------------------------------

type event struct {
	*PCTransport
	signal signal
	data   any
}

func (e event) String() string {
	return fmt.Sprintf("PCTransport:Event{signal: %s, data: %+v}", e.signal, e.data)
}

// -------------------------------------------------------

type wrappedICECandidatePairLogger struct {
	pair *webrtc.ICECandidatePair
}

func (w wrappedICECandidatePairLogger) MarshalLogObject(e zapcore.ObjectEncoder) error {
	if w.pair == nil {
		return nil
	}

	if w.pair.Local != nil {
		e.AddString("localProtocol", w.pair.Local.Protocol.String())
		e.AddString("localCandidateType", w.pair.Local.Typ.String())
		e.AddString("localAddress", w.pair.Local.Address)
		e.AddUint16("localPort", w.pair.Local.Port)
	}
	if w.pair.Remote != nil {
		e.AddString("remoteProtocol", w.pair.Remote.Protocol.String())
		e.AddString("remoteCandidateType", w.pair.Remote.Typ.String())
		e.AddString("remoteAddress", MaybeTruncateIP(w.pair.Remote.Address))
		e.AddUint16("remotePort", w.pair.Remote.Port)
		if w.pair.Remote.RelatedAddress != "" {
			e.AddString("relatedAddress", MaybeTruncateIP(w.pair.Remote.RelatedAddress))
			e.AddUint16("relatedPort", w.pair.Remote.RelatedPort)
		}
	}
	return nil
}

// -------------------------------------------------------------------

type trackDescription struct {
	mid    string
	sender *webrtc.RTPSender
}

// PCTransport is a wrapper around PeerConnection, with some helper methods
type PCTransport struct {
	params       TransportParams
	pc           *webrtc.PeerConnection
	iceTransport *webrtc.ICETransport
	me           *webrtc.MediaEngine

	lock sync.RWMutex

	firstOfferReceived      bool
	firstOfferNoDataChannel bool
	reliableDC              *datachannel.DataChannelWriter[*webrtc.DataChannel]
	reliableDCOpened        bool
	lossyDC                 *datachannel.DataChannelWriter[*webrtc.DataChannel]
	lossyDCOpened           bool
	dataTrackDC             *datachannel.DataChannelWriter[*webrtc.DataChannel]
	unlabeledDataChannels   []*datachannel.DataChannelWriter[*webrtc.DataChannel]

	iceStartedAt               time.Time
	iceConnectedAt             time.Time
	firstConnectedAt           time.Time
	connectedAt                time.Time
	tcpICETimer                *time.Timer
	connectAfterICETimer       *time.Timer // timer to wait for pc to connect after ice connected
	resetShortConnOnICERestart atomic.Bool
	signalingRTT               atomic.Uint32 // milliseconds

	debouncedNegotiate *sfuutils.Debouncer
	debouncePending    bool
	lastNegotiate      time.Time

	onNegotiationStateChanged func(state transport.NegotiationState)

	rtxInfoExtractorFactory *sfuinterceptor.RTXInfoExtractorFactory

	// stream allocator for subscriber PC
	streamAllocator *streamallocator.StreamAllocator

	// only for subscriber PC
	bwe   bwe.BWE
	pacer pacer.Pacer

	// transceivers (senders) waiting for SetRemoteDescription (offer) to happen before
	// SetCodecPreferences can be invoked on them.
	// Pion adapts codecs/payload types from remote description.
	// If SetCodecPreferences are done before the remote description is processed,
	// it is possible that the transceiver gets payload types from media engine.
	// Subssequently if the peer sends an offer with different payload type for the
	// same codec, there could be two payload types for the same codec and the wrong
	// one could be used in the forwarding path. So, wait for `SetRemoteDescription`
	// to happen so that remote side payload types are adapted.
	sendersPendingConfigMu sync.Mutex
	sendersPendingConfig   []configureSenderParams

	previousAnswer *webrtc.SessionDescription
	// track id -> description map in previous offer sdp
	previousTrackDescription map[string]*trackDescription
	canReuseTransceiver      bool

	preferTCP atomic.Bool
	isClosed  atomic.Bool

	// used to check for offer/answer pairing,
	// i. e. every offer should have an answer before another offer can be sent
	localOfferId   atomic.Uint32
	remoteAnswerId atomic.Uint32

	remoteOfferId atomic.Uint32
	localAnswerId atomic.Uint32

	eventsQueue *utils.TypedOpsQueue[event]

	connectionDetails      *types.ICEConnectionDetails
	selectedPair           atomic.Pointer[webrtc.ICECandidatePair]
	mayFailedICEStats      []iceCandidatePairStats
	mayFailedICEStatsTimer *time.Timer

	numOutstandingAudios uint32
	numRequestSentAudios uint32
	numOutstandingVideos uint32
	numRequestSentVideos uint32

	// the following should be accessed only in event processing go routine
	cacheLocalCandidates      bool
	cachedLocalCandidates     []*webrtc.ICECandidate
	pendingRemoteCandidates   []*webrtc.ICECandidateInit
	restartAfterGathering     bool
	restartAtNextOffer        bool
	negotiationState          transport.NegotiationState
	negotiateCounter          atomic.Int32
	signalStateCheckTimer     *time.Timer
	currentOfferIceCredential string // ice user:pwd, for publish side ice restart checking
	pendingRestartIceOffer    *webrtc.SessionDescription
}

type TransportParams struct {
	Handler                       transport.Handler
	ProtocolVersion               types.ProtocolVersion
	Config                        *WebRTCConfig
	Twcc                          *lktwcc.Responder
	DirectionConfig               DirectionConfig
	CongestionControlConfig       config.CongestionControlConfig
	EnabledCodecs                 []*livekit.Codec
	Logger                        logger.Logger
	Transport                     livekit.SignalTarget
	SimTracks                     map[uint32]sfuinterceptor.SimulcastTrackInfo
	ClientInfo                    ClientInfo
	IsOfferer                     bool
	IsSendSide                    bool
	AllowPlayoutDelay             bool
	UseOneShotSignallingMode      bool
	FireOnTrackBySdp              bool
	DataChannelMaxBufferedAmount  uint64
	DatachannelSlowThreshold      int
	DatachannelLossyTargetLatency time.Duration

	// for development test
	DatachannelMaxReceiverBufferSize int

	EnableDataTracks bool
}

func newPeerConnection(
	params TransportParams,
	onBandwidthEstimator func(estimator cc.BandwidthEstimator),
) (*webrtc.PeerConnection, *webrtc.MediaEngine, *sfuinterceptor.RTXInfoExtractorFactory, error) {
	directionConfig := params.DirectionConfig
	if params.AllowPlayoutDelay {
		directionConfig.RTPHeaderExtension.Video = append(directionConfig.RTPHeaderExtension.Video, pd.PlayoutDelayURI)
	}

	// Some of the browser clients do not handle H.264 High Profile in signalling properly.
	// They still decode if the actual stream is H.264 High Profile, but do not handle it well in signalling.
	// So, disable H.264 High Profile for SUBSCRIBER peer connection to ensure it is not offered.
	me, err := createMediaEngine(params.EnabledCodecs, directionConfig, params.IsOfferer)
	if err != nil {
		return nil, nil, nil, err
	}

	se := params.Config.SettingEngine
	se.DisableMediaEngineCopy(true)
	// simulcast layer disable/enable signalled via signalling channel,
	// so disable rid pause in SDP
	se.SetIgnoreRidPauseForRecv(true)

	// Change elliptic curve to improve connectivity
	// https://github.com/pion/dtls/pull/474
	se.SetDTLSEllipticCurves(elliptic.X25519, elliptic.P384, elliptic.P256)

	// Disable close by dtls to avoid peerconnection close too early in migration
	// https://github.com/pion/webrtc/pull/2961
	se.DisableCloseByDTLS(true)

	se.DetachDataChannels()
	if params.DatachannelSlowThreshold > 0 {
		se.EnableDataChannelBlockWrite(true)
	}
	if params.DatachannelMaxReceiverBufferSize > 0 {
		se.SetSCTPMaxReceiveBufferSize(uint32(params.DatachannelMaxReceiverBufferSize))
	}
	if params.FireOnTrackBySdp {
		se.SetFireOnTrackBeforeFirstRTP(true)
	}

	if params.ClientInfo.SupportsSctpZeroChecksum() {
		se.EnableSCTPZeroChecksum(true)
	}

	//
	// Disable SRTP replay protection (https://datatracker.ietf.org/doc/html/rfc3711#page-15).
	// Needed due to lack of RTX stream support in Pion.
	//
	// When clients probe for bandwidth, there are several possible approaches
	//   1. Use padding packet (Chrome uses this)
	//   2. Use an older packet (Firefox uses this)
	// Typically, these are sent over the RTX stream and hence SRTP replay protection will not
	// trigger. As Pion does not support RTX, when firefox uses older packet for probing, they
	// trigger the replay protection.
	//
	// That results in two issues
	//   - Firefox bandwidth probing is not successful
	//   - Pion runs out of read buffer capacity - this potentially looks like a Pion issue
	//
	// NOTE: It is not required to disable RTCP replay protection, but doing it to be symmetric.
	//
	se.DisableSRTPReplayProtection(true)
	se.DisableSRTCPReplayProtection(true)
	if !params.ProtocolVersion.SupportsICELite() || !params.ClientInfo.SupportsPrflxOverRelay() {
		// if client don't support prflx over relay which is only Firefox, disable ICE Lite to ensure that
		// aggressive nomination is handled properly. Firefox does aggressive nomination even if peer is
		// ICE Lite (see comment as to historical reasons: https://github.com/pion/ice/pull/739#issuecomment-2452245066).
		// pion/ice (as of v2.3.37) will accept all use-candidate switches when in ICE Lite mode.
		// That combined with aggressive nomination from Firefox could potentially lead to the two ends
		// ending up with different candidates.
		// As Firefox does not support migration, ICE Lite can be disabled.
		se.SetLite(false)
	}
	se.SetDTLSRetransmissionInterval(dtlsRetransmissionInterval)
	se.SetICETimeouts(iceDisconnectedTimeout, iceFailedTimeout, iceKeepaliveInterval)

	// if client don't support prflx over relay, we should not expose private address to it, use single external ip as host candidate
	if !params.ClientInfo.SupportsPrflxOverRelay() && len(params.Config.NAT1To1IPs) > 0 {
		var nat1to1Ips []string
		var includeIps []string
		for _, mapping := range params.Config.NAT1To1IPs {
			if ips := strings.Split(mapping, "/"); len(ips) == 2 {
				if ips[0] != ips[1] {
					nat1to1Ips = append(nat1to1Ips, mapping)
					includeIps = append(includeIps, ips[1])
				}
			}
		}
		if len(nat1to1Ips) > 0 {
			params.Logger.Infow("client doesn't support prflx over relay, use external ip only as host candidate", "ips", nat1to1Ips)
			se.SetNAT1To1IPs(nat1to1Ips, webrtc.ICECandidateTypeHost)
			se.SetIPFilter(func(ip net.IP) bool {
				if ip.To4() == nil {
					return true
				}
				ipstr := ip.String()
				return slices.Contains(includeIps, ipstr)
			})
		}
	}

	lf := pionlogger.NewLoggerFactory(params.Logger)
	if lf != nil {
		se.LoggerFactory = lf
	}

	ir := &interceptor.Registry{}
	if params.IsSendSide {
		if params.CongestionControlConfig.UseSendSideBWEInterceptor && !params.CongestionControlConfig.UseSendSideBWE {
			params.Logger.Infow("using send side BWE - interceptor")
			gf, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) {
				return gcc.NewSendSideBWE(
					gcc.SendSideBWEInitialBitrate(1*1000*1000),
					gcc.SendSideBWEPacer(gcc.NewNoOpPacer()),
				)
			})
			if err == nil {
				gf.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
					if onBandwidthEstimator != nil {
						onBandwidthEstimator(estimator)
					}
				})
				ir.Add(gf)

				tf, err := twcc.NewHeaderExtensionInterceptor()
				if err == nil {
					ir.Add(tf)
				}
			}
		}
	}
	if !params.IsOfferer {
		// sfu only use interceptor to send XR but don't read response from it (use buffer instead),
		// so use a empty callback here
		ir.Add(lkinterceptor.NewRTTFromXRFactory(func(rtt uint32) {}))
	}
	if len(params.SimTracks) > 0 {
		f, err := sfuinterceptor.NewUnhandleSimulcastInterceptorFactory(sfuinterceptor.UnhandleSimulcastTracks(params.Logger, params.SimTracks))
		if err != nil {
			params.Logger.Warnw("NewUnhandleSimulcastInterceptorFactory failed", err)
		} else {
			ir.Add(f)
		}
	}

	setTWCCForVideo := func(info *interceptor.StreamInfo) {
		if !mime.IsMimeTypeStringVideo(info.MimeType) {
			return
		}
		// rtx stream don't have rtcp feedback, always set twcc for rtx stream
		twccFb := mime.GetMimeTypeCodec(info.MimeType) == mime.MimeTypeCodecRTX
		if !twccFb {
			for _, fb := range info.RTCPFeedback {
				if fb.Type == webrtc.TypeRTCPFBTransportCC {
					twccFb = true
					break
				}
			}
		}
		if !twccFb {
			return
		}

		twccExtID := sfuutils.GetHeaderExtensionID(info.RTPHeaderExtensions, webrtc.RTPHeaderExtensionCapability{URI: sdp.TransportCCURI})
		if twccExtID != 0 {
			if buffer := params.Config.BufferFactory.GetBuffer(info.SSRC); buffer != nil {
				params.Logger.Debugw(
					"set twcc and ext id",
					"ssrc", info.SSRC,
					"isRTX", mime.GetMimeTypeCodec(info.MimeType) == mime.MimeTypeCodecRTX,
					"twccExtID", twccExtID,
				)
				buffer.SetTWCCAndExtID(params.Twcc, uint8(twccExtID))
			} else {
				params.Logger.Warnw("failed to get buffer for stream", nil, "ssrc", info.SSRC)
			}
		}
	}
	rtxInfoExtractorFactory := sfuinterceptor.NewRTXInfoExtractorFactory(
		setTWCCForVideo,
		func(repair, base uint32, rsid string) {
			params.Logger.Debugw("rtx pair found from extension", "repair", repair, "base", base, "rsid", rsid)
			params.Config.BufferFactory.SetRTXPair(repair, base, rsid)
		},
		params.Logger,
	)
	// put rtx interceptor behind unhandle simulcast interceptor so it can get the correct mid & rid
	ir.Add(rtxInfoExtractorFactory)

	api := webrtc.NewAPI(
		webrtc.WithMediaEngine(me),
		webrtc.WithSettingEngine(se),
		webrtc.WithInterceptorRegistry(ir),
	)
	pc, err := api.NewPeerConnection(params.Config.Configuration)
	return pc, me, rtxInfoExtractorFactory, err
}

func NewPCTransport(params TransportParams) (*PCTransport, error) {
	if params.Logger == nil {
		params.Logger = logger.GetLogger()
	}
	t := &PCTransport{
		params:             params,
		debouncedNegotiate: sfuutils.NewDebouncer(negotiationFrequency),
		negotiationState:   transport.NegotiationStateNone,
		eventsQueue: utils.NewTypedOpsQueue[event](utils.OpsQueueParams{
			Name:    "transport",
			MinSize: 64,
			Logger:  params.Logger,
		}),
		previousTrackDescription: make(map[string]*trackDescription),
		canReuseTransceiver:      true,
		connectionDetails:        types.NewICEConnectionDetails(params.Transport, params.Logger),
		lastNegotiate:            time.Now(),
	}
	t.localOfferId.Store(uint32(rand.Intn(1<<8) + 1))

	bwe, err := t.createPeerConnection()
	if err != nil {
		return nil, err
	}

	if params.IsSendSide {
		if params.CongestionControlConfig.UseSendSideBWE {
			params.Logger.Infow("using send side BWE", "pacerBehavior", params.CongestionControlConfig.SendSideBWEPacer)
			t.bwe = sendsidebwe.NewSendSideBWE(sendsidebwe.SendSideBWEParams{
				Config: params.CongestionControlConfig.SendSideBWE,
				Logger: params.Logger,
			})
			switch pacer.PacerBehavior(params.CongestionControlConfig.SendSideBWEPacer) {
			case pacer.PacerBehaviorPassThrough:
				t.pacer = pacer.NewPassThrough(params.Logger, t.bwe)
			case pacer.PacerBehaviorNoQueue:
				t.pacer = pacer.NewNoQueue(params.Logger, t.bwe)
			default:
				t.pacer = pacer.NewNoQueue(params.Logger, t.bwe)
			}
		} else {
			t.bwe = remotebwe.NewRemoteBWE(remotebwe.RemoteBWEParams{
				Config: params.CongestionControlConfig.RemoteBWE,
				Logger: params.Logger,
			})
			t.pacer = pacer.NewPassThrough(params.Logger, nil)
		}

		t.streamAllocator = streamallocator.NewStreamAllocator(streamallocator.StreamAllocatorParams{
			Config:    params.CongestionControlConfig.StreamAllocator,
			BWE:       t.bwe,
			Pacer:     t.pacer,
			RTTGetter: t.GetRTT,
			Logger:    params.Logger.WithComponent(utils.ComponentCongestionControl),
		}, params.CongestionControlConfig.Enabled, params.CongestionControlConfig.AllowPause)
		t.streamAllocator.OnStreamStateChange(params.Handler.OnStreamStateChange)
		t.streamAllocator.Start()

		if bwe != nil {
			t.streamAllocator.SetSendSideBWEInterceptor(bwe)
		}
	}

	t.eventsQueue.Start()

	return t, nil
}

func (t *PCTransport) createPeerConnection() (cc.BandwidthEstimator, error) {
	var bwe cc.BandwidthEstimator
	pc, me, rtxInfoExtractorFactory, err := newPeerConnection(t.params, func(estimator cc.BandwidthEstimator) {
		bwe = estimator
	})
	if err != nil {
		return bwe, err
	}

	t.pc = pc
	if !t.params.UseOneShotSignallingMode {
		// one shot signalling mode gathers all candidates and sends in answer
		t.pc.OnICEGatheringStateChange(t.onICEGatheringStateChange)
		t.pc.OnICECandidate(t.onICECandidateTrickle)
	}
	t.pc.OnICEConnectionStateChange(t.onICEConnectionStateChange)
	t.pc.OnConnectionStateChange(t.onPeerConnectionStateChange)

	t.pc.OnDataChannel(t.onDataChannel)
	t.pc.OnTrack(t.params.Handler.OnTrack)

	t.iceTransport = t.pc.SCTP().Transport().ICETransport()
	if t.iceTransport == nil {
		return bwe, ErrNoICETransport
	}
	t.iceTransport.OnSelectedCandidatePairChange(func(pair *webrtc.ICECandidatePair) {
		t.params.Logger.Debugw("selected ICE candidate pair changed", "pair", wrappedICECandidatePairLogger{pair})
		t.connectionDetails.SetSelectedPair(pair)
		existingPair := t.selectedPair.Load()
		if existingPair != nil {
			t.params.Logger.Infow(
				"ice reconnected or switched pair",
				"existingPair", wrappedICECandidatePairLogger{existingPair},
				"newPair", wrappedICECandidatePairLogger{pair})
		}
		t.selectedPair.Store(pair)
	})

	t.me = me

	t.rtxInfoExtractorFactory = rtxInfoExtractorFactory
	return bwe, nil
}

func (t *PCTransport) RTPStreamPublished(ssrc uint32, mid, rid string) {
	t.rtxInfoExtractorFactory.SetStreamInfo(ssrc, mid, rid, "")
}

func (t *PCTransport) GetPacer() pacer.Pacer {
	return t.pacer
}

func (t *PCTransport) SetSignalingRTT(rtt uint32) {
	t.signalingRTT.Store(rtt)
}

func (t *PCTransport) setICEStartedAt(at time.Time) {
	t.lock.Lock()
	if t.iceStartedAt.IsZero() {
		t.iceStartedAt = at

		// checklist of ice agent will be cleared on ice failed, get stats before that
		t.mayFailedICEStatsTimer = time.AfterFunc(iceFailedTimeoutTotal-time.Second, t.logMayFailedICEStats)

		// set failure timer for tcp ice connection based on signaling RTT
		if t.preferTCP.Load() {
			signalingRTT := t.signalingRTT.Load()
			if signalingRTT < 1000 {
				tcpICETimeout := time.Duration(signalingRTT*8) * time.Millisecond
				if tcpICETimeout < minTcpICEConnectTimeout {
					tcpICETimeout = minTcpICEConnectTimeout
				} else if tcpICETimeout > maxTcpICEConnectTimeout {
					tcpICETimeout = maxTcpICEConnectTimeout
				}
				t.params.Logger.Debugw("set TCP ICE connect timer", "timeout", tcpICETimeout, "signalRTT", signalingRTT)
				t.tcpICETimer = time.AfterFunc(tcpICETimeout, func() {
					if t.pc.ICEConnectionState() == webrtc.ICEConnectionStateChecking {
						t.params.Logger.Infow("TCP ICE connect timeout", "timeout", tcpICETimeout, "signalRTT", signalingRTT)
						t.logMayFailedICEStats()
						t.handleConnectionFailed(true)
					}
				})
			}
		}
	}
	t.lock.Unlock()
}

func (t *PCTransport) setICEConnectedAt(at time.Time) {
	t.lock.Lock()
	if t.iceConnectedAt.IsZero() {
		//
		// Record initial connection time.
		// This prevents reset of connected at time if ICE goes `Connected` -> `Disconnected` -> `Connected`.
		//
		t.iceConnectedAt = at

		// set failure timer for dtls handshake
		iceDuration := at.Sub(t.iceStartedAt)
		connTimeoutAfterICE := min(max(minConnectTimeoutAfterICE, 3*iceDuration), maxConnectTimeoutAfterICE)
		t.params.Logger.Debugw("setting connection timer after ICE connected", "timeout", connTimeoutAfterICE, "iceDuration", iceDuration)
		t.connectAfterICETimer = time.AfterFunc(connTimeoutAfterICE, func() {
			state := t.pc.ConnectionState()
			// if pc is still checking or connected but not fully established after timeout, then fire connection fail
			if state != webrtc.PeerConnectionStateClosed && state != webrtc.PeerConnectionStateFailed && !t.isFullyEstablished() {
				t.params.Logger.Infow("connect timeout after ICE connected", "timeout", connTimeoutAfterICE, "iceDuration", iceDuration)
				t.handleConnectionFailed(false)
			}
		})

		// clear tcp ice connect timer
		if t.tcpICETimer != nil {
			t.tcpICETimer.Stop()
			t.tcpICETimer = nil
		}
	}

	if t.mayFailedICEStatsTimer != nil {
		t.mayFailedICEStatsTimer.Stop()
		t.mayFailedICEStatsTimer = nil
	}
	t.mayFailedICEStats = nil
	t.lock.Unlock()
}

func (t *PCTransport) logMayFailedICEStats() {
	if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
		return
	}

	var candidatePairStats []webrtc.ICECandidatePairStats
	pairStats := t.pc.GetStats()
	candidateStats := make(map[string]webrtc.ICECandidateStats)
	for _, stat := range pairStats {
		switch stat := stat.(type) {
		case webrtc.ICECandidatePairStats:
			candidatePairStats = append(candidatePairStats, stat)
		case webrtc.ICECandidateStats:
			candidateStats[stat.ID] = stat
		}
	}

	iceStats := make([]iceCandidatePairStats, 0, len(candidatePairStats))
	for _, pairStat := range candidatePairStats {
		iceStat := iceCandidatePairStats{ICECandidatePairStats: pairStat}
		if local, ok := candidateStats[pairStat.LocalCandidateID]; ok {
			iceStat.local = local
		}
		if remote, ok := candidateStats[pairStat.RemoteCandidateID]; ok {
			remote.IP = MaybeTruncateIP(remote.IP)
			iceStat.remote = remote
		}
		iceStats = append(iceStats, iceStat)
	}

	t.lock.Lock()
	t.mayFailedICEStats = iceStats
	t.lock.Unlock()
}

func (t *PCTransport) resetShortConn() {
	t.params.Logger.Infow("resetting short connection on ICE restart")
	t.lock.Lock()
	t.iceStartedAt = time.Time{}
	t.iceConnectedAt = time.Time{}
	t.connectedAt = time.Time{}
	if t.connectAfterICETimer != nil {
		t.connectAfterICETimer.Stop()
		t.connectAfterICETimer = nil
	}
	if t.tcpICETimer != nil {
		t.tcpICETimer.Stop()
		t.tcpICETimer = nil
	}
	t.lock.Unlock()
}

func (t *PCTransport) IsShortConnection(at time.Time) (bool, time.Duration) {
	t.lock.RLock()
	defer t.lock.RUnlock()

	if t.iceConnectedAt.IsZero() {
		return false, 0
	}

	duration := at.Sub(t.iceConnectedAt)
	return duration < shortConnectionThreshold, duration
}

func (t *PCTransport) setConnectedAt(at time.Time) bool {
	t.lock.Lock()
	t.connectedAt = at
	if !t.firstConnectedAt.IsZero() {
		t.lock.Unlock()
		return false
	}

	t.firstConnectedAt = at
	prometheus.RecordServiceOperationSuccess("peer_connection")
	t.lock.Unlock()
	return true
}

func (t *PCTransport) onICEGatheringStateChange(state webrtc.ICEGatheringState) {
	t.params.Logger.Debugw("ice gathering state change", "state", state.String())
	if state != webrtc.ICEGatheringStateComplete {
		return
	}

	t.postEvent(event{
		signal: signalICEGatheringComplete,
	})
}

func (t *PCTransport) onICECandidateTrickle(c *webrtc.ICECandidate) {
	t.postEvent(event{
		signal: signalLocalICECandidate,
		data:   c,
	})
}

func (t *PCTransport) handleConnectionFailed(forceShortConn bool) {
	isShort := forceShortConn
	if !isShort {
		var duration time.Duration
		isShort, duration = t.IsShortConnection(time.Now())
		if isShort {
			t.params.Logger.Debugw("short ICE connection", "pair", wrappedICECandidatePairLogger{t.selectedPair.Load()}, "duration", duration)
		}
	}

	t.params.Handler.OnFailed(isShort, t.GetICEConnectionInfo())
}

func (t *PCTransport) onICEConnectionStateChange(state webrtc.ICEConnectionState) {
	t.params.Logger.Debugw("ice connection state change", "state", state.String())
	switch state {
	case webrtc.ICEConnectionStateConnected:
		t.setICEConnectedAt(time.Now())

	case webrtc.ICEConnectionStateChecking:
		t.setICEStartedAt(time.Now())
	}
}

func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionState) {
	t.params.Logger.Debugw("peer connection state change", "state", state.String())
	switch state {
	case webrtc.PeerConnectionStateConnected:
		t.clearConnTimer()
		isInitialConnection := t.setConnectedAt(time.Now())
		if isInitialConnection {
			t.params.Handler.OnInitialConnected()

			t.maybeNotifyFullyEstablished()
		}
	case webrtc.PeerConnectionStateFailed:
		t.clearConnTimer()
		t.handleConnectionFailed(false)
	}
}

func (t *PCTransport) onDataChannel(dc *webrtc.DataChannel) {
	dc.OnOpen(func() {
		t.params.Logger.Debugw(dc.Label() + " data channel open")
		var kind livekit.DataPacket_Kind
		var isDataTrack bool
		var isUnlabeled bool
		switch dc.Label() {
		case ReliableDataChannel:
			kind = livekit.DataPacket_RELIABLE

		case LossyDataChannel:
			kind = livekit.DataPacket_LOSSY

		case DataTrackDataChannel:
			isDataTrack = true

		default:
			t.params.Logger.Infow("unlabeled datachannel added", "label", dc.Label())
			isUnlabeled = true
		}

		rawDC, err := dc.DetachWithDeadline()
		if err != nil {
			t.params.Logger.Errorw("failed to detach data channel", err, "label", dc.Label())
			return
		}

		isHandled := true
		t.lock.Lock()
		switch {
		case isUnlabeled:
			t.unlabeledDataChannels = append(
				t.unlabeledDataChannels,
				datachannel.NewDataChannelWriterReliable(dc, rawDC, t.params.DatachannelSlowThreshold),
			)

		case isDataTrack:
			if !t.params.EnableDataTracks {
				t.params.Logger.Debugw("data tracks not enabled")
				isHandled = false
			} else {
				if t.dataTrackDC != nil {
					t.dataTrackDC.Close()
				}
				t.dataTrackDC = datachannel.NewDataChannelWriterUnreliable(dc, rawDC, 0, 0)
			}

		case kind == livekit.DataPacket_RELIABLE:
			if t.reliableDC != nil {
				t.reliableDC.Close()
			}
			t.reliableDC = datachannel.NewDataChannelWriterReliable(dc, rawDC, t.params.DatachannelSlowThreshold)
			t.reliableDCOpened = true

		case kind == livekit.DataPacket_LOSSY:
			if t.lossyDC != nil {
				t.lossyDC.Close()
			}
			t.lossyDC = datachannel.NewDataChannelWriterUnreliable(dc, rawDC, t.params.DatachannelLossyTargetLatency, uint64(lossyDataChannelMinBufferedAmount))
			t.lossyDCOpened = true
		}
		t.lock.Unlock()

		if !isHandled {
			rawDC.Close()
			return
		}

		go func() {
			defer rawDC.Close()
			buffer := make([]byte, dataChannelBufferSize)
			for {
				n, _, err := rawDC.ReadDataChannel(buffer)
				if err != nil {
					if !errors.Is(err, io.EOF) && !strings.Contains(err.Error(), "state=Closed") {
						t.params.Logger.Warnw("error reading data channel", err, "label", dc.Label())
					}
					return
				}

				switch {
				case isUnlabeled:
					t.params.Handler.OnDataMessageUnlabeled(buffer[:n])

				case isDataTrack:
					t.params.Handler.OnDataTrackMessage(buffer[:n], mono.UnixNano())

				default:
					t.params.Handler.OnDataMessage(kind, buffer[:n])
				}
			}
		}()

		t.maybeNotifyFullyEstablished()
	})
}

func (t *PCTransport) maybeNotifyFullyEstablished() {
	if t.isFullyEstablished() {
		t.params.Handler.OnFullyEstablished()
	}
}

func (t *PCTransport) isFullyEstablished() bool {
	t.lock.RLock()
	defer t.lock.RUnlock()

	dataChannelReady := t.params.UseOneShotSignallingMode || t.firstOfferNoDataChannel || (t.reliableDCOpened && t.lossyDCOpened)

	return dataChannelReady && !t.connectedAt.IsZero()
}

func (t *PCTransport) SetPreferTCP(preferTCP bool) {
	t.preferTCP.Store(preferTCP)
}

func (t *PCTransport) AddICECandidate(candidate webrtc.ICECandidateInit) {
	t.postEvent(event{
		signal: signalRemoteICECandidate,
		data:   &candidate,
	})
}

func (t *PCTransport) queueOrConfigureSender(
	transceiver *webrtc.RTPTransceiver,
	enabledCodecs []*livekit.Codec,
	rtcpFeedbackConfig RTCPFeedbackConfig,
	enableAudioStereo bool,
	enableAudioNACK bool,
) {
	params := configureSenderParams{
		transceiver,
		enabledCodecs,
		rtcpFeedbackConfig,
		!t.params.IsOfferer,
		enableAudioStereo,
		enableAudioNACK,
	}
	if !t.params.IsOfferer {
		t.sendersPendingConfigMu.Lock()
		t.sendersPendingConfig = append(t.sendersPendingConfig, params)
		t.sendersPendingConfigMu.Unlock()
		return
	}

	configureSender(params)
}

func (t *PCTransport) processSendersPendingConfig() {
	t.sendersPendingConfigMu.Lock()
	pending := t.sendersPendingConfig
	t.sendersPendingConfig = nil
	t.sendersPendingConfigMu.Unlock()

	var unprocessed []configureSenderParams
	for _, p := range pending {
		if p.transceiver.Mid() == "" {
			unprocessed = append(unprocessed, p)
			continue
		}

		configureSender(p)
	}

	if len(unprocessed) != 0 {
		t.sendersPendingConfigMu.Lock()
		t.sendersPendingConfig = append(t.sendersPendingConfig, unprocessed...)
		t.sendersPendingConfigMu.Unlock()
	}
}

func (t *PCTransport) AddTrack(
	trackLocal webrtc.TrackLocal,
	params types.AddTrackParams,
	enabledCodecs []*livekit.Codec,
	rtcpFeedbackConfig RTCPFeedbackConfig,
) (sender *webrtc.RTPSender, transceiver *webrtc.RTPTransceiver, err error) {
	t.lock.Lock()
	canReuse := t.canReuseTransceiver
	td, ok := t.previousTrackDescription[trackLocal.ID()]
	if ok {
		delete(t.previousTrackDescription, trackLocal.ID())
	}
	t.lock.Unlock()

	// keep track use same mid after migration if possible
	if td != nil && td.sender != nil {
		for _, tr := range t.pc.GetTransceivers() {
			if tr.Mid() == td.mid {
				return td.sender, tr, tr.SetSender(td.sender, trackLocal)
			}
		}
	}

	// if never negotiated with client, can't reuse transceiver for track not subscribed before migration
	if !canReuse {
		return t.AddTransceiverFromTrack(trackLocal, params, enabledCodecs, rtcpFeedbackConfig)
	}

	sender, err = t.pc.AddTrack(trackLocal)
	if err != nil {
		return
	}

	for _, tr := range t.pc.GetTransceivers() {
		if tr.Sender() == sender {
			transceiver = tr
			break
		}
	}

	if transceiver == nil {
		err = ErrNoTransceiver
		return
	}

	t.queueOrConfigureSender(
		transceiver,
		enabledCodecs,
		rtcpFeedbackConfig,
		params.Stereo,
		!params.Red || !t.params.ClientInfo.SupportsAudioRED(),
	)

	t.adjustNumOutstandingMedia(transceiver)
	return
}

func (t *PCTransport) AddTransceiverFromTrack(
	trackLocal webrtc.TrackLocal,
	params types.AddTrackParams,
	enabledCodecs []*livekit.Codec,
	rtcpFeedbackConfig RTCPFeedbackConfig,
) (sender *webrtc.RTPSender, transceiver *webrtc.RTPTransceiver, err error) {
	transceiver, err = t.pc.AddTransceiverFromTrack(trackLocal)
	if err != nil {
		return
	}

	sender = transceiver.Sender()
	if sender == nil {
		err = ErrNoSender
		return
	}

	t.queueOrConfigureSender(
		transceiver,
		enabledCodecs,
		rtcpFeedbackConfig,
		params.Stereo,
		!params.Red || !t.params.ClientInfo.SupportsAudioRED(),
	)

	t.adjustNumOutstandingMedia(transceiver)
	return
}

func (t *PCTransport) AddTransceiverFromKind(
	kind webrtc.RTPCodecType,
	init webrtc.RTPTransceiverInit,
) (*webrtc.RTPTransceiver, error) {
	return t.pc.AddTransceiverFromKind(kind, init)
}

func (t *PCTransport) RemoveTrack(sender *webrtc.RTPSender) error {
	return t.pc.RemoveTrack(sender)
}

func (t *PCTransport) CurrentLocalDescription() *webrtc.SessionDescription {
	cld := t.pc.CurrentLocalDescription()
	if cld == nil {
		return nil
	}

	ld := *cld
	return &ld
}

func (t *PCTransport) CurrentRemoteDescription() *webrtc.SessionDescription {
	crd := t.pc.CurrentRemoteDescription()
	if crd == nil {
		return nil
	}

	rd := *crd
	return &rd
}

func (t *PCTransport) PendingRemoteDescription() *webrtc.SessionDescription {
	prd := t.pc.PendingRemoteDescription()
	if prd == nil {
		return nil
	}

	rd := *prd
	return &rd
}

func (t *PCTransport) GetMid(rtpReceiver *webrtc.RTPReceiver) string {
	tr := rtpReceiver.RTPTransceiver()
	if tr != nil {
		return tr.Mid()
	}

	return ""
}

func (t *PCTransport) GetRTPTransceiver(mid string) *webrtc.RTPTransceiver {
	for _, tr := range t.pc.GetTransceivers() {
		if tr.Mid() == mid {
			return tr
		}
	}

	return nil
}

func (t *PCTransport) GetRTPReceiver(mid string) *webrtc.RTPReceiver {
	for _, tr := range t.pc.GetTransceivers() {
		if tr.Mid() == mid {
			return tr.Receiver()
		}
	}

	return nil
}

func (t *PCTransport) getNumUnmatchedTransceivers() (uint32, uint32) {
	if t.isClosed.Load() || t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
		return 0, 0
	}

	numAudios := uint32(0)
	numVideos := uint32(0)
	for _, tr := range t.pc.GetTransceivers() {
		if tr.Mid() != "" {
			continue
		}

		switch tr.Kind() {
		case webrtc.RTPCodecTypeAudio:
			numAudios++

		case webrtc.RTPCodecTypeVideo:
			numVideos++
		}
	}

	return numAudios, numVideos
}

func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelInit) error {
	if label == DataTrackDataChannel && !t.params.EnableDataTracks {
		t.params.Logger.Debugw("data tracks not enabled")
		return nil
	}

	dc, err := t.pc.CreateDataChannel(label, dci)
	if err != nil {
		return err
	}
	var (
		dcPtr       **datachannel.DataChannelWriter[*webrtc.DataChannel]
		dcReady     *bool
		isDataTrack bool
		isUnlabeled bool
		kind        livekit.DataPacket_Kind
	)
	switch dc.Label() {
	default:
		isUnlabeled = true
		t.params.Logger.Infow("unlabeled datachannel added", "label", dc.Label())

	case ReliableDataChannel:
		dcPtr = &t.reliableDC
		dcReady = &t.reliableDCOpened
		kind = livekit.DataPacket_RELIABLE

	case LossyDataChannel:
		dcPtr = &t.lossyDC
		dcReady = &t.lossyDCOpened
		kind = livekit.DataPacket_LOSSY

	case DataTrackDataChannel:
		dcPtr = &t.dataTrackDC
		isDataTrack = true
	}

	dc.OnOpen(func() {
		rawDC, err := dc.DetachWithDeadline()
		if err != nil {
			t.params.Logger.Warnw("failed to detach data channel", err)
			return
		}

		var slowThreshold int
		if dc.Label() == ReliableDataChannel || isUnlabeled {
			slowThreshold = t.params.DatachannelSlowThreshold
		}

		t.lock.Lock()
		if isUnlabeled {
			t.unlabeledDataChannels = append(
				t.unlabeledDataChannels,
				datachannel.NewDataChannelWriterReliable(dc, rawDC, slowThreshold),
			)
		} else {
			if *dcPtr != nil {
				(*dcPtr).Close()
			}
			switch {
			case dcPtr == &t.reliableDC:
				*dcPtr = datachannel.NewDataChannelWriterReliable(dc, rawDC, slowThreshold)
			case dcPtr == &t.lossyDC:
				*dcPtr = datachannel.NewDataChannelWriterUnreliable(dc, rawDC, t.params.DatachannelLossyTargetLatency, uint64(lossyDataChannelMinBufferedAmount))
			case dcPtr == &t.dataTrackDC:
				*dcPtr = datachannel.NewDataChannelWriterUnreliable(dc, rawDC, 0, 0)
			}
			if dcReady != nil {
				*dcReady = true
			}
		}
		t.lock.Unlock()
		t.params.Logger.Debugw(dc.Label() + " data channel open")

		go func() {
			defer rawDC.Close()
			buffer := make([]byte, dataChannelBufferSize)
			for {
				n, _, err := rawDC.ReadDataChannel(buffer)
				if err != nil {
					if !errors.Is(err, io.EOF) && !strings.Contains(err.Error(), "state=Closed") {
						t.params.Logger.Warnw("error reading data channel", err, "label", dc.Label())
					}
					return
				}

				switch {
				case isUnlabeled:
					t.params.Handler.OnDataMessageUnlabeled(buffer[:n])

				case isDataTrack:
					t.params.Handler.OnDataTrackMessage(buffer[:n], mono.UnixNano())

				default:
					t.params.Handler.OnDataMessage(kind, buffer[:n])
				}
			}
		}()

		t.maybeNotifyFullyEstablished()
	})

	return nil
}

// for testing only
func (t *PCTransport) CreateReadableDataChannel(label string, dci *webrtc.DataChannelInit) error {
	dc, err := t.pc.CreateDataChannel(label, dci)
	if err != nil {
		return err
	}

	dc.OnOpen(func() {
		t.params.Logger.Debugw(dc.Label() + " data channel open")
		rawDC, err := dc.DetachWithDeadline()
		if err != nil {
			t.params.Logger.Errorw("failed to detach data channel", err, "label", dc.Label())
			return
		}

		t.lock.Lock()
		t.unlabeledDataChannels = append(
			t.unlabeledDataChannels,
			datachannel.NewDataChannelWriterReliable(dc, rawDC, t.params.DatachannelSlowThreshold),
		)
		t.lock.Unlock()

		go func() {
			defer rawDC.Close()
			buffer := make([]byte, dataChannelBufferSize)
			for {
				n, _, err := rawDC.ReadDataChannel(buffer)
				if err != nil {
					if !errors.Is(err, io.EOF) && !strings.Contains(err.Error(), "state=Closed") {
						t.params.Logger.Warnw("error reading data channel", err, "label", dc.Label())
					}
					return
				}

				t.params.Handler.OnDataMessageUnlabeled(buffer[:n])
			}
		}()
	})
	return nil
}

func (t *PCTransport) CreateDataChannelIfEmpty(dcLabel string, dci *webrtc.DataChannelInit) (label string, id uint16, existing bool, err error) {
	if dcLabel == DataTrackDataChannel && !t.params.EnableDataTracks {
		t.params.Logger.Debugw("data tracks not enabled")
		err = errors.New("data tracks not enabled")
		return
	}

	t.lock.RLock()
	var dcw *datachannel.DataChannelWriter[*webrtc.DataChannel]
	switch dcLabel {
	case ReliableDataChannel:
		dcw = t.reliableDC
	case LossyDataChannel:
		dcw = t.lossyDC
	case DataTrackDataChannel:
		dcw = t.dataTrackDC
	default:
		t.params.Logger.Warnw("unknown data channel label", nil, "label", label)
		err = errors.New("unknown data channel label")
	}
	t.lock.RUnlock()
	if err != nil {
		return
	}

	if dcw != nil {
		dc := dcw.BufferedAmountGetter()
		return dc.Label(), *dc.ID(), true, nil
	}

	dc, err := t.pc.CreateDataChannel(dcLabel, dci)
	if err != nil {
		return
	}

	t.onDataChannel(dc)
	return dc.Label(), *dc.ID(), false, nil
}

func (t *PCTransport) GetRTT() (float64, bool) {
	scps, ok := t.iceTransport.GetSelectedCandidatePairStats()
	if !ok {
		return 0.0, false
	}

	return scps.CurrentRoundTripTime, true
}

func (t *PCTransport) IsEstablished() bool {
	return t.pc.ConnectionState() != webrtc.PeerConnectionStateNew
}

func (t *PCTransport) HasEverConnected() bool {
	t.lock.RLock()
	defer t.lock.RUnlock()

	return !t.firstConnectedAt.IsZero()
}

func (t *PCTransport) GetICEConnectionInfo() *types.ICEConnectionInfo {
	return t.connectionDetails.GetInfo()
}

func (t *PCTransport) GetICEConnectionType() types.ICEConnectionType {
	return t.connectionDetails.GetConnectionType()
}

func (t *PCTransport) WriteRTCP(pkts []rtcp.Packet) error {
	return t.pc.WriteRTCP(pkts)
}

func (t *PCTransport) SendDataMessage(kind livekit.DataPacket_Kind, data []byte) error {
	convertFromUserPacket := false
	var dc *datachannel.DataChannelWriter[*webrtc.DataChannel]
	t.lock.RLock()
	if t.params.UseOneShotSignallingMode {
		if len(t.unlabeledDataChannels) > 0 {
			// use the first unlabeled to send
			dc = t.unlabeledDataChannels[0]
		}
		convertFromUserPacket = true
	} else {
		if kind == livekit.DataPacket_RELIABLE {
			dc = t.reliableDC
		} else {
			dc = t.lossyDC
		}
	}
	t.lock.RUnlock()

	if convertFromUserPacket {
		dp := &livekit.DataPacket{}
		if err := proto.Unmarshal(data, dp); err != nil {
			return err
		}

		switch payload := dp.Value.(type) {
		case *livekit.DataPacket_User:
			return t.sendDataMessage(dc, payload.User.Payload)
		default:
			return errors.New("cannot forward non user data packet")
		}
	}

	return t.sendDataMessage(dc, data)
}

func (t *PCTransport) SendDataMessageUnlabeled(data []byte, useRaw bool, sender livekit.ParticipantIdentity) error {
	convertToUserPacket := false
	var dc *datachannel.DataChannelWriter[*webrtc.DataChannel]
	t.lock.RLock()
	if t.params.UseOneShotSignallingMode || useRaw {
		if len(t.unlabeledDataChannels) > 0 {
			// use the first unlabeled to send
			dc = t.unlabeledDataChannels[0]
		}
	} else {
		if t.reliableDC != nil {
			dc = t.reliableDC
		} else if t.lossyDC != nil {
			dc = t.lossyDC
		}

		convertToUserPacket = true
	}
	t.lock.RUnlock()

	if convertToUserPacket {
		dpData, err := proto.Marshal(&livekit.DataPacket{
			ParticipantIdentity: string(sender),
			Value: &livekit.DataPacket_User{
				User: &livekit.UserPacket{Payload: data},
			},
		})
		if err != nil {
			return err
		}
		return t.sendDataMessage(dc, dpData)
	}

	return t.sendDataMessage(dc, data)
}

func (t *PCTransport) SendDataTrackMessage(data []byte) error {
	t.lock.RLock()
	dc := t.dataTrackDC
	t.lock.RUnlock()

	return t.sendDataMessage(dc, data)
}

func (t *PCTransport) sendDataMessage(dc *datachannel.DataChannelWriter[*webrtc.DataChannel], data []byte) error {
	if dc == nil {
		return ErrDataChannelUnavailable
	}

	if t.pc.ConnectionState() == webrtc.PeerConnectionStateFailed {
		return ErrTransportFailure
	}

	if t.params.DatachannelSlowThreshold == 0 && t.params.DataChannelMaxBufferedAmount > 0 && dc.BufferedAmountGetter().BufferedAmount() > t.params.DataChannelMaxBufferedAmount {
		return ErrDataChannelBufferFull
	}
	_, err := dc.Write(data)
	return err
}

func (t *PCTransport) Close() {
	if t.isClosed.Swap(true) {
		return
	}

	<-t.eventsQueue.Stop()
	t.clearSignalStateCheckTimer()

	if t.streamAllocator != nil {
		t.streamAllocator.Stop()
	}

	if t.pacer != nil {
		t.pacer.Stop()
	}

	t.clearConnTimer()

	t.lock.Lock()
	if t.mayFailedICEStatsTimer != nil {
		t.mayFailedICEStatsTimer.Stop()
		t.mayFailedICEStatsTimer = nil
	}

	if t.reliableDC != nil {
		t.reliableDC.Close()
		t.reliableDC = nil
	}

	if t.lossyDC != nil {
		t.lossyDC.Close()
		t.lossyDC = nil
	}

	if t.dataTrackDC != nil {
		t.dataTrackDC.Close()
		t.dataTrackDC = nil
	}

	for _, dc := range t.unlabeledDataChannels {
		dc.Close()
	}
	t.unlabeledDataChannels = nil
	t.lock.Unlock()

	if err := t.pc.Close(); err != nil {
		t.params.Logger.Warnw("unclean close of peer connection", err)
	}

	t.outputAndClearICEStats()
}

func (t *PCTransport) clearConnTimer() {
	t.lock.Lock()
	defer t.lock.Unlock()

	if t.connectAfterICETimer != nil {
		t.connectAfterICETimer.Stop()
		t.connectAfterICETimer = nil
	}

	if t.tcpICETimer != nil {
		t.tcpICETimer.Stop()
		t.tcpICETimer = nil
	}
}

func (t *PCTransport) HandleRemoteDescription(sd webrtc.SessionDescription, remoteId uint32) error {
	if t.params.UseOneShotSignallingMode {
		if sd.Type == webrtc.SDPTypeOffer {
			remoteOfferId := t.remoteOfferId.Load()
			if remoteOfferId != 0 && remoteOfferId != t.localAnswerId.Load() {
				t.params.Logger.Warnw(
					"sdp state: multiple offers without answer", nil,
					"remoteOfferId", remoteOfferId,
					"localAnswerId", t.localAnswerId.Load(),
					"receivedRemoteOfferId", remoteId,
				)
			}
			t.remoteOfferId.Store(remoteId)
		} else {
			if remoteId != 0 && remoteId != t.localOfferId.Load() {
				t.params.Logger.Warnw("sdp state: answer id mismatch", nil, "expected", t.localOfferId.Load(), "got", remoteId)
			}
			t.remoteAnswerId.Store(remoteId)
		}

		// add remote candidates to ICE connection details
		parsed, err := sd.Unmarshal()
		if err == nil {
			addRemoteICECandidates := func(attrs []sdp.Attribute) {
				for _, a := range attrs {
					if a.IsICECandidate() {
						c, err := ice.UnmarshalCandidate(a.Value)
						if err != nil {
							continue
						}
						t.connectionDetails.AddRemoteICECandidate(c, false, false, false)
					}
				}
			}

			addRemoteICECandidates(parsed.Attributes)
			for _, m := range parsed.MediaDescriptions {
				addRemoteICECandidates(m.Attributes)
			}
		}

		err = t.pc.SetRemoteDescription(sd)
		if err != nil {
			t.params.Logger.Errorw("could not set remote description on synchronous mode peer connection", err)
			return err
		}

		rtxRepairs := nonSimulcastRTXRepairsFromSDP(parsed, t.params.Logger)
		if len(rtxRepairs) > 0 {
			t.params.Logger.Debugw("rtx pairs found from sdp", "ssrcs", rtxRepairs)
			for repair, base := range rtxRepairs {
				t.params.Config.BufferFactory.SetRTXPair(repair, base, "")
			}
		}
		return nil
	}

	t.postEvent(event{
		signal: signalRemoteDescriptionReceived,
		data: remoteDescriptionData{
			sessionDescription: &sd,
			remoteId:           remoteId,
		},
	})
	return nil
}

func (t *PCTransport) GetAnswer() (webrtc.SessionDescription, uint32, error) {
	if !t.params.UseOneShotSignallingMode {
		return webrtc.SessionDescription{}, 0, ErrNotSynchronousLocalCandidatesMode
	}

	prd := t.pc.PendingRemoteDescription()
	if prd == nil || prd.Type != webrtc.SDPTypeOffer {
		return webrtc.SessionDescription{}, 0, ErrNoRemoteDescription
	}

	answer, err := t.pc.CreateAnswer(nil)
	if err != nil {
		return webrtc.SessionDescription{}, 0, err
	}

	if err = t.pc.SetLocalDescription(answer); err != nil {
		return webrtc.SessionDescription{}, 0, err
	}

	// wait for gathering to complete to include all candidates in the answer
	<-webrtc.GatheringCompletePromise(t.pc)

	cld := t.pc.CurrentLocalDescription()

	// add local candidates to ICE connection details
	parsed, err := cld.Unmarshal()
	if err == nil {
		addLocalICECandidates := func(attrs []sdp.Attribute) {
			for _, a := range attrs {
				if a.IsICECandidate() {
					c, err := ice.UnmarshalCandidate(a.Value)
					if err != nil {
						continue
					}
					t.connectionDetails.AddLocalICECandidate(c, false, false)
				}
			}
		}

		addLocalICECandidates(parsed.Attributes)
		for _, m := range parsed.MediaDescriptions {
			addLocalICECandidates(m.Attributes)
		}
	}

	answerId := t.remoteOfferId.Load()
	t.localAnswerId.Store(answerId)

	return *cld, answerId, nil
}

func (t *PCTransport) GetICESessionUfrag() (string, error) {
	cld := t.pc.CurrentLocalDescription()
	if cld == nil {
		return "", ErrNoLocalDescription
	}

	parsed, err := cld.Unmarshal()
	if err != nil {
		return "", err
	}

	ufrag, _, err := lksdp.ExtractICECredential(parsed)
	if err != nil {
		return "", err
	}

	return ufrag, nil
}

// Handles SDP Fragment for ICE Trickle in WHIP
func (t *PCTransport) HandleICETrickleSDPFragment(sdpFragment string) error {
	if !t.params.UseOneShotSignallingMode {
		return ErrNotSynchronousLocalCandidatesMode
	}

	parsedFragment := &lksdp.SDPFragment{}
	if err := parsedFragment.Unmarshal(sdpFragment); err != nil {
		t.params.Logger.Warnw("could not parse SDP fragment", err, "sdpFragment", sdpFragment)
		return ErrInvalidSDPFragment
	}

	crd := t.pc.CurrentRemoteDescription()
	if crd == nil {
		t.params.Logger.Warnw("no remote description", nil)
		return ErrNoRemoteDescription
	}

	parsedRemote, err := crd.Unmarshal()
	if err != nil {
		t.params.Logger.Warnw("could not parse remote description", err, "offer", crd)
		return err
	}

	// check if BUNDLE mid matches the "mid" in the SDP fragment
	bundleMid, found := lksdp.GetBundleMid(parsedRemote)
	if !found {
		return ErrNoBundleMid
	}

	if parsedFragment.Mid() != bundleMid {
		t.params.Logger.Warnw("incorrect mid", nil, "sdpFragment", sdpFragment)
		return ErrMidMismatch
	}

	fragmentICEUfrag, fragmentICEPwd, err := parsedFragment.ExtractICECredential()
	if err != nil {
		t.params.Logger.Warnw(
			"could not get ICE crendential from fragment", err,
			"sdpFragment", sdpFragment,
		)
		return ErrInvalidSDPFragment
	}
	remoteICEUfrag, remoteICEPwd, err := lksdp.ExtractICECredential(parsedRemote)
	if err != nil {
		t.params.Logger.Warnw("could not get ICE crendential from remote description", err, "sdpFragment", sdpFragment, "remoteDescription", crd)
		return err
	}
	if fragmentICEUfrag != "" && fragmentICEUfrag != remoteICEUfrag {
		t.params.Logger.Warnw(
			"ice ufrag mismatch", nil,
			"remoteICEUfrag", remoteICEUfrag,
			"fragmentICEUfrag", fragmentICEUfrag,
			"sdpFragment", sdpFragment,
			"remoteDescription", crd,
		)
		return ErrICECredentialMismatch
	}
	if fragmentICEPwd != "" && fragmentICEPwd != remoteICEPwd {
		t.params.Logger.Warnw(
			"ice pwd mismatch", nil,
			"remoteICEPwd", remoteICEPwd,
			"fragmentICEPwd", fragmentICEPwd,
			"sdpFragment", sdpFragment,
			"remoteDescription", crd,
		)
		return ErrICECredentialMismatch
	}

	// add candidates from media description
	for _, ic := range parsedFragment.Candidates() {
		c, err := ice.UnmarshalCandidate(ic)
		if err == nil {
			t.connectionDetails.AddRemoteICECandidate(c, false, false, false)
		}

		candidate := webrtc.ICECandidateInit{
			Candidate: ic,
		}
		if err := t.pc.AddICECandidate(candidate); err != nil {
			t.params.Logger.Warnw("failed to add ICE candidate", err, "candidate", candidate)
		} else {
			t.params.Logger.Debugw("added ICE candidate", "candidate", candidate)
		}
	}
	return nil
}

// Handles SDP Fragment for ICE Restart in WHIP
func (t *PCTransport) HandleICERestartSDPFragment(sdpFragment string) (string, error) {
	if !t.params.UseOneShotSignallingMode {
		return "", ErrNotSynchronousLocalCandidatesMode
	}

	parsedFragment := &lksdp.SDPFragment{}
	if err := parsedFragment.Unmarshal(sdpFragment); err != nil {
		t.params.Logger.Warnw("could not parse SDP fragment", err, "sdpFragment", sdpFragment)
		return "", ErrInvalidSDPFragment
	}

	crd := t.pc.CurrentRemoteDescription()
	if crd == nil {
		t.params.Logger.Warnw("no remote description", nil)
		return "", ErrNoRemoteDescription
	}

	parsedRemote, err := crd.Unmarshal()
	if err != nil {
		t.params.Logger.Warnw("could not parse remote description", err, "offer", crd)
		return "", err
	}

	if err := parsedFragment.PatchICECredentialAndCandidatesIntoSDP(parsedRemote); err != nil {
		t.params.Logger.Warnw("could not patch SDP fragment into remote description", err, "offer", crd, "sdpFragment", sdpFragment)
		return "", err
	}

	bytes, err := parsedRemote.Marshal()
	if err != nil {
		t.params.Logger.Warnw("could not marshal SDP with patched remote", err)
		return "", err
	}
	sd := webrtc.SessionDescription{
		SDP:  string(bytes),
		Type: webrtc.SDPTypeOffer,
	}
	if err := t.pc.SetRemoteDescription(sd); err != nil {
		t.params.Logger.Warnw("could not set remote description", err)
		return "", err
	}

	// clear out connection details on ICE restart and re-populate
	t.connectionDetails.Clear()
	for _, candidate := range parsedFragment.Candidates() {
		c, err := ice.UnmarshalCandidate(candidate)
		if err != nil {
			continue
		}
		t.connectionDetails.AddRemoteICECandidate(c, false, false, false)
	}

	ans, err := t.pc.CreateAnswer(nil)
	if err != nil {
		t.params.Logger.Warnw("could not create answer", err)
		return "", err
	}

	if err = t.pc.SetLocalDescription(ans); err != nil {
		t.params.Logger.Warnw("could not set local description", err)
		return "", err
	}

	// wait for gathering to complete to include all candidates in the answer
	<-webrtc.GatheringCompletePromise(t.pc)

	cld := t.pc.CurrentLocalDescription()

	// add local candidates to ICE connection details
	parsedAnswer, err := cld.Unmarshal()
	if err != nil {
		t.params.Logger.Warnw("could not parse local description", err)
		return "", err
	}

	addLocalICECandidates := func(attrs []sdp.Attribute) {
		for _, a := range attrs {
			if a.IsICECandidate() {
				c, err := ice.UnmarshalCandidate(a.Value)
				if err != nil {
					continue
				}
				t.connectionDetails.AddLocalICECandidate(c, false, false)
			}
		}
	}

	addLocalICECandidates(parsedAnswer.Attributes)
	for _, m := range parsedAnswer.MediaDescriptions {
		addLocalICECandidates(m.Attributes)
	}

	parsedFragmentAnswer, err := lksdp.ExtractSDPFragment(parsedAnswer)
	if err != nil {
		t.params.Logger.Warnw("could not extract SDP fragment", err)
		return "", err
	}

	answerFragment, err := parsedFragmentAnswer.Marshal()
	if err != nil {
		t.params.Logger.Warnw("could not marshal answer SDP fragment", err)
		return "", err
	}

	return answerFragment, nil
}

func (t *PCTransport) OnNegotiationStateChanged(f func(state transport.NegotiationState)) {
	t.lock.Lock()
	t.onNegotiationStateChanged = f
	t.lock.Unlock()
}

func (t *PCTransport) getOnNegotiationStateChanged() func(state transport.NegotiationState) {
	t.lock.RLock()
	defer t.lock.RUnlock()

	return t.onNegotiationStateChanged
}

func (t *PCTransport) Negotiate(force bool) {
	if t.isClosed.Load() {
		return
	}

	var postEvent bool
	t.lock.Lock()
	if force {
		t.debouncedNegotiate.Add(func() {
			// no op to cancel pending negotiation
		})
		t.debouncePending = false
		t.updateLastNegotiateLocked()

		postEvent = true
	} else {
		if !t.debouncePending {
			if time.Since(t.lastNegotiate) > negotiationFrequency {
				t.debouncedNegotiate.SetDuration(fastNegotiationFrequency)
			} else {
				t.debouncedNegotiate.SetDuration(negotiationFrequency)
			}

			t.debouncedNegotiate.Add(func() {
				t.lock.Lock()
				t.debouncePending = false
				t.updateLastNegotiateLocked()
				t.lock.Unlock()

				t.postEvent(event{
					signal: signalSendOffer,
				})
			})
			t.debouncePending = true
		}
	}
	t.lock.Unlock()

	if postEvent {
		t.postEvent(event{
			signal: signalSendOffer,
		})
	}
}

func (t *PCTransport) updateLastNegotiateLocked() {
	if now := time.Now(); now.After(t.lastNegotiate) {
		t.lastNegotiate = now
	}
}

func (t *PCTransport) ICERestart() error {
	if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
		t.params.Logger.Warnw("trying to restart ICE on closed peer connection", nil)
		return ErrIceRestartOnClosedPeerConnection
	}

	t.postEvent(event{
		signal: signalICERestart,
	})
	return nil
}

func (t *PCTransport) ResetShortConnOnICERestart() {
	t.resetShortConnOnICERestart.Store(true)
}

func (t *PCTransport) AddTrackToStreamAllocator(subTrack types.SubscribedTrack) {
	if t.streamAllocator == nil {
		return
	}

	layers := buffer.GetVideoLayersForMimeType(
		subTrack.DownTrack().Mime(),
		subTrack.MediaTrack().ToProto(),
	)
	t.streamAllocator.AddTrack(subTrack.DownTrack(), streamallocator.AddTrackParams{
		Source:         subTrack.MediaTrack().Source(),
		IsMultiLayered: len(layers) > 1,
		PublisherID:    subTrack.MediaTrack().PublisherID(),
	})
}

func (t *PCTransport) RemoveTrackFromStreamAllocator(subTrack types.SubscribedTrack) {
	if t.streamAllocator == nil {
		return
	}

	t.streamAllocator.RemoveTrack(subTrack.DownTrack())
}

func (t *PCTransport) SetAllowPauseOfStreamAllocator(allowPause bool) {
	if t.streamAllocator == nil {
		return
	}

	t.streamAllocator.SetAllowPause(allowPause)
}

func (t *PCTransport) SetChannelCapacityOfStreamAllocator(channelCapacity int64) {
	if t.streamAllocator == nil {
		return
	}

	t.streamAllocator.SetChannelCapacity(channelCapacity)
}

func (t *PCTransport) preparePC(previousAnswer webrtc.SessionDescription) error {
	// sticky data channel to first m-lines, if someday we don't send sdp without media streams to
	// client's subscribe pc after joining, should change this step
	parsed, err := previousAnswer.Unmarshal()
	if err != nil {
		return err
	}
	fp, fpHahs, err := lksdp.ExtractFingerprint(parsed)
	if err != nil {
		return err
	}

	offer, err := t.pc.CreateOffer(nil)
	if err != nil {
		return err
	}
	if err := t.pc.SetLocalDescription(offer); err != nil {
		return err
	}

	//
	// Simulate client side peer connection and set DTLS role from previous answer.
	// Role needs to be set properly (one side needs to be server and the other side
	// needs to be the client) for DTLS connection to form properly. As this is
	// trying to replicate previous setup, read from previous answer and use that role.
	//
	se := webrtc.SettingEngine{}
	_ = se.SetAnsweringDTLSRole(lksdp.ExtractDTLSRole(parsed))
	se.SetIgnoreRidPauseForRecv(true)
	api := webrtc.NewAPI(
		webrtc.WithSettingEngine(se),
		webrtc.WithMediaEngine(t.me),
	)
	pc2, err := api.NewPeerConnection(webrtc.Configuration{
		SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
	})
	if err != nil {
		return err
	}
	defer pc2.Close()

	if err := pc2.SetRemoteDescription(offer); err != nil {
		return err
	}
	ans, err := pc2.CreateAnswer(nil)
	if err != nil {
		return err
	}

	// replace client's fingerprint into dummy pc's answer, for pion's dtls process, it will
	// keep the fingerprint at first call of SetRemoteDescription, if dummy pc and client pc use
	// different fingerprint, that will cause pion denied dtls data after handshake with client
	// complete (can't pass fingerprint change).
	// in this step, we don't established connection with dummy pc(no candidate swap), just use
	// sdp negotiation to sticky data channel and keep client's fingerprint
	parsedAns, _ := ans.Unmarshal()
	fpLine := fpHahs + " " + fp
	replaceFP := func(attrs []sdp.Attribute, fpLine string) {
		for k := range attrs {
			if attrs[k].Key == "fingerprint" {
				attrs[k].Value = fpLine
			}
		}
	}
	replaceFP(parsedAns.Attributes, fpLine)
	for _, m := range parsedAns.MediaDescriptions {
		replaceFP(m.Attributes, fpLine)
	}
	bytes, err := parsedAns.Marshal()
	if err != nil {
		return err
	}
	ans.SDP = string(bytes)

	return t.pc.SetRemoteDescription(ans)
}

func (t *PCTransport) initPCWithPreviousAnswer(previousAnswer webrtc.SessionDescription) (map[string]*webrtc.RTPSender, error) {
	senders := make(map[string]*webrtc.RTPSender)
	parsed, err := previousAnswer.Unmarshal()
	if err != nil {
		return senders, err
	}
	for _, m := range parsed.MediaDescriptions {
		var codecType webrtc.RTPCodecType
		switch m.MediaName.Media {
		case "video":
			codecType = webrtc.RTPCodecTypeVideo
		case "audio":
			codecType = webrtc.RTPCodecTypeAudio
		case "application":
			if t.params.IsOfferer {
				// for pion generate unmatched sdp, it always appends data channel to last m-lines,
				// that not consistent with our previous answer that data channel might at middle-line
				// because sdp can negotiate multi times before migration.(it will sticky to the last m-line at first negotiate)
				// so use a dummy pc to negotiate sdp to fixed the datachannel's mid at same position with previous answer
				if err := t.preparePC(previousAnswer); err != nil {
					t.params.Logger.Warnw("prepare pc for migration failed", err)
					return senders, err
				}
			}
			continue
		default:
			continue
		}

		if !t.params.IsOfferer {
			// `sendrecv` or `sendonly` means this transceiver is used for sending

			// Note that a transceiver previously used to send could be `inactive`.
			// Let those transceivers be created when remote description is set.
			_, ok1 := m.Attribute(webrtc.RTPTransceiverDirectionSendrecv.String())
			_, ok2 := m.Attribute(webrtc.RTPTransceiverDirectionSendonly.String())
			if !ok1 && !ok2 {
				continue
			}
		}

		tr, err := t.pc.AddTransceiverFromKind(
			codecType,
			webrtc.RTPTransceiverInit{
				Direction: webrtc.RTPTransceiverDirectionSendonly,
			},
		)
		if err != nil {
			return senders, err
		}
		mid := lksdp.GetMidValue(m)
		if mid == "" {
			return senders, ErrMidNotFound
		}
		tr.SetMid(mid)

		// save mid -> senders for migration reuse
		sender := tr.Sender()
		senders[mid] = sender

		// set transceiver to inactive
		tr.SetSender(sender, nil)
	}
	return senders, nil
}

func (t *PCTransport) SetPreviousSdp(localDescription, remoteDescription *webrtc.SessionDescription) {
	// when there is no answer, cannot migrate, force a full reconnect
	if (t.params.IsOfferer && remoteDescription == nil) || (!t.params.IsOfferer && localDescription == nil) {
		t.onNegotiationFailed(true, "no previous answer")
		return
	}

	t.lock.Lock()
	var (
		senders   map[string]*webrtc.RTPSender
		err       error
		parseMids bool
	)
	if t.params.IsOfferer {
		if t.pc.RemoteDescription() == nil && t.previousAnswer == nil {
			t.previousAnswer = remoteDescription
			senders, err = t.initPCWithPreviousAnswer(*remoteDescription)
			parseMids = true
		}
	} else {
		if t.pc.LocalDescription() == nil {
			senders, err = t.initPCWithPreviousAnswer(*localDescription)
			parseMids = true
		}
	}
	if err != nil {
		t.lock.Unlock()
		t.onNegotiationFailed(true, fmt.Sprintf("initPCWithPreviousAnswer failed, error: %s", err))
		return
	}

	if localDescription != nil && parseMids {
		// in migration case, can't reuse transceiver before negotiating excepted tracks
		// that were subscribed at previous node
		t.canReuseTransceiver = false
		if err := t.parseTrackMid(*localDescription, senders); err != nil {
			t.params.Logger.Warnw(
				"parse previous local description failed", err,
				"localDescription", localDescription.SDP,
			)
		}
	}

	if t.params.IsOfferer {
		// disable fast negotiation temporarily after migration to avoid sending offer
		// contains part of subscribed tracks before migration, let the subscribed track
		// resume at the same time.
		t.lastNegotiate = time.Now().Add(iceFailedTimeoutTotal)
	}
	t.lock.Unlock()
}

func (t *PCTransport) parseTrackMid(sd webrtc.SessionDescription, senders map[string]*webrtc.RTPSender) error {
	parsed, err := sd.Unmarshal()
	if err != nil {
		return err
	}

	t.previousTrackDescription = make(map[string]*trackDescription)
	for _, m := range parsed.MediaDescriptions {
		msid, ok := m.Attribute(sdp.AttrKeyMsid)
		if !ok {
			continue
		}

		if split := strings.Split(msid, " "); len(split) == 2 {
			trackID := split[1]
			mid := lksdp.GetMidValue(m)
			if mid == "" {
				return ErrMidNotFound
			}
			if sender, ok := senders[mid]; ok {
				t.previousTrackDescription[trackID] = &trackDescription{mid, sender}
			}
		}
	}
	return nil
}

func (t *PCTransport) postEvent(e event) {
	e.PCTransport = t
	t.eventsQueue.Enqueue(func(e event) {
		var err error
		switch e.signal {
		case signalICEGatheringComplete:
			err = e.handleICEGatheringComplete(e)
		case signalLocalICECandidate:
			err = e.handleLocalICECandidate(e)
		case signalRemoteICECandidate:
			err = e.handleRemoteICECandidate(e)
		case signalSendOffer:
			err = e.handleSendOffer(e)
		case signalRemoteDescriptionReceived:
			err = e.handleRemoteDescriptionReceived(e)
		case signalICERestart:
			err = e.handleICERestart(e)
		}
		if err != nil {
			if !e.isClosed.Load() {
				e.onNegotiationFailed(true, fmt.Sprintf("error handling event. err: %s, event: %s", err, e))
			}
		}
	}, e)
}

func (t *PCTransport) handleICEGatheringComplete(_ event) error {
	if t.params.IsOfferer {
		return t.handleICEGatheringCompleteOfferer()
	} else {
		return t.handleICEGatheringCompleteAnswerer()
	}
}

func (t *PCTransport) handleICEGatheringCompleteOfferer() error {
	if !t.restartAfterGathering {
		return nil
	}

	t.params.Logger.Debugw("restarting ICE after ICE gathering")
	t.restartAfterGathering = false
	return t.doICERestart()
}

func (t *PCTransport) handleICEGatheringCompleteAnswerer() error {
	if t.pendingRestartIceOffer == nil {
		return nil
	}

	offer := *t.pendingRestartIceOffer
	t.pendingRestartIceOffer = nil

	t.params.Logger.Debugw("accept remote restart ice offer after ICE gathering")
	if err := t.setRemoteDescription(offer); err != nil {
		return err
	}
	t.params.Handler.OnSetRemoteDescriptionOffer()
	t.processSendersPendingConfig()

	return t.createAndSendAnswer()
}

func (t *PCTransport) localDescriptionSent() error {
	if !t.cacheLocalCandidates {
		return nil
	}

	t.cacheLocalCandidates = false

	cachedLocalCandidates := t.cachedLocalCandidates
	t.cachedLocalCandidates = nil

	for _, c := range cachedLocalCandidates {
		if err := t.params.Handler.OnICECandidate(c, t.params.Transport); err != nil {
			t.params.Logger.Warnw("failed to send cached ICE candidate", err, "candidate", c)
			return err
		}
	}
	return nil
}

func (t *PCTransport) clearLocalDescriptionSent() {
	t.cacheLocalCandidates = true
	t.cachedLocalCandidates = nil
	t.connectionDetails.Clear()
}

func (t *PCTransport) handleLocalICECandidate(e event) error {
	c := e.data.(*webrtc.ICECandidate)

	filtered := false
	if c != nil {
		if t.preferTCP.Load() && c.Protocol != webrtc.ICEProtocolTCP {
			t.params.Logger.Debugw("filtering out local candidate", "candidate", c.String())
			filtered = true
		}
		t.connectionDetails.AddLocalCandidate(c, filtered, true)
	}

	if filtered {
		return nil
	}

	if t.cacheLocalCandidates {
		t.cachedLocalCandidates = append(t.cachedLocalCandidates, c)
		return nil
	}

	if err := t.params.Handler.OnICECandidate(c, t.params.Transport); err != nil {
		t.params.Logger.Warnw("failed to send ICE candidate", err, "candidate", c)
		return err
	}

	return nil
}

func (t *PCTransport) handleRemoteICECandidate(e event) error {
	c := e.data.(*webrtc.ICECandidateInit)

	filtered := false
	if t.preferTCP.Load() && !strings.Contains(strings.ToLower(c.Candidate), "tcp") {
		t.params.Logger.Debugw("filtering out remote candidate", "candidate", c.Candidate)
		filtered = true
	}

	if !t.params.Config.UseMDNS && types.IsCandidateMDNS(*c) {
		t.params.Logger.Debugw("ignoring mDNS candidate", "candidate", c.Candidate)
		filtered = true
	}

	t.connectionDetails.AddRemoteCandidate(*c, filtered, true, false)
	if filtered {
		return nil
	}

	if t.pc.RemoteDescription() == nil {
		t.pendingRemoteCandidates = append(t.pendingRemoteCandidates, c)
		return nil
	}

	if err := t.pc.AddICECandidate(*c); err != nil {
		t.params.Logger.Warnw("failed to add ICE candidate", err, "candidate", c)
		// ignore ParseAddr error as it does not affect ICE connectivity
		if !strings.Contains(err.Error(), "ParseAddr") {
			return errors.Wrap(err, "add ice candidate failed")
		}
	} else {
		t.params.Logger.Debugw("added ICE candidate", "candidate", c)
	}

	return nil
}

func (t *PCTransport) setNegotiationState(state transport.NegotiationState) {
	t.negotiationState = state
	if onNegotiationStateChanged := t.getOnNegotiationStateChanged(); onNegotiationStateChanged != nil {
		onNegotiationStateChanged(t.negotiationState)
	}
}

func (t *PCTransport) filterCandidates(sd webrtc.SessionDescription, preferTCP, isLocal bool) webrtc.SessionDescription {
	parsed, err := sd.Unmarshal()
	if err != nil {
		t.params.Logger.Warnw("could not unmarshal SDP to filter candidates", err)
		return sd
	}

	filterAttributes := func(attrs []sdp.Attribute) []sdp.Attribute {
		filteredAttrs := make([]sdp.Attribute, 0, len(attrs))
		for _, a := range attrs {
			if a.IsICECandidate() {
				c, err := ice.UnmarshalCandidate(a.Value)
				if err != nil {
					t.params.Logger.Errorw("failed to unmarshal candidate in sdp", err, "isLocal", isLocal, "sdp", sd.SDP)
					filteredAttrs = append(filteredAttrs, a)
					continue
				}
				excluded := preferTCP && !c.NetworkType().IsTCP()
				if !excluded {
					if !t.params.Config.UseMDNS && types.IsICECandidateMDNS(c) {
						excluded = true
					}
				}
				if !excluded {
					filteredAttrs = append(filteredAttrs, a)
				}

				if isLocal {
					t.connectionDetails.AddLocalICECandidate(c, excluded, false)
				} else {
					t.connectionDetails.AddRemoteICECandidate(c, excluded, false, false)
				}
			} else {
				filteredAttrs = append(filteredAttrs, a)
			}
		}

		return filteredAttrs
	}

	parsed.Attributes = filterAttributes(parsed.Attributes)
	for _, m := range parsed.MediaDescriptions {
		m.Attributes = filterAttributes(m.Attributes)
	}

	bytes, err := parsed.Marshal()
	if err != nil {
		t.params.Logger.Warnw("could not marshal SDP to filter candidates", err)
		return sd
	}
	sd.SDP = string(bytes)
	return sd
}

func (t *PCTransport) clearSignalStateCheckTimer() {
	if t.signalStateCheckTimer != nil {
		t.signalStateCheckTimer.Stop()
		t.signalStateCheckTimer = nil
	}
}

func (t *PCTransport) setupSignalStateCheckTimer() {
	t.clearSignalStateCheckTimer()

	negotiateVersion := t.negotiateCounter.Inc()
	t.signalStateCheckTimer = time.AfterFunc(negotiationFailedTimeout, func() {
		t.clearSignalStateCheckTimer()

		failed := t.negotiationState != transport.NegotiationStateNone

		if t.negotiateCounter.Load() == negotiateVersion && failed && t.pc.ConnectionState() == webrtc.PeerConnectionStateConnected {
			t.onNegotiationFailed(false, "negotiation timed out")
		}
	})
}

func (t *PCTransport) adjustNumOutstandingMedia(transceiver *webrtc.RTPTransceiver) {
	if transceiver.Mid() != "" {
		return
	}

	t.lock.Lock()
	if transceiver.Kind() == webrtc.RTPCodecTypeAudio {
		t.numOutstandingAudios++
	} else {
		t.numOutstandingVideos++
	}
	t.lock.Unlock()
}

func (t *PCTransport) sendUnmatchedMediaRequirement(force bool) error {
	// if there are unmatched media sections, notify remote peer to generate offer with
	// enough media section in subsequent offers
	t.lock.Lock()
	numAudios := t.numOutstandingAudios - t.numRequestSentAudios
	t.numRequestSentAudios += numAudios

	numVideos := t.numOutstandingVideos - t.numRequestSentVideos
	t.numRequestSentVideos += numVideos
	t.lock.Unlock()

	if force || (numAudios+numVideos) != 0 {
		if err := t.params.Handler.OnUnmatchedMedia(numAudios, numVideos); err != nil {
			return errors.Wrap(err, "could not send unmatched media requirements")
		}
	}

	return nil
}

func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
	if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
		t.params.Logger.Warnw("trying to send offer on closed peer connection", nil)
		return nil
	}

	// when there's an ongoing negotiation, let it finish and not disrupt its state
	if t.negotiationState == transport.NegotiationStateRemote {
		t.params.Logger.Debugw("skipping negotiation, trying again later")
		t.setNegotiationState(transport.NegotiationStateRetry)
		return nil
	} else if t.negotiationState == transport.NegotiationStateRetry {
		// already set to retry, we can safely skip this attempt
		return nil
	}

	ensureICERestart := func(options *webrtc.OfferOptions) *webrtc.OfferOptions {
		if options == nil {
			options = &webrtc.OfferOptions{}
		}
		options.ICERestart = true
		return options
	}

	t.lock.Lock()
	if t.previousAnswer != nil {
		t.previousAnswer = nil
		options = ensureICERestart(options)
		t.params.Logger.Infow("ice restart due to previous answer")
	}
	t.lock.Unlock()

	if t.restartAtNextOffer {
		t.restartAtNextOffer = false
		options = ensureICERestart(options)
		t.params.Logger.Infow("ice restart at next offer")
	}

	if options != nil && options.ICERestart {
		t.clearLocalDescriptionSent()
	}

	offer, err := t.pc.CreateOffer(options)
	if err != nil {
		if errors.Is(err, webrtc.ErrConnectionClosed) {
			t.params.Logger.Warnw("trying to create offer on closed peer connection", nil)
			return nil
		}

		prometheus.RecordServiceOperationError("offer", "create")
		return errors.Wrap(err, "create offer failed")
	}

	preferTCP := t.preferTCP.Load()
	if preferTCP {
		t.params.Logger.Debugw("local offer (unfiltered)", "sdp", offer.SDP)
	}

	err = t.pc.SetLocalDescription(offer)
	if err != nil {
		if errors.Is(err, webrtc.ErrConnectionClosed) {
			t.params.Logger.Warnw("trying to set local description on closed peer connection", nil)
			return nil
		}

		prometheus.RecordServiceOperationError("offer", "local_description")
		return errors.Wrap(err, "setting local description failed")
	}

	//
	// Filter after setting local description as pion expects the offer
	// to match between CreateOffer and SetLocalDescription.
	// Filtered offer is sent to remote so that remote does not
	// see filtered candidates.
	//
	offer = t.filterCandidates(offer, preferTCP, true)
	if preferTCP {
		t.params.Logger.Debugw("local offer (filtered)", "sdp", offer.SDP)
	}

	// indicate waiting for remote
	t.setNegotiationState(transport.NegotiationStateRemote)

	t.setupSignalStateCheckTimer()

	remoteAnswerId := t.remoteAnswerId.Load()
	if remoteAnswerId != 0 && remoteAnswerId != t.localOfferId.Load() {
		t.params.Logger.Warnw(
			"sdp state: sending offer before receiving answer", nil,
			"localOfferId", t.localOfferId.Load(),
			"remoteAnswerId", remoteAnswerId,
		)
	}

	if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc(), t.getMidToTrackIDMapping()); err != nil {
		prometheus.RecordServiceOperationError("offer", "write_message")
		return errors.Wrap(err, "could not send offer")
	}
	prometheus.RecordServiceOperationSuccess("offer")

	return t.localDescriptionSent()
}

func (t *PCTransport) handleSendOffer(_ event) error {
	if !t.params.IsOfferer {
		return t.sendUnmatchedMediaRequirement(true)
	}

	return t.createAndSendOffer(nil)
}

type remoteDescriptionData struct {
	sessionDescription *webrtc.SessionDescription
	remoteId           uint32
}

func (t *PCTransport) handleRemoteDescriptionReceived(e event) error {
	rdd := e.data.(remoteDescriptionData)
	if rdd.sessionDescription.Type == webrtc.SDPTypeOffer {
		return t.handleRemoteOfferReceived(rdd.sessionDescription, rdd.remoteId)
	} else {
		return t.handleRemoteAnswerReceived(rdd.sessionDescription, rdd.remoteId)
	}
}

func (t *PCTransport) isRemoteOfferRestartICE(parsed *sdp.SessionDescription) (string, bool, error) {
	user, pwd, err := lksdp.ExtractICECredential(parsed)
	if err != nil {
		return "", false, err
	}

	credential := fmt.Sprintf("%s:%s", user, pwd)
	// ice credential changed, remote offer restart ice
	restartICE := t.currentOfferIceCredential != "" && t.currentOfferIceCredential != credential
	return credential, restartICE, nil
}

func (t *PCTransport) setRemoteDescription(sd webrtc.SessionDescription) error {
	// filter before setting remote description so that pion does not see filtered remote candidates
	preferTCP := t.preferTCP.Load()
	if preferTCP {
		t.params.Logger.Debugw("remote description (unfiltered)", "type", sd.Type, "sdp", sd.SDP)
	}
	sd = t.filterCandidates(sd, preferTCP, false)
	if preferTCP {
		t.params.Logger.Debugw("remote description (filtered)", "type", sd.Type, "sdp", sd.SDP)
	}

	if err := t.pc.SetRemoteDescription(sd); err != nil {
		if errors.Is(err, webrtc.ErrConnectionClosed) {
			t.params.Logger.Warnw("trying to set remote description on closed peer connection", nil)
			return nil
		}

		sdpType := "offer"
		if sd.Type == webrtc.SDPTypeAnswer {
			sdpType = "answer"
		}
		prometheus.RecordServiceOperationError(sdpType, "remote_description")
		return errors.Wrap(err, "setting remote description failed")
	} else if sd.Type == webrtc.SDPTypeAnswer {
		t.lock.Lock()
		if !t.canReuseTransceiver {
			t.canReuseTransceiver = true
			t.previousTrackDescription = make(map[string]*trackDescription)
		}
		t.lock.Unlock()
	}

	for _, c := range t.pendingRemoteCandidates {
		if err := t.pc.AddICECandidate(*c); err != nil {
			t.params.Logger.Warnw("failed to add cached ICE candidate", err, "candidate", c)
			return errors.Wrap(err, "add ice candidate failed")
		} else {
			t.params.Logger.Debugw("added cached ICE candidate", "candidate", c)
		}
	}
	t.pendingRemoteCandidates = nil

	return nil
}

func (t *PCTransport) createAndSendAnswer() error {
	numOutstandingAudios, numOutstandingVideos := t.getNumUnmatchedTransceivers()
	t.lock.Lock()
	t.numOutstandingAudios, t.numOutstandingVideos = numOutstandingAudios, numOutstandingVideos
	t.numRequestSentAudios, t.numRequestSentVideos = 0, 0
	t.lock.Unlock()

	answer, err := t.pc.CreateAnswer(nil)
	if err != nil {
		if errors.Is(err, webrtc.ErrConnectionClosed) {
			t.params.Logger.Warnw("trying to create answer on closed peer connection", nil)
			return nil
		}

		prometheus.RecordServiceOperationError("answer", "create")
		return errors.Wrap(err, "create answer failed")
	}

	preferTCP := t.preferTCP.Load()
	if preferTCP {
		t.params.Logger.Debugw("local answer (unfiltered)", "sdp", answer.SDP)
	}

	if err = t.pc.SetLocalDescription(answer); err != nil {
		prometheus.RecordServiceOperationError("answer", "local_description")
		return errors.Wrap(err, "setting local description failed")
	}

	//
	// Filter after setting local description as pion expects the answer
	// to match between CreateAnswer and SetLocalDescription.
	// Filtered answer is sent to remote so that remote does not
	// see filtered candidates.
	//
	answer = t.filterCandidates(answer, preferTCP, true)
	if preferTCP {
		t.params.Logger.Debugw("local answer (filtered)", "sdp", answer.SDP)
	}

	localAnswerId := t.localAnswerId.Load()
	if localAnswerId != 0 && localAnswerId >= t.remoteOfferId.Load() {
		t.params.Logger.Warnw(
			"sdp state: duplicate answer", nil,
			"localAnswerId", localAnswerId,
			"remoteOfferId", t.remoteOfferId.Load(),
		)
	}

	answerId := t.remoteOfferId.Load()

	if err := t.params.Handler.OnAnswer(answer, answerId, t.getMidToTrackIDMapping()); err != nil {
		prometheus.RecordServiceOperationError("answer", "write_message")
		return errors.Wrap(err, "could not send answer")
	}
	t.localAnswerId.Store(answerId)
	prometheus.RecordServiceOperationSuccess("asnwer")

	if err := t.sendUnmatchedMediaRequirement(false); err != nil {
		return err
	}

	t.lock.Lock()
	if !t.canReuseTransceiver {
		t.canReuseTransceiver = true
		t.previousTrackDescription = make(map[string]*trackDescription)
	}
	t.lock.Unlock()

	return t.localDescriptionSent()
}

func (t *PCTransport) handleRemoteOfferReceived(sd *webrtc.SessionDescription, offerId uint32) error {
	t.params.Logger.Debugw("processing offer", "offerId", offerId)
	remoteOfferId := t.remoteOfferId.Load()
	if remoteOfferId != 0 && remoteOfferId != t.localAnswerId.Load() {
		t.params.Logger.Warnw(
			"sdp state: multiple offers without answer", nil,
			"remoteOfferId", remoteOfferId,
			"localAnswerId", t.localAnswerId.Load(),
			"receivedRemoteOfferId", offerId,
		)
	}
	t.remoteOfferId.Store(offerId)

	parsed, err := sd.Unmarshal()
	if err != nil {
		return err
	}

	t.lock.Lock()
	if !t.firstOfferReceived {
		t.firstOfferReceived = true
		var dataChannelFound bool
		for _, media := range parsed.MediaDescriptions {
			if strings.EqualFold(media.MediaName.Media, "application") {
				dataChannelFound = true
				break
			}
		}
		t.firstOfferNoDataChannel = !dataChannelFound
	}
	t.lock.Unlock()

	iceCredential, offerRestartICE, err := t.isRemoteOfferRestartICE(parsed)
	if err != nil {
		return errors.Wrap(err, "check remote offer restart ice failed")
	}

	if offerRestartICE && t.pendingRestartIceOffer == nil {
		t.clearLocalDescriptionSent()
	}

	if offerRestartICE && t.pc.ICEGatheringState() == webrtc.ICEGatheringStateGathering {
		t.params.Logger.Debugw("remote offer restart ice while ice gathering")
		t.pendingRestartIceOffer = sd
		return nil
	}

	if offerRestartICE && t.resetShortConnOnICERestart.CompareAndSwap(true, false) {
		t.resetShortConn()
	}

	if offerRestartICE {
		t.outputAndClearICEStats()
	}

	if err := t.setRemoteDescription(*sd); err != nil {
		return err
	}
	t.params.Handler.OnSetRemoteDescriptionOffer()
	t.processSendersPendingConfig()

	rtxRepairs := nonSimulcastRTXRepairsFromSDP(parsed, t.params.Logger)
	if len(rtxRepairs) > 0 {
		t.params.Logger.Debugw("rtx pairs found from sdp", "ssrcs", rtxRepairs)
		for repair, base := range rtxRepairs {
			t.params.Config.BufferFactory.SetRTXPair(repair, base, "")
		}
	}

	if t.currentOfferIceCredential == "" || offerRestartICE {
		t.currentOfferIceCredential = iceCredential
	}

	return t.createAndSendAnswer()
}

func (t *PCTransport) handleRemoteAnswerReceived(sd *webrtc.SessionDescription, answerId uint32) error {
	t.params.Logger.Debugw("processing answer", "answerId", answerId)
	if answerId != 0 && answerId != t.localOfferId.Load() {
		t.params.Logger.Warnw(
			"sdp state: answer id mismatch", nil,
			"expected", t.localOfferId.Load(),
			"got", answerId,
		)
	}
	t.remoteAnswerId.Store(answerId)

	t.clearSignalStateCheckTimer()

	if err := t.setRemoteDescription(*sd); err != nil {
		// Pion will call RTPSender.Send method for each new added Downtrack, and return error if the DownTrack.Bind
		// returns error. In case of Downtrack.Bind returns ErrUnsupportedCodec, the signal state will be stable as negotiation is aleady compelted
		// before startRTPSenders, and the peerconnection state can be recovered by next negotiation which will be triggered
		// by the SubscriptionManager unsubscribe the failure DownTrack. So don't treat this error as negotiation failure.
		if !errors.Is(err, webrtc.ErrUnsupportedCodec) {
			return err
		}
	}

	if t.negotiationState == transport.NegotiationStateRetry {
		t.setNegotiationState(transport.NegotiationStateNone)

		t.params.Logger.Debugw("re-negotiate after receiving answer")
		return t.createAndSendOffer(nil)
	}

	t.setNegotiationState(transport.NegotiationStateNone)
	return nil
}

func (t *PCTransport) doICERestart() error {
	if t.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
		t.params.Logger.Warnw("trying to restart ICE on closed peer connection", nil)
		return nil
	}

	// if restart is requested, but negotiation never started
	iceGatheringState := t.pc.ICEGatheringState()
	if iceGatheringState == webrtc.ICEGatheringStateNew {
		t.params.Logger.Debugw("skipping ICE restart on not yet started peer connection")
		return nil
	}

	// if restart is requested, and we are not ready, then continue afterwards
	if iceGatheringState == webrtc.ICEGatheringStateGathering {
		t.params.Logger.Debugw("deferring ICE restart to after gathering")
		t.restartAfterGathering = true
		return nil
	}

	if t.resetShortConnOnICERestart.CompareAndSwap(true, false) {
		t.resetShortConn()
	}

	if t.negotiationState == transport.NegotiationStateNone {
		t.outputAndClearICEStats()
		return t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true})
	}

	currentRemoteDescription := t.pc.CurrentRemoteDescription()
	if currentRemoteDescription == nil {
		// restart without current remote description, send current local description again to try recover
		offer := t.pc.LocalDescription()
		if offer == nil {
			// it should not happen, log just in case
			t.params.Logger.Warnw("ice restart without local offer", nil)
			return ErrIceRestartWithoutLocalSDP
		} else {
			t.params.Logger.Infow("deferring ice restart to next offer")
			t.setNegotiationState(transport.NegotiationStateRetry)
			t.restartAtNextOffer = true

			remoteAnswerId := t.remoteAnswerId.Load()
			if remoteAnswerId != 0 && remoteAnswerId != t.localOfferId.Load() {
				t.params.Logger.Warnw(
					"sdp state: answer not received in ICE restart", nil,
					"localOfferId", t.localOfferId.Load(),
					"remoteAnswerId", remoteAnswerId,
				)
			}

			err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc(), t.getMidToTrackIDMapping())
			if err != nil {
				prometheus.RecordServiceOperationError("offer", "write_message")
			} else {
				prometheus.RecordServiceOperationSuccess("offer")
			}
			return err
		}
	} else {
		// recover by re-applying the last answer
		t.params.Logger.Infow("recovering from client negotiation state on ICE restart")
		if err := t.pc.SetRemoteDescription(*currentRemoteDescription); err != nil {
			prometheus.RecordServiceOperationError("offer", "remote_description")
			return errors.Wrap(err, "set remote description failed")
		} else {
			t.setNegotiationState(transport.NegotiationStateNone)
			t.outputAndClearICEStats()
			return t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true})
		}
	}
}

func (t *PCTransport) handleICERestart(_ event) error {
	return t.doICERestart()
}

func (t *PCTransport) onNegotiationFailed(warning bool, reason string) {
	logFields := []any{
		"reason", reason,
		"localCurrent", t.pc.CurrentLocalDescription(),
		"localPending", t.pc.PendingLocalDescription(),
		"remoteCurrent", t.pc.CurrentRemoteDescription(),
		"remotePending", t.pc.PendingRemoteDescription(),
	}
	if warning {
		t.params.Logger.Warnw(
			"negotiation failed",
			nil,
			logFields...,
		)
	} else {
		t.params.Logger.Infow("negotiation failed", logFields...)
	}
	t.params.Handler.OnNegotiationFailed()
}

func (t *PCTransport) outputAndClearICEStats() {
	t.lock.Lock()
	stats := t.mayFailedICEStats
	t.mayFailedICEStats = nil
	t.lock.Unlock()

	if len(stats) > 0 {
		t.params.Logger.Infow("ICE candidate pair stats", "stats", iceCandidatePairStatsEncoder{stats})
	}
}

func (t *PCTransport) getMidToTrackIDMapping() map[string]string {
	transceivers := t.pc.GetTransceivers()
	midToTrackID := make(map[string]string, len(transceivers))
	for _, tr := range transceivers {
		if mid := tr.Mid(); mid != "" {
			if sender := tr.Sender(); sender != nil {
				if track := sender.Track(); track != nil {
					midToTrackID[mid] = track.ID()
				}
			}
		}
	}
	return midToTrackID
}

// ----------------------

type configureSenderParams struct {
	transceiver              *webrtc.RTPTransceiver
	enabledCodecs            []*livekit.Codec
	rtcpFeedbackConfig       RTCPFeedbackConfig
	filterOutH264HighProfile bool
	enableAudioStereo        bool
	enableAudioNACK          bool
}

func configureSender(params configureSenderParams) {
	configureSenderCodecs(
		params.transceiver,
		params.enabledCodecs,
		params.rtcpFeedbackConfig,
		params.filterOutH264HighProfile,
	)

	if params.transceiver.Kind() == webrtc.RTPCodecTypeAudio {
		configureSenderAudio(params.transceiver, params.enableAudioStereo, params.enableAudioNACK)
	}
}

// configure subscriber transceiver for audio stereo and nack
// pion doesn't support per transciver codec configuration, so the nack of this session will be disabled
// forever once it is first disabled by a transceiver.
func configureSenderAudio(tr *webrtc.RTPTransceiver, stereo bool, nack bool) {
	sender := tr.Sender()
	if sender == nil {
		return
	}

	// enable stereo
	codecs := sender.GetParameters().Codecs
	configCodecs := make([]webrtc.RTPCodecParameters, 0, len(codecs))
	for _, c := range codecs {
		if mime.IsMimeTypeStringOpus(c.MimeType) {
			c.SDPFmtpLine = strings.ReplaceAll(c.SDPFmtpLine, ";sprop-stereo=1", "")
			if stereo {
				c.SDPFmtpLine += ";sprop-stereo=1"
			}
			if !nack {
				for i, fb := range c.RTCPFeedback {
					if fb.Type == webrtc.TypeRTCPFBNACK {
						c.RTCPFeedback = append(c.RTCPFeedback[:i], c.RTCPFeedback[i+1:]...)
						break
					}
				}
			}
		}
		configCodecs = append(configCodecs, c)
	}

	tr.SetCodecPreferences(configCodecs)
}

// In single peer connection mode, set up enebled codecs for sender.
// The config provides config of direction.
// For publisher peer connection those are publish enabled codecs
// and for subscriber peer connection those are subscribe enabled codecs.
//
// But, in single peer connection mode, if setting up a transceiver where the media is
// flowing in the other direction, the other direction codec config needs to be set.
func configureSenderCodecs(
	tr *webrtc.RTPTransceiver,
	enabledCodecs []*livekit.Codec,
	rtcpFeedbackConfig RTCPFeedbackConfig,
	filterOutH264HighProfile bool,
) {
	if len(enabledCodecs) == 0 {
		return
	}

	sender := tr.Sender()
	if sender == nil {
		return
	}

	filteredCodecs := filterCodecs(
		sender.GetParameters().Codecs,
		enabledCodecs,
		rtcpFeedbackConfig,
		filterOutH264HighProfile,
	)
	tr.SetCodecPreferences(filteredCodecs)
}

func configureReceiverCodecs(
	tr *webrtc.RTPTransceiver,
	preferredMimeType string,
	compliesWithCodecOrderInSDPAnswer bool,
) {
	receiver := tr.Receiver()
	if receiver == nil {
		return
	}

	var preferredCodecs, leftCodecs []webrtc.RTPCodecParameters
	for _, c := range receiver.GetParameters().Codecs {
		if tr.Kind() == webrtc.RTPCodecTypeAudio {
			nackFound := false
			for _, fb := range c.RTCPFeedback {
				if fb.Type == webrtc.TypeRTCPFBNACK {
					nackFound = true
					break
				}
			}

			if !nackFound {
				c.RTCPFeedback = append(c.RTCPFeedback, webrtc.RTCPFeedback{Type: webrtc.TypeRTCPFBNACK})
			}
		}

		if mime.GetMimeTypeCodec(preferredMimeType) == mime.GetMimeTypeCodec(c.RTPCodecCapability.MimeType) {
			preferredCodecs = append(preferredCodecs, c)
		} else {
			leftCodecs = append(leftCodecs, c)
		}
	}
	if len(preferredCodecs) == 0 {
		return
	}

	reorderedCodecs := append([]webrtc.RTPCodecParameters{}, preferredCodecs...)
	if tr.Kind() == webrtc.RTPCodecTypeVideo {
		// if the client don't comply with codec order in SDP answer, only keep preferred codecs to force client to use it
		if compliesWithCodecOrderInSDPAnswer {
			reorderedCodecs = append(reorderedCodecs, leftCodecs...)
		}
	} else {
		reorderedCodecs = append(reorderedCodecs, leftCodecs...)
	}
	tr.SetCodecPreferences(reorderedCodecs)
}

func nonSimulcastRTXRepairsFromSDP(s *sdp.SessionDescription, logger logger.Logger) map[uint32]uint32 {
	rtxRepairFlows := map[uint32]uint32{}
	for _, media := range s.MediaDescriptions {
		// extract rtx repair flows from the media section for non-simulcast stream,
		// pion will handle simulcast streams by rid probe, don't need handle it here.
		var ridFound bool
		rtxPairs := make(map[uint32]uint32)
	findRTX:
		for _, attr := range media.Attributes {
			switch attr.Key {
			case "rid":
				ridFound = true
				break findRTX
			case sdp.AttrKeySSRCGroup:
				split := strings.Split(attr.Value, " ")
				if split[0] == sdp.SemanticTokenFlowIdentification {
					// Essentially lines like `a=ssrc-group:FID 2231627014 632943048` are processed by this section
					// as this declares that the second SSRC (632943048) is a rtx repair flow (RFC4588) for the first
					// (2231627014) as specified in RFC5576
					if len(split) == 3 {
						baseSsrc, err := strconv.ParseUint(split[1], 10, 32)
						if err != nil {
							logger.Warnw("Failed to parse SSRC", err, "ssrc", split[1])
							continue
						}
						rtxRepairFlow, err := strconv.ParseUint(split[2], 10, 32)
						if err != nil {
							logger.Warnw("Failed to parse SSRC", err, "ssrc", split[2])
							continue
						}
						rtxPairs[uint32(rtxRepairFlow)] = uint32(baseSsrc)
					}
				}
			}
		}
		if !ridFound {
			maps.Copy(rtxRepairFlows, rtxPairs)
		}
	}

	return rtxRepairFlows
}

// ----------------------

type iceCandidatePairStatsEncoder struct {
	stats []iceCandidatePairStats
}

func (e iceCandidatePairStatsEncoder) MarshalLogArray(arr zapcore.ArrayEncoder) error {
	for _, s := range e.stats {
		if err := arr.AppendObject(s); err != nil {
			return err
		}
	}
	return nil
}

type iceCandidatePairStats struct {
	webrtc.ICECandidatePairStats
	local, remote webrtc.ICECandidateStats
}

func (r iceCandidatePairStats) MarshalLogObject(e zapcore.ObjectEncoder) error {
	candidateToString := func(c webrtc.ICECandidateStats) string {
		return fmt.Sprintf("%s:%d %s type(%s/%s), priority(%d)", c.IP, c.Port, c.Protocol, c.CandidateType, c.RelayProtocol, c.Priority)
	}
	e.AddString("state", string(r.State))
	e.AddBool("nominated", r.Nominated)
	e.AddString("local", candidateToString(r.local))
	e.AddString("remote", candidateToString(r.remote))
	e.AddUint64("requestsSent", r.RequestsSent)
	e.AddUint64("responsesReceived", r.ResponsesReceived)
	e.AddUint64("requestsReceived", r.RequestsReceived)
	e.AddUint64("responsesSent", r.ResponsesSent)
	e.AddTime("firstRequestSentAt", r.FirstRequestTimestamp.Time())
	e.AddTime("lastRequestSentAt", r.LastRequestTimestamp.Time())
	e.AddTime("firstResponseReceivedAt", r.FirstResponseTimestamp.Time())
	e.AddTime("lastResponseReceivedAt", r.LastResponseTimestamp.Time())
	e.AddTime("firstRequestReceivedAt", r.FirstRequestReceivedTimestamp.Time())
	e.AddTime("lastRequestReceivedAt", r.LastRequestReceivedTimestamp.Time())

	return nil
}
