hdfs dfs -mkdir datasets

hdfs dfs -put /opt/cesga/cursos/pyspark_2022/datasets datasets

Collecting the data

We read the data from the CSV already stored in HDFS, telling the reader to pick up the header row (header=True) and to infer the schema of each field (inferSchema=True), so that for each column we know:

  • Column name
  • Column type

    In [1]:
    from pyspark.sql.functions import date_format
    
    df = spark.read.csv("datasets/NYC_taxi_trip_records/yellow_tripdata_2018-11.csv", header=True, inferSchema=True)
    rdd = df.rdd
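
    Note that inferSchema makes an extra pass over the data to work out the column types. As a sketch, the schema could instead be declared explicitly (field names and types copied from the printSchema output below); taxi_schema and df_explicit are illustrative names, not part of the original notebook:

    from pyspark.sql.types import (StructType, StructField, IntegerType,
                                   DoubleType, StringType, TimestampType)

    # Explicit schema matching the printSchema output below (all fields nullable)
    taxi_schema = StructType([
        StructField("VendorID", IntegerType(), True),
        StructField("tpep_pickup_datetime", TimestampType(), True),
        StructField("tpep_dropoff_datetime", TimestampType(), True),
        StructField("passenger_count", IntegerType(), True),
        StructField("trip_distance", DoubleType(), True),
        StructField("RatecodeID", IntegerType(), True),
        StructField("store_and_fwd_flag", StringType(), True),
        StructField("PULocationID", IntegerType(), True),
        StructField("DOLocationID", IntegerType(), True),
        StructField("payment_type", IntegerType(), True),
        StructField("fare_amount", DoubleType(), True),
        StructField("extra", DoubleType(), True),
        StructField("mta_tax", DoubleType(), True),
        StructField("tip_amount", DoubleType(), True),
        StructField("tolls_amount", DoubleType(), True),
        StructField("improvement_surcharge", DoubleType(), True),
        StructField("total_amount", DoubleType(), True),
    ])

    # Skips the inference pass over the file
    df_explicit = spark.read.csv("datasets/NYC_taxi_trip_records/yellow_tripdata_2018-11.csv",
                                 header=True, schema=taxi_schema)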
    

    We print the schema to check the structure of the CSV and, as a quick test, fetch the first few rows.

    In [2]:
    df.printSchema()
    
    root
     |-- VendorID: integer (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- passenger_count: integer (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- RatecodeID: integer (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- PULocationID: integer (nullable = true)
     |-- DOLocationID: integer (nullable = true)
     |-- payment_type: integer (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- extra: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
    
    
    In [3]:
    df.take(10)
    
    Out[3]:
    [Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 51, 36), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 52, 36), passenger_count=1, trip_distance=0.0, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=145, DOLocationID=145, payment_type=2, fare_amount=2.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=3.8),
     Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 7, 47), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 21, 43), passenger_count=1, trip_distance=2.3, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=142, DOLocationID=164, payment_type=1, fare_amount=11.0, extra=0.5, mta_tax=0.5, tip_amount=2.45, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=14.75),
     Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 24, 27), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 34, 29), passenger_count=1, trip_distance=1.8, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=164, DOLocationID=48, payment_type=2, fare_amount=8.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=9.8),
     Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 35, 27), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 47, 2), passenger_count=1, trip_distance=2.3, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=48, DOLocationID=107, payment_type=1, fare_amount=10.0, extra=0.5, mta_tax=0.5, tip_amount=3.35, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=14.65),
     Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 16, 46), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 22, 50), passenger_count=1, trip_distance=1.0, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=163, DOLocationID=170, payment_type=2, fare_amount=6.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=7.3),
     Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 23, 57), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 34, 29), passenger_count=1, trip_distance=2.1, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=170, DOLocationID=79, payment_type=2, fare_amount=9.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=10.3),
     Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 57, 7), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 1, 5, 27), passenger_count=1, trip_distance=1.6, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=148, DOLocationID=79, payment_type=2, fare_amount=7.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=8.8),
     Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 1, 3, 28), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 1, 32, 10), passenger_count=2, trip_distance=22.09, RatecodeID=2, store_and_fwd_flag=u'N', PULocationID=132, DOLocationID=162, payment_type=2, fare_amount=52.0, extra=0.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=5.76, improvement_surcharge=0.3, total_amount=58.56),
     Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 5, 28), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 14, 50), passenger_count=1, trip_distance=1.6, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=158, DOLocationID=100, payment_type=2, fare_amount=8.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=9.3),
     Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2018, 11, 1, 0, 30, 17), tpep_dropoff_datetime=datetime.datetime(2018, 11, 1, 0, 41, 39), passenger_count=2, trip_distance=1.8, RatecodeID=1, store_and_fwd_flag=u'N', PULocationID=90, DOLocationID=246, payment_type=2, fare_amount=9.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=10.8)]

    We register the extracted DataFrame as a Spark SQL temporary view named taxi, which exposes the CSV data to SQL queries.

    In [4]:
    df.createOrReplaceTempView('taxi')
    

    We run a quick test query.

    In [5]:
    spark.sql("Select * from taxi").show()
    
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
    |VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
    |       1| 2018-11-01 00:51:36|  2018-11-01 00:52:36|              1|          0.0|         1|                 N|         145|         145|           2|        2.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         3.8|
    |       1| 2018-11-01 00:07:47|  2018-11-01 00:21:43|              1|          2.3|         1|                 N|         142|         164|           1|       11.0|  0.5|    0.5|      2.45|         0.0|                  0.3|       14.75|
    |       1| 2018-11-01 00:24:27|  2018-11-01 00:34:29|              1|          1.8|         1|                 N|         164|          48|           2|        8.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         9.8|
    |       1| 2018-11-01 00:35:27|  2018-11-01 00:47:02|              1|          2.3|         1|                 N|          48|         107|           1|       10.0|  0.5|    0.5|      3.35|         0.0|                  0.3|       14.65|
    |       1| 2018-11-01 00:16:46|  2018-11-01 00:22:50|              1|          1.0|         1|                 N|         163|         170|           2|        6.0|  0.5|    0.5|       0.0|         0.0|                  0.3|         7.3|
    |       1| 2018-11-01 00:23:57|  2018-11-01 00:34:29|              1|          2.1|         1|                 N|         170|          79|           2|        9.0|  0.5|    0.5|       0.0|         0.0|                  0.3|        10.3|
    |       1| 2018-11-01 00:57:07|  2018-11-01 01:05:27|              1|          1.6|         1|                 N|         148|          79|           2|        7.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         8.8|
    |       2| 2018-11-01 01:03:28|  2018-11-01 01:32:10|              2|        22.09|         2|                 N|         132|         162|           2|       52.0|  0.0|    0.5|       0.0|        5.76|                  0.3|       58.56|
    |       1| 2018-11-01 00:05:28|  2018-11-01 00:14:50|              1|          1.6|         1|                 N|         158|         100|           2|        8.0|  0.5|    0.5|       0.0|         0.0|                  0.3|         9.3|
    |       1| 2018-11-01 00:30:17|  2018-11-01 00:41:39|              2|          1.8|         1|                 N|          90|         246|           2|        9.5|  0.5|    0.5|       0.0|         0.0|                  0.3|        10.8|
    |       1| 2018-11-01 00:07:47|  2018-11-01 00:17:42|              2|          2.3|         1|                 N|         158|         162|           2|        9.5|  0.5|    0.5|       0.0|         0.0|                  0.3|        10.8|
    |       1| 2018-11-01 00:18:40|  2018-11-01 00:24:25|              2|          1.0|         1|                 N|         162|         161|           1|        6.0|  0.5|    0.5|      1.45|         0.0|                  0.3|        8.75|
    |       1| 2018-11-01 00:29:40|  2018-11-01 00:49:02|              1|          7.0|         1|                 N|         162|         255|           1|       21.5|  0.5|    0.5|       3.0|        5.76|                  0.3|       31.56|
    |       1| 2018-11-01 00:17:55|  2018-11-01 00:32:05|              2|          1.8|         1|                 N|         100|         107|           1|       11.0|  0.5|    0.5|      1.23|         0.0|                  0.3|       13.53|
    |       1| 2018-11-01 00:37:58|  2018-11-01 00:58:01|              3|          2.1|         1|                 N|         148|         158|           3|       14.0|  0.5|    0.5|       0.0|         0.0|                  0.3|        15.3|
    |       2| 2018-11-01 00:07:07|  2018-11-01 00:34:11|              3|         7.64|         1|                 N|         142|         243|           1|       26.5|  0.5|    0.5|      5.56|         0.0|                  0.3|       33.36|
    |       2| 2018-11-01 00:59:37|  2018-11-01 01:04:10|              1|         0.98|         1|                 N|          24|         238|           1|        5.5|  0.5|    0.5|      1.36|         0.0|                  0.3|        8.16|
    |       1| 2018-11-01 00:46:50|  2018-11-01 01:08:13|              1|          2.8|         1|                 N|         163|         249|           1|       15.5|  0.5|    0.5|       2.0|         0.0|                  0.3|        18.8|
    |       1| 2018-11-01 00:26:12|  2018-11-01 00:34:22|              1|          2.0|         1|                 N|         237|         137|           2|        8.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         9.8|
    |       1| 2018-11-01 00:37:51|  2018-11-01 00:39:46|              1|          0.1|         1|                 N|         137|         137|           2|        3.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         4.8|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
    only showing top 20 rows
    
    
    We use date_format to derive separate time and date columns from the pickup timestamp; the exercises below group on these formats.

    In [6]:
    dfTime = spark.sql("SELECT tpep_pickup_datetime FROM taxi") \
        .withColumn('time', date_format('tpep_pickup_datetime', 'HH:mm:ss')) \
        .withColumn('date', date_format('tpep_pickup_datetime', 'yyyy-MM-dd'))
    
    In [7]:
    dfTime.show()
    
    +--------------------+--------+----------+
    |tpep_pickup_datetime|    time|      date|
    +--------------------+--------+----------+
    | 2018-11-01 00:51:36|00:51:36|2018-11-01|
    | 2018-11-01 00:07:47|00:07:47|2018-11-01|
    | 2018-11-01 00:24:27|00:24:27|2018-11-01|
    | 2018-11-01 00:35:27|00:35:27|2018-11-01|
    | 2018-11-01 00:16:46|00:16:46|2018-11-01|
    | 2018-11-01 00:23:57|00:23:57|2018-11-01|
    | 2018-11-01 00:57:07|00:57:07|2018-11-01|
    | 2018-11-01 01:03:28|01:03:28|2018-11-01|
    | 2018-11-01 00:05:28|00:05:28|2018-11-01|
    | 2018-11-01 00:30:17|00:30:17|2018-11-01|
    | 2018-11-01 00:07:47|00:07:47|2018-11-01|
    | 2018-11-01 00:18:40|00:18:40|2018-11-01|
    | 2018-11-01 00:29:40|00:29:40|2018-11-01|
    | 2018-11-01 00:17:55|00:17:55|2018-11-01|
    | 2018-11-01 00:37:58|00:37:58|2018-11-01|
    | 2018-11-01 00:07:07|00:07:07|2018-11-01|
    | 2018-11-01 00:59:37|00:59:37|2018-11-01|
    | 2018-11-01 00:46:50|00:46:50|2018-11-01|
    | 2018-11-01 00:26:12|00:26:12|2018-11-01|
    | 2018-11-01 00:37:51|00:37:51|2018-11-01|
    +--------------------+--------+----------+
    only showing top 20 rows
    
    

    A) Count the number of trips between 00:00 and 01:00 each day and give the total per day (grouped by day; every day gets a count).

  • Old direct DataFrame syntax
  • API calls (a pure-DataFrame sketch follows the output below)

    In [8]:
    from pyspark.sql import Row
    
    # Keep trips whose dropoff timestamp (field x[2]) falls between 00:00:00 and
    # 01:00:00 inclusive, keeping only the pickup timestamp (field y[1]) of each
    dfFilter = rdd.filter(lambda x: x[2].hour == 0 or
                                    (x[2].hour == 1 and x[2].minute == 0 and x[2].second == 0)) \
                  .map(lambda y: Row(tpep_pickup_datetime=y[1])).toDF()
    
    # Group the filtered trips by pickup date and count them
    dfFilter.withColumn('date', date_format('tpep_pickup_datetime', 'yyyy-MM-dd')) \
            .groupBy('date').count().sort(['date']).show()
    
    +----------+-----+
    |      date|count|
    +----------+-----+
    |2008-12-31|    1|
    |2009-01-01|    6|
    |2018-10-31|  169|
    |2018-11-01|10356|
    |2018-11-02|10735|
    |2018-11-03|14353|
    |2018-11-04|12743|
    |2018-11-05| 4584|
    |2018-11-06| 5603|
    |2018-11-07| 7295|
    |2018-11-08| 9171|
    |2018-11-09|12401|
    |2018-11-10|15186|
    |2018-11-11|13233|
    |2018-11-12| 5102|
    |2018-11-13| 6286|
    |2018-11-14| 7492|
    |2018-11-15| 8981|
    |2018-11-16|10524|
    |2018-11-17|14928|
    +----------+-----+
    only showing top 20 rows
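
    A pure-DataFrame sketch of the same query (assuming the df read above), avoiding the round-trip through the RDD API; like the lambda above, it filters on the dropoff timestamp:

    from pyspark.sql.functions import hour, minute, second, date_format

    # Same 00:00:00-01:00:00 window on the dropoff time, as column expressions
    df.filter((hour('tpep_dropoff_datetime') == 0) |
              ((hour('tpep_dropoff_datetime') == 1) &
               (minute('tpep_dropoff_datetime') == 0) &
               (second('tpep_dropoff_datetime') == 0))) \
      .withColumn('date', date_format('tpep_pickup_datetime', 'yyyy-MM-dd')) \
      .groupBy('date').count().sort('date').show()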
    
    

    B) Count the number of trips between 00:00 and 01:00 each day and give the total per month.

  • Old direct DataFrame syntax
  • API calls

    In [9]:
    # Same 00:00-01:00 filter as in A), now counting by the month of the pickup
    dfFilter = rdd.filter(lambda x: x[2].hour == 0 or
                                    (x[2].hour == 1 and x[2].minute == 0 and x[2].second == 0)) \
                  .map(lambda y: Row(tpep_pickup_datetime=y[1])).toDF()
    
    dfFilter.withColumn('month', date_format('tpep_pickup_datetime', 'MM')) \
            .groupBy('month').count().sort(['month']).show()
    
    +-----+------+
    |month| count|
    +-----+------+
    |   01|     6|
    |   10|   169|
    |   11|269617|
    |   12|    88|
    +-----+------+
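
    A Spark SQL sketch of the same monthly count, against the taxi view registered earlier, with the same window on the dropoff time:

    spark.sql("""
        SELECT date_format(tpep_pickup_datetime, 'MM') AS month,
               COUNT(*) AS count
        FROM taxi
        WHERE hour(tpep_dropoff_datetime) = 0
           OR (hour(tpep_dropoff_datetime) = 1
               AND minute(tpep_dropoff_datetime) = 0
               AND second(tpep_dropoff_datetime) = 0)
        GROUP BY month
        ORDER BY month
    """).show()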
    
    

    C) The average number of trips per month made by each driver (VendorID stands in for the driver here).

  • Old direct DataFrame syntax (a DataFrame-only sketch follows the output below)
  • Spark SQL

    In [10]:
    from pyspark.sql.functions import avg
    
    # Trips per (vendor, month), then the average of those monthly counts
    result = spark.sql("""
        SELECT VendorID,
               date_format(tpep_pickup_datetime, 'MM') AS month,
               COUNT(*) AS trip_count
        FROM taxi
        GROUP BY VendorID, month
    """).groupBy('VendorID').agg(avg('trip_count').alias('Viajes por mes')).orderBy('VendorID')
    
    result.show()
    
    +--------+--------------+
    |VendorID|Viajes por mes|
    +--------+--------------+
    |       1|     3262928.0|
    |       2|    1187854.75|
    |       4|      130817.0|
    +--------+--------------+
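
    The DataFrame-only variant listed above could be sketched as follows (same logic as the SQL, assuming the df read earlier):

    from pyspark.sql.functions import date_format, count, avg

    # Count trips per (vendor, month), then average those counts per vendor
    df.groupBy('VendorID', date_format('tpep_pickup_datetime', 'MM').alias('month')) \
      .agg(count('*').alias('trip_count')) \
      .groupBy('VendorID') \
      .agg(avg('trip_count').alias('Viajes por mes')) \
      .orderBy('VendorID') \
      .show()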
    
    

    D) The average number of trips per day made by each driver.

    Spark SQL

    In [11]:
    spark.sql("""
        SELECT VendorID,
               AVG(trips_per_day) AS VIAJES_X_DIA
        FROM (
            SELECT VendorID,
                  DATE(tpep_pickup_datetime) AS pickup_date,
                   COUNT(*) AS trips_per_day
            FROM taxi
            GROUP BY VendorID, DATE(tpep_pickup_datetime)
        ) AS VIAJES_X_DIA_V
        GROUP BY VendorID
        ORDER BY VendorID
    """).show()
    
    
    +--------+------------------+
    |VendorID|      VIAJES_X_DIA|
    +--------+------------------+
    |       1|108764.26666666666|
    |       2|        118785.475|
    |       4| 4360.566666666667|
    +--------+------------------+
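
    As a sketch, an equivalent DataFrame-only version, using to_date to truncate the pickup timestamp to its day:

    from pyspark.sql.functions import to_date, count, avg

    # Count trips per (vendor, day), then average those counts per vendor
    df.groupBy('VendorID', to_date('tpep_pickup_datetime').alias('pickup_date')) \
      .agg(count('*').alias('trips_per_day')) \
      .groupBy('VendorID') \
      .agg(avg('trips_per_day').alias('VIAJES_X_DIA')) \
      .orderBy('VendorID') \
      .show()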
    
    

    E) The maximum number of passengers carried on a single trip during the first week of the month.

    Spark SQL

    In [12]:
    spark.sql("""
        SELECT YEAR(tpep_pickup_datetime) as Anio,
            MONTH(tpep_pickup_datetime) as Mes,
            MAX(passenger_count) as Pasajeros
        FROM taxi
        WHERE DAY(tpep_pickup_datetime) <= 7
        GROUP BY Anio, Mes
    """).show()
    
    +----+---+---------+
    |Anio|Mes|Pasajeros|
    +----+---+---------+
    |2009|  1|        5|
    |2019|  1|        1|
    |2018| 12|        6|
    |2018| 11|        9|
    +----+---+---------+
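
    A DataFrame sketch of the same first-week query, with dayofmonth standing in for SQL's DAY():

    from pyspark.sql.functions import year, month, dayofmonth, max

    # Trips in the first seven days of any month, max passengers per (year, month)
    df.filter(dayofmonth('tpep_pickup_datetime') <= 7) \
      .groupBy(year('tpep_pickup_datetime').alias('Anio'),
               month('tpep_pickup_datetime').alias('Mes')) \
      .agg(max('passenger_count').alias('Pasajeros')) \
      .show()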
    
    

    F) The maximum number of passengers carried on a single trip over the whole month.

    Old direct DataFrame syntax

    In [13]:
    from pyspark.sql.functions import year, month, max, min
    
    # Make sure passenger_count is an integer before aggregating
    taxi_df = df.withColumn("passenger_count", df["passenger_count"].cast("int"))
    
    # Maximum passengers on a single trip, per (year, month) of pickup
    max_passengers_per_month = taxi_df \
        .groupBy(year("tpep_pickup_datetime").alias("Anio"),
                 month("tpep_pickup_datetime").alias("Mes")) \
        .agg(max("passenger_count").alias("Max_de_pasajeros")) \
        .orderBy(["Anio", "Mes"])
    
    max_passengers_per_month.show()
    
    +----+---+----------------+
    |Anio|Mes|Max_de_pasajeros|
    +----+---+----------------+
    |2008| 12|               5|
    |2009|  1|               5|
    |2018| 10|               6|
    |2018| 11|              96|
    |2018| 12|               6|
    |2019|  1|               1|
    |2019| 11|               5|
    +----+---+----------------+
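
    For symmetry, a Spark SQL sketch of F) against the taxi view could be:

    spark.sql("""
        SELECT YEAR(tpep_pickup_datetime) AS Anio,
               MONTH(tpep_pickup_datetime) AS Mes,
               MAX(passenger_count) AS Max_de_pasajeros
        FROM taxi
        GROUP BY Anio, Mes
        ORDER BY Anio, Mes
    """).show()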
    
    

    G) How much the most expensive trip cost.

  • Spark SQL
  • Old direct DataFrame syntax
  • RDD

    In [14]:
    max_fare = df.agg(max('fare_amount').alias('MAS_CARO'))
    
    max_fare.show()
    
    +---------+
    | MAS_CARO|
    +---------+
    |187436.46|
    +---------+
    
    
    In [15]:
    spark.sql("""
        SELECT MAX(fare_amount) AS MAS_CARO
        FROM taxi
    """).show()
    
    +---------+
    | MAS_CARO|
    +---------+
    |187436.46|
    +---------+
    
    
    In [20]:
    print("MAS_CARO:",rdd.max(lambda x:x[10]).__getitem__('fare_amount'))
    
    ('MAS_CARO:', 187436.46)
    

    H) How much the cheapest trip cost.

  • Spark SQL
  • Old direct DataFrame syntax
  • RDD

    In [21]:
    spark.sql("""
        SELECT MIN(fare_amount) AS MAS_BARATO
        FROM taxi
    """).show()
    
    +----------+
    |MAS_BARATO|
    +----------+
    |    -450.0|
    +----------+
    
    
    In [22]:
    min_fare = df.agg(min('fare_amount').alias('MAS_BARATO'))
    
    min_fare.show()
    
    +----------+
    |MAS_BARATO|
    +----------+
    |    -450.0|
    +----------+
    
    
    In [23]:
    print("MAS_BARATO:",rdd.min(lambda x:x[10]).__getitem__('fare_amount'))
    
    ('MAS_BARATO:', -450.0)