-- Batch activity monitor (simple version): per-batch throughput over the two
-- most recent 5-minute windows, plus the cumulative successful count ever.
-- Success convention: row_count >= 0 (failures are logged with row_count = -1).
-- NOTE(review): batches with no successful log rows in the last 10 minutes
-- produce no BatchActivity rows and therefore vanish from the output entirely;
-- a fully stalled batch is invisible here.
WITH BatchActivity AS (
    -- 1. Successful logs from the last 10 minutes
    SELECT
        batch_id,
        job_name,
        count_timestamp AS log_time
    FROM row_counts_scheduler_log
    WHERE row_count >= 0
      AND count_timestamp >= SYSTIMESTAMP - INTERVAL '10' MINUTE
),
BatchTotals AS (
    -- 2. Total cumulative successful count for each batch, ever
    SELECT
        batch_id,
        COUNT(*) AS total_processed_ever
    FROM row_counts_scheduler_log
    WHERE row_count >= 0
    GROUP BY batch_id
)
SELECT
    ba.batch_id,
    ba.job_name,
    -- Tables processed in the most recent 5-minute window
    SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) AS tables_processed_last_5_min,
    -- Tables processed 5-10 minutes ago (trend comparison)
    SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '10' MINUTE
              AND ba.log_time <  SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) AS tables_processed_prev_5_min,
    -- Cumulative total processed by this batch
    bt.total_processed_ever
FROM BatchActivity ba
INNER JOIN BatchTotals bt
    ON ba.batch_id = bt.batch_id
GROUP BY
    ba.batch_id,
    ba.job_name,
    bt.total_processed_ever
ORDER BY
    tables_processed_last_5_min DESC,  -- show currently active jobs first
    ba.batch_id;
-- ======= Enhanced version of the monitoring query (status, progress, ETA) =======
-- Batch progress dashboard: live status, 5-minute throughput trend, percent
-- complete, and a crude ETA per hash partition (batch_id).
-- FIX: drive the query from ExpectedWorkload via LEFT JOINs so batches with
-- no recent (or no) log activity still appear, correctly flagged STALLED,
-- instead of silently dropping out through inner joins on BatchActivity.
WITH BatchActivity AS (
    -- 1. Successful logs (row_count >= 0) from the last 15 minutes
    SELECT batch_id, job_name, count_timestamp AS log_time
    FROM row_counts_scheduler_log
    WHERE row_count >= 0
      AND count_timestamp >= SYSTIMESTAMP - INTERVAL '15' MINUTE
),
BatchTotals AS (
    -- 2. Cumulative successful count and last activity time per batch, ever
    SELECT batch_id,
           COUNT(*) AS total_processed_ever,
           MAX(count_timestamp) AS last_activity
    FROM row_counts_scheduler_log
    WHERE row_count >= 0
    GROUP BY batch_id
),
ExpectedWorkload AS (
    -- 3. Expected table count per partition. The hash expression MUST mirror
    -- the scheduler's partitioning: MOD(ORA_HASH(owner||table_name),100)+1
    SELECT MOD(ORA_HASH(owner || table_name), 100) + 1 AS batch_id,
           COUNT(*) AS expected
    FROM all_tables
    WHERE owner IN ('HR', 'SALES', 'FINANCE')
      AND table_name NOT LIKE 'BIN$%'   -- exclude recycle-bin objects
    GROUP BY MOD(ORA_HASH(owner || table_name), 100) + 1
)
SELECT
    -- LIVE STATUS: STALLED = no successful log row in the last 5 minutes
    CASE WHEN SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) = 0
         THEN ' STALLED' ELSE ' ACTIVE' END AS "STATUS",
    ew.batch_id AS "BATCH",
    -- Throughput: current vs previous 5-minute window
    SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) AS "NOW (tpm)",
    SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '10' MINUTE
              AND ba.log_time <  SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) AS "PREV (tpm)",
    -- Progress metrics (DONE defaults to 0 for batches never logged)
    ew.expected AS "EXPECTED",
    NVL(bt.total_processed_ever, 0) AS "DONE",
    -- ew.expected >= 1 by construction (GROUP BY only emits non-empty groups)
    ROUND((NVL(bt.total_processed_ever, 0) / ew.expected) * 100, 1) AS "%",
    -- ETA = remaining work / rate-per-5-minutes * 5 => minutes.
    -- NULL when idle in the last 5 minutes (avoids division by zero).
    CASE WHEN SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) > 0
         THEN ROUND(
                (ew.expected - NVL(bt.total_processed_ever, 0)) /
                SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) * 5, 1)
         ELSE NULL
    END AS "ETA (min)",
    TO_CHAR(bt.last_activity, 'HH24:MI:SS') AS "LAST TOUCH"
FROM ExpectedWorkload ew
LEFT JOIN BatchTotals   bt ON bt.batch_id = ew.batch_id
LEFT JOIN BatchActivity ba ON ba.batch_id = ew.batch_id
-- ba.job_name dropped from GROUP BY: it is not selected, and grouping on it
-- could duplicate a batch's row if several job names ever logged for it
GROUP BY ew.batch_id, bt.total_processed_ever, bt.last_activity, ew.expected
ORDER BY
    -- prioritize currently active batches at the top
    SUM(CASE WHEN ba.log_time >= SYSTIMESTAMP - INTERVAL '5' MINUTE THEN 1 ELSE 0 END) DESC,
    ew.batch_id;
-- === Manual sequential resume script for stalled batches ===
SET SERVEROUTPUT ON SIZE UNLIMITED
DECLARE
    -- Batch IDs to resume; populate from the stalled-batch monitoring query.
    TYPE t_batch_list IS TABLE OF NUMBER;
    v_stalled_batches t_batch_list := t_batch_list(32, 33, 45, 61, 62, 70); -- UPDATE THIS LIST
    v_current_batch   NUMBER; -- remembered so the error handler can name the failing batch
BEGIN
    DBMS_OUTPUT.PUT_LINE('--- INITIATING MANUAL SEQUENTIAL RESUME ---');
    FOR i IN 1 .. v_stalled_batches.COUNT LOOP
        v_current_batch := v_stalled_batches(i);
        DBMS_OUTPUT.PUT_LINE(RPAD('-', 60, '-'));
        DBMS_OUTPUT.PUT_LINE('Processing Batch ID: ' || v_current_batch);
        -- The procedure commits per table, so the log reflects progress immediately.
        count_table_batch(v_current_batch);
        DBMS_OUTPUT.PUT_LINE('Batch ID ' || v_current_batch || ' finished. Checking log...');
        -- Brief pause to reduce I/O strain between batches.
        -- NOTE(review): DBMS_LOCK.SLEEP requires EXECUTE on DBMS_LOCK; on
        -- Oracle 12cR2+ DBMS_SESSION.SLEEP is an unprivileged alternative.
        DBMS_LOCK.SLEEP(5);
    END LOOP;
    DBMS_OUTPUT.PUT_LINE('--- ALL STALLED BATCHES HAVE BEEN EXECUTED ---');
EXCEPTION
    WHEN OTHERS THEN
        -- FIX: identify which batch failed. Per-table commits inside the
        -- procedure mean no rollback of prior batches is needed here.
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR DURING SEQUENTIAL EXECUTION (batch '
                             || NVL(TO_CHAR(v_current_batch), 'n/a') || '): ' || SQLERRM);
        RAISE;
END;
/
-- === Worker procedure: counts one hash partition of tables ===
CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
    -- Counts every not-yet-successfully-counted table in hash partition
    -- p_batch_id and records the result in row_counts_scheduler_log
    -- (row_count >= 0 on success; -1 plus error_message on failure).
    -- Commits after each table so progress survives a mid-batch failure.
    v_sql_count    VARCHAR2(200);
    v_cnt          NUMBER;
    v_job_name_log CONSTANT VARCHAR2(30) := 'BATCH_' || p_batch_id;
    v_error_msg    VARCHAR2(4000);
    -- Tables staged for this run (session-private GTT)
    CURSOR c_staged_tables IS
        SELECT owner, table_name FROM job_staging_gtt;
BEGIN
    -- FIX: clear rows staged by a previous call in this session. With an
    -- ON COMMIT PRESERVE ROWS GTT, leftovers from an earlier batch would be
    -- re-counted here and logged under the wrong batch_id.
    DELETE FROM job_staging_gtt;

    -- PHASE 1: stage only the tables in this partition that have no
    -- successful log entry yet. Static SQL — no dynamic identifiers needed.
    INSERT INTO job_staging_gtt (owner, table_name)
    SELECT t.owner, t.table_name
    FROM all_tables t
    LEFT JOIN row_counts_scheduler_log l
        ON  t.owner = l.schema_name
        AND t.table_name = l.table_name
        AND l.row_count >= 0               -- join only successful counts
    WHERE t.owner IN ('HR', 'SALES', 'FINANCE')
      AND t.table_name NOT LIKE 'BIN$%'    -- skip recycle-bin objects
      AND MOD(ORA_HASH(t.owner || t.table_name), 100) + 1 = p_batch_id
      AND l.table_name IS NULL;            -- anti-join: not yet counted
    COMMIT; -- staging is cheap; keep it outside the per-table transactions

    -- PHASE 2: count each staged table in its own small transaction.
    FOR r IN c_staged_tables LOOP
        BEGIN
            -- The expensive, risky operation. Identifiers come from
            -- all_tables and are double-quoted; a name containing '"' would
            -- still break — use DBMS_ASSERT.ENQUOTE_NAME to harden if needed.
            v_sql_count := 'SELECT COUNT(*) FROM "' || r.owner || '"."' || r.table_name || '"';
            EXECUTE IMMEDIATE v_sql_count INTO v_cnt;
            -- Replace any previous entry with the fresh result.
            DELETE FROM row_counts_scheduler_log
             WHERE schema_name = r.owner AND table_name = r.table_name;
            INSERT INTO row_counts_scheduler_log
                (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
            VALUES
                (r.owner, r.table_name, v_cnt, SYSTIMESTAMP, v_job_name_log, p_batch_id, NULL);
            COMMIT;
        EXCEPTION
            WHEN OTHERS THEN
                -- FIX: purge the old row on failure too, so the log never
                -- accumulates duplicate rows for the same table.
                v_error_msg := SUBSTR(SQLERRM, 1, 4000);
                DELETE FROM row_counts_scheduler_log
                 WHERE schema_name = r.owner AND table_name = r.table_name;
                INSERT INTO row_counts_scheduler_log
                    (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                VALUES
                    (r.owner, r.table_name, -1, SYSTIMESTAMP, v_job_name_log, p_batch_id, v_error_msg);
                COMMIT;
        END;
    END LOOP;
    -- GTT rows are session-scoped (ON COMMIT PRESERVE ROWS assumed, per the
    -- original design) and are explicitly cleared at the start of each call.
EXCEPTION
    -- Staging-level failure (e.g. severe privilege issue): surface and re-raise.
    WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE('FATAL ERROR IN BATCH ' || p_batch_id || ' SETUP: ' || SQLERRM);
        RAISE;
END;
/
-- (blog scrape residue, commented out so the script parses)
-- No comments:
-- Post a Comment