o
    ii#!                     @   s   d dl mZ d dlmZ d dlmZ d dlmZ d dlm	Z	 e	r4d dl
mZ d dl
mZ d dlmZmZ G d	d
 d
eZdd Zdd Zdd ZG dd deZG dd deZdS )    )configure_scope)Hub)Integration)capture_internal_exceptions)TYPE_CHECKING)Any)Optional)EventHintc                   @   s   e Zd ZdZedd ZdS )SparkIntegrationsparkc                   C   s
   t   d S N)patch_spark_context_init r   r   i/var/www/html/pca-backend/venv/lib/python3.10/site-packages/sentry_sdk/integrations/spark/spark_driver.py
setup_once   s   
zSparkIntegration.setup_onceN)__name__
__module____qualname__
identifierstaticmethodr   r   r   r   r   r      s    r   c                  C   s:   ddl m}  | j}|r|d|j |d|j dS dS )z
    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
    This allows worker integration to have access to app_name and application_id.
    r   SparkContextsentry_app_namesentry_application_idN)pysparkr   _active_spark_contextsetLocalPropertyappNameapplicationId)r   spark_contextr   r   r   _set_app_properties   s   r!   c                 C   s4   ddl m} | j}|| t }| j | dS )zA
    Start java gateway server to add custom `SparkListener`
    r   )ensure_callback_server_startedN)pyspark.java_gatewayr"   _gatewaySentryListener_jscscaddSparkListener)r'   r"   gwlistenerr   r   r   _start_sentry_listener(   s
   r+   c                     s(   ddl m}  | j  fdd}|| _d S )Nr   r   c                    sv    g|R i |}t jtd u r|S t  t  t }|j fdd}W d    |S 1 s4w   Y  |S )Nc                    s:  t   tjtd u r| W  d    S | di d   | di d jd | d d jd | d d jd	 | d d
 jd | d d j	 | d d j
 | d d j | d d j | d d j | di d j W d    | S 1 sw   Y  | S )Nuseridtagszexecutor.idzspark.executor.idzspark-submit.deployModezspark.submit.deployModezdriver.hostzspark.driver.hostzdriver.portzspark.driver.portspark_versionapp_nameapplication_idmaster
spark_homeextraweb_url)r   r   currentget_integrationr   
setdefault	sparkUser_confgetversionr   r   r2   	sparkHomeuiWebUrl)eventhintselfr   r   process_eventG   s8   

z[patch_spark_context_init.<locals>._sentry_patched_spark_context_init.<locals>.process_event)r   r6   r7   r   r+   r!   r   add_event_processor)rB   argskwargsinitscoperC   spark_context_initrA   r   "_sentry_patched_spark_context_init;   s   
""zDpatch_spark_context_init.<locals>._sentry_patched_spark_context_init)r   r   _do_init)r   rK   r   rI   r   r   5   s   
.r   c                   @   s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 ZG d1d2 d2Zd3S )4SparkListenerc                 C      d S r   r   )rB   applicationEndr   r   r   onApplicationEndm      zSparkListener.onApplicationEndc                 C   rN   r   r   )rB   applicationStartr   r   r   onApplicationStartq   rQ   z SparkListener.onApplicationStartc                 C   rN   r   r   )rB   blockManagerAddedr   r   r   onBlockManagerAddedu   rQ   z!SparkListener.onBlockManagerAddedc                 C   rN   r   r   )rB   blockManagerRemovedr   r   r   onBlockManagerRemovedy   rQ   z#SparkListener.onBlockManagerRemovedc                 C   rN   r   r   )rB   blockUpdatedr   r   r   onBlockUpdated}   rQ   zSparkListener.onBlockUpdatedc                 C   rN   r   r   )rB   environmentUpdater   r   r   onEnvironmentUpdate   rQ   z!SparkListener.onEnvironmentUpdatec                 C   rN   r   r   )rB   executorAddedr   r   r   onExecutorAdded   rQ   zSparkListener.onExecutorAddedc                 C   rN   r   r   )rB   executorBlacklistedr   r   r   onExecutorBlacklisted   rQ   z#SparkListener.onExecutorBlacklistedc                 C   rN   r   r   )rB   executorBlacklistedForStager   r   r   onExecutorBlacklistedForStage   s   z+SparkListener.onExecutorBlacklistedForStagec                 C   rN   r   r   )rB   executorMetricsUpdater   r   r   onExecutorMetricsUpdate   rQ   z%SparkListener.onExecutorMetricsUpdatec                 C   rN   r   r   )rB   executorRemovedr   r   r   onExecutorRemoved   rQ   zSparkListener.onExecutorRemovedc                 C   rN   r   r   )rB   jobEndr   r   r   onJobEnd   rQ   zSparkListener.onJobEndc                 C   rN   r   r   )rB   jobStartr   r   r   
onJobStart   rQ   zSparkListener.onJobStartc                 C   rN   r   r   )rB   nodeBlacklistedr   r   r   onNodeBlacklisted   rQ   zSparkListener.onNodeBlacklistedc                 C   rN   r   r   )rB   nodeBlacklistedForStager   r   r   onNodeBlacklistedForStage   rQ   z'SparkListener.onNodeBlacklistedForStagec                 C   rN   r   r   )rB   nodeUnblacklistedr   r   r   onNodeUnblacklisted   rQ   z!SparkListener.onNodeUnblacklistedc                 C   rN   r   r   )rB   r?   r   r   r   onOtherEvent   rQ   zSparkListener.onOtherEventc                 C   rN   r   r   )rB   speculativeTaskr   r   r   onSpeculativeTaskSubmitted   rQ   z(SparkListener.onSpeculativeTaskSubmittedc                 C   rN   r   r   )rB   stageCompletedr   r   r   onStageCompleted   rQ   zSparkListener.onStageCompletedc                 C   rN   r   r   )rB   stageSubmittedr   r   r   onStageSubmitted   rQ   zSparkListener.onStageSubmittedc                 C   rN   r   r   )rB   taskEndr   r   r   	onTaskEnd   rQ   zSparkListener.onTaskEndc                 C   rN   r   r   )rB   taskGettingResultr   r   r   onTaskGettingResult   rQ   z!SparkListener.onTaskGettingResultc                 C   rN   r   r   )rB   	taskStartr   r   r   onTaskStart   rQ   zSparkListener.onTaskStartc                 C   rN   r   r   )rB   unpersistRDDr   r   r   onUnpersistRDD   rQ   zSparkListener.onUnpersistRDDc                   @   s   e Zd ZdgZdS )zSparkListener.Javaz1org.apache.spark.scheduler.SparkListenerInterfaceN)r   r   r   
implementsr   r   r   r   Java   s    
r   N)r   r   r   rP   rS   rU   rW   rY   r[   r]   r_   ra   rc   re   rg   ri   rk   rm   ro   rp   rr   rt   rv   rx   rz   r|   r~   r   r   r   r   r   rM   l   s4    rM   c                   @   s4   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdS )r%   c                 C   s   t j| _d S r   )r   r6   hubrA   r   r   r   __init__   s   zSentryListener.__init__c                 C   s(   d | }| jjd|d t  d S )NzJob {} Startedinfo)levelmessage)formatjobIdr   add_breadcrumbr!   )rB   rh   r   r   r   r   ri      s   
zSentryListener.onJobStartc                 C   sd   d}d}d|   i}|   dkrd}d| }n	d}d| }| jj|||d d S )	N resultJobSucceededr   zJob {} EndedwarningzJob {} Failedr   r   data)	jobResulttoStringr   r   r   r   )rB   rf   r   r   r   r   r   r   rg      s   zSentryListener.onJobEndc                 C   sD   |  }d| }| | d}| jjd||d t  d S )NzStage {} Submitted	attemptIdnamer   r   )	stageInfor   stageIdr   r   r   r   r!   )rB   ru   
stage_infor   r   r   r   r   rv      s
   
zSentryListener.onStageSubmittedc                 C   s   ddl m} | }d}d}| | d}z|  |d< d| }d}W n |y<   d| }d	}Y nw | j	j
|||d
 d S )Nr   )Py4JJavaErrorr   r   reasonzStage {} Failedr   zStage {} Completedr   r   )py4j.protocolr   r   r   r   failureReasonr;   r   r   r   r   )rB   rs   r   r   r   r   r   r   r   r   rt      s   zSentryListener.onStageCompletedN)r   r   r   r   ri   rg   rv   rt   r   r   r   r   r%      s    r%   N)
sentry_sdkr   sentry_sdk.hubr   sentry_sdk.integrationsr   sentry_sdk.utilsr   sentry_sdk._typesr   typingr   r   r	   r
   r   r!   r+   r   objectrM   r%   r   r   r   r   <module>   s    	7g