package shade.com.aliyun.emr.fs.hdfs.task;

import com.alibaba.jboot.JbootBlockletWriter;
import com.alibaba.jboot.buffer.JbootBufferFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.com.aliyun.emr.compatibility.StopWatch;
import shade.com.aliyun.emr.fs.hdfs.CacheUtils;
import shade.com.aliyun.emr.fs.hdfs.JindoHdfsStreamCacheConfigKeys;

/* loaded from: input_file:shade/com/aliyun/emr/fs/hdfs/task/HdfsBlockletAsyncTask.class */
public class HdfsBlockletAsyncTask implements Runnable {
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) HdfsBlockletAsyncTask.class);
    private Path path;
    private long cacheBlockletId;
    private int cacheBlockletSize;
    private JbootBlockletWriter writer;
    final FSDataInputStream in;
    final long timeoutInMS;
    final StopWatch sw;
    private AtomicBoolean closed = new AtomicBoolean(false);
    final DistributedFileSystem dfs = new DistributedFileSystem();

    public HdfsBlockletAsyncTask(String str, JbootBlockletWriter jbootBlockletWriter, long j, int i, Configuration configuration) throws IOException {
        this.writer = jbootBlockletWriter;
        this.path = new Path(str);
        this.cacheBlockletId = j;
        this.cacheBlockletSize = i;
        this.timeoutInMS = configuration.getLong(JindoHdfsStreamCacheConfigKeys.JindoCache.CACHE_BACKEND_JINDO_INPUT_WRITE_CACHE_TIMEOUT_IN_MS_KEY, 30000L);
        this.dfs.initialize(this.path.toUri(), configuration);
        this.in = this.dfs.open(this.path);
        this.sw = new StopWatch().start();
        this.writer.getWriterLock().readLock().lock();
    }

    public void close() {
        try {
            if (!this.closed.get()) {
                this.writer.getWriterLock().readLock().unlock();
                this.closed.set(true);
            }
        } catch (Exception e) {
            LOG.debug("Failed to close writer lock cache {}, cache_size:{}, idx:{}, {}, {}ms", this.path, Integer.valueOf(this.cacheBlockletSize), Long.valueOf(this.cacheBlockletId), e.getMessage(), Long.valueOf(this.sw.stop().now() / 1000000));
        }
    }

    public void write() {
        if (this.writer.isClose()) {
            this.writer.getWriterLock().readLock().unlock();
            LOG.debug("Failed to write remote cache {}, cache_size:{}, idx:{}, {}", this.path, Integer.valueOf(this.cacheBlockletSize), Long.valueOf(this.cacheBlockletId), "Writer is already closed");
            return;
        }
        final ByteBuffer allocateBuffer = JbootBufferFactory.allocateBuffer(this.cacheBlockletSize);
        try {
            TasksUtils.runWithTimeout(new Runnable() { // from class: shade.com.aliyun.emr.fs.hdfs.task.HdfsBlockletAsyncTask.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        byte[] bArr = new byte[512];
                        long j = HdfsBlockletAsyncTask.this.cacheBlockletId * 8388608;
                        int i = 0;
                        while (HdfsBlockletAsyncTask.this.cacheBlockletSize > i) {
                            int read = HdfsBlockletAsyncTask.this.in.read(j + i, bArr, 0, HdfsBlockletAsyncTask.this.cacheBlockletSize - i > 512 ? 512 : HdfsBlockletAsyncTask.this.cacheBlockletSize - i);
                            if (read == -1) {
                                return;
                            }
                            i += read;
                            allocateBuffer.put(bArr, 0, read);
                        }
                        allocateBuffer.flip();
                        HdfsBlockletAsyncTask.this.writeCache(HdfsBlockletAsyncTask.this.cacheBlockletId, allocateBuffer, HdfsBlockletAsyncTask.this.cacheBlockletId > 0 && HdfsBlockletAsyncTask.this.cacheBlockletSize < 8388608);
                        HdfsBlockletAsyncTask.LOG.debug("Succeed to write remote cache {}, cache_size:{}, idx:{}, {}ms", HdfsBlockletAsyncTask.this.path, Integer.valueOf(HdfsBlockletAsyncTask.this.cacheBlockletSize), Long.valueOf(HdfsBlockletAsyncTask.this.cacheBlockletId), Long.valueOf(HdfsBlockletAsyncTask.this.sw.stop().now() / 1000000));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }, this.timeoutInMS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            Logger logger = LOG;
            Object[] objArr = new Object[4];
            objArr[0] = this.path;
            objArr[1] = Integer.valueOf(this.cacheBlockletSize);
            objArr[2] = Long.valueOf(this.cacheBlockletId);
            objArr[3] = e instanceof TimeoutException ? "timeout " + (this.sw.stop().now() / 1000000) + "ms" : e.getMessage();
            logger.info("Failed to write remote cache {}, cache_size:{}, idx:{}, {}", objArr);
        } finally {
            JbootBufferFactory.returnBuffer(allocateBuffer);
            CacheUtils.closeQuietly(this.in);
            CacheUtils.closeQuietly(this.dfs);
            close();
        }
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writeCache(long j, ByteBuffer byteBuffer, boolean z) throws IOException {
        this.writer.writeDirect(byteBuffer, j, z);
    }

    @Override // java.lang.Runnable
    public void run() {
        write();
    }

    protected void finalize() throws Throwable {
        close();
        super.finalize();
    }
}
