-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path readCSVFile.py
More file actions
23 lines (19 loc) · 1.37 KB
/
readCSVFile.py
File metadata and controls
23 lines (19 loc) · 1.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
from pyspark import SparkContext, RDD
from pyspark.sql import DataFrame, SparkSession
from itertools import izip, count
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, LongType, StringType, TimestampType
def readCSVFile(fileName, spark, numOfPartition=1):
    """ Read the beerAdvocateReview CSV file to DataFrame
    @type fileName: str
    @param fileName: path to the CSV file (local path or HDFS/S3 URI)
    @type spark: SparkSession
    @param spark: active Spark session used to perform the read
    @type numOfPartition: int
    @param numOfPartition: desired number of partitions for the result;
        1 (the default) leaves Spark's partitioning untouched
    @rtype: DataFrame
    """
    # Supplying the schema explicitly avoids a second pass over the file
    # for schema inference and pins down stable column types.
    schema = StructType([
        StructField("brewery_id", LongType()),
        StructField("brewery_name", StringType()),
        StructField("review_time", LongType()),
        StructField("review_overall", FloatType()),
        StructField("review_aroma", FloatType()),
        StructField("review_appearance", FloatType()),
        StructField("review_profilename", StringType()),
        StructField("beer_style", StringType()),
        StructField("review_palate", FloatType()),
        StructField("review_taste", FloatType()),
        StructField("beer_name", StringType()),
        StructField("beer_abv", FloatType()),
        StructField("beer_beerid", LongType()),
    ])
    # BUG FIX: the original wrapped this call in `with open(fileName, "r")`,
    # opening the file locally for no reason -- Spark reads the path itself,
    # and a local open() fails outright for HDFS/S3 URIs.
    # NOTE(review): the top-of-file `from itertools import izip, count` is
    # Python-2-only and unused; it should be removed for Python 3.
    df = spark.read.csv(fileName, header=True, schema=schema)
    # BUG FIX: numOfPartition was previously accepted but silently ignored.
    # Honor it when a non-default value is requested; the default of 1 keeps
    # the original (untouched) partitioning for existing callers.
    if numOfPartition > 1:
        df = df.repartition(numOfPartition)
    return df