In [2]:
%%html 
<a href="javascript:code_showhide_toggle()">Show/Hide Code</a>

Harmonize and Index Baltimore Crime Incident Dataset

Contents

Introduction

This notebook illustrates the use of Jupyter with PySpark to

  • harmonize crime datasets from multiple jurisdictions
  • add metadata to support a data-driven search UI
  • create a data dictionary with metadata for each variable
  • save data and metadata as Spark SQL tables backed by (Athena-compatible) Parquet files in S3
  • index data and metadata to support search
  • self-publish this notebook so it can be linked from the search UI to provide transparency and reproducibility

Setup

What does this do

This sets up the configuration variables used for data input, output, indexing, and so on.

In [3]:
# CONFIGURATION VARIABLES
import os   # used below for os.environ, os.remove, and os.rename

city="Baltimore"
cityurl="https://data.baltimorecity.gov/api/views/wsfq-mvij/rows.csv?accessType=DOWNLOAD"
citynotebook="Baltimore-notebook" # citynotebook is used to automatically publish the notebook at the end


# location for raw (unprocessed) dataset in S3
scratch_bucket=os.environ["S3_SCRATCH_BUCKET"]
datasets3 = "s3://{0}/crimedata/raw/{1}.csv".format(scratch_bucket, city)

# location for harmonized (processed) dataset in S3. Structure is:
#   outputroot
#       |--- data - incidents - multiple CSV files
#       |--- dictionary - incidents - CSV file containing data dictionary
#       |--- doc - copy of notebook (published for web access)
outputroot = "s3://{0}/crimedata/harmonized/{1}".format(scratch_bucket,city)
outputpath_data = "{}/data".format(outputroot)
outputpath_dictionary="{}/dictionary".format(outputroot)
outputpath_doc="{}/docs".format(outputroot)
notebook_urlbase = "https://s3.amazonaws.com/{0}/{1}".format(outputpath_doc.replace("s3://",""), citynotebook)

# elasticsearch cluster endpoint
# - use local proxy service (installed via an EMR bootstrap) which signs ES API calls using EMR EC2 role.
esendpoint="localhost"
esport=9200

# Summary of configuration
print("City: {}".format(city))
print("Notebook Name: {}".format(citynotebook))
print("Dataset URL: {}".format(cityurl))
print("S3 dataset input: {}".format(datasets3))
print("Harmonized output: {}".format(outputroot))
print("ElasticSearch Cluster: {}.{}".format(esendpoint, esport))
print("Setup & Initialization done")
City: Baltimore
Notebook Name: Baltimore-notebook
Dataset URL: https://data.baltimorecity.gov/api/views/wsfq-mvij/rows.csv?accessType=DOWNLOAD
S3 dataset input: s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/raw/Baltimore.csv
Harmonized output: s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore
ElasticSearch Cluster: localhost:9200
Setup & Initialization done
In [4]:
# INITIALIZATION - instantiate objects used by notebook

%matplotlib inline
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from IPython.display import display, HTML
import datetime
import subprocess

# elasticsearch and harmonization objects defined in ./lib/esindex.py and ./lib/harmonizeCrimeIncidents.py
from lib.esindex import esindex
from lib.harmonizeCrimeIncidents import harmonizeCrimeIncidents

sc = SparkContext.getOrCreate()
hc = HiveContext(sc)
es = esindex(esendpoint)
hz = harmonizeCrimeIncidents(hc)
print("Initialization complete.")
Initialization complete.

Load Input Data

What does this do

This takes a data source file, in our case a CSV file, and copies it into our S3 bucket for processing. Once the source file is in the bucket, we load it into a Spark DataFrame for analysis and processing.

In [5]:
# Download & install dataset from website

def copy_from_website_to_s3(city, cityurl, datasets3):
    tmpfile="/tmp/{0}.csv".format(city)
    print("Downloading {0} from: {1}".format(tmpfile, cityurl))
    subprocess.check_output("curl {0} -o {1}".format(cityurl, tmpfile).split())
    print("Copying {0} to: {1}".format(tmpfile, datasets3))
    subprocess.check_output("aws s3 cp {0} {1} --sse AES256".format(tmpfile, datasets3).split())
    os.remove(tmpfile)
    
# comment out the following line if you do not need to re-download the dataset
copy_from_website_to_s3(city, cityurl, datasets3)
Downloading /tmp/Baltimore.csv from: https://data.baltimorecity.gov/api/views/wsfq-mvij/rows.csv?accessType=DOWNLOAD
Copying /tmp/Baltimore.csv to: s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/raw/Baltimore.csv
In [6]:
# Read input datasets as DataFrames
# No need to infer schema - all variables initially typed as strings
print("Loading dataset from S3")
df = hc.read.load( datasets3,
                   format='com.databricks.spark.csv',
                   header='true',
                   inferSchema='false',
                   delimiter=',') 
df_in=df # keep copy of raw data for reference
print("Dataset {0}: Loaded {1} rows.".format(city, df.count()))
Loading dataset from S3
Dataset Baltimore: Loaded 243399 rows.

Exploratory Analysis

Edit as desired to interactively explore the data

What does this do

With the data source file loaded into a Spark DataFrame, you can get some details on the content of the data using methods from Spark, Pandas, and others. This is purely exploratory information to give users insight into the raw data set.
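For example, a quick check of missing values in a few raw columns (a small illustration using the df already loaded above; the columns 'Weapon', 'Premise', and 'District' are just examples picked from this dataset):

# count missing (null or empty) values for a few columns of interest
for c in ['Weapon', 'Premise', 'District']:
    n_missing = df.where(col(c).isNull() | (col(c) == '')).count()
    print("{0}: {1} missing values".format(c, n_missing))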

In [7]:
# examine column headers
str(df.columns)
Out[7]:
"['CrimeDate', 'CrimeTime', 'CrimeCode', 'Location', 'Description', 'Inside/Outside', 'Weapon', 'Post', 'District', 'Neighborhood', 'Location 1', 'Premise', 'Year', 'Total Incidents']"
In [8]:
# look at first 2 rows
df.head(2)
Out[8]:
[Row(CrimeDate=u'01/14/2017', CrimeTime=u'23:30:00', CrimeCode=u'6D', Location=u'1700 AISQUITH ST', Description=u'LARCENY FROM AUTO', Inside/Outside=None, Weapon=None, Post=u'343', District=u'EASTERN', Neighborhood=u'Oliver', Location 1=u'(39.3098700000, -76.6025400000)', Premise=None, Year=u'2017', Total Incidents=u'1'),
 Row(CrimeDate=u'01/14/2017', CrimeTime=u'22:30:00', CrimeCode=u'5D', Location=u'800 N CALVERT ST', Description=u'BURGLARY', Inside/Outside=None, Weapon=None, Post=u'142', District=u'CENTRAL', Neighborhood=u'Mount Vernon', Location 1=u'(39.2989900000, -76.6130200000)', Premise=None, Year=u'2017', Total Incidents=u'1')]
In [9]:
# look at the distinct values for 'Description'
df.select('Description').distinct().show(truncate=False)
+--------------------+
|Description         |
+--------------------+
|ROBBERY - COMMERCIAL|
|AGG. ASSAULT        |
|ROBBERY - RESIDENCE |
|ARSON               |
|SHOOTING            |
|ROBBERY - STREET    |
|LARCENY FROM AUTO   |
|AUTO THEFT          |
|LARCENY             |
|HOMICIDE            |
|COMMON ASSAULT      |
|ROBBERY - CARJACKING|
|RAPE                |
|BURGLARY            |
|ASSAULT BY THREAT   |
+--------------------+

In [10]:
# Graph incident count by Description
descr = df.select('Description').toPandas()
descrGrp = descr.groupby('Description').size().rename('counts')
descrPlot = descrGrp.plot(kind='bar')

Harmonize Variables

Standard harmonized variables for crime incident datasets are defined in ./lib/harmonizeCrimeIncidents.py. (You can open this module in Jupyter and modify the variable list and associated standard variable metadata as required.)

What does this do

Harmonization is the process of mapping the raw variables of each incoming dataset to the standard 'harmonized' variables and associated units of measurement, as far as possible. Harmonized datasets support cross-dataset search as well as the ability to combine/union datasets for multi-dataset analysis and research.

See examples below for how to generate new variables from existing variables, and how to manipulate variable values. The hz (harmonizeCrimeIncidents) class provides methods to help abstract the code for some common actions.

Why are we doing this

A core challenge when combining loosely coupled datasets in a single search index is dealing with different names for the same attribute, for example "Sex" versus "Gender" or "48in" versus "4ft". We have a predefined set of standard variable names and types that we use for our search page; the harmonization process ensures that attributes and values in the raw data files match that predefined set, allowing a consistent search tool across multiple data sets.
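As a concrete illustration, here is a minimal sketch of what a mapVar-style rename helper could look like (hypothetical; the real implementation in ./lib/harmonizeCrimeIncidents.py also records the transformation metadata used later in the data dictionary):

# Hypothetical sketch of a mapVar-style helper: copy a column to its harmonized name,
# optionally drop the original, and report what was done
def map_var_sketch(df, old, new, keepOrig=False):
    df = df.withColumn(new, df[old])
    print("New variable <{0}> created from <{1}>".format(new, old))
    if not keepOrig:
        df = df.drop(old)
        print("Dropped variable <{0}>".format(old))
    return df

A call such as map_var_sketch(df, "Location 1", "geolocation") would reproduce the kind of renaming shown in the output below.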

In [11]:
# Use hz.mapVar(old, new, keepOrig=False) to create new variables from the original variables, by default dropping 
# the original variable. Use 'keepOrig=True' argument to keep the original variable in the dataset.
# Metadata for the transformation will be captured and included in the data dictionary

df = hz.mapVar(df, "Description", "description_orig", keepOrig=True)  # make a copy of original Description variable
df = hz.mapVar(df, "Location 1", "geolocation")

# Rename any variables that have illegal names 
# no illegal characters or spaces (required to support parquet output format)
# all lowercase variable names (required to provide compatibility with Amazon Athena)
df = hz.makeValidVariableNames(df)
New variable <description_orig> created from <Description>
New variable <geolocation> created from <Location 1>
Dropped variable <Location 1>
New variable <crimedate> created from <CrimeDate>
New variable <crimetime> created from <CrimeTime>
New variable <crimecode> created from <CrimeCode>
New variable <location> created from <Location>
New variable <description> created from <Description>
New variable <insideoutside> created from <Inside/Outside>
Dropped variable <Inside/Outside>
New variable <weapon> created from <Weapon>
New variable <post> created from <Post>
New variable <district> created from <District>
New variable <neighborhood> created from <Neighborhood>
New variable <premise> created from <Premise>
New variable <year> created from <Year>
New variable <totalincidents> created from <Total Incidents>
Dropped variable <Total Incidents>
In [12]:
# Harmonize Description variable to standard values.

# map value to standard descriptions
descrMappings = {
    "ARSON" : "Arson",
    "AGG. ASSAULT" : "Assault",
    "ASSAULT BY THREAT" : "Assault",
    "COMMON ASSAULT" : "Assault",
    "RAPE" : "Rape",
    "BURGLARY" : "Burglary",
    "ROBBERY - COMMERCIAL" : "Robbery",
    "ROBBERY - RESIDENCE" : "Robbery",
    "ROBBERY - STREET" : "Robbery",
    "ROBBERY - CARJACKING" : "Robbery",
    "LARCENY FROM AUTO" : "Theft",
    "AUTO THEFT" : "Theft",
    "LARCENY" : "Theft",
    "HOMICIDE" : "Homicide",
    "SHOOTING" : "Weapons"
}
df = hz.mapValues(df, "description", descrMappings)
Values for description converted per supplied mapping
In [13]:
# Add city variable. All rows are given the value of the current city name
df = df.withColumn('city', lit(city)) 

# Add TransformDescr metadata field
hz.addTransformDescr('city','"City" assigned by harmonization code')
print("Add 'city' variable")
Add 'city' variable
In [14]:
# Format geolocation field from '(lat, lon)' to the 'lat,lon' format understood by elasticsearch - i.e. remove parentheses and whitespace
df = df.withColumn('geolocation', regexp_replace(df['geolocation'], r'[\(\)\s]', '')) 
hz.addTransformDescr('geolocation','geolocation format harmonised to "latitude, longitude"')
print("Harmonized geolocation variable")
Harmonized geolocation variable
In [15]:
# filter out records with empty or null coordinates
c1 = df.count()
df = df.where(length('geolocation') > 0)
c2 = df.count()
print("Deleted {} rows with corrupted coordinates in LATITUDE and LONGITUDE".format(c1-c2))
Deleted 410 rows with corrupted coordinates in LATITUDE and LONGITUDE
In [16]:
# Generate standard datetime, date part, and time part fields from raw CrimeDate and CrimeTime fields 
# for simplicity we will keep all times in localtime (ignoring timezones)

# Split CrimeDate into year, month, day (all defined as harmonized variables in the hz class)
# CrimeDate is formatted as month/day/year
dateregex = r'(\d+)/(\d+)/(\d+)'
df = df.withColumn('month', regexp_extract('CrimeDate', dateregex, 1))
hz.addTransformDescr('month','month, extracted from CrimeDate')
df = df.withColumn('day', regexp_extract('CrimeDate', dateregex, 2))
hz.addTransformDescr('day','day, extracted from CrimeDate')
df = df.withColumn('year', regexp_extract('CrimeDate', dateregex, 3))
hz.addTransformDescr('year','year, extracted from CrimeDate')

# Split CrimeTime into hour & minute (both defined as harmonized variables in the hz class)
# CrimeTime is formatted as either 18:51:00 or 1851 - both formats matched below
time1regex = r'(\d+):(\d+):(\d+)'
time2regex = r'(\d\d)(\d\d)'
df = df.withColumn('hour1', regexp_extract('CrimeTime', time1regex, 1))        # match 18:51:00 format, or ""
df = df.withColumn('hour2', regexp_extract('CrimeTime', time2regex, 1))        # match 1851 format, or ""
df = df.withColumn('hour', concat(df.hour1,df.hour2))                          # combine above matches
df = df.drop('hour1').drop('hour2')                                            # drop temporary columns
df = df.withColumn('hour', regexp_replace('hour', '24', '00')) 
hz.addTransformDescr('hour','hour, extracted from CrimeTime')
df = df.withColumn('minute1', regexp_extract('CrimeTime', time1regex, 2))      # match 18:51:00 format, or ""
df = df.withColumn('minute2', regexp_extract('CrimeTime', time2regex, 2))      # match 1851 format, or ""
df = df.withColumn('minute', concat(df.minute1,df.minute2))                    # combine above matches
df = df.drop('minute1').drop('minute2')                                        # drop temporary columns
hz.addTransformDescr('minute','minute, extracted from CrimeTime')

# Create new datetime field in format YYYY-MM-DD hh:mm:ss (defined as a harmonized variable in the hz class)
df = df.withColumn('datetime',concat(concat_ws('-',df.year,df.month,df.day),lit(' '),concat_ws(':',df.hour,df.minute,lit('00'))).cast("timestamp"))
hz.addTransformDescr('datetime','Full timestamp with date and time, eg 2007-04-05 14:30')  

# Drop the original CrimeDate and CrimeTime fields - no longer needed
df = df.drop('CrimeDate').drop('CrimeTime')

# Cast all the date time part fields from string to int (allows use of numeric range filters in the search UI)
for c in ['year','month','day','hour','minute']:   # 'c' avoids shadowing the pyspark col() function
    df = df.withColumn(c, df[c].cast("int"))

# Add dayOfWeek variable, eg Monday, Tuesday, etc. (defined as a harmonized variable in the hz class)
df = df.withColumn('dayofweek',date_format('datetime', 'EEEE'))
hz.addTransformDescr('dayofweek','day of Week, calculated from datetime')  

print("Harmonized date & time variables")
Harmonized date & time variables
In [17]:
df.select('datetime').show(2,truncate=False)
+---------------------+
|datetime             |
+---------------------+
|2017-01-14 23:30:00.0|
|2017-01-14 22:30:00.0|
+---------------------+
only showing top 2 rows

In [18]:
# Add dataset descriptive variables (defined in hz class). 

# location of raw data
df = df.withColumn('rawdatapath', lit(datasets3))
hz.addTransformDescr('rawdatapath','Assigned by harmonization code')

# location of harmonized data
df = df.withColumn('harmonizeddatapath', lit(outputroot))
hz.addTransformDescr('harmonizeddatapath','Assigned by harmonization code')

# URL for this notebook (notebook will be saved/published using cells at the end of the notebook)
df = df.withColumn('notebookhtml', lit(notebook_urlbase + ".html"))
hz.addTransformDescr('notebookhtml','Assigned by harmonization code')

print("Added dataset descriptive variables")
Added dataset descriptive variables

Add metadata for additional variables

Create some additional variables for this specific data set.

Here we can assign a default 'vargroup' for all variables that are not part of the defined list of harmonized variables in the hz class, using the hz.addVarGroup() method. (The 'vargroup' is used by the search UI to group variables under 'accordion' panels in the search page side bar.)

We can also assign custom metadata to selected unharmonized variables as desired. NOTE:

  • default metadata for harmonized variables is defined in the hz class already, so we can ignore those
  • unharmonized variables will be assigned default metadata automatically when we build the data dictionary below using hz.buildDataDict(). However, you might want to explicitly assign metadata to selected variables to control the search UI widget type, and/or to add descriptions to the dictionary.

What does this do

This process adds customized search parameters for this specific data set to the UI, beyond those in our standard set.

Why are we doing this

Not all individual variables within a single data set are valuable on their own. Some need additional logic or combination to make search and discovery a better experience. This process allows the harmonization routine to apply that logic and expose it within the UI.

In [19]:
# vargroups are used to define the Search Filter UI 'accordions' and their order
# set default variable group - used if a variable is not explicitly assigned to a group
defaultVarGroup = "{0} (Unharmonized)".format(city)
hz.addVarGroup(defaultVarGroup,order=90,default=True)

# Metadata for the harmonized variables is already defined in 'hz' class.
# We can add metadata here for additional (non harmonized) variables to control
# how they are presented in the data dictionary and search UI. 

# Here, "Inside/Outside" is not defined as one of the standard harmonized variables, so let's define its metadata here
# Set 'type' to use an enumerated dropdown selector. 
# (use the hz.get_unique_values() method to build a value list for the enum selector widget.)
values=",".join(hz.get_unique_values(df,'insideoutside'))
hz.addVarMetadata("insideoutside", 
                 vargroup=defaultVarGroup, 
                 type="enum,%s" % values, 
                 descr="Incident occurred inside or outside", 
                 uifilter=True)
print("Added metadata for variable <insideoutside>")
Added metadata for variable <insideoutside>
In [20]:
df.head(2)
Out[20]:
[Row(description=u'Theft', crimecode=u'6D', location=u'1700 AISQUITH ST', weapon=None, post=u'343', district=u'EASTERN', neighborhood=u'Oliver', premise=None, year=2017, description_orig=u'LARCENY FROM AUTO', geolocation=u'39.3098700000,-76.6025400000', insideoutside=None, totalincidents=u'1', city=u'Baltimore', month=1, day=14, hour=23, minute=30, datetime=datetime.datetime(2017, 1, 14, 23, 30), dayofweek=u'Saturday', rawdatapath=u's3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/raw/Baltimore.csv', harmonizeddatapath=u's3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore', notebookhtml=u'https://s3.amazonaws.com/datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/docs/Baltimore-notebook.html'),
 Row(description=u'Burglary', crimecode=u'5D', location=u'800 N CALVERT ST', weapon=None, post=u'142', district=u'CENTRAL', neighborhood=u'Mount Vernon', premise=None, year=2017, description_orig=u'BURGLARY', geolocation=u'39.2989900000,-76.6130200000', insideoutside=None, totalincidents=u'1', city=u'Baltimore', month=1, day=14, hour=22, minute=30, datetime=datetime.datetime(2017, 1, 14, 22, 30), dayofweek=u'Saturday', rawdatapath=u's3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/raw/Baltimore.csv', harmonizeddatapath=u's3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore', notebookhtml=u'https://s3.amazonaws.com/datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/docs/Baltimore-notebook.html')]

Generate Dictionary

Generate a dictionary table containing field distribution stats and metadata from the mappings

What does this do

All variables that a) don't match standard harmonized variables, and b) don't have added metadata will be assigned to the default vargroup, and the variable type will be derived from the data distribution characteristics, calculated by hz.buildDataDict().

Why are we doing this

The data dictionary is used to generate dynamic search widgets and tools based on the data sets themselves. By basing the search widgets on the data itself, rather than hard-coding them, the search UI can update automatically as the available data changes.
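For a sense of what goes into each dictionary row, here is a rough sketch of per-column statistics similar to those hz.buildDataDict() produces (hypothetical; the real method also merges in the vargroup, type, and description metadata):

# Hypothetical sketch: simple distribution stats for one column of a Spark DataFrame
def column_stats_sketch(df, field):
    total = df.count()
    nonmissing = df.where(df[field].isNotNull()).count()
    return {
        "dict_field": field,
        "dict_count": nonmissing,
        "dict_countmissing": total - nonmissing,
        "dict_countdistinct": df.select(field).distinct().count(),
        "dict_min": df.agg({field: "min"}).collect()[0][0],
        "dict_max": df.agg({field: "max"}).collect()[0][0],
    }

For example, column_stats_sketch(df, 'district') would return counts and min/max values like those shown in the dictionary table further below.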

In [21]:
df_dict = hz.buildDataDict(df)
print("Data Dictionary created.")
Data Dictionary created.

Save Data and Dictionary

Use the HiveContext object 'hc' to create a new schema for this city, and save the data dataframe and dictionary dataframe as tables in this schema with the hz.saveAsParquetTable() method.

What does this do

The data and the associated dictionary information are saved to the S3 output path as Parquet files.

Why are we doing this

This allows the tables we've created or modified through harmonization to be easily restored, combined, and analyzed using SQL.
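A simplified sketch of what a saveAsParquetTable-style helper might do (hypothetical; the actual hz method also returns the Athena-compatible DDL used in the next section):

# Hypothetical sketch: write the DataFrame as Parquet to S3 and register a Spark SQL table over it
def save_as_parquet_table_sketch(hc, df, schema, table, s3path):
    location = "{0}/table={1}".format(s3path, table)
    df.write.mode("overwrite").parquet(location)                  # Parquet files in S3
    hc.sql("DROP TABLE IF EXISTS {0}.{1}".format(schema, table))
    hc.sql("CREATE TABLE {0}.{1} USING parquet OPTIONS (path '{2}')".format(schema, table, location))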

In [22]:
# Drop and create schema
schema="incidents"
hc.sql("DROP SCHEMA IF EXISTS {0} CASCADE".format(schema))
hc.sql("CREATE SCHEMA {0} COMMENT 'Crime incident data for {0}'".format(schema))

# Create Incident Data as SparkSQL table with S3 backed storage
data_table=city.lower()
data_table_ddl=hz.saveAsParquetTable(df,schema,data_table,outputpath_data)

# Create Dictionary as SparkSQL table with S3 backed storage
dict_table = data_table+"_dict"
dict_table_ddl=hz.saveAsParquetTable(df_dict.coalesce(1),schema,dict_table,outputpath_dictionary)

print "Done creating tables"
Creating Spark SQL table: incidents.baltimore
Creating Spark SQL table: incidents.baltimore_dict
Done creating tables

Create External Tables in Amazon Athena

What does this do

The S3 Parquet files containing the harmonized data are registered as Amazon Athena external tables.

Why are we doing this

You can use Amazon Athena to perform detailed ad-hoc analysis of this and other harmonized datasets using the familiar power of SQL. Using Athena also allows you to easily integrate the dataset with Amazon QuickSight, where you can create visual analyses and dashboards.
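One way a helper like hz.executeAthenaDDL could work is via the boto3 Athena client; a hypothetical sketch (the real helper lives in the hz class, and the region name here is an assumption):

import boto3

def execute_athena_ddl_sketch(staging_dir, ddl_statements, region="us-east-1"):
    # Hypothetical sketch: submit each DDL statement to Athena, writing query results to the staging dir
    athena = boto3.client("athena", region_name=region)
    for ddl in ddl_statements:
        print("Executing Athena DDL: {0}".format(ddl))
        athena.start_query_execution(
            QueryString=ddl,
            ResultConfiguration={"OutputLocation": staging_dir})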

In [23]:
ddlList=[
    "CREATE DATABASE IF NOT EXISTS `{0}`;".format(schema),
    "DROP TABLE IF EXISTS `{0}`.`{1}`;".format(schema,data_table),
    "DROP TABLE IF EXISTS `{0}`.`{1}`;".format(schema,dict_table),
    data_table_ddl,
    dict_table_ddl
]
athena_s3_staging_dir = "s3://{0}/athena_staging_dir".format(scratch_bucket)
hz.executeAthenaDDL(athena_s3_staging_dir, ddlList)
Exectuting Athena DDL: CREATE DATABASE IF NOT EXISTS `incidents`;
Exectuting Athena DDL: DROP TABLE IF EXISTS `incidents`.`baltimore`;
Exectuting Athena DDL: DROP TABLE IF EXISTS `incidents`.`baltimore_dict`;
Exectuting Athena DDL: CREATE EXTERNAL TABLE `incidents`.`baltimore` (`description` STRING, `crimecode` STRING, `location` STRING, `weapon` STRING, `post` STRING, `district` STRING, `neighborhood` STRING, `premise` STRING, `year` INT, `description_orig` STRING, `geolocation` STRING, `insideoutside` STRING, `totalincidents` STRING, `city` STRING, `month` INT, `day` INT, `hour` INT, `minute` INT, `datetime` TIMESTAMP, `dayofweek` STRING, `rawdatapath` STRING, `harmonizeddatapath` STRING, `notebookhtml` STRING)
STORED AS parquet
LOCATION 's3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/data/table=baltimore/';
Exectuting Athena DDL: CREATE EXTERNAL TABLE `incidents`.`baltimore_dict` (`dict_field` STRING, `dict_count` BIGINT, `dict_countdistinct` BIGINT, `dict_countmissing` BIGINT, `dict_mean` DOUBLE, `dict_stddev` DOUBLE, `dict_min` STRING, `dict_max` STRING, `dict_vargroup` STRING, `dict_vardescr` STRING, `dict_uifilter` STRING, `dict_varmapping` STRING, `dict_vartype` STRING)
STORED AS parquet
LOCATION 's3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/dictionary/table=baltimore_dict/';

Index Data and Dictionary

What does this do

Creates or replaces the Elasticsearch indexes used to store our harmonized data and the associated dictionary.

Why are we doing this

We save both incident data and dictionary information to Elasticsearch to power the search page. The dictionary fields are used to dynamically build the search filter panels in the search page side bar - these fields identify each variable, its vargroup (accordion panel), type (UI selector to use), and description (hover tooltip). The incident record fields are used for the dataset record search.

Call es.createOrReplaceIndex(es_index) to reset the index and set up default field mappings (see ./lib/esindex.py for more info on default mappings). You can also optionally specify mappings for individual fields using es.addTypeMapping(), as illustrated below, to support the search features you need - for example, use a mapping to set a date type for timestamps, or to specify a geo_point field if you want to use maps in your Kibana dashboard.
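Under the hood, adding a type mapping amounts to a single call to the Elasticsearch mapping API; a hypothetical sketch using the requests library against the local signing proxy (the actual implementation is in ./lib/esindex.py):

import requests

def add_type_mapping_sketch(endpoint, port, index, doctype, mapping_json):
    # Hypothetical sketch: PUT a mapping for one document type on an existing index
    url = "http://{0}:{1}/{2}/_mapping/{3}".format(endpoint, port, index, doctype)
    r = requests.put(url, data=mapping_json, headers={"Content-Type": "application/json"})
    print("Add type mapping for <{0}.{1}> response: {2}".format(index, doctype, r.text))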

In [24]:
# set up data index
# index name contains the city name for uniqueness, and the suffix *harmonized* to allow ES queries across all datasets
es_dataindex = "{0}_harmonized".format(city.lower())
es_datatype = 'incidents'
es.createOrReplaceIndex(es_dataindex)

# create mappings for geolocation and datetime field.. all other fields inherit default mapping
mapping="""
{
    "properties": {
        "geolocation": {
          "type": "geo_point"
        },
        "datetime": {
          "type": "date",
          "format" : "yyyy-MM-dd HH:mm:ss"}
        }
    }
}
"""
es.addTypeMapping(es_dataindex, es_datatype, mapping)
Deleted existing elasticsearch documents (baltimore_harmonized)
Create index <baltimore_harmonized> response: {"acknowledged":true}
Add type mapping for <baltimore_harmonized.incidents> response: {"acknowledged":true}
In [25]:
# set up dictionary index
# index name contains the city name for uniqueness, and the suffix *dictionary* to allow ES queries across all datasets
es_dictindex = "{0}_dictionary".format(city.lower())
es_dicttype = 'dictionary'
es.createOrReplaceIndex(es_dictindex)
Deleted existing elasticsearch documents (baltimore_dictionary)
Create index <baltimore_dictionary> response: {"acknowledged":true}

All data and dictionary fields can be indexed by calling es.saveToEs() and passing the df and df_dict objects directly, as shown below. If instead you want to index only a subset of the variables, make a copy of the data dataframe, drop the columns you don't want to index, generate a new dictionary dataframe by passing the new data dataframe to hz.buildDataDict(), and pass the new data and dictionary dataframes to es.saveToEs(), as sketched in the example that follows.
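For example, to index only a subset of the variables following the steps just described (illustrative only, not executed here; the dropped columns are arbitrary choices):

# Illustration only: drop columns we don't want indexed, rebuild the dictionary, then index both
df_for_indexing = df.drop('rawdatapath').drop('harmonizeddatapath')
df_dict_for_indexing = hz.buildDataDict(df_for_indexing)
es.saveToEs(df_for_indexing, index=es_dataindex, doctype=es_datatype)
es.saveToEs(df_dict_for_indexing, index=es_dictindex, doctype=es_dicttype)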

In [26]:
# index all variables in data dataframe
print("Saving data to elasticsearch - please be patient")
df = df.withColumn("datetime",df["datetime"].cast("string")) # elasticsearch needs datetimes in a string type
es.saveToEs(df,index=es_dataindex,doctype=es_datatype)
Saving data to elasticsearch - please be patient
Dataset 1 saved to elasticsearch <baltimore_harmonized/incidents>
In [27]:
# index all variables in dictionary dataframe
print("Saving dictionary to elasticsearch - please be patient")
es.saveToEs(df_dict,index=es_dictindex,doctype=es_dicttype)
Saving dictionary to elasticsearch - please be patient
Dataset 1 saved to elasticsearch <baltimore_dictionary/dictionary>

View Harmonized Data Sample

What does this do

Test that the data table was successfully created by using SQL to load a few rows.

Why are we doing this

What we see below is what will be exposed in the search UI, so we want to see what our harmonized data looks like. Did we get all the right headers? Does it match the target data type? Is any applied logic correct? etc...

In [28]:
sql="SELECT * FROM %s.%s LIMIT 3" % (schema, data_table)
# run query and display the first few rows
hc.sql(sql).show(truncate=False)
+-----------+---------+----------------+------+----+--------+---------------+-------+----+-----------------+----------------------------+-------------+--------------+---------+-----+---+----+------+---------------------+---------+------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|description|crimecode|location        |weapon|post|district|neighborhood   |premise|year|description_orig |geolocation                 |insideoutside|totalincidents|city     |month|day|hour|minute|datetime             |dayofweek|rawdatapath                                                                                     |harmonizeddatapath                                                                                 |notebookhtml                                                                                                                                        |
+-----------+---------+----------------+------+----+--------+---------------+-------+----+-----------------+----------------------------+-------------+--------------+---------+-----+---+----+------+---------------------+---------+------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|Theft      |6D       |1700 AISQUITH ST|null  |343 |EASTERN |Oliver         |null   |2017|LARCENY FROM AUTO|39.3098700000,-76.6025400000|null         |1             |Baltimore|1    |14 |23  |30    |2017-01-14 23:30:00.0|Saturday |s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/raw/Baltimore.csv|s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore|https://s3.amazonaws.com/datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/docs/Baltimore-notebook.html|
|Burglary   |5D       |800 N CALVERT ST|null  |142 |CENTRAL |Mount Vernon   |null   |2017|BURGLARY         |39.2989900000,-76.6130200000|null         |1             |Baltimore|1    |14 |22  |30    |2017-01-14 22:30:00.0|Saturday |s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/raw/Baltimore.csv|s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore|https://s3.amazonaws.com/datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/docs/Baltimore-notebook.html|
|Theft      |6D       |1100 E BIDDLE ST|null  |311 |EASTERN |Johnston Square|null   |2017|LARCENY FROM AUTO|39.3039300000,-76.6027600000|null         |1             |Baltimore|1    |14 |22  |0     |2017-01-14 22:00:00.0|Saturday |s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/raw/Baltimore.csv|s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore|https://s3.amazonaws.com/datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/docs/Baltimore-notebook.html|
+-----------+---------+----------------+------+----+--------+---------------+-------+----+-----------------+----------------------------+-------------+--------------+---------+-----+---+----+------+---------------------+---------+------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+

View Data Dictionary

What does this do

Test the summary of the data to verify our dictionary was successfully created

Why are we doing this

The data dictionary drives how the data is organized and presented in the UI. We want to ensure we have our expected row counts, data types, mins and means, and that data is organized correctly for display in the UI. What we see here will translate into the search widgets and any available ranges that control those widgets.

In [29]:
sql="SELECT * FROM %s.%s ORDER BY dict_field ASC" % (schema, dict_table)
# run query, convert results to a local pandas dataframe, and display as an HTML table.
HTML(hc.sql(sql).toPandas().to_html())
Out[29]:
dict_field dict_count dict_countdistinct dict_countmissing dict_mean dict_stddev dict_min dict_max dict_vargroup dict_vardescr dict_uifilter dict_varmapping dict_vartype
0 city 242989 1 0 NaN NaN Baltimore Baltimore 04.Location Incident city True "City" assigned by harmonization code text
1 crimecode 242989 81 0 NaN NaN 1F 9S 90.Baltimore (Unharmonized) unknown True Source CrimeCode. Variable value unchanged fro... text
2 datetime 242989 155004 0 NaN NaN 2012-01-01 00:00:00 2017-01-14 23:30:00 00.Date and Time Incident date and time True Full timestamp with date and time, eg 2007-04-... datetime
3 day 242989 31 0 15.766570 8.802585 1 31 00.Date and Time Incident date True day, extracted from CrimeDate enum,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17...
4 dayofweek 242989 7 0 NaN NaN Friday Wednesday 00.Date and Time Incident day of week True day of Week, calculated from datetime enum,Sunday,Monday,Tuesday,Wednesday,Thursday,...
5 description 242989 8 0 NaN NaN Arson Weapons 01.Incident Incident description True Source Description. Map values {"HOMICIDE": "H... text
6 description_orig 242989 15 0 NaN NaN AGG. ASSAULT SHOOTING 90.Baltimore (Unharmonized) unknown True Source Description. Variable value unchanged f... text
7 district 242947 9 42 NaN NaN CENTRAL WESTERN 90.Baltimore (Unharmonized) unknown True Source District. Variable value unchanged from... text
8 geolocation 242989 88887 0 NaN NaN 37.5772600000,-81.5291900000 41.6316400000,-76.5152600000 04.Location Incident geoLocation coordinates False Source Location 1. geolocation format harmonis... text
9 harmonizeddatapath 242989 1 0 NaN NaN s3://datasearch-blog-jupyterspark-180dxejofo-e... s3://datasearch-blog-jupyterspark-180dxejofo-e... 99.Miscellaneous S3 Path to harmonized dataset root prefix. False Assigned by harmonization code text
10 hour 242989 24 0 13.318879 6.738351 0 23 00.Date and Time Incident hour True hour, extracted from CrimeTime enum,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17...
11 insideoutside 238648 4 4341 NaN NaN I Outside 90.Baltimore (Unharmonized) Incident occurred inside or outside True Source Inside/Outside. Variable value unchange... enum,I,Inside,O,Outside
12 location 242983 25759 6 NaN NaN "2900 O""DONNELL ST" YORK RD & E 41ST ST 04.Location Incident location/address True Source Location. Variable value unchanged from... text
13 minute 242989 60 0 18.990201 18.285722 0 59 00.Date and Time Incident minute True minute, extracted from CrimeTime range,0,59,1
14 month 242989 12 0 6.661849 3.363484 1 12 00.Date and Time Incident month True month, extracted from CrimeDate enum,1,2,3,4,5,6,7,8,9,10,11,12
15 neighborhood 242005 278 984 NaN NaN Abell York-Homeland 90.Baltimore (Unharmonized) unknown True Source Neighborhood. Variable value unchanged ... text
16 notebookhtml 242989 1 0 NaN NaN https://s3.amazonaws.com/datasearch-blog-jupyt... https://s3.amazonaws.com/datasearch-blog-jupyt... 99.Miscellaneous URL to Jupyter notebook containing documentati... False Assigned by harmonization code text
17 post 242012 148 977 NaN NaN 0 943 90.Baltimore (Unharmonized) unknown True Source Post. Variable value unchanged from sou... text
18 premise 238174 203 4815 NaN NaN ALLEY YARD/BUSINESS 90.Baltimore (Unharmonized) unknown True Source Premise. Variable value unchanged from ... text
19 rawdatapath 242989 1 0 NaN NaN s3://datasearch-blog-jupyterspark-180dxejofo-e... s3://datasearch-blog-jupyterspark-180dxejofo-e... 99.Miscellaneous S3 Path to raw dataset. False Assigned by harmonization code text
20 totalincidents 242989 1 0 NaN NaN 1 1 90.Baltimore (Unharmonized) unknown True Source Total Incidents. Variable value unchang... text
21 weapon 82236 4 160753 NaN NaN FIREARM OTHER 90.Baltimore (Unharmonized) unknown True Source Weapon. Variable value unchanged from s... text
22 year 242989 6 0 2014.001255 1.436607 2012 2017 00.Date and Time Incident year True Source Year. year, extracted from CrimeDate range,2000,2017,1

Publish Notebook

Save the notebook using javascript to trigger the save_checkpoint method.

What does this do

Convert the notebook .ipynb to HTML, and use hz.publishNotebookToS3 to copy the .ipynb and .html files to the target S3 folder with web access enabled.

Why are we doing this

This provides a record within the UI of all the harmonization logic used to transform the raw data into what is exposed through the search and discovery tool. This record allows for easier verification, enhancements, or modifications of harmonization routines.
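For reference, a minimal sketch of what a publishNotebookToS3-style helper might do (hypothetical; the actual helper is defined in the hz class), mirroring the aws s3 cp calls used elsewhere in this notebook:

import subprocess

# Hypothetical sketch: copy the saved notebook files to S3 with server-side encryption and public-read access
def publish_notebook_sketch(outputpath_doc, notebook):
    for ext in [".ipynb", ".html"]:
        src = notebook + ext
        dst = "{0}/{1}".format(outputpath_doc, src)
        print("Copy {0} to {1}".format(src, dst))
        cmd = "aws s3 cp {0} {1} --sse --grants read=uri=http://acs.amazonaws.com/groups/global/AllUsers".format(src, dst)
        subprocess.check_output(cmd.split())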

In [30]:
%%html
<!-- SHOW / HIDE CODE TOGGLE -->
<script>
    var code_hide=true; //true -> hide code at first
    function code_showhide_toggle() {
        if (code_hide){
            $('div.input').hide();
            $('div.prompt').hide();
        } else {
            $('div.input').show();
            $('div.prompt').show();
        }
        code_hide = !code_hide
    }
    $( document ).ready(code_showhide_toggle);
</script>
In [31]:
!date
Fri Jan 20 20:36:49 UTC 2017
In [32]:
%%javascript
// save current notebook
IPython.notebook.save_checkpoint()
In [45]:
# convert ipynb to html, and move into subfolder
!jupyter nbconvert --to html $citynotebook
[NbConvertApp] Converting notebook Baltimore-notebook.ipynb to html
[NbConvertApp] Writing 381256 bytes to Baltimore-notebook.html
Moving Baltimore-notebook.html to ./html/Baltimore-notebook.html
In [34]:
# copy saved notebook (ipynb and html formats) to target S3 bucket
hz.publishNotebookToS3(outputpath_doc, notebook_urlbase, citynotebook) 
# move html copy of notebook into subfolder
f = citynotebook + ".html"
!mkdir -p ./html
os.rename(f, "./html/" + f)   # move the html file into the ./html subfolder
Copy Baltimore-notebook.ipynb to s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/docs/Baltimore-notebook.ipynb
URL: https://s3.amazonaws.com/datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/docs/Baltimore-notebook.ipynb
Copy Baltimore-notebook.html to s3://datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/docs/Baltimore-notebook.html
URL: https://s3.amazonaws.com/datasearch-blog-jupyterspark-180dxejofo-emrbucket-1fvo1qv4b0l67/crimedata/harmonized/Baltimore/docs/Baltimore-notebook.html
In [35]:
print("Notebook execution complete.")
Notebook execution complete.
In [ ]:
# (scratch cell, not executed) underlying aws s3 cp command used to publish files with public web access;
# the %s placeholders stand for the source file and destination S3 path
!aws s3 cp %s %s --sse --grants read=uri=http://acs.amazonaws.com/groups/global/AllUsers