Sunday, October 19, 2025

test

-- SQL*Plus session settings for this script:
--   * show DBMS_OUTPUT text with no buffer size limit;
--   * abort the script with a failure exit status as soon as any SQL or PL/SQL error is raised.
SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT FAILURE

DECLARE
    -- Configuration variables
    -- Target number of tables per batch; the number of batches is derived from it.
    c_batch_size        CONSTANT NUMBER := 40;  
    -- Prefix for scheduler job names; a zero-padded batch number is appended to it.
    c_job_prefix        CONSTANT VARCHAR2(15) := 'BATCH_COUNT_';

    -- Runtime Variables
    -- NOTE(review): v_total_source_count is not referenced anywhere in the block as shown.
    v_total_source_count NUMBER;
    -- Highest job_partition_id present in the control table (i.e. how many batches may need a job).
    v_max_batches_needed NUMBER;
    -- Name of the scheduler job currently being (re)created.
    v_job_name           VARCHAR2(128);
    -- Scratch buffer for DDL text built by the setup procedures.
    v_ddl_statement      VARCHAR2(4000);
    -- Number of scheduler jobs submitted during this run.
    v_total_submitted    NUMBER := 0;
    -- Count of PENDING/FAILED tasks in the batch currently being examined.
    v_pending_in_batch   NUMBER; 

    -- Helper procedure to execute DDL safely (defined to avoid compile errors on missing objects)
    PROCEDURE execute_ddl(p_sql IN VARCHAR2) IS
    BEGIN
        EXECUTE IMMEDIATE p_sql;
    EXCEPTION
        WHEN OTHERS THEN
            -- Ignore common errors (including ORA-27475: unknown job)
            IF SQLCODE NOT IN (-942, -4043, -27475) THEN
                RAISE; 
            END IF;
    END;
    
    -- Helper function to check for object existence
    FUNCTION object_exists(p_object_name IN VARCHAR2) RETURN BOOLEAN IS
        v_exists NUMBER;
    BEGIN
        SELECT COUNT(*) INTO v_exists FROM user_objects WHERE object_name = UPPER(p_object_name);
        RETURN v_exists > 0;
    END;
    
    -- P1. Initializes DDL and Structural Integrity
    PROCEDURE initialize_system IS
    BEGIN
        DBMS_OUTPUT.PUT_LINE('--- PHASES 0 & 1: PERFORMING DESTRUCTIVE FIRST-TIME SETUP ---');
        
        -- Cleanup Phase
        execute_ddl('DROP VIEW row_count_summary');
        execute_ddl('DROP TABLE row_count_control CASCADE CONSTRAINTS');
        execute_ddl('DROP TABLE row_counts_scheduler_log CASCADE CONSTRAINTS'); 
        execute_ddl('DROP TABLE job_staging_gtt');

        -- Creation Phase (Omitting DDL body for brevity, assume correct)
        EXECUTE IMMEDIATE '
            CREATE TABLE row_count_control (
                schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
                job_partition_id NUMBER NOT NULL, status VARCHAR2(10) DEFAULT ''PENDING'' NOT NULL,
                start_time TIMESTAMP, end_time TIMESTAMP, error_message VARCHAR2(4000),
                CONSTRAINT pk_control PRIMARY KEY (schema_name, table_name)
            )';

        EXECUTE IMMEDIATE '
            CREATE TABLE row_counts_scheduler_log (
                schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
                row_count NUMBER, count_timestamp TIMESTAMP, job_name VARCHAR2(128),
                batch_id NUMBER, error_message VARCHAR2(4000),
                CONSTRAINT pk_final_log PRIMARY KEY (schema_name, table_name)
            )';

        EXECUTE IMMEDIATE '
            CREATE GLOBAL TEMPORARY TABLE job_staging_gtt (
                owner VARCHAR2(128), table_name VARCHAR2(128)
            ) ON COMMIT PRESERVE ROWS';
            
        -- Create Summary View
        v_ddl_statement := '
            CREATE OR REPLACE VIEW row_count_summary AS
            SELECT 
                COUNT(*) AS total_tasks, SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) AS success_count,
                SUM(CASE WHEN status = ''FAILED'' THEN 1 ELSE 0 END) AS failed_count, SUM(CASE WHEN status = ''RUNNING'' THEN 1 ELSE 0 END) AS running_count,
                ROUND(SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) / GREATEST(COUNT(*),1) * 100, 1) AS pct_complete,
                MAX(end_time) AS last_completion 
            FROM row_count_control';
        EXECUTE IMMEDIATE v_ddl_statement;
        
        COMMIT;
        DBMS_OUTPUT.PUT_LINE(' -> SYSTEM DDL SETUP COMPLETE.');
    END initialize_system;

    -- P2. Populates the Master Control Table using NTILE
    PROCEDURE prepare_workload IS
        v_total_count NUMBER;
        v_ddl_insert VARCHAR2(4000);
    BEGIN
        SELECT COUNT(*) INTO v_total_count 
        FROM all_tables WHERE owner IN ('HR','SALES','FINANCE') AND table_name NOT LIKE 'BIN$%';

        v_batches_needed := CEIL(v_total_count / c_batch_size);

        -- CRITICAL: MERGE is used to add only new tables, preserving existing statuses
        v_ddl_insert := '
            MERGE INTO row_count_control c
            USING (
                SELECT owner AS schema_name, table_name,
                        NTILE(:num_batches) OVER (ORDER BY owner, table_name) AS job_partition_id
                FROM all_tables
                WHERE owner IN (''HR'',''SALES'',''FINANCE'') AND table_name NOT LIKE ''BIN$%''
            ) s
            ON (c.schema_name = s.schema_name AND c.table_name = s.table_name)
            WHEN NOT MATCHED THEN
                INSERT (schema_name, table_name, job_partition_id, status)
                VALUES (s.schema_name, s.table_name, s.job_partition_id, ''PENDING'')';
                
        EXECUTE IMMEDIATE v_ddl_insert USING v_batches_needed;
        COMMIT;
        DBMS_OUTPUT.PUT_LINE(' -> WORKLOAD PREPARED. ' || SQL%ROWCOUNT || ' new tasks added/checked. Total partitions: ' || v_batches_needed);
    END prepare_workload;
    
    -- P3. Clears abandoned 'RUNNING' tasks
    PROCEDURE reset_stalled_tasks IS
        v_stale_threshold_minutes CONSTANT NUMBER := 60; 
    BEGIN
        UPDATE row_count_control
        SET status = 'PENDING', start_time = NULL, end_time = NULL, error_message = 'RESET: Task was forcefully recycled from RUNNING state.'
        WHERE status = 'RUNNING'
          AND start_time < SYSTIMESTAMP - INTERVAL '1' MINUTE * v_stale_threshold_minutes;
        COMMIT;
    END reset_stalled_tasks;
    
    -- P4. Creates the core counting procedure
    PROCEDURE create_counting_procedure IS
    BEGIN
        v_ddl_statement := '
            CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
                v_sql_staging_filter VARCHAR2(4000); v_sql_count VARCHAR2(200); v_cnt NUMBER;
                v_job_name_log CONSTANT VARCHAR2(30) := ''BATCH_'' || p_batch_id; v_error_msg VARCHAR2(4000); 
                CURSOR c_staged_tables IS SELECT owner, table_name FROM job_staging_gtt;
            BEGIN
                -- Phase 1: STAGE WORKLOAD (Isolation and Resumption)
                EXECUTE IMMEDIATE ''TRUNCATE TABLE job_staging_gtt'';
                v_sql_staging_filter := ''
                    INSERT INTO job_staging_gtt (owner, table_name)
                    SELECT schema_name, table_name
                    FROM row_count_control
                    WHERE job_partition_id = :p_batch_id AND status IN (''''PENDING'''', ''''FAILED'''')
                    '';
                EXECUTE IMMEDIATE v_sql_staging_filter USING p_batch_id;
                COMMIT; 

                -- Phase 2: PROCESS STAGED WORKLOAD
                FOR r IN c_staged_tables LOOP
                    BEGIN 
                        -- Update status to RUNNING immediately (for stall detection)
                        UPDATE row_count_control SET status = ''RUNNING'', start_time = SYSTIMESTAMP
                        WHERE schema_name = r.owner AND table_name = r.table_name;
                        COMMIT; 

                        v_sql_count := ''SELECT COUNT(*) FROM "''||r.owner||''"."''||r.table_name||''"'';
                        EXECUTE IMMEDIATE v_sql_count INTO v_cnt;
                        
                        -- LOG SUCCESS and Mark COMPLETE
                        DELETE FROM row_counts_scheduler_log WHERE schema_name = r.owner AND table_name = r.table_name;
                        INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                        VALUES (r.owner, r.table_name, v_cnt, SYSTIMESTAMP, v_job_name_log, p_batch_id, NULL);

                        UPDATE row_count_control SET status = ''COMPLETE'', end_time = SYSTIMESTAMP, error_message = NULL
                        WHERE schema_name = r.owner AND table_name = r.table_name;
                        COMMIT; 

                    EXCEPTION 
                        WHEN OTHERS THEN 
                            v_error_msg := SUBSTR(SQLERRM, 1, 4000);
                            -- Mark task as FAILED (prevents automatic retries until DBA intervenes)
                            UPDATE row_count_control SET status = ''FAILED'', end_time = SYSTIMESTAMP, error_message = v_error_msg
                            WHERE schema_name = r.owner AND table_name = r.table_name;
                            INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                            VALUES (r.owner, r.table_name, -1, SYSTIMESTAMP, v_job_name_log, p_batch_id, v_error_msg);
                            COMMIT; 
                    END;
                END LOOP;
            END;
            ';
        EXECUTE IMMEDIATE v_ddl_statement;
        DBMS_OUTPUT.PUT_LINE(' -> Counting Procedure created.');
    END create_counting_procedure;


BEGIN
    --------------------------------------------------------------------------------------------------
    -- 0. INITIAL LAUNCH OR RESUME CHECK
    --    First run: build all framework objects, then fill the queue.
    --    Later runs: only register tables that appeared since the last run.
    --------------------------------------------------------------------------------------------------
    IF NOT object_exists('ROW_COUNT_CONTROL') THEN
        DBMS_OUTPUT.PUT_LINE('--- PHASE 1: FULL SYSTEM INITIALIZATION (FIRST RUN) ---');
        initialize_system;
        prepare_workload; -- Populates the queue for the very first time
    ELSE
        DBMS_OUTPUT.PUT_LINE('--- PHASE 1: SYSTEM IS LIVE. INITIATING RESTART/RESUME PROTOCOL ---');
        prepare_workload; -- Check for new schemas/tables added since last run
    END IF;

    create_counting_procedure; -- Ensure procedure is compiled with latest logic

    --------------------------------------------------------------------------------------------------
    -- 1. EXECUTION AND RESTART PROTOCOL
    --------------------------------------------------------------------------------------------------

    DBMS_OUTPUT.PUT_LINE('PHASE 2: EXECUTING RESTART PROTOCOL');

    -- Step A: Clear abandoned locks and recycle tasks
    reset_stalled_tasks;

    -- The number of jobs to consider is the highest partition id actually stored in the
    -- control table. Dynamic SQL is required here: row_count_control does not exist when
    -- this block is compiled on a first run, and a static reference would stop the block
    -- from compiling at all.
    EXECUTE IMMEDIATE 'SELECT MAX(job_partition_id) FROM row_count_control'
        INTO v_max_batches_needed;

    -- Safety check for the case where the control table is empty
    IF v_max_batches_needed IS NULL THEN
        DBMS_OUTPUT.PUT_LINE('ERROR: No tables found in the control queue. Cannot proceed.');
        RETURN;
    END IF;

    -- Step B: Launch/Relaunch Execution
    DBMS_OUTPUT.PUT_LINE('Launching up to ' || v_max_batches_needed || ' parallel jobs...');

    FOR i IN 1..v_max_batches_needed LOOP
        -- Zero-pad to at least two digits. A fixed LPAD(i, 2, '0') would TRUNCATE ids of
        -- 100 and above (100 -> '10'), making their job names collide with earlier batches.
        v_job_name := c_job_prefix
                      || LPAD(TO_CHAR(i), GREATEST(2, LENGTH(TO_CHAR(i))), '0');

        -- Only launch a job for batches that still have PENDING or FAILED work.
        EXECUTE IMMEDIATE
            'SELECT COUNT(*)
               FROM row_count_control
              WHERE job_partition_id = :batch_id
                AND status IN (''PENDING'', ''FAILED'')'
            INTO v_pending_in_batch
            USING i;

        IF v_pending_in_batch > 0 THEN
            -- Drop any earlier job of the same name; "unknown job" is ignored by execute_ddl.
            execute_ddl('BEGIN DBMS_SCHEDULER.DROP_JOB(''' || v_job_name || ''', TRUE); END;');

            -- Create the job disabled, set its argument, then enable it.
            DBMS_SCHEDULER.CREATE_JOB (
                job_name            => v_job_name,
                job_type            => 'STORED_PROCEDURE',
                job_action          => 'COUNT_TABLE_BATCH',
                number_of_arguments => 1,
                start_date          => SYSTIMESTAMP,
                repeat_interval     => NULL,
                enabled             => FALSE
            );

            -- Set argument (using TO_CHAR to avoid PLS-00307 ambiguity)
            DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(v_job_name, 1, TO_CHAR(i));
            DBMS_SCHEDULER.ENABLE(v_job_name);

            v_total_submitted := v_total_submitted + 1;
            DBMS_OUTPUT.PUT_LINE('  -> Submitted Job ' || v_job_name || ' (' || v_pending_in_batch || ' tasks pending/failed)');
        END IF;
    END LOOP;

    COMMIT;
    DBMS_OUTPUT.PUT_LINE('--- FINAL SUCCESS: ALL ' || v_total_submitted || ' jobs successfully launched/resumed. ---');

EXCEPTION
    WHEN OTHERS THEN
        -- Report the error and where it was raised, undo uncommitted work, then re-raise
        -- so SQL*Plus (WHENEVER SQLERROR EXIT FAILURE) ends with a failure status.
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR IN MASTER BLOCK !!! SQLERRM: ' || SQLERRM);
        DBMS_OUTPUT.PUT_LINE(DBMS_UTILITY.FORMAT_ERROR_BACKTRACE);
        ROLLBACK;
        RAISE;
END;
/

====================================================================

-- Manual DBA retry: puts tasks back into the PENDING queue.
-- Targets (a) every FAILED task and (b) tasks stuck in RUNNING for more than 60 minutes.
-- The previous error text is kept, prefixed with a retry marker.
--
-- error_message is VARCHAR2(4000). The marker is 33 characters long, so the old text is
-- cut to 3967 bytes BEFORE concatenating; otherwise a long stored message would make the
-- UPDATE fail with a "value too large" / "result too long" error.
-- NOTE(review): assumes byte length semantics for the column - confirm NLS_LENGTH_SEMANTICS.
UPDATE row_count_control
SET
    status = 'PENDING',
    start_time = NULL,
    end_time = NULL,
    error_message = 'RETRY FORCED by DBA. Originally: '
                    || SUBSTRB(COALESCE(error_message, 'Unknown.'), 1, 3967)
WHERE
    -- Tasks that previously failed and are now ready for retry
    status = 'FAILED'
    -- OR tasks that were abandoned (stuck in RUNNING for > 1 hour)
    OR (status = 'RUNNING' AND start_time < SYSTIMESTAMP - INTERVAL '60' MINUTE);

COMMIT;

No comments: