Data Models

Data model classes for weather data, stations, and related objects.

WeatherData

Container for weather data API responses.

class python_cimis.models.WeatherData(providers: ~typing.List[~python_cimis.models.WeatherProvider] = <factory>)[source]

Bases: object

Main container for weather data response.

providers: List[WeatherProvider]
get_all_records() → List[WeatherRecord][source]

Get all weather records from all providers.

get_records_by_station(station_number: str) → List[WeatherRecord][source]

Get all records for a specific station.

get_records_by_date(date: str) → List[WeatherRecord][source]

Get all records for a specific date.

__init__(providers: ~typing.List[~python_cimis.models.WeatherProvider] = <factory>) → None

Usage Examples

# Fetch every record from every provider and report the total
all_records = weather_data.get_all_records()
print(f"Total records: {len(all_records)}")

# Narrow the response down to a single station
station_2_records = weather_data.get_records_by_station("2")
print(f"Station 2 records: {len(station_2_records)}")

# Walk the records one by one
for record in all_records:
    print(f"Date: {record.date}, Station: {record.station}")

    # Look up the average air temperature; skip records without a reading
    temp_data = record.data_values.get('day-air-tmp-avg')
    if not temp_data or not temp_data.value:
        continue
    print(f"  Temperature: {temp_data.value}°{temp_data.unit}")

WeatherProvider

Represents a data provider (WSN or SCS).

class python_cimis.models.WeatherProvider(name: str, type: str, owner: str, records: ~typing.List[~python_cimis.models.WeatherRecord] = <factory>)[source]

Bases: object

Represents a weather data provider (WSN or SCS).

name: str
type: str
owner: str
records: List[WeatherRecord]
__init__(name: str, type: str, owner: str, records: ~typing.List[~python_cimis.models.WeatherRecord] = <factory>) → None

WeatherRecord

Individual weather data record.

class python_cimis.models.WeatherRecord(date: str, julian: str, station: str | None = None, standard: str = 'english', zip_codes: str = '', scope: str = 'daily', hour: str | None = None, data_values: ~typing.Dict[str, ~python_cimis.models.DataValue] = <factory>)[source]

Bases: object

Represents a single weather data record.

date: str
julian: str
station: str | None = None
standard: str = 'english'
zip_codes: str = ''
scope: str = 'daily'
hour: str | None = None
data_values: Dict[str, DataValue]
get_value(data_item: str) → DataValue | None[source]

Get a specific data value by data item name.

get_numeric_value(data_item: str) → float | None[source]

Get a numeric value for a specific data item.

__init__(date: str, julian: str, station: str | None = None, standard: str = 'english', zip_codes: str = '', scope: str = 'daily', hour: str | None = None, data_values: ~typing.Dict[str, ~python_cimis.models.DataValue] = <factory>) → None

Usage Examples

for record in weather_data.get_all_records():
    print(f"Date: {record.date}")
    print(f"Station: {record.station}")
    print(f"Julian day: {record.julian}")

    # Hourly records carry a non-empty hour; daily ones leave it unset
    if getattr(record, 'hour', None):
        print(f"Hour: {record.hour}")

    # Print each data item that actually has a value
    for key, value in record.data_values.items():
        if not value.value:
            continue  # nothing to show for empty values
        print(f"  {key}: {value.value} {value.unit} (QC: {value.qc})")

DataValue

Individual data point with quality control information.

class python_cimis.models.DataValue(value: str | None, qc: str = ' ', unit: str = '')[source]

Bases: object

Represents a single data value with quality control information.

value: str | None
qc: str = ' '
unit: str = ''
property numeric_value: float | None

Return the numeric value if possible, None otherwise.

__init__(value: str | None, qc: str = ' ', unit: str = '') → None

Quality Control Flags

# Check data quality
temp_data = record.data_values.get('day-air-tmp-avg')

if temp_data is None:
    # dict.get() returns None when the record has no such data item;
    # guard here so the flag checks below never touch a None value.
    print("No temperature data for this record")
elif temp_data.qc == ' ':  # Space character
    print("Measured or calculated value")
elif temp_data.qc == 'Y':
    print("Missing data replaced with estimated value")
elif temp_data.qc == 'M':
    print("Missing data not replaced")

Station

Weather station information.

class python_cimis.models.Station(station_nbr: str, name: str, city: str, regional_office: str | None = None, county: str | None = None, connect_date: str = '', disconnect_date: str = '', is_active: bool = True, is_eto_station: bool = True, elevation: str = '', ground_cover: str = '', hms_latitude: str = '', hms_longitude: str = '', zip_codes: ~typing.List[str] = <factory>, siting_desc: str = '')[source]

Bases: object

Represents a CIMIS weather station.

station_nbr: str
name: str
city: str
regional_office: str | None = None
county: str | None = None
connect_date: str = ''
disconnect_date: str = ''
is_active: bool = True
is_eto_station: bool = True
elevation: str = ''
ground_cover: str = ''
hms_latitude: str = ''
hms_longitude: str = ''
zip_codes: List[str]
siting_desc: str = ''
property latitude: float | None

Extract decimal latitude from HMS format.

property longitude: float | None

Extract decimal longitude from HMS format.

__init__(station_nbr: str, name: str, city: str, regional_office: str | None = None, county: str | None = None, connect_date: str = '', disconnect_date: str = '', is_active: bool = True, is_eto_station: bool = True, elevation: str = '', ground_cover: str = '', hms_latitude: str = '', hms_longitude: str = '', zip_codes: ~typing.List[str] = <factory>, siting_desc: str = '') → None

Usage Examples

# Retrieve the station list from the client
stations = client.get_stations()

for station in stations:
    # Summary of the station's identity, location and status
    print(f"Station {station.station_nbr}: {station.name}")
    print(f"  Location: {station.city}, {station.county}")
    print(f"  Active: {station.is_active}")
    print(f"  ETo Station: {station.is_eto_station}")
    print(f"  Elevation: {station.elevation}")
    print(f"  Connected: {station.connect_date}")

    # disconnect_date defaults to '' — skip the line when it is empty
    if not station.disconnect_date:
        continue
    print(f"  Disconnected: {station.disconnect_date}")

Station Filtering

# Find active stations
active_stations = [s for s in stations if s.is_active]

# Find ETo stations
eto_stations = [s for s in stations if s.is_eto_station]

# Find stations in specific county
# county may be None, so fall back to '' before lower-casing.
fresno_stations = [s for s in stations if (s.county or '').lower() == 'fresno']


def _elevation_or_none(station):
    """Return the station's elevation as a float, or None if it is not numeric."""
    try:
        return float(station.elevation)
    except (TypeError, ValueError):
        # elevation is a string that defaults to '' and so may not parse
        return None


# Find stations by elevation
# Stations whose elevation is missing or non-numeric are left out.
high_elevation = [
    s for s in stations
    if (elevation := _elevation_or_none(s)) is not None and elevation > 1000
]

ZipCode

WSN station zip code information.

class python_cimis.models.ZipCode(zip_code: str, station_nbr: str | None = None, connect_date: str = '', disconnect_date: str = '', is_active: bool = True)[source]

Bases: object

Represents a zip code with CIMIS support information.

zip_code: str
station_nbr: str | None = None
connect_date: str = ''
disconnect_date: str = ''
is_active: bool = True
__init__(zip_code: str, station_nbr: str | None = None, connect_date: str = '', disconnect_date: str = '', is_active: bool = True) → None

Usage Examples

# Retrieve the zip codes served by WSN stations
zip_codes = client.get_station_zip_codes()

for zip_code in zip_codes:
    print(f"Zip: {zip_code.zip_code} -> Station: {zip_code.station_nbr}")

# Collect the station number(s) associated with one zip code
target_zip = "95823"
matching_stations = []
for zc in zip_codes:
    if zc.zip_code == target_zip:
        matching_stations.append(zc.station_nbr)

SpatialZipCode

Spatial CIMIS System zip code information.

class python_cimis.models.SpatialZipCode(zip_code: str, city: str = '', county: str = '', connect_date: str = '', disconnect_date: str = '', is_active: bool = True)[source]

Bases: object

Represents a spatial zip code supported by SCS.

zip_code: str
city: str = ''
county: str = ''
connect_date: str = ''
disconnect_date: str = ''
is_active: bool = True
__init__(zip_code: str, city: str = '', county: str = '', connect_date: str = '', disconnect_date: str = '', is_active: bool = True) → None

Usage Examples

# Retrieve the zip codes supported by the Spatial CIMIS System
spatial_zips = client.get_spatial_zip_codes()

for zip_code in spatial_zips:
    print(f"Zip: {zip_code.zip_code}")
    print(f"  City: {zip_code.city}")
    print(f"  County: {zip_code.county}")

# Keep only the zip codes whose county matches, ignoring case
sac_county_zips = []
for sz in spatial_zips:
    if sz.county.lower() == 'sacramento':
        sac_county_zips.append(sz)

Data Access Patterns

Accessing Weather Values

def extract_temperature_data(weather_data):
    """Extract average air temperature readings from weather records.

    Args:
        weather_data: Object exposing ``get_all_records()``; each record
            provides ``date``, ``station`` and a ``data_values`` mapping.

    Returns:
        A list of dicts with the keys ``date``, ``station``,
        ``temperature`` (float), ``unit`` and ``quality`` (the QC flag).
        Records are skipped when the ``'day-air-tmp-avg'`` item is absent,
        has an empty value, is flagged ``'M'``, or is not numeric.
    """
    temperatures = []

    for record in weather_data.get_all_records():
        temp_data = record.data_values.get('day-air-tmp-avg')

        # No reading, empty value, or flagged as missing: nothing to extract
        if not temp_data or not temp_data.value or temp_data.qc == 'M':
            continue

        try:
            temperature = float(temp_data.value)
        except ValueError:
            # A non-numeric value should not abort the whole extraction
            continue

        temperatures.append({
            'date': record.date,
            'station': record.station,
            'temperature': temperature,
            'unit': temp_data.unit,
            'quality': temp_data.qc,
        })

    return temperatures

Data Validation

def validate_data_quality(weather_data, max_missing_percent=10):
    """Summarise how many data values are missing or estimated.

    Args:
        weather_data: Object exposing ``get_all_records()``; each record
            provides a ``data_values`` mapping whose values have a ``qc`` flag.
        max_missing_percent: Highest percentage of ``'M'``-flagged values
            for which the data is still reported as acceptable.

    Returns:
        A dict with ``total_values``, ``missing_count``, ``missing_percent``,
        ``estimated_count``, ``estimated_percent`` and ``quality_acceptable``.
        When there are no values at all, both percentages are ``0.0``;
        check ``total_values`` to tell an empty response from a clean one.
    """
    total_values = 0
    missing_values = 0
    estimated_values = 0

    for record in weather_data.get_all_records():
        # Only the QC flag matters here, so iterate the values directly
        for value in record.data_values.values():
            total_values += 1

            if value.qc == 'M':
                missing_values += 1
            elif value.qc == 'Y':
                estimated_values += 1

    if total_values:
        missing_percent = (missing_values / total_values) * 100
        estimated_percent = (estimated_values / total_values) * 100
    else:
        # Avoid dividing by zero when the response holds no data values
        missing_percent = 0.0
        estimated_percent = 0.0

    return {
        'total_values': total_values,
        'missing_count': missing_values,
        'missing_percent': missing_percent,
        'estimated_count': estimated_values,
        'estimated_percent': estimated_percent,
        'quality_acceptable': missing_percent <= max_missing_percent,
    }

Converting to Other Formats

def weather_data_to_dict(weather_data):
    """Flatten weather records into a list of plain dictionaries.

    Each entry holds ``date``, ``station``, ``julian`` and a ``data``
    mapping of item name to ``{'value', 'qc', 'unit'}``. An ``hour`` key is
    added only when the record has a non-empty hour.
    """
    def _as_dict(rec):
        entry = {
            'date': rec.date,
            'station': rec.station,
            'julian': rec.julian,
            'data': {
                name: {'value': dv.value, 'qc': dv.qc, 'unit': dv.unit}
                for name, dv in rec.data_values.items()
            },
        }

        # Hourly records only: daily records have no (or an empty) hour
        hour = getattr(rec, 'hour', None)
        if hour:
            entry['hour'] = hour

        return entry

    return [_as_dict(rec) for rec in weather_data.get_all_records()]

def weather_data_to_json(weather_data, filename=None):
    """Serialise weather data as indented JSON.

    With a ``filename`` the JSON is written to that file and the filename
    is returned; otherwise the JSON text itself is returned.
    """
    import json

    payload = weather_data_to_dict(weather_data)

    # No target file: hand the JSON text back to the caller
    if not filename:
        return json.dumps(payload, indent=2)

    with open(filename, 'w') as f:
        json.dump(payload, f, indent=2)
    return filename

Data Processing Helpers

Statistical Analysis

def calculate_statistics(weather_data, data_item):
    """Compute count, mean, min, max and range for one data item.

    Values that are absent, empty, flagged ``'M'`` or not numeric are
    ignored. Returns ``None`` when no usable value remains.
    """
    samples = []

    for record in weather_data.get_all_records():
        reading = record.data_values.get(data_item)

        # Skip absent, empty, or missing-flagged readings
        if not reading or not reading.value or reading.qc == 'M':
            continue

        try:
            number = float(reading.value)
        except ValueError:
            continue
        samples.append(number)

    if len(samples) == 0:
        return None

    count = len(samples)
    lowest = min(samples)
    highest = max(samples)

    return {
        'count': count,
        'mean': sum(samples) / count,
        'min': lowest,
        'max': highest,
        'range': highest - lowest
    }

# Usage: summarise two data items from the same response.
# calculate_statistics returns None when an item has no usable numeric
# values, so check the result before indexing into it.
temp_stats = calculate_statistics(weather_data, 'day-air-tmp-avg')
eto_stats = calculate_statistics(weather_data, 'day-eto')

Time Series Processing

def create_time_series(weather_data, data_items):
    """Create time series data for the specified data items.

    Args:
        weather_data: Object exposing ``get_all_records()``; each record
            provides ``date`` (``'YYYY-MM-DD'``) and a ``data_values`` mapping.
        data_items: Iterable of data item names. It may be a one-shot
            iterable (e.g. a generator); duplicate names are ignored.

    Returns:
        A dict mapping each item name to ``{'dates', 'values', 'units'}``,
        where ``dates`` holds ``datetime.date`` objects, ``values`` holds
        floats, and ``units`` is the unit of the first usable value (or
        ``None`` if there was none). Values that are absent, empty, flagged
        ``'M'`` or not numeric are skipped.

    Raises:
        ValueError: If a record's ``date`` does not match ``'%Y-%m-%d'``.
    """
    from datetime import datetime

    # Materialise once: data_items is walked again for every record, which
    # would exhaust a generator. dict.fromkeys drops duplicates (in order)
    # so a repeated name cannot append the same point twice.
    items = list(dict.fromkeys(data_items))

    time_series = {
        item: {'dates': [], 'values': [], 'units': None}
        for item in items
    }

    for record in weather_data.get_all_records():
        record_date = datetime.strptime(record.date, '%Y-%m-%d').date()

        for item in items:
            data_value = record.data_values.get(item)

            # Skip absent, empty, or missing-flagged values
            if not data_value or not data_value.value or data_value.qc == 'M':
                continue

            try:
                value = float(data_value.value)
            except ValueError:
                continue

            series = time_series[item]
            series['dates'].append(record_date)
            series['values'].append(value)

            # Remember the unit of the first usable value
            if not series['units']:
                series['units'] = data_value.unit

    return time_series