Harmonize and Index Detroit Crime Incident Dataset¶

Introduction¶

This notebook will illustrate use of Jupyter with PySpark to

  • harmonize crime datasets from multiple jurisdictions
  • add metadata to support a data driven search UI
  • create a data dictionary with metadata for each variable
  • save data and metadata as Spark SQL tables backed by (Athena compatible) Parquet files in S3
  • index data and metadata to support search
  • self-publish this notebook so it can be linked from the search UI to provide transparency and reproducibility

Setup¶

What does this do¶

This cell sets the configuration variables used for data input, output, and indexing.

In [27]:
# CONFIGURATION VARIABLES
import os  # needed for os.environ below and os.remove in the download cell
city="Detroit"
cityurl="https://data.detroitmi.gov/api/views/6gdg-y3kf/rows.csv?accessType=DOWNLOAD"
citynotebook="Detroit-notebook" # citynotebook is used to automatically publish the notebook at the end


# location for raw (unprocessed) dataset in S3
scratch_bucket=os.environ["S3_SCRATCH_BUCKET"]
datasets3 = "s3://{0}/crimedata/raw/{1}.csv".format(scratch_bucket, city)

# location for harmonized (processed) dataset in S3. Structure is:
#   outputroot
#       |--- data - incidents - multiple Parquet files
#       |--- dictionary - Parquet file containing data dictionary
#       |--- docs - copy of notebook (published for web access)
outputroot = "s3://{0}/crimedata/harmonized/{1}".format(scratch_bucket,city)
outputpath_data = "{}/data".format(outputroot)
outputpath_dictionary="{}/dictionary".format(outputroot)
outputpath_doc="{}/docs".format(outputroot)
notebook_urlbase = "https://s3.amazonaws.com/{0}/{1}".format(outputpath_doc.replace("s3://",""), citynotebook)

# elasticsearch cluster endpoint
# - use local proxy service (installed via an EMR bootstrap) which signs ES API calls using EMR EC2 role.
esendpoint="localhost"
esport=9200

# Summary of configuration
print("City: {}".format(city))
print("Notebook Name: {}".format(citynotebook))
print("Dataset URL: {}".format(cityurl))
print("S3 dataset input: {}".format(datasets3))
print("Harmonized output: {}".format(outputroot))
print("ElasticSearch Cluster: {}.{}".format(esendpoint, esport))
print("Setup & Initialization done")
City: Detroit
Notebook Name: Detroit-notebook
Dataset URL: https://data.detroitmi.gov/api/views/6gdg-y3kf/rows.csv?accessType=DOWNLOAD
S3 dataset input: s3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/raw/Detroit.csv
Harmonized output: s3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit
ElasticSearch Cluster: localhost.9200
Setup & Initialization done
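
The path layout built in the configuration cell can be sketched as a small helper, which makes it easier to reuse for other cities. This is a minimal sketch, not part of the notebook's library: `build_paths` is a hypothetical name, and `example-bucket` is a placeholder used only when `S3_SCRATCH_BUCKET` is not set.

```python
import os


def build_paths(bucket, city, notebook):
    """Return the S3 locations used for one city's raw and harmonized data."""
    raw = "s3://{0}/crimedata/raw/{1}.csv".format(bucket, city)
    root = "s3://{0}/crimedata/harmonized/{1}".format(bucket, city)
    docs = "{0}/docs".format(root)
    return {
        "raw": raw,
        "root": root,
        "data": "{0}/data".format(root),
        "dictionary": "{0}/dictionary".format(root),
        "docs": docs,
        # public URL base for the published notebook (no file extension)
        "notebook_urlbase": "https://s3.amazonaws.com/{0}/{1}".format(
            docs.replace("s3://", "", 1), notebook),
    }


# "example-bucket" is a placeholder default, not a real bucket
bucket = os.environ.get("S3_SCRATCH_BUCKET", "example-bucket")
paths = build_paths(bucket, "Detroit", "Detroit-notebook")
for name in sorted(paths):
    print("{0}: {1}".format(name, paths[name]))
```

Using `os.environ.get` with a default avoids a `KeyError` when experimenting outside the EMR cluster. On the cluster itself, the hard failure of `os.environ[...]` may be preferable, because it catches a missing bucket early.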
In [28]:
# INITIALIZATION - instantiate objects used by notebook

%matplotlib inline
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from IPython.display import display, HTML
import datetime
import subprocess

# elasticsearch and harmonization objects defined in ./lib/esindex.py and ./lib/harmonizeCrimeIncidents.py
from lib.esindex import esindex
from lib.harmonizeCrimeIncidents import harmonizeCrimeIncidents

sc = SparkContext.getOrCreate()
hc = HiveContext(sc)
es = esindex(esendpoint)
hz = harmonizeCrimeIncidents(hc)
print("Initialization complete.")
Initialization complete.

Load Input Data¶

What does this do¶

This takes a data source file, in our case a CSV file, and puts it into our S3 bucket for processing. Once the source file is copied into the bucket, we load it into a Spark DataFrame for analysis and processing.

In [29]:
# Download & install dataset from website

def copy_from_website_to_s3(city, cityurl, datasets3):
    tmpfile="/tmp/{0}.csv".format(city)
    print("Downloading {0} from: {1}".format(tmpfile, cityurl))
    subprocess.check_output("curl {0} -o {1}".format(cityurl, tmpfile).split())
    print("Copying {0} to: {1}".format(tmpfile, datasets3))
    subprocess.check_output("aws s3 cp {0} {1} --sse AES256".format(tmpfile, datasets3).split())
    os.remove(tmpfile)
    
# comment out the next line to skip re-downloading the dataset
copy_from_website_to_s3(city, cityurl, datasets3)
Downloading /tmp/Detroit.csv from: https://data.detroitmi.gov/api/views/6gdg-y3kf/rows.csv?accessType=DOWNLOAD
Copying /tmp/Detroit.csv to: s3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/raw/Detroit.csv
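
The download cell builds each command as one string and splits it on whitespace, which works here but would break on a URL or path containing spaces. One alternative, sketched below, is to build the argument lists directly. `build_copy_commands` is a hypothetical helper, and the extra `--fail` and `--location` curl options (fail on HTTP errors, follow redirects) are an addition that the notebook does not use. The sketch only prints the commands; it does not run them.

```python
def build_copy_commands(city, cityurl, datasets3):
    """Return (tmpfile, curl_cmd, s3_cmd) as argument lists for subprocess."""
    tmpfile = "/tmp/{0}.csv".format(city)
    # each argument is its own list element, so no string splitting is needed
    curl_cmd = ["curl", "--fail", "--location", cityurl, "-o", tmpfile]
    s3_cmd = ["aws", "s3", "cp", tmpfile, datasets3, "--sse", "AES256"]
    return tmpfile, curl_cmd, s3_cmd


tmpfile, curl_cmd, s3_cmd = build_copy_commands(
    "Detroit",
    "https://data.detroitmi.gov/api/views/6gdg-y3kf/rows.csv?accessType=DOWNLOAD",
    "s3://example-bucket/crimedata/raw/Detroit.csv",  # placeholder bucket
)
print(curl_cmd)
print(s3_cmd)
```

Each list can be passed unchanged to `subprocess.check_output(...)`, as the notebook cell does with its split strings.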
In [30]:
# Read input datasets as DataFrames
# No need to infer schema - all variables initially typed as strings
print("Loading dataset from S3")
df = hc.read.load( datasets3,
                   format='com.databricks.spark.csv',
                   header='true',
                   inferSchema='false',
                   delimiter=',') 
df_in=df # keep copy of raw data for reference
print("Dataset {0}: Loaded {1} rows.".format(city, df.count()))
Loading dataset from S3
Dataset Detroit: Loaded 96812 rows.

Exploratory Analysis¶

Edit as desired to interactively explore the data

What does this do¶

With the data source file loaded into a Spark DataFrame, you can inspect its content using methods from Spark, Pandas, and other libraries. This step is purely exploratory and gives users insight into the raw dataset.

In [31]:
# examine column headers
str(df.columns)
Out[31]:
"['Crime ID', 'Report #', 'Incident Address', 'Offense Description', 'Offense Category', 'State Offense Code', 'Incident Date & Time', 'Incident Time (24h)', 'Day of Week (Sunday is 1)', 'Hour of Day', 'Year', 'Scout Car Area', 'Precinct Number', 'Census Block GEOID', 'Neighborhood', 'Council District', 'Zip Code', 'Longitude', 'Latitude', 'IBR Report Date', 'Location', 'uniq']"
In [32]:
# look at first 2 rows
df.head(2)
Out[32]:
[Row(Crime ID=u'3027833', Report #=u'1702210072', Incident Address=u'18400 block of W 8 MILE RD', Offense Description=u'ASSAULT AND BATTERY/SIMPLE ASSAULT', Offense Category=u'DAMAGE TO PROPERTY', State Offense Code=u'2900', Incident Date & Time=u'02/20/2017 11:00:00 PM', Incident Time (24h)=u'2300', Day of Week (Sunday is 1)=u'2', Hour of Day=u'23', Year=u'2017', Scout Car Area=u'0803', Precinct Number=u'08', Census Block GEOID=u'261635408002000', Neighborhood=None, Council District=u'1', Zip Code=u'48219', Longitude=u'-83.22352', Latitude=u'42.44409', IBR Report Date=u'02/23/2017 03:48:59 PM', Location=u'location', uniq=None),
 Row(Crime ID=u'(42.44409', Report #=u' -83.22352)"', Incident Address=u'1', Offense Description=None, Offense Category=None, State Offense Code=None, Incident Date & Time=None, Incident Time (24h)=None, Day of Week (Sunday is 1)=None, Hour of Day=None, Year=None, Scout Car Area=None, Precinct Number=None, Census Block GEOID=None, Neighborhood=None, Council District=None, Zip Code=None, Longitude=None, Latitude=None, IBR Report Date=None, Location=None, uniq=None)]
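
The second row above looks like the tail of a coordinate pair rather than a real incident. A likely explanation, which we assume here rather than verify against the raw file, is that the quoted Location field contains an embedded newline and the reader treats each physical line as a record. The sketch below uses a made-up two-column sample and Python's standard `csv` module to show the difference between splitting on lines and parsing quoted fields.

```python
import csv
import io

# Made-up sample: the Location value is quoted and spans two physical lines
sample = (
    'Crime ID,Location\n'
    '3027833,"location\n(42.44409, -83.22352)"\n'
)

# Naive approach: every physical line becomes a "record"
naive_rows = sample.strip().split("\n")

# csv.reader keeps a quoted multi-line value inside a single record
parsed_rows = list(csv.reader(io.StringIO(sample)))

print("physical lines: {0}".format(len(naive_rows)))
print("parsed records: {0}".format(len(parsed_rows)))
print("fragment row fields: {0}".format(naive_rows[2].split(",")))
```

If this is what happened, roughly every other row in the DataFrame would be a fragment with no usable coordinates, which is worth keeping in mind when reading the row counts later in the notebook.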
In [33]:
# look at the distinct values for 'OFFENSE CATEGORY'
df.select('OFFENSE CATEGORY').distinct().show(truncate=False)
+----------------------+
|OFFENSE CATEGORY      |
+----------------------+
|FRAUD                 |
|OUIL                  |
|WEAPONS OFFENSES      |
|LIQUOR                |
|null                  |
|STOLEN VEHICLE        |
|SOLICITATION          |
|ARSON                 |
|DAMAGE TO PROPERTY    |
|OBSTRUCTING THE POLICE|
|AGGRAVATED ASSAULT    |
|GAMBLING              |
|MISCELLANEOUS         |
|ASSAULT               |
|DANGEROUS DRUGS       |
|EXTORTION             |
|FORGERY               |
|SEX OFFENSES          |
|ROBBERY               |
|LARCENY               |
+----------------------+
only showing top 20 rows

In [34]:
# Graph incident count by offense category
descr = df.select('OFFENSE CATEGORY').toPandas()
descrGrp = descr.groupby('OFFENSE CATEGORY').size().rename('counts')
descrPlot = descrGrp.plot(kind='bar')

Harmonize Variables¶

Standard harmonized variables for crime incident datasets are defined in ./lib/harmonizeCrimeIncidents.py. (You can open this module in Jupyter and modify the variable list and the associated standard variable metadata as required.)

What does this do¶

Harmonization is the process of mapping the raw variables of each incoming dataset to use the standard 'harmonized' variables and associated units of measurement, as much as possible. Harmonized datasets support cross-dataset search as well as the ability to combine/union datasets to perform multi dataset analysis and research.

See examples below for how to generate new variables from existing variables, and how to manipulate variable values. The hz (harmonizeCrimeIncidents) class provides methods to help abstract the code for some common actions.

Why are we doing this¶

A core challenge when combining loosely coupled datasets in a single search index is dealing with different names for the same attribute, for example "Sex" versus "Gender", or different representations of the same value, such as "48in" versus "4ft". We have a predefined set of standard variable names and types that we use for our search page. The harmonization process ensures that attributes and values in the raw data files match that predefined set, which allows for a consistent search tool across multiple datasets.

In [35]:
# Use hz.mapVar(old,new, keepOrig=False) to create new variables from the original variables, by default dropping 
# the original variable. Use 'keepOrig=True' argument to keep the original variable in the dataset.
# Metadata for the transformation will be captured and included in the data dictionary

df = hz.mapVar(df, "INCIDENT ADDRESS", "location")
df = hz.mapVar(df, "OFFENSE CATEGORY", "description", keepOrig=True)  # make a copy of CATEGORY variable, keeping original

# Rename any variables that have illegal names 
# no illegal characters or spaces (required to support parquet output format)
# all lowercase variable names (required to provide compatibility with Amazon Athena)
df = hz.makeValidVariableNames(df)
New variable <location> created from <INCIDENT ADDRESS>
Dropped variable <INCIDENT ADDRESS>
New variable <description> created from <OFFENSE CATEGORY>
New variable <crimeid> created from <Crime ID>
Dropped variable <Crime ID>
New variable <report#> created from <Report #>
Dropped variable <Report #>
New variable <offensedescription> created from <Offense Description>
Dropped variable <Offense Description>
New variable <offensecategory> created from <Offense Category>
Dropped variable <Offense Category>
New variable <stateoffensecode> created from <State Offense Code>
Dropped variable <State Offense Code>
New variable <incidentdate&time> created from <Incident Date & Time>
Dropped variable <Incident Date & Time>
New variable <incidenttime24h> created from <Incident Time (24h)>
Dropped variable <Incident Time (24h)>
New variable <dayofweeksundayis1> created from <Day of Week (Sunday is 1)>
Dropped variable <Day of Week (Sunday is 1)>
New variable <hourofday> created from <Hour of Day>
Dropped variable <Hour of Day>
New variable <year> created from <Year>
New variable <scoutcararea> created from <Scout Car Area>
Dropped variable <Scout Car Area>
New variable <precinctnumber> created from <Precinct Number>
Dropped variable <Precinct Number>
New variable <censusblockgeoid> created from <Census Block GEOID>
Dropped variable <Census Block GEOID>
New variable <neighborhood> created from <Neighborhood>
New variable <councildistrict> created from <Council District>
Dropped variable <Council District>
New variable <zipcode> created from <Zip Code>
Dropped variable <Zip Code>
New variable <longitude> created from <Longitude>
New variable <latitude> created from <Latitude>
New variable <ibrreportdate> created from <IBR Report Date>
Dropped variable <IBR Report Date>
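
The log above suggests what the renaming does: names are lowercased, and spaces and parentheses are removed, while `#` and `&` are kept. The sketch below reproduces that observed behavior in plain Python. It is inferred from the log output only; the actual implementation of `hz.makeValidVariableNames` is not shown in this notebook and may differ, and `make_valid_name` is a hypothetical name.

```python
import re


def make_valid_name(name):
    """Lowercase a column name and strip whitespace and parentheses."""
    return re.sub(r"[\s()]", "", name).lower()


raw_names = [
    "Crime ID",
    "Report #",
    "Incident Date & Time",
    "Incident Time (24h)",
    "Day of Week (Sunday is 1)",
]
for raw in raw_names:
    print("{0!r} -> {1!r}".format(raw, make_valid_name(raw)))
```

Note that `report#` and `incidentdate&time` still contain punctuation after renaming, so they need backtick quoting wherever they are used in SQL.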
In [36]:
# Harmonize Description variable to standard values.
descrMappings = {
    "ABORTION" : "Miscellaneous",
    "AGGRAVATED ASSAULT" : "Assault",
    "ARSON" : "Arson",
    "ASSAULT" : "Assault",
    "BRIBERY" : "Bribery",
    "BURGLARY" : "Burglary",
    "CIVIL" : "Miscellaneous",
    "DAMAGE TO PROPERTY" : "Miscellaneous",
    "DANGEROUS DRUGS" : "Narcotics",
    "DISORDERLY CONDUCT" : "Miscellaneous",
    "DRUNKENNESS" : "OUI",
    "EMBEZZLEMENT" : "Embezzlement",
    "ENVIRONMENT" : "Miscellaneous",
    "ESCAPE" : "Miscellaneous",
    "EXTORTION" : "Miscellaneous",
    "FAMILY OFFENSE" : "Miscellaneous",
    "FORGERY" : "Miscellaneous",
    "FRAUD" : "Fraud",
    "GAMBLING" : "Miscellaneous",
    "HOMICIDE" : "Homicide",
    "IMMIGRATION" : "Miscellaneous",
    "JUSTIFIABLE HOMICIDE" : "Homicide",
    "KIDNAPPING" : "Kidnapping",
    "KIDNAPING" : "Kidnapping",
    "LARCENY" : "Theft",
    "LIQUOR" : "Miscellaneous",
    "MILITARY" : "Miscellaneous",
    "MISCELLANEOUS" : "Miscellaneous",
    "MISCELLANEOUS ARREST" : "Miscellaneous",
    "MURDER/INFORMATION" : "Homicide",
    "NEGLIGENT HOMICIDE" : "Homicide",
    "OBSCENITY" : "Miscellaneous",
    "OBSTRUCTING JUDICIARY" : "Miscellaneous",
    "OBSTRUCTING THE POLICE" : "Miscellaneous",
    "OTHER" : "Miscellaneous",
    "OTHER BURGLARY" : "Burglary",
    "OUIL" : "OUI",
    "ROBBERY" : "Robbery",
    "RUNAWAY" : "Miscellaneous",
    "SOLICITATION" : "Miscellaneous",
    "STOLEN PROPERTY" : "Theft",
    "STOLEN VEHICLE" : "Vehicle Theft",
    "SEXUAL ASSAULT" : "Sex Offenses",
    "SEX OFFENSES" : "Sex Offenses",
    "TRAFFIC" : "Miscellaneous",
    "TRAFFIC OFFENSES" : "Miscellaneous",
    "VAGRANCY (OTHER)" : "Miscellaneous",
    "WEAPONS OFFENSES" : "Weapons"
}
df = hz.mapValues(df, "description", descrMappings)
Values for description converted per supplied mapping
In [37]:
df.select("description").distinct().show()
+-------------+
|  description|
+-------------+
|Miscellaneous|
|Vehicle Theft|
|      Assault|
|      Robbery|
| Embezzlement|
|         null|
|        Theft|
|      Weapons|
|        Arson|
|        Fraud|
|     Homicide|
|    Narcotics|
|     Burglary|
|   Kidnapping|
| Sex Offenses|
|          OUI|
+-------------+
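
The value mapping itself is a dictionary lookup per row. The plain-Python sketch below shows the idea on a list of dicts, together with a check for raw values that have no entry in the mapping. `map_values` and `unmapped_values` are hypothetical helpers, and leaving unmapped values unchanged is an assumption; the notebook does not show how `hz.mapValues` treats them.

```python
def map_values(rows, column, mapping):
    """Return copies of rows with rows[column] replaced via mapping.

    Assumption: values missing from the mapping (and None) are left unchanged.
    """
    mapped = []
    for row in rows:
        new_row = dict(row)
        value = new_row.get(column)
        if value in mapping:
            new_row[column] = mapping[value]
        mapped.append(new_row)
    return mapped


def unmapped_values(rows, column, mapping):
    """Return the set of non-null raw values that the mapping does not cover."""
    return set(r[column] for r in rows
               if r.get(column) is not None and r[column] not in mapping)


sample_rows = [
    {"description": "LARCENY"},
    {"description": "OUIL"},
    {"description": None},
    {"description": "SOMETHING NEW"},  # made-up value, not in the mapping
]
sample_mapping = {"LARCENY": "Theft", "OUIL": "OUI"}

print(map_values(sample_rows, "description", sample_mapping))
print(unmapped_values(sample_rows, "description", sample_mapping))
```

Running a check like `unmapped_values` before indexing helps catch new raw categories that would otherwise appear in the search UI next to the harmonized ones.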

In [38]:
# Add city variable. All rows are given the value of the current city name
df = df.withColumn('city', lit(city)) 

# Add TransformDescr metadata field
hz.addTransformDescr('city','"city" assigned by harmonization code')
print("Add 'city' variable")
Add 'city' variable

Clean up and harmonize the geographic coordinate variables. As we can see below, both the latitude and longitude variables have rows with invalid values: empty strings and invalid coordinates. For this example we simply remove those rows from the dataset, to avoid constructing invalid geo-point coordinates that would cause failures when indexing into Elasticsearch later.

In [39]:
df.describe('latitude','longitude').show()
+-------+------------------+------------------+
|summary|          latitude|         longitude|
+-------+------------------+------------------+
|  count|             48406|             48406|
|   mean| 42.32597320476823|-83.38699325476185|
| stddev|0.8142596473263913|3.5176884111315285|
|    min|          32.02636|        -127.91418|
|    max|          42.47535|         -83.30388|
+-------+------------------+------------------+

In [40]:
# filter out records with invalid LAT or LON coordinates
c1 = df.count()
# eliminate empty & null values
df = df.where(length('latitude') > 0).where(length('longitude') > 0)
# eliminate 99999* values
df = df.where(df.latitude < 99999).where(df.longitude < 99999)
df = df.where(df.latitude > 0)  # Detroit latitude always > 0
df = df.where(df.longitude < 0)  # Detroit longitude always < 0
c2 = df.count()
print("Deleted {} rows with corrupted coordinates in LATITUDE and LONGITUDE".format(c1-c2))
Deleted 48406 rows with corrupted coordinates in LATITUDE and LONGITUDE
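
The same filter rules can be expressed as a single predicate on string values, which is convenient for testing them outside Spark. This is a minimal sketch that mirrors the conditions in the cell above; `is_valid_detroit_coord` is a hypothetical name.

```python
def is_valid_detroit_coord(lat, lon):
    """Mirror the notebook's filters: non-empty, numeric, lat in (0, 99999), lon < 0."""
    try:
        lat_f, lon_f = float(lat), float(lon)
    except (TypeError, ValueError):
        # None, empty strings, and non-numeric text are all rejected
        return False
    return 0 < lat_f < 99999 and lon_f < 0


candidates = [
    ("42.44409", "-83.22352"),   # valid Detroit coordinate from the sample row
    ("", "-83.22352"),           # empty latitude
    (None, None),                # missing values
    ("99999", "-83.22352"),      # placeholder-style value
    ("42.44409", "83.22352"),    # wrong sign on longitude
]
for lat, lon in candidates:
    print("{0!r}, {1!r} -> {2}".format(lat, lon, is_valid_detroit_coord(lat, lon)))
```

These rules are deliberately loose. The summary statistics above include a minimum latitude of 32.02636 and a minimum longitude of -127.91418, both of which pass the filter although they are far from Detroit, so a tighter bounding box may be worth considering.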
In [41]:
# Format GeoLocation field by combining LAT and LON
df = df.withColumn('geolocation', concat(df.latitude, lit(','),df.longitude)) 
df = df.drop('latitude').drop('longitude')
hz.addTransformDescr('geolocation','geolocation variable created from LAT and LON variables')
print("Generated Harmonized geolocation variable, and dropped original LAT and LON variables")
Generated Harmonized geolocation variable, and dropped original LAT and LON variables
In [42]:
# Generate standard datetime, date part, and time part fields from raw incidentdate and hour fields 
# for simplicity we will keep all times in localtime (ignoring timezones)

# Split incidentdate&time into month, day, year (all defined as harmonized variables in the hz class)
# incidentdate&time is formatted as month/day/year, e.g. 02/20/2017
dateregex = r'(\d+)/(\d+)/(\d+)'
df = df.withColumn('month', regexp_extract('incidentdate&time', dateregex, 1))
hz.addTransformDescr('month','month, extracted from incidentdate')
df = df.withColumn('day', regexp_extract('incidentdate&time', dateregex, 2))
hz.addTransformDescr('day','day, extracted from incidentdate')
df = df.withColumn('year', regexp_extract('incidentdate&time', dateregex, 3))
hz.addTransformDescr('year','year, extracted from incidentdate')

# There is no Incident Time field.. Only hour.
df = df.withColumn('hour', format_string("%02d",df['hourofday'].cast('int')))    # 2 digit hour
hz.addTransformDescr('hour','hour, derived from hourofday')
df = df.withColumn('minute', lit('00'))  # Hardcode minute to 00
hz.addTransformDescr('minute','minute, unrecorded in dataset - set to 00')

# Create new datetime field in format YYYY-MM-DD hh:mm:ss (defined as a harmonized variable in the hz class)
df = df.withColumn('datetime',concat(concat_ws('-',df.year,df.month,df.day),lit(' '),concat_ws(':',df.hour,df.minute,lit('00'))).cast("timestamp"))
hz.addTransformDescr('datetime','Full timestamp with date and time, eg 2007-04-05 14:30')  

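# Drop the original incidentdate field - no longer needed.
# NOTE: the column is actually named 'incidentdate&time', so this drop matches nothing
# and the original column is retained (it still appears in the output further down).
df = df.drop('incidentdate')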

# Cast all the date time part fields from string to int (allows use of numeric range filters in the search UI)
for col in ['year','month','day','hour','minute']:
    df = df.withColumn(col, df[col].cast("int"))

# Add dayofweek variable, eg Monday, Tuesday, etc. (defined as a harmonized variable in the hz class)
df = df.withColumn('dayofweek',date_format('datetime', 'EEEE'))
hz.addTransformDescr('dayofweek','day of Week, calculated from datetime')  

print("Harmonized date & time variables")
Harmonized date & time variables
In [43]:
df.select('datetime').show(2,truncate=False)
+---------------------+
|datetime             |
+---------------------+
|2017-02-20 23:00:00.0|
|2017-02-20 23:00:00.0|
+---------------------+
only showing top 2 rows
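
To check the date logic without Spark, the same derivation can be written with Python's standard `datetime` module. This is a sketch under the same simplifications as the cell above: the date part is read as month/day/year, the hour comes from the separate hour-of-day value, and minutes and seconds are set to zero. `harmonize_datetime` is a hypothetical helper, and the day names are listed explicitly so that the result does not depend on the system locale.

```python
from datetime import datetime

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]


def harmonize_datetime(incident_date_time, hour_of_day):
    """Build the harmonized date and time parts from the raw string fields."""
    # keep only the date part, e.g. '02/20/2017' from '02/20/2017 11:00:00 PM'
    date_part = incident_date_time.split(" ", 1)[0]
    parsed = datetime.strptime(date_part, "%m/%d/%Y")
    stamp = datetime(parsed.year, parsed.month, parsed.day, int(hour_of_day), 0, 0)
    return {
        "year": stamp.year,
        "month": stamp.month,
        "day": stamp.day,
        "hour": stamp.hour,
        "minute": stamp.minute,
        "datetime": stamp.strftime("%Y-%m-%d %H:%M:%S"),
        "dayofweek": DAY_NAMES[stamp.weekday()],
    }


result = harmonize_datetime("02/20/2017 11:00:00 PM", "23")
print(result["datetime"], result["dayofweek"])
```

For the sample incident this gives the same timestamp as the Spark output above (2017-02-20 23:00:00).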

In [44]:
# Add dataset descriptive variables (defined in hz class). 

# location of raw data
df = df.withColumn('rawdatapath', lit(datasets3))
hz.addTransformDescr('rawdatapath','Assigned by harmonization code')

# location of harmonized data
df = df.withColumn('harmonizeddatapath', lit(outputroot))
hz.addTransformDescr('harmonizeddatapath','Assigned by harmonization code')

# URL for this notebook (notebook will be saved/published using cells at the end of the notebook)
df = df.withColumn('notebookhtml', lit(notebook_urlbase + ".html"))
hz.addTransformDescr('notebookhtml','Assigned by harmonization code')

print("Added dataset descriptive variables")
Added dataset descriptive variables

Add metadata for additional variables¶

Create some additional variables for this specific data set.

Here we can assign a default 'vargroup' for all variables that are not part of the defined list of harmonized variables in the hz class, using the hz.addVarGroup() method. (The 'vargroup' is used by the search UI to group variables under 'accordion' panels in the search page sidebar.)

We can also assign custom metadata to selected unharmonized variables as desired. NOTE:

  • default metadata for harmonized variables is defined in the hz class already, so we can ignore those
  • unharmonized variables will be assigned default metadata automatically when we build the data dictionary below using hz.buildDataDict(). However, you might want to explicitly assign metadata to selected variables to control the search UI widget type, and/or to add descriptions to the dictionary.

What does this do¶

This process adds customized search parameters to this specific data set in the UI that aren't part of our standard set.

Why are we doing this¶

Not every variable in a dataset is valuable on its own. Some need additional logic, or need to be combined with other variables, to make search and discovery a better experience. This process allows the harmonization routine to apply that logic and expose the result in the UI.

In [45]:
# vargroups are used to define the Search Filter UI 'accordions' and their order
# set default variable group - used if variable is not explicitly assigned to a group
defaultVarGroup = "{0} (Unharmonized)".format(city)
hz.addVarGroup(defaultVarGroup,order=90,default=True)

# Metadata for the harmonized variables is already defined in 'hz' class.
# We can optionally add metadata here for additional (non harmonized) variables to control
# how they are presented in the data dictionary and search UI. 
# See Baltimore or LosAngeles notebooks for examples
In [46]:
df.head(2)
Out[46]:
[Row(description=u'Miscellaneous', year=2017, neighborhood=None, location=u'18400 block of W 8 MILE RD', uniq=None, crimeid=u'3027833', report#=u'1702210072', offensedescription=u'ASSAULT AND BATTERY/SIMPLE ASSAULT', offensecategory=u'DAMAGE TO PROPERTY', stateoffensecode=u'2900', incidentdate&time=u'02/20/2017 11:00:00 PM', incidenttime24h=u'2300', dayofweeksundayis1=u'2', hourofday=u'23', scoutcararea=u'0803', precinctnumber=u'08', censusblockgeoid=u'261635408002000', councildistrict=u'1', zipcode=u'48219', ibrreportdate=u'02/23/2017 03:48:59 PM', city=u'Detroit', geolocation=u'42.44409,-83.22352', month=2, day=20, hour=23, minute=0, datetime=datetime.datetime(2017, 2, 20, 23, 0), dayofweek=u'Monday', rawdatapath=u's3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/raw/Detroit.csv', harmonizeddatapath=u's3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit', notebookhtml=u'https://s3.amazonaws.com/datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit/docs/Detroit-notebook.html'),
 Row(description=u'Assault', year=2017, neighborhood=None, location=u'18400 block of W 8 MILE RD', uniq=None, crimeid=u'3027833', report#=u'1702210072', offensedescription=u'ASSAULT AND BATTERY/SIMPLE ASSAULT', offensecategory=u'ASSAULT', stateoffensecode=u'1301', incidentdate&time=u'02/20/2017 11:00:00 PM', incidenttime24h=u'2300', dayofweeksundayis1=u'2', hourofday=u'23', scoutcararea=u'0803', precinctnumber=u'08', censusblockgeoid=u'261635408002000', councildistrict=u'1', zipcode=u'48219', ibrreportdate=u'02/23/2017 03:48:59 PM', city=u'Detroit', geolocation=u'42.44409,-83.22352', month=2, day=20, hour=23, minute=0, datetime=datetime.datetime(2017, 2, 20, 23, 0), dayofweek=u'Monday', rawdatapath=u's3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/raw/Detroit.csv', harmonizeddatapath=u's3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit', notebookhtml=u'https://s3.amazonaws.com/datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit/docs/Detroit-notebook.html')]

Generate Dictionary¶

Generate a dictionary table containing field distribution statistics and metadata from the mappings.

What does this do¶

All variables that a) don't match standard harmonized variables, and b) don't have added metadata will be assigned to the default vargroup, and the variable type will be derived from the data distribution characteristics, calculated by hz.buildDataDict().

Why are we doing this¶

The data dictionary is used to generate dynamic search widgets and tools based on the datasets themselves. Because the widgets are derived from the data rather than hard-coded, the search UI updates automatically as the available data changes.

In [47]:
df_dict = hz.buildDataDict(df)
print("Data Dictionary created.")
Data Dictionary created.
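
The dictionary holds one row of summary statistics per variable. To give a feel for what such a row contains, here is a plain-Python sketch that computes a few of the statistics for a single column. The key names follow the `dict_*` columns of the dictionary table created later in this notebook, but `describe_column` is a hypothetical helper and the way `hz.buildDataDict` actually computes these values is not shown here.

```python
def describe_column(name, values):
    """Compute basic distribution statistics for one column of values."""
    present = [v for v in values if v is not None]
    entry = {
        "dict_field": name,
        "dict_count": len(present),
        "dict_countdistinct": len(set(present)),
        "dict_countmissing": len(values) - len(present),
        "dict_min": min(present) if present else None,
        "dict_max": max(present) if present else None,
        "dict_mean": None,
    }
    # a mean is only meaningful when every present value is numeric
    if present and all(isinstance(v, (int, float)) for v in present):
        entry["dict_mean"] = sum(present) / float(len(present))
    return entry


print(describe_column("hour", [23, 23, 17, None]))
print(describe_column("description", ["Arson", "Theft", "Theft"]))
```

Statistics such as the distinct count and the min/max range are the kind of information a search UI can use to choose between, say, a checkbox list and a range slider for a variable.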

Save Data and Dictionary¶

Use the HiveContext object 'hc' to create a new schema for this city, and save the data dataframe and the dictionary dataframe as tables in this schema with the hz.saveAsParquetTable() method.

What does this do¶

The data and the associated dictionary information are saved to the S3 output path as Parquet files.

Why are we doing this¶

This allows the tables we've created or modified through harmonization to be easily restored, combined, and analysed using SQL.

In [48]:
# Drop and create schema
schema="incidents"
hc.sql("DROP SCHEMA IF EXISTS {0} CASCADE".format(schema))
hc.sql("CREATE SCHEMA {0} COMMENT 'Crime incident data for {0}'".format(schema))

# Create Incident Data as SparkSQL table with S3 backed storage
data_table=city.lower()
data_table_ddl=hz.saveAsParquetTable(df,schema,data_table,outputpath_data)

# Create Dictionary as SparkSQL table with S3 backed storage
dict_table = data_table+"_dict"
dict_table_ddl=hz.saveAsParquetTable(df_dict.coalesce(1),schema,dict_table,outputpath_dictionary)

print("Done creating tables")
Creating Spark SQL table: incidents.detroit
Creating Spark SQL table: incidents.detroit_dict
Done creating tables

Create External Tables in Amazon Athena¶

What does this do¶

The S3 Parquet files containing the harmonized data are registered as Amazon Athena external tables.

Why are we doing this¶

You can use Amazon Athena to perform detailed ad hoc analysis of this and other harmonized datasets using the familiar power of SQL. Using Athena also allows you to easily integrate the dataset with Amazon QuickSight, where you can create visual analyses and dashboards.

In [49]:
ddlList=[
    "CREATE DATABASE IF NOT EXISTS `{0}`;".format(schema),
    "DROP TABLE IF EXISTS `{0}`.`{1}`;".format(schema,data_table),
    "DROP TABLE IF EXISTS `{0}`.`{1}`;".format(schema,dict_table),
    data_table_ddl,
    dict_table_ddl
]
athena_s3_staging_dir = "s3://{0}/athena_staging_dir".format(scratch_bucket)
hz.executeAthenaDDL(athena_s3_staging_dir, ddlList)
Exectuting Athena DDL: CREATE DATABASE IF NOT EXISTS `incidents`;
Exectuting Athena DDL: DROP TABLE IF EXISTS `incidents`.`detroit`;
Exectuting Athena DDL: DROP TABLE IF EXISTS `incidents`.`detroit_dict`;
Exectuting Athena DDL: CREATE EXTERNAL TABLE `incidents`.`detroit` (`description` STRING, `year` INT, `neighborhood` STRING, `location` STRING, `uniq` STRING, `crimeid` STRING, `report#` STRING, `offensedescription` STRING, `offensecategory` STRING, `stateoffensecode` STRING, `incidentdate&time` STRING, `incidenttime24h` STRING, `dayofweeksundayis1` STRING, `hourofday` STRING, `scoutcararea` STRING, `precinctnumber` STRING, `censusblockgeoid` STRING, `councildistrict` STRING, `zipcode` STRING, `ibrreportdate` STRING, `city` STRING, `geolocation` STRING, `month` INT, `day` INT, `hour` INT, `minute` INT, `datetime` TIMESTAMP, `dayofweek` STRING, `rawdatapath` STRING, `harmonizeddatapath` STRING, `notebookhtml` STRING)
STORED AS parquet
LOCATION 's3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit/data/table=detroit/';
Exectuting Athena DDL: CREATE EXTERNAL TABLE `incidents`.`detroit_dict` (`dict_field` STRING, `dict_count` BIGINT, `dict_countdistinct` BIGINT, `dict_countmissing` BIGINT, `dict_mean` DOUBLE, `dict_stddev` DOUBLE, `dict_min` STRING, `dict_max` STRING, `dict_vargroup` STRING, `dict_vardescr` STRING, `dict_uifilter` STRING, `dict_varmapping` STRING, `dict_vartype` STRING)
STORED AS parquet
LOCATION 's3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit/dictionary/table=detroit_dict/';
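
The DDL statements in the log above follow a regular pattern: a backtick-quoted column list, the storage format, and the S3 location. The sketch below generates a statement in that shape from a list of (name, type) pairs. `external_table_ddl` is a hypothetical helper written to match the logged output, not the code inside `hz.saveAsParquetTable`, and the bucket name is a placeholder.

```python
def external_table_ddl(schema, table, columns, location):
    """Build a CREATE EXTERNAL TABLE statement for a Parquet-backed table."""
    # backticks keep names such as report# and incidentdate&time valid
    cols = ", ".join("`{0}` {1}".format(name, sqltype) for name, sqltype in columns)
    return (
        "CREATE EXTERNAL TABLE `{0}`.`{1}` ({2})\n"
        "STORED AS parquet\n"
        "LOCATION '{3}';"
    ).format(schema, table, cols, location)


example_columns = [
    ("description", "STRING"),
    ("year", "INT"),
    ("report#", "STRING"),
    ("datetime", "TIMESTAMP"),
]
print(external_table_ddl(
    "incidents", "detroit", example_columns,
    "s3://example-bucket/crimedata/harmonized/Detroit/data/table=detroit/"))
```

Generating the DDL from the DataFrame schema, rather than writing it by hand, keeps the Athena table definition in step with the Parquet files whenever variables are added or renamed.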

Index Data and Dictionary¶

What does this do¶

Creates or replaces the Elasticsearch indexes that store our harmonized data and the associated dictionary.

Why are we doing this¶

We save both incident data and dictionary information to Elasticsearch to power the search page. The dictionary fields are used to dynamically build the search filter panels in the search page sidebar: these fields identify each variable, its vargroup (accordion panel), type (UI selector to use), and description (hover tooltip). The incident record fields are used for the dataset record search.

Call es.createOrReplaceIndex(es_index) to reset the index and set up its default field mappings (see ./lib/esindex.py for more information on the default mappings). You can also specify mappings for individual fields using es.addTypeMapping(), as illustrated below, to support the search features you need: for example, use a mapping to set the date type for timestamps, or to specify a geo_point field if you want to use maps in your Kibana dashboard.

In [50]:
# set up data index
# index name combines the city name (for uniqueness) with *harmonized* (to allow ES queries across all datasets)
es_dataindex = "{0}_harmonized".format(city.lower())
es_datatype = 'incidents'
es.createOrReplaceIndex(es_dataindex)

# create mappings for geolocation and datetime field.. all other fields inherit default mapping
mapping="""
{
    "properties": {
        "geolocation": {
            "type": "geo_point"
        },
        "datetime": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss"
        }
    }
}
"""
es.addTypeMapping(es_dataindex, es_datatype, mapping)
No existing elasticsearch index (detroit_harmonized)
Create index <detroit_harmonized> response: {"acknowledged":true,"shards_acknowledged":true}
Add type mapping for <detroit_harmonized.incidents> response: {"acknowledged":true}
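
Hand-written JSON strings are easy to unbalance, and a stray brace may not be noticed until the request is sent. One option, sketched below, is to build the mapping as a Python dict and serialize it with the standard `json` module, which always produces well-formed JSON. We assume here that `es.addTypeMapping` accepts the resulting string just as it accepts the literal used above; `is_valid_json` is a hypothetical helper for checking hand-written strings.

```python
import json

# Same field mappings as in the cell above, expressed as a dict
mapping_dict = {
    "properties": {
        "geolocation": {"type": "geo_point"},
        "datetime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
    }
}
mapping_json = json.dumps(mapping_dict, indent=4, sort_keys=True)


def is_valid_json(text):
    """Return True if text parses as a single JSON document."""
    try:
        json.loads(text)
        return True
    except ValueError:
        return False


print(mapping_json)
print(is_valid_json(mapping_json))   # well-formed by construction
print(is_valid_json('{"a": {}}}'))   # one closing brace too many
```

The resulting `mapping_json` string can be used in place of a hand-written literal, and `is_valid_json` gives a quick local check before calling the cluster.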
In [51]:
# set up dictionary index
# index name combines the city name (for uniqueness) with *dictionary* (to allow ES queries across all datasets)
es_dictindex = "{0}_dictionary".format(city.lower())
es_dicttype = 'dictionary'
es.createOrReplaceIndex(es_dictindex)
No existing elasticsearch index (detroit_dictionary)
Create index <detroit_dictionary> response: {"acknowledged":true,"shards_acknowledged":true}

All data and dictionary fields can be indexed by calling es.saveToEs() and passing the df and df_dict objects directly, as shown below. If you instead want to index a subset of the variables, make a copy of the data dataframe, drop the columns you don't want to index, generate a new dictionary dataframe by passing the new data dataframe to hz.buildDataDict(df_for_indexing), and pass the new data and dictionary dataframes to es.saveToEs() in the dfList array argument.

In [52]:
# index all variables in data dataframe
print("Saving data to elasticsearch - please be patient")
df = df.withColumn("datetime",df["datetime"].cast("string")) # elasticsearch needs datetimes in a string type
es.saveToEs(df,index=es_dataindex,doctype=es_datatype)
Saving data to elasticsearch - please be patient
Dataset 1 saved to elasticsearch <detroit_harmonized/incidents>
In [53]:
# index all variables in dictionary dataframe
print("Saving dictionary to elasticsearch - please be patient")
es.saveToEs(df_dict,index=es_dictindex,doctype=es_dicttype)
Saving dictionary to elasticsearch - please be patient
Dataset 1 saved to elasticsearch <detroit_dictionary/dictionary>

View Harmonized Data Sample¶

What does this do¶

Test that the data table was successfully created by using SQL to load a few rows.

Why are we doing this¶

What we see below is what will be exposed in the search UI, so we want to see what our harmonized data looks like. Did we get all the right headers? Does it match the target data type? Is any applied logic correct? etc...

In [54]:
sql="SELECT * FROM %s.%s LIMIT 3" % (schema, data_table)
# run query, convert results to a local pandas dataframe, and display as an HTML table.
HTML(hc.sql(sql).toPandas().to_html())
Out[54]:
description year neighborhood location uniq crimeid report# offensedescription offensecategory stateoffensecode incidentdate&time incidenttime24h dayofweeksundayis1 hourofday scoutcararea precinctnumber censusblockgeoid councildistrict zipcode ibrreportdate city geolocation month day hour minute datetime dayofweek rawdatapath harmonizeddatapath notebookhtml
0 Arson 2017 Barton-McFarland 8000 block of WISCONSIN ST None 3014192 1701160251 ARSON ARSON 2099 01/16/2017 05:20:00 PM 1720 2 17 0210 02 261635347002004 7 48204 02/22/2017 03:33:09 PM Detroit 42.35227,-83.15393 1 16 17 0 2017-01-16 17:00:00 Monday s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... https://s3.amazonaws.com/datasearch-blog-jupyt...
1 Miscellaneous 2017 Barton-McFarland 8600 block of JOY RD None 3042163 1703240355 VIOLATION OF CONTROLED SUBSTANCE ACT - (VCSA) OBSTRUCTING THE POLICE 4801 03/24/2017 06:05:00 PM 1805 6 18 0209 02 261635347004034 7 48204 03/29/2017 09:26:03 AM Detroit 42.35896,-83.15486 3 24 18 0 2017-03-24 18:00:00 Friday s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... https://s3.amazonaws.com/datasearch-blog-jupyt...
2 Narcotics 2017 Barton-McFarland 8600 block of JOY RD None 3042163 1703240355 VIOLATION OF CONTROLED SUBSTANCE ACT - (VCSA) DANGEROUS DRUGS 3501 03/24/2017 06:05:00 PM 1805 6 18 0209 02 261635347004034 7 48204 03/29/2017 09:26:03 AM Detroit 42.35896,-83.15486 3 24 18 0 2017-03-24 18:00:00 Friday s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... https://s3.amazonaws.com/datasearch-blog-jupyt...
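
The checks listed under "Why are we doing this" can also be automated instead of done by eye. The sketch below is a minimal example, not part of the original notebook: the function names are hypothetical, the list of expected columns is an assumption taken from the dictionary output in this notebook (every field whose dict_vargroup is not "90.Detroit (Unharmonized)"), and the sample row is copied from the first row of Out[54] with datetime represented as a string.

```python
from datetime import datetime

# Assumption: harmonized variables, as listed in this notebook's data dictionary.
EXPECTED_HARMONIZED = [
    "city", "datetime", "day", "dayofweek", "description", "geolocation",
    "harmonizeddatapath", "hour", "location", "minute", "month",
    "notebookhtml", "rawdatapath", "year",
]

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]


def missing_columns(actual_columns, expected=EXPECTED_HARMONIZED):
    """Return the expected harmonized columns that are absent from the table."""
    return [c for c in expected if c not in actual_columns]


def derived_field_errors(row):
    """Return the names of date parts that disagree with the row's datetime."""
    ts = datetime.strptime(row["datetime"], "%Y-%m-%d %H:%M:%S")
    expected = {
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        "hour": ts.hour,
        "minute": ts.minute,
        "dayofweek": DAY_NAMES[ts.weekday()],  # weekday(): Monday is 0
    }
    return [name for name, value in expected.items() if row[name] != value]


# First sample row from Out[54], reduced to the date-related fields.
sample_row = {
    "datetime": "2017-01-16 17:00:00", "year": 2017, "month": 1, "day": 16,
    "hour": 17, "minute": 0, "dayofweek": "Monday",
}
print(derived_field_errors(sample_row))  # an empty list means the fields agree
```

In the notebook, the column list could be taken from the columns attribute of the dataframe returned by hc.sql(sql), and each row of the pandas sample passed to derived_field_errors after converting its datetime to a string.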

View Data Dictionary¶

What does this do¶

Query the data dictionary table to verify that the dictionary was successfully created.

Why are we doing this¶

The data dictionary drives how the data is organized and presented in the UI. We want to ensure we have our expected row counts, data types, mins and means, and that data is organized correctly for display in the UI. What we see here will translate into the search widgets and any available ranges that control those widgets.

In [55]:
sql="SELECT * FROM %s.%s ORDER BY dict_field ASC" % (schema, dict_table)
# run query, convert results to a local pandas dataframe, and display as an HTML table.
HTML(hc.sql(sql).toPandas().to_html())
Out[55]:
dict_field dict_count dict_countdistinct dict_countmissing dict_mean dict_stddev dict_min dict_max dict_vargroup dict_vardescr dict_uifilter dict_varmapping dict_vartype
0 censusblockgeoid 48106 11232 300 NaN NaN 260992521002012 261639859001019 90.Detroit (Unharmonized) unknown True Source Census Block GEOID. Variable value unch... text
1 city 48406 1 0 NaN NaN Detroit Detroit 04.Location Incident city True "city" assigned by harmonization code text
2 councildistrict 47744 7 662 NaN NaN 1 7 90.Detroit (Unharmonized) unknown True Source Council District. Variable value unchan... text
3 crimeid 48406 45980 0 NaN NaN 3000004 3093774 90.Detroit (Unharmonized) unknown True Source Crime ID. Variable value unchanged from... text
4 datetime 48406 6139 0 NaN NaN 2007-04-17 00:00:00 2017-07-21 11:00:00 00.Date and Time Incident date and time True Full timestamp with date and time, eg 2007-04-... datetime
5 day 48406 31 0 15.201773 8.667899 1 31 00.Date and Time Incident date True day, extracted from incidentdate enum,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17...
6 dayofweek 48406 7 0 NaN NaN Friday Wednesday 00.Date and Time Incident day of week True day of Week, calculated from datetime enum,Sunday,Monday,Tuesday,Wednesday,Thursday,...
7 dayofweeksundayis1 48406 7 0 NaN NaN 1 7 90.Detroit (Unharmonized) unknown True Source Day of Week (Sunday is 1). Variable val... text
8 description 47350 15 1056 NaN NaN Arson Weapons 01.Incident Incident description True Source OFFENSE CATEGORY. Map values {"KIDNAPPI... text
9 geolocation 48406 39480 0 NaN NaN 32.02636,-127.91418 42.47535,-83.02128 04.Location Incident geoLocation coordinates False geolocation variable created from LAT and LON ... text
10 harmonizeddatapath 48406 1 0 NaN NaN s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... 99.Miscellaneous S3 Path to harmonized dataset root prefix. False Assigned by harmonization code text
11 hour 48406 24 0 12.330806 7.085211 0 23 00.Date and Time Incident hour True hour, derived from HOUR enum,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17...
12 hourofday 48406 24 0 NaN NaN 0 9 90.Detroit (Unharmonized) unknown True Source Hour of Day. Variable value unchanged f... text
13 ibrreportdate 48406 57 0 NaN NaN 01/27/2017 03:08:19 PM 12/13/2016 12:47:00 PM 90.Detroit (Unharmonized) unknown True Source IBR Report Date. Variable value unchang... text
14 incidentdate&time 48406 28659 0 NaN NaN 01/01/2009 12:00:00 AM 12/31/2016 12:45:00 PM 90.Detroit (Unharmonized) unknown True Source Incident Date & Time. Variable value un... text
15 incidenttime24h 48406 1435 0 NaN NaN 0000 2359 90.Detroit (Unharmonized) unknown True Source Incident Time (24h). Variable value unc... text
16 location 48406 19988 0 NaN NaN 100 block of 8 MILE RD W WARREN AVE 04.Location Incident location/address True Source INCIDENT ADDRESS. Variable value unchan... text
17 minute 48406 1 0 0.000000 0.000000 0 0 00.Date and Time Incident minute True minute, unrecorded in dataset - set to 00 range,0,59,1
18 month 48406 12 0 4.882845 3.138476 1 12 00.Date and Time Incident month True month, extracted from incidentdate enum,1,2,3,4,5,6,7,8,9,10,11,12
19 neighborhood 47608 201 798 NaN NaN Airport Sub Yorkshire Woods 90.Detroit (Unharmonized) unknown True Source Neighborhood. Variable value unchanged ... text
20 notebookhtml 48406 1 0 NaN NaN https://s3.amazonaws.com/datasearch-blog-jupyt... https://s3.amazonaws.com/datasearch-blog-jupyt... 99.Miscellaneous URL to Jupyter notebook containing documentati... False Assigned by harmonization code text
21 offensecategory 47350 31 1056 NaN NaN AGGRAVATED ASSAULT WEAPONS OFFENSES 90.Detroit (Unharmonized) unknown True Source Offense Category. Variable value unchan... text
22 offensedescription 48406 143 0 NaN NaN WEAPONS OFFENSE - OTHER 90.Detroit (Unharmonized) unknown True Source Offense Description. Variable value unc... text
23 precinctnumber 48406 15 0 NaN NaN 0 HP 90.Detroit (Unharmonized) unknown True Source Precinct Number. Variable value unchang... text
24 rawdatapath 48406 1 0 NaN NaN s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... s3://datasearch-blog-jupyterspark-xn0iflaqfg-e... 99.Miscellaneous S3 Path to raw dataset. False Assigned by harmonization code text
25 report# 48406 45979 0 NaN NaN 0101190334 9 90.Detroit (Unharmonized) unknown True Source Report #. Variable value unchanged from... text
26 scoutcararea 48406 146 0 NaN NaN 0 HPPD 90.Detroit (Unharmonized) unknown True Source Scout Car Area. Variable value unchange... text
27 stateoffensecode 47325 70 1081 NaN NaN 0901 7571 90.Detroit (Unharmonized) unknown True Source State Offense Code. Variable value unch... text
28 uniq 0 0 48406 NaN NaN None None 90.Detroit (Unharmonized) unknown True Variable value unchanged from source dataset. text
29 year 48406 9 0 2016.874251 0.345600 2007 2017 00.Date and Time Incident year True Source Year. year, extracted from incidentdate range,2000,2017,1
30 zipcode 47602 30 804 NaN NaN 48201 48243 90.Detroit (Unharmonized) unknown True Source Zip Code. Variable value unchanged from... text
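
One quick way to confirm the expected row counts is to check that dict_count plus dict_countmissing equals the total number of rows for every field. The sketch below is a hedged illustration with hypothetical function names. It uses a handful of (field, count, missing) values copied from Out[55] above, and it assumes the total row count is 48406, the dict_count reported for fields with no missing values.

```python
TOTAL_ROWS = 48406  # assumption: dict_count of fields with no missing values

# (dict_field, dict_count, dict_countmissing), copied from Out[55]
dictionary_rows = [
    ("censusblockgeoid", 48106, 300),
    ("city", 48406, 0),
    ("councildistrict", 47744, 662),
    ("description", 47350, 1056),
    ("neighborhood", 47608, 798),
    ("stateoffensecode", 47325, 1081),
    ("uniq", 0, 48406),
    ("zipcode", 47602, 804),
]


def inconsistent_fields(rows, total):
    """Fields whose present and missing counts do not add up to the total."""
    return [name for name, count, missing in rows if count + missing != total]


def empty_fields(rows):
    """Fields with no values at all, which are unlikely to be useful UI filters."""
    return [name for name, count, _ in rows if count == 0]


print("Inconsistent:", inconsistent_fields(dictionary_rows, TOTAL_ROWS))
print("Empty:", empty_fields(dictionary_rows))
```

For the values above, every field adds up to the total, and uniq is flagged as empty. Since uniq still has dict_uifilter set to True, it is a candidate to drop before indexing.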

Publish Notebook¶

Save the notebook first, using JavaScript to trigger the save_checkpoint method.

What does this do¶

Convert the notebook from .ipynb to HTML, and use hz.publishNotebookToS3 to copy the .ipynb and .html files to the target S3 folder with web access enabled.

Why are we doing this¶

This provides a record within the UI of all the harmonization logic used to transform the raw data into what is exposed through the search and discovery tool. This record allows for easier verification, enhancements, or modifications of harmonization routines.

In [56]:
%%html
<!-- SHOW / HIDE CODE TOGGLE -->
<script>
    var code_hide=true; //true -> hide code at first
    function code_showhide_toggle() {
        if (code_hide){
            $('div.input').hide();
            $('div.prompt').hide();
        } else {
            $('div.input').show();
            $('div.prompt').show();
        }
        code_hide = !code_hide;
    }
    $( document ).ready(code_showhide_toggle);
</script>
In [57]:
!date
Wed Aug  2 21:57:58 UTC 2017
In [58]:
%%javascript
// save current notebook
IPython.notebook.save_checkpoint()
In [59]:
# convert ipynb to html
!jupyter nbconvert --to html $citynotebook
[NbConvertApp] Converting notebook Detroit-notebook.ipynb to html
[NbConvertApp] Writing 405521 bytes to Detroit-notebook.html
In [60]:
# copy saved notebook (ipynb and html formats) to target S3 bucket
hz.publishNotebookToS3(outputpath_doc, notebook_urlbase, citynotebook) 
# move html copy of notebook into subfolder
f = citynotebook + ".html"
print("Move {} to subfolder ./html".format(f))
!mkdir -p ./html
os.rename(f,"./html/{}".format(f))
Copy Detroit-notebook.ipynb to s3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit/docs/Detroit-notebook.ipynb
URL: https://s3.amazonaws.com/datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit/docs/Detroit-notebook.ipynb
Copy Detroit-notebook.html to s3://datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit/docs/Detroit-notebook.html
URL: https://s3.amazonaws.com/datasearch-blog-jupyterspark-xn0iflaqfg-emrbucket-10gb2028l2ygj/crimedata/harmonized/Detroit/docs/Detroit-notebook.html
Move Detroit-notebook.html to subfolder ./html
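
The output above pairs each S3 destination with a public URL. The internals of hz.publishNotebookToS3 are not shown in this notebook, so the following is only a sketch of the path-to-URL mapping, mirroring how notebook_urlbase is built in the configuration cell. The function name and the bucket name example-bucket are hypothetical.

```python
def s3_path_to_url(s3_path):
    """Map an s3://bucket/key path to its path-style https://s3.amazonaws.com URL."""
    prefix = "s3://"
    if not s3_path.startswith(prefix):
        raise ValueError("Not an S3 path: {}".format(s3_path))
    return "https://s3.amazonaws.com/" + s3_path[len(prefix):]


# Hypothetical bucket name; the key layout follows outputpath_doc in this notebook.
outputpath_doc = "s3://example-bucket/crimedata/harmonized/Detroit/docs"
citynotebook = "Detroit-notebook"

for extension in (".ipynb", ".html"):
    target = "{}/{}{}".format(outputpath_doc, citynotebook, extension)
    print("Copy to {}".format(target))
    print("URL: {}".format(s3_path_to_url(target)))
```

These URLs resolve only if the objects are readable over the web, which is why the files are copied to a folder with web access enabled.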
In [61]:
print("Notebook execution complete.")
Notebook execution complete.