Utilities
Utility functions and helper classes for the Python CIMIS Client.
FilenameGenerator
Utility class for generating intelligent CSV filenames.
- class python_cimis.utils.FilenameGenerator(base_directory: str | Path | None = None)[source]
Bases: object
Generates intelligent filenames for CIMIS data exports based on station names and numbers, date ranges, and data types (weather data, stations, etc.).
- __init__(base_directory: str | Path | None = None)[source]
Initialize the filename generator.
- Parameters:
base_directory – Base directory for file output (uses current directory if None)
- generate_weather_filename(weather_data: WeatherData) → str[source]
Generate filename for weather data export.
- Parameters:
weather_data – WeatherData object containing the data
- Returns:
Generated filename with full path
- generate_stations_filename(stations: List[Station]) → str[source]
Generate filename for station data export.
- Parameters:
stations – List of Station objects
- Returns:
Generated filename with full path
- generate_zip_codes_filename(zip_codes: List[str]) → str[source]
Generate filename for zip code data export.
- Parameters:
zip_codes – List of zip codes
- Returns:
Generated filename with full path
- generate_custom_filename(data_type: str, identifiers: List[str] | None = None, date_range: str | None = None) → str[source]
Generate custom filename with specified components.
- Parameters:
data_type – Type of data (e.g., ‘weather’, ‘stations’, ‘hourly’)
identifiers – List of identifiers (station numbers, zip codes, etc.)
date_range – Date range string
- Returns:
Generated filename with full path
- generate_for_weather_data(weather_data: WeatherData) → str[source]
Generate filename for weather data export. Alias for generate_weather_filename for compatibility.
Usage Examples
from python_cimis.utils import FilenameGenerator
# FilenameGenerator exposes instance methods, so create a generator first.
# `client` is assumed to be an already-configured CimisClient.
generator = FilenameGenerator()

# Generate filename from weather data
weather_data = client.get_daily_data(targets=[2], start_date="2023-06-01", end_date="2023-06-07")
filename = generator.generate_weather_filename(weather_data)
print(filename) # Path ending in: "Station2_FivePoints_20230601_to_20230607.csv"

# Multiple stations
weather_data = client.get_daily_data(targets=[2, 8, 127], start_date="2023-06-01", end_date="2023-06-07")
filename = generator.generate_weather_filename(weather_data)
print(filename) # Path ending in: "MultiStation_20230601_to_20230607.csv"
Filename Patterns
The FilenameGenerator creates filenames using these patterns:
Single Station:
Station{number}_{name}_{start_date}_to_{end_date}.csv
Multiple Stations:
MultiStation_{start_date}_to_{end_date}.csv
Date Format:
YYYYMMDD (e.g., 20230601)
Examples:
Station2_FivePoints_20230601_to_20230607.csv
Station127_Fresno_20230101_to_20231231.csv
MultiStation_20230601_to_20230607.csv
Helper Functions
Data Validation
def validate_date_range(start_date, end_date, max_days=365):
    """Validate a date range for API requests.

    Args:
        start_date: First day of the range, given as a ``YYYY-MM-DD`` string,
            a ``date``, or a ``datetime`` (its time of day is ignored).
        end_date: Last day of the range, in any of the same forms.
        max_days: Largest allowed difference, in days, between the two dates.

    Returns:
        True when the range is valid.

    Raises:
        ValueError: If a string is not in ``YYYY-MM-DD`` form, if the start
            date is after the end date, or if the dates are more than
            ``max_days`` days apart.
    """
    from datetime import datetime

    def _as_date(value):
        # Strings are parsed; datetimes are reduced to their calendar date.
        # Without the second step a str/date argument could not be compared
        # with a datetime argument (Python refuses to order the two types).
        if isinstance(value, str):
            return datetime.strptime(value, '%Y-%m-%d').date()
        if isinstance(value, datetime):
            return value.date()
        return value

    start_date = _as_date(start_date)
    end_date = _as_date(end_date)

    # Equal dates are accepted: a one-day range is valid.
    if start_date > end_date:
        raise ValueError("Start date must be before end date")

    date_diff = (end_date - start_date).days
    if date_diff > max_days:
        raise ValueError(f"Date range too large: {date_diff} days (max: {max_days})")

    return True
def validate_targets(targets):
    """Validate a target list for API requests.

    Args:
        targets: An iterable of targets. Integers are treated as station
            numbers and must lie in 1..999; strings (zip codes, coordinates,
            or addresses) are passed through unchanged. A single string or
            integer is accepted and treated as a one-element list.

    Returns:
        A list of the targets as strings, in their original order.

    Raises:
        ValueError: If no target is given, a station number is out of range,
            or a target is neither an ``int`` nor a ``str``.
    """
    if not targets:
        raise ValueError("At least one target must be specified")

    # A bare string would otherwise be iterated character by character, and a
    # bare int is not iterable at all; treat either as a single target.
    if isinstance(targets, (str, int)):
        targets = [targets]

    valid_targets = []
    for target in targets:
        # bool is a subclass of int; True/False are not station numbers.
        if isinstance(target, bool):
            raise ValueError(f"Invalid target type: {type(target)}")
        if isinstance(target, int):
            # Station number
            if target < 1 or target > 999:
                raise ValueError(f"Invalid station number: {target}")
            valid_targets.append(str(target))
        elif isinstance(target, str):
            # Zip code, coordinates, or address
            valid_targets.append(target)
        else:
            raise ValueError(f"Invalid target type: {type(target)}")

    return valid_targets
Data Conversion
def convert_units(value, from_unit, to_unit):
    """Convert a value between units.

    Supported pairs (in both directions): C/F, mm/in, m/ft, m/s/mph and
    km/h/mph. Converting a unit to itself is a no-op.

    Args:
        value: The quantity to convert; anything ``float()`` accepts.
        from_unit: Unit symbol the value is currently expressed in.
        to_unit: Unit symbol to convert to.

    Returns:
        The converted value as a float.

    Raises:
        ValueError: If the unit pair is not supported, or ``value`` is a
            string that ``float()`` cannot parse.
    """
    conversions = {
        # Temperature conversions
        ('C', 'F'): lambda x: x * 9/5 + 32,
        ('F', 'C'): lambda x: (x - 32) * 5/9,

        # Length conversions
        ('mm', 'in'): lambda x: x * 0.0393701,
        ('in', 'mm'): lambda x: x * 25.4,
        ('m', 'ft'): lambda x: x * 3.28084,
        ('ft', 'm'): lambda x: x * 0.3048,

        # Speed conversions
        ('m/s', 'mph'): lambda x: x * 2.23694,
        ('mph', 'm/s'): lambda x: x * 0.44704,
        ('km/h', 'mph'): lambda x: x * 0.621371,
        ('mph', 'km/h'): lambda x: x * 1.60934,
    }

    # Same source and destination unit: nothing to convert, but still
    # normalise to float so the return type does not depend on the units.
    if from_unit == to_unit:
        return float(value)

    converter = conversions.get((from_unit, to_unit))
    if converter is None:
        raise ValueError(f"Conversion from {from_unit} to {to_unit} not supported")
    return converter(float(value))
def celsius_to_fahrenheit(celsius):
    """Return the Fahrenheit equivalent of a temperature given in Celsius."""
    # Scale by 9/5 first, then shift by the 32-degree offset.
    scaled = celsius * 9 / 5
    return scaled + 32
def fahrenheit_to_celsius(fahrenheit):
    """Return the Celsius equivalent of a temperature given in Fahrenheit."""
    # Remove the 32-degree offset before scaling by 5/9.
    above_freezing = fahrenheit - 32
    return above_freezing * 5 / 9
def mm_to_inches(mm):
    """Return the length in inches for a length given in millimeters."""
    inches_per_mm = 0.0393701
    return mm * inches_per_mm
def inches_to_mm(inches):
    """Return the length in millimeters for a length given in inches."""
    mm_per_inch = 25.4
    return inches * mm_per_inch
Data Processing
def extract_data_item(weather_data, data_item, include_qc=False):
    """Extract one data item from every record of a weather data set.

    Args:
        weather_data: Object exposing ``get_all_records()``; each record is
            read for ``date``, ``station`` and a ``data_values`` mapping.
        data_item: Key to look up in each record's ``data_values``.
        include_qc: When true, each entry also carries the value's ``qc`` flag.

    Returns:
        A list of dicts with keys ``date``, ``station``, ``value`` and ``unit``
        (plus ``qc`` when requested), one per record that has a value for the
        item. Records without the item, or whose value is ``None`` or an empty
        string, are skipped.
    """
    extracted_data = []

    for record in weather_data.get_all_records():
        data_value = record.data_values.get(data_item)
        # Test for "no value" explicitly rather than by truthiness, so that a
        # genuine numeric reading of 0 is not discarded.
        if data_value is None or data_value.value is None or data_value.value == '':
            continue

        entry = {
            'date': record.date,
            'station': record.station,
            'value': data_value.value,
            'unit': data_value.unit
        }
        if include_qc:
            entry['qc'] = data_value.qc

        extracted_data.append(entry)

    return extracted_data
def aggregate_by_station(weather_data, aggregation_func='mean'):
    """Aggregate weather data by station.

    Args:
        weather_data: Object exposing ``get_all_records()``; each record is
            read for ``station``, ``date`` and a ``data_values`` mapping whose
            values carry ``value``, ``qc`` and ``unit``.
        aggregation_func: ``'mean'``, ``'sum'``, ``'min'`` or ``'max'``. Any
            other value leaves the collected numbers un-reduced (the list of
            floats is returned as the item's ``value``).

    Returns:
        A dict keyed by station. Each entry holds ``record_count``, a
        ``date_range`` dict (``start``/``end``) and ``data_items``, which maps
        each item name to ``{'value', 'count', 'unit'}``. Items with no usable
        numeric value for a station are omitted.
    """
    from collections import defaultdict

    def _combine(values):
        # Reduce the collected floats according to the requested aggregation.
        if aggregation_func == 'mean':
            return sum(values) / len(values)
        if aggregation_func == 'sum':
            return sum(values)
        if aggregation_func == 'min':
            return min(values)
        if aggregation_func == 'max':
            return max(values)
        return values

    # Group data by station
    station_data = defaultdict(list)
    for record in weather_data.get_all_records():
        station_data[record.station].append(record)

    aggregated = {}
    for station, records in station_data.items():
        dates = [r.date for r in records]
        data_items = {}
        aggregated[station] = {
            'record_count': len(records),
            'date_range': {
                'start': min(dates),
                'end': max(dates)
            },
            'data_items': data_items
        }

        values_by_item = defaultdict(list)
        unit_by_item = {}
        for record in records:
            for item, data_value in record.data_values.items():
                if data_value is None:
                    continue
                # Remember the unit from the first record that carries the
                # item, so it is not lost when the station's first record
                # happens to lack it.
                unit_by_item.setdefault(item, data_value.unit)

                # Skip values flagged missing ('M') and absent values, but
                # keep a genuine numeric zero.
                if data_value.qc == 'M' or data_value.value is None or data_value.value == '':
                    continue
                try:
                    values_by_item[item].append(float(data_value.value))
                except (TypeError, ValueError):
                    # Non-numeric reading: leave it out of the aggregate.
                    continue

        for item, values in values_by_item.items():
            if values:
                data_items[item] = {
                    'value': _combine(values),
                    'count': len(values),
                    'unit': unit_by_item.get(item)
                }

    return aggregated
Cache Management
import os
import pickle
import json
from datetime import datetime, timedelta
class CacheManager:
    """Simple file-based cache for CIMIS data.

    Each entry is pickled into its own file inside ``cache_dir``; the file's
    modification time decides whether the entry is still fresh.

    SECURITY: entries are read back with ``pickle``, which can execute
    arbitrary code while loading. Only point ``cache_dir`` at a directory that
    untrusted parties cannot write to.
    """

    # Characters that are path separators or are rejected in file names on
    # common platforms; they are replaced so a key is always a single file name.
    _UNSAFE_KEY_CHARS = frozenset('<>:"/\\|?*')

    def __init__(self, cache_dir="cimis_cache", max_age_hours=24):
        """Create the cache directory if needed.

        Args:
            cache_dir: Directory that holds the cache files.
            max_age_hours: Age, in hours, after which an entry is stale.
        """
        self.cache_dir = cache_dir
        self.max_age_hours = max_age_hours
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_path(self, cache_key, format='pickle'):
        """Return the file path for a cache key (``.pkl`` or ``.json``)."""
        extension = 'pkl' if format == 'pickle' else 'json'
        return os.path.join(self.cache_dir, f"{cache_key}.{extension}")

    def _generate_key(self, **params):
        """Build a deterministic, file-name-safe key from request parameters.

        Parameters are ordered by name and list values are sorted, so the same
        request always maps to the same key regardless of argument order.
        """
        key_parts = []
        for name, value in sorted(params.items()):
            if isinstance(value, list):
                try:
                    ordered = sorted(value)
                except TypeError:
                    # Mixed types (e.g. station numbers and zip-code strings)
                    # cannot be compared directly; order by string form.
                    ordered = sorted(value, key=str)
                value = '_'.join(str(x) for x in ordered)
            key_parts.append(f"{name}_{value}")

        raw_key = '_'.join(key_parts)
        return ''.join(
            '_' if (ch in self._UNSAFE_KEY_CHARS or ch < ' ') else ch
            for ch in raw_key
        )

    def get(self, **params):
        """Return the cached object for ``params``, or None.

        None is returned when there is no entry, the entry is older than
        ``max_age_hours``, or the file cannot be unpickled (in which case the
        file is removed).
        """
        cache_path = self._get_cache_path(self._generate_key(**params))

        try:
            file_time = datetime.fromtimestamp(os.path.getmtime(cache_path))
        except OSError:
            # No such entry (or it vanished between calls).
            return None

        # Check if cache is still fresh
        if datetime.now() - file_time >= timedelta(hours=self.max_age_hours):
            return None

        try:
            with open(cache_path, 'rb') as f:
                # See the class docstring: pickle must only read trusted files.
                return pickle.load(f)
        except Exception:
            # Remove corrupted cache file; failing to delete it is not fatal.
            try:
                os.remove(cache_path)
            except OSError:
                pass
            return None

    def set(self, data, **params):
        """Store ``data`` under the key derived from ``params``.

        Caching is best-effort: a failure is reported with a printed warning
        and otherwise ignored.
        """
        cache_path = self._get_cache_path(self._generate_key(**params))

        try:
            with open(cache_path, 'wb') as f:
                pickle.dump(data, f)
        except Exception as e:
            print(f"Warning: Could not cache data: {e}")

    def clear_expired(self):
        """Delete every file in the cache directory older than ``max_age_hours``."""
        if not os.path.exists(self.cache_dir):
            return

        cutoff_time = datetime.now() - timedelta(hours=self.max_age_hours)

        for filename in os.listdir(self.cache_dir):
            file_path = os.path.join(self.cache_dir, filename)
            try:
                file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
                if file_time < cutoff_time:
                    os.remove(file_path)
            except OSError:
                # File disappeared or cannot be removed; skip it.
                continue

    def clear_all(self):
        """Delete every file in the cache directory."""
        if not os.path.exists(self.cache_dir):
            return

        for filename in os.listdir(self.cache_dir):
            file_path = os.path.join(self.cache_dir, filename)
            try:
                os.remove(file_path)
            except OSError:
                continue
Configuration Management
import json
import os
from typing import Dict, Any, Optional
class ConfigManager:
    """Configuration manager for CIMIS client settings.

    Settings live in a JSON file holding a single object. When the file is
    missing, unreadable, or does not contain a JSON object, a built-in set of
    defaults is used instead.
    """

    def __init__(self, config_file: str = "cimis_config.json"):
        """Load settings from ``config_file`` (or fall back to the defaults)."""
        self.config_file = config_file
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from file.

        Returns:
            The parsed JSON object, or the default configuration when the file
            cannot be used.
        """
        loaded = None
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers malformed JSON and undecodable bytes.
                print(f"Warning: Could not load configuration: {e}")

        # get/set/update all assume a dict, so a file holding valid JSON of
        # another shape (list, number, ...) is treated like a missing file.
        if isinstance(loaded, dict):
            return loaded

        # Return default configuration
        return {
            'api_key': os.getenv('CIMIS_API_KEY'),
            'timeout': 30,
            'max_retries': 3,
            'cache_enabled': True,
            'cache_max_age_hours': 24,
            'default_unit_of_measure': 'M',
            'default_prioritize_scs': False
        }

    def save_config(self):
        """Save configuration to file.

        Saving is best-effort: a failure is reported with a printed warning.
        """
        try:
            # Serialise before opening the file so that an unserialisable
            # value cannot leave a truncated, half-written config behind.
            payload = json.dumps(self.config, indent=2)
            with open(self.config_file, 'w') as f:
                f.write(payload)
        except (OSError, TypeError, ValueError) as e:
            print(f"Warning: Could not save configuration: {e}")

    def get(self, key: str, default: Any = None) -> Any:
        """Get configuration value, or ``default`` when the key is absent."""
        return self.config.get(key, default)

    def set(self, key: str, value: Any):
        """Set configuration value (in memory; call ``save_config`` to persist)."""
        self.config[key] = value

    def update(self, **kwargs):
        """Update multiple configuration values (in memory only)."""
        self.config.update(kwargs)
Batch Processing
from datetime import date, timedelta
from typing import List, Generator, Tuple
import time
def split_date_range(start_date: date, end_date: date, chunk_days: int = 30) -> Generator[Tuple[date, date], None, None]:
    """Split a date range into smaller, non-overlapping chunks.

    Both ends of every chunk are inclusive, and a chunk's end lies at most
    ``chunk_days`` days after its start (so a full chunk covers
    ``chunk_days + 1`` calendar dates). The next chunk starts the day after
    the previous one ends. Nothing is yielded when ``start_date`` is after
    ``end_date``.

    Args:
        start_date: First date of the overall range.
        end_date: Last date of the overall range (inclusive).
        chunk_days: Maximum number of days between a chunk's start and end.

    Yields:
        ``(chunk_start, chunk_end)`` tuples in chronological order.

    Raises:
        ValueError: If ``chunk_days`` is negative. Because this is a
            generator, the error surfaces when iteration begins.
    """
    # A negative span would make the cursor stand still or move backwards,
    # turning the loop below into an endless generator.
    if chunk_days < 0:
        raise ValueError("chunk_days must not be negative")

    span = timedelta(days=chunk_days)
    one_day = timedelta(days=1)

    current_date = start_date
    while current_date <= end_date:
        chunk_end = min(current_date + span, end_date)
        yield current_date, chunk_end
        current_date = chunk_end + one_day
def split_target_list(targets: List, batch_size: int = 10) -> Generator[List, None, None]:
    """Yield consecutive slices of ``targets`` holding at most ``batch_size`` items each."""
    batch_starts = range(0, len(targets), batch_size)
    yield from (targets[start:start + batch_size] for start in batch_starts)
def batch_process_with_rate_limit(
    client,
    targets: List,
    start_date: date,
    end_date: date,
    batch_size: int = 10,
    chunk_days: int = 30,
    delay_seconds: float = 1.0
):
    """Process large requests in batches with rate limiting.

    The targets are split into batches and the date range into chunks; one
    ``client.get_daily_data`` call is made per (batch, chunk) pair. A failed
    call is reported with a printed message and skipped, so the result may be
    partial.

    Args:
        client: Object exposing ``get_daily_data(targets=, start_date=, end_date=)``.
        targets: Targets to request.
        start_date: First date of the overall range.
        end_date: Last date of the overall range.
        batch_size: Maximum number of targets per request.
        chunk_days: Passed to ``split_date_range`` to size each date chunk.
        delay_seconds: Pause after every request, successful or not. Values
            of zero or less disable the pause.

    Returns:
        A list with the records of every successful request, in request order.
    """
    all_records = []

    # Split targets into batches
    for target_batch in split_target_list(targets, batch_size):
        # Split date range into chunks
        for chunk_start, chunk_end in split_date_range(start_date, end_date, chunk_days):
            try:
                weather_data = client.get_daily_data(
                    targets=target_batch,
                    start_date=chunk_start,
                    end_date=chunk_end
                )
                all_records.extend(weather_data.get_all_records())
            except Exception as e:
                print(f"Error processing batch {target_batch} for {chunk_start} to {chunk_end}: {e}")

            # Rate limiting. Done outside the try block so that a failed
            # request is also followed by a pause instead of an immediate
            # retry-like burst of further requests.
            if delay_seconds > 0:
                time.sleep(delay_seconds)

    return all_records
Quality Control
def assess_data_quality(weather_data, quality_threshold=0.8):
    """Summarise the QC flags found in a weather data set.

    Every data value of every record is counted once. A ``qc`` flag of
    ``' '`` counts as good, ``'M'`` as missing and ``'Y'`` as estimated; any
    other flag only adds to the totals.

    Returns a dict with the overall good-data percentage, whether the good
    fraction reaches ``quality_threshold``, the overall counters, and the
    same counters (plus a percentage) per data item.
    """
    flag_to_counter = {' ': 'good', 'M': 'missing', 'Y': 'estimated'}

    # Tally per item first; the overall counters are the sums of these.
    per_item = {}
    for rec in weather_data.get_all_records():
        for name, data_value in rec.data_values.items():
            if name not in per_item:
                per_item[name] = {'total': 0, 'good': 0, 'missing': 0, 'estimated': 0}
            tally = per_item[name]
            tally['total'] += 1
            counter = flag_to_counter.get(data_value.qc)
            if counter is not None:
                tally[counter] += 1

    tallies = list(per_item.values())
    n_total = sum(t['total'] for t in tallies)
    n_good = sum(t['good'] for t in tallies)
    n_missing = sum(t['missing'] for t in tallies)
    n_estimated = sum(t['estimated'] for t in tallies)

    if n_total > 0:
        good_fraction = n_good / n_total
    else:
        good_fraction = 0

    # Per-item share of good values, expressed as a percentage.
    for tally in tallies:
        if tally['total'] > 0:
            tally['quality_percentage'] = tally['good'] / tally['total'] * 100
        else:
            tally['quality_percentage'] = 0

    return {
        'overall_quality_percentage': good_fraction * 100,
        'meets_threshold': good_fraction >= quality_threshold,
        'total_data_points': n_total,
        'good_data_points': n_good,
        'missing_data_points': n_missing,
        'estimated_data_points': n_estimated,
        'quality_by_item': per_item
    }
def filter_by_quality(weather_data, min_quality='good'):
    """Return the records that hold at least one value of acceptable quality.

    ``min_quality`` selects which QC flags are accepted: ``'good'`` accepts
    only ``' '``, ``'acceptable'`` adds ``'Y'``, and ``'all'`` adds ``'M'`` as
    well. An unrecognised level behaves like ``'good'``. A value only counts
    when its flag is accepted and its ``value`` is truthy.
    """
    quality_levels = {
        'good': [' '],  # Only measured/calculated
        'acceptable': [' ', 'Y'],  # Measured/calculated + estimated
        'all': [' ', 'Y', 'M']  # Include missing data
    }
    accepted_flags = quality_levels.get(min_quality, [' '])

    def _qualifies(rec):
        # True as soon as one data value passes both checks.
        return any(
            dv.qc in accepted_flags and dv.value
            for dv in rec.data_values.values()
        )

    return [rec for rec in weather_data.get_all_records() if _qualifies(rec)]
Performance Monitoring
import time
from contextlib import contextmanager
@contextmanager
def measure_time(operation_name="Operation"):
    """Context manager that prints how long its body took to run.

    The elapsed time is printed when the block exits, whether it finished
    normally or raised; an exception raised inside the block still propagates.

    Args:
        operation_name: Label used at the start of the printed message.
    """
    # perf_counter is monotonic and high-resolution, so the measured interval
    # cannot be distorted by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start_time
        print(f"{operation_name} took {duration:.2f} seconds")
def benchmark_api_call(client, **kwargs):
    """Time one ``client.get_daily_data`` call and report its record count.

    All keyword arguments are forwarded to ``client.get_daily_data``. The
    number of retrieved records is printed inside the timed section, so the
    timing line follows it. The data object returned by the client is handed
    back to the caller.
    """
    with measure_time("API call"):
        response = client.get_daily_data(**kwargs)
        n_records = len(response.get_all_records())
        print(f"Retrieved {n_records} records")
    return response
Usage Examples
Complete Utility Usage
from python_cimis import CimisClient
from python_cimis.utils import FilenameGenerator
import os
# Initialize client with configuration
config = ConfigManager()
client = CimisClient(
    app_key=config.get('api_key'),
    timeout=config.get('timeout', 30)
)

# Setup caching
cache = CacheManager(max_age_hours=config.get('cache_max_age_hours', 24))

# Get data with caching
cache_params = {
    'targets': [2, 8],
    'start_date': '2023-06-01',
    'end_date': '2023-06-07'
}
weather_data = cache.get(**cache_params)

# CacheManager.get returns None on a miss; compare against None explicitly so
# a cached-but-falsy object is not mistaken for a miss and fetched again.
if weather_data is None:
    print("Cache miss - fetching from API")
    with measure_time("Data retrieval"):
        weather_data = client.get_daily_data(**cache_params)
    cache.set(weather_data, **cache_params)
else:
    print("Cache hit - using cached data")

# Assess data quality
quality_report = assess_data_quality(weather_data)
print(f"Data quality: {quality_report['overall_quality_percentage']:.1f}%")

# Generate filename and export
# (FilenameGenerator methods are instance methods, so instantiate it first.)
filename = FilenameGenerator().generate_weather_filename(weather_data)
csv_file = client.export_to_csv(weather_data, filename=filename)
print(f"Exported to: {csv_file}")
This utilities module provides essential helper functions for working efficiently with the Python CIMIS Client library.