Sunday, October 19, 2025

Live monitoring of the row-count scheduler batches

 
WITH batch_activity AS (
    -- Successful log rows (row_count >= 0 marks success) from the last 10 minutes
    SELECT batch_id,
           job_name,
           count_timestamp AS log_time
    FROM row_counts_scheduler_log
    WHERE row_count >= 0
      AND count_timestamp >= SYSTIMESTAMP - INTERVAL '10' MINUTE
),
batch_totals AS (
    -- Lifetime cumulative successful count per batch
    SELECT batch_id,
           COUNT(*) AS total_processed_ever
    FROM row_counts_scheduler_log
    WHERE row_count >= 0
    GROUP BY batch_id
)
SELECT ba.batch_id,
       ba.job_name,
       -- Tables processed in the most recent 5-minute window
       COUNT(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE
                  THEN 1 END) AS tables_processed_last_5_min,
       -- Tables processed in the window 5 to 10 minutes ago
       COUNT(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '10' MINUTE
                   AND ba.log_time <  SYSTIMESTAMP - INTERVAL '5' MINUTE
                  THEN 1 END) AS tables_processed_prev_5_min,
       -- Cumulative total ever processed by this batch
       bt.total_processed_ever
FROM batch_activity ba
INNER JOIN batch_totals bt
    ON ba.batch_id = bt.batch_id
GROUP BY ba.batch_id,
         ba.job_name,
         bt.total_processed_ever
ORDER BY tables_processed_last_5_min DESC, -- currently active jobs first
         ba.batch_id


=======
-- Live dashboard for the row-count scheduler: per-batch status flag,
-- throughput comparison, percent complete, and a naive ETA.
WITH BatchActivity AS (
    -- 1. Get all successful logs from the last 15 minutes
    -- (wider than the 10-minute comparison windows below so that a batch
    --  whose last success was 10-15 minutes ago still produces rows and
    --  can therefore be flagged as STALLED instead of vanishing entirely)
    SELECT batch_id, job_name, count_timestamp AS log_time
    FROM row_counts_scheduler_log
    WHERE row_count >= 0
    AND count_timestamp >= SYSTIMESTAMP - INTERVAL '15' MINUTE
),
BatchTotals AS (
    -- 2. Get the total cumulative successful count and last activity time for each batch, ever
    SELECT batch_id, COUNT(*) AS total_processed_ever,
           MAX(count_timestamp) AS last_activity
    FROM row_counts_scheduler_log WHERE row_count >= 0 
    GROUP BY batch_id
),
ExpectedWorkload AS ( 
    -- 3. CRITICAL FIX: Calculate the unique, expected total table count for each partition (batch_id)
    -- NOTE(review): this hash expression must stay textually identical to the
    -- partitioning logic inside count_table_batch, or EXPECTED and DONE will
    -- refer to different partitions of the table population.
    SELECT MOD(ORA_HASH(owner||table_name),100)+1 AS batch_id,
           COUNT(*) AS expected
    FROM all_tables 
    WHERE owner IN ('HR','SALES','FINANCE') AND table_name NOT LIKE 'BIN$%'
    GROUP BY MOD(ORA_HASH(owner||table_name),100)+1
)
SELECT
    -- LIVE STATUS: Flags jobs that have not logged anything in the last 5 minutes
    CASE WHEN SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) = 0 
         THEN ' STALLED' ELSE ' ACTIVE' END AS "STATUS",
         
    ba.batch_id AS "BATCH",
    
    -- Activity Comparison
    -- NOTE(review): despite the "(tpm)" labels these are counts per 5-minute
    -- window, not per minute — presumably "tables per measurement"; confirm.
    SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) AS "NOW (tpm)",
    SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '10' MINUTE
             AND ba.log_time < SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) AS "PREV (tpm)",
    
    -- Progress Metrics (ew.expected is a COUNT(*) over a non-empty group, so
    -- it is always >= 1 and the division below cannot hit zero)
    ew.expected AS "EXPECTED", 
    bt.total_processed_ever AS "DONE", 
    ROUND((bt.total_processed_ever / ew.expected) * 100, 1) AS "%",
    
    -- ETA Calculation: (Remaining Work / Current Rate) * Time Window
    -- remaining tables divided by tables-per-5-minutes, scaled by 5 => minutes
    -- Only calculate ETA if there was activity in the last 5 minutes to avoid division by zero
    CASE WHEN SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) > 0 
         THEN ROUND(
                 (ew.expected - bt.total_processed_ever) / 
                 SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) * 5, 1
              )
         ELSE NULL 
    END AS "ETA (min)",
    
    TO_CHAR(bt.last_activity, 'HH24:MI:SS') AS "LAST TOUCH"
    
FROM BatchActivity ba
JOIN BatchTotals bt ON ba.batch_id = bt.batch_id
JOIN ExpectedWorkload ew ON ba.batch_id = ew.batch_id 
GROUP BY ba.batch_id, ba.job_name, bt.total_processed_ever, bt.last_activity, ew.expected
ORDER BY
    -- Prioritize currently active batches at the top
    SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) DESC, 
    ba.batch_id;

===

SET SERVEROUTPUT ON SIZE UNLIMITED

DECLARE
    -- Batch IDs to re-run manually. Populate this with the STALLED ids
    -- reported by the monitoring query in Step 1.
    TYPE t_batch_list IS TABLE OF NUMBER;
    v_stalled_batches t_batch_list := t_batch_list(32, 33, 45, 61, 62, 70); -- 👈 UPDATE THIS LIST

BEGIN
    DBMS_OUTPUT.PUT_LINE('--- INITIATING MANUAL SEQUENTIAL RESUME ---');
    
    FOR i IN 1..v_stalled_batches.COUNT LOOP
        DBMS_OUTPUT.PUT_LINE(RPAD('-', 60, '-'));
        DBMS_OUTPUT.PUT_LINE('Processing Batch ID: ' || v_stalled_batches(i));
        
        -- Run the batch synchronously; the procedure commits per table,
        -- so the scheduler log is current as soon as this call returns.
        count_table_batch(v_stalled_batches(i));
        
        DBMS_OUTPUT.PUT_LINE('Batch ID ' || v_stalled_batches(i) || ' finished. Checking log...');
        
        -- Optional wait to reduce immediate I/O strain between batches.
        -- FIX: only sleep *between* batches — the original slept
        -- unconditionally, adding a pointless 5-second delay after the
        -- final batch. (On 18c+, DBMS_SESSION.SLEEP avoids needing an
        -- EXECUTE grant on DBMS_LOCK.)
        IF i < v_stalled_batches.COUNT THEN
            DBMS_LOCK.SLEEP(5);
        END IF;
        
    END LOOP;
    
    DBMS_OUTPUT.PUT_LINE('--- ALL STALLED BATCHES HAVE BEEN EXECUTED ---');
EXCEPTION
    WHEN OTHERS THEN
        -- Report and re-raise so the client sees a real error status.
        -- The procedure commits per table, so no rollback is needed here.
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR DURING SEQUENTIAL EXECUTION: ' || SQLERRM);
        RAISE;
END;
/

===

CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
    -- Counts every not-yet-successfully-counted table in one hash partition
    -- ("batch") of the HR/SALES/FINANCE schemas and records each result in
    -- row_counts_scheduler_log, committing one transaction per table so a
    -- crash mid-batch loses at most one table's worth of work.
    --
    -- p_batch_id : partition number 1..100 produced by
    --              MOD(ORA_HASH(owner||table_name),100)+1
    v_sql_staging VARCHAR2(4000);
    v_sql_count VARCHAR2(200);
    v_cnt NUMBER;
    v_job_name_log CONSTANT VARCHAR2(30) := 'BATCH_' || p_batch_id;
    v_error_msg VARCHAR2(4000);
    
    -- Cursor to iterate over the tables staged in the GTT
    CURSOR c_staged_tables IS
        SELECT owner, table_name FROM job_staging_gtt;

BEGIN
    -- FIX: the GTT preserves rows across commits for the whole session
    -- (ON COMMIT PRESERVE ROWS), so a previous call in this session — e.g.
    -- the sequential resume script — may have left staged rows behind.
    -- Without this cleanup those leftovers would be reprocessed here and
    -- logged under the wrong batch_id.
    DELETE FROM job_staging_gtt;

    -- PHASE 1: STAGE WORKLOAD (Quickly identify and stage ONLY the missing tables)
    -- NOTE(review): the ORA_HASH partitioning expression below must stay
    -- textually identical to the one used by the monitoring queries, or the
    -- dashboard's EXPECTED/DONE figures will disagree with the real workload.
    v_sql_staging := '
        INSERT INTO job_staging_gtt (owner, table_name)
        SELECT 
            t.owner, 
            t.table_name
        FROM 
            all_tables t
        LEFT JOIN
            row_counts_scheduler_log l 
            -- Join only successful counts
            ON (t.owner = l.schema_name AND t.table_name = l.table_name AND l.row_count >= 0)
        WHERE 
            t.owner IN (''HR'',''SALES'',''FINANCE'') AND t.table_name NOT LIKE ''BIN$%''
            -- Apply the batch partitioning logic
            AND MOD(ORA_HASH(t.owner||t.table_name),100)+1 = :p_batch_id
            -- CRITICAL: Only include tables where a successful log entry does NOT exist
            AND l.table_name IS NULL';
            
    -- Execute staging insert. This happens fast and only affects the session.
    EXECUTE IMMEDIATE v_sql_staging USING p_batch_id;
    COMMIT; -- Commits the DML outside the main counting transaction

    -- PHASE 2: PROCESS STAGED WORKLOAD (Fast, isolated processing loop)
    FOR r IN c_staged_tables LOOP
        BEGIN 
            -- COUNT (the expensive, risky operation). Identifiers come from
            -- all_tables, so double-quoting them is safe against case issues.
            v_sql_count := 'SELECT COUNT(*) FROM "'||r.owner||'"."'||r.table_name||'"';
            EXECUTE IMMEDIATE v_sql_count INTO v_cnt;
            
            -- DELETE/INSERT (the logging transaction): replaces any earlier
            -- failure rows for this table with the fresh successful count.
            DELETE FROM row_counts_scheduler_log WHERE schema_name = r.owner AND table_name = r.table_name;
            INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
            VALUES (r.owner, r.table_name, v_cnt, SYSTIMESTAMP, v_job_name_log, p_batch_id, NULL);
            
            COMMIT;
        EXCEPTION 
            WHEN OTHERS THEN 
                -- Log failure (row_count = -1 marks a failed count)
                v_error_msg := SUBSTR(SQLERRM, 1, 4000);
                -- FIX: clear earlier rows for this table first. Failed tables
                -- are re-staged on every run (only row_count >= 0 rows exclude
                -- a table), so without this delete each retry appended another
                -- duplicate -1 row to the log.
                DELETE FROM row_counts_scheduler_log WHERE schema_name = r.owner AND table_name = r.table_name;
                INSERT INTO row_counts_scheduler_log (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                VALUES (r.owner, r.table_name, -1, SYSTIMESTAMP, v_job_name_log, p_batch_id, v_error_msg);
                COMMIT;
        END;
    END LOOP;
    
    -- The GTT is automatically cleared when the session ends (ON COMMIT PRESERVE ROWS allows us to commit inside the loop).

EXCEPTION
    -- If staging failed entirely (e.g., severe privilege issue), log it.
    WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE('FATAL ERROR IN BATCH ' || p_batch_id || ' SETUP: ' || SQLERRM);
        RAISE;
END;
/

No comments: