First of all we need to start up an EC2 instance and install Anaconda. Remember to set up an IAM role and a security group for the instance; there are plenty of guides online covering that part.
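If you prefer the command line, launching the instance looks roughly like the following sketch; every ID in angle brackets is a placeholder you would swap for your own values (AMI, key pair, security group, subnet and instance profile):
~$ aws ec2 run-instances \
    --image-id <ubuntu-ami-id> \
    --instance-type m4.xlarge \
    --key-name <your-keypair> \
    --security-group-ids <sg-id> \
    --subnet-id <subnet-id> \
    --iam-instance-profile Name=<instance-profile-name>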
~$ wget https://repo.continuum.io/archive/Anaconda3-4.2.0-Linux-x86_64.sh
--2017-01-17 10:15:56-- https://repo.continuum.io/archive/Anaconda3-4.2.0-Linux-x86_64.sh
...
ubuntu@ip-172-30-1-40:~$ bash Anaconda3-4.2.0-Linux-x86_64.sh
Then log out and log in again so that Anaconda is picked up on your PATH.
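A quick sanity check that conda is now on the PATH:
~$ conda --version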
~$ conda install jupyter -yq
~$ jupyter notebook --generate-config
~$ nano .jupyter/jupyter_notebook_config.py
Add the following two lines near the top of the config file:
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
Add a security group rule allowing all traffic from your own IP address only, and you will be safe enough for this demo.
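If you would rather script that rule, the AWS CLI can add it; this sketch opens the default Jupyter port (8888) to a single address, with the group ID and your IP as placeholders:
~$ aws ec2 authorize-security-group-ingress \
    --group-id <sg-id> \
    --protocol tcp \
    --port 8888 \
    --cidr <your-ip>/32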
Start tmux and start the notebook server:
~$ tmux
~$ mkdir notebook
~$ cd notebook
~/notebook$ jupyter notebook
Press Ctrl-b, then d, to detach from tmux.
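To get back to that session later (for example to check the server logs), reattach with:
~$ tmux attach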
The data comes from http://www.emi.ea.govt.nz/Datasets/Wholesale/Final_pricing/Final_prices
but I have uploaded a copy to S3 (hopefully this is public).
These files contain the pricing data for every node in the NZEM over the last 20 years (with annoying issues in 2 files removed).
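If you want to confirm the bucket really is readable before pointing pandas at it, listing it with the AWS CLI is a quick check (--no-sign-request skips credentials for public buckets):
~$ aws s3 ls s3://nzem-files/Wholesale/Final_pricing/Final_prices/ --no-sign-request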
# load this data into a pandas dataframe
import pandas as pd
pd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/199701_Final_prices.csv').head()
# add some extra processing to the columns
month_data = pd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/199701_Final_prices.csv',
                         parse_dates=['Trading_date'],
                         index_col=['Trading_date', 'Trading_period', 'Node'])
# What are the unique names of nodes
month_data.reset_index()['Node'].unique()
# OTA2201 is the node at Otahuhu, Auckland
month_data.query('Node == "OTA2201"').head()
%matplotlib inline
# Show prices over the month in Auckland
month_data.query('Node == "OTA2201"').plot()
# Aggregate prices per day
month_data.query('Node == "OTA2201"').reset_index().groupby(['Trading_date']).describe().head()
%%bash
conda install seaborn -y
# Show some sexier graphs
# box plot of daily prices for Auckland
import seaborn as sns
sns.boxplot(x='Trading_date', y='Price', data=month_data.query('Node == "OTA2201"').reset_index())
# Aggregate by date and find the maximum
month_data.reset_index().groupby(['Trading_date','Node']).max().head()
# Plot this data
month_data.reset_index().groupby(
    ['Trading_date','Node']).max().query('Node == "OTA2201"').plot(y='Price')
# For a particular date, plot the data for all nodes
# See how all the nodes are highly correlated except for 3 trading periods
month_data.query('Trading_date == "1997-01-01"').unstack('Node').plot(legend=False)
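That correlation claim is easy to check numerically. This is a quick sketch (the day_prices name is just introduced here, and it assumes Price is the only column you care about) that builds a node-by-node correlation matrix for the same day:
# Node-by-node correlation of prices across trading periods on 1997-01-01
day_prices = month_data.query('Trading_date == "1997-01-01"')['Price'].unstack('Node')
day_prices.corr()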
!conda install dask -y
!conda install s3fs -y
# We can use Dask to read multiple files at once!
import dask.dataframe as dd
dask_data = dd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/2016*_Final_prices.csv',
                        parse_dates=['Trading_date'])
dask_data
The Dask DataFrame does not actually do any work until its compute method is called.
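For example, metadata such as the partition count and column names is available immediately, without the CSVs being read in full (a small aside, not part of the original workflow):
# Inspect the lazy dataframe's metadata - no computation is triggered
dask_data.npartitions
dask_data.columns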
some_data = dask_data.compute()
some_data.head()
# now for the whole year we can see how the price changes by trading period
some_data.query('Node=="OTA2201"').groupby('Trading_period').mean().plot()
Use dask.distributed and set up a cluster with dask-ec2.
You will have to give your notebook instance full EC2 and IAM privileges (e.g. the AmazonEC2FullAccess and IAMFullAccess managed policies) for this to work.
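If you want to do that from the command line rather than the console, attaching those managed policies to the role behind your notebook's instance profile looks roughly like this (the role name is a placeholder):
~$ aws iam attach-role-policy --role-name <notebook-role> \
    --policy-arn arn:aws:iam::aws:policy/AmazonEC2FullAccess
~$ aws iam attach-role-policy --role-name <notebook-role> \
    --policy-arn arn:aws:iam::aws:policy/IAMFullAccess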
!conda install distributed -y
# The dask-ec2 command below comes from the separate dask-ec2 package (pip install dask-ec2) if it is not already installed
!dask-ec2 up --keyname <keyname> --keypair <location_of your_key> --vpc-id <look_at_ec2_console> --subnet-id <look_at_ec2_console> --iaminstance-name <needed_to_access_private_data> --no-notebook
That takes about 10 minutes to run; while that is going, let's get a smaller dataset.
dask_data = dd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/201[5-6]1*_Final_prices.csv',
                        parse_dates=['Trading_date'])
dask_data['year'] = dask_data['Trading_date'].map(lambda d: d.year)
all_years = dask_data.groupby(['Trading_period','year']).mean()['Price']
Note that Dask has not actually done any work, but it has built up a task tree for the three months of data from each of 2015 and 2016.
all_years.visualize()
all_years = all_years.compute()
all_years.unstack('year').plot()
Now that our cluster has come up, find out its IP address and connect to it.
Also, while you are there, connect to the head node and have a look at the graphical interface (the diagnostic dashboard is normally served on port 8787 of the scheduler).
from distributed import Client
client = Client('54.234.162.41:8786')
client
I started a 6-node cluster; 1 node is reserved to be the "scheduler".
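If you want to confirm the workers have all registered, the client can report the cores available on each one (the exact output format depends on your distributed version):
# Cores available on each worker the scheduler knows about
client.ncores()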
dask_data = dd.read_csv('s3://nzem-files/Wholesale/Final_pricing/Final_prices/20*_Final_prices.csv',
                        parse_dates=['Trading_date'])
dask_data['year'] = dask_data['Trading_date'].map(lambda d: d.year)
all_years = dask_data.groupby(['Trading_period','year']).mean()['Price']
Have fun watching the cluster chomp through 20 years of data!!
Also, if you like, kill an instance (not the scheduler node, dask-ec2-cluster-0) and see that the calculations just keep going.
all_years = all_years.compute()
all_years.unstack('year').plot(legend=False)