1 Star 0 Fork 0

superdiaodiao/tran_log_json

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
code.py 3.53 KB
一键复制 编辑 原始数据 按行查看 历史
superdiaodiao 提交于 2020-09-17 00:11 . update code.py.
import json
import time
import logging
from kafka import KafkaConsumer
from kafka import KafkaProducer
# define the func to transfer the json
def tran_json(input):
    """Convert one raw log record (JSON text) into the normalized output dict.

    Args:
        input: JSON document (``str`` or ``bytes``) holding a single log
            record. The parameter name shadows the builtin; it is kept so
            existing keyword callers continue to work.

    Returns:
        dict: the copied top-level fields plus the nested ``client``,
        ``common`` and ``location`` nodes. ``location["lat"]``,
        ``location["lng"]`` and ``timestampMs`` are best-effort: each is
        omitted (and an error is logged) when its source field is missing or
        cannot be converted.

    Raises:
        ValueError: ``input`` is not valid JSON.
        KeyError: a mandatory field is absent (every field read below except
            ``latitude``, ``longtitude`` and ``log_time``).
    """
    data = json.loads(input)
    result = {}

    # Top-level fields copied through unchanged.
    for key in ("app_package_name", "app_type", "app_uuid", "auth_passed"):
        result[key] = data[key]

    # "client" node.
    client = data["client"]
    result["client"] = {
        "dev_manufacturer": client["dev_manufacturer"],
        "dev_model": client["dev_model"],
    }

    # "common" node: os/platform are only derived for app_type 100 and only
    # for the three recognised user_agent values; otherwise they stay unset.
    common = {}
    result["common"] = common
    if data["app_type"] == 100:
        user_agent = client["user_agent"]
        if user_agent == "":
            common["os"] = "UNKNOWN"
            common["platform"] = "H5"
        elif user_agent == "iOS":
            common["os"] = "iOS"
            common["platform"] = "iOS"
        elif user_agent == "Android":
            common["os"] = "Android"
            common["platform"] = "Android"
    common["version"] = data["version"]

    result["country"] = data["country"]

    # "location" node. The source spelling "longtitude" is part of the input
    # schema and must not be corrected here.
    location = {}
    result["location"] = location
    for out_key, src_key in (("lat", "latitude"), ("lng", "longtitude")):
        try:
            location[out_key] = float(data[src_key])
        except (KeyError, TypeError, ValueError, OverflowError):
            # data.get() so that a missing key is logged instead of raising
            # a second KeyError from inside the handler.
            logging.error(
                "Cannot convert the %s: %s", src_key, data.get(src_key)
            )

    result["os_lang"] = data["os_lang"]
    result["remote_addr"] = data["remote_addr"]

    # log_time -> epoch milliseconds. time.mktime interprets the parsed
    # struct_time as local time, so the value depends on the host timezone.
    try:
        parsed = time.strptime(data["log_time"], "%Y-%m-%d %H:%M:%S")
        result["timestampMs"] = int(time.mktime(parsed) * 1000)
    except (KeyError, TypeError, ValueError, OverflowError):
        logging.error(
            "Cannot convert the log_time: %s to the timestampMs",
            data.get("log_time"),
        )

    result["user_id"] = data["user_id"]
    result["version"] = data["version"]
    return result
if __name__ == '__main__':
    # Script entry point: read raw log records from the source Kafka topic,
    # normalize each one with tran_json and publish the result to the target
    # topic as UTF-8 encoded JSON.
    source_topic = "src"
    target_topic = "des"
    server = ['127.0.0.1:9092']
    consumer = KafkaConsumer(
        source_topic,
        bootstrap_servers=server,
        group_id='test',
        auto_offset_reset='earliest',
    )
    producer = KafkaProducer(
        bootstrap_servers=server,
        value_serializer=lambda m: json.dumps(m).encode(),
    )
    try:
        for msg in consumer:
            try:
                result = tran_json(msg.value)
            except (KeyError, TypeError, ValueError):
                # A malformed record (invalid JSON, missing mandatory field,
                # empty value) must not stop the whole stream: log and skip.
                logging.exception(
                    "Skipping message at offset %s: cannot transform it",
                    msg.offset,
                )
                continue
            producer.send(target_topic, result)
    finally:
        # Deliver anything still buffered, then release both clients even
        # when the loop is interrupted (e.g. Ctrl-C).
        try:
            producer.flush()
            producer.close()
        finally:
            consumer.close()
######### unit_test follows: ###########
# demo_input = '''{
# "app_package_name": "com.google.app",
# "app_type": 100,
# "app_uuid": "6e1343ff1942828",
# "auth_passed": true,
# "client": {
# "dev_manufacturer": "vivo",
# "dev_model": "vivo 1612",
# "user_agent": ""
# },
# "country": "ID",
# "latitude": "120.3399",
# "log_time": "2020-08-15 00:00:02",
# "longtitude": "30.1211",
# "os_lang": "in_ID",
# "remote_addr": "110.139.149.102",
# "user_id": "8371792",
# "version": "3.3.1"
# }'''
# print the output and save it to the file
# demo_result = tran_json(demo_input)
# print(demo_result)
# with open('output.json','w+') as f:
# json.dump(demo_result,f)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/superdiaodiao/tran_log_json.git
[email protected]:superdiaodiao/tran_log_json.git
superdiaodiao
tran_log_json
tran_log_json
master

搜索帮助