2.7 Data Preprocessing (Python)

2.7.1 Phase 1

import numpy as np 
import pandas as pd 
from datetime import datetime, timedelta
import os
def preprocessing(path, month, save_name, sample=False, profiling=False):
    """Clean one month of raw Yellow Taxi trips and save the result in feather format."""
    print('FILENAME:', path)
    if sample:
        # Sampled extracts carry the original row index in column 0
        df = pd.read_csv(path, index_col=0).reset_index(drop=True)
    else:
        df = pd.read_csv(path)
        
    n_rows = df.shape[0] # Original number of rows
    print('[0] Number of trips in the raw dataset:', n_rows)
    
    # Feature selection
    extraction = ['trip_distance', 'DOLocationID',
                  'PULocationID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
                  'payment_type', 'fare_amount', 'tip_amount', 'total_amount']
    df = df[extraction]
    print('[1] Select features')
    
    # Remove trips with non-positive distance
    zero = df[df['trip_distance']<=0].shape[0]
    print('[2] Remove trips with non-positive distance:', zero)
    df = df[df['trip_distance']>0].copy()  # .copy() avoids SettingWithCopyWarning on later column assignments
    
    # Convert to datetime
    df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
    
    # Check datetime range
    d1 = datetime(2019, month, 1)
    if month==12:
        d2 = datetime(2020, 1, 1)
    else:
        d2 = datetime(2019, month+1, 1)
    # Check wrong date / month
    wrongdate = df[(df['tpep_pickup_datetime']<d1) | (df['tpep_dropoff_datetime']>=d2)].shape[0]
    df = df[(df['tpep_pickup_datetime']>=d1) & (df['tpep_dropoff_datetime']<d2)]
    print('[3] Convert to date/time & drop wrong dates:', wrongdate)

    # Feature engineer 'trip_duration' in minutes
    df['trip_duration'] = df['tpep_dropoff_datetime']-df['tpep_pickup_datetime']
    df['trip_duration'] = round(df['trip_duration'].dt.total_seconds().div(60)).astype(int)
    # Feature engineer 'pickup_hour' as integer
    df['pickup_hour'] = df['tpep_pickup_datetime'].dt.hour.astype(int)
    # Remove `tpep_dropoff_datetime`
    df = df.drop('tpep_dropoff_datetime',axis=1)
    print('[4] Feature engineer `trip_duration` (minutes, rounded to nearest integer) and `pickup_hour` (hour of pickup)')
    
    # Remove trips longer than 12 hours (meter likely left running) or with non-positive duration
    duration = df[(df['trip_duration']>=720) | (df['trip_duration']<=0)].shape[0]
    df = df[(df['trip_duration']<720) & (df['trip_duration']>0)]
    print('[5] Remove trips longer than 12 hours or with non-positive duration:', duration)
    
    # Recast `PULocationID` as integer; IDs 264 and 265 are the unknown/NA zones
    # in the TLC taxi-zone lookup table
    df['PULocationID'] = df['PULocationID'].astype(int)
    pu = df[(df['PULocationID']==264) | (df['PULocationID']==265)].shape[0]
    df = df[(df['PULocationID']!=265) & (df['PULocationID']!=264)]
    print('[6] Recast `PULocationID` as integer & remove unknown IDs:', pu)
    
    # Recast `DOLocationID` as integer and drop the same unknown zones
    df['DOLocationID'] = df['DOLocationID'].astype(int)
    do = df[(df['DOLocationID']==264) | (df['DOLocationID']==265)].shape[0]
    df = df[(df['DOLocationID']!=265) & (df['DOLocationID']!=264)]
    print('[7] Recast `DOLocationID` as integer & remove unknown IDs:', do)
    
    # Impute and categorise missing 'payment_type' with 0
    df['payment_type'] = df['payment_type'].fillna(0)
    df['payment_type'] = df['payment_type'].astype(int)
    print('[8] Impute and categorise missing `payment_type` with 0')
    
    # Drop duplication
    duplicate = df.duplicated().sum()
    df = df.drop_duplicates().reset_index(drop=True)
    print('[9] Drop duplicates & reset index:', duplicate)
    
    # Summary
    print()
    print('>> Final DF shape:',df.shape)
    print('>> Reduction size (%):', (n_rows-df.shape[0])/n_rows*100)
    print('>> Missing values:')
    print(df.isnull().sum())
    
    # Save
    print()
    df.to_feather(save_name)
    print('>> Saved to feather-format')
    
    # Pandas Profiling report (optional)
    if profiling:
        from pandas_profiling import ProfileReport  # imported lazily; only needed here
        ProfileReport(df, minimal=True).to_file(output_file=save_name+'_profiling.html')
    
    # Keep credit-card trips only (payment_type == 1 in the TLC data dictionary)
    df = df[df['payment_type']==1].drop('payment_type',axis=1).reset_index(drop=True)
    df.to_feather(save_name+'_creditCard')
    print('Credit Card only size:',df.shape)
    print('>> Saved to feather-format, credit-card only')
    print()
    print('-----------------------------------------------')
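The listing does not show how preprocessing is invoked, so a minimal driver loop is sketched below; the raw CSV file names follow the TLC naming convention and are an assumption, while the output paths match the ones read back by load_attribute further down.

# Hypothetical driver (raw file names assumed; output paths as consumed below)
MONTHS = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
          'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
for m, name in enumerate(MONTHS, start=1):
    raw = os.path.join('taxi', 'raw', 'yellow_tripdata_2019-%02d.csv' % m)
    preprocessing(raw, m, os.path.join('taxi', 'ETL', name + '19'))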
    
    
def load_attribute(attr, credit_card=False):
    """Concatenate one attribute across the twelve monthly feather files."""
    values = np.array([])
    for i in MONTHS:
        if credit_card:
            path = os.path.abspath(os.path.join("taxi", "ETL", i+"19"+"_creditCard"))
        else:
            path = os.path.abspath(os.path.join("taxi", "ETL", i+"19"))
        values = np.concatenate([values, pd.read_feather(path)[attr].to_numpy()])
    return values
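
The .npz archives loaded in the next block are not created anywhere in the listing; presumably each attribute returned by load_attribute was saved with np.savez_compressed, along these lines (file names inferred from the np.load calls below):

for attr in ['PULocationID', 'total_amount', 'tip_amount',
             'trip_distance', 'trip_duration']:
    np.savez_compressed(attr + '_Credit.npz',
                        data=load_attribute(attr, credit_card=True))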
    
# Feature engineering by pick-up zone
ACPM = 0.58  # average cost per mile ($/mile); 0.58 is the 2019 IRS standard mileage rate

df = {'location': np.load('PULocationID_Credit.npz')['data'],
      'total_amount': np.load('total_amount_Credit.npz')['data'],
      'tip_amount': np.load('tip_amount_Credit.npz')['data'],
      'trip_distance': np.load('trip_distance_Credit.npz')['data'],
      'trip_duration': np.load('trip_duration_Credit.npz')['data']}
df = pd.DataFrame(df)

# Net revenue per minute: amount received minus estimated mileage cost, divided by trip duration
df['rate_per_trip'] = (df['total_amount']-ACPM*df['trip_distance'])/df['trip_duration']
df = df.drop(['trip_distance'],axis=1)

# Tip as a fraction of the pre-tip amount
df['tip_rate'] = df['tip_amount']/(df['total_amount']-df['tip_amount'])
df = df.drop(['tip_amount','total_amount'],axis=1)

df = df.groupby('location').mean().reset_index()
df['location'] = df['location'].astype(int)
df.to_csv('feature-engineer_by_PULocationID.csv')
df.head() # Zones 103 (Statue of Liberty Island) and 110 have no trips
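
To make the engineered rates concrete: a trip with total_amount $12.00, tip_amount $2.00, trip_distance 3 miles and trip_duration 10 minutes gives rate_per_trip = (12.00 - 0.58 × 3)/10 ≈ $1.03 per minute, and tip_rate = 2.00/(12.00 - 2.00) = 0.20.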

2.7.2 Phase 2

import numpy as np 
import pandas as pd 
import random
from datetime import datetime, timedelta
import os

# Stage 1 - Sampling

# Set seed
random.seed(26)
# Sample 1 million random indices out of the 59,360,231 rows in the combined credit-card dataset
ONEMILLION = random.sample(range(59360231), 1000000)
# 80-20 train test split
TRAIN_INDEX = random.sample(ONEMILLION, 800000)
TEST_INDEX = np.setdiff1d(ONEMILLION, TRAIN_INDEX)
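
A quick sanity check on the split (added here as a sketch; not part of the original pipeline):

assert len(np.intersect1d(TRAIN_INDEX, TEST_INDEX)) == 0      # train/test indices are disjoint
assert len(TRAIN_INDEX) + len(TEST_INDEX) == len(ONEMILLION)  # together they cover the full sample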

ATTRS = ['trip_distance', 'trip_duration', 'PULocationID', 'DOLocationID', 'pickup_hour']
CREDIT = ['fare_amount', 'tip_amount', 'total_amount']

for i in ATTRS+CREDIT:
    print('>> Attribute:', i)
    df = np.load(i+'_credit.npz')['data']
    train_df = df[TRAIN_INDEX]
    print('Train Size (MB):', train_df.nbytes/1000000)
    np.savez_compressed(i+'_train.npz', data=train_df)
    del train_df
    test_df = df[TEST_INDEX]
    print('Test Size (MB):', test_df.nbytes/1000000)
    np.savez_compressed(i+'_test.npz', data=test_df)
    del test_df
    print('-------------------------------')
    del df
    
from scipy.stats import ks_2samp
# Two-sample K-S test for checking sampling integrity
# (with samples this large, the test detects even tiny distributional differences)
for i in ATTRS+CREDIT:
    print('>> Attribute:', i)
    test_df = np.load(i+'_test.npz')['data']
    train_df = np.load(i+'_train.npz')['data']
    p = ks_2samp(test_df, train_df)[1]
    if p < 0.05:
        print('The test set and train set have different distributions')
    else:
        print('The test set and train set have similar distributions')
    print('p-value for KS Test is:', p)
    print('-------------------------------')
    del test_df, train_df
    
# Stage 2 - Dataset Creation
train = pd.DataFrame()
test = pd.DataFrame()

for i in ATTRS+CREDIT:
    train[i] = np.load(i+'_train.npz')['data']
    test[i] = np.load(i+'_test.npz')['data']
    
NUM_COLS = ['trip_distance', 'trip_duration', 'fare_amount', 'tip_amount', 'total_amount']
# Keep only rows where all numerical columns are non-negative
train = train[(train[NUM_COLS]>=0).all(1)]
test = test[(test[NUM_COLS]>=0).all(1)]

# Log transform the numerical cols (+0.001 to handle log(0))
for col in NUM_COLS:
    train[col] = np.log10(train[col]+0.001)
    test[col] = np.log10(test[col]+0.001)
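
If predictions later need to be reported back on the original dollar/minute scales, the transform must be inverted; a minimal sketch (the helper name is ours, not from the original pipeline):

def inverse_log_transform(y):
    # Undo y = log10(x + 0.001)
    return np.power(10.0, y) - 0.001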
    
print("Train set shape:", train.shape)
print("Test set shape:", test.shape)

# Save to feather format
train.reset_index(drop=True).to_feather('train_set')
test.reset_index(drop=True).to_feather('test_set')
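
The saved sets can be read back with pandas, e.g.:

train = pd.read_feather('train_set')
test = pd.read_feather('test_set')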

# Stage 3 - Feature Scaling
from matplotlib import pyplot as plt
import seaborn as sns
plt.style.use('D:/DS/0_ASS1/stylesheet.mplstyle')
### TRIP DISTANCE DISTRIBUTION (LOG SCALE)
#plt.figure(figsize=(10,5))
# Note: sns.distplot is deprecated in seaborn >= 0.11 (use sns.histplot/displot there)
sns.distplot(train['trip_distance'], hist=True, kde=True,
             bins=50, color='#86bfd0',
             kde_kws={'linewidth': 1, 'bw':0.05, 'color':'#147f9f'})
plt.ylabel('Density')
plt.xlabel('Log(Trip distance)')
plt.suptitle("Log Distribution of Trip distance (miles) in Train Set",
            ha = 'right')
plt.show()
### TOTAL AMOUNT DISTRIBUTION (LOG SCALE)
#plt.figure(figsize=(10,5))
sns.distplot(train['total_amount'], hist=True, kde=True,
             bins=50, color='#fed38f',
             kde_kws={'linewidth': 1, 'bw':0.05, 'color':'#f9ab17'})
plt.xlabel('Log(Total amount)')
plt.ylabel('Density')
plt.suptitle("Log Distribution of Total amount ($) in Train Set",
            ha = 'right')
plt.show()
### TIP AMOUNT DISTRIBUTION (LOG SCALE)
#plt.figure(figsize=(10,5))
sns.distplot(train['tip_amount'], hist=True, kde=True, 
             bins=50, color='#c7827b',
             kde_kws={'linewidth': 1, 'bw':0.05, 'color':'#a80000'})
plt.xlabel('Log(Tip amount)')
plt.ylabel('Density')
plt.suptitle("Log Distribution of Tip amount ($) in Train Set",
            ha = 'right')
plt.show()
### TRIP DURATION DISTRIBUTION (LOG SCALE)
#plt.figure(figsize=(10,5))
sns.distplot(train['trip_duration'], hist=True, kde=True, 
             bins=50, color='#abc27e',
             kde_kws={'linewidth': 1, 'bw':0.05, 'color':'#5a8303'})
plt.xlabel('Log(Trip duration)')
plt.ylabel('Density')
plt.suptitle("Log Distribution of Trip duration (min) in Train Set",
            ha = 'right')
plt.show()
### PICK UP HOURS
#plt.figure(figsize=(30,20))
pickup_hour = train['pickup_hour']
(unique, counts) = np.unique(pickup_hour, return_counts=True)
plt.bar(unique, counts,
       tick_label=[str(int(x))+':00' for x in unique],
       width=0.9)
plt.xticks(rotation=70)
plt.xlabel('Time of day (hour)')
plt.ylabel('Number of trips')
plt.suptitle("Number of trips by time of day in Train Set",
            ha = 'right')
plt.show()
### PICK-UP ZONES
#plt.figure(figsize=(30,20))
pickup_zone = train['PULocationID']
(unique, counts) = np.unique(pickup_zone, return_counts=True)
plt.bar(unique, counts,
       width=0.9, color='#e5a5ff')
#plt.xticks(rotation=70)
plt.xlabel('Pick-up Zone ID')
plt.ylabel('Number of trips')
plt.suptitle("Number of trips by Pick-up Zone in Train Set",
            ha = 'right')
plt.show()
### DROP-OFF ZONES
#plt.figure(figsize=(30,20))
dropoff_zone = train['DOLocationID']
(unique, counts) = np.unique(dropoff_zone, return_counts=True)
plt.bar(unique, counts,
       width=0.9, color='#8ef1df')
plt.xlabel('Drop-off Zone ID')
plt.ylabel('Number of trips')
plt.suptitle("Number of trips by Drop-off Zone in Train Set",
            ha = 'right')
plt.show()
# Omnibus test of normality & Shapiro-Wilk test of normality
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.normaltest.html
# H0 := sample comes from normal dist
from scipy.stats import normaltest, shapiro  # shapiro imported for reference; only normaltest is used below

for col in NUM_COLS:
    print('>> Attribute:', col)
    p_train = normaltest(train[col])[1]
    p_test = normaltest(test[col])[1]
    if p_train < 0.05:
        print('The train set is NOT approximately normal')
    else:
        print('The train set is approximately normal')
    if p_test < 0.05:
        print('The test set is NOT approximately normal')
    else:
        print('The test set is approximately normal')
    print('-------------------------------')