本文概述
有人可能会争辩说, 正确的ETL管道是数据科学的重要组成部分。如果没有整洁有序的数据, 就很难产生可增强业务决策的高质量见解。
因此, 在本教程中, 我们将探讨构建简单的ETL管道以使用R将实时Tweets直接流式传输到SQLite数据库中的必要条件。例如, 这是社交网络分析中相当常见的任务。
重点将放在涵盖数据收集和存储的思考过程, 以及如何使用R中的rtweet包操作Twitter API。
首先, 让我们从拥有所有合适的工具开始。你需要做的第一件事是设置对Twitter API的访问权限。总体而言, 你需要执行以下步骤:
- 如果你没有Twitter帐户, 请创建一个。
- 单击此链接并申请开发者帐户(请注意, 此过程现在需要Twitter批准的应用程序)。
- 在以下网页$ ^ 1 $上创建一个新应用。
- 填写有关你的应用程序的所有详细信息, 并创建你的访问令牌$ ^ 1 $。
- 收集你需要连接到API的使用者密钥, 使用者密钥, 访问令牌和访问令牌密钥。
设置完Twitter API后, 如果没有, 则需要获取SQLite。有关如何在计算机上获取SQLite的完整过程, 请遵循srcmini上的SQLite入门指南教程。由于操作简单, 因此选择使用SQLite作为本教程。
步骤1:创建数据库和表以存储Twitter数据
通过安装Twitter API访问和SQLite, 我们最终可以开始构建管道以存储Tweet, 因为我们会随着时间流化它们。首先, 我们将使用R创建新的SQLite数据库, 如下所示:
# Import necessary libraries and functions
library(RSQLite)
library(rtweet)
library(tm)
library(dplyr)
library(knitr)
library(wordcloud)
library(lubridate)
library(ggplot2)
source("transform_and_clean_tweets.R")
# Create our SQLite database
conn <- dbConnect(RSQLite::SQLite(), "Tweet_DB.db")
之后, 我们可以在数据库内部创建一个表来保存推文。在我们的特定情况下, 我们将存储以下变量:
- Tweet_ID作为INTEGER主键
- 使用者为TEXT
- Tweet_Content为TEXT
- Date_Created为INTEGER
你可能想知道, 为什么我将日期设置为整数?这是因为SQLite没有保留日期和时间的数据类型。因此, 日期将被存储为自1970-01-01开始的秒数。
现在让我们继续写表:
dbExecute(conn, "CREATE TABLE Tweet_Data(
Tweet_ID INTEGER PRIMARY KEY, User TEXT, Tweet_Content TEXT, Date_Created INTEGER)")
创建表后, 可以转到sqlite3.exe并检查是否确实已创建。你可以在下面看到此屏幕截图:
第2步:串流有关你喜欢的主题的推文!
信不信由你, 你已经建立了简单, 功能正常的Twitter流传输管道所需的要求和基础结构已经完成。我们现在需要做的是使用API流推文。值得注意的是, 出于本教程的目的, 我将使用标准的免费API。例如, 如果你进行研究, 有一些付费版本可能更适合你的Tweet流媒体需求。
不要再拖延了, 让我们开始设置Twitter侦听器的过程。你需要做的第一件事是导入rtweet程序包, 并按照开头所述输入应用程序的访问令牌和机密:
token <- create_token(app = 'Your_App_Name', consumer_key = 'Your_Consumer_Key', consumer_secret = 'Your_Consumer_Secret', access_token = 'Your_Access_Token', access_secret = 'Your_Access_Secret')
放置好令牌后, 下一步是确定你要流式传输(即收听)哪些推文。 rtweet包中的stream_tweets函数为我们提供了查询Twitter API的各种选项。例如, 你可以流式传输包含一组给定的标签或关键字(最多400个)中的一个或多个, 所有公开可用推文的一小部分随机子集的推文, 跟踪一组用户ID或屏幕(用户)的推文。名称(最多5000个), 或按地理位置收集推文。
对于本教程, 我决定继续进行流式处理, 其中包含与数据科学相关的标签的推文(请参见下面的列表)。你可能会注意到, 我定义要推送的主题标签的格式有些奇怪。但是, 当你要收听主题标签或关键字时, 这是stream_tweets函数所需的格式。如果你打算听一组给定的用户或基于坐标, 则此格式会有所不同。有关更多详细信息, 请参阅文档。
keys <- "#nlp, #machinelearning, #datascience, #chatbots, #naturallanguageprocessing, #deeplearning"
定义了关键字之后, 就该定义推文流循环了。有几种方法可以做到这一点, 但是这种格式过去对我来说效果很好:
# Initialize the streaming hour tally
hour_counter <- 0
# Initialize a while loop that stops when the number of hours you want to stream tweets for is exceeded
while(hour_counter <= 12){
# Set the stream time to 2 hours each iteration (7200 seconds)
streamtime <- 7200
# Create the file name where the 2 hour stream will be stored. Note that the Twitter API outputs a .json file.
filename <- paste0("nlp_stream_", format(Sys.time(), '%d_%m_%Y__%H_%M_%S'), ".json")
# Stream Tweets containing the desired keys for the specified amount of time
stream_tweets(q = keys, timeout = streamtime, file_name = filename)
# Clean the streamed tweets and select the desired fields
clean_stream <- transform_and_clean_tweets(filename, remove_rts = TRUE)
# Append the streamed tweets to the Tweet_Data table in the SQLite database
dbWriteTable(conn, "Tweet_Data", clean_stream, append = T)
# Delete the .json file from this 2-hour stream
file.remove(filename)
# Add the hours to the tally
hour_counter <- hour_counter + 2
}
本质上, 该循环以2小时的间隔流式传输尽可能多的tweet, 以提及键字符串中的任何主题标签, 总时间为12小时。每2个小时, Twitter侦听器就会在你当前的工作目录中创建一个.json文件, 其名称在变量filename中指定。
然后, 它将这个文件名传递给transform_and_clean_tweets函数, 该函数会在需要时删除转推, 从Twitter API提供的所有列中选择要保留的列, 并对Tweets中包含的文本进行规范化。
然后, 它将结果数据帧附加到我们之前在SQLite数据库中创建的Tweet_Data表。最后, 它将2加到小时计数表中(因为流持续2个小时), 并删除了创建的.json文件。这样做是因为所有感兴趣的数据现在都在我们的数据库中, 并且保留.json文件可能会成为存储负担。
让我们知道详细了解一下transform_and_clean_tweets函数:
transform_and_clean_tweets <- function(filename, remove_rts = TRUE){
# Import the normalize_text function
source("normalize_text.R")
# Parse the .json file given by the Twitter API into an R data frame
df <- parse_stream(filename)
# If remove_rst = TRUE, filter out all the retweets from the stream
if(remove_rts == TRUE){
df <- filter(df, df$is_retweet == FALSE)
}
# Keep only the tweets that are in English
df <- filter(df, df$lang == "en")
# Select the features that you want to keep from the Twitter stream and rename them
# so the names match those of the columns in the Tweet_Data table in our database
small_df <- df[, c("screen_name", "text", "created_at")]
names(small_df) <- c("User", "Tweet_Content", "Date_Created")
# Finally normalize the tweet text
small_df$Tweet_Content <- sapply(small_df$Tweet_Content, normalize_text)
# Return the processed data frame
return(small_df)
}
如前所述, 此功能旨在过滤转发消息(如果需要), 保留所需功能并规范化推文文本。从本质上讲, 这可以视为ETL缩写词(转换)的” T”部分。此功能的关键组成部分是清除推文的过程。
通常, 文本数据需要一些预处理步骤, 才能准备好进行任何类型的分析。对于推文, 这些步骤可以包括删除URL, 停用词和提及, 将文本变为小写字母, 词干等。但是, 并非所有这些步骤始终都是必需的。不过, 暂时, 我将在下面显示用于预处理这些推文的normalize_text函数:
normalize_text <- function(text){
# Keep only ASCII characters
text = iconv(text, "latin1", "ASCII", sub="")
# Convert to lower case characters
text = tolower(text)
# Remove any HTML tags
text = gsub("<.*?>", " ", text)
# Remove URLs
text = gsub("\\s?(f|ht)(tp)(s?)(://)([^\\.]*)[\\.|/](\\S*)", "", text)
# Keep letters and numbers only
text = gsub("[^[:alnum:]]", " ", text)
# Remove stop words
text = removeWords(text, c("rt", "gt", stopwords("en")))
# Remove any extra white space
text = stripWhitespace(text)
text = gsub("^\\s+|\\s+$", "", text)
return(text)
}
根据你的用例, 这些步骤可能就足够了。如前所述, 你可以选择添加更多或不同的步骤, 例如词干提取或词形修饰, 或仅保留字母而不是字母和数字。我鼓励你尝试并尝试不同的组合。这可能是练习正则表达式技能的好地方。
完成所有这些步骤之后, 最终状态是一个SQLite数据库, 其中填充了所有流式推文。你可以通过运行几个简单的查询来验证一切正常, 例如:
data_test <- dbGetQuery(conn, "SELECT * FROM Tweet_Data LIMIT 20")
unique_rows <- dbGetQuery(conn, "SELECT COUNT() AS Total FROM Tweet_Data")
kable(data_test)
print(as.numeric(unique_rows))
## [1] 1863
步骤3进行分析
当确定ETL流程可以按预期运行时, 最后一步是收集一些见解并分析你收集的数据。例如, 对于我们收集的推文, 我们可以尝试一些不同的事情。该推文内容中提到的术语词云, 以及一个时间表, 以可视化方式显示我们在12小时的流媒体播放期间何时获得最多的推文。显然, 这份清单并不是对可以使用推文进行的所有研究的全面概述, 其范围可以从情感分析到心理研究等等。
话虽如此, 让我们直接建立一个不错的wordcloud:
# Gather all tweets from the database
all_tweets <- dbGetQuery(conn, "SELECT Tweet_ID, Tweet_Content FROM Tweet_Data")
# Create a term-document matrix and sort the words by frequency
dtm <- TermDocumentMatrix(VCorpus(VectorSource(all_tweets$Tweet_Content)))
dtm_mat <- as.matrix(dtm)
sorted <- sort(rowSums(dtm_mat), decreasing = TRUE)
freq_df <- data.frame(words = names(sorted), freq = sorted)
# Plot the wordcloud
set.seed(42)
wordcloud(words = freq_df$words, freq = freq_df$freq, min.freq = 10, max.words=50, random.order=FALSE, rot.per=0.15, colors=brewer.pal(8, "RdYlGn"))
好吧, 这真是一个巨大的惊喜!机器学习和数据科学是我们所有推文中最常被提及的词-它们恰好是我们要为其流式传输的两个标签。因此, 可以预期该结果。不过, 换句话来说, 看起来有些有趣。例如, 大数据和人工智能并不是我们密钥中的主题标签, 但是它们出现的频率很高, 因此人们可以说它们经常与其他两个主题一起被谈论。我们还选择了其他有趣的词, 例如python或tensorflow, 这些词使我们在收集的tweet内容中有了更多的上下文, 而不仅仅是井号。
现在让我们转向另一个简单的分析。在我们的12小时流中, 我们什么时候收集了最多的推文?为此, 我们将收集整数日期, 将其转换为适当的格式, 然后绘制一段时间内的推文数量:
# Select the dates in which the tweets were created and convert them into UTC date-time format
all_tweets <- dbGetQuery(conn, "SELECT Tweet_ID, Date_Created FROM Tweet_Data")
all_tweets$Date_Created <- as.POSIXct(all_tweets$Date_Created, origin = "1970-01-01", tz = "UTC")
# Group by the day and hour and count the number of tweets that occurred in each bucket
all_tweets_2 <- all_tweets %>%
mutate(day = day(Date_Created), month = month(Date_Created, label = TRUE), hour = hour(Date_Created)) %>%
mutate(day_hour = paste(month, "-", day, "-", hour, sep = "")) %>%
group_by(day_hour) %>%
tally()
# Simple line ggplot
ggplot(all_tweets_2, aes(x = day_hour, y = n)) +
geom_line(aes(group = 1)) +
theme_minimal() +
ggtitle("Tweet Freqeuncy During the 12-h Streming Period")+
ylab("Tweet Count")+
xlab("Month-Day-Hour")
甜!现在我们可以看到, 我们获得的最大数量的独特推文来自UTC时间9月2日(UTC晚上8:00至8:59 PM)(在军事时间显示为20)。
总结
恭喜你!现在你知道了如何在R中构建简单的ETL管道。我们进行的两个分析代表了使用Twitter数据进行的非常基本的分析。但是, 如前所述, 只要构建一个健壮的管道来引入数据, 还有很多事情要做。这方面是本教程的主要重点。
话虽如此, 本教程仅显示了一个非常小的案例研究, 以逐步了解为Twitter数据构建ETL管道的过程。为整个企业建立健壮且可扩展的ETL管道是一项复杂的工作, 需要大量的计算资源和知识, 尤其是在涉及大数据时。
我鼓励你进行进一步的研究, 并尝试构建自己的小规模管道, 这可能涉及在Python $ ^ 1 $中构建一个管道。也许, 甚至继续尝试一些大数据项目。例如, srcmini已经通过PySpark课程开设了诸如”大数据基础知识”之类的课程, 在这些课程中, 他们使用诸如PySpark之类的工具来学习大数据, 从而可以进一步了解该领域。
如果你想了解数据工程, 请参加srcmini的数据工程入门课程。
参考文献
- Foley, D.(2019年5月11日)。将Twitter数据流式传输到MySQL数据库中。取自https://towardsdatascience.com/streaming-twitter-data-into-a-mysql-database-d62a02b050d6
评论前必须登录!
注册