个性化阅读
专注于IT技术分析

Apache Spark流教程:识别流行的Twitter Hashtags

本文概述

如今, 数据正在以前所未有的速度增长和积累。目前, 我们世界上大约90%的数据仅在最近两年内生成。由于如此惊人的增长率, 大数据平台必须采用根本性的解决方案才能维护如此庞大的数据量。

当今的主要数据来源之一是社交网络。请允许我演示一个真实的示例:使用其中最重要的大数据回声解决方案之一(Apache Spark和Python)实时处理, 分析和从社交网络数据中提取见解。

Apache Spark Streaming可用于从社交媒体中提取见解,例如趋势Twitter主题标签

在本文中, 我将教你如何构建一个简单的应用程序, 该应用程序使用Python从Twitter读取在线流, 然后使用Apache Spark Streaming处理推文以标识主题标签, 最后返回最热门的主题标签并以真实的形式表示此数据。时间仪表板。

为Twitter API创建自己的凭证

为了从Twitter获得推文, 你需要在TwitterApps上注册, 方法是单击”创建新应用程序”, 然后填写以下表单, 然后单击”创建你的Twitter应用程序”。

屏幕截图:如何创建你的Twitter应用程序。

其次, 转到你新创建的应用程序, 然后打开”密钥和访问令牌”标签。然后点击”生成我的访问令牌”。

屏幕截图:设置Twitter应用程序凭据,密钥和访问令牌。

你的新访问令牌将显示如下。

屏幕截图:Twitter应用程序访问令牌设置。

现在, 你可以进行下一步了。

构建Twitter HTTP客户端

在这一步中, 我将向你展示如何构建一个简单的客户端, 该客户端将使用Python从Twitter API中获取推文, 并将其传递给Spark Streaming实例。对于任何专业的Python开发人员而言, 应该都易于遵循。

首先, 我们创建一个名为twitter_app.py的文件, 然后将代码添加到其中, 如下所示。

导入我们将要使用的库, 如下所示:

import socket
import sys
import requests
import requests_oauthlib
import json

并添加将在OAuth中用于连接到Twitter的变量, 如下所示:

# Replace the values below with yours
ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'
ACCESS_SECRET = 'YOUR_ACCESS_SECRET'
CONSUMER_KEY = 'YOUR_CONSUMER_KEY'
CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET'
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET)

现在, 我们将创建一个名为get_tweets的新函数, 该函数将调用Twitter API URL并返回对tweet流的响应。

def get_tweets():
	url = 'https://stream.twitter.com/1.1/statuses/filter.json'
	query_data = [('language', 'en'), ('locations', '-130, -20, 100, 50'), ('track', '#')]
	query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
	response = requests.get(query_url, auth=my_auth, stream=True)
	print(query_url, response)
	return response

然后, 创建一个函数, 以接收上述响应, 并从整个tweets的JSON对象中提取tweets的文本。之后, 它通过TCP连接将每条推文发送到Spark Streaming实例(稍后将讨论)。

def send_tweets_to_spark(http_resp, tcp_connection):
	for line in http_resp.iter_lines():
    	try:
        	full_tweet = json.loads(line)
        	tweet_text = full_tweet['text']
        	print("Tweet Text: " + tweet_text)
        	print ("------------------------------------------")
        	tcp_connection.send(tweet_text + '\n')
    	except:
        	e = sys.exc_info()[0]
        	print("Error: %s" % e)

现在, 我们将成为主要部分, 它将与Spark进行连接的应用程序主机套接字连接。我们将在这里将IP配置为localhost, 因为所有IP都将在同一台计算机和端口9009上运行。然后, 我们将调用上面所获得的get_tweets方法, 以从Twitter获取推文, 并将其响应与与send_tweets_to_spark的套接字连接, 用于将推文发送到Spark。

TCP_IP = "localhost"
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
send_tweets_to_spark(resp, conn)

设置我们的Apache Spark流应用程序

让我们构建一个Spark流媒体应用程序, 该应用程序将对传入的推文进行实时处理, 从它们中提取主题标签, 并计算已提及的主题标签数。

插图:Spark流允许实时处理传入的推文和标签提取

首先, 我们必须创建一个Spark Context sc实例, 然后以2秒的批处理间隔从sc创建一个Streaming Context ssc, 它将对每两秒钟接收到的所有流进行转换。请注意, 我们已将日志级别设置为ERROR, 以禁用Spark写入的大多数日志。

我们在此处定义了一个检查点, 以便允许定期的RDD检查点;必须在我们的应用程序中使用它, 因为我们将使用有状态转换(将在同一部分稍后讨论)。

然后, 我们定义主DStream数据流, 该数据流将连接到之前在端口9009上创建的套接字服务器, 并从该端口读取推文。 DStream中的每个记录将是一条推文。

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests
# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# create spark context with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with interval size 2 seconds
ssc = StreamingContext(sc, 2)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")
# read data from port 9009
dataStream = ssc.socketTextStream("localhost", 9009)

现在, 我们将定义转换逻辑。首先, 我们将所有tweet分解为单词, 然后将它们放入RDD中。然后, 我们将仅过滤所有单词中的主题标签, 并将它们映射到(主题标签, 1)对, 并将其放入主题标签RDD中。

然后, 我们需要计算#次提到主题标签的次数。我们可以通过使用reduceByKey函数来实现。此函数将计算每个批次中提及标签#的次数, 即它将重置每个批次中的计数。

在我们的情况下, 我们需要计算所有批次的计数, 因此我们将使用另一个名为updateStateByKey的函数, 因为该函数使你可以在用新数据更新RDD时保持其状态。这种方式称为状态转换。

请注意, 要使用updateStateByKey, 你必须配置一个检查点, 并且我们在上一步中已完成此操作。

# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))
# filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag, 1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# adding the count of each hashtag to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# do processing for each RDD generated in each interval
tags_totals.foreachRDD(process_rdd)
# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()

updateStateByKey将函数作为参数称为更新函数。它在RDD中的每个项目上运行, 并执行所需的逻辑。

在我们的案例中, 我们创建了一个称为aggregate_tags_count的更新函数, 该函数将对每个主题标签的所有new_values求和, 并将它们添加到total_sum(即所有批次的总和)中, 并将数据保存到tags_totals RDD中。

def aggregate_tags_count(new_values, total_sum):
	return sum(new_values) + (total_sum or 0)

然后, 我们对每个批次中的tags_totals RDD进行处理, 以便使用Spark SQL Context将其转换为temp表, 然后执行select语句, 以检索前十个具有其计数的hashtag, 并将其放入hashtag_counts_df数据帧中。

def get_sql_context_instance(spark_context):
	if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
	return globals()['sqlContextSingletonInstance']
def process_rdd(time, rdd):
	print("----------- %s -----------" % str(time))
	try:
    	# Get spark sql singleton context from the current context
    	sql_context = get_sql_context_instance(rdd.context)
    	# convert the RDD to Row RDD
    	row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
    	# create a DF from the Row RDD
    	hashtags_df = sql_context.createDataFrame(row_rdd)
    	# Register the dataframe as table
    	hashtags_df.registerTempTable("hashtags")
    	# get the top 10 hashtags from the table using SQL and print them
    	hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
    	hashtag_counts_df.show()
    	# call this method to prepare top 10 hashtags DF and send them
    	send_df_to_dashboard(hashtag_counts_df)
	except:
    	e = sys.exc_info()[0]
    	print("Error: %s" % e)

我们的Spark应用程序的最后一步是将hashtag_counts_df数据帧发送到仪表板应用程序。因此, 我们将数据框转换为两个数组, 一个数组用于主题标签, 另一个数组用于其计数。然后, 我们将它们通过REST API发送到仪表板应用程序。

def send_df_to_dashboard(df):
	# extract the hashtags from dataframe and convert them into array
	top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()]
	# extract the counts from dataframe and convert them into array
	tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()]
	# initialize and send the data through REST API
	url = 'http://localhost:5001/updateData'
	request_data = {'label': str(top_tags), 'data': str(tags_count)}
	response = requests.post(url, data=request_data)

最后, 这是运行和打印hashtag_counts_df时Spark Streaming的示例输出, 你会注意到, 按照批处理间隔每两秒精确打印一次输出。

每个批次间隔设置打印的Twitter Spark流输出示例

创建一个简单的实时仪表板来表示数据

现在, 我们将创建一个简单的仪表板应用程序, 该应用程序将由Spark实时更新。我们将使用Python, Flask和Charts.js进行构建。

首先, 让我们创建一个具有以下结构的Python项目, 然后下载Chart.js文件并将其添加到静态目录中。

插图:创建一个用于Twitter主题标签分析的Python项目

然后, 在app.py文件中, 我们将创建一个名为update_data的函数, Spark将通过URL http:// localhost:5001 / updateData调用该函数, 以更新全局标签和值数组。

另外, 创建了refresh_graph_data函数以供AJAX请求调用, 以将新的更新标签和值数组作为JSON返回。函数get_chart_page在调用时将呈现chart.html页面。

from flask import Flask, jsonify, request
from flask import render_template
import ast
app = Flask(__name__)
labels = []
values = []
@app.route("/")
def get_chart_page():
	global labels, values
	labels = []
	values = []
	return render_template('chart.html', values=values, labels=labels)
@app.route('/refreshData')
def refresh_graph_data():
	global labels, values
	print("labels now: " + str(labels))
	print("data now: " + str(values))
	return jsonify(sLabel=labels, sData=values)
@app.route('/updateData', methods=['POST'])
def update_data():
	global labels, values
	if not request.form or 'data' not in request.form:
    	return "error", 400
	labels = ast.literal_eval(request.form['label'])
	values = ast.literal_eval(request.form['data'])
	print("labels received: " + str(labels))
	print("data received: " + str(values))
	return "success", 201
if __name__ == "__main__":
	app.run(host='localhost', port=5001)

现在, 让我们在chart.html文件中创建一个简单的图表, 以显示主题标签数据并实时更新。如下定义, 我们需要导入Chart.js和jquery.min.js JavaScript库。

在body标记中, 我们必须创建一个画布并为其指定ID, 以便在下一步使用JavaScript显示图表时引用它。

<!DOCTYPE html>
<html>
	<head>
    	<meta charset="utf-8"/>
    	<title>Top Trending Twitter Hashtags</title>
    	<script src='static/Chart.js'></script>
    	<script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script>
	
	</head>
	<body>
        	<h2>Top Trending Twitter Hashtags</h2>
        	<div style="width:700px;height=500px">
            	<canvas id="chart"></canvas>
        	</div>
	</body>
</html>

现在, 让我们使用下面的JavaScript代码来构建图表。首先, 我们获取canvas元素, 然后创建一个新的图表对象, 并将canvas元素传递给它, 然后定义其数据对象, 如下所示。

请注意, 数据的标签和数据受标签和值变量限制, 这些标签和值变量在调用app.py文件中的get_chart_page函数时呈现页面时返回。

最后剩下的部分是配置为每秒执行一次Ajax请求并调用URL / refreshData的函数, 该函数将在app.py中执行refresh_graph_data并返回新的更新数据, 然后更新呈现新数据的char。

<script>
   var ctx = document.getElementById("chart");
   var myChart = new Chart(ctx, {
    	type: 'horizontalBar', data: {
        	labels: [{% for item in labels %}
                  	"{{item}}", {% endfor %}], datasets: [{
            	label: '# of Mentions', data: [{% for item in values %}
     	                 {{item}}, {% endfor %}], backgroundColor: [
                	'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)'
            	], borderColor: [
                	'rgba(255, 99, 132, 1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255, 99, 132, 1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)'
            	], borderWidth: 1
        	}]
    	}, options: {
        	scales: {
	            yAxes: [{
                	ticks: {
                    	beginAtZero:true
                	}
            	}]
        	}
    	}
   });
   var src_Labels = [];
   var src_Data = [];
   setInterval(function(){
    	$.getJSON('/refreshData', {
    	}, function(data) {
        	src_Labels = data.sLabel;
        	src_Data = data.sData;
    	});
    	myChart.data.labels = src_Labels;
    	myChart.data.datasets[0].data = src_Data;
    	myChart.update();
   }, 1000);
</script>

一起运行应用程序

让我们按以下顺序运行这三个应用程序:1. Twitter App Client。 2. Spark应用程序。 3.仪表板Web应用程序。

然后, 你可以使用URL <http:// localhost:5001 />访问实时仪表板

现在, 你可以看到图表正在更新, 如下所示:

动画:实时Twitter趋势标签图表

Apache流式现实生活用例

我们已经学习了如何使用Spark Streaming实时对数据进行简单的数据分析, 以及如何使用RESTful网络服务将其直接与简单的仪表板集成。从这个示例中, 我们可以看到Spark的强大功能, 因为它捕获了大量的数据流, 对其进行转换, 并提取出有价值的见解, 这些见解可轻松用于立即做出决策。可以实施许多有用的用例, 这些用例可以服务于不同行业, 例如新闻或营销。

插图:标签可用于提取有价值的见解和观点,适用于多个行业。

新闻行业的例子

我们可以跟踪最常提及的主题标签, 以了解人们在社交媒体上谈论最多的话题。此外, 我们可以跟踪特定的主题标签及其推文, 以了解人们对世界上特定主题或事件的评价。

营销实例

我们可以收集推文流, 并通过情感分析将其分类并确定人们的兴趣, 以便针对他们的兴趣提供相关的要约。

此外, 有很多用例可以专门用于大数据分析, 并且可以服务于许多行业。一般而言, 有关更多Apache Spark用例, 建议你查看我们之前的文章之一。

我鼓励你从此处阅读有关Spark Streaming的更多信息, 以便更多地了解它的功能, 并对数据进行更高级的转换, 以便实时使用它获得更多见解。

赞(0)
未经允许不得转载:srcmini » Apache Spark流教程:识别流行的Twitter Hashtags

评论 抢沙发

评论前必须登录!