/*
 * Decompiled with CFR 0.152.
 */
package com.digiwin.dmp.utils;

import com.digiwin.dmp.constant.ImpalaUtilConstants;
import com.digiwin.dmp.model.HiveTableMeta;
import com.digiwin.dmp.model.TbbSourceBean;
import com.digiwin.dmp.parquet.ParquetDataSchema;
import com.digiwin.dmp.parquet.ParquetDataWrite;
import com.digiwin.dmp.utils.GsonUtil;
import com.digiwin.dmp.utils.HadoopUtil;
import com.digiwin.dmp.utils.ImpalaJdbcUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

/**
 * Loads row data into a Hive table by writing a file directly into the table's HDFS
 * warehouse directory.
 *
 * <p>Each load writes one new file (Parquet or delimited text) into a hidden staging
 * directory ({@code .dofileload}) and then renames it into the table/partition directory.
 * If requested, the existing contents of that directory are removed first.
 */
public class HiveTableWriteUtil {
    private static final Logger logger = LogManager.getLogger(HiveTableWriteUtil.class);
    private static final String PARQUET = "parquet";
    private static final String TEXTFILE = "textfile";

    /** Token written for a null cell in text files (Hive's default {@code serialization.null.format}). */
    private static final String TEXT_NULL_TOKEN = "\\N";

    /** Extracts precision and scale from type names such as {@code decimal(10,2)}; compiled once. */
    private static final Pattern DECIMAL_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");

    /**
     * Reads the sample payload from the classpath resource {@code /tbb_table_parquet_format.json}.
     * Only the first line is parsed, so the JSON document is expected to be on a single line.
     *
     * @return the deserialized sample bean
     * @throws IOException if the resource is missing or cannot be read
     */
    public static TbbSourceBean getSourceData() throws IOException {
        String resource = "/tbb_table_parquet_format.json";
        InputStream inputStream = HiveTableWriteUtil.class.getResourceAsStream(resource);
        if (inputStream == null) {
            throw new IOException("Classpath resource not found: " + resource);
        }
        // try-with-resources closes the reader (and the underlying stream) even if parsing fails.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            String line = reader.readLine();
            return GsonUtil.fromJson(line, TbbSourceBean.class);
        }
    }

    /** Manual smoke test: loads the sample payload into {@code test.tbb_table_parquet_format} as text, truncating first. */
    public static void main(String[] args) throws Exception {
        TbbSourceBean tbbSourceBean = HiveTableWriteUtil.getSourceData();
        ArrayList<Map<String, Object>> partition = new ArrayList<Map<String, Object>>();
        HiveTableWriteUtil.doFileLoad("test_dev", "test", "tbb_table_parquet_format", partition, tbbSourceBean.getData(), tbbSourceBean.getSchema(), TEXTFILE, "\u0001", true, true);
    }

    /**
     * Writes {@code hiveData} into the given table (or partition) in the requested file format.
     *
     * @param env           environment key passed to {@code HadoopUtil.getHdfsConfiguration}
     * @param database      Hive database name
     * @param tableName     Hive table name
     * @param partitionList partition key/value maps appended to the table path in list order
     * @param hiveData      rows; each row must have one value per entry of {@code hiveTableMeta}
     * @param hiveTableMeta column names and Hive type names, in column order
     * @param fileType      {@code "parquet"} or {@code "textfile"} (case-insensitive)
     * @param delimiter     field delimiter; used only for text files
     * @param isTruncate    whether to clear the target directory before moving the new file in
     * @param isLastPage    whether this is the last page of the load; see {@link #moveFileToTargetPath}
     * @throws Exception if the format is unsupported or writing/moving the file fails
     */
    public static void doFileLoad(String env, String database, String tableName, ArrayList<Map<String, Object>> partitionList, List<List<Object>> hiveData, List<HiveTableMeta> hiveTableMeta, String fileType, String delimiter, boolean isTruncate, boolean isLastPage) throws Exception {
        // Locale.ROOT: locale-sensitive lower-casing (e.g. Turkish dotless i) would break the match.
        String normalizedType = fileType == null ? null : fileType.toLowerCase(Locale.ROOT);
        try {
            if (PARQUET.equals(normalizedType)) {
                HiveTableWriteUtil.writeJSONDataWithParquet(env, database, tableName, partitionList, hiveData, hiveTableMeta, isTruncate, isLastPage);
            } else if (TEXTFILE.equals(normalizedType)) {
                HiveTableWriteUtil.writeJSONDataWithText(env, database, tableName, partitionList, hiveData, hiveTableMeta, delimiter, isTruncate, isLastPage);
            } else {
                throw new Exception("=======>   The file format passed in is: " + fileType + ",Currently, only parquet and textfile are supported");
            }
        }
        catch (Exception e) {
            // Single log point (with stack trace) for every failure of this load.
            logger.error("doFileLoad Exception1 :", e);
            throw e;
        }
    }

    /**
     * Writes the rows as a Snappy-compressed Parquet file in the staging directory, then moves
     * it into the table/partition directory. Rows whose column count differs from the schema
     * are logged and skipped.
     *
     * @throws Exception wrapping any failure while building the schema or writing the file;
     *                   failures while moving the file propagate as {@link IOException}
     */
    public static void writeJSONDataWithParquet(String env, String database, String tableName, ArrayList<Map<String, Object>> partitionList, List<List<Object>> hiveData, List<HiveTableMeta> hiveTableMeta, boolean isTruncate, boolean isLastPage) throws Exception {
        Configuration conf = HadoopUtil.getHdfsConfiguration(env);
        FileSystem fileSystem = FileSystem.get(conf);
        String defaultPath = HiveTableWriteUtil.getHiveTableDefaultPath(fileSystem, database, tableName, partitionList, false);
        String defaultPathTemp = HiveTableWriteUtil.getHiveTableDefaultPath(fileSystem, database, tableName, partitionList, true);
        Path hdfsFilePath = HiveTableWriteUtil.getTableHdfsFilePath(defaultPath, PARQUET);
        Path hdfsFilePathTemp = HiveTableWriteUtil.getTableHdfsFilePath(defaultPathTemp, PARQUET);
        try {
            MessageType messageType = HiveTableWriteUtil.getParquetSchema(hiveTableMeta);
            SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType);
            // try-with-resources guarantees the writer is closed even when a row fails to convert.
            try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(hdfsFilePathTemp)
                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                    .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .withConf(conf)
                    .withRowGroupSize(0x100000)
                    .withType(messageType)
                    .build()) {
                for (List<Object> row : hiveData) {
                    if (!HiveTableWriteUtil.hasExpectedColumnCount(row, hiveTableMeta)) {
                        continue;
                    }
                    writer.write(HiveTableWriteUtil.jsonArrayConvernParquetGroup(groupFactory.newGroup(), row, hiveTableMeta));
                }
            }
        }
        catch (Exception e) {
            logger.error("doFileLoad Exception2 :", e);
            HiveTableWriteUtil.deleteQuietly(fileSystem, hdfsFilePathTemp);
            throw new Exception("Write parquet file to " + hdfsFilePathTemp.getName() + " Failed, please check " + e.getMessage(), e);
        }
        HiveTableWriteUtil.moveFileToTargetPath(fileSystem, isTruncate, hdfsFilePathTemp, hdfsFilePath, defaultPath, defaultPathTemp, isLastPage);
    }

    /**
     * Writes the rows as UTF-8 text, one row per line with fields joined by {@code delimiter},
     * into the staging directory, then moves the file into the table/partition directory.
     * Rows whose column count differs from the schema are logged and skipped; null cells are
     * written as {@code \N}.
     *
     * <p>NOTE(review): values are not escaped, so a value containing {@code delimiter} or a
     * newline will corrupt the row layout.
     *
     * @throws Exception wrapping any failure while writing the file; failures while moving the
     *                   file propagate as {@link IOException}
     */
    public static void writeJSONDataWithText(String env, String database, String tableName, ArrayList<Map<String, Object>> partitionList, List<List<Object>> hiveData, List<HiveTableMeta> hiveTableMeta, String delimiter, boolean isTruncate, boolean isLastPage) throws Exception {
        Configuration conf = HadoopUtil.getHdfsConfiguration(env);
        FileSystem fileSystem = FileSystem.get(conf);
        String defaultPath = HiveTableWriteUtil.getHiveTableDefaultPath(fileSystem, database, tableName, partitionList, false);
        String defaultPathTemp = HiveTableWriteUtil.getHiveTableDefaultPath(fileSystem, database, tableName, partitionList, true);
        Path hdfsFilePath = HiveTableWriteUtil.getTableHdfsFilePath(defaultPath, TEXTFILE);
        Path hdfsFilePathTemp = HiveTableWriteUtil.getTableHdfsFilePath(defaultPathTemp, TEXTFILE);
        try (FSDataOutputStream outputStream = fileSystem.create(hdfsFilePathTemp)) {
            for (List<Object> row : hiveData) {
                if (!HiveTableWriteUtil.hasExpectedColumnCount(row, hiveTableMeta)) {
                    continue;
                }
                String line = row.stream().map(HiveTableWriteUtil::toTextCell).collect(Collectors.joining(delimiter)) + "\n";
                outputStream.write(line.getBytes(StandardCharsets.UTF_8));
            }
        }
        catch (Exception e) {
            logger.error("doFileLoad Exception3 :", e);
            HiveTableWriteUtil.deleteQuietly(fileSystem, hdfsFilePathTemp);
            throw new Exception("Write text file to " + hdfsFilePathTemp.getName() + " Failed, please check " + e.getMessage(), e);
        }
        HiveTableWriteUtil.moveFileToTargetPath(fileSystem, isTruncate, hdfsFilePathTemp, hdfsFilePath, defaultPath, defaultPathTemp, isLastPage);
    }

    /**
     * Recursively deletes every entry directly inside {@code path}; the directory itself is kept.
     * Does nothing if {@code path} does not exist.
     *
     * @throws RuntimeException if listing or deleting fails
     */
    public static void truncateHdfsData(FileSystem fileSystem, Path path) {
        HiveTableWriteUtil.truncateHdfsData(fileSystem, path, null);
    }

    /**
     * Like {@link #truncateHdfsData(FileSystem, Path)}, but keeps any child entry that is
     * {@code keep} itself or one of its ancestors, such as the staging directory that holds the
     * file about to be moved in.
     */
    private static void truncateHdfsData(FileSystem fileSystem, Path path, Path keep) {
        try {
            if (!fileSystem.exists(path)) {
                return;
            }
            // Compare on the path component only: listStatus returns fully qualified paths
            // (scheme/authority) while the staging path is built unqualified.
            String keepPath = keep == null ? null : keep.toUri().getPath();
            for (FileStatus fileStatus : fileSystem.listStatus(path)) {
                Path child = fileStatus.getPath();
                if (keepPath != null && HiveTableWriteUtil.isSameOrAncestor(child.toUri().getPath(), keepPath)) {
                    logger.info("Keeping staging entry during truncate: " + child);
                    continue;
                }
                fileSystem.delete(child, true);
            }
        }
        catch (IOException ioException) {
            logger.error("doFileLoad Exception4 :", ioException);
            throw new RuntimeException("Delete file failed: " + ioException.getMessage(), ioException);
        }
    }

    /** Returns true when {@code target} equals {@code candidate} or lies underneath it. */
    private static boolean isSameOrAncestor(String candidate, String target) {
        if (target.equals(candidate)) {
            return true;
        }
        String prefix = candidate.endsWith("/") ? candidate : candidate + "/";
        return target.startsWith(prefix);
    }

    /**
     * Optionally truncates the target directory, then renames the staged file into it.
     *
     * <p>When truncating, the staging file's own directory is preserved. For an unpartitioned
     * table the {@code .dofileload} staging directory lives inside the table directory, and it
     * would otherwise be deleted before the rename.
     *
     * <p>NOTE(review): {@code deleteOnExit} only removes the staging directory when the
     * {@link FileSystem} instance is closed. {@code FileSystem.get} returns a cached instance, so
     * in a long-running JVM this may be deferred until shutdown.
     *
     * @throws IOException if truncation/rename fails, including when {@code rename} reports failure
     */
    private static void moveFileToTargetPath(FileSystem fileSystem, boolean isTruncate, Path hdfsFilePathTemp, Path hdfsFilePath, String defaultPath, String defaultPathTemp, boolean isLastPage) throws IOException {
        try {
            if (isTruncate) {
                Path path = new Path(defaultPath);
                logger.info("Cleaning up: " + path.toUri());
                HiveTableWriteUtil.truncateHdfsData(fileSystem, path, hdfsFilePathTemp);
            }
            // FileSystem.rename signals most failures by returning false rather than throwing.
            if (!fileSystem.rename(hdfsFilePathTemp, hdfsFilePath)) {
                throw new IOException("rename " + hdfsFilePathTemp + " -> " + hdfsFilePath + " returned false");
            }
            if (isLastPage) {
                logger.info("Data moved to target folder: Successful, delete tmp file path: " + defaultPathTemp);
                fileSystem.deleteOnExit(new Path(defaultPathTemp));
            } else {
                logger.info("Data moved to target folder: Successful, isLastPage: " + isLastPage);
            }
        }
        catch (IOException e) {
            logger.error("Data move to target folder: " + e.getMessage());
            throw e;
        }
    }

    /**
     * Builds the HDFS directory for a table or partition and creates it if it is missing.
     *
     * <p>The path is {@code /user/hive/warehouse/<db>.db/<table>/}. When {@code isTmp} is true,
     * {@code .dofileload/} is appended. Then {@code key=value/} is appended for every entry of
     * every map in {@code partitionList}, in iteration order.
     *
     * <p>NOTE(review): {@code database}, {@code tableName} and partition values are inserted
     * verbatim; confirm callers never pass untrusted values (e.g. containing {@code ..}).
     *
     * @param partitionList partition maps; {@code null} is treated as no partitions
     * @return the directory path, always ending with {@code /}
     * @throws RuntimeException if checking or creating the directory fails
     */
    public static String getHiveTableDefaultPath(FileSystem fileSystem, String database, String tableName, ArrayList<Map<String, Object>> partitionList, boolean isTmp) {
        String base = isTmp ? String.format("/user/hive/warehouse/%s.db/%s/.dofileload/", database, tableName) : String.format("/user/hive/warehouse/%s.db/%s/", database, tableName);
        StringBuilder pathBuilder = new StringBuilder(base);
        if (partitionList != null) {
            for (Map<String, Object> partition : partitionList) {
                for (Map.Entry<String, Object> entry : partition.entrySet()) {
                    pathBuilder.append(entry.getKey()).append('=').append(entry.getValue()).append('/');
                }
            }
        }
        String defaultPath = pathBuilder.toString();
        try {
            Path directory = new Path(defaultPath);
            if (!fileSystem.exists(directory)) {
                logger.info("Path does not exist on hdfs, create : " + defaultPath);
                fileSystem.mkdirs(directory);
            }
        }
        catch (IOException e) {
            logger.error("doFileLoad Exception5 :", e);
            throw new RuntimeException("Create HDFS path: " + defaultPath + " Failed, please check: " + e.getMessage(), e);
        }
        return defaultPath;
    }

    /**
     * Returns a new file path inside {@code defaultPath}. The file name is
     * {@code md5(defaultPath) + "_" + currentTimeMillis} plus {@code .parq} for Parquet or
     * {@code .text} otherwise.
     *
     * @param defaultPath directory path ending with {@code /}
     * @param fileType    {@code "parquet"} selects the {@code .parq} extension
     */
    public static Path getTableHdfsFilePath(String defaultPath, String fileType) {
        long timestamp = System.currentTimeMillis();
        String fileName = DigestUtils.md5Hex(defaultPath) + "_" + timestamp + (PARQUET.equals(fileType) ? ".parq" : ".text");
        return new Path(defaultPath + fileName);
    }

    /**
     * Builds the Parquet schema for the given columns via {@code ParquetDataSchema.convert}.
     *
     * @throws Exception if a column type is not supported
     */
    public static MessageType getParquetSchema(List<HiveTableMeta> tableMetaData) throws Exception {
        ArrayList<String> columnNames = new ArrayList<String>();
        ArrayList<TypeInfo> columnTypes = new ArrayList<TypeInfo>();
        for (HiveTableMeta hiveTableMeta : tableMetaData) {
            columnNames.add(hiveTableMeta.getColumn_name());
            columnTypes.add(HiveTableWriteUtil.hiveTypeConvernParquetTypeInfo(hiveTableMeta.getColumn_type()));
        }
        return ParquetDataSchema.convert(columnNames, columnTypes);
    }

    /** Maps a Hive type name to its {@link TypeInfo}; supports the primitive map plus {@code decimal(p,s)}. */
    private static TypeInfo hiveTypeConvernParquetTypeInfo(String value) throws Exception {
        if (ImpalaUtilConstants.hivePrimitiveTypeInfoMap.containsKey(value)) {
            return ImpalaUtilConstants.hivePrimitiveTypeInfoMap.get(value);
        }
        if (value.startsWith("decimal")) {
            int[] precisionAndScale = HiveTableWriteUtil.parseDecimalPrecisionAndScale(value);
            return new DecimalTypeInfo(precisionAndScale[0], precisionAndScale[1]);
        }
        throw new Exception("Unsupported data type: " + value + ",Currently only supports: " + HiveTableWriteUtil.supportedTypeNames());
    }

    /**
     * Parses {@code decimal(p,s)} (spaces ignored) into {@code {precision, scale}}.
     *
     * @throws Exception if no precision/scale pair is found
     */
    private static int[] parseDecimalPrecisionAndScale(String columnType) throws Exception {
        Matcher matcher = DECIMAL_PATTERN.matcher(columnType.replaceAll(" ", "").trim());
        if (!matcher.find()) {
            throw new Exception("===>  Exception in obtaining decimal type: No matching precision, scale found");
        }
        return new int[]{Integer.parseInt(matcher.group(1)), Integer.parseInt(matcher.group(2))};
    }

    /** Keys of the primitive type map plus {@code "decimal"}, used in error messages. */
    private static List<String> supportedTypeNames() {
        ArrayList<String> supportType = new ArrayList<String>(ImpalaUtilConstants.hivePrimitiveTypeInfoMap.keySet());
        supportType.add("decimal");
        return supportType;
    }

    /** Returns true if {@code row} has one value per column; otherwise logs a warning and returns false. */
    private static boolean hasExpectedColumnCount(List<Object> row, List<HiveTableMeta> hiveTableMeta) {
        if (row != null && row.size() == hiveTableMeta.size()) {
            return true;
        }
        logger.warn("Number of data columns:" + (row == null ? "null" : String.valueOf(row.size())) + " != Number of columns in the table: " + hiveTableMeta.size());
        return false;
    }

    /** Renders one cell for a text file; null becomes Hive's default null token {@code \N}. */
    private static String toTextCell(Object value) {
        return value == null ? TEXT_NULL_TOKEN : value.toString();
    }

    /** Best-effort removal of a partially written staging file; failures are only logged. */
    private static void deleteQuietly(FileSystem fileSystem, Path path) {
        try {
            fileSystem.delete(path, false);
        }
        catch (IOException cleanupError) {
            logger.warn("Could not remove temporary file " + path + ": " + cleanupError.getMessage());
        }
    }

    /**
     * Appends one row's values to {@code group}, converting each value according to its
     * column's Hive type name. Null values and the strings {@code "NULL"}/{@code "null"} are
     * left unset. If the row width differs from the schema, {@code group} is returned empty.
     *
     * @throws Exception for unsupported column types or malformed decimal type names
     */
    private static Group jsonArrayConvernParquetGroup(Group group, List<Object> hiveData, List<HiveTableMeta> tableMetaData) throws Exception {
        if (hiveData.size() != tableMetaData.size()) {
            return group;
        }
        for (int i = 0; i < tableMetaData.size(); ++i) {
            String column_name = tableMetaData.get(i).getColumn_name();
            String column_type = tableMetaData.get(i).getColumn_type();
            Object cell = hiveData.get(i);
            if (cell == null || "NULL".equals(cell) || "null".equals(cell)) {
                continue;
            }
            String text = cell.toString();
            if (TypeInfoFactory.booleanTypeInfo.getTypeName().equals(column_type)) {
                group.append(column_name, ParquetDataWrite.booleanDataWriter(Boolean.parseBoolean(text)).booleanValue());
            } else if (TypeInfoFactory.intTypeInfo.getTypeName().equals(column_type)) {
                group.append(column_name, ParquetDataWrite.intWriter(Integer.parseInt(text)).intValue());
            } else if (TypeInfoFactory.longTypeInfo.getTypeName().equals(column_type)) {
                group.append(column_name, ParquetDataWrite.longWriter(Long.parseLong(text)).longValue());
            } else if (TypeInfoFactory.stringTypeInfo.getTypeName().equals(column_type) || TypeInfoFactory.charTypeInfo.getTypeName().equals(column_type) || TypeInfoFactory.varcharTypeInfo.getTypeName().equals(column_type)) {
                group.append(column_name, ParquetDataWrite.stringWriter(text));
            } else if (TypeInfoFactory.floatTypeInfo.getTypeName().equals(column_type)) {
                group.append(column_name, ParquetDataWrite.floatWriter(Float.valueOf(Float.parseFloat(text))).floatValue());
            } else if (TypeInfoFactory.doubleTypeInfo.getTypeName().equals(column_type)) {
                group.append(column_name, ParquetDataWrite.doubleDataWriter(Double.parseDouble(text)).doubleValue());
            } else if (TypeInfoFactory.shortTypeInfo.getTypeName().equals(column_type)) {
                group.append(column_name, ParquetDataWrite.shortDataWriter(Short.parseShort(text)).intValue());
            } else if (TypeInfoFactory.dateTypeInfo.getTypeName().equals(column_type)) {
                group.append(column_name, ParquetDataWrite.dateWrite(Date.valueOf(text)).intValue());
            } else if (TypeInfoFactory.timestampTypeInfo.getTypeName().equals(column_type)) {
                group.append(column_name, ParquetDataWrite.timestampWrite(Timestamp.valueOf(text)));
            } else if (column_type.startsWith("decimal")) {
                int[] precisionAndScale = HiveTableWriteUtil.parseDecimalPrecisionAndScale(column_type);
                group.append(column_name, ParquetDataWrite.decimalWrite(text, precisionAndScale[0], precisionAndScale[1]));
            } else {
                throw new Exception("===> Unsupported data type: " + column_type + ",Currently only supports:" + HiveTableWriteUtil.supportedTypeNames());
            }
        }
        return group;
    }
}

