from fastapi import Body, FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
import tiktoken
import re
from typing import List, Dict
app = FastAPI()
# Constants
MAX_INPUT_LENGTH = 1000000 # ~1MB of text
BATCH_SIZE = 100000 # Only referenced by the batching variant kept commented out below; unused by the active code
# Pre-compile regex patterns for better performance
REPEAT_CHARS = re.compile(r'(.)\1{2,}') # For chars like ---, ===
BOX_CHARS = re.compile(r'[─━│┃┄┅┆┇┈┉┊┋╌╍╎╏═║╒╓╔╕╖╗╘╙╚╛╜╝╞╟╠╡╢╣╤╥╦╧╨╩╪╫╬]+')
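# Illustrative behaviour of the patterns above (examples made up, not exercised by the app itself):
#   REPEAT_CHARS.sub(r'\1\1\1', 'result ======')  -> 'result ==='
#   BOX_CHARS.sub('---', '╔══╗')                   -> '---'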
def clean_text(text: str) -> str:
    """Clean text without any HTML parsing"""
    # Reduce repetitive characters (3+ repeats down to 3)
    text = REPEAT_CHARS.sub(r'\1\1\1', text)
    # Replace box-drawing characters with simple dashes
    text = BOX_CHARS.sub('---', text)
    # Normalize whitespace
    return ' '.join(text.split())
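# Example (illustrative input):
#   clean_text('Header ══════\n\nbody   text!!!!!') == 'Header --- body text!!!'
# Repeated-character runs are capped at three, box-drawing runs become '---',
# and all whitespace collapses to single spaces.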
def chunk_text(text: str, max_tokens: int = 512, overlap: int = 50) -> List[Dict]:
    """Efficient chunking with token awareness"""
    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    chunks = []
    # Guard against a zero or negative stride when overlap >= max_tokens
    step = max(1, max_tokens - overlap)
    for i in range(0, len(tokens), step):
        chunk_tokens = tokens[i:i + max_tokens]
        chunks.append({
            "text": enc.decode(chunk_tokens),
            "tokens": len(chunk_tokens),
            "start_token": i,
            "end_token": i + len(chunk_tokens)
        })
    return chunks
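# Illustrative numbers: with max_tokens=512 and overlap=50 the stride is 462,
# so chunks start at tokens 0, 462, 924, ... and each new chunk re-includes the
# final 50 tokens of the chunk before it. The last chunk may be shorter.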
@app.post("/chunk")
async def chunk_file(
    file: UploadFile = File(...),
    max_tokens: int = 512,
    overlap: int = 50
):
    if not file.content_type or not file.content_type.startswith('text/'):
        raise HTTPException(400, "Only text files accepted")
    try:
        text = (await file.read()).decode('utf-8')
        if len(text) > MAX_INPUT_LENGTH:
            raise HTTPException(413, f"Input too large. Max {MAX_INPUT_LENGTH} chars allowed")
        cleaned_text = clean_text(text)
        chunks = chunk_text(cleaned_text, max_tokens, overlap)
        return JSONResponse({
            "filename": file.filename,
            "total_chunks": len(chunks),
            "chunks": chunks
        })
    except HTTPException:
        raise  # Preserve intended status codes (e.g. 413) instead of masking them as 500
    except UnicodeDecodeError:
        raise HTTPException(400, "File is not valid UTF-8 text")
    except Exception as e:
        raise HTTPException(500, f"Processing error: {str(e)}")
@app.post("/chunk_text")
async def chunk_raw_text(
    text: str = Body(..., embed=True),
    max_tokens: int = Body(512),
    overlap: int = Body(50)
):
    try:
        if len(text) > MAX_INPUT_LENGTH:
            raise HTTPException(413, f"Input too large. Max {MAX_INPUT_LENGTH} chars allowed")
        cleaned_text = clean_text(text)
        chunks = chunk_text(cleaned_text, max_tokens, overlap)
        return JSONResponse({
            "total_chunks": len(chunks),
            "chunks": chunks
        })
    except HTTPException:
        raise  # Preserve the intended 413 instead of masking it as 500
    except Exception as e:
        raise HTTPException(500, f"Error: {str(e)}")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8201)
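# For development the server can also be started with uvicorn directly
# (the module name main is an assumption):
#   uvicorn main:app --host 0.0.0.0 --port 8201
# The commented-out block below appears to be an earlier variant that also
# handled HTML via BeautifulSoup and split on Emacs Lisp structure; it is
# kept for reference only.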
# from fastapi import Body, FastAPI, UploadFile, File, HTTPException
# from fastapi.responses import JSONResponse
# import tiktoken
# import re
# from bs4 import BeautifulSoup
# from typing import List, Dict
# app = FastAPI()
# # Constants
# MAX_INPUT_LENGTH = 1000000
# BATCH_SIZE = 10000
# def preprocess_content(text: str) -> str:
#     """Handle HTML and repetitive characters"""
#     # HTML/XML content extraction
#     if re.search(r'<[a-z][\s>]', text[:1000], re.I):
#         try:
#             soup = BeautifulSoup(text, 'html.parser')
#             # Preserve preformatted text
#             for pre in soup.find_all('pre'):
#                 pre.replace_with('\n' + pre.get_text() + '\n')
#             text = soup.get_text()
#         except:
#             text = re.sub(r'<[^>]+>', '', text) # Fallback HTML tag removal
#     # Reduce repetitive characters (3+ repeats down to 3)
#     text = re.sub(r'(.)\1{2,}', r'\1\1\1', text) # For chars like ---, ===
#     # Special handling for Emacs Lisp box-drawing chars
#     text = re.sub(r'[─━│┃┄┅┆┇┈┉┊┋╌╍╎╏═║╒╓╔╕╖╗╘╙╚╛╜╝╞╟╠╡╢╣╤╥╦╧╨╩╪╫╬]+', '---', text)
#     return re.sub(r'\s+', ' ', text).strip()
# def chunk_text(text: str, max_tokens: int = 512, overlap: int = 50) -> List[Dict]:
#     """Smart chunking preserving code structure"""
#     enc = tiktoken.get_encoding("cl100k_base")
#     tokens = enc.encode(text)
#     chunks = []
#     # Find natural breakpoints (comments, defuns, etc.)
#     separators = [';;;', ';;;;', '(defun', '(defvar', '(require', '\n\n']
#     separator_indices = [
#         i for i, token in enumerate(tokens)
#         if any(enc.decode(tokens[i:i+len(s)]) == s for s in separators)
#     ]
#     for i in range(0, len(tokens), max_tokens - overlap):
#         end = min(i + max_tokens, len(tokens))
#         # Adjust to nearest semantic break
#         if separator_indices:
#             end = max(
#                 i + int(max_tokens * 0.7), # Minimum chunk size
#                 min([x for x in separator_indices if i <= x <= end] or [end])
#             )
#         chunks.append({
#             "text": enc.decode(tokens[i:end]),
#             "tokens": end - i,
#             "start_token": i,
#             "end_token": end
#         })
#     return chunks
# def process_large_text(text: str, max_tokens: int, overlap: int) -> List[Dict]:
#     """Process text with format awareness"""
#     processed_text = preprocess_content(text)
#     if len(processed_text) <= BATCH_SIZE:
#         return chunk_text(processed_text, max_tokens, overlap)
#     # Split at major section breaks
#     batches = re.split(r'(;;;+|\n{3,}|\(defun)', processed_text)
#     all_chunks = []
#     token_offset = 0
#     for batch in batches:
#         if not batch.strip():
#             continue
#         batch_chunks = chunk_text(batch, max_tokens, overlap)
#         for chunk in batch_chunks:
#             chunk['start_token'] += token_offset
#             chunk['end_token'] += token_offset
#         all_chunks.extend(batch_chunks)
#         token_offset = all_chunks[-1]['end_token'] if all_chunks else 0
#     return all_chunks
# @app.post("/chunk")
# async def chunk_file(
#     file: UploadFile = File(...),
#     max_tokens: int = 512,
#     overlap: int = 50
# ):
#     if not file.content_type.startswith(('text/', 'application/xml', 'application/html')):
#         raise HTTPException(400, "Only text/HTML/XML files accepted")
#     try:
#         text = (await file.read()).decode('utf-8')
#         if len(text) > MAX_INPUT_LENGTH:
#             raise HTTPException(413, f"Input too large. Max {MAX_INPUT_LENGTH} chars allowed")
#         chunks = process_large_text(text, max_tokens, overlap)
#         return JSONResponse({
#             "filename": file.filename,
#             "total_chunks": len(chunks),
#             "chunks": chunks,
#             "preprocessing_applied": True
#         })
#     except Exception as e:
#         raise HTTPException(500, f"Processing error: {str(e)}")
# @app.post("/chunk_text")
# async def chunk_raw_text(
#     text: str = Body(..., embed=True),
#     max_tokens: int = Body(512),
#     overlap: int = Body(50)
# ):
#     try:
#         if len(text) > MAX_INPUT_LENGTH:
#             raise HTTPException(413, f"Input too large. Max {MAX_INPUT_LENGTH} chars allowed")
#         chunks = process_large_text(text, max_tokens, overlap)
#         return JSONResponse({
#             "total_chunks": len(chunks),
#             "chunks": chunks,
#             "preprocessing_applied": True
#         })
#     except Exception as e:
#         raise HTTPException(500, f"Error: {str(e)}")
# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8201)