Detailed technical documentation for developing and extending the system
The document scanning system is a complete system for managing, scanning, and processing financial documents. It is built as a standalone module within the TechLabs platform and includes:
Main modules:
SQLAlchemy (ORM), Pillow (Image Processing), pytesseract (OCR),
google-cloud-vision, boto3 (AWS), cryptography (Encryption)
Libraries:
Bootstrap Icons, Fetch API (Async/Await), Custom Manager Classes
Deployment:
Docker Compose, Traefik Reverse Proxy, SSL/TLS Certificates
┌─────────────┐
│   Browser   │
└──────┬──────┘
       │ HTTPS (Traefik)
       ▼
┌─────────────┐      ┌──────────────┐
│  Flask App  │◄────►│  PostgreSQL  │
│  (Gunicorn) │      │   Database   │
└──────┬──────┘      └──────────────┘
       │
       ├──────────►┌──────────────┐
       │           │    Redis     │
       │           │   (Cache)    │
       │           └──────────────┘
       │
       ├──────────►┌──────────────┐
       │           │    Celery    │
       │           │   Workers    │
       │           └──────┬───────┘
       │                  │
       │                  ▼
       │           ┌──────────────┐
       │           │ OCR Services │
       │           │ (Tesseract,  │
       │           │  GCP, AWS)   │
       │           └──────────────┘
       │
       └──────────►┌──────────────┐
                   │  Email APIs  │
                   │ (Gmail/IMAP) │
                   └──────────────┘
app/
├── routes/
│   └── routes_email_scanning.py            # 320 lines - HTML routes
├── api/
│   ├── api_email_scanning.py               # 825 lines - Core API
│   └── api_email_scanning_enhanced.py      # 619 lines - Enhanced API
├── models/
│   └── models_email_scanning.py            # 765 lines - Database models
├── services/
│   ├── email_scanning_ocr_service.py       # ~500 lines - OCR service
│   ├── email_scanning_tasks.py             # 11,486 lines - Celery tasks
│   ├── email_integration_service.py        # ~15,000 lines - Email service
│   └── gmail_service.py                    # ~10,000 lines - Gmail OAuth
├── static/
│   ├── js/modules/
│   │   ├── email-scanning-manager.js       # 912 lines - Main manager
│   │   └── email-integration-accounts.js   # 518 lines - Email UI
│   ├── css/modules/
│   │   └── email-scanning.css              # ~5,000 lines - Styles
│   └── docs/email-scanning/
│       ├── index.html
│       ├── user-guide.html
│       ├── api-reference.html
│       └── technical.html
└── templates/email-scanning/
    ├── dashboard.html
    ├── documents.html
    ├── upload.html
    └── ... (24 more templates)
Table: scanned_documents
Description: Scanned document, the central table of the system
class ScannedDocument(db.Model):
    __tablename__ = 'scanned_documents'
    # Primary Key
    id = db.Column(db.Integer, primary_key=True)
    # File Info
    filename = db.Column(db.String(255), nullable=False)
    original_filename = db.Column(db.String(255))
    file_path = db.Column(db.String(512))
    file_size = db.Column(db.Integer)
    upload_date = db.Column(db.DateTime, default=datetime.utcnow)
    # Ownership
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    company_id = db.Column(db.Integer, db.ForeignKey('scanning_companies.id'))
    # OCR Results
    ocr_status = db.Column(db.String(50))  # pending/processing/completed/failed
    ocr_confidence = db.Column(db.Float)
    extracted_text = db.Column(db.Text)
    # Financial Data (extracted)
    supplier_name = db.Column(db.String(255))
    supplier_tax_id = db.Column(db.String(50))
    invoice_number = db.Column(db.String(100))
    total_amount = db.Column(db.Numeric(12, 2))
    currency = db.Column(db.String(3), default='ILS')
    invoice_date = db.Column(db.Date)
    due_date = db.Column(db.Date)
    # Status & Workflow
    status = db.Column(db.String(50), default='pending')
    approval_status = db.Column(db.String(50))
    category_id = db.Column(db.Integer, db.ForeignKey('document_categories.id'))
    # Relationships
    category = db.relationship('DocumentCategory', backref='documents')
    line_items = db.relationship('DocumentLineItem', backref='document')
    versions = db.relationship('DocumentVersion', backref='document')
Table: document_categories
class DocumentCategory(db.Model):
    __tablename__ = 'document_categories'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    color = db.Column(db.String(7))  # Hex color, e.g. '#1a2b3c'
    icon = db.Column(db.String(50))
    # Auto-categorization
    auto_categorize = db.Column(db.Boolean, default=False)
    keywords = db.Column(db.JSON)  # List of keywords
    # Multi-tenant
    company_id = db.Column(db.Integer, db.ForeignKey('scanning_companies.id'))
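The auto_categorize flag and the keywords list suggest that categories are matched against the OCR text. The matching logic itself is not shown here, so the following is only a minimal sketch under two assumptions: matching is a case-insensitive substring test, and the category with the most keyword hits wins. `Category` and `pick_category` are hypothetical names, not part of the codebase.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Category:
    """Hypothetical stand-in for a DocumentCategory row."""
    id: int
    name: str
    auto_categorize: bool = False
    keywords: List[str] = field(default_factory=list)


def pick_category(text: str, categories: List[Category]) -> Optional[Category]:
    """Return the auto-categorizing category with the most keyword hits, or None."""
    haystack = text.casefold()
    best, best_hits = None, 0
    for cat in categories:
        if not cat.auto_categorize:
            continue  # categories without the flag are assigned manually only
        hits = sum(1 for kw in cat.keywords if kw and kw.casefold() in haystack)
        if hits > best_hits:
            best, best_hits = cat, hits
    return best


categories = [
    Category(1, "Utilities", True, ["electricity", "water"]),
    Category(2, "Telecom", True, ["internet", "mobile"]),
    Category(3, "Manual only", False, ["electricity"]),
]
match = pick_category("Electricity and water bill, March", categories)
print(match.name if match else None)
```

Returning None when nothing matches leaves the document uncategorized, which keeps the decision with the user instead of guessing.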
Table: email_accounts
class EmailAccount(db.Model):
    __tablename__ = 'email_accounts'
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), nullable=False)
    provider = db.Column(db.String(50))  # gmail/imap/outlook
    # IMAP Configuration
    imap_server = db.Column(db.String(255))
    imap_port = db.Column(db.Integer)
    imap_username = db.Column(db.String(255))
    imap_password_encrypted = db.Column(db.Text)  # Fernet token (AES-128-CBC + HMAC-SHA256)
    # Gmail OAuth
    gmail_token_encrypted = db.Column(db.Text)
    gmail_refresh_token_encrypted = db.Column(db.Text)
    # Auto-fetch
    auto_fetch = db.Column(db.Boolean, default=False)
    fetch_interval = db.Column(db.Integer, default=300)  # seconds
    last_fetch = db.Column(db.DateTime)
    # Relationships
    messages = db.relationship('EmailMessage', backref='account')
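The auto_fetch, fetch_interval, and last_fetch columns together imply a polling schedule. How the scheduler actually decides is not shown, so this is a sketch of one plausible rule: an account is due when auto-fetch is on and at least fetch_interval seconds have passed since last_fetch. `is_fetch_due` is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_fetch_due(auto_fetch: bool, fetch_interval: int,
                 last_fetch: Optional[datetime], now: datetime) -> bool:
    """Decide whether an email account should be polled at time `now`."""
    if not auto_fetch:
        return False
    if last_fetch is None:
        return True  # never fetched before, so fetch immediately
    return now - last_fetch >= timedelta(seconds=fetch_interval)


now = datetime(2024, 1, 1, 12, 0, 0)
print(is_fetch_due(True, 300, datetime(2024, 1, 1, 11, 54, 0), now))  # last fetch 6 minutes ago
print(is_fetch_due(True, 300, datetime(2024, 1, 1, 11, 58, 0), now))  # last fetch 2 minutes ago
```

Passing `now` in explicitly keeps the rule easy to test and avoids mixing naive and timezone-aware timestamps inside the helper.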
| # | Table name | Purpose |
|---|---|---|
| 1 | scanned_documents | Scanned documents |
| 2 | document_categories | Categories |
| 3 | document_ocr_jobs | OCR job queue |
| 4 | document_line_items | Invoice line items |
| 5 | document_exports | Export history |
| 6 | document_notifications | Notifications |
| 7 | document_shares | Document sharing |
| 8 | document_versions | Versions |
| 9 | scanning_companies | Companies (multi-tenant) |
| 10 | email_accounts | Email accounts |
| 11 | email_messages | Email messages |
| 12 | email_attachments | Attachments |
| 13 | email_processing_rules | Automation rules |
| 14 | email_fetch_logs | Fetch logs |
| 15 | documents | General documents |
| 16 | portal_documents | Portal documents |
from flask import Blueprint, request, jsonify
from app.models_email_scanning import ScannedDocument
from app import db
from flask_login import login_required, current_user

email_scanning_api = Blueprint('email_scanning_api', __name__)

@email_scanning_api.route('/api/email-scanning/documents', methods=['GET'])
@login_required
def get_documents():
    """
    Get list of documents with pagination and filtering
    """
    # Get query parameters
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    category_id = request.args.get('category_id', type=int)
    search = request.args.get('search', '')
    # Build query (only documents owned by the logged-in user)
    query = ScannedDocument.query.filter_by(user_id=current_user.id)
    # Apply filters
    if category_id:
        query = query.filter_by(category_id=category_id)
    if search:
        query = query.filter(
            db.or_(
                ScannedDocument.original_filename.ilike(f'%{search}%'),
                ScannedDocument.supplier_name.ilike(f'%{search}%'),
                ScannedDocument.invoice_number.ilike(f'%{search}%')
            )
        )
    # Paginate, newest first
    pagination = query.order_by(
        ScannedDocument.upload_date.desc()
    ).paginate(page=page, per_page=per_page, error_out=False)
    # Serialize results
    documents = [{
        'id': doc.id,
        'filename': doc.filename,
        'original_filename': doc.original_filename,
        'upload_date': doc.upload_date.isoformat() if doc.upload_date else None,
        'category_name': doc.category.name if doc.category else None,
        'status': doc.status,
        # Compare against None so a legitimate amount of 0 is not dropped
        'total_amount': float(doc.total_amount) if doc.total_amount is not None else None
    } for doc in pagination.items]
    # Return success response
    return success_response({
        'documents': documents,
        'total': pagination.total,
        'pages': pagination.pages,
        'current_page': page,
        'per_page': per_page
    })

def success_response(data=None, message='Success', status_code=200):
    """Helper function for consistent API responses"""
    response = {
        'success': True,
        'message': message
    }
    if data is not None:
        response['data'] = data
    return jsonify(response), status_code
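get_documents passes page and per_page from the query string straight to paginate, so a client can request an arbitrarily large page. A minimal sketch of clamping those values first; `clamp_pagination` and the limit of 100 are assumptions, not part of the existing API. Flask-SQLAlchemy's paginate also accepts a max_per_page argument, which may be the simpler option if the installed version supports it.

```python
def clamp_pagination(page, per_page, default_per_page=20, max_per_page=100):
    """Normalize user-supplied paging values to a safe range."""
    # request.args.get(..., type=int) yields None for missing or non-numeric input
    if not isinstance(page, int) or page < 1:
        page = 1
    if not isinstance(per_page, int) or per_page < 1:
        per_page = default_per_page
    return page, min(per_page, max_per_page)


print(clamp_pagination(0, 5000))
print(clamp_pagination(3, 20))
```

In the endpoint this would sit right after the query parameters are read, before the query is built.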
Every endpoint is protected with the @login_required decorator and returns its payload through success_response().
class EmailScanningManager {
    constructor() {
        this.documents = [];
        this.categories = [];
        this.currentPage = 1;
        this.totalPages = 1;
        this.selectedCategory = null;
    }

    async init() {
        console.log('🔧 Initializing EmailScanning Manager...');
        try {
            // Load initial data
            await this.loadCategories();
            await this.loadDocuments();
            // Setup event listeners
            this.setupEventListeners();
            console.log('✅ EmailScanning Manager initialized');
        } catch (error) {
            console.error('❌ Initialization failed:', error);
        }
    }

    async loadDocuments(page = 1) {
        try {
            // Build the query string; include the category filter when one is selected
            const params = new URLSearchParams({ page, per_page: 20 });
            if (this.selectedCategory) {
                params.set('category_id', this.selectedCategory);
            }
            const response = await fetch(
                `/api/email-scanning/documents?${params}`,
                {
                    method: 'GET',
                    credentials: 'include',
                    headers: {
                        'Content-Type': 'application/json'
                    }
                }
            );
            if (!response.ok) {
                throw new Error(`HTTP ${response.status}`);
            }
            const result = await response.json();
            // IMPORTANT: Unwrap success_response
            const data = result.data || result;
            this.documents = Array.isArray(data)
                ? data
                : (data.documents || []);
            this.totalPages = data.pages || 1;
            this.currentPage = page;
            this.renderDocuments();
        } catch (error) {
            console.error('Error loading documents:', error);
            // UI message (Hebrew): "Error loading documents"
            this.showError('שגיאה בטעינת מסמכים');
        }
    }

    renderDocuments() {
        const container = document.getElementById('documents-container');
        if (!container) return;
        container.innerHTML = '';
        this.documents.forEach(doc => {
            const docCard = this.createDocumentCard(doc);
            container.appendChild(docCard);
        });
    }

    createDocumentCard(doc) {
        const card = document.createElement('div');
        card.className = 'document-card';
        // Hebrew UI labels: supplier / amount / date; 'לא זוהה' means "not identified"
        card.innerHTML = `
            ${doc.original_filename || doc.filename}
            ${doc.status}
            ספק: ${doc.supplier_name || 'לא זוהה'}
            סכום: ${doc.total_amount ? `₪${doc.total_amount}` : '-'}
            תאריך: ${this.formatDate(doc.upload_date)}
        `;
        return card;
    }

    setupEventListeners() {
        // Upload button
        const uploadBtn = document.getElementById('upload-btn');
        if (uploadBtn) {
            uploadBtn.addEventListener('click', () => this.showUploadModal());
        }
        // Category filter
        const categoryFilter = document.getElementById('category-filter');
        if (categoryFilter) {
            categoryFilter.addEventListener('change', (e) => {
                this.selectedCategory = e.target.value;
                this.loadDocuments(1);
            });
        }
    }

    showError(message) {
        // Show toast notification
        const toast = document.createElement('div');
        toast.className = 'toast-notification error';
        toast.textContent = message;
        document.body.appendChild(toast);
        setTimeout(() => toast.remove(), 3000);
    }
}

// Initialize on page load
const manager = new EmailScanningManager();
document.addEventListener('DOMContentLoaded', () => manager.init());
The API returns responses wrapped by success_response, so always extract the payload from result.data:
// ✅ CORRECT
const result = await response.json();
const data = result.data || result;
this.documents = Array.isArray(data) ? data : (data.documents || []);

// ❌ WRONG
const data = await response.json();
this.documents = data.documents; // Will fail!
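The same envelope applies to any Python script or test that consumes the API. A small sketch mirroring the front-end unwrapping logic; `unwrap_documents` is a hypothetical helper, and the sample payloads are made up to match the shape produced by success_response.

```python
def unwrap_documents(result):
    """Accept a wrapped response, a bare payload, or a plain list of documents."""
    data = result
    if isinstance(result, dict):
        # Same fallback as `result.data || result` in the JavaScript manager
        data = result.get("data") or result
    if isinstance(data, list):
        return data
    return data.get("documents") or []


wrapped = {"success": True, "message": "Success",
           "data": {"documents": [{"id": 1}], "total": 1}}
bare = {"documents": [{"id": 2}]}

print(unwrap_documents(wrapped))
print(unwrap_documents(bare))
```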
The system supports 3 OCR providers with automatic fallback:
import logging

import pytesseract
from PIL import Image

logger = logging.getLogger(__name__)

def extract_text_tesseract(image_path, lang='heb+eng'):
    """Extract text using Tesseract OCR"""
    try:
        image = Image.open(image_path)
        text = pytesseract.image_to_string(image, lang=lang)
        data = pytesseract.image_to_data(
            image, lang=lang, output_type=pytesseract.Output.DICT
        )
        # Tesseract reports conf = -1 for boxes that contain no text; skip them
        confs = [float(c) for c in data['conf'] if float(c) >= 0]
        avg_conf = sum(confs) / len(confs) if confs else 0.0
        return {
            'text': text,
            'confidence': avg_conf / 100,  # scale 0-100 down to 0-1
            'provider': 'tesseract'
        }
    except Exception as e:
        logger.error(f"Tesseract OCR failed: {e}")
        return None
from google.cloud import vision

def extract_text_google_vision(image_path):
    """Extract text using Google Cloud Vision"""
    try:
        client = vision.ImageAnnotatorClient()
        with open(image_path, 'rb') as image_file:
            content = image_file.read()
        image = vision.Image(content=content)
        response = client.document_text_detection(image=image)
        if response.error.message:
            raise RuntimeError(response.error.message)
        annotation = response.full_text_annotation
        # An image with no detected text has no pages; report zero confidence
        confidence = annotation.pages[0].confidence if annotation.pages else 0.0
        return {
            'text': annotation.text,
            'confidence': confidence,
            'provider': 'google_vision'
        }
    except Exception as e:
        logger.error(f"Google Vision OCR failed: {e}")
        return None
from celery import shared_task

@shared_task(bind=True, max_retries=3)
def process_document_ocr(self, document_id, provider='auto'):
    """
    Celery task to process document OCR asynchronously
    """
    # Load the document outside the try block so the error handler can rely on it
    document = ScannedDocument.query.get(document_id)
    if not document:
        logger.error(f"Document {document_id} not found")
        return
    try:
        # Update status
        document.ocr_status = 'processing'
        db.session.commit()
        # Perform OCR with fallback
        providers = ['tesseract', 'google_vision', 'aws_textract']
        if provider != 'auto':
            providers = [provider]
        best = None
        for prov in providers:
            logger.info(f"Trying OCR provider: {prov}")
            result = perform_ocr(document.file_path, prov)
            if not result:
                continue
            # Keep the most confident result so a later failure cannot discard it
            if best is None or result['confidence'] > best['confidence']:
                best = result
            if best['confidence'] > 0.7:
                break
        if not best:
            raise RuntimeError("All OCR providers failed")
        # Extract financial data
        extracted_data = extract_invoice_data(best['text'])
        # Update document
        document.extracted_text = best['text']
        document.ocr_confidence = best['confidence']
        document.ocr_provider = best['provider']
        document.ocr_status = 'completed'
        # Update extracted fields
        document.supplier_name = extracted_data.get('supplier')
        document.invoice_number = extracted_data.get('invoice_number')
        document.total_amount = extracted_data.get('total_amount')
        db.session.commit()
        logger.info(f"OCR completed for document {document_id}")
    except Exception as e:
        logger.error(f"OCR failed for document {document_id}: {e}")
        # Discard any half-applied changes before recording the failure
        db.session.rollback()
        document.ocr_status = 'failed'
        db.session.commit()
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
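The task above calls extract_invoice_data on the OCR text, but that function is not shown. The following is a rough sketch of what such a parser could look like, not the actual implementation. It assumes English labels ("Invoice No", "Total") and that the supplier name is the first non-empty line; both regular expressions are illustrative only. The returned keys match the ones the task reads with .get().

```python
import re
from decimal import Decimal, InvalidOperation

# Illustrative patterns only; real invoices need locale-specific rules
INVOICE_NO_RE = re.compile(
    r"invoice\s*(?:no\.?|number|#)\s*[:#]?\s*([A-Z0-9-]+)", re.IGNORECASE)
TOTAL_RE = re.compile(
    r"\btotal\s*(?:amount)?\s*[:=]?\s*(?:₪|ILS|NIS)?\s*([\d,]+(?:\.\d{1,2})?)",
    re.IGNORECASE)


def extract_invoice_data(text):
    """Pull supplier, invoice number, and total out of raw OCR text."""
    data = {"supplier": None, "invoice_number": None, "total_amount": None}
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    if lines:
        data["supplier"] = lines[0]  # assumption: the letterhead comes first
    match = INVOICE_NO_RE.search(text)
    if match:
        data["invoice_number"] = match.group(1)
    match = TOTAL_RE.search(text)  # \b keeps "Subtotal" from matching
    if match:
        try:
            data["total_amount"] = Decimal(match.group(1).replace(",", ""))
        except InvalidOperation:
            pass  # leave the amount empty rather than store garbage
    return data


sample = (
    "Acme Supplies Ltd\n"
    "Invoice No: INV-2024-001\n"
    "Subtotal: 1,000.00\n"
    "Total: ₪1,170.00\n"
)
print(extract_invoice_data(sample))
```

Returning a Decimal fits the Numeric(12, 2) column on scanned_documents and avoids float rounding on monetary values.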
from datetime import datetime

from flask import current_app
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

def fetch_gmail_messages(account):
    """Fetch messages from Gmail using OAuth 2.0"""
    try:
        # Decrypt stored tokens
        access_token = decrypt(account.gmail_token_encrypted)
        refresh_token = decrypt(account.gmail_refresh_token_encrypted)
        # Create credentials (current_app gives access to config inside tasks and requests)
        creds = Credentials(
            token=access_token,
            refresh_token=refresh_token,
            token_uri='https://oauth2.googleapis.com/token',
            client_id=current_app.config['GMAIL_CLIENT_ID'],
            client_secret=current_app.config['GMAIL_CLIENT_SECRET']
        )
        # Build Gmail service
        service = build('gmail', 'v1', credentials=creds)
        # Fetch up to 50 inbox messages that carry attachments
        results = service.users().messages().list(
            userId='me',
            labelIds=['INBOX'],
            q='has:attachment',
            maxResults=50
        ).execute()
        messages = results.get('messages', [])
        # Process each message
        for msg_meta in messages:
            msg = service.users().messages().get(
                userId='me',
                id=msg_meta['id'],
                format='full'
            ).execute()
            # Save to database
            save_email_message(account.id, msg)
            # Download attachments
            process_attachments(account.id, msg)
        # Update last fetch
        account.last_fetch = datetime.utcnow()
        db.session.commit()
    except Exception as e:
        logger.error(f"Gmail fetch failed: {e}")
        raise
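process_attachments is not shown. With format='full', the Gmail API returns a message whose payload is a tree of MIME parts; parts that are attachments carry a filename and a body.attachmentId. A sketch of walking that tree, where `find_attachment_parts` is a hypothetical helper and the sample message is hand-written to match the documented shape:

```python
def find_attachment_parts(payload):
    """Recursively collect MIME parts that reference a downloadable attachment."""
    found = []
    body = payload.get("body", {})
    if payload.get("filename") and body.get("attachmentId"):
        found.append({
            "filename": payload["filename"],
            "mime_type": payload.get("mimeType"),
            "attachment_id": body["attachmentId"],
            "size": body.get("size", 0),
        })
    # multipart/* parts nest their children under "parts"
    for part in payload.get("parts", []):
        found.extend(find_attachment_parts(part))
    return found


sample_message = {
    "id": "msg-1",
    "payload": {
        "mimeType": "multipart/mixed",
        "filename": "",
        "body": {"size": 0},
        "parts": [
            {"mimeType": "text/plain", "filename": "", "body": {"size": 12, "data": "..."}},
            {"mimeType": "application/pdf", "filename": "invoice.pdf",
             "body": {"attachmentId": "att-123", "size": 2048}},
        ],
    },
}

for part in find_attachment_parts(sample_message["payload"]):
    print(part["filename"], part["attachment_id"])
```

The attachment bytes themselves would then be fetched with service.users().messages().attachments().get(...), whose response holds base64url-encoded data.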
import imaplib
import email
from datetime import datetime

def fetch_imap_messages(account):
    """Fetch messages from IMAP server"""
    imap = None
    try:
        # Decrypt password
        password = decrypt(account.imap_password_encrypted)
        # Connect to IMAP server
        if account.use_ssl:
            imap = imaplib.IMAP4_SSL(account.imap_server, account.imap_port)
        else:
            imap = imaplib.IMAP4(account.imap_server, account.imap_port)
        # Login
        imap.login(account.imap_username, password)
        # Select INBOX
        imap.select('INBOX')
        # Search for unread messages (attachments are picked out per message below)
        _, message_numbers = imap.search(None, 'UNSEEN')
        for num in message_numbers[0].split():
            # Fetch message
            _, msg_data = imap.fetch(num, '(RFC822)')
            email_body = msg_data[0][1]
            message = email.message_from_bytes(email_body)
            # Save to database
            save_email_message(account.id, message)
            # Process attachments
            if message.is_multipart():
                for part in message.walk():
                    if part.get_content_disposition() == 'attachment':
                        save_attachment(account.id, part)
        imap.close()
        # Update last fetch
        account.last_fetch = datetime.utcnow()
        db.session.commit()
    except Exception as e:
        logger.error(f"IMAP fetch failed: {e}")
        raise
    finally:
        # Always release the connection, even after a failure
        if imap is not None:
            try:
                imap.logout()
            except Exception:
                logger.warning("IMAP logout failed", exc_info=True)
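save_attachment is not shown. As an illustration of what it needs from each MIME part, here is a standard-library sketch that pulls the filename, content type, and decoded bytes out of a raw message. It uses EmailMessage.iter_attachments() with policy.default instead of the walk() loop above, and the ALLOWED_TYPES filter is an assumption about which files are worth scanning.

```python
from email import message_from_bytes, policy
from email.message import EmailMessage

# Assumption: only these content types are passed on to OCR
ALLOWED_TYPES = {"application/pdf", "image/png", "image/jpeg"}


def extract_attachments(raw_bytes):
    """Return (filename, content_type, payload_bytes) for each allowed attachment."""
    message = message_from_bytes(raw_bytes, policy=policy.default)
    results = []
    for part in message.iter_attachments():
        content_type = part.get_content_type()
        if content_type not in ALLOWED_TYPES:
            continue
        # decode=True reverses the base64 transfer encoding
        results.append((part.get_filename(), content_type, part.get_payload(decode=True)))
    return results


# Build a small message in memory to exercise the function
msg = EmailMessage()
msg["Subject"] = "Invoice"
msg["From"] = "supplier@example.com"
msg["To"] = "scan@example.com"
msg.set_content("See attached.")
msg.add_attachment(b"%PDF-1.4 fake", maintype="application", subtype="pdf",
                   filename="invoice.pdf")
msg.add_attachment(b"hello", maintype="application", subtype="zip",
                   filename="other.zip")

for filename, content_type, payload in extract_attachments(msg.as_bytes()):
    print(filename, content_type, len(payload))
```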
All passwords and tokens are encrypted with Fernet (AES-128 in CBC mode, authenticated with HMAC-SHA256) before they are stored in the database.
The encryption key is kept in the ENCRYPTION_KEY environment variable.
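The email-fetching code calls a decrypt helper that is not shown. A minimal sketch of what encrypt/decrypt helpers could look like with the cryptography package, assuming ENCRYPTION_KEY holds a URL-safe base64 key as produced by Fernet.generate_key(); the function names and the error-handling choice are assumptions.

```python
import os

from cryptography.fernet import Fernet, InvalidToken


def _fernet():
    """Build a Fernet instance from the ENCRYPTION_KEY environment variable."""
    return Fernet(os.environ["ENCRYPTION_KEY"].encode("ascii"))


def encrypt(plaintext: str) -> str:
    """Encrypt a secret and return a text token suitable for a Text column."""
    return _fernet().encrypt(plaintext.encode("utf-8")).decode("ascii")


def decrypt(token: str) -> str:
    """Decrypt a stored token; raise ValueError if it was tampered with or the key changed."""
    try:
        return _fernet().decrypt(token.encode("ascii")).decode("utf-8")
    except InvalidToken as exc:
        raise ValueError("Stored secret could not be decrypted") from exc


# Demo only: use a throwaway key when none is configured
os.environ.setdefault("ENCRYPTION_KEY", Fernet.generate_key().decode("ascii"))
token = encrypt("imap-password")
print(decrypt(token) == "imap-password")
```

Because Fernet tokens are authenticated, a changed key or a corrupted column value fails loudly instead of yielding garbage, so rotating ENCRYPTION_KEY requires re-encrypting the stored values.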
version: '3.8'
services:
  web:
    build: .
    container_name: techlabs-web
    volumes:
      - ./app:/app/app
      - ./logs:/app/logs
      - ./uploads:/app/uploads
    environment:
      - FLASK_ENV=production
      # Credentials must match POSTGRES_USER / POSTGRES_PASSWORD of the db service
      - DATABASE_URL=postgresql://user:password@db:5432/techlabs
      - REDIS_URL=redis://redis:6379/0
      - ENCRYPTION_KEY=${ENCRYPTION_KEY}
    depends_on:
      - db
      - redis
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.techlabs.rule=Host(`labs.levor.io`)"
      - "traefik.http.routers.techlabs.tls=true"
      - "traefik.http.routers.techlabs.tls.certresolver=letsencrypt"
  db:
    image: postgres:15
    container_name: techlabs-db
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=techlabs
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
  redis:
    image: redis:7-alpine
    container_name: techlabs-redis
  celery:
    build: .
    command: celery -A app.celery worker --loglevel=info
    depends_on:
      - redis
      - db
    environment:
      - FLASK_ENV=production
      # Workers read the database, the broker, and encrypted tokens too
      - DATABASE_URL=postgresql://user:password@db:5432/techlabs
      - REDIS_URL=redis://redis:6379/0
      - ENCRYPTION_KEY=${ENCRYPTION_KEY}
volumes:
  postgres_data:
# Build and start
docker-compose up -d --build
# View logs
docker-compose logs -f web
# Run migrations
docker exec techlabs-web flask db upgrade
# Restart services
docker-compose restart
# Stop
docker-compose down
import pytest
from app import create_app, db
from app.models_email_scanning import ScannedDocument, DocumentCategory

@pytest.fixture
def app():
    app = create_app('testing')
    with app.app_context():
        db.create_all()
        yield app
        db.session.remove()
        db.drop_all()

@pytest.fixture
def client(app):
    return app.test_client()

def test_create_document(client, app):
    """Test document creation"""
    with app.app_context():
        doc = ScannedDocument(
            filename='test.pdf',
            original_filename='חשבונית.pdf',  # Hebrew filename ("invoice.pdf")
            user_id=1
        )
        db.session.add(doc)
        db.session.commit()
        assert doc.id is not None
        assert doc.filename == 'test.pdf'

def test_api_get_documents(client):
    """Test GET /api/email-scanning/documents"""
    # Login first
    client.post('/auth/login', data={
        'email': 'test@example.com',
        'password': 'password'
    })
    # Make API request
    response = client.get('/api/email-scanning/documents')
    assert response.status_code == 200
    data = response.get_json()
    assert data['success'] is True
    assert 'data' in data

# Run tests
# pytest tests/ -v
#!/bin/bash
# test_api.sh
BASE_URL="https://labs.levor.io"
# Login and get cookie
curl -c cookies.txt -X POST \
  "${BASE_URL}/auth/login" \
  -d "email=admin@techlab.co.il" \
  -d "password=yourpassword"
# Test document endpoint
curl -b cookies.txt \
  "${BASE_URL}/api/email-scanning/documents" | jq
# Test upload
curl -b cookies.txt -X POST \
  "${BASE_URL}/api/email-scanning/documents/upload" \
  -F "file=@invoice.pdf" \
  -F "category_id=1" | jq
In api_email_scanning.py:
@email_scanning_api.route('/api/email-scanning/custom-report', methods=['GET'])
@login_required
def custom_report():
    """Your custom endpoint"""
    # Your logic here
    data = {
        'report': 'Your data'
    }
    return success_response(data)
async loadCustomReport() {
    const response = await fetch('/api/email-scanning/custom-report', {
        credentials: 'include'
    });
    const result = await response.json();
    const data = result.data || result;
    console.log(data.report);
}
def extract_text_custom_provider(image_path):
    """
    Add your custom OCR provider
    """
    try:
        # Your OCR logic
        text = your_ocr_api.extract(image_path)
        return {
            'text': text,
            'confidence': 0.95,
            'provider': 'custom'
        }
    except Exception as e:
        logger.error(f"Custom OCR failed: {e}")
        return None

# Register in ocr_service.py
OCR_PROVIDERS = {
    'tesseract': extract_text_tesseract,
    'google_vision': extract_text_google_vision,
    'aws_textract': extract_text_aws_textract,
    'custom': extract_text_custom_provider  # ← Add here
}
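The perform_ocr function used by the Celery task is not shown, but a registry like OCR_PROVIDERS lends itself to a simple dispatcher. The sketch below is one way it might work, under stated assumptions: the registry is passed in explicitly (the real function takes only the path and provider name), provider failures are reported as None, and the 0.7 threshold is taken from the task. The stub providers stand in for the real extractors.

```python
def perform_ocr(file_path, provider, registry):
    """Look up a provider by name and run it; None means the provider failed."""
    extractor = registry.get(provider)
    if extractor is None:
        return None  # unknown provider name
    try:
        return extractor(file_path)
    except Exception:
        return None  # treat a crash like any other provider failure


def ocr_with_fallback(file_path, registry, order, min_confidence=0.7):
    """Try providers in order, stop at the first confident result, keep the best otherwise."""
    best = None
    for name in order:
        result = perform_ocr(file_path, name, registry)
        if result is None:
            continue
        if best is None or result["confidence"] > best["confidence"]:
            best = result
        if best["confidence"] > min_confidence:
            break
    return best


# Stub providers standing in for the real extractors
def low_quality(path):
    return {"text": "blurry", "confidence": 0.4, "provider": "tesseract"}

def broken(path):
    raise RuntimeError("service unavailable")

def high_quality(path):
    return {"text": "Invoice 123", "confidence": 0.93, "provider": "custom"}

registry = {"tesseract": low_quality, "google_vision": broken, "custom": high_quality}
result = ocr_with_fallback("invoice.png", registry, ["tesseract", "google_vision", "custom"])
print(result["provider"], result["confidence"])
```

With this shape, adding a provider is only a matter of registering the function and appending its name to the fallback order.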
You now know the whole system and can start developing. Good luck!