// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rtc

import (
	"errors"
	"sync"

	"github.com/livekit/protocol/livekit"
	"github.com/livekit/protocol/logger"
	"golang.org/x/exp/maps"

	"github.com/livekit/livekit-server/pkg/rtc/types"
	"github.com/livekit/protocol/utils"
)

var (
	ErrSubscriptionPermissionNeedsId = errors.New("either participant identity or SID needed")
)

type UpTrackManagerParams struct {
	Logger           logger.Logger
	VersionGenerator utils.TimedVersionGenerator
}

// UpTrackManager manages all uptracks from a participant
type UpTrackManager struct {
	// utils.TimedVersion is a atomic. To be correctly aligned also on 32bit archs
	// 64it atomics need to be at the front of a struct
	subscriptionPermissionVersion utils.TimedVersion

	params UpTrackManagerParams

	closed bool

	// publishedTracks that participant is publishing
	publishedTracks        map[livekit.TrackID]types.MediaTrack
	subscriptionPermission *livekit.SubscriptionPermission
	// subscriber permission for published tracks
	subscriberPermissions map[livekit.ParticipantIdentity]*livekit.TrackPermission // subscriberIdentity => *livekit.TrackPermission

	lock sync.RWMutex

	// callbacks & handlers
	onClose        func()
	onTrackUpdated func(track types.MediaTrack)
}

func NewUpTrackManager(params UpTrackManagerParams) *UpTrackManager {
	return &UpTrackManager{
		params:          params,
		publishedTracks: make(map[livekit.TrackID]types.MediaTrack),
	}
}

func (u *UpTrackManager) Close(isExpectedToResume bool) {
	u.lock.Lock()
	if u.closed {
		u.lock.Unlock()
		return
	}

	u.closed = true

	publishedTracks := u.publishedTracks
	u.publishedTracks = make(map[livekit.TrackID]types.MediaTrack)
	u.lock.Unlock()

	for _, t := range publishedTracks {
		t.Close(isExpectedToResume)
	}

	if onClose := u.getOnUpTrackManagerClose(); onClose != nil {
		onClose()
	}
}

func (u *UpTrackManager) OnUpTrackManagerClose(f func()) {
	u.lock.Lock()
	u.onClose = f
	u.lock.Unlock()
}

func (u *UpTrackManager) getOnUpTrackManagerClose() func() {
	u.lock.RLock()
	defer u.lock.RUnlock()

	return u.onClose
}

func (u *UpTrackManager) ToProto() []*livekit.TrackInfo {
	u.lock.RLock()
	defer u.lock.RUnlock()

	var trackInfos []*livekit.TrackInfo
	for _, t := range u.publishedTracks {
		trackInfos = append(trackInfos, t.ToProto())
	}

	return trackInfos
}

func (u *UpTrackManager) OnPublishedTrackUpdated(f func(track types.MediaTrack)) {
	u.onTrackUpdated = f
}

func (u *UpTrackManager) SetPublishedTrackMuted(trackID livekit.TrackID, muted bool) (types.MediaTrack, bool) {
	changed := false
	track := u.GetPublishedTrack(trackID)
	if track != nil {
		currentMuted := track.IsMuted()
		track.SetMuted(muted)

		if currentMuted != track.IsMuted() {
			changed = true
			u.params.Logger.Debugw("publisher mute status changed", "trackID", trackID, "muted", track.IsMuted())
			if u.onTrackUpdated != nil {
				u.onTrackUpdated(track)
			}
		}
	}

	return track, changed
}

func (u *UpTrackManager) GetPublishedTrack(trackID livekit.TrackID) types.MediaTrack {
	u.lock.RLock()
	defer u.lock.RUnlock()

	return u.getPublishedTrackLocked(trackID)
}

func (u *UpTrackManager) GetPublishedTracks() []types.MediaTrack {
	u.lock.RLock()
	defer u.lock.RUnlock()

	return maps.Values(u.publishedTracks)
}

func (u *UpTrackManager) UpdateSubscriptionPermission(
	subscriptionPermission *livekit.SubscriptionPermission,
	timedVersion utils.TimedVersion,
	resolverBySid func(participantID livekit.ParticipantID) types.LocalParticipant,
) error {
	u.lock.Lock()
	if !timedVersion.IsZero() {
		// it's possible for permission updates to come from another node. In that case
		// they would be the authority for this participant's permissions
		// we do not want to initialize subscriptionPermissionVersion too early since if another machine is the
		// owner for the data, we'd prefer to use their TimedVersion
		// ignore older version
		if !timedVersion.After(u.subscriptionPermissionVersion) {
			u.params.Logger.Debugw(
				"skipping older subscription permission version",
				"existingValue", logger.Proto(u.subscriptionPermission),
				"existingVersion", &u.subscriptionPermissionVersion,
				"requestingValue", logger.Proto(subscriptionPermission),
				"requestingVersion", &timedVersion,
			)
			u.lock.Unlock()
			return nil
		}
		u.subscriptionPermissionVersion.Update(timedVersion)
	} else {
		// for requests coming from the current node, use local versions
		u.subscriptionPermissionVersion.Update(u.params.VersionGenerator.Next())
	}

	// store as is for use when migrating
	u.subscriptionPermission = subscriptionPermission
	if subscriptionPermission == nil {
		u.params.Logger.Debugw(
			"updating subscription permission, setting to nil",
			"version", u.subscriptionPermissionVersion,
		)
		// possible to get a nil when migrating
		u.lock.Unlock()
		return nil
	}

	u.params.Logger.Debugw(
		"updating subscription permission",
		"permissions", logger.Proto(u.subscriptionPermission),
		"version", u.subscriptionPermissionVersion,
	)
	if err := u.parseSubscriptionPermissionsLocked(subscriptionPermission, func(pID livekit.ParticipantID) types.LocalParticipant {
		u.lock.Unlock()
		var p types.LocalParticipant
		if resolverBySid != nil {
			p = resolverBySid(pID)
		}
		u.lock.Lock()
		return p
	}); err != nil {
		// when failed, do not override previous permissions
		u.params.Logger.Errorw("failed updating subscription permission", err)
		u.lock.Unlock()
		return err
	}
	u.lock.Unlock()

	u.maybeRevokeSubscriptions()

	return nil
}

func (u *UpTrackManager) SubscriptionPermission() (*livekit.SubscriptionPermission, utils.TimedVersion) {
	u.lock.RLock()
	defer u.lock.RUnlock()

	if u.subscriptionPermissionVersion.IsZero() {
		return nil, u.subscriptionPermissionVersion.Load()
	}

	return u.subscriptionPermission, u.subscriptionPermissionVersion.Load()
}

func (u *UpTrackManager) HasPermission(trackID livekit.TrackID, subIdentity livekit.ParticipantIdentity) bool {
	u.lock.RLock()
	defer u.lock.RUnlock()

	return u.hasPermissionLocked(trackID, subIdentity)
}

func (u *UpTrackManager) UpdatePublishedAudioTrack(update *livekit.UpdateLocalAudioTrack) types.MediaTrack {
	track := u.GetPublishedTrack(livekit.TrackID(update.TrackSid))
	if track != nil {
		track.UpdateAudioTrack(update)
		if u.onTrackUpdated != nil {
			u.onTrackUpdated(track)
		}
	}

	return track
}

func (u *UpTrackManager) UpdatePublishedVideoTrack(update *livekit.UpdateLocalVideoTrack) types.MediaTrack {
	track := u.GetPublishedTrack(livekit.TrackID(update.TrackSid))
	if track != nil {
		track.UpdateVideoTrack(update)
		if u.onTrackUpdated != nil {
			u.onTrackUpdated(track)
		}
	}

	return track
}

func (u *UpTrackManager) AddPublishedTrack(track types.MediaTrack) {
	u.lock.Lock()
	if _, ok := u.publishedTracks[track.ID()]; !ok {
		u.publishedTracks[track.ID()] = track
	}
	u.lock.Unlock()
	u.params.Logger.Debugw("added published track", "trackID", track.ID(), "trackInfo", logger.Proto(track.ToProto()))

	track.AddOnClose(func(_isExpectedToResume bool) {
		u.lock.Lock()
		delete(u.publishedTracks, track.ID())
		// not modifying subscription permissions, will get reset on next update from participant
		u.lock.Unlock()
	})
}

func (u *UpTrackManager) RemovePublishedTrack(track types.MediaTrack, isExpectedToResume bool) {
	track.Close(isExpectedToResume)

	u.lock.Lock()
	delete(u.publishedTracks, track.ID())
	u.lock.Unlock()
}

func (u *UpTrackManager) getPublishedTrackLocked(trackID livekit.TrackID) types.MediaTrack {
	return u.publishedTracks[trackID]
}

func (u *UpTrackManager) parseSubscriptionPermissionsLocked(
	subscriptionPermission *livekit.SubscriptionPermission,
	resolver func(participantID livekit.ParticipantID) types.LocalParticipant,
) error {
	// every update overrides the existing

	// all_participants takes precedence
	if subscriptionPermission.AllParticipants {
		// everything is allowed, nothing else to do
		u.subscriberPermissions = nil
		return nil
	}

	// per participant permissions
	subscriberPermissions := make(map[livekit.ParticipantIdentity]*livekit.TrackPermission)
	for _, trackPerms := range subscriptionPermission.TrackPermissions {
		subscriberIdentity := livekit.ParticipantIdentity(trackPerms.ParticipantIdentity)
		if subscriberIdentity == "" {
			if trackPerms.ParticipantSid == "" {
				return ErrSubscriptionPermissionNeedsId
			}

			sub := resolver(livekit.ParticipantID(trackPerms.ParticipantSid))
			if sub == nil {
				u.params.Logger.Warnw("could not find subscriber for permissions update", nil, "subscriberID", trackPerms.ParticipantSid)
				continue
			}

			subscriberIdentity = sub.Identity()
		} else {
			if trackPerms.ParticipantSid != "" {
				sub := resolver(livekit.ParticipantID(trackPerms.ParticipantSid))
				if sub != nil && sub.Identity() != subscriberIdentity {
					u.params.Logger.Errorw("participant identity mismatch", nil, "expected", subscriberIdentity, "got", sub.Identity())
				}
				if sub == nil {
					u.params.Logger.Warnw("could not find subscriber for permissions update", nil, "subscriberID", trackPerms.ParticipantSid)
				}
			}
		}

		subscriberPermissions[subscriberIdentity] = trackPerms
	}

	u.subscriberPermissions = subscriberPermissions

	return nil
}

func (u *UpTrackManager) hasPermissionLocked(trackID livekit.TrackID, subscriberIdentity livekit.ParticipantIdentity) bool {
	if u.subscriberPermissions == nil {
		return true
	}

	perms, ok := u.subscriberPermissions[subscriberIdentity]
	if !ok {
		return false
	}

	if perms.AllTracks {
		return true
	}

	for _, sid := range perms.TrackSids {
		if livekit.TrackID(sid) == trackID {
			return true
		}
	}

	return false
}

// returns a list of participants that are allowed to subscribe to the track. if nil is returned, it means everyone is
// allowed to subscribe to this track
func (u *UpTrackManager) getAllowedSubscribersLocked(trackID livekit.TrackID) []livekit.ParticipantIdentity {
	if u.subscriberPermissions == nil {
		return nil
	}

	allowed := make([]livekit.ParticipantIdentity, 0)
	for subscriberIdentity, perms := range u.subscriberPermissions {
		if perms.AllTracks {
			allowed = append(allowed, subscriberIdentity)
			continue
		}

		for _, sid := range perms.TrackSids {
			if livekit.TrackID(sid) == trackID {
				allowed = append(allowed, subscriberIdentity)
				break
			}
		}
	}

	return allowed
}

func (u *UpTrackManager) maybeRevokeSubscriptions() {
	u.lock.Lock()
	defer u.lock.Unlock()

	for trackID, track := range u.publishedTracks {
		allowed := u.getAllowedSubscribersLocked(trackID)
		if allowed == nil {
			// no restrictions
			continue
		}

		track.RevokeDisallowedSubscribers(allowed)
	}
}

func (u *UpTrackManager) DebugInfo() map[string]any {
	info := map[string]any{}
	publishedTrackInfo := make(map[livekit.TrackID]any)

	u.lock.RLock()
	for trackID, track := range u.publishedTracks {
		if mt, ok := track.(*MediaTrack); ok {
			publishedTrackInfo[trackID] = mt.DebugInfo()
		} else {
			publishedTrackInfo[trackID] = map[string]any{
				"ID":       track.ID(),
				"Kind":     track.Kind().String(),
				"PubMuted": track.IsMuted(),
			}
		}
	}
	u.lock.RUnlock()

	info["PublishedTracks"] = publishedTrackInfo

	return info
}

func (u *UpTrackManager) GetAudioLevel() (level float64, active bool) {
	level = 0
	for _, pt := range u.GetPublishedTracks() {
		if pt.Source() == livekit.TrackSource_MICROPHONE {
			tl, ta := pt.GetAudioLevel()
			if ta {
				active = true
				if tl > level {
					level = tl
				}
			}
		}
	}
	return
}
