python - pyspark将多个csv文件读取到一个数据帧中(或RDD?)

我有一个spark 2.0.2集群,我正在通过jupyter笔记本上的pyspark进行访问。我有多个以管道分隔的txt文件(加载到hdfs中)。但也可以在本地目录中找到),我需要使用spark csv将其加载到三个独立的数据帧中,具体取决于文件名。
我看到有三种方法可以采用-要么我可以使用python以某种方式遍历hdfs目录(还没有弄清楚如何做到这一点,加载每个文件,然后进行联合)。
我也知道火花中存在一些通配符函数(见AA>),我可能会利用它。
最后,我可以使用pandas将vanilla csv文件作为pandas数据帧从磁盘加载,然后创建spark数据帧。这里的缺点是这些文件很大,加载到单个节点上的内存可能需要大约8GB。(这就是为什么这首先会转移到集群中)。
这是我到目前为止拥有的代码和两种方法的一些伪代码:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')

spark = SparkSession(sc)

#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')


#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//)
    pd_df = pd.read_csv(currFile, sep = '|')
    df = spark.createDataFrame(pd_df)
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

有人知道如何实现方法1或2吗?我还没搞清楚。另外,我很惊讶没有更好的方法将csv文件加载到pyspark数据帧中-使用第三方包来完成一些看起来应该是本机特性的事情让我感到困惑(我是否错过了将csv文件加载到数据帧中的标准用例?)最后,我将把一个统一的数据帧写回hdfs(使用.write.parquet()),这样我就可以清除内存并使用mllib进行一些分析。如果我所强调的方法不是最佳实践,我会很感激朝着正确的方向努力!


最佳答案:

方法1:
在python中,不能直接引用hdfs位置。你需要像皮杜普这样的图书馆的帮助。在scala和java中,有api。即使有了pydoop,你也会一个接一个地阅读文件。不使用spark提供的并行读取选项,逐个读取文件是不好的。
方法2:
您应该能够用逗号分隔或通配符来指向多个文件。这种方式spark负责读取文件并将它们分发到分区中。但是,如果对每个数据帧使用union选项,则在动态读取每个文件时会出现一个边大小写。当你有很多文件时,这个列表在驱动程序级别会变得非常庞大,并可能导致内存问题。主要原因是,读取过程仍在驱动程序级别进行。
这个选择更好。spark将读取所有与regex相关的文件并将它们转换为分区。所有的通配符匹配都有一个RDD,从那里你就不必担心单个RDD的联合了
示例代码cnippet:

distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv")

方法3:
除非在python中有一些使用panda特性的遗留应用程序,否则我更希望使用spark提供的api

译文:来源   文章分类: python apache-spark pyspark spark-dataframe jupyter-notebook

相关文章:

python - 使用pycurl获取回复的标头值

python - Python-未找到pip安装匹配的版本

python - 如何从目录输入多个文件

python - “平衡”符号列表

python - Python:使用NaN对数组进行排序

python - run.main()如何工作?

python - NAN值被认为是python中的字符串

python - httplib vs urllib2和cookie

python - 在Python / OpenGL中渲染数学符号?

python - Numpy-从距离矩阵中提取唯一值