-- Session context: pick the warehouse and database, then switch role.
use warehouse TEST;
use database TEST;
use role SYSADMIN;

-- Fresh schema for the streams demo. In Snowflake, CREATE SCHEMA also makes
-- the new schema the session's current schema, so the unqualified table and
-- stream names used below are created inside TEST_STREAMS.
create or replace schema TEST_STREAMS;
-- STAGING receives the changes; PRODUCTION is the target that we keep in sync.
create or replace table STAGING (
    ID            int,
    CHANGE_NUMBER string,
    HOW_CHANGED   string,
    FINAL_VALUE   string
);

-- PRODUCTION takes STAGING's column definitions (LIKE copies no rows).
create or replace table PRODUCTION like STAGING;

-- The stream records row-level changes (CDC) made to STAGING from here on.
create or replace stream STAGING_STREAM on table STAGING;
-- Both objects start out empty. The stream exposes STAGING's four columns
-- plus change-metadata columns such as METADATA$ACTION and METADATA$ISUPDATE.
select * from STAGING;
select * from STAGING_STREAM;
-- 1st change: insert three rows into STAGING. Each row is a single-line
-- literal; the values must not contain embedded line breaks.
insert into STAGING (ID, CHANGE_NUMBER, HOW_CHANGED, FINAL_VALUE)
values
    (1, '1st change to STAGING table', 'Inserted', 'Original Row 1 Value'),
    (2, '1st change to STAGING table', 'Inserted', 'Original Row 2 Value'),
    (3, '1st change to STAGING table', 'Inserted', 'Original Row 3 Value');
-- Let's look at the STAGING table now to see our three rows:
select * from STAGING;

-- The stream now lists the three inserted rows as pending changes:
select * from STAGING_STREAM;

-- A plain SELECT does not consume the stream, so querying it again returns
-- the same pending rows:
select * from STAGING_STREAM;
-- 2nd change: empty STAGING completely (the missing WHERE clause is intentional).
delete from STAGING;

-- The stream reports net changes, so rows that were inserted and then deleted
-- since it was last consumed cancel out here.
select * from STAGING_STREAM;
-- 3rd change: put three rows back into the now-empty STAGING table.
insert into STAGING (ID, CHANGE_NUMBER, HOW_CHANGED, FINAL_VALUE)
values
    (1, '3rd change to STAGING table', 'Inserted after deleted', 'Original Row 1 Value'),
    (2, '3rd change to STAGING table', 'Inserted after deleted', 'Original Row 2 Value'),
    (3, '3rd change to STAGING table', 'Inserted after deleted', 'Original Row 3 Value');

-- Inspect the pending changes after the re-insert:
select * from STAGING_STREAM;
-- Consume the stream: reading it inside a DML statement applies the pending
-- changes and advances the stream's offset. Copying every stream row as-is is
-- only valid because, at this point, the pending changes are all inserts; in
-- general, filter on METADATA$ACTION. The target columns are listed
-- explicitly so the mapping does not depend on column order.
insert into PRODUCTION (ID, CHANGE_NUMBER, HOW_CHANGED, FINAL_VALUE)
select ID, CHANGE_NUMBER, HOW_CHANGED, FINAL_VALUE
from STAGING_STREAM;

select * from PRODUCTION;

-- The INSERT above consumed the stream, so it no longer shows those rows:
select * from STAGING_STREAM;
-- 4th change: update row 1 in STAGING (single-line literals, no embedded newlines).
update STAGING
set FINAL_VALUE = 'Updated Row 1 Value',
    HOW_CHANGED = 'Updated in change 4'
where ID = 1;

-- Examine the change in the staging table:
select * from STAGING;

-- Since the last time you consumed the stream, you have one UPDATE to process.
-- Let's see what that looks like in the stream:
select * from STAGING_STREAM;
-- Apply only the DELETE half of the pending UPDATE to PRODUCTION.
-- METADATA$ACTION must be a single identifier and the literal must be exactly
-- 'DELETE', or the predicate fails to parse or never matches.
delete from PRODUCTION
where ID in (
    select ID
    from STAGING_STREAM
    where METADATA$ACTION = 'DELETE'
);

-- We've now deleted row ID 1; let's check it and then do the INSERT:
select * from PRODUCTION;

-- But wait... What happened to the stream? Did it clear out only the DELETE
-- metadata action because that's the only one you used in the DML?
-- (No: a DML statement that reads a stream advances its offset past all
-- pending changes, including the INSERT half that was filtered out above.)
select * from STAGING_STREAM;
-- The partial DELETE above consumed the whole stream, so the INSERT half of
-- the update can no longer be read from it. Resynchronize by replacing all of
-- PRODUCTION with a full copy of STAGING (the unfiltered DELETE is intentional).
delete from PRODUCTION;

-- Explicit column lists keep the copy correct even if column order ever differs.
insert into PRODUCTION (ID, CHANGE_NUMBER, HOW_CHANGED, FINAL_VALUE)
select ID, CHANGE_NUMBER, HOW_CHANGED, FINAL_VALUE
from STAGING;

select * from STAGING;
-- 5th change: insert a new row (single-line literals, no embedded newlines).
insert into STAGING (ID, CHANGE_NUMBER, HOW_CHANGED, FINAL_VALUE)
values (4, '5th change to STAGING table', 'Inserted in change 5', 'Original Row 5 value');

-- 6th change: update row 2.
update STAGING
set CHANGE_NUMBER = '6th change to STAGING table',
    HOW_CHANGED = 'Updated in change 6'
where ID = 2;

-- Then delete row 3.
delete from STAGING where ID = 3;

-- Now your STAGING and PRODUCTION tables are out of sync. The stream captures
-- every change (change data capture, or CDC) that must be processed to bring
-- the tables back in sync:
select * from STAGING_STREAM;
-- Note that the stream shows *FOUR* rows, even though we made only one change
-- with each verb (INSERT, UPDATE, and DELETE). Recall that a Snowflake stream
-- records an UPDATE as a DELETE followed by an INSERT, and flags both of those
-- rows in the METADATA$ISUPDATE metadata column.
-- What if all you want to do is keep PROD in sync with STAGING, but control
-- when those changes happen and have the option to examine them before
-- applying them? This next DML statement serves as a template to make this
-- use case super easy and efficient:
-- Let's look at the PRODUCTION table first:
select * from PRODUCTION;
-- Apply every pending change in the stream to PRODUCTION in one statement.
-- The source subquery drops the DELETE half of each UPDATE pair
-- (METADATA$ACTION = 'DELETE' with METADATA$ISUPDATE = true), so an updated
-- row contributes a single source row. The MERGE then handles:
--   plain deletes  (ACTION = 'DELETE', ISUPDATE = false) -> delete from PRODUCTION
--   update inserts (ACTION = 'INSERT', ISUPDATE = true)  -> update in place
--   plain inserts  (ACTION = 'INSERT', ISUPDATE = false) -> insert when absent
merge into PRODUCTION P
using (
    select *
    from STAGING_STREAM
    where METADATA$ACTION <> 'DELETE'
       or METADATA$ISUPDATE = false
) S
    on P.ID = S.ID
when matched and S.METADATA$ISUPDATE = false and S.METADATA$ACTION = 'DELETE' then
    delete
when matched and S.METADATA$ISUPDATE = true then
    update set
        P.ID            = S.ID,
        P.CHANGE_NUMBER = S.CHANGE_NUMBER,
        P.HOW_CHANGED   = S.HOW_CHANGED,
        P.FINAL_VALUE   = S.FINAL_VALUE
-- Only insert rows that the stream reports as inserted. Without this guard, a
-- plain DELETE whose ID is missing from PRODUCTION would be re-inserted.
when not matched and S.METADATA$ACTION = 'INSERT' then
    insert (ID, CHANGE_NUMBER, HOW_CHANGED, FINAL_VALUE)
    values (S.ID, S.CHANGE_NUMBER, S.HOW_CHANGED, S.FINAL_VALUE);
-- PRODUCTION after the MERGE, followed by the stream that the MERGE consumed:
select * from PRODUCTION;
select * from STAGING_STREAM;
-- Reconciliation check: a full outer join matched on all four columns. Any row
-- with no identical counterpart in the other table comes back with the other
-- side all NULL, so an empty result means every row has an identical match.
-- Because the join uses '=', a row holding a NULL in any column never matches
-- and is always listed.
select
    P.ID            as P_ID,
    P.CHANGE_NUMBER as P_CHANGE_NUMBER,
    P.HOW_CHANGED   as P_HOW_CHANGED,
    P.FINAL_VALUE   as P_FINAL_VALUE,
    S.ID            as S_ID,
    S.CHANGE_NUMBER as S_CHANGE_NUMBER,
    S.HOW_CHANGED   as S_HOW_CHANGED,
    S.FINAL_VALUE   as S_FINAL_VALUE
from PRODUCTION P
full outer join STAGING S
    on  P.ID            = S.ID
    and P.CHANGE_NUMBER = S.CHANGE_NUMBER
    and P.HOW_CHANGED   = S.HOW_CHANGED
    and P.FINAL_VALUE   = S.FINAL_VALUE
-- Keep a row unless every column on both sides is populated.
where not (
        P.ID            is not null and S.ID            is not null
    and P.CHANGE_NUMBER is not null and S.CHANGE_NUMBER is not null
    and P.HOW_CHANGED   is not null and S.HOW_CHANGED   is not null
    and P.FINAL_VALUE   is not null and S.FINAL_VALUE   is not null
);