class TableExtractor:
    """Extract table text from a PDF with OCR and convert it to JSON via an LLM.

    Pipeline (driven by `key_value_pair`):
    PDF pages -> JPEG images -> binarised images -> table bounding boxes ->
    OCR text -> cleaned text -> LLM "JSON formatter" prompt -> parsed JSON objects.

    The Hugging Face token is taken from the constructor argument, else from the
    `HUGGINGFACEHUB_API_TOKEN` environment variable, so that no credential needs
    to be written into the notebook.
    """

    # Environment variable consulted when no token is passed to the constructor.
    API_TOKEN_ENV_VAR = "HUGGINGFACEHUB_API_TOKEN"
    # Model used when the caller does not choose one.
    DEFAULT_REPO_ID = "mistralai/Mistral-7B-Instruct-v0.1"
    # Historical placeholder, kept only so `TableExtractor(pdf_path)` still constructs.
    _PLACEHOLDER_TOKEN = "YOUR_HUGGINGFACE_API_KEY"

    # Prompt sent to the model; `{data}` is substituted with one table's text.
    _PROMPT_TEMPLATE = """[INST]you are json formatter.your task analyze the given data{data} and must return answer as json.key doest have value return empty string.only generate json for given data's.all answers should be in json format(for all data).[/INST]"""

    # A ```json ... ``` fenced object; whitespace around the object is optional.
    _FENCED_JSON_RE = re.compile(r'```json\s*(\{.*?\})\s*```', re.DOTALL)

    def __init__(self, pdf_path, huggingfacehub_api_token=None, repo_id=None):
        """
        Creates the extractor and the LLM client used to format table text.

        Parameters:
        - pdf_path: Path of the PDF document to process.
        - huggingfacehub_api_token: Optional Hugging Face API token. When omitted, the
          `HUGGINGFACEHUB_API_TOKEN` environment variable is used; if that is unset too,
          the legacy placeholder string is used and a warning is logged.
        - repo_id: Optional Hugging Face model repository id. Defaults to
          `DEFAULT_REPO_ID`.
        """
        token = huggingfacehub_api_token or os.environ.get(self.API_TOKEN_ENV_VAR)
        if not token:
            logging.warning(
                "No Hugging Face token supplied and %s is not set; "
                "falling back to the placeholder token.",
                self.API_TOKEN_ENV_VAR,
            )
            token = self._PLACEHOLDER_TOKEN

        self.huggingfacehub_api_token = token
        self.repo_id = repo_id or self.DEFAULT_REPO_ID
        self.llm = HuggingFaceHub(
            huggingfacehub_api_token=self.huggingfacehub_api_token,
            repo_id=self.repo_id,
            model_kwargs={"temperature": 0.1, "max_new_tokens": 3000}
        )
        self.pdf_path = pdf_path

    def _image_list_(self, pdf_path: Union[str, None] = None) -> List[str]:
        """
        Converts all pages in a PDF file to images, saving them locally and returning a list of image filenames.

        Parameters:
        - pdf_path (str, optional): The file path of the PDF document to be converted.
          Defaults to `self.pdf_path` when omitted or empty.

        Returns:
        - List[str]: A list of filenames for the images created, one per page of the PDF.
          Files are written to the current working directory as `page_<index>.jpg`,
          so a later run overwrites the files of an earlier one.

        Raises:
        - Exception: Propagates any exception that occurs during the PDF to image conversion process,
                     after logging the error.
        """
        source_path = pdf_path or self.pdf_path
        try:
            img_list = []
            for page_index, page in enumerate(convert_from_path(source_path)):
                image_name = f'page_{page_index}.jpg'
                page.save(image_name, 'JPEG')
                img_list.append(image_name)
            return img_list
        except Exception as e:
            logging.error(f"Error converting PDF to images: {e}")
            raise

    def _preprocess_image_(self, image_path: str) -> Any:
        """
        Preprocesses an image to enhance table detection and OCR accuracy by converting it to grayscale,
        applying noise reduction, and performing thresholding to obtain a binary image.

        Parameters:
        - image_path (str): The file path of the image to preprocess.

        Returns:
        - Any: The thresholded image returned by `cv2.threshold` (Otsu, `THRESH_BINARY`).

        Raises:
        - FileNotFoundError: If the specified image file does not exist.
        - ValueError: If the file exists but OpenCV could not read it as an image.
        - Exception: For any other issue in the preprocessing steps.
        """
        try:
            img = cv2.imread(image_path)
            # cv2.imread signals failure by returning None instead of raising,
            # so turn that into an explicit, descriptive error here.
            if img is None:
                if not os.path.exists(image_path):
                    raise FileNotFoundError(f"Image file not found: {image_path}")
                raise ValueError(f"Could not read image file: {image_path}")

            # Convert to grayscale
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            # Noise removal
            denoised = cv2.fastNlMeansDenoising(gray, None, 30, 7, 21)
            # Thresholding to get a binary image
            _, thresh = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
            return thresh

        except Exception:
            logging.error("Error during the preprocessing of the image", exc_info=True)
            raise

    def _detect_tables_(self, image: Any) -> List[Tuple[int, int, int, int]]:
        """
        Detects tables in an image using morphological transformations for line detection
        and contour detection to identify table boundaries.

        Parameters:
        - image (Any): The preprocessed binary image where tables are to be detected.
          It must expose `.shape` and `.size` (as a NumPy array does).

        Returns:
        - List[Tuple[int, int, int, int]]: A list of tuples, each representing the bounding box of a detected
          table in the format (x, y, width, height). Boxes whose area is not larger than
          0.1% of the image are discarded.

        Raises:
        - Exception: Propagates any error from the OpenCV calls after logging it.
        """
        try:
            # Kernel lengths are 1/30 of each image dimension; clamp to 1 so that
            # images smaller than 30 px do not produce a zero-sized kernel.
            vertical_len = max(1, image.shape[0] // 30)
            horizontal_len = max(1, image.shape[1] // 30)

            # NOTE(review): MORPH_OPEN preserves non-zero (white) structures. The image
            # from `_preprocess_image_` is thresholded with THRESH_BINARY, so on
            # dark-on-light pages the ruling lines would be zero-valued here and the
            # image may need inverting first — confirm against real documents.
            vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, vertical_len))
            horiz_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (horizontal_len, 1))
            vertical_lines = cv2.morphologyEx(image, cv2.MORPH_OPEN, vertical_kernel, iterations=2)
            horizontal_lines = cv2.morphologyEx(image, cv2.MORPH_OPEN, horiz_kernel, iterations=2)

            # Combine lines
            table_grid = cv2.add(horizontal_lines, vertical_lines)
            # Find contours
            contours, _ = cv2.findContours(table_grid, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

            min_area = image.size * 0.001  # Filter out small contours
            tables = []
            for contour in contours:
                x, y, w, h = cv2.boundingRect(contour)
                if w * h > min_area:
                    tables.append((x, y, w, h))

            logging.info(f"Detected {len(tables)} tables in the image.")
            return tables

        except Exception:
            logging.error("Error during table detection", exc_info=True)
            raise

    def _extract_text_from_tables_(self, image: Any, tables: List[Tuple[int, int, int, int]]) -> List[str]:
        """
        Extracts text from specified table regions in an image using OCR.

        Parameters:
        - image (Any): The image from which text is to be extracted. It is cropped with
          two-dimensional slicing (`image[y:y+h, x:x+w]`).
        - tables (List[Tuple[int, int, int, int]]): A list of tuples, each representing the bounding box
          of a table to extract text from, in the format (x, y, width, height).

        Returns:
        - List[str]: A list of strings, where each string contains the text extracted from the corresponding
          table region defined in the `tables` parameter.

        Raises:
        - Exception: For issues during the image cropping or OCR process.
        """
        try:
            texts = [
                pytesseract.image_to_string(image[y:y + h, x:x + w], lang='eng')
                for (x, y, w, h) in tables
            ]
            logging.info(f"Extracted text from {len(tables)} tables.")
            return texts
        except Exception:
            logging.error("Error extracting text from tables", exc_info=True)
            raise

    def extract_tables_and_text(self) -> List[str]:
        """
        Extracts tables and their respective text from the document specified by `self.pdf_path`.

        This method integrates the workflow of converting PDF pages to images, preprocessing images for table
        detection, detecting table boundaries, and extracting text from these tables.

        Returns:
        - List[str]: A list of strings, each string contains the text extracted from a table detected in the
          document. The list is compiled from all tables detected across all pages of the document.

        Raises:
        - Exception: For any issues encountered during the processes of image conversion, preprocessing,
                     table detection, or text extraction.
        """
        try:
            logging.info("Starting table and text extraction process.")
            all_tables_text = []

            # One image file per PDF page; each page is processed independently.
            for image_path in self._image_list_(self.pdf_path):
                preprocessed_image = self._preprocess_image_(image_path)
                tables = self._detect_tables_(preprocessed_image)
                all_tables_text.extend(
                    self._extract_text_from_tables_(preprocessed_image, tables)
                )

            logging.info("Completed table and text extraction process.")
            return all_tables_text
        except Exception:
            logging.error("Error in extracting tables and text", exc_info=True)
            raise

    def extracted_data(self) -> List[str]:
        """
        Cleans and returns the extracted text data from tables in the document.

        This method calls `extract_tables_and_text` to get the raw text from tables,
        then cleans the text by normalizing spaces and removing excessive newlines.

        Returns:
            List[str]: A list of cleaned strings, each representing the text extracted
                       and cleaned from a single table detected in the document.
        """
        try:
            logging.info("Starting extracted data processing.")
            tables_text = self.extract_tables_and_text()

            answer = []
            for text in tables_text:
                # Replace multiple spaces or tabs with a single space
                cleaned_string = re.sub(r'[ \t]+', ' ', text)
                # Collapse runs of blank lines to ONE newline. Replacing them with an
                # empty string would glue the neighbouring rows into a single word.
                cleaned_string = re.sub(r'\n\s*\n', '\n', cleaned_string)
                answer.append(cleaned_string)

            logging.info("Completed data extraction and cleaning.")
            return answer

        except Exception:
            logging.error("Error in extracting data", exc_info=True)
            raise

    def response(self, content: str) -> str:
        """
        Sends one table's text to the LLM with the JSON-formatter prompt.

        Args:
            content (str): The input data that needs to be analyzed and formatted.

        Returns:
            str: The raw text returned by the LLM chain. It is not parsed or validated
                 here; `extract_and_combine_json` does that.
        """
        try:
            prompt = PromptTemplate(template=self._PROMPT_TEMPLATE, input_variables=["data"])
            llm_chain = LLMChain(prompt=prompt, verbose=True, llm=self.llm)
            # Run the language model chain with the provided content
            return llm_chain.run({"data": content})

        except Exception:
            logging.error("Error in response", exc_info=True)
            raise

    def list_of_answer(self) -> List[str]:
        """
        Runs every cleaned table text through `response`.

        Returns:
            List[str]: One LLM response per item returned by `extracted_data`, in the same order.
        """
        try:
            final = [self.response(table_text) for table_text in self.extracted_data()]
            logging.info("Completed processing list of answers.")
            return final

        except Exception:
            logging.error("Error in list of answer", exc_info=True)
            raise

    def extract_and_combine_json(self, text_list: List[str]) -> List[Dict[str, Any]]:
        """
        Extracts JSON objects from a list of strings and combines them into a single list.

        Each string is searched for JSON objects enclosed within ```json ... ``` markers
        (whitespace between the markers and the object is tolerated). If a string has no
        such block but is itself a bare `{...}` object, that object is parsed instead.

        Args:
            text_list: A list of strings, each potentially containing one or more JSON objects.
                       `None` is treated as an empty list; non-string items are skipped.

        Returns:
            A list of parsed JSON objects in the order found. Always a list: candidates
            that fail to decode are logged and skipped rather than aborting the whole call.
        """
        combined_json_objects: List[Dict[str, Any]] = []

        for text in text_list or []:
            if not isinstance(text, str):
                logging.warning(f"Skipping non-string response of type {type(text).__name__}")
                continue

            json_strings = self._FENCED_JSON_RE.findall(text)
            if not json_strings:
                # The model does not always fence its answer; accept a bare object.
                stripped = text.strip()
                if stripped.startswith('{') and stripped.endswith('}'):
                    json_strings = [stripped]

            for json_str in json_strings:
                try:
                    combined_json_objects.append(json.loads(json_str))
                except json.JSONDecodeError as e:
                    logging.warning(f"Failed to decode JSON: {e}")

        return combined_json_objects

    def key_value_pair(self) -> str:
        """
        Runs the full pipeline and returns the combined JSON as a string.

        This method calls `list_of_answer` to retrieve the LLM responses, passes them to
        `extract_and_combine_json`, and serialises the resulting list.

        Returns:
            A string representation of the combined JSON objects, formatted with an indent of 2 spaces.
        """
        try:
            list_of_text = self.list_of_answer()
            combined_json = self.extract_and_combine_json(list_of_text)
            key_value = json.dumps(combined_json, indent=2)
            logging.info("Successfully combined JSON objects.")
            return key_value

        except Exception as e:
            logging.error(f"An error occurred in key_value_pair: {e}")
            raise