# Project-local module; presumably performs path/environment setup on import — TODO confirm.
import mysite
from xgb.all_import import load_spark
# Start a local 16-worker Spark context/SQL context against the given Spark install.
sc, sqlctx = load_spark('/data1/andyfei/.local/spark-2.3.1-bin-hadoop2.7/', master='local[16]')
from pyspark.sql import functions as F
from pyspark import sql
import json
def extract_request(line):
    """Return the request payload of a debug-request log line, or '' otherwise.

    Log lines have a fixed-width prefix; on lines carrying the debug-request
    marker the payload starts at column 53.
    """
    has_marker = '[console] [debug] request:' in line
    return line[53:] if has_marker else ''
def is_json(x):
    """Return True if *x* parses as JSON, False otherwise.

    Used as a boolean Spark UDF to filter out non-JSON payloads.
    """
    try:
        json.loads(x)
        return True
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers non-string
        # input. The original bare `except:` would also swallow
        # KeyboardInterrupt/SystemExit, which must propagate.
        return False
# Read every raw log file as a single-column ('value') text DataFrame.
df = sqlctx.read.text('/data1/andyfei/log/*')
# Extract the request payload from each log line into a 'query' column.
df = df.select(F.udf(extract_request)(df.value).alias('query'))
# Drop lines that carried no request payload.
df = df.where(df.query != '')
# Drop payloads that are not valid JSON.
df = df.where(F.udf(is_json, sql.types.BooleanType())(df.query))
# Cache: df is reused by several downstream actions.
df.cache()
def get_type(data):
    """Parse a JSON request string and return its top-level 'Type' field.

    Returns '' when the input is not valid JSON, is not a string, or lacks a
    'Type' key, so the UDF never raises inside a Spark executor.
    """
    try:
        return json.loads(data)['Type']
    except (TypeError, ValueError, KeyError):
        # Narrowed from a bare `except:` — only parse/lookup failures are
        # expected here; anything else should propagate.
        return ''
# Wrap as a Spark UDF (default return type: string).
get_type = F.udf(get_type)
# Add the request 'Type' as a new 'type' column alongside the raw query.
df = df.select(df.query, get_type(df.query).alias('type'))
def extract_spotype(data):
    """Return the 'type' of the first entry in the request's 'keys' list.

    Returns '' on malformed input (non-JSON, missing 'keys', empty list,
    missing 'type'), consistent with get_type, so the UDF never raises
    inside a Spark executor.
    """
    try:
        return json.loads(data)['keys'][0]['type']
    except (TypeError, ValueError, KeyError, IndexError):
        return ''
# Distribution of request types across the whole log set.
df.groupby('type').count().show()
# Append the first-key type of each request as a new 'stype' column,
# keeping every existing column.
df = df.select([getattr(df, key) for key in df.columns]
               + [F.udf(extract_spotype)(df.query).alias('stype')])
# (Removed a leftover `df.selectExpr` — a bare attribute access with no call,
# which had no effect.)
# Dump all INTERSECTION request bodies into a single output file for inspection.
df.where(df.type == 'INTERSECTION').selectExpr('query as text').repartition(1).write.text('intersection.json')
# Distribution of first-key types.
df.select('stype').groupBy('stype').count().toPandas()
# Focus on requests whose first key is a 'po\tname' lookup.
po_name = df.where(df.stype == 'po\tname')
po_name_df = po_name.toPandas()
def get_prop_name(data):
    """Return the 'prop_name' of the first entry in the request's 'keys' list.

    Returns '' on malformed input, consistent with get_type and
    extract_spotype, so the UDF never raises inside a Spark executor.
    """
    try:
        return json.loads(data)['keys'][0]['prop_name']
    except (TypeError, ValueError, KeyError, IndexError):
        return ''
# Frequency of each prop_name among 'po\tname' requests.
po_name_stat = po_name.select(F.udf(get_prop_name)(po_name.query).alias('prop_name')).groupby('prop_name').count().toPandas()
# Bug fix: pandas sort_values returns a NEW DataFrame (it is not in-place by
# default); the original discarded the sorted result, so the sort had no effect.
po_name_stat = po_name_stat.sort_values('count', ascending=False)