# IGRA2 radiosonde data: load station list, download one station, parse soundings
import pandas as pd
import requests, zipfile, io
import re
from datetime import datetime
import numpy as np
# Path to the IGRA2 station list (fixed-width text file)
station_list_path = "./data/IGRA/igra2-station-list.txt"

# The station list is fixed-width and station names contain spaces, so a
# whitespace-delimited read would split a name across several columns (the
# cause of the name1/name2/name3 KeyError below).  Give read_fwf the
# documented IGRA2 column boundaries so "name" comes back as ONE field.
# Columns: ID, LAT, LON, ELEV, STATE, NAME, FIRST YEAR, LAST YEAR, NOBS.
stations = pd.read_fwf(
    station_list_path,
    colspecs=[(0, 11), (12, 20), (21, 30), (31, 37),
              (38, 40), (41, 71), (72, 76), (77, 81), (82, 88)],
    header=None,
    names=["id", "lat", "lon", "elev_m", "state", "name",
           "datei", "datef", "records"],
)

# Normalize the name field (strip fixed-width padding; missing names -> "")
stations["name"] = stations["name"].fillna("").str.strip()

# Keep only relevant columns
stations = stations[["id", "lat", "lon", "elev_m", "name"]]
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Cell In[3], line 17
11 stations = pd.read_fwf(
12 station_list_path,
13 header=None,
14 names=['id','lat','lon','elev_m','name','datei','datef','records'])
16 # The station name may be split across multiple columns, so join them
---> 17 stations["name"] = stations[["name1","name2","name3"]].fillna("").agg(" ".join, axis=1).str.strip()
19 # Keep only relevant columns
20 stations = stations[["id","lat","lon","elev_m","name"]]
File ~/opt/anaconda3/envs/jupyterbook_env/lib/python3.11/site-packages/pandas/core/frame.py:4108, in DataFrame.__getitem__(self, key)
4106 if is_iterator(key):
4107 key = list(key)
-> 4108 indexer = self.columns._get_indexer_strict(key, "columns")[1]
4110 # take() does not accept boolean indexers
4111 if getattr(indexer, "dtype", None) == bool:
File ~/opt/anaconda3/envs/jupyterbook_env/lib/python3.11/site-packages/pandas/core/indexes/base.py:6200, in Index._get_indexer_strict(self, key, axis_name)
6197 else:
6198 keyarr, indexer, new_indexer = self._reindex_non_unique(keyarr)
-> 6200 self._raise_if_missing(keyarr, indexer, axis_name)
6202 keyarr = self.take(indexer)
6203 if isinstance(key, Index):
6204 # GH 42790 - Preserve name from an Index
File ~/opt/anaconda3/envs/jupyterbook_env/lib/python3.11/site-packages/pandas/core/indexes/base.py:6249, in Index._raise_if_missing(self, key, indexer, axis_name)
6247 if nmissing:
6248 if nmissing == len(indexer):
-> 6249 raise KeyError(f"None of [{key}] are in the [{axis_name}]")
6251 not_found = list(ensure_index(key)[missing_mask.nonzero()[0]].unique())
6252 raise KeyError(f"{not_found} not in index")
KeyError: "None of [Index(['name1', 'name2', 'name3'], dtype='object')] are in the [columns]"
stations

# Filter to the MJO / Maritime Continent box actually coded below:
# 65E-160E, 5S-5N.  (An earlier comment said 65E-120E, 10S-10N, which
# did not match the code.)
mjo_stations = stations[
    (stations["lon"] >= 65) & (stations["lon"] <= 160) &
    (stations["lat"] >= -5) & (stations["lat"] <= 5)
].reset_index(drop=True)
mjo_stations

# Take the first station in the box and fetch its period-of-record file
station = mjo_stations.iloc[0]
station_id = station["id"]
print("Downloading:", station_id, station["name"])

# Build URL
url = f"https://www.ncei.noaa.gov/pub/data/igra/data/data-por/{station_id}-data.txt.zip"
print(url)

# Download and unzip.  raise_for_status() fails loudly on HTTP errors
# instead of handing an error page to zipfile; the timeout avoids a hang.
r = requests.get(url, timeout=60)
r.raise_for_status()
z = zipfile.ZipFile(io.BytesIO(r.content))
fname = z.namelist()[0]

# Read lines
lines = z.read(fname).decode("utf-8").splitlines()
print("Number of lines:", len(lines))
print("First few lines:\n", "\n".join(lines[:15]))
import re
from datetime import datetime
import pandas as pd
import numpy as np
QC_LETTER_RE = re.compile(r'[A-Za-z]')
def _to_num(token: str, scale=1.0, missing=-9999, allow_signed=True):
if token is None:
return np.nan
t = QC_LETTER_RE.sub('', token.strip())
if t == '':
return np.nan
try:
val = int(t) if allow_signed else int(t.replace('-', ''))
except ValueError:
return np.nan
if val == missing:
return np.nan
return val / scale
def parse_header(header_line: str):
stn_id = header_line[1:12].strip()
year = int(header_line[13:17])
month = int(header_line[18:20])
day = int(header_line[21:23])
hour = int(header_line[24:26])
ts = pd.Timestamp(datetime(year, month, day, hour))
try:
approx_levels = int(header_line[27:31].strip())
except Exception:
approx_levels = None
return stn_id, ts, approx_levels
def parse_level_line(ln: str) -> dict:
return {
"level_type1": int(ln[0]),
"level_type2": int(ln[1]),
"pressure_hPa": _to_num(ln[9:15], scale=100.0, missing=-9999),
"height_m": _to_num(ln[16:21], scale=1.0, missing=-9999),
"temp_C": _to_num(ln[22:27], scale=10.0, missing=-9999),
"rh_pct": _to_num(ln[28:33], scale=10.0, missing=-9999),
"dewpoint_C": _to_num(ln[34:39], scale=10.0, missing=-9999),
"wind_dir_deg": _to_num(ln[40:45], scale=1.0, missing=-9999),
"wind_speed_mps": _to_num(ln[46:51], scale=10.0, missing=-9999),
}
def split_into_soundings(lines: list[str]):
    """Group raw IGRA2 text lines into (header_line, level_lines) pairs.

    A line starting with '#' begins a new sounding; blank lines are
    skipped.  Headers that collected no level lines are dropped.
    """
    soundings = []
    header, levels = None, []
    for raw in lines:
        if not raw.strip():
            continue  # ignore blank lines
        if raw.startswith('#'):
            # close out the previous sounding, if it gathered any levels
            if header is not None and levels:
                soundings.append((header, levels))
            header, levels = raw, []
        else:
            levels.append(raw)
    # flush the trailing sounding
    if header is not None and levels:
        soundings.append((header, levels))
    return soundings
def build_catalog_and_metadata(lines: list[str], drop_empty: bool = True):
    """
    Parse raw IGRA2 data-file lines into per-sounding DataFrames plus metadata.

    Parameters:
        lines: decoded text lines of one station's IGRA2 data file.
        drop_empty: if True, exclude soundings classified as 'empty' from
            both outputs.

    Returns:
        catalog: dict[catalog_key -> DataFrame], one DataFrame per sounding.
        meta: DataFrame with per-sounding stats (includes 'catalog_key' and 'status').
    """
    soundings = split_into_soundings(lines)
    catalog = {}
    meta_rows = []
    # handle duplicate timestamps by suffixing keys (second launch at the
    # same nominal time gets key "<iso>.2", third "<iso>.3", ...)
    duplicates_count: dict[pd.Timestamp, int] = {}
    for header, level_lines in soundings:
        stn_id, ts, approx_count = parse_header(header)
        rows = [parse_level_line(ln) for ln in level_lines]
        df = pd.DataFrame(rows)
        df.insert(0, "station", stn_id)
        df.insert(1, "time", ts)
        # classify BEFORE dropping anything, so counts reflect the raw record
        n_valid_pres = int(df["pressure_hPa"].notna().sum())
        n_valid_temp = int(df["temp_C"].notna().sum())
        n_valid_wind = int(df["wind_speed_mps"].notna().sum())
        if n_valid_pres == 0 and n_valid_temp == 0 and n_valid_wind == 0:
            status = "empty"
        elif n_valid_pres == 0 and n_valid_temp == 0 and n_valid_wind > 0:
            status = "winds_only"
        elif n_valid_temp == 0 and n_valid_pres > 0:
            status = "pressure_only"
        else:
            # NOTE(review): this branch also catches temperature-only records
            # with no pressure data — confirm labelling those "full" is intended.
            status = "full"
        # now drop rows that are completely useless across all key fields
        df = df.dropna(
            how="all",
            subset=["pressure_hPa","height_m","temp_C","dewpoint_C","wind_dir_deg","wind_speed_mps"]
        ).reset_index(drop=True)
        # sort by pressure if present (descending, so the surface comes first);
        # else by height if present
        if df["pressure_hPa"].notna().any():
            df = df.sort_values("pressure_hPa", ascending=False).reset_index(drop=True)
        elif df["height_m"].notna().any():
            df = df.sort_values("height_m", ascending=True).reset_index(drop=True)
        # unique catalog key (ISO timestamp + optional duplicate suffix)
        if ts in duplicates_count:
            duplicates_count[ts] += 1
            key = f"{ts.isoformat()}.{duplicates_count[ts]}"
        else:
            duplicates_count[ts] = 1
            key = ts.isoformat()
        # store even if empty for now; we'll filter at the end if requested
        catalog[key] = df
        # compute simple stats from the (lightly) cleaned df
        n_levels = len(df)
        p_max = float(df["pressure_hPa"].max()) if df["pressure_hPa"].notna().any() else np.nan
        p_min = float(df["pressure_hPa"].min()) if df["pressure_hPa"].notna().any() else np.nan
        z_top = float(df["height_m"].max()) if df["height_m"].notna().any() else np.nan
        meta_rows.append({
            "catalog_key": key,
            "station": stn_id,
            "time": ts,
            "status": status,
            "approx_levels_header": approx_count,
            "n_levels": n_levels,
            "n_valid_pressure": n_valid_pres,
            "n_valid_temp": n_valid_temp,
            "n_valid_wind": n_valid_wind,
            "p_max_hPa_surface": p_max,
            "p_min_hPa_top": p_min,
            "z_top_m": z_top,
        })
    meta = pd.DataFrame(meta_rows).sort_values("time").reset_index(drop=True)
    if drop_empty:
        # keep only non-empty in meta
        meta = meta[meta["status"] != "empty"].reset_index(drop=True)
        # align catalog to filtered meta
        keep_keys = set(meta["catalog_key"])
        catalog = {k: v for k, v in catalog.items() if k in keep_keys}
    return catalog, meta
# Parse all soundings, dropping ones with no usable data at all
catalog, meta = build_catalog_and_metadata(lines, drop_empty=True)
print(meta["status"].value_counts())
print(f"Catalog size: {len(catalog)} | Meta rows: {len(meta)}")
print(len(catalog), "soundings parsed")
type(catalog)  # notebook echo: catalog is a plain dict keyed by timestamp string
# Filter meta for full soundings only
meta_full = meta[meta["status"] == "full"].reset_index(drop=True)
# Build a new catalog with only those keys (used by the plotting cell below)
catalog_full = {k: catalog[k] for k in meta_full["catalog_key"]}
print(f"Original catalog: {len(catalog)} soundings")
print(f"Full-only subset: {len(catalog_full)} soundings")
# Quick check
#print(meta_full.head())
example_key = meta_full.loc[0, "catalog_key"]
#print(catalog_full[example_key].head())
import plotly.express as px

# Pick one sounding (first entry in the dict)
key, df = next(iter(catalog_full.items()))

# Temperature vs. pressure profile on a log-pressure vertical coordinate
fig = px.line(
    df,
    x="temp_C",
    y="pressure_hPa",
    markers=True,
    title=f"Sounding at {key} (Station {df['station'].iloc[0]})",
    labels={"temp_C": "Temperature (°C)", "pressure_hPa": "Pressure (hPa)"}
)

# Log-scale pressure axis, reversed so the surface (~1000 hPa) sits at the
# bottom.  FIX: the previous explicit range=[3, 3] was a zero-extent range
# (and is overridden by autorange regardless), and scaleanchor="x" pinned the
# log pressure axis to the linear temperature axis, fighting the fixed
# width/height below — both removed.
fig.update_yaxes(type="log", autorange="reversed")
fig.update_layout(
    width=500,
    height=700,
)
fig.show()