Commit eb239d96 authored by Tianyang's avatar Tianyang

Q1 finished

parents
File added
import cassandra.cluster
def countData(database,session):
years = [i for i in range(2008,2018)]
for year in years:
count = 0
number = session.execute_async("select year from caitiany.%s where year = %d ALLOW FILTERING" %(database,year))
for row in number.result():
count += 1
print (year,' has ',count,' datas.\n')
if __name__ == "__main__":
cluster = cassandra.cluster.Cluster(['localhost'])
session = cluster.connect('caitiany')
countData('database_espace',session)
\ No newline at end of file
import cassandra.cluster
import csv
import re
def connection():
import cassandra.cluster
cluster = cassandra.cluster.Cluster(['localhost'])
session = cluster.connect('caitiany')
return session
def databaseCreate_Q1(session):
query = """
CREATE TABLE database_espace (
station varchar,
year int,
season varchar,
month int,
day int,
hour int,
minute int,
lon float,
lat float,
tmpf float,
dwpf float,
relh float,
drct float,
sknt float,
p01i float,
alti float,
mslp float,
vsby float,
gust float,
skyc1 varchar,
skyc2 varchar,
skyc3 varchar,
skyc4 varchar,
skyl1 float,
skyl2 float,
skyl3 float,
skyl4 float,
wxcodes varchar,
ice_accretion_1hr float,
ice_accretion_3hr float,
ice_accretion_6hr float,
peak_wind_gust float,
peak_wind_drct float,
peak_wind_time varchar,
feel float,
metar varchar,
PRIMARY KEY ((station),year, season, month,day,hour,minute)
)"""
session.execute(query)
print("DATA BASE database_espace created!")
def load_data(filename):
dateparser = re.compile("(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+) (?P<hour>\d+):(?P<minute>\d+)")
with open(filename) as f:
for r in csv.DictReader(f):
match_time = dateparser.match(r["valid"])
if not match_time:
continue
time = match_time.groupdict()
#add season
if 3<=int(time["month"])<=5:
r["season"]="Spring"
elif 6<=int(time["month"])<=8:
r["season"]="Summer"
elif 9<=int(time["month"])<=11:
r["season"]="Autumn"
elif int(time["month"]) in (12,1,2):
r["season"]="Winter"
else:
continue
for collonne in r:
if r[collonne] == "M":
r[collonne]= "nan"
data = {}
data["station"] = r["station"]
data["year"] = int(time["year"])
data["season"] = r["season"]
data["month"] = int(time["month"])
data["day"] = int(time["day"])
data["hour"] = int(time["hour"])
data["minute"] = int(time["minute"])
data["lon"] = float(r["lon"])
data["lat"] = float(r["lat"])
data["tmpf"] = float(r["tmpf"])
data["dwpf"] = float(r["dwpf"])
data["relh"] = float(r["relh"])
data["drct"] = float(r["drct"])
data["sknt"] = float(r["sknt"])
data["p01i"] = float(r["p01i"])
data["alti"] = float(r["alti"])
data["mslp"] = float(r["mslp"])
data["vsby"] = float(r["vsby"])
data["gust"] = float(r["gust"])
data["skyc1"] = r["skyc1"]
data["skyc2"] = r["skyc2"]
data["skyc3"] = r["skyc3"]
data["skyc4"] = r["skyc4"]
data["skyl1"] = float(r["skyl1"])
data["skyl2"] = float(r["skyl2"])
data["skyl3"] = float(r["skyl3"])
data["skyl4"] = float(r["skyl4"])
data["wxcodes"] = r["wxcodes"]
data["ice_accretion_1hr"] = float(r["ice_accretion_1hr"])
data["ice_accretion_3hr"] = float(r["ice_accretion_3hr"])
data["ice_accretion_6hr"] = float(r["ice_accretion_6hr"])
data["peak_wind_gust"] = float(r["peak_wind_gust"])
data["peak_wind_drct"] = float(r["peak_wind_drct"])
data["peak_wind_time"] = r["peak_wind_time"]
data["feel"] = float(r["feel"])
data["metar"] = r["metar"]
yield data
def insection_sql_Q1(filename,session):
target = load_data(filename)
i = 1
for data in target:
i += 1
if (i % 500 == 0):
print("500 finished.....")
ligne = (data["station"],
data["year"],
data["season"],
data["month"],
data["day"],
data["hour"],
data["minute"],
data["lon"],
data["lat"],
data["tmpf"],
data["dwpf"],
data["relh"],
data["drct"],
data["sknt"],
data["p01i"],
data["alti"],
data["mslp"],
data["vsby"],
data["gust"],
data["skyc1"],
data["skyc2"],
data["skyc3"],
data["skyc4"],
data["skyl1"],
data["skyl2"],
data["skyl3"],
data["skyl4"],
data["wxcodes"],
data["ice_accretion_1hr"],
data["ice_accretion_3hr"],
data["ice_accretion_6hr"],
data["peak_wind_gust"],
data["peak_wind_drct"],
data["peak_wind_time"],
data["feel"],
data["metar"])
query = """
INSERT INTO database_espace(
station,
year,
season,
month,
day,
hour,
minute,
lon,
lat,
tmpf,
dwpf,
relh,
drct,
sknt,
p01i,
alti,
mslp,
vsby,
gust,
skyc1,
skyc2,
skyc3,
skyc4,
skyl1,
skyl2,
skyl3,
skyl4,
wxcodes,
ice_accretion_1hr,
ice_accretion_3hr,
ice_accretion_6hr,
peak_wind_gust,
peak_wind_drct,
peak_wind_time,
feel,
metar)
VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""
session.execute(query, ligne)
if __name__ == "__main__":
session = connection()
databaseCreate_Q1(session)
insection_sql_Q1("Projet-NF26/data.csv",session)
\ No newline at end of file
import numpy as np
import matplotlib.pyplot as plt
from functools import reduce
from database_pre import connection
import matplotlib.pyplot as plt
table_variable = ['station varchar',
'year',
'season',
'month',
'day',
'hour',
'minute',
'lon',
'lat',
'tmpf',
'dwpf',
'relh',
'drct',
'sknt',
'p01i',
'alti',
'mslp',
'vsby',
'gust',
'skyc1',
'skyc2',
'skyc3',
'skyc4',
'skyl1',
'skyl2',
'skyl3',
'skyl4',
'wxcodes',
'ice_accretion_1hr',
'ice_accretion_3hr',
'ice_accretion_6hr',
'peak_wind_gust',
'peak_wind_drct',
'peak_wind_time',
'feel',
'metar']
def add (x,y):
return x+y
def fmax(x,y):
return max(x,y)
def fmin(x,y):
return min(x,y)
#caculate mean, max, min reduce
#input [count,mean,max,min]
def reduceFonction (x,y):
result = []
fonctions = {0:'add',1:'add',2:'fmax',3:'fmin'}
for i in range(4):
result.append(reduce(eval(fonctions.get(i)),[x[i],y[i]]))
return result
#input [valeur] -> [count,mean,max,min]
def mapFonction1 (x):
return [1,x,x,x]
#input [count,mean,max,min] -> [mean,max,min]
def mapFonction2 (x):
return [x[1]/x[0],x[2],x[3]]
def testNan (x):
test = x != x
return test
def mapReduce_mmm(data,timeNB,targetNB):
results = dict()
for row in data.result():
if timeNB == 1:
data_time = row[timeNB]
elif timeNB == 2:
data_time = (row[timeNB-1],row[timeNB])
elif timeNB == 3:
data_time = (row[timeNB-2],row[timeNB-1],row[timeNB])
else:
assert 1==2, "Doesn\'t exits!"
data_target = row[targetNB]
if testNan(data_time) or testNan(data_target):
continue
if results.get(data_time) is None:
results[data_time] = mapFonction1(data_target)
else:
mapresult = mapFonction1(data_target)
results[data_time] = reduceFonction(mapresult,results[data_time])
for eachTime in results:
results[eachTime] = mapFonction2(results[eachTime])
return results
def zipValues (values):
result = [[],[],[]]
for i in range(3):
for each in values:
result[i].append(each[i])
return result
def drawCourbe_history(session,time,target,timeNB,targetNB,espace):
data = session.execute_async("select * from caitiany.database_espace where station = '%s'"%espace )
results = mapReduce_mmm(data,timeNB,targetNB)
keys = list(results.keys())
values = list(results.values())
zipped_result = zipValues(values)
if isinstance(keys[0],tuple):
for each in keys:
index_each = keys.index(each)
each = list(each)
if timeNB == 2:
each[0] = str(each[0])
keys[index_each] = ".".join(each)
elif timeNB == 3:
each[0] = str(each[0])
each[2] = str(each[2])
keys[index_each] = each[0] +'.'+ each[2]
fig, ax = plt.subplots(1, 1)
plt.plot(keys,zipped_result[0],'x-',label="mean")
plt.plot(keys,zipped_result[1],'+-',label="max")
plt.plot(keys,zipped_result[2],'b--',label="min")
plt.xticks(keys, keys, rotation=45, fontsize=5)
plt.ylabel(target)
plt.grid(True)
if timeNB == 3:
for label in ax.get_xticklabels():
label.set_visible(False)
for label in ax.get_xticklabels()[::6]:
label.set_visible(True)
plt.legend(bbox_to_anchor=(1.0, 1), loc=1, borderaxespad=0.)
plt.savefig("Projet-NF26/test.png")
def checkNBvariable (x):
i=0
for each in table_variable:
if x == each:
return i
i += 1
print ('Doesn\'t exist!!')
def drawCourbe_season(session,season,target,targetNB,espace):
data = session.execute_async("select * from caitiany.database_espace where station = '%s'"%espace )
results = mapReduce_mmm(data,2,targetNB)
results = seprateSeason(results,season)
keys = list(results.keys())
values = list(results.values())
print(values)
zipped_result = zipValues(values)
for each in keys:
index_each = keys.index(each)
each = list(each)
each[0] = str(each[0])
keys[index_each] = ".".join(each)
fig, ax = plt.subplots(1, 1)
plt.plot(keys,zipped_result[0],'x-',label="mean")
plt.plot(keys,zipped_result[1],'+-',label="max")
plt.plot(keys,zipped_result[2],'b--',label="min")
plt.xticks(keys, keys, rotation=45, fontsize=5)
plt.ylabel(target)
plt.grid(True)
plt.legend(bbox_to_anchor=(1.0, 1), loc=1, borderaxespad=0.)
plt.savefig("Projet-NF26/test.png")
def seprateSeason (results,season):
output = dict()
for each in results:
if season in each:
output[each] = results[each]
return output
if __name__ == "__main__":
session = connection()
espace = input("Please enter what station you want to search: ")
#time = input("Per which kind of time: ")
target = input("Which value do you want to check: ")
#timeNB = checkNBvariable(time)
targetNB = checkNBvariable(target)
#drawCourbe_history(session,time,target,timeNB,targetNB,espace)
#Season check
season = input("Please enter which season you want to search: ")
drawCourbe_season(session,season,target,targetNB,espace)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment