  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "provenance": []
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    "language_info": {
      "name": "python"
  "cells": [
      "cell_type": "code",
      "source": [
        "# `%pip` installs into the running kernel's environment; `!pip` can target a different interpreter.\n",
        "# huggingface_hub is needed at runtime by langchain's HuggingFaceHub wrapper.\n",
        "# NOTE(review): langchain is unpinned; the top-level `from langchain import HuggingFaceHub, ...`\n",
        "# import used in this notebook may only work on older releases - confirm and pin a version.\n",
        "%pip install -q opencv-python-headless pytesseract Pillow pdf2image langchain huggingface_hub"
      "metadata": {
        "id": "8SpFUpYjeGbS"
      "execution_count": null,
      "outputs": []
      "cell_type": "code",
      "source": [
        "# System packages: poppler-utils backs pdf2image, tesseract-ocr backs pytesseract.\n",
        "# `-y` answers apt's confirmation prompt so the cell cannot stall or abort waiting for input.\n",
        "!sudo apt-get install -y poppler-utils tesseract-ocr"
      "metadata": {
        "id": "Qo6omoFzel98"
      "execution_count": null,
      "outputs": []
      "cell_type": "code",
      "source": [
        "import cv2\n",
        "import numpy as np\n",
        "import pytesseract\n",
        "from pdf2image import convert_from_path\n",
        "from PIL import Image\n",
        "import os\n",
        "import logging\n",
        "import re\n",
        "from langchain import HuggingFaceHub, PromptTemplate, LLMChain\n",
        "# from langchain.document_loaders import UnstructuredPDFLoader\n",
        "import re\n",
        "import json\n",
        "from typing import List,Tuple,Any,Union,Dict\n",
        "# Configure the root logger once: INFO and above, each record prefixed with timestamp and level.\n",
        "logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n",
        "class TableExtractor:\n",
        "    def __init__(self, pdf_path):\n",
        "        self.huggingfacehub_api_token = \"YOUR_HUGGINGFACE_API_KEY\"  # Replace with your Hugging Face token\n",
        "        self.repo_id = \"mistralai/Mistral-7B-Instruct-v0.1\"\n",
        "        self.llm = HuggingFaceHub(\n",
        "            huggingfacehub_api_token=self.huggingfacehub_api_token,\n",
        "            repo_id=self.repo_id,\n",
        "            model_kwargs={\"temperature\": 0.1, \"max_new_tokens\":3000}\n",
        "        )\n",
        "        self.pdf_path = pdf_path\n",
        "    def _image_list_(self, pdf_path: str) -> List[str]:\n",
        "        \"\"\"\n",
        "        Converts all pages in a PDF file to images, saving them locally and returning a list of image filenames.\n",
        "        Parameters:\n",
        "        - pdf_path (str): The file path of the PDF document to be converted.\n",
        "        Returns:\n",
        "        - List[str]: A list of filenames for the images created, one per page of the PDF.\n",
        "        Raises:\n",
        "        - Exception: Propagates any exception that occurs during the PDF to image conversion process,\n",
        "                    after logging the error.\n",
        "        \"\"\"\n",
        "        try:\n",
        "            images = convert_from_path(self.pdf_path)\n",
        "            img_list = []\n",
        "            for i, image in enumerate(images):\n",
        "                image_name = f'page_{i}.jpg'\n",
        "                image.save(image_name, 'JPEG')\n",
        "                img_list.append(image_name)\n",
        "            return img_list\n",
        "        except Exception as e:\n",
        "            logging.error(f\"Error converting PDF to images: {e}\")\n",
        "            raise\n",
        "    def _preprocess_image_(self, image_path: str) -> Any:\n",
        "      \"\"\"\n",
        "      Preprocesses an image to enhance table detection and OCR accuracy by converting it to grayscale,\n",
        "      applying noise reduction, and performing thresholding to obtain a binary image.\n",
        "      Parameters:\n",
        "      - image_path (str): The file path of the image to preprocess.\n",
        "      Returns:\n",
        "      - Any: The preprocessed image in a binary format suitable for further processing. The actual type\n",
        "            is dependent on the OpenCV version used, but it generally corresponds to a numpy array.\n",
        "      Raises:\n",
        "      - FileNotFoundError: If the specified image file does not exist.\n",
        "      - Exception: For issues related to reading the image or preprocessing steps.\n",
        "      \"\"\"\n",
        "      try:\n",
        "        img = cv2.imread(image_path)\n",
        "        # Convert to grayscale\n",
        "        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)\n",
        "        # Noise removal\n",
        "        denoised = cv2.fastNlMeansDenoising(gray, None, 30, 7, 21)\n",
        "        # Thresholding to get a binary image\n",
        "        _, thresh = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)\n",
        "        return thresh\n",
        "      except Exception as e:\n",
        "          logging.error(\"Error during the preprocessing of the image\", exc_info=True)\n",
        "          raise\n",
        "    def _detect_tables_(self, image: Any) -> List[Tuple[int, int, int, int]]:\n",
        "      \"\"\"\n",
        "      Detects tables in an image using morphological transformations for line detection\n",
        "      and contour detection to identify table boundaries.\n",
        "      Parameters:\n",
        "      - image (Any): The preprocessed binary image where tables are to be detected. The type is\n",
        "                    typically a NumPy array, though it is annotated as `Any` to accommodate for\n",
        "                    flexibility in input image types.\n",
        "      Returns:\n",
        "      - List[Tuple[int, int, int, int]]: A list of tuples, each representing the bounding box of a detected\n",
        "                                        table in the format (x, y, width, height).\n",
        "      Note:\n",
        "      This method assumes the input image is preprocessed, ideally binary, to highlight table structures.\n",
        "      \"\"\"\n",
        "      try:\n",
        "        # Use morphological transformations to detect lines\n",
        "        vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, int(image.shape[0] / 30)))\n",
        "        horiz_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (int(image.shape[1] / 30), 1))\n",
        "        vertical_lines = cv2.morphologyEx(image, cv2.MORPH_OPEN, vertical_kernel, iterations=2)\n",
        "        horizontal_lines = cv2.morphologyEx(image, cv2.MORPH_OPEN, horiz_kernel, iterations=2)\n",
        "        # Combine lines\n",
        "        table_grid = cv2.add(horizontal_lines, vertical_lines)\n",
        "        # Find contours\n",
        "        contours, _ = cv2.findContours(table_grid, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)\n",
        "        tables = []\n",
        "        for contour in contours:\n",
        "            x, y, w, h = cv2.boundingRect(contour)\n",
        "            if w * h > image.size * 0.001:  # Filter out small contours\n",
        "                tables.append((x, y, w, h))\n",
        "        logging.info(f\"Detected {len(tables)} tables in the image.\")\n",
        "        return tables\n",
        "      except Exception as e:\n",
        "        logging.error(\"Error during table detection\", exc_info=True)\n",
        "        raise\n",
        "    def _extract_text_from_tables_(self, image: Any, tables: List[Tuple[int, int, int, int]]) -> List[str]:\n",
        "      \"\"\"\n",
        "      Extracts text from specified table regions in an image using OCR.\n",
        "      Parameters:\n",
        "      - image (Any): The image from which text is to be extracted. The type is typically a NumPy array,\n",
        "                    though it is annotated as `Any` to accommodate for flexibility in input image types.\n",
        "      - tables (List[Tuple[int, int, int, int]]): A list of tuples, each representing the bounding box\n",
        "                                                  of a table to extract text from, in the format\n",
        "                                                  (x, y, width, height).\n",
        "      Returns:\n",
        "      - List[str]: A list of strings, where each string contains the text extracted from the corresponding\n",
        "                  table region defined in the `tables` parameter.\n",
        "      Raises:\n",
        "      - Exception: For issues during the image cropping or OCR process.\n",
        "      \"\"\"\n",
        "      try:\n",
        "        texts = []\n",
        "        for (x, y, w, h) in tables:\n",
        "            table_image = image[y:y+h, x:x+w]\n",
        "            text = pytesseract.image_to_string(table_image, lang='eng')\n",
        "            texts.append(text)\n",
        "        logging.info(f\"Extracted text from {len(tables)} tables.\")\n",
        "        return texts\n",
        "      except Exception as e:\n",
        "        logging.error(\"Error extracting text from tables\", exc_info=True)\n",
        "        raise\n",
        "    def extract_tables_and_text(self) -> List[str]:\n",
        "      \"\"\"\n",
        "      Extracts tables and their respective text from the document specified by `self.pdf_path`.\n",
        "      This method integrates the workflow of converting PDF pages to images, preprocessing images for table\n",
        "      detection, detecting table boundaries, and extracting text from these tables.\n",
        "      Returns:\n",
        "      - List[str]: A list of strings, each string contains the text extracted from a table detected in the\n",
        "                  document. The list is compiled from all tables detected across all pages of the document.\n",
        "      Raises:\n",
        "      - Exception: For any issues encountered during the processes of image conversion, preprocessing,\n",
        "                  table detection, or text extraction.\n",
        "      \"\"\"\n",
        "      try:\n",
        "        logging.info(\"Starting table and text extraction process.\")\n",
        "        # Convert all pages of the PDF to images and store the paths in `images`.\n",
        "        images = self._image_list_(self.pdf_path)\n",
        "        # Initialize an empty list to hold all extracted texts from tables.\n",
        "        all_tables_text = []\n",
        "        # Iterate through each image path in the list of images.\n",
        "        for image_path in images:\n",
        "            preprocessed_image = self._preprocess_image_(image_path)\n",
        "            tables = self._detect_tables_(preprocessed_image)\n",
        "            texts = self._extract_text_from_tables_(preprocessed_image, tables)\n",
        "            all_tables_text.extend(texts)\n",
        "        logging.info(\"Completed table and text extraction process.\")\n",
        "        # Return the list of extracted texts from all tables.\n",
        "        return all_tables_text\n",
        "      except Exception as e:\n",
        "        logging.error(\"Error in extracting tables and text\", exc_info=True)\n",
        "        raise\n",
        "    def extracted_data(self) -> List[str]:\n",
        "      \"\"\"\n",
        "      Cleans and returns the extracted text data from tables in the document.\n",
        "      This method calls `extract_tables_and_text` to get the raw text from tables,\n",
        "      then cleans the text by normalizing spaces and removing excessive newlines.\n",
        "      Returns:\n",
        "          List[str]: A list of cleaned strings, each representing the text extracted\n",
        "                    and cleaned from a single table detected in the document.\n",
        "      \"\"\"\n",
        "      try:\n",
        "        # Log the start of the data extraction process\n",
        "        logging.info(\"Starting extracted data processing.\")\n",
        "        # Extract raw tables text\n",
        "        tables_text = self.extract_tables_and_text()\n",
        "        # Initialize an empty list to hold cleaned text data\n",
        "        answer=[]\n",
        "        # Iterate through each raw text extracted from tables\n",
        "        for text in tables_text:\n",
        "          # Replace multiple spaces or tabs with a single space\n",
        "          cleaned_string = re.sub(r'[ \\t]+', ' ', text)\n",
        "          cleaned_string = re.sub(r'\\n\\s*\\n', '', cleaned_string)\n",
        "          answer.append(cleaned_string)\n",
        "        logging.info(\"Completed data extraction and cleaning.\")\n",
        "        return answer\n",
        "      except Exception as e:\n",
        "          logging.error(\"Error in extracting data\", exc_info=True)\n",
        "          raise\n",
        "    def response(self, content: str) -> str:\n",
        "      \"\"\"\n",
        "      Processes the given content by formatting it into a key-value pair JSON-like structure using an AI assistant.\n",
        "      Args:\n",
        "          content (str): The input data that needs to be analyzed and formatted.\n",
        "      Returns:\n",
        "          str: The cleaned and formatted result as a JSON-like string, where keys without values are set to an empty string.\n",
        "      \"\"\"\n",
        "      try:\n",
        "        # Define the template for processing the input content\n",
        "        template = \"\"\"[INST]you are json formatter.your task analyze the given data{data} and must return answer as json.key doest have value return empty string.only generate json for given data's.all answers should be in  json format(for all data).[/INST]\"\"\"\n",
        "        # Assuming PromptTemplate and LLMChain are correctly defined and imported\n",
        "        prompt = PromptTemplate(template=template, input_variables=[\"data\"])\n",
        "        llm_chain = LLMChain(prompt=prompt, verbose=True, llm=self.llm)\n",
        "        # Run the language model chain with the provided content\n",
        "        result = llm_chain.run({\"data\":content})\n",
        "        # # Clean the result by removing the pattern specified\n",
        "        # pattern = r\"\\*[^*]*\\*\"\n",
        "        # cleaned_text = re.sub(pattern, '', result)\n",
        "        # # Log the completion of the cleaning process\n",
        "        # logging.info(\"Completed processing and cleaning the response.\")\n",
        "        # print(\"result\",result)\n",
        "        return result\n",
        "      except Exception as e:\n",
        "          logging.error(\"Error in response\", exc_info=True)\n",
        "          raise\n",
        "    def list_of_answer(self) -> List[str]:\n",
        "      \"\"\"\n",
        "      Processes extracted data to generate a list of answers after further cleaning and formatting.\n",
        "      This method iterates over the data extracted by `extracted_data`, processes each item using `response`,\n",
        "      and compiles the results into a final list.\n",
        "      Returns:\n",
        "          List[str]: A list of strings, each a processed and cleaned response based on the extracted data.\n",
        "      \"\"\"\n",
        "      try:\n",
        "        # Retrieve extracted data\n",
        "        answer=self.extracted_data()\n",
        "        # Initialize an empty list to hold the final processed results\n",
        "        final=[]\n",
        "        # Iterate over each item in the extracted data\n",
        "        for i in range(len(answer)):\n",
        "          result=self.response(answer[i])\n",
        "          final.append(result)\n",
        "        logging.info(\"Completed processing list of answers.\")\n",
        "        return final\n",
        "      except Exception as e:\n",
        "          logging.error(\"Error in list of answer\", exc_info=True)\n",
        "          raise\n",
        "    def extract_and_combine_json(self,text_list: List[str]) -> List[Dict[str, Any]]:\n",
        "      \"\"\"\n",
        "      Extracts JSON objects from a list of strings and combines them into a single list.\n",
        "      Each string in the input list is searched for JSON objects enclosed within ```json ... ``` markers.\n",
        "      All found JSON objects are parsed and combined into a list of dictionaries.\n",
        "      Args:\n",
        "          text_list: A list of strings, each potentially containing one or more JSON objects.\n",
        "      Returns:\n",
        "          A list of dictionaries, where each dictionary is a parsed JSON object found in the input text.\n",
        "      Note:\n",
        "          This function uses a specific pattern to identify JSON blocks within the text, which are enclosed in\n",
        "          triple backticks followed by 'json' keyword and assumes well-formed JSON objects.\n",
        "      \"\"\"\n",
        "      try:\n",
        "        # This pattern matches your JSON blocks specifically formatted in your example\n",
        "        pattern = r'```json\\n({.*?})\\n```'\n",
        "        combined_json_objects = []  # This will hold all your parsed JSON objects\n",
        "        for text in text_list:\n",
        "            # Find all JSON strings within the text\n",
        "            json_strings = re.findall(pattern, text, re.DOTALL)\n",
        "            for json_str in json_strings:\n",
        "                try:\n",
        "                    # Parse the JSON string and append the resulting object to your list\n",
        "                    json_obj = json.loads(json_str)\n",
        "                    combined_json_objects.append(json_obj)\n",
        "                except json.JSONDecodeError as e:\n",
        "                    print(f\"Failed to decode JSON: {e}\")\n",
        "        return combined_json_objects\n",
        "      except Exception as e:\n",
        "        print(f\"Error in extract_and_combine_json {e}\")\n",
        "    def key_value_pair(self) -> str:\n",
        "      \"\"\"\n",
        "      Extracts JSON objects from a list of text blocks, combines them, and returns the combined JSON as a string.\n",
        "      This method calls `list_of_answer` to retrieve a list of text blocks, each potentially containing JSON objects.\n",
        "      These blocks are then processed by `extract_and_combine_json` to extract and combine all JSON objects into a single structure.\n",
        "      Finally, it converts this structure into a nicely formatted JSON string.\n",
        "      Returns:\n",
        "          A string representation of the combined JSON objects, formatted with an indent of 2 spaces.\n",
        "      \"\"\"\n",
        "      try:\n",
        "        # Retrieve the list of text blocks that may contain JSON objects\n",
        "        list_of_text=self.list_of_answer()\n",
        "        # Extract and combine JSON objects from the text blocks\n",
        "        combined_json=self.extract_and_combine_json(list_of_text)\n",
        "        # Convert the combined JSON objects into a formatted string\n",
        "        key_value=json.dumps(combined_json, indent=2)\n",
        "        logging.info(\"Successfully combined JSON objects.\")\n",
        "        return key_value\n",
        "      except Exception as e:\n",
        "          logging.error(f\"An error occurred in key_value_pair: {e}\")\n",
        "          raise"
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        "id": "X695svACeG7u",
        "outputId": "c03afda1-3ff5-4fbe-b1d2-f9975ccfc61c"
      "execution_count": 11,
      "outputs": [
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Overwriting utils.py\n"
      "cell_type": "code",
      "source": [
        "# Script-style entry point: run the whole pipeline on one PDF and print the combined JSON.\n",
        "if __name__ == \"__main__\":\n",
        "    pdf_path = \"YOUR_PDF_PATH\"  # path of the PDF to process\n",
        "    table = TableExtractor(pdf_path)\n",
        "    result = table.key_value_pair()\n",
        "    print(result)"
      "metadata": {
        "id": "BGv8WZpEePtC"
      "execution_count": null,
      "outputs": []