Inicializamos el GeckoDriver con la última versión y le decimos que utilice FireFox
#Instalar gecko
from webdriver_manager.firefox import GeckoDriverManager
#Abrir un navegador
from selenium import webdriver
from selenium.webdriver.common.by import By
driver = webdriver.Firefox(executable_path=GeckoDriverManager(version="v0.34.0").install())
import argparse
from enum import Enum
import sys
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.select import Select
from selenium.webdriver.common.action_chains import ActionChains
import time
import random
Definimos la ruta donde se guardarán las capturas de pantalla definidas por el Driver
import pathlib
path=str(pathlib.Path().resolve())+"/Imagenes/"
def waitFinishLoad(driver,idLoader,needsDisapeared=False,by=By.ID,needsWait=True):
"""Espera a la carga de un elemento antes de continuar
Args:
driver (WebDriver): WebDriver
idLoader (str): Id a buscar, puede ser XPath...
needsDisapeared (bool, optional): Si el elemento a esperar tiene que aparecer o desaparecer como un CircularProgress. Defaults to False.
by (str, optional): Tipo de By a buscar el Wait. Defaults to By.ID.
needsWait (bool, optional): Si necesita esperar un extra para que no pueda saltar un Captcha. Defaults to False.
"""
if idLoader is None:
return
WebDriverWait(driver, timeout=60).until(EC.presence_of_element_located((by, idLoader)))
if needsDisapeared:
WebDriverWait(driver, timeout=60).until(EC.invisibility_of_element((by, idLoader)))
if needsWait:
time.sleep(random.choice([0.05,0.1,0.2]))
def elementBy(query,driver,**kargs):
"""Devuelve un element|None dependiendo de la consulta
Args:
query (str): Busqueda a realizar del elemento
driver (WebDriver): WebDriver
Returns:
Element|None: Elemento de la búsqueda
"""
if not kargs.get("needsWait",False):
waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
try:
element=driver.find_element(kargs.get("byq",By.XPATH),query)
if kargs.get("screenshot"):
driver.execute_script("arguments[0].scrollIntoView();", element)
screenshot(driver,kargs.get("screenshot"))
except:
screenshot(driver,kargs.get("screenshot"))
return None
waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
return element
def elementByClick(query,driver,**kargs):
"""Hace click en un elemento
Args:
query (str): Busqueda a realizar del elemento
driver (WebDriver): WebDriver
"""
if not kargs.get("needsWait",False):
waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
if kargs.get("screenshot"):
screenshot(driver,kargs.get("screenshot"))
element=driver.find_element(kargs.get("byq",By.XPATH),query)
element.click()
waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
return element
def elementsBy(query,driver,**kargs):
"""Devuelve una lista de elements dependiendo de la consulta
Args:
query (str): Busqueda a realizar del elemento
driver (WebDriver): WebDriver
Returns:
List<Element>: Lista de elementos
"""
if not kargs.get("needsWait",False):
waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
element=driver.find_elements(kargs.get("byq",By.XPATH),query)
waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
return element
def clickEvent(driver,element,**kargs):
"""Click en un elemento que funciona con evento
Args:
driver (WebDriver): WebDriver
element (Element): Elemento que tiene el evento
"""
if not kargs.get("needsWait",False):
waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
if kargs.get("normal",False):
element.click
else:
try:
driver.execute_script(element.get_dom_attribute("onClick"))
except:
element.click
waitFinishLoad(driver,kargs.get("idLoader",None),by=kargs.get("by",By.ID),needsWait=kargs.get("needsWait",False))
if kargs.get("screenshot"):
screenshot(driver,kargs.get("screenshot"))
def clickElement(driver,element,**kargs):
"""Click en un elemento que funciona con evento
Args:
driver (WebDriver): WebDriver
element (Element): Elemento que tiene el evento
"""
def goToNextPage(driver):
"""Cambia de página en la pantalla de siguiente
Args:
driver (WebDriver): WebDriver
"""
element=elementBy("//input[@value='Siguiente']",driver,idLoader="//table[@id='myTablaBusquedaCustom']",by=By.XPATH)
driver.execute_script("arguments[0].scrollIntoView();", element)
element.click()
def checkEndPage(driver,**kargs):
"""Comprueba si es el final y no hay más siguiente devuelve un booleano
Args:
driver (WebDriver): WebDriver
Returns:
Bool: True si es el final o False si no es
"""
element=elementBy("//input[@value='Siguiente']",driver,idLoader="//table[@id='myTablaBusquedaCustom']",by=By.XPATH)
if kargs.get("screenshot"):
if element:
driver.execute_script("arguments[0].scrollIntoView();", element)
else:
driver.execute_script("arguments[0].scrollIntoView();", elementBy("//input[@value='Último']",driver))
screenshot(driver,kargs.get("screenshot"))
if element:
return True
return False
def screenshot(driver,filename):
"""Saca la captura de pantalla y lo guarda en la ruta Imagenes
Args:
driver (WebDriver): WebDriver
filename (str): Nombre de la captura de pantalla
"""
if filename:
driver.save_screenshot(path+filename)
Entramos en la ruta de la contratación del estado
driver.get("https://contrataciondelestado.es")
Vamos al apartado de públicaciones
elementBy("//a[@title='Buscar publicaciones']",driver,idLoader="footer-newShow",screenshot="Inicio.png").click()
Vamos al apartado de licitaciones
# No es un enlace cualquiera, este esta gestionado por un script que controlaremos por código
licitacion=elementBy("//div[@class='divLogo']/a",driver,idLoader="footer-newShow",screenshot="Busca.png")
clickEvent(driver,licitacion)
Le damos al texto de búsqueda avanzada
busqueda_avanzada=elementBy("//div[@class='capaAvanzada']/a",driver,by=By.XPATH,idLoader="//div[@class='capaAvanzada']",screenshot="Formulario.png")
clickEvent(driver,busqueda_avanzada)
Y seleccionamos el elemento seleccionar para ir a la zona de Organización contratante
seleccionar=elementBy("//div[@class='inlinebloque ']/a",driver,by=By.XPATH,idLoader="//div[@class='inlinebloque ']/a",screenshot="FormularioAvanzado.png")
clickEvent(driver,seleccionar)
Definición método en búcle de la lista para obtener el último apartado (Coruña)
def get_corunha(list,driver,i=2):
""" Va através de la lisa recorriendo hasta llegar el último elemento que lo devuelve
Args:
list (str): Textos
driver (WebDriver): WebDriver
i (int, optional): Valor inicial. Defaults to 2.
"""
driver.save_screenshot(path+f"Sector-{list[i-2] if len(list)!=i-1 else f'{list[i-2]}2'}.png")
if i-2<len(list):
q=f"//*[text()='{list[i-2]}']"
if len(list)!=i-1:
q+="/../.."
query=f"{q}/td[{i}]"
total = f"{q}/../../../.." if i!=2 else 'footer-newShow'
else:
query=q
q2=f"{q}/../.."
total = f"{q2}/../../../.."
if len(list)!=i-1:
elementByClick(query,driver,by=By.XPATH if i!=2 else By.ID,idLoader=total)
else:
return elementsBy(query,driver,by=By.XPATH if i!=2 else By.ID,idLoader=total)
if i-1!=len(list):
return get_corunha(list,driver,i+1)
Esperamos a que cargue el primer elemento y en forma de bucle vamos esperando que el elemento cargue la lista que tiene abajo en forma de "Tree" y vaya abriendo de manera repetitiva, hasta obtener el último elemento y le daremos click al texto. Tras esto le diremos que escoja el primer órgano de contratación y le damos a añadir
waitFinishLoad(driver,by=By.XPATH,idLoader="//*[text()='ENTIDADES LOCALES']")
get_corunha(['ENTIDADES LOCALES','Galicia','A Coruña','Ayuntamientos','A Coruña'],driver)[1].click()
elementByClick("//span/select/option[1]",driver,idLoader="//span/select/option[1]",by=By.XPATH)
elementByClick("//*[@value='Añadir']",driver)
Le damos al botón de buscar
elementByClick("//*[@title='Buscar']",driver,idLoader="//*[@title='Buscar']",by=By.XPATH)
Y le decimos en un bucle que vaya añadiendo a un DataFrame los valores de la tabla y que si es la última pare el bucle
También entra en cada uno de los expedientes y saca captura
Solo las dos primeras para probar, si se quitara el if hace de todas hasta el final
from io import StringIO
from bs4 import BeautifulSoup
import pandas as pd
dfT = pd.DataFrame([])
i=0
while(True):
#region DataFrame
waitFinishLoad(driver,by=By.XPATH,idLoader="//div/span[text()='Importe']")
i+=1
url = driver.current_url
soup=BeautifulSoup(driver.page_source,'html.parser')
df=pd.read_html(StringIO(str(soup.find(id="myTablaBusquedaCustom"))))[0]
df=df[~df.iloc[:,0].str.startswith('Página')]
dfT=pd.concat([dfT,df])
#endregion
#region Expediente Independiente
#if i<3: #SE TENDRÍA QUE BORRAR LA LÍNEA PARA SACAR CAPTURAS DE TODOS LOS EXPEDIENTES, SOLO SON LAS DOS PRIMERAS PÁGINAS
elements = elementsBy("//tbody/tr/td/div/a[2]",driver)
for x in range(len(elements)):
element = elementsBy("//tbody/tr/td/div/a[2]",driver,by=By.XPATH,idLoader="//div/span[text()='Importe']",needsWait=False)
# Abre en una nueva tab el expediente, le saca captura y la cierra
driver.execute_script(f"window.open('{element[x].get_dom_attribute('href')}')")
driver.switch_to.window(driver.window_handles[1])
try:
waitFinishLoad(driver,by=By.XPATH,idLoader="//ul/li/span[text()='Expediente:']",needsWait=False)
screenshot(driver,f"Expedientes/ExpIndiv/expediente-P{i}-{x}.png")
except:
print(f"NO LE DIÓ TIEMPO, se saltará el {i}-{x}")
driver.close()
driver.switch_to.window(driver.window_handles[0])
driver.get(url)
#endregion
if not checkEndPage(driver,screenshot=f"Expedientes/Tablas/PaginaTabla{i}.png"):
break
goToNextPage(driver)
driver.close()
Limpiamos un poco las columnas y le decimos que si hay una minúscula más una mayúscula juntas que añada un /n, esto significa que debería haber un salto de línea y remplazamos las , y . para que no de problemas el Float
from numpy import float64
for i in dfT.columns:
dfT[i] = dfT[i].str.replace(r'(?<![A-Z "“()])([A-Z]+)', lambda x: f"\\n{x.group(1)}" if x.start() != 0 else x.group(1), regex=True)
dfT['Importe'] = dfT['Importe'].apply(lambda x: float(x.replace('.', '').replace(',', '.')))
Le decimos que rellene los valores con 0 en este caso los floats y para el resto que sean nulos
import numpy as np
dfT['Importe']=dfT['Importe'].fillna(0.0)
dfT = dfT.replace({np.nan: None})
Instalamos SQL Server en un docker
SQL Server
Teniendo en cuenta que la contraseña tiene que tener 8 caracteres y mayúscula
Instalamos pyodbc para manejar el SQL Server
#!conda install -c conda-forge -y pyodbc
Instalar el Driver desde la página del enlace con la versión 18
curl https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc
Escoja la versión necesaria para la instalación
curl https://packages.microsoft.com/config/debian/9/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
curl https://packages.microsoft.com/config/debian/10/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
curl https://packages.microsoft.com/config/debian/11/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
curl https://packages.microsoft.com/config/debian/12/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list
sudo apt-get update sudo ACCEPT_EULA=Y apt-get install -y msodbcsql18
optional: for bcp and sqlcmd
sudo ACCEPT_EULA=Y apt-get install -y mssql-tools18 echo 'export PATH="$PATH:/opt/mssql-tools18/bin"' >> ~/.bashrc source ~/.bashrc
optional: for unixODBC development headers
sudo apt-get install -y unixodbc-dev
optional: kerberos library for debian-slim distributions
sudo apt-get install -y libgssapi-krb5-2
© MicrosoftDocs
Inicializamos la conexión con el ODBC 18 descrgado anteriormente
import pyodbc
connectionString = f'DRIVER={{ODBC Driver 18 for SQL Server}};SERVER=IP,41433;DATABASE=Concellos;UID=sa;PWD=Abcd1234.;Encrypt=no'
conexion = pyodbc.connect(connectionString)
Eliminamos en este caso la tabla si existe para no repetir datos
cursor = conexion.cursor()
cursor.execute("""Drop TABLE if exists ConcelloDatos""")
cursor.commit()
Creamos la tabla con los valores, siendo las tablas con [] nombres con espacios
cursor = conexion.cursor()
cursor.execute("""CREATE TABLE Concellos.dbo.ConcelloDatos (
Expediente varchar(800) NULL,
[Tipo de Contrato] varchar(400) NULL,
Estado varchar(50) NULL,
Importe decimal(38,0) NULL,
Fechas varchar(200) NULL,
[Órgano de Contratación] varchar(100) NULL
)
""")
cursor.commit()
Ejecutamos un executemany para jugar un PreparedStatment es decir una consulta con parámetros y le pasamos los valores del DataFrame
cursor.executemany("""
INSERT INTO ConcelloDatos(Expediente,[Tipo de Contrato],Estado,Importe,Fechas,[Órgano de Contratación])
Values (?,?,?,?,?,?)
""",dfT.values.tolist())
cursor.commit()
#SQL ALCHEMY
#dfT.to_sql("ConcelloDatos",connectionString)
Hacemos un Select * de prueba y vemos por ejemplo la columna "Expediente"
cursor.execute("""
Select *
From ConcelloDatos
""")
tuplas = cursor.fetchall()
for tupla in tuplas:
print(tupla.Expediente)
Cerramos la conexión a SQL Server y su Cursor
cursor.close()
conexion.close()