Commit 3759ed24 authored by Tianyang's avatar Tianyang

Question3 finished

parent eb239d96
No preview for this file type
import cassandra.cluster
import csv
import re
def connection():
import cassandra.cluster
cluster = cassandra.cluster.Cluster(['localhost'])
session = cluster.connect('caitiany')
return session
def databaseCreate_Q3(session):
query = """
CREATE TABLE database_kmeans (
date timestamp,
lon float,
lat float,
station varchar,
tmpf float,
dwpf float,
relh float,
drct float,
sknt float,
p01i float,
alti float,
mslp float,
vsby float,
gust float,
skyc1 varchar,
skyc2 varchar,
skyc3 varchar,
skyc4 varchar,
skyl1 float,
skyl2 float,
skyl3 float,
skyl4 float,
wxcodes varchar,
ice_accretion_1hr float,
ice_accretion_3hr float,
ice_accretion_6hr float,
peak_wind_gust float,
peak_wind_drct float,
peak_wind_time varchar,
feel float,
metar varchar,
PRIMARY KEY ((date),lon,lat,station)
)"""
session.execute(query)
print("DATA BASE database_kmeans created!")
def load_data(filename):
with open(filename) as f:
for r in csv.DictReader(f):
if not r["valid"]:
continue
for collonne in r:
if r[collonne] == "M":
r[collonne]= "nan"
data = {}
data["date"] = r["valid"]
data["station"] = r["station"]
data["lon"] = float(r["lon"])
data["lat"] = float(r["lat"])
data["tmpf"] = float(r["tmpf"])
data["dwpf"] = float(r["dwpf"])
data["relh"] = float(r["relh"])
data["drct"] = float(r["drct"])
data["sknt"] = float(r["sknt"])
data["p01i"] = float(r["p01i"])
data["alti"] = float(r["alti"])
data["mslp"] = float(r["mslp"])
data["vsby"] = float(r["vsby"])
data["gust"] = float(r["gust"])
data["skyc1"] = r["skyc1"]
data["skyc2"] = r["skyc2"]
data["skyc3"] = r["skyc3"]
data["skyc4"] = r["skyc4"]
data["skyl1"] = float(r["skyl1"])
data["skyl2"] = float(r["skyl2"])
data["skyl3"] = float(r["skyl3"])
data["skyl4"] = float(r["skyl4"])
data["wxcodes"] = r["wxcodes"]
data["ice_accretion_1hr"] = float(r["ice_accretion_1hr"])
data["ice_accretion_3hr"] = float(r["ice_accretion_3hr"])
data["ice_accretion_6hr"] = float(r["ice_accretion_6hr"])
data["peak_wind_gust"] = float(r["peak_wind_gust"])
data["peak_wind_drct"] = float(r["peak_wind_drct"])
data["peak_wind_time"] = r["peak_wind_time"]
data["feel"] = float(r["feel"])
data["metar"] = r["metar"]
yield data
def insection_sql_Q3(filename,session):
target = load_data(filename)
i = 1
for data in target:
i += 1
k = 0
if (i % 500 == 0):
k += 1
print(k,". 500 finished.....")
ligne = (
data["date"],
data["lon"],
data["lat"],
data["station"],
data["tmpf"],
data["dwpf"],
data["relh"],
data["drct"],
data["sknt"],
data["p01i"],
data["alti"],
data["mslp"],
data["vsby"],
data["gust"],
data["skyc1"],
data["skyc2"],
data["skyc3"],
data["skyc4"],
data["skyl1"],
data["skyl2"],
data["skyl3"],
data["skyl4"],
data["wxcodes"],
data["ice_accretion_1hr"],
data["ice_accretion_3hr"],
data["ice_accretion_6hr"],
data["peak_wind_gust"],
data["peak_wind_drct"],
data["peak_wind_time"],
data["feel"],
data["metar"])
query = """
INSERT INTO database_kmeans(
date,
lon,
lat,
station,
tmpf,
dwpf,
relh,
drct,
sknt,
p01i,
alti,
mslp,
vsby,
gust,
skyc1,
skyc2,
skyc3,
skyc4,
skyl1,
skyl2,
skyl3,
skyl4,
wxcodes,
ice_accretion_1hr,
ice_accretion_3hr,
ice_accretion_6hr,
peak_wind_gust,
peak_wind_drct,
peak_wind_time,
feel,
metar)
VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""
session.execute(query, ligne)
if __name__ == "__main__":
session = connection()
databaseCreate_Q3(session)
insection_sql_Q3("Projet-NF26/data.csv",session)
\ No newline at end of file
This diff is collapsed.
import numpy as np
import matplotlib.pyplot as plt
from functools import reduce
from database_pre import connection
from database_pre1 import connection
import matplotlib.pyplot as plt
......
import numpy as np
import matplotlib.pyplot as plt
from functools import reduce
from database_pre3 import connection
import matplotlib.pyplot as plt
import re
import folium
def add (x,y):
return x+y
def abs_diff(x,y):
return abs(x-y)
def diff(x,y):
return x-y
#caculate mean reduce
#input [count,mean]
def reduceFonction (x,y):
result = []
for i in range(2):
result.append(reduce(add,[x[i],y[i]]))
return result
#input [valeur] -> [count,mean]
def mapFonction1 (x):
return [1,x]
#input [count,mean] -> [mean]
def mapFonction2 (x):
return x[1]/x[0]
def testNan (x):
test = x != x
return test
def mapReduce_kmeans(data,targetNB):
results = dict()
for row in data.result():
data_target = row[targetNB]
if testNan(data_target):
continue
data_espace = (row[1],row[2],row[3])
if results.get(data_espace) is None:
results[data_espace] = mapFonction1(data_target)
else:
mapresult = mapFonction1(data_target)
results[data_espace] = reduceFonction(mapresult,results[data_espace])
for eachEspace in results:
results[eachEspace] = mapFonction2(results[eachEspace])
return results
def cluster_nb_diff(centre_new,centre):
sum = 0
for i in range(3):
sum += abs(centre_new[i][0]-centre[i][0])
return sum/3
#input [tmpt] -> [tmpt,tmpt,tmpt,tmpt]
def map1_kmeans(x):
return [x,x,x,x]
def mapCentre(x):
return [x[0],x[1],x[2],0]
#input [tmpt,tmpt,tmpt,tmpt] and [c1,c2,c3,0] -> [|tmpt - c1|,|tmpt - c2|,|tmpt - c3|,tmpt]
def reduceKmeans (x,y):
result = []
for i in range(4):
result.append(reduce(abs_diff,[x[i],y[i]]))
return result
#input [|tmpt - c1|,|tmpt - c2|,|tmpt - c3|,tmpt] -> [cluster number, min(|tmpt - c|), tmpt]
def map2_kmeans(x):
min_value = 10000000000000
index = 0
for each in range(3):
if min_value > x[each]:
min_value = x[each]
index = each
return [index,min_value,x[3]]
def MapnewCentre(x):
return x[1]/x[0]
def kmeans (data,targetNB):
#3centre with [point count, temprature centre]
centre = {0:[1,0],1:[1,0],2:[1,0]}
#cluster est pour stocler lat, lon de chaque point de chaque cluster
cluster = [[],[],[]]
result = mapReduce_kmeans(data,targetNB)
#mettre le premier 3 point comme le centres init
init_point_values = [result[i] for i in result.keys()][:3]
init_point_keys = [i for i in result.keys()][:3]
for key in centre.keys():
centre[key] = [1,init_point_values[key]]
cluster[key].append(init_point_keys[key])
#init the centre new and result new for mapreduce
centre_new = {0:[0,0],1:[0,0],2:[0,0]}
result_new = dict()
#When the number of point of cluster don't change,stop
while True:
for eachkey in result:
if eachkey in cluster[0] or eachkey in cluster[1] or eachkey in cluster[2]:
continue
#caculate the distance between the data of this lingne and the centre
#Map1_kemeans
result_new[eachkey] = map1_kmeans(result[eachkey])
centre_values = []
for each in centre:
centre_values.append(centre[each][1])
centre_values = mapCentre(centre_values)
#Reduce
result_new[eachkey] = reduceKmeans(result_new[eachkey],centre_values)
#Map2_kmeans
result_new[eachkey] = map2_kmeans(result_new[eachkey])
#Put all the distance and points into the clusters
#Result format [cluster number, min(|tmpt - c|),tmpt - c]
for eachpoint in result_new:
clusterNB = result_new[eachpoint][0]
centre_new[clusterNB][0] += 1
centre_new[clusterNB][1] += result_new[eachpoint][2]
cluster[clusterNB].append(eachpoint)
#compare centre_new and centre, if
if not cluster_nb_diff(centre_new,centre) > 1:
break
else:
#caculate the new centre
print ('jasdlkjalsdkjalskd ',cluster_nb_diff(centre_new,centre))
for eachculster in centre_new:
centre_new[eachculster][1] = MapnewCentre(centre_new[eachculster])
centre = centre_new
centre_new = {0:[0,0],1:[0,0],2:[0,0]}
result_new = dict()
cluster = [[],[],[]]
createMap(cluster)
def createMap (cluster):
mean_lat = 0
count = 0
for each in [cluster[0],cluster[1],cluster[2]]:
for each_pos in each:
mean_lat += each_pos[0]
count += 1
mean_lat = mean_lat/count
mean_lon = 0
count = 0
for each in [cluster[0],cluster[1],cluster[2]]:
for each_pos in each:
mean_lon += each_pos[1]
count += 1
mean_lon = mean_lon/count
m = folium.Map(location=[mean_lon,mean_lat],zoom_start=6)
color = {0:'blue',1:'red',2:'green'}
i = 0
for each in [cluster[0],cluster[1],cluster[2]]:
for each_pos in each:
folium.Marker([each_pos[1],each_pos[0]],
popup=each_pos[2],
icon=folium.Icon(color=color[i])).add_to(m)
i +=1
print(i)
m.save("Projet-NF26/map.html")
if __name__ == "__main__":
session = connection()
start = '2008-12-19'
end = '2012-12-14'
data = session.execute_async("select * from caitiany.database_kmeans where date >= '%s' and date <= '%s' ALLOW FILTERING"%(start,end))
kmeans(data,4)
test.png

20.8 KB

Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment