# Download, cloud-mask, and monthly-merge HLS (Harmonized Landsat Sentinel-2)
# granules for a single MGRS tile via the NASA Earthdata CMR API.
import os
import datetime
import requests
import rasterio
import numpy as np
from rasterio.merge import merge
from rasterio.warp import calculate_default_transform, reproject, Resampling
import matplotlib.pyplot as plt
# Configuration
EARTHDATA_TOKEN = os.getenv("EARTHDATA_TOKEN")
CMR_SEARCH_URL = "https://cmr.earthdata.nasa.gov/search/granules.json"
TILE_NAME = "T20MRC"
start = datetime.datetime(2019, 7, 1)
end = datetime.datetime(2023, 3, 1)
cloud_cover_threshold = 50
output_dir = "./hls_merged"
temp_dir = "./temp_hls"
os.makedirs(output_dir, exist_ok=True)
os.makedirs(temp_dir, exist_ok=True)
BANDS_L30 = ["B02", "B03", "B04", "B05", "B06", "B07", "Fmask"]
BANDS_S30 = ["B02", "B03", "B04", "B8A", "B11", "B12", "Fmask"]
NODATA_VALUE = -9999.0
SCALE_FACTOR = 0.0001
# -------------------------------------------------------------
# 1. SEARCH for GRANULES
# -------------------------------------------------------------
def search_hls_granules_by_tile(product_short_name, tile_name, start_date, end_date):
if product_short_name == "HLSL30":
granule_prefix = "HLS.L30"
else:
granule_prefix = "HLS.S30"
params = {
"short_name": product_short_name,
"temporal": f"{start_date.isoformat()}Z,{end_date.isoformat()}Z",
"readable_granule_name": f"{granule_prefix}.{tile_name}.*",
"options[readable_granule_name][pattern]": "true",
"cloud_cover": f"0,{cloud_cover_threshold}",
"page_size": 200,
}
headers = {"Authorization": f"Bearer {EARTHDATA_TOKEN}"}
r = requests.get(CMR_SEARCH_URL, headers=headers, params=params)
if r.status_code == 200:
granules = r.json().get("feed", {}).get("entry", [])
print(f"Found {len(granules)} granules for {product_short_name} tile {tile_name}.")
return granules
else:
print(f"Error searching granules: {r.status_code} - {r.text}")
return []
# -------------------------------------------------------------
# 2. DOWNLOAD HELPERS
# -------------------------------------------------------------
def download_band_tif(url, out_path):
"""Download a single band TIF file, if it doesn't already exist."""
if os.path.exists(out_path):
return out_path
headers = {"Authorization": f"Bearer {EARTHDATA_TOKEN}"}
with requests.get(url, headers=headers, stream=True) as r:
if r.status_code == 200:
with open(out_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return out_path
else:
print(f"Failed to download {url}: {r.status_code}, {r.text}")
return None
def download_bands_for_granule(granule):
"""Download all needed bands for a single granule (based on L30 or S30)."""
granule_id = granule["producer_granule_id"]
needed_bands = BANDS_L30 if "HLS.L30" in granule_id else BANDS_S30
band_paths = {}
links = granule.get("links", [])
if not links:
print(f"No links in granule {granule_id}")
return None
for band in needed_bands:
found_link = None
for link in links:
href = link.get("href", "").lower()
if href.endswith(f".{band.lower()}.tif"):
found_link = link.get("href")
break
if not found_link:
print(f"Band {band} not found in granule {granule_id}.")
return None
out_name = f"{granule_id}_{band}.tif"
out_path = os.path.join(temp_dir, out_name)
result = download_band_tif(found_link, out_path)
if not result:
return None
band_paths[band] = result
return band_paths
# -------------------------------------------------------------
# 3. STACKING & FMASK
# -------------------------------------------------------------
def stack_and_scale_bands(band_paths_dict):
"""Stack reflectance bands + Fmask into a single multiband array, applying scale factor."""
fmask_key = "Fmask"
reflectance_keys = [b for b in band_paths_dict if b != fmask_key]
# Sort reflectance bands in a known order
def band_sort_key(b):
order_map = {"B02": 2, "B03": 3, "B04": 4, "B05": 5,
"B06": 6, "B07": 7, "B8A": 8.1, "B11": 11, "B12": 12}
return order_map.get(b, 999)
reflectance_keys.sort(key=band_sort_key)
band_order = reflectance_keys + [fmask_key]
stacked_arrays = []
profile = None
for b in band_order:
path = band_paths_dict[b]
with rasterio.open(path) as ds:
if b != fmask_key:
arr = ds.read(1).astype(np.float32)
else:
arr = ds.read(1).astype(np.float32)
# print(f"Fmask unique values for band {b}: {np.unique(arr)}")
arr[arr < 128.0] = 0.0
arr[arr >= 128.0] = 255.0
# # Create a figure with two subplots: one for the image and one for the histogram
# fig, axs = plt.subplots(1, 2, figsize=(14, 6))
# # Plot the Fmask as an image
# axs[0].imshow(arr, cmap="viridis")
# axs[0].set_title("fmask Image")
# axs[0].axis("off")
# # Compute the histogram
# unique_values, counts = np.unique(arr, return_counts=True)
# # Plot the histogram
# axs[1].bar(unique_values, counts, color="blue", alpha=0.7)
# axs[1].set_title("fmask Histogram")
# axs[1].set_xlabel("Fmask Values")
# axs[1].set_ylabel("Frequency")
# axs[1].set_xticks(unique_values)
# axs[1].grid(axis="y", linestyle="--", alpha=0.7)
# # Adjust layout and display the plot
# plt.tight_layout()
# plt.show()
if profile is None:
profile = ds.profile.copy()
# Temporarily replace NODATA_VALUE with np.nan
arr[arr == NODATA_VALUE] = np.nan
# Scale if it's a reflectance band
if b != fmask_key:
arr = np.clip(arr, 0.0, 10000.0) * SCALE_FACTOR
# else:
# print(f"Fmask unique values for band {b}: {np.unique(arr)}")
stacked_arrays.append(arr)
# Convert list of arrays to a single stacked array: (bands, rows, cols)
stacked_data = np.stack(stacked_arrays, axis=0)
# Put back the NODATA_VALUE where we have NaN
stacked_data[np.isnan(stacked_data)] = NODATA_VALUE
return stacked_data, profile, band_order
def apply_fmask_in_memory(stacked_data, band_order):
"""
Use the Fmask band to set invalid pixels to NODATA_VALUE.
Fmask codes that indicate clouds/shadows/etc are replaced.
"""
fmask_idx = band_order.index("Fmask")
fmask = stacked_data[fmask_idx]
# According to HLS docs, Fmask values: 0=clear,1=water,2=cloud,3=shadow,4=snow,255=fill
# print('apply_fmask_in_memory: ', np.unique(fmask))
invalid = np.isin(fmask, [255])
for i, b in enumerate(band_order):
if b != "Fmask":
stacked_data[i][invalid] = NODATA_VALUE
return stacked_data
def process_one_granule(granule):
"""Download, stack, scale, apply Fmask, and write a single multiband TIF for a granule."""
granule_id = granule["producer_granule_id"]
band_paths = download_bands_for_granule(granule)
if not band_paths:
return None
stacked_data, profile, band_order = stack_and_scale_bands(band_paths)
stacked_data = apply_fmask_in_memory(stacked_data, band_order)
out_path = os.path.join(temp_dir, f"{granule_id}_masked_multiband.tif")
profile.update({"count": stacked_data.shape[0], "nodata": NODATA_VALUE, "dtype": "float32"})
with rasterio.open(out_path, "w", **profile) as dst:
dst.write(stacked_data.astype(np.float32))
return out_path
# -------------------------------------------------------------
# 4. GET YEAR-MONTH to group
# -------------------------------------------------------------
def get_year_month(granule):
"""
Attempt to parse 'time_start' in ISO format (YYYY-MM-DDTHH:MM:SSZ).
If that fails or is missing, parse the granule_id's Julian date to
produce a YYYY-MM string.
"""
time_str = granule.get("time_start", None)
if time_str is not None:
# Try standard ISO parse
try:
dt = datetime.datetime.fromisoformat(time_str.replace("Z", ""))
print(f"[DEBUG] dt={dt}, => Month={dt.strftime('%Y-%m')}")
return dt.strftime("%Y-%m")
except ValueError:
pass
# Fallback: parse from producer_granule_id
granule_id = granule["producer_granule_id"]
# Example: HLS.L30.T22MBA.2020169T134142.v2.0...
parts = granule_id.split(".")
if len(parts) < 4:
print(f"Warning: Unable to parse date from granule_id={granule_id}")
return "unknown"
date_part = parts[3] # e.g., "2020169T134142"
year = int(date_part[:4])
jday = int(date_part[4:7]) # e.g., "169"
dt = datetime.datetime(year, 1, 1) + datetime.timedelta(days=jday - 1)
print(f"[DEBUG] Year={year}, JulianDay={jday}, dt={dt}, => Month={dt.strftime('%Y-%m')}")
return dt.strftime("%Y-%m")
# --- 1) Sort by descending most number of valid pixels so that the best scene is 'base' ---
def count_valid_pixels(granule):
granule_path = process_one_granule(granule)
if not granule_path:
return 0 # If the granule can't be processed, return 0 valid pixels
with rasterio.open(granule_path) as ds:
fmask_band = ds.read(ds.count) # Assume Fmask is the last band
valid_pixels = np.isin(fmask_band, [0]) # Clear or water
return np.sum(valid_pixels)
# Sort granules by cloud cover (best coverage first)
# granules_sorted = sorted(granules, key=count_valid_pixels)
def plot_merge_debug_with_fmask(base_rgb, new_rgb, updated_rgb, base_fmask, new_fmask, update_mask,
title_base, title_new, title_updated):
"""
Plot the base image, new granule, updated base image, and their Fmasks with invalid pixels highlighted.
Parameters:
- base_rgb: RGB bands of the base image before update.
- new_rgb: RGB bands of the new granule being merged.
- updated_rgb: RGB bands of the updated base image after merging.
- base_fmask: Fmask of the base image.
- new_fmask: Fmask of the new granule.
- update_mask: Boolean mask of pixels being updated.
- title_base: Title for the base image.
- title_new: Title for the new granule.
- title_updated: Title for the updated base image.
"""
fig, axs = plt.subplots(2, 3, figsize=(18, 10))
# Plot the RGB images
axs[0, 0].imshow(base_rgb.transpose(1, 2, 0) * 3.5)
axs[0, 0].set_title(title_base)
axs[0, 0].axis("off")
axs[0, 1].imshow(new_rgb.transpose(1, 2, 0) * 3.5)
axs[0, 1].set_title(title_new)
axs[0, 1].axis("off")
axs[0, 2].imshow(updated_rgb.transpose(1, 2, 0) * 3.5)
axs[0, 2].set_title(title_updated)
axs[0, 2].axis("off")
# Plot the Fmasks
base_fmask_highlight = np.zeros_like(base_fmask, dtype=np.uint8)
base_fmask_highlight[np.isin(base_fmask, [2, 3, 4, 255])] = 255 # Red for invalid
axs[1, 0].imshow(base_fmask_highlight, cmap="gray")
axs[1, 0].set_title("Base Fmask")
axs[1, 0].axis("off")
new_fmask_highlight = np.zeros_like(new_fmask, dtype=np.uint8)
new_fmask_highlight[np.isin(new_fmask, [2, 3, 4, 255])] = 255 # Red for invalid
axs[1, 1].imshow(new_fmask_highlight, cmap="gray")
axs[1, 1].set_title("New Granule Fmask")
axs[1, 1].axis("off")
# Plot the update mask
update_mask_highlight = np.zeros_like(update_mask, dtype=np.uint8)
update_mask_highlight[update_mask] = 255 # Red for updated pixels
axs[1, 2].imshow(update_mask_highlight, cmap="gray")
axs[1, 2].set_title("Update Mask")
axs[1, 2].axis("off")
plt.tight_layout()
plt.show()
# -------------------------------------------------------------
# 5. MERGING (with Reprojection)
# -------------------------------------------------------------
def merge_granules(granules_l30, granules_s30, month_str):
"""
Pixel-wise 'best coverage' mosaic with detailed Fmask visualization.
Overwrite the mosaic pixel if the new granule is clearer (Fmask=0 or 1)
and the mosaic pixel is either nodata or cloud. Update Fmask for replaced pixels.
"""
granules = granules_l30 + granules_s30
if not granules:
print(f"No granules to merge for {month_str}.")
return
# Sort granules by most valid pixels
granules_sorted = sorted(granules, key=count_valid_pixels)
# Process the first granule (base granule)
first_path = process_one_granule(granules_sorted[-1])
if not first_path:
return
with rasterio.open(first_path) as ds_base:
mosaic_data = ds_base.read() # shape: (band_count, height, width)
mosaic_profile = ds_base.profile
base_transform = ds_base.transform
base_crs = ds_base.crs
mosaic_fmask = mosaic_data[-1] # Assume Fmask is the last band
# For each subsequent granule
for granule in granules_sorted[1:]:
path = process_one_granule(granule)
if not path:
continue
with rasterio.open(path) as ds:
if ds.crs != base_crs or ds.transform != base_transform:
reprojected_data = np.full_like(mosaic_data, NODATA_VALUE, dtype=mosaic_data.dtype)
reproject(
source=ds.read(),
destination=reprojected_data,
src_transform=ds.transform,
src_crs=ds.crs,
dst_transform=base_transform,
dst_crs=base_crs,
resampling=Resampling.nearest
)
else:
reprojected_data = ds.read()
# Extract the Fmask band from the new granule
new_fmask = reprojected_data[-1] # Assume Fmask is the last band
# Identify invalid pixels in the base mosaic (either NODATA or invalid Fmask)
# print('mosaic fmask unique: ', np.unique(mosaic_fmask))
mosaic_invalid_fmask = (mosaic_fmask == 255.0)
mosaic_invalid_bands = (mosaic_data[:-1] == NODATA_VALUE).any(axis=0) # Exclude the Fmask band
mosaic_invalid = mosaic_invalid_fmask | mosaic_invalid_bands # Combine conditions
# print('mosaic_invalid unique: ', np.unique(mosaic_invalid))
# Identify valid and clear pixels in the new granule
# print('new_fmask unique: ', np.unique(new_fmask))
new_valid_fmask = (new_fmask != 255) # Valid Fmask pixels
new_valid_bands = (reprojected_data[:-1] != NODATA_VALUE).all(axis=0) # Exclude Fmask
new_valid_and_clear = new_valid_fmask & new_valid_bands # Combine conditions
# print('new_valid_and_clear unique: ', np.unique(new_valid_and_clear))
# Update mask: Pixels that are invalid in the base but valid in the new granule
update_mask = mosaic_invalid & new_valid_and_clear
# print('update_mask unique: ', np.unique(update_mask))
# Debug visualization before update
base_rgb = mosaic_data[:3].copy() # Extract RGB bands of base image
new_rgb = reprojected_data[:3].copy() # Extract RGB bands of new granule
# Update mosaic data and the Fmask band
for band_i in range(mosaic_data.shape[0]):
mosaic_data[band_i][update_mask] = reprojected_data[band_i][update_mask]
# Update the Fmask band
mosaic_fmask[update_mask] = new_fmask[update_mask]
# Debug visualization with Fmask and update mask
updated_rgb = mosaic_data[:3].copy() # Updated RGB bands of base image
# plot_merge_debug_with_fmask(base_rgb, new_rgb, updated_rgb, mosaic_fmask.copy(), new_fmask.copy(), update_mask.copy(),
# "Base Image (Before)", "New Granule", "Base Image (Updated)")
# Write out the final mosaic
out_path = os.path.join(output_dir, f"{TILE_NAME}_merged_{month_str}.tif")
mosaic_profile.update({"count": mosaic_data.shape[0]})
with rasterio.open(out_path, "w", **mosaic_profile) as dst:
dst.write(mosaic_data)
print(f"Merged file saved: {out_path}")
if __name__ == "__main__":
# 1) Search for L30 & S30 granules
granules_l30 = search_hls_granules_by_tile("HLSL30", TILE_NAME, start, end)
granules_s30 = search_hls_granules_by_tile("HLSS30", TILE_NAME, start, end)
# 2) Group granules by month and year (YYYY-MM)
granules_by_month = {}
for g in granules_l30 + granules_s30:
ym = get_year_month(g)
if ym not in granules_by_month:
granules_by_month[ym] = {"L30": [], "S30": []}
if "HLSL30" in g["dataset_id"]:
granules_by_month[ym]["L30"].append(g)
else:
granules_by_month[ym]["S30"].append(g)
# 3) Merge granules month by month
for month_str, grps in granules_by_month.items():
print(f"\nMerging data for {month_str}")
merge_granules(grps["L30"], grps["S30"], month_str)