hdfs dfs -mkdir datasets
hdfs dfs -put /opt/cesga/cursos/pyspark_2022/datasets datasets
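To confirm the upload before going on, an optional quick check (not part of the original steps) is to list the target directory in HDFS:
hdfs dfs -ls datasets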
We read the CSV, already in HDFS, telling Spark to also pick up the header (header=True) and to infer the schema, i.e. the type of each field (inferSchema=True):
from pyspark.sql.functions import date_format
df = spark.read.csv("datasets/NYC_taxi_trip_records/yellow_tripdata_2018-11.csv", header=True, inferSchema=True)
rdd = df.rdd
We print the schema to see the structure of the CSV and, as a quick check, fetch the first 10 rows:
df.printSchema()
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
df.take(10)
[Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 51, 36), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 52, 36), passenger_count=1, trip_distance=0.0, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=145, DOLocationID=145, payment_type=2, fare_amount=2.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=3.8),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 7, 47), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 21, 43), passenger_count=1, trip_distance=2.3, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=142, DOLocationID=164, payment_type=1, fare_amount=11.0, extra=0.5, mta_tax=0.5, tip_amount=2.45, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=14.75),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 24, 27), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 34, 29), passenger_count=1, trip_distance=1.8, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=164, DOLocationID=48, payment_type=2, fare_amount=8.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=9.8),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 35, 27), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 47, 2), passenger_count=1, trip_distance=2.3, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=48, DOLocationID=107, payment_type=1, fare_amount=10.0, extra=0.5, mta_tax=0.5, tip_amount=3.35, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=14.65),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 16, 46), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 22, 50), passenger_count=1, trip_distance=1.0, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=163, DOLocationID=170, payment_type=2, fare_amount=6.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=7.3),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 23, 57), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 34, 29), passenger_count=1, trip_distance=2.1, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=170, DOLocationID=79, payment_type=2, fare_amount=9.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=10.3),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 57, 7), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 1, 5, 27), passenger_count=1, trip_distance=1.6, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=148, DOLocationID=79, payment_type=2, fare_amount=7.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=8.8),
 Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 1, 3, 28), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 1, 32, 10), passenger_count=2, trip_distance=22.09, RatecodeID=2, store_and_fwd_flag=u'N', PULocationID=132, DOLocationID=162, payment_type=2, fare_amount=52.0, extra=0.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=5.76, improvement_surcharge=0.3, total_amount=58.56),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 5, 28), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 14, 50), passenger_count=1, trip_distance=1.6, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=158, DOLocationID=100, payment_type=2, fare_amount=8.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=9.3),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 30, 17), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 41, 39), passenger_count=2, trip_distance=1.8, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=90, DOLocationID=246, payment_type=2, fare_amount=9.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=10.8)]
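As an aside, inferSchema costs an extra pass over the file; for large datasets you could instead declare the schema explicitly to match the structure printed above. A minimal sketch (the names schema and df2 are only illustrative):
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType
schema = StructType([
    StructField("VendorID", IntegerType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
])
# Reading with an explicit schema skips the extra inference pass
df2 = spark.read.csv("datasets/NYC_taxi_trip_records/yellow_tripdata_2018-11.csv", header=True, schema=schema)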
We register the DataFrame as a temporary Spark SQL view named taxi, so the CSV data can be queried with SQL:
df.createOrReplaceTempView('taxi')
We run a quick test query:
spark.sql("Select * from taxi").show()
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
| 1| 2018-11-01 00:51:36| 2018-11-01 00:52:36| 1| 0.0| 1| N| 145| 145| 2| 2.5| 0.5| 0.5| 0.0| 0.0| 0.3| 3.8|
| 1| 2018-11-01 00:07:47| 2018-11-01 00:21:43| 1| 2.3| 1| N| 142| 164| 1| 11.0| 0.5| 0.5| 2.45| 0.0| 0.3| 14.75|
| 1| 2018-11-01 00:24:27| 2018-11-01 00:34:29| 1| 1.8| 1| N| 164| 48| 2| 8.5| 0.5| 0.5| 0.0| 0.0| 0.3| 9.8|
| 1| 2018-11-01 00:35:27| 2018-11-01 00:47:02| 1| 2.3| 1| N| 48| 107| 1| 10.0| 0.5| 0.5| 3.35| 0.0| 0.3| 14.65|
| 1| 2018-11-01 00:16:46| 2018-11-01 00:22:50| 1| 1.0| 1| N| 163| 170| 2| 6.0| 0.5| 0.5| 0.0| 0.0| 0.3| 7.3|
| 1| 2018-11-01 00:23:57| 2018-11-01 00:34:29| 1| 2.1| 1| N| 170| 79| 2| 9.0| 0.5| 0.5| 0.0| 0.0| 0.3| 10.3|
| 1| 2018-11-01 00:57:07| 2018-11-01 01:05:27| 1| 1.6| 1| N| 148| 79| 2| 7.5| 0.5| 0.5| 0.0| 0.0| 0.3| 8.8|
| 2| 2018-11-01 01:03:28| 2018-11-01 01:32:10| 2| 22.09| 2| N| 132| 162| 2| 52.0| 0.0| 0.5| 0.0| 5.76| 0.3| 58.56|
| 1| 2018-11-01 00:05:28| 2018-11-01 00:14:50| 1| 1.6| 1| N| 158| 100| 2| 8.0| 0.5| 0.5| 0.0| 0.0| 0.3| 9.3|
| 1| 2018-11-01 00:30:17| 2018-11-01 00:41:39| 2| 1.8| 1| N| 90| 246| 2| 9.5| 0.5| 0.5| 0.0| 0.0| 0.3| 10.8|
| 1| 2018-11-01 00:07:47| 2018-11-01 00:17:42| 2| 2.3| 1| N| 158| 162| 2| 9.5| 0.5| 0.5| 0.0| 0.0| 0.3| 10.8|
| 1| 2018-11-01 00:18:40| 2018-11-01 00:24:25| 2| 1.0| 1| N| 162| 161| 1| 6.0| 0.5| 0.5| 1.45| 0.0| 0.3| 8.75|
| 1| 2018-11-01 00:29:40| 2018-11-01 00:49:02| 1| 7.0| 1| N| 162| 255| 1| 21.5| 0.5| 0.5| 3.0| 5.76| 0.3| 31.56|
| 1| 2018-11-01 00:17:55| 2018-11-01 00:32:05| 2| 1.8| 1| N| 100| 107| 1| 11.0| 0.5| 0.5| 1.23| 0.0| 0.3| 13.53|
| 1| 2018-11-01 00:37:58| 2018-11-01 00:58:01| 3| 2.1| 1| N| 148| 158| 3| 14.0| 0.5| 0.5| 0.0| 0.0| 0.3| 15.3|
| 2| 2018-11-01 00:07:07| 2018-11-01 00:34:11| 3| 7.64| 1| N| 142| 243| 1| 26.5| 0.5| 0.5| 5.56| 0.0| 0.3| 33.36|
| 2| 2018-11-01 00:59:37| 2018-11-01 01:04:10| 1| 0.98| 1| N| 24| 238| 1| 5.5| 0.5| 0.5| 1.36| 0.0| 0.3| 8.16|
| 1| 2018-11-01 00:46:50| 2018-11-01 01:08:13| 1| 2.8| 1| N| 163| 249| 1| 15.5| 0.5| 0.5| 2.0| 0.0| 0.3| 18.8|
| 1| 2018-11-01 00:26:12| 2018-11-01 00:34:22| 1| 2.0| 1| N| 237| 137| 2| 8.5| 0.5| 0.5| 0.0| 0.0| 0.3| 9.8|
| 1| 2018-11-01 00:37:51| 2018-11-01 00:39:46| 1| 0.1| 1| N| 137| 137| 2| 3.5| 0.5| 0.5| 0.0| 0.0| 0.3| 4.8|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
only showing top 20 rows
We derive separate time and date columns from the pickup timestamp using date_format:
dfTime = spark.sql("SELECT tpep_pickup_datetime FROM taxi").withColumn('time', date_format('tpep_pickup_datetime', 'HH:mm:ss')).withColumn('date', date_format('tpep_pickup_datetime', 'yyyy-MM-dd'))
dfTime.show()
+--------------------+--------+----------+
|tpep_pickup_datetime|    time|      date|
+--------------------+--------+----------+
| 2018-11-01 00:51:36|00:51:36|2018-11-01|
| 2018-11-01 00:07:47|00:07:47|2018-11-01|
| 2018-11-01 00:24:27|00:24:27|2018-11-01|
| 2018-11-01 00:35:27|00:35:27|2018-11-01|
| 2018-11-01 00:16:46|00:16:46|2018-11-01|
| 2018-11-01 00:23:57|00:23:57|2018-11-01|
| 2018-11-01 00:57:07|00:57:07|2018-11-01|
| 2018-11-01 01:03:28|01:03:28|2018-11-01|
| 2018-11-01 00:05:28|00:05:28|2018-11-01|
| 2018-11-01 00:30:17|00:30:17|2018-11-01|
| 2018-11-01 00:07:47|00:07:47|2018-11-01|
| 2018-11-01 00:18:40|00:18:40|2018-11-01|
| 2018-11-01 00:29:40|00:29:40|2018-11-01|
| 2018-11-01 00:17:55|00:17:55|2018-11-01|
| 2018-11-01 00:37:58|00:37:58|2018-11-01|
| 2018-11-01 00:07:07|00:07:07|2018-11-01|
| 2018-11-01 00:59:37|00:59:37|2018-11-01|
| 2018-11-01 00:46:50|00:46:50|2018-11-01|
| 2018-11-01 00:26:12|00:26:12|2018-11-01|
| 2018-11-01 00:37:51|00:37:51|2018-11-01|
+--------------------+--------+----------+
only showing top 20 rows
Using the RDD, we keep only the trips whose drop-off time (x[2]) falls between 00:00:00 and 01:00:00 inclusive, keep their pickup timestamp, and count them per pickup date:
from pyspark.sql import Row
dfFilter = rdd.filter(lambda x: x[2].hour == 0 or (x[2].hour == 1 and x[2].minute == 0 and x[2].second == 0)).map(lambda y: Row(tpep_pickup_datetime=y[1])).toDF()
dfFilter.withColumn('date',date_format('tpep_pickup_datetime','yyyy-MM-dd')).groupBy('date').count().sort(['date']).show()
+----------+-----+
|      date|count|
+----------+-----+
|2008-12-31|    1|
|2009-01-01|    6|
|2018-10-31|  169|
|2018-11-01|10356|
|2018-11-02|10735|
|2018-11-03|14353|
|2018-11-04|12743|
|2018-11-05| 4584|
|2018-11-06| 5603|
|2018-11-07| 7295|
|2018-11-08| 9171|
|2018-11-09|12401|
|2018-11-10|15186|
|2018-11-11|13233|
|2018-11-12| 5102|
|2018-11-13| 6286|
|2018-11-14| 7492|
|2018-11-15| 8981|
|2018-11-16|10524|
|2018-11-17|14928|
+----------+-----+
only showing top 20 rows
Reusing the same filtered DataFrame, we count the trips per month:
dfFilter.withColumn('month', date_format('tpep_pickup_datetime', 'MM')).groupBy('month').count().sort(['month']).show()
+-----+------+
|month| count|
+-----+------+
|   01|     6|
|   10|   169|
|   11|269617|
|   12|    88|
+-----+------+
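The same midnight-to-01:00 filter can also be written with the DataFrame API instead of the RDD; this is only an alternative sketch (dfFilter2 is an illustrative name), and its per-month counts should match the table above:
from pyspark.sql.functions import hour, minute, second, date_format
# Keep trips whose drop-off time is between 00:00:00 and 01:00:00 inclusive
dfFilter2 = df.filter(
    (hour('tpep_dropoff_datetime') == 0) |
    ((hour('tpep_dropoff_datetime') == 1) &
     (minute('tpep_dropoff_datetime') == 0) &
     (second('tpep_dropoff_datetime') == 0))
).select('tpep_pickup_datetime')
dfFilter2.withColumn('month', date_format('tpep_pickup_datetime', 'MM')).groupBy('month').count().sort('month').show()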
Average number of trips per month for each vendor:
from pyspark.sql.functions import date_format, avg
result = spark.sql("""
SELECT VendorID,
       date_format(tpep_pickup_datetime, 'MM') AS mes,
       COUNT(*) AS trip_count
FROM taxi
GROUP BY VendorID, mes
""").groupBy('VendorID').agg(avg('trip_count').alias('Viajes por mes')).orderBy('VendorID')
result.show()
+--------+--------------+
|VendorID|Viajes por mes|
+--------+--------------+
|       1|     3262928.0|
|       2|    1187854.75|
|       4|      130817.0|
+--------+--------------+
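For comparison, the same per-vendor monthly average can be computed purely with the DataFrame API; a sketch, not part of the original steps:
from pyspark.sql.functions import date_format, avg
# One row per (vendor, month), then average the monthly counts per vendor
(df.groupBy('VendorID', date_format('tpep_pickup_datetime', 'MM').alias('mes'))
   .count()
   .groupBy('VendorID')
   .agg(avg('count').alias('Viajes por mes'))
   .orderBy('VendorID')
   .show())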
Average number of trips per day for each vendor:
spark.sql("""
SELECT VendorID,
       AVG(trips_per_day) AS VIAJES_X_DIA
FROM (
    SELECT VendorID,
           DATE(tpep_pickup_datetime) AS pickup_date,
           COUNT(*) AS trips_per_day
    FROM taxi
    GROUP BY VendorID, DATE(tpep_pickup_datetime)
) AS VIAJES_X_DIA_V
GROUP BY VendorID
ORDER BY VendorID
""").show()
+--------+------------------+
|VendorID|      VIAJES_X_DIA|
+--------+------------------+
|       1|108764.26666666666|
|       2|        118785.475|
|       4| 4360.566666666667|
+--------+------------------+
#df.withColumn('time',date_format('tpep_pickup_datetime','yyyy-MM')).filter("time = '2018-11'").orderBy('tpep_pickup_datetime').count()
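The daily average admits the same DataFrame-API treatment; again, just a sketch for comparison with the SQL version above:
from pyspark.sql.functions import to_date, avg
# One row per (vendor, day), then average the daily counts per vendor
(df.groupBy('VendorID', to_date('tpep_pickup_datetime').alias('pickup_date'))
   .count()
   .groupBy('VendorID')
   .agg(avg('count').alias('VIAJES_X_DIA'))
   .orderBy('VendorID')
   .show())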
Maximum number of passengers per year and month, restricted to the first seven days of each month (DAY <= 7):
spark.sql("""
SELECT YEAR(tpep_pickup_datetime) AS Anio,
       MONTH(tpep_pickup_datetime) AS Mes,
       MAX(passenger_count) AS Pasajeros
FROM taxi
WHERE DAY(tpep_pickup_datetime) <= 7
GROUP BY Anio, Mes
""").show()
+----+---+---------+
|Anio|Mes|Pasajeros|
+----+---+---------+
|2009|  1|        5|
|2019|  1|        1|
|2018| 12|        6|
|2018| 11|        9|
+----+---+---------+
Maximum number of passengers per year and month with the DataFrame API, this time over whole months:
from pyspark.sql.functions import year, month, max, min
# passenger_count is already inferred as an integer; the cast is only a safeguard
taxi_df = df.withColumn("passenger_count", df["passenger_count"].cast("int"))
max_passengers_per_month = taxi_df.groupBy(year("tpep_pickup_datetime").alias("Anio"), month("tpep_pickup_datetime").alias("Mes")).agg(max("passenger_count").alias("Max_de_pasajeros")).orderBy(["Anio", "Mes"])
max_passengers_per_month.show()
+----+---+----------------+
|Anio|Mes|Max_de_pasajeros|
+----+---+----------------+
|2008| 12|               5|
|2009|  1|               5|
|2018| 10|               6|
|2018| 11|              96|
|2018| 12|               6|
|2019|  1|               1|
|2019| 11|               5|
+----+---+----------------+
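For symmetry with the earlier queries, the same whole-month aggregation could also be expressed in SQL; a sketch that was not run in the original:
spark.sql("""
SELECT YEAR(tpep_pickup_datetime) AS Anio,
       MONTH(tpep_pickup_datetime) AS Mes,
       MAX(passenger_count) AS Max_de_pasajeros
FROM taxi
GROUP BY Anio, Mes
ORDER BY Anio, Mes
""").show()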
Finally, we look for the most expensive and the cheapest trips (fare_amount), computing each value with the DataFrame API, Spark SQL and the RDD API.
max_fare = df.agg(max('fare_amount').alias('MAS_CARO'))
max_fare.show()
+---------+
| MAS_CARO|
+---------+
|187436.46|
+---------+
spark.sql("""
SELECT MAX(fare_amount) AS MAS_CARO
FROM taxi
""").show()
+---------+
| MAS_CARO|
+---------+
|187436.46|
+---------+
print("MAS_CARO:",rdd.max(lambda x:x[10]).__getitem__('fare_amount'))
('MAS_CARO:', 187436.46)
spark.sql("""
SELECT MIN(fare_amount) AS MAS_BARATO
FROM taxi
""").show()
+----------+
|MAS_BARATO|
+----------+
|    -450.0|
+----------+
min_fare = df.agg(min('fare_amount').alias('MAS_BARATO'))
min_fare.show()
+----------+
|MAS_BARATO|
+----------+
|    -450.0|
+----------+
print("MAS_BARATO:",rdd.min(lambda x:x[10]).__getitem__('fare_amount'))
('MAS_BARATO:', -450.0)
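The negative minimum suggests refunds, disputes or data errors rather than real trips; if you only care about strictly positive fares, a hedged variant would be (the alias MAS_BARATO_POSITIVO is just illustrative):
from pyspark.sql.functions import min
df.filter(df.fare_amount > 0).agg(min('fare_amount').alias('MAS_BARATO_POSITIVO')).show()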