IGRA

import pandas as pd
import requests, zipfile, io
import re
from datetime import datetime
import numpy as np
# Path to IGRA2 station list
station_list_path = "./data/IGRA/igra2-station-list.txt"
# A plain whitespace-delimited read splits multi-word station names
# across columns, so this first attempt was abandoned:
# stations = pd.read_csv(
#     station_list_path,
#     delim_whitespace=True,
#     header=None,
#     usecols=[0,1,2,3,4,5,6],
#     names=["id", "lat", "lon", "elev_m", "name1", "name2", "name3"],
#     engine="python"
# )

# Fixed-width read instead, using the column layout documented in
# igra2-list-format.txt on the NCEI server
stations = pd.read_fwf(
    station_list_path,
    colspecs=[(0, 11), (12, 20), (21, 30), (31, 37), (38, 40),
              (41, 71), (72, 76), (77, 81), (82, 88)],
    header=None,
    names=["id", "lat", "lon", "elev_m", "state", "name",
           "datei", "datef", "records"])

# read_fwf already yields the name as a single column; just trim whitespace
stations["name"] = stations["name"].str.strip()

# Keep only relevant columns
stations = stations[["id","lat","lon","elev_m","name"]]
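A quick, illustrative sanity check on the parsed station list (the printed ranges are just for eyeballing, not part of the IGRA spec):

# Station count and coordinate ranges should look plausible
print(len(stations), "stations parsed")
print("lat range:", stations["lat"].min(), "to", stations["lat"].max())
print("lon range:", stations["lon"].min(), "to", stations["lon"].max())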
stations
# Filter stations to an MJO-region box: 65E–160E, 5S–5N
mjo_stations = stations[
    (stations["lon"] >= 65) & (stations["lon"] <= 160) &
    (stations["lat"] >= -5) & (stations["lat"] <= 5)
].reset_index(drop=True)

mjo_stations
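Before downloading anything, it helps to eyeball the selected stations on a map. A minimal sketch using plotly express (the same library used for the sounding plot later in this notebook):

import plotly.express as px

# Sketch: scatter the MJO-box stations on a world map to verify the filter
fig_map = px.scatter_geo(
    mjo_stations,
    lat="lat", lon="lon",
    hover_name="name",
    title="IGRA stations in the MJO box",
)
fig_map.show()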
# Take the first station in the MJO box as an example
station = mjo_stations.iloc[0]
station_id = station["id"]

print("Downloading:", station_id, station["name"])

# Build URL
url = f"https://www.ncei.noaa.gov/pub/data/igra/data/data-por/{station_id}-data.txt.zip"
print(url)

# Download and unzip
r = requests.get(url)
r.raise_for_status()  # fail fast on HTTP errors (e.g. station file not found)
z = zipfile.ZipFile(io.BytesIO(r.content))
fname = z.namelist()[0]

# Read lines
lines = z.read(fname).decode("utf-8").splitlines()
print("Number of lines:", len(lines))
print("First few lines:\n", "\n".join(lines[:15]))
import re
from datetime import datetime
import pandas as pd
import numpy as np

QC_LETTER_RE = re.compile(r'[A-Za-z]')

def _to_num(token: str, scale=1.0, missing=-9999, allow_signed=True):
    """Convert a raw IGRA2 token to a float.

    Trailing QC flag letters are stripped; the IGRA2 sentinels -9999
    (missing) and -8888 (removed by quality assurance) become NaN.
    """
    if token is None:
        return np.nan
    t = QC_LETTER_RE.sub('', token.strip())
    if t == '':
        return np.nan
    try:
        val = int(t) if allow_signed else int(t.replace('-', ''))
    except ValueError:
        return np.nan
    if val in (missing, -8888):
        return np.nan
    return val / scale
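
A few illustrative calls (the tokens are made-up examples of IGRA-style values with trailing QC flag letters):

# "  2312A" -> QC letter stripped -> 2312 / 10 = 231.2
print(_to_num("  2312A", scale=10.0))   # 231.2
print(_to_num(" -9999",  scale=10.0))   # nan (missing sentinel)
print(_to_num("      ",  scale=10.0))   # nan (blank token)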

def parse_header(header_line: str):
    stn_id   = header_line[1:12].strip()
    year     = int(header_line[13:17])
    month    = int(header_line[18:20])
    day      = int(header_line[21:23])
    hour     = int(header_line[24:26])
    if hour == 99:  # IGRA2 uses 99 for a missing nominal hour; fall back to 00Z
        hour = 0
    ts = pd.Timestamp(datetime(year, month, day, hour))
    try:
        # NUMLEV occupies header columns 33-36 (0-indexed 32:36);
        # columns 28-31 hold the release time, not the level count
        approx_levels = int(header_line[32:36].strip())
    except Exception:
        approx_levels = None
    return stn_id, ts, approx_levels
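
To see the slices in action, here is a synthetic header line laid out per the IGRA2 format description (the station ID and values are made up for illustration):

hdr = "#USM00072520 2010 01 15 12 1124  100"
print(parse_header(hdr))
# ('USM00072520', Timestamp('2010-01-15 12:00:00'), 100)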

def parse_level_line(ln: str) -> dict:
    return {
        "level_type1": int(ln[0]),
        "level_type2": int(ln[1]),
        "pressure_hPa": _to_num(ln[9:15], scale=100.0, missing=-9999),
        "height_m": _to_num(ln[16:21], scale=1.0, missing=-9999),
        "temp_C": _to_num(ln[22:27], scale=10.0, missing=-9999),
        "rh_pct": _to_num(ln[28:33], scale=10.0, missing=-9999),
        # DPDP in IGRA2 is the dewpoint *depression* (T - Td), not the dewpoint
        "dewpoint_depression_C": _to_num(ln[34:39], scale=10.0, missing=-9999),
        "wind_dir_deg": _to_num(ln[40:45], scale=1.0, missing=-9999),
        "wind_speed_mps": _to_num(ln[46:51], scale=10.0, missing=-9999),
    }
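
As a spot check, parse the first non-header line of the file downloaded above:

# Illustrative: parse the first data (level) line of this station's record
sample_line = next(ln for ln in lines if ln and not ln.startswith("#"))
print(sample_line)
print(parse_level_line(sample_line))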

def split_into_soundings(lines: list[str]):
    soundings = []
    cur_header, cur_levels = None, []
    def flush():
        nonlocal cur_header, cur_levels
        if cur_header is not None and cur_levels:
            soundings.append((cur_header, cur_levels))
        cur_header, cur_levels = None, []
    for ln in lines:
        if not ln.strip():
            continue
        if ln.startswith('#'):
            flush()
            cur_header = ln
        else:
            cur_levels.append(ln)
    flush()
    return soundings
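
A quick check that the splitter finds a plausible number of soundings:

# Illustrative: count soundings and peek at the first header
soundings = split_into_soundings(lines)
print(len(soundings), "soundings found")
if soundings:
    print("first header:", soundings[0][0][:36])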

def build_catalog_and_metadata(lines: list[str], drop_empty: bool = True):
    """
    Returns:
      catalog: dict[catalog_key -> DataFrame]
      meta:    DataFrame with per-sounding stats (includes 'catalog_key' and 'status')
    If drop_empty=True, both outputs exclude soundings classified as 'empty'.
    """
    soundings = split_into_soundings(lines)
    catalog = {}
    meta_rows = []

    # handle duplicate timestamps by suffixing keys
    duplicates_count: dict[pd.Timestamp, int] = {}

    for header, level_lines in soundings:
        stn_id, ts, approx_count = parse_header(header)
        rows = [parse_level_line(ln) for ln in level_lines]
        df = pd.DataFrame(rows)
        df.insert(0, "station", stn_id)
        df.insert(1, "time", ts)

        # classify BEFORE dropping anything
        n_valid_pres = int(df["pressure_hPa"].notna().sum())
        n_valid_temp = int(df["temp_C"].notna().sum())
        n_valid_wind = int(df["wind_speed_mps"].notna().sum())

        if n_valid_pres == 0 and n_valid_temp == 0 and n_valid_wind == 0:
            status = "empty"
        elif n_valid_pres == 0 and n_valid_temp == 0 and n_valid_wind > 0:
            status = "winds_only"
        elif n_valid_temp == 0 and n_valid_pres > 0:
            status = "pressure_only"
        else:
            status = "full"

        # now drop rows that are completely useless across all key fields
        df = df.dropna(
            how="all",
            subset=["pressure_hPa", "height_m", "temp_C",
                    "dewpoint_depression_C", "wind_dir_deg", "wind_speed_mps"]
        ).reset_index(drop=True)

        # sort by pressure if present; else by height if present
        if df["pressure_hPa"].notna().any():
            df = df.sort_values("pressure_hPa", ascending=False).reset_index(drop=True)
        elif df["height_m"].notna().any():
            df = df.sort_values("height_m", ascending=True).reset_index(drop=True)

        # unique catalog key (timestamp + optional suffix)
        if ts in duplicates_count:
            duplicates_count[ts] += 1
            key = f"{ts.isoformat()}.{duplicates_count[ts]}"
        else:
            duplicates_count[ts] = 1
            key = ts.isoformat()

        # store even if empty for now; we'll filter at the end if requested
        catalog[key] = df

        # compute simple stats from the (lightly) cleaned df
        n_levels = len(df)
        p_max = float(df["pressure_hPa"].max()) if df["pressure_hPa"].notna().any() else np.nan
        p_min = float(df["pressure_hPa"].min()) if df["pressure_hPa"].notna().any() else np.nan
        z_top = float(df["height_m"].max()) if df["height_m"].notna().any() else np.nan

        meta_rows.append({
            "catalog_key": key,
            "station": stn_id,
            "time": ts,
            "status": status,
            "approx_levels_header": approx_count,
            "n_levels": n_levels,
            "n_valid_pressure": n_valid_pres,
            "n_valid_temp": n_valid_temp,
            "n_valid_wind": n_valid_wind,
            "p_max_hPa_surface": p_max,
            "p_min_hPa_top": p_min,
            "z_top_m": z_top,
        })

    meta = pd.DataFrame(meta_rows).sort_values("time").reset_index(drop=True)

    if drop_empty:
        # keep only non-empty in meta
        meta = meta[meta["status"] != "empty"].reset_index(drop=True)
        # align catalog to filtered meta
        keep_keys = set(meta["catalog_key"])
        catalog = {k: v for k, v in catalog.items() if k in keep_keys}

    return catalog, meta
catalog, meta = build_catalog_and_metadata(lines, drop_empty=True)
print(meta["status"].value_counts())
print(f"Catalog size: {len(catalog)}  |  Meta rows: {len(meta)}")
print(len(catalog), "soundings parsed")
type(catalog)
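With the metadata table in hand, a one-liner summarizes how far back the record goes (illustrative):

# Soundings per year, to see record coverage at this station
per_year = meta.groupby(meta["time"].dt.year).size()
print(per_year.head())
print(per_year.tail())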
# Filter meta for full soundings only
meta_full = meta[meta["status"] == "full"].reset_index(drop=True)

# Build a new catalog with only those keys
catalog_full = {k: catalog[k] for k in meta_full["catalog_key"]}

print(f"Original catalog: {len(catalog)} soundings")
print(f"Full-only subset: {len(catalog_full)} soundings")

# Quick check
print(meta_full.head())
example_key = meta_full.loc[0, "catalog_key"]
print(catalog_full[example_key].head())
import plotly.express as px

# Pick one sounding (first entry in the dict)
key, df = next(iter(catalog_full.items()))

# df has columns ["station", "time", "level_type1", "level_type2", "pressure_hPa", "height_m", "temp_C", ...]

# Make a temperature vs. pressure plot (log-y axis, inverted so surface is at bottom)
fig = px.line(
    df,
    x="temp_C",
    y="pressure_hPa",
    markers=True,
    title=f"Sounding at {key} (Station {df['station'].iloc[0]})",
    labels={"temp_C": "Temperature (°C)", "pressure_hPa": "Pressure (hPa)"}
)

# Log-scale the pressure axis and reverse it so the surface (~1000 hPa)
# is at the bottom; a fixed range would conflict with autorange
fig.update_yaxes(type="log", autorange="reversed")

fig.update_layout(
    width=500,
    height=700,
)

fig.show()
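If this notebook is built into a static page, the figure can also be written out as standalone HTML (the output path below is an assumption):

from pathlib import Path

# Optional: export an interactive copy of the figure (hypothetical path)
Path("./figures").mkdir(exist_ok=True)
fig.write_html("./figures/igra_sounding.html", include_plotlyjs="cdn")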