Friday, 19. June 2020 06:27PM

Importing JSON Data into PostgreSQL

Parsing JSON files can be quick and efficient, but importing them into PostgreSQL may take some time. In this article, we'll examine how to save compute time by pre-processing the JSON into CSV with PSQL datatypes before importing it into PostgreSQL. This seems to be the quickest way to import data into PostgreSQL.

Let's begin by grabbing the Yelp Dataset from Kaggle: https://www.kaggle.com/yelp-dataset/yelp-dataset

Once the Yelp Dataset is downloaded, move it to a workspace and unzip the files. You'll find a few JSON files:

-rw-r--r-- 1 nobody nobody 4809540040  6月 13 05:52 10100_1035793_bundle_archive.zip
-rw-r--r-- 1 nobody nobody      41776  3月 26 01:18 Dataset_Agreement.pdf
-rw-r--r-- 1 nobody nobody  152898689  3月 26 01:18 yelp_academic_dataset_business.json
-rw-r--r-- 1 nobody nobody  449663480  3月 26 01:18 yelp_academic_dataset_checkin.json
-rw-r--r-- 1 nobody nobody 6325565224  3月 26 01:19 yelp_academic_dataset_review.json
-rw-r--r-- 1 nobody nobody  263489322  3月 26 01:31 yelp_academic_dataset_tip.json
-rw-r--r-- 1 nobody nobody 3268069927  3月 26 01:32 yelp_academic_dataset_user.json

So that the code in this notebook works on its own, we'll use fake data in place of the real dataset.

In [1]:
fake_data = '''{"review_id":"xQY8N_XvtGbearJ5X4QryQ","user_id":"OwjRMXRC0KyPrIlcjaXeFQ","business_id":"-MhfebM0QIsKt87iDN-FNw","stars":2.0,"useful":5,"funny":0,"cool":0,"text":"Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.","date":"2015-04-15 05:21:16"}
{"review_id":"UmFMZ8PyXZTY2QcwzsfQYA","user_id":"nIJD_7ZXHq-FX8byPMOkMQ","business_id":"lbrU8StCq3yDfr-QMnGrmQ","stars":1.0,"useful":1,"funny":1,"cool":0,"text":"Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.", "date":"2013-12-07 03:16:52"}
{"review_id":"LG2ZaYiOgpr2DK_90pYjNw","user_id":"V34qejxNsCbcgD8C0HVk-Q","business_id":"HQl28KMwrEKHqhFrrDqVNQ","stars":5.0,"useful":1,"funny":0,"cool":0,"text":"Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.", "date":"2015-12-05 03:18:11"}
'''
fake_data_filepath = '/tmp/yelp-fake-data.json'
with open(fake_data_filepath, 'w') as stream:
  stream.write(fake_data)

Let's start by importing the libraries we'll need, setting up logging, and creating a few constants near the top of the file. The logger probably isn't needed in a Jupyter Notebook, but I'll include it anyway because this post is about transforming data.

In [ ]:
#!/usr/bin/env python

import csv
import hashlib
import json
import logging
import typing
import uuid

from datetime import datetime

# Configure the root logger once, then grab a module-level logger for our own messages
logger = logging.getLogger('')
sysHandler = logging.StreamHandler()
sysHandler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(sysHandler)
logger.setLevel(logging.INFO)
logger = logging.getLogger(__name__)

ENCODING: str = 'utf-8'

Create a function that'll help keep our transformed dataset clean of duplicate records. insert_hash takes a Python dict and a list of its keys and returns a SHA-256 hash of the corresponding values. The keys are required to maintain a stable order for the dict-to-string encoding.

In [3]:
def insert_hash(datum, headers) -> str:
    # Concatenate the values of the given keys and hash the result,
    # so duplicate records can be detected with a single dict lookup
    data = ''.join([str(datum[h]) for h in headers])
    return hashlib.sha256(data.encode(ENCODING)).hexdigest()
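
For example, two records that agree on the hashed keys produce the same digest no matter what the other fields contain. A quick sketch (the field names are illustrative):

In [ ]:
a = insert_hash({'user_id': 'u1', 'business_id': 'b1', 'stars': 5.0}, ['user_id', 'business_id'])
b = insert_hash({'user_id': 'u1', 'business_id': 'b1', 'stars': 1.0}, ['user_id', 'business_id'])
assert a == b  # stars isn't part of the key, so the hashes match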

The convert_json_to_csv function signature takes in a lot:

- filepath: the .json path to convert; the output .csv is written into the same directory
- unique_together: a list of keys whose values will be combined and hashed to create a uniqueness key; if the hashed value is already known, the record is omitted from the CSV output
- only_columns: trims the output down to only the columns requested, or keeps all the columns when empty
- mutators: a mapping of column name to a list of functions that accept and return the value for that column
- extenders: a mapping of Python type to a list of functions that accept a list of rows and extend them according to the data types found in the rows

With this, we have a fairly versatile script that'll give us uniform output for each JSON entry in a file.

In [27]:
def convert_json_to_csv(
    filepath: str,
    unique_together: typing.List[str],
    only_columns: typing.List[str] = [],
    mutators: typing.Dict[str, typing.List[typing.Callable]] = {},
    extenders: typing.Dict[type, typing.List[typing.Callable]] = {}) -> None:

    csv_filepath = filepath.rsplit('.', 1)[0]
    logger.info(f'Converting file: {csv_filepath} to CSV')
    csv_filepath = f'{csv_filepath}.csv'
    # Stamp every row with the time this conversion ran, in PSQL timestamp format
    created = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f+00')
    inserts = {}
    def _write_rows(rows: typing.List[typing.List[typing.Any]], writer) -> None:
        # Each extender inspects the row values itself, so apply them all in order
        result = rows[:]
        for extender_list in extenders.values():
            for extender in extender_list:
                result = extender(result)

        for row in result:
            writer.writerow(row)

    with open(filepath, 'rb') as json_stream:
        line = json_stream.readline().decode(ENCODING).strip('\n')
        datum = json.loads(line)
        with open(csv_filepath, 'w') as csv_stream:
            writer = csv.writer(csv_stream)
            headers = [h for h in datum.keys()]
            for header in only_columns:
                assert header in headers

            if len(only_columns) > 0:
                headers = list(only_columns)  # copy, so the caller's list isn't mutated below

            for header in unique_together:
                assert header in headers, headers

            headers.insert(0, 'created')
            headers.insert(0, 'guid')
            datum['guid'] = str(uuid.uuid4())
            datum['created'] = created
            for col_name, mutator_list in mutators.items():
                for mutator in mutator_list:
                    datum[col_name] = mutator(datum[col_name])

            writer.writerow(headers)
            _write_rows([[datum[h] for h in headers]], writer)
            if len(unique_together) > 0:
                inserts[insert_hash(datum, unique_together)] = 1

            while True:
                line = json_stream.readline().decode(ENCODING).strip('\n')
                if line == '':
                    break

                datum = json.loads(line)
                datum['guid'] = str(uuid.uuid4())
                datum['created'] = created
                for col_name, mutator_list in mutators.items():
                    for mutator in mutator_list:
                        datum[col_name] = mutator(datum[col_name])

                datum_hash = insert_hash(datum, unique_together)
                if inserts.get(datum_hash, None) is None and len(unique_together) > 0:
                    inserts[datum_hash] = 1
                    _write_rows([[datum[h] for h in headers]], writer)
                elif len(unique_together) == 0:
                    _write_rows([[datum[h] for h in headers]], writer)
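
For instance, to keep only one review per (user_id, business_id) pair and trim the output to a few columns, a hypothetical invocation might look like this (shown for illustration, not run here):

In [ ]:
convert_json_to_csv(
  fake_data_filepath,
  ['user_id', 'business_id'],
  ['review_id', 'user_id', 'business_id', 'stars'])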

Create the two utility functions that'll assist in converting the data from JSON to CSV.

In [25]:
def convert_datetime_to_psql(value: str) -> str:
    # Re-render Yelp's timestamp format as a PostgreSQL-style UTC timestamp
    YELP_FORMAT = '%Y-%m-%d %H:%M:%S'
    PSQL_FORMAT = '%Y-%m-%d %H:%M:%S.%f+00'
    stamp = datetime.strptime(value, YELP_FORMAT)
    return stamp.strftime(PSQL_FORMAT)

def extend_uniform_lists(rows: typing.List[typing.List[typing.Any]]) -> typing.List[typing.List[typing.Any]]:
    # Fan each row containing list values out into one row per list element,
    # assuming every list in a row has the same length
    result = []
    for row in rows:
        scalar_values = [value for value in row if not isinstance(value, list)]
        list_values = [value for value in row if isinstance(value, list)]
        if len(list_values) == 0:
            result.append(row)
            continue  # a `break` here would silently drop the remaining rows
            
        for idx in range(0, len(list_values[0])):
            clone = scalar_values[:]
            clone.extend([value[idx] for value in list_values])
            result.append(clone)

    return result
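
A quick sanity check of both helpers; the expected results are shown as comments:

In [ ]:
convert_datetime_to_psql('2015-04-15 05:21:16')
# -> '2015-04-15 05:21:16.000000+00'

extend_uniform_lists([['guid-1', ['a', 'b'], [1, 2]]])
# -> [['guid-1', 'a', 1], ['guid-1', 'b', 2]]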

Invoke the conversion function

In [28]:
convert_json_to_csv(
  fake_data_filepath,
  [],
  [],
  {'date': [convert_datetime_to_psql]},
  {list: [extend_uniform_lists]})
2020-06-14 18:07:31,709 - __main__ - INFO - Converting file: /tmp/yelp-fake-data to CSV

Make sure we have some output in the CSV file

In [33]:
!wc -l /tmp/yelp-fake-data.csv
4 /tmp/yelp-fake-data.csv
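
As a quick check, the header row should list the two prepended columns followed by the original JSON keys:

In [ ]:
!head -1 /tmp/yelp-fake-data.csv

Expect something like: guid,created,review_id,user_id,business_id,stars,useful,funny,cool,text,date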

Looking good! Before importing, make sure the review table exists in PostgreSQL. A minimal definition matching the columns our script emits might look like this (the column types are my assumptions based on the Yelp data, not an official schema):
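
psql> CREATE TABLE review (
          guid UUID PRIMARY KEY,
          created TIMESTAMPTZ,
          review_id TEXT,
          user_id TEXT,
          business_id TEXT,
          stars NUMERIC,
          useful INTEGER,
          funny INTEGER,
          cool INTEGER,
          text TEXT,
          date TIMESTAMPTZ);

With the table in place, go ahead and import the CSV with the following command,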

psql> COPY review FROM '/tmp/yelp-fake-data.csv' CSV HEADER;