Analyzing 1.7 Billion Reddit Comments with Blaze and Impala

by Daniel Rodriguez and Kristopher Overholt

Blaze is a Python library and interface for querying data on different storage systems. Blaze works by translating a subset of NumPy- and pandas-like syntax into queries for databases and other computing systems. Blaze gives Python users a familiar interface to query data living in other data storage systems such as SQL databases, NoSQL data stores, Spark, Hive, Impala, and raw data files such as CSV, JSON, and HDF5. Hive and Impala are distributed SQL engines that can perform queries on data stored in the Hadoop Distributed File System (HDFS).
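As a quick illustration of that interface (a minimal sketch, assuming a hypothetical local CSV file named comments.csv with a subreddit column), the same pandas-like expressions used throughout this post also work against a flat file:

import blaze as bz

# Point Blaze at a data source; here a CSV file, but the URI could just as
# well reference a SQL database or, as in this post, a Hive or Impala table.
comments = bz.Data('comments.csv')

# Build a lazy, pandas-like expression and evaluate it.
n_soccer = comments[comments.subreddit == 'soccer'].count()
print(int(n_soccer))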

In this post, we'll use Blaze and Impala to interactively query and explore a data set of approximately 1.7 billion comments (975 GB uncompressed) from the reddit website, spanning October 2007 to May 2015. This data set was made available in July 2015 in a reddit post. The data set is in JSON format (one comment per line) and consists of the comment body, author, subreddit, creation timestamp, and other fields.

We downloaded the reddit comment data set, extracted it, and moved it to Amazon S3. We'll create a Hadoop cluster on Amazon EC2, move the data set onto the cluster and into Impala, then query it interactively with Blaze from a client machine (a laptop).

This post describes the use of Blaze and Impala on a Hadoop cluster. Two related blog posts use Dask with larger-than-memory data sets to efficiently analyze one month of reddit comments on a single machine: Analyzing Reddit Comments with Dask and Castra and ReproduceIt: Reddit word count.

Ibis is a related Python data analysis framework that is currently focused on Impala, but is actively expanding to use other compute engines. Ibis also has the ability to perform queries on an Impala cluster without requiring you to switch back and forth between Python code and the Impala shell.
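As a rough sketch of what that looks like (the hostname below is hypothetical, and the exact API may vary between Ibis releases), a count query against the same table might be written as:

import ibis

# Connect to an Impala daemon (Impala's default port is 21050).
con = ibis.impala.connect(host='impala-host', port=21050)

# Build an expression on the table and execute it without leaving Python.
reddit = con.table('reddit_parquet', database='default')
print(reddit.count().execute())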

Dependencies

The Python packages required for this analysis can be installed on the client machine using the Anaconda Python distribution.

For convenience, the environment.yml file shown below contains the information required to create a new Conda environment (named blaze-impala) and install all of the dependencies used in this analysis.

name: blaze-impala
channels:
  - blaze
dependencies:
  - blaze
  - bokeh=0.9
  - impyla
  - jupyter
  - pandas
  - pyhive
  - python=2.7

To create this environment, create a file named environment.yml, paste the above contents into this file, then execute the command conda env create in the same directory as the environment.yml file. You can then run source activate blaze-impala to work in the new environment.
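For reference, these are the two commands, run on the client machine from the directory containing environment.yml:

$ conda env create
$ source activate blaze-impala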

Cluster Setup

The Hive and Impala servers used in this analysis were run on a Hadoop cluster with HDFS, YARN, Hive, Impala, and Jupyter Notebook installed. Anaconda Cluster can create and provision a cluster with these data analysis tools available as plugins.

We used Anaconda Cluster to create a Linux cluster on Amazon EC2 with 8 nodes (m3.2xlarge instances, Ubuntu 14.04) and install the Hive, YARN, Impala, HDFS, and Jupyter Notebook plugins.

Preparing the Data

With our Hadoop cluster up and running, we can move the reddit comment data from Amazon S3 to HDFS.

We can SSH into the head node of the cluster and run the following command with valid AWS credentials, which will transfer the reddit comment data (975 GB of JSON data) from a public Amazon S3 bucket to the HDFS data store on the cluster:

hadoop distcp s3n://{{ AWS_KEY }}:{{ AWS_SECRET }}@blaze-data/reddit/json/*/*.json /user/ubuntu

The time required to move the data from Amazon S3 to HDFS was about 1 hour and 45 minutes.

We can run the following command on the head node to download the JSON serializer/deserializer (SerDe) module to enable Hive to read and write in JSON format:

$ wget http://s3.amazonaws.com/elasticmapreduce/samples/hive-ads/libs/jsonserde.jar

On the head node, we open the interactive Hive shell using the hive command, load the JSON SerDe module, create a table, and load the data into Hive:

$ hive
hive > ADD JAR jsonserde.jar;
hive > CREATE TABLE reddit_json (
  archived                 boolean,
  author                   string,
  author_flair_css_class   string,
  author_flair_text        string,
  body                     string,
  controversiality         int,
  created_utc              string,
  distinguished            string,
  downs                    int,
  edited                   boolean,
  gilded                   int,
  id                       string,
  link_id                  string,
  name                     string,
  parent_id                string,
  removal_reason           string,
  retrieved_on             timestamp,
  score                    int,
  score_hidden             boolean,
  subreddit                string,
  subreddit_id             string,
  ups                      int
)
ROW FORMAT
    serde 'com.amazon.elasticmapreduce.JsonSerde'
    with serdeproperties ('paths'='archived,author,author_flair_css_class,author_flair_text,body,controversiality,created_utc,distinguished,downs,edited,gilded,id,link_id,name,parent_id,removal_reason,retrieved_on,score,score_hidden,subreddit,subreddit_id,ups');

hive > LOAD DATA INPATH '/user/ubuntu/*.json' INTO TABLE reddit_json;

The time required to load the data into Hive was less than 1 minute.

Once the data is loaded into Hive, we can query it using SQL statements such as SELECT count(*) FROM reddit_json;. However, these queries will be fairly slow because the data is stored in JSON format.

Alternatively, we can migrate the data to Parquet format. Apache Parquet is a columnar storage format that was designed for HDFS and performs very well in many cases.

We can use the following commands in the interactive Hive shell to create a new table and convert the data to Parquet format:

hive > CREATE TABLE reddit_parquet (
  archived                 boolean,
  author                   string,
  author_flair_css_class   string,
  author_flair_text        string,
  body                     string,
  controversiality         int,
  created_utc              string,
  distinguished            string,
  downs                    int,
  edited                   boolean,
  gilded                   int,
  id                       string,
  link_id                  string,
  name                     string,
  parent_id                string,
  removal_reason           string,
  retrieved_on             timestamp,
  score                    int,
  score_hidden             boolean,
  subreddit                string,
  subreddit_id             string,
  ups                      int,
  created_utc_t            timestamp
)
STORED AS PARQUET;

hive > SET dfs.block.size=1g;

hive > INSERT OVERWRITE TABLE reddit_parquet select *, cast(cast(created_utc as double) as timestamp) as created_utc_t FROM reddit_json;

The time required to convert the data to Parquet format was about 50 minutes.

Note that we added a new column in timestamp format (created_utc_t) based on the original created_utc column. The original column stored the Unix timestamp as a string of digits, so we first cast it to a double and then cast the resulting double to a timestamp.

Finally, we SSH into one of the compute nodes and execute the following command from the interactive Impala shell so that Impala picks up the new tables from the Hive metastore.

$ sudo impala-shell
impala> invalidate metadata;

Data Analysis and Queries

Now that we've loaded the data into Impala and converted it to Parquet format, we can close the SSH connection to the cluster, work from our laptop, and use Blaze to interactively query the data set. Note that we will need the IP addresses of the head node and one of the compute nodes, which are running the Hive and Impala servers, respectively.

First, we import the Blaze, pandas, and Bokeh Python packages.

In [1]:
from __future__ import division, print_function

import blaze as bz
import pandas as pd
import bokeh
from bokeh.charts import TimeSeries, output_notebook, show

output_notebook()
BokehJS successfully loaded.
In [2]:
print(bz.__version__)
print(pd.__version__)
print(bokeh.__version__)
0.8.3
0.16.2
0.9.3

Querying the data with Hive

We can use Blaze along with the PyHive Python package to query Hive.

First, we set up a connection to Hive and the table that contains the reddit comment data in Parquet format.

In [3]:
data = bz.Data('hive://54.81.249.17/default::reddit_parquet')

We can use Blaze to query Hive and show the types of all columns in the table.

In [4]:
data.dshape
Out[4]:
dshape("""var * {
  archived: ?bool,
  author: ?string,
  author_flair_css_class: ?string,
  author_flair_text: ?string,
  body: ?string,
  controversiality: ?int32,
  created_utc: ?string,
  distinguished: ?string,
  downs: ?int32,
  edited: ?bool,
  gilded: ?int32,
  id: ?string,
  link_id: ?string,
  name: ?string,
  parent_id: ?string,
  removal_reason: ?string,
  retrieved_on: ?datetime,
  score: ?int32,
  score_hidden: ?bool,
  subreddit: ?string,
  subreddit_id: ?string,
  ups: ?int32,
  created_utc_t: ?datetime
  }""")

Counting the total number of comments

We can count the total number of rows in the data set (that is, the total number of comments) using the following query.

In [5]:
n_posts = data.count()

The previous Blaze expression will not execute anything on Hive. Rather, it is a lazy expression that we can build upon to obtain the results we need from the data. We can also view how this expression translates to SQL using the bz.compute() syntax.

In [6]:
print(bz.compute(n_posts))
SELECT count(`reddit_parquet`.`archived`) AS `count_1` 
FROM `reddit_parquet`

We can then execute the expression on Hive and time the result.

In [7]:
%time int(n_posts)
CPU times: user 261 ms, sys: 56.2 ms, total: 317 ms
Wall time: 4min 30s
Out[7]:
1659361605

This query took about 4.5 minutes using Hive, and the result shows that the total number of reddit comments in this data set is about 1.66 billion.

Querying the data with Impala

We can also use Blaze along with the Impyla Python package to query Impala, another SQL-on-Hadoop engine that delivers better performance for most queries because it does not rely on MapReduce jobs as Hive does.

With Blaze, all of the queries will use the same syntax since they are abstracted to the user via expressions. The only difference in this case is that we will use the IP address of one of the compute nodes in the cluster instead of the head node because this is where the Impala server is running.

First, we set up a connection to Impala and the table that contains the reddit comment data in Parquet format.

In [8]:
data = bz.Data('impala://54.81.251.205/default::reddit_parquet')

Counting the total number of comments

Again, we can count the total number of rows in the data set with the following query. Note that the syntax of the Blaze expression is the same for both Hive and Impala.

In [9]:
n_posts = data.count()

Similar to Hive, we can view how this expression translates to SQL using the bz.compute() syntax. Note that the generated SQL is essentially the same for Hive and Impala, apart from identifier quoting.

In [10]:
print(bz.compute(n_posts))
SELECT count(reddit_parquet.archived) AS count_1 
FROM reddit_parquet

We can then execute the expression on Impala and time the result.

In [11]:
%time int(n_posts)
CPU times: user 17.1 ms, sys: 6.05 ms, total: 23.2 ms
Wall time: 5.13 s
Out[11]:
1659361605

Note the significant difference in performance between Hive (4.5 minutes) and Impala (5 seconds) for the same query. There are a few seconds of network delay as our client machine communicates with the Hadoop cluster on Amazon EC2, so the actual computation time in Impala is slightly less than 5 seconds.

Counting the total number of upvotes

We can perform another simple query to sum all of the upvotes for all of the comments.

In [12]:
n_up_votes = data.ups.sum()
In [13]:
print(bz.compute(n_up_votes))
SELECT sum(reddit_parquet.ups) AS ups_sum 
FROM reddit_parquet
In [14]:
%time int(n_up_votes)
CPU times: user 14.1 ms, sys: 4.95 ms, total: 19 ms
Wall time: 4.14 s
Out[14]:
8720517935

This query took about 4 seconds, and the result shows that the total number of upvotes in this data set is about 8.72 billion.

Counting the total number of posts in the /r/soccer subreddit

Similar to pandas or NumPy, you can filter particular columns from the data using Blaze. For example, we can count the total number of comments in the /r/soccer subreddit.

In [15]:
n_posts_in_r_soccer = data[data.subreddit == 'soccer'].count()
In [16]:
%time int(n_posts_in_r_soccer)
CPU times: user 27.4 ms, sys: 8.93 ms, total: 36.3 ms
Wall time: 4.99 s
Out[16]:
11566836

This query took about 5 seconds, and the result shows that the total number of comments in the /r/soccer subreddit is about 11.6 million.

Listing the top 10 subreddits with the most comments

Group by and sort operations are also supported in Blaze. Suppose we want to identify the subreddits with the most comments. We can group by subreddit, count the number of rows in each group, sort using the new posts field, and display the first ten subreddits in the sorted list.

In [17]:
top_subreddits = bz.by(data.subreddit, posts=data.count()).sort('posts', ascending=False).head(10)
In [18]:
print(bz.compute(top_subreddits))
SELECT anon_1.subreddit, anon_1.posts 
FROM (SELECT reddit_parquet.subreddit AS subreddit, count(reddit_parquet.archived) AS posts 
FROM reddit_parquet GROUP BY reddit_parquet.subreddit) AS anon_1 ORDER BY anon_1.posts DESC
 LIMIT %(param_1)s

To convert the output into a pandas DataFrame, we can use odo, which is available in Blaze using bz.odo.

In [19]:
%time bz.odo(top_subreddits, pd.DataFrame)
CPU times: user 66 ms, sys: 22.4 ms, total: 88.4 ms
Wall time: 27.9 s
Out[19]:
subreddit posts
0 AskReddit 184540520
1 funny 49967219
2 pics 47208205
3 gaming 32202209
4 WTF 29779435
5 leagueoflegends 29706965
6 AdviceAnimals 27339965
7 politics 22904996
8 videos 21235667
9 worldnews 19687581

This query took about 28 seconds, and the results show that the subreddit with the most comments was /r/AskReddit with about 185 million comments.

Counting the number of comments before a specific hour

While converting the data from JSON to Parquet format, we added an extra field called created_utc_t as a TIMESTAMP type in Impala. Using this column, we can filter comments by specific hours using bz.hour in Blaze.

In [20]:
before_1pm = data.ups[bz.hour(data.created_utc_t) < 13].count()
In [21]:
%time int(before_1pm)
CPU times: user 23.2 ms, sys: 7.81 ms, total: 31 ms
Wall time: 6.26 s
Out[21]:
738941490

This query took about 6 seconds, and the result shows that the total number of comments posted before 1 pm is about 739 million (or 45% of the total comments).

Plotting the daily frequency of comments in the /r/IAmA subreddit

In a post on the /r/IAmA subreddit, a person posts details about who they are, and other people can ask the original poster questions.

We can build a query for the information that we need to plot the daily number of comments in the /r/IAmA subreddit over the entire date range of the data set.

First, we filter comments from the /r/IAmA subreddit.

In [22]:
iama = data[(data.subreddit == 'IAmA')]

We can view the dates of the first few comments on /r/IAmA.

In [23]:
iama.created_utc_t.sort('created_utc_t').head(5)
Out[23]:
created_utc_t
0 2009-05-28 00:42:02
1 2009-05-28 00:43:55
2 2009-05-28 00:48:26
3 2009-05-28 00:55:07
4 2009-05-28 01:09:58

We can convert the timestamp field to a continuous day field using the following equation:

(Year - 2007) * 365 + (Month - 1) * 31 + Day

This equation oversimplifies by falsely assuming that all months have 31 days and ignoring the extra days in leap years such as 2012, but it will serve to demonstrate the functionality of Blaze and Impala.
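For example, a comment created on 2009-05-28 (the date of the earliest /r/IAmA comments shown above) maps to day (2009 - 2007) * 365 + (5 - 1) * 31 + 28 = 730 + 124 + 28 = 882.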

In [24]:
days = (bz.year(iama.created_utc_t) - 2007) * 365 + (bz.month(iama.created_utc_t) - 1) * 31  + bz.day(iama.created_utc_t)

We can reuse the previous expression to add a day column to the table using bz.transform.

In [25]:
with_day = bz.transform(iama, day=days)

We can then group by the new day column.

In [26]:
by_day = bz.by(with_day.day, posts=with_day.created_utc_t.count())

To convert the output into a pandas DataFrame, we can use odo, which is available in Blaze using bz.odo.

In [27]:
%time by_day_result = bz.odo(by_day, pd.DataFrame)
CPU times: user 109 ms, sys: 13.4 ms, total: 122 ms
Wall time: 7.52 s
In [28]:
by_day_result = bz.odo(by_day, pd.DataFrame)

We can use pandas to modify the data and sort it by day.

In [29]:
by_day_result = by_day_result.sort(columns=['day'])

We can create a date range in pandas and use it as the index in the time series plot.

In [30]:
rng = pd.date_range('5/28/2009', periods=len(by_day_result), freq='D')
by_day_result.index = rng

Finally, we can use Bokeh to plot the daily number of comments in the /r/IAmA subreddit from May 2009 (the date of the first comment) to May 2015.

In [31]:
f = TimeSeries(by_day_result.posts, by_day_result.index,
               title='Comments in /r/IAmA subreddit',
               xlabel='Date', ylabel='Comments',
               tools='pan,reset,save,wheel_zoom,box_zoom',
               width=600, height=500)
show(f)

Summary

Blaze allows us to build queries and interactively explore data sets that exist in different storage systems and formats (Hive and Impala in this post) using a familiar Python interface and a consistent expression system, regardless of the backend.

We were able to run this analysis in a notebook on our local machine, interactively construct Blaze expressions, explore the data set of approximately 1.7 billion comments, and generate interactive plots, while the queries themselves were executed on the cluster nodes using the Hive and Impala distributed SQL engines.

The tools in the Blaze ecosystem (Blaze, odo, datashape, etc.) give us the flexibility of interacting with different remote data storage systems while using a consistent expression syntax and leveraging the performance of Impala and Parquet.
