In [1]:
import mysite
In [2]:
from xgb.all_import import load_spark
In [3]:
sc, sqlctx = load_spark('/data1/andyfei/.local/spark-2.3.1-bin-hadoop2.7/', master='local[16]')
In [76]:
from pyspark.sql import functions as F
from pyspark import sql
In [28]:
import json
In [17]:
def extract_request(line):
    if '[console] [debug] request:' in line:
        return line[53:]
    else:
        return ''
In [73]:
def is_json(x):
    try:
        json.loads(x)
        return True
    except:
        return False
In [78]:
df = sqlctx.read.text('/data1/andyfei/log/*')
df = df.select(F.udf(extract_request)(df.value).alias('query'))
df = df.where(df.query != '')
df = df.where(F.udf(is_json, sql.types.BooleanType())(df.query))
df.cache()
Out[78]:
DataFrame[query: string]
In [79]:
def get_type(data):
    try:
        data = json.loads(data)
        return data['Type']
    except:
        return ''
get_type = F.udf(get_type)
In [82]:
df = df.select(df.query, get_type(df.query).alias('type'))
In [85]:
def extract_spotype(data):
    data = json.loads(data)
    data = data['keys']
    return data[0]['type']
In [83]:
df.groupby('type').count().show()
+------------+------+
|        type| count|
+------------+------+
|INTERSECTION|  3812|
|        LIST|922099|
+------------+------+

In [86]:
df = df.select([getattr(df, key) for key in df.columns] \
        + [F.udf(extract_spotype)(df.query).alias('stype')])
In [111]:
df.selectExpr
Out[111]:
DataFrame[query: string, type: string, stype: string]
In [130]:
df.where(df.type == 'INTERSECTION').selectExpr('query as text').repartition(1).write.text('intersection.json')
In [91]:
df.select('stype').groupBy('stype').count().toPandas()
Out[91]:
stype count
0 sp\tid 6790
1 po\tname 39240
2 po\tid 318
3 sp\tname 879563
In [92]:
po_name = df.where(df.stype == 'po\tname')
In [93]:
po_name_df = po_name.toPandas()
In [101]:
def get_prop_name(data):
    data = json.loads(data)
    data = data['keys']
    return data[0]['prop_name']
In [104]:
po_name_stat = po_name.select(F.udf(get_prop_name)(po_name.query).alias('prop_name')).groupby('prop_name').count().toPandas()
In [108]:
po_name_stat.sort_values('count', ascending=False)
Out[108]:
prop_name count
50 名称 32321
54 语言 1148
1 系列 806
119 歌曲 711
115 ans_id 662
118 子类型 543
81 地区 405
41 演员 339
45 儿子 326
129 歌手 228
98 上映时间 221
110 电视剧 203
25 导演 130
65 风格 97
135 父亲 95
116 母亲 77
34 发行时间 70
126 主演 47
99 类别 41
35 儿女 36
92 电影 34
7 出生地 34
148 哥哥 32
133 兄弟 28
127 首唱 26
51 性别 25
28 外婆 25
125 妻子 24
122 姐姐 24
111 成员 23
... ... ...
24 著作 1
14 儿女.父亲 1
11 儿女.母亲 1
10 儿女.教师 1
8 儿女.图片 1
6 子女 1
5 未婚妻 1
4 男友 1
53 身高 1
58 评论 1
108 上司.名称 1
59 父亲.英文名 1
106 嫂嫂 1
104 全称 1
102 作品 1
101 男友.婚姻 1
100 祖父.奶奶 1
97 字号 1
96 母亲.图片 1
94 搭档 1
93 姨妈 1
88 表演.演员 1
75 妻子.出生地 1
73 养父 1
70 姐姐.儿女 1
68 女儿.名称 1
64 朝代 1
62 妹夫 1
60 职业 1
76 母亲.外文名 1

152 rows × 2 columns