package shade.com.aliyun.emr.fs.proxy.task;

import com.alibaba.jboot.JbootBlockletWriter;
import com.alibaba.jboot.buffer.JbootBufferFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.com.aliyun.emr.compatibility.StopWatch;
import shade.com.aliyun.emr.fs.proxy.JindoCacheUtils;
import shade.com.aliyun.emr.fs.proxy.JindoStreamCacheConfigKeys;

/* loaded from: input_file:shade/com/aliyun/emr/fs/proxy/task/WriteBlockletTask.class */
/**
 * One-shot task that reads a single blocklet of a remote file and hands it to a
 * {@link JbootBlockletWriter}.
 *
 * <p>The constructor opens the source stream and takes the writer's read lock; {@link #write()}
 * (or {@link #run()}) performs the copy under a timeout and always releases the buffer, the
 * stream, the file system and the lock afterwards. {@link #close()} releases only the lock and is
 * idempotent.
 *
 * <p>NOTE(review): the lock is acquired on the constructing thread but released on whichever
 * thread calls {@link #close()} / {@link #write()}; confirm the writer's lock implementation
 * permits releasing a read lock from a different thread.
 */
public class WriteBlockletTask implements Runnable {
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) WriteBlockletTask.class);

    /** Distance in bytes between the start offsets of two consecutive blocklets in the source file. */
    private static final long BLOCKLET_STRIDE_BYTES = 8388608L;

    /** Size of the intermediate array used for each positional read from the source stream. */
    private static final int READ_CHUNK_BYTES = 512;

    private static final long NANOS_PER_MILLI = 1000000L;

    private final Path path;
    private final long cacheBlockletId;
    private final int cacheBlockletSize;
    private final JbootBlockletWriter writer;
    final FileSystem fs;
    final FSDataInputStream in;
    final long timeoutInMS;
    final StopWatch sw;

    /** Flips to {@code true} exactly once, when the read lock release has been attempted. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Opens the source file and acquires the writer's read lock.
     *
     * <p>The file system implementation is looked up under the configuration key
     * {@code fs.jindo.<scheme>.impl}, where {@code <scheme>} is the scheme of {@code str}.
     *
     * @param str path of the source file
     * @param jbootBlockletWriter writer that receives the blocklet
     * @param j index of the blocklet; the read starts at {@code j * 8388608} bytes
     * @param i number of bytes to read for this blocklet
     * @param configuration configuration providing the file system class and the write timeout
     *     (defaults to 300000 ms)
     * @throws UnsupportedFileSystemException if no file system class is configured for the scheme
     * @throws IOException if the file system cannot be initialized or the file cannot be opened
     * @throws URISyntaxException declared for callers; not thrown by the visible code
     */
    public WriteBlockletTask(String str, JbootBlockletWriter jbootBlockletWriter, long j, int i, Configuration configuration) throws IOException, URISyntaxException {
        this.writer = jbootBlockletWriter;
        this.path = new Path(str);
        this.cacheBlockletId = j;
        this.cacheBlockletSize = i;
        this.timeoutInMS = configuration.getLong(JindoStreamCacheConfigKeys.JindoCache.CACHE_BACKEND_JINDO_INPUT_WRITE_CACHE_TIMEOUT_IN_MS_KEY, 300000L);
        URI uri = this.path.toUri();
        String implKey = String.format("fs.jindo.%s.impl", uri.getScheme());
        Class<?> cls = configuration.getClass(implKey, null);
        if (cls == null) {
            throw new UnsupportedFileSystemException(String.format("%s=null: %s: %s", implKey, "No FileSystem configured for scheme", uri.getScheme()));
        }
        FileSystem fileSystem = (FileSystem) ReflectionUtils.newInstance(cls, configuration);
        FSDataInputStream stream = null;
        StopWatch stopWatch;
        boolean constructed = false;
        try {
            fileSystem.initialize(uri, configuration);
            stream = fileSystem.open(this.path);
            // The stopwatch starts before the lock is requested, so lock wait time is part of
            // the elapsed time reported in the log messages.
            stopWatch = new StopWatch().start();
            this.writer.getWriterLock().readLock().lock();
            constructed = true;
        } finally {
            if (!constructed) {
                // Nobody will ever receive this instance, so nobody else can close these.
                if (stream != null) {
                    JindoCacheUtils.closeQuietly(stream);
                }
                JindoCacheUtils.closeQuietly(fileSystem);
            }
        }
        this.fs = fileSystem;
        this.in = stream;
        this.sw = stopWatch;
    }

    /**
     * Releases the writer's read lock taken by the constructor.
     *
     * <p>Only the first call attempts the unlock; later calls are no-ops, even if the first
     * attempt failed. A failure to unlock is logged, not thrown.
     */
    public void close() {
        // compareAndSet makes the "first caller wins" decision atomic, so the lock can never be
        // unlocked twice by racing callers (e.g. write() and finalize()).
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        try {
            this.writer.getWriterLock().readLock().unlock();
        } catch (Exception e) {
            LOG.info("Failed to close writer lock cache {}, cache_size:{}, idx:{}, {}, {}ms", this.path, this.cacheBlockletSize, this.cacheBlockletId, e.getMessage(), this.sw.stop().now() / NANOS_PER_MILLI);
        }
    }

    /**
     * Copies the blocklet from the source file into the writer, bounded by the configured timeout.
     *
     * <p>Failures of the copy (including a timeout) are logged and not rethrown. Whatever the
     * outcome, the buffer is returned to the factory and the stream, file system and read lock
     * are released before this method returns.
     */
    public void write() {
        try {
            if (this.writer.isClose()) {
                LOG.info("Failed to write remote cache {}, cache_size:{}, idx:{}, {}", this.path, this.cacheBlockletSize, this.cacheBlockletId, "Writer is already closed");
                return;
            }
            final ByteBuffer buffer = JbootBufferFactory.allocateBuffer(this.cacheBlockletSize);
            try {
                TasksUtils.runWithTimeout(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            WriteBlockletTask.this.copyBlockletToCache(buffer);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }, this.timeoutInMS, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                String reason;
                if (e instanceof TimeoutException) {
                    reason = "timeout " + (this.sw.stop().now() / NANOS_PER_MILLI) + "ms";
                } else {
                    reason = e.getMessage();
                }
                LOG.info("Failed to write remote cache {}, cache_size:{}, idx:{}, {}", this.path, this.cacheBlockletSize, this.cacheBlockletId, reason);
            } finally {
                JbootBufferFactory.returnBuffer(buffer);
            }
        } finally {
            releaseResources();
        }
    }

    /**
     * Reads {@code cacheBlockletSize} bytes of this task's blocklet into {@code buffer} and passes
     * the filled buffer to the writer.
     *
     * <p>NOTE(review): if the stream reports end-of-file before the requested size has been read,
     * this returns without writing or logging anything; confirm that is intended.
     */
    private void copyBlockletToCache(ByteBuffer buffer) throws IOException {
        byte[] chunk = new byte[READ_CHUNK_BYTES];
        long blockletOffset = this.cacheBlockletId * BLOCKLET_STRIDE_BYTES;
        int copied = 0;
        while (copied < this.cacheBlockletSize) {
            int wanted = Math.min(READ_CHUNK_BYTES, this.cacheBlockletSize - copied);
            int read = this.in.read(blockletOffset + copied, chunk, 0, wanted);
            if (read == -1) {
                return;
            }
            copied += read;
            buffer.put(chunk, 0, read);
        }
        buffer.flip();
        // The flag is true only for a non-first blocklet shorter than the stride; presumably it
        // marks the trailing partial blocklet for the writer - TODO confirm against writeDirect.
        boolean shortNonFirstBlocklet = this.cacheBlockletId > 0 && this.cacheBlockletSize < BLOCKLET_STRIDE_BYTES;
        writeCache(this.cacheBlockletId, buffer, shortNonFirstBlocklet);
        LOG.info("Succeed to write remote cache {}, cache_size:{}, idx:{}, {}ms", this.path, this.cacheBlockletSize, this.cacheBlockletId, this.sw.stop().now() / NANOS_PER_MILLI);
    }

    /** Closes the source stream and file system, then releases the read lock. */
    private void releaseResources() {
        JindoCacheUtils.closeQuietly(this.in);
        JindoCacheUtils.closeQuietly(this.fs);
        close();
    }

    /**
     * @return {@code true} once {@link #close()} has attempted to release the read lock
     */
    public boolean isClosed() {
        return this.closed.get();
    }

    /**
     * Forwards the buffer to the writer's {@code writeDirect}.
     *
     * @param j blocklet index
     * @param byteBuffer data to write, positioned for reading
     * @param z flag passed through unchanged to {@code writeDirect}
     * @throws IOException if the writer fails
     */
    public void writeCache(long j, ByteBuffer byteBuffer, boolean z) throws IOException {
        this.writer.writeDirect(byteBuffer, j, z);
    }

    /** Runs {@link #write()}. */
    @Override
    public void run() {
        write();
    }

    /** Safety net: releases the read lock if the task was discarded without being run or closed. */
    @Override
    protected void finalize() throws Throwable {
        try {
            close();
        } finally {
            super.finalize();
        }
    }
}
