本文概述
在这篇文章中, 你将发现:
- PostgreSQL和Python教程
- 使用SQL数据库
- 用Python连接到数据库
- 尝试一些复杂的查询
- 剖析此功能
诸如PostgreSQL之类的数据库需要用户身份验证才能访问, 并且特定于给定的数据库结构。一旦可以访问数据库, 就可以采用类似的技术。
本模块说明了一些基本技术, 可用于连接和使用关系数据库中的数据, 在本例中为PostgreSQL, 它是几个基于SQL的数据库之一。如果你想了解有关关系数据库管理系统(RDBMS)的系统理论方面的更多信息, 那么”数据库系统和概念”是一个很好的资源。
使用SQL数据库
在Python中使用数据库依赖于能够在SQL中编写数据库查询。实际上, 测试Python脚本的一种好方法是首先在第三方程序(例如DBeaver)中测试SQL命令。
SQLZOO是你当前可以练习这些技能的好地方
基础
在SQL语言中, 数据存储在表中。表通常嵌套在称为”模式”的组中。一个数据库中可能有多个架构。
$$ \ text {数据库} \ supset \ text {Schema} \ supset \ text {Table} $$
示例:假设你有一个数据库, 其中包含数据和机器学习问题的结果, 例如预测不良警察事件。结果存储在”结果”模式中。特定的模型指标位于”评估”表中。要查看所有此类结果:
SQL从results.evaluations中选择*;
Decomposing this query, you have several essential elements: `select`. What task you will be doing, `select`. Next, what do you want to return? If everything uses `*`. If you only want one column `your_colunm_name`. Thus `select *` or `select your_column_name`. Next, where are your data stored: `from`? Specify a `schema.table`.
Putting this together, you have the general form: `select * from schema.table;` Note: it is best to end your SQL statements with a "`;`".
Now suppose you want only the results for a specific model id, parameter, and metric, and parameter, you can use a `where` statement to specify:
```sql
select * from results.evaluations
where model_id=971
and metric='false negatives@' and parameter='75.0';
用Python连接到数据库
连接数据库的一种简单方法是使用Python。在这里, 你首先需要一个凭证文件, 例如example_psql.py:
PGHOST=your_database_host
PGDATABASE=your_database_name
PGUSER=your_database_username
PGPASSWORD=your_database_secret_password
接下来, 你将需要导入几个软件包:
import psycopg2
import sys, os
import numpy as np
import pandas as pd
import example_psql as creds
import pandas.io.sql as psql
最后, 数据库连接可以相对简单:
## ****** LOAD PSQL DATABASE ***** ##
# Set up a connection to the postgres server.
conn_string = "host="+ creds.PGHOST +" port="+ "5432" +" dbname="+ creds.PGDATABASE +" user=" + creds.PGUSER \
+" password="+ creds.PGPASSWORD
conn=psycopg2.connect(conn_string)
print("Connected!")
# Create a cursor object
cursor = conn.cursor()
def load_data(schema, table):
sql_command = "SELECT * FROM {}.{};".format(str(schema), str(table))
print (sql_command)
# Load the data
data = pd.read_sql(sql_command, conn)
print(data.shape)
return (data)
为了强调此代码的作用, 它使用你的凭据连接到你的数据库, 并返回你查询的数据, 即schema.table中的select *;作为熊猫数据框。然后, 你可以可视化或分析此数据, 就像将任何数据从CSV加载到Pandas中一样。
一个更复杂的例子
除了简单的连接, 你还可以在单独的脚本中使用一系列功能来连接数据库。该脚本位于setup / setup_environment.py中:
#!/usr/bin/env python
import os
import yaml
from sqlalchemy import create_engine
import logging
log = logging.getLogger(__name__)
def get_database():
try:
engine = get_connection_from_profile()
log.info("Connected to PostgreSQL database!")
except IOError:
log.exception("Failed to get database connection!")
return None, 'fail'
return engine
def get_connection_from_profile(config_file_name="default_profile.yaml"):
"""
Sets up database connection from config file.
Input:
config_file_name: File containing PGHOST, PGUSER, PGPASSWORD, PGDATABASE, PGPORT, which are the
credentials for the PostgreSQL database
"""
with open(config_file_name, 'r') as f:
vals = yaml.load(f)
if not ('PGHOST' in vals.keys() and
'PGUSER' in vals.keys() and
'PGPASSWORD' in vals.keys() and
'PGDATABASE' in vals.keys() and
'PGPORT' in vals.keys()):
raise Exception('Bad config file: ' + config_file_name)
return get_engine(vals['PGDATABASE'], vals['PGUSER'], vals['PGHOST'], vals['PGPORT'], vals['PGPASSWORD'])
def get_engine(db, user, host, port, passwd):
"""
Get SQLalchemy engine using credentials.
Input:
db: database name
user: Username
host: Hostname of the database server
port: Port number
passwd: Password for the database
"""
url = 'postgresql://{user}:{passwd}@{host}:{port}/{db}'.format(
user=user, passwd=passwd, host=host, port=port, db=db)
engine = create_engine(url, pool_size = 50)
return engine
有了此脚本后, 我们可以使用新脚本连接到数据库:
import sys
import os
import pandas as pd
import subprocess
import argparse
import pdb
import pickle
from setup import setup_environment
# Make PostgreSQL Connection
engine = setup_environment.get_database()
try:
con = engine.raw_connection()
con.cursor().execute("SET SCHEMA '{}'".format('your_schema_name'))
except:
pass
注意:在此代码示例中, 你希望将” your_schema_name”替换为架构的特定名称, 例如” models”架构。
尝试一些复杂的查询:
现在, 你已经建立了数据库连接, 你可以尝试一个复杂的查询, 例如为使用Sci-kit Learn构建的模型结果返回pickle文件(一种存储数据的Python方法)。
def get_pickle_best_models(timestamp, metric, parameter=None, number=25, directory="results/"):
"""
--------------------------------------------------------
Get the PICKLE FILE of the best models
by the specified timestamp and given metric
RETURNS the PICKLE FILE to a DIRECTORY
--------------------------------------------------------
ARGUMENTS:
timestamp: models run on or after given timestamp
example: '2018-09-02'
metric: metric to be optimized
example: 'precision@'
parameter: parameter value or threshold if any
default=None
example: '10.0'
number: maximum number of desired results
default = 25
--------------------------------------------------------
"""
if parameter is None:
query = ("SELECT pickle_blob, run_time FROM \
(SELECT evaluations.model_id, run_time \
FROM results.evaluations JOIN results.models \
ON evaluations.model_id=models.model_id \
WHERE run_time >= '{}' \
AND value is not null \
AND metric = '{}' \
ORDER BY value DESC LIMIT {}) \
AS top_models \
INNER JOIN results.data \
ON top_models.model_id=data.model_id ; " ).format(timestamp, metric, number)
elif parameter is not None:
query = ("SELECT pickle_blob, run_time FROM \
(SELECT evaluations.model_id, run_time \
FROM results.evaluations JOIN results.models \
ON evaluations.model_id=models.model_id \
WHERE run_time >= '{}' \
AND value is not null \
AND metric = '{}' \
AND parameter = '{}' \
ORDER BY value DESC LIMIT {}) \
AS top_models \
INNER JOIN results.data \
ON top_models.model_id=data.model_id ; " ).format(timestamp, metric, parameter, number)
df_models = pd.read_sql(query, con=con)
N = len(df_models['pickle_blob'])
for file_number in range(0, N):
pickle_file = pickle.loads(df_models['pickle_blob'].iloc[file_number])
file_name = df_models['run_time'].apply(lambda x: str(x).replace(' ', 'T')).iloc[file_number]
if parameter is None:
full_file_name = "police_eis_results_"+"top_"+metric+"any"+"_"+file_name+".pkl"
elif parameter is not None:
full_file_name = "police_eis_results_"+"top_"+metric+parameter+"_"+file_name+".pkl"
file_path = directory+full_file_name
pickle.dump(pickle_file, open( file_path, "wb" ) )
return None
剖析此功能:
除去一些细节, 你便具有以下一般流程
def get_pickle_best_models(timestamp, metric, parameter=None, number=25, directory="results/"):
if parameter is None:
# Do Query WITHOUT Parameter
elif parameter is not None:
# Do Query WITH Parameter
# Run Query and Store Results as Pandas Data Frame
df_models = pd.read_sql(query, con=con)
# Loop Over Dataframe to Save Pickle Files for Different Model Run Times
# (Code Here)
编写Python查询
如示例所示, 你将SQL查询编写为字符串:
query = ("Long SQL Query as a String")
在这里, 你可以使用.format(var)方法将变量插入查询中。这样, 你可以根据传递给函数的各种参数来系统地更改查询。上述查询之一的完整格式如下:
query = ("SELECT pickle_blob, run_time FROM \
(SELECT evaluations.model_id, run_time \
FROM results.evaluations JOIN results.models \
ON evaluations.model_id=models.model_id \
WHERE run_time >= '{}' \
AND value is not null \
AND metric = '{}' \
AND parameter = '{}' \
ORDER BY value DESC LIMIT {}) \
AS top_models \
INNER JOIN results.data \
ON top_models.model_id=data.model_id ; " ).format(timestamp, metric, parameter, number)
注意查询的格式。在python代码中, 你需要使用\中断查询行, 并使用格式函数'{}’。format(x)将变量插入脚本。格式括号是否需要用单引号引起来, 具体取决于你是否尝试将字符串值或数字值传递给查询。从Python操作中抽象查询, 然后剩下一个SQL查询。下面, 你可以尝试一些示例格式的示例值, 以表示时间戳, 度量, 参数和结果数的Python格式:
SELECT pickle_blob, run_time FROM
(SELECT evaluations.model_id, run_time
FROM results.evaluations JOIN results.models
ON evaluations.model_id=models.model_id
WHERE run_time >= '2018-09-02'
AND value is not null
AND metric = 'precision@'
AND parameter = '10.0'
ORDER BY value DESC LIMIT 10)
AS top_models
INNER JOIN results.data
ON top_models.model_id=data.model_id ;
你还回什么
上面的查询是一个更复杂的示例, 但很有价值。首先, 你要返回什么:
- pickle_blob
- 运行
*例如。 SELECT pickle_blob, 运行时
这些数据从哪里来?
- 通过子查询对results.data表进行联接, 对result.evaluations和results.models进行联接。
在上面的示例中, 你需要模型的pickle_blob和模型的run_time。你还需要根据某些条件获得最佳模型。但是, 所有这些属性都位于结果模式中的离散表中。因此, 你必须对这些表进行几次连接。
从技术上讲, 你正在执行所谓的子查询。请注意主查询SELECT pickle_blob的第一行, run_time FROM, 其中FROM引用括号中的子查询(SELECT … LIMIT 10), 你使用别名AS top_models对其进行引用。
在这里, top_models是一个任意名称, 仅在查询中定义和使用。你可以将top_models替换为x或y或my_best_model_ever。该名称无所谓, 只要它与其他表名和保留的SQL语句(例如select)是离散的即可。它仅引用我们的子查询的结果。
本质上, 子查询返回联接的result.models和results.evaluations表的选择性版本。你可以使用ON的top_models.model_id = data.model_id对top_models子查询使用top_models子查询来进行INNER JOIN。
你的子查询是什么?
SELECT evaluations.model_id, run_time
FROM results.evaluations JOIN results.models
ON evaluations.model_id=models.model_id
WHERE run_time >= '2018-09-02'
AND value is not null
AND metric = 'precision@'
AND parameter = '10.0'
ORDER BY value DESC LIMIT 10
子查询还对在model_id上联接的result.evaluations和results.models表进行联接。它从联接的表中返回model_id和run_time。
条件:
在这里, 我们只需要符合特定条件的模型:
- 时间戳记WHERE运行时间> =’2018-09-02′
- value列上的结果为非null:AND值不为null
- 指标AND指标=’precision @’
- 参数AND参数= ’10 .0′
- 前n个结果ORDER BY值DESC LIMIT 10
如你所见, 尽管此查询包含许多元素, 但是你可以使用Python操纵查询。最后, 你可以利用数据库的功能和Python的功能来构建动态代码。
总结
做得好!你现在知道了Python中的数据库, 而PostgreSQL数据库中的示例很少。此外, 你可以自己进行自我学习并找到学习资源。如果你有兴趣一般地了解有关数据库主题的更多信息, 请阅读srcmini的教程”在SQL中执行Python / R”。
评论前必须登录!
注册