Extraer con selenium datos de licitacións do Concello da Coruña¶

Importación¶

Inicializamos el GeckoDriver con la última versión y le decimos que utilice FireFox

In [ ]:
#Instalar gecko
from webdriver_manager.firefox import GeckoDriverManager


#Abrir un navegador
from selenium import webdriver
from selenium.webdriver.common.by import By
driver = webdriver.Firefox(executable_path=GeckoDriverManager(version="v0.34.0").install())
In [ ]:
import argparse
from enum import Enum
import sys
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.action_chains import ActionChains
import time
import random

Definimos la ruta donde se guardarán las capturas de pantalla definidas por el Driver

In [ ]:
import pathlib

path=str(pathlib.Path().resolve())+"/Imagenes/"

Métodos¶

In [ ]:
def waitFinishLoad(driver,idLoader,needsDisapeared=False,by=By.ID,needsWait=True):
    """Espera a la carga de un elemento antes de continuar

    Args:
        driver (WebDriver): WebDriver
        idLoader (str): Id a buscar, puede ser XPath...
        needsDisapeared (bool, optional): Si el elemento a esperar tiene que aparecer o desaparecer como un CircularProgress. Defaults to False.
        by (str, optional): Tipo de By a buscar el Wait. Defaults to By.ID.
        needsWait (bool, optional): Si necesita esperar un extra para que no pueda saltar un Captcha. Defaults to False.
    """
    if idLoader is None:
        return
    WebDriverWait(driver, timeout=60).until(EC.presence_of_element_located((by, idLoader)))
    if needsDisapeared:
        WebDriverWait(driver, timeout=60).until(EC.invisibility_of_element((by, idLoader)))
    if needsWait:
        time.sleep(random.choice([0.05,0.1,0.2]))

def elementBy(query,driver,**kargs):
    """Devuelve un element|None dependiendo de la consulta

    Args:
        query (str): Busqueda a realizar del elemento
        driver (WebDriver): WebDriver

    Returns:
        Element|None: Elemento de la búsqueda
    """
    if not kargs.get("needsWait",False):
        waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
    try:
        element=driver.find_element(kargs.get("byq",By.XPATH),query)
        if kargs.get("screenshot"):
            driver.execute_script("arguments[0].scrollIntoView();", element)
            screenshot(driver,kargs.get("screenshot"))
    except:
        screenshot(driver,kargs.get("screenshot"))
        return None
    waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
    return element


def elementByClick(query,driver,**kargs):
    """Hace click en un elemento

    Args:
        query (str): Busqueda a realizar del elemento
        driver (WebDriver): WebDriver
    """
    if not kargs.get("needsWait",False):
        waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
    if kargs.get("screenshot"):
        screenshot(driver,kargs.get("screenshot"))
    element=driver.find_element(kargs.get("byq",By.XPATH),query)
    element.click()
    waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
    return element


def elementsBy(query,driver,**kargs):
    """Devuelve una lista de elements dependiendo de la consulta

    Args:
        query (str): Busqueda a realizar del elemento
        driver (WebDriver): WebDriver

    Returns:
        List<Element>: Lista de elementos
    """
    if not kargs.get("needsWait",False):
        waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
    element=driver.find_elements(kargs.get("byq",By.XPATH),query)
    waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
    return element


def clickEvent(driver,element,**kargs):
    """Click en un elemento que funciona con evento

    Args:
        driver (WebDriver): WebDriver
        element (Element): Elemento que tiene el evento
    """
    if not kargs.get("needsWait",False):
        waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
    if kargs.get("normal",False):
        element.click
    else:
        try:
            driver.execute_script(element.get_dom_attribute("onClick"))
        except:
            element.click
    waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
    if kargs.get("screenshot"):
        screenshot(driver,kargs.get("screenshot"))

    
def clickElement(driver,element,**kargs):
    """Click en un elemento que funciona con evento

    Args:
        driver (WebDriver): WebDriver
        element (Element): Elemento que tiene el evento
    """


def goToNextPage(driver):
    """Cambia de página en la pantalla de siguiente

    Args:
        driver (WebDriver): WebDriver
    """
    element=elementBy("//input[@value='Siguiente']",driver,idLoader="//table[@id='myTablaBusquedaCustom']",by=By.XPATH)
    driver.execute_script("arguments[0].scrollIntoView();", element)
    element.click()


def checkEndPage(driver,**kargs):
    """Comprueba si es el final y no hay más siguiente devuelve un booleano

    Args:
        driver (WebDriver): WebDriver

    Returns:
        Bool: True si es el final o False si no es
    """
    element=elementBy("//input[@value='Siguiente']",driver,idLoader="//table[@id='myTablaBusquedaCustom']",by=By.XPATH)
    if kargs.get("screenshot"):
        if element:
            driver.execute_script("arguments[0].scrollIntoView();", element)
        else:
            driver.execute_script("arguments[0].scrollIntoView();", elementBy("//input[@value='Último']",driver))
        screenshot(driver,kargs.get("screenshot"))
    if element:
        return True
    return False


def screenshot(driver,filename):
    """Saca la captura de pantalla y lo guarda en la ruta Imagenes

    Args:
        driver (WebDriver): WebDriver
        filename (str): Nombre de la captura de pantalla
    """
    if filename:
        driver.save_screenshot(path+filename)

Scrapping¶

Entramos en la ruta de la contratación del estado

In [ ]:
driver.get("https://contrataciondelestado.es")

Vamos al apartado de públicaciones

In [ ]:
elementBy("//a[@title='Buscar publicaciones']",driver,idLoader="footer-newShow",screenshot="Inicio.png").click()

Vamos al apartado de licitaciones

In [ ]:
# No es un enlace cualquiera, este esta gestionado por un script que controlaremos por código 

licitacion=elementBy("//div[@class='divLogo']/a",driver,idLoader="footer-newShow",screenshot="Busca.png")

clickEvent(driver,licitacion)

Le damos al texto de búsqueda avanzada

In [ ]:
busqueda_avanzada=elementBy("//div[@class='capaAvanzada']/a",driver,by=By.XPATH,idLoader="//div[@class='capaAvanzada']",screenshot="Formulario.png")

clickEvent(driver,busqueda_avanzada)

Y seleccionamos el elemento seleccionar para ir a la zona de Organización contratante

In [ ]:
seleccionar=elementBy("//div[@class='inlinebloque   ']/a",driver,by=By.XPATH,idLoader="//div[@class='inlinebloque   ']/a",screenshot="FormularioAvanzado.png")
clickEvent(driver,seleccionar)

Definición método en búcle de la lista para obtener el último apartado (Coruña)

In [ ]:
def get_corunha(list,driver,i=2):
    """ Va através de la lisa recorriendo hasta llegar el último elemento que lo devuelve

    Args:
        list (str): Textos
        driver (WebDriver): WebDriver
        i (int, optional): Valor inicial. Defaults to 2.
    """
    driver.save_screenshot(path+f"Sector-{list[i-2] if len(list)!=i-1 else f'{list[i-2]}2'}.png")
    if i-2<len(list):
        q=f"//*[text()='{list[i-2]}']"
        if len(list)!=i-1:
            q+="/../.."
            query=f"{q}/td[{i}]"
            total = f"{q}/../../../.." if i!=2 else 'footer-newShow'
        else:
            query=q
            q2=f"{q}/../.."
            total = f"{q2}/../../../.."
        if len(list)!=i-1:
            elementByClick(query,driver,by=By.XPATH if i!=2 else By.ID,idLoader=total)
        else:
            return elementsBy(query,driver,by=By.XPATH if i!=2 else By.ID,idLoader=total)
        if i-1!=len(list):
            return get_corunha(list,driver,i+1)

Esperamos a que cargue el primer elemento y en forma de bucle vamos esperando que el elemento cargue la lista que tiene abajo en forma de "Tree" y vaya abriendo de manera repetitiva, hasta obtener el último elemento y le daremos click al texto. Tras esto le diremos que escoja el primer órgano de contratación y le damos a añadir

In [ ]:
waitFinishLoad(driver,by=By.XPATH,idLoader="//*[text()='ENTIDADES LOCALES']")
get_corunha(['ENTIDADES LOCALES','Galicia','A Coruña','Ayuntamientos','A Coruña'],driver)[1].click()
elementByClick("//span/select/option[1]",driver,idLoader="//span/select/option[1]",by=By.XPATH)
elementByClick("//*[@value='Añadir']",driver)

Le damos al botón de buscar

In [ ]:
elementByClick("//*[@title='Buscar']",driver,idLoader="//*[@title='Buscar']",by=By.XPATH)

Y le decimos en un bucle que vaya añadiendo a un DataFrame los valores de la tabla y que si es la última pare el bucle

También entra en cada uno de los expedientes y saca captura

Solo las dos primeras para probar, si se quitara el if hace de todas hasta el final

In [ ]:
from io import StringIO
from bs4 import BeautifulSoup
import pandas as pd


dfT = pd.DataFrame([])
i=0
while(True):

    #region DataFrame
    waitFinishLoad(driver,by=By.XPATH,idLoader="//div/span[text()='Importe']")
    i+=1
    url = driver.current_url
    soup=BeautifulSoup(driver.page_source,'html.parser')
    df=pd.read_html(StringIO(str(soup.find(id="myTablaBusquedaCustom"))))[0]
    df=df[~df.iloc[:,0].str.startswith('Página')]
    dfT=pd.concat([dfT,df])
    #endregion
    
    #region Expediente Independiente
    #if i<3:  #SE TENDRÍA QUE BORRAR LA LÍNEA PARA SACAR CAPTURAS DE TODOS LOS EXPEDIENTES, SOLO SON LAS DOS PRIMERAS PÁGINAS
    elements = elementsBy("//tbody/tr/td/div/a[2]",driver)
    for x in range(len(elements)):
        element = elementsBy("//tbody/tr/td/div/a[2]",driver,by=By.XPATH,idLoader="//div/span[text()='Importe']",needsWait=False)
        
        # Abre en una nueva tab el expediente, le saca captura y la cierra
        driver.execute_script(f"window.open('{element[x].get_dom_attribute('href')}')")
        driver.switch_to.window(driver.window_handles[1])
        try:
            waitFinishLoad(driver,by=By.XPATH,idLoader="//ul/li/span[text()='Expediente:']",needsWait=False)
            screenshot(driver,f"Expedientes/ExpIndiv/expediente-P{i}-{x}.png")
        except:
            print(f"NO LE DIÓ TIEMPO, se saltará el {i}-{x}")
        driver.close()
        driver.switch_to.window(driver.window_handles[0])
        driver.get(url)
    #endregion

    if not checkEndPage(driver,screenshot=f"Expedientes/Tablas/PaginaTabla{i}.png"):
        break
    goToNextPage(driver)

driver.close()

Limpiamos un poco las columnas y le decimos que si hay una minúscula más una mayúscula juntas que añada un /n, esto significa que debería haber un salto de línea y remplazamos las , y . para que no de problemas el Float

In [ ]:
from numpy import float64


for i in dfT.columns:
    dfT[i] = dfT[i].str.replace(r'(?<![A-Z "“()])([A-Z]+)', lambda x: f"\\n{x.group(1)}" if x.start() != 0 else x.group(1), regex=True)

dfT['Importe'] = dfT['Importe'].apply(lambda x: float(x.replace('.', '').replace(',', '.')))

Le decimos que rellene los valores con 0 en este caso los floats y para el resto que sean nulos

In [ ]:
import numpy as np


dfT['Importe']=dfT['Importe'].fillna(0.0)
dfT = dfT.replace({np.nan: None})

Instalamos SQL Server en un docker

SQL Server

  • docker run -e "ACCEPT_EULA=Y" -e "MSSQL_SA_PASSWORD=Abcd1234." -e "MSSQL_PID=Evaluation" -p 41433:1433 --name sqlpreview --hostname sqlpreview -d mcr.microsoft.com/mssql/server:2022-preview-ubuntu-22.04

Teniendo en cuenta que la contraseña tiene que tener 8 caracteres y mayúscula

Instalamos pyodbc para manejar el SQL Server

In [ ]:
#!conda install -c conda-forge -y pyodbc

Instalación Driver¶

WINDOWS¶

Instalar el Driver desde la página del enlace con la versión 18

ODBC Driver .msi para Windows

DEBIAN¶

curl https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc

Escoja la versión necesaria para la instalación

Debian 9¶

curl https://packages.microsoft.com/config/debian/9/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list

Debian 10¶

curl https://packages.microsoft.com/config/debian/10/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list

Debian 11¶

curl https://packages.microsoft.com/config/debian/11/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list

Debian 12¶

curl https://packages.microsoft.com/config/debian/12/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list

sudo apt-get update sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18

optional: for bcp and sqlcmd

sudo ACCEPT_EULA=Y apt-get install -y mssql-tools18 echo 'export PATH="$PATH:/opt/mssql-tools18/bin"' >> ~/.bashrc source ~/.bashrc

optional: for unixODBC development headers

sudo apt-get install -y unixodbc-dev

optional: kerberos library for debian-slim distributions

sudo apt-get install -y libgssapi-krb5-2

© MicrosoftDocs

Manejo SQL Server¶

Inicializamos la conexión con el ODBC 18 descrgado anteriormente

In [ ]:
import pyodbc

connectionString = f'DRIVER={{ODBC Driver 18 for SQL Server}};SERVER=IP,41433;DATABASE=Concellos;UID=sa;PWD=Abcd1234.;Encrypt=no'
conexion = pyodbc.connect(connectionString)

Eliminamos en este caso la tabla si existe para no repetir datos

In [ ]:
cursor = conexion.cursor()
cursor.execute("""Drop TABLE if exists ConcelloDatos""")
cursor.commit()

Creamos la tabla con los valores, siendo las tablas con [] nombres con espacios

In [ ]:
cursor = conexion.cursor()
cursor.execute("""CREATE TABLE Concellos.dbo.ConcelloDatos (
	              Expediente varchar(800) NULL,
	              [Tipo de Contrato] varchar(400) NULL,
	              Estado varchar(50) NULL,
	              Importe decimal(38,0) NULL,
	              Fechas varchar(200) NULL,
	              [Órgano de Contratación] varchar(100) NULL
)
""")
cursor.commit()

Ejecutamos un executemany para jugar un PreparedStatment es decir una consulta con parámetros y le pasamos los valores del DataFrame

In [ ]:
cursor.executemany("""
                   INSERT INTO ConcelloDatos(Expediente,[Tipo de Contrato],Estado,Importe,Fechas,[Órgano de Contratación])
                   Values (?,?,?,?,?,?)
""",dfT.values.tolist())
cursor.commit()

#SQL ALCHEMY
#dfT.to_sql("ConcelloDatos",connectionString)

Hacemos un Select * de prueba y vemos por ejemplo la columna "Expediente"

In [ ]:
cursor.execute("""
                Select * 
                From ConcelloDatos
""")
tuplas = cursor.fetchall()
for tupla in tuplas:
    print(tupla.Expediente)

Cerramos la conexión a SQL Server y su Cursor

In [ ]:
cursor.close()
conexion.close()