In [1]:
// Build the "new" term-store view: re-flatten the normalised wbt_* tables
// into (id, type, language, text) rows for the wikidatawiki 2019-11 snapshot.
// LEFT JOINs keep every wbt_item_terms row even if a lookup row is missing.
spark.sql("""
SELECT
  wbit_item_id  AS id,
  wby_name      AS type,
  wbxl_language AS language,
  wbx_text      AS text
FROM wmf_raw.wikibase_wbt_item_terms AS it
LEFT JOIN wmf_raw.wikibase_wbt_term_in_lang AS til
  ON wbit_term_in_lang_id = wbtl_id
  AND til.wiki_db = 'wikidatawiki' AND til.snapshot = '2019-11'
LEFT JOIN wmf_raw.wikibase_wbt_type AS ty
  ON wbtl_type_id = wby_id
  AND ty.wiki_db = 'wikidatawiki' AND ty.snapshot = '2019-11'
LEFT JOIN wmf_raw.wikibase_wbt_text_in_lang AS txl
  ON wbtl_text_in_lang_id = wbxl_id
  AND txl.wiki_db = 'wikidatawiki' AND txl.snapshot = '2019-11'
LEFT JOIN wmf_raw.wikibase_wbt_text AS tx
  ON wbxl_text_id = wbx_id
  AND tx.wiki_db = 'wikidatawiki' AND tx.snapshot = '2019-11'
WHERE it.wiki_db = 'wikidatawiki'
  AND it.snapshot = '2019-11'
""").repartition(64).createOrReplaceTempView("wd_compare_new_1")
spark.table("wd_compare_new_1").cache()
Waiting for a Spark session to start...
Out[1]:
[id: bigint, type: string ... 2 more fields]
In [2]:
spark.table("wd_compare_new_1").count()
Out[2]:
2129274689
In [3]:
spark.table("wd_compare_new_1").show(10, false)
+--------+-----------+--------+----------------------------------------------------------+
|id      |type       |language|text                                                      |
+--------+-----------+--------+----------------------------------------------------------+
|3161990 |label      |sv      |Janine Chanteur                                           |
|28216454|description|cs      |vědecký článek publikovaný v roce 2002                    |
|1832866 |label      |da      |Henk Scheermeijer                                         |
|4867940 |label      |nds-nl  |Bass Mansion                                              |
|35419064|label      |ceb     |Río Puntijao                                              |
|41690647|label      |en      |Photonic Shape Memory Polymer with Stable Multiple Colors.|
|37173334|label      |st      |Harutunyan                                                |
|2306542 |description|nl      |spoorwegstation in Polen                                  |
|46758779|label      |en      |Robert A. Batey                                           |
|14319565|description|gsw     |dorf in Kina                                              |
+--------+-----------+--------+----------------------------------------------------------+
only showing top 10 rows

In [4]:
// Build the "old" term-store view from the legacy denormalised wb_terms
// table, restricted to item entities of the same snapshot. The numeric id
// is recovered by stripping the leading entity prefix ('Q') from
// term_full_entity_id and casting the remainder to int.
spark.sql("""
SELECT
  CAST(SUBSTRING(term_full_entity_id, 2) AS int) AS id,
  term_type     AS type,
  term_language AS language,
  term_text     AS text
FROM wmf_raw.wikibase_wb_terms
WHERE term_entity_type = 'item'
  AND wiki_db = 'wikidatawiki'
  AND snapshot = '2019-11'
""").repartition(64).createOrReplaceTempView("wd_compare_old_1")
spark.table("wd_compare_old_1").cache()
Out[4]:
[id: int, type: string ... 2 more fields]
In [5]:
spark.table("wd_compare_old_1").count()
Out[5]:
2471118234
In [6]:
spark.table("wd_compare_old_1").show(10, false)
+--------+-----------+--------+----------------------------------------+
|id      |type       |language|text                                    |
+--------+-----------+--------+----------------------------------------+
|39402838|description|zh-hans |2017年论文                              |
|39051820|description|vi      |bài báo khoa học                        |
|19662223|description|ar      |درّاجة طريق برازيلية                    |
|28067460|description|ca      |article científic                       |
|36787507|description|en-ca   |scientific article published on May 2007|
|35758341|description|el      |επιστημονικό άρθρο                      |
|35806280|description|es      |artículo científico publicado en 2015   |
|28251499|description|lt      |mokslinis straipsnis                    |
|36800557|description|nb      |vitenskapelig artikkel                  |
|39018926|description|sr-ec   |научни чланак                           |
+--------+-----------+--------+----------------------------------------+
only showing top 10 rows

In [7]:
// Join the two term stores, keeping every old row and attaching the matching
// new-store text (null where the new store has no counterpart).
//
// FIX: the original joined on CONCAT(id, type, language, text) built without
// any separator. Concatenated keys are not injective — e.g. (12, '3label')
// and (123, 'label') produce the same string — so distinct rows can collide,
// yielding false matches and join fan-out (the joined count exceeded the
// old-table count by 9 800 rows). Joining on the four columns directly is
// both correct and lets Spark use the columns' native types.
// NULL semantics are unchanged: CONCAT returns NULL if any argument is NULL,
// so NULL-text rows never matched before, and column equality with NULL
// never matches either.
spark.sql("""
SELECT
  old.id       AS id,
  old.type     AS type,
  old.language AS language,
  old.text     AS oldText,
  new.text     AS newText
FROM wd_compare_old_1 AS old
LEFT JOIN wd_compare_new_1 AS new
  ON  old.id       = new.id
  AND old.type     = new.type
  AND old.language = new.language
  AND old.text     = new.text
""").createOrReplaceTempView("wb_compare_both_1")
spark.table("wb_compare_both_1").cache()
Out[7]:
[id: int, type: string ... 3 more fields]
In [8]:
spark.table("wb_compare_both_1").count()
Out[8]:
2471128034
In [9]:
spark.table("wb_compare_both_1").show(10, false)
+--------+-----------+--------+-----------------------------+---------------------+
|id      |type       |language|oldText                      |newText              |
+--------+-----------+--------+-----------------------------+---------------------+
|10000002|description|hu      |Wikimédia-kategória          |null                 |
|10000002|description|hy      |Վիքիմեդիայի նախագծի կատեգորիա|null                 |
|10000007|description|arz     |تصنيف بتاع ويكيميديا         |null                 |
|10000007|description|bar     |Wikimedia-Kategorie          |null                 |
|10000013|description|fy      |Wikimedia-kategory           |null                 |
|10000015|description|fo      |Wikimedia-bólkur             |null                 |
|10000019|description|th      |หน้าหมวดหมู่วิกิมีเดีย       |null                 |
|1000002 |label      |frp     |Claus Hammel                 |Claus Hammel         |
|10000032|description|fa      |ردهٔ ویکی‌پدیا               |null                 |
|1000003 |description|nl      |gemeente in Frankrijk        |gemeente in Frankrijk|
+--------+-----------+--------+-----------------------------+---------------------+
only showing top 10 rows

In [1]:
// Keep only the discrepancies: rows with no counterpart in the new store
// (newText IS NULL — LEFT JOIN miss) plus rows whose text differs between
// the two stores. The IS NULL branch is required because `!=` evaluates to
// NULL (not true) when newText is NULL.
spark.sql("""
SELECT *
FROM wb_compare_both_1
WHERE newText IS NULL
   OR oldText != newText
""").createOrReplaceTempView("wb_compare_both_2")
spark.table("wb_compare_both_2").cache()
Out[1]:
[id: int, type: string ... 3 more fields]
In [2]:
spark.table("wb_compare_both_2").count()
Out[2]:
1529258101
In [3]:
spark.table("wb_compare_both_2").show(10, false)
+--------+-----------+--------+-----------------------------+-------+
|id      |type       |language|oldText                      |newText|
+--------+-----------+--------+-----------------------------+-------+
|10000002|description|hu      |Wikimédia-kategória          |null   |
|10000002|description|hy      |Վիքիմեդիայի նախագծի կատեգորիա|null   |
|10000007|description|arz     |تصنيف بتاع ويكيميديا         |null   |
|10000007|description|bar     |Wikimedia-Kategorie          |null   |
|10000013|description|fy      |Wikimedia-kategory           |null   |
|10000015|description|fo      |Wikimedia-bólkur             |null   |
|10000019|description|th      |หน้าหมวดหมู่วิกิมีเดีย       |null   |
|10000032|description|fa      |ردهٔ ویکی‌پدیا               |null   |
|10000040|description|ace     |kawan Wikimèdia              |null   |
|10000040|description|mn      |категорияд Ангилал           |null   |
+--------+-----------+--------+-----------------------------+-------+
only showing top 10 rows

In [10]:
// Deduplicate down to the item ids that have at least one missing or
// differing term (GROUP BY id is equivalent to SELECT DISTINCT id; ORDER BY
// defaults to ascending).
spark.sql("""
SELECT id
FROM wb_compare_both_2
GROUP BY id
ORDER BY id
""").createOrReplaceTempView("wb_compare_both_3")
spark.table("wb_compare_both_3").cache()
Out[10]:
[id: int]
In [11]:
spark.table("wb_compare_both_3").count()
Out[11]:
49021987
In [12]:
spark.table("wb_compare_both_3").show(10, false)
+---+
|id |
+---+
|1  |
|2  |
|3  |
|5  |
|8  |
|21 |
|23 |
|52 |
|60 |
|65 |
+---+
only showing top 10 rows

In [13]:
// Write the affected-id list as a single CSV file for publication.
// FIX 1: repartition(1) performs a full shuffle just to get one output file;
//        coalesce(1) merges the existing partitions without shuffling.
// FIX 2: the ORDER BY in the wb_compare_both_3 view is not guaranteed to
//        survive a later repartition/coalesce of the cached table, so the
//        sort is applied here, immediately before the write, to pin the
//        order of the file actually produced.
spark.table("wb_compare_both_3").orderBy("id").coalesce(1).write.csv("/tmp/addshore-tmpoutput.csv")

// hadoop fs -ls hdfs://analytics-hadoop/tmp/addshore-*
// hadoop fs -text hdfs://analytics-hadoop/tmp/addshore-tmpoutput.csv/part-00000-5a9faa9a-6e44-4579-979a-e6a413e4112b-c000.csv.snappy > ~/tmpoutput
// hadoop fs -rm -r hdfs://analytics-hadoop/tmp/addshore-*
// cp ./tmpoutput /srv/published-datasets/one-off/wikidata/addshore/T239470-20191209-1225-item-terms-migration-holes
// published-sync
// rm ./tmpoutput