// Make the select and repartition from the new tables
// Builds temp view "wd_compare_new_1": one (id, type, language, text) row per
// item term in the new normalised wbt_* term store, walking
// item_terms -> term_in_lang -> (type, text_in_lang -> text).
// The wiki_db/snapshot partition filters are deliberately repeated inside each
// LEFT JOIN's ON clause so partition pruning applies to the joined table
// without silently converting the outer join into an inner join.
spark.sql("""
SELECT
wbit_item_id as id,
wby_name as type,
wbxl_language as language,
wbx_text as text
FROM wmf_raw.wikibase_wbt_item_terms
LEFT JOIN wmf_raw.wikibase_wbt_term_in_lang ON wbit_term_in_lang_id = wbtl_id
AND wmf_raw.wikibase_wbt_term_in_lang.wiki_db = 'wikidatawiki'
AND wmf_raw.wikibase_wbt_term_in_lang.snapshot = '2019-11'
LEFT JOIN wmf_raw.wikibase_wbt_type ON wbtl_type_id = wby_id
AND wmf_raw.wikibase_wbt_type.wiki_db = 'wikidatawiki'
AND wmf_raw.wikibase_wbt_type.snapshot = '2019-11'
LEFT JOIN wmf_raw.wikibase_wbt_text_in_lang ON wbtl_text_in_lang_id = wbxl_id
AND wmf_raw.wikibase_wbt_text_in_lang.wiki_db = 'wikidatawiki'
AND wmf_raw.wikibase_wbt_text_in_lang.snapshot = '2019-11'
LEFT JOIN wmf_raw.wikibase_wbt_text ON wbxl_text_id = wbx_id
AND wmf_raw.wikibase_wbt_text.wiki_db = 'wikidatawiki'
AND wmf_raw.wikibase_wbt_text.snapshot = '2019-11'
WHERE wmf_raw.wikibase_wbt_item_terms.wiki_db = 'wikidatawiki'
AND wmf_raw.wikibase_wbt_item_terms.snapshot = '2019-11'
""").repartition(64).createOrReplaceTempView("wd_compare_new_1")
// cache() + count() forces materialisation of the cached view;
// show() is a quick manual sanity check of the first rows.
spark.table("wd_compare_new_1").cache()
spark.table("wd_compare_new_1").count()
spark.table("wd_compare_new_1").show(10, false)
// Make the select and repartition from the old tables
// Builds temp view "wd_compare_old_1": the same (id, type, language, text)
// shape sourced from the legacy denormalised wb_terms table, restricted to
// item entities. SUBSTR(term_full_entity_id, 2) strips the leading entity
// prefix (e.g. 'Q42' -> '42') so the CAST to int yields an id comparable
// with the numeric wbit_item_id used by the new store.
spark.sql("""
SELECT
CAST(SUBSTR(term_full_entity_id, 2) AS int) as id,
term_type as type,
term_language as language,
term_text as text
FROM wmf_raw.wikibase_wb_terms
WHERE wiki_db = 'wikidatawiki'
AND snapshot = '2019-11'
AND term_entity_type = 'item'
""").repartition(64).createOrReplaceTempView("wd_compare_old_1")
// Materialise the cache and sanity-check the first rows.
spark.table("wd_compare_old_1").cache()
spark.table("wd_compare_old_1").count()
spark.table("wd_compare_old_1").show(10, false)
// Join the 2 tables: every old-store term, with the matching new-store text
// (or NULL when the term tuple is absent from the new store).
// FIX: the original joined on CONCAT(old.id, old.type, old.language, old.text)
// = CONCAT(new....), which is collision-prone — CONCAT has no separator, so
// e.g. (id=1, type='2x') concatenates identically to (id=12, type='x') — and
// it relied on an implicit int-to-string cast of id. Explicit per-column
// equality keeps the intended semantics (a row with any NULL key column never
// matches, exactly as a NULL-propagating CONCAT never matched) while
// eliminating false-positive matches.
spark.sql("""
SELECT
old.id as id,
old.type as type,
old.language as language,
old.text as oldText,
new.text as newText
FROM wd_compare_old_1 as old
LEFT JOIN wd_compare_new_1 as new
ON old.id = new.id
AND old.type = new.type
AND old.language = new.language
AND old.text = new.text
""").createOrReplaceTempView("wb_compare_both_1")
// Materialise the cache and sanity-check the first rows.
spark.table("wb_compare_both_1").cache()
spark.table("wb_compare_both_1").count()
spark.table("wb_compare_both_1").show(10, false)
// Keep only the old-store terms that were not matched in the new store.
// NOTE(review): since the wb_compare_both_1 join is keyed on the term text as
// well, a matched row should always have oldText equal to newText, so in
// practice only the "newText IS NULL" branch (term missing from the new
// store) can fire — the inequality check looks like a belt-and-braces guard;
// confirm before removing it.
spark.sql("""
SELECT
*
FROM wb_compare_both_1
WHERE
(
oldText != newText
OR newText IS NULL
)
""").createOrReplaceTempView("wb_compare_both_2")
// Materialise the cache and sanity-check the first rows.
spark.table("wb_compare_both_2").cache()
spark.table("wb_compare_both_2").count()
spark.table("wb_compare_both_2").show(10, false)
// Final result: the distinct item ids that have at least one term missing
// from (or differing in) the new store, sorted ascending.
spark.sql("""
SELECT
DISTINCT id
FROM wb_compare_both_2
ORDER BY id ASC
""").createOrReplaceTempView("wb_compare_both_3")
// Materialise the cache and sanity-check the first rows.
spark.table("wb_compare_both_3").cache()
spark.table("wb_compare_both_3").count()
spark.table("wb_compare_both_3").show(10, false)
// Export the id list as a single CSV part file under /tmp on HDFS.
// NOTE(review): repartition(1) shuffles into one partition and does not
// guarantee that the ORDER BY from wb_compare_both_3 survives into the
// written file — confirm whether the published dataset must be sorted
// (if so, re-sort after the repartition, or sort the file afterwards).
spark.table("wb_compare_both_3").repartition(1).write.csv("/tmp/addshore-tmpoutput.csv")
// Manual follow-up steps to publish the output (run on the cluster edge node):
// hadoop fs -ls hdfs://analytics-hadoop/tmp/addshore-*
// hadoop fs -text hdfs://analytics-hadoop/tmp/addshore-tmpoutput.csv/part-00000-5a9faa9a-6e44-4579-979a-e6a413e4112b-c000.csv.snappy > ~/tmpoutput
// hadoop fs -rm -r hdfs://analytics-hadoop/tmp/addshore-*
// cp ./tmpoutput /srv/published-datasets/one-off/wikidata/addshore/T239470-20191209-1225-item-terms-migration-holes
// published-sync
// rm ./tmpoutput