import csv
import json
import re

from cassandra.cluster import Cluster

# Timestamp layout accepted for the "valid" column: "YYYY-MM-DD HH:MM".
# Compiled once at import time instead of once per CSV row.
_DATE_PATTERN = re.compile(
    r"(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+) (?P<hour>\d+):(?P<minute>\d+)"
)
_DATE_PARTS = ("year", "month", "day", "hour", "minute")

# Cell values that getData() maps to None.
# NOTE(review): presumably the dataset's "missing" / "trace" markers - confirm.
_MISSING_MARKERS = ("M", "T")

# Every non-timestamp column as (name, getData type), in the order they are
# read from the CSV. The INSERT column list and the value tuple are both
# derived from this table so they cannot drift out of step.
_FIELDS = (
    ("station", "string"),
    ("lon", "float"),
    ("lat", "float"),
    ("tmpf", "float"),
    ("dwpf", "float"),
    ("relh", "float"),
    ("drct", "float"),
    ("sknt", "float"),
    ("p01i", "float"),
    ("alti", "float"),
    ("mslp", "float"),
    ("vsby", "float"),
    ("gust", "float"),
    ("skyc1", "string"),
    ("skyc2", "string"),
    ("skyc3", "string"),
    ("skyc4", "string"),
    ("skyl1", "float"),
    ("skyl2", "float"),
    ("skyl3", "float"),
    ("skyl4", "float"),
    ("wxcodes", "string"),
    ("ice_accretion_1hr", "float"),
    ("ice_accretion_3hr", "float"),
    ("ice_accretion_6hr", "float"),
    ("peak_wind_gust", "float"),
    ("peak_wind_drct", "float"),
    # NOTE(review): parsed as float to match the table column type; confirm
    # the CSV never carries a timestamp string here (float() would raise).
    ("peak_wind_time", "float"),
    ("feel", "float"),
    ("metar", "string"),
)

_INSERT_COLUMNS = tuple("valid_" + part for part in _DATE_PARTS) + tuple(
    name for name, _ in _FIELDS
)

# Built once; uses the driver's %s positional placeholders.
_INSERT_QUERY = "INSERT INTO weatherByTime ({}) VALUES ({})".format(
    ", ".join(_INSERT_COLUMNS),
    ", ".join(["%s"] * len(_INSERT_COLUMNS)),
)


def getData(container, value, type):
    """Read ``container[value]`` and convert it according to ``type``.

    Args:
        container: Mapping of column name to raw cell text (a CSV row).
        value: Key of the cell to read.
        type: One of ``"string"``, ``"float"`` or ``"date"``. (The name
            shadows the builtin; it is kept for caller compatibility.)

    Returns:
        * ``None`` when the cell is ``"M"`` or ``"T"`` (or already ``None``).
        * ``"string"``: the raw text.
        * ``"float"``: ``float(text)``.
        * ``"date"``: a ``(year, month, day, hour, minute)`` tuple of ints,
          or the sentinel string ``"continue"`` when the text does not start
          with a ``YYYY-MM-DD HH:MM`` timestamp.
        * any other ``type``: ``None``.

    Raises:
        KeyError: if ``value`` is not a key of ``container``.
        ValueError: if a ``"float"`` cell is not parseable as a float.
    """
    result = container[value]
    if result in _MISSING_MARKERS:
        result = None

    if type == "string":
        return result

    if type == "float":
        return None if result is None else float(result)

    if type == "date":
        if result is None:
            return None
        match = _DATE_PATTERN.match(result)
        if not match:
            # Sentinel understood by loadata(): skip this row.
            return "continue"
        return tuple(int(match.group(part)) for part in _DATE_PARTS)

    return None


def loadata(filename):
    """Yield one dict per usable row of the CSV file ``filename``.

    Rows whose ``valid`` cell is missing or is not a recognisable timestamp
    are skipped. Each yielded dict maps ``"valid"`` to a
    ``(year, month, day, hour, minute)`` tuple and every name in ``_FIELDS``
    to its converted value (``None`` when missing).
    """
    # newline="" lets the csv module do its own line-ending handling, as the
    # csv documentation requires for files passed to a reader.
    with open(filename, newline="") as f:
        for r in csv.DictReader(f):
            timestamp = getData(r, "valid", "date")
            if timestamp is None or timestamp == "continue":
                continue
            data = {"valid": timestamp}
            for name, kind in _FIELDS:
                data[name] = getData(r, name, kind)
            yield data


# --- Module-level setup: runs on import and recreates the table. ---
cluster = Cluster(['localhost'])
session = cluster.connect('dbermond_projet')

query = '''
DROP TABLE IF EXISTS weatherByTime ;
'''
session.execute(query)

# NOTE(review): metar is a clustering column, so a row whose metar is missing
# (None) cannot be inserted - confirm the input always provides it.
query = '''
CREATE TABLE weatherByTime(
    station text,
    valid_year varint,
    valid_month varint,
    valid_day varint,
    valid_hour varint,
    valid_minute varint,
    lon float,
    lat float,
    tmpf float,
    dwpf float,
    relh float,
    drct float,
    sknt float,
    p01i float,
    alti float,
    mslp float,
    vsby float,
    gust float,
    skyc1 text,
    skyc2 text,
    skyc3 text,
    skyc4 text,
    skyl1 float,
    skyl2 float,
    skyl3 float,
    skyl4 float,
    wxcodes text,
    ice_accretion_1hr float,
    ice_accretion_3hr float,
    ice_accretion_6hr float,
    peak_wind_gust float,
    peak_wind_drct float,
    peak_wind_time float,
    feel float,
    metar text,
    PRIMARY KEY ((valid_year, valid_month, valid_day), valid_hour, valid_minute, metar)
);
'''
session.execute(query)


def getWeatherByTime(csvfilename, session, limit):
    """Insert rows from ``csvfilename`` into the ``weatherByTime`` table.

    Args:
        csvfilename: Path of the CSV file to load (read via ``loadata``).
        session: Cassandra session used to execute the INSERT statements.
        limit: Maximum number of rows to insert; ``0`` means no limit.

    Returns:
        None.
    """
    n = 0
    for r in loadata(csvfilename):
        # Values are taken in exactly the order of _INSERT_COLUMNS:
        # the five timestamp parts, then every field of _FIELDS.
        values = r["valid"] + tuple(r[name] for name, _ in _FIELDS)
        session.execute(_INSERT_QUERY, values)
        n += 1
        if limit != 0 and n >= limit:
            break