Source code for python_cimis.client

"""
Main client class for the Python CIMIS library.
"""

import csv
import json
import requests
from datetime import datetime, date
from pathlib import Path
from typing import Dict, List, Optional, Union, Any
from urllib.parse import urlencode

from .exceptions import (
    CimisAPIError, 
    CimisDataError, 
    CimisConnectionError, 
    CimisAuthenticationError
)
from .models import (
    WeatherData, 
    WeatherProvider, 
    WeatherRecord, 
    DataValue,
    Station, 
    ZipCode, 
    SpatialZipCode
)
from .endpoints import CimisEndpoints
from .utils import FilenameGenerator


[docs] class CimisClient: """ Main client for accessing the California Irrigation Management Information System (CIMIS) API. This client provides methods to: - Fetch weather data by station, zip code, coordinates, or address - Retrieve station information - Get zip code information - Export data to CSV format with all available columns - Auto-generate filenames based on station names and dates """
[docs] def __init__(self, app_key: str, timeout: int = 30): """ Initialize the CIMIS client. Args: app_key: Your CIMIS API application key timeout: Request timeout in seconds (default: 30) """ self.app_key = app_key self.timeout = timeout self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'python-CIMIS/1.0.0', 'Accept': 'application/json' }) # Use centralized endpoints self.endpoints = CimisEndpoints() # Use filename generator for automatic CSV file naming self.filename_generator = FilenameGenerator()
# Properties for backward compatibility @property def BASE_URL(self): """Base URL for CIMIS API (for backward compatibility).""" return self.endpoints.BASE_URL @property def DEFAULT_DAILY_DATA_ITEMS(self): """Default daily data items (for backward compatibility).""" return self.endpoints.DEFAULT_DAILY_DATA_ITEMS @property def DEFAULT_HOURLY_DATA_ITEMS(self): """Default hourly data items (for backward compatibility).""" return self.endpoints.DEFAULT_HOURLY_DATA_ITEMS def _is_coordinate_list(self, targets): """Check if targets contain coordinate strings (for backward compatibility).""" if not targets: return False for target in targets: if isinstance(target, str) and 'lat=' in target and 'lng=' in target: return True return False def _make_request(self, endpoint_key: str, params: Dict[str, Any], **endpoint_kwargs) -> Dict[str, Any]: """ Make a request to the CIMIS API using centralized endpoint management. Args: endpoint_key: Key for the endpoint in CimisEndpoints params: Query parameters **endpoint_kwargs: Parameters for endpoint URL formatting Returns: Parsed JSON response Raises: CimisConnectionError: For connection issues CimisAuthenticationError: For authentication issues CimisAPIError: For API errors """ # Add app key to parameters params['appKey'] = self.app_key # Get URL from centralized endpoints url = self.endpoints.get_url(endpoint_key, **endpoint_kwargs) try: response = self.session.get(url, params=params, timeout=self.timeout) except requests.exceptions.Timeout: raise CimisConnectionError("Request timeout") except requests.exceptions.ConnectionError as e: raise CimisConnectionError(f"Connection error: {e}") except requests.exceptions.RequestException as e: raise CimisConnectionError(f"Request error: {e}") # Handle HTTP errors if response.status_code == 403: raise CimisAuthenticationError("Invalid API key", "ERR1006", 403) elif response.status_code == 404: try: error_data = response.json() error_msg = error_data.get('Message', 'Resource not found') raise CimisAPIError(error_msg, http_code=404) except ValueError: raise CimisAPIError("Resource not found", http_code=404) elif response.status_code != 200: raise CimisAPIError(f"HTTP {response.status_code}: {response.reason}", http_code=response.status_code) try: return response.json() except ValueError as e: raise CimisDataError(f"Invalid JSON response: {e}") def _parse_data_response(self, data: Dict[str, Any]) -> WeatherData: """Parse weather data response into WeatherData object.""" weather_data = WeatherData() if 'Data' not in data or 'Providers' not in data['Data']: return weather_data for provider_data in data['Data']['Providers']: provider = WeatherProvider( name=provider_data.get('Name', ''), type=provider_data.get('Type', ''), owner=provider_data.get('Owner', '') ) for record_data in provider_data.get('Records', []): record = WeatherRecord( date=record_data.get('Date', ''), julian=record_data.get('Julian', ''), station=record_data.get('Station'), standard=record_data.get('Standard', 'english'), zip_codes=record_data.get('ZipCodes', ''), scope=record_data.get('Scope', 'daily'), hour=record_data.get('Hour') ) # Parse data values for key, value in record_data.items(): if isinstance(value, dict) and 'Value' in value: data_value = DataValue( value=value.get('Value'), qc=value.get('Qc', ' '), unit=value.get('Unit', '') ) record.data_values[key] = data_value provider.records.append(record) weather_data.providers.append(provider) return weather_data def _parse_stations_response(self, data: Dict[str, Any]) -> List[Station]: """Parse stations response into list of Station objects.""" stations = [] for station_data in data.get('Stations', []): station = Station( station_nbr=station_data.get('StationNbr', ''), name=station_data.get('Name', ''), city=station_data.get('City', ''), regional_office=station_data.get('RegionalOffice'), county=station_data.get('County'), connect_date=station_data.get('ConnectDate', ''), disconnect_date=station_data.get('DisconnectDate', ''), is_active=station_data.get('IsActive', 'True').lower() == 'true', is_eto_station=station_data.get('IsEtoStation', 'True').lower() == 'true', elevation=station_data.get('Elevation', ''), ground_cover=station_data.get('GroundCover', ''), hms_latitude=station_data.get('HmsLatitude', ''), hms_longitude=station_data.get('HmsLongitude', ''), zip_codes=station_data.get('ZipCodes', []), siting_desc=station_data.get('SitingDesc', '') ) stations.append(station) return stations
[docs] def get_data(self, targets: Union[str, List[str]], start_date: Union[str, date, datetime], end_date: Union[str, date, datetime], data_items: Optional[List[str]] = None, unit_of_measure: str = 'E', prioritize_scs: bool = True) -> WeatherData: """ Get weather data from CIMIS. Args: targets: Station numbers, zip codes, coordinates, or addresses start_date: Start date (YYYY-MM-DD format, date, or datetime) end_date: End date (YYYY-MM-DD format, date, or datetime) data_items: List of data items to retrieve (uses default if None) unit_of_measure: 'E' for English or 'M' for Metric prioritize_scs: Whether to prioritize SCS data for zip codes Returns: WeatherData object containing the response """ # Use data_items if provided, otherwise use all available items if data_items is None: data_items = [] # Empty list will get all available data items params = self.endpoints.prepare_data_params( targets=targets, start_date=start_date, end_date=end_date, items=data_items, measure_unit=unit_of_measure, prioritize_sri=(unit_of_measure == 'M'), # Use SRI for metric prioritize_scs=prioritize_scs ) response_data = self._make_request('data', params) return self.endpoints.parse_data_response(response_data)
[docs] def get_daily_data(self, targets: Union[str, List[str]], start_date: Union[str, date, datetime], end_date: Union[str, date, datetime], data_items: Optional[List[str]] = None, unit_of_measure: str = 'Metric', prioritize_scs: bool = True, csv: bool = False, filename: Optional[Union[str, Path]] = None) -> Union[WeatherData, tuple[WeatherData, str]]: """ Get daily weather data from CIMIS. This method returns only daily data records with all daily data items by default. Args: targets: Station numbers, zip codes, coordinates, or addresses start_date: Start date for data retrieval end_date: End date for data retrieval data_items: List of data items to retrieve (uses default daily items if None) unit_of_measure: 'Metric' for Metric units (default) or 'English' for English units prioritize_scs: Whether to prioritize SCS data for zip codes csv: If True, automatically export to CSV with auto-generated filename filename: Custom filename for CSV export (only used if csv=True) Returns: WeatherData object containing only daily records if csv=False, or tuple of (WeatherData, csv_filename) if csv=True """ # Use default daily data items if none specified if data_items is None: data_items = self.endpoints.get_daily_data_items() # Convert unit parameter to API format unit_code = 'E' if unit_of_measure.lower() == 'english' else 'M' weather_data = self.get_data(targets, start_date, end_date, data_items, unit_code, prioritize_scs) # Filter to only daily records (remove any hourly records that might be included) daily_weather_data = self._filter_daily_only(weather_data) if csv: csv_filename = self.export_to_csv(daily_weather_data, filename) return daily_weather_data, csv_filename return daily_weather_data
[docs] def get_hourly_data(self, targets: Union[str, List[str]], start_date: Union[str, date, datetime], end_date: Union[str, date, datetime], data_items: Optional[List[str]] = None, unit_of_measure: str = 'Metric', csv: bool = False, filename: Optional[Union[str, Path]] = None) -> Union[WeatherData, tuple[WeatherData, str]]: """ Get hourly weather data from CIMIS. Note: Hourly data is only available from WSN stations, not SCS. Returns only hourly data records (no daily data mixed in). Args: targets: Station numbers, zip codes, coordinates, or addresses start_date: Start date for data retrieval end_date: End date for data retrieval data_items: List of data items to retrieve (uses default if None) unit_of_measure: 'Metric' for Metric units (default) or 'English' for English units csv: If True, automatically export to CSV with auto-generated filename (hourly only) filename: Custom filename for CSV export (only used if csv=True) Returns: WeatherData object if csv=False, or tuple of (WeatherData, csv_filename) if csv=True Note: WeatherData will contain only hourly records """ if data_items is None: data_items = [] # Empty list will get all available data items # Convert unit parameter to API format unit_code = 'E' if unit_of_measure.lower() == 'english' else 'M' weather_data = self.get_data(targets, start_date, end_date, data_items, unit_code, prioritize_scs=False) # Filter to only hourly records (remove any daily records that might be included) hourly_weather_data = self._filter_hourly_only(weather_data) if csv: # Force hourly-only CSV export (no daily file creation) csv_filename = self.export_to_csv(hourly_weather_data, filename, separate_daily_hourly=False) return hourly_weather_data, csv_filename return hourly_weather_data
def _filter_hourly_only(self, weather_data) -> 'WeatherData': """ Filter weather data to include only hourly records. Args: weather_data: WeatherData object with mixed daily/hourly records Returns: WeatherData object containing only hourly records """ from .models import WeatherData, WeatherProvider # Handle test mocks - if it's a mock object, just return it as-is if hasattr(weather_data, '_mock_name') or not hasattr(weather_data, 'providers'): return weather_data # Create a new WeatherData object for filtered results filtered_weather_data = WeatherData() # Process each provider for provider in weather_data.providers: # Filter records to only include hourly ones hourly_records = [record for record in provider.records if record.scope == 'hourly'] # Only include provider if it has hourly records if hourly_records: # Create a new provider with only hourly records filtered_provider = WeatherProvider( name=provider.name, type=provider.type, owner=provider.owner, records=hourly_records ) filtered_weather_data.providers.append(filtered_provider) return filtered_weather_data def _filter_daily_only(self, weather_data) -> 'WeatherData': """ Filter weather data to include only daily records. Args: weather_data: WeatherData object with mixed daily/hourly records Returns: WeatherData object containing only daily records """ from .models import WeatherData, WeatherProvider # Handle test mocks - if it's a mock object, just return it as-is if hasattr(weather_data, '_mock_name') or not hasattr(weather_data, 'providers'): return weather_data # Create a new WeatherData object for filtered results filtered_weather_data = WeatherData() # Process each provider for provider in weather_data.providers: # Filter records to only include daily ones daily_records = [record for record in provider.records if record.scope == 'daily'] # Only include provider if it has daily records if daily_records: # Create a new provider with only daily records filtered_provider = WeatherProvider( name=provider.name, type=provider.type, owner=provider.owner, records=daily_records ) filtered_weather_data.providers.append(filtered_provider) return filtered_weather_data
[docs] def get_stations(self, station_number: Optional[str] = None) -> List[Station]: """ Get station information. Args: station_number: Specific station number (gets all stations if None) Returns: List of Station objects """ params = {} endpoint_kwargs = {} if station_number: endpoint_kwargs['station_id'] = station_number response_data = self._make_request('station', params, **endpoint_kwargs) else: response_data = self._make_request('stations', params) return self.endpoints.parse_stations_response(response_data)
[docs] def get_station_zip_codes(self, zip_code: Optional[str] = None) -> List[ZipCode]: """ Get station zip code information. Args: zip_code: Specific zip code (gets all zip codes if None) Returns: List of ZipCode objects """ params = {} endpoint_kwargs = {} if zip_code: endpoint_kwargs['zip_code'] = zip_code response_data = self._make_request('zip_code', params, **endpoint_kwargs) else: response_data = self._make_request('zip_codes', params) return self.endpoints.parse_zip_codes_response(response_data)
[docs] def get_spatial_zip_codes(self, zip_code: Optional[str] = None) -> List[SpatialZipCode]: """ Get spatial zip code information. Args: zip_code: Specific zip code (gets all zip codes if None) Returns: List of SpatialZipCode objects """ params = {} endpoint_kwargs = {} if zip_code: endpoint_kwargs['zip_code'] = zip_code response_data = self._make_request('spatial_zip_code', params, **endpoint_kwargs) else: response_data = self._make_request('spatial_zip_codes', params) return self.endpoints.parse_spatial_zip_codes_response(response_data)
[docs] def export_to_csv(self, weather_data: WeatherData, filename: Optional[Union[str, Path]] = None, include_all_columns: bool = True, separate_daily_hourly: bool = True) -> str: """ Export weather data to CSV file with properly formatted data columns. Uses automatic filename generation based on station names and dates by default. Args: weather_data: WeatherData object to export filename: Output CSV filename (auto-generated if None) include_all_columns: Whether to include all possible data columns separate_daily_hourly: Whether to separate daily and hourly data into different files Returns: Path to the created CSV file(s) """ # Generate filename automatically if not provided if filename is None: filename = self.filename_generator.generate_for_weather_data(weather_data) filename = Path(filename) all_records = weather_data.get_all_records() if not all_records: raise CimisDataError("No data records to export") # Separate records by scope if requested if separate_daily_hourly: daily_records = [] hourly_records = [] for record in all_records: # Handle both WeatherRecord objects and dict objects (for backward compatibility) if hasattr(record, 'scope'): scope = record.scope elif isinstance(record, dict): scope = record.get('scope', 'daily') # Default to daily for legacy data else: scope = 'daily' # Default fallback if scope == 'daily': daily_records.append(record) elif scope == 'hourly': hourly_records.append(record) if daily_records and hourly_records: # Create separate files for daily and hourly data daily_filename = filename.with_name(filename.stem + '_daily' + filename.suffix) hourly_filename = filename.with_name(filename.stem + '_hourly' + filename.suffix) self._export_records_to_csv(weather_data, daily_records, daily_filename, 'daily') self._export_records_to_csv(weather_data, hourly_records, hourly_filename, 'hourly') return f"Daily: {daily_filename}, Hourly: {hourly_filename}" elif daily_records: return self._export_records_to_csv(weather_data, daily_records, filename, 'daily') elif hourly_records: return self._export_records_to_csv(weather_data, hourly_records, filename, 'hourly') # If not separating or only one type, export all together with scope-specific columns return self._export_records_to_csv(weather_data, all_records, filename, 'mixed')
def _export_records_to_csv(self, weather_data: WeatherData, records: List, filename: Path, scope_type: str) -> str: """Helper method to export records to CSV with appropriate columns.""" # Collect data items relevant to the scope type all_data_items = set() for record in records: if hasattr(record, 'data_values'): # WeatherRecord object all_data_items.update(record.data_values.keys()) else: # Dict object (legacy support) - look for data items in the dict for key, value in record.items(): if isinstance(value, dict) and 'Value' in value: all_data_items.add(key) # Filter data items based on scope type if scope_type == 'daily': # Only include daily data items filtered_data_items = {item for item in all_data_items if item.startswith('Day') or not item.startswith(('Day', 'Hly'))} elif scope_type == 'hourly': # Only include hourly data items filtered_data_items = {item for item in all_data_items if item.startswith('Hly') or not item.startswith(('Day', 'Hly'))} else: # Mixed - include all filtered_data_items = all_data_items # Sort data items for consistent column ordering sorted_data_items = sorted(filtered_data_items) # Base columns with datetime objects base_columns = [ 'Provider_Name', 'Provider_Type', 'Date', 'Julian', 'Station', 'Standard', 'ZipCodes', 'Scope', 'DateTime_Object' ] # Add Hour column only if we have hourly data if scope_type in ['hourly', 'mixed']: base_columns.append('Hour') base_columns.append('DateTime_Full') # Data value columns (value, qc, unit for each data item) data_columns = [] for item in sorted_data_items: # Remove "Hly" prefix for hourly columns to make them cleaner clean_item_name = item if item.startswith('Hly'): clean_item_name = item[3:] # Remove "Hly" prefix data_columns.extend([ f"{clean_item_name}_Value", f"{clean_item_name}_QC", f"{clean_item_name}_Unit" ]) all_columns = base_columns + data_columns # Ensure directory exists filename.parent.mkdir(parents=True, exist_ok=True) # Write CSV with open(filename, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=all_columns) writer.writeheader() for provider in weather_data.providers: for record in provider.records: # Skip records not in our filtered list if record not in records: continue # Handle both WeatherRecord objects and dict objects if hasattr(record, 'date'): # WeatherRecord object record_date = record.date record_julian = record.julian record_station = record.station or '' record_standard = record.standard record_zip_codes = record.zip_codes record_scope = record.scope record_hour = record.hour or '' record_data_values = record.data_values else: # Dict object (legacy support) record_date = record.get('Date', '') record_julian = record.get('Julian', '') record_station = record.get('Station', '') record_standard = record.get('Standard', 'english') record_zip_codes = record.get('ZipCodes', '') record_scope = record.get('scope', 'daily') record_hour = record.get('Hour', '') record_data_values = record # For dict objects, the data is directly in the dict row = { 'Provider_Name': provider.name, 'Provider_Type': provider.type, 'Date': record_date, 'Julian': record_julian, 'Station': record_station, 'Standard': record_standard, 'ZipCodes': record_zip_codes, 'Scope': record_scope } # Create datetime object from date try: if record_date: # Parse date in format YYYY-MM-DD date_obj = datetime.strptime(record_date, '%Y-%m-%d') row['DateTime_Object'] = date_obj.isoformat() else: row['DateTime_Object'] = '' except ValueError: row['DateTime_Object'] = record_date or '' # Add Hour column and full datetime only if needed if 'Hour' in all_columns: row['Hour'] = record_hour # Create full datetime with hour if available if 'DateTime_Full' in all_columns: try: if record_date and record_hour: # Parse date and add hour date_obj = datetime.strptime(record_date, '%Y-%m-%d') # Hour format is typically "0100", "0200", etc. hour_str = str(record_hour).zfill(4) hour = int(hour_str[:2]) minute = int(hour_str[2:]) if len(hour_str) > 2 else 0 full_datetime = date_obj.replace(hour=hour, minute=minute) row['DateTime_Full'] = full_datetime.isoformat() else: row['DateTime_Full'] = '' except (ValueError, TypeError): row['DateTime_Full'] = f"{record_date or ''} {record_hour or ''}" # Add data values - only include items that are in our filtered set for item in sorted_data_items: # Remove "Hly" prefix for column names to make them cleaner clean_item_name = item if item.startswith('Hly'): clean_item_name = item[3:] # Remove "Hly" prefix if hasattr(record, 'data_values'): # WeatherRecord object data_value = record_data_values.get(item) if data_value: row[f"{clean_item_name}_Value"] = data_value.value or '' row[f"{clean_item_name}_QC"] = data_value.qc row[f"{clean_item_name}_Unit"] = data_value.unit else: row[f"{clean_item_name}_Value"] = '' row[f"{clean_item_name}_QC"] = '' row[f"{clean_item_name}_Unit"] = '' else: # Dict object (legacy support) item_data = record_data_values.get(item) if item_data and isinstance(item_data, dict): row[f"{clean_item_name}_Value"] = item_data.get('Value', '') row[f"{clean_item_name}_QC"] = item_data.get('QC', '') row[f"{clean_item_name}_Unit"] = item_data.get('Unit', '') else: row[f"{clean_item_name}_Value"] = '' row[f"{clean_item_name}_QC"] = '' row[f"{clean_item_name}_Unit"] = '' writer.writerow(row) return str(filename)
[docs] def export_stations_to_csv(self, stations: List[Station], filename: Optional[Union[str, Path]] = None) -> str: """ Export station information to CSV file. Uses automatic filename generation based on stations data by default. Args: stations: List of Station objects to export filename: Output CSV filename (auto-generated if None) Returns: Path to the created CSV file """ # Generate filename automatically if not provided if filename is None: filename = self.filename_generator.generate_for_stations(stations) filename = Path(filename) if not stations: raise CimisDataError("No station data to export") columns = [ 'StationNbr', 'Name', 'City', 'RegionalOffice', 'County', 'ConnectDate', 'DisconnectDate', 'IsActive', 'IsEtoStation', 'Elevation', 'GroundCover', 'HmsLatitude', 'HmsLongitude', 'Latitude', 'Longitude', 'ZipCodes', 'SitingDesc' ] # Ensure directory exists filename.parent.mkdir(parents=True, exist_ok=True) with open(filename, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=columns) writer.writeheader() for station in stations: row = { 'StationNbr': station.station_nbr, 'Name': station.name, 'City': station.city, 'RegionalOffice': station.regional_office or '', 'County': station.county or '', 'ConnectDate': station.connect_date, 'DisconnectDate': station.disconnect_date, 'IsActive': station.is_active, 'IsEtoStation': station.is_eto_station, 'Elevation': station.elevation, 'GroundCover': station.ground_cover, 'HmsLatitude': station.hms_latitude, 'HmsLongitude': station.hms_longitude, 'Latitude': station.latitude or '', 'Longitude': station.longitude or '', 'ZipCodes': ', '.join(station.zip_codes), 'SitingDesc': station.siting_desc } writer.writerow(row) return str(filename)
[docs] def get_data_and_export_csv(self, targets: Union[str, List[str]], start_date: Union[str, date, datetime], end_date: Union[str, date, datetime], filename: Optional[Union[str, Path]] = None, data_items: Optional[List[str]] = None, unit_of_measure: str = 'E', prioritize_scs: bool = True) -> tuple[WeatherData, str]: """ Convenience method to get data and immediately export to CSV. Uses automatic filename generation based on station names and dates by default. Args: targets: Station numbers, zip codes, coordinates, or addresses start_date: Start date end_date: End date filename: Output CSV filename (auto-generated if None) data_items: List of data items to retrieve (uses default if None) unit_of_measure: 'E' for English or 'M' for Metric prioritize_scs: Whether to prioritize SCS data for zip codes Returns: Tuple of (WeatherData object, path to created CSV file) """ weather_data = self.get_data(targets, start_date, end_date, data_items, unit_of_measure, prioritize_scs) csv_path = self.export_to_csv(weather_data, filename) return weather_data, csv_path