Utilities

Utility functions and helper classes for the Python CIMIS Client.

FilenameGenerator

Utility class for generating intelligent CSV filenames.

class python_cimis.utils.FilenameGenerator(base_directory: str | Path | None = None)[source]

Bases: object

Generates intelligent filenames for CIMIS data exports based on: - Station names and numbers - Date ranges - Data types (weather data, stations, etc.)

__init__(base_directory: str | Path | None = None)[source]

Initialize the filename generator.

Parameters:

base_directory – Base directory for file output (uses current directory if None)

generate_weather_filename(weather_data: WeatherData) str[source]

Generate filename for weather data export.

Parameters:

weather_data – WeatherData object containing the data

Returns:

Generated filename with full path

generate_stations_filename(stations: List[Station]) str[source]

Generate filename for station data export.

Parameters:

stations – List of Station objects

Returns:

Generated filename with full path

generate_zip_codes_filename(zip_codes: List[str]) str[source]

Generate filename for zip code data export.

Parameters:

zip_codes – List of zip codes

Returns:

Generated filename with full path

generate_custom_filename(data_type: str, identifiers: List[str] | None = None, date_range: str | None = None) str[source]

Generate custom filename with specified components.

Parameters:
  • data_type – Type of data (e.g., ‘weather’, ‘stations’, ‘hourly’)

  • identifiers – List of identifiers (station numbers, zip codes, etc.)

  • date_range – Date range string

Returns:

Generated filename with full path

generate_for_weather_data(weather_data: WeatherData) str[source]

Generate filename for weather data export. Alias for generate_weather_filename for compatibility.

generate_for_stations(stations: List[Station]) str[source]

Generate filename for stations data export. Alias for generate_stations_filename for compatibility.

set_base_directory(directory: str | Path) None[source]

Set the base directory for file output.

Usage Examples

from python_cimis.utils import FilenameGenerator

# Generate filename from weather data
weather_data = client.get_daily_data(targets=[2], start_date="2023-06-01", end_date="2023-06-07")
filename = FilenameGenerator.generate(weather_data)
print(filename)  # Output: "Station2_FivePoints_20230601_to_20230607.csv"

# Multiple stations
weather_data = client.get_daily_data(targets=[2, 8, 127], start_date="2023-06-01", end_date="2023-06-07")
filename = FilenameGenerator.generate(weather_data)
print(filename)  # Output: "MultiStation_20230601_to_20230607.csv"

Filename Patterns

The FilenameGenerator creates filenames using these patterns:

Single Station: Station{number}_{name}_{start_date}_to_{end_date}.csv

Multiple Stations: MultiStation_{start_date}_to_{end_date}.csv

Date Format: YYYYMMDD (e.g., 20230601)

Examples:

Station2_FivePoints_20230601_to_20230607.csv
Station127_Fresno_20230101_to_20231231.csv
MultiStation_20230601_to_20230607.csv

Helper Functions

Data Validation

def validate_date_range(start_date, end_date, max_days=365):
    """Validate date range for API requests."""
    from datetime import datetime, timedelta

    # Convert strings to date objects if needed
    if isinstance(start_date, str):
        start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
    if isinstance(end_date, str):
        end_date = datetime.strptime(end_date, '%Y-%m-%d').date()

    # Validate range
    if start_date > end_date:
        raise ValueError("Start date must be before end date")

    date_diff = (end_date - start_date).days
    if date_diff > max_days:
        raise ValueError(f"Date range too large: {date_diff} days (max: {max_days})")

    return True

def validate_targets(targets):
    """Validate target list for API requests."""
    if not targets:
        raise ValueError("At least one target must be specified")

    valid_targets = []
    for target in targets:
        if isinstance(target, int):
            # Station number
            if target < 1 or target > 999:
                raise ValueError(f"Invalid station number: {target}")
            valid_targets.append(str(target))
        elif isinstance(target, str):
            # Zip code, coordinates, or address
            valid_targets.append(target)
        else:
            raise ValueError(f"Invalid target type: {type(target)}")

    return valid_targets

Data Conversion

def convert_units(value, from_unit, to_unit):
    """Convert between different units."""
    conversions = {
        # Temperature conversions
        ('C', 'F'): lambda x: x * 9/5 + 32,
        ('F', 'C'): lambda x: (x - 32) * 5/9,

        # Length conversions
        ('mm', 'in'): lambda x: x * 0.0393701,
        ('in', 'mm'): lambda x: x * 25.4,
        ('m', 'ft'): lambda x: x * 3.28084,
        ('ft', 'm'): lambda x: x * 0.3048,

        # Speed conversions
        ('m/s', 'mph'): lambda x: x * 2.23694,
        ('mph', 'm/s'): lambda x: x * 0.44704,
        ('km/h', 'mph'): lambda x: x * 0.621371,
        ('mph', 'km/h'): lambda x: x * 1.60934,
    }

    conversion_key = (from_unit, to_unit)
    if conversion_key in conversions:
        return conversions[conversion_key](float(value))
    else:
        raise ValueError(f"Conversion from {from_unit} to {to_unit} not supported")

def celsius_to_fahrenheit(celsius):
    """Convert Celsius to Fahrenheit."""
    return celsius * 9/5 + 32

def fahrenheit_to_celsius(fahrenheit):
    """Convert Fahrenheit to Celsius."""
    return (fahrenheit - 32) * 5/9

def mm_to_inches(mm):
    """Convert millimeters to inches."""
    return mm * 0.0393701

def inches_to_mm(inches):
    """Convert inches to millimeters."""
    return inches * 25.4

Data Processing

def extract_data_item(weather_data, data_item, include_qc=False):
    """Extract a specific data item from weather data."""
    extracted_data = []

    for record in weather_data.get_all_records():
        data_value = record.data_values.get(data_item)

        if data_value and data_value.value:
            entry = {
                'date': record.date,
                'station': record.station,
                'value': data_value.value,
                'unit': data_value.unit
            }

            if include_qc:
                entry['qc'] = data_value.qc

            extracted_data.append(entry)

    return extracted_data

def aggregate_by_station(weather_data, aggregation_func='mean'):
    """Aggregate weather data by station."""
    from collections import defaultdict

    station_data = defaultdict(list)

    # Group data by station
    for record in weather_data.get_all_records():
        station_data[record.station].append(record)

    aggregated = {}

    for station, records in station_data.items():
        aggregated[station] = {
            'record_count': len(records),
            'date_range': {
                'start': min(r.date for r in records),
                'end': max(r.date for r in records)
            },
            'data_items': {}
        }

        # Aggregate each data item
        data_items = set()
        for record in records:
            data_items.update(record.data_values.keys())

        for item in data_items:
            values = []
            for record in records:
                data_value = record.data_values.get(item)
                if data_value and data_value.value and data_value.qc != 'M':
                    try:
                        values.append(float(data_value.value))
                    except ValueError:
                        continue

            if values:
                if aggregation_func == 'mean':
                    result = sum(values) / len(values)
                elif aggregation_func == 'sum':
                    result = sum(values)
                elif aggregation_func == 'min':
                    result = min(values)
                elif aggregation_func == 'max':
                    result = max(values)
                else:
                    result = values

                aggregated[station]['data_items'][item] = {
                    'value': result,
                    'count': len(values),
                    'unit': records[0].data_values[item].unit if records[0].data_values.get(item) else None
                }

    return aggregated

Cache Management

import os
import pickle
import json
from datetime import datetime, timedelta

class CacheManager:
    """Simple cache manager for CIMIS data."""

    def __init__(self, cache_dir="cimis_cache", max_age_hours=24):
        self.cache_dir = cache_dir
        self.max_age_hours = max_age_hours
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_path(self, cache_key, format='pickle'):
        """Get the cache file path."""
        extension = 'pkl' if format == 'pickle' else 'json'
        return os.path.join(self.cache_dir, f"{cache_key}.{extension}")

    def _generate_key(self, **params):
        """Generate cache key from parameters."""
        sorted_params = sorted(params.items())
        key_parts = []
        for k, v in sorted_params:
            if isinstance(v, list):
                v = '_'.join(str(x) for x in sorted(v))
            key_parts.append(f"{k}_{v}")
        return '_'.join(key_parts)

    def get(self, **params):
        """Get cached data."""
        cache_key = self._generate_key(**params)
        cache_path = self._get_cache_path(cache_key)

        if os.path.exists(cache_path):
            # Check if cache is still fresh
            file_time = datetime.fromtimestamp(os.path.getmtime(cache_path))
            if datetime.now() - file_time < timedelta(hours=self.max_age_hours):
                try:
                    with open(cache_path, 'rb') as f:
                        return pickle.load(f)
                except Exception:
                    # Remove corrupted cache file
                    os.remove(cache_path)

        return None

    def set(self, data, **params):
        """Set cached data."""
        cache_key = self._generate_key(**params)
        cache_path = self._get_cache_path(cache_key)

        try:
            with open(cache_path, 'wb') as f:
                pickle.dump(data, f)
        except Exception as e:
            print(f"Warning: Could not cache data: {e}")

    def clear_expired(self):
        """Clear expired cache files."""
        if not os.path.exists(self.cache_dir):
            return

        cutoff_time = datetime.now() - timedelta(hours=self.max_age_hours)

        for filename in os.listdir(self.cache_dir):
            file_path = os.path.join(self.cache_dir, filename)
            file_time = datetime.fromtimestamp(os.path.getmtime(file_path))

            if file_time < cutoff_time:
                try:
                    os.remove(file_path)
                except Exception:
                    pass

    def clear_all(self):
        """Clear all cache files."""
        if not os.path.exists(self.cache_dir):
            return

        for filename in os.listdir(self.cache_dir):
            file_path = os.path.join(self.cache_dir, filename)
            try:
                os.remove(file_path)
            except Exception:
                pass

Configuration Management

import json
import os
from typing import Dict, Any, Optional

class ConfigManager:
    """Configuration manager for CIMIS client settings."""

    def __init__(self, config_file: str = "cimis_config.json"):
        self.config_file = config_file
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from file."""
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r') as f:
                    return json.load(f)
            except Exception:
                pass

        # Return default configuration
        return {
            'api_key': os.getenv('CIMIS_API_KEY'),
            'timeout': 30,
            'max_retries': 3,
            'cache_enabled': True,
            'cache_max_age_hours': 24,
            'default_unit_of_measure': 'M',
            'default_prioritize_scs': False
        }

    def save_config(self):
        """Save configuration to file."""
        try:
            with open(self.config_file, 'w') as f:
                json.dump(self.config, f, indent=2)
        except Exception as e:
            print(f"Warning: Could not save configuration: {e}")

    def get(self, key: str, default: Any = None) -> Any:
        """Get configuration value."""
        return self.config.get(key, default)

    def set(self, key: str, value: Any):
        """Set configuration value."""
        self.config[key] = value

    def update(self, **kwargs):
        """Update multiple configuration values."""
        self.config.update(kwargs)

Batch Processing

from datetime import date, timedelta
from typing import List, Generator, Tuple
import time

def split_date_range(start_date: date, end_date: date, chunk_days: int = 30) -> Generator[Tuple[date, date], None, None]:
    """Split a date range into smaller chunks."""
    current_date = start_date

    while current_date <= end_date:
        chunk_end = min(current_date + timedelta(days=chunk_days), end_date)
        yield current_date, chunk_end
        current_date = chunk_end + timedelta(days=1)

def split_target_list(targets: List, batch_size: int = 10) -> Generator[List, None, None]:
    """Split a target list into smaller batches."""
    for i in range(0, len(targets), batch_size):
        yield targets[i:i + batch_size]

def batch_process_with_rate_limit(
    client,
    targets: List,
    start_date: date,
    end_date: date,
    batch_size: int = 10,
    chunk_days: int = 30,
    delay_seconds: float = 1.0
):
    """Process large requests in batches with rate limiting."""
    all_records = []

    # Split targets into batches
    for target_batch in split_target_list(targets, batch_size):
        # Split date range into chunks
        for chunk_start, chunk_end in split_date_range(start_date, end_date, chunk_days):
            try:
                weather_data = client.get_daily_data(
                    targets=target_batch,
                    start_date=chunk_start,
                    end_date=chunk_end
                )

                all_records.extend(weather_data.get_all_records())

                # Rate limiting
                time.sleep(delay_seconds)

            except Exception as e:
                print(f"Error processing batch {target_batch} for {chunk_start} to {chunk_end}: {e}")
                continue

    return all_records

Quality Control

def assess_data_quality(weather_data, quality_threshold=0.8):
    """Assess the quality of weather data."""
    total_points = 0
    good_points = 0
    missing_points = 0
    estimated_points = 0

    quality_by_item = {}

    for record in weather_data.get_all_records():
        for item, value in record.data_values.items():
            total_points += 1

            if item not in quality_by_item:
                quality_by_item[item] = {'total': 0, 'good': 0, 'missing': 0, 'estimated': 0}

            quality_by_item[item]['total'] += 1

            if value.qc == ' ':  # Good data
                good_points += 1
                quality_by_item[item]['good'] += 1
            elif value.qc == 'M':  # Missing
                missing_points += 1
                quality_by_item[item]['missing'] += 1
            elif value.qc == 'Y':  # Estimated
                estimated_points += 1
                quality_by_item[item]['estimated'] += 1

    overall_quality = good_points / total_points if total_points > 0 else 0

    # Calculate quality percentage for each data item
    for item in quality_by_item:
        item_total = quality_by_item[item]['total']
        quality_by_item[item]['quality_percentage'] = (
            quality_by_item[item]['good'] / item_total * 100 if item_total > 0 else 0
        )

    quality_report = {
        'overall_quality_percentage': overall_quality * 100,
        'meets_threshold': overall_quality >= quality_threshold,
        'total_data_points': total_points,
        'good_data_points': good_points,
        'missing_data_points': missing_points,
        'estimated_data_points': estimated_points,
        'quality_by_item': quality_by_item
    }

    return quality_report

def filter_by_quality(weather_data, min_quality='good'):
    """Filter weather data by quality level."""
    quality_levels = {
        'good': [' '],           # Only measured/calculated
        'acceptable': [' ', 'Y'], # Measured/calculated + estimated
        'all': [' ', 'Y', 'M']   # Include missing data
    }

    allowed_qc_flags = quality_levels.get(min_quality, [' '])

    filtered_records = []

    for record in weather_data.get_all_records():
        # Check if record has any data meeting quality criteria
        has_good_data = False

        for item, value in record.data_values.items():
            if value.qc in allowed_qc_flags and value.value:
                has_good_data = True
                break

        if has_good_data:
            filtered_records.append(record)

    return filtered_records

Performance Monitoring

import time
from contextlib import contextmanager

@contextmanager
def measure_time(operation_name="Operation"):
    """Context manager to measure execution time."""
    start_time = time.time()
    try:
        yield
    finally:
        end_time = time.time()
        duration = end_time - start_time
        print(f"{operation_name} took {duration:.2f} seconds")

def benchmark_api_call(client, **kwargs):
    """Benchmark an API call."""
    with measure_time("API call"):
        result = client.get_daily_data(**kwargs)

    record_count = len(result.get_all_records())
    print(f"Retrieved {record_count} records")

    return result

Usage Examples

Complete Utility Usage

from python_cimis import CimisClient
from python_cimis.utils import FilenameGenerator
import os

# Initialize client with configuration
config = ConfigManager()
client = CimisClient(
    app_key=config.get('api_key'),
    timeout=config.get('timeout', 30)
)

# Setup caching
cache = CacheManager(max_age_hours=config.get('cache_max_age_hours', 24))

# Get data with caching
cache_params = {
    'targets': [2, 8],
    'start_date': '2023-06-01',
    'end_date': '2023-06-07'
}

weather_data = cache.get(**cache_params)
if not weather_data:
    print("Cache miss - fetching from API")
    with measure_time("Data retrieval"):
        weather_data = client.get_daily_data(**cache_params)
    cache.set(weather_data, **cache_params)
else:
    print("Cache hit - using cached data")

# Assess data quality
quality_report = assess_data_quality(weather_data)
print(f"Data quality: {quality_report['overall_quality_percentage']:.1f}%")

# Generate filename and export
filename = FilenameGenerator.generate(weather_data)
csv_file = client.export_to_csv(weather_data, filename=filename)
print(f"Exported to: {csv_file}")

This utilities module provides essential helper functions for working efficiently with the Python CIMIS Client library.