// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry

import (
	"context"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"

	"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
	"github.com/livekit/protocol/codecs/mime"
	"github.com/livekit/protocol/egress"
	"github.com/livekit/protocol/livekit"
	"github.com/livekit/protocol/logger"
	"github.com/livekit/protocol/utils/guid"
	"github.com/livekit/protocol/webhook"
)

// NotifyEvent fills in event.Id (from guid.New("EV_")) and event.CreatedAt
// (current Unix time) and queues the event on the configured webhook notifier,
// forwarding opts unchanged.
//
// It does nothing when no notifier is configured. A queueing error is logged
// as a warning and is not returned to the caller.
func (t *telemetryService) NotifyEvent(ctx context.Context, event *livekit.WebhookEvent, opts ...webhook.NotifyOption) {
	if t.notifier == nil {
		return
	}

	event.Id = guid.New("EV_")
	event.CreatedAt = time.Now().Unix()

	err := t.notifier.QueueNotify(ctx, event, opts...)
	if err == nil {
		return
	}
	logger.Warnw("failed to notify webhook", err, "event", event.Event)
}

// RoomStarted hands t.enqueue a task that first sends the
// webhook.EventRoomStarted webhook for room and then a ROOM_CREATED analytics
// event whose timestamp is built from room.CreationTime (seconds).
func (t *telemetryService) RoomStarted(ctx context.Context, room *livekit.Room) {
	t.enqueue(func() {
		hook := &livekit.WebhookEvent{
			Event: webhook.EventRoomStarted,
			Room:  room,
		}
		t.NotifyEvent(ctx, hook)

		createdAt := &timestamppb.Timestamp{Seconds: room.CreationTime}
		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:      livekit.AnalyticsEventType_ROOM_CREATED,
			Timestamp: createdAt,
			Room:      room,
		})
	})
}

// RoomEnded hands t.enqueue a task that first sends the
// webhook.EventRoomFinished webhook for room and then a ROOM_ENDED analytics
// event stamped with the time at which the task runs.
func (t *telemetryService) RoomEnded(ctx context.Context, room *livekit.Room) {
	t.enqueue(func() {
		hook := &livekit.WebhookEvent{
			Event: webhook.EventRoomFinished,
			Room:  room,
		}
		t.NotifyEvent(ctx, hook)

		ended := &livekit.AnalyticsEvent{
			Type:      livekit.AnalyticsEventType_ROOM_ENDED,
			Timestamp: timestamppb.Now(),
			RoomId:    room.Sid,
			Room:      room,
		}
		t.SendEvent(ctx, ended)
	})
}

// ParticipantJoined hands t.enqueue a task that obtains the participant's
// stats worker through getOrCreateWorker. When that call's second result is
// false (presumably: the worker did not exist before), the Prometheus
// RTC-connected and participant counters are bumped.
//
// If shouldSendEvent is set, a PARTICIPANT_JOINED analytics event carrying
// clientInfo and clientMeta is sent afterwards.
func (t *telemetryService) ParticipantJoined(
	ctx context.Context,
	room *livekit.Room,
	participant *livekit.ParticipantInfo,
	clientInfo *livekit.ClientInfo,
	clientMeta *livekit.AnalyticsClientMeta,
	shouldSendEvent bool,
	guard *ReferenceGuard,
) {
	t.enqueue(func() {
		roomID := livekit.RoomID(room.Sid)
		roomName := livekit.RoomName(room.Name)
		pID := livekit.ParticipantID(participant.Sid)
		identity := livekit.ParticipantIdentity(participant.Identity)

		if _, existed := t.getOrCreateWorker(ctx, roomID, roomName, pID, identity, guard); !existed {
			prometheus.IncrementParticipantRtcConnected(1)
			prometheus.AddParticipant()
		}

		if !shouldSendEvent {
			return
		}

		joined := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_JOINED, room, participant)
		joined.ClientInfo = clientInfo
		joined.ClientMeta = clientMeta
		t.SendEvent(ctx, joined)
	})
}

// ParticipantActive hands t.enqueue a task that:
//
//  1. sends the webhook.EventParticipantJoined webhook, unless isMigration;
//  2. obtains the participant's stats worker via getOrCreateWorker, bumping
//     the Prometheus participant counter when the second result is false;
//  3. calls SetConnected on that worker;
//  4. sends a PARTICIPANT_ACTIVE analytics event carrying clientMeta.
func (t *telemetryService) ParticipantActive(
	ctx context.Context,
	room *livekit.Room,
	participant *livekit.ParticipantInfo,
	clientMeta *livekit.AnalyticsClientMeta,
	isMigration bool,
	guard *ReferenceGuard,
) {
	t.enqueue(func() {
		// a participant is considered "joined" only when they become "active",
		// so the joined webhook is emitted here rather than in ParticipantJoined.
		if !isMigration {
			joinedHook := &livekit.WebhookEvent{
				Event:       webhook.EventParticipantJoined,
				Room:        room,
				Participant: participant,
			}
			t.NotifyEvent(ctx, joinedHook)
		}

		roomID := livekit.RoomID(room.Sid)
		roomName := livekit.RoomName(room.Name)
		pID := livekit.ParticipantID(participant.Sid)
		identity := livekit.ParticipantIdentity(participant.Identity)

		w, existed := t.getOrCreateWorker(ctx, roomID, roomName, pID, identity, guard)
		if !existed {
			prometheus.AddParticipant()
		}
		w.SetConnected()

		active := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_ACTIVE, room, participant)
		active.ClientMeta = clientMeta
		t.SendEvent(ctx, active)
	})
}

// ParticipantResumed hands t.enqueue a task that makes sure a stats worker
// exists for the participant (passing a nil guard to getOrCreateWorker) and
// then sends a PARTICIPANT_RESUMED analytics event whose client meta records
// nodeID and the reconnect reason.
func (t *telemetryService) ParticipantResumed(
	ctx context.Context,
	room *livekit.Room,
	participant *livekit.ParticipantInfo,
	nodeID livekit.NodeID,
	reason livekit.ReconnectReason,
) {
	t.enqueue(func() {
		// Create a worker if needed.
		//
		// Both the signalling channel stats collector and the media channel
		// stats collector may call ParticipantJoined and ParticipantLeft.
		//
		// On a resume, the signalling channel collector would call
		// `ParticipantLeft`, which would close the corresponding
		// participant's stats worker.
		//
		// So, on a successful resume, create the worker if needed.
		roomID := livekit.RoomID(room.Sid)
		roomName := livekit.RoomName(room.Name)
		pID := livekit.ParticipantID(participant.Sid)
		identity := livekit.ParticipantIdentity(participant.Identity)

		if _, existed := t.getOrCreateWorker(ctx, roomID, roomName, pID, identity, nil); !existed {
			prometheus.AddParticipant()
		}

		resumed := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_RESUMED, room, participant)
		resumed.ClientMeta = &livekit.AnalyticsClientMeta{
			Node:            string(nodeID),
			ReconnectReason: reason,
		}
		t.SendEvent(ctx, resumed)
	})
}

// ParticipantLeft hands t.enqueue a task that looks up the participant's
// stats worker. If one is found, its IsConnected value is recorded and
// Close(guard) is called: a true result decrements the Prometheus participant
// counter, a false result is logged as "stats worker active".
//
// When shouldSendEvent is set, a webhook and an analytics event are emitted:
// participant-left if the worker reported connected, connection-aborted
// otherwise (which includes the case where no worker was found).
func (t *telemetryService) ParticipantLeft(ctx context.Context,
	room *livekit.Room,
	participant *livekit.ParticipantInfo,
	shouldSendEvent bool,
	guard *ReferenceGuard,
) {
	t.enqueue(func() {
		var wasConnected bool

		worker, ok := t.getWorker(livekit.RoomID(room.Sid), livekit.ParticipantID(participant.Sid))
		if ok {
			wasConnected = worker.IsConnected()
			if !worker.Close(guard) {
				logger.Infow(
					"stats worker active",
					"room", room.Name,
					"roomID", room.Sid,
					"participant", participant.Identity,
					"pID", participant.Sid,
					"worker", worker,
				)
			} else {
				prometheus.SubParticipant()
			}
		}

		if !shouldSendEvent {
			return
		}

		// Default to "aborted"; upgrade to "left" only for a connected worker.
		hookName := webhook.EventParticipantConnectionAborted
		analyticsType := livekit.AnalyticsEventType_PARTICIPANT_CONNECTION_ABORTED
		if wasConnected {
			hookName = webhook.EventParticipantLeft
			analyticsType = livekit.AnalyticsEventType_PARTICIPANT_LEFT
		}

		hook := &livekit.WebhookEvent{
			Event:       hookName,
			Room:        room,
			Participant: participant,
		}
		t.NotifyEvent(ctx, hook)

		t.SendEvent(ctx, newParticipantEvent(analyticsType, room, participant))
	})
}

// TrackPublishRequested hands t.enqueue a task that records a track publish
// attempt in Prometheus (labelled with the track type) and sends a
// TRACK_PUBLISH_REQUESTED analytics event. The event's participant, when
// present, is annotated with identity.
func (t *telemetryService) TrackPublishRequested(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	identity livekit.ParticipantIdentity,
	track *livekit.TrackInfo,
) {
	t.enqueue(func() {
		prometheus.RecordTrackPublishAttempt(track.Type.String())

		requested := newTrackEvent(
			livekit.AnalyticsEventType_TRACK_PUBLISH_REQUESTED,
			toMinimalRoomProto(roomID, roomName),
			participantID,
			track,
		)
		if p := requested.Participant; p != nil {
			p.Identity = string(identity)
		}
		t.SendEvent(ctx, requested)
	})
}

// TrackPublished hands t.enqueue a task that updates the Prometheus
// published-track gauge and publish-success counter for the track's type.
//
// When shouldSendEvent is set it additionally sends the
// webhook.EventTrackPublished webhook and a TRACK_PUBLISHED analytics event,
// both carrying a participant built from participantID and identity.
func (t *telemetryService) TrackPublished(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	identity livekit.ParticipantIdentity,
	track *livekit.TrackInfo,
	shouldSendEvent bool,
) {
	t.enqueue(func() {
		kind := track.Type.String()
		prometheus.AddPublishedTrack(kind)
		prometheus.RecordTrackPublishSuccess(kind)

		if !shouldSendEvent {
			return
		}

		room := toMinimalRoomProto(roomID, roomName)
		publisher := &livekit.ParticipantInfo{
			Sid:      string(participantID),
			Identity: string(identity),
		}

		hook := &livekit.WebhookEvent{
			Event:       webhook.EventTrackPublished,
			Room:        room,
			Participant: publisher,
			Track:       track,
		}
		t.NotifyEvent(ctx, hook)

		published := newTrackEvent(livekit.AnalyticsEventType_TRACK_PUBLISHED, room, participantID, track)
		published.Participant = publisher
		t.SendEvent(ctx, published)
	})
}

// TrackPublishedUpdate hands t.enqueue a task that sends a
// TRACK_PUBLISHED_UPDATE analytics event for the given room, participant and
// track.
func (t *telemetryService) TrackPublishedUpdate(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	track *livekit.TrackInfo,
) {
	t.enqueue(func() {
		updated := newTrackEvent(
			livekit.AnalyticsEventType_TRACK_PUBLISHED_UPDATE,
			toMinimalRoomProto(roomID, roomName),
			participantID,
			track,
		)
		t.SendEvent(ctx, updated)
	})
}

// TrackMaxSubscribedVideoQuality hands t.enqueue a task that sends a
// TRACK_MAX_SUBSCRIBED_VIDEO_QUALITY analytics event for the track, recording
// maxQuality and the string form of mimeType.
//
// The MIME parameter is named mimeType (as in TrackPublishRTPStats and
// TrackSubscribeRTPStats) so that it does not shadow the imported mime
// package inside the function body.
func (t *telemetryService) TrackMaxSubscribedVideoQuality(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	track *livekit.TrackInfo,
	mimeType mime.MimeType,
	maxQuality livekit.VideoQuality,
) {
	t.enqueue(func() {
		room := toMinimalRoomProto(roomID, roomName)
		ev := newTrackEvent(livekit.AnalyticsEventType_TRACK_MAX_SUBSCRIBED_VIDEO_QUALITY, room, participantID, track)
		ev.MaxSubscribedVideoQuality = maxQuality
		ev.Mime = mimeType.String()
		t.SendEvent(ctx, ev)
	})
}

// TrackSubscribeRequested hands t.enqueue a task that records a subscribe
// attempt in Prometheus and sends a TRACK_SUBSCRIBE_REQUESTED analytics event.
func (t *telemetryService) TrackSubscribeRequested(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	track *livekit.TrackInfo,
) {
	t.enqueue(func() {
		prometheus.RecordTrackSubscribeAttempt()

		t.SendEvent(ctx, newTrackEvent(
			livekit.AnalyticsEventType_TRACK_SUBSCRIBE_REQUESTED,
			toMinimalRoomProto(roomID, roomName),
			participantID,
			track,
		))
	})
}

// TrackSubscribed hands t.enqueue a task that records a subscribe success in
// Prometheus for the track's type and, when shouldSendEvent is set, sends a
// TRACK_SUBSCRIBED analytics event that also carries the publisher.
func (t *telemetryService) TrackSubscribed(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	track *livekit.TrackInfo,
	publisher *livekit.ParticipantInfo,
	shouldSendEvent bool,
) {
	t.enqueue(func() {
		prometheus.RecordTrackSubscribeSuccess(track.Type.String())

		if shouldSendEvent {
			subscribed := newTrackEvent(
				livekit.AnalyticsEventType_TRACK_SUBSCRIBED,
				toMinimalRoomProto(roomID, roomName),
				participantID,
				track,
			)
			subscribed.Publisher = publisher
			t.SendEvent(ctx, subscribed)
		}
	})
}

// TrackSubscribeFailed hands t.enqueue a task that records the subscribe
// failure in Prometheus (passing err and isUserError through) and sends a
// TRACK_SUBSCRIBE_FAILED analytics event. Only the track's SID is known at
// this point, so the event carries a TrackInfo with just that field set.
//
// The event's Error field is the text of err; it is left empty when err is
// nil so that a nil error cannot panic inside the enqueued task.
func (t *telemetryService) TrackSubscribeFailed(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	trackID livekit.TrackID,
	err error,
	isUserError bool,
) {
	t.enqueue(func() {
		prometheus.RecordTrackSubscribeFailure(err, isUserError)

		room := toMinimalRoomProto(roomID, roomName)
		ev := newTrackEvent(livekit.AnalyticsEventType_TRACK_SUBSCRIBE_FAILED, room, participantID, &livekit.TrackInfo{
			Sid: string(trackID),
		})
		if err != nil {
			ev.Error = err.Error()
		}
		t.SendEvent(ctx, ev)
	})
}

// TrackUnsubscribed hands t.enqueue a task that records the unsubscribe in
// Prometheus for the track's type and, when shouldSendEvent is set, sends a
// TRACK_UNSUBSCRIBED analytics event.
func (t *telemetryService) TrackUnsubscribed(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	track *livekit.TrackInfo,
	shouldSendEvent bool,
) {
	t.enqueue(func() {
		prometheus.RecordTrackUnsubscribed(track.Type.String())

		if !shouldSendEvent {
			return
		}

		unsubscribed := newTrackEvent(
			livekit.AnalyticsEventType_TRACK_UNSUBSCRIBED,
			toMinimalRoomProto(roomID, roomName),
			participantID,
			track,
		)
		t.SendEvent(ctx, unsubscribed)
	})
}

// TrackUnpublished hands t.enqueue a task that decrements the Prometheus
// published-track gauge for the track's type.
//
// When shouldSendEvent is set it additionally sends the
// webhook.EventTrackUnpublished webhook (with a participant built from
// participantID and identity) followed by a TRACK_UNPUBLISHED analytics event.
func (t *telemetryService) TrackUnpublished(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	identity livekit.ParticipantIdentity,
	track *livekit.TrackInfo,
	shouldSendEvent bool,
) {
	t.enqueue(func() {
		prometheus.SubPublishedTrack(track.Type.String())

		if !shouldSendEvent {
			return
		}

		room := toMinimalRoomProto(roomID, roomName)
		hook := &livekit.WebhookEvent{
			Event: webhook.EventTrackUnpublished,
			Room:  room,
			Participant: &livekit.ParticipantInfo{
				Sid:      string(participantID),
				Identity: string(identity),
			},
			Track: track,
		}
		t.NotifyEvent(ctx, hook)

		unpublished := newTrackEvent(livekit.AnalyticsEventType_TRACK_UNPUBLISHED, room, participantID, track)
		t.SendEvent(ctx, unpublished)
	})
}

// TrackMuted hands t.enqueue a task that sends a TRACK_MUTED analytics event
// for the given room, participant and track.
func (t *telemetryService) TrackMuted(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	track *livekit.TrackInfo,
) {
	t.enqueue(func() {
		muted := newTrackEvent(
			livekit.AnalyticsEventType_TRACK_MUTED,
			toMinimalRoomProto(roomID, roomName),
			participantID,
			track,
		)
		t.SendEvent(ctx, muted)
	})
}

// TrackUnmuted hands t.enqueue a task that sends a TRACK_UNMUTED analytics
// event for the given room, participant and track.
func (t *telemetryService) TrackUnmuted(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	track *livekit.TrackInfo,
) {
	t.enqueue(func() {
		unmuted := newTrackEvent(
			livekit.AnalyticsEventType_TRACK_UNMUTED,
			toMinimalRoomProto(roomID, roomName),
			participantID,
			track,
		)
		t.SendEvent(ctx, unmuted)
	})
}

// TrackPublishRTPStats hands t.enqueue a task that sends a
// TRACK_PUBLISH_STATS analytics event. The event is room-scoped and is
// populated with the participant ID, track ID, MIME type string, video layer
// and the supplied RTP stats.
func (t *telemetryService) TrackPublishRTPStats(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	trackID livekit.TrackID,
	mimeType mime.MimeType,
	layer int,
	stats *livekit.RTPStats,
) {
	t.enqueue(func() {
		statsEv := newRoomEvent(
			livekit.AnalyticsEventType_TRACK_PUBLISH_STATS,
			toMinimalRoomProto(roomID, roomName),
		)
		statsEv.RtpStats = stats
		statsEv.VideoLayer = int32(layer)
		statsEv.Mime = mimeType.String()
		statsEv.TrackId = string(trackID)
		statsEv.ParticipantId = string(participantID)
		t.SendEvent(ctx, statsEv)
	})
}

// TrackSubscribeRTPStats hands t.enqueue a task that sends a
// TRACK_SUBSCRIBE_STATS analytics event. The event is room-scoped and is
// populated with the participant ID, track ID, MIME type string and the
// supplied RTP stats.
func (t *telemetryService) TrackSubscribeRTPStats(
	ctx context.Context,
	roomID livekit.RoomID,
	roomName livekit.RoomName,
	participantID livekit.ParticipantID,
	trackID livekit.TrackID,
	mimeType mime.MimeType,
	stats *livekit.RTPStats,
) {
	t.enqueue(func() {
		statsEv := newRoomEvent(
			livekit.AnalyticsEventType_TRACK_SUBSCRIBE_STATS,
			toMinimalRoomProto(roomID, roomName),
		)
		statsEv.RtpStats = stats
		statsEv.Mime = mimeType.String()
		statsEv.TrackId = string(trackID)
		statsEv.ParticipantId = string(participantID)
		t.SendEvent(ctx, statsEv)
	})
}

// NotifyEgressEvent sends a webhook named event that carries info, using the
// notify options that egress.GetEgressNotifyOptions derives from info.
// Unlike the surrounding lifecycle methods it calls NotifyEvent directly
// rather than going through t.enqueue.
func (t *telemetryService) NotifyEgressEvent(ctx context.Context, event string, info *livekit.EgressInfo) {
	hook := &livekit.WebhookEvent{
		Event:      event,
		EgressInfo: info,
	}
	t.NotifyEvent(ctx, hook, egress.GetEgressNotifyOptions(info)...)
}

// EgressStarted hands t.enqueue a task that sends the
// webhook.EventEgressStarted webhook for info and then an EGRESS_STARTED
// analytics event.
func (t *telemetryService) EgressStarted(ctx context.Context, info *livekit.EgressInfo) {
	t.enqueue(func() {
		t.NotifyEgressEvent(ctx, webhook.EventEgressStarted, info)

		started := newEgressEvent(livekit.AnalyticsEventType_EGRESS_STARTED, info)
		t.SendEvent(ctx, started)
	})
}

// EgressUpdated hands t.enqueue a task that sends the
// webhook.EventEgressUpdated webhook for info and then an EGRESS_UPDATED
// analytics event.
func (t *telemetryService) EgressUpdated(ctx context.Context, info *livekit.EgressInfo) {
	t.enqueue(func() {
		t.NotifyEgressEvent(ctx, webhook.EventEgressUpdated, info)

		updated := newEgressEvent(livekit.AnalyticsEventType_EGRESS_UPDATED, info)
		t.SendEvent(ctx, updated)
	})
}

// EgressEnded hands t.enqueue a task that sends the webhook.EventEgressEnded
// webhook for info and then an EGRESS_ENDED analytics event.
func (t *telemetryService) EgressEnded(ctx context.Context, info *livekit.EgressInfo) {
	t.enqueue(func() {
		t.NotifyEgressEvent(ctx, webhook.EventEgressEnded, info)

		ended := newEgressEvent(livekit.AnalyticsEventType_EGRESS_ENDED, info)
		t.SendEvent(ctx, ended)
	})
}

// IngressCreated hands t.enqueue a task that sends an INGRESS_CREATED
// analytics event for info. No webhook is sent for this transition.
func (t *telemetryService) IngressCreated(ctx context.Context, info *livekit.IngressInfo) {
	t.enqueue(func() {
		created := newIngressEvent(livekit.AnalyticsEventType_INGRESS_CREATED, info)
		t.SendEvent(ctx, created)
	})
}

// IngressDeleted hands t.enqueue a task that sends an INGRESS_DELETED
// analytics event for info. No webhook is sent for this transition.
func (t *telemetryService) IngressDeleted(ctx context.Context, info *livekit.IngressInfo) {
	t.enqueue(func() {
		deleted := newIngressEvent(livekit.AnalyticsEventType_INGRESS_DELETED, info)
		t.SendEvent(ctx, deleted)
	})
}

// IngressStarted hands t.enqueue a task that sends the
// webhook.EventIngressStarted webhook for info and then an INGRESS_STARTED
// analytics event.
func (t *telemetryService) IngressStarted(ctx context.Context, info *livekit.IngressInfo) {
	t.enqueue(func() {
		hook := &livekit.WebhookEvent{
			Event:       webhook.EventIngressStarted,
			IngressInfo: info,
		}
		t.NotifyEvent(ctx, hook)

		started := newIngressEvent(livekit.AnalyticsEventType_INGRESS_STARTED, info)
		t.SendEvent(ctx, started)
	})
}

// IngressUpdated hands t.enqueue a task that sends an INGRESS_UPDATED
// analytics event for info. No webhook is sent for this transition.
func (t *telemetryService) IngressUpdated(ctx context.Context, info *livekit.IngressInfo) {
	t.enqueue(func() {
		updated := newIngressEvent(livekit.AnalyticsEventType_INGRESS_UPDATED, info)
		t.SendEvent(ctx, updated)
	})
}

// IngressEnded hands t.enqueue a task that sends the
// webhook.EventIngressEnded webhook for info and then an INGRESS_ENDED
// analytics event.
func (t *telemetryService) IngressEnded(ctx context.Context, info *livekit.IngressInfo) {
	t.enqueue(func() {
		hook := &livekit.WebhookEvent{
			Event:       webhook.EventIngressEnded,
			IngressInfo: info,
		}
		t.NotifyEvent(ctx, hook)

		ended := newIngressEvent(livekit.AnalyticsEventType_INGRESS_ENDED, info)
		t.SendEvent(ctx, ended)
	})
}

// Report hands t.enqueue a task that sends a REPORT analytics event wrapping
// reportInfo, stamped with the time at which the task runs.
func (t *telemetryService) Report(ctx context.Context, reportInfo *livekit.ReportInfo) {
	t.enqueue(func() {
		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:      livekit.AnalyticsEventType_REPORT,
			Timestamp: timestamppb.Now(),
			Report:    reportInfo,
		})
	})
}

// APICall hands t.enqueue a task that sends an API_CALL analytics event
// wrapping apiCallInfo, stamped with the time at which the task runs.
func (t *telemetryService) APICall(ctx context.Context, apiCallInfo *livekit.APICallInfo) {
	t.enqueue(func() {
		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:      livekit.AnalyticsEventType_API_CALL,
			Timestamp: timestamppb.Now(),
			ApiCall:   apiCallInfo,
		})
	})
}

// Webhook hands t.enqueue a task that sends a WEBHOOK analytics event
// wrapping webhookInfo, stamped with the time at which the task runs.
func (t *telemetryService) Webhook(ctx context.Context, webhookInfo *livekit.WebhookInfo) {
	t.enqueue(func() {
		t.SendEvent(ctx, &livekit.AnalyticsEvent{
			Type:      livekit.AnalyticsEventType_WEBHOOK,
			Timestamp: timestamppb.Now(),
			Webhook:   webhookInfo,
		})
	})
}

// newRoomEvent builds an analytics event of the given type stamped with the
// current time. When room is non-nil the event also carries the room and its
// Sid as RoomId; a nil room leaves both fields unset.
func newRoomEvent(event livekit.AnalyticsEventType, room *livekit.Room) *livekit.AnalyticsEvent {
	roomEv := &livekit.AnalyticsEvent{
		Type:      event,
		Timestamp: timestamppb.Now(),
	}
	if room == nil {
		return roomEv
	}

	roomEv.RoomId = room.Sid
	roomEv.Room = room
	return roomEv
}

// newParticipantEvent builds a room event via newRoomEvent and, when
// participant is non-nil, attaches the participant and its Sid as
// ParticipantId.
func newParticipantEvent(event livekit.AnalyticsEventType, room *livekit.Room, participant *livekit.ParticipantInfo) *livekit.AnalyticsEvent {
	participantEv := newRoomEvent(event, room)
	if participant == nil {
		return participantEv
	}

	participantEv.Participant = participant
	participantEv.ParticipantId = participant.Sid
	return participantEv
}

// newTrackEvent builds a participant event via newParticipantEvent, using a
// ParticipantInfo that has only its Sid set from participantID. When track is
// non-nil the event also carries the track and its Sid as TrackId.
func newTrackEvent(event livekit.AnalyticsEventType, room *livekit.Room, participantID livekit.ParticipantID, track *livekit.TrackInfo) *livekit.AnalyticsEvent {
	sidOnly := &livekit.ParticipantInfo{Sid: string(participantID)}

	trackEv := newParticipantEvent(event, room, sidOnly)
	if track == nil {
		return trackEv
	}

	trackEv.Track = track
	trackEv.TrackId = track.Sid
	return trackEv
}

// newEgressEvent builds an analytics event of the given type stamped with the
// current time. When info is non-nil the event also carries the egress info
// together with its EgressId and RoomId.
//
// A nil info yields an event with only Type and Timestamp set instead of
// panicking, matching the nil handling in newRoomEvent, newParticipantEvent
// and newTrackEvent. The parameter is named info so that it does not shadow
// the imported egress package.
func newEgressEvent(event livekit.AnalyticsEventType, info *livekit.EgressInfo) *livekit.AnalyticsEvent {
	ev := &livekit.AnalyticsEvent{
		Type:      event,
		Timestamp: timestamppb.Now(),
	}
	if info != nil {
		ev.EgressId = info.EgressId
		ev.RoomId = info.RoomId
		ev.Egress = info
	}
	return ev
}

// newIngressEvent builds an analytics event of the given type stamped with
// the current time. When ingress is non-nil the event also carries the
// ingress info and its IngressId.
//
// A nil ingress yields an event with only Type and Timestamp set instead of
// panicking, matching the nil handling in newRoomEvent, newParticipantEvent
// and newTrackEvent.
func newIngressEvent(event livekit.AnalyticsEventType, ingress *livekit.IngressInfo) *livekit.AnalyticsEvent {
	ev := &livekit.AnalyticsEvent{
		Type:      event,
		Timestamp: timestamppb.Now(),
	}
	if ingress != nil {
		ev.IngressId = ingress.IngressId
		ev.Ingress = ingress
	}
	return ev
}

// toMinimalRoomProto returns a livekit.Room populated with only the Sid and
// Name taken from roomID and roomName.
func toMinimalRoomProto(roomID livekit.RoomID, roomName livekit.RoomName) *livekit.Room {
	room := new(livekit.Room)
	room.Sid = string(roomID)
	room.Name = string(roomName)
	return room
}
