CREATE OR REPLACE PACKAGE BODY PDROP_BG_PKG AS
--------------------------------------------------------------------------
-- Helper: Y/N flag
-- Returns TRUE when the first character of p_value upper-cases to 'Y'.
-- A NULL p_value is treated as 'N', so the result is always TRUE or
-- FALSE (never NULL).
--------------------------------------------------------------------------
FUNCTION is_yes(p_value IN VARCHAR2) RETURN BOOLEAN IS
    l_first_char VARCHAR2(32767);
BEGIN
    l_first_char := SUBSTR(NVL(p_value, 'N'), 1, 1);
    IF UPPER(l_first_char) = 'Y' THEN
        RETURN TRUE;
    END IF;
    RETURN FALSE;
END is_yes;
--------------------------------------------------------------------------
-- Helper: normalize owner/table names
-- Strips leading/trailing blanks from p_name and upper-cases the rest.
-- A NULL (or all-blank) input yields NULL.
--------------------------------------------------------------------------
FUNCTION normalize_name(p_name IN VARCHAR2) RETURN VARCHAR2 IS
    l_trimmed VARCHAR2(32767);
BEGIN
    l_trimmed := TRIM(p_name);
    RETURN UPPER(l_trimmed);
END normalize_name;
--------------------------------------------------------------------------
-- Helper: double-quote object names safely
-- Wraps p_name in double quotes and doubles every double quote found
-- inside it, so the result can be concatenated into dynamic SQL as a
-- quoted identifier. A NULL p_name yields the two-character string "".
--------------------------------------------------------------------------
FUNCTION qname(p_name IN VARCHAR2) RETURN VARCHAR2 IS
    c_quote   CONSTANT VARCHAR2(1) := '"';
    l_escaped VARCHAR2(32767);
BEGIN
    l_escaped := REPLACE(p_name, c_quote, c_quote || c_quote);
    RETURN c_quote || l_escaped || c_quote;
END qname;
--------------------------------------------------------------------------
-- Segment MB for table and table partitions/subpartitions
-- Sums BYTES over the TABLE, TABLE PARTITION and TABLE SUBPARTITION
-- segments named p_table in schema p_owner and returns the total in
-- megabytes rounded to 2 decimals (0 when no segment matches).
-- Best-effort metric: any error yields NULL instead of propagating.
-- NOTE(review): stock Oracle documents DBA_SEGMENTS / USER_SEGMENTS;
-- confirm that ALL_SEGMENTS resolves to a view or synonym in this database.
--------------------------------------------------------------------------
FUNCTION get_segment_mb (
    p_owner IN VARCHAR2,
    p_table IN VARCHAR2
) RETURN NUMBER IS
    l_total_bytes NUMBER;
BEGIN
    SELECT SUM(seg.bytes)
      INTO l_total_bytes
      FROM all_segments seg
     WHERE seg.owner = p_owner
       AND seg.segment_name = p_table
       AND seg.segment_type IN ('TABLE', 'TABLE PARTITION', 'TABLE SUBPARTITION');

    -- SUM over zero rows is NULL; report that as 0 MB.
    RETURN ROUND(NVL(l_total_bytes, 0) / 1024 / 1024, 2);
EXCEPTION
    WHEN OTHERS THEN
        RETURN NULL;
END get_segment_mb;
--------------------------------------------------------------------------
-- Partition count for target table
-- Returns the number of ALL_TAB_PARTITIONS rows for p_owner.p_table
-- (0 when there are none). Best-effort metric: any error yields NULL.
--------------------------------------------------------------------------
FUNCTION get_partition_count (
    p_owner IN VARCHAR2,
    p_table IN VARCHAR2
) RETURN NUMBER IS
    l_partitions NUMBER;
BEGIN
    SELECT COUNT(*)
      INTO l_partitions
      FROM all_tab_partitions tp
     WHERE tp.table_owner = p_owner
       AND tp.table_name = p_table;

    RETURN l_partitions;
EXCEPTION
    WHEN OTHERS THEN
        RETURN NULL;
END get_partition_count;
--------------------------------------------------------------------------
-- Count unusable global indexes for a target table
-- Returns the sum of:
--   * non-partitioned indexes on p_owner.p_table whose STATUS is UNUSABLE
--   * UNUSABLE partitions of GLOBAL partitioned indexes on that table
--     (one per partition, not one per index)
-- Best-effort metric: any error yields NULL.
--------------------------------------------------------------------------
FUNCTION get_unusable_global_index_count (
    p_owner IN VARCHAR2,
    p_table IN VARCHAR2
) RETURN NUMBER IS
    l_plain_cnt NUMBER;
    l_part_cnt  NUMBER;
BEGIN
    -- Non-partitioned indexes.
    SELECT COUNT(*)
      INTO l_plain_cnt
      FROM all_indexes idx
     WHERE idx.table_owner = p_owner
       AND idx.table_name = p_table
       AND idx.partitioned = 'NO'
       AND idx.status = 'UNUSABLE';

    -- Partitions of GLOBAL partitioned indexes.
    SELECT COUNT(*)
      INTO l_part_cnt
      FROM all_ind_partitions ipart
      JOIN all_indexes idx
        ON idx.owner = ipart.index_owner
       AND idx.index_name = ipart.index_name
      JOIN all_part_indexes pidx
        ON pidx.owner = idx.owner
       AND pidx.index_name = idx.index_name
     WHERE idx.table_owner = p_owner
       AND idx.table_name = p_table
       AND pidx.locality = 'GLOBAL'
       AND ipart.status = 'UNUSABLE';

    RETURN l_plain_cnt + l_part_cnt;
EXCEPTION
    WHEN OTHERS THEN
        RETURN NULL;
END get_unusable_global_index_count;
--------------------------------------------------------------------------
-- Validate table is single-column DATE/TIMESTAMP partitioned.
-- Returns TRUE when p_owner.p_table has exactly one partition key column
-- and that column's data type is DATE or starts with TIMESTAMP.
-- Any error (including missing metadata access) yields FALSE.
-- NOTE:
-- This is used as a warning guard only in run_cleanup.
-- The package still relies on HIGH_VALUE date parsing before any DROP.
--------------------------------------------------------------------------
FUNCTION table_has_single_date_key (
    p_owner IN VARCHAR2,
    p_table IN VARCHAR2
) RETURN BOOLEAN IS
    l_columns  NUMBER;
    l_key_type VARCHAR2(128);
BEGIN
    -- With a single key column MAX(data_type) is simply that column's type.
    SELECT COUNT(*), MAX(col.data_type)
      INTO l_columns, l_key_type
      FROM all_part_key_columns pkc
      JOIN all_tab_cols col
        ON col.owner = pkc.owner
       AND col.table_name = pkc.name
       AND col.column_name = pkc.column_name
     WHERE pkc.owner = p_owner
       AND pkc.name = p_table
       AND pkc.object_type = 'TABLE';

    IF l_columns <> 1 THEN
        RETURN FALSE;
    END IF;

    RETURN l_key_type = 'DATE' OR l_key_type LIKE 'TIMESTAMP%';
EXCEPTION
    WHEN OTHERS THEN
        RETURN FALSE;
END table_has_single_date_key;
--------------------------------------------------------------------------
-- Convert partition HIGH_VALUE text to DATE
-- Returns NULL when p_hv_text is NULL, contains MAXVALUE, or cannot be
-- interpreted by any of the three attempts below.
--   1. evaluate the text as a SQL expression and CAST it to DATE
--   2. evaluate it as a TIMESTAMP and truncate to the day
--   3. pick the first YYYY-MM-DD pattern out of the text
-- The first attempt that executes without error decides the result, even
-- if that result is NULL.
-- NOTE(review): attempts 1 and 2 execute p_hv_text as part of a dynamic
-- SELECT. run_cleanup passes dictionary HIGH_VALUE text only; confirm no
-- other caller can pass arbitrary text before exposing this function.
--------------------------------------------------------------------------
FUNCTION get_hv_date(p_hv_text IN VARCHAR2) RETURN DATE IS
    l_result  DATE;
    l_stmt    VARCHAR2(32767);
    l_ymd     VARCHAR2(100);
BEGIN
    IF p_hv_text IS NULL OR UPPER(p_hv_text) LIKE '%MAXVALUE%' THEN
        RETURN NULL;
    END IF;

    -- Attempts 1 and 2: let the database evaluate the expression.
    FOR l_attempt IN 1 .. 2 LOOP
        BEGIN
            IF l_attempt = 1 THEN
                l_stmt := 'SELECT CAST(' || p_hv_text || ' AS DATE) FROM dual';
            ELSE
                l_stmt := 'SELECT TRUNC(CAST(' || p_hv_text || ' AS TIMESTAMP)) FROM dual';
            END IF;
            EXECUTE IMMEDIATE l_stmt INTO l_result;
            RETURN l_result;
        EXCEPTION
            WHEN OTHERS THEN
                NULL;  -- fall through to the next attempt
        END;
    END LOOP;

    -- Attempt 3: regex fallback for common DATE text.
    l_ymd := REGEXP_SUBSTR(p_hv_text, '[0-9]{4}-[0-9]{2}-[0-9]{2}');
    IF l_ymd IS NULL THEN
        RETURN NULL;
    END IF;

    BEGIN
        RETURN TO_DATE(l_ymd, 'YYYY-MM-DD');
    EXCEPTION
        WHEN OTHERS THEN
            RETURN NULL;
    END;
END get_hv_date;
--------------------------------------------------------------------------
-- Detail log
-- Writes one row to PDROP_BG_DETAIL in an autonomous transaction, so the
-- row is committed independently of the caller's transaction.
-- Logging is best-effort: a failure to write the row is swallowed and
-- never interrupts the caller.
--
-- Parameters map one-to-one onto the PDROP_BG_DETAIL columns:
--   p_run_id / p_owner / p_table   run and target table the row refers to
--   p_partition ... p_live_count   partition context (NULL for table-level rows)
--   p_action                       event code stored in ACTION
--   p_sql_text                     statement that was (or would be) executed
--   p_error_msg                    error or explanatory text, if any
--------------------------------------------------------------------------
PROCEDURE log_detail (
    p_run_id             IN VARCHAR2,
    p_owner              IN VARCHAR2,
    p_table              IN VARCHAR2,
    p_partition          IN VARCHAR2,
    p_partition_position IN NUMBER,
    p_max_partition_pos  IN NUMBER,
    p_high_value_text    IN VARCHAR2,
    p_high_value_date    IN DATE,
    p_stats_num_rows     IN NUMBER,
    p_last_analyzed      IN DATE,
    p_live_count         IN NUMBER,
    p_action             IN VARCHAR2,
    p_sql_text           IN CLOB,
    p_error_msg          IN VARCHAR2 DEFAULT NULL
) IS
    PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
    INSERT INTO PDROP_BG_DETAIL (
        run_id,
        source_owner,
        table_name,
        partition_name,
        partition_position,
        max_partition_pos,
        high_value_text,
        high_value_date,
        stats_num_rows,
        last_analyzed,
        live_count,
        action,
        sql_text,
        error_msg
    )
    VALUES (
        p_run_id,
        p_owner,
        p_table,
        p_partition,
        p_partition_position,
        p_max_partition_pos,
        p_high_value_text,
        p_high_value_date,
        p_stats_num_rows,
        p_last_analyzed,
        p_live_count,
        p_action,
        p_sql_text,
        p_error_msg
    );
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        -- Still best-effort, but the autonomous transaction must be ended
        -- explicitly: returning with it open raises ORA-06519 in the
        -- caller, which would defeat the swallow.
        ROLLBACK;
END log_detail;
--------------------------------------------------------------------------
-- Index log
-- Writes one row to PDROP_BG_INDEX_LOG in an autonomous transaction, so
-- the row is committed independently of the caller's transaction.
-- Logging is best-effort: a failure to write the row is swallowed and
-- never interrupts the caller.
--
-- Parameters map one-to-one onto the PDROP_BG_INDEX_LOG columns:
--   p_run_id                        run the row belongs to
--   p_index_owner / p_index_name    index that was processed
--   p_partition                     index partition, NULL for a whole index
--   p_table_owner / p_table_name    table the index belongs to
--   p_action                        event code stored in ACTION
--   p_sql_text                      statement that was executed
--   p_error_msg                     error text, if any
--------------------------------------------------------------------------
PROCEDURE log_index (
    p_run_id      IN VARCHAR2,
    p_index_owner IN VARCHAR2,
    p_index_name  IN VARCHAR2,
    p_partition   IN VARCHAR2,
    p_table_owner IN VARCHAR2,
    p_table_name  IN VARCHAR2,
    p_action      IN VARCHAR2,
    p_sql_text    IN CLOB,
    p_error_msg   IN VARCHAR2 DEFAULT NULL
) IS
    PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
    INSERT INTO PDROP_BG_INDEX_LOG (
        run_id,
        index_owner,
        index_name,
        partition_name,
        table_owner,
        table_name,
        action,
        sql_text,
        error_msg
    )
    VALUES (
        p_run_id,
        p_index_owner,
        p_index_name,
        p_partition,
        p_table_owner,
        p_table_name,
        p_action,
        p_sql_text,
        p_error_msg
    );
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        -- Still best-effort, but the autonomous transaction must be ended
        -- explicitly: returning with it open raises ORA-06519 in the
        -- caller, which would defeat the swallow.
        ROLLBACK;
END log_index;
--------------------------------------------------------------------------
-- Clear target list
-- Deletes rows from PDROP_BG_TARGETS and commits.
--   p_owner NULL      -> every target is removed
--   p_owner supplied  -> only targets whose SOURCE_OWNER equals the
--                        normalized (trimmed, upper-cased) owner
--------------------------------------------------------------------------
PROCEDURE clear_targets (
    p_owner IN VARCHAR2 DEFAULT NULL
) IS
    l_schema VARCHAR2(128);
BEGIN
    IF p_owner IS NOT NULL THEN
        l_schema := normalize_name(p_owner);

        DELETE FROM PDROP_BG_TARGETS tgt
         WHERE tgt.source_owner = l_schema;
    ELSE
        -- Intentionally unfiltered: no owner means "clear everything".
        DELETE FROM PDROP_BG_TARGETS;
    END IF;

    COMMIT;
END clear_targets;
--------------------------------------------------------------------------
-- Add target table
-- Upserts one row in PDROP_BG_TARGETS keyed by the normalized (trimmed,
-- upper-cased) owner and table name, then commits.
--   existing row -> ENABLED_FLAG, NOTES and UPDATED_AT are overwritten
--   new row      -> inserted with ENABLED_FLAG and NOTES
-- ENABLED_FLAG is stored as the upper-cased first character of
-- p_enabled_flag ('Y' when p_enabled_flag is NULL).
--------------------------------------------------------------------------
PROCEDURE add_target (
    p_owner        IN VARCHAR2,
    p_table_name   IN VARCHAR2,
    p_enabled_flag IN VARCHAR2 DEFAULT 'Y',
    p_notes        IN VARCHAR2 DEFAULT NULL
) IS
    l_schema VARCHAR2(128);
    l_tab    VARCHAR2(128);
BEGIN
    l_schema := normalize_name(p_owner);
    l_tab    := normalize_name(p_table_name);

    MERGE INTO PDROP_BG_TARGETS tgt
    USING (
        SELECT l_schema AS source_owner,
               l_tab    AS table_name
          FROM dual
    ) src
    ON (
            tgt.source_owner = src.source_owner
        AND tgt.table_name   = src.table_name
    )
    WHEN MATCHED THEN
        UPDATE
           SET tgt.enabled_flag = UPPER(SUBSTR(NVL(p_enabled_flag, 'Y'), 1, 1)),
               tgt.notes        = p_notes,
               tgt.updated_at   = SYSTIMESTAMP
    WHEN NOT MATCHED THEN
        INSERT (source_owner, table_name, enabled_flag, notes)
        VALUES (
            src.source_owner,
            src.table_name,
            UPPER(SUBSTR(NVL(p_enabled_flag, 'Y'), 1, 1)),
            p_notes
        );

    COMMIT;
END add_target;
--------------------------------------------------------------------------
-- Create run
-- Registers a new cleanup run: one PDROP_BG_RUNS row (status READY) plus
-- one PENDING PDROP_BG_RUN_TARGETS row per enabled target, then commits.
--
-- Parameters:
--   p_run_mode                  DRY_RUN or EXECUTE (case/blank insensitive)
--   p_cutoff_date               cutoff date stored on the run; must not be NULL
--   p_update_global_indexes     Y/N flag, NULL -> Y
--   p_gather_stats              Y/N flag, NULL -> Y
--   p_rebuild_unusable_indexes  Y/N flag, NULL -> Y
--   p_parallel_degree           degree stored on the run
--   p_log_skips                 Y/N flag, NULL -> N
--   p_run_id                    OUT: generated run id
--                               (PDROP_<YYYYMMDD_HH24MISS>_<sequence>)
--
-- Raises:
--   -20001  run mode is NULL or not DRY_RUN / EXECUTE
--   -20002  no enabled target exists in PDROP_BG_TARGETS
--   -20005  cutoff date is NULL
--------------------------------------------------------------------------
PROCEDURE create_run (
    p_run_mode                 IN VARCHAR2 DEFAULT 'DRY_RUN',
    p_cutoff_date              IN DATE DEFAULT DATE '2025-01-01',
    p_update_global_indexes    IN VARCHAR2 DEFAULT 'Y',
    p_gather_stats             IN VARCHAR2 DEFAULT 'Y',
    p_rebuild_unusable_indexes IN VARCHAR2 DEFAULT 'Y',
    p_parallel_degree          IN NUMBER DEFAULT 8,
    p_log_skips                IN VARCHAR2 DEFAULT 'N',
    p_run_id                   OUT VARCHAR2
) IS
    l_seq        NUMBER;
    l_run_id     VARCHAR2(100);
    l_job_name   VARCHAR2(128);
    l_targets    NUMBER;
    -- Sized for any input so that an over-long run mode reaches the
    -- validation below instead of failing with ORA-06502 at declaration.
    l_run_mode   VARCHAR2(32767);
    l_update_gi  VARCHAR2(16);
    l_stats      VARCHAR2(16);
    l_rebuild    VARCHAR2(16);
    l_log_skips  VARCHAR2(16);
    l_notes      VARCHAR2(200);

    -- Upper-cased first character of a flag, with a default for NULL.
    FUNCTION flag_char (
        p_value   IN VARCHAR2,
        p_default IN VARCHAR2
    ) RETURN VARCHAR2 IS
    BEGIN
        RETURN UPPER(SUBSTR(NVL(p_value, p_default), 1, 1));
    END flag_char;
BEGIN
    l_run_mode := UPPER(TRIM(p_run_mode));

    -- NULL must be rejected explicitly: "NULL NOT IN (...)" is not TRUE, so
    -- a NULL run mode would otherwise pass validation and be stored.
    IF l_run_mode IS NULL
       OR l_run_mode NOT IN ('DRY_RUN', 'EXECUTE')
    THEN
        RAISE_APPLICATION_ERROR(-20001, 'Run mode must be DRY_RUN or EXECUTE.');
    END IF;

    -- A NULL cutoff would make every "high value > cutoff" comparison
    -- unknown, so no partition would ever be skipped as too recent.
    IF p_cutoff_date IS NULL THEN
        RAISE_APPLICATION_ERROR(-20005, 'Cutoff date must not be NULL.');
    END IF;

    SELECT COUNT(*)
      INTO l_targets
      FROM PDROP_BG_TARGETS
     WHERE enabled_flag = 'Y';

    IF l_targets = 0 THEN
        RAISE_APPLICATION_ERROR(-20002, 'No enabled targets found in PDROP_BG_TARGETS.');
    END IF;

    SELECT PDROP_BG_SEQ.NEXTVAL
      INTO l_seq
      FROM dual;

    l_run_id   := 'PDROP_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDD_HH24MISS_') || l_seq;
    l_job_name := 'PDROPJ_' || l_seq;

    l_update_gi := flag_char(p_update_global_indexes, 'Y');
    l_stats     := flag_char(p_gather_stats, 'Y');
    l_rebuild   := flag_char(p_rebuild_unusable_indexes, 'Y');
    l_log_skips := flag_char(p_log_skips, 'N');

    -- Leave a reminder on runs that drop partitions without maintaining
    -- global indexes as part of the DROP statement.
    IF l_run_mode = 'EXECUTE' AND l_update_gi = 'N' THEN
        l_notes := 'EXECUTE without UPDATE GLOBAL INDEXES. Scheduler rebuild fallback should remain enabled.';
    END IF;

    INSERT INTO PDROP_BG_RUNS (
        run_id,
        scheduler_job_name,
        run_mode,
        cutoff_date,
        update_global_indexes,
        gather_stats,
        rebuild_unusable_indexes,
        parallel_degree,
        log_skips,
        status,
        created_by,
        total_targets,
        total_partitions_checked,
        candidate_count,
        dropped_count,
        error_count,
        warning_count,
        notes
    )
    VALUES (
        l_run_id,
        l_job_name,
        l_run_mode,
        p_cutoff_date,
        l_update_gi,
        l_stats,
        l_rebuild,
        p_parallel_degree,
        l_log_skips,
        'READY',
        SYS_CONTEXT('USERENV', 'SESSION_USER'),
        l_targets,
        0,
        0,
        0,
        0,
        0,
        l_notes
    );

    -- Snapshot the enabled targets for this run.
    INSERT INTO PDROP_BG_RUN_TARGETS (
        run_id,
        source_owner,
        table_name,
        status,
        candidate_count,
        dropped_count,
        error_count
    )
    SELECT
        l_run_id,
        source_owner,
        table_name,
        'PENDING',
        0,
        0,
        0
    FROM PDROP_BG_TARGETS
    WHERE enabled_flag = 'Y';

    COMMIT;
    p_run_id := l_run_id;
END create_run;
--------------------------------------------------------------------------
-- Capture before checks.
-- For every target of the run that has no baseline yet (BEFORE_TIME IS
-- NULL) stores the current partition count, segment MB and unusable index
-- count, and moves a PENDING target to BEFORE_CAPTURED. Afterwards a READY
-- run is moved to BEFORE_CAPTURED as well, and everything is committed.
-- Guarded by before_time IS NULL to avoid duplicate baseline overwrite.
--------------------------------------------------------------------------
PROCEDURE capture_before (
    p_run_id IN VARCHAR2
) IS
    CURSOR c_uncaptured IS
        SELECT rt.source_owner, rt.table_name
          FROM PDROP_BG_RUN_TARGETS rt
         WHERE rt.run_id = p_run_id
           AND rt.before_time IS NULL
         ORDER BY rt.source_owner, rt.table_name;

    l_parts    NUMBER;
    l_mb       NUMBER;
    l_unusable NUMBER;
BEGIN
    FOR tgt IN c_uncaptured LOOP
        l_parts    := get_partition_count(tgt.source_owner, tgt.table_name);
        l_mb       := get_segment_mb(tgt.source_owner, tgt.table_name);
        l_unusable := get_unusable_global_index_count(tgt.source_owner, tgt.table_name);

        UPDATE PDROP_BG_RUN_TARGETS
           SET before_time                 = SYSTIMESTAMP,
               before_partition_count      = l_parts,
               before_segment_mb           = l_mb,
               before_unusable_index_count = l_unusable,
               -- Only a PENDING target advances; any other status is kept.
               status = CASE status
                            WHEN 'PENDING' THEN 'BEFORE_CAPTURED'
                            ELSE status
                        END
         WHERE run_id = p_run_id
           AND source_owner = tgt.source_owner
           AND table_name = tgt.table_name;
    END LOOP;

    -- Only a READY run advances; any other status is kept.
    UPDATE PDROP_BG_RUNS
       SET status = CASE status
                        WHEN 'READY' THEN 'BEFORE_CAPTURED'
                        ELSE status
                    END
     WHERE run_id = p_run_id;

    COMMIT;
END capture_before;
--------------------------------------------------------------------------
-- Rebuild unusable global indexes for one table
-- Rebuilds, one statement at a time:
--   * every UNUSABLE non-partitioned index on p_owner.p_table
--   * every UNUSABLE partition of a GLOBAL partitioned index on it
-- After each successful rebuild the index is set back to NOPARALLEL
-- (best-effort) and the result is written through log_index.
-- A failing rebuild is logged and counted; it never stops the loop.
--
-- Parameters:
--   p_run_id           run the log rows belong to
--   p_owner / p_table  table whose indexes are rebuilt
--   p_parallel_degree  degree for the rebuild; a value below 1 or NULL
--                      rebuilds with a bare PARALLEL clause instead of
--                      producing invalid DDL
--   p_rebuild_count    IN OUT: incremented per successful rebuild
--   p_table_errors     IN OUT: incremented per failed rebuild
--   p_total_errors     IN OUT: incremented per failed rebuild
--------------------------------------------------------------------------
PROCEDURE rebuild_indexes_for_table (
    p_run_id          IN VARCHAR2,
    p_owner           IN VARCHAR2,
    p_table           IN VARCHAR2,
    p_parallel_degree IN NUMBER,
    p_rebuild_count   IN OUT NUMBER,
    p_table_errors    IN OUT NUMBER,
    p_total_errors    IN OUT NUMBER
) IS
    l_parallel_clause VARCHAR2(64);

    -- Rebuild one index (p_partition NULL) or one index partition, reset
    -- the index to NOPARALLEL and log the outcome.
    PROCEDURE rebuild_one (
        p_index_owner IN VARCHAR2,
        p_index_name  IN VARCHAR2,
        p_partition   IN VARCHAR2
    ) IS
        l_sql    VARCHAR2(32767);
        l_errmsg VARCHAR2(4000);
    BEGIN
        l_sql := 'ALTER INDEX ' || qname(p_index_owner) || '.' || qname(p_index_name) ||
                 ' REBUILD';
        IF p_partition IS NOT NULL THEN
            l_sql := l_sql || ' PARTITION ' || qname(p_partition);
        END IF;
        l_sql := l_sql || l_parallel_clause;

        EXECUTE IMMEDIATE l_sql;

        -- The rebuild leaves the parallel attribute on the index; reset it.
        -- Best-effort: the index is already usable at this point.
        BEGIN
            EXECUTE IMMEDIATE
                'ALTER INDEX ' || qname(p_index_owner) || '.' || qname(p_index_name) ||
                ' NOPARALLEL';
        EXCEPTION
            WHEN OTHERS THEN
                NULL;
        END;

        p_rebuild_count := p_rebuild_count + 1;
        log_index(
            p_run_id,
            p_index_owner,
            p_index_name,
            p_partition,
            p_owner,
            p_table,
            CASE WHEN p_partition IS NULL THEN 'IDX-REBUILT' ELSE 'IDX-PART-REBUILT' END,
            l_sql,
            NULL
        );
    EXCEPTION
        WHEN OTHERS THEN
            l_errmsg := SQLERRM;
            p_table_errors := p_table_errors + 1;
            p_total_errors := p_total_errors + 1;
            log_index(
                p_run_id,
                p_index_owner,
                p_index_name,
                p_partition,
                p_owner,
                p_table,
                CASE WHEN p_partition IS NULL THEN 'IDX-ERROR' ELSE 'IDX-PART-ERROR' END,
                l_sql,
                l_errmsg
            );
    END rebuild_one;
BEGIN
    -- "PARALLEL 0", a negative or a fractional degree is rejected by the
    -- database, which would fail every rebuild. Use the explicit degree
    -- only when it is at least 1; otherwise let the database pick.
    IF p_parallel_degree >= 1 THEN
        l_parallel_clause := ' PARALLEL ' || TRUNC(p_parallel_degree);
    ELSE
        l_parallel_clause := ' PARALLEL';
    END IF;

    ----------------------------------------------------------------------
    -- Non-partitioned global indexes
    ----------------------------------------------------------------------
    FOR idx IN (
        SELECT owner, index_name
          FROM all_indexes
         WHERE table_owner = p_owner
           AND table_name = p_table
           AND partitioned = 'NO'
           AND status = 'UNUSABLE'
         ORDER BY owner, index_name
    ) LOOP
        rebuild_one(idx.owner, idx.index_name, NULL);
    END LOOP;

    ----------------------------------------------------------------------
    -- Partitioned GLOBAL indexes only
    ----------------------------------------------------------------------
    FOR part IN (
        SELECT
            ip.index_owner,
            ip.index_name,
            ip.partition_name
        FROM all_ind_partitions ip
        JOIN all_indexes i
          ON i.owner = ip.index_owner
         AND i.index_name = ip.index_name
        JOIN all_part_indexes pi
          ON pi.owner = i.owner
         AND pi.index_name = i.index_name
        WHERE i.table_owner = p_owner
          AND i.table_name = p_table
          AND pi.locality = 'GLOBAL'
          AND ip.status = 'UNUSABLE'
        ORDER BY ip.index_owner, ip.index_name, ip.partition_name
    ) LOOP
        rebuild_one(part.index_owner, part.index_name, part.partition_name);
    END LOOP;
END rebuild_indexes_for_table;
--------------------------------------------------------------------------
-- Main cleanup procedure executed by scheduler
--
-- Processes every PDROP_BG_RUN_TARGETS row of the run in owner/table order:
--   1. logs a warning when the partition key is not confirmed as a single
--      DATE/TIMESTAMP column (soft check only)
--   2. gathers table statistics when the run's GATHER_STATS flag is set
--   3. walks the partitions in position order; a partition is a drop
--      candidate only when ALL of the following hold:
--        - its HIGH_VALUE text is shorter than 4000 characters
--        - it is not the last partition and HIGH_VALUE is not MAXVALUE
--        - HIGH_VALUE parses to a date that is <= the run's cutoff date
--        - it has statistics and they report 0 rows
--        - a live COUNT(*) on the partition returns 0
--   4. a candidate is dropped only when the run mode is EXECUTE;
--      in DRY_RUN mode the statement is logged as DRY-DROP instead
--   5. after drops (EXECUTE only) unusable indexes are rebuilt when the
--      run's REBUILD_UNUSABLE_INDEXES flag is set
--   6. "after" metrics and counters are stored per target and per run
--
-- Parameters:
--   p_run_id  run to process; must exist in PDROP_BG_RUNS
--
-- Raises:
--   -20006  the stored run mode is not DRY_RUN or EXECUTE
--   -20007  the stored cutoff date is NULL
--   Any error that escapes the per-step handlers marks the run FAILED
--   (message stored in NOTES) and is re-raised.
--------------------------------------------------------------------------
PROCEDURE run_cleanup (
    p_run_id IN VARCHAR2
) IS
    l_run                 PDROP_BG_RUNS%ROWTYPE;

    -- Table / partition currently being processed. These are read by the
    -- nested helpers below, so they are assigned at the top of each loop
    -- iteration before any helper is called.
    l_owner               PDROP_BG_RUN_TARGETS.source_owner%TYPE;
    l_table               PDROP_BG_RUN_TARGETS.table_name%TYPE;
    l_part_name           VARCHAR2(128);
    l_part_pos            NUMBER;
    l_stats_rows          NUMBER;
    l_last_analyzed       DATE;
    l_last_pos            NUMBER;
    l_hv_long             LONG;
    l_hv_text             VARCHAR2(4000);
    l_hv_date             DATE;
    l_live_count          NUMBER;

    l_sql                 VARCHAR2(32767);
    l_errmsg              VARCHAR2(4000);

    -- Per-table counters (reset for every target).
    l_table_candidates    NUMBER;
    l_table_dropped       NUMBER;
    l_table_errors        NUMBER;

    -- Run-wide counters.
    l_total_partitions    NUMBER := 0;
    l_total_candidates    NUMBER := 0;
    l_total_dropped       NUMBER := 0;
    l_total_errors        NUMBER := 0;
    l_total_warnings      NUMBER := 0;
    l_index_rebuild_count NUMBER := 0;

    -- Log an event for the current table with no partition context.
    PROCEDURE log_table_event (
        p_action    IN VARCHAR2,
        p_error_msg IN VARCHAR2
    ) IS
    BEGIN
        log_detail(
            p_run_id             => p_run_id,
            p_owner              => l_owner,
            p_table              => l_table,
            p_partition          => NULL,
            p_partition_position => NULL,
            p_max_partition_pos  => NULL,
            p_high_value_text    => NULL,
            p_high_value_date    => NULL,
            p_stats_num_rows     => NULL,
            p_last_analyzed      => NULL,
            p_live_count         => NULL,
            p_action             => p_action,
            p_sql_text           => NULL,
            p_error_msg          => p_error_msg
        );
    END log_table_event;

    -- Log an event for the current partition using the current values of
    -- l_hv_date and l_live_count (both NULL until they have been computed).
    PROCEDURE log_partition_event (
        p_action    IN VARCHAR2,
        p_sql_text  IN VARCHAR2 DEFAULT NULL,
        p_error_msg IN VARCHAR2 DEFAULT NULL
    ) IS
    BEGIN
        log_detail(
            p_run_id             => p_run_id,
            p_owner              => l_owner,
            p_table              => l_table,
            p_partition          => l_part_name,
            p_partition_position => l_part_pos,
            p_max_partition_pos  => l_last_pos,
            p_high_value_text    => l_hv_text,
            p_high_value_date    => l_hv_date,
            p_stats_num_rows     => l_stats_rows,
            p_last_analyzed      => l_last_analyzed,
            p_live_count         => l_live_count,
            p_action             => p_action,
            p_sql_text           => p_sql_text,
            p_error_msg          => p_error_msg
        );
    END log_partition_event;

    -- Log a routine skip, but only when the run asked for skips to be logged.
    PROCEDURE log_skip (
        p_action   IN VARCHAR2,
        p_sql_text IN VARCHAR2 DEFAULT NULL
    ) IS
    BEGIN
        IF is_yes(l_run.log_skips) THEN
            log_partition_event(p_action, p_sql_text);
        END IF;
    END log_skip;

    -- Write the run-wide counters to PDROP_BG_RUNS (no commit).
    PROCEDURE save_run_totals IS
    BEGIN
        UPDATE PDROP_BG_RUNS
           SET total_partitions_checked = l_total_partitions,
               candidate_count          = l_total_candidates,
               dropped_count            = l_total_dropped,
               error_count              = l_total_errors,
               warning_count            = l_total_warnings
         WHERE run_id = p_run_id;
    END save_run_totals;

    -- Close the current target: store "after" metrics, counters and the
    -- final status (plus notes when given), refresh run totals, commit.
    PROCEDURE finish_target (
        p_status IN VARCHAR2,
        p_notes  IN VARCHAR2 DEFAULT NULL
    ) IS
        l_after_partition_count NUMBER;
        l_after_segment_mb      NUMBER;
        l_after_unusable_count  NUMBER;
    BEGIN
        l_after_partition_count := get_partition_count(l_owner, l_table);
        l_after_segment_mb      := get_segment_mb(l_owner, l_table);
        l_after_unusable_count  := get_unusable_global_index_count(l_owner, l_table);

        UPDATE PDROP_BG_RUN_TARGETS
           SET status                     = p_status,
               after_time                 = SYSTIMESTAMP,
               after_partition_count      = l_after_partition_count,
               after_segment_mb           = l_after_segment_mb,
               after_unusable_index_count = l_after_unusable_count,
               candidate_count            = l_table_candidates,
               dropped_count              = l_table_dropped,
               error_count                = l_table_errors
         WHERE run_id = p_run_id
           AND source_owner = l_owner
           AND table_name = l_table;

        -- Existing notes are left untouched unless new notes are supplied.
        IF p_notes IS NOT NULL THEN
            UPDATE PDROP_BG_RUN_TARGETS
               SET notes = p_notes
             WHERE run_id = p_run_id
               AND source_owner = l_owner
               AND table_name = l_table;
        END IF;

        save_run_totals;
        COMMIT;
    END finish_target;
BEGIN
    SELECT *
      INTO l_run
      FROM PDROP_BG_RUNS
     WHERE run_id = p_run_id;

    -- Fail fast on an unusable configuration. Without these checks a NULL
    -- or misspelled run mode would fall into the DROP branch, and a NULL
    -- cutoff would let every partition pass the date guard.
    IF l_run.run_mode IS NULL
       OR l_run.run_mode NOT IN ('DRY_RUN', 'EXECUTE')
    THEN
        RAISE_APPLICATION_ERROR(
            -20006,
            'Run ' || p_run_id || ' has an invalid run mode; expected DRY_RUN or EXECUTE.'
        );
    END IF;
    IF l_run.cutoff_date IS NULL THEN
        RAISE_APPLICATION_ERROR(
            -20007,
            'Run ' || p_run_id || ' has no cutoff date.'
        );
    END IF;

    UPDATE PDROP_BG_RUNS
       SET status = 'RUNNING',
           start_time = NVL(start_time, SYSTIMESTAMP)
     WHERE run_id = p_run_id;
    COMMIT;

    -- Safe even if already captured; capture_before only updates rows where before_time IS NULL.
    capture_before(p_run_id);

    FOR t IN (
        SELECT source_owner, table_name
          FROM PDROP_BG_RUN_TARGETS
         WHERE run_id = p_run_id
         ORDER BY source_owner, table_name
    ) LOOP
        l_owner := t.source_owner;
        l_table := t.table_name;
        l_table_candidates := 0;
        l_table_dropped := 0;
        l_table_errors := 0;

        UPDATE PDROP_BG_RUN_TARGETS
           SET status = 'RUNNING'
         WHERE run_id = p_run_id
           AND source_owner = l_owner
           AND table_name = l_table;
        COMMIT;

        ------------------------------------------------------------------
        -- Soft metadata warning only.
        -- Do not hard-skip here; HIGH_VALUE parse/date/count guards still protect drops.
        ------------------------------------------------------------------
        IF NOT table_has_single_date_key(l_owner, l_table) THEN
            l_total_warnings := l_total_warnings + 1;
            log_table_event(
                'WARN-KEY-CHECK',
                'Partition key is not confirmed as single-column DATE/TIMESTAMP. Proceeding with HIGH_VALUE parse guard only.'
            );
        END IF;

        IF is_yes(l_run.gather_stats) THEN
            BEGIN
                DBMS_STATS.GATHER_TABLE_STATS(
                    ownname          => l_owner,
                    tabname          => l_table,
                    estimate_percent => DBMS_STATS.AUTO_SAMPLE_SIZE,
                    cascade          => FALSE,
                    granularity      => 'ALL',
                    degree           => CASE
                                            WHEN NVL(l_run.parallel_degree, 0) > 0
                                            THEN l_run.parallel_degree
                                            ELSE DBMS_STATS.AUTO_DEGREE
                                        END,
                    no_invalidate    => DBMS_STATS.AUTO_INVALIDATE
                );
            EXCEPTION
                WHEN OTHERS THEN
                    -- A stats failure is recorded but does not stop the
                    -- table; the later guards work on whatever stats exist.
                    l_errmsg := SQLERRM;
                    l_table_errors := l_table_errors + 1;
                    l_total_errors := l_total_errors + 1;
                    log_table_event('STATS-ERROR', l_errmsg);
            END;
        END IF;

        SELECT MAX(partition_position)
          INTO l_last_pos
          FROM all_tab_partitions
         WHERE table_owner = l_owner
           AND table_name = l_table;

        IF l_last_pos IS NULL THEN
            l_total_warnings := l_total_warnings + 1;
            log_table_event(
                'SKIP-NO-PARTITIONS',
                'No partitions found in ALL_TAB_PARTITIONS for this target table.'
            );
            finish_target('SKIPPED', 'Skipped: no partitions found.');
            CONTINUE;
        END IF;

        FOR p IN (
            SELECT
                partition_name,
                partition_position,
                num_rows,
                last_analyzed,
                high_value
            FROM all_tab_partitions
            WHERE table_owner = l_owner
              AND table_name = l_table
            ORDER BY partition_position
        ) LOOP
            l_total_partitions := l_total_partitions + 1;

            -- Publish the partition context for the logging helpers.
            l_part_name     := p.partition_name;
            l_part_pos      := p.partition_position;
            l_stats_rows    := p.num_rows;
            l_last_analyzed := p.last_analyzed;
            l_hv_long       := p.high_value;
            l_hv_text       := SUBSTR(l_hv_long, 1, 4000);
            l_hv_date       := NULL;
            l_live_count    := NULL;
            l_sql           := NULL;

            -- Guard 1: a 4000-character text may have been cut off.
            IF LENGTH(l_hv_text) >= 4000 THEN
                l_total_warnings := l_total_warnings + 1;
                log_partition_event(
                    'SKIP-HV-TRUNCATED',
                    NULL,
                    'HIGH_VALUE may be truncated at 4000 characters; manual review required.'
                );
                CONTINUE;
            END IF;

            -- Guard 2: never touch the last partition.
            IF l_part_pos = l_last_pos THEN
                log_skip('SKIP-LAST-PARTITION');
                CONTINUE;
            END IF;

            -- Guard 3: never touch a MAXVALUE partition.
            IF UPPER(l_hv_text) LIKE '%MAXVALUE%' THEN
                log_skip('SKIP-MAXVALUE');
                CONTINUE;
            END IF;

            -- Guard 4: HIGH_VALUE must parse to a date.
            l_hv_date := get_hv_date(l_hv_text);
            IF l_hv_date IS NULL THEN
                l_total_warnings := l_total_warnings + 1;
                log_partition_event('SKIP-HV-PARSE');
                CONTINUE;
            END IF;

            -- Guard 5: the partition's upper bound must not exceed the cutoff.
            IF l_hv_date > l_run.cutoff_date THEN
                log_skip('SKIP-FUTURE');
                CONTINUE;
            END IF;

            ------------------------------------------------------------------
            -- Always log no-stats partition skips, even when log_skips = N.
            ------------------------------------------------------------------
            IF l_last_analyzed IS NULL THEN
                l_total_warnings := l_total_warnings + 1;
                log_partition_event(
                    'SKIP-NOSTATS',
                    NULL,
                    'Partition has no statistics; skipped for safety.'
                );
                CONTINUE;
            END IF;

            -- Guard 7: statistics must report exactly 0 rows (NULL counts as "has rows").
            IF NVL(l_stats_rows, -1) <> 0 THEN
                log_skip('SKIP-STATS-HASROWS');
                CONTINUE;
            END IF;

            -- Guard 8: confirm emptiness against live data.
            BEGIN
                l_sql :=
                    'SELECT COUNT(*) FROM ' ||
                    qname(l_owner) || '.' || qname(l_table) ||
                    ' PARTITION (' || qname(l_part_name) || ')';
                EXECUTE IMMEDIATE l_sql INTO l_live_count;
            EXCEPTION
                WHEN OTHERS THEN
                    l_errmsg := SQLERRM;
                    l_table_errors := l_table_errors + 1;
                    l_total_errors := l_total_errors + 1;
                    log_partition_event('COUNT-ERROR', l_sql, l_errmsg);
                    CONTINUE;
            END;

            IF NVL(l_live_count, -1) <> 0 THEN
                log_skip('SKIP-LIVE-HASROWS', l_sql);
                CONTINUE;
            END IF;

            -- Every guard passed: the partition is a drop candidate.
            l_table_candidates := l_table_candidates + 1;
            l_total_candidates := l_total_candidates + 1;

            l_sql :=
                'ALTER TABLE ' ||
                qname(l_owner) || '.' || qname(l_table) ||
                ' DROP PARTITION ' || qname(l_part_name);
            IF is_yes(l_run.update_global_indexes) THEN
                l_sql := l_sql || ' UPDATE GLOBAL INDEXES';
            END IF;

            -- Destructive branch requires an explicit EXECUTE; anything
            -- else only records what would have been done.
            IF l_run.run_mode = 'EXECUTE' THEN
                BEGIN
                    EXECUTE IMMEDIATE l_sql;
                    l_table_dropped := l_table_dropped + 1;
                    l_total_dropped := l_total_dropped + 1;
                    log_partition_event('DROPPED', l_sql);
                EXCEPTION
                    WHEN OTHERS THEN
                        l_errmsg := SQLERRM;
                        l_table_errors := l_table_errors + 1;
                        l_total_errors := l_total_errors + 1;
                        log_partition_event('DROP-ERROR', l_sql, l_errmsg);
                END;
            ELSE
                log_partition_event('DRY-DROP', l_sql);
            END IF;

            -- Publish progress after every candidate.
            UPDATE PDROP_BG_RUN_TARGETS
               SET candidate_count = l_table_candidates,
                   dropped_count   = l_table_dropped,
                   error_count     = l_table_errors
             WHERE run_id = p_run_id
               AND source_owner = l_owner
               AND table_name = l_table;
            save_run_totals;
            COMMIT;
        END LOOP;

        IF l_run.run_mode = 'EXECUTE'
           AND l_table_dropped > 0
           AND is_yes(l_run.rebuild_unusable_indexes)
        THEN
            rebuild_indexes_for_table(
                p_run_id,
                l_owner,
                l_table,
                l_run.parallel_degree,
                l_index_rebuild_count,
                l_table_errors,
                l_total_errors
            );
        END IF;

        IF l_table_errors > 0 THEN
            finish_target('COMPLETED_WITH_ERRORS');
        ELSE
            finish_target('COMPLETED');
        END IF;
    END LOOP;

    UPDATE PDROP_BG_RUNS
       SET status = CASE
                        WHEN l_total_errors > 0
                        THEN 'COMPLETED_WITH_ERRORS'
                        ELSE 'COMPLETED'
                    END,
           end_time = SYSTIMESTAMP,
           total_partitions_checked = l_total_partitions,
           candidate_count = l_total_candidates,
           dropped_count = l_total_dropped,
           error_count = l_total_errors,
           warning_count = l_total_warnings,
           notes = CASE
                       WHEN l_index_rebuild_count > 0
                       THEN SUBSTR(
                                NVL(notes, '') ||
                                ' Index rebuild count=' || l_index_rebuild_count || '.',
                                1,
                                4000
                            )
                       ELSE notes
                   END
     WHERE run_id = p_run_id;
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        -- Record the failure on the run, then let the caller see it.
        l_errmsg := SQLERRM;
        UPDATE PDROP_BG_RUNS
           SET status = 'FAILED',
               end_time = SYSTIMESTAMP,
               notes = SUBSTR(l_errmsg, 1, 4000),
               error_count = NVL(error_count, 0) + 1
         WHERE run_id = p_run_id;
        COMMIT;
        RAISE;
END run_cleanup;
--------------------------------------------------------------------------
-- Create and enable scheduler job
-- Includes job-exists retry safety and running-job protection.
--
-- Steps:
--   1. refuse to start when the run's status is RUNNING or a scheduler
--      job with the run's job name is currently running
--   2. capture the "before" baseline (no-op for already captured targets)
--   3. drop any leftover job of the same name, then create the job
--      disabled; its action calls PDROP_BG_PKG.RUN_CLEANUP for this run
--   4. set the run status to SCHEDULER_CREATED, then SCHEDULER_ENABLED,
--      and only then enable the job
--      NOTE(review): the status is presumably set before ENABLE so that the
--      job's own RUNNING status is not overwritten afterwards - confirm.
--
-- Parameters:
--   p_run_id  run to start; must exist in PDROP_BG_RUNS
--
-- Raises:
--   -20003  the run is already RUNNING
--   -20004  the scheduler job is already running
--   Errors from CREATE_JOB propagate unchanged; a failing ENABLE sets the
--   run to SCHEDULER_ENABLE_FAILED (message in NOTES) and is re-raised.
--------------------------------------------------------------------------
PROCEDURE start_scheduler_job (
    p_run_id IN VARCHAR2
) IS
    l_job_name      VARCHAR2(128);
    l_status        VARCHAR2(50);
    l_running_count NUMBER;
    l_errmsg        VARCHAR2(4000);
BEGIN
    SELECT scheduler_job_name, status
      INTO l_job_name, l_status
      FROM PDROP_BG_RUNS
     WHERE run_id = p_run_id;

    IF l_status = 'RUNNING' THEN
        RAISE_APPLICATION_ERROR(-20003, 'Run is already RUNNING: ' || p_run_id);
    END IF;

    SELECT COUNT(*)
      INTO l_running_count
      FROM user_scheduler_running_jobs
     WHERE job_name = UPPER(l_job_name);

    IF l_running_count > 0 THEN
        RAISE_APPLICATION_ERROR(-20004, 'Scheduler job is already running: ' || l_job_name);
    END IF;

    capture_before(p_run_id);

    -- Retry safety: remove a leftover job of the same name. Failure here
    -- (typically "job does not exist") is deliberately ignored.
    BEGIN
        DBMS_SCHEDULER.DROP_JOB(
            job_name => l_job_name,
            force    => TRUE
        );
    EXCEPTION
        WHEN OTHERS THEN
            NULL;
    END;

    DBMS_SCHEDULER.CREATE_JOB(
        job_name   => l_job_name,
        job_type   => 'PLSQL_BLOCK',
        -- The run id is embedded as a string literal in the job's PL/SQL
        -- block; embedded single quotes are doubled so the literal cannot
        -- be broken out of.
        job_action => 'BEGIN PDROP_BG_PKG.RUN_CLEANUP(''' ||
                      REPLACE(p_run_id, '''', '''''') ||
                      '''); END;',
        start_date => SYSTIMESTAMP,
        enabled    => FALSE,
        auto_drop  => FALSE,
        comments   => 'Background partition cleanup for run_id ' || p_run_id
    );

    UPDATE PDROP_BG_RUNS
       SET status = 'SCHEDULER_CREATED'
     WHERE run_id = p_run_id;
    COMMIT;

    UPDATE PDROP_BG_RUNS
       SET status = 'SCHEDULER_ENABLED'
     WHERE run_id = p_run_id
       AND status NOT IN ('RUNNING', 'COMPLETED', 'COMPLETED_WITH_ERRORS', 'FAILED');
    COMMIT;

    BEGIN
        DBMS_SCHEDULER.ENABLE(l_job_name);
    EXCEPTION
        WHEN OTHERS THEN
            l_errmsg := SQLERRM;
            UPDATE PDROP_BG_RUNS
               SET status = 'SCHEDULER_ENABLE_FAILED',
                   notes = SUBSTR(l_errmsg, 1, 4000)
             WHERE run_id = p_run_id;
            COMMIT;
            RAISE;
    END;
END start_scheduler_job;
--------------------------------------------------------------------------
-- Stop scheduler job
-- Looks up the scheduler job of the run and asks the scheduler to stop it
-- (force => FALSE).
--   success -> run status becomes STOP_REQUESTED
--   failure -> run status becomes STOP_FAILED, the error text is stored
--              in NOTES, and the error is re-raised
-- An unknown p_run_id raises NO_DATA_FOUND before anything is changed.
--------------------------------------------------------------------------
PROCEDURE stop_scheduler_job (
    p_run_id IN VARCHAR2
) IS
    l_job     VARCHAR2(128);
    l_failure VARCHAR2(4000);
BEGIN
    SELECT r.scheduler_job_name
      INTO l_job
      FROM PDROP_BG_RUNS r
     WHERE r.run_id = p_run_id;

    <<request_stop>>
    BEGIN
        DBMS_SCHEDULER.STOP_JOB(
            job_name => l_job,
            force    => FALSE
        );

        UPDATE PDROP_BG_RUNS
           SET status = 'STOP_REQUESTED'
         WHERE run_id = p_run_id;
        COMMIT;
    EXCEPTION
        WHEN OTHERS THEN
            l_failure := SQLERRM;

            UPDATE PDROP_BG_RUNS
               SET status = 'STOP_FAILED',
                   notes  = SUBSTR(l_failure, 1, 4000)
             WHERE run_id = p_run_id;
            COMMIT;

            RAISE;
    END request_stop;
END stop_scheduler_job;
END PDROP_BG_PKG;
/