SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT FAILURE
DECLARE
-- Configuration variables
-- Target number of tables per batch: prepare_workload divides the source
-- table count by this value to decide how many partitions NTILE produces.
c_batch_size CONSTANT NUMBER := 40;
-- Prefix of every scheduler job name built in the launch loop
-- (prefix followed by the zero-padded batch number).
c_job_prefix CONSTANT VARCHAR2(15) := 'BATCH_COUNT_';
-- Runtime Variables
-- NOTE(review): declared but not referenced anywhere in the visible block.
v_total_source_count NUMBER;
-- Intended to hold the highest job_partition_id present in row_count_control,
-- i.e. the upper bound of the job launch loop.
v_max_batches_needed NUMBER;
-- Name of the scheduler job currently being (re)created in the launch loop.
v_job_name VARCHAR2(128);
-- Scratch buffer for dynamically built DDL text.
v_ddl_statement VARCHAR2(4000);
-- Number of scheduler jobs submitted by this run (reported at the end).
v_total_submitted NUMBER := 0;
-- Count of PENDING/FAILED tasks in the batch currently being examined.
v_pending_in_batch NUMBER;
-- Runs one dynamically supplied statement and tolerates "nothing to drop" failures,
-- so cleanup steps can be issued without first checking that the target exists.
--   p_sql : complete statement text passed to EXECUTE IMMEDIATE
-- Suppressed error codes: ORA-00942, ORA-04043 and ORA-27475 (unknown job).
-- Any other error is re-raised to the caller unchanged.
PROCEDURE execute_ddl(p_sql IN VARCHAR2) IS
BEGIN
    EXECUTE IMMEDIATE p_sql;
EXCEPTION
    WHEN OTHERS THEN
        CASE
            WHEN SQLCODE IN (-942, -4043, -27475) THEN
                NULL;  -- target was already absent: nothing to clean up
            ELSE
                RAISE;
        END CASE;
END execute_ddl;
-- Reports whether the current schema owns at least one object with the given name.
--   p_object_name : name to look up; it is upper-cased before the comparison
-- Returns TRUE when user_objects contains a matching row (any object type), else FALSE.
FUNCTION object_exists(p_object_name IN VARCHAR2) RETURN BOOLEAN IS
    l_hits NUMBER;
BEGIN
    SELECT COUNT(*)
      INTO l_hits
      FROM user_objects
     WHERE object_name = UPPER(p_object_name);

    IF l_hits > 0 THEN
        RETURN TRUE;
    END IF;
    RETURN FALSE;
END object_exists;
-- P1. Destructive first-time setup.
-- Drops (if present) and then recreates the objects the counting system relies on:
--   row_count_control        - one row per (schema, table) task with its batch id and status
--   row_counts_scheduler_log - latest count result (or error) per (schema, table)
--   job_staging_gtt          - session-private staging table (rows survive COMMIT)
--   row_count_summary        - one-row progress view over row_count_control
-- Drops go through execute_ddl, so "object does not exist" errors are ignored.
PROCEDURE initialize_system IS
    TYPE t_statement_list IS TABLE OF VARCHAR2(200);
    -- View first, then the tables, in the same order as they are dropped.
    l_cleanup_steps CONSTANT t_statement_list := t_statement_list(
        'DROP VIEW row_count_summary',
        'DROP TABLE row_count_control CASCADE CONSTRAINTS',
        'DROP TABLE row_counts_scheduler_log CASCADE CONSTRAINTS',
        'DROP TABLE job_staging_gtt'
    );
    l_table_ddl VARCHAR2(4000);
BEGIN
    DBMS_OUTPUT.PUT_LINE('--- PHASES 0 & 1: PERFORMING DESTRUCTIVE FIRST-TIME SETUP ---');

    -- Cleanup phase
    FOR idx IN 1 .. l_cleanup_steps.COUNT LOOP
        execute_ddl(l_cleanup_steps(idx));
    END LOOP;

    -- Creation phase: task queue
    l_table_ddl := '
CREATE TABLE row_count_control (
schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
job_partition_id NUMBER NOT NULL, status VARCHAR2(10) DEFAULT ''PENDING'' NOT NULL,
start_time TIMESTAMP, end_time TIMESTAMP, error_message VARCHAR2(4000),
CONSTRAINT pk_control PRIMARY KEY (schema_name, table_name)
)';
    EXECUTE IMMEDIATE l_table_ddl;

    -- Creation phase: result log
    l_table_ddl := '
CREATE TABLE row_counts_scheduler_log (
schema_name VARCHAR2(128) NOT NULL, table_name VARCHAR2(128) NOT NULL,
row_count NUMBER, count_timestamp TIMESTAMP, job_name VARCHAR2(128),
batch_id NUMBER, error_message VARCHAR2(4000),
CONSTRAINT pk_final_log PRIMARY KEY (schema_name, table_name)
)';
    EXECUTE IMMEDIATE l_table_ddl;

    -- Creation phase: per-session staging table
    l_table_ddl := '
CREATE GLOBAL TEMPORARY TABLE job_staging_gtt (
owner VARCHAR2(128), table_name VARCHAR2(128)
) ON COMMIT PRESERVE ROWS';
    EXECUTE IMMEDIATE l_table_ddl;

    -- Progress summary view (kept in the shared buffer, as before)
    v_ddl_statement := '
CREATE OR REPLACE VIEW row_count_summary AS
SELECT
COUNT(*) AS total_tasks, SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) AS success_count,
SUM(CASE WHEN status = ''FAILED'' THEN 1 ELSE 0 END) AS failed_count, SUM(CASE WHEN status = ''RUNNING'' THEN 1 ELSE 0 END) AS running_count,
ROUND(SUM(CASE WHEN status = ''COMPLETE'' THEN 1 ELSE 0 END) / GREATEST(COUNT(*),1) * 100, 1) AS pct_complete,
MAX(end_time) AS last_completion
FROM row_count_control';
    EXECUTE IMMEDIATE v_ddl_statement;

    COMMIT;
    DBMS_OUTPUT.PUT_LINE(' -> SYSTEM DDL SETUP COMPLETE.');
END initialize_system;
-- P2. Populates the master control table, spreading tables over batches with NTILE.
-- Counts the candidate tables in all_tables (owners HR, SALES, FINANCE; recycle-bin
-- names BIN$% excluded), derives the number of batches from c_batch_size, and MERGEs
-- the tables into row_count_control. Only rows that are not already present are
-- inserted (as PENDING); existing rows keep their status and job_partition_id.
-- The MERGE is dynamic SQL because row_count_control may not exist when the
-- enclosing block is compiled.
PROCEDURE prepare_workload IS
    v_total_count    NUMBER;
    -- Local on purpose: the launch loop derives its own bound from the control table.
    v_batches_needed NUMBER;
    v_rows_merged    NUMBER;
    v_merge_sql      VARCHAR2(4000);
BEGIN
    SELECT COUNT(*)
      INTO v_total_count
      FROM all_tables
     WHERE owner IN ('HR','SALES','FINANCE')
       AND table_name NOT LIKE 'BIN$%';

    -- NTILE requires a positive bucket count; with zero source tables CEIL() yields 0,
    -- so clamp to 1 (the MERGE then simply finds nothing to insert).
    v_batches_needed := GREATEST(CEIL(v_total_count / c_batch_size), 1);

    -- CRITICAL: MERGE adds only new tables, preserving existing statuses.
    v_merge_sql := '
        MERGE INTO row_count_control c
        USING (
            SELECT owner AS schema_name,
                   table_name,
                   NTILE(:num_batches) OVER (ORDER BY owner, table_name) AS job_partition_id
              FROM all_tables
             WHERE owner IN (''HR'',''SALES'',''FINANCE'')
               AND table_name NOT LIKE ''BIN$%''
        ) s
        ON (c.schema_name = s.schema_name AND c.table_name = s.table_name)
        WHEN NOT MATCHED THEN
            INSERT (schema_name, table_name, job_partition_id, status)
            VALUES (s.schema_name, s.table_name, s.job_partition_id, ''PENDING'')';
    EXECUTE IMMEDIATE v_merge_sql USING v_batches_needed;

    -- Capture the row count BEFORE committing: COMMIT is itself a SQL statement
    -- and resets SQL%ROWCOUNT, so reading it afterwards always reports 0.
    v_rows_merged := SQL%ROWCOUNT;
    COMMIT;

    DBMS_OUTPUT.PUT_LINE(' -> WORKLOAD PREPARED. ' || v_rows_merged || ' new tasks added/checked. Total partitions: ' || v_batches_needed);
END prepare_workload;
-- P3. Recycles abandoned tasks.
-- Any task still marked RUNNING whose start_time is older than the stale threshold
-- is put back to PENDING (timestamps cleared, error_message set to a RESET note),
-- and the change is committed.
PROCEDURE reset_stalled_tasks IS
    v_stale_threshold_minutes CONSTANT NUMBER := 60;
BEGIN
    -- Dynamic SQL: row_count_control may not exist yet when this anonymous block is
    -- compiled (first run); a static reference would fail compilation with ORA-00942
    -- before the table could be created.
    --
    -- start_time is a plain TIMESTAMP populated from SYSTIMESTAMP, so the cut-off is
    -- cast to TIMESTAMP as well. Comparing against TIMESTAMP WITH TIME ZONE directly
    -- would re-interpret start_time in the session time zone and skew the threshold
    -- whenever the session and server time zones differ.
    EXECUTE IMMEDIATE '
        UPDATE row_count_control
           SET status        = ''PENDING'',
               start_time    = NULL,
               end_time      = NULL,
               error_message = ''RESET: Task was forcefully recycled from RUNNING state.''
         WHERE status = ''RUNNING''
           AND start_time < CAST(SYSTIMESTAMP AS TIMESTAMP)
                            - NUMTODSINTERVAL(:stale_minutes, ''MINUTE'')'
        USING v_stale_threshold_minutes;
    COMMIT;
END reset_stalled_tasks;
-- P4. (Re)creates the stored procedure COUNT_TABLE_BATCH(p_batch_id) that each
-- scheduler job executes. The generated procedure:
--   1. stages the batch's PENDING and FAILED tasks into job_staging_gtt;
--   2. for each staged table marks it RUNNING, counts its rows, replaces its row in
--      row_counts_scheduler_log and marks it COMPLETE;
--   3. on any error for a table records the message, marks the task FAILED and
--      replaces its log row with row_count = -1, then continues with the next table.
-- Requires row_count_control, row_counts_scheduler_log and job_staging_gtt to exist,
-- because the generated procedure references them statically.
PROCEDURE create_counting_procedure IS
    -- Local buffer sized to the PL/SQL maximum so the source text is not bound
    -- by a 4000-character limit.
    l_proc_source VARCHAR2(32767);
BEGIN
    -- q-quoted literal: the procedure text can use ordinary single quotes.
    l_proc_source := q'[
CREATE OR REPLACE PROCEDURE count_table_batch(p_batch_id NUMBER) AS
    -- 22 fixed characters plus two quoted identifiers of up to 128 bytes each.
    v_sql_count    VARCHAR2(512);
    v_cnt          NUMBER;
    v_job_name_log CONSTANT VARCHAR2(30) := 'BATCH_' || p_batch_id;
    v_error_msg    VARCHAR2(4000);
    CURSOR c_staged_tables IS SELECT owner, table_name FROM job_staging_gtt;
BEGIN
    -- Phase 1: STAGE WORKLOAD (isolation and resumption)
    EXECUTE IMMEDIATE 'TRUNCATE TABLE job_staging_gtt';
    INSERT INTO job_staging_gtt (owner, table_name)
    SELECT schema_name, table_name
      FROM row_count_control
     WHERE job_partition_id = p_batch_id
       AND status IN ('PENDING', 'FAILED');
    COMMIT;

    -- Phase 2: PROCESS STAGED WORKLOAD
    FOR r IN c_staged_tables LOOP
        BEGIN
            -- Mark RUNNING immediately and commit, so stalled tasks can be detected.
            UPDATE row_count_control
               SET status = 'RUNNING', start_time = SYSTIMESTAMP
             WHERE schema_name = r.owner AND table_name = r.table_name;
            COMMIT;

            v_sql_count := 'SELECT COUNT(*) FROM "' || r.owner || '"."' || r.table_name || '"';
            EXECUTE IMMEDIATE v_sql_count INTO v_cnt;

            -- Log success (one log row per table) and mark COMPLETE.
            DELETE FROM row_counts_scheduler_log
             WHERE schema_name = r.owner AND table_name = r.table_name;
            INSERT INTO row_counts_scheduler_log
                (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
            VALUES
                (r.owner, r.table_name, v_cnt, SYSTIMESTAMP, v_job_name_log, p_batch_id, NULL);
            UPDATE row_count_control
               SET status = 'COMPLETE', end_time = SYSTIMESTAMP, error_message = NULL
             WHERE schema_name = r.owner AND table_name = r.table_name;
            COMMIT;
        EXCEPTION
            WHEN OTHERS THEN
                v_error_msg := SUBSTR(SQLERRM, 1, 4000);
                -- Mark the task FAILED. FAILED tasks are staged again by Phase 1
                -- the next time this batch runs.
                UPDATE row_count_control
                   SET status = 'FAILED', end_time = SYSTIMESTAMP, error_message = v_error_msg
                 WHERE schema_name = r.owner AND table_name = r.table_name;
                -- Remove any earlier log row first: the log is keyed on
                -- (schema_name, table_name), so a bare INSERT would raise ORA-00001
                -- inside this handler and abort the remaining tables of the batch.
                DELETE FROM row_counts_scheduler_log
                 WHERE schema_name = r.owner AND table_name = r.table_name;
                INSERT INTO row_counts_scheduler_log
                    (schema_name, table_name, row_count, count_timestamp, job_name, batch_id, error_message)
                VALUES
                    (r.owner, r.table_name, -1, SYSTIMESTAMP, v_job_name_log, p_batch_id, v_error_msg);
                COMMIT;
        END;
    END LOOP;
END;
]';
    EXECUTE IMMEDIATE l_proc_source;
    DBMS_OUTPUT.PUT_LINE(' -> Counting Procedure created.');
END create_counting_procedure;
BEGIN
    --------------------------------------------------------------------------------------------------
    -- 0. INITIAL LAUNCH OR RESUME CHECK
    --    First run (no ROW_COUNT_CONTROL object): build all objects from scratch.
    --    Later runs: keep existing objects. In both cases prepare_workload then
    --    queues any tables that are not in the control table yet.
    --------------------------------------------------------------------------------------------------
    IF NOT object_exists('ROW_COUNT_CONTROL') THEN
        DBMS_OUTPUT.PUT_LINE('--- PHASE 1: FULL SYSTEM INITIALIZATION (FIRST RUN) ---');
        initialize_system;
    ELSE
        DBMS_OUTPUT.PUT_LINE('--- PHASE 1: SYSTEM IS LIVE. INITIATING RESTART/RESUME PROTOCOL ---');
    END IF;
    prepare_workload;
    create_counting_procedure; -- Ensure procedure is compiled with latest logic

    --------------------------------------------------------------------------------------------------
    -- 1. EXECUTION AND RESTART PROTOCOL
    --------------------------------------------------------------------------------------------------
    DBMS_OUTPUT.PUT_LINE('PHASE 2: EXECUTING RESTART PROTOCOL');

    -- Step A: Recycle tasks abandoned in RUNNING state
    reset_stalled_tasks;

    -- The loop bound is the highest partition id actually stored, not a freshly
    -- computed estimate. Dynamic SQL because row_count_control may not exist when
    -- this block is compiled (a static reference would fail with ORA-00942).
    EXECUTE IMMEDIATE 'SELECT MAX(job_partition_id) FROM row_count_control'
        INTO v_max_batches_needed;

    -- Safety check for an empty control table
    IF v_max_batches_needed IS NULL THEN
        DBMS_OUTPUT.PUT_LINE('ERROR: No tables found in the control queue. Cannot proceed.');
        RETURN;
    END IF;

    -- Step B: Launch/Relaunch Execution
    DBMS_OUTPUT.PUT_LINE('Launching up to ' || v_max_batches_needed || ' parallel jobs...');
    FOR i IN 1 .. v_max_batches_needed LOOP
        -- Zero-pad to at least two digits without truncating: LPAD(i, 2, '0') would cut
        -- 100 down to '10' and make batch 100 reuse (and drop) the job of batch 10.
        v_job_name := c_job_prefix
                      || LPAD(TO_CHAR(i), GREATEST(2, LENGTH(TO_CHAR(i))), '0');

        -- Only launch a job for batches that still have PENDING or FAILED work.
        EXECUTE IMMEDIATE '
            SELECT COUNT(*)
              FROM row_count_control
             WHERE job_partition_id = :batch_id
               AND status IN (''PENDING'', ''FAILED'')'
            INTO v_pending_in_batch
            USING i;

        IF v_pending_in_batch > 0 THEN
            -- Remove any previous job of the same name; execute_ddl ignores
            -- ORA-27475 (unknown job) when there is none.
            execute_ddl('BEGIN DBMS_SCHEDULER.DROP_JOB(''' || v_job_name || ''', TRUE); END;');

            -- Create disabled, set the batch id argument, then enable to start it.
            DBMS_SCHEDULER.CREATE_JOB (
                job_name            => v_job_name,
                job_type            => 'STORED_PROCEDURE',
                job_action          => 'COUNT_TABLE_BATCH',
                number_of_arguments => 1,
                start_date          => SYSTIMESTAMP,
                repeat_interval     => NULL,
                enabled             => FALSE
            );
            -- TO_CHAR avoids PLS-00307 (ambiguous overload) on the argument value.
            DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(v_job_name, 1, TO_CHAR(i));
            DBMS_SCHEDULER.ENABLE(v_job_name);

            v_total_submitted := v_total_submitted + 1;
            DBMS_OUTPUT.PUT_LINE(' -> Submitted Job ' || v_job_name || ' (' || v_pending_in_batch || ' tasks pending/failed)');
        END IF;
    END LOOP;
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('--- FINAL SUCCESS: ALL ' || v_total_submitted || ' jobs successfully launched/resumed. ---');
EXCEPTION
    WHEN OTHERS THEN
        -- Report, undo any uncommitted work, and re-raise so the caller sees the failure.
        DBMS_OUTPUT.PUT_LINE('!!! FATAL ERROR IN MASTER BLOCK !!! SQLERRM: ' || SQLERRM);
        ROLLBACK;
        RAISE;
END;
/
-- ====================================================================
-- Manual DBA retry: puts tasks back into the PENDING queue and commits.
-- Targets (a) tasks stuck in RUNNING for more than 60 minutes and (b) all FAILED tasks.
-- The previous error text is kept behind a fixed 33-character prefix; it is trimmed
-- to 3967 bytes so the concatenated value always fits error_message VARCHAR2(4000).
-- start_time is a plain TIMESTAMP, so the cut-off is cast to TIMESTAMP too; this keeps
-- the comparison independent of the session time zone.
UPDATE row_count_control
   SET status        = 'PENDING',
       start_time    = NULL,
       end_time      = NULL,
       error_message = 'RETRY FORCED by DBA. Originally: '
                       || SUBSTRB(NVL(error_message, 'Unknown.'), 1, 3967)
 WHERE
       -- Target tasks that were abandoned (stuck in RUNNING for > 1 hour)
       (status = 'RUNNING'
        AND start_time < CAST(SYSTIMESTAMP AS TIMESTAMP) - INTERVAL '60' MINUTE)
       -- OR target tasks that previously failed and are now ready for retry
    OR status = 'FAILED';
COMMIT;
-- No comments:
-- Post a Comment