Installing and running Jupyter Notebook and pandas on EC2

  • Stuart Mitchell (PhD)
  • python@stuartmitchell.com

What is the point of this?

  1. You get your own Linux box to play with
  2. You are very close to S3, so you can store gigabytes of data cheaply
  3. You can scale the instance up and down as you need, while everything else stays in place

Start an EC2 instance and install Anaconda Python

First we need to start an EC2 instance and install Anaconda. Remember to set up an IAM role and a security group; any of the standard EC2 getting-started guides covers this. Once you are logged in to the instance:

~$ wget https://repo.continuum.io/archive/Anaconda3-4.2.0-Linux-x86_64.sh
--2017-01-17 10:15:56--  https://repo.continuum.io/archive/Anaconda3-4.2.0-Linux-x86_64.sh
...
ubuntu@ip-172-30-1-40:~$ bash Anaconda3-4.2.0-Linux-x86_64.sh

Then log out and log in again so that Anaconda is picked up on your PATH.

Install and run Jupyter Notebook

~$ conda install jupyter -yq
~$ jupyter notebook --generate-config
~$ nano .jupyter/jupyter_notebook_config.py

Add the following two lines at the top:

c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
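
Optionally, password-protect the server too. A minimal sketch, assuming the notebook package's auth helper (the sha1 value below is a placeholder you generate yourself):

# optional: require a login password
# generate the hash in a python shell with: from notebook.auth import passwd; passwd()
c.NotebookApp.password = 'sha1:<your-hash-here>'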

Add a security-group rule allowing all traffic from your own IP address and you will be safe enough for this demo.

Start tmux and launch the notebook server:

~$ tmux
~$ mkdir notebook
~$ cd notebook
~/notebook$ jupyter notebook

Ctrl-b then d detaches from tmux, leaving the server running; tmux attach reattaches later.

Play with NZEM data

The data comes from http://www.emi.ea.govt.nz/Datasets/Wholesale/Final_pricing/Final_prices

but I have uploaded a copy to S3 (hopefully it is public).

These files contain the pricing data for every node in the NZEM over the last 20 years (with annoying issues in two of the files removed).

In [7]:
# load this data into a pandas dataframe
import pandas as pd
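# note: reading s3:// paths needs S3 support installed (boto for older pandas, s3fs for newer)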
pd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/199701_Final_prices.csv').head()
Out[7]:
Trading_date Trading_period Node Price
0 1997-01-01 1 ABY0111 25.93
1 1997-01-01 1 ADD0111 26.60
2 1997-01-01 1 ADD0661 26.58
3 1997-01-01 1 AHA0111 27.16
4 1997-01-01 1 ALB0331 30.20
In [8]:
# add some extra processing to the columns
month_data = pd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/199701_Final_prices.csv', 
                         parse_dates=['Trading_date'],
                         index_col=['Trading_date', 'Trading_period', 'Node'])
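
The frame now has a three-level index; a quick sanity check (a sketch):

In [ ]:
# the index levels we set with index_col above
month_data.index.names  # ['Trading_date', 'Trading_period', 'Node']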
In [11]:
# What are the unique names of nodes
month_data.reset_index()['Node'].unique()
Out[11]:
array(['ABY0111', 'ADD0111', 'ADD0661', 'AHA0111', 'ALB0331', 'ALB1101',
       'ANA0111', 'APS0111', 'ARA2201', 'ARG1101', 'ARI0501', 'ARI1101',
       'ASB0331', 'ASY0111', 'ATI0111', 'ATI2201', 'AVI2201', 'BAL0331',
       'BEN0161', 'BEN2201', 'BLN0331', 'BOB0331', 'BPE0331', 'BPE0551',
       'BPE2201', 'BRB0331', 'BRK0331', 'BRY0111', 'BRY0661', 'BWK1101',
       'BWR0111', 'CBE0331', 'CBG0111', 'CLH0111', 'CML0331', 'COB0661',
       'COL0111', 'COL0661', 'CPK0111', 'CPK0331', 'CST0111', 'CST0331',
       'CUL0331', 'CYD0331', 'CYD2201', 'DAR0111', 'DOB0331', 'DOB0661',
       'DVK0111', 'EDG0331', 'EDN0331', 'FHL0331', 'FKN0331', 'GFD0331',
       'GIS0111', 'GIS0501', 'GLN0331', 'GLN0332', 'GOR0331', 'GYM0661',
       'GYT0331', 'HAM0111', 'HAM0331', 'HAM0551', 'HAM2201', 'HAY0111',
       'HAY0331', 'HAY1101', 'HAY2201', 'HBK0661', 'HEN0331', 'HEN2201',
       'HEP0331', 'HIN0331', 'HLY2201', 'HOR0331', 'HTI0331', 'HUI0331',
       'HWA0331', 'HWA1101', 'HWB0331', 'HWB2201', 'INV0331', 'INV2201',
       'ISL0331', 'ISL0661', 'ISL2201', 'KAI0111', 'KAW0111', 'KAW0112',
       'KEN0331', 'KIK0111', 'KIN0111', 'KIN0331', 'KKA0331', 'KOE0331',
       'KPO1101', 'KPU0661', 'KTA0331', 'KWA0111', 'LFD1101', 'LTN0331',
       'MAN2201', 'MAT1101', 'MCH0111', 'MDN0141', 'MDN0331', 'MDN1101',
       'MDN2201', 'MER0331', 'MER1101', 'MGM0331', 'MHO0331', 'MLG0111',
       'MLG0331', 'MNG0331', 'MNG1101', 'MNI0111', 'MNO0111', 'MOT0111',
       'MPE0331', 'MPI0331', 'MRA0111', 'MRR0111', 'MST0331', 'MTI0111',
       'MTI2201', 'MTM0111', 'MTM0331', 'MTN0331', 'MTO0331', 'MTR0331',
       'NMA0331', 'NPK0331', 'NPL0331', 'NPL1101', 'NPL2201', 'NSY0331',
       'OAM0331', 'OHA2201', 'OHB2201', 'OHC2201', 'OHK2201', 'OKI0111',
       'OKI2201', 'OKN0111', 'ONG0331', 'OPK0331', 'OTA0221', 'OTA1101',
       'OTA1102', 'OTA2201', 'OTI0111', 'OWH0111', 'PAK0331', 'PAL0331',
       'PAP0111', 'PAP0661', 'PEN0221', 'PEN0331', 'PEN1101', 'PNI0331',
       'PPI2201', 'PRM0331', 'RDF0331', 'RDF2201', 'RFT0111', 'ROS0221',
       'ROS1101', 'ROT0111', 'ROT0331', 'ROT1101', 'ROX1101', 'ROX2201',
       'RPO2201', 'SBK0331', 'SDN0331', 'SFD0331', 'SFD2201', 'SPN0331',
       'STK0331', 'STK2201', 'STU0111', 'SWN2201', 'TAK0331', 'TGA0111',
       'TGA0331', 'TIM0111', 'TKA0111', 'TKA0331', 'TKB2201', 'TKH0111',
       'TKR0331', 'TKU0331', 'TKU2201', 'TMI0331', 'TMK0111', 'TMN0551',
       'TMU0111', 'TMU1101', 'TNG0111', 'TNG0551', 'TOB0501', 'TRK2201',
       'TUI0111', 'TUI1101', 'TWI2201', 'TWZ0331', 'UHT0331', 'WAI0111',
       'WDV0111', 'WEL0331', 'WES0331', 'WGN0331', 'WHI0111', 'WHI2201',
       'WHU0331', 'WIL0331', 'WIR0331', 'WKM2201', 'WKO0331', 'WMG0331',
       'WPA2201', 'WPR0331', 'WPT0111', 'WPW0331', 'WRA0111', 'WRA0501',
       'WRK0331', 'WRK2201', 'WTK0111', 'WTK0331', 'WTK2201', 'WTN0111',
       'WTN0661', 'WTU0331', 'WVY0111', 'BDE0111'], dtype=object)
In [12]:
# OTA2201 is the node at Otahuhu, Auckland
month_data.query('Node == "OTA2201"').head()
Out[12]:
Price
Trading_date Trading_period Node
1997-01-01 1 OTA2201 29.94
2 OTA2201 29.97
3 OTA2201 29.85
4 OTA2201 31.18
5 OTA2201 31.31
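
As an aside, .xs is an alternative to query for selecting a single value from one index level (a sketch equivalent to the cell above):

In [ ]:
# cross-section on the 'Node' index level (the level is dropped from the result)
month_data.xs('OTA2201', level='Node').head()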
In [15]:
%matplotlib inline

# Show prices over the month in Auckland
month_data.query('Node == "OTA2201"').plot()
Out[15]:
<matplotlib.axes._subplots.AxesSubplot at 0x7ff609b1e4a8>
In [79]:
# Aggregate prices per day
month_data.query('Node == "OTA2201"').reset_index().groupby(['Trading_date']).describe().head()
Out[79]:
Price Trading_period
Trading_date
1997-01-01 count 48.000000 48.00
mean 28.521875 24.50
std 3.455103 14.00
min 23.320000 1.00
25% 25.515000 12.75
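
If you only want a few statistics, groupby plus agg gives a tidier frame than describe (a minimal sketch):

In [ ]:
# daily mean/min/max price at Otahuhu
(month_data.query('Node == "OTA2201"')
 .reset_index()
 .groupby('Trading_date')['Price']
 .agg(['mean', 'min', 'max'])
 .head())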
In [21]:
%%bash
conda install seaborn -y
Fetching package metadata .........
Solving package specifications: .

# All requested packages already installed.
# packages in environment at /home/ubuntu/anaconda3:
#
seaborn                   0.7.1                    py35_0  
In [22]:
# Show some sexier graphs
# box plot of daily prices for Auckland
import seaborn as sns
sns.boxplot(x='Trading_date', y='Price', data=month_data.query('Node == "OTA2201"').reset_index())
Out[22]:
<matplotlib.axes._subplots.AxesSubplot at 0x7ff5f7222208>
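
With 31 dates on the x axis the tick labels overlap badly; rotating them is an easy fix (a sketch):

In [ ]:
import matplotlib.pyplot as plt
sns.boxplot(x='Trading_date', y='Price',
            data=month_data.query('Node == "OTA2201"').reset_index())
plt.xticks(rotation=90)  # rotate the date labels so they stay readable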
In [85]:
# Aggregate by date and find the maximum
month_data.reset_index().groupby(['Trading_date','Node']).max().head()
Out[85]:
Trading_period Price
Trading_date Node
1997-01-01 ABY0111 48 25.93
ADD0111 48 26.70
ADD0661 48 26.68
AHA0111 48 27.46
ALB0331 48 39.99
In [83]:
# Plot this data
month_data.reset_index().groupby(
    ['Trading_date','Node']).max().query('Node == "OTA2201"').plot(y='Price')
Out[83]:
<matplotlib.axes._subplots.AxesSubplot at 0x7ff5cc890dd8>
In [28]:
# For a particular date, plot all the node data
# see how all the nodes are highly correlated except for 3 trading periods
month_data.query('Trading_date == "1997-01-01"').unstack('Node').plot(legend=False)
Out[28]:
<matplotlib.axes._subplots.AxesSubplot at 0x7ff5f67d6d68>
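
You can put a number on that correlation claim; a sketch that averages the pairwise correlations of node prices over the day's 48 trading periods:

In [ ]:
# one column per node, one row per trading period
day = month_data.query('Trading_date == "1997-01-01"').unstack('Node')
day.corr().values.mean()  # close to 1 when the nodes move together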

What happens when the data gets bigger?

In [ ]:
!conda install dask -y
In [ ]:
!conda install s3fs -y

Read in one year's worth of data

In [2]:
# we can use dask to read multiple files at once!
import dask.dataframe as dd
dask_data = dd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/2016*_Final_prices.csv', 
                        parse_dates=['Trading_date'])
dask_data
Out[2]:
dd.DataFrame<from-de..., npartitions=11>

A dask dataframe is lazy: it does not actually do any work until its compute method is called.
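
For example, intermediate results stay lazy too (a sketch using the dask_data frame above):

In [ ]:
mean_price = dask_data['Price'].mean()  # still lazy: nothing has been read yet
mean_price.compute()                    # triggers the CSV reads and the reduction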

In [30]:
some_data = dask_data.compute()
some_data.head()
Out[30]:
Trading_date Trading_period Node Price
0 2016-01-01 1 ABY0111 68.57
1 2016-01-01 1 ALB0331 64.56
2 2016-01-01 1 ALB1101 64.45
3 2016-01-01 1 APS0111 70.92
4 2016-01-01 1 ARA2201 60.27
In [31]:
# now for the whole year we can see how the price changes by trading period
some_data.query('Node=="OTA2201"').groupby('Trading_period').mean().plot()
Out[31]:
<matplotlib.axes._subplots.AxesSubplot at 0x7ff5ec0dd668>

What happens when the data is even bigger?

Use dask.distributed and set up a cluster with dask-ec2.

You will have to give your notebook instance full EC2 and IAM privileges (the AmazonEC2FullAccess and IAMFullAccess managed policies) for this to work.

In [ ]:
!conda install distributed -y
In [ ]:
!dask-ec2 up --keyname <keyname> --keypair <location_of_your_key> --vpc-id <look_at_ec2_console> --subnet-id <look_at_ec2_console> --iaminstance-name <needed_to_access_private_data> --no-notebook

That takes about 10 minutes to run; while it is going, let's work with a smaller dataset.

In [9]:
dask_data = dd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/201[5-6]1*_Final_prices.csv', 
                        parse_dates=['Trading_date'])
dask_data['year'] = dask_data['Trading_date'].map(lambda d: d.year)
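# (equivalently, newer dask supports the datetime accessor: dask_data['Trading_date'].dt.year)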
all_years = dask_data.groupby(['Trading_period','year']).mean()['Price']

Note that dask has not actually done any work yet; it has only built up a task graph for the three months of data from each year.

In [10]:
all_years.visualize()
Out[10]: [image: the task graph for the computation]
In [11]:
all_years = all_years.compute()
all_years.unstack('year').plot()
Out[11]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fc66ff36710>

Now that our cluster has come up, find its IP address and connect to it.

Also, while you are there, connect to the head node and have a look at the web interface.

In [14]:
from distributed import Client
client = Client('54.234.162.41:8786')
In [15]:
client
Out[15]:
<Client: scheduler="54.234.162.41:8786" processes=5 cores=10>

I started a six-node cluster; one node is reserved as the "scheduler", which is why the client reports five worker processes.

In [13]:
dask_data = dd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/20*_Final_prices.csv', 
                        parse_dates=['Trading_date'])
dask_data['year'] = dask_data['Trading_date'].map(lambda d: d.year)
all_years = dask_data.groupby(['Trading_period','year']).mean()['Price']
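
With the Client registered, compute() now runs on the cluster by default; you can also hand the graph to the scheduler explicitly and get a Future back (a sketch using distributed's client.compute):

In [ ]:
future = client.compute(all_years)  # returns immediately with a Future
future.status                       # 'pending' while the workers are busy
# future.result() would block until done -- equivalent to the compute() call below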

Have fun watching the cluster chomp through 20 years of data! If you like, kill a worker instance (not the scheduler node, dask-ec2-cluster-0) and see that the calculations just keep going.

In [17]:
all_years = all_years.compute()
all_years.unstack('year').plot(legend=False)
Out[17]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fc66ec462e8>