// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package test

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"net/http"
	"reflect"
	"strings"
	"testing"
	"time"

	"github.com/pion/sdp/v3"
	"github.com/pion/webrtc/v4"
	"github.com/stretchr/testify/require"
	"github.com/thoas/go-funk"
	"github.com/twitchtv/twirp"
	"go.uber.org/atomic"

	"github.com/livekit/protocol/auth"
	"github.com/livekit/protocol/codecs/mime"
	"github.com/livekit/protocol/livekit"
	"github.com/livekit/protocol/logger"

	"github.com/livekit/livekit-server/pkg/config"
	"github.com/livekit/livekit-server/pkg/rtc"
	"github.com/livekit/livekit-server/pkg/sfu/datachannel"
	"github.com/livekit/livekit-server/pkg/testutils"
	testclient "github.com/livekit/livekit-server/test/client"
)

const (
	waitTick    = 10 * time.Millisecond
	waitTimeout = 5 * time.Second
)

func TestClientCouldConnect(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	_, finish := setupSingleNodeTest("TestClientCouldConnect")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
			c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
			waitUntilConnected(t, c1, c2)

			// ensure they both see each other
			testutils.WithTimeout(t, func() string {
				if len(c1.RemoteParticipants()) == 0 {
					return "c1 did not see c2"
				}
				if len(c2.RemoteParticipants()) == 0 {
					return "c2 did not see c1"
				}
				return ""
			})
		})
	}
}

func TestClientConnectDuplicate(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	_, finish := setupSingleNodeTest("TestClientConnectDuplicate")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
			grant.SetCanPublish(true)
			grant.SetCanSubscribe(true)
			token := joinTokenWithGrant("c1", grant)
			c1 := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, nil)

			// publish 2 tracks
			t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
			require.NoError(t, err)
			defer t1.Stop()
			t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
			require.NoError(t, err)
			defer t2.Stop()

			c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
			waitUntilConnected(t, c1, c2)

			opts := &testclient.Options{
				Publish: "duplicate_connection",
			}
			testutils.WithTimeout(t, func() string {
				if len(c2.SubscribedTracks()) == 0 {
					return "c2 didn't subscribe to anything"
				}
				// should have received two tracks
				if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
					return "c2 didn't subscribe to both tracks from c1"
				}

				// participant ID can be appended with '#..' . but should contain orig id as prefix
				tr1 := c2.SubscribedTracks()[c1.ID()][0]
				participantId1, _ := rtc.UnpackStreamID(tr1.StreamID())
				require.Equal(t, c1.ID(), participantId1)
				tr2 := c2.SubscribedTracks()[c1.ID()][1]
				participantId2, _ := rtc.UnpackStreamID(tr2.StreamID())
				require.Equal(t, c1.ID(), participantId2)
				return ""
			})

			c1Dup := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, opts)

			waitUntilConnected(t, c1Dup)

			t3, err := c1Dup.AddStaticTrack("video/vp8", "video", "webcam")
			require.NoError(t, err)
			defer t3.Stop()

			testutils.WithTimeout(t, func() string {
				if len(c2.SubscribedTracks()[c1Dup.ID()]) != 1 {
					return "c2 was not subscribed to track from duplicated c1"
				}

				tr3 := c2.SubscribedTracks()[c1Dup.ID()][0]
				participantId3, _ := rtc.UnpackStreamID(tr3.StreamID())
				require.Contains(t, c1Dup.ID(), participantId3)

				return ""
			})
		})
	}
}

func TestSinglePublisher(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	s, finish := setupSingleNodeTest("TestSinglePublisher")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
			c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
			waitUntilConnected(t, c1, c2)

			// publish an audio and video track and ensure clients receive it ok
			t1, err := c1.AddStaticTrack("audio/OPUS", "audio", "webcamaudio")
			require.NoError(t, err)
			defer t1.Stop()
			t2, err := c1.AddStaticTrack("video/vp8", "video", "webcamvideo")
			require.NoError(t, err)
			defer t2.Stop()

			testutils.WithTimeout(t, func() string {
				if len(c2.SubscribedTracks()) == 0 {
					return "c2 was not subscribed to anything"
				}
				// should have received two tracks
				if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
					return "c2 didn't subscribe to both tracks from c1"
				}

				tr1 := c2.SubscribedTracks()[c1.ID()][0]
				participantId, _ := rtc.UnpackStreamID(tr1.StreamID())
				require.Equal(t, c1.ID(), participantId)
				return ""
			})
			// ensure mime type is received
			remoteC1 := c2.GetRemoteParticipant(c1.ID())
			audioTrack := funk.Find(remoteC1.Tracks, func(ti *livekit.TrackInfo) bool {
				return ti.Name == "webcamaudio"
			}).(*livekit.TrackInfo)
			require.Equal(t, "audio/opus", audioTrack.MimeType)

			// a new client joins and should get the initial stream
			c3 := createRTCClient("c3", defaultServerPort, testRTCServicePath, nil)

			// ensure that new client that has joined also received tracks
			waitUntilConnected(t, c3)
			testutils.WithTimeout(t, func() string {
				if len(c3.SubscribedTracks()) == 0 {
					return "c3 didn't subscribe to anything"
				}
				// should have received two tracks
				if len(c3.SubscribedTracks()[c1.ID()]) != 2 {
					return "c3 didn't subscribe to tracks from c1"
				}
				return ""
			})

			// ensure that the track ids are generated by server
			tracks := c3.SubscribedTracks()[c1.ID()]
			for _, tr := range tracks {
				require.True(t, strings.HasPrefix(tr.ID(), "TR_"), "track should begin with TR")
			}

			// when c3 disconnects, ensure subscriber is cleaned up correctly
			c3.Stop()

			testutils.WithTimeout(t, func() string {
				room := s.RoomManager().GetRoom(context.Background(), testRoom)
				p := room.GetParticipant("c1")
				require.NotNil(t, p)

				for _, t := range p.GetPublishedTracks() {
					if t.IsSubscriber(c3.ID()) {
						return "c3 was not a subscriber of c1's tracks"
					}
				}
				return ""
			})
		})
	}
}

func Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	_, finish := setupSingleNodeTest("Test_WhenAutoSubscriptionDisabled_ClientShouldNotReceiveAnyPublishedTracks")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			opts := testclient.Options{AutoSubscribe: false}
			publisher := createRTCClient("publisher", defaultServerPort, testRTCServicePath, &opts)
			client := createRTCClient("client", defaultServerPort, testRTCServicePath, &opts)
			defer publisher.Stop()
			defer client.Stop()
			waitUntilConnected(t, publisher, client)

			track, err := publisher.AddStaticTrack("audio/opus", "audio", "webcam")
			require.NoError(t, err)
			defer track.Stop()

			time.Sleep(syncDelay)

			require.Empty(t, client.SubscribedTracks()[publisher.ID()])
		})
	}
}

func Test_RenegotiationWithDifferentCodecs(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	_, finish := setupSingleNodeTest("TestRenegotiationWithDifferentCodecs")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
			c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, nil)
			waitUntilConnected(t, c1, c2)

			// publish a vp8 video track and ensure clients receive it ok
			t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
			require.NoError(t, err)
			defer t1.Stop()
			t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
			require.NoError(t, err)
			defer t2.Stop()

			testutils.WithTimeout(t, func() string {
				if len(c2.SubscribedTracks()) == 0 {
					return "c2 was not subscribed to anything"
				}
				// should have received two tracks
				if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
					return "c2 was not subscribed to tracks from c1"
				}

				tracks := c2.SubscribedTracks()[c1.ID()]
				for _, t := range tracks {
					if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
						return ""

					}
				}
				return "did not receive track with vp8"
			})

			t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{
				MimeType:    "video/h264",
				ClockRate:   90000,
				SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
			}, "videoscreen", "screen")
			defer t3.Stop()
			require.NoError(t, err)

			testutils.WithTimeout(t, func() string {
				if len(c2.SubscribedTracks()) == 0 {
					return "c2's not subscribed to anything"
				}
				// should have received three tracks
				if len(c2.SubscribedTracks()[c1.ID()]) != 3 {
					return "c2's not subscribed to 3 tracks from c1"
				}

				var vp8Found, h264Found bool
				tracks := c2.SubscribedTracks()[c1.ID()]
				for _, t := range tracks {
					if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
						vp8Found = true
					} else if mime.IsMimeTypeStringH264(t.Codec().MimeType) {
						h264Found = true
					}
				}
				if !vp8Found {
					return "did not receive track with vp8"
				}
				if !h264Found {
					return "did not receive track with h264"
				}
				return ""
			})
		})
	}
}

func TestSingleNodeRoomList(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}
	_, finish := setupSingleNodeTest("TestSingleNodeRoomList")
	defer finish()

	roomServiceListRoom(t)
}

func TestSingleNodeUpdateParticipant(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}
	_, finish := setupSingleNodeTest("TestSingleNodeRoomList")
	defer finish()

	adminCtx := contextWithToken(adminRoomToken(testRoom))
	t.Run("update nonexistent participant", func(t *testing.T) {
		_, err := roomClient.UpdateParticipant(adminCtx, &livekit.UpdateParticipantRequest{
			Room:     testRoom,
			Identity: "nonexistent",
			Permission: &livekit.ParticipantPermission{
				CanPublish: true,
			},
		})
		require.Error(t, err)
		var twErr twirp.Error
		require.True(t, errors.As(err, &twErr))
		require.Equal(t, twirp.NotFound, twErr.Code())
	})
}

// Ensure that CORS headers are returned
func TestSingleNodeCORS(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}
	s, finish := setupSingleNodeTest("TestSingleNodeCORS")
	defer finish()

	req, err := http.NewRequest("POST", fmt.Sprintf("http://localhost:%d", s.HTTPPort()), nil)
	require.NoError(t, err)
	req.Header.Set("Authorization", "bearer xyz")
	req.Header.Set("Origin", "testhost.com")
	res, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	require.Equal(t, "testhost.com", res.Header.Get("Access-Control-Allow-Origin"))
}

func TestSingleNodeDoubleSlash(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}
	s, finish := setupSingleNodeTest("TestSingleNodeDoubleSlash")
	defer finish()
	// client contains trailing slash in URL, causing path to contain double //
	// without our middleware, this would cause a 302 redirect
	roomClient = livekit.NewRoomServiceJSONClient(fmt.Sprintf("http://localhost:%d/", s.HTTPPort()), &http.Client{})
	_, err := roomClient.ListRooms(contextWithToken(listRoomToken()), &livekit.ListRoomsRequest{})
	require.NoError(t, err)
}

func TestPingPong(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}
	_, finish := setupSingleNodeTest("TestPingPong")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
			waitUntilConnected(t, c1)

			require.NoError(t, c1.SendPing())
			require.Eventually(t, func() bool {
				return c1.PongReceivedAt() > 0
			}, time.Second, 10*time.Millisecond)
		})
	}
}

func TestSingleNodeJoinAfterClose(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	_, finish := setupSingleNodeTest("TestJoinAfterClose")
	defer finish()

	scenarioJoinClosedRoom(t)
}

func TestSingleNodeCloseNonRTCRoom(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	_, finish := setupSingleNodeTest("closeNonRTCRoom")
	defer finish()

	closeNonRTCRoom(t)
}

func TestAutoCreate(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}
	disableAutoCreate := func(conf *config.Config) {
		conf.Room.AutoCreate = false
	}
	t.Run("cannot join if room isn't created", func(t *testing.T) {
		s := createSingleNodeServer(disableAutoCreate)
		go func() {
			if err := s.Start(); err != nil {
				logger.Errorw("server returned error", err)
			}
		}()
		defer s.Stop(true)

		waitForServerToStart(s)

		for _, testRTCServicePath := range testRTCServicePaths {
			t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
				token := joinToken(testRoom, "start-before-create", nil)
				opts := &testclient.Options{}
				testRTCServicePathToTestClientOptions(testRTCServicePath, opts)
				_, err := testclient.NewWebSocketConn(
					fmt.Sprintf("ws://localhost:%d", defaultServerPort),
					token,
					opts,
				)
				require.Error(t, err)

				// second join should also fail
				token = joinToken(testRoom, "start-before-create-2", nil)
				_, err = testclient.NewWebSocketConn(
					fmt.Sprintf("ws://localhost:%d", defaultServerPort),
					token,
					opts,
				)
				require.Error(t, err)
			})
		}
	})

	t.Run("join with explicit createRoom", func(t *testing.T) {
		s := createSingleNodeServer(disableAutoCreate)
		go func() {
			if err := s.Start(); err != nil {
				logger.Errorw("server returned error", err)
			}
		}()
		defer s.Stop(true)

		waitForServerToStart(s)

		// explicitly create
		_, err := roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{Name: testRoom})
		require.NoError(t, err)

		for _, testRTCServicePath := range testRTCServicePaths {
			t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
				c1 := createRTCClient("join-after-create", defaultServerPort, testRTCServicePath, nil)
				waitUntilConnected(t, c1)

				c1.Stop()
			})
		}
	})
}

// don't give user subscribe permissions initially, and ensure autosubscribe is triggered afterwards
func TestSingleNodeUpdateSubscriptionPermissions(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}
	_, finish := setupSingleNodeTest("TestSingleNodeUpdateSubscriptionPermissions")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			pub := createRTCClient("pub", defaultServerPort, testRTCServicePath, nil)

			grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
			grant.SetCanSubscribe(false)
			at := auth.NewAccessToken(testApiKey, testApiSecret).
				AddGrant(grant).
				SetIdentity("sub")
			token, err := at.ToJWT()
			require.NoError(t, err)
			sub := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, nil)

			waitUntilConnected(t, pub, sub)

			writers := publishTracksForClients(t, pub)
			defer stopWriters(writers...)

			// wait sub receives tracks
			testutils.WithTimeout(t, func() string {
				pubRemote := sub.GetRemoteParticipant(pub.ID())
				if pubRemote == nil {
					return "could not find remote publisher"
				}
				if len(pubRemote.Tracks) != 2 {
					return "did not receive metadata for published tracks"
				}
				return ""
			})

			// set permissions out of band
			ctx := contextWithToken(adminRoomToken(testRoom))
			_, err = roomClient.UpdateParticipant(ctx, &livekit.UpdateParticipantRequest{
				Room:     testRoom,
				Identity: "sub",
				Permission: &livekit.ParticipantPermission{
					CanSubscribe: true,
					CanPublish:   true,
				},
			})
			require.NoError(t, err)

			testutils.WithTimeout(t, func() string {
				tracks := sub.SubscribedTracks()[pub.ID()]
				if len(tracks) == 2 {
					return ""
				} else {
					return fmt.Sprintf("expected 2 tracks subscribed, actual: %d", len(tracks))
				}
			})
		})
	}
}

func TestSingleNodeAttributes(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}
	_, finish := setupSingleNodeTest("TestSingleNodeAttributes")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			pub := createRTCClient("pub", defaultServerPort, testRTCServicePath, &testclient.Options{
				Attributes: map[string]string{
					"b": "2",
					"c": "3",
				},
				TokenCustomizer: func(token *auth.AccessToken, grants *auth.VideoGrant) {
					T := true
					grants.CanUpdateOwnMetadata = &T
					token.SetAttributes(map[string]string{
						"a": "0",
						"b": "1",
					})
				},
			})

			grant := &auth.VideoGrant{RoomJoin: true, Room: testRoom}
			grant.SetCanSubscribe(false)
			at := auth.NewAccessToken(testApiKey, testApiSecret).
				SetVideoGrant(grant).
				SetIdentity("sub")
			token, err := at.ToJWT()
			require.NoError(t, err)
			sub := createRTCClientWithToken(token, defaultServerPort, testRTCServicePath, nil)

			waitUntilConnected(t, pub, sub)

			// wait sub receives initial attributes
			testutils.WithTimeout(t, func() string {
				pubRemote := sub.GetRemoteParticipant(pub.ID())
				if pubRemote == nil {
					return "could not find remote publisher"
				}
				attrs := pubRemote.Attributes
				if !reflect.DeepEqual(attrs, map[string]string{
					"a": "0",
					"b": "2",
					"c": "3",
				}) {
					return fmt.Sprintf("did not receive expected attributes: %v", attrs)
				}
				return ""
			})
		})
	}
}

// TestDeviceCodecOverride checks that codecs that are incompatible with a device is not
// negotiated by the server
func TestDeviceCodecOverride(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	_, finish := setupSingleNodeTest("TestDeviceCodecOverride")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			// simulate device that isn't compatible with H.264
			c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, &testclient.Options{
				ClientInfo: &livekit.ClientInfo{
					Os:          "android",
					DeviceModel: "Xiaomi 2201117TI",
				},
			})
			defer c1.Stop()
			waitUntilConnected(t, c1)

			// it doesn't really matter what the codec set here is, uses default Pion MediaEngine codecs
			tw, err := c1.AddStaticTrack("video/h264", "video", "webcam")
			require.NoError(t, err)
			defer stopWriters(tw)

			var desc *sdp.MediaDescription
			require.Eventually(t, func() bool {
				lastAnswer := c1.LastAnswer()
				if lastAnswer == nil {
					return false
				}

				sd := webrtc.SessionDescription{
					Type: webrtc.SDPTypeAnswer,
					SDP:  lastAnswer.SDP,
				}
				answer, err := sd.Unmarshal()
				require.NoError(t, err)

				// video and data channel
				if len(answer.MediaDescriptions) < 2 {
					return false
				}

				for _, md := range answer.MediaDescriptions {
					if md.MediaName.Media == "video" {
						desc = md
						break
					}
				}
				return desc != nil
			}, waitTimeout, waitTick, "did not receive answer")

			hasSeenVP8 := false
			for _, a := range desc.Attributes {
				if a.Key == "rtpmap" {
					require.NotContains(t, a.Value, mime.MimeTypeCodecH264.String(), "should not contain H264 codec")
					if strings.Contains(a.Value, mime.MimeTypeCodecVP8.String()) {
						hasSeenVP8 = true
					}
				}
			}
			require.True(t, hasSeenVP8, "should have seen VP8 codec in SDP")
		})
	}
}

func TestSubscribeToCodecUnsupported(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	_, finish := setupSingleNodeTest("TestSubscribeToCodecUnsupported")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, nil)
			// create a client that doesn't support H264
			c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, &testclient.Options{
				AutoSubscribe: true,
				DisabledCodecs: []webrtc.RTPCodecCapability{
					{MimeType: "video/H264"},
				},
			})
			waitUntilConnected(t, c1, c2)

			// publish a vp8 video track and ensure c2 receives it ok
			t1, err := c1.AddStaticTrack("audio/opus", "audio", "webcam")
			require.NoError(t, err)
			defer t1.Stop()
			t2, err := c1.AddStaticTrack("video/vp8", "video", "webcam")
			require.NoError(t, err)
			defer t2.Stop()

			testutils.WithTimeout(t, func() string {
				if len(c2.SubscribedTracks()) == 0 {
					return "c2 was not subscribed to anything"
				}
				// should have received two tracks
				if len(c2.SubscribedTracks()[c1.ID()]) != 2 {
					return "c2 was not subscribed to tracks from c1"
				}

				tracks := c2.SubscribedTracks()[c1.ID()]
				for _, t := range tracks {
					if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
						return ""
					}
				}
				return "did not receive track with vp8"
			})
			require.Nil(t, c2.GetSubscriptionResponseAndClear())

			// publish a h264 track and ensure c2 got subscription error
			t3, err := c1.AddStaticTrackWithCodec(webrtc.RTPCodecCapability{
				MimeType:    "video/h264",
				ClockRate:   90000,
				SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
			}, "videoscreen", "screen")
			defer t3.Stop()
			require.NoError(t, err)

			var h264TrackID string
			require.Eventually(t, func() bool {
				remoteC1 := c2.GetRemoteParticipant(c1.ID())
				require.NotNil(t, remoteC1)
				for _, track := range remoteC1.Tracks {
					if mime.IsMimeTypeStringH264(track.MimeType) {
						h264TrackID = track.Sid
						return true
					}
				}
				return false
			}, time.Second, 10*time.Millisecond, "did not receive track info with h264")

			require.Eventually(t, func() bool {
				sr := c2.GetSubscriptionResponseAndClear()
				if sr == nil {
					return false
				}
				require.Equal(t, h264TrackID, sr.TrackSid)
				require.Equal(t, livekit.SubscriptionError_SE_CODEC_UNSUPPORTED, sr.Err)
				return true
			}, 5*time.Second, 10*time.Millisecond, "did not receive subscription response")

			// publish another vp8 track again, ensure the transport recovered by sfu and c2 can receive it
			t4, err := c1.AddStaticTrack("video/vp8", "video2", "webcam2")
			require.NoError(t, err)
			defer t4.Stop()

			testutils.WithTimeout(t, func() string {
				if len(c2.SubscribedTracks()) == 0 {
					return "c2 was not subscribed to anything"
				}
				// should have received two tracks
				if len(c2.SubscribedTracks()[c1.ID()]) != 3 {
					return "c2 was not subscribed to tracks from c1"
				}

				var vp8Count int
				tracks := c2.SubscribedTracks()[c1.ID()]
				for _, t := range tracks {
					if mime.IsMimeTypeStringVP8(t.Codec().MimeType) {
						vp8Count++
					}
				}
				if vp8Count == 2 {
					return ""
				}
				return "did not 2 receive track with vp8"
			})
			require.Nil(t, c2.GetSubscriptionResponseAndClear())
		})
	}
}

func TestDataPublishSlowSubscriber(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	dataChannelSlowThreshold := 21024

	logger.Infow("----------------STARTING TEST----------------", "test", t.Name())
	s := createSingleNodeServer(func(c *config.Config) {
		c.RTC.DatachannelSlowThreshold = dataChannelSlowThreshold
	})
	go func() {
		if err := s.Start(); err != nil {
			logger.Errorw("server returned error", err)
		}
	}()

	waitForServerToStart(s)

	defer func() {
		s.Stop(true)
		logger.Infow("----------------FINISHING TEST----------------", "test", t.Name())
	}()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			pub := createRTCClient("pub", defaultServerPort, testRTCServicePath, nil)
			fastSub := createRTCClient("fastSub", defaultServerPort, testRTCServicePath, nil)
			slowSubNotDrop := createRTCClient("slowSubNotDrop", defaultServerPort, testRTCServicePath, nil)
			slowSubDrop := createRTCClient("slowSubDrop", defaultServerPort, testRTCServicePath, nil)
			waitUntilConnected(t, pub, fastSub, slowSubDrop, slowSubNotDrop)
			defer func() {
				pub.Stop()
				fastSub.Stop()
				slowSubNotDrop.Stop()
				slowSubDrop.Stop()
			}()

			// no data should be dropped for fast subscriber
			var fastDataIndex atomic.Uint64
			fastSub.OnDataReceived = func(data []byte, sid string) {
				idx := binary.BigEndian.Uint64(data[len(data)-8:])
				require.Equal(t, fastDataIndex.Load()+1, idx)
				fastDataIndex.Store(idx)
			}

			// no data should be dropped for slow subscriber that is above threshold
			var slowNoDropDataIndex atomic.Uint64
			var drainSlowSubNotDrop atomic.Bool
			slowNoDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold * 2)
			slowSubNotDrop.OnDataReceived = func(data []byte, sid string) {
				idx := binary.BigEndian.Uint64(data[len(data)-8:])
				require.Equal(t, slowNoDropDataIndex.Load()+1, idx)
				slowNoDropDataIndex.Store(idx)
				if !drainSlowSubNotDrop.Load() {
					slowNoDropReader.Read(data, sid)
				}
			}

			// data should be dropped for slow subscriber that is below threshold
			var slowDropDataIndex atomic.Uint64
			dropped := make(chan struct{})
			slowDropReader := testclient.NewDataChannelReader(dataChannelSlowThreshold / 2)
			slowSubDrop.OnDataReceived = func(data []byte, sid string) {
				select {
				case <-dropped:
					return
				default:
				}
				idx := binary.BigEndian.Uint64(data[len(data)-8:])
				if idx != slowDropDataIndex.Load()+1 {
					close(dropped)
				}
				slowDropDataIndex.Store(idx)
				slowDropReader.Read(data, sid)
			}

			// publisher sends data as fast as possible, it will block by the slowest subscriber above the slow threshold
			var (
				blocked   atomic.Bool
				stopWrite atomic.Bool
				writeIdx  atomic.Uint64
			)
			writeStopped := make(chan struct{})
			go func() {
				defer close(writeStopped)
				var i int
				buf := make([]byte, 100)
				for !stopWrite.Load() {
					i++
					binary.BigEndian.PutUint64(buf[len(buf)-8:], uint64(i))
					if err := pub.PublishData(buf, livekit.DataPacket_RELIABLE); err != nil {
						if errors.Is(err, datachannel.ErrDataDroppedBySlowReader) {
							blocked.Store(true)
							i--
							continue
						} else {
							t.Log("error writing", err)
							break
						}
					}
					writeIdx.Store(uint64(i))
				}
			}()

			<-dropped

			time.Sleep(time.Second)
			blocked.Store(false)
			require.Eventually(t, func() bool { return blocked.Load() }, 30*time.Second, 100*time.Millisecond)
			stopWrite.Store(true)
			<-writeStopped
			drainSlowSubNotDrop.Store(true)
			require.Eventually(t, func() bool {
				return writeIdx.Load() == fastDataIndex.Load() &&
					writeIdx.Load() == slowNoDropDataIndex.Load()
			}, 10*time.Second, 50*time.Millisecond, "writeIdx %d, fast %d, slowNoDrop %d", writeIdx.Load(), fastDataIndex.Load(), slowNoDropDataIndex.Load())
		})
	}
}

func TestFireTrackBySdp(t *testing.T) {
	_, finish := setupSingleNodeTest("TestFireTrackBySdp")
	defer finish()

	var cases = []struct {
		name   string
		codecs []webrtc.RTPCodecCapability
		pubSDK livekit.ClientInfo_SDK
	}{
		{
			name: "js client could pub a/v tracks",
			codecs: []webrtc.RTPCodecCapability{
				{MimeType: mime.MimeTypeH264.String()},
				{MimeType: mime.MimeTypeOpus.String()},
			},
			pubSDK: livekit.ClientInfo_JS,
		},
		{
			name: "go client could pub audio tracks",
			codecs: []webrtc.RTPCodecCapability{
				{MimeType: "audio/opus"},
			},
			pubSDK: livekit.ClientInfo_GO,
		},
	}

	for _, c := range cases {
		codecs, sdk := c.codecs, c.pubSDK
		t.Run(c.name, func(t *testing.T) {
			for _, testRTCServicePath := range testRTCServicePaths {
				t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
					c1 := createRTCClient(c.name+"_c1", defaultServerPort, testRTCServicePath, &testclient.Options{
						ClientInfo: &livekit.ClientInfo{
							Sdk: sdk,
						},
					})
					c2 := createRTCClient(c.name+"_c2", defaultServerPort, testRTCServicePath, &testclient.Options{
						AutoSubscribe: true,
						ClientInfo: &livekit.ClientInfo{
							Sdk: livekit.ClientInfo_JS,
						},
					})
					waitUntilConnected(t, c1, c2)
					defer func() {
						c1.Stop()
						c2.Stop()
					}()

					// publish tracks and don't write any packets
					for _, codec := range codecs {
						_, err := c1.AddStaticTrackWithCodec(codec, codec.MimeType, codec.MimeType, testclient.AddTrackNoWriter())
						require.NoError(t, err)
					}

					require.Eventually(t, func() bool {
						return len(c2.SubscribedTracks()[c1.ID()]) == len(codecs)
					}, 5*time.Second, 10*time.Millisecond)

					var found int
					for _, pubTrack := range c1.GetPublishedTrackIDs() {
						t.Log("pub track", pubTrack)
						tracks := c2.SubscribedTracks()[c1.ID()]
						for _, track := range tracks {
							t.Log("sub track", track.ID(), track.Codec())
							if track.Codec().PayloadType == 0 && track.ID() == pubTrack {
								found++
								break
							}
						}
					}
					require.Equal(t, len(codecs), found)
				})
			}
		})
	}
}

func TestSinglePublisherDataTrack(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
		return
	}

	s, finish := setupSingleNodeTest("TestSinglePublisherDataTrack")
	defer finish()

	for _, testRTCServicePath := range testRTCServicePaths {
		t.Run(fmt.Sprintf("testRTCServicePath=%s", testRTCServicePath.String()), func(t *testing.T) {
			c1 := createRTCClient("c1", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
			c2 := createRTCClient("c2", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})
			waitUntilConnected(t, c1, c2)

			// publish a couple of data tracks and ensure clients receive it ok
			dt1, err := c1.PublishDataTrack()
			require.NoError(t, err)
			defer dt1.Stop()

			dt2, err := c1.PublishDataTrack()
			require.NoError(t, err)
			defer dt2.Stop()

			testutils.WithTimeout(t, func() string {
				if len(c2.SubscribedDataTracks()) == 0 {
					return "c2 was not subscribed to any data tracks"
				}
				// should have received two data tracks
				if len(c2.SubscribedDataTracks()[c1.ID()]) != 2 {
					return "c2 didn't subscribe to both data tracks from c1"
				}
				return ""
			})

			// a new client joins and should get the initial stream
			c3 := createRTCClient("c3", defaultServerPort, testRTCServicePath, &testclient.Options{AutoSubscribeDataTrack: true})

			// ensure that new client that has joined also received data tracks
			waitUntilConnected(t, c3)
			testutils.WithTimeout(t, func() string {
				if len(c3.SubscribedDataTracks()) == 0 {
					return "c3 didn't subscribe to any data tracks"
				}
				// should have received two data tracks
				if len(c3.SubscribedDataTracks()[c1.ID()]) != 2 {
					return "c3 didn't subscribe to tracks from c1"
				}
				return ""
			})

			// ensure that the data track ids are generated by server
			tracks := c3.SubscribedDataTracks()[c1.ID()]
			for _, tr := range tracks {
				require.True(t, strings.HasPrefix(string(tr.ID()), "DTR_"), "data track should begin with DTR")
			}

			// when c3 disconnects, ensure subscriber is cleaned up correctly
			c3.Stop()

			testutils.WithTimeout(t, func() string {
				room := s.RoomManager().GetRoom(context.Background(), testRoom)
				p := room.GetParticipant("c1")
				require.NotNil(t, p)

				for _, t := range p.GetPublishedDataTracks() {
					if t.IsSubscriber(c3.ID()) {
						return "c3 was not a subscriber of c1's data tracks"
					}
				}
				return ""
			})
		})
	}
}
