package shade.com.aliyun.emr.fs.oss.flink;

import bigboot.protocol.type.CltFlushMultipartUploadReply;
import com.alibaba.jboot.JbootBlockletWriter;
import com.alibaba.jboot.buffer.JbootBufferFactory;
import com.alibaba.jboot.google.flatbuffers.Table;
import com.alibaba.jfs.OssFileletSystem;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.com.aliyun.emr.buffer.ChunkBuffer;
import shade.com.aliyun.emr.compatibility.StopWatch;
import shade.com.aliyun.emr.fs.Version;
import shade.com.aliyun.emr.fs.common.FsStats;
import shade.com.aliyun.emr.fs.internal.oss.OssOpContext;
import shade.com.aliyun.emr.fs.internal.oss.OssPartInfo;
import shade.com.aliyun.emr.fs.oss.commit.magic.SinglePendingCommit;

/* loaded from: input_file:shade/com/aliyun/emr/fs/oss/flink/JindoFlinkOutputStream.class */
/**
 * Output stream that stages bytes in a {@link ChunkBuffer} and hands the buffer to the
 * {@link JbootBlockletWriter} of its {@link OssFlinkWriteContext} whenever the buffer cannot
 * take more data.
 *
 * <p>{@link #close()} pushes out the remaining bytes, flushes the multipart upload and, when the
 * reply carries a non-empty upload id, records the upload id and part list as a
 * {@link SinglePendingCommit} that callers fetch with {@link #getPendingCommit()}. Nothing in
 * this class completes that pending commit.
 *
 * <p>{@code write}, {@code flush} and {@code close} are {@code synchronized} on the stream.
 */
public class JindoFlinkOutputStream extends OutputStream {
    static final Logger LOG = LoggerFactory.getLogger(JindoFlinkOutputStream.class);

    /** Set once {@link #close()} has run to completion, whether it succeeded or failed. */
    private boolean closed;
    /** Set once {@link #cleanUp()} has released the writer and the staging buffer. */
    private boolean resourcesReleased;
    private ChunkBuffer writeBuffer;
    private OssFileletSystem ossFileletSystem;
    private SinglePendingCommit pendingCommit;
    private OssFlinkWriteContext context;
    private final byte[] singleByteBuf = new byte[1];
    /** Number of bytes accepted into the staging buffer; reported through {@link FsStats}. */
    private long byteWriten = 0;
    /** Accumulated {@link StopWatch} readings of buffer flushes and the final upload flush. */
    private long writeElapsedNanos = 0;

    /* loaded from: input_file:shade/com/aliyun/emr/fs/oss/flink/JindoFlinkOutputStream$OssFlinkWriteContext.class */
    /** Parameters and collaborators the stream needs for one target object. */
    public static class OssFlinkWriteContext extends OssOpContext {
        /** Target path; used in log and error messages and as the pending commit's URI. */
        public String path;
        /** Size passed to {@link JbootBufferFactory#allocateBuffer} for the staging buffer. */
        public int bufferSize;
        /** Writer that receives flushed buffers and performs the multipart-upload flush. */
        public JbootBlockletWriter jbootWriter;
        /** Bucket name stored in the pending commit. */
        public String bucket;
    }

    /**
     * Creates a stream for the given context and allocates its staging buffer.
     *
     * @param ossFlinkWriteContext path, bucket, buffer size and writer to use
     * @throws IOException declared for callers; nothing visible in this constructor throws it
     */
    public JindoFlinkOutputStream(OssFlinkWriteContext ossFlinkWriteContext) throws IOException {
        this.context = ossFlinkWriteContext;
        this.ossFileletSystem = ossFlinkWriteContext.ossFileletSystem;
        this.writeBuffer = new ChunkBuffer(JbootBufferFactory.allocateBuffer(ossFlinkWriteContext.bufferSize));
        LOG.info("JindoOssMagicOutputStream init, path:{}. Buffer size {}, bucket {}",
                ossFlinkWriteContext.path, ossFlinkWriteContext.bufferSize, ossFlinkWriteContext.bucket);
    }

    /** Does not push any data to the writer; it only emits a debug log line. */
    @Override // java.io.OutputStream, java.io.Flushable
    public synchronized void flush() throws IOException {
        LOG.debug("Not real flush");
    }

    /**
     * Writes out the remaining buffered bytes, flushes the multipart upload, records the pending
     * commit, logs upload statistics and releases the writer and the staging buffer.
     *
     * <p>Calling it again after it has run is a no-op. The stream is marked closed and its
     * resources are released even when closing fails, so a failed close is never retried against
     * already released resources.
     *
     * @throws IOException if flushing fails, or if the stream's resources were already released
     *     by an earlier write failure
     */
    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        try {
            if (this.resourcesReleased) {
                // A failed write already released the buffer and the writer; report that as an
                // IOException instead of failing with a NullPointerException on the buffer.
                throw new IOException("stream resources were already released by an earlier write failure");
            }
            writeOutBuffer();
            finishMultipartUpload();
        } catch (IOException e) {
            LOG.error("Close " + this.context.path + " error " + e.getMessage(), e);
            throw new IOException("Close stream " + this.context.path + " error " + e.getMessage(), e);
        } finally {
            this.closed = true;
            try {
                FsStats.logStats("upload", new Path(this.context.path), (Path) null, this.byteWriten,
                        (String) null, this.writeElapsedNanos, Version.smartdata_version);
            } finally {
                cleanUp();
            }
        }
    }

    /**
     * Flushes the multipart upload and, if the reply has a non-empty upload id, records the
     * pending commit. The reply's buffer is handed back to {@link JbootBufferFactory} even when
     * reading the reply fails.
     */
    private void finishMultipartUpload() throws IOException {
        StopWatch flushTimer = new StopWatch().start();
        try {
            CltFlushMultipartUploadReply reply = this.context.jbootWriter.flushMultipartUpload();
            if (reply == null) {
                throw new IOException("Multipart upload flush returned no reply");
            }
            String uploadId;
            List<OssPartInfo> parts;
            try {
                uploadId = reply.uploadId();
                parts = OssPartInfo.fromPartList(reply.partListAsPartList());
            } finally {
                JbootBufferFactory.returnBuffer(reply.getByteBuffer());
            }
            if (StringUtils.isNotEmpty(uploadId)) {
                setSinglePendingCommit(parts, uploadId);
            }
            this.writeElapsedNanos += flushTimer.stop().now();
        } catch (IOException e) {
            throw new IOException("Failed to flush all written data to oss with multipart upload of path "
                    + this.context.path, e);
        }
    }

    /**
     * Builds the pending commit for this stream's path and bucket from the upload id and parts
     * and stores it for {@link #getPendingCommit()}.
     */
    private void setSinglePendingCommit(List<OssPartInfo> list, String str) throws IOException {
        SinglePendingCommit commit = new SinglePendingCommit();
        commit.touch(System.currentTimeMillis());
        commit.setUri(this.context.path);
        commit.setUploadId(str);
        commit.setBucket(this.context.bucket);
        commit.setText("");
        commit.bindCommitData(list);
        // The result is discarded; presumably this call validates that the commit can be
        // serialized before it is published - TODO confirm against SinglePendingCommit.
        commit.toBytes();
        LOG.info("Uncommitted data pending to file {},  with {} parts.", this.context.path, list.size());
        this.pendingCommit = commit;
    }

    /**
     * Returns the pending commit recorded by {@link #close()}.
     *
     * @return the pending commit, or {@code null} when closing failed or the flush reply carried
     *     no upload id
     * @throws IllegalStateException if the stream has not been closed yet
     */
    public SinglePendingCommit getPendingCommit() {
        if (this.closed) {
            return this.pendingCommit;
        }
        throw new IllegalStateException("Should be invoked after close this OutputStream.");
    }

    /** Writes the low-order byte of {@code i} through {@link #write(byte[], int, int)}. */
    @Override // java.io.OutputStream
    public synchronized void write(int i) throws IOException {
        checkStatus();
        this.singleByteBuf[0] = (byte) i;
        write(this.singleByteBuf, 0, 1);
    }

    /**
     * Copies {@code i2} bytes of {@code bArr}, starting at offset {@code i}, into the staging
     * buffer, flushing the buffer to the writer each time it cannot take the rest.
     *
     * @throws NullPointerException if {@code bArr} is {@code null}
     * @throws IndexOutOfBoundsException if the range does not lie within {@code bArr}
     * @throws IOException if the stream is closed or unusable, or if buffering or flushing
     *     fails; on such a failure the stream's resources are released
     */
    @Override // java.io.OutputStream
    public synchronized void write(byte[] bArr, int i, int i2) throws IOException {
        checkStatus();
        if (bArr == null) {
            throw new NullPointerException("bArr");
        }
        if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
            throw new IndexOutOfBoundsException("offset " + i + ", length " + i2 + ", array length " + bArr.length);
        }
        int offset = i;
        int remaining = i2;
        boolean justFlushed = false;
        try {
            while (remaining > 0) {
                int accepted = this.writeBuffer.write(bArr, offset, remaining);
                if (accepted > 0) {
                    this.byteWriten += accepted;
                    offset += accepted;
                    remaining -= accepted;
                    justFlushed = false;
                } else if (justFlushed) {
                    // An empty buffer that still takes nothing would otherwise loop forever.
                    throw new IOException("Write buffer accepted no data after being flushed");
                }
                if (remaining > 0) {
                    writeOutBuffer();
                    justFlushed = true;
                }
            }
        } catch (Exception e) {
            cleanUp();
            throw new IOException("Write " + i + "~" + i2 + " error", e);
        }
    }

    /** Last-resort release of the writer and buffer for streams that were never closed. */
    protected void finalize() {
        cleanUp();
    }

    /**
     * Hands the buffered bytes, if any, to the writer and rewinds the buffer, adding the time
     * taken to {@link #writeElapsedNanos}.
     */
    private void writeOutBuffer() throws IOException {
        StopWatch flushTimer = new StopWatch().start();
        this.writeBuffer.buffer().flip();
        if (this.writeBuffer.buffer().remaining() > 0) {
            this.context.jbootWriter.write(this.writeBuffer.buffer());
        }
        // Only the position is rewound; the limit stays where flip() put it.
        this.writeBuffer.buffer().position(0);
        this.writeElapsedNanos += flushTimer.stop().now();
    }

    /** Rejects use of a stream that is closed or whose resources were already released. */
    private void checkStatus() throws IOException {
        if (this.closed) {
            throw new IOException("Output stream closed.");
        }
        if (this.resourcesReleased) {
            throw new IOException("Output stream is unusable: its resources were released after an earlier write failure.");
        }
    }

    /**
     * Closes the writer and returns the staging buffer to {@link JbootBufferFactory}, at most
     * once per stream. Synchronized because {@link #finalize()} calls it from outside the
     * stream's synchronized methods.
     */
    private synchronized void cleanUp() {
        if (this.resourcesReleased) {
            return;
        }
        this.resourcesReleased = true;
        try {
            // The context is null-checked because finalize() may run on an instance whose
            // constructor failed part-way.
            if (this.context != null && this.context.jbootWriter != null) {
                this.context.jbootWriter.close();
            }
        } finally {
            if (this.writeBuffer != null) {
                JbootBufferFactory.returnBuffer(this.writeBuffer.buffer());
                this.writeBuffer = null;
            }
        }
    }
}
