本文概述
- 为Twitter API创建自己的凭证
- 构建Twitter HTTP客户端
- 设置我们的Apache Spark流应用程序
- 创建一个简单的实时仪表板来表示数据
- 一起运行应用程序
- Apache流式现实生活用例
如今, 数据正在以前所未有的速度增长和积累。目前, 我们世界上大约90%的数据仅在最近两年内生成。由于如此惊人的增长率, 大数据平台必须采用根本性的解决方案才能维护如此庞大的数据量。
当今的主要数据来源之一是社交网络。请允许我演示一个真实的示例:使用其中最重要的大数据回声解决方案之一(Apache Spark和Python)实时处理, 分析和从社交网络数据中提取见解。
在本文中, 我将教你如何构建一个简单的应用程序, 该应用程序使用Python从Twitter读取在线流, 然后使用Apache Spark Streaming处理推文以标识主题标签, 最后返回最热门的主题标签并以真实的形式表示此数据。时间仪表板。
为Twitter API创建自己的凭证
为了从Twitter获得推文, 你需要在TwitterApps上注册, 方法是单击”创建新应用程序”, 然后填写以下表单, 然后单击”创建你的Twitter应用程序”。
其次, 转到你新创建的应用程序, 然后打开”密钥和访问令牌”标签。然后点击”生成我的访问令牌”。
你的新访问令牌将显示如下。
现在, 你可以进行下一步了。
构建Twitter HTTP客户端
在这一步中, 我将向你展示如何构建一个简单的客户端, 该客户端将使用Python从Twitter API中获取推文, 并将其传递给Spark Streaming实例。对于任何专业的Python开发人员而言, 应该都易于遵循。
首先, 我们创建一个名为twitter_app.py的文件, 然后将代码添加到其中, 如下所示。
导入我们将要使用的库, 如下所示:
import socket
import sys
import requests
import requests_oauthlib
import json
并添加将在OAuth中用于连接到Twitter的变量, 如下所示:
# Replace the values below with yours
ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN'
ACCESS_SECRET = 'YOUR_ACCESS_SECRET'
CONSUMER_KEY = 'YOUR_CONSUMER_KEY'
CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET'
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET)
现在, 我们将创建一个名为get_tweets的新函数, 该函数将调用Twitter API URL并返回对tweet流的响应。
def get_tweets():
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
query_data = [('language', 'en'), ('locations', '-130, -20, 100, 50'), ('track', '#')]
query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
response = requests.get(query_url, auth=my_auth, stream=True)
print(query_url, response)
return response
然后, 创建一个函数, 以接收上述响应, 并从整个tweets的JSON对象中提取tweets的文本。之后, 它通过TCP连接将每条推文发送到Spark Streaming实例(稍后将讨论)。
def send_tweets_to_spark(http_resp, tcp_connection):
for line in http_resp.iter_lines():
try:
full_tweet = json.loads(line)
tweet_text = full_tweet['text']
print("Tweet Text: " + tweet_text)
print ("------------------------------------------")
tcp_connection.send(tweet_text + '\n')
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
现在, 我们将成为主要部分, 它将与Spark进行连接的应用程序主机套接字连接。我们将在这里将IP配置为localhost, 因为所有IP都将在同一台计算机和端口9009上运行。然后, 我们将调用上面所获得的get_tweets方法, 以从Twitter获取推文, 并将其响应与与send_tweets_to_spark的套接字连接, 用于将推文发送到Spark。
TCP_IP = "localhost"
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
send_tweets_to_spark(resp, conn)
设置我们的Apache Spark流应用程序
让我们构建一个Spark流媒体应用程序, 该应用程序将对传入的推文进行实时处理, 从它们中提取主题标签, 并计算已提及的主题标签数。
首先, 我们必须创建一个Spark Context sc实例, 然后以2秒的批处理间隔从sc创建一个Streaming Context ssc, 它将对每两秒钟接收到的所有流进行转换。请注意, 我们已将日志级别设置为ERROR, 以禁用Spark写入的大多数日志。
我们在此处定义了一个检查点, 以便允许定期的RDD检查点;必须在我们的应用程序中使用它, 因为我们将使用有状态转换(将在同一部分稍后讨论)。
然后, 我们定义主DStream数据流, 该数据流将连接到之前在端口9009上创建的套接字服务器, 并从该端口读取推文。 DStream中的每个记录将是一条推文。
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
import sys
import requests
# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# create spark context with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with interval size 2 seconds
ssc = StreamingContext(sc, 2)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")
# read data from port 9009
dataStream = ssc.socketTextStream("localhost", 9009)
现在, 我们将定义转换逻辑。首先, 我们将所有tweet分解为单词, 然后将它们放入RDD中。然后, 我们将仅过滤所有单词中的主题标签, 并将它们映射到(主题标签, 1)对, 并将其放入主题标签RDD中。
然后, 我们需要计算#次提到主题标签的次数。我们可以通过使用reduceByKey函数来实现。此函数将计算每个批次中提及标签#的次数, 即它将重置每个批次中的计数。
在我们的情况下, 我们需要计算所有批次的计数, 因此我们将使用另一个名为updateStateByKey的函数, 因为该函数使你可以在用新数据更新RDD时保持其状态。这种方式称为状态转换。
请注意, 要使用updateStateByKey, 你必须配置一个检查点, 并且我们在上一步中已完成此操作。
# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))
# filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag, 1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# adding the count of each hashtag to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# do processing for each RDD generated in each interval
tags_totals.foreachRDD(process_rdd)
# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()
updateStateByKey将函数作为参数称为更新函数。它在RDD中的每个项目上运行, 并执行所需的逻辑。
在我们的案例中, 我们创建了一个称为aggregate_tags_count的更新函数, 该函数将对每个主题标签的所有new_values求和, 并将它们添加到total_sum(即所有批次的总和)中, 并将数据保存到tags_totals RDD中。
def aggregate_tags_count(new_values, total_sum):
return sum(new_values) + (total_sum or 0)
然后, 我们对每个批次中的tags_totals RDD进行处理, 以便使用Spark SQL Context将其转换为temp表, 然后执行select语句, 以检索前十个具有其计数的hashtag, 并将其放入hashtag_counts_df数据帧中。
def get_sql_context_instance(spark_context):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
return globals()['sqlContextSingletonInstance']
def process_rdd(time, rdd):
print("----------- %s -----------" % str(time))
try:
# Get spark sql singleton context from the current context
sql_context = get_sql_context_instance(rdd.context)
# convert the RDD to Row RDD
row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
# create a DF from the Row RDD
hashtags_df = sql_context.createDataFrame(row_rdd)
# Register the dataframe as table
hashtags_df.registerTempTable("hashtags")
# get the top 10 hashtags from the table using SQL and print them
hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
hashtag_counts_df.show()
# call this method to prepare top 10 hashtags DF and send them
send_df_to_dashboard(hashtag_counts_df)
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
我们的Spark应用程序的最后一步是将hashtag_counts_df数据帧发送到仪表板应用程序。因此, 我们将数据框转换为两个数组, 一个数组用于主题标签, 另一个数组用于其计数。然后, 我们将它们通过REST API发送到仪表板应用程序。
def send_df_to_dashboard(df):
# extract the hashtags from dataframe and convert them into array
top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()]
# extract the counts from dataframe and convert them into array
tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()]
# initialize and send the data through REST API
url = 'http://localhost:5001/updateData'
request_data = {'label': str(top_tags), 'data': str(tags_count)}
response = requests.post(url, data=request_data)
最后, 这是运行和打印hashtag_counts_df时Spark Streaming的示例输出, 你会注意到, 按照批处理间隔每两秒精确打印一次输出。
创建一个简单的实时仪表板来表示数据
现在, 我们将创建一个简单的仪表板应用程序, 该应用程序将由Spark实时更新。我们将使用Python, Flask和Charts.js进行构建。
首先, 让我们创建一个具有以下结构的Python项目, 然后下载Chart.js文件并将其添加到静态目录中。
然后, 在app.py文件中, 我们将创建一个名为update_data的函数, Spark将通过URL http:// localhost:5001 / updateData调用该函数, 以更新全局标签和值数组。
另外, 创建了refresh_graph_data函数以供AJAX请求调用, 以将新的更新标签和值数组作为JSON返回。函数get_chart_page在调用时将呈现chart.html页面。
from flask import Flask, jsonify, request
from flask import render_template
import ast
app = Flask(__name__)
labels = []
values = []
@app.route("/")
def get_chart_page():
global labels, values
labels = []
values = []
return render_template('chart.html', values=values, labels=labels)
@app.route('/refreshData')
def refresh_graph_data():
global labels, values
print("labels now: " + str(labels))
print("data now: " + str(values))
return jsonify(sLabel=labels, sData=values)
@app.route('/updateData', methods=['POST'])
def update_data():
global labels, values
if not request.form or 'data' not in request.form:
return "error", 400
labels = ast.literal_eval(request.form['label'])
values = ast.literal_eval(request.form['data'])
print("labels received: " + str(labels))
print("data received: " + str(values))
return "success", 201
if __name__ == "__main__":
app.run(host='localhost', port=5001)
现在, 让我们在chart.html文件中创建一个简单的图表, 以显示主题标签数据并实时更新。如下定义, 我们需要导入Chart.js和jquery.min.js JavaScript库。
在body标记中, 我们必须创建一个画布并为其指定ID, 以便在下一步使用JavaScript显示图表时引用它。
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<title>Top Trending Twitter Hashtags</title>
<script src='static/Chart.js'></script>
<script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script>
</head>
<body>
<h2>Top Trending Twitter Hashtags</h2>
<div style="width:700px;height=500px">
<canvas id="chart"></canvas>
</div>
</body>
</html>
现在, 让我们使用下面的JavaScript代码来构建图表。首先, 我们获取canvas元素, 然后创建一个新的图表对象, 并将canvas元素传递给它, 然后定义其数据对象, 如下所示。
请注意, 数据的标签和数据受标签和值变量限制, 这些标签和值变量在调用app.py文件中的get_chart_page函数时呈现页面时返回。
最后剩下的部分是配置为每秒执行一次Ajax请求并调用URL / refreshData的函数, 该函数将在app.py中执行refresh_graph_data并返回新的更新数据, 然后更新呈现新数据的char。
<script>
var ctx = document.getElementById("chart");
var myChart = new Chart(ctx, {
type: 'horizontalBar', data: {
labels: [{% for item in labels %}
"{{item}}", {% endfor %}], datasets: [{
label: '# of Mentions', data: [{% for item in values %}
{{item}}, {% endfor %}], backgroundColor: [
'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)'
], borderColor: [
'rgba(255, 99, 132, 1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255, 99, 132, 1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)'
], borderWidth: 1
}]
}, options: {
scales: {
yAxes: [{
ticks: {
beginAtZero:true
}
}]
}
}
});
var src_Labels = [];
var src_Data = [];
setInterval(function(){
$.getJSON('/refreshData', {
}, function(data) {
src_Labels = data.sLabel;
src_Data = data.sData;
});
myChart.data.labels = src_Labels;
myChart.data.datasets[0].data = src_Data;
myChart.update();
}, 1000);
</script>
一起运行应用程序
让我们按以下顺序运行这三个应用程序:1. Twitter App Client。 2. Spark应用程序。 3.仪表板Web应用程序。
然后, 你可以使用URL <http:// localhost:5001 />访问实时仪表板
现在, 你可以看到图表正在更新, 如下所示:
Apache流式现实生活用例
我们已经学习了如何使用Spark Streaming实时对数据进行简单的数据分析, 以及如何使用RESTful网络服务将其直接与简单的仪表板集成。从这个示例中, 我们可以看到Spark的强大功能, 因为它捕获了大量的数据流, 对其进行转换, 并提取出有价值的见解, 这些见解可轻松用于立即做出决策。可以实施许多有用的用例, 这些用例可以服务于不同行业, 例如新闻或营销。
新闻行业的例子
我们可以跟踪最常提及的主题标签, 以了解人们在社交媒体上谈论最多的话题。此外, 我们可以跟踪特定的主题标签及其推文, 以了解人们对世界上特定主题或事件的评价。
营销实例
我们可以收集推文流, 并通过情感分析将其分类并确定人们的兴趣, 以便针对他们的兴趣提供相关的要约。
此外, 有很多用例可以专门用于大数据分析, 并且可以服务于许多行业。一般而言, 有关更多Apache Spark用例, 建议你查看我们之前的文章之一。
我鼓励你从此处阅读有关Spark Streaming的更多信息, 以便更多地了解它的功能, 并对数据进行更高级的转换, 以便实时使用它获得更多见解。
评论前必须登录!
注册