Forum Discussion

marshal.tito01's avatar
marshal.tito01
Copper Contributor
Sep 27, 2018
Solved

How can I use NiFi to ingest data from/to ADLS?

I would like to use NiFi to connect with ADLS. My scenario is like this: Nifi is installed and running in windows machine.Now i want to move data from my windows local directory to ADLS. I am not using any hadoop component for now. From ADLS again i want to move that data to SQL server which is in Azure too.
How can i connect windows running Nifi to ADLS? All the instruction i found configuring core-site.xml files and taking the jars to Nifi specific folder. But as i dont have Hadoop running(so i dont have core-site.xml file) in that case how I can connect Nifi to ADLS?

Can anyone please share the pointers how it can be done?

Thanks in advance.

  • No Hadoop is needed .. For ADLS Gen1 and Gen1 you need a couple of JAR files and a simplified core-site.xml.  I am currently working with Nifi 1.9.0 (released feb 2019).

     

    For ADLS Gen1 I am using  :

    • azure-data-lake-store-sdk-2.3.1.jar
    • hadoop-azure-datalake-3.1.1.jar 
    • These jars are available in the Maven central repository
    • My core-site.xml : (replace the $< >$ with your values. 

     

    <configuration>
    <property>
    <name>fs.defaultFS</name>
    <value>adl://$<adls storage account name>$.azuredatalakestore.net</value>
    </property>
    <property>
    <name>dfs.adls.oauth2.access.token.provider.type</name>
    <value>ClientCredential</value>
    </property>
    <property>
    <name>dfs.adls.oauth2.refresh.url</name>
    <value>https://login.microsoftonline.com/$<tenant id>$/oauth2/token</value>
    </property>
    <property>
    <name>dfs.adls.oauth2.client.id</name>
    <value>$<client id>$</value>
    </property>
    <property>
    <name>dfs.adls.oauth2.credential</name>
    <value>$<key>$</value>
    </property>
    </configuration>
    
    

    For ADLS Gen2 I am using  : 

     

    • hadoop-azure-3.2.0.jar
    • wildfly-openssl-1.0.4.Final.jar
    • my core-site.xml (replace the $< >$ with your values. 

     

    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>abfss://$<cotainer>$@$<account>$.dfs.core.windows.net</value>
      </property>
      <property>
        <name>fs.azure.account.key.adbstorgen2.dfs.core.windows.net</name>
        <value>$<storage key>$</value>
      </property>
      <property>
                 <name>fs.adlsGen2.impl</name>
                 <value>org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem</value>
       </property>
       <property>
                 <name>fs.abfss.impl</name>
                 <value>org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem</value>
       </property>
       <property>
                 <name>fs.AbstractFileSystem.adlsGen2.impl</name>
                 <value>org.apache.hadoop.fs.azurebfs.Abfs</value>
       </property>
       <property>
                 <name>fs.AbstractFileSystem.abfss.impl</name>
                 <value>org.apache.hadoop.fs.azurebfs.Abfss</value>
       </property>
       <property>
                 <name>fs.azure.check.block.md5</name>
                 <value>false</value>
       </property>
       <property>
                 <name>fs.azure.store.blob.md5</name>
                 <value>false</value>
       </property>
       <property>
                 <name>fs.azure.createRemoteFileSystemDuringInitialization</name>
                 <value>true</value>
       </property>
    </configuration>

    for the put list fetch HDFS flows .. you just need to fill in the Hadoop Configuration Resources with the path to your core-site.xml and Additional Classpath Resources  with the folder that has the azure jar files. Enjoy !

     

  • No Hadoop is needed .. For ADLS Gen1 and Gen1 you need a couple of JAR files and a simplified core-site.xml.  I am currently working with Nifi 1.9.0 (released feb 2019).

     

    For ADLS Gen1 I am using  :

    • azure-data-lake-store-sdk-2.3.1.jar
    • hadoop-azure-datalake-3.1.1.jar 
    • These jars are available in the Maven central repository
    • My core-site.xml : (replace the $< >$ with your values. 

     

    <configuration>
    <property>
    <name>fs.defaultFS</name>
    <value>adl://$<adls storage account name>$.azuredatalakestore.net</value>
    </property>
    <property>
    <name>dfs.adls.oauth2.access.token.provider.type</name>
    <value>ClientCredential</value>
    </property>
    <property>
    <name>dfs.adls.oauth2.refresh.url</name>
    <value>https://login.microsoftonline.com/$<tenant id>$/oauth2/token</value>
    </property>
    <property>
    <name>dfs.adls.oauth2.client.id</name>
    <value>$<client id>$</value>
    </property>
    <property>
    <name>dfs.adls.oauth2.credential</name>
    <value>$<key>$</value>
    </property>
    </configuration>
    
    

    For ADLS Gen2 I am using  : 

     

    • hadoop-azure-3.2.0.jar
    • wildfly-openssl-1.0.4.Final.jar
    • my core-site.xml (replace the $< >$ with your values. 

     

    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>abfss://$<cotainer>$@$<account>$.dfs.core.windows.net</value>
      </property>
      <property>
        <name>fs.azure.account.key.adbstorgen2.dfs.core.windows.net</name>
        <value>$<storage key>$</value>
      </property>
      <property>
                 <name>fs.adlsGen2.impl</name>
                 <value>org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem</value>
       </property>
       <property>
                 <name>fs.abfss.impl</name>
                 <value>org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem</value>
       </property>
       <property>
                 <name>fs.AbstractFileSystem.adlsGen2.impl</name>
                 <value>org.apache.hadoop.fs.azurebfs.Abfs</value>
       </property>
       <property>
                 <name>fs.AbstractFileSystem.abfss.impl</name>
                 <value>org.apache.hadoop.fs.azurebfs.Abfss</value>
       </property>
       <property>
                 <name>fs.azure.check.block.md5</name>
                 <value>false</value>
       </property>
       <property>
                 <name>fs.azure.store.blob.md5</name>
                 <value>false</value>
       </property>
       <property>
                 <name>fs.azure.createRemoteFileSystemDuringInitialization</name>
                 <value>true</value>
       </property>
    </configuration>

    for the put list fetch HDFS flows .. you just need to fill in the Hadoop Configuration Resources with the path to your core-site.xml and Additional Classpath Resources  with the folder that has the azure jar files. Enjoy !

     

    • Vignesh_Shetty's avatar
      Vignesh_Shetty
      Copper Contributor
      After following these steps we ran into this issue
      2021-08-12 05:56:43,156 ERROR [Timer-Driven Process Thread-8] o.apache.nifi.processors.hadoop.PutHDFS PutHDFS[id=38c16dc4-017b-1000-daf4-be57448d3110] Failed to properly initialize Processor. If still scheduled to run, NiFi will attempt to initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem not found: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem not found
      java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem not found
      at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2479)
      at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3254)
      at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3286)
      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:473)
      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:225)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor$1.run(AbstractHadoopProcessor.java:434)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor$1.run(AbstractHadoopProcessor.java:431)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.getFileSystemAsUser(AbstractHadoopProcessor.java:431)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.resetHDFSResources(AbstractHadoopProcessor.java:393)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.abstractOnScheduled(AbstractHadoopProcessor.java:251)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:142)
      at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:130)
      at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:75)
      at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:52)
      at org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$4(StandardProcessorNode.java:1515)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem not found
      at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2383)
      at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2477)
      ... 27 common frames omitted
      2021-08-12 05:56:43,156 WARN [Timer-Driven Process Thread-8] o.apache.nifi.processors.hadoop.PutHDFS PutHDFS[id=38c16dc4-017b-1000-daf4-be57448d3110] Error stopping FileSystem statistics thread: null
      2021-08-12 05:56:46,375 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@3b25ce5e checkpointed with 1 Records and 0 Swap Files in 22 milliseconds (Stop-the-world time = 1 milliseconds, Clear Edit Logs time = 1 millis), max Transaction ID 2
      2021-08-12 05:56:46,477 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
      2021-08-12 05:56:46,477 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 1 records in 0 milliseconds

      I have included these jars in Additional classpath
      sshuser@wn2-kylohd:~$ ls /usr/lib/hdinsight-datalake/
      adls2-oauth2-token-provider-1.0.jar core-site.xml hdfs-site.xml jackson-core-2.9.8.jar okio-1.6.0.jar
      azure-data-lake-store-sdk-2.3.6.jar hadoop-azure-datalake-2.7.3.2.6.5.3033-1.jar hive-site.xml okhttp-2.7.5.jar

      the setup is with HDI3.6 ADLS Gen2 as storage account with nifi 1.9.0 on worker node where ownership is default during nifi standalone setup 501:staff
    • stawo's avatar
      stawo
      Copper Contributor

      Hi Bruce Nelson ,

      thanks for the tips.

      I tried to reproduce the steps you mentioned, but NiFi doesn't load properly the PutHDFS flow (it's the only one I'm testing at the moment).
      The error I get is:

      o.apache.nifi.processors.hadoop.PutHDFS PutHDFS[id=...] HDFS Configuration error - Configuration property [account name].dfs.core.windows.net not found.: Configuration property [account name].dfs.core.windows.net not found.
      org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException: Configuration property [account name].dfs.core.windows.net not found.
      at org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:342)
      at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:812)
      at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:149)
      at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:108)
      at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3288)
      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:473)
      at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:225)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor$1.run(AbstractHadoopProcessor.java:434)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor$1.run(AbstractHadoopProcessor.java:431)
      at java.base/java.security.AccessController.doPrivileged(Native Method)
      at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.getFileSystemAsUser(AbstractHadoopProcessor.java:431)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.resetHDFSResources(AbstractHadoopProcessor.java:393)
      at org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.abstractOnScheduled(AbstractHadoopProcessor.java:251)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:564)
      at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:142)
      at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:130)
      at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:75)
      at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:52)
      at org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$4(StandardProcessorNode.java:1515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:844)

       

      I tried to check around, but I have no idea what it can refer to.

      I'm currently using Nifi 1.9.1, and the jars with the version you specified on Ubuntu 18.04 Server.

      Could you help me?

      Thanks in advance,

      Michele

      • stawo's avatar
        stawo
        Copper Contributor

        Ok, I figured out where the problem was by reading http://docs.wandisco.com/bigdata/wdfusion/adls/.
        In Bruce answer there is a small mistake in the ADLS Gen2 core-site.xml.
        The correct one should be (differences marked in bold):

         

        <configuration>
          <property>
            <name>fs.defaultFS</name>
            <value>abfss://$<container>$@$<account>$.dfs.core.windows.net</value>
          </property>
          <property>
            <name>fs.azure.account.key.$<account>$.dfs.core.windows.net</name>
            <value>$<storage key>$</value>
          </property>
          <property>
                     <name>fs.adlsGen2.impl</name>
                     <value>org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem</value>
           </property>
           <property>
                     <name>fs.abfss.impl</name>
                     <value>org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem</value>
           </property>
           <property>
                     <name>fs.AbstractFileSystem.adlsGen2.impl</name>
                     <value>org.apache.hadoop.fs.azurebfs.Abfs</value>
           </property>
           <property>
                     <name>fs.AbstractFileSystem.abfss.impl</name>
                     <value>org.apache.hadoop.fs.azurebfs.Abfss</value>
           </property>
           <property>
                     <name>fs.azure.check.block.md5</name>
                     <value>false</value>
           </property>
           <property>
                     <name>fs.azure.store.blob.md5</name>
                     <value>false</value>
           </property>
           <property>
                     <name>fs.azure.createRemoteFileSystemDuringInitialization</name>
                     <value>true</value>
           </property>
        </configuration>

         Tested using:

        • hadoop-azure-3.2.0.jar
        • wildfly-openssl-1.0.4.Final.jar

Resources