newsspider/decspider/pipelines.py
2024-05-17 16:25:45 +08:00

88 lines
3.6 KiB
Python

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
import mysql.connector
from scrapy.exceptions import DropItem
from .items import NewsItem
from .settings import MYSQL_USERNAME, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_PORT, MYSQL_DATABASE
class DecspiderPipeline:
def open_spider(self, spider):
# 连接数据库
self.conn = mysql.connector.connect(user=MYSQL_USERNAME, password=MYSQL_PASSWORD, host=MYSQL_HOST, database=MYSQL_DATABASE, port=MYSQL_PORT)
self.cursor = self.conn.cursor()
# 动态生成表名
self.table_name = f'{spider.settings.get("BOT_NAME")}_{spider.name}'
spider.log(f'Dataset name: {self.table_name}')
# 检查表是否存在,如果不存在就创建表
self.cursor.execute(f"""
CREATE TABLE IF NOT EXISTS `{self.table_name}` (
id INT AUTO_INCREMENT PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
""")
# 获取当前表的列信息
self.cursor.execute(f"SHOW COLUMNS FROM `{self.table_name}`")
existing_columns = {row[0] for row in self.cursor.fetchall()}
# 获取 NewsItem 字段信息
item_columns = set(NewsItem.fields.keys())
# 添加 NewsItem 字段到表中
for column in item_columns:
if column not in existing_columns:
self.cursor.execute(f"ALTER TABLE `{self.table_name}` ADD COLUMN `{column}` TEXT")
spider.log(f'Added column `{column}` to `{self.table_name}` table')
# 添加 created_at 和 updated_at 字段,如果它们不存在
if 'created_at' not in existing_columns:
self.cursor.execute(f"ALTER TABLE `{self.table_name}` ADD COLUMN `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
spider.log(f'Added column `created_at` to `{self.table_name}` table')
if 'updated_at' not in existing_columns:
self.cursor.execute(f"ALTER TABLE `{self.table_name}` ADD COLUMN `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")
spider.log(f'Added column `updated_at` to `{self.table_name}` table')
# 删除表中不存在于 NewsItem 中的字段
for column in existing_columns:
if column not in item_columns and column not in {'id', 'created_at', 'updated_at'}:
self.cursor.execute(f"ALTER TABLE `{self.table_name}` DROP COLUMN `{column}`")
spider.log(f'Dropped column `{column}` from `{self.table_name}` table')
self.conn.commit()
def close_spider(self, spider):
self.conn.close()
def process_item(self, item, spider):
if isinstance(item, NewsItem):
# 插入数据
columns = ', '.join(item.keys())
placeholders = ', '.join(['%s'] * len(item))
sql = f"INSERT INTO `{self.table_name}` ({columns}) VALUES ({placeholders})"
try:
self.cursor.execute(sql, list(item.values()))
self.conn.commit()
except mysql.connector.Error as e:
spider.log(f"Error when inserting item: {e}")
self.conn.rollback()
raise DropItem(f"Error when inserting item: {e}")
return item