Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions computing/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@
from .surface_water_bodies.merge_swb_ponds import merge_swb_ponds
from utilities.auth_check_decorator import api_security_check
from computing.layer_dependency.layer_generation_in_order import layer_generate_map
from .views import layer_status, get_layers_of_workspace, check_missing_layers
from .views import (
layer_status,
get_layers_of_workspace,
missing_layer_for_all_workspace,
clear_layer_cache,
)
from .misc.lcw_conflict import generate_lcw_conflict_data
from .misc.agroecological_space import generate_agroecological_data
from .misc.factory_csr import generate_factory_csr_data
Expand Down Expand Up @@ -1877,14 +1882,20 @@ def sync_layer_remote(request):
@schema(None)
def missing_layers(request):
try:
workspace = request.query_params.get("workspace").lower()
result = check_missing_layers(workspace)
result = missing_layer_for_all_workspace()
return Response({"result": result}, status=status.HTTP_200_OK)
except Exception as e:
print("Exception in get_layers_for_workspace api :: ", e)
return Response({"Exception": e}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


@api_view(["GET"])
@schema(None)
def refresh_layer_cache(request, workspace=None):
clear_layer_cache(workspace)
return Response({"message": f"Cache cleared for: {workspace or 'all workspaces'}"})


@api_view(["POST"])
@schema(None)
def generate_fabdem_layer(request):
Expand Down
68 changes: 60 additions & 8 deletions computing/lulc_X_terrain/lulc_on_plain_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
sync_layer_to_geoserver,
save_layer_info_to_db,
update_layer_sync_status,
create_chunk,
merge_chunks,
)
from utilities.gee_utils import (
ee_initialize,
Expand All @@ -13,9 +15,10 @@
is_gee_asset_exists,
export_vector_asset_to_gee,
make_asset_public,
get_gee_dir_path,
)
from .utils import aez_lulcXterrain_cluster_centroids, process_mws, calculate_area
from utilities.constants import AEZ
from utilities.constants import AEZ, GEE_HELPER_PATH


@app.task(bind=True)
Expand All @@ -28,7 +31,7 @@ def lulc_on_plain_cluster(
valid_gee_text(district.lower())
+ "_"
+ valid_gee_text(block.lower())
+ "_lulcXplains_clusters"
+ "_lulcXplains_clusters_bk02_june"
)
asset_id = get_gee_asset_path(state, district, block) + asset_description

Expand Down Expand Up @@ -81,13 +84,62 @@ def lulc_on_plain_cluster(
)
plain_centroids = aez_lulcXterrain_cluster_centroids[f"aez{aez_no}"]["plains"]

result = process_feature_collection(
plain_mwsheds, study_area_landforms, study_area_lulc, plain_centroids
chunk_size = 50
rois, descs = create_chunk(mwsheds, asset_description, chunk_size)


tasks = []
temp_assets = []
for roi, desc in zip(rois, descs):
chunk_with_clusters = process_mws(roi)
plain_chunk = chunk_with_clusters.filter(
ee.Filter.neq("terrain_cluster", 2)
)


result_chunk = process_feature_collection(
plain_chunk, study_area_landforms, study_area_lulc, plain_centroids
)

chunk_asset_id = get_gee_dir_path([state, district, block], GEE_HELPER_PATH) + desc
temp_assets.append(chunk_asset_id)


task = export_vector_asset_to_gee(
result_chunk, desc, chunk_asset_id
)
if task:
tasks.append(task)


print("Started all chunk tasks")
task_id_list = check_task_status(tasks)
print("All chunk tasks completed:", task_id_list)


# Merge all chunks into one feature collection
print("Starting merge task")
final_task_id = merge_chunks(
mwsheds,
[state, district, block],
asset_description,
chunk_size,
merge_asset_id=asset_id,
)
print("Processing completed successfully")
task = export_vector_asset_to_gee(result, asset_description, asset_id)
task_id_list = check_task_status([task])
print("lulc_on_slope_cluster task completed - task_id_list:", task_id_list)
if final_task_id:
final_task_status = check_task_status([final_task_id])
print("Final merge task completed:", final_task_status)


# Clean up temporary assets
for chunk_id in temp_assets:
if is_gee_asset_exists(chunk_id):
try:
ee.data.deleteAsset(chunk_id)
print(f"Deleted temp asset {chunk_id}")
except Exception as e:
print(f"Failed to delete {chunk_id}: {e}")


layer_at_geoserver = False
if is_gee_asset_exists(asset_id):
Expand Down
35 changes: 15 additions & 20 deletions computing/misc/facilities_proximity.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,6 @@
Filters village facilities data from GEE by tehsil boundary and exports to GEE asset + GeoServer.
Uses admin boundary clipping (spatial filtering) for fast server-side processing.

Usage:
python -c "
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'nrm_app.settings')
import django
django.setup()
from computing.misc.facilities_proximity import generate_facilities_proximity
generate_facilities_proximity('Odisha', 'Koraput', 'Jaypur', gee_account_id=1)
"

GEE Asset: projects/corestack-datasets/assets/datasets/pan_india_facilities
"""

Expand Down Expand Up @@ -65,9 +55,22 @@ def _dissolve_admin_boundary(admin_boundary):
Merge repeated admin rows with the same village properties into one geometry.

This preserves full village shapes while preventing split polygon parts from
producing repeated output rows with identical attributes.
producing repeated output rows with identical attributes. Includes a schema
validation check to discard malformed village rows missing expected properties.
"""
admin_export_fc = admin_boundary.select(
# Filter out any feature that does not contain ALL required source fields
def filter_complete_schemas(feature):
props = feature.propertyNames()
has_all_fields = ee.List(ADMIN_BOUNDARY_SOURCE_FIELDS).map(
lambda field: props.contains(field)
).reduce(ee.Reducer.min())
return feature.set('has_complete_schema', has_all_fields)

filtered_admin = admin_boundary.map(filter_complete_schemas).filter(
ee.Filter.eq('has_complete_schema', 1)
)

admin_export_fc = filtered_admin.select(
ADMIN_BOUNDARY_SOURCE_FIELDS,
ADMIN_BOUNDARY_EXPORT_FIELDS,
)
Expand Down Expand Up @@ -143,14 +146,6 @@ def generate_facilities_proximity(state, district, block, gee_account_id):
"""
Generate facilities proximity layer for a tehsil/block.

Steps:
1. Initialize GEE
2. Create Output Asset ID
3. Filter facilities by admin boundary (spatial clipping)
4. Export as GEE asset
5. Make asset public and Register in database
6. Sync to GeoServer

Args:
state: State name (e.g., "Odisha")
district: District name (e.g., "Koraput")
Expand Down
4 changes: 4 additions & 0 deletions computing/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,8 @@
api.generate_canal_vector,
name="generate-canal-vector",
),
path("refresh_cache/", api.refresh_layer_cache, name="refresh_cache"),
path(
"refresh_cache/<str:workspace>/", api.refresh_layer_cache, name="refresh_cache"
),
]
46 changes: 46 additions & 0 deletions computing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
valid_gee_text,
)
from utilities.geoserver_utils import Geoserver
from django.core.mail import EmailMessage
import time

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1030,3 +1032,47 @@ def update_layer_sync_status(

except Exception as e:
print(f"Error updating layer sync status: {e}")


# send missing layer to recipient email
def send_missing_layers_report(result: dict, recipients: list = None) -> bool:
if recipients is None:
recipients = getattr(settings, "MISSING_LAYER_RECIPIENTS", [])

if isinstance(recipients, str):
recipients = [recipients]

if not recipients:
logger.error("No recipients configured for missing layers report.")
return False

try:
email = EmailMessage(
subject="Missing Layers Report",
body=json.dumps(result, indent=4),
from_email=settings.EMAIL_HOST_USER,
to=recipients,
)
email.send()
logger.info(f"Missing layers report sent to {recipients}")
return True
except Exception as e:
logger.exception(f"Failed to send missing layers report: {e}")
return False


def _is_cache_valid(cache: dict, workspace: str) -> bool:
if workspace not in cache:
return False
age = time.time() - cache[workspace]["cached_at"]
if age > 3600:
logger.info(f"Cache expired for {workspace} (age: {int(age)}s)")
return False
return True


def _set_cache(cache: dict, workspace: str, data: set):
cache[workspace] = {
"data": data,
"cached_at": time.time(),
}
Loading